···
88
88
if let Some(earliest) = &self.current_batch.initial_cursor {
89
89
if event.cursor.duration_since(earliest)? > Duration::from_secs_f64(MAX_BATCH_SPAN_SECS)
90
90
{
91
91
-
self.send_current_batch_now().await?;
91
91
+
self.send_current_batch_now(false).await?;
92
92
}
93
93
} else {
94
94
self.current_batch.initial_cursor = Some(event.cursor);
···
118
118
if event.cursor.duration_since(earliest)?.as_secs_f64() > MIN_BATCH_SPAN_SECS
119
119
&& self.batch_sender.capacity() == BATCH_QUEUE_SIZE
120
120
{
121
121
-
log::info!("queue empty: immediately sending batch.");
122
122
-
self.send_current_batch_now().await?;
121
121
+
self.send_current_batch_now(true).await?;
123
122
}
124
123
}
125
124
Ok(())
···
133
132
);
134
133
135
134
if let Err(BatchInsertError::BatchFull(commit)) = optimistic_res {
136
136
-
self.send_current_batch_now().await?;
135
135
+
self.send_current_batch_now(false).await?;
137
136
self.current_batch.batch.insert_commit_by_nsid(
138
137
&collection,
139
138
commit,
···
148
147
149
148
async fn handle_delete_account(&mut self, did: Did, cursor: Cursor) -> anyhow::Result<()> {
150
149
if self.current_batch.batch.account_removes.len() >= MAX_ACCOUNT_REMOVES {
151
151
-
self.send_current_batch_now().await?;
150
150
+
self.send_current_batch_now(false).await?;
152
151
}
153
152
self.current_batch
154
153
.batch
···
159
158
160
159
// holds up all consumer progress until it can send to the channel
161
160
// use this when the current batch is too full to add more to it
162
162
-
async fn send_current_batch_now(&mut self) -> anyhow::Result<()> {
161
161
+
async fn send_current_batch_now(&mut self, small: bool) -> anyhow::Result<()> {
163
162
log::info!(
164
164
-
"attempting to send batch now (capacity: {})",
163
163
+
"attempting to send batch now (small? {small}, capacity: {})",
165
164
self.batch_sender.capacity()
166
165
);
167
166
let current = mem::take(&mut self.current_batch);
···
1
1
-
use std::fmt;
2
1
use crate::{Cursor, Did, Nsid, RecordKey};
3
2
use bincode::{
4
3
config::{standard, Config},
···
9
8
error::{DecodeError, EncodeError},
10
9
};
11
10
use lsm_tree::range::prefix_to_range;
11
11
+
use std::fmt;
12
12
use std::marker::PhantomData;
13
13
use std::ops::{Bound, Range};
14
14
use thiserror::Error;
···
119
119
Self: Sized,
120
120
{
121
121
let (prefix, eaten) = P::from_db_bytes(bytes)?;
122
122
-
assert!(eaten <= bytes.len(), "eaten({}) < len({})", eaten, bytes.len());
122
122
+
assert!(
123
123
+
eaten <= bytes.len(),
124
124
+
"eaten({}) < len({})",
125
125
+
eaten,
126
126
+
bytes.len()
127
127
+
);
123
128
let Some(suffix_bytes) = bytes.get(eaten..) else {
124
129
return Err(EncodingError::DecodeMissingSuffix);
125
130
};
126
126
-
if suffix_bytes.len() == 0 {
131
131
+
if suffix_bytes.is_empty() {
127
132
return Err(EncodingError::DecodeMissingSuffix);
128
133
};
129
134
let (suffix, also_eaten) = S::from_db_bytes(suffix_bytes)?;
130
130
-
assert!(also_eaten <= suffix_bytes.len(), "also eaten({}) < suffix len({})", also_eaten, suffix_bytes.len());
135
135
+
assert!(
136
136
+
also_eaten <= suffix_bytes.len(),
137
137
+
"also eaten({}) < suffix len({})",
138
138
+
also_eaten,
139
139
+
suffix_bytes.len()
140
140
+
);
131
141
Ok((Self { prefix, suffix }, eaten + also_eaten))
132
142
}
133
143
}
···
192
202
T: BincodeEncode + BincodeDecode<()> + UseBincodePlz + Sized + std::fmt::Debug,
193
203
{
194
204
fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
195
195
-
log::info!("bincode to_db_bytes: {self:?}");
196
205
Ok(encode_to_vec(self, bincode_conf())?)
197
206
}
198
207
fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> {
199
199
-
log::info!("bincode from_db_bytes...");
200
208
Ok(decode_from_slice(bytes, bincode_conf())?)
201
209
}
202
210
}
203
211
204
212
/// helper trait: impl on a type to get helpers to implement DbBytes
205
213
pub trait SerdeBytes: serde::Serialize + for<'a> serde::Deserialize<'a> {
206
206
-
fn to_bytes(&self) -> Result<Vec<u8>, EncodingError> where Self: std::fmt::Debug {
207
207
-
log::info!("bincode serde to_db_bytes: {self:?}");
214
214
+
fn to_bytes(&self) -> Result<Vec<u8>, EncodingError>
215
215
+
where
216
216
+
Self: std::fmt::Debug,
217
217
+
{
208
218
Ok(bincode::serde::encode_to_vec(self, bincode_conf())?)
209
219
}
210
220
fn from_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> {
211
211
-
log::info!("bincode serde from_db_bytes...");
212
221
Ok(bincode::serde::decode_from_slice(bytes, bincode_conf())?)
213
222
}
214
223
}
···
217
226
218
227
impl DbBytes for Vec<u8> {
219
228
fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
220
220
-
log::info!("bincode vec to_db_bytes");
221
229
Ok(self.to_vec())
222
230
}
223
231
fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> {
224
224
-
log::info!("bincode vec from_db_bytes...");
225
232
Ok((bytes.to_owned(), bytes.len()))
226
233
}
227
234
}
···
258
265
259
266
impl DbBytes for Did {
260
267
fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> {
261
261
-
log::info!("bincode did dbbytes from_db_bytes...");
262
268
let (s, n) = decode_from_slice(bytes, bincode_conf())?;
263
269
let me = Self::new(s).map_err(EncodingError::BadAtriumStringType)?;
264
270
Ok((me, n))
265
271
}
266
272
fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
267
267
-
log::info!("bincode did dbbytes to_db_bytes {self:?}");
268
273
Ok(encode_to_vec(self.as_ref(), bincode_conf())?)
269
274
}
270
275
}
271
276
272
277
impl DbBytes for Nsid {
273
278
fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> {
274
274
-
log::info!("bincode nsid dbbytes from_db_bytes...");
275
279
let (s, n) = decode_from_slice(bytes, bincode_conf())?;
276
280
let me = Self::new(s).map_err(EncodingError::BadAtriumStringType)?;
277
281
Ok((me, n))
278
282
}
279
283
fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
280
280
-
log::info!("bincode nsid dbbytes to_db_bytes {self:?}");
281
284
Ok(encode_to_vec(self.as_ref(), bincode_conf())?)
282
285
}
283
286
}
284
287
285
288
impl DbBytes for RecordKey {
286
289
fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> {
287
287
-
log::info!("bincode rkey dbbytes from_db_bytes...");
288
290
let (s, n) = decode_from_slice(bytes, bincode_conf())?;
289
291
let me = Self::new(s).map_err(EncodingError::BadAtriumStringType)?;
290
292
Ok((me, n))
291
293
}
292
294
fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
293
293
-
log::info!("bincode rkey dbbytes to_db_bytes {self:?}");
294
295
Ok(encode_to_vec(self.as_ref(), bincode_conf())?)
295
296
}
296
297
}
···
404
405
(1234, "", "empty string"),
405
406
(789, "aaaaa", "string and cursor"),
406
407
] {
407
407
-
eprintln!("{desc}");
408
408
let original = TwoThings {
409
409
prefix: Cursor::from_raw_u64(tired_prefix),
410
410
suffix: sad_suffix.to_string(),
···
1
1
+
use crate::consumer::{Batcher, LimitedBatch, BATCH_QUEUE_SIZE};
1
2
use anyhow::Result;
2
2
-
use jetstream::{
3
3
-
error::JetstreamEventError,
4
4
-
events::JetstreamEvent,
5
5
-
};
3
3
+
use jetstream::{error::JetstreamEventError, events::JetstreamEvent};
6
4
use std::path::PathBuf;
7
5
use tokio::{
8
6
fs::File,
9
7
io::{AsyncBufReadExt, BufReader},
10
10
-
sync::mpsc::{Sender, Receiver, channel},
8
8
+
sync::mpsc::{channel, Receiver, Sender},
11
9
};
12
12
-
use crate::consumer::{Batcher, LimitedBatch, BATCH_QUEUE_SIZE};
13
10
14
11
async fn read_jsonl(f: File, sender: Sender<JetstreamEvent>) -> Result<()> {
15
12
let mut lines = BufReader::new(f).lines();
16
13
while let Some(line) = lines.next_line().await? {
17
17
-
let event: JetstreamEvent = serde_json::from_str(&line)
18
18
-
.map_err(JetstreamEventError::ReceivedMalformedJSON)?;
14
14
+
let event: JetstreamEvent =
15
15
+
serde_json::from_str(&line).map_err(JetstreamEventError::ReceivedMalformedJSON)?;
19
16
if sender.send(event).await.is_err() {
20
20
-
log::info!("All receivers for the jsonl fixture have been dropped, bye.");
17
17
+
log::warn!("All receivers for the jsonl fixture have been dropped, bye.");
21
18
return Err(JetstreamEventError::ReceiverClosedError.into());
22
19
}
23
20
}
24
21
Ok(())
25
22
}
26
23
27
27
-
pub async fn consume(
28
28
-
p: PathBuf,
29
29
-
) -> Result<Receiver<LimitedBatch>> {
24
24
+
pub async fn consume(p: PathBuf) -> Result<Receiver<LimitedBatch>> {
30
25
let f = File::open(p).await?;
31
26
let (jsonl_sender, jsonl_receiver) = channel::<JetstreamEvent>(16);
32
27
let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE);
···
1
1
use clap::Parser;
2
2
+
use jetstream::events::Cursor;
2
3
use std::path::PathBuf;
3
4
use ufos::consumer;
4
4
-
use ufos::file_consumer;
5
5
use ufos::error::StorageError;
6
6
-
// use ufos::server;
7
7
-
use jetstream::events::Cursor;
6
6
+
use ufos::file_consumer;
7
7
+
use ufos::server;
8
8
use ufos::storage::{StorageWhatever, StoreReader, StoreWriter};
9
9
use ufos::storage_fjall::FjallStorage;
10
10
use ufos::storage_mem::MemStorage;
···
65
65
args.jetstream_force,
66
66
Default::default(),
67
67
)?;
68
68
-
go(args.jetstream, args.jetstream_fixture, args.pause_writer, read_store, write_store, cursor).await?;
68
68
+
go(
69
69
+
args.jetstream,
70
70
+
args.jetstream_fixture,
71
71
+
args.pause_writer,
72
72
+
read_store,
73
73
+
write_store,
74
74
+
cursor,
75
75
+
)
76
76
+
.await?;
69
77
} else {
70
78
let (read_store, write_store, cursor) = FjallStorage::init(
71
79
args.data,
···
73
81
args.jetstream_force,
74
82
Default::default(),
75
83
)?;
76
76
-
go(args.jetstream, args.jetstream_fixture, args.pause_writer, read_store, write_store, cursor).await?;
84
84
+
go(
85
85
+
args.jetstream,
86
86
+
args.jetstream_fixture,
87
87
+
args.pause_writer,
88
88
+
read_store,
89
89
+
write_store,
90
90
+
cursor,
91
91
+
)
92
92
+
.await?;
77
93
}
78
94
79
95
Ok(())
···
83
99
jetstream: String,
84
100
jetstream_fixture: bool,
85
101
pause_writer: bool,
86
86
-
_read_store: impl StoreReader + 'static,
102
102
+
read_store: impl StoreReader + 'static,
87
103
mut write_store: impl StoreWriter + 'static,
88
104
cursor: Option<Cursor>,
89
105
) -> anyhow::Result<()> {
90
90
-
// println!("starting server with storage...");
91
91
-
// let serving = server::serve(read_store);
106
106
+
println!("starting server with storage...");
107
107
+
let serving = server::serve(read_store);
92
108
93
93
-
// let t1 = tokio::task::spawn(async {
94
94
-
// let r = serving.await;
95
95
-
// log::warn!("serving ended with: {r:?}");
96
96
-
// });
109
109
+
let t1 = tokio::task::spawn(async {
110
110
+
let r = serving.await;
111
111
+
log::warn!("serving ended with: {r:?}");
112
112
+
});
97
113
98
114
let t2: tokio::task::JoinHandle<anyhow::Result<()>> = tokio::task::spawn({
99
115
async move {
···
108
124
consumer::consume(&jetstream, cursor, false).await?
109
125
};
110
126
111
111
-
log::info!("started consumer, got chan etc...");
112
112
-
113
127
tokio::task::spawn_blocking(move || {
114
128
while let Some(event_batch) = batches.blocking_recv() {
115
115
-
log::info!("got batch, putting to storage...");
116
129
write_store.insert_batch(event_batch)?;
117
117
-
log::info!("inserted batch...");
118
118
-
write_store.step_rollup()
130
130
+
write_store
131
131
+
.step_rollup()
119
132
.inspect_err(|e| log::error!("laksjdfl: {e:?}"))?;
120
120
-
log::info!("inserted and stepped rollup. ready for next...");
121
133
}
122
122
-
log::warn!("??????????????????????");
123
134
Ok::<(), StorageError>(())
124
135
})
125
136
.await??;
126
137
127
127
-
// let r = storage.receive(batches).await;
128
138
log::warn!("storage.receive ended with");
129
139
} else {
130
140
log::info!("not starting jetstream or the write loop.");
···
133
143
}
134
144
});
135
145
136
136
-
// let t3 = tokio::task::spawn(async move {
137
137
-
// if !args.pause_rw {
138
138
-
// let r = storage.rw_loop().await;
139
139
-
// log::warn!("storage.rw_loop ended with: {r:?}");
140
140
-
// } else {
141
141
-
// log::info!("not starting rw loop.");
142
142
-
// }
143
143
-
// });
144
144
-
145
145
-
// tokio::select! {
146
146
-
// // v = serving => eprintln!("serving ended: {v:?}"),
147
147
-
// v = storage.receive(batches) => eprintln!("storage consumer ended: {v:?}"),
148
148
-
// v = storage.rw_loop() => eprintln!("storage rw-loop ended: {v:?}"),
149
149
-
// };
150
150
-
151
151
-
log::trace!("tasks running. waiting.");
152
146
tokio::select! {
153
153
-
// z = t1 => log::warn!("serve task ended: {z:?}"),
147
147
+
z = t1 => log::warn!("serve task ended: {z:?}"),
154
148
z = t2 => log::warn!("storage task ended: {z:?}"),
155
149
};
156
156
-
157
157
-
// t1.await?;
158
158
-
// log::trace!("serve task ended.");
159
159
-
// t2.await??;
160
160
-
// log::trace!("storage receive task ended.");
161
161
-
// // t3.await?;
162
162
-
// // log::trace!("storage rw task ended.");
163
150
164
151
println!("bye!");
165
152
···
497
497
// we *could* read+write every single batch to rollup.. but their merge is associative so
498
498
// ...so save the db some work up front? is this worth it? who knows...
499
499
500
500
-
log::warn!("sup!!!");
501
501
-
502
500
#[derive(Eq, Hash, PartialEq)]
503
501
enum Rollup {
504
502
Hourly(HourTruncatedCursor),
···
511
509
let mut last_cursor = Cursor::from_start();
512
510
let mut counts_by_rollup: HashMap<(Nsid, Rollup), CountsValue> = HashMap::new();
513
511
514
514
-
log::warn!("about to loop....");
515
512
for (i, kv) in timelies.enumerate() {
516
516
-
// log::warn!("loop {i}...");
517
513
if i >= rollup_limit {
518
514
break;
519
515
}
520
516
521
517
let (key_bytes, val_bytes) = kv?;
522
522
-
let key = db_complete::<LiveCountsKey>(&key_bytes)
523
523
-
.inspect_err(|e| log::warn!("rlc: key: {e:?}"))?;
518
518
+
let key = db_complete::<LiveCountsKey>(&key_bytes)?;
524
519
525
520
if cursor_exclusive_limit
526
521
.map(|limit| key.cursor() > limit)
···
530
525
}
531
526
532
527
batch.remove(&self.rollups, key_bytes);
533
533
-
let val = db_complete::<CountsValue>(&val_bytes)
534
534
-
.inspect_err(|e| log::warn!("rlc: val: {e:?}"))?;
528
528
+
let val = db_complete::<CountsValue>(&val_bytes)?;
535
529
counts_by_rollup
536
530
.entry((
537
531
key.collection().clone(),
···
554
548
cursors_advanced += 1;
555
549
last_cursor = key.cursor();
556
550
}
557
557
-
log::warn!("done looping. looping cbr counts(?)..");
558
551
559
552
for ((nsid, rollup), counts) in counts_by_rollup {
560
560
-
log::warn!("######################## cbr loop {nsid:?} {counts:?} ########################");
561
553
let key_bytes = match rollup {
562
554
Rollup::Hourly(hourly_cursor) => {
563
555
let k = HourlyRollupKey::new(hourly_cursor, &nsid);
564
564
-
log::info!("hrly k: {k:?}");
565
556
k.to_db_bytes()?
566
557
}
567
558
Rollup::Weekly(weekly_cursor) => {
568
559
let k = WeeklyRollupKey::new(weekly_cursor, &nsid);
569
569
-
log::info!("weekly k: {k:?}");
570
560
k.to_db_bytes()?
571
561
}
572
562
Rollup::AllTime => {
573
563
let k = AllTimeRollupKey::new(&nsid);
574
574
-
log::info!("alltime k: {k:?}");
575
564
k.to_db_bytes()?
576
565
}
577
566
};
578
578
-
// log::info!("key bytes: {key_bytes:?}");
579
567
let mut rolled: CountsValue = self
580
568
.rollups
581
569
.get(&key_bytes)?
582
582
-
.inspect(|v| {
583
583
-
let lax = CountsValue::from_db_bytes(v);
584
584
-
log::info!("val: len={}, lax={lax:?} first32={:?}", v.len(), v.get(..32));
585
585
-
})
586
570
.as_deref()
587
571
.map(db_complete::<CountsValue>)
588
588
-
.transpose()
589
589
-
.inspect_err(|e| log::warn!("oooh did we break on the rolled thing? {e:?}"))?
572
572
+
.transpose()?
590
573
.unwrap_or_default();
591
591
-
592
574
593
575
// try to round-trip before inserting, for funsies
594
576
let tripppin = counts.to_db_bytes()?;
···
599
581
if counts.records() > 20_000_000 {
600
582
panic!("COUNTS maybe wtf? {counts:?}")
601
583
}
602
602
-
// assert_eq!(rolled, and_back);
603
603
-
604
584
605
585
rolled.merge(&counts);
606
606
-
607
607
-
// try to round-trip before inserting, for funsies
608
608
-
let tripppin = rolled.to_db_bytes()?;
609
609
-
let (and_back, n) = CountsValue::from_db_bytes(&tripppin)?;
610
610
-
assert_eq!(n, tripppin.len());
611
611
-
assert_eq!(rolled.prefix, and_back.prefix);
612
612
-
assert_eq!(rolled.dids().estimate(), and_back.dids().estimate());
613
613
-
if rolled.records() > 20_000_000 {
614
614
-
panic!("maybe wtf? {rolled:?}")
615
615
-
}
616
616
-
// assert_eq!(rolled, and_back);
617
617
-
618
586
batch.insert(&self.rollups, &key_bytes, &rolled.to_db_bytes()?);
619
587
}
620
588
621
621
-
log::warn!("done cbr loop.");
622
622
-
623
623
-
insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor)
624
624
-
.inspect_err(|e| log::warn!("insert neu: {e:?}"))?;
589
589
+
insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor)?;
625
590
626
591
batch.commit()?;
627
627
-
628
628
-
log::warn!("ok finished rlc stuff. huh.");
629
592
Ok(cursors_advanced)
630
593
}
631
594
}
···
662
625
feed_val.to_db_bytes()?,
663
626
);
664
627
665
665
-
666
628
let location_val: RecordLocationVal =
667
629
(commit.cursor, commit.rev.as_str(), put_action).into();
668
630
batch.insert(
···
702
664
Ok(())
703
665
}
704
666
705
705
-
fn step_rollup(&mut self) -> StorageResult<usize> {
667
667
+
fn step_rollup(&mut self) -> StorageResult<usize> {
706
668
let rollup_cursor =
707
669
get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?.ok_or(
708
670
StorageError::BadStateError("Could not find current rollup cursor".to_string()),
709
709
-
)
710
710
-
.inspect_err(|e| log::warn!("failed getting rollup cursor: {e:?}"))?;
671
671
+
)?;
711
672
712
673
// timelies
713
713
-
let live_counts_range = LiveCountsKey::range_from_cursor(rollup_cursor)
714
714
-
.inspect_err(|e| log::warn!("live counts range: {e:?}"))?;
674
674
+
let live_counts_range = LiveCountsKey::range_from_cursor(rollup_cursor)?;
715
675
let mut timely_iter = self.rollups.range(live_counts_range).peekable();
716
676
717
677
let timely_next_cursor = timely_iter
···
720
680
match kv {
721
681
Err(e) => Err(std::mem::replace(e, fjall::Error::Poisoned))?,
722
682
Ok((key_bytes, _)) => {
723
723
-
let key = db_complete::<LiveCountsKey>(key_bytes)
724
724
-
.inspect_err(|e| log::warn!("failed getting key for next timely: {e:?}"))?;
683
683
+
let key = db_complete::<LiveCountsKey>(key_bytes)?;
725
684
Ok(key.cursor())
726
685
}
727
686
}
728
687
})
729
729
-
.transpose()
730
730
-
.inspect_err(|e| log::warn!("something about timely: {e:?}"))?;
688
688
+
.transpose()?;
731
689
732
690
// delete accounts
733
691
let delete_accounts_range =
···
737
695
.queues
738
696
.range(delete_accounts_range)
739
697
.next()
740
740
-
.transpose()
741
741
-
.inspect_err(|e| log::warn!("range for next delete: {e:?}"))?
698
698
+
.transpose()?
742
699
.map(|(key_bytes, val_bytes)| {
743
700
db_complete::<DeleteAccountQueueKey>(&key_bytes)
744
744
-
.inspect_err(|e| log::warn!("failed inside next delete thing????: {e:?}"))
745
701
.map(|k| (k.suffix, key_bytes, val_bytes))
746
702
})
747
747
-
.transpose()
748
748
-
.inspect_err(|e| log::warn!("failed getting next delete: {e:?}"))?;
703
703
+
.transpose()?;
749
704
750
705
let cursors_stepped = match (timely_next_cursor, next_delete) {
751
706
(
···
757
712
timely_iter,
758
713
Some(delete_cursor),
759
714
MAX_BATCHED_ROLLUP_COUNTS,
760
760
-
)
761
761
-
.inspect_err(|e| log::warn!("rolling up live counts: {e:?}"))?
715
715
+
)?
762
716
} else {
763
763
-
self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)
764
764
-
.inspect_err(|e| log::warn!("deleting acocunt: {e:?}"))?
717
717
+
self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)?
765
718
}
766
719
}
767
720
(Some(_), None) => {
768
768
-
self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)
769
769
-
.inspect_err(|e| log::warn!("rolling up (lasjdflkajs): {e:?}"))?
721
721
+
self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)?
770
722
}
771
723
(None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => {
772
772
-
self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)
773
773
-
.inspect_err(|e| log::warn!("deleting acocunt other branch: {e:?}"))?
724
724
+
self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)?
774
725
}
775
726
(None, None) => 0,
776
727
};
···
1
1
-
use std::sync::Arc;
2
1
use std::ops::Bound;
2
2
+
use std::sync::Arc;
3
3
4
4
-
use std::sync::RwLock;
5
5
-
use std::sync::Mutex;
6
4
use crate::db_types::{db_complete, DbBytes, DbStaticStr, StaticStr};
7
5
use crate::error::StorageError;
8
6
use crate::storage::{StorageResult, StorageWhatever, StoreReader, StoreWriter};
···
17
15
use crate::{CommitAction, ConsumerInfo, Did, EventBatch, Nsid, TopCollections, UFOsRecord};
18
16
use async_trait::async_trait;
19
17
use jetstream::events::Cursor;
20
20
-
use std::collections::HashMap;
18
18
+
use lsm_tree::range::prefix_to_range;
21
19
use std::collections::BTreeMap;
20
20
+
use std::collections::HashMap;
22
21
use std::path::Path;
22
22
+
use std::sync::Mutex;
23
23
+
use std::sync::RwLock;
23
24
use std::time::SystemTime;
24
24
-
use lsm_tree::range::prefix_to_range;
25
25
-
26
25
27
26
const MAX_BATCHED_CLEANUP_SIZE: usize = 1024; // try to commit progress for longer feeds
28
27
const MAX_BATCHED_ACCOUNT_DELETE_RECORDS: usize = 1024;
···
122
121
123
122
impl MemKeyspace {
124
123
pub fn open() -> Self {
125
125
-
Self { keyspace_guard: Arc::new(RwLock::new(BatchSentinel {})) }
124
124
+
Self {
125
125
+
keyspace_guard: Arc::new(RwLock::new(BatchSentinel {})),
126
126
+
}
126
127
}
127
128
pub fn open_partition(&self, _name: &str) -> StorageResult<MemPartion> {
128
129
Ok(MemPartion {
···
137
138
tasks: Vec::new(),
138
139
}
139
140
}
140
140
-
pub fn instant(&self) -> () {}
141
141
+
pub fn instant(&self) -> u64 {
142
142
+
1
143
143
+
}
141
144
}
142
145
143
146
enum BatchTask {
···
176
179
let _guard = self.keyspace_guard.write().unwrap();
177
180
for task in &mut self.tasks {
178
181
match task {
179
179
-
BatchTask::Insert { p, key, val } =>
180
180
-
p.contents
181
181
-
.try_lock()
182
182
-
.unwrap()
183
183
-
.insert(key.to_vec(), val.to_vec()),
184
184
-
BatchTask::Remove { p, key } =>
185
185
-
p.contents
186
186
-
.try_lock()
187
187
-
.unwrap()
188
188
-
.remove(key),
182
182
+
BatchTask::Insert { p, key, val } => p
183
183
+
.contents
184
184
+
.try_lock()
185
185
+
.unwrap()
186
186
+
.insert(key.to_vec(), val.to_vec()),
187
187
+
BatchTask::Remove { p, key } => p.contents.try_lock().unwrap().remove(key),
189
188
};
190
189
}
191
190
Ok(())
···
201
200
impl MemPartion {
202
201
pub fn get(&self, key: &[u8]) -> StorageResult<Option<Vec<u8>>> {
203
202
let _guard = self.keyspace_guard.read().unwrap();
204
204
-
Ok(self.contents
205
205
-
.lock()
206
206
-
.unwrap()
207
207
-
.get(key)
208
208
-
.cloned())
203
203
+
Ok(self.contents.lock().unwrap().get(key).cloned())
209
204
}
210
205
pub fn prefix(&self, pre: &[u8]) -> Vec<StorageResult<(Vec<u8>, Vec<u8>)>> {
211
206
// let prefix_bytes = prefix.to_db_bytes()?;
212
212
-
let (_, Bound::Excluded(range_end)) = prefix_to_range(&pre) else {
207
207
+
let (_, Bound::Excluded(range_end)) = prefix_to_range(pre) else {
213
208
panic!("bad range thing");
214
209
};
215
210
216
216
-
return self.range(pre.to_vec()..range_end.to_vec()).into()
211
211
+
return self.range(pre.to_vec()..range_end.to_vec());
217
212
}
218
213
pub fn range(&self, r: std::ops::Range<Vec<u8>>) -> Vec<StorageResult<(Vec<u8>, Vec<u8>)>> {
219
214
let _guard = self.keyspace_guard.read().unwrap();
···
240
235
// .remove(key);
241
236
// Ok(())
242
237
// }
243
243
-
pub fn snapshot_at(&self, _instant: ()) -> Self {
238
238
+
pub fn snapshot_at(&self, _instant: u64) -> Self {
244
239
self.clone()
245
240
}
246
241
pub fn snapshot(&self) -> Self {
···
666
661
667
662
let (key_bytes, val_bytes) = kv?;
668
663
let key = db_complete::<LiveCountsKey>(&key_bytes)
669
669
-
.inspect_err(|e| log::warn!("rlc: key: {e:?}"))?;
664
664
+
.inspect_err(|e| log::warn!("rlc: key: {e:?}"))?;
670
665
671
666
if cursor_exclusive_limit
672
667
.map(|limit| key.cursor() > limit)
···
677
672
678
673
batch.remove(&self.rollups, &key_bytes);
679
674
let val = db_complete::<CountsValue>(&val_bytes)
680
680
-
.inspect_err(|e| log::warn!("rlc: val: {e:?}"))?;
675
675
+
.inspect_err(|e| log::warn!("rlc: val: {e:?}"))?;
681
676
counts_by_rollup
682
677
.entry((
683
678
key.collection().clone(),
···
703
698
log::warn!("done looping. looping cbr counts(?)..");
704
699
705
700
for ((nsid, rollup), counts) in counts_by_rollup {
706
706
-
log::warn!("######################## cbr loop {nsid:?} {counts:?} ########################");
701
701
+
log::warn!(
702
702
+
"######################## cbr loop {nsid:?} {counts:?} ########################"
703
703
+
);
707
704
let key_bytes = match rollup {
708
705
Rollup::Hourly(hourly_cursor) => {
709
706
let k = HourlyRollupKey::new(hourly_cursor, &nsid);
···
727
724
.get(&key_bytes)?
728
725
.inspect(|v| {
729
726
let lax = CountsValue::from_db_bytes(v);
730
730
-
log::info!("val: len={}, lax={lax:?} first32={:?}", v.len(), v.get(..32));
727
727
+
log::info!(
728
728
+
"val: len={}, lax={lax:?} first32={:?}",
729
729
+
v.len(),
730
730
+
v.get(..32)
731
731
+
);
731
732
})
732
733
.as_deref()
733
734
.map(db_complete::<CountsValue>)
734
735
.transpose()
735
736
.inspect_err(|e| log::warn!("oooh did we break on the rolled thing? {e:?}"))?
736
737
.unwrap_or_default();
737
737
-
738
738
739
739
// try to round-trip before inserting, for funsies
740
740
let tripppin = counts.to_db_bytes()?;
···
746
746
panic!("COUNTS maybe wtf? {counts:?}")
747
747
}
748
748
// assert_eq!(rolled, and_back);
749
749
-
750
749
751
750
rolled.merge(&counts);
752
751
···
807
806
&feed_key.to_db_bytes()?,
808
807
&feed_val.to_db_bytes()?,
809
808
);
810
810
-
811
809
812
810
let location_val: RecordLocationVal =
813
811
(commit.cursor, commit.rev.as_str(), put_action).into();
···
848
846
Ok(())
849
847
}
850
848
851
851
-
fn step_rollup(&mut self) -> StorageResult<usize> {
849
849
+
fn step_rollup(&mut self) -> StorageResult<usize> {
852
850
let rollup_cursor =
853
853
-
get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?.ok_or(
854
854
-
StorageError::BadStateError("Could not find current rollup cursor".to_string()),
855
855
-
)
856
856
-
.inspect_err(|e| log::warn!("failed getting rollup cursor: {e:?}"))?;
851
851
+
get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?
852
852
+
.ok_or(StorageError::BadStateError(
853
853
+
"Could not find current rollup cursor".to_string(),
854
854
+
))
855
855
+
.inspect_err(|e| log::warn!("failed getting rollup cursor: {e:?}"))?;
857
856
858
857
// timelies
859
858
let live_counts_range = LiveCountsKey::range_from_cursor(rollup_cursor)
···
866
865
match kv {
867
866
Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?,
868
867
Ok((key_bytes, _)) => {
869
869
-
let key = db_complete::<LiveCountsKey>(key_bytes)
870
870
-
.inspect_err(|e| log::warn!("failed getting key for next timely: {e:?}"))?;
868
868
+
let key = db_complete::<LiveCountsKey>(key_bytes).inspect_err(|e| {
869
869
+
log::warn!("failed getting key for next timely: {e:?}")
870
870
+
})?;
871
871
Ok(key.cursor())
872
872
}
873
873
}
···
908
908
.inspect_err(|e| log::warn!("rolling up live counts: {e:?}"))?
909
909
} else {
910
910
self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)
911
911
-
.inspect_err(|e| log::warn!("deleting acocunt: {e:?}"))?
911
911
+
.inspect_err(|e| log::warn!("deleting acocunt: {e:?}"))?
912
912
}
913
913
}
914
914
-
(Some(_), None) => {
915
915
-
self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)
916
916
-
.inspect_err(|e| log::warn!("rolling up (lasjdflkajs): {e:?}"))?
917
917
-
}
918
918
-
(None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => {
919
919
-
self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)
920
920
-
.inspect_err(|e| log::warn!("deleting acocunt other branch: {e:?}"))?
921
921
-
}
914
914
+
(Some(_), None) => self
915
915
+
.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)
916
916
+
.inspect_err(|e| log::warn!("rolling up (lasjdflkajs): {e:?}"))?,
917
917
+
(None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => self
918
918
+
.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)
919
919
+
.inspect_err(|e| log::warn!("deleting acocunt other branch: {e:?}"))?,
922
920
(None, None) => 0,
923
921
};
924
922
···
1030
1028
}
1031
1029
1032
1030
/// Set a value to a fixed key
1033
1033
-
fn insert_static_neu<K: StaticStr>(
1034
1034
-
global: &MemPartion,
1035
1035
-
value: impl DbBytes,
1036
1036
-
) -> StorageResult<()> {
1031
1031
+
fn insert_static_neu<K: StaticStr>(global: &MemPartion, value: impl DbBytes) -> StorageResult<()> {
1037
1032
let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
1038
1033
let value_bytes = value.to_db_bytes()?;
1039
1034
global.insert(&key_bytes, &value_bytes)?;
···
210
210
pub struct EstimatedDidsValue(pub CardinalityEstimator<Did>);
211
211
impl SerdeBytes for EstimatedDidsValue {}
212
212
impl DbBytes for EstimatedDidsValue {
213
213
+
#[cfg(test)]
213
214
fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
214
214
-
// Ok(vec![1, 2, 3])
215
215
SerdeBytes::to_bytes(self)
216
216
}
217
217
+
#[cfg(test)]
217
218
fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> {
218
218
-
// Ok((Self(CardinalityEstimator::new()), 3))
219
219
SerdeBytes::from_bytes(bytes)
220
220
+
}
221
221
+
222
222
+
#[cfg(not(test))]
223
223
+
fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
224
224
+
Ok(vec![1, 2, 3]) // TODO: un-stub when their heap overflow is fixed
225
225
+
}
226
226
+
#[cfg(not(test))]
227
227
+
fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> {
228
228
+
if bytes.len() < 3 {
229
229
+
return Err(EncodingError::DecodeNotEnoughBytes);
230
230
+
}
231
231
+
Ok((Self(CardinalityEstimator::new()), 3)) // TODO: un-stub when their heap overflow is fixed
220
232
}
221
233
}
222
234
···
407
419
assert_eq!(restored, original);
408
420
assert_eq!(bytes_consumed, serialized.len());
409
421
410
410
-
for i in 10..10_000 {
422
422
+
for i in 10..1_000 {
411
423
estimator.insert(&Did::new(format!("did:plc:inze6wrmsm7pjl7yta3oig{i}")).unwrap());
412
424
}
413
425
let original = CountsValue::new(123, estimator);