This repository has no description
1use std::sync::Arc;
2use std::sync::Mutex;
3use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4
5use bobbin_edge_index::HydrantCursor;
6use bobbin_runtime::RuntimeHasher;
7use bobbin_types::edges::{Edge, Record};
8use bobbin_types::ids::RepoIdent;
9use bytes::Bytes;
10use jacquard_common::DefaultStr;
11use jacquard_common::types::did::Did;
12use jacquard_common::types::nsid::Nsid;
13use jacquard_common::types::recordkey::Rkey;
14use jacquard_common::types::string::{AtUri, Cid};
15use scc::HashMap as SccMap;
16use scc::hash_map::Entry;
17
/// An upsert whose processing is deferred until every dependency key
/// ([`RepoIdent`]) it waits on has been observed.
///
/// Entries are handed back by [`WarmingBuffer::take_observed`] once their last
/// dependency is observed, or by [`WarmingBuffer::drain_for_promote`] when the
/// buffer is sealed.
pub struct ParkedUpsert {
    /// Stream position associated with this upsert (presumably the hydrant event
    /// cursor it arrived at — confirm against the producer).
    pub cursor: HydrantCursor,
    /// AT-URI of the record being upserted. The buffer keeps at most one live
    /// entry per source; parking again under the same source evicts the older one.
    pub source: AtUri<DefaultStr>,
    /// NSID identifying the record type.
    pub nsid: Nsid<DefaultStr>,
    /// Decoded form of `bytes`.
    pub parsed: Record,
    /// Raw encoded record bytes (JSON in this module's tests).
    pub bytes: Bytes,
    /// Record CID, when known.
    pub cid: Option<Cid<DefaultStr>>,
    /// Edges associated with the record (presumably extracted from `parsed` —
    /// confirm with the caller).
    pub edges: Vec<Edge>,
}
27
/// Live state of one parked entry.
struct EntryState {
    /// The parked upsert itself.
    upsert: ParkedUpsert,
    /// Dependency keys not yet observed; the entry is drained on observation as
    /// soon as this list becomes empty.
    deps: Vec<RepoIdent>,
}
32
/// Shared slot for one parked entry, referenced from `by_source` and from the
/// `by_key` list of every dependency it registered. Whichever path `take()`s the
/// `Some` state first (observe-drain, eviction, or promote) owns the entry; every
/// other holder subsequently sees `None` and skips it.
type EntryHandle = Arc<Mutex<Option<EntryState>>>;
34
/// Concurrent holding area for upserts whose dependencies have not been
/// observed yet.
///
/// Each entry is indexed both by its source URI (for replacement and eviction)
/// and by each of its dependency keys (for draining when a key is observed).
/// [`WarmingBuffer::drain_for_promote`] seals the buffer and hands back every
/// entry still parked.
pub struct WarmingBuffer {
    /// Source URI -> the entry currently parked for that source.
    by_source: SccMap<AtUri<DefaultStr>, EntryHandle, RuntimeHasher>,
    /// Dependency key -> entries waiting on it. Handles in these lists may
    /// already be retired (slot `None`); readers skip those.
    by_key: SccMap<RepoIdent, Vec<EntryHandle>, RuntimeHasher>,
    /// Hasher both maps were built with; exposed via `hasher()`.
    hasher: RuntimeHasher,
    /// Set by `drain_for_promote`; `try_park` rejects new entries afterwards.
    sealed: AtomicBool,
    /// Number of `try_park` calls currently in flight; `drain_for_promote`
    /// waits for this to reach zero after sealing.
    active_parks: AtomicU64,
    // The remaining fields are metrics surfaced through `snapshot()`.
    /// Entries accepted into the buffer.
    enqueued_total: AtomicU64,
    /// Entries handed out by `take_observed`.
    drained_observe_total: AtomicU64,
    /// Entries handed out by `drain_for_promote`.
    drained_promote_total: AtomicU64,
    /// Live entries retired by `evict_source` or by a same-source replacement.
    evicted_total: AtomicU64,
    /// `try_park` calls rejected because the buffer was sealed.
    rejected_after_seal: AtomicU64,
    /// Times a dependency key got a fresh `by_key` list; a key that is observed
    /// and later parked on again is counted again.
    distinct_keys_seen: AtomicU64,
    /// Entries currently live in the buffer.
    current_entries: AtomicU64,
    /// High-water mark of `current_entries`.
    max_concurrent_entries: AtomicU64,
    /// Sum of dependency counts across all accepted entries.
    dep_enqueued_total: AtomicU64,
    /// Dependency keys resolved by `take_observed` (one per entry per key).
    dep_drained_observe_total: AtomicU64,
}
52
/// Copy of a [`WarmingBuffer`]'s counters, returned by
/// [`WarmingBuffer::snapshot`].
///
/// Each field is loaded independently, so under concurrent use the values are
/// not guaranteed to be mutually consistent.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct WarmingBufferSnapshot {
    /// Entries accepted into the buffer.
    pub enqueued_total: u64,
    /// Entries handed out because their last dependency was observed.
    pub drained_observe_total: u64,
    /// Entries handed out by `drain_for_promote`.
    pub drained_promote_total: u64,
    /// Live entries retired by explicit eviction or same-source replacement.
    pub evicted_total: u64,
    /// Park attempts rejected after the buffer was sealed.
    pub rejected_after_seal: u64,
    /// Times a dependency key started a new waiter list.
    pub distinct_keys_seen: u64,
    /// Entries live at the time of the snapshot.
    pub current_entries: u64,
    /// High-water mark of `current_entries`.
    pub max_concurrent_entries: u64,
    /// Sum of dependency counts across accepted entries.
    pub dep_enqueued_total: u64,
    /// Dependency keys resolved by observation.
    pub dep_drained_observe_total: u64,
}
66
67impl WarmingBuffer {
68 pub fn new(hasher: RuntimeHasher) -> Self {
69 Self {
70 by_source: SccMap::with_hasher(hasher.clone()),
71 by_key: SccMap::with_hasher(hasher.clone()),
72 hasher,
73 sealed: AtomicBool::new(false),
74 active_parks: AtomicU64::new(0),
75 enqueued_total: AtomicU64::new(0),
76 drained_observe_total: AtomicU64::new(0),
77 drained_promote_total: AtomicU64::new(0),
78 evicted_total: AtomicU64::new(0),
79 rejected_after_seal: AtomicU64::new(0),
80 distinct_keys_seen: AtomicU64::new(0),
81 current_entries: AtomicU64::new(0),
82 max_concurrent_entries: AtomicU64::new(0),
83 dep_enqueued_total: AtomicU64::new(0),
84 dep_drained_observe_total: AtomicU64::new(0),
85 }
86 }
87
88 pub fn hasher(&self) -> &RuntimeHasher {
89 &self.hasher
90 }
91
92 pub fn is_sealed(&self) -> bool {
93 self.sealed.load(Ordering::Acquire)
94 }
95
96 pub async fn try_park(
97 &self,
98 upsert: ParkedUpsert,
99 deps: Vec<RepoIdent>,
100 ) -> Result<(), ParkedUpsert> {
101 self.active_parks.fetch_add(1, Ordering::AcqRel);
102 let outcome = if self.is_sealed() {
103 self.rejected_after_seal.fetch_add(1, Ordering::Relaxed);
104 Err(upsert)
105 } else {
106 self.park(upsert, deps).await;
107 Ok(())
108 };
109 self.active_parks.fetch_sub(1, Ordering::Release);
110 outcome
111 }
112
113 async fn park(&self, upsert: ParkedUpsert, deps: Vec<RepoIdent>) {
114 debug_assert!(
115 !deps.is_empty(),
116 "park requires at least one unresolved dep"
117 );
118 let dep_count = deps.len() as u64;
119 let source = upsert.source.clone();
120 let handle: EntryHandle = Arc::new(Mutex::new(Some(EntryState {
121 upsert,
122 deps: deps.clone(),
123 })));
124
125 let prior = match self.by_source.entry_async(source).await {
126 Entry::Occupied(mut occ) => Some(occ.insert(handle.clone())),
127 Entry::Vacant(vac) => {
128 vac.insert_entry(handle.clone());
129 None
130 }
131 };
132 if let Some(prior) = prior {
133 let prior_deps = self.snapshot_deps(&prior);
134 if self.deactivate_handle(&prior) {
135 self.evicted_total.fetch_add(1, Ordering::Relaxed);
136 self.cleanup_by_key(&prior, &prior_deps).await;
137 }
138 }
139
140 let _ = futures::future::join_all(deps.into_iter().map(|dep| {
141 let h = handle.clone();
142 async move { self.register_dep(dep, h).await }
143 }))
144 .await;
145
146 self.enqueued_total.fetch_add(1, Ordering::Relaxed);
147 self.dep_enqueued_total
148 .fetch_add(dep_count, Ordering::Relaxed);
149 let cur = self.current_entries.fetch_add(1, Ordering::Relaxed) + 1;
150 self.max_concurrent_entries
151 .fetch_max(cur, Ordering::Relaxed);
152 }
153
154 async fn register_dep(&self, dep: RepoIdent, handle: EntryHandle) {
155 let was_new = match self.by_key.entry_async(dep).await {
156 Entry::Occupied(mut occ) => {
157 occ.get_mut().push(handle);
158 false
159 }
160 Entry::Vacant(vac) => {
161 vac.insert_entry(vec![handle]);
162 true
163 }
164 };
165 if was_new {
166 self.distinct_keys_seen.fetch_add(1, Ordering::Relaxed);
167 }
168 }
169
170 pub async fn evict_source(&self, source: &AtUri<DefaultStr>) -> bool {
171 let Some((_, handle)) = self.by_source.remove_async(source).await else {
172 return false;
173 };
174 let deps = self.snapshot_deps(&handle);
175 if !self.deactivate_handle(&handle) {
176 return false;
177 }
178 self.evicted_total.fetch_add(1, Ordering::Relaxed);
179 self.cleanup_by_key(&handle, &deps).await;
180 true
181 }
182
183 pub async fn take_observed(
184 &self,
185 owner: &Did<DefaultStr>,
186 rkey: &Rkey<DefaultStr>,
187 ) -> Vec<ParkedUpsert> {
188 let park_key = RepoIdent::new(owner.clone(), rkey.clone());
189 let Some((_, handles)) = self.by_key.remove_async(&park_key).await else {
190 return Vec::new();
191 };
192 let mut drained: Vec<ParkedUpsert> = Vec::new();
193 let mut sources_to_remove: Vec<AtUri<DefaultStr>> = Vec::new();
194 let mut dep_matches: u64 = 0;
195 handles.into_iter().for_each(|handle| {
196 let outcome = {
197 let mut slot = handle.lock().expect("warming buffer entry mutex poisoned");
198 match slot.as_mut() {
199 Some(state) => {
200 let before = state.deps.len();
201 state.deps.retain(|d| d != &park_key);
202 let after = state.deps.len();
203 Some((before > after, after == 0))
204 }
205 None => None,
206 }
207 };
208 let Some((matched, drain_now)) = outcome else {
209 return;
210 };
211 if matched {
212 dep_matches += 1;
213 }
214 if drain_now {
215 let mut slot = handle.lock().expect("warming buffer entry mutex poisoned");
216 if let Some(state) = slot.take() {
217 sources_to_remove.push(state.upsert.source.clone());
218 drained.push(state.upsert);
219 }
220 }
221 });
222 if dep_matches > 0 {
223 self.dep_drained_observe_total
224 .fetch_add(dep_matches, Ordering::Relaxed);
225 }
226 let drained_count = drained.len() as u64;
227 if drained_count > 0 {
228 self.drained_observe_total
229 .fetch_add(drained_count, Ordering::Relaxed);
230 self.current_entries
231 .fetch_sub(drained_count, Ordering::Relaxed);
232 let _ = futures::future::join_all(
233 sources_to_remove
234 .into_iter()
235 .map(|source| async move { self.by_source.remove_async(&source).await }),
236 )
237 .await;
238 }
239 drained
240 }
241
242 pub async fn drain_for_promote(&self) -> Vec<(ParkedUpsert, Vec<RepoIdent>)> {
243 self.sealed.store(true, Ordering::Release);
244 while self.active_parks.load(Ordering::Acquire) > 0 {
245 tokio::task::yield_now().await;
246 }
247 let mut drained: Vec<(ParkedUpsert, Vec<RepoIdent>)> = Vec::new();
248 self.by_source
249 .retain_async(|_source, handle| {
250 let mut slot = handle.lock().expect("warming buffer entry mutex poisoned");
251 if let Some(state) = slot.take() {
252 drained.push((state.upsert, state.deps));
253 }
254 false
255 })
256 .await;
257 let count = drained.len() as u64;
258 if count > 0 {
259 self.drained_promote_total
260 .fetch_add(count, Ordering::Relaxed);
261 self.current_entries.fetch_sub(count, Ordering::Relaxed);
262 }
263 self.by_key.clear_async().await;
264 drained
265 }
266
267 pub fn snapshot(&self) -> WarmingBufferSnapshot {
268 WarmingBufferSnapshot {
269 enqueued_total: self.enqueued_total.load(Ordering::Relaxed),
270 drained_observe_total: self.drained_observe_total.load(Ordering::Relaxed),
271 drained_promote_total: self.drained_promote_total.load(Ordering::Relaxed),
272 evicted_total: self.evicted_total.load(Ordering::Relaxed),
273 rejected_after_seal: self.rejected_after_seal.load(Ordering::Relaxed),
274 distinct_keys_seen: self.distinct_keys_seen.load(Ordering::Relaxed),
275 current_entries: self.current_entries.load(Ordering::Relaxed),
276 max_concurrent_entries: self.max_concurrent_entries.load(Ordering::Relaxed),
277 dep_enqueued_total: self.dep_enqueued_total.load(Ordering::Relaxed),
278 dep_drained_observe_total: self.dep_drained_observe_total.load(Ordering::Relaxed),
279 }
280 }
281
282 fn snapshot_deps(&self, handle: &EntryHandle) -> Vec<RepoIdent> {
283 let slot = handle.lock().expect("warming buffer entry mutex poisoned");
284 slot.as_ref().map(|s| s.deps.clone()).unwrap_or_default()
285 }
286
287 fn deactivate_handle(&self, handle: &EntryHandle) -> bool {
288 let mut slot = handle.lock().expect("warming buffer entry mutex poisoned");
289 if slot.take().is_some() {
290 self.current_entries.fetch_sub(1, Ordering::Relaxed);
291 true
292 } else {
293 false
294 }
295 }
296
297 async fn cleanup_by_key(&self, handle: &EntryHandle, deps: &[RepoIdent]) {
298 let _ = futures::future::join_all(deps.iter().map(|dep| async move {
299 let Some(mut occupied) = self.by_key.get_async(dep).await else {
300 return;
301 };
302 occupied.get_mut().retain(|h| !Arc::ptr_eq(h, handle));
303 if occupied.get().is_empty() {
304 let _ = occupied.remove_entry();
305 }
306 }))
307 .await;
308 }
309}
310
#[cfg(test)]
mod tests {
    // Sequential behavioural tests for `WarmingBuffer`: parking, observation,
    // eviction/replacement, promote-drain sealing, and the exported counters.
    use super::*;
    use bobbin_types::ids::nsid_static;

    fn d(s: &str) -> Did<DefaultStr> {
        Did::new_owned(s).unwrap()
    }

    fn r(s: &str) -> Rkey<DefaultStr> {
        Rkey::new_owned(s).unwrap()
    }

    fn at(s: &str) -> AtUri<DefaultStr> {
        AtUri::new_owned(s).unwrap()
    }

    /// Builds a star-record upsert whose `source` is the given AT-URI.
    fn make_upsert(source: &str) -> ParkedUpsert {
        let source_uri = at(source);
        let star_json = br#"{"$type":"sh.tangled.feed.star","createdAt":"2026-05-01T00:00:00Z","subject":{"$type":"sh.tangled.feed.star#repo","did":"did:plc:abalone"}}"#;
        let parsed = Record::from_json_bytes(&nsid_static("sh.tangled.feed.star"), star_json)
            .expect("star fixture parses");
        ParkedUpsert {
            cursor: HydrantCursor::new(1),
            source: source_uri,
            nsid: nsid_static("sh.tangled.feed.star"),
            parsed,
            bytes: Bytes::from_static(star_json),
            cid: None,
            edges: Vec::new(),
        }
    }

    #[tokio::test]
    async fn park_then_observe_drains_to_zero() {
        let hasher = RuntimeHasher::default();
        let buf = WarmingBuffer::new(hasher);
        let dep = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz"));
        let upsert = make_upsert("at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz");
        buf.park(upsert, vec![dep.clone()]).await;
        let drained = buf.take_observed(&dep.owner, &dep.rkey).await;
        assert_eq!(drained.len(), 1);
        let s = buf.snapshot();
        assert_eq!(s.enqueued_total, 1);
        assert_eq!(s.drained_observe_total, 1);
        assert_eq!(s.distinct_keys_seen, 1);
        assert_eq!(s.current_entries, 0);
        assert_eq!(s.max_concurrent_entries, 1);
        assert_eq!(s.dep_enqueued_total, 1);
        assert_eq!(s.dep_drained_observe_total, 1);
    }

    #[tokio::test]
    async fn distinct_keys_tracked_separately() {
        let hasher = RuntimeHasher::default();
        let buf = WarmingBuffer::new(hasher);
        let dep_a = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz"));
        let dep_b = RepoIdent::new(d("did:plc:olaren"), r("abcabcabcabd1"));
        buf.park(
            make_upsert("at://did:plc:starer1/sh.tangled.feed.star/aaaaaaaaaaaaa"),
            vec![dep_a.clone()],
        )
        .await;
        buf.park(
            make_upsert("at://did:plc:starer2/sh.tangled.feed.star/bbbbbbbbbbbbb"),
            vec![dep_b.clone()],
        )
        .await;
        let s = buf.snapshot();
        assert_eq!(s.enqueued_total, 2);
        assert_eq!(s.distinct_keys_seen, 2);
        assert_eq!(s.current_entries, 2);
        assert_eq!(s.max_concurrent_entries, 2);
        assert_eq!(s.dep_enqueued_total, 2);
    }

    #[tokio::test]
    async fn observe_unrelated_key_is_noop() {
        let hasher = RuntimeHasher::default();
        let buf = WarmingBuffer::new(hasher);
        buf.park(
            make_upsert("at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz"),
            vec![RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz"))],
        )
        .await;
        let drained = buf
            .take_observed(&d("did:plc:olaren"), &r("nopenopenopep"))
            .await;
        assert!(drained.is_empty());
        let s = buf.snapshot();
        assert_eq!(s.current_entries, 1);
        assert_eq!(s.drained_observe_total, 0);
        assert_eq!(s.dep_drained_observe_total, 0);
    }

    #[tokio::test]
    async fn multi_dep_entry_drains_only_after_every_dep_is_observed() {
        let buf = WarmingBuffer::new(RuntimeHasher::default());
        let dep_a = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz"));
        let dep_b = RepoIdent::new(d("did:plc:olaren"), r("abcabcabcabd1"));
        buf.park(
            make_upsert("at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz"),
            vec![dep_a.clone(), dep_b.clone()],
        )
        .await;

        let first = buf.take_observed(&dep_a.owner, &dep_a.rkey).await;
        assert!(first.is_empty(), "entry must keep waiting on its remaining dep");
        let s = buf.snapshot();
        assert_eq!(s.current_entries, 1);
        assert_eq!(s.dep_drained_observe_total, 1);
        assert_eq!(s.drained_observe_total, 0);

        let second = buf.take_observed(&dep_b.owner, &dep_b.rkey).await;
        assert_eq!(second.len(), 1);
        let s = buf.snapshot();
        assert_eq!(s.current_entries, 0);
        assert_eq!(s.dep_enqueued_total, 2);
        assert_eq!(s.dep_drained_observe_total, 2);
        assert_eq!(s.drained_observe_total, 1);
    }

    #[tokio::test]
    async fn evict_on_replacement_clears_prior_entry() {
        let hasher = RuntimeHasher::default();
        let buf = WarmingBuffer::new(hasher);
        let dep = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz"));
        let source = "at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz";
        buf.park(make_upsert(source), vec![dep.clone()]).await;
        let evicted = buf.evict_source(&at(source)).await;
        assert!(evicted);
        let drained = buf.take_observed(&dep.owner, &dep.rkey).await;
        assert!(
            drained.is_empty(),
            "evicted entry must not surface on observe",
        );
        let s = buf.snapshot();
        assert_eq!(s.current_entries, 0);
        assert_eq!(s.evicted_total, 1);
    }

    #[tokio::test]
    async fn evict_clears_by_key_so_observe_walks_no_handles() {
        let hasher = RuntimeHasher::default();
        let buf = WarmingBuffer::new(hasher);
        let dep_a = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz"));
        let dep_b = RepoIdent::new(d("did:plc:olaren"), r("abcabcabcabd1"));
        let source = "at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz";
        buf.park(make_upsert(source), vec![dep_a.clone(), dep_b.clone()])
            .await;
        assert!(buf.evict_source(&at(source)).await);
        let drained_a = buf.take_observed(&dep_a.owner, &dep_a.rkey).await;
        let drained_b = buf.take_observed(&dep_b.owner, &dep_b.rkey).await;
        assert!(drained_a.is_empty());
        assert!(drained_b.is_empty());
        let s = buf.snapshot();
        assert_eq!(s.dep_drained_observe_total, 0);
    }

    #[tokio::test]
    async fn evict_unknown_or_already_drained_source_returns_false() {
        let buf = WarmingBuffer::new(RuntimeHasher::default());
        let dep = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz"));
        let source = "at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz";
        assert!(!buf.evict_source(&at(source)).await, "nothing parked yet");
        buf.park(make_upsert(source), vec![dep.clone()]).await;
        assert_eq!(buf.take_observed(&dep.owner, &dep.rkey).await.len(), 1);
        assert!(
            !buf.evict_source(&at(source)).await,
            "a drained entry is no longer evictable",
        );
        assert_eq!(buf.snapshot().evicted_total, 0);
    }

    #[tokio::test]
    async fn replacement_park_under_same_source_increments_evicted_total() {
        let hasher = RuntimeHasher::default();
        let buf = WarmingBuffer::new(hasher);
        let dep = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz"));
        let source = "at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz";
        buf.park(make_upsert(source), vec![dep.clone()]).await;
        buf.park(make_upsert(source), vec![dep.clone()]).await;
        let s = buf.snapshot();
        assert_eq!(s.enqueued_total, 2);
        assert_eq!(s.evicted_total, 1);
        assert_eq!(s.current_entries, 1);
    }

    #[tokio::test]
    async fn replacement_park_does_not_double_drain_on_observe() {
        let hasher = RuntimeHasher::default();
        let buf = WarmingBuffer::new(hasher);
        let dep = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz"));
        let source = "at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz";
        buf.park(make_upsert(source), vec![dep.clone()]).await;
        buf.park(make_upsert(source), vec![dep.clone()]).await;
        let drained = buf.take_observed(&dep.owner, &dep.rkey).await;
        assert_eq!(
            drained.len(),
            1,
            "evicted handle must not surface alongside the live one",
        );
        let s = buf.snapshot();
        assert_eq!(s.drained_observe_total, 1);
        assert_eq!(s.current_entries, 0);
    }

    #[tokio::test]
    async fn drain_for_promote_returns_residual() {
        let hasher = RuntimeHasher::default();
        let buf = WarmingBuffer::new(hasher);
        let dep = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz"));
        buf.park(
            make_upsert("at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz"),
            vec![dep],
        )
        .await;
        let residual = buf.drain_for_promote().await;
        assert_eq!(residual.len(), 1);
        let s = buf.snapshot();
        assert_eq!(s.drained_promote_total, 1);
        assert_eq!(s.current_entries, 0);
    }

    #[tokio::test]
    async fn drain_for_promote_returns_only_unobserved_deps() {
        let buf = WarmingBuffer::new(RuntimeHasher::default());
        let dep_a = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz"));
        let dep_b = RepoIdent::new(d("did:plc:olaren"), r("abcabcabcabd1"));
        buf.park(
            make_upsert("at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz"),
            vec![dep_a.clone(), dep_b.clone()],
        )
        .await;
        assert!(buf.take_observed(&dep_a.owner, &dep_a.rkey).await.is_empty());
        let residual = buf.drain_for_promote().await;
        assert_eq!(residual.len(), 1);
        assert!(
            residual[0].1 == vec![dep_b.clone()],
            "residual deps must exclude the already-observed key",
        );
    }

    #[tokio::test]
    async fn second_drain_for_promote_is_empty() {
        let buf = WarmingBuffer::new(RuntimeHasher::default());
        let dep = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz"));
        buf.park(
            make_upsert("at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz"),
            vec![dep],
        )
        .await;
        assert_eq!(buf.drain_for_promote().await.len(), 1);
        assert!(buf.drain_for_promote().await.is_empty());
        assert!(buf.is_sealed());
        let s = buf.snapshot();
        assert_eq!(s.drained_promote_total, 1);
        assert_eq!(s.current_entries, 0);
    }

    #[tokio::test]
    async fn try_park_after_drain_for_promote_is_rejected() {
        let hasher = RuntimeHasher::default();
        let buf = WarmingBuffer::new(hasher);
        let dep = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz"));
        let _ = buf.drain_for_promote().await;
        let upsert = make_upsert("at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz");
        let res = buf.try_park(upsert, vec![dep]).await;
        assert!(res.is_err(), "post-seal try_park must reject");
        let s = buf.snapshot();
        assert_eq!(s.rejected_after_seal, 1);
        assert_eq!(s.enqueued_total, 0);
        assert_eq!(s.current_entries, 0);
    }

    #[tokio::test]
    async fn double_observe_after_drain_does_not_double_count() {
        let hasher = RuntimeHasher::default();
        let buf = WarmingBuffer::new(hasher);
        let dep = RepoIdent::new(d("did:plc:nel"), r("abcabcabcabcz"));
        buf.park(
            make_upsert("at://did:plc:starer/sh.tangled.feed.star/zzzzzzzzzzzzz"),
            vec![dep.clone()],
        )
        .await;
        let first = buf.take_observed(&dep.owner, &dep.rkey).await;
        let second = buf.take_observed(&dep.owner, &dep.rkey).await;
        assert_eq!(first.len(), 1);
        assert!(second.is_empty());
        let s = buf.snapshot();
        assert_eq!(s.drained_observe_total, 1);
        assert_eq!(s.dep_drained_observe_total, 1);
    }
}