···276276 }
277277}
278278279279+// BUG: this needs to use the null-terminating string thing!!!!!!!!!!!!!! the whole point of all of this!!!!
279280impl DbBytes for Nsid {
280281 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> {
281282 let (s, n) = decode_from_slice(bytes, bincode_conf())?;
+103-39
ufos/src/storage_fjall.rs
···22use crate::error::StorageError;
33use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter};
44use crate::store_types::{
55- AllTimeDidsKey, AllTimeRecordsKey, AllTimeRollupKey, CountsValue, DeleteAccountQueueKey,
66- DeleteAccountQueueVal, HourTruncatedCursor, HourlyDidsKey, HourlyRecordsKey, HourlyRollupKey,
77- JetstreamCursorKey, JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue,
88- LiveCountsKey, NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal,
55+ AllTimeDidsKey, AllTimeRecordsKey, AllTimeRollupKey, CountsValue, CursorBucket,
66+ DeleteAccountQueueKey, DeleteAccountQueueVal, HourTruncatedCursor, HourlyDidsKey,
77+ HourlyRecordsKey, HourlyRollupKey, HourlyRollupStaticPrefix, JetstreamCursorKey,
88+ JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey,
99+ NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal,
910 RecordLocationKey, RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretKey,
1011 SketchSecretPrefix, TakeoffKey, TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor,
1112 WeeklyDidsKey, WeeklyRecordsKey, WeeklyRollupKey,
···1516 UFOsRecord,
1617};
1718use async_trait::async_trait;
1919+use fjall::Snapshot;
1820use fjall::{Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle};
1921use jetstream::events::Cursor;
2022use std::collections::{HashMap, HashSet};
···373375 })
374376 }
375377378378+ fn get_earliest_hour(&self, rollups: Option<&Snapshot>) -> StorageResult<HourTruncatedCursor> {
379379+ let cursor = rollups
380380+ .unwrap_or(&self.rollups.snapshot())
381381+ .prefix(HourlyRollupStaticPrefix::default().to_db_bytes()?)
382382+ .next()
383383+ .transpose()?
384384+ .map(|(key_bytes, _)| db_complete::<HourlyRollupKey>(&key_bytes))
385385+ .transpose()?
386386+ .map(|key| key.cursor())
387387+ .unwrap_or_else(|| Cursor::from_start().into());
388388+ Ok(cursor)
389389+ }
390390+376391 fn get_all_collections(
377392 &self,
378393 limit: usize,
379394 cursor: Option<Vec<u8>>,
380380- _since: Option<HourTruncatedCursor>,
381381- _until: Option<HourTruncatedCursor>,
395395+ since: Option<HourTruncatedCursor>,
396396+ until: Option<HourTruncatedCursor>,
382397 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
383383- Ok(if true {
384384- let snapshot = self.rollups.snapshot();
398398+ let snapshot = self.rollups.snapshot();
385399386386- let start = if let Some(cursor_bytes) = cursor {
387387- let nsid = db_complete::<Nsid>(&cursor_bytes)?; // TODO: bubble a *client* error type
388388- Bound::Excluded(
389389- AllTimeRollupKey::from_pair(Default::default(), nsid).to_db_bytes()?,
390390- )
391391- } else {
392392- Bound::Included(AllTimeRollupKey::from_prefix_to_db_bytes(
393393- &Default::default(),
394394- )?)
395395- };
400400+ let buckets = if let (None, None) = (since, until) {
401401+ vec![CursorBucket::AllTime]
402402+ } else {
403403+ let mut lower = self.get_earliest_hour(Some(&snapshot))?;
404404+ if let Some(specified) = since {
405405+ if specified > lower {
406406+ lower = specified;
407407+ }
408408+ }
409409+ let upper = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
410410+ CursorBucket::buckets_spanning(lower, upper)
411411+ };
396412397397- let end_bytes = AllTimeRollupKey::prefix_range_end(&Default::default())?;
398398- let end = Bound::Excluded(end_bytes.clone());
413413+ let cursor_nsid = cursor.as_deref().map(db_complete::<Nsid>).transpose()?; // TODO: bubble a *client* error type
399414400400- let mut out = Vec::new();
401401- let mut next_cursor = None;
402402- for (i, kv) in snapshot.range((start, end)).take(limit).enumerate() {
403403- let (key_bytes, val_bytes) = kv?;
404404- let key = db_complete::<AllTimeRollupKey>(&key_bytes)?;
405405- let db_counts = db_complete::<CountsValue>(&val_bytes)?;
406406- out.push(NsidCount {
407407- nsid: key.collection().to_string(),
408408- records: db_counts.records(),
409409- dids_estimate: db_counts.dids().estimate() as u64,
410410- });
411411- if i == limit - 1 {
412412- log::warn!("reached limit, setting next cursor");
413413- let nsid_bytes = key.collection().to_db_bytes()?;
414414- next_cursor = Some(nsid_bytes);
415415+ type Item = StorageResult<(Nsid, fjall::Slice)>;
416416+ let mut iters = Vec::with_capacity(buckets.len());
417417+ for bucket in buckets {
418418+ match bucket {
419419+ CursorBucket::Hour(_hour) => todo!(),
420420+ CursorBucket::Week(_week) => todo!(),
421421+ CursorBucket::AllTime => {
422422+ let start = if let Some(ref nsid) = cursor_nsid {
423423+ Bound::Excluded(
424424+ AllTimeRollupKey::from_pair(Default::default(), nsid.clone())
425425+ .to_db_bytes()?,
426426+ )
427427+ } else {
428428+ Bound::Included(AllTimeRollupKey::from_prefix_to_db_bytes(
429429+ &Default::default(),
430430+ )?)
431431+ };
432432+ let end =
433433+ Bound::Excluded(AllTimeRollupKey::prefix_range_end(&Default::default())?);
434434+ let it = snapshot.range((start, end)).map(|kv| match kv {
435435+ Ok((k_bytes, v_bytes)) => db_complete::<AllTimeRollupKey>(&k_bytes)
436436+ .map(|key| (key.collection().clone(), v_bytes))
437437+ .map_err(|e| e.into()),
438438+ Err(e) => Err(e.into()), // lsm-tree error into fjall error
439439+ });
440440+ let boxed: Box<dyn Iterator<Item = Item>> = Box::new(it);
441441+ iters.push(boxed.peekable());
415442 }
416443 }
444444+ }
417445418418- (out, next_cursor)
419419- } else {
420420- todo!()
421421- })
446446+ let mut out = Vec::new();
447447+ let mut current_nsid = None;
448448+ for _ in 0..limit {
449449+ // double-scan the iters for each element: this could be eliminated but we're starting simple.
450450+ // first scan: find the lowest nsid
451451+ // second scan: take + merge, and advance all iters with lowest nsid
452452+ let mut lowest: Option<Nsid> = None;
453453+ for iter in &mut iters {
454454+ if let Some(bla) = iter.peek_mut() {
455455+ let (nsid, _) = match bla {
456456+ Ok(v) => v,
457457+ Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
458458+ };
459459+ lowest = match lowest {
460460+ Some(ref current) if nsid.as_str() > current.as_str() => lowest,
461461+ _ => Some(nsid.clone()),
462462+ };
463463+ }
464464+ }
465465+ current_nsid = lowest.clone();
466466+ let Some(nsid) = lowest else { break };
467467+468468+ let mut merged = CountsValue::default();
469469+ for iter in &mut iters {
470470+ // unwrap: potential fjall error was already checked & bailed over when peeking in the first loop
471471+ if let Some(Ok((_, count_bytes))) = iter.next_if(|v| v.as_ref().unwrap().0 == nsid)
472472+ {
473473+ let counts = db_complete::<CountsValue>(&count_bytes)?;
474474+ merged.merge(&counts);
475475+ }
476476+ }
477477+ out.push(NsidCount {
478478+ nsid: nsid.to_string(),
479479+ records: merged.records(),
480480+ dids_estimate: merged.dids().estimate() as u64,
481481+ });
482482+ }
483483+484484+ let next_cursor = current_nsid.map(|s| s.to_db_bytes()).transpose()?;
485485+ Ok((out, next_cursor))
422486 }
423487424488 fn get_top_collections_by_count(