Monorepo for Tangled
tangled.org
1use std::future::Future;
2use std::ops::Bound;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use bobbin_runtime::Clock;
8use bobbin_types::search::{SearchDoc, SearchSink};
9use jacquard_common::DefaultStr;
10use jacquard_common::types::nsid::Nsid;
11use jacquard_common::types::string::{AtUri, Did};
12use tantivy::collector::TopDocs;
13use tantivy::query::{
14 BooleanQuery, Occur, Query, QueryParser, QueryParserError, RangeQuery, TermQuery,
15};
16use tantivy::schema::{
17 FAST, Field, INDEXED, IndexRecordOption, STORED, STRING, Schema, TEXT, TextFieldIndexing,
18 TextOptions, Value,
19};
20use tantivy::{Index, IndexReader, IndexWriter, ReloadPolicy, TantivyDocument, TantivyError, Term};
21use thiserror::Error;
22use tokio::sync::{mpsc, oneshot};
23use tracing::warn;
24
/// Default heap budget, in bytes (50 MB), to pass as `heap_bytes` to
/// [`SearchIndex::new`].
pub const DEFAULT_WRITER_HEAP_BYTES: usize = 50 * 1_000_000;
/// Number of pending upserts/removes after which the writer task commits
/// without waiting for the idle timer.
const BATCH_SIZE: usize = 200;
/// Idle interval after which the writer task commits whatever is pending,
/// even if `BATCH_SIZE` has not been reached.
const BATCH_INTERVAL: Duration = Duration::from_millis(250);
/// Query-parser boost applied to matches in the `title` field.
const TITLE_BOOST: f32 = 4.0_f32;
/// Length of an encoded pagination token: two hex digits per byte of the
/// `u32` offset.
const OFFSET_TOKEN_LEN: usize = 2 * std::mem::size_of::<u32>();
/// Capacity of the bounded queue feeding the writer task; senders wait when
/// it is full.
const WRITE_QUEUE_CAPACITY: usize = 4096;
31
/// Zero-based position in a ranked result list, used as a pagination cursor.
///
/// Serialized for clients as the fixed-width hex token produced by
/// [`SearchOffset::encode_token`] and parsed back by
/// [`SearchOffset::decode_token`].
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct SearchOffset(u32);
34
/// Error returned when a pagination token cannot be decoded.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Error)]
pub enum SearchOffsetError {
    /// The token has the wrong length or contains a non-hex character.
    #[error("offset token must be {OFFSET_TOKEN_LEN} hex characters")]
    Malformed,
}
40
41impl SearchOffset {
42 pub const fn new(value: u32) -> Self {
43 Self(value)
44 }
45
46 pub const fn raw(self) -> u32 {
47 self.0
48 }
49
50 pub fn encode_token(self) -> String {
51 format!("{:0width$x}", self.0, width = OFFSET_TOKEN_LEN)
52 }
53
54 pub fn decode_token(token: &str) -> Result<Self, SearchOffsetError> {
55 if token.len() != OFFSET_TOKEN_LEN || !token.bytes().all(|b| b.is_ascii_hexdigit()) {
56 return Err(SearchOffsetError::Malformed);
57 }
58 u32::from_str_radix(token, 16)
59 .map(Self)
60 .map_err(|_| SearchOffsetError::Malformed)
61 }
62}
63
64#[derive(Clone, Copy, Debug, Eq, PartialEq)]
65pub enum SearchCursor {
66 Start,
67 At(SearchOffset),
68}
69
70impl SearchCursor {
71 pub fn from_token(raw: Option<&str>) -> Result<Self, SearchOffsetError> {
72 raw.map_or(Ok(Self::Start), |t| {
73 SearchOffset::decode_token(t).map(Self::At)
74 })
75 }
76
77 fn offset(self) -> u32 {
78 match self {
79 Self::Start => 0,
80 Self::At(o) => o.raw(),
81 }
82 }
83}
84
/// One ranked search result.
#[derive(Debug)]
pub struct SearchHit {
    /// AT URI of the matching record, read back from the stored `uri` field.
    pub uri: AtUri<DefaultStr>,
    /// Collection NSID of the matching record, read from the stored `nsid` field.
    pub nsid: Nsid<DefaultStr>,
    /// Relevance score assigned by tantivy (title matches are boosted).
    pub score: f32,
}

/// One page of search results.
#[derive(Debug)]
pub struct SearchPage {
    /// Hits for the requested window, ordered by score, at most `limit` entries.
    pub hits: Vec<SearchHit>,
    /// Offset to request for the following page, or `None` when no further
    /// page is offered.
    pub next: Option<SearchOffset>,
}
97
/// Errors produced while building or searching a [`SearchIndex`].
#[derive(Debug, Error)]
pub enum SearchError {
    /// Failure reported by tantivy (index/writer/reader creation, search, doc fetch).
    #[error("tantivy: {0}")]
    Tantivy(#[from] TantivyError),
    /// The user query could not be parsed.
    #[error("query parse: {0}")]
    Query(#[from] QueryParserError),
    /// A stored `uri` value did not parse as an AT URI; the payload includes the value.
    #[error("indexed uri is invalid: {0}")]
    InvalidUri(String),
    /// A stored `nsid` value did not parse as an NSID; the payload includes the value.
    #[error("indexed nsid is invalid: {0}")]
    InvalidNsid(String),
    /// A stored document lacked the named text field.
    #[error("indexed document missing field: {0}")]
    MissingField(&'static str),
    /// The blocking search task was cancelled instead of completing or panicking.
    #[error("search blocking task cancelled: {0}")]
    Cancelled(String),
}
113
/// Handles to the schema fields created in [`SearchIndex::new`].
#[derive(Clone, Copy)]
struct Fields {
    /// Record AT URI; untokenized and stored, used as the document's identity.
    uri: Field,
    /// Collection NSID; untokenized and stored.
    nsid: Field,
    /// Tokenized, stored title; searched with a boost.
    title: Field,
    /// Tokenized body text; searched but not stored.
    body: Field,
    /// Author DID; untokenized and stored.
    author: Field,
    /// `i64` creation time; indexed, fast and stored, used by range filters.
    created_at: Field,
    /// Repo DID; untokenized and stored.
    repo: Field,
}
124
125#[derive(Clone, Debug, Default)]
126pub struct SearchFilters {
127 pub nsid: Option<Nsid<DefaultStr>>,
128 pub author: Option<Did<DefaultStr>>,
129 pub repo: Option<Did<DefaultStr>>,
130 pub since: Option<i64>,
131 pub until: Option<i64>,
132}
133
134impl SearchFilters {
135 pub fn is_empty(&self) -> bool {
136 self.nsid.is_none()
137 && self.author.is_none()
138 && self.repo.is_none()
139 && self.since.is_none()
140 && self.until.is_none()
141 }
142}
143
/// Operations queued for the background writer task.
enum WriteOp {
    /// Index the document, replacing any document with the same URI.
    Upsert(SearchDoc),
    /// Delete whatever is indexed under this URI.
    Remove(AtUri<DefaultStr>),
    /// Commit anything pending, then signal the sender.
    Flush(oneshot::Sender<()>),
}
149
/// Handle to an in-memory full-text search index.
///
/// Writes are queued to a background writer task spawned by
/// [`SearchIndex::new`]. Searches run on Tokio's blocking pool against the
/// most recently reloaded reader.
pub struct SearchIndex {
    inner: Arc<Inner>,
}

/// State shared between a [`SearchIndex`] and the blocking search tasks it spawns.
struct Inner {
    fields: Fields,
    // Created with a manual reload policy; the writer task reloads it after
    // each successful commit.
    reader: IndexReader,
    // Parses user queries over `title` (boosted) and `body`.
    parser: QueryParser,
    // Sending half of the bounded queue drained by the writer task.
    tx: mpsc::Sender<WriteOp>,
}
160
161impl SearchIndex {
162 pub fn new(heap_bytes: usize, clock: Arc<dyn Clock>) -> Result<Self, SearchError> {
163 let mut sb = Schema::builder();
164 let uri = sb.add_text_field("uri", STRING | STORED);
165 let nsid = sb.add_text_field("nsid", STRING | STORED);
166 let title_indexing = TextFieldIndexing::default()
167 .set_tokenizer("default")
168 .set_index_option(IndexRecordOption::WithFreqsAndPositions);
169 let title_opts = TextOptions::default()
170 .set_indexing_options(title_indexing)
171 .set_stored();
172 let title = sb.add_text_field("title", title_opts);
173 let body = sb.add_text_field("body", TEXT);
174 let author = sb.add_text_field("author", STRING | STORED);
175 let created_at = sb.add_i64_field("created_at", INDEXED | FAST | STORED);
176 let repo = sb.add_text_field("repo", STRING | STORED);
177 let schema = sb.build();
178 let index = Index::create_in_ram(schema);
179 let writer: IndexWriter = index.writer(heap_bytes)?;
180 let reader = index
181 .reader_builder()
182 .reload_policy(ReloadPolicy::Manual)
183 .try_into()?;
184 let mut parser = QueryParser::for_index(&index, vec![title, body]);
185 parser.set_field_boost(title, TITLE_BOOST);
186 let fields = Fields {
187 uri,
188 nsid,
189 title,
190 body,
191 author,
192 created_at,
193 repo,
194 };
195 let (tx, rx) = mpsc::channel(WRITE_QUEUE_CAPACITY);
196 let writer_reader = reader.clone();
197 tokio::spawn(writer_loop(writer, fields, writer_reader, rx, clock));
198 Ok(Self {
199 inner: Arc::new(Inner {
200 fields,
201 reader,
202 parser,
203 tx,
204 }),
205 })
206 }
207
208 pub async fn flush(&self) {
209 let (done_tx, done_rx) = oneshot::channel();
210 if self.inner.tx.send(WriteOp::Flush(done_tx)).await.is_err() {
211 warn!("search writer channel closed before flush");
212 return;
213 }
214 if done_rx.await.is_err() {
215 warn!("search writer dropped before completing flush");
216 }
217 }
218
219 pub async fn search(
220 &self,
221 q: &str,
222 filters: SearchFilters,
223 cursor: SearchCursor,
224 limit: u32,
225 ) -> Result<SearchPage, SearchError> {
226 let trimmed = q.trim();
227 if trimmed.is_empty() {
228 return Ok(SearchPage {
229 hits: Vec::new(),
230 next: None,
231 });
232 }
233 let inner = self.inner.clone();
234 let q_owned = trimmed.to_owned();
235 let limit_usize = limit as usize;
236 let offset = cursor.offset() as usize;
237 match tokio::task::spawn_blocking(move || {
238 inner.search_blocking(&q_owned, &filters, offset, limit_usize)
239 })
240 .await
241 {
242 Ok(result) => result,
243 Err(e) if e.is_panic() => std::panic::resume_unwind(e.into_panic()),
244 Err(e) => Err(SearchError::Cancelled(e.to_string())),
245 }
246 }
247}
248
249#[derive(Clone, Copy, Debug, Default)]
250pub struct SearchSpaceUsage {
251 pub num_docs: u64,
252 pub num_segments: u64,
253 pub total_bytes: u64,
254}
255
256impl SearchIndex {
257 pub fn space_usage(&self) -> SearchSpaceUsage {
258 let searcher = self.inner.reader.searcher();
259 let segments = searcher.segment_readers();
260 let total_bytes = segments
261 .iter()
262 .map(|s| s.space_usage().map(|u| u.total().get_bytes()).unwrap_or(0))
263 .sum();
264 SearchSpaceUsage {
265 num_docs: searcher.num_docs(),
266 num_segments: segments.len() as u64,
267 total_bytes,
268 }
269 }
270}
271
/// Boxed, `Send` future returned by [`SearchReader::search`].
pub type SearchReadFuture<'a> =
    Pin<Box<dyn Future<Output = Result<SearchPage, SearchError>> + Send + 'a>>;

/// Read-side search abstraction, implemented by [`SearchIndex`].
///
/// It returns a boxed future rather than using `async fn`, presumably so the
/// trait can be used as a trait object (confirm with callers).
pub trait SearchReader: Send + Sync + 'static {
    /// Searches for `query` restricted by `filters`, starting at `cursor`.
    /// For `SearchIndex`, at most `limit` hits are returned.
    fn search<'a>(
        &'a self,
        query: &'a str,
        filters: SearchFilters,
        cursor: SearchCursor,
        limit: u32,
    ) -> SearchReadFuture<'a>;
}
284
285impl SearchReader for SearchIndex {
286 fn search<'a>(
287 &'a self,
288 query: &'a str,
289 filters: SearchFilters,
290 cursor: SearchCursor,
291 limit: u32,
292 ) -> SearchReadFuture<'a> {
293 Box::pin(SearchIndex::search(self, query, filters, cursor, limit))
294 }
295}
296
297impl Inner {
298 fn search_blocking(
299 &self,
300 q: &str,
301 filters: &SearchFilters,
302 offset: usize,
303 limit: usize,
304 ) -> Result<SearchPage, SearchError> {
305 let parsed = self.parser.parse_query(q)?;
306 let final_query: Box<dyn Query> = if filters.is_empty() {
307 parsed
308 } else {
309 let mut clauses: Vec<(Occur, Box<dyn Query>)> = Vec::with_capacity(5);
310 clauses.push((Occur::Must, parsed));
311 if let Some(n) = &filters.nsid {
312 let term = Term::from_field_text(self.fields.nsid, n.as_ref());
313 clauses.push((
314 Occur::Must,
315 Box::new(TermQuery::new(term, IndexRecordOption::Basic)),
316 ));
317 }
318 if let Some(a) = &filters.author {
319 let term = Term::from_field_text(self.fields.author, a.as_ref());
320 clauses.push((
321 Occur::Must,
322 Box::new(TermQuery::new(term, IndexRecordOption::Basic)),
323 ));
324 }
325 if let Some(r) = &filters.repo {
326 let term = Term::from_field_text(self.fields.repo, r.as_ref());
327 clauses.push((
328 Occur::Must,
329 Box::new(TermQuery::new(term, IndexRecordOption::Basic)),
330 ));
331 }
332 if filters.since.is_some() || filters.until.is_some() {
333 let lower = filters.since.map_or(Bound::Unbounded, |s| {
334 Bound::Included(Term::from_field_i64(self.fields.created_at, s))
335 });
336 let upper = filters.until.map_or(Bound::Unbounded, |u| {
337 Bound::Excluded(Term::from_field_i64(self.fields.created_at, u))
338 });
339 clauses.push((Occur::Must, Box::new(RangeQuery::new(lower, upper))));
340 }
341 Box::new(BooleanQuery::new(clauses))
342 };
343 let collector = TopDocs::with_limit(limit + 1)
344 .and_offset(offset)
345 .order_by_score();
346 let searcher = self.reader.searcher();
347 let raw_hits: Vec<(f32, tantivy::DocAddress)> =
348 searcher.search(&final_query, &collector)?;
349 let has_more = raw_hits.len() > limit;
350 let page_slice = &raw_hits[..raw_hits.len().min(limit)];
351 let hits = page_slice
352 .iter()
353 .map(|(score, addr)| {
354 let doc: TantivyDocument = searcher.doc(*addr)?;
355 let uri_str = stored_text(&doc, self.fields.uri, "uri")?;
356 let nsid_str = stored_text(&doc, self.fields.nsid, "nsid")?;
357 let uri = AtUri::<DefaultStr>::new_owned(uri_str.as_str())
358 .map_err(|e| SearchError::InvalidUri(format!("{e}: {uri_str}")))?;
359 let nsid = Nsid::<DefaultStr>::new_owned(nsid_str.as_str())
360 .map_err(|e| SearchError::InvalidNsid(format!("{e}: {nsid_str}")))?;
361 Ok::<_, SearchError>(SearchHit {
362 uri,
363 nsid,
364 score: *score,
365 })
366 })
367 .collect::<Result<Vec<_>, _>>()?;
368 Ok(SearchPage {
369 hits,
370 next: next_cursor(offset, limit, has_more),
371 })
372 }
373}
374
375fn next_cursor(offset: usize, limit: usize, has_more: bool) -> Option<SearchOffset> {
376 has_more
377 .then(|| u32::try_from(offset.saturating_add(limit)).ok())
378 .flatten()
379 .map(SearchOffset::new)
380}
381
382async fn writer_loop(
383 mut writer: IndexWriter,
384 fields: Fields,
385 reader: IndexReader,
386 mut rx: mpsc::Receiver<WriteOp>,
387 clock: Arc<dyn Clock>,
388) {
389 let mut pending: usize = 0;
390 let mut deadline = clock.now_instant() + BATCH_INTERVAL;
391 loop {
392 let outcome = tokio::select! {
393 biased;
394 msg = rx.recv() => RecvOutcome::Message(msg),
395 _ = clock.sleep_until(deadline) => RecvOutcome::Idle,
396 };
397 match outcome {
398 RecvOutcome::Message(Some(WriteOp::Upsert(doc))) => {
399 apply_upsert(&mut writer, fields, doc);
400 pending += 1;
401 }
402 RecvOutcome::Message(Some(WriteOp::Remove(uri))) => {
403 apply_remove(&mut writer, fields, &uri);
404 pending += 1;
405 }
406 RecvOutcome::Message(Some(WriteOp::Flush(done))) => {
407 if pending > 0 {
408 commit_and_reload(&mut writer, &reader);
409 pending = 0;
410 }
411 let _ = done.send(());
412 deadline = clock.now_instant() + BATCH_INTERVAL;
413 continue;
414 }
415 RecvOutcome::Message(None) => break,
416 RecvOutcome::Idle => {
417 if pending > 0 {
418 commit_and_reload(&mut writer, &reader);
419 pending = 0;
420 }
421 deadline = clock.now_instant() + BATCH_INTERVAL;
422 continue;
423 }
424 }
425 if pending >= BATCH_SIZE {
426 commit_and_reload(&mut writer, &reader);
427 pending = 0;
428 }
429 }
430 if pending > 0 {
431 commit_and_reload(&mut writer, &reader);
432 }
433}
434
/// Result of one `select!` round in [`writer_loop`].
enum RecvOutcome {
    /// The queue yielded an op, or `None` once every sender has been dropped.
    Message(Option<WriteOp>),
    /// The idle deadline elapsed while no op was ready.
    Idle,
}
439
440fn apply_upsert(writer: &mut IndexWriter, fields: Fields, doc: SearchDoc) {
441 let uri_term = Term::from_field_text(fields.uri, doc.uri.as_ref());
442 writer.delete_term(uri_term);
443 let mut td = TantivyDocument::default();
444 td.add_text(fields.uri, doc.uri.as_ref());
445 td.add_text(fields.nsid, doc.nsid.as_ref());
446 td.add_text(fields.title, &doc.title);
447 td.add_text(fields.body, &doc.body);
448 if let Some(author) = &doc.author {
449 td.add_text(fields.author, author.as_ref());
450 }
451 if let Some(ts) = doc.created_at {
452 td.add_i64(fields.created_at, ts);
453 }
454 if let Some(repo) = &doc.repo {
455 td.add_text(fields.repo, repo.as_ref());
456 }
457 if let Err(e) = writer.add_document(td) {
458 warn!(?e, "search add_document failed");
459 }
460}
461
462fn apply_remove(writer: &mut IndexWriter, fields: Fields, uri: &AtUri<DefaultStr>) {
463 let term = Term::from_field_text(fields.uri, uri.as_ref());
464 writer.delete_term(term);
465}
466
467fn commit_and_reload(writer: &mut IndexWriter, reader: &IndexReader) {
468 match writer.commit() {
469 Ok(_) => {
470 if let Err(e) = reader.reload() {
471 warn!(?e, "search reader reload failed");
472 }
473 }
474 Err(e) => warn!(?e, "search index commit failed"),
475 }
476}
477
478fn stored_text(
479 doc: &TantivyDocument,
480 field: Field,
481 name: &'static str,
482) -> Result<String, SearchError> {
483 doc.get_first(field)
484 .and_then(|v| v.as_str().map(|s| s.to_owned()))
485 .ok_or(SearchError::MissingField(name))
486}
487
488impl SearchSink for SearchIndex {
489 async fn upsert(&self, doc: SearchDoc) {
490 if self.inner.tx.send(WriteOp::Upsert(doc)).await.is_err() {
491 warn!("search writer channel closed; upsert dropped");
492 }
493 }
494
495 async fn remove(&self, uri: &AtUri<DefaultStr>) {
496 if self
497 .inner
498 .tx
499 .send(WriteOp::Remove(uri.clone()))
500 .await
501 .is_err()
502 {
503 warn!("search writer channel closed; remove dropped");
504 }
505 }
506}
507
// Tests for SearchIndex. Each async test builds a fresh in-RAM index with a
// real SystemClock and writes through the SearchSink API. Tests other than the
// idle-timer one call `flush()` before querying so writes are committed.
#[cfg(test)]
mod tests {
    use super::*;
    use bobbin_runtime::SystemClock;
    use bobbin_types::search::SearchDoc;
    use jacquard_common::types::nsid::Nsid as NsidType;

    // Parses a literal AT URI; panics on invalid test input.
    fn at(s: &str) -> AtUri<DefaultStr> {
        AtUri::new_owned(s).unwrap()
    }

    // Builds a static NSID; panics on invalid test input.
    fn nsid(s: &'static str) -> NsidType<DefaultStr> {
        NsidType::new_static(s).unwrap()
    }

    // Minimal SearchDoc with no author, timestamp or repo.
    fn doc(
        uri: AtUri<DefaultStr>,
        nsid: NsidType<DefaultStr>,
        title: &str,
        body: &str,
    ) -> SearchDoc {
        SearchDoc {
            uri,
            nsid,
            title: title.to_owned(),
            body: body.to_owned(),
            author: None,
            created_at: None,
            repo: None,
        }
    }

    fn build() -> SearchIndex {
        SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap()
    }

    // Only the document containing the query term is returned, with no next page.
    #[tokio::test]
    async fn upsert_then_query_returns_matching_doc() {
        let idx = build();
        idx.upsert(doc(
            at("at://did:plc:nel/sh.tangled.repo.issue/r1"),
            nsid("sh.tangled.repo.issue"),
            "barnacle pagination",
            "scroll resets",
        ))
        .await;
        idx.upsert(doc(
            at("at://did:plc:teq/sh.tangled.repo.issue/r2"),
            nsid("sh.tangled.repo.issue"),
            "kelp grew sideways",
            "kelp",
        ))
        .await;
        idx.flush().await;

        let page = idx
            .search(
                "barnacle",
                SearchFilters::default(),
                SearchCursor::Start,
                10,
            )
            .await
            .unwrap();
        assert_eq!(page.hits.len(), 1);
        assert_eq!(
            page.hits[0].uri.as_ref(),
            "at://did:plc:nel/sh.tangled.repo.issue/r1"
        );
        assert!(page.next.is_none());
    }

    // Both documents match the text; the NSID filter keeps only one.
    #[tokio::test]
    async fn query_filters_by_nsid() {
        let idx = build();
        idx.upsert(doc(
            at("at://did:plc:nel/sh.tangled.repo.issue/r1"),
            nsid("sh.tangled.repo.issue"),
            "anemone tide",
            "",
        ))
        .await;
        idx.upsert(doc(
            at("at://did:plc:teq/sh.tangled.string/k1"),
            nsid("sh.tangled.string"),
            "anemone.md",
            "anemone again",
        ))
        .await;
        idx.flush().await;

        let only_strings = idx
            .search(
                "anemone",
                SearchFilters {
                    nsid: Some(nsid("sh.tangled.string")),
                    ..SearchFilters::default()
                },
                SearchCursor::Start,
                10,
            )
            .await
            .unwrap();
        assert_eq!(only_strings.hits.len(), 1);
        assert_eq!(only_strings.hits[0].nsid.as_ref(), "sh.tangled.string");
    }

    // Two upserts for the same URI within one batch leave only the second version.
    #[tokio::test]
    async fn upsert_replaces_prior_document_for_same_uri() {
        let idx = build();
        let uri = at("at://did:plc:nel/sh.tangled.repo.issue/r1");
        idx.upsert(doc(
            uri.clone(),
            nsid("sh.tangled.repo.issue"),
            "abalone",
            "",
        ))
        .await;
        idx.upsert(doc(uri, nsid("sh.tangled.repo.issue"), "limpet", ""))
            .await;
        idx.flush().await;

        let abalone_hits = idx
            .search("abalone", SearchFilters::default(), SearchCursor::Start, 10)
            .await
            .unwrap();
        assert!(abalone_hits.hits.is_empty(), "old title must be evicted");

        let limpet_hits = idx
            .search("limpet", SearchFilters::default(), SearchCursor::Start, 10)
            .await
            .unwrap();
        assert_eq!(limpet_hits.hits.len(), 1);
    }

    // A remove queued after an upsert (same batch) wins.
    #[tokio::test]
    async fn remove_drops_document_from_index() {
        let idx = build();
        let uri = at("at://did:plc:nel/sh.tangled.repo.issue/r1");
        idx.upsert(doc(
            uri.clone(),
            nsid("sh.tangled.repo.issue"),
            "whelk",
            "shell",
        ))
        .await;
        idx.remove(&uri).await;
        idx.flush().await;
        let hits = idx
            .search("whelk", SearchFilters::default(), SearchCursor::Start, 10)
            .await
            .unwrap();
        assert!(hits.hits.is_empty());
    }

    // Five matching docs with limit 2: pages of 2, 2 and 1, and the last has no cursor.
    #[tokio::test]
    async fn pagination_advances_cursor() {
        let idx = build();
        let names = ["nel", "olaren", "teq", "lyna", "bailey"];
        for (i, owner) in names.iter().enumerate() {
            idx.upsert(doc(
                at(&format!("at://did:plc:{owner}/sh.tangled.repo.issue/r{i}")),
                nsid("sh.tangled.repo.issue"),
                "anemone",
                "tides",
            ))
            .await;
        }
        idx.flush().await;

        let page1 = idx
            .search("anemone", SearchFilters::default(), SearchCursor::Start, 2)
            .await
            .unwrap();
        assert_eq!(page1.hits.len(), 2);
        let next = page1.next.expect("more pages");
        let page2 = idx
            .search(
                "anemone",
                SearchFilters::default(),
                SearchCursor::At(next),
                2,
            )
            .await
            .unwrap();
        assert_eq!(page2.hits.len(), 2);
        let next2 = page2.next.expect("more pages");
        let page3 = idx
            .search(
                "anemone",
                SearchFilters::default(),
                SearchCursor::At(next2),
                2,
            )
            .await
            .unwrap();
        assert_eq!(page3.hits.len(), 1);
        assert!(page3.next.is_none());
    }

    // A whitespace-only query returns an empty page even though documents exist.
    #[tokio::test]
    async fn empty_query_short_circuits_to_empty_page() {
        let idx = build();
        idx.upsert(doc(
            at("at://did:plc:nel/sh.tangled.repo.issue/r1"),
            nsid("sh.tangled.repo.issue"),
            "abalone",
            "",
        ))
        .await;
        idx.flush().await;
        let page = idx
            .search(" ", SearchFilters::default(), SearchCursor::Start, 10)
            .await
            .unwrap();
        assert!(page.hits.is_empty());
        assert!(page.next.is_none());
    }

    // Flush must still acknowledge when nothing is pending.
    #[tokio::test]
    async fn flush_without_pending_ops_completes() {
        let idx = build();
        idx.flush().await;
    }

    // No flush here: relies on the writer's idle timer to commit. It uses real
    // time, so it sleeps for three batch intervals.
    #[tokio::test]
    async fn writer_auto_commits_on_idle_interval() {
        let idx = build();
        idx.upsert(doc(
            at("at://did:plc:nel/sh.tangled.repo.issue/r1"),
            nsid("sh.tangled.repo.issue"),
            "auto",
            "",
        ))
        .await;
        tokio::time::sleep(BATCH_INTERVAL * 3).await;
        let page = idx
            .search("auto", SearchFilters::default(), SearchCursor::Start, 10)
            .await
            .unwrap();
        assert_eq!(page.hits.len(), 1);
    }

    #[test]
    fn next_cursor_advances_by_limit_when_more_results() {
        assert_eq!(next_cursor(0, 10, true), Some(SearchOffset::new(10)));
        assert_eq!(next_cursor(10, 25, true), Some(SearchOffset::new(35)));
    }

    #[test]
    fn next_cursor_is_none_when_no_more_results() {
        assert_eq!(next_cursor(0, 10, false), None);
        assert_eq!(next_cursor(99, 50, false), None);
    }

    // Offsets past u32::MAX yield no cursor; exactly u32::MAX is still representable.
    #[test]
    fn next_cursor_returns_none_on_u32_overflow_rather_than_wrapping() {
        let max = u32::MAX as usize;
        assert_eq!(next_cursor(max, 1, true), None);
        assert_eq!(next_cursor(max - 5, 10, true), None);
        assert_eq!(
            next_cursor(max - 10, 10, true),
            Some(SearchOffset::new(u32::MAX))
        );
    }

    #[test]
    fn cursor_token_round_trips() {
        let off = SearchOffset::new(0xc0fe);
        let token = off.encode_token();
        assert_eq!(token.len(), OFFSET_TOKEN_LEN);
        assert_eq!(SearchOffset::decode_token(&token).unwrap(), off);
    }

    // Covers empty, too long, non-hex punctuation, non-hex letters, and too short.
    #[test]
    fn cursor_decode_rejects_malformed() {
        let bad = ["", "deadbeef0", "no-hex!!", "zzzzzzzz", "1234567"];
        for s in bad {
            assert!(matches!(
                SearchOffset::decode_token(s),
                Err(SearchOffsetError::Malformed),
            ));
        }
    }
}