Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1use std::sync::{Arc, Mutex}; 2use std::time::Duration; 3 4use bobbin_runtime::Clock; 5use thiserror::Error; 6use tokio::time::Instant; 7 8#[derive(Clone, Copy, Debug, Eq, PartialEq)] 9pub struct FailureThreshold(u32); 10 11#[derive(Clone, Copy, Debug, Error)] 12#[error("failure threshold must be at least 1")] 13pub struct ThresholdError; 14 15impl FailureThreshold { 16 pub const fn new(n: u32) -> Result<Self, ThresholdError> { 17 match n { 18 0 => Err(ThresholdError), 19 other => Ok(Self(other)), 20 } 21 } 22 23 pub const fn get(self) -> u32 { 24 self.0 25 } 26} 27 28#[derive(Clone, Copy, Debug)] 29enum BreakerState { 30 Closed { failures: u32 }, 31 Open { until: Instant }, 32 HalfOpen, 33} 34 35#[derive(Clone, Copy, Debug, Error)] 36#[error("circuit breaker open")] 37pub struct CircuitOpen; 38 39pub struct Breaker { 40 state: Mutex<BreakerState>, 41 threshold: FailureThreshold, 42 cooldown: Duration, 43 clock: Arc<dyn Clock>, 44} 45 46impl std::fmt::Debug for Breaker { 47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 48 f.debug_struct("Breaker") 49 .field("state", &self.state) 50 .field("threshold", &self.threshold) 51 .field("cooldown", &self.cooldown) 52 .finish_non_exhaustive() 53 } 54} 55 56impl Breaker { 57 pub fn new(threshold: FailureThreshold, cooldown: Duration, clock: Arc<dyn Clock>) -> Self { 58 Self { 59 state: Mutex::new(BreakerState::Closed { failures: 0 }), 60 threshold, 61 cooldown, 62 clock, 63 } 64 } 65 66 pub fn try_acquire(self: &Arc<Self>) -> Result<BreakerPermit, CircuitOpen> { 67 self.try_acquire_at(self.clock.now_instant()) 68 } 69 70 fn try_acquire_at(self: &Arc<Self>, now: Instant) -> Result<BreakerPermit, CircuitOpen> { 71 let mut state = self.state.lock().expect("breaker mutex poisoned"); 72 match *state { 73 BreakerState::Closed { .. } => Ok(BreakerPermit::new(Arc::clone(self))), 74 BreakerState::Open { until } if now >= until => { 75 *state = BreakerState::HalfOpen; 76 Ok(BreakerPermit::new(Arc::clone(self))) 77 } 78 BreakerState::Open { .. } | BreakerState::HalfOpen => Err(CircuitOpen), 79 } 80 } 81 82 pub fn record_success(&self) { 83 let mut state = self.state.lock().expect("breaker mutex poisoned"); 84 *state = BreakerState::Closed { failures: 0 }; 85 } 86 87 pub fn record_failure(&self) { 88 self.record_failure_at(self.clock.now_instant()); 89 } 90 91 fn record_failure_at(&self, now: Instant) { 92 let mut state = self.state.lock().expect("breaker mutex poisoned"); 93 let next = match *state { 94 BreakerState::Closed { failures } => { 95 let bumped = failures.saturating_add(1); 96 if bumped >= self.threshold.get() { 97 BreakerState::Open { 98 until: now + self.cooldown, 99 } 100 } else { 101 BreakerState::Closed { failures: bumped } 102 } 103 } 104 BreakerState::HalfOpen => BreakerState::Open { 105 until: now + self.cooldown, 106 }, 107 BreakerState::Open { .. } => *state, 108 }; 109 *state = next; 110 } 111} 112 113#[must_use = "permit must outlive the upstream call so failures can be recorded"] 114#[derive(Debug)] 115pub struct BreakerPermit { 116 breaker: Arc<Breaker>, 117 resolved: bool, 118} 119 120impl BreakerPermit { 121 fn new(breaker: Arc<Breaker>) -> Self { 122 Self { 123 breaker, 124 resolved: false, 125 } 126 } 127 128 pub fn record_success(mut self) { 129 self.resolved = true; 130 self.breaker.record_success(); 131 } 132 133 pub fn record_failure(mut self) { 134 self.resolved = true; 135 self.breaker.record_failure(); 136 } 137} 138 139impl Drop for BreakerPermit { 140 fn drop(&mut self) { 141 if !self.resolved { 142 self.breaker.record_success(); 143 } 144 } 145} 146 147#[cfg(test)] 148impl BreakerPermit { 149 fn record_failure_at(mut self, now: Instant) { 150 self.resolved = true; 151 self.breaker.record_failure_at(now); 152 } 153} 154 155#[cfg(test)] 156mod tests { 157 use super::*; 158 use bobbin_runtime::SystemClock; 159 160 fn breaker(threshold: u32, cooldown_ms: u64) -> Arc<Breaker> { 161 Arc::new(Breaker::new( 162 FailureThreshold::new(threshold).unwrap(), 163 Duration::from_millis(cooldown_ms), 164 Arc::new(SystemClock::new()), 165 )) 166 } 167 168 #[test] 169 fn closed_breaker_admits_all() { 170 let b = breaker(3, 100); 171 b.try_acquire().unwrap().record_success(); 172 b.try_acquire().unwrap().record_success(); 173 b.try_acquire().unwrap().record_success(); 174 } 175 176 #[test] 177 fn opens_at_threshold() { 178 let b = breaker(2, 1_000); 179 b.record_failure(); 180 b.record_failure(); 181 assert!(b.try_acquire().is_err(), "must be open after 2 failures"); 182 } 183 184 #[test] 185 fn success_resets_failure_count() { 186 let b = breaker(2, 1_000); 187 b.record_failure(); 188 b.record_success(); 189 b.record_failure(); 190 b.try_acquire() 191 .expect("successful run between failures must reset count") 192 .record_success(); 193 } 194 195 #[test] 196 fn cooldown_admits_one_trial_only() { 197 let b = breaker(1, 50); 198 let t0 = Instant::now(); 199 let after = t0 + Duration::from_millis(60); 200 b.record_failure_at(t0); 201 assert!(b.try_acquire_at(t0).is_err()); 202 let trial = b 203 .try_acquire_at(after) 204 .expect("trial admitted after cooldown"); 205 assert!( 206 b.try_acquire_at(after).is_err(), 207 "second concurrent half-open call must be rejected", 208 ); 209 trial.record_success(); 210 } 211 212 #[test] 213 fn half_open_failure_reopens() { 214 let b = breaker(1, 50); 215 let t0 = Instant::now(); 216 let after = t0 + Duration::from_millis(60); 217 b.record_failure_at(t0); 218 b.try_acquire_at(after) 219 .expect("trial admitted") 220 .record_failure_at(after); 221 assert!( 222 b.try_acquire_at(after).is_err(), 223 "half-open failure must reopen breaker", 224 ); 225 } 226 227 #[test] 228 fn half_open_success_closes() { 229 let b = breaker(1, 50); 230 let t0 = Instant::now(); 231 let after = t0 + Duration::from_millis(60); 232 b.record_failure_at(t0); 233 b.try_acquire_at(after) 234 .expect("trial admitted") 235 .record_success(); 236 b.try_acquire_at(after).expect("closed").record_success(); 237 b.try_acquire_at(after).expect("closed").record_success(); 238 } 239 240 #[test] 241 fn open_failure_does_not_extend_cooldown_indefinitely() { 242 let b = breaker(1, 50); 243 b.record_failure(); 244 let mid = Instant::now(); 245 b.record_failure(); 246 b.try_acquire_at(mid + Duration::from_millis(60)) 247 .expect("second failure while open must not push cooldown out") 248 .record_success(); 249 } 250 251 #[test] 252 fn dropped_permit_counts_as_success() { 253 let b = breaker(2, 1_000); 254 b.record_failure(); 255 drop(b.try_acquire().expect("admitted")); 256 b.try_acquire() 257 .expect("dropped permit must reset failure count") 258 .record_success(); 259 } 260 261 #[test] 262 fn dropped_half_open_permit_closes_breaker() { 263 let b = breaker(1, 50); 264 let t0 = Instant::now(); 265 let after = t0 + Duration::from_millis(60); 266 b.record_failure_at(t0); 267 drop(b.try_acquire_at(after).expect("trial admitted")); 268 b.try_acquire_at(after) 269 .expect("dropped half-open permit must close breaker") 270 .record_success(); 271 } 272}