Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

count rollups

o dear it seems like it works

+272 -44
+2 -2
ufos/src/db_types.rs
··· 44 44 DecodeTooManyBytes(usize), 45 45 #[error("expected exclusive bound from lsm_tree (likely bug)")] 46 46 BadRangeBound, 47 - #[error("expected an hourly-truncated u64, found remainder: {0}")] 48 - InvalidHourlyTruncated(u64), 47 + #[error("expected a truncated u64 for mod {0}, found remainder: {1}")] 48 + InvalidTruncated(u64, u64), 49 49 } 50 50 51 51 fn bincode_conf() -> impl Config {
+164 -28
ufos/src/storage_fjall.rs
··· 3 3 use crate::error::StorageError; 4 4 use crate::storage::{StorageResult, StorageWhatever, StoreReader, StoreWriter}; 5 5 use crate::store_types::{ 6 - ByCollectionKey, ByCollectionValue, ByCursorSeenKey, ByCursorSeenValue, ByIdKey, ByIdValue, 7 - CountsValue, DeleteAccountQueueKey, DeleteAccountQueueVal, JetstreamCursorKey, 8 - JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey, 9 - ModCursorKey, ModCursorValue, ModQueueItemKey, ModQueueItemStringValue, ModQueueItemValue, 10 - NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, 11 - RecordLocationKey, RecordLocationMeta, RecordLocationVal, RecordRawValue, RollupCursorKey, 12 - RollupCursorValue, SeenCounter, TakeoffKey, 6 + AllTimeRollupKey, ByCollectionKey, ByCollectionValue, ByCursorSeenKey, ByCursorSeenValue, 7 + ByIdKey, ByIdValue, CountsValue, DeleteAccountQueueKey, DeleteAccountQueueVal, 8 + HourTruncatedCursor, HourlyRollupKey, JetstreamCursorKey, JetstreamCursorValue, 9 + JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey, ModCursorKey, ModCursorValue, 10 + ModQueueItemKey, ModQueueItemStringValue, ModQueueItemValue, NewRollupCursorKey, 11 + NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, RecordLocationKey, 12 + RecordLocationMeta, RecordLocationVal, RecordRawValue, RollupCursorKey, RollupCursorValue, 13 + SeenCounter, TakeoffKey, WeekTruncatedCursor, WeeklyRollupKey, 13 14 }; 14 15 use crate::{CommitAction, Did, EventBatch, Nsid, RecordKey, UFOsRecord}; 15 16 use cardinality_estimator::CardinalityEstimator; ··· 37 38 38 39 const MAX_BATCHED_CLEANUP_SIZE: usize = 1024; // try to commit progress for longer feeds 39 40 const MAX_BATCHED_ACCOUNT_DELETE_RECORDS: usize = 1024; 41 + const MAX_BATCHED_ROLLUP_COUNTS: usize = 256; 40 42 41 43 #[derive(Clone)] 42 44 struct Db { ··· 90 92 * key: "hourly_counts" || u64 || nullstr (hour, nsid) 91 93 * val: u64 || HLL (count (not cursor), estimator) 92 94 * 93 - * - TODO: weekly rollups? 95 + * - Weekly total record counts and dids estimate per collection 96 + * key: "weekly_counts" || u64 || nullstr (hour, nsid) 97 + * val: u64 || HLL (count (not cursor), estimator) 94 98 * 95 99 * - All-time total record counts and dids estimate per collection 96 100 * key: "ever_counts" || nullstr (nsid) ··· 292 296 } 293 297 294 298 impl FjallWriter { 295 - pub fn step_rollup(&mut self) -> StorageResult<()> { 299 + pub fn step_rollup(&mut self) -> StorageResult<usize> { 296 300 let rollup_cursor = 297 301 get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?.ok_or( 298 302 StorageError::BadStateError("Could not find current rollup cursor".to_string()), ··· 330 334 }) 331 335 .transpose()?; 332 336 333 - match (timely_next_cursor, next_delete) { 337 + let cursors_stepped = match (timely_next_cursor, next_delete) { 334 338 ( 335 339 Some(timely_next_cursor), 336 340 Some((delete_cursor, delete_key_bytes, delete_val_bytes)), 337 341 ) => { 338 342 if timely_next_cursor < delete_cursor { 339 343 eprintln!("rollup until delete cursor"); 344 + self.rollup_live_counts( 345 + timely_iter, 346 + Some(delete_cursor), 347 + MAX_BATCHED_ROLLUP_COUNTS, 348 + )? 340 349 } else { 341 - self.rollup_delete_account( 342 - delete_cursor, 343 - &delete_key_bytes, 344 - &delete_val_bytes, 345 - )?; 346 350 eprintln!("delete then come back for rollups"); 351 + self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)? 347 352 } 348 353 } 349 - (Some(timely_next_cursor), None) => { 354 + (Some(_), None) => { 350 355 eprintln!("do as much rollup as we want"); 356 + self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)? 351 357 } 352 358 (None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => { 353 359 eprintln!("just delete an account"); 354 - self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)?; 360 + self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)? 355 361 } 356 362 (None, None) => { 357 363 eprintln!("do nothing."); 364 + 0 358 365 } 359 - } 360 - 361 - // TODO: advance the rollup cursor 366 + }; 362 367 363 - // batch.commit()?; 364 - Ok(()) 368 + Ok(cursors_stepped) 365 369 } 366 370 367 371 fn rollup_delete_account( ··· 369 373 cursor: Cursor, 370 374 key_bytes: &[u8], 371 375 val_bytes: &[u8], 372 - ) -> StorageResult<()> { 376 + ) -> StorageResult<usize> { 373 377 let did = db_complete::<DeleteAccountQueueVal>(val_bytes)?; 374 378 self.delete_account(&did)?; 375 379 let mut batch = self.keyspace.batch(); 376 380 batch.remove(&self.queues, key_bytes); 377 381 insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, cursor.next())?; 378 382 batch.commit()?; 379 - Ok(()) 383 + Ok(1) 384 + } 385 + 386 + fn rollup_live_counts( 387 + &mut self, 388 + timelies: impl Iterator<Item = Result<(fjall::Slice, fjall::Slice), fjall::Error>>, 389 + cursor_exclusive_limit: Option<Cursor>, 390 + rollup_limit: usize, 391 + ) -> StorageResult<usize> { 392 + // current strategy is to buffer counts in mem before writing the rollups 393 + // we *could* read+write every single batch to rollup.. but their merge is associative so 394 + // ...so save the db some work up front? is this worth it? who knows... 395 + 396 + #[derive(Eq, Hash, PartialEq)] 397 + enum Rollup { 398 + Hourly(HourTruncatedCursor), 399 + Weekly(WeekTruncatedCursor), 400 + AllTime, 401 + } 402 + 403 + let mut batch = self.keyspace.batch(); 404 + let mut cursors_advanced = 0; 405 + let mut last_cursor = Cursor::from_start(); 406 + let mut counts_by_rollup: HashMap<(Nsid, Rollup), CountsValue> = HashMap::new(); 407 + 408 + for (i, kv) in timelies.enumerate() { 409 + if i >= rollup_limit { 410 + break; 411 + } 412 + 413 + let (key_bytes, val_bytes) = kv?; 414 + let key = db_complete::<LiveCountsKey>(&key_bytes)?; 415 + 416 + if cursor_exclusive_limit 417 + .map(|limit| key.cursor() > limit) 418 + .unwrap_or(false) 419 + { 420 + break; 421 + } 422 + 423 + batch.remove(&self.rollups, key_bytes); 424 + let val = db_complete::<CountsValue>(&val_bytes)?; 425 + counts_by_rollup 426 + .entry(( 427 + key.collection().clone(), 428 + Rollup::Hourly(key.cursor().into()), 429 + )) 430 + .or_default() 431 + .merge(&val); 432 + counts_by_rollup 433 + .entry(( 434 + key.collection().clone(), 435 + Rollup::Weekly(key.cursor().into()), 436 + )) 437 + .or_default() 438 + .merge(&val); 439 + counts_by_rollup 440 + .entry((key.collection().clone(), Rollup::AllTime)) 441 + .or_default() 442 + .merge(&val); 443 + 444 + cursors_advanced += 1; 445 + last_cursor = key.cursor(); 446 + } 447 + 448 + for ((nsid, rollup), counts) in counts_by_rollup { 449 + let key_bytes = match rollup { 450 + Rollup::Hourly(hourly_cursor) => { 451 + HourlyRollupKey::new(hourly_cursor, &nsid).to_db_bytes()? 452 + } 453 + Rollup::Weekly(weekly_cursor) => { 454 + WeeklyRollupKey::new(weekly_cursor, &nsid).to_db_bytes()? 455 + } 456 + Rollup::AllTime => AllTimeRollupKey::new(&nsid).to_db_bytes()?, 457 + }; 458 + let mut rolled = self 459 + .rollups 460 + .get(&key_bytes)? 461 + .as_deref() 462 + .map(db_complete::<CountsValue>) 463 + .transpose()? 464 + .unwrap_or_default(); 465 + rolled.merge(&counts); 466 + batch.insert(&self.rollups, &key_bytes, &rolled.to_db_bytes()?); 467 + } 468 + 469 + insert_batch_static_neu::<NewRollupCursorKey>( 470 + &mut batch, 471 + &self.global, 472 + last_cursor.next(), 473 + )?; 474 + 475 + batch.commit()?; 476 + Ok(cursors_advanced) 380 477 } 381 478 } 382 479 ··· 1630 1727 ); 1631 1728 write.insert_batch(batch.batch)?; 1632 1729 1633 - write.step_rollup()?; 1730 + let n = write.step_rollup()?; 1731 + assert_eq!(n, 1); 1634 1732 1635 1733 let mut batch = TestBatch::default(); 1636 1734 batch.delete_account("did:plc:person-a", 10_001); ··· 1640 1738 read.get_records_by_collections(&vec![&Nsid::new("a.a.a".to_string()).unwrap()], 1)?; 1641 1739 assert_eq!(records.len(), 1); 1642 1740 1643 - write.step_rollup()?; 1741 + let n = write.step_rollup()?; 1742 + assert_eq!(n, 1); 1644 1743 1645 1744 let records = 1646 1745 read.get_records_by_collections(&vec![&Nsid::new("a.a.a".to_string()).unwrap()], 1)?; ··· 1650 1749 batch.delete_account("did:plc:person-a", 9_999); 1651 1750 write.insert_batch(batch.batch)?; 1652 1751 1653 - write.step_rollup()?; 1752 + let n = write.step_rollup()?; 1753 + assert_eq!(n, 0); 1754 + 1755 + Ok(()) 1756 + } 1757 + 1758 + #[test] 1759 + fn rollup_multiple_count_batches() -> anyhow::Result<()> { 1760 + let (_read, mut write) = fjall_db(); 1761 + 1762 + let mut batch = TestBatch::default(); 1763 + batch.create( 1764 + "did:plc:person-a", 1765 + "a.a.a", 1766 + "rkey-aaa", 1767 + "{}", 1768 + Some("rev-aaa"), 1769 + None, 1770 + 10_000, 1771 + ); 1772 + write.insert_batch(batch.batch)?; 1773 + 1774 + let mut batch = TestBatch::default(); 1775 + batch.create( 1776 + "did:plc:person-a", 1777 + "a.a.a", 1778 + "rkey-aab", 1779 + "{}", 1780 + Some("rev-aab"), 1781 + None, 1782 + 10_001, 1783 + ); 1784 + write.insert_batch(batch.batch)?; 1785 + 1786 + let n = write.step_rollup()?; 1787 + assert_eq!(n, 2); 1788 + 1789 + let n = write.step_rollup()?; 1790 + assert_eq!(n, 0); 1654 1791 1655 - assert!(false); 1656 1792 Ok(()) 1657 1793 } 1658 1794 }
+106 -14
ufos/src/store_types.rs
··· 253 253 pub fn dids(&self) -> &CardinalityEstimator<Did> { 254 254 &self.suffix.0 255 255 } 256 + pub fn merge(&mut self, other: &Self) { 257 + self.prefix.0 += other.records(); 258 + self.suffix.0.merge(other.dids()); 259 + } 260 + } 261 + impl Default for CountsValue { 262 + fn default() -> Self { 263 + Self { 264 + prefix: TotalRecordsValue(0), 265 + suffix: EstimatedDidsValue(CardinalityEstimator::new()), 266 + } 267 + } 256 268 } 257 269 258 270 #[derive(Debug, PartialEq)] ··· 270 282 } 271 283 } 272 284 pub type DeleteAccountQueueVal = Did; 285 + 286 + #[derive(Debug, PartialEq)] 287 + pub struct _HourlyRollupStaticStr {} 288 + impl StaticStr for _HourlyRollupStaticStr { 289 + fn static_str() -> &'static str { 290 + "hourly_counts" 291 + } 292 + } 293 + pub type HourlyRollupStaticPrefix = DbStaticStr<_HourlyRollupStaticStr>; 294 + pub type HourlyRollupKey = DbConcat<DbConcat<HourlyRollupStaticPrefix, HourTruncatedCursor>, Nsid>; 295 + impl HourlyRollupKey { 296 + pub fn new(hourly_cursor: HourTruncatedCursor, nsid: &Nsid) -> Self { 297 + Self::from_pair( 298 + DbConcat::from_pair(Default::default(), hourly_cursor), 299 + nsid.clone(), 300 + ) 301 + } 302 + } 303 + pub type HourlyRollupVal = CountsValue; 304 + 305 + #[derive(Debug, PartialEq)] 306 + pub struct _WeeklyRollupStaticStr {} 307 + impl StaticStr for _WeeklyRollupStaticStr { 308 + fn static_str() -> &'static str { 309 + "weekly_counts" 310 + } 311 + } 312 + pub type WeeklyRollupStaticPrefix = DbStaticStr<_WeeklyRollupStaticStr>; 313 + pub type WeeklyRollupKey = DbConcat<DbConcat<WeeklyRollupStaticPrefix, WeekTruncatedCursor>, Nsid>; 314 + impl WeeklyRollupKey { 315 + pub fn new(weekly_cursor: WeekTruncatedCursor, nsid: &Nsid) -> Self { 316 + Self::from_pair( 317 + DbConcat::from_pair(Default::default(), weekly_cursor), 318 + nsid.clone(), 319 + ) 320 + } 321 + } 322 + pub type WeeklyRollupVal = CountsValue; 323 + 324 + #[derive(Debug, PartialEq)] 325 + pub struct _AllTimeRollupStaticStr {} 326 + impl StaticStr for _AllTimeRollupStaticStr { 327 + fn static_str() -> &'static str { 328 + "ever_counts" 329 + } 330 + } 331 + pub type AllTimeRollupStaticPrefix = DbStaticStr<_AllTimeRollupStaticStr>; 332 + pub type AllTimeRollupKey = DbConcat<AllTimeRollupStaticPrefix, Nsid>; 333 + impl AllTimeRollupKey { 334 + pub fn new(nsid: &Nsid) -> Self { 335 + Self::from_pair(Default::default(), nsid.clone()) 336 + } 337 + } 338 + pub type AllTimeRollupVal = CountsValue; 339 + 340 + /////////// old stuff, probably: ///////////// 273 341 274 342 #[derive(Debug, Clone, Encode, Decode)] 275 343 pub struct SeenCounter(pub u64); ··· 524 592 } 525 593 } 526 594 527 - const HOUR_IN_MICROS: u64 = 1_000_000 * 3600; 528 - #[derive(Debug, Copy, Clone, PartialEq, PartialOrd)] 529 - pub struct HourTrucatedCursor(u64); 530 - impl HourTrucatedCursor { 595 + #[derive(Debug, Copy, Clone, PartialEq, Hash, PartialOrd, Eq)] 596 + pub struct TruncatedCursor<const MOD: u64>(u64); 597 + impl<const MOD: u64> TruncatedCursor<MOD> { 531 598 fn truncate(raw: u64) -> u64 { 532 - let hours_ts = raw / HOUR_IN_MICROS; 533 - let truncated = hours_ts * HOUR_IN_MICROS; 599 + let floored_ts = raw / MOD; 600 + let truncated = floored_ts * MOD; 534 601 truncated 535 602 } 536 603 pub fn try_from_raw_u64(time_us: u64) -> Result<Self, EncodingError> { 537 - let rem = time_us % HOUR_IN_MICROS; 604 + let rem = time_us % MOD; 538 605 if rem != 0 { 539 - return Err(EncodingError::InvalidHourlyTruncated(rem)); 606 + return Err(EncodingError::InvalidTruncated(MOD, rem)); 540 607 } 541 608 Ok(Self(time_us)) 542 609 } 610 + pub fn try_from_cursor(cursor: Cursor) -> Result<Self, EncodingError> { 611 + Self::try_from_raw_u64(cursor.to_raw_u64()) 612 + } 543 613 pub fn truncate_cursor(cursor: Cursor) -> Self { 544 614 let raw = cursor.to_raw_u64(); 545 615 let truncated = Self::truncate(raw); 546 616 Self(truncated) 547 617 } 548 618 } 549 - impl From<HourTrucatedCursor> for Cursor { 550 - fn from(hour_truncated: HourTrucatedCursor) -> Self { 551 - Cursor::from_raw_u64(hour_truncated.0) 619 + impl<const MOD: u64> From<TruncatedCursor<MOD>> for Cursor { 620 + fn from(truncated: TruncatedCursor<MOD>) -> Self { 621 + Cursor::from_raw_u64(truncated.0) 552 622 } 553 623 } 624 + impl<const MOD: u64> From<Cursor> for TruncatedCursor<MOD> { 625 + fn from(cursor: Cursor) -> Self { 626 + Self::truncate_cursor(cursor) 627 + } 628 + } 629 + impl<const MOD: u64> DbBytes for TruncatedCursor<MOD> { 630 + fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> { 631 + let as_cursor: Cursor = (*self).into(); 632 + as_cursor.to_db_bytes() 633 + } 634 + fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 635 + let (cursor, n) = Cursor::from_db_bytes(bytes)?; 636 + let me = Self::try_from_cursor(cursor)?; 637 + Ok((me, n)) 638 + } 639 + } 640 + 641 + const HOUR_IN_MICROS: u64 = 1_000_000 * 3600; 642 + pub type HourTruncatedCursor = TruncatedCursor<HOUR_IN_MICROS>; 643 + 644 + const WEEK_IN_MICROS: u64 = HOUR_IN_MICROS * 24 * 7; 645 + pub type WeekTruncatedCursor = TruncatedCursor<WEEK_IN_MICROS>; 554 646 555 647 #[cfg(test)] 556 648 mod test { 557 649 use super::{ 558 - ByCollectionKey, ByCollectionValue, Cursor, Did, EncodingError, HourTrucatedCursor, Nsid, 650 + ByCollectionKey, ByCollectionValue, Cursor, Did, EncodingError, HourTruncatedCursor, Nsid, 559 651 RecordKey, HOUR_IN_MICROS, 560 652 }; 561 653 use crate::db_types::DbBytes; ··· 596 688 #[test] 597 689 fn test_hour_truncated_cursor() { 598 690 let us = Cursor::from_raw_u64(1_743_778_483_483_895); 599 - let hr = HourTrucatedCursor::truncate_cursor(us); 691 + let hr = HourTruncatedCursor::truncate_cursor(us); 600 692 let back: Cursor = hr.into(); 601 693 assert!(back < us); 602 694 let diff = us.to_raw_u64() - back.to_raw_u64(); ··· 606 698 #[test] 607 699 fn test_hour_truncated_cursor_already_truncated() { 608 700 let us = Cursor::from_raw_u64(1_743_775_200_000_000); 609 - let hr = HourTrucatedCursor::truncate_cursor(us); 701 + let hr = HourTruncatedCursor::truncate_cursor(us); 610 702 let back: Cursor = hr.into(); 611 703 assert_eq!(back, us); 612 704 let diff = us.to_raw_u64() - back.to_raw_u64();