Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use anyhow::{bail, Result};
2use clap::{Parser, ValueEnum};
3use metrics::{describe_counter, describe_gauge, describe_histogram, Unit};
4use metrics_exporter_prometheus::PrometheusBuilder;
5use std::net::SocketAddr;
6use std::num::NonZero;
7use std::path::PathBuf;
8use std::sync::{atomic::AtomicU32, Arc};
9use std::thread;
10use std::time;
11use tokio::runtime;
12use tokio_util::sync::CancellationToken;
13
14use constellation::consumer::consume;
15use constellation::server::serve;
16#[cfg(feature = "rocks")]
17use constellation::storage::RocksStorage;
18use constellation::storage::{LinkReader, LinkStorage, MemStorage, StorageStats};
19
/// How long the metrics monitor thread waits between samples of storage stats,
/// process metrics, and disk space.
const MONITOR_INTERVAL: time::Duration = time::Duration::from_millis(15_000);
21
22/// Aggregate links in the at-mosphere
23#[derive(Parser, Debug)]
24#[command(version, about, long_about = None)]
25struct Args {
26 /// constellation server's listen address
27 #[arg(long)]
28 #[clap(default_value = "0.0.0.0:6789")]
29 bind: SocketAddr,
30 /// enable metrics collection and serving
31 #[arg(long, action)]
32 collect_metrics: bool,
33 /// metrics server's listen address
34 #[arg(long, requires("collect_metrics"))]
35 #[clap(default_value = "0.0.0.0:8765")]
36 bind_metrics: SocketAddr,
37 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:
38 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
39 #[arg(short, long)]
40 jetstream: String,
41 // TODO: make this part of rocks' own sub-config?
42 /// Where to store data on disk, for backends that use disk storage
43 #[arg(short, long)]
44 data: Option<PathBuf>,
45 /// Storage backend to use
46 #[arg(short, long)]
47 #[clap(value_enum, default_value_t = StorageBackend::Memory)]
48 backend: StorageBackend,
49 /// Serve a did:web document for this domain
50 #[arg(long)]
51 did_web_domain: Option<String>,
52 /// Initiate a database backup into this dir, if supported by the storage
53 #[arg(long)]
54 backup: Option<PathBuf>,
55 /// Start a background task to take backups every N hours
56 #[arg(long)]
57 backup_interval: Option<u64>,
58 /// Keep at most this many backups purging oldest first, requires --backup-interval
59 #[arg(long)]
60 max_old_backups: Option<usize>,
61 /// Saved jsonl from jetstream to use instead of a live subscription
62 #[arg(short, long)]
63 fixture: Option<PathBuf>,
64 /// Don't change the database jetstream cursor when using a fixture
65 #[arg(long, requires("fixture"))]
66 fixture_preserve_cursor: bool,
67 /// fix the constellation start date (funny previous bug oops)
68 #[arg(long, action)]
69 reset_db_start: bool,
70 /// debugging: print the current jetstream cursor and exit
71 #[arg(long, action)]
72 print_cursor: bool,
73}
74
75#[derive(Debug, Clone, ValueEnum)]
76enum StorageBackend {
77 Memory,
78 #[cfg(feature = "rocks")]
79 Rocks,
80}
81
/// Expand a `--jetstream` argument into a subscription URL.
///
/// The four bsky.network jetstream instances can be named by shorthand
/// (`us-east-1`, `us-east-2`, `us-west-1`, `us-west-2`); any other value is
/// treated as a complete URL and returned unchanged.
fn jetstream_url(provided: &str) -> String {
    let (instance, region) = match provided {
        "us-east-1" => (1, "us-east"),
        "us-east-2" => (2, "us-east"),
        "us-west-1" => (1, "us-west"),
        "us-west-2" => (2, "us-west"),
        custom => return custom.to_owned(),
    };
    format!("wss://jetstream{instance}.{region}.bsky.network/subscribe")
}
91
92fn main() -> Result<()> {
93 let args = Args::parse();
94
95 #[cfg(feature = "rocks")]
96 if args.print_cursor {
97 let storage_dir = args.data.clone().unwrap_or("rocks.test".into());
98 let mut store = RocksStorage::open_readonly(storage_dir)?;
99 if let Some(cursor) = store.get_cursor()? {
100 println!("cursor: {cursor}");
101 } else {
102 println!("[no cursor]");
103 }
104 return Ok(());
105 }
106
107 println!("starting with storage backend: {:?}...", args.backend);
108
109 let fixture = args.fixture;
110 let fixture_preserve_cursor = args.fixture_preserve_cursor;
111 if let Some(ref p) = fixture {
112 println!("using fixture at {p:?}, preserving cursor? {fixture_preserve_cursor:?}...");
113 }
114
115 let stream = jetstream_url(&args.jetstream);
116 println!("using jetstream server {stream:?}...",);
117
118 let bind = args.bind;
119 let metrics_bind = args.bind_metrics;
120
121 let collect_metrics = args.collect_metrics;
122 let stay_alive = CancellationToken::new();
123
124 match args.backend {
125 StorageBackend::Memory => run(
126 MemStorage::new(),
127 fixture,
128 fixture_preserve_cursor,
129 None,
130 args.did_web_domain,
131 stream,
132 bind,
133 metrics_bind,
134 collect_metrics,
135 stay_alive,
136 ),
137 #[cfg(feature = "rocks")]
138 StorageBackend::Rocks => {
139 let storage_dir = args.data.clone().unwrap_or("rocks.test".into());
140 println!("starting rocksdb...");
141 let mut rocks = RocksStorage::new(storage_dir)?;
142 if let Some(backup_dir) = args.backup {
143 let auto_backup = match (args.backup_interval, args.max_old_backups) {
144 (Some(interval_hrs), copies) => Some((interval_hrs, copies)),
145 (None, None) => None,
146 (None, Some(_)) => bail!("invalid backup config: --max-old-backups requires --backup-interval to be configured"),
147 };
148 rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?;
149 }
150 println!("rocks ready.");
151 std::thread::scope(|s| {
152 if args.reset_db_start {
153 let res = rocks.reset_start();
154 eprintln!("reset start finished: {res:?}");
155 }
156 s.spawn(|| {
157 let r = run(
158 rocks,
159 fixture,
160 fixture_preserve_cursor,
161 args.data,
162 args.did_web_domain,
163 stream,
164 bind,
165 metrics_bind,
166 collect_metrics,
167 stay_alive,
168 );
169 eprintln!("run finished: {r:?}");
170 r
171 });
172 });
173 Ok(())
174 }
175 }
176}
177
178#[allow(clippy::too_many_lines)]
179#[allow(clippy::too_many_arguments)]
180fn run(
181 mut storage: impl LinkStorage,
182 fixture: Option<PathBuf>,
183 fixture_preserve_cursor: bool,
184 data_dir: Option<PathBuf>,
185 did_web_domain: Option<String>,
186 stream: String,
187 bind: SocketAddr,
188 metrics_bind: SocketAddr,
189 collect_metrics: bool,
190 stay_alive: CancellationToken,
191) -> Result<()> {
192 ctrlc::set_handler({
193 let mut desperation: u8 = 0;
194 let stay_alive = stay_alive.clone();
195 move || match desperation {
196 0 => {
197 println!("ok, shutting down...");
198 stay_alive.cancel();
199 desperation += 1;
200 }
201 1.. => panic!("fine, panicking!"),
202 }
203 })?;
204
205 // Install metrics server only if requested
206 if collect_metrics {
207 install_metrics_server(metrics_bind)?;
208 }
209
210 let qsize = Arc::new(AtomicU32::new(0));
211
212 thread::scope(|s| {
213 let readable = storage.to_readable();
214
215 s.spawn({
216 let qsize = qsize.clone();
217 let stay_alive = stay_alive.clone();
218 let staying_alive = stay_alive.clone();
219 move || {
220 if let Err(e) = consume(
221 storage,
222 qsize,
223 fixture,
224 fixture_preserve_cursor,
225 stream,
226 staying_alive,
227 ) {
228 eprintln!("jetstream finished with error: {e}");
229 }
230 stay_alive.drop_guard();
231 }
232 });
233
234 s.spawn({
235 let readable = readable.clone();
236 let stay_alive = stay_alive.clone();
237 let staying_alive = stay_alive.clone();
238 || {
239 runtime::Builder::new_multi_thread()
240 .worker_threads(1)
241 .max_blocking_threads(2)
242 .enable_all()
243 .build()
244 .expect("axum startup")
245 .block_on(serve(readable, bind, did_web_domain, staying_alive))
246 .unwrap();
247 stay_alive.drop_guard();
248 }
249 });
250
251 // only spawn monitoring thread if the metrics server is running
252 if collect_metrics {
253 s.spawn(move || { // monitor thread
254 let stay_alive = stay_alive.clone();
255 let check_alive = stay_alive.clone();
256
257 let process_collector = metrics_process::Collector::default();
258 if let Some(ref p) = data_dir {
259 if let Err(e) = fs4::available_space(p) {
260 eprintln!("fs4 failed to get available space. may not be supported here? space metrics may be absent. e: {e:?}");
261 } else {
262 println!("disk space monitoring should work, watching at {p:?}");
263 }
264 }
265
266 'monitor: loop {
267 match readable.get_stats() {
268 Ok(StorageStats { dids, targetables, linking_records, .. }) => {
269 metrics::gauge!("storage.stats.dids").set(dids as f64);
270 metrics::gauge!("storage.stats.targetables").set(targetables as f64);
271 metrics::gauge!("storage.stats.linking_records").set(linking_records as f64);
272 }
273 Err(e) => eprintln!("failed to get stats: {e:?}"),
274 }
275
276 process_collector.collect();
277 if let Some(ref p) = data_dir {
278 if let Ok(avail) = fs4::available_space(p) {
279 metrics::gauge!("storage.available").set(avail as f64);
280 }
281 if let Ok(free) = fs4::free_space(p) {
282 metrics::gauge!("storage.free").set(free as f64);
283 }
284 }
285 let wait = time::Instant::now();
286 while wait.elapsed() < MONITOR_INTERVAL {
287 thread::sleep(time::Duration::from_millis(100));
288 if check_alive.is_cancelled() {
289 break 'monitor
290 }
291 }
292 }
293 stay_alive.drop_guard();
294 });
295 }
296 });
297
298 println!("byeeee");
299
300 Ok(())
301}
302
303fn install_metrics_server(metrics_bind: SocketAddr) -> Result<()> {
304 println!("installing metrics server...");
305 #[expect(
306 deprecated,
307 reason = "would change counters to _total suffix, needs dash updates"
308 )]
309 PrometheusBuilder::new()
310 .idle_timeout(
311 metrics_util::MetricKindMask::ALL,
312 Some(time::Duration::from_secs(900)), // 15 min
313 )
314 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
315 .set_bucket_duration(time::Duration::from_secs(30))?
316 .set_bucket_count(NonZero::new(10).unwrap()) // count * duration = 5 mins. stuff doesn't happen that fast here.
317 .set_enable_unit_suffix(true)
318 .with_http_listener(metrics_bind)
319 .install()?;
320 describe_metrics();
321 println!("metrics server installed! listening at {metrics_bind:?}");
322 Ok(())
323}
324
/// Register descriptions and units for the metrics this binary reports, so the
/// exporter can attach help text and units to them.
///
/// Called from `install_metrics_server` right after the recorder is installed.
/// The rocksdb-specific descriptions are only registered with the `rocks` feature.
///
/// NOTE(review): these names use `_` separators, but `run`'s monitor loop emits dotted
/// names (e.g. `storage.available`). Presumably the exporter maps `.` to `_` so they
/// line up -- confirm.
///
/// NOTE(review): some names are described twice, once as a counter and once as a
/// histogram (`jetstream_read_bytes_decompressed`, `consumer_events_actionable_links`).
/// Which description survives may depend on call order, so keep this order unless
/// that is confirmed.
fn describe_metrics() {
    // process-level metrics (cpu, memory, ...) from metrics_process
    metrics_process::Collector::default().describe();
    describe_gauge!(
        "storage_available",
        Unit::Bytes,
        "available to be allocated"
    );
    describe_gauge!("storage_free", Unit::Bytes, "unused bytes in filesystem");
    // NOTE(review): "connnect" is spelled with three n's -- check it matches the name the
    // consumer actually emits before "fixing" either side.
    describe_counter!(
        "jetstream_connnect",
        Unit::Count,
        "attempts to connect to a jetstream server"
    );
    describe_counter!(
        "jetstream_read",
        Unit::Count,
        "attempts to read an event from jetstream"
    );
    describe_counter!(
        "jetstream_read_fail",
        Unit::Count,
        "failures to read events from jetstream"
    );
    describe_counter!(
        "jetstream_read_bytes",
        Unit::Bytes,
        "total received message bytes from jetstream"
    );
    describe_counter!(
        "jetstream_read_bytes_decompressed",
        Unit::Bytes,
        "total decompressed message bytes from jetstream"
    );
    // same name as the counter above, described again as a histogram
    describe_histogram!(
        "jetstream_read_bytes_decompressed",
        Unit::Bytes,
        "decompressed size of jetstream messages"
    );
    describe_counter!(
        "jetstream_events",
        Unit::Count,
        "valid json messages received"
    );
    describe_histogram!(
        "jetstream_events_queued",
        Unit::Count,
        "event messages waiting in queue"
    );
    describe_gauge!(
        "jetstream_cursor_age",
        Unit::Microseconds,
        "microseconds between our clock and the jetstream event's time_us"
    );
    describe_counter!(
        "consumer_events_non_actionable",
        Unit::Count,
        "count of non-actionable events"
    );
    describe_counter!(
        "consumer_events_actionable",
        Unit::Count,
        "count of action by type. *all* atproto record delete events are included"
    );
    describe_counter!(
        "consumer_events_actionable_links",
        Unit::Count,
        "total links encountered"
    );
    // same name as the counter above, described again as a histogram
    describe_histogram!(
        "consumer_events_actionable_links",
        Unit::Count,
        "number of links per message"
    );
    // rocksdb storage metrics only exist when that backend is compiled in
    #[cfg(feature = "rocks")]
    {
        describe_histogram!(
            "storage_rocksdb_read_seconds",
            Unit::Seconds,
            "duration of the read stage of actions"
        );
        describe_histogram!(
            "storage_rocksdb_action_seconds",
            Unit::Seconds,
            "duration of read + write of actions"
        );
        describe_counter!(
            "storage_rocksdb_batch_ops_total",
            Unit::Count,
            "total batched operations from actions"
        );
        describe_histogram!(
            "storage_rocksdb_delete_account_ops",
            Unit::Count,
            "total batched ops for account deletions"
        );
    }
}
422
#[cfg(test)]
mod tests {
    use constellation::consumer::get_actionable;
    use constellation::storage::{LinkReader, LinkStorage, MemStorage};

    /// One jetstream `like` commit, run through `get_actionable` and pushed into
    /// `MemStorage`, should show up as exactly one link to the liked post.
    #[test]
    fn test_create_like_integrated() {
        const LIKED_POST: &str =
            "at://did:plc:lphckw3dz4mnh3ogmfpdgt6z/app.bsky.feed.post/3lfdau5f7wk23";

        let mut storage = MemStorage::new();

        let event = r#"{
 "did":"did:plc:icprmty6ticzracr5urz4uum",
 "time_us":1736448492661668,
 "kind":"commit",
 "commit":{"rev":"3lfddpt5qa62c","operation":"create","collection":"app.bsky.feed.like","rkey":"3lfddpt5djw2c","record":{
 "$type":"app.bsky.feed.like",
 "createdAt":"2025-01-09T18:48:10.412Z",
 "subject":{"cid":"bafyreihazf62qvmusup55ojhkzwbmzee6rxtsug3e6eg33mnjrgthxvozu","uri":"at://did:plc:lphckw3dz4mnh3ogmfpdgt6z/app.bsky.feed.post/3lfdau5f7wk23"}
 },
 "cid":"bafyreidgcs2id7nsbp6co42ind2wcig3riwcvypwan6xdywyfqklovhdjq"}
 }"#
        .parse()
        .unwrap();

        let (action, ts) = get_actionable(&event).unwrap();
        storage.push(&action, ts).unwrap();

        let like_count = storage
            .get_count(LIKED_POST, "app.bsky.feed.like", ".subject.uri")
            .unwrap();
        assert_eq!(like_count, 1);
    }
}