···382382 websocket_task(dict, ws_stream, send_channel.clone(), &mut last_cursor)
383383 .await
384384 {
385385+ match e {
386386+ JetstreamEventError::ReceiverClosedError => {
387387+ log::error!("Jetstream receiver channel closed. Exiting consumer.");
388388+ return;
389389+ }
390390+ _ => {}
391391+ }
385392 log::error!("Jetstream closed after encountering error: {e:?}");
386393 } else {
387394 log::error!("Jetstream connection closed cleanly");
···404411405412 if retry_attempt > 0 {
406413 // Exponential backoff
407407- let delay = (base_delay_ms * (2_u64.pow(retry_attempt))).min(max_delay_ms);
408408- log::error!("Connection failed, retrying in {delay}ms...");
414414+ let delay =
415415+ (base_delay_ms * (2_u64.saturating_pow(retry_attempt))).min(max_delay_ms);
416416+ log::error!("Connection failed, retry #{retry_attempt} in {delay}ms...");
409417 tokio::time::sleep(Duration::from_millis(delay)).await;
410418 log::info!("Attempting to reconnect...");
411419 }
···451459 log::info!(
452460 "All receivers for the Jetstream connection have been dropped, closing connection."
453461 );
454454- closing_connection = true;
462462+ socket_write.close().await?;
463463+ return Err(JetstreamEventError::ReceiverClosedError);
455464 } else if let Some(last) = last_cursor.as_mut() {
456465 *last = event_cursor;
457466 }
···484493 // We can assume that all receivers have been dropped, so we can close
485494 // the connection and exit the task.
486495 log::info!(
487487- "All receivers for the Jetstream connection have been dropped, closing connection..."
496496+ "All receivers for the Jetstream connection have been dropped, closing connection."
488497 );
489489- closing_connection = true;
498498+ socket_write.close().await?;
499499+ return Err(JetstreamEventError::ReceiverClosedError);
490500 } else if let Some(last) = last_cursor.as_mut() {
491501 *last = event_cursor;
492502 }
+27
ufos/readme.md
···1616cross build --release --target arm-unknown-linux-gnueabihf && scp ../target/arm-unknown-linux-gnueabihf/release/ufos angel-hair.local:ufos
1717```
18181919+for bonilla (rp4)
2020+2121+```bash
2222+cross build --release --target armv7-unknown-linux-gnueabihf && scp ../target/armv7-unknown-linux-gnueabihf/release/ufos pi@bonilla.local:ufos
2323+```
2424+2525+glibc will cause problems when switching between (`GLIBC_2.25` message). clean up (next build will be slowww)
2626+2727+```bash
2828+cargo clean
2929+```
3030+1931nginx forward proxy for websocket (run this on another host):
20322133```nginx
···95107```
9610897109try without info-level logs for better perf
110110+111111+running on bonilla
112112+113113+```bash
114114+./ufos --jetstream us-west-2 --jetstream-force --data /mnt/ufos-data-no-compression-2/
115115+```
116116+117117+(reusing data dir from angel-hair)
118118+119119+120120+ipv6 is having some trouble. but also maybe there's a deadlock somewhere
121121+122122+```bash
123123+sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1
124124+```
+12-8
ufos/src/consumer.rs
···1919const MAX_ACCOUNT_REMOVES: usize = 512; // hard limit, total account deletions. actually the least frequent event, but tiny.
2020const MAX_BATCHED_COLLECTIONS: usize = 64; // hard limit, MAX_BATCHED_RECORDS applies per collection
2121const MIN_BATCH_SPAN_SECS: f64 = 2.; // try to get a bit of rest a bit.
2222-const MAX_BATCH_SPAN_SECS: f64 = 10.; // hard limit of duration from oldest to latest event cursor within a batch, in seconds.
2323-2424-const SEND_TIMEOUT_S: f64 = 6.;
2222+const MAX_BATCH_SPAN_SECS: f64 = 60.; // hard limit of duration from oldest to latest event cursor within a batch, in seconds.
25232626-const BATCH_QUEUE_SIZE: usize = 32;
2727-// const BATCH_QUEUE_SIZE: usize = 4096;
2424+const SEND_TIMEOUT_S: f64 = 60.;
2525+const BATCH_QUEUE_SIZE: usize = 1024; // 4096 got OOM'd
28262927#[derive(Debug)]
3028struct Batcher {
···3836 cursor: Option<Cursor>,
3937 no_compress: bool,
4038) -> anyhow::Result<Receiver<EventBatch>> {
3939+ let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(jetstream_endpoint);
4040+ if endpoint == jetstream_endpoint {
4141+ eprintln!("connecting to jetstream at {endpoint}");
4242+ } else {
4343+ eprintln!("connecting to jetstream at {jetstream_endpoint} => {endpoint}");
4444+ }
4145 let config: JetstreamConfig<serde_json::Value> = JetstreamConfig {
4242- endpoint: DefaultJetstreamEndpoints::endpoint_or_shortcut(jetstream_endpoint),
4646+ endpoint,
4347 compression: if no_compress {
4448 JetstreamCompression::None
4549 } else {
···124128 if event_cursor.duration_since(earliest)?.as_secs_f64() > MIN_BATCH_SPAN_SECS
125129 && self.batch_sender.capacity() == BATCH_QUEUE_SIZE
126130 {
127127- log::warn!("queue empty: immediately sending batch.");
131131+ log::trace!("queue empty: immediately sending batch.");
128132 if let Err(send_err) = self
129133 .batch_sender
130134 .send(mem::take(&mut self.current_batch))
···140144 // holds up all consumer progress until it can send to the channel
141145 // use this when the current batch is too full to add more to it
142146 async fn send_current_batch_now(&mut self) -> anyhow::Result<()> {
143143- log::warn!("attempting to send batch now");
147147+ log::warn!("attempting to send batch now (capacity: {})", self.batch_sender.capacity());
144148 self.batch_sender
145149 .send_timeout(
146150 mem::take(&mut self.current_batch),
+33-7
ufos/src/main.rs
···3030 data: PathBuf,
3131}
32323333-// #[tokio::main]
3434-#[tokio::main(flavor = "current_thread")] // TODO: move this to config via args
3333+// #[tokio::main(flavor = "current_thread")] // TODO: move this to config via args
3434+#[tokio::main]
3535async fn main() -> anyhow::Result<()> {
3636 env_logger::init();
3737···4848 println!("starting server with storage...");
4949 let serving = server::serve(storage.clone());
50505151- tokio::select! {
5252- v = serving => eprintln!("serving ended: {v:?}"),
5353- v = storage.receive(batches) => eprintln!("storage consumer ended: {v:?}"),
5454- v = storage.rw_loop() => eprintln!("storage rw-loop ended: {v:?}"),
5555- };
5151+ let t1 = tokio::task::spawn(async {
5252+ let r = serving.await;
5353+ log::warn!("serving ended with: {r:?}");
5454+ });
5555+5656+ let t2 = tokio::task::spawn({
5757+ let storage = storage.clone();
5858+ async move {
5959+ let r = storage.receive(batches).await;
6060+ log::warn!("storage.receive ended with: {r:?}");
6161+ }
6262+ });
6363+6464+ let t3 = tokio::task::spawn(async move {
6565+ let r = storage.rw_loop().await;
6666+ log::warn!("storage.rw_loop ended with: {r:?}");
6767+ });
6868+6969+ // tokio::select! {
7070+ // // v = serving => eprintln!("serving ended: {v:?}"),
7171+ // v = storage.receive(batches) => eprintln!("storage consumer ended: {v:?}"),
7272+ // v = storage.rw_loop() => eprintln!("storage rw-loop ended: {v:?}"),
7373+ // };
7474+7575+ log::trace!("tasks running. waiting.");
7676+ t1.await?;
7777+ log::trace!("serve task ended.");
7878+ t2.await?;
7979+ log::trace!("storage receive task ended.");
8080+ t3.await?;
8181+ log::trace!("storage rw task ended.");
56825783 println!("bye!");
5884
+68-23
ufos/src/store.rs
···11+// use std::sync::{Arc, Mutex}; // BLOCKING mutex -- only in spawn_blocking tasks!
12use crate::db_types::{db_complete, DbBytes, DbStaticStr, EncodingError, StaticStr};
23use crate::store_types::{
34 ByCollectionKey, ByCollectionValue, ByCursorSeenKey, ByCursorSeenValue, ByIdKey, ByIdValue,
···1819use tokio::{sync::mpsc::Receiver, time::sleep};
19202021/// Commit the RW batch immediately if this nubmer of events have been read off the mod queue
2121-const MAX_BATCHED_RW_EVENTS: usize = 18;
2222+const MAX_BATCHED_RW_EVENTS: usize = 3;
22232324/// Commit the RW batch immediately if this number of records is reached
2425///
···5859pub struct Storage {
5960 keyspace: Keyspace,
6061 partition: PartitionHandle,
6262+ // write_lock: Arc<Mutex<()>>,
6163}
62646365impl Storage {
···7072 Ok(Self {
7173 keyspace,
7274 partition,
7575+ // write_lock: Arc::new(Mutex::new(())),
7376 })
7477 }
7578···108111 // TODO: see rw_loop: enforce single-thread.
109112 loop {
110113 let t_sleep = Instant::now();
111111- sleep(Duration::from_secs_f64(0.3)).await; // TODO: minimize during replay
114114+ sleep(Duration::from_secs_f64(0.8)).await; // TODO: minimize during replay
112115 let slept_for = t_sleep.elapsed();
113116 let queue_size = receiver.len();
114117115118 if let Some(event_batch) = receiver.recv().await {
119119+ log::trace!("write: received write batch");
116120 let batch_summary = summarize_batch(&event_batch);
117121118122 let last = event_batch.last_jetstream_cursor.clone(); // TODO: get this from the data. track last in consumer. compute or track first.
119123120124 let keyspace = self.keyspace.clone();
121125 let partition = self.partition.clone();
126126+ // let write_lock = self.write_lock.clone();
122127123128 let writer_t0 = Instant::now();
129129+ log::trace!("spawn_blocking for write batch");
124130 tokio::task::spawn_blocking(move || {
125131 DBWriter {
126132 keyspace,
127133 partition,
134134+ // write_lock,
128135 }
129136 .write_batch(event_batch, last)
130137 })
131138 .await??;
139139+ log::trace!("write: back from blocking task, successfully wrote batch");
132140 let wrote_for = writer_t0.elapsed();
133141134142 println!("{batch_summary}, slept {slept_for: <12?}, wrote {wrote_for: <11?}, queue: {queue_size}");
135143 } else {
144144+ log::error!("store consumer: receive channel failed (dropped/closed?)");
136145 anyhow::bail!("receive channel closed");
137146 }
138147 }
···145154 sleep(Duration::from_secs_f64(0.001)).await; // todo: interval rate-limit instead
146155 let keyspace = self.keyspace.clone();
147156 let partition = self.partition.clone();
157157+ log::trace!("rw: spawn blocking for batch...");
148158 tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
159159+ log::trace!("rw: getting rw cursor...");
149160 let mod_cursor = get_static::<ModCursorKey, ModCursorValue>(&partition)?
150161 .unwrap_or(Cursor::from_start());
151162 let range = ModQueueItemKey::new(mod_cursor.clone()).range_to_prefix_end()?;
152163153164 let mut db_batch = keyspace.batch();
154165 let mut batched_rw_items = 0;
166166+ let mut any_tasks_found = false;
155167168168+ log::trace!("rw: iterating newer rw items...");
156169 for (i, pair) in partition.range(range.clone()).enumerate() {
170170+ log::trace!("rw: iterating {i}");
171171+ any_tasks_found = true;
172172+157173 if i >= MAX_BATCHED_RW_EVENTS {
158174 break;
159175 }
···170186 let mod_value: ModQueueItemValue =
171187 db_complete::<ModQueueItemStringValue>(&val_bytes)?.try_into()?;
172188189189+ log::trace!("rw: iterating {i}: sending to batcher {mod_key:?} => {mod_value:?}");
173190 batched_rw_items += DBWriter {
174191 keyspace: keyspace.clone(),
175192 partition: partition.clone(),
176193 }
177194 .write_rw(&mut db_batch, mod_key, mod_value)?;
195195+ log::trace!("rw: iterating {i}: back from batcher.");
178196179197 if batched_rw_items >= MAX_BATCHED_RW_ITEMS {
198198+ log::trace!("rw: iterating {i}: batch big enough, breaking out.");
180199 break;
181200 }
182201 }
183202184184- db_batch.commit()?;
203203+ if !any_tasks_found {
204204+ log::trace!("rw: skipping batch commit since apparently no items were added (this is normal, skipping is new)");
205205+ return Ok(());
206206+ }
207207+208208+ log::info!("rw: committing rw batch with {batched_rw_items} items (items != total inserts/deletes)...");
209209+ let r = db_batch.commit();
210210+ log::info!("rw: commit result: {r:?}");
211211+ r?;
185212 Ok(())
186213 })
187214 .await??;
215215+ log::trace!("rw: back from blocking for rw...");
188216 }
217217+ // log::warn!("exited rw loop (rw task)");
189218 }
190219191220 pub async fn get_collection_records(
···352381 if let Some(cursor) = last {
353382 insert_batch_static::<JetstreamCursorKey>(&mut db_batch, &self.partition, cursor)?;
354383 }
355355- Ok(db_batch.commit()?)
384384+ log::info!("write: committing write batch...");
385385+ let r = db_batch.commit();
386386+ log::info!("write: commit result: {r:?}");
387387+ r?;
388388+ Ok(())
356389 }
357390358391 fn write_rw(
···367400368401 let items_modified = match mod_value {
369402 ModQueueItemValue::DeleteAccount(did) => {
403403+ log::trace!("rw: batcher: delete account...");
370404 let (items, finished) = self.delete_account(db_batch, mod_cursor, did)?;
405405+ log::trace!("rw: batcher: back from delete account (finished? {finished})");
371406 if finished {
372407 // only remove the queued rw task if we have actually completed its account removal work
373408 remove_batch::<ModQueueItemKey>(db_batch, &self.partition, mod_key)?;
409409+ items + 1
410410+ } else {
411411+ items
374412 }
375375- items
376413 }
377414 ModQueueItemValue::DeleteRecord(did, collection, rkey) => {
415415+ log::trace!("rw: batcher: delete record...");
416416+ let items = self.delete_record(db_batch, mod_cursor, did, collection, rkey)?;
417417+ log::trace!("rw: batcher: back from delete record");
378418 remove_batch::<ModQueueItemKey>(db_batch, &self.partition, mod_key)?;
379379- self.delete_record(db_batch, mod_cursor, did, collection, rkey)?
419419+ items + 1
380420 }
381421 ModQueueItemValue::UpdateRecord(did, collection, rkey, record) => {
422422+ let items = self.update_record(db_batch, mod_cursor, did, collection, rkey, record)?;
382423 remove_batch::<ModQueueItemKey>(db_batch, &self.partition, mod_key)?;
383383- self.update_record(db_batch, mod_cursor, did, collection, rkey, record)?
424424+ items + 1
384425 }
385426 };
386427 Ok(items_modified)
···423464 ByIdKey::record_prefix(did, collection.clone(), rkey).to_db_bytes()?;
424465425466 let mut items_removed = 0;
426426- for pair in self.partition.prefix(&key_prefix_bytes) {
467467+468468+ log::trace!("delete_record: iterate over prefix(!)...");
469469+ for (i, pair) in self.partition.prefix(&key_prefix_bytes).enumerate() {
470470+ log::trace!("delete_record iter {i}: found");
427471 // find all (hopefully 1)
428472 let (key_bytes, _) = pair?;
429473 let key = db_complete::<ByIdKey>(&key_bytes)?;
430474 let found_cursor = key.cursor();
431475 if found_cursor > cursor {
432476 // we are *only* allowed to delete records that came before the record delete event
433433- eprintln!("delete_record: found (and ignoring) newer version(s). key: {key:?}");
477477+ log::trace!("delete_record: found (and ignoring) newer version(s). key: {key:?}");
434478 break;
435479 }
436480···445489 items_removed += 1;
446490 }
447491448448- if items_removed > 1 {
449449- eprintln!("odd, removed {items_removed} records for one record removal:");
450450- for (i, pair) in self.partition.prefix(&key_prefix_bytes).enumerate() {
451451- // find all (hopefully 1)
452452- let (key_bytes, _) = pair?;
453453- let found_cursor = db_complete::<ByIdKey>(&key_bytes)?.cursor();
454454- if found_cursor > cursor {
455455- break;
456456- }
492492+ // if items_removed > 1 {
493493+ // log::trace!("odd, removed {items_removed} records for one record removal:");
494494+ // for (i, pair) in self.partition.prefix(&key_prefix_bytes).enumerate() {
495495+ // // find all (hopefully 1)
496496+ // let (key_bytes, _) = pair?;
497497+ // let found_cursor = db_complete::<ByIdKey>(&key_bytes)?.cursor();
498498+ // if found_cursor > cursor {
499499+ // break;
500500+ // }
457501458458- let key = db_complete::<ByIdKey>(&key_bytes)?;
459459- eprintln!(" {i}: key {key:?}");
460460- }
461461- }
502502+ // let key = db_complete::<ByIdKey>(&key_bytes)?;
503503+ // log::trace!(" {i}: key {key:?}");
504504+ // }
505505+ // }
462506 Ok(items_removed)
463507 }
464508···476520477521 let (_, collection, _rkey, found_cursor) = db_complete::<ByIdKey>(&key_bytes)?.into();
478522 if found_cursor > cursor {
479479- eprintln!(
523523+ log::trace!(
480524 "delete account: found (and ignoring) newer records than the delete event??"
481525 );
482526 continue;
···495539 return Ok((items_added, false)); // there might be more records but we've done enough for this batch
496540 }
497541 }
542542+498543499544 Ok((items_added, true))
500545 }