Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

half a dozen new indices for ranking

could probably get away with either just ranking records (upper bound for DIDs) or DIDs (lower bound for records) but ordering by the other would probably be messy / approximate. these indices should be pretty cheap to store, so doing it is must a matter of overcoming my laziness

to that effect i'm finally adding a macro for the static_str thing, and made the keys generic-ish.

+105 -2
+27 -2
ufos/src/storage_fjall.rs
··· 70 70 /// - key: "live_counts" || u64 || nullstr (js_cursor, nsid) 71 71 /// - val: u64 || HLL (count (not cursor), estimator) 72 72 /// 73 + /// 73 74 /// - Hourly total record counts and dids estimate per collection 74 75 /// - key: "hourly_counts" || u64 || nullstr (hour, nsid) 75 76 /// - val: u64 || HLL (count (not cursor), estimator) 76 77 /// 78 + /// - Hourly record count ranking 79 + /// - key: "hourly_rank_records" || u64 || u64 || nullstr (hour, count, nsid) 80 + /// - val: [empty] 81 + /// 82 + /// - Hourly did estimate ranking 83 + /// - key: "hourly_rank_dids" || u64 || u64 || nullstr (hour, dids estimate, nsid) 84 + /// - val: [empty] 85 + /// 86 + /// 77 87 /// - Weekly total record counts and dids estimate per collection 78 - /// - key: "weekly_counts" || u64 || nullstr (hour, nsid) 88 + /// - key: "weekly_counts" || u64 || nullstr (week, nsid) 79 89 /// - val: u64 || HLL (count (not cursor), estimator) 80 90 /// 91 + /// - Weekly record count ranking 92 + /// - key: "weekly_rank_records" || u64 || u64 || nullstr (week, count, nsid) 93 + /// - val: [empty] 94 + /// 95 + /// - Weekly did estimate ranking 96 + /// - key: "weekly_rank_dids" || u64 || u64 || nullstr (week, dids estimate, nsid) 97 + /// - val: [empty] 98 + /// 99 + /// 81 100 /// - All-time total record counts and dids estimate per collection 82 101 /// - key: "ever_counts" || nullstr (nsid) 83 102 /// - val: u64 || HLL (count (not cursor), estimator) 84 103 /// 85 - /// - TODO: sorted indexes for all-times? 104 + /// - All-time total record record count ranking 105 + /// - key: "ever_rank_records" || u64 || nullstr (count, nsid) 106 + /// - val: [empty] 107 + /// 108 + /// - All-time did estimate ranking 109 + /// - key: "ever_rank_dids" || u64 || nullstr (dids estimate, nsid) 110 + /// - val: [empty] 86 111 /// 87 112 /// 88 113 /// Partition: 'queues'
+78
ufos/src/store_types.rs
··· 6 6 use cardinality_estimator_safe::CardinalityEstimator; 7 7 use std::ops::Range; 8 8 9 + macro_rules! static_str { 10 + ($prefix:expr, $name:ident) => { 11 + #[derive(Debug, PartialEq)] 12 + pub struct $name {} 13 + impl StaticStr for $name { 14 + fn static_str() -> &'static str { 15 + $prefix 16 + } 17 + } 18 + }; 19 + } 20 + 9 21 /// key format: ["js_cursor"] 10 22 #[derive(Debug, PartialEq)] 11 23 pub struct JetstreamCursorKey {} ··· 289 301 } 290 302 pub type DeleteAccountQueueVal = Did; 291 303 304 + /// big-endian encoded u64 for LSM prefix-fiendly key 305 + #[derive(Debug, Clone, Copy, PartialEq)] 306 + pub struct KeyRank(u64); 307 + impl DbBytes for KeyRank { 308 + fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> { 309 + Ok(self.0.to_be_bytes().to_vec()) 310 + } 311 + fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 312 + if bytes.len() < 8 { 313 + return Err(EncodingError::DecodeNotEnoughBytes); 314 + } 315 + let bytes8 = TryInto::<[u8; 8]>::try_into(&bytes[..8])?; 316 + let rank = KeyRank(u64::from_be_bytes(bytes8)); 317 + Ok((rank, 8)) 318 + } 319 + } 320 + 321 + pub type BucketedRankRecordsKey<P, C> = 322 + DbConcat<DbConcat<DbConcat<DbStaticStr<P>, C>, KeyRank>, Nsid>; 323 + impl<P, C> BucketedRankRecordsKey<P, C> 324 + where 325 + P: StaticStr + PartialEq + std::fmt::Debug, 326 + C: DbBytes + PartialEq + std::fmt::Debug, 327 + { 328 + pub fn new(cursor: C, rank: KeyRank, nsid: &Nsid) -> Self { 329 + Self::from_pair( 330 + DbConcat::from_pair(DbConcat::from_pair(Default::default(), cursor), rank), 331 + nsid.clone(), 332 + ) 333 + } 334 + pub fn update_rank(&mut self, new_rank: KeyRank) { 335 + self.prefix.suffix = new_rank; 336 + } 337 + } 338 + 292 339 #[derive(Debug, PartialEq)] 293 340 pub struct _HourlyRollupStaticStr {} 294 341 impl StaticStr for _HourlyRollupStaticStr { ··· 308 355 } 309 356 pub type HourlyRollupVal = CountsValue; 310 357 358 + static_str!("hourly_rank_records", _HourlyRecordsStaticStr); 359 + pub type HourlyRecordsKey = BucketedRankRecordsKey<_HourlyRecordsStaticStr, HourTruncatedCursor>; 360 + 361 + static_str!("hourly_rank_dids", _HourlyDidsStaticStr); 362 + pub type HourlyDidsKey = BucketedRankRecordsKey<_HourlyDidsStaticStr, HourTruncatedCursor>; 363 + 311 364 #[derive(Debug, PartialEq)] 312 365 pub struct _WeeklyRollupStaticStr {} 313 366 impl StaticStr for _WeeklyRollupStaticStr { ··· 327 380 } 328 381 pub type WeeklyRollupVal = CountsValue; 329 382 383 + static_str!("weekly_rank_records", _WeeklyRecordsStaticStr); 384 + pub type WeeklyRecordsKey = BucketedRankRecordsKey<_WeeklyRecordsStaticStr, WeekTruncatedCursor>; 385 + 386 + static_str!("weekly_rank_dids", _WeeklyDidsStaticStr); 387 + pub type WeeklyDidsKey = BucketedRankRecordsKey<_WeeklyDidsStaticStr, WeekTruncatedCursor>; 388 + 330 389 #[derive(Debug, PartialEq)] 331 390 pub struct _AllTimeRollupStaticStr {} 332 391 impl StaticStr for _AllTimeRollupStaticStr { ··· 345 404 } 346 405 } 347 406 pub type AllTimeRollupVal = CountsValue; 407 + 408 + pub type AllTimeRankRecordsKey<P> = DbConcat<DbConcat<DbStaticStr<P>, KeyRank>, Nsid>; 409 + impl<P> AllTimeRankRecordsKey<P> 410 + where 411 + P: StaticStr + PartialEq + std::fmt::Debug, 412 + { 413 + pub fn new(rank: KeyRank, nsid: &Nsid) -> Self { 414 + Self::from_pair(DbConcat::from_pair(Default::default(), rank), nsid.clone()) 415 + } 416 + pub fn update_rank(&mut self, new_rank: KeyRank) { 417 + self.prefix.suffix = new_rank; 418 + } 419 + } 420 + 421 + static_str!("ever_rank_records", _AllTimeRecordsStaticStr); 422 + pub type AllTimeRecordsKey = AllTimeRankRecordsKey<_AllTimeRecordsStaticStr>; 423 + 424 + static_str!("ever_rank_dids", _AllTimeDidsStaticStr); 425 + pub type AllTimeDidsKey = AllTimeRankRecordsKey<_AllTimeDidsStaticStr>; 348 426 349 427 #[derive(Debug, Copy, Clone, PartialEq, Hash, PartialOrd, Eq)] 350 428 pub struct TruncatedCursor<const MOD: u64>(u64);