Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

externally serializing reads did not help!

what!

+30 -22
+30 -22
ufos/src/store.rs
··· 1 - // use std::sync::{Arc, Mutex}; // BLOCKING mutex -- only in spawn_blocking tasks! 1 + use std::sync::Arc; 2 2 use crate::db_types::{db_complete, DbBytes, DbStaticStr, EncodingError, StaticStr}; 3 3 use crate::store_types::{ 4 4 ByCollectionKey, ByCollectionValue, ByCursorSeenKey, ByCursorSeenValue, ByIdKey, ByIdValue, ··· 16 16 use std::collections::HashMap; 17 17 use std::path::{Path, PathBuf}; 18 18 use std::time::{Duration, Instant}; 19 - use tokio::{sync::mpsc::Receiver, time::sleep}; 19 + use tokio::time::sleep; 20 + use tokio::sync::{mpsc::Receiver, Mutex}; 20 21 21 22 /// Commit the RW batch immediately if this nubmer of events have been read off the mod queue 22 23 const MAX_BATCHED_RW_EVENTS: usize = 3; ··· 30 31 /// this is higher than [MAX_BATCHED_RW_EVENTS] because account-deletes can have lots of items 31 32 const MAX_BATCHED_RW_ITEMS: usize = 36; 32 33 34 + 35 + struct SerialDb { 36 + keyspace: Keyspace, 37 + partition: PartitionHandle, 38 + } 39 + 33 40 /** 34 41 * data format, roughly: 35 42 * ··· 57 64 **/ 58 65 #[derive(Clone)] 59 66 pub struct Storage { 60 - keyspace: Keyspace, 61 - partition: PartitionHandle, 62 - // write_lock: Arc<Mutex<()>>, 67 + /// horrible: gate all db access behind this to force global serialization to avoid deadlock 68 + db: Arc<Mutex<SerialDb>>, 63 69 } 64 70 65 71 impl Storage { ··· 70 76 PartitionCreateOptions::default().compression(CompressionType::None), 71 77 )?; 72 78 Ok(Self { 73 - keyspace, 74 - partition, 75 - // write_lock: Arc::new(Mutex::new(())), 79 + db: Arc::new(Mutex::new(SerialDb { keyspace, partition })), 76 80 }) 77 81 } 78 82 ··· 121 125 122 126 let last = event_batch.last_jetstream_cursor.clone(); // TODO: get this from the data. track last in consumer. compute or track first. 123 127 124 - let keyspace = self.keyspace.clone(); 125 - let partition = self.partition.clone(); 126 - // let write_lock = self.write_lock.clone(); 128 + let db = self.db.lock().await; 129 + let keyspace = db.keyspace.clone(); 130 + let partition = db.partition.clone(); 127 131 128 132 let writer_t0 = Instant::now(); 129 133 log::trace!("spawn_blocking for write batch"); ··· 131 135 DBWriter { 132 136 keyspace, 133 137 partition, 134 - // write_lock, 135 138 } 136 139 .write_batch(event_batch, last) 137 140 }) 138 141 .await??; 139 142 log::trace!("write: back from blocking task, successfully wrote batch"); 140 143 let wrote_for = writer_t0.elapsed(); 144 + drop(db); 141 145 142 146 println!("{batch_summary}, slept {slept_for: <12?}, wrote {wrote_for: <11?}, queue: {queue_size}"); 143 147 } else { ··· 152 156 // TODO: lock so that only one rw loop can possibly be run. or even better, take a mutable resource thing to enforce at compile time. 153 157 loop { 154 158 sleep(Duration::from_secs_f64(0.001)).await; // todo: interval rate-limit instead 155 - let keyspace = self.keyspace.clone(); 156 - let partition = self.partition.clone(); 159 + 160 + let db = self.db.lock().await; 161 + let keyspace = db.keyspace.clone(); 162 + let partition = db.partition.clone(); 163 + 157 164 log::trace!("rw: spawn blocking for batch..."); 158 165 tokio::task::spawn_blocking(move || -> anyhow::Result<()> { 159 166 log::trace!("rw: getting rw cursor..."); ··· 222 229 collection: &Nsid, 223 230 limit: usize, 224 231 ) -> anyhow::Result<Vec<CreateRecord>> { 225 - let partition = self.partition.clone(); 232 + let partition = self.db.lock().await.partition.clone(); 226 233 let prefix = ByCollectionKey::prefix_from_collection(collection.clone())?; 227 234 tokio::task::spawn_blocking(move || { 228 235 let mut output = Vec::new(); ··· 243 250 } 244 251 245 252 pub async fn get_meta_info(&self) -> anyhow::Result<StorageInfo> { 246 - let keyspace = self.keyspace.clone(); 247 - let partition = self.partition.clone(); 253 + let db = self.db.lock().await; 254 + let keyspace = db.keyspace.clone(); 255 + let partition = db.partition.clone(); 248 256 tokio::task::spawn_blocking(move || { 249 257 Ok(StorageInfo { 250 258 keyspace_disk_space: keyspace.disk_space(), ··· 257 265 } 258 266 259 267 pub async fn get_collection_total_seen(&self, collection: &Nsid) -> anyhow::Result<u64> { 260 - let partition = self.partition.clone(); 268 + let partition = self.db.lock().await.partition.clone(); 261 269 let collection = collection.clone(); 262 270 tokio::task::spawn_blocking(move || get_unrolled_asdf(&partition, collection)).await? 263 271 } 264 272 265 273 pub async fn get_jetstream_endpoint(&self) -> anyhow::Result<Option<JetstreamEndpointValue>> { 266 - let partition = self.partition.clone(); 274 + let partition = self.db.lock().await.partition.clone(); 267 275 tokio::task::spawn_blocking(move || { 268 276 get_static::<JetstreamEndpointKey, JetstreamEndpointValue>(&partition) 269 277 }) ··· 271 279 } 272 280 273 281 async fn set_jetstream_endpoint(&self, endpoint: &str) -> anyhow::Result<()> { 274 - let partition = self.partition.clone(); 282 + let partition = self.db.lock().await.partition.clone(); 275 283 let endpoint = endpoint.to_string(); 276 284 tokio::task::spawn_blocking(move || { 277 285 insert_static::<JetstreamEndpointKey>(&partition, JetstreamEndpointValue(endpoint)) ··· 280 288 } 281 289 282 290 pub async fn get_jetstream_cursor(&self) -> anyhow::Result<Option<Cursor>> { 283 - let partition = self.partition.clone(); 291 + let partition = self.db.lock().await.partition.clone(); 284 292 tokio::task::spawn_blocking(move || { 285 293 get_static::<JetstreamCursorKey, JetstreamCursorValue>(&partition) 286 294 }) ··· 288 296 } 289 297 290 298 pub async fn get_mod_cursor(&self) -> anyhow::Result<Option<Cursor>> { 291 - let partition = self.partition.clone(); 299 + let partition = self.db.lock().await.partition.clone(); 292 300 tokio::task::spawn_blocking(move || get_static::<ModCursorKey, ModCursorValue>(&partition)) 293 301 .await? 294 302 }