Monorepo for Tangled
tangled.org
1use std::sync::Arc;
2
3use axum::body::{Body, to_bytes};
4use bobbin_edge_index::{CoverageWatch, EdgeStore, StateIndex};
5use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig};
6use bobbin_record_lru::{CacheCapacity, LruRecordStore};
7use bobbin_resolver::RepoIdResolver;
8use bobbin_runtime::{RuntimeHasher, SystemClock};
9use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader};
10use bobbin_slingshot_client::SlingshotClient;
11use bobbin_types::edges::Edge;
12use bobbin_types::ids::SubjectRef;
13use bobbin_xrpc::{AppState, router};
14use http::{Request, StatusCode};
15use jacquard_common::DefaultStr;
16use jacquard_common::types::did::Did;
17use jacquard_common::types::nsid::Nsid;
18use jacquard_common::types::recordkey::Rkey;
19use jacquard_common::types::string::AtUri;
20use serde_json::{Value, json};
21use tower::ServiceExt;
22use url::Url;
23use url::form_urlencoded::byte_serialize;
24use wiremock::matchers::{method, path, query_param};
25use wiremock::{Mock, MockServer, ResponseTemplate};
26
// CID string returned as the `cid` field of every mocked `getRecord` response
// (see `Harness::mount`).
const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i";
// Value placed under `tag.$bytes` in the artifact fixture (see `artifact_body`).
const TAG_BYTES: &str = "AAAAAAAAAAAAAAAAAAAAAAAAAAA=";
// Value placed under `artifact.ref.$link` in the artifact fixture (see `artifact_body`).
const ARTIFACT_LINK: &str = "bafkreigh2akiscaildc7gnvtklbsfhdgwz72eolmpckbqr5ej26byp3uli";
30
31fn at(s: &str) -> AtUri<DefaultStr> {
32 AtUri::new_owned(s).unwrap()
33}
34
35fn did(s: &str) -> Did<DefaultStr> {
36 Did::new_owned(s).unwrap()
37}
38
39fn rkey(s: &str) -> Rkey<DefaultStr> {
40 Rkey::new_owned(s).unwrap()
41}
42
43fn nsid(s: &'static str) -> Nsid<DefaultStr> {
44 Nsid::new_static(s).unwrap()
45}
46
47fn subj(s: &str) -> SubjectRef {
48 Did::<DefaultStr>::new_owned(s)
49 .map(SubjectRef::Did)
50 .unwrap_or_else(|_| SubjectRef::Uri(AtUri::new_owned(s).unwrap()))
51}
52
/// Shared fixture for the XRPC endpoint tests in this file.
struct Harness {
    // wiremock server; the Slingshot client inside `state` is pointed at its
    // URI and `mount` registers `getRecord` stubs on it.
    server: MockServer,
    // Edge store handle; `new` passes a clone of this same `Arc` to `state`.
    edges: Arc<EdgeStore>,
    // Application state the tests clone into `router(...)`.
    state: AppState,
}
58
59impl Harness {
60 async fn new() -> Self {
61 let server = MockServer::start().await;
62 let edges = Arc::new(EdgeStore::new(RuntimeHasher::default()));
63 let coverage = Arc::new(CoverageWatch::new());
64 let state = AppState::new(
65 Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))),
66 SlingshotClient::with_default_http(Url::parse(&server.uri()).unwrap()).unwrap(),
67 edges.clone(),
68 Arc::new(StateIndex::new(RuntimeHasher::default())),
69 Arc::new(StateIndex::new(RuntimeHasher::default())),
70 coverage,
71 Arc::new(
72 KnotProxy::new(
73 KnotProxyConfig::default(),
74 KnotHttpConfig::default(),
75 Arc::new(SystemClock::new()),
76 RuntimeHasher::default(),
77 )
78 .unwrap(),
79 ),
80 Arc::new(
81 SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap(),
82 ) as Arc<dyn SearchReader>,
83 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())),
84 );
85 Self {
86 server,
87 edges,
88 state,
89 }
90 }
91
92 fn add_edge(
93 &self,
94 kind: &Nsid<DefaultStr>,
95 subject: &AtUri<DefaultStr>,
96 source: &AtUri<DefaultStr>,
97 ) {
98 static EDGE_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
99 self.edges.add(Edge {
100 kind: kind.clone(),
101 subject: subj(subject.as_ref()),
102 source: source.clone(),
103 sort_micros: EDGE_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
104 });
105 }
106
107 async fn mount(
108 &self,
109 did: &Did<DefaultStr>,
110 collection: &Nsid<DefaultStr>,
111 rkey: &Rkey<DefaultStr>,
112 value: Value,
113 ) {
114 let uri = format!(
115 "at://{}/{}/{}",
116 did.as_ref(),
117 collection.as_ref(),
118 rkey.as_ref()
119 );
120 Mock::given(method("GET"))
121 .and(path("/xrpc/com.atproto.repo.getRecord"))
122 .and(query_param("repo", did.as_ref()))
123 .and(query_param("collection", collection.as_ref()))
124 .and(query_param("rkey", rkey.as_ref()))
125 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
126 "uri": uri,
127 "cid": CID,
128 "value": value,
129 })))
130 .mount(&self.server)
131 .await;
132 }
133}
134
135fn list_request(endpoint: &str, subject: &str, extras: &[(&str, &str)]) -> Request<Body> {
136 let mut qs = format!("subject={subject}");
137 extras.iter().for_each(|(k, v)| {
138 qs.push('&');
139 qs.push_str(k);
140 qs.push('=');
141 qs.push_str(&encode(v));
142 });
143 Request::builder()
144 .uri(format!("/xrpc/{endpoint}?{qs}"))
145 .body(Body::empty())
146 .unwrap()
147}
148
149fn list_request_escaped_subject(
150 endpoint: &str,
151 subject: &str,
152 extras: &[(&str, &str)],
153) -> Request<Body> {
154 let mut qs = format!("subject={}", encode(subject));
155 extras.iter().for_each(|(k, v)| {
156 qs.push('&');
157 qs.push_str(k);
158 qs.push('=');
159 qs.push_str(&encode(v));
160 });
161 Request::builder()
162 .uri(format!("/xrpc/{endpoint}?{qs}"))
163 .body(Body::empty())
164 .unwrap()
165}
166
167fn encode(s: &str) -> String {
168 byte_serialize(s.as_bytes()).collect()
169}
170
171async fn json_response(resp: axum::response::Response) -> (StatusCode, Value) {
172 let status = resp.status();
173 let bytes = to_bytes(resp.into_body(), 1 << 20).await.unwrap();
174 let parsed: Value = serde_json::from_slice(&bytes).expect("JSON body");
175 (status, parsed)
176}
177
178fn label_definition_body(name: &str) -> Value {
179 json!({
180 "$type": "sh.tangled.label.definition",
181 "createdAt": "2026-05-01T00:00:00Z",
182 "name": name,
183 "scope": ["sh.tangled.repo.issue"],
184 "valueType": {"type": "boolean", "format": "any"}
185 })
186}
187
188fn label_op_body(subject: &AtUri<DefaultStr>, def_uri: &AtUri<DefaultStr>, value: &str) -> Value {
189 json!({
190 "$type": "sh.tangled.label.op",
191 "performedAt": "2026-05-01T00:00:00Z",
192 "subject": subject.as_ref(),
193 "add": [{"key": def_uri.as_ref(), "value": value}],
194 "delete": []
195 })
196}
197
198fn pipeline_body(repo_did: &Did<DefaultStr>) -> Value {
199 json!({
200 "$type": "sh.tangled.pipeline",
201 "workflows": [],
202 "triggerMetadata": {
203 "kind": "manual",
204 "repo": {
205 "did": "did:plc:teq",
206 "repoDid": repo_did.as_ref(),
207 "knot": "nel.pet",
208 "defaultBranch": "main"
209 }
210 }
211 })
212}
213
214fn pipeline_body_owner_only(owner_did: &Did<DefaultStr>) -> Value {
215 json!({
216 "$type": "sh.tangled.pipeline",
217 "workflows": [],
218 "triggerMetadata": {
219 "kind": "manual",
220 "repo": {
221 "did": owner_did.as_ref(),
222 "knot": "nel.pet",
223 "defaultBranch": "main"
224 }
225 }
226 })
227}
228
229fn pipeline_status_body(pipeline_uri: &AtUri<DefaultStr>) -> Value {
230 json!({
231 "$type": "sh.tangled.pipeline.status",
232 "createdAt": "2026-05-01T00:00:00Z",
233 "pipeline": pipeline_uri.as_ref(),
234 "workflow": pipeline_uri.as_ref(),
235 "status": "success"
236 })
237}
238
239fn artifact_body(repo_did: &Did<DefaultStr>, name: &str) -> Value {
240 json!({
241 "$type": "sh.tangled.repo.artifact",
242 "createdAt": "2026-05-01T00:00:00Z",
243 "name": name,
244 "repoDid": repo_did.as_ref(),
245 "tag": {"$bytes": TAG_BYTES},
246 "artifact": {
247 "$type": "blob",
248 "ref": {"$link": ARTIFACT_LINK},
249 "mimeType": "application/octet-stream",
250 "size": 12
251 }
252 })
253}
254
255fn knot_member_body(subject_did: &Did<DefaultStr>) -> Value {
256 json!({
257 "$type": "sh.tangled.knot.member",
258 "createdAt": "2026-05-01T00:00:00Z",
259 "subject": subject_did.as_ref(),
260 "domain": "oyster.cafe"
261 })
262}
263
264fn spindle_member_body(subject_did: &Did<DefaultStr>) -> Value {
265 json!({
266 "$type": "sh.tangled.spindle.member",
267 "createdAt": "2026-05-01T00:00:00Z",
268 "subject": subject_did.as_ref(),
269 "instance": "spin.nel.pet"
270 })
271}
272
273fn string_body(filename: &str, contents: &str) -> Value {
274 json!({
275 "$type": "sh.tangled.string",
276 "createdAt": "2026-05-01T00:00:00Z",
277 "filename": filename,
278 "description": "test fixture",
279 "contents": contents
280 })
281}
282
283#[tokio::test]
284async fn list_label_definitions_keys_on_owner_did() {
285 let h = Harness::new().await;
286 let owner = did("did:plc:abalone");
287 let rk = rkey("bug");
288 let source = at(&format!(
289 "at://{}/sh.tangled.label.definition/{}",
290 owner.as_ref(),
291 rk.as_ref()
292 ));
293 h.add_edge(
294 &nsid("sh.tangled.label.definition"),
295 &at(&format!("at://{}", owner.as_ref())),
296 &source,
297 );
298 h.mount(
299 &owner,
300 &nsid("sh.tangled.label.definition"),
301 &rk,
302 label_definition_body("bug"),
303 )
304 .await;
305
306 let app = router(h.state.clone());
307 let (status, body) = json_response(
308 app.oneshot(list_request(
309 "sh.tangled.label.listDefinitions",
310 &format!("at://{}", owner.as_ref()),
311 &[],
312 ))
313 .await
314 .unwrap(),
315 )
316 .await;
317 assert_eq!(status, StatusCode::OK);
318 let items = body["items"].as_array().unwrap();
319 assert_eq!(items.len(), 1);
320 assert_eq!(items[0]["value"]["name"], json!("bug"));
321 assert_eq!(
322 items[0]["value"]["scope"][0],
323 json!("sh.tangled.repo.issue")
324 );
325}
326
327#[tokio::test]
328async fn count_label_definitions_dedupes_per_author() {
329 let h = Harness::new().await;
330 let owner = did("did:plc:abalone");
331 let subject = at(&format!("at://{}", owner.as_ref()));
332 h.add_edge(
333 &nsid("sh.tangled.label.definition"),
334 &subject,
335 &at(&format!(
336 "at://{}/sh.tangled.label.definition/bug",
337 owner.as_ref()
338 )),
339 );
340 h.add_edge(
341 &nsid("sh.tangled.label.definition"),
342 &subject,
343 &at(&format!(
344 "at://{}/sh.tangled.label.definition/wontfix",
345 owner.as_ref()
346 )),
347 );
348
349 let app = router(h.state.clone());
350 let (_, body) = json_response(
351 app.oneshot(list_request(
352 "sh.tangled.label.countDefinitions",
353 subject.as_ref(),
354 &[],
355 ))
356 .await
357 .unwrap(),
358 )
359 .await;
360 assert_eq!(body["count"], json!(2));
361 assert_eq!(body["distinctAuthors"], json!(1));
362}
363
364#[tokio::test]
365async fn list_label_ops_accepts_issue_subject() {
366 let h = Harness::new().await;
367 let issue_uri = at("at://did:plc:abalone/sh.tangled.repo.issue/i1");
368 let author = did("did:plc:nel");
369 let rk = rkey("op1");
370 let def_uri = at("at://did:plc:abalone/sh.tangled.label.definition/bug");
371 h.add_edge(
372 &nsid("sh.tangled.label.op"),
373 &issue_uri,
374 &at(&format!(
375 "at://{}/sh.tangled.label.op/{}",
376 author.as_ref(),
377 rk.as_ref()
378 )),
379 );
380 h.mount(
381 &author,
382 &nsid("sh.tangled.label.op"),
383 &rk,
384 label_op_body(&issue_uri, &def_uri, "true"),
385 )
386 .await;
387
388 let app = router(h.state.clone());
389 let (status, body) = json_response(
390 app.oneshot(list_request(
391 "sh.tangled.label.listOps",
392 issue_uri.as_ref(),
393 &[],
394 ))
395 .await
396 .unwrap(),
397 )
398 .await;
399 assert_eq!(status, StatusCode::OK);
400 let items = body["items"].as_array().unwrap();
401 assert_eq!(items.len(), 1);
402 assert_eq!(items[0]["value"]["subject"], json!(issue_uri.as_ref()));
403 assert_eq!(items[0]["value"]["add"][0]["key"], json!(def_uri.as_ref()));
404}
405
406#[tokio::test]
407async fn list_ops_accepts_percent_escaped_subject() {
408 let h = Harness::new().await;
409 let issue_uri = at("at://did:plc:clam/sh.tangled.repo.issue/i1");
410 let author = did("did:plc:nel");
411 let rk = rkey("op1");
412 let def_uri = at("at://did:plc:clam/sh.tangled.label.definition/bug");
413 h.add_edge(
414 &nsid("sh.tangled.label.op"),
415 &issue_uri,
416 &at(&format!(
417 "at://{}/sh.tangled.label.op/{}",
418 author.as_ref(),
419 rk.as_ref()
420 )),
421 );
422 h.mount(
423 &author,
424 &nsid("sh.tangled.label.op"),
425 &rk,
426 label_op_body(&issue_uri, &def_uri, "true"),
427 )
428 .await;
429
430 let app = router(h.state.clone());
431 let (status, body) = json_response(
432 app.oneshot(list_request_escaped_subject(
433 "sh.tangled.label.listOps",
434 issue_uri.as_ref(),
435 &[],
436 ))
437 .await
438 .unwrap(),
439 )
440 .await;
441 assert_eq!(
442 status,
443 StatusCode::OK,
444 "escaped subject must still be accepted"
445 );
446 assert_eq!(body["items"].as_array().unwrap().len(), 1);
447}
448
449#[tokio::test]
450async fn list_label_ops_pull_subject_round_trip() {
451 let h = Harness::new().await;
452 let pull_uri = at("at://did:plc:abalone/sh.tangled.repo.pull/p1");
453 let author = did("did:plc:bailey");
454 let rk = rkey("op1");
455 let def_uri = at("at://did:plc:abalone/sh.tangled.label.definition/wontfix");
456 let source = at(&format!(
457 "at://{}/sh.tangled.label.op/{}",
458 author.as_ref(),
459 rk.as_ref()
460 ));
461 let body = label_op_body(&pull_uri, &def_uri, "true");
462 let parsed =
463 bobbin_types::edges::Record::from_json_value(&nsid("sh.tangled.label.op"), body.clone())
464 .expect("parse label.op record");
465 parsed
466 .extract_edges(&source)
467 .expect("extract")
468 .into_iter()
469 .for_each(|e| h.edges.add(e));
470 h.mount(&author, &nsid("sh.tangled.label.op"), &rk, body)
471 .await;
472
473 let app = router(h.state.clone());
474 let (status, json) = json_response(
475 app.oneshot(list_request(
476 "sh.tangled.label.listOps",
477 pull_uri.as_ref(),
478 &[],
479 ))
480 .await
481 .unwrap(),
482 )
483 .await;
484 assert_eq!(status, StatusCode::OK);
485 let items = json["items"].as_array().unwrap();
486 assert_eq!(items.len(), 1);
487 assert_eq!(items[0]["value"]["subject"], json!(pull_uri.as_ref()));
488 assert_eq!(items[0]["value"]["add"][0]["key"], json!(def_uri.as_ref()));
489}
490
491#[tokio::test]
492async fn list_label_ops_rejects_bare_did_subject() {
493 let h = Harness::new().await;
494 let app = router(h.state.clone());
495 let (status, body) = json_response(
496 app.oneshot(list_request(
497 "sh.tangled.label.listOps",
498 "at://did:plc:abalone",
499 &[],
500 ))
501 .await
502 .unwrap(),
503 )
504 .await;
505 assert_eq!(status, StatusCode::BAD_REQUEST);
506 let msg = body["message"].as_str().unwrap_or_default();
507 assert!(
508 msg.contains("sh.tangled.repo.issue") && msg.contains("sh.tangled.repo.pull"),
509 "message must list allowed collections, got {msg}"
510 );
511}
512
513#[tokio::test]
514async fn list_label_ops_rejects_unrelated_collection() {
515 let h = Harness::new().await;
516 let app = router(h.state.clone());
517 let (status, _) = json_response(
518 app.oneshot(list_request(
519 "sh.tangled.label.listOps",
520 "at://did:plc:abalone/sh.tangled.repo/r1",
521 &[],
522 ))
523 .await
524 .unwrap(),
525 )
526 .await;
527 assert_eq!(status, StatusCode::BAD_REQUEST);
528}
529
530#[tokio::test]
531async fn list_pipelines_keys_on_repo_did() {
532 let h = Harness::new().await;
533 let repo_did = did("did:plc:abalone");
534 let subject = at(&format!("at://{}", repo_did.as_ref()));
535 let spindle_did = did("did:plc:lyna");
536 let rk = rkey("pl1");
537 h.add_edge(
538 &nsid("sh.tangled.pipeline"),
539 &subject,
540 &at(&format!(
541 "at://{}/sh.tangled.pipeline/{}",
542 spindle_did.as_ref(),
543 rk.as_ref()
544 )),
545 );
546 h.mount(
547 &spindle_did,
548 &nsid("sh.tangled.pipeline"),
549 &rk,
550 pipeline_body(&repo_did),
551 )
552 .await;
553
554 let app = router(h.state.clone());
555 let (status, body) = json_response(
556 app.oneshot(list_request(
557 "sh.tangled.pipeline.listPipelines",
558 subject.as_ref(),
559 &[],
560 ))
561 .await
562 .unwrap(),
563 )
564 .await;
565 assert_eq!(status, StatusCode::OK);
566 let items = body["items"].as_array().unwrap();
567 assert_eq!(items.len(), 1);
568 assert_eq!(
569 items[0]["value"]["triggerMetadata"]["repo"]["repoDid"],
570 json!(repo_did.as_ref())
571 );
572}
573
574#[tokio::test]
575async fn count_pipelines_returns_zero_when_no_edges() {
576 let h = Harness::new().await;
577 let app = router(h.state.clone());
578 let (_, body) = json_response(
579 app.oneshot(list_request(
580 "sh.tangled.pipeline.countPipelines",
581 "at://did:plc:abalone",
582 &[],
583 ))
584 .await
585 .unwrap(),
586 )
587 .await;
588 assert_eq!(body["count"], json!(0));
589}
590
591#[tokio::test]
592async fn list_pipeline_statuses_keys_on_pipeline_uri() {
593 let h = Harness::new().await;
594 let pipeline_uri = at("at://did:plc:lyna/sh.tangled.pipeline/pl1");
595 let author = did("did:plc:bailey");
596 let rk = rkey("s1");
597 h.add_edge(
598 &nsid("sh.tangled.pipeline.status"),
599 &pipeline_uri,
600 &at(&format!(
601 "at://{}/sh.tangled.pipeline.status/{}",
602 author.as_ref(),
603 rk.as_ref()
604 )),
605 );
606 h.mount(
607 &author,
608 &nsid("sh.tangled.pipeline.status"),
609 &rk,
610 pipeline_status_body(&pipeline_uri),
611 )
612 .await;
613
614 let app = router(h.state.clone());
615 let (status, body) = json_response(
616 app.oneshot(list_request(
617 "sh.tangled.pipeline.listStatuses",
618 pipeline_uri.as_ref(),
619 &[],
620 ))
621 .await
622 .unwrap(),
623 )
624 .await;
625 assert_eq!(status, StatusCode::OK);
626 let items = body["items"].as_array().unwrap();
627 assert_eq!(items.len(), 1);
628 assert_eq!(items[0]["value"]["pipeline"], json!(pipeline_uri.as_ref()));
629 assert_eq!(items[0]["value"]["status"], json!("success"));
630}
631
632#[tokio::test]
633async fn pipeline_status_endpoint_rejects_bare_did_subject() {
634 let h = Harness::new().await;
635 let app = router(h.state.clone());
636 let (status, body) = json_response(
637 app.oneshot(list_request(
638 "sh.tangled.pipeline.listStatuses",
639 "at://did:plc:lyna",
640 &[],
641 ))
642 .await
643 .unwrap(),
644 )
645 .await;
646 assert_eq!(status, StatusCode::BAD_REQUEST);
647 assert!(
648 body["message"]
649 .as_str()
650 .unwrap_or_default()
651 .contains("sh.tangled.pipeline/<rkey>"),
652 "{body}"
653 );
654}
655
656#[tokio::test]
657async fn list_artifacts_keys_on_repo_did() {
658 let h = Harness::new().await;
659 let repo_did = did("did:plc:abalone");
660 let subject = at(&format!("at://{}", repo_did.as_ref()));
661 let owner = did("did:plc:nel");
662 let rk = rkey("a1");
663 h.add_edge(
664 &nsid("sh.tangled.repo.artifact"),
665 &subject,
666 &at(&format!(
667 "at://{}/sh.tangled.repo.artifact/{}",
668 owner.as_ref(),
669 rk.as_ref()
670 )),
671 );
672 h.mount(
673 &owner,
674 &nsid("sh.tangled.repo.artifact"),
675 &rk,
676 artifact_body(&repo_did, "out.bin"),
677 )
678 .await;
679
680 let app = router(h.state.clone());
681 let (status, body) = json_response(
682 app.oneshot(list_request(
683 "sh.tangled.repo.listArtifacts",
684 subject.as_ref(),
685 &[],
686 ))
687 .await
688 .unwrap(),
689 )
690 .await;
691 assert_eq!(status, StatusCode::OK);
692 let items = body["items"].as_array().unwrap();
693 assert_eq!(items.len(), 1);
694 assert_eq!(items[0]["value"]["name"], json!("out.bin"));
695 assert_eq!(items[0]["value"]["repoDid"], json!(repo_did.as_ref()));
696}
697
698#[tokio::test]
699async fn list_knot_members_keys_on_subject_did() {
700 let h = Harness::new().await;
701 let subject_did = did("did:plc:nel");
702 let subject = at(&format!("at://{}", subject_did.as_ref()));
703 let admin = did("did:plc:teq");
704 let rk = rkey("m1");
705 h.add_edge(
706 &nsid("sh.tangled.knot.member"),
707 &subject,
708 &at(&format!(
709 "at://{}/sh.tangled.knot.member/{}",
710 admin.as_ref(),
711 rk.as_ref()
712 )),
713 );
714 h.mount(
715 &admin,
716 &nsid("sh.tangled.knot.member"),
717 &rk,
718 knot_member_body(&subject_did),
719 )
720 .await;
721
722 let app = router(h.state.clone());
723 let (status, body) = json_response(
724 app.oneshot(list_request(
725 "sh.tangled.knot.listMembers",
726 subject.as_ref(),
727 &[],
728 ))
729 .await
730 .unwrap(),
731 )
732 .await;
733 assert_eq!(status, StatusCode::OK);
734 let items = body["items"].as_array().unwrap();
735 assert_eq!(items.len(), 1);
736 assert_eq!(items[0]["value"]["subject"], json!(subject_did.as_ref()));
737 assert_eq!(items[0]["value"]["domain"], json!("oyster.cafe"));
738}
739
740#[tokio::test]
741async fn list_spindle_members_keys_on_subject_did() {
742 let h = Harness::new().await;
743 let subject_did = did("did:plc:olaren");
744 let subject = at(&format!("at://{}", subject_did.as_ref()));
745 let admin = did("did:plc:teq");
746 let rk = rkey("m1");
747 h.add_edge(
748 &nsid("sh.tangled.spindle.member"),
749 &subject,
750 &at(&format!(
751 "at://{}/sh.tangled.spindle.member/{}",
752 admin.as_ref(),
753 rk.as_ref()
754 )),
755 );
756 h.mount(
757 &admin,
758 &nsid("sh.tangled.spindle.member"),
759 &rk,
760 spindle_member_body(&subject_did),
761 )
762 .await;
763
764 let app = router(h.state.clone());
765 let (status, body) = json_response(
766 app.oneshot(list_request(
767 "sh.tangled.spindle.listMembers",
768 subject.as_ref(),
769 &[],
770 ))
771 .await
772 .unwrap(),
773 )
774 .await;
775 assert_eq!(status, StatusCode::OK);
776 let items = body["items"].as_array().unwrap();
777 assert_eq!(items.len(), 1);
778 assert_eq!(items[0]["value"]["subject"], json!(subject_did.as_ref()));
779 assert_eq!(items[0]["value"]["instance"], json!("spin.nel.pet"));
780}
781
782#[tokio::test]
783async fn list_strings_keys_on_owner_did() {
784 let h = Harness::new().await;
785 let owner = did("did:plc:abalone");
786 let subject = at(&format!("at://{}", owner.as_ref()));
787 let rk = rkey("k1");
788 h.add_edge(
789 &nsid("sh.tangled.string"),
790 &subject,
791 &at(&format!(
792 "at://{}/sh.tangled.string/{}",
793 owner.as_ref(),
794 rk.as_ref()
795 )),
796 );
797 h.mount(
798 &owner,
799 &nsid("sh.tangled.string"),
800 &rk,
801 string_body("snippet.rs", "fn main() {}"),
802 )
803 .await;
804
805 let app = router(h.state.clone());
806 let (status, body) = json_response(
807 app.oneshot(list_request(
808 "sh.tangled.string.listStrings",
809 subject.as_ref(),
810 &[],
811 ))
812 .await
813 .unwrap(),
814 )
815 .await;
816 assert_eq!(status, StatusCode::OK);
817 let items = body["items"].as_array().unwrap();
818 assert_eq!(items.len(), 1);
819 assert_eq!(items[0]["value"]["filename"], json!("snippet.rs"));
820 assert_eq!(items[0]["value"]["contents"], json!("fn main() {}"));
821}
822
823#[tokio::test]
824async fn count_strings_dedupes_per_owner() {
825 let h = Harness::new().await;
826 let owner = did("did:plc:abalone");
827 let subject = at(&format!("at://{}", owner.as_ref()));
828 ["k1", "k2", "k3"].iter().for_each(|r| {
829 h.add_edge(
830 &nsid("sh.tangled.string"),
831 &subject,
832 &at(&format!("at://{}/sh.tangled.string/{}", owner.as_ref(), r)),
833 );
834 });
835
836 let app = router(h.state.clone());
837 let (_, body) = json_response(
838 app.oneshot(list_request(
839 "sh.tangled.string.countStrings",
840 subject.as_ref(),
841 &[],
842 ))
843 .await
844 .unwrap(),
845 )
846 .await;
847 assert_eq!(body["count"], json!(3));
848 assert_eq!(body["distinctAuthors"], json!(1));
849}
850
851#[tokio::test]
852async fn extractor_to_xrpc_round_trip_for_pipeline() {
853 let h = Harness::new().await;
854 let repo_did = did("did:plc:abalone");
855 let spindle_did = did("did:plc:lyna");
856 let rk = rkey("pl1");
857 let source = at(&format!(
858 "at://{}/sh.tangled.pipeline/{}",
859 spindle_did.as_ref(),
860 rk.as_ref()
861 ));
862 let body = pipeline_body(&repo_did);
863 let parsed =
864 bobbin_types::edges::Record::from_json_value(&nsid("sh.tangled.pipeline"), body.clone())
865 .expect("parse pipeline record");
866 parsed
867 .extract_edges(&source)
868 .expect("extract")
869 .into_iter()
870 .for_each(|e| h.edges.add(e));
871 h.mount(&spindle_did, &nsid("sh.tangled.pipeline"), &rk, body)
872 .await;
873
874 let app = router(h.state.clone());
875 let (status, json) = json_response(
876 app.oneshot(list_request(
877 "sh.tangled.pipeline.listPipelines",
878 &format!("at://{}", repo_did.as_ref()),
879 &[],
880 ))
881 .await
882 .unwrap(),
883 )
884 .await;
885 assert_eq!(
886 status,
887 StatusCode::OK,
888 "extractor key must match handler subject, body was {json}"
889 );
890 let items = json["items"].as_array().unwrap();
891 assert_eq!(items.len(), 1, "expected exactly one pipeline edge");
892}
893
894#[tokio::test]
895async fn list_pipelines_drops_records_without_resolvable_repo_did() {
896 let h = Harness::new().await;
897 let owner_did = did("did:plc:nel");
898 let spindle_did = did("did:plc:lyna");
899 let rk = rkey("pl1");
900 let source = at(&format!(
901 "at://{}/sh.tangled.pipeline/{}",
902 spindle_did.as_ref(),
903 rk.as_ref()
904 ));
905 let body = pipeline_body_owner_only(&owner_did);
906 let parsed =
907 bobbin_types::edges::Record::from_json_value(&nsid("sh.tangled.pipeline"), body.clone())
908 .expect("parse pipeline record");
909 parsed
910 .extract_edges(&source)
911 .expect("extract")
912 .into_iter()
913 .for_each(|e| h.edges.add(e));
914 h.mount(&spindle_did, &nsid("sh.tangled.pipeline"), &rk, body)
915 .await;
916
917 let app = router(h.state.clone());
918 let (status, json) = json_response(
919 app.oneshot(list_request(
920 "sh.tangled.pipeline.listPipelines",
921 &format!("at://{}", owner_did.as_ref()),
922 &[],
923 ))
924 .await
925 .unwrap(),
926 )
927 .await;
928 assert_eq!(status, StatusCode::OK, "body was {json}");
929 let items = json["items"].as_array().unwrap();
930 assert!(
931 items.is_empty(),
932 "pipeline without resolvable repoDid must be dropped, got {json}"
933 );
934}