Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

handle record updates and account deletes

- bug from jetstream oxide: https://github.com/videah/jetstream-oxide/pull/9
- locally just merging create and update into a single variant, switching on `operation` in client
- mod cursor seeks with bounds instead of prefix.next, since post prefixes will be delete tombstones
- the exclusive bound is *super* annoying. this pulls lsm-tree in but that was not even nice. should ask fjall to expose the upper bound calculation as a distinct function?

+197 -68
+1
Cargo.lock
··· 3719 3719 "fjall", 3720 3720 "jetstream", 3721 3721 "log", 3722 + "lsm-tree", 3722 3723 "schemars", 3723 3724 "semver", 3724 3725 "serde",
+2 -2
jetstream/examples/arbitrary_record.rs
··· 42 42 let mut receiver = jetstream.connect().await?; 43 43 44 44 println!( 45 - "Listening for '{}' events on DIDs: {:?}", 45 + "Listening for new and updated '{}' events on DIDs: {:?}", 46 46 args.nsid.as_str(), 47 47 dids 48 48 ); 49 49 50 50 while let Some(event) = receiver.recv().await { 51 - if let Commit(CommitEvent::Create { commit, .. }) = event { 51 + if let Commit(CommitEvent::CreateOrUpdate { commit, .. }) = event { 52 52 println!("got record: {:?}", commit.record); 53 53 } 54 54 }
+7 -2
jetstream/examples/basic.rs
··· 7 7 use clap::Parser; 8 8 use jetstream::{ 9 9 events::{ 10 - commit::CommitEvent, 10 + commit::{ 11 + CommitEvent, 12 + CommitType, 13 + }, 11 14 JetstreamEvent::Commit, 12 15 }, 13 16 DefaultJetstreamEndpoints, ··· 52 55 while let Some(event) = receiver.recv().await { 53 56 if let Commit(commit) = event { 54 57 match commit { 55 - CommitEvent::Create { info: _, commit } => { 58 + CommitEvent::CreateOrUpdate { info: _, commit } 59 + if commit.info.operation == CommitType::Create => 60 + { 56 61 if let AppBskyFeedPost(record) = commit.record { 57 62 println!( 58 63 "New post created! ({})\n\n'{}'",
+2 -7
jetstream/src/events/commit.rs
··· 9 9 #[derive(Deserialize, Debug)] 10 10 #[serde(untagged, rename_all = "snake_case")] 11 11 pub enum CommitEvent<R> { 12 - Create { 13 - #[serde(flatten)] 14 - info: EventInfo, 15 - commit: CommitData<R>, 16 - }, 17 - Update { 12 + CreateOrUpdate { 18 13 #[serde(flatten)] 19 14 info: EventInfo, 20 15 commit: CommitData<R>, ··· 27 22 } 28 23 29 24 /// The type of commit operation that was performed. 30 - #[derive(Deserialize, Debug)] 25 + #[derive(Deserialize, Debug, PartialEq)] 31 26 #[serde(rename_all = "snake_case")] 32 27 pub enum CommitType { 33 28 Create,
+1 -4
jetstream/src/events/mod.rs
··· 46 46 impl<R> JetstreamEvent<R> { 47 47 pub fn cursor(&self) -> Cursor { 48 48 match self { 49 - JetstreamEvent::Commit(commit::CommitEvent::Create { info, .. }) => { 50 - info.time_us.clone() 51 - } 52 - JetstreamEvent::Commit(commit::CommitEvent::Update { info, .. }) => { 49 + JetstreamEvent::Commit(commit::CommitEvent::CreateOrUpdate { info, .. }) => { 53 50 info.time_us.clone() 54 51 } 55 52 JetstreamEvent::Commit(commit::CommitEvent::Delete { info, .. }) => {
+1
ufos/Cargo.toml
··· 12 12 fjall = { version = "2.6.7", features = ["bytes", "single_writer_tx"], default-features = false } 13 13 jetstream = { path = "../jetstream" } 14 14 log = "0.4.26" 15 + lsm-tree = "2.6.6" 15 16 schemars = "0.8.22" 16 17 semver = "1.0.26" 17 18 serde = "1.0.219"
+12 -7
ufos/src/consumer.rs
··· 1 1 use jetstream::{ 2 2 events::{ 3 3 account::AccountEvent, 4 - commit::{CommitData, CommitEvent, CommitInfo}, 4 + commit::{CommitData, CommitEvent, CommitInfo, CommitType}, 5 5 Cursor, EventInfo, JetstreamEvent, 6 6 }, 7 7 exports::Did, ··· 95 95 } 96 96 97 97 match event { 98 - JetstreamEvent::Commit(CommitEvent::Create { commit, info }) => { 99 - self.handle_create_record(commit, info).await? 100 - } 101 - JetstreamEvent::Commit(CommitEvent::Update { commit, info }) => { 102 - self.handle_modify_record(modify_update(commit, info)) 103 - .await? 98 + JetstreamEvent::Commit(CommitEvent::CreateOrUpdate { commit, info }) => { 99 + match commit.info.operation { 100 + CommitType::Create => self.handle_create_record(commit, info).await?, 101 + CommitType::Update => { 102 + self.handle_modify_record(modify_update(commit, info)) 103 + .await? 104 + } 105 + CommitType::Delete => { 106 + panic!("jetstream Commit::CreateOrUpdate had Delete operation type") 107 + } 108 + } 104 109 } 105 110 JetstreamEvent::Commit(CommitEvent::Delete { commit, info }) => { 106 111 self.handle_modify_record(modify_delete(commit, info))
+29 -3
ufos/src/db_types.rs
··· 7 7 encode_to_vec, 8 8 error::{DecodeError, EncodeError}, 9 9 }; 10 + use lsm_tree::range::prefix_to_range; 11 + use std::fmt; 10 12 use std::marker::PhantomData; 13 + use std::ops::{Bound, Range}; 11 14 use thiserror::Error; 12 15 13 16 #[non_exhaustive] ··· 37 40 JsonError(#[from] serde_json::Error), 38 41 #[error("unexpected extra bytes ({0} bytes) left after decoding")] 39 42 DecodeTooManyBytes(usize), 43 + #[error("expected exclusive bound from lsm_tree (likely bug)")] 44 + BadRangeBound, 40 45 } 41 46 42 47 fn bincode_conf() -> impl Config { ··· 50 55 Self: Sized; 51 56 } 52 57 53 - #[derive(Debug, PartialEq)] 58 + #[derive(PartialEq)] 54 59 pub struct DbConcat<P: DbBytes, S: DbBytes> { 55 60 pub prefix: P, 56 61 pub suffix: S, 57 62 } 58 63 59 - impl<P: DbBytes, S: DbBytes> DbConcat<P, S> { 64 + impl<P: DbBytes + PartialEq + std::fmt::Debug, S: DbBytes + PartialEq + std::fmt::Debug> 65 + DbConcat<P, S> 66 + { 60 67 pub fn from_pair(prefix: P, suffix: S) -> Self { 61 68 Self { prefix, suffix } 62 69 } ··· 66 73 pub fn to_prefix_db_bytes(&self) -> Result<Vec<u8>, EncodingError> { 67 74 self.prefix.to_db_bytes() 68 75 } 76 + pub fn range_to_prefix_end(&self) -> Result<Range<Vec<u8>>, EncodingError> { 77 + let mut key_bytes = self.prefix.to_db_bytes()?; 78 + let (_, Bound::Excluded(range_end)) = prefix_to_range(&key_bytes) else { 79 + return Err(EncodingError::BadRangeBound); 80 + }; 81 + key_bytes.append(&mut self.suffix.to_db_bytes()?); 82 + Ok(key_bytes..range_end.to_vec()) 83 + } 84 + } 85 + 86 + impl<P: DbBytes + std::fmt::Debug, S: DbBytes + std::fmt::Debug> fmt::Debug for DbConcat<P, S> { 87 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 88 + write!(f, "DbConcat<{:?} || {:?}>", self.prefix, self.suffix) 89 + } 69 90 } 70 91 71 92 impl<P: DbBytes, S: DbBytes> DbBytes for DbConcat<P, S> { ··· 102 123 fn static_str() -> &'static str; 103 124 } 104 125 105 - #[derive(Debug, PartialEq)] 126 + #[derive(PartialEq)] 106 127 pub struct DbStaticStr<S: StaticStr> { 107 128 marker: PhantomData<S>, 108 129 } ··· 111 132 Self { 112 133 marker: PhantomData, 113 134 } 135 + } 136 + } 137 + impl<S: StaticStr> fmt::Debug for DbStaticStr<S> { 138 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 139 + write!(f, "DbStaticStr({:?})", S::static_str()) 114 140 } 115 141 } 116 142 impl<S: StaticStr> DbBytes for DbStaticStr<S> {
+4 -1
ufos/src/main.rs
··· 39 39 let (storage, cursor) = 40 40 store::Storage::open(args.data, &args.jetstream, args.jetstream_force).await?; 41 41 42 - println!("starting consumer with cursor: {cursor:?}"); 42 + println!( 43 + "starting consumer with cursor: {cursor:?} from {:?} ago", 44 + cursor.clone().map(|c| c.elapsed()) 45 + ); 43 46 let batches = consumer::consume(&args.jetstream, cursor, args.jetstream_no_zstd).await?; 44 47 45 48 println!("starting server with storage...");
+138 -42
ufos/src/store.rs
··· 2 2 use crate::store_types::{ 3 3 ByCollectionKey, ByCollectionValue, ByCursorSeenKey, ByCursorSeenValue, ByIdKey, ByIdValue, 4 4 JetstreamCursorKey, JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue, 5 - ModCursorKey, ModCursorValue, ModQueueItemKey, ModQueueItemPrefix, ModQueueItemStringValue, 6 - ModQueueItemValue, 5 + ModCursorKey, ModCursorValue, ModQueueItemKey, ModQueueItemStringValue, ModQueueItemValue, 7 6 }; 8 7 use crate::{ 9 8 CollectionSamples, CreateRecord, DeleteAccount, Did, EventBatch, ModifyRecord, Nsid, RecordKey, ··· 16 15 use std::path::{Path, PathBuf}; 17 16 use std::time::{Duration, Instant}; 18 17 use tokio::{sync::mpsc::Receiver, time::sleep}; 18 + 19 + const MAX_BATCHED_DELETE_ACCOUNT_RECORDS: usize = 32; // there are probably some efficiency gains for higher, at cost of more memory 19 20 20 21 /** 21 22 * data format, roughly: ··· 130 131 pub async fn rw_loop(&self) -> anyhow::Result<()> { 131 132 // TODO: lock so that only one rw loop can possibly be run. or even better, take a mutable resource thing to enforce at compile time. 132 133 loop { 133 - sleep(Duration::from_secs_f64(0.2)).await; 134 + sleep(Duration::from_secs_f64(0.001)).await; 134 135 let keyspace = self.keyspace.clone(); 135 136 let partition = self.partition.clone(); 136 137 tokio::task::spawn_blocking(move || -> anyhow::Result<()> { 137 - let prefix = ModQueueItemPrefix::default().to_db_bytes()?; 138 - // TODO: use the mod cursor to avoid scanning over all the deletes (delete range would actually be nice) 139 - let Some(pair) = partition.prefix(prefix).next() else { 140 - eprintln!("mod queue empty."); 138 + let mod_cursor = get_static::<ModCursorKey, ModCursorValue>(&partition)? 139 + .unwrap_or(Cursor::from_start()); 140 + let range = ModQueueItemKey::new(mod_cursor.clone()).range_to_prefix_end()?; 141 + let Some(pair) = partition.range(range.clone()).next() else { 142 + // eprintln!("mod queue empty."); 141 143 return Ok(()); 142 144 }; 145 + 143 146 let (key_bytes, val_bytes) = pair?; 144 - let mod_key: ModQueueItemKey = db_complete::<ModQueueItemKey>(&key_bytes)?; 147 + let mod_key = match db_complete::<ModQueueItemKey>(&key_bytes) { 148 + Ok(k) => k, 149 + Err(EncodingError::WrongStaticPrefix(_, _)) => { 150 + panic!("wsp: mod queue empty."); 151 + } 152 + otherwise => otherwise?, 153 + }; 154 + 145 155 let mod_value: ModQueueItemValue = 146 156 db_complete::<ModQueueItemStringValue>(&val_bytes)?.try_into()?; 147 157 ··· 291 301 let mut db_batch = self.keyspace.batch(); 292 302 293 303 // update the current rw cursor to this item (atomically with the batch if it succeeds) 294 - let mod_cursor_value: ModCursorValue = (&mod_key).into(); 295 - insert_batch_static::<ModCursorKey>( 296 - &mut db_batch, 297 - &self.partition, 298 - mod_cursor_value.clone(), 299 - )?; 304 + let mod_cursor: Cursor = (&mod_key).into(); 305 + insert_batch_static::<ModCursorKey>(&mut db_batch, &self.partition, mod_cursor.clone())?; 300 306 301 - // remove the queued rw task so that we'll continue with the *next* one (atomically with batch) 302 - remove_batch::<ModQueueItemKey>(&mut db_batch, &self.partition, mod_key)?; 303 - 304 - match mod_value { 307 + let completed = match mod_value { 305 308 ModQueueItemValue::DeleteAccount(did) => { 306 - eprintln!("delete account: {did:?} (not yet implemented)"); 307 - return Ok(()); // don't let the batch commit until we implement this 309 + self.delete_account(&mut db_batch, mod_cursor, did)? 308 310 } 309 311 ModQueueItemValue::DeleteRecord(did, collection, rkey) => { 310 - eprintln!("delete record: {did:?} {collection:?} {rkey:?}"); 311 - self.delete_record(&mut db_batch, mod_cursor_value, did, collection, rkey)?; 312 + self.delete_record(&mut db_batch, mod_cursor, did, collection, rkey)?; 313 + true 312 314 } 313 315 ModQueueItemValue::UpdateRecord(did, collection, rkey, record) => { 314 - eprintln!("update record: {did:?} {collection:?} {rkey:?} {record:?} (not yet implemented)"); 315 - return Ok(()); // don't let the batch commit until we implement this 316 + self.update_record(&mut db_batch, mod_cursor, did, collection, rkey, record)?; 317 + true 316 318 } 319 + }; 320 + if completed { 321 + // remove the queued rw task so that we'll continue with the *next* one (atomically with batch) 322 + remove_batch::<ModQueueItemKey>(&mut db_batch, &self.partition, mod_key)?; 317 323 } 318 324 Ok(db_batch.commit()?) 319 325 } 320 326 327 + fn update_record( 328 + &self, 329 + db_batch: &mut FjallBatch, 330 + cursor: Cursor, 331 + did: Did, 332 + collection: Nsid, 333 + rkey: RecordKey, 334 + record: serde_json::Value, 335 + ) -> anyhow::Result<usize> { 336 + // 1. delete any existing versions older than us 337 + let n_deleted = self.delete_record( 338 + db_batch, 339 + cursor.clone(), 340 + did.clone(), 341 + collection.clone(), 342 + rkey.clone(), 343 + )?; 344 + 345 + // 2. insert the updated version, at our new cursor 346 + self.add_record(db_batch, cursor, did, collection, rkey, record)?; 347 + 348 + Ok(n_deleted) 349 + } 350 + 321 351 fn delete_record( 322 352 &self, 323 353 db_batch: &mut FjallBatch, ··· 325 355 did: Did, 326 356 collection: Nsid, 327 357 rkey: RecordKey, 328 - ) -> anyhow::Result<()> { 358 + ) -> anyhow::Result<usize> { 329 359 let key_prefix_bytes = 330 360 ByIdKey::record_prefix(did, collection.clone(), rkey).to_db_bytes()?; 331 361 ··· 333 363 for pair in self.partition.prefix(&key_prefix_bytes) { 334 364 // find all (hopefully 1) 335 365 let (key_bytes, _) = pair?; 336 - let found_cursor = db_complete::<ByIdKey>(&key_bytes)?.cursor(); 366 + let key = db_complete::<ByIdKey>(&key_bytes)?; 367 + let found_cursor = key.cursor(); 337 368 if found_cursor > cursor { 338 369 // we are *only* allowed to delete records that came before the record delete event 339 - eprintln!("delete_record: found and ignoring newer version(s)"); 370 + eprintln!("delete_record: found (and ignoring) newer version(s). key: {key:?}"); 340 371 break; 341 372 } 342 373 ··· 351 382 n_removed += 1; 352 383 } 353 384 354 - eprintln!("removed {n_removed} records."); 355 - Ok(()) 385 + if n_removed > 1 { 386 + eprintln!("odd, removed {n_removed} records for one record removal:"); 387 + for (i, pair) in self.partition.prefix(&key_prefix_bytes).enumerate() { 388 + // find all (hopefully 1) 389 + let (key_bytes, _) = pair?; 390 + let found_cursor = db_complete::<ByIdKey>(&key_bytes)?.cursor(); 391 + if found_cursor > cursor { 392 + break; 393 + } 394 + 395 + let key = db_complete::<ByIdKey>(&key_bytes)?; 396 + eprintln!(" {i}: key {key:?}"); 397 + } 398 + } 399 + Ok(n_removed) 400 + } 401 + 402 + fn delete_account( 403 + &self, 404 + db_batch: &mut FjallBatch, 405 + cursor: Cursor, 406 + did: Did, 407 + ) -> anyhow::Result<bool> { 408 + let key_prefix_bytes = ByIdKey::did_prefix(did).to_db_bytes()?; 409 + 410 + let mut n_found = 0; 411 + for pair in self.partition.prefix(&key_prefix_bytes) { 412 + let (key_bytes, _) = pair?; 413 + 414 + let (_, collection, _rkey, found_cursor) = db_complete::<ByIdKey>(&key_bytes)?.into(); 415 + if found_cursor > cursor { 416 + eprintln!( 417 + "delete account: found (and ignoring) newer records than the delete event??" 418 + ); 419 + continue; 420 + } 421 + 422 + // remove the by_id entry 423 + db_batch.remove(&self.partition, key_bytes); 424 + 425 + // remove its record sample 426 + let by_collection_key_bytes = 427 + ByCollectionKey::new(collection, found_cursor).to_db_bytes()?; 428 + db_batch.remove(&self.partition, by_collection_key_bytes); 429 + 430 + n_found += 1; 431 + if n_found >= MAX_BATCHED_DELETE_ACCOUNT_RECORDS { 432 + return Ok(false); // there might be more records but we've done enough for this batch 433 + } 434 + } 435 + 436 + // eprintln!("removed {n_found} account records."); 437 + Ok(true) 356 438 } 357 439 358 440 fn add_record_creates( ··· 388 470 record, 389 471 } in samples.into_iter().rev() 390 472 { 391 - // ["by_collection"|collection|js_cursor] => [did|rkey|record] 392 - db_batch.insert( 393 - &self.partition, 394 - ByCollectionKey::new(collection.clone(), cursor.clone()).to_db_bytes()?, 395 - ByCollectionValue::new(did.clone(), rkey.clone(), record).to_db_bytes()?, 396 - ); 397 - 398 - // ["by_id"|did|collection|rkey|js_cursor] => [] // required to support deletes; did first prefix for account deletes. 399 - db_batch.insert( 400 - &self.partition, 401 - ByIdKey::new(did, collection.clone(), rkey, cursor).to_db_bytes()?, 402 - ByIdValue::default().to_db_bytes()?, 403 - ); 473 + self.add_record(db_batch, cursor, did, collection.clone(), rkey, record)?; 404 474 } 405 475 } 476 + Ok(()) 477 + } 478 + 479 + fn add_record( 480 + &self, 481 + db_batch: &mut FjallBatch, 482 + cursor: Cursor, 483 + did: Did, 484 + collection: Nsid, 485 + rkey: RecordKey, 486 + record: serde_json::Value, 487 + ) -> anyhow::Result<()> { 488 + // ["by_collection"|collection|js_cursor] => [did|rkey|record] 489 + db_batch.insert( 490 + &self.partition, 491 + ByCollectionKey::new(collection.clone(), cursor.clone()).to_db_bytes()?, 492 + ByCollectionValue::new(did.clone(), rkey.clone(), record).to_db_bytes()?, 493 + ); 494 + 495 + // ["by_id"|did|collection|rkey|js_cursor] => [] // required to support deletes; did first prefix for account deletes. 496 + db_batch.insert( 497 + &self.partition, 498 + ByIdKey::new(did, collection.clone(), rkey, cursor).to_db_bytes()?, 499 + ByIdValue::default().to_db_bytes()?, 500 + ); 501 + 406 502 Ok(()) 407 503 } 408 504