···
2
2
use crate::error::StorageError;
3
3
use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter};
4
4
use crate::store_types::{
5
5
-
AllTimeRollupKey, CountsValue, DeleteAccountQueueKey, DeleteAccountQueueVal,
6
6
-
HourTruncatedCursor, HourlyRollupKey, JetstreamCursorKey, JetstreamCursorValue,
7
7
-
JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey, NewRollupCursorKey,
8
8
-
NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, RecordLocationKey,
9
9
-
RecordLocationMeta, RecordLocationVal, RecordRawValue, TakeoffKey, TakeoffValue,
10
10
-
TrimCollectionCursorKey, WeekTruncatedCursor, WeeklyRollupKey,
5
5
+
AllTimeDidsKey, AllTimeRecordsKey, AllTimeRollupKey, CountsValue, DeleteAccountQueueKey,
6
6
+
DeleteAccountQueueVal, HourTruncatedCursor, HourlyDidsKey, HourlyRecordsKey, HourlyRollupKey,
7
7
+
JetstreamCursorKey, JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue,
8
8
+
LiveCountsKey, NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal,
9
9
+
RecordLocationKey, RecordLocationMeta, RecordLocationVal, RecordRawValue, TakeoffKey,
10
10
+
TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor, WeeklyDidsKey, WeeklyRecordsKey,
11
11
+
WeeklyRollupKey,
11
12
};
12
13
use crate::{CommitAction, ConsumerInfo, Did, EventBatch, Nsid, TopCollections, UFOsRecord};
13
14
use async_trait::async_trait;
···
597
598
last_cursor = key.cursor();
598
599
}
599
600
601
601
+
// go through each new rollup thing and merge it with whatever might already be in the db
600
602
for ((nsid, rollup), counts) in counts_by_rollup {
601
601
-
let key_bytes = match rollup {
603
603
+
let rollup_key_bytes = match rollup {
602
604
Rollup::Hourly(hourly_cursor) => {
603
605
HourlyRollupKey::new(hourly_cursor, &nsid).to_db_bytes()?
604
606
}
···
609
611
};
610
612
let mut rolled: CountsValue = self
611
613
.rollups
612
612
-
.get(&key_bytes)?
614
614
+
.get(&rollup_key_bytes)?
613
615
.as_deref()
614
616
.map(db_complete::<CountsValue>)
615
617
.transpose()?
616
618
.unwrap_or_default();
617
619
620
620
+
// now that we have values, we can fetch the ranks
621
621
+
// TODO: save .records() and .estimate() and then do this match only once later
622
622
+
let (rank_records_key_bytes, rank_dids_key_bytes) = match rollup {
623
623
+
Rollup::Hourly(hourly_cursor) => (
624
624
+
HourlyRecordsKey::new(hourly_cursor, rolled.records().into(), &nsid)
625
625
+
.to_db_bytes()?,
626
626
+
HourlyDidsKey::new(
627
627
+
hourly_cursor,
628
628
+
(rolled.dids().estimate() as u64).into(),
629
629
+
&nsid,
630
630
+
)
631
631
+
.to_db_bytes()?,
632
632
+
),
633
633
+
Rollup::Weekly(weekly_cursor) => (
634
634
+
WeeklyRecordsKey::new(weekly_cursor, rolled.records().into(), &nsid)
635
635
+
.to_db_bytes()?,
636
636
+
WeeklyDidsKey::new(
637
637
+
weekly_cursor,
638
638
+
(rolled.dids().estimate() as u64).into(),
639
639
+
&nsid,
640
640
+
)
641
641
+
.to_db_bytes()?,
642
642
+
),
643
643
+
Rollup::AllTime => (
644
644
+
AllTimeRecordsKey::new(rolled.records().into(), &nsid).to_db_bytes()?,
645
645
+
AllTimeDidsKey::new((rolled.dids().estimate() as u64).into(), &nsid)
646
646
+
.to_db_bytes()?,
647
647
+
),
648
648
+
};
649
649
+
650
650
+
// update the rollup
618
651
rolled.merge(&counts);
619
619
-
batch.insert(&self.rollups, &key_bytes, &rolled.to_db_bytes()?);
652
652
+
653
653
+
// replace rank entries
654
654
+
let (new_rank_records_key_bytes, new_rank_dids_key_bytes) = match rollup {
655
655
+
Rollup::Hourly(hourly_cursor) => (
656
656
+
HourlyRecordsKey::new(hourly_cursor, rolled.records().into(), &nsid)
657
657
+
.to_db_bytes()?,
658
658
+
HourlyDidsKey::new(
659
659
+
hourly_cursor,
660
660
+
(rolled.dids().estimate() as u64).into(),
661
661
+
&nsid,
662
662
+
)
663
663
+
.to_db_bytes()?,
664
664
+
),
665
665
+
Rollup::Weekly(weekly_cursor) => (
666
666
+
WeeklyRecordsKey::new(weekly_cursor, rolled.records().into(), &nsid)
667
667
+
.to_db_bytes()?,
668
668
+
WeeklyDidsKey::new(
669
669
+
weekly_cursor,
670
670
+
(rolled.dids().estimate() as u64).into(),
671
671
+
&nsid,
672
672
+
)
673
673
+
.to_db_bytes()?,
674
674
+
),
675
675
+
Rollup::AllTime => (
676
676
+
AllTimeRecordsKey::new(rolled.records().into(), &nsid).to_db_bytes()?,
677
677
+
AllTimeDidsKey::new((rolled.dids().estimate() as u64).into(), &nsid)
678
678
+
.to_db_bytes()?,
679
679
+
),
680
680
+
};
681
681
+
if new_rank_records_key_bytes != rank_records_key_bytes {
682
682
+
batch.remove(&self.rollups, &rank_records_key_bytes);
683
683
+
batch.insert(&self.rollups, &new_rank_records_key_bytes, "");
684
684
+
}
685
685
+
if new_rank_dids_key_bytes != rank_dids_key_bytes {
686
686
+
batch.remove(&self.rollups, &rank_dids_key_bytes);
687
687
+
batch.insert(&self.rollups, &new_rank_dids_key_bytes, "");
688
688
+
}
689
689
+
690
690
+
// set the updated rollup
691
691
+
batch.insert(&self.rollups, &rollup_key_bytes, &rolled.to_db_bytes()?);
620
692
}
621
693
622
694
insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor)?;
···
275
275
Ok((rank, 8))
276
276
}
277
277
}
278
278
+
impl From<u64> for KeyRank {
279
279
+
fn from(n: u64) -> Self {
280
280
+
Self(n)
281
281
+
}
282
282
+
}
283
283
+
impl From<KeyRank> for u64 {
284
284
+
fn from(kr: KeyRank) -> Self {
285
285
+
kr.0
286
286
+
}
287
287
+
}
278
288
279
289
pub type BucketedRankRecordsKey<P, C> =
280
290
DbConcat<DbConcat<DbConcat<DbStaticStr<P>, C>, KeyRank>, Nsid>;