This repository has no description
1use std::sync::Arc;
2use std::time::Duration;
3
4use bobbin_runtime::{HttpRequest, HttpResponseHead, HttpTransport, NetworkError, ReqwestHttp};
5use bobbin_types::record::RecordBody;
6use bytes::{Bytes, BytesMut};
7use cid::Cid as IpldCid;
8use futures::TryStreamExt;
9use http::{HeaderMap, StatusCode};
10use jacquard_common::BosStr;
11use jacquard_common::types::did::Did;
12use jacquard_common::types::ident::AtIdentifier;
13use jacquard_common::types::nsid::Nsid;
14use jacquard_common::types::recordkey::Rkey;
15use jacquard_common::types::string::{AtStrError, AtUri};
16use serde::Deserialize;
17use serde_json::value::RawValue;
18use thiserror::Error;
19use url::Url;
20
21const USER_AGENT: &str = concat!("bobbin/", env!("CARGO_PKG_VERSION"));
22const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
23const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
24const GET_RECORD_PATH: &str = "xrpc/com.atproto.repo.getRecord";
25const RESOLVE_MINI_DOC_PATH: &str = "xrpc/com.bad-example.identity.resolveMiniDoc";
26pub const MAX_BODY_BYTES: u64 = 4 * 1024 * 1024;
27
28#[derive(Clone)]
29pub struct SlingshotClient {
30 http: Arc<dyn HttpTransport>,
31 base: Url,
32}
33
34impl std::fmt::Debug for SlingshotClient {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 f.debug_struct("SlingshotClient")
37 .field("base", &self.base)
38 .finish_non_exhaustive()
39 }
40}
41
42#[derive(Debug, Error)]
43pub enum SlingshotError {
44 #[error("invalid base url scheme: {0}")]
45 BadScheme(String),
46 #[error("network: {0}")]
47 Network(#[from] NetworkError),
48 #[error("http client build: {0}")]
49 Build(String),
50 #[error("record not found")]
51 NotFound,
52 #[error("upstream returned status {0}")]
53 Upstream(StatusCode),
54 #[error("response body exceeded {limit} bytes")]
55 BodyTooLarge { limit: u64 },
56 #[error("decode response: {0}")]
57 Decode(#[from] serde_json::Error),
58 #[error("response missing required field: {0}")]
59 MissingField(&'static str),
60 #[error("invalid AT-URI in response: {0}")]
61 InvalidAtUri(#[from] AtStrError),
62 #[error("invalid CID in response: {0}")]
63 InvalidCid(#[from] cid::Error),
64 #[error("upstream returned uri {got}, expected {expected}")]
65 UriMismatch { expected: String, got: String },
66}
67
68pub fn default_http_client() -> Result<reqwest::Client, reqwest::Error> {
69 reqwest::Client::builder()
70 .user_agent(USER_AGENT)
71 .timeout(REQUEST_TIMEOUT)
72 .connect_timeout(CONNECT_TIMEOUT)
73 .build()
74}
75
76impl SlingshotClient {
77 pub fn new(base: Url, http: Arc<dyn HttpTransport>) -> Result<Self, SlingshotError> {
78 match base.scheme() {
79 "http" | "https" => {}
80 other => return Err(SlingshotError::BadScheme(other.to_owned())),
81 }
82 let base = ensure_trailing_slash(base);
83 Ok(Self { http, base })
84 }
85
86 pub fn with_default_http(base: Url) -> Result<Self, SlingshotError> {
87 let client = default_http_client().map_err(|e| SlingshotError::Build(e.to_string()))?;
88 Self::new(base, ReqwestHttp::shared(client))
89 }
90
91 pub async fn resolve_mini_doc<S>(
92 &self,
93 identifier: &AtIdentifier<S>,
94 ) -> Result<Bytes, SlingshotError>
95 where
96 S: BosStr,
97 {
98 let mut url = self.base.join(RESOLVE_MINI_DOC_PATH).expect(
99 "base url is hierarchical and RESOLVE_MINI_DOC_PATH is a literal relative path",
100 );
101 url.query_pairs_mut()
102 .clear()
103 .append_pair("identifier", identifier.as_str());
104
105 let resp = self
106 .http
107 .execute(HttpRequest {
108 url,
109 headers: HeaderMap::new(),
110 })
111 .await?;
112 match resp.status {
113 StatusCode::OK => read_bounded(resp).await,
114 StatusCode::NOT_FOUND => Err(SlingshotError::NotFound),
115 other => Err(SlingshotError::Upstream(other)),
116 }
117 }
118
119 pub async fn get_record<S>(
120 &self,
121 repo: &Did<S>,
122 collection: &Nsid<S>,
123 rkey: &Rkey<S>,
124 ) -> Result<Arc<RecordBody>, SlingshotError>
125 where
126 S: BosStr,
127 {
128 let mut url = self
129 .base
130 .join(GET_RECORD_PATH)
131 .expect("base url is hierarchical and GET_RECORD_PATH is a literal relative path");
132 url.query_pairs_mut()
133 .clear()
134 .append_pair("repo", repo.as_ref())
135 .append_pair("collection", collection.as_ref())
136 .append_pair("rkey", rkey.as_ref());
137
138 let resp = self
139 .http
140 .execute(HttpRequest {
141 url,
142 headers: HeaderMap::new(),
143 })
144 .await?;
145 match resp.status {
146 StatusCode::OK => {
147 let bytes = read_bounded(resp).await?;
148 let body = decode(&bytes)?;
149 verify_addresses(&body, repo, collection, rkey)?;
150 Ok(Arc::new(body))
151 }
152 StatusCode::NOT_FOUND => Err(SlingshotError::NotFound),
153 other => Err(SlingshotError::Upstream(other)),
154 }
155 }
156}
157
158fn ensure_trailing_slash(mut base: Url) -> Url {
159 if !base.path().ends_with('/') {
160 let with_slash = format!("{}/", base.path());
161 base.set_path(&with_slash);
162 }
163 base
164}
165
166async fn read_bounded(resp: HttpResponseHead) -> Result<Bytes, SlingshotError> {
167 if resp.content_length.is_some_and(|len| len > MAX_BODY_BYTES) {
168 return Err(SlingshotError::BodyTooLarge {
169 limit: MAX_BODY_BYTES,
170 });
171 }
172 let buf = resp
173 .body
174 .map_err(SlingshotError::Network)
175 .try_fold(BytesMut::new(), |mut acc, chunk| async move {
176 if (acc.len() as u64).saturating_add(chunk.len() as u64) > MAX_BODY_BYTES {
177 return Err(SlingshotError::BodyTooLarge {
178 limit: MAX_BODY_BYTES,
179 });
180 }
181 acc.extend_from_slice(&chunk);
182 Ok(acc)
183 })
184 .await?;
185 Ok(buf.freeze())
186}
187
188fn decode(bytes: &[u8]) -> Result<RecordBody, SlingshotError> {
189 let raw: RawResponse<'_> = serde_json::from_slice(bytes)?;
190 let value = raw.value.ok_or(SlingshotError::MissingField("value"))?;
191 IpldCid::try_from(raw.cid)?;
192 Ok(RecordBody {
193 uri: AtUri::new_owned(raw.uri)?,
194 cid: raw.cid.to_owned().into(),
195 value: Bytes::copy_from_slice(value.get().as_bytes()),
196 })
197}
198
199fn verify_addresses<S: BosStr>(
200 body: &RecordBody,
201 repo: &Did<S>,
202 collection: &Nsid<S>,
203 rkey: &Rkey<S>,
204) -> Result<(), SlingshotError> {
205 let expected = format!(
206 "at://{}/{}/{}",
207 repo.as_ref(),
208 collection.as_ref(),
209 rkey.as_ref()
210 );
211 if body.uri.as_ref() == expected {
212 Ok(())
213 } else {
214 Err(SlingshotError::UriMismatch {
215 expected,
216 got: body.uri.as_ref().to_owned(),
217 })
218 }
219}
220
221#[derive(Deserialize)]
222struct RawResponse<'a> {
223 uri: &'a str,
224 cid: &'a str,
225 #[serde(default, borrow)]
226 value: Option<&'a RawValue>,
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use jacquard_common::DefaultStr;
    use serde_json::json;
    use wiremock::matchers::{method, path, query_param};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    const VALID_CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i";
    /// Route the mock server serves `getRecord` on when no base prefix is set.
    const GET_RECORD_ROUTE: &str = "/xrpc/com.atproto.repo.getRecord";

    fn did(s: &'static str) -> Did<DefaultStr> {
        Did::new_static(s).unwrap()
    }
    fn nsid(s: &'static str) -> Nsid<DefaultStr> {
        Nsid::new_static(s).unwrap()
    }
    fn rkey(s: &str) -> Rkey<DefaultStr> {
        Rkey::new_owned(s).unwrap()
    }

    async fn server() -> MockServer {
        MockServer::start().await
    }

    fn client_for(server: &MockServer) -> SlingshotClient {
        SlingshotClient::with_default_http(Url::parse(&server.uri()).unwrap()).unwrap()
    }

    /// A `sh.tangled.repo` record value pointing at `knot`.
    fn repo_value(knot: &str) -> serde_json::Value {
        json!({
            "$type": "sh.tangled.repo",
            "knot": knot,
            "createdAt": "2026-05-01T00:00:00Z"
        })
    }

    /// Answers every `GET` on the getRecord route with `template`.
    async fn mount_get_record(server: &MockServer, template: ResponseTemplate) {
        Mock::given(method("GET"))
            .and(path(GET_RECORD_ROUTE))
            .respond_with(template)
            .mount(server)
            .await;
    }

    /// Answers every `GET` on the getRecord route with `200` and `body` as JSON.
    async fn mount_json(server: &MockServer, body: serde_json::Value) {
        mount_get_record(server, ResponseTemplate::new(200).set_body_json(body)).await;
    }

    /// Requests `at://did:plc:abalone/sh.tangled.repo/r1` from the mock server.
    async fn fetch_abalone(server: &MockServer) -> Result<Arc<RecordBody>, SlingshotError> {
        client_for(server)
            .get_record(
                &did("did:plc:abalone"),
                &nsid("sh.tangled.repo"),
                &rkey("r1"),
            )
            .await
    }

    #[tokio::test]
    async fn returns_decoded_record_on_200() {
        let server = server().await;
        let payload = json!({
            "uri": "at://did:plc:abalone/sh.tangled.repo/r1",
            "cid": VALID_CID,
            "value": repo_value("oyster.cafe")
        });
        // Mounted by hand: this test also pins the query parameters sent.
        Mock::given(method("GET"))
            .and(path(GET_RECORD_ROUTE))
            .and(query_param("repo", "did:plc:abalone"))
            .and(query_param("collection", "sh.tangled.repo"))
            .and(query_param("rkey", "r1"))
            .respond_with(ResponseTemplate::new(200).set_body_json(payload))
            .mount(&server)
            .await;

        let record = fetch_abalone(&server).await.unwrap();
        assert_eq!(
            record.uri.as_ref(),
            "at://did:plc:abalone/sh.tangled.repo/r1"
        );
        let parsed: serde_json::Value = serde_json::from_slice(&record.value).unwrap();
        assert_eq!(parsed["knot"], "oyster.cafe");
    }

    #[tokio::test]
    async fn preserves_value_bytes_byte_for_byte() {
        let server = server().await;
        // Keys deliberately out of alphabetical order: a re-serialized value
        // would not match the assertion below.
        let raw = format!(
            r#"{{"uri":"at://did:plc:uni/sh.tangled.repo/r1","cid":"{VALID_CID}","value":{{"z":1,"a":2}}}}"#
        );
        let template = ResponseTemplate::new(200)
            .insert_header("content-type", "application/json")
            .set_body_string(raw);
        mount_get_record(&server, template).await;

        let record = client_for(&server)
            .get_record(&did("did:plc:uni"), &nsid("sh.tangled.repo"), &rkey("r1"))
            .await
            .unwrap();
        assert_eq!(record.value.as_ref(), br#"{"z":1,"a":2}"#);
    }

    #[tokio::test]
    async fn maps_404_to_not_found() {
        let server = server().await;
        mount_get_record(&server, ResponseTemplate::new(404).set_body_string("nope")).await;

        let err = fetch_abalone(&server)
            .await
            .expect_err("404 must surface");
        assert!(matches!(err, SlingshotError::NotFound));
    }

    #[tokio::test]
    async fn maps_400_to_upstream() {
        let server = server().await;
        mount_get_record(&server, ResponseTemplate::new(400).set_body_string("bad")).await;

        let err = fetch_abalone(&server)
            .await
            .expect_err("400 must surface as upstream, not not-found");
        match err {
            SlingshotError::Upstream(s) => assert_eq!(s.as_u16(), 400),
            other => panic!("wrong variant: {other:?}"),
        }
    }

    #[tokio::test]
    async fn maps_5xx_to_upstream() {
        let server = server().await;
        mount_get_record(&server, ResponseTemplate::new(503)).await;

        let err = fetch_abalone(&server)
            .await
            .expect_err("5xx must surface");
        match err {
            SlingshotError::Upstream(s) => assert_eq!(s.as_u16(), 503),
            other => panic!("wrong variant: {other:?}"),
        }
    }

    #[tokio::test]
    async fn rejects_oversize_body_via_content_length() {
        let server = server().await;
        let payload = vec![b'x'; (MAX_BODY_BYTES + 1) as usize];
        let template = ResponseTemplate::new(200)
            .insert_header("content-type", "application/json")
            .set_body_bytes(payload);
        mount_get_record(&server, template).await;

        let err = fetch_abalone(&server)
            .await
            .expect_err("oversize body must be rejected");
        assert!(
            matches!(err, SlingshotError::BodyTooLarge { limit } if limit == MAX_BODY_BYTES),
            "wrong variant: {err:?}"
        );
    }

    #[tokio::test]
    async fn rejects_garbage_cid() {
        let server = server().await;
        let payload = json!({
            "uri": "at://did:plc:abalone/sh.tangled.repo/r1",
            "cid": "not-a-real-cid",
            "value": repo_value("oyster.cafe")
        });
        mount_json(&server, payload).await;

        let err = fetch_abalone(&server)
            .await
            .expect_err("garbage cid must be rejected");
        assert!(
            matches!(err, SlingshotError::InvalidCid(_)),
            "wrong variant: {err:?}"
        );
    }

    #[tokio::test]
    async fn rejects_uri_mismatch() {
        let server = server().await;
        let payload = json!({
            "uri": "at://did:plc:limpet/sh.tangled.repo/elsewhere",
            "cid": VALID_CID,
            "value": repo_value("oyster.cafe")
        });
        mount_json(&server, payload).await;

        let err = fetch_abalone(&server)
            .await
            .expect_err("uri mismatch must be rejected");
        match err {
            SlingshotError::UriMismatch { expected, got } => {
                assert_eq!(expected, "at://did:plc:abalone/sh.tangled.repo/r1");
                assert_eq!(got, "at://did:plc:limpet/sh.tangled.repo/elsewhere");
            }
            other => panic!("wrong variant: {other:?}"),
        }
    }

    #[tokio::test]
    async fn rejects_explicit_null_value() {
        let server = server().await;
        let payload = json!({
            "uri": "at://did:plc:abalone/sh.tangled.repo/r1",
            "cid": VALID_CID,
            "value": null
        });
        mount_json(&server, payload).await;

        let err = fetch_abalone(&server)
            .await
            .expect_err("explicit null value must be rejected");
        assert!(
            matches!(err, SlingshotError::MissingField("value")),
            "{err:?}"
        );
    }

    #[tokio::test]
    async fn rejects_missing_value_field() {
        let server = server().await;
        let payload = json!({
            "uri": "at://did:plc:abalone/sh.tangled.repo/r1",
            "cid": VALID_CID
        });
        mount_json(&server, payload).await;

        let err = fetch_abalone(&server)
            .await
            .expect_err("missing value must be rejected");
        assert!(
            matches!(err, SlingshotError::MissingField("value")),
            "{err:?}"
        );
    }

    #[test]
    fn rejects_non_http_scheme() {
        let base = Url::parse("ftp://nel.pet").unwrap();
        let err = SlingshotClient::with_default_http(base).expect_err("ftp bad");
        assert!(matches!(err, SlingshotError::BadScheme(s) if s == "ftp"));
    }

    #[tokio::test]
    async fn preserves_operator_configured_base_path() {
        let server = server().await;
        let payload = json!({
            "uri": "at://did:plc:abalone/sh.tangled.repo/r1",
            "cid": VALID_CID,
            "value": repo_value("oyster.cafe")
        });
        // Only the prefixed route is mounted, so a request that dropped the
        // `/api/v0` prefix would not be answered with this payload.
        Mock::given(method("GET"))
            .and(path("/api/v0/xrpc/com.atproto.repo.getRecord"))
            .and(query_param("repo", "did:plc:abalone"))
            .respond_with(ResponseTemplate::new(200).set_body_json(payload))
            .mount(&server)
            .await;

        let base = Url::parse(&format!("{}/api/v0", server.uri())).unwrap();
        let client = SlingshotClient::with_default_http(base).unwrap();
        client
            .get_record(
                &did("did:plc:abalone"),
                &nsid("sh.tangled.repo"),
                &rkey("r1"),
            )
            .await
            .expect("path-prefixed base must reach prefixed xrpc endpoint");
    }

    #[tokio::test]
    async fn accepts_borrowed_string_newtypes() {
        let server = server().await;
        let payload = json!({
            "uri": "at://did:plc:limpet/sh.tangled.repo/r1",
            "cid": VALID_CID,
            "value": repo_value("nel.pet")
        });
        mount_json(&server, payload).await;

        // Take the request components from a parsed AT-URI rather than
        // building each newtype from a literal.
        let uri = AtUri::<DefaultStr>::new_owned("at://did:plc:limpet/sh.tangled.repo/r1").unwrap();
        let collection = uri.collection().unwrap();
        let record_key = uri.rkey().unwrap();
        let did_borrow = match uri.authority() {
            AtIdentifier::Did(d) => d,
            AtIdentifier::Handle(_) => unreachable!(),
        };

        let record = client_for(&server)
            .get_record(&did_borrow, &collection, &record_key)
            .await
            .unwrap();
        assert_eq!(
            record.uri.as_ref(),
            "at://did:plc:limpet/sh.tangled.repo/r1"
        );
    }
}