···2828 /// Location to store persist data to disk
2929 #[arg(long)]
3030 data: PathBuf,
3131+ /// DEBUG: force the rw loop to fall behind by pausing it
3232+ #[arg(long, action)]
3333+ pause_rw: bool,
3134}
32353336// #[tokio::main(flavor = "current_thread")] // TODO: move this to config via args
···6265 });
63666467 let t3 = tokio::task::spawn(async move {
6565- let r = storage.rw_loop().await;
6666- log::warn!("storage.rw_loop ended with: {r:?}");
6868+ if !args.pause_rw {
6969+ let r = storage.rw_loop().await;
7070+ log::warn!("storage.rw_loop ended with: {r:?}");
7171+ } else {
7272+ log::info!("not starting rw loop.");
7373+ }
6774 });
68756976 // tokio::select! {
+131-98
ufos/src/store.rs
···1717use std::path::{Path, PathBuf};
1818use std::time::{Duration, Instant};
1919use tokio::time::sleep;
2020-use tokio::sync::{mpsc::Receiver, Mutex};
2020+use tokio::sync::{mpsc::Receiver};
21212222/// Commit the RW batch immediately if this number of events have been read off the mod queue
2323-const MAX_BATCHED_RW_EVENTS: usize = 16;
2323+const MAX_BATCHED_RW_EVENTS: usize = 18;
24242525/// Commit the RW batch immediately if this number of records is reached
2626///
···2929/// - doing more work whenever scheduled means getting more CPU time in general
3030///
3131/// this is higher than [MAX_BATCHED_RW_EVENTS] because account-deletes can have lots of items
3232-const MAX_BATCHED_RW_ITEMS: usize = 48;
3232+const MAX_BATCHED_RW_ITEMS: usize = 24;
333334343535#[derive(Clone)]
···3838 partition: PartitionHandle,
3939}
40404141-// struct FakeMutex<T> {
4242-// thing: T,
4343-// }
4444-// impl<T: Clone> FakeMutex<T> {
4545-// pub fn new(thing: T) -> Self {
4646-// Self { thing }
4747-// }
4848-// pub async fn lock(&self) -> T {
4949-// self.thing.clone()
5050-// }
5151-// }
4141+struct FakeMutex<T> {
4242+ thing: T,
4343+}
4444+impl<T: Clone> FakeMutex<T> {
4545+ pub fn new(thing: T) -> Self {
4646+ Self { thing }
4747+ }
4848+ pub async fn lock(&self) -> T {
4949+ self.thing.clone()
5050+ }
5151+}
52525353/**
5454 * data format, roughly:
···7878#[derive(Clone)]
7979pub struct Storage {
8080 /// horrible: gate all db access behind this to force global serialization to avoid deadlock
8181- db: Arc<Mutex<SerialDb>>,
8181+ db: Arc<FakeMutex<SerialDb>>,
8282}
83838484impl Storage {
···8989 PartitionCreateOptions::default().compression(CompressionType::None),
9090 )?;
9191 Ok(Self {
9292- db: Arc::new(Mutex::new(SerialDb { keyspace, partition })),
9292+ db: Arc::new(FakeMutex::new(SerialDb { keyspace, partition })),
9393 })
9494 }
9595···168168 pub async fn rw_loop(&self) -> anyhow::Result<()> {
169169 // TODO: lock so that only one rw loop can possibly be run. or even better, take a mutable resource thing to enforce at compile time.
170170 loop {
171171- sleep(Duration::from_secs_f64(0.15)).await; // todo: interval rate-limit instead
171171+ sleep(Duration::from_secs_f64(0.1)).await; // todo: interval rate-limit instead
172172173173 let db = self.db.lock().await;
174174 let keyspace = db.keyspace.clone();
···186186 let mut any_tasks_found = false;
187187188188 log::trace!("rw: iterating newer rw items...");
189189- for (i, pair) in partition.range(range.clone()).enumerate() {
190190- log::trace!("rw: iterating {i}");
191191- any_tasks_found = true;
192189193193- if i >= MAX_BATCHED_RW_EVENTS {
194194- break;
195195- }
196190197197- let (key_bytes, val_bytes) = pair?;
198198- let mod_key = match db_complete::<ModQueueItemKey>(&key_bytes) {
199199- Ok(k) => k,
200200- Err(EncodingError::WrongStaticPrefix(_, _)) => {
201201- panic!("wsp: mod queue empty.");
191191+ //// ITER
192192+193193+ {
194194+ let iterator = partition.range(range.clone()).enumerate().into_iter();
195195+196196+ for (i, pair) in iterator {
197197+ log::trace!("rw: iterating {i}");
198198+ any_tasks_found = true;
199199+200200+ if i >= MAX_BATCHED_RW_EVENTS {
201201+ break;
202202 }
203203- otherwise => otherwise?,
204204- };
203203+204204+ let (key_bytes, val_bytes) = pair?;
205205+ let mod_key = match db_complete::<ModQueueItemKey>(&key_bytes) {
206206+ Ok(k) => k,
207207+ Err(EncodingError::WrongStaticPrefix(_, _)) => {
208208+ panic!("wsp: mod queue empty.");
209209+ }
210210+ otherwise => otherwise?,
211211+ };
205212206206- let mod_value: ModQueueItemValue =
207207- db_complete::<ModQueueItemStringValue>(&val_bytes)?.try_into()?;
213213+ let mod_value: ModQueueItemValue =
214214+ db_complete::<ModQueueItemStringValue>(&val_bytes)?.try_into()?;
208215209209- log::trace!("rw: iterating {i}: sending to batcher {mod_key:?} => {mod_value:?}");
210210- batched_rw_items += DBWriter {
211211- keyspace: keyspace.clone(),
212212- partition: partition.clone(),
213213- }
214214- .write_rw(&mut db_batch, mod_key, mod_value)?;
215215- log::trace!("rw: iterating {i}: back from batcher.");
216216+ log::trace!("rw: iterating {i}: sending to batcher {mod_key:?} => {mod_value:?}");
217217+ batched_rw_items += DBWriter {
218218+ keyspace: keyspace.clone(),
219219+ partition: partition.clone(),
220220+ }
221221+ .write_rw(&mut db_batch, mod_key, mod_value)?;
222222+ log::trace!("rw: iterating {i}: back from batcher.");
216223217217- if batched_rw_items >= MAX_BATCHED_RW_ITEMS {
218218- log::trace!("rw: iterating {i}: batch big enough, breaking out.");
219219- break;
224224+ if batched_rw_items >= MAX_BATCHED_RW_ITEMS {
225225+ log::trace!("rw: iterating {i}: batch big enough, breaking out.");
226226+ break;
227227+ }
220228 }
229229+ // drop(iterator); // moved -- must be dropped hopefully
221230 }
222231223232 if !any_tasks_found {
···246255 let prefix = ByCollectionKey::prefix_from_collection(collection.clone())?;
247256 tokio::task::spawn_blocking(move || {
248257 let mut output = Vec::new();
249249- for pair in partition.prefix(&prefix).rev().take(limit) {
250250- let (k_bytes, v_bytes) = pair?;
251251- let (_, cursor) = db_complete::<ByCollectionKey>(&k_bytes)?.into();
252252- let (did, rkey, record) = db_complete::<ByCollectionValue>(&v_bytes)?.into();
253253- output.push(CreateRecord {
254254- did,
255255- rkey,
256256- record,
257257- cursor,
258258- })
258258+259259+260260+ ////// ITER
261261+ {
262262+ for pair in partition.prefix(&prefix).rev().take(limit) {
263263+ let (k_bytes, v_bytes) = pair?;
264264+ let (_, cursor) = db_complete::<ByCollectionKey>(&k_bytes)?.into();
265265+ let (did, rkey, record) = db_complete::<ByCollectionValue>(&v_bytes)?.into();
266266+ output.push(CreateRecord {
267267+ did,
268268+ rkey,
269269+ record,
270270+ cursor,
271271+ })
272272+ }
259273 }
260274 Ok(output)
261275 })
···375389376390 let mut scanned = 0;
377391 let mut rolled = 0;
378378- for pair in partition.range(range) {
379379- let (key_bytes, value_bytes) = pair?;
380380- let key = db_complete::<ByCursorSeenKey>(&key_bytes)?;
381381- let val = db_complete::<ByCursorSeenValue>(&value_bytes)?;
392392+393393+394394+ ////// ITER
395395+396396+ {
397397+ for pair in partition.range(range) {
398398+ let (key_bytes, value_bytes) = pair?;
399399+ let key = db_complete::<ByCursorSeenKey>(&key_bytes)?;
400400+ let val = db_complete::<ByCursorSeenValue>(&value_bytes)?;
382401383383- if *key.collection() == collection {
384384- let SeenCounter(n) = val;
385385- collection_total += n;
386386- rolled += 1;
402402+ if *key.collection() == collection {
403403+ let SeenCounter(n) = val;
404404+ collection_total += n;
405405+ rolled += 1;
406406+ }
407407+ scanned += 1;
387408 }
388388- scanned += 1;
389409 }
390410391411 eprintln!("scanned: {scanned}, rolled: {rolled}");
···491511 let mut items_removed = 0;
492512493513 log::trace!("delete_record: iterate over up to current cursor...");
494494- for (i, pair) in self.partition.range(key_prefix_bytes..key_limit).enumerate() {
495495- log::trace!("delete_record iter {i}: found");
496496- // find all (hopefully 1)
497497- let (key_bytes, _) = pair?;
498498- let key = db_complete::<ByIdKey>(&key_bytes)?;
499499- let found_cursor = key.cursor();
500500- if found_cursor > cursor {
501501- // we are *only* allowed to delete records that came before the record delete event
502502- // log::trace!("delete_record: found (and ignoring) newer version(s). key: {key:?}");
503503- panic!("wtf, found newer version than cursor limit we tried to set.");
504504- // break;
505505- }
506514507507- // remove the by_id entry
508508- db_batch.remove(&self.partition, key_bytes);
509515510510- // remove its record sample
511511- let by_collection_key_bytes =
512512- ByCollectionKey::new(collection.clone(), found_cursor).to_db_bytes()?;
513513- db_batch.remove(&self.partition, by_collection_key_bytes);
516516+ ////////// ITER
514517515515- items_removed += 1;
518518+ {
519519+ for (i, pair) in self.partition.range(key_prefix_bytes..key_limit).enumerate() {
520520+ log::trace!("delete_record iter {i}: found");
521521+ // find all (hopefully 1)
522522+ let (key_bytes, _) = pair?;
523523+ let key = db_complete::<ByIdKey>(&key_bytes)?;
524524+ let found_cursor = key.cursor();
525525+ if found_cursor > cursor {
526526+ // we are *only* allowed to delete records that came before the record delete event
527527+ // log::trace!("delete_record: found (and ignoring) newer version(s). key: {key:?}");
528528+ panic!("wtf, found newer version than cursor limit we tried to set.");
529529+ // break;
530530+ }
531531+532532+ // remove the by_id entry
533533+ db_batch.remove(&self.partition, key_bytes);
534534+535535+ // remove its record sample
536536+ let by_collection_key_bytes =
537537+ ByCollectionKey::new(collection.clone(), found_cursor).to_db_bytes()?;
538538+ db_batch.remove(&self.partition, by_collection_key_bytes);
539539+540540+ items_removed += 1;
541541+ }
516542 }
517543518544 // if items_removed > 1 {
···541567 let key_prefix_bytes = ByIdKey::did_prefix(did).to_db_bytes()?;
542568543569 let mut items_added = 0;
544544- for pair in self.partition.prefix(&key_prefix_bytes) {
545545- let (key_bytes, _) = pair?;
570570+571571+572572+573573+ ////////// ITER
574574+575575+ {
576576+ for pair in self.partition.prefix(&key_prefix_bytes) {
577577+ let (key_bytes, _) = pair?;
546578547547- let (_, collection, _rkey, found_cursor) = db_complete::<ByIdKey>(&key_bytes)?.into();
548548- if found_cursor > cursor {
549549- log::trace!(
550550- "delete account: found (and ignoring) newer records than the delete event??"
551551- );
552552- continue;
553553- }
579579+ let (_, collection, _rkey, found_cursor) = db_complete::<ByIdKey>(&key_bytes)?.into();
580580+ if found_cursor > cursor {
581581+ log::trace!(
582582+ "delete account: found (and ignoring) newer records than the delete event??"
583583+ );
584584+ continue;
585585+ }
554586555555- // remove the by_id entry
556556- db_batch.remove(&self.partition, key_bytes);
587587+ // remove the by_id entry
588588+ db_batch.remove(&self.partition, key_bytes);
557589558558- // remove its record sample
559559- let by_collection_key_bytes =
560560- ByCollectionKey::new(collection, found_cursor).to_db_bytes()?;
561561- db_batch.remove(&self.partition, by_collection_key_bytes);
590590+ // remove its record sample
591591+ let by_collection_key_bytes =
592592+ ByCollectionKey::new(collection, found_cursor).to_db_bytes()?;
593593+ db_batch.remove(&self.partition, by_collection_key_bytes);
562594563563- items_added += 1;
564564- if items_added >= MAX_BATCHED_RW_ITEMS {
565565- return Ok((items_added, false)); // there might be more records but we've done enough for this batch
595595+ items_added += 1;
596596+ if items_added >= MAX_BATCHED_RW_ITEMS {
597597+ return Ok((items_added, false)); // there might be more records but we've done enough for this batch
598598+ }
566599 }
567600 }
568601