Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1// use foyer::HybridCache;
2// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder};
3use metrics_exporter_prometheus::PrometheusBuilder;
4use slingshot::{
5 Identity, Proxy, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
6};
7use std::net::SocketAddr;
8use std::path::PathBuf;
9
10use clap::Parser;
11use tokio_util::sync::CancellationToken;
12
13/// Slingshot record edge cache
14#[derive(Parser, Debug, Clone)]
15#[command(version, about, long_about = None)]
16struct Args {
17 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:
18 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
19 #[arg(long, env = "SLINGSHOT_JETSTREAM")]
20 jetstream: String,
21 /// don't request zstd-compressed jetstream events
22 ///
23 /// reduces CPU at the expense of more ingress bandwidth
24 #[arg(long, action, env = "SLINGSHOT_JETSTREAM_NO_ZSTD")]
25 jetstream_no_zstd: bool,
26 /// where to keep disk caches
27 #[arg(long, env = "SLINGSHOT_CACHE_DIR")]
28 cache_dir: PathBuf,
29 /// where to listen for incomming requests
30 ///
31 /// cannot be used with acme -- if you need ipv6 see --acme-ipv6
32 #[arg(long, env = "SLINGSHOT_BIND")]
33 #[clap(default_value = "0.0.0.0:8080")]
34 bind: SocketAddr,
35 /// memory cache size in megabytes for records
36 #[arg(long, env = "SLINGSHOT_RECORD_CACHE_MEMORY_MB")]
37 #[clap(default_value_t = 64)]
38 record_cache_memory_mb: usize,
39 /// disk cache size in gigabytes for records
40 #[arg(long, env = "SLINGSHOT_RECORD_CACHE_DISK_DB")]
41 #[clap(default_value_t = 1)]
42 record_cache_disk_gb: usize,
43 /// memory cache size in megabytes for identities
44 #[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_MEMORY_MB")]
45 #[clap(default_value_t = 64)]
46 identity_cache_memory_mb: usize,
47 /// disk cache size in gigabytes for identities
48 #[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_DISK_DB")]
49 #[clap(default_value_t = 1)]
50 identity_cache_disk_gb: usize,
51 /// the domain pointing to this server
52 ///
53 /// if present:
54 /// - a did:web document will be served at /.well-known/did.json
55 /// - an HTTPS certs will be automatically configured with Acme/letsencrypt
56 /// - TODO: a rate-limiter will be installed
57 #[arg(
58 long,
59 conflicts_with("bind"),
60 requires("acme_cache_path"),
61 env = "SLINGSHOT_ACME_DOMAIN"
62 )]
63 acme_domain: Option<String>,
64 /// email address for letsencrypt contact
65 ///
66 /// recommended in production, i guess?
67 #[arg(long, requires("acme_domain"), env = "SLINGSHOT_ACME_CONTACT")]
68 acme_contact: Option<String>,
69 /// a location to cache acme https certs
70 ///
71 /// required when (and only used when) --acme-domain is specified.
72 ///
73 /// recommended in production, but mind the file permissions.
74 #[arg(long, requires("acme_domain"), env = "SLINGSHOT_ACME_CACHE_PATH")]
75 acme_cache_path: Option<PathBuf>,
76 /// listen for ipv6 when using acme
77 ///
78 /// you must also configure the relevant DNS records for this to work
79 #[arg(long, action, requires("acme_domain"), env = "SLINGSHOT_ACME_IPV6")]
80 acme_ipv6: bool,
81 /// an web address to send healtcheck pings to every ~51s or so
82 #[arg(long, env = "SLINGSHOT_HEALTHCHECK")]
83 healthcheck: Option<String>,
84 /// enable metrics collection and serving
85 #[arg(long, action, env = "SLINGSHOT_COLLECT_METRICS")]
86 collect_metrics: bool,
87 /// metrics server's listen address
88 #[arg(long, requires("collect_metrics"), env = "SLINGSHOT_BIND_METRICS")]
89 #[clap(default_value = "[::]:8765")]
90 bind_metrics: std::net::SocketAddr,
91}
92
93#[tokio::main]
94async fn main() -> Result<(), String> {
95 tracing_subscriber::fmt::init();
96
97 let shutdown = CancellationToken::new();
98
99 let ctrlc_shutdown = shutdown.clone();
100 ctrlc::set_handler(move || ctrlc_shutdown.cancel()).expect("failed to set ctrl-c handler");
101
102 let args = Args::parse();
103
104 if args.collect_metrics {
105 log::trace!("installing metrics server...");
106 if let Err(e) = install_metrics_server(args.bind_metrics) {
107 log::error!("failed to install metrics server: {e:?}");
108 } else {
109 log::info!("metrics listening at http://{}", args.bind_metrics);
110 }
111 }
112
113 std::fs::create_dir_all(&args.cache_dir).map_err(|e| {
114 format!(
115 "failed to ensure cache parent dir: {e:?} (dir: {:?})",
116 args.cache_dir
117 )
118 })?;
119 let cache_dir = args.cache_dir.canonicalize().map_err(|e| {
120 format!(
121 "failed to canonicalize cache_dir: {e:?} (dir: {:?})",
122 args.cache_dir
123 )
124 })?;
125 log::info!("cache dir ready at at {cache_dir:?}.");
126
127 log::info!("setting up firehose cache...");
128 let cache = firehose_cache(
129 cache_dir.join("./firehose"),
130 args.record_cache_memory_mb,
131 args.record_cache_disk_gb,
132 )
133 .await?;
134 log::info!("firehose cache ready.");
135
136 let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new();
137
138 log::info!("starting identity service...");
139 let identity = Identity::new(
140 cache_dir.join("./identity"),
141 args.identity_cache_memory_mb,
142 args.identity_cache_disk_gb,
143 )
144 .await
145 .map_err(|e| format!("identity setup failed: {e:?}"))?;
146 let identity_refresher = identity.clone();
147 let identity_shutdown = shutdown.clone();
148 tasks.spawn(async move {
149 identity_refresher.run_refresher(identity_shutdown).await?;
150 Ok(())
151 });
152 log::info!("identity service ready.");
153
154 let repo = Repo::new(identity.clone());
155 let proxy = Proxy::new(repo.clone());
156
157 let identity_for_server = identity.clone();
158 let server_shutdown = shutdown.clone();
159 let server_cache_handle = cache.clone();
160 let bind = args.bind;
161 tasks.spawn(async move {
162 serve(
163 server_cache_handle,
164 identity_for_server,
165 repo,
166 proxy,
167 args.acme_domain,
168 args.acme_contact,
169 args.acme_cache_path,
170 args.acme_ipv6,
171 server_shutdown,
172 bind,
173 )
174 .await?;
175 Ok(())
176 });
177
178 let identity_refreshable = identity.clone();
179 let consumer_shutdown = shutdown.clone();
180 let consumer_cache = cache.clone();
181 tasks.spawn(async move {
182 consume(
183 args.jetstream,
184 None,
185 args.jetstream_no_zstd,
186 identity_refreshable,
187 consumer_shutdown,
188 consumer_cache,
189 )
190 .await?;
191 Ok(())
192 });
193
194 if let Some(hc) = args.healthcheck {
195 let healthcheck_shutdown = shutdown.clone();
196 tasks.spawn(async move {
197 healthcheck(hc, healthcheck_shutdown).await?;
198 Ok(())
199 });
200 }
201
202 tokio::select! {
203 _ = shutdown.cancelled() => log::warn!("shutdown requested"),
204 Some(r) = tasks.join_next() => {
205 log::warn!("a task exited, shutting down: {r:?}");
206 shutdown.cancel();
207 }
208 }
209
210 tasks.spawn(async move {
211 cache
212 .close()
213 .await
214 .map_err(MainTaskError::FirehoseCacheCloseError)
215 });
216
217 tokio::select! {
218 _ = async {
219 while let Some(completed) = tasks.join_next().await {
220 log::info!("shutdown: task completed: {completed:?}");
221 }
222 } => {},
223 _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
224 log::info!("shutdown: not all tasks completed on time. aborting...");
225 tasks.shutdown().await;
226 },
227 }
228
229 log::info!("bye!");
230
231 Ok(())
232}
233
234fn install_metrics_server(
235 bind_metrics: std::net::SocketAddr,
236) -> Result<(), metrics_exporter_prometheus::BuildError> {
237 log::info!("installing metrics server...");
238 PrometheusBuilder::new()
239 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
240 .set_bucket_duration(std::time::Duration::from_secs(300))?
241 .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here.
242 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
243 .with_http_listener(bind_metrics)
244 .install()?;
245 log::info!(
246 "metrics server installed! listening on http://{}",
247 bind_metrics
248 );
249 Ok(())
250}