Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

combine counts into (records, dids_estimate) pair

scanning counts-only will be a bit more expensive, but we *can* at least avoid deserializing the HLL if needed, since DbConcat has the (cheap) count as a prefix.

+77 -116
+1 -2
ufos/src/storage.rs
··· 19 19 } 20 20 21 21 pub trait StoreReader: Clone { 22 - fn get_total_by_collection(&self, collection: &Nsid) -> Result<u64, StorageError>; 23 - fn get_dids_by_collection(&self, collection: &Nsid) -> Result<u64, StorageError>; 22 + fn get_counts_by_collection(&self, collection: &Nsid) -> Result<(u64, u64), StorageError>; 24 23 // fn get_records_by_collections(&self, collections: &) 25 24 }
+45 -72
ufos/src/storage_fjall.rs
··· 3 3 use crate::storage::{StorageWhatever, StoreReader, StoreWriter}; 4 4 use crate::store_types::{ 5 5 ByCollectionKey, ByCollectionValue, ByCursorSeenKey, ByCursorSeenValue, ByIdKey, ByIdValue, 6 - DeleteAccountQueueKey, DeleteAccountQueueVal, JetstreamCursorKey, JetstreamCursorValue, 7 - JetstreamEndpointKey, JetstreamEndpointValue, LiveDidsKey, LiveDidsValue, LiveRecordsKey, 8 - LiveRecordsValue, ModCursorKey, ModCursorValue, ModQueueItemKey, ModQueueItemStringValue, 9 - ModQueueItemValue, NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, 10 - NsidRecordFeedVal, RecordLocationKey, RecordLocationVal, RollupCursorKey, RollupCursorValue, 11 - SeenCounter, TakeoffKey, TakeoffValue, 6 + CountsValue, DeleteAccountQueueKey, DeleteAccountQueueVal, JetstreamCursorKey, 7 + JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey, 8 + ModCursorKey, ModCursorValue, ModQueueItemKey, ModQueueItemStringValue, ModQueueItemValue, 9 + NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, 10 + RecordLocationKey, RecordLocationVal, RollupCursorKey, RollupCursorValue, SeenCounter, 11 + TakeoffKey, TakeoffValue, 12 12 }; 13 13 use crate::{CommitAction, DeleteAccount, Did, EventBatch, Nsid, RecordKey}; 14 + use cardinality_estimator::CardinalityEstimator; 14 15 use fjall::{ 15 16 Batch as FjallBatch, CompressionType, Config, Keyspace, PartitionCreateOptions, PartitionHandle, 16 17 }; ··· 74 75 * key: nullstr || nullstr || nullstr (did, collection, rkey) 75 76 * val: u64 || bool || nullstr || rawval (js_cursor, is_update, rev, actual record) 76 77 * 78 + * 77 79 * Partition: 'rollups' 78 80 * 79 - * - Live (batched) records per collection 80 - * key: "live_records" || u64 || nullstr (js_cursor, nsid) 81 - * val: u64 81 + * - Live (batched) records counts and dids estimate per collection 82 + * key: "live_counts" || u64 || nullstr (js_cursor, nsid) 83 + * val: u64 || HLL (count (not cursor), estimator) 82 84 * 83 - * - Live (batched) DIDs estimate per collections 84 - * key: "live_dids" || u64 || nullstr 85 - * val: HLL (estimator) 85 + * - Hourly total record counts and dids estimate per collection 86 + * key: "hourly_counts" || u64 || nullstr (hour, nsid) 87 + * val: u64 || HLL (count (not cursor), estimator) 86 88 * 87 - * - Hourly total records per collection 88 - * key: "hourly_records" || u64 || nullstr (hour, nsid) 89 - * val: u64 (total count, not jetstream cursor) 89 + * - TODO: weekly rollups? 90 90 * 91 - * - Hourly unique DIDs estimate per collection 92 - * key: "hourly_dids" || u64 || nullstr (hour, nsid) 93 - * val: HLL (estimator) 91 + * - All-time total record counts and dids estimate per collection 92 + * key: "ever_counts" || nullstr (nsid) 93 + * val: u64 || HLL (count (not cursor), estimator) 94 94 * 95 - * - All-time total records per collection 96 - * key: "ever_records" || u64 || nullstr (total, nsid. yeah, total is in the *key*, and acts as a sorter. every update requires a delete+put) 97 - * val: (empty) 98 - * 99 - * - All-time total DIDs estimate per collection 100 - * key: "ever_dids" || u64 || nullstr (estimated cardinality, nsid. like ever_records) 101 - * val: HLL (estimator) 95 + * - TODO: sorted indexes for all-times? 102 96 * 103 97 * 104 98 * Partition: 'queues' ··· 203 197 } 204 198 205 199 impl StoreReader for FjallReader { 206 - fn get_total_by_collection(&self, collection: &Nsid) -> Result<u64, StorageError> { 200 + fn get_counts_by_collection(&self, collection: &Nsid) -> Result<(u64, u64), StorageError> { 207 201 // TODO: start from rollup 208 - let full_range = LiveRecordsKey::range_from_cursor(Cursor::from_start())?; 202 + let full_range = LiveCountsKey::range_from_cursor(Cursor::from_start())?; 209 203 let mut total = 0; 204 + let mut dids = CardinalityEstimator::new(); 210 205 for kv in self.rollups.range(full_range) { 211 206 let (key_bytes, val_bytes) = kv?; 212 - let key = db_complete::<LiveRecordsKey>(&key_bytes)?; 207 + let key = db_complete::<LiveCountsKey>(&key_bytes)?; 213 208 if key.collection() == collection { 214 - let LiveRecordsValue(n) = db_complete(&val_bytes)?; 215 - total += n; 209 + let counts = db_complete::<CountsValue>(&val_bytes)?; 210 + total += counts.records(); 211 + dids.merge(counts.dids()); 216 212 } 217 213 } 218 - Ok(total) 219 - } 220 - fn get_dids_by_collection(&self, collection: &Nsid) -> Result<u64, StorageError> { 221 - // TODO: start from rollup 222 - let full_range = LiveDidsKey::range_from_cursor(Cursor::from_start())?; 223 - let mut total_estimate = cardinality_estimator::CardinalityEstimator::new(); 224 - for kv in self.rollups.range(full_range) { 225 - let (key_bytes, val_bytes) = kv?; 226 - let key = db_complete::<LiveDidsKey>(&key_bytes)?; 227 - if key.collection() == collection { 228 - let LiveDidsValue(estimate) = db_complete(&val_bytes)?; 229 - total_estimate.merge(&estimate); 230 - } 231 - } 232 - Ok(total_estimate.estimate() as u64) 214 + Ok((total, dids.estimate() as u64)) 233 215 } 234 216 } 235 217 ··· 252 234 )?; 253 235 254 236 // timelies 255 - let live_records_range = LiveRecordsKey::range_from_cursor(rollup_cursor)?; 256 - // let live_dids_range = LiveDidsKey::range_from_cursor(rollup_cursor)?; // shoudl be in sync with live records range. we could keep both value under same key? 257 - let mut timely_iter = self.rollups.range(live_records_range); 237 + let live_counts_range = LiveCountsKey::range_from_cursor(rollup_cursor)?; 238 + let mut timely_iter = self.rollups.range(live_counts_range); 258 239 259 240 let next_timely = timely_iter 260 241 .next() 261 242 .transpose()? 262 243 .map(|(key_bytes, val_bytes)| { 263 - db_complete::<LiveRecordsKey>(&key_bytes).map(|k| (k, val_bytes)) 244 + db_complete::<LiveCountsKey>(&key_bytes).map(|k| (k, val_bytes)) 264 245 }) 265 246 .transpose()?; 266 247 ··· 341 322 } 342 323 } 343 324 } 344 - let live_records_key: LiveRecordsKey = (latest, &nsid).into(); 345 - let live_records_value = LiveRecordsValue(commits.total_seen as u64); 346 - batch.insert( 347 - &self.rollups, 348 - &live_records_key.to_db_bytes()?, 349 - &live_records_value.to_db_bytes()?, 350 - ); 351 - 352 - let live_dids_key: LiveDidsKey = (latest, &nsid).into(); 353 - let live_dids_value = LiveDidsValue(commits.dids_estimate); 325 + let live_counts_key: LiveCountsKey = (latest, &nsid).into(); 326 + let counts_value = CountsValue::new(commits.total_seen as u64, commits.dids_estimate); 354 327 batch.insert( 355 328 &self.rollups, 356 - &live_dids_key.to_db_bytes()?, 357 - &live_dids_value.to_db_bytes()?, 329 + &live_counts_key.to_db_bytes()?, 330 + &counts_value.to_db_bytes()?, 358 331 ); 359 332 } 360 333 ··· 1202 1175 fn test_hello() -> anyhow::Result<()> { 1203 1176 let (read, mut write) = fjall_db(); 1204 1177 write.insert_batch(EventBatch::default())?; 1205 - let total = read.get_total_by_collection(&Nsid::new("a.b.c".to_string()).unwrap())?; 1206 - assert_eq!(total, 0); 1178 + let (records, dids) = 1179 + read.get_counts_by_collection(&Nsid::new("a.b.c".to_string()).unwrap())?; 1180 + assert_eq!(records, 0); 1181 + assert_eq!(dids, 0); 1207 1182 Ok(()) 1208 1183 } 1209 1184 ··· 1223 1198 ); 1224 1199 write.insert_batch(batch.batch)?; 1225 1200 1226 - let total = read.get_total_by_collection(&collection)?; 1227 - assert_eq!(total, 1); 1228 - let total = read.get_total_by_collection(&Nsid::new("d.e.f".to_string()).unwrap())?; 1229 - assert_eq!(total, 0); 1230 - 1231 - let total = read.get_dids_by_collection(&collection)?; 1232 - assert_eq!(total, 1); 1233 - let total = read.get_dids_by_collection(&Nsid::new("d.e.f".to_string()).unwrap())?; 1234 - assert_eq!(total, 0); 1201 + let (records, dids) = read.get_counts_by_collection(&collection)?; 1202 + assert_eq!(records, 1); 1203 + assert_eq!(dids, 1); 1204 + let (records, dids) = 1205 + read.get_counts_by_collection(&Nsid::new("d.e.f".to_string()).unwrap())?; 1206 + assert_eq!(records, 0); 1207 + assert_eq!(dids, 0); 1235 1208 1236 1209 // let records = read.get_records_by_collections(&vec![collection], 2); 1237 1210 // assert_eq!(records.len, 1);
+31 -42
ufos/src/store_types.rs
··· 1 1 use crate::db_types::{ 2 2 DbBytes, DbConcat, DbEmpty, DbStaticStr, EncodingError, SerdeBytes, StaticStr, UseBincodePlz, 3 3 }; 4 - use crate::{Cursor, Did, Nsid, PutAction, RecordKey, UFOsCommit}; 4 + use crate::{CollectionCommits, Cursor, Did, Nsid, PutAction, RecordKey, UFOsCommit}; 5 5 use bincode::{Decode, Encode}; 6 6 use cardinality_estimator::CardinalityEstimator; 7 7 use std::ops::Range; ··· 136 136 pub struct _LiveRecordsStaticStr {} 137 137 impl StaticStr for _LiveRecordsStaticStr { 138 138 fn static_str() -> &'static str { 139 - "live_records" 139 + "live_counts" 140 140 } 141 141 } 142 - // TODO: merge counts with hlls 143 - type LiveRecordsStaticPrefix = DbStaticStr<_LiveRecordsStaticStr>; 144 - type LiveRecordsCursorPrefix = DbConcat<LiveRecordsStaticPrefix, Cursor>; 145 - pub type LiveRecordsKey = DbConcat<LiveRecordsCursorPrefix, Nsid>; 146 - impl LiveRecordsKey { 142 + 143 + type LiveCountsStaticPrefix = DbStaticStr<_LiveRecordsStaticStr>; 144 + type LiveCountsCursorPrefix = DbConcat<LiveCountsStaticPrefix, Cursor>; 145 + pub type LiveCountsKey = DbConcat<LiveCountsCursorPrefix, Nsid>; 146 + impl LiveCountsKey { 147 147 pub fn range_from_cursor(cursor: Cursor) -> Result<Range<Vec<u8>>, EncodingError> { 148 - let prefix = LiveRecordsCursorPrefix::from_pair(Default::default(), cursor); 148 + let prefix = LiveCountsCursorPrefix::from_pair(Default::default(), cursor); 149 149 Ok(prefix.range_to_prefix_end()?) 150 150 } 151 151 pub fn cursor(&self) -> Cursor { ··· 155 155 &self.suffix 156 156 } 157 157 } 158 - impl From<(Cursor, &Nsid)> for LiveRecordsKey { 158 + impl From<(Cursor, &Nsid)> for LiveCountsKey { 159 159 fn from((cursor, collection): (Cursor, &Nsid)) -> Self { 160 160 Self::from_pair( 161 - LiveRecordsCursorPrefix::from_pair(Default::default(), cursor), 161 + LiveCountsCursorPrefix::from_pair(Default::default(), cursor), 162 162 collection.clone(), 163 163 ) 164 164 } 165 165 } 166 166 #[derive(Debug, PartialEq, Decode, Encode)] 167 - pub struct LiveRecordsValue(pub u64); 168 - impl UseBincodePlz for LiveRecordsValue {} 167 + pub struct TotalRecordsValue(pub u64); 168 + impl UseBincodePlz for TotalRecordsValue {} 169 169 170 - #[derive(Debug, PartialEq)] 171 - pub struct _LiveDidsStaticStr {} 172 - impl StaticStr for _LiveDidsStaticStr { 173 - fn static_str() -> &'static str { 174 - "live_dids" 175 - } 176 - } 177 - pub type LiveDidsStaticPrefix = DbStaticStr<_LiveDidsStaticStr>; 178 - pub type LiveDidsCursorPrefix = DbConcat<LiveDidsStaticPrefix, Cursor>; 179 - pub type LiveDidsKey = DbConcat<LiveDidsCursorPrefix, Nsid>; 180 - impl LiveDidsKey { 181 - pub fn range_from_cursor(cursor: Cursor) -> Result<Range<Vec<u8>>, EncodingError> { 182 - let prefix = LiveDidsCursorPrefix::from_pair(Default::default(), cursor); 183 - Ok(prefix.range_to_prefix_end()?) 184 - } 185 - pub fn collection(&self) -> &Nsid { 186 - &self.suffix 187 - } 188 - } 189 - impl From<(Cursor, &Nsid)> for LiveDidsKey { 190 - fn from((cursor, collection): (Cursor, &Nsid)) -> Self { 191 - Self::from_pair( 192 - LiveDidsCursorPrefix::from_pair(Default::default(), cursor), 193 - collection.clone(), 194 - ) 195 - } 196 - } 197 170 #[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)] 198 - pub struct LiveDidsValue(pub CardinalityEstimator<Did>); 199 - impl SerdeBytes for LiveDidsValue {} 200 - impl DbBytes for LiveDidsValue { 171 + pub struct EstimatedDidsValue(pub CardinalityEstimator<Did>); 172 + impl SerdeBytes for EstimatedDidsValue {} 173 + impl DbBytes for EstimatedDidsValue { 201 174 fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> { 202 175 SerdeBytes::to_bytes(self) 203 176 } 204 177 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 205 178 SerdeBytes::from_bytes(bytes) 179 + } 180 + } 181 + 182 + pub type CountsValue = DbConcat<TotalRecordsValue, EstimatedDidsValue>; 183 + impl CountsValue { 184 + pub fn new(total: u64, dids: CardinalityEstimator<Did>) -> Self { 185 + Self { 186 + prefix: TotalRecordsValue(total), 187 + suffix: EstimatedDidsValue(dids), 188 + } 189 + } 190 + pub fn records(&self) -> u64 { 191 + self.prefix.0 192 + } 193 + pub fn dids(&self) -> &CardinalityEstimator<Did> { 194 + &self.suffix.0 206 195 } 207 196 } 208 197