Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

handle batch full from consumer

+31 -12
+16 -12
ufos/src/consumer.rs
··· 8 8 use std::time::Duration; 9 9 use tokio::sync::mpsc::{channel, Receiver, Sender}; 10 10 11 - use crate::error::FirehoseEventError; 11 + use crate::error::{BatchInsertError, FirehoseEventError}; 12 12 use crate::{DeleteAccount, EventBatch, UFOsCommit}; 13 13 14 14 const MAX_BATCHED_RECORDS: usize = 128; // *non-blocking* limit. drops oldest batched record per collection once reached. ··· 125 125 Ok(()) 126 126 } 127 127 128 - async fn handle_commit(&mut self, commit: UFOsCommit, nsid: Nsid) -> anyhow::Result<()> { 129 - if !self.current_batch.batch.commits_by_nsid.contains_key(&nsid) 130 - && self.current_batch.batch.commits_by_nsid.len() >= MAX_BATCHED_COLLECTIONS 131 - { 128 + async fn handle_commit(&mut self, commit: UFOsCommit, collection: Nsid) -> anyhow::Result<()> { 129 + let optimistic_res = self.current_batch.batch.insert_commit_by_nsid( 130 + &collection, 131 + commit, 132 + MAX_BATCHED_COLLECTIONS, 133 + ); 134 + 135 + if let Err(BatchInsertError::BatchFull(commit)) = optimistic_res { 132 136 self.send_current_batch_now().await?; 137 + self.current_batch.batch.insert_commit_by_nsid( 138 + &collection, 139 + commit, 140 + MAX_BATCHED_COLLECTIONS, 141 + )?; 142 + } else { 143 + optimistic_res?; 133 144 } 134 - 135 - self.current_batch 136 - .batch 137 - .commits_by_nsid 138 - .entry(nsid) 139 - .or_default() 140 - .truncating_insert(commit); 141 145 142 146 Ok(()) 143 147 }
+15
ufos/src/lib.rs
··· 149 149 } 150 150 151 151 impl<const LIMIT: usize> EventBatch<LIMIT> { 152 + pub fn insert_commit_by_nsid( 153 + &mut self, 154 + collection: &Nsid, 155 + commit: UFOsCommit, 156 + max_collections: usize, 157 + ) -> Result<(), BatchInsertError> { 158 + let map = &mut self.commits_by_nsid; 159 + if !map.contains_key(collection) && map.len() >= max_collections { 160 + return Err(BatchInsertError::BatchFull(commit)); 161 + } 162 + map.entry(collection.clone()) 163 + .or_default() 164 + .truncating_insert(commit)?; 165 + Ok(()) 166 + } 152 167 pub fn total_records(&self) -> usize { 153 168 self.commits_by_nsid.values().map(|v| v.commits.len()).sum() 154 169 }