Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

1use crate::ClientMessage; 2use crate::error::ServerError; 3use crate::subscriber::Subscriber; 4use dropshot::{ 5 ApiDescription, ApiEndpointBodyContentType, Body, ConfigDropshot, ConfigLogging, 6 ConfigLoggingLevel, ExtractorMetadata, HttpError, HttpResponse, Query, RequestContext, 7 ServerBuilder, ServerContext, SharedExtractor, WebsocketConnection, channel, endpoint, 8}; 9use http::{ 10 Response, StatusCode, 11 header::{ORIGIN, USER_AGENT}, 12}; 13use metrics::{counter, histogram}; 14use std::sync::Arc; 15 16use async_trait::async_trait; 17use schemars::JsonSchema; 18use serde::{Deserialize, Serialize}; 19use std::collections::HashSet; 20use tokio::sync::broadcast; 21use tokio::time::Instant; 22use tokio_tungstenite::tungstenite::protocol::{Role, WebSocketConfig}; 23use tokio_util::sync::CancellationToken; 24 25const INDEX_HTML: &str = include_str!("../static/index.html"); 26const FAVICON: &[u8] = include_bytes!("../static/favicon.ico"); 27 28pub async fn serve( 29 b: broadcast::Sender<Arc<ClientMessage>>, 30 d: broadcast::Sender<Arc<ClientMessage>>, 31 shutdown: CancellationToken, 32) -> Result<(), ServerError> { 33 let config_logging = ConfigLogging::StderrTerminal { 34 level: ConfigLoggingLevel::Info, 35 }; 36 37 let log = config_logging 38 .to_logger("example-basic") 39 .map_err(ServerError::ConfigLogError)?; 40 41 let mut api = ApiDescription::new(); 42 api.register(index).unwrap(); 43 api.register(favicon).unwrap(); 44 api.register(openapi).unwrap(); 45 api.register(subscribe).unwrap(); 46 47 // TODO: put spec in a once cell / lazy lock thing? 48 let spec = Arc::new( 49 api.openapi( 50 "Spacedust", 51 env!("CARGO_PKG_VERSION") 52 .parse() 53 .inspect_err(|e| { 54 eprintln!("failed to parse cargo package version for openapi: {e:?}") 55 }) 56 .unwrap_or(semver::Version::new(0, 0, 1)), 57 ) 58 .description("A configurable ATProto notifications firehose.") 59 .contact_name("part of @microcosm.blue") 60 .contact_url("https://microcosm.blue") 61 .json() 62 .map_err(ServerError::OpenApiJsonFail)?, 63 ); 64 65 let sub_shutdown = shutdown.clone(); 66 let ctx = Context { 67 spec, 68 b, 69 d, 70 shutdown: sub_shutdown, 71 }; 72 73 let server = ServerBuilder::new(api, ctx, log) 74 .config(ConfigDropshot { 75 bind_address: "0.0.0.0:9998".parse().unwrap(), 76 ..Default::default() 77 }) 78 .start()?; 79 80 tokio::select! { 81 s = server.wait_for_shutdown() => { 82 s.map_err(ServerError::ServerExited)?; 83 log::info!("server shut down normally."); 84 }, 85 _ = shutdown.cancelled() => { 86 log::info!("shutting down: closing server"); 87 server.close().await.map_err(ServerError::BadClose)?; 88 }, 89 } 90 Ok(()) 91} 92 93#[derive(Debug, Clone)] 94struct Context { 95 pub spec: Arc<serde_json::Value>, 96 pub b: broadcast::Sender<Arc<ClientMessage>>, 97 pub d: broadcast::Sender<Arc<ClientMessage>>, 98 pub shutdown: CancellationToken, 99} 100 101async fn instrument_handler<T, H, R>(ctx: &RequestContext<T>, handler: H) -> Result<R, HttpError> 102where 103 R: HttpResponse, 104 H: Future<Output = Result<R, HttpError>>, 105 T: ServerContext, 106{ 107 let start = Instant::now(); 108 let result = handler.await; 109 let latency = start.elapsed(); 110 let status_code = match &result { 111 Ok(response) => response.status_code(), 112 Err(e) => e.status_code.as_status(), 113 } 114 .as_str() // just the number (.to_string()'s Display does eg `200 OK`) 115 .to_string(); 116 let endpoint = ctx.endpoint.operation_id.clone(); 117 let headers = ctx.request.headers(); 118 let origin = headers 119 .get(ORIGIN) 120 .and_then(|v| v.to_str().ok()) 121 .unwrap_or("") 122 .to_string(); 123 let ua = headers 124 .get(USER_AGENT) 125 .and_then(|v| v.to_str().ok()) 126 .map(|ua| { 127 if ua.starts_with("Mozilla/5.0 ") { 128 "browser" 129 } else { 130 ua 131 } 132 }) 133 .unwrap_or("") 134 .to_string(); 135 counter!("server_requests_total", 136 "endpoint" => endpoint.clone(), 137 "origin" => origin, 138 "ua" => ua, 139 "status_code" => status_code, 140 ) 141 .increment(1); 142 histogram!("server_handler_latency", "endpoint" => endpoint).record(latency.as_micros() as f64); 143 result 144} 145 146use dropshot::{HttpResponseHeaders, HttpResponseOk}; 147 148pub type OkCorsResponse<T> = Result<HttpResponseHeaders<HttpResponseOk<T>>, HttpError>; 149 150/// Helper for constructing Ok responses: return OkCors(T).into() 151/// (not happy with this yet) 152pub struct OkCors<T: Serialize + JsonSchema + Send + Sync>(pub T); 153 154impl<T> From<OkCors<T>> for OkCorsResponse<T> 155where 156 T: Serialize + JsonSchema + Send + Sync, 157{ 158 fn from(ok: OkCors<T>) -> OkCorsResponse<T> { 159 let mut res = HttpResponseHeaders::new_unnamed(HttpResponseOk(ok.0)); 160 res.headers_mut() 161 .insert("access-control-allow-origin", "*".parse().unwrap()); 162 Ok(res) 163 } 164} 165 166// TODO: cors for HttpError 167 168/// Serve index page as html 169#[endpoint { 170 method = GET, 171 path = "/", 172 /* 173 * not useful to have this in openapi 174 */ 175 unpublished = true, 176}] 177async fn index(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> { 178 instrument_handler(&ctx, async { 179 Ok(Response::builder() 180 .status(StatusCode::OK) 181 .header(http::header::CONTENT_TYPE, "text/html") 182 .body(INDEX_HTML.into())?) 183 }) 184 .await 185} 186 187/// Serve index page as html 188#[endpoint { 189 method = GET, 190 path = "/favicon.ico", 191 /* 192 * not useful to have this in openapi 193 */ 194 unpublished = true, 195}] 196async fn favicon(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> { 197 instrument_handler(&ctx, async { 198 Ok(Response::builder() 199 .status(StatusCode::OK) 200 .header(http::header::CONTENT_TYPE, "image/x-icon") 201 .body(FAVICON.to_vec().into())?) 202 }) 203 .await 204} 205 206/// Meta: get the openapi spec for this api 207#[endpoint { 208 method = GET, 209 path = "/openapi", 210 /* 211 * not useful to have this in openapi 212 */ 213 unpublished = true, 214}] 215async fn openapi(ctx: RequestContext<Context>) -> OkCorsResponse<serde_json::Value> { 216 instrument_handler(&ctx, async { 217 let spec = (*ctx.context().spec).clone(); 218 OkCors(spec).into() 219 }) 220 .await 221} 222 223/// The real type that gets deserialized 224#[derive(Debug, Deserialize, JsonSchema)] 225#[serde(rename_all = "camelCase")] 226pub struct MultiSubscribeQuery { 227 #[serde(default)] 228 pub wanted_subjects: HashSet<String>, 229 #[serde(default)] 230 pub wanted_subject_prefixes: HashSet<String>, 231 #[serde(default)] 232 pub wanted_subject_dids: HashSet<String>, 233 #[serde(default)] 234 pub wanted_sources: HashSet<String>, 235} 236/// The fake corresponding type for docs that dropshot won't freak out about a 237/// vec for 238#[derive(Deserialize, JsonSchema)] 239#[allow(dead_code)] 240#[serde(rename_all = "camelCase")] 241struct MultiSubscribeQueryForDocs { 242 /// One or more at-uris to receive links about 243 /// 244 /// The at-uri must be url-encoded 245 /// 246 /// Pass this parameter multiple times to specify multiple subjects, like 247 /// `wantedSubjects=[...]&wantedSubjects=[...]` 248 pub wanted_subjects: String, 249 /// One or more at-uri, URI, or DID prefixes to receive links about 250 /// 251 /// The uri must be url-encoded 252 /// 253 /// Pass this parameter multiple times to specify multiple prefixes, like 254 /// `wantedSubjectPrefixes=[...]&wantedSubjectPrefixes=[...]` 255 pub wanted_subject_prefixes: String, 256 /// One or more DIDs to receive links about 257 /// 258 /// Pass this parameter multiple times to specify multiple subjects 259 pub wanted_subject_dids: String, 260 /// One or more link sources to receive links about 261 /// 262 /// TODO: docs about link sources 263 /// 264 /// eg, a bluesky like's link source: `app.bsky.feed.like:subject.uri` 265 /// 266 /// Pass this parameter multiple times to specify multiple sources 267 pub wanted_sources: String, 268} 269 270// The `SharedExtractor` implementation for Query<QueryType> describes how to 271// construct an instance of `Query<QueryType>` from an HTTP request: namely, by 272// parsing the query string to an instance of `QueryType`. 273#[async_trait] 274impl SharedExtractor for MultiSubscribeQuery { 275 async fn from_request<Context: ServerContext>( 276 ctx: &RequestContext<Context>, 277 ) -> Result<MultiSubscribeQuery, HttpError> { 278 let raw_query = ctx.request.uri().query().unwrap_or(""); 279 let q = serde_qs::from_str(raw_query).map_err(|e| { 280 HttpError::for_bad_request(None, format!("unable to parse query string: {e}")) 281 })?; 282 Ok(q) 283 } 284 285 fn metadata(body_content_type: ApiEndpointBodyContentType) -> ExtractorMetadata { 286 // HACK: query type switcheroo: passing MultiSubscribeQuery to 287 // `metadata` would "helpfully" panic because dropshot believes we can 288 // only have scalar types in a query. 289 // 290 // so instead we have a fake second type whose only job is to look the 291 // same as MultiSubscribeQuery exept that it has `String` instead of 292 // `Vec<String>`, which dropshot will accept, and generate ~close-enough 293 // docs for. 294 <Query<MultiSubscribeQueryForDocs> as SharedExtractor>::metadata(body_content_type) 295 } 296} 297 298#[derive(Deserialize, JsonSchema)] 299#[serde(rename_all = "camelCase")] 300struct ScalarSubscribeQuery { 301 /// Bypass the 21-sec delay buffer 302 /// 303 /// By default, spacedust holds all firehose links for 21 seconds before 304 /// emitting them, to prevent quickly- undone interactions from generating 305 /// notifications. 306 /// 307 /// Setting `instant` to true bypasses this buffer, allowing faster (and 308 /// noisier) notification delivery. 309 /// 310 /// Typically [a little less than 1%](https://bsky.app/profile/bad-example.com/post/3ls32wctsrs2l) 311 /// of links links get deleted within 21s of being created. 312 #[serde(default)] 313 pub instant: bool, 314} 315 316#[channel { 317 protocol = WEBSOCKETS, 318 path = "/subscribe", 319}] 320async fn subscribe( 321 reqctx: RequestContext<Context>, 322 query: MultiSubscribeQuery, 323 scalar_query: Query<ScalarSubscribeQuery>, 324 upgraded: WebsocketConnection, 325) -> dropshot::WebsocketChannelResult { 326 let ws = tokio_tungstenite::WebSocketStream::from_raw_socket( 327 upgraded.into_inner(), 328 Role::Server, 329 Some(WebSocketConfig::default().max_message_size( 330 Some(10 * 2_usize.pow(20)), // 10MiB, matching jetstream 331 )), 332 ) 333 .await; 334 335 let Context { b, d, shutdown, .. } = reqctx.context(); 336 let sub_token = shutdown.child_token(); 337 338 let q = scalar_query.into_inner(); 339 let subscription = if q.instant { b } else { d }.subscribe(); 340 log::info!("starting subscriber with broadcast: instant={}", q.instant); 341 342 Subscriber::new(query, sub_token) 343 .start(ws, subscription) 344 .await 345 .map_err(|e| format!("boo: {e:?}"))?; 346 347 Ok(()) 348}