Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

at master 67 kB View raw
//! In-process tests for the list/count XRPC endpoints.
//!
//! Each test builds a [`Harness`], seeds the edge store and (where hydration is
//! exercised) a wiremock upstream, then drives the router with a single
//! `oneshot` request and inspects the JSON response.

use std::sync::Arc;

use axum::body::{Body, to_bytes};
use bobbin_edge_index::{
    Coverage, CoverageWatch, EdgeStore, HydrantCursor, IssueStateKind, PageToken, PullStatusKind,
    StateIndex,
};
use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig};
use bobbin_record_lru::{CacheCapacity, LruRecordStore};
use bobbin_resolver::RepoIdResolver;
use bobbin_runtime::{RuntimeHasher, SystemClock};
use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader};
use bobbin_slingshot_client::SlingshotClient;
use bobbin_types::edges::Edge;
use bobbin_types::ids::SubjectRef;
use bobbin_xrpc::{AppState, router};
use futures::stream::{self, StreamExt};
use http::{Request, StatusCode};
use jacquard_common::DefaultStr;
use jacquard_common::types::did::Did;
use jacquard_common::types::nsid::Nsid;
use jacquard_common::types::recordkey::Rkey;
use jacquard_common::types::string::AtUri;
use serde_json::{Value, json};
use tower::ServiceExt;
use url::Url;
use url::form_urlencoded::byte_serialize;
use wiremock::matchers::{method, path, query_param};
use wiremock::{Mock, MockServer, ResponseTemplate};

/// CID placed in every mocked `getRecord` response body.
const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i";

/// Parses `s` into an owned [`AtUri`]; panics if it does not parse.
fn at(s: &str) -> AtUri<DefaultStr> {
    AtUri::new_owned(s).unwrap()
}

/// Parses `s` into an owned [`Did`]; panics if it does not parse.
fn did(s: &str) -> Did<DefaultStr> {
    Did::new_owned(s).unwrap()
}

/// Parses `s` into an owned [`Rkey`]; panics if it does not parse.
fn rkey(s: &str) -> Rkey<DefaultStr> {
    Rkey::new_owned(s).unwrap()
}

/// Builds an [`Nsid`] from a static string; panics if it does not parse.
fn nsid(s: &'static str) -> Nsid<DefaultStr> {
    Nsid::new_static(s).unwrap()
}

/// Converts `s` into a [`SubjectRef`].
///
/// A DID parse is attempted first; if that fails the string is parsed as an
/// at-uri instead. Panics when neither form parses.
fn subj(s: &str) -> SubjectRef {
    Did::<DefaultStr>::new_owned(s)
        .map(SubjectRef::Did)
        .unwrap_or_else(|_| SubjectRef::Uri(AtUri::new_owned(s).unwrap()))
}

/// Shared fixture: a mock upstream plus the application state wired to it.
struct Harness {
    /// Mock HTTP server the slingshot client is pointed at.
    server: MockServer,
    /// Edge store shared with `state`, kept so tests can seed and remove edges.
    edges: Arc<EdgeStore>,
    /// Coverage watch shared with `state`, kept so tests can flip coverage.
    coverage: Arc<CoverageWatch>,
    /// State handed to `router` in each test.
    state: AppState,
}

/// Process-wide counter backing [`next_sort_micros`]; starts at 1.
static EDGE_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);

/// Returns the current counter value and bumps it by one, so every edge added
/// through the harness gets a distinct, increasing `sort_micros`.
fn next_sort_micros() -> u64 {
    EDGE_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}

impl Harness {
    /// Starts a mock server and assembles an [`AppState`] around it.
    ///
    /// The slingshot client targets the mock server's URI; every other
    /// component is constructed fresh with default configuration. Panics if
    /// any component fails to construct.
    async fn new() -> Self {
        let server = MockServer::start().await;
        let edges = Arc::new(EdgeStore::new(RuntimeHasher::default()));
        let issue_states = Arc::new(StateIndex::new(RuntimeHasher::default()));
        let pull_statuses = Arc::new(StateIndex::new(RuntimeHasher::default()));
        let coverage = Arc::new(CoverageWatch::new());
        let state = AppState::new(
            Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))),
            SlingshotClient::with_default_http(Url::parse(&server.uri()).unwrap()).unwrap(),
            edges.clone(),
            issue_states.clone(),
            pull_statuses.clone(),
            coverage.clone(),
            Arc::new(
                KnotProxy::new(
                    KnotProxyConfig::default(),
                    KnotHttpConfig::default(),
                    Arc::new(SystemClock::new()),
                    RuntimeHasher::default(),
                )
                .unwrap(),
            ),
            Arc::new(
                SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap(),
            ) as Arc<dyn SearchReader>,
            Arc::new(RepoIdResolver::detached(RuntimeHasher::default())),
        );
        Self {
            server,
            edges,
            coverage,
            state,
        }
    }

    /// Adds an edge of `kind` from `source` to `subject` with a fresh
    /// `sort_micros` taken from [`next_sort_micros`].
    fn add_edge(
        &self,
        kind: &Nsid<DefaultStr>,
        subject: &AtUri<DefaultStr>,
        source: &AtUri<DefaultStr>,
    ) {
        self.edges.add(Edge {
            kind: kind.clone(),
            subject: subj(subject.as_ref()),
            source: source.clone(),
            sort_micros: next_sort_micros(),
        });
    }

    /// Registers a mock `com.atproto.repo.getRecord` response on the server.
    ///
    /// A GET whose `repo`, `collection` and `rkey` query parameters match the
    /// arguments is answered with 200 and `{ uri, cid, value }`, where `cid`
    /// is the shared [`CID`] constant.
    async fn mount(
        &self,
        did: &Did<DefaultStr>,
        collection: &Nsid<DefaultStr>,
        rkey: &Rkey<DefaultStr>,
        value: Value,
    ) {
        let uri = format!(
            "at://{}/{}/{}",
            did.as_ref(),
            collection.as_ref(),
            rkey.as_ref()
        );
        let body = json!({ "uri": uri, "cid": CID, "value": value });
        Mock::given(method("GET"))
            .and(path("/xrpc/com.atproto.repo.getRecord"))
            .and(query_param("repo", did.as_ref()))
            .and(query_param("collection", collection.as_ref()))
            .and(query_param("rkey", rkey.as_ref()))
            .respond_with(ResponseTemplate::new(200).set_body_json(body))
            .mount(&self.server)
            .await;
    }

    /// Overwrites the coverage state with `Coverage::Ready`.
    fn promote_ready(&self, events: u64, cursor: u64) {
        self.coverage.update(|_| Coverage::Ready {
            events_processed: events,
            last_cursor: HydrantCursor::new(cursor),
        });
    }

    /// Overwrites the coverage state with `Coverage::Warming`.
    fn warming(&self, events: u64, cursor: u64) {
        self.coverage.update(|_| Coverage::Warming {
            events_processed: events,
            last_cursor: HydrantCursor::new(cursor),
        });
    }
}

/// Builds an empty-body request for `/xrpc/{endpoint}`.
///
/// `subject` and the keys of `extras` are inserted into the query string
/// verbatim; only the values of `extras` are form-urlencoded.
fn list_request(endpoint: &str, subject: &str, extras: &[(&str, &str)]) -> Request<Body> {
    let mut qs = format!("subject={subject}");
    extras.iter().for_each(|(k, v)| {
        qs.push('&');
        qs.push_str(k);
        qs.push('=');
        qs.push_str(&encode(v));
    });
    Request::builder()
        .uri(format!("/xrpc/{endpoint}?{qs}"))
        .body(Body::empty())
        .unwrap()
}

/// Form-urlencodes `s`.
fn encode(s: &str) -> String {
    byte_serialize(s.as_bytes()).collect()
}

/// Reads the response body (capped at `1 << 20` bytes) and parses it as JSON.
///
/// Returns the status alongside the parsed value; panics if the body cannot
/// be read or is not JSON.
async fn json_response(resp: axum::response::Response) -> (StatusCode, Value) {
    let status = resp.status();
    let bytes = to_bytes(resp.into_body(), 1 << 20).await.unwrap();
    let parsed: Value = serde_json::from_slice(&bytes).expect("JSON body");
    (status, parsed)
}

/// Record value for a `sh.tangled.repo.issue` pointing at `repo_did`.
fn issue_body(repo_did: &Did<DefaultStr>, title: &str) -> Value {
    json!({
        "$type": "sh.tangled.repo.issue",
        "repo": repo_did.as_ref(),
        "title": title,
        "createdAt": "2026-05-01T00:00:00Z"
    })
}

/// Record value for a `sh.tangled.repo.pull` targeting `main` on `repo_did`.
fn pull_body(repo_did: &Did<DefaultStr>, title: &str) -> Value {
    json!({
        "$type": "sh.tangled.repo.pull",
        "title": title,
        "createdAt": "2026-05-01T00:00:00Z",
        "rounds": [],
        "target": {
            "repo": repo_did.as_ref(),
            "branch": "main"
        }
    })
}

/// Record value for a `sh.tangled.feed.star` whose subject is a repo DID.
fn star_body(subject_did: &Did<DefaultStr>) -> Value {
    json!({
        "$type": "sh.tangled.feed.star",
        "createdAt": "2026-05-01T00:00:00Z",
        "subject": {
            "$type": "sh.tangled.feed.star#repo",
            "did": subject_did.as_ref()
        }
    })
}

/// Record value for a `sh.tangled.graph.follow` of `subject_did`.
fn follow_body(subject_did: &Did<DefaultStr>) -> Value {
    json!({
        "$type": "sh.tangled.graph.follow",
        "createdAt": "2026-05-01T00:00:00Z",
        "subject": subject_did.as_ref()
    })
}

/// With no edges, `listIssues` answers 200 with an empty page and no cursor.
#[tokio::test]
async fn list_issues_with_no_edges_returns_empty_items() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let resp = app
        .oneshot(list_request(
            "sh.tangled.repo.listIssues",
            "at://did:plc:abalone",
            &[],
        ))
        .await
        .unwrap();
    let (status, body) = json_response(resp).await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(body["items"], json!([]));
    assert!(body["cursor"].is_null());
}

/// With no edges, `countIssues` reports zero items and zero authors.
#[tokio::test]
async fn count_issues_with_no_edges_returns_zero() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let resp = app
        .oneshot(list_request(
            "sh.tangled.repo.countIssues",
            "at://did:plc:abalone",
            &[],
        ))
        .await
        .unwrap();
    let (status, body) = json_response(resp).await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(body["count"], json!(0));
    assert_eq!(body["distinctAuthors"], json!(0));
}

/// Two indexed issues, each backed by a mocked record, both come back hydrated.
#[tokio::test]
async fn list_issues_hydrates_via_slingshot_when_edges_present() {
    let h = Harness::new().await;
    let repo = did("did:plc:abalone");
    let subject = at(&format!("at://{}", repo.as_ref()));
    let owners = [
        ("did:plc:nel", "i1", "first"),
        ("did:plc:olaren", "i2", "second"),
    ];
    stream::iter(owners)
        .for_each(|(d, r, title)| {
            let h = &h;
            let subject = subject.clone();
            let repo = repo.clone();
            async move {
                let d_did = did(d);
                let rk = rkey(r);
                h.add_edge(
                    &nsid("sh.tangled.repo.issue"),
                    &subject,
                    &at(&format!(
                        "at://{}/sh.tangled.repo.issue/{}",
                        d_did.as_ref(),
                        rk.as_ref()
                    )),
                );
                h.mount(
                    &d_did,
                    &nsid("sh.tangled.repo.issue"),
                    &rk,
                    issue_body(&repo, title),
                )
                .await;
            }
        })
        .await;

    let app = router(h.state.clone());
    let resp = app
        .oneshot(list_request(
            "sh.tangled.repo.listIssues",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap();
    let (status, body) = json_response(resp).await;
    assert_eq!(status, StatusCode::OK);
    let items = body["items"].as_array().expect("items array");
    assert_eq!(items.len(), 2);
    let titles: Vec<&str> = items
        .iter()
        .map(|v| v["value"]["title"].as_str().unwrap())
        .collect();
    assert!(titles.contains(&"first"));
    assert!(titles.contains(&"second"));
    assert_eq!(items[0]["cid"], CID);
    assert!(items[0]["uri"].as_str().unwrap().starts_with("at://"));
}

/// Three stars from two authors: `count` is 3, `distinctAuthors` is 2.
#[tokio::test]
async fn count_distinct_authors_dedupes_per_author() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:abalone");
    h.add_edge(
        &nsid("sh.tangled.feed.star"),
        &subject,
        &at("at://did:plc:nel/sh.tangled.feed.star/s1"),
    );
    h.add_edge(
        &nsid("sh.tangled.feed.star"),
        &subject,
        &at("at://did:plc:nel/sh.tangled.feed.star/s2"),
    );
    h.add_edge(
        &nsid("sh.tangled.feed.star"),
        &subject,
        &at("at://did:plc:olaren/sh.tangled.feed.star/s3"),
    );

    let app = router(h.state.clone());
    let resp = app
        .oneshot(list_request(
            "sh.tangled.feed.countStars",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap();
    let (_, body) = json_response(resp).await;
    assert_eq!(body["count"], json!(3));
    assert_eq!(body["distinctAuthors"], json!(2));
}

/// The listed items are the same while coverage is warming and after it is
/// promoted to ready.
#[tokio::test]
async fn list_items_stable_across_coverage_promotion() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:abalone");
    let nel = did("did:plc:nel");
    h.add_edge(
        &nsid("sh.tangled.feed.star"),
        &subject,
        &at(&format!("at://{}/sh.tangled.feed.star/s1", nel.as_ref())),
    );
    h.mount(
        &nel,
        &nsid("sh.tangled.feed.star"),
        &rkey("s1"),
        star_body(&did("did:plc:abalone")),
    )
    .await;

    let app = router(h.state.clone());
    h.warming(1, 5);
    let (_, before) = json_response(
        app.clone()
            .oneshot(list_request(
                "sh.tangled.feed.listStars",
                subject.as_ref(),
                &[],
            ))
            .await
            .unwrap(),
    )
    .await;
    assert_eq!(before["items"].as_array().unwrap().len(), 1);

    h.promote_ready(2, 9);
    let (_, after) = json_response(
        app.oneshot(list_request(
            "sh.tangled.feed.listStars",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(
        after["items"].as_array().unwrap().len(),
        before["items"].as_array().unwrap().len(),
    );
    assert_eq!(after["items"], before["items"]);
}

/// Five issues fetched as a page of 2 plus a cursor-driven page of 3 cover
/// every owner exactly once, and the tail page carries no cursor.
#[tokio::test]
async fn list_paginates_via_cursor() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:abalone");
    let repo = did("did:plc:abalone");
    let owners = [
        ("did:plc:nel", "i1"),
        ("did:plc:olaren", "i2"),
        ("did:plc:teq", "i3"),
        ("did:plc:lyna", "i4"),
        ("did:plc:bailey", "i5"),
    ];
    stream::iter(owners)
        .for_each(|(d, r)| {
            let h = &h;
            let subject = subject.clone();
            let repo = repo.clone();
            async move {
                let d_did = did(d);
                let rk = rkey(r);
                h.add_edge(
                    &nsid("sh.tangled.repo.issue"),
                    &subject,
                    &at(&format!(
                        "at://{}/sh.tangled.repo.issue/{}",
                        d_did.as_ref(),
                        rk.as_ref()
                    )),
                );
                h.mount(
                    &d_did,
                    &nsid("sh.tangled.repo.issue"),
                    &rk,
                    issue_body(&repo, &format!("issue-{}", rk.as_ref())),
                )
                .await;
            }
        })
        .await;

    let app = router(h.state.clone());
    let (_, page1) = json_response(
        app.clone()
            .oneshot(list_request(
                "sh.tangled.repo.listIssues",
                subject.as_ref(),
                &[("limit", "2")],
            ))
            .await
            .unwrap(),
    )
    .await;
    // Borrow the page's items; `page1` outlives every use below, so no copy
    // of the array is needed.
    let page1_items = page1["items"].as_array().unwrap();
    assert_eq!(page1_items.len(), 2);
    let cursor = page1["cursor"]
        .as_str()
        .expect("first page must yield a cursor")
        .to_owned();
    assert!(
        PageToken::decode_token(&cursor).is_ok(),
        "cursor must be a TID-shaped token"
    );

    let (_, page2) = json_response(
        app.oneshot(list_request(
            "sh.tangled.repo.listIssues",
            subject.as_ref(),
            &[("limit", "10"), ("cursor", &cursor)],
        ))
        .await
        .unwrap(),
    )
    .await;
    let page2_items = page2["items"].as_array().unwrap();
    assert_eq!(page2_items.len(), 3);
    assert!(page2["cursor"].is_null(), "tail page must not promise more");

    let union: Vec<&str> = page1_items
        .iter()
        .chain(page2_items.iter())
        .map(|item| item["uri"].as_str().unwrap())
        .collect();
    assert_eq!(union.len(), owners.len(), "union covers every owner");
    // `union` is not read again, so sort and dedup it in place.
    let mut sorted = union;
    sorted.sort();
    sorted.dedup();
    assert_eq!(sorted.len(), owners.len(), "no duplicates across pages");
}

/// A cursor issued while warming still yields the remaining item after the
/// coverage state is promoted to ready.
#[tokio::test]
async fn pagination_unaffected_by_coverage_promotion() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:abalone");
    let repo = did("did:plc:abalone");
    let owners = [("did:plc:nel", "i1"), ("did:plc:olaren", "i2")];
    stream::iter(owners)
        .for_each(|(d, r)| {
            let h = &h;
            let subject = subject.clone();
            let repo = repo.clone();
            async move {
                let d_did = did(d);
                let rk = rkey(r);
                h.add_edge(
                    &nsid("sh.tangled.repo.issue"),
                    &subject,
                    &at(&format!(
                        "at://{}/sh.tangled.repo.issue/{}",
                        d_did.as_ref(),
                        rk.as_ref()
                    )),
                );
                h.mount(
                    &d_did,
                    &nsid("sh.tangled.repo.issue"),
                    &rk,
                    issue_body(&repo, &format!("issue-{}", rk.as_ref())),
                )
                .await;
            }
        })
        .await;

    h.warming(1, 5);
    let app = router(h.state.clone());
    let (_, page1) = json_response(
        app.clone()
            .oneshot(list_request(
                "sh.tangled.repo.listIssues",
                subject.as_ref(),
                &[("limit", "1")],
            ))
            .await
            .unwrap(),
    )
    .await;
    let cursor = page1["cursor"].as_str().unwrap().to_owned();

    h.promote_ready(2, 9);
    let (_, page2) = json_response(
        app.oneshot(list_request(
            "sh.tangled.repo.listIssues",
            subject.as_ref(),
            &[("limit", "10"), ("cursor", &cursor)],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(page2["items"].as_array().unwrap().len(), 1);
}

/// A cursor that is not a valid token is rejected as `InvalidRequest`.
#[tokio::test]
async fn invalid_cursor_returns_400() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let resp = app
        .oneshot(list_request(
            "sh.tangled.repo.listIssues",
            "at://did:plc:abalone",
            &[("cursor", "not-a-number")],
        ))
        .await
        .unwrap();
    let (status, body) = json_response(resp).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
    assert_eq!(body["error"], "InvalidRequest");
}

/// `listFollows` is keyed by the followee; the hydrated record's `subject`
/// is the followee DID.
#[tokio::test]
async fn list_follows_subject_is_followee_did() {
    let h = Harness::new().await;
    let followee = did("did:plc:bailey");
    let subject = at(&format!("at://{}", followee.as_ref()));
    h.add_edge(
        &nsid("sh.tangled.graph.follow"),
        &subject,
        &at("at://did:plc:nel/sh.tangled.graph.follow/f1"),
    );
    h.mount(
        &did("did:plc:nel"),
        &nsid("sh.tangled.graph.follow"),
        &rkey("f1"),
        follow_body(&followee),
    )
    .await;

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.graph.listFollows",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    let items = body["items"].as_array().unwrap();
    assert_eq!(items.len(), 1);
    assert_eq!(items[0]["value"]["subject"], followee.as_ref());
}

/// An upstream 503 for one record drops only that item from the page.
#[tokio::test]
async fn upstream_failure_during_hydration_drops_only_that_item() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:squid");
    let kind = nsid("sh.tangled.repo.issue");
    let repo = did("did:plc:squid");
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:nel/sh.tangled.repo.issue/ok"),
    );
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:teq/sh.tangled.repo.issue/flaky"),
    );
    h.mount(
        &did("did:plc:nel"),
        &kind,
        &rkey("ok"),
        issue_body(&repo, "kelp survey"),
    )
    .await;
    Mock::given(method("GET"))
        .and(path("/xrpc/com.atproto.repo.getRecord"))
        .and(query_param("repo", "did:plc:teq"))
        .and(query_param("collection", "sh.tangled.repo.issue"))
        .and(query_param("rkey", "flaky"))
        .respond_with(ResponseTemplate::new(503))
        .mount(&h.server)
        .await;

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.repo.listIssues",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    let items = body["items"].as_array().expect("items array");
    assert_eq!(items.len(), 1, "flaky item dropped, healthy sibling kept");
    assert_eq!(
        items[0]["uri"].as_str().unwrap(),
        "at://did:plc:nel/sh.tangled.repo.issue/ok",
    );
}

/// After a 503 during listing, the count still includes the failing edge.
#[tokio::test]
async fn transient_failure_keeps_edge_so_count_stays_whole() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:squid");
    let kind = nsid("sh.tangled.repo.issue");
    let repo = did("did:plc:squid");
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:nel/sh.tangled.repo.issue/ok"),
    );
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:teq/sh.tangled.repo.issue/flaky"),
    );
    h.mount(
        &did("did:plc:nel"),
        &kind,
        &rkey("ok"),
        issue_body(&repo, "kelp survey"),
    )
    .await;
    Mock::given(method("GET"))
        .and(path("/xrpc/com.atproto.repo.getRecord"))
        .and(query_param("repo", "did:plc:teq"))
        .and(query_param("collection", "sh.tangled.repo.issue"))
        .and(query_param("rkey", "flaky"))
        .respond_with(ResponseTemplate::new(503))
        .mount(&h.server)
        .await;

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.clone()
            .oneshot(list_request(
                "sh.tangled.repo.listIssues",
                subject.as_ref(),
                &[],
            ))
            .await
            .unwrap(),
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(body["items"].as_array().unwrap().len(), 1);

    let (cstatus, cbody) = json_response(
        app.oneshot(list_request(
            "sh.tangled.repo.countIssues",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(cstatus, StatusCode::OK);
    assert_eq!(
        cbody["count"],
        json!(2),
        "a transient 503 must not evict the edge, count stays whole",
    );
}

/// After a 404 during listing, the count no longer includes the dead edge.
#[tokio::test]
async fn gone_item_is_evicted_so_count_converges_to_list() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:squid");
    let kind = nsid("sh.tangled.repo.issue");
    let repo = did("did:plc:squid");
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:nel/sh.tangled.repo.issue/ok"),
    );
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:teq/sh.tangled.repo.issue/gone"),
    );
    h.mount(
        &did("did:plc:nel"),
        &kind,
        &rkey("ok"),
        issue_body(&repo, "kelp survey"),
    )
    .await;
    Mock::given(method("GET"))
        .and(path("/xrpc/com.atproto.repo.getRecord"))
        .and(query_param("repo", "did:plc:teq"))
        .and(query_param("collection", "sh.tangled.repo.issue"))
        .and(query_param("rkey", "gone"))
        .respond_with(ResponseTemplate::new(404))
        .mount(&h.server)
        .await;

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.clone()
            .oneshot(list_request(
                "sh.tangled.repo.listIssues",
                subject.as_ref(),
                &[],
            ))
            .await
            .unwrap(),
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(
        body["items"].as_array().unwrap().len(),
        1,
        "gone item dropped from the page"
    );

    let (cstatus, cbody) = json_response(
        app.oneshot(list_request(
            "sh.tangled.repo.countIssues",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(cstatus, StatusCode::OK);
    assert_eq!(
        cbody["count"],
        json!(1),
        "a definitive 404 must evict the dead edge so count matches the list",
    );
}

/// Every list/count endpoint rejects an at-uri whose authority is a handle.
#[tokio::test]
async fn handle_authority_subject_is_400() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let cases = [
        "sh.tangled.feed.listStars",
        "sh.tangled.feed.countStars",
        "sh.tangled.graph.listFollows",
        "sh.tangled.graph.countFollows",
        "sh.tangled.repo.listIssues",
        "sh.tangled.repo.countIssues",
        "sh.tangled.repo.listPulls",
        "sh.tangled.repo.countPulls",
        "sh.tangled.feed.listComments",
        "sh.tangled.feed.countComments",
    ];
    stream::iter(cases)
        .for_each(|endpoint| {
            let app = app.clone();
            async move {
                let resp = app
                    .oneshot(list_request(endpoint, "at://oyster.cafe", &[]))
                    .await
                    .unwrap();
                let (status, body) = json_response(resp).await;
                assert_eq!(status, StatusCode::BAD_REQUEST, "{endpoint}");
                assert_eq!(body["error"], "InvalidRequest", "{endpoint}");
                assert!(
                    body["message"]
                        .as_str()
                        .unwrap_or_default()
                        .contains("did, not a handle"),
                    "{endpoint}: {}",
                    body["message"]
                );
            }
        })
        .await;
}

/// An empty `subject` parameter is rejected as `InvalidRequest`.
#[tokio::test]
async fn empty_subject_is_400() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let resp = app
        .oneshot(list_request("sh.tangled.repo.listIssues", "", &[]))
        .await
        .unwrap();
    let (status, body) = json_response(resp).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
    assert_eq!(body["error"], "InvalidRequest");
}

/// `limit=0` and `limit=1001` are both rejected as `InvalidRequest`.
#[tokio::test]
async fn limit_below_min_or_above_max_is_400() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let cases = [("0", "below"), ("1001", "above")];
    stream::iter(cases)
        .for_each(|(limit, label)| {
            let app = app.clone();
            async move {
                let resp = app
                    .oneshot(list_request(
                        "sh.tangled.repo.listIssues",
                        "at://did:plc:abalone",
                        &[("limit", limit)],
                    ))
                    .await
                    .unwrap();
                let (status, body) = json_response(resp).await;
                assert_eq!(status, StatusCode::BAD_REQUEST, "limit {label}");
                assert_eq!(body["error"], "InvalidRequest", "limit {label}");
            }
        })
        .await;
}

/// Removing the only source edge brings both counters back to zero.
#[tokio::test]
async fn count_after_remove_source_returns_zero() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:abalone");
    let source = at("at://did:plc:nel/sh.tangled.feed.star/s1");
    h.add_edge(&nsid("sh.tangled.feed.star"), &subject, &source);
    h.edges.remove_source(&source);

    let app = router(h.state.clone());
    let (_, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.feed.countStars",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(body["count"], json!(0));
    assert_eq!(body["distinctAuthors"], json!(0));
}

/// A comment edge on an issue URI is listed with its hydrated record value.
#[tokio::test]
async fn list_feed_comments_hydrates_end_to_end() {
    let h = Harness::new().await;
    let issue_uri = at("at://did:plc:abalone/sh.tangled.repo.issue/i1");
    let nel = did("did:plc:nel");
    let rk = rkey("c1");
    h.add_edge(
        &nsid("sh.tangled.feed.comment"),
        &issue_uri,
        &at(&format!(
            "at://{}/sh.tangled.feed.comment/{}",
            nel.as_ref(),
            rk.as_ref()
        )),
    );
    h.mount(
        &nel,
        &nsid("sh.tangled.feed.comment"),
        &rk,
        json!({
            "$type": "sh.tangled.feed.comment",
            "subject": { "uri": issue_uri.as_ref(), "cid": "bafkqaaa" },
            "body": { "$type": "sh.tangled.markup.markdown", "text": "thoughts" },
            "createdAt": "2026-05-01T00:00:00Z"
        }),
    )
    .await;

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.feed.listComments",
            issue_uri.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    let items = body["items"].as_array().unwrap();
    assert_eq!(items.len(), 1);
    assert_eq!(items[0]["value"]["body"]["text"], json!("thoughts"));
    assert_eq!(
        items[0]["value"]["subject"]["uri"],
        json!(issue_uri.as_ref())
    );
}

/// Each listed item carries a `cid` key holding the upstream record's CID.
#[tokio::test]
async fn list_item_cid_is_present() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:abalone");
    let nel = did("did:plc:nel");
    h.add_edge(
        &nsid("sh.tangled.feed.star"),
        &subject,
        &at(&format!("at://{}/sh.tangled.feed.star/s1", nel.as_ref())),
    );
    h.mount(
        &nel,
        &nsid("sh.tangled.feed.star"),
        &rkey("s1"),
        star_body(&did("did:plc:abalone")),
    )
    .await;

    let app = router(h.state.clone());
    let (_, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.feed.listStars",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    let item = &body["items"][0];
    assert!(
        item.as_object().unwrap().contains_key("cid"),
        "list items must mirror getRecord output shape and include cid"
    );
    assert_eq!(item["cid"], json!(CID));
}

/// `countComments` keyed by an issue URI counts both comment edges.
#[tokio::test]
async fn count_feed_comments_subjects_on_issue_uri() {
    let h = Harness::new().await;
    let issue_uri = at("at://did:plc:abalone/sh.tangled.repo.issue/i1");
    h.add_edge(
        &nsid("sh.tangled.feed.comment"),
        &issue_uri,
        &at("at://did:plc:nel/sh.tangled.feed.comment/c1"),
    );
    h.add_edge(
        &nsid("sh.tangled.feed.comment"),
        &issue_uri,
        &at("at://did:plc:olaren/sh.tangled.feed.comment/c2"),
    );

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.feed.countComments",
            issue_uri.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(body["count"], json!(2));
    assert_eq!(body["distinctAuthors"], json!(2));
}

/// An upstream `RecordNotFound` for one item drops that item; the list
/// request itself still succeeds.
#[tokio::test]
async fn list_item_404_dropped_not_404_for_subject() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:squid");
    let kind = nsid("sh.tangled.repo.issue");
    let repo = did("did:plc:squid");
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:nel/sh.tangled.repo.issue/live"),
    );
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:teq/sh.tangled.repo.issue/missing"),
    );
    h.mount(
        &did("did:plc:nel"),
        &kind,
        &rkey("live"),
        issue_body(&repo, "kelp survives"),
    )
    .await;
    Mock::given(method("GET"))
        .and(path("/xrpc/com.atproto.repo.getRecord"))
        .and(query_param("repo", "did:plc:teq"))
        .and(query_param("collection", "sh.tangled.repo.issue"))
        .and(query_param("rkey", "missing"))
        .respond_with(ResponseTemplate::new(404).set_body_json(json!({
            "error": "RecordNotFound",
            "message": "could not find record"
        })))
        .mount(&h.server)
        .await;

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.repo.listIssues",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(
        status,
        StatusCode::OK,
        "a stale-index 404 drops that item, it must not 404 or 502 the subject's list",
    );
    let items = body["items"].as_array().expect("items array");
    assert_eq!(items.len(), 1, "stale 404 item dropped, live sibling kept");
    assert_eq!(
        items[0]["uri"].as_str().unwrap(),
        "at://did:plc:nel/sh.tangled.repo.issue/live",
    );
}

/// A record whose `$type` does not match the listed collection is dropped.
#[tokio::test]
async fn list_item_with_wrong_type_tag_dropped() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:squid");
    let kind = nsid("sh.tangled.feed.star");
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:nel/sh.tangled.feed.star/good"),
    );
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:teq/sh.tangled.feed.star/wrong"),
    );
    h.mount(
        &did("did:plc:nel"),
        &kind,
        &rkey("good"),
        star_body(&did("did:plc:squid")),
    )
    .await;
    Mock::given(method("GET"))
        .and(path("/xrpc/com.atproto.repo.getRecord"))
        .and(query_param("repo", "did:plc:teq"))
        .and(query_param("collection", "sh.tangled.feed.star"))
        .and(query_param("rkey", "wrong"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({
            "uri": "at://did:plc:teq/sh.tangled.feed.star/wrong",
            "cid": CID,
            "value": {
                "$type": "sh.tangled.feed.reaction",
                "createdAt": "2026-05-01T00:00:00Z",
                "subject": "at://did:plc:squid"
            }
        })))
        .mount(&h.server)
        .await;

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.feed.listStars",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    let items = body["items"].as_array().expect("items array");
    assert_eq!(items.len(), 1, "wrong-type item dropped, valid star kept");
    assert_eq!(
        items[0]["uri"].as_str().unwrap(),
        "at://did:plc:nel/sh.tangled.feed.star/good",
    );
}

/// An issue edge whose source URI names a different collection is dropped
/// without failing the request.
#[tokio::test]
async fn list_item_with_mismatched_collection_dropped() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:squid");
    let kind = nsid("sh.tangled.repo.issue");
    let repo = did("did:plc:squid");
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:nel/sh.tangled.repo.issue/live"),
    );
    h.add_edge(
        &kind,
        &subject,
        &at("at://did:plc:teq/sh.tangled.feed.star/whelk"),
    );
    h.mount(
        &did("did:plc:nel"),
        &kind,
        &rkey("live"),
        issue_body(&repo, "kelp survives"),
    )
    .await;

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.repo.listIssues",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(
        status,
        StatusCode::OK,
        "a mismatched-collection index edge must not 400 the subject's list",
    );
    let items = body["items"].as_array().expect("items array");
    assert_eq!(
        items.len(),
        1,
        "mismatched-collection edge dropped, live sibling kept"
    );
    assert_eq!(
        items[0]["uri"].as_str().unwrap(),
        "at://did:plc:nel/sh.tangled.repo.issue/live",
    );
}

/// The follow endpoints reject a record-level at-uri with a "bare did" message.
#[tokio::test]
async fn bare_did_endpoints_reject_at_uri_subject() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let cases = [
        "sh.tangled.graph.listFollows",
        "sh.tangled.graph.countFollows",
    ];
    stream::iter(cases)
        .for_each(|endpoint| {
            let app = app.clone();
            async move {
                let resp = app
                    .oneshot(list_request(
                        endpoint,
                        "at://did:plc:abalone/sh.tangled.repo/r1",
                        &[],
                    ))
                    .await
                    .unwrap();
                let (status, body) = json_response(resp).await;
                assert_eq!(status, StatusCode::BAD_REQUEST, "{endpoint}");
                assert_eq!(body["error"], "InvalidRequest", "{endpoint}");
                assert!(
                    body["message"]
                        .as_str()
                        .unwrap_or_default()
                        .contains("bare did"),
                    "{endpoint}: {}",
                    body["message"],
                );
            }
        })
        .await;
}

/// The repo-scoped endpoints reject a record-level repo at-uri with a
/// "bare did" message.
#[tokio::test]
async fn repo_pointing_endpoints_reject_at_uri_subject() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let cases = [
        "sh.tangled.repo.listIssues",
        "sh.tangled.repo.countIssues",
        "sh.tangled.repo.listPulls",
        "sh.tangled.repo.countPulls",
        "sh.tangled.repo.listArtifacts",
        "sh.tangled.repo.countArtifacts",
    ];
    stream::iter(cases)
        .for_each(|endpoint| {
            let app = app.clone();
            async move {
                let resp = app
                    .oneshot(list_request(
                        endpoint,
                        "at://did:plc:abalone/sh.tangled.repo/r1",
                        &[],
                    ))
                    .await
                    .unwrap();
                let (status, body) = json_response(resp).await;
                assert_eq!(
                    status,
                    StatusCode::BAD_REQUEST,
                    "{endpoint} must reject rkey-form subjects since rkeys are unstable; clients must send the repoDID",
                );
                assert!(
                    body["message"]
                        .as_str()
                        .unwrap_or_default()
                        .contains("bare did"),
                    "{endpoint}: {}",
                    body["message"],
                );
            }
        })
        .await;
}

/// The repo-scoped endpoints answer 200 for a bare DID subject.
#[tokio::test]
async fn repo_pointing_endpoints_accept_bare_did() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let cases = [
        "sh.tangled.repo.listIssues",
        "sh.tangled.repo.countIssues",
        "sh.tangled.repo.listPulls",
        "sh.tangled.repo.countPulls",
        "sh.tangled.repo.listArtifacts",
        "sh.tangled.repo.countArtifacts",
    ];
    stream::iter(cases)
        .for_each(|endpoint| {
            let app = app.clone();
            async move {
                let resp = app
                    .oneshot(list_request(endpoint, "did:plc:abalone", &[]))
                    .await
                    .unwrap();
                let (status, _body) = json_response(resp).await;
                assert_eq!(status, StatusCode::OK, "{endpoint} must accept bare did");
            }
        })
        .await;
}

/// The comment endpoints reject an authority-only URI and a repo-record URI;
/// the error message names the three accepted collections.
#[tokio::test]
async fn feed_comment_endpoints_reject_bare_did_or_wrong_collection() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let endpoints = [
        "sh.tangled.feed.listComments",
        "sh.tangled.feed.countComments",
    ];
    let inputs = [
        "at://did:plc:abalone",
        "at://did:plc:abalone/sh.tangled.repo/r1",
    ];
    // Cartesian product: every endpoint is exercised with every input.
    let cases = endpoints
        .iter()
        .copied()
        .flat_map(|endpoint| inputs.iter().copied().map(move |input| (endpoint, input)));
    stream::iter(cases)
        .for_each(|(endpoint, input)| {
            let app = app.clone();
            async move {
                let resp = app
                    .oneshot(list_request(endpoint, input, &[]))
                    .await
                    .unwrap();
                let (status, body) = json_response(resp).await;
                assert_eq!(status, StatusCode::BAD_REQUEST, "{endpoint} input={input}");
                let msg = body["message"].as_str().unwrap_or_default();
                assert!(
                    msg.contains("sh.tangled.repo.issue")
                        && msg.contains("sh.tangled.repo.pull")
                        && msg.contains("sh.tangled.string"),
                    "{endpoint} input={input}: {msg}",
                );
            }
        })
        .await;
}

/// The star endpoints reject a `sh.tangled.knot` record URI; the message
/// mentions `sh.tangled.string`.
#[tokio::test]
async fn star_endpoints_reject_unrelated_collection() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let endpoints = ["sh.tangled.feed.listStars", "sh.tangled.feed.countStars"];
    stream::iter(endpoints)
        .for_each(|endpoint| {
            let app = app.clone();
            async move {
                let resp = app
                    .oneshot(list_request(
                        endpoint,
                        "at://did:plc:abalone/sh.tangled.knot/k1",
                        &[],
                    ))
                    .await
                    .unwrap();
                let (status, body) = json_response(resp).await;
                assert_eq!(status, StatusCode::BAD_REQUEST, "{endpoint}");
                let msg = body["message"].as_str().unwrap_or_default();
                assert!(msg.contains("sh.tangled.string"), "{endpoint}: {msg}",);
            }
        })
        .await;
}

/// `countStars` rejects a record-level repo URI as the subject.
#[tokio::test]
async fn star_endpoints_reject_repo_uri_subject() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let resp = app
        .oneshot(list_request(
            "sh.tangled.feed.countStars",
            "at://did:plc:abalone/sh.tangled.repo/r1",
            &[],
        ))
        .await
        .unwrap();
    let (status, body) = json_response(resp).await;
    assert_eq!(
        status,
        StatusCode::BAD_REQUEST,
        "rkey-form repo URI must be rejected; clients must send the repoDID directly",
    );
    let msg = body["message"].as_str().unwrap_or_default();
    assert!(msg.contains("sh.tangled.string"), "{msg}");
}

/// `countStars` accepts a `sh.tangled.string` record URI as the subject.
#[tokio::test]
async fn star_endpoints_accept_string_subject_form() {
    let h = Harness::new().await;
    let app = router(h.state.clone());
    let resp = app
        .oneshot(list_request(
            "sh.tangled.feed.countStars",
            "at://did:plc:abalone/sh.tangled.string/k1",
            &[],
        ))
        .await
        .unwrap();
    let (status, body) = json_response(resp).await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(body["count"], json!(0));
}

/// Removing the only source edge leaves `listStars` with an empty page and
/// no cursor.
#[tokio::test]
async fn list_after_remove_source_returns_empty_items() {
    let h = Harness::new().await;
    let subject = at("at://did:plc:abalone");
    let source = at("at://did:plc:nel/sh.tangled.feed.star/s1");
    h.add_edge(&nsid("sh.tangled.feed.star"), &subject, &source);
    h.edges.remove_source(&source);

    let app = router(h.state.clone());
    let (status, body) = json_response(
        app.oneshot(list_request(
            "sh.tangled.feed.listStars",
            subject.as_ref(),
            &[],
        ))
        .await
        .unwrap(),
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(body["items"], json!([]));
    assert!(body["cursor"].is_null());
}

#[tokio::test]
async fn list_pulls_hydrates_via_slingshot_when_edges_present() {
    let h = Harness::new().await;
    let target_did = did("did:plc:abalone");
    let subject = at(&format!("at://{}", target_did.as_ref()));
    let source_did = did("did:plc:nel");
    let rk = rkey("p1");
    h.add_edge(
&nsid("sh.tangled.repo.pull"), 1424 &subject, 1425 &at(&format!( 1426 "at://{}/sh.tangled.repo.pull/{}", 1427 source_did.as_ref(), 1428 rk.as_ref() 1429 )), 1430 ); 1431 h.mount( 1432 &source_did, 1433 &nsid("sh.tangled.repo.pull"), 1434 &rk, 1435 json!({ 1436 "$type": "sh.tangled.repo.pull", 1437 "title": "ship it", 1438 "createdAt": "2026-05-01T00:00:00Z", 1439 "rounds": [], 1440 "target": {"repo": target_did.as_ref(), "branch": "main"}, 1441 }), 1442 ) 1443 .await; 1444 let app = router(h.state.clone()); 1445 let (status, body) = json_response( 1446 app.oneshot(list_request( 1447 "sh.tangled.repo.listPulls", 1448 subject.as_ref(), 1449 &[], 1450 )) 1451 .await 1452 .unwrap(), 1453 ) 1454 .await; 1455 assert_eq!(status, StatusCode::OK); 1456 let items = body["items"].as_array().unwrap(); 1457 assert_eq!(items.len(), 1); 1458 assert_eq!(items[0]["value"]["title"], json!("ship it")); 1459 assert_eq!( 1460 items[0]["value"]["target"]["repo"], 1461 json!(target_did.as_ref()) 1462 ); 1463} 1464 1465#[tokio::test] 1466async fn count_pulls_returns_distinct_authors() { 1467 let h = Harness::new().await; 1468 let subject = at("at://did:plc:abalone"); 1469 h.add_edge( 1470 &nsid("sh.tangled.repo.pull"), 1471 &subject, 1472 &at("at://did:plc:nel/sh.tangled.repo.pull/p1"), 1473 ); 1474 h.add_edge( 1475 &nsid("sh.tangled.repo.pull"), 1476 &subject, 1477 &at("at://did:plc:olaren/sh.tangled.repo.pull/p2"), 1478 ); 1479 h.add_edge( 1480 &nsid("sh.tangled.repo.pull"), 1481 &subject, 1482 &at("at://did:plc:nel/sh.tangled.repo.pull/p3"), 1483 ); 1484 let app = router(h.state.clone()); 1485 let (_, body) = json_response( 1486 app.oneshot(list_request( 1487 "sh.tangled.repo.countPulls", 1488 subject.as_ref(), 1489 &[], 1490 )) 1491 .await 1492 .unwrap(), 1493 ) 1494 .await; 1495 assert_eq!(body["count"], json!(3)); 1496 assert_eq!(body["distinctAuthors"], json!(2)); 1497} 1498 1499#[tokio::test] 1500async fn extractor_to_xrpc_round_trip_for_star() { 1501 let h = Harness::new().await; 
1502 let subject_did = did("did:plc:abalone"); 1503 let source_did = did("did:plc:nel"); 1504 let rk = rkey("s1"); 1505 let source = at(&format!( 1506 "at://{}/sh.tangled.feed.star/{}", 1507 source_did.as_ref(), 1508 rk.as_ref() 1509 )); 1510 let body = star_body(&subject_did); 1511 let parsed = 1512 bobbin_types::edges::Record::from_json_value(&nsid("sh.tangled.feed.star"), body.clone()) 1513 .expect("parse star record"); 1514 parsed 1515 .extract_edges(&source) 1516 .expect("extract") 1517 .into_iter() 1518 .for_each(|e| h.edges.add(e)); 1519 h.mount(&source_did, &nsid("sh.tangled.feed.star"), &rk, body) 1520 .await; 1521 1522 let app = router(h.state.clone()); 1523 let (status, json) = json_response( 1524 app.oneshot(list_request( 1525 "sh.tangled.feed.listStars", 1526 &format!("at://{}", subject_did.as_ref()), 1527 &[], 1528 )) 1529 .await 1530 .unwrap(), 1531 ) 1532 .await; 1533 assert_eq!( 1534 status, 1535 StatusCode::OK, 1536 "extractor key must match handler subject, body was {json}", 1537 ); 1538 let items = json["items"].as_array().unwrap(); 1539 assert_eq!(items.len(), 1, "expected exactly one star edge"); 1540 assert_eq!( 1541 items[0]["value"]["subject"]["did"], 1542 json!(subject_did.as_ref()) 1543 ); 1544} 1545 1546#[tokio::test] 1547async fn list_issues_includes_state_comment_count_and_state_updated_at() { 1548 let h = Harness::new().await; 1549 let repo = did("did:plc:limpet"); 1550 let subject = at(&format!("at://{}", repo.as_ref())); 1551 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1"); 1552 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri); 1553 h.mount( 1554 &did("did:plc:nel"), 1555 &nsid("sh.tangled.repo.issue"), 1556 &rkey("i1"), 1557 issue_body(&repo, "hi"), 1558 ) 1559 .await; 1560 h.add_edge( 1561 &nsid("sh.tangled.feed.comment"), 1562 &issue_uri, 1563 &at("at://did:plc:olaren/sh.tangled.feed.comment/c1"), 1564 ); 1565 h.add_edge( 1566 &nsid("sh.tangled.feed.comment"), 1567 &issue_uri, 1568 
&at("at://did:plc:teq/sh.tangled.feed.comment/c2"), 1569 ); 1570 1571 h.state.issue_states.upsert( 1572 at("at://did:plc:nel/sh.tangled.repo.issue.state/s1"), 1573 issue_uri.clone(), 1574 1_777_593_600_000_000, 1575 IssueStateKind::Open, 1576 ); 1577 h.state.issue_states.upsert( 1578 at("at://did:plc:nel/sh.tangled.repo.issue.state/s2"), 1579 issue_uri.clone(), 1580 1_777_593_700_000_000, 1581 IssueStateKind::Closed, 1582 ); 1583 1584 let app = router(h.state.clone()); 1585 let resp = app 1586 .oneshot(list_request( 1587 "sh.tangled.repo.listIssues", 1588 subject.as_ref(), 1589 &[], 1590 )) 1591 .await 1592 .unwrap(); 1593 let (status, body) = json_response(resp).await; 1594 assert_eq!(status, StatusCode::OK); 1595 let item = &body["items"][0]; 1596 assert_eq!(item["state"], json!("closed")); 1597 assert_eq!(item["commentCount"], json!(2)); 1598 let updated = item["stateUpdatedAt"] 1599 .as_str() 1600 .expect("stateUpdatedAt must serialize as RFC3339 string"); 1601 assert!( 1602 updated.starts_with("2026-"), 1603 "expected 2026 timestamp, got {updated}" 1604 ); 1605} 1606 1607#[tokio::test] 1608async fn list_issues_defaults_to_open_when_no_state_record() { 1609 let h = Harness::new().await; 1610 let repo = did("did:plc:limpet"); 1611 let subject = at(&format!("at://{}", repo.as_ref())); 1612 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1"); 1613 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri); 1614 h.mount( 1615 &did("did:plc:nel"), 1616 &nsid("sh.tangled.repo.issue"), 1617 &rkey("i1"), 1618 issue_body(&repo, "no state yet"), 1619 ) 1620 .await; 1621 1622 let app = router(h.state.clone()); 1623 let (_status, body) = json_response( 1624 app.oneshot(list_request( 1625 "sh.tangled.repo.listIssues", 1626 subject.as_ref(), 1627 &[], 1628 )) 1629 .await 1630 .unwrap(), 1631 ) 1632 .await; 1633 let item = &body["items"][0]; 1634 assert_eq!( 1635 item["state"], 1636 json!("open"), 1637 "absent state record defaults to open" 1638 ); 1639 
assert!( 1640 item.get("stateUpdatedAt").is_none(), 1641 "stateUpdatedAt must be absent without a state record", 1642 ); 1643 assert_eq!(item["commentCount"], json!(0)); 1644} 1645 1646#[tokio::test] 1647async fn list_issues_author_filter_restricts_to_matching_did() { 1648 let h = Harness::new().await; 1649 let repo = did("did:plc:limpet"); 1650 let subject = at(&format!("at://{}", repo.as_ref())); 1651 let owners = [ 1652 ("did:plc:nel", "n1"), 1653 ("did:plc:nel", "n2"), 1654 ("did:plc:olaren", "o1"), 1655 ("did:plc:olaren", "o2"), 1656 ]; 1657 stream::iter(owners) 1658 .for_each(|(d, r)| { 1659 let h = &h; 1660 let subject = subject.clone(); 1661 let repo = repo.clone(); 1662 async move { 1663 let d_did = did(d); 1664 let rk = rkey(r); 1665 h.add_edge( 1666 &nsid("sh.tangled.repo.issue"), 1667 &subject, 1668 &at(&format!( 1669 "at://{}/sh.tangled.repo.issue/{}", 1670 d_did.as_ref(), 1671 rk.as_ref() 1672 )), 1673 ); 1674 h.mount( 1675 &d_did, 1676 &nsid("sh.tangled.repo.issue"), 1677 &rk, 1678 issue_body(&repo, &format!("issue-{}", rk.as_ref())), 1679 ) 1680 .await; 1681 } 1682 }) 1683 .await; 1684 1685 let app = router(h.state.clone()); 1686 let (status, body) = json_response( 1687 app.oneshot(list_request( 1688 "sh.tangled.repo.listIssues", 1689 subject.as_ref(), 1690 &[("author", "did:plc:nel")], 1691 )) 1692 .await 1693 .unwrap(), 1694 ) 1695 .await; 1696 assert_eq!(status, StatusCode::OK); 1697 let items = body["items"].as_array().expect("items array"); 1698 assert_eq!(items.len(), 2, "two issues authored by nel"); 1699 let all_nel = items 1700 .iter() 1701 .all(|i| i["uri"].as_str().unwrap().starts_with("at://did:plc:nel/")); 1702 assert!(all_nel, "every returned uri must be authored by nel"); 1703} 1704 1705#[tokio::test] 1706async fn list_issues_invalid_author_returns_400() { 1707 let h = Harness::new().await; 1708 let subject = "at://did:plc:limpet".to_owned(); 1709 let app = router(h.state.clone()); 1710 let resp = app 1711 .oneshot(list_request( 1712 
"sh.tangled.repo.listIssues", 1713 &subject, 1714 &[("author", "not-a-did")], 1715 )) 1716 .await 1717 .unwrap(); 1718 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 1719} 1720 1721#[tokio::test] 1722async fn list_pulls_includes_merged_state_and_comment_count() { 1723 let h = Harness::new().await; 1724 let repo = did("did:plc:limpet"); 1725 let subject = at(&format!("at://{}", repo.as_ref())); 1726 let pull_uri = at("at://did:plc:nel/sh.tangled.repo.pull/p1"); 1727 h.add_edge(&nsid("sh.tangled.repo.pull"), &subject, &pull_uri); 1728 h.mount( 1729 &did("did:plc:nel"), 1730 &nsid("sh.tangled.repo.pull"), 1731 &rkey("p1"), 1732 pull_body(&repo, "fix bug"), 1733 ) 1734 .await; 1735 h.add_edge( 1736 &nsid("sh.tangled.feed.comment"), 1737 &pull_uri, 1738 &at("at://did:plc:teq/sh.tangled.feed.comment/c1"), 1739 ); 1740 h.state.pull_statuses.upsert( 1741 at("at://did:plc:nel/sh.tangled.repo.pull.status/s1"), 1742 pull_uri.clone(), 1743 1_777_593_600_000_000, 1744 PullStatusKind::Open, 1745 ); 1746 h.state.pull_statuses.upsert( 1747 at("at://did:plc:nel/sh.tangled.repo.pull.status/s2"), 1748 pull_uri.clone(), 1749 1_777_593_800_000_000, 1750 PullStatusKind::Merged, 1751 ); 1752 1753 let app = router(h.state.clone()); 1754 let (status, body) = json_response( 1755 app.oneshot(list_request( 1756 "sh.tangled.repo.listPulls", 1757 subject.as_ref(), 1758 &[], 1759 )) 1760 .await 1761 .unwrap(), 1762 ) 1763 .await; 1764 assert_eq!(status, StatusCode::OK); 1765 let item = &body["items"][0]; 1766 assert_eq!(item["state"], json!("merged")); 1767 assert_eq!(item["commentCount"], json!(1)); 1768} 1769 1770#[tokio::test] 1771async fn list_issues_state_filter_open_includes_records_without_state() { 1772 let h = Harness::new().await; 1773 let repo = did("did:plc:limpet"); 1774 let subject = at(&format!("at://{}", repo.as_ref())); 1775 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1"); 1776 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri); 1777 h.mount( 
1778 &did("did:plc:nel"), 1779 &nsid("sh.tangled.repo.issue"), 1780 &rkey("i1"), 1781 issue_body(&repo, "fresh"), 1782 ) 1783 .await; 1784 1785 let app = router(h.state.clone()); 1786 let (status, body) = json_response( 1787 app.oneshot(list_request( 1788 "sh.tangled.repo.listIssues", 1789 subject.as_ref(), 1790 &[("state", "open")], 1791 )) 1792 .await 1793 .unwrap(), 1794 ) 1795 .await; 1796 assert_eq!(status, StatusCode::OK); 1797 let items = body["items"].as_array().expect("items array"); 1798 assert_eq!( 1799 items.len(), 1800 1, 1801 "absent state record still matches state=open" 1802 ); 1803} 1804 1805#[tokio::test] 1806async fn list_issues_state_filter_ignores_third_party_state_source() { 1807 let h = Harness::new().await; 1808 let repo = did("did:plc:limpet"); 1809 let subject = at(&format!("at://{}", repo.as_ref())); 1810 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1"); 1811 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri); 1812 h.mount( 1813 &did("did:plc:nel"), 1814 &nsid("sh.tangled.repo.issue"), 1815 &rkey("i1"), 1816 issue_body(&repo, "open issue"), 1817 ) 1818 .await; 1819 h.state.issue_states.upsert( 1820 at("at://did:plc:nautilus/sh.tangled.repo.issue.state/spoof"), 1821 issue_uri.clone(), 1822 1_777_593_800_000_000, 1823 IssueStateKind::Closed, 1824 ); 1825 1826 let app = router(h.state.clone()); 1827 let (status, body) = json_response( 1828 app.oneshot(list_request( 1829 "sh.tangled.repo.listIssues", 1830 subject.as_ref(), 1831 &[("state", "open")], 1832 )) 1833 .await 1834 .unwrap(), 1835 ) 1836 .await; 1837 assert_eq!(status, StatusCode::OK); 1838 let items = body["items"].as_array().expect("items array"); 1839 assert_eq!( 1840 items.len(), 1841 1, 1842 "third-party Closed record must not flip filter result for state=open", 1843 ); 1844 assert_eq!(items[0]["state"], json!("open")); 1845 assert!( 1846 items[0].get("stateUpdatedAt").is_none(), 1847 "third-party state source must not surface stateUpdatedAt", 1848 
); 1849} 1850 1851#[tokio::test] 1852async fn list_pulls_status_filter_ignores_third_party_status_source() { 1853 let h = Harness::new().await; 1854 let repo = did("did:plc:limpet"); 1855 let subject = at(&format!("at://{}", repo.as_ref())); 1856 let pull_uri = at("at://did:plc:nel/sh.tangled.repo.pull/p1"); 1857 h.add_edge(&nsid("sh.tangled.repo.pull"), &subject, &pull_uri); 1858 h.mount( 1859 &did("did:plc:nel"), 1860 &nsid("sh.tangled.repo.pull"), 1861 &rkey("p1"), 1862 pull_body(&repo, "wip"), 1863 ) 1864 .await; 1865 h.state.pull_statuses.upsert( 1866 at("at://did:plc:nautilus/sh.tangled.repo.pull.status/spoof"), 1867 pull_uri.clone(), 1868 1_777_593_800_000_000, 1869 PullStatusKind::Merged, 1870 ); 1871 1872 let app = router(h.state.clone()); 1873 let (status, body) = json_response( 1874 app.oneshot(list_request( 1875 "sh.tangled.repo.listPulls", 1876 subject.as_ref(), 1877 &[("status", "merged")], 1878 )) 1879 .await 1880 .unwrap(), 1881 ) 1882 .await; 1883 assert_eq!(status, StatusCode::OK); 1884 let items = body["items"].as_array().expect("items array"); 1885 assert_eq!( 1886 items.len(), 1887 0, 1888 "third-party Merged record must not satisfy status=merged" 1889 ); 1890} 1891 1892#[tokio::test] 1893async fn list_issues_state_filter_accepts_repo_owner_state_source() { 1894 let h = Harness::new().await; 1895 let repo_owner = did("did:plc:limpet"); 1896 let subject = at(&format!("at://{}", repo_owner.as_ref())); 1897 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1"); 1898 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri); 1899 h.mount( 1900 &did("did:plc:nel"), 1901 &nsid("sh.tangled.repo.issue"), 1902 &rkey("i1"), 1903 issue_body(&repo_owner, "owner closed"), 1904 ) 1905 .await; 1906 h.state.issue_states.upsert( 1907 at("at://did:plc:limpet/sh.tangled.repo.issue.state/legit"), 1908 issue_uri.clone(), 1909 1_777_593_800_000_000, 1910 IssueStateKind::Closed, 1911 ); 1912 1913 let app = router(h.state.clone()); 1914 let (status, 
body) = json_response( 1915 app.oneshot(list_request( 1916 "sh.tangled.repo.listIssues", 1917 subject.as_ref(), 1918 &[("state", "closed")], 1919 )) 1920 .await 1921 .unwrap(), 1922 ) 1923 .await; 1924 assert_eq!(status, StatusCode::OK); 1925 let items = body["items"].as_array().expect("items array"); 1926 assert_eq!( 1927 items.len(), 1928 1, 1929 "repo-owner state record must satisfy state=closed" 1930 ); 1931 assert_eq!(items[0]["state"], json!("closed")); 1932} 1933 1934#[tokio::test] 1935async fn list_issues_order_asc_returns_oldest_first() { 1936 let h = Harness::new().await; 1937 let repo = did("did:plc:limpet"); 1938 let subject = at(&format!("at://{}", repo.as_ref())); 1939 let rkeys = ["a", "b", "c"]; 1940 stream::iter(rkeys) 1941 .for_each(|r| { 1942 let h = &h; 1943 let subject = subject.clone(); 1944 let repo = repo.clone(); 1945 async move { 1946 let rk = rkey(r); 1947 let issue_uri = at(&format!( 1948 "at://did:plc:nel/sh.tangled.repo.issue/{}", 1949 rk.as_ref() 1950 )); 1951 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri); 1952 h.mount( 1953 &did("did:plc:nel"), 1954 &nsid("sh.tangled.repo.issue"), 1955 &rk, 1956 issue_body(&repo, &format!("issue-{}", rk.as_ref())), 1957 ) 1958 .await; 1959 } 1960 }) 1961 .await; 1962 1963 let app = router(h.state.clone()); 1964 let (_, asc) = json_response( 1965 app.clone() 1966 .oneshot(list_request( 1967 "sh.tangled.repo.listIssues", 1968 subject.as_ref(), 1969 &[("order", "asc")], 1970 )) 1971 .await 1972 .unwrap(), 1973 ) 1974 .await; 1975 let (_, desc) = json_response( 1976 app.oneshot(list_request( 1977 "sh.tangled.repo.listIssues", 1978 subject.as_ref(), 1979 &[("order", "desc")], 1980 )) 1981 .await 1982 .unwrap(), 1983 ) 1984 .await; 1985 let asc_uris: Vec<_> = asc["items"] 1986 .as_array() 1987 .unwrap() 1988 .iter() 1989 .map(|i| i["uri"].as_str().unwrap().to_owned()) 1990 .collect(); 1991 let desc_uris: Vec<_> = desc["items"] 1992 .as_array() 1993 .unwrap() 1994 .iter() 1995 .map(|i| 
i["uri"].as_str().unwrap().to_owned()) 1996 .collect(); 1997 let mut reversed = asc_uris.clone(); 1998 reversed.reverse(); 1999 assert_eq!(asc_uris.len(), 3); 2000 assert_eq!(desc_uris, reversed, "desc must be exact reverse of asc"); 2001} 2002 2003#[tokio::test] 2004async fn list_issues_by_state_filter_narrows_results() { 2005 let h = Harness::new().await; 2006 let author = did("did:plc:nel"); 2007 let repo = did("did:plc:limpet"); 2008 let open_uri = at("at://did:plc:nel/sh.tangled.repo.issue/open1"); 2009 let closed_uri = at("at://did:plc:nel/sh.tangled.repo.issue/closed1"); 2010 let author_subject = at(&format!("at://{}", author.as_ref())); 2011 h.edges.add(Edge { 2012 kind: nsid("sh.tangled.repo.issue.by"), 2013 subject: SubjectRef::Did(author.clone()), 2014 source: open_uri.clone(), 2015 sort_micros: next_sort_micros(), 2016 }); 2017 h.edges.add(Edge { 2018 kind: nsid("sh.tangled.repo.issue.by"), 2019 subject: SubjectRef::Did(author.clone()), 2020 source: closed_uri.clone(), 2021 sort_micros: next_sort_micros(), 2022 }); 2023 h.mount( 2024 &author, 2025 &nsid("sh.tangled.repo.issue"), 2026 &rkey("open1"), 2027 issue_body(&repo, "still open"), 2028 ) 2029 .await; 2030 h.mount( 2031 &author, 2032 &nsid("sh.tangled.repo.issue"), 2033 &rkey("closed1"), 2034 issue_body(&repo, "shut"), 2035 ) 2036 .await; 2037 h.state.issue_states.upsert( 2038 at("at://did:plc:nel/sh.tangled.repo.issue.state/s1"), 2039 closed_uri.clone(), 2040 1_777_593_800_000_000, 2041 IssueStateKind::Closed, 2042 ); 2043 2044 let app = router(h.state.clone()); 2045 let (status, body) = json_response( 2046 app.oneshot(list_request( 2047 "sh.tangled.repo.listIssuesBy", 2048 author_subject.as_ref(), 2049 &[("state", "closed")], 2050 )) 2051 .await 2052 .unwrap(), 2053 ) 2054 .await; 2055 assert_eq!(status, StatusCode::OK); 2056 let items = body["items"].as_array().expect("items array"); 2057 assert_eq!( 2058 items.len(), 2059 1, 2060 "only the closed issue survives state=closed" 2061 ); 2062 
assert_eq!(items[0]["uri"], json!(closed_uri.as_ref())); 2063} 2064 2065#[tokio::test] 2066async fn knot_owned_member_is_synthesized_without_slingshot() { 2067 let harness = Harness::new().await; 2068 let knot = bobbin_types::knot_acl::host_to_knot_did("kt.oyster.cafe").unwrap(); 2069 let subject = did("did:plc:boltless"); 2070 let created = chrono::DateTime::parse_from_rfc3339("2026-06-01T00:00:00Z").unwrap(); 2071 let micros = created.timestamp_micros() as u64; 2072 let (source, edges) = bobbin_types::knot_acl::member_upsert(&knot, &subject, micros).unwrap(); 2073 harness.edges.upsert_source(&source, edges); 2074 harness.promote_ready(1, 1); 2075 2076 let (status, body) = json_response( 2077 router(harness.state.clone()) 2078 .oneshot(list_request( 2079 "sh.tangled.knot.listMembers", 2080 subject.as_ref(), 2081 &[], 2082 )) 2083 .await 2084 .unwrap(), 2085 ) 2086 .await; 2087 2088 assert_eq!(status, StatusCode::OK); 2089 let items = body["items"].as_array().expect("items array"); 2090 assert_eq!( 2091 items.len(), 2092 1, 2093 "synthesized member must hydrate with no slingshot mock mounted" 2094 ); 2095 assert_eq!(items[0]["uri"], json!(source.as_ref())); 2096 assert!(items[0]["cid"].is_null()); 2097 assert_eq!(items[0]["value"]["domain"], json!("kt.oyster.cafe")); 2098 assert_eq!(items[0]["value"]["subject"], json!("did:plc:boltless")); 2099 let got = chrono::DateTime::parse_from_rfc3339( 2100 items[0]["value"]["createdAt"] 2101 .as_str() 2102 .expect("createdAt string"), 2103 ) 2104 .unwrap(); 2105 assert_eq!(got.timestamp_micros(), micros as i64); 2106} 2107 2108#[tokio::test] 2109async fn knot_owned_member_lists_by_knot_did() { 2110 let harness = Harness::new().await; 2111 let knot = bobbin_types::knot_acl::host_to_knot_did("kt.oyster.cafe").unwrap(); 2112 let subject = did("did:plc:boltless"); 2113 let created = chrono::DateTime::parse_from_rfc3339("2026-06-01T00:00:00Z").unwrap(); 2114 let micros = created.timestamp_micros() as u64; 2115 let (source, edges) 
= bobbin_types::knot_acl::member_upsert(&knot, &subject, micros).unwrap(); 2116 harness.edges.upsert_source(&source, edges); 2117 harness.promote_ready(1, 1); 2118 2119 let (status, body) = json_response( 2120 router(harness.state.clone()) 2121 .oneshot(list_request( 2122 "sh.tangled.knot.listMembersBy", 2123 knot.as_ref(), 2124 &[], 2125 )) 2126 .await 2127 .unwrap(), 2128 ) 2129 .await; 2130 2131 assert_eq!(status, StatusCode::OK); 2132 let items = body["items"].as_array().expect("items array"); 2133 assert_eq!(items.len(), 1); 2134 assert_eq!(items[0]["uri"], json!(source.as_ref())); 2135 assert!(items[0]["cid"].is_null()); 2136 assert_eq!(items[0]["value"]["domain"], json!("kt.oyster.cafe")); 2137 assert_eq!(items[0]["value"]["subject"], json!("did:plc:boltless")); 2138} 2139 2140#[tokio::test] 2141async fn knot_owned_collaborator_is_synthesized_without_slingshot() { 2142 let harness = Harness::new().await; 2143 let repo = did("did:plc:scallop"); 2144 let subject = did("did:plc:olaren"); 2145 let created = chrono::DateTime::parse_from_rfc3339("2026-06-03T12:00:00Z").unwrap(); 2146 let micros = created.timestamp_micros() as u64; 2147 let (source, edges) = 2148 bobbin_types::knot_acl::collaborator_upsert(&repo, &subject, micros).unwrap(); 2149 harness.edges.upsert_source(&source, edges); 2150 harness.promote_ready(1, 1); 2151 2152 let (status, body) = json_response( 2153 router(harness.state.clone()) 2154 .oneshot(list_request( 2155 "sh.tangled.repo.listCollaborators", 2156 repo.as_ref(), 2157 &[], 2158 )) 2159 .await 2160 .unwrap(), 2161 ) 2162 .await; 2163 2164 assert_eq!(status, StatusCode::OK); 2165 let items = body["items"].as_array().expect("items array"); 2166 assert_eq!(items.len(), 1); 2167 assert_eq!(items[0]["uri"], json!(source.as_ref())); 2168 assert!(items[0]["cid"].is_null()); 2169 assert_eq!(items[0]["value"]["repo"], json!("did:plc:scallop")); 2170 assert_eq!(items[0]["value"]["subject"], json!("did:plc:olaren")); 2171} 2172 2173#[tokio::test] 
2174async fn knot_owned_collaborator_lists_by_subject_did() { 2175 let harness = Harness::new().await; 2176 let repo = did("did:plc:scallop"); 2177 let subject = did("did:plc:olaren"); 2178 let created = chrono::DateTime::parse_from_rfc3339("2026-06-03T12:00:00Z").unwrap(); 2179 let micros = created.timestamp_micros() as u64; 2180 let (source, edges) = 2181 bobbin_types::knot_acl::collaborator_upsert(&repo, &subject, micros).unwrap(); 2182 harness.edges.upsert_source(&source, edges); 2183 harness.promote_ready(1, 1); 2184 2185 let (status, body) = json_response( 2186 router(harness.state.clone()) 2187 .oneshot(list_request( 2188 "sh.tangled.repo.listCollaboratorsBy", 2189 subject.as_ref(), 2190 &[], 2191 )) 2192 .await 2193 .unwrap(), 2194 ) 2195 .await; 2196 2197 assert_eq!(status, StatusCode::OK); 2198 let items = body["items"].as_array().expect("items array"); 2199 assert_eq!(items.len(), 1); 2200 assert_eq!(items[0]["uri"], json!(source.as_ref())); 2201 assert!(items[0]["cid"].is_null()); 2202 assert_eq!(items[0]["value"]["repo"], json!("did:plc:scallop")); 2203 assert_eq!(items[0]["value"]["subject"], json!("did:plc:olaren")); 2204}