···439439440440 if let Some(last) = last_cursor {
441441 if event_cursor <= *last {
442442- log::warn!("event cursor {event_cursor:?} was older than the last one: {last:?}. dropping event.");
442442+ log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event.");
443443 continue;
444444 }
445445 }
···475475476476 if let Some(last) = last_cursor {
477477 if event_cursor <= *last {
478478- log::warn!("event cursor {event_cursor:?} was older than the last one: {last:?}. dropping event.");
478478+ log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event.");
479479 continue;
480480 }
481481 }
+22-11
ufos/src/consumer.rs
···88use std::mem;
99use std::time::Duration;
1010use tokio::sync::mpsc::{channel, Receiver, Sender};
1111+use tokio::time::timeout;
11121213use crate::error::{BatchInsertError, FirehoseEventError};
1314use crate::{DeleteAccount, EventBatch, UFOsCommit};
···84858586 pub async fn run(&mut self) -> anyhow::Result<()> {
8687 loop {
8787- if let Some(event) = self.jetstream_receiver.recv().await {
8888- self.handle_event(event).await?
8989- } else {
9090- anyhow::bail!("channel closed");
8888+ match timeout(Duration::from_millis(9_000), self.jetstream_receiver.recv()).await {
8989+ Err(_elapsed) => self.no_events_step().await?,
9090+ Ok(Some(event)) => self.handle_event(event).await?,
9191+ Ok(None) => anyhow::bail!("channel closed"),
9192 }
9293 }
9394 }
94959696+ async fn no_events_step(&mut self) -> anyhow::Result<()> {
9797+ let empty = self.current_batch.batch.is_empty();
9898+ log::info!("no events received, stepping batcher (empty? {empty})");
9999+ if !empty {
100100+ self.send_current_batch_now(true, "no events step").await?;
101101+ }
102102+ Ok(())
103103+ }
104104+95105 async fn handle_event(&mut self, event: JetstreamEvent) -> anyhow::Result<()> {
96106 if let Some(earliest) = &self.current_batch.initial_cursor {
97107 if event.cursor.duration_since(earliest)? > Duration::from_secs_f64(MAX_BATCH_SPAN_SECS)
98108 {
9999- self.send_current_batch_now(false).await?;
109109+ self.send_current_batch_now(false, "time since event")
110110+ .await?;
100111 }
101112 } else {
102113 self.current_batch.initial_cursor = Some(event.cursor);
···126137 if event.cursor.duration_since(earliest)?.as_secs_f64() > MIN_BATCH_SPAN_SECS
127138 && self.batch_sender.capacity() == BATCH_QUEUE_SIZE
128139 {
129129- self.send_current_batch_now(true).await?;
140140+ self.send_current_batch_now(true, "available queue").await?;
130141 }
131142 }
132143 Ok(())
···141152 );
142153143154 if let Err(BatchInsertError::BatchFull(commit)) = optimistic_res {
144144- self.send_current_batch_now(false).await?;
155155+ self.send_current_batch_now(false, "handle commit").await?;
145156 self.current_batch.batch.insert_commit_by_nsid(
146157 &collection,
147158 commit,
···157168158169 async fn handle_delete_account(&mut self, did: Did, cursor: Cursor) -> anyhow::Result<()> {
159170 if self.current_batch.batch.account_removes.len() >= MAX_ACCOUNT_REMOVES {
160160- self.send_current_batch_now(false).await?;
171171+ self.send_current_batch_now(false, "delete account").await?;
161172 }
162173 self.current_batch
163174 .batch
···168179169180 // holds up all consumer progress until it can send to the channel
170181 // use this when the current batch is too full to add more to it
171171- async fn send_current_batch_now(&mut self, small: bool) -> anyhow::Result<()> {
182182+ async fn send_current_batch_now(&mut self, small: bool, referrer: &str) -> anyhow::Result<()> {
172183 let beginning = match self.current_batch.initial_cursor.map(|c| c.elapsed()) {
173184 None => "unknown".to_string(),
174185 Some(Ok(t)) => format!("{:?}", t),
175186 Some(Err(e)) => format!("+{:?}", e.duration()),
176187 };
177177- log::info!(
178178- "sending batch now from {beginning}, {}, queue capacity: {}",
188188+ log::trace!(
189189+ "sending batch now from {beginning}, {}, queue capacity: {}, referrer: {referrer}",
179190 if small { "small" } else { "full" },
180191 self.batch_sender.capacity(),
181192 );
+4-1
ufos/src/file_consumer.rs
···1919 return Err(JetstreamEventError::ReceiverClosedError.into());
2020 }
2121 }
2222- Ok(())
2222+ log::info!("reached end of jsonl file, looping on noop to keep server alive.");
2323+ loop {
2424+ tokio::time::sleep(std::time::Duration::from_secs_f64(10.)).await;
2525+ }
2326}
24272528pub async fn consume(