alpha
Login
or
Join now
microcosm.blue
/
microcosm-rs
Star
0
Fork
3
Atom
Configure Feed
Issues
Pull Requests
Commits
Tags
Feed URL
Select the types of activity you want to include in your feed.
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
Star
0
Fork
3
Atom
Configure Feed
Issues
Pull Requests
Commits
Tags
Feed URL
Select the types of activity you want to include in your feed.
Overview
Issues
Pulls
Pipelines
simplify(??) the rank update stuff
author
phil
date
1 year ago
(May 20, 2025, 1:23 PM -0400)
commit
66b77a23
66b77a23b4cd9a7cae5d734e69e9d17ce3b9915d
parent
a04e8a50
a04e8a50e70205a5834901ead5ccade055cb09c3
+60
-68
2 changed files
Expand all
Collapse all
Unified
Split
ufos
src
storage_fjall.rs
store_types.rs
+55
-63
ufos/src/storage_fjall.rs
Reviewed
···
617
617
.transpose()?
618
618
.unwrap_or_default();
619
619
620
620
-
// now that we have values, we can fetch the ranks
621
621
-
// TODO: save .records() and .estimate() and then do this match only once later
622
622
-
let (rank_records_key_bytes, rank_dids_key_bytes) = match rollup {
623
623
-
Rollup::Hourly(hourly_cursor) => (
624
624
-
HourlyRecordsKey::new(hourly_cursor, rolled.records().into(), &nsid)
625
625
-
.to_db_bytes()?,
626
626
-
HourlyDidsKey::new(
627
627
-
hourly_cursor,
628
628
-
(rolled.dids().estimate() as u64).into(),
629
629
-
&nsid,
630
630
-
)
631
631
-
.to_db_bytes()?,
632
632
-
),
633
633
-
Rollup::Weekly(weekly_cursor) => (
634
634
-
WeeklyRecordsKey::new(weekly_cursor, rolled.records().into(), &nsid)
635
635
-
.to_db_bytes()?,
636
636
-
WeeklyDidsKey::new(
637
637
-
weekly_cursor,
638
638
-
(rolled.dids().estimate() as u64).into(),
639
639
-
&nsid,
640
640
-
)
641
641
-
.to_db_bytes()?,
642
642
-
),
643
643
-
Rollup::AllTime => (
644
644
-
AllTimeRecordsKey::new(rolled.records().into(), &nsid).to_db_bytes()?,
645
645
-
AllTimeDidsKey::new((rolled.dids().estimate() as u64).into(), &nsid)
646
646
-
.to_db_bytes()?,
647
647
-
),
648
648
-
};
620
620
+
// now that we have values, we can know the exising ranks
621
621
+
let before_records_count = rolled.records();
622
622
+
let before_dids_estimate = rolled.dids().estimate() as u64;
649
623
650
624
// update the rollup
651
625
rolled.merge(&counts);
652
626
653
627
// replace rank entries
654
654
-
let (new_rank_records_key_bytes, new_rank_dids_key_bytes) = match rollup {
655
655
-
Rollup::Hourly(hourly_cursor) => (
656
656
-
HourlyRecordsKey::new(hourly_cursor, rolled.records().into(), &nsid)
657
657
-
.to_db_bytes()?,
658
658
-
HourlyDidsKey::new(
659
659
-
hourly_cursor,
660
660
-
(rolled.dids().estimate() as u64).into(),
661
661
-
&nsid,
662
662
-
)
663
663
-
.to_db_bytes()?,
664
664
-
),
665
665
-
Rollup::Weekly(weekly_cursor) => (
666
666
-
WeeklyRecordsKey::new(weekly_cursor, rolled.records().into(), &nsid)
667
667
-
.to_db_bytes()?,
668
668
-
WeeklyDidsKey::new(
669
669
-
weekly_cursor,
670
670
-
(rolled.dids().estimate() as u64).into(),
671
671
-
&nsid,
672
672
-
)
673
673
-
.to_db_bytes()?,
674
674
-
),
675
675
-
Rollup::AllTime => (
676
676
-
AllTimeRecordsKey::new(rolled.records().into(), &nsid).to_db_bytes()?,
677
677
-
AllTimeDidsKey::new((rolled.dids().estimate() as u64).into(), &nsid)
678
678
-
.to_db_bytes()?,
679
679
-
),
628
628
+
let (old_records, new_records, dids) = match rollup {
629
629
+
Rollup::Hourly(hourly_cursor) => {
630
630
+
let old_records =
631
631
+
HourlyRecordsKey::new(hourly_cursor, before_records_count.into(), &nsid);
632
632
+
let new_records = old_records.with_rank(rolled.records().into());
633
633
+
let new_estimate = rolled.dids().estimate() as u64;
634
634
+
let dids = if new_estimate == before_dids_estimate {
635
635
+
None
636
636
+
} else {
637
637
+
let old_dids =
638
638
+
HourlyDidsKey::new(hourly_cursor, before_dids_estimate.into(), &nsid);
639
639
+
let new_dids = old_dids.with_rank(new_estimate.into());
640
640
+
Some((old_dids.to_db_bytes()?, new_dids.to_db_bytes()?))
641
641
+
};
642
642
+
(old_records.to_db_bytes()?, new_records.to_db_bytes()?, dids)
643
643
+
}
644
644
+
Rollup::Weekly(weekly_cursor) => {
645
645
+
let old_records =
646
646
+
WeeklyRecordsKey::new(weekly_cursor, before_records_count.into(), &nsid);
647
647
+
let new_records = old_records.with_rank(rolled.records().into());
648
648
+
let new_estimate = rolled.dids().estimate() as u64;
649
649
+
let dids = if new_estimate == before_dids_estimate {
650
650
+
None
651
651
+
} else {
652
652
+
let old_dids =
653
653
+
WeeklyDidsKey::new(weekly_cursor, before_dids_estimate.into(), &nsid);
654
654
+
let new_dids = old_dids.with_rank(new_estimate.into());
655
655
+
Some((old_dids.to_db_bytes()?, new_dids.to_db_bytes()?))
656
656
+
};
657
657
+
(old_records.to_db_bytes()?, new_records.to_db_bytes()?, dids)
658
658
+
}
659
659
+
Rollup::AllTime => {
660
660
+
let old_records = AllTimeRecordsKey::new(before_records_count.into(), &nsid);
661
661
+
let new_records = old_records.with_rank(rolled.records().into());
662
662
+
let new_estimate = rolled.dids().estimate() as u64;
663
663
+
let dids = if new_estimate == before_dids_estimate {
664
664
+
None
665
665
+
} else {
666
666
+
let old_dids = AllTimeDidsKey::new(before_dids_estimate.into(), &nsid);
667
667
+
let new_dids = old_dids.with_rank(new_estimate.into());
668
668
+
Some((old_dids.to_db_bytes()?, new_dids.to_db_bytes()?))
669
669
+
};
670
670
+
(old_records.to_db_bytes()?, new_records.to_db_bytes()?, dids)
671
671
+
}
680
672
};
681
681
-
if new_rank_records_key_bytes != rank_records_key_bytes {
682
682
-
batch.remove(&self.rollups, &rank_records_key_bytes);
683
683
-
batch.insert(&self.rollups, &new_rank_records_key_bytes, "");
684
684
-
}
685
685
-
if new_rank_dids_key_bytes != rank_dids_key_bytes {
686
686
-
batch.remove(&self.rollups, &rank_dids_key_bytes);
687
687
-
batch.insert(&self.rollups, &new_rank_dids_key_bytes, "");
673
673
+
674
674
+
// replace the ranks
675
675
+
batch.remove(&self.rollups, &old_records);
676
676
+
batch.insert(&self.rollups, &new_records, "");
677
677
+
if let Some((old_dids, new_dids)) = dids {
678
678
+
batch.remove(&self.rollups, &old_dids);
679
679
+
batch.insert(&self.rollups, &new_dids, "");
688
680
}
689
681
690
690
-
// set the updated rollup
682
682
+
// replace the rollup
691
683
batch.insert(&self.rollups, &rollup_key_bytes, &rolled.to_db_bytes()?);
692
684
}
693
685
+5
-5
ufos/src/store_types.rs
Reviewed
···
291
291
impl<P, C> BucketedRankRecordsKey<P, C>
292
292
where
293
293
P: StaticStr + PartialEq + std::fmt::Debug,
294
294
-
C: DbBytes + PartialEq + std::fmt::Debug,
294
294
+
C: DbBytes + PartialEq + std::fmt::Debug + Clone,
295
295
{
296
296
pub fn new(cursor: C, rank: KeyRank, nsid: &Nsid) -> Self {
297
297
Self::from_pair(
···
299
299
nsid.clone(),
300
300
)
301
301
}
302
302
-
pub fn update_rank(&mut self, new_rank: KeyRank) {
303
303
-
self.prefix.suffix = new_rank;
302
302
+
pub fn with_rank(&self, new_rank: KeyRank) -> Self {
303
303
+
Self::new(self.prefix.prefix.suffix.clone(), new_rank, &self.suffix)
304
304
}
305
305
}
306
306
···
363
363
pub fn new(rank: KeyRank, nsid: &Nsid) -> Self {
364
364
Self::from_pair(DbConcat::from_pair(Default::default(), rank), nsid.clone())
365
365
}
366
366
-
pub fn update_rank(&mut self, new_rank: KeyRank) {
367
367
-
self.prefix.suffix = new_rank;
366
366
+
pub fn with_rank(&self, new_rank: KeyRank) -> Self {
367
367
+
Self::new(new_rank, &self.suffix)
368
368
}
369
369
}
370
370