Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1use std::sync::Arc; 2 3use axum::body::{Body, to_bytes}; 4use bobbin_edge_index::{CoverageWatch, EdgeStore, StateIndex}; 5use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig}; 6use bobbin_record_lru::{CacheCapacity, LruRecordStore}; 7use bobbin_resolver::RepoIdResolver; 8use bobbin_runtime::{RuntimeHasher, SystemClock}; 9use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader}; 10use bobbin_slingshot_client::SlingshotClient; 11use bobbin_types::edges::Edge; 12use bobbin_types::ids::SubjectRef; 13use bobbin_xrpc::{AppState, router}; 14use http::{Request, StatusCode}; 15use jacquard_common::DefaultStr; 16use jacquard_common::types::did::Did; 17use jacquard_common::types::nsid::Nsid; 18use jacquard_common::types::recordkey::Rkey; 19use jacquard_common::types::string::AtUri; 20use serde_json::{Value, json}; 21use tower::ServiceExt; 22use url::Url; 23use url::form_urlencoded::byte_serialize; 24use wiremock::matchers::{method, path, query_param}; 25use wiremock::{Mock, MockServer, ResponseTemplate}; 26 27const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i"; 28const TAG_BYTES: &str = "AAAAAAAAAAAAAAAAAAAAAAAAAAA="; 29const ARTIFACT_LINK: &str = "bafkreigh2akiscaildc7gnvtklbsfhdgwz72eolmpckbqr5ej26byp3uli"; 30 31fn at(s: &str) -> AtUri<DefaultStr> { 32 AtUri::new_owned(s).unwrap() 33} 34 35fn did(s: &str) -> Did<DefaultStr> { 36 Did::new_owned(s).unwrap() 37} 38 39fn rkey(s: &str) -> Rkey<DefaultStr> { 40 Rkey::new_owned(s).unwrap() 41} 42 43fn nsid(s: &'static str) -> Nsid<DefaultStr> { 44 Nsid::new_static(s).unwrap() 45} 46 47fn subj(s: &str) -> SubjectRef { 48 Did::<DefaultStr>::new_owned(s) 49 .map(SubjectRef::Did) 50 .unwrap_or_else(|_| SubjectRef::Uri(AtUri::new_owned(s).unwrap())) 51} 52 53struct Harness { 54 server: MockServer, 55 edges: Arc<EdgeStore>, 56 state: AppState, 57} 58 59impl Harness { 60 async fn new() -> Self { 61 let server = MockServer::start().await; 62 let edges = Arc::new(EdgeStore::new(RuntimeHasher::default())); 63 let coverage = Arc::new(CoverageWatch::new()); 64 let state = AppState::new( 65 Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))), 66 SlingshotClient::with_default_http(Url::parse(&server.uri()).unwrap()).unwrap(), 67 edges.clone(), 68 Arc::new(StateIndex::new(RuntimeHasher::default())), 69 Arc::new(StateIndex::new(RuntimeHasher::default())), 70 coverage, 71 Arc::new( 72 KnotProxy::new( 73 KnotProxyConfig::default(), 74 KnotHttpConfig::default(), 75 Arc::new(SystemClock::new()), 76 RuntimeHasher::default(), 77 ) 78 .unwrap(), 79 ), 80 Arc::new( 81 SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap(), 82 ) as Arc<dyn SearchReader>, 83 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())), 84 ); 85 Self { 86 server, 87 edges, 88 state, 89 } 90 } 91 92 fn add_edge( 93 &self, 94 kind: &Nsid<DefaultStr>, 95 subject: &AtUri<DefaultStr>, 96 source: &AtUri<DefaultStr>, 97 ) { 98 static EDGE_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1); 99 self.edges.add(Edge { 100 kind: kind.clone(), 101 subject: subj(subject.as_ref()), 102 source: source.clone(), 103 sort_micros: EDGE_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed), 104 }); 105 } 106 107 async fn mount( 108 &self, 109 did: &Did<DefaultStr>, 110 collection: &Nsid<DefaultStr>, 111 rkey: &Rkey<DefaultStr>, 112 value: Value, 113 ) { 114 let uri = format!( 115 "at://{}/{}/{}", 116 did.as_ref(), 117 collection.as_ref(), 118 rkey.as_ref() 119 ); 120 Mock::given(method("GET")) 121 .and(path("/xrpc/com.atproto.repo.getRecord")) 122 .and(query_param("repo", did.as_ref())) 123 .and(query_param("collection", collection.as_ref())) 124 .and(query_param("rkey", rkey.as_ref())) 125 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 126 "uri": uri, 127 "cid": CID, 128 "value": value, 129 }))) 130 .mount(&self.server) 131 .await; 132 } 133} 134 135fn list_request(endpoint: &str, subject: &str, extras: &[(&str, &str)]) -> Request<Body> { 136 let mut qs = format!("subject={subject}"); 137 extras.iter().for_each(|(k, v)| { 138 qs.push('&'); 139 qs.push_str(k); 140 qs.push('='); 141 qs.push_str(&encode(v)); 142 }); 143 Request::builder() 144 .uri(format!("/xrpc/{endpoint}?{qs}")) 145 .body(Body::empty()) 146 .unwrap() 147} 148 149fn list_request_escaped_subject( 150 endpoint: &str, 151 subject: &str, 152 extras: &[(&str, &str)], 153) -> Request<Body> { 154 let mut qs = format!("subject={}", encode(subject)); 155 extras.iter().for_each(|(k, v)| { 156 qs.push('&'); 157 qs.push_str(k); 158 qs.push('='); 159 qs.push_str(&encode(v)); 160 }); 161 Request::builder() 162 .uri(format!("/xrpc/{endpoint}?{qs}")) 163 .body(Body::empty()) 164 .unwrap() 165} 166 167fn encode(s: &str) -> String { 168 byte_serialize(s.as_bytes()).collect() 169} 170 171async fn json_response(resp: axum::response::Response) -> (StatusCode, Value) { 172 let status = resp.status(); 173 let bytes = to_bytes(resp.into_body(), 1 << 20).await.unwrap(); 174 let parsed: Value = serde_json::from_slice(&bytes).expect("JSON body"); 175 (status, parsed) 176} 177 178fn label_definition_body(name: &str) -> Value { 179 json!({ 180 "$type": "sh.tangled.label.definition", 181 "createdAt": "2026-05-01T00:00:00Z", 182 "name": name, 183 "scope": ["sh.tangled.repo.issue"], 184 "valueType": {"type": "boolean", "format": "any"} 185 }) 186} 187 188fn label_op_body(subject: &AtUri<DefaultStr>, def_uri: &AtUri<DefaultStr>, value: &str) -> Value { 189 json!({ 190 "$type": "sh.tangled.label.op", 191 "performedAt": "2026-05-01T00:00:00Z", 192 "subject": subject.as_ref(), 193 "add": [{"key": def_uri.as_ref(), "value": value}], 194 "delete": [] 195 }) 196} 197 198fn pipeline_body(repo_did: &Did<DefaultStr>) -> Value { 199 json!({ 200 "$type": "sh.tangled.pipeline", 201 "workflows": [], 202 "triggerMetadata": { 203 "kind": "manual", 204 "repo": { 205 "did": "did:plc:teq", 206 "repoDid": repo_did.as_ref(), 207 "knot": "nel.pet", 208 "defaultBranch": "main" 209 } 210 } 211 }) 212} 213 214fn pipeline_body_owner_only(owner_did: &Did<DefaultStr>) -> Value { 215 json!({ 216 "$type": "sh.tangled.pipeline", 217 "workflows": [], 218 "triggerMetadata": { 219 "kind": "manual", 220 "repo": { 221 "did": owner_did.as_ref(), 222 "knot": "nel.pet", 223 "defaultBranch": "main" 224 } 225 } 226 }) 227} 228 229fn pipeline_status_body(pipeline_uri: &AtUri<DefaultStr>) -> Value { 230 json!({ 231 "$type": "sh.tangled.pipeline.status", 232 "createdAt": "2026-05-01T00:00:00Z", 233 "pipeline": pipeline_uri.as_ref(), 234 "workflow": pipeline_uri.as_ref(), 235 "status": "success" 236 }) 237} 238 239fn artifact_body(repo_did: &Did<DefaultStr>, name: &str) -> Value { 240 json!({ 241 "$type": "sh.tangled.repo.artifact", 242 "createdAt": "2026-05-01T00:00:00Z", 243 "name": name, 244 "repoDid": repo_did.as_ref(), 245 "tag": {"$bytes": TAG_BYTES}, 246 "artifact": { 247 "$type": "blob", 248 "ref": {"$link": ARTIFACT_LINK}, 249 "mimeType": "application/octet-stream", 250 "size": 12 251 } 252 }) 253} 254 255fn knot_member_body(subject_did: &Did<DefaultStr>) -> Value { 256 json!({ 257 "$type": "sh.tangled.knot.member", 258 "createdAt": "2026-05-01T00:00:00Z", 259 "subject": subject_did.as_ref(), 260 "domain": "oyster.cafe" 261 }) 262} 263 264fn spindle_member_body(subject_did: &Did<DefaultStr>) -> Value { 265 json!({ 266 "$type": "sh.tangled.spindle.member", 267 "createdAt": "2026-05-01T00:00:00Z", 268 "subject": subject_did.as_ref(), 269 "instance": "spin.nel.pet" 270 }) 271} 272 273fn string_body(filename: &str, contents: &str) -> Value { 274 json!({ 275 "$type": "sh.tangled.string", 276 "createdAt": "2026-05-01T00:00:00Z", 277 "filename": filename, 278 "description": "test fixture", 279 "contents": contents 280 }) 281} 282 283#[tokio::test] 284async fn list_label_definitions_keys_on_owner_did() { 285 let h = Harness::new().await; 286 let owner = did("did:plc:abalone"); 287 let rk = rkey("bug"); 288 let source = at(&format!( 289 "at://{}/sh.tangled.label.definition/{}", 290 owner.as_ref(), 291 rk.as_ref() 292 )); 293 h.add_edge( 294 &nsid("sh.tangled.label.definition"), 295 &at(&format!("at://{}", owner.as_ref())), 296 &source, 297 ); 298 h.mount( 299 &owner, 300 &nsid("sh.tangled.label.definition"), 301 &rk, 302 label_definition_body("bug"), 303 ) 304 .await; 305 306 let app = router(h.state.clone()); 307 let (status, body) = json_response( 308 app.oneshot(list_request( 309 "sh.tangled.label.listDefinitions", 310 &format!("at://{}", owner.as_ref()), 311 &[], 312 )) 313 .await 314 .unwrap(), 315 ) 316 .await; 317 assert_eq!(status, StatusCode::OK); 318 let items = body["items"].as_array().unwrap(); 319 assert_eq!(items.len(), 1); 320 assert_eq!(items[0]["value"]["name"], json!("bug")); 321 assert_eq!( 322 items[0]["value"]["scope"][0], 323 json!("sh.tangled.repo.issue") 324 ); 325} 326 327#[tokio::test] 328async fn count_label_definitions_dedupes_per_author() { 329 let h = Harness::new().await; 330 let owner = did("did:plc:abalone"); 331 let subject = at(&format!("at://{}", owner.as_ref())); 332 h.add_edge( 333 &nsid("sh.tangled.label.definition"), 334 &subject, 335 &at(&format!( 336 "at://{}/sh.tangled.label.definition/bug", 337 owner.as_ref() 338 )), 339 ); 340 h.add_edge( 341 &nsid("sh.tangled.label.definition"), 342 &subject, 343 &at(&format!( 344 "at://{}/sh.tangled.label.definition/wontfix", 345 owner.as_ref() 346 )), 347 ); 348 349 let app = router(h.state.clone()); 350 let (_, body) = json_response( 351 app.oneshot(list_request( 352 "sh.tangled.label.countDefinitions", 353 subject.as_ref(), 354 &[], 355 )) 356 .await 357 .unwrap(), 358 ) 359 .await; 360 assert_eq!(body["count"], json!(2)); 361 assert_eq!(body["distinctAuthors"], json!(1)); 362} 363 364#[tokio::test] 365async fn list_label_ops_accepts_issue_subject() { 366 let h = Harness::new().await; 367 let issue_uri = at("at://did:plc:abalone/sh.tangled.repo.issue/i1"); 368 let author = did("did:plc:nel"); 369 let rk = rkey("op1"); 370 let def_uri = at("at://did:plc:abalone/sh.tangled.label.definition/bug"); 371 h.add_edge( 372 &nsid("sh.tangled.label.op"), 373 &issue_uri, 374 &at(&format!( 375 "at://{}/sh.tangled.label.op/{}", 376 author.as_ref(), 377 rk.as_ref() 378 )), 379 ); 380 h.mount( 381 &author, 382 &nsid("sh.tangled.label.op"), 383 &rk, 384 label_op_body(&issue_uri, &def_uri, "true"), 385 ) 386 .await; 387 388 let app = router(h.state.clone()); 389 let (status, body) = json_response( 390 app.oneshot(list_request( 391 "sh.tangled.label.listOps", 392 issue_uri.as_ref(), 393 &[], 394 )) 395 .await 396 .unwrap(), 397 ) 398 .await; 399 assert_eq!(status, StatusCode::OK); 400 let items = body["items"].as_array().unwrap(); 401 assert_eq!(items.len(), 1); 402 assert_eq!(items[0]["value"]["subject"], json!(issue_uri.as_ref())); 403 assert_eq!(items[0]["value"]["add"][0]["key"], json!(def_uri.as_ref())); 404} 405 406#[tokio::test] 407async fn list_ops_accepts_percent_escaped_subject() { 408 let h = Harness::new().await; 409 let issue_uri = at("at://did:plc:clam/sh.tangled.repo.issue/i1"); 410 let author = did("did:plc:nel"); 411 let rk = rkey("op1"); 412 let def_uri = at("at://did:plc:clam/sh.tangled.label.definition/bug"); 413 h.add_edge( 414 &nsid("sh.tangled.label.op"), 415 &issue_uri, 416 &at(&format!( 417 "at://{}/sh.tangled.label.op/{}", 418 author.as_ref(), 419 rk.as_ref() 420 )), 421 ); 422 h.mount( 423 &author, 424 &nsid("sh.tangled.label.op"), 425 &rk, 426 label_op_body(&issue_uri, &def_uri, "true"), 427 ) 428 .await; 429 430 let app = router(h.state.clone()); 431 let (status, body) = json_response( 432 app.oneshot(list_request_escaped_subject( 433 "sh.tangled.label.listOps", 434 issue_uri.as_ref(), 435 &[], 436 )) 437 .await 438 .unwrap(), 439 ) 440 .await; 441 assert_eq!( 442 status, 443 StatusCode::OK, 444 "escaped subject must still be accepted" 445 ); 446 assert_eq!(body["items"].as_array().unwrap().len(), 1); 447} 448 449#[tokio::test] 450async fn list_label_ops_pull_subject_round_trip() { 451 let h = Harness::new().await; 452 let pull_uri = at("at://did:plc:abalone/sh.tangled.repo.pull/p1"); 453 let author = did("did:plc:bailey"); 454 let rk = rkey("op1"); 455 let def_uri = at("at://did:plc:abalone/sh.tangled.label.definition/wontfix"); 456 let source = at(&format!( 457 "at://{}/sh.tangled.label.op/{}", 458 author.as_ref(), 459 rk.as_ref() 460 )); 461 let body = label_op_body(&pull_uri, &def_uri, "true"); 462 let parsed = 463 bobbin_types::edges::Record::from_json_value(&nsid("sh.tangled.label.op"), body.clone()) 464 .expect("parse label.op record"); 465 parsed 466 .extract_edges(&source) 467 .expect("extract") 468 .into_iter() 469 .for_each(|e| h.edges.add(e)); 470 h.mount(&author, &nsid("sh.tangled.label.op"), &rk, body) 471 .await; 472 473 let app = router(h.state.clone()); 474 let (status, json) = json_response( 475 app.oneshot(list_request( 476 "sh.tangled.label.listOps", 477 pull_uri.as_ref(), 478 &[], 479 )) 480 .await 481 .unwrap(), 482 ) 483 .await; 484 assert_eq!(status, StatusCode::OK); 485 let items = json["items"].as_array().unwrap(); 486 assert_eq!(items.len(), 1); 487 assert_eq!(items[0]["value"]["subject"], json!(pull_uri.as_ref())); 488 assert_eq!(items[0]["value"]["add"][0]["key"], json!(def_uri.as_ref())); 489} 490 491#[tokio::test] 492async fn list_label_ops_rejects_bare_did_subject() { 493 let h = Harness::new().await; 494 let app = router(h.state.clone()); 495 let (status, body) = json_response( 496 app.oneshot(list_request( 497 "sh.tangled.label.listOps", 498 "at://did:plc:abalone", 499 &[], 500 )) 501 .await 502 .unwrap(), 503 ) 504 .await; 505 assert_eq!(status, StatusCode::BAD_REQUEST); 506 let msg = body["message"].as_str().unwrap_or_default(); 507 assert!( 508 msg.contains("sh.tangled.repo.issue") && msg.contains("sh.tangled.repo.pull"), 509 "message must list allowed collections, got {msg}" 510 ); 511} 512 513#[tokio::test] 514async fn list_label_ops_rejects_unrelated_collection() { 515 let h = Harness::new().await; 516 let app = router(h.state.clone()); 517 let (status, _) = json_response( 518 app.oneshot(list_request( 519 "sh.tangled.label.listOps", 520 "at://did:plc:abalone/sh.tangled.repo/r1", 521 &[], 522 )) 523 .await 524 .unwrap(), 525 ) 526 .await; 527 assert_eq!(status, StatusCode::BAD_REQUEST); 528} 529 530#[tokio::test] 531async fn list_pipelines_keys_on_repo_did() { 532 let h = Harness::new().await; 533 let repo_did = did("did:plc:abalone"); 534 let subject = at(&format!("at://{}", repo_did.as_ref())); 535 let spindle_did = did("did:plc:lyna"); 536 let rk = rkey("pl1"); 537 h.add_edge( 538 &nsid("sh.tangled.pipeline"), 539 &subject, 540 &at(&format!( 541 "at://{}/sh.tangled.pipeline/{}", 542 spindle_did.as_ref(), 543 rk.as_ref() 544 )), 545 ); 546 h.mount( 547 &spindle_did, 548 &nsid("sh.tangled.pipeline"), 549 &rk, 550 pipeline_body(&repo_did), 551 ) 552 .await; 553 554 let app = router(h.state.clone()); 555 let (status, body) = json_response( 556 app.oneshot(list_request( 557 "sh.tangled.pipeline.listPipelines", 558 subject.as_ref(), 559 &[], 560 )) 561 .await 562 .unwrap(), 563 ) 564 .await; 565 assert_eq!(status, StatusCode::OK); 566 let items = body["items"].as_array().unwrap(); 567 assert_eq!(items.len(), 1); 568 assert_eq!( 569 items[0]["value"]["triggerMetadata"]["repo"]["repoDid"], 570 json!(repo_did.as_ref()) 571 ); 572} 573 574#[tokio::test] 575async fn count_pipelines_returns_zero_when_no_edges() { 576 let h = Harness::new().await; 577 let app = router(h.state.clone()); 578 let (_, body) = json_response( 579 app.oneshot(list_request( 580 "sh.tangled.pipeline.countPipelines", 581 "at://did:plc:abalone", 582 &[], 583 )) 584 .await 585 .unwrap(), 586 ) 587 .await; 588 assert_eq!(body["count"], json!(0)); 589} 590 591#[tokio::test] 592async fn list_pipeline_statuses_keys_on_pipeline_uri() { 593 let h = Harness::new().await; 594 let pipeline_uri = at("at://did:plc:lyna/sh.tangled.pipeline/pl1"); 595 let author = did("did:plc:bailey"); 596 let rk = rkey("s1"); 597 h.add_edge( 598 &nsid("sh.tangled.pipeline.status"), 599 &pipeline_uri, 600 &at(&format!( 601 "at://{}/sh.tangled.pipeline.status/{}", 602 author.as_ref(), 603 rk.as_ref() 604 )), 605 ); 606 h.mount( 607 &author, 608 &nsid("sh.tangled.pipeline.status"), 609 &rk, 610 pipeline_status_body(&pipeline_uri), 611 ) 612 .await; 613 614 let app = router(h.state.clone()); 615 let (status, body) = json_response( 616 app.oneshot(list_request( 617 "sh.tangled.pipeline.listStatuses", 618 pipeline_uri.as_ref(), 619 &[], 620 )) 621 .await 622 .unwrap(), 623 ) 624 .await; 625 assert_eq!(status, StatusCode::OK); 626 let items = body["items"].as_array().unwrap(); 627 assert_eq!(items.len(), 1); 628 assert_eq!(items[0]["value"]["pipeline"], json!(pipeline_uri.as_ref())); 629 assert_eq!(items[0]["value"]["status"], json!("success")); 630} 631 632#[tokio::test] 633async fn pipeline_status_endpoint_rejects_bare_did_subject() { 634 let h = Harness::new().await; 635 let app = router(h.state.clone()); 636 let (status, body) = json_response( 637 app.oneshot(list_request( 638 "sh.tangled.pipeline.listStatuses", 639 "at://did:plc:lyna", 640 &[], 641 )) 642 .await 643 .unwrap(), 644 ) 645 .await; 646 assert_eq!(status, StatusCode::BAD_REQUEST); 647 assert!( 648 body["message"] 649 .as_str() 650 .unwrap_or_default() 651 .contains("sh.tangled.pipeline/<rkey>"), 652 "{body}" 653 ); 654} 655 656#[tokio::test] 657async fn list_artifacts_keys_on_repo_did() { 658 let h = Harness::new().await; 659 let repo_did = did("did:plc:abalone"); 660 let subject = at(&format!("at://{}", repo_did.as_ref())); 661 let owner = did("did:plc:nel"); 662 let rk = rkey("a1"); 663 h.add_edge( 664 &nsid("sh.tangled.repo.artifact"), 665 &subject, 666 &at(&format!( 667 "at://{}/sh.tangled.repo.artifact/{}", 668 owner.as_ref(), 669 rk.as_ref() 670 )), 671 ); 672 h.mount( 673 &owner, 674 &nsid("sh.tangled.repo.artifact"), 675 &rk, 676 artifact_body(&repo_did, "out.bin"), 677 ) 678 .await; 679 680 let app = router(h.state.clone()); 681 let (status, body) = json_response( 682 app.oneshot(list_request( 683 "sh.tangled.repo.listArtifacts", 684 subject.as_ref(), 685 &[], 686 )) 687 .await 688 .unwrap(), 689 ) 690 .await; 691 assert_eq!(status, StatusCode::OK); 692 let items = body["items"].as_array().unwrap(); 693 assert_eq!(items.len(), 1); 694 assert_eq!(items[0]["value"]["name"], json!("out.bin")); 695 assert_eq!(items[0]["value"]["repoDid"], json!(repo_did.as_ref())); 696} 697 698#[tokio::test] 699async fn list_knot_members_keys_on_subject_did() { 700 let h = Harness::new().await; 701 let subject_did = did("did:plc:nel"); 702 let subject = at(&format!("at://{}", subject_did.as_ref())); 703 let admin = did("did:plc:teq"); 704 let rk = rkey("m1"); 705 h.add_edge( 706 &nsid("sh.tangled.knot.member"), 707 &subject, 708 &at(&format!( 709 "at://{}/sh.tangled.knot.member/{}", 710 admin.as_ref(), 711 rk.as_ref() 712 )), 713 ); 714 h.mount( 715 &admin, 716 &nsid("sh.tangled.knot.member"), 717 &rk, 718 knot_member_body(&subject_did), 719 ) 720 .await; 721 722 let app = router(h.state.clone()); 723 let (status, body) = json_response( 724 app.oneshot(list_request( 725 "sh.tangled.knot.listMembers", 726 subject.as_ref(), 727 &[], 728 )) 729 .await 730 .unwrap(), 731 ) 732 .await; 733 assert_eq!(status, StatusCode::OK); 734 let items = body["items"].as_array().unwrap(); 735 assert_eq!(items.len(), 1); 736 assert_eq!(items[0]["value"]["subject"], json!(subject_did.as_ref())); 737 assert_eq!(items[0]["value"]["domain"], json!("oyster.cafe")); 738} 739 740#[tokio::test] 741async fn list_spindle_members_keys_on_subject_did() { 742 let h = Harness::new().await; 743 let subject_did = did("did:plc:olaren"); 744 let subject = at(&format!("at://{}", subject_did.as_ref())); 745 let admin = did("did:plc:teq"); 746 let rk = rkey("m1"); 747 h.add_edge( 748 &nsid("sh.tangled.spindle.member"), 749 &subject, 750 &at(&format!( 751 "at://{}/sh.tangled.spindle.member/{}", 752 admin.as_ref(), 753 rk.as_ref() 754 )), 755 ); 756 h.mount( 757 &admin, 758 &nsid("sh.tangled.spindle.member"), 759 &rk, 760 spindle_member_body(&subject_did), 761 ) 762 .await; 763 764 let app = router(h.state.clone()); 765 let (status, body) = json_response( 766 app.oneshot(list_request( 767 "sh.tangled.spindle.listMembers", 768 subject.as_ref(), 769 &[], 770 )) 771 .await 772 .unwrap(), 773 ) 774 .await; 775 assert_eq!(status, StatusCode::OK); 776 let items = body["items"].as_array().unwrap(); 777 assert_eq!(items.len(), 1); 778 assert_eq!(items[0]["value"]["subject"], json!(subject_did.as_ref())); 779 assert_eq!(items[0]["value"]["instance"], json!("spin.nel.pet")); 780} 781 782#[tokio::test] 783async fn list_strings_keys_on_owner_did() { 784 let h = Harness::new().await; 785 let owner = did("did:plc:abalone"); 786 let subject = at(&format!("at://{}", owner.as_ref())); 787 let rk = rkey("k1"); 788 h.add_edge( 789 &nsid("sh.tangled.string"), 790 &subject, 791 &at(&format!( 792 "at://{}/sh.tangled.string/{}", 793 owner.as_ref(), 794 rk.as_ref() 795 )), 796 ); 797 h.mount( 798 &owner, 799 &nsid("sh.tangled.string"), 800 &rk, 801 string_body("snippet.rs", "fn main() {}"), 802 ) 803 .await; 804 805 let app = router(h.state.clone()); 806 let (status, body) = json_response( 807 app.oneshot(list_request( 808 "sh.tangled.string.listStrings", 809 subject.as_ref(), 810 &[], 811 )) 812 .await 813 .unwrap(), 814 ) 815 .await; 816 assert_eq!(status, StatusCode::OK); 817 let items = body["items"].as_array().unwrap(); 818 assert_eq!(items.len(), 1); 819 assert_eq!(items[0]["value"]["filename"], json!("snippet.rs")); 820 assert_eq!(items[0]["value"]["contents"], json!("fn main() {}")); 821} 822 823#[tokio::test] 824async fn count_strings_dedupes_per_owner() { 825 let h = Harness::new().await; 826 let owner = did("did:plc:abalone"); 827 let subject = at(&format!("at://{}", owner.as_ref())); 828 ["k1", "k2", "k3"].iter().for_each(|r| { 829 h.add_edge( 830 &nsid("sh.tangled.string"), 831 &subject, 832 &at(&format!("at://{}/sh.tangled.string/{}", owner.as_ref(), r)), 833 ); 834 }); 835 836 let app = router(h.state.clone()); 837 let (_, body) = json_response( 838 app.oneshot(list_request( 839 "sh.tangled.string.countStrings", 840 subject.as_ref(), 841 &[], 842 )) 843 .await 844 .unwrap(), 845 ) 846 .await; 847 assert_eq!(body["count"], json!(3)); 848 assert_eq!(body["distinctAuthors"], json!(1)); 849} 850 851#[tokio::test] 852async fn extractor_to_xrpc_round_trip_for_pipeline() { 853 let h = Harness::new().await; 854 let repo_did = did("did:plc:abalone"); 855 let spindle_did = did("did:plc:lyna"); 856 let rk = rkey("pl1"); 857 let source = at(&format!( 858 "at://{}/sh.tangled.pipeline/{}", 859 spindle_did.as_ref(), 860 rk.as_ref() 861 )); 862 let body = pipeline_body(&repo_did); 863 let parsed = 864 bobbin_types::edges::Record::from_json_value(&nsid("sh.tangled.pipeline"), body.clone()) 865 .expect("parse pipeline record"); 866 parsed 867 .extract_edges(&source) 868 .expect("extract") 869 .into_iter() 870 .for_each(|e| h.edges.add(e)); 871 h.mount(&spindle_did, &nsid("sh.tangled.pipeline"), &rk, body) 872 .await; 873 874 let app = router(h.state.clone()); 875 let (status, json) = json_response( 876 app.oneshot(list_request( 877 "sh.tangled.pipeline.listPipelines", 878 &format!("at://{}", repo_did.as_ref()), 879 &[], 880 )) 881 .await 882 .unwrap(), 883 ) 884 .await; 885 assert_eq!( 886 status, 887 StatusCode::OK, 888 "extractor key must match handler subject, body was {json}" 889 ); 890 let items = json["items"].as_array().unwrap(); 891 assert_eq!(items.len(), 1, "expected exactly one pipeline edge"); 892} 893 894#[tokio::test] 895async fn list_pipelines_drops_records_without_resolvable_repo_did() { 896 let h = Harness::new().await; 897 let owner_did = did("did:plc:nel"); 898 let spindle_did = did("did:plc:lyna"); 899 let rk = rkey("pl1"); 900 let source = at(&format!( 901 "at://{}/sh.tangled.pipeline/{}", 902 spindle_did.as_ref(), 903 rk.as_ref() 904 )); 905 let body = pipeline_body_owner_only(&owner_did); 906 let parsed = 907 bobbin_types::edges::Record::from_json_value(&nsid("sh.tangled.pipeline"), body.clone()) 908 .expect("parse pipeline record"); 909 parsed 910 .extract_edges(&source) 911 .expect("extract") 912 .into_iter() 913 .for_each(|e| h.edges.add(e)); 914 h.mount(&spindle_did, &nsid("sh.tangled.pipeline"), &rk, body) 915 .await; 916 917 let app = router(h.state.clone()); 918 let (status, json) = json_response( 919 app.oneshot(list_request( 920 "sh.tangled.pipeline.listPipelines", 921 &format!("at://{}", owner_did.as_ref()), 922 &[], 923 )) 924 .await 925 .unwrap(), 926 ) 927 .await; 928 assert_eq!(status, StatusCode::OK, "body was {json}"); 929 let items = json["items"].as_array().unwrap(); 930 assert!( 931 items.is_empty(), 932 "pipeline without resolvable repoDid must be dropped, got {json}" 933 ); 934}