···
351
351
loop {
352
352
let dict = DecoderDictionary::copy(JETSTREAM_ZSTD_DICTIONARY);
353
353
354
354
-
let req = match build_request(connect_cursor.clone()) {
354
354
+
let req = match build_request(connect_cursor) {
355
355
Ok(req) => req,
356
356
Err(e) => {
357
357
log::error!("Could not build jetstream websocket request: {e:?}");
···
359
359
}
360
360
};
361
361
362
362
-
let mut last_cursor = connect_cursor.clone();
362
362
+
let mut last_cursor = connect_cursor;
363
363
retry_attempt += 1;
364
364
if let Ok((ws_stream, _)) = connect_async(req).await {
365
365
let t_connected = Instant::now();
···
427
427
Message::Text(json) => {
428
428
let event: JetstreamEvent = serde_json::from_str(&json)
429
429
.map_err(JetstreamEventError::ReceivedMalformedJSON)?;
430
430
-
let event_cursor = event.cursor.clone();
430
430
+
let event_cursor = event.cursor;
431
431
432
432
if let Some(last) = last_cursor {
433
433
if event_cursor <= *last {
···
458
458
459
459
let event: JetstreamEvent = serde_json::from_reader(decoder)
460
460
.map_err(JetstreamEventError::ReceivedMalformedJSON)?;
461
461
-
let event_cursor = event.cursor.clone();
461
461
+
let event_cursor = event.cursor;
462
462
463
463
if let Some(last) = last_cursor {
464
464
if event_cursor <= *last {
···
126
126
) -> Result<(Self, Nsid), FirehoseEventError> {
127
127
let action = match commit.operation {
128
128
CommitOp::Delete => CommitAction::Cut,
129
129
-
cru @ _ => CommitAction::Put(PutAction {
129
129
+
cru => CommitAction::Put(PutAction {
130
130
record: commit.record.ok_or(FirehoseEventError::CruMissingRecord)?,
131
131
is_update: cru == CommitOp::Update,
132
132
}),
···
66
66
if !args.pause_writer {
67
67
println!(
68
68
"starting consumer with cursor: {cursor:?} from {:?} ago",
69
69
-
cursor.clone().map(|c| c.elapsed())
69
69
+
cursor.map(|c| c.elapsed())
70
70
);
71
71
let mut batches =
72
72
consumer::consume(&args.jetstream, cursor, args.jetstream_no_zstd).await?;
···
324
324
325
325
// timelies
326
326
let live_counts_range = LiveCountsKey::range_from_cursor(rollup_cursor)?;
327
327
-
let mut timely_iter = self.rollups.range(live_counts_range).into_iter().peekable();
327
327
+
let mut timely_iter = self.rollups.range(live_counts_range).peekable();
328
328
329
329
let timely_next_cursor = timely_iter
330
330
.peek_mut()
···
332
332
match kv {
333
333
Err(e) => Err(std::mem::replace(e, fjall::Error::Poisoned))?,
334
334
Ok((key_bytes, _)) => {
335
335
-
let key = db_complete::<LiveCountsKey>(&key_bytes)?;
335
335
+
let key = db_complete::<LiveCountsKey>(key_bytes)?;
336
336
Ok(key.cursor())
337
337
}
338
338
}
···
702
702
log::info!("{}", batch_summary);
703
703
704
704
// todo!();
705
705
-
// let last = event_batch.last_jetstream_cursor.clone(); // TODO: get this from the data. track last in consumer. compute or track first.
705
705
+
// let last = event_batch.last_jetstream_cursor; // TODO: get this from the data. track last in consumer. compute or track first.
706
706
707
707
// let db = &self.db;
708
708
// let keyspace = db.keyspace.clone();
···
777
777
log::trace!("rw: getting rw cursor...");
778
778
let mod_cursor =
779
779
get_static::<ModCursorKey, ModCursorValue>(&global)?.unwrap_or(Cursor::from_start());
780
780
-
let range = ModQueueItemKey::new(mod_cursor.clone()).range_to_prefix_end()?;
780
780
+
let range = ModQueueItemKey::new(mod_cursor).range_to_prefix_end()?;
781
781
782
782
let mut db_batch = keyspace.batch();
783
783
let mut batched_rw_items = 0;
···
1106
1106
) -> anyhow::Result<usize> {
1107
1107
// update the current rw cursor to this item (atomically with the batch if it succeeds)
1108
1108
let mod_cursor: Cursor = (&mod_key).into();
1109
1109
-
insert_batch_static::<ModCursorKey>(db_batch, &self.global, mod_cursor.clone())?;
1109
1109
+
insert_batch_static::<ModCursorKey>(db_batch, &self.global, mod_cursor)?;
1110
1110
1111
1111
let items_modified = match mod_value {
1112
1112
ModQueueItemValue::DeleteAccount(did) => {
···
1150
1150
// 1. delete any existing versions older than us
1151
1151
let items_deleted = self.delete_record(
1152
1152
db_batch,
1153
1153
-
cursor.clone(),
1153
1153
+
cursor,
1154
1154
did.clone(),
1155
1155
collection.clone(),
1156
1156
rkey.clone(),
···
1175
1175
ByIdKey::record_prefix(did.clone(), collection.clone(), rkey.clone()).to_db_bytes()?;
1176
1176
1177
1177
// put the cursor of the actual deletion event in to prevent prefix iter from touching newer docs
1178
1178
-
let key_limit =
1179
1179
-
ByIdKey::new(did, collection.clone(), rkey, cursor.clone()).to_db_bytes()?;
1178
1178
+
let key_limit = ByIdKey::new(did, collection.clone(), rkey, cursor).to_db_bytes()?;
1180
1179
1181
1180
let mut items_removed = 0;
1182
1181
···
1273
1272
// ["by_collection"|collection|js_cursor] => [did|rkey|record]
1274
1273
db_batch.insert(
1275
1274
&self.global,
1276
1276
-
ByCollectionKey::new(collection.clone(), cursor.clone()).to_db_bytes()?,
1275
1275
+
ByCollectionKey::new(collection.clone(), cursor).to_db_bytes()?,
1277
1276
ByCollectionValue::new(did.clone(), rkey.clone(), record).to_db_bytes()?,
1278
1277
);
1279
1278
···
1341
1340
}
1342
1341
1343
1342
impl TestBatch {
1343
1343
+
#[allow(clippy::too_many_arguments)]
1344
1344
pub fn create(
1345
1345
&mut self,
1346
1346
did: &str,
···
1382
1382
1383
1383
collection
1384
1384
}
1385
1385
+
#[allow(clippy::too_many_arguments)]
1385
1386
pub fn update(
1386
1387
&mut self,
1387
1388
did: &str,
···
1423
1424
1424
1425
collection
1425
1426
}
1427
1427
+
#[allow(clippy::too_many_arguments)]
1426
1428
pub fn delete(
1427
1429
&mut self,
1428
1430
did: &str,
···
1499
1501
assert_eq!(records, 0);
1500
1502
assert_eq!(dids, 0);
1501
1503
1502
1502
-
let records = read.get_records_by_collections(&vec![&collection], 2)?;
1504
1504
+
let records = read.get_records_by_collections(&[&collection], 2)?;
1503
1505
assert_eq!(records.len(), 1);
1504
1506
let rec = &records[0];
1505
1507
assert_eq!(rec.record.get(), "{}");
1506
1506
-
assert_eq!(rec.is_update, false);
1508
1508
+
assert!(!rec.is_update);
1507
1509
1508
1510
let records =
1509
1509
-
read.get_records_by_collections(&vec![&Nsid::new("d.e.f".to_string()).unwrap()], 2)?;
1511
1511
+
read.get_records_by_collections(&[&Nsid::new("d.e.f".to_string()).unwrap()], 2)?;
1510
1512
assert_eq!(records.len(), 0);
1511
1513
1512
1514
Ok(())
···
1544
1546
assert_eq!(records, 1);
1545
1547
assert_eq!(dids, 1);
1546
1548
1547
1547
-
let records = read.get_records_by_collections(&vec![&collection], 2)?;
1549
1549
+
let records = read.get_records_by_collections(&[&collection], 2)?;
1548
1550
assert_eq!(records.len(), 1);
1549
1551
let rec = &records[0];
1550
1552
assert_eq!(rec.record.get(), r#"{"ch": "ch-ch-ch-changes"}"#);
1551
1551
-
assert_eq!(rec.is_update, true);
1553
1553
+
assert!(rec.is_update);
1552
1554
Ok(())
1553
1555
}
1554
1556
···
1582
1584
assert_eq!(records, 1);
1583
1585
assert_eq!(dids, 1);
1584
1586
1585
1585
-
let records = read.get_records_by_collections(&vec![&collection], 2)?;
1587
1587
+
let records = read.get_records_by_collections(&[&collection], 2)?;
1586
1588
assert_eq!(records.len(), 0);
1587
1589
1588
1590
Ok(())
···
1628
1630
write.insert_batch(batch.batch)?;
1629
1631
1630
1632
let records =
1631
1631
-
read.get_records_by_collections(&vec![&Nsid::new("a.a.a".to_string()).unwrap()], 100)?;
1633
1633
+
read.get_records_by_collections(&[&Nsid::new("a.a.a".to_string()).unwrap()], 100)?;
1632
1634
assert_eq!(records.len(), 1);
1633
1635
let records =
1634
1634
-
read.get_records_by_collections(&vec![&Nsid::new("a.a.b".to_string()).unwrap()], 100)?;
1636
1636
+
read.get_records_by_collections(&[&Nsid::new("a.a.b".to_string()).unwrap()], 100)?;
1635
1637
assert_eq!(records.len(), 10);
1636
1638
let records =
1637
1637
-
read.get_records_by_collections(&vec![&Nsid::new("a.a.c".to_string()).unwrap()], 100)?;
1639
1639
+
read.get_records_by_collections(&[&Nsid::new("a.a.c".to_string()).unwrap()], 100)?;
1638
1640
assert_eq!(records.len(), 1);
1639
1641
let records =
1640
1640
-
read.get_records_by_collections(&vec![&Nsid::new("a.a.d".to_string()).unwrap()], 100)?;
1642
1642
+
read.get_records_by_collections(&[&Nsid::new("a.a.d".to_string()).unwrap()], 100)?;
1641
1643
assert_eq!(records.len(), 0);
1642
1644
1643
1645
write.trim_collection(&Nsid::new("a.a.a".to_string()).unwrap(), 6)?;
···
1646
1648
write.trim_collection(&Nsid::new("a.a.d".to_string()).unwrap(), 6)?;
1647
1649
1648
1650
let records =
1649
1649
-
read.get_records_by_collections(&vec![&Nsid::new("a.a.a".to_string()).unwrap()], 100)?;
1651
1651
+
read.get_records_by_collections(&[&Nsid::new("a.a.a".to_string()).unwrap()], 100)?;
1650
1652
assert_eq!(records.len(), 1);
1651
1653
let records =
1652
1652
-
read.get_records_by_collections(&vec![&Nsid::new("a.a.b".to_string()).unwrap()], 100)?;
1654
1654
+
read.get_records_by_collections(&[&Nsid::new("a.a.b".to_string()).unwrap()], 100)?;
1653
1655
assert_eq!(records.len(), 6);
1654
1656
let records =
1655
1655
-
read.get_records_by_collections(&vec![&Nsid::new("a.a.c".to_string()).unwrap()], 100)?;
1657
1657
+
read.get_records_by_collections(&[&Nsid::new("a.a.c".to_string()).unwrap()], 100)?;
1656
1658
assert_eq!(records.len(), 1);
1657
1659
let records =
1658
1658
-
read.get_records_by_collections(&vec![&Nsid::new("a.a.d".to_string()).unwrap()], 100)?;
1660
1660
+
read.get_records_by_collections(&[&Nsid::new("a.a.d".to_string()).unwrap()], 100)?;
1659
1661
assert_eq!(records.len(), 0);
1660
1662
1661
1663
Ok(())
···
1689
1691
write.insert_batch(batch.batch)?;
1690
1692
1691
1693
let records =
1692
1692
-
read.get_records_by_collections(&vec![&Nsid::new("a.a.a".to_string()).unwrap()], 100)?;
1694
1694
+
read.get_records_by_collections(&[&Nsid::new("a.a.a".to_string()).unwrap()], 100)?;
1693
1695
assert_eq!(records.len(), 3);
1694
1696
1695
1697
let records_deleted =
···
1697
1699
assert_eq!(records_deleted, 2);
1698
1700
1699
1701
let records =
1700
1700
-
read.get_records_by_collections(&vec![&Nsid::new("a.a.a".to_string()).unwrap()], 100)?;
1702
1702
+
read.get_records_by_collections(&[&Nsid::new("a.a.a".to_string()).unwrap()], 100)?;
1701
1703
assert_eq!(records.len(), 1);
1702
1704
1703
1705
Ok(())
···
1726
1728
write.step_rollup()?;
1727
1729
1728
1730
let records =
1729
1729
-
read.get_records_by_collections(&vec![&Nsid::new("a.a.a".to_string()).unwrap()], 1)?;
1731
1731
+
read.get_records_by_collections(&[&Nsid::new("a.a.a".to_string()).unwrap()], 1)?;
1730
1732
assert_eq!(records.len(), 0);
1731
1733
1732
1734
Ok(())
···
1756
1758
write.insert_batch(batch.batch)?;
1757
1759
1758
1760
let records =
1759
1759
-
read.get_records_by_collections(&vec![&Nsid::new("a.a.a".to_string()).unwrap()], 1)?;
1761
1761
+
read.get_records_by_collections(&[&Nsid::new("a.a.a".to_string()).unwrap()], 1)?;
1760
1762
assert_eq!(records.len(), 1);
1761
1763
1762
1764
let n = write.step_rollup()?;
1763
1765
assert_eq!(n, 1);
1764
1766
1765
1767
let records =
1766
1766
-
read.get_records_by_collections(&vec![&Nsid::new("a.a.a".to_string()).unwrap()], 1)?;
1768
1768
+
read.get_records_by_collections(&[&Nsid::new("a.a.a".to_string()).unwrap()], 1)?;
1767
1769
assert_eq!(records.len(), 0);
1768
1770
1769
1771
let mut batch = TestBatch::default();
···
206
206
impl LiveCountsKey {
207
207
pub fn range_from_cursor(cursor: Cursor) -> Result<Range<Vec<u8>>, EncodingError> {
208
208
let prefix = LiveCountsCursorPrefix::from_pair(Default::default(), cursor);
209
209
-
Ok(prefix.range_to_prefix_end()?)
209
209
+
prefix.range_to_prefix_end()
210
210
}
211
211
pub fn cursor(&self) -> Cursor {
212
212
self.prefix.suffix
···
432
432
ByIdDidPrefix::from_pair(Default::default(), did)
433
433
}
434
434
pub fn cursor(&self) -> Cursor {
435
435
-
self.suffix.clone()
435
435
+
self.suffix
436
436
}
437
437
}
438
438
impl From<ByIdKey> for (Did, Nsid, RecordKey, Cursor) {
···
515
515
}
516
516
impl From<&ModQueueItemKey> for ModCursorValue {
517
517
fn from(k: &ModQueueItemKey) -> Self {
518
518
-
k.suffix.clone()
518
518
+
k.suffix
519
519
}
520
520
}
521
521
···
596
596
pub struct TruncatedCursor<const MOD: u64>(u64);
597
597
impl<const MOD: u64> TruncatedCursor<MOD> {
598
598
fn truncate(raw: u64) -> u64 {
599
599
-
let floored_ts = raw / MOD;
600
600
-
let truncated = floored_ts * MOD;
601
601
-
truncated
599
599
+
(raw / MOD) * MOD
602
600
}
603
601
pub fn try_from_raw_u64(time_us: u64) -> Result<Self, EncodingError> {
604
602
let rem = time_us % MOD;