Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 16 kB View raw
1use anyhow::{bail, Result}; 2use clap::{Parser, ValueEnum}; 3use metrics::{describe_counter, describe_gauge, describe_histogram, Unit}; 4use metrics_exporter_prometheus::PrometheusBuilder; 5use std::net::SocketAddr; 6use std::num::NonZero; 7use std::path::PathBuf; 8use std::sync::{atomic::AtomicU32, Arc}; 9use std::thread; 10use std::time; 11use tokio::runtime; 12use tokio_util::sync::CancellationToken; 13 14use constellation::consumer::consume; 15use constellation::server::serve; 16#[cfg(feature = "rocks")] 17use constellation::storage::RocksStorage; 18use constellation::storage::{LinkReader, LinkStorage, MemStorage, StorageStats}; 19 20const MONITOR_INTERVAL: time::Duration = time::Duration::from_secs(15); 21 22/// Aggregate links in the at-mosphere 23#[derive(Parser, Debug)] 24#[command(version, about, long_about = None)] 25struct Args { 26 /// constellation server's listen address 27 #[arg(long)] 28 #[clap(default_value = "0.0.0.0:6789")] 29 bind: SocketAddr, 30 /// enable metrics collection and serving 31 #[arg(long, action)] 32 collect_metrics: bool, 33 /// metrics server's listen address 34 #[arg(long, requires("collect_metrics"))] 35 #[clap(default_value = "0.0.0.0:8765")] 36 bind_metrics: SocketAddr, 37 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: 38 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2' 39 #[arg(short, long)] 40 jetstream: String, 41 // TODO: make this part of rocks' own sub-config? 42 /// Where to store data on disk, for backends that use disk storage 43 #[arg(short, long)] 44 data: Option<PathBuf>, 45 /// Storage backend to use 46 #[arg(short, long)] 47 #[clap(value_enum, default_value_t = StorageBackend::Memory)] 48 backend: StorageBackend, 49 /// Serve a did:web document for this domain 50 #[arg(long)] 51 did_web_domain: Option<String>, 52 /// Initiate a database backup into this dir, if supported by the storage 53 #[arg(long)] 54 backup: Option<PathBuf>, 55 /// Start a background task to take backups every N hours 56 #[arg(long)] 57 backup_interval: Option<u64>, 58 /// Keep at most this many backups purging oldest first, requires --backup-interval 59 #[arg(long)] 60 max_old_backups: Option<usize>, 61 /// Saved jsonl from jetstream to use instead of a live subscription 62 #[arg(short, long)] 63 fixture: Option<PathBuf>, 64 /// Don't change the database jetstream cursor when using a fixture 65 #[arg(long, requires("fixture"))] 66 fixture_preserve_cursor: bool, 67 /// fix the constellation start date (funny previous bug oops) 68 #[arg(long, action)] 69 reset_db_start: bool, 70 /// debugging: print the current jetstream cursor and exit 71 #[arg(long, action)] 72 print_cursor: bool, 73} 74 75#[derive(Debug, Clone, ValueEnum)] 76enum StorageBackend { 77 Memory, 78 #[cfg(feature = "rocks")] 79 Rocks, 80} 81 82fn jetstream_url(provided: &str) -> String { 83 match provided { 84 "us-east-1" => "wss://jetstream1.us-east.bsky.network/subscribe".into(), 85 "us-east-2" => "wss://jetstream2.us-east.bsky.network/subscribe".into(), 86 "us-west-1" => "wss://jetstream1.us-west.bsky.network/subscribe".into(), 87 "us-west-2" => "wss://jetstream2.us-west.bsky.network/subscribe".into(), 88 custom => custom.into(), 89 } 90} 91 92fn main() -> Result<()> { 93 let args = Args::parse(); 94 95 #[cfg(feature = "rocks")] 96 if args.print_cursor { 97 let storage_dir = args.data.clone().unwrap_or("rocks.test".into()); 98 let mut store = RocksStorage::open_readonly(storage_dir)?; 99 if let Some(cursor) = store.get_cursor()? { 100 println!("cursor: {cursor}"); 101 } else { 102 println!("[no cursor]"); 103 } 104 return Ok(()); 105 } 106 107 println!("starting with storage backend: {:?}...", args.backend); 108 109 let fixture = args.fixture; 110 let fixture_preserve_cursor = args.fixture_preserve_cursor; 111 if let Some(ref p) = fixture { 112 println!("using fixture at {p:?}, preserving cursor? {fixture_preserve_cursor:?}..."); 113 } 114 115 let stream = jetstream_url(&args.jetstream); 116 println!("using jetstream server {stream:?}...",); 117 118 let bind = args.bind; 119 let metrics_bind = args.bind_metrics; 120 121 let collect_metrics = args.collect_metrics; 122 let stay_alive = CancellationToken::new(); 123 124 match args.backend { 125 StorageBackend::Memory => run( 126 MemStorage::new(), 127 fixture, 128 fixture_preserve_cursor, 129 None, 130 args.did_web_domain, 131 stream, 132 bind, 133 metrics_bind, 134 collect_metrics, 135 stay_alive, 136 ), 137 #[cfg(feature = "rocks")] 138 StorageBackend::Rocks => { 139 let storage_dir = args.data.clone().unwrap_or("rocks.test".into()); 140 println!("starting rocksdb..."); 141 let mut rocks = RocksStorage::new(storage_dir)?; 142 if let Some(backup_dir) = args.backup { 143 let auto_backup = match (args.backup_interval, args.max_old_backups) { 144 (Some(interval_hrs), copies) => Some((interval_hrs, copies)), 145 (None, None) => None, 146 (None, Some(_)) => bail!("invalid backup config: --max-old-backups requires --backup-interval to be configured"), 147 }; 148 rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?; 149 } 150 println!("rocks ready."); 151 std::thread::scope(|s| { 152 if args.reset_db_start { 153 let res = rocks.reset_start(); 154 eprintln!("reset start finished: {res:?}"); 155 } 156 s.spawn(|| { 157 let r = run( 158 rocks, 159 fixture, 160 fixture_preserve_cursor, 161 args.data, 162 args.did_web_domain, 163 stream, 164 bind, 165 metrics_bind, 166 collect_metrics, 167 stay_alive, 168 ); 169 eprintln!("run finished: {r:?}"); 170 r 171 }); 172 }); 173 Ok(()) 174 } 175 } 176} 177 178#[allow(clippy::too_many_lines)] 179#[allow(clippy::too_many_arguments)] 180fn run( 181 mut storage: impl LinkStorage, 182 fixture: Option<PathBuf>, 183 fixture_preserve_cursor: bool, 184 data_dir: Option<PathBuf>, 185 did_web_domain: Option<String>, 186 stream: String, 187 bind: SocketAddr, 188 metrics_bind: SocketAddr, 189 collect_metrics: bool, 190 stay_alive: CancellationToken, 191) -> Result<()> { 192 ctrlc::set_handler({ 193 let mut desperation: u8 = 0; 194 let stay_alive = stay_alive.clone(); 195 move || match desperation { 196 0 => { 197 println!("ok, shutting down..."); 198 stay_alive.cancel(); 199 desperation += 1; 200 } 201 1.. => panic!("fine, panicking!"), 202 } 203 })?; 204 205 // Install metrics server only if requested 206 if collect_metrics { 207 install_metrics_server(metrics_bind)?; 208 } 209 210 let qsize = Arc::new(AtomicU32::new(0)); 211 212 thread::scope(|s| { 213 let readable = storage.to_readable(); 214 215 s.spawn({ 216 let qsize = qsize.clone(); 217 let stay_alive = stay_alive.clone(); 218 let staying_alive = stay_alive.clone(); 219 move || { 220 if let Err(e) = consume( 221 storage, 222 qsize, 223 fixture, 224 fixture_preserve_cursor, 225 stream, 226 staying_alive, 227 ) { 228 eprintln!("jetstream finished with error: {e}"); 229 } 230 stay_alive.drop_guard(); 231 } 232 }); 233 234 s.spawn({ 235 let readable = readable.clone(); 236 let stay_alive = stay_alive.clone(); 237 let staying_alive = stay_alive.clone(); 238 || { 239 runtime::Builder::new_multi_thread() 240 .worker_threads(1) 241 .max_blocking_threads(2) 242 .enable_all() 243 .build() 244 .expect("axum startup") 245 .block_on(serve(readable, bind, did_web_domain, staying_alive)) 246 .unwrap(); 247 stay_alive.drop_guard(); 248 } 249 }); 250 251 // only spawn monitoring thread if the metrics server is running 252 if collect_metrics { 253 s.spawn(move || { // monitor thread 254 let stay_alive = stay_alive.clone(); 255 let check_alive = stay_alive.clone(); 256 257 let process_collector = metrics_process::Collector::default(); 258 if let Some(ref p) = data_dir { 259 if let Err(e) = fs4::available_space(p) { 260 eprintln!("fs4 failed to get available space. may not be supported here? space metrics may be absent. e: {e:?}"); 261 } else { 262 println!("disk space monitoring should work, watching at {p:?}"); 263 } 264 } 265 266 'monitor: loop { 267 match readable.get_stats() { 268 Ok(StorageStats { dids, targetables, linking_records, .. }) => { 269 metrics::gauge!("storage.stats.dids").set(dids as f64); 270 metrics::gauge!("storage.stats.targetables").set(targetables as f64); 271 metrics::gauge!("storage.stats.linking_records").set(linking_records as f64); 272 } 273 Err(e) => eprintln!("failed to get stats: {e:?}"), 274 } 275 276 process_collector.collect(); 277 if let Some(ref p) = data_dir { 278 if let Ok(avail) = fs4::available_space(p) { 279 metrics::gauge!("storage.available").set(avail as f64); 280 } 281 if let Ok(free) = fs4::free_space(p) { 282 metrics::gauge!("storage.free").set(free as f64); 283 } 284 } 285 let wait = time::Instant::now(); 286 while wait.elapsed() < MONITOR_INTERVAL { 287 thread::sleep(time::Duration::from_millis(100)); 288 if check_alive.is_cancelled() { 289 break 'monitor 290 } 291 } 292 } 293 stay_alive.drop_guard(); 294 }); 295 } 296 }); 297 298 println!("byeeee"); 299 300 Ok(()) 301} 302 303fn install_metrics_server(metrics_bind: SocketAddr) -> Result<()> { 304 println!("installing metrics server..."); 305 #[expect( 306 deprecated, 307 reason = "would change counters to _total suffix, needs dash updates" 308 )] 309 PrometheusBuilder::new() 310 .idle_timeout( 311 metrics_util::MetricKindMask::ALL, 312 Some(time::Duration::from_secs(900)), // 15 min 313 ) 314 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 315 .set_bucket_duration(time::Duration::from_secs(30))? 316 .set_bucket_count(NonZero::new(10).unwrap()) // count * duration = 5 mins. stuff doesn't happen that fast here. 317 .set_enable_unit_suffix(true) 318 .with_http_listener(metrics_bind) 319 .install()?; 320 describe_metrics(); 321 println!("metrics server installed! listening at {metrics_bind:?}"); 322 Ok(()) 323} 324 325fn describe_metrics() { 326 metrics_process::Collector::default().describe(); 327 describe_gauge!( 328 "storage_available", 329 Unit::Bytes, 330 "available to be allocated" 331 ); 332 describe_gauge!("storage_free", Unit::Bytes, "unused bytes in filesystem"); 333 describe_counter!( 334 "jetstream_connnect", 335 Unit::Count, 336 "attempts to connect to a jetstream server" 337 ); 338 describe_counter!( 339 "jetstream_read", 340 Unit::Count, 341 "attempts to read an event from jetstream" 342 ); 343 describe_counter!( 344 "jetstream_read_fail", 345 Unit::Count, 346 "failures to read events from jetstream" 347 ); 348 describe_counter!( 349 "jetstream_read_bytes", 350 Unit::Bytes, 351 "total received message bytes from jetstream" 352 ); 353 describe_counter!( 354 "jetstream_read_bytes_decompressed", 355 Unit::Bytes, 356 "total decompressed message bytes from jetstream" 357 ); 358 describe_histogram!( 359 "jetstream_read_bytes_decompressed", 360 Unit::Bytes, 361 "decompressed size of jetstream messages" 362 ); 363 describe_counter!( 364 "jetstream_events", 365 Unit::Count, 366 "valid json messages received" 367 ); 368 describe_histogram!( 369 "jetstream_events_queued", 370 Unit::Count, 371 "event messages waiting in queue" 372 ); 373 describe_gauge!( 374 "jetstream_cursor_age", 375 Unit::Microseconds, 376 "microseconds between our clock and the jetstream event's time_us" 377 ); 378 describe_counter!( 379 "consumer_events_non_actionable", 380 Unit::Count, 381 "count of non-actionable events" 382 ); 383 describe_counter!( 384 "consumer_events_actionable", 385 Unit::Count, 386 "count of action by type. *all* atproto record delete events are included" 387 ); 388 describe_counter!( 389 "consumer_events_actionable_links", 390 Unit::Count, 391 "total links encountered" 392 ); 393 describe_histogram!( 394 "consumer_events_actionable_links", 395 Unit::Count, 396 "number of links per message" 397 ); 398 #[cfg(feature = "rocks")] 399 { 400 describe_histogram!( 401 "storage_rocksdb_read_seconds", 402 Unit::Seconds, 403 "duration of the read stage of actions" 404 ); 405 describe_histogram!( 406 "storage_rocksdb_action_seconds", 407 Unit::Seconds, 408 "duration of read + write of actions" 409 ); 410 describe_counter!( 411 "storage_rocksdb_batch_ops_total", 412 Unit::Count, 413 "total batched operations from actions" 414 ); 415 describe_histogram!( 416 "storage_rocksdb_delete_account_ops", 417 Unit::Count, 418 "total batched ops for account deletions" 419 ); 420 } 421} 422 423#[cfg(test)] 424mod tests { 425 use constellation::consumer::get_actionable; 426 use constellation::storage::{LinkReader, LinkStorage, MemStorage}; 427 428 #[test] 429 fn test_create_like_integrated() { 430 let mut storage = MemStorage::new(); 431 432 let rec = r#"{ 433 "did":"did:plc:icprmty6ticzracr5urz4uum", 434 "time_us":1736448492661668, 435 "kind":"commit", 436 "commit":{"rev":"3lfddpt5qa62c","operation":"create","collection":"app.bsky.feed.like","rkey":"3lfddpt5djw2c","record":{ 437 "$type":"app.bsky.feed.like", 438 "createdAt":"2025-01-09T18:48:10.412Z", 439 "subject":{"cid":"bafyreihazf62qvmusup55ojhkzwbmzee6rxtsug3e6eg33mnjrgthxvozu","uri":"at://did:plc:lphckw3dz4mnh3ogmfpdgt6z/app.bsky.feed.post/3lfdau5f7wk23"} 440 }, 441 "cid":"bafyreidgcs2id7nsbp6co42ind2wcig3riwcvypwan6xdywyfqklovhdjq"} 442 }"#.parse().unwrap(); 443 let (action, ts) = get_actionable(&rec).unwrap(); 444 storage.push(&action, ts).unwrap(); 445 assert_eq!( 446 storage 447 .get_count( 448 "at://did:plc:lphckw3dz4mnh3ogmfpdgt6z/app.bsky.feed.post/3lfdau5f7wk23", 449 "app.bsky.feed.like", 450 ".subject.uri" 451 ) 452 .unwrap(), 453 1 454 ); 455 } 456}