This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 20 kB View raw
1use std::sync::Arc; 2use std::time::Duration; 3 4use bobbin_runtime::{HttpRequest, HttpResponseHead, HttpTransport, NetworkError, ReqwestHttp}; 5use bobbin_types::record::RecordBody; 6use bytes::{Bytes, BytesMut}; 7use cid::Cid as IpldCid; 8use futures::TryStreamExt; 9use http::{HeaderMap, StatusCode}; 10use jacquard_common::BosStr; 11use jacquard_common::types::did::Did; 12use jacquard_common::types::ident::AtIdentifier; 13use jacquard_common::types::nsid::Nsid; 14use jacquard_common::types::recordkey::Rkey; 15use jacquard_common::types::string::{AtStrError, AtUri}; 16use serde::Deserialize; 17use serde_json::value::RawValue; 18use thiserror::Error; 19use url::Url; 20 21const USER_AGENT: &str = concat!("bobbin/", env!("CARGO_PKG_VERSION")); 22const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); 23const CONNECT_TIMEOUT: Duration = Duration::from_secs(5); 24const GET_RECORD_PATH: &str = "xrpc/com.atproto.repo.getRecord"; 25const RESOLVE_MINI_DOC_PATH: &str = "xrpc/com.bad-example.identity.resolveMiniDoc"; 26pub const MAX_BODY_BYTES: u64 = 4 * 1024 * 1024; 27 28#[derive(Clone)] 29pub struct SlingshotClient { 30 http: Arc<dyn HttpTransport>, 31 base: Url, 32} 33 34impl std::fmt::Debug for SlingshotClient { 35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 36 f.debug_struct("SlingshotClient") 37 .field("base", &self.base) 38 .finish_non_exhaustive() 39 } 40} 41 42#[derive(Debug, Error)] 43pub enum SlingshotError { 44 #[error("invalid base url scheme: {0}")] 45 BadScheme(String), 46 #[error("network: {0}")] 47 Network(#[from] NetworkError), 48 #[error("http client build: {0}")] 49 Build(String), 50 #[error("record not found")] 51 NotFound, 52 #[error("upstream returned status {0}")] 53 Upstream(StatusCode), 54 #[error("response body exceeded {limit} bytes")] 55 BodyTooLarge { limit: u64 }, 56 #[error("decode response: {0}")] 57 Decode(#[from] serde_json::Error), 58 #[error("response missing required field: {0}")] 59 MissingField(&'static str), 60 #[error("invalid AT-URI in response: {0}")] 61 InvalidAtUri(#[from] AtStrError), 62 #[error("invalid CID in response: {0}")] 63 InvalidCid(#[from] cid::Error), 64 #[error("upstream returned uri {got}, expected {expected}")] 65 UriMismatch { expected: String, got: String }, 66} 67 68pub fn default_http_client() -> Result<reqwest::Client, reqwest::Error> { 69 reqwest::Client::builder() 70 .user_agent(USER_AGENT) 71 .timeout(REQUEST_TIMEOUT) 72 .connect_timeout(CONNECT_TIMEOUT) 73 .build() 74} 75 76impl SlingshotClient { 77 pub fn new(base: Url, http: Arc<dyn HttpTransport>) -> Result<Self, SlingshotError> { 78 match base.scheme() { 79 "http" | "https" => {} 80 other => return Err(SlingshotError::BadScheme(other.to_owned())), 81 } 82 let base = ensure_trailing_slash(base); 83 Ok(Self { http, base }) 84 } 85 86 pub fn with_default_http(base: Url) -> Result<Self, SlingshotError> { 87 let client = default_http_client().map_err(|e| SlingshotError::Build(e.to_string()))?; 88 Self::new(base, ReqwestHttp::shared(client)) 89 } 90 91 pub async fn resolve_mini_doc<S>( 92 &self, 93 identifier: &AtIdentifier<S>, 94 ) -> Result<Bytes, SlingshotError> 95 where 96 S: BosStr, 97 { 98 let mut url = self.base.join(RESOLVE_MINI_DOC_PATH).expect( 99 "base url is hierarchical and RESOLVE_MINI_DOC_PATH is a literal relative path", 100 ); 101 url.query_pairs_mut() 102 .clear() 103 .append_pair("identifier", identifier.as_str()); 104 105 let resp = self 106 .http 107 .execute(HttpRequest { 108 url, 109 headers: HeaderMap::new(), 110 }) 111 .await?; 112 match resp.status { 113 StatusCode::OK => read_bounded(resp).await, 114 StatusCode::NOT_FOUND => Err(SlingshotError::NotFound), 115 other => Err(SlingshotError::Upstream(other)), 116 } 117 } 118 119 pub async fn get_record<S>( 120 &self, 121 repo: &Did<S>, 122 collection: &Nsid<S>, 123 rkey: &Rkey<S>, 124 ) -> Result<Arc<RecordBody>, SlingshotError> 125 where 126 S: BosStr, 127 { 128 let mut url = self 129 .base 130 .join(GET_RECORD_PATH) 131 .expect("base url is hierarchical and GET_RECORD_PATH is a literal relative path"); 132 url.query_pairs_mut() 133 .clear() 134 .append_pair("repo", repo.as_ref()) 135 .append_pair("collection", collection.as_ref()) 136 .append_pair("rkey", rkey.as_ref()); 137 138 let resp = self 139 .http 140 .execute(HttpRequest { 141 url, 142 headers: HeaderMap::new(), 143 }) 144 .await?; 145 match resp.status { 146 StatusCode::OK => { 147 let bytes = read_bounded(resp).await?; 148 let body = decode(&bytes)?; 149 verify_addresses(&body, repo, collection, rkey)?; 150 Ok(Arc::new(body)) 151 } 152 StatusCode::NOT_FOUND => Err(SlingshotError::NotFound), 153 other => Err(SlingshotError::Upstream(other)), 154 } 155 } 156} 157 158fn ensure_trailing_slash(mut base: Url) -> Url { 159 if !base.path().ends_with('/') { 160 let with_slash = format!("{}/", base.path()); 161 base.set_path(&with_slash); 162 } 163 base 164} 165 166async fn read_bounded(resp: HttpResponseHead) -> Result<Bytes, SlingshotError> { 167 if resp.content_length.is_some_and(|len| len > MAX_BODY_BYTES) { 168 return Err(SlingshotError::BodyTooLarge { 169 limit: MAX_BODY_BYTES, 170 }); 171 } 172 let buf = resp 173 .body 174 .map_err(SlingshotError::Network) 175 .try_fold(BytesMut::new(), |mut acc, chunk| async move { 176 if (acc.len() as u64).saturating_add(chunk.len() as u64) > MAX_BODY_BYTES { 177 return Err(SlingshotError::BodyTooLarge { 178 limit: MAX_BODY_BYTES, 179 }); 180 } 181 acc.extend_from_slice(&chunk); 182 Ok(acc) 183 }) 184 .await?; 185 Ok(buf.freeze()) 186} 187 188fn decode(bytes: &[u8]) -> Result<RecordBody, SlingshotError> { 189 let raw: RawResponse<'_> = serde_json::from_slice(bytes)?; 190 let value = raw.value.ok_or(SlingshotError::MissingField("value"))?; 191 IpldCid::try_from(raw.cid)?; 192 Ok(RecordBody { 193 uri: AtUri::new_owned(raw.uri)?, 194 cid: raw.cid.to_owned().into(), 195 value: Bytes::copy_from_slice(value.get().as_bytes()), 196 }) 197} 198 199fn verify_addresses<S: BosStr>( 200 body: &RecordBody, 201 repo: &Did<S>, 202 collection: &Nsid<S>, 203 rkey: &Rkey<S>, 204) -> Result<(), SlingshotError> { 205 let expected = format!( 206 "at://{}/{}/{}", 207 repo.as_ref(), 208 collection.as_ref(), 209 rkey.as_ref() 210 ); 211 if body.uri.as_ref() == expected { 212 Ok(()) 213 } else { 214 Err(SlingshotError::UriMismatch { 215 expected, 216 got: body.uri.as_ref().to_owned(), 217 }) 218 } 219} 220 221#[derive(Deserialize)] 222struct RawResponse<'a> { 223 uri: &'a str, 224 cid: &'a str, 225 #[serde(default, borrow)] 226 value: Option<&'a RawValue>, 227} 228 229#[cfg(test)] 230mod tests { 231 use super::*; 232 use jacquard_common::DefaultStr; 233 use serde_json::json; 234 use wiremock::matchers::{method, path, query_param}; 235 use wiremock::{Mock, MockServer, ResponseTemplate}; 236 237 const VALID_CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i"; 238 239 fn did(s: &'static str) -> Did<DefaultStr> { 240 Did::new_static(s).unwrap() 241 } 242 fn nsid(s: &'static str) -> Nsid<DefaultStr> { 243 Nsid::new_static(s).unwrap() 244 } 245 fn rkey(s: &str) -> Rkey<DefaultStr> { 246 Rkey::new_owned(s).unwrap() 247 } 248 249 async fn server() -> MockServer { 250 MockServer::start().await 251 } 252 253 fn client_for(server: &MockServer) -> SlingshotClient { 254 SlingshotClient::with_default_http(Url::parse(&server.uri()).unwrap()).unwrap() 255 } 256 257 #[tokio::test] 258 async fn returns_decoded_record_on_200() { 259 let server = server().await; 260 let body = json!({ 261 "uri": "at://did:plc:abalone/sh.tangled.repo/r1", 262 "cid": VALID_CID, 263 "value": { 264 "$type": "sh.tangled.repo", 265 "knot": "oyster.cafe", 266 "createdAt": "2026-05-01T00:00:00Z" 267 } 268 }); 269 Mock::given(method("GET")) 270 .and(path("/xrpc/com.atproto.repo.getRecord")) 271 .and(query_param("repo", "did:plc:abalone")) 272 .and(query_param("collection", "sh.tangled.repo")) 273 .and(query_param("rkey", "r1")) 274 .respond_with(ResponseTemplate::new(200).set_body_json(body)) 275 .mount(&server) 276 .await; 277 278 let client = client_for(&server); 279 let resp = client 280 .get_record( 281 &did("did:plc:abalone"), 282 &nsid("sh.tangled.repo"), 283 &rkey("r1"), 284 ) 285 .await 286 .unwrap(); 287 assert_eq!(resp.uri.as_ref(), "at://did:plc:abalone/sh.tangled.repo/r1"); 288 let parsed: serde_json::Value = serde_json::from_slice(&resp.value).unwrap(); 289 assert_eq!(parsed["knot"], "oyster.cafe"); 290 } 291 292 #[tokio::test] 293 async fn preserves_value_bytes_byte_for_byte() { 294 let server = server().await; 295 let raw = format!( 296 r#"{{"uri":"at://did:plc:uni/sh.tangled.repo/r1","cid":"{VALID_CID}","value":{{"z":1,"a":2}}}}"# 297 ); 298 Mock::given(method("GET")) 299 .and(path("/xrpc/com.atproto.repo.getRecord")) 300 .respond_with( 301 ResponseTemplate::new(200) 302 .insert_header("content-type", "application/json") 303 .set_body_string(raw), 304 ) 305 .mount(&server) 306 .await; 307 308 let client = client_for(&server); 309 let resp = client 310 .get_record(&did("did:plc:uni"), &nsid("sh.tangled.repo"), &rkey("r1")) 311 .await 312 .unwrap(); 313 assert_eq!(resp.value.as_ref(), br#"{"z":1,"a":2}"#); 314 } 315 316 #[tokio::test] 317 async fn maps_404_to_not_found() { 318 let server = server().await; 319 Mock::given(method("GET")) 320 .and(path("/xrpc/com.atproto.repo.getRecord")) 321 .respond_with(ResponseTemplate::new(404).set_body_string("nope")) 322 .mount(&server) 323 .await; 324 325 let client = client_for(&server); 326 let err = client 327 .get_record( 328 &did("did:plc:abalone"), 329 &nsid("sh.tangled.repo"), 330 &rkey("r1"), 331 ) 332 .await 333 .expect_err("404 must surface"); 334 assert!(matches!(err, SlingshotError::NotFound)); 335 } 336 337 #[tokio::test] 338 async fn maps_400_to_upstream() { 339 let server = server().await; 340 Mock::given(method("GET")) 341 .and(path("/xrpc/com.atproto.repo.getRecord")) 342 .respond_with(ResponseTemplate::new(400).set_body_string("bad")) 343 .mount(&server) 344 .await; 345 346 let client = client_for(&server); 347 let err = client 348 .get_record( 349 &did("did:plc:abalone"), 350 &nsid("sh.tangled.repo"), 351 &rkey("r1"), 352 ) 353 .await 354 .expect_err("400 must surface as upstream, not not-found"); 355 match err { 356 SlingshotError::Upstream(s) => assert_eq!(s.as_u16(), 400), 357 other => panic!("wrong variant: {other:?}"), 358 } 359 } 360 361 #[tokio::test] 362 async fn maps_5xx_to_upstream() { 363 let server = server().await; 364 Mock::given(method("GET")) 365 .and(path("/xrpc/com.atproto.repo.getRecord")) 366 .respond_with(ResponseTemplate::new(503)) 367 .mount(&server) 368 .await; 369 370 let client = client_for(&server); 371 let err = client 372 .get_record( 373 &did("did:plc:abalone"), 374 &nsid("sh.tangled.repo"), 375 &rkey("r1"), 376 ) 377 .await 378 .expect_err("5xx must surface"); 379 match err { 380 SlingshotError::Upstream(s) => assert_eq!(s.as_u16(), 503), 381 other => panic!("wrong variant: {other:?}"), 382 } 383 } 384 385 #[tokio::test] 386 async fn rejects_oversize_body_via_content_length() { 387 let server = server().await; 388 let payload = vec![b'x'; (MAX_BODY_BYTES + 1) as usize]; 389 Mock::given(method("GET")) 390 .and(path("/xrpc/com.atproto.repo.getRecord")) 391 .respond_with( 392 ResponseTemplate::new(200) 393 .insert_header("content-type", "application/json") 394 .set_body_bytes(payload), 395 ) 396 .mount(&server) 397 .await; 398 399 let client = client_for(&server); 400 let err = client 401 .get_record( 402 &did("did:plc:abalone"), 403 &nsid("sh.tangled.repo"), 404 &rkey("r1"), 405 ) 406 .await 407 .expect_err("oversize body must be rejected"); 408 assert!( 409 matches!(err, SlingshotError::BodyTooLarge { limit } if limit == MAX_BODY_BYTES), 410 "wrong variant: {err:?}" 411 ); 412 } 413 414 #[tokio::test] 415 async fn rejects_garbage_cid() { 416 let server = server().await; 417 let body = json!({ 418 "uri": "at://did:plc:abalone/sh.tangled.repo/r1", 419 "cid": "not-a-real-cid", 420 "value": {"$type": "sh.tangled.repo", "knot": "oyster.cafe", "createdAt": "2026-05-01T00:00:00Z"} 421 }); 422 Mock::given(method("GET")) 423 .and(path("/xrpc/com.atproto.repo.getRecord")) 424 .respond_with(ResponseTemplate::new(200).set_body_json(body)) 425 .mount(&server) 426 .await; 427 428 let client = client_for(&server); 429 let err = client 430 .get_record( 431 &did("did:plc:abalone"), 432 &nsid("sh.tangled.repo"), 433 &rkey("r1"), 434 ) 435 .await 436 .expect_err("garbage cid must be rejected"); 437 assert!( 438 matches!(err, SlingshotError::InvalidCid(_)), 439 "wrong variant: {err:?}" 440 ); 441 } 442 443 #[tokio::test] 444 async fn rejects_uri_mismatch() { 445 let server = server().await; 446 let body = json!({ 447 "uri": "at://did:plc:limpet/sh.tangled.repo/elsewhere", 448 "cid": VALID_CID, 449 "value": {"$type": "sh.tangled.repo", "knot": "oyster.cafe", "createdAt": "2026-05-01T00:00:00Z"} 450 }); 451 Mock::given(method("GET")) 452 .and(path("/xrpc/com.atproto.repo.getRecord")) 453 .respond_with(ResponseTemplate::new(200).set_body_json(body)) 454 .mount(&server) 455 .await; 456 457 let client = client_for(&server); 458 let err = client 459 .get_record( 460 &did("did:plc:abalone"), 461 &nsid("sh.tangled.repo"), 462 &rkey("r1"), 463 ) 464 .await 465 .expect_err("uri mismatch must be rejected"); 466 match err { 467 SlingshotError::UriMismatch { expected, got } => { 468 assert_eq!(expected, "at://did:plc:abalone/sh.tangled.repo/r1"); 469 assert_eq!(got, "at://did:plc:limpet/sh.tangled.repo/elsewhere"); 470 } 471 other => panic!("wrong variant: {other:?}"), 472 } 473 } 474 475 #[tokio::test] 476 async fn rejects_explicit_null_value() { 477 let server = server().await; 478 let body = json!({ 479 "uri": "at://did:plc:abalone/sh.tangled.repo/r1", 480 "cid": VALID_CID, 481 "value": null 482 }); 483 Mock::given(method("GET")) 484 .and(path("/xrpc/com.atproto.repo.getRecord")) 485 .respond_with(ResponseTemplate::new(200).set_body_json(body)) 486 .mount(&server) 487 .await; 488 489 let client = client_for(&server); 490 let err = client 491 .get_record( 492 &did("did:plc:abalone"), 493 &nsid("sh.tangled.repo"), 494 &rkey("r1"), 495 ) 496 .await 497 .expect_err("explicit null value must be rejected"); 498 assert!( 499 matches!(err, SlingshotError::MissingField("value")), 500 "{err:?}" 501 ); 502 } 503 504 #[tokio::test] 505 async fn rejects_missing_value_field() { 506 let server = server().await; 507 let body = json!({ 508 "uri": "at://did:plc:abalone/sh.tangled.repo/r1", 509 "cid": VALID_CID 510 }); 511 Mock::given(method("GET")) 512 .and(path("/xrpc/com.atproto.repo.getRecord")) 513 .respond_with(ResponseTemplate::new(200).set_body_json(body)) 514 .mount(&server) 515 .await; 516 517 let client = client_for(&server); 518 let err = client 519 .get_record( 520 &did("did:plc:abalone"), 521 &nsid("sh.tangled.repo"), 522 &rkey("r1"), 523 ) 524 .await 525 .expect_err("missing value must be rejected"); 526 assert!( 527 matches!(err, SlingshotError::MissingField("value")), 528 "{err:?}" 529 ); 530 } 531 532 #[test] 533 fn rejects_non_http_scheme() { 534 let err = SlingshotClient::with_default_http(Url::parse("ftp://nel.pet").unwrap()) 535 .expect_err("ftp bad"); 536 assert!(matches!(err, SlingshotError::BadScheme(s) if s == "ftp")); 537 } 538 539 #[tokio::test] 540 async fn preserves_operator_configured_base_path() { 541 let server = server().await; 542 let body = json!({ 543 "uri": "at://did:plc:abalone/sh.tangled.repo/r1", 544 "cid": VALID_CID, 545 "value": { 546 "$type": "sh.tangled.repo", 547 "knot": "oyster.cafe", 548 "createdAt": "2026-05-01T00:00:00Z" 549 } 550 }); 551 Mock::given(method("GET")) 552 .and(path("/api/v0/xrpc/com.atproto.repo.getRecord")) 553 .and(query_param("repo", "did:plc:abalone")) 554 .respond_with(ResponseTemplate::new(200).set_body_json(body)) 555 .mount(&server) 556 .await; 557 558 let base = Url::parse(&format!("{}/api/v0", server.uri())).unwrap(); 559 let client = SlingshotClient::with_default_http(base).unwrap(); 560 client 561 .get_record( 562 &did("did:plc:abalone"), 563 &nsid("sh.tangled.repo"), 564 &rkey("r1"), 565 ) 566 .await 567 .expect("path-prefixed base must reach prefixed xrpc endpoint"); 568 } 569 570 #[tokio::test] 571 async fn accepts_borrowed_string_newtypes() { 572 let server = server().await; 573 let body = json!({ 574 "uri": "at://did:plc:limpet/sh.tangled.repo/r1", 575 "cid": VALID_CID, 576 "value": { 577 "$type": "sh.tangled.repo", 578 "knot": "nel.pet", 579 "createdAt": "2026-05-01T00:00:00Z" 580 } 581 }); 582 Mock::given(method("GET")) 583 .and(path("/xrpc/com.atproto.repo.getRecord")) 584 .respond_with(ResponseTemplate::new(200).set_body_json(body)) 585 .mount(&server) 586 .await; 587 588 let uri = AtUri::<DefaultStr>::new_owned("at://did:plc:limpet/sh.tangled.repo/r1").unwrap(); 589 let collection = uri.collection().unwrap(); 590 let rkey = uri.rkey().unwrap(); 591 let did_borrow = match uri.authority() { 592 jacquard_common::types::ident::AtIdentifier::Did(d) => d, 593 jacquard_common::types::ident::AtIdentifier::Handle(_) => unreachable!(), 594 }; 595 596 let client = client_for(&server); 597 let resp = client 598 .get_record(&did_borrow, &collection, &rkey) 599 .await 600 .unwrap(); 601 assert_eq!(resp.uri.as_ref(), "at://did:plc:limpet/sh.tangled.repo/r1"); 602 } 603}