This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 20 kB View raw
1use std::sync::Arc; 2 3use axum::body::{Body, to_bytes}; 4use bobbin_edge_index::{CoverageWatch, EdgeStore, StateIndex}; 5use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig}; 6use bobbin_record_lru::{CacheCapacity, LruRecordStore}; 7use bobbin_resolver::RepoIdResolver; 8use bobbin_runtime::{RuntimeHasher, SystemClock}; 9use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader}; 10use bobbin_slingshot_client::SlingshotClient; 11use bobbin_xrpc::{AppState, router}; 12use futures::stream::{self, StreamExt}; 13use http::{Request, StatusCode}; 14use jacquard_common::DefaultStr; 15use jacquard_common::types::did::Did; 16use jacquard_common::types::nsid::Nsid; 17use jacquard_common::types::recordkey::Rkey; 18use serde_json::{Value, json}; 19use tower::ServiceExt; 20use url::Url; 21use url::form_urlencoded::byte_serialize; 22use wiremock::matchers::{method, path, query_param}; 23use wiremock::{Mock, MockServer, ResponseTemplate}; 24 25const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i"; 26 27fn did(s: &str) -> Did<DefaultStr> { 28 Did::new_owned(s).unwrap() 29} 30 31fn rkey(s: &str) -> Rkey<DefaultStr> { 32 Rkey::new_owned(s).unwrap() 33} 34 35fn nsid(s: &'static str) -> Nsid<DefaultStr> { 36 Nsid::new_static(s).unwrap() 37} 38 39async fn fresh_app(server_uri: &Url) -> AppState { 40 AppState::new( 41 Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))), 42 SlingshotClient::with_default_http(server_uri.clone()).unwrap(), 43 Arc::new(EdgeStore::new(RuntimeHasher::default())), 44 Arc::new(StateIndex::new(RuntimeHasher::default())), 45 Arc::new(StateIndex::new(RuntimeHasher::default())), 46 Arc::new(CoverageWatch::new()), 47 Arc::new( 48 KnotProxy::new( 49 KnotProxyConfig::default(), 50 KnotHttpConfig::default(), 51 Arc::new(SystemClock::new()), 52 RuntimeHasher::default(), 53 ) 54 .unwrap(), 55 ), 56 Arc::new(SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap()) 57 as Arc<dyn SearchReader>, 58 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())), 59 ) 60} 61 62async fn mount_record( 63 server: &MockServer, 64 did: &Did<DefaultStr>, 65 collection: &Nsid<DefaultStr>, 66 rkey: &Rkey<DefaultStr>, 67 value: Value, 68) { 69 let uri = format!( 70 "at://{}/{}/{}", 71 did.as_ref(), 72 collection.as_ref(), 73 rkey.as_ref() 74 ); 75 let body = json!({ "uri": uri, "cid": CID, "value": value }); 76 Mock::given(method("GET")) 77 .and(path("/xrpc/com.atproto.repo.getRecord")) 78 .and(query_param("repo", did.as_ref())) 79 .and(query_param("collection", collection.as_ref())) 80 .and(query_param("rkey", rkey.as_ref())) 81 .respond_with(ResponseTemplate::new(200).set_body_json(body)) 82 .mount(server) 83 .await; 84} 85 86fn xrpc_request(endpoint: &str, param: &str, value: &str) -> Request<Body> { 87 let encoded: String = byte_serialize(value.as_bytes()).collect(); 88 Request::builder() 89 .uri(format!("/xrpc/{endpoint}?{param}={encoded}")) 90 .body(Body::empty()) 91 .unwrap() 92} 93 94async fn json_response(resp: axum::response::Response) -> (StatusCode, Value) { 95 let status = resp.status(); 96 let bytes = to_bytes(resp.into_body(), 1 << 20).await.unwrap(); 97 let parsed: Value = serde_json::from_slice(&bytes).expect("response is JSON"); 98 (status, parsed) 99} 100 101#[tokio::test] 102async fn cold_start_serves_all_four_point_lookups() { 103 let server = MockServer::start().await; 104 let clam = did("did:plc:clam"); 105 106 mount_record( 107 &server, 108 &clam, 109 &nsid("sh.tangled.repo"), 110 &rkey("r1"), 111 json!({ 112 "$type": "sh.tangled.repo", 113 "name": "clam", 114 "knot": "oyster.cafe", 115 "createdAt": "2026-05-01T00:00:00Z" 116 }), 117 ) 118 .await; 119 120 mount_record( 121 &server, 122 &clam, 123 &nsid("sh.tangled.actor.profile"), 124 &rkey("self"), 125 json!({ 126 "$type": "sh.tangled.actor.profile", 127 "bluesky": false, 128 "description": "clam shell" 129 }), 130 ) 131 .await; 132 133 mount_record( 134 &server, 135 &clam, 136 &nsid("sh.tangled.repo.issue"), 137 &rkey("i1"), 138 json!({ 139 "$type": "sh.tangled.repo.issue", 140 "repo": "did:plc:limpet", 141 "title": "broken", 142 "createdAt": "2026-05-01T00:00:00Z" 143 }), 144 ) 145 .await; 146 147 mount_record( 148 &server, 149 &clam, 150 &nsid("sh.tangled.repo.pull"), 151 &rkey("p1"), 152 json!({ 153 "$type": "sh.tangled.repo.pull", 154 "title": "ship", 155 "createdAt": "2026-05-01T00:00:00Z", 156 "rounds": [], 157 "target": {"repo": "did:plc:limpet", "branch": "main"} 158 }), 159 ) 160 .await; 161 162 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 163 let app = router(state); 164 165 let cases = [ 166 ( 167 "sh.tangled.repo.getRepo", 168 "repo", 169 format!("at://{}/sh.tangled.repo/r1", clam.as_ref()), 170 "knot", 171 json!("oyster.cafe"), 172 ), 173 ( 174 "sh.tangled.actor.getProfile", 175 "actor", 176 format!("at://{}/sh.tangled.actor.profile/self", clam.as_ref()), 177 "description", 178 json!("clam shell"), 179 ), 180 ( 181 "sh.tangled.repo.getIssue", 182 "issue", 183 format!("at://{}/sh.tangled.repo.issue/i1", clam.as_ref()), 184 "title", 185 json!("broken"), 186 ), 187 ( 188 "sh.tangled.repo.getPull", 189 "pull", 190 format!("at://{}/sh.tangled.repo.pull/p1", clam.as_ref()), 191 "title", 192 json!("ship"), 193 ), 194 ]; 195 196 stream::iter(cases) 197 .for_each(|(endpoint, param, at_uri, field, expected)| { 198 let app = app.clone(); 199 async move { 200 let resp = app 201 .oneshot(xrpc_request(endpoint, param, &at_uri)) 202 .await 203 .unwrap(); 204 let (status, body) = json_response(resp).await; 205 assert_eq!(status, StatusCode::OK, "{endpoint} status"); 206 assert_eq!(body["uri"], at_uri, "{endpoint} uri"); 207 assert_eq!(body["cid"], CID, "{endpoint} cid"); 208 assert_eq!( 209 body["value"][field], expected, 210 "{endpoint} body field {field}" 211 ); 212 } 213 }) 214 .await; 215} 216 217#[tokio::test] 218async fn second_call_is_served_from_lru() { 219 let server = MockServer::start().await; 220 let uni = did("did:plc:uni"); 221 let mock = Mock::given(method("GET")) 222 .and(path("/xrpc/com.atproto.repo.getRecord")) 223 .and(query_param("repo", uni.as_ref())) 224 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 225 "uri": format!("at://{}/sh.tangled.repo/r1", uni.as_ref()), 226 "cid": CID, 227 "value": { 228 "$type": "sh.tangled.repo", 229 "name": "uni", 230 "knot": "witchcraft.systems", 231 "createdAt": "2026-05-01T00:00:00Z" 232 } 233 }))) 234 .expect(1) 235 .mount_as_scoped(&server) 236 .await; 237 238 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 239 let app = router(state); 240 let req_uri = format!("at://{}/sh.tangled.repo/r1", uni.as_ref()); 241 242 stream::iter(0..3) 243 .for_each(|_| { 244 let app = app.clone(); 245 let req_uri = req_uri.clone(); 246 async move { 247 let resp = app 248 .oneshot(xrpc_request("sh.tangled.repo.getRepo", "repo", &req_uri)) 249 .await 250 .unwrap(); 251 assert_eq!(resp.status(), StatusCode::OK); 252 } 253 }) 254 .await; 255 256 drop(mock); 257} 258 259#[tokio::test] 260async fn collection_mismatch_is_400() { 261 let server = MockServer::start().await; 262 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 263 let app = router(state); 264 let resp = app 265 .oneshot(xrpc_request( 266 "sh.tangled.repo.getRepo", 267 "repo", 268 "at://did:plc:clam/sh.tangled.actor.profile/self", 269 )) 270 .await 271 .unwrap(); 272 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 273} 274 275#[tokio::test] 276async fn handle_authority_is_400() { 277 let server = MockServer::start().await; 278 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 279 let app = router(state); 280 let resp = app 281 .oneshot(xrpc_request( 282 "sh.tangled.repo.getRepo", 283 "repo", 284 "at://witchcraft.systems/sh.tangled.repo/r1", 285 )) 286 .await 287 .unwrap(); 288 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 289} 290 291#[tokio::test] 292async fn slingshot_404_propagates_as_404() { 293 let server = MockServer::start().await; 294 Mock::given(method("GET")) 295 .and(path("/xrpc/com.atproto.repo.getRecord")) 296 .respond_with(ResponseTemplate::new(404)) 297 .mount(&server) 298 .await; 299 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 300 let app = router(state); 301 let resp = app 302 .oneshot(xrpc_request( 303 "sh.tangled.repo.getRepo", 304 "repo", 305 "at://did:plc:clam/sh.tangled.repo/missing", 306 )) 307 .await 308 .unwrap(); 309 assert_eq!(resp.status(), StatusCode::NOT_FOUND); 310} 311 312#[tokio::test] 313async fn wrong_record_type_is_502() { 314 let server = MockServer::start().await; 315 mount_record( 316 &server, 317 &did("did:plc:clam"), 318 &nsid("sh.tangled.repo"), 319 &rkey("r1"), 320 json!({ 321 "$type": "sh.tangled.knot", 322 "knot": "oyster.cafe", 323 "createdAt": "2026-05-01T00:00:00Z" 324 }), 325 ) 326 .await; 327 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 328 let app = router(state); 329 let resp = app 330 .oneshot(xrpc_request( 331 "sh.tangled.repo.getRepo", 332 "repo", 333 "at://did:plc:clam/sh.tangled.repo/r1", 334 )) 335 .await 336 .unwrap(); 337 let (status, body) = json_response(resp).await; 338 assert_eq!(status, StatusCode::BAD_GATEWAY); 339 assert_eq!(body["error"], "InvalidRecord"); 340} 341 342#[tokio::test] 343async fn wrong_type_does_not_poison_cache() { 344 let server = MockServer::start().await; 345 let mock = Mock::given(method("GET")) 346 .and(path("/xrpc/com.atproto.repo.getRecord")) 347 .and(query_param("repo", "did:plc:clam")) 348 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 349 "uri": "at://did:plc:clam/sh.tangled.repo/r1", 350 "cid": CID, 351 "value": { 352 "$type": "sh.tangled.knot", 353 "knot": "oyster.cafe", 354 "createdAt": "2026-05-01T00:00:00Z" 355 } 356 }))) 357 .expect(2) 358 .mount_as_scoped(&server) 359 .await; 360 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 361 let app = router(state); 362 let req = || { 363 xrpc_request( 364 "sh.tangled.repo.getRepo", 365 "repo", 366 "at://did:plc:clam/sh.tangled.repo/r1", 367 ) 368 }; 369 let first = app.clone().oneshot(req()).await.unwrap(); 370 assert_eq!(first.status(), StatusCode::BAD_GATEWAY); 371 let second = app.clone().oneshot(req()).await.unwrap(); 372 assert_eq!(second.status(), StatusCode::BAD_GATEWAY); 373 drop(mock); 374} 375 376#[tokio::test] 377async fn profile_with_empty_preferred_handle_is_tolerated() { 378 let server = MockServer::start().await; 379 let nel = did("did:plc:nel"); 380 mount_record( 381 &server, 382 &nel, 383 &nsid("sh.tangled.actor.profile"), 384 &rkey("self"), 385 json!({ 386 "$type": "sh.tangled.actor.profile", 387 "bluesky": true, 388 "preferredHandle": "", 389 "description": "empty handle, valid profile" 390 }), 391 ) 392 .await; 393 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 394 let app = router(state); 395 let at_uri = format!("at://{}/sh.tangled.actor.profile/self", nel.as_ref()); 396 let resp = app 397 .oneshot(xrpc_request( 398 "sh.tangled.actor.getProfile", 399 "actor", 400 &at_uri, 401 )) 402 .await 403 .unwrap(); 404 let (status, body) = json_response(resp).await; 405 assert_eq!(status, StatusCode::OK, "status: {body}"); 406 assert_eq!(body["uri"], at_uri); 407 assert_eq!(body["value"]["description"], "empty handle, valid profile"); 408 assert!(body["value"]["preferredHandle"].is_null()); 409} 410 411#[tokio::test] 412async fn missing_uri_param_returns_json_envelope() { 413 let server = MockServer::start().await; 414 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 415 let app = router(state); 416 let req = Request::builder() 417 .uri("/xrpc/sh.tangled.repo.getRepo") 418 .body(Body::empty()) 419 .unwrap(); 420 let resp = app.oneshot(req).await.unwrap(); 421 let (status, body) = json_response(resp).await; 422 assert_eq!(status, StatusCode::BAD_REQUEST); 423 assert_eq!(body["error"], "InvalidRequest"); 424 assert!(body["message"].is_string()); 425} 426 427#[tokio::test] 428async fn malformed_at_uri_returns_400_envelope() { 429 let server = MockServer::start().await; 430 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 431 let app = router(state); 432 let resp = app 433 .oneshot(xrpc_request( 434 "sh.tangled.repo.getRepo", 435 "repo", 436 "definitely-not-an-at-uri", 437 )) 438 .await 439 .unwrap(); 440 let (status, body) = json_response(resp).await; 441 assert_eq!(status, StatusCode::BAD_REQUEST); 442 assert_eq!(body["error"], "InvalidRequest"); 443} 444 445#[tokio::test] 446async fn upstream_uri_mismatch_routes_to_invalid_record() { 447 let server = MockServer::start().await; 448 Mock::given(method("GET")) 449 .and(path("/xrpc/com.atproto.repo.getRecord")) 450 .and(query_param("repo", "did:plc:clam")) 451 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 452 "uri": "at://did:plc:limpet/sh.tangled.repo/elsewhere", 453 "cid": CID, 454 "value": { 455 "$type": "sh.tangled.repo", 456 "knot": "oyster.cafe", 457 "createdAt": "2026-05-01T00:00:00Z" 458 } 459 }))) 460 .mount(&server) 461 .await; 462 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 463 let app = router(state); 464 let resp = app 465 .oneshot(xrpc_request( 466 "sh.tangled.repo.getRepo", 467 "repo", 468 "at://did:plc:clam/sh.tangled.repo/r1", 469 )) 470 .await 471 .unwrap(); 472 let (status, body) = json_response(resp).await; 473 assert_eq!(status, StatusCode::BAD_GATEWAY); 474 assert_eq!(body["error"], "InvalidRecord"); 475} 476 477#[tokio::test] 478async fn upstream_garbage_cid_routes_to_invalid_record() { 479 let server = MockServer::start().await; 480 Mock::given(method("GET")) 481 .and(path("/xrpc/com.atproto.repo.getRecord")) 482 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 483 "uri": "at://did:plc:clam/sh.tangled.repo/r1", 484 "cid": "not-a-real-cid", 485 "value": { 486 "$type": "sh.tangled.repo", 487 "knot": "oyster.cafe", 488 "createdAt": "2026-05-01T00:00:00Z" 489 } 490 }))) 491 .mount(&server) 492 .await; 493 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 494 let app = router(state); 495 let resp = app 496 .oneshot(xrpc_request( 497 "sh.tangled.repo.getRepo", 498 "repo", 499 "at://did:plc:clam/sh.tangled.repo/r1", 500 )) 501 .await 502 .unwrap(); 503 let (status, body) = json_response(resp).await; 504 assert_eq!(status, StatusCode::BAD_GATEWAY); 505 assert_eq!(body["error"], "InvalidRecord"); 506} 507 508#[tokio::test] 509async fn oversize_upstream_body_routes_to_upstream_failed() { 510 let server = MockServer::start().await; 511 let payload = vec![b'x'; 8 * 1024 * 1024]; 512 Mock::given(method("GET")) 513 .and(path("/xrpc/com.atproto.repo.getRecord")) 514 .respond_with( 515 ResponseTemplate::new(200) 516 .insert_header("content-type", "application/json") 517 .set_body_bytes(payload), 518 ) 519 .mount(&server) 520 .await; 521 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 522 let app = router(state); 523 let resp = app 524 .oneshot(xrpc_request( 525 "sh.tangled.repo.getRepo", 526 "repo", 527 "at://did:plc:clam/sh.tangled.repo/r1", 528 )) 529 .await 530 .unwrap(); 531 let (status, body) = json_response(resp).await; 532 assert_eq!(status, StatusCode::BAD_GATEWAY); 533 assert_eq!(body["error"], "UpstreamFailed"); 534} 535 536#[tokio::test] 537async fn upstream_503_routes_to_upstream_failed() { 538 let server = MockServer::start().await; 539 Mock::given(method("GET")) 540 .and(path("/xrpc/com.atproto.repo.getRecord")) 541 .respond_with(ResponseTemplate::new(503)) 542 .mount(&server) 543 .await; 544 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 545 let app = router(state); 546 let resp = app 547 .oneshot(xrpc_request( 548 "sh.tangled.repo.getRepo", 549 "repo", 550 "at://did:plc:clam/sh.tangled.repo/r1", 551 )) 552 .await 553 .unwrap(); 554 let (status, body) = json_response(resp).await; 555 assert_eq!(status, StatusCode::BAD_GATEWAY); 556 assert_eq!(body["error"], "UpstreamFailed"); 557} 558 559#[tokio::test] 560async fn get_repo_by_repo_did_returns_observed_record() { 561 let server = MockServer::start().await; 562 let owner_did = did("did:plc:scallop"); 563 let rk = rkey("r1"); 564 let repo_did = did("did:plc:limpet"); 565 mount_record( 566 &server, 567 &owner_did, 568 &nsid("sh.tangled.repo"), 569 &rk, 570 json!({ 571 "$type": "sh.tangled.repo", 572 "name": "scallop", 573 "knot": "oyster.cafe", 574 "createdAt": "2026-05-01T00:00:00Z", 575 "repoDid": repo_did.as_ref(), 576 }), 577 ) 578 .await; 579 580 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 581 state 582 .resolver 583 .observe(owner_did.clone(), rk.clone(), Some(repo_did.clone())) 584 .await; 585 586 let app = router(state); 587 let resp = app 588 .oneshot(xrpc_request( 589 "sh.tangled.repo.getRepoByRepoDid", 590 "repoDid", 591 repo_did.as_ref(), 592 )) 593 .await 594 .unwrap(); 595 let (status, body) = json_response(resp).await; 596 assert_eq!(status, StatusCode::OK); 597 assert_eq!( 598 body["uri"], 599 format!( 600 "at://{}/sh.tangled.repo/{}", 601 owner_did.as_ref(), 602 rk.as_ref() 603 ) 604 ); 605 assert_eq!(body["value"]["name"], "scallop"); 606 assert_eq!(body["value"]["repoDid"], repo_did.as_ref()); 607} 608 609#[tokio::test] 610async fn get_repo_by_repo_did_404_when_unobserved() { 611 let server = MockServer::start().await; 612 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 613 let app = router(state); 614 let resp = app 615 .oneshot(xrpc_request( 616 "sh.tangled.repo.getRepoByRepoDid", 617 "repoDid", 618 "did:plc:whelk", 619 )) 620 .await 621 .unwrap(); 622 assert_eq!(resp.status(), StatusCode::NOT_FOUND); 623} 624 625#[tokio::test] 626async fn get_repo_by_repo_did_400_on_invalid_did() { 627 let server = MockServer::start().await; 628 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 629 let app = router(state); 630 let resp = app 631 .oneshot(xrpc_request( 632 "sh.tangled.repo.getRepoByRepoDid", 633 "repoDid", 634 "not-a-did", 635 )) 636 .await 637 .unwrap(); 638 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 639}