Monorepo for Tangled
tangled.org
1use std::sync::Arc;
2
3use axum::body::{Body, to_bytes};
4use bobbin_edge_index::{Coverage, CoverageWatch, EdgeStore, HydrantCursor, StateIndex};
5use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig};
6use bobbin_record_lru::{CacheCapacity, LruRecordStore};
7use bobbin_resolver::RepoIdResolver;
8use bobbin_runtime::{RuntimeHasher, SystemClock};
9use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader};
10use bobbin_slingshot_client::SlingshotClient;
11use bobbin_types::search::{SearchDoc, SearchSink};
12use bobbin_xrpc::{AppState, router};
13use http::{Request, StatusCode};
14use jacquard_common::DefaultStr;
15use jacquard_common::types::nsid::Nsid;
16use jacquard_common::types::recordkey::Rkey;
17use jacquard_common::types::string::{AtUri, Did};
18use serde_json::{Value, json};
19use tower::ServiceExt;
20use url::Url;
21use url::form_urlencoded::byte_serialize;
22use wiremock::matchers::{method, path, query_param};
23use wiremock::{Mock, MockServer, ResponseTemplate};
24
25const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i";
26
27fn at(s: &str) -> AtUri<DefaultStr> {
28 AtUri::new_owned(s).unwrap()
29}
30
31fn did(s: &str) -> Did<DefaultStr> {
32 Did::new_owned(s).unwrap()
33}
34
35fn rkey(s: &str) -> Rkey<DefaultStr> {
36 Rkey::new_owned(s).unwrap()
37}
38
39fn nsid(s: &'static str) -> Nsid<DefaultStr> {
40 Nsid::new_static(s).unwrap()
41}
42
43fn enc(s: &str) -> String {
44 byte_serialize(s.as_bytes()).collect()
45}
46
/// Shared fixture for the search endpoint tests in this file.
struct Harness {
    /// Mock HTTP server. `Harness::new` points the `SlingshotClient` at its
    /// URI, and the `mount_*` helpers register `getRecord` responses on it.
    server: MockServer,
    /// Coverage handle; a clone of this `Arc` is passed to `AppState::new`,
    /// and `warming` / `promote_ready` update it.
    coverage: Arc<CoverageWatch>,
    /// Search index the tests write to directly (`upsert` + `flush`); a clone
    /// is passed to `AppState::new` as the `SearchReader`.
    search: Arc<SearchIndex>,
    /// Application state; tests clone it into `router(...)`.
    state: AppState,
}
53
54impl Harness {
55 async fn new() -> Self {
56 let server = MockServer::start().await;
57 let coverage = Arc::new(CoverageWatch::new());
58 let search = Arc::new(
59 SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap(),
60 );
61 let state = AppState::new(
62 Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))),
63 SlingshotClient::with_default_http(Url::parse(&server.uri()).unwrap()).unwrap(),
64 Arc::new(EdgeStore::new(RuntimeHasher::default())),
65 Arc::new(StateIndex::new(RuntimeHasher::default())),
66 Arc::new(StateIndex::new(RuntimeHasher::default())),
67 coverage.clone(),
68 Arc::new(
69 KnotProxy::new(
70 KnotProxyConfig::default(),
71 KnotHttpConfig::default(),
72 Arc::new(SystemClock::new()),
73 RuntimeHasher::default(),
74 )
75 .unwrap(),
76 ),
77 search.clone() as Arc<dyn SearchReader>,
78 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())),
79 );
80 Self {
81 server,
82 coverage,
83 search,
84 state,
85 }
86 }
87
88 async fn index_issue(
89 &self,
90 did: &Did<DefaultStr>,
91 rkey: &Rkey<DefaultStr>,
92 title: &str,
93 body: &str,
94 ) {
95 self.index_issue_at(did, rkey, title, body, None, None)
96 .await;
97 }
98
99 async fn index_issue_at(
100 &self,
101 did: &Did<DefaultStr>,
102 rkey: &Rkey<DefaultStr>,
103 title: &str,
104 body: &str,
105 created_at: Option<i64>,
106 repo: Option<&Did<DefaultStr>>,
107 ) {
108 let uri = format!(
109 "at://{}/sh.tangled.repo.issue/{}",
110 did.as_ref(),
111 rkey.as_ref()
112 );
113 self.search
114 .upsert(SearchDoc {
115 uri: at(&uri),
116 nsid: nsid("sh.tangled.repo.issue"),
117 title: title.to_owned(),
118 body: body.to_owned(),
119 author: Some(did.clone()),
120 created_at,
121 repo: repo.cloned(),
122 })
123 .await;
124 self.search.flush().await;
125 self.mount_issue(did, rkey, title, body).await;
126 }
127
128 async fn index_string(
129 &self,
130 did: &Did<DefaultStr>,
131 rkey: &Rkey<DefaultStr>,
132 filename: &str,
133 contents: &str,
134 ) {
135 let uri = format!("at://{}/sh.tangled.string/{}", did.as_ref(), rkey.as_ref());
136 self.search
137 .upsert(SearchDoc {
138 uri: at(&uri),
139 nsid: nsid("sh.tangled.string"),
140 title: filename.to_owned(),
141 body: contents.to_owned(),
142 author: None,
143 created_at: None,
144 repo: None,
145 })
146 .await;
147 self.search.flush().await;
148 self.mount_string(did, rkey, filename, contents).await;
149 }
150
151 async fn mount_issue(
152 &self,
153 did: &Did<DefaultStr>,
154 rkey: &Rkey<DefaultStr>,
155 title: &str,
156 body: &str,
157 ) {
158 let uri = format!(
159 "at://{}/sh.tangled.repo.issue/{}",
160 did.as_ref(),
161 rkey.as_ref()
162 );
163 let value = json!({
164 "$type": "sh.tangled.repo.issue",
165 "repo": "did:plc:abalone",
166 "title": title,
167 "body": body,
168 "createdAt": "2026-05-01T00:00:00Z"
169 });
170 Mock::given(method("GET"))
171 .and(path("/xrpc/com.atproto.repo.getRecord"))
172 .and(query_param("repo", did.as_ref()))
173 .and(query_param("collection", "sh.tangled.repo.issue"))
174 .and(query_param("rkey", rkey.as_ref()))
175 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
176 "uri": uri,
177 "cid": CID,
178 "value": value,
179 })))
180 .mount(&self.server)
181 .await;
182 }
183
184 async fn mount_issue_with_raw_value(
185 &self,
186 did: &Did<DefaultStr>,
187 rkey: &Rkey<DefaultStr>,
188 raw_value: &str,
189 ) {
190 let uri = format!(
191 "at://{}/sh.tangled.repo.issue/{}",
192 did.as_ref(),
193 rkey.as_ref()
194 );
195 let body = format!(r#"{{"uri":"{uri}","cid":"{CID}","value":{raw_value}}}"#,);
196 Mock::given(method("GET"))
197 .and(path("/xrpc/com.atproto.repo.getRecord"))
198 .and(query_param("repo", did.as_ref()))
199 .and(query_param("collection", "sh.tangled.repo.issue"))
200 .and(query_param("rkey", rkey.as_ref()))
201 .respond_with(
202 ResponseTemplate::new(200)
203 .insert_header("content-type", "application/json")
204 .set_body_string(body),
205 )
206 .mount(&self.server)
207 .await;
208 }
209
210 async fn mount_string(
211 &self,
212 did: &Did<DefaultStr>,
213 rkey: &Rkey<DefaultStr>,
214 filename: &str,
215 contents: &str,
216 ) {
217 let uri = format!("at://{}/sh.tangled.string/{}", did.as_ref(), rkey.as_ref());
218 let value = json!({
219 "$type": "sh.tangled.string",
220 "filename": filename,
221 "description": "field notes",
222 "contents": contents,
223 "createdAt": "2026-05-01T00:00:00Z"
224 });
225 Mock::given(method("GET"))
226 .and(path("/xrpc/com.atproto.repo.getRecord"))
227 .and(query_param("repo", did.as_ref()))
228 .and(query_param("collection", "sh.tangled.string"))
229 .and(query_param("rkey", rkey.as_ref()))
230 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
231 "uri": uri,
232 "cid": CID,
233 "value": value,
234 })))
235 .mount(&self.server)
236 .await;
237 }
238
239 fn warming(&self, events: u64, cursor: u64) {
240 self.coverage.update(|_| Coverage::Warming {
241 events_processed: events,
242 last_cursor: HydrantCursor::new(cursor),
243 });
244 }
245
246 fn promote_ready(&self, events: u64, cursor: u64) {
247 self.coverage.update(|_| Coverage::Ready {
248 events_processed: events,
249 last_cursor: HydrantCursor::new(cursor),
250 });
251 }
252}
253
254fn search_request(extras: &[(&str, &str)]) -> Request<Body> {
255 let qs = extras
256 .iter()
257 .map(|(k, v)| format!("{k}={}", enc(v)))
258 .collect::<Vec<_>>()
259 .join("&");
260 Request::builder()
261 .uri(format!("/xrpc/sh.tangled.search.query?{qs}"))
262 .body(Body::empty())
263 .unwrap()
264}
265
266async fn json_response(resp: axum::response::Response) -> (StatusCode, Value) {
267 let status = resp.status();
268 let bytes = to_bytes(resp.into_body(), 1 << 20).await.unwrap();
269 let parsed: Value = serde_json::from_slice(&bytes).expect("JSON body");
270 (status, parsed)
271}
272
273#[tokio::test]
274async fn empty_query_returns_no_hits() {
275 let h = Harness::new().await;
276 let app = router(h.state.clone());
277 let resp = app
278 .oneshot(search_request(&[("q", "barnacle")]))
279 .await
280 .unwrap();
281 let (status, body) = json_response(resp).await;
282 assert_eq!(status, StatusCode::OK);
283 assert_eq!(body["hits"], json!([]));
284 assert!(body["cursor"].is_null());
285}
286
287#[tokio::test]
288async fn indexed_issue_hydrates_typed_value() {
289 let h = Harness::new().await;
290 h.index_issue(
291 &did("did:plc:nel"),
292 &rkey("abcabcabcabcz"),
293 "barnacle pagination overflow",
294 "scrolling resets when the cursor wraps",
295 )
296 .await;
297 let app = router(h.state.clone());
298 let resp = app
299 .oneshot(search_request(&[("q", "barnacle")]))
300 .await
301 .unwrap();
302 let (status, body) = json_response(resp).await;
303 assert_eq!(status, StatusCode::OK);
304 let hits = body["hits"].as_array().expect("hits array");
305 assert_eq!(hits.len(), 1);
306 assert_eq!(
307 hits[0]["uri"].as_str().unwrap(),
308 "at://did:plc:nel/sh.tangled.repo.issue/abcabcabcabcz",
309 );
310 assert_eq!(hits[0]["nsid"], json!("sh.tangled.repo.issue"));
311 assert_eq!(hits[0]["cid"], CID);
312 assert_eq!(
313 hits[0]["value"]["$type"],
314 json!("sh.tangled.repo.issue"),
315 "hit value is typed via SearchableRecord serialization",
316 );
317 assert_eq!(
318 hits[0]["value"]["title"],
319 json!("barnacle pagination overflow"),
320 );
321 assert_eq!(hits[0]["value"]["repo"], json!("did:plc:abalone"));
322 assert!(hits[0]["score"].as_f64().unwrap() > 0.0);
323}
324
325#[tokio::test]
326async fn nsid_filter_narrows_to_single_collection() {
327 let h = Harness::new().await;
328 h.index_issue(
329 &did("did:plc:nel"),
330 &rkey("i1"),
331 "anemone tide",
332 "high water",
333 )
334 .await;
335 h.index_string(
336 &did("did:plc:teq"),
337 &rkey("k1"),
338 "anemone.md",
339 "anemone notes",
340 )
341 .await;
342 let app = router(h.state.clone());
343 let resp = app
344 .clone()
345 .oneshot(search_request(&[
346 ("q", "anemone"),
347 ("nsid", "sh.tangled.string"),
348 ]))
349 .await
350 .unwrap();
351 let (status, body) = json_response(resp).await;
352 assert_eq!(status, StatusCode::OK);
353 let hits = body["hits"].as_array().unwrap();
354 assert_eq!(hits.len(), 1);
355 assert_eq!(hits[0]["nsid"], json!("sh.tangled.string"));
356 assert_eq!(hits[0]["value"]["$type"], json!("sh.tangled.string"));
357 assert_eq!(hits[0]["value"]["filename"], json!("anemone.md"));
358}
359
360#[tokio::test]
361async fn hit_set_stable_across_coverage_promotion() {
362 let h = Harness::new().await;
363 h.index_issue(
364 &did("did:plc:nel"),
365 &rkey("i1"),
366 "limpet survey",
367 "tidal pools",
368 )
369 .await;
370 let app = router(h.state.clone());
371 h.warming(3, 5);
372 let (_, before) = json_response(
373 app.clone()
374 .oneshot(search_request(&[("q", "limpet")]))
375 .await
376 .unwrap(),
377 )
378 .await;
379 let before_hits = before["hits"].as_array().unwrap().clone();
380 assert_eq!(before_hits.len(), 1);
381
382 h.promote_ready(9, 11);
383 let (_, after) = json_response(
384 app.oneshot(search_request(&[("q", "limpet")]))
385 .await
386 .unwrap(),
387 )
388 .await;
389 let after_hits = after["hits"].as_array().unwrap();
390 assert_eq!(after_hits.len(), before_hits.len());
391 assert_eq!(after_hits[0]["uri"], before_hits[0]["uri"]);
392}
393
394#[tokio::test]
395async fn pagination_round_trips_via_cursor() {
396 let h = Harness::new().await;
397 let names = ["nel", "olaren", "teq", "lyna", "bailey"];
398 for (i, owner) in names.iter().enumerate() {
399 h.index_issue(
400 &did(&format!("did:plc:{owner}")),
401 &rkey(&format!("r{i}")),
402 "anemone tides",
403 "shell",
404 )
405 .await;
406 }
407 let app = router(h.state.clone());
408 let (_, page1) = json_response(
409 app.clone()
410 .oneshot(search_request(&[("q", "anemone"), ("limit", "2")]))
411 .await
412 .unwrap(),
413 )
414 .await;
415 assert_eq!(page1["hits"].as_array().unwrap().len(), 2);
416 let cursor = page1["cursor"].as_str().expect("more pages").to_owned();
417
418 let (_, page2) = json_response(
419 app.clone()
420 .oneshot(search_request(&[
421 ("q", "anemone"),
422 ("limit", "2"),
423 ("cursor", &cursor),
424 ]))
425 .await
426 .unwrap(),
427 )
428 .await;
429 assert_eq!(page2["hits"].as_array().unwrap().len(), 2);
430 let cursor2 = page2["cursor"].as_str().expect("more pages").to_owned();
431
432 let (_, page3) = json_response(
433 app.oneshot(search_request(&[
434 ("q", "anemone"),
435 ("limit", "2"),
436 ("cursor", &cursor2),
437 ]))
438 .await
439 .unwrap(),
440 )
441 .await;
442 assert_eq!(page3["hits"].as_array().unwrap().len(), 1);
443 assert!(page3["cursor"].is_null());
444}
445
446#[tokio::test]
447async fn empty_q_returns_400() {
448 let h = Harness::new().await;
449 let app = router(h.state.clone());
450 let resp = app.oneshot(search_request(&[("q", " ")])).await.unwrap();
451 let (status, body) = json_response(resp).await;
452 assert_eq!(status, StatusCode::BAD_REQUEST);
453 assert_eq!(body["error"], json!("InvalidRequest"));
454}
455
456#[tokio::test]
457async fn invalid_cursor_returns_400() {
458 let h = Harness::new().await;
459 let app = router(h.state.clone());
460 let resp = app
461 .oneshot(search_request(&[("q", "anything"), ("cursor", "not-hex")]))
462 .await
463 .unwrap();
464 let (status, body) = json_response(resp).await;
465 assert_eq!(status, StatusCode::BAD_REQUEST);
466 assert_eq!(body["error"], json!("InvalidRequest"));
467}
468
469#[tokio::test]
470async fn invalid_nsid_returns_400() {
471 let h = Harness::new().await;
472 let app = router(h.state.clone());
473 let resp = app
474 .oneshot(search_request(&[
475 ("q", "anything"),
476 ("nsid", "not a real nsid"),
477 ]))
478 .await
479 .unwrap();
480 let (status, body) = json_response(resp).await;
481 assert_eq!(status, StatusCode::BAD_REQUEST);
482 assert_eq!(body["error"], json!("InvalidRequest"));
483}
484
485#[tokio::test]
486async fn tombstoned_hit_silently_dropped_from_results() {
487 let h = Harness::new().await;
488 h.search
489 .upsert(SearchDoc {
490 uri: at("at://did:plc:nel/sh.tangled.repo.issue/i1"),
491 nsid: nsid("sh.tangled.repo.issue"),
492 title: "kelp".to_owned(),
493 body: "ocean".to_owned(),
494 author: None,
495 created_at: None,
496 repo: None,
497 })
498 .await;
499 h.search.flush().await;
500 h.index_issue(
501 &did("did:plc:teq"),
502 &rkey("i2"),
503 "kelp survives",
504 "still here",
505 )
506 .await;
507 let app = router(h.state.clone());
508 let resp = app.oneshot(search_request(&[("q", "kelp")])).await.unwrap();
509 let (status, body) = json_response(resp).await;
510 assert_eq!(status, StatusCode::OK);
511 let hits = body["hits"].as_array().expect("hits array");
512 assert_eq!(hits.len(), 1, "tombstoned hit dropped, sibling kept");
513 assert_eq!(
514 hits[0]["uri"].as_str().unwrap(),
515 "at://did:plc:teq/sh.tangled.repo.issue/i2",
516 );
517}
518
519#[tokio::test]
520async fn upstream_5xx_during_hydration_drops_only_that_hit() {
521 let h = Harness::new().await;
522 h.index_issue(
523 &did("did:plc:nel"),
524 &rkey("i1"),
525 "kelp survey",
526 "still here",
527 )
528 .await;
529 h.search
530 .upsert(SearchDoc {
531 uri: at("at://did:plc:teq/sh.tangled.repo.issue/i2"),
532 nsid: nsid("sh.tangled.repo.issue"),
533 title: "kelp drift".to_owned(),
534 body: "upstream is flaky".to_owned(),
535 author: Some(did("did:plc:teq")),
536 created_at: None,
537 repo: None,
538 })
539 .await;
540 h.search.flush().await;
541 Mock::given(method("GET"))
542 .and(path("/xrpc/com.atproto.repo.getRecord"))
543 .and(query_param("repo", "did:plc:teq"))
544 .and(query_param("collection", "sh.tangled.repo.issue"))
545 .and(query_param("rkey", "i2"))
546 .respond_with(ResponseTemplate::new(503))
547 .mount(&h.server)
548 .await;
549 let app = router(h.state.clone());
550 let resp = app.oneshot(search_request(&[("q", "kelp")])).await.unwrap();
551 let (status, body) = json_response(resp).await;
552 assert_eq!(status, StatusCode::OK);
553 let hits = body["hits"].as_array().expect("hits array");
554 assert_eq!(hits.len(), 1, "flaky hit dropped, healthy sibling kept");
555 assert_eq!(
556 hits[0]["uri"].as_str().unwrap(),
557 "at://did:plc:nel/sh.tangled.repo.issue/i1",
558 );
559}
560
561#[tokio::test]
562async fn second_query_short_circuits_via_lru_without_re_querying_slingshot() {
563 let h = Harness::new().await;
564 h.search
565 .upsert(SearchDoc {
566 uri: at("at://did:plc:nel/sh.tangled.repo.issue/i1"),
567 nsid: nsid("sh.tangled.repo.issue"),
568 title: "kelp".to_owned(),
569 body: "ocean".to_owned(),
570 author: None,
571 created_at: None,
572 repo: None,
573 })
574 .await;
575 h.search.flush().await;
576 Mock::given(method("GET"))
577 .and(path("/xrpc/com.atproto.repo.getRecord"))
578 .and(query_param("repo", "did:plc:nel"))
579 .and(query_param("collection", "sh.tangled.repo.issue"))
580 .and(query_param("rkey", "i1"))
581 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
582 "uri": "at://did:plc:nel/sh.tangled.repo.issue/i1",
583 "cid": CID,
584 "value": {
585 "$type": "sh.tangled.repo.issue",
586 "repo": "did:plc:abalone",
587 "title": "kelp",
588 "body": "ocean",
589 "createdAt": "2026-05-01T00:00:00Z"
590 }
591 })))
592 .expect(1)
593 .mount(&h.server)
594 .await;
595 let app = router(h.state.clone());
596 let first = app
597 .clone()
598 .oneshot(search_request(&[("q", "kelp")]))
599 .await
600 .unwrap();
601 let _ = json_response(first).await;
602 let second = app.oneshot(search_request(&[("q", "kelp")])).await.unwrap();
603 let _ = json_response(second).await;
604}
605
606#[tokio::test]
607async fn author_filter_narrows_to_matching_did() {
608 let h = Harness::new().await;
609 h.index_issue(&did("did:plc:nel"), &rkey("i1"), "kelp tide", "")
610 .await;
611 h.index_issue(&did("did:plc:teq"), &rkey("i2"), "kelp wave", "")
612 .await;
613 let app = router(h.state.clone());
614 let resp = app
615 .oneshot(search_request(&[("q", "kelp"), ("author", "did:plc:nel")]))
616 .await
617 .unwrap();
618 let (status, body) = json_response(resp).await;
619 assert_eq!(status, StatusCode::OK);
620 let hits = body["hits"].as_array().unwrap();
621 assert_eq!(hits.len(), 1);
622 assert!(
623 hits[0]["uri"]
624 .as_str()
625 .unwrap()
626 .starts_with("at://did:plc:nel/")
627 );
628}
629
630#[tokio::test]
631async fn since_until_window_filters_by_created_at() {
632 let h = Harness::new().await;
633 let early = 1_700_000_000;
634 let mid = 1_750_000_000;
635 let late = 1_800_000_000;
636 h.index_issue_at(
637 &did("did:plc:nel"),
638 &rkey("i1"),
639 "kelp early",
640 "",
641 Some(early),
642 None,
643 )
644 .await;
645 h.index_issue_at(
646 &did("did:plc:nel"),
647 &rkey("i2"),
648 "kelp mid",
649 "",
650 Some(mid),
651 None,
652 )
653 .await;
654 h.index_issue_at(
655 &did("did:plc:nel"),
656 &rkey("i3"),
657 "kelp late",
658 "",
659 Some(late),
660 None,
661 )
662 .await;
663 let app = router(h.state.clone());
664 let resp = app
665 .oneshot(search_request(&[
666 ("q", "kelp"),
667 ("since", "2025-01-01T00:00:00Z"),
668 ("until", "2027-01-01T00:00:00Z"),
669 ]))
670 .await
671 .unwrap();
672 let (status, body) = json_response(resp).await;
673 assert_eq!(status, StatusCode::OK);
674 let hits = body["hits"].as_array().unwrap();
675 assert_eq!(hits.len(), 1, "only the mid record falls in [2025, 2027)");
676 assert_eq!(
677 hits[0]["uri"].as_str().unwrap(),
678 "at://did:plc:nel/sh.tangled.repo.issue/i2"
679 );
680}
681
682#[tokio::test]
683async fn repo_filter_scopes_to_owning_repo() {
684 let h = Harness::new().await;
685 let abalone = did("did:plc:abalone");
686 let limpet = did("did:plc:limpet");
687 h.index_issue_at(
688 &did("did:plc:nel"),
689 &rkey("i1"),
690 "kelp one",
691 "",
692 None,
693 Some(&abalone),
694 )
695 .await;
696 h.index_issue_at(
697 &did("did:plc:teq"),
698 &rkey("i2"),
699 "kelp two",
700 "",
701 None,
702 Some(&limpet),
703 )
704 .await;
705 let app = router(h.state.clone());
706 let resp = app
707 .oneshot(search_request(&[("q", "kelp"), ("repo", abalone.as_ref())]))
708 .await
709 .unwrap();
710 let (status, body) = json_response(resp).await;
711 assert_eq!(status, StatusCode::OK);
712 let hits = body["hits"].as_array().unwrap();
713 assert_eq!(hits.len(), 1);
714 assert_eq!(
715 hits[0]["uri"].as_str().unwrap(),
716 "at://did:plc:nel/sh.tangled.repo.issue/i1"
717 );
718}
719
720#[tokio::test]
721async fn invalid_author_did_returns_400() {
722 let h = Harness::new().await;
723 let app = router(h.state.clone());
724 let resp = app
725 .oneshot(search_request(&[("q", "kelp"), ("author", "not-a-did")]))
726 .await
727 .unwrap();
728 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
729}
730
731#[tokio::test]
732async fn invalid_since_returns_400() {
733 let h = Harness::new().await;
734 let app = router(h.state.clone());
735 let resp = app
736 .oneshot(search_request(&[("q", "kelp"), ("since", "yesterday")]))
737 .await
738 .unwrap();
739 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
740}
741
742#[tokio::test]
743async fn since_after_until_returns_400() {
744 let h = Harness::new().await;
745 let app = router(h.state.clone());
746 let resp = app
747 .oneshot(search_request(&[
748 ("q", "kelp"),
749 ("since", "2027-01-01T00:00:00Z"),
750 ("until", "2025-01-01T00:00:00Z"),
751 ]))
752 .await
753 .unwrap();
754 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
755}
756
757#[tokio::test]
758async fn search_recovers_hit_with_duplicate_dollar_type() {
759 let h = Harness::new().await;
760 let owner = did("did:plc:nel");
761 let key = rkey("dupezzzzzzzzz");
762 h.search
763 .upsert(SearchDoc {
764 uri: at(&format!(
765 "at://{}/sh.tangled.repo.issue/{}",
766 owner.as_ref(),
767 key.as_ref()
768 )),
769 nsid: nsid("sh.tangled.repo.issue"),
770 title: "meow regression".to_owned(),
771 body: "dup type field at end of object".to_owned(),
772 author: Some(owner.clone()),
773 created_at: None,
774 repo: None,
775 })
776 .await;
777 h.search.flush().await;
778 let raw = r#"{"$type":"sh.tangled.repo.issue","repo":"did:plc:scallop","title":"meow regression","createdAt":"2026-05-01T00:00:00Z","$type":"sh.tangled.repo.issue"}"#;
779 h.mount_issue_with_raw_value(&owner, &key, raw).await;
780 let app = router(h.state.clone());
781 let resp = app.oneshot(search_request(&[("q", "meow")])).await.unwrap();
782 let (status, body) = json_response(resp).await;
783 assert_eq!(status, StatusCode::OK);
784 let hits = body["hits"].as_array().expect("hits array");
785 assert_eq!(hits.len(), 1);
786 assert_eq!(hits[0]["value"]["title"], json!("meow regression"));
787 assert_eq!(hits[0]["value"]["repo"], json!("did:plc:scallop"));
788}
789
790#[tokio::test]
791async fn search_drops_undecodable_hit_and_returns_others() {
792 let h = Harness::new().await;
793 let good_owner = did("did:plc:nel");
794 let good_key = rkey("goodzzzzzzzzz");
795 let bad_owner = did("did:plc:teq");
796 let bad_key = rkey("badzzzzzzzzzz");
797 h.index_issue(&good_owner, &good_key, "kelp survey", "good body")
798 .await;
799 h.search
800 .upsert(SearchDoc {
801 uri: at(&format!(
802 "at://{}/sh.tangled.repo.issue/{}",
803 bad_owner.as_ref(),
804 bad_key.as_ref()
805 )),
806 nsid: nsid("sh.tangled.repo.issue"),
807 title: "kelp drift".to_owned(),
808 body: "missing required fields".to_owned(),
809 author: Some(bad_owner.clone()),
810 created_at: None,
811 repo: None,
812 })
813 .await;
814 h.search.flush().await;
815 let unrecoverable = r#"{"$type":"sh.tangled.repo.issue"}"#;
816 h.mount_issue_with_raw_value(&bad_owner, &bad_key, unrecoverable)
817 .await;
818 let app = router(h.state.clone());
819 let resp = app.oneshot(search_request(&[("q", "kelp")])).await.unwrap();
820 let (status, body) = json_response(resp).await;
821 assert_eq!(status, StatusCode::OK);
822 let hits = body["hits"].as_array().expect("hits array");
823 assert_eq!(hits.len(), 1);
824 assert_eq!(
825 hits[0]["uri"].as_str().unwrap(),
826 format!(
827 "at://{}/sh.tangled.repo.issue/{}",
828 good_owner.as_ref(),
829 good_key.as_ref()
830 ),
831 );
832}