Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 8.7 kB View raw
1// use foyer::HybridCache; 2// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder}; 3use metrics_exporter_prometheus::PrometheusBuilder; 4use slingshot::{ 5 Identity, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve, 6}; 7use std::net::SocketAddr; 8use std::path::PathBuf; 9 10use atrium_identity::did::DEFAULT_PLC_DIRECTORY_URL; 11use clap::Parser; 12use tokio_util::sync::CancellationToken; 13 14/// Slingshot record edge cache 15#[derive(Parser, Debug, Clone)] 16#[command(version, about, long_about = None)] 17struct Args { 18 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: 19 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2' 20 #[arg(long, env = "SLINGSHOT_JETSTREAM")] 21 jetstream: String, 22 /// don't request zstd-compressed jetstream events 23 /// 24 /// reduces CPU at the expense of more ingress bandwidth 25 #[arg(long, action, env = "SLINGSHOT_JETSTREAM_NO_ZSTD")] 26 jetstream_no_zstd: bool, 27 /// where to keep disk caches 28 #[arg(long, env = "SLINGSHOT_CACHE_DIR")] 29 cache_dir: PathBuf, 30 /// where to listen for incomming requests 31 /// 32 /// cannot be used with acme -- if you need ipv6 see --acme-ipv6 33 #[arg(long, env = "SLINGSHOT_BIND")] 34 #[clap(default_value = "0.0.0.0:8080")] 35 bind: SocketAddr, 36 /// memory cache size in megabytes for records 37 #[arg(long, env = "SLINGSHOT_RECORD_CACHE_MEMORY_MB")] 38 #[clap(default_value_t = 64)] 39 record_cache_memory_mb: usize, 40 /// disk cache size in gigabytes for records 41 #[arg(long, env = "SLINGSHOT_RECORD_CACHE_DISK_DB")] 42 #[clap(default_value_t = 1)] 43 record_cache_disk_gb: usize, 44 /// memory cache size in megabytes for identities 45 #[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_MEMORY_MB")] 46 #[clap(default_value_t = 64)] 47 identity_cache_memory_mb: usize, 48 /// disk cache size in gigabytes for identities 49 #[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_DISK_DB")] 50 #[clap(default_value_t = 1)] 51 identity_cache_disk_gb: usize, 52 /// the plc directory used to resolve did:plc identities 53 #[arg(long, env = "SLINGSHOT_PLC_DIRECTORY")] 54 #[clap(default_value = DEFAULT_PLC_DIRECTORY_URL)] 55 plc_directory: String, 56 /// the domain pointing to this server 57 /// 58 /// if present: 59 /// - a did:web document will be served at /.well-known/did.json 60 /// - an HTTPS certs will be automatically configured with Acme/letsencrypt 61 /// - TODO: a rate-limiter will be installed 62 #[arg( 63 long, 64 conflicts_with("bind"), 65 requires("acme_cache_path"), 66 env = "SLINGSHOT_ACME_DOMAIN" 67 )] 68 acme_domain: Option<String>, 69 /// email address for letsencrypt contact 70 /// 71 /// recommended in production, i guess? 72 #[arg(long, requires("acme_domain"), env = "SLINGSHOT_ACME_CONTACT")] 73 acme_contact: Option<String>, 74 /// a location to cache acme https certs 75 /// 76 /// required when (and only used when) --acme-domain is specified. 77 /// 78 /// recommended in production, but mind the file permissions. 79 #[arg(long, requires("acme_domain"), env = "SLINGSHOT_ACME_CACHE_PATH")] 80 acme_cache_path: Option<PathBuf>, 81 /// listen for ipv6 when using acme 82 /// 83 /// you must also configure the relevant DNS records for this to work 84 #[arg(long, action, requires("acme_domain"), env = "SLINGSHOT_ACME_IPV6")] 85 acme_ipv6: bool, 86 /// an web address to send healtcheck pings to every ~51s or so 87 #[arg(long, env = "SLINGSHOT_HEALTHCHECK")] 88 healthcheck: Option<String>, 89 /// enable metrics collection and serving 90 #[arg(long, action, env = "SLINGSHOT_COLLECT_METRICS")] 91 collect_metrics: bool, 92 /// metrics server's listen address 93 #[arg(long, requires("collect_metrics"), env = "SLINGSHOT_BIND_METRICS")] 94 #[clap(default_value = "[::]:8765")] 95 bind_metrics: std::net::SocketAddr, 96} 97 98#[tokio::main] 99async fn main() -> Result<(), String> { 100 tracing_subscriber::fmt::init(); 101 102 let shutdown = CancellationToken::new(); 103 104 let ctrlc_shutdown = shutdown.clone(); 105 ctrlc::set_handler(move || ctrlc_shutdown.cancel()).expect("failed to set ctrl-c handler"); 106 107 let args = Args::parse(); 108 109 if args.collect_metrics { 110 log::trace!("installing metrics server..."); 111 if let Err(e) = install_metrics_server(args.bind_metrics) { 112 log::error!("failed to install metrics server: {e:?}"); 113 } else { 114 log::info!("metrics listening at http://{}", args.bind_metrics); 115 } 116 } 117 118 std::fs::create_dir_all(&args.cache_dir).map_err(|e| { 119 format!( 120 "failed to ensure cache parent dir: {e:?} (dir: {:?})", 121 args.cache_dir 122 ) 123 })?; 124 let cache_dir = args.cache_dir.canonicalize().map_err(|e| { 125 format!( 126 "failed to canonicalize cache_dir: {e:?} (dir: {:?})", 127 args.cache_dir 128 ) 129 })?; 130 log::info!("cache dir ready at at {cache_dir:?}."); 131 132 log::info!("setting up firehose cache..."); 133 let cache = firehose_cache( 134 cache_dir.join("./firehose"), 135 args.record_cache_memory_mb, 136 args.record_cache_disk_gb, 137 ) 138 .await?; 139 log::info!("firehose cache ready."); 140 141 let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new(); 142 143 log::info!("starting identity service..."); 144 let identity = Identity::new( 145 cache_dir.join("./identity"), 146 args.identity_cache_memory_mb, 147 args.identity_cache_disk_gb, 148 args.plc_directory, 149 ) 150 .await 151 .map_err(|e| format!("identity setup failed: {e:?}"))?; 152 153 log::info!("identity service ready."); 154 let identity_refresher = identity.clone(); 155 let identity_shutdown = shutdown.clone(); 156 tasks.spawn(async move { 157 identity_refresher.run_refresher(identity_shutdown).await?; 158 Ok(()) 159 }); 160 161 let repo = Repo::new(identity.clone()); 162 163 let identity_for_server = identity.clone(); 164 let server_shutdown = shutdown.clone(); 165 let server_cache_handle = cache.clone(); 166 let bind = args.bind; 167 tasks.spawn(async move { 168 serve( 169 server_cache_handle, 170 identity_for_server, 171 repo, 172 args.acme_domain, 173 args.acme_contact, 174 args.acme_cache_path, 175 args.acme_ipv6, 176 server_shutdown, 177 bind, 178 ) 179 .await?; 180 Ok(()) 181 }); 182 183 let identity_refreshable = identity.clone(); 184 let consumer_shutdown = shutdown.clone(); 185 let consumer_cache = cache.clone(); 186 tasks.spawn(async move { 187 consume( 188 args.jetstream, 189 None, 190 args.jetstream_no_zstd, 191 identity_refreshable, 192 consumer_shutdown, 193 consumer_cache, 194 ) 195 .await?; 196 Ok(()) 197 }); 198 199 if let Some(hc) = args.healthcheck { 200 let healthcheck_shutdown = shutdown.clone(); 201 tasks.spawn(async move { 202 healthcheck(hc, healthcheck_shutdown).await?; 203 Ok(()) 204 }); 205 } 206 207 tokio::select! { 208 _ = shutdown.cancelled() => log::warn!("shutdown requested"), 209 Some(r) = tasks.join_next() => { 210 log::warn!("a task exited, shutting down: {r:?}"); 211 shutdown.cancel(); 212 } 213 } 214 215 tasks.spawn(async move { 216 cache 217 .close() 218 .await 219 .map_err(MainTaskError::FirehoseCacheCloseError) 220 }); 221 222 tokio::select! { 223 _ = async { 224 while let Some(completed) = tasks.join_next().await { 225 log::info!("shutdown: task completed: {completed:?}"); 226 } 227 } => {}, 228 _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => { 229 log::info!("shutdown: not all tasks completed on time. aborting..."); 230 tasks.shutdown().await; 231 }, 232 } 233 234 log::info!("bye!"); 235 236 Ok(()) 237} 238 239fn install_metrics_server( 240 bind_metrics: std::net::SocketAddr, 241) -> Result<(), metrics_exporter_prometheus::BuildError> { 242 log::info!("installing metrics server..."); 243 PrometheusBuilder::new() 244 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 245 .set_bucket_duration(std::time::Duration::from_secs(300))? 246 .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here. 247 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage) 248 .with_http_listener(bind_metrics) 249 .install()?; 250 log::info!( 251 "metrics server installed! listening on http://{}", 252 bind_metrics 253 ); 254 Ok(()) 255}