Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

write deletes and updates to the record modification queue

a worker will read this queue to actually apply the updates and deletes

+234 -64
+1 -1
ufos/readme.md
··· 91 91 running 92 92 93 93 ```bash 94 - RUST_LOG=info ./ufos --jetstream ws://192.168.1.139:8080/subscribe --force --data /mnt/ufos-data-blah/ 94 + RUST_LOG=info ./ufos --jetstream ws://192.168.1.139:8080/subscribe --jetstream-force --jetstream-no-zstd --data /mnt/ufos-data-blah/ 95 95 ``` 96 96 97 97 try without info-level logs for better perf
+139 -63
ufos/src/store.rs
··· 1 1 use crate::db_types::{db_complete, DbBytes}; 2 2 use crate::store_types::{ 3 3 ByCollectionKey, ByCollectionValue, ByCursorSeenKey, ByCursorSeenValue, ByIdKey, ByIdValue, 4 + ModQueueItemKey, ModQueueItemValue, 4 5 }; 5 - use crate::{CollectionSamples, CreateRecord, EventBatch, Nsid}; 6 - use fjall::{Config, Keyspace, PartitionCreateOptions, PartitionHandle, Slice}; 6 + use crate::{CollectionSamples, CreateRecord, DeleteAccount, EventBatch, ModifyRecord, Nsid}; 7 + use fjall::{ 8 + Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle, Slice, 9 + }; 7 10 use jetstream::events::Cursor; 11 + use std::collections::HashMap; 8 12 use std::path::Path; 9 13 use std::time::{Duration, Instant}; 10 14 use tokio::{sync::mpsc::Receiver, time::sleep}; ··· 15 19 * Global Meta: 16 20 * ["js_cursor"] => js_cursor(u64), // used as global sequence 17 21 * ["js_endpoint"] => &str, // checked on startup because jetstream instance cursors are not interchangeable 18 - * ["mod_cursor"] => [u64]; 22 + * ["mod_cursor"] => js_cursor(u64); 19 23 * ["rollup_cursor"] => [js_cursor|collection]; // how far the rollup helper has progressed 20 24 * Mod queue 21 - * ["mod_queue"|mod_cursor] => one of { 22 - * DeleteAccount(did, js_cursor) // delete all account content older than cursor 23 - * DeleteRecord(did, collection, rkey, js_cursor) // delete record older than cursor 24 - * UpdateRecord(did, collection, rkey, new_record, js_cursor) // delete + put, but don't delete if cursor is newer 25 + * ["mod_queue"|js_cursor] => one of { 26 + * DeleteAccount(did) // delete all account content older than cursor 27 + * DeleteRecord(did, collection, rkey) // delete record older than cursor 28 + * UpdateRecord(did, collection, rkey, new_record) // delete + put, but don't delete if cursor is newer 25 29 * } 26 30 * Collection and rollup meta: 27 31 * ["seen_by_js_cursor_collection"|js_cursor|collection] => u64 // batched total, gets cleaned up by rollup ··· 88 92 if let 
Some(event_batch) = receiver.recv().await { 89 93 let batch_summary = summarize_batch(&event_batch); 90 94 91 - let t0 = Instant::now(); 92 - 93 95 let last = event_batch.last_jetstream_cursor.clone(); // TODO: get this from the data. track last in consumer. compute or track first. 94 96 95 97 let keyspace = self.keyspace.clone(); 96 98 let partition = self.partition.clone(); 97 99 100 + let writer_t0 = Instant::now(); 98 101 tokio::task::spawn_blocking(move || { 99 - let mut db_batch = keyspace.batch(); 100 - for ( 101 - collection, 102 - CollectionSamples { 103 - total_seen, 104 - samples, 105 - }, 106 - ) in event_batch.record_creates.into_iter() 107 - { 108 - if let Some(last_record) = &samples.back() { 109 - db_batch.insert( 110 - &partition, 111 - ByCursorSeenKey::new(last_record.cursor.clone(), collection.clone()) 112 - .to_db_bytes().unwrap(), 113 - ByCursorSeenValue::new(total_seen as u64).to_db_bytes().unwrap(), 114 - ); 115 - } else { 116 - log::error!("collection samples should only exist when at least one sample has been added"); 117 - } 118 - 119 - for CreateRecord { 120 - did, 121 - rkey, 122 - cursor, 123 - record, 124 - } in samples 125 - { 126 - // ["by_collection"|collection|js_cursor] => [did|rkey|record] 127 - db_batch.insert( 128 - &partition, 129 - ByCollectionKey::new(collection.clone(), cursor.clone()) 130 - .to_db_bytes().unwrap(), 131 - ByCollectionValue::new(did.clone(), rkey.clone(), record) 132 - .to_db_bytes().unwrap(), 133 - ); 134 - 135 - // ["by_id"|did|collection|rkey|js_cursor] => [] // required to support deletes; did first prefix for account deletes. 
136 - db_batch.insert( 137 - &partition, 138 - ByIdKey::new(did, collection.clone(), rkey, cursor).to_db_bytes().unwrap(), 139 - ByIdValue::default().to_db_bytes().unwrap(), 140 - ); 141 - } 102 + BatchWriter { 103 + keyspace, 104 + partition, 142 105 } 143 - if let Some(cursor) = last { 144 - db_batch.insert(&partition, "js_cursor", cursor_to_slice(cursor)); 145 - } 146 - db_batch.commit().unwrap(); 106 + .write(event_batch, last) 107 + }) 108 + .await??; 109 + let wrote_for = writer_t0.elapsed(); 147 110 148 - let batched_for = t0.elapsed(); 149 - 150 - 151 - println!("{batch_summary}, slept for {slept_for: <12?}, wrote for {batched_for: <11?}, queue size: {queue_size}"); 152 - 153 - }).await?; 111 + println!("{batch_summary}, slept {slept_for: <12?}, wrote {wrote_for: <11?}, queue: {queue_size}"); 154 112 } else { 155 113 anyhow::bail!("receive channel closed"); 156 114 } ··· 183 141 } 184 142 } 185 143 144 + struct BatchWriter { 145 + keyspace: Keyspace, 146 + partition: PartitionHandle, 147 + } 148 + 149 + impl BatchWriter { 150 + fn write(self, event_batch: EventBatch, last: Option<Cursor>) -> anyhow::Result<()> { 151 + let mut db_batch = self.keyspace.batch(); 152 + 153 + let EventBatch { 154 + record_creates, 155 + record_modifies, 156 + account_removes, 157 + .. 
158 + } = event_batch; 159 + self.add_record_creates(&mut db_batch, record_creates)?; 160 + self.add_record_modifies(&mut db_batch, record_modifies)?; 161 + self.add_account_removes(&mut db_batch, account_removes)?; 162 + 163 + if let Some(cursor) = last { 164 + db_batch.insert(&self.partition, "js_cursor", cursor_to_slice(cursor)); 165 + } 166 + db_batch.commit()?; 167 + Ok(()) 168 + } 169 + 170 + fn add_record_creates( 171 + &self, 172 + db_batch: &mut FjallBatch, 173 + record_creates: HashMap<Nsid, CollectionSamples>, 174 + ) -> anyhow::Result<()> { 175 + for ( 176 + collection, 177 + CollectionSamples { 178 + total_seen, 179 + samples, 180 + }, 181 + ) in record_creates.into_iter() 182 + { 183 + if let Some(last_record) = &samples.back() { 184 + db_batch.insert( 185 + &self.partition, 186 + ByCursorSeenKey::new(last_record.cursor.clone(), collection.clone()) 187 + .to_db_bytes()?, 188 + ByCursorSeenValue::new(total_seen as u64).to_db_bytes()?, 189 + ); 190 + } else { 191 + log::error!( 192 + "collection samples should only exist when at least one sample has been added" 193 + ); 194 + } 195 + 196 + for CreateRecord { 197 + did, 198 + rkey, 199 + cursor, 200 + record, 201 + } in samples.into_iter().rev() 202 + { 203 + // ["by_collection"|collection|js_cursor] => [did|rkey|record] 204 + db_batch.insert( 205 + &self.partition, 206 + ByCollectionKey::new(collection.clone(), cursor.clone()).to_db_bytes()?, 207 + ByCollectionValue::new(did.clone(), rkey.clone(), record).to_db_bytes()?, 208 + ); 209 + 210 + // ["by_id"|did|collection|rkey|js_cursor] => [] // required to support deletes; did first prefix for account deletes. 
211 + db_batch.insert( 212 + &self.partition, 213 + ByIdKey::new(did, collection.clone(), rkey, cursor).to_db_bytes()?, 214 + ByIdValue::default().to_db_bytes()?, 215 + ); 216 + } 217 + } 218 + Ok(()) 219 + } 220 + 221 + fn add_record_modifies( 222 + &self, 223 + db_batch: &mut FjallBatch, 224 + record_modifies: Vec<ModifyRecord>, 225 + ) -> anyhow::Result<()> { 226 + for modification in record_modifies { 227 + let (cursor, db_val) = match modification { 228 + ModifyRecord::Update(u) => ( 229 + u.cursor, 230 + ModQueueItemValue::UpdateRecord(u.did, u.collection, u.rkey, u.record), 231 + ), 232 + ModifyRecord::Delete(d) => ( 233 + d.cursor, 234 + ModQueueItemValue::DeleteRecord(d.did, d.collection, d.rkey), 235 + ), 236 + }; 237 + db_batch.insert( 238 + &self.partition, 239 + ModQueueItemKey::new(cursor).to_db_bytes()?, 240 + db_val.to_db_bytes()?, 241 + ); 242 + } 243 + Ok(()) 244 + } 245 + 246 + fn add_account_removes( 247 + &self, 248 + db_batch: &mut FjallBatch, 249 + account_removes: Vec<DeleteAccount>, 250 + ) -> anyhow::Result<()> { 251 + for deletion in account_removes { 252 + db_batch.insert( 253 + &self.partition, 254 + ModQueueItemKey::new(deletion.cursor).to_db_bytes()?, 255 + ModQueueItemValue::DeleteAccount(deletion.did).to_db_bytes()?, 256 + ); 257 + } 258 + Ok(()) 259 + } 260 + } 261 + 186 262 fn summarize_batch(batch: &EventBatch) -> String { 187 263 let EventBatch { 188 264 record_creates, ··· 194 270 let total_records: usize = record_creates.values().map(|v| v.total_seen).sum(); 195 271 let total_samples: usize = record_creates.values().map(|v| v.samples.len()).sum(); 196 272 format!( 197 - "got batch of {total_samples: >3} samples from {total_records: >4} records in {: >2} collections, {: >3} record modifies, {} account removes, cursor {: <14?}", 273 + "batch of {total_samples: >3} samples from {total_records: >4} records in {: >2} collections, {: >3} modifies, {} acct removes, cursor {: <12?}", 198 274 record_creates.len(), 199 275 
record_modifies.len(), 200 276 account_removes.len(),
+94
ufos/src/store_types.rs
··· 136 136 137 137 pub type ByCursorSeenValue = SeenCounter; 138 138 139 + #[derive(Debug, PartialEq)] 140 + pub struct _ModQueueItemStaticStr {} 141 + impl StaticStr for _ModQueueItemStaticStr { 142 + fn static_str() -> &'static str { 143 + "mod_queue" 144 + } 145 + } 146 + type ModQueueItemPrefix = DbStaticStr<_ModQueueItemStaticStr>; 147 + /// key format: ["mod_queue"|js_cursor] 148 + pub type ModQueueItemKey = DbConcat<ModQueueItemPrefix, Cursor>; 149 + impl ModQueueItemKey { 150 + pub fn new(cursor: Cursor) -> Self { 151 + Self::from_pair(Default::default(), cursor) 152 + } 153 + } 154 + impl From<ModQueueItemKey> for Cursor { 155 + fn from(k: ModQueueItemKey) -> Self { 156 + k.suffix 157 + } 158 + } 159 + 160 + #[derive(Debug, Encode, Decode)] 161 + pub enum ModQueueItemStringValue { 162 + DeleteAccount(String), // did 163 + DeleteRecord(String, String, String), // did, collection, rkey 164 + UpdateRecord(String, String, String, String), // did, collection, rkey, json record 165 + } 166 + impl UseBincodePlz for ModQueueItemStringValue {} 167 + #[derive(Debug, Clone, PartialEq)] 168 + pub enum ModQueueItemValue { 169 + DeleteAccount(Did), 170 + DeleteRecord(Did, Nsid, RecordKey), 171 + UpdateRecord(Did, Nsid, RecordKey, serde_json::Value), 172 + } 173 + impl From<ModQueueItemValue> for ModQueueItemStringValue { 174 + fn from(v: ModQueueItemValue) -> Self { 175 + match v { 176 + ModQueueItemValue::DeleteAccount(did) => { 177 + ModQueueItemStringValue::DeleteAccount(did.to_string()) 178 + } 179 + ModQueueItemValue::DeleteRecord(did, collection, rkey) => { 180 + ModQueueItemStringValue::DeleteRecord( 181 + did.to_string(), 182 + collection.to_string(), 183 + rkey.to_string(), 184 + ) 185 + } 186 + ModQueueItemValue::UpdateRecord(did, collection, rkey, record) => { 187 + ModQueueItemStringValue::UpdateRecord( 188 + did.to_string(), 189 + collection.to_string(), 190 + rkey.to_string(), 191 + record.to_string(), 192 + ) 193 + } 194 + } 195 + } 196 + } 197 + impl 
TryFrom<ModQueueItemStringValue> for ModQueueItemValue { 198 + type Error = EncodingError; 199 + fn try_from(v: ModQueueItemStringValue) -> Result<Self, Self::Error> { 200 + match v { 201 + ModQueueItemStringValue::DeleteAccount(did) => Ok(ModQueueItemValue::DeleteAccount( 202 + Did::new(did).map_err(EncodingError::BadAtriumStringType)?, 203 + )), 204 + ModQueueItemStringValue::DeleteRecord(did, collection, rkey) => { 205 + Ok(ModQueueItemValue::DeleteRecord( 206 + Did::new(did).map_err(EncodingError::BadAtriumStringType)?, 207 + Nsid::new(collection).map_err(EncodingError::BadAtriumStringType)?, 208 + RecordKey::new(rkey).map_err(EncodingError::BadAtriumStringType)?, 209 + )) 210 + } 211 + ModQueueItemStringValue::UpdateRecord(did, collection, rkey, record) => { 212 + Ok(ModQueueItemValue::UpdateRecord( 213 + Did::new(did).map_err(EncodingError::BadAtriumStringType)?, 214 + Nsid::new(collection).map_err(EncodingError::BadAtriumStringType)?, 215 + RecordKey::new(rkey).map_err(EncodingError::BadAtriumStringType)?, 216 + record.parse()?, 217 + )) 218 + } 219 + } 220 + } 221 + } 222 + impl DbBytes for ModQueueItemValue { 223 + fn to_db_bytes(&self) -> Result<std::vec::Vec<u8>, EncodingError> { 224 + Into::<ModQueueItemStringValue>::into(self.clone()).to_db_bytes() 225 + } 226 + fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 227 + let (stringy, n) = ModQueueItemStringValue::from_db_bytes(bytes)?; 228 + let me = TryInto::<ModQueueItemValue>::try_into(stringy)?; 229 + Ok((me, n)) 230 + } 231 + } 232 + 139 233 #[cfg(test)] 140 234 mod test { 141 235 use super::{ByCollectionKey, ByCollectionValue, Cursor, Did, EncodingError, Nsid, RecordKey};