Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

ufos: hook the server up to the new storage

lkasdjflaksjdlfjkas lfdkja slfdj alskdfj laksj dflkaj sdflkaj sdf
jlajk sdflkja sldfkj a
sd f
kj asldfj alksdjf laj d
f;ja sldj falksfd lkasjdf lakj dflj

alksdjf lakjs dflakjs dflkajs dlfkaj sldfkja
s;df jlaksjd flkasj fj

+185 -162
+1 -1
ufos/Cargo.toml
··· 15 15 jetstream = { path = "../jetstream" } 16 16 log = "0.4.26" 17 17 lsm-tree = "2.6.6" 18 - schemars = "0.8.22" 18 + schemars = { version = "0.8.22", features = ["raw_value"] } 19 19 semver = "1.0.26" 20 20 serde = "1.0.219" 21 21 serde_json = "1.0.140"
+120 -125
ufos/src/server.rs
··· 1 - use crate::storage::{StoreReader}; 2 - use crate::{Nsid, ConsumerInfo}; 1 + use crate::storage::StoreReader; 2 + use crate::{ConsumerInfo, Nsid, TopCollections, UFOsRecord}; 3 3 use dropshot::endpoint; 4 4 use dropshot::ApiDescription; 5 5 use dropshot::ConfigDropshot; ··· 15 15 use serde::{Deserialize, Serialize}; 16 16 use std::collections::HashMap; 17 17 use std::sync::Arc; 18 - use tokio::task::block_in_place; 19 18 20 19 struct Context { 21 20 pub spec: Arc<serde_json::Value>, ··· 31 30 let spec = (*ctx.context().spec).clone(); 32 31 ok_cors(spec) 33 32 } 34 - 35 33 36 34 #[derive(Debug, Serialize, JsonSchema)] 37 35 struct MetaInfo { ··· 48 46 let failed_to_get = 49 47 |what| move |e| HttpError::for_internal_error(format!("failed to get {what}: {e:?}")); 50 48 51 - let storage_info = storage.get_storage_stats_a().await 49 + let storage_info = storage 50 + .get_storage_stats() 51 + .await 52 52 .map_err(failed_to_get("storage info"))?; 53 53 54 - let consumer = block_in_place(|| storage.get_consumer_info()) 54 + let consumer = storage 55 + .get_consumer_info() 56 + .await 55 57 .map_err(failed_to_get("consumer info"))?; 56 58 57 59 ok_cors(MetaInfo { ··· 60 62 }) 61 63 } 62 64 63 - // #[derive(Debug, Deserialize, JsonSchema)] 64 - // struct CollectionsQuery { 65 - // collection: String, // JsonSchema not implemented for Nsid :( 66 - // } 67 - // impl CollectionsQuery { 68 - // fn to_multiple_nsids(&self) -> Result<Vec<Nsid>, String> { 69 - // let mut out = Vec::with_capacity(self.collection.len()); 70 - // for collection in self.collection.split(',') { 71 - // let Ok(nsid) = Nsid::new(collection.to_string()) else { 72 - // return Err(format!("collection {collection:?} was not a valid NSID")); 73 - // }; 74 - // out.push(nsid); 75 - // } 76 - // Ok(out) 77 - // } 78 - // } 79 - // // #[derive(Debug, Serialize, JsonSchema)] 80 - // // struct ApiRecord { 81 - // // did: String, 82 - // // collection: String, 83 - // // rkey: String, 84 - // // record: serde_json::Value, 85 - // // time_us: u64, 86 - // // } 87 - // // impl ApiRecord { 88 - // // fn from_create_record(create_record: CreateRecord, collection: &Nsid) -> Self { 89 - // // let CreateRecord { 90 - // // did, 91 - // // rkey, 92 - // // record, 93 - // // cursor, 94 - // // } = create_record; 95 - // // Self { 96 - // // did: did.to_string(), 97 - // // collection: collection.to_string(), 98 - // // rkey: rkey.to_string(), 99 - // // record, 100 - // // time_us: cursor.to_raw_u64(), 101 - // // } 102 - // // } 103 - // // } 104 - // // /// Get recent records by collection 105 - // // /// 106 - // // /// Multiple collections are supported. they will be delivered in one big array with no 107 - // // /// specified order. 108 - // // #[endpoint { 109 - // // method = GET, 110 - // // path = "/records", 111 - // // }] 112 - // // async fn get_records_by_collection( 113 - // // ctx: RequestContext<Context>, 114 - // // collection_query: Query<CollectionsQuery>, 115 - // // ) -> OkCorsResponse<Vec<ApiRecord>> { 116 - // // let Context { storage, .. } = ctx.context(); 65 + #[derive(Debug, Deserialize, JsonSchema)] 66 + struct CollectionsQuery { 67 + collection: String, // JsonSchema not implemented for Nsid :( 68 + } 69 + impl CollectionsQuery { 70 + fn to_multiple_nsids(&self) -> Result<Vec<Nsid>, String> { 71 + let mut out = Vec::with_capacity(self.collection.len()); 72 + for collection in self.collection.split(',') { 73 + let Ok(nsid) = Nsid::new(collection.to_string()) else { 74 + return Err(format!("collection {collection:?} was not a valid NSID")); 75 + }; 76 + out.push(nsid); 77 + } 78 + Ok(out) 79 + } 80 + } 81 + #[derive(Debug, Serialize, JsonSchema)] 82 + struct ApiRecord { 83 + did: String, 84 + collection: String, 85 + rkey: String, 86 + record: Box<serde_json::value::RawValue>, 87 + time_us: u64, 88 + } 89 + impl From<UFOsRecord> for ApiRecord { 90 + fn from(ufo: UFOsRecord) -> Self { 91 + Self { 92 + did: ufo.did.to_string(), 93 + collection: ufo.collection.to_string(), 94 + rkey: ufo.rkey.to_string(), 95 + record: ufo.record, 96 + time_us: ufo.cursor.to_raw_u64(), 97 + } 98 + } 99 + } 100 + /// Get recent records by collection 101 + /// 102 + /// Multiple collections are supported. they will be delivered in one big array with no 103 + /// specified order. 104 + #[endpoint { 105 + method = GET, 106 + path = "/records", 107 + }] 108 + async fn get_records_by_collection( 109 + ctx: RequestContext<Context>, 110 + collection_query: Query<CollectionsQuery>, 111 + ) -> OkCorsResponse<Vec<ApiRecord>> { 112 + let Context { storage, .. } = ctx.context(); 117 113 118 - // // let collections = collection_query 119 - // // .into_inner() 120 - // // .to_multiple_nsids() 121 - // // .map_err(|reason| HttpError::for_bad_request(None, reason))?; 114 + let collections = collection_query 115 + .into_inner() 116 + .to_multiple_nsids() 117 + .map_err(|reason| HttpError::for_bad_request(None, reason))?; 122 118 123 - // // let mut api_records = Vec::new(); 119 + let records = storage 120 + .get_records_by_collections(&collections, 100) 121 + .await 122 + .map_err(|e| HttpError::for_internal_error(e.to_string()))? 123 + .into_iter() 124 + .map(|r| r.into()) 125 + .collect(); 124 126 125 - // // // TODO: set up multiple db iterators and iterate them together with merge sort 126 - // // for collection in &collections { 127 - // // let records = storage 128 - // // .get_collection_records(collection, 100) 129 - // // .await 130 - // // .map_err(|e| HttpError::for_internal_error(e.to_string()))?; 127 + ok_cors(records) 128 + } 131 129 132 - // // for record in records { 133 - // // let api_record = ApiRecord::from_create_record(record, collection); 134 - // // api_records.push(api_record); 135 - // // } 136 - // // } 137 - 138 - // // ok_cors(api_records) 139 - // // } 140 - 141 - // /// Get total records seen by collection 142 - // #[endpoint { 143 - // method = GET, 144 - // path = "/records/total-seen" 145 - // }] 146 - // async fn get_records_total_seen( 147 - // ctx: RequestContext<Context>, 148 - // collection_query: Query<CollectionsQuery>, 149 - // ) -> OkCorsResponse<HashMap<String, u64>> { 150 - // let Context { storage, .. } = ctx.context(); 130 + #[derive(Debug, Serialize, JsonSchema)] 131 + struct TotalCounts { 132 + total_records: u64, 133 + dids_estimate: u64, 134 + } 135 + /// Get total records seen by collection 136 + #[endpoint { 137 + method = GET, 138 + path = "/records/total-seen" 139 + }] 140 + async fn get_records_total_seen( 141 + ctx: RequestContext<Context>, 142 + collection_query: Query<CollectionsQuery>, 143 + ) -> OkCorsResponse<HashMap<String, TotalCounts>> { 144 + let Context { storage, .. } = ctx.context(); 151 145 152 - // let collections = collection_query 153 - // .into_inner() 154 - // .to_multiple_nsids() 155 - // .map_err(|reason| HttpError::for_bad_request(None, reason))?; 156 - 157 - // let mut seen_by_collection = HashMap::with_capacity(collections.len()); 146 + let collections = collection_query 147 + .into_inner() 148 + .to_multiple_nsids() 149 + .map_err(|reason| HttpError::for_bad_request(None, reason))?; 158 150 159 - // for collection in &collections { 160 - // let total = storage 161 - // .get_collection_total_seen(collection) 162 - // .await 163 - // .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?; 151 + let mut seen_by_collection = HashMap::with_capacity(collections.len()); 164 152 165 - // seen_by_collection.insert(collection.to_string(), total); 166 - // } 153 + for collection in &collections { 154 + let (total_records, dids_estimate) = storage 155 + .get_counts_by_collection(collection) 156 + .await 157 + .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?; 167 158 168 - // ok_cors(seen_by_collection) 169 - // } 159 + seen_by_collection.insert( 160 + collection.to_string(), 161 + TotalCounts { 162 + total_records, 163 + dids_estimate, 164 + }, 165 + ); 166 + } 170 167 171 - // /// Get top collections 172 - // #[endpoint { 173 - // method = GET, 174 - // path = "/collections" 175 - // }] 176 - // async fn get_top_collections(ctx: RequestContext<Context>) -> OkCorsResponse<HashMap<String, u64>> { 177 - // let Context { storage, .. } = ctx.context(); 178 - // let collections = storage 179 - // .get_top_collections() 180 - // .await 181 - // .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?; 168 + ok_cors(seen_by_collection) 169 + } 182 170 183 - // ok_cors(collections) 184 - // } 171 + /// Get top collections 172 + #[endpoint { 173 + method = GET, 174 + path = "/collections" 175 + }] 176 + async fn get_top_collections(ctx: RequestContext<Context>) -> OkCorsResponse<TopCollections> { 177 + let Context { storage, .. } = ctx.context(); 178 + let collections = storage 179 + .get_top_collections() 180 + .await 181 + .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?; 185 182 186 - // impl Context { 187 - // fn what() -> () {} 188 - // } 183 + ok_cors(collections) 184 + } 189 185 190 186 pub async fn serve(storage: impl StoreReader + 'static) -> Result<(), String> { 191 - 192 187 let log = ConfigLogging::StderrTerminal { 193 188 level: ConfigLoggingLevel::Info, 194 189 } ··· 199 194 200 195 api.register(get_openapi).unwrap(); 201 196 api.register(get_meta_info).unwrap(); 202 - // // api.register(get_records_by_collection).unwrap(); 203 - // api.register(get_records_total_seen).unwrap(); 204 - // api.register(get_top_collections).unwrap(); 197 + api.register(get_records_by_collection).unwrap(); 198 + api.register(get_records_total_seen).unwrap(); 199 + api.register(get_top_collections).unwrap(); 205 200 206 201 let context = Context { 207 202 spec: Arc::new(
+7 -8
ufos/src/storage.rs
··· 1 1 // use crate::store_types::CountsValue; 2 2 use crate::{error::StorageError, ConsumerInfo, Cursor, EventBatch, TopCollections, UFOsRecord}; 3 + use async_trait::async_trait; 3 4 use jetstream::exports::{Did, Nsid}; 4 5 use std::path::Path; 5 - use async_trait::async_trait; 6 6 7 7 pub type StorageResult<T> = Result<T, StorageError>; 8 8 ··· 30 30 31 31 #[async_trait] 32 32 pub trait StoreReader: Send + Sync { 33 - fn get_storage_stats(&self) -> StorageResult<serde_json::Value>; 34 - async fn get_storage_stats_a(&self) -> StorageResult<serde_json::Value>; 33 + async fn get_storage_stats(&self) -> StorageResult<serde_json::Value>; 35 34 36 - fn get_consumer_info(&self) -> StorageResult<ConsumerInfo>; 35 + async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo>; 37 36 38 - fn get_top_collections(&self) -> StorageResult<TopCollections>; 37 + async fn get_top_collections(&self) -> StorageResult<TopCollections>; 39 38 40 - fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)>; 39 + async fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)>; 41 40 42 - fn get_records_by_collections( 41 + async fn get_records_by_collections( 43 42 &self, 44 - collections: &[&Nsid], 43 + collections: &[Nsid], 45 44 limit: usize, 46 45 ) -> StorageResult<Vec<UFOsRecord>>; 47 46 }
+57 -28
ufos/src/storage_fjall.rs
··· 15 15 use crate::{ 16 16 CommitAction, ConsumerInfo, Did, EventBatch, Nsid, RecordKey, TopCollections, UFOsRecord, 17 17 }; 18 + use async_trait::async_trait; 18 19 use fjall::{ 19 20 Batch as FjallBatch, CompressionType, Config, Keyspace, PartitionCreateOptions, PartitionHandle, 20 21 }; ··· 26 27 use std::time::{Duration, Instant, SystemTime}; 27 28 use tokio::sync::mpsc::Receiver; 28 29 use tokio::time::{interval_at, sleep}; 29 - use async_trait::async_trait; 30 30 31 31 /// Commit the RW batch immediately if this number of events have been read off the mod queue 32 32 const MAX_BATCHED_RW_EVENTS: usize = 18; ··· 301 301 } 302 302 } 303 303 304 - #[async_trait] 305 - impl StoreReader for FjallReader { 304 + impl FjallReader { 306 305 fn get_storage_stats(&self) -> StorageResult<serde_json::Value> { 307 306 let rollup_cursor = 308 307 get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)? ··· 314 313 "keyspace_sequence": self.keyspace.instant(), 315 314 "rollup_cursor": rollup_cursor, 316 315 })) 317 - } 318 - async fn get_storage_stats_a(&self) -> StorageResult<serde_json::Value> { 319 - let s = self.clone(); 320 - tokio::task::spawn_blocking(move || FjallReader::get_storage_stats(&s)).await? 321 316 } 322 317 323 318 fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> { ··· 427 422 428 423 fn get_records_by_collections( 429 424 &self, 430 - collections: &[&Nsid], 425 + collections: &[Nsid], 431 426 limit: usize, 432 427 ) -> StorageResult<Vec<UFOsRecord>> { 433 428 if collections.is_empty() { ··· 467 462 merged.push(record_iterators[idx].next().unwrap().unwrap().unwrap()); 468 463 } 469 464 Ok(merged) 465 + } 466 + } 467 + 468 + #[async_trait] 469 + impl StoreReader for FjallReader { 470 + async fn get_storage_stats(&self) -> StorageResult<serde_json::Value> { 471 + let s = self.clone(); 472 + tokio::task::spawn_blocking(move || FjallReader::get_storage_stats(&s)).await? 473 + } 474 + async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> { 475 + let s = self.clone(); 476 + tokio::task::spawn_blocking(move || FjallReader::get_consumer_info(&s)).await? 477 + } 478 + async fn get_top_collections(&self) -> Result<TopCollections, StorageError> { 479 + let s = self.clone(); 480 + tokio::task::spawn_blocking(move || FjallReader::get_top_collections(&s)).await? 481 + } 482 + async fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> { 483 + let s = self.clone(); 484 + let collection = collection.clone(); 485 + tokio::task::spawn_blocking(move || FjallReader::get_counts_by_collection(&s, &collection)) 486 + .await? 487 + } 488 + async fn get_records_by_collections( 489 + &self, 490 + collections: &[Nsid], 491 + limit: usize, 492 + ) -> StorageResult<Vec<UFOsRecord>> { 493 + let s = self.clone(); 494 + let collections = collections.to_vec(); 495 + tokio::task::spawn_blocking(move || { 496 + FjallReader::get_records_by_collections(&s, &collections, limit) 497 + }) 498 + .await? 470 499 } 471 500 } 472 501 ··· 1665 1694 assert_eq!(records, 0); 1666 1695 assert_eq!(dids, 0); 1667 1696 1668 - let records = read.get_records_by_collections(&[&collection], 2)?; 1697 + let records = read.get_records_by_collections(&[collection], 2)?; 1669 1698 assert_eq!(records.len(), 1); 1670 1699 let rec = &records[0]; 1671 1700 assert_eq!(rec.record.get(), "{}"); 1672 1701 assert!(!rec.is_update); 1673 1702 1674 1703 let records = 1675 - read.get_records_by_collections(&[&Nsid::new("d.e.f".to_string()).unwrap()], 2)?; 1704 + read.get_records_by_collections(&[Nsid::new("d.e.f".to_string()).unwrap()], 2)?; 1676 1705 assert_eq!(records.len(), 0); 1677 1706 1678 1707 Ok(()) ··· 1714 1743 1715 1744 let records = read.get_records_by_collections( 1716 1745 &[ 1717 - &Nsid::new("a.a.a".to_string()).unwrap(), 1718 - &Nsid::new("a.a.b".to_string()).unwrap(), 1719 - &Nsid::new("a.a.c".to_string()).unwrap(), 1746 + Nsid::new("a.a.a".to_string()).unwrap(), 1747 + Nsid::new("a.a.b".to_string()).unwrap(), 1748 + Nsid::new("a.a.c".to_string()).unwrap(), 1720 1749 ], 1721 1750 100, 1722 1751 )?; ··· 1772 1801 assert_eq!(records, 1); 1773 1802 assert_eq!(dids, 1); 1774 1803 1775 - let records = read.get_records_by_collections(&[&collection], 2)?; 1804 + let records = read.get_records_by_collections(&[collection], 2)?; 1776 1805 assert_eq!(records.len(), 1); 1777 1806 let rec = &records[0]; 1778 1807 assert_eq!(rec.record.get(), r#"{"ch": "ch-ch-ch-changes"}"#); ··· 1810 1839 assert_eq!(records, 1); 1811 1840 assert_eq!(dids, 1); 1812 1841 1813 - let records = read.get_records_by_collections(&[&collection], 2)?; 1842 + let records = read.get_records_by_collections(&[collection], 2)?; 1814 1843 assert_eq!(records.len(), 0); 1815 1844 1816 1845 Ok(()) ··· 1856 1885 write.insert_batch(batch.batch)?; 1857 1886 1858 1887 let records = 1859 - read.get_records_by_collections(&[&Nsid::new("a.a.a".to_string()).unwrap()], 100)?; 1888 + read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 100)?; 1860 1889 assert_eq!(records.len(), 1); 1861 1890 let records = 1862 - read.get_records_by_collections(&[&Nsid::new("a.a.b".to_string()).unwrap()], 100)?; 1891 + read.get_records_by_collections(&[Nsid::new("a.a.b".to_string()).unwrap()], 100)?; 1863 1892 assert_eq!(records.len(), 10); 1864 1893 let records = 1865 - read.get_records_by_collections(&[&Nsid::new("a.a.c".to_string()).unwrap()], 100)?; 1894 + read.get_records_by_collections(&[Nsid::new("a.a.c".to_string()).unwrap()], 100)?; 1866 1895 assert_eq!(records.len(), 1); 1867 1896 let records = 1868 - read.get_records_by_collections(&[&Nsid::new("a.a.d".to_string()).unwrap()], 100)?; 1897 + read.get_records_by_collections(&[Nsid::new("a.a.d".to_string()).unwrap()], 100)?; 1869 1898 assert_eq!(records.len(), 0); 1870 1899 1871 1900 write.trim_collection(&Nsid::new("a.a.a".to_string()).unwrap(), 6)?; ··· 1874 1903 write.trim_collection(&Nsid::new("a.a.d".to_string()).unwrap(), 6)?; 1875 1904 1876 1905 let records = 1877 - read.get_records_by_collections(&[&Nsid::new("a.a.a".to_string()).unwrap()], 100)?; 1906 + read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 100)?; 1878 1907 assert_eq!(records.len(), 1); 1879 1908 let records = 1880 - read.get_records_by_collections(&[&Nsid::new("a.a.b".to_string()).unwrap()], 100)?; 1909 + read.get_records_by_collections(&[Nsid::new("a.a.b".to_string()).unwrap()], 100)?; 1881 1910 assert_eq!(records.len(), 6); 1882 1911 let records = 1883 - read.get_records_by_collections(&[&Nsid::new("a.a.c".to_string()).unwrap()], 100)?; 1912 + read.get_records_by_collections(&[Nsid::new("a.a.c".to_string()).unwrap()], 100)?; 1884 1913 assert_eq!(records.len(), 1); 1885 1914 let records = 1886 - read.get_records_by_collections(&[&Nsid::new("a.a.d".to_string()).unwrap()], 100)?; 1915 + read.get_records_by_collections(&[Nsid::new("a.a.d".to_string()).unwrap()], 100)?; 1887 1916 assert_eq!(records.len(), 0); 1888 1917 1889 1918 Ok(()) ··· 1917 1946 write.insert_batch(batch.batch)?; 1918 1947 1919 1948 let records = 1920 - read.get_records_by_collections(&[&Nsid::new("a.a.a".to_string()).unwrap()], 100)?; 1949 + read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 100)?; 1921 1950 assert_eq!(records.len(), 3); 1922 1951 1923 1952 let records_deleted = ··· 1925 1954 assert_eq!(records_deleted, 2); 1926 1955 1927 1956 let records = 1928 - read.get_records_by_collections(&[&Nsid::new("a.a.a".to_string()).unwrap()], 100)?; 1957 + read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 100)?; 1929 1958 assert_eq!(records.len(), 1); 1930 1959 1931 1960 Ok(()) ··· 1954 1983 write.step_rollup()?; 1955 1984 1956 1985 let records = 1957 - read.get_records_by_collections(&[&Nsid::new("a.a.a".to_string()).unwrap()], 1)?; 1986 + read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 1)?; 1958 1987 assert_eq!(records.len(), 0); 1959 1988 1960 1989 Ok(()) ··· 1984 2013 write.insert_batch(batch.batch)?; 1985 2014 1986 2015 let records = 1987 - read.get_records_by_collections(&[&Nsid::new("a.a.a".to_string()).unwrap()], 1)?; 2016 + read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 1)?; 1988 2017 assert_eq!(records.len(), 1); 1989 2018 1990 2019 let n = write.step_rollup()?; 1991 2020 assert_eq!(n, 1); 1992 2021 1993 2022 let records = 1994 - read.get_records_by_collections(&[&Nsid::new("a.a.a".to_string()).unwrap()], 1)?; 2023 + read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 1)?; 1995 2024 assert_eq!(records.len(), 0); 1996 2025 1997 2026 let mut batch = TestBatch::default();