alpha
Login
or
Join now
microcosm.blue
/
microcosm-rs
Star
0
Fork
3
Atom
Configure Feed
Issues
Pull Requests
Commits
Tags
Feed URL
Select the types of activity you want to include in your feed.
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
Star
0
Fork
3
Atom
Configure Feed
Issues
Pull Requests
Commits
Tags
Feed URL
Select the types of activity you want to include in your feed.
Overview
Issues
Pulls
Pipelines
get top collections by records count
author
phil
date
1 year ago
(May 20, 2025, 11:07 PM -0400)
commit
c47b582e
c47b582e470498f4766f63e4142ad5e1d62a53e0
parent
66b77a23
66b77a23b4cd9a7cae5d734e69e9d17ce3b9915d
+123
-17
6 changed files
Expand all
Collapse all
Unified
Split
ufos
src
lib.rs
server.rs
storage.rs
storage_fjall.rs
storage_mem.rs
store_types.rs
+24
ufos/src/lib.rs
Reviewed
···
249
249
}
250
250
}
251
251
252
252
+
#[derive(Debug)]
253
253
+
pub struct QueryPeriod {
254
254
+
from: Option<Cursor>,
255
255
+
until: Option<Cursor>,
256
256
+
}
257
257
+
impl QueryPeriod {
258
258
+
pub fn all_time() -> Self {
259
259
+
QueryPeriod {
260
260
+
from: None,
261
261
+
until: None,
262
262
+
}
263
263
+
}
264
264
+
pub fn is_all_time(&self) -> bool {
265
265
+
self.from.is_none() && self.until.is_none()
266
266
+
}
267
267
+
}
268
268
+
269
269
+
#[derive(Debug, Serialize, JsonSchema)]
270
270
+
pub struct Count {
271
271
+
thing: String,
272
272
+
records: u64,
273
273
+
dids_estimate: u64,
274
274
+
}
275
275
+
252
276
#[cfg(test)]
253
277
mod tests {
254
278
use super::*;
+17
-1
ufos/src/server.rs
Reviewed
···
1
1
use crate::index_html::INDEX_HTML;
2
2
use crate::storage::StoreReader;
3
3
-
use crate::{ConsumerInfo, Nsid, TopCollections, UFOsRecord};
3
3
+
use crate::{ConsumerInfo, Count, Nsid, QueryPeriod, TopCollections, UFOsRecord};
4
4
use dropshot::endpoint;
5
5
use dropshot::ApiDescription;
6
6
use dropshot::Body;
···
213
213
ok_cors(seen_by_collection)
214
214
}
215
215
216
216
+
/// Get top collections by count
217
217
+
#[endpoint {
218
218
+
method = GET,
219
219
+
path = "/collections/by-count"
220
220
+
}]
221
221
+
async fn get_top_collections_by_count(ctx: RequestContext<Context>) -> OkCorsResponse<Vec<Count>> {
222
222
+
let Context { storage, .. } = ctx.context();
223
223
+
let collections = storage
224
224
+
.get_top_collections_by_count(100, QueryPeriod::all_time())
225
225
+
.await
226
226
+
.map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?;
227
227
+
228
228
+
ok_cors(collections)
229
229
+
}
230
230
+
216
231
/// Get top collections
217
232
///
218
233
/// The format of this API response will be changing soon.
···
244
259
api.register(get_meta_info).unwrap();
245
260
api.register(get_records_by_collections).unwrap();
246
261
api.register(get_records_total_seen).unwrap();
262
262
+
api.register(get_top_collections_by_count).unwrap();
247
263
api.register(get_top_collections).unwrap();
248
264
249
265
let context = Context {
+10
-1
ufos/src/storage.rs
Reviewed
···
1
1
-
use crate::{error::StorageError, ConsumerInfo, Cursor, EventBatch, TopCollections, UFOsRecord};
1
1
+
use crate::{
2
2
+
error::StorageError, ConsumerInfo, Count, Cursor, EventBatch, QueryPeriod, TopCollections,
3
3
+
UFOsRecord,
4
4
+
};
2
5
use async_trait::async_trait;
3
6
use jetstream::exports::{Did, Nsid};
4
7
use std::collections::HashSet;
···
66
69
async fn get_storage_stats(&self) -> StorageResult<serde_json::Value>;
67
70
68
71
async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo>;
72
72
+
73
73
+
async fn get_top_collections_by_count(
74
74
+
&self,
75
75
+
limit: usize,
76
76
+
period: QueryPeriod,
77
77
+
) -> StorageResult<Vec<Count>>;
69
78
70
79
async fn get_top_collections(&self) -> StorageResult<TopCollections>;
71
80
+45
-1
ufos/src/storage_fjall.rs
Reviewed
···
10
10
TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor, WeeklyDidsKey, WeeklyRecordsKey,
11
11
WeeklyRollupKey,
12
12
};
13
13
-
use crate::{CommitAction, ConsumerInfo, Did, EventBatch, Nsid, TopCollections, UFOsRecord};
13
13
+
use crate::{
14
14
+
CommitAction, ConsumerInfo, Count, Did, EventBatch, Nsid, QueryPeriod, TopCollections,
15
15
+
UFOsRecord,
16
16
+
};
14
17
use async_trait::async_trait;
15
18
use fjall::{Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle};
16
19
use jetstream::events::Cursor;
···
339
342
})
340
343
}
341
344
345
345
+
fn get_top_collections_by_count(
346
346
+
&self,
347
347
+
limit: usize,
348
348
+
period: QueryPeriod,
349
349
+
) -> StorageResult<Vec<Count>> {
350
350
+
Ok(if period.is_all_time() {
351
351
+
let snapshot = self.rollups.snapshot();
352
352
+
let mut out = Vec::with_capacity(limit);
353
353
+
let prefix = AllTimeRecordsKey::from_prefix_to_db_bytes(&Default::default())?;
354
354
+
for kv in snapshot.prefix(prefix).rev().take(limit) {
355
355
+
let (key_bytes, _) = kv?;
356
356
+
let key = db_complete::<AllTimeRecordsKey>(&key_bytes)?;
357
357
+
let rollup_key = AllTimeRollupKey::new(key.collection());
358
358
+
let db_count_bytes = snapshot.get(rollup_key.to_db_bytes()?)?.expect(
359
359
+
"integrity: all-time rank rollup must have corresponding all-time count rollup",
360
360
+
);
361
361
+
let db_counts = db_complete::<CountsValue>(&db_count_bytes)?;
362
362
+
assert_eq!(db_counts.records(), key.records());
363
363
+
out.push(Count {
364
364
+
thing: key.collection().to_string(),
365
365
+
records: db_counts.records(),
366
366
+
dids_estimate: db_counts.dids().estimate() as u64,
367
367
+
});
368
368
+
}
369
369
+
out
370
370
+
} else {
371
371
+
todo!()
372
372
+
})
373
373
+
}
374
374
+
342
375
fn get_top_collections(&self) -> Result<TopCollections, StorageError> {
343
376
// TODO: limit nsid traversal depth
344
377
// TODO: limit nsid traversal breadth
···
479
512
async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> {
480
513
let s = self.clone();
481
514
tokio::task::spawn_blocking(move || FjallReader::get_consumer_info(&s)).await?
515
515
+
}
516
516
+
async fn get_top_collections_by_count(
517
517
+
&self,
518
518
+
limit: usize,
519
519
+
period: QueryPeriod,
520
520
+
) -> StorageResult<Vec<Count>> {
521
521
+
let s = self.clone();
522
522
+
tokio::task::spawn_blocking(move || {
523
523
+
FjallReader::get_top_collections_by_count(&s, limit, period)
524
524
+
})
525
525
+
.await?
482
526
}
483
527
async fn get_top_collections(&self) -> Result<TopCollections, StorageError> {
484
528
let s = self.clone();
+14
-7
ufos/src/storage_mem.rs
Reviewed
···
12
12
RecordLocationMeta, RecordLocationVal, RecordRawValue, TakeoffKey, TakeoffValue,
13
13
WeekTruncatedCursor, WeeklyRollupKey,
14
14
};
15
15
-
use crate::{CommitAction, ConsumerInfo, Did, EventBatch, Nsid, TopCollections, UFOsRecord};
15
15
+
use crate::{
16
16
+
CommitAction, ConsumerInfo, Count, Did, EventBatch, Nsid, QueryPeriod, TopCollections,
17
17
+
UFOsRecord,
18
18
+
};
16
19
use async_trait::async_trait;
17
20
use jetstream::events::Cursor;
18
21
use lsm_tree::range::prefix_to_range;
19
19
-
use std::collections::BTreeMap;
20
20
-
use std::collections::HashMap;
21
21
-
use std::collections::HashSet;
22
22
+
use std::collections::{BTreeMap, HashMap, HashSet};
22
23
use std::path::Path;
23
23
-
use std::sync::Mutex;
24
24
-
use std::sync::RwLock;
24
24
+
use std::sync::{Mutex, RwLock};
25
25
use std::time::SystemTime;
26
26
27
27
const MAX_BATCHED_CLEANUP_SIZE: usize = 1024; // try to commit progress for longer feeds
···
584
584
let s = self.clone();
585
585
tokio::task::spawn_blocking(move || MemReader::get_consumer_info(&s)).await?
586
586
}
587
587
-
async fn get_top_collections(&self) -> Result<TopCollections, StorageError> {
587
587
+
async fn get_top_collections(&self) -> StorageResult<TopCollections> {
588
588
let s = self.clone();
589
589
tokio::task::spawn_blocking(move || MemReader::get_top_collections(&s)).await?
590
590
+
}
591
591
+
async fn get_top_collections_by_count(
592
592
+
&self,
593
593
+
_: usize,
594
594
+
_: QueryPeriod,
595
595
+
) -> StorageResult<Vec<Count>> {
596
596
+
todo!()
590
597
}
591
598
async fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> {
592
599
let s = self.clone();
+13
-7
ufos/src/store_types.rs
Reviewed
···
287
287
}
288
288
289
289
pub type BucketedRankRecordsKey<P, C> =
290
290
-
DbConcat<DbConcat<DbConcat<DbStaticStr<P>, C>, KeyRank>, Nsid>;
290
290
+
DbConcat<DbConcat<DbStaticStr<P>, C>, DbConcat<KeyRank, Nsid>>;
291
291
impl<P, C> BucketedRankRecordsKey<P, C>
292
292
where
293
293
P: StaticStr + PartialEq + std::fmt::Debug,
···
295
295
{
296
296
pub fn new(cursor: C, rank: KeyRank, nsid: &Nsid) -> Self {
297
297
Self::from_pair(
298
298
-
DbConcat::from_pair(DbConcat::from_pair(Default::default(), cursor), rank),
299
299
-
nsid.clone(),
298
298
+
DbConcat::from_pair(Default::default(), cursor),
299
299
+
DbConcat::from_pair(rank, nsid.clone()),
300
300
)
301
301
}
302
302
pub fn with_rank(&self, new_rank: KeyRank) -> Self {
303
303
-
Self::new(self.prefix.prefix.suffix.clone(), new_rank, &self.suffix)
303
303
+
Self::new(self.prefix.suffix.clone(), new_rank, &self.suffix.suffix)
304
304
}
305
305
}
306
306
···
355
355
}
356
356
pub type AllTimeRollupVal = CountsValue;
357
357
358
358
-
pub type AllTimeRankRecordsKey<P> = DbConcat<DbConcat<DbStaticStr<P>, KeyRank>, Nsid>;
358
358
+
pub type AllTimeRankRecordsKey<P> = DbConcat<DbStaticStr<P>, DbConcat<KeyRank, Nsid>>;
359
359
impl<P> AllTimeRankRecordsKey<P>
360
360
where
361
361
P: StaticStr + PartialEq + std::fmt::Debug,
362
362
{
363
363
pub fn new(rank: KeyRank, nsid: &Nsid) -> Self {
364
364
-
Self::from_pair(DbConcat::from_pair(Default::default(), rank), nsid.clone())
364
364
+
Self::from_pair(Default::default(), DbConcat::from_pair(rank, nsid.clone()))
365
365
}
366
366
pub fn with_rank(&self, new_rank: KeyRank) -> Self {
367
367
-
Self::new(new_rank, &self.suffix)
367
367
+
Self::new(new_rank, &self.suffix.suffix)
368
368
+
}
369
369
+
pub fn records(&self) -> u64 {
370
370
+
self.suffix.prefix.0
371
371
+
}
372
372
+
pub fn collection(&self) -> &Nsid {
373
373
+
&self.suffix.suffix
368
374
}
369
375
}
370
376