Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

pass in time bounds and remove memstore

idk why these are the same commit, except that i'm finally annoyed enough at stubbing the memstore to remove it. it wasn't ever that real since it was just a hacked copy of the fjall stuff.

+83 -1838
-1
ufos/src/lib.rs
··· 6 6 pub mod server; 7 7 pub mod storage; 8 8 pub mod storage_fjall; 9 - pub mod storage_mem; 10 9 pub mod store_types; 11 10 12 11 use crate::error::BatchInsertError;
+17 -57
ufos/src/main.rs
··· 7 7 use ufos::server; 8 8 use ufos::storage::{StorageWhatever, StoreBackground, StoreReader, StoreWriter}; 9 9 use ufos::storage_fjall::FjallStorage; 10 - use ufos::storage_mem::MemStorage; 11 10 use ufos::store_types::SketchSecretPrefix; 12 11 use ufos::{nice_duration, ConsumerInfo}; 13 12 ··· 19 18 static GLOBAL: Jemalloc = Jemalloc; 20 19 21 20 /// Aggregate links in the at-mosphere 22 - #[derive(Parser, Debug)] 21 + #[derive(Parser, Debug, Clone)] 23 22 #[command(version, about, long_about = None)] 24 23 struct Args { 25 24 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: ··· 47 46 /// todo: restore this 48 47 #[arg(long, action)] 49 48 pause_rw: bool, 50 - /// DEBUG: use an in-memory store instead of fjall 51 - #[arg(long, action)] 52 - in_mem: bool, 53 49 /// reset the rollup cursor, scrape through missed things in the past (backfill) 54 50 #[arg(long, action)] 55 51 reroll: bool, ··· 64 60 65 61 let args = Args::parse(); 66 62 let jetstream = args.jetstream.clone(); 67 - if args.in_mem { 68 - let (read_store, write_store, cursor, sketch_secret) = MemStorage::init( 69 - args.data, 70 - jetstream, 71 - args.jetstream_force, 72 - Default::default(), 73 - )?; 74 - go( 75 - args.jetstream, 76 - args.jetstream_fixture, 77 - args.pause_writer, 78 - args.backfill, 79 - args.reroll, 80 - read_store, 81 - write_store, 82 - cursor, 83 - sketch_secret, 84 - ) 85 - .await?; 86 - } else { 87 - let (read_store, write_store, cursor, sketch_secret) = FjallStorage::init( 88 - args.data, 89 - jetstream, 90 - args.jetstream_force, 91 - Default::default(), 92 - )?; 93 - go( 94 - args.jetstream, 95 - args.jetstream_fixture, 96 - args.pause_writer, 97 - args.backfill, 98 - args.reroll, 99 - read_store, 100 - write_store, 101 - cursor, 102 - sketch_secret, 103 - ) 104 - .await?; 105 - } 106 - 63 + let (read_store, write_store, cursor, sketch_secret) = FjallStorage::init( 64 + args.data.clone(), 65 + jetstream, 66 + args.jetstream_force, 67 + Default::default(), 68 + )?; 69 + go(args, read_store, write_store, cursor, sketch_secret).await?; 107 70 Ok(()) 108 71 } 109 72 110 - #[allow(clippy::too_many_arguments)] 111 73 async fn go<B: StoreBackground>( 112 - jetstream: String, 113 - jetstream_fixture: bool, 114 - pause_writer: bool, 115 - backfill: bool, 116 - reroll: bool, 74 + args: Args, 117 75 read_store: impl StoreReader + 'static + Clone, 118 76 mut write_store: impl StoreWriter<B> + 'static, 119 77 cursor: Option<Cursor>, ··· 122 80 println!("starting server with storage..."); 123 81 let serving = server::serve(read_store.clone()); 124 82 125 - if pause_writer { 83 + if args.pause_writer { 126 84 log::info!("not starting jetstream or the write loop."); 127 85 serving.await.map_err(|e| anyhow::anyhow!(e))?; 128 86 return Ok(()); 129 87 } 130 88 131 - let batches = if jetstream_fixture { 132 - log::info!("starting with jestream file fixture: {jetstream:?}"); 133 - file_consumer::consume(jetstream.into(), sketch_secret, cursor).await? 89 + let batches = if args.jetstream_fixture { 90 + log::info!("starting with jestream file fixture: {:?}", args.jetstream); 91 + file_consumer::consume(args.jetstream.into(), sketch_secret, cursor).await? 134 92 } else { 135 93 log::info!( 136 94 "starting consumer with cursor: {cursor:?} from {:?} ago", 137 95 cursor.map(|c| c.elapsed()) 138 96 ); 139 - consumer::consume(&jetstream, cursor, false, sketch_secret).await? 97 + consumer::consume(&args.jetstream, cursor, false, sketch_secret).await? 140 98 }; 141 99 142 - let rolling = write_store.background_tasks(reroll)?.run(backfill); 100 + let rolling = write_store 101 + .background_tasks(args.reroll)? 102 + .run(args.backfill); 143 103 let storing = write_store.receive_batches(batches); 144 104 145 105 let stating = do_update_stuff(read_store);
+8 -3
ufos/src/server/mod.rs
··· 232 232 let q = query.into_inner(); 233 233 let collections: HashSet<Nsid> = collections_query.try_into()?; 234 234 235 - let _since = q.since.map(dt_to_cursor).transpose()?; 236 - let _until = q.until.map(dt_to_cursor).transpose()?; 235 + let since = q.since.map(dt_to_cursor).transpose()?.unwrap_or_else(|| { 236 + let week_ago_secs = 7 * 86_400; 237 + let week_ago = SystemTime::now() - Duration::from_secs(week_ago_secs); 238 + Cursor::at(week_ago).into() 239 + }); 240 + 241 + let until = q.until.map(dt_to_cursor).transpose()?; 237 242 238 243 let mut seen_by_collection = HashMap::with_capacity(collections.len()); 239 244 240 245 for collection in &collections { 241 246 let (total_creates, dids_estimate) = storage 242 - .get_counts_by_collection(collection) 247 + .get_counts_by_collection(collection, since, until) 243 248 .await 244 249 .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?; 245 250
+6 -1
ufos/src/storage.rs
··· 92 92 step: u64, 93 93 ) -> StorageResult<(Vec<HourTruncatedCursor>, HashMap<Nsid, Vec<CountsValue>>)>; 94 94 95 - async fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)>; 95 + async fn get_counts_by_collection( 96 + &self, 97 + collection: &Nsid, 98 + since: HourTruncatedCursor, 99 + until: Option<HourTruncatedCursor>, 100 + ) -> StorageResult<(u64, u64)>; 96 101 97 102 async fn get_records_by_collections( 98 103 &self,
+52 -19
ufos/src/storage_fjall.rs
··· 715 715 Ok((output_hours, output_series)) 716 716 } 717 717 718 - fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> { 718 + fn get_counts_by_collection( 719 + &self, 720 + collection: &Nsid, 721 + _since: HourTruncatedCursor, 722 + _until: Option<HourTruncatedCursor>, 723 + ) -> StorageResult<(u64, u64)> { 719 724 // 0. grab a snapshot in case rollups happen while we're working 720 725 let instant = self.keyspace.instant(); 721 726 let global = self.global.snapshot_at(instant); ··· 840 845 }) 841 846 .await? 842 847 } 843 - async fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> { 848 + async fn get_counts_by_collection( 849 + &self, 850 + collection: &Nsid, 851 + since: HourTruncatedCursor, 852 + until: Option<HourTruncatedCursor>, 853 + ) -> StorageResult<(u64, u64)> { 844 854 let s = self.clone(); 845 855 let collection = collection.clone(); 846 - tokio::task::spawn_blocking(move || FjallReader::get_counts_by_collection(&s, &collection)) 847 - .await? 856 + tokio::task::spawn_blocking(move || { 857 + FjallReader::get_counts_by_collection(&s, &collection, since, until) 858 + }) 859 + .await? 848 860 } 849 861 async fn get_records_by_collections( 850 862 &self, ··· 1485 1497 } 1486 1498 1487 1499 const TEST_BATCH_LIMIT: usize = 16; 1500 + fn beginning() -> HourTruncatedCursor { 1501 + Cursor::from_start().into() 1502 + } 1488 1503 1489 1504 #[derive(Debug, Default)] 1490 1505 struct TestBatch { ··· 1622 1637 fn test_hello() -> anyhow::Result<()> { 1623 1638 let (read, mut write) = fjall_db(); 1624 1639 write.insert_batch::<TEST_BATCH_LIMIT>(EventBatch::default())?; 1625 - let (records, dids) = 1626 - read.get_counts_by_collection(&Nsid::new("a.b.c".to_string()).unwrap())?; 1640 + let (records, dids) = read.get_counts_by_collection( 1641 + &Nsid::new("a.b.c".to_string()).unwrap(), 1642 + beginning(), 1643 + None, 1644 + )?; 1627 1645 assert_eq!(records, 0); 1628 1646 assert_eq!(dids, 0); 1629 1647 Ok(()) ··· 1645 1663 ); 1646 1664 write.insert_batch(batch.batch)?; 1647 1665 1648 - let (records, dids) = read.get_counts_by_collection(&collection)?; 1666 + let (records, dids) = read.get_counts_by_collection(&collection, beginning(), None)?; 1649 1667 assert_eq!(records, 1); 1650 1668 assert_eq!(dids, 1); 1651 - let (records, dids) = 1652 - read.get_counts_by_collection(&Nsid::new("d.e.f".to_string()).unwrap())?; 1669 + let (records, dids) = read.get_counts_by_collection( 1670 + &Nsid::new("d.e.f".to_string()).unwrap(), 1671 + beginning(), 1672 + None, 1673 + )?; 1653 1674 assert_eq!(records, 0); 1654 1675 assert_eq!(dids, 0); 1655 1676 ··· 1816 1837 ); 1817 1838 write.insert_batch(batch.batch)?; 1818 1839 1819 - let (records, dids) = read.get_counts_by_collection(&collection)?; 1840 + let (records, dids) = read.get_counts_by_collection(&collection, beginning(), None)?; 1820 1841 assert_eq!(records, 1); 1821 1842 assert_eq!(dids, 1); 1822 1843 ··· 1854 1875 ); 1855 1876 write.insert_batch(batch.batch)?; 1856 1877 1857 - let (creates, dids) = read.get_counts_by_collection(&collection)?; 1878 + let (creates, dids) = read.get_counts_by_collection(&collection, beginning(), None)?; 1858 1879 assert_eq!(creates, 1); 1859 1880 assert_eq!(dids, 1); 1860 1881 ··· 2170 2191 write.insert_batch(batch.batch)?; 2171 2192 2172 2193 // before any rollup 2173 - let (records, dids) = 2174 - read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?; 2194 + let (records, dids) = read.get_counts_by_collection( 2195 + &Nsid::new("a.a.a".to_string()).unwrap(), 2196 + beginning(), 2197 + None, 2198 + )?; 2175 2199 assert_eq!(records, 3); 2176 2200 assert_eq!(dids, 2); 2177 2201 ··· 2179 2203 let (n, _) = write.step_rollup()?; 2180 2204 assert_eq!(n, 1); 2181 2205 2182 - let (records, dids) = 2183 - read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?; 2206 + let (records, dids) = read.get_counts_by_collection( 2207 + &Nsid::new("a.a.a".to_string()).unwrap(), 2208 + beginning(), 2209 + None, 2210 + )?; 2184 2211 assert_eq!(records, 3); 2185 2212 assert_eq!(dids, 2); 2186 2213 ··· 2188 2215 let (n, _) = write.step_rollup()?; 2189 2216 assert_eq!(n, 1); 2190 2217 2191 - let (records, dids) = 2192 - read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?; 2218 + let (records, dids) = read.get_counts_by_collection( 2219 + &Nsid::new("a.a.a".to_string()).unwrap(), 2220 + beginning(), 2221 + None, 2222 + )?; 2193 2223 assert_eq!(records, 3); 2194 2224 assert_eq!(dids, 2); 2195 2225 ··· 2197 2227 let (n, _) = write.step_rollup()?; 2198 2228 assert_eq!(n, 1); 2199 2229 2200 - let (records, dids) = 2201 - read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?; 2230 + let (records, dids) = read.get_counts_by_collection( 2231 + &Nsid::new("a.a.a".to_string()).unwrap(), 2232 + beginning(), 2233 + None, 2234 + )?; 2202 2235 assert_eq!(records, 3); 2203 2236 assert_eq!(dids, 2); 2204 2237
-1757
ufos/src/storage_mem.rs
··· 1 - use std::ops::Bound; 2 - use std::sync::Arc; 3 - 4 - use crate::db_types::{db_complete, DbBytes, DbStaticStr, StaticStr}; 5 - use crate::error::StorageError; 6 - use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter}; 7 - use crate::store_types::{ 8 - AllTimeRollupKey, CommitCounts, CountsValue, DeleteAccountQueueKey, DeleteAccountQueueVal, 9 - HourTruncatedCursor, HourlyRollupKey, JetstreamCursorKey, JetstreamCursorValue, 10 - JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey, NewRollupCursorKey, 11 - NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, RecordLocationKey, 12 - RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretPrefix, TakeoffKey, 13 - TakeoffValue, WeekTruncatedCursor, WeeklyRollupKey, WithCollection, 14 - }; 15 - use crate::{ 16 - CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount, OrderCollectionsBy, UFOsRecord, 17 - }; 18 - use async_trait::async_trait; 19 - use jetstream::events::Cursor; 20 - use lsm_tree::range::prefix_to_range; 21 - use std::collections::{BTreeMap, HashMap, HashSet}; 22 - use std::path::Path; 23 - use std::sync::{Mutex, RwLock}; 24 - use std::time::SystemTime; 25 - 26 - const MAX_BATCHED_CLEANUP_SIZE: usize = 1024; // try to commit progress for longer feeds 27 - const MAX_BATCHED_ACCOUNT_DELETE_RECORDS: usize = 1024; 28 - const MAX_BATCHED_ROLLUP_COUNTS: usize = 256; 29 - 30 - /// 31 - /// new data format, roughly: 32 - /// 33 - /// Partion: 'global' 34 - /// 35 - /// - Global sequence counter (is the jetstream cursor -- monotonic with many gaps) 36 - /// - key: "js_cursor" (literal) 37 - /// - val: u64 38 - /// 39 - /// - Jetstream server endpoint (persisted because the cursor can't be used on another instance without data loss) 40 - /// - key: "js_endpoint" (literal) 41 - /// - val: string (URL of the instance) 42 - /// 43 - /// - Launch date 44 - /// - key: "takeoff" (literal) 45 - /// - val: u64 (micros timestamp, not from jetstream for now so not precise) 46 - /// 47 - /// - Rollup cursor (bg work: roll stats into hourlies, delete accounts, old record deletes) 48 - /// - key: "rollup_cursor" (literal) 49 - /// - val: u64 (tracks behind js_cursor) 50 - /// 51 - /// 52 - /// Partition: 'feed' 53 - /// 54 - /// - Per-collection list of record references ordered by jetstream cursor 55 - /// - key: nullstr || u64 (collection nsid null-terminated, jetstream cursor) 56 - /// - val: nullstr || nullstr || nullstr (did, rkey, rev. rev is mostly a sanity-check for now.) 57 - /// 58 - /// 59 - /// Partition: 'records' 60 - /// 61 - /// - Actual records by their atproto location 62 - /// - key: nullstr || nullstr || nullstr (did, collection, rkey) 63 - /// - val: u64 || bool || nullstr || rawval (js_cursor, is_update, rev, actual record) 64 - /// 65 - /// 66 - /// Partition: 'rollups' 67 - /// 68 - /// - Live (batched) records counts and dids estimate per collection 69 - /// - key: "live_counts" || u64 || nullstr (js_cursor, nsid) 70 - /// - val: u64 || HLL (count (not cursor), estimator) 71 - /// 72 - /// - Hourly total record counts and dids estimate per collection 73 - /// - key: "hourly_counts" || u64 || nullstr (hour, nsid) 74 - /// - val: u64 || HLL (count (not cursor), estimator) 75 - /// 76 - /// - Weekly total record counts and dids estimate per collection 77 - /// - key: "weekly_counts" || u64 || nullstr (hour, nsid) 78 - /// - val: u64 || HLL (count (not cursor), estimator) 79 - /// 80 - /// - All-time total record counts and dids estimate per collection 81 - /// - key: "ever_counts" || nullstr (nsid) 82 - /// - val: u64 || HLL (count (not cursor), estimator) 83 - /// 84 - /// - TODO: sorted indexes for all-times? 85 - /// 86 - /// 87 - /// Partition: 'queues' 88 - /// 89 - /// - Delete account queue 90 - /// - key: "delete_acount" || u64 (js_cursor) 91 - /// - val: nullstr (did) 92 - /// 93 - /// 94 - /// TODO: moderation actions 95 - /// TODO: account privacy preferences. Might wait for the protocol-level (PDS-level?) stuff to land. Will probably do lazy fetching + caching on read. 96 - #[derive(Debug)] 97 - pub struct MemStorage {} 98 - 99 - #[derive(Debug, Default)] 100 - pub struct MemConfig { 101 - /// drop the db when the storage is dropped 102 - /// 103 - /// this is only meant for tests 104 - #[cfg(test)] 105 - pub temp: bool, 106 - } 107 - 108 - //////////// 109 - //////////// 110 - //////////// 111 - //////////// 112 - //////////// 113 - //////////// 114 - 115 - struct BatchSentinel {} 116 - 117 - #[derive(Clone)] 118 - struct MemKeyspace { 119 - keyspace_guard: Arc<RwLock<BatchSentinel>>, 120 - } 121 - 122 - impl MemKeyspace { 123 - pub fn open() -> Self { 124 - Self { 125 - keyspace_guard: Arc::new(RwLock::new(BatchSentinel {})), 126 - } 127 - } 128 - pub fn open_partition(&self, _name: &str) -> StorageResult<MemPartion> { 129 - Ok(MemPartion { 130 - // name: name.to_string(), 131 - keyspace_guard: self.keyspace_guard.clone(), 132 - contents: Default::default(), 133 - }) 134 - } 135 - pub fn batch(&self) -> MemBatch { 136 - MemBatch { 137 - keyspace_guard: self.keyspace_guard.clone(), 138 - tasks: Vec::new(), 139 - } 140 - } 141 - pub fn instant(&self) -> u64 { 142 - 1 143 - } 144 - } 145 - 146 - enum BatchTask { 147 - Insert { 148 - p: MemPartion, 149 - key: Vec<u8>, 150 - val: Vec<u8>, 151 - }, 152 - Remove { 153 - p: MemPartion, 154 - key: Vec<u8>, 155 - }, 156 - } 157 - struct MemBatch { 158 - keyspace_guard: Arc<RwLock<BatchSentinel>>, 159 - tasks: Vec<BatchTask>, 160 - } 161 - impl MemBatch { 162 - pub fn insert(&mut self, p: &MemPartion, key: &[u8], val: &[u8]) { 163 - self.tasks.push(BatchTask::Insert { 164 - p: p.clone(), 165 - key: key.to_vec(), 166 - val: val.to_vec(), 167 - }); 168 - } 169 - pub fn remove(&mut self, p: &MemPartion, key: &[u8]) { 170 - self.tasks.push(BatchTask::Remove { 171 - p: p.clone(), 172 - key: key.to_vec(), 173 - }); 174 - } 175 - pub fn len(&self) -> usize { 176 - self.tasks.len() 177 - } 178 - pub fn commit(&mut self) -> StorageResult<()> { 179 - let _guard = self.keyspace_guard.write().unwrap(); 180 - for task in &mut self.tasks { 181 - match task { 182 - BatchTask::Insert { p, key, val } => p 183 - .contents 184 - .try_lock() 185 - .unwrap() 186 - .insert(key.to_vec(), val.to_vec()), 187 - BatchTask::Remove { p, key } => p.contents.try_lock().unwrap().remove(key), 188 - }; 189 - } 190 - Ok(()) 191 - } 192 - } 193 - 194 - #[derive(Clone)] 195 - struct MemPartion { 196 - // name: String, 197 - keyspace_guard: Arc<RwLock<BatchSentinel>>, 198 - contents: Arc<Mutex<BTreeMap<Vec<u8>, Vec<u8>>>>, 199 - } 200 - impl MemPartion { 201 - pub fn get(&self, key: &[u8]) -> StorageResult<Option<Vec<u8>>> { 202 - let _guard = self.keyspace_guard.read().unwrap(); 203 - Ok(self.contents.lock().unwrap().get(key).cloned()) 204 - } 205 - pub fn prefix(&self, pre: &[u8]) -> Vec<StorageResult<(Vec<u8>, Vec<u8>)>> { 206 - // let prefix_bytes = prefix.to_db_bytes()?; 207 - let (_, Bound::Excluded(range_end)) = prefix_to_range(pre) else { 208 - panic!("bad range thing"); 209 - }; 210 - 211 - return self.range(pre.to_vec()..range_end.to_vec()); 212 - } 213 - pub fn range(&self, r: std::ops::Range<Vec<u8>>) -> Vec<StorageResult<(Vec<u8>, Vec<u8>)>> { 214 - let _guard = self.keyspace_guard.read().unwrap(); 215 - self.contents 216 - .lock() 217 - .unwrap() 218 - .range(r) 219 - .map(|(k, v)| Ok((k.clone(), v.clone()))) 220 - .collect() 221 - } 222 - pub fn insert(&self, key: &[u8], val: &[u8]) -> StorageResult<()> { 223 - let _guard = self.keyspace_guard.read().unwrap(); 224 - self.contents 225 - .lock() 226 - .unwrap() 227 - .insert(key.to_vec(), val.to_vec()); 228 - Ok(()) 229 - } 230 - // pub fn remove(&self, key: &[u8]) -> StorageResult<()> { 231 - // let _guard = self.keyspace_guard.read().unwrap(); 232 - // self.contents 233 - // .lock() 234 - // .unwrap() 235 - // .remove(key); 236 - // Ok(()) 237 - // } 238 - pub fn snapshot_at(&self, _instant: u64) -> Self { 239 - self.clone() 240 - } 241 - pub fn snapshot(&self) -> Self { 242 - self.clone() 243 - } 244 - } 245 - 246 - //////////// 247 - //////////// 248 - //////////// 249 - //////////// 250 - //////////// 251 - //////////// 252 - 253 - impl StorageWhatever<MemReader, MemWriter, MemBackground, MemConfig> for MemStorage { 254 - fn init( 255 - _path: impl AsRef<Path>, 256 - endpoint: String, 257 - force_endpoint: bool, 258 - _config: MemConfig, 259 - ) -> StorageResult<(MemReader, MemWriter, Option<Cursor>, SketchSecretPrefix)> { 260 - let keyspace = MemKeyspace::open(); 261 - 262 - let global = keyspace.open_partition("global")?; 263 - let feeds = keyspace.open_partition("feeds")?; 264 - let records = keyspace.open_partition("records")?; 265 - let rollups = keyspace.open_partition("rollups")?; 266 - let queues = keyspace.open_partition("queues")?; 267 - 268 - let js_cursor = get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?; 269 - 270 - if js_cursor.is_some() { 271 - let stored_endpoint = 272 - get_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?; 273 - 274 - let JetstreamEndpointValue(stored) = stored_endpoint.ok_or(StorageError::InitError( 275 - "found cursor but missing js_endpoint, refusing to start.".to_string(), 276 - ))?; 277 - 278 - if stored != endpoint { 279 - if force_endpoint { 280 - log::warn!("forcing a jetstream switch from {stored:?} to {endpoint:?}"); 281 - insert_static_neu::<JetstreamEndpointKey>( 282 - &global, 283 - JetstreamEndpointValue(endpoint.to_string()), 284 - )?; 285 - } else { 286 - return Err(StorageError::InitError(format!( 287 - "stored js_endpoint {stored:?} differs from provided {endpoint:?}, refusing to start."))); 288 - } 289 - } 290 - } else { 291 - insert_static_neu::<JetstreamEndpointKey>( 292 - &global, 293 - JetstreamEndpointValue(endpoint.to_string()), 294 - )?; 295 - insert_static_neu::<TakeoffKey>(&global, Cursor::at(SystemTime::now()))?; 296 - insert_static_neu::<NewRollupCursorKey>(&global, Cursor::from_start())?; 297 - } 298 - 299 - let reader = MemReader { 300 - keyspace: keyspace.clone(), 301 - global: global.clone(), 302 - feeds: feeds.clone(), 303 - records: records.clone(), 304 - rollups: rollups.clone(), 305 - }; 306 - let writer = MemWriter { 307 - keyspace, 308 - global, 309 - feeds, 310 - records, 311 - rollups, 312 - queues, 313 - }; 314 - let secret_prefix = [0u8; 16]; // in-mem store is always deterministic: no secret 315 - Ok((reader, writer, js_cursor, secret_prefix)) 316 - } 317 - } 318 - 319 - type MemRKV = StorageResult<(Vec<u8>, Vec<u8>)>; 320 - 321 - #[derive(Clone)] 322 - pub struct MemReader { 323 - keyspace: MemKeyspace, 324 - global: MemPartion, 325 - feeds: MemPartion, 326 - records: MemPartion, 327 - rollups: MemPartion, 328 - } 329 - 330 - /// An iterator that knows how to skip over deleted/invalidated records 331 - struct RecordIterator { 332 - db_iter: Box<dyn Iterator<Item = MemRKV>>, 333 - records: MemPartion, 334 - limit: usize, 335 - fetched: usize, 336 - } 337 - impl RecordIterator { 338 - pub fn new( 339 - feeds: &MemPartion, 340 - records: MemPartion, 341 - collection: &Nsid, 342 - limit: usize, 343 - ) -> StorageResult<Self> { 344 - let prefix = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?; 345 - let db_iter = feeds.prefix(&prefix).into_iter().rev(); 346 - Ok(Self { 347 - db_iter: Box::new(db_iter), 348 - records, 349 - limit, 350 - fetched: 0, 351 - }) 352 - } 353 - fn get_record(&self, db_next: MemRKV) -> StorageResult<Option<UFOsRecord>> { 354 - let (key_bytes, val_bytes) = db_next?; 355 - let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?; 356 - let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?; 357 - let location_key: RecordLocationKey = (&feed_key, &feed_val).into(); 358 - 359 - let Some(location_val_bytes) = self.records.get(&location_key.to_db_bytes()?)? else { 360 - // record was deleted (hopefully) 361 - return Ok(None); 362 - }; 363 - 364 - let (meta, n) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?; 365 - 366 - if meta.cursor() != feed_key.cursor() { 367 - // older/different version 368 - return Ok(None); 369 - } 370 - if meta.rev != feed_val.rev() { 371 - // weird... 372 - log::warn!("record lookup: cursor match but rev did not...? excluding."); 373 - return Ok(None); 374 - } 375 - let Some(raw_value_bytes) = location_val_bytes.get(n..) else { 376 - log::warn!( 377 - "record lookup: found record but could not get bytes to decode the record??" 378 - ); 379 - return Ok(None); 380 - }; 381 - let rawval = db_complete::<RecordRawValue>(raw_value_bytes)?; 382 - Ok(Some(UFOsRecord { 383 - collection: feed_key.collection().clone(), 384 - cursor: feed_key.cursor(), 385 - did: feed_val.did().clone(), 386 - rkey: feed_val.rkey().clone(), 387 - rev: meta.rev.to_string(), 388 - record: rawval.try_into()?, 389 - is_update: meta.is_update, 390 - })) 391 - } 392 - } 393 - impl Iterator for RecordIterator { 394 - type Item = StorageResult<Option<UFOsRecord>>; 395 - fn next(&mut self) -> Option<Self::Item> { 396 - if self.fetched == self.limit { 397 - return Some(Ok(None)); 398 - } 399 - let record = loop { 400 - let db_next = self.db_iter.next()?; // None short-circuits here 401 - match self.get_record(db_next) { 402 - Err(e) => return Some(Err(e)), 403 - Ok(Some(record)) => break record, 404 - Ok(None) => continue, 405 - } 406 - }; 407 - self.fetched += 1; 408 - Some(Ok(Some(record))) 409 - } 410 - } 411 - 412 - impl MemReader { 413 - fn get_storage_stats(&self) -> StorageResult<serde_json::Value> { 414 - let rollup_cursor = 415 - get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)? 416 - .map(|c| c.to_raw_u64()); 417 - 418 - Ok(serde_json::json!({ 419 - "rollup_cursor": rollup_cursor, 420 - })) 421 - } 422 - 423 - fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> { 424 - let global = self.global.snapshot(); 425 - 426 - let endpoint = 427 - get_snapshot_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)? 428 - .ok_or(StorageError::BadStateError( 429 - "Could not find jetstream endpoint".to_string(), 430 - ))? 431 - .0; 432 - 433 - let started_at = get_snapshot_static_neu::<TakeoffKey, TakeoffValue>(&global)? 434 - .ok_or(StorageError::BadStateError( 435 - "Could not find jetstream takeoff time".to_string(), 436 - ))? 437 - .to_raw_u64(); 438 - 439 - let latest_cursor = 440 - get_snapshot_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)? 441 - .map(|c| c.to_raw_u64()); 442 - 443 - let rollup_cursor = 444 - get_snapshot_static_neu::<NewRollupCursorKey, JetstreamCursorValue>(&global)? 445 - .map(|c| c.to_raw_u64()); 446 - 447 - Ok(ConsumerInfo::Jetstream { 448 - endpoint, 449 - started_at, 450 - latest_cursor, 451 - rollup_cursor, 452 - }) 453 - } 454 - 455 - fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> { 456 - // 0. grab a snapshot in case rollups happen while we're working 457 - let instant = self.keyspace.instant(); 458 - let global = self.global.snapshot_at(instant); 459 - let rollups = self.rollups.snapshot_at(instant); 460 - 461 - // 1. all-time counts 462 - let all_time_key = AllTimeRollupKey::new(collection).to_db_bytes()?; 463 - let mut total_counts = rollups 464 - .get(&all_time_key)? 465 - .as_deref() 466 - .map(db_complete::<CountsValue>) 467 - .transpose()? 468 - .unwrap_or_default(); 469 - 470 - // 2. live counts that haven't been rolled into all-time yet. 471 - let rollup_cursor = 472 - get_snapshot_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&global)?.ok_or( 473 - StorageError::BadStateError("Could not find current rollup cursor".to_string()), 474 - )?; 475 - 476 - let full_range = LiveCountsKey::range_from_cursor(rollup_cursor)?; 477 - for kv in rollups.range(full_range) { 478 - let (key_bytes, val_bytes) = kv?; 479 - let key = db_complete::<LiveCountsKey>(&key_bytes)?; 480 - if key.collection() == collection { 481 - let counts = db_complete::<CountsValue>(&val_bytes)?; 482 - total_counts.merge(&counts); 483 - } 484 - } 485 - Ok(( 486 - total_counts.counts().creates, 487 - total_counts.dids().estimate() as u64, 488 - )) 489 - } 490 - 491 - fn get_records_by_collections( 492 - &self, 493 - collections: HashSet<Nsid>, 494 - limit: usize, 495 - _expand_each_collection: bool, 496 - ) -> StorageResult<Vec<UFOsRecord>> { 497 - if collections.is_empty() { 498 - return Ok(vec![]); 499 - } 500 - let mut record_iterators = Vec::new(); 501 - for collection in collections { 502 - let iter = RecordIterator::new(&self.feeds, self.records.clone(), &collection, limit)?; 503 - record_iterators.push(iter.peekable()); 504 - } 505 - let mut merged = Vec::new(); 506 - loop { 507 - let mut latest: Option<(Cursor, usize)> = None; // ugh 508 - for (i, iter) in record_iterators.iter_mut().enumerate() { 509 - let Some(it) = iter.peek_mut() else { 510 - continue; 511 - }; 512 - let it = match it { 513 - Ok(v) => v, 514 - Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?, 515 - }; 516 - let Some(rec) = it else { 517 - break; 518 - }; 519 - if let Some((cursor, _)) = latest { 520 - if rec.cursor > cursor { 521 - latest = Some((rec.cursor, i)) 522 - } 523 - } else { 524 - latest = Some((rec.cursor, i)); 525 - } 526 - } 527 - let Some((_, idx)) = latest else { 528 - break; 529 - }; 530 - // yeah yeah whateverrrrrrrrrrrrrrrr 531 - merged.push(record_iterators[idx].next().unwrap().unwrap().unwrap()); 532 - } 533 - Ok(merged) 534 - } 535 - } 536 - 537 - #[async_trait] 538 - impl StoreReader for MemReader { 539 - fn name(&self) -> String { 540 - "in-memory store".into() 541 - } 542 - async fn get_storage_stats(&self) -> StorageResult<serde_json::Value> { 543 - let s = self.clone(); 544 - tokio::task::spawn_blocking(move || MemReader::get_storage_stats(&s)).await? 545 - } 546 - async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> { 547 - let s = self.clone(); 548 - tokio::task::spawn_blocking(move || MemReader::get_consumer_info(&s)).await? 549 - } 550 - async fn get_collections( 551 - &self, 552 - _: usize, 553 - _: OrderCollectionsBy, 554 - _: Option<HourTruncatedCursor>, 555 - _: Option<HourTruncatedCursor>, 556 - ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> { 557 - todo!() 558 - } 559 - async fn get_timeseries( 560 - &self, 561 - _: Vec<Nsid>, 562 - _: HourTruncatedCursor, 563 - _: Option<HourTruncatedCursor>, 564 - _: u64, 565 - ) -> StorageResult<(Vec<HourTruncatedCursor>, HashMap<Nsid, Vec<CountsValue>>)> { 566 - todo!() 567 - } 568 - async fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> { 569 - let s = self.clone(); 570 - let collection = collection.clone(); 571 - tokio::task::spawn_blocking(move || MemReader::get_counts_by_collection(&s, &collection)) 572 - .await? 573 - } 574 - async fn get_records_by_collections( 575 - &self, 576 - collections: HashSet<Nsid>, 577 - limit: usize, 578 - expand_each_collection: bool, 579 - ) -> StorageResult<Vec<UFOsRecord>> { 580 - let s = self.clone(); 581 - tokio::task::spawn_blocking(move || { 582 - MemReader::get_records_by_collections(&s, collections, limit, expand_each_collection) 583 - }) 584 - .await? 585 - } 586 - } 587 - 588 - pub struct MemWriter { 589 - keyspace: MemKeyspace, 590 - global: MemPartion, 591 - feeds: MemPartion, 592 - records: MemPartion, 593 - rollups: MemPartion, 594 - queues: MemPartion, 595 - } 596 - 597 - impl MemWriter { 598 - fn rollup_delete_account( 599 - &mut self, 600 - cursor: Cursor, 601 - key_bytes: &[u8], 602 - val_bytes: &[u8], 603 - ) -> StorageResult<usize> { 604 - let did = db_complete::<DeleteAccountQueueVal>(val_bytes)?; 605 - self.delete_account(&did)?; 606 - let mut batch = self.keyspace.batch(); 607 - batch.remove(&self.queues, key_bytes); 608 - insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, cursor)?; 609 - batch.commit()?; 610 - Ok(1) 611 - } 612 - 613 - fn rollup_live_counts( 614 - &mut self, 615 - timelies: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), StorageError>>, 616 - cursor_exclusive_limit: Option<Cursor>, 617 - rollup_limit: usize, 618 - ) -> StorageResult<usize> { 619 - // current strategy is to buffer counts in mem before writing the rollups 620 - // we *could* read+write every single batch to rollup.. but their merge is associative so 621 - // ...so save the db some work up front? is this worth it? who knows... 622 - 623 - log::warn!("sup!!!"); 624 - 625 - #[derive(Eq, Hash, PartialEq)] 626 - enum Rollup { 627 - Hourly(HourTruncatedCursor), 628 - Weekly(WeekTruncatedCursor), 629 - AllTime, 630 - } 631 - 632 - let mut batch = self.keyspace.batch(); 633 - let mut cursors_advanced = 0; 634 - let mut last_cursor = Cursor::from_start(); 635 - let mut counts_by_rollup: HashMap<(Nsid, Rollup), CountsValue> = HashMap::new(); 636 - 637 - log::warn!("about to loop...."); 638 - for (i, kv) in timelies.enumerate() { 639 - log::warn!("loop {i} {kv:?}..."); 640 - if i >= rollup_limit { 641 - break; 642 - } 643 - 644 - let (key_bytes, val_bytes) = kv?; 645 - let key = db_complete::<LiveCountsKey>(&key_bytes) 646 - .inspect_err(|e| log::warn!("rlc: key: {e:?}"))?; 647 - 648 - if cursor_exclusive_limit 649 - .map(|limit| key.cursor() > limit) 650 - .unwrap_or(false) 651 - { 652 - break; 653 - } 654 - 655 - batch.remove(&self.rollups, &key_bytes); 656 - let val = db_complete::<CountsValue>(&val_bytes) 657 - .inspect_err(|e| log::warn!("rlc: val: {e:?}"))?; 658 - counts_by_rollup 659 - .entry(( 660 - key.collection().clone(), 661 - Rollup::Hourly(key.cursor().into()), 662 - )) 663 - .or_default() 664 - .merge(&val); 665 - counts_by_rollup 666 - .entry(( 667 - key.collection().clone(), 668 - Rollup::Weekly(key.cursor().into()), 669 - )) 670 - .or_default() 671 - .merge(&val); 672 - counts_by_rollup 673 - .entry((key.collection().clone(), Rollup::AllTime)) 674 - .or_default() 675 - .merge(&val); 676 - 677 - cursors_advanced += 1; 678 - last_cursor = key.cursor(); 679 - } 680 - log::warn!("done looping. looping cbr counts(?).."); 681 - 682 - for ((nsid, rollup), counts) in counts_by_rollup { 683 - log::warn!( 684 - "######################## cbr loop {nsid:?} {counts:?} ########################" 685 - ); 686 - let key_bytes = match rollup { 687 - Rollup::Hourly(hourly_cursor) => { 688 - let k = HourlyRollupKey::new(hourly_cursor, &nsid); 689 - log::info!("hrly k: {k:?}"); 690 - k.to_db_bytes()? 691 - } 692 - Rollup::Weekly(weekly_cursor) => { 693 - let k = WeeklyRollupKey::new(weekly_cursor, &nsid); 694 - log::info!("weekly k: {k:?}"); 695 - k.to_db_bytes()? 696 - } 697 - Rollup::AllTime => { 698 - let k = AllTimeRollupKey::new(&nsid); 699 - log::info!("alltime k: {k:?}"); 700 - k.to_db_bytes()? 701 - } 702 - }; 703 - // log::info!("key bytes: {key_bytes:?}"); 704 - let mut rolled: CountsValue = self 705 - .rollups 706 - .get(&key_bytes)? 707 - .inspect(|v| { 708 - let lax = CountsValue::from_db_bytes(v); 709 - log::info!( 710 - "val: len={}, lax={lax:?} first32={:?}", 711 - v.len(), 712 - v.get(..32) 713 - ); 714 - }) 715 - .as_deref() 716 - .map(db_complete::<CountsValue>) 717 - .transpose() 718 - .inspect_err(|e| log::warn!("oooh did we break on the rolled thing? {e:?}"))? 719 - .unwrap_or_default(); 720 - 721 - // try to round-trip before inserting, for funsies 722 - let tripppin = counts.to_db_bytes()?; 723 - let (and_back, n) = CountsValue::from_db_bytes(&tripppin)?; 724 - assert_eq!(n, tripppin.len()); 725 - assert_eq!(counts.prefix, and_back.prefix); 726 - assert_eq!(counts.dids().estimate(), and_back.dids().estimate()); 727 - if counts.counts().creates > 20000000 { 728 - panic!("COUNTS maybe wtf? {counts:?}") 729 - } 730 - // assert_eq!(rolled, and_back); 731 - 732 - rolled.merge(&counts); 733 - 734 - // try to round-trip before inserting, for funsies 735 - let tripppin = rolled.to_db_bytes()?; 736 - let (and_back, n) = CountsValue::from_db_bytes(&tripppin)?; 737 - assert_eq!(n, tripppin.len()); 738 - assert_eq!(rolled.prefix, and_back.prefix); 739 - assert_eq!(rolled.dids().estimate(), and_back.dids().estimate()); 740 - if rolled.counts().creates > 20000000 { 741 - panic!("maybe wtf? {rolled:?}") 742 - } 743 - // assert_eq!(rolled, and_back); 744 - 745 - batch.insert(&self.rollups, &key_bytes, &rolled.to_db_bytes()?); 746 - } 747 - 748 - log::warn!("done cbr loop."); 749 - 750 - insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor) 751 - .inspect_err(|e| log::warn!("insert neu: {e:?}"))?; 752 - 753 - batch.commit()?; 754 - 755 - log::warn!("ok finished rlc stuff. huh."); 756 - Ok(cursors_advanced) 757 - } 758 - } 759 - 760 - impl StoreWriter<MemBackground> for MemWriter { 761 - fn background_tasks(&mut self, _reroll: bool) -> StorageResult<MemBackground> { 762 - Ok(MemBackground {}) 763 - } 764 - 765 - fn insert_batch<const LIMIT: usize>( 766 - &mut self, 767 - event_batch: EventBatch<LIMIT>, 768 - ) -> StorageResult<()> { 769 - if event_batch.is_empty() { 770 - return Ok(()); 771 - } 772 - 773 - let mut batch = self.keyspace.batch(); 774 - 775 - // would be nice not to have to iterate everything at once here 776 - let latest = event_batch.latest_cursor().unwrap(); 777 - 778 - for (nsid, commits) in event_batch.commits_by_nsid { 779 - for commit in commits.commits { 780 - let location_key: RecordLocationKey = (&commit, &nsid).into(); 781 - 782 - match commit.action { 783 - CommitAction::Cut => { 784 - batch.remove(&self.records, &location_key.to_db_bytes()?); 785 - } 786 - CommitAction::Put(put_action) => { 787 - let feed_key = NsidRecordFeedKey::from_pair(nsid.clone(), commit.cursor); 788 - let feed_val: NsidRecordFeedVal = 789 - (&commit.did, &commit.rkey, commit.rev.as_str()).into(); 790 - batch.insert( 791 - &self.feeds, 792 - &feed_key.to_db_bytes()?, 793 - &feed_val.to_db_bytes()?, 794 - ); 795 - 796 - let location_val: RecordLocationVal = 797 - (commit.cursor, commit.rev.as_str(), put_action).into(); 798 - batch.insert( 799 - &self.records, 800 - &location_key.to_db_bytes()?, 801 - &location_val.to_db_bytes()?, 802 - ); 803 - } 804 - } 805 - } 806 - let live_counts_key: LiveCountsKey = (latest, &nsid).into(); 807 - let counts_value = CountsValue::new( 808 - CommitCounts { 809 - creates: commits.creates as u64, 810 - updates: commits.updates as u64, 811 - deletes: commits.deletes as u64, 812 - }, 813 - commits.dids_estimate, 814 - ); 815 - batch.insert( 816 - &self.rollups, 817 - &live_counts_key.to_db_bytes()?, 818 - &counts_value.to_db_bytes()?, 819 - ); 820 - } 821 - 822 - for remove in event_batch.account_removes { 823 - let queue_key = DeleteAccountQueueKey::new(remove.cursor); 824 - let queue_val: DeleteAccountQueueVal = remove.did; 825 - batch.insert( 826 - &self.queues, 827 - &queue_key.to_db_bytes()?, 828 - &queue_val.to_db_bytes()?, 829 - ); 830 - } 831 - 832 - batch.insert( 833 - &self.global, 834 - &DbStaticStr::<JetstreamCursorKey>::default().to_db_bytes()?, 835 - &latest.to_db_bytes()?, 836 - ); 837 - 838 - batch.commit()?; 839 - Ok(()) 840 - } 841 - 842 - fn step_rollup(&mut self) -> StorageResult<(usize, HashSet<Nsid>)> { 843 - let mut dirty_nsids = HashSet::new(); 844 - 845 - let rollup_cursor = 846 - get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)? 847 - .ok_or(StorageError::BadStateError( 848 - "Could not find current rollup cursor".to_string(), 849 - )) 850 - .inspect_err(|e| log::warn!("failed getting rollup cursor: {e:?}"))?; 851 - 852 - // timelies 853 - let live_counts_range = LiveCountsKey::range_from_cursor(rollup_cursor) 854 - .inspect_err(|e| log::warn!("live counts range: {e:?}"))?; 855 - let mut timely_iter = self.rollups.range(live_counts_range).into_iter().peekable(); 856 - 857 - let timely_next = timely_iter 858 - .peek_mut() 859 - .map(|kv| -> StorageResult<LiveCountsKey> { 860 - match kv { 861 - Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?, 862 - Ok((key_bytes, _)) => { 863 - let key = db_complete::<LiveCountsKey>(key_bytes).inspect_err(|e| { 864 - log::warn!("failed getting key for next timely: {e:?}") 865 - })?; 866 - Ok(key) 867 - } 868 - } 869 - }) 870 - .transpose() 871 - .inspect_err(|e| log::warn!("something about timely: {e:?}"))?; 872 - 873 - // delete accounts 874 - let delete_accounts_range = 875 - DeleteAccountQueueKey::new(rollup_cursor).range_to_prefix_end()?; 876 - 877 - let next_delete = self 878 - .queues 879 - .range(delete_accounts_range) 880 - .into_iter() 881 - .next() 882 - .transpose() 883 - .inspect_err(|e| log::warn!("range for next delete: {e:?}"))? 884 - .map(|(key_bytes, val_bytes)| { 885 - db_complete::<DeleteAccountQueueKey>(&key_bytes) 886 - .inspect_err(|e| log::warn!("failed inside next delete thing????: {e:?}")) 887 - .map(|k| (k.suffix, key_bytes, val_bytes)) 888 - }) 889 - .transpose() 890 - .inspect_err(|e| log::warn!("failed getting next delete: {e:?}"))?; 891 - 892 - let cursors_stepped = match (timely_next, next_delete) { 893 - (Some(timely), Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => { 894 - if timely.cursor() < delete_cursor { 895 - let n = self 896 - .rollup_live_counts( 897 - timely_iter, 898 - Some(delete_cursor), 899 - MAX_BATCHED_ROLLUP_COUNTS, 900 - ) 901 - .inspect_err(|e| log::warn!("rolling up live counts: {e:?}"))?; 902 - dirty_nsids.insert(timely.collection().clone()); 903 - n 904 - } else { 905 - self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes) 906 - .inspect_err(|e| log::warn!("deleting acocunt: {e:?}"))? 907 - } 908 - } 909 - (Some(timely), None) => { 910 - let n = self 911 - .rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS) 912 - .inspect_err(|e| log::warn!("rolling up (lasjdflkajs): {e:?}"))?; 913 - dirty_nsids.insert(timely.collection().clone()); 914 - n 915 - } 916 - (None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => self 917 - .rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes) 918 - .inspect_err(|e| log::warn!("deleting acocunt other branch: {e:?}"))?, 919 - (None, None) => 0, 920 - }; 921 - 922 - Ok((cursors_stepped, dirty_nsids)) 923 - } 924 - 925 - fn trim_collection( 926 - &mut self, 927 - collection: &Nsid, 928 - limit: usize, 929 - _full_scan: bool, 930 - // TODO: could add a start cursor limit to avoid iterating deleted stuff at the start (/end) 931 - ) -> StorageResult<(usize, usize, bool)> { 932 - let mut dangling_feed_keys_cleaned = 0; 933 - let mut records_deleted = 0; 934 - 935 - let mut batch = self.keyspace.batch(); 936 - 937 - let prefix = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?; 938 - let mut found = 0; 939 - for kv in self.feeds.prefix(&prefix).into_iter().rev() { 940 - let (key_bytes, val_bytes) = kv?; 941 - let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?; 942 - let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?; 943 - let location_key: RecordLocationKey = (&feed_key, &feed_val).into(); 944 - let location_key_bytes = location_key.to_db_bytes()?; 945 - 946 - let Some(location_val_bytes) = self.records.get(&location_key_bytes)? else { 947 - // record was deleted (hopefully) 948 - batch.remove(&self.feeds, &location_key_bytes); 949 - dangling_feed_keys_cleaned += 1; 950 - continue; 951 - }; 952 - 953 - let (meta, _) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?; 954 - 955 - if meta.cursor() != feed_key.cursor() { 956 - // older/different version 957 - batch.remove(&self.feeds, &location_key_bytes); 958 - dangling_feed_keys_cleaned += 1; 959 - continue; 960 - } 961 - if meta.rev != feed_val.rev() { 962 - // weird... 963 - log::warn!("record lookup: cursor match but rev did not...? removing."); 964 - batch.remove(&self.feeds, &location_key_bytes); 965 - dangling_feed_keys_cleaned += 1; 966 - continue; 967 - } 968 - 969 - if batch.len() >= MAX_BATCHED_CLEANUP_SIZE { 970 - batch.commit()?; 971 - batch = self.keyspace.batch(); 972 - } 973 - 974 - found += 1; 975 - if found <= limit { 976 - continue; 977 - } 978 - 979 - batch.remove(&self.feeds, &location_key_bytes); 980 - batch.remove(&self.records, &location_key_bytes); 981 - records_deleted += 1; 982 - } 983 - 984 - batch.commit()?; 985 - 986 - log::info!("trim_collection ({collection:?}) removed {dangling_feed_keys_cleaned} dangling feed entries and {records_deleted} records"); 987 - Ok((dangling_feed_keys_cleaned, records_deleted, false)) 988 - } 989 - 990 - fn delete_account(&mut self, did: &Did) -> Result<usize, StorageError> { 991 - let mut records_deleted = 0; 992 - let mut batch = self.keyspace.batch(); 993 - let prefix = RecordLocationKey::from_prefix_to_db_bytes(did)?; 994 - for kv in self.records.prefix(&prefix) { 995 - let (key_bytes, _) = kv?; 996 - batch.remove(&self.records, &key_bytes); 997 - records_deleted += 1; 998 - if batch.len() >= MAX_BATCHED_ACCOUNT_DELETE_RECORDS { 999 - batch.commit()?; 1000 - batch = self.keyspace.batch(); 1001 - } 1002 - } 1003 - batch.commit()?; 1004 - Ok(records_deleted) 1005 - } 1006 - } 1007 - 1008 - pub struct MemBackground; 1009 - 1010 - #[async_trait] 1011 - impl StoreBackground for MemBackground { 1012 - async fn run(mut self, _backfill: bool) -> StorageResult<()> { 1013 - // noop for mem (is there a nicer way to do this?) 1014 - loop { 1015 - tokio::time::sleep(std::time::Duration::from_secs_f64(10.)).await; 1016 - } 1017 - } 1018 - } 1019 - 1020 - /// Get a value from a fixed key 1021 - fn get_static_neu<K: StaticStr, V: DbBytes>(global: &MemPartion) -> StorageResult<Option<V>> { 1022 - let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 1023 - let value = global 1024 - .get(&key_bytes)? 1025 - .map(|value_bytes| db_complete(&value_bytes)) 1026 - .transpose()?; 1027 - Ok(value) 1028 - } 1029 - 1030 - /// Get a value from a fixed key 1031 - fn get_snapshot_static_neu<K: StaticStr, V: DbBytes>( 1032 - global: &MemPartion, 1033 - ) -> StorageResult<Option<V>> { 1034 - let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 1035 - let value = global 1036 - .get(&key_bytes)? 1037 - .map(|value_bytes| db_complete(&value_bytes)) 1038 - .transpose()?; 1039 - Ok(value) 1040 - } 1041 - 1042 - /// Set a value to a fixed key 1043 - fn insert_static_neu<K: StaticStr>(global: &MemPartion, value: impl DbBytes) -> StorageResult<()> { 1044 - let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 1045 - let value_bytes = value.to_db_bytes()?; 1046 - global.insert(&key_bytes, &value_bytes)?; 1047 - Ok(()) 1048 - } 1049 - 1050 - /// Set a value to a fixed key 1051 - fn insert_batch_static_neu<K: StaticStr>( 1052 - batch: &mut MemBatch, 1053 - global: &MemPartion, 1054 - value: impl DbBytes, 1055 - ) -> StorageResult<()> { 1056 - let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 1057 - let value_bytes = value.to_db_bytes()?; 1058 - batch.insert(global, &key_bytes, &value_bytes); 1059 - Ok(()) 1060 - } 1061 - 1062 - #[derive(Debug, serde::Serialize, schemars::JsonSchema)] 1063 - pub struct StorageInfo { 1064 - pub keyspace_disk_space: u64, 1065 - pub keyspace_journal_count: usize, 1066 - pub keyspace_sequence: u64, 1067 - pub global_approximate_len: usize, 1068 - } 1069 - 1070 - #[cfg(test)] 1071 - mod tests { 1072 - use super::*; 1073 - use crate::{DeleteAccount, RecordKey, UFOsCommit}; 1074 - use jetstream::events::{CommitEvent, CommitOp}; 1075 - use jetstream::exports::Cid; 1076 - use serde_json::value::RawValue; 1077 - 1078 - fn fjall_db() -> (MemReader, MemWriter) { 1079 - let (read, write, _, _) = MemStorage::init( 1080 - tempfile::tempdir().unwrap(), 1081 - "offline test (no real jetstream endpoint)".to_string(), 1082 - false, 1083 - MemConfig { temp: true }, 1084 - ) 1085 - .unwrap(); 1086 - (read, write) 1087 - } 1088 - 1089 - const TEST_BATCH_LIMIT: usize = 16; 1090 - 1091 - #[derive(Debug, Default)] 1092 - struct TestBatch { 1093 - pub batch: EventBatch<TEST_BATCH_LIMIT>, 1094 - } 1095 - 1096 - impl TestBatch { 1097 - #[allow(clippy::too_many_arguments)] 1098 - pub fn create( 1099 - &mut self, 1100 - did: &str, 1101 - collection: &str, 1102 - rkey: &str, 1103 - record: &str, 1104 - rev: Option<&str>, 1105 - cid: Option<Cid>, 1106 - cursor: u64, 1107 - ) -> Nsid { 1108 - let did = Did::new(did.to_string()).unwrap(); 1109 - let collection = Nsid::new(collection.to_string()).unwrap(); 1110 - let record = RawValue::from_string(record.to_string()).unwrap(); 1111 - let cid = cid.unwrap_or( 1112 - "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy" 1113 - .parse() 1114 - .unwrap(), 1115 - ); 1116 - 1117 - let event = CommitEvent { 1118 - collection, 1119 - rkey: RecordKey::new(rkey.to_string()).unwrap(), 1120 - rev: rev.unwrap_or("asdf").to_string(), 1121 - operation: CommitOp::Create, 1122 - record: Some(record), 1123 - cid: Some(cid), 1124 - }; 1125 - 1126 - let (commit, collection) = 1127 - UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor)) 1128 - .unwrap(); 1129 - 1130 - self.batch 1131 - .commits_by_nsid 1132 - .entry(collection.clone()) 1133 - .or_default() 1134 - .truncating_insert(commit, &[0u8; 16]) 1135 - .unwrap(); 1136 - 1137 - collection 1138 - } 1139 - #[allow(clippy::too_many_arguments)] 1140 - pub fn update( 1141 - &mut self, 1142 - did: &str, 1143 - collection: &str, 1144 - rkey: &str, 1145 - record: &str, 1146 - rev: Option<&str>, 1147 - cid: Option<Cid>, 1148 - cursor: u64, 1149 - ) -> Nsid { 1150 - let did = Did::new(did.to_string()).unwrap(); 1151 - let collection = Nsid::new(collection.to_string()).unwrap(); 1152 - let record = RawValue::from_string(record.to_string()).unwrap(); 1153 - let cid = cid.unwrap_or( 1154 - "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy" 1155 - .parse() 1156 - .unwrap(), 1157 - ); 1158 - 1159 - let event = CommitEvent { 1160 - collection, 1161 - rkey: RecordKey::new(rkey.to_string()).unwrap(), 1162 - rev: rev.unwrap_or("asdf").to_string(), 1163 - operation: CommitOp::Update, 1164 - record: Some(record), 1165 - cid: Some(cid), 1166 - }; 1167 - 1168 - let (commit, collection) = 1169 - UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor)) 1170 - .unwrap(); 1171 - 1172 - self.batch 1173 - .commits_by_nsid 1174 - .entry(collection.clone()) 1175 - .or_default() 1176 - .truncating_insert(commit, &[0u8; 16]) 1177 - .unwrap(); 1178 - 1179 - collection 1180 - } 1181 - #[allow(clippy::too_many_arguments)] 1182 - pub fn delete( 1183 - &mut self, 1184 - did: &str, 1185 - collection: &str, 1186 - rkey: &str, 1187 - rev: Option<&str>, 1188 - cursor: u64, 1189 - ) -> Nsid { 1190 - let did = Did::new(did.to_string()).unwrap(); 1191 - let collection = Nsid::new(collection.to_string()).unwrap(); 1192 - let event = CommitEvent { 1193 - collection, 1194 - rkey: RecordKey::new(rkey.to_string()).unwrap(), 1195 - rev: rev.unwrap_or("asdf").to_string(), 1196 - operation: CommitOp::Delete, 1197 - record: None, 1198 - cid: None, 1199 - }; 1200 - 1201 - let (commit, collection) = 1202 - UFOsCommit::from_commit_info(event, did, Cursor::from_raw_u64(cursor)).unwrap(); 1203 - 1204 - self.batch 1205 - .commits_by_nsid 1206 - .entry(collection.clone()) 1207 - .or_default() 1208 - .truncating_insert(commit, &[0u8; 16]) 1209 - .unwrap(); 1210 - 1211 - collection 1212 - } 1213 - pub fn delete_account(&mut self, did: &str, cursor: u64) -> Did { 1214 - let did = Did::new(did.to_string()).unwrap(); 1215 - self.batch.account_removes.push(DeleteAccount { 1216 - did: did.clone(), 1217 - cursor: Cursor::from_raw_u64(cursor), 1218 - }); 1219 - did 1220 - } 1221 - } 1222 - 1223 - #[test] 1224 - fn test_hello() -> anyhow::Result<()> { 1225 - let (read, mut write) = fjall_db(); 1226 - write.insert_batch::<TEST_BATCH_LIMIT>(EventBatch::default())?; 1227 - let (records, dids) = 1228 - read.get_counts_by_collection(&Nsid::new("a.b.c".to_string()).unwrap())?; 1229 - assert_eq!(records, 0); 1230 - assert_eq!(dids, 0); 1231 - Ok(()) 1232 - } 1233 - 1234 - #[test] 1235 - fn test_insert_one() -> anyhow::Result<()> { 1236 - let (read, mut write) = fjall_db(); 1237 - 1238 - let mut batch = TestBatch::default(); 1239 - let collection = batch.create( 1240 - "did:plc:inze6wrmsm7pjl7yta3oig77", 1241 - "a.b.c", 1242 - "asdf", 1243 - "{}", 1244 - Some("rev-z"), 1245 - None, 1246 - 100, 1247 - ); 1248 - write.insert_batch(batch.batch)?; 1249 - 1250 - let (records, dids) = read.get_counts_by_collection(&collection)?; 1251 - assert_eq!(records, 1); 1252 - assert_eq!(dids, 1); 1253 - let (records, dids) = 1254 - read.get_counts_by_collection(&Nsid::new("d.e.f".to_string()).unwrap())?; 1255 - assert_eq!(records, 0); 1256 - assert_eq!(dids, 0); 1257 - 1258 - let records = read.get_records_by_collections(HashSet::from([collection]), 2, false)?; 1259 - assert_eq!(records.len(), 1); 1260 - let rec = &records[0]; 1261 - assert_eq!(rec.record.get(), "{}"); 1262 - assert!(!rec.is_update); 1263 - 1264 - let records = read.get_records_by_collections( 1265 - HashSet::from([Nsid::new("d.e.f".to_string()).unwrap()]), 1266 - 2, 1267 - false, 1268 - )?; 1269 - assert_eq!(records.len(), 0); 1270 - 1271 - Ok(()) 1272 - } 1273 - 1274 - #[test] 1275 - fn test_get_multi_collection() -> anyhow::Result<()> { 1276 - let (read, mut write) = fjall_db(); 1277 - 1278 - let mut batch = TestBatch::default(); 1279 - batch.create( 1280 - "did:plc:inze6wrmsm7pjl7yta3oig77", 1281 - "a.a.a", 1282 - "aaa", 1283 - r#""earliest""#, 1284 - Some("rev-a"), 1285 - None, 1286 - 100, 1287 - ); 1288 - batch.create( 1289 - "did:plc:inze6wrmsm7pjl7yta3oig77", 1290 - "a.a.b", 1291 - "aab", 1292 - r#""in between""#, 1293 - Some("rev-ab"), 1294 - None, 1295 - 101, 1296 - ); 1297 - batch.create( 1298 - "did:plc:inze6wrmsm7pjl7yta3oig77", 1299 - "a.a.a", 1300 - "aaa-2", 1301 - r#""last""#, 1302 - Some("rev-a-2"), 1303 - None, 1304 - 102, 1305 - ); 1306 - write.insert_batch(batch.batch)?; 1307 - 1308 - let records = read.get_records_by_collections( 1309 - HashSet::from([ 1310 - Nsid::new("a.a.a".to_string()).unwrap(), 1311 - Nsid::new("a.a.b".to_string()).unwrap(), 1312 - Nsid::new("a.a.c".to_string()).unwrap(), 1313 - ]), 1314 - 100, 1315 - false, 1316 - )?; 1317 - assert_eq!(records.len(), 3); 1318 - assert_eq!(records[0].record.get(), r#""last""#); 1319 - assert_eq!( 1320 - records[0].collection, 1321 - Nsid::new("a.a.a".to_string()).unwrap() 1322 - ); 1323 - assert_eq!(records[1].record.get(), r#""in between""#); 1324 - assert_eq!( 1325 - records[1].collection, 1326 - Nsid::new("a.a.b".to_string()).unwrap() 1327 - ); 1328 - assert_eq!(records[2].record.get(), r#""earliest""#); 1329 - assert_eq!( 1330 - records[2].collection, 1331 - Nsid::new("a.a.a".to_string()).unwrap() 1332 - ); 1333 - 1334 - Ok(()) 1335 - } 1336 - 1337 - #[test] 1338 - fn test_update_one() -> anyhow::Result<()> { 1339 - let (read, mut write) = fjall_db(); 1340 - 1341 - let mut batch = TestBatch::default(); 1342 - let collection = batch.create( 1343 - "did:plc:inze6wrmsm7pjl7yta3oig77", 1344 - "a.b.c", 1345 - "rkey-asdf", 1346 - "{}", 1347 - Some("rev-a"), 1348 - None, 1349 - 100, 1350 - ); 1351 - write.insert_batch(batch.batch)?; 1352 - 1353 - let mut batch = TestBatch::default(); 1354 - batch.update( 1355 - "did:plc:inze6wrmsm7pjl7yta3oig77", 1356 - "a.b.c", 1357 - "rkey-asdf", 1358 - r#"{"ch": "ch-ch-ch-changes"}"#, 1359 - Some("rev-z"), 1360 - None, 1361 - 101, 1362 - ); 1363 - write.insert_batch(batch.batch)?; 1364 - 1365 - let (records, dids) = read.get_counts_by_collection(&collection)?; 1366 - assert_eq!(records, 1); 1367 - assert_eq!(dids, 1); 1368 - 1369 - let records = read.get_records_by_collections(HashSet::from([collection]), 2, false)?; 1370 - assert_eq!(records.len(), 1); 1371 - let rec = &records[0]; 1372 - assert_eq!(rec.record.get(), r#"{"ch": "ch-ch-ch-changes"}"#); 1373 - assert!(rec.is_update); 1374 - Ok(()) 1375 - } 1376 - 1377 - #[test] 1378 - fn test_delete_one() -> anyhow::Result<()> { 1379 - let (read, mut write) = fjall_db(); 1380 - 1381 - let mut batch = TestBatch::default(); 1382 - let collection = batch.create( 1383 - "did:plc:inze6wrmsm7pjl7yta3oig77", 1384 - "a.b.c", 1385 - "rkey-asdf", 1386 - "{}", 1387 - Some("rev-a"), 1388 - None, 1389 - 100, 1390 - ); 1391 - write.insert_batch(batch.batch)?; 1392 - 1393 - let mut batch = TestBatch::default(); 1394 - batch.delete( 1395 - "did:plc:inze6wrmsm7pjl7yta3oig77", 1396 - "a.b.c", 1397 - "rkey-asdf", 1398 - Some("rev-z"), 1399 - 101, 1400 - ); 1401 - write.insert_batch(batch.batch)?; 1402 - 1403 - let (records, dids) = read.get_counts_by_collection(&collection)?; 1404 - assert_eq!(records, 1); 1405 - assert_eq!(dids, 1); 1406 - 1407 - let records = read.get_records_by_collections(HashSet::from([collection]), 2, false)?; 1408 - assert_eq!(records.len(), 0); 1409 - 1410 - Ok(()) 1411 - } 1412 - 1413 - #[test] 1414 - fn test_collection_trim() -> anyhow::Result<()> { 1415 - let (read, mut write) = fjall_db(); 1416 - 1417 - let mut batch = TestBatch::default(); 1418 - batch.create( 1419 - "did:plc:inze6wrmsm7pjl7yta3oig77", 1420 - "a.a.a", 1421 - "rkey-aaa", 1422 - "{}", 1423 - Some("rev-aaa"), 1424 - None, 1425 - 10_000, 1426 - ); 1427 - let mut last_b_cursor; 1428 - for i in 1..=10 { 1429 - last_b_cursor = 11_000 + i; 1430 - batch.create( 1431 - &format!("did:plc:inze6wrmsm7pjl7yta3oig7{}", i % 3), 1432 - "a.a.b", 1433 - &format!("rkey-bbb-{i}"), 1434 - &format!(r#"{{"n": {i}}}"#), 1435 - Some(&format!("rev-bbb-{i}")), 1436 - None, 1437 - last_b_cursor, 1438 - ); 1439 - } 1440 - batch.create( 1441 - "did:plc:inze6wrmsm7pjl7yta3oig77", 1442 - "a.a.c", 1443 - "rkey-ccc", 1444 - "{}", 1445 - Some("rev-ccc"), 1446 - None, 1447 - 12_000, 1448 - ); 1449 - 1450 - write.insert_batch(batch.batch)?; 1451 - 1452 - let records = read.get_records_by_collections( 1453 - HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]), 1454 - 100, 1455 - false, 1456 - )?; 1457 - assert_eq!(records.len(), 1); 1458 - let records = read.get_records_by_collections( 1459 - HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]), 1460 - 100, 1461 - false, 1462 - )?; 1463 - assert_eq!(records.len(), 10); 1464 - let records = read.get_records_by_collections( 1465 - HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]), 1466 - 100, 1467 - false, 1468 - )?; 1469 - assert_eq!(records.len(), 1); 1470 - let records = read.get_records_by_collections( 1471 - HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]), 1472 - 100, 1473 - false, 1474 - )?; 1475 - assert_eq!(records.len(), 0); 1476 - 1477 - write.trim_collection(&Nsid::new("a.a.a".to_string()).unwrap(), 6, false)?; 1478 - write.trim_collection(&Nsid::new("a.a.b".to_string()).unwrap(), 6, false)?; 1479 - write.trim_collection(&Nsid::new("a.a.c".to_string()).unwrap(), 6, false)?; 1480 - write.trim_collection(&Nsid::new("a.a.d".to_string()).unwrap(), 6, false)?; 1481 - 1482 - let records = read.get_records_by_collections( 1483 - HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]), 1484 - 100, 1485 - false, 1486 - )?; 1487 - assert_eq!(records.len(), 1); 1488 - let records = read.get_records_by_collections( 1489 - HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]), 1490 - 100, 1491 - false, 1492 - )?; 1493 - assert_eq!(records.len(), 6); 1494 - let records = read.get_records_by_collections( 1495 - HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]), 1496 - 100, 1497 - false, 1498 - )?; 1499 - assert_eq!(records.len(), 1); 1500 - let records = read.get_records_by_collections( 1501 - HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]), 1502 - 100, 1503 - false, 1504 - )?; 1505 - assert_eq!(records.len(), 0); 1506 - 1507 - Ok(()) 1508 - } 1509 - 1510 - #[test] 1511 - fn test_delete_account() -> anyhow::Result<()> { 1512 - let (read, mut write) = fjall_db(); 1513 - 1514 - let mut batch = TestBatch::default(); 1515 - batch.create( 1516 - "did:plc:person-a", 1517 - "a.a.a", 1518 - "rkey-aaa", 1519 - "{}", 1520 - Some("rev-aaa"), 1521 - None, 1522 - 10_000, 1523 - ); 1524 - for i in 1..=2 { 1525 - batch.create( 1526 - "did:plc:person-b", 1527 - "a.a.a", 1528 - &format!("rkey-bbb-{i}"), 1529 - &format!(r#"{{"n": {i}}}"#), 1530 - Some(&format!("rev-bbb-{i}")), 1531 - None, 1532 - 11_000 + i, 1533 - ); 1534 - } 1535 - write.insert_batch(batch.batch)?; 1536 - 1537 - let records = read.get_records_by_collections( 1538 - HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]), 1539 - 100, 1540 - false, 1541 - )?; 1542 - assert_eq!(records.len(), 3); 1543 - 1544 - let records_deleted = 1545 - write.delete_account(&Did::new("did:plc:person-b".to_string()).unwrap())?; 1546 - assert_eq!(records_deleted, 2); 1547 - 1548 - let records = read.get_records_by_collections( 1549 - HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]), 1550 - 100, 1551 - false, 1552 - )?; 1553 - assert_eq!(records.len(), 1); 1554 - 1555 - Ok(()) 1556 - } 1557 - 1558 - #[test] 1559 - fn rollup_delete_account_removes_record() -> anyhow::Result<()> { 1560 - let (read, mut write) = fjall_db(); 1561 - 1562 - let mut batch = TestBatch::default(); 1563 - batch.create( 1564 - "did:plc:person-a", 1565 - "a.a.a", 1566 - "rkey-aaa", 1567 - "{}", 1568 - Some("rev-aaa"), 1569 - None, 1570 - 10_000, 1571 - ); 1572 - write.insert_batch(batch.batch)?; 1573 - 1574 - let mut batch = TestBatch::default(); 1575 - batch.delete_account("did:plc:person-a", 9_999); // queue it before the rollup 1576 - write.insert_batch(batch.batch)?; 1577 - 1578 - write.step_rollup()?; 1579 - 1580 - let records = read.get_records_by_collections( 1581 - HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]), 1582 - 1, 1583 - false, 1584 - )?; 1585 - assert_eq!(records.len(), 0); 1586 - 1587 - Ok(()) 1588 - } 1589 - 1590 - #[test] 1591 - fn rollup_delete_live_count_step() -> anyhow::Result<()> { 1592 - let (read, mut write) = fjall_db(); 1593 - 1594 - let mut batch = TestBatch::default(); 1595 - batch.create( 1596 - "did:plc:person-a", 1597 - "a.a.a", 1598 - "rkey-aaa", 1599 - "{}", 1600 - Some("rev-aaa"), 1601 - None, 1602 - 10_000, 1603 - ); 1604 - write.insert_batch(batch.batch)?; 1605 - 1606 - let (n, _) = write.step_rollup()?; 1607 - assert_eq!(n, 1); 1608 - 1609 - let mut batch = TestBatch::default(); 1610 - batch.delete_account("did:plc:person-a", 10_001); 1611 - write.insert_batch(batch.batch)?; 1612 - 1613 - let records = read.get_records_by_collections( 1614 - HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]), 1615 - 1, 1616 - false, 1617 - )?; 1618 - assert_eq!(records.len(), 1); 1619 - 1620 - let (n, _) = write.step_rollup()?; 1621 - assert_eq!(n, 1); 1622 - 1623 - let records = read.get_records_by_collections( 1624 - HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]), 1625 - 1, 1626 - false, 1627 - )?; 1628 - assert_eq!(records.len(), 0); 1629 - 1630 - let mut batch = TestBatch::default(); 1631 - batch.delete_account("did:plc:person-a", 9_999); 1632 - write.insert_batch(batch.batch)?; 1633 - 1634 - let (n, _) = write.step_rollup()?; 1635 - assert_eq!(n, 0); 1636 - 1637 - Ok(()) 1638 - } 1639 - 1640 - #[test] 1641 - fn rollup_multiple_count_batches() -> anyhow::Result<()> { 1642 - let (_read, mut write) = fjall_db(); 1643 - 1644 - let mut batch = TestBatch::default(); 1645 - batch.create( 1646 - "did:plc:person-a", 1647 - "a.a.a", 1648 - "rkey-aaa", 1649 - "{}", 1650 - Some("rev-aaa"), 1651 - None, 1652 - 10_000, 1653 - ); 1654 - write.insert_batch(batch.batch)?; 1655 - 1656 - let mut batch = TestBatch::default(); 1657 - batch.create( 1658 - "did:plc:person-a", 1659 - "a.a.a", 1660 - "rkey-aab", 1661 - "{}", 1662 - Some("rev-aab"), 1663 - None, 1664 - 10_001, 1665 - ); 1666 - write.insert_batch(batch.batch)?; 1667 - 1668 - let (n, _) = write.step_rollup()?; 1669 - assert_eq!(n, 2); 1670 - 1671 - let (n, _) = write.step_rollup()?; 1672 - assert_eq!(n, 0); 1673 - 1674 - Ok(()) 1675 - } 1676 - 1677 - #[test] 1678 - fn counts_before_and_after_rollup() -> anyhow::Result<()> { 1679 - let (read, mut write) = fjall_db(); 1680 - 1681 - let mut batch = TestBatch::default(); 1682 - batch.create( 1683 - "did:plc:person-a", 1684 - "a.a.a", 1685 - "rkey-aaa", 1686 - "{}", 1687 - Some("rev-aaa"), 1688 - None, 1689 - 10_000, 1690 - ); 1691 - batch.create( 1692 - "did:plc:person-b", 1693 - "a.a.a", 1694 - "rkey-bbb", 1695 - "{}", 1696 - Some("rev-bbb"), 1697 - None, 1698 - 10_001, 1699 - ); 1700 - write.insert_batch(batch.batch)?; 1701 - 1702 - let mut batch = TestBatch::default(); 1703 - batch.delete_account("did:plc:person-a", 11_000); 1704 - write.insert_batch(batch.batch)?; 1705 - 1706 - let mut batch = TestBatch::default(); 1707 - batch.create( 1708 - "did:plc:person-a", 1709 - "a.a.a", 1710 - "rkey-aac", 1711 - "{}", 1712 - Some("rev-aac"), 1713 - None, 1714 - 12_000, 1715 - ); 1716 - write.insert_batch(batch.batch)?; 1717 - 1718 - // before any rollup 1719 - let (records, dids) = 1720 - read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?; 1721 - assert_eq!(records, 3); 1722 - assert_eq!(dids, 2); 1723 - 1724 - // first batch rolled up 1725 - let (n, _) = write.step_rollup()?; 1726 - assert_eq!(n, 1); 1727 - 1728 - let (records, dids) = 1729 - read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?; 1730 - assert_eq!(records, 3); 1731 - assert_eq!(dids, 2); 1732 - 1733 - // delete account rolled up 1734 - let (n, _) = write.step_rollup()?; 1735 - assert_eq!(n, 1); 1736 - 1737 - let (records, dids) = 1738 - read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?; 1739 - assert_eq!(records, 3); 1740 - assert_eq!(dids, 2); 1741 - 1742 - // second batch rolled up 1743 - let (n, _) = write.step_rollup()?; 1744 - assert_eq!(n, 1); 1745 - 1746 - let (records, dids) = 1747 - read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?; 1748 - assert_eq!(records, 3); 1749 - assert_eq!(dids, 2); 1750 - 1751 - // no more rollups left 1752 - let (n, _) = write.step_rollup()?; 1753 - assert_eq!(n, 0); 1754 - 1755 - Ok(()) 1756 - } 1757 - }