Monorepo for Tangled
tangled.org
1use std::sync::Arc;
2
3use axum::body::{Body, to_bytes};
4use bobbin_edge_index::{
5 Coverage, CoverageWatch, EdgeStore, HydrantCursor, IssueStateKind, PageToken, PullStatusKind,
6 StateIndex,
7};
8use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig};
9use bobbin_record_lru::{CacheCapacity, LruRecordStore};
10use bobbin_resolver::RepoIdResolver;
11use bobbin_runtime::{RuntimeHasher, SystemClock};
12use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader};
13use bobbin_slingshot_client::SlingshotClient;
14use bobbin_types::edges::Edge;
15use bobbin_types::ids::SubjectRef;
16use bobbin_xrpc::{AppState, router};
17use futures::stream::{self, StreamExt};
18use http::{Request, StatusCode};
19use jacquard_common::DefaultStr;
20use jacquard_common::types::did::Did;
21use jacquard_common::types::nsid::Nsid;
22use jacquard_common::types::recordkey::Rkey;
23use jacquard_common::types::string::AtUri;
24use serde_json::{Value, json};
25use tower::ServiceExt;
26use url::Url;
27use url::form_urlencoded::byte_serialize;
28use wiremock::matchers::{method, path, query_param};
29use wiremock::{Mock, MockServer, ResponseTemplate};
30
31const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i";
32
33fn at(s: &str) -> AtUri<DefaultStr> {
34 AtUri::new_owned(s).unwrap()
35}
36
37fn did(s: &str) -> Did<DefaultStr> {
38 Did::new_owned(s).unwrap()
39}
40
41fn rkey(s: &str) -> Rkey<DefaultStr> {
42 Rkey::new_owned(s).unwrap()
43}
44
45fn nsid(s: &'static str) -> Nsid<DefaultStr> {
46 Nsid::new_static(s).unwrap()
47}
48
49fn subj(s: &str) -> SubjectRef {
50 Did::<DefaultStr>::new_owned(s)
51 .map(SubjectRef::Did)
52 .unwrap_or_else(|_| SubjectRef::Uri(AtUri::new_owned(s).unwrap()))
53}
54
55struct Harness {
56 server: MockServer,
57 edges: Arc<EdgeStore>,
58 coverage: Arc<CoverageWatch>,
59 state: AppState,
60}
61
62static EDGE_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
63
64fn next_sort_micros() -> u64 {
65 EDGE_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
66}
67
68impl Harness {
69 async fn new() -> Self {
70 let server = MockServer::start().await;
71 let edges = Arc::new(EdgeStore::new(RuntimeHasher::default()));
72 let issue_states = Arc::new(StateIndex::new(RuntimeHasher::default()));
73 let pull_statuses = Arc::new(StateIndex::new(RuntimeHasher::default()));
74 let coverage = Arc::new(CoverageWatch::new());
75 let state = AppState::new(
76 Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))),
77 SlingshotClient::with_default_http(Url::parse(&server.uri()).unwrap()).unwrap(),
78 edges.clone(),
79 issue_states.clone(),
80 pull_statuses.clone(),
81 coverage.clone(),
82 Arc::new(
83 KnotProxy::new(
84 KnotProxyConfig::default(),
85 KnotHttpConfig::default(),
86 Arc::new(SystemClock::new()),
87 RuntimeHasher::default(),
88 )
89 .unwrap(),
90 ),
91 Arc::new(
92 SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap(),
93 ) as Arc<dyn SearchReader>,
94 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())),
95 );
96 Self {
97 server,
98 edges,
99 coverage,
100 state,
101 }
102 }
103
104 fn add_edge(
105 &self,
106 kind: &Nsid<DefaultStr>,
107 subject: &AtUri<DefaultStr>,
108 source: &AtUri<DefaultStr>,
109 ) {
110 self.edges.add(Edge {
111 kind: kind.clone(),
112 subject: subj(subject.as_ref()),
113 source: source.clone(),
114 sort_micros: next_sort_micros(),
115 });
116 }
117
118 async fn mount(
119 &self,
120 did: &Did<DefaultStr>,
121 collection: &Nsid<DefaultStr>,
122 rkey: &Rkey<DefaultStr>,
123 value: Value,
124 ) {
125 let uri = format!(
126 "at://{}/{}/{}",
127 did.as_ref(),
128 collection.as_ref(),
129 rkey.as_ref()
130 );
131 let body = json!({ "uri": uri, "cid": CID, "value": value });
132 Mock::given(method("GET"))
133 .and(path("/xrpc/com.atproto.repo.getRecord"))
134 .and(query_param("repo", did.as_ref()))
135 .and(query_param("collection", collection.as_ref()))
136 .and(query_param("rkey", rkey.as_ref()))
137 .respond_with(ResponseTemplate::new(200).set_body_json(body))
138 .mount(&self.server)
139 .await;
140 }
141
142 fn promote_ready(&self, events: u64, cursor: u64) {
143 self.coverage.update(|_| Coverage::Ready {
144 events_processed: events,
145 last_cursor: HydrantCursor::new(cursor),
146 });
147 }
148
149 fn warming(&self, events: u64, cursor: u64) {
150 self.coverage.update(|_| Coverage::Warming {
151 events_processed: events,
152 last_cursor: HydrantCursor::new(cursor),
153 });
154 }
155}
156
157fn list_request(endpoint: &str, subject: &str, extras: &[(&str, &str)]) -> Request<Body> {
158 let mut qs = format!("subject={subject}");
159 extras.iter().for_each(|(k, v)| {
160 qs.push('&');
161 qs.push_str(k);
162 qs.push('=');
163 qs.push_str(&encode(v));
164 });
165 Request::builder()
166 .uri(format!("/xrpc/{endpoint}?{qs}"))
167 .body(Body::empty())
168 .unwrap()
169}
170
171fn encode(s: &str) -> String {
172 byte_serialize(s.as_bytes()).collect()
173}
174
175async fn json_response(resp: axum::response::Response) -> (StatusCode, Value) {
176 let status = resp.status();
177 let bytes = to_bytes(resp.into_body(), 1 << 20).await.unwrap();
178 let parsed: Value = serde_json::from_slice(&bytes).expect("JSON body");
179 (status, parsed)
180}
181
182fn issue_body(repo_did: &Did<DefaultStr>, title: &str) -> Value {
183 json!({
184 "$type": "sh.tangled.repo.issue",
185 "repo": repo_did.as_ref(),
186 "title": title,
187 "createdAt": "2026-05-01T00:00:00Z"
188 })
189}
190
191fn pull_body(repo_did: &Did<DefaultStr>, title: &str) -> Value {
192 json!({
193 "$type": "sh.tangled.repo.pull",
194 "title": title,
195 "createdAt": "2026-05-01T00:00:00Z",
196 "rounds": [],
197 "target": {
198 "repo": repo_did.as_ref(),
199 "branch": "main"
200 }
201 })
202}
203
204fn star_body(subject_did: &Did<DefaultStr>) -> Value {
205 json!({
206 "$type": "sh.tangled.feed.star",
207 "createdAt": "2026-05-01T00:00:00Z",
208 "subject": {
209 "$type": "sh.tangled.feed.star#repo",
210 "did": subject_did.as_ref()
211 }
212 })
213}
214
215fn follow_body(subject_did: &Did<DefaultStr>) -> Value {
216 json!({
217 "$type": "sh.tangled.graph.follow",
218 "createdAt": "2026-05-01T00:00:00Z",
219 "subject": subject_did.as_ref()
220 })
221}
222
223#[tokio::test]
224async fn list_issues_with_no_edges_returns_empty_items() {
225 let h = Harness::new().await;
226 let app = router(h.state.clone());
227 let resp = app
228 .oneshot(list_request(
229 "sh.tangled.repo.listIssues",
230 "at://did:plc:abalone",
231 &[],
232 ))
233 .await
234 .unwrap();
235 let (status, body) = json_response(resp).await;
236 assert_eq!(status, StatusCode::OK);
237 assert_eq!(body["items"], json!([]));
238 assert!(body["cursor"].is_null());
239}
240
241#[tokio::test]
242async fn count_issues_with_no_edges_returns_zero() {
243 let h = Harness::new().await;
244 let app = router(h.state.clone());
245 let resp = app
246 .oneshot(list_request(
247 "sh.tangled.repo.countIssues",
248 "at://did:plc:abalone",
249 &[],
250 ))
251 .await
252 .unwrap();
253 let (status, body) = json_response(resp).await;
254 assert_eq!(status, StatusCode::OK);
255 assert_eq!(body["count"], json!(0));
256 assert_eq!(body["distinctAuthors"], json!(0));
257}
258
259#[tokio::test]
260async fn list_issues_hydrates_via_slingshot_when_edges_present() {
261 let h = Harness::new().await;
262 let repo = did("did:plc:abalone");
263 let subject = at(&format!("at://{}", repo.as_ref()));
264 let owners = [
265 ("did:plc:nel", "i1", "first"),
266 ("did:plc:olaren", "i2", "second"),
267 ];
268 stream::iter(owners)
269 .for_each(|(d, r, title)| {
270 let h = &h;
271 let subject = subject.clone();
272 let repo = repo.clone();
273 async move {
274 let d_did = did(d);
275 let rk = rkey(r);
276 h.add_edge(
277 &nsid("sh.tangled.repo.issue"),
278 &subject,
279 &at(&format!(
280 "at://{}/sh.tangled.repo.issue/{}",
281 d_did.as_ref(),
282 rk.as_ref()
283 )),
284 );
285 h.mount(
286 &d_did,
287 &nsid("sh.tangled.repo.issue"),
288 &rk,
289 issue_body(&repo, title),
290 )
291 .await;
292 }
293 })
294 .await;
295
296 let app = router(h.state.clone());
297 let resp = app
298 .oneshot(list_request(
299 "sh.tangled.repo.listIssues",
300 subject.as_ref(),
301 &[],
302 ))
303 .await
304 .unwrap();
305 let (status, body) = json_response(resp).await;
306 assert_eq!(status, StatusCode::OK);
307 let items = body["items"].as_array().expect("items array");
308 assert_eq!(items.len(), 2);
309 let titles: Vec<&str> = items
310 .iter()
311 .map(|v| v["value"]["title"].as_str().unwrap())
312 .collect();
313 assert!(titles.contains(&"first"));
314 assert!(titles.contains(&"second"));
315 assert_eq!(items[0]["cid"], CID);
316 assert!(items[0]["uri"].as_str().unwrap().starts_with("at://"));
317}
318
319#[tokio::test]
320async fn count_distinct_authors_dedupes_per_author() {
321 let h = Harness::new().await;
322 let subject = at("at://did:plc:abalone");
323 h.add_edge(
324 &nsid("sh.tangled.feed.star"),
325 &subject,
326 &at("at://did:plc:nel/sh.tangled.feed.star/s1"),
327 );
328 h.add_edge(
329 &nsid("sh.tangled.feed.star"),
330 &subject,
331 &at("at://did:plc:nel/sh.tangled.feed.star/s2"),
332 );
333 h.add_edge(
334 &nsid("sh.tangled.feed.star"),
335 &subject,
336 &at("at://did:plc:olaren/sh.tangled.feed.star/s3"),
337 );
338
339 let app = router(h.state.clone());
340 let resp = app
341 .oneshot(list_request(
342 "sh.tangled.feed.countStars",
343 subject.as_ref(),
344 &[],
345 ))
346 .await
347 .unwrap();
348 let (_, body) = json_response(resp).await;
349 assert_eq!(body["count"], json!(3));
350 assert_eq!(body["distinctAuthors"], json!(2));
351}
352
353#[tokio::test]
354async fn list_items_stable_across_coverage_promotion() {
355 let h = Harness::new().await;
356 let subject = at("at://did:plc:abalone");
357 let nel = did("did:plc:nel");
358 h.add_edge(
359 &nsid("sh.tangled.feed.star"),
360 &subject,
361 &at(&format!("at://{}/sh.tangled.feed.star/s1", nel.as_ref())),
362 );
363 h.mount(
364 &nel,
365 &nsid("sh.tangled.feed.star"),
366 &rkey("s1"),
367 star_body(&did("did:plc:abalone")),
368 )
369 .await;
370
371 let app = router(h.state.clone());
372 h.warming(1, 5);
373 let (_, before) = json_response(
374 app.clone()
375 .oneshot(list_request(
376 "sh.tangled.feed.listStars",
377 subject.as_ref(),
378 &[],
379 ))
380 .await
381 .unwrap(),
382 )
383 .await;
384 assert_eq!(before["items"].as_array().unwrap().len(), 1);
385
386 h.promote_ready(2, 9);
387 let (_, after) = json_response(
388 app.oneshot(list_request(
389 "sh.tangled.feed.listStars",
390 subject.as_ref(),
391 &[],
392 ))
393 .await
394 .unwrap(),
395 )
396 .await;
397 assert_eq!(
398 after["items"].as_array().unwrap().len(),
399 before["items"].as_array().unwrap().len(),
400 );
401 assert_eq!(after["items"], before["items"]);
402}
403
404#[tokio::test]
405async fn list_paginates_via_cursor() {
406 let h = Harness::new().await;
407 let subject = at("at://did:plc:abalone");
408 let repo = did("did:plc:abalone");
409 let owners = [
410 ("did:plc:nel", "i1"),
411 ("did:plc:olaren", "i2"),
412 ("did:plc:teq", "i3"),
413 ("did:plc:lyna", "i4"),
414 ("did:plc:bailey", "i5"),
415 ];
416 stream::iter(owners)
417 .for_each(|(d, r)| {
418 let h = &h;
419 let subject = subject.clone();
420 let repo = repo.clone();
421 async move {
422 let d_did = did(d);
423 let rk = rkey(r);
424 h.add_edge(
425 &nsid("sh.tangled.repo.issue"),
426 &subject,
427 &at(&format!(
428 "at://{}/sh.tangled.repo.issue/{}",
429 d_did.as_ref(),
430 rk.as_ref()
431 )),
432 );
433 h.mount(
434 &d_did,
435 &nsid("sh.tangled.repo.issue"),
436 &rk,
437 issue_body(&repo, &format!("issue-{}", rk.as_ref())),
438 )
439 .await;
440 }
441 })
442 .await;
443
444 let app = router(h.state.clone());
445 let (_, page1) = json_response(
446 app.clone()
447 .oneshot(list_request(
448 "sh.tangled.repo.listIssues",
449 subject.as_ref(),
450 &[("limit", "2")],
451 ))
452 .await
453 .unwrap(),
454 )
455 .await;
456 let page1_items = page1["items"].as_array().unwrap().clone();
457 assert_eq!(page1_items.len(), 2);
458 let cursor = page1["cursor"]
459 .as_str()
460 .expect("first page must yield a cursor")
461 .to_owned();
462 assert!(
463 PageToken::decode_token(&cursor).is_ok(),
464 "cursor must be a TID-shaped token"
465 );
466
467 let (_, page2) = json_response(
468 app.oneshot(list_request(
469 "sh.tangled.repo.listIssues",
470 subject.as_ref(),
471 &[("limit", "10"), ("cursor", &cursor)],
472 ))
473 .await
474 .unwrap(),
475 )
476 .await;
477 let page2_items = page2["items"].as_array().unwrap().clone();
478 assert_eq!(page2_items.len(), 3);
479 assert!(page2["cursor"].is_null(), "tail page must not promise more");
480
481 let union: Vec<&str> = page1_items
482 .iter()
483 .chain(page2_items.iter())
484 .map(|item| item["uri"].as_str().unwrap())
485 .collect();
486 assert_eq!(union.len(), owners.len(), "union covers every owner");
487 let mut sorted = union.clone();
488 sorted.sort();
489 sorted.dedup();
490 assert_eq!(sorted.len(), owners.len(), "no duplicates across pages");
491}
492
493#[tokio::test]
494async fn pagination_unaffected_by_coverage_promotion() {
495 let h = Harness::new().await;
496 let subject = at("at://did:plc:abalone");
497 let repo = did("did:plc:abalone");
498 let owners = [("did:plc:nel", "i1"), ("did:plc:olaren", "i2")];
499 stream::iter(owners)
500 .for_each(|(d, r)| {
501 let h = &h;
502 let subject = subject.clone();
503 let repo = repo.clone();
504 async move {
505 let d_did = did(d);
506 let rk = rkey(r);
507 h.add_edge(
508 &nsid("sh.tangled.repo.issue"),
509 &subject,
510 &at(&format!(
511 "at://{}/sh.tangled.repo.issue/{}",
512 d_did.as_ref(),
513 rk.as_ref()
514 )),
515 );
516 h.mount(
517 &d_did,
518 &nsid("sh.tangled.repo.issue"),
519 &rk,
520 issue_body(&repo, &format!("issue-{}", rk.as_ref())),
521 )
522 .await;
523 }
524 })
525 .await;
526
527 h.warming(1, 5);
528 let app = router(h.state.clone());
529 let (_, page1) = json_response(
530 app.clone()
531 .oneshot(list_request(
532 "sh.tangled.repo.listIssues",
533 subject.as_ref(),
534 &[("limit", "1")],
535 ))
536 .await
537 .unwrap(),
538 )
539 .await;
540 let cursor = page1["cursor"].as_str().unwrap().to_owned();
541
542 h.promote_ready(2, 9);
543 let (_, page2) = json_response(
544 app.oneshot(list_request(
545 "sh.tangled.repo.listIssues",
546 subject.as_ref(),
547 &[("limit", "10"), ("cursor", &cursor)],
548 ))
549 .await
550 .unwrap(),
551 )
552 .await;
553 assert_eq!(page2["items"].as_array().unwrap().len(), 1);
554}
555
556#[tokio::test]
557async fn invalid_cursor_returns_400() {
558 let h = Harness::new().await;
559 let app = router(h.state.clone());
560 let resp = app
561 .oneshot(list_request(
562 "sh.tangled.repo.listIssues",
563 "at://did:plc:abalone",
564 &[("cursor", "not-a-number")],
565 ))
566 .await
567 .unwrap();
568 let (status, body) = json_response(resp).await;
569 assert_eq!(status, StatusCode::BAD_REQUEST);
570 assert_eq!(body["error"], "InvalidRequest");
571}
572
573#[tokio::test]
574async fn list_follows_subject_is_followee_did() {
575 let h = Harness::new().await;
576 let followee = did("did:plc:bailey");
577 let subject = at(&format!("at://{}", followee.as_ref()));
578 h.add_edge(
579 &nsid("sh.tangled.graph.follow"),
580 &subject,
581 &at("at://did:plc:nel/sh.tangled.graph.follow/f1"),
582 );
583 h.mount(
584 &did("did:plc:nel"),
585 &nsid("sh.tangled.graph.follow"),
586 &rkey("f1"),
587 follow_body(&followee),
588 )
589 .await;
590
591 let app = router(h.state.clone());
592 let (status, body) = json_response(
593 app.oneshot(list_request(
594 "sh.tangled.graph.listFollows",
595 subject.as_ref(),
596 &[],
597 ))
598 .await
599 .unwrap(),
600 )
601 .await;
602 assert_eq!(status, StatusCode::OK);
603 let items = body["items"].as_array().unwrap();
604 assert_eq!(items.len(), 1);
605 assert_eq!(items[0]["value"]["subject"], followee.as_ref());
606}
607
608#[tokio::test]
609async fn upstream_failure_during_hydration_drops_only_that_item() {
610 let h = Harness::new().await;
611 let subject = at("at://did:plc:squid");
612 let kind = nsid("sh.tangled.repo.issue");
613 let repo = did("did:plc:squid");
614 h.add_edge(
615 &kind,
616 &subject,
617 &at("at://did:plc:nel/sh.tangled.repo.issue/ok"),
618 );
619 h.add_edge(
620 &kind,
621 &subject,
622 &at("at://did:plc:teq/sh.tangled.repo.issue/flaky"),
623 );
624 h.mount(
625 &did("did:plc:nel"),
626 &kind,
627 &rkey("ok"),
628 issue_body(&repo, "kelp survey"),
629 )
630 .await;
631 Mock::given(method("GET"))
632 .and(path("/xrpc/com.atproto.repo.getRecord"))
633 .and(query_param("repo", "did:plc:teq"))
634 .and(query_param("collection", "sh.tangled.repo.issue"))
635 .and(query_param("rkey", "flaky"))
636 .respond_with(ResponseTemplate::new(503))
637 .mount(&h.server)
638 .await;
639
640 let app = router(h.state.clone());
641 let (status, body) = json_response(
642 app.oneshot(list_request(
643 "sh.tangled.repo.listIssues",
644 subject.as_ref(),
645 &[],
646 ))
647 .await
648 .unwrap(),
649 )
650 .await;
651 assert_eq!(status, StatusCode::OK);
652 let items = body["items"].as_array().expect("items array");
653 assert_eq!(items.len(), 1, "flaky item dropped, healthy sibling kept");
654 assert_eq!(
655 items[0]["uri"].as_str().unwrap(),
656 "at://did:plc:nel/sh.tangled.repo.issue/ok",
657 );
658}
659
660#[tokio::test]
661async fn transient_failure_keeps_edge_so_count_stays_whole() {
662 let h = Harness::new().await;
663 let subject = at("at://did:plc:squid");
664 let kind = nsid("sh.tangled.repo.issue");
665 let repo = did("did:plc:squid");
666 h.add_edge(
667 &kind,
668 &subject,
669 &at("at://did:plc:nel/sh.tangled.repo.issue/ok"),
670 );
671 h.add_edge(
672 &kind,
673 &subject,
674 &at("at://did:plc:teq/sh.tangled.repo.issue/flaky"),
675 );
676 h.mount(
677 &did("did:plc:nel"),
678 &kind,
679 &rkey("ok"),
680 issue_body(&repo, "kelp survey"),
681 )
682 .await;
683 Mock::given(method("GET"))
684 .and(path("/xrpc/com.atproto.repo.getRecord"))
685 .and(query_param("repo", "did:plc:teq"))
686 .and(query_param("collection", "sh.tangled.repo.issue"))
687 .and(query_param("rkey", "flaky"))
688 .respond_with(ResponseTemplate::new(503))
689 .mount(&h.server)
690 .await;
691
692 let app = router(h.state.clone());
693 let (status, body) = json_response(
694 app.clone()
695 .oneshot(list_request(
696 "sh.tangled.repo.listIssues",
697 subject.as_ref(),
698 &[],
699 ))
700 .await
701 .unwrap(),
702 )
703 .await;
704 assert_eq!(status, StatusCode::OK);
705 assert_eq!(body["items"].as_array().unwrap().len(), 1);
706
707 let (cstatus, cbody) = json_response(
708 app.oneshot(list_request(
709 "sh.tangled.repo.countIssues",
710 subject.as_ref(),
711 &[],
712 ))
713 .await
714 .unwrap(),
715 )
716 .await;
717 assert_eq!(cstatus, StatusCode::OK);
718 assert_eq!(
719 cbody["count"],
720 json!(2),
721 "a transient 503 must not evict the edge, count stays whole",
722 );
723}
724
725#[tokio::test]
726async fn gone_item_is_evicted_so_count_converges_to_list() {
727 let h = Harness::new().await;
728 let subject = at("at://did:plc:squid");
729 let kind = nsid("sh.tangled.repo.issue");
730 let repo = did("did:plc:squid");
731 h.add_edge(
732 &kind,
733 &subject,
734 &at("at://did:plc:nel/sh.tangled.repo.issue/ok"),
735 );
736 h.add_edge(
737 &kind,
738 &subject,
739 &at("at://did:plc:teq/sh.tangled.repo.issue/gone"),
740 );
741 h.mount(
742 &did("did:plc:nel"),
743 &kind,
744 &rkey("ok"),
745 issue_body(&repo, "kelp survey"),
746 )
747 .await;
748 Mock::given(method("GET"))
749 .and(path("/xrpc/com.atproto.repo.getRecord"))
750 .and(query_param("repo", "did:plc:teq"))
751 .and(query_param("collection", "sh.tangled.repo.issue"))
752 .and(query_param("rkey", "gone"))
753 .respond_with(ResponseTemplate::new(404))
754 .mount(&h.server)
755 .await;
756
757 let app = router(h.state.clone());
758 let (status, body) = json_response(
759 app.clone()
760 .oneshot(list_request(
761 "sh.tangled.repo.listIssues",
762 subject.as_ref(),
763 &[],
764 ))
765 .await
766 .unwrap(),
767 )
768 .await;
769 assert_eq!(status, StatusCode::OK);
770 assert_eq!(
771 body["items"].as_array().unwrap().len(),
772 1,
773 "gone item dropped from the page"
774 );
775
776 let (cstatus, cbody) = json_response(
777 app.oneshot(list_request(
778 "sh.tangled.repo.countIssues",
779 subject.as_ref(),
780 &[],
781 ))
782 .await
783 .unwrap(),
784 )
785 .await;
786 assert_eq!(cstatus, StatusCode::OK);
787 assert_eq!(
788 cbody["count"],
789 json!(1),
790 "a definitive 404 must evict the dead edge so count matches the list",
791 );
792}
793
794#[tokio::test]
795async fn handle_authority_subject_is_400() {
796 let h = Harness::new().await;
797 let app = router(h.state.clone());
798 let cases = [
799 "sh.tangled.feed.listStars",
800 "sh.tangled.feed.countStars",
801 "sh.tangled.graph.listFollows",
802 "sh.tangled.graph.countFollows",
803 "sh.tangled.repo.listIssues",
804 "sh.tangled.repo.countIssues",
805 "sh.tangled.repo.listPulls",
806 "sh.tangled.repo.countPulls",
807 "sh.tangled.feed.listComments",
808 "sh.tangled.feed.countComments",
809 ];
810 stream::iter(cases)
811 .for_each(|endpoint| {
812 let app = app.clone();
813 async move {
814 let resp = app
815 .oneshot(list_request(endpoint, "at://oyster.cafe", &[]))
816 .await
817 .unwrap();
818 let (status, body) = json_response(resp).await;
819 assert_eq!(status, StatusCode::BAD_REQUEST, "{endpoint}");
820 assert_eq!(body["error"], "InvalidRequest", "{endpoint}");
821 assert!(
822 body["message"]
823 .as_str()
824 .unwrap_or_default()
825 .contains("did, not a handle"),
826 "{endpoint}: {}",
827 body["message"]
828 );
829 }
830 })
831 .await;
832}
833
834#[tokio::test]
835async fn empty_subject_is_400() {
836 let h = Harness::new().await;
837 let app = router(h.state.clone());
838 let resp = app
839 .oneshot(list_request("sh.tangled.repo.listIssues", "", &[]))
840 .await
841 .unwrap();
842 let (status, body) = json_response(resp).await;
843 assert_eq!(status, StatusCode::BAD_REQUEST);
844 assert_eq!(body["error"], "InvalidRequest");
845}
846
847#[tokio::test]
848async fn limit_below_min_or_above_max_is_400() {
849 let h = Harness::new().await;
850 let app = router(h.state.clone());
851 let cases = [("0", "below"), ("1001", "above")];
852 stream::iter(cases)
853 .for_each(|(limit, label)| {
854 let app = app.clone();
855 async move {
856 let resp = app
857 .oneshot(list_request(
858 "sh.tangled.repo.listIssues",
859 "at://did:plc:abalone",
860 &[("limit", limit)],
861 ))
862 .await
863 .unwrap();
864 let (status, body) = json_response(resp).await;
865 assert_eq!(status, StatusCode::BAD_REQUEST, "limit {label}");
866 assert_eq!(body["error"], "InvalidRequest", "limit {label}");
867 }
868 })
869 .await;
870}
871
872#[tokio::test]
873async fn count_after_remove_source_returns_zero() {
874 let h = Harness::new().await;
875 let subject = at("at://did:plc:abalone");
876 let source = at("at://did:plc:nel/sh.tangled.feed.star/s1");
877 h.add_edge(&nsid("sh.tangled.feed.star"), &subject, &source);
878 h.edges.remove_source(&source);
879
880 let app = router(h.state.clone());
881 let (_, body) = json_response(
882 app.oneshot(list_request(
883 "sh.tangled.feed.countStars",
884 subject.as_ref(),
885 &[],
886 ))
887 .await
888 .unwrap(),
889 )
890 .await;
891 assert_eq!(body["count"], json!(0));
892 assert_eq!(body["distinctAuthors"], json!(0));
893}
894
895#[tokio::test]
896async fn list_feed_comments_hydrates_end_to_end() {
897 let h = Harness::new().await;
898 let issue_uri = at("at://did:plc:abalone/sh.tangled.repo.issue/i1");
899 let nel = did("did:plc:nel");
900 let rk = rkey("c1");
901 h.add_edge(
902 &nsid("sh.tangled.feed.comment"),
903 &issue_uri,
904 &at(&format!(
905 "at://{}/sh.tangled.feed.comment/{}",
906 nel.as_ref(),
907 rk.as_ref()
908 )),
909 );
910 h.mount(
911 &nel,
912 &nsid("sh.tangled.feed.comment"),
913 &rk,
914 json!({
915 "$type": "sh.tangled.feed.comment",
916 "subject": { "uri": issue_uri.as_ref(), "cid": "bafkqaaa" },
917 "body": { "$type": "sh.tangled.markup.markdown", "text": "thoughts" },
918 "createdAt": "2026-05-01T00:00:00Z"
919 }),
920 )
921 .await;
922
923 let app = router(h.state.clone());
924 let (status, body) = json_response(
925 app.oneshot(list_request(
926 "sh.tangled.feed.listComments",
927 issue_uri.as_ref(),
928 &[],
929 ))
930 .await
931 .unwrap(),
932 )
933 .await;
934 assert_eq!(status, StatusCode::OK);
935 let items = body["items"].as_array().unwrap();
936 assert_eq!(items.len(), 1);
937 assert_eq!(items[0]["value"]["body"]["text"], json!("thoughts"));
938 assert_eq!(
939 items[0]["value"]["subject"]["uri"],
940 json!(issue_uri.as_ref())
941 );
942}
943
944#[tokio::test]
945async fn list_item_cid_is_present() {
946 let h = Harness::new().await;
947 let subject = at("at://did:plc:abalone");
948 let nel = did("did:plc:nel");
949 h.add_edge(
950 &nsid("sh.tangled.feed.star"),
951 &subject,
952 &at(&format!("at://{}/sh.tangled.feed.star/s1", nel.as_ref())),
953 );
954 h.mount(
955 &nel,
956 &nsid("sh.tangled.feed.star"),
957 &rkey("s1"),
958 star_body(&did("did:plc:abalone")),
959 )
960 .await;
961
962 let app = router(h.state.clone());
963 let (_, body) = json_response(
964 app.oneshot(list_request(
965 "sh.tangled.feed.listStars",
966 subject.as_ref(),
967 &[],
968 ))
969 .await
970 .unwrap(),
971 )
972 .await;
973 let item = &body["items"][0];
974 assert!(
975 item.as_object().unwrap().contains_key("cid"),
976 "list items must mirror getRecord output shape and include cid"
977 );
978 assert_eq!(item["cid"], json!(CID));
979}
980
981#[tokio::test]
982async fn count_feed_comments_subjects_on_issue_uri() {
983 let h = Harness::new().await;
984 let issue_uri = at("at://did:plc:abalone/sh.tangled.repo.issue/i1");
985 h.add_edge(
986 &nsid("sh.tangled.feed.comment"),
987 &issue_uri,
988 &at("at://did:plc:nel/sh.tangled.feed.comment/c1"),
989 );
990 h.add_edge(
991 &nsid("sh.tangled.feed.comment"),
992 &issue_uri,
993 &at("at://did:plc:olaren/sh.tangled.feed.comment/c2"),
994 );
995
996 let app = router(h.state.clone());
997 let (status, body) = json_response(
998 app.oneshot(list_request(
999 "sh.tangled.feed.countComments",
1000 issue_uri.as_ref(),
1001 &[],
1002 ))
1003 .await
1004 .unwrap(),
1005 )
1006 .await;
1007 assert_eq!(status, StatusCode::OK);
1008 assert_eq!(body["count"], json!(2));
1009 assert_eq!(body["distinctAuthors"], json!(2));
1010}
1011
1012#[tokio::test]
1013async fn list_item_404_dropped_not_404_for_subject() {
1014 let h = Harness::new().await;
1015 let subject = at("at://did:plc:squid");
1016 let kind = nsid("sh.tangled.repo.issue");
1017 let repo = did("did:plc:squid");
1018 h.add_edge(
1019 &kind,
1020 &subject,
1021 &at("at://did:plc:nel/sh.tangled.repo.issue/live"),
1022 );
1023 h.add_edge(
1024 &kind,
1025 &subject,
1026 &at("at://did:plc:teq/sh.tangled.repo.issue/missing"),
1027 );
1028 h.mount(
1029 &did("did:plc:nel"),
1030 &kind,
1031 &rkey("live"),
1032 issue_body(&repo, "kelp survives"),
1033 )
1034 .await;
1035 Mock::given(method("GET"))
1036 .and(path("/xrpc/com.atproto.repo.getRecord"))
1037 .and(query_param("repo", "did:plc:teq"))
1038 .and(query_param("collection", "sh.tangled.repo.issue"))
1039 .and(query_param("rkey", "missing"))
1040 .respond_with(ResponseTemplate::new(404).set_body_json(json!({
1041 "error": "RecordNotFound",
1042 "message": "could not find record"
1043 })))
1044 .mount(&h.server)
1045 .await;
1046
1047 let app = router(h.state.clone());
1048 let (status, body) = json_response(
1049 app.oneshot(list_request(
1050 "sh.tangled.repo.listIssues",
1051 subject.as_ref(),
1052 &[],
1053 ))
1054 .await
1055 .unwrap(),
1056 )
1057 .await;
1058 assert_eq!(
1059 status,
1060 StatusCode::OK,
1061 "a stale-index 404 drops that item, it must not 404 or 502 the subject's list",
1062 );
1063 let items = body["items"].as_array().expect("items array");
1064 assert_eq!(items.len(), 1, "stale 404 item dropped, live sibling kept");
1065 assert_eq!(
1066 items[0]["uri"].as_str().unwrap(),
1067 "at://did:plc:nel/sh.tangled.repo.issue/live",
1068 );
1069}
1070
1071#[tokio::test]
1072async fn list_item_with_wrong_type_tag_dropped() {
1073 let h = Harness::new().await;
1074 let subject = at("at://did:plc:squid");
1075 let kind = nsid("sh.tangled.feed.star");
1076 h.add_edge(
1077 &kind,
1078 &subject,
1079 &at("at://did:plc:nel/sh.tangled.feed.star/good"),
1080 );
1081 h.add_edge(
1082 &kind,
1083 &subject,
1084 &at("at://did:plc:teq/sh.tangled.feed.star/wrong"),
1085 );
1086 h.mount(
1087 &did("did:plc:nel"),
1088 &kind,
1089 &rkey("good"),
1090 star_body(&did("did:plc:squid")),
1091 )
1092 .await;
1093 Mock::given(method("GET"))
1094 .and(path("/xrpc/com.atproto.repo.getRecord"))
1095 .and(query_param("repo", "did:plc:teq"))
1096 .and(query_param("collection", "sh.tangled.feed.star"))
1097 .and(query_param("rkey", "wrong"))
1098 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1099 "uri": "at://did:plc:teq/sh.tangled.feed.star/wrong",
1100 "cid": CID,
1101 "value": {
1102 "$type": "sh.tangled.feed.reaction",
1103 "createdAt": "2026-05-01T00:00:00Z",
1104 "subject": "at://did:plc:squid"
1105 }
1106 })))
1107 .mount(&h.server)
1108 .await;
1109
1110 let app = router(h.state.clone());
1111 let (status, body) = json_response(
1112 app.oneshot(list_request(
1113 "sh.tangled.feed.listStars",
1114 subject.as_ref(),
1115 &[],
1116 ))
1117 .await
1118 .unwrap(),
1119 )
1120 .await;
1121 assert_eq!(status, StatusCode::OK);
1122 let items = body["items"].as_array().expect("items array");
1123 assert_eq!(items.len(), 1, "wrong-type item dropped, valid star kept");
1124 assert_eq!(
1125 items[0]["uri"].as_str().unwrap(),
1126 "at://did:plc:nel/sh.tangled.feed.star/good",
1127 );
1128}
1129
1130#[tokio::test]
1131async fn list_item_with_mismatched_collection_dropped() {
1132 let h = Harness::new().await;
1133 let subject = at("at://did:plc:squid");
1134 let kind = nsid("sh.tangled.repo.issue");
1135 let repo = did("did:plc:squid");
1136 h.add_edge(
1137 &kind,
1138 &subject,
1139 &at("at://did:plc:nel/sh.tangled.repo.issue/live"),
1140 );
1141 h.add_edge(
1142 &kind,
1143 &subject,
1144 &at("at://did:plc:teq/sh.tangled.feed.star/whelk"),
1145 );
1146 h.mount(
1147 &did("did:plc:nel"),
1148 &kind,
1149 &rkey("live"),
1150 issue_body(&repo, "kelp survives"),
1151 )
1152 .await;
1153
1154 let app = router(h.state.clone());
1155 let (status, body) = json_response(
1156 app.oneshot(list_request(
1157 "sh.tangled.repo.listIssues",
1158 subject.as_ref(),
1159 &[],
1160 ))
1161 .await
1162 .unwrap(),
1163 )
1164 .await;
1165 assert_eq!(
1166 status,
1167 StatusCode::OK,
1168 "a mismatched-collection index edge must not 400 the subject's list",
1169 );
1170 let items = body["items"].as_array().expect("items array");
1171 assert_eq!(
1172 items.len(),
1173 1,
1174 "mismatched-collection edge dropped, live sibling kept"
1175 );
1176 assert_eq!(
1177 items[0]["uri"].as_str().unwrap(),
1178 "at://did:plc:nel/sh.tangled.repo.issue/live",
1179 );
1180}
1181
1182#[tokio::test]
1183async fn bare_did_endpoints_reject_at_uri_subject() {
1184 let h = Harness::new().await;
1185 let app = router(h.state.clone());
1186 let cases = [
1187 "sh.tangled.graph.listFollows",
1188 "sh.tangled.graph.countFollows",
1189 ];
1190 stream::iter(cases)
1191 .for_each(|endpoint| {
1192 let app = app.clone();
1193 async move {
1194 let resp = app
1195 .oneshot(list_request(
1196 endpoint,
1197 "at://did:plc:abalone/sh.tangled.repo/r1",
1198 &[],
1199 ))
1200 .await
1201 .unwrap();
1202 let (status, body) = json_response(resp).await;
1203 assert_eq!(status, StatusCode::BAD_REQUEST, "{endpoint}");
1204 assert_eq!(body["error"], "InvalidRequest", "{endpoint}");
1205 assert!(
1206 body["message"]
1207 .as_str()
1208 .unwrap_or_default()
1209 .contains("bare did"),
1210 "{endpoint}: {}",
1211 body["message"],
1212 );
1213 }
1214 })
1215 .await;
1216}
1217
1218#[tokio::test]
1219async fn repo_pointing_endpoints_reject_at_uri_subject() {
1220 let h = Harness::new().await;
1221 let app = router(h.state.clone());
1222 let cases = [
1223 "sh.tangled.repo.listIssues",
1224 "sh.tangled.repo.countIssues",
1225 "sh.tangled.repo.listPulls",
1226 "sh.tangled.repo.countPulls",
1227 "sh.tangled.repo.listArtifacts",
1228 "sh.tangled.repo.countArtifacts",
1229 ];
1230 stream::iter(cases)
1231 .for_each(|endpoint| {
1232 let app = app.clone();
1233 async move {
1234 let resp = app
1235 .oneshot(list_request(
1236 endpoint,
1237 "at://did:plc:abalone/sh.tangled.repo/r1",
1238 &[],
1239 ))
1240 .await
1241 .unwrap();
1242 let (status, body) = json_response(resp).await;
1243 assert_eq!(
1244 status,
1245 StatusCode::BAD_REQUEST,
1246 "{endpoint} must reject rkey-form subjects since rkeys are unstable; clients must send the repoDID",
1247 );
1248 assert!(
1249 body["message"]
1250 .as_str()
1251 .unwrap_or_default()
1252 .contains("bare did"),
1253 "{endpoint}: {}",
1254 body["message"],
1255 );
1256 }
1257 })
1258 .await;
1259}
1260
1261#[tokio::test]
1262async fn repo_pointing_endpoints_accept_bare_did() {
1263 let h = Harness::new().await;
1264 let app = router(h.state.clone());
1265 let cases = [
1266 "sh.tangled.repo.listIssues",
1267 "sh.tangled.repo.countIssues",
1268 "sh.tangled.repo.listPulls",
1269 "sh.tangled.repo.countPulls",
1270 "sh.tangled.repo.listArtifacts",
1271 "sh.tangled.repo.countArtifacts",
1272 ];
1273 stream::iter(cases)
1274 .for_each(|endpoint| {
1275 let app = app.clone();
1276 async move {
1277 let resp = app
1278 .oneshot(list_request(endpoint, "did:plc:abalone", &[]))
1279 .await
1280 .unwrap();
1281 let (status, _body) = json_response(resp).await;
1282 assert_eq!(status, StatusCode::OK, "{endpoint} must accept bare did");
1283 }
1284 })
1285 .await;
1286}
1287
1288#[tokio::test]
1289async fn feed_comment_endpoints_reject_bare_did_or_wrong_collection() {
1290 let h = Harness::new().await;
1291 let app = router(h.state.clone());
1292 let endpoints = [
1293 "sh.tangled.feed.listComments",
1294 "sh.tangled.feed.countComments",
1295 ];
1296 let inputs = [
1297 "at://did:plc:abalone",
1298 "at://did:plc:abalone/sh.tangled.repo/r1",
1299 ];
1300 let cases = endpoints
1301 .iter()
1302 .copied()
1303 .flat_map(|endpoint| inputs.iter().copied().map(move |input| (endpoint, input)));
1304 stream::iter(cases)
1305 .for_each(|(endpoint, input)| {
1306 let app = app.clone();
1307 async move {
1308 let resp = app
1309 .oneshot(list_request(endpoint, input, &[]))
1310 .await
1311 .unwrap();
1312 let (status, body) = json_response(resp).await;
1313 assert_eq!(status, StatusCode::BAD_REQUEST, "{endpoint} input={input}");
1314 let msg = body["message"].as_str().unwrap_or_default();
1315 assert!(
1316 msg.contains("sh.tangled.repo.issue")
1317 && msg.contains("sh.tangled.repo.pull")
1318 && msg.contains("sh.tangled.string"),
1319 "{endpoint} input={input}: {msg}",
1320 );
1321 }
1322 })
1323 .await;
1324}
1325
1326#[tokio::test]
1327async fn star_endpoints_reject_unrelated_collection() {
1328 let h = Harness::new().await;
1329 let app = router(h.state.clone());
1330 let endpoints = ["sh.tangled.feed.listStars", "sh.tangled.feed.countStars"];
1331 stream::iter(endpoints)
1332 .for_each(|endpoint| {
1333 let app = app.clone();
1334 async move {
1335 let resp = app
1336 .oneshot(list_request(
1337 endpoint,
1338 "at://did:plc:abalone/sh.tangled.knot/k1",
1339 &[],
1340 ))
1341 .await
1342 .unwrap();
1343 let (status, body) = json_response(resp).await;
1344 assert_eq!(status, StatusCode::BAD_REQUEST, "{endpoint}");
1345 let msg = body["message"].as_str().unwrap_or_default();
1346 assert!(msg.contains("sh.tangled.string"), "{endpoint}: {msg}",);
1347 }
1348 })
1349 .await;
1350}
1351
1352#[tokio::test]
1353async fn star_endpoints_reject_repo_uri_subject() {
1354 let h = Harness::new().await;
1355 let app = router(h.state.clone());
1356 let resp = app
1357 .oneshot(list_request(
1358 "sh.tangled.feed.countStars",
1359 "at://did:plc:abalone/sh.tangled.repo/r1",
1360 &[],
1361 ))
1362 .await
1363 .unwrap();
1364 let (status, body) = json_response(resp).await;
1365 assert_eq!(
1366 status,
1367 StatusCode::BAD_REQUEST,
1368 "rkey-form repo URI must be rejected; clients must send the repoDID directly",
1369 );
1370 let msg = body["message"].as_str().unwrap_or_default();
1371 assert!(msg.contains("sh.tangled.string"), "{msg}");
1372}
1373
1374#[tokio::test]
1375async fn star_endpoints_accept_string_subject_form() {
1376 let h = Harness::new().await;
1377 let app = router(h.state.clone());
1378 let resp = app
1379 .oneshot(list_request(
1380 "sh.tangled.feed.countStars",
1381 "at://did:plc:abalone/sh.tangled.string/k1",
1382 &[],
1383 ))
1384 .await
1385 .unwrap();
1386 let (status, body) = json_response(resp).await;
1387 assert_eq!(status, StatusCode::OK);
1388 assert_eq!(body["count"], json!(0));
1389}
1390
1391#[tokio::test]
1392async fn list_after_remove_source_returns_empty_items() {
1393 let h = Harness::new().await;
1394 let subject = at("at://did:plc:abalone");
1395 let source = at("at://did:plc:nel/sh.tangled.feed.star/s1");
1396 h.add_edge(&nsid("sh.tangled.feed.star"), &subject, &source);
1397 h.edges.remove_source(&source);
1398
1399 let app = router(h.state.clone());
1400 let (status, body) = json_response(
1401 app.oneshot(list_request(
1402 "sh.tangled.feed.listStars",
1403 subject.as_ref(),
1404 &[],
1405 ))
1406 .await
1407 .unwrap(),
1408 )
1409 .await;
1410 assert_eq!(status, StatusCode::OK);
1411 assert_eq!(body["items"], json!([]));
1412 assert!(body["cursor"].is_null());
1413}
1414
1415#[tokio::test]
1416async fn list_pulls_hydrates_via_slingshot_when_edges_present() {
1417 let h = Harness::new().await;
1418 let target_did = did("did:plc:abalone");
1419 let subject = at(&format!("at://{}", target_did.as_ref()));
1420 let source_did = did("did:plc:nel");
1421 let rk = rkey("p1");
1422 h.add_edge(
1423 &nsid("sh.tangled.repo.pull"),
1424 &subject,
1425 &at(&format!(
1426 "at://{}/sh.tangled.repo.pull/{}",
1427 source_did.as_ref(),
1428 rk.as_ref()
1429 )),
1430 );
1431 h.mount(
1432 &source_did,
1433 &nsid("sh.tangled.repo.pull"),
1434 &rk,
1435 json!({
1436 "$type": "sh.tangled.repo.pull",
1437 "title": "ship it",
1438 "createdAt": "2026-05-01T00:00:00Z",
1439 "rounds": [],
1440 "target": {"repo": target_did.as_ref(), "branch": "main"},
1441 }),
1442 )
1443 .await;
1444 let app = router(h.state.clone());
1445 let (status, body) = json_response(
1446 app.oneshot(list_request(
1447 "sh.tangled.repo.listPulls",
1448 subject.as_ref(),
1449 &[],
1450 ))
1451 .await
1452 .unwrap(),
1453 )
1454 .await;
1455 assert_eq!(status, StatusCode::OK);
1456 let items = body["items"].as_array().unwrap();
1457 assert_eq!(items.len(), 1);
1458 assert_eq!(items[0]["value"]["title"], json!("ship it"));
1459 assert_eq!(
1460 items[0]["value"]["target"]["repo"],
1461 json!(target_did.as_ref())
1462 );
1463}
1464
1465#[tokio::test]
1466async fn count_pulls_returns_distinct_authors() {
1467 let h = Harness::new().await;
1468 let subject = at("at://did:plc:abalone");
1469 h.add_edge(
1470 &nsid("sh.tangled.repo.pull"),
1471 &subject,
1472 &at("at://did:plc:nel/sh.tangled.repo.pull/p1"),
1473 );
1474 h.add_edge(
1475 &nsid("sh.tangled.repo.pull"),
1476 &subject,
1477 &at("at://did:plc:olaren/sh.tangled.repo.pull/p2"),
1478 );
1479 h.add_edge(
1480 &nsid("sh.tangled.repo.pull"),
1481 &subject,
1482 &at("at://did:plc:nel/sh.tangled.repo.pull/p3"),
1483 );
1484 let app = router(h.state.clone());
1485 let (_, body) = json_response(
1486 app.oneshot(list_request(
1487 "sh.tangled.repo.countPulls",
1488 subject.as_ref(),
1489 &[],
1490 ))
1491 .await
1492 .unwrap(),
1493 )
1494 .await;
1495 assert_eq!(body["count"], json!(3));
1496 assert_eq!(body["distinctAuthors"], json!(2));
1497}
1498
1499#[tokio::test]
1500async fn extractor_to_xrpc_round_trip_for_star() {
1501 let h = Harness::new().await;
1502 let subject_did = did("did:plc:abalone");
1503 let source_did = did("did:plc:nel");
1504 let rk = rkey("s1");
1505 let source = at(&format!(
1506 "at://{}/sh.tangled.feed.star/{}",
1507 source_did.as_ref(),
1508 rk.as_ref()
1509 ));
1510 let body = star_body(&subject_did);
1511 let parsed =
1512 bobbin_types::edges::Record::from_json_value(&nsid("sh.tangled.feed.star"), body.clone())
1513 .expect("parse star record");
1514 parsed
1515 .extract_edges(&source)
1516 .expect("extract")
1517 .into_iter()
1518 .for_each(|e| h.edges.add(e));
1519 h.mount(&source_did, &nsid("sh.tangled.feed.star"), &rk, body)
1520 .await;
1521
1522 let app = router(h.state.clone());
1523 let (status, json) = json_response(
1524 app.oneshot(list_request(
1525 "sh.tangled.feed.listStars",
1526 &format!("at://{}", subject_did.as_ref()),
1527 &[],
1528 ))
1529 .await
1530 .unwrap(),
1531 )
1532 .await;
1533 assert_eq!(
1534 status,
1535 StatusCode::OK,
1536 "extractor key must match handler subject, body was {json}",
1537 );
1538 let items = json["items"].as_array().unwrap();
1539 assert_eq!(items.len(), 1, "expected exactly one star edge");
1540 assert_eq!(
1541 items[0]["value"]["subject"]["did"],
1542 json!(subject_did.as_ref())
1543 );
1544}
1545
1546#[tokio::test]
1547async fn list_issues_includes_state_comment_count_and_state_updated_at() {
1548 let h = Harness::new().await;
1549 let repo = did("did:plc:limpet");
1550 let subject = at(&format!("at://{}", repo.as_ref()));
1551 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1");
1552 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri);
1553 h.mount(
1554 &did("did:plc:nel"),
1555 &nsid("sh.tangled.repo.issue"),
1556 &rkey("i1"),
1557 issue_body(&repo, "hi"),
1558 )
1559 .await;
1560 h.add_edge(
1561 &nsid("sh.tangled.feed.comment"),
1562 &issue_uri,
1563 &at("at://did:plc:olaren/sh.tangled.feed.comment/c1"),
1564 );
1565 h.add_edge(
1566 &nsid("sh.tangled.feed.comment"),
1567 &issue_uri,
1568 &at("at://did:plc:teq/sh.tangled.feed.comment/c2"),
1569 );
1570
1571 h.state.issue_states.upsert(
1572 at("at://did:plc:nel/sh.tangled.repo.issue.state/s1"),
1573 issue_uri.clone(),
1574 1_777_593_600_000_000,
1575 IssueStateKind::Open,
1576 );
1577 h.state.issue_states.upsert(
1578 at("at://did:plc:nel/sh.tangled.repo.issue.state/s2"),
1579 issue_uri.clone(),
1580 1_777_593_700_000_000,
1581 IssueStateKind::Closed,
1582 );
1583
1584 let app = router(h.state.clone());
1585 let resp = app
1586 .oneshot(list_request(
1587 "sh.tangled.repo.listIssues",
1588 subject.as_ref(),
1589 &[],
1590 ))
1591 .await
1592 .unwrap();
1593 let (status, body) = json_response(resp).await;
1594 assert_eq!(status, StatusCode::OK);
1595 let item = &body["items"][0];
1596 assert_eq!(item["state"], json!("closed"));
1597 assert_eq!(item["commentCount"], json!(2));
1598 let updated = item["stateUpdatedAt"]
1599 .as_str()
1600 .expect("stateUpdatedAt must serialize as RFC3339 string");
1601 assert!(
1602 updated.starts_with("2026-"),
1603 "expected 2026 timestamp, got {updated}"
1604 );
1605}
1606
1607#[tokio::test]
1608async fn list_issues_defaults_to_open_when_no_state_record() {
1609 let h = Harness::new().await;
1610 let repo = did("did:plc:limpet");
1611 let subject = at(&format!("at://{}", repo.as_ref()));
1612 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1");
1613 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri);
1614 h.mount(
1615 &did("did:plc:nel"),
1616 &nsid("sh.tangled.repo.issue"),
1617 &rkey("i1"),
1618 issue_body(&repo, "no state yet"),
1619 )
1620 .await;
1621
1622 let app = router(h.state.clone());
1623 let (_status, body) = json_response(
1624 app.oneshot(list_request(
1625 "sh.tangled.repo.listIssues",
1626 subject.as_ref(),
1627 &[],
1628 ))
1629 .await
1630 .unwrap(),
1631 )
1632 .await;
1633 let item = &body["items"][0];
1634 assert_eq!(
1635 item["state"],
1636 json!("open"),
1637 "absent state record defaults to open"
1638 );
1639 assert!(
1640 item.get("stateUpdatedAt").is_none(),
1641 "stateUpdatedAt must be absent without a state record",
1642 );
1643 assert_eq!(item["commentCount"], json!(0));
1644}
1645
1646#[tokio::test]
1647async fn list_issues_author_filter_restricts_to_matching_did() {
1648 let h = Harness::new().await;
1649 let repo = did("did:plc:limpet");
1650 let subject = at(&format!("at://{}", repo.as_ref()));
1651 let owners = [
1652 ("did:plc:nel", "n1"),
1653 ("did:plc:nel", "n2"),
1654 ("did:plc:olaren", "o1"),
1655 ("did:plc:olaren", "o2"),
1656 ];
1657 stream::iter(owners)
1658 .for_each(|(d, r)| {
1659 let h = &h;
1660 let subject = subject.clone();
1661 let repo = repo.clone();
1662 async move {
1663 let d_did = did(d);
1664 let rk = rkey(r);
1665 h.add_edge(
1666 &nsid("sh.tangled.repo.issue"),
1667 &subject,
1668 &at(&format!(
1669 "at://{}/sh.tangled.repo.issue/{}",
1670 d_did.as_ref(),
1671 rk.as_ref()
1672 )),
1673 );
1674 h.mount(
1675 &d_did,
1676 &nsid("sh.tangled.repo.issue"),
1677 &rk,
1678 issue_body(&repo, &format!("issue-{}", rk.as_ref())),
1679 )
1680 .await;
1681 }
1682 })
1683 .await;
1684
1685 let app = router(h.state.clone());
1686 let (status, body) = json_response(
1687 app.oneshot(list_request(
1688 "sh.tangled.repo.listIssues",
1689 subject.as_ref(),
1690 &[("author", "did:plc:nel")],
1691 ))
1692 .await
1693 .unwrap(),
1694 )
1695 .await;
1696 assert_eq!(status, StatusCode::OK);
1697 let items = body["items"].as_array().expect("items array");
1698 assert_eq!(items.len(), 2, "two issues authored by nel");
1699 let all_nel = items
1700 .iter()
1701 .all(|i| i["uri"].as_str().unwrap().starts_with("at://did:plc:nel/"));
1702 assert!(all_nel, "every returned uri must be authored by nel");
1703}
1704
1705#[tokio::test]
1706async fn list_issues_invalid_author_returns_400() {
1707 let h = Harness::new().await;
1708 let subject = "at://did:plc:limpet".to_owned();
1709 let app = router(h.state.clone());
1710 let resp = app
1711 .oneshot(list_request(
1712 "sh.tangled.repo.listIssues",
1713 &subject,
1714 &[("author", "not-a-did")],
1715 ))
1716 .await
1717 .unwrap();
1718 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1719}
1720
1721#[tokio::test]
1722async fn list_pulls_includes_merged_state_and_comment_count() {
1723 let h = Harness::new().await;
1724 let repo = did("did:plc:limpet");
1725 let subject = at(&format!("at://{}", repo.as_ref()));
1726 let pull_uri = at("at://did:plc:nel/sh.tangled.repo.pull/p1");
1727 h.add_edge(&nsid("sh.tangled.repo.pull"), &subject, &pull_uri);
1728 h.mount(
1729 &did("did:plc:nel"),
1730 &nsid("sh.tangled.repo.pull"),
1731 &rkey("p1"),
1732 pull_body(&repo, "fix bug"),
1733 )
1734 .await;
1735 h.add_edge(
1736 &nsid("sh.tangled.feed.comment"),
1737 &pull_uri,
1738 &at("at://did:plc:teq/sh.tangled.feed.comment/c1"),
1739 );
1740 h.state.pull_statuses.upsert(
1741 at("at://did:plc:nel/sh.tangled.repo.pull.status/s1"),
1742 pull_uri.clone(),
1743 1_777_593_600_000_000,
1744 PullStatusKind::Open,
1745 );
1746 h.state.pull_statuses.upsert(
1747 at("at://did:plc:nel/sh.tangled.repo.pull.status/s2"),
1748 pull_uri.clone(),
1749 1_777_593_800_000_000,
1750 PullStatusKind::Merged,
1751 );
1752
1753 let app = router(h.state.clone());
1754 let (status, body) = json_response(
1755 app.oneshot(list_request(
1756 "sh.tangled.repo.listPulls",
1757 subject.as_ref(),
1758 &[],
1759 ))
1760 .await
1761 .unwrap(),
1762 )
1763 .await;
1764 assert_eq!(status, StatusCode::OK);
1765 let item = &body["items"][0];
1766 assert_eq!(item["state"], json!("merged"));
1767 assert_eq!(item["commentCount"], json!(1));
1768}
1769
1770#[tokio::test]
1771async fn list_issues_state_filter_open_includes_records_without_state() {
1772 let h = Harness::new().await;
1773 let repo = did("did:plc:limpet");
1774 let subject = at(&format!("at://{}", repo.as_ref()));
1775 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1");
1776 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri);
1777 h.mount(
1778 &did("did:plc:nel"),
1779 &nsid("sh.tangled.repo.issue"),
1780 &rkey("i1"),
1781 issue_body(&repo, "fresh"),
1782 )
1783 .await;
1784
1785 let app = router(h.state.clone());
1786 let (status, body) = json_response(
1787 app.oneshot(list_request(
1788 "sh.tangled.repo.listIssues",
1789 subject.as_ref(),
1790 &[("state", "open")],
1791 ))
1792 .await
1793 .unwrap(),
1794 )
1795 .await;
1796 assert_eq!(status, StatusCode::OK);
1797 let items = body["items"].as_array().expect("items array");
1798 assert_eq!(
1799 items.len(),
1800 1,
1801 "absent state record still matches state=open"
1802 );
1803}
1804
1805#[tokio::test]
1806async fn list_issues_state_filter_ignores_third_party_state_source() {
1807 let h = Harness::new().await;
1808 let repo = did("did:plc:limpet");
1809 let subject = at(&format!("at://{}", repo.as_ref()));
1810 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1");
1811 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri);
1812 h.mount(
1813 &did("did:plc:nel"),
1814 &nsid("sh.tangled.repo.issue"),
1815 &rkey("i1"),
1816 issue_body(&repo, "open issue"),
1817 )
1818 .await;
1819 h.state.issue_states.upsert(
1820 at("at://did:plc:nautilus/sh.tangled.repo.issue.state/spoof"),
1821 issue_uri.clone(),
1822 1_777_593_800_000_000,
1823 IssueStateKind::Closed,
1824 );
1825
1826 let app = router(h.state.clone());
1827 let (status, body) = json_response(
1828 app.oneshot(list_request(
1829 "sh.tangled.repo.listIssues",
1830 subject.as_ref(),
1831 &[("state", "open")],
1832 ))
1833 .await
1834 .unwrap(),
1835 )
1836 .await;
1837 assert_eq!(status, StatusCode::OK);
1838 let items = body["items"].as_array().expect("items array");
1839 assert_eq!(
1840 items.len(),
1841 1,
1842 "third-party Closed record must not flip filter result for state=open",
1843 );
1844 assert_eq!(items[0]["state"], json!("open"));
1845 assert!(
1846 items[0].get("stateUpdatedAt").is_none(),
1847 "third-party state source must not surface stateUpdatedAt",
1848 );
1849}
1850
1851#[tokio::test]
1852async fn list_pulls_status_filter_ignores_third_party_status_source() {
1853 let h = Harness::new().await;
1854 let repo = did("did:plc:limpet");
1855 let subject = at(&format!("at://{}", repo.as_ref()));
1856 let pull_uri = at("at://did:plc:nel/sh.tangled.repo.pull/p1");
1857 h.add_edge(&nsid("sh.tangled.repo.pull"), &subject, &pull_uri);
1858 h.mount(
1859 &did("did:plc:nel"),
1860 &nsid("sh.tangled.repo.pull"),
1861 &rkey("p1"),
1862 pull_body(&repo, "wip"),
1863 )
1864 .await;
1865 h.state.pull_statuses.upsert(
1866 at("at://did:plc:nautilus/sh.tangled.repo.pull.status/spoof"),
1867 pull_uri.clone(),
1868 1_777_593_800_000_000,
1869 PullStatusKind::Merged,
1870 );
1871
1872 let app = router(h.state.clone());
1873 let (status, body) = json_response(
1874 app.oneshot(list_request(
1875 "sh.tangled.repo.listPulls",
1876 subject.as_ref(),
1877 &[("status", "merged")],
1878 ))
1879 .await
1880 .unwrap(),
1881 )
1882 .await;
1883 assert_eq!(status, StatusCode::OK);
1884 let items = body["items"].as_array().expect("items array");
1885 assert_eq!(
1886 items.len(),
1887 0,
1888 "third-party Merged record must not satisfy status=merged"
1889 );
1890}
1891
1892#[tokio::test]
1893async fn list_issues_state_filter_accepts_repo_owner_state_source() {
1894 let h = Harness::new().await;
1895 let repo_owner = did("did:plc:limpet");
1896 let subject = at(&format!("at://{}", repo_owner.as_ref()));
1897 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1");
1898 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri);
1899 h.mount(
1900 &did("did:plc:nel"),
1901 &nsid("sh.tangled.repo.issue"),
1902 &rkey("i1"),
1903 issue_body(&repo_owner, "owner closed"),
1904 )
1905 .await;
1906 h.state.issue_states.upsert(
1907 at("at://did:plc:limpet/sh.tangled.repo.issue.state/legit"),
1908 issue_uri.clone(),
1909 1_777_593_800_000_000,
1910 IssueStateKind::Closed,
1911 );
1912
1913 let app = router(h.state.clone());
1914 let (status, body) = json_response(
1915 app.oneshot(list_request(
1916 "sh.tangled.repo.listIssues",
1917 subject.as_ref(),
1918 &[("state", "closed")],
1919 ))
1920 .await
1921 .unwrap(),
1922 )
1923 .await;
1924 assert_eq!(status, StatusCode::OK);
1925 let items = body["items"].as_array().expect("items array");
1926 assert_eq!(
1927 items.len(),
1928 1,
1929 "repo-owner state record must satisfy state=closed"
1930 );
1931 assert_eq!(items[0]["state"], json!("closed"));
1932}
1933
1934#[tokio::test]
1935async fn list_issues_order_asc_returns_oldest_first() {
1936 let h = Harness::new().await;
1937 let repo = did("did:plc:limpet");
1938 let subject = at(&format!("at://{}", repo.as_ref()));
1939 let rkeys = ["a", "b", "c"];
1940 stream::iter(rkeys)
1941 .for_each(|r| {
1942 let h = &h;
1943 let subject = subject.clone();
1944 let repo = repo.clone();
1945 async move {
1946 let rk = rkey(r);
1947 let issue_uri = at(&format!(
1948 "at://did:plc:nel/sh.tangled.repo.issue/{}",
1949 rk.as_ref()
1950 ));
1951 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri);
1952 h.mount(
1953 &did("did:plc:nel"),
1954 &nsid("sh.tangled.repo.issue"),
1955 &rk,
1956 issue_body(&repo, &format!("issue-{}", rk.as_ref())),
1957 )
1958 .await;
1959 }
1960 })
1961 .await;
1962
1963 let app = router(h.state.clone());
1964 let (_, asc) = json_response(
1965 app.clone()
1966 .oneshot(list_request(
1967 "sh.tangled.repo.listIssues",
1968 subject.as_ref(),
1969 &[("order", "asc")],
1970 ))
1971 .await
1972 .unwrap(),
1973 )
1974 .await;
1975 let (_, desc) = json_response(
1976 app.oneshot(list_request(
1977 "sh.tangled.repo.listIssues",
1978 subject.as_ref(),
1979 &[("order", "desc")],
1980 ))
1981 .await
1982 .unwrap(),
1983 )
1984 .await;
1985 let asc_uris: Vec<_> = asc["items"]
1986 .as_array()
1987 .unwrap()
1988 .iter()
1989 .map(|i| i["uri"].as_str().unwrap().to_owned())
1990 .collect();
1991 let desc_uris: Vec<_> = desc["items"]
1992 .as_array()
1993 .unwrap()
1994 .iter()
1995 .map(|i| i["uri"].as_str().unwrap().to_owned())
1996 .collect();
1997 let mut reversed = asc_uris.clone();
1998 reversed.reverse();
1999 assert_eq!(asc_uris.len(), 3);
2000 assert_eq!(desc_uris, reversed, "desc must be exact reverse of asc");
2001}
2002
2003#[tokio::test]
2004async fn list_issues_by_state_filter_narrows_results() {
2005 let h = Harness::new().await;
2006 let author = did("did:plc:nel");
2007 let repo = did("did:plc:limpet");
2008 let open_uri = at("at://did:plc:nel/sh.tangled.repo.issue/open1");
2009 let closed_uri = at("at://did:plc:nel/sh.tangled.repo.issue/closed1");
2010 let author_subject = at(&format!("at://{}", author.as_ref()));
2011 h.edges.add(Edge {
2012 kind: nsid("sh.tangled.repo.issue.by"),
2013 subject: SubjectRef::Did(author.clone()),
2014 source: open_uri.clone(),
2015 sort_micros: next_sort_micros(),
2016 });
2017 h.edges.add(Edge {
2018 kind: nsid("sh.tangled.repo.issue.by"),
2019 subject: SubjectRef::Did(author.clone()),
2020 source: closed_uri.clone(),
2021 sort_micros: next_sort_micros(),
2022 });
2023 h.mount(
2024 &author,
2025 &nsid("sh.tangled.repo.issue"),
2026 &rkey("open1"),
2027 issue_body(&repo, "still open"),
2028 )
2029 .await;
2030 h.mount(
2031 &author,
2032 &nsid("sh.tangled.repo.issue"),
2033 &rkey("closed1"),
2034 issue_body(&repo, "shut"),
2035 )
2036 .await;
2037 h.state.issue_states.upsert(
2038 at("at://did:plc:nel/sh.tangled.repo.issue.state/s1"),
2039 closed_uri.clone(),
2040 1_777_593_800_000_000,
2041 IssueStateKind::Closed,
2042 );
2043
2044 let app = router(h.state.clone());
2045 let (status, body) = json_response(
2046 app.oneshot(list_request(
2047 "sh.tangled.repo.listIssuesBy",
2048 author_subject.as_ref(),
2049 &[("state", "closed")],
2050 ))
2051 .await
2052 .unwrap(),
2053 )
2054 .await;
2055 assert_eq!(status, StatusCode::OK);
2056 let items = body["items"].as_array().expect("items array");
2057 assert_eq!(
2058 items.len(),
2059 1,
2060 "only the closed issue survives state=closed"
2061 );
2062 assert_eq!(items[0]["uri"], json!(closed_uri.as_ref()));
2063}
2064
2065#[tokio::test]
2066async fn knot_owned_member_is_synthesized_without_slingshot() {
2067 let harness = Harness::new().await;
2068 let knot = bobbin_types::knot_acl::host_to_knot_did("kt.oyster.cafe").unwrap();
2069 let subject = did("did:plc:boltless");
2070 let created = chrono::DateTime::parse_from_rfc3339("2026-06-01T00:00:00Z").unwrap();
2071 let micros = created.timestamp_micros() as u64;
2072 let (source, edges) = bobbin_types::knot_acl::member_upsert(&knot, &subject, micros).unwrap();
2073 harness.edges.upsert_source(&source, edges);
2074 harness.promote_ready(1, 1);
2075
2076 let (status, body) = json_response(
2077 router(harness.state.clone())
2078 .oneshot(list_request(
2079 "sh.tangled.knot.listMembers",
2080 subject.as_ref(),
2081 &[],
2082 ))
2083 .await
2084 .unwrap(),
2085 )
2086 .await;
2087
2088 assert_eq!(status, StatusCode::OK);
2089 let items = body["items"].as_array().expect("items array");
2090 assert_eq!(
2091 items.len(),
2092 1,
2093 "synthesized member must hydrate with no slingshot mock mounted"
2094 );
2095 assert_eq!(items[0]["uri"], json!(source.as_ref()));
2096 assert!(items[0]["cid"].is_null());
2097 assert_eq!(items[0]["value"]["domain"], json!("kt.oyster.cafe"));
2098 assert_eq!(items[0]["value"]["subject"], json!("did:plc:boltless"));
2099 let got = chrono::DateTime::parse_from_rfc3339(
2100 items[0]["value"]["createdAt"]
2101 .as_str()
2102 .expect("createdAt string"),
2103 )
2104 .unwrap();
2105 assert_eq!(got.timestamp_micros(), micros as i64);
2106}
2107
2108#[tokio::test]
2109async fn knot_owned_member_lists_by_knot_did() {
2110 let harness = Harness::new().await;
2111 let knot = bobbin_types::knot_acl::host_to_knot_did("kt.oyster.cafe").unwrap();
2112 let subject = did("did:plc:boltless");
2113 let created = chrono::DateTime::parse_from_rfc3339("2026-06-01T00:00:00Z").unwrap();
2114 let micros = created.timestamp_micros() as u64;
2115 let (source, edges) = bobbin_types::knot_acl::member_upsert(&knot, &subject, micros).unwrap();
2116 harness.edges.upsert_source(&source, edges);
2117 harness.promote_ready(1, 1);
2118
2119 let (status, body) = json_response(
2120 router(harness.state.clone())
2121 .oneshot(list_request(
2122 "sh.tangled.knot.listMembersBy",
2123 knot.as_ref(),
2124 &[],
2125 ))
2126 .await
2127 .unwrap(),
2128 )
2129 .await;
2130
2131 assert_eq!(status, StatusCode::OK);
2132 let items = body["items"].as_array().expect("items array");
2133 assert_eq!(items.len(), 1);
2134 assert_eq!(items[0]["uri"], json!(source.as_ref()));
2135 assert!(items[0]["cid"].is_null());
2136 assert_eq!(items[0]["value"]["domain"], json!("kt.oyster.cafe"));
2137 assert_eq!(items[0]["value"]["subject"], json!("did:plc:boltless"));
2138}
2139
2140#[tokio::test]
2141async fn knot_owned_collaborator_is_synthesized_without_slingshot() {
2142 let harness = Harness::new().await;
2143 let repo = did("did:plc:scallop");
2144 let subject = did("did:plc:olaren");
2145 let created = chrono::DateTime::parse_from_rfc3339("2026-06-03T12:00:00Z").unwrap();
2146 let micros = created.timestamp_micros() as u64;
2147 let (source, edges) =
2148 bobbin_types::knot_acl::collaborator_upsert(&repo, &subject, micros).unwrap();
2149 harness.edges.upsert_source(&source, edges);
2150 harness.promote_ready(1, 1);
2151
2152 let (status, body) = json_response(
2153 router(harness.state.clone())
2154 .oneshot(list_request(
2155 "sh.tangled.repo.listCollaborators",
2156 repo.as_ref(),
2157 &[],
2158 ))
2159 .await
2160 .unwrap(),
2161 )
2162 .await;
2163
2164 assert_eq!(status, StatusCode::OK);
2165 let items = body["items"].as_array().expect("items array");
2166 assert_eq!(items.len(), 1);
2167 assert_eq!(items[0]["uri"], json!(source.as_ref()));
2168 assert!(items[0]["cid"].is_null());
2169 assert_eq!(items[0]["value"]["repo"], json!("did:plc:scallop"));
2170 assert_eq!(items[0]["value"]["subject"], json!("did:plc:olaren"));
2171}
2172
2173#[tokio::test]
2174async fn knot_owned_collaborator_lists_by_subject_did() {
2175 let harness = Harness::new().await;
2176 let repo = did("did:plc:scallop");
2177 let subject = did("did:plc:olaren");
2178 let created = chrono::DateTime::parse_from_rfc3339("2026-06-03T12:00:00Z").unwrap();
2179 let micros = created.timestamp_micros() as u64;
2180 let (source, edges) =
2181 bobbin_types::knot_acl::collaborator_upsert(&repo, &subject, micros).unwrap();
2182 harness.edges.upsert_source(&source, edges);
2183 harness.promote_ready(1, 1);
2184
2185 let (status, body) = json_response(
2186 router(harness.state.clone())
2187 .oneshot(list_request(
2188 "sh.tangled.repo.listCollaboratorsBy",
2189 subject.as_ref(),
2190 &[],
2191 ))
2192 .await
2193 .unwrap(),
2194 )
2195 .await;
2196
2197 assert_eq!(status, StatusCode::OK);
2198 let items = body["items"].as_array().expect("items array");
2199 assert_eq!(items.len(), 1);
2200 assert_eq!(items[0]["uri"], json!(source.as_ref()));
2201 assert!(items[0]["cid"].is_null());
2202 assert_eq!(items[0]["value"]["repo"], json!("did:plc:scallop"));
2203 assert_eq!(items[0]["value"]["subject"], json!("did:plc:olaren"));
2204}