Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

first attempt to re-adapt for server

laksdjflkasjdflkajsdflkjasldfkja

can't use generics in server context for dropshot so everything is pain

- we can use a dyn thing to get the store reader into the context
- but then we can't be clone, so
- we cannot spawn_blocking to wrap the db calls because
- dropshot only gives us a borrow of the context and
- tokio spawn_blocking needs 'static

we *can* use block_in_place, but this doesn't feel particularly good.

we could hard-code the server to use the fjall store, but this feels worse.

it might be possible to use async-trait and push the spawning down to the store internals, who can clone the necessary things to become 'static i think? will try this next.

+140 -154
+10 -9
ufos/src/main.rs
··· 2 2 use std::path::PathBuf; 3 3 use ufos::consumer; 4 4 use ufos::error::StorageError; 5 + use ufos::server; 5 6 use ufos::storage::{StorageWhatever, StoreWriter}; 6 7 use ufos::storage_fjall::FjallStorage; 7 8 ··· 46 47 47 48 let args = Args::parse(); 48 49 let jetstream = args.jetstream.clone(); 49 - let (_read_store, mut write_store, cursor) = FjallStorage::init( 50 + let (read_store, mut write_store, cursor) = FjallStorage::init( 50 51 args.data, 51 52 jetstream, 52 53 args.jetstream_force, 53 54 Default::default(), 54 55 )?; 55 56 56 - // println!("starting server with storage..."); 57 - // let serving = server::serve(storage.clone()); 57 + println!("starting server with storage..."); 58 + let serving = server::serve(read_store); 58 59 59 - // let t1 = tokio::task::spawn(async { 60 - // let r = serving.await; 61 - // log::warn!("serving ended with: {r:?}"); 62 - // }); 60 + let t1 = tokio::task::spawn(async { 61 + let r = serving.await; 62 + log::warn!("serving ended with: {r:?}"); 63 + }); 63 64 64 65 let t2: tokio::task::JoinHandle<anyhow::Result<()>> = tokio::task::spawn({ 65 66 async move { ··· 105 106 // }; 106 107 107 108 // log::trace!("tasks running. waiting."); 108 - // t1.await?; 109 - // log::trace!("serve task ended."); 109 + t1.await?; 110 + log::trace!("serve task ended."); 110 111 t2.await??; 111 112 log::trace!("storage receive task ended."); 112 113 // t3.await?;
+118 -126
ufos/src/server.rs
··· 1 - use crate::storage_fjall::{Storage, StorageInfo}; 2 - use crate::Nsid; 1 + use crate::storage::{StoreReader}; 2 + use crate::{Nsid, ConsumerInfo}; 3 3 use dropshot::endpoint; 4 4 use dropshot::ApiDescription; 5 5 use dropshot::ConfigDropshot; ··· 15 15 use serde::{Deserialize, Serialize}; 16 16 use std::collections::HashMap; 17 17 use std::sync::Arc; 18 + use tokio::task::block_in_place; 18 19 19 - #[derive(Clone)] 20 20 struct Context { 21 21 pub spec: Arc<serde_json::Value>, 22 - storage: Storage, 22 + storage: Box<dyn StoreReader>, 23 23 } 24 24 25 25 /// Meta: get the openapi spec for this api ··· 32 32 ok_cors(spec) 33 33 } 34 34 35 + 35 36 #[derive(Debug, Serialize, JsonSchema)] 36 37 struct MetaInfo { 37 - storage_info: StorageInfo, 38 - jetstream_endpoint: Option<String>, 39 - jetstream_cursor: Option<u64>, 38 + storage: serde_json::Value, 39 + consumer: ConsumerInfo, 40 40 } 41 41 /// Get meta information about UFOs itself 42 42 #[endpoint { ··· 45 45 }] 46 46 async fn get_meta_info(ctx: RequestContext<Context>) -> OkCorsResponse<MetaInfo> { 47 47 let Context { storage, .. } = ctx.context(); 48 - 49 48 let failed_to_get = 50 49 |what| move |e| HttpError::for_internal_error(format!("failed to get {what}: {e:?}")); 51 50 52 - let storage_info = storage 53 - .get_meta_info() 54 - .await 55 - .map_err(failed_to_get("meta info"))?; 51 + let storage_info = block_in_place(|| storage.get_storage_stats()) 52 + .map_err(failed_to_get("storage info"))?; 56 53 57 - let jetstream_endpoint = storage 58 - .get_jetstream_endpoint() 59 - .await 60 - .map_err(failed_to_get("jetstream endpoint"))? 61 - .map(|v| v.0); 62 - 63 - let jetstream_cursor = storage 64 - .get_jetstream_cursor() 65 - .await 66 - .map_err(failed_to_get("jetstream cursor"))? 67 - .map(|c| c.to_raw_u64()); 54 + let consumer = block_in_place(|| storage.get_consumer_info()) 55 + .map_err(failed_to_get("consumer info"))?; 68 56 69 57 ok_cors(MetaInfo { 70 - storage_info, 71 - jetstream_endpoint, 72 - jetstream_cursor, 58 + storage: storage_info, 59 + consumer, 73 60 }) 74 61 } 75 62 76 - #[derive(Debug, Deserialize, JsonSchema)] 77 - struct CollectionsQuery { 78 - collection: String, // JsonSchema not implemented for Nsid :( 79 - } 80 - impl CollectionsQuery { 81 - fn to_multiple_nsids(&self) -> Result<Vec<Nsid>, String> { 82 - let mut out = Vec::with_capacity(self.collection.len()); 83 - for collection in self.collection.split(',') { 84 - let Ok(nsid) = Nsid::new(collection.to_string()) else { 85 - return Err(format!("collection {collection:?} was not a valid NSID")); 86 - }; 87 - out.push(nsid); 88 - } 89 - Ok(out) 90 - } 91 - } 92 - // #[derive(Debug, Serialize, JsonSchema)] 93 - // struct ApiRecord { 94 - // did: String, 95 - // collection: String, 96 - // rkey: String, 97 - // record: serde_json::Value, 98 - // time_us: u64, 63 + // #[derive(Debug, Deserialize, JsonSchema)] 64 + // struct CollectionsQuery { 65 + // collection: String, // JsonSchema not implemented for Nsid :( 99 66 // } 100 - // impl ApiRecord { 101 - // fn from_create_record(create_record: CreateRecord, collection: &Nsid) -> Self { 102 - // let CreateRecord { 103 - // did, 104 - // rkey, 105 - // record, 106 - // cursor, 107 - // } = create_record; 108 - // Self { 109 - // did: did.to_string(), 110 - // collection: collection.to_string(), 111 - // rkey: rkey.to_string(), 112 - // record, 113 - // time_us: cursor.to_raw_u64(), 67 + // impl CollectionsQuery { 68 + // fn to_multiple_nsids(&self) -> Result<Vec<Nsid>, String> { 69 + // let mut out = Vec::with_capacity(self.collection.len()); 70 + // for collection in self.collection.split(',') { 71 + // let Ok(nsid) = Nsid::new(collection.to_string()) else { 72 + // return Err(format!("collection {collection:?} was not a valid NSID")); 73 + // }; 74 + // out.push(nsid); 114 75 // } 76 + // Ok(out) 115 77 // } 116 78 // } 117 - // /// Get recent records by collection 118 - // /// 119 - // /// Multiple collections are supported. they will be delivered in one big array with no 120 - // /// specified order. 79 + // // #[derive(Debug, Serialize, JsonSchema)] 80 + // // struct ApiRecord { 81 + // // did: String, 82 + // // collection: String, 83 + // // rkey: String, 84 + // // record: serde_json::Value, 85 + // // time_us: u64, 86 + // // } 87 + // // impl ApiRecord { 88 + // // fn from_create_record(create_record: CreateRecord, collection: &Nsid) -> Self { 89 + // // let CreateRecord { 90 + // // did, 91 + // // rkey, 92 + // // record, 93 + // // cursor, 94 + // // } = create_record; 95 + // // Self { 96 + // // did: did.to_string(), 97 + // // collection: collection.to_string(), 98 + // // rkey: rkey.to_string(), 99 + // // record, 100 + // // time_us: cursor.to_raw_u64(), 101 + // // } 102 + // // } 103 + // // } 104 + // // /// Get recent records by collection 105 + // // /// 106 + // // /// Multiple collections are supported. they will be delivered in one big array with no 107 + // // /// specified order. 108 + // // #[endpoint { 109 + // // method = GET, 110 + // // path = "/records", 111 + // // }] 112 + // // async fn get_records_by_collection( 113 + // // ctx: RequestContext<Context>, 114 + // // collection_query: Query<CollectionsQuery>, 115 + // // ) -> OkCorsResponse<Vec<ApiRecord>> { 116 + // // let Context { storage, .. } = ctx.context(); 117 + 118 + // // let collections = collection_query 119 + // // .into_inner() 120 + // // .to_multiple_nsids() 121 + // // .map_err(|reason| HttpError::for_bad_request(None, reason))?; 122 + 123 + // // let mut api_records = Vec::new(); 124 + 125 + // // // TODO: set up multiple db iterators and iterate them together with merge sort 126 + // // for collection in &collections { 127 + // // let records = storage 128 + // // .get_collection_records(collection, 100) 129 + // // .await 130 + // // .map_err(|e| HttpError::for_internal_error(e.to_string()))?; 131 + 132 + // // for record in records { 133 + // // let api_record = ApiRecord::from_create_record(record, collection); 134 + // // api_records.push(api_record); 135 + // // } 136 + // // } 137 + 138 + // // ok_cors(api_records) 139 + // // } 140 + 141 + // /// Get total records seen by collection 121 142 // #[endpoint { 122 143 // method = GET, 123 - // path = "/records", 144 + // path = "/records/total-seen" 124 145 // }] 125 - // async fn get_records_by_collection( 146 + // async fn get_records_total_seen( 126 147 // ctx: RequestContext<Context>, 127 148 // collection_query: Query<CollectionsQuery>, 128 - // ) -> OkCorsResponse<Vec<ApiRecord>> { 149 + // ) -> OkCorsResponse<HashMap<String, u64>> { 129 150 // let Context { storage, .. } = ctx.context(); 130 151 131 152 // let collections = collection_query ··· 133 154 // .to_multiple_nsids() 134 155 // .map_err(|reason| HttpError::for_bad_request(None, reason))?; 135 156 136 - // let mut api_records = Vec::new(); 157 + // let mut seen_by_collection = HashMap::with_capacity(collections.len()); 137 158 138 - // // TODO: set up multiple db iterators and iterate them together with merge sort 139 159 // for collection in &collections { 140 - // let records = storage 141 - // .get_collection_records(collection, 100) 160 + // let total = storage 161 + // .get_collection_total_seen(collection) 142 162 // .await 143 - // .map_err(|e| HttpError::for_internal_error(e.to_string()))?; 163 + // .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?; 144 164 145 - // for record in records { 146 - // let api_record = ApiRecord::from_create_record(record, collection); 147 - // api_records.push(api_record); 148 - // } 165 + // seen_by_collection.insert(collection.to_string(), total); 149 166 // } 150 167 151 - // ok_cors(api_records) 168 + // ok_cors(seen_by_collection) 152 169 // } 153 170 154 - /// Get total records seen by collection 155 - #[endpoint { 156 - method = GET, 157 - path = "/records/total-seen" 158 - }] 159 - async fn get_records_total_seen( 160 - ctx: RequestContext<Context>, 161 - collection_query: Query<CollectionsQuery>, 162 - ) -> OkCorsResponse<HashMap<String, u64>> { 163 - let Context { storage, .. } = ctx.context(); 171 + // /// Get top collections 172 + // #[endpoint { 173 + // method = GET, 174 + // path = "/collections" 175 + // }] 176 + // async fn get_top_collections(ctx: RequestContext<Context>) -> OkCorsResponse<HashMap<String, u64>> { 177 + // let Context { storage, .. } = ctx.context(); 178 + // let collections = storage 179 + // .get_top_collections() 180 + // .await 181 + // .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?; 164 182 165 - let collections = collection_query 166 - .into_inner() 167 - .to_multiple_nsids() 168 - .map_err(|reason| HttpError::for_bad_request(None, reason))?; 183 + // ok_cors(collections) 184 + // } 169 185 170 - let mut seen_by_collection = HashMap::with_capacity(collections.len()); 186 + // impl Context { 187 + // fn what() -> () {} 188 + // } 171 189 172 - for collection in &collections { 173 - let total = storage 174 - .get_collection_total_seen(collection) 175 - .await 176 - .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?; 190 + pub async fn serve(storage: impl StoreReader + 'static) -> Result<(), String> { 177 191 178 - seen_by_collection.insert(collection.to_string(), total); 179 - } 180 - 181 - ok_cors(seen_by_collection) 182 - } 183 - 184 - /// Get top collections 185 - #[endpoint { 186 - method = GET, 187 - path = "/collections" 188 - }] 189 - async fn get_top_collections(ctx: RequestContext<Context>) -> OkCorsResponse<HashMap<String, u64>> { 190 - let Context { storage, .. } = ctx.context(); 191 - let collections = storage 192 - .get_top_collections() 193 - .await 194 - .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?; 195 - 196 - ok_cors(collections) 197 - } 198 - 199 - pub async fn serve(storage: Storage) -> Result<(), String> { 200 192 let log = ConfigLogging::StderrTerminal { 201 193 level: ConfigLoggingLevel::Info, 202 194 } ··· 207 199 208 200 api.register(get_openapi).unwrap(); 209 201 api.register(get_meta_info).unwrap(); 210 - // api.register(get_records_by_collection).unwrap(); 211 - api.register(get_records_total_seen).unwrap(); 212 - api.register(get_top_collections).unwrap(); 202 + // // api.register(get_records_by_collection).unwrap(); 203 + // api.register(get_records_total_seen).unwrap(); 204 + // api.register(get_top_collections).unwrap(); 213 205 214 206 let context = Context { 215 207 spec: Arc::new( ··· 217 209 .json() 218 210 .map_err(|e| e.to_string())?, 219 211 ), 220 - storage, 212 + storage: Box::new(storage), 221 213 }; 222 214 223 215 ServerBuilder::new(api, context, log)
+3 -10
ufos/src/storage.rs
··· 1 1 // use crate::store_types::CountsValue; 2 2 use crate::{error::StorageError, ConsumerInfo, Cursor, EventBatch, TopCollections, UFOsRecord}; 3 3 use jetstream::exports::{Did, Nsid}; 4 - use schemars::JsonSchema; 5 - use serde::Serialize; 6 4 use std::path::Path; 7 5 8 6 pub type StorageResult<T> = Result<T, StorageError>; 9 7 10 - pub trait StorageWhatever<R, W, C, S> 11 - where 12 - R: StoreReader<S>, 13 - W: StoreWriter, 14 - S: Serialize + JsonSchema, 15 - { 8 + pub trait StorageWhatever<R: StoreReader, W: StoreWriter, C> { 16 9 fn init( 17 10 path: impl AsRef<Path>, 18 11 endpoint: String, ··· 34 27 fn delete_account(&mut self, did: &Did) -> StorageResult<usize>; 35 28 } 36 29 37 - pub trait StoreReader<S>: Clone { 38 - fn get_storage_stats(&self) -> StorageResult<S>; 30 + pub trait StoreReader: Send + Sync { 31 + fn get_storage_stats(&self) -> StorageResult<serde_json::Value>; 39 32 40 33 fn get_consumer_info(&self) -> StorageResult<ConsumerInfo>; 41 34
+9 -9
ufos/src/storage_fjall.rs
··· 135 135 pub rollup_cursor: Option<u64>, 136 136 } 137 137 138 - impl StorageWhatever<FjallReader, FjallWriter, FjallConfig, FjallStats> for FjallStorage { 138 + impl StorageWhatever<FjallReader, FjallWriter, FjallConfig> for FjallStorage { 139 139 fn init( 140 140 path: impl AsRef<Path>, 141 141 endpoint: String, ··· 300 300 } 301 301 } 302 302 303 - impl StoreReader<FjallStats> for FjallReader { 304 - fn get_storage_stats(&self) -> StorageResult<FjallStats> { 303 + impl StoreReader for FjallReader { 304 + fn get_storage_stats(&self) -> StorageResult<serde_json::Value> { 305 305 let rollup_cursor = 306 306 get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)? 307 307 .map(|c| c.to_raw_u64()); 308 308 309 - Ok(FjallStats { 310 - keyspace_disk_space: self.keyspace.disk_space(), 311 - keyspace_journal_count: self.keyspace.journal_count(), 312 - keyspace_sequence: self.keyspace.instant(), 313 - rollup_cursor, 314 - }) 309 + Ok(serde_json::json!({ 310 + "keyspace_disk_space": self.keyspace.disk_space(), 311 + "keyspace_journal_count": self.keyspace.journal_count(), 312 + "keyspace_sequence": self.keyspace.instant(), 313 + "rollup_cursor": rollup_cursor, 314 + })) 315 315 } 316 316 317 317 fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> {