Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

consumer inserts for new ufos data model

+246 -34
+10
ufos/src/db_types.rs
··· 184 184 } 185 185 } 186 186 187 + /// helper trait: impl on a type to get helpers to implement DbBytes 188 + pub trait SerdeBytes: serde::Serialize + for<'a> serde::Deserialize<'a> { 189 + fn to_bytes(&self) -> Result<Vec<u8>, EncodingError> { 190 + Ok(bincode::serde::encode_to_vec(self, bincode_conf())?) 191 + } 192 + fn from_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 193 + Ok(bincode::serde::decode_from_slice(bytes, bincode_conf())?) 194 + } 195 + } 196 + 187 197 ////// 188 198 189 199 /// Lexicographic-sort-friendly null-terminating serialization for String
+6 -3
ufos/src/lib.rs
··· 36 36 37 37 #[derive(Debug, Clone)] 38 38 pub enum CommitAction { 39 - Put { record: Box<RawValue>, is_update: bool }, 39 + Put(PutAction), 40 40 Cut, 41 41 } 42 + 43 + #[derive(Debug, Clone)] 44 + pub struct PutAction { record: Box<RawValue>, is_update: bool } 42 45 43 46 #[derive(Debug, Clone)] 44 47 pub struct UFOsCommit { ··· 57 60 ) -> Result<(Self, Nsid), FirehoseEventError> { 58 61 let action = match commit.operation { 59 62 CommitOp::Delete => CommitAction::Cut, 60 - cru @ _ => CommitAction::Put { 63 + cru @ _ => CommitAction::Put(PutAction { 61 64 record: commit.record.ok_or(FirehoseEventError::CruMissingRecord)?, 62 65 is_update: cru == CommitOp::Update, 63 - } 66 + }) 64 67 }; 65 68 let batched = Self { 66 69 cursor,
+27 -16
ufos/src/main.rs
··· 1 + use ufos::error::StorageError; 2 + use ufos::storage_fjall::StoreWriter; 3 + use ufos::storage_fjall::StorageWhatever; 1 4 use clap::Parser; 2 5 use std::path::PathBuf; 3 - use ufos::{consumer, server, storage_fjall}; 6 + use ufos::{consumer, storage_fjall}; 4 7 5 8 #[cfg(not(target_env = "msvc"))] 6 9 use tikv_jemallocator::Jemalloc; ··· 42 45 env_logger::init(); 43 46 44 47 let args = Args::parse(); 45 - let (storage, cursor) = 46 - storage_fjall::Storage::open(args.data, &args.jetstream, args.jetstream_force).await?; 48 + let jetstream = args.jetstream.clone(); 49 + let (_read_store, mut write_store, cursor) = 50 + storage_fjall::FjallStorage::init(args.data, jetstream, args.jetstream_force)?; 47 51 48 - println!("starting server with storage..."); 49 - let serving = server::serve(storage.clone()); 52 + // println!("starting server with storage..."); 53 + // let serving = server::serve(storage.clone()); 50 54 51 - let t1 = tokio::task::spawn(async { 52 - let r = serving.await; 53 - log::warn!("serving ended with: {r:?}"); 54 - }); 55 + // let t1 = tokio::task::spawn(async { 56 + // let r = serving.await; 57 + // log::warn!("serving ended with: {r:?}"); 58 + // }); 55 59 56 60 let t2: tokio::task::JoinHandle<anyhow::Result<()>> = tokio::task::spawn({ 57 - let _storage = storage.clone(); 58 61 async move { 59 62 if !args.pause_writer { 60 63 println!( 61 64 "starting consumer with cursor: {cursor:?} from {:?} ago", 62 65 cursor.clone().map(|c| c.elapsed()) 63 66 ); 64 - let batches = 67 + let mut batches = 65 68 consumer::consume(&args.jetstream, cursor, args.jetstream_no_zstd).await?; 66 - let r = storage.receive(batches).await; 67 - log::warn!("storage.receive ended with: {r:?}"); 69 + 70 + tokio::task::spawn_blocking(move || { 71 + while let Some(event_batch) = batches.blocking_recv() { 72 + write_store.insert_batch(event_batch)? 73 + } 74 + Ok::<(), StorageError>(()) 75 + }).await??; 76 + 77 + // let r = storage.receive(batches).await; 78 + log::warn!("storage.receive ended with"); 68 79 } else { 69 80 log::info!("not starting jetstream or the write loop."); 70 81 } ··· 87 98 // v = storage.rw_loop() => eprintln!("storage rw-loop ended: {v:?}"), 88 99 // }; 89 100 90 - log::trace!("tasks running. waiting."); 91 - t1.await?; 92 - log::trace!("serve task ended."); 101 + // log::trace!("tasks running. waiting."); 102 + // t1.await?; 103 + // log::trace!("serve task ended."); 93 104 t2.await??; 94 105 log::trace!("storage receive task ended."); 95 106 // t3.await?;
+87 -12
ufos/src/storage_fjall.rs
··· 4 4 JetstreamCursorKey, JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue, 5 5 ModCursorKey, ModCursorValue, ModQueueItemKey, ModQueueItemStringValue, ModQueueItemValue, 6 6 RollupCursorKey, RollupCursorValue, SeenCounter, 7 + NsidRecordFeedKey, NsidRecordFeedVal, RecordLocationKey, RecordLocationVal, 8 + LiveRecordsKey, LiveRecordsValue, LiveDidsKey, LiveDidsValue, 9 + DeleteAccountQueueKey, DeleteAccountQueueVal, 7 10 }; 8 11 use crate::{ 9 - DeleteAccount, Did, EventBatch, Nsid, RecordKey, 12 + DeleteAccount, Did, EventBatch, Nsid, RecordKey, CommitAction, 10 13 }; 11 14 use crate::error::StorageError; 12 15 use fjall::{ ··· 69 72 * key: nullstr || nullstr || nullstr (did, collection, rkey) 70 73 * val: u64 || bool || nullstr || rawval (js_cursor, is_update, rev, actual record) 71 74 * 72 - * 73 75 * Partition: 'rollups' 74 76 * 75 77 * - Live (batched) records per collection ··· 96 98 * key: "ever_dids" || u64 || nullstr (estimated cardinality, nsid. like ever_records) 97 99 * val: HLL (estimator) 98 100 * 101 + * 102 + * Partition: 'queues' 103 + * 104 + * - Delete account queue 105 + * key: "delete_acount" || u64 (js_cursor) 106 + * val: nullstr (did) 107 + * 108 + * 99 109 * TODO: moderation actions 100 110 * TODO: account privacy preferences. Might wait for the protocol-level (PDS-level?) stuff to land. Will probably do lazy fetching + caching on read. 101 111 **/ 102 112 pub trait StorageWhatever { // TODO: extract this 103 113 fn init( 104 114 path: impl AsRef<Path>, 105 - endpoint: &str, 115 + endpoint: String, 106 116 force_endpoint: bool, 107 - ) -> Result<(impl StoreReader, impl StoreWriter, bool), StorageError> where Self: Sized; 117 + ) -> Result<(impl StoreReader, impl StoreWriter, Option<Cursor>), StorageError> where Self: Sized; 108 118 } 109 119 110 120 pub trait StoreWriter { 111 - fn insert_batch(batch: EventBatch) -> Result<(), StorageError>; 121 + fn insert_batch(&mut self, event_batch: EventBatch) -> Result<(), StorageError>; 112 122 } 113 123 114 124 pub trait StoreReader: Clone {} ··· 117 127 impl StorageWhatever for FjallStorage { 118 128 fn init( 119 129 path: impl AsRef<Path>, 120 - endpoint: &str, 130 + endpoint: String, 121 131 force_endpoint: bool, 122 - ) -> Result<(impl StoreReader, impl StoreWriter, bool), StorageError> { 123 - let mut fresh = true; 132 + ) -> Result<(impl StoreReader, impl StoreWriter, Option<Cursor>), StorageError> { 124 133 let keyspace = Config::new(path).fsync_ms(Some(4_000)).open()?; 125 134 126 135 let global = keyspace.open_partition("global", PartitionCreateOptions::default())?; 127 136 let feeds = keyspace.open_partition("feeds", PartitionCreateOptions::default())?; 128 137 let records = keyspace.open_partition("records", PartitionCreateOptions::default())?; 129 138 let rollups = keyspace.open_partition("rollups", PartitionCreateOptions::default())?; 139 + let queues = keyspace.open_partition("queues", PartitionCreateOptions::default())?; 130 140 131 141 let js_cursor = get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?; 132 142 133 143 if js_cursor.is_some() { 134 - fresh = false; 135 144 let stored_endpoint = get_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?; 136 145 137 146 let JetstreamEndpointValue(stored) = stored_endpoint ··· 162 171 records: records.clone(), 163 172 rollups: rollups.clone(), 164 173 }; 165 - let writer = FjallWriter { keyspace, global, feeds, records, rollups }; 166 - Ok((reader, writer, fresh)) 174 + let writer = FjallWriter { keyspace, global, feeds, records, rollups, queues }; 175 + Ok((reader, writer, js_cursor)) 167 176 } 168 177 } 169 178 ··· 183 192 feeds: PartitionHandle, 184 193 records: PartitionHandle, 185 194 rollups: PartitionHandle, 195 + queues: PartitionHandle, 186 196 } 187 197 188 198 impl StoreWriter for FjallWriter { 189 - fn insert_batch(_batch: EventBatch) -> Result<(), StorageError> { 199 + fn insert_batch(&mut self, event_batch: EventBatch) -> Result<(), StorageError> { 200 + let mut batch = self.keyspace.batch(); 201 + 202 + // would be nice not to have to iterate everything at once here 203 + let latest = event_batch.latest_cursor().unwrap(); 204 + 205 + for (nsid, commits) in event_batch.commits_by_nsid { 206 + for commit in commits.commits { 207 + let location_key: RecordLocationKey = (&commit, &nsid).into(); 208 + 209 + match commit.action { 210 + CommitAction::Cut => { 211 + batch.remove(&self.records, &location_key.to_db_bytes()?); 212 + } 213 + CommitAction::Put(put_action) => { 214 + let feed_key = NsidRecordFeedKey::from_pair(nsid.clone(), commit.cursor); 215 + let feed_val: NsidRecordFeedVal = (&commit.did, &commit.rkey, commit.rev.as_str()).into(); 216 + batch.insert( 217 + &self.feeds, 218 + feed_key.to_db_bytes()?, 219 + feed_val.to_db_bytes()?, 220 + ); 221 + 222 + let location_val: RecordLocationVal = (commit.cursor, commit.rev.as_str(), put_action).into(); 223 + batch.insert( 224 + &self.records, 225 + &location_key.to_db_bytes()?, 226 + &location_val.to_db_bytes()?, 227 + ); 228 + } 229 + } 230 + } 231 + let live_records_key: LiveRecordsKey = (latest, &nsid).into(); 232 + let live_records_value = LiveRecordsValue(commits.total_seen as u64); 233 + batch.insert( 234 + &self.rollups, 235 + &live_records_key.to_db_bytes()?, 236 + &live_records_value.to_db_bytes()?, 237 + ); 238 + 239 + let live_dids_key: LiveDidsKey = (latest, &nsid).into(); 240 + let live_dids_value = LiveDidsValue(commits.dids_estimate); 241 + batch.insert( 242 + &self.rollups, 243 + &live_dids_key.to_db_bytes()?, 244 + &live_dids_value.to_db_bytes()?, 245 + ); 246 + } 247 + 248 + for remove in event_batch.account_removes { 249 + let queue_key = DeleteAccountQueueKey::new(remove.cursor); 250 + let queue_val: DeleteAccountQueueVal = remove.did; 251 + batch.insert( 252 + &self.queues, 253 + &queue_key.to_db_bytes()?, 254 + &queue_val.to_db_bytes()?, 255 + ); 256 + } 257 + 258 + batch.insert( 259 + &self.global, 260 + DbStaticStr::<JetstreamCursorKey>::default().to_db_bytes()?, 261 + latest.to_db_bytes()?, 262 + ); 263 + 264 + batch.commit()?; 190 265 Ok(()) 191 266 } 192 267 }
+116 -3
ufos/src/store_types.rs
··· 1 + use cardinality_estimator::CardinalityEstimator; 1 2 use crate::db_types::{ 2 - DbBytes, DbConcat, DbEmpty, DbStaticStr, EncodingError, StaticStr, UseBincodePlz, 3 + DbBytes, DbConcat, DbEmpty, DbStaticStr, EncodingError, StaticStr, UseBincodePlz, SerdeBytes, 3 4 }; 4 - use crate::{Cursor, Did, Nsid, RecordKey}; 5 + use crate::{Cursor, Did, Nsid, RecordKey, UFOsCommit, PutAction}; 5 6 use bincode::{Decode, Encode}; 6 7 use std::ops::Range; 7 8 ··· 59 60 Ok((Self(s), bytes.len())) 60 61 } 61 62 } 63 + 64 + pub type NsidRecordFeedKey = DbConcat<Nsid, Cursor>; 65 + pub type NsidRecordFeedVal = DbConcat<Did, DbConcat<RecordKey, String>>; 66 + impl From<(&Did, &RecordKey, &str)> for NsidRecordFeedVal { 67 + fn from((did, rkey, rev): (&Did, &RecordKey, &str)) -> Self { 68 + Self::from_pair( 69 + did.clone(), 70 + DbConcat::from_pair(rkey.clone(), rev.to_string())) 71 + } 72 + } 73 + 74 + pub type RecordLocationKey = DbConcat<Did, DbConcat<Nsid, RecordKey>>; 75 + impl From<(&UFOsCommit, &Nsid)> for RecordLocationKey { 76 + fn from((commit, collection): (&UFOsCommit, &Nsid)) -> Self { 77 + Self::from_pair(commit.did.clone(), DbConcat::from_pair(collection.clone(), commit.rkey.clone())) 78 + } 79 + } 80 + #[derive(Debug, PartialEq, Encode, Decode)] 81 + pub struct RecordLocationMeta { 82 + pub cursor: u64, // ugh no bincode impl 83 + pub is_update: bool, 84 + pub rev: String, 85 + } 86 + impl UseBincodePlz for RecordLocationMeta {} 87 + 88 + impl DbBytes for Vec<u8> { 89 + fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> { 90 + Ok(self.to_vec()) 91 + } 92 + fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 93 + Ok((bytes.to_owned(), bytes.len())) 94 + } 95 + } 96 + 97 + pub type RecordLocationVal = DbConcat<RecordLocationMeta, Vec<u8>>; 98 + impl From<(Cursor, &str, PutAction)> for RecordLocationVal { 99 + fn from((cursor, rev, put): (Cursor, &str, PutAction)) -> Self { 100 + let meta = RecordLocationMeta { 101 + cursor: cursor.to_raw_u64(), 102 + is_update: put.is_update, 103 + rev: rev.to_string(), 104 + }; 105 + Self::from_pair(meta, put.record.get().into()) 106 + } 107 + } 108 + 109 + #[derive(Debug, PartialEq)] 110 + pub struct _LiveRecordsStaticStr {} 111 + impl StaticStr for _LiveRecordsStaticStr { 112 + fn static_str() -> &'static str { 113 + "live_records" 114 + } 115 + } 116 + type LiveRecordsStaticPrefix = DbStaticStr<_LiveRecordsStaticStr>; 117 + pub type LiveRecordsKey = DbConcat<LiveRecordsStaticPrefix, DbConcat<Cursor, Nsid>>; 118 + impl From<(Cursor, &Nsid)> for LiveRecordsKey { 119 + fn from((cursor, collection): (Cursor, &Nsid)) -> Self { 120 + Self::from_pair( 121 + Default::default(), 122 + DbConcat::from_pair(cursor, collection.clone()), 123 + ) 124 + } 125 + } 126 + #[derive(Debug, PartialEq, Decode, Encode)] 127 + pub struct LiveRecordsValue(pub u64); 128 + impl UseBincodePlz for LiveRecordsValue {} 129 + 130 + #[derive(Debug, PartialEq)] 131 + pub struct _LiveDidsStaticStr {} 132 + impl StaticStr for _LiveDidsStaticStr { 133 + fn static_str() -> &'static str { 134 + "live_dids" 135 + } 136 + } 137 + pub type LiveDidsStaticPrefix = DbStaticStr<_LiveDidsStaticStr>; 138 + pub type LiveDidsKey = DbConcat<LiveDidsStaticPrefix, DbConcat<Cursor, Nsid>>; 139 + impl From<(Cursor, &Nsid)> for LiveDidsKey { 140 + fn from((cursor, collection): (Cursor, &Nsid)) -> Self { 141 + Self::from_pair( 142 + Default::default(), 143 + DbConcat::from_pair(cursor, collection.clone()), 144 + ) 145 + } 146 + } 147 + #[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)] 148 + pub struct LiveDidsValue(pub CardinalityEstimator::<Did>); 149 + impl SerdeBytes for LiveDidsValue {} 150 + impl DbBytes for LiveDidsValue { 151 + fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> { 152 + SerdeBytes::to_bytes(self) 153 + } 154 + fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 155 + SerdeBytes::from_bytes(bytes) 156 + } 157 + } 158 + 159 + #[derive(Debug, PartialEq)] 160 + pub struct _DeleteAccountStaticStr {} 161 + impl StaticStr for _DeleteAccountStaticStr { 162 + fn static_str() -> &'static str { 163 + "delete_acount" 164 + } 165 + } 166 + pub type DeleteAccountStaticPrefix = DbStaticStr<_DeleteAccountStaticStr>; 167 + pub type DeleteAccountQueueKey = DbConcat<DeleteAccountStaticPrefix, Cursor>; 168 + impl DeleteAccountQueueKey { 169 + pub fn new(cursor: Cursor) -> Self { 170 + Self::from_pair(Default::default(), cursor) 171 + } 172 + } 173 + pub type DeleteAccountQueueVal = Did; 174 + 62 175 63 176 #[derive(Debug, Clone, Encode, Decode)] 64 177 pub struct SeenCounter(pub u64); ··· 303 416 } 304 417 } 305 418 impl DbBytes for ModQueueItemValue { 306 - fn to_db_bytes(&self) -> Result<std::vec::Vec<u8>, EncodingError> { 419 + fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> { 307 420 Into::<ModQueueItemStringValue>::into(self.clone()).to_db_bytes() 308 421 } 309 422 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> {