Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

backfill: rate-limit and don't die on parse fail

+48 -24
+6 -9
Cargo.lock
··· 225 225 226 226 [[package]] 227 227 name = "atrium-api" 228 - version = "0.25.2" 229 - source = "registry+https://github.com/rust-lang/crates.io-index" 230 - checksum = "0d4eb9b4787aba546015c8ccda1d3924c157cee13d67848997fba74ac8144a07" 228 + version = "0.25.3" 229 + source = "git+https://github.com/uniphil/atrium?branch=fix%2Fnsid-allow-nonleading-name-digits#c4364f318d337bbc3e3e3aaf97c9f971e95f5f7e" 231 230 dependencies = [ 232 231 "atrium-common", 233 232 "atrium-xrpc", ··· 245 244 246 245 [[package]] 247 246 name = "atrium-common" 248 - version = "0.1.1" 249 - source = "registry+https://github.com/rust-lang/crates.io-index" 250 - checksum = "ba30d2f9e1a8b3db8fc97d0a5f91ee5a28f8acdddb771ad74c1b08eda357ca3d" 247 + version = "0.1.2" 248 + source = "git+https://github.com/uniphil/atrium?branch=fix%2Fnsid-allow-nonleading-name-digits#c4364f318d337bbc3e3e3aaf97c9f971e95f5f7e" 251 249 dependencies = [ 252 250 "dashmap", 253 251 "lru", ··· 260 258 261 259 [[package]] 262 260 name = "atrium-xrpc" 263 - version = "0.12.2" 264 - source = "registry+https://github.com/rust-lang/crates.io-index" 265 - checksum = "18a9e526cb2ed3e0a2ca78c3ce2a943d9041a68e067dadf42923b523771e07df" 261 + version = "0.12.3" 262 + source = "git+https://github.com/uniphil/atrium?branch=fix%2Fnsid-allow-nonleading-name-digits#c4364f318d337bbc3e3e3aaf97c9f971e95f5f7e" 266 263 dependencies = [ 267 264 "http", 268 265 "serde",
+1 -1
jetstream/Cargo.toml
··· 10 10 11 11 [dependencies] 12 12 async-trait = "0.1.83" 13 - atrium-api = { version = "0.25.2", default-features = false, features = [ 13 + atrium-api = { git = "https://github.com/uniphil/atrium", branch = "fix/nsid-allow-nonleading-name-digits", default-features = false, features = [ 14 14 "namespace-appbsky", 15 15 ] } 16 16 tokio = { version = "1.44.2", features = ["full", "sync", "time"] }
+11 -2
ufos/src/consumer.rs
··· 8 8 use std::mem; 9 9 use std::time::Duration; 10 10 use tokio::sync::mpsc::{channel, Receiver, Sender}; 11 - use tokio::time::timeout; 11 + use tokio::time::{timeout, Interval}; 12 12 13 13 use crate::error::{BatchInsertError, FirehoseEventError}; 14 14 use crate::{DeleteAccount, EventBatch, UFOsCommit}; ··· 35 35 batch_sender: Sender<LimitedBatch>, 36 36 current_batch: CurrentBatch, 37 37 sketch_secret: SketchSecretPrefix, 38 + rate_limit: Interval, 38 39 } 39 40 40 41 pub async fn consume( ··· 65 66 .await?; 66 67 let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE); 67 68 let mut batcher = Batcher::new(jetstream_receiver, batch_sender, sketch_secret); 68 - tokio::task::spawn(async move { batcher.run().await }); 69 + tokio::task::spawn(async move { 70 + let r = batcher.run().await; 71 + log::info!("batcher ended: {r:?}"); 72 + }); 69 73 Ok(batch_reciever) 70 74 } 71 75 ··· 75 79 batch_sender: Sender<LimitedBatch>, 76 80 sketch_secret: SketchSecretPrefix, 77 81 ) -> Self { 82 + let mut rate_limit = tokio::time::interval(std::time::Duration::from_micros(3_900)); 83 + rate_limit.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); 78 84 Self { 79 85 jetstream_receiver, 80 86 batch_sender, 81 87 current_batch: Default::default(), 82 88 sketch_secret, 89 + rate_limit, 83 90 } 84 91 } 85 92 86 93 pub async fn run(&mut self) -> anyhow::Result<()> { 94 + // TODO: report errors *from here* probably, since this gets shipped off into a spawned task that might just vanish 87 95 loop { 88 96 match timeout(Duration::from_millis(9_000), self.jetstream_receiver.recv()).await { 89 97 Err(_elapsed) => self.no_events_step().await?, ··· 191 199 self.batch_sender.capacity(), 192 200 ); 193 201 let current = mem::take(&mut self.current_batch); 202 + self.rate_limit.tick().await; 194 203 self.batch_sender 195 204 .send_timeout(current.batch, Duration::from_secs_f64(SEND_TIMEOUT_S)) 196 205 .await?;
+20 -7
ufos/src/file_consumer.rs
··· 12 12 async fn read_jsonl(f: File, sender: Sender<JetstreamEvent>) -> Result<()> { 13 13 let mut lines = BufReader::new(f).lines(); 14 14 while let Some(line) = lines.next_line().await? { 15 - let event: JetstreamEvent = 16 - serde_json::from_str(&line).map_err(JetstreamEventError::ReceivedMalformedJSON)?; 17 - if sender.send(event).await.is_err() { 18 - log::warn!("All receivers for the jsonl fixture have been dropped, bye."); 19 - return Err(JetstreamEventError::ReceiverClosedError.into()); 15 + match serde_json::from_str::<JetstreamEvent>(&line) { 16 + Ok(event) => match sender.send(event).await { 17 + Ok(_) => {} 18 + Err(e) => { 19 + log::warn!("All receivers for the jsonl fixture have been dropped, bye: {e:?}"); 20 + return Err(JetstreamEventError::ReceiverClosedError.into()); 21 + } 22 + }, 23 + Err(parse_err) => { 24 + log::warn!("failed to parse event: {parse_err:?} from event:\n{line}"); 25 + continue; 26 + } 20 27 } 21 28 } 22 29 log::info!("reached end of jsonl file, looping on noop to keep server alive."); ··· 33 40 let (jsonl_sender, jsonl_receiver) = channel::<JetstreamEvent>(16); 34 41 let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE); 35 42 let mut batcher = Batcher::new(jsonl_receiver, batch_sender, sketch_secret); 36 - tokio::task::spawn(async move { read_jsonl(f, jsonl_sender).await }); 37 - tokio::task::spawn(async move { batcher.run().await }); 43 + tokio::task::spawn(async move { 44 + let r = read_jsonl(f, jsonl_sender).await; 45 + log::info!("read_jsonl finished: {r:?}"); 46 + }); 47 + tokio::task::spawn(async move { 48 + let r = batcher.run().await; 49 + log::info!("batcher finished: {r:?}"); 50 + }); 38 51 Ok(batch_reciever) 39 52 }
-3
ufos/src/main.rs
··· 169 169 let mut last_cursor = None; 170 170 let mut last_rollup = None; 171 171 loop { 172 - log::info!("stat thing: sleeping"); 173 172 tokio::time::sleep(std::time::Duration::from_secs_f64(4.)).await; 174 - log::info!("stat thing: slept, getting info"); 175 173 match read_store.get_consumer_info().await { 176 174 Err(e) => log::warn!("failed to get jetstream consumer info: {e:?}"), 177 175 Ok(ConsumerInfo::Jetstream { ··· 179 177 rollup_cursor, 180 178 .. 181 179 }) => { 182 - log::info!("stat thing: got info, reporting"); 183 180 let now = std::time::SystemTime::now(); 184 181 let latest_cursor = latest_cursor.map(Cursor::from_raw_u64); 185 182 let rollup_cursor = rollup_cursor.map(Cursor::from_raw_u64);
+10 -2
ufos/src/storage_fjall.rs
··· 979 979 let mut live_records_found = 0; 980 980 let mut latest_expired_feed_cursor = None; 981 981 let mut batch = self.keyspace.batch(); 982 - for kv in self.feeds.range(live_range).rev() { 982 + for (i, kv) in self.feeds.range(live_range).rev().enumerate() { 983 + if i > 1_000_000 { 984 + log::info!("stopping collection trim early: already scanned 1M elements"); 985 + break; 986 + } 983 987 let (key_bytes, val_bytes) = kv?; 984 988 let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?; 985 989 let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?; ··· 1068 1072 rollup.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); 1069 1073 1070 1074 let mut trim = 1071 - tokio::time::interval(Duration::from_millis(if backfill { 12_000 } else { 6_000 })); 1075 + tokio::time::interval(Duration::from_millis(if backfill { 3_000 } else { 6_000 })); 1072 1076 trim.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 1073 1077 1074 1078 loop { ··· 1090 1094 let (danglers, deleted) = self.0.trim_collection(collection, 512).inspect_err(|e| log::error!("trim error: {e:?}"))?; 1091 1095 total_danglers += danglers; 1092 1096 total_deleted += deleted; 1097 + if total_deleted > 1_000_000 { 1098 + log::info!("trim stopped early, more than 1M records already deleted."); 1099 + break; 1100 + } 1093 1101 } 1094 1102 log::info!("finished trimming {n} nsids in {:?}: {total_danglers} dangling and {total_deleted} total removed.", t0.elapsed()); 1095 1103 dirty_nsids.clear();