Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

pausible jetstream consumer + get top collections

+104 -35
+3 -6
jetstream/src/lib.rs
··· 382 382 websocket_task(dict, ws_stream, send_channel.clone(), &mut last_cursor) 383 383 .await 384 384 { 385 - match e { 386 - JetstreamEventError::ReceiverClosedError => { 387 - log::error!("Jetstream receiver channel closed. Exiting consumer."); 388 - return; 389 - } 390 - _ => {} 385 + if let JetstreamEventError::ReceiverClosedError = e { 386 + log::error!("Jetstream receiver channel closed. Exiting consumer."); 387 + return; 391 388 } 392 389 log::error!("Jetstream closed after encountering error: {e:?}"); 393 390 } else {
+4 -1
ufos/src/consumer.rs
··· 144 144 // holds up all consumer progress until it can send to the channel 145 145 // use this when the current batch is too full to add more to it 146 146 async fn send_current_batch_now(&mut self) -> anyhow::Result<()> { 147 - log::warn!("attempting to send batch now (capacity: {})", self.batch_sender.capacity()); 147 + log::warn!( 148 + "attempting to send batch now (capacity: {})", 149 + self.batch_sender.capacity() 150 + ); 148 151 self.batch_sender 149 152 .send_timeout( 150 153 mem::take(&mut self.current_batch),
+18 -10
ufos/src/main.rs
··· 28 28 /// Location to store persist data to disk 29 29 #[arg(long)] 30 30 data: PathBuf, 31 + /// DEBUG: don't start the jetstream consumer or its write loop 32 + #[arg(long, action)] 33 + pause_writer: bool, 31 34 /// DEBUG: force the rw loop to fall behind by pausing it 32 35 #[arg(long, action)] 33 36 pause_rw: bool, ··· 42 45 let (storage, cursor) = 43 46 store::Storage::open(args.data, &args.jetstream, args.jetstream_force).await?; 44 47 45 - println!( 46 - "starting consumer with cursor: {cursor:?} from {:?} ago", 47 - cursor.clone().map(|c| c.elapsed()) 48 - ); 49 - let batches = consumer::consume(&args.jetstream, cursor, args.jetstream_no_zstd).await?; 50 - 51 48 println!("starting server with storage..."); 52 49 let serving = server::serve(storage.clone()); 53 50 ··· 56 53 log::warn!("serving ended with: {r:?}"); 57 54 }); 58 55 59 - let t2 = tokio::task::spawn({ 56 + let t2: tokio::task::JoinHandle<anyhow::Result<()>> = tokio::task::spawn({ 60 57 let storage = storage.clone(); 61 58 async move { 62 - let r = storage.receive(batches).await; 63 - log::warn!("storage.receive ended with: {r:?}"); 59 + if !args.pause_writer { 60 + println!( 61 + "starting consumer with cursor: {cursor:?} from {:?} ago", 62 + cursor.clone().map(|c| c.elapsed()) 63 + ); 64 + let batches = 65 + consumer::consume(&args.jetstream, cursor, args.jetstream_no_zstd).await?; 66 + let r = storage.receive(batches).await; 67 + log::warn!("storage.receive ended with: {r:?}"); 68 + } else { 69 + log::info!("not starting jetstream or the write loop."); 70 + } 71 + Ok(()) 64 72 } 65 73 }); 66 74 ··· 82 90 log::trace!("tasks running. waiting."); 83 91 t1.await?; 84 92 log::trace!("serve task ended."); 85 - t2.await?; 93 + t2.await??; 86 94 log::trace!("storage receive task ended."); 87 95 t3.await?; 88 96 log::trace!("storage rw task ended.");
+21 -2
ufos/src/server.rs
··· 12 12 use dropshot::ServerBuilder; 13 13 use schemars::JsonSchema; 14 14 use serde::{Deserialize, Serialize}; 15 + use std::collections::HashMap; 15 16 use std::sync::Arc; 16 17 17 18 #[derive(Clone)] ··· 153 154 method = GET, 154 155 path = "/records/total-seen" 155 156 }] 156 - async fn get_asdf( 157 + async fn get_records_total_seen( 157 158 ctx: RequestContext<Context>, 158 159 collection_query: Query<CollectionQuery>, 159 160 ) -> Result<HttpResponseOk<u64>, HttpError> { ··· 172 173 Ok(HttpResponseOk(total)) 173 174 } 174 175 176 + /// Get top collections 177 + #[endpoint { 178 + method = GET, 179 + path = "/collections" 180 + }] 181 + async fn get_top_collections( 182 + ctx: RequestContext<Context>, 183 + ) -> Result<HttpResponseOk<HashMap<String, u64>>, HttpError> { 184 + let Context { storage, .. } = ctx.context(); 185 + let collections = storage 186 + .get_top_collections() 187 + .await 188 + .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?; 189 + 190 + Ok(HttpResponseOk(collections)) 191 + } 192 + 175 193 pub async fn serve(storage: Storage) -> Result<(), String> { 176 194 let log = ConfigLogging::StderrTerminal { 177 195 level: ConfigLoggingLevel::Info, ··· 184 202 api.register(get_openapi).unwrap(); 185 203 api.register(get_meta_info).unwrap(); 186 204 api.register(get_records_by_collection).unwrap(); 187 - api.register(get_asdf).unwrap(); 205 + api.register(get_records_total_seen).unwrap(); 206 + api.register(get_top_collections).unwrap(); 188 207 189 208 let context = Context { 190 209 spec: Arc::new(
+58 -16
ufos/src/store.rs
··· 1 - use std::sync::Arc; 2 1 use crate::db_types::{db_complete, DbBytes, DbStaticStr, EncodingError, StaticStr}; 3 2 use crate::store_types::{ 4 3 ByCollectionKey, ByCollectionValue, ByCursorSeenKey, ByCursorSeenValue, ByIdKey, ByIdValue, ··· 15 14 use jetstream::events::Cursor; 16 15 use std::collections::HashMap; 17 16 use std::path::{Path, PathBuf}; 17 + use std::sync::Arc; 18 18 use std::time::{Duration, Instant}; 19 + use tokio::sync::mpsc::Receiver; 19 20 use tokio::time::sleep; 20 - use tokio::sync::{mpsc::Receiver}; 21 21 22 22 /// Commit the RW batch immediately if this number of events have been read off the mod queue 23 23 const MAX_BATCHED_RW_EVENTS: usize = 18; ··· 30 30 /// 31 31 /// this is higher than [MAX_BATCHED_RW_EVENTS] because account-deletes can have lots of items 32 32 const MAX_BATCHED_RW_ITEMS: usize = 24; 33 - 34 33 35 34 #[derive(Clone)] 36 35 struct SerialDb { ··· 89 88 PartitionCreateOptions::default().compression(CompressionType::None), 90 89 )?; 91 90 Ok(Self { 92 - db: Arc::new(FakeMutex::new(SerialDb { keyspace, partition })), 91 + db: Arc::new(FakeMutex::new(SerialDb { 92 + keyspace, 93 + partition, 94 + })), 93 95 }) 94 96 } 95 97 ··· 191 193 //// ITER 192 194 193 195 { 194 - let iterator = partition.range(range.clone()).enumerate().into_iter(); 196 + let iterator = partition.range(range.clone()).enumerate(); 195 197 196 198 for (i, pair) in iterator { 197 199 log::trace!("rw: iterating {i}"); ··· 256 258 tokio::task::spawn_blocking(move || { 257 259 let mut output = Vec::new(); 258 260 259 - 260 261 ////// ITER 261 262 { 262 263 for pair in partition.prefix(&prefix).rev().take(limit) { ··· 294 295 pub async fn get_collection_total_seen(&self, collection: &Nsid) -> anyhow::Result<u64> { 295 296 let partition = self.db.lock().await.partition.clone(); 296 297 let collection = collection.clone(); 297 - tokio::task::spawn_blocking(move || get_unrolled_asdf(&partition, collection)).await? 298 + tokio::task::spawn_blocking(move || get_unrolled_collection_seen(&partition, collection)) 299 + .await? 300 + } 301 + 302 + pub async fn get_top_collections(&self) -> anyhow::Result<HashMap<String, u64>> { 303 + let partition = self.db.lock().await.partition.clone(); 304 + tokio::task::spawn_blocking(move || get_unrolled_top_collections(&partition)).await? 298 305 } 299 306 300 307 pub async fn get_jetstream_endpoint(&self) -> anyhow::Result<Option<JetstreamEndpointValue>> { ··· 374 381 } 375 382 376 383 /// Get stats that haven't been rolled up yet 377 - fn get_unrolled_asdf(partition: &PartitionHandle, collection: Nsid) -> anyhow::Result<u64> { 384 + fn get_unrolled_collection_seen( 385 + partition: &PartitionHandle, 386 + collection: Nsid, 387 + ) -> anyhow::Result<u64> { 378 388 let range = 379 389 if let Some(cursor_value) = get_static::<RollupCursorKey, RollupCursorValue>(partition)? { 380 390 eprintln!("found existing cursor"); ··· 389 399 390 400 let mut scanned = 0; 391 401 let mut rolled = 0; 392 - 393 402 394 403 ////// ITER 395 404 ··· 413 422 Ok(collection_total) 414 423 } 415 424 425 + fn get_unrolled_top_collections( 426 + partition: &PartitionHandle, 427 + ) -> anyhow::Result<HashMap<String, u64>> { 428 + let range = 429 + if let Some(cursor_value) = get_static::<RollupCursorKey, RollupCursorValue>(partition)? { 430 + eprintln!("found existing cursor"); 431 + let key: ByCursorSeenKey = cursor_value.into(); 432 + key.range_from()? 433 + } else { 434 + eprintln!("cursor from start."); 435 + ByCursorSeenKey::full_range()? 436 + }; 437 + 438 + let mut res = HashMap::new(); 439 + let mut scanned = 0; 440 + 441 + for pair in partition.range(range) { 442 + let (key_bytes, value_bytes) = pair?; 443 + let key = db_complete::<ByCursorSeenKey>(&key_bytes)?; 444 + let SeenCounter(n) = db_complete(&value_bytes)?; 445 + 446 + *res.entry(key.collection().to_string()).or_default() += n; 447 + 448 + scanned += 1; 449 + } 450 + 451 + eprintln!("scanned: {scanned} seen-counts."); 452 + 453 + Ok(res) 454 + } 455 + 416 456 impl DBWriter { 417 457 fn write_batch(self, event_batch: EventBatch, last: Option<Cursor>) -> anyhow::Result<()> { 418 458 let mut db_batch = self.keyspace.batch(); ··· 460 500 items + 1 461 501 } 462 502 ModQueueItemValue::UpdateRecord(did, collection, rkey, record) => { 463 - let items = self.update_record(db_batch, mod_cursor, did, collection, rkey, record)?; 503 + let items = 504 + self.update_record(db_batch, mod_cursor, did, collection, rkey, record)?; 464 505 remove_batch::<ModQueueItemKey>(db_batch, &self.partition, mod_key)?; 465 506 items + 1 466 507 } ··· 512 553 513 554 log::trace!("delete_record: iterate over up to current cursor..."); 514 555 515 - 516 556 ////////// ITER 517 557 518 558 { 519 - for (i, pair) in self.partition.range(key_prefix_bytes..key_limit).enumerate() { 559 + for (i, pair) in self 560 + .partition 561 + .range(key_prefix_bytes..key_limit) 562 + .enumerate() 563 + { 520 564 log::trace!("delete_record iter {i}: found"); 521 565 // find all (hopefully 1) 522 566 let (key_bytes, _) = pair?; ··· 568 612 569 613 let mut items_added = 0; 570 614 571 - 572 - 573 615 ////////// ITER 574 616 575 617 { 576 618 for pair in self.partition.prefix(&key_prefix_bytes) { 577 619 let (key_bytes, _) = pair?; 578 620 579 - let (_, collection, _rkey, found_cursor) = db_complete::<ByIdKey>(&key_bytes)?.into(); 621 + let (_, collection, _rkey, found_cursor) = 622 + db_complete::<ByIdKey>(&key_bytes)?.into(); 580 623 if found_cursor > cursor { 581 624 log::trace!( 582 625 "delete account: found (and ignoring) newer records than the delete event??" ··· 598 641 } 599 642 } 600 643 } 601 - 602 644 603 645 Ok((items_added, true)) 604 646 }