Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

truncate without removing deletes or updates

+224 -41
+8 -6
ufos/src/consumer.rs
··· 19 19 const SEND_TIMEOUT_S: f64 = 15.; // if the channel is blocked longer than this, something is probably up 20 20 const BATCH_QUEUE_SIZE: usize = 1; // nearly-rendez-vous 21 21 22 + pub type LimitedBatch = EventBatch<MAX_BATCHED_RECORDS>; 23 + 22 24 #[derive(Debug, Default)] 23 25 struct CurrentBatch { 24 26 initial_cursor: Option<Cursor>, 25 - batch: EventBatch, 27 + batch: LimitedBatch, 26 28 } 27 29 28 30 #[derive(Debug)] 29 31 struct Batcher { 30 32 jetstream_receiver: JetstreamReceiver, 31 - batch_sender: Sender<EventBatch>, 33 + batch_sender: Sender<LimitedBatch>, 32 34 current_batch: CurrentBatch, 33 35 } 34 36 ··· 36 38 jetstream_endpoint: &str, 37 39 cursor: Option<Cursor>, 38 40 no_compress: bool, 39 - ) -> anyhow::Result<Receiver<EventBatch>> { 41 + ) -> anyhow::Result<Receiver<LimitedBatch>> { 40 42 let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(jetstream_endpoint); 41 43 if endpoint == jetstream_endpoint { 42 44 log::info!("connecting to jetstream at {endpoint}"); ··· 57 59 let jetstream_receiver = JetstreamConnector::new(config)? 58 60 .connect_cursor(cursor) 59 61 .await?; 60 - let (batch_sender, batch_reciever) = channel::<EventBatch>(BATCH_QUEUE_SIZE); 62 + let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE); 61 63 let mut batcher = Batcher::new(jetstream_receiver, batch_sender); 62 64 tokio::task::spawn(async move { batcher.run().await }); 63 65 Ok(batch_reciever) 64 66 } 65 67 66 68 impl Batcher { 67 - fn new(jetstream_receiver: JetstreamReceiver, batch_sender: Sender<EventBatch>) -> Self { 69 + fn new(jetstream_receiver: JetstreamReceiver, batch_sender: Sender<LimitedBatch>) -> Self { 68 70 Self { 69 71 jetstream_receiver, 70 72 batch_sender, ··· 135 137 .commits_by_nsid 136 138 .entry(nsid) 137 139 .or_default() 138 - .truncating_insert(commit, MAX_BATCHED_RECORDS); 140 + .truncating_insert(commit); 139 141 140 142 Ok(()) 141 143 }
+11
ufos/src/error.rs
··· 1 1 use crate::db_types::EncodingError; 2 + use crate::UFOsCommit; 2 3 use thiserror::Error; 3 4 4 5 #[derive(Debug, Error)] ··· 9 10 AccountEventMissingAccount, 10 11 #[error("Commit event missing commit info")] 11 12 CommitEventMissingCommit, 13 + } 14 + 15 + #[derive(Debug, Error)] 16 + pub enum BatchInsertError { 17 + #[error("Batch is full and no creates are left to be truncated")] 18 + BatchFull(UFOsCommit), 19 + #[error("Bug: tried to index beyond batch limit: {0}")] 20 + BatchOverflow(usize), 21 + #[error("Bug: non-terminating head advancement??")] 22 + BatchForever, 12 23 } 13 24 14 25 #[derive(Debug, Error)]
+187 -17
ufos/src/lib.rs
··· 6 6 pub mod storage_fjall; 7 7 pub mod store_types; 8 8 9 + use crate::error::BatchInsertError; 9 10 use cardinality_estimator::CardinalityEstimator; 10 11 use error::FirehoseEventError; 11 12 use jetstream::events::{CommitEvent, CommitOp, Cursor}; 12 13 use jetstream::exports::{Did, Nsid, RecordKey}; 13 14 use serde::Serialize; 14 15 use serde_json::value::RawValue; 15 - use std::collections::{HashMap, VecDeque}; 16 + use std::collections::HashMap; 16 17 17 18 #[derive(Debug, Default, Clone)] 18 - pub struct CollectionCommits { 19 + pub struct CollectionCommits<const LIMIT: usize> { 19 20 pub total_seen: usize, 20 21 pub dids_estimate: CardinalityEstimator<Did>, 21 - pub commits: VecDeque<UFOsCommit>, 22 + pub commits: Vec<UFOsCommit>, 23 + head: usize, 24 + non_creates: usize, 22 25 } 23 26 24 - impl CollectionCommits { 25 - pub fn truncating_insert(&mut self, commit: UFOsCommit, limit: usize) { 26 - if let CommitAction::Put(PutAction { 27 - is_update: false, .. 28 - }) = commit.action 29 - { 27 + impl<const LIMIT: usize> CollectionCommits<LIMIT> { 28 + fn advance_head(&mut self) { 29 + self.head += 1; 30 + if self.head > LIMIT { 31 + self.head = 0; 32 + } 33 + } 34 + pub fn truncating_insert(&mut self, commit: UFOsCommit) -> Result<(), BatchInsertError> { 35 + if self.non_creates == LIMIT { 36 + return Err(BatchInsertError::BatchFull(commit)); 37 + } 38 + let did = commit.did.clone(); 39 + let is_create = commit.action.is_create(); 40 + if self.commits.len() < LIMIT { 41 + self.commits.push(commit); 42 + if self.commits.capacity() > LIMIT { 43 + self.commits.shrink_to(LIMIT); // save mem?????? maybe?? 44 + } 45 + } else { 46 + let head_started_at = self.head; 47 + loop { 48 + let candidate = self 49 + .commits 50 + .get_mut(self.head) 51 + .ok_or(BatchInsertError::BatchOverflow(self.head))?; 52 + if candidate.action.is_create() { 53 + std::mem::replace(candidate, commit); 54 + break; 55 + } 56 + self.advance_head(); 57 + if self.head == head_started_at { 58 + return Err(BatchInsertError::BatchForever); 59 + } 60 + } 61 + } 62 + 63 + if is_create { 30 64 self.total_seen += 1; 31 - self.dids_estimate.insert(&commit.did); 65 + self.dids_estimate.insert(&did); 66 + } else { 67 + self.non_creates += 1; 32 68 } 33 - // TODO: oops we can't truncate *delete* commits!!!!!!! 34 - self.commits.truncate(limit - 1); 35 - self.commits.push_front(commit); 69 + 70 + Ok(()) 36 71 } 37 72 } 38 73 ··· 47 82 Put(PutAction), 48 83 Cut, 49 84 } 85 + impl CommitAction { 86 + pub fn is_create(&self) -> bool { 87 + match self { 88 + CommitAction::Put(PutAction { is_update, .. }) => !is_update, 89 + CommitAction::Cut => false, 90 + } 91 + } 92 + } 50 93 51 94 #[derive(Debug, Clone)] 52 95 pub struct PutAction { ··· 100 143 } 101 144 102 145 #[derive(Debug, Default, Clone)] 103 - pub struct EventBatch { 104 - pub commits_by_nsid: HashMap<Nsid, CollectionCommits>, 146 + pub struct EventBatch<const LIMIT: usize> { 147 + pub commits_by_nsid: HashMap<Nsid, CollectionCommits<LIMIT>>, 105 148 pub account_removes: Vec<DeleteAccount>, 106 149 } 107 150 108 - impl EventBatch { 151 + impl<const LIMIT: usize> EventBatch<LIMIT> { 109 152 pub fn total_records(&self) -> usize { 110 153 self.commits_by_nsid.values().map(|v| v.commits.len()).sum() 111 154 } ··· 128 171 pub fn latest_cursor(&self) -> Option<Cursor> { 129 172 let mut oldest = Cursor::from_start(); 130 173 for commits in self.commits_by_nsid.values() { 131 - if let Some(commit) = commits.commits.front() { 174 + for commit in &commits.commits { 132 175 if commit.cursor > oldest { 133 176 oldest = commit.cursor; 134 177 } ··· 149 192 self.commits_by_nsid.is_empty() && self.account_removes.is_empty() 150 193 } 151 194 } 195 + 196 + #[cfg(test)] 197 + mod tests { 198 + use super::*; 199 + 200 + #[test] 201 + fn test_truncating_insert_truncates() -> anyhow::Result<()> { 202 + let mut commits: CollectionCommits<2> = Default::default(); 203 + 204 + commits.truncating_insert(UFOsCommit { 205 + cursor: Cursor::from_raw_u64(100), 206 + did: Did::new("did:plc:whatever".to_string()).unwrap(), 207 + rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(), 208 + rev: "rev-asdf".to_string(), 209 + action: CommitAction::Put(PutAction { 210 + record: RawValue::from_string("{}".to_string())?, 211 + is_update: false, 212 + }), 213 + }); 214 + 215 + commits.truncating_insert(UFOsCommit { 216 + cursor: Cursor::from_raw_u64(101), 217 + did: Did::new("did:plc:whatever".to_string()).unwrap(), 218 + rkey: RecordKey::new("rkey-asdf-b".to_string()).unwrap(), 219 + rev: "rev-asdg".to_string(), 220 + action: CommitAction::Put(PutAction { 221 + record: RawValue::from_string("{}".to_string())?, 222 + is_update: false, 223 + }), 224 + }); 225 + 226 + commits.truncating_insert(UFOsCommit { 227 + cursor: Cursor::from_raw_u64(102), 228 + did: Did::new("did:plc:whatever".to_string()).unwrap(), 229 + rkey: RecordKey::new("rkey-asdf-c".to_string()).unwrap(), 230 + rev: "rev-asdh".to_string(), 231 + action: CommitAction::Put(PutAction { 232 + record: RawValue::from_string("{}".to_string())?, 233 + is_update: false, 234 + }), 235 + }); 236 + 237 + assert_eq!(commits.total_seen, 3); 238 + assert_eq!(commits.dids_estimate.estimate(), 1); 239 + assert_eq!(commits.commits.len(), 2); 240 + 241 + let mut found_first = false; 242 + let mut found_last = false; 243 + for commit in commits.commits { 244 + match commit.rev.as_ref() { 245 + "rev-asdf" => { 246 + found_first = true; 247 + } 248 + "rev-asdh" => { 249 + found_last = true; 250 + } 251 + _ => {} 252 + } 253 + } 254 + assert!(!found_first); 255 + assert!(found_last); 256 + 257 + Ok(()) 258 + } 259 + 260 + #[test] 261 + fn test_truncating_insert_does_not_truncate_deletes() -> anyhow::Result<()> { 262 + let mut commits: CollectionCommits<2> = Default::default(); 263 + 264 + commits.truncating_insert(UFOsCommit { 265 + cursor: Cursor::from_raw_u64(100), 266 + did: Did::new("did:plc:whatever".to_string()).unwrap(), 267 + rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(), 268 + rev: "rev-asdf".to_string(), 269 + action: CommitAction::Cut, 270 + }); 271 + 272 + commits.truncating_insert(UFOsCommit { 273 + cursor: Cursor::from_raw_u64(101), 274 + did: Did::new("did:plc:whatever".to_string()).unwrap(), 275 + rkey: RecordKey::new("rkey-asdf-b".to_string()).unwrap(), 276 + rev: "rev-asdg".to_string(), 277 + action: CommitAction::Put(PutAction { 278 + record: RawValue::from_string("{}".to_string())?, 279 + is_update: false, 280 + }), 281 + }); 282 + 283 + commits.truncating_insert(UFOsCommit { 284 + cursor: Cursor::from_raw_u64(102), 285 + did: Did::new("did:plc:whatever".to_string()).unwrap(), 286 + rkey: RecordKey::new("rkey-asdf-c".to_string()).unwrap(), 287 + rev: "rev-asdh".to_string(), 288 + action: CommitAction::Put(PutAction { 289 + record: RawValue::from_string("{}".to_string())?, 290 + is_update: false, 291 + }), 292 + }); 293 + 294 + assert_eq!(commits.total_seen, 2); 295 + assert_eq!(commits.dids_estimate.estimate(), 1); 296 + assert_eq!(commits.commits.len(), 2); 297 + 298 + let mut found_first = false; 299 + let mut found_last = false; 300 + let mut found_delete = false; 301 + for commit in commits.commits { 302 + match commit.rev.as_ref() { 303 + "rev-asdg" => { 304 + found_first = true; 305 + } 306 + "rev-asdh" => { 307 + found_last = true; 308 + } 309 + _ => {} 310 + } 311 + if let CommitAction::Cut = commit.action { 312 + found_delete = true; 313 + } 314 + } 315 + assert!(!found_first); 316 + assert!(found_last); 317 + assert!(found_delete); 318 + 319 + Ok(()) 320 + } 321 + }
+4 -1
ufos/src/storage.rs
··· 17 17 } 18 18 19 19 pub trait StoreWriter { 20 - fn insert_batch(&mut self, event_batch: EventBatch) -> StorageResult<()>; 20 + fn insert_batch<const LIMIT: usize>( 21 + &mut self, 22 + event_batch: EventBatch<LIMIT>, 23 + ) -> StorageResult<()>; 21 24 fn trim_collection(&mut self, collection: &Nsid, limit: usize) -> StorageResult<()>; 22 25 } 23 26
+14 -17
ufos/src/storage_fjall.rs
··· 1 + use crate::consumer::LimitedBatch; 1 2 use crate::db_types::{db_complete, DbBytes, DbStaticStr, EncodingError, StaticStr}; 2 3 use crate::error::StorageError; 3 4 use crate::storage::{StorageResult, StorageWhatever, StoreReader, StoreWriter}; ··· 349 350 } 350 351 351 352 impl StoreWriter for FjallWriter { 352 - fn insert_batch(&mut self, event_batch: EventBatch) -> StorageResult<()> { 353 + fn insert_batch<const LIMIT: usize>( 354 + &mut self, 355 + event_batch: EventBatch<LIMIT>, 356 + ) -> StorageResult<()> { 353 357 if event_batch.is_empty() { 354 358 return Ok(()); 355 359 } ··· 529 533 /// Jetstream event batch receiver: writes without any reads 530 534 /// 531 535 /// Events that require reads like record updates or delets are written to a queue 532 - pub async fn receive(&self, mut receiver: Receiver<EventBatch>) -> anyhow::Result<()> { 536 + pub async fn receive(&self, mut receiver: Receiver<LimitedBatch>) -> anyhow::Result<()> { 533 537 // TODO: see rw_loop: enforce single-thread. 534 538 loop { 535 539 let t_sleep = Instant::now(); ··· 1119 1123 1120 1124 ////////// temp stuff to remove: 1121 1125 1122 - fn summarize_batch(batch: &EventBatch) -> String { 1126 + fn summarize_batch<const LIMIT: usize>(batch: &EventBatch<LIMIT>) -> String { 1123 1127 format!( 1124 1128 "batch of {: >3} samples from {: >4} records in {: >2} collections from ~{: >4} DIDs, {} acct removes, cursor {: <12?}", 1125 1129 batch.total_records(), ··· 1149 1153 .unwrap(); 1150 1154 (read, write) 1151 1155 } 1156 + 1157 + const TEST_BATCH_LIMIT: usize = 16; 1152 1158 1153 1159 #[derive(Debug, Default)] 1154 1160 struct TestBatch { 1155 - pub batch: EventBatch, 1161 + pub batch: EventBatch<TEST_BATCH_LIMIT>, 1156 1162 } 1157 1163 1158 1164 impl TestBatch { ··· 1165 1171 rev: Option<&str>, 1166 1172 cid: Option<Cid>, 1167 1173 cursor: u64, 1168 - truncate_at: usize, 1169 1174 ) -> Nsid { 1170 1175 let did = Did::new(did.to_string()).unwrap(); 1171 1176 let collection = Nsid::new(collection.to_string()).unwrap(); ··· 1193 1198 .commits_by_nsid 1194 1199 .entry(collection.clone()) 1195 1200 .or_default() 1196 - .truncating_insert(commit, truncate_at); 1201 + .truncating_insert(commit); 1197 1202 1198 1203 collection 1199 1204 } ··· 1206 1211 rev: Option<&str>, 1207 1212 cid: Option<Cid>, 1208 1213 cursor: u64, 1209 - truncate_at: usize, 1210 1214 ) -> Nsid { 1211 1215 let did = Did::new(did.to_string()).unwrap(); 1212 1216 let collection = Nsid::new(collection.to_string()).unwrap(); ··· 1234 1238 .commits_by_nsid 1235 1239 .entry(collection.clone()) 1236 1240 .or_default() 1237 - .truncating_insert(commit, truncate_at); 1241 + .truncating_insert(commit); 1238 1242 1239 1243 collection 1240 1244 } ··· 1264 1268 .commits_by_nsid 1265 1269 .entry(collection.clone()) 1266 1270 .or_default() 1267 - .truncating_insert(commit, 10000); // eek this needs to be fixed!! 1271 + .truncating_insert(commit); 1268 1272 1269 1273 collection 1270 1274 } ··· 1273 1277 #[test] 1274 1278 fn test_hello() -> anyhow::Result<()> { 1275 1279 let (read, mut write) = fjall_db(); 1276 - write.insert_batch(EventBatch::default())?; 1280 + write.insert_batch::<TEST_BATCH_LIMIT>(EventBatch::default())?; 1277 1281 let (records, dids) = 1278 1282 read.get_counts_by_collection(&Nsid::new("a.b.c".to_string()).unwrap())?; 1279 1283 assert_eq!(records, 0); ··· 1294 1298 Some("rev-z"), 1295 1299 None, 1296 1300 100, 1297 - 1, 1298 1301 ); 1299 1302 write.insert_batch(batch.batch)?; 1300 1303 ··· 1332 1335 Some("rev-a"), 1333 1336 None, 1334 1337 100, 1335 - 1, 1336 1338 ); 1337 1339 write.insert_batch(batch.batch)?; 1338 1340 ··· 1345 1347 Some("rev-z"), 1346 1348 None, 1347 1349 101, 1348 - 1, 1349 1350 ); 1350 1351 write.insert_batch(batch.batch)?; 1351 1352 ··· 1374 1375 Some("rev-a"), 1375 1376 None, 1376 1377 100, 1377 - 1, 1378 1378 ); 1379 1379 write.insert_batch(batch.batch)?; 1380 1380 ··· 1411 1411 Some("rev-aaa"), 1412 1412 None, 1413 1413 10_000, 1414 - 100, 1415 1414 ); 1416 1415 let mut last_b_cursor; 1417 1416 for i in 1..=10 { ··· 1424 1423 Some(&format!("rev-bbb-{i}")), 1425 1424 None, 1426 1425 last_b_cursor, 1427 - 100, 1428 1426 ); 1429 1427 } 1430 1428 batch.create( ··· 1435 1433 Some("rev-ccc"), 1436 1434 None, 1437 1435 12_000, 1438 - 100, 1439 1436 ); 1440 1437 1441 1438 write.insert_batch(batch.batch)?;