Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

top collections

Merges record counts and DID estimates at each NSID segment level.

alsdjf

+166 -2
+7
ufos/src/lib.rs
··· 218 218 }, 219 219 } 220 220 221 + #[derive(Debug, Default, PartialEq, Serialize, JsonSchema)] 222 + pub struct TopCollections { 223 + total_records: u64, 224 + dids_estimate: u64, 225 + nsid_child_segments: HashMap<String, TopCollections>, 226 + } 227 + 221 228 #[cfg(test)] 222 229 mod tests { 223 230 use super::*;
+3 -1
ufos/src/storage.rs
··· 1 1 // use crate::store_types::CountsValue; 2 - use crate::{error::StorageError, ConsumerInfo, Cursor, EventBatch, UFOsRecord}; 2 + use crate::{error::StorageError, ConsumerInfo, Cursor, EventBatch, TopCollections, UFOsRecord}; 3 3 use jetstream::exports::{Did, Nsid}; 4 4 use schemars::JsonSchema; 5 5 use serde::Serialize; ··· 38 38 fn get_storage_stats(&self) -> StorageResult<S>; 39 39 40 40 fn get_consumer_info(&self) -> StorageResult<ConsumerInfo>; 41 + 42 + fn get_top_collections(&self) -> StorageResult<TopCollections>; 41 43 42 44 fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)>; 43 45
+153 -1
ufos/src/storage_fjall.rs
··· 12 12 RecordLocationMeta, RecordLocationVal, RecordRawValue, RollupCursorKey, RollupCursorValue, 13 13 SeenCounter, TakeoffKey, TakeoffValue, WeekTruncatedCursor, WeeklyRollupKey, 14 14 }; 15 - use crate::{CommitAction, ConsumerInfo, Did, EventBatch, Nsid, RecordKey, UFOsRecord}; 15 + use crate::{ 16 + CommitAction, ConsumerInfo, Did, EventBatch, Nsid, RecordKey, TopCollections, UFOsRecord, 17 + }; 16 18 use fjall::{ 17 19 Batch as FjallBatch, CompressionType, Config, Keyspace, PartitionCreateOptions, PartitionHandle, 18 20 }; ··· 253 255 started_at, 254 256 latest_cursor, 255 257 }) 258 + } 259 + 260 + fn get_top_collections(&self) -> Result<TopCollections, StorageError> { 261 + // TODO: limit nsid traversal depth 262 + // TODO: limit nsid traversal breadth 263 + // TODO: be serious about anything 264 + 265 + // TODO: probably use a stack of segments to reduce to ~log-n merges 266 + 267 + #[derive(Default)] 268 + struct Blah { 269 + counts: CountsValue, 270 + children: HashMap<String, Blah>, 271 + } 272 + impl From<&Blah> for TopCollections { 273 + fn from(bla: &Blah) -> Self { 274 + let mut me = Self { 275 + total_records: bla.counts.records(), 276 + dids_estimate: bla.counts.dids().estimate() as u64, 277 + ..Default::default() 278 + }; 279 + for (k, v) in &bla.children { 280 + me.nsid_child_segments.insert(k.to_string(), v.into()); 281 + } 282 + me 283 + } 284 + } 285 + 286 + let mut b = Blah::default(); 287 + let prefix = AllTimeRollupKey::from_prefix_to_db_bytes(&Default::default())?; 288 + for kv in self.rollups.prefix(&prefix.to_db_bytes()?) 
{ 289 + let (key_bytes, val_bytes) = kv?; 290 + let key = db_complete::<AllTimeRollupKey>(&key_bytes)?; 291 + let val = db_complete::<CountsValue>(&val_bytes)?; 292 + 293 + let mut node = &mut b; 294 + node.counts.merge(&val); 295 + for segment in key.collection().split('.') { 296 + node = node.children.entry(segment.to_string()).or_default(); 297 + node.counts.merge(&val); 298 + } 299 + } 300 + 301 + Ok((&b).into()) 256 302 } 257 303 258 304 fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> { ··· 1942 1988 let n = write.step_rollup()?; 1943 1989 assert_eq!(n, 0); 1944 1990 1991 + Ok(()) 1992 + } 1993 + 1994 + #[test] 1995 + fn get_top_collections() -> anyhow::Result<()> { 1996 + let (read, mut write) = fjall_db(); 1997 + 1998 + let mut batch = TestBatch::default(); 1999 + batch.create( 2000 + "did:plc:person-a", 2001 + "a.a.a", 2002 + "rkey-aaa", 2003 + "{}", 2004 + Some("rev-aaa"), 2005 + None, 2006 + 10_000, 2007 + ); 2008 + batch.create( 2009 + "did:plc:person-b", 2010 + "a.a.b", 2011 + "rkey-bbb", 2012 + "{}", 2013 + Some("rev-bbb"), 2014 + None, 2015 + 10_001, 2016 + ); 2017 + batch.create( 2018 + "did:plc:person-c", 2019 + "a.b.c", 2020 + "rkey-ccc", 2021 + "{}", 2022 + Some("rev-ccc"), 2023 + None, 2024 + 10_002, 2025 + ); 2026 + batch.create( 2027 + "did:plc:person-a", 2028 + "a.a.a", 2029 + "rkey-aaa-2", 2030 + "{}", 2031 + Some("rev-aaa-2"), 2032 + None, 2033 + 10_003, 2034 + ); 2035 + write.insert_batch(batch.batch)?; 2036 + 2037 + let n = write.step_rollup()?; 2038 + assert_eq!(n, 3); // 3 collections 2039 + 2040 + let tops = read.get_top_collections()?; 2041 + assert_eq!( 2042 + tops, 2043 + TopCollections { 2044 + total_records: 4, 2045 + dids_estimate: 3, 2046 + nsid_child_segments: HashMap::from([( 2047 + "a".to_string(), 2048 + TopCollections { 2049 + total_records: 4, 2050 + dids_estimate: 3, 2051 + nsid_child_segments: HashMap::from([ 2052 + ( 2053 + "a".to_string(), 2054 + TopCollections { 2055 + total_records: 
3, 2056 + dids_estimate: 2, 2057 + nsid_child_segments: HashMap::from([ 2058 + ( 2059 + "a".to_string(), 2060 + TopCollections { 2061 + total_records: 2, 2062 + dids_estimate: 1, 2063 + nsid_child_segments: HashMap::from([]), 2064 + }, 2065 + ), 2066 + ( 2067 + "b".to_string(), 2068 + TopCollections { 2069 + total_records: 1, 2070 + dids_estimate: 1, 2071 + nsid_child_segments: HashMap::from([]), 2072 + } 2073 + ), 2074 + ]), 2075 + }, 2076 + ), 2077 + ( 2078 + "b".to_string(), 2079 + TopCollections { 2080 + total_records: 1, 2081 + dids_estimate: 1, 2082 + nsid_child_segments: HashMap::from([( 2083 + "c".to_string(), 2084 + TopCollections { 2085 + total_records: 1, 2086 + dids_estimate: 1, 2087 + nsid_child_segments: HashMap::from([]), 2088 + }, 2089 + ),]), 2090 + }, 2091 + ), 2092 + ]), 2093 + }, 2094 + ),]), 2095 + } 2096 + ); 1945 2097 Ok(()) 1946 2098 } 1947 2099 }
+3
ufos/src/store_types.rs
··· 334 334 pub fn new(nsid: &Nsid) -> Self { 335 335 Self::from_pair(Default::default(), nsid.clone()) 336 336 } 337 + pub fn collection(&self) -> &Nsid { 338 + &self.suffix 339 + } 337 340 } 338 341 pub type AllTimeRollupVal = CountsValue; 339 342