Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use clap::Parser;
2use jetstream::events::Cursor;
3use metrics::{describe_gauge, gauge, Unit};
4use metrics_exporter_prometheus::PrometheusBuilder;
5use std::path::PathBuf;
6use std::time::{Duration, SystemTime};
7use tokio::task::JoinSet;
8use ufos::consumer;
9use ufos::file_consumer;
10use ufos::server;
11use ufos::storage::{StorageWhatever, StoreBackground, StoreReader, StoreWriter};
12use ufos::storage_fjall::FjallStorage;
13use ufos::store_types::SketchSecretPrefix;
14use ufos::{nice_duration, ConsumerInfo};
15
16#[cfg(not(target_env = "msvc"))]
17use tikv_jemallocator::Jemalloc;
18
// Install jemalloc as the process-wide allocator. The `cfg` gate leaves the
// default allocator in place on `target_env = "msvc"` targets
// (presumably because the jemalloc crate is not usable there — TODO confirm).
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;
22
23/// Aggregate links in the at-mosphere
24#[derive(Parser, Debug, Clone)]
25#[command(version, about, long_about = None)]
26struct Args {
27 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:
28 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
29 #[arg(long, env = "UFOS_JETSTREAM")]
30 jetstream: String,
31 /// allow changing jetstream endpoints
32 #[arg(long, action, env = "UFOS_JETSTREAM_FORCE")]
33 jetstream_force: bool,
34 /// don't request zstd-compressed jetstream events
35 ///
36 /// reduces CPU at the expense of more ingress bandwidth
37 #[arg(long, action, env = "UFOS_JETSTREAM_NO_ZSTD")]
38 jetstream_no_zstd: bool,
39 /// ufos server's listen address
40 #[arg(long, env = "UFOS_BIND")]
41 #[clap(default_value = "0.0.0.0:9990")]
42 bind: std::net::SocketAddr,
43 /// Location to store persist data to disk
44 #[arg(long, env = "UFOS_DATA")]
45 data: PathBuf,
46 /// DEBUG: don't start the jetstream consumer or its write loop
47 #[arg(long, action, env = "UFOS_PAUSE_WRITER")]
48 pause_writer: bool,
49 /// Adjust runtime settings like background task intervals for efficient backfill
50 #[arg(long, action, env = "UFOS_BACKFILL_MODE")]
51 backfill: bool,
52 /// DEBUG: force the rw loop to fall behind by pausing it
53 /// todo: restore this
54 #[arg(long, action)]
55 pause_rw: bool,
56 /// reset the rollup cursor, scrape through missed things in the past (backfill)
57 #[arg(long, action, env = "UFOS_REROLL")]
58 reroll: bool,
59 /// DEBUG: interpret jetstream as a file fixture
60 #[arg(long, action, env = "UFOS_JETSTREAM_FIXTURE")]
61 jetstream_fixture: bool,
62 /// enable metrics collection and serving
63 #[arg(long, action, env = "UFOS_COLLECT_METRICS")]
64 collect_metrics: bool,
65 /// metrics server's listen address
66 #[arg(long, env = "UFOS_BIND_METRICS")]
67 #[clap(default_value = "0.0.0.0:8765")]
68 bind_metrics: std::net::SocketAddr,
69}
70
71#[tokio::main]
72async fn main() -> anyhow::Result<()> {
73 env_logger::init();
74
75 let args = Args::parse();
76 let jetstream = args.jetstream.clone();
77 let (read_store, write_store, cursor, sketch_secret) = FjallStorage::init(
78 args.data.clone(),
79 jetstream,
80 args.jetstream_force,
81 Default::default(),
82 )?;
83 go(args, read_store, write_store, cursor, sketch_secret).await?;
84 Ok(())
85}
86
87async fn go<B: StoreBackground + 'static>(
88 args: Args,
89 read_store: impl StoreReader + 'static + Clone,
90 mut write_store: impl StoreWriter<B> + 'static,
91 cursor: Option<Cursor>,
92 sketch_secret: SketchSecretPrefix,
93) -> anyhow::Result<()> {
94 let mut whatever_tasks: JoinSet<anyhow::Result<()>> = JoinSet::new();
95 let mut consumer_tasks: JoinSet<anyhow::Result<()>> = JoinSet::new();
96
97 println!("starting server with storage...");
98 let serving = server::serve(read_store.clone(), args.bind);
99 whatever_tasks.spawn(async move {
100 serving.await.map_err(|e| {
101 log::warn!("server ended: {e}");
102 anyhow::anyhow!(e)
103 })
104 });
105
106 if args.pause_writer {
107 log::info!("not starting jetstream or the write loop.");
108 for t in whatever_tasks.join_all().await {
109 if let Err(e) = t {
110 return Err(anyhow::anyhow!(e));
111 }
112 }
113 return Ok(());
114 }
115
116 let batches = if args.jetstream_fixture {
117 log::info!("starting with jestream file fixture: {:?}", args.jetstream);
118 file_consumer::consume(args.jetstream.into(), sketch_secret, cursor).await?
119 } else {
120 log::info!(
121 "starting consumer with cursor: {cursor:?} from {:?} ago",
122 cursor.map(|c| c.elapsed())
123 );
124 consumer::consume(&args.jetstream, cursor, false, sketch_secret).await?
125 };
126
127 let rolling = write_store
128 .background_tasks(args.reroll)?
129 .run(args.backfill);
130 whatever_tasks.spawn(async move {
131 rolling
132 .await
133 .inspect_err(|e| log::warn!("rollup ended: {e}"))?;
134 Ok(())
135 });
136
137 consumer_tasks.spawn(async move {
138 write_store
139 .receive_batches(batches)
140 .await
141 .inspect_err(|e| log::warn!("consumer ended: {e}"))?;
142 Ok(())
143 });
144
145 whatever_tasks.spawn(async move {
146 do_update_stuff(read_store).await;
147 log::warn!("status task ended");
148 Ok(())
149 });
150
151 if args.collect_metrics {
152 log::trace!("installing metrics server...");
153 install_metrics_server(args.bind_metrics)?;
154 }
155
156 for (i, t) in consumer_tasks.join_all().await.iter().enumerate() {
157 log::warn!("task {i} done: {t:?}");
158 }
159
160 println!("consumer tasks all completed, killing the others");
161 whatever_tasks.shutdown().await;
162
163 println!("bye!");
164
165 Ok(())
166}
167
168fn install_metrics_server(bind: std::net::SocketAddr) -> anyhow::Result<()> {
169 log::info!("installing metrics server...");
170 PrometheusBuilder::new()
171 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
172 .set_bucket_duration(Duration::from_secs(60))?
173 .set_bucket_count(std::num::NonZero::new(10).unwrap()) // count * duration = 10 mins. stuff doesn't happen that fast here.
174 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
175 .with_http_listener(bind)
176 .install()?;
177 log::info!("metrics server installed! listening on {bind}");
178 Ok(())
179}
180
181async fn do_update_stuff(read_store: impl StoreReader) {
182 describe_gauge!(
183 "persisted_cursor_age",
184 Unit::Microseconds,
185 "microseconds between our clock and the latest persisted event's cursor"
186 );
187 describe_gauge!(
188 "rollup_cursor_age",
189 Unit::Microseconds,
190 "microseconds between our clock and the latest rollup cursor"
191 );
192 let started_at = std::time::SystemTime::now();
193 let mut first_cursor = None;
194 let mut first_rollup = None;
195 let mut last_at = std::time::SystemTime::now();
196 let mut last_cursor = None;
197 let mut last_rollup = None;
198 let mut interval = tokio::time::interval(std::time::Duration::from_secs(4));
199 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
200 loop {
201 interval.tick().await;
202 read_store.update_metrics();
203 match read_store.get_consumer_info().await {
204 Err(e) => log::warn!("failed to get jetstream consumer info: {e:?}"),
205 Ok(ConsumerInfo::Jetstream {
206 latest_cursor,
207 rollup_cursor,
208 ..
209 }) => {
210 let now = std::time::SystemTime::now();
211 let latest_cursor = latest_cursor.map(Cursor::from_raw_u64);
212 let rollup_cursor = rollup_cursor.map(Cursor::from_raw_u64);
213 backfill_info(
214 latest_cursor,
215 rollup_cursor,
216 last_cursor,
217 last_rollup,
218 last_at,
219 first_cursor,
220 first_rollup,
221 started_at,
222 now,
223 );
224 first_cursor = first_cursor.or(latest_cursor);
225 first_rollup = first_rollup.or(rollup_cursor);
226 last_cursor = latest_cursor;
227 last_rollup = rollup_cursor;
228 last_at = now;
229 }
230 }
231 }
232}
233
234#[allow(clippy::too_many_arguments)]
235fn backfill_info(
236 latest_cursor: Option<Cursor>,
237 rollup_cursor: Option<Cursor>,
238 last_cursor: Option<Cursor>,
239 last_rollup: Option<Cursor>,
240 last_at: SystemTime,
241 first_cursor: Option<Cursor>,
242 first_rollup: Option<Cursor>,
243 started_at: SystemTime,
244 now: SystemTime,
245) {
246 if let Some(cursor) = latest_cursor {
247 gauge!("persisted_cursor_age").set(cursor.elapsed_micros_f64());
248 }
249 if let Some(cursor) = rollup_cursor {
250 gauge!("rollup_cursor_age").set(cursor.elapsed_micros_f64());
251 }
252
253 let nice_dt_two_maybes = |earlier: Option<Cursor>, later: Option<Cursor>| match (earlier, later)
254 {
255 (Some(earlier), Some(later)) => match later.duration_since(&earlier) {
256 Ok(dt) => nice_duration(dt),
257 Err(e) => {
258 let rev_dt = e.duration();
259 format!("+{}", nice_duration(rev_dt))
260 }
261 },
262 _ => "unknown".to_string(),
263 };
264
265 let rate = |mlatest: Option<Cursor>, msince: Option<Cursor>, real: Duration| {
266 mlatest
267 .zip(msince)
268 .map(|(latest, since)| {
269 latest
270 .duration_since(&since)
271 .unwrap_or(Duration::from_millis(1))
272 })
273 .map(|dtc| format!("{:.2}", dtc.as_secs_f64() / real.as_secs_f64()))
274 .unwrap_or("??".into())
275 };
276
277 let dt_real = now
278 .duration_since(last_at)
279 .unwrap_or(Duration::from_millis(1));
280
281 let dt_real_total = now
282 .duration_since(started_at)
283 .unwrap_or(Duration::from_millis(1));
284
285 let cursor_rate = rate(latest_cursor, last_cursor, dt_real);
286 let cursor_avg = rate(latest_cursor, first_cursor, dt_real_total);
287
288 let rollup_rate = rate(rollup_cursor, last_rollup, dt_real);
289 let rollup_avg = rate(rollup_cursor, first_rollup, dt_real_total);
290
291 log::trace!(
292 "cursor: {} behind (→{}, {cursor_rate}x, {cursor_avg}x avg). rollup: {} behind (→{}, {rollup_rate}x, {rollup_avg}x avg).",
293 latest_cursor.map(|c| c.elapsed().map(nice_duration).unwrap_or("++".to_string())).unwrap_or("?".to_string()),
294 nice_dt_two_maybes(last_cursor, latest_cursor),
295 rollup_cursor.map(|c| c.elapsed().map(nice_duration).unwrap_or("++".to_string())).unwrap_or("?".to_string()),
296 nice_dt_two_maybes(last_rollup, rollup_cursor),
297 );
298}