Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

roll up seen counts by collection

on read. need to actually write rollups still.

+112 -8
+14 -5
ufos/src/db_types.rs
··· 73 73 pub fn to_prefix_db_bytes(&self) -> Result<Vec<u8>, EncodingError> { 74 74 self.prefix.to_db_bytes() 75 75 } 76 - pub fn range_to_prefix_end(&self) -> Result<Range<Vec<u8>>, EncodingError> { 77 - let mut key_bytes = self.prefix.to_db_bytes()?; 78 - let (_, Bound::Excluded(range_end)) = prefix_to_range(&key_bytes) else { 76 + pub fn range_end(&self) -> Result<Vec<u8>, EncodingError> { 77 + let prefix_bytes = self.prefix.to_db_bytes()?; 78 + let (_, Bound::Excluded(range_end)) = prefix_to_range(&prefix_bytes) else { 79 + return Err(EncodingError::BadRangeBound); 80 + }; 81 + Ok(range_end.to_vec()) 82 + } 83 + pub fn range(&self) -> Result<Range<Vec<u8>>, EncodingError> { 84 + let prefix_bytes = self.prefix.to_db_bytes()?; 85 + let (Bound::Included(start), Bound::Excluded(end)) = prefix_to_range(&prefix_bytes) else { 79 86 return Err(EncodingError::BadRangeBound); 80 87 }; 81 - key_bytes.append(&mut self.suffix.to_db_bytes()?); 82 - Ok(key_bytes..range_end.to_vec()) 88 + Ok(start.to_vec()..end.to_vec()) 89 + } 90 + pub fn range_to_prefix_end(&self) -> Result<Range<Vec<u8>>, EncodingError> { 91 + Ok(self.to_db_bytes()?..self.range_end()?) 83 92 } 84 93 } 85 94
+25
ufos/src/server.rs
··· 148 148 Ok(HttpResponseOk(api_records)) 149 149 } 150 150 151 + /// Get total records seen by collection 152 + #[endpoint { 153 + method = GET, 154 + path = "/records/total-seen" 155 + }] 156 + async fn get_asdf( 157 + ctx: RequestContext<Context>, 158 + collection_query: Query<CollectionQuery>, 159 + ) -> Result<HttpResponseOk<u64>, HttpError> { 160 + let Ok(collection) = Nsid::new(collection_query.into_inner().collection) else { 161 + return Err(HttpError::for_bad_request( 162 + None, 163 + "collection must be an NSID".to_string(), 164 + )); 165 + }; 166 + let Context { storage, .. } = ctx.context(); 167 + let total = storage 168 + .get_collection_total_seen(&collection) 169 + .await 170 + .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?; 171 + 172 + Ok(HttpResponseOk(total)) 173 + } 174 + 151 175 pub async fn serve(storage: Storage) -> Result<(), String> { 152 176 let log = ConfigLogging::StderrTerminal { 153 177 level: ConfigLoggingLevel::Info, ··· 160 184 api.register(get_openapi).unwrap(); 161 185 api.register(get_meta_info).unwrap(); 162 186 api.register(get_records_by_collection).unwrap(); 187 + api.register(get_asdf).unwrap(); 163 188 164 189 let context = Context { 165 190 spec: Arc::new(
+43
ufos/src/store.rs
··· 3 3 ByCollectionKey, ByCollectionValue, ByCursorSeenKey, ByCursorSeenValue, ByIdKey, ByIdValue, 4 4 JetstreamCursorKey, JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue, 5 5 ModCursorKey, ModCursorValue, ModQueueItemKey, ModQueueItemStringValue, ModQueueItemValue, 6 + RollupCursorKey, RollupCursorValue, SeenCounter, 6 7 }; 7 8 use crate::{ 8 9 CollectionSamples, CreateRecord, DeleteAccount, Did, EventBatch, ModifyRecord, Nsid, RecordKey, ··· 205 206 .await? 206 207 } 207 208 209 + pub async fn get_collection_total_seen(&self, collection: &Nsid) -> anyhow::Result<u64> { 210 + let partition = self.partition.clone(); 211 + let collection = collection.clone(); 212 + tokio::task::spawn_blocking(move || get_unrolled_asdf(&partition, collection)).await? 213 + } 214 + 208 215 pub async fn get_jetstream_endpoint(&self) -> anyhow::Result<Option<JetstreamEndpointValue>> { 209 216 let partition = self.partition.clone(); 210 217 tokio::task::spawn_blocking(move || { ··· 279 286 let key_bytes = key.to_db_bytes()?; 280 287 batch.remove(partition, &key_bytes); 281 288 Ok(()) 289 + } 290 + 291 + /// Get stats that haven't been rolled up yet 292 + fn get_unrolled_asdf(partition: &PartitionHandle, collection: Nsid) -> anyhow::Result<u64> { 293 + let range = 294 + if let Some(cursor_value) = get_static::<RollupCursorKey, RollupCursorValue>(partition)? { 295 + eprintln!("found existing cursor"); 296 + let key: ByCursorSeenKey = cursor_value.into(); 297 + key.range_from()? 298 + } else { 299 + eprintln!("cursor from start."); 300 + ByCursorSeenKey::full_range()? 301 + }; 302 + 303 + let mut collection_total = 0; 304 + 305 + let mut scanned = 0; 306 + let mut rolled = 0; 307 + for (i, pair) in partition.range(range).enumerate() { 308 + let (key_bytes, value_bytes) = pair?; 309 + let key = db_complete::<ByCursorSeenKey>(&key_bytes)?; 310 + let val = db_complete::<ByCursorSeenValue>(&value_bytes)?; 311 + if i >= 20 { 312 + eprintln!("{key:?} => {val:?}") 313 + } 314 + if *key.collection() == collection { 315 + let SeenCounter(n) = val; 316 + collection_total += n; 317 + rolled += 1; 318 + } 319 + scanned += 1; 320 + } 321 + 322 + eprintln!("scanned: {scanned}, rolled: {rolled}"); 323 + 324 + Ok(collection_total) 282 325 } 283 326 284 327 impl DBWriter {
+30 -3
ufos/src/store_types.rs
··· 3 3 }; 4 4 use crate::{Cursor, Did, Nsid, RecordKey}; 5 5 use bincode::{Decode, Encode}; 6 + use std::ops::Range; 6 7 7 8 /// key format: ["js_cursor"] 8 9 #[derive(Debug, PartialEq)] ··· 23 24 } 24 25 } 25 26 pub type ModCursorValue = Cursor; 27 + 28 + /// key format: ["rollup_cursor"] 29 + #[derive(Debug, PartialEq)] 30 + pub struct RollupCursorKey {} 31 + impl StaticStr for RollupCursorKey { 32 + fn static_str() -> &'static str { 33 + "rollup_cursor" 34 + } 35 + } 36 + /// value format: [rollup_cursor(Cursor)|collection(Nsid)] 37 + pub type RollupCursorValue = DbConcat<Cursor, Nsid>; 26 38 27 39 /// key format: ["js_endpoint"] 28 40 #[derive(Debug, PartialEq)] ··· 165 177 } 166 178 } 167 179 type ByCursorSeenPrefix = DbStaticStr<_ByCursorSeenStaticStr>; 180 + type ByCursorSeenCursorPrefix = DbConcat<ByCursorSeenPrefix, Cursor>; 168 181 /// key format: ["seen_by_js_cursor"|js_cursor|collection] 169 - pub type ByCursorSeenKey = DbConcat<DbConcat<ByCursorSeenPrefix, Cursor>, Nsid>; 182 + pub type ByCursorSeenKey = DbConcat<ByCursorSeenCursorPrefix, Nsid>; 170 183 impl ByCursorSeenKey { 171 184 pub fn new(cursor: Cursor, nsid: Nsid) -> Self { 172 185 Self { ··· 174 187 suffix: nsid, 175 188 } 176 189 } 177 - pub fn prefix_from_cursor(cursor: Cursor) -> Result<Vec<u8>, EncodingError> { 178 - DbConcat::from_pair(ByCursorSeenPrefix::default(), cursor).to_db_bytes() 190 + pub fn full_range() -> Result<Range<Vec<u8>>, EncodingError> { 191 + let prefix = ByCursorSeenCursorPrefix::from_pair(Default::default(), Cursor::from_start()); 192 + prefix.range() 193 + } 194 + pub fn range_from(&self) -> Result<Range<Vec<u8>>, EncodingError> { 195 + let start = self.to_db_bytes()?; 196 + let end = self.prefix.range_end()?; 197 + Ok(start..end) 198 + } 199 + pub fn collection(&self) -> &Nsid { 200 + &self.suffix 201 + } 202 + } 203 + impl From<RollupCursorValue> for ByCursorSeenKey { 204 + fn from(v: RollupCursorValue) -> Self { 205 + Self::new(v.prefix, v.suffix) 179 206 } 180 207 } 181 208 impl From<ByCursorSeenKey> for (Cursor, Nsid) {