Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

Merge branch 'ufos-stats'

+188 -9
+2 -2
Cargo.lock
··· 737 737 738 738 [[package]] 739 739 name = "cardinality-estimator-safe" 740 - version = "4.0.2" 740 + version = "4.0.3" 741 741 source = "registry+https://github.com/rust-lang/crates.io-index" 742 - checksum = "dc9887b4092040ea9a416fc3de9769ee7783e3cd5c168c941e6a8de69723b971" 742 + checksum = "3879e0b6ebe0bef99874ab3942caee80365d00cf686b93a7cc9c0c9cb3a9f8e7" 743 743 dependencies = [ 744 744 "digest", 745 745 "enum_dispatch",
+5 -1
ufos/Cargo.toml
··· 8 8 async-trait = "0.1.88" 9 9 base64 = "0.22.1" 10 10 bincode = { version = "2.0.1", features = ["serde"] } 11 - cardinality-estimator-safe = { version = "4.0.2", features = ["with_serde", "with_digest"] } 11 + cardinality-estimator-safe = { version = "4.0.3", features = ["with_serde", "with_digest"] } 12 12 chrono = { version = "0.4.41", features = ["serde"] } 13 13 clap = { workspace = true } 14 14 dropshot = "0.16.0" ··· 33 33 34 34 [target.'cfg(not(target_env = "msvc"))'.dependencies] 35 35 tikv-jemallocator = "0.6.0" 36 + 37 + [[bin]] 38 + name = "analyze" 39 + path = "src/bin/analyze.rs" 36 40 37 41 [dev-dependencies] 38 42 tempfile = "3.19.1"
+170
ufos/src/bin/analyze.rs
··· 1 + use cardinality_estimator_safe::Sketch; 2 + use chrono::{DateTime, Utc}; 3 + use clap::{Parser, Subcommand}; 4 + use fjall::{Config, PartitionCreateOptions, PartitionHandle}; 5 + use std::collections::BTreeMap; 6 + use std::path::PathBuf; 7 + use ufos::db_types::{db_complete, DbBytes}; 8 + use ufos::store_types::{ 9 + AllTimeRollupKey, AllTimeRollupStaticPrefix, CountsValue, WeekTruncatedCursor, WeeklyRollupKey, 10 + WeeklyRollupStaticPrefix, WithCollection, 11 + }; 12 + 13 + #[derive(Parser)] 14 + #[command(about = "One-off data analysis of ufos rollup data")] 15 + struct Cli { 16 + /// Path to the fjall data directory 17 + data: PathBuf, 18 + #[command(subcommand)] 19 + command: Command, 20 + } 21 + 22 + #[derive(Subcommand)] 23 + enum Command { 24 + /// Total estimated distinct users across all time and every group 25 + TotalUsers, 26 + /// Weekly estimated distinct users (excluding app.bsky.*/chat.bsky.*) 27 + WeeklyUsers, 28 + /// Weekly count of groups with >10 estimated distinct users (excluding app.bsky.*/chat.bsky.*) 29 + WeeklyGroups, 30 + /// Like weekly-groups but with the last NSID segment removed 31 + WeeklyParents, 32 + } 33 + 34 + fn week_label(week: WeekTruncatedCursor) -> String { 35 + let us = week.to_raw_u64(); 36 + let secs = (us / 1_000_000) as i64; 37 + let dt = DateTime::<Utc>::from_timestamp(secs, 0).unwrap(); 38 + dt.format("%Y-%m-%d").to_string() 39 + } 40 + 41 + fn is_excluded(nsid: &str) -> bool { 42 + nsid.starts_with("app.bsky.") || nsid.starts_with("chat.bsky.") 43 + } 44 + 45 + fn parent_prefix(nsid: &str) -> &str { 46 + let Some((pre, _)) = nsid.rsplit_once('.') else { 47 + eprintln!("no segments in nsid? nsid={nsid}"); 48 + return nsid; 49 + }; 50 + pre 51 + } 52 + 53 + fn total_users(rollups: &PartitionHandle) -> anyhow::Result<()> { 54 + eprintln!("scanning all-time rollups..."); 55 + let mut global_sketch = Sketch::<14>::default(); 56 + let mut all_time_count = 0u64; 57 + let prefix_bytes = AllTimeRollupStaticPrefix::default().to_db_bytes()?; 58 + for kv in rollups.prefix(prefix_bytes) { 59 + let (key_bytes, val_bytes) = kv?; 60 + let _key = db_complete::<AllTimeRollupKey>(&key_bytes)?; 61 + let val = db_complete::<CountsValue>(&val_bytes)?; 62 + global_sketch.merge(val.dids()); 63 + all_time_count += 1; 64 + } 65 + println!("groups scanned: {all_time_count}"); 66 + println!("estimated distinct users: {}", global_sketch.estimate()); 67 + Ok(()) 68 + } 69 + 70 + /// Scan weekly rollups once, returning week -> (merged sketch, per-group entries) 71 + /// Only non-excluded groups are included. 72 + #[expect(clippy::type_complexity)] 73 + fn scan_weekly( 74 + rollups: &PartitionHandle, 75 + ) -> anyhow::Result<BTreeMap<u64, (Sketch<14>, Vec<(String, u64)>)>> { 76 + eprintln!("scanning weekly rollups..."); 77 + let mut weekly_data: BTreeMap<u64, (Sketch<14>, Vec<(String, u64)>)> = BTreeMap::new(); 78 + let prefix_bytes = WeeklyRollupStaticPrefix::default().to_db_bytes()?; 79 + let mut scanned = 0u64; 80 + for kv in rollups.prefix(prefix_bytes) { 81 + let (key_bytes, val_bytes) = kv?; 82 + let key = db_complete::<WeeklyRollupKey>(&key_bytes)?; 83 + let val = db_complete::<CountsValue>(&val_bytes)?; 84 + let week_us = key.cursor().to_raw_u64(); 85 + let nsid_str = key.collection().to_string(); 86 + let estimate = val.dids().estimate() as u64; 87 + 88 + let entry = weekly_data 89 + .entry(week_us) 90 + .or_insert_with(|| (Sketch::<14>::default(), Vec::new())); 91 + 92 + if !is_excluded(&nsid_str) { 93 + entry.0.merge(val.dids()); 94 + entry.1.push((nsid_str, estimate)); 95 + } 96 + 97 + scanned += 1; 98 + if scanned.is_multiple_of(500_000) { 99 + eprintln!(" ...scanned {scanned} weekly entries"); 100 + } 101 + } 102 + eprintln!(" total weekly entries scanned: {scanned}"); 103 + Ok(weekly_data) 104 + } 105 + 106 + fn weekly_users(rollups: &PartitionHandle) -> anyhow::Result<()> { 107 + let weekly_data = scan_weekly(rollups)?; 108 + println!("week\test_users"); 109 + for (&week_us, (sketch, _)) in &weekly_data { 110 + let week = WeekTruncatedCursor::try_from_raw_u64(week_us)?; 111 + println!("{}\t{}", week_label(week), sketch.estimate()); 112 + } 113 + Ok(()) 114 + } 115 + 116 + fn weekly_groups(rollups: &PartitionHandle) -> anyhow::Result<()> { 117 + let weekly_data = scan_weekly(rollups)?; 118 + println!("week\tgroups"); 119 + for (&week_us, (_, entries)) in &weekly_data { 120 + let week = WeekTruncatedCursor::try_from_raw_u64(week_us)?; 121 + let count = entries.iter().filter(|(_, est)| *est > 10).count(); 122 + println!("{}\t{}", week_label(week), count); 123 + } 124 + Ok(()) 125 + } 126 + 127 + fn weekly_parents(rollups: &PartitionHandle) -> anyhow::Result<()> { 128 + let weekly_data = scan_weekly(rollups)?; 129 + println!("week\tparents\ttop parent prefixes"); 130 + for (&week_us, (_, entries)) in &weekly_data { 131 + let week = WeekTruncatedCursor::try_from_raw_u64(week_us)?; 132 + let mut parent_counts: BTreeMap<&str, usize> = BTreeMap::new(); 133 + for (nsid, est) in entries { 134 + if *est > 10 { 135 + let parent = parent_prefix(nsid); 136 + *parent_counts.entry(parent).or_default() += 1; 137 + } 138 + } 139 + let total_parents = parent_counts.len(); 140 + let mut sorted: Vec<_> = parent_counts.into_iter().collect(); 141 + sorted.sort_by(|a, b| b.1.cmp(&a.1)); 142 + let top: Vec<String> = sorted 143 + .iter() 144 + .take(5) 145 + .map(|(prefix, count)| format!("{prefix}({count})")) 146 + .collect(); 147 + println!( 148 + "{}\t{}\t{}", 149 + week_label(week), 150 + total_parents, 151 + top.join(", ") 152 + ); 153 + } 154 + Ok(()) 155 + } 156 + 157 + fn main() -> anyhow::Result<()> { 158 + let cli = Cli::parse(); 159 + 160 + eprintln!("opening db at {:?}...", cli.data); 161 + let keyspace = Config::new(&cli.data).open()?; 162 + let rollups = keyspace.open_partition("rollups", PartitionCreateOptions::default())?; 163 + 164 + match cli.command { 165 + Command::TotalUsers => total_users(&rollups), 166 + Command::WeeklyUsers => weekly_users(&rollups), 167 + Command::WeeklyGroups => weekly_groups(&rollups), 168 + Command::WeeklyParents => weekly_parents(&rollups), 169 + } 170 + }
+3 -3
ufos/src/error.rs
··· 28 28 InitError(String), 29 29 #[error("DB seems to be in a bad state: {0}")] 30 30 BadStateError(String), 31 - #[error("Fjall error")] 31 + #[error("Fjall error: {0}")] 32 32 FjallError(#[from] fjall::Error), 33 - #[error("LSM-tree error (from fjall)")] 33 + #[error("LSM-tree error (from fjall): {0}")] 34 34 FjallLsmError(#[from] fjall::LsmError), 35 - #[error("Bytes encoding error")] 35 + #[error("Bytes encoding error: {0}")] 36 36 EncodingError(#[from] EncodingError), 37 37 #[error("If you ever see this, there's a bug in the code. The error was stolen")] 38 38 Stolen,
+8 -3
ufos/src/storage_fjall.rs
··· 1205 1205 1206 1206 let mut dirty_nsids = HashSet::new(); 1207 1207 1208 - #[derive(Eq, Hash, PartialEq)] 1208 + #[derive(Debug, Eq, Hash, PartialEq)] 1209 1209 enum Rollup { 1210 1210 Hourly(HourTruncatedCursor), 1211 1211 Weekly(WeekTruncatedCursor), ··· 1235 1235 dirty_nsids.insert(key.collection().clone()); 1236 1236 1237 1237 batch.remove(&self.rollups, key_bytes); 1238 - let val = db_complete::<CountsValue>(&val_bytes)?; 1238 + let val = db_complete::<CountsValue>(&val_bytes).inspect_err(|e| { 1239 + log::error!("bad CountsValue at {key:?} from rolling up timelies: {e}") 1240 + })?; 1239 1241 counts_by_rollup 1240 1242 .entry(( 1241 1243 key.collection().clone(), ··· 1275 1277 .get(&rollup_key_bytes)? 1276 1278 .as_deref() 1277 1279 .map(db_complete::<CountsValue>) 1278 - .transpose()? 1280 + .transpose() 1281 + .inspect_err(|e| { 1282 + log::error!("bad CountsValue at {nsid:?}/{rollup:?} from counts_by_rollup: {e}") 1283 + })? 1279 1284 .unwrap_or_default(); 1280 1285 1281 1286 // now that we have values, we can know the exising ranks