Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

get collection stats by time range

+34 -33
+6 -1
ufos/src/server/mod.rs
··· 218 218 } 219 219 /// Collection stats 220 220 /// 221 - /// Get record statistics for collections during a specific time period 221 + /// Get record statistics for collections during a specific time period. 222 + /// 223 + /// Note: the statistics are "rolled up" into hourly buckets in the background, 224 + /// so the data here can be as stale as that background task is behind. See the 225 + /// meta info endpoint to find out how up-to-date the rollup currently is. (In 226 + /// general it sholud be pretty close to live) 222 227 #[endpoint { 223 228 method = GET, 224 229 path = "/collections/stats"
+28 -32
ufos/src/storage_fjall.rs
··· 718 718 fn get_counts_by_collection( 719 719 &self, 720 720 collection: &Nsid, 721 - _since: HourTruncatedCursor, 722 - _until: Option<HourTruncatedCursor>, 721 + since: HourTruncatedCursor, 722 + until: Option<HourTruncatedCursor>, 723 723 ) -> StorageResult<(u64, u64)> { 724 - // 0. grab a snapshot in case rollups happen while we're working 725 - let instant = self.keyspace.instant(); 726 - let global = self.global.snapshot_at(instant); 727 - let rollups = self.rollups.snapshot_at(instant); 724 + // grab snapshots in case rollups happen while we're working 725 + let rollups = self.rollups.snapshot(); 728 726 729 - // 1. all-time counts 730 - let all_time_key = AllTimeRollupKey::new(collection).to_db_bytes()?; 731 - let mut total_counts = rollups 732 - .get(&all_time_key)? 733 - .as_deref() 734 - .map(db_complete::<CountsValue>) 735 - .transpose()? 736 - .unwrap_or_default(); 727 + let until = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into()); 728 + let buckets = CursorBucket::buckets_spanning(since, until); 729 + let mut total_counts = CountsValue::default(); 737 730 738 - // 2. live counts that haven't been rolled into all-time yet. 739 - let rollup_cursor = 740 - get_snapshot_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&global)?.ok_or( 741 - StorageError::BadStateError("Could not find current rollup cursor".to_string()), 742 - )?; 743 - 744 - let full_range = LiveCountsKey::range_from_cursor(rollup_cursor)?; 745 - for kv in rollups.range(full_range) { 746 - let (key_bytes, val_bytes) = kv?; 747 - let key = db_complete::<LiveCountsKey>(&key_bytes)?; 748 - if key.collection() == collection { 749 - let counts = db_complete::<CountsValue>(&val_bytes)?; 750 - total_counts.merge(&counts); 751 - } 731 + for bucket in buckets { 732 + let key = match bucket { 733 + CursorBucket::Hour(t) => HourlyRollupKey::new(t, collection).to_db_bytes()?, 734 + CursorBucket::Week(t) => WeeklyRollupKey::new(t, collection).to_db_bytes()?, 735 + CursorBucket::AllTime => unreachable!(), // TODO: fall back on this if the time span spans the whole dataset? 736 + }; 737 + let count = rollups 738 + .get(&key)? 739 + .as_deref() 740 + .map(db_complete::<CountsValue>) 741 + .transpose()? 742 + .unwrap_or_default(); 743 + total_counts.merge(&count); 752 744 } 745 + 753 746 Ok(( 754 747 total_counts.counts().creates, 755 748 total_counts.dids().estimate() as u64, ··· 1662 1655 100, 1663 1656 ); 1664 1657 write.insert_batch(batch.batch)?; 1658 + write.step_rollup()?; 1665 1659 1666 1660 let (records, dids) = read.get_counts_by_collection(&collection, beginning(), None)?; 1667 1661 assert_eq!(records, 1); ··· 1836 1830 101, 1837 1831 ); 1838 1832 write.insert_batch(batch.batch)?; 1833 + write.step_rollup()?; 1839 1834 1840 1835 let (records, dids) = read.get_counts_by_collection(&collection, beginning(), None)?; 1841 1836 assert_eq!(records, 1); ··· 1874 1869 101, 1875 1870 ); 1876 1871 write.insert_batch(batch.batch)?; 1872 + write.step_rollup()?; 1877 1873 1878 1874 let (creates, dids) = read.get_counts_by_collection(&collection, beginning(), None)?; 1879 1875 assert_eq!(creates, 1); ··· 2196 2192 beginning(), 2197 2193 None, 2198 2194 )?; 2199 - assert_eq!(records, 3); 2200 - assert_eq!(dids, 2); 2195 + assert_eq!(records, 0); 2196 + assert_eq!(dids, 0); 2201 2197 2202 2198 // first batch rolled up 2203 2199 let (n, _) = write.step_rollup()?; ··· 2208 2204 beginning(), 2209 2205 None, 2210 2206 )?; 2211 - assert_eq!(records, 3); 2207 + assert_eq!(records, 2); 2212 2208 assert_eq!(dids, 2); 2213 2209 2214 2210 // delete account rolled up ··· 2220 2216 beginning(), 2221 2217 None, 2222 2218 )?; 2223 - assert_eq!(records, 3); 2219 + assert_eq!(records, 2); 2224 2220 assert_eq!(dids, 2); 2225 2221 2226 2222 // second batch rolled up