alpha
Login
or
Join now
microcosm.blue
/
microcosm-rs
Star
0
Fork
3
Atom
Configure Feed
Issues
Pull Requests
Commits
Tags
Feed URL
Select the types of activity you want to include in your feed.
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
Star
0
Fork
3
Atom
Configure Feed
Issues
Pull Requests
Commits
Tags
Feed URL
Select the types of activity you want to include in your feed.
Overview
Issues
Pulls
Pipelines
HLL sketch: secret hash prefix
author
phil
date
1 year ago
(May 23, 2025, 3:50 PM -0400)
commit
0f3c2f87
0f3c2f87d75d57f0376ed28cd0f076966ba787cf
parent
860257c7
860257c71cf565ba866442fddc33ce22255c83d3
+244
-137
11 changed files
Expand all
Collapse all
Unified
Split
Cargo.lock
ufos
Cargo.toml
src
consumer.rs
db_types.rs
file_consumer.rs
lib.rs
main.rs
storage.rs
storage_fjall.rs
storage_mem.rs
store_types.rs
+10
-10
Cargo.lock
Reviewed
···
1360
1360
1361
1361
[[package]]
1362
1362
name = "getrandom"
1363
1363
-
version = "0.3.2"
1363
1363
+
version = "0.3.3"
1364
1364
source = "registry+https://github.com/rust-lang/crates.io-index"
1365
1365
-
checksum = "73fea8450eea4bac3940448fb7ae50d91f034f941199fcd9d909a5a07aa455f0"
1365
1365
+
checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4"
1366
1366
dependencies = [
1367
1367
"cfg-if",
1368
1368
"libc",
···
1909
1909
source = "registry+https://github.com/rust-lang/crates.io-index"
1910
1910
checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a"
1911
1911
dependencies = [
1912
1912
-
"getrandom 0.3.2",
1912
1912
+
"getrandom 0.3.3",
1913
1913
"libc",
1914
1914
]
1915
1915
···
2697
2697
2698
2698
[[package]]
2699
2699
name = "rand"
2700
2700
-
version = "0.9.0"
2700
2700
+
version = "0.9.1"
2701
2701
source = "registry+https://github.com/rust-lang/crates.io-index"
2702
2702
-
checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94"
2702
2702
+
checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97"
2703
2703
dependencies = [
2704
2704
"rand_chacha 0.9.0",
2705
2705
"rand_core 0.9.3",
2706
2706
-
"zerocopy 0.8.24",
2707
2706
]
2708
2707
2709
2708
[[package]]
···
2741
2740
source = "registry+https://github.com/rust-lang/crates.io-index"
2742
2741
checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
2743
2742
dependencies = [
2744
2744
-
"getrandom 0.3.2",
2743
2743
+
"getrandom 0.3.3",
2745
2744
]
2746
2745
2747
2746
[[package]]
···
3426
3425
checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf"
3427
3426
dependencies = [
3428
3427
"fastrand",
3429
3429
-
"getrandom 0.3.2",
3428
3428
+
"getrandom 0.3.3",
3430
3429
"once_cell",
3431
3430
"rustix 1.0.5",
3432
3431
"windows-sys 0.59.0",
···
3794
3793
"httparse",
3795
3794
"log",
3796
3795
"native-tls",
3797
3797
-
"rand 0.9.0",
3796
3796
+
"rand 0.9.1",
3798
3797
"sha1",
3799
3798
"thiserror 2.0.12",
3800
3799
"url",
···
3819
3818
"dropshot",
3820
3819
"env_logger",
3821
3820
"fjall",
3821
3821
+
"getrandom 0.3.3",
3822
3822
"http",
3823
3823
"jetstream",
3824
3824
"log",
···
3917
3917
source = "registry+https://github.com/rust-lang/crates.io-index"
3918
3918
checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9"
3919
3919
dependencies = [
3920
3920
-
"getrandom 0.3.2",
3920
3920
+
"getrandom 0.3.3",
3921
3921
"serde",
3922
3922
]
3923
3923
+1
ufos/Cargo.toml
Reviewed
···
12
12
dropshot = "0.16.0"
13
13
env_logger = "0.11.7"
14
14
fjall = { version = "2.8.0", features = ["lz4"] }
15
15
+
getrandom = "0.3.3"
15
16
http = "1.3.1"
16
17
jetstream = { path = "../jetstream" }
17
18
log = "0.4.26"
+12
-2
ufos/src/consumer.rs
Reviewed
···
1
1
+
use crate::store_types::SketchSecretPrefix;
1
2
use jetstream::{
2
3
events::{Cursor, EventKind, JetstreamEvent},
3
4
exports::{Did, Nsid},
···
32
33
jetstream_receiver: JetstreamReceiver,
33
34
batch_sender: Sender<LimitedBatch>,
34
35
current_batch: CurrentBatch,
36
36
+
sketch_secret: SketchSecretPrefix,
35
37
}
36
38
37
39
pub async fn consume(
38
40
jetstream_endpoint: &str,
39
41
cursor: Option<Cursor>,
40
42
no_compress: bool,
43
43
+
sketch_secret: SketchSecretPrefix,
41
44
) -> anyhow::Result<Receiver<LimitedBatch>> {
42
45
let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(jetstream_endpoint);
43
46
if endpoint == jetstream_endpoint {
···
60
63
.connect_cursor(cursor)
61
64
.await?;
62
65
let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE);
63
63
-
let mut batcher = Batcher::new(jetstream_receiver, batch_sender);
66
66
+
let mut batcher = Batcher::new(jetstream_receiver, batch_sender, sketch_secret);
64
67
tokio::task::spawn(async move { batcher.run().await });
65
68
Ok(batch_reciever)
66
69
}
67
70
68
71
impl Batcher {
69
69
-
pub fn new(jetstream_receiver: JetstreamReceiver, batch_sender: Sender<LimitedBatch>) -> Self {
72
72
+
pub fn new(
73
73
+
jetstream_receiver: JetstreamReceiver,
74
74
+
batch_sender: Sender<LimitedBatch>,
75
75
+
sketch_secret: SketchSecretPrefix,
76
76
+
) -> Self {
70
77
Self {
71
78
jetstream_receiver,
72
79
batch_sender,
73
80
current_batch: Default::default(),
81
81
+
sketch_secret,
74
82
}
75
83
}
76
84
···
129
137
&collection,
130
138
commit,
131
139
MAX_BATCHED_COLLECTIONS,
140
140
+
&self.sketch_secret,
132
141
);
133
142
134
143
if let Err(BatchInsertError::BatchFull(commit)) = optimistic_res {
···
137
146
&collection,
138
147
commit,
139
148
MAX_BATCHED_COLLECTIONS,
149
149
+
&self.sketch_secret,
140
150
)?;
141
151
} else {
142
152
optimistic_res?;
+2
ufos/src/db_types.rs
Reviewed
···
224
224
225
225
//////
226
226
227
227
+
impl<const N: usize> UseBincodePlz for [u8; N] {}
228
228
+
227
229
impl DbBytes for Vec<u8> {
228
230
fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
229
231
Ok(self.to_vec())
+6
-2
ufos/src/file_consumer.rs
Reviewed
···
1
1
use crate::consumer::{Batcher, LimitedBatch, BATCH_QUEUE_SIZE};
2
2
+
use crate::store_types::SketchSecretPrefix;
2
3
use anyhow::Result;
3
4
use jetstream::{error::JetstreamEventError, events::JetstreamEvent};
4
5
use std::path::PathBuf;
···
21
22
Ok(())
22
23
}
23
24
24
24
-
pub async fn consume(p: PathBuf) -> Result<Receiver<LimitedBatch>> {
25
25
+
pub async fn consume(
26
26
+
p: PathBuf,
27
27
+
sketch_secret: SketchSecretPrefix,
28
28
+
) -> Result<Receiver<LimitedBatch>> {
25
29
let f = File::open(p).await?;
26
30
let (jsonl_sender, jsonl_receiver) = channel::<JetstreamEvent>(16);
27
31
let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE);
28
28
-
let mut batcher = Batcher::new(jsonl_receiver, batch_sender);
32
32
+
let mut batcher = Batcher::new(jsonl_receiver, batch_sender, sketch_secret);
29
33
tokio::task::spawn(async move { read_jsonl(f, jsonl_sender).await });
30
34
tokio::task::spawn(async move { batcher.run().await });
31
35
Ok(batch_reciever)
+131
-95
ufos/src/lib.rs
Reviewed
···
10
10
pub mod store_types;
11
11
12
12
use crate::error::BatchInsertError;
13
13
+
use crate::store_types::SketchSecretPrefix;
13
14
use cardinality_estimator_safe::{Element, Sketch};
14
15
use error::FirehoseEventError;
15
16
use jetstream::events::{CommitEvent, CommitOp, Cursor};
···
20
21
use sha2::Sha256;
21
22
use std::collections::HashMap;
22
23
24
24
+
fn did_element(sketch_secret: &SketchSecretPrefix, did: &Did) -> Element<14> {
25
25
+
Element::from_digest_with_prefix::<Sha256>(sketch_secret, did.as_bytes())
26
26
+
}
27
27
+
23
28
#[derive(Debug, Default, Clone)]
24
29
pub struct CollectionCommits<const LIMIT: usize> {
25
30
pub total_seen: usize,
···
29
34
non_creates: usize,
30
35
}
31
36
32
32
-
fn did_element(did: &Did) -> Element<14> {
33
33
-
Element::from_digest_oneshot::<Sha256>(did.as_bytes())
34
34
-
}
35
35
-
36
37
impl<const LIMIT: usize> CollectionCommits<LIMIT> {
37
38
fn advance_head(&mut self) {
38
39
self.head += 1;
···
40
41
self.head = 0;
41
42
}
42
43
}
43
43
-
pub fn truncating_insert(&mut self, commit: UFOsCommit) -> Result<(), BatchInsertError> {
44
44
+
pub fn truncating_insert(
45
45
+
&mut self,
46
46
+
commit: UFOsCommit,
47
47
+
sketch_secret: &SketchSecretPrefix,
48
48
+
) -> Result<(), BatchInsertError> {
44
49
if self.non_creates == LIMIT {
45
50
return Err(BatchInsertError::BatchFull(commit));
46
51
}
···
71
76
72
77
if is_create {
73
78
self.total_seen += 1;
74
74
-
self.dids_estimate.insert(did_element(&did));
79
79
+
self.dids_estimate.insert(did_element(sketch_secret, &did));
75
80
} else {
76
81
self.non_creates += 1;
77
82
}
···
163
168
collection: &Nsid,
164
169
commit: UFOsCommit,
165
170
max_collections: usize,
171
171
+
sketch_secret: &SketchSecretPrefix,
166
172
) -> Result<(), BatchInsertError> {
167
173
let map = &mut self.commits_by_nsid;
168
174
if !map.contains_key(collection) && map.len() >= max_collections {
···
170
176
}
171
177
map.entry(collection.clone())
172
178
.or_default()
173
173
-
.truncating_insert(commit)?;
179
179
+
.truncating_insert(commit, sketch_secret)?;
174
180
Ok(())
175
181
}
176
182
pub fn total_records(&self) -> usize {
···
313
319
fn test_truncating_insert_truncates() -> anyhow::Result<()> {
314
320
let mut commits: CollectionCommits<2> = Default::default();
315
321
316
316
-
commits.truncating_insert(UFOsCommit {
317
317
-
cursor: Cursor::from_raw_u64(100),
318
318
-
did: Did::new("did:plc:whatever".to_string()).unwrap(),
319
319
-
rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(),
320
320
-
rev: "rev-asdf".to_string(),
321
321
-
action: CommitAction::Put(PutAction {
322
322
-
record: RawValue::from_string("{}".to_string())?,
323
323
-
is_update: false,
324
324
-
}),
325
325
-
})?;
322
322
+
commits.truncating_insert(
323
323
+
UFOsCommit {
324
324
+
cursor: Cursor::from_raw_u64(100),
325
325
+
did: Did::new("did:plc:whatever".to_string()).unwrap(),
326
326
+
rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(),
327
327
+
rev: "rev-asdf".to_string(),
328
328
+
action: CommitAction::Put(PutAction {
329
329
+
record: RawValue::from_string("{}".to_string())?,
330
330
+
is_update: false,
331
331
+
}),
332
332
+
},
333
333
+
&[0u8; 16],
334
334
+
)?;
326
335
327
327
-
commits.truncating_insert(UFOsCommit {
328
328
-
cursor: Cursor::from_raw_u64(101),
329
329
-
did: Did::new("did:plc:whatever".to_string()).unwrap(),
330
330
-
rkey: RecordKey::new("rkey-asdf-b".to_string()).unwrap(),
331
331
-
rev: "rev-asdg".to_string(),
332
332
-
action: CommitAction::Put(PutAction {
333
333
-
record: RawValue::from_string("{}".to_string())?,
334
334
-
is_update: false,
335
335
-
}),
336
336
-
})?;
336
336
+
commits.truncating_insert(
337
337
+
UFOsCommit {
338
338
+
cursor: Cursor::from_raw_u64(101),
339
339
+
did: Did::new("did:plc:whatever".to_string()).unwrap(),
340
340
+
rkey: RecordKey::new("rkey-asdf-b".to_string()).unwrap(),
341
341
+
rev: "rev-asdg".to_string(),
342
342
+
action: CommitAction::Put(PutAction {
343
343
+
record: RawValue::from_string("{}".to_string())?,
344
344
+
is_update: false,
345
345
+
}),
346
346
+
},
347
347
+
&[0u8; 16],
348
348
+
)?;
337
349
338
338
-
commits.truncating_insert(UFOsCommit {
339
339
-
cursor: Cursor::from_raw_u64(102),
340
340
-
did: Did::new("did:plc:whatever".to_string()).unwrap(),
341
341
-
rkey: RecordKey::new("rkey-asdf-c".to_string()).unwrap(),
342
342
-
rev: "rev-asdh".to_string(),
343
343
-
action: CommitAction::Put(PutAction {
344
344
-
record: RawValue::from_string("{}".to_string())?,
345
345
-
is_update: false,
346
346
-
}),
347
347
-
})?;
350
350
+
commits.truncating_insert(
351
351
+
UFOsCommit {
352
352
+
cursor: Cursor::from_raw_u64(102),
353
353
+
did: Did::new("did:plc:whatever".to_string()).unwrap(),
354
354
+
rkey: RecordKey::new("rkey-asdf-c".to_string()).unwrap(),
355
355
+
rev: "rev-asdh".to_string(),
356
356
+
action: CommitAction::Put(PutAction {
357
357
+
record: RawValue::from_string("{}".to_string())?,
358
358
+
is_update: false,
359
359
+
}),
360
360
+
},
361
361
+
&[0u8; 16],
362
362
+
)?;
348
363
349
364
assert_eq!(commits.total_seen, 3);
350
365
assert_eq!(commits.dids_estimate.estimate(), 1);
···
373
388
fn test_truncating_insert_does_not_truncate_deletes() -> anyhow::Result<()> {
374
389
let mut commits: CollectionCommits<2> = Default::default();
375
390
376
376
-
commits.truncating_insert(UFOsCommit {
377
377
-
cursor: Cursor::from_raw_u64(100),
378
378
-
did: Did::new("did:plc:whatever".to_string()).unwrap(),
379
379
-
rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(),
380
380
-
rev: "rev-asdf".to_string(),
381
381
-
action: CommitAction::Cut,
382
382
-
})?;
391
391
+
commits.truncating_insert(
392
392
+
UFOsCommit {
393
393
+
cursor: Cursor::from_raw_u64(100),
394
394
+
did: Did::new("did:plc:whatever".to_string()).unwrap(),
395
395
+
rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(),
396
396
+
rev: "rev-asdf".to_string(),
397
397
+
action: CommitAction::Cut,
398
398
+
},
399
399
+
&[0u8; 16],
400
400
+
)?;
383
401
384
384
-
commits.truncating_insert(UFOsCommit {
385
385
-
cursor: Cursor::from_raw_u64(101),
386
386
-
did: Did::new("did:plc:whatever".to_string()).unwrap(),
387
387
-
rkey: RecordKey::new("rkey-asdf-b".to_string()).unwrap(),
388
388
-
rev: "rev-asdg".to_string(),
389
389
-
action: CommitAction::Put(PutAction {
390
390
-
record: RawValue::from_string("{}".to_string())?,
391
391
-
is_update: false,
392
392
-
}),
393
393
-
})?;
402
402
+
commits.truncating_insert(
403
403
+
UFOsCommit {
404
404
+
cursor: Cursor::from_raw_u64(101),
405
405
+
did: Did::new("did:plc:whatever".to_string()).unwrap(),
406
406
+
rkey: RecordKey::new("rkey-asdf-b".to_string()).unwrap(),
407
407
+
rev: "rev-asdg".to_string(),
408
408
+
action: CommitAction::Put(PutAction {
409
409
+
record: RawValue::from_string("{}".to_string())?,
410
410
+
is_update: false,
411
411
+
}),
412
412
+
},
413
413
+
&[0u8; 16],
414
414
+
)?;
394
415
395
395
-
commits.truncating_insert(UFOsCommit {
396
396
-
cursor: Cursor::from_raw_u64(102),
397
397
-
did: Did::new("did:plc:whatever".to_string()).unwrap(),
398
398
-
rkey: RecordKey::new("rkey-asdf-c".to_string()).unwrap(),
399
399
-
rev: "rev-asdh".to_string(),
400
400
-
action: CommitAction::Put(PutAction {
401
401
-
record: RawValue::from_string("{}".to_string())?,
402
402
-
is_update: false,
403
403
-
}),
404
404
-
})?;
416
416
+
commits.truncating_insert(
417
417
+
UFOsCommit {
418
418
+
cursor: Cursor::from_raw_u64(102),
419
419
+
did: Did::new("did:plc:whatever".to_string()).unwrap(),
420
420
+
rkey: RecordKey::new("rkey-asdf-c".to_string()).unwrap(),
421
421
+
rev: "rev-asdh".to_string(),
422
422
+
action: CommitAction::Put(PutAction {
423
423
+
record: RawValue::from_string("{}".to_string())?,
424
424
+
is_update: false,
425
425
+
}),
426
426
+
},
427
427
+
&[0u8; 16],
428
428
+
)?;
405
429
406
430
assert_eq!(commits.total_seen, 2);
407
431
assert_eq!(commits.dids_estimate.estimate(), 1);
···
436
460
let mut commits: CollectionCommits<2> = Default::default();
437
461
438
462
commits
439
439
-
.truncating_insert(UFOsCommit {
440
440
-
cursor: Cursor::from_raw_u64(100),
441
441
-
did: Did::new("did:plc:whatever".to_string()).unwrap(),
442
442
-
rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(),
443
443
-
rev: "rev-asdf".to_string(),
444
444
-
action: CommitAction::Cut,
445
445
-
})
463
463
+
.truncating_insert(
464
464
+
UFOsCommit {
465
465
+
cursor: Cursor::from_raw_u64(100),
466
466
+
did: Did::new("did:plc:whatever".to_string()).unwrap(),
467
467
+
rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(),
468
468
+
rev: "rev-asdf".to_string(),
469
469
+
action: CommitAction::Cut,
470
470
+
},
471
471
+
&[0u8; 16],
472
472
+
)
446
473
.unwrap();
447
474
448
475
// this create will just be discarded
449
476
commits
450
450
-
.truncating_insert(UFOsCommit {
451
451
-
cursor: Cursor::from_raw_u64(80),
452
452
-
did: Did::new("did:plc:whatever".to_string()).unwrap(),
453
453
-
rkey: RecordKey::new("rkey-asdf-zzz".to_string()).unwrap(),
454
454
-
rev: "rev-asdzzz".to_string(),
455
455
-
action: CommitAction::Put(PutAction {
456
456
-
record: RawValue::from_string("{}".to_string())?,
457
457
-
is_update: false,
458
458
-
}),
459
459
-
})
477
477
+
.truncating_insert(
478
478
+
UFOsCommit {
479
479
+
cursor: Cursor::from_raw_u64(80),
480
480
+
did: Did::new("did:plc:whatever".to_string()).unwrap(),
481
481
+
rkey: RecordKey::new("rkey-asdf-zzz".to_string()).unwrap(),
482
482
+
rev: "rev-asdzzz".to_string(),
483
483
+
action: CommitAction::Put(PutAction {
484
484
+
record: RawValue::from_string("{}".to_string())?,
485
485
+
is_update: false,
486
486
+
}),
487
487
+
},
488
488
+
&[0u8; 16],
489
489
+
)
460
490
.unwrap();
461
491
462
492
commits
463
463
-
.truncating_insert(UFOsCommit {
464
464
-
cursor: Cursor::from_raw_u64(101),
465
465
-
did: Did::new("did:plc:whatever".to_string()).unwrap(),
466
466
-
rkey: RecordKey::new("rkey-asdf-b".to_string()).unwrap(),
467
467
-
rev: "rev-asdg".to_string(),
468
468
-
action: CommitAction::Cut,
469
469
-
})
493
493
+
.truncating_insert(
494
494
+
UFOsCommit {
495
495
+
cursor: Cursor::from_raw_u64(101),
496
496
+
did: Did::new("did:plc:whatever".to_string()).unwrap(),
497
497
+
rkey: RecordKey::new("rkey-asdf-b".to_string()).unwrap(),
498
498
+
rev: "rev-asdg".to_string(),
499
499
+
action: CommitAction::Cut,
500
500
+
},
501
501
+
&[0u8; 16],
502
502
+
)
470
503
.unwrap();
471
504
472
472
-
let res = commits.truncating_insert(UFOsCommit {
473
473
-
cursor: Cursor::from_raw_u64(102),
474
474
-
did: Did::new("did:plc:whatever".to_string()).unwrap(),
475
475
-
rkey: RecordKey::new("rkey-asdf-c".to_string()).unwrap(),
476
476
-
rev: "rev-asdh".to_string(),
477
477
-
action: CommitAction::Cut,
478
478
-
});
505
505
+
let res = commits.truncating_insert(
506
506
+
UFOsCommit {
507
507
+
cursor: Cursor::from_raw_u64(102),
508
508
+
did: Did::new("did:plc:whatever".to_string()).unwrap(),
509
509
+
rkey: RecordKey::new("rkey-asdf-c".to_string()).unwrap(),
510
510
+
rev: "rev-asdh".to_string(),
511
511
+
action: CommitAction::Cut,
512
512
+
},
513
513
+
&[0u8; 16],
514
514
+
);
479
515
480
516
assert!(res.is_err());
481
517
let overflowed = match res {
+8
-4
ufos/src/main.rs
Reviewed
···
7
7
use ufos::storage::{StorageWhatever, StoreBackground, StoreReader, StoreWriter};
8
8
use ufos::storage_fjall::FjallStorage;
9
9
use ufos::storage_mem::MemStorage;
10
10
+
use ufos::store_types::SketchSecretPrefix;
10
11
11
12
#[cfg(not(target_env = "msvc"))]
12
13
use tikv_jemallocator::Jemalloc;
···
57
58
let args = Args::parse();
58
59
let jetstream = args.jetstream.clone();
59
60
if args.in_mem {
60
60
-
let (read_store, write_store, cursor) = MemStorage::init(
61
61
+
let (read_store, write_store, cursor, sketch_secret) = MemStorage::init(
61
62
args.data,
62
63
jetstream,
63
64
args.jetstream_force,
···
70
71
read_store,
71
72
write_store,
72
73
cursor,
74
74
+
sketch_secret,
73
75
)
74
76
.await?;
75
77
} else {
76
76
-
let (read_store, write_store, cursor) = FjallStorage::init(
78
78
+
let (read_store, write_store, cursor, sketch_secret) = FjallStorage::init(
77
79
args.data,
78
80
jetstream,
79
81
args.jetstream_force,
···
86
88
read_store,
87
89
write_store,
88
90
cursor,
91
91
+
sketch_secret,
89
92
)
90
93
.await?;
91
94
}
···
100
103
read_store: impl StoreReader + 'static,
101
104
mut write_store: impl StoreWriter<B> + 'static,
102
105
cursor: Option<Cursor>,
106
106
+
sketch_secret: SketchSecretPrefix,
103
107
) -> anyhow::Result<()> {
104
108
println!("starting server with storage...");
105
109
let serving = server::serve(read_store);
···
112
116
113
117
let batches = if jetstream_fixture {
114
118
log::info!("starting with jestream file fixture: {jetstream:?}");
115
115
-
file_consumer::consume(jetstream.into()).await?
119
119
+
file_consumer::consume(jetstream.into(), sketch_secret).await?
116
120
} else {
117
121
log::info!(
118
122
"starting consumer with cursor: {cursor:?} from {:?} ago",
119
123
cursor.map(|c| c.elapsed())
120
124
);
121
121
-
consumer::consume(&jetstream, cursor, false).await?
125
125
+
consumer::consume(&jetstream, cursor, false, sketch_secret).await?
122
126
};
123
127
124
128
let rolling = write_store.background_tasks()?.run();
+2
-1
ufos/src/storage.rs
Reviewed
···
1
1
+
use crate::store_types::SketchSecretPrefix;
1
2
use crate::{
2
3
error::StorageError, ConsumerInfo, Count, Cursor, EventBatch, QueryPeriod, TopCollections,
3
4
UFOsRecord,
···
16
17
endpoint: String,
17
18
force_endpoint: bool,
18
19
config: C,
19
19
-
) -> StorageResult<(R, W, Option<Cursor>)>
20
20
+
) -> StorageResult<(R, W, Option<Cursor>, SketchSecretPrefix)>
20
21
where
21
22
Self: Sized;
22
23
}
+59
-15
ufos/src/storage_fjall.rs
Reviewed
···
6
6
DeleteAccountQueueVal, HourTruncatedCursor, HourlyDidsKey, HourlyRecordsKey, HourlyRollupKey,
7
7
JetstreamCursorKey, JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue,
8
8
LiveCountsKey, NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal,
9
9
-
RecordLocationKey, RecordLocationMeta, RecordLocationVal, RecordRawValue, TakeoffKey,
10
10
-
TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor, WeeklyDidsKey, WeeklyRecordsKey,
11
11
-
WeeklyRollupKey,
9
9
+
RecordLocationKey, RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretKey,
10
10
+
SketchSecretPrefix, TakeoffKey, TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor,
11
11
+
WeeklyDidsKey, WeeklyRecordsKey, WeeklyRollupKey,
12
12
};
13
13
use crate::{
14
14
CommitAction, ConsumerInfo, Count, Did, EventBatch, Nsid, QueryPeriod, TopCollections,
···
45
45
/// - Launch date
46
46
/// - key: "takeoff" (literal)
47
47
/// - val: u64 (micros timestamp, not from jetstream for now so not precise)
48
48
+
///
49
49
+
/// - Cardinality estimator secret
50
50
+
/// - key: "sketch_secret" (literal)
51
51
+
/// - val: [u8; 16]
48
52
///
49
53
/// - Rollup cursor (bg work: roll stats into hourlies, delete accounts, old record deletes)
50
54
/// - key: "rollup_cursor" (literal)
···
141
145
endpoint: String,
142
146
force_endpoint: bool,
143
147
_config: FjallConfig,
144
144
-
) -> StorageResult<(FjallReader, FjallWriter, Option<Cursor>)> {
148
148
+
) -> StorageResult<(FjallReader, FjallWriter, Option<Cursor>, SketchSecretPrefix)> {
145
149
let keyspace = {
146
150
let config = Config::new(path);
147
151
···
159
163
160
164
let js_cursor = get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?;
161
165
162
162
-
if js_cursor.is_some() {
166
166
+
let sketch_secret = if js_cursor.is_some() {
163
167
let stored_endpoint =
164
168
get_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?;
165
165
-
166
169
let JetstreamEndpointValue(stored) = stored_endpoint.ok_or(StorageError::InitError(
167
170
"found cursor but missing js_endpoint, refusing to start.".to_string(),
168
171
))?;
169
172
173
173
+
let Some(stored_secret) =
174
174
+
get_static_neu::<SketchSecretKey, SketchSecretPrefix>(&global)?
175
175
+
else {
176
176
+
return Err(StorageError::InitError(
177
177
+
"found cursor but missing sketch_secret, refusing to start.".to_string(),
178
178
+
));
179
179
+
};
180
180
+
170
181
if stored != endpoint {
171
182
if force_endpoint {
172
183
log::warn!("forcing a jetstream switch from {stored:?} to {endpoint:?}");
···
179
190
"stored js_endpoint {stored:?} differs from provided {endpoint:?}, refusing to start.")));
180
191
}
181
192
}
193
193
+
stored_secret
182
194
} else {
183
183
-
insert_static_neu::<JetstreamEndpointKey>(
195
195
+
log::info!("initializing a fresh db!");
196
196
+
init_static_neu::<JetstreamEndpointKey>(
184
197
&global,
185
198
JetstreamEndpointValue(endpoint.to_string()),
186
199
)?;
187
187
-
insert_static_neu::<TakeoffKey>(&global, Cursor::at(SystemTime::now()))?;
188
188
-
insert_static_neu::<NewRollupCursorKey>(&global, Cursor::from_start())?;
189
189
-
}
200
200
+
201
201
+
log::info!("generating new secret for cardinality sketches...");
202
202
+
let mut sketch_secret: SketchSecretPrefix = [0u8; 16];
203
203
+
getrandom::fill(&mut sketch_secret).map_err(|e| {
204
204
+
StorageError::InitError(format!(
205
205
+
"failed to get a random secret for cardinality sketches: {e:?}"
206
206
+
))
207
207
+
})?;
208
208
+
init_static_neu::<SketchSecretKey>(&global, sketch_secret)?;
209
209
+
210
210
+
init_static_neu::<TakeoffKey>(&global, Cursor::at(SystemTime::now()))?;
211
211
+
init_static_neu::<NewRollupCursorKey>(&global, Cursor::from_start())?;
212
212
+
213
213
+
sketch_secret
214
214
+
};
190
215
191
216
let reader = FjallReader {
192
217
keyspace: keyspace.clone(),
···
204
229
rollups,
205
230
queues,
206
231
};
207
207
-
Ok((reader, writer, js_cursor))
232
232
+
Ok((reader, writer, js_cursor, sketch_secret))
208
233
}
209
234
}
210
235
···
1089
1114
Ok(())
1090
1115
}
1091
1116
1117
1117
+
/// Set a value to a fixed key, erroring if the value already exists
1118
1118
+
///
1119
1119
+
/// Intended for single-threaded init: not safe under concurrency, since there
1120
1120
+
/// is no transaction between checking if the value already exists and writing it.
1121
1121
+
fn init_static_neu<K: StaticStr>(
1122
1122
+
global: &PartitionHandle,
1123
1123
+
value: impl DbBytes,
1124
1124
+
) -> StorageResult<()> {
1125
1125
+
let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
1126
1126
+
if global.get(&key_bytes)?.is_some() {
1127
1127
+
return Err(StorageError::InitError(format!(
1128
1128
+
"init failed: value for key {key_bytes:?} already exists"
1129
1129
+
)));
1130
1130
+
}
1131
1131
+
let value_bytes = value.to_db_bytes()?;
1132
1132
+
global.insert(&key_bytes, &value_bytes)?;
1133
1133
+
Ok(())
1134
1134
+
}
1135
1135
+
1092
1136
/// Set a value to a fixed key
1093
1137
fn insert_batch_static_neu<K: StaticStr>(
1094
1138
batch: &mut FjallBatch,
···
1132
1176
use serde_json::value::RawValue;
1133
1177
1134
1178
fn fjall_db() -> (FjallReader, FjallWriter) {
1135
1135
-
let (read, write, _) = FjallStorage::init(
1179
1179
+
let (read, write, _, _) = FjallStorage::init(
1136
1180
tempfile::tempdir().unwrap(),
1137
1181
"offline test (no real jetstream endpoint)".to_string(),
1138
1182
false,
···
1187
1231
.commits_by_nsid
1188
1232
.entry(collection.clone())
1189
1233
.or_default()
1190
1190
-
.truncating_insert(commit)
1234
1234
+
.truncating_insert(commit, &[0u8; 16])
1191
1235
.unwrap();
1192
1236
1193
1237
collection
···
1229
1273
.commits_by_nsid
1230
1274
.entry(collection.clone())
1231
1275
.or_default()
1232
1232
-
.truncating_insert(commit)
1276
1276
+
.truncating_insert(commit, &[0u8; 16])
1233
1277
.unwrap();
1234
1278
1235
1279
collection
···
1261
1305
.commits_by_nsid
1262
1306
.entry(collection.clone())
1263
1307
.or_default()
1264
1264
-
.truncating_insert(commit)
1308
1308
+
.truncating_insert(commit, &[0u8; 16])
1265
1309
.unwrap();
1266
1310
1267
1311
collection
+9
-8
ufos/src/storage_mem.rs
Reviewed
···
9
9
HourTruncatedCursor, HourlyRollupKey, JetstreamCursorKey, JetstreamCursorValue,
10
10
JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey, NewRollupCursorKey,
11
11
NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, RecordLocationKey,
12
12
-
RecordLocationMeta, RecordLocationVal, RecordRawValue, TakeoffKey, TakeoffValue,
13
13
-
WeekTruncatedCursor, WeeklyRollupKey,
12
12
+
RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretPrefix, TakeoffKey,
13
13
+
TakeoffValue, WeekTruncatedCursor, WeeklyRollupKey,
14
14
};
15
15
use crate::{
16
16
CommitAction, ConsumerInfo, Count, Did, EventBatch, Nsid, QueryPeriod, TopCollections,
···
257
257
endpoint: String,
258
258
force_endpoint: bool,
259
259
_config: MemConfig,
260
260
-
) -> StorageResult<(MemReader, MemWriter, Option<Cursor>)> {
260
260
+
) -> StorageResult<(MemReader, MemWriter, Option<Cursor>, SketchSecretPrefix)> {
261
261
let keyspace = MemKeyspace::open();
262
262
263
263
let global = keyspace.open_partition("global")?;
···
312
312
rollups,
313
313
queues,
314
314
};
315
315
-
Ok((reader, writer, js_cursor))
315
315
+
let secret_prefix = [0u8; 16]; // in-mem store is always deterministic: no secret
316
316
+
Ok((reader, writer, js_cursor, secret_prefix))
316
317
}
317
318
}
318
319
···
1106
1107
use serde_json::value::RawValue;
1107
1108
1108
1109
fn fjall_db() -> (MemReader, MemWriter) {
1109
1109
-
let (read, write, _) = MemStorage::init(
1110
1110
+
let (read, write, _, _) = MemStorage::init(
1110
1111
tempfile::tempdir().unwrap(),
1111
1112
"offline test (no real jetstream endpoint)".to_string(),
1112
1113
false,
···
1161
1162
.commits_by_nsid
1162
1163
.entry(collection.clone())
1163
1164
.or_default()
1164
1164
-
.truncating_insert(commit)
1165
1165
+
.truncating_insert(commit, &[0u8; 16])
1165
1166
.unwrap();
1166
1167
1167
1168
collection
···
1203
1204
.commits_by_nsid
1204
1205
.entry(collection.clone())
1205
1206
.or_default()
1206
1206
-
.truncating_insert(commit)
1207
1207
+
.truncating_insert(commit, &[0u8; 16])
1207
1208
.unwrap();
1208
1209
1209
1210
collection
···
1235
1236
.commits_by_nsid
1236
1237
.entry(collection.clone())
1237
1238
.or_default()
1238
1238
-
.truncating_insert(commit)
1239
1239
+
.truncating_insert(commit, &[0u8; 16])
1239
1240
.unwrap();
1240
1241
1241
1242
collection
+4
ufos/src/store_types.rs
Reviewed
···
22
22
static_str!("js_cursor", JetstreamCursorKey);
23
23
pub type JetstreamCursorValue = Cursor;
24
24
25
25
+
// key format: ["sketch_secret"]
26
26
+
static_str!("sketch_secret", SketchSecretKey);
27
27
+
pub type SketchSecretPrefix = [u8; 16];
28
28
+
25
29
// key format: ["rollup_cursor"]
26
30
static_str!("rollup_cursor", NewRollupCursorKey);
27
31
// pub type NewRollupCursorKey = DbStaticStr<_NewRollupCursorKey>;