// Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::{
2 CachedRecord, ErrorResponseObject, Identity, Proxy, Repo,
3 error::{RecordError, ServerError},
4 proxy::{extract_links, MatchedRef},
5 record::RawRecord,
6};
7use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey};
8use foyer::HybridCache;
9use links::at_uri::parse_at_uri as normalize_at_uri;
10use serde::Serialize;
11use std::{path::PathBuf, str::FromStr, sync::Arc, time::Instant, collections::HashMap};
12use tokio::sync::mpsc;
13use tokio_util::sync::CancellationToken;
14
15use poem::{
16 Endpoint, EndpointExt, IntoResponse, Route, Server,
17 endpoint::{StaticFileEndpoint, make_sync},
18 http::Method,
19 listener::{
20 Listener, TcpListener,
21 acme::{AutoCert, LETS_ENCRYPT_PRODUCTION},
22 },
23 middleware::{CatchPanic, Cors, Tracing},
24};
25use poem_openapi::{
26 ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags,
27 Union,
28 param::Query, payload::Json, types::Example,
29};
30
/// Sample atproto handle used as the example value in the generated API docs.
fn example_handle() -> String {
    String::from("bad-example.com")
}
/// Sample DID used as the example value in the generated API docs.
fn example_did() -> String {
    String::from("did:plc:hdhoaan3xa3jiuq4fg4mefid")
}
/// Sample collection NSID used as the example value in the generated API docs.
fn example_collection() -> String {
    String::from("app.bsky.feed.like")
}
/// Sample record key used as the example value in the generated API docs.
fn example_rkey() -> String {
    String::from("3lv4ouczo2b2a")
}
43fn example_uri() -> String {
44 format!(
45 "at://{}/{}/{}",
46 example_did(),
47 example_collection(),
48 example_rkey()
49 )
50}
/// Sample PDS URL used by the example mini doc.
fn example_pds() -> String {
    String::from("https://porcini.us-east.host.bsky.network")
}
/// Sample signing key (multibase string) used by the example mini doc.
fn example_signing_key() -> String {
    String::from("zQ3shpq1g134o7HGDb86CtQFxnHqzx5pZWknrVX2Waum3fF6j")
}
57
/// Error body shared by every endpoint in this file: an error name plus a message.
#[derive(Debug, Object)]
#[oai(example = true)]
struct XrpcErrorResponseObject {
    /// Should correspond to an error `name` in the lexicon errors array
    error: String,
    /// Human-readable description and possibly additional context
    message: String,
}
66impl Example for XrpcErrorResponseObject {
67 fn example() -> Self {
68 Self {
69 error: "RecordNotFound".to_string(),
70 message: "This record was deleted".to_string(),
71 }
72 }
73}
74type XrpcError = Json<XrpcErrorResponseObject>;
75fn xrpc_error(error: impl AsRef<str>, message: impl AsRef<str>) -> XrpcError {
76 Json(XrpcErrorResponseObject {
77 error: error.as_ref().to_string(),
78 message: message.as_ref().to_string(),
79 })
80}
81
82fn bad_request_handler_get_record(err: poem::Error) -> GetRecordResponse {
83 GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
84 error: "InvalidRequest".to_string(),
85 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
86 }))
87}
88
89fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniIDResponse {
90 ResolveMiniIDResponse::BadRequest(Json(XrpcErrorResponseObject {
91 error: "InvalidRequest".to_string(),
92 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
93 }))
94}
95
96fn bad_request_handler_proxy_query(err: poem::Error) -> ProxyHydrateResponse {
97 ProxyHydrateResponse::BadRequest(Json(XrpcErrorResponseObject {
98 error: "InvalidRequest".to_string(),
99 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
100 }))
101}
102
103fn bad_request_handler_resolve_handle(err: poem::Error) -> JustDidResponse {
104 JustDidResponse::BadRequest(Json(XrpcErrorResponseObject {
105 error: "InvalidRequest".to_string(),
106 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
107 }))
108}
109
/// Success body for the record-fetching endpoints: the record with its at-uri and CID.
#[derive(Object)]
#[oai(example = true)]
struct FoundRecordResponseObject {
    /// at-uri for this record
    uri: String,
    /// CID for this exact version of the record
    ///
    /// Slingshot will always return the CID, despite it not being a required
    /// response property in the official lexicon.
    ///
    /// TODO: probably actually let it be optional, idk are some pds's weirdly
    /// not returning it?
    cid: Option<String>,
    /// the record itself as JSON
    value: serde_json::Value,
}
126impl Example for FoundRecordResponseObject {
127 fn example() -> Self {
128 Self {
129 uri: example_uri(),
130 cid: Some("bafyreialv3mzvvxaoyrfrwoer3xmabbmdchvrbyhayd7bga47qjbycy74e".to_string()),
131 value: serde_json::json!({
132 "$type": "app.bsky.feed.like",
133 "createdAt": "2025-07-29T18:02:02.327Z",
134 "subject": {
135 "cid": "bafyreia2gy6eyk5qfetgahvshpq35vtbwy6negpy3gnuulcdi723mi7vxy",
136 "uri": "at://did:plc:vwzwgnygau7ed7b7wt5ux7y2/app.bsky.feed.post/3lv4lkb4vgs2k"
137 }
138 }),
139 }
140 }
141}
142
/// Responses shared by the record-fetching endpoints (`getRecord`,
/// `getUriRecord`, `getRecordByUri`).
#[derive(ApiResponse)]
#[oai(bad_request_handler = "bad_request_handler_get_record")]
enum GetRecordResponse {
    /// Record found
    #[oai(status = 200)]
    Ok(Json<FoundRecordResponseObject>),
    /// Bad request or no record to return
    ///
    /// The only error name in the repo.getRecord lexicon is `RecordNotFound`,
    /// but the [canonical api docs](https://docs.bsky.app/docs/api/com-atproto-repo-get-record)
    /// also list `InvalidRequest`, `ExpiredToken`, and `InvalidToken`. Of
    /// these, slingshot will only generate `RecordNotFound` or `InvalidRequest`,
    /// but may return any proxied error code from the upstream repo.
    #[oai(status = 400)]
    BadRequest(XrpcError),
    /// Server errors
    #[oai(status = 500)]
    ServerError(XrpcError),
}
162
/// Compact identity summary returned by the `resolveMiniDoc` endpoints.
#[derive(Object)]
#[oai(example = true)]
struct MiniDocResponseObject {
    /// DID, bi-directionally verified if a handle was provided in the query.
    did: String,
    /// The validated handle of the account or `handle.invalid` if the handle
    /// did not bi-directionally match the DID document.
    handle: String,
    /// The identity's PDS URL
    pds: String,
    /// The atproto signing key publicKeyMultibase
    ///
    /// Legacy key encoding not supported. The key is returned directly; `id`,
    /// `type`, and `controller` are omitted.
    signing_key: String,
}
179impl Example for MiniDocResponseObject {
180 fn example() -> Self {
181 Self {
182 did: example_did(),
183 handle: example_handle(),
184 pds: example_pds(),
185 signing_key: example_signing_key(),
186 }
187 }
188}
189
/// Responses for the `resolveMiniDoc` endpoints.
///
/// There is no 500 variant here: every failure is reported as a 400.
#[derive(ApiResponse)]
#[oai(bad_request_handler = "bad_request_handler_resolve_mini")]
enum ResolveMiniIDResponse {
    /// Identity resolved
    #[oai(status = 200)]
    Ok(Json<MiniDocResponseObject>),
    /// Bad request or identity not resolved
    #[oai(status = 400)]
    BadRequest(XrpcError),
}
200
/// Hydration outcome for a reference that could not be hydrated.
#[derive(Object)]
struct ProxyHydrationError {
    /// Short human-readable reason for the failure
    reason: String,
}
205
/// Hydration outcome for a reference whose result was not available.
#[derive(Object)]
struct ProxyHydrationPending {
    /// Where the item can be fetched directly (the hydrate endpoint fills this
    /// with a relative `/xrpc/...` URL)
    url: String,
}
210
/// Hydration payload for a record that was fetched successfully.
#[derive(Object)]
struct ProxyHydrationRecordFound {
    /// The record contents as JSON
    record: serde_json::Value,
}
215
/// Hydration payload for an identifier that was resolved successfully.
#[derive(Object)]
struct ProxyHydrationIdentifierFound {
    /// The resolved mini doc for the identifier
    mini_doc: MiniDocResponseObject,
}
220
/// Hydration payload for a blob; field names are exposed in camelCase.
#[derive(Object)]
#[oai(rename_all = "camelCase")]
struct ProxyHydrationBlobFound {
    /// cdn url
    link: String,
    /// MIME type of the blob
    mime_type: String,
    /// Blob size (presumably in bytes -- TODO confirm)
    size: u64,
}
229
230// todo: there's gotta be a supertrait that collects these?
231use poem_openapi::types::{Type, ToJSON, ParseFromJSON, IsObjectType};
232
/// Outcome of hydrating one reference: failed, still pending, or found with a
/// payload of type `T`.
///
/// Exposed as a union discriminated by a `status` property.
#[derive(Union)]
#[oai(discriminator_name = "status", rename_all = "camelCase")]
enum Hydration<T: Send + Sync + Type + ToJSON + ParseFromJSON + IsObjectType> {
    /// Hydration failed; see the contained reason
    Error(ProxyHydrationError),
    /// No result available; the item can be fetched from the contained URL
    Pending(ProxyHydrationPending),
    /// Hydration succeeded
    Found(T),
}
240
/// Success body for the hydrate endpoint: the upstream output plus one map of
/// hydration outcomes per reference kind.
#[derive(Object)]
#[oai(example = true)]
struct ProxyHydrateResponseObject {
    /// The original upstream response content
    output: serde_json::Value,
    /// Any hydrated records
    // keyed by the record's at-uri as extracted from the upstream response
    records: HashMap<String, Hydration<ProxyHydrationRecordFound>>,
    /// Any hydrated identifiers
    ///
    /// TODO: "identifiers" feels wrong as the name, probably "identities"?
    // keyed by the identifier string as extracted from the upstream response
    identifiers: HashMap<String, Hydration<ProxyHydrationIdentifierFound>>,
    /// Any hydrated blob CDN urls
    // keyed by the blob link as extracted from the upstream response
    blobs: HashMap<String, Hydration<ProxyHydrationBlobFound>>,
}
255impl Example for ProxyHydrateResponseObject {
256 fn example() -> Self {
257 Self {
258 output: serde_json::json!({}),
259 records: HashMap::from([
260 ("asdf".into(), Hydration::Pending(ProxyHydrationPending { url: "todo".into() })),
261 ]),
262 identifiers: HashMap::new(),
263 blobs: HashMap::new(),
264 }
265 }
266}
267
/// Responses for `com.bad-example.proxy.hydrateQueryResponse`.
#[derive(ApiResponse)]
#[oai(bad_request_handler = "bad_request_handler_proxy_query")]
enum ProxyHydrateResponse {
    /// Upstream response plus the hydration results that were gathered
    #[oai(status = 200)]
    Ok(Json<ProxyHydrateResponseObject>),
    /// Bad request, or the upstream call / reference extraction failed
    #[oai(status = 400)]
    BadRequest(XrpcError)
}
276
/// One location in the upstream response to scan for hydratable references.
#[derive(Object)]
pub struct HydrationSource {
    /// Record Path syntax for locating fields
    pub path: String,
    /// What to expect at the path: 'strong-ref', 'at-uri', 'at-uri-parts', 'did', 'handle', or 'at-identifier'.
    ///
    /// - `strong-ref`: object in the shape of `com.atproto.repo.strongRef` with `uri` and `cid` keys.
    /// - `at-uri`: string, must have all segments present (identifier, collection, rkey)
    /// - `at-uri-parts`: object with keys (`repo` or `did`), `collection`, `rkey`, and optional `cid`. Other keys may be present and will be ignored.
    /// - `did`: string, `did` format
    /// - `handle`: string, `handle` format
    /// - `at-identifier`: string, `did` or `handle` format
    pub shape: String,
}
291
/// Request body for the hydrate endpoint: which XRPC to forward, where to
/// forward it, and where in the response to look for references.
#[derive(Object)]
#[oai(example = true)]
struct ProxyQueryPayload {
    /// The NSID of the XRPC you wish to forward
    xrpc: String,
    /// The destination service the request will be forwarded to
    atproto_proxy: String,
    /// The `params` for the destination service XRPC endpoint
    ///
    /// Currently this will be passed along unchecked, but a future version of
    /// slingshot may attempt to do lexicon resolution to validate `params`
    /// based on the upstream service
    params: Option<serde_json::Value>,
    /// Paths within the response to look for at-uris that can be hydrated
    hydration_sources: Vec<HydrationSource>,
    // todo: deadline thing

}
310impl Example for ProxyQueryPayload {
311 fn example() -> Self {
312 Self {
313 xrpc: "app.bsky.feed.getFeedSkeleton".to_string(),
314 atproto_proxy: "did:web:blue.mackuba.eu#bsky_fg".to_string(),
315 params: Some(serde_json::json!({
316 "feed": "at://did:plc:oio4hkxaop4ao4wz2pp3f4cr/app.bsky.feed.generator/atproto",
317 })),
318 hydration_sources: vec![
319 HydrationSource {
320 path: "feed[].post".to_string(),
321 shape: "at-uri".to_string(),
322 }
323 ],
324 }
325 }
326}
327
/// Success body for `resolveHandle`: just the resolved DID.
#[derive(Object)]
#[oai(example = true)]
struct FoundDidResponseObject {
    /// the DID, bi-directionally verified if using Slingshot
    did: String,
}
334impl Example for FoundDidResponseObject {
335 fn example() -> Self {
336 Self { did: example_did() }
337 }
338}
339
/// Responses for `com.atproto.identity.resolveHandle`.
#[derive(ApiResponse)]
#[oai(bad_request_handler = "bad_request_handler_resolve_handle")]
enum JustDidResponse {
    /// Resolution succeeded
    #[oai(status = 200)]
    Ok(Json<FoundDidResponseObject>),
    /// Bad request, failed to resolve, or failed to verify
    ///
    /// `error` will be one of `InvalidRequest`, `HandleNotFound`.
    #[oai(status = 400)]
    BadRequest(XrpcError),
    /// Something went wrong trying to complete the request
    #[oai(status = 500)]
    ServerError(XrpcError),
}
355
/// State shared by all XRPC endpoint handlers.
struct Xrpc {
    /// Record cache; `get_record_impl` keys it by the record's at-uri
    cache: HybridCache<String, CachedRecord>,
    /// Handle -> DID resolution and DID -> partial mini doc lookups
    identity: Identity,
    /// Forwards XRPC queries to an upstream service (used by the hydrate endpoint)
    proxy: Arc<Proxy>,
    /// Fetches records from repos
    repo: Arc<Repo>,
}
362
/// Tag groups under which the endpoints are listed in the generated API docs.
#[derive(Tags)]
enum ApiTags {
    /// Core ATProtocol-compatible APIs.
    ///
    /// > [!tip]
    /// > Upstream documentation is available at
    /// > https://docs.bsky.app/docs/category/http-reference
    ///
    /// These queries are usually executed directly against the PDS containing
    /// the data being requested. Slingshot offers a caching view of the same
    /// contents with better expected performance and reliability.
    #[oai(rename = "com.atproto.* queries")]
    ComAtproto,
    /// Additional and improved APIs.
    ///
    /// These APIs offer small tweaks to the core ATProtocol APIs, with
    /// more convenient [request parameters](#tag/slingshot-specific-queries/GET/xrpc/com.bad-example.repo.getUriRecord)
    /// or [response formats](#tag/slingshot-specific-queries/GET/xrpc/com.bad-example.identity.resolveMiniDoc).
    ///
    /// > [!important]
    /// > At the moment, these are namespaced under the `com.bad-example.*` NSID
    /// > prefix, but as they stabilize they may be migrated to an org namespace
    /// > like `blue.microcosm.*`. Support for aliasing to `com.bad-example.*`
    /// > will be maintained as long as it's in use.
    #[oai(rename = "slingshot-specific queries")]
    Custom,
}
390
391#[OpenApi]
392impl Xrpc {
/// com.atproto.repo.getRecord
///
/// Get a single record from a repository. Does not require auth.
///
/// > [!tip]
/// > See also the [canonical `com.atproto` XRPC documentation](https://docs.bsky.app/docs/api/com-atproto-repo-get-record)
/// > that this endpoint aims to be compatible with.
#[oai(
    path = "/com.atproto.repo.getRecord",
    method = "get",
    tag = "ApiTags::ComAtproto"
)]
async fn get_record(
    &self,
    /// The DID or handle of the repo
    #[oai(example = "example_did")]
    Query(repo): Query<String>,
    /// The NSID of the record collection
    #[oai(example = "example_collection")]
    Query(collection): Query<String>,
    /// The Record key
    #[oai(example = "example_rkey")]
    Query(rkey): Query<String>,
    /// Optional: the CID of the version of the record.
    ///
    /// If not specified, then return the most recent version.
    ///
    /// If a stale `CID` is specified and a newer version of the record
    /// exists, Slingshot returns a `RecordNotFound` error. That is: Slingshot
    /// only retains the most recent version of a record.
    Query(cid): Query<Option<String>>,
) -> GetRecordResponse {
    // Thin wrapper: validation, caching and fetching all live in `get_record_impl`.
    self.get_record_impl(repo, collection, rkey, cid).await
}
427
/// blue.microcosm.repo.getRecordByUri
///
/// alias of `com.bad-example.repo.getUriRecord` with intention to stabilize under this name
#[oai(
    path = "/blue.microcosm.repo.getRecordByUri",
    method = "get",
    tag = "ApiTags::Custom"
)]
async fn get_record_by_uri(
    &self,
    /// The at-uri of the record
    ///
    /// The identifier can be a DID or an atproto handle, and the collection
    /// and rkey segments must be present.
    #[oai(example = "example_uri")]
    Query(at_uri): Query<String>,
    /// Optional: the CID of the version of the record.
    ///
    /// If not specified, then return the most recent version.
    ///
    /// > [!tip]
    /// > If specified and a newer version of the record exists, returns a
    /// > `RecordNotFound` error (HTTP 400). That is: slingshot only retains
    /// > the most recent version of a record.
    Query(cid): Query<Option<String>>,
) -> GetRecordResponse {
    // Pure alias: re-wrap the extracted params and delegate to `get_uri_record`.
    self.get_uri_record(Query(at_uri), Query(cid)).await
}
456
457 /// com.bad-example.repo.getUriRecord
458 ///
459 /// Ergonomic complement to [`com.atproto.repo.getRecord`](https://docs.bsky.app/docs/api/com-atproto-repo-get-record)
460 /// which accepts an `at-uri` instead of individual repo/collection/rkey params
461 #[oai(
462 path = "/com.bad-example.repo.getUriRecord",
463 method = "get",
464 tag = "ApiTags::Custom"
465 )]
466 async fn get_uri_record(
467 &self,
468 /// The at-uri of the record
469 ///
470 /// The identifier can be a DID or an atproto handle, and the collection
471 /// and rkey segments must be present.
472 #[oai(example = "example_uri")]
473 Query(at_uri): Query<String>,
474 /// Optional: the CID of the version of the record.
475 ///
476 /// If not specified, then return the most recent version.
477 ///
478 /// > [!tip]
479 /// > If specified and a newer version of the record exists, returns 404 not
480 /// > found. That is: slingshot only retains the most recent version of a
481 /// > record.
482 Query(cid): Query<Option<String>>,
483 ) -> GetRecordResponse {
484 let bad_at_uri = || {
485 GetRecordResponse::BadRequest(xrpc_error(
486 "InvalidRequest",
487 "at-uri does not appear to be valid",
488 ))
489 };
490
491 let Some(normalized) = normalize_at_uri(&at_uri) else {
492 return bad_at_uri();
493 };
494
495 // TODO: move this to links
496 let Some(rest) = normalized.strip_prefix("at://") else {
497 return bad_at_uri();
498 };
499 let Some((repo, rest)) = rest.split_once('/') else {
500 return bad_at_uri();
501 };
502 let Some((collection, rest)) = rest.split_once('/') else {
503 return bad_at_uri();
504 };
505 let rkey = if let Some((rkey, _rest)) = rest.split_once('?') {
506 rkey
507 } else {
508 rest
509 };
510
511 self.get_record_impl(
512 repo.to_string(),
513 collection.to_string(),
514 rkey.to_string(),
515 cid,
516 )
517 .await
518 }
519
520 /// com.atproto.identity.resolveHandle
521 ///
522 /// Resolves an atproto [`handle`](https://atproto.com/guides/glossary#handle)
523 /// (hostname) to a [`DID`](https://atproto.com/guides/glossary#did-decentralized-id).
524 ///
525 /// > [!tip]
526 /// > Compatibility note: Slingshot will **always bi-directionally verify
527 /// > against the DID document**, which is optional according to the
528 /// > authoritative lexicon.
529 ///
530 /// > [!tip]
531 /// > See the [canonical `com.atproto` XRPC documentation](https://docs.bsky.app/docs/api/com-atproto-identity-resolve-handle)
532 /// > that this endpoint aims to be compatible with.
533 #[oai(
534 path = "/com.atproto.identity.resolveHandle",
535 method = "get",
536 tag = "ApiTags::ComAtproto"
537 )]
538 async fn resolve_handle(
539 &self,
540 /// The handle to resolve.
541 #[oai(example = "example_handle")]
542 Query(handle): Query<String>,
543 ) -> JustDidResponse {
544 let Ok(handle) = Handle::new(handle.to_lowercase()) else {
545 return JustDidResponse::BadRequest(xrpc_error("InvalidRequest", "not a valid handle"));
546 };
547
548 let Ok(alleged_did) = self.identity.handle_to_did(handle.clone()).await else {
549 return JustDidResponse::ServerError(xrpc_error("Failed", "Could not resolve handle"));
550 };
551
552 let Some(alleged_did) = alleged_did else {
553 return JustDidResponse::BadRequest(xrpc_error(
554 "HandleNotFound",
555 "Could not resolve handle to a DID",
556 ));
557 };
558
559 let Ok(partial_doc) = self.identity.did_to_partial_mini_doc(&alleged_did).await else {
560 return JustDidResponse::ServerError(xrpc_error("Failed", "Could not fetch DID doc"));
561 };
562
563 let Some(partial_doc) = partial_doc else {
564 return JustDidResponse::BadRequest(xrpc_error(
565 "HandleNotFound",
566 "Resolved handle but could not find DID doc for the DID",
567 ));
568 };
569
570 if partial_doc.unverified_handle != handle {
571 return JustDidResponse::BadRequest(xrpc_error(
572 "HandleNotFound",
573 "Resolved handle failed bi-directional validation",
574 ));
575 }
576
577 JustDidResponse::Ok(Json(FoundDidResponseObject {
578 did: alleged_did.to_string(),
579 }))
580 }
581
/// blue.microcosm.identity.resolveMiniDoc
///
/// alias of `com.bad-example.identity.resolveMiniDoc` with intention to stabilize under this name
#[oai(
    path = "/blue.microcosm.identity.resolveMiniDoc",
    method = "get",
    tag = "ApiTags::Custom"
)]
async fn resolve_mini_doc(
    &self,
    /// Handle or DID to resolve
    #[oai(example = "example_handle")]
    Query(identifier): Query<String>,
) -> ResolveMiniIDResponse {
    // Pure alias: re-wrap the extracted param and delegate to `resolve_mini_id`.
    self.resolve_mini_id(Query(identifier)).await
}
598
/// com.bad-example.identity.resolveMiniDoc
///
/// Like [com.atproto.identity.resolveIdentity](https://docs.bsky.app/docs/api/com-atproto-identity-resolve-identity)
/// but instead of the full `didDoc` it returns an atproto-relevant subset.
#[oai(
    path = "/com.bad-example.identity.resolveMiniDoc",
    method = "get",
    tag = "ApiTags::Custom"
)]
async fn resolve_mini_id(
    &self,
    /// Handle or DID to resolve
    #[oai(example = "example_handle")]
    Query(identifier): Query<String>,
) -> ResolveMiniIDResponse {
    // The implementation is an associated fn taking its own `Identity` clone,
    // which also lets the hydrate endpoint call it from spawned tasks.
    Self::resolve_mini_doc_impl(&identifier, self.identity.clone()).await
}
616
617 async fn resolve_mini_doc_impl(identifier: &str, identity: Identity) -> ResolveMiniIDResponse {
618 let invalid = |reason: &'static str| {
619 ResolveMiniIDResponse::BadRequest(xrpc_error("InvalidRequest", reason))
620 };
621
622 let mut unverified_handle = None;
623 let did = match Did::new(identifier.to_string()) {
624 Ok(did) => did,
625 Err(_) => {
626 let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else {
627 return invalid("Identifier was not a valid DID or handle");
628 };
629
630 match identity.handle_to_did(alleged_handle.clone()).await {
631 Ok(res) => {
632 if let Some(did) = res {
633 // we did it joe
634 unverified_handle = Some(alleged_handle);
635 did
636 } else {
637 return invalid("Could not resolve handle identifier to a DID");
638 }
639 }
640 Err(e) => {
641 log::debug!("failed to resolve handle: {e}");
642 // TODO: ServerError not BadRequest
643 return invalid("Errored while trying to resolve handle to DID");
644 }
645 }
646 }
647 };
648 let Ok(partial_doc) = identity.did_to_partial_mini_doc(&did).await else {
649 return invalid("Failed to get DID doc");
650 };
651 let Some(partial_doc) = partial_doc else {
652 return invalid("Failed to find DID doc");
653 };
654
655 // ok so here's where we're at:
656 // ✅ we have a DID
657 // ✅ we have a partial doc
658 // 🔶 if we have a handle, it's from the `identifier` (user-input)
659 // -> then we just need to compare to the partial doc to confirm
660 // -> else we need to resolve the DID doc's to a handle and check
661 let handle = if let Some(h) = unverified_handle {
662 if h == partial_doc.unverified_handle {
663 h.to_string()
664 } else {
665 "handle.invalid".to_string()
666 }
667 } else {
668 let Ok(handle_did) = identity
669 .handle_to_did(partial_doc.unverified_handle.clone())
670 .await
671 else {
672 return invalid("Failed to get DID doc's handle");
673 };
674 let Some(handle_did) = handle_did else {
675 return invalid("Failed to resolve DID doc's handle");
676 };
677 if handle_did == did {
678 partial_doc.unverified_handle.to_string()
679 } else {
680 "handle.invalid".to_string()
681 }
682 };
683
684 ResolveMiniIDResponse::Ok(Json(MiniDocResponseObject {
685 did: did.to_string(),
686 handle,
687 pds: partial_doc.pds,
688 signing_key: partial_doc.signing_key,
689 }))
690 }
691
692 /// com.bad-example.proxy.hydrateQueryResponse
693 ///
694 /// > [!important]
695 /// > Unstable! This endpoint is experimental and may change.
696 ///
697 /// Fetch + include records referenced from an upstream xrpc query response
698 #[oai(
699 path = "/com.bad-example.proxy.hydrateQueryResponse",
700 method = "post",
701 tag = "ApiTags::Custom"
702 )]
703 async fn proxy_hydrate_query(
704 &self,
705 Json(payload): Json<ProxyQueryPayload>,
706 ) -> ProxyHydrateResponse {
707 // TODO: the Accept request header, if present, gotta be json
708 // TODO: find any Authorization header and verify it. TBD about `aud`.
709
710 let params = if let Some(p) = payload.params {
711 let serde_json::Value::Object(map) = p else {
712 panic!("params have to be an object");
713 };
714 Some(map)
715 } else { None };
716
717 match self.proxy.proxy(
718 payload.xrpc,
719 payload.atproto_proxy,
720 params,
721 ).await {
722 Ok(skeleton) => {
723 let links = match extract_links(payload.hydration_sources, &skeleton) {
724 Ok(l) => l,
725 Err(e) => {
726 log::warn!("problem extracting: {e:?}");
727 return ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry, error extracting"))
728 }
729 };
730 let mut records = HashMap::new();
731 let mut identifiers = HashMap::new();
732 let mut blobs = HashMap::new();
733
734 enum GetThing {
735 Record(String, Hydration<ProxyHydrationRecordFound>),
736 Identifier(String, Hydration<ProxyHydrationIdentifierFound>),
737 Blob(String, Hydration<ProxyHydrationBlobFound>),
738 }
739
740 let (tx, mut rx) = mpsc::channel(1);
741
742 for link in links {
743 match link {
744 MatchedRef::AtUri { uri, cid } => {
745 if records.contains_key(&uri) {
746 log::warn!("skipping duplicate record without checking cid");
747 continue;
748 }
749 let mut u = url::Url::parse("https://example.com").unwrap();
750 u.query_pairs_mut().append_pair("at_uri", &uri); // BLEH todo
751 records.insert(uri.clone(), Hydration::Pending(ProxyHydrationPending {
752 url: format!("/xrpc/blue.microcosm.repo.getRecordByUri?{}", u.query().unwrap()), // TODO better; with cid, etc.
753 }));
754 let tx = tx.clone();
755 let identity = self.identity.clone();
756 let repo = self.repo.clone();
757 tokio::task::spawn(async move {
758 let rest = uri.strip_prefix("at://").unwrap();
759 let (identifier, rest) = rest.split_once('/').unwrap();
760 let (collection, rkey) = rest.split_once('/').unwrap();
761
762 let did = if identifier.starts_with("did:") {
763 Did::new(identifier.to_string()).unwrap()
764 } else {
765 let handle = Handle::new(identifier.to_string()).unwrap();
766 identity.handle_to_did(handle).await.unwrap().unwrap()
767 };
768
769 let res = match repo.get_record(
770 &did,
771 &Nsid::new(collection.to_string()).unwrap(),
772 &RecordKey::new(rkey.to_string()).unwrap(),
773 &cid.as_ref().map(|s| Cid::from_str(s).unwrap()),
774 ).await {
775 Ok(CachedRecord::Deleted) =>
776 Hydration::Error(ProxyHydrationError {
777 reason: "record deleted".to_string(),
778 }),
779 Ok(CachedRecord::Found(RawRecord { cid: found_cid, record })) => {
780 if let Some(c) = cid && found_cid.as_ref().to_string() != c {
781 log::warn!("ignoring cid mismatch");
782 }
783 let value = serde_json::from_str(&record).unwrap();
784 Hydration::Found(ProxyHydrationRecordFound {
785 record: value,
786 })
787 }
788 Err(e) => {
789 log::warn!("finally oop {e:?}");
790 Hydration::Error(ProxyHydrationError {
791 reason: "failed to fetch record".to_string(),
792 })
793 }
794 };
795 tx.send(GetThing::Record(uri, res)).await
796 });
797 }
798 MatchedRef::Identifier(id) => {
799 if identifiers.contains_key(&id) {
800 continue;
801 }
802 let mut u = url::Url::parse("https://example.com").unwrap();
803 u.query_pairs_mut().append_pair("identifier", &id);
804 identifiers.insert(id.clone(), Hydration::Pending(ProxyHydrationPending {
805 url: format!("/xrpc/blue.microcosm.identity.resolveMiniDoc?{}", u.query().unwrap()), // gross
806 }));
807 let tx = tx.clone();
808 let identity = self.identity.clone();
809 tokio::task::spawn(async move {
810 let res = match Self::resolve_mini_doc_impl(&id, identity).await {
811 ResolveMiniIDResponse::Ok(Json(mini_doc)) => Hydration::Found(ProxyHydrationIdentifierFound {
812 mini_doc
813 }),
814 ResolveMiniIDResponse::BadRequest(e) => {
815 log::warn!("minidoc fail: {:?}", e.0);
816 Hydration::Error(ProxyHydrationError {
817 reason: "failed to resolve mini doc".to_string(),
818 })
819 }
820 };
821 tx.send(GetThing::Identifier(id, res)).await
822 });
823 }
824 MatchedRef::Blob { link, mime, size: _ } => {
825 if blobs.contains_key(&link) {
826 continue;
827 }
828 if mime != "image/jpeg" {
829 Hydration::<ProxyHydrationBlobFound>::Error(ProxyHydrationError {
830 reason: "only image/jpeg supported for now".to_string(),
831 });
832 }
833 todo!("oops we need to know the account too")
834 }
835 }
836 }
837 // so the channel can close when all are completed
838 // (we shoudl be doing a timeout...)
839 drop(tx);
840
841 while let Some(hydration) = rx.recv().await {
842 match hydration {
843 GetThing::Record(uri, h) => { records.insert(uri, h); }
844 GetThing::Identifier(uri, md) => { identifiers.insert(uri, md); }
845 GetThing::Blob(cid, asdf) => { blobs.insert(cid, asdf); }
846 };
847 }
848
849 ProxyHydrateResponse::Ok(Json(ProxyHydrateResponseObject {
850 output: skeleton,
851 records,
852 identifiers,
853 blobs,
854 }))
855 }
856 Err(e) => {
857 log::warn!("oh no: {e:?}");
858 ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry"))
859 }
860 }
861
862 }
863
/// Shared implementation behind the record-fetching endpoints.
///
/// Validates the inputs, resolves `repo` to a DID (through handle resolution
/// when it does not parse as a DID), then reads the record through the cache,
/// fetching from the repo when the cache has to load it.
///
/// Outcomes, as built below:
/// - 200 with the record when it is found and any requested `cid` matches
/// - 400 `InvalidRequest` for a malformed repo / collection / rkey / cid, or a
///   handle that resolves to no DID
/// - 400 `RecordNotFound` for a deleted record or a CID mismatch
/// - 400 carrying the upstream error name when the fetch reported an upstream
///   bad request
/// - 500 when handle resolution errors, or for any other cache/fetch failure
async fn get_record_impl(
    &self,
    repo: String,
    collection: String,
    rkey: String,
    cid: Option<String>,
) -> GetRecordResponse {
    // `repo` may be a DID or a handle: try DID first, fall back to resolving
    // the lowercased handle.
    let did = match Did::new(repo.clone()) {
        Ok(did) => did,
        Err(_) => {
            let Ok(handle) = Handle::new(repo.to_lowercase()) else {
                return GetRecordResponse::BadRequest(xrpc_error(
                    "InvalidRequest",
                    "Repo was not a valid DID or handle",
                ));
            };
            match self.identity.handle_to_did(handle).await {
                Ok(res) => {
                    if let Some(did) = res {
                        did
                    } else {
                        return GetRecordResponse::BadRequest(xrpc_error(
                            "InvalidRequest",
                            "Could not resolve handle repo to a DID",
                        ));
                    }
                }
                Err(e) => {
                    log::debug!("handle resolution failed: {e}");
                    return GetRecordResponse::ServerError(xrpc_error(
                        "ResolutionFailed",
                        "Errored while trying to resolve handle to DID",
                    ));
                }
            }
        }
    };

    let Ok(collection) = Nsid::new(collection) else {
        return GetRecordResponse::BadRequest(xrpc_error(
            "InvalidRequest",
            "Invalid NSID for collection",
        ));
    };

    let Ok(rkey) = RecordKey::new(rkey) else {
        return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid rkey"));
    };

    let cid: Option<Cid> = if let Some(cid) = cid {
        let Ok(cid) = Cid::from_str(&cid) else {
            return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid CID"));
        };
        Some(cid)
    } else {
        None
    };

    // The cache key is the at-uri built from the resolved DID; note that the
    // requested CID is not part of the key.
    let at_uri = format!("at://{}/{}/{}", &*did, &*collection, &*rkey);

    metrics::counter!("slingshot_get_record").increment(1);
    let fr = self
        .cache
        .get_or_fetch(&at_uri, {
            let cid = cid.clone();
            let repo_api = self.repo.clone();
            // Fetch closure handed to the cache: gets the record from the
            // repo and records how long the fetch took, labelled by success.
            || async move {
                let t0 = Instant::now();
                let res = repo_api.get_record(&did, &collection, &rkey, &cid).await;
                let success = if res.is_ok() { "true" } else { "false" };
                metrics::histogram!("slingshot_fetch_record", "success" => success)
                    .record(t0.elapsed());
                res
            }
        })
        .await;

    let entry = match fr {
        Ok(e) => e,
        // `External` errors are expected to wrap the `RecordError` produced
        // by the fetch closure above; dig it back out of the error source.
        Err(e) if e.kind() == foyer::ErrorKind::External => {
            let record_error = match e.source().map(|s| s.downcast_ref::<RecordError>()) {
                Some(Some(e)) => e,
                other => {
                    if other.is_none() {
                        log::error!("external error without a source. wat? {e}");
                    } else {
                        log::error!("downcast to RecordError failed...? {e}");
                    }
                    return GetRecordResponse::ServerError(xrpc_error(
                        "ServerError",
                        "sorry, something went wrong",
                    ));
                }
            };
            // Only an upstream bad request is passed through to the client;
            // every other `RecordError` is reported as a generic 500.
            let RecordError::UpstreamBadRequest(ErrorResponseObject {
                ref error,
                ref message,
            }) = *record_error
            else {
                log::error!("RecordError getting cache entry, {record_error:?}");
                return GetRecordResponse::ServerError(xrpc_error(
                    "ServerError",
                    "sorry, something went wrong",
                ));
            };

            // all of the noise around here is so that we can ultimately reach this:
            // upstream BadRequest extracted from the foyer result which we can proxy back
            return GetRecordResponse::BadRequest(xrpc_error(
                error,
                format!("Upstream bad request: {message}"),
            ));
        }
        Err(e) => {
            log::error!("error (foyer) getting cache entry, {e:?}");
            return GetRecordResponse::ServerError(xrpc_error(
                "ServerError",
                "sorry, something went wrong",
            ));
        }
    };

    match *entry {
        CachedRecord::Found(ref raw) => {
            let (found_cid, raw_value) = raw.into();
            // A specific CID was requested but the record we hold has a
            // different one: report it as not found.
            if cid.clone().map(|c| c != found_cid).unwrap_or(false) {
                return GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
                    error: "RecordNotFound".to_string(),
                    message: "A record was found but its CID did not match that requested"
                        .to_string(),
                }));
            }
            // TODO: thank u stellz: https://gist.github.com/stella3d/51e679e55b264adff89d00a1e58d0272
            let value =
                serde_json::from_str(raw_value.get()).expect("RawValue to be valid json");
            GetRecordResponse::Ok(Json(FoundRecordResponseObject {
                uri: at_uri,
                cid: Some(found_cid.as_ref().to_string()),
                value,
            }))
        }
        // Deletions are held as entries too, so they are answered without a value.
        CachedRecord::Deleted => GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
            error: "RecordNotFound".to_string(),
            message: "This record was deleted".to_string(),
        })),
    }
}
1011
1012 // TODO
1013 // #[oai(path = "/com.atproto.identity.resolveDid", method = "get")]
1014 // but these are both not specified to do bidirectional validation, which is what we want to offer
1015 // com.atproto.identity.resolveIdentity seems right, but requires returning the full did-doc
1016 // would be nice if there were two queries:
1017 // did -> verified handle + pds url
1018 // handle -> verified did + pds url
1019 //
1020 // we could do horrible things and implement resolveIdentity with only a stripped-down fake did doc
1021 // but this will *definitely* cause problems probably
1022 //
1023 // resolveMiniDoc gets most of this well enough.
1024}
1025
1026#[derive(Debug, Clone, Serialize)]
1027#[serde(rename_all = "camelCase")]
1028struct AppViewService {
1029 id: String,
1030 r#type: String,
1031 service_endpoint: String,
1032}
1033#[derive(Debug, Clone, Serialize)]
1034struct AppViewDoc {
1035 id: String,
1036 service: [AppViewService; 1],
1037}
1038/// Serve a did document for did:web for this to be an xrpc appview
1039///
1040/// No slingshot endpoints currently require auth, so it's not necessary to do
1041/// service proxying, however clients may wish to:
1042///
1043/// - PDS proxying offers a level of client IP anonymity from slingshot
1044/// - slingshot *may* implement more generous per-user rate-limits for proxied requests in the future
1045fn get_did_doc(domain: &str) -> impl Endpoint + use<> {
1046 let doc = poem::web::Json(AppViewDoc {
1047 id: format!("did:web:{domain}"),
1048 service: [AppViewService {
1049 id: "#slingshot".to_string(),
1050 r#type: "SlingshotRecordProxy".to_string(),
1051 service_endpoint: format!("https://{domain}"),
1052 }],
1053 });
1054 make_sync(move |_| doc.clone())
1055}
1056
1057#[allow(clippy::too_many_arguments)]
1058pub async fn serve(
1059 cache: HybridCache<String, CachedRecord>,
1060 identity: Identity,
1061 repo: Repo,
1062 proxy: Proxy,
1063 acme_domain: Option<String>,
1064 acme_contact: Option<String>,
1065 acme_cache_path: Option<PathBuf>,
1066 acme_ipv6: bool,
1067 shutdown: CancellationToken,
1068 bind: std::net::SocketAddr,
1069) -> Result<(), ServerError> {
1070 let repo = Arc::new(repo);
1071 let proxy = Arc::new(proxy);
1072 let api_service = OpenApiService::new(
1073 Xrpc {
1074 cache,
1075 identity,
1076 proxy,
1077 repo,
1078 },
1079 "Slingshot",
1080 env!("CARGO_PKG_VERSION"),
1081 )
1082 .server(if let Some(ref h) = acme_domain {
1083 format!("https://{h}")
1084 } else {
1085 format!("http://{bind}") // yeah should probably fix this for reverse-proxy scenarios but it's ok for dev for now
1086 })
1087 .url_prefix("/xrpc")
1088 .contact(
1089 ContactObject::new()
1090 .name("@microcosm.blue")
1091 .url("https://bsky.app/profile/microcosm.blue"),
1092 )
1093 .description(include_str!("../api-description.md"))
1094 .external_document(ExternalDocumentObject::new(
1095 "https://microcosm.blue/slingshot",
1096 ));
1097
1098 let mut app = Route::new()
1099 .at("/", StaticFileEndpoint::new("./static/index.html"))
1100 .nest("/openapi", api_service.spec_endpoint())
1101 .nest("/xrpc/", api_service);
1102
1103 if let Some(domain) = acme_domain {
1104 rustls::crypto::aws_lc_rs::default_provider()
1105 .install_default()
1106 .expect("alskfjalksdjf");
1107
1108 app = app.at("/.well-known/did.json", get_did_doc(&domain));
1109
1110 let mut auto_cert = AutoCert::builder()
1111 .directory_url(LETS_ENCRYPT_PRODUCTION)
1112 .domain(&domain);
1113 if let Some(contact) = acme_contact {
1114 auto_cert = auto_cert.contact(contact);
1115 }
1116 if let Some(cache_path) = acme_cache_path {
1117 auto_cert = auto_cert.cache_path(cache_path);
1118 }
1119 let auto_cert = auto_cert.build().map_err(ServerError::AcmeBuildError)?;
1120
1121 run(
1122 TcpListener::bind(if acme_ipv6 { "[::]:443" } else { "0.0.0.0:443" }).acme(auto_cert),
1123 app,
1124 shutdown,
1125 )
1126 .await
1127 } else {
1128 run(TcpListener::bind(bind), app, shutdown).await
1129 }
1130}
1131
1132async fn run<L>(listener: L, app: Route, shutdown: CancellationToken) -> Result<(), ServerError>
1133where
1134 L: Listener + 'static,
1135{
1136 let app = app
1137 .with(
1138 Cors::new()
1139 .allow_origin_regex("*")
1140 .allow_methods([Method::GET, Method::POST])
1141 .allow_credentials(false),
1142 )
1143 .with(CatchPanic::new())
1144 .around(request_counter)
1145 .with(Tracing);
1146
1147 Server::new(listener)
1148 .name("slingshot")
1149 .run_with_graceful_shutdown(app, shutdown.cancelled(), None)
1150 .await
1151 .map_err(ServerError::ServerExited)
1152 .inspect(|()| log::info!("server ended. goodbye."))
1153}
1154
1155async fn request_counter<E: Endpoint>(next: E, req: poem::Request) -> poem::Result<poem::Response> {
1156 let t0 = std::time::Instant::now();
1157 let method = req.method().to_string();
1158 let path = req.uri().path().to_string();
1159 let res = next.call(req).await?.into_response();
1160 metrics::histogram!(
1161 "server_request",
1162 "endpoint" => format!("{method} {path}"),
1163 "status" => res.status().to_string(),
1164 )
1165 .record(t0.elapsed());
1166 Ok(res)
1167}