Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

update cardinality_estimator_safe

and use sketches with P=14 (0.8% expected error)

+45 -18
+15 -3
Cargo.lock
··· 574 574 575 575 [[package]] 576 576 name = "cardinality-estimator-safe" 577 - version = "2.1.1" 577 + version = "4.0.1" 578 578 source = "registry+https://github.com/rust-lang/crates.io-index" 579 - checksum = "50c14632b90cb42ff2174d2a544ca553c1bccfab54b848ae9ab9e004b90243bf" 579 + checksum = "b41ec0cd313b46ba3b508377544b25aa1d56d05ce9e657e77dfb001d5e726e53" 580 580 dependencies = [ 581 + "digest", 581 582 "enum_dispatch", 582 583 "serde", 583 - "wyhash", 584 584 ] 585 585 586 586 [[package]] ··· 3226 3226 ] 3227 3227 3228 3228 [[package]] 3229 + name = "sha2" 3230 + version = "0.10.9" 3231 + source = "registry+https://github.com/rust-lang/crates.io-index" 3232 + checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" 3233 + dependencies = [ 3234 + "cfg-if", 3235 + "cpufeatures", 3236 + "digest", 3237 + ] 3238 + 3239 + [[package]] 3229 3240 name = "sharded-slab" 3230 3241 version = "0.1.7" 3231 3242 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3816 3827 "semver", 3817 3828 "serde", 3818 3829 "serde_json", 3830 + "sha2", 3819 3831 "tempfile", 3820 3832 "thiserror 2.0.12", 3821 3833 "tikv-jemallocator",
+2 -1
ufos/Cargo.toml
··· 7 7 anyhow = "1.0.97" 8 8 async-trait = "0.1.88" 9 9 bincode = { version = "2.0.1", features = ["serde"] } 10 - cardinality-estimator-safe = { version = "2.1.1", features = ["with_serde"] } 10 + cardinality-estimator-safe = { version = "4.0.1", features = ["with_serde", "with_digest"] } 11 11 clap = { version = "4.5.31", features = ["derive"] } 12 12 dropshot = "0.16.0" 13 13 env_logger = "0.11.7" ··· 20 20 semver = "1.0.26" 21 21 serde = "1.0.219" 22 22 serde_json = "1.0.140" 23 + sha2 = "0.10.9" 23 24 thiserror = "2.0.12" 24 25 tokio = { version = "1.44.2", features = ["full", "sync", "time"] } 25 26
+9 -4
ufos/src/lib.rs
··· 10 10 pub mod store_types; 11 11 12 12 use crate::error::BatchInsertError; 13 - use cardinality_estimator_safe::CardinalityEstimator; 13 + use cardinality_estimator_safe::{Element, Sketch}; 14 14 use error::FirehoseEventError; 15 15 use jetstream::events::{CommitEvent, CommitOp, Cursor}; 16 16 use jetstream::exports::{Did, Nsid, RecordKey}; 17 17 use schemars::JsonSchema; 18 18 use serde::Serialize; 19 19 use serde_json::value::RawValue; 20 + use sha2::Sha256; 20 21 use std::collections::HashMap; 21 22 22 23 #[derive(Debug, Default, Clone)] 23 24 pub struct CollectionCommits<const LIMIT: usize> { 24 25 pub total_seen: usize, 25 - pub dids_estimate: CardinalityEstimator<Did>, 26 + pub dids_estimate: Sketch<14>, 26 27 pub commits: Vec<UFOsCommit>, 27 28 head: usize, 28 29 non_creates: usize, 30 + } 31 + 32 + fn did_element(did: &Did) -> Element<14> { 33 + Element::from_digest_oneshot::<Sha256>(did.as_bytes()) 29 34 } 30 35 31 36 impl<const LIMIT: usize> CollectionCommits<LIMIT> { ··· 66 71 67 72 if is_create { 68 73 self.total_seen += 1; 69 - self.dids_estimate.insert(&did); 74 + self.dids_estimate.insert(did_element(&did)); 70 75 } else { 71 76 self.non_creates += 1; 72 77 } ··· 181 186 self.account_removes.len() 182 187 } 183 188 pub fn estimate_dids(&self) -> usize { 184 - let mut estimator = CardinalityEstimator::<Did>::new(); 189 + let mut estimator = Sketch::<14>::default(); 185 190 for commits in self.commits_by_nsid.values() { 186 191 estimator.merge(&commits.dids_estimate); 187 192 }
+19 -10
ufos/src/store_types.rs
··· 3 3 }; 4 4 use crate::{Cursor, Did, Nsid, PutAction, RecordKey, UFOsCommit}; 5 5 use bincode::{Decode, Encode}; 6 - use cardinality_estimator_safe::CardinalityEstimator; 6 + use cardinality_estimator_safe::Sketch; 7 7 use std::ops::Range; 8 8 9 9 macro_rules! static_str { ··· 199 199 impl UseBincodePlz for TotalRecordsValue {} 200 200 201 201 #[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)] 202 - pub struct EstimatedDidsValue(pub CardinalityEstimator<Did>); 202 + pub struct EstimatedDidsValue(pub Sketch<14>); 203 203 impl SerdeBytes for EstimatedDidsValue {} 204 204 impl DbBytes for EstimatedDidsValue { 205 205 #[cfg(test)] ··· 223 223 224 224 pub type CountsValue = DbConcat<TotalRecordsValue, EstimatedDidsValue>; 225 225 impl CountsValue { 226 - pub fn new(total: u64, dids: CardinalityEstimator<Did>) -> Self { 226 + pub fn new(total: u64, dids: Sketch<14>) -> Self { 227 227 Self { 228 228 prefix: TotalRecordsValue(total), 229 229 suffix: EstimatedDidsValue(dids), ··· 232 232 pub fn records(&self) -> u64 { 233 233 self.prefix.0 234 234 } 235 - pub fn dids(&self) -> &CardinalityEstimator<Did> { 235 + pub fn dids(&self) -> &Sketch<14> { 236 236 &self.suffix.0 237 237 } 238 238 pub fn merge(&mut self, other: &Self) { ··· 244 244 fn default() -> Self { 245 245 Self { 246 246 prefix: TotalRecordsValue(0), 247 - suffix: EstimatedDidsValue(CardinalityEstimator::new()), 247 + suffix: EstimatedDidsValue(Sketch::<14>::default()), 248 248 } 249 249 } 250 250 } ··· 433 433 #[cfg(test)] 434 434 mod test { 435 435 use super::{ 436 - CardinalityEstimator, CountsValue, Cursor, Did, EncodingError, HourTruncatedCursor, 437 - HourlyRollupKey, Nsid, HOUR_IN_MICROS, 436 + CountsValue, Cursor, Did, EncodingError, HourTruncatedCursor, HourlyRollupKey, Nsid, 437 + Sketch, HOUR_IN_MICROS, 438 438 }; 439 439 use crate::db_types::DbBytes; 440 + use cardinality_estimator_safe::Element; 441 + use sha2::Sha256; 440 442 441 443 #[test] 442 444 fn test_by_hourly_rollup_key() -> Result<(), EncodingError> { ··· 456 458 457 459 #[test] 458 460 fn test_by_hourly_rollup_value() -> Result<(), EncodingError> { 459 - let mut estimator = CardinalityEstimator::new(); 461 + let mut estimator = Sketch::<14>::default(); 462 + fn to_element(d: Did) -> Element<14> { 463 + Element::from_digest_oneshot::<Sha256>(d.to_string().as_bytes()) 464 + } 460 465 for i in 0..10 { 461 - estimator.insert(&Did::new(format!("did:plc:inze6wrmsm7pjl7yta3oig7{i}")).unwrap()); 466 + estimator.insert(to_element( 467 + Did::new(format!("did:plc:inze6wrmsm7pjl7yta3oig7{i}")).unwrap(), 468 + )); 462 469 } 463 470 let original = CountsValue::new(123, estimator.clone()); 464 471 let serialized = original.to_db_bytes()?; ··· 467 474 assert_eq!(bytes_consumed, serialized.len()); 468 475 469 476 for i in 10..1_000 { 470 - estimator.insert(&Did::new(format!("did:plc:inze6wrmsm7pjl7yta3oig{i}")).unwrap()); 477 + estimator.insert(to_element( 478 + Did::new(format!("did:plc:inze6wrmsm7pjl7yta3oig{i}")).unwrap(), 479 + )); 471 480 } 472 481 let original = CountsValue::new(123, estimator); 473 482 let serialized = original.to_db_bytes()?;