rollups work????
tests pass???????????
···
30
30
BadStateError(String),
31
31
#[error("Fjall error")]
32
32
FjallError(#[from] fjall::Error),
33
33
+
#[error("LSM-tree error (from fjall)")]
34
34
+
FjallLsmError(#[from] fjall::LsmError),
33
35
#[error("Bytes encoding error")]
34
36
EncodingError(#[from] EncodingError),
35
37
}
···
13
13
SeenCounter, TakeoffKey, WeekTruncatedCursor, WeeklyRollupKey,
14
14
};
15
15
use crate::{CommitAction, Did, EventBatch, Nsid, RecordKey, UFOsRecord};
16
16
-
use cardinality_estimator::CardinalityEstimator;
17
16
use fjall::{
18
17
Batch as FjallBatch, CompressionType, Config, Keyspace, PartitionCreateOptions, PartitionHandle,
19
18
};
···
179
178
}
180
179
181
180
let reader = FjallReader {
181
181
+
keyspace: keyspace.clone(),
182
182
global: global.clone(),
183
183
feeds: feeds.clone(),
184
184
records: records.clone(),
···
198
198
199
199
#[derive(Clone)]
200
200
pub struct FjallReader {
201
201
+
keyspace: Keyspace,
201
202
global: PartitionHandle,
202
203
feeds: PartitionHandle,
203
204
records: PartitionHandle,
···
206
207
207
208
impl StoreReader for FjallReader {
208
209
fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> {
209
209
-
// TODO: start from rollup
210
210
-
let full_range = LiveCountsKey::range_from_cursor(Cursor::from_start())?;
211
211
-
let mut total = 0;
212
212
-
let mut dids = CardinalityEstimator::new();
213
213
-
for kv in self.rollups.range(full_range) {
210
210
+
// 0. grab a snapshot in case rollups happen while we're working
211
211
+
let instant = self.keyspace.instant();
212
212
+
let global = self.global.snapshot_at(instant);
213
213
+
let rollups = self.rollups.snapshot_at(instant);
214
214
+
215
215
+
// 1. all-time counts
216
216
+
let all_time_key = AllTimeRollupKey::new(collection).to_db_bytes()?;
217
217
+
let mut total_counts = rollups
218
218
+
.get(&all_time_key)?
219
219
+
.as_deref()
220
220
+
.map(db_complete::<CountsValue>)
221
221
+
.transpose()?
222
222
+
.unwrap_or_default();
223
223
+
224
224
+
// 2. live counts that haven't been rolled into all-time yet.
225
225
+
let rollup_cursor =
226
226
+
get_snapshot_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&global)?.ok_or(
227
227
+
StorageError::BadStateError("Could not find current rollup cursor".to_string()),
228
228
+
)?;
229
229
+
230
230
+
let full_range = LiveCountsKey::range_from_cursor(rollup_cursor)?;
231
231
+
for kv in rollups.range(full_range) {
214
232
let (key_bytes, val_bytes) = kv?;
215
233
let key = db_complete::<LiveCountsKey>(&key_bytes)?;
216
234
if key.collection() == collection {
217
235
let counts = db_complete::<CountsValue>(&val_bytes)?;
218
218
-
total += counts.records();
219
219
-
dids.merge(counts.dids());
236
236
+
total_counts.merge(&counts);
220
237
}
221
238
}
222
222
-
Ok((total, dids.estimate() as u64))
239
239
+
Ok((
240
240
+
total_counts.records(),
241
241
+
total_counts.dids().estimate() as u64,
242
242
+
))
223
243
}
224
244
225
245
fn get_records_by_collections(
···
340
360
Some((delete_cursor, delete_key_bytes, delete_val_bytes)),
341
361
) => {
342
362
if timely_next_cursor < delete_cursor {
343
343
-
eprintln!("rollup until delete cursor");
344
363
self.rollup_live_counts(
345
364
timely_iter,
346
365
Some(delete_cursor),
347
366
MAX_BATCHED_ROLLUP_COUNTS,
348
367
)?
349
368
} else {
350
350
-
eprintln!("delete then come back for rollups");
351
369
self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)?
352
370
}
353
371
}
354
372
(Some(_), None) => {
355
355
-
eprintln!("do as much rollup as we want");
356
373
self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)?
357
374
}
358
375
(None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => {
359
359
-
eprintln!("just delete an account");
360
376
self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)?
361
377
}
362
362
-
(None, None) => {
363
363
-
eprintln!("do nothing.");
364
364
-
0
365
365
-
}
378
378
+
(None, None) => 0,
366
379
};
367
380
368
381
Ok(cursors_stepped)
···
378
391
self.delete_account(&did)?;
379
392
let mut batch = self.keyspace.batch();
380
393
batch.remove(&self.queues, key_bytes);
381
381
-
insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, cursor.next())?;
394
394
+
insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, cursor)?;
382
395
batch.commit()?;
383
396
Ok(1)
384
397
}
···
466
479
batch.insert(&self.rollups, &key_bytes, &rolled.to_db_bytes()?);
467
480
}
468
481
469
469
-
insert_batch_static_neu::<NewRollupCursorKey>(
470
470
-
&mut batch,
471
471
-
&self.global,
472
472
-
last_cursor.next(),
473
473
-
)?;
482
482
+
insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor)?;
474
483
475
484
batch.commit()?;
476
485
Ok(cursors_advanced)
···
947
956
948
957
/// Get a value from a fixed key
949
958
fn get_static_neu<K: StaticStr, V: DbBytes>(global: &PartitionHandle) -> StorageResult<Option<V>> {
959
959
+
let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
960
960
+
let value = global
961
961
+
.get(&key_bytes)?
962
962
+
.map(|value_bytes| db_complete(&value_bytes))
963
963
+
.transpose()?;
964
964
+
Ok(value)
965
965
+
}
966
966
+
967
967
+
/// Get a value from a fixed key
968
968
+
fn get_snapshot_static_neu<K: StaticStr, V: DbBytes>(
969
969
+
global: &fjall::Snapshot,
970
970
+
) -> StorageResult<Option<V>> {
950
971
let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
951
972
let value = global
952
973
.get(&key_bytes)?
···
1786
1807
let n = write.step_rollup()?;
1787
1808
assert_eq!(n, 2);
1788
1809
1810
1810
+
let n = write.step_rollup()?;
1811
1811
+
assert_eq!(n, 0);
1812
1812
+
1813
1813
+
Ok(())
1814
1814
+
}
1815
1815
+
1816
1816
+
#[test]
1817
1817
+
fn counts_before_and_after_rollup() -> anyhow::Result<()> {
1818
1818
+
let (read, mut write) = fjall_db();
1819
1819
+
1820
1820
+
let mut batch = TestBatch::default();
1821
1821
+
batch.create(
1822
1822
+
"did:plc:person-a",
1823
1823
+
"a.a.a",
1824
1824
+
"rkey-aaa",
1825
1825
+
"{}",
1826
1826
+
Some("rev-aaa"),
1827
1827
+
None,
1828
1828
+
10_000,
1829
1829
+
);
1830
1830
+
batch.create(
1831
1831
+
"did:plc:person-b",
1832
1832
+
"a.a.a",
1833
1833
+
"rkey-bbb",
1834
1834
+
"{}",
1835
1835
+
Some("rev-bbb"),
1836
1836
+
None,
1837
1837
+
10_001,
1838
1838
+
);
1839
1839
+
write.insert_batch(batch.batch)?;
1840
1840
+
1841
1841
+
let mut batch = TestBatch::default();
1842
1842
+
batch.delete_account("did:plc:person-a", 11_000);
1843
1843
+
write.insert_batch(batch.batch)?;
1844
1844
+
1845
1845
+
let mut batch = TestBatch::default();
1846
1846
+
batch.create(
1847
1847
+
"did:plc:person-a",
1848
1848
+
"a.a.a",
1849
1849
+
"rkey-aac",
1850
1850
+
"{}",
1851
1851
+
Some("rev-aac"),
1852
1852
+
None,
1853
1853
+
12_000,
1854
1854
+
);
1855
1855
+
write.insert_batch(batch.batch)?;
1856
1856
+
1857
1857
+
// before any rollup
1858
1858
+
let (records, dids) =
1859
1859
+
read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?;
1860
1860
+
assert_eq!(records, 3);
1861
1861
+
assert_eq!(dids, 2);
1862
1862
+
1863
1863
+
// first batch rolled up
1864
1864
+
let n = write.step_rollup()?;
1865
1865
+
assert_eq!(n, 1);
1866
1866
+
1867
1867
+
let (records, dids) =
1868
1868
+
read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?;
1869
1869
+
assert_eq!(records, 3);
1870
1870
+
assert_eq!(dids, 2);
1871
1871
+
1872
1872
+
// delete account rolled up
1873
1873
+
let n = write.step_rollup()?;
1874
1874
+
assert_eq!(n, 1);
1875
1875
+
1876
1876
+
let (records, dids) =
1877
1877
+
read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?;
1878
1878
+
assert_eq!(records, 3);
1879
1879
+
assert_eq!(dids, 2);
1880
1880
+
1881
1881
+
// second batch rolled up
1882
1882
+
let n = write.step_rollup()?;
1883
1883
+
assert_eq!(n, 1);
1884
1884
+
1885
1885
+
let (records, dids) =
1886
1886
+
read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?;
1887
1887
+
assert_eq!(records, 3);
1888
1888
+
assert_eq!(dids, 2);
1889
1889
+
1890
1890
+
// no more rollups left
1789
1891
let n = write.step_rollup()?;
1790
1892
assert_eq!(n, 0);
1791
1893