alpha
Login
or
Join now
microcosm.blue
/
microcosm-rs
Star
0
Fork
3
Atom
Configure Feed
Issues
Pull Requests
Commits
Tags
Feed URL
Select the types of activity you want to include in your feed.
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
Star
0
Fork
3
Atom
Configure Feed
Issues
Pull Requests
Commits
Tags
Feed URL
Select the types of activity you want to include in your feed.
Overview
Issues
Pulls
Pipelines
chill out rollups if we're caught up
author
phil
date
1 year ago
(May 7, 2025, 2:20 PM -0400)
commit
f32b1932
f32b1932534cef1b20ca8cf415fea27f2173efa9
parent
974a82d7
974a82d70eaa4aa3a94a40f29d3a26fe29ed0b1a
+6
-13
3 changed files
Expand all
Collapse all
Unified
Split
ufos
src
main.rs
storage_fjall.rs
store_types.rs
-10
ufos/src/main.rs
Reviewed
···
124
124
let rolling = write_store.background_tasks()?.run();
125
125
let storing = write_store.receive_batches(batches);
126
126
127
127
-
// let storing = tokio::task::spawn_blocking(move || {
128
128
-
// while let Some(event_batch) = batches.blocking_recv() {
129
129
-
// write_store.insert_batch(event_batch)?;
130
130
-
// write_store
131
131
-
// .step_rollup()
132
132
-
// .inspect_err(|e| log::error!("rollup error: {e:?}"))?;
133
133
-
// }
134
134
-
// Ok::<(), StorageError>(())
135
135
-
// });
136
136
-
137
127
tokio::select! {
138
128
z = serving => log::warn!("serve task ended: {z:?}"),
139
129
z = rolling => log::warn!("rollup task ended: {z:?}"),
+5
-2
ufos/src/storage_fjall.rs
Reviewed
···
862
862
async fn run(mut self) -> StorageResult<()> {
863
863
let mut dirty_nsids = HashSet::new();
864
864
865
865
-
let mut rollup = tokio::time::interval(Duration::from_millis(240));
865
865
+
let mut rollup = tokio::time::interval(Duration::from_millis(81));
866
866
rollup.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
867
867
868
868
-
let mut trim = tokio::time::interval(Duration::from_millis(3_000));
868
868
+
let mut trim = tokio::time::interval(Duration::from_millis(6_000));
869
869
trim.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
870
870
871
871
loop {
872
872
tokio::select! {
873
873
_ = rollup.tick() => {
874
874
let (n, dirty) = self.0.step_rollup().inspect_err(|e| log::error!("rollup error: {e:?}"))?;
875
875
+
if n == 0 {
876
876
+
rollup.reset_after(Duration::from_millis(1_200)); // we're caught up, take a break
877
877
+
}
875
878
dirty_nsids.extend(dirty);
876
879
log::info!("rolled up {n} items ({} collections now dirty)", dirty_nsids.len());
877
880
},
+1
-1
ufos/src/store_types.rs
Reviewed
···
42
42
Self::from_pair(Default::default(), collection)
43
43
}
44
44
}
45
45
-
pub type TrimCollectionCursorValue = Cursor;
45
45
+
pub type TrimCollectionCursorVal = Cursor;
46
46
47
47
/// key format: ["js_endpoint"]
48
48
#[derive(Debug, PartialEq)]