This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 25 kB View raw
1use std::sync::Arc; 2 3use axum::body::{Body, to_bytes}; 4use bobbin_edge_index::{CoverageWatch, EdgeStore, StateIndex}; 5use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig}; 6use bobbin_record_lru::{CacheCapacity, LruRecordStore}; 7use bobbin_resolver::RepoIdResolver; 8use bobbin_runtime::{RuntimeHasher, SystemClock}; 9use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader}; 10use bobbin_slingshot_client::SlingshotClient; 11use bobbin_types::edges::Edge; 12use bobbin_types::ids::SubjectRef; 13use bobbin_xrpc::{AppState, router}; 14use http::{Request, StatusCode}; 15use jacquard_common::DefaultStr; 16use jacquard_common::types::did::Did; 17use jacquard_common::types::nsid::Nsid; 18use jacquard_common::types::recordkey::Rkey; 19use jacquard_common::types::string::AtUri; 20use serde_json::{Value, json}; 21use tower::ServiceExt; 22use url::Url; 23use url::form_urlencoded::byte_serialize; 24use wiremock::matchers::{method, path, query_param}; 25use wiremock::{Mock, MockServer, ResponseTemplate}; 26 27const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i"; 28const TAG_BYTES: &str = "AAAAAAAAAAAAAAAAAAAAAAAAAAA="; 29const ARTIFACT_LINK: &str = "bafkreigh2akiscaildc7gnvtklbsfhdgwz72eolmpckbqr5ej26byp3uli"; 30 31fn at(s: &str) -> AtUri<DefaultStr> { 32 AtUri::new_owned(s).unwrap() 33} 34 35fn did(s: &str) -> Did<DefaultStr> { 36 Did::new_owned(s).unwrap() 37} 38 39fn rkey(s: &str) -> Rkey<DefaultStr> { 40 Rkey::new_owned(s).unwrap() 41} 42 43fn nsid(s: &'static str) -> Nsid<DefaultStr> { 44 Nsid::new_static(s).unwrap() 45} 46 47fn subj(s: &str) -> SubjectRef { 48 Did::<DefaultStr>::new_owned(s) 49 .map(SubjectRef::Did) 50 .unwrap_or_else(|_| SubjectRef::Uri(AtUri::new_owned(s).unwrap())) 51} 52 53struct Harness { 54 server: MockServer, 55 edges: Arc<EdgeStore>, 56 state: AppState, 57} 58 59impl Harness { 60 async fn new() -> Self { 61 let server = MockServer::start().await; 62 let edges = Arc::new(EdgeStore::new(RuntimeHasher::default())); 63 let coverage = Arc::new(CoverageWatch::new()); 64 let state = AppState::new( 65 Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))), 66 SlingshotClient::with_default_http(Url::parse(&server.uri()).unwrap()).unwrap(), 67 edges.clone(), 68 Arc::new(StateIndex::new(RuntimeHasher::default())), 69 Arc::new(StateIndex::new(RuntimeHasher::default())), 70 coverage, 71 Arc::new( 72 KnotProxy::new( 73 KnotProxyConfig::default(), 74 KnotHttpConfig::default(), 75 Arc::new(SystemClock::new()), 76 RuntimeHasher::default(), 77 ) 78 .unwrap(), 79 ), 80 Arc::new( 81 SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap(), 82 ) as Arc<dyn SearchReader>, 83 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())), 84 ); 85 Self { 86 server, 87 edges, 88 state, 89 } 90 } 91 92 fn add_edge( 93 &self, 94 kind: &Nsid<DefaultStr>, 95 subject: &AtUri<DefaultStr>, 96 source: &AtUri<DefaultStr>, 97 ) { 98 static EDGE_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1); 99 self.edges.add(Edge { 100 kind: kind.clone(), 101 subject: subj(subject.as_ref()), 102 source: source.clone(), 103 sort_micros: EDGE_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed), 104 }); 105 } 106 107 async fn mount( 108 &self, 109 did: &Did<DefaultStr>, 110 collection: &Nsid<DefaultStr>, 111 rkey: &Rkey<DefaultStr>, 112 value: Value, 113 ) { 114 let uri = format!( 115 "at://{}/{}/{}", 116 did.as_ref(), 117 collection.as_ref(), 118 rkey.as_ref() 119 ); 120 Mock::given(method("GET")) 121 .and(path("/xrpc/com.atproto.repo.getRecord")) 122 .and(query_param("repo", did.as_ref())) 123 .and(query_param("collection", collection.as_ref())) 124 .and(query_param("rkey", rkey.as_ref())) 125 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 126 "uri": uri, 127 "cid": CID, 128 "value": value, 129 }))) 130 .mount(&self.server) 131 .await; 132 } 133} 134 135fn list_request(endpoint: &str, subject: &str, extras: &[(&str, &str)]) -> Request<Body> { 136 let mut qs = format!("subject={}", encode(subject)); 137 extras.iter().for_each(|(k, v)| { 138 qs.push('&'); 139 qs.push_str(k); 140 qs.push('='); 141 qs.push_str(&encode(v)); 142 }); 143 Request::builder() 144 .uri(format!("/xrpc/{endpoint}?{qs}")) 145 .body(Body::empty()) 146 .unwrap() 147} 148 149fn encode(s: &str) -> String { 150 byte_serialize(s.as_bytes()).collect() 151} 152 153async fn json_response(resp: axum::response::Response) -> (StatusCode, Value) { 154 let status = resp.status(); 155 let bytes = to_bytes(resp.into_body(), 1 << 20).await.unwrap(); 156 let parsed: Value = serde_json::from_slice(&bytes).expect("JSON body"); 157 (status, parsed) 158} 159 160fn label_definition_body(name: &str) -> Value { 161 json!({ 162 "$type": "sh.tangled.label.definition", 163 "createdAt": "2026-05-01T00:00:00Z", 164 "name": name, 165 "scope": ["sh.tangled.repo.issue"], 166 "valueType": {"type": "boolean", "format": "any"} 167 }) 168} 169 170fn label_op_body(subject: &AtUri<DefaultStr>, def_uri: &AtUri<DefaultStr>, value: &str) -> Value { 171 json!({ 172 "$type": "sh.tangled.label.op", 173 "performedAt": "2026-05-01T00:00:00Z", 174 "subject": subject.as_ref(), 175 "add": [{"key": def_uri.as_ref(), "value": value}], 176 "delete": [] 177 }) 178} 179 180fn pipeline_body(repo_did: &Did<DefaultStr>) -> Value { 181 json!({ 182 "$type": "sh.tangled.pipeline", 183 "workflows": [], 184 "triggerMetadata": { 185 "kind": "manual", 186 "repo": { 187 "did": "did:plc:teq", 188 "repoDid": repo_did.as_ref(), 189 "knot": "nel.pet", 190 "defaultBranch": "main" 191 } 192 } 193 }) 194} 195 196fn pipeline_body_owner_only(owner_did: &Did<DefaultStr>) -> Value { 197 json!({ 198 "$type": "sh.tangled.pipeline", 199 "workflows": [], 200 "triggerMetadata": { 201 "kind": "manual", 202 "repo": { 203 "did": owner_did.as_ref(), 204 "knot": "nel.pet", 205 "defaultBranch": "main" 206 } 207 } 208 }) 209} 210 211fn pipeline_status_body(pipeline_uri: &AtUri<DefaultStr>) -> Value { 212 json!({ 213 "$type": "sh.tangled.pipeline.status", 214 "createdAt": "2026-05-01T00:00:00Z", 215 "pipeline": pipeline_uri.as_ref(), 216 "workflow": pipeline_uri.as_ref(), 217 "status": "success" 218 }) 219} 220 221fn artifact_body(repo_did: &Did<DefaultStr>, name: &str) -> Value { 222 json!({ 223 "$type": "sh.tangled.repo.artifact", 224 "createdAt": "2026-05-01T00:00:00Z", 225 "name": name, 226 "repoDid": repo_did.as_ref(), 227 "tag": {"$bytes": TAG_BYTES}, 228 "artifact": { 229 "$type": "blob", 230 "ref": {"$link": ARTIFACT_LINK}, 231 "mimeType": "application/octet-stream", 232 "size": 12 233 } 234 }) 235} 236 237fn knot_member_body(subject_did: &Did<DefaultStr>) -> Value { 238 json!({ 239 "$type": "sh.tangled.knot.member", 240 "createdAt": "2026-05-01T00:00:00Z", 241 "subject": subject_did.as_ref(), 242 "domain": "oyster.cafe" 243 }) 244} 245 246fn spindle_member_body(subject_did: &Did<DefaultStr>) -> Value { 247 json!({ 248 "$type": "sh.tangled.spindle.member", 249 "createdAt": "2026-05-01T00:00:00Z", 250 "subject": subject_did.as_ref(), 251 "instance": "spin.nel.pet" 252 }) 253} 254 255fn string_body(filename: &str, contents: &str) -> Value { 256 json!({ 257 "$type": "sh.tangled.string", 258 "createdAt": "2026-05-01T00:00:00Z", 259 "filename": filename, 260 "description": "test fixture", 261 "contents": contents 262 }) 263} 264 265#[tokio::test] 266async fn list_label_definitions_keys_on_owner_did() { 267 let h = Harness::new().await; 268 let owner = did("did:plc:abalone"); 269 let rk = rkey("bug"); 270 let source = at(&format!( 271 "at://{}/sh.tangled.label.definition/{}", 272 owner.as_ref(), 273 rk.as_ref() 274 )); 275 h.add_edge( 276 &nsid("sh.tangled.label.definition"), 277 &at(&format!("at://{}", owner.as_ref())), 278 &source, 279 ); 280 h.mount( 281 &owner, 282 &nsid("sh.tangled.label.definition"), 283 &rk, 284 label_definition_body("bug"), 285 ) 286 .await; 287 288 let app = router(h.state.clone()); 289 let (status, body) = json_response( 290 app.oneshot(list_request( 291 "sh.tangled.label.listDefinitions", 292 &format!("at://{}", owner.as_ref()), 293 &[], 294 )) 295 .await 296 .unwrap(), 297 ) 298 .await; 299 assert_eq!(status, StatusCode::OK); 300 let items = body["items"].as_array().unwrap(); 301 assert_eq!(items.len(), 1); 302 assert_eq!(items[0]["value"]["name"], json!("bug")); 303 assert_eq!( 304 items[0]["value"]["scope"][0], 305 json!("sh.tangled.repo.issue") 306 ); 307} 308 309#[tokio::test] 310async fn count_label_definitions_dedupes_per_author() { 311 let h = Harness::new().await; 312 let owner = did("did:plc:abalone"); 313 let subject = at(&format!("at://{}", owner.as_ref())); 314 h.add_edge( 315 &nsid("sh.tangled.label.definition"), 316 &subject, 317 &at(&format!( 318 "at://{}/sh.tangled.label.definition/bug", 319 owner.as_ref() 320 )), 321 ); 322 h.add_edge( 323 &nsid("sh.tangled.label.definition"), 324 &subject, 325 &at(&format!( 326 "at://{}/sh.tangled.label.definition/wontfix", 327 owner.as_ref() 328 )), 329 ); 330 331 let app = router(h.state.clone()); 332 let (_, body) = json_response( 333 app.oneshot(list_request( 334 "sh.tangled.label.countDefinitions", 335 subject.as_ref(), 336 &[], 337 )) 338 .await 339 .unwrap(), 340 ) 341 .await; 342 assert_eq!(body["count"], json!(2)); 343 assert_eq!(body["distinctAuthors"], json!(1)); 344} 345 346#[tokio::test] 347async fn list_label_ops_accepts_issue_subject() { 348 let h = Harness::new().await; 349 let issue_uri = at("at://did:plc:abalone/sh.tangled.repo.issue/i1"); 350 let author = did("did:plc:nel"); 351 let rk = rkey("op1"); 352 let def_uri = at("at://did:plc:abalone/sh.tangled.label.definition/bug"); 353 h.add_edge( 354 &nsid("sh.tangled.label.op"), 355 &issue_uri, 356 &at(&format!( 357 "at://{}/sh.tangled.label.op/{}", 358 author.as_ref(), 359 rk.as_ref() 360 )), 361 ); 362 h.mount( 363 &author, 364 &nsid("sh.tangled.label.op"), 365 &rk, 366 label_op_body(&issue_uri, &def_uri, "true"), 367 ) 368 .await; 369 370 let app = router(h.state.clone()); 371 let (status, body) = json_response( 372 app.oneshot(list_request( 373 "sh.tangled.label.listOps", 374 issue_uri.as_ref(), 375 &[], 376 )) 377 .await 378 .unwrap(), 379 ) 380 .await; 381 assert_eq!(status, StatusCode::OK); 382 let items = body["items"].as_array().unwrap(); 383 assert_eq!(items.len(), 1); 384 assert_eq!(items[0]["value"]["subject"], json!(issue_uri.as_ref())); 385 assert_eq!(items[0]["value"]["add"][0]["key"], json!(def_uri.as_ref())); 386} 387 388#[tokio::test] 389async fn list_label_ops_pull_subject_round_trip() { 390 let h = Harness::new().await; 391 let pull_uri = at("at://did:plc:abalone/sh.tangled.repo.pull/p1"); 392 let author = did("did:plc:bailey"); 393 let rk = rkey("op1"); 394 let def_uri = at("at://did:plc:abalone/sh.tangled.label.definition/wontfix"); 395 let source = at(&format!( 396 "at://{}/sh.tangled.label.op/{}", 397 author.as_ref(), 398 rk.as_ref() 399 )); 400 let body = label_op_body(&pull_uri, &def_uri, "true"); 401 let parsed = 402 bobbin_types::edges::Record::from_json_value(&nsid("sh.tangled.label.op"), body.clone()) 403 .expect("parse label.op record"); 404 parsed 405 .extract_edges(&source) 406 .expect("extract") 407 .into_iter() 408 .for_each(|e| h.edges.add(e)); 409 h.mount(&author, &nsid("sh.tangled.label.op"), &rk, body) 410 .await; 411 412 let app = router(h.state.clone()); 413 let (status, json) = json_response( 414 app.oneshot(list_request( 415 "sh.tangled.label.listOps", 416 pull_uri.as_ref(), 417 &[], 418 )) 419 .await 420 .unwrap(), 421 ) 422 .await; 423 assert_eq!(status, StatusCode::OK); 424 let items = json["items"].as_array().unwrap(); 425 assert_eq!(items.len(), 1); 426 assert_eq!(items[0]["value"]["subject"], json!(pull_uri.as_ref())); 427 assert_eq!(items[0]["value"]["add"][0]["key"], json!(def_uri.as_ref())); 428} 429 430#[tokio::test] 431async fn list_label_ops_rejects_bare_did_subject() { 432 let h = Harness::new().await; 433 let app = router(h.state.clone()); 434 let (status, body) = json_response( 435 app.oneshot(list_request( 436 "sh.tangled.label.listOps", 437 "at://did:plc:abalone", 438 &[], 439 )) 440 .await 441 .unwrap(), 442 ) 443 .await; 444 assert_eq!(status, StatusCode::BAD_REQUEST); 445 let msg = body["message"].as_str().unwrap_or_default(); 446 assert!( 447 msg.contains("sh.tangled.repo.issue") && msg.contains("sh.tangled.repo.pull"), 448 "message must list allowed collections, got {msg}" 449 ); 450} 451 452#[tokio::test] 453async fn list_label_ops_rejects_unrelated_collection() { 454 let h = Harness::new().await; 455 let app = router(h.state.clone()); 456 let (status, _) = json_response( 457 app.oneshot(list_request( 458 "sh.tangled.label.listOps", 459 "at://did:plc:abalone/sh.tangled.repo/r1", 460 &[], 461 )) 462 .await 463 .unwrap(), 464 ) 465 .await; 466 assert_eq!(status, StatusCode::BAD_REQUEST); 467} 468 469#[tokio::test] 470async fn list_pipelines_keys_on_repo_did() { 471 let h = Harness::new().await; 472 let repo_did = did("did:plc:abalone"); 473 let subject = at(&format!("at://{}", repo_did.as_ref())); 474 let spindle_did = did("did:plc:lyna"); 475 let rk = rkey("pl1"); 476 h.add_edge( 477 &nsid("sh.tangled.pipeline"), 478 &subject, 479 &at(&format!( 480 "at://{}/sh.tangled.pipeline/{}", 481 spindle_did.as_ref(), 482 rk.as_ref() 483 )), 484 ); 485 h.mount( 486 &spindle_did, 487 &nsid("sh.tangled.pipeline"), 488 &rk, 489 pipeline_body(&repo_did), 490 ) 491 .await; 492 493 let app = router(h.state.clone()); 494 let (status, body) = json_response( 495 app.oneshot(list_request( 496 "sh.tangled.pipeline.listPipelines", 497 subject.as_ref(), 498 &[], 499 )) 500 .await 501 .unwrap(), 502 ) 503 .await; 504 assert_eq!(status, StatusCode::OK); 505 let items = body["items"].as_array().unwrap(); 506 assert_eq!(items.len(), 1); 507 assert_eq!( 508 items[0]["value"]["triggerMetadata"]["repo"]["repoDid"], 509 json!(repo_did.as_ref()) 510 ); 511} 512 513#[tokio::test] 514async fn count_pipelines_returns_zero_when_no_edges() { 515 let h = Harness::new().await; 516 let app = router(h.state.clone()); 517 let (_, body) = json_response( 518 app.oneshot(list_request( 519 "sh.tangled.pipeline.countPipelines", 520 "at://did:plc:abalone", 521 &[], 522 )) 523 .await 524 .unwrap(), 525 ) 526 .await; 527 assert_eq!(body["count"], json!(0)); 528} 529 530#[tokio::test] 531async fn list_pipeline_statuses_keys_on_pipeline_uri() { 532 let h = Harness::new().await; 533 let pipeline_uri = at("at://did:plc:lyna/sh.tangled.pipeline/pl1"); 534 let author = did("did:plc:bailey"); 535 let rk = rkey("s1"); 536 h.add_edge( 537 &nsid("sh.tangled.pipeline.status"), 538 &pipeline_uri, 539 &at(&format!( 540 "at://{}/sh.tangled.pipeline.status/{}", 541 author.as_ref(), 542 rk.as_ref() 543 )), 544 ); 545 h.mount( 546 &author, 547 &nsid("sh.tangled.pipeline.status"), 548 &rk, 549 pipeline_status_body(&pipeline_uri), 550 ) 551 .await; 552 553 let app = router(h.state.clone()); 554 let (status, body) = json_response( 555 app.oneshot(list_request( 556 "sh.tangled.pipeline.listStatuses", 557 pipeline_uri.as_ref(), 558 &[], 559 )) 560 .await 561 .unwrap(), 562 ) 563 .await; 564 assert_eq!(status, StatusCode::OK); 565 let items = body["items"].as_array().unwrap(); 566 assert_eq!(items.len(), 1); 567 assert_eq!(items[0]["value"]["pipeline"], json!(pipeline_uri.as_ref())); 568 assert_eq!(items[0]["value"]["status"], json!("success")); 569} 570 571#[tokio::test] 572async fn pipeline_status_endpoint_rejects_bare_did_subject() { 573 let h = Harness::new().await; 574 let app = router(h.state.clone()); 575 let (status, body) = json_response( 576 app.oneshot(list_request( 577 "sh.tangled.pipeline.listStatuses", 578 "at://did:plc:lyna", 579 &[], 580 )) 581 .await 582 .unwrap(), 583 ) 584 .await; 585 assert_eq!(status, StatusCode::BAD_REQUEST); 586 assert!( 587 body["message"] 588 .as_str() 589 .unwrap_or_default() 590 .contains("sh.tangled.pipeline/<rkey>"), 591 "{body}" 592 ); 593} 594 595#[tokio::test] 596async fn list_artifacts_keys_on_repo_did() { 597 let h = Harness::new().await; 598 let repo_did = did("did:plc:abalone"); 599 let subject = at(&format!("at://{}", repo_did.as_ref())); 600 let owner = did("did:plc:nel"); 601 let rk = rkey("a1"); 602 h.add_edge( 603 &nsid("sh.tangled.repo.artifact"), 604 &subject, 605 &at(&format!( 606 "at://{}/sh.tangled.repo.artifact/{}", 607 owner.as_ref(), 608 rk.as_ref() 609 )), 610 ); 611 h.mount( 612 &owner, 613 &nsid("sh.tangled.repo.artifact"), 614 &rk, 615 artifact_body(&repo_did, "out.bin"), 616 ) 617 .await; 618 619 let app = router(h.state.clone()); 620 let (status, body) = json_response( 621 app.oneshot(list_request( 622 "sh.tangled.repo.listArtifacts", 623 subject.as_ref(), 624 &[], 625 )) 626 .await 627 .unwrap(), 628 ) 629 .await; 630 assert_eq!(status, StatusCode::OK); 631 let items = body["items"].as_array().unwrap(); 632 assert_eq!(items.len(), 1); 633 assert_eq!(items[0]["value"]["name"], json!("out.bin")); 634 assert_eq!(items[0]["value"]["repoDid"], json!(repo_did.as_ref())); 635} 636 637#[tokio::test] 638async fn list_knot_members_keys_on_subject_did() { 639 let h = Harness::new().await; 640 let subject_did = did("did:plc:nel"); 641 let subject = at(&format!("at://{}", subject_did.as_ref())); 642 let admin = did("did:plc:teq"); 643 let rk = rkey("m1"); 644 h.add_edge( 645 &nsid("sh.tangled.knot.member"), 646 &subject, 647 &at(&format!( 648 "at://{}/sh.tangled.knot.member/{}", 649 admin.as_ref(), 650 rk.as_ref() 651 )), 652 ); 653 h.mount( 654 &admin, 655 &nsid("sh.tangled.knot.member"), 656 &rk, 657 knot_member_body(&subject_did), 658 ) 659 .await; 660 661 let app = router(h.state.clone()); 662 let (status, body) = json_response( 663 app.oneshot(list_request( 664 "sh.tangled.knot.listMembers", 665 subject.as_ref(), 666 &[], 667 )) 668 .await 669 .unwrap(), 670 ) 671 .await; 672 assert_eq!(status, StatusCode::OK); 673 let items = body["items"].as_array().unwrap(); 674 assert_eq!(items.len(), 1); 675 assert_eq!(items[0]["value"]["subject"], json!(subject_did.as_ref())); 676 assert_eq!(items[0]["value"]["domain"], json!("oyster.cafe")); 677} 678 679#[tokio::test] 680async fn list_spindle_members_keys_on_subject_did() { 681 let h = Harness::new().await; 682 let subject_did = did("did:plc:olaren"); 683 let subject = at(&format!("at://{}", subject_did.as_ref())); 684 let admin = did("did:plc:teq"); 685 let rk = rkey("m1"); 686 h.add_edge( 687 &nsid("sh.tangled.spindle.member"), 688 &subject, 689 &at(&format!( 690 "at://{}/sh.tangled.spindle.member/{}", 691 admin.as_ref(), 692 rk.as_ref() 693 )), 694 ); 695 h.mount( 696 &admin, 697 &nsid("sh.tangled.spindle.member"), 698 &rk, 699 spindle_member_body(&subject_did), 700 ) 701 .await; 702 703 let app = router(h.state.clone()); 704 let (status, body) = json_response( 705 app.oneshot(list_request( 706 "sh.tangled.spindle.listMembers", 707 subject.as_ref(), 708 &[], 709 )) 710 .await 711 .unwrap(), 712 ) 713 .await; 714 assert_eq!(status, StatusCode::OK); 715 let items = body["items"].as_array().unwrap(); 716 assert_eq!(items.len(), 1); 717 assert_eq!(items[0]["value"]["subject"], json!(subject_did.as_ref())); 718 assert_eq!(items[0]["value"]["instance"], json!("spin.nel.pet")); 719} 720 721#[tokio::test] 722async fn list_strings_keys_on_owner_did() { 723 let h = Harness::new().await; 724 let owner = did("did:plc:abalone"); 725 let subject = at(&format!("at://{}", owner.as_ref())); 726 let rk = rkey("k1"); 727 h.add_edge( 728 &nsid("sh.tangled.string"), 729 &subject, 730 &at(&format!( 731 "at://{}/sh.tangled.string/{}", 732 owner.as_ref(), 733 rk.as_ref() 734 )), 735 ); 736 h.mount( 737 &owner, 738 &nsid("sh.tangled.string"), 739 &rk, 740 string_body("snippet.rs", "fn main() {}"), 741 ) 742 .await; 743 744 let app = router(h.state.clone()); 745 let (status, body) = json_response( 746 app.oneshot(list_request( 747 "sh.tangled.string.listStrings", 748 subject.as_ref(), 749 &[], 750 )) 751 .await 752 .unwrap(), 753 ) 754 .await; 755 assert_eq!(status, StatusCode::OK); 756 let items = body["items"].as_array().unwrap(); 757 assert_eq!(items.len(), 1); 758 assert_eq!(items[0]["value"]["filename"], json!("snippet.rs")); 759 assert_eq!(items[0]["value"]["contents"], json!("fn main() {}")); 760} 761 762#[tokio::test] 763async fn count_strings_dedupes_per_owner() { 764 let h = Harness::new().await; 765 let owner = did("did:plc:abalone"); 766 let subject = at(&format!("at://{}", owner.as_ref())); 767 ["k1", "k2", "k3"].iter().for_each(|r| { 768 h.add_edge( 769 &nsid("sh.tangled.string"), 770 &subject, 771 &at(&format!("at://{}/sh.tangled.string/{}", owner.as_ref(), r)), 772 ); 773 }); 774 775 let app = router(h.state.clone()); 776 let (_, body) = json_response( 777 app.oneshot(list_request( 778 "sh.tangled.string.countStrings", 779 subject.as_ref(), 780 &[], 781 )) 782 .await 783 .unwrap(), 784 ) 785 .await; 786 assert_eq!(body["count"], json!(3)); 787 assert_eq!(body["distinctAuthors"], json!(1)); 788} 789 790#[tokio::test] 791async fn extractor_to_xrpc_round_trip_for_pipeline() { 792 let h = Harness::new().await; 793 let repo_did = did("did:plc:abalone"); 794 let spindle_did = did("did:plc:lyna"); 795 let rk = rkey("pl1"); 796 let source = at(&format!( 797 "at://{}/sh.tangled.pipeline/{}", 798 spindle_did.as_ref(), 799 rk.as_ref() 800 )); 801 let body = pipeline_body(&repo_did); 802 let parsed = 803 bobbin_types::edges::Record::from_json_value(&nsid("sh.tangled.pipeline"), body.clone()) 804 .expect("parse pipeline record"); 805 parsed 806 .extract_edges(&source) 807 .expect("extract") 808 .into_iter() 809 .for_each(|e| h.edges.add(e)); 810 h.mount(&spindle_did, &nsid("sh.tangled.pipeline"), &rk, body) 811 .await; 812 813 let app = router(h.state.clone()); 814 let (status, json) = json_response( 815 app.oneshot(list_request( 816 "sh.tangled.pipeline.listPipelines", 817 &format!("at://{}", repo_did.as_ref()), 818 &[], 819 )) 820 .await 821 .unwrap(), 822 ) 823 .await; 824 assert_eq!( 825 status, 826 StatusCode::OK, 827 "extractor key must match handler subject, body was {json}" 828 ); 829 let items = json["items"].as_array().unwrap(); 830 assert_eq!(items.len(), 1, "expected exactly one pipeline edge"); 831} 832 833#[tokio::test] 834async fn list_pipelines_drops_records_without_resolvable_repo_did() { 835 let h = Harness::new().await; 836 let owner_did = did("did:plc:nel"); 837 let spindle_did = did("did:plc:lyna"); 838 let rk = rkey("pl1"); 839 let source = at(&format!( 840 "at://{}/sh.tangled.pipeline/{}", 841 spindle_did.as_ref(), 842 rk.as_ref() 843 )); 844 let body = pipeline_body_owner_only(&owner_did); 845 let parsed = 846 bobbin_types::edges::Record::from_json_value(&nsid("sh.tangled.pipeline"), body.clone()) 847 .expect("parse pipeline record"); 848 parsed 849 .extract_edges(&source) 850 .expect("extract") 851 .into_iter() 852 .for_each(|e| h.edges.add(e)); 853 h.mount(&spindle_did, &nsid("sh.tangled.pipeline"), &rk, body) 854 .await; 855 856 let app = router(h.state.clone()); 857 let (status, json) = json_response( 858 app.oneshot(list_request( 859 "sh.tangled.pipeline.listPipelines", 860 &format!("at://{}", owner_did.as_ref()), 861 &[], 862 )) 863 .await 864 .unwrap(), 865 ) 866 .await; 867 assert_eq!(status, StatusCode::OK, "body was {json}"); 868 let items = json["items"].as_array().unwrap(); 869 assert!( 870 items.is_empty(), 871 "pipeline without resolvable repoDid must be dropped, got {json}" 872 ); 873}