Monorepo for Tangled tangled.org
11

Configure Feed

Select the types of activity you want to include in your feed.

1use std::collections::{BTreeSet, HashMap}; 2use std::marker::PhantomData; 3use std::num::NonZeroU32; 4use std::ops::{Bound, ControlFlow}; 5use std::sync::atomic::{AtomicU32, Ordering}; 6use std::sync::{Arc, Mutex}; 7 8const FILTER_SCAN_MULTIPLIER: usize = 64; 9const FILTER_SCAN_FLOOR: usize = 512; 10 11struct ScanState { 12 matched: Vec<(BucketKey, AtUri<DefaultStr>)>, 13 last_scanned: Option<BucketKey>, 14 scanned: usize, 15} 16 17impl ScanState { 18 fn with_capacity(cap: usize) -> Self { 19 Self { 20 matched: Vec::with_capacity(cap), 21 last_scanned: None, 22 scanned: 0, 23 } 24 } 25} 26 27use bobbin_runtime::RuntimeHasher; 28use bobbin_types::edges::Edge; 29use bobbin_types::ids::EdgeKey; 30use either::Either; 31use jacquard_common::DefaultStr; 32use jacquard_common::types::string::AtUri; 33use lasso::{Key, Spur, ThreadedRodeo}; 34use scc::HashMap as SccMap; 35use scc::hash_map::Entry; 36use smallvec::SmallVec; 37use thiserror::Error; 38 39#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Ord, PartialOrd)] 40struct SortMicros(u64); 41 42#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Ord, PartialOrd)] 43struct BucketKey { 44 micros: SortMicros, 45 source: SourceId, 46} 47 48impl BucketKey { 49 fn new(micros: u64, source: SourceId) -> Self { 50 Self { 51 micros: SortMicros(micros), 52 source, 53 } 54 } 55 56 fn token(self) -> PageToken { 57 PageToken::new(self.micros.0, self.source.index()) 58 } 59 60 fn from_token(tok: PageToken) -> Self { 61 Self { 62 micros: SortMicros(tok.micros), 63 source: SourceId::from_raw(tok.source), 64 } 65 } 66} 67 68pub mod coverage; 69pub mod state_index; 70pub use coverage::{Coverage, CoverageWatch, HydrantCursor, PromotionSignal}; 71pub use state_index::{ 72 ApplyOutcome, IssueStateKind, PullStatusKind, StateIndex, StateKind, apply_record_state, 73}; 74 75#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] 76pub struct SourceTag; 77#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] 78struct 
AuthorTag; 79#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] 80struct CollectionTag; 81 82#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] 83pub struct Interned<T>(u32, PhantomData<T>); 84 85impl<T: Copy> Interned<T> { 86 fn from_spur(spur: Spur) -> Self { 87 Self(spur.into_usize() as u32, PhantomData) 88 } 89 90 fn from_raw(raw: u32) -> Self { 91 Self(raw, PhantomData) 92 } 93 94 fn index(self) -> u32 { 95 self.0 96 } 97 98 fn to_spur(self) -> Option<Spur> { 99 Spur::try_from_usize(self.0 as usize) 100 } 101} 102 103pub type SourceId = Interned<SourceTag>; 104type AuthorId = Interned<AuthorTag>; 105type CollectionId = Interned<CollectionTag>; 106 107#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] 108pub struct PageToken { 109 micros: u64, 110 source: u32, 111} 112 113impl PageToken { 114 pub fn new(micros: u64, source: u32) -> Self { 115 Self { micros, source } 116 } 117 118 pub fn micros(self) -> u64 { 119 self.micros 120 } 121 122 pub fn source(self) -> u32 { 123 self.source 124 } 125 126 pub fn encode_token(self) -> String { 127 let mut bytes = [0u8; 12]; 128 bytes[..8].copy_from_slice(&self.micros.to_be_bytes()); 129 bytes[8..].copy_from_slice(&self.source.to_be_bytes()); 130 encode_hex(&bytes) 131 } 132 133 pub fn decode_token(token: &str) -> Result<Self, CursorParseError> { 134 let bytes: [u8; 12] = decode_hex_array(token).ok_or(CursorParseError::Malformed)?; 135 let micros = u64::from_be_bytes(bytes[..8].try_into().unwrap()); 136 let source = u32::from_be_bytes(bytes[8..].try_into().unwrap()); 137 Ok(Self { micros, source }) 138 } 139} 140 141fn encode_hex(bytes: &[u8]) -> String { 142 bytes 143 .iter() 144 .fold(String::with_capacity(bytes.len() * 2), |mut acc, b| { 145 acc.push(char::from_digit((b >> 4) as u32, 16).unwrap()); 146 acc.push(char::from_digit((b & 0x0f) as u32, 16).unwrap()); 147 acc 148 }) 149} 150 151fn decode_hex_array<const N: usize>(token: &str) -> Option<[u8; N]> { 152 if 
token.len() != N * 2 { 153 return None; 154 } 155 let parsed: Vec<u8> = token 156 .as_bytes() 157 .chunks_exact(2) 158 .map(|pair| { 159 let hi = (pair[0] as char).to_digit(16)?; 160 let lo = (pair[1] as char).to_digit(16)?; 161 Some(((hi << 4) | lo) as u8) 162 }) 163 .collect::<Option<Vec<u8>>>()?; 164 parsed.try_into().ok() 165} 166 167#[derive(Clone, Copy, Debug, Eq, PartialEq)] 168pub enum PageCursor { 169 Start, 170 After(PageToken), 171} 172 173#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] 174pub enum SortDir { 175 Asc, 176 #[default] 177 Desc, 178} 179 180#[derive(Clone, Copy, Debug, Eq, PartialEq, Error)] 181pub enum CursorParseError { 182 #[error("cursor token must be a valid TID")] 183 Malformed, 184} 185 186impl PageCursor { 187 pub fn from_token(raw: Option<&str>) -> Result<Self, CursorParseError> { 188 raw.map_or(Ok(Self::Start), |t| { 189 PageToken::decode_token(t).map(Self::After) 190 }) 191 } 192} 193 194#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] 195pub struct PageLimit(u32); 196 197#[derive(Clone, Copy, Debug, Eq, PartialEq, Error)] 198pub enum PageLimitError { 199 #[error("page limit {value} below minimum {min}")] 200 TooSmall { value: u32, min: u32 }, 201 #[error("page limit {value} above maximum {max}")] 202 TooLarge { value: u32, max: u32 }, 203} 204 205impl PageLimit { 206 pub const MIN: u32 = 1; 207 pub const MAX: u32 = 1000; 208 209 pub fn new(value: u32) -> Result<Self, PageLimitError> { 210 match value { 211 v if v < Self::MIN => Err(PageLimitError::TooSmall { 212 value: v, 213 min: Self::MIN, 214 }), 215 v if v > Self::MAX => Err(PageLimitError::TooLarge { 216 value: v, 217 max: Self::MAX, 218 }), 219 v => Ok(Self(v)), 220 } 221 } 222 223 pub const fn get(self) -> u32 { 224 self.0 225 } 226} 227 228#[derive(Debug)] 229pub struct EdgePage { 230 pub items: Vec<EdgeItem>, 231 pub next: Option<PageToken>, 232} 233 234#[derive(Clone, Debug, Eq, PartialEq)] 235pub struct EdgeItem { 236 pub uri: 
AtUri<DefaultStr>, 237 pub sort_micros: u64, 238} 239 240impl AsRef<str> for EdgeItem { 241 fn as_ref(&self) -> &str { 242 self.uri.as_ref() 243 } 244} 245 246#[derive(Clone, Copy, Debug, Default)] 247pub struct EdgeMemReport { 248 pub key_count: u64, 249 pub source_count: u64, 250 pub source_interner_bytes: u64, 251 pub did_interner_bytes: u64, 252 pub collection_interner_bytes: u64, 253 pub key_interner_bytes: u64, 254 pub edges_total: u64, 255 pub author_refs_total: u64, 256 pub reverse_entries: u64, 257 pub reverse_cap: u64, 258 pub forward_struct_bytes: u64, 259 pub reverse_struct_bytes: u64, 260 pub bucket_struct_bytes: u64, 261 pub max_bucket: u64, 262 pub bucket_size_classes: [u64; BUCKET_CLASS_COUNT], 263} 264 265const BUCKET_CLASS_BOUNDS: [u64; 16] = [ 266 1, 267 2, 268 4, 269 8, 270 16, 271 32, 272 64, 273 128, 274 256, 275 512, 276 1024, 277 2048, 278 8192, 279 32768, 280 131072, 281 u64::MAX, 282]; 283const BUCKET_CLASS_COUNT: usize = BUCKET_CLASS_BOUNDS.len(); 284 285fn bucket_class(n: u64) -> usize { 286 BUCKET_CLASS_BOUNDS 287 .iter() 288 .position(|&bound| n <= bound) 289 .unwrap_or(BUCKET_CLASS_COUNT - 1) 290} 291 292impl EdgeMemReport { 293 pub fn bucket_histogram(&self) -> impl Iterator<Item = (u64, u64)> { 294 BUCKET_CLASS_BOUNDS 295 .into_iter() 296 .zip(self.bucket_size_classes) 297 } 298} 299 300#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] 301struct EdgeKeyId(u32); 302 303fn bump_author(authors: &mut HashMap<AuthorId, NonZeroU32, RuntimeHasher>, author: AuthorId) { 304 authors 305 .entry(author) 306 .and_modify(|c| *c = c.saturating_add(1)) 307 .or_insert(NonZeroU32::MIN); 308} 309 310fn drop_author(authors: &mut HashMap<AuthorId, NonZeroU32, RuntimeHasher>, author: AuthorId) { 311 match authors.get(&author).map(|c| c.get() - 1) { 312 Some(0) | None => { 313 authors.remove(&author); 314 } 315 Some(next) => { 316 authors.insert(author, NonZeroU32::new(next).unwrap()); 317 } 318 } 319} 320 321struct LargeBucket { 322 
keys: BTreeSet<BucketKey>, 323 authors: HashMap<AuthorId, NonZeroU32, RuntimeHasher>, 324} 325 326const BUCKET_PROMOTE_AT: usize = 256; 327const SOURCE_BYTES: u64 = 16; 328 329enum Sources { 330 Small(SmallVec<[BucketKey; 2]>), 331 Large(Box<LargeBucket>), 332} 333 334impl Default for Sources { 335 fn default() -> Self { 336 Self::Small(SmallVec::new()) 337 } 338} 339 340impl Sources { 341 fn len(&self) -> usize { 342 match self { 343 Self::Small(v) => v.len(), 344 Self::Large(big) => big.keys.len(), 345 } 346 } 347 348 fn is_empty(&self) -> bool { 349 self.len() == 0 350 } 351 352 fn insert(&mut self, key: BucketKey, author: Option<AuthorId>) -> bool { 353 match self { 354 Self::Small(v) => match v.binary_search(&key) { 355 Ok(_) => false, 356 Err(pos) => { 357 v.insert(pos, key); 358 true 359 } 360 }, 361 Self::Large(big) => { 362 let inserted = big.keys.insert(key); 363 if inserted && let Some(a) = author { 364 bump_author(&mut big.authors, a); 365 } 366 inserted 367 } 368 } 369 } 370 371 fn remove(&mut self, key: &BucketKey, author: Option<AuthorId>) { 372 match self { 373 Self::Small(v) => { 374 if let Ok(pos) = v.binary_search(key) { 375 v.remove(pos); 376 } 377 } 378 Self::Large(big) => { 379 if big.keys.remove(key) 380 && let Some(a) = author 381 { 382 drop_author(&mut big.authors, a); 383 } 384 } 385 } 386 } 387 388 fn directed( 389 &self, 390 cursor: PageCursor, 391 dir: SortDir, 392 ) -> Box<dyn Iterator<Item = BucketKey> + '_> { 393 match self { 394 Self::Small(v) => Box::new(directed_slice(v, cursor, dir)), 395 Self::Large(big) => Box::new(directed_tree(&big.keys, cursor, dir)), 396 } 397 } 398 399 fn heap_bytes(&self) -> u64 { 400 const BTREE_BYTES_PER_KEY: u64 = 32; 401 const HASHMAP_FIXED: u64 = 48; 402 const HASHMAP_PER_CAP: u64 = 9; 403 match self { 404 Self::Small(v) if v.spilled() => v.capacity() as u64 * SOURCE_BYTES, 405 Self::Small(_) => 0, 406 Self::Large(big) => { 407 std::mem::size_of::<LargeBucket>() as u64 408 + big.keys.len() as u64 * 
BTREE_BYTES_PER_KEY 409 + HASHMAP_FIXED 410 + big.authors.capacity() as u64 * HASHMAP_PER_CAP 411 } 412 } 413 } 414} 415 416#[derive(Clone, Copy, Debug, Eq, PartialEq)] 417struct ReverseEntry { 418 key_id: EdgeKeyId, 419 sort_micros: u64, 420} 421 422pub struct EdgeStore { 423 source_interner: Arc<ThreadedRodeo<Spur, RuntimeHasher>>, 424 did_interner: Arc<ThreadedRodeo<Spur, RuntimeHasher>>, 425 collection_interner: Arc<ThreadedRodeo<Spur, RuntimeHasher>>, 426 key_ids: SccMap<EdgeKey, EdgeKeyId, RuntimeHasher>, 427 next_key_id: AtomicU32, 428 forward: SccMap<EdgeKeyId, Sources, RuntimeHasher>, 429 reverse: SccMap<SourceId, SmallVec<[ReverseEntry; 1]>, RuntimeHasher>, 430 hasher: RuntimeHasher, 431 writer: Mutex<()>, 432} 433 434impl EdgeStore { 435 pub fn new(hasher: RuntimeHasher) -> Self { 436 Self { 437 source_interner: Arc::new(ThreadedRodeo::with_hasher(hasher.clone())), 438 did_interner: Arc::new(ThreadedRodeo::with_hasher(hasher.clone())), 439 collection_interner: Arc::new(ThreadedRodeo::with_hasher(hasher.clone())), 440 key_ids: SccMap::with_hasher(hasher.clone()), 441 next_key_id: AtomicU32::new(0), 442 forward: SccMap::with_hasher(hasher.clone()), 443 reverse: SccMap::with_hasher(hasher.clone()), 444 hasher, 445 writer: Mutex::new(()), 446 } 447 } 448 449 fn intern_key(&self, key: EdgeKey) -> EdgeKeyId { 450 match self.key_ids.entry_sync(key) { 451 Entry::Occupied(e) => *e.get(), 452 Entry::Vacant(e) => { 453 let id = EdgeKeyId(self.next_key_id.fetch_add(1, Ordering::Relaxed)); 454 e.insert_entry(id); 455 id 456 } 457 } 458 } 459 460 fn lookup_key(&self, key: &EdgeKey) -> Option<EdgeKeyId> { 461 self.key_ids.read_sync(key, |_, id| *id) 462 } 463 464 pub fn add(&self, edge: Edge) { 465 let _w = self 466 .writer 467 .lock() 468 .expect("edge-store writer mutex poisoned"); 469 self.add_locked(edge); 470 } 471 472 pub fn intern_source(&self, source: &AtUri<DefaultStr>) -> SourceId { 473 let author = self.intern_author(source); 474 SourceId::from_spur( 475 
self.source_interner 476 .get_or_intern(self.source_key(source, author)), 477 ) 478 } 479 480 pub fn upsert_source(&self, source: &AtUri<DefaultStr>, edges: Vec<Edge>) { 481 let _w = self 482 .writer 483 .lock() 484 .expect("edge-store writer mutex poisoned"); 485 self.clear_source_locked(source); 486 edges.into_iter().for_each(|e| self.add_locked(e)); 487 } 488 489 pub fn remove_source(&self, source: &AtUri<DefaultStr>) { 490 let _w = self 491 .writer 492 .lock() 493 .expect("edge-store writer mutex poisoned"); 494 self.clear_source_locked(source); 495 } 496 497 fn add_locked(&self, edge: Edge) { 498 let author = self.intern_author(&edge.source); 499 let source_key = self.source_key(&edge.source, author); 500 let id = SourceId::from_spur(self.source_interner.get_or_intern(source_key)); 501 let sort_micros = edge.sort_micros; 502 let key_id = self.intern_key(EdgeKey::new(edge.kind, edge.subject)); 503 504 let key = BucketKey::new(sort_micros, id); 505 let mut entry = self.forward.entry_sync(key_id).or_default(); 506 let inserted = entry.get_mut().insert(key, author); 507 let promote_keys = match entry.get() { 508 Sources::Small(v) if v.len() > BUCKET_PROMOTE_AT => { 509 Some(v.iter().copied().collect::<Vec<_>>()) 510 } 511 _ => None, 512 }; 513 drop(entry); 514 515 if let Some(keys) = promote_keys { 516 let authors = self.build_author_map(&keys); 517 let large = LargeBucket { 518 keys: keys.into_iter().collect(), 519 authors, 520 }; 521 if let Entry::Occupied(mut e) = self.forward.entry_sync(key_id) { 522 *e.get_mut() = Sources::Large(Box::new(large)); 523 } 524 } 525 526 if inserted { 527 let mut rev = self.reverse.entry_sync(id).or_default(); 528 rev.get_mut().push(ReverseEntry { 529 key_id, 530 sort_micros, 531 }); 532 } 533 } 534 535 fn clear_source_locked(&self, source: &AtUri<DefaultStr>) { 536 let author = source_authority_did(source) 537 .and_then(|s| self.did_interner.get(s)) 538 .map(AuthorId::from_spur); 539 let Some(source_spur) = 
self.source_interner.get(self.source_key(source, author)) else { 540 return; 541 }; 542 let id = SourceId::from_spur(source_spur); 543 let Some((_, entries)) = self.reverse.remove_sync(&id) else { 544 return; 545 }; 546 entries.into_iter().for_each( 547 |ReverseEntry { 548 key_id, 549 sort_micros, 550 }| { 551 self.forward.update_sync(&key_id, |_, sources| { 552 sources.remove(&BucketKey::new(sort_micros, id), author); 553 }); 554 self.forward 555 .remove_if_sync(&key_id, |sources| sources.is_empty()); 556 }, 557 ); 558 } 559 560 fn intern_author(&self, source: &AtUri<DefaultStr>) -> Option<AuthorId> { 561 let did = source_authority_did(source)?; 562 Some(AuthorId::from_spur(self.did_interner.get_or_intern(did))) 563 } 564 565 fn source_key(&self, source: &AtUri<DefaultStr>, author: Option<AuthorId>) -> String { 566 match (split_record_uri(source.as_ref()), author) { 567 (Some((_, collection, rkey)), Some(author)) => { 568 let collection = 569 CollectionId::from_spur(self.collection_interner.get_or_intern(collection)); 570 format!("{}/{}/{}", author.index(), collection.index(), rkey) 571 } 572 _ => source.as_ref().to_owned(), 573 } 574 } 575 576 fn decode_source(&self, stored: &str) -> Option<String> { 577 if stored.starts_with("at://") { 578 return Some(stored.to_owned()); 579 } 580 let mut parts = stored.splitn(3, '/'); 581 let author = AuthorId::from_raw(parts.next()?.parse().ok()?); 582 let collection = CollectionId::from_raw(parts.next()?.parse().ok()?); 583 let rkey = parts.next()?; 584 let did = self.did_interner.try_resolve(&author.to_spur()?)?; 585 let collection = self 586 .collection_interner 587 .try_resolve(&collection.to_spur()?)?; 588 Some(format!("at://{did}/{collection}/{rkey}")) 589 } 590 591 fn author_of_stored(&self, stored: &str) -> Option<AuthorId> { 592 match stored.strip_prefix("at://") { 593 Some(rest) => { 594 let authority = rest.split('/').next().unwrap_or(rest); 595 authority 596 .starts_with("did:") 597 .then(|| 
self.did_interner.get(authority).map(AuthorId::from_spur)) 598 .flatten() 599 } 600 None => stored 601 .split('/') 602 .next()? 603 .parse::<u32>() 604 .ok() 605 .map(AuthorId::from_raw), 606 } 607 } 608 609 fn author_of(&self, source: SourceId) -> Option<AuthorId> { 610 let spur = source.to_spur()?; 611 let stored = self.source_interner.try_resolve(&spur)?; 612 self.author_of_stored(stored) 613 } 614 615 fn distinct_authors_small(&self, keys: &[BucketKey]) -> u64 { 616 keys.iter() 617 .filter_map(|key| self.author_of(key.source)) 618 .collect::<std::collections::HashSet<AuthorId>>() 619 .len() as u64 620 } 621 622 fn build_author_map(&self, keys: &[BucketKey]) -> HashMap<AuthorId, NonZeroU32, RuntimeHasher> { 623 keys.iter().fold( 624 HashMap::with_hasher(self.hasher.clone()), 625 |mut authors, key| { 626 if let Some(a) = self.author_of(key.source) { 627 bump_author(&mut authors, a); 628 } 629 authors 630 }, 631 ) 632 } 633 634 pub fn count(&self, key: &EdgeKey) -> u64 { 635 self.lookup_key(key) 636 .and_then(|id| { 637 self.forward 638 .read_sync(&id, |_, sources| sources.len() as u64) 639 }) 640 .unwrap_or(0) 641 } 642 643 pub fn count_distinct_authors(&self, key: &EdgeKey) -> u64 { 644 self.lookup_key(key) 645 .and_then(|id| { 646 self.forward.read_sync(&id, |_, sources| match sources { 647 Sources::Large(big) => big.authors.len() as u64, 648 Sources::Small(v) => self.distinct_authors_small(v), 649 }) 650 }) 651 .unwrap_or(0) 652 } 653 654 pub fn sources_for(&self, key: &EdgeKey) -> Vec<AtUri<DefaultStr>> { 655 self.lookup_key(key) 656 .and_then(|id| { 657 self.forward.read_sync(&id, |_, sources| { 658 sources 659 .directed(PageCursor::Start, SortDir::Desc) 660 .filter_map(|bucket| { 661 let spur = bucket.source.to_spur()?; 662 let stored = self.source_interner.try_resolve(&spur)?; 663 AtUri::new_owned(self.decode_source(stored)?).ok() 664 }) 665 .collect::<Vec<_>>() 666 }) 667 }) 668 .unwrap_or_default() 669 } 670 671 pub fn list( 672 &self, 673 key: &EdgeKey, 
674 cursor: PageCursor, 675 limit: PageLimit, 676 dir: SortDir, 677 ) -> EdgePage { 678 let limit_usize = limit.get() as usize; 679 self.lookup_key(key) 680 .and_then(|id| { 681 self.forward.read_sync(&id, |_, sources| { 682 let iter = sources.directed(cursor, dir); 683 let entries: Vec<BucketKey> = iter.take(limit_usize + 1).collect(); 684 let has_more = entries.len() > limit_usize; 685 let page = &entries[..entries.len().min(limit_usize)]; 686 let items = page 687 .iter() 688 .filter_map(|&key| { 689 let spur = key.source.to_spur()?; 690 let stored = self.source_interner.try_resolve(&spur)?; 691 let uri = AtUri::new_owned(self.decode_source(stored)?).ok()?; 692 Some(EdgeItem { 693 uri, 694 sort_micros: key.micros.0, 695 }) 696 }) 697 .collect(); 698 let next = has_more 699 .then(|| page.last().copied()) 700 .flatten() 701 .map(BucketKey::token); 702 EdgePage { items, next } 703 }) 704 }) 705 .unwrap_or(EdgePage { 706 items: Vec::new(), 707 next: None, 708 }) 709 } 710 711 pub fn list_filtered<F>( 712 &self, 713 key: &EdgeKey, 714 cursor: PageCursor, 715 limit: PageLimit, 716 dir: SortDir, 717 predicate: F, 718 ) -> EdgePage 719 where 720 F: Fn(&AtUri<DefaultStr>) -> bool, 721 { 722 let limit_usize = limit.get() as usize; 723 let scan_cap = limit_usize 724 .saturating_mul(FILTER_SCAN_MULTIPLIER) 725 .max(FILTER_SCAN_FLOOR); 726 self.lookup_key(key) 727 .and_then(|id| { 728 self.forward.read_sync(&id, |_, sources| { 729 let init = ScanState::with_capacity(limit_usize + 1); 730 let outcome = sources 731 .directed(cursor, dir) 732 .try_fold(init, |mut state, key| { 733 if state.scanned >= scan_cap && state.matched.len() <= limit_usize { 734 return ControlFlow::Break(state); 735 } 736 state.scanned += 1; 737 state.last_scanned = Some(key); 738 if let Some(spur) = key.source.to_spur() 739 && let Some(stored) = self.source_interner.try_resolve(&spur) 740 && let Some(decoded) = self.decode_source(stored) 741 && let Ok(uri) = AtUri::new_owned(decoded) 742 && 
predicate(&uri) 743 { 744 state.matched.push((key, uri)); 745 if state.matched.len() > limit_usize { 746 return ControlFlow::Break(state); 747 } 748 } 749 ControlFlow::Continue(state) 750 }); 751 let (state, bucket_exhausted) = match outcome { 752 ControlFlow::Continue(s) => (s, true), 753 ControlFlow::Break(s) => (s, false), 754 }; 755 let has_more_matches = state.matched.len() > limit_usize; 756 let visible_len = state.matched.len().min(limit_usize); 757 let next = if has_more_matches && visible_len > 0 { 758 Some(state.matched[visible_len - 1].0.token()) 759 } else if !bucket_exhausted { 760 state.last_scanned.map(BucketKey::token) 761 } else { 762 None 763 }; 764 let items = state 765 .matched 766 .into_iter() 767 .take(visible_len) 768 .map(|(key, uri)| EdgeItem { 769 uri, 770 sort_micros: key.micros.0, 771 }) 772 .collect(); 773 EdgePage { items, next } 774 }) 775 }) 776 .unwrap_or(EdgePage { 777 items: Vec::new(), 778 next: None, 779 }) 780 } 781 782 pub fn key_count(&self) -> usize { 783 self.forward.len() 784 } 785 786 pub fn source_count(&self) -> usize { 787 self.reverse.len() 788 } 789 790 pub fn mem_report(&self) -> EdgeMemReport { 791 const SCC_SLOT: u64 = 32; 792 let edge_key = std::mem::size_of::<EdgeKey>() as u64; 793 let rev_entry = std::mem::size_of::<ReverseEntry>() as u64; 794 let rev_smallvec = std::mem::size_of::<SmallVec<[ReverseEntry; 1]>>() as u64; 795 let bucket_struct = std::mem::size_of::<Sources>() as u64; 796 797 let mut edges_total = 0u64; 798 let mut author_refs_total = 0u64; 799 let mut forward_struct_bytes = 0u64; 800 let mut max_bucket = 0u64; 801 let mut bucket_size_classes = [0u64; BUCKET_CLASS_COUNT]; 802 self.forward.iter_sync(|_, sources| { 803 let bucket_len = sources.len() as u64; 804 edges_total += bucket_len; 805 max_bucket = max_bucket.max(bucket_len); 806 bucket_size_classes[bucket_class(bucket_len)] += 1; 807 if let Sources::Large(big) = sources { 808 author_refs_total += big.authors.len() as u64; 809 } 810 
forward_struct_bytes += SCC_SLOT + bucket_struct + sources.heap_bytes(); 811 true 812 }); 813 let key_interner_bytes = self.key_ids.len() as u64 814 * (edge_key + std::mem::size_of::<EdgeKeyId>() as u64 + SCC_SLOT); 815 816 let mut reverse_entries = 0u64; 817 let mut reverse_cap = 0u64; 818 let mut reverse_struct_bytes = 0u64; 819 self.reverse.iter_sync(|_, refs| { 820 let cap = refs.capacity() as u64; 821 reverse_entries += refs.len() as u64; 822 reverse_cap += cap; 823 let heap = if refs.spilled() { cap * rev_entry } else { 0 }; 824 reverse_struct_bytes += SCC_SLOT + rev_smallvec + heap; 825 true 826 }); 827 828 EdgeMemReport { 829 key_count: self.forward.len() as u64, 830 source_count: self.reverse.len() as u64, 831 source_interner_bytes: self.source_interner.current_memory_usage() as u64, 832 did_interner_bytes: self.did_interner.current_memory_usage() as u64, 833 collection_interner_bytes: self.collection_interner.current_memory_usage() as u64, 834 key_interner_bytes, 835 edges_total, 836 author_refs_total, 837 reverse_entries, 838 reverse_cap, 839 forward_struct_bytes, 840 reverse_struct_bytes, 841 bucket_struct_bytes: bucket_struct, 842 max_bucket, 843 bucket_size_classes, 844 } 845 } 846} 847 848fn directed_slice( 849 sources: &[BucketKey], 850 cursor: PageCursor, 851 dir: SortDir, 852) -> impl Iterator<Item = BucketKey> + '_ { 853 match dir { 854 SortDir::Asc => { 855 let start = match cursor { 856 PageCursor::Start => 0, 857 PageCursor::After(tok) => { 858 sources.partition_point(|k| *k <= BucketKey::from_token(tok)) 859 } 860 }; 861 Either::Left(sources[start..].iter().copied()) 862 } 863 SortDir::Desc => { 864 let end = match cursor { 865 PageCursor::Start => sources.len(), 866 PageCursor::After(tok) => { 867 sources.partition_point(|k| *k < BucketKey::from_token(tok)) 868 } 869 }; 870 Either::Right(sources[..end].iter().rev().copied()) 871 } 872 } 873} 874 875fn directed_tree( 876 keys: &BTreeSet<BucketKey>, 877 cursor: PageCursor, 878 dir: SortDir, 
879) -> impl Iterator<Item = BucketKey> + '_ { 880 match dir { 881 SortDir::Asc => { 882 let lower = match cursor { 883 PageCursor::Start => Bound::Unbounded, 884 PageCursor::After(tok) => Bound::Excluded(BucketKey::from_token(tok)), 885 }; 886 Either::Left(keys.range((lower, Bound::Unbounded)).copied()) 887 } 888 SortDir::Desc => { 889 let upper = match cursor { 890 PageCursor::Start => Bound::Unbounded, 891 PageCursor::After(tok) => Bound::Excluded(BucketKey::from_token(tok)), 892 }; 893 Either::Right(keys.range((Bound::Unbounded, upper)).rev().copied()) 894 } 895 } 896} 897 898fn split_record_uri(source: &str) -> Option<(&str, &str, &str)> { 899 let rest = source.strip_prefix("at://")?; 900 let mut parts = rest.split('/'); 901 let authority = parts.next()?; 902 let collection = parts.next()?; 903 let rkey = parts.next()?; 904 if parts.next().is_some() { 905 return None; 906 } 907 (authority.starts_with("did:") && !collection.is_empty() && !rkey.is_empty()) 908 .then_some((authority, collection, rkey)) 909} 910 911fn source_authority_did(source: &AtUri<DefaultStr>) -> Option<&str> { 912 let rest = source.as_ref().strip_prefix("at://")?; 913 let end = rest.find('/').unwrap_or(rest.len()); 914 let candidate = &rest[..end]; 915 candidate.starts_with("did:").then_some(candidate) 916} 917 918#[cfg(test)] 919mod tests { 920 use super::*; 921 use bobbin_types::ids::SubjectRef; 922 use jacquard_common::types::did::Did; 923 use jacquard_common::types::nsid::Nsid; 924 use jacquard_common::types::string::AtUri; 925 926 fn store() -> EdgeStore { 927 EdgeStore::new(RuntimeHasher::default()) 928 } 929 930 fn nsid(s: &'static str) -> Nsid<DefaultStr> { 931 Nsid::new_static(s).unwrap() 932 } 933 934 fn at(s: &str) -> AtUri<DefaultStr> { 935 AtUri::new_owned(s).unwrap() 936 } 937 938 fn did(s: &str) -> Did<DefaultStr> { 939 Did::new_owned(s).unwrap() 940 } 941 942 fn did_subj(s: &str) -> SubjectRef { 943 SubjectRef::Did(did(s)) 944 } 945 946 fn limit(n: u32) -> PageLimit { 947 
PageLimit::new(n).unwrap() 948 } 949 950 const NAMES: [&str; 5] = ["nel", "olaren", "teq", "lyna", "bailey"]; 951 952 fn star_edge(source: AtUri<DefaultStr>, subject: Did<DefaultStr>) -> Edge { 953 star_edge_at(source, subject, 0) 954 } 955 956 fn star_edge_at(source: AtUri<DefaultStr>, subject: Did<DefaultStr>, sort_micros: u64) -> Edge { 957 Edge { 958 kind: nsid("sh.tangled.feed.star"), 959 subject: SubjectRef::Did(subject), 960 source, 961 sort_micros, 962 } 963 } 964 965 fn shuffled_micros(i: usize) -> u64 { 966 (i as u64).wrapping_mul(2_654_435_761) % 64 967 } 968 969 fn source_uri(i: usize) -> String { 970 format!( 971 "at://did:plc:{}/sh.tangled.feed.star/r{i}", 972 NAMES[i % NAMES.len()] 973 ) 974 } 975 976 fn fill_subject(store: &EdgeStore, n: usize) -> EdgeKey { 977 let subject = did("did:plc:squid"); 978 (0..n).for_each(|i| { 979 store.add(star_edge_at( 980 at(&source_uri(i)), 981 subject.clone(), 982 shuffled_micros(i), 983 )); 984 }); 985 EdgeKey::new(nsid("sh.tangled.feed.star"), did_subj("did:plc:squid")) 986 } 987 988 fn reference(kept: impl Iterator<Item = usize>) -> Vec<String> { 989 let mut rows: Vec<(u64, usize)> = kept.map(|i| (shuffled_micros(i), i)).collect(); 990 rows.sort_unstable(); 991 rows.into_iter().map(|(_, i)| source_uri(i)).collect() 992 } 993 994 fn paginate_all(store: &EdgeStore, key: &EdgeKey, dir: SortDir, page: u32) -> Vec<String> { 995 std::iter::successors( 996 Some(store.list(key, PageCursor::Start, limit(page), dir)), 997 |prev| { 998 prev.next 999 .map(|tok| store.list(key, PageCursor::After(tok), limit(page), dir)) 1000 }, 1001 ) 1002 .flat_map(|p| { 1003 p.items 1004 .into_iter() 1005 .map(|u| u.as_ref().to_owned()) 1006 .collect::<Vec<_>>() 1007 }) 1008 .collect() 1009 } 1010 1011 #[test] 1012 fn pagination_matches_reference_across_small_and_large() { 1013 [64usize, 1000].into_iter().for_each(|n| { 1014 let store = store(); 1015 let key = fill_subject(&store, n); 1016 assert_eq!(store.count(&key), n as u64, "count 
mismatch at n={n}"); 1017 assert_eq!( 1018 store.count_distinct_authors(&key), 1019 NAMES.len() as u64, 1020 "distinct authors at n={n}" 1021 ); 1022 1023 let asc = reference(0..n); 1024 let desc: Vec<String> = asc.iter().rev().cloned().collect(); 1025 1026 [3u32, 7, 50].into_iter().for_each(|page| { 1027 assert_eq!( 1028 paginate_all(&store, &key, SortDir::Asc, page), 1029 asc, 1030 "asc mismatch n={n} page={page}" 1031 ); 1032 assert_eq!( 1033 paginate_all(&store, &key, SortDir::Desc, page), 1034 desc, 1035 "desc mismatch n={n} page={page}" 1036 ); 1037 }); 1038 }); 1039 } 1040 1041 #[test] 1042 fn remove_from_large_bucket_keeps_pagination_exact() { 1043 let store = store(); 1044 let n = 1000usize; 1045 let key = fill_subject(&store, n); 1046 (0..n).step_by(3).for_each(|i| { 1047 store.remove_source(&at(&source_uri(i))); 1048 }); 1049 let expected = reference((0..n).filter(|i| i % 3 != 0)); 1050 assert_eq!(store.count(&key), expected.len() as u64); 1051 assert_eq!( 1052 store.count_distinct_authors(&key), 1053 NAMES.len() as u64, 1054 "every author keeps sources after partial removal" 1055 ); 1056 assert_eq!(paginate_all(&store, &key, SortDir::Asc, 7), expected); 1057 assert_eq!( 1058 paginate_all(&store, &key, SortDir::Desc, 11), 1059 expected.iter().rev().cloned().collect::<Vec<_>>() 1060 ); 1061 } 1062 1063 #[test] 1064 fn add_then_count() { 1065 let store = store(); 1066 let key = EdgeKey::new(nsid("sh.tangled.feed.star"), did_subj("did:plc:abalone")); 1067 1068 store.add(star_edge( 1069 at("at://did:plc:nel/sh.tangled.feed.star/r1"), 1070 did("did:plc:abalone"), 1071 )); 1072 store.add(star_edge( 1073 at("at://did:plc:olaren/sh.tangled.feed.star/r2"), 1074 did("did:plc:abalone"), 1075 )); 1076 store.add(star_edge( 1077 at("at://did:plc:nel/sh.tangled.feed.star/r3"), 1078 did("did:plc:abalone"), 1079 )); 1080 1081 assert_eq!(store.count(&key), 3); 1082 assert_eq!(store.count_distinct_authors(&key), 2); 1083 } 1084 1085 #[test] 1086 fn 
duplicate_add_is_idempotent() {
        // Adding the identical edge twice must leave one edge and one author.
        let store = store();
        let key = EdgeKey::new(nsid("sh.tangled.feed.star"), did_subj("did:plc:abalone"));
        let edge = star_edge(
            at("at://did:plc:nel/sh.tangled.feed.star/r1"),
            did("did:plc:abalone"),
        );
        store.add(edge.clone());
        store.add(edge);
        assert_eq!(store.count(&key), 1);
        assert_eq!(store.count_distinct_authors(&key), 1);
    }

    /// Removing a source drops the edges it contributed under every key, across edge kinds.
    #[test]
    fn remove_source_clears_all_keys_for_that_source() {
        let store = store();
        let star_key = EdgeKey::new(nsid("sh.tangled.feed.star"), did_subj("did:plc:abalone"));
        let follow_key = EdgeKey::new(nsid("sh.tangled.graph.follow"), did_subj("did:plc:lyna"));
        let source = "at://did:plc:nel/sh.tangled.feed.star/r1";

        store.add(Edge {
            kind: nsid("sh.tangled.feed.star"),
            subject: did_subj("did:plc:abalone"),
            source: at(source),
            sort_micros: 0,
        });
        store.add(Edge {
            kind: nsid("sh.tangled.graph.follow"),
            subject: did_subj("did:plc:lyna"),
            source: at(source),
            sort_micros: 0,
        });

        assert_eq!(store.count(&star_key), 1);
        assert_eq!(store.count(&follow_key), 1);

        store.remove_source(&at(source));
        assert_eq!(store.count(&star_key), 0);
        assert_eq!(store.count(&follow_key), 0);
    }

    /// Upserting a source replaces its previous edges rather than accumulating them.
    #[test]
    fn upsert_source_replaces_old_edges() {
        let store = store();
        let source = at("at://did:plc:teq/sh.tangled.feed.star/r1");
        let old_subject = did_subj("did:plc:abalone");
        let new_subject = did_subj("did:plc:uni");
        let kind = nsid("sh.tangled.feed.star");

        store.upsert_source(
            &source,
            vec![Edge {
                kind: kind.clone(),
                subject: old_subject.clone(),
                source: source.clone(),
                sort_micros: 0,
            }],
        );
        assert_eq!(
            store.count(&EdgeKey::new(kind.clone(), old_subject.clone())),
            1
        );

        store.upsert_source(
            &source,
            vec![Edge {
                kind: kind.clone(),
                subject: new_subject.clone(),
                source: source.clone(),
                sort_micros: 0,
            }],
        );
        assert_eq!(store.count(&EdgeKey::new(kind.clone(), old_subject)), 0);
        assert_eq!(store.count(&EdgeKey::new(kind, new_subject)), 1);
    }

    /// Star-record URIs `r0..r{n-1}`, where record `ri` is authored by `NAMES[i]`,
    /// returned in index order. Panics if `NAMES` has fewer than `n` entries.
    fn indexed_star_uris(n: usize) -> Vec<String> {
        (0..n)
            .map(|i| format!("at://did:plc:{}/sh.tangled.feed.star/r{i}", NAMES[i]))
            .collect()
    }

    /// Ascending pagination yields every edge once, in `sort_micros` order,
    /// and the final partial page carries no continuation cursor.
    #[test]
    fn list_pages_in_sort_order() {
        let store = store();
        let key = EdgeKey::new(nsid("sh.tangled.feed.star"), did_subj("did:plc:abalone"));
        let expected = indexed_star_uris(5);
        expected.iter().enumerate().for_each(|(i, uri)| {
            store.add(star_edge_at(
                at(uri),
                did("did:plc:abalone"),
                1_000_000 + i as u64 * 1_000_000,
            ));
        });

        let page1 = store.list(&key, PageCursor::Start, limit(2), SortDir::Asc);
        assert_eq!(page1.items.len(), 2);
        let cursor = PageCursor::After(page1.next.expect("has next"));

        let page2 = store.list(&key, cursor, limit(2), SortDir::Asc);
        assert_eq!(page2.items.len(), 2);

        let cursor2 = PageCursor::After(page2.next.expect("has next"));
        let page3 = store.list(&key, cursor2, limit(2), SortDir::Asc);
        assert_eq!(page3.items.len(), 1);
        assert!(
            page3.next.is_none(),
            "final partial page should signal exhaustion"
        );

        // The test name promises ordering, so page sizes alone are not enough:
        // the concatenated pages must be the inserted edges in ascending order.
        let seen: Vec<String> = page1
            .items
            .iter()
            .chain(page2.items.iter())
            .chain(page3.items.iter())
            .map(|u| u.as_ref().to_owned())
            .collect();
        assert_eq!(seen, expected, "asc pages must follow sort_micros order");
    }

    /// A page filled exactly to the limit with nothing left must not hand out a cursor.
    #[test]
    fn list_exact_fill_signals_exhaustion() {
        let store = store();
        let key = EdgeKey::new(nsid("sh.tangled.feed.star"), did_subj("did:plc:abalone"));
        indexed_star_uris(2).iter().for_each(|uri| {
            store.add(star_edge(at(uri), did("did:plc:abalone")));
        });

        let page = store.list(&key, PageCursor::Start, limit(2), SortDir::Asc);
        assert_eq!(page.items.len(), 2);
        assert!(page.next.is_none(), "exact-fill page must not promise more");
    }

    /// Listing a key that was never written yields an empty, terminal page.
    #[test]
    fn list_on_unknown_key_is_empty() {
        let store = store();
        let page = store.list(
            &EdgeKey::new(nsid("sh.tangled.feed.star"), did_subj("did:plc:periwinkle")),
            PageCursor::Start,
            limit(10),
            SortDir::Asc,
        );
        assert!(page.items.is_empty());
        assert!(page.next.is_none());
    }

    /// An author stops counting only once their last source for the key is removed.
    #[test]
    fn distinct_authors_decreases_when_last_source_from_author_removed() {
        let store = store();
        let key = EdgeKey::new(nsid("sh.tangled.feed.star"), did_subj("did:plc:abalone"));
        let s1 = at("at://did:plc:nel/sh.tangled.feed.star/r1");
        let s2 = at("at://did:plc:nel/sh.tangled.feed.star/r2");
        let s3 = at("at://did:plc:olaren/sh.tangled.feed.star/r3");

        store.add(star_edge(s1.clone(), did("did:plc:abalone")));
        store.add(star_edge(s2.clone(), did("did:plc:abalone")));
        store.add(star_edge(s3, did("did:plc:abalone")));
        assert_eq!(store.count_distinct_authors(&key), 2);

        store.remove_source(&s1);
        assert_eq!(store.count_distinct_authors(&key), 2, "user1 still has s2");

        store.remove_source(&s2);
        assert_eq!(store.count_distinct_authors(&key), 1, "user1 fully gone");
    }

    /// `PageLimit::new` rejects 0 and values above `MAX`, and accepts in-range values.
    #[test]
    fn page_limit_rejects_zero_and_oversize() {
        assert!(matches!(
            PageLimit::new(0),
            Err(PageLimitError::TooSmall { value: 0, min: 1 })
        ));
        assert!(matches!(
            PageLimit::new(PageLimit::MAX + 1),
            Err(PageLimitError::TooLarge { .. })
        ));
        assert_eq!(PageLimit::new(50).unwrap().get(), 50);
    }

    /// Encoding then decoding a page token reproduces it; the encoding is 24 chars.
    #[test]
    fn cursor_token_round_trip() {
        let original = PageToken::new(1_730_000_000_000_000, 0x1234_abcd);
        let token = original.encode_token();
        assert_eq!(token.len(), 24, "12-byte cursor encodes to 24 hex chars");
        assert_eq!(PageToken::decode_token(&token).unwrap(), original);
    }

    /// A missing token means "start from the beginning".
    #[test]
    fn from_token_none_yields_start() {
        assert_eq!(PageCursor::from_token(None).unwrap(), PageCursor::Start);
    }

    /// A present, valid token resumes after the encoded position.
    #[test]
    fn from_token_some_yields_after() {
        let original = PageToken::new(1_730_000_000_000_000, 42);
        let token = original.encode_token();
        assert_eq!(
            PageCursor::from_token(Some(&token)).unwrap(),
            PageCursor::After(original),
        );
    }

    /// Wrong-length and non-hex inputs are all reported as `Malformed`.
    #[test]
    fn cursor_decode_rejects_malformed() {
        let bad = [
            "",
            "deadbeef",
            // Correct 24-char length but contains non-hex characters.
            "no-hex!!aaaaaaaaaaaaaaaa",
            "12345",
            "this-string-is-way-too-long-to-be-a-valid-cursor",
        ];
        bad.into_iter().for_each(|s| {
            assert!(
                matches!(PageToken::decode_token(s), Err(CursorParseError::Malformed)),
                "expected malformed for {s:?}",
            );
        });
    }

    /// Encoded tokens consist solely of 24 ASCII hex digits.
    #[test]
    fn cursor_token_is_hex_shape() {
        let token = PageToken::new(1_730_000_000_000_000, 7).encode_token();
        assert_eq!(token.len(), 24);
        assert!(token.chars().all(|c| c.is_ascii_hexdigit()));
    }

    /// Entries sharing a `sort_micros` value must not be skipped at a page boundary.
    #[test]
    fn list_does_not_drop_entries_at_same_sort_micros_across_pages() {
        let store = store();
        let subject_did = did("did:plc:limpet");
        let key = EdgeKey::new(
            nsid("sh.tangled.feed.star"),
            SubjectRef::Did(subject_did.clone()),
        );
        indexed_star_uris(4).iter().for_each(|uri| {
            store.add(star_edge_at(at(uri), subject_did.clone(), 42));
        });

        let page1 = store.list(&key, PageCursor::Start, limit(2), SortDir::Asc);
        assert_eq!(page1.items.len(), 2);
        let token = page1.next.expect("cursor must continue across ties");

        let page2 = store.list(&key, PageCursor::After(token), limit(2), SortDir::Asc);
        assert_eq!(
            page2.items.len(),
            2,
            "remaining ties must surface on next page"
        );
        assert!(page2.next.is_none());
        let combined: std::collections::HashSet<String> = page1
            .items
            .iter()
            .chain(page2.items.iter())
            .map(|u| u.as_ref().to_owned())
            .collect();
        assert_eq!(combined.len(), 4, "every tied entry visible exactly once");
    }

    /// `list_filtered` applies the predicate before paging, and its cursor walks
    /// only the matching entries.
    #[test]
    fn list_filtered_narrows_by_predicate_and_paginates() {
        let store = store();
        let subject_did = did("did:plc:limpet");
        let key = EdgeKey::new(
            nsid("sh.tangled.repo.issue"),
            SubjectRef::Did(subject_did.clone()),
        );
        let by_nel = (0..3).map(|i| {
            star_edge_at(
                at(&format!("at://did:plc:nel/sh.tangled.repo.issue/r{i}")),
                subject_did.clone(),
                100 + i as u64,
            )
        });
        let by_olaren = (0..2).map(|i| {
            star_edge_at(
                at(&format!("at://did:plc:olaren/sh.tangled.repo.issue/o{i}")),
                subject_did.clone(),
                500 + i as u64,
            )
        });
        by_nel
            .chain(by_olaren)
            .map(|mut e| {
                // Reuse the star-edge builder but file the edges under the issue kind.
                e.kind = nsid("sh.tangled.repo.issue");
                e
            })
            .for_each(|e| store.add(e));

        let only_nel = |u: &AtUri<DefaultStr>| u.as_ref().starts_with("at://did:plc:nel/");
        let page1 = store.list_filtered(&key, PageCursor::Start, limit(2), SortDir::Asc, only_nel);
        assert_eq!(page1.items.len(), 2);
        assert!(page1.next.is_some(), "cursor must allow more nel matches");

        let page2 = store.list_filtered(
            &key,
            PageCursor::After(page1.next.unwrap()),
            limit(2),
            SortDir::Asc,
            only_nel,
        );
        assert_eq!(page2.items.len(), 1, "only one nel issue left");
        assert!(page2.next.is_none(), "tail page must not promise more");

        // Counts alone would pass if olaren entries leaked in, so pin the exact URIs.
        let seen: Vec<String> = page1
            .items
            .iter()
            .chain(page2.items.iter())
            .map(|u| u.as_ref().to_owned())
            .collect();
        let expected: Vec<String> = (0..3)
            .map(|i| format!("at://did:plc:nel/sh.tangled.repo.issue/r{i}"))
            .collect();
        assert_eq!(
            seen, expected,
            "filtered pages yield every nel issue once, in asc order"
        );
    }

    /// A descending listing is the exact reverse of the ascending one, and the
    /// ascending one follows `sort_micros` order.
    #[test]
    fn list_descending_returns_newest_first() {
        let store = store();
        let subject_did = did("did:plc:limpet");
        let key = EdgeKey::new(
            nsid("sh.tangled.feed.star"),
            SubjectRef::Did(subject_did.clone()),
        );
        let expected_asc = indexed_star_uris(5);
        expected_asc.iter().enumerate().for_each(|(i, uri)| {
            store.add(star_edge_at(
                at(uri),
                subject_did.clone(),
                1_000_000 + i as u64 * 1_000_000,
            ));
        });

        let asc = store.list(&key, PageCursor::Start, limit(5), SortDir::Asc);
        let desc = store.list(&key, PageCursor::Start, limit(5), SortDir::Desc);
        assert_eq!(asc.items.len(), 5);
        assert_eq!(desc.items.len(), 5);
        let asc_uris: Vec<String> = asc.items.iter().map(|u| u.as_ref().to_owned()).collect();
        // Without pinning asc, "desc is the reverse of asc" would also hold for
        // two consistently wrong orderings.
        assert_eq!(asc_uris, expected_asc, "asc must follow sort_micros order");
        let reversed: Vec<String> = asc_uris.iter().rev().cloned().collect();
        let desc_uris: Vec<String> = desc.items.iter().map(|u| u.as_ref().to_owned()).collect();
        assert_eq!(desc_uris, reversed, "desc must be exact reverse of asc");
    }

    /// Descending pagination visits every edge exactly once, newest first.
    #[test]
    fn list_descending_paginates_with_cursor() {
        let store = store();
        let subject_did = did("did:plc:whelk");
        let key = EdgeKey::new(
            nsid("sh.tangled.feed.star"),
            SubjectRef::Did(subject_did.clone()),
        );
        let expected_asc = indexed_star_uris(5);
        expected_asc.iter().enumerate().for_each(|(i, uri)| {
            store.add(star_edge_at(
                at(uri),
                subject_did.clone(),
                1_000_000 + i as u64 * 1_000_000,
            ));
        });

        let page1 = store.list(&key, PageCursor::Start, limit(2), SortDir::Desc);
        assert_eq!(page1.items.len(), 2);
        let cursor = PageCursor::After(page1.next.expect("desc page1 must continue"));
        let page2 = store.list(&key, cursor, limit(2), SortDir::Desc);
        assert_eq!(page2.items.len(), 2);
        let cursor2 = PageCursor::After(page2.next.expect("desc page2 must continue"));
        let page3 = store.list(&key, cursor2, limit(2), SortDir::Desc);
        assert_eq!(page3.items.len(), 1);
        assert!(page3.next.is_none());

        // An ordered comparison also catches duplicates, gaps, and wrong direction
        // across page boundaries, which a set-size check would miss.
        let combined: Vec<String> = page1
            .items
            .iter()
            .chain(page2.items.iter())
            .chain(page3.items.iter())
            .map(|u| u.as_ref().to_owned())
            .collect();
        let expected_desc: Vec<String> = expected_asc.into_iter().rev().collect();
        assert_eq!(
            combined, expected_desc,
            "desc pagination visits each item exactly once, newest first"
        );
    }

    /// `list_filtered` in descending order keeps only matches, newest first.
    #[test]
    fn list_filtered_descending_respects_predicate() {
        let store = store();
        let subject_did = did("did:plc:scallop");
        let key = EdgeKey::new(
            nsid("sh.tangled.repo.issue"),
            SubjectRef::Did(subject_did.clone()),
        );
        let by_nel = (0..3).map(|i| {
            star_edge_at(
                at(&format!("at://did:plc:nel/sh.tangled.repo.issue/r{i}")),
                subject_did.clone(),
                100 + i as u64,
            )
        });
        let by_olaren = (0..2).map(|i| {
            star_edge_at(
                at(&format!("at://did:plc:olaren/sh.tangled.repo.issue/o{i}")),
                subject_did.clone(),
                500 + i as u64,
            )
        });
        by_nel
            .chain(by_olaren)
            .map(|mut e| {
                // Reuse the star-edge builder but file the edges under the issue kind.
                e.kind = nsid("sh.tangled.repo.issue");
                e
            })
            .for_each(|e| store.add(e));

        let only_nel = |u: &AtUri<DefaultStr>| u.as_ref().starts_with("at://did:plc:nel/");
        let page = store.list_filtered(&key, PageCursor::Start, limit(5), SortDir::Desc, only_nel);
        assert_eq!(page.items.len(), 3, "all three nel issues visible");
        // nel's issues were written at micros 100, 101, 102, so desc is r2, r1, r0.
        let got: Vec<String> = page.items.iter().map(|u| u.as_ref().to_owned()).collect();
        let expected: Vec<String> = (0..3)
            .rev()
            .map(|i| format!("at://did:plc:nel/sh.tangled.repo.issue/r{i}"))
            .collect();
        assert_eq!(
            got, expected,
            "desc order: newest nel issue (r2) first, oldest (r0) last"
        );
    }

    /// Sources whose authority is not a DID still list exactly and stay removable.
    #[test]
    fn non_did_source_round_trips_via_raw_fallback() {
        let store = store();
        let subject = did("did:plc:limpet");
        let key = EdgeKey::new(
            nsid("sh.tangled.feed.star"),
            SubjectRef::Did(subject.clone()),
        );
        let did_source = at("at://did:plc:nel/sh.tangled.feed.star/r1");
        let handle_source = at("at://witchcraft.systems/sh.tangled.feed.star/r2");
        store.add(star_edge_at(did_source.clone(), subject.clone(), 1));
        store.add(star_edge_at(handle_source.clone(), subject.clone(), 2));

        let page = store.list(&key, PageCursor::Start, limit(10), SortDir::Asc);
        let got: std::collections::HashSet<String> =
            page.items.iter().map(|u| u.as_ref().to_owned()).collect();
        assert!(
            got.contains(did_source.as_ref()),
            "did source must decode exactly"
        );
        assert!(
            got.contains(handle_source.as_ref()),
            "non-did authority must round-trip through the raw fallback"
        );

        store.remove_source(&handle_source);
        assert_eq!(
            store.count(&key),
            1,
            "raw-keyed source removable by its uri"
        );
        // The count alone would also pass if the wrong entry had been removed.
        let remaining: Vec<String> = store
            .list(&key, PageCursor::Start, limit(10), SortDir::Asc)
            .items
            .iter()
            .map(|u| u.as_ref().to_owned())
            .collect();
        assert_eq!(
            remaining,
            vec![did_source.as_ref().to_owned()],
            "only the did-keyed source survives removal of the raw-keyed one"
        );
    }

    /// Sources from different collections under one key each keep their own
    /// collection segment when decoded.
    #[test]
    fn distinct_collections_decode_with_their_own_collection() {
        let store = store();
        let subject = did("did:plc:limpet");
        let key = EdgeKey::new(
            nsid("sh.tangled.feed.star"),
            SubjectRef::Did(subject.clone()),
        );
        let star_src = at("at://did:plc:nel/sh.tangled.feed.star/aaa");
        let issue_src = at("at://did:plc:nel/sh.tangled.repo.issue/bbb");
        store.add(Edge {
            kind: nsid("sh.tangled.feed.star"),
            subject: SubjectRef::Did(subject.clone()),
            source: star_src.clone(),
            sort_micros: 1,
        });
        // Deliberately files an issue-collection source under the star kind.
        store.add(Edge {
            kind: nsid("sh.tangled.feed.star"),
            subject: SubjectRef::Did(subject.clone()),
            source: issue_src.clone(),
            sort_micros: 2,
        });

        let page = store.list(&key, PageCursor::Start, limit(10), SortDir::Asc);
        let got: std::collections::HashSet<String> =
            page.items.iter().map(|u| u.as_ref().to_owned()).collect();
        assert!(
            got.contains(star_src.as_ref()),
            "star-collection source decodes exactly"
        );
        assert!(
            got.contains(issue_src.as_ref()),
            "issue-collection source must keep its own collection, not borrow the star one"
        );
    }

    /// Distinct-author counting stays correct once more authors are added than
    /// the inline storage holds (per the assertion message, five exceeds that cap).
    #[test]
    fn author_refs_spill_preserves_distinct_count() {
        let store = store();
        let subject = did("did:plc:scallop");
        let key = EdgeKey::new(
            nsid("sh.tangled.feed.star"),
            SubjectRef::Did(subject.clone()),
        );
        indexed_star_uris(5).iter().enumerate().for_each(|(i, uri)| {
            store.add(star_edge_at(at(uri), subject.clone(), i as u64));
        });
        assert_eq!(
            store.count_distinct_authors(&key),
            5,
            "five authors exceed the inline cap and spill to the map"
        );

        store.add(star_edge_at(
            at("at://did:plc:nel/sh.tangled.feed.star/r99"),
            subject.clone(),
            99,
        ));
        assert_eq!(
            store.count_distinct_authors(&key),
            5,
            "second nel source adds no author"
        );

        // Assumes NAMES[0] == "nel", so r0 above was authored by nel.
        store.remove_source(&at("at://did:plc:nel/sh.tangled.feed.star/r0"));
        assert_eq!(
            store.count_distinct_authors(&key),
            5,
            "nel still present via r99"
        );
        store.remove_source(&at("at://did:plc:nel/sh.tangled.feed.star/r99"));
        assert_eq!(store.count_distinct_authors(&key), 4, "nel fully removed");
    }
}