alpha
Login
or
Join now
microcosm.blue
/
microcosm-rs
Star
0
Fork
3
Atom
Configure Feed
Issues
Pull Requests
Commits
Tags
Feed URL
Select the types of activity you want to include in your feed.
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
Star
0
Fork
3
Atom
Configure Feed
Issues
Pull Requests
Commits
Tags
Feed URL
Select the types of activity you want to include in your feed.
Overview
Issues
Pulls
Pipelines
simplify cursor status updates
author
phil
date
1 year ago
(May 25, 2025, 11:15 AM -0400)
commit
7693c53e
7693c53e5816e162425b66a449077647fee831e5
parent
2abb7fae
2abb7faecba075ff48225ec57c67600d1a00cc1e
+54
-82
2 changed files
Expand all
Collapse all
Unified
Split
ufos
src
consumer.rs
main.rs
+1
-1
ufos/src/consumer.rs
Reviewed
···
79
79
batch_sender: Sender<LimitedBatch>,
80
80
sketch_secret: SketchSecretPrefix,
81
81
) -> Self {
82
82
-
let mut rate_limit = tokio::time::interval(std::time::Duration::from_micros(3_900));
82
82
+
let mut rate_limit = tokio::time::interval(std::time::Duration::from_millis(5));
83
83
rate_limit.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
84
84
Self {
85
85
jetstream_receiver,
+53
-81
ufos/src/main.rs
Reviewed
···
142
142
let rolling = write_store.background_tasks(reroll)?.run(backfill);
143
143
let storing = write_store.receive_batches(batches);
144
144
145
145
-
let stating = do_update_stuff(read_store, backfill);
145
145
+
let stating = do_update_stuff(read_store);
146
146
147
147
tokio::select! {
148
148
z = serving => log::warn!("serve task ended: {z:?}"),
···
156
156
Ok(())
157
157
}
158
158
159
159
-
async fn do_update_stuff(read_store: impl StoreReader, actually: bool) {
160
160
-
if !actually {
161
161
-
loop {
162
162
-
tokio::time::sleep(std::time::Duration::from_secs_f64(4.)).await;
163
163
-
}
164
164
-
}
159
159
+
async fn do_update_stuff(read_store: impl StoreReader) {
165
160
let started_at = std::time::SystemTime::now();
166
161
let mut first_cursor = None;
167
162
let mut first_rollup = None;
···
201
196
}
202
197
}
203
198
204
204
-
fn nice_duration(dt: Duration) -> String {
205
205
-
let secs = dt.as_secs_f64();
206
206
-
if secs < 1. {
207
207
-
return format!("{:.0}ms", secs * 1000.);
208
208
-
}
209
209
-
if secs < 60. {
210
210
-
return format!("{secs:.02}s");
211
211
-
}
212
212
-
let mins = (secs / 60.).floor();
213
213
-
let rsecs = secs - (mins * 60.);
214
214
-
if mins < 60. {
215
215
-
return format!("{mins:.0}m{rsecs:.0}s");
216
216
-
}
217
217
-
let hrs = (mins / 60.).floor();
218
218
-
let rmins = mins - (hrs * 60.);
219
219
-
if hrs < 24. {
220
220
-
return format!("{hrs:.0}h{rmins:.0}m{rsecs:.0}s");
221
221
-
}
222
222
-
let days = (hrs / 24.).floor();
223
223
-
let rhrs = hrs - (days * 24.);
224
224
-
format!("{days:.0}d{rhrs:.0}h{rmins:.0}m{rsecs:.0}s")
225
225
-
}
226
226
-
227
227
-
fn nice_dt_two_maybes(earlier: Option<Cursor>, later: Option<Cursor>) -> String {
228
228
-
match (earlier, later) {
229
229
-
(Some(earlier), Some(later)) => match later.duration_since(&earlier) {
230
230
-
Ok(dt) => nice_duration(dt),
231
231
-
Err(e) => {
232
232
-
let rev_dt = e.duration();
233
233
-
format!("+{}", nice_duration(rev_dt))
234
234
-
}
235
235
-
},
236
236
-
_ => "unknown".to_string(),
237
237
-
}
238
238
-
}
239
239
-
240
199
#[allow(clippy::too_many_arguments)]
241
200
fn backfill_info(
242
201
latest_cursor: Option<Cursor>,
···
249
208
started_at: SystemTime,
250
209
now: SystemTime,
251
210
) {
211
211
+
let nice_duration = |dt: Duration| {
212
212
+
let secs = dt.as_secs_f64();
213
213
+
if secs < 1. {
214
214
+
return format!("{:.0}ms", secs * 1000.);
215
215
+
}
216
216
+
if secs < 60. {
217
217
+
return format!("{secs:.02}s");
218
218
+
}
219
219
+
let mins = (secs / 60.).floor();
220
220
+
let rsecs = secs - (mins * 60.);
221
221
+
if mins < 60. {
222
222
+
return format!("{mins:.0}m{rsecs:.0}s");
223
223
+
}
224
224
+
let hrs = (mins / 60.).floor();
225
225
+
let rmins = mins - (hrs * 60.);
226
226
+
if hrs < 24. {
227
227
+
return format!("{hrs:.0}h{rmins:.0}m{rsecs:.0}s");
228
228
+
}
229
229
+
let days = (hrs / 24.).floor();
230
230
+
let rhrs = hrs - (days * 24.);
231
231
+
format!("{days:.0}d{rhrs:.0}h{rmins:.0}m{rsecs:.0}s")
232
232
+
};
233
233
+
234
234
+
let nice_dt_two_maybes = |earlier: Option<Cursor>, later: Option<Cursor>| match (earlier, later)
235
235
+
{
236
236
+
(Some(earlier), Some(later)) => match later.duration_since(&earlier) {
237
237
+
Ok(dt) => nice_duration(dt),
238
238
+
Err(e) => {
239
239
+
let rev_dt = e.duration();
240
240
+
format!("+{}", nice_duration(rev_dt))
241
241
+
}
242
242
+
},
243
243
+
_ => "unknown".to_string(),
244
244
+
};
245
245
+
246
246
+
let rate = |mlatest: Option<Cursor>, msince: Option<Cursor>, real: Duration| {
247
247
+
mlatest
248
248
+
.zip(msince)
249
249
+
.map(|(latest, since)| {
250
250
+
latest
251
251
+
.duration_since(&since)
252
252
+
.unwrap_or(Duration::from_millis(1))
253
253
+
})
254
254
+
.map(|dtc| format!("{:.2}", dtc.as_secs_f64() / real.as_secs_f64()))
255
255
+
.unwrap_or("??".into())
256
256
+
};
257
257
+
252
258
let dt_real = now
253
259
.duration_since(last_at)
254
260
.unwrap_or(Duration::from_millis(1));
···
257
263
.duration_since(started_at)
258
264
.unwrap_or(Duration::from_millis(1));
259
265
260
260
-
let cursor_rate = latest_cursor
261
261
-
.zip(last_cursor)
262
262
-
.map(|(latest, last)| {
263
263
-
latest
264
264
-
.duration_since(&last)
265
265
-
.unwrap_or(Duration::from_millis(1))
266
266
-
})
267
267
-
.map(|dtc| format!("{:.2}", dtc.as_secs_f64() / dt_real.as_secs_f64()))
268
268
-
.unwrap_or("??".into());
266
266
+
let cursor_rate = rate(latest_cursor, last_cursor, dt_real);
267
267
+
let cursor_avg = rate(latest_cursor, first_cursor, dt_real_total);
269
268
270
270
-
let cursor_avg = latest_cursor
271
271
-
.zip(first_cursor)
272
272
-
.map(|(latest, first)| {
273
273
-
latest
274
274
-
.duration_since(&first)
275
275
-
.unwrap_or(Duration::from_millis(1))
276
276
-
})
277
277
-
.map(|dtc| format!("{:.2}", dtc.as_secs_f64() / dt_real_total.as_secs_f64()))
278
278
-
.unwrap_or("??".into());
279
279
-
280
280
-
let rollup_rate = rollup_cursor
281
281
-
.zip(last_rollup)
282
282
-
.map(|(latest, last)| {
283
283
-
latest
284
284
-
.duration_since(&last)
285
285
-
.unwrap_or(Duration::from_millis(1))
286
286
-
})
287
287
-
.map(|dtc| format!("{:.2}", dtc.as_secs_f64() / dt_real.as_secs_f64()))
288
288
-
.unwrap_or("??".into());
289
289
-
290
290
-
let rollup_avg = rollup_cursor
291
291
-
.zip(first_rollup)
292
292
-
.map(|(latest, first)| {
293
293
-
latest
294
294
-
.duration_since(&first)
295
295
-
.unwrap_or(Duration::from_millis(1))
296
296
-
})
297
297
-
.map(|dtc| format!("{:.2}", dtc.as_secs_f64() / dt_real_total.as_secs_f64()))
298
298
-
.unwrap_or("??".into());
269
269
+
let rollup_rate = rate(rollup_cursor, last_rollup, dt_real);
270
270
+
let rollup_avg = rate(rollup_cursor, first_rollup, dt_real_total);
299
271
300
272
log::info!(
301
273
"cursor: {} behind (→{}, {cursor_rate}x, {cursor_avg}x avg). rollup: {} behind (→{}, {rollup_rate}x, {rollup_avg}x avg).",