This repository has no description
1use std::sync::Arc;
2
3use axum::body::{Body, to_bytes};
4use bobbin_edge_index::{CoverageWatch, EdgeStore, StateIndex};
5use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig};
6use bobbin_record_lru::{CacheCapacity, LruRecordStore};
7use bobbin_resolver::RepoIdResolver;
8use bobbin_runtime::{RuntimeHasher, SystemClock};
9use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader};
10use bobbin_slingshot_client::SlingshotClient;
11use bobbin_types::edges::Edge;
12use bobbin_types::ids::SubjectRef;
13use bobbin_xrpc::{AppState, router};
14use http::{Request, StatusCode};
15use jacquard_common::DefaultStr;
16use jacquard_common::types::did::Did;
17use jacquard_common::types::nsid::Nsid;
18use jacquard_common::types::recordkey::Rkey;
19use jacquard_common::types::string::AtUri;
20use serde_json::{Value, json};
21use tower::ServiceExt;
22use url::Url;
23use url::form_urlencoded::byte_serialize;
24use wiremock::matchers::{method, path, query_param};
25use wiremock::{Mock, MockServer, ResponseTemplate};
26
27const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i";
28const TAG_BYTES: &str = "AAAAAAAAAAAAAAAAAAAAAAAAAAA=";
29const ARTIFACT_LINK: &str = "bafkreigh2akiscaildc7gnvtklbsfhdgwz72eolmpckbqr5ej26byp3uli";
30
31fn at(s: &str) -> AtUri<DefaultStr> {
32 AtUri::new_owned(s).unwrap()
33}
34
35fn did(s: &str) -> Did<DefaultStr> {
36 Did::new_owned(s).unwrap()
37}
38
39fn rkey(s: &str) -> Rkey<DefaultStr> {
40 Rkey::new_owned(s).unwrap()
41}
42
43fn nsid(s: &'static str) -> Nsid<DefaultStr> {
44 Nsid::new_static(s).unwrap()
45}
46
47fn subj(s: &str) -> SubjectRef {
48 Did::<DefaultStr>::new_owned(s)
49 .map(SubjectRef::Did)
50 .unwrap_or_else(|_| SubjectRef::Uri(AtUri::new_owned(s).unwrap()))
51}
52
53struct Harness {
54 server: MockServer,
55 edges: Arc<EdgeStore>,
56 state: AppState,
57}
58
59impl Harness {
60 async fn new() -> Self {
61 let server = MockServer::start().await;
62 let edges = Arc::new(EdgeStore::new(RuntimeHasher::default()));
63 let coverage = Arc::new(CoverageWatch::new());
64 let state = AppState::new(
65 Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))),
66 SlingshotClient::with_default_http(Url::parse(&server.uri()).unwrap()).unwrap(),
67 edges.clone(),
68 Arc::new(StateIndex::new(RuntimeHasher::default())),
69 Arc::new(StateIndex::new(RuntimeHasher::default())),
70 coverage,
71 Arc::new(
72 KnotProxy::new(
73 KnotProxyConfig::default(),
74 KnotHttpConfig::default(),
75 Arc::new(SystemClock::new()),
76 RuntimeHasher::default(),
77 )
78 .unwrap(),
79 ),
80 Arc::new(
81 SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap(),
82 ) as Arc<dyn SearchReader>,
83 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())),
84 );
85 Self {
86 server,
87 edges,
88 state,
89 }
90 }
91
92 fn add_edge(
93 &self,
94 kind: &Nsid<DefaultStr>,
95 subject: &AtUri<DefaultStr>,
96 source: &AtUri<DefaultStr>,
97 ) {
98 static EDGE_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
99 self.edges.add(Edge {
100 kind: kind.clone(),
101 subject: subj(subject.as_ref()),
102 source: source.clone(),
103 sort_micros: EDGE_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
104 });
105 }
106
107 async fn mount(
108 &self,
109 did: &Did<DefaultStr>,
110 collection: &Nsid<DefaultStr>,
111 rkey: &Rkey<DefaultStr>,
112 value: Value,
113 ) {
114 let uri = format!(
115 "at://{}/{}/{}",
116 did.as_ref(),
117 collection.as_ref(),
118 rkey.as_ref()
119 );
120 Mock::given(method("GET"))
121 .and(path("/xrpc/com.atproto.repo.getRecord"))
122 .and(query_param("repo", did.as_ref()))
123 .and(query_param("collection", collection.as_ref()))
124 .and(query_param("rkey", rkey.as_ref()))
125 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
126 "uri": uri,
127 "cid": CID,
128 "value": value,
129 })))
130 .mount(&self.server)
131 .await;
132 }
133}
134
135fn list_request(endpoint: &str, subject: &str, extras: &[(&str, &str)]) -> Request<Body> {
136 let mut qs = format!("subject={}", encode(subject));
137 extras.iter().for_each(|(k, v)| {
138 qs.push('&');
139 qs.push_str(k);
140 qs.push('=');
141 qs.push_str(&encode(v));
142 });
143 Request::builder()
144 .uri(format!("/xrpc/{endpoint}?{qs}"))
145 .body(Body::empty())
146 .unwrap()
147}
148
149fn encode(s: &str) -> String {
150 byte_serialize(s.as_bytes()).collect()
151}
152
153async fn json_response(resp: axum::response::Response) -> (StatusCode, Value) {
154 let status = resp.status();
155 let bytes = to_bytes(resp.into_body(), 1 << 20).await.unwrap();
156 let parsed: Value = serde_json::from_slice(&bytes).expect("JSON body");
157 (status, parsed)
158}
159
160fn label_definition_body(name: &str) -> Value {
161 json!({
162 "$type": "sh.tangled.label.definition",
163 "createdAt": "2026-05-01T00:00:00Z",
164 "name": name,
165 "scope": ["sh.tangled.repo.issue"],
166 "valueType": {"type": "boolean", "format": "any"}
167 })
168}
169
170fn label_op_body(subject: &AtUri<DefaultStr>, def_uri: &AtUri<DefaultStr>, value: &str) -> Value {
171 json!({
172 "$type": "sh.tangled.label.op",
173 "performedAt": "2026-05-01T00:00:00Z",
174 "subject": subject.as_ref(),
175 "add": [{"key": def_uri.as_ref(), "value": value}],
176 "delete": []
177 })
178}
179
180fn pipeline_body(repo_did: &Did<DefaultStr>) -> Value {
181 json!({
182 "$type": "sh.tangled.pipeline",
183 "workflows": [],
184 "triggerMetadata": {
185 "kind": "manual",
186 "repo": {
187 "did": "did:plc:teq",
188 "repoDid": repo_did.as_ref(),
189 "knot": "nel.pet",
190 "defaultBranch": "main"
191 }
192 }
193 })
194}
195
196fn pipeline_body_owner_only(owner_did: &Did<DefaultStr>) -> Value {
197 json!({
198 "$type": "sh.tangled.pipeline",
199 "workflows": [],
200 "triggerMetadata": {
201 "kind": "manual",
202 "repo": {
203 "did": owner_did.as_ref(),
204 "knot": "nel.pet",
205 "defaultBranch": "main"
206 }
207 }
208 })
209}
210
211fn pipeline_status_body(pipeline_uri: &AtUri<DefaultStr>) -> Value {
212 json!({
213 "$type": "sh.tangled.pipeline.status",
214 "createdAt": "2026-05-01T00:00:00Z",
215 "pipeline": pipeline_uri.as_ref(),
216 "workflow": pipeline_uri.as_ref(),
217 "status": "success"
218 })
219}
220
221fn artifact_body(repo_did: &Did<DefaultStr>, name: &str) -> Value {
222 json!({
223 "$type": "sh.tangled.repo.artifact",
224 "createdAt": "2026-05-01T00:00:00Z",
225 "name": name,
226 "repoDid": repo_did.as_ref(),
227 "tag": {"$bytes": TAG_BYTES},
228 "artifact": {
229 "$type": "blob",
230 "ref": {"$link": ARTIFACT_LINK},
231 "mimeType": "application/octet-stream",
232 "size": 12
233 }
234 })
235}
236
237fn knot_member_body(subject_did: &Did<DefaultStr>) -> Value {
238 json!({
239 "$type": "sh.tangled.knot.member",
240 "createdAt": "2026-05-01T00:00:00Z",
241 "subject": subject_did.as_ref(),
242 "domain": "oyster.cafe"
243 })
244}
245
246fn spindle_member_body(subject_did: &Did<DefaultStr>) -> Value {
247 json!({
248 "$type": "sh.tangled.spindle.member",
249 "createdAt": "2026-05-01T00:00:00Z",
250 "subject": subject_did.as_ref(),
251 "instance": "spin.nel.pet"
252 })
253}
254
255fn string_body(filename: &str, contents: &str) -> Value {
256 json!({
257 "$type": "sh.tangled.string",
258 "createdAt": "2026-05-01T00:00:00Z",
259 "filename": filename,
260 "description": "test fixture",
261 "contents": contents
262 })
263}
264
265#[tokio::test]
266async fn list_label_definitions_keys_on_owner_did() {
267 let h = Harness::new().await;
268 let owner = did("did:plc:abalone");
269 let rk = rkey("bug");
270 let source = at(&format!(
271 "at://{}/sh.tangled.label.definition/{}",
272 owner.as_ref(),
273 rk.as_ref()
274 ));
275 h.add_edge(
276 &nsid("sh.tangled.label.definition"),
277 &at(&format!("at://{}", owner.as_ref())),
278 &source,
279 );
280 h.mount(
281 &owner,
282 &nsid("sh.tangled.label.definition"),
283 &rk,
284 label_definition_body("bug"),
285 )
286 .await;
287
288 let app = router(h.state.clone());
289 let (status, body) = json_response(
290 app.oneshot(list_request(
291 "sh.tangled.label.listDefinitions",
292 &format!("at://{}", owner.as_ref()),
293 &[],
294 ))
295 .await
296 .unwrap(),
297 )
298 .await;
299 assert_eq!(status, StatusCode::OK);
300 let items = body["items"].as_array().unwrap();
301 assert_eq!(items.len(), 1);
302 assert_eq!(items[0]["value"]["name"], json!("bug"));
303 assert_eq!(
304 items[0]["value"]["scope"][0],
305 json!("sh.tangled.repo.issue")
306 );
307}
308
309#[tokio::test]
310async fn count_label_definitions_dedupes_per_author() {
311 let h = Harness::new().await;
312 let owner = did("did:plc:abalone");
313 let subject = at(&format!("at://{}", owner.as_ref()));
314 h.add_edge(
315 &nsid("sh.tangled.label.definition"),
316 &subject,
317 &at(&format!(
318 "at://{}/sh.tangled.label.definition/bug",
319 owner.as_ref()
320 )),
321 );
322 h.add_edge(
323 &nsid("sh.tangled.label.definition"),
324 &subject,
325 &at(&format!(
326 "at://{}/sh.tangled.label.definition/wontfix",
327 owner.as_ref()
328 )),
329 );
330
331 let app = router(h.state.clone());
332 let (_, body) = json_response(
333 app.oneshot(list_request(
334 "sh.tangled.label.countDefinitions",
335 subject.as_ref(),
336 &[],
337 ))
338 .await
339 .unwrap(),
340 )
341 .await;
342 assert_eq!(body["count"], json!(2));
343 assert_eq!(body["distinctAuthors"], json!(1));
344}
345
346#[tokio::test]
347async fn list_label_ops_accepts_issue_subject() {
348 let h = Harness::new().await;
349 let issue_uri = at("at://did:plc:abalone/sh.tangled.repo.issue/i1");
350 let author = did("did:plc:nel");
351 let rk = rkey("op1");
352 let def_uri = at("at://did:plc:abalone/sh.tangled.label.definition/bug");
353 h.add_edge(
354 &nsid("sh.tangled.label.op"),
355 &issue_uri,
356 &at(&format!(
357 "at://{}/sh.tangled.label.op/{}",
358 author.as_ref(),
359 rk.as_ref()
360 )),
361 );
362 h.mount(
363 &author,
364 &nsid("sh.tangled.label.op"),
365 &rk,
366 label_op_body(&issue_uri, &def_uri, "true"),
367 )
368 .await;
369
370 let app = router(h.state.clone());
371 let (status, body) = json_response(
372 app.oneshot(list_request(
373 "sh.tangled.label.listOps",
374 issue_uri.as_ref(),
375 &[],
376 ))
377 .await
378 .unwrap(),
379 )
380 .await;
381 assert_eq!(status, StatusCode::OK);
382 let items = body["items"].as_array().unwrap();
383 assert_eq!(items.len(), 1);
384 assert_eq!(items[0]["value"]["subject"], json!(issue_uri.as_ref()));
385 assert_eq!(items[0]["value"]["add"][0]["key"], json!(def_uri.as_ref()));
386}
387
388#[tokio::test]
389async fn list_label_ops_pull_subject_round_trip() {
390 let h = Harness::new().await;
391 let pull_uri = at("at://did:plc:abalone/sh.tangled.repo.pull/p1");
392 let author = did("did:plc:bailey");
393 let rk = rkey("op1");
394 let def_uri = at("at://did:plc:abalone/sh.tangled.label.definition/wontfix");
395 let source = at(&format!(
396 "at://{}/sh.tangled.label.op/{}",
397 author.as_ref(),
398 rk.as_ref()
399 ));
400 let body = label_op_body(&pull_uri, &def_uri, "true");
401 let parsed =
402 bobbin_types::edges::Record::from_json_value(&nsid("sh.tangled.label.op"), body.clone())
403 .expect("parse label.op record");
404 parsed
405 .extract_edges(&source)
406 .expect("extract")
407 .into_iter()
408 .for_each(|e| h.edges.add(e));
409 h.mount(&author, &nsid("sh.tangled.label.op"), &rk, body)
410 .await;
411
412 let app = router(h.state.clone());
413 let (status, json) = json_response(
414 app.oneshot(list_request(
415 "sh.tangled.label.listOps",
416 pull_uri.as_ref(),
417 &[],
418 ))
419 .await
420 .unwrap(),
421 )
422 .await;
423 assert_eq!(status, StatusCode::OK);
424 let items = json["items"].as_array().unwrap();
425 assert_eq!(items.len(), 1);
426 assert_eq!(items[0]["value"]["subject"], json!(pull_uri.as_ref()));
427 assert_eq!(items[0]["value"]["add"][0]["key"], json!(def_uri.as_ref()));
428}
429
430#[tokio::test]
431async fn list_label_ops_rejects_bare_did_subject() {
432 let h = Harness::new().await;
433 let app = router(h.state.clone());
434 let (status, body) = json_response(
435 app.oneshot(list_request(
436 "sh.tangled.label.listOps",
437 "at://did:plc:abalone",
438 &[],
439 ))
440 .await
441 .unwrap(),
442 )
443 .await;
444 assert_eq!(status, StatusCode::BAD_REQUEST);
445 let msg = body["message"].as_str().unwrap_or_default();
446 assert!(
447 msg.contains("sh.tangled.repo.issue") && msg.contains("sh.tangled.repo.pull"),
448 "message must list allowed collections, got {msg}"
449 );
450}
451
452#[tokio::test]
453async fn list_label_ops_rejects_unrelated_collection() {
454 let h = Harness::new().await;
455 let app = router(h.state.clone());
456 let (status, _) = json_response(
457 app.oneshot(list_request(
458 "sh.tangled.label.listOps",
459 "at://did:plc:abalone/sh.tangled.repo/r1",
460 &[],
461 ))
462 .await
463 .unwrap(),
464 )
465 .await;
466 assert_eq!(status, StatusCode::BAD_REQUEST);
467}
468
469#[tokio::test]
470async fn list_pipelines_keys_on_repo_did() {
471 let h = Harness::new().await;
472 let repo_did = did("did:plc:abalone");
473 let subject = at(&format!("at://{}", repo_did.as_ref()));
474 let spindle_did = did("did:plc:lyna");
475 let rk = rkey("pl1");
476 h.add_edge(
477 &nsid("sh.tangled.pipeline"),
478 &subject,
479 &at(&format!(
480 "at://{}/sh.tangled.pipeline/{}",
481 spindle_did.as_ref(),
482 rk.as_ref()
483 )),
484 );
485 h.mount(
486 &spindle_did,
487 &nsid("sh.tangled.pipeline"),
488 &rk,
489 pipeline_body(&repo_did),
490 )
491 .await;
492
493 let app = router(h.state.clone());
494 let (status, body) = json_response(
495 app.oneshot(list_request(
496 "sh.tangled.pipeline.listPipelines",
497 subject.as_ref(),
498 &[],
499 ))
500 .await
501 .unwrap(),
502 )
503 .await;
504 assert_eq!(status, StatusCode::OK);
505 let items = body["items"].as_array().unwrap();
506 assert_eq!(items.len(), 1);
507 assert_eq!(
508 items[0]["value"]["triggerMetadata"]["repo"]["repoDid"],
509 json!(repo_did.as_ref())
510 );
511}
512
513#[tokio::test]
514async fn count_pipelines_returns_zero_when_no_edges() {
515 let h = Harness::new().await;
516 let app = router(h.state.clone());
517 let (_, body) = json_response(
518 app.oneshot(list_request(
519 "sh.tangled.pipeline.countPipelines",
520 "at://did:plc:abalone",
521 &[],
522 ))
523 .await
524 .unwrap(),
525 )
526 .await;
527 assert_eq!(body["count"], json!(0));
528}
529
530#[tokio::test]
531async fn list_pipeline_statuses_keys_on_pipeline_uri() {
532 let h = Harness::new().await;
533 let pipeline_uri = at("at://did:plc:lyna/sh.tangled.pipeline/pl1");
534 let author = did("did:plc:bailey");
535 let rk = rkey("s1");
536 h.add_edge(
537 &nsid("sh.tangled.pipeline.status"),
538 &pipeline_uri,
539 &at(&format!(
540 "at://{}/sh.tangled.pipeline.status/{}",
541 author.as_ref(),
542 rk.as_ref()
543 )),
544 );
545 h.mount(
546 &author,
547 &nsid("sh.tangled.pipeline.status"),
548 &rk,
549 pipeline_status_body(&pipeline_uri),
550 )
551 .await;
552
553 let app = router(h.state.clone());
554 let (status, body) = json_response(
555 app.oneshot(list_request(
556 "sh.tangled.pipeline.listStatuses",
557 pipeline_uri.as_ref(),
558 &[],
559 ))
560 .await
561 .unwrap(),
562 )
563 .await;
564 assert_eq!(status, StatusCode::OK);
565 let items = body["items"].as_array().unwrap();
566 assert_eq!(items.len(), 1);
567 assert_eq!(items[0]["value"]["pipeline"], json!(pipeline_uri.as_ref()));
568 assert_eq!(items[0]["value"]["status"], json!("success"));
569}
570
571#[tokio::test]
572async fn pipeline_status_endpoint_rejects_bare_did_subject() {
573 let h = Harness::new().await;
574 let app = router(h.state.clone());
575 let (status, body) = json_response(
576 app.oneshot(list_request(
577 "sh.tangled.pipeline.listStatuses",
578 "at://did:plc:lyna",
579 &[],
580 ))
581 .await
582 .unwrap(),
583 )
584 .await;
585 assert_eq!(status, StatusCode::BAD_REQUEST);
586 assert!(
587 body["message"]
588 .as_str()
589 .unwrap_or_default()
590 .contains("sh.tangled.pipeline/<rkey>"),
591 "{body}"
592 );
593}
594
595#[tokio::test]
596async fn list_artifacts_keys_on_repo_did() {
597 let h = Harness::new().await;
598 let repo_did = did("did:plc:abalone");
599 let subject = at(&format!("at://{}", repo_did.as_ref()));
600 let owner = did("did:plc:nel");
601 let rk = rkey("a1");
602 h.add_edge(
603 &nsid("sh.tangled.repo.artifact"),
604 &subject,
605 &at(&format!(
606 "at://{}/sh.tangled.repo.artifact/{}",
607 owner.as_ref(),
608 rk.as_ref()
609 )),
610 );
611 h.mount(
612 &owner,
613 &nsid("sh.tangled.repo.artifact"),
614 &rk,
615 artifact_body(&repo_did, "out.bin"),
616 )
617 .await;
618
619 let app = router(h.state.clone());
620 let (status, body) = json_response(
621 app.oneshot(list_request(
622 "sh.tangled.repo.listArtifacts",
623 subject.as_ref(),
624 &[],
625 ))
626 .await
627 .unwrap(),
628 )
629 .await;
630 assert_eq!(status, StatusCode::OK);
631 let items = body["items"].as_array().unwrap();
632 assert_eq!(items.len(), 1);
633 assert_eq!(items[0]["value"]["name"], json!("out.bin"));
634 assert_eq!(items[0]["value"]["repoDid"], json!(repo_did.as_ref()));
635}
636
637#[tokio::test]
638async fn list_knot_members_keys_on_subject_did() {
639 let h = Harness::new().await;
640 let subject_did = did("did:plc:nel");
641 let subject = at(&format!("at://{}", subject_did.as_ref()));
642 let admin = did("did:plc:teq");
643 let rk = rkey("m1");
644 h.add_edge(
645 &nsid("sh.tangled.knot.member"),
646 &subject,
647 &at(&format!(
648 "at://{}/sh.tangled.knot.member/{}",
649 admin.as_ref(),
650 rk.as_ref()
651 )),
652 );
653 h.mount(
654 &admin,
655 &nsid("sh.tangled.knot.member"),
656 &rk,
657 knot_member_body(&subject_did),
658 )
659 .await;
660
661 let app = router(h.state.clone());
662 let (status, body) = json_response(
663 app.oneshot(list_request(
664 "sh.tangled.knot.listMembers",
665 subject.as_ref(),
666 &[],
667 ))
668 .await
669 .unwrap(),
670 )
671 .await;
672 assert_eq!(status, StatusCode::OK);
673 let items = body["items"].as_array().unwrap();
674 assert_eq!(items.len(), 1);
675 assert_eq!(items[0]["value"]["subject"], json!(subject_did.as_ref()));
676 assert_eq!(items[0]["value"]["domain"], json!("oyster.cafe"));
677}
678
679#[tokio::test]
680async fn list_spindle_members_keys_on_subject_did() {
681 let h = Harness::new().await;
682 let subject_did = did("did:plc:olaren");
683 let subject = at(&format!("at://{}", subject_did.as_ref()));
684 let admin = did("did:plc:teq");
685 let rk = rkey("m1");
686 h.add_edge(
687 &nsid("sh.tangled.spindle.member"),
688 &subject,
689 &at(&format!(
690 "at://{}/sh.tangled.spindle.member/{}",
691 admin.as_ref(),
692 rk.as_ref()
693 )),
694 );
695 h.mount(
696 &admin,
697 &nsid("sh.tangled.spindle.member"),
698 &rk,
699 spindle_member_body(&subject_did),
700 )
701 .await;
702
703 let app = router(h.state.clone());
704 let (status, body) = json_response(
705 app.oneshot(list_request(
706 "sh.tangled.spindle.listMembers",
707 subject.as_ref(),
708 &[],
709 ))
710 .await
711 .unwrap(),
712 )
713 .await;
714 assert_eq!(status, StatusCode::OK);
715 let items = body["items"].as_array().unwrap();
716 assert_eq!(items.len(), 1);
717 assert_eq!(items[0]["value"]["subject"], json!(subject_did.as_ref()));
718 assert_eq!(items[0]["value"]["instance"], json!("spin.nel.pet"));
719}
720
721#[tokio::test]
722async fn list_strings_keys_on_owner_did() {
723 let h = Harness::new().await;
724 let owner = did("did:plc:abalone");
725 let subject = at(&format!("at://{}", owner.as_ref()));
726 let rk = rkey("k1");
727 h.add_edge(
728 &nsid("sh.tangled.string"),
729 &subject,
730 &at(&format!(
731 "at://{}/sh.tangled.string/{}",
732 owner.as_ref(),
733 rk.as_ref()
734 )),
735 );
736 h.mount(
737 &owner,
738 &nsid("sh.tangled.string"),
739 &rk,
740 string_body("snippet.rs", "fn main() {}"),
741 )
742 .await;
743
744 let app = router(h.state.clone());
745 let (status, body) = json_response(
746 app.oneshot(list_request(
747 "sh.tangled.string.listStrings",
748 subject.as_ref(),
749 &[],
750 ))
751 .await
752 .unwrap(),
753 )
754 .await;
755 assert_eq!(status, StatusCode::OK);
756 let items = body["items"].as_array().unwrap();
757 assert_eq!(items.len(), 1);
758 assert_eq!(items[0]["value"]["filename"], json!("snippet.rs"));
759 assert_eq!(items[0]["value"]["contents"], json!("fn main() {}"));
760}
761
762#[tokio::test]
763async fn count_strings_dedupes_per_owner() {
764 let h = Harness::new().await;
765 let owner = did("did:plc:abalone");
766 let subject = at(&format!("at://{}", owner.as_ref()));
767 ["k1", "k2", "k3"].iter().for_each(|r| {
768 h.add_edge(
769 &nsid("sh.tangled.string"),
770 &subject,
771 &at(&format!("at://{}/sh.tangled.string/{}", owner.as_ref(), r)),
772 );
773 });
774
775 let app = router(h.state.clone());
776 let (_, body) = json_response(
777 app.oneshot(list_request(
778 "sh.tangled.string.countStrings",
779 subject.as_ref(),
780 &[],
781 ))
782 .await
783 .unwrap(),
784 )
785 .await;
786 assert_eq!(body["count"], json!(3));
787 assert_eq!(body["distinctAuthors"], json!(1));
788}
789
790#[tokio::test]
791async fn extractor_to_xrpc_round_trip_for_pipeline() {
792 let h = Harness::new().await;
793 let repo_did = did("did:plc:abalone");
794 let spindle_did = did("did:plc:lyna");
795 let rk = rkey("pl1");
796 let source = at(&format!(
797 "at://{}/sh.tangled.pipeline/{}",
798 spindle_did.as_ref(),
799 rk.as_ref()
800 ));
801 let body = pipeline_body(&repo_did);
802 let parsed =
803 bobbin_types::edges::Record::from_json_value(&nsid("sh.tangled.pipeline"), body.clone())
804 .expect("parse pipeline record");
805 parsed
806 .extract_edges(&source)
807 .expect("extract")
808 .into_iter()
809 .for_each(|e| h.edges.add(e));
810 h.mount(&spindle_did, &nsid("sh.tangled.pipeline"), &rk, body)
811 .await;
812
813 let app = router(h.state.clone());
814 let (status, json) = json_response(
815 app.oneshot(list_request(
816 "sh.tangled.pipeline.listPipelines",
817 &format!("at://{}", repo_did.as_ref()),
818 &[],
819 ))
820 .await
821 .unwrap(),
822 )
823 .await;
824 assert_eq!(
825 status,
826 StatusCode::OK,
827 "extractor key must match handler subject, body was {json}"
828 );
829 let items = json["items"].as_array().unwrap();
830 assert_eq!(items.len(), 1, "expected exactly one pipeline edge");
831}
832
833#[tokio::test]
834async fn list_pipelines_drops_records_without_resolvable_repo_did() {
835 let h = Harness::new().await;
836 let owner_did = did("did:plc:nel");
837 let spindle_did = did("did:plc:lyna");
838 let rk = rkey("pl1");
839 let source = at(&format!(
840 "at://{}/sh.tangled.pipeline/{}",
841 spindle_did.as_ref(),
842 rk.as_ref()
843 ));
844 let body = pipeline_body_owner_only(&owner_did);
845 let parsed =
846 bobbin_types::edges::Record::from_json_value(&nsid("sh.tangled.pipeline"), body.clone())
847 .expect("parse pipeline record");
848 parsed
849 .extract_edges(&source)
850 .expect("extract")
851 .into_iter()
852 .for_each(|e| h.edges.add(e));
853 h.mount(&spindle_did, &nsid("sh.tangled.pipeline"), &rk, body)
854 .await;
855
856 let app = router(h.state.clone());
857 let (status, json) = json_response(
858 app.oneshot(list_request(
859 "sh.tangled.pipeline.listPipelines",
860 &format!("at://{}", owner_did.as_ref()),
861 &[],
862 ))
863 .await
864 .unwrap(),
865 )
866 .await;
867 assert_eq!(status, StatusCode::OK, "body was {json}");
868 let items = json["items"].as_array().unwrap();
869 assert!(
870 items.is_empty(),
871 "pipeline without resolvable repoDid must be dropped, got {json}"
872 );
873}