Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

schedule collection trims

keeps a list of dirty collections in memory from rollups and trims those

+211 -112
+2
ufos/src/error.rs
··· 38 38 Stolen, 39 39 #[error("Failed to join tokio task: {0}")] 40 40 JoinError(#[from] tokio::task::JoinError), 41 + #[error("Background task already started")] 42 + BackgroundAlreadyStarted, 41 43 }
+32 -40
ufos/src/main.rs
··· 2 2 use jetstream::events::Cursor; 3 3 use std::path::PathBuf; 4 4 use ufos::consumer; 5 - use ufos::error::StorageError; 6 5 use ufos::file_consumer; 7 6 use ufos::server; 8 - use ufos::storage::{StorageWhatever, StoreReader, StoreWriter}; 7 + use ufos::storage::{StorageWhatever, StoreBackground, StoreReader, StoreWriter}; 9 8 use ufos::storage_fjall::FjallStorage; 10 9 use ufos::storage_mem::MemStorage; 11 10 ··· 51 50 jetstream_fixture: bool, 52 51 } 53 52 54 - // #[tokio::main(flavor = "current_thread")] // TODO: move this to config via args 55 53 #[tokio::main] 56 54 async fn main() -> anyhow::Result<()> { 57 55 env_logger::init(); ··· 95 93 Ok(()) 96 94 } 97 95 98 - async fn go( 96 + async fn go<B: StoreBackground>( 99 97 jetstream: String, 100 98 jetstream_fixture: bool, 101 99 pause_writer: bool, 102 100 read_store: impl StoreReader + 'static, 103 - mut write_store: impl StoreWriter + 'static, 101 + mut write_store: impl StoreWriter<B> + 'static, 104 102 cursor: Option<Cursor>, 105 103 ) -> anyhow::Result<()> { 106 104 println!("starting server with storage..."); 107 105 let serving = server::serve(read_store); 108 106 109 - let t1 = tokio::task::spawn(async { 110 - let r = serving.await; 111 - log::warn!("serving ended with: {r:?}"); 112 - }); 107 + if pause_writer { 108 + log::info!("not starting jetstream or the write loop."); 109 + serving.await.map_err(|e| anyhow::anyhow!(e))?; 110 + return Ok(()); 111 + } 113 112 114 - let t2: tokio::task::JoinHandle<anyhow::Result<()>> = tokio::task::spawn({ 115 - async move { 116 - if !pause_writer { 117 - println!( 118 - "starting consumer with cursor: {cursor:?} from {:?} ago", 119 - cursor.map(|c| c.elapsed()) 120 - ); 121 - let mut batches = if jetstream_fixture { 122 - file_consumer::consume(jetstream.into()).await? 123 - } else { 124 - consumer::consume(&jetstream, cursor, false).await? 125 - }; 113 + let batches = if jetstream_fixture { 114 + log::info!("starting with jestream file fixture: {jetstream:?}"); 115 + file_consumer::consume(jetstream.into()).await? 116 + } else { 117 + log::info!( 118 + "starting consumer with cursor: {cursor:?} from {:?} ago", 119 + cursor.map(|c| c.elapsed()) 120 + ); 121 + consumer::consume(&jetstream, cursor, false).await? 122 + }; 126 123 127 - tokio::task::spawn_blocking(move || { 128 - while let Some(event_batch) = batches.blocking_recv() { 129 - write_store.insert_batch(event_batch)?; 130 - write_store 131 - .step_rollup() 132 - .inspect_err(|e| log::error!("laksjdfl: {e:?}"))?; 133 - } 134 - Ok::<(), StorageError>(()) 135 - }) 136 - .await??; 124 + let rolling = write_store.background_tasks()?.run(); 125 + let storing = write_store.receive_batches(batches); 137 126 138 - log::warn!("storage.receive ended with"); 139 - } else { 140 - log::info!("not starting jetstream or the write loop."); 141 - } 142 - Ok(()) 143 - } 144 - }); 127 + // let storing = tokio::task::spawn_blocking(move || { 128 + // while let Some(event_batch) = batches.blocking_recv() { 129 + // write_store.insert_batch(event_batch)?; 130 + // write_store 131 + // .step_rollup() 132 + // .inspect_err(|e| log::error!("rollup error: {e:?}"))?; 133 + // } 134 + // Ok::<(), StorageError>(()) 135 + // }); 145 136 146 137 tokio::select! { 147 - z = t1 => log::warn!("serve task ended: {z:?}"), 148 - z = t2 => log::warn!("storage task ended: {z:?}"), 138 + z = serving => log::warn!("serve task ended: {z:?}"), 139 + z = rolling => log::warn!("rollup task ended: {z:?}"), 140 + z = storing => log::warn!("storage task ended: {z:?}"), 149 141 }; 150 142 151 143 println!("bye!");
+33 -4
ufos/src/storage.rs
··· 1 - // use crate::store_types::CountsValue; 2 1 use crate::{error::StorageError, ConsumerInfo, Cursor, EventBatch, TopCollections, UFOsRecord}; 3 2 use async_trait::async_trait; 4 3 use jetstream::exports::{Did, Nsid}; 4 + use std::collections::HashSet; 5 5 use std::path::Path; 6 + use tokio::sync::mpsc::Receiver; 6 7 7 8 pub type StorageResult<T> = Result<T, StorageError>; 8 9 9 - pub trait StorageWhatever<R: StoreReader, W: StoreWriter, C> { 10 + pub trait StorageWhatever<R: StoreReader, W: StoreWriter<B>, B: StoreBackground, C> { 10 11 fn init( 11 12 path: impl AsRef<Path>, 12 13 endpoint: String, ··· 17 18 Self: Sized; 18 19 } 19 20 20 - pub trait StoreWriter: Send + Sync { 21 + pub trait StoreWriter<B: StoreBackground>: Send + Sync 22 + where 23 + Self: 'static, 24 + { 25 + fn background_tasks(&mut self) -> StorageResult<B>; 26 + 27 + fn receive_batches<const LIMIT: usize>( 28 + mut self, 29 + mut batches: Receiver<EventBatch<LIMIT>>, 30 + ) -> impl std::future::Future<Output = StorageResult<()>> + Send 31 + where 32 + Self: Sized, 33 + { 34 + async { 35 + tokio::task::spawn_blocking(move || { 36 + while let Some(event_batch) = batches.blocking_recv() { 37 + self.insert_batch(event_batch)?; 38 + } 39 + Ok::<(), StorageError>(()) 40 + }) 41 + .await? 42 + } 43 + } 44 + 21 45 fn insert_batch<const LIMIT: usize>( 22 46 &mut self, 23 47 event_batch: EventBatch<LIMIT>, 24 48 ) -> StorageResult<()>; 25 49 26 - fn step_rollup(&mut self) -> StorageResult<usize>; 50 + fn step_rollup(&mut self) -> StorageResult<(usize, HashSet<Nsid>)>; 27 51 28 52 fn trim_collection(&mut self, collection: &Nsid, limit: usize) -> StorageResult<()>; 29 53 30 54 fn delete_account(&mut self, did: &Did) -> StorageResult<usize>; 55 + } 56 + 57 + #[async_trait] 58 + pub trait StoreBackground: Send + Sync { 59 + async fn run(mut self) -> StorageResult<()>; 31 60 } 32 61 33 62 #[async_trait]
+88 -35
ufos/src/storage_fjall.rs
··· 1 1 use crate::db_types::{db_complete, DbBytes, DbStaticStr, StaticStr}; 2 2 use crate::error::StorageError; 3 - use crate::storage::{StorageResult, StorageWhatever, StoreReader, StoreWriter}; 3 + use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter}; 4 4 use crate::store_types::{ 5 5 AllTimeRollupKey, CountsValue, DeleteAccountQueueKey, DeleteAccountQueueVal, 6 6 HourTruncatedCursor, HourlyRollupKey, JetstreamCursorKey, JetstreamCursorValue, ··· 13 13 use async_trait::async_trait; 14 14 use fjall::{Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle}; 15 15 use jetstream::events::Cursor; 16 - use std::collections::HashMap; 16 + use std::collections::{HashMap, HashSet}; 17 17 use std::path::Path; 18 - use std::time::SystemTime; 18 + use std::sync::{ 19 + atomic::{AtomicBool, Ordering}, 20 + Arc, 21 + }; 22 + use std::time::{Duration, Instant, SystemTime}; 19 23 20 24 const MAX_BATCHED_CLEANUP_SIZE: usize = 1024; // try to commit progress for longer feeds 21 25 const MAX_BATCHED_ACCOUNT_DELETE_RECORDS: usize = 1024; ··· 99 103 pub temp: bool, 100 104 } 101 105 102 - impl StorageWhatever<FjallReader, FjallWriter, FjallConfig> for FjallStorage { 106 + impl StorageWhatever<FjallReader, FjallWriter, FjallBackground, FjallConfig> for FjallStorage { 103 107 fn init( 104 108 path: impl AsRef<Path>, 105 109 endpoint: String, ··· 160 164 rollups: rollups.clone(), 161 165 }; 162 166 let writer = FjallWriter { 167 + bg_taken: Arc::new(AtomicBool::new(false)), 163 168 keyspace, 164 169 global, 165 170 feeds, ··· 471 476 } 472 477 } 473 478 479 + #[derive(Clone)] 474 480 pub struct FjallWriter { 481 + bg_taken: Arc<AtomicBool>, 475 482 keyspace: Keyspace, 476 483 global: PartitionHandle, 477 484 feeds: PartitionHandle, ··· 602 609 } 603 610 } 604 611 605 - impl StoreWriter for FjallWriter { 612 + impl StoreWriter<FjallBackground> for FjallWriter { 613 + fn background_tasks(&mut self) -> StorageResult<FjallBackground> { 614 + if self.bg_taken.swap(true, Ordering::SeqCst) { 615 + Err(StorageError::BackgroundAlreadyStarted) 616 + } else { 617 + Ok(FjallBackground(self.clone())) 618 + } 619 + } 620 + 606 621 fn insert_batch<const LIMIT: usize>( 607 622 &mut self, 608 623 event_batch: EventBatch<LIMIT>, ··· 673 688 Ok(()) 674 689 } 675 690 676 - fn step_rollup(&mut self) -> StorageResult<usize> { 691 + fn step_rollup(&mut self) -> StorageResult<(usize, HashSet<Nsid>)> { 692 + let mut dirty_nsids = HashSet::new(); 693 + 677 694 let rollup_cursor = 678 695 get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?.ok_or( 679 696 StorageError::BadStateError("Could not find current rollup cursor".to_string()), ··· 683 700 let live_counts_range = LiveCountsKey::range_from_cursor(rollup_cursor)?; 684 701 let mut timely_iter = self.rollups.range(live_counts_range).peekable(); 685 702 686 - let timely_next_cursor = timely_iter 703 + let timely_next = timely_iter 687 704 .peek_mut() 688 - .map(|kv| -> StorageResult<Cursor> { 705 + .map(|kv| -> StorageResult<LiveCountsKey> { 689 706 match kv { 690 707 Err(e) => Err(std::mem::replace(e, fjall::Error::Poisoned))?, 691 708 Ok((key_bytes, _)) => { 692 709 let key = db_complete::<LiveCountsKey>(key_bytes)?; 693 - Ok(key.cursor()) 710 + Ok(key) 694 711 } 695 712 } 696 713 }) ··· 711 728 }) 712 729 .transpose()?; 713 730 714 - let cursors_stepped = match (timely_next_cursor, next_delete) { 715 - ( 716 - Some(timely_next_cursor), 717 - Some((delete_cursor, delete_key_bytes, delete_val_bytes)), 718 - ) => { 719 - if timely_next_cursor < delete_cursor { 720 - self.rollup_live_counts( 731 + let cursors_stepped = match (timely_next, next_delete) { 732 + (Some(timely), Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => { 733 + if timely.cursor() < delete_cursor { 734 + let n = self.rollup_live_counts( 721 735 timely_iter, 722 736 Some(delete_cursor), 723 737 MAX_BATCHED_ROLLUP_COUNTS, 724 - )? 738 + )?; 739 + dirty_nsids.insert(timely.collection().clone()); 740 + n 725 741 } else { 726 742 self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)? 727 743 } 728 744 } 729 - (Some(_), None) => { 730 - self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)? 745 + (Some(timely), None) => { 746 + let n = self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)?; 747 + dirty_nsids.insert(timely.collection().clone()); 748 + n 731 749 } 732 750 (None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => { 733 751 self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)? ··· 735 753 (None, None) => 0, 736 754 }; 737 755 738 - Ok(cursors_stepped) 756 + Ok((cursors_stepped, dirty_nsids)) 739 757 } 740 758 741 759 fn trim_collection( ··· 760 778 761 779 let Some(location_val_bytes) = self.records.get(&location_key_bytes)? else { 762 780 // record was deleted (hopefully) 763 - batch.remove(&self.feeds, &location_key_bytes); 781 + batch.remove(&self.feeds, &*key_bytes); 764 782 dangling_feed_keys_cleaned += 1; 765 783 continue; 766 784 }; ··· 769 787 770 788 if meta.cursor() != feed_key.cursor() { 771 789 // older/different version 772 - batch.remove(&self.feeds, &location_key_bytes); 790 + batch.remove(&self.feeds, &*key_bytes); 773 791 dangling_feed_keys_cleaned += 1; 774 792 continue; 775 793 } 776 794 if meta.rev != feed_val.rev() { 777 795 // weird... 778 796 log::warn!("record lookup: cursor match but rev did not...? removing."); 779 - batch.remove(&self.feeds, &location_key_bytes); 797 + batch.remove(&self.feeds, &*key_bytes); 798 + batch.remove(&self.records, &location_key_bytes); 780 799 dangling_feed_keys_cleaned += 1; 781 800 continue; 782 801 } ··· 791 810 continue; 792 811 } 793 812 794 - batch.remove(&self.feeds, &location_key_bytes); 813 + batch.remove(&self.feeds, key_bytes); 795 814 batch.remove(&self.records, &location_key_bytes); 796 815 records_deleted += 1; 797 816 } ··· 817 836 } 818 837 batch.commit()?; 819 838 Ok(records_deleted) 839 + } 840 + } 841 + 842 + pub struct FjallBackground(FjallWriter); 843 + 844 + #[async_trait] 845 + impl StoreBackground for FjallBackground { 846 + async fn run(mut self) -> StorageResult<()> { 847 + let mut dirty_nsids = HashSet::new(); 848 + 849 + let mut rollup = tokio::time::interval(Duration::from_millis(42)); 850 + rollup.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); 851 + 852 + let mut trim = tokio::time::interval(Duration::from_millis(3_000)); 853 + trim.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); 854 + 855 + loop { 856 + tokio::select! { 857 + _ = rollup.tick() => { 858 + let (n, dirty) = self.0.step_rollup().inspect_err(|e| log::error!("rollup error: {e:?}"))?; 859 + dirty_nsids.extend(dirty); 860 + log::info!("rolled up {n} items {dirty_nsids:?} ({} collections now dirty)", dirty_nsids.len()); 861 + }, 862 + _ = trim.tick() => { 863 + log::info!("trimming {} nsids: {dirty_nsids:?}", dirty_nsids.len()); 864 + let t0 = Instant::now(); 865 + for collection in &dirty_nsids { 866 + self.0.trim_collection(collection, 512).inspect_err(|e| log::error!("trim error: {e:?}"))?; 867 + } 868 + log::info!("finished trimming in {:?}", t0.elapsed()); 869 + dirty_nsids.clear(); 870 + }, 871 + }; 872 + } 820 873 } 821 874 } 822 875 ··· 1473 1526 ); 1474 1527 write.insert_batch(batch.batch)?; 1475 1528 1476 - let n = write.step_rollup()?; 1529 + let (n, _) = write.step_rollup()?; 1477 1530 assert_eq!(n, 1); 1478 1531 1479 1532 let mut batch = TestBatch::default(); ··· 1484 1537 read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 1, false)?; 1485 1538 assert_eq!(records.len(), 1); 1486 1539 1487 - let n = write.step_rollup()?; 1540 + let (n, _) = write.step_rollup()?; 1488 1541 assert_eq!(n, 1); 1489 1542 1490 1543 let records = ··· 1495 1548 batch.delete_account("did:plc:person-a", 9_999); 1496 1549 write.insert_batch(batch.batch)?; 1497 1550 1498 - let n = write.step_rollup()?; 1551 + let (n, _) = write.step_rollup()?; 1499 1552 assert_eq!(n, 0); 1500 1553 1501 1554 Ok(()) ··· 1529 1582 ); 1530 1583 write.insert_batch(batch.batch)?; 1531 1584 1532 - let n = write.step_rollup()?; 1585 + let (n, _) = write.step_rollup()?; 1533 1586 assert_eq!(n, 2); 1534 1587 1535 - let n = write.step_rollup()?; 1588 + let (n, _) = write.step_rollup()?; 1536 1589 assert_eq!(n, 0); 1537 1590 1538 1591 Ok(()) ··· 1586 1639 assert_eq!(dids, 2); 1587 1640 1588 1641 // first batch rolled up 1589 - let n = write.step_rollup()?; 1642 + let (n, _) = write.step_rollup()?; 1590 1643 assert_eq!(n, 1); 1591 1644 1592 1645 let (records, dids) = ··· 1595 1648 assert_eq!(dids, 2); 1596 1649 1597 1650 // delete account rolled up 1598 - let n = write.step_rollup()?; 1651 + let (n, _) = write.step_rollup()?; 1599 1652 assert_eq!(n, 1); 1600 1653 1601 1654 let (records, dids) = ··· 1604 1657 assert_eq!(dids, 2); 1605 1658 1606 1659 // second batch rolled up 1607 - let n = write.step_rollup()?; 1660 + let (n, _) = write.step_rollup()?; 1608 1661 assert_eq!(n, 1); 1609 1662 1610 1663 let (records, dids) = ··· 1613 1666 assert_eq!(dids, 2); 1614 1667 1615 1668 // no more rollups left 1616 - let n = write.step_rollup()?; 1669 + let (n, _) = write.step_rollup()?; 1617 1670 assert_eq!(n, 0); 1618 1671 1619 1672 Ok(()) ··· 1662 1715 ); 1663 1716 write.insert_batch(batch.batch)?; 1664 1717 1665 - let n = write.step_rollup()?; 1718 + let (n, _) = write.step_rollup()?; 1666 1719 assert_eq!(n, 3); // 3 collections 1667 1720 1668 1721 let tops = read.get_top_collections()?; ··· 1750 1803 ); 1751 1804 write.insert_batch(batch.batch)?; 1752 1805 1753 - let n = write.step_rollup()?; 1806 + let (n, _) = write.step_rollup()?; 1754 1807 assert_eq!(n, 2); // 3 collections 1755 1808 1756 1809 let tops = read.get_top_collections()?;
+56 -33
ufos/src/storage_mem.rs
··· 3 3 4 4 use crate::db_types::{db_complete, DbBytes, DbStaticStr, StaticStr}; 5 5 use crate::error::StorageError; 6 - use crate::storage::{StorageResult, StorageWhatever, StoreReader, StoreWriter}; 6 + use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter}; 7 7 use crate::store_types::{ 8 8 AllTimeRollupKey, CountsValue, DeleteAccountQueueKey, DeleteAccountQueueVal, 9 9 HourTruncatedCursor, HourlyRollupKey, JetstreamCursorKey, JetstreamCursorValue, ··· 18 18 use lsm_tree::range::prefix_to_range; 19 19 use std::collections::BTreeMap; 20 20 use std::collections::HashMap; 21 + use std::collections::HashSet; 21 22 use std::path::Path; 22 23 use std::sync::Mutex; 23 24 use std::sync::RwLock; ··· 250 251 //////////// 251 252 //////////// 252 253 253 - impl StorageWhatever<MemReader, MemWriter, MemConfig> for MemStorage { 254 + impl StorageWhatever<MemReader, MemWriter, MemBackground, MemConfig> for MemStorage { 254 255 fn init( 255 256 _path: impl AsRef<Path>, 256 257 endpoint: String, ··· 780 781 } 781 782 } 782 783 783 - impl StoreWriter for MemWriter { 784 + impl StoreWriter<MemBackground> for MemWriter { 785 + fn background_tasks(&mut self) -> StorageResult<MemBackground> { 786 + Ok(MemBackground {}) 787 + } 788 + 784 789 fn insert_batch<const LIMIT: usize>( 785 790 &mut self, 786 791 event_batch: EventBatch<LIMIT>, ··· 851 856 Ok(()) 852 857 } 853 858 854 - fn step_rollup(&mut self) -> StorageResult<usize> { 859 + fn step_rollup(&mut self) -> StorageResult<(usize, HashSet<Nsid>)> { 860 + let mut dirty_nsids = HashSet::new(); 861 + 855 862 let rollup_cursor = 856 863 get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)? 857 864 .ok_or(StorageError::BadStateError( ··· 864 871 .inspect_err(|e| log::warn!("live counts range: {e:?}"))?; 865 872 let mut timely_iter = self.rollups.range(live_counts_range).into_iter().peekable(); 866 873 867 - let timely_next_cursor = timely_iter 874 + let timely_next = timely_iter 868 875 .peek_mut() 869 - .map(|kv| -> StorageResult<Cursor> { 876 + .map(|kv| -> StorageResult<LiveCountsKey> { 870 877 match kv { 871 878 Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?, 872 879 Ok((key_bytes, _)) => { 873 880 let key = db_complete::<LiveCountsKey>(key_bytes).inspect_err(|e| { 874 881 log::warn!("failed getting key for next timely: {e:?}") 875 882 })?; 876 - Ok(key.cursor()) 883 + Ok(key) 877 884 } 878 885 } 879 886 }) ··· 899 906 .transpose() 900 907 .inspect_err(|e| log::warn!("failed getting next delete: {e:?}"))?; 901 908 902 - let cursors_stepped = match (timely_next_cursor, next_delete) { 903 - ( 904 - Some(timely_next_cursor), 905 - Some((delete_cursor, delete_key_bytes, delete_val_bytes)), 906 - ) => { 907 - if timely_next_cursor < delete_cursor { 908 - self.rollup_live_counts( 909 - timely_iter, 910 - Some(delete_cursor), 911 - MAX_BATCHED_ROLLUP_COUNTS, 912 - ) 913 - .inspect_err(|e| log::warn!("rolling up live counts: {e:?}"))? 909 + let cursors_stepped = match (timely_next, next_delete) { 910 + (Some(timely), Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => { 911 + if timely.cursor() < delete_cursor { 912 + let n = self 913 + .rollup_live_counts( 914 + timely_iter, 915 + Some(delete_cursor), 916 + MAX_BATCHED_ROLLUP_COUNTS, 917 + ) 918 + .inspect_err(|e| log::warn!("rolling up live counts: {e:?}"))?; 919 + dirty_nsids.insert(timely.collection().clone()); 920 + n 914 921 } else { 915 922 self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes) 916 923 .inspect_err(|e| log::warn!("deleting acocunt: {e:?}"))? 917 924 } 918 925 } 919 - (Some(_), None) => self 920 - .rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS) 921 - .inspect_err(|e| log::warn!("rolling up (lasjdflkajs): {e:?}"))?, 926 + (Some(timely), None) => { 927 + let n = self 928 + .rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS) 929 + .inspect_err(|e| log::warn!("rolling up (lasjdflkajs): {e:?}"))?; 930 + dirty_nsids.insert(timely.collection().clone()); 931 + n 932 + } 922 933 (None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => self 923 934 .rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes) 924 935 .inspect_err(|e| log::warn!("deleting acocunt other branch: {e:?}"))?, 925 936 (None, None) => 0, 926 937 }; 927 938 928 - Ok(cursors_stepped) 939 + Ok((cursors_stepped, dirty_nsids)) 929 940 } 930 941 931 942 fn trim_collection( ··· 1007 1018 } 1008 1019 batch.commit()?; 1009 1020 Ok(records_deleted) 1021 + } 1022 + } 1023 + 1024 + pub struct MemBackground; 1025 + 1026 + #[async_trait] 1027 + impl StoreBackground for MemBackground { 1028 + async fn run(mut self) -> StorageResult<()> { 1029 + // noop for mem (is there a nicer way to do this?) 1030 + loop { 1031 + tokio::time::sleep(std::time::Duration::from_secs_f64(10.)).await; 1032 + } 1010 1033 } 1011 1034 } 1012 1035 ··· 1590 1613 ); 1591 1614 write.insert_batch(batch.batch)?; 1592 1615 1593 - let n = write.step_rollup()?; 1616 + let (n, _) = write.step_rollup()?; 1594 1617 assert_eq!(n, 1); 1595 1618 1596 1619 let mut batch = TestBatch::default(); ··· 1601 1624 read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 1, false)?; 1602 1625 assert_eq!(records.len(), 1); 1603 1626 1604 - let n = write.step_rollup()?; 1627 + let (n, _) = write.step_rollup()?; 1605 1628 assert_eq!(n, 1); 1606 1629 1607 1630 let records = ··· 1612 1635 batch.delete_account("did:plc:person-a", 9_999); 1613 1636 write.insert_batch(batch.batch)?; 1614 1637 1615 - let n = write.step_rollup()?; 1638 + let (n, _) = write.step_rollup()?; 1616 1639 assert_eq!(n, 0); 1617 1640 1618 1641 Ok(()) ··· 1646 1669 ); 1647 1670 write.insert_batch(batch.batch)?; 1648 1671 1649 - let n = write.step_rollup()?; 1672 + let (n, _) = write.step_rollup()?; 1650 1673 assert_eq!(n, 2); 1651 1674 1652 - let n = write.step_rollup()?; 1675 + let (n, _) = write.step_rollup()?; 1653 1676 assert_eq!(n, 0); 1654 1677 1655 1678 Ok(()) ··· 1703 1726 assert_eq!(dids, 2); 1704 1727 1705 1728 // first batch rolled up 1706 - let n = write.step_rollup()?; 1729 + let (n, _) = write.step_rollup()?; 1707 1730 assert_eq!(n, 1); 1708 1731 1709 1732 let (records, dids) = ··· 1712 1735 assert_eq!(dids, 2); 1713 1736 1714 1737 // delete account rolled up 1715 - let n = write.step_rollup()?; 1738 + let (n, _) = write.step_rollup()?; 1716 1739 assert_eq!(n, 1); 1717 1740 1718 1741 let (records, dids) = ··· 1721 1744 assert_eq!(dids, 2); 1722 1745 1723 1746 // second batch rolled up 1724 - let n = write.step_rollup()?; 1747 + let (n, _) = write.step_rollup()?; 1725 1748 assert_eq!(n, 1); 1726 1749 1727 1750 let (records, dids) = ··· 1730 1753 assert_eq!(dids, 2); 1731 1754 1732 1755 // no more rollups left 1733 - let n = write.step_rollup()?; 1756 + let (n, _) = write.step_rollup()?; 1734 1757 assert_eq!(n, 0); 1735 1758 1736 1759 Ok(()) ··· 1779 1802 ); 1780 1803 write.insert_batch(batch.batch)?; 1781 1804 1782 - let n = write.step_rollup()?; 1805 + let (n, _) = write.step_rollup()?; 1783 1806 assert_eq!(n, 3); // 3 collections 1784 1807 1785 1808 let tops = read.get_top_collections()?;