Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

crash: repro with local jsonl jetstream sample

+70 -25
+10 -10
ufos/src/consumer.rs
··· 11 11 use crate::error::{BatchInsertError, FirehoseEventError}; 12 12 use crate::{DeleteAccount, EventBatch, UFOsCommit}; 13 13 14 - const MAX_BATCHED_RECORDS: usize = 128; // *non-blocking* limit. drops oldest batched record per collection once reached. 15 - const MAX_ACCOUNT_REMOVES: usize = 1024; // hard limit, extremely unlikely to reach, but just in case 16 - const MAX_BATCHED_COLLECTIONS: usize = 64; // hard limit, MAX_BATCHED_RECORDS applies per-collection 17 - const MIN_BATCH_SPAN_SECS: f64 = 2.; // breathe 18 - const MAX_BATCH_SPAN_SECS: f64 = 60.; // hard limit, pause consumer if we're unable to send by now 19 - const SEND_TIMEOUT_S: f64 = 15.; // if the channel is blocked longer than this, something is probably up 20 - const BATCH_QUEUE_SIZE: usize = 1; // nearly-rendez-vous 14 + pub const MAX_BATCHED_RECORDS: usize = 128; // *non-blocking* limit. drops oldest batched record per collection once reached. 15 + pub const MAX_ACCOUNT_REMOVES: usize = 1024; // hard limit, extremely unlikely to reach, but just in case 16 + pub const MAX_BATCHED_COLLECTIONS: usize = 64; // hard limit, MAX_BATCHED_RECORDS applies per-collection 17 + pub const MIN_BATCH_SPAN_SECS: f64 = 2.; // breathe 18 + pub const MAX_BATCH_SPAN_SECS: f64 = 60.; // hard limit, pause consumer if we're unable to send by now 19 + pub const SEND_TIMEOUT_S: f64 = 15.; // if the channel is blocked longer than this, something is probably up 20 + pub const BATCH_QUEUE_SIZE: usize = 1; // nearly-rendez-vous 21 21 22 22 pub type LimitedBatch = EventBatch<MAX_BATCHED_RECORDS>; 23 23 ··· 28 28 } 29 29 30 30 #[derive(Debug)] 31 - struct Batcher { 31 + pub struct Batcher { 32 32 jetstream_receiver: JetstreamReceiver, 33 33 batch_sender: Sender<LimitedBatch>, 34 34 current_batch: CurrentBatch, ··· 66 66 } 67 67 68 68 impl Batcher { 69 - fn new(jetstream_receiver: JetstreamReceiver, batch_sender: Sender<LimitedBatch>) -> Self { 69 + pub fn new(jetstream_receiver: JetstreamReceiver, batch_sender: Sender<LimitedBatch>) -> Self { 70 70 Self { 71 71 jetstream_receiver, 72 72 batch_sender, ··· 74 74 } 75 75 } 76 76 77 - async fn run(&mut self) -> anyhow::Result<()> { 77 + pub async fn run(&mut self) -> anyhow::Result<()> { 78 78 loop { 79 79 if let Some(event) = self.jetstream_receiver.recv().await { 80 80 self.handle_event(event).await?
-1
ufos/src/db_types.rs
··· 49 49 } 50 50 51 51 fn bincode_conf() -> impl Config { 52 - log::trace!("bincode conf"); 53 52 standard() 54 53 .with_big_endian() 55 54 .with_fixed_int_encoding()
+37
ufos/src/file_consumer.rs
··· 1 + use anyhow::Result; 2 + use jetstream::{ 3 + error::JetstreamEventError, 4 + events::JetstreamEvent, 5 + }; 6 + use std::path::PathBuf; 7 + use tokio::{ 8 + fs::File, 9 + io::{AsyncBufReadExt, BufReader}, 10 + sync::mpsc::{Sender, Receiver, channel}, 11 + }; 12 + use crate::consumer::{Batcher, LimitedBatch, BATCH_QUEUE_SIZE}; 13 + 14 + async fn read_jsonl(f: File, sender: Sender<JetstreamEvent>) -> Result<()> { 15 + let mut lines = BufReader::new(f).lines(); 16 + while let Some(line) = lines.next_line().await? { 17 + let event: JetstreamEvent = serde_json::from_str(&line) 18 + .map_err(JetstreamEventError::ReceivedMalformedJSON)?; 19 + if sender.send(event).await.is_err() { 20 + log::info!("All receivers for the jsonl fixture have been dropped, bye."); 21 + return Err(JetstreamEventError::ReceiverClosedError.into()); 22 + } 23 + } 24 + Ok(()) 25 + } 26 + 27 + pub async fn consume( 28 + p: PathBuf, 29 + ) -> Result<Receiver<LimitedBatch>> { 30 + let f = File::open(p).await?; 31 + let (jsonl_sender, jsonl_receiver) = channel::<JetstreamEvent>(16); 32 + let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE); 33 + let mut batcher = Batcher::new(jsonl_receiver, batch_sender); 34 + tokio::task::spawn(async move { read_jsonl(f, jsonl_sender).await }); 35 + tokio::task::spawn(async move { batcher.run().await }); 36 + Ok(batch_reciever) 37 + }
+1
ufos/src/lib.rs
··· 1 1 pub mod consumer; 2 2 pub mod db_types; 3 3 pub mod error; 4 + pub mod file_consumer; 4 5 pub mod server; 5 6 pub mod storage; 6 7 pub mod storage_fjall;
+21 -13
ufos/src/main.rs
··· 1 1 use clap::Parser; 2 2 use std::path::PathBuf; 3 3 use ufos::consumer; 4 + use ufos::file_consumer; 4 5 use ufos::error::StorageError; 5 - use ufos::server; 6 + // use ufos::server; 6 7 use jetstream::events::Cursor; 7 8 use ufos::storage::{StorageWhatever, StoreReader, StoreWriter}; 8 9 use ufos::storage_fjall::FjallStorage; ··· 45 46 /// DEBUG: use an in-memory store instead of fjall 46 47 #[arg(long, action)] 47 48 in_mem: bool, 49 + /// DEBUG: interpret jetstream as a file fixture 50 + #[arg(long, action)] 51 + jetstream_fixture: bool, 48 52 } 49 53 50 54 // #[tokio::main(flavor = "current_thread")] // TODO: move this to config via args ··· 61 65 args.jetstream_force, 62 66 Default::default(), 63 67 )?; 64 - go(args.jetstream, args.pause_writer, read_store, write_store, cursor).await?; 68 + go(args.jetstream, args.jetstream_fixture, args.pause_writer, read_store, write_store, cursor).await?; 65 69 } else { 66 70 let (read_store, write_store, cursor) = FjallStorage::init( 67 71 args.data, ··· 69 73 args.jetstream_force, 70 74 Default::default(), 71 75 )?; 72 - go(args.jetstream, args.pause_writer, read_store, write_store, cursor).await?; 76 + go(args.jetstream, args.jetstream_fixture, args.pause_writer, read_store, write_store, cursor).await?; 73 77 } 74 78 75 79 Ok(()) ··· 77 81 78 82 async fn go( 79 83 jetstream: String, 84 + jetstream_fixture: bool, 80 85 pause_writer: bool, 81 - read_store: impl StoreReader + 'static, 86 + _read_store: impl StoreReader + 'static, 82 87 mut write_store: impl StoreWriter + 'static, 83 88 cursor: Option<Cursor>, 84 89 ) -> anyhow::Result<()> { 85 - println!("starting server with storage..."); 86 - let serving = server::serve(read_store); 90 + // println!("starting server with storage..."); 91 + // let serving = server::serve(read_store); 87 92 88 - let t1 = tokio::task::spawn(async { 89 - let r = serving.await; 90 - log::warn!("serving ended with: {r:?}"); 91 - }); 93 + // let t1 = tokio::task::spawn(async { 94 + // let r = serving.await; 95 + // log::warn!("serving ended with: {r:?}"); 96 + // }); 92 97 93 98 let t2: tokio::task::JoinHandle<anyhow::Result<()>> = tokio::task::spawn({ 94 99 async move { ··· 97 102 "starting consumer with cursor: {cursor:?} from {:?} ago", 98 103 cursor.map(|c| c.elapsed()) 99 104 ); 100 - let mut batches = 101 - consumer::consume(&jetstream, cursor, false).await?; 105 + let mut batches = if jetstream_fixture { 106 + file_consumer::consume(jetstream.into()).await? 107 + } else { 108 + consumer::consume(&jetstream, cursor, false).await? 109 + }; 102 110 103 111 log::info!("started consumer, got chan etc..."); 104 112 ··· 142 150 143 151 log::trace!("tasks running. waiting."); 144 152 tokio::select! { 145 - z = t1 => log::warn!("serve task ended: {z:?}"), 153 + // z = t1 => log::warn!("serve task ended: {z:?}"), 146 154 z = t2 => log::warn!("storage task ended: {z:?}"), 147 155 }; 148 156
+1 -1
ufos/src/storage_mem.rs
··· 659 659 660 660 log::warn!("about to loop...."); 661 661 for (i, kv) in timelies.enumerate() { 662 - // log::warn!("loop {i}..."); 662 + log::warn!("loop {i} {kv:?}..."); 663 663 if i >= rollup_limit { 664 664 break; 665 665 }