Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

merging record iterator!?!?

it working!!!

+96 -12
+2
ufos/src/error.rs
··· 34 34 FjallLsmError(#[from] fjall::LsmError), 35 35 #[error("Bytes encoding error")] 36 36 EncodingError(#[from] EncodingError), 37 + #[error("If you ever see this, there's a bug in the code. The error was stolen")] 38 + Stolen, 37 39 }
+94 -12
ufos/src/storage_fjall.rs
··· 426 426 ) -> StorageResult<Vec<UFOsRecord>> { 427 427 if collections.is_empty() { 428 428 return Ok(vec![]); 429 - } else if collections.len() > 1 { 430 - todo!() 431 429 } 432 - 433 - let collection = collections[0]; 434 - 435 - let mut out = Vec::new(); 436 - for rec in RecordIterator::new(&self.feeds, self.records.clone(), collection, limit)? { 437 - if let Some(r) = rec? { 438 - out.push(r) 439 - } else { 440 - break; 430 + let mut record_iterators = Vec::new(); 431 + for collection in collections { 432 + let iter = RecordIterator::new(&self.feeds, self.records.clone(), collection, limit)?; 433 + record_iterators.push(iter.peekable()); 434 + } 435 + let mut merged = Vec::new(); 436 + loop { 437 + let mut latest: Option<(Cursor, usize)> = None; // ugh 438 + for (i, iter) in record_iterators.iter_mut().enumerate() { 439 + let Some(it) = iter.peek_mut() else { 440 + continue; 441 + }; 442 + let it = match it { 443 + Ok(v) => v, 444 + Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?, 445 + }; 446 + let Some(rec) = it else { 447 + break; 448 + }; 449 + if let Some((cursor, _)) = latest { 450 + if rec.cursor > cursor { 451 + latest = Some((rec.cursor, i)) 452 + } 453 + } else { 454 + latest = Some((rec.cursor, i)); 455 + } 441 456 } 457 + let Some((_, idx)) = latest else { 458 + break; 459 + }; 460 + // yeah yeah whateverrrrrrrrrrrrrrrr 461 + merged.push(record_iterators[idx].next().unwrap().unwrap().unwrap()); 442 462 } 443 - Ok(out) 463 + Ok(merged) 444 464 } 445 465 } 446 466 ··· 1648 1668 let records = 1649 1669 read.get_records_by_collections(&[&Nsid::new("d.e.f".to_string()).unwrap()], 2)?; 1650 1670 assert_eq!(records.len(), 0); 1671 + 1672 + Ok(()) 1673 + } 1674 + 1675 + #[test] 1676 + fn test_get_multi_collection() -> anyhow::Result<()> { 1677 + let (read, mut write) = fjall_db(); 1678 + 1679 + let mut batch = TestBatch::default(); 1680 + batch.create( 1681 + "did:plc:inze6wrmsm7pjl7yta3oig77", 1682 + "a.a.a", 1683 + "aaa", 1684 + r#""earliest""#, 1685 + Some("rev-a"), 1686 + None, 1687 + 100, 1688 + ); 1689 + batch.create( 1690 + "did:plc:inze6wrmsm7pjl7yta3oig77", 1691 + "a.a.b", 1692 + "aab", 1693 + r#""in between""#, 1694 + Some("rev-ab"), 1695 + None, 1696 + 101, 1697 + ); 1698 + batch.create( 1699 + "did:plc:inze6wrmsm7pjl7yta3oig77", 1700 + "a.a.a", 1701 + "aaa-2", 1702 + r#""last""#, 1703 + Some("rev-a-2"), 1704 + None, 1705 + 102, 1706 + ); 1707 + write.insert_batch(batch.batch)?; 1708 + 1709 + let records = read.get_records_by_collections( 1710 + &[ 1711 + &Nsid::new("a.a.a".to_string()).unwrap(), 1712 + &Nsid::new("a.a.b".to_string()).unwrap(), 1713 + &Nsid::new("a.a.c".to_string()).unwrap(), 1714 + ], 1715 + 100, 1716 + )?; 1717 + assert_eq!(records.len(), 3); 1718 + assert_eq!(records[0].record.get(), r#""last""#); 1719 + assert_eq!( 1720 + records[0].collection, 1721 + Nsid::new("a.a.a".to_string()).unwrap() 1722 + ); 1723 + assert_eq!(records[1].record.get(), r#""in between""#); 1724 + assert_eq!( 1725 + records[1].collection, 1726 + Nsid::new("a.a.b".to_string()).unwrap() 1727 + ); 1728 + assert_eq!(records[2].record.get(), r#""earliest""#); 1729 + assert_eq!( 1730 + records[2].collection, 1731 + Nsid::new("a.a.a".to_string()).unwrap() 1732 + ); 1651 1733 1652 1734 Ok(()) 1653 1735 }