Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

remove old ufos storage code

+36 -948
+21 -655
ufos/src/storage_fjall.rs
··· 1 - use crate::consumer::LimitedBatch; 2 - use crate::db_types::{db_complete, DbBytes, DbStaticStr, EncodingError, StaticStr}; 1 + use crate::db_types::{db_complete, DbBytes, DbStaticStr, StaticStr}; 3 2 use crate::error::StorageError; 4 3 use crate::storage::{StorageResult, StorageWhatever, StoreReader, StoreWriter}; 5 4 use crate::store_types::{ 6 - AllTimeRollupKey, ByCollectionKey, ByCollectionValue, ByCursorSeenKey, ByCursorSeenValue, 7 - ByIdKey, ByIdValue, CountsValue, DeleteAccountQueueKey, DeleteAccountQueueVal, 5 + AllTimeRollupKey, CountsValue, DeleteAccountQueueKey, DeleteAccountQueueVal, 8 6 HourTruncatedCursor, HourlyRollupKey, JetstreamCursorKey, JetstreamCursorValue, 9 - JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey, ModCursorKey, ModCursorValue, 10 - ModQueueItemKey, ModQueueItemStringValue, ModQueueItemValue, NewRollupCursorKey, 7 + JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey, NewRollupCursorKey, 11 8 NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, RecordLocationKey, 12 - RecordLocationMeta, RecordLocationVal, RecordRawValue, RollupCursorKey, RollupCursorValue, 13 - SeenCounter, TakeoffKey, TakeoffValue, WeekTruncatedCursor, WeeklyRollupKey, 9 + RecordLocationMeta, RecordLocationVal, RecordRawValue, TakeoffKey, TakeoffValue, 10 + WeekTruncatedCursor, WeeklyRollupKey, 14 11 }; 15 - use crate::{ 16 - CommitAction, ConsumerInfo, Did, EventBatch, Nsid, RecordKey, TopCollections, UFOsRecord, 17 - }; 12 + use crate::{CommitAction, ConsumerInfo, Did, EventBatch, Nsid, TopCollections, UFOsRecord}; 18 13 use async_trait::async_trait; 19 - use fjall::{ 20 - Batch as FjallBatch, CompressionType, Config, Keyspace, PartitionCreateOptions, PartitionHandle, 21 - }; 14 + use fjall::{Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle}; 22 15 use jetstream::events::Cursor; 23 16 use schemars::JsonSchema; 24 17 use serde::Serialize; 25 18 use std::collections::HashMap; 26 - use std::path::{Path, PathBuf}; 27 - use std::time::{Duration, Instant, SystemTime}; 28 - use tokio::sync::mpsc::Receiver; 29 - use tokio::time::{interval_at, sleep}; 30 - 31 - /// Commit the RW batch immediately if this number of events have been read off the mod queue 32 - const MAX_BATCHED_RW_EVENTS: usize = 18; 33 - 34 - /// Commit the RW batch immediately if this number of records is reached 35 - /// 36 - /// there are probably some efficiency gains for higher, at cost of more memory. 37 - /// interestingly, this kind of sets a priority weight for the RW loop: 38 - /// - doing more work whenever scheduled means getting more CPU time in general 39 - /// 40 - /// this is higher than [MAX_BATCHED_RW_EVENTS] because account-deletes can have lots of items 41 - const MAX_BATCHED_RW_ITEMS: usize = 24; 19 + use std::path::Path; 20 + use std::time::SystemTime; 42 21 43 22 const MAX_BATCHED_CLEANUP_SIZE: usize = 1024; // try to commit progress for longer feeds 44 23 const MAX_BATCHED_ACCOUNT_DELETE_RECORDS: usize = 1024; 45 24 const MAX_BATCHED_ROLLUP_COUNTS: usize = 256; 46 - 47 - #[derive(Clone)] 48 - struct Db { 49 - keyspace: Keyspace, 50 - global: PartitionHandle, 51 - } 52 25 53 26 /// 54 27 /// new data format, roughly: ··· 832 805 } 833 806 } 834 807 835 - #[derive(Clone)] 836 - pub struct Storage { 837 - /// horrible: gate all db access behind this to force global serialization to avoid deadlock 838 - db: Db, 839 - } 840 - 841 - impl Storage { 842 - fn init_self(path: impl AsRef<Path>) -> anyhow::Result<Self> { 843 - let keyspace = Config::new(path).fsync_ms(Some(4_000)).open()?; 844 - let global = keyspace.open_partition( 845 - "default", 846 - PartitionCreateOptions::default().compression(CompressionType::None), 847 - )?; 848 - Ok(Self { 849 - db: Db { keyspace, global }, 850 - }) 851 - } 852 - 853 - pub async fn open( 854 - path: PathBuf, 855 - endpoint: &str, 856 - force_endpoint: bool, 857 - ) -> anyhow::Result<(Self, Option<Cursor>)> { 858 - let me = tokio::task::spawn_blocking(move || Storage::init_self(path)).await??; 859 - 860 - let js_cursor = me.get_jetstream_cursor().await?; 861 - 862 - if js_cursor.is_some() { 863 - let Some(JetstreamEndpointValue(stored)) = me.get_jetstream_endpoint().await? else { 864 - anyhow::bail!("found cursor but missing js_endpoint, refusing to start."); 865 - }; 866 - if stored != endpoint { 867 - if force_endpoint { 868 - log::warn!("forcing a jetstream switch from {stored:?} to {endpoint:?}"); 869 - me.set_jetstream_endpoint(endpoint).await?; 870 - } else { 871 - anyhow::bail!("stored js_endpoint {stored:?} differs from provided {endpoint:?}, refusing to start."); 872 - } 873 - } 874 - } else { 875 - me.set_jetstream_endpoint(endpoint).await?; 876 - } 877 - 878 - Ok((me, js_cursor)) 879 - } 880 - 881 - /// Jetstream event batch receiver: writes without any reads 882 - /// 883 - /// Events that require reads like record updates or delets are written to a queue 884 - pub async fn receive(&self, mut receiver: Receiver<LimitedBatch>) -> anyhow::Result<()> { 885 - // TODO: see rw_loop: enforce single-thread. 886 - loop { 887 - let t_sleep = Instant::now(); 888 - sleep(Duration::from_secs_f64(0.08)).await; // TODO: minimize during replay 889 - let _slept_for = t_sleep.elapsed(); 890 - let _queue_size = receiver.len(); 891 - 892 - if let Some(event_batch) = receiver.recv().await { 893 - log::info!("write: received write batch"); 894 - let batch_summary = summarize_batch(&event_batch); 895 - log::info!("{}", batch_summary); 896 - 897 - // todo!(); 898 - // let last = event_batch.last_jetstream_cursor; // TODO: get this from the data. track last in consumer. compute or track first. 899 - 900 - // let db = &self.db; 901 - // let keyspace = db.keyspace.clone(); 902 - // let global = db.global.clone(); 903 - 904 - // let writer_t0 = Instant::now(); 905 - // log::trace!("spawn_blocking for write batch"); 906 - // tokio::task::spawn_blocking(move || { 907 - // DBWriter { 908 - // keyspace, 909 - // global, 910 - // } 911 - // .write_batch(event_batch, last) 912 - // }) 913 - // .await??; 914 - // log::trace!("write: back from blocking task, successfully wrote batch"); 915 - // let wrote_for = writer_t0.elapsed(); 916 - 917 - // println!("{batch_summary}, slept {slept_for: <12?}, wrote {wrote_for: <11?}, queue: {queue_size}"); 918 - } else { 919 - log::error!("store consumer: receive channel failed (dropped/closed?)"); 920 - anyhow::bail!("receive channel closed"); 921 - } 922 - } 923 - } 924 - 925 - /// Read-write loop reads from the queue for record-modifying events and does rollups 926 - pub async fn rw_loop(&self) -> anyhow::Result<()> { 927 - // TODO: lock so that only one rw loop can possibly be run. or even better, take a mutable resource thing to enforce at compile time. 928 - 929 - let now = tokio::time::Instant::now(); 930 - let mut time_to_update_events = interval_at(now, Duration::from_secs_f64(0.051)); 931 - let mut time_to_trim_surplus = interval_at( 932 - now + Duration::from_secs_f64(1.0), 933 - Duration::from_secs_f64(3.3), 934 - ); 935 - let mut time_to_roll_up = interval_at( 936 - now + Duration::from_secs_f64(0.4), 937 - Duration::from_secs_f64(0.9), 938 - ); 939 - 940 - time_to_update_events.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 941 - time_to_trim_surplus.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 942 - time_to_roll_up.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 943 - 944 - loop { 945 - let keyspace = self.db.keyspace.clone(); 946 - let global = self.db.global.clone(); 947 - tokio::select! { 948 - _ = time_to_update_events.tick() => { 949 - log::debug!("beginning event update task"); 950 - tokio::task::spawn_blocking(move || Self::update_events(keyspace, global)).await??; 951 - log::debug!("finished event update task"); 952 - } 953 - _ = time_to_trim_surplus.tick() => { 954 - log::debug!("beginning record trim task"); 955 - tokio::task::spawn_blocking(move || Self::trim_old_events(keyspace, global)).await??; 956 - log::debug!("finished record trim task"); 957 - } 958 - _ = time_to_roll_up.tick() => { 959 - log::debug!("beginning rollup task"); 960 - tokio::task::spawn_blocking(move || Self::roll_up_counts(keyspace, global)).await??; 961 - log::debug!("finished rollup task"); 962 - }, 963 - } 964 - } 965 - } 966 - 967 - fn update_events(keyspace: Keyspace, global: PartitionHandle) -> anyhow::Result<()> { 968 - // TODO: lock this to prevent concurrent rw 969 - 970 - log::trace!("rw: getting rw cursor..."); 971 - let mod_cursor = 972 - get_static::<ModCursorKey, ModCursorValue>(&global)?.unwrap_or(Cursor::from_start()); 973 - let range = ModQueueItemKey::new(mod_cursor).range_to_prefix_end()?; 974 - 975 - let mut db_batch = keyspace.batch(); 976 - let mut batched_rw_items = 0; 977 - let mut any_tasks_found = false; 978 - 979 - log::trace!("rw: iterating newer rw items..."); 980 - 981 - for (i, pair) in global.range(range.clone()).enumerate() { 982 - log::trace!("rw: iterating {i}"); 983 - any_tasks_found = true; 984 - 985 - if i >= MAX_BATCHED_RW_EVENTS { 986 - break; 987 - } 988 - 989 - let (key_bytes, val_bytes) = pair?; 990 - let mod_key = match db_complete::<ModQueueItemKey>(&key_bytes) { 991 - Ok(k) => k, 992 - Err(EncodingError::WrongStaticPrefix(_, _)) => { 993 - panic!("wsp: mod queue empty."); 994 - } 995 - otherwise => otherwise?, 996 - }; 997 - 998 - let mod_value: ModQueueItemValue = 999 - db_complete::<ModQueueItemStringValue>(&val_bytes)?.try_into()?; 1000 - 1001 - log::trace!("rw: iterating {i}: sending to batcher {mod_key:?} => {mod_value:?}"); 1002 - batched_rw_items += DBWriter { 1003 - global: global.clone(), 1004 - } 1005 - .write_rw(&mut db_batch, mod_key, mod_value)?; 1006 - log::trace!("rw: iterating {i}: back from batcher."); 1007 - 1008 - if batched_rw_items >= MAX_BATCHED_RW_ITEMS { 1009 - log::trace!("rw: iterating {i}: batch big enough, breaking out."); 1010 - break; 1011 - } 1012 - } 1013 - 1014 - if !any_tasks_found { 1015 - log::trace!("rw: skipping batch commit since apparently no items were added (this is normal, skipping is new)"); 1016 - // TODO: is this missing a chance to update the cursor? 1017 - return Ok(()); 1018 - } 1019 - 1020 - log::info!("rw: committing rw batch with {batched_rw_items} items (items != total inserts/deletes)..."); 1021 - let r = db_batch.commit(); 1022 - log::info!("rw: commit result: {r:?}"); 1023 - r?; 1024 - Ok(()) 1025 - } 1026 - 1027 - fn trim_old_events(_keyspace: Keyspace, _global: PartitionHandle) -> anyhow::Result<()> { 1028 - // we *could* keep a collection dirty list in memory to reduce the amount of searching here 1029 - // actually can we use seen_by_js_cursor_collection?? 1030 - // * ["seen_by_js_cursor_collection"|js_cursor|collection] => u64 1031 - // -> the rollup cursor could handle trims. 1032 - 1033 - // key structure: 1034 - // * ["by_collection"|collection|js_cursor] => [did|rkey|record] 1035 - 1036 - // *new* strategy: 1037 - // 1. collect `collection`s seen during rollup 1038 - // 2. for each collected collection: 1039 - // 3. set up prefix iterator 1040 - // 4. reverse and try to walk back MAX_RETAINED steps 1041 - // 5. if we didn't end iteration yet, start deleting records (and their forward links) until we get to the end 1042 - 1043 - // oh we might be able to walk *forward* instead of reverse from the cursor, which might help avoid iterating over a lot of deletion tombstones 1044 - 1045 - // ... we can probably do even better with cursor ranges too, since we'll have a cursor range from rollup and it's in the by_collection key 1046 - 1047 - Ok(()) 1048 - } 1049 - 1050 - fn roll_up_counts(_keyspace: Keyspace, _global: PartitionHandle) -> anyhow::Result<()> { 1051 - Ok(()) 1052 - } 1053 - 1054 - pub async fn get_collection_records( 1055 - &self, 1056 - _collection: &Nsid, 1057 - _limit: usize, 1058 - ) -> anyhow::Result<Vec<()>> { 1059 - todo!(); 1060 - // let global = self.db.global.clone(); 1061 - // let prefix = ByCollectionKey::prefix_from_collection(collection.clone())?; 1062 - // tokio::task::spawn_blocking(move || { 1063 - // let mut output = Vec::new(); 1064 - 1065 - // for pair in global.prefix(&prefix).rev().take(limit) { 1066 - // let (k_bytes, v_bytes) = pair?; 1067 - // let (_, cursor) = db_complete::<ByCollectionKey>(&k_bytes)?.into(); 1068 - // let (did, rkey, record) = db_complete::<ByCollectionValue>(&v_bytes)?.into(); 1069 - // output.push(CreateRecord { 1070 - // did, 1071 - // rkey, 1072 - // record, 1073 - // cursor, 1074 - // }) 1075 - // } 1076 - // Ok(output) 1077 - // }) 1078 - // .await? 1079 - } 1080 - 1081 - pub async fn get_meta_info(&self) -> anyhow::Result<StorageInfo> { 1082 - let db = &self.db; 1083 - let keyspace = db.keyspace.clone(); 1084 - let global = db.global.clone(); 1085 - tokio::task::spawn_blocking(move || { 1086 - Ok(StorageInfo { 1087 - keyspace_disk_space: keyspace.disk_space(), 1088 - keyspace_journal_count: keyspace.journal_count(), 1089 - keyspace_sequence: keyspace.instant(), 1090 - global_approximate_len: global.approximate_len(), 1091 - }) 1092 - }) 1093 - .await? 1094 - } 1095 - 1096 - pub async fn get_collection_total_seen(&self, collection: &Nsid) -> anyhow::Result<u64> { 1097 - let global = self.db.global.clone(); 1098 - let collection = collection.clone(); 1099 - tokio::task::spawn_blocking(move || get_unrolled_collection_seen(&global, collection)) 1100 - .await? 1101 - } 1102 - 1103 - pub async fn get_top_collections(&self) -> anyhow::Result<HashMap<String, u64>> { 1104 - let global = self.db.global.clone(); 1105 - tokio::task::spawn_blocking(move || get_unrolled_top_collections(&global)).await? 1106 - } 1107 - 1108 - pub async fn get_jetstream_endpoint(&self) -> anyhow::Result<Option<JetstreamEndpointValue>> { 1109 - let global = self.db.global.clone(); 1110 - tokio::task::spawn_blocking(move || { 1111 - get_static::<JetstreamEndpointKey, JetstreamEndpointValue>(&global) 1112 - }) 1113 - .await? 1114 - } 1115 - 1116 - async fn set_jetstream_endpoint(&self, endpoint: &str) -> anyhow::Result<()> { 1117 - let global = self.db.global.clone(); 1118 - let endpoint = endpoint.to_string(); 1119 - tokio::task::spawn_blocking(move || { 1120 - insert_static::<JetstreamEndpointKey>(&global, JetstreamEndpointValue(endpoint)) 1121 - }) 1122 - .await? 1123 - } 1124 - 1125 - pub async fn get_jetstream_cursor(&self) -> anyhow::Result<Option<Cursor>> { 1126 - let global = self.db.global.clone(); 1127 - tokio::task::spawn_blocking(move || { 1128 - get_static::<JetstreamCursorKey, JetstreamCursorValue>(&global) 1129 - }) 1130 - .await? 1131 - } 1132 - 1133 - pub async fn get_mod_cursor(&self) -> anyhow::Result<Option<Cursor>> { 1134 - let global = self.db.global.clone(); 1135 - tokio::task::spawn_blocking(move || get_static::<ModCursorKey, ModCursorValue>(&global)) 1136 - .await? 1137 - } 1138 - } 1139 - 1140 - /// Get a value from a fixed key 1141 - fn get_static<K: StaticStr, V: DbBytes>(global: &PartitionHandle) -> anyhow::Result<Option<V>> { 1142 - let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 1143 - let value = global 1144 - .get(&key_bytes)? 1145 - .map(|value_bytes| db_complete(&value_bytes)) 1146 - .transpose()?; 1147 - Ok(value) 1148 - } 1149 - 1150 808 /// Get a value from a fixed key 1151 809 fn get_static_neu<K: StaticStr, V: DbBytes>(global: &PartitionHandle) -> StorageResult<Option<V>> { 1152 810 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; ··· 1170 828 } 1171 829 1172 830 /// Set a value to a fixed key 1173 - fn insert_static<K: StaticStr>( 1174 - global: &PartitionHandle, 1175 - value: impl DbBytes, 1176 - ) -> anyhow::Result<()> { 1177 - let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 1178 - let value_bytes = value.to_db_bytes()?; 1179 - global.insert(&key_bytes, &value_bytes)?; 1180 - Ok(()) 1181 - } 1182 - 1183 - /// Set a value to a fixed key 1184 831 fn insert_static_neu<K: StaticStr>( 1185 832 global: &PartitionHandle, 1186 833 value: impl DbBytes, ··· 1192 839 } 1193 840 1194 841 /// Set a value to a fixed key 1195 - fn insert_batch_static<K: StaticStr>( 1196 - batch: &mut FjallBatch, 1197 - global: &PartitionHandle, 1198 - value: impl DbBytes, 1199 - ) -> anyhow::Result<()> { 1200 - let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 1201 - let value_bytes = value.to_db_bytes()?; 1202 - batch.insert(global, &key_bytes, &value_bytes); 1203 - Ok(()) 1204 - } 1205 - 1206 - /// Set a value to a fixed key 1207 842 fn insert_batch_static_neu<K: StaticStr>( 1208 843 batch: &mut FjallBatch, 1209 844 global: &PartitionHandle, ··· 1215 850 Ok(()) 1216 851 } 1217 852 1218 - /// Remove a key 1219 - fn remove_batch<K: DbBytes>( 1220 - batch: &mut FjallBatch, 1221 - global: &PartitionHandle, 1222 - key: K, 1223 - ) -> Result<(), EncodingError> { 1224 - let key_bytes = key.to_db_bytes()?; 1225 - batch.remove(global, &key_bytes); 1226 - Ok(()) 1227 - } 1228 - 1229 - /// Get stats that haven't been rolled up yet 1230 - fn get_unrolled_collection_seen(global: &PartitionHandle, collection: Nsid) -> anyhow::Result<u64> { 1231 - let range = 1232 - if let Some(cursor_value) = get_static::<RollupCursorKey, RollupCursorValue>(global)? { 1233 - eprintln!("found existing cursor"); 1234 - let key: ByCursorSeenKey = cursor_value.into(); 1235 - key.range_from()? 1236 - } else { 1237 - eprintln!("cursor from start."); 1238 - ByCursorSeenKey::full_range()? 1239 - }; 1240 - 1241 - let mut collection_total = 0; 1242 - 1243 - let mut scanned = 0; 1244 - let mut rolled = 0; 1245 - 1246 - for pair in global.range(range) { 1247 - let (key_bytes, value_bytes) = pair?; 1248 - let key = db_complete::<ByCursorSeenKey>(&key_bytes)?; 1249 - let val = db_complete::<ByCursorSeenValue>(&value_bytes)?; 1250 - 1251 - if *key.collection() == collection { 1252 - let SeenCounter(n) = val; 1253 - collection_total += n; 1254 - rolled += 1; 1255 - } 1256 - scanned += 1; 1257 - } 1258 - 1259 - eprintln!("scanned: {scanned}, rolled: {rolled}"); 1260 - 1261 - Ok(collection_total) 1262 - } 1263 - 1264 - fn get_unrolled_top_collections(global: &PartitionHandle) -> anyhow::Result<HashMap<String, u64>> { 1265 - let range = 1266 - if let Some(cursor_value) = get_static::<RollupCursorKey, RollupCursorValue>(global)? { 1267 - eprintln!("found existing cursor"); 1268 - let key: ByCursorSeenKey = cursor_value.into(); 1269 - key.range_from()? 1270 - } else { 1271 - eprintln!("cursor from start."); 1272 - ByCursorSeenKey::full_range()? 1273 - }; 1274 - 1275 - let mut res = HashMap::new(); 1276 - let mut scanned = 0; 1277 - 1278 - for pair in global.range(range) { 1279 - let (key_bytes, value_bytes) = pair?; 1280 - let key = db_complete::<ByCursorSeenKey>(&key_bytes)?; 1281 - let SeenCounter(n) = db_complete(&value_bytes)?; 1282 - 1283 - *res.entry(key.collection().to_string()).or_default() += n; 1284 - 1285 - scanned += 1; 1286 - } 1287 - 1288 - eprintln!("scanned: {scanned} seen-counts."); 1289 - 1290 - Ok(res) 1291 - } 1292 - 1293 - impl DBWriter { 1294 - fn write_rw( 1295 - self, 1296 - db_batch: &mut FjallBatch, 1297 - mod_key: ModQueueItemKey, 1298 - mod_value: ModQueueItemValue, 1299 - ) -> anyhow::Result<usize> { 1300 - // update the current rw cursor to this item (atomically with the batch if it succeeds) 1301 - let mod_cursor: Cursor = (&mod_key).into(); 1302 - insert_batch_static::<ModCursorKey>(db_batch, &self.global, mod_cursor)?; 1303 - 1304 - let items_modified = match mod_value { 1305 - ModQueueItemValue::DeleteAccount(did) => { 1306 - log::trace!("rw: batcher: delete account..."); 1307 - let (items, finished) = self.delete_account(db_batch, mod_cursor, did)?; 1308 - log::trace!("rw: batcher: back from delete account (finished? {finished})"); 1309 - if finished { 1310 - // only remove the queued rw task if we have actually completed its account removal work 1311 - remove_batch::<ModQueueItemKey>(db_batch, &self.global, mod_key)?; 1312 - items + 1 1313 - } else { 1314 - items 1315 - } 1316 - } 1317 - ModQueueItemValue::DeleteRecord(did, collection, rkey) => { 1318 - log::trace!("rw: batcher: delete record..."); 1319 - let items = self.delete_record(db_batch, mod_cursor, did, collection, rkey)?; 1320 - log::trace!("rw: batcher: back from delete record"); 1321 - remove_batch::<ModQueueItemKey>(db_batch, &self.global, mod_key)?; 1322 - items + 1 1323 - } 1324 - ModQueueItemValue::UpdateRecord(did, collection, rkey, record) => { 1325 - let items = 1326 - self.update_record(db_batch, mod_cursor, did, collection, rkey, record)?; 1327 - remove_batch::<ModQueueItemKey>(db_batch, &self.global, mod_key)?; 1328 - items + 1 1329 - } 1330 - }; 1331 - Ok(items_modified) 1332 - } 1333 - 1334 - fn update_record( 1335 - &self, 1336 - db_batch: &mut FjallBatch, 1337 - cursor: Cursor, 1338 - did: Did, 1339 - collection: Nsid, 1340 - rkey: RecordKey, 1341 - record: serde_json::Value, 1342 - ) -> anyhow::Result<usize> { 1343 - // 1. delete any existing versions older than us 1344 - let items_deleted = self.delete_record( 1345 - db_batch, 1346 - cursor, 1347 - did.clone(), 1348 - collection.clone(), 1349 - rkey.clone(), 1350 - )?; 1351 - 1352 - // 2. insert the updated version, at our new cursor 1353 - self.add_record(db_batch, cursor, did, collection, rkey, record)?; 1354 - 1355 - let items_total = items_deleted + 1; 1356 - Ok(items_total) 1357 - } 1358 - 1359 - fn delete_record( 1360 - &self, 1361 - db_batch: &mut FjallBatch, 1362 - cursor: Cursor, 1363 - did: Did, 1364 - collection: Nsid, 1365 - rkey: RecordKey, 1366 - ) -> anyhow::Result<usize> { 1367 - let key_prefix_bytes = 1368 - ByIdKey::record_prefix(did.clone(), collection.clone(), rkey.clone()).to_db_bytes()?; 1369 - 1370 - // put the cursor of the actual deletion event in to prevent prefix iter from touching newer docs 1371 - let key_limit = ByIdKey::new(did, collection.clone(), rkey, cursor).to_db_bytes()?; 1372 - 1373 - let mut items_removed = 0; 1374 - 1375 - log::trace!("delete_record: iterate over up to current cursor..."); 1376 - 1377 - for (i, pair) in self.global.range(key_prefix_bytes..key_limit).enumerate() { 1378 - log::trace!("delete_record iter {i}: found"); 1379 - // find all (hopefully 1) 1380 - let (key_bytes, _) = pair?; 1381 - let key = db_complete::<ByIdKey>(&key_bytes)?; 1382 - let found_cursor = key.cursor(); 1383 - if found_cursor > cursor { 1384 - // we are *only* allowed to delete records that came before the record delete event 1385 - // log::trace!("delete_record: found (and ignoring) newer version(s). key: {key:?}"); 1386 - panic!("wtf, found newer version than cursor limit we tried to set."); 1387 - // break; 1388 - } 1389 - 1390 - // remove the by_id entry 1391 - db_batch.remove(&self.global, key_bytes); 1392 - 1393 - // remove its record sample 1394 - let by_collection_key_bytes = 1395 - ByCollectionKey::new(collection.clone(), found_cursor).to_db_bytes()?; 1396 - db_batch.remove(&self.global, by_collection_key_bytes); 1397 - 1398 - items_removed += 1; 1399 - } 1400 - 1401 - // if items_removed > 1 { 1402 - // log::trace!("odd, removed {items_removed} records for one record removal:"); 1403 - // for (i, pair) in self.global.prefix(&key_prefix_bytes).enumerate() { 1404 - // // find all (hopefully 1) 1405 - // let (key_bytes, _) = pair?; 1406 - // let found_cursor = db_complete::<ByIdKey>(&key_bytes)?.cursor(); 1407 - // if found_cursor > cursor { 1408 - // break; 1409 - // } 1410 - 1411 - // let key = db_complete::<ByIdKey>(&key_bytes)?; 1412 - // log::trace!(" {i}: key {key:?}"); 1413 - // } 1414 - // } 1415 - Ok(items_removed) 1416 - } 1417 - 1418 - fn delete_account( 1419 - &self, 1420 - db_batch: &mut FjallBatch, 1421 - cursor: Cursor, 1422 - did: Did, 1423 - ) -> anyhow::Result<(usize, bool)> { 1424 - let key_prefix_bytes = ByIdKey::did_prefix(did).to_db_bytes()?; 1425 - 1426 - let mut items_added = 0; 1427 - 1428 - for pair in self.global.prefix(&key_prefix_bytes) { 1429 - let (key_bytes, _) = pair?; 1430 - 1431 - let (_, collection, _rkey, found_cursor) = db_complete::<ByIdKey>(&key_bytes)?.into(); 1432 - if found_cursor > cursor { 1433 - log::trace!( 1434 - "delete account: found (and ignoring) newer records than the delete event??" 1435 - ); 1436 - continue; 1437 - } 1438 - 1439 - // remove the by_id entry 1440 - db_batch.remove(&self.global, key_bytes); 1441 - 1442 - // remove its record sample 1443 - let by_collection_key_bytes = 1444 - ByCollectionKey::new(collection, found_cursor).to_db_bytes()?; 1445 - db_batch.remove(&self.global, by_collection_key_bytes); 1446 - 1447 - items_added += 1; 1448 - if items_added >= MAX_BATCHED_RW_ITEMS { 1449 - return Ok((items_added, false)); // there might be more records but we've done enough for this batch 1450 - } 1451 - } 1452 - 1453 - Ok((items_added, true)) 1454 - } 1455 - 1456 - fn add_record( 1457 - &self, 1458 - db_batch: &mut FjallBatch, 1459 - cursor: Cursor, 1460 - did: Did, 1461 - collection: Nsid, 1462 - rkey: RecordKey, 1463 - record: serde_json::Value, 1464 - ) -> anyhow::Result<()> { 1465 - // ["by_collection"|collection|js_cursor] => [did|rkey|record] 1466 - db_batch.insert( 1467 - &self.global, 1468 - ByCollectionKey::new(collection.clone(), cursor).to_db_bytes()?, 1469 - ByCollectionValue::new(did.clone(), rkey.clone(), record).to_db_bytes()?, 1470 - ); 1471 - 1472 - // ["by_id"|did|collection|rkey|js_cursor] => [] // required to support deletes; did first prefix for account deletes. 1473 - db_batch.insert( 1474 - &self.global, 1475 - ByIdKey::new(did, collection.clone(), rkey, cursor).to_db_bytes()?, 1476 - ByIdValue::default().to_db_bytes()?, 1477 - ); 1478 - 1479 - Ok(()) 1480 - } 1481 - } 1482 - 1483 853 #[derive(Debug, serde::Serialize, schemars::JsonSchema)] 1484 854 pub struct StorageInfo { 1485 855 pub keyspace_disk_space: u64, ··· 1488 858 pub global_approximate_len: usize, 1489 859 } 1490 860 1491 - struct DBWriter { 1492 - global: PartitionHandle, 1493 - } 1494 - 1495 861 ////////// temp stuff to remove: 1496 862 1497 - fn summarize_batch<const LIMIT: usize>(batch: &EventBatch<LIMIT>) -> String { 1498 - format!( 1499 - "batch of {: >3} samples from {: >4} records in {: >2} collections from ~{: >4} DIDs, {} acct removes, cursor {: <12?}", 1500 - batch.total_records(), 1501 - batch.total_seen(), 1502 - batch.total_collections(), 1503 - batch.estimate_dids(), 1504 - batch.account_removes(), 1505 - batch.latest_cursor().map(|c| c.elapsed()), 1506 - ) 1507 - } 863 + // fn summarize_batch<const LIMIT: usize>(batch: &EventBatch<LIMIT>) -> String { 864 + // format!( 865 + // "batch of {: >3} samples from {: >4} records in {: >2} collections from ~{: >4} DIDs, {} acct removes, cursor {: <12?}", 866 + // batch.total_records(), 867 + // batch.total_seen(), 868 + // batch.total_collections(), 869 + // batch.estimate_dids(), 870 + // batch.account_removes(), 871 + // batch.latest_cursor().map(|c| c.elapsed()), 872 + // ) 873 + // } 1508 874 1509 875 #[cfg(test)] 1510 876 mod tests { 1511 877 use super::*; 1512 - use crate::{DeleteAccount, UFOsCommit}; 878 + use crate::{DeleteAccount, RecordKey, UFOsCommit}; 1513 879 use jetstream::events::{CommitEvent, CommitOp}; 1514 880 use jetstream::exports::Cid; 1515 881 use serde_json::value::RawValue;
+15 -293
ufos/src/store_types.rs
··· 1 1 use crate::db_types::{ 2 - DbBytes, DbConcat, DbEmpty, DbStaticStr, EncodingError, SerdeBytes, StaticStr, UseBincodePlz, 2 + DbBytes, DbConcat, DbStaticStr, EncodingError, SerdeBytes, StaticStr, UseBincodePlz, 3 3 }; 4 4 use crate::{Cursor, Did, Nsid, PutAction, RecordKey, UFOsCommit}; 5 5 use bincode::{Decode, Encode}; ··· 15 15 } 16 16 } 17 17 pub type JetstreamCursorValue = Cursor; 18 - 19 - /// key format: ["mod_cursor"] 20 - #[derive(Debug, PartialEq)] 21 - pub struct ModCursorKey {} 22 - impl StaticStr for ModCursorKey { 23 - fn static_str() -> &'static str { 24 - "mod_cursor" 25 - } 26 - } 27 - pub type ModCursorValue = Cursor; 28 - 29 - /// key format: ["rollup_cursor"] 30 - #[derive(Debug, PartialEq)] 31 - pub struct RollupCursorKey {} 32 - impl StaticStr for RollupCursorKey { 33 - fn static_str() -> &'static str { 34 - "rollup_cursor" 35 - } 36 - } 37 - /// value format: [rollup_cursor(Cursor)|collection(Nsid)] 38 - pub type RollupCursorValue = DbConcat<Cursor, Nsid>; 39 18 40 19 /// key format: ["rollup_cursor"] 41 20 #[derive(Debug, PartialEq)] ··· 340 319 } 341 320 pub type AllTimeRollupVal = CountsValue; 342 321 343 - /////////// old stuff, probably: ///////////// 344 - 345 - #[derive(Debug, Clone, Encode, Decode)] 346 - pub struct SeenCounter(pub u64); 347 - impl SeenCounter { 348 - pub fn new(n: u64) -> Self { 349 - Self(n) 350 - } 351 - } 352 - impl UseBincodePlz for SeenCounter {} 353 - 354 - #[derive(Debug, PartialEq)] 355 - pub struct _ByCollectionStaticStr {} 356 - impl StaticStr for _ByCollectionStaticStr { 357 - fn static_str() -> &'static str { 358 - "by_collection" 359 - } 360 - } 361 - type ByCollectionPrefix = DbStaticStr<_ByCollectionStaticStr>; 362 - /// key format: ["by_collection"|collection|js_cursor] 363 - pub type ByCollectionKey = DbConcat<DbConcat<ByCollectionPrefix, Nsid>, Cursor>; 364 - impl ByCollectionKey { 365 - pub fn new(collection: Nsid, cursor: Cursor) -> Self { 366 - Self { 367 - prefix: DbConcat::from_pair(Default::default(), collection), 368 - suffix: cursor, 369 - } 370 - } 371 - pub fn prefix_from_collection(collection: Nsid) -> Result<Vec<u8>, EncodingError> { 372 - DbConcat::from_pair(ByCollectionPrefix::default(), collection).to_db_bytes() 373 - } 374 - } 375 - impl From<ByCollectionKey> for (Nsid, Cursor) { 376 - fn from(k: ByCollectionKey) -> Self { 377 - (k.prefix.suffix, k.suffix) 378 - } 379 - } 380 - 381 - #[derive(Debug, PartialEq, Encode, Decode)] 382 - pub struct ByCollectionValueInfo { 383 - #[bincode(with_serde)] 384 - pub did: Did, 385 - #[bincode(with_serde)] 386 - pub rkey: RecordKey, 387 - } 388 - impl UseBincodePlz for ByCollectionValueInfo {} 389 - /// value format: contains did, rkey, record 390 - pub type ByCollectionValue = DbConcat<ByCollectionValueInfo, serde_json::Value>; 391 - impl ByCollectionValue { 392 - pub fn new(did: Did, rkey: RecordKey, record: serde_json::Value) -> Self { 393 - Self { 394 - prefix: ByCollectionValueInfo { did, rkey }, 395 - suffix: record, 396 - } 397 - } 398 - } 399 - impl From<ByCollectionValue> for (Did, RecordKey, serde_json::Value) { 400 - fn from(v: ByCollectionValue) -> Self { 401 - (v.prefix.did, v.prefix.rkey, v.suffix) 402 - } 403 - } 404 - 405 - #[derive(Debug, PartialEq)] 406 - pub struct _ByIdStaticStr {} 407 - impl StaticStr for _ByIdStaticStr { 408 - fn static_str() -> &'static str { 409 - "by_id" 410 - } 411 - } 412 - type ByIdStaticPrefix = DbStaticStr<_ByIdStaticStr>; 413 - pub type ByIdDidPrefix = DbConcat<ByIdStaticPrefix, Did>; 414 - pub type ByIdCollectionPrefix = DbConcat<ByIdDidPrefix, Nsid>; 415 - pub type ByIdRecordPrefix = DbConcat<ByIdCollectionPrefix, RecordKey>; 416 - /// look up records by user or directly, instead of by collections 417 - /// 418 - /// required to support deletes; did first prefix for account deletes. 419 - /// key format: ["by_id"|did|collection|rkey|js_cursor] 420 - pub type ByIdKey = DbConcat<ByIdRecordPrefix, Cursor>; 421 - impl ByIdKey { 422 - pub fn new(did: Did, collection: Nsid, rkey: RecordKey, cursor: Cursor) -> Self { 423 - Self::from_pair(Self::record_prefix(did, collection, rkey), cursor) 424 - } 425 - pub fn record_prefix(did: Did, collection: Nsid, rkey: RecordKey) -> ByIdRecordPrefix { 426 - ByIdRecordPrefix { 427 - prefix: ByIdCollectionPrefix { 428 - prefix: Self::did_prefix(did), 429 - suffix: collection, 430 - }, 431 - suffix: rkey, 432 - } 433 - } 434 - pub fn did_prefix(did: Did) -> ByIdDidPrefix { 435 - ByIdDidPrefix::from_pair(Default::default(), did) 436 - } 437 - pub fn cursor(&self) -> Cursor { 438 - self.suffix 439 - } 440 - } 441 - impl From<ByIdKey> for (Did, Nsid, RecordKey, Cursor) { 442 - fn from(k: ByIdKey) -> Self { 443 - ( 444 - k.prefix.prefix.prefix.suffix, 445 - k.prefix.prefix.suffix, 446 - k.prefix.suffix, 447 - k.suffix, 448 - ) 449 - } 450 - } 451 - 452 - pub type ByIdValue = DbEmpty; 453 - 454 - #[derive(Debug, PartialEq)] 455 - pub struct _ByCursorSeenStaticStr {} 456 - impl StaticStr for _ByCursorSeenStaticStr { 457 - fn static_str() -> &'static str { 458 - "seen_by_js_cursor" 459 - } 460 - } 461 - type ByCursorSeenPrefix = DbStaticStr<_ByCursorSeenStaticStr>; 462 - type ByCursorSeenCursorPrefix = DbConcat<ByCursorSeenPrefix, Cursor>; 463 - /// key format: ["seen_by_js_cursor"|js_cursor|collection] 464 - pub type ByCursorSeenKey = DbConcat<ByCursorSeenCursorPrefix, Nsid>; 465 - impl ByCursorSeenKey { 466 - pub fn new(cursor: Cursor, nsid: Nsid) -> Self { 467 - Self { 468 - prefix: DbConcat::from_pair(Default::default(), cursor), 469 - suffix: nsid, 470 - } 471 - } 472 - pub fn full_range() -> Result<Range<Vec<u8>>, EncodingError> { 473 - let prefix = ByCursorSeenCursorPrefix::from_pair(Default::default(), Cursor::from_start()); 474 - prefix.range() 475 - } 476 - pub fn range_from(&self) -> Result<Range<Vec<u8>>, EncodingError> { 477 - let start = self.to_db_bytes()?; 478 - let end = self.prefix.range_end()?; 479 - Ok(start..end) 480 - } 481 - pub fn collection(&self) -> &Nsid { 482 - &self.suffix 483 - } 484 - } 485 - impl From<RollupCursorValue> for ByCursorSeenKey { 486 - fn from(v: RollupCursorValue) -> Self { 487 - Self::new(v.prefix, v.suffix) 488 - } 489 - } 490 - impl From<ByCursorSeenKey> for (Cursor, Nsid) { 491 - fn from(k: ByCursorSeenKey) -> Self { 492 - (k.prefix.suffix, k.suffix) 493 - } 494 - } 495 - 496 - pub type ByCursorSeenValue = SeenCounter; 497 - 498 - #[derive(Debug, PartialEq)] 499 - pub struct _ModQueueItemStaticStr {} 500 - impl StaticStr for _ModQueueItemStaticStr { 501 - fn static_str() -> &'static str { 502 - "mod_queue" 503 - } 504 - } 505 - pub type ModQueueItemPrefix = DbStaticStr<_ModQueueItemStaticStr>; 506 - /// key format: ["mod_queue"|js_cursor] 507 - pub type ModQueueItemKey = DbConcat<ModQueueItemPrefix, Cursor>; 508 - impl ModQueueItemKey { 509 - pub fn new(cursor: Cursor) -> Self { 510 - Self::from_pair(Default::default(), cursor) 511 - } 512 - } 513 - // todo: remove this? all we need is the ModCursorValue version? 514 - impl From<ModQueueItemKey> for Cursor { 515 - fn from(k: ModQueueItemKey) -> Self { 516 - k.suffix 517 - } 518 - } 519 - impl From<&ModQueueItemKey> for ModCursorValue { 520 - fn from(k: &ModQueueItemKey) -> Self { 521 - k.suffix 522 - } 523 - } 524 - 525 - #[derive(Debug, Encode, Decode)] 526 - pub enum ModQueueItemStringValue { 527 - DeleteAccount(String), // did 528 - DeleteRecord(String, String, String), // did, collection, rkey 529 - UpdateRecord(String, String, String, String), // did, collection, rkey, json record 530 - } 531 - impl UseBincodePlz for ModQueueItemStringValue {} 532 - #[derive(Debug, Clone, PartialEq)] 533 - pub enum ModQueueItemValue { 534 - DeleteAccount(Did), 535 - DeleteRecord(Did, Nsid, RecordKey), 536 - UpdateRecord(Did, Nsid, RecordKey, serde_json::Value), 537 - } 538 - impl From<ModQueueItemValue> for ModQueueItemStringValue { 539 - fn from(v: ModQueueItemValue) -> Self { 540 - match v { 541 - ModQueueItemValue::DeleteAccount(did) => { 542 - ModQueueItemStringValue::DeleteAccount(did.to_string()) 543 - } 544 - ModQueueItemValue::DeleteRecord(did, collection, rkey) => { 545 - ModQueueItemStringValue::DeleteRecord( 546 - did.to_string(), 547 - collection.to_string(), 548 - rkey.to_string(), 549 - ) 550 - } 551 - ModQueueItemValue::UpdateRecord(did, collection, rkey, record) => { 552 - ModQueueItemStringValue::UpdateRecord( 553 - did.to_string(), 554 - collection.to_string(), 555 - rkey.to_string(), 556 - record.to_string(), 557 - ) 558 - } 559 - } 560 - } 561 - } 562 - impl TryFrom<ModQueueItemStringValue> for ModQueueItemValue { 563 - type Error = EncodingError; 564 - fn try_from(v: ModQueueItemStringValue) -> Result<Self, Self::Error> { 565 - match v { 566 - ModQueueItemStringValue::DeleteAccount(did) => Ok(ModQueueItemValue::DeleteAccount( 567 - Did::new(did).map_err(EncodingError::BadAtriumStringType)?, 568 - )), 569 - ModQueueItemStringValue::DeleteRecord(did, collection, rkey) => { 570 - Ok(ModQueueItemValue::DeleteRecord( 571 - Did::new(did).map_err(EncodingError::BadAtriumStringType)?, 572 - Nsid::new(collection).map_err(EncodingError::BadAtriumStringType)?, 573 - RecordKey::new(rkey).map_err(EncodingError::BadAtriumStringType)?, 574 - )) 575 - } 576 - ModQueueItemStringValue::UpdateRecord(did, collection, rkey, record) => { 577 - Ok(ModQueueItemValue::UpdateRecord( 578 - Did::new(did).map_err(EncodingError::BadAtriumStringType)?, 579 - Nsid::new(collection).map_err(EncodingError::BadAtriumStringType)?, 580 - RecordKey::new(rkey).map_err(EncodingError::BadAtriumStringType)?, 581 - record.parse()?, 582 - )) 583 - } 584 - } 585 - } 586 - } 587 - impl DbBytes for ModQueueItemValue { 588 - fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> { 589 - Into::<ModQueueItemStringValue>::into(self.clone()).to_db_bytes() 590 - } 591 - fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 592 - let (stringy, n) = ModQueueItemStringValue::from_db_bytes(bytes)?; 593 - let me = TryInto::<ModQueueItemValue>::try_into(stringy)?; 594 - Ok((me, n)) 595 - } 596 - } 597 - 598 322 #[derive(Debug, Copy, Clone, PartialEq, Hash, PartialOrd, Eq)] 599 323 pub struct TruncatedCursor<const MOD: u64>(u64); 600 324 impl<const MOD: u64> TruncatedCursor<MOD> { ··· 648 372 #[cfg(test)] 649 373 mod test { 650 374 use super::{ 651 - ByCollectionKey, ByCollectionValue, Cursor, Did, EncodingError, HourTruncatedCursor, Nsid, 652 - RecordKey, HOUR_IN_MICROS, 375 + CardinalityEstimator, CountsValue, Cursor, Did, EncodingError, HourTruncatedCursor, 376 + HourlyRollupKey, Nsid, HOUR_IN_MICROS, 653 377 }; 654 378 use crate::db_types::DbBytes; 655 379 656 380 #[test] 657 - fn test_by_collection_key() -> Result<(), EncodingError> { 381 + fn test_by_hourly_rollup_key() -> Result<(), EncodingError> { 658 382 let nsid = Nsid::new("ab.cd.efg".to_string()).unwrap(); 659 - let original = ByCollectionKey::new(nsid.clone(), Cursor::from_raw_u64(456)); 383 + let original = HourlyRollupKey::new(Cursor::from_raw_u64(4567890).into(), &nsid); 660 384 let serialized = original.to_db_bytes()?; 661 - let (restored, bytes_consumed) = ByCollectionKey::from_db_bytes(&serialized)?; 385 + let (restored, bytes_consumed) = HourlyRollupKey::from_db_bytes(&serialized)?; 662 386 assert_eq!(restored, original); 663 387 assert_eq!(bytes_consumed, serialized.len()); 664 388 665 389 let serialized_prefix = original.to_prefix_db_bytes()?; 666 - assert!(serialized.starts_with(&serialized_prefix)); 667 - let just_prefix = ByCollectionKey::prefix_from_collection(nsid)?; 668 - assert_eq!(just_prefix, serialized_prefix); 669 - assert!(just_prefix.starts_with("by_collection".as_bytes())); 390 + assert!(serialized_prefix.starts_with("hourly_counts".as_bytes())); 391 + assert!(serialized_prefix.starts_with(&serialized_prefix)); 670 392 671 393 Ok(()) 672 394 } 673 395 674 396 #[test] 675 - fn test_by_collection_value() -> Result<(), EncodingError> { 676 - let did = Did::new("did:plc:inze6wrmsm7pjl7yta3oig77".to_string()).unwrap(); 677 - let rkey = RecordKey::new("asdfasdf".to_string()).unwrap(); 678 - let record = serde_json::Value::String("hellooooo".into()); 679 - 680 - let original = ByCollectionValue::new(did, rkey, record); 397 + fn test_by_hourly_rollup_value() -> Result<(), EncodingError> { 398 + let mut estimator = CardinalityEstimator::new(); 399 + for i in 0..10 { 400 + estimator.insert(&Did::new(format!("did:plc:inze6wrmsm7pjl7yta3oig7{i}")).unwrap()); 401 + } 402 + let original = CountsValue::new(123, estimator); 681 403 let serialized = original.to_db_bytes()?; 682 - let (restored, bytes_consumed) = ByCollectionValue::from_db_bytes(&serialized)?; 404 + let (restored, bytes_consumed) = CountsValue::from_db_bytes(&serialized)?; 683 405 assert_eq!(restored, original); 684 406 assert_eq!(bytes_consumed, serialized.len()); 685 407