Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

1// use foyer::HybridCache; 2// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder}; 3use metrics_exporter_prometheus::PrometheusBuilder; 4use slingshot::{ 5 Identity, Proxy, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve, 6}; 7use std::net::SocketAddr; 8use std::path::PathBuf; 9 10use clap::Parser; 11use tokio_util::sync::CancellationToken; 12 13/// Slingshot record edge cache 14#[derive(Parser, Debug, Clone)] 15#[command(version, about, long_about = None)] 16struct Args { 17 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: 18 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2' 19 #[arg(long, env = "SLINGSHOT_JETSTREAM")] 20 jetstream: String, 21 /// don't request zstd-compressed jetstream events 22 /// 23 /// reduces CPU at the expense of more ingress bandwidth 24 #[arg(long, action, env = "SLINGSHOT_JETSTREAM_NO_ZSTD")] 25 jetstream_no_zstd: bool, 26 /// where to keep disk caches 27 #[arg(long, env = "SLINGSHOT_CACHE_DIR")] 28 cache_dir: PathBuf, 29 /// where to listen for incomming requests 30 /// 31 /// cannot be used with acme -- if you need ipv6 see --acme-ipv6 32 #[arg(long, env = "SLINGSHOT_BIND")] 33 #[clap(default_value = "0.0.0.0:8080")] 34 bind: SocketAddr, 35 /// memory cache size in megabytes for records 36 #[arg(long, env = "SLINGSHOT_RECORD_CACHE_MEMORY_MB")] 37 #[clap(default_value_t = 64)] 38 record_cache_memory_mb: usize, 39 /// disk cache size in gigabytes for records 40 #[arg(long, env = "SLINGSHOT_RECORD_CACHE_DISK_DB")] 41 #[clap(default_value_t = 1)] 42 record_cache_disk_gb: usize, 43 /// memory cache size in megabytes for identities 44 #[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_MEMORY_MB")] 45 #[clap(default_value_t = 64)] 46 identity_cache_memory_mb: usize, 47 /// disk cache size in gigabytes for identities 48 #[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_DISK_DB")] 49 #[clap(default_value_t = 1)] 50 identity_cache_disk_gb: usize, 51 /// the domain pointing to this server 52 /// 53 /// if present: 54 /// - a did:web document will be served at /.well-known/did.json 55 /// - an HTTPS certs will be automatically configured with Acme/letsencrypt 56 /// - TODO: a rate-limiter will be installed 57 #[arg( 58 long, 59 conflicts_with("bind"), 60 requires("acme_cache_path"), 61 env = "SLINGSHOT_ACME_DOMAIN" 62 )] 63 acme_domain: Option<String>, 64 /// email address for letsencrypt contact 65 /// 66 /// recommended in production, i guess? 67 #[arg(long, requires("acme_domain"), env = "SLINGSHOT_ACME_CONTACT")] 68 acme_contact: Option<String>, 69 /// a location to cache acme https certs 70 /// 71 /// required when (and only used when) --acme-domain is specified. 72 /// 73 /// recommended in production, but mind the file permissions. 74 #[arg(long, requires("acme_domain"), env = "SLINGSHOT_ACME_CACHE_PATH")] 75 acme_cache_path: Option<PathBuf>, 76 /// listen for ipv6 when using acme 77 /// 78 /// you must also configure the relevant DNS records for this to work 79 #[arg(long, action, requires("acme_domain"), env = "SLINGSHOT_ACME_IPV6")] 80 acme_ipv6: bool, 81 /// an web address to send healtcheck pings to every ~51s or so 82 #[arg(long, env = "SLINGSHOT_HEALTHCHECK")] 83 healthcheck: Option<String>, 84 /// enable metrics collection and serving 85 #[arg(long, action, env = "SLINGSHOT_COLLECT_METRICS")] 86 collect_metrics: bool, 87 /// metrics server's listen address 88 #[arg(long, requires("collect_metrics"), env = "SLINGSHOT_BIND_METRICS")] 89 #[clap(default_value = "[::]:8765")] 90 bind_metrics: std::net::SocketAddr, 91} 92 93#[tokio::main] 94async fn main() -> Result<(), String> { 95 tracing_subscriber::fmt::init(); 96 97 let shutdown = CancellationToken::new(); 98 99 let ctrlc_shutdown = shutdown.clone(); 100 ctrlc::set_handler(move || ctrlc_shutdown.cancel()).expect("failed to set ctrl-c handler"); 101 102 let args = Args::parse(); 103 104 if args.collect_metrics { 105 log::trace!("installing metrics server..."); 106 if let Err(e) = install_metrics_server(args.bind_metrics) { 107 log::error!("failed to install metrics server: {e:?}"); 108 } else { 109 log::info!("metrics listening at http://{}", args.bind_metrics); 110 } 111 } 112 113 std::fs::create_dir_all(&args.cache_dir).map_err(|e| { 114 format!( 115 "failed to ensure cache parent dir: {e:?} (dir: {:?})", 116 args.cache_dir 117 ) 118 })?; 119 let cache_dir = args.cache_dir.canonicalize().map_err(|e| { 120 format!( 121 "failed to canonicalize cache_dir: {e:?} (dir: {:?})", 122 args.cache_dir 123 ) 124 })?; 125 log::info!("cache dir ready at at {cache_dir:?}."); 126 127 log::info!("setting up firehose cache..."); 128 let cache = firehose_cache( 129 cache_dir.join("./firehose"), 130 args.record_cache_memory_mb, 131 args.record_cache_disk_gb, 132 ) 133 .await?; 134 log::info!("firehose cache ready."); 135 136 let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new(); 137 138 log::info!("starting identity service..."); 139 let identity = Identity::new( 140 cache_dir.join("./identity"), 141 args.identity_cache_memory_mb, 142 args.identity_cache_disk_gb, 143 ) 144 .await 145 .map_err(|e| format!("identity setup failed: {e:?}"))?; 146 let identity_refresher = identity.clone(); 147 let identity_shutdown = shutdown.clone(); 148 tasks.spawn(async move { 149 identity_refresher.run_refresher(identity_shutdown).await?; 150 Ok(()) 151 }); 152 log::info!("identity service ready."); 153 154 let repo = Repo::new(identity.clone()); 155 let proxy = Proxy::new(repo.clone()); 156 157 let identity_for_server = identity.clone(); 158 let server_shutdown = shutdown.clone(); 159 let server_cache_handle = cache.clone(); 160 let bind = args.bind; 161 tasks.spawn(async move { 162 serve( 163 server_cache_handle, 164 identity_for_server, 165 repo, 166 proxy, 167 args.acme_domain, 168 args.acme_contact, 169 args.acme_cache_path, 170 args.acme_ipv6, 171 server_shutdown, 172 bind, 173 ) 174 .await?; 175 Ok(()) 176 }); 177 178 let identity_refreshable = identity.clone(); 179 let consumer_shutdown = shutdown.clone(); 180 let consumer_cache = cache.clone(); 181 tasks.spawn(async move { 182 consume( 183 args.jetstream, 184 None, 185 args.jetstream_no_zstd, 186 identity_refreshable, 187 consumer_shutdown, 188 consumer_cache, 189 ) 190 .await?; 191 Ok(()) 192 }); 193 194 if let Some(hc) = args.healthcheck { 195 let healthcheck_shutdown = shutdown.clone(); 196 tasks.spawn(async move { 197 healthcheck(hc, healthcheck_shutdown).await?; 198 Ok(()) 199 }); 200 } 201 202 tokio::select! { 203 _ = shutdown.cancelled() => log::warn!("shutdown requested"), 204 Some(r) = tasks.join_next() => { 205 log::warn!("a task exited, shutting down: {r:?}"); 206 shutdown.cancel(); 207 } 208 } 209 210 tasks.spawn(async move { 211 cache 212 .close() 213 .await 214 .map_err(MainTaskError::FirehoseCacheCloseError) 215 }); 216 217 tokio::select! { 218 _ = async { 219 while let Some(completed) = tasks.join_next().await { 220 log::info!("shutdown: task completed: {completed:?}"); 221 } 222 } => {}, 223 _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => { 224 log::info!("shutdown: not all tasks completed on time. aborting..."); 225 tasks.shutdown().await; 226 }, 227 } 228 229 log::info!("bye!"); 230 231 Ok(()) 232} 233 234fn install_metrics_server( 235 bind_metrics: std::net::SocketAddr, 236) -> Result<(), metrics_exporter_prometheus::BuildError> { 237 log::info!("installing metrics server..."); 238 PrometheusBuilder::new() 239 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 240 .set_bucket_duration(std::time::Duration::from_secs(300))? 241 .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here. 242 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage) 243 .with_http_listener(bind_metrics) 244 .install()?; 245 log::info!( 246 "metrics server installed! listening on http://{}", 247 bind_metrics 248 ); 249 Ok(()) 250}