// Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1// use foyer::HybridCache;
2// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder};
3use metrics_exporter_prometheus::PrometheusBuilder;
4use slingshot::{
5 Identity, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
6};
7use std::net::SocketAddr;
8use std::path::PathBuf;
9
10use atrium_identity::did::DEFAULT_PLC_DIRECTORY_URL;
11use clap::Parser;
12use tokio_util::sync::CancellationToken;
13
14/// Slingshot record edge cache
15#[derive(Parser, Debug, Clone)]
16#[command(version, about, long_about = None)]
17struct Args {
18 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:
19 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
20 #[arg(long, env = "SLINGSHOT_JETSTREAM")]
21 jetstream: String,
22 /// don't request zstd-compressed jetstream events
23 ///
24 /// reduces CPU at the expense of more ingress bandwidth
25 #[arg(long, action, env = "SLINGSHOT_JETSTREAM_NO_ZSTD")]
26 jetstream_no_zstd: bool,
27 /// where to keep disk caches
28 #[arg(long, env = "SLINGSHOT_CACHE_DIR")]
29 cache_dir: PathBuf,
30 /// where to listen for incomming requests
31 ///
32 /// cannot be used with acme -- if you need ipv6 see --acme-ipv6
33 #[arg(long, env = "SLINGSHOT_BIND")]
34 #[clap(default_value = "0.0.0.0:8080")]
35 bind: SocketAddr,
36 /// memory cache size in megabytes for records
37 #[arg(long, env = "SLINGSHOT_RECORD_CACHE_MEMORY_MB")]
38 #[clap(default_value_t = 64)]
39 record_cache_memory_mb: usize,
40 /// disk cache size in gigabytes for records
41 #[arg(long, env = "SLINGSHOT_RECORD_CACHE_DISK_DB")]
42 #[clap(default_value_t = 1)]
43 record_cache_disk_gb: usize,
44 /// memory cache size in megabytes for identities
45 #[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_MEMORY_MB")]
46 #[clap(default_value_t = 64)]
47 identity_cache_memory_mb: usize,
48 /// disk cache size in gigabytes for identities
49 #[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_DISK_DB")]
50 #[clap(default_value_t = 1)]
51 identity_cache_disk_gb: usize,
52 /// the plc directory used to resolve did:plc identities
53 #[arg(long, env = "SLINGSHOT_PLC_DIRECTORY")]
54 #[clap(default_value = DEFAULT_PLC_DIRECTORY_URL)]
55 plc_directory: String,
56 /// the domain pointing to this server
57 ///
58 /// if present:
59 /// - a did:web document will be served at /.well-known/did.json
60 /// - an HTTPS certs will be automatically configured with Acme/letsencrypt
61 /// - TODO: a rate-limiter will be installed
62 #[arg(
63 long,
64 conflicts_with("bind"),
65 requires("acme_cache_path"),
66 env = "SLINGSHOT_ACME_DOMAIN"
67 )]
68 acme_domain: Option<String>,
69 /// email address for letsencrypt contact
70 ///
71 /// recommended in production, i guess?
72 #[arg(long, requires("acme_domain"), env = "SLINGSHOT_ACME_CONTACT")]
73 acme_contact: Option<String>,
74 /// a location to cache acme https certs
75 ///
76 /// required when (and only used when) --acme-domain is specified.
77 ///
78 /// recommended in production, but mind the file permissions.
79 #[arg(long, requires("acme_domain"), env = "SLINGSHOT_ACME_CACHE_PATH")]
80 acme_cache_path: Option<PathBuf>,
81 /// listen for ipv6 when using acme
82 ///
83 /// you must also configure the relevant DNS records for this to work
84 #[arg(long, action, requires("acme_domain"), env = "SLINGSHOT_ACME_IPV6")]
85 acme_ipv6: bool,
86 /// an web address to send healtcheck pings to every ~51s or so
87 #[arg(long, env = "SLINGSHOT_HEALTHCHECK")]
88 healthcheck: Option<String>,
89 /// enable metrics collection and serving
90 #[arg(long, action, env = "SLINGSHOT_COLLECT_METRICS")]
91 collect_metrics: bool,
92 /// metrics server's listen address
93 #[arg(long, requires("collect_metrics"), env = "SLINGSHOT_BIND_METRICS")]
94 #[clap(default_value = "[::]:8765")]
95 bind_metrics: std::net::SocketAddr,
96}
97
98#[tokio::main]
99async fn main() -> Result<(), String> {
100 tracing_subscriber::fmt::init();
101
102 let shutdown = CancellationToken::new();
103
104 let ctrlc_shutdown = shutdown.clone();
105 ctrlc::set_handler(move || ctrlc_shutdown.cancel()).expect("failed to set ctrl-c handler");
106
107 let args = Args::parse();
108
109 if args.collect_metrics {
110 log::trace!("installing metrics server...");
111 if let Err(e) = install_metrics_server(args.bind_metrics) {
112 log::error!("failed to install metrics server: {e:?}");
113 } else {
114 log::info!("metrics listening at http://{}", args.bind_metrics);
115 }
116 }
117
118 std::fs::create_dir_all(&args.cache_dir).map_err(|e| {
119 format!(
120 "failed to ensure cache parent dir: {e:?} (dir: {:?})",
121 args.cache_dir
122 )
123 })?;
124 let cache_dir = args.cache_dir.canonicalize().map_err(|e| {
125 format!(
126 "failed to canonicalize cache_dir: {e:?} (dir: {:?})",
127 args.cache_dir
128 )
129 })?;
130 log::info!("cache dir ready at at {cache_dir:?}.");
131
132 log::info!("setting up firehose cache...");
133 let cache = firehose_cache(
134 cache_dir.join("./firehose"),
135 args.record_cache_memory_mb,
136 args.record_cache_disk_gb,
137 )
138 .await?;
139 log::info!("firehose cache ready.");
140
141 let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new();
142
143 log::info!("starting identity service...");
144 let identity = Identity::new(
145 cache_dir.join("./identity"),
146 args.identity_cache_memory_mb,
147 args.identity_cache_disk_gb,
148 args.plc_directory,
149 )
150 .await
151 .map_err(|e| format!("identity setup failed: {e:?}"))?;
152
153 log::info!("identity service ready.");
154 let identity_refresher = identity.clone();
155 let identity_shutdown = shutdown.clone();
156 tasks.spawn(async move {
157 identity_refresher.run_refresher(identity_shutdown).await?;
158 Ok(())
159 });
160
161 let repo = Repo::new(identity.clone());
162
163 let identity_for_server = identity.clone();
164 let server_shutdown = shutdown.clone();
165 let server_cache_handle = cache.clone();
166 let bind = args.bind;
167 tasks.spawn(async move {
168 serve(
169 server_cache_handle,
170 identity_for_server,
171 repo,
172 args.acme_domain,
173 args.acme_contact,
174 args.acme_cache_path,
175 args.acme_ipv6,
176 server_shutdown,
177 bind,
178 )
179 .await?;
180 Ok(())
181 });
182
183 let identity_refreshable = identity.clone();
184 let consumer_shutdown = shutdown.clone();
185 let consumer_cache = cache.clone();
186 tasks.spawn(async move {
187 consume(
188 args.jetstream,
189 None,
190 args.jetstream_no_zstd,
191 identity_refreshable,
192 consumer_shutdown,
193 consumer_cache,
194 )
195 .await?;
196 Ok(())
197 });
198
199 if let Some(hc) = args.healthcheck {
200 let healthcheck_shutdown = shutdown.clone();
201 tasks.spawn(async move {
202 healthcheck(hc, healthcheck_shutdown).await?;
203 Ok(())
204 });
205 }
206
207 tokio::select! {
208 _ = shutdown.cancelled() => log::warn!("shutdown requested"),
209 Some(r) = tasks.join_next() => {
210 log::warn!("a task exited, shutting down: {r:?}");
211 shutdown.cancel();
212 }
213 }
214
215 tasks.spawn(async move {
216 cache
217 .close()
218 .await
219 .map_err(MainTaskError::FirehoseCacheCloseError)
220 });
221
222 tokio::select! {
223 _ = async {
224 while let Some(completed) = tasks.join_next().await {
225 log::info!("shutdown: task completed: {completed:?}");
226 }
227 } => {},
228 _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
229 log::info!("shutdown: not all tasks completed on time. aborting...");
230 tasks.shutdown().await;
231 },
232 }
233
234 log::info!("bye!");
235
236 Ok(())
237}
238
239fn install_metrics_server(
240 bind_metrics: std::net::SocketAddr,
241) -> Result<(), metrics_exporter_prometheus::BuildError> {
242 log::info!("installing metrics server...");
243 PrometheusBuilder::new()
244 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
245 .set_bucket_duration(std::time::Duration::from_secs(300))?
246 .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here.
247 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
248 .with_http_listener(bind_metrics)
249 .install()?;
250 log::info!(
251 "metrics server installed! listening on http://{}",
252 bind_metrics
253 );
254 Ok(())
255}