alpha
Login
or
Join now
microcosm.blue
/
microcosm-rs
Star
0
Fork
3
Atom
Configure Feed
Issues
Pull Requests
Commits
Tags
Feed URL
Select the types of activity you want to include in your feed.
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
Star
0
Fork
3
Atom
Configure Feed
Issues
Pull Requests
Commits
Tags
Feed URL
Select the types of activity you want to include in your feed.
Overview
Issues
Pulls
Pipelines
some info and stats from storage
author
phil
date
1 year ago
(Apr 9, 2025, 3:14 PM -0400)
commit
207c079e
207c079e579a13d69dc958f95ebf659e48487577
parent
a1f911d6
a1f911d6c808e09711cd26ca5617df92596c5e79
+100
-52
4 changed files
Expand all
Collapse all
Unified
Split
ufos
src
lib.rs
server.rs
storage.rs
storage_fjall.rs
+10
ufos/src/lib.rs
Reviewed
···
11
11
use error::FirehoseEventError;
12
12
use jetstream::events::{CommitEvent, CommitOp, Cursor};
13
13
use jetstream::exports::{Did, Nsid, RecordKey};
14
14
+
use schemars::JsonSchema;
14
15
use serde::Serialize;
15
16
use serde_json::value::RawValue;
16
17
use std::collections::HashMap;
···
206
207
pub fn is_empty(&self) -> bool {
207
208
self.commits_by_nsid.is_empty() && self.account_removes.is_empty()
208
209
}
210
210
+
}
211
211
+
212
212
+
#[derive(Debug, Serialize, JsonSchema)]
213
213
+
pub enum ConsumerInfo {
214
214
+
Jetstream {
215
215
+
endpoint: String,
216
216
+
started_at: u64,
217
217
+
latest_cursor: Option<u64>,
218
218
+
},
209
219
}
210
220
211
221
#[cfg(test)]
-8
ufos/src/server.rs
Reviewed
···
37
37
storage_info: StorageInfo,
38
38
jetstream_endpoint: Option<String>,
39
39
jetstream_cursor: Option<u64>,
40
40
-
mod_cursor: Option<u64>,
41
40
}
42
41
/// Get meta information about UFOs itself
43
42
#[endpoint {
···
67
66
.map_err(failed_to_get("jetstream cursor"))?
68
67
.map(|c| c.to_raw_u64());
69
68
70
70
-
let mod_cursor = storage
71
71
-
.get_mod_cursor()
72
72
-
.await
73
73
-
.map_err(failed_to_get("jetstream cursor"))?
74
74
-
.map(|c| c.to_raw_u64());
75
75
-
76
69
ok_cors(MetaInfo {
77
70
storage_info,
78
71
jetstream_endpoint,
79
72
jetstream_cursor,
80
80
-
mod_cursor,
81
73
})
82
74
}
83
75
+14
-18
ufos/src/storage.rs
Reviewed
···
1
1
// use crate::store_types::CountsValue;
2
2
-
use crate::{error::StorageError, Cursor, EventBatch, UFOsRecord};
2
2
+
use crate::{error::StorageError, ConsumerInfo, Cursor, EventBatch, UFOsRecord};
3
3
use jetstream::exports::{Did, Nsid};
4
4
+
use schemars::JsonSchema;
5
5
+
use serde::Serialize;
4
6
use std::path::Path;
5
7
6
8
pub type StorageResult<T> = Result<T, StorageError>;
7
9
8
8
-
// #[derive(Debug)]
9
9
-
// pub enum RollupTask {
10
10
-
// CollectionRollup {
11
11
-
// live_counts_key_bytes: Vec<u8>,
12
12
-
// counts: CountsValue,
13
13
-
// },
14
14
-
// DeleteAccount(Did),
15
15
-
// }
16
16
-
17
17
-
pub trait StorageWhatever<R: StoreReader, W: StoreWriter, C> {
18
18
-
// TODO: extract this
10
10
+
pub trait StorageWhatever<R, W, C, S>
11
11
+
where
12
12
+
R: StoreReader<S>,
13
13
+
W: StoreWriter,
14
14
+
S: Serialize + JsonSchema,
15
15
+
{
19
16
fn init(
20
17
path: impl AsRef<Path>,
21
18
endpoint: String,
···
35
32
fn trim_collection(&mut self, collection: &Nsid, limit: usize) -> StorageResult<()>;
36
33
37
34
fn delete_account(&mut self, did: &Did) -> StorageResult<usize>;
35
35
+
}
38
36
39
39
-
// fn rollup_tasks(
40
40
-
// &mut self,
41
41
-
// from_cursor: Cursor,
42
42
-
// ) -> impl Iterator<Item = StorageResult<(RollupTask, Cursor)>>;
43
43
-
}
37
37
+
pub trait StoreReader<S>: Clone {
38
38
+
fn get_storage_stats(&self) -> StorageResult<S>;
39
39
+
40
40
+
fn get_consumer_info(&self) -> StorageResult<ConsumerInfo>;
44
41
45
45
-
pub trait StoreReader: Clone {
46
42
fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)>;
47
43
48
44
fn get_records_by_collections(
+76
-26
ufos/src/storage_fjall.rs
Reviewed
···
10
10
ModQueueItemKey, ModQueueItemStringValue, ModQueueItemValue, NewRollupCursorKey,
11
11
NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, RecordLocationKey,
12
12
RecordLocationMeta, RecordLocationVal, RecordRawValue, RollupCursorKey, RollupCursorValue,
13
13
-
SeenCounter, TakeoffKey, WeekTruncatedCursor, WeeklyRollupKey,
13
13
+
SeenCounter, TakeoffKey, TakeoffValue, WeekTruncatedCursor, WeeklyRollupKey,
14
14
};
15
15
-
use crate::{CommitAction, Did, EventBatch, Nsid, RecordKey, UFOsRecord};
15
15
+
use crate::{CommitAction, ConsumerInfo, Did, EventBatch, Nsid, RecordKey, UFOsRecord};
16
16
use fjall::{
17
17
Batch as FjallBatch, CompressionType, Config, Keyspace, PartitionCreateOptions, PartitionHandle,
18
18
};
19
19
use jetstream::events::Cursor;
20
20
+
use schemars::JsonSchema;
21
21
+
use serde::Serialize;
20
22
use std::collections::HashMap;
21
23
use std::path::{Path, PathBuf};
22
24
use std::time::{Duration, Instant, SystemTime};
···
51
53
/// Partion: 'global'
52
54
///
53
55
/// - Global sequence counter (is the jetstream cursor -- monotonic with many gaps)
54
54
-
/// key: "js_cursor" (literal)
55
55
-
/// val: u64
56
56
+
/// - key: "js_cursor" (literal)
57
57
+
/// - val: u64
56
58
///
57
59
/// - Jetstream server endpoint (persisted because the cursor can't be used on another instance without data loss)
58
58
-
/// key: "js_endpoint" (literal)
59
59
-
/// val: string (URL of the instance)
60
60
+
/// - key: "js_endpoint" (literal)
61
61
+
/// - val: string (URL of the instance)
60
62
///
61
63
/// - Launch date
62
62
-
/// key: "takeoff" (literal)
63
63
-
/// val: u64 (micros timestamp, not from jetstream for now so not precise)
64
64
+
/// - key: "takeoff" (literal)
65
65
+
/// - val: u64 (micros timestamp, not from jetstream for now so not precise)
64
66
///
65
67
/// - Rollup cursor (bg work: roll stats into hourlies, delete accounts, old record deletes)
66
66
-
/// key: "rollup_cursor" (literal)
67
67
-
/// val: u64 (tracks behind js_cursor)
68
68
+
/// - key: "rollup_cursor" (literal)
69
69
+
/// - val: u64 (tracks behind js_cursor)
68
70
///
69
71
///
70
72
/// Partition: 'feed'
71
73
///
72
74
/// - Per-collection list of record references ordered by jetstream cursor
73
73
-
/// key: nullstr || u64 (collection nsid null-terminated, jetstream cursor)
74
74
-
/// val: nullstr || nullstr || nullstr (did, rkey, rev. rev is mostly a sanity-check for now.)
75
75
+
/// - key: nullstr || u64 (collection nsid null-terminated, jetstream cursor)
76
76
+
/// - val: nullstr || nullstr || nullstr (did, rkey, rev. rev is mostly a sanity-check for now.)
75
77
///
76
78
///
77
79
/// Partition: 'records'
78
80
///
79
81
/// - Actual records by their atproto location
80
80
-
/// key: nullstr || nullstr || nullstr (did, collection, rkey)
81
81
-
/// val: u64 || bool || nullstr || rawval (js_cursor, is_update, rev, actual record)
82
82
+
/// - key: nullstr || nullstr || nullstr (did, collection, rkey)
83
83
+
/// - val: u64 || bool || nullstr || rawval (js_cursor, is_update, rev, actual record)
82
84
///
83
85
///
84
86
/// Partition: 'rollups'
85
87
///
86
88
/// - Live (batched) records counts and dids estimate per collection
87
87
-
/// key: "live_counts" || u64 || nullstr (js_cursor, nsid)
88
88
-
/// val: u64 || HLL (count (not cursor), estimator)
89
89
+
/// - key: "live_counts" || u64 || nullstr (js_cursor, nsid)
90
90
+
/// - val: u64 || HLL (count (not cursor), estimator)
89
91
///
90
92
/// - Hourly total record counts and dids estimate per collection
91
91
-
/// key: "hourly_counts" || u64 || nullstr (hour, nsid)
92
92
-
/// val: u64 || HLL (count (not cursor), estimator)
93
93
+
/// - key: "hourly_counts" || u64 || nullstr (hour, nsid)
94
94
+
/// - val: u64 || HLL (count (not cursor), estimator)
93
95
///
94
96
/// - Weekly total record counts and dids estimate per collection
95
95
-
/// key: "weekly_counts" || u64 || nullstr (hour, nsid)
96
96
-
/// val: u64 || HLL (count (not cursor), estimator)
97
97
+
/// - key: "weekly_counts" || u64 || nullstr (hour, nsid)
98
98
+
/// - val: u64 || HLL (count (not cursor), estimator)
97
99
///
98
100
/// - All-time total record counts and dids estimate per collection
99
99
-
/// key: "ever_counts" || nullstr (nsid)
100
100
-
/// val: u64 || HLL (count (not cursor), estimator)
101
101
+
/// - key: "ever_counts" || nullstr (nsid)
102
102
+
/// - val: u64 || HLL (count (not cursor), estimator)
101
103
///
102
104
/// - TODO: sorted indexes for all-times?
103
105
///
···
105
107
/// Partition: 'queues'
106
108
///
107
109
/// - Delete account queue
108
108
-
/// key: "delete_acount" || u64 (js_cursor)
109
109
-
/// val: nullstr (did)
110
110
+
/// - key: "delete_acount" || u64 (js_cursor)
111
111
+
/// - val: nullstr (did)
110
112
///
111
113
///
112
114
/// TODO: moderation actions
···
123
125
pub temp: bool,
124
126
}
125
127
126
126
-
impl StorageWhatever<FjallReader, FjallWriter, FjallConfig> for FjallStorage {
128
128
+
#[derive(Debug, Serialize, JsonSchema)]
129
129
+
pub struct FjallStats {
130
130
+
pub keyspace_disk_space: u64,
131
131
+
pub keyspace_journal_count: usize,
132
132
+
pub keyspace_sequence: u64,
133
133
+
pub rollup_cursor: Option<u64>,
134
134
+
}
135
135
+
136
136
+
impl StorageWhatever<FjallReader, FjallWriter, FjallConfig, FjallStats> for FjallStorage {
127
137
fn init(
128
138
path: impl AsRef<Path>,
129
139
endpoint: String,
···
204
214
rollups: PartitionHandle,
205
215
}
206
216
207
207
-
impl StoreReader for FjallReader {
217
217
+
impl StoreReader<FjallStats> for FjallReader {
218
218
+
fn get_storage_stats(&self) -> StorageResult<FjallStats> {
219
219
+
let rollup_cursor =
220
220
+
get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?
221
221
+
.map(|c| c.to_raw_u64());
222
222
+
223
223
+
Ok(FjallStats {
224
224
+
keyspace_disk_space: self.keyspace.disk_space(),
225
225
+
keyspace_journal_count: self.keyspace.journal_count(),
226
226
+
keyspace_sequence: self.keyspace.instant(),
227
227
+
rollup_cursor,
228
228
+
})
229
229
+
}
230
230
+
231
231
+
fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> {
232
232
+
let global = self.global.snapshot();
233
233
+
234
234
+
let endpoint =
235
235
+
get_snapshot_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?
236
236
+
.ok_or(StorageError::BadStateError(
237
237
+
"Could not find jetstream endpoint".to_string(),
238
238
+
))?
239
239
+
.0;
240
240
+
241
241
+
let started_at = get_snapshot_static_neu::<TakeoffKey, TakeoffValue>(&global)?
242
242
+
.ok_or(StorageError::BadStateError(
243
243
+
"Could not find jetstream takeoff time".to_string(),
244
244
+
))?
245
245
+
.to_raw_u64();
246
246
+
247
247
+
let latest_cursor =
248
248
+
get_snapshot_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?
249
249
+
.map(|c| c.to_raw_u64());
250
250
+
251
251
+
Ok(ConsumerInfo::Jetstream {
252
252
+
endpoint,
253
253
+
started_at,
254
254
+
latest_cursor,
255
255
+
})
256
256
+
}
257
257
+
208
258
fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> {
209
259
// 0. grab a snapshot in case rollups happen while we're working
210
260
let instant = self.keyspace.instant();