···
14
14
use jetstream::events::Cursor;
15
15
use std::collections::HashMap;
16
16
use std::path::{Path, PathBuf};
17
17
-
use std::sync::Arc;
18
17
use std::time::{Duration, Instant};
19
18
use tokio::sync::mpsc::Receiver;
20
19
use tokio::time::sleep;
···
32
31
const MAX_BATCHED_RW_ITEMS: usize = 24;
33
32
34
33
#[derive(Clone)]
35
35
-
struct SerialDb {
34
34
+
struct Db {
36
35
keyspace: Keyspace,
37
36
partition: PartitionHandle,
38
37
}
39
38
40
40
-
struct FakeMutex<T> {
41
41
-
thing: T,
42
42
-
}
43
43
-
impl<T: Clone> FakeMutex<T> {
44
44
-
pub fn new(thing: T) -> Self {
45
45
-
Self { thing }
46
46
-
}
47
47
-
pub async fn lock(&self) -> T {
48
48
-
self.thing.clone()
49
49
-
}
50
50
-
}
51
51
-
52
39
/**
53
40
* data format, roughly:
54
41
*
···
77
64
#[derive(Clone)]
78
65
pub struct Storage {
79
66
/// horrible: gate all db access behind this to force global serialization to avoid deadlock
80
80
-
db: Arc<FakeMutex<SerialDb>>,
67
67
+
db: Db,
81
68
}
82
69
83
70
impl Storage {
···
88
75
PartitionCreateOptions::default().compression(CompressionType::None),
89
76
)?;
90
77
Ok(Self {
91
91
-
db: Arc::new(FakeMutex::new(SerialDb {
78
78
+
db: Db {
92
79
keyspace,
93
80
partition,
94
94
-
})),
81
81
+
},
95
82
})
96
83
}
97
84
···
140
127
141
128
let last = event_batch.last_jetstream_cursor.clone(); // TODO: get this from the data. track last in consumer. compute or track first.
142
129
143
143
-
let db = self.db.lock().await;
130
130
+
let db = &self.db;
144
131
let keyspace = db.keyspace.clone();
145
132
let partition = db.partition.clone();
146
133
···
156
143
.await??;
157
144
log::trace!("write: back from blocking task, successfully wrote batch");
158
145
let wrote_for = writer_t0.elapsed();
159
159
-
drop(db);
160
146
161
147
println!("{batch_summary}, slept {slept_for: <12?}, wrote {wrote_for: <11?}, queue: {queue_size}");
162
148
} else {
···
172
158
loop {
173
159
sleep(Duration::from_secs_f64(0.1)).await; // todo: interval rate-limit instead
174
160
175
175
-
let db = self.db.lock().await;
161
161
+
let db = &self.db;
176
162
let keyspace = db.keyspace.clone();
177
163
let partition = db.partition.clone();
178
164
···
245
231
collection: &Nsid,
246
232
limit: usize,
247
233
) -> anyhow::Result<Vec<CreateRecord>> {
248
248
-
let partition = self.db.lock().await.partition.clone();
234
234
+
let partition = self.db.partition.clone();
249
235
let prefix = ByCollectionKey::prefix_from_collection(collection.clone())?;
250
236
tokio::task::spawn_blocking(move || {
251
237
let mut output = Vec::new();
···
267
253
}
268
254
269
255
pub async fn get_meta_info(&self) -> anyhow::Result<StorageInfo> {
270
270
-
let db = self.db.lock().await;
256
256
+
let db = &self.db;
271
257
let keyspace = db.keyspace.clone();
272
258
let partition = db.partition.clone();
273
259
tokio::task::spawn_blocking(move || {
···
282
268
}
283
269
284
270
pub async fn get_collection_total_seen(&self, collection: &Nsid) -> anyhow::Result<u64> {
285
285
-
let partition = self.db.lock().await.partition.clone();
271
271
+
let partition = self.db.partition.clone();
286
272
let collection = collection.clone();
287
273
tokio::task::spawn_blocking(move || get_unrolled_collection_seen(&partition, collection))
288
274
.await?
289
275
}
290
276
291
277
pub async fn get_top_collections(&self) -> anyhow::Result<HashMap<String, u64>> {
292
292
-
let partition = self.db.lock().await.partition.clone();
278
278
+
let partition = self.db.partition.clone();
293
279
tokio::task::spawn_blocking(move || get_unrolled_top_collections(&partition)).await?
294
280
}
295
281
296
282
pub async fn get_jetstream_endpoint(&self) -> anyhow::Result<Option<JetstreamEndpointValue>> {
297
297
-
let partition = self.db.lock().await.partition.clone();
283
283
+
let partition = self.db.partition.clone();
298
284
tokio::task::spawn_blocking(move || {
299
285
get_static::<JetstreamEndpointKey, JetstreamEndpointValue>(&partition)
300
286
})
···
302
288
}
303
289
304
290
async fn set_jetstream_endpoint(&self, endpoint: &str) -> anyhow::Result<()> {
305
305
-
let partition = self.db.lock().await.partition.clone();
291
291
+
let partition = self.db.partition.clone();
306
292
let endpoint = endpoint.to_string();
307
293
tokio::task::spawn_blocking(move || {
308
294
insert_static::<JetstreamEndpointKey>(&partition, JetstreamEndpointValue(endpoint))
···
311
297
}
312
298
313
299
pub async fn get_jetstream_cursor(&self) -> anyhow::Result<Option<Cursor>> {
314
314
-
let partition = self.db.lock().await.partition.clone();
300
300
+
let partition = self.db.partition.clone();
315
301
tokio::task::spawn_blocking(move || {
316
302
get_static::<JetstreamCursorKey, JetstreamCursorValue>(&partition)
317
303
})
···
319
305
}
320
306
321
307
pub async fn get_mod_cursor(&self) -> anyhow::Result<Option<Cursor>> {
322
322
-
let partition = self.db.lock().await.partition.clone();
308
308
+
let partition = self.db.partition.clone();
323
309
tokio::task::spawn_blocking(move || get_static::<ModCursorKey, ModCursorValue>(&partition))
324
310
.await?
325
311
}