Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

at pocket 9.9 kB View raw
1use anyhow::{bail, Result}; 2use clap::{Parser, ValueEnum}; 3use metrics_exporter_prometheus::PrometheusBuilder; 4use std::num::NonZero; 5use std::path::PathBuf; 6use std::sync::{atomic::AtomicU32, Arc}; 7use std::thread; 8use std::time; 9use tokio::runtime; 10use tokio_util::sync::CancellationToken; 11 12use constellation::consumer::consume; 13use constellation::server::serve; 14#[cfg(feature = "rocks")] 15use constellation::storage::RocksStorage; 16use constellation::storage::{LinkReader, LinkStorage, MemStorage, StorageStats}; 17 18const MONITOR_INTERVAL: time::Duration = time::Duration::from_secs(15); 19 20/// Aggregate links in the at-mosphere 21#[derive(Parser, Debug)] 22#[command(version, about, long_about = None)] 23struct Args { 24 #[arg(short, long)] 25 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: 26 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2' 27 #[arg(short, long)] 28 jetstream: String, 29 // TODO: make this part of rocks' own sub-config? 30 /// Where to store data on disk, for backends that use disk storage 31 #[arg(short, long)] 32 data: Option<PathBuf>, 33 /// Storage backend to use 34 #[arg(short, long)] 35 #[clap(value_enum, default_value_t = StorageBackend::Memory)] 36 backend: StorageBackend, 37 /// Initiate a database backup into this dir, if supported by the storage 38 #[arg(long)] 39 backup: Option<PathBuf>, 40 /// Start a background task to take backups every N hours 41 #[arg(long)] 42 backup_interval: Option<u64>, 43 /// Keep at most this many backups purging oldest first, requires --backup-interval 44 #[arg(long)] 45 max_old_backups: Option<usize>, 46 /// Saved jsonl from jetstream to use instead of a live subscription 47 #[arg(short, long)] 48 fixture: Option<PathBuf>, 49} 50 51#[derive(Debug, Clone, ValueEnum)] 52enum StorageBackend { 53 Memory, 54 #[cfg(feature = "rocks")] 55 Rocks, 56} 57 58fn jetstream_url(provided: &str) -> String { 59 match provided { 60 "us-east-1" => "wss://jetstream1.us-east.bsky.network/subscribe".into(), 61 "us-east-2" => "wss://jetstream2.us-east.bsky.network/subscribe".into(), 62 "us-west-1" => "wss://jetstream1.us-west.bsky.network/subscribe".into(), 63 "us-west-2" => "wss://jetstream2.us-west.bsky.network/subscribe".into(), 64 custom => custom.into(), 65 } 66} 67 68fn main() -> Result<()> { 69 let args = Args::parse(); 70 71 println!("starting with storage backend: {:?}...", args.backend); 72 73 let fixture = args.fixture; 74 if let Some(ref p) = fixture { 75 println!("using fixture at {p:?}..."); 76 } 77 78 let stream = jetstream_url(&args.jetstream); 79 println!("using jetstream server {stream:?}...",); 80 81 let stay_alive = CancellationToken::new(); 82 83 match args.backend { 84 StorageBackend::Memory => run(MemStorage::new(), fixture, None, stream, stay_alive), 85 #[cfg(feature = "rocks")] 86 StorageBackend::Rocks => { 87 let storage_dir = args.data.clone().unwrap_or("rocks.test".into()); 88 println!("starting rocksdb..."); 89 let mut rocks = RocksStorage::new(storage_dir)?; 90 if let Some(backup_dir) = args.backup { 91 let auto_backup = match (args.backup_interval, args.max_old_backups) { 92 (Some(interval_hrs), copies) => Some((interval_hrs, copies)), 93 (None, None) => None, 94 (None, Some(_)) => bail!("invalid backup config: --max-old-backups requires --backup-interval to be configured"), 95 }; 96 rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?; 97 } 98 println!("rocks ready."); 99 run(rocks, fixture, args.data, stream, stay_alive) 100 } 101 } 102} 103 104fn run( 105 mut storage: impl LinkStorage, 106 fixture: Option<PathBuf>, 107 data_dir: Option<PathBuf>, 108 stream: String, 109 stay_alive: CancellationToken, 110) -> Result<()> { 111 ctrlc::set_handler({ 112 let mut desperation: u8 = 0; 113 let stay_alive = stay_alive.clone(); 114 move || match desperation { 115 0 => { 116 println!("ok, shutting down..."); 117 stay_alive.cancel(); 118 desperation += 1; 119 } 120 1.. => panic!("fine, panicking!"), 121 } 122 })?; 123 124 let qsize = Arc::new(AtomicU32::new(0)); 125 126 thread::scope(|s| { 127 let readable = storage.to_readable(); 128 129 s.spawn({ 130 let qsize = qsize.clone(); 131 let stay_alive = stay_alive.clone(); 132 let staying_alive = stay_alive.clone(); 133 move || { 134 if let Err(e) = consume(storage, qsize, fixture, stream, staying_alive) { 135 eprintln!("jetstream finished with error: {e}"); 136 } 137 stay_alive.drop_guard(); 138 } 139 }); 140 141 s.spawn({ 142 let readable = readable.clone(); 143 let stay_alive = stay_alive.clone(); 144 let staying_alive = stay_alive.clone(); 145 || { 146 runtime::Builder::new_multi_thread() 147 .worker_threads(1) 148 .max_blocking_threads(2) 149 .enable_all() 150 .build() 151 .expect("axum startup") 152 .block_on(async { 153 install_metrics_server()?; 154 serve(readable, "0.0.0.0:6789", staying_alive).await 155 }) 156 .unwrap(); 157 stay_alive.drop_guard(); 158 } 159 }); 160 161 s.spawn(move || { // monitor thread 162 let stay_alive = stay_alive.clone(); 163 let check_alive = stay_alive.clone(); 164 165 let process_collector = metrics_process::Collector::default(); 166 process_collector.describe(); 167 metrics::describe_gauge!( 168 "storage_available", 169 metrics::Unit::Bytes, 170 "available to be allocated" 171 ); 172 metrics::describe_gauge!( 173 "storage_free", 174 metrics::Unit::Bytes, 175 "unused bytes in filesystem" 176 ); 177 if let Some(ref p) = data_dir { 178 if let Err(e) = fs4::available_space(p) { 179 eprintln!("fs4 failed to get available space. may not be supported here? space metrics may be absent. e: {e:?}"); 180 } else { 181 println!("disk space monitoring should work, watching at {p:?}"); 182 } 183 } 184 185 'monitor: loop { 186 match readable.get_stats() { 187 Ok(StorageStats { dids, targetables, linking_records }) => { 188 metrics::gauge!("storage.stats.dids").set(dids as f64); 189 metrics::gauge!("storage.stats.targetables").set(targetables as f64); 190 metrics::gauge!("storage.stats.linking_records").set(linking_records as f64); 191 } 192 Err(e) => eprintln!("failed to get stats: {e:?}"), 193 } 194 195 process_collector.collect(); 196 if let Some(ref p) = data_dir { 197 if let Ok(avail) = fs4::available_space(p) { 198 metrics::gauge!("storage.available").set(avail as f64); 199 } 200 if let Ok(free) = fs4::free_space(p) { 201 metrics::gauge!("storage.free").set(free as f64); 202 } 203 } 204 let wait = time::Instant::now(); 205 while wait.elapsed() < MONITOR_INTERVAL { 206 thread::sleep(time::Duration::from_millis(100)); 207 if check_alive.is_cancelled() { 208 break 'monitor 209 } 210 } 211 } 212 stay_alive.drop_guard(); 213 }); 214 }); 215 216 println!("byeeee"); 217 218 Ok(()) 219} 220 221fn install_metrics_server() -> Result<()> { 222 println!("installing metrics server..."); 223 let host = [0, 0, 0, 0]; 224 let port = 8765; 225 PrometheusBuilder::new() 226 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 227 .set_bucket_duration(time::Duration::from_secs(30))? 228 .set_bucket_count(NonZero::new(10).unwrap()) // count * duration = 5 mins. stuff doesn't happen that fast here. 229 .set_enable_unit_suffix(true) 230 .with_http_listener((host, port)) 231 .install()?; 232 println!( 233 "metrics server installed! listening on http://{}.{}.{}.{}:{port}", 234 host[0], host[1], host[2], host[3] 235 ); 236 Ok(()) 237} 238 239#[cfg(test)] 240mod tests { 241 use constellation::consumer::get_actionable; 242 use constellation::storage::{LinkReader, LinkStorage, MemStorage}; 243 244 #[test] 245 fn test_create_like_integrated() { 246 let mut storage = MemStorage::new(); 247 248 let rec = r#"{ 249 "did":"did:plc:icprmty6ticzracr5urz4uum", 250 "time_us":1736448492661668, 251 "kind":"commit", 252 "commit":{"rev":"3lfddpt5qa62c","operation":"create","collection":"app.bsky.feed.like","rkey":"3lfddpt5djw2c","record":{ 253 "$type":"app.bsky.feed.like", 254 "createdAt":"2025-01-09T18:48:10.412Z", 255 "subject":{"cid":"bafyreihazf62qvmusup55ojhkzwbmzee6rxtsug3e6eg33mnjrgthxvozu","uri":"at://did:plc:lphckw3dz4mnh3ogmfpdgt6z/app.bsky.feed.post/3lfdau5f7wk23"} 256 }, 257 "cid":"bafyreidgcs2id7nsbp6co42ind2wcig3riwcvypwan6xdywyfqklovhdjq"} 258 }"#.parse().unwrap(); 259 let (action, ts) = get_actionable(&rec).unwrap(); 260 storage.push(&action, ts).unwrap(); 261 assert_eq!( 262 storage 263 .get_count( 264 "at://did:plc:lphckw3dz4mnh3ogmfpdgt6z/app.bsky.feed.post/3lfdau5f7wk23", 265 "app.bsky.feed.like", 266 ".subject.uri" 267 ) 268 .unwrap(), 269 1 270 ); 271 } 272}