Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

1use crate::{ 2 CachedRecord, ErrorResponseObject, Identity, Proxy, Repo, 3 error::{RecordError, ServerError}, 4 proxy::{extract_links, MatchedRef}, 5 record::RawRecord, 6}; 7use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey}; 8use foyer::HybridCache; 9use links::at_uri::parse_at_uri as normalize_at_uri; 10use serde::Serialize; 11use std::{path::PathBuf, str::FromStr, sync::Arc, time::Instant, collections::HashMap}; 12use tokio::sync::mpsc; 13use tokio_util::sync::CancellationToken; 14 15use poem::{ 16 Endpoint, EndpointExt, IntoResponse, Route, Server, 17 endpoint::{StaticFileEndpoint, make_sync}, 18 http::Method, 19 listener::{ 20 Listener, TcpListener, 21 acme::{AutoCert, LETS_ENCRYPT_PRODUCTION}, 22 }, 23 middleware::{CatchPanic, Cors, Tracing}, 24}; 25use poem_openapi::{ 26 ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags, 27 Union, 28 param::Query, payload::Json, types::Example, 29}; 30 31fn example_handle() -> String { 32 "bad-example.com".to_string() 33} 34fn example_did() -> String { 35 "did:plc:hdhoaan3xa3jiuq4fg4mefid".to_string() 36} 37fn example_collection() -> String { 38 "app.bsky.feed.like".to_string() 39} 40fn example_rkey() -> String { 41 "3lv4ouczo2b2a".to_string() 42} 43fn example_uri() -> String { 44 format!( 45 "at://{}/{}/{}", 46 example_did(), 47 example_collection(), 48 example_rkey() 49 ) 50} 51fn example_pds() -> String { 52 "https://porcini.us-east.host.bsky.network".to_string() 53} 54fn example_signing_key() -> String { 55 "zQ3shpq1g134o7HGDb86CtQFxnHqzx5pZWknrVX2Waum3fF6j".to_string() 56} 57 58#[derive(Debug, Object)] 59#[oai(example = true)] 60struct XrpcErrorResponseObject { 61 /// Should correspond an error `name` in the lexicon errors array 62 error: String, 63 /// Human-readable description and possibly additonal context 64 message: String, 65} 66impl Example for XrpcErrorResponseObject { 67 fn example() -> Self { 68 Self { 69 error: "RecordNotFound".to_string(), 70 message: "This record was deleted".to_string(), 71 } 72 } 73} 74type XrpcError = Json<XrpcErrorResponseObject>; 75fn xrpc_error(error: impl AsRef<str>, message: impl AsRef<str>) -> XrpcError { 76 Json(XrpcErrorResponseObject { 77 error: error.as_ref().to_string(), 78 message: message.as_ref().to_string(), 79 }) 80} 81 82fn bad_request_handler_get_record(err: poem::Error) -> GetRecordResponse { 83 GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject { 84 error: "InvalidRequest".to_string(), 85 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"), 86 })) 87} 88 89fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniIDResponse { 90 ResolveMiniIDResponse::BadRequest(Json(XrpcErrorResponseObject { 91 error: "InvalidRequest".to_string(), 92 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"), 93 })) 94} 95 96fn bad_request_handler_proxy_query(err: poem::Error) -> ProxyHydrateResponse { 97 ProxyHydrateResponse::BadRequest(Json(XrpcErrorResponseObject { 98 error: "InvalidRequest".to_string(), 99 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"), 100 })) 101} 102 103fn bad_request_handler_resolve_handle(err: poem::Error) -> JustDidResponse { 104 JustDidResponse::BadRequest(Json(XrpcErrorResponseObject { 105 error: "InvalidRequest".to_string(), 106 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"), 107 })) 108} 109 110#[derive(Object)] 111#[oai(example = true)] 112struct FoundRecordResponseObject { 113 /// at-uri for this record 114 uri: String, 115 /// CID for this exact version of the record 116 /// 117 /// Slingshot will always return the CID, despite it not being a required 118 /// response property in the official lexicon. 119 /// 120 /// TODO: probably actually let it be optional, idk are some pds's weirdly 121 /// not returning it? 122 cid: Option<String>, 123 /// the record itself as JSON 124 value: serde_json::Value, 125} 126impl Example for FoundRecordResponseObject { 127 fn example() -> Self { 128 Self { 129 uri: example_uri(), 130 cid: Some("bafyreialv3mzvvxaoyrfrwoer3xmabbmdchvrbyhayd7bga47qjbycy74e".to_string()), 131 value: serde_json::json!({ 132 "$type": "app.bsky.feed.like", 133 "createdAt": "2025-07-29T18:02:02.327Z", 134 "subject": { 135 "cid": "bafyreia2gy6eyk5qfetgahvshpq35vtbwy6negpy3gnuulcdi723mi7vxy", 136 "uri": "at://did:plc:vwzwgnygau7ed7b7wt5ux7y2/app.bsky.feed.post/3lv4lkb4vgs2k" 137 } 138 }), 139 } 140 } 141} 142 143#[derive(ApiResponse)] 144#[oai(bad_request_handler = "bad_request_handler_get_record")] 145enum GetRecordResponse { 146 /// Record found 147 #[oai(status = 200)] 148 Ok(Json<FoundRecordResponseObject>), 149 /// Bad request or no record to return 150 /// 151 /// The only error name in the repo.getRecord lexicon is `RecordNotFound`, 152 /// but the [canonical api docs](https://docs.bsky.app/docs/api/com-atproto-repo-get-record) 153 /// also list `InvalidRequest`, `ExpiredToken`, and `InvalidToken`. Of 154 /// these, slingshot will only generate `RecordNotFound` or `InvalidRequest`, 155 /// but may return any proxied error code from the upstream repo. 156 #[oai(status = 400)] 157 BadRequest(XrpcError), 158 /// Server errors 159 #[oai(status = 500)] 160 ServerError(XrpcError), 161} 162 163#[derive(Object)] 164#[oai(example = true)] 165struct MiniDocResponseObject { 166 /// DID, bi-directionally verified if a handle was provided in the query. 167 did: String, 168 /// The validated handle of the account or `handle.invalid` if the handle 169 /// did not bi-directionally match the DID document. 170 handle: String, 171 /// The identity's PDS URL 172 pds: String, 173 /// The atproto signing key publicKeyMultibase 174 /// 175 /// Legacy key encoding not supported. the key is returned directly; `id`, 176 /// `type`, and `controller` are omitted. 177 signing_key: String, 178} 179impl Example for MiniDocResponseObject { 180 fn example() -> Self { 181 Self { 182 did: example_did(), 183 handle: example_handle(), 184 pds: example_pds(), 185 signing_key: example_signing_key(), 186 } 187 } 188} 189 190#[derive(ApiResponse)] 191#[oai(bad_request_handler = "bad_request_handler_resolve_mini")] 192enum ResolveMiniIDResponse { 193 /// Identity resolved 194 #[oai(status = 200)] 195 Ok(Json<MiniDocResponseObject>), 196 /// Bad request or identity not resolved 197 #[oai(status = 400)] 198 BadRequest(XrpcError), 199} 200 201#[derive(Object)] 202struct ProxyHydrationError { 203 reason: String, 204} 205 206#[derive(Object)] 207struct ProxyHydrationPending { 208 url: String, 209} 210 211#[derive(Object)] 212struct ProxyHydrationRecordFound { 213 record: serde_json::Value, 214} 215 216#[derive(Object)] 217struct ProxyHydrationIdentifierFound { 218 mini_doc: MiniDocResponseObject, 219} 220 221#[derive(Object)] 222#[oai(rename_all = "camelCase")] 223struct ProxyHydrationBlobFound { 224 /// cdn url 225 link: String, 226 mime_type: String, 227 size: u64, 228} 229 230// todo: there's gotta be a supertrait that collects these? 231use poem_openapi::types::{Type, ToJSON, ParseFromJSON, IsObjectType}; 232 233#[derive(Union)] 234#[oai(discriminator_name = "status", rename_all = "camelCase")] 235enum Hydration<T: Send + Sync + Type + ToJSON + ParseFromJSON + IsObjectType> { 236 Error(ProxyHydrationError), 237 Pending(ProxyHydrationPending), 238 Found(T), 239} 240 241#[derive(Object)] 242#[oai(example = true)] 243struct ProxyHydrateResponseObject { 244 /// The original upstream response content 245 output: serde_json::Value, 246 /// Any hydrated records 247 records: HashMap<String, Hydration<ProxyHydrationRecordFound>>, 248 /// Any hydrated identifiers 249 /// 250 /// TODO: "identifiers" feels wrong as the name, probably "identities"? 251 identifiers: HashMap<String, Hydration<ProxyHydrationIdentifierFound>>, 252 /// Any hydrated blob CDN urls 253 blobs: HashMap<String, Hydration<ProxyHydrationBlobFound>>, 254} 255impl Example for ProxyHydrateResponseObject { 256 fn example() -> Self { 257 Self { 258 output: serde_json::json!({}), 259 records: HashMap::from([ 260 ("asdf".into(), Hydration::Pending(ProxyHydrationPending { url: "todo".into() })), 261 ]), 262 identifiers: HashMap::new(), 263 blobs: HashMap::new(), 264 } 265 } 266} 267 268#[derive(ApiResponse)] 269#[oai(bad_request_handler = "bad_request_handler_proxy_query")] 270enum ProxyHydrateResponse { 271 #[oai(status = 200)] 272 Ok(Json<ProxyHydrateResponseObject>), 273 #[oai(status = 400)] 274 BadRequest(XrpcError) 275} 276 277#[derive(Object)] 278pub struct HydrationSource { 279 /// Record Path syntax for locating fields 280 pub path: String, 281 /// What to expect at the path: 'strong-ref', 'at-uri', 'at-uri-parts', 'did', 'handle', or 'at-identifier'. 282 /// 283 /// - `strong-ref`: object in the shape of `com.atproto.repo.strongRef` with `uri` and `cid` keys. 284 /// - `at-uri`: string, must have all segments present (identifier, collection, rkey) 285 /// - `at-uri-parts`: object with keys (`repo` or `did`), `collection`, `rkey`, and optional `cid`. Other keys may be present and will be ignored. 286 /// - `did`: string, `did` format 287 /// - `handle`: string, `handle` format 288 /// - `at-identifier`: string, `did` or `handle` format 289 pub shape: String, 290} 291 292#[derive(Object)] 293#[oai(example = true)] 294struct ProxyQueryPayload { 295 /// The NSID of the XRPC you wish to forward 296 xrpc: String, 297 /// The destination service the request will be forwarded to 298 atproto_proxy: String, 299 /// The `params` for the destination service XRPC endpoint 300 /// 301 /// Currently this will be passed along unchecked, but a future version of 302 /// slingshot may attempt to do lexicon resolution to validate `params` 303 /// based on the upstream service 304 params: Option<serde_json::Value>, 305 /// Paths within the response to look for at-uris that can be hydrated 306 hydration_sources: Vec<HydrationSource>, 307 // todo: deadline thing 308 309} 310impl Example for ProxyQueryPayload { 311 fn example() -> Self { 312 Self { 313 xrpc: "app.bsky.feed.getFeedSkeleton".to_string(), 314 atproto_proxy: "did:web:blue.mackuba.eu#bsky_fg".to_string(), 315 params: Some(serde_json::json!({ 316 "feed": "at://did:plc:oio4hkxaop4ao4wz2pp3f4cr/app.bsky.feed.generator/atproto", 317 })), 318 hydration_sources: vec![ 319 HydrationSource { 320 path: "feed[].post".to_string(), 321 shape: "at-uri".to_string(), 322 } 323 ], 324 } 325 } 326} 327 328#[derive(Object)] 329#[oai(example = true)] 330struct FoundDidResponseObject { 331 /// the DID, bi-directionally verified if using Slingshot 332 did: String, 333} 334impl Example for FoundDidResponseObject { 335 fn example() -> Self { 336 Self { did: example_did() } 337 } 338} 339 340#[derive(ApiResponse)] 341#[oai(bad_request_handler = "bad_request_handler_resolve_handle")] 342enum JustDidResponse { 343 /// Resolution succeeded 344 #[oai(status = 200)] 345 Ok(Json<FoundDidResponseObject>), 346 /// Bad request, failed to resolve, or failed to verify 347 /// 348 /// `error` will be one of `InvalidRequest`, `HandleNotFound`. 349 #[oai(status = 400)] 350 BadRequest(XrpcError), 351 /// Something went wrong trying to complete the request 352 #[oai(status = 500)] 353 ServerError(XrpcError), 354} 355 356struct Xrpc { 357 cache: HybridCache<String, CachedRecord>, 358 identity: Identity, 359 proxy: Arc<Proxy>, 360 repo: Arc<Repo>, 361} 362 363#[derive(Tags)] 364enum ApiTags { 365 /// Core ATProtocol-compatible APIs. 366 /// 367 /// > [!tip] 368 /// > Upstream documentation is available at 369 /// > https://docs.bsky.app/docs/category/http-reference 370 /// 371 /// These queries are usually executed directly against the PDS containing 372 /// the data being requested. Slingshot offers a caching view of the same 373 /// contents with better expected performance and reliability. 374 #[oai(rename = "com.atproto.* queries")] 375 ComAtproto, 376 /// Additional and improved APIs. 377 /// 378 /// These APIs offer small tweaks to the core ATProtocol APIs, with more 379 /// more convenient [request parameters](#tag/slingshot-specific-queries/GET/xrpc/com.bad-example.repo.getUriRecord) 380 /// or [response formats](#tag/slingshot-specific-queries/GET/xrpc/com.bad-example.identity.resolveMiniDoc). 381 /// 382 /// > [!important] 383 /// > At the moment, these are namespaced under the `com.bad-example.*` NSID 384 /// > prefix, but as they stabilize they may be migrated to an org namespace 385 /// > like `blue.microcosm.*`. Support for asliasing to `com.bad-example.*` 386 /// > will be maintained as long as it's in use. 387 #[oai(rename = "slingshot-specific queries")] 388 Custom, 389} 390 391#[OpenApi] 392impl Xrpc { 393 /// com.atproto.repo.getRecord 394 /// 395 /// Get a single record from a repository. Does not require auth. 396 /// 397 /// > [!tip] 398 /// > See also the [canonical `com.atproto` XRPC documentation](https://docs.bsky.app/docs/api/com-atproto-repo-get-record) 399 /// > that this endpoint aims to be compatible with. 400 #[oai( 401 path = "/com.atproto.repo.getRecord", 402 method = "get", 403 tag = "ApiTags::ComAtproto" 404 )] 405 async fn get_record( 406 &self, 407 /// The DID or handle of the repo 408 #[oai(example = "example_did")] 409 Query(repo): Query<String>, 410 /// The NSID of the record collection 411 #[oai(example = "example_collection")] 412 Query(collection): Query<String>, 413 /// The Record key 414 #[oai(example = "example_rkey")] 415 Query(rkey): Query<String>, 416 /// Optional: the CID of the version of the record. 417 /// 418 /// If not specified, then return the most recent version. 419 /// 420 /// If a stale `CID` is specified and a newer version of the record 421 /// exists, Slingshot returns a `NotFound` error. That is: Slingshot 422 /// only retains the most recent version of a record. 423 Query(cid): Query<Option<String>>, 424 ) -> GetRecordResponse { 425 self.get_record_impl(repo, collection, rkey, cid).await 426 } 427 428 /// blue.microcosm.repo.getRecordByUri 429 /// 430 /// alias of `com.bad-example.repo.getUriRecord` with intention to stabilize under this name 431 #[oai( 432 path = "/blue.microcosm.repo.getRecordByUri", 433 method = "get", 434 tag = "ApiTags::Custom" 435 )] 436 async fn get_record_by_uri( 437 &self, 438 /// The at-uri of the record 439 /// 440 /// The identifier can be a DID or an atproto handle, and the collection 441 /// and rkey segments must be present. 442 #[oai(example = "example_uri")] 443 Query(at_uri): Query<String>, 444 /// Optional: the CID of the version of the record. 445 /// 446 /// If not specified, then return the most recent version. 447 /// 448 /// > [!tip] 449 /// > If specified and a newer version of the record exists, returns 404 not 450 /// > found. That is: slingshot only retains the most recent version of a 451 /// > record. 452 Query(cid): Query<Option<String>>, 453 ) -> GetRecordResponse { 454 self.get_uri_record(Query(at_uri), Query(cid)).await 455 } 456 457 /// com.bad-example.repo.getUriRecord 458 /// 459 /// Ergonomic complement to [`com.atproto.repo.getRecord`](https://docs.bsky.app/docs/api/com-atproto-repo-get-record) 460 /// which accepts an `at-uri` instead of individual repo/collection/rkey params 461 #[oai( 462 path = "/com.bad-example.repo.getUriRecord", 463 method = "get", 464 tag = "ApiTags::Custom" 465 )] 466 async fn get_uri_record( 467 &self, 468 /// The at-uri of the record 469 /// 470 /// The identifier can be a DID or an atproto handle, and the collection 471 /// and rkey segments must be present. 472 #[oai(example = "example_uri")] 473 Query(at_uri): Query<String>, 474 /// Optional: the CID of the version of the record. 475 /// 476 /// If not specified, then return the most recent version. 477 /// 478 /// > [!tip] 479 /// > If specified and a newer version of the record exists, returns 404 not 480 /// > found. That is: slingshot only retains the most recent version of a 481 /// > record. 482 Query(cid): Query<Option<String>>, 483 ) -> GetRecordResponse { 484 let bad_at_uri = || { 485 GetRecordResponse::BadRequest(xrpc_error( 486 "InvalidRequest", 487 "at-uri does not appear to be valid", 488 )) 489 }; 490 491 let Some(normalized) = normalize_at_uri(&at_uri) else { 492 return bad_at_uri(); 493 }; 494 495 // TODO: move this to links 496 let Some(rest) = normalized.strip_prefix("at://") else { 497 return bad_at_uri(); 498 }; 499 let Some((repo, rest)) = rest.split_once('/') else { 500 return bad_at_uri(); 501 }; 502 let Some((collection, rest)) = rest.split_once('/') else { 503 return bad_at_uri(); 504 }; 505 let rkey = if let Some((rkey, _rest)) = rest.split_once('?') { 506 rkey 507 } else { 508 rest 509 }; 510 511 self.get_record_impl( 512 repo.to_string(), 513 collection.to_string(), 514 rkey.to_string(), 515 cid, 516 ) 517 .await 518 } 519 520 /// com.atproto.identity.resolveHandle 521 /// 522 /// Resolves an atproto [`handle`](https://atproto.com/guides/glossary#handle) 523 /// (hostname) to a [`DID`](https://atproto.com/guides/glossary#did-decentralized-id). 524 /// 525 /// > [!tip] 526 /// > Compatibility note: Slingshot will **always bi-directionally verify 527 /// > against the DID document**, which is optional according to the 528 /// > authoritative lexicon. 529 /// 530 /// > [!tip] 531 /// > See the [canonical `com.atproto` XRPC documentation](https://docs.bsky.app/docs/api/com-atproto-identity-resolve-handle) 532 /// > that this endpoint aims to be compatible with. 533 #[oai( 534 path = "/com.atproto.identity.resolveHandle", 535 method = "get", 536 tag = "ApiTags::ComAtproto" 537 )] 538 async fn resolve_handle( 539 &self, 540 /// The handle to resolve. 541 #[oai(example = "example_handle")] 542 Query(handle): Query<String>, 543 ) -> JustDidResponse { 544 let Ok(handle) = Handle::new(handle.to_lowercase()) else { 545 return JustDidResponse::BadRequest(xrpc_error("InvalidRequest", "not a valid handle")); 546 }; 547 548 let Ok(alleged_did) = self.identity.handle_to_did(handle.clone()).await else { 549 return JustDidResponse::ServerError(xrpc_error("Failed", "Could not resolve handle")); 550 }; 551 552 let Some(alleged_did) = alleged_did else { 553 return JustDidResponse::BadRequest(xrpc_error( 554 "HandleNotFound", 555 "Could not resolve handle to a DID", 556 )); 557 }; 558 559 let Ok(partial_doc) = self.identity.did_to_partial_mini_doc(&alleged_did).await else { 560 return JustDidResponse::ServerError(xrpc_error("Failed", "Could not fetch DID doc")); 561 }; 562 563 let Some(partial_doc) = partial_doc else { 564 return JustDidResponse::BadRequest(xrpc_error( 565 "HandleNotFound", 566 "Resolved handle but could not find DID doc for the DID", 567 )); 568 }; 569 570 if partial_doc.unverified_handle != handle { 571 return JustDidResponse::BadRequest(xrpc_error( 572 "HandleNotFound", 573 "Resolved handle failed bi-directional validation", 574 )); 575 } 576 577 JustDidResponse::Ok(Json(FoundDidResponseObject { 578 did: alleged_did.to_string(), 579 })) 580 } 581 582 /// blue.microcosm.identity.resolveMiniDoc 583 /// 584 /// alias of `com.bad-example.identity.resolveMiniDoc` with intention to stabilize under this name 585 #[oai( 586 path = "/blue.microcosm.identity.resolveMiniDoc", 587 method = "get", 588 tag = "ApiTags::Custom" 589 )] 590 async fn resolve_mini_doc( 591 &self, 592 /// Handle or DID to resolve 593 #[oai(example = "example_handle")] 594 Query(identifier): Query<String>, 595 ) -> ResolveMiniIDResponse { 596 self.resolve_mini_id(Query(identifier)).await 597 } 598 599 /// com.bad-example.identity.resolveMiniDoc 600 /// 601 /// Like [com.atproto.identity.resolveIdentity](https://docs.bsky.app/docs/api/com-atproto-identity-resolve-identity) 602 /// but instead of the full `didDoc` it returns an atproto-relevant subset. 603 #[oai( 604 path = "/com.bad-example.identity.resolveMiniDoc", 605 method = "get", 606 tag = "ApiTags::Custom" 607 )] 608 async fn resolve_mini_id( 609 &self, 610 /// Handle or DID to resolve 611 #[oai(example = "example_handle")] 612 Query(identifier): Query<String>, 613 ) -> ResolveMiniIDResponse { 614 Self::resolve_mini_doc_impl(&identifier, self.identity.clone()).await 615 } 616 617 async fn resolve_mini_doc_impl(identifier: &str, identity: Identity) -> ResolveMiniIDResponse { 618 let invalid = |reason: &'static str| { 619 ResolveMiniIDResponse::BadRequest(xrpc_error("InvalidRequest", reason)) 620 }; 621 622 let mut unverified_handle = None; 623 let did = match Did::new(identifier.to_string()) { 624 Ok(did) => did, 625 Err(_) => { 626 let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else { 627 return invalid("Identifier was not a valid DID or handle"); 628 }; 629 630 match identity.handle_to_did(alleged_handle.clone()).await { 631 Ok(res) => { 632 if let Some(did) = res { 633 // we did it joe 634 unverified_handle = Some(alleged_handle); 635 did 636 } else { 637 return invalid("Could not resolve handle identifier to a DID"); 638 } 639 } 640 Err(e) => { 641 log::debug!("failed to resolve handle: {e}"); 642 // TODO: ServerError not BadRequest 643 return invalid("Errored while trying to resolve handle to DID"); 644 } 645 } 646 } 647 }; 648 let Ok(partial_doc) = identity.did_to_partial_mini_doc(&did).await else { 649 return invalid("Failed to get DID doc"); 650 }; 651 let Some(partial_doc) = partial_doc else { 652 return invalid("Failed to find DID doc"); 653 }; 654 655 // ok so here's where we're at: 656 // ✅ we have a DID 657 // ✅ we have a partial doc 658 // 🔶 if we have a handle, it's from the `identifier` (user-input) 659 // -> then we just need to compare to the partial doc to confirm 660 // -> else we need to resolve the DID doc's to a handle and check 661 let handle = if let Some(h) = unverified_handle { 662 if h == partial_doc.unverified_handle { 663 h.to_string() 664 } else { 665 "handle.invalid".to_string() 666 } 667 } else { 668 let Ok(handle_did) = identity 669 .handle_to_did(partial_doc.unverified_handle.clone()) 670 .await 671 else { 672 return invalid("Failed to get DID doc's handle"); 673 }; 674 let Some(handle_did) = handle_did else { 675 return invalid("Failed to resolve DID doc's handle"); 676 }; 677 if handle_did == did { 678 partial_doc.unverified_handle.to_string() 679 } else { 680 "handle.invalid".to_string() 681 } 682 }; 683 684 ResolveMiniIDResponse::Ok(Json(MiniDocResponseObject { 685 did: did.to_string(), 686 handle, 687 pds: partial_doc.pds, 688 signing_key: partial_doc.signing_key, 689 })) 690 } 691 692 /// com.bad-example.proxy.hydrateQueryResponse 693 /// 694 /// > [!important] 695 /// > Unstable! This endpoint is experimental and may change. 696 /// 697 /// Fetch + include records referenced from an upstream xrpc query response 698 #[oai( 699 path = "/com.bad-example.proxy.hydrateQueryResponse", 700 method = "post", 701 tag = "ApiTags::Custom" 702 )] 703 async fn proxy_hydrate_query( 704 &self, 705 Json(payload): Json<ProxyQueryPayload>, 706 ) -> ProxyHydrateResponse { 707 // TODO: the Accept request header, if present, gotta be json 708 // TODO: find any Authorization header and verify it. TBD about `aud`. 709 710 let params = if let Some(p) = payload.params { 711 let serde_json::Value::Object(map) = p else { 712 panic!("params have to be an object"); 713 }; 714 Some(map) 715 } else { None }; 716 717 match self.proxy.proxy( 718 payload.xrpc, 719 payload.atproto_proxy, 720 params, 721 ).await { 722 Ok(skeleton) => { 723 let links = match extract_links(payload.hydration_sources, &skeleton) { 724 Ok(l) => l, 725 Err(e) => { 726 log::warn!("problem extracting: {e:?}"); 727 return ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry, error extracting")) 728 } 729 }; 730 let mut records = HashMap::new(); 731 let mut identifiers = HashMap::new(); 732 let mut blobs = HashMap::new(); 733 734 enum GetThing { 735 Record(String, Hydration<ProxyHydrationRecordFound>), 736 Identifier(String, Hydration<ProxyHydrationIdentifierFound>), 737 Blob(String, Hydration<ProxyHydrationBlobFound>), 738 } 739 740 let (tx, mut rx) = mpsc::channel(1); 741 742 for link in links { 743 match link { 744 MatchedRef::AtUri { uri, cid } => { 745 if records.contains_key(&uri) { 746 log::warn!("skipping duplicate record without checking cid"); 747 continue; 748 } 749 let mut u = url::Url::parse("https://example.com").unwrap(); 750 u.query_pairs_mut().append_pair("at_uri", &uri); // BLEH todo 751 records.insert(uri.clone(), Hydration::Pending(ProxyHydrationPending { 752 url: format!("/xrpc/blue.microcosm.repo.getRecordByUri?{}", u.query().unwrap()), // TODO better; with cid, etc. 753 })); 754 let tx = tx.clone(); 755 let identity = self.identity.clone(); 756 let repo = self.repo.clone(); 757 tokio::task::spawn(async move { 758 let rest = uri.strip_prefix("at://").unwrap(); 759 let (identifier, rest) = rest.split_once('/').unwrap(); 760 let (collection, rkey) = rest.split_once('/').unwrap(); 761 762 let did = if identifier.starts_with("did:") { 763 Did::new(identifier.to_string()).unwrap() 764 } else { 765 let handle = Handle::new(identifier.to_string()).unwrap(); 766 identity.handle_to_did(handle).await.unwrap().unwrap() 767 }; 768 769 let res = match repo.get_record( 770 &did, 771 &Nsid::new(collection.to_string()).unwrap(), 772 &RecordKey::new(rkey.to_string()).unwrap(), 773 &cid.as_ref().map(|s| Cid::from_str(s).unwrap()), 774 ).await { 775 Ok(CachedRecord::Deleted) => 776 Hydration::Error(ProxyHydrationError { 777 reason: "record deleted".to_string(), 778 }), 779 Ok(CachedRecord::Found(RawRecord { cid: found_cid, record })) => { 780 if let Some(c) = cid && found_cid.as_ref().to_string() != c { 781 log::warn!("ignoring cid mismatch"); 782 } 783 let value = serde_json::from_str(&record).unwrap(); 784 Hydration::Found(ProxyHydrationRecordFound { 785 record: value, 786 }) 787 } 788 Err(e) => { 789 log::warn!("finally oop {e:?}"); 790 Hydration::Error(ProxyHydrationError { 791 reason: "failed to fetch record".to_string(), 792 }) 793 } 794 }; 795 tx.send(GetThing::Record(uri, res)).await 796 }); 797 } 798 MatchedRef::Identifier(id) => { 799 if identifiers.contains_key(&id) { 800 continue; 801 } 802 let mut u = url::Url::parse("https://example.com").unwrap(); 803 u.query_pairs_mut().append_pair("identifier", &id); 804 identifiers.insert(id.clone(), Hydration::Pending(ProxyHydrationPending { 805 url: format!("/xrpc/blue.microcosm.identity.resolveMiniDoc?{}", u.query().unwrap()), // gross 806 })); 807 let tx = tx.clone(); 808 let identity = self.identity.clone(); 809 tokio::task::spawn(async move { 810 let res = match Self::resolve_mini_doc_impl(&id, identity).await { 811 ResolveMiniIDResponse::Ok(Json(mini_doc)) => Hydration::Found(ProxyHydrationIdentifierFound { 812 mini_doc 813 }), 814 ResolveMiniIDResponse::BadRequest(e) => { 815 log::warn!("minidoc fail: {:?}", e.0); 816 Hydration::Error(ProxyHydrationError { 817 reason: "failed to resolve mini doc".to_string(), 818 }) 819 } 820 }; 821 tx.send(GetThing::Identifier(id, res)).await 822 }); 823 } 824 MatchedRef::Blob { link, mime, size: _ } => { 825 if blobs.contains_key(&link) { 826 continue; 827 } 828 if mime != "image/jpeg" { 829 Hydration::<ProxyHydrationBlobFound>::Error(ProxyHydrationError { 830 reason: "only image/jpeg supported for now".to_string(), 831 }); 832 } 833 todo!("oops we need to know the account too") 834 } 835 } 836 } 837 // so the channel can close when all are completed 838 // (we shoudl be doing a timeout...) 839 drop(tx); 840 841 while let Some(hydration) = rx.recv().await { 842 match hydration { 843 GetThing::Record(uri, h) => { records.insert(uri, h); } 844 GetThing::Identifier(uri, md) => { identifiers.insert(uri, md); } 845 GetThing::Blob(cid, asdf) => { blobs.insert(cid, asdf); } 846 }; 847 } 848 849 ProxyHydrateResponse::Ok(Json(ProxyHydrateResponseObject { 850 output: skeleton, 851 records, 852 identifiers, 853 blobs, 854 })) 855 } 856 Err(e) => { 857 log::warn!("oh no: {e:?}"); 858 ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry")) 859 } 860 } 861 862 } 863 864 async fn get_record_impl( 865 &self, 866 repo: String, 867 collection: String, 868 rkey: String, 869 cid: Option<String>, 870 ) -> GetRecordResponse { 871 let did = match Did::new(repo.clone()) { 872 Ok(did) => did, 873 Err(_) => { 874 let Ok(handle) = Handle::new(repo.to_lowercase()) else { 875 return GetRecordResponse::BadRequest(xrpc_error( 876 "InvalidRequest", 877 "Repo was not a valid DID or handle", 878 )); 879 }; 880 match self.identity.handle_to_did(handle).await { 881 Ok(res) => { 882 if let Some(did) = res { 883 did 884 } else { 885 return GetRecordResponse::BadRequest(xrpc_error( 886 "InvalidRequest", 887 "Could not resolve handle repo to a DID", 888 )); 889 } 890 } 891 Err(e) => { 892 log::debug!("handle resolution failed: {e}"); 893 return GetRecordResponse::ServerError(xrpc_error( 894 "ResolutionFailed", 895 "Errored while trying to resolve handle to DID", 896 )); 897 } 898 } 899 } 900 }; 901 902 let Ok(collection) = Nsid::new(collection) else { 903 return GetRecordResponse::BadRequest(xrpc_error( 904 "InvalidRequest", 905 "Invalid NSID for collection", 906 )); 907 }; 908 909 let Ok(rkey) = RecordKey::new(rkey) else { 910 return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid rkey")); 911 }; 912 913 let cid: Option<Cid> = if let Some(cid) = cid { 914 let Ok(cid) = Cid::from_str(&cid) else { 915 return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid CID")); 916 }; 917 Some(cid) 918 } else { 919 None 920 }; 921 922 let at_uri = format!("at://{}/{}/{}", &*did, &*collection, &*rkey); 923 924 metrics::counter!("slingshot_get_record").increment(1); 925 let fr = self 926 .cache 927 .get_or_fetch(&at_uri, { 928 let cid = cid.clone(); 929 let repo_api = self.repo.clone(); 930 || async move { 931 let t0 = Instant::now(); 932 let res = repo_api.get_record(&did, &collection, &rkey, &cid).await; 933 let success = if res.is_ok() { "true" } else { "false" }; 934 metrics::histogram!("slingshot_fetch_record", "success" => success) 935 .record(t0.elapsed()); 936 res 937 } 938 }) 939 .await; 940 941 let entry = match fr { 942 Ok(e) => e, 943 Err(e) if e.kind() == foyer::ErrorKind::External => { 944 let record_error = match e.source().map(|s| s.downcast_ref::<RecordError>()) { 945 Some(Some(e)) => e, 946 other => { 947 if other.is_none() { 948 log::error!("external error without a source. wat? {e}"); 949 } else { 950 log::error!("downcast to RecordError failed...? {e}"); 951 } 952 return GetRecordResponse::ServerError(xrpc_error( 953 "ServerError", 954 "sorry, something went wrong", 955 )); 956 } 957 }; 958 let RecordError::UpstreamBadRequest(ErrorResponseObject { 959 ref error, 960 ref message, 961 }) = *record_error 962 else { 963 log::error!("RecordError getting cache entry, {record_error:?}"); 964 return GetRecordResponse::ServerError(xrpc_error( 965 "ServerError", 966 "sorry, something went wrong", 967 )); 968 }; 969 970 // all of the noise around here is so that we can ultimately reach this: 971 // upstream BadRequest extracted from the foyer result which we can proxy back 972 return GetRecordResponse::BadRequest(xrpc_error( 973 error, 974 format!("Upstream bad request: {message}"), 975 )); 976 } 977 Err(e) => { 978 log::error!("error (foyer) getting cache entry, {e:?}"); 979 return GetRecordResponse::ServerError(xrpc_error( 980 "ServerError", 981 "sorry, something went wrong", 982 )); 983 } 984 }; 985 986 match *entry { 987 CachedRecord::Found(ref raw) => { 988 let (found_cid, raw_value) = raw.into(); 989 if cid.clone().map(|c| c != found_cid).unwrap_or(false) { 990 return GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject { 991 error: "RecordNotFound".to_string(), 992 message: "A record was found but its CID did not match that requested" 993 .to_string(), 994 })); 995 } 996 // TODO: thank u stellz: https://gist.github.com/stella3d/51e679e55b264adff89d00a1e58d0272 997 let value = 998 serde_json::from_str(raw_value.get()).expect("RawValue to be valid json"); 999 GetRecordResponse::Ok(Json(FoundRecordResponseObject { 1000 uri: at_uri, 1001 cid: Some(found_cid.as_ref().to_string()), 1002 value, 1003 })) 1004 } 1005 CachedRecord::Deleted => GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject { 1006 error: "RecordNotFound".to_string(), 1007 message: "This record was deleted".to_string(), 1008 })), 1009 } 1010 } 1011 1012 // TODO 1013 // #[oai(path = "/com.atproto.identity.resolveDid", method = "get")] 1014 // but these are both not specified to do bidirectional validation, which is what we want to offer 1015 // com.atproto.identity.resolveIdentity seems right, but requires returning the full did-doc 1016 // would be nice if there were two queries: 1017 // did -> verified handle + pds url 1018 // handle -> verified did + pds url 1019 // 1020 // we could do horrible things and implement resolveIdentity with only a stripped-down fake did doc 1021 // but this will *definitely* cause problems probably 1022 // 1023 // resolveMiniDoc gets most of this well enough. 1024} 1025 1026#[derive(Debug, Clone, Serialize)] 1027#[serde(rename_all = "camelCase")] 1028struct AppViewService { 1029 id: String, 1030 r#type: String, 1031 service_endpoint: String, 1032} 1033#[derive(Debug, Clone, Serialize)] 1034struct AppViewDoc { 1035 id: String, 1036 service: [AppViewService; 1], 1037} 1038/// Serve a did document for did:web for this to be an xrpc appview 1039/// 1040/// No slingshot endpoints currently require auth, so it's not necessary to do 1041/// service proxying, however clients may wish to: 1042/// 1043/// - PDS proxying offers a level of client IP anonymity from slingshot 1044/// - slingshot *may* implement more generous per-user rate-limits for proxied requests in the future 1045fn get_did_doc(domain: &str) -> impl Endpoint + use<> { 1046 let doc = poem::web::Json(AppViewDoc { 1047 id: format!("did:web:{domain}"), 1048 service: [AppViewService { 1049 id: "#slingshot".to_string(), 1050 r#type: "SlingshotRecordProxy".to_string(), 1051 service_endpoint: format!("https://{domain}"), 1052 }], 1053 }); 1054 make_sync(move |_| doc.clone()) 1055} 1056 1057#[allow(clippy::too_many_arguments)] 1058pub async fn serve( 1059 cache: HybridCache<String, CachedRecord>, 1060 identity: Identity, 1061 repo: Repo, 1062 proxy: Proxy, 1063 acme_domain: Option<String>, 1064 acme_contact: Option<String>, 1065 acme_cache_path: Option<PathBuf>, 1066 acme_ipv6: bool, 1067 shutdown: CancellationToken, 1068 bind: std::net::SocketAddr, 1069) -> Result<(), ServerError> { 1070 let repo = Arc::new(repo); 1071 let proxy = Arc::new(proxy); 1072 let api_service = OpenApiService::new( 1073 Xrpc { 1074 cache, 1075 identity, 1076 proxy, 1077 repo, 1078 }, 1079 "Slingshot", 1080 env!("CARGO_PKG_VERSION"), 1081 ) 1082 .server(if let Some(ref h) = acme_domain { 1083 format!("https://{h}") 1084 } else { 1085 format!("http://{bind}") // yeah should probably fix this for reverse-proxy scenarios but it's ok for dev for now 1086 }) 1087 .url_prefix("/xrpc") 1088 .contact( 1089 ContactObject::new() 1090 .name("@microcosm.blue") 1091 .url("https://bsky.app/profile/microcosm.blue"), 1092 ) 1093 .description(include_str!("../api-description.md")) 1094 .external_document(ExternalDocumentObject::new( 1095 "https://microcosm.blue/slingshot", 1096 )); 1097 1098 let mut app = Route::new() 1099 .at("/", StaticFileEndpoint::new("./static/index.html")) 1100 .nest("/openapi", api_service.spec_endpoint()) 1101 .nest("/xrpc/", api_service); 1102 1103 if let Some(domain) = acme_domain { 1104 rustls::crypto::aws_lc_rs::default_provider() 1105 .install_default() 1106 .expect("alskfjalksdjf"); 1107 1108 app = app.at("/.well-known/did.json", get_did_doc(&domain)); 1109 1110 let mut auto_cert = AutoCert::builder() 1111 .directory_url(LETS_ENCRYPT_PRODUCTION) 1112 .domain(&domain); 1113 if let Some(contact) = acme_contact { 1114 auto_cert = auto_cert.contact(contact); 1115 } 1116 if let Some(cache_path) = acme_cache_path { 1117 auto_cert = auto_cert.cache_path(cache_path); 1118 } 1119 let auto_cert = auto_cert.build().map_err(ServerError::AcmeBuildError)?; 1120 1121 run( 1122 TcpListener::bind(if acme_ipv6 { "[::]:443" } else { "0.0.0.0:443" }).acme(auto_cert), 1123 app, 1124 shutdown, 1125 ) 1126 .await 1127 } else { 1128 run(TcpListener::bind(bind), app, shutdown).await 1129 } 1130} 1131 1132async fn run<L>(listener: L, app: Route, shutdown: CancellationToken) -> Result<(), ServerError> 1133where 1134 L: Listener + 'static, 1135{ 1136 let app = app 1137 .with( 1138 Cors::new() 1139 .allow_origin_regex("*") 1140 .allow_methods([Method::GET, Method::POST]) 1141 .allow_credentials(false), 1142 ) 1143 .with(CatchPanic::new()) 1144 .around(request_counter) 1145 .with(Tracing); 1146 1147 Server::new(listener) 1148 .name("slingshot") 1149 .run_with_graceful_shutdown(app, shutdown.cancelled(), None) 1150 .await 1151 .map_err(ServerError::ServerExited) 1152 .inspect(|()| log::info!("server ended. goodbye.")) 1153} 1154 1155async fn request_counter<E: Endpoint>(next: E, req: poem::Request) -> poem::Result<poem::Response> { 1156 let t0 = std::time::Instant::now(); 1157 let method = req.method().to_string(); 1158 let path = req.uri().path().to_string(); 1159 let res = next.call(req).await?.into_response(); 1160 metrics::histogram!( 1161 "server_request", 1162 "endpoint" => format!("{method} {path}"), 1163 "status" => res.status().to_string(), 1164 ) 1165 .record(t0.elapsed()); 1166 Ok(res) 1167}