This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 20 kB View raw
1use std::sync::Arc; 2use std::sync::Mutex; 3use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; 4 5use bobbin_edge_index::HydrantCursor; 6use bobbin_runtime::RuntimeHasher; 7use bobbin_types::edges::{Edge, Record}; 8use bobbin_types::ids::RepoIdent; 9use bytes::Bytes; 10use jacquard_common::DefaultStr; 11use jacquard_common::types::did::Did; 12use jacquard_common::types::nsid::Nsid; 13use jacquard_common::types::recordkey::Rkey; 14use jacquard_common::types::string::{AtUri, Cid}; 15use scc::HashMap as SccMap; 16use scc::hash_map::Entry; 17 18pub struct ParkedUpsert { 19 pub cursor: HydrantCursor, 20 pub source: AtUri<DefaultStr>, 21 pub nsid: Nsid<DefaultStr>, 22 pub parsed: Record, 23 pub bytes: Bytes, 24 pub cid: Option<Cid<DefaultStr>>, 25 pub edges: Vec<Edge>, 26} 27 28struct EntryState { 29 upsert: ParkedUpsert, 30 deps: Vec<RepoIdent>, 31} 32 33type EntryHandle = Arc<Mutex<Option<EntryState>>>; 34 35pub struct WarmingBuffer { 36 by_source: SccMap<AtUri<DefaultStr>, EntryHandle, RuntimeHasher>, 37 by_key: SccMap<RepoIdent, Vec<EntryHandle>, RuntimeHasher>, 38 hasher: RuntimeHasher, 39 sealed: AtomicBool, 40 active_parks: AtomicU64, 41 enqueued_total: AtomicU64, 42 drained_observe_total: AtomicU64, 43 drained_promote_total: AtomicU64, 44 evicted_total: AtomicU64, 45 rejected_after_seal: AtomicU64, 46 distinct_keys_seen: AtomicU64, 47 current_entries: AtomicU64, 48 max_concurrent_entries: AtomicU64, 49 dep_enqueued_total: AtomicU64, 50 dep_drained_observe_total: AtomicU64, 51} 52 53#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] 54pub struct WarmingBufferSnapshot { 55 pub enqueued_total: u64, 56 pub drained_observe_total: u64, 57 pub drained_promote_total: u64, 58 pub evicted_total: u64, 59 pub rejected_after_seal: u64, 60 pub distinct_keys_seen: u64, 61 pub current_entries: u64, 62 pub max_concurrent_entries: u64, 63 pub dep_enqueued_total: u64, 64 pub dep_drained_observe_total: u64, 65} 66 67impl WarmingBuffer { 68 pub fn new(hasher: RuntimeHasher) -> Self { 69 Self { 70 by_source: SccMap::with_hasher(hasher.clone()), 71 by_key: SccMap::with_hasher(hasher.clone()), 72 hasher, 73 sealed: AtomicBool::new(false), 74 active_parks: AtomicU64::new(0), 75 enqueued_total: AtomicU64::new(0), 76 drained_observe_total: AtomicU64::new(0), 77 drained_promote_total: AtomicU64::new(0), 78 evicted_total: AtomicU64::new(0), 79 rejected_after_seal: AtomicU64::new(0), 80 distinct_keys_seen: AtomicU64::new(0), 81 current_entries: AtomicU64::new(0), 82 max_concurrent_entries: AtomicU64::new(0), 83 dep_enqueued_total: AtomicU64::new(0), 84 dep_drained_observe_total: AtomicU64::new(0), 85 } 86 } 87 88 pub fn hasher(&self) -> &RuntimeHasher { 89 &self.hasher 90 } 91 92 pub fn is_sealed(&self) -> bool { 93 self.sealed.load(Ordering::Acquire) 94 } 95 96 pub async fn try_park( 97 &self, 98 upsert: ParkedUpsert, 99 deps: Vec<RepoIdent>, 100 ) -> Result<(), ParkedUpsert> { 101 self.active_parks.fetch_add(1, Ordering::AcqRel); 102 let outcome = if self.is_sealed() { 103 self.rejected_after_seal.fetch_add(1, Ordering::Relaxed); 104 Err(upsert) 105 } else { 106 self.park(upsert, deps).await; 107 Ok(()) 108 }; 109 self.active_parks.fetch_sub(1, Ordering::Release); 110 outcome 111 } 112 113 async fn park(&self, upsert: ParkedUpsert, deps: Vec<RepoIdent>) { 114 debug_assert!( 115 !deps.is_empty(), 116 "park requires at least one unresolved dep" 117 ); 118 let dep_count = deps.len() as u64; 119 let source = upsert.source.clone(); 120 let handle: EntryHandle = Arc::new(Mutex::new(Some(EntryState { 121 upsert, 122 deps: deps.clone(), 123 }))); 124 125 let prior = match self.by_source.entry_async(source).await { 126 Entry::Occupied(mut occ) => Some(occ.insert(handle.clone())), 127 Entry::Vacant(vac) => { 128 vac.insert_entry(handle.clone()); 129 None 130 } 131 }; 132 if let Some(prior) = prior { 133 let prior_deps = self.snapshot_deps(&prior); 134 if self.deactivate_handle(&prior) { 135 self.evicted_total.fetch_add(1, Ordering::Relaxed); 136 self.cleanup_by_key(&prior, &prior_deps).await; 137 } 138 } 139 140 let _ = futures::future::join_all(deps.into_iter().map(|dep| { 141 let h = handle.clone(); 142 async move { self.register_dep(dep, h).await } 143 })) 144 .await; 145 146 self.enqueued_total.fetch_add(1, Ordering::Relaxed); 147 self.dep_enqueued_total 148 .fetch_add(dep_count, Ordering::Relaxed); 149 let cur = self.current_entries.fetch_add(1, Ordering::Relaxed) + 1; 150 self.max_concurrent_entries 151 .fetch_max(cur, Ordering::Relaxed); 152 } 153 154 async fn register_dep(&self, dep: RepoIdent, handle: EntryHandle) { 155 let was_new = match self.by_key.entry_async(dep).await { 156 Entry::Occupied(mut occ) => { 157 occ.get_mut().push(handle); 158 false 159 } 160 Entry::Vacant(vac) => { 161 vac.insert_entry(vec![handle]); 162 true 163 } 164 }; 165 if was_new { 166 self.distinct_keys_seen.fetch_add(1, Ordering::Relaxed); 167 } 168 } 169 170 pub async fn evict_source(&self, source: &AtUri<DefaultStr>) -> bool { 171 let Some((_, handle)) = self.by_source.remove_async(source).await else { 172 return false; 173 }; 174 let deps = self.snapshot_deps(&handle); 175 if !self.deactivate_handle(&handle) { 176 return false; 177 } 178 self.evicted_total.fetch_add(1, Ordering::Relaxed); 179 self.cleanup_by_key(&handle, &deps).await; 180 true 181 } 182 183 pub async fn take_observed( 184 &self, 185 owner: &Did<DefaultStr>, 186 rkey: &Rkey<DefaultStr>, 187 ) -> Vec<ParkedUpsert> { 188 let park_key = RepoIdent::new(owner.clone(), rkey.clone()); 189 let Some((_, handles)) = self.by_key.remove_async(&park_key).await else { 190 return Vec::new(); 191 }; 192 let mut drained: Vec<ParkedUpsert> = Vec::new(); 193 let mut sources_to_remove: Vec<AtUri<DefaultStr>> = Vec::new(); 194 let mut dep_matches: u64 = 0; 195 handles.into_iter().for_each(|handle| { 196 let outcome = { 197 let mut slot = handle.lock().expect("warming buffer entry mutex poisoned"); 198 match slot.as_mut() { 199 Some(state) => { 200 let before = state.deps.len(); 201 state.deps.retain(|d| d != &park_key); 202 let after = state.deps.len(); 203 Some((before > after, after == 0)) 204 } 205 None => None, 206 } 207 }; 208 let Some((matched, drain_now)) = outcome else { 209 return; 210 }; 211 if matched { 212 dep_matches += 1; 213 } 214 if drain_now { 215 let mut slot = handle.lock().expect("warming buffer entry mutex poisoned"); 216 if let Some(state) = slot.take() { 217 sources_to_remove.push(state.upsert.source.clone()); 218 drained.push(state.upsert); 219 } 220 } 221 }); 222 if dep_matches > 0 { 223 self.dep_drained_observe_total 224 .fetch_add(dep_matches, Ordering::Relaxed); 225 } 226 let drained_count = drained.len() as u64; 227 if drained_count > 0 { 228 self.drained_observe_total 229 .fetch_add(drained_count, Ordering::Relaxed); 230 self.current_entries 231 .fetch_sub(drained_count, Ordering::Relaxed); 232 let _ = futures::future::join_all( 233 sources_to_remove 234 .into_iter() 235 .map(|source| async move { self.by_source.remove_async(&source).await }), 236 ) 237 .await; 238 } 239 drained 240 } 241 242 pub async fn drain_for_promote(&self) -> Vec<(ParkedUpsert, Vec<RepoIdent>)> { 243 self.sealed.store(true, Ordering::Release); 244 while self.active_parks.load(Ordering::Acquire) > 0 { 245 tokio::task::yield_now().await; 246 } 247 let mut drained: Vec<(ParkedUpsert, Vec<RepoIdent>)> = Vec::new(); 248 self.by_source 249 .retain_async(|_source, handle| { 250 let mut slot = handle.lock().expect("warming buffer entry mutex poisoned"); 251 if let Some(state) = slot.take() { 252 drained.push((state.upsert, state.deps)); 253 } 254 false 255 }) 256 .await; 257 let count = drained.len() as u64; 258 if count > 0 { 259 self.drained_promote_total 260 .fetch_add(count, Ordering::Relaxed); 261 self.current_entries.fetch_sub(count, Ordering::Relaxed); 262 } 263 self.by_key.clear_async().await; 264 drained 265 } 266 267 pub fn snapshot(&self) -> WarmingBufferSnapshot { 268 WarmingBufferSnapshot { 269 enqueued_total: self.enqueued_total.load(Ordering::Relaxed), 270 drained_observe_total: self.drained_observe_total.load(Ordering::Relaxed), 271 drained_promote_total: self.drained_promote_total.load(Ordering::Relaxed), 272 evicted_total: self.evicted_total.load(Ordering::Relaxed), 273 rejected_after_seal: self.rejected_after_seal.load(Ordering::Relaxed), 274 distinct_keys_seen: self.distinct_keys_seen.load(Ordering::Relaxed), 275 current_entries: self.current_entries.load(Ordering::Relaxed), 276 max_concurrent_entries: self.max_concurrent_entries.load(Ordering::Relaxed), 277 dep_enqueued_total: self.dep_enqueued_total.load(Ordering::Relaxed), 278 dep_drained_observe_total: self.dep_drained_observe_total.load(Ordering::Relaxed), 279 } 280 } 281 282 fn snapshot_deps(&self, handle: &EntryHandle) -> Vec<RepoIdent> { 283 let slot = handle.lock().expect("warming buffer entry mutex poisoned"); 284 slot.as_ref().map(|s| s.deps.clone()).unwrap_or_default() 285 } 286 287 fn deactivate_handle(&self, handle: &EntryHandle) -> bool { 288 let mut slot = handle.lock().expect("warming buffer entry mutex poisoned"); 289 if slot.take().is_some() { 290 self.current_entries.fetch_sub(1, Ordering::Relaxed); 291 true 292 } else { 293 false 294 } 295 } 296 297 async fn cleanup_by_key(&self, handle: &EntryHandle, deps: &[RepoIdent]) { 298 let _ = futures::future::join_all(deps.iter().map(|dep| async move { 299 let Some(mut occupied) = self.by_key.get_async(dep).await else { 300 return; 301 }; 302 occupied.get_mut().retain(|h| !Arc::ptr_eq(h, handle)); 303 if occupied.get().is_empty() { 304 let _ = occupied.remove_entry(); 305 } 306 })) 307 .await; 308 } 309} 310 311#[cfg(test)] 312mod tests { 313 use super::*; 314 use bobbin_types::ids::nsid_static; 315 316 fn d(s: &str) -> Did<DefaultStr> { 317 Did::new_owned(s).unwrap() 318 } 319 320 fn r(s: &str) -> Rkey<DefaultStr> { 321 Rkey::new_owned(s).unwrap() 322 } 323 324 fn at(s: &str) -> AtUri<DefaultStr> { 325 AtUri::new_owned(s).unwrap() 326 } 327 328 fn make_upsert(source: &str) -> ParkedUpsert { 329 let source_uri = at(source); 330 let star_json = br#"{"$type":"sh.tangled.feed.star","createdAt":"2026-05-01T00:00:00Z","subject":{"$type":"sh.tangled.feed.star#repo","did":"did:plc:abalone"}}"#; 331 let parsed = Record::from_json_bytes(&nsid_static("sh.tangled.feed.star"), star_json) 332 .expect("star fixture parses"); 333 ParkedUpsert { 334 cursor: HydrantCursor::new(1), 335 source: source_uri, 336 nsid: nsid_static("sh.tangled.feed.star"), 337 parsed, 338 bytes: Bytes::from_static(star_json), 339 cid: None, 340 edges: Vec::new(), 341 } 342 } 343 344 #[tokio::test] 345 async fn park_then_observe_drains_to_zero() { 346 let hasher = RuntimeHasher::default(); 347 let buf = WarmingBuffer::new(hasher); 348 let dep = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz")); 349 let upsert = make_upsert("at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz"); 350 buf.park(upsert, vec![dep.clone()]).await; 351 let drained = buf.take_observed(&dep.owner, &dep.rkey).await; 352 assert_eq!(drained.len(), 1); 353 let s = buf.snapshot(); 354 assert_eq!(s.enqueued_total, 1); 355 assert_eq!(s.drained_observe_total, 1); 356 assert_eq!(s.distinct_keys_seen, 1); 357 assert_eq!(s.current_entries, 0); 358 assert_eq!(s.max_concurrent_entries, 1); 359 assert_eq!(s.dep_enqueued_total, 1); 360 assert_eq!(s.dep_drained_observe_total, 1); 361 } 362 363 #[tokio::test] 364 async fn distinct_keys_tracked_separately() { 365 let hasher = RuntimeHasher::default(); 366 let buf = WarmingBuffer::new(hasher); 367 let dep_a = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz")); 368 let dep_b = RepoIdent::new(d("did:plc:olaren"), r("abcabcabcabd1")); 369 buf.park( 370 make_upsert("at://did:plc:starer1/sh.tangled.feed.star/aaaaaaaaaaaaa"), 371 vec![dep_a.clone()], 372 ) 373 .await; 374 buf.park( 375 make_upsert("at://did:plc:starer2/sh.tangled.feed.star/bbbbbbbbbbbbb"), 376 vec![dep_b.clone()], 377 ) 378 .await; 379 let s = buf.snapshot(); 380 assert_eq!(s.enqueued_total, 2); 381 assert_eq!(s.distinct_keys_seen, 2); 382 assert_eq!(s.current_entries, 2); 383 assert_eq!(s.max_concurrent_entries, 2); 384 assert_eq!(s.dep_enqueued_total, 2); 385 } 386 387 #[tokio::test] 388 async fn observe_unrelated_key_is_noop() { 389 let hasher = RuntimeHasher::default(); 390 let buf = WarmingBuffer::new(hasher); 391 buf.park( 392 make_upsert("at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz"), 393 vec![RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz"))], 394 ) 395 .await; 396 let drained = buf 397 .take_observed(&d("did:plc:olaren"), &r("nopenopenopep")) 398 .await; 399 assert!(drained.is_empty()); 400 let s = buf.snapshot(); 401 assert_eq!(s.current_entries, 1); 402 assert_eq!(s.drained_observe_total, 0); 403 assert_eq!(s.dep_drained_observe_total, 0); 404 } 405 406 #[tokio::test] 407 async fn evict_on_replacement_clears_prior_entry() { 408 let hasher = RuntimeHasher::default(); 409 let buf = WarmingBuffer::new(hasher); 410 let dep = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz")); 411 let source = "at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz"; 412 buf.park(make_upsert(source), vec![dep.clone()]).await; 413 let evicted = buf.evict_source(&at(source)).await; 414 assert!(evicted); 415 let drained = buf.take_observed(&dep.owner, &dep.rkey).await; 416 assert!( 417 drained.is_empty(), 418 "evicted entry must not surface on observe", 419 ); 420 let s = buf.snapshot(); 421 assert_eq!(s.current_entries, 0); 422 assert_eq!(s.evicted_total, 1); 423 } 424 425 #[tokio::test] 426 async fn evict_clears_by_key_so_observe_walks_no_handles() { 427 let hasher = RuntimeHasher::default(); 428 let buf = WarmingBuffer::new(hasher); 429 let dep_a = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz")); 430 let dep_b = RepoIdent::new(d("did:plc:olaren"), r("abcabcabcabd1")); 431 let source = "at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz"; 432 buf.park(make_upsert(source), vec![dep_a.clone(), dep_b.clone()]) 433 .await; 434 assert!(buf.evict_source(&at(source)).await); 435 let drained_a = buf.take_observed(&dep_a.owner, &dep_a.rkey).await; 436 let drained_b = buf.take_observed(&dep_b.owner, &dep_b.rkey).await; 437 assert!(drained_a.is_empty()); 438 assert!(drained_b.is_empty()); 439 let s = buf.snapshot(); 440 assert_eq!(s.dep_drained_observe_total, 0); 441 } 442 443 #[tokio::test] 444 async fn replacement_park_under_same_source_increments_evicted_total() { 445 let hasher = RuntimeHasher::default(); 446 let buf = WarmingBuffer::new(hasher); 447 let dep = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz")); 448 let source = "at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz"; 449 buf.park(make_upsert(source), vec![dep.clone()]).await; 450 buf.park(make_upsert(source), vec![dep.clone()]).await; 451 let s = buf.snapshot(); 452 assert_eq!(s.enqueued_total, 2); 453 assert_eq!(s.evicted_total, 1); 454 assert_eq!(s.current_entries, 1); 455 } 456 457 #[tokio::test] 458 async fn replacement_park_does_not_double_drain_on_observe() { 459 let hasher = RuntimeHasher::default(); 460 let buf = WarmingBuffer::new(hasher); 461 let dep = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz")); 462 let source = "at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz"; 463 buf.park(make_upsert(source), vec![dep.clone()]).await; 464 buf.park(make_upsert(source), vec![dep.clone()]).await; 465 let drained = buf.take_observed(&dep.owner, &dep.rkey).await; 466 assert_eq!( 467 drained.len(), 468 1, 469 "evicted handle must not surface alongside the live one", 470 ); 471 let s = buf.snapshot(); 472 assert_eq!(s.drained_observe_total, 1); 473 assert_eq!(s.current_entries, 0); 474 } 475 476 #[tokio::test] 477 async fn drain_for_promote_returns_residual() { 478 let hasher = RuntimeHasher::default(); 479 let buf = WarmingBuffer::new(hasher); 480 let dep = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz")); 481 buf.park( 482 make_upsert("at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz"), 483 vec![dep], 484 ) 485 .await; 486 let residual = buf.drain_for_promote().await; 487 assert_eq!(residual.len(), 1); 488 let s = buf.snapshot(); 489 assert_eq!(s.drained_promote_total, 1); 490 assert_eq!(s.current_entries, 0); 491 } 492 493 #[tokio::test] 494 async fn try_park_after_drain_for_promote_is_rejected() { 495 let hasher = RuntimeHasher::default(); 496 let buf = WarmingBuffer::new(hasher); 497 let dep = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz")); 498 let _ = buf.drain_for_promote().await; 499 let upsert = make_upsert("at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz"); 500 let res = buf.try_park(upsert, vec![dep]).await; 501 assert!(res.is_err(), "post-seal try_park must reject"); 502 let s = buf.snapshot(); 503 assert_eq!(s.rejected_after_seal, 1); 504 assert_eq!(s.enqueued_total, 0); 505 assert_eq!(s.current_entries, 0); 506 } 507 508 #[tokio::test] 509 async fn double_observe_after_drain_does_not_double_count() { 510 let hasher = RuntimeHasher::default(); 511 let buf = WarmingBuffer::new(hasher); 512 let dep = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz")); 513 buf.park( 514 make_upsert("at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz"), 515 vec![dep.clone()], 516 ) 517 .await; 518 let first = buf.take_observed(&dep.owner, &dep.rkey).await; 519 let second = buf.take_observed(&dep.owner, &dep.rkey).await; 520 assert_eq!(first.len(), 1); 521 assert!(second.is_empty()); 522 let s = buf.snapshot(); 523 assert_eq!(s.drained_observe_total, 1); 524 assert_eq!(s.dep_drained_observe_total, 1); 525 } 526}