Monorepo for Tangled
tangled.org
1use tokio::sync::watch;
2
3#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
4pub struct HydrantCursor(u64);
5
6impl HydrantCursor {
7 pub const fn new(id: u64) -> Self {
8 Self(id)
9 }
10
11 pub const fn raw(self) -> u64 {
12 self.0
13 }
14}
15
16impl From<u64> for HydrantCursor {
17 fn from(id: u64) -> Self {
18 Self(id)
19 }
20}
21
22#[derive(Clone, Copy, Debug, Eq, PartialEq)]
23pub enum Coverage {
24 Warming {
25 events_processed: u64,
26 last_cursor: HydrantCursor,
27 },
28 Ready {
29 events_processed: u64,
30 last_cursor: HydrantCursor,
31 },
32}
33
34impl Default for Coverage {
35 fn default() -> Self {
36 Self::Warming {
37 events_processed: 0,
38 last_cursor: HydrantCursor::default(),
39 }
40 }
41}
42
43#[derive(Clone, Copy, Debug, Eq, PartialEq)]
44pub struct PromotionSignal {
45 pub rev_micros: Option<u64>,
46 pub now_micros: u64,
47 pub skew_micros: u64,
48}
49
50impl PromotionSignal {
51 pub fn caught_up(self) -> bool {
52 let Some(rev) = self.rev_micros else {
53 return false;
54 };
55 self.now_micros.abs_diff(rev) <= self.skew_micros
56 }
57}
58
59impl Coverage {
60 pub const fn is_ready(self) -> bool {
61 matches!(self, Self::Ready { .. })
62 }
63
64 pub const fn events_processed(self) -> u64 {
65 match self {
66 Self::Warming {
67 events_processed, ..
68 }
69 | Self::Ready {
70 events_processed, ..
71 } => events_processed,
72 }
73 }
74
75 pub const fn last_cursor(self) -> HydrantCursor {
76 match self {
77 Self::Warming { last_cursor, .. } | Self::Ready { last_cursor, .. } => last_cursor,
78 }
79 }
80
81 pub fn advance(self, cursor: HydrantCursor) -> Self {
82 debug_assert!(
83 cursor.raw() >= self.last_cursor().raw(),
84 "hydrant cursor regressed: {} -> {}",
85 self.last_cursor().raw(),
86 cursor.raw(),
87 );
88 let processed = self.events_processed().saturating_add(1);
89 match self {
90 Self::Warming { .. } => Self::Warming {
91 events_processed: processed,
92 last_cursor: cursor,
93 },
94 Self::Ready { .. } => Self::Ready {
95 events_processed: processed,
96 last_cursor: cursor,
97 },
98 }
99 }
100
101 pub fn maybe_promote(self, signal: PromotionSignal) -> Self {
102 match self {
103 Self::Ready { .. } => self,
104 Self::Warming {
105 events_processed,
106 last_cursor,
107 } if signal.caught_up() => Self::Ready {
108 events_processed,
109 last_cursor,
110 },
111 warming => warming,
112 }
113 }
114
115 pub const fn force_ready(self) -> Self {
116 match self {
117 Self::Ready { .. } => self,
118 Self::Warming {
119 events_processed,
120 last_cursor,
121 } => Self::Ready {
122 events_processed,
123 last_cursor,
124 },
125 }
126 }
127}
128
129#[derive(Debug)]
130pub struct CoverageWatch {
131 tx: watch::Sender<Coverage>,
132}
133
134impl Default for CoverageWatch {
135 fn default() -> Self {
136 Self::new()
137 }
138}
139
140impl CoverageWatch {
141 pub fn new() -> Self {
142 let (tx, _) = watch::channel(Coverage::default());
143 Self { tx }
144 }
145
146 pub fn snapshot(&self) -> Coverage {
147 *self.tx.borrow()
148 }
149
150 pub fn subscribe(&self) -> watch::Receiver<Coverage> {
151 self.tx.subscribe()
152 }
153
154 pub fn update<F>(&self, transform: F)
155 where
156 F: FnOnce(Coverage) -> Coverage,
157 {
158 self.tx.send_if_modified(|c| {
159 let next = transform(*c);
160 if *c == next {
161 false
162 } else {
163 *c = next;
164 true
165 }
166 });
167 }
168}
169
170#[cfg(test)]
171mod tests {
172 use super::*;
173
174 const SKEW: u64 = 60_000_000;
175
176 fn signal(rev: u64, now: u64) -> PromotionSignal {
177 PromotionSignal {
178 rev_micros: Some(rev),
179 now_micros: now,
180 skew_micros: SKEW,
181 }
182 }
183
184 #[test]
185 fn defaults_to_warming_at_zero() {
186 let c = Coverage::default();
187 assert!(!c.is_ready());
188 assert_eq!(c.events_processed(), 0);
189 assert_eq!(c.last_cursor(), HydrantCursor::default());
190 }
191
192 #[test]
193 fn advance_increments_and_tracks_cursor() {
194 let c = Coverage::default()
195 .advance(HydrantCursor::new(7))
196 .advance(HydrantCursor::new(9));
197 assert_eq!(c.events_processed(), 2);
198 assert_eq!(c.last_cursor(), HydrantCursor::new(9));
199 assert!(!c.is_ready());
200 }
201
202 #[test]
203 fn promotion_requires_recent_rev() {
204 let now = 1_000_000_000;
205 let recent = now - SKEW / 2;
206 let stale = now - SKEW * 10;
207
208 let c = Coverage::default().advance(HydrantCursor::new(1));
209 assert!(!c.maybe_promote(signal(stale, now)).is_ready());
210 assert!(c.maybe_promote(signal(recent, now)).is_ready());
211 }
212
213 #[test]
214 fn promotion_preserves_counters() {
215 let now = 1_000_000_000;
216 let c = Coverage::default()
217 .advance(HydrantCursor::new(3))
218 .maybe_promote(signal(now, now))
219 .advance(HydrantCursor::new(4));
220 assert!(c.is_ready());
221 assert_eq!(c.events_processed(), 2);
222 assert_eq!(c.last_cursor(), HydrantCursor::new(4));
223 }
224
225 #[test]
226 fn ready_is_sticky() {
227 let now = 1_000_000_000;
228 let stale = now - SKEW * 10;
229 let c = Coverage::default()
230 .advance(HydrantCursor::new(1))
231 .maybe_promote(signal(now, now))
232 .advance(HydrantCursor::new(2))
233 .maybe_promote(signal(stale, now));
234 assert!(c.is_ready());
235 }
236
237 #[test]
238 fn future_rev_within_skew_promotes() {
239 let now = 1_000_000_000;
240 let near_future = now + SKEW / 2;
241 let c = Coverage::default()
242 .advance(HydrantCursor::new(1))
243 .maybe_promote(signal(near_future, now));
244 assert!(c.is_ready());
245 }
246
247 #[test]
248 fn future_rev_beyond_skew_does_not_promote() {
249 let now = 1_000_000_000;
250 let far_future = now + SKEW * 10;
251 let c = Coverage::default()
252 .advance(HydrantCursor::new(1))
253 .maybe_promote(signal(far_future, now));
254 assert!(!c.is_ready());
255 }
256
257 #[test]
258 fn missing_rev_does_not_promote() {
259 let c = Coverage::default().advance(HydrantCursor::new(1));
260 let s = PromotionSignal {
261 rev_micros: None,
262 now_micros: 1_000_000_000,
263 skew_micros: SKEW,
264 };
265 assert!(!c.maybe_promote(s).is_ready());
266 }
267}