Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::ClientMessage;
2use crate::error::ServerError;
3use crate::subscriber::Subscriber;
4use dropshot::{
5 ApiDescription, ApiEndpointBodyContentType, Body, ConfigDropshot, ConfigLogging,
6 ConfigLoggingLevel, ExtractorMetadata, HttpError, HttpResponse, Query, RequestContext,
7 ServerBuilder, ServerContext, SharedExtractor, WebsocketConnection, channel, endpoint,
8};
9use http::{
10 Response, StatusCode,
11 header::{ORIGIN, USER_AGENT},
12};
13use metrics::{counter, histogram};
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use schemars::JsonSchema;
18use serde::{Deserialize, Serialize};
19use std::collections::HashSet;
20use tokio::sync::broadcast;
21use tokio::time::Instant;
22use tokio_tungstenite::tungstenite::protocol::{Role, WebSocketConfig};
23use tokio_util::sync::CancellationToken;
24
25const INDEX_HTML: &str = include_str!("../static/index.html");
26const FAVICON: &[u8] = include_bytes!("../static/favicon.ico");
27
28pub async fn serve(
29 b: broadcast::Sender<Arc<ClientMessage>>,
30 d: broadcast::Sender<Arc<ClientMessage>>,
31 shutdown: CancellationToken,
32) -> Result<(), ServerError> {
33 let config_logging = ConfigLogging::StderrTerminal {
34 level: ConfigLoggingLevel::Info,
35 };
36
37 let log = config_logging
38 .to_logger("example-basic")
39 .map_err(ServerError::ConfigLogError)?;
40
41 let mut api = ApiDescription::new();
42 api.register(index).unwrap();
43 api.register(favicon).unwrap();
44 api.register(openapi).unwrap();
45 api.register(subscribe).unwrap();
46
47 // TODO: put spec in a once cell / lazy lock thing?
48 let spec = Arc::new(
49 api.openapi(
50 "Spacedust",
51 env!("CARGO_PKG_VERSION")
52 .parse()
53 .inspect_err(|e| {
54 eprintln!("failed to parse cargo package version for openapi: {e:?}")
55 })
56 .unwrap_or(semver::Version::new(0, 0, 1)),
57 )
58 .description("A configurable ATProto notifications firehose.")
59 .contact_name("part of @microcosm.blue")
60 .contact_url("https://microcosm.blue")
61 .json()
62 .map_err(ServerError::OpenApiJsonFail)?,
63 );
64
65 let sub_shutdown = shutdown.clone();
66 let ctx = Context {
67 spec,
68 b,
69 d,
70 shutdown: sub_shutdown,
71 };
72
73 let server = ServerBuilder::new(api, ctx, log)
74 .config(ConfigDropshot {
75 bind_address: "0.0.0.0:9998".parse().unwrap(),
76 ..Default::default()
77 })
78 .start()?;
79
80 tokio::select! {
81 s = server.wait_for_shutdown() => {
82 s.map_err(ServerError::ServerExited)?;
83 log::info!("server shut down normally.");
84 },
85 _ = shutdown.cancelled() => {
86 log::info!("shutting down: closing server");
87 server.close().await.map_err(ServerError::BadClose)?;
88 },
89 }
90 Ok(())
91}
92
93#[derive(Debug, Clone)]
94struct Context {
95 pub spec: Arc<serde_json::Value>,
96 pub b: broadcast::Sender<Arc<ClientMessage>>,
97 pub d: broadcast::Sender<Arc<ClientMessage>>,
98 pub shutdown: CancellationToken,
99}
100
101async fn instrument_handler<T, H, R>(ctx: &RequestContext<T>, handler: H) -> Result<R, HttpError>
102where
103 R: HttpResponse,
104 H: Future<Output = Result<R, HttpError>>,
105 T: ServerContext,
106{
107 let start = Instant::now();
108 let result = handler.await;
109 let latency = start.elapsed();
110 let status_code = match &result {
111 Ok(response) => response.status_code(),
112 Err(e) => e.status_code.as_status(),
113 }
114 .as_str() // just the number (.to_string()'s Display does eg `200 OK`)
115 .to_string();
116 let endpoint = ctx.endpoint.operation_id.clone();
117 let headers = ctx.request.headers();
118 let origin = headers
119 .get(ORIGIN)
120 .and_then(|v| v.to_str().ok())
121 .unwrap_or("")
122 .to_string();
123 let ua = headers
124 .get(USER_AGENT)
125 .and_then(|v| v.to_str().ok())
126 .map(|ua| {
127 if ua.starts_with("Mozilla/5.0 ") {
128 "browser"
129 } else {
130 ua
131 }
132 })
133 .unwrap_or("")
134 .to_string();
135 counter!("server_requests_total",
136 "endpoint" => endpoint.clone(),
137 "origin" => origin,
138 "ua" => ua,
139 "status_code" => status_code,
140 )
141 .increment(1);
142 histogram!("server_handler_latency", "endpoint" => endpoint).record(latency.as_micros() as f64);
143 result
144}
145
146use dropshot::{HttpResponseHeaders, HttpResponseOk};
147
148pub type OkCorsResponse<T> = Result<HttpResponseHeaders<HttpResponseOk<T>>, HttpError>;
149
150/// Helper for constructing Ok responses: return OkCors(T).into()
151/// (not happy with this yet)
152pub struct OkCors<T: Serialize + JsonSchema + Send + Sync>(pub T);
153
154impl<T> From<OkCors<T>> for OkCorsResponse<T>
155where
156 T: Serialize + JsonSchema + Send + Sync,
157{
158 fn from(ok: OkCors<T>) -> OkCorsResponse<T> {
159 let mut res = HttpResponseHeaders::new_unnamed(HttpResponseOk(ok.0));
160 res.headers_mut()
161 .insert("access-control-allow-origin", "*".parse().unwrap());
162 Ok(res)
163 }
164}
165
166// TODO: cors for HttpError
167
168/// Serve index page as html
169#[endpoint {
170 method = GET,
171 path = "/",
172 /*
173 * not useful to have this in openapi
174 */
175 unpublished = true,
176}]
177async fn index(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> {
178 instrument_handler(&ctx, async {
179 Ok(Response::builder()
180 .status(StatusCode::OK)
181 .header(http::header::CONTENT_TYPE, "text/html")
182 .body(INDEX_HTML.into())?)
183 })
184 .await
185}
186
187/// Serve index page as html
188#[endpoint {
189 method = GET,
190 path = "/favicon.ico",
191 /*
192 * not useful to have this in openapi
193 */
194 unpublished = true,
195}]
196async fn favicon(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> {
197 instrument_handler(&ctx, async {
198 Ok(Response::builder()
199 .status(StatusCode::OK)
200 .header(http::header::CONTENT_TYPE, "image/x-icon")
201 .body(FAVICON.to_vec().into())?)
202 })
203 .await
204}
205
206/// Meta: get the openapi spec for this api
207#[endpoint {
208 method = GET,
209 path = "/openapi",
210 /*
211 * not useful to have this in openapi
212 */
213 unpublished = true,
214}]
215async fn openapi(ctx: RequestContext<Context>) -> OkCorsResponse<serde_json::Value> {
216 instrument_handler(&ctx, async {
217 let spec = (*ctx.context().spec).clone();
218 OkCors(spec).into()
219 })
220 .await
221}
222
223/// The real type that gets deserialized
224#[derive(Debug, Deserialize, JsonSchema)]
225#[serde(rename_all = "camelCase")]
226pub struct MultiSubscribeQuery {
227 #[serde(default)]
228 pub wanted_subjects: HashSet<String>,
229 #[serde(default)]
230 pub wanted_subject_prefixes: HashSet<String>,
231 #[serde(default)]
232 pub wanted_subject_dids: HashSet<String>,
233 #[serde(default)]
234 pub wanted_sources: HashSet<String>,
235}
236/// The fake corresponding type for docs that dropshot won't freak out about a
237/// vec for
238#[derive(Deserialize, JsonSchema)]
239#[allow(dead_code)]
240#[serde(rename_all = "camelCase")]
241struct MultiSubscribeQueryForDocs {
242 /// One or more at-uris to receive links about
243 ///
244 /// The at-uri must be url-encoded
245 ///
246 /// Pass this parameter multiple times to specify multiple subjects, like
247 /// `wantedSubjects=[...]&wantedSubjects=[...]`
248 pub wanted_subjects: String,
249 /// One or more at-uri, URI, or DID prefixes to receive links about
250 ///
251 /// The uri must be url-encoded
252 ///
253 /// Pass this parameter multiple times to specify multiple prefixes, like
254 /// `wantedSubjectPrefixes=[...]&wantedSubjectPrefixes=[...]`
255 pub wanted_subject_prefixes: String,
256 /// One or more DIDs to receive links about
257 ///
258 /// Pass this parameter multiple times to specify multiple subjects
259 pub wanted_subject_dids: String,
260 /// One or more link sources to receive links about
261 ///
262 /// TODO: docs about link sources
263 ///
264 /// eg, a bluesky like's link source: `app.bsky.feed.like:subject.uri`
265 ///
266 /// Pass this parameter multiple times to specify multiple sources
267 pub wanted_sources: String,
268}
269
270// The `SharedExtractor` implementation for Query<QueryType> describes how to
271// construct an instance of `Query<QueryType>` from an HTTP request: namely, by
272// parsing the query string to an instance of `QueryType`.
273#[async_trait]
274impl SharedExtractor for MultiSubscribeQuery {
275 async fn from_request<Context: ServerContext>(
276 ctx: &RequestContext<Context>,
277 ) -> Result<MultiSubscribeQuery, HttpError> {
278 let raw_query = ctx.request.uri().query().unwrap_or("");
279 let q = serde_qs::from_str(raw_query).map_err(|e| {
280 HttpError::for_bad_request(None, format!("unable to parse query string: {e}"))
281 })?;
282 Ok(q)
283 }
284
285 fn metadata(body_content_type: ApiEndpointBodyContentType) -> ExtractorMetadata {
286 // HACK: query type switcheroo: passing MultiSubscribeQuery to
287 // `metadata` would "helpfully" panic because dropshot believes we can
288 // only have scalar types in a query.
289 //
290 // so instead we have a fake second type whose only job is to look the
291 // same as MultiSubscribeQuery exept that it has `String` instead of
292 // `Vec<String>`, which dropshot will accept, and generate ~close-enough
293 // docs for.
294 <Query<MultiSubscribeQueryForDocs> as SharedExtractor>::metadata(body_content_type)
295 }
296}
297
298#[derive(Deserialize, JsonSchema)]
299#[serde(rename_all = "camelCase")]
300struct ScalarSubscribeQuery {
301 /// Bypass the 21-sec delay buffer
302 ///
303 /// By default, spacedust holds all firehose links for 21 seconds before
304 /// emitting them, to prevent quickly- undone interactions from generating
305 /// notifications.
306 ///
307 /// Setting `instant` to true bypasses this buffer, allowing faster (and
308 /// noisier) notification delivery.
309 ///
310 /// Typically [a little less than 1%](https://bsky.app/profile/bad-example.com/post/3ls32wctsrs2l)
311 /// of links links get deleted within 21s of being created.
312 #[serde(default)]
313 pub instant: bool,
314}
315
316#[channel {
317 protocol = WEBSOCKETS,
318 path = "/subscribe",
319}]
320async fn subscribe(
321 reqctx: RequestContext<Context>,
322 query: MultiSubscribeQuery,
323 scalar_query: Query<ScalarSubscribeQuery>,
324 upgraded: WebsocketConnection,
325) -> dropshot::WebsocketChannelResult {
326 let ws = tokio_tungstenite::WebSocketStream::from_raw_socket(
327 upgraded.into_inner(),
328 Role::Server,
329 Some(WebSocketConfig::default().max_message_size(
330 Some(10 * 2_usize.pow(20)), // 10MiB, matching jetstream
331 )),
332 )
333 .await;
334
335 let Context { b, d, shutdown, .. } = reqctx.context();
336 let sub_token = shutdown.child_token();
337
338 let q = scalar_query.into_inner();
339 let subscription = if q.instant { b } else { d }.subscribe();
340 log::info!("starting subscriber with broadcast: instant={}", q.instant);
341
342 Subscriber::new(query, sub_token)
343 .start(ws, subscription)
344 .await
345 .map_err(|e| format!("boo: {e:?}"))?;
346
347 Ok(())
348}