Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use anyhow::{bail, Result};
2use clap::{Parser, ValueEnum};
3use metrics_exporter_prometheus::PrometheusBuilder;
4use std::num::NonZero;
5use std::path::PathBuf;
6use std::sync::{atomic::AtomicU32, Arc};
7use std::thread;
8use std::time;
9use tokio::runtime;
10use tokio_util::sync::CancellationToken;
11
12use constellation::consumer::consume;
13use constellation::server::serve;
14#[cfg(feature = "rocks")]
15use constellation::storage::RocksStorage;
16use constellation::storage::{LinkReader, LinkStorage, MemStorage, StorageStats};
17
/// Pause between passes of the monitor thread's metrics-collection loop.
const MONITOR_INTERVAL: time::Duration = time::Duration::from_millis(15_000);
19
20/// Aggregate links in the at-mosphere
21#[derive(Parser, Debug)]
22#[command(version, about, long_about = None)]
23struct Args {
24 #[arg(short, long)]
25 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:
26 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
27 #[arg(short, long)]
28 jetstream: String,
29 // TODO: make this part of rocks' own sub-config?
30 /// Where to store data on disk, for backends that use disk storage
31 #[arg(short, long)]
32 data: Option<PathBuf>,
33 /// Storage backend to use
34 #[arg(short, long)]
35 #[clap(value_enum, default_value_t = StorageBackend::Memory)]
36 backend: StorageBackend,
37 /// Initiate a database backup into this dir, if supported by the storage
38 #[arg(long)]
39 backup: Option<PathBuf>,
40 /// Start a background task to take backups every N hours
41 #[arg(long)]
42 backup_interval: Option<u64>,
43 /// Keep at most this many backups purging oldest first, requires --backup-interval
44 #[arg(long)]
45 max_old_backups: Option<usize>,
46 /// Saved jsonl from jetstream to use instead of a live subscription
47 #[arg(short, long)]
48 fixture: Option<PathBuf>,
49}
50
51#[derive(Debug, Clone, ValueEnum)]
52enum StorageBackend {
53 Memory,
54 #[cfg(feature = "rocks")]
55 Rocks,
56}
57
/// Expands a jetstream region shorthand into its full `wss://` subscribe URL.
///
/// Recognised shorthands are `us-east-1`, `us-east-2`, `us-west-1` and `us-west-2`.
/// Any other input (for example a complete custom URL) is returned unchanged.
fn jetstream_url(provided: &str) -> String {
    const KNOWN: [(&str, &str); 4] = [
        ("us-east-1", "wss://jetstream1.us-east.bsky.network/subscribe"),
        ("us-east-2", "wss://jetstream2.us-east.bsky.network/subscribe"),
        ("us-west-1", "wss://jetstream1.us-west.bsky.network/subscribe"),
        ("us-west-2", "wss://jetstream2.us-west.bsky.network/subscribe"),
    ];
    let resolved: &str = KNOWN
        .iter()
        .find(|(shorthand, _)| *shorthand == provided)
        .map_or(provided, |&(_, url)| url);
    resolved.to_string()
}
67
68fn main() -> Result<()> {
69 let args = Args::parse();
70
71 println!("starting with storage backend: {:?}...", args.backend);
72
73 let fixture = args.fixture;
74 if let Some(ref p) = fixture {
75 println!("using fixture at {p:?}...");
76 }
77
78 let stream = jetstream_url(&args.jetstream);
79 println!("using jetstream server {stream:?}...",);
80
81 let stay_alive = CancellationToken::new();
82
83 match args.backend {
84 StorageBackend::Memory => run(MemStorage::new(), fixture, None, stream, stay_alive),
85 #[cfg(feature = "rocks")]
86 StorageBackend::Rocks => {
87 let storage_dir = args.data.clone().unwrap_or("rocks.test".into());
88 println!("starting rocksdb...");
89 let mut rocks = RocksStorage::new(storage_dir)?;
90 if let Some(backup_dir) = args.backup {
91 let auto_backup = match (args.backup_interval, args.max_old_backups) {
92 (Some(interval_hrs), copies) => Some((interval_hrs, copies)),
93 (None, None) => None,
94 (None, Some(_)) => bail!("invalid backup config: --max-old-backups requires --backup-interval to be configured"),
95 };
96 rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?;
97 }
98 println!("rocks ready.");
99 run(rocks, fixture, args.data, stream, stay_alive)
100 }
101 }
102}
103
104fn run(
105 mut storage: impl LinkStorage,
106 fixture: Option<PathBuf>,
107 data_dir: Option<PathBuf>,
108 stream: String,
109 stay_alive: CancellationToken,
110) -> Result<()> {
111 ctrlc::set_handler({
112 let mut desperation: u8 = 0;
113 let stay_alive = stay_alive.clone();
114 move || match desperation {
115 0 => {
116 println!("ok, shutting down...");
117 stay_alive.cancel();
118 desperation += 1;
119 }
120 1.. => panic!("fine, panicking!"),
121 }
122 })?;
123
124 let qsize = Arc::new(AtomicU32::new(0));
125
126 thread::scope(|s| {
127 let readable = storage.to_readable();
128
129 s.spawn({
130 let qsize = qsize.clone();
131 let stay_alive = stay_alive.clone();
132 let staying_alive = stay_alive.clone();
133 move || {
134 if let Err(e) = consume(storage, qsize, fixture, stream, staying_alive) {
135 eprintln!("jetstream finished with error: {e}");
136 }
137 stay_alive.drop_guard();
138 }
139 });
140
141 s.spawn({
142 let readable = readable.clone();
143 let stay_alive = stay_alive.clone();
144 let staying_alive = stay_alive.clone();
145 || {
146 runtime::Builder::new_multi_thread()
147 .worker_threads(1)
148 .max_blocking_threads(2)
149 .enable_all()
150 .build()
151 .expect("axum startup")
152 .block_on(async {
153 install_metrics_server()?;
154 serve(readable, "0.0.0.0:6789", staying_alive).await
155 })
156 .unwrap();
157 stay_alive.drop_guard();
158 }
159 });
160
161 s.spawn(move || { // monitor thread
162 let stay_alive = stay_alive.clone();
163 let check_alive = stay_alive.clone();
164
165 let process_collector = metrics_process::Collector::default();
166 process_collector.describe();
167 metrics::describe_gauge!(
168 "storage_available",
169 metrics::Unit::Bytes,
170 "available to be allocated"
171 );
172 metrics::describe_gauge!(
173 "storage_free",
174 metrics::Unit::Bytes,
175 "unused bytes in filesystem"
176 );
177 if let Some(ref p) = data_dir {
178 if let Err(e) = fs4::available_space(p) {
179 eprintln!("fs4 failed to get available space. may not be supported here? space metrics may be absent. e: {e:?}");
180 } else {
181 println!("disk space monitoring should work, watching at {p:?}");
182 }
183 }
184
185 'monitor: loop {
186 match readable.get_stats() {
187 Ok(StorageStats { dids, targetables, linking_records }) => {
188 metrics::gauge!("storage.stats.dids").set(dids as f64);
189 metrics::gauge!("storage.stats.targetables").set(targetables as f64);
190 metrics::gauge!("storage.stats.linking_records").set(linking_records as f64);
191 }
192 Err(e) => eprintln!("failed to get stats: {e:?}"),
193 }
194
195 process_collector.collect();
196 if let Some(ref p) = data_dir {
197 if let Ok(avail) = fs4::available_space(p) {
198 metrics::gauge!("storage.available").set(avail as f64);
199 }
200 if let Ok(free) = fs4::free_space(p) {
201 metrics::gauge!("storage.free").set(free as f64);
202 }
203 }
204 let wait = time::Instant::now();
205 while wait.elapsed() < MONITOR_INTERVAL {
206 thread::sleep(time::Duration::from_millis(100));
207 if check_alive.is_cancelled() {
208 break 'monitor
209 }
210 }
211 }
212 stay_alive.drop_guard();
213 });
214 });
215
216 println!("byeeee");
217
218 Ok(())
219}
220
221fn install_metrics_server() -> Result<()> {
222 println!("installing metrics server...");
223 let host = [0, 0, 0, 0];
224 let port = 8765;
225 PrometheusBuilder::new()
226 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
227 .set_bucket_duration(time::Duration::from_secs(30))?
228 .set_bucket_count(NonZero::new(10).unwrap()) // count * duration = 5 mins. stuff doesn't happen that fast here.
229 .set_enable_unit_suffix(true)
230 .with_http_listener((host, port))
231 .install()?;
232 println!(
233 "metrics server installed! listening on http://{}.{}.{}.{}:{port}",
234 host[0], host[1], host[2], host[3]
235 );
236 Ok(())
237}
238
#[cfg(test)]
mod tests {
    use constellation::consumer::get_actionable;
    use constellation::storage::{LinkReader, LinkStorage, MemStorage};

    /// End-to-end check: one jetstream `app.bsky.feed.like` create event is run through
    /// `get_actionable` and pushed into a `MemStorage`. The liked post's URI should then
    /// report a count of 1 for `app.bsky.feed.like` / `.subject.uri`.
    #[test]
    fn test_create_like_integrated() {
        const LIKED_POST: &str =
            "at://did:plc:lphckw3dz4mnh3ogmfpdgt6z/app.bsky.feed.post/3lfdau5f7wk23";

        let rec = r#"{
            "did":"did:plc:icprmty6ticzracr5urz4uum",
            "time_us":1736448492661668,
            "kind":"commit",
            "commit":{"rev":"3lfddpt5qa62c","operation":"create","collection":"app.bsky.feed.like","rkey":"3lfddpt5djw2c","record":{
                "$type":"app.bsky.feed.like",
                "createdAt":"2025-01-09T18:48:10.412Z",
                "subject":{"cid":"bafyreihazf62qvmusup55ojhkzwbmzee6rxtsug3e6eg33mnjrgthxvozu","uri":"at://did:plc:lphckw3dz4mnh3ogmfpdgt6z/app.bsky.feed.post/3lfdau5f7wk23"}
            },
            "cid":"bafyreidgcs2id7nsbp6co42ind2wcig3riwcvypwan6xdywyfqklovhdjq"}
        }"#
        .parse()
        .unwrap();
        let (action, ts) = get_actionable(&rec).unwrap();

        let mut storage = MemStorage::new();
        storage.push(&action, ts).unwrap();

        let likes = storage
            .get_count(LIKED_POST, "app.bsky.feed.like", ".subject.uri")
            .unwrap();
        assert_eq!(likes, 1);
    }
}