Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

merge time prefix iterators for collections

get stats for all collections over arbitrary time periods!!!!!!!

+63 -19
+1 -1
ufos/src/server.rs
··· 39 39 .unwrap() 40 40 .as_micros() as u64; 41 41 const ONE_HOUR: u64 = 60 * 60 * 1_000_000; 42 - if t < t_now || (t - t_now <= 2 * ONE_HOUR) { 42 + if t > t_now && (t - t_now > 2 * ONE_HOUR) { 43 43 Err(HttpError::for_bad_request(None, "future timestamp".into())) 44 44 } else { 45 45 Ok(HourTruncatedCursor::truncate_raw_u64(t))
+39 -12
ufos/src/storage_fjall.rs
··· 414 414 415 415 type Item = StorageResult<(Nsid, fjall::Slice)>; 416 416 let mut iters = Vec::with_capacity(buckets.len()); 417 - for bucket in buckets { 417 + for bucket in &buckets { 418 418 match bucket { 419 - CursorBucket::Hour(_hour) => todo!(), 420 - CursorBucket::Week(_week) => todo!(), 419 + CursorBucket::Hour(hour) => { 420 + let prefix = HourlyRollupKey::week_prefix(*hour); 421 + let start = if let Some(ref nsid) = cursor_nsid { 422 + Bound::Excluded(HourlyRollupKey::new(*hour, &nsid.clone()).to_db_bytes()?) 423 + } else { 424 + Bound::Included(HourlyRollupKey::from_prefix_to_db_bytes(&prefix)?) 425 + }; 426 + let end = Bound::Excluded(HourlyRollupKey::prefix_range_end(&prefix)?); 427 + let it = snapshot.range((start, end)).map(|kv| match kv { 428 + Ok((k_bytes, v_bytes)) => db_complete::<HourlyRollupKey>(&k_bytes) 429 + .map(|key| (key.collection().clone(), v_bytes)) 430 + .map_err(|e| e.into()), 431 + Err(e) => Err(e.into()), // lsm-tree error into fjall error 432 + }); 433 + let boxed: Box<dyn Iterator<Item = Item>> = Box::new(it); 434 + iters.push(boxed.peekable()); 435 + } 436 + CursorBucket::Week(week) => { 437 + let prefix = WeeklyRollupKey::week_prefix(*week); 438 + let start = if let Some(ref nsid) = cursor_nsid { 439 + Bound::Excluded(WeeklyRollupKey::new(*week, &nsid.clone()).to_db_bytes()?) 440 + } else { 441 + Bound::Included(WeeklyRollupKey::from_prefix_to_db_bytes(&prefix)?) 442 + }; 443 + let end = Bound::Excluded(WeeklyRollupKey::prefix_range_end(&prefix)?); 444 + let it = snapshot.range((start, end)).map(|kv| match kv { 445 + Ok((k_bytes, v_bytes)) => db_complete::<WeeklyRollupKey>(&k_bytes) 446 + .map(|key| (key.collection().clone(), v_bytes)) 447 + .map_err(|e| e.into()), 448 + Err(e) => Err(e.into()), // lsm-tree error into fjall error 449 + }); 450 + let boxed: Box<dyn Iterator<Item = Item>> = Box::new(it); 451 + iters.push(boxed.peekable()); 452 + } 421 453 CursorBucket::AllTime => { 454 + let prefix = Default::default(); 422 455 let start = if let Some(ref nsid) = cursor_nsid { 423 - Bound::Excluded( 424 - AllTimeRollupKey::from_pair(Default::default(), nsid.clone()) 425 - .to_db_bytes()?, 426 - ) 456 + Bound::Excluded(AllTimeRollupKey::new(nsid).to_db_bytes()?) 427 457 } else { 428 - Bound::Included(AllTimeRollupKey::from_prefix_to_db_bytes( 429 - &Default::default(), 430 - )?) 458 + Bound::Included(AllTimeRollupKey::from_prefix_to_db_bytes(&prefix)?) 431 459 }; 432 - let end = 433 - Bound::Excluded(AllTimeRollupKey::prefix_range_end(&Default::default())?); 460 + let end = Bound::Excluded(AllTimeRollupKey::prefix_range_end(&prefix)?); 434 461 let it = snapshot.range((start, end)).map(|kv| match kv { 435 462 Ok((k_bytes, v_bytes)) => db_complete::<AllTimeRollupKey>(&k_bytes) 436 463 .map(|key| (key.collection().clone(), v_bytes))
+23 -6
ufos/src/store_types.rs
··· 310 310 311 311 static_str!("hourly_counts", _HourlyRollupStaticStr); 312 312 pub type HourlyRollupStaticPrefix = DbStaticStr<_HourlyRollupStaticStr>; 313 - pub type HourlyRollupKey = DbConcat<DbConcat<HourlyRollupStaticPrefix, HourTruncatedCursor>, Nsid>; 313 + pub type HourlyRollupKeyHourPrefix = DbConcat<HourlyRollupStaticPrefix, HourTruncatedCursor>; 314 + pub type HourlyRollupKey = DbConcat<HourlyRollupKeyHourPrefix, Nsid>; 314 315 impl HourlyRollupKey { 315 - pub fn new(hourly_cursor: HourTruncatedCursor, nsid: &Nsid) -> Self { 316 + pub fn new(cursor: HourTruncatedCursor, nsid: &Nsid) -> Self { 316 317 Self::from_pair( 317 - DbConcat::from_pair(Default::default(), hourly_cursor), 318 + DbConcat::from_pair(Default::default(), cursor), 318 319 nsid.clone(), 319 320 ) 320 321 } 322 + pub fn week_prefix(cursor: HourTruncatedCursor) -> HourlyRollupKeyHourPrefix { 323 + HourlyRollupKeyHourPrefix::from_pair(Default::default(), cursor) 324 + } 321 325 pub fn cursor(&self) -> HourTruncatedCursor { 322 326 self.prefix.suffix 327 + } 328 + pub fn collection(&self) -> &Nsid { 329 + &self.suffix 323 330 } 324 331 } 325 332 pub type HourlyRollupVal = CountsValue; ··· 332 339 333 340 static_str!("weekly_counts", _WeeklyRollupStaticStr); 334 341 pub type WeeklyRollupStaticPrefix = DbStaticStr<_WeeklyRollupStaticStr>; 335 - pub type WeeklyRollupKey = DbConcat<DbConcat<WeeklyRollupStaticPrefix, WeekTruncatedCursor>, Nsid>; 342 + pub type WeeklyRollupKeyWeekPrefix = DbConcat<WeeklyRollupStaticPrefix, WeekTruncatedCursor>; 343 + pub type WeeklyRollupKey = DbConcat<WeeklyRollupKeyWeekPrefix, Nsid>; 336 344 impl WeeklyRollupKey { 337 - pub fn new(weekly_cursor: WeekTruncatedCursor, nsid: &Nsid) -> Self { 345 + pub fn new(cursor: WeekTruncatedCursor, nsid: &Nsid) -> Self { 338 346 Self::from_pair( 339 - DbConcat::from_pair(Default::default(), weekly_cursor), 347 + DbConcat::from_pair(Default::default(), cursor), 340 348 nsid.clone(), 341 349 ) 350 + } 351 + pub fn week_prefix(cursor: WeekTruncatedCursor) -> WeeklyRollupKeyWeekPrefix { 352 + WeeklyRollupKeyWeekPrefix::from_pair(Default::default(), cursor) 353 + } 354 + pub fn cursor(&self) -> WeekTruncatedCursor { 355 + self.prefix.suffix 356 + } 357 + pub fn collection(&self) -> &Nsid { 358 + &self.suffix 342 359 } 343 360 } 344 361 pub type WeeklyRollupVal = CountsValue;