alpha
Login
or
Join now
microcosm.blue
/
microcosm-rs
Star
0
Fork
3
Atom
Configure Feed
Issues
Pull Requests
Commits
Tags
Feed URL
Select the types of activity you want to include in your feed.
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
Star
0
Fork
3
Atom
Configure Feed
Issues
Pull Requests
Commits
Tags
Feed URL
Select the types of activity you want to include in your feed.
Overview
Issues
Pulls
Pipelines
actually get a record back out
author
phil
date
1 year ago
(Apr 7, 2025, 3:44 PM -0400)
commit
6deb998f
6deb998fd214dec65427ef7bc1a7bddaf240a491
parent
a3598965
a359896581250dec0384f0ff4c0a27cae6faa139
+188
-30
6 changed files
Expand all
Collapse all
Unified
Split
jetstream
src
events.rs
ufos
src
db_types.rs
lib.rs
storage.rs
storage_fjall.rs
store_types.rs
+5
-2
jetstream/src/events.rs
Reviewed
···
6
6
};
7
7
8
8
use chrono::Utc;
9
9
-
use serde::Deserialize;
9
9
+
use serde::{
10
10
+
Deserialize,
11
11
+
Serialize,
12
12
+
};
10
13
use serde_json::value::RawValue;
11
14
12
15
use crate::exports;
13
16
14
17
/// Opaque wrapper for the time_us cursor used by jetstream
15
15
-
#[derive(Deserialize, Debug, Copy, Clone, PartialEq, PartialOrd)]
18
18
+
#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq, PartialOrd)]
16
19
pub struct Cursor(u64);
17
20
18
21
#[derive(Debug, Deserialize)]
+11
ufos/src/db_types.rs
Reviewed
···
32
32
UnterminatedString,
33
33
#[error("could not convert from utf8: {0}")]
34
34
NotUtf8(#[from] std::str::Utf8Error),
35
35
+
#[error("could not convert from utf8: {0}")]
36
36
+
NotUtf8String(#[from] std::string::FromUtf8Error),
35
37
#[error("could not get array from slice: {0}")]
36
38
BadSlice(#[from] std::array::TryFromSliceError),
37
39
#[error("wrong static prefix. expected {1:?}, found {0:?}")]
···
200
202
}
201
203
202
204
//////
205
205
+
206
206
+
impl DbBytes for Vec<u8> {
207
207
+
fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
208
208
+
Ok(self.to_vec())
209
209
+
}
210
210
+
fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> {
211
211
+
Ok((bytes.to_owned(), bytes.len()))
212
212
+
}
213
213
+
}
203
214
204
215
/// Lexicographic-sort-friendly null-terminating serialization for String
205
216
///
+13
ufos/src/lib.rs
Reviewed
···
10
10
use error::FirehoseEventError;
11
11
use jetstream::events::{CommitEvent, CommitOp, Cursor};
12
12
use jetstream::exports::{Did, Nsid, RecordKey};
13
13
+
use serde::Serialize;
13
14
use serde_json::value::RawValue;
14
15
use std::collections::{HashMap, VecDeque};
15
16
···
54
55
rkey: RecordKey,
55
56
rev: String,
56
57
action: CommitAction,
58
58
+
}
59
59
+
60
60
+
#[derive(Debug, Clone, Serialize)]
61
61
+
pub struct UFOsRecord {
62
62
+
pub cursor: Cursor,
63
63
+
pub did: Did,
64
64
+
pub collection: Nsid,
65
65
+
pub rkey: RecordKey,
66
66
+
pub rev: String,
67
67
+
// TODO: cid?
68
68
+
pub record: Box<RawValue>,
69
69
+
pub is_update: bool,
57
70
}
58
71
59
72
impl UFOsCommit {
+11
-5
ufos/src/storage.rs
Reviewed
···
1
1
-
use crate::{error::StorageError, Cursor, EventBatch};
1
1
+
use crate::{error::StorageError, Cursor, EventBatch, UFOsRecord};
2
2
use jetstream::exports::Nsid;
3
3
use std::path::Path;
4
4
+
5
5
+
pub type StorageResult<T> = Result<T, StorageError>;
4
6
5
7
pub trait StorageWhatever<R: StoreReader, W: StoreWriter, C> {
6
8
// TODO: extract this
···
9
11
endpoint: String,
10
12
force_endpoint: bool,
11
13
config: C,
12
12
-
) -> Result<(R, W, Option<Cursor>), StorageError>
14
14
+
) -> StorageResult<(R, W, Option<Cursor>)>
13
15
where
14
16
Self: Sized;
15
17
}
16
18
17
19
pub trait StoreWriter {
18
18
-
fn insert_batch(&mut self, event_batch: EventBatch) -> Result<(), StorageError>;
20
20
+
fn insert_batch(&mut self, event_batch: EventBatch) -> StorageResult<()>;
19
21
}
20
22
21
23
pub trait StoreReader: Clone {
22
22
-
fn get_counts_by_collection(&self, collection: &Nsid) -> Result<(u64, u64), StorageError>;
23
23
-
// fn get_records_by_collections(&self, collections: &)
24
24
+
fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)>;
25
25
+
fn get_records_by_collections(
26
26
+
&self,
27
27
+
collections: &[&Nsid],
28
28
+
limit: usize,
29
29
+
) -> StorageResult<Vec<UFOsRecord>>;
24
30
}
+81
-16
ufos/src/storage_fjall.rs
Reviewed
···
1
1
use crate::db_types::{db_complete, DbBytes, DbStaticStr, EncodingError, StaticStr};
2
2
use crate::error::StorageError;
3
3
-
use crate::storage::{StorageWhatever, StoreReader, StoreWriter};
3
3
+
use crate::storage::{StorageResult, StorageWhatever, StoreReader, StoreWriter};
4
4
use crate::store_types::{
5
5
ByCollectionKey, ByCollectionValue, ByCursorSeenKey, ByCursorSeenValue, ByIdKey, ByIdValue,
6
6
CountsValue, DeleteAccountQueueKey, DeleteAccountQueueVal, JetstreamCursorKey,
7
7
JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey,
8
8
ModCursorKey, ModCursorValue, ModQueueItemKey, ModQueueItemStringValue, ModQueueItemValue,
9
9
NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal,
10
10
-
RecordLocationKey, RecordLocationVal, RollupCursorKey, RollupCursorValue, SeenCounter,
11
11
-
TakeoffKey, TakeoffValue,
10
10
+
RecordLocationKey, RecordLocationMeta, RecordLocationVal, RecordRawValue, RollupCursorKey,
11
11
+
RollupCursorValue, SeenCounter, TakeoffKey,
12
12
};
13
13
-
use crate::{CommitAction, DeleteAccount, Did, EventBatch, Nsid, RecordKey};
13
13
+
use crate::{CommitAction, DeleteAccount, Did, EventBatch, Nsid, RecordKey, UFOsRecord};
14
14
use cardinality_estimator::CardinalityEstimator;
15
15
use fjall::{
16
16
Batch as FjallBatch, CompressionType, Config, Keyspace, PartitionCreateOptions, PartitionHandle,
···
123
123
endpoint: String,
124
124
force_endpoint: bool,
125
125
_config: FjallConfig,
126
126
-
) -> Result<(FjallReader, FjallWriter, Option<Cursor>), StorageError> {
126
126
+
) -> StorageResult<(FjallReader, FjallWriter, Option<Cursor>)> {
127
127
let keyspace = {
128
128
let config = Config::new(path);
129
129
···
197
197
}
198
198
199
199
impl StoreReader for FjallReader {
200
200
-
fn get_counts_by_collection(&self, collection: &Nsid) -> Result<(u64, u64), StorageError> {
200
200
+
fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> {
201
201
// TODO: start from rollup
202
202
let full_range = LiveCountsKey::range_from_cursor(Cursor::from_start())?;
203
203
let mut total = 0;
···
213
213
}
214
214
Ok((total, dids.estimate() as u64))
215
215
}
216
216
+
217
217
+
fn get_records_by_collections(
218
218
+
&self,
219
219
+
collections: &[&Nsid],
220
220
+
limit: usize,
221
221
+
) -> StorageResult<Vec<UFOsRecord>> {
222
222
+
if collections.is_empty() {
223
223
+
return Ok(vec![]);
224
224
+
} else if collections.len() > 1 {
225
225
+
todo!()
226
226
+
}
227
227
+
228
228
+
let collection = collections[0];
229
229
+
230
230
+
let prefix = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?;
231
231
+
let collected = 0;
232
232
+
let mut out = vec![];
233
233
+
for kv in self.feeds.prefix(prefix).rev() {
234
234
+
let (key_bytes, val_bytes) = kv?;
235
235
+
let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?;
236
236
+
let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?;
237
237
+
let location_key: RecordLocationKey = (&feed_key, &feed_val).into();
238
238
+
239
239
+
let Some(location_val_bytes) = self.records.get(location_key.to_db_bytes()?)? else {
240
240
+
// record was deleted (hopefully)
241
241
+
continue;
242
242
+
};
243
243
+
244
244
+
let (meta, n) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?;
245
245
+
246
246
+
if meta.cursor() != feed_key.cursor() {
247
247
+
// older/different version
248
248
+
continue;
249
249
+
}
250
250
+
if meta.rev != feed_val.rev() {
251
251
+
// weird...
252
252
+
log::warn!("record lookup: cursor match but rev did not...? excluding.");
253
253
+
continue;
254
254
+
}
255
255
+
let Some(raw_value_bytes) = location_val_bytes.get(n..) else {
256
256
+
log::warn!(
257
257
+
"record lookup: found record but could not get bytes to decode the record??"
258
258
+
);
259
259
+
continue;
260
260
+
};
261
261
+
let rawval = db_complete::<RecordRawValue>(raw_value_bytes)?;
262
262
+
out.push(UFOsRecord {
263
263
+
collection: feed_key.collection().clone(),
264
264
+
cursor: feed_key.cursor(),
265
265
+
did: feed_val.did().clone(),
266
266
+
rkey: feed_val.rkey().clone(),
267
267
+
rev: meta.rev.to_string(),
268
268
+
record: rawval.try_into()?,
269
269
+
is_update: meta.is_update,
270
270
+
});
271
271
+
272
272
+
if collected >= limit {
273
273
+
break;
274
274
+
}
275
275
+
}
276
276
+
277
277
+
Ok(out)
278
278
+
}
216
279
}
217
280
218
281
pub struct FjallWriter {
···
225
288
}
226
289
227
290
impl FjallWriter {
228
228
-
pub fn step_rollup(&mut self) -> Result<(), StorageError> {
291
291
+
pub fn step_rollup(&mut self) -> StorageResult<()> {
229
292
// let mut batch = self.keyspace.batch();
230
293
231
294
let rollup_cursor =
···
284
347
}
285
348
286
349
impl StoreWriter for FjallWriter {
287
287
-
fn insert_batch(&mut self, event_batch: EventBatch) -> Result<(), StorageError> {
350
350
+
fn insert_batch(&mut self, event_batch: EventBatch) -> StorageResult<()> {
288
351
if event_batch.is_empty() {
289
352
return Ok(());
290
353
}
···
669
732
}
670
733
671
734
/// Get a value from a fixed key
672
672
-
fn get_static_neu<K: StaticStr, V: DbBytes>(
673
673
-
global: &PartitionHandle,
674
674
-
) -> Result<Option<V>, StorageError> {
735
735
+
fn get_static_neu<K: StaticStr, V: DbBytes>(global: &PartitionHandle) -> StorageResult<Option<V>> {
675
736
let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
676
737
let value = global
677
738
.get(&key_bytes)?
···
695
756
fn insert_static_neu<K: StaticStr>(
696
757
global: &PartitionHandle,
697
758
value: impl DbBytes,
698
698
-
) -> Result<(), StorageError> {
759
759
+
) -> StorageResult<()> {
699
760
let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
700
761
let value_bytes = value.to_db_bytes()?;
701
762
global.insert(&key_bytes, &value_bytes)?;
···
1206
1267
assert_eq!(records, 0);
1207
1268
assert_eq!(dids, 0);
1208
1269
1209
1209
-
// let records = read.get_records_by_collections(&vec![collection], 2);
1210
1210
-
// assert_eq!(records.len, 1);
1270
1270
+
let records = read.get_records_by_collections(&vec![&collection], 2)?;
1271
1271
+
assert_eq!(records.len(), 1);
1272
1272
+
let rec = &records[0];
1273
1273
+
assert_eq!(rec.record.get(), "{}");
1274
1274
+
assert_eq!(rec.is_update, false);
1211
1275
1212
1212
-
// let records = read.get_records_by_collections(&vec![&Nsid::new("d.e.f".to_string()).unwrap()], 2);
1213
1213
-
// assert_eq!(records.len, 0);
1276
1276
+
let records =
1277
1277
+
read.get_records_by_collections(&vec![&Nsid::new("d.e.f".to_string()).unwrap()], 2)?;
1278
1278
+
assert_eq!(records.len(), 0);
1214
1279
1215
1280
Ok(())
1216
1281
}
+67
-7
ufos/src/store_types.rs
Reviewed
···
84
84
}
85
85
86
86
pub type NsidRecordFeedKey = DbConcat<Nsid, Cursor>;
87
87
+
impl NsidRecordFeedKey {
88
88
+
pub fn collection(&self) -> &Nsid {
89
89
+
&self.prefix
90
90
+
}
91
91
+
pub fn cursor(&self) -> Cursor {
92
92
+
self.suffix
93
93
+
}
94
94
+
}
87
95
pub type NsidRecordFeedVal = DbConcat<Did, DbConcat<RecordKey, String>>;
96
96
+
impl NsidRecordFeedVal {
97
97
+
pub fn did(&self) -> &Did {
98
98
+
&self.prefix
99
99
+
}
100
100
+
pub fn rkey(&self) -> &RecordKey {
101
101
+
&self.suffix.prefix
102
102
+
}
103
103
+
pub fn rev(&self) -> &str {
104
104
+
&self.suffix.suffix
105
105
+
}
106
106
+
}
88
107
impl From<(&Did, &RecordKey, &str)> for NsidRecordFeedVal {
89
108
fn from((did, rkey, rev): (&Did, &RecordKey, &str)) -> Self {
90
109
Self::from_pair(
···
95
114
}
96
115
97
116
pub type RecordLocationKey = DbConcat<Did, DbConcat<Nsid, RecordKey>>;
117
117
+
impl RecordLocationKey {
118
118
+
pub fn did(&self) -> &Did {
119
119
+
&self.prefix
120
120
+
}
121
121
+
pub fn collection(&self) -> &Nsid {
122
122
+
&self.suffix.prefix
123
123
+
}
124
124
+
pub fn rkey(&self) -> &RecordKey {
125
125
+
&self.suffix.suffix
126
126
+
}
127
127
+
}
98
128
impl From<(&UFOsCommit, &Nsid)> for RecordLocationKey {
99
129
fn from((commit, collection): (&UFOsCommit, &Nsid)) -> Self {
100
130
Self::from_pair(
···
103
133
)
104
134
}
105
135
}
136
136
+
impl From<(&NsidRecordFeedKey, &NsidRecordFeedVal)> for RecordLocationKey {
137
137
+
fn from((key, val): (&NsidRecordFeedKey, &NsidRecordFeedVal)) -> Self {
138
138
+
Self::from_pair(
139
139
+
val.did().clone(),
140
140
+
DbConcat::from_pair(key.collection().clone(), val.rkey().clone()),
141
141
+
)
142
142
+
}
143
143
+
}
144
144
+
106
145
#[derive(Debug, PartialEq, Encode, Decode)]
107
146
pub struct RecordLocationMeta {
108
108
-
pub cursor: u64, // ugh no bincode impl
147
147
+
cursor: u64, // ugh no bincode impl
109
148
pub is_update: bool,
110
149
pub rev: String,
111
150
}
151
151
+
impl RecordLocationMeta {
152
152
+
pub fn cursor(&self) -> Cursor {
153
153
+
Cursor::from_raw_u64(self.cursor)
154
154
+
}
155
155
+
}
112
156
impl UseBincodePlz for RecordLocationMeta {}
113
157
114
114
-
impl DbBytes for Vec<u8> {
115
115
-
fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
116
116
-
Ok(self.to_vec())
158
158
+
#[derive(Debug, Clone, PartialEq)]
159
159
+
pub struct RecordRawValue(Vec<u8>);
160
160
+
impl DbBytes for RecordRawValue {
161
161
+
fn to_db_bytes(&self) -> Result<std::vec::Vec<u8>, EncodingError> {
162
162
+
self.0.to_db_bytes()
117
163
}
118
164
fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> {
119
119
-
Ok((bytes.to_owned(), bytes.len()))
165
165
+
let (v, n) = DbBytes::from_db_bytes(bytes)?;
166
166
+
Ok((Self(v), n))
167
167
+
}
168
168
+
}
169
169
+
impl From<Box<serde_json::value::RawValue>> for RecordRawValue {
170
170
+
fn from(v: Box<serde_json::value::RawValue>) -> Self {
171
171
+
Self(v.get().into())
172
172
+
}
173
173
+
}
174
174
+
impl TryFrom<RecordRawValue> for Box<serde_json::value::RawValue> {
175
175
+
type Error = EncodingError;
176
176
+
fn try_from(rrv: RecordRawValue) -> Result<Self, Self::Error> {
177
177
+
let s = String::from_utf8(rrv.0)?;
178
178
+
let rv = serde_json::value::RawValue::from_string(s)?;
179
179
+
Ok(rv)
120
180
}
121
181
}
122
182
123
123
-
pub type RecordLocationVal = DbConcat<RecordLocationMeta, Vec<u8>>;
183
183
+
pub type RecordLocationVal = DbConcat<RecordLocationMeta, RecordRawValue>;
124
184
impl From<(Cursor, &str, PutAction)> for RecordLocationVal {
125
185
fn from((cursor, rev, put): (Cursor, &str, PutAction)) -> Self {
126
186
let meta = RecordLocationMeta {
···
128
188
is_update: put.is_update,
129
189
rev: rev.to_string(),
130
190
};
131
131
-
Self::from_pair(meta, put.record.get().into())
191
191
+
Self::from_pair(meta, put.record.into())
132
192
}
133
193
}
134
194