Monorepo for Tangled
tangled.org
1use std::sync::atomic::{AtomicU64, Ordering};
2
3use bobbin_runtime::RuntimeHasher;
4use bobbin_types::ids::RepoIdent;
5use jacquard_common::DefaultStr;
6use jacquard_common::types::did::Did;
7use jacquard_common::types::recordkey::Rkey;
8use scc::HashMap as SccMap;
9
10pub struct WarmingShadowBuffer {
11 pending: SccMap<RepoIdent, u64, RuntimeHasher>,
12 enqueued_total: AtomicU64,
13 drained_via_observe_total: AtomicU64,
14 max_concurrent: AtomicU64,
15 current_concurrent: AtomicU64,
16 distinct_keys_seen: AtomicU64,
17}
18
19#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
20pub struct WarmingShadowSnapshot {
21 pub enqueued_total: u64,
22 pub drained_via_observe_total: u64,
23 pub max_concurrent: u64,
24 pub residual: u64,
25 pub distinct_keys_seen: u64,
26}
27
28impl WarmingShadowBuffer {
29 pub fn new(hasher: RuntimeHasher) -> Self {
30 Self {
31 pending: SccMap::with_hasher(hasher),
32 enqueued_total: AtomicU64::new(0),
33 drained_via_observe_total: AtomicU64::new(0),
34 max_concurrent: AtomicU64::new(0),
35 current_concurrent: AtomicU64::new(0),
36 distinct_keys_seen: AtomicU64::new(0),
37 }
38 }
39
40 pub async fn note_unresolved(&self, owner: Did<DefaultStr>, rkey: Rkey<DefaultStr>) {
41 let key = RepoIdent::new(owner, rkey);
42 let mut entry = self.pending.entry_async(key).await.or_insert_with(|| {
43 self.distinct_keys_seen.fetch_add(1, Ordering::Relaxed);
44 0
45 });
46 *entry.get_mut() += 1;
47 self.enqueued_total.fetch_add(1, Ordering::Relaxed);
48 let cur = self.current_concurrent.fetch_add(1, Ordering::Relaxed) + 1;
49 self.max_concurrent.fetch_max(cur, Ordering::Relaxed);
50 }
51
52 pub async fn note_observed(&self, owner: &Did<DefaultStr>, rkey: &Rkey<DefaultStr>) {
53 let key = RepoIdent::new(owner.clone(), rkey.clone());
54 let Some(mut entry) = self.pending.get_async(&key).await else {
55 return;
56 };
57 let count = std::mem::replace(entry.get_mut(), 0);
58 if count > 0 {
59 self.drained_via_observe_total
60 .fetch_add(count, Ordering::Relaxed);
61 self.current_concurrent.fetch_sub(count, Ordering::Relaxed);
62 }
63 }
64
65 pub fn snapshot(&self) -> WarmingShadowSnapshot {
66 WarmingShadowSnapshot {
67 enqueued_total: self.enqueued_total.load(Ordering::Relaxed),
68 drained_via_observe_total: self.drained_via_observe_total.load(Ordering::Relaxed),
69 max_concurrent: self.max_concurrent.load(Ordering::Relaxed),
70 residual: self.current_concurrent.load(Ordering::Relaxed),
71 distinct_keys_seen: self.distinct_keys_seen.load(Ordering::Relaxed),
72 }
73 }
74}
75
76#[cfg(test)]
77mod tests {
78 use super::*;
79 use jacquard_common::types::did::Did;
80 use jacquard_common::types::recordkey::Rkey;
81
82 fn d(s: &str) -> Did<DefaultStr> {
83 Did::new_owned(s).unwrap()
84 }
85
86 fn r(s: &str) -> Rkey<DefaultStr> {
87 Rkey::new_owned(s).unwrap()
88 }
89
90 #[tokio::test]
91 async fn enqueue_then_observe_drains_to_zero() {
92 let shadow = WarmingShadowBuffer::new(RuntimeHasher::default());
93 shadow
94 .note_unresolved(d("did:plc:nel"), r("abcabcabcabcz"))
95 .await;
96 shadow
97 .note_unresolved(d("did:plc:nel"), r("abcabcabcabcz"))
98 .await;
99 shadow
100 .note_observed(&d("did:plc:nel"), &r("abcabcabcabcz"))
101 .await;
102 let s = shadow.snapshot();
103 assert_eq!(s.enqueued_total, 2);
104 assert_eq!(s.drained_via_observe_total, 2);
105 assert_eq!(s.max_concurrent, 2);
106 assert_eq!(s.residual, 0);
107 assert_eq!(s.distinct_keys_seen, 1);
108 }
109
110 #[tokio::test]
111 async fn distinct_keys_tracked_separately() {
112 let shadow = WarmingShadowBuffer::new(RuntimeHasher::default());
113 shadow
114 .note_unresolved(d("did:plc:nel"), r("abcabcabcabcz"))
115 .await;
116 shadow
117 .note_unresolved(d("did:plc:olaren"), r("abcabcabcabd1"))
118 .await;
119 let s = shadow.snapshot();
120 assert_eq!(s.enqueued_total, 2);
121 assert_eq!(s.distinct_keys_seen, 2);
122 assert_eq!(s.max_concurrent, 2);
123 assert_eq!(s.residual, 2);
124 }
125
126 #[tokio::test]
127 async fn observe_without_prior_enqueue_is_a_noop() {
128 let shadow = WarmingShadowBuffer::new(RuntimeHasher::default());
129 shadow
130 .note_observed(&d("did:plc:nel"), &r("abcabcabcabcz"))
131 .await;
132 let s = shadow.snapshot();
133 assert_eq!(s.enqueued_total, 0);
134 assert_eq!(s.drained_via_observe_total, 0);
135 assert_eq!(s.distinct_keys_seen, 0);
136 }
137
138 #[tokio::test]
139 async fn second_observe_after_drain_does_not_double_count() {
140 let shadow = WarmingShadowBuffer::new(RuntimeHasher::default());
141 shadow
142 .note_unresolved(d("did:plc:nel"), r("abcabcabcabcz"))
143 .await;
144 shadow
145 .note_observed(&d("did:plc:nel"), &r("abcabcabcabcz"))
146 .await;
147 shadow
148 .note_observed(&d("did:plc:nel"), &r("abcabcabcabcz"))
149 .await;
150 let s = shadow.snapshot();
151 assert_eq!(s.drained_via_observe_total, 1);
152 assert_eq!(s.residual, 0);
153 }
154
155 #[tokio::test]
156 async fn residual_reflects_unobserved_keys() {
157 let shadow = WarmingShadowBuffer::new(RuntimeHasher::default());
158 shadow
159 .note_unresolved(d("did:plc:nel"), r("abcabcabcabcz"))
160 .await;
161 shadow
162 .note_unresolved(d("did:plc:olaren"), r("abcabcabcabd1"))
163 .await;
164 shadow
165 .note_observed(&d("did:plc:nel"), &r("abcabcabcabcz"))
166 .await;
167 let s = shadow.snapshot();
168 assert_eq!(s.enqueued_total, 2);
169 assert_eq!(s.drained_via_observe_total, 1);
170 assert_eq!(s.residual, 1);
171 }
172}