···
20
20
21
21
pub trait StoreReader: Clone {
22
22
fn get_total_by_collection(&self, collection: &Nsid) -> Result<u64, StorageError>;
23
23
+
fn get_dids_by_collection(&self, collection: &Nsid) -> Result<u64, StorageError>;
24
24
+
// fn get_records_by_collections(&self, collections: &)
23
25
}
···
203
203
}
204
204
205
205
impl StoreReader for FjallReader {
206
206
-
fn get_total_by_collection(
207
207
-
&self,
208
208
-
collection: &jetstream::exports::Nsid,
209
209
-
) -> Result<u64, StorageError> {
206
206
+
fn get_total_by_collection(&self, collection: &Nsid) -> Result<u64, StorageError> {
210
207
// TODO: start from rollup
211
208
let full_range = LiveRecordsKey::range_from_cursor(Cursor::from_start())?;
212
209
let mut total = 0;
···
219
216
}
220
217
}
221
218
Ok(total)
219
219
+
}
220
220
+
fn get_dids_by_collection(&self, collection: &Nsid) -> Result<u64, StorageError> {
221
221
+
// TODO: start from rollup
222
222
+
let full_range = LiveDidsKey::range_from_cursor(Cursor::from_start())?;
223
223
+
let mut total_estimate = cardinality_estimator::CardinalityEstimator::new();
224
224
+
for kv in self.rollups.range(full_range) {
225
225
+
let (key_bytes, val_bytes) = kv?;
226
226
+
let key = db_complete::<LiveDidsKey>(&key_bytes)?;
227
227
+
if key.collection() == collection {
228
228
+
let LiveDidsValue(estimate) = db_complete(&val_bytes)?;
229
229
+
total_estimate.merge(&estimate);
230
230
+
}
231
231
+
}
232
232
+
Ok(total_estimate.estimate() as u64)
222
233
}
223
234
}
224
235
···
1153
1164
record: &str,
1154
1165
rev: Option<&str>,
1155
1166
cid: Option<Cid>,
1167
1167
+
cursor: u64,
1156
1168
) -> Nsid {
1157
1169
let did = Did::new(did.to_string()).unwrap();
1158
1170
let collection = Nsid::new(collection.to_string()).unwrap();
···
1173
1185
};
1174
1186
1175
1187
let (commit, collection) =
1176
1176
-
UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(100))
1188
1188
+
UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor))
1177
1189
.unwrap();
1178
1190
1179
1191
self.batch
···
1188
1200
1189
1201
#[test]
1190
1202
fn test_hello() -> anyhow::Result<()> {
1191
1191
-
let (_, mut write) = fjall_db();
1203
1203
+
let (read, mut write) = fjall_db();
1192
1204
write.insert_batch(EventBatch::default())?;
1205
1205
+
let total = read.get_total_by_collection(&Nsid::new("a.b.c".to_string()).unwrap())?;
1206
1206
+
assert_eq!(total, 0);
1193
1207
Ok(())
1194
1208
}
1195
1209
···
1205
1219
"{}",
1206
1220
Some("rev-z"),
1207
1221
None,
1222
1222
+
100,
1208
1223
);
1209
1224
write.insert_batch(batch.batch)?;
1210
1225
1211
1226
let total = read.get_total_by_collection(&collection)?;
1212
1227
assert_eq!(total, 1);
1228
1228
+
let total = read.get_total_by_collection(&Nsid::new("d.e.f".to_string()).unwrap())?;
1229
1229
+
assert_eq!(total, 0);
1230
1230
+
1231
1231
+
let total = read.get_dids_by_collection(&collection)?;
1232
1232
+
assert_eq!(total, 1);
1233
1233
+
let total = read.get_dids_by_collection(&Nsid::new("d.e.f".to_string()).unwrap())?;
1234
1234
+
assert_eq!(total, 0);
1235
1235
+
1236
1236
+
// let records = read.get_records_by_collections(&vec![collection], 2);
1237
1237
+
// assert_eq!(records.len, 1);
1238
1238
+
1239
1239
+
// let records = read.get_records_by_collections(&vec![&Nsid::new("d.e.f".to_string()).unwrap()], 2);
1240
1240
+
// assert_eq!(records.len, 0);
1213
1241
1214
1242
Ok(())
1215
1243
}
···
180
180
impl LiveDidsKey {
181
181
pub fn range_from_cursor(cursor: Cursor) -> Result<Range<Vec<u8>>, EncodingError> {
182
182
let prefix = LiveDidsCursorPrefix::from_pair(Default::default(), cursor);
183
183
-
let end = Self::prefix_range_end(&prefix)?;
184
184
-
Ok(prefix.to_db_bytes()?..end.to_db_bytes()?)
183
183
+
Ok(prefix.range_to_prefix_end()?)
184
184
+
}
185
185
+
pub fn collection(&self) -> &Nsid {
186
186
+
&self.suffix
185
187
}
186
188
}
187
189
impl From<(Cursor, &Nsid)> for LiveDidsKey {