Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

get JustCount from collection stats

now feat updates and deletes

+87 -56
+2
ufos/src/lib.rs
··· 286 286 #[derive(Debug, Serialize, JsonSchema)] 287 287 pub struct JustCount { 288 288 creates: u64, 289 + updates: u64, 290 + deletes: u64, 289 291 dids_estimate: u64, 290 292 } 291 293
+4 -15
ufos/src/server/mod.rs
··· 211 211 /// default: now 212 212 until: Option<DateTime<Utc>>, 213 213 } 214 - #[derive(Debug, Serialize, JsonSchema)] 215 - struct TotalCounts { 216 - total_creates: u64, 217 - dids_estimate: u64, 218 - } 219 214 /// Collection stats 220 215 /// 221 216 /// Get record statistics for collections during a specific time period. ··· 232 227 ctx: RequestContext<Context>, 233 228 collections_query: MultiCollectionQuery, 234 229 query: Query<CollectionsStatsQuery>, 235 - ) -> OkCorsResponse<HashMap<String, TotalCounts>> { 230 + ) -> OkCorsResponse<HashMap<String, JustCount>> { 236 231 let Context { storage, .. } = ctx.context(); 237 232 let q = query.into_inner(); 238 233 let collections: HashSet<Nsid> = collections_query.try_into()?; ··· 248 243 let mut seen_by_collection = HashMap::with_capacity(collections.len()); 249 244 250 245 for collection in &collections { 251 - let (total_creates, dids_estimate) = storage 252 - .get_counts_by_collection(collection, since, until) 246 + let counts = storage 247 + .get_collection_counts(collection, since, until) 253 248 .await 254 249 .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?; 255 250 256 - seen_by_collection.insert( 257 - collection.to_string(), 258 - TotalCounts { 259 - total_creates, 260 - dids_estimate, 261 - }, 262 - ); 251 + seen_by_collection.insert(collection.to_string(), counts); 263 252 } 264 253 265 254 OkCors(seen_by_collection).into()
+4 -4
ufos/src/storage.rs
··· 1 1 use crate::store_types::{CountsValue, HourTruncatedCursor, SketchSecretPrefix}; 2 2 use crate::{ 3 - error::StorageError, ConsumerInfo, Cursor, EventBatch, NsidCount, OrderCollectionsBy, 4 - UFOsRecord, 3 + error::StorageError, ConsumerInfo, Cursor, EventBatch, JustCount, NsidCount, 4 + OrderCollectionsBy, UFOsRecord, 5 5 }; 6 6 use async_trait::async_trait; 7 7 use jetstream::exports::{Did, Nsid}; ··· 92 92 step: u64, 93 93 ) -> StorageResult<(Vec<HourTruncatedCursor>, HashMap<Nsid, Vec<CountsValue>>)>; 94 94 95 - async fn get_counts_by_collection( 95 + async fn get_collection_counts( 96 96 &self, 97 97 collection: &Nsid, 98 98 since: HourTruncatedCursor, 99 99 until: Option<HourTruncatedCursor>, 100 - ) -> StorageResult<(u64, u64)>; 100 + ) -> StorageResult<JustCount>; 101 101 102 102 async fn get_records_by_collections( 103 103 &self,
+69 -36
ufos/src/storage_fjall.rs
··· 13 13 WEEK_IN_MICROS, 14 14 }; 15 15 use crate::{ 16 - nice_duration, CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount, 16 + nice_duration, CommitAction, ConsumerInfo, Did, EventBatch, JustCount, Nsid, NsidCount, 17 17 OrderCollectionsBy, UFOsRecord, 18 18 }; 19 19 use async_trait::async_trait; ··· 715 715 Ok((output_hours, output_series)) 716 716 } 717 717 718 - fn get_counts_by_collection( 718 + fn get_collection_counts( 719 719 &self, 720 720 collection: &Nsid, 721 721 since: HourTruncatedCursor, 722 722 until: Option<HourTruncatedCursor>, 723 - ) -> StorageResult<(u64, u64)> { 723 + ) -> StorageResult<JustCount> { 724 724 // grab snapshots in case rollups happen while we're working 725 725 let rollups = self.rollups.snapshot(); 726 726 ··· 743 743 total_counts.merge(&count); 744 744 } 745 745 746 - Ok(( 747 - total_counts.counts().creates, 748 - total_counts.dids().estimate() as u64, 749 - )) 746 + Ok((&total_counts).into()) 750 747 } 751 748 752 749 fn get_records_by_collections( ··· 838 835 }) 839 836 .await? 840 837 } 841 - async fn get_counts_by_collection( 838 + async fn get_collection_counts( 842 839 &self, 843 840 collection: &Nsid, 844 841 since: HourTruncatedCursor, 845 842 until: Option<HourTruncatedCursor>, 846 - ) -> StorageResult<(u64, u64)> { 843 + ) -> StorageResult<JustCount> { 847 844 let s = self.clone(); 848 845 let collection = collection.clone(); 849 846 tokio::task::spawn_blocking(move || { 850 - FjallReader::get_counts_by_collection(&s, &collection, since, until) 847 + FjallReader::get_collection_counts(&s, &collection, since, until) 851 848 }) 852 849 .await? 853 850 } ··· 1630 1627 fn test_hello() -> anyhow::Result<()> { 1631 1628 let (read, mut write) = fjall_db(); 1632 1629 write.insert_batch::<TEST_BATCH_LIMIT>(EventBatch::default())?; 1633 - let (records, dids) = read.get_counts_by_collection( 1630 + let JustCount { 1631 + creates, 1632 + dids_estimate, 1633 + .. 1634 + } = read.get_collection_counts( 1634 1635 &Nsid::new("a.b.c".to_string()).unwrap(), 1635 1636 beginning(), 1636 1637 None, 1637 1638 )?; 1638 - assert_eq!(records, 0); 1639 - assert_eq!(dids, 0); 1639 + assert_eq!(creates, 0); 1640 + assert_eq!(dids_estimate, 0); 1640 1641 Ok(()) 1641 1642 } 1642 1643 ··· 1657 1658 write.insert_batch(batch.batch)?; 1658 1659 write.step_rollup()?; 1659 1660 1660 - let (records, dids) = read.get_counts_by_collection(&collection, beginning(), None)?; 1661 - assert_eq!(records, 1); 1662 - assert_eq!(dids, 1); 1663 - let (records, dids) = read.get_counts_by_collection( 1661 + let JustCount { 1662 + creates, 1663 + dids_estimate, 1664 + .. 1665 + } = read.get_collection_counts(&collection, beginning(), None)?; 1666 + assert_eq!(creates, 1); 1667 + assert_eq!(dids_estimate, 1); 1668 + let JustCount { 1669 + creates, 1670 + dids_estimate, 1671 + .. 1672 + } = read.get_collection_counts( 1664 1673 &Nsid::new("d.e.f".to_string()).unwrap(), 1665 1674 beginning(), 1666 1675 None, 1667 1676 )?; 1668 - assert_eq!(records, 0); 1669 - assert_eq!(dids, 0); 1677 + assert_eq!(creates, 0); 1678 + assert_eq!(dids_estimate, 0); 1670 1679 1671 1680 let records = read.get_records_by_collections([collection].into(), 2, false)?; 1672 1681 assert_eq!(records.len(), 1); ··· 1832 1841 write.insert_batch(batch.batch)?; 1833 1842 write.step_rollup()?; 1834 1843 1835 - let (records, dids) = read.get_counts_by_collection(&collection, beginning(), None)?; 1836 - assert_eq!(records, 1); 1837 - assert_eq!(dids, 1); 1844 + let JustCount { 1845 + creates, 1846 + dids_estimate, 1847 + .. 1848 + } = read.get_collection_counts(&collection, beginning(), None)?; 1849 + assert_eq!(creates, 1); 1850 + assert_eq!(dids_estimate, 1); 1838 1851 1839 1852 let records = read.get_records_by_collections([collection].into(), 2, false)?; 1840 1853 assert_eq!(records.len(), 1); ··· 1871 1884 write.insert_batch(batch.batch)?; 1872 1885 write.step_rollup()?; 1873 1886 1874 - let (creates, dids) = read.get_counts_by_collection(&collection, beginning(), None)?; 1887 + let JustCount { 1888 + creates, 1889 + dids_estimate, 1890 + .. 1891 + } = read.get_collection_counts(&collection, beginning(), None)?; 1875 1892 assert_eq!(creates, 1); 1876 - assert_eq!(dids, 1); 1893 + assert_eq!(dids_estimate, 1); 1877 1894 1878 1895 let records = read.get_records_by_collections([collection].into(), 2, false)?; 1879 1896 assert_eq!(records.len(), 0); ··· 2187 2204 write.insert_batch(batch.batch)?; 2188 2205 2189 2206 // before any rollup 2190 - let (records, dids) = read.get_counts_by_collection( 2207 + let JustCount { 2208 + creates, 2209 + dids_estimate, 2210 + .. 2211 + } = read.get_collection_counts( 2191 2212 &Nsid::new("a.a.a".to_string()).unwrap(), 2192 2213 beginning(), 2193 2214 None, 2194 2215 )?; 2195 - assert_eq!(records, 0); 2196 - assert_eq!(dids, 0); 2216 + assert_eq!(creates, 0); 2217 + assert_eq!(dids_estimate, 0); 2197 2218 2198 2219 // first batch rolled up 2199 2220 let (n, _) = write.step_rollup()?; 2200 2221 assert_eq!(n, 1); 2201 2222 2202 - let (records, dids) = read.get_counts_by_collection( 2223 + let JustCount { 2224 + creates, 2225 + dids_estimate, 2226 + .. 2227 + } = read.get_collection_counts( 2203 2228 &Nsid::new("a.a.a".to_string()).unwrap(), 2204 2229 beginning(), 2205 2230 None, 2206 2231 )?; 2207 - assert_eq!(records, 2); 2208 - assert_eq!(dids, 2); 2232 + assert_eq!(creates, 2); 2233 + assert_eq!(dids_estimate, 2); 2209 2234 2210 2235 // delete account rolled up 2211 2236 let (n, _) = write.step_rollup()?; 2212 2237 assert_eq!(n, 1); 2213 2238 2214 - let (records, dids) = read.get_counts_by_collection( 2239 + let JustCount { 2240 + creates, 2241 + dids_estimate, 2242 + .. 2243 + } = read.get_collection_counts( 2215 2244 &Nsid::new("a.a.a".to_string()).unwrap(), 2216 2245 beginning(), 2217 2246 None, 2218 2247 )?; 2219 - assert_eq!(records, 2); 2220 - assert_eq!(dids, 2); 2248 + assert_eq!(creates, 2); 2249 + assert_eq!(dids_estimate, 2); 2221 2250 2222 2251 // second batch rolled up 2223 2252 let (n, _) = write.step_rollup()?; 2224 2253 assert_eq!(n, 1); 2225 2254 2226 - let (records, dids) = read.get_counts_by_collection( 2255 + let JustCount { 2256 + creates, 2257 + dids_estimate, 2258 + .. 2259 + } = read.get_collection_counts( 2227 2260 &Nsid::new("a.a.a".to_string()).unwrap(), 2228 2261 beginning(), 2229 2262 None, 2230 2263 )?; 2231 - assert_eq!(records, 3); 2232 - assert_eq!(dids, 2); 2264 + assert_eq!(creates, 3); 2265 + assert_eq!(dids_estimate, 2); 2233 2266 2234 2267 // no more rollups left 2235 2268 let (n, _) = write.step_rollup()?;
+8 -1
ufos/src/store_types.rs
··· 269 269 } 270 270 impl From<&CountsValue> for JustCount { 271 271 fn from(cv: &CountsValue) -> Self { 272 + let CommitCounts { 273 + creates, 274 + updates, 275 + deletes, 276 + } = cv.counts(); 272 277 Self { 273 - creates: cv.counts().creates, 278 + creates, 279 + updates, 280 + deletes, 274 281 dids_estimate: cv.dids().estimate() as u64, 275 282 } 276 283 }