Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

at master 26 kB View raw
1use std::sync::Arc; 2 3use axum::body::{Body, to_bytes}; 4use bobbin_edge_index::{Coverage, CoverageWatch, EdgeStore, HydrantCursor, StateIndex}; 5use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig}; 6use bobbin_record_lru::{CacheCapacity, LruRecordStore}; 7use bobbin_resolver::RepoIdResolver; 8use bobbin_runtime::{RuntimeHasher, SystemClock}; 9use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader}; 10use bobbin_slingshot_client::SlingshotClient; 11use bobbin_types::search::{SearchDoc, SearchSink}; 12use bobbin_xrpc::{AppState, router}; 13use http::{Request, StatusCode}; 14use jacquard_common::DefaultStr; 15use jacquard_common::types::nsid::Nsid; 16use jacquard_common::types::recordkey::Rkey; 17use jacquard_common::types::string::{AtUri, Did}; 18use serde_json::{Value, json}; 19use tower::ServiceExt; 20use url::Url; 21use url::form_urlencoded::byte_serialize; 22use wiremock::matchers::{method, path, query_param}; 23use wiremock::{Mock, MockServer, ResponseTemplate}; 24 25const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i"; 26 27fn at(s: &str) -> AtUri<DefaultStr> { 28 AtUri::new_owned(s).unwrap() 29} 30 31fn did(s: &str) -> Did<DefaultStr> { 32 Did::new_owned(s).unwrap() 33} 34 35fn rkey(s: &str) -> Rkey<DefaultStr> { 36 Rkey::new_owned(s).unwrap() 37} 38 39fn nsid(s: &'static str) -> Nsid<DefaultStr> { 40 Nsid::new_static(s).unwrap() 41} 42 43fn enc(s: &str) -> String { 44 byte_serialize(s.as_bytes()).collect() 45} 46 47struct Harness { 48 server: MockServer, 49 coverage: Arc<CoverageWatch>, 50 search: Arc<SearchIndex>, 51 state: AppState, 52} 53 54impl Harness { 55 async fn new() -> Self { 56 let server = MockServer::start().await; 57 let coverage = Arc::new(CoverageWatch::new()); 58 let search = Arc::new( 59 SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap(), 60 ); 61 let state = AppState::new( 62 Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))), 63 SlingshotClient::with_default_http(Url::parse(&server.uri()).unwrap()).unwrap(), 64 Arc::new(EdgeStore::new(RuntimeHasher::default())), 65 Arc::new(StateIndex::new(RuntimeHasher::default())), 66 Arc::new(StateIndex::new(RuntimeHasher::default())), 67 coverage.clone(), 68 Arc::new( 69 KnotProxy::new( 70 KnotProxyConfig::default(), 71 KnotHttpConfig::default(), 72 Arc::new(SystemClock::new()), 73 RuntimeHasher::default(), 74 ) 75 .unwrap(), 76 ), 77 search.clone() as Arc<dyn SearchReader>, 78 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())), 79 ); 80 Self { 81 server, 82 coverage, 83 search, 84 state, 85 } 86 } 87 88 async fn index_issue( 89 &self, 90 did: &Did<DefaultStr>, 91 rkey: &Rkey<DefaultStr>, 92 title: &str, 93 body: &str, 94 ) { 95 self.index_issue_at(did, rkey, title, body, None, None) 96 .await; 97 } 98 99 async fn index_issue_at( 100 &self, 101 did: &Did<DefaultStr>, 102 rkey: &Rkey<DefaultStr>, 103 title: &str, 104 body: &str, 105 created_at: Option<i64>, 106 repo: Option<&Did<DefaultStr>>, 107 ) { 108 let uri = format!( 109 "at://{}/sh.tangled.repo.issue/{}", 110 did.as_ref(), 111 rkey.as_ref() 112 ); 113 self.search 114 .upsert(SearchDoc { 115 uri: at(&uri), 116 nsid: nsid("sh.tangled.repo.issue"), 117 title: title.to_owned(), 118 body: body.to_owned(), 119 author: Some(did.clone()), 120 created_at, 121 repo: repo.cloned(), 122 }) 123 .await; 124 self.search.flush().await; 125 self.mount_issue(did, rkey, title, body).await; 126 } 127 128 async fn index_string( 129 &self, 130 did: &Did<DefaultStr>, 131 rkey: &Rkey<DefaultStr>, 132 filename: &str, 133 contents: &str, 134 ) { 135 let uri = format!("at://{}/sh.tangled.string/{}", did.as_ref(), rkey.as_ref()); 136 self.search 137 .upsert(SearchDoc { 138 uri: at(&uri), 139 nsid: nsid("sh.tangled.string"), 140 title: filename.to_owned(), 141 body: contents.to_owned(), 142 author: None, 143 created_at: None, 144 repo: None, 145 }) 146 .await; 147 self.search.flush().await; 148 self.mount_string(did, rkey, filename, contents).await; 149 } 150 151 async fn mount_issue( 152 &self, 153 did: &Did<DefaultStr>, 154 rkey: &Rkey<DefaultStr>, 155 title: &str, 156 body: &str, 157 ) { 158 let uri = format!( 159 "at://{}/sh.tangled.repo.issue/{}", 160 did.as_ref(), 161 rkey.as_ref() 162 ); 163 let value = json!({ 164 "$type": "sh.tangled.repo.issue", 165 "repo": "did:plc:abalone", 166 "title": title, 167 "body": body, 168 "createdAt": "2026-05-01T00:00:00Z" 169 }); 170 Mock::given(method("GET")) 171 .and(path("/xrpc/com.atproto.repo.getRecord")) 172 .and(query_param("repo", did.as_ref())) 173 .and(query_param("collection", "sh.tangled.repo.issue")) 174 .and(query_param("rkey", rkey.as_ref())) 175 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 176 "uri": uri, 177 "cid": CID, 178 "value": value, 179 }))) 180 .mount(&self.server) 181 .await; 182 } 183 184 async fn mount_issue_with_raw_value( 185 &self, 186 did: &Did<DefaultStr>, 187 rkey: &Rkey<DefaultStr>, 188 raw_value: &str, 189 ) { 190 let uri = format!( 191 "at://{}/sh.tangled.repo.issue/{}", 192 did.as_ref(), 193 rkey.as_ref() 194 ); 195 let body = format!(r#"{{"uri":"{uri}","cid":"{CID}","value":{raw_value}}}"#,); 196 Mock::given(method("GET")) 197 .and(path("/xrpc/com.atproto.repo.getRecord")) 198 .and(query_param("repo", did.as_ref())) 199 .and(query_param("collection", "sh.tangled.repo.issue")) 200 .and(query_param("rkey", rkey.as_ref())) 201 .respond_with( 202 ResponseTemplate::new(200) 203 .insert_header("content-type", "application/json") 204 .set_body_string(body), 205 ) 206 .mount(&self.server) 207 .await; 208 } 209 210 async fn mount_string( 211 &self, 212 did: &Did<DefaultStr>, 213 rkey: &Rkey<DefaultStr>, 214 filename: &str, 215 contents: &str, 216 ) { 217 let uri = format!("at://{}/sh.tangled.string/{}", did.as_ref(), rkey.as_ref()); 218 let value = json!({ 219 "$type": "sh.tangled.string", 220 "filename": filename, 221 "description": "field notes", 222 "contents": contents, 223 "createdAt": "2026-05-01T00:00:00Z" 224 }); 225 Mock::given(method("GET")) 226 .and(path("/xrpc/com.atproto.repo.getRecord")) 227 .and(query_param("repo", did.as_ref())) 228 .and(query_param("collection", "sh.tangled.string")) 229 .and(query_param("rkey", rkey.as_ref())) 230 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 231 "uri": uri, 232 "cid": CID, 233 "value": value, 234 }))) 235 .mount(&self.server) 236 .await; 237 } 238 239 fn warming(&self, events: u64, cursor: u64) { 240 self.coverage.update(|_| Coverage::Warming { 241 events_processed: events, 242 last_cursor: HydrantCursor::new(cursor), 243 }); 244 } 245 246 fn promote_ready(&self, events: u64, cursor: u64) { 247 self.coverage.update(|_| Coverage::Ready { 248 events_processed: events, 249 last_cursor: HydrantCursor::new(cursor), 250 }); 251 } 252} 253 254fn search_request(extras: &[(&str, &str)]) -> Request<Body> { 255 let qs = extras 256 .iter() 257 .map(|(k, v)| format!("{k}={}", enc(v))) 258 .collect::<Vec<_>>() 259 .join("&"); 260 Request::builder() 261 .uri(format!("/xrpc/sh.tangled.search.query?{qs}")) 262 .body(Body::empty()) 263 .unwrap() 264} 265 266async fn json_response(resp: axum::response::Response) -> (StatusCode, Value) { 267 let status = resp.status(); 268 let bytes = to_bytes(resp.into_body(), 1 << 20).await.unwrap(); 269 let parsed: Value = serde_json::from_slice(&bytes).expect("JSON body"); 270 (status, parsed) 271} 272 273#[tokio::test] 274async fn empty_query_returns_no_hits() { 275 let h = Harness::new().await; 276 let app = router(h.state.clone()); 277 let resp = app 278 .oneshot(search_request(&[("q", "barnacle")])) 279 .await 280 .unwrap(); 281 let (status, body) = json_response(resp).await; 282 assert_eq!(status, StatusCode::OK); 283 assert_eq!(body["hits"], json!([])); 284 assert!(body["cursor"].is_null()); 285} 286 287#[tokio::test] 288async fn indexed_issue_hydrates_typed_value() { 289 let h = Harness::new().await; 290 h.index_issue( 291 &did("did:plc:nel"), 292 &rkey("abcabcabcabcz"), 293 "barnacle pagination overflow", 294 "scrolling resets when the cursor wraps", 295 ) 296 .await; 297 let app = router(h.state.clone()); 298 let resp = app 299 .oneshot(search_request(&[("q", "barnacle")])) 300 .await 301 .unwrap(); 302 let (status, body) = json_response(resp).await; 303 assert_eq!(status, StatusCode::OK); 304 let hits = body["hits"].as_array().expect("hits array"); 305 assert_eq!(hits.len(), 1); 306 assert_eq!( 307 hits[0]["uri"].as_str().unwrap(), 308 "at://did:plc:nel/sh.tangled.repo.issue/abcabcabcabcz", 309 ); 310 assert_eq!(hits[0]["nsid"], json!("sh.tangled.repo.issue")); 311 assert_eq!(hits[0]["cid"], CID); 312 assert_eq!( 313 hits[0]["value"]["$type"], 314 json!("sh.tangled.repo.issue"), 315 "hit value is typed via SearchableRecord serialization", 316 ); 317 assert_eq!( 318 hits[0]["value"]["title"], 319 json!("barnacle pagination overflow"), 320 ); 321 assert_eq!(hits[0]["value"]["repo"], json!("did:plc:abalone")); 322 assert!(hits[0]["score"].as_f64().unwrap() > 0.0); 323} 324 325#[tokio::test] 326async fn nsid_filter_narrows_to_single_collection() { 327 let h = Harness::new().await; 328 h.index_issue( 329 &did("did:plc:nel"), 330 &rkey("i1"), 331 "anemone tide", 332 "high water", 333 ) 334 .await; 335 h.index_string( 336 &did("did:plc:teq"), 337 &rkey("k1"), 338 "anemone.md", 339 "anemone notes", 340 ) 341 .await; 342 let app = router(h.state.clone()); 343 let resp = app 344 .clone() 345 .oneshot(search_request(&[ 346 ("q", "anemone"), 347 ("nsid", "sh.tangled.string"), 348 ])) 349 .await 350 .unwrap(); 351 let (status, body) = json_response(resp).await; 352 assert_eq!(status, StatusCode::OK); 353 let hits = body["hits"].as_array().unwrap(); 354 assert_eq!(hits.len(), 1); 355 assert_eq!(hits[0]["nsid"], json!("sh.tangled.string")); 356 assert_eq!(hits[0]["value"]["$type"], json!("sh.tangled.string")); 357 assert_eq!(hits[0]["value"]["filename"], json!("anemone.md")); 358} 359 360#[tokio::test] 361async fn hit_set_stable_across_coverage_promotion() { 362 let h = Harness::new().await; 363 h.index_issue( 364 &did("did:plc:nel"), 365 &rkey("i1"), 366 "limpet survey", 367 "tidal pools", 368 ) 369 .await; 370 let app = router(h.state.clone()); 371 h.warming(3, 5); 372 let (_, before) = json_response( 373 app.clone() 374 .oneshot(search_request(&[("q", "limpet")])) 375 .await 376 .unwrap(), 377 ) 378 .await; 379 let before_hits = before["hits"].as_array().unwrap().clone(); 380 assert_eq!(before_hits.len(), 1); 381 382 h.promote_ready(9, 11); 383 let (_, after) = json_response( 384 app.oneshot(search_request(&[("q", "limpet")])) 385 .await 386 .unwrap(), 387 ) 388 .await; 389 let after_hits = after["hits"].as_array().unwrap(); 390 assert_eq!(after_hits.len(), before_hits.len()); 391 assert_eq!(after_hits[0]["uri"], before_hits[0]["uri"]); 392} 393 394#[tokio::test] 395async fn pagination_round_trips_via_cursor() { 396 let h = Harness::new().await; 397 let names = ["nel", "olaren", "teq", "lyna", "bailey"]; 398 for (i, owner) in names.iter().enumerate() { 399 h.index_issue( 400 &did(&format!("did:plc:{owner}")), 401 &rkey(&format!("r{i}")), 402 "anemone tides", 403 "shell", 404 ) 405 .await; 406 } 407 let app = router(h.state.clone()); 408 let (_, page1) = json_response( 409 app.clone() 410 .oneshot(search_request(&[("q", "anemone"), ("limit", "2")])) 411 .await 412 .unwrap(), 413 ) 414 .await; 415 assert_eq!(page1["hits"].as_array().unwrap().len(), 2); 416 let cursor = page1["cursor"].as_str().expect("more pages").to_owned(); 417 418 let (_, page2) = json_response( 419 app.clone() 420 .oneshot(search_request(&[ 421 ("q", "anemone"), 422 ("limit", "2"), 423 ("cursor", &cursor), 424 ])) 425 .await 426 .unwrap(), 427 ) 428 .await; 429 assert_eq!(page2["hits"].as_array().unwrap().len(), 2); 430 let cursor2 = page2["cursor"].as_str().expect("more pages").to_owned(); 431 432 let (_, page3) = json_response( 433 app.oneshot(search_request(&[ 434 ("q", "anemone"), 435 ("limit", "2"), 436 ("cursor", &cursor2), 437 ])) 438 .await 439 .unwrap(), 440 ) 441 .await; 442 assert_eq!(page3["hits"].as_array().unwrap().len(), 1); 443 assert!(page3["cursor"].is_null()); 444} 445 446#[tokio::test] 447async fn empty_q_returns_400() { 448 let h = Harness::new().await; 449 let app = router(h.state.clone()); 450 let resp = app.oneshot(search_request(&[("q", " ")])).await.unwrap(); 451 let (status, body) = json_response(resp).await; 452 assert_eq!(status, StatusCode::BAD_REQUEST); 453 assert_eq!(body["error"], json!("InvalidRequest")); 454} 455 456#[tokio::test] 457async fn invalid_cursor_returns_400() { 458 let h = Harness::new().await; 459 let app = router(h.state.clone()); 460 let resp = app 461 .oneshot(search_request(&[("q", "anything"), ("cursor", "not-hex")])) 462 .await 463 .unwrap(); 464 let (status, body) = json_response(resp).await; 465 assert_eq!(status, StatusCode::BAD_REQUEST); 466 assert_eq!(body["error"], json!("InvalidRequest")); 467} 468 469#[tokio::test] 470async fn invalid_nsid_returns_400() { 471 let h = Harness::new().await; 472 let app = router(h.state.clone()); 473 let resp = app 474 .oneshot(search_request(&[ 475 ("q", "anything"), 476 ("nsid", "not a real nsid"), 477 ])) 478 .await 479 .unwrap(); 480 let (status, body) = json_response(resp).await; 481 assert_eq!(status, StatusCode::BAD_REQUEST); 482 assert_eq!(body["error"], json!("InvalidRequest")); 483} 484 485#[tokio::test] 486async fn tombstoned_hit_silently_dropped_from_results() { 487 let h = Harness::new().await; 488 h.search 489 .upsert(SearchDoc { 490 uri: at("at://did:plc:nel/sh.tangled.repo.issue/i1"), 491 nsid: nsid("sh.tangled.repo.issue"), 492 title: "kelp".to_owned(), 493 body: "ocean".to_owned(), 494 author: None, 495 created_at: None, 496 repo: None, 497 }) 498 .await; 499 h.search.flush().await; 500 h.index_issue( 501 &did("did:plc:teq"), 502 &rkey("i2"), 503 "kelp survives", 504 "still here", 505 ) 506 .await; 507 let app = router(h.state.clone()); 508 let resp = app.oneshot(search_request(&[("q", "kelp")])).await.unwrap(); 509 let (status, body) = json_response(resp).await; 510 assert_eq!(status, StatusCode::OK); 511 let hits = body["hits"].as_array().expect("hits array"); 512 assert_eq!(hits.len(), 1, "tombstoned hit dropped, sibling kept"); 513 assert_eq!( 514 hits[0]["uri"].as_str().unwrap(), 515 "at://did:plc:teq/sh.tangled.repo.issue/i2", 516 ); 517} 518 519#[tokio::test] 520async fn upstream_5xx_during_hydration_drops_only_that_hit() { 521 let h = Harness::new().await; 522 h.index_issue( 523 &did("did:plc:nel"), 524 &rkey("i1"), 525 "kelp survey", 526 "still here", 527 ) 528 .await; 529 h.search 530 .upsert(SearchDoc { 531 uri: at("at://did:plc:teq/sh.tangled.repo.issue/i2"), 532 nsid: nsid("sh.tangled.repo.issue"), 533 title: "kelp drift".to_owned(), 534 body: "upstream is flaky".to_owned(), 535 author: Some(did("did:plc:teq")), 536 created_at: None, 537 repo: None, 538 }) 539 .await; 540 h.search.flush().await; 541 Mock::given(method("GET")) 542 .and(path("/xrpc/com.atproto.repo.getRecord")) 543 .and(query_param("repo", "did:plc:teq")) 544 .and(query_param("collection", "sh.tangled.repo.issue")) 545 .and(query_param("rkey", "i2")) 546 .respond_with(ResponseTemplate::new(503)) 547 .mount(&h.server) 548 .await; 549 let app = router(h.state.clone()); 550 let resp = app.oneshot(search_request(&[("q", "kelp")])).await.unwrap(); 551 let (status, body) = json_response(resp).await; 552 assert_eq!(status, StatusCode::OK); 553 let hits = body["hits"].as_array().expect("hits array"); 554 assert_eq!(hits.len(), 1, "flaky hit dropped, healthy sibling kept"); 555 assert_eq!( 556 hits[0]["uri"].as_str().unwrap(), 557 "at://did:plc:nel/sh.tangled.repo.issue/i1", 558 ); 559} 560 561#[tokio::test] 562async fn second_query_short_circuits_via_lru_without_re_querying_slingshot() { 563 let h = Harness::new().await; 564 h.search 565 .upsert(SearchDoc { 566 uri: at("at://did:plc:nel/sh.tangled.repo.issue/i1"), 567 nsid: nsid("sh.tangled.repo.issue"), 568 title: "kelp".to_owned(), 569 body: "ocean".to_owned(), 570 author: None, 571 created_at: None, 572 repo: None, 573 }) 574 .await; 575 h.search.flush().await; 576 Mock::given(method("GET")) 577 .and(path("/xrpc/com.atproto.repo.getRecord")) 578 .and(query_param("repo", "did:plc:nel")) 579 .and(query_param("collection", "sh.tangled.repo.issue")) 580 .and(query_param("rkey", "i1")) 581 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 582 "uri": "at://did:plc:nel/sh.tangled.repo.issue/i1", 583 "cid": CID, 584 "value": { 585 "$type": "sh.tangled.repo.issue", 586 "repo": "did:plc:abalone", 587 "title": "kelp", 588 "body": "ocean", 589 "createdAt": "2026-05-01T00:00:00Z" 590 } 591 }))) 592 .expect(1) 593 .mount(&h.server) 594 .await; 595 let app = router(h.state.clone()); 596 let first = app 597 .clone() 598 .oneshot(search_request(&[("q", "kelp")])) 599 .await 600 .unwrap(); 601 let _ = json_response(first).await; 602 let second = app.oneshot(search_request(&[("q", "kelp")])).await.unwrap(); 603 let _ = json_response(second).await; 604} 605 606#[tokio::test] 607async fn author_filter_narrows_to_matching_did() { 608 let h = Harness::new().await; 609 h.index_issue(&did("did:plc:nel"), &rkey("i1"), "kelp tide", "") 610 .await; 611 h.index_issue(&did("did:plc:teq"), &rkey("i2"), "kelp wave", "") 612 .await; 613 let app = router(h.state.clone()); 614 let resp = app 615 .oneshot(search_request(&[("q", "kelp"), ("author", "did:plc:nel")])) 616 .await 617 .unwrap(); 618 let (status, body) = json_response(resp).await; 619 assert_eq!(status, StatusCode::OK); 620 let hits = body["hits"].as_array().unwrap(); 621 assert_eq!(hits.len(), 1); 622 assert!( 623 hits[0]["uri"] 624 .as_str() 625 .unwrap() 626 .starts_with("at://did:plc:nel/") 627 ); 628} 629 630#[tokio::test] 631async fn since_until_window_filters_by_created_at() { 632 let h = Harness::new().await; 633 let early = 1_700_000_000; 634 let mid = 1_750_000_000; 635 let late = 1_800_000_000; 636 h.index_issue_at( 637 &did("did:plc:nel"), 638 &rkey("i1"), 639 "kelp early", 640 "", 641 Some(early), 642 None, 643 ) 644 .await; 645 h.index_issue_at( 646 &did("did:plc:nel"), 647 &rkey("i2"), 648 "kelp mid", 649 "", 650 Some(mid), 651 None, 652 ) 653 .await; 654 h.index_issue_at( 655 &did("did:plc:nel"), 656 &rkey("i3"), 657 "kelp late", 658 "", 659 Some(late), 660 None, 661 ) 662 .await; 663 let app = router(h.state.clone()); 664 let resp = app 665 .oneshot(search_request(&[ 666 ("q", "kelp"), 667 ("since", "2025-01-01T00:00:00Z"), 668 ("until", "2027-01-01T00:00:00Z"), 669 ])) 670 .await 671 .unwrap(); 672 let (status, body) = json_response(resp).await; 673 assert_eq!(status, StatusCode::OK); 674 let hits = body["hits"].as_array().unwrap(); 675 assert_eq!(hits.len(), 1, "only the mid record falls in [2025, 2027)"); 676 assert_eq!( 677 hits[0]["uri"].as_str().unwrap(), 678 "at://did:plc:nel/sh.tangled.repo.issue/i2" 679 ); 680} 681 682#[tokio::test] 683async fn repo_filter_scopes_to_owning_repo() { 684 let h = Harness::new().await; 685 let abalone = did("did:plc:abalone"); 686 let limpet = did("did:plc:limpet"); 687 h.index_issue_at( 688 &did("did:plc:nel"), 689 &rkey("i1"), 690 "kelp one", 691 "", 692 None, 693 Some(&abalone), 694 ) 695 .await; 696 h.index_issue_at( 697 &did("did:plc:teq"), 698 &rkey("i2"), 699 "kelp two", 700 "", 701 None, 702 Some(&limpet), 703 ) 704 .await; 705 let app = router(h.state.clone()); 706 let resp = app 707 .oneshot(search_request(&[("q", "kelp"), ("repo", abalone.as_ref())])) 708 .await 709 .unwrap(); 710 let (status, body) = json_response(resp).await; 711 assert_eq!(status, StatusCode::OK); 712 let hits = body["hits"].as_array().unwrap(); 713 assert_eq!(hits.len(), 1); 714 assert_eq!( 715 hits[0]["uri"].as_str().unwrap(), 716 "at://did:plc:nel/sh.tangled.repo.issue/i1" 717 ); 718} 719 720#[tokio::test] 721async fn invalid_author_did_returns_400() { 722 let h = Harness::new().await; 723 let app = router(h.state.clone()); 724 let resp = app 725 .oneshot(search_request(&[("q", "kelp"), ("author", "not-a-did")])) 726 .await 727 .unwrap(); 728 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 729} 730 731#[tokio::test] 732async fn invalid_since_returns_400() { 733 let h = Harness::new().await; 734 let app = router(h.state.clone()); 735 let resp = app 736 .oneshot(search_request(&[("q", "kelp"), ("since", "yesterday")])) 737 .await 738 .unwrap(); 739 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 740} 741 742#[tokio::test] 743async fn since_after_until_returns_400() { 744 let h = Harness::new().await; 745 let app = router(h.state.clone()); 746 let resp = app 747 .oneshot(search_request(&[ 748 ("q", "kelp"), 749 ("since", "2027-01-01T00:00:00Z"), 750 ("until", "2025-01-01T00:00:00Z"), 751 ])) 752 .await 753 .unwrap(); 754 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 755} 756 757#[tokio::test] 758async fn search_recovers_hit_with_duplicate_dollar_type() { 759 let h = Harness::new().await; 760 let owner = did("did:plc:nel"); 761 let key = rkey("dupezzzzzzzzz"); 762 h.search 763 .upsert(SearchDoc { 764 uri: at(&format!( 765 "at://{}/sh.tangled.repo.issue/{}", 766 owner.as_ref(), 767 key.as_ref() 768 )), 769 nsid: nsid("sh.tangled.repo.issue"), 770 title: "meow regression".to_owned(), 771 body: "dup type field at end of object".to_owned(), 772 author: Some(owner.clone()), 773 created_at: None, 774 repo: None, 775 }) 776 .await; 777 h.search.flush().await; 778 let raw = r#"{"$type":"sh.tangled.repo.issue","repo":"did:plc:scallop","title":"meow regression","createdAt":"2026-05-01T00:00:00Z","$type":"sh.tangled.repo.issue"}"#; 779 h.mount_issue_with_raw_value(&owner, &key, raw).await; 780 let app = router(h.state.clone()); 781 let resp = app.oneshot(search_request(&[("q", "meow")])).await.unwrap(); 782 let (status, body) = json_response(resp).await; 783 assert_eq!(status, StatusCode::OK); 784 let hits = body["hits"].as_array().expect("hits array"); 785 assert_eq!(hits.len(), 1); 786 assert_eq!(hits[0]["value"]["title"], json!("meow regression")); 787 assert_eq!(hits[0]["value"]["repo"], json!("did:plc:scallop")); 788} 789 790#[tokio::test] 791async fn search_drops_undecodable_hit_and_returns_others() { 792 let h = Harness::new().await; 793 let good_owner = did("did:plc:nel"); 794 let good_key = rkey("goodzzzzzzzzz"); 795 let bad_owner = did("did:plc:teq"); 796 let bad_key = rkey("badzzzzzzzzzz"); 797 h.index_issue(&good_owner, &good_key, "kelp survey", "good body") 798 .await; 799 h.search 800 .upsert(SearchDoc { 801 uri: at(&format!( 802 "at://{}/sh.tangled.repo.issue/{}", 803 bad_owner.as_ref(), 804 bad_key.as_ref() 805 )), 806 nsid: nsid("sh.tangled.repo.issue"), 807 title: "kelp drift".to_owned(), 808 body: "missing required fields".to_owned(), 809 author: Some(bad_owner.clone()), 810 created_at: None, 811 repo: None, 812 }) 813 .await; 814 h.search.flush().await; 815 let unrecoverable = r#"{"$type":"sh.tangled.repo.issue"}"#; 816 h.mount_issue_with_raw_value(&bad_owner, &bad_key, unrecoverable) 817 .await; 818 let app = router(h.state.clone()); 819 let resp = app.oneshot(search_request(&[("q", "kelp")])).await.unwrap(); 820 let (status, body) = json_response(resp).await; 821 assert_eq!(status, StatusCode::OK); 822 let hits = body["hits"].as_array().expect("hits array"); 823 assert_eq!(hits.len(), 1); 824 assert_eq!( 825 hits[0]["uri"].as_str().unwrap(), 826 format!( 827 "at://{}/sh.tangled.repo.issue/{}", 828 good_owner.as_ref(), 829 good_key.as_ref() 830 ), 831 ); 832}