Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

centralize metrics descriptions

maybe fixes flaky race in unit suffixes?

+106 -107
+103 -11
constellation/src/bin/main.rs
··· 1 1 use anyhow::{bail, Result}; 2 2 use clap::{Parser, ValueEnum}; 3 + use metrics::{describe_counter, describe_gauge, describe_histogram, Unit}; 3 4 use metrics_exporter_prometheus::PrometheusBuilder; 4 5 use std::net::SocketAddr; 5 6 use std::num::NonZero; ··· 244 245 245 246 let process_collector = metrics_process::Collector::default(); 246 247 process_collector.describe(); 247 - metrics::describe_gauge!( 248 - "storage_available", 249 - metrics::Unit::Bytes, 250 - "available to be allocated" 251 - ); 252 - metrics::describe_gauge!( 253 - "storage_free", 254 - metrics::Unit::Bytes, 255 - "unused bytes in filesystem" 256 - ); 257 248 if let Some(ref p) = data_dir { 258 249 if let Err(e) = fs4::available_space(p) { 259 250 eprintln!("fs4 failed to get available space. may not be supported here? space metrics may be absent. e: {e:?}"); ··· 301 292 302 293 fn install_metrics_server(metrics_bind: SocketAddr) -> Result<()> { 303 294 println!("installing metrics server..."); 304 - #[expect(deprecated, reason = "would change counters to _total suffix, needs dash updates")] 295 + #[expect( 296 + deprecated, 297 + reason = "would change counters to _total suffix, needs dash updates" 298 + )] 305 299 PrometheusBuilder::new() 306 300 .idle_timeout( 307 301 metrics_util::MetricKindMask::ALL, ··· 313 307 .set_enable_unit_suffix(true) 314 308 .with_http_listener(metrics_bind) 315 309 .install()?; 310 + describe_metrics(); 316 311 println!("metrics server installed! listening at {metrics_bind:?}"); 317 312 Ok(()) 313 + } 314 + 315 + fn describe_metrics() { 316 + describe_gauge!( 317 + "storage_available", 318 + Unit::Bytes, 319 + "available to be allocated" 320 + ); 321 + describe_gauge!("storage_free", Unit::Bytes, "unused bytes in filesystem"); 322 + describe_counter!( 323 + "jetstream_connnect", 324 + Unit::Count, 325 + "attempts to connect to a jetstream server" 326 + ); 327 + describe_counter!( 328 + "jetstream_read", 329 + Unit::Count, 330 + "attempts to read an event from jetstream" 331 + ); 332 + describe_counter!( 333 + "jetstream_read_fail", 334 + Unit::Count, 335 + "failures to read events from jetstream" 336 + ); 337 + describe_counter!( 338 + "jetstream_read_bytes", 339 + Unit::Bytes, 340 + "total received message bytes from jetstream" 341 + ); 342 + describe_counter!( 343 + "jetstream_read_bytes_decompressed", 344 + Unit::Bytes, 345 + "total decompressed message bytes from jetstream" 346 + ); 347 + describe_histogram!( 348 + "jetstream_read_bytes_decompressed", 349 + Unit::Bytes, 350 + "decompressed size of jetstream messages" 351 + ); 352 + describe_counter!( 353 + "jetstream_events", 354 + Unit::Count, 355 + "valid json messages received" 356 + ); 357 + describe_histogram!( 358 + "jetstream_events_queued", 359 + Unit::Count, 360 + "event messages waiting in queue" 361 + ); 362 + describe_gauge!( 363 + "jetstream_cursor_age", 364 + Unit::Microseconds, 365 + "microseconds between our clock and the jetstream event's time_us" 366 + ); 367 + describe_counter!( 368 + "consumer_events_non_actionable", 369 + Unit::Count, 370 + "count of non-actionable events" 371 + ); 372 + describe_counter!( 373 + "consumer_events_actionable", 374 + Unit::Count, 375 + "count of action by type. *all* atproto record delete events are included" 376 + ); 377 + describe_counter!( 378 + "consumer_events_actionable_links", 379 + Unit::Count, 380 + "total links encountered" 381 + ); 382 + describe_histogram!( 383 + "consumer_events_actionable_links", 384 + Unit::Count, 385 + "number of links per message" 386 + ); 387 + #[cfg(feature = "rocks")] 388 + { 389 + describe_histogram!( 390 + "storage_rocksdb_read_seconds", 391 + Unit::Seconds, 392 + "duration of the read stage of actions" 393 + ); 394 + describe_histogram!( 395 + "storage_rocksdb_action_seconds", 396 + Unit::Seconds, 397 + "duration of read + write of actions" 398 + ); 399 + describe_counter!( 400 + "storage_rocksdb_batch_ops_total", 401 + Unit::Count, 402 + "total batched operations from actions" 403 + ); 404 + describe_histogram!( 405 + "storage_rocksdb_delete_account_ops", 406 + Unit::Count, 407 + "total batched ops for account deletions" 408 + ); 409 + } 318 410 } 319 411 320 412 #[cfg(test)]
+1 -49
constellation/src/consumer/jetstream.rs
··· 1 1 use anyhow::{bail, Result}; 2 - use metrics::{ 3 - counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram, Unit, 4 - }; 2 + use metrics::{counter, gauge, histogram}; 5 3 use std::io::{Cursor, ErrorKind, Read}; 6 4 use std::net::ToSocketAddrs; 7 5 use std::thread; ··· 19 17 stream: String, 20 18 staying_alive: CancellationToken, 21 19 ) -> Result<()> { 22 - describe_counter!( 23 - "jetstream_connnect", 24 - Unit::Count, 25 - "attempts to connect to a jetstream server" 26 - ); 27 - describe_counter!( 28 - "jetstream_read", 29 - Unit::Count, 30 - "attempts to read an event from jetstream" 31 - ); 32 - describe_counter!( 33 - "jetstream_read_fail", 34 - Unit::Count, 35 - "failures to read events from jetstream" 36 - ); 37 - describe_counter!( 38 - "jetstream_read_bytes", 39 - Unit::Bytes, 40 - "total received message bytes from jetstream" 41 - ); 42 - describe_counter!( 43 - "jetstream_read_bytes_decompressed", 44 - Unit::Bytes, 45 - "total decompressed message bytes from jetstream" 46 - ); 47 - describe_histogram!( 48 - "jetstream_read_bytes_decompressed", 49 - Unit::Bytes, 50 - "decompressed size of jetstream messages" 51 - ); 52 - describe_counter!( 53 - "jetstream_events", 54 - Unit::Count, 55 - "valid json messages received" 56 - ); 57 - describe_histogram!( 58 - "jetstream_events_queued", 59 - Unit::Count, 60 - "event messages waiting in queue" 61 - ); 62 - describe_gauge!( 63 - "jetstream_cursor_age", 64 - Unit::Microseconds, 65 - "microseconds between our clock and the jetstream event's time_us" 66 - ); 67 - 68 20 let dict = DecoderDictionary::copy(JETSTREAM_ZSTD_DICTIONARY); 69 21 let mut connect_retries = 0; 70 22 let mut latest_cursor = cursor;
+1 -22
constellation/src/consumer/mod.rs
··· 7 7 use jetstream::consume_jetstream; 8 8 use jsonl_file::consume_jsonl_file; 9 9 use links::{parse_any_link, record::walk_record, CollectedLink}; 10 - use metrics::{counter, describe_counter, describe_histogram, histogram, Unit}; 10 + use metrics::{counter, histogram}; 11 11 use std::path::PathBuf; 12 12 use std::sync::atomic::{AtomicU32, Ordering}; 13 13 use std::sync::Arc; ··· 23 23 stream: String, 24 24 staying_alive: CancellationToken, 25 25 ) -> Result<()> { 26 - describe_counter!( 27 - "consumer_events_non_actionable", 28 - Unit::Count, 29 - "count of non-actionable events" 30 - ); 31 - describe_counter!( 32 - "consumer_events_actionable", 33 - Unit::Count, 34 - "count of action by type. *all* atproto record delete events are included" 35 - ); 36 - describe_counter!( 37 - "consumer_events_actionable_links", 38 - Unit::Count, 39 - "total links encountered" 40 - ); 41 - describe_histogram!( 42 - "consumer_events_actionable_links", 43 - Unit::Count, 44 - "number of links per message" 45 - ); 46 - 47 26 let mut fixture_cursor = None; 48 27 let (receiver, consumer_handle) = if let Some(f) = fixture { 49 28 let (sender, receiver) = flume::bounded(21);
+1 -25
constellation/src/storage/rocks_store.rs
··· 7 7 use anyhow::{anyhow, bail, Result}; 8 8 use bincode::Options as BincodeOptions; 9 9 use links::CollectedLink; 10 - use metrics::{counter, describe_counter, describe_histogram, histogram, Unit}; 10 + use metrics::{counter, histogram}; 11 11 use ratelimit::Ratelimiter; 12 12 use rocksdb::backup::{BackupEngine, BackupEngineOptions}; 13 13 use rocksdb::{ ··· 256 256 257 257 impl RocksStorage { 258 258 pub fn new(path: impl AsRef<Path>) -> Result<Self> { 259 - Self::describe_metrics(); 260 259 let me = RocksStorage::open_readmode(path, false)?; 261 260 me.global_init()?; 262 261 Ok(me) ··· 418 417 BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?; 419 418 engine.purge_old_backups(num_backups_to_keep)?; 420 419 Ok(()) 421 - } 422 - 423 - fn describe_metrics() { 424 - describe_histogram!( 425 - "storage_rocksdb_read_seconds", 426 - Unit::Seconds, 427 - "duration of the read stage of actions" 428 - ); 429 - describe_histogram!( 430 - "storage_rocksdb_action_seconds", 431 - Unit::Seconds, 432 - "duration of read + write of actions" 433 - ); 434 - describe_counter!( 435 - "storage_rocksdb_batch_ops_total", 436 - Unit::Count, 437 - "total batched operations from actions" 438 - ); 439 - describe_histogram!( 440 - "storage_rocksdb_delete_account_ops", 441 - Unit::Count, 442 - "total batched ops for account deletions" 443 - ); 444 420 } 445 421 446 422 fn merge_op_extend_did_ids(