···
30
30
self.total_seen += 1;
31
31
self.dids_estimate.insert(&commit.did);
32
32
}
33
33
+
// TODO: oops we can't truncate *delete* commits!!!!!!!
33
34
self.commits.truncate(limit - 1);
34
35
self.commits.push_front(commit);
35
36
}
···
18
18
19
19
pub trait StoreWriter {
20
20
fn insert_batch(&mut self, event_batch: EventBatch) -> StorageResult<()>;
21
21
+
fn trim_collection(&mut self, collection: &Nsid, limit: usize) -> StorageResult<()>;
21
22
}
22
23
23
24
pub trait StoreReader: Clone {
···
34
34
/// this is higher than [MAX_BATCHED_RW_EVENTS] because account-deletes can have lots of items
35
35
const MAX_BATCHED_RW_ITEMS: usize = 24;
36
36
37
37
+
/// Threshold on `batch.len()` at which `trim_collection` commits its pending
/// cleanup batch and starts a fresh one.
const MAX_BATCHED_CLEANUP_SIZE: usize = 1024; // try to commit progress for longer feeds
38
38
+
37
39
#[derive(Clone)]
38
40
struct Db {
39
41
keyspace: Keyspace,
···
411
413
);
412
414
413
415
batch.commit()?;
416
416
+
Ok(())
417
417
+
}
418
418
+
fn trim_collection(
419
419
+
&mut self,
420
420
+
collection: &Nsid,
421
421
+
limit: usize,
422
422
+
// TODO: could add a start cursor limit to avoid iterating deleted stuff at the start (/end)
423
423
+
) -> StorageResult<()> {
424
424
+
let mut dangling_feed_keys_cleaned = 0;
425
425
+
let mut records_deleted = 0;
426
426
+
427
427
+
let mut batch = self.keyspace.batch();
428
428
+
429
429
+
let prefix = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?;
430
430
+
let mut found = 0;
431
431
+
for kv in self.feeds.prefix(prefix).rev() {
432
432
+
let (key_bytes, val_bytes) = kv?;
433
433
+
let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?;
434
434
+
let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?;
435
435
+
let location_key: RecordLocationKey = (&feed_key, &feed_val).into();
436
436
+
let location_key_bytes = location_key.to_db_bytes()?;
437
437
+
438
438
+
let Some(location_val_bytes) = self.records.get(&location_key_bytes)? else {
439
439
+
// record was deleted (hopefully)
440
440
+
batch.remove(&self.feeds, &location_key_bytes);
441
441
+
dangling_feed_keys_cleaned += 1;
442
442
+
continue;
443
443
+
};
444
444
+
445
445
+
let (meta, _) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?;
446
446
+
447
447
+
if meta.cursor() != feed_key.cursor() {
448
448
+
// older/different version
449
449
+
batch.remove(&self.feeds, &location_key_bytes);
450
450
+
dangling_feed_keys_cleaned += 1;
451
451
+
continue;
452
452
+
}
453
453
+
if meta.rev != feed_val.rev() {
454
454
+
// weird...
455
455
+
log::warn!("record lookup: cursor match but rev did not...? removing.");
456
456
+
batch.remove(&self.feeds, &location_key_bytes);
457
457
+
dangling_feed_keys_cleaned += 1;
458
458
+
continue;
459
459
+
}
460
460
+
461
461
+
if batch.len() >= MAX_BATCHED_CLEANUP_SIZE {
462
462
+
batch.commit()?;
463
463
+
batch = self.keyspace.batch();
464
464
+
}
465
465
+
466
466
+
found += 1;
467
467
+
if found <= limit {
468
468
+
continue;
469
469
+
}
470
470
+
471
471
+
batch.remove(&self.feeds, &location_key_bytes);
472
472
+
batch.remove(&self.records, &location_key_bytes);
473
473
+
records_deleted += 1;
474
474
+
}
475
475
+
476
476
+
batch.commit()?;
477
477
+
478
478
+
log::info!("trim_collection ({collection:?}) removed {dangling_feed_keys_cleaned} dangling feed entries and {records_deleted} records");
479
479
+
eprintln!("trim_collection ({collection:?}) removed {dangling_feed_keys_cleaned} dangling feed entries and {records_deleted} records");
414
480
Ok(())
415
481
}
416
482
}
···
1168
1234
#[cfg(test)]
1169
1235
mod tests {
1170
1236
use super::*;
1171
1171
-
use crate::{CollectionCommits, UFOsCommit};
1237
1237
+
use crate::UFOsCommit;
1172
1238
use jetstream::events::{CommitEvent, CommitOp};
1173
1239
use jetstream::exports::Cid;
1174
1240
use serde_json::value::RawValue;
···
1199
1265
rev: Option<&str>,
1200
1266
cid: Option<Cid>,
1201
1267
cursor: u64,
1268
1268
+
truncate_at: usize,
1202
1269
) -> Nsid {
1203
1270
let did = Did::new(did.to_string()).unwrap();
1204
1271
let collection = Nsid::new(collection.to_string()).unwrap();
···
1226
1293
.commits_by_nsid
1227
1294
.entry(collection.clone())
1228
1295
.or_default()
1229
1229
-
.truncating_insert(commit, 1);
1296
1296
+
.truncating_insert(commit, truncate_at);
1230
1297
1231
1298
collection
1232
1299
}
···
1239
1306
rev: Option<&str>,
1240
1307
cid: Option<Cid>,
1241
1308
cursor: u64,
1309
1309
+
truncate_at: usize,
1242
1310
) -> Nsid {
1243
1311
let did = Did::new(did.to_string()).unwrap();
1244
1312
let collection = Nsid::new(collection.to_string()).unwrap();
···
1266
1334
.commits_by_nsid
1267
1335
.entry(collection.clone())
1268
1336
.or_default()
1269
1269
-
.truncating_insert(commit, 1);
1337
1337
+
.truncating_insert(commit, truncate_at);
1270
1338
1271
1339
collection
1272
1340
}
···
1296
1364
.commits_by_nsid
1297
1365
.entry(collection.clone())
1298
1366
.or_default()
1299
1299
-
.truncating_insert(commit, 1);
1367
1367
+
.truncating_insert(commit, 10000); // eek this needs to be fixed!!
1300
1368
1301
1369
collection
1302
1370
}
···
1326
1394
Some("rev-z"),
1327
1395
None,
1328
1396
100,
1397
1397
+
1,
1329
1398
);
1330
1399
write.insert_batch(batch.batch)?;
1331
1400
···
1363
1432
Some("rev-a"),
1364
1433
None,
1365
1434
100,
1435
1435
+
1,
1366
1436
);
1367
1437
write.insert_batch(batch.batch)?;
1368
1438
···
1375
1445
Some("rev-z"),
1376
1446
None,
1377
1447
101,
1448
1448
+
1,
1378
1449
);
1379
1450
write.insert_batch(batch.batch)?;
1380
1451
···
1403
1474
Some("rev-a"),
1404
1475
None,
1405
1476
100,
1477
1477
+
1,
1406
1478
);
1407
1479
write.insert_batch(batch.batch)?;
1408
1480
···
1425
1497
1426
1498
Ok(())
1427
1499
}
1500
1500
+
1501
1501
+
/// Inserts records into three collections (1, 10 and 1 records), trims every
/// collection (plus one that was never written) to a limit of 6, and checks that
/// only the collection exceeding the limit shrinks.
#[test]
fn test_collection_trim() -> anyhow::Result<()> {
    let (read, mut write) = fjall_db();

    let mut batch = TestBatch::default();
    batch.create(
        "did:plc:inze6wrmsm7pjl7yta3oig77",
        "a.a.a",
        "rkey-aaa",
        "{}",
        Some("rev-aaa"),
        None,
        10_000,
        100,
    );
    for i in 1..=10 {
        batch.create(
            &format!("did:plc:inze6wrmsm7pjl7yta3oig7{}", i % 3),
            "a.a.b",
            &format!("rkey-bbb-{i}"),
            &format!(r#"{{"n": {i}}}"#),
            Some(&format!("rev-bbb-{i}")),
            None,
            11_000 + i,
            100,
        );
    }
    batch.create(
        "did:plc:inze6wrmsm7pjl7yta3oig77",
        "a.a.c",
        "rkey-ccc",
        "{}",
        Some("rev-ccc"),
        None,
        12_000,
        100,
    );

    write.insert_batch(batch.batch)?;

    let nsid = |name: &str| Nsid::new(name.to_string()).unwrap();
    // Number of records the reader returns for a single collection.
    let count_records = |name: &str| -> anyhow::Result<usize> {
        let collection = nsid(name);
        Ok(read
            .get_records_by_collections(&vec![&collection], 100)?
            .len())
    };

    // "a.a.d" is never written: trimming a missing collection must be a no-op.
    let before = [("a.a.a", 1), ("a.a.b", 10), ("a.a.c", 1), ("a.a.d", 0)];
    for (name, expected) in before {
        assert_eq!(count_records(name)?, expected);
    }

    for (name, _) in before {
        write.trim_collection(&nsid(name), 6)?;
    }

    // Only "a.a.b" exceeded the limit of 6; the others are untouched.
    let after = [("a.a.a", 1), ("a.a.b", 6), ("a.a.c", 1), ("a.a.d", 0)];
    for (name, expected) in after {
        assert_eq!(count_records(name)?, expected);
    }

    Ok(())
}
1576
1576
+
1577
1577
+
// TODO: test that delete commits don't get truncated
1428
1578
}