Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

at master 22 kB View raw
1use std::sync::Arc; 2 3use axum::body::{Body, to_bytes}; 4use bobbin_edge_index::{CoverageWatch, EdgeStore, StateIndex}; 5use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig}; 6use bobbin_record_lru::{CacheCapacity, LruRecordStore}; 7use bobbin_resolver::RepoIdResolver; 8use bobbin_runtime::{RuntimeHasher, SystemClock}; 9use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader}; 10use bobbin_slingshot_client::SlingshotClient; 11use bobbin_xrpc::{AppState, router}; 12use futures::stream::{self, StreamExt}; 13use http::{Request, StatusCode}; 14use jacquard_common::DefaultStr; 15use jacquard_common::types::did::Did; 16use jacquard_common::types::nsid::Nsid; 17use jacquard_common::types::recordkey::Rkey; 18use serde_json::{Value, json}; 19use tower::ServiceExt; 20use url::Url; 21use url::form_urlencoded::byte_serialize; 22use wiremock::matchers::{method, path, query_param}; 23use wiremock::{Mock, MockServer, ResponseTemplate}; 24 25const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i"; 26 27fn did(s: &str) -> Did<DefaultStr> { 28 Did::new_owned(s).unwrap() 29} 30 31fn rkey(s: &str) -> Rkey<DefaultStr> { 32 Rkey::new_owned(s).unwrap() 33} 34 35fn nsid(s: &'static str) -> Nsid<DefaultStr> { 36 Nsid::new_static(s).unwrap() 37} 38 39async fn fresh_app(server_uri: &Url) -> AppState { 40 AppState::new( 41 Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))), 42 SlingshotClient::with_default_http(server_uri.clone()).unwrap(), 43 Arc::new(EdgeStore::new(RuntimeHasher::default())), 44 Arc::new(StateIndex::new(RuntimeHasher::default())), 45 Arc::new(StateIndex::new(RuntimeHasher::default())), 46 Arc::new(CoverageWatch::new()), 47 Arc::new( 48 KnotProxy::new( 49 KnotProxyConfig::default(), 50 KnotHttpConfig::default(), 51 Arc::new(SystemClock::new()), 52 RuntimeHasher::default(), 53 ) 54 .unwrap(), 55 ), 56 Arc::new(SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap()) 57 as Arc<dyn SearchReader>, 58 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())), 59 ) 60} 61 62async fn mount_record( 63 server: &MockServer, 64 did: &Did<DefaultStr>, 65 collection: &Nsid<DefaultStr>, 66 rkey: &Rkey<DefaultStr>, 67 value: Value, 68) { 69 let uri = format!( 70 "at://{}/{}/{}", 71 did.as_ref(), 72 collection.as_ref(), 73 rkey.as_ref() 74 ); 75 let body = json!({ "uri": uri, "cid": CID, "value": value }); 76 Mock::given(method("GET")) 77 .and(path("/xrpc/com.atproto.repo.getRecord")) 78 .and(query_param("repo", did.as_ref())) 79 .and(query_param("collection", collection.as_ref())) 80 .and(query_param("rkey", rkey.as_ref())) 81 .respond_with(ResponseTemplate::new(200).set_body_json(body)) 82 .mount(server) 83 .await; 84} 85 86fn xrpc_request(endpoint: &str, param: &str, value: &str) -> Request<Body> { 87 Request::builder() 88 .uri(format!("/xrpc/{endpoint}?{param}={value}")) 89 .body(Body::empty()) 90 .unwrap() 91} 92 93fn xrpc_request_escaped(endpoint: &str, param: &str, value: &str) -> Request<Body> { 94 let encoded: String = byte_serialize(value.as_bytes()).collect(); 95 Request::builder() 96 .uri(format!("/xrpc/{endpoint}?{param}={encoded}")) 97 .body(Body::empty()) 98 .unwrap() 99} 100 101async fn json_response(resp: axum::response::Response) -> (StatusCode, Value) { 102 let status = resp.status(); 103 let bytes = to_bytes(resp.into_body(), 1 << 20).await.unwrap(); 104 let parsed: Value = serde_json::from_slice(&bytes).expect("response is JSON"); 105 (status, parsed) 106} 107 108#[tokio::test] 109async fn cold_start_serves_all_four_point_lookups() { 110 let server = MockServer::start().await; 111 let clam = did("did:plc:clam"); 112 113 mount_record( 114 &server, 115 &clam, 116 &nsid("sh.tangled.repo"), 117 &rkey("r1"), 118 json!({ 119 "$type": "sh.tangled.repo", 120 "name": "clam", 121 "knot": "oyster.cafe", 122 "createdAt": "2026-05-01T00:00:00Z" 123 }), 124 ) 125 .await; 126 127 mount_record( 128 &server, 129 &clam, 130 &nsid("sh.tangled.actor.profile"), 131 &rkey("self"), 132 json!({ 133 "$type": "sh.tangled.actor.profile", 134 "bluesky": false, 135 "description": "clam shell" 136 }), 137 ) 138 .await; 139 140 mount_record( 141 &server, 142 &clam, 143 &nsid("sh.tangled.repo.issue"), 144 &rkey("i1"), 145 json!({ 146 "$type": "sh.tangled.repo.issue", 147 "repo": "did:plc:limpet", 148 "title": "broken", 149 "createdAt": "2026-05-01T00:00:00Z" 150 }), 151 ) 152 .await; 153 154 mount_record( 155 &server, 156 &clam, 157 &nsid("sh.tangled.repo.pull"), 158 &rkey("p1"), 159 json!({ 160 "$type": "sh.tangled.repo.pull", 161 "title": "ship", 162 "createdAt": "2026-05-01T00:00:00Z", 163 "rounds": [], 164 "target": {"repo": "did:plc:limpet", "branch": "main"} 165 }), 166 ) 167 .await; 168 169 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 170 let app = router(state); 171 172 let cases = [ 173 ( 174 "sh.tangled.repo.getRepo", 175 "repo", 176 format!("at://{}/sh.tangled.repo/r1", clam.as_ref()), 177 "knot", 178 json!("oyster.cafe"), 179 ), 180 ( 181 "sh.tangled.actor.getProfile", 182 "actor", 183 format!("at://{}/sh.tangled.actor.profile/self", clam.as_ref()), 184 "description", 185 json!("clam shell"), 186 ), 187 ( 188 "sh.tangled.repo.getIssue", 189 "issue", 190 format!("at://{}/sh.tangled.repo.issue/i1", clam.as_ref()), 191 "title", 192 json!("broken"), 193 ), 194 ( 195 "sh.tangled.repo.getPull", 196 "pull", 197 format!("at://{}/sh.tangled.repo.pull/p1", clam.as_ref()), 198 "title", 199 json!("ship"), 200 ), 201 ]; 202 203 stream::iter(cases) 204 .for_each(|(endpoint, param, at_uri, field, expected)| { 205 let app = app.clone(); 206 async move { 207 let resp = app 208 .oneshot(xrpc_request(endpoint, param, &at_uri)) 209 .await 210 .unwrap(); 211 let (status, body) = json_response(resp).await; 212 assert_eq!(status, StatusCode::OK, "{endpoint} status"); 213 assert_eq!(body["uri"], at_uri, "{endpoint} uri"); 214 assert_eq!(body["cid"], CID, "{endpoint} cid"); 215 assert_eq!( 216 body["value"][field], expected, 217 "{endpoint} body field {field}" 218 ); 219 } 220 }) 221 .await; 222} 223 224#[tokio::test] 225async fn percent_escaped_at_uri_resolves_identically_to_raw() { 226 let server = MockServer::start().await; 227 let clam = did("did:plc:clam"); 228 mount_record( 229 &server, 230 &clam, 231 &nsid("sh.tangled.actor.profile"), 232 &rkey("self"), 233 json!({ 234 "$type": "sh.tangled.actor.profile", 235 "bluesky": false, 236 "description": "clam shell" 237 }), 238 ) 239 .await; 240 241 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 242 let app = router(state); 243 244 let at_uri = format!("at://{}/sh.tangled.actor.profile/self", clam.as_ref()); 245 246 let (raw_status, raw_body) = json_response( 247 app.clone() 248 .oneshot(xrpc_request( 249 "sh.tangled.actor.getProfile", 250 "actor", 251 &at_uri, 252 )) 253 .await 254 .unwrap(), 255 ) 256 .await; 257 let (escaped_status, escaped_body) = json_response( 258 app.oneshot(xrpc_request_escaped( 259 "sh.tangled.actor.getProfile", 260 "actor", 261 &at_uri, 262 )) 263 .await 264 .unwrap(), 265 ) 266 .await; 267 268 assert_eq!(raw_status, StatusCode::OK, "raw at-uri status"); 269 assert_eq!(escaped_status, StatusCode::OK, "escaped at-uri status"); 270 assert_eq!( 271 raw_body, escaped_body, 272 "raw and escaped must resolve identically" 273 ); 274 assert_eq!(escaped_body["uri"], at_uri); 275} 276 277#[tokio::test] 278async fn second_call_is_served_from_lru() { 279 let server = MockServer::start().await; 280 let uni = did("did:plc:uni"); 281 let mock = Mock::given(method("GET")) 282 .and(path("/xrpc/com.atproto.repo.getRecord")) 283 .and(query_param("repo", uni.as_ref())) 284 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 285 "uri": format!("at://{}/sh.tangled.repo/r1", uni.as_ref()), 286 "cid": CID, 287 "value": { 288 "$type": "sh.tangled.repo", 289 "name": "uni", 290 "knot": "witchcraft.systems", 291 "createdAt": "2026-05-01T00:00:00Z" 292 } 293 }))) 294 .expect(1) 295 .mount_as_scoped(&server) 296 .await; 297 298 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 299 let app = router(state); 300 let req_uri = format!("at://{}/sh.tangled.repo/r1", uni.as_ref()); 301 302 stream::iter(0..3) 303 .for_each(|_| { 304 let app = app.clone(); 305 let req_uri = req_uri.clone(); 306 async move { 307 let resp = app 308 .oneshot(xrpc_request("sh.tangled.repo.getRepo", "repo", &req_uri)) 309 .await 310 .unwrap(); 311 assert_eq!(resp.status(), StatusCode::OK); 312 } 313 }) 314 .await; 315 316 drop(mock); 317} 318 319#[tokio::test] 320async fn collection_mismatch_is_400() { 321 let server = MockServer::start().await; 322 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 323 let app = router(state); 324 let resp = app 325 .oneshot(xrpc_request( 326 "sh.tangled.repo.getRepo", 327 "repo", 328 "at://did:plc:clam/sh.tangled.actor.profile/self", 329 )) 330 .await 331 .unwrap(); 332 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 333} 334 335#[tokio::test] 336async fn handle_authority_is_400() { 337 let server = MockServer::start().await; 338 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 339 let app = router(state); 340 let resp = app 341 .oneshot(xrpc_request( 342 "sh.tangled.repo.getRepo", 343 "repo", 344 "at://witchcraft.systems/sh.tangled.repo/r1", 345 )) 346 .await 347 .unwrap(); 348 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 349} 350 351#[tokio::test] 352async fn slingshot_404_propagates_as_404() { 353 let server = MockServer::start().await; 354 Mock::given(method("GET")) 355 .and(path("/xrpc/com.atproto.repo.getRecord")) 356 .respond_with(ResponseTemplate::new(404)) 357 .mount(&server) 358 .await; 359 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 360 let app = router(state); 361 let resp = app 362 .oneshot(xrpc_request( 363 "sh.tangled.repo.getRepo", 364 "repo", 365 "at://did:plc:clam/sh.tangled.repo/missing", 366 )) 367 .await 368 .unwrap(); 369 assert_eq!(resp.status(), StatusCode::NOT_FOUND); 370} 371 372#[tokio::test] 373async fn wrong_record_type_is_502() { 374 let server = MockServer::start().await; 375 mount_record( 376 &server, 377 &did("did:plc:clam"), 378 &nsid("sh.tangled.repo"), 379 &rkey("r1"), 380 json!({ 381 "$type": "sh.tangled.knot", 382 "knot": "oyster.cafe", 383 "createdAt": "2026-05-01T00:00:00Z" 384 }), 385 ) 386 .await; 387 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 388 let app = router(state); 389 let resp = app 390 .oneshot(xrpc_request( 391 "sh.tangled.repo.getRepo", 392 "repo", 393 "at://did:plc:clam/sh.tangled.repo/r1", 394 )) 395 .await 396 .unwrap(); 397 let (status, body) = json_response(resp).await; 398 assert_eq!(status, StatusCode::BAD_GATEWAY); 399 assert_eq!(body["error"], "InvalidRecord"); 400} 401 402#[tokio::test] 403async fn wrong_type_does_not_poison_cache() { 404 let server = MockServer::start().await; 405 let mock = Mock::given(method("GET")) 406 .and(path("/xrpc/com.atproto.repo.getRecord")) 407 .and(query_param("repo", "did:plc:clam")) 408 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 409 "uri": "at://did:plc:clam/sh.tangled.repo/r1", 410 "cid": CID, 411 "value": { 412 "$type": "sh.tangled.knot", 413 "knot": "oyster.cafe", 414 "createdAt": "2026-05-01T00:00:00Z" 415 } 416 }))) 417 .expect(2) 418 .mount_as_scoped(&server) 419 .await; 420 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 421 let app = router(state); 422 let req = || { 423 xrpc_request( 424 "sh.tangled.repo.getRepo", 425 "repo", 426 "at://did:plc:clam/sh.tangled.repo/r1", 427 ) 428 }; 429 let first = app.clone().oneshot(req()).await.unwrap(); 430 assert_eq!(first.status(), StatusCode::BAD_GATEWAY); 431 let second = app.clone().oneshot(req()).await.unwrap(); 432 assert_eq!(second.status(), StatusCode::BAD_GATEWAY); 433 drop(mock); 434} 435 436#[tokio::test] 437async fn profile_with_empty_preferred_handle_is_tolerated() { 438 let server = MockServer::start().await; 439 let nel = did("did:plc:nel"); 440 mount_record( 441 &server, 442 &nel, 443 &nsid("sh.tangled.actor.profile"), 444 &rkey("self"), 445 json!({ 446 "$type": "sh.tangled.actor.profile", 447 "bluesky": true, 448 "preferredHandle": "", 449 "description": "empty handle, valid profile" 450 }), 451 ) 452 .await; 453 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 454 let app = router(state); 455 let at_uri = format!("at://{}/sh.tangled.actor.profile/self", nel.as_ref()); 456 let resp = app 457 .oneshot(xrpc_request( 458 "sh.tangled.actor.getProfile", 459 "actor", 460 &at_uri, 461 )) 462 .await 463 .unwrap(); 464 let (status, body) = json_response(resp).await; 465 assert_eq!(status, StatusCode::OK, "status: {body}"); 466 assert_eq!(body["uri"], at_uri); 467 assert_eq!(body["value"]["description"], "empty handle, valid profile"); 468 assert!(body["value"]["preferredHandle"].is_null()); 469} 470 471#[tokio::test] 472async fn missing_uri_param_returns_json_envelope() { 473 let server = MockServer::start().await; 474 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 475 let app = router(state); 476 let req = Request::builder() 477 .uri("/xrpc/sh.tangled.repo.getRepo") 478 .body(Body::empty()) 479 .unwrap(); 480 let resp = app.oneshot(req).await.unwrap(); 481 let (status, body) = json_response(resp).await; 482 assert_eq!(status, StatusCode::BAD_REQUEST); 483 assert_eq!(body["error"], "InvalidRequest"); 484 assert!(body["message"].is_string()); 485} 486 487#[tokio::test] 488async fn malformed_at_uri_returns_400_envelope() { 489 let server = MockServer::start().await; 490 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 491 let app = router(state); 492 let resp = app 493 .oneshot(xrpc_request( 494 "sh.tangled.repo.getRepo", 495 "repo", 496 "definitely-not-an-at-uri", 497 )) 498 .await 499 .unwrap(); 500 let (status, body) = json_response(resp).await; 501 assert_eq!(status, StatusCode::BAD_REQUEST); 502 assert_eq!(body["error"], "InvalidRequest"); 503} 504 505#[tokio::test] 506async fn upstream_uri_mismatch_routes_to_invalid_record() { 507 let server = MockServer::start().await; 508 Mock::given(method("GET")) 509 .and(path("/xrpc/com.atproto.repo.getRecord")) 510 .and(query_param("repo", "did:plc:clam")) 511 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 512 "uri": "at://did:plc:limpet/sh.tangled.repo/elsewhere", 513 "cid": CID, 514 "value": { 515 "$type": "sh.tangled.repo", 516 "knot": "oyster.cafe", 517 "createdAt": "2026-05-01T00:00:00Z" 518 } 519 }))) 520 .mount(&server) 521 .await; 522 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 523 let app = router(state); 524 let resp = app 525 .oneshot(xrpc_request( 526 "sh.tangled.repo.getRepo", 527 "repo", 528 "at://did:plc:clam/sh.tangled.repo/r1", 529 )) 530 .await 531 .unwrap(); 532 let (status, body) = json_response(resp).await; 533 assert_eq!(status, StatusCode::BAD_GATEWAY); 534 assert_eq!(body["error"], "InvalidRecord"); 535} 536 537#[tokio::test] 538async fn upstream_garbage_cid_routes_to_invalid_record() { 539 let server = MockServer::start().await; 540 Mock::given(method("GET")) 541 .and(path("/xrpc/com.atproto.repo.getRecord")) 542 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 543 "uri": "at://did:plc:clam/sh.tangled.repo/r1", 544 "cid": "not-a-real-cid", 545 "value": { 546 "$type": "sh.tangled.repo", 547 "knot": "oyster.cafe", 548 "createdAt": "2026-05-01T00:00:00Z" 549 } 550 }))) 551 .mount(&server) 552 .await; 553 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 554 let app = router(state); 555 let resp = app 556 .oneshot(xrpc_request( 557 "sh.tangled.repo.getRepo", 558 "repo", 559 "at://did:plc:clam/sh.tangled.repo/r1", 560 )) 561 .await 562 .unwrap(); 563 let (status, body) = json_response(resp).await; 564 assert_eq!(status, StatusCode::BAD_GATEWAY); 565 assert_eq!(body["error"], "InvalidRecord"); 566} 567 568#[tokio::test] 569async fn oversize_upstream_body_routes_to_upstream_failed() { 570 let server = MockServer::start().await; 571 let payload = vec![b'x'; 8 * 1024 * 1024]; 572 Mock::given(method("GET")) 573 .and(path("/xrpc/com.atproto.repo.getRecord")) 574 .respond_with( 575 ResponseTemplate::new(200) 576 .insert_header("content-type", "application/json") 577 .set_body_bytes(payload), 578 ) 579 .mount(&server) 580 .await; 581 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 582 let app = router(state); 583 let resp = app 584 .oneshot(xrpc_request( 585 "sh.tangled.repo.getRepo", 586 "repo", 587 "at://did:plc:clam/sh.tangled.repo/r1", 588 )) 589 .await 590 .unwrap(); 591 let (status, body) = json_response(resp).await; 592 assert_eq!(status, StatusCode::BAD_GATEWAY); 593 assert_eq!(body["error"], "UpstreamFailed"); 594} 595 596#[tokio::test] 597async fn upstream_503_routes_to_upstream_failed() { 598 let server = MockServer::start().await; 599 Mock::given(method("GET")) 600 .and(path("/xrpc/com.atproto.repo.getRecord")) 601 .respond_with(ResponseTemplate::new(503)) 602 .mount(&server) 603 .await; 604 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 605 let app = router(state); 606 let resp = app 607 .oneshot(xrpc_request( 608 "sh.tangled.repo.getRepo", 609 "repo", 610 "at://did:plc:clam/sh.tangled.repo/r1", 611 )) 612 .await 613 .unwrap(); 614 let (status, body) = json_response(resp).await; 615 assert_eq!(status, StatusCode::BAD_GATEWAY); 616 assert_eq!(body["error"], "UpstreamFailed"); 617} 618 619#[tokio::test] 620async fn get_repo_by_repo_did_returns_observed_record() { 621 let server = MockServer::start().await; 622 let owner_did = did("did:plc:scallop"); 623 let rk = rkey("r1"); 624 let repo_did = did("did:plc:limpet"); 625 mount_record( 626 &server, 627 &owner_did, 628 &nsid("sh.tangled.repo"), 629 &rk, 630 json!({ 631 "$type": "sh.tangled.repo", 632 "name": "scallop", 633 "knot": "oyster.cafe", 634 "createdAt": "2026-05-01T00:00:00Z", 635 "repoDid": repo_did.as_ref(), 636 }), 637 ) 638 .await; 639 640 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 641 state 642 .resolver 643 .observe(owner_did.clone(), rk.clone(), Some(repo_did.clone())) 644 .await; 645 646 let app = router(state); 647 let resp = app 648 .oneshot(xrpc_request( 649 "sh.tangled.repo.getRepoByRepoDid", 650 "repoDid", 651 repo_did.as_ref(), 652 )) 653 .await 654 .unwrap(); 655 let (status, body) = json_response(resp).await; 656 assert_eq!(status, StatusCode::OK); 657 assert_eq!( 658 body["uri"], 659 format!( 660 "at://{}/sh.tangled.repo/{}", 661 owner_did.as_ref(), 662 rk.as_ref() 663 ) 664 ); 665 assert_eq!(body["value"]["name"], "scallop"); 666 assert_eq!(body["value"]["repoDid"], repo_did.as_ref()); 667} 668 669#[tokio::test] 670async fn get_repo_by_repo_did_404_when_unobserved() { 671 let server = MockServer::start().await; 672 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 673 let app = router(state); 674 let resp = app 675 .oneshot(xrpc_request( 676 "sh.tangled.repo.getRepoByRepoDid", 677 "repoDid", 678 "did:plc:whelk", 679 )) 680 .await 681 .unwrap(); 682 assert_eq!(resp.status(), StatusCode::NOT_FOUND); 683} 684 685#[tokio::test] 686async fn get_repo_by_repo_did_400_on_invalid_did() { 687 let server = MockServer::start().await; 688 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await; 689 let app = router(state); 690 let resp = app 691 .oneshot(xrpc_request( 692 "sh.tangled.repo.getRepoByRepoDid", 693 "repoDid", 694 "not-a-did", 695 )) 696 .await 697 .unwrap(); 698 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 699}