Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

1use clap::Parser; 2use jetstream::events::Cursor; 3use metrics::{describe_gauge, gauge, Unit}; 4use metrics_exporter_prometheus::PrometheusBuilder; 5use std::path::PathBuf; 6use std::time::{Duration, SystemTime}; 7use tokio::task::JoinSet; 8use ufos::consumer; 9use ufos::file_consumer; 10use ufos::server; 11use ufos::storage::{StorageWhatever, StoreBackground, StoreReader, StoreWriter}; 12use ufos::storage_fjall::FjallStorage; 13use ufos::store_types::SketchSecretPrefix; 14use ufos::{nice_duration, ConsumerInfo}; 15 16#[cfg(not(target_env = "msvc"))] 17use tikv_jemallocator::Jemalloc; 18 19#[cfg(not(target_env = "msvc"))] 20#[global_allocator] 21static GLOBAL: Jemalloc = Jemalloc; 22 23/// Aggregate links in the at-mosphere 24#[derive(Parser, Debug, Clone)] 25#[command(version, about, long_about = None)] 26struct Args { 27 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: 28 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2' 29 #[arg(long, env = "UFOS_JETSTREAM")] 30 jetstream: String, 31 /// allow changing jetstream endpoints 32 #[arg(long, action, env = "UFOS_JETSTREAM_FORCE")] 33 jetstream_force: bool, 34 /// don't request zstd-compressed jetstream events 35 /// 36 /// reduces CPU at the expense of more ingress bandwidth 37 #[arg(long, action, env = "UFOS_JETSTREAM_NO_ZSTD")] 38 jetstream_no_zstd: bool, 39 /// ufos server's listen address 40 #[arg(long, env = "UFOS_BIND")] 41 #[clap(default_value = "0.0.0.0:9990")] 42 bind: std::net::SocketAddr, 43 /// Location to store persist data to disk 44 #[arg(long, env = "UFOS_DATA")] 45 data: PathBuf, 46 /// DEBUG: don't start the jetstream consumer or its write loop 47 #[arg(long, action, env = "UFOS_PAUSE_WRITER")] 48 pause_writer: bool, 49 /// Adjust runtime settings like background task intervals for efficient backfill 50 #[arg(long, action, env = "UFOS_BACKFILL_MODE")] 51 backfill: bool, 52 /// DEBUG: force the rw loop to fall behind by pausing it 53 /// todo: restore this 54 #[arg(long, action)] 55 pause_rw: bool, 56 /// reset the rollup cursor, scrape through missed things in the past (backfill) 57 #[arg(long, action, env = "UFOS_REROLL")] 58 reroll: bool, 59 /// DEBUG: interpret jetstream as a file fixture 60 #[arg(long, action, env = "UFOS_JETSTREAM_FIXTURE")] 61 jetstream_fixture: bool, 62 /// enable metrics collection and serving 63 #[arg(long, action, env = "UFOS_COLLECT_METRICS")] 64 collect_metrics: bool, 65 /// metrics server's listen address 66 #[arg(long, env = "UFOS_BIND_METRICS")] 67 #[clap(default_value = "0.0.0.0:8765")] 68 bind_metrics: std::net::SocketAddr, 69} 70 71#[tokio::main] 72async fn main() -> anyhow::Result<()> { 73 env_logger::init(); 74 75 let args = Args::parse(); 76 let jetstream = args.jetstream.clone(); 77 let (read_store, write_store, cursor, sketch_secret) = FjallStorage::init( 78 args.data.clone(), 79 jetstream, 80 args.jetstream_force, 81 Default::default(), 82 )?; 83 go(args, read_store, write_store, cursor, sketch_secret).await?; 84 Ok(()) 85} 86 87async fn go<B: StoreBackground + 'static>( 88 args: Args, 89 read_store: impl StoreReader + 'static + Clone, 90 mut write_store: impl StoreWriter<B> + 'static, 91 cursor: Option<Cursor>, 92 sketch_secret: SketchSecretPrefix, 93) -> anyhow::Result<()> { 94 let mut whatever_tasks: JoinSet<anyhow::Result<()>> = JoinSet::new(); 95 let mut consumer_tasks: JoinSet<anyhow::Result<()>> = JoinSet::new(); 96 97 println!("starting server with storage..."); 98 let serving = server::serve(read_store.clone(), args.bind); 99 whatever_tasks.spawn(async move { 100 serving.await.map_err(|e| { 101 log::warn!("server ended: {e}"); 102 anyhow::anyhow!(e) 103 }) 104 }); 105 106 if args.pause_writer { 107 log::info!("not starting jetstream or the write loop."); 108 for t in whatever_tasks.join_all().await { 109 if let Err(e) = t { 110 return Err(anyhow::anyhow!(e)); 111 } 112 } 113 return Ok(()); 114 } 115 116 let batches = if args.jetstream_fixture { 117 log::info!("starting with jestream file fixture: {:?}", args.jetstream); 118 file_consumer::consume(args.jetstream.into(), sketch_secret, cursor).await? 119 } else { 120 log::info!( 121 "starting consumer with cursor: {cursor:?} from {:?} ago", 122 cursor.map(|c| c.elapsed()) 123 ); 124 consumer::consume(&args.jetstream, cursor, false, sketch_secret).await? 125 }; 126 127 let rolling = write_store 128 .background_tasks(args.reroll)? 129 .run(args.backfill); 130 whatever_tasks.spawn(async move { 131 rolling 132 .await 133 .inspect_err(|e| log::warn!("rollup ended: {e}"))?; 134 Ok(()) 135 }); 136 137 consumer_tasks.spawn(async move { 138 write_store 139 .receive_batches(batches) 140 .await 141 .inspect_err(|e| log::warn!("consumer ended: {e}"))?; 142 Ok(()) 143 }); 144 145 whatever_tasks.spawn(async move { 146 do_update_stuff(read_store).await; 147 log::warn!("status task ended"); 148 Ok(()) 149 }); 150 151 if args.collect_metrics { 152 log::trace!("installing metrics server..."); 153 install_metrics_server(args.bind_metrics)?; 154 } 155 156 for (i, t) in consumer_tasks.join_all().await.iter().enumerate() { 157 log::warn!("task {i} done: {t:?}"); 158 } 159 160 println!("consumer tasks all completed, killing the others"); 161 whatever_tasks.shutdown().await; 162 163 println!("bye!"); 164 165 Ok(()) 166} 167 168fn install_metrics_server(bind: std::net::SocketAddr) -> anyhow::Result<()> { 169 log::info!("installing metrics server..."); 170 PrometheusBuilder::new() 171 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 172 .set_bucket_duration(Duration::from_secs(60))? 173 .set_bucket_count(std::num::NonZero::new(10).unwrap()) // count * duration = 10 mins. stuff doesn't happen that fast here. 174 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage) 175 .with_http_listener(bind) 176 .install()?; 177 log::info!("metrics server installed! listening on {bind}"); 178 Ok(()) 179} 180 181async fn do_update_stuff(read_store: impl StoreReader) { 182 describe_gauge!( 183 "persisted_cursor_age", 184 Unit::Microseconds, 185 "microseconds between our clock and the latest persisted event's cursor" 186 ); 187 describe_gauge!( 188 "rollup_cursor_age", 189 Unit::Microseconds, 190 "microseconds between our clock and the latest rollup cursor" 191 ); 192 let started_at = std::time::SystemTime::now(); 193 let mut first_cursor = None; 194 let mut first_rollup = None; 195 let mut last_at = std::time::SystemTime::now(); 196 let mut last_cursor = None; 197 let mut last_rollup = None; 198 let mut interval = tokio::time::interval(std::time::Duration::from_secs(4)); 199 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); 200 loop { 201 interval.tick().await; 202 read_store.update_metrics(); 203 match read_store.get_consumer_info().await { 204 Err(e) => log::warn!("failed to get jetstream consumer info: {e:?}"), 205 Ok(ConsumerInfo::Jetstream { 206 latest_cursor, 207 rollup_cursor, 208 .. 209 }) => { 210 let now = std::time::SystemTime::now(); 211 let latest_cursor = latest_cursor.map(Cursor::from_raw_u64); 212 let rollup_cursor = rollup_cursor.map(Cursor::from_raw_u64); 213 backfill_info( 214 latest_cursor, 215 rollup_cursor, 216 last_cursor, 217 last_rollup, 218 last_at, 219 first_cursor, 220 first_rollup, 221 started_at, 222 now, 223 ); 224 first_cursor = first_cursor.or(latest_cursor); 225 first_rollup = first_rollup.or(rollup_cursor); 226 last_cursor = latest_cursor; 227 last_rollup = rollup_cursor; 228 last_at = now; 229 } 230 } 231 } 232} 233 234#[allow(clippy::too_many_arguments)] 235fn backfill_info( 236 latest_cursor: Option<Cursor>, 237 rollup_cursor: Option<Cursor>, 238 last_cursor: Option<Cursor>, 239 last_rollup: Option<Cursor>, 240 last_at: SystemTime, 241 first_cursor: Option<Cursor>, 242 first_rollup: Option<Cursor>, 243 started_at: SystemTime, 244 now: SystemTime, 245) { 246 if let Some(cursor) = latest_cursor { 247 gauge!("persisted_cursor_age").set(cursor.elapsed_micros_f64()); 248 } 249 if let Some(cursor) = rollup_cursor { 250 gauge!("rollup_cursor_age").set(cursor.elapsed_micros_f64()); 251 } 252 253 let nice_dt_two_maybes = |earlier: Option<Cursor>, later: Option<Cursor>| match (earlier, later) 254 { 255 (Some(earlier), Some(later)) => match later.duration_since(&earlier) { 256 Ok(dt) => nice_duration(dt), 257 Err(e) => { 258 let rev_dt = e.duration(); 259 format!("+{}", nice_duration(rev_dt)) 260 } 261 }, 262 _ => "unknown".to_string(), 263 }; 264 265 let rate = |mlatest: Option<Cursor>, msince: Option<Cursor>, real: Duration| { 266 mlatest 267 .zip(msince) 268 .map(|(latest, since)| { 269 latest 270 .duration_since(&since) 271 .unwrap_or(Duration::from_millis(1)) 272 }) 273 .map(|dtc| format!("{:.2}", dtc.as_secs_f64() / real.as_secs_f64())) 274 .unwrap_or("??".into()) 275 }; 276 277 let dt_real = now 278 .duration_since(last_at) 279 .unwrap_or(Duration::from_millis(1)); 280 281 let dt_real_total = now 282 .duration_since(started_at) 283 .unwrap_or(Duration::from_millis(1)); 284 285 let cursor_rate = rate(latest_cursor, last_cursor, dt_real); 286 let cursor_avg = rate(latest_cursor, first_cursor, dt_real_total); 287 288 let rollup_rate = rate(rollup_cursor, last_rollup, dt_real); 289 let rollup_avg = rate(rollup_cursor, first_rollup, dt_real_total); 290 291 log::trace!( 292 "cursor: {} behind (→{}, {cursor_rate}x, {cursor_avg}x avg). rollup: {} behind (→{}, {rollup_rate}x, {rollup_avg}x avg).", 293 latest_cursor.map(|c| c.elapsed().map(nice_duration).unwrap_or("++".to_string())).unwrap_or("?".to_string()), 294 nice_dt_two_maybes(last_cursor, latest_cursor), 295 rollup_cursor.map(|c| c.elapsed().map(nice_duration).unwrap_or("++".to_string())).unwrap_or("?".to_string()), 296 nice_dt_two_maybes(last_rollup, rollup_cursor), 297 ); 298}