This repository has no description
1use std::sync::Arc;
2
3use axum::body::{Body, to_bytes};
4use bobbin_edge_index::{
5 Coverage, CoverageWatch, EdgeStore, HydrantCursor, IssueStateKind, PageToken, PullStatusKind,
6 StateIndex,
7};
8use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig};
9use bobbin_record_lru::{CacheCapacity, LruRecordStore};
10use bobbin_resolver::RepoIdResolver;
11use bobbin_runtime::{RuntimeHasher, SystemClock};
12use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader};
13use bobbin_slingshot_client::SlingshotClient;
14use bobbin_types::edges::Edge;
15use bobbin_types::ids::SubjectRef;
16use bobbin_xrpc::{AppState, router};
17use futures::stream::{self, StreamExt};
18use http::{Request, StatusCode};
19use jacquard_common::DefaultStr;
20use jacquard_common::types::did::Did;
21use jacquard_common::types::nsid::Nsid;
22use jacquard_common::types::recordkey::Rkey;
23use jacquard_common::types::string::AtUri;
24use serde_json::{Value, json};
25use tower::ServiceExt;
26use url::Url;
27use url::form_urlencoded::byte_serialize;
28use wiremock::matchers::{method, path, query_param};
29use wiremock::{Mock, MockServer, ResponseTemplate};
30
31const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i";
32
33fn at(s: &str) -> AtUri<DefaultStr> {
34 AtUri::new_owned(s).unwrap()
35}
36
37fn did(s: &str) -> Did<DefaultStr> {
38 Did::new_owned(s).unwrap()
39}
40
41fn rkey(s: &str) -> Rkey<DefaultStr> {
42 Rkey::new_owned(s).unwrap()
43}
44
45fn nsid(s: &'static str) -> Nsid<DefaultStr> {
46 Nsid::new_static(s).unwrap()
47}
48
49fn subj(s: &str) -> SubjectRef {
50 Did::<DefaultStr>::new_owned(s)
51 .map(SubjectRef::Did)
52 .unwrap_or_else(|_| SubjectRef::Uri(AtUri::new_owned(s).unwrap()))
53}
54
55struct Harness {
56 server: MockServer,
57 edges: Arc<EdgeStore>,
58 coverage: Arc<CoverageWatch>,
59 state: AppState,
60}
61
62static EDGE_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
63
64fn next_sort_micros() -> u64 {
65 EDGE_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
66}
67
68impl Harness {
69 async fn new() -> Self {
70 let server = MockServer::start().await;
71 let edges = Arc::new(EdgeStore::new(RuntimeHasher::default()));
72 let issue_states = Arc::new(StateIndex::new(RuntimeHasher::default()));
73 let pull_statuses = Arc::new(StateIndex::new(RuntimeHasher::default()));
74 let coverage = Arc::new(CoverageWatch::new());
75 let state = AppState::new(
76 Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))),
77 SlingshotClient::with_default_http(Url::parse(&server.uri()).unwrap()).unwrap(),
78 edges.clone(),
79 issue_states.clone(),
80 pull_statuses.clone(),
81 coverage.clone(),
82 Arc::new(
83 KnotProxy::new(
84 KnotProxyConfig::default(),
85 KnotHttpConfig::default(),
86 Arc::new(SystemClock::new()),
87 RuntimeHasher::default(),
88 )
89 .unwrap(),
90 ),
91 Arc::new(
92 SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap(),
93 ) as Arc<dyn SearchReader>,
94 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())),
95 );
96 Self {
97 server,
98 edges,
99 coverage,
100 state,
101 }
102 }
103
104 fn add_edge(
105 &self,
106 kind: &Nsid<DefaultStr>,
107 subject: &AtUri<DefaultStr>,
108 source: &AtUri<DefaultStr>,
109 ) {
110 self.edges.add(Edge {
111 kind: kind.clone(),
112 subject: subj(subject.as_ref()),
113 source: source.clone(),
114 sort_micros: next_sort_micros(),
115 });
116 }
117
118 async fn mount(
119 &self,
120 did: &Did<DefaultStr>,
121 collection: &Nsid<DefaultStr>,
122 rkey: &Rkey<DefaultStr>,
123 value: Value,
124 ) {
125 let uri = format!(
126 "at://{}/{}/{}",
127 did.as_ref(),
128 collection.as_ref(),
129 rkey.as_ref()
130 );
131 let body = json!({ "uri": uri, "cid": CID, "value": value });
132 Mock::given(method("GET"))
133 .and(path("/xrpc/com.atproto.repo.getRecord"))
134 .and(query_param("repo", did.as_ref()))
135 .and(query_param("collection", collection.as_ref()))
136 .and(query_param("rkey", rkey.as_ref()))
137 .respond_with(ResponseTemplate::new(200).set_body_json(body))
138 .mount(&self.server)
139 .await;
140 }
141
142 fn promote_ready(&self, events: u64, cursor: u64) {
143 self.coverage.update(|_| Coverage::Ready {
144 events_processed: events,
145 last_cursor: HydrantCursor::new(cursor),
146 });
147 }
148
149 fn warming(&self, events: u64, cursor: u64) {
150 self.coverage.update(|_| Coverage::Warming {
151 events_processed: events,
152 last_cursor: HydrantCursor::new(cursor),
153 });
154 }
155}
156
157fn list_request(endpoint: &str, subject: &str, extras: &[(&str, &str)]) -> Request<Body> {
158 let mut qs = format!("subject={}", encode(subject));
159 extras.iter().for_each(|(k, v)| {
160 qs.push('&');
161 qs.push_str(k);
162 qs.push('=');
163 qs.push_str(&encode(v));
164 });
165 Request::builder()
166 .uri(format!("/xrpc/{endpoint}?{qs}"))
167 .body(Body::empty())
168 .unwrap()
169}
170
171fn encode(s: &str) -> String {
172 byte_serialize(s.as_bytes()).collect()
173}
174
175async fn json_response(resp: axum::response::Response) -> (StatusCode, Value) {
176 let status = resp.status();
177 let bytes = to_bytes(resp.into_body(), 1 << 20).await.unwrap();
178 let parsed: Value = serde_json::from_slice(&bytes).expect("JSON body");
179 (status, parsed)
180}
181
182fn issue_body(repo_did: &Did<DefaultStr>, title: &str) -> Value {
183 json!({
184 "$type": "sh.tangled.repo.issue",
185 "repo": repo_did.as_ref(),
186 "title": title,
187 "createdAt": "2026-05-01T00:00:00Z"
188 })
189}
190
191fn pull_body(repo_did: &Did<DefaultStr>, title: &str) -> Value {
192 json!({
193 "$type": "sh.tangled.repo.pull",
194 "title": title,
195 "createdAt": "2026-05-01T00:00:00Z",
196 "rounds": [],
197 "target": {
198 "repo": repo_did.as_ref(),
199 "branch": "main"
200 }
201 })
202}
203
204fn star_body(subject_did: &Did<DefaultStr>) -> Value {
205 json!({
206 "$type": "sh.tangled.feed.star",
207 "createdAt": "2026-05-01T00:00:00Z",
208 "subject": {
209 "$type": "sh.tangled.feed.star#repo",
210 "did": subject_did.as_ref()
211 }
212 })
213}
214
215fn follow_body(subject_did: &Did<DefaultStr>) -> Value {
216 json!({
217 "$type": "sh.tangled.graph.follow",
218 "createdAt": "2026-05-01T00:00:00Z",
219 "subject": subject_did.as_ref()
220 })
221}
222
223#[tokio::test]
224async fn list_issues_with_no_edges_returns_empty_items() {
225 let h = Harness::new().await;
226 let app = router(h.state.clone());
227 let resp = app
228 .oneshot(list_request(
229 "sh.tangled.repo.listIssues",
230 "at://did:plc:abalone",
231 &[],
232 ))
233 .await
234 .unwrap();
235 let (status, body) = json_response(resp).await;
236 assert_eq!(status, StatusCode::OK);
237 assert_eq!(body["items"], json!([]));
238 assert!(body["cursor"].is_null());
239}
240
241#[tokio::test]
242async fn count_issues_with_no_edges_returns_zero() {
243 let h = Harness::new().await;
244 let app = router(h.state.clone());
245 let resp = app
246 .oneshot(list_request(
247 "sh.tangled.repo.countIssues",
248 "at://did:plc:abalone",
249 &[],
250 ))
251 .await
252 .unwrap();
253 let (status, body) = json_response(resp).await;
254 assert_eq!(status, StatusCode::OK);
255 assert_eq!(body["count"], json!(0));
256 assert_eq!(body["distinctAuthors"], json!(0));
257}
258
259#[tokio::test]
260async fn list_issues_hydrates_via_slingshot_when_edges_present() {
261 let h = Harness::new().await;
262 let repo = did("did:plc:abalone");
263 let subject = at(&format!("at://{}", repo.as_ref()));
264 let owners = [
265 ("did:plc:nel", "i1", "first"),
266 ("did:plc:olaren", "i2", "second"),
267 ];
268 stream::iter(owners)
269 .for_each(|(d, r, title)| {
270 let h = &h;
271 let subject = subject.clone();
272 let repo = repo.clone();
273 async move {
274 let d_did = did(d);
275 let rk = rkey(r);
276 h.add_edge(
277 &nsid("sh.tangled.repo.issue"),
278 &subject,
279 &at(&format!(
280 "at://{}/sh.tangled.repo.issue/{}",
281 d_did.as_ref(),
282 rk.as_ref()
283 )),
284 );
285 h.mount(
286 &d_did,
287 &nsid("sh.tangled.repo.issue"),
288 &rk,
289 issue_body(&repo, title),
290 )
291 .await;
292 }
293 })
294 .await;
295
296 let app = router(h.state.clone());
297 let resp = app
298 .oneshot(list_request(
299 "sh.tangled.repo.listIssues",
300 subject.as_ref(),
301 &[],
302 ))
303 .await
304 .unwrap();
305 let (status, body) = json_response(resp).await;
306 assert_eq!(status, StatusCode::OK);
307 let items = body["items"].as_array().expect("items array");
308 assert_eq!(items.len(), 2);
309 let titles: Vec<&str> = items
310 .iter()
311 .map(|v| v["value"]["title"].as_str().unwrap())
312 .collect();
313 assert!(titles.contains(&"first"));
314 assert!(titles.contains(&"second"));
315 assert_eq!(items[0]["cid"], CID);
316 assert!(items[0]["uri"].as_str().unwrap().starts_with("at://"));
317}
318
319#[tokio::test]
320async fn count_distinct_authors_dedupes_per_author() {
321 let h = Harness::new().await;
322 let subject = at("at://did:plc:abalone");
323 h.add_edge(
324 &nsid("sh.tangled.feed.star"),
325 &subject,
326 &at("at://did:plc:nel/sh.tangled.feed.star/s1"),
327 );
328 h.add_edge(
329 &nsid("sh.tangled.feed.star"),
330 &subject,
331 &at("at://did:plc:nel/sh.tangled.feed.star/s2"),
332 );
333 h.add_edge(
334 &nsid("sh.tangled.feed.star"),
335 &subject,
336 &at("at://did:plc:olaren/sh.tangled.feed.star/s3"),
337 );
338
339 let app = router(h.state.clone());
340 let resp = app
341 .oneshot(list_request(
342 "sh.tangled.feed.countStars",
343 subject.as_ref(),
344 &[],
345 ))
346 .await
347 .unwrap();
348 let (_, body) = json_response(resp).await;
349 assert_eq!(body["count"], json!(3));
350 assert_eq!(body["distinctAuthors"], json!(2));
351}
352
353#[tokio::test]
354async fn list_items_stable_across_coverage_promotion() {
355 let h = Harness::new().await;
356 let subject = at("at://did:plc:abalone");
357 let nel = did("did:plc:nel");
358 h.add_edge(
359 &nsid("sh.tangled.feed.star"),
360 &subject,
361 &at(&format!("at://{}/sh.tangled.feed.star/s1", nel.as_ref())),
362 );
363 h.mount(
364 &nel,
365 &nsid("sh.tangled.feed.star"),
366 &rkey("s1"),
367 star_body(&did("did:plc:abalone")),
368 )
369 .await;
370
371 let app = router(h.state.clone());
372 h.warming(1, 5);
373 let (_, before) = json_response(
374 app.clone()
375 .oneshot(list_request(
376 "sh.tangled.feed.listStars",
377 subject.as_ref(),
378 &[],
379 ))
380 .await
381 .unwrap(),
382 )
383 .await;
384 assert_eq!(before["items"].as_array().unwrap().len(), 1);
385
386 h.promote_ready(2, 9);
387 let (_, after) = json_response(
388 app.oneshot(list_request(
389 "sh.tangled.feed.listStars",
390 subject.as_ref(),
391 &[],
392 ))
393 .await
394 .unwrap(),
395 )
396 .await;
397 assert_eq!(
398 after["items"].as_array().unwrap().len(),
399 before["items"].as_array().unwrap().len(),
400 );
401 assert_eq!(after["items"], before["items"]);
402}
403
404#[tokio::test]
405async fn list_paginates_via_cursor() {
406 let h = Harness::new().await;
407 let subject = at("at://did:plc:abalone");
408 let repo = did("did:plc:abalone");
409 let owners = [
410 ("did:plc:nel", "i1"),
411 ("did:plc:olaren", "i2"),
412 ("did:plc:teq", "i3"),
413 ("did:plc:lyna", "i4"),
414 ("did:plc:bailey", "i5"),
415 ];
416 stream::iter(owners)
417 .for_each(|(d, r)| {
418 let h = &h;
419 let subject = subject.clone();
420 let repo = repo.clone();
421 async move {
422 let d_did = did(d);
423 let rk = rkey(r);
424 h.add_edge(
425 &nsid("sh.tangled.repo.issue"),
426 &subject,
427 &at(&format!(
428 "at://{}/sh.tangled.repo.issue/{}",
429 d_did.as_ref(),
430 rk.as_ref()
431 )),
432 );
433 h.mount(
434 &d_did,
435 &nsid("sh.tangled.repo.issue"),
436 &rk,
437 issue_body(&repo, &format!("issue-{}", rk.as_ref())),
438 )
439 .await;
440 }
441 })
442 .await;
443
444 let app = router(h.state.clone());
445 let (_, page1) = json_response(
446 app.clone()
447 .oneshot(list_request(
448 "sh.tangled.repo.listIssues",
449 subject.as_ref(),
450 &[("limit", "2")],
451 ))
452 .await
453 .unwrap(),
454 )
455 .await;
456 let page1_items = page1["items"].as_array().unwrap().clone();
457 assert_eq!(page1_items.len(), 2);
458 let cursor = page1["cursor"]
459 .as_str()
460 .expect("first page must yield a cursor")
461 .to_owned();
462 assert!(
463 PageToken::decode_token(&cursor).is_ok(),
464 "cursor must be a TID-shaped token"
465 );
466
467 let (_, page2) = json_response(
468 app.oneshot(list_request(
469 "sh.tangled.repo.listIssues",
470 subject.as_ref(),
471 &[("limit", "10"), ("cursor", &cursor)],
472 ))
473 .await
474 .unwrap(),
475 )
476 .await;
477 let page2_items = page2["items"].as_array().unwrap().clone();
478 assert_eq!(page2_items.len(), 3);
479 assert!(page2["cursor"].is_null(), "tail page must not promise more");
480
481 let union: Vec<&str> = page1_items
482 .iter()
483 .chain(page2_items.iter())
484 .map(|item| item["uri"].as_str().unwrap())
485 .collect();
486 assert_eq!(union.len(), owners.len(), "union covers every owner");
487 let mut sorted = union.clone();
488 sorted.sort();
489 sorted.dedup();
490 assert_eq!(sorted.len(), owners.len(), "no duplicates across pages");
491}
492
493#[tokio::test]
494async fn pagination_unaffected_by_coverage_promotion() {
495 let h = Harness::new().await;
496 let subject = at("at://did:plc:abalone");
497 let repo = did("did:plc:abalone");
498 let owners = [("did:plc:nel", "i1"), ("did:plc:olaren", "i2")];
499 stream::iter(owners)
500 .for_each(|(d, r)| {
501 let h = &h;
502 let subject = subject.clone();
503 let repo = repo.clone();
504 async move {
505 let d_did = did(d);
506 let rk = rkey(r);
507 h.add_edge(
508 &nsid("sh.tangled.repo.issue"),
509 &subject,
510 &at(&format!(
511 "at://{}/sh.tangled.repo.issue/{}",
512 d_did.as_ref(),
513 rk.as_ref()
514 )),
515 );
516 h.mount(
517 &d_did,
518 &nsid("sh.tangled.repo.issue"),
519 &rk,
520 issue_body(&repo, &format!("issue-{}", rk.as_ref())),
521 )
522 .await;
523 }
524 })
525 .await;
526
527 h.warming(1, 5);
528 let app = router(h.state.clone());
529 let (_, page1) = json_response(
530 app.clone()
531 .oneshot(list_request(
532 "sh.tangled.repo.listIssues",
533 subject.as_ref(),
534 &[("limit", "1")],
535 ))
536 .await
537 .unwrap(),
538 )
539 .await;
540 let cursor = page1["cursor"].as_str().unwrap().to_owned();
541
542 h.promote_ready(2, 9);
543 let (_, page2) = json_response(
544 app.oneshot(list_request(
545 "sh.tangled.repo.listIssues",
546 subject.as_ref(),
547 &[("limit", "10"), ("cursor", &cursor)],
548 ))
549 .await
550 .unwrap(),
551 )
552 .await;
553 assert_eq!(page2["items"].as_array().unwrap().len(), 1);
554}
555
556#[tokio::test]
557async fn invalid_cursor_returns_400() {
558 let h = Harness::new().await;
559 let app = router(h.state.clone());
560 let resp = app
561 .oneshot(list_request(
562 "sh.tangled.repo.listIssues",
563 "at://did:plc:abalone",
564 &[("cursor", "not-a-number")],
565 ))
566 .await
567 .unwrap();
568 let (status, body) = json_response(resp).await;
569 assert_eq!(status, StatusCode::BAD_REQUEST);
570 assert_eq!(body["error"], "InvalidRequest");
571}
572
573#[tokio::test]
574async fn list_follows_subject_is_followee_did() {
575 let h = Harness::new().await;
576 let followee = did("did:plc:bailey");
577 let subject = at(&format!("at://{}", followee.as_ref()));
578 h.add_edge(
579 &nsid("sh.tangled.graph.follow"),
580 &subject,
581 &at("at://did:plc:nel/sh.tangled.graph.follow/f1"),
582 );
583 h.mount(
584 &did("did:plc:nel"),
585 &nsid("sh.tangled.graph.follow"),
586 &rkey("f1"),
587 follow_body(&followee),
588 )
589 .await;
590
591 let app = router(h.state.clone());
592 let (status, body) = json_response(
593 app.oneshot(list_request(
594 "sh.tangled.graph.listFollows",
595 subject.as_ref(),
596 &[],
597 ))
598 .await
599 .unwrap(),
600 )
601 .await;
602 assert_eq!(status, StatusCode::OK);
603 let items = body["items"].as_array().unwrap();
604 assert_eq!(items.len(), 1);
605 assert_eq!(items[0]["value"]["subject"], followee.as_ref());
606}
607
608#[tokio::test]
609async fn upstream_failure_during_hydration_drops_only_that_item() {
610 let h = Harness::new().await;
611 let subject = at("at://did:plc:squid");
612 let kind = nsid("sh.tangled.repo.issue");
613 let repo = did("did:plc:squid");
614 h.add_edge(
615 &kind,
616 &subject,
617 &at("at://did:plc:nel/sh.tangled.repo.issue/ok"),
618 );
619 h.add_edge(
620 &kind,
621 &subject,
622 &at("at://did:plc:teq/sh.tangled.repo.issue/flaky"),
623 );
624 h.mount(
625 &did("did:plc:nel"),
626 &kind,
627 &rkey("ok"),
628 issue_body(&repo, "kelp survey"),
629 )
630 .await;
631 Mock::given(method("GET"))
632 .and(path("/xrpc/com.atproto.repo.getRecord"))
633 .and(query_param("repo", "did:plc:teq"))
634 .and(query_param("collection", "sh.tangled.repo.issue"))
635 .and(query_param("rkey", "flaky"))
636 .respond_with(ResponseTemplate::new(503))
637 .mount(&h.server)
638 .await;
639
640 let app = router(h.state.clone());
641 let (status, body) = json_response(
642 app.oneshot(list_request(
643 "sh.tangled.repo.listIssues",
644 subject.as_ref(),
645 &[],
646 ))
647 .await
648 .unwrap(),
649 )
650 .await;
651 assert_eq!(status, StatusCode::OK);
652 let items = body["items"].as_array().expect("items array");
653 assert_eq!(items.len(), 1, "flaky item dropped, healthy sibling kept");
654 assert_eq!(
655 items[0]["uri"].as_str().unwrap(),
656 "at://did:plc:nel/sh.tangled.repo.issue/ok",
657 );
658}
659
660#[tokio::test]
661async fn transient_failure_keeps_edge_so_count_stays_whole() {
662 let h = Harness::new().await;
663 let subject = at("at://did:plc:squid");
664 let kind = nsid("sh.tangled.repo.issue");
665 let repo = did("did:plc:squid");
666 h.add_edge(
667 &kind,
668 &subject,
669 &at("at://did:plc:nel/sh.tangled.repo.issue/ok"),
670 );
671 h.add_edge(
672 &kind,
673 &subject,
674 &at("at://did:plc:teq/sh.tangled.repo.issue/flaky"),
675 );
676 h.mount(
677 &did("did:plc:nel"),
678 &kind,
679 &rkey("ok"),
680 issue_body(&repo, "kelp survey"),
681 )
682 .await;
683 Mock::given(method("GET"))
684 .and(path("/xrpc/com.atproto.repo.getRecord"))
685 .and(query_param("repo", "did:plc:teq"))
686 .and(query_param("collection", "sh.tangled.repo.issue"))
687 .and(query_param("rkey", "flaky"))
688 .respond_with(ResponseTemplate::new(503))
689 .mount(&h.server)
690 .await;
691
692 let app = router(h.state.clone());
693 let (status, body) = json_response(
694 app.clone()
695 .oneshot(list_request(
696 "sh.tangled.repo.listIssues",
697 subject.as_ref(),
698 &[],
699 ))
700 .await
701 .unwrap(),
702 )
703 .await;
704 assert_eq!(status, StatusCode::OK);
705 assert_eq!(body["items"].as_array().unwrap().len(), 1);
706
707 let (cstatus, cbody) = json_response(
708 app.oneshot(list_request(
709 "sh.tangled.repo.countIssues",
710 subject.as_ref(),
711 &[],
712 ))
713 .await
714 .unwrap(),
715 )
716 .await;
717 assert_eq!(cstatus, StatusCode::OK);
718 assert_eq!(
719 cbody["count"],
720 json!(2),
721 "a transient 503 must not evict the edge, count stays whole",
722 );
723}
724
725#[tokio::test]
726async fn gone_item_is_evicted_so_count_converges_to_list() {
727 let h = Harness::new().await;
728 let subject = at("at://did:plc:squid");
729 let kind = nsid("sh.tangled.repo.issue");
730 let repo = did("did:plc:squid");
731 h.add_edge(
732 &kind,
733 &subject,
734 &at("at://did:plc:nel/sh.tangled.repo.issue/ok"),
735 );
736 h.add_edge(
737 &kind,
738 &subject,
739 &at("at://did:plc:teq/sh.tangled.repo.issue/gone"),
740 );
741 h.mount(
742 &did("did:plc:nel"),
743 &kind,
744 &rkey("ok"),
745 issue_body(&repo, "kelp survey"),
746 )
747 .await;
748 Mock::given(method("GET"))
749 .and(path("/xrpc/com.atproto.repo.getRecord"))
750 .and(query_param("repo", "did:plc:teq"))
751 .and(query_param("collection", "sh.tangled.repo.issue"))
752 .and(query_param("rkey", "gone"))
753 .respond_with(ResponseTemplate::new(404))
754 .mount(&h.server)
755 .await;
756
757 let app = router(h.state.clone());
758 let (status, body) = json_response(
759 app.clone()
760 .oneshot(list_request(
761 "sh.tangled.repo.listIssues",
762 subject.as_ref(),
763 &[],
764 ))
765 .await
766 .unwrap(),
767 )
768 .await;
769 assert_eq!(status, StatusCode::OK);
770 assert_eq!(body["items"].as_array().unwrap().len(), 1, "gone item dropped from the page");
771
772 let (cstatus, cbody) = json_response(
773 app.oneshot(list_request(
774 "sh.tangled.repo.countIssues",
775 subject.as_ref(),
776 &[],
777 ))
778 .await
779 .unwrap(),
780 )
781 .await;
782 assert_eq!(cstatus, StatusCode::OK);
783 assert_eq!(
784 cbody["count"],
785 json!(1),
786 "a definitive 404 must evict the dead edge so count matches the list",
787 );
788}
789
790#[tokio::test]
791async fn handle_authority_subject_is_400() {
792 let h = Harness::new().await;
793 let app = router(h.state.clone());
794 let cases = [
795 "sh.tangled.feed.listStars",
796 "sh.tangled.feed.countStars",
797 "sh.tangled.graph.listFollows",
798 "sh.tangled.graph.countFollows",
799 "sh.tangled.repo.listIssues",
800 "sh.tangled.repo.countIssues",
801 "sh.tangled.repo.listPulls",
802 "sh.tangled.repo.countPulls",
803 "sh.tangled.repo.issue.listComments",
804 "sh.tangled.repo.issue.countComments",
805 ];
806 stream::iter(cases)
807 .for_each(|endpoint| {
808 let app = app.clone();
809 async move {
810 let resp = app
811 .oneshot(list_request(endpoint, "at://oyster.cafe", &[]))
812 .await
813 .unwrap();
814 let (status, body) = json_response(resp).await;
815 assert_eq!(status, StatusCode::BAD_REQUEST, "{endpoint}");
816 assert_eq!(body["error"], "InvalidRequest", "{endpoint}");
817 assert!(
818 body["message"]
819 .as_str()
820 .unwrap_or_default()
821 .contains("did, not a handle"),
822 "{endpoint}: {}",
823 body["message"]
824 );
825 }
826 })
827 .await;
828}
829
830#[tokio::test]
831async fn empty_subject_is_400() {
832 let h = Harness::new().await;
833 let app = router(h.state.clone());
834 let resp = app
835 .oneshot(list_request("sh.tangled.repo.listIssues", "", &[]))
836 .await
837 .unwrap();
838 let (status, body) = json_response(resp).await;
839 assert_eq!(status, StatusCode::BAD_REQUEST);
840 assert_eq!(body["error"], "InvalidRequest");
841}
842
843#[tokio::test]
844async fn limit_below_min_or_above_max_is_400() {
845 let h = Harness::new().await;
846 let app = router(h.state.clone());
847 let cases = [("0", "below"), ("1001", "above")];
848 stream::iter(cases)
849 .for_each(|(limit, label)| {
850 let app = app.clone();
851 async move {
852 let resp = app
853 .oneshot(list_request(
854 "sh.tangled.repo.listIssues",
855 "at://did:plc:abalone",
856 &[("limit", limit)],
857 ))
858 .await
859 .unwrap();
860 let (status, body) = json_response(resp).await;
861 assert_eq!(status, StatusCode::BAD_REQUEST, "limit {label}");
862 assert_eq!(body["error"], "InvalidRequest", "limit {label}");
863 }
864 })
865 .await;
866}
867
868#[tokio::test]
869async fn count_after_remove_source_returns_zero() {
870 let h = Harness::new().await;
871 let subject = at("at://did:plc:abalone");
872 let source = at("at://did:plc:nel/sh.tangled.feed.star/s1");
873 h.add_edge(&nsid("sh.tangled.feed.star"), &subject, &source);
874 h.edges.remove_source(&source);
875
876 let app = router(h.state.clone());
877 let (_, body) = json_response(
878 app.oneshot(list_request(
879 "sh.tangled.feed.countStars",
880 subject.as_ref(),
881 &[],
882 ))
883 .await
884 .unwrap(),
885 )
886 .await;
887 assert_eq!(body["count"], json!(0));
888 assert_eq!(body["distinctAuthors"], json!(0));
889}
890
891#[tokio::test]
892async fn list_issue_comments_hydrates_end_to_end() {
893 let h = Harness::new().await;
894 let issue_uri = at("at://did:plc:abalone/sh.tangled.repo.issue/i1");
895 let nel = did("did:plc:nel");
896 let rk = rkey("c1");
897 h.add_edge(
898 &nsid("sh.tangled.repo.issue.comment"),
899 &issue_uri,
900 &at(&format!(
901 "at://{}/sh.tangled.repo.issue.comment/{}",
902 nel.as_ref(),
903 rk.as_ref()
904 )),
905 );
906 h.mount(
907 &nel,
908 &nsid("sh.tangled.repo.issue.comment"),
909 &rk,
910 json!({
911 "$type": "sh.tangled.repo.issue.comment",
912 "issue": issue_uri.as_ref(),
913 "body": "thoughts",
914 "createdAt": "2026-05-01T00:00:00Z"
915 }),
916 )
917 .await;
918
919 let app = router(h.state.clone());
920 let (status, body) = json_response(
921 app.oneshot(list_request(
922 "sh.tangled.repo.issue.listComments",
923 issue_uri.as_ref(),
924 &[],
925 ))
926 .await
927 .unwrap(),
928 )
929 .await;
930 assert_eq!(status, StatusCode::OK);
931 let items = body["items"].as_array().unwrap();
932 assert_eq!(items.len(), 1);
933 assert_eq!(items[0]["value"]["body"], json!("thoughts"));
934 assert_eq!(items[0]["value"]["issue"], json!(issue_uri.as_ref()));
935}
936
937#[tokio::test]
938async fn list_item_cid_is_present() {
939 let h = Harness::new().await;
940 let subject = at("at://did:plc:abalone");
941 let nel = did("did:plc:nel");
942 h.add_edge(
943 &nsid("sh.tangled.feed.star"),
944 &subject,
945 &at(&format!("at://{}/sh.tangled.feed.star/s1", nel.as_ref())),
946 );
947 h.mount(
948 &nel,
949 &nsid("sh.tangled.feed.star"),
950 &rkey("s1"),
951 star_body(&did("did:plc:abalone")),
952 )
953 .await;
954
955 let app = router(h.state.clone());
956 let (_, body) = json_response(
957 app.oneshot(list_request(
958 "sh.tangled.feed.listStars",
959 subject.as_ref(),
960 &[],
961 ))
962 .await
963 .unwrap(),
964 )
965 .await;
966 let item = &body["items"][0];
967 assert!(
968 item.as_object().unwrap().contains_key("cid"),
969 "list items must mirror getRecord output shape and include cid"
970 );
971 assert_eq!(item["cid"], json!(CID));
972}
973
974#[tokio::test]
975async fn count_issue_comments_subjects_on_issue_uri() {
976 let h = Harness::new().await;
977 let issue_uri = at("at://did:plc:abalone/sh.tangled.repo.issue/i1");
978 h.add_edge(
979 &nsid("sh.tangled.repo.issue.comment"),
980 &issue_uri,
981 &at("at://did:plc:nel/sh.tangled.repo.issue.comment/c1"),
982 );
983 h.add_edge(
984 &nsid("sh.tangled.repo.issue.comment"),
985 &issue_uri,
986 &at("at://did:plc:olaren/sh.tangled.repo.issue.comment/c2"),
987 );
988
989 let app = router(h.state.clone());
990 let (status, body) = json_response(
991 app.oneshot(list_request(
992 "sh.tangled.repo.issue.countComments",
993 issue_uri.as_ref(),
994 &[],
995 ))
996 .await
997 .unwrap(),
998 )
999 .await;
1000 assert_eq!(status, StatusCode::OK);
1001 assert_eq!(body["count"], json!(2));
1002 assert_eq!(body["distinctAuthors"], json!(2));
1003}
1004
1005#[tokio::test]
1006async fn list_item_404_dropped_not_404_for_subject() {
1007 let h = Harness::new().await;
1008 let subject = at("at://did:plc:squid");
1009 let kind = nsid("sh.tangled.repo.issue");
1010 let repo = did("did:plc:squid");
1011 h.add_edge(
1012 &kind,
1013 &subject,
1014 &at("at://did:plc:nel/sh.tangled.repo.issue/live"),
1015 );
1016 h.add_edge(
1017 &kind,
1018 &subject,
1019 &at("at://did:plc:teq/sh.tangled.repo.issue/missing"),
1020 );
1021 h.mount(
1022 &did("did:plc:nel"),
1023 &kind,
1024 &rkey("live"),
1025 issue_body(&repo, "kelp survives"),
1026 )
1027 .await;
1028 Mock::given(method("GET"))
1029 .and(path("/xrpc/com.atproto.repo.getRecord"))
1030 .and(query_param("repo", "did:plc:teq"))
1031 .and(query_param("collection", "sh.tangled.repo.issue"))
1032 .and(query_param("rkey", "missing"))
1033 .respond_with(ResponseTemplate::new(404).set_body_json(json!({
1034 "error": "RecordNotFound",
1035 "message": "could not find record"
1036 })))
1037 .mount(&h.server)
1038 .await;
1039
1040 let app = router(h.state.clone());
1041 let (status, body) = json_response(
1042 app.oneshot(list_request(
1043 "sh.tangled.repo.listIssues",
1044 subject.as_ref(),
1045 &[],
1046 ))
1047 .await
1048 .unwrap(),
1049 )
1050 .await;
1051 assert_eq!(
1052 status,
1053 StatusCode::OK,
1054 "a stale-index 404 drops that item, it must not 404 or 502 the subject's list",
1055 );
1056 let items = body["items"].as_array().expect("items array");
1057 assert_eq!(items.len(), 1, "stale 404 item dropped, live sibling kept");
1058 assert_eq!(
1059 items[0]["uri"].as_str().unwrap(),
1060 "at://did:plc:nel/sh.tangled.repo.issue/live",
1061 );
1062}
1063
1064#[tokio::test]
1065async fn list_item_with_wrong_type_tag_dropped() {
1066 let h = Harness::new().await;
1067 let subject = at("at://did:plc:squid");
1068 let kind = nsid("sh.tangled.feed.star");
1069 h.add_edge(
1070 &kind,
1071 &subject,
1072 &at("at://did:plc:nel/sh.tangled.feed.star/good"),
1073 );
1074 h.add_edge(
1075 &kind,
1076 &subject,
1077 &at("at://did:plc:teq/sh.tangled.feed.star/wrong"),
1078 );
1079 h.mount(
1080 &did("did:plc:nel"),
1081 &kind,
1082 &rkey("good"),
1083 star_body(&did("did:plc:squid")),
1084 )
1085 .await;
1086 Mock::given(method("GET"))
1087 .and(path("/xrpc/com.atproto.repo.getRecord"))
1088 .and(query_param("repo", "did:plc:teq"))
1089 .and(query_param("collection", "sh.tangled.feed.star"))
1090 .and(query_param("rkey", "wrong"))
1091 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1092 "uri": "at://did:plc:teq/sh.tangled.feed.star/wrong",
1093 "cid": CID,
1094 "value": {
1095 "$type": "sh.tangled.feed.reaction",
1096 "createdAt": "2026-05-01T00:00:00Z",
1097 "subject": "at://did:plc:squid"
1098 }
1099 })))
1100 .mount(&h.server)
1101 .await;
1102
1103 let app = router(h.state.clone());
1104 let (status, body) = json_response(
1105 app.oneshot(list_request(
1106 "sh.tangled.feed.listStars",
1107 subject.as_ref(),
1108 &[],
1109 ))
1110 .await
1111 .unwrap(),
1112 )
1113 .await;
1114 assert_eq!(status, StatusCode::OK);
1115 let items = body["items"].as_array().expect("items array");
1116 assert_eq!(items.len(), 1, "wrong-type item dropped, valid star kept");
1117 assert_eq!(
1118 items[0]["uri"].as_str().unwrap(),
1119 "at://did:plc:nel/sh.tangled.feed.star/good",
1120 );
1121}
1122
1123#[tokio::test]
1124async fn list_item_with_mismatched_collection_dropped() {
1125 let h = Harness::new().await;
1126 let subject = at("at://did:plc:squid");
1127 let kind = nsid("sh.tangled.repo.issue");
1128 let repo = did("did:plc:squid");
1129 h.add_edge(
1130 &kind,
1131 &subject,
1132 &at("at://did:plc:nel/sh.tangled.repo.issue/live"),
1133 );
1134 h.add_edge(
1135 &kind,
1136 &subject,
1137 &at("at://did:plc:teq/sh.tangled.feed.star/whelk"),
1138 );
1139 h.mount(
1140 &did("did:plc:nel"),
1141 &kind,
1142 &rkey("live"),
1143 issue_body(&repo, "kelp survives"),
1144 )
1145 .await;
1146
1147 let app = router(h.state.clone());
1148 let (status, body) = json_response(
1149 app.oneshot(list_request(
1150 "sh.tangled.repo.listIssues",
1151 subject.as_ref(),
1152 &[],
1153 ))
1154 .await
1155 .unwrap(),
1156 )
1157 .await;
1158 assert_eq!(
1159 status,
1160 StatusCode::OK,
1161 "a mismatched-collection index edge must not 400 the subject's list",
1162 );
1163 let items = body["items"].as_array().expect("items array");
1164 assert_eq!(
1165 items.len(),
1166 1,
1167 "mismatched-collection edge dropped, live sibling kept"
1168 );
1169 assert_eq!(
1170 items[0]["uri"].as_str().unwrap(),
1171 "at://did:plc:nel/sh.tangled.repo.issue/live",
1172 );
1173}
1174
1175#[tokio::test]
1176async fn bare_did_endpoints_reject_at_uri_subject() {
1177 let h = Harness::new().await;
1178 let app = router(h.state.clone());
1179 let cases = [
1180 "sh.tangled.graph.listFollows",
1181 "sh.tangled.graph.countFollows",
1182 ];
1183 stream::iter(cases)
1184 .for_each(|endpoint| {
1185 let app = app.clone();
1186 async move {
1187 let resp = app
1188 .oneshot(list_request(
1189 endpoint,
1190 "at://did:plc:abalone/sh.tangled.repo/r1",
1191 &[],
1192 ))
1193 .await
1194 .unwrap();
1195 let (status, body) = json_response(resp).await;
1196 assert_eq!(status, StatusCode::BAD_REQUEST, "{endpoint}");
1197 assert_eq!(body["error"], "InvalidRequest", "{endpoint}");
1198 assert!(
1199 body["message"]
1200 .as_str()
1201 .unwrap_or_default()
1202 .contains("bare did"),
1203 "{endpoint}: {}",
1204 body["message"],
1205 );
1206 }
1207 })
1208 .await;
1209}
1210
1211#[tokio::test]
1212async fn repo_pointing_endpoints_reject_at_uri_subject() {
1213 let h = Harness::new().await;
1214 let app = router(h.state.clone());
1215 let cases = [
1216 "sh.tangled.repo.listIssues",
1217 "sh.tangled.repo.countIssues",
1218 "sh.tangled.repo.listPulls",
1219 "sh.tangled.repo.countPulls",
1220 "sh.tangled.repo.listArtifacts",
1221 "sh.tangled.repo.countArtifacts",
1222 ];
1223 stream::iter(cases)
1224 .for_each(|endpoint| {
1225 let app = app.clone();
1226 async move {
1227 let resp = app
1228 .oneshot(list_request(
1229 endpoint,
1230 "at://did:plc:abalone/sh.tangled.repo/r1",
1231 &[],
1232 ))
1233 .await
1234 .unwrap();
1235 let (status, body) = json_response(resp).await;
1236 assert_eq!(
1237 status,
1238 StatusCode::BAD_REQUEST,
1239 "{endpoint} must reject rkey-form subjects since rkeys are unstable; clients must send the repoDID",
1240 );
1241 assert!(
1242 body["message"]
1243 .as_str()
1244 .unwrap_or_default()
1245 .contains("bare did"),
1246 "{endpoint}: {}",
1247 body["message"],
1248 );
1249 }
1250 })
1251 .await;
1252}
1253
1254#[tokio::test]
1255async fn repo_pointing_endpoints_accept_bare_did() {
1256 let h = Harness::new().await;
1257 let app = router(h.state.clone());
1258 let cases = [
1259 "sh.tangled.repo.listIssues",
1260 "sh.tangled.repo.countIssues",
1261 "sh.tangled.repo.listPulls",
1262 "sh.tangled.repo.countPulls",
1263 "sh.tangled.repo.listArtifacts",
1264 "sh.tangled.repo.countArtifacts",
1265 ];
1266 stream::iter(cases)
1267 .for_each(|endpoint| {
1268 let app = app.clone();
1269 async move {
1270 let resp = app
1271 .oneshot(list_request(endpoint, "did:plc:abalone", &[]))
1272 .await
1273 .unwrap();
1274 let (status, _body) = json_response(resp).await;
1275 assert_eq!(status, StatusCode::OK, "{endpoint} must accept bare did");
1276 }
1277 })
1278 .await;
1279}
1280
1281#[tokio::test]
1282async fn issue_collection_endpoints_reject_bare_did_or_wrong_collection() {
1283 let h = Harness::new().await;
1284 let app = router(h.state.clone());
1285 let endpoints = [
1286 "sh.tangled.repo.issue.listComments",
1287 "sh.tangled.repo.issue.countComments",
1288 ];
1289 let inputs = [
1290 "at://did:plc:abalone",
1291 "at://did:plc:abalone/sh.tangled.repo/r1",
1292 ];
1293 let cases = endpoints
1294 .iter()
1295 .copied()
1296 .flat_map(|endpoint| inputs.iter().copied().map(move |input| (endpoint, input)));
1297 stream::iter(cases)
1298 .for_each(|(endpoint, input)| {
1299 let app = app.clone();
1300 async move {
1301 let resp = app
1302 .oneshot(list_request(endpoint, input, &[]))
1303 .await
1304 .unwrap();
1305 let (status, body) = json_response(resp).await;
1306 assert_eq!(status, StatusCode::BAD_REQUEST, "{endpoint} input={input}");
1307 assert!(
1308 body["message"]
1309 .as_str()
1310 .unwrap_or_default()
1311 .contains("sh.tangled.repo.issue/<rkey>"),
1312 "{endpoint} input={input}: {}",
1313 body["message"],
1314 );
1315 }
1316 })
1317 .await;
1318}
1319
1320#[tokio::test]
1321async fn star_endpoints_reject_unrelated_collection() {
1322 let h = Harness::new().await;
1323 let app = router(h.state.clone());
1324 let endpoints = ["sh.tangled.feed.listStars", "sh.tangled.feed.countStars"];
1325 stream::iter(endpoints)
1326 .for_each(|endpoint| {
1327 let app = app.clone();
1328 async move {
1329 let resp = app
1330 .oneshot(list_request(
1331 endpoint,
1332 "at://did:plc:abalone/sh.tangled.knot/k1",
1333 &[],
1334 ))
1335 .await
1336 .unwrap();
1337 let (status, body) = json_response(resp).await;
1338 assert_eq!(status, StatusCode::BAD_REQUEST, "{endpoint}");
1339 let msg = body["message"].as_str().unwrap_or_default();
1340 assert!(msg.contains("sh.tangled.string"), "{endpoint}: {msg}",);
1341 }
1342 })
1343 .await;
1344}
1345
1346#[tokio::test]
1347async fn star_endpoints_reject_repo_uri_subject() {
1348 let h = Harness::new().await;
1349 let app = router(h.state.clone());
1350 let resp = app
1351 .oneshot(list_request(
1352 "sh.tangled.feed.countStars",
1353 "at://did:plc:abalone/sh.tangled.repo/r1",
1354 &[],
1355 ))
1356 .await
1357 .unwrap();
1358 let (status, body) = json_response(resp).await;
1359 assert_eq!(
1360 status,
1361 StatusCode::BAD_REQUEST,
1362 "rkey-form repo URI must be rejected; clients must send the repoDID directly",
1363 );
1364 let msg = body["message"].as_str().unwrap_or_default();
1365 assert!(msg.contains("sh.tangled.string"), "{msg}");
1366}
1367
1368#[tokio::test]
1369async fn star_endpoints_accept_string_subject_form() {
1370 let h = Harness::new().await;
1371 let app = router(h.state.clone());
1372 let resp = app
1373 .oneshot(list_request(
1374 "sh.tangled.feed.countStars",
1375 "at://did:plc:abalone/sh.tangled.string/k1",
1376 &[],
1377 ))
1378 .await
1379 .unwrap();
1380 let (status, body) = json_response(resp).await;
1381 assert_eq!(status, StatusCode::OK);
1382 assert_eq!(body["count"], json!(0));
1383}
1384
1385#[tokio::test]
1386async fn list_after_remove_source_returns_empty_items() {
1387 let h = Harness::new().await;
1388 let subject = at("at://did:plc:abalone");
1389 let source = at("at://did:plc:nel/sh.tangled.feed.star/s1");
1390 h.add_edge(&nsid("sh.tangled.feed.star"), &subject, &source);
1391 h.edges.remove_source(&source);
1392
1393 let app = router(h.state.clone());
1394 let (status, body) = json_response(
1395 app.oneshot(list_request(
1396 "sh.tangled.feed.listStars",
1397 subject.as_ref(),
1398 &[],
1399 ))
1400 .await
1401 .unwrap(),
1402 )
1403 .await;
1404 assert_eq!(status, StatusCode::OK);
1405 assert_eq!(body["items"], json!([]));
1406 assert!(body["cursor"].is_null());
1407}
1408
1409#[tokio::test]
1410async fn list_pulls_hydrates_via_slingshot_when_edges_present() {
1411 let h = Harness::new().await;
1412 let target_did = did("did:plc:abalone");
1413 let subject = at(&format!("at://{}", target_did.as_ref()));
1414 let source_did = did("did:plc:nel");
1415 let rk = rkey("p1");
1416 h.add_edge(
1417 &nsid("sh.tangled.repo.pull"),
1418 &subject,
1419 &at(&format!(
1420 "at://{}/sh.tangled.repo.pull/{}",
1421 source_did.as_ref(),
1422 rk.as_ref()
1423 )),
1424 );
1425 h.mount(
1426 &source_did,
1427 &nsid("sh.tangled.repo.pull"),
1428 &rk,
1429 json!({
1430 "$type": "sh.tangled.repo.pull",
1431 "title": "ship it",
1432 "createdAt": "2026-05-01T00:00:00Z",
1433 "rounds": [],
1434 "target": {"repo": target_did.as_ref(), "branch": "main"},
1435 }),
1436 )
1437 .await;
1438 let app = router(h.state.clone());
1439 let (status, body) = json_response(
1440 app.oneshot(list_request(
1441 "sh.tangled.repo.listPulls",
1442 subject.as_ref(),
1443 &[],
1444 ))
1445 .await
1446 .unwrap(),
1447 )
1448 .await;
1449 assert_eq!(status, StatusCode::OK);
1450 let items = body["items"].as_array().unwrap();
1451 assert_eq!(items.len(), 1);
1452 assert_eq!(items[0]["value"]["title"], json!("ship it"));
1453 assert_eq!(
1454 items[0]["value"]["target"]["repo"],
1455 json!(target_did.as_ref())
1456 );
1457}
1458
1459#[tokio::test]
1460async fn count_pulls_returns_distinct_authors() {
1461 let h = Harness::new().await;
1462 let subject = at("at://did:plc:abalone");
1463 h.add_edge(
1464 &nsid("sh.tangled.repo.pull"),
1465 &subject,
1466 &at("at://did:plc:nel/sh.tangled.repo.pull/p1"),
1467 );
1468 h.add_edge(
1469 &nsid("sh.tangled.repo.pull"),
1470 &subject,
1471 &at("at://did:plc:olaren/sh.tangled.repo.pull/p2"),
1472 );
1473 h.add_edge(
1474 &nsid("sh.tangled.repo.pull"),
1475 &subject,
1476 &at("at://did:plc:nel/sh.tangled.repo.pull/p3"),
1477 );
1478 let app = router(h.state.clone());
1479 let (_, body) = json_response(
1480 app.oneshot(list_request(
1481 "sh.tangled.repo.countPulls",
1482 subject.as_ref(),
1483 &[],
1484 ))
1485 .await
1486 .unwrap(),
1487 )
1488 .await;
1489 assert_eq!(body["count"], json!(3));
1490 assert_eq!(body["distinctAuthors"], json!(2));
1491}
1492
1493#[tokio::test]
1494async fn extractor_to_xrpc_round_trip_for_star() {
1495 let h = Harness::new().await;
1496 let subject_did = did("did:plc:abalone");
1497 let source_did = did("did:plc:nel");
1498 let rk = rkey("s1");
1499 let source = at(&format!(
1500 "at://{}/sh.tangled.feed.star/{}",
1501 source_did.as_ref(),
1502 rk.as_ref()
1503 ));
1504 let body = star_body(&subject_did);
1505 let parsed =
1506 bobbin_types::edges::Record::from_json_value(&nsid("sh.tangled.feed.star"), body.clone())
1507 .expect("parse star record");
1508 parsed
1509 .extract_edges(&source)
1510 .expect("extract")
1511 .into_iter()
1512 .for_each(|e| h.edges.add(e));
1513 h.mount(&source_did, &nsid("sh.tangled.feed.star"), &rk, body)
1514 .await;
1515
1516 let app = router(h.state.clone());
1517 let (status, json) = json_response(
1518 app.oneshot(list_request(
1519 "sh.tangled.feed.listStars",
1520 &format!("at://{}", subject_did.as_ref()),
1521 &[],
1522 ))
1523 .await
1524 .unwrap(),
1525 )
1526 .await;
1527 assert_eq!(
1528 status,
1529 StatusCode::OK,
1530 "extractor key must match handler subject, body was {json}",
1531 );
1532 let items = json["items"].as_array().unwrap();
1533 assert_eq!(items.len(), 1, "expected exactly one star edge");
1534 assert_eq!(
1535 items[0]["value"]["subject"]["did"],
1536 json!(subject_did.as_ref())
1537 );
1538}
1539
1540#[tokio::test]
1541async fn list_issues_includes_state_comment_count_and_state_updated_at() {
1542 let h = Harness::new().await;
1543 let repo = did("did:plc:limpet");
1544 let subject = at(&format!("at://{}", repo.as_ref()));
1545 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1");
1546 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri);
1547 h.mount(
1548 &did("did:plc:nel"),
1549 &nsid("sh.tangled.repo.issue"),
1550 &rkey("i1"),
1551 issue_body(&repo, "hi"),
1552 )
1553 .await;
1554 h.add_edge(
1555 &nsid("sh.tangled.repo.issue.comment"),
1556 &issue_uri,
1557 &at("at://did:plc:olaren/sh.tangled.repo.issue.comment/c1"),
1558 );
1559 h.add_edge(
1560 &nsid("sh.tangled.repo.issue.comment"),
1561 &issue_uri,
1562 &at("at://did:plc:teq/sh.tangled.repo.issue.comment/c2"),
1563 );
1564
1565 h.state.issue_states.upsert(
1566 at("at://did:plc:nel/sh.tangled.repo.issue.state/s1"),
1567 issue_uri.clone(),
1568 1_777_593_600_000_000,
1569 IssueStateKind::Open,
1570 );
1571 h.state.issue_states.upsert(
1572 at("at://did:plc:nel/sh.tangled.repo.issue.state/s2"),
1573 issue_uri.clone(),
1574 1_777_593_700_000_000,
1575 IssueStateKind::Closed,
1576 );
1577
1578 let app = router(h.state.clone());
1579 let resp = app
1580 .oneshot(list_request(
1581 "sh.tangled.repo.listIssues",
1582 subject.as_ref(),
1583 &[],
1584 ))
1585 .await
1586 .unwrap();
1587 let (status, body) = json_response(resp).await;
1588 assert_eq!(status, StatusCode::OK);
1589 let item = &body["items"][0];
1590 assert_eq!(item["state"], json!("closed"));
1591 assert_eq!(item["commentCount"], json!(2));
1592 let updated = item["stateUpdatedAt"]
1593 .as_str()
1594 .expect("stateUpdatedAt must serialize as RFC3339 string");
1595 assert!(
1596 updated.starts_with("2026-"),
1597 "expected 2026 timestamp, got {updated}"
1598 );
1599}
1600
1601#[tokio::test]
1602async fn list_issues_defaults_to_open_when_no_state_record() {
1603 let h = Harness::new().await;
1604 let repo = did("did:plc:limpet");
1605 let subject = at(&format!("at://{}", repo.as_ref()));
1606 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1");
1607 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri);
1608 h.mount(
1609 &did("did:plc:nel"),
1610 &nsid("sh.tangled.repo.issue"),
1611 &rkey("i1"),
1612 issue_body(&repo, "no state yet"),
1613 )
1614 .await;
1615
1616 let app = router(h.state.clone());
1617 let (_status, body) = json_response(
1618 app.oneshot(list_request(
1619 "sh.tangled.repo.listIssues",
1620 subject.as_ref(),
1621 &[],
1622 ))
1623 .await
1624 .unwrap(),
1625 )
1626 .await;
1627 let item = &body["items"][0];
1628 assert_eq!(
1629 item["state"],
1630 json!("open"),
1631 "absent state record defaults to open"
1632 );
1633 assert!(
1634 item.get("stateUpdatedAt").is_none(),
1635 "stateUpdatedAt must be absent without a state record",
1636 );
1637 assert_eq!(item["commentCount"], json!(0));
1638}
1639
1640#[tokio::test]
1641async fn list_issues_author_filter_restricts_to_matching_did() {
1642 let h = Harness::new().await;
1643 let repo = did("did:plc:limpet");
1644 let subject = at(&format!("at://{}", repo.as_ref()));
1645 let owners = [
1646 ("did:plc:nel", "n1"),
1647 ("did:plc:nel", "n2"),
1648 ("did:plc:olaren", "o1"),
1649 ("did:plc:olaren", "o2"),
1650 ];
1651 stream::iter(owners)
1652 .for_each(|(d, r)| {
1653 let h = &h;
1654 let subject = subject.clone();
1655 let repo = repo.clone();
1656 async move {
1657 let d_did = did(d);
1658 let rk = rkey(r);
1659 h.add_edge(
1660 &nsid("sh.tangled.repo.issue"),
1661 &subject,
1662 &at(&format!(
1663 "at://{}/sh.tangled.repo.issue/{}",
1664 d_did.as_ref(),
1665 rk.as_ref()
1666 )),
1667 );
1668 h.mount(
1669 &d_did,
1670 &nsid("sh.tangled.repo.issue"),
1671 &rk,
1672 issue_body(&repo, &format!("issue-{}", rk.as_ref())),
1673 )
1674 .await;
1675 }
1676 })
1677 .await;
1678
1679 let app = router(h.state.clone());
1680 let (status, body) = json_response(
1681 app.oneshot(list_request(
1682 "sh.tangled.repo.listIssues",
1683 subject.as_ref(),
1684 &[("author", "did:plc:nel")],
1685 ))
1686 .await
1687 .unwrap(),
1688 )
1689 .await;
1690 assert_eq!(status, StatusCode::OK);
1691 let items = body["items"].as_array().expect("items array");
1692 assert_eq!(items.len(), 2, "two issues authored by nel");
1693 let all_nel = items
1694 .iter()
1695 .all(|i| i["uri"].as_str().unwrap().starts_with("at://did:plc:nel/"));
1696 assert!(all_nel, "every returned uri must be authored by nel");
1697}
1698
1699#[tokio::test]
1700async fn list_issues_invalid_author_returns_400() {
1701 let h = Harness::new().await;
1702 let subject = "at://did:plc:limpet".to_owned();
1703 let app = router(h.state.clone());
1704 let resp = app
1705 .oneshot(list_request(
1706 "sh.tangled.repo.listIssues",
1707 &subject,
1708 &[("author", "not-a-did")],
1709 ))
1710 .await
1711 .unwrap();
1712 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1713}
1714
1715#[tokio::test]
1716async fn list_pulls_includes_merged_state_and_comment_count() {
1717 let h = Harness::new().await;
1718 let repo = did("did:plc:limpet");
1719 let subject = at(&format!("at://{}", repo.as_ref()));
1720 let pull_uri = at("at://did:plc:nel/sh.tangled.repo.pull/p1");
1721 h.add_edge(&nsid("sh.tangled.repo.pull"), &subject, &pull_uri);
1722 h.mount(
1723 &did("did:plc:nel"),
1724 &nsid("sh.tangled.repo.pull"),
1725 &rkey("p1"),
1726 pull_body(&repo, "fix bug"),
1727 )
1728 .await;
1729 h.add_edge(
1730 &nsid("sh.tangled.repo.pull.comment"),
1731 &pull_uri,
1732 &at("at://did:plc:teq/sh.tangled.repo.pull.comment/c1"),
1733 );
1734 h.state.pull_statuses.upsert(
1735 at("at://did:plc:nel/sh.tangled.repo.pull.status/s1"),
1736 pull_uri.clone(),
1737 1_777_593_600_000_000,
1738 PullStatusKind::Open,
1739 );
1740 h.state.pull_statuses.upsert(
1741 at("at://did:plc:nel/sh.tangled.repo.pull.status/s2"),
1742 pull_uri.clone(),
1743 1_777_593_800_000_000,
1744 PullStatusKind::Merged,
1745 );
1746
1747 let app = router(h.state.clone());
1748 let (status, body) = json_response(
1749 app.oneshot(list_request(
1750 "sh.tangled.repo.listPulls",
1751 subject.as_ref(),
1752 &[],
1753 ))
1754 .await
1755 .unwrap(),
1756 )
1757 .await;
1758 assert_eq!(status, StatusCode::OK);
1759 let item = &body["items"][0];
1760 assert_eq!(item["state"], json!("merged"));
1761 assert_eq!(item["commentCount"], json!(1));
1762}
1763
1764#[tokio::test]
1765async fn list_issues_state_filter_open_includes_records_without_state() {
1766 let h = Harness::new().await;
1767 let repo = did("did:plc:limpet");
1768 let subject = at(&format!("at://{}", repo.as_ref()));
1769 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1");
1770 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri);
1771 h.mount(
1772 &did("did:plc:nel"),
1773 &nsid("sh.tangled.repo.issue"),
1774 &rkey("i1"),
1775 issue_body(&repo, "fresh"),
1776 )
1777 .await;
1778
1779 let app = router(h.state.clone());
1780 let (status, body) = json_response(
1781 app.oneshot(list_request(
1782 "sh.tangled.repo.listIssues",
1783 subject.as_ref(),
1784 &[("state", "open")],
1785 ))
1786 .await
1787 .unwrap(),
1788 )
1789 .await;
1790 assert_eq!(status, StatusCode::OK);
1791 let items = body["items"].as_array().expect("items array");
1792 assert_eq!(
1793 items.len(),
1794 1,
1795 "absent state record still matches state=open"
1796 );
1797}
1798
1799#[tokio::test]
1800async fn list_issues_state_filter_ignores_third_party_state_source() {
1801 let h = Harness::new().await;
1802 let repo = did("did:plc:limpet");
1803 let subject = at(&format!("at://{}", repo.as_ref()));
1804 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1");
1805 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri);
1806 h.mount(
1807 &did("did:plc:nel"),
1808 &nsid("sh.tangled.repo.issue"),
1809 &rkey("i1"),
1810 issue_body(&repo, "open issue"),
1811 )
1812 .await;
1813 h.state.issue_states.upsert(
1814 at("at://did:plc:nautilus/sh.tangled.repo.issue.state/spoof"),
1815 issue_uri.clone(),
1816 1_777_593_800_000_000,
1817 IssueStateKind::Closed,
1818 );
1819
1820 let app = router(h.state.clone());
1821 let (status, body) = json_response(
1822 app.oneshot(list_request(
1823 "sh.tangled.repo.listIssues",
1824 subject.as_ref(),
1825 &[("state", "open")],
1826 ))
1827 .await
1828 .unwrap(),
1829 )
1830 .await;
1831 assert_eq!(status, StatusCode::OK);
1832 let items = body["items"].as_array().expect("items array");
1833 assert_eq!(
1834 items.len(),
1835 1,
1836 "third-party Closed record must not flip filter result for state=open",
1837 );
1838 assert_eq!(items[0]["state"], json!("open"));
1839 assert!(
1840 items[0].get("stateUpdatedAt").is_none(),
1841 "third-party state source must not surface stateUpdatedAt",
1842 );
1843}
1844
1845#[tokio::test]
1846async fn list_pulls_status_filter_ignores_third_party_status_source() {
1847 let h = Harness::new().await;
1848 let repo = did("did:plc:limpet");
1849 let subject = at(&format!("at://{}", repo.as_ref()));
1850 let pull_uri = at("at://did:plc:nel/sh.tangled.repo.pull/p1");
1851 h.add_edge(&nsid("sh.tangled.repo.pull"), &subject, &pull_uri);
1852 h.mount(
1853 &did("did:plc:nel"),
1854 &nsid("sh.tangled.repo.pull"),
1855 &rkey("p1"),
1856 pull_body(&repo, "wip"),
1857 )
1858 .await;
1859 h.state.pull_statuses.upsert(
1860 at("at://did:plc:nautilus/sh.tangled.repo.pull.status/spoof"),
1861 pull_uri.clone(),
1862 1_777_593_800_000_000,
1863 PullStatusKind::Merged,
1864 );
1865
1866 let app = router(h.state.clone());
1867 let (status, body) = json_response(
1868 app.oneshot(list_request(
1869 "sh.tangled.repo.listPulls",
1870 subject.as_ref(),
1871 &[("status", "merged")],
1872 ))
1873 .await
1874 .unwrap(),
1875 )
1876 .await;
1877 assert_eq!(status, StatusCode::OK);
1878 let items = body["items"].as_array().expect("items array");
1879 assert_eq!(
1880 items.len(),
1881 0,
1882 "third-party Merged record must not satisfy status=merged"
1883 );
1884}
1885
1886#[tokio::test]
1887async fn list_issues_state_filter_accepts_repo_owner_state_source() {
1888 let h = Harness::new().await;
1889 let repo_owner = did("did:plc:limpet");
1890 let subject = at(&format!("at://{}", repo_owner.as_ref()));
1891 let issue_uri = at("at://did:plc:nel/sh.tangled.repo.issue/i1");
1892 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri);
1893 h.mount(
1894 &did("did:plc:nel"),
1895 &nsid("sh.tangled.repo.issue"),
1896 &rkey("i1"),
1897 issue_body(&repo_owner, "owner closed"),
1898 )
1899 .await;
1900 h.state.issue_states.upsert(
1901 at("at://did:plc:limpet/sh.tangled.repo.issue.state/legit"),
1902 issue_uri.clone(),
1903 1_777_593_800_000_000,
1904 IssueStateKind::Closed,
1905 );
1906
1907 let app = router(h.state.clone());
1908 let (status, body) = json_response(
1909 app.oneshot(list_request(
1910 "sh.tangled.repo.listIssues",
1911 subject.as_ref(),
1912 &[("state", "closed")],
1913 ))
1914 .await
1915 .unwrap(),
1916 )
1917 .await;
1918 assert_eq!(status, StatusCode::OK);
1919 let items = body["items"].as_array().expect("items array");
1920 assert_eq!(
1921 items.len(),
1922 1,
1923 "repo-owner state record must satisfy state=closed"
1924 );
1925 assert_eq!(items[0]["state"], json!("closed"));
1926}
1927
1928#[tokio::test]
1929async fn list_issues_order_asc_returns_oldest_first() {
1930 let h = Harness::new().await;
1931 let repo = did("did:plc:limpet");
1932 let subject = at(&format!("at://{}", repo.as_ref()));
1933 let rkeys = ["a", "b", "c"];
1934 stream::iter(rkeys)
1935 .for_each(|r| {
1936 let h = &h;
1937 let subject = subject.clone();
1938 let repo = repo.clone();
1939 async move {
1940 let rk = rkey(r);
1941 let issue_uri = at(&format!(
1942 "at://did:plc:nel/sh.tangled.repo.issue/{}",
1943 rk.as_ref()
1944 ));
1945 h.add_edge(&nsid("sh.tangled.repo.issue"), &subject, &issue_uri);
1946 h.mount(
1947 &did("did:plc:nel"),
1948 &nsid("sh.tangled.repo.issue"),
1949 &rk,
1950 issue_body(&repo, &format!("issue-{}", rk.as_ref())),
1951 )
1952 .await;
1953 }
1954 })
1955 .await;
1956
1957 let app = router(h.state.clone());
1958 let (_, asc) = json_response(
1959 app.clone()
1960 .oneshot(list_request(
1961 "sh.tangled.repo.listIssues",
1962 subject.as_ref(),
1963 &[("order", "asc")],
1964 ))
1965 .await
1966 .unwrap(),
1967 )
1968 .await;
1969 let (_, desc) = json_response(
1970 app.oneshot(list_request(
1971 "sh.tangled.repo.listIssues",
1972 subject.as_ref(),
1973 &[("order", "desc")],
1974 ))
1975 .await
1976 .unwrap(),
1977 )
1978 .await;
1979 let asc_uris: Vec<_> = asc["items"]
1980 .as_array()
1981 .unwrap()
1982 .iter()
1983 .map(|i| i["uri"].as_str().unwrap().to_owned())
1984 .collect();
1985 let desc_uris: Vec<_> = desc["items"]
1986 .as_array()
1987 .unwrap()
1988 .iter()
1989 .map(|i| i["uri"].as_str().unwrap().to_owned())
1990 .collect();
1991 let mut reversed = asc_uris.clone();
1992 reversed.reverse();
1993 assert_eq!(asc_uris.len(), 3);
1994 assert_eq!(desc_uris, reversed, "desc must be exact reverse of asc");
1995}
1996
1997#[tokio::test]
1998async fn list_issues_by_state_filter_narrows_results() {
1999 let h = Harness::new().await;
2000 let author = did("did:plc:nel");
2001 let repo = did("did:plc:limpet");
2002 let open_uri = at("at://did:plc:nel/sh.tangled.repo.issue/open1");
2003 let closed_uri = at("at://did:plc:nel/sh.tangled.repo.issue/closed1");
2004 let author_subject = at(&format!("at://{}", author.as_ref()));
2005 h.edges.add(Edge {
2006 kind: nsid("sh.tangled.repo.issue.by"),
2007 subject: SubjectRef::Did(author.clone()),
2008 source: open_uri.clone(),
2009 sort_micros: next_sort_micros(),
2010 });
2011 h.edges.add(Edge {
2012 kind: nsid("sh.tangled.repo.issue.by"),
2013 subject: SubjectRef::Did(author.clone()),
2014 source: closed_uri.clone(),
2015 sort_micros: next_sort_micros(),
2016 });
2017 h.mount(
2018 &author,
2019 &nsid("sh.tangled.repo.issue"),
2020 &rkey("open1"),
2021 issue_body(&repo, "still open"),
2022 )
2023 .await;
2024 h.mount(
2025 &author,
2026 &nsid("sh.tangled.repo.issue"),
2027 &rkey("closed1"),
2028 issue_body(&repo, "shut"),
2029 )
2030 .await;
2031 h.state.issue_states.upsert(
2032 at("at://did:plc:nel/sh.tangled.repo.issue.state/s1"),
2033 closed_uri.clone(),
2034 1_777_593_800_000_000,
2035 IssueStateKind::Closed,
2036 );
2037
2038 let app = router(h.state.clone());
2039 let (status, body) = json_response(
2040 app.oneshot(list_request(
2041 "sh.tangled.repo.listIssuesBy",
2042 author_subject.as_ref(),
2043 &[("state", "closed")],
2044 ))
2045 .await
2046 .unwrap(),
2047 )
2048 .await;
2049 assert_eq!(status, StatusCode::OK);
2050 let items = body["items"].as_array().expect("items array");
2051 assert_eq!(
2052 items.len(),
2053 1,
2054 "only the closed issue survives state=closed"
2055 );
2056 assert_eq!(items[0]["uri"], json!(closed_uri.as_ref()));
2057}