Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1use std::future::Future; 2use std::ops::Bound; 3use std::pin::Pin; 4use std::sync::Arc; 5use std::time::Duration; 6 7use bobbin_runtime::Clock; 8use bobbin_types::search::{SearchDoc, SearchSink}; 9use jacquard_common::DefaultStr; 10use jacquard_common::types::nsid::Nsid; 11use jacquard_common::types::string::{AtUri, Did}; 12use tantivy::collector::TopDocs; 13use tantivy::query::{ 14 BooleanQuery, Occur, Query, QueryParser, QueryParserError, RangeQuery, TermQuery, 15}; 16use tantivy::schema::{ 17 FAST, Field, INDEXED, IndexRecordOption, STORED, STRING, Schema, TEXT, TextFieldIndexing, 18 TextOptions, Value, 19}; 20use tantivy::{Index, IndexReader, IndexWriter, ReloadPolicy, TantivyDocument, TantivyError, Term}; 21use thiserror::Error; 22use tokio::sync::{mpsc, oneshot}; 23use tracing::warn; 24 25pub const DEFAULT_WRITER_HEAP_BYTES: usize = 50_000_000; 26const BATCH_SIZE: usize = 200; 27const BATCH_INTERVAL: Duration = Duration::from_millis(250); 28const TITLE_BOOST: f32 = 4.0; 29const OFFSET_TOKEN_LEN: usize = 8; 30const WRITE_QUEUE_CAPACITY: usize = 4096; 31 32#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)] 33pub struct SearchOffset(u32); 34 35#[derive(Clone, Copy, Debug, Eq, PartialEq, Error)] 36pub enum SearchOffsetError { 37 #[error("offset token must be {OFFSET_TOKEN_LEN} hex characters")] 38 Malformed, 39} 40 41impl SearchOffset { 42 pub const fn new(value: u32) -> Self { 43 Self(value) 44 } 45 46 pub const fn raw(self) -> u32 { 47 self.0 48 } 49 50 pub fn encode_token(self) -> String { 51 format!("{:0width$x}", self.0, width = OFFSET_TOKEN_LEN) 52 } 53 54 pub fn decode_token(token: &str) -> Result<Self, SearchOffsetError> { 55 if token.len() != OFFSET_TOKEN_LEN || !token.bytes().all(|b| b.is_ascii_hexdigit()) { 56 return Err(SearchOffsetError::Malformed); 57 } 58 u32::from_str_radix(token, 16) 59 .map(Self) 60 .map_err(|_| SearchOffsetError::Malformed) 61 } 62} 63 64#[derive(Clone, Copy, Debug, Eq, PartialEq)] 65pub enum SearchCursor { 66 Start, 67 At(SearchOffset), 68} 69 70impl SearchCursor { 71 pub fn from_token(raw: Option<&str>) -> Result<Self, SearchOffsetError> { 72 raw.map_or(Ok(Self::Start), |t| { 73 SearchOffset::decode_token(t).map(Self::At) 74 }) 75 } 76 77 fn offset(self) -> u32 { 78 match self { 79 Self::Start => 0, 80 Self::At(o) => o.raw(), 81 } 82 } 83} 84 85#[derive(Debug)] 86pub struct SearchHit { 87 pub uri: AtUri<DefaultStr>, 88 pub nsid: Nsid<DefaultStr>, 89 pub score: f32, 90} 91 92#[derive(Debug)] 93pub struct SearchPage { 94 pub hits: Vec<SearchHit>, 95 pub next: Option<SearchOffset>, 96} 97 98#[derive(Debug, Error)] 99pub enum SearchError { 100 #[error("tantivy: {0}")] 101 Tantivy(#[from] TantivyError), 102 #[error("query parse: {0}")] 103 Query(#[from] QueryParserError), 104 #[error("indexed uri is invalid: {0}")] 105 InvalidUri(String), 106 #[error("indexed nsid is invalid: {0}")] 107 InvalidNsid(String), 108 #[error("indexed document missing field: {0}")] 109 MissingField(&'static str), 110 #[error("search blocking task cancelled: {0}")] 111 Cancelled(String), 112} 113 114#[derive(Clone, Copy)] 115struct Fields { 116 uri: Field, 117 nsid: Field, 118 title: Field, 119 body: Field, 120 author: Field, 121 created_at: Field, 122 repo: Field, 123} 124 125#[derive(Clone, Debug, Default)] 126pub struct SearchFilters { 127 pub nsid: Option<Nsid<DefaultStr>>, 128 pub author: Option<Did<DefaultStr>>, 129 pub repo: Option<Did<DefaultStr>>, 130 pub since: Option<i64>, 131 pub until: Option<i64>, 132} 133 134impl SearchFilters { 135 pub fn is_empty(&self) -> bool { 136 self.nsid.is_none() 137 && self.author.is_none() 138 && self.repo.is_none() 139 && self.since.is_none() 140 && self.until.is_none() 141 } 142} 143 144enum WriteOp { 145 Upsert(SearchDoc), 146 Remove(AtUri<DefaultStr>), 147 Flush(oneshot::Sender<()>), 148} 149 150pub struct SearchIndex { 151 inner: Arc<Inner>, 152} 153 154struct Inner { 155 fields: Fields, 156 reader: IndexReader, 157 parser: QueryParser, 158 tx: mpsc::Sender<WriteOp>, 159} 160 161impl SearchIndex { 162 pub fn new(heap_bytes: usize, clock: Arc<dyn Clock>) -> Result<Self, SearchError> { 163 let mut sb = Schema::builder(); 164 let uri = sb.add_text_field("uri", STRING | STORED); 165 let nsid = sb.add_text_field("nsid", STRING | STORED); 166 let title_indexing = TextFieldIndexing::default() 167 .set_tokenizer("default") 168 .set_index_option(IndexRecordOption::WithFreqsAndPositions); 169 let title_opts = TextOptions::default() 170 .set_indexing_options(title_indexing) 171 .set_stored(); 172 let title = sb.add_text_field("title", title_opts); 173 let body = sb.add_text_field("body", TEXT); 174 let author = sb.add_text_field("author", STRING | STORED); 175 let created_at = sb.add_i64_field("created_at", INDEXED | FAST | STORED); 176 let repo = sb.add_text_field("repo", STRING | STORED); 177 let schema = sb.build(); 178 let index = Index::create_in_ram(schema); 179 let writer: IndexWriter = index.writer(heap_bytes)?; 180 let reader = index 181 .reader_builder() 182 .reload_policy(ReloadPolicy::Manual) 183 .try_into()?; 184 let mut parser = QueryParser::for_index(&index, vec![title, body]); 185 parser.set_field_boost(title, TITLE_BOOST); 186 let fields = Fields { 187 uri, 188 nsid, 189 title, 190 body, 191 author, 192 created_at, 193 repo, 194 }; 195 let (tx, rx) = mpsc::channel(WRITE_QUEUE_CAPACITY); 196 let writer_reader = reader.clone(); 197 tokio::spawn(writer_loop(writer, fields, writer_reader, rx, clock)); 198 Ok(Self { 199 inner: Arc::new(Inner { 200 fields, 201 reader, 202 parser, 203 tx, 204 }), 205 }) 206 } 207 208 pub async fn flush(&self) { 209 let (done_tx, done_rx) = oneshot::channel(); 210 if self.inner.tx.send(WriteOp::Flush(done_tx)).await.is_err() { 211 warn!("search writer channel closed before flush"); 212 return; 213 } 214 if done_rx.await.is_err() { 215 warn!("search writer dropped before completing flush"); 216 } 217 } 218 219 pub async fn search( 220 &self, 221 q: &str, 222 filters: SearchFilters, 223 cursor: SearchCursor, 224 limit: u32, 225 ) -> Result<SearchPage, SearchError> { 226 let trimmed = q.trim(); 227 if trimmed.is_empty() { 228 return Ok(SearchPage { 229 hits: Vec::new(), 230 next: None, 231 }); 232 } 233 let inner = self.inner.clone(); 234 let q_owned = trimmed.to_owned(); 235 let limit_usize = limit as usize; 236 let offset = cursor.offset() as usize; 237 match tokio::task::spawn_blocking(move || { 238 inner.search_blocking(&q_owned, &filters, offset, limit_usize) 239 }) 240 .await 241 { 242 Ok(result) => result, 243 Err(e) if e.is_panic() => std::panic::resume_unwind(e.into_panic()), 244 Err(e) => Err(SearchError::Cancelled(e.to_string())), 245 } 246 } 247} 248 249#[derive(Clone, Copy, Debug, Default)] 250pub struct SearchSpaceUsage { 251 pub num_docs: u64, 252 pub num_segments: u64, 253 pub total_bytes: u64, 254} 255 256impl SearchIndex { 257 pub fn space_usage(&self) -> SearchSpaceUsage { 258 let searcher = self.inner.reader.searcher(); 259 let segments = searcher.segment_readers(); 260 let total_bytes = segments 261 .iter() 262 .map(|s| s.space_usage().map(|u| u.total().get_bytes()).unwrap_or(0)) 263 .sum(); 264 SearchSpaceUsage { 265 num_docs: searcher.num_docs(), 266 num_segments: segments.len() as u64, 267 total_bytes, 268 } 269 } 270} 271 272pub type SearchReadFuture<'a> = 273 Pin<Box<dyn Future<Output = Result<SearchPage, SearchError>> + Send + 'a>>; 274 275pub trait SearchReader: Send + Sync + 'static { 276 fn search<'a>( 277 &'a self, 278 query: &'a str, 279 filters: SearchFilters, 280 cursor: SearchCursor, 281 limit: u32, 282 ) -> SearchReadFuture<'a>; 283} 284 285impl SearchReader for SearchIndex { 286 fn search<'a>( 287 &'a self, 288 query: &'a str, 289 filters: SearchFilters, 290 cursor: SearchCursor, 291 limit: u32, 292 ) -> SearchReadFuture<'a> { 293 Box::pin(SearchIndex::search(self, query, filters, cursor, limit)) 294 } 295} 296 297impl Inner { 298 fn search_blocking( 299 &self, 300 q: &str, 301 filters: &SearchFilters, 302 offset: usize, 303 limit: usize, 304 ) -> Result<SearchPage, SearchError> { 305 let parsed = self.parser.parse_query(q)?; 306 let final_query: Box<dyn Query> = if filters.is_empty() { 307 parsed 308 } else { 309 let mut clauses: Vec<(Occur, Box<dyn Query>)> = Vec::with_capacity(5); 310 clauses.push((Occur::Must, parsed)); 311 if let Some(n) = &filters.nsid { 312 let term = Term::from_field_text(self.fields.nsid, n.as_ref()); 313 clauses.push(( 314 Occur::Must, 315 Box::new(TermQuery::new(term, IndexRecordOption::Basic)), 316 )); 317 } 318 if let Some(a) = &filters.author { 319 let term = Term::from_field_text(self.fields.author, a.as_ref()); 320 clauses.push(( 321 Occur::Must, 322 Box::new(TermQuery::new(term, IndexRecordOption::Basic)), 323 )); 324 } 325 if let Some(r) = &filters.repo { 326 let term = Term::from_field_text(self.fields.repo, r.as_ref()); 327 clauses.push(( 328 Occur::Must, 329 Box::new(TermQuery::new(term, IndexRecordOption::Basic)), 330 )); 331 } 332 if filters.since.is_some() || filters.until.is_some() { 333 let lower = filters.since.map_or(Bound::Unbounded, |s| { 334 Bound::Included(Term::from_field_i64(self.fields.created_at, s)) 335 }); 336 let upper = filters.until.map_or(Bound::Unbounded, |u| { 337 Bound::Excluded(Term::from_field_i64(self.fields.created_at, u)) 338 }); 339 clauses.push((Occur::Must, Box::new(RangeQuery::new(lower, upper)))); 340 } 341 Box::new(BooleanQuery::new(clauses)) 342 }; 343 let collector = TopDocs::with_limit(limit + 1) 344 .and_offset(offset) 345 .order_by_score(); 346 let searcher = self.reader.searcher(); 347 let raw_hits: Vec<(f32, tantivy::DocAddress)> = 348 searcher.search(&final_query, &collector)?; 349 let has_more = raw_hits.len() > limit; 350 let page_slice = &raw_hits[..raw_hits.len().min(limit)]; 351 let hits = page_slice 352 .iter() 353 .map(|(score, addr)| { 354 let doc: TantivyDocument = searcher.doc(*addr)?; 355 let uri_str = stored_text(&doc, self.fields.uri, "uri")?; 356 let nsid_str = stored_text(&doc, self.fields.nsid, "nsid")?; 357 let uri = AtUri::<DefaultStr>::new_owned(uri_str.as_str()) 358 .map_err(|e| SearchError::InvalidUri(format!("{e}: {uri_str}")))?; 359 let nsid = Nsid::<DefaultStr>::new_owned(nsid_str.as_str()) 360 .map_err(|e| SearchError::InvalidNsid(format!("{e}: {nsid_str}")))?; 361 Ok::<_, SearchError>(SearchHit { 362 uri, 363 nsid, 364 score: *score, 365 }) 366 }) 367 .collect::<Result<Vec<_>, _>>()?; 368 Ok(SearchPage { 369 hits, 370 next: next_cursor(offset, limit, has_more), 371 }) 372 } 373} 374 375fn next_cursor(offset: usize, limit: usize, has_more: bool) -> Option<SearchOffset> { 376 has_more 377 .then(|| u32::try_from(offset.saturating_add(limit)).ok()) 378 .flatten() 379 .map(SearchOffset::new) 380} 381 382async fn writer_loop( 383 mut writer: IndexWriter, 384 fields: Fields, 385 reader: IndexReader, 386 mut rx: mpsc::Receiver<WriteOp>, 387 clock: Arc<dyn Clock>, 388) { 389 let mut pending: usize = 0; 390 let mut deadline = clock.now_instant() + BATCH_INTERVAL; 391 loop { 392 let outcome = tokio::select! { 393 biased; 394 msg = rx.recv() => RecvOutcome::Message(msg), 395 _ = clock.sleep_until(deadline) => RecvOutcome::Idle, 396 }; 397 match outcome { 398 RecvOutcome::Message(Some(WriteOp::Upsert(doc))) => { 399 apply_upsert(&mut writer, fields, doc); 400 pending += 1; 401 } 402 RecvOutcome::Message(Some(WriteOp::Remove(uri))) => { 403 apply_remove(&mut writer, fields, &uri); 404 pending += 1; 405 } 406 RecvOutcome::Message(Some(WriteOp::Flush(done))) => { 407 if pending > 0 { 408 commit_and_reload(&mut writer, &reader); 409 pending = 0; 410 } 411 let _ = done.send(()); 412 deadline = clock.now_instant() + BATCH_INTERVAL; 413 continue; 414 } 415 RecvOutcome::Message(None) => break, 416 RecvOutcome::Idle => { 417 if pending > 0 { 418 commit_and_reload(&mut writer, &reader); 419 pending = 0; 420 } 421 deadline = clock.now_instant() + BATCH_INTERVAL; 422 continue; 423 } 424 } 425 if pending >= BATCH_SIZE { 426 commit_and_reload(&mut writer, &reader); 427 pending = 0; 428 } 429 } 430 if pending > 0 { 431 commit_and_reload(&mut writer, &reader); 432 } 433} 434 435enum RecvOutcome { 436 Message(Option<WriteOp>), 437 Idle, 438} 439 440fn apply_upsert(writer: &mut IndexWriter, fields: Fields, doc: SearchDoc) { 441 let uri_term = Term::from_field_text(fields.uri, doc.uri.as_ref()); 442 writer.delete_term(uri_term); 443 let mut td = TantivyDocument::default(); 444 td.add_text(fields.uri, doc.uri.as_ref()); 445 td.add_text(fields.nsid, doc.nsid.as_ref()); 446 td.add_text(fields.title, &doc.title); 447 td.add_text(fields.body, &doc.body); 448 if let Some(author) = &doc.author { 449 td.add_text(fields.author, author.as_ref()); 450 } 451 if let Some(ts) = doc.created_at { 452 td.add_i64(fields.created_at, ts); 453 } 454 if let Some(repo) = &doc.repo { 455 td.add_text(fields.repo, repo.as_ref()); 456 } 457 if let Err(e) = writer.add_document(td) { 458 warn!(?e, "search add_document failed"); 459 } 460} 461 462fn apply_remove(writer: &mut IndexWriter, fields: Fields, uri: &AtUri<DefaultStr>) { 463 let term = Term::from_field_text(fields.uri, uri.as_ref()); 464 writer.delete_term(term); 465} 466 467fn commit_and_reload(writer: &mut IndexWriter, reader: &IndexReader) { 468 match writer.commit() { 469 Ok(_) => { 470 if let Err(e) = reader.reload() { 471 warn!(?e, "search reader reload failed"); 472 } 473 } 474 Err(e) => warn!(?e, "search index commit failed"), 475 } 476} 477 478fn stored_text( 479 doc: &TantivyDocument, 480 field: Field, 481 name: &'static str, 482) -> Result<String, SearchError> { 483 doc.get_first(field) 484 .and_then(|v| v.as_str().map(|s| s.to_owned())) 485 .ok_or(SearchError::MissingField(name)) 486} 487 488impl SearchSink for SearchIndex { 489 async fn upsert(&self, doc: SearchDoc) { 490 if self.inner.tx.send(WriteOp::Upsert(doc)).await.is_err() { 491 warn!("search writer channel closed; upsert dropped"); 492 } 493 } 494 495 async fn remove(&self, uri: &AtUri<DefaultStr>) { 496 if self 497 .inner 498 .tx 499 .send(WriteOp::Remove(uri.clone())) 500 .await 501 .is_err() 502 { 503 warn!("search writer channel closed; remove dropped"); 504 } 505 } 506} 507 508#[cfg(test)] 509mod tests { 510 use super::*; 511 use bobbin_runtime::SystemClock; 512 use bobbin_types::search::SearchDoc; 513 use jacquard_common::types::nsid::Nsid as NsidType; 514 515 fn at(s: &str) -> AtUri<DefaultStr> { 516 AtUri::new_owned(s).unwrap() 517 } 518 519 fn nsid(s: &'static str) -> NsidType<DefaultStr> { 520 NsidType::new_static(s).unwrap() 521 } 522 523 fn doc( 524 uri: AtUri<DefaultStr>, 525 nsid: NsidType<DefaultStr>, 526 title: &str, 527 body: &str, 528 ) -> SearchDoc { 529 SearchDoc { 530 uri, 531 nsid, 532 title: title.to_owned(), 533 body: body.to_owned(), 534 author: None, 535 created_at: None, 536 repo: None, 537 } 538 } 539 540 fn build() -> SearchIndex { 541 SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap() 542 } 543 544 #[tokio::test] 545 async fn upsert_then_query_returns_matching_doc() { 546 let idx = build(); 547 idx.upsert(doc( 548 at("at://did:plc:nel/sh.tangled.repo.issue/r1"), 549 nsid("sh.tangled.repo.issue"), 550 "barnacle pagination", 551 "scroll resets", 552 )) 553 .await; 554 idx.upsert(doc( 555 at("at://did:plc:teq/sh.tangled.repo.issue/r2"), 556 nsid("sh.tangled.repo.issue"), 557 "kelp grew sideways", 558 "kelp", 559 )) 560 .await; 561 idx.flush().await; 562 563 let page = idx 564 .search( 565 "barnacle", 566 SearchFilters::default(), 567 SearchCursor::Start, 568 10, 569 ) 570 .await 571 .unwrap(); 572 assert_eq!(page.hits.len(), 1); 573 assert_eq!( 574 page.hits[0].uri.as_ref(), 575 "at://did:plc:nel/sh.tangled.repo.issue/r1" 576 ); 577 assert!(page.next.is_none()); 578 } 579 580 #[tokio::test] 581 async fn query_filters_by_nsid() { 582 let idx = build(); 583 idx.upsert(doc( 584 at("at://did:plc:nel/sh.tangled.repo.issue/r1"), 585 nsid("sh.tangled.repo.issue"), 586 "anemone tide", 587 "", 588 )) 589 .await; 590 idx.upsert(doc( 591 at("at://did:plc:teq/sh.tangled.string/k1"), 592 nsid("sh.tangled.string"), 593 "anemone.md", 594 "anemone again", 595 )) 596 .await; 597 idx.flush().await; 598 599 let only_strings = idx 600 .search( 601 "anemone", 602 SearchFilters { 603 nsid: Some(nsid("sh.tangled.string")), 604 ..SearchFilters::default() 605 }, 606 SearchCursor::Start, 607 10, 608 ) 609 .await 610 .unwrap(); 611 assert_eq!(only_strings.hits.len(), 1); 612 assert_eq!(only_strings.hits[0].nsid.as_ref(), "sh.tangled.string"); 613 } 614 615 #[tokio::test] 616 async fn upsert_replaces_prior_document_for_same_uri() { 617 let idx = build(); 618 let uri = at("at://did:plc:nel/sh.tangled.repo.issue/r1"); 619 idx.upsert(doc( 620 uri.clone(), 621 nsid("sh.tangled.repo.issue"), 622 "abalone", 623 "", 624 )) 625 .await; 626 idx.upsert(doc(uri, nsid("sh.tangled.repo.issue"), "limpet", "")) 627 .await; 628 idx.flush().await; 629 630 let abalone_hits = idx 631 .search("abalone", SearchFilters::default(), SearchCursor::Start, 10) 632 .await 633 .unwrap(); 634 assert!(abalone_hits.hits.is_empty(), "old title must be evicted"); 635 636 let limpet_hits = idx 637 .search("limpet", SearchFilters::default(), SearchCursor::Start, 10) 638 .await 639 .unwrap(); 640 assert_eq!(limpet_hits.hits.len(), 1); 641 } 642 643 #[tokio::test] 644 async fn remove_drops_document_from_index() { 645 let idx = build(); 646 let uri = at("at://did:plc:nel/sh.tangled.repo.issue/r1"); 647 idx.upsert(doc( 648 uri.clone(), 649 nsid("sh.tangled.repo.issue"), 650 "whelk", 651 "shell", 652 )) 653 .await; 654 idx.remove(&uri).await; 655 idx.flush().await; 656 let hits = idx 657 .search("whelk", SearchFilters::default(), SearchCursor::Start, 10) 658 .await 659 .unwrap(); 660 assert!(hits.hits.is_empty()); 661 } 662 663 #[tokio::test] 664 async fn pagination_advances_cursor() { 665 let idx = build(); 666 let names = ["nel", "olaren", "teq", "lyna", "bailey"]; 667 for (i, owner) in names.iter().enumerate() { 668 idx.upsert(doc( 669 at(&format!("at://did:plc:{owner}/sh.tangled.repo.issue/r{i}")), 670 nsid("sh.tangled.repo.issue"), 671 "anemone", 672 "tides", 673 )) 674 .await; 675 } 676 idx.flush().await; 677 678 let page1 = idx 679 .search("anemone", SearchFilters::default(), SearchCursor::Start, 2) 680 .await 681 .unwrap(); 682 assert_eq!(page1.hits.len(), 2); 683 let next = page1.next.expect("more pages"); 684 let page2 = idx 685 .search( 686 "anemone", 687 SearchFilters::default(), 688 SearchCursor::At(next), 689 2, 690 ) 691 .await 692 .unwrap(); 693 assert_eq!(page2.hits.len(), 2); 694 let next2 = page2.next.expect("more pages"); 695 let page3 = idx 696 .search( 697 "anemone", 698 SearchFilters::default(), 699 SearchCursor::At(next2), 700 2, 701 ) 702 .await 703 .unwrap(); 704 assert_eq!(page3.hits.len(), 1); 705 assert!(page3.next.is_none()); 706 } 707 708 #[tokio::test] 709 async fn empty_query_short_circuits_to_empty_page() { 710 let idx = build(); 711 idx.upsert(doc( 712 at("at://did:plc:nel/sh.tangled.repo.issue/r1"), 713 nsid("sh.tangled.repo.issue"), 714 "abalone", 715 "", 716 )) 717 .await; 718 idx.flush().await; 719 let page = idx 720 .search(" ", SearchFilters::default(), SearchCursor::Start, 10) 721 .await 722 .unwrap(); 723 assert!(page.hits.is_empty()); 724 assert!(page.next.is_none()); 725 } 726 727 #[tokio::test] 728 async fn flush_without_pending_ops_completes() { 729 let idx = build(); 730 idx.flush().await; 731 } 732 733 #[tokio::test] 734 async fn writer_auto_commits_on_idle_interval() { 735 let idx = build(); 736 idx.upsert(doc( 737 at("at://did:plc:nel/sh.tangled.repo.issue/r1"), 738 nsid("sh.tangled.repo.issue"), 739 "auto", 740 "", 741 )) 742 .await; 743 tokio::time::sleep(BATCH_INTERVAL * 3).await; 744 let page = idx 745 .search("auto", SearchFilters::default(), SearchCursor::Start, 10) 746 .await 747 .unwrap(); 748 assert_eq!(page.hits.len(), 1); 749 } 750 751 #[test] 752 fn next_cursor_advances_by_limit_when_more_results() { 753 assert_eq!(next_cursor(0, 10, true), Some(SearchOffset::new(10))); 754 assert_eq!(next_cursor(10, 25, true), Some(SearchOffset::new(35))); 755 } 756 757 #[test] 758 fn next_cursor_is_none_when_no_more_results() { 759 assert_eq!(next_cursor(0, 10, false), None); 760 assert_eq!(next_cursor(99, 50, false), None); 761 } 762 763 #[test] 764 fn next_cursor_returns_none_on_u32_overflow_rather_than_wrapping() { 765 let max = u32::MAX as usize; 766 assert_eq!(next_cursor(max, 1, true), None); 767 assert_eq!(next_cursor(max - 5, 10, true), None); 768 assert_eq!( 769 next_cursor(max - 10, 10, true), 770 Some(SearchOffset::new(u32::MAX)) 771 ); 772 } 773 774 #[test] 775 fn cursor_token_round_trips() { 776 let off = SearchOffset::new(0xc0fe); 777 let token = off.encode_token(); 778 assert_eq!(token.len(), OFFSET_TOKEN_LEN); 779 assert_eq!(SearchOffset::decode_token(&token).unwrap(), off); 780 } 781 782 #[test] 783 fn cursor_decode_rejects_malformed() { 784 let bad = ["", "deadbeef0", "no-hex!!", "zzzzzzzz", "1234567"]; 785 for s in bad { 786 assert!(matches!( 787 SearchOffset::decode_token(s), 788 Err(SearchOffsetError::Malformed), 789 )); 790 } 791 } 792}