Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

async storage init + jetstream and cursor meta

+178 -47
+1 -1
ufos/src/main.rs
··· 37 37 38 38 let args = Args::parse(); 39 39 let (storage, cursor) = 40 - store::Storage::open(&args.data, &args.jetstream, args.jetstream_force)?; 40 + store::Storage::open(args.data, &args.jetstream, args.jetstream_force).await?; 41 41 42 42 println!("starting consumer with cursor: {cursor:?}"); 43 43 let batches = consumer::consume(&args.jetstream, cursor, args.jetstream_no_zstd).await?;
+33 -2
ufos/src/server.rs
··· 35 35 #[derive(Debug, Serialize, JsonSchema)] 36 36 struct MetaInfo { 37 37 storage_info: StorageInfo, 38 + jetstream_endpoint: Option<String>, 39 + jetstream_cursor: Option<u64>, 40 + mod_cursor: Option<u64>, 38 41 } 39 42 /// Get meta information about UFOs itself 40 43 #[endpoint { ··· 45 48 ctx: RequestContext<Context>, 46 49 ) -> Result<HttpResponseOk<MetaInfo>, HttpError> { 47 50 let Context { storage, .. } = ctx.context(); 51 + 52 + let failed_to_get = 53 + |what| move |e| HttpError::for_internal_error(format!("failed to get {what}: {e:?}")); 54 + 48 55 let storage_info = storage 49 56 .get_meta_info() 50 57 .await 51 - .map_err(|e| HttpError::for_internal_error(format!("failed to get meta info: {e}")))?; 52 - Ok(HttpResponseOk(MetaInfo { storage_info })) 58 + .map_err(failed_to_get("meta info"))?; 59 + 60 + let jetstream_endpoint = storage 61 + .get_jetstream_endpoint() 62 + .await 63 + .map_err(failed_to_get("jetstream endpoint"))? 64 + .map(|v| v.0); 65 + 66 + let jetstream_cursor = storage 67 + .get_jetstream_cursor() 68 + .await 69 + .map_err(failed_to_get("jetstream cursor"))? 70 + .map(|c| c.to_raw_u64()); 71 + 72 + let mod_cursor = storage 73 + .get_mod_cursor() 74 + .await 75 + .map_err(failed_to_get("jetstream cursor"))? 76 + .map(|c| c.to_raw_u64()); 77 + 78 + Ok(HttpResponseOk(MetaInfo { 79 + storage_info, 80 + jetstream_endpoint, 81 + jetstream_cursor, 82 + mod_cursor, 83 + })) 53 84 } 54 85 55 86 #[derive(Debug, Deserialize, JsonSchema)]
+100 -44
ufos/src/store.rs
··· 1 - use crate::db_types::{db_complete, DbBytes}; 1 + use crate::db_types::{db_complete, DbBytes, DbStaticStr, StaticStr}; 2 2 use crate::store_types::{ 3 3 ByCollectionKey, ByCollectionValue, ByCursorSeenKey, ByCursorSeenValue, ByIdKey, ByIdValue, 4 - ModQueueItemKey, ModQueueItemValue, 4 + JetstreamCursorKey, JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue, 5 + ModCursorKey, ModCursorValue, ModQueueItemKey, ModQueueItemValue, 5 6 }; 6 7 use crate::{CollectionSamples, CreateRecord, DeleteAccount, EventBatch, ModifyRecord, Nsid}; 7 8 use fjall::{ 8 - Batch as FjallBatch, CompressionType, Config, Keyspace, PartitionCreateOptions, 9 - PartitionHandle, Slice, 9 + Batch as FjallBatch, CompressionType, Config, Keyspace, PartitionCreateOptions, PartitionHandle, 10 10 }; 11 11 use jetstream::events::Cursor; 12 12 use std::collections::HashMap; 13 - use std::path::Path; 13 + use std::path::{Path, PathBuf}; 14 14 use std::time::{Duration, Instant}; 15 15 use tokio::{sync::mpsc::Receiver, time::sleep}; 16 16 ··· 46 46 } 47 47 48 48 impl Storage { 49 - pub fn open( 50 - path: impl AsRef<Path>, 51 - endpoint: &str, 52 - force_endpoint: bool, 53 - ) -> anyhow::Result<(Self, Option<Cursor>)> { 54 - // TODO: make this async? or should the caller remember that storage is sync? 49 + fn init_self(path: impl AsRef<Path>) -> anyhow::Result<Self> { 55 50 let keyspace = Config::new(path).fsync_ms(Some(4_000)).open()?; 56 - 57 51 let partition = keyspace.open_partition( 58 52 "default", 59 53 PartitionCreateOptions::default().compression(CompressionType::None), 60 54 )?; 55 + Ok(Self { 56 + keyspace, 57 + partition, 58 + }) 59 + } 61 60 62 - let js_cursor = partition.get("js_cursor")?.map(cursor_from_slice); 61 + pub async fn open( 62 + path: PathBuf, 63 + endpoint: &str, 64 + force_endpoint: bool, 65 + ) -> anyhow::Result<(Self, Option<Cursor>)> { 66 + let me = tokio::task::spawn_blocking(move || Storage::init_self(path)).await??; 67 + 68 + let js_cursor = me.get_jetstream_cursor().await?; 63 69 64 70 if js_cursor.is_some() { 65 - let Some(endpoint_bytes) = partition.get("js_endpoint")? else { 71 + let Some(JetstreamEndpointValue(stored)) = me.get_jetstream_endpoint().await? else { 66 72 anyhow::bail!("found cursor but missing js_endpoint, refusing to start."); 67 73 }; 68 - let stored = std::str::from_utf8(endpoint_bytes.as_ref())?; 69 74 if stored != endpoint { 70 75 if force_endpoint { 71 76 log::warn!("forcing a jetstream switch from {stored:?} to {endpoint:?}"); 77 + me.set_jetstream_endpoint(endpoint).await?; 72 78 } else { 73 79 anyhow::bail!("stored js_endpoint {stored:?} differs from provided {endpoint:?}, refusing to start."); 74 80 } 75 81 } 76 82 } else { 77 - partition.insert("js_endpoint", endpoint.as_bytes())?; 83 + me.set_jetstream_endpoint(endpoint).await?; 78 84 } 79 85 80 - Ok(( 81 - Self { 82 - keyspace, 83 - partition, 84 - }, 85 - js_cursor, 86 - )) 86 + Ok((me, js_cursor)) 87 87 } 88 88 89 89 pub async fn receive(&self, mut receiver: Receiver<EventBatch>) -> anyhow::Result<()> { ··· 144 144 .await? 145 145 } 146 146 147 - pub async fn get_meta_info(&self) -> Result<StorageInfo, tokio::task::JoinError> { 147 + pub async fn get_meta_info(&self) -> anyhow::Result<StorageInfo> { 148 148 let keyspace = self.keyspace.clone(); 149 149 let partition = self.partition.clone(); 150 150 tokio::task::spawn_blocking(move || { ··· 157 157 }) 158 158 .await? 159 159 } 160 + 161 + pub async fn get_jetstream_endpoint(&self) -> anyhow::Result<Option<JetstreamEndpointValue>> { 162 + let partition = self.partition.clone(); 163 + tokio::task::spawn_blocking(move || { 164 + get_static::<JetstreamEndpointKey, JetstreamEndpointValue>(&partition) 165 + }) 166 + .await? 167 + } 168 + 169 + async fn set_jetstream_endpoint(&self, endpoint: &str) -> anyhow::Result<()> { 170 + let partition = self.partition.clone(); 171 + let endpoint = endpoint.to_string(); 172 + tokio::task::spawn_blocking(move || { 173 + insert_static::<JetstreamEndpointKey>(&partition, JetstreamEndpointValue(endpoint)) 174 + }) 175 + .await? 176 + } 177 + 178 + pub async fn get_jetstream_cursor(&self) -> anyhow::Result<Option<Cursor>> { 179 + let partition = self.partition.clone(); 180 + tokio::task::spawn_blocking(move || { 181 + get_static::<JetstreamCursorKey, JetstreamCursorValue>(&partition) 182 + }) 183 + .await? 184 + } 185 + 186 + pub async fn get_mod_cursor(&self) -> anyhow::Result<Option<Cursor>> { 187 + let partition = self.partition.clone(); 188 + tokio::task::spawn_blocking(move || get_static::<ModCursorKey, ModCursorValue>(&partition)) 189 + .await? 190 + } 160 191 } 161 192 162 - #[derive(Debug, serde::Serialize, schemars::JsonSchema)] 163 - pub struct StorageInfo { 164 - pub keyspace_disk_space: u64, 165 - pub keyspace_journal_count: usize, 166 - pub keyspace_sequence: u64, 167 - pub partition_approximate_len: usize, 193 + /// Get a value from a fixed key 194 + fn get_static<K: StaticStr, V: DbBytes>(partition: &PartitionHandle) -> anyhow::Result<Option<V>> { 195 + let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 196 + let value = partition 197 + .get(&key_bytes)? 198 + .map(|value_bytes| db_complete(&value_bytes)) 199 + .transpose()?; 200 + Ok(value) 201 + } 202 + 203 + /// Set a value to a fixed key 204 + fn insert_static<K: StaticStr>( 205 + partition: &PartitionHandle, 206 + value: impl DbBytes, 207 + ) -> anyhow::Result<()> { 208 + let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 209 + let value_bytes = value.to_db_bytes()?; 210 + partition.insert(&key_bytes, &value_bytes)?; 211 + Ok(()) 168 212 } 169 213 170 - struct BatchWriter { 171 - keyspace: Keyspace, 172 - partition: PartitionHandle, 214 + /// Set a value to a fixed key 215 + fn insert_batch_static<K: StaticStr>( 216 + batch: &mut FjallBatch, 217 + partition: &PartitionHandle, 218 + value: impl DbBytes, 219 + ) -> anyhow::Result<()> { 220 + let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 221 + let value_bytes = value.to_db_bytes()?; 222 + batch.insert(partition, &key_bytes, &value_bytes); 223 + Ok(()) 173 224 } 174 225 175 226 impl BatchWriter { ··· 179 230 self.add_record_modifies(&mut db_batch, event_batch.record_modifies)?; 180 231 self.add_account_removes(&mut db_batch, event_batch.account_removes)?; 181 232 if let Some(cursor) = last { 182 - db_batch.insert(&self.partition, "js_cursor", cursor_to_slice(cursor)); 233 + insert_batch_static::<JetstreamCursorKey>(&mut db_batch, &self.partition, cursor)?; 183 234 } 184 235 db_batch.commit()?; 185 236 Ok(()) ··· 277 328 } 278 329 } 279 330 331 + #[derive(Debug, serde::Serialize, schemars::JsonSchema)] 332 + pub struct StorageInfo { 333 + pub keyspace_disk_space: u64, 334 + pub keyspace_journal_count: usize, 335 + pub keyspace_sequence: u64, 336 + pub partition_approximate_len: usize, 337 + } 338 + 339 + struct BatchWriter { 340 + keyspace: Keyspace, 341 + partition: PartitionHandle, 342 + } 343 + 344 + ////////// temp stuff to remove: 345 + 280 346 fn summarize_batch(batch: &EventBatch) -> String { 281 347 let EventBatch { 282 348 record_creates, ··· 295 361 last_jetstream_cursor.clone().map(|c| c.elapsed()) 296 362 ) 297 363 } 298 - 299 - fn cursor_to_slice(cursor: Cursor) -> [u8; 8] { 300 - cursor.to_raw_u64().to_be_bytes() 301 - } 302 - fn cursor_from_slice(bytes: Slice) -> Cursor { 303 - let mut buf = [0u8; 8]; 304 - let len = 8.min(bytes.len()); 305 - buf[..len].copy_from_slice(&bytes[..len]); 306 - Cursor::from_raw_u64(u64::from_be_bytes(buf)) 307 - }
+44
ufos/src/store_types.rs
··· 4 4 use crate::{Cursor, Did, Nsid, RecordKey}; 5 5 use bincode::{Decode, Encode}; 6 6 7 + /// key format: ["js_cursor"] 8 + #[derive(Debug, PartialEq)] 9 + pub struct JetstreamCursorKey {} 10 + impl StaticStr for JetstreamCursorKey { 11 + fn static_str() -> &'static str { 12 + "js_cursor" 13 + } 14 + } 15 + pub type JetstreamCursorValue = Cursor; 16 + 17 + /// key format: ["mod_cursor"] 18 + #[derive(Debug, PartialEq)] 19 + pub struct ModCursorKey {} 20 + impl StaticStr for ModCursorKey { 21 + fn static_str() -> &'static str { 22 + "mod_cursor" 23 + } 24 + } 25 + pub type ModCursorValue = Cursor; 26 + 27 + /// key format: ["js_endpoint"] 28 + #[derive(Debug, PartialEq)] 29 + pub struct JetstreamEndpointKey {} 30 + impl StaticStr for JetstreamEndpointKey { 31 + fn static_str() -> &'static str { 32 + "js_endpoint" 33 + } 34 + } 35 + #[derive(Debug, PartialEq)] 36 + pub struct JetstreamEndpointValue(pub String); 37 + /// String wrapper for jetstream endpoint value 38 + /// 39 + /// Warning: this is a non-terminating byte representation of a string: it cannot be used in prefix position of DbConcat 40 + impl DbBytes for JetstreamEndpointValue { 41 + // TODO: maybe make a helper type in db_types 42 + fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> { 43 + Ok(self.0.as_bytes().to_vec()) 44 + } 45 + fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 46 + let s = std::str::from_utf8(bytes)?.to_string(); 47 + Ok((Self(s), bytes.len())) 48 + } 49 + } 50 + 7 51 #[derive(Debug, Clone, Encode, Decode)] 8 52 pub struct SeenCounter(pub u64); 9 53 impl SeenCounter {