Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

pass time bounds from server to db reader

+45 -13
+4 -2
Cargo.lock
··· 614 614 615 615 [[package]] 616 616 name = "chrono" 617 - version = "0.4.40" 617 + version = "0.4.41" 618 618 source = "registry+https://github.com/rust-lang/crates.io-index" 619 - checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c" 619 + checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" 620 620 dependencies = [ 621 621 "android-tzdata", 622 622 "iana-time-zone", ··· 3006 3006 source = "registry+https://github.com/rust-lang/crates.io-index" 3007 3007 checksum = "3fbf2ae1b8bc8e02df939598064d22402220cd5bbcca1c76f7d6a310974d5615" 3008 3008 dependencies = [ 3009 + "chrono", 3009 3010 "dyn-clone", 3010 3011 "schemars_derive", 3011 3012 "serde", ··· 3812 3813 "base64 0.22.1", 3813 3814 "bincode 2.0.1", 3814 3815 "cardinality-estimator-safe", 3816 + "chrono", 3815 3817 "clap", 3816 3818 "dropshot", 3817 3819 "env_logger",
+2 -1
jetstream/src/events.rs
··· 92 92 /// 93 93 /// Warning: this exploits the internal implementation detail of jetstream cursors 94 94 /// being ~microsecond timestamps. 95 - pub fn at(t: SystemTime) -> Self { 95 + pub fn at(t: impl Into<SystemTime>) -> Self { 96 96 let unix_dt = t 97 + .into() 97 98 .duration_since(UNIX_EPOCH) 98 99 .expect("cannot set jetstream cursor earlier than unix epoch"); 99 100 Self(unix_dt.as_micros() as u64)
+2 -1
ufos/Cargo.toml
··· 9 9 base64 = "0.22.1" 10 10 bincode = { version = "2.0.1", features = ["serde"] } 11 11 cardinality-estimator-safe = { version = "4.0.1", features = ["with_serde", "with_digest"] } 12 + chrono = { version = "0.4.41", features = ["serde"] } 12 13 clap = { version = "4.5.31", features = ["derive"] } 13 14 dropshot = "0.16.0" 14 15 env_logger = "0.11.7" ··· 18 19 jetstream = { path = "../jetstream" } 19 20 log = "0.4.26" 20 21 lsm-tree = "2.6.6" 21 - schemars = { version = "0.8.22", features = ["raw_value"] } 22 + schemars = { version = "0.8.22", features = ["raw_value", "chrono"] } 22 23 semver = "1.0.26" 23 24 serde = "1.0.219" 24 25 serde_json = "1.0.140"
+22 -2
ufos/src/server.rs
··· 1 1 use crate::index_html::INDEX_HTML; 2 2 use crate::storage::StoreReader; 3 + use crate::store_types::HourTruncatedCursor; 3 4 use crate::{ConsumerInfo, Nsid, NsidCount, QueryPeriod, TopCollections, UFOsRecord}; 4 5 use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _}; 6 + use chrono::{DateTime, Utc}; 5 7 use dropshot::endpoint; 6 8 use dropshot::ApiDescription; 7 9 use dropshot::Body; ··· 23 25 struct Context { 24 26 pub spec: Arc<serde_json::Value>, 25 27 storage: Box<dyn StoreReader>, 28 + } 29 + 30 + fn dt_to_cursor(dt: DateTime<Utc>) -> Result<HourTruncatedCursor, HttpError> { 31 + let t = dt.timestamp_micros(); 32 + if t < 0 { 33 + Err(HttpError::for_bad_request(None, "timestamp too old".into())) 34 + } else { 35 + Ok(HourTruncatedCursor::truncate_raw_u64(t as u64)) 36 + } 26 37 } 27 38 28 39 /// Serve index page as html ··· 230 241 limit: usize, 231 242 /// Always omit the cursor for the first request. If more collections than the limit are available, the response will contain a non-null `cursor` to include with the next request. 232 243 cursor: Option<String>, 244 + /// Limit collections and statistics to those seen after this UTC datetime 245 + since: Option<DateTime<Utc>>, 246 + /// Limit collections and statistics to those seen before this UTC datetime 247 + until: Option<DateTime<Utc>>, 233 248 } 234 249 fn all_collections_default_limit() -> usize { 235 250 100 ··· 240 255 }] 241 256 /// Get all collections 242 257 /// 243 - /// There have been a lot of collections seen in the ATmosphere, well over 400 at time of writing, so you *will* need to make a series of paginaged requests using the `cursor` response property and request parameter to get them all. 258 + /// There have been a lot of collections seen in the ATmosphere, well over 400 at time of writing, so you *will* need to make a series of paginaged requests with `cursor`s to get them all. 244 259 /// 245 260 /// The set of collections across multiple requests is not guaranteed to be a perfectly consistent snapshot: 246 261 /// ··· 251 266 /// - no duplicate NSIDs will occur in the combined results 252 267 /// 253 268 /// In practice this is close enough for most use-cases to not worry about. 269 + /// 270 + /// Statistics are bucketed hourly, so the most granular effecitve time boundary for `since` and `until` is one hour. 254 271 async fn get_all_collections( 255 272 ctx: RequestContext<Context>, 256 273 query: Query<AllCollectionsQuery>, ··· 270 287 .transpose() 271 288 .map_err(|e| HttpError::for_bad_request(None, format!("invalid cursor: {e:?}")))?; 272 289 290 + let since = q.since.map(dt_to_cursor).transpose()?; 291 + let until = q.until.map(dt_to_cursor).transpose()?; 292 + 273 293 let (collections, next_cursor) = storage 274 - .get_all_collections(QueryPeriod::all_time(), q.limit, cursor) 294 + .get_all_collections(q.limit, cursor, since, until) 275 295 .await 276 296 .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?; 277 297
+3 -2
ufos/src/storage.rs
··· 1 - use crate::store_types::SketchSecretPrefix; 1 + use crate::store_types::{HourTruncatedCursor, SketchSecretPrefix}; 2 2 use crate::{ 3 3 error::StorageError, ConsumerInfo, Cursor, EventBatch, NsidCount, QueryPeriod, TopCollections, 4 4 UFOsRecord, ··· 78 78 79 79 async fn get_all_collections( 80 80 &self, 81 - period: QueryPeriod, 82 81 limit: usize, 83 82 cursor: Option<Vec<u8>>, 83 + since: Option<HourTruncatedCursor>, 84 + until: Option<HourTruncatedCursor>, 84 85 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)>; 85 86 86 87 async fn get_top_collections_by_count(
+6 -4
ufos/src/storage_fjall.rs
··· 375 375 376 376 fn get_all_collections( 377 377 &self, 378 - period: QueryPeriod, 379 378 limit: usize, 380 379 cursor: Option<Vec<u8>>, 380 + _since: Option<HourTruncatedCursor>, 381 + _until: Option<HourTruncatedCursor>, 381 382 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> { 382 - Ok(if period.is_all_time() { 383 + Ok(if true { 383 384 let snapshot = self.rollups.snapshot(); 384 385 385 386 let start = if let Some(cursor_bytes) = cursor { ··· 627 628 } 628 629 async fn get_all_collections( 629 630 &self, 630 - period: QueryPeriod, 631 631 limit: usize, 632 632 cursor: Option<Vec<u8>>, 633 + since: Option<HourTruncatedCursor>, 634 + until: Option<HourTruncatedCursor>, 633 635 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> { 634 636 let s = self.clone(); 635 637 tokio::task::spawn_blocking(move || { 636 - FjallReader::get_all_collections(&s, period, limit, cursor) 638 + FjallReader::get_all_collections(&s, limit, cursor, since, until) 637 639 }) 638 640 .await? 639 641 }
+2 -1
ufos/src/storage_mem.rs
··· 596 596 } 597 597 async fn get_all_collections( 598 598 &self, 599 - _: QueryPeriod, 600 599 _: usize, 601 600 _: Option<Vec<u8>>, 601 + _: Option<HourTruncatedCursor>, 602 + _: Option<HourTruncatedCursor>, 602 603 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> { 603 604 todo!() 604 605 }
+4
ufos/src/store_types.rs
··· 400 400 pub fn try_from_cursor(cursor: Cursor) -> Result<Self, EncodingError> { 401 401 Self::try_from_raw_u64(cursor.to_raw_u64()) 402 402 } 403 + pub fn truncate_raw_u64(raw: u64) -> Self { 404 + let truncated = Self::truncate(raw); 405 + Self(truncated) 406 + } 403 407 pub fn truncate_cursor(cursor: Cursor) -> Self { 404 408 let raw = cursor.to_raw_u64(); 405 409 let truncated = Self::truncate(raw);