Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1use tokio::sync::watch; 2 3#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] 4pub struct HydrantCursor(u64); 5 6impl HydrantCursor { 7 pub const fn new(id: u64) -> Self { 8 Self(id) 9 } 10 11 pub const fn raw(self) -> u64 { 12 self.0 13 } 14} 15 16impl From<u64> for HydrantCursor { 17 fn from(id: u64) -> Self { 18 Self(id) 19 } 20} 21 22#[derive(Clone, Copy, Debug, Eq, PartialEq)] 23pub enum Coverage { 24 Warming { 25 events_processed: u64, 26 last_cursor: HydrantCursor, 27 }, 28 Ready { 29 events_processed: u64, 30 last_cursor: HydrantCursor, 31 }, 32} 33 34impl Default for Coverage { 35 fn default() -> Self { 36 Self::Warming { 37 events_processed: 0, 38 last_cursor: HydrantCursor::default(), 39 } 40 } 41} 42 43#[derive(Clone, Copy, Debug, Eq, PartialEq)] 44pub struct PromotionSignal { 45 pub rev_micros: Option<u64>, 46 pub now_micros: u64, 47 pub skew_micros: u64, 48} 49 50impl PromotionSignal { 51 pub fn caught_up(self) -> bool { 52 let Some(rev) = self.rev_micros else { 53 return false; 54 }; 55 self.now_micros.abs_diff(rev) <= self.skew_micros 56 } 57} 58 59impl Coverage { 60 pub const fn is_ready(self) -> bool { 61 matches!(self, Self::Ready { .. }) 62 } 63 64 pub const fn events_processed(self) -> u64 { 65 match self { 66 Self::Warming { 67 events_processed, .. 68 } 69 | Self::Ready { 70 events_processed, .. 71 } => events_processed, 72 } 73 } 74 75 pub const fn last_cursor(self) -> HydrantCursor { 76 match self { 77 Self::Warming { last_cursor, .. } | Self::Ready { last_cursor, .. } => last_cursor, 78 } 79 } 80 81 pub fn advance(self, cursor: HydrantCursor) -> Self { 82 debug_assert!( 83 cursor.raw() >= self.last_cursor().raw(), 84 "hydrant cursor regressed: {} -> {}", 85 self.last_cursor().raw(), 86 cursor.raw(), 87 ); 88 let processed = self.events_processed().saturating_add(1); 89 match self { 90 Self::Warming { .. } => Self::Warming { 91 events_processed: processed, 92 last_cursor: cursor, 93 }, 94 Self::Ready { .. } => Self::Ready { 95 events_processed: processed, 96 last_cursor: cursor, 97 }, 98 } 99 } 100 101 pub fn maybe_promote(self, signal: PromotionSignal) -> Self { 102 match self { 103 Self::Ready { .. } => self, 104 Self::Warming { 105 events_processed, 106 last_cursor, 107 } if signal.caught_up() => Self::Ready { 108 events_processed, 109 last_cursor, 110 }, 111 warming => warming, 112 } 113 } 114 115 pub const fn force_ready(self) -> Self { 116 match self { 117 Self::Ready { .. } => self, 118 Self::Warming { 119 events_processed, 120 last_cursor, 121 } => Self::Ready { 122 events_processed, 123 last_cursor, 124 }, 125 } 126 } 127} 128 129#[derive(Debug)] 130pub struct CoverageWatch { 131 tx: watch::Sender<Coverage>, 132} 133 134impl Default for CoverageWatch { 135 fn default() -> Self { 136 Self::new() 137 } 138} 139 140impl CoverageWatch { 141 pub fn new() -> Self { 142 let (tx, _) = watch::channel(Coverage::default()); 143 Self { tx } 144 } 145 146 pub fn snapshot(&self) -> Coverage { 147 *self.tx.borrow() 148 } 149 150 pub fn subscribe(&self) -> watch::Receiver<Coverage> { 151 self.tx.subscribe() 152 } 153 154 pub fn update<F>(&self, transform: F) 155 where 156 F: FnOnce(Coverage) -> Coverage, 157 { 158 self.tx.send_if_modified(|c| { 159 let next = transform(*c); 160 if *c == next { 161 false 162 } else { 163 *c = next; 164 true 165 } 166 }); 167 } 168} 169 170#[cfg(test)] 171mod tests { 172 use super::*; 173 174 const SKEW: u64 = 60_000_000; 175 176 fn signal(rev: u64, now: u64) -> PromotionSignal { 177 PromotionSignal { 178 rev_micros: Some(rev), 179 now_micros: now, 180 skew_micros: SKEW, 181 } 182 } 183 184 #[test] 185 fn defaults_to_warming_at_zero() { 186 let c = Coverage::default(); 187 assert!(!c.is_ready()); 188 assert_eq!(c.events_processed(), 0); 189 assert_eq!(c.last_cursor(), HydrantCursor::default()); 190 } 191 192 #[test] 193 fn advance_increments_and_tracks_cursor() { 194 let c = Coverage::default() 195 .advance(HydrantCursor::new(7)) 196 .advance(HydrantCursor::new(9)); 197 assert_eq!(c.events_processed(), 2); 198 assert_eq!(c.last_cursor(), HydrantCursor::new(9)); 199 assert!(!c.is_ready()); 200 } 201 202 #[test] 203 fn promotion_requires_recent_rev() { 204 let now = 1_000_000_000; 205 let recent = now - SKEW / 2; 206 let stale = now - SKEW * 10; 207 208 let c = Coverage::default().advance(HydrantCursor::new(1)); 209 assert!(!c.maybe_promote(signal(stale, now)).is_ready()); 210 assert!(c.maybe_promote(signal(recent, now)).is_ready()); 211 } 212 213 #[test] 214 fn promotion_preserves_counters() { 215 let now = 1_000_000_000; 216 let c = Coverage::default() 217 .advance(HydrantCursor::new(3)) 218 .maybe_promote(signal(now, now)) 219 .advance(HydrantCursor::new(4)); 220 assert!(c.is_ready()); 221 assert_eq!(c.events_processed(), 2); 222 assert_eq!(c.last_cursor(), HydrantCursor::new(4)); 223 } 224 225 #[test] 226 fn ready_is_sticky() { 227 let now = 1_000_000_000; 228 let stale = now - SKEW * 10; 229 let c = Coverage::default() 230 .advance(HydrantCursor::new(1)) 231 .maybe_promote(signal(now, now)) 232 .advance(HydrantCursor::new(2)) 233 .maybe_promote(signal(stale, now)); 234 assert!(c.is_ready()); 235 } 236 237 #[test] 238 fn future_rev_within_skew_promotes() { 239 let now = 1_000_000_000; 240 let near_future = now + SKEW / 2; 241 let c = Coverage::default() 242 .advance(HydrantCursor::new(1)) 243 .maybe_promote(signal(near_future, now)); 244 assert!(c.is_ready()); 245 } 246 247 #[test] 248 fn future_rev_beyond_skew_does_not_promote() { 249 let now = 1_000_000_000; 250 let far_future = now + SKEW * 10; 251 let c = Coverage::default() 252 .advance(HydrantCursor::new(1)) 253 .maybe_promote(signal(far_future, now)); 254 assert!(!c.is_ready()); 255 } 256 257 #[test] 258 fn missing_rev_does_not_promote() { 259 let c = Coverage::default().advance(HydrantCursor::new(1)); 260 let s = PromotionSignal { 261 rev_micros: None, 262 now_micros: 1_000_000_000, 263 skew_micros: SKEW, 264 }; 265 assert!(!c.maybe_promote(s).is_ready()); 266 } 267}