This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 62 kB View raw
// Integration tests for the list/count XRPC endpoints. Each test builds an in-memory
// `AppState` whose Slingshot client points at a local wiremock server, seeds the edge
// index directly, and drives the axum router with single `oneshot` requests.

use std::sync::Arc;

use axum::body::{Body, to_bytes};
use bobbin_edge_index::{
    Coverage, CoverageWatch, EdgeStore, HydrantCursor, IssueStateKind, PageToken, PullStatusKind,
    StateIndex,
};
use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig};
use bobbin_record_lru::{CacheCapacity, LruRecordStore};
use bobbin_resolver::RepoIdResolver;
use bobbin_runtime::{RuntimeHasher, SystemClock};
use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader};
use bobbin_slingshot_client::SlingshotClient;
use bobbin_types::edges::Edge;
use bobbin_types::ids::SubjectRef;
use bobbin_xrpc::{AppState, router};
use futures::stream::{self, StreamExt};
use http::{Request, StatusCode};
use jacquard_common::DefaultStr;
use jacquard_common::types::did::Did;
use jacquard_common::types::nsid::Nsid;
use jacquard_common::types::recordkey::Rkey;
use jacquard_common::types::string::AtUri;
use serde_json::{Value, json};
use tower::ServiceExt;
use url::Url;
use url::form_urlencoded::byte_serialize;
use wiremock::matchers::{method, path, query_param};
use wiremock::{Mock, MockServer, ResponseTemplate};

/// CID string placed in every mocked `getRecord` response body.
const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i";

/// Parses `s` as an AT-URI; panics if it is not valid.
fn at(s: &str) -> AtUri<DefaultStr> {
    AtUri::new_owned(s).unwrap()
}

/// Parses `s` as a DID; panics if it is not valid.
fn did(s: &str) -> Did<DefaultStr> {
    Did::new_owned(s).unwrap()
}

/// Parses `s` as a record key; panics if it is not valid.
fn rkey(s: &str) -> Rkey<DefaultStr> {
    Rkey::new_owned(s).unwrap()
}

/// Parses a static string as an NSID; panics if it is not valid.
fn nsid(s: &'static str) -> Nsid<DefaultStr> {
    Nsid::new_static(s).unwrap()
}

/// Builds a [`SubjectRef`] from `s`: a DID when `s` parses as one, otherwise an AT-URI.
///
/// Panics when `s` parses as neither.
fn subj(s: &str) -> SubjectRef {
    Did::<DefaultStr>::new_owned(s)
        .map(SubjectRef::Did)
        .unwrap_or_else(|_| SubjectRef::Uri(AtUri::new_owned(s).unwrap()))
}

/// Per-test fixture: the mock upstream plus the handles a test needs to seed state.
struct Harness {
    /// Mock HTTP server that the Slingshot client inside `state` is pointed at.
    server: MockServer,
    /// Edge store shared with `state`, so tests can add and remove edges directly.
    edges: Arc<EdgeStore>,
    /// Coverage watch shared with `state`, so tests can flip the coverage phase.
    coverage: Arc<CoverageWatch>,
    /// Application state handed to `router` to build the service under test.
    state: AppState,
}

/// Process-wide counter backing [`next_sort_micros`]; starts at 1.
static EDGE_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);

/// Returns the next counter value, giving every edge added by a test a distinct,
/// increasing `sort_micros`.
fn next_sort_micros() -> u64 {
    // Only the atomicity of the increment matters here, so `Relaxed` is enough.
    EDGE_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}

impl Harness {
    /// Starts a mock server and assembles an `AppState` around fresh, empty stores.
    ///
    /// The Slingshot client is built from the mock server's URI, and the record cache
    /// is created with a 64 KiB capacity. Panics if any component fails to construct.
    async fn new() -> Self {
        let server = MockServer::start().await;
        let edges = Arc::new(EdgeStore::new(RuntimeHasher::default()));
        let issue_states = Arc::new(StateIndex::new(RuntimeHasher::default()));
        let pull_statuses = Arc::new(StateIndex::new(RuntimeHasher::default()));
        let coverage = Arc::new(CoverageWatch::new());
        let state = AppState::new(
            Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))),
            SlingshotClient::with_default_http(Url::parse(&server.uri()).unwrap()).unwrap(),
            edges.clone(),
            issue_states.clone(),
            pull_statuses.clone(),
            coverage.clone(),
            Arc::new(
                KnotProxy::new(
                    KnotProxyConfig::default(),
                    KnotHttpConfig::default(),
                    Arc::new(SystemClock::new()),
                    RuntimeHasher::default(),
                )
                .unwrap(),
            ),
            Arc::new(
                SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap(),
            ) as Arc<dyn SearchReader>,
            Arc::new(RepoIdResolver::detached(RuntimeHasher::default())),
        );
        Self {
            server,
            edges,
            coverage,
            state,
        }
    }

    /// Adds one edge of `kind` from `source` to `subject`, stamped with the next
    /// value of the shared sort counter.
    fn add_edge(
        &self,
        kind: &Nsid<DefaultStr>,
        subject: &AtUri<DefaultStr>,
        source: &AtUri<DefaultStr>,
    ) {
        self.edges.add(Edge {
            kind: kind.clone(),
            subject: subj(subject.as_ref()),
            source: source.clone(),
            sort_micros: next_sort_micros(),
        });
    }

    /// Mounts a mock `com.atproto.repo.getRecord` answer for `did`/`collection`/`rkey`.
    ///
    /// A GET whose `repo`, `collection` and `rkey` query parameters match is answered
    /// with status 200 and a JSON body of `{ uri, cid: CID, value }`.
    async fn mount(
        &self,
        did: &Did<DefaultStr>,
        collection: &Nsid<DefaultStr>,
        rkey: &Rkey<DefaultStr>,
        value: Value,
    ) {
        let uri = format!(
            "at://{}/{}/{}",
            did.as_ref(),
            collection.as_ref(),
            rkey.as_ref()
        );
        let body = json!({ "uri": uri, "cid": CID, "value": value });
        Mock::given(method("GET"))
            .and(path("/xrpc/com.atproto.repo.getRecord"))
            .and(query_param("repo", did.as_ref()))
            .and(query_param("collection", collection.as_ref()))
            .and(query_param("rkey", rkey.as_ref()))
            .respond_with(ResponseTemplate::new(200).set_body_json(body))
            .mount(&self.server)
            .await;
    }

    /// Replaces the current coverage with `Coverage::Ready` carrying the given counters.
    fn promote_ready(&self, events: u64, cursor: u64) {
        self.coverage.update(|_| Coverage::Ready {
            events_processed: events,
            last_cursor: HydrantCursor::new(cursor),
        });
    }

    /// Replaces the current coverage with `Coverage::Warming` carrying the given counters.
    fn warming(&self, events: u64, cursor: u64) {
        self.coverage.update(|_| Coverage::Warming {
            events_processed: events,
            last_cursor: HydrantCursor::new(cursor),
        });
    }
}

/// Builds a body-less request for `/xrpc/{endpoint}` with `subject` and any `extras`
/// as query parameters.
///
/// `subject` and every extra value are form-urlencoded; extra keys are appended as given.
fn list_request(endpoint: &str, subject: &str, extras: &[(&str, &str)]) -> Request<Body> {
    let mut qs = format!("subject={}", encode(subject));
    for (k, v) in extras {
        qs.push('&');
        qs.push_str(k);
        qs.push('=');
        qs.push_str(&encode(v));
    }
    Request::builder()
        .uri(format!("/xrpc/{endpoint}?{qs}"))
        .body(Body::empty())
        .unwrap()
}

/// Form-urlencodes `s` for use as a query-string value.
fn encode(s: &str) -> String {
    byte_serialize(s.as_bytes()).collect()
}

/// Consumes `resp` and returns its status together with the body parsed as JSON.
///
/// Reads at most 1 MiB of body; panics if reading fails or the body is not JSON.
async fn json_response(resp: axum::response::Response) -> (StatusCode, Value) {
    let status = resp.status();
    let bytes = to_bytes(resp.into_body(), 1 << 20).await.unwrap();
    let parsed: Value = serde_json::from_slice(&bytes).expect("JSON body");
    (status, parsed)
}

/// Record value for a `sh.tangled.repo.issue` with the given repo DID and title.
fn issue_body(repo_did: &Did<DefaultStr>, title: &str) -> Value {
    json!({
        "$type": "sh.tangled.repo.issue",
        "repo": repo_did.as_ref(),
        "title": title,
        "createdAt": "2026-05-01T00:00:00Z"
    })
}

/// Record value for a `sh.tangled.repo.pull` targeting branch `main` of the given repo DID.
fn pull_body(repo_did: &Did<DefaultStr>, title: &str) -> Value {
    json!({
        "$type": "sh.tangled.repo.pull",
        "title": title,
        "createdAt": "2026-05-01T00:00:00Z",
        "rounds": [],
        "target": {
            "repo": repo_did.as_ref(),
            "branch": "main"
        }
    })
}

/// Record value for a `sh.tangled.feed.star` whose subject is the repo form of `subject_did`.
fn star_body(subject_did: &Did<DefaultStr>) -> Value {
    json!({
        "$type": "sh.tangled.feed.star",
        "createdAt": "2026-05-01T00:00:00Z",
        "subject": {
            "$type": "sh.tangled.feed.star#repo",
            "did": subject_did.as_ref()
        }
    })
}

/// Record value for a `sh.tangled.graph.follow` whose subject is `subject_did`.
fn follow_body(subject_did: &Did<DefaultStr>) -> Value {
    json!({
        "$type": "sh.tangled.graph.follow",
        "createdAt": "2026-05-01T00:00:00Z",
        "subject": subject_did.as_ref()
    })
}

/// With an empty edge store, `listIssues` answers 200 with no items and a null cursor.
#[tokio::test]
async fn list_issues_with_no_edges_returns_empty_items() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let resp = app
        .oneshot(list_request(
            "sh.tangled.repo.listIssues",
            "at://did:plc:abalone",
            &[],
        ))
        .await
        .unwrap();
    let (status, body) = json_response(resp).await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(body["items"], json!([]));
    assert!(body["cursor"].is_null());
}

/// With an empty edge store, `countIssues` reports zero for both counters.
#[tokio::test]
async fn count_issues_with_no_edges_returns_zero() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let resp = app
        .oneshot(list_request(
            "sh.tangled.repo.countIssues",
            "at://did:plc:abalone",
            &[],
        ))
        .await
        .unwrap();
    let (status, body) = json_response(resp).await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(body["count"], json!(0));
    assert_eq!(body["distinctAuthors"], json!(0));
}

/// Two issue edges whose records the mock upstream serves come back as two items
/// carrying `uri`, `cid` and the record `value`.
#[tokio::test]
async fn list_issues_hydrates_via_slingshot_when_edges_present() {
    let h = Harness::new().await;
    let repo = did("did:plc:abalone");
    let subject = at(&format!("at://{}", repo.as_ref()));
    let owners = [
        ("did:plc:nel", "i1", "first"),
        ("did:plc:olaren", "i2", "second"),
    ];
    stream::iter(owners)
        .for_each(|(d, r, title)| {
            let h = &h;
            let subject = subject.clone();
            let repo = repo.clone();
            async move {
                let d_did = did(d);
                let rk = rkey(r);
                h.add_edge(
                    &nsid("sh.tangled.repo.issue"),
                    &subject,
                    &at(&format!(
                        "at://{}/sh.tangled.repo.issue/{}",
                        d_did.as_ref(),
                        rk.as_ref()
                    )),
                );
                h.mount(
                    &d_did,
                    &nsid("sh.tangled.repo.issue"),
                    &rk,
                    issue_body(&repo, title),
                )
                .await;
            }
        })
        .await;

    let app = router(h.state.clone());
    let resp = app
        .oneshot(list_request(
            "sh.tangled.repo.listIssues",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap();
    let (status, body) = json_response(resp).await;
    assert_eq!(status, StatusCode::OK);
    let items = body["items"].as_array().expect("items array");
    assert_eq!(items.len(), 2);
    let titles: Vec<&str> = items
        .iter()
        .map(|v| v["value"]["title"].as_str().unwrap())
        .collect();
    assert!(titles.contains(&"first"));
    assert!(titles.contains(&"second"));
    assert_eq!(items[0]["cid"], CID);
    assert!(items[0]["uri"].as_str().unwrap().starts_with("at://"));
}

/// Three star edges from two authors yield `count` 3 and `distinctAuthors` 2.
#[tokio::test]
async fn count_distinct_authors_dedupes_per_author() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:abalone");
    h.add_edge(
        &nsid("sh.tangled.feed.star"),
        &subject,
        &at("at://did:plc:nel/sh.tangled.feed.star/s1"),
    );
    h.add_edge(
        &nsid("sh.tangled.feed.star"),
        &subject,
        &at("at://did:plc:nel/sh.tangled.feed.star/s2"),
    );
    h.add_edge(
        &nsid("sh.tangled.feed.star"),
        &subject,
        &at("at://did:plc:olaren/sh.tangled.feed.star/s3"),
    );

    let app = router(h.state.clone());
    let resp = app
        .oneshot(list_request(
            "sh.tangled.feed.countStars",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap();
    let (_, body) = json_response(resp).await;
    assert_eq!(body["count"], json!(3));
    assert_eq!(body["distinctAuthors"], json!(2));
}

/// The same `listStars` request returns identical items under `Warming` and `Ready` coverage.
#[tokio::test]
async fn list_items_stable_across_coverage_promotion() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:abalone");
    let nel = did("did:plc:nel");
    h.add_edge(
        &nsid("sh.tangled.feed.star"),
        &subject,
        &at(&format!("at://{}/sh.tangled.feed.star/s1", nel.as_ref())),
    );
    h.mount(
        &nel,
        &nsid("sh.tangled.feed.star"),
        &rkey("s1"),
        star_body(&did("did:plc:abalone")),
    )
    .await;

    let app = router(h.state.clone());
    h.warming(1, 5);
    let (_, before) = json_response(
        app.clone()
            .oneshot(list_request(
                "sh.tangled.feed.listStars",
                subject.as_ref(),
                &[],
            ))
            .await
            .unwrap(),
    )
    .await;
    assert_eq!(before["items"].as_array().unwrap().len(), 1);

    h.promote_ready(2, 9);
    let (_, after) = json_response(
        app.oneshot(list_request(
            "sh.tangled.feed.listStars",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(
        after["items"].as_array().unwrap().len(),
        before["items"].as_array().unwrap().len(),
    );
    assert_eq!(after["items"], before["items"]);
}

/// With five issues, `limit=2` yields two items plus a decodable cursor; following the
/// cursor yields the remaining three with no cursor, and no URI appears twice.
#[tokio::test]
async fn list_paginates_via_cursor() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:abalone");
    let repo = did("did:plc:abalone");
    let owners = [
        ("did:plc:nel", "i1"),
        ("did:plc:olaren", "i2"),
        ("did:plc:teq", "i3"),
        ("did:plc:lyna", "i4"),
        ("did:plc:bailey", "i5"),
    ];
    stream::iter(owners)
        .for_each(|(d, r)| {
            let h = &h;
            let subject = subject.clone();
            let repo = repo.clone();
            async move {
                let d_did = did(d);
                let rk = rkey(r);
                h.add_edge(
                    &nsid("sh.tangled.repo.issue"),
                    &subject,
                    &at(&format!(
                        "at://{}/sh.tangled.repo.issue/{}",
                        d_did.as_ref(),
                        rk.as_ref()
                    )),
                );
                h.mount(
                    &d_did,
                    &nsid("sh.tangled.repo.issue"),
                    &rk,
                    issue_body(&repo, &format!("issue-{}", rk.as_ref())),
                )
                .await;
            }
        })
        .await;

    let app = router(h.state.clone());
    let (_, page1) = json_response(
        app.clone()
            .oneshot(list_request(
                "sh.tangled.repo.listIssues",
                subject.as_ref(),
                &[("limit", "2")],
            ))
            .await
            .unwrap(),
    )
    .await;
    let page1_items = page1["items"].as_array().unwrap().clone();
    assert_eq!(page1_items.len(), 2);
    let cursor = page1["cursor"]
        .as_str()
        .expect("first page must yield a cursor")
        .to_owned();
    assert!(
        PageToken::decode_token(&cursor).is_ok(),
        "cursor must be a TID-shaped token"
    );

    let (_, page2) = json_response(
        app.oneshot(list_request(
            "sh.tangled.repo.listIssues",
            subject.as_ref(),
            &[("limit", "10"), ("cursor", &cursor)],
        ))
        .await
        .unwrap(),
    )
    .await;
    let page2_items = page2["items"].as_array().unwrap().clone();
    assert_eq!(page2_items.len(), 3);
    assert!(page2["cursor"].is_null(), "tail page must not promise more");

    let union: Vec<&str> = page1_items
        .iter()
        .chain(page2_items.iter())
        .map(|item| item["uri"].as_str().unwrap())
        .collect();
    assert_eq!(union.len(), owners.len(), "union covers every owner");
    let mut sorted = union.clone();
    sorted.sort();
    sorted.dedup();
    assert_eq!(sorted.len(), owners.len(), "no duplicates across pages");
}

/// A cursor issued while coverage is `Warming` still returns the remaining item after
/// coverage becomes `Ready`.
#[tokio::test]
async fn pagination_unaffected_by_coverage_promotion() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:abalone");
    let repo = did("did:plc:abalone");
    let owners = [("did:plc:nel", "i1"), ("did:plc:olaren", "i2")];
    stream::iter(owners)
        .for_each(|(d, r)| {
            let h = &h;
            let subject = subject.clone();
            let repo = repo.clone();
            async move {
                let d_did = did(d);
                let rk = rkey(r);
                h.add_edge(
                    &nsid("sh.tangled.repo.issue"),
                    &subject,
                    &at(&format!(
                        "at://{}/sh.tangled.repo.issue/{}",
                        d_did.as_ref(),
                        rk.as_ref()
                    )),
                );
                h.mount(
                    &d_did,
                    &nsid("sh.tangled.repo.issue"),
                    &rk,
                    issue_body(&repo, &format!("issue-{}", rk.as_ref())),
                )
                .await;
            }
        })
        .await;

    h.warming(1, 5);
    let app = router(h.state.clone());
    let (_, page1) = json_response(
        app.clone()
            .oneshot(list_request(
                "sh.tangled.repo.listIssues",
                subject.as_ref(),
                &[("limit", "1")],
            ))
            .await
            .unwrap(),
    )
    .await;
    let cursor = page1["cursor"].as_str().unwrap().to_owned();

    h.promote_ready(2, 9);
    let (_, page2) = json_response(
        app.oneshot(list_request(
            "sh.tangled.repo.listIssues",
            subject.as_ref(),
            &[("limit", "10"), ("cursor", &cursor)],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(page2["items"].as_array().unwrap().len(), 1);
}

/// A cursor value that cannot be decoded is rejected with 400 `InvalidRequest`.
#[tokio::test]
async fn invalid_cursor_returns_400() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let resp = app
        .oneshot(list_request(
            "sh.tangled.repo.listIssues",
            "at://did:plc:abalone",
            &[("cursor", "not-a-number")],
        ))
        .await
        .unwrap();
    let (status, body) = json_response(resp).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
    assert_eq!(body["error"], "InvalidRequest");
}

/// `listFollows` queried by the followee returns the follow record pointing at that DID.
#[tokio::test]
async fn list_follows_subject_is_followee_did() {
    let h = Harness::new().await;
    let followee = did("did:plc:bailey");
    let subject = at(&format!("at://{}", followee.as_ref()));
    h.add_edge(
        &nsid("sh.tangled.graph.follow"),
        &subject,
        &at("at://did:plc:nel/sh.tangled.graph.follow/f1"),
    );
    h.mount(
        &did("did:plc:nel"),
        &nsid("sh.tangled.graph.follow"),
        &rkey("f1"),
        follow_body(&followee),
    )
    .await;

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.graph.listFollows",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    let items = body["items"].as_array().unwrap();
    assert_eq!(items.len(), 1);
    assert_eq!(items[0]["value"]["subject"], followee.as_ref());
}

/// When the upstream answers 503 for one record, the list still succeeds and contains
/// only the record that hydrated.
#[tokio::test]
async fn upstream_failure_during_hydration_drops_only_that_item() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:squid");
    let kind = nsid("sh.tangled.repo.issue");
    let repo = did("did:plc:squid");
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:nel/sh.tangled.repo.issue/ok"),
    );
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:teq/sh.tangled.repo.issue/flaky"),
    );
    h.mount(
        &did("did:plc:nel"),
        &kind,
        &rkey("ok"),
        issue_body(&repo, "kelp survey"),
    )
    .await;
    Mock::given(method("GET"))
        .and(path("/xrpc/com.atproto.repo.getRecord"))
        .and(query_param("repo", "did:plc:teq"))
        .and(query_param("collection", "sh.tangled.repo.issue"))
        .and(query_param("rkey", "flaky"))
        .respond_with(ResponseTemplate::new(503))
        .mount(&h.server)
        .await;

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.repo.listIssues",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    let items = body["items"].as_array().expect("items array");
    assert_eq!(items.len(), 1, "flaky item dropped, healthy sibling kept");
    assert_eq!(
        items[0]["uri"].as_str().unwrap(),
        "at://did:plc:nel/sh.tangled.repo.issue/ok",
    );
}

/// After a list request in which one record failed with 503, `countIssues` still
/// reports both edges.
#[tokio::test]
async fn transient_failure_keeps_edge_so_count_stays_whole() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:squid");
    let kind = nsid("sh.tangled.repo.issue");
    let repo = did("did:plc:squid");
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:nel/sh.tangled.repo.issue/ok"),
    );
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:teq/sh.tangled.repo.issue/flaky"),
    );
    h.mount(
        &did("did:plc:nel"),
        &kind,
        &rkey("ok"),
        issue_body(&repo, "kelp survey"),
    )
    .await;
    Mock::given(method("GET"))
        .and(path("/xrpc/com.atproto.repo.getRecord"))
        .and(query_param("repo", "did:plc:teq"))
        .and(query_param("collection", "sh.tangled.repo.issue"))
        .and(query_param("rkey", "flaky"))
        .respond_with(ResponseTemplate::new(503))
        .mount(&h.server)
        .await;

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.clone()
            .oneshot(list_request(
                "sh.tangled.repo.listIssues",
                subject.as_ref(),
                &[],
            ))
            .await
            .unwrap(),
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(body["items"].as_array().unwrap().len(), 1);

    let (cstatus, cbody) = json_response(
        app.oneshot(list_request(
            "sh.tangled.repo.countIssues",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(cstatus, StatusCode::OK);
    assert_eq!(
        cbody["count"],
        json!(2),
        "a transient 503 must not evict the edge, count stays whole",
    );
}

/// After a list request in which one record answered 404, `countIssues` reports only
/// the surviving edge.
#[tokio::test]
async fn gone_item_is_evicted_so_count_converges_to_list() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:squid");
    let kind = nsid("sh.tangled.repo.issue");
    let repo = did("did:plc:squid");
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:nel/sh.tangled.repo.issue/ok"),
    );
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:teq/sh.tangled.repo.issue/gone"),
    );
    h.mount(
        &did("did:plc:nel"),
        &kind,
        &rkey("ok"),
        issue_body(&repo, "kelp survey"),
    )
    .await;
    Mock::given(method("GET"))
        .and(path("/xrpc/com.atproto.repo.getRecord"))
        .and(query_param("repo", "did:plc:teq"))
        .and(query_param("collection", "sh.tangled.repo.issue"))
        .and(query_param("rkey", "gone"))
        .respond_with(ResponseTemplate::new(404))
        .mount(&h.server)
        .await;

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.clone()
            .oneshot(list_request(
                "sh.tangled.repo.listIssues",
                subject.as_ref(),
                &[],
            ))
            .await
            .unwrap(),
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(body["items"].as_array().unwrap().len(), 1, "gone item dropped from the page");

    let (cstatus, cbody) = json_response(
        app.oneshot(list_request(
            "sh.tangled.repo.countIssues",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(cstatus, StatusCode::OK);
    assert_eq!(
        cbody["count"],
        json!(1),
        "a definitive 404 must evict the dead edge so count matches the list",
    );
}

/// Every list/count endpoint rejects a handle-authority subject (`at://oyster.cafe`)
/// with 400 and a message mentioning "did, not a handle".
#[tokio::test]
async fn handle_authority_subject_is_400() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let cases = [
        "sh.tangled.feed.listStars",
        "sh.tangled.feed.countStars",
        "sh.tangled.graph.listFollows",
        "sh.tangled.graph.countFollows",
        "sh.tangled.repo.listIssues",
        "sh.tangled.repo.countIssues",
        "sh.tangled.repo.listPulls",
        "sh.tangled.repo.countPulls",
        "sh.tangled.repo.issue.listComments",
        "sh.tangled.repo.issue.countComments",
    ];
    stream::iter(cases)
        .for_each(|endpoint| {
            let app = app.clone();
            async move {
                let resp = app
                    .oneshot(list_request(endpoint, "at://oyster.cafe", &[]))
                    .await
                    .unwrap();
                let (status, body) = json_response(resp).await;
                assert_eq!(status, StatusCode::BAD_REQUEST, "{endpoint}");
                assert_eq!(body["error"], "InvalidRequest", "{endpoint}");
                assert!(
                    body["message"]
                        .as_str()
                        .unwrap_or_default()
                        .contains("did, not a handle"),
                    "{endpoint}: {}",
                    body["message"]
                );
            }
        })
        .await;
}

/// An empty `subject` parameter is rejected with 400 `InvalidRequest`.
#[tokio::test]
async fn empty_subject_is_400() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let resp = app
        .oneshot(list_request("sh.tangled.repo.listIssues", "", &[]))
        .await
        .unwrap();
    let (status, body) = json_response(resp).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
    assert_eq!(body["error"], "InvalidRequest");
}

/// `limit=0` and `limit=1001` are both rejected with 400 `InvalidRequest`.
#[tokio::test]
async fn limit_below_min_or_above_max_is_400() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let cases = [("0", "below"), ("1001", "above")];
    stream::iter(cases)
        .for_each(|(limit, label)| {
            let app = app.clone();
            async move {
                let resp = app
                    .oneshot(list_request(
                        "sh.tangled.repo.listIssues",
                        "at://did:plc:abalone",
                        &[("limit", limit)],
                    ))
                    .await
                    .unwrap();
                let (status, body) = json_response(resp).await;
                assert_eq!(status, StatusCode::BAD_REQUEST, "limit {label}");
                assert_eq!(body["error"], "InvalidRequest", "limit {label}");
            }
        })
        .await;
}

/// Removing the only edge's source brings both star counters back to zero.
#[tokio::test]
async fn count_after_remove_source_returns_zero() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:abalone");
    let source = at("at://did:plc:nel/sh.tangled.feed.star/s1");
    h.add_edge(&nsid("sh.tangled.feed.star"), &subject, &source);
    h.edges.remove_source(&source);

    let app = router(h.state.clone());
    let (_, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.feed.countStars",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(body["count"], json!(0));
    assert_eq!(body["distinctAuthors"], json!(0));
}

/// A comment edge on an issue URI is hydrated by `issue.listComments` with its body
/// and issue reference intact.
#[tokio::test]
async fn list_issue_comments_hydrates_end_to_end() {
    let h = Harness::new().await;
    let issue_uri = at("at://did:plc:abalone/sh.tangled.repo.issue/i1");
    let nel = did("did:plc:nel");
    let rk = rkey("c1");
    h.add_edge(
        &nsid("sh.tangled.repo.issue.comment"),
        &issue_uri,
        &at(&format!(
            "at://{}/sh.tangled.repo.issue.comment/{}",
            nel.as_ref(),
            rk.as_ref()
        )),
    );
    h.mount(
        &nel,
        &nsid("sh.tangled.repo.issue.comment"),
        &rk,
        json!({
            "$type": "sh.tangled.repo.issue.comment",
            "issue": issue_uri.as_ref(),
            "body": "thoughts",
            "createdAt": "2026-05-01T00:00:00Z"
        }),
    )
    .await;

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.repo.issue.listComments",
            issue_uri.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    let items = body["items"].as_array().unwrap();
    assert_eq!(items.len(), 1);
    assert_eq!(items[0]["value"]["body"], json!("thoughts"));
    assert_eq!(items[0]["value"]["issue"], json!(issue_uri.as_ref()));
}

/// Each list item carries a `cid` key equal to the CID the upstream returned.
#[tokio::test]
async fn list_item_cid_is_present() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:abalone");
    let nel = did("did:plc:nel");
    h.add_edge(
        &nsid("sh.tangled.feed.star"),
        &subject,
        &at(&format!("at://{}/sh.tangled.feed.star/s1", nel.as_ref())),
    );
    h.mount(
        &nel,
        &nsid("sh.tangled.feed.star"),
        &rkey("s1"),
        star_body(&did("did:plc:abalone")),
    )
    .await;

    let app = router(h.state.clone());
    let (_, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.feed.listStars",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    let item = &body["items"][0];
    assert!(
        item.as_object().unwrap().contains_key("cid"),
        "list items must mirror getRecord output shape and include cid"
    );
    assert_eq!(item["cid"], json!(CID));
}

/// Two comment edges from two authors on one issue URI yield `count` 2 and
/// `distinctAuthors` 2.
#[tokio::test]
async fn count_issue_comments_subjects_on_issue_uri() {
    let h = Harness::new().await;
    let issue_uri = at("at://did:plc:abalone/sh.tangled.repo.issue/i1");
    h.add_edge(
        &nsid("sh.tangled.repo.issue.comment"),
        &issue_uri,
        &at("at://did:plc:nel/sh.tangled.repo.issue.comment/c1"),
    );
    h.add_edge(
        &nsid("sh.tangled.repo.issue.comment"),
        &issue_uri,
        &at("at://did:plc:olaren/sh.tangled.repo.issue.comment/c2"),
    );

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.repo.issue.countComments",
            issue_uri.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(body["count"], json!(2));
    assert_eq!(body["distinctAuthors"], json!(2));
}

/// An upstream 404 with a `RecordNotFound` body removes only that item; the list
/// request itself still answers 200.
#[tokio::test]
async fn list_item_404_dropped_not_404_for_subject() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:squid");
    let kind = nsid("sh.tangled.repo.issue");
    let repo = did("did:plc:squid");
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:nel/sh.tangled.repo.issue/live"),
    );
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:teq/sh.tangled.repo.issue/missing"),
    );
    h.mount(
        &did("did:plc:nel"),
        &kind,
        &rkey("live"),
        issue_body(&repo, "kelp survives"),
    )
    .await;
    Mock::given(method("GET"))
        .and(path("/xrpc/com.atproto.repo.getRecord"))
        .and(query_param("repo", "did:plc:teq"))
        .and(query_param("collection", "sh.tangled.repo.issue"))
        .and(query_param("rkey", "missing"))
        .respond_with(ResponseTemplate::new(404).set_body_json(json!({
            "error": "RecordNotFound",
            "message": "could not find record"
        })))
        .mount(&h.server)
        .await;

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.repo.listIssues",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(
        status,
        StatusCode::OK,
        "a stale-index 404 drops that item, it must not 404 or 502 the subject's list",
    );
    let items = body["items"].as_array().expect("items array");
    assert_eq!(items.len(), 1, "stale 404 item dropped, live sibling kept");
    assert_eq!(
        items[0]["uri"].as_str().unwrap(),
        "at://did:plc:nel/sh.tangled.repo.issue/live",
    );
}

/// A record whose `$type` is not the star type is left out of `listStars`; the valid
/// star is kept.
#[tokio::test]
async fn list_item_with_wrong_type_tag_dropped() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:squid");
    let kind = nsid("sh.tangled.feed.star");
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:nel/sh.tangled.feed.star/good"),
    );
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:teq/sh.tangled.feed.star/wrong"),
    );
    h.mount(
        &did("did:plc:nel"),
        &kind,
        &rkey("good"),
        star_body(&did("did:plc:squid")),
    )
    .await;
    Mock::given(method("GET"))
        .and(path("/xrpc/com.atproto.repo.getRecord"))
        .and(query_param("repo", "did:plc:teq"))
        .and(query_param("collection", "sh.tangled.feed.star"))
        .and(query_param("rkey", "wrong"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "uri": "at://did:plc:teq/sh.tangled.feed.star/wrong",
            "cid": CID,
            "value": {
                "$type": "sh.tangled.feed.reaction",
                "createdAt": "2026-05-01T00:00:00Z",
                "subject": "at://did:plc:squid"
            }
        })))
        .mount(&h.server)
        .await;

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.feed.listStars",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    let items = body["items"].as_array().expect("items array");
    assert_eq!(items.len(), 1, "wrong-type item dropped, valid star kept");
    assert_eq!(
        items[0]["uri"].as_str().unwrap(),
        "at://did:plc:nel/sh.tangled.feed.star/good",
    );
}

/// An issue-kind edge whose source URI names a different collection is left out of
/// `listIssues` without failing the request.
#[tokio::test]
async fn list_item_with_mismatched_collection_dropped() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:squid");
    let kind = nsid("sh.tangled.repo.issue");
    let repo = did("did:plc:squid");
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:nel/sh.tangled.repo.issue/live"),
    );
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:teq/sh.tangled.feed.star/whelk"),
    );
    h.mount(
        &did("did:plc:nel"),
        &kind,
        &rkey("live"),
        issue_body(&repo, "kelp survives"),
    )
    .await;

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.repo.listIssues",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(
        status,
        StatusCode::OK,
        "a mismatched-collection index edge must not 400 the subject's list",
    );
    let items = body["items"].as_array().expect("items array");
    assert_eq!(
        items.len(),
        1,
        "mismatched-collection edge dropped, live sibling kept"
    );
    assert_eq!(
        items[0]["uri"].as_str().unwrap(),
        "at://did:plc:nel/sh.tangled.repo.issue/live",
    );
}

/// The follow endpoints reject a record-level AT-URI subject with 400 and a message
/// mentioning "bare did".
#[tokio::test]
async fn bare_did_endpoints_reject_at_uri_subject() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let cases = [
        "sh.tangled.graph.listFollows",
        "sh.tangled.graph.countFollows",
    ];
    stream::iter(cases)
        .for_each(|endpoint| {
            let app = app.clone();
            async move {
                let resp = app
                    .oneshot(list_request(
                        endpoint,
                        "at://did:plc:abalone/sh.tangled.repo/r1",
                        &[],
                    ))
                    .await
                    .unwrap();
                let (status, body) = json_response(resp).await;
                assert_eq!(status, StatusCode::BAD_REQUEST, "{endpoint}");
                assert_eq!(body["error"], "InvalidRequest", "{endpoint}");
                assert!(
                    body["message"]
                        .as_str()
                        .unwrap_or_default()
                        .contains("bare did"),
                    "{endpoint}: {}",
                    body["message"],
                );
            }
        })
        .await;
}

/// The repo-scoped issue/pull/artifact endpoints reject a record-level repo AT-URI
/// subject with 400 and a message mentioning "bare did".
#[tokio::test]
async fn repo_pointing_endpoints_reject_at_uri_subject() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let cases = [
        "sh.tangled.repo.listIssues",
        "sh.tangled.repo.countIssues",
        "sh.tangled.repo.listPulls",
        "sh.tangled.repo.countPulls",
        "sh.tangled.repo.listArtifacts",
        "sh.tangled.repo.countArtifacts",
    ];
    stream::iter(cases)
        .for_each(|endpoint| {
            let app = app.clone();
            async move {
                let resp = app
                    .oneshot(list_request(
                        endpoint,
                        "at://did:plc:abalone/sh.tangled.repo/r1",
                        &[],
                    ))
                    .await
                    .unwrap();
                let (status, body) = json_response(resp).await;
                assert_eq!(
                    status,
                    StatusCode::BAD_REQUEST,
                    "{endpoint} must reject rkey-form subjects since rkeys are unstable; clients must send the repoDID",
                );
                assert!(
                    body["message"]
                        .as_str()
                        .unwrap_or_default()
                        .contains("bare did"),
                    "{endpoint}: {}",
                    body["message"],
                );
            }
        })
        .await;
}

/// The repo-scoped issue/pull/artifact endpoints answer 200 for a bare DID subject.
#[tokio::test]
async fn repo_pointing_endpoints_accept_bare_did() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let cases = [
        "sh.tangled.repo.listIssues",
        "sh.tangled.repo.countIssues",
        "sh.tangled.repo.listPulls",
        "sh.tangled.repo.countPulls",
        "sh.tangled.repo.listArtifacts",
        "sh.tangled.repo.countArtifacts",
    ];
    stream::iter(cases)
        .for_each(|endpoint| {
            let app = app.clone();
            async move {
                let resp = app
                    .oneshot(list_request(endpoint, "did:plc:abalone", &[]))
                    .await
                    .unwrap();
                let (status, _body) = json_response(resp).await;
                assert_eq!(status, StatusCode::OK, "{endpoint} must accept bare did");
            }
        })
        .await;
}

/// The issue-comment endpoints reject both a DID-only AT-URI and a repo-collection
/// AT-URI with 400 and a message naming the expected `sh.tangled.repo.issue/<rkey>` form.
#[tokio::test]
async fn issue_collection_endpoints_reject_bare_did_or_wrong_collection() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let endpoints = [
        "sh.tangled.repo.issue.listComments",
        "sh.tangled.repo.issue.countComments",
    ];
    let inputs = [
        "at://did:plc:abalone",
        "at://did:plc:abalone/sh.tangled.repo/r1",
    ];
    let cases = endpoints
        .iter()
        .copied()
        .flat_map(|endpoint| inputs.iter().copied().map(move |input| (endpoint, input)));
    stream::iter(cases)
        .for_each(|(endpoint, input)| {
            let app = app.clone();
            async move {
                let resp = app
                    .oneshot(list_request(endpoint, input, &[]))
                    .await
                    .unwrap();
                let (status, body) = json_response(resp).await;
                assert_eq!(status, StatusCode::BAD_REQUEST, "{endpoint} input={input}");
                assert!(
                    body["message"]
                        .as_str()
                        .unwrap_or_default()
                        .contains("sh.tangled.repo.issue/<rkey>"),
                    "{endpoint} input={input}: {}",
                    body["message"],
                );
            }
        })
        .await;
}

/// The star endpoints reject a `sh.tangled.knot` AT-URI subject with 400 and a message
/// mentioning `sh.tangled.string`.
#[tokio::test]
async fn star_endpoints_reject_unrelated_collection() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let endpoints = ["sh.tangled.feed.listStars", "sh.tangled.feed.countStars"];
    stream::iter(endpoints)
        .for_each(|endpoint| {
            let app = app.clone();
            async move {
                let resp = app
                    .oneshot(list_request(
                        endpoint,
                        "at://did:plc:abalone/sh.tangled.knot/k1",
                        &[],
                    ))
                    .await
                    .unwrap();
                let (status, body) = json_response(resp).await;
                assert_eq!(status, StatusCode::BAD_REQUEST, "{endpoint}");
                let msg = body["message"].as_str().unwrap_or_default();
                assert!(msg.contains("sh.tangled.string"), "{endpoint}: {msg}",);
            }
        })
        .await;
}

/// `countStars` rejects a record-level repo AT-URI subject with 400 and a message
/// mentioning `sh.tangled.string`.
#[tokio::test]
async fn star_endpoints_reject_repo_uri_subject() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let resp = app
        .oneshot(list_request(
            "sh.tangled.feed.countStars",
            "at://did:plc:abalone/sh.tangled.repo/r1",
            &[],
        ))
        .await
        .unwrap();
    let (status, body) = json_response(resp).await;
    assert_eq!(
        status,
        StatusCode::BAD_REQUEST,
        "rkey-form repo URI must be rejected; clients must send the repoDID directly",
    );
    let msg = body["message"].as_str().unwrap_or_default();
    assert!(msg.contains("sh.tangled.string"), "{msg}");
}

/// `countStars` accepts a `sh.tangled.string` record AT-URI subject and reports zero
/// when no edges exist.
#[tokio::test]
async fn star_endpoints_accept_string_subject_form() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let resp = app
        .oneshot(list_request(
            "sh.tangled.feed.countStars",
            "at://did:plc:abalone/sh.tangled.string/k1",
            &[],
        ))
        .await
        .unwrap();
    let (status, body) = json_response(resp).await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(body["count"], json!(0));
}

/// Removing the only edge's source leaves `listStars` with no items and a null cursor.
#[tokio::test]
async fn list_after_remove_source_returns_empty_items() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:abalone");
    let source = at("at://did:plc:nel/sh.tangled.feed.star/s1");
    h.add_edge(&nsid("sh.tangled.feed.star"), &subject, &source);
    h.edges.remove_source(&source);

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.feed.listStars",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(body["items"], json!([]));
    assert!(body["cursor"].is_null());
}

#[tokio::test]
async fn list_pulls_hydrates_via_slingshot_when_edges_present() {
    let h = Harness::new().await;
    let target_did = did("did:plc:abalone");
    let subject = at(&format!("at://{}", target_did.as_ref()));
    let source_did = did("did:plc:nel");
    let rk = rkey("p1");
    h.add_edge(
        &nsid("sh.tangled.repo.pull"),
        &subject,
        &at(&format!(
            "at://{}/sh.tangled.repo.pull/{}",
source_did.as_ref(), 1422 rk.as_ref() 1423 )), 1424 ); 1425 h.mount( 1426 &source_did, 1427 &nsid("sh.tangled.repo.pull"), 1428 &rk, 1429 json!({ 1430 "$type": "sh.tangled.repo.pull", 1431 "title": "ship it", 1432 "createdAt": "2026-05-01T00:00:00Z", 1433 "rounds": [], 1434 "target": {"repo": target_did.as_ref(), "branch": "main"}, 1435 }), 1436 ) 1437 .await; 1438 let app = router(h.state.clone()); 1439 let (status, body) = json_response( 1440 app.oneshot(list_request( 1441 "sh.tangled.repo.listPulls", 1442 subject.as_ref(), 1443 &[], 1444 )) 1445 .await 1446 .unwrap(), 1447 ) 1448 .await; 1449 assert_eq!(status, StatusCode::OK); 1450 let items = body["items"].as_array().unwrap(); 1451 assert_eq!(items.len(), 1); 1452 assert_eq!(items[0]["value"]["title"], json!("ship it")); 1453 assert_eq!( 1454 items[0]["value"]["target"]["repo"], 1455 json!(target_did.as_ref()) 1456 ); 1457} 1458 1459#[tokio::test] 1460async fn count_pulls_returns_distinct_authors() { 1461 let h = Harness::new().await; 1462 let subject = at("at://did:plc:abalone"); 1463 h.add_edge( 1464 &nsid("sh.tangled.repo.pull"), 1465 &subject, 1466 &at("at://did:plc:nel/sh.tangled.repo.pull/p1"), 1467 ); 1468 h.add_edge( 1469 &nsid("sh.tangled.repo.pull"), 1470 &subject, 1471 &at("at://did:plc:olaren/sh.tangled.repo.pull/p2"), 1472 ); 1473 h.add_edge( 1474 &nsid("sh.tangled.repo.pull"), 1475 &subject, 1476 &at("at://did:plc:nel/sh.tangled.repo.pull/p3"), 1477 ); 1478 let app = router(h.state.clone()); 1479 let (_, body) = json_response( 1480 app.oneshot(list_request( 1481 "sh.tangled.repo.countPulls", 1482 subject.as_ref(), 1483 &[], 1484 )) 1485 .await 1486 .unwrap(), 1487 ) 1488 .await; 1489 assert_eq!(body["count"], json!(3)); 1490 assert_eq!(body["distinctAuthors"], json!(2)); 1491} 1492 1493#[tokio::test] 1494async fn extractor_to_xrpc_round_trip_for_star() { 1495 let h = Harness::new().await; 1496 let subject_did = did("did:plc:abalone"); 1497 let source_did = did("did:plc:nel"); 1498 let rk = 
rkey("s1"); 1499 let source = at(&format!( 1500 "at://{}/sh.tangled.feed.star/{}", 1501 source_did.as_ref(), 1502 rk.as_ref() 1503 )); 1504 let body = star_body(&subject_did); 1505 let parsed = 1506 bobbin_types::edges::Record::from_json_value(&nsid("sh.tangled.feed.star"), body.clone()) 1507 .expect("parse star record"); 1508 parsed 1509 .extract_edges(&source) 1510 .expect("extract") 1511 .into_iter() 1512 .for_each(|e| h.edges.add(e)); 1513 h.mount(&source_did, &nsid("sh.tangled.feed.star"), &rk, body) 1514 .await; 1515 1516 let app = router(h.state.clone()); 1517 let (status, json) = json_response( 1518 app.oneshot(list_request( 1519 "sh.tangled.feed.listStars", 1520 &format!("at://{}", subject_did.as_ref()), 1521 &[], 1522 )) 1523 .await 1524 .unwrap(), 1525 ) 1526 .await; 1527 assert_eq!( 1528 status, 1529 StatusCode::OK, 1530 "extractor key must match handler subject, body was {json}", 1531 ); 1532 let items = json["items"].as_array().unwrap(); 1533 assert_eq!(items.len(), 1, "expected exactly one star edge"); 1534 assert_eq!( 1535 items[0]["value"]["subject"]["did"], 1536 json!(subject_did.as_ref()) 1537 ); 1538} 1539 1540#[tokio::test] 1541async fn list_issues_includes_state_comment_count_and_state_updated_at() { 1542 let h = Harness::new().await; 1543 let repo = did("did:plc:limpet"); 1544 let subject = at(&format!("at://{}", repo.as_ref())); 1545 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1"); 1546 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri); 1547 h.mount( 1548 &did("did:plc:nel"), 1549 &nsid("sh.tangled.repo.issue"), 1550 &rkey("i1"), 1551 issue_body(&repo, "hi"), 1552 ) 1553 .await; 1554 h.add_edge( 1555 &nsid("sh.tangled.repo.issue.comment"), 1556 &issue_uri, 1557 &at("at://did:plc:olaren/sh.tangled.repo.issue.comment/c1"), 1558 ); 1559 h.add_edge( 1560 &nsid("sh.tangled.repo.issue.comment"), 1561 &issue_uri, 1562 &at("at://did:plc:teq/sh.tangled.repo.issue.comment/c2"), 1563 ); 1564 1565 
h.state.issue_states.upsert( 1566 at("at://did:plc:nel/sh.tangled.repo.issue.state/s1"), 1567 issue_uri.clone(), 1568 1_777_593_600_000_000, 1569 IssueStateKind::Open, 1570 ); 1571 h.state.issue_states.upsert( 1572 at("at://did:plc:nel/sh.tangled.repo.issue.state/s2"), 1573 issue_uri.clone(), 1574 1_777_593_700_000_000, 1575 IssueStateKind::Closed, 1576 ); 1577 1578 let app = router(h.state.clone()); 1579 let resp = app 1580 .oneshot(list_request( 1581 "sh.tangled.repo.listIssues", 1582 subject.as_ref(), 1583 &[], 1584 )) 1585 .await 1586 .unwrap(); 1587 let (status, body) = json_response(resp).await; 1588 assert_eq!(status, StatusCode::OK); 1589 let item = &body["items"][0]; 1590 assert_eq!(item["state"], json!("closed")); 1591 assert_eq!(item["commentCount"], json!(2)); 1592 let updated = item["stateUpdatedAt"] 1593 .as_str() 1594 .expect("stateUpdatedAt must serialize as RFC3339 string"); 1595 assert!( 1596 updated.starts_with("2026-"), 1597 "expected 2026 timestamp, got {updated}" 1598 ); 1599} 1600 1601#[tokio::test] 1602async fn list_issues_defaults_to_open_when_no_state_record() { 1603 let h = Harness::new().await; 1604 let repo = did("did:plc:limpet"); 1605 let subject = at(&format!("at://{}", repo.as_ref())); 1606 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1"); 1607 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri); 1608 h.mount( 1609 &did("did:plc:nel"), 1610 &nsid("sh.tangled.repo.issue"), 1611 &rkey("i1"), 1612 issue_body(&repo, "no state yet"), 1613 ) 1614 .await; 1615 1616 let app = router(h.state.clone()); 1617 let (_status, body) = json_response( 1618 app.oneshot(list_request( 1619 "sh.tangled.repo.listIssues", 1620 subject.as_ref(), 1621 &[], 1622 )) 1623 .await 1624 .unwrap(), 1625 ) 1626 .await; 1627 let item = &body["items"][0]; 1628 assert_eq!( 1629 item["state"], 1630 json!("open"), 1631 "absent state record defaults to open" 1632 ); 1633 assert!( 1634 item.get("stateUpdatedAt").is_none(), 1635 "stateUpdatedAt 
must be absent without a state record", 1636 ); 1637 assert_eq!(item["commentCount"], json!(0)); 1638} 1639 1640#[tokio::test] 1641async fn list_issues_author_filter_restricts_to_matching_did() { 1642 let h = Harness::new().await; 1643 let repo = did("did:plc:limpet"); 1644 let subject = at(&format!("at://{}", repo.as_ref())); 1645 let owners = [ 1646 ("did:plc:nel", "n1"), 1647 ("did:plc:nel", "n2"), 1648 ("did:plc:olaren", "o1"), 1649 ("did:plc:olaren", "o2"), 1650 ]; 1651 stream::iter(owners) 1652 .for_each(|(d, r)| { 1653 let h = &h; 1654 let subject = subject.clone(); 1655 let repo = repo.clone(); 1656 async move { 1657 let d_did = did(d); 1658 let rk = rkey(r); 1659 h.add_edge( 1660 &nsid("sh.tangled.repo.issue"), 1661 &subject, 1662 &at(&format!( 1663 "at://{}/sh.tangled.repo.issue/{}", 1664 d_did.as_ref(), 1665 rk.as_ref() 1666 )), 1667 ); 1668 h.mount( 1669 &d_did, 1670 &nsid("sh.tangled.repo.issue"), 1671 &rk, 1672 issue_body(&repo, &format!("issue-{}", rk.as_ref())), 1673 ) 1674 .await; 1675 } 1676 }) 1677 .await; 1678 1679 let app = router(h.state.clone()); 1680 let (status, body) = json_response( 1681 app.oneshot(list_request( 1682 "sh.tangled.repo.listIssues", 1683 subject.as_ref(), 1684 &[("author", "did:plc:nel")], 1685 )) 1686 .await 1687 .unwrap(), 1688 ) 1689 .await; 1690 assert_eq!(status, StatusCode::OK); 1691 let items = body["items"].as_array().expect("items array"); 1692 assert_eq!(items.len(), 2, "two issues authored by nel"); 1693 let all_nel = items 1694 .iter() 1695 .all(|i| i["uri"].as_str().unwrap().starts_with("at://did:plc:nel/")); 1696 assert!(all_nel, "every returned uri must be authored by nel"); 1697} 1698 1699#[tokio::test] 1700async fn list_issues_invalid_author_returns_400() { 1701 let h = Harness::new().await; 1702 let subject = "at://did:plc:limpet".to_owned(); 1703 let app = router(h.state.clone()); 1704 let resp = app 1705 .oneshot(list_request( 1706 "sh.tangled.repo.listIssues", 1707 &subject, 1708 &[("author", 
"not-a-did")], 1709 )) 1710 .await 1711 .unwrap(); 1712 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 1713} 1714 1715#[tokio::test] 1716async fn list_pulls_includes_merged_state_and_comment_count() { 1717 let h = Harness::new().await; 1718 let repo = did("did:plc:limpet"); 1719 let subject = at(&format!("at://{}", repo.as_ref())); 1720 let pull_uri = at("at://did:plc:nel/sh.tangled.repo.pull/p1"); 1721 h.add_edge(&nsid("sh.tangled.repo.pull"), &subject, &pull_uri); 1722 h.mount( 1723 &did("did:plc:nel"), 1724 &nsid("sh.tangled.repo.pull"), 1725 &rkey("p1"), 1726 pull_body(&repo, "fix bug"), 1727 ) 1728 .await; 1729 h.add_edge( 1730 &nsid("sh.tangled.repo.pull.comment"), 1731 &pull_uri, 1732 &at("at://did:plc:teq/sh.tangled.repo.pull.comment/c1"), 1733 ); 1734 h.state.pull_statuses.upsert( 1735 at("at://did:plc:nel/sh.tangled.repo.pull.status/s1"), 1736 pull_uri.clone(), 1737 1_777_593_600_000_000, 1738 PullStatusKind::Open, 1739 ); 1740 h.state.pull_statuses.upsert( 1741 at("at://did:plc:nel/sh.tangled.repo.pull.status/s2"), 1742 pull_uri.clone(), 1743 1_777_593_800_000_000, 1744 PullStatusKind::Merged, 1745 ); 1746 1747 let app = router(h.state.clone()); 1748 let (status, body) = json_response( 1749 app.oneshot(list_request( 1750 "sh.tangled.repo.listPulls", 1751 subject.as_ref(), 1752 &[], 1753 )) 1754 .await 1755 .unwrap(), 1756 ) 1757 .await; 1758 assert_eq!(status, StatusCode::OK); 1759 let item = &body["items"][0]; 1760 assert_eq!(item["state"], json!("merged")); 1761 assert_eq!(item["commentCount"], json!(1)); 1762} 1763 1764#[tokio::test] 1765async fn list_issues_state_filter_open_includes_records_without_state() { 1766 let h = Harness::new().await; 1767 let repo = did("did:plc:limpet"); 1768 let subject = at(&format!("at://{}", repo.as_ref())); 1769 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1"); 1770 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri); 1771 h.mount( 1772 &did("did:plc:nel"), 1773 
&nsid("sh.tangled.repo.issue"), 1774 &rkey("i1"), 1775 issue_body(&repo, "fresh"), 1776 ) 1777 .await; 1778 1779 let app = router(h.state.clone()); 1780 let (status, body) = json_response( 1781 app.oneshot(list_request( 1782 "sh.tangled.repo.listIssues", 1783 subject.as_ref(), 1784 &[("state", "open")], 1785 )) 1786 .await 1787 .unwrap(), 1788 ) 1789 .await; 1790 assert_eq!(status, StatusCode::OK); 1791 let items = body["items"].as_array().expect("items array"); 1792 assert_eq!( 1793 items.len(), 1794 1, 1795 "absent state record still matches state=open" 1796 ); 1797} 1798 1799#[tokio::test] 1800async fn list_issues_state_filter_ignores_third_party_state_source() { 1801 let h = Harness::new().await; 1802 let repo = did("did:plc:limpet"); 1803 let subject = at(&format!("at://{}", repo.as_ref())); 1804 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1"); 1805 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri); 1806 h.mount( 1807 &did("did:plc:nel"), 1808 &nsid("sh.tangled.repo.issue"), 1809 &rkey("i1"), 1810 issue_body(&repo, "open issue"), 1811 ) 1812 .await; 1813 h.state.issue_states.upsert( 1814 at("at://did:plc:nautilus/sh.tangled.repo.issue.state/spoof"), 1815 issue_uri.clone(), 1816 1_777_593_800_000_000, 1817 IssueStateKind::Closed, 1818 ); 1819 1820 let app = router(h.state.clone()); 1821 let (status, body) = json_response( 1822 app.oneshot(list_request( 1823 "sh.tangled.repo.listIssues", 1824 subject.as_ref(), 1825 &[("state", "open")], 1826 )) 1827 .await 1828 .unwrap(), 1829 ) 1830 .await; 1831 assert_eq!(status, StatusCode::OK); 1832 let items = body["items"].as_array().expect("items array"); 1833 assert_eq!( 1834 items.len(), 1835 1, 1836 "third-party Closed record must not flip filter result for state=open", 1837 ); 1838 assert_eq!(items[0]["state"], json!("open")); 1839 assert!( 1840 items[0].get("stateUpdatedAt").is_none(), 1841 "third-party state source must not surface stateUpdatedAt", 1842 ); 1843} 1844 1845#[tokio::test] 
1846async fn list_pulls_status_filter_ignores_third_party_status_source() { 1847 let h = Harness::new().await; 1848 let repo = did("did:plc:limpet"); 1849 let subject = at(&format!("at://{}", repo.as_ref())); 1850 let pull_uri = at("at://did:plc:nel/sh.tangled.repo.pull/p1"); 1851 h.add_edge(&nsid("sh.tangled.repo.pull"), &subject, &pull_uri); 1852 h.mount( 1853 &did("did:plc:nel"), 1854 &nsid("sh.tangled.repo.pull"), 1855 &rkey("p1"), 1856 pull_body(&repo, "wip"), 1857 ) 1858 .await; 1859 h.state.pull_statuses.upsert( 1860 at("at://did:plc:nautilus/sh.tangled.repo.pull.status/spoof"), 1861 pull_uri.clone(), 1862 1_777_593_800_000_000, 1863 PullStatusKind::Merged, 1864 ); 1865 1866 let app = router(h.state.clone()); 1867 let (status, body) = json_response( 1868 app.oneshot(list_request( 1869 "sh.tangled.repo.listPulls", 1870 subject.as_ref(), 1871 &[("status", "merged")], 1872 )) 1873 .await 1874 .unwrap(), 1875 ) 1876 .await; 1877 assert_eq!(status, StatusCode::OK); 1878 let items = body["items"].as_array().expect("items array"); 1879 assert_eq!( 1880 items.len(), 1881 0, 1882 "third-party Merged record must not satisfy status=merged" 1883 ); 1884} 1885 1886#[tokio::test] 1887async fn list_issues_state_filter_accepts_repo_owner_state_source() { 1888 let h = Harness::new().await; 1889 let repo_owner = did("did:plc:limpet"); 1890 let subject = at(&format!("at://{}", repo_owner.as_ref())); 1891 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1"); 1892 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri); 1893 h.mount( 1894 &did("did:plc:nel"), 1895 &nsid("sh.tangled.repo.issue"), 1896 &rkey("i1"), 1897 issue_body(&repo_owner, "owner closed"), 1898 ) 1899 .await; 1900 h.state.issue_states.upsert( 1901 at("at://did:plc:limpet/sh.tangled.repo.issue.state/legit"), 1902 issue_uri.clone(), 1903 1_777_593_800_000_000, 1904 IssueStateKind::Closed, 1905 ); 1906 1907 let app = router(h.state.clone()); 1908 let (status, body) = json_response( 1909 
app.oneshot(list_request( 1910 "sh.tangled.repo.listIssues", 1911 subject.as_ref(), 1912 &[("state", "closed")], 1913 )) 1914 .await 1915 .unwrap(), 1916 ) 1917 .await; 1918 assert_eq!(status, StatusCode::OK); 1919 let items = body["items"].as_array().expect("items array"); 1920 assert_eq!( 1921 items.len(), 1922 1, 1923 "repo-owner state record must satisfy state=closed" 1924 ); 1925 assert_eq!(items[0]["state"], json!("closed")); 1926} 1927 1928#[tokio::test] 1929async fn list_issues_order_asc_returns_oldest_first() { 1930 let h = Harness::new().await; 1931 let repo = did("did:plc:limpet"); 1932 let subject = at(&format!("at://{}", repo.as_ref())); 1933 let rkeys = ["a", "b", "c"]; 1934 stream::iter(rkeys) 1935 .for_each(|r| { 1936 let h = &h; 1937 let subject = subject.clone(); 1938 let repo = repo.clone(); 1939 async move { 1940 let rk = rkey(r); 1941 let issue_uri = at(&format!( 1942 "at://did:plc:nel/sh.tangled.repo.issue/{}", 1943 rk.as_ref() 1944 )); 1945 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri); 1946 h.mount( 1947 &did("did:plc:nel"), 1948 &nsid("sh.tangled.repo.issue"), 1949 &rk, 1950 issue_body(&repo, &format!("issue-{}", rk.as_ref())), 1951 ) 1952 .await; 1953 } 1954 }) 1955 .await; 1956 1957 let app = router(h.state.clone()); 1958 let (_, asc) = json_response( 1959 app.clone() 1960 .oneshot(list_request( 1961 "sh.tangled.repo.listIssues", 1962 subject.as_ref(), 1963 &[("order", "asc")], 1964 )) 1965 .await 1966 .unwrap(), 1967 ) 1968 .await; 1969 let (_, desc) = json_response( 1970 app.oneshot(list_request( 1971 "sh.tangled.repo.listIssues", 1972 subject.as_ref(), 1973 &[("order", "desc")], 1974 )) 1975 .await 1976 .unwrap(), 1977 ) 1978 .await; 1979 let asc_uris: Vec<_> = asc["items"] 1980 .as_array() 1981 .unwrap() 1982 .iter() 1983 .map(|i| i["uri"].as_str().unwrap().to_owned()) 1984 .collect(); 1985 let desc_uris: Vec<_> = desc["items"] 1986 .as_array() 1987 .unwrap() 1988 .iter() 1989 .map(|i| 
i["uri"].as_str().unwrap().to_owned()) 1990 .collect(); 1991 let mut reversed = asc_uris.clone(); 1992 reversed.reverse(); 1993 assert_eq!(asc_uris.len(), 3); 1994 assert_eq!(desc_uris, reversed, "desc must be exact reverse of asc"); 1995} 1996 1997#[tokio::test] 1998async fn list_issues_by_state_filter_narrows_results() { 1999 let h = Harness::new().await; 2000 let author = did("did:plc:nel"); 2001 let repo = did("did:plc:limpet"); 2002 let open_uri = at("at://did:plc:nel/sh.tangled.repo.issue/open1"); 2003 let closed_uri = at("at://did:plc:nel/sh.tangled.repo.issue/closed1"); 2004 let author_subject = at(&format!("at://{}", author.as_ref())); 2005 h.edges.add(Edge { 2006 kind: nsid("sh.tangled.repo.issue.by"), 2007 subject: SubjectRef::Did(author.clone()), 2008 source: open_uri.clone(), 2009 sort_micros: next_sort_micros(), 2010 }); 2011 h.edges.add(Edge { 2012 kind: nsid("sh.tangled.repo.issue.by"), 2013 subject: SubjectRef::Did(author.clone()), 2014 source: closed_uri.clone(), 2015 sort_micros: next_sort_micros(), 2016 }); 2017 h.mount( 2018 &author, 2019 &nsid("sh.tangled.repo.issue"), 2020 &rkey("open1"), 2021 issue_body(&repo, "still open"), 2022 ) 2023 .await; 2024 h.mount( 2025 &author, 2026 &nsid("sh.tangled.repo.issue"), 2027 &rkey("closed1"), 2028 issue_body(&repo, "shut"), 2029 ) 2030 .await; 2031 h.state.issue_states.upsert( 2032 at("at://did:plc:nel/sh.tangled.repo.issue.state/s1"), 2033 closed_uri.clone(), 2034 1_777_593_800_000_000, 2035 IssueStateKind::Closed, 2036 ); 2037 2038 let app = router(h.state.clone()); 2039 let (status, body) = json_response( 2040 app.oneshot(list_request( 2041 "sh.tangled.repo.listIssuesBy", 2042 author_subject.as_ref(), 2043 &[("state", "closed")], 2044 )) 2045 .await 2046 .unwrap(), 2047 ) 2048 .await; 2049 assert_eq!(status, StatusCode::OK); 2050 let items = body["items"].as_array().expect("items array"); 2051 assert_eq!( 2052 items.len(), 2053 1, 2054 "only the closed issue survives state=closed" 2055 ); 2056 
assert_eq!(items[0]["uri"], json!(closed_uri.as_ref())); 2057}