Monorepo for Tangled
tangled.org
1use std::sync::{Arc, Mutex};
2use std::time::Duration;
3
4use bobbin_runtime::Clock;
5use thiserror::Error;
6use tokio::time::Instant;
7
/// Failure count at which a closed [`Breaker`] trips open.
///
/// The inner value is private and [`FailureThreshold::new`] rejects `0`, so a
/// threshold obtained through the constructor is always at least 1.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FailureThreshold(u32);
10
11#[derive(Clone, Copy, Debug, Error)]
12#[error("failure threshold must be at least 1")]
13pub struct ThresholdError;
14
15impl FailureThreshold {
16 pub const fn new(n: u32) -> Result<Self, ThresholdError> {
17 match n {
18 0 => Err(ThresholdError),
19 other => Ok(Self(other)),
20 }
21 }
22
23 pub const fn get(self) -> u32 {
24 self.0
25 }
26}
27
/// Internal state machine of a [`Breaker`].
#[derive(Debug, Clone, Copy)]
enum BreakerState {
    /// Calls are admitted; `failures` counts failures recorded since the last
    /// recorded success.
    Closed { failures: u32 },
    /// Calls are rejected until `until`; the first acquire at or after that
    /// instant is admitted as a trial.
    Open { until: Instant },
    /// One trial call has been admitted and its outcome is pending; further
    /// acquires are rejected.
    HalfOpen,
}
34
35#[derive(Clone, Copy, Debug, Error)]
36#[error("circuit breaker open")]
37pub struct CircuitOpen;
38
39pub struct Breaker {
40 state: Mutex<BreakerState>,
41 threshold: FailureThreshold,
42 cooldown: Duration,
43 clock: Arc<dyn Clock>,
44}
45
46impl std::fmt::Debug for Breaker {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 f.debug_struct("Breaker")
49 .field("state", &self.state)
50 .field("threshold", &self.threshold)
51 .field("cooldown", &self.cooldown)
52 .finish_non_exhaustive()
53 }
54}
55
56impl Breaker {
57 pub fn new(threshold: FailureThreshold, cooldown: Duration, clock: Arc<dyn Clock>) -> Self {
58 Self {
59 state: Mutex::new(BreakerState::Closed { failures: 0 }),
60 threshold,
61 cooldown,
62 clock,
63 }
64 }
65
66 pub fn try_acquire(self: &Arc<Self>) -> Result<BreakerPermit, CircuitOpen> {
67 self.try_acquire_at(self.clock.now_instant())
68 }
69
70 fn try_acquire_at(self: &Arc<Self>, now: Instant) -> Result<BreakerPermit, CircuitOpen> {
71 let mut state = self.state.lock().expect("breaker mutex poisoned");
72 match *state {
73 BreakerState::Closed { .. } => Ok(BreakerPermit::new(Arc::clone(self))),
74 BreakerState::Open { until } if now >= until => {
75 *state = BreakerState::HalfOpen;
76 Ok(BreakerPermit::new(Arc::clone(self)))
77 }
78 BreakerState::Open { .. } | BreakerState::HalfOpen => Err(CircuitOpen),
79 }
80 }
81
82 pub fn record_success(&self) {
83 let mut state = self.state.lock().expect("breaker mutex poisoned");
84 *state = BreakerState::Closed { failures: 0 };
85 }
86
87 pub fn record_failure(&self) {
88 self.record_failure_at(self.clock.now_instant());
89 }
90
91 fn record_failure_at(&self, now: Instant) {
92 let mut state = self.state.lock().expect("breaker mutex poisoned");
93 let next = match *state {
94 BreakerState::Closed { failures } => {
95 let bumped = failures.saturating_add(1);
96 if bumped >= self.threshold.get() {
97 BreakerState::Open {
98 until: now + self.cooldown,
99 }
100 } else {
101 BreakerState::Closed { failures: bumped }
102 }
103 }
104 BreakerState::HalfOpen => BreakerState::Open {
105 until: now + self.cooldown,
106 },
107 BreakerState::Open { .. } => *state,
108 };
109 *state = next;
110 }
111}
112
113#[must_use = "permit must outlive the upstream call so failures can be recorded"]
114#[derive(Debug)]
115pub struct BreakerPermit {
116 breaker: Arc<Breaker>,
117 resolved: bool,
118}
119
120impl BreakerPermit {
121 fn new(breaker: Arc<Breaker>) -> Self {
122 Self {
123 breaker,
124 resolved: false,
125 }
126 }
127
128 pub fn record_success(mut self) {
129 self.resolved = true;
130 self.breaker.record_success();
131 }
132
133 pub fn record_failure(mut self) {
134 self.resolved = true;
135 self.breaker.record_failure();
136 }
137}
138
139impl Drop for BreakerPermit {
140 fn drop(&mut self) {
141 if !self.resolved {
142 self.breaker.record_success();
143 }
144 }
145}
146
#[cfg(test)]
impl BreakerPermit {
    /// Test-only variant of `record_failure` that reports the failure at the
    /// caller-supplied instant `now` instead of reading the breaker's clock.
    fn record_failure_at(mut self, now: Instant) {
        // Flag first so the implicit drop at the end does not add a success.
        self.resolved = true;
        let breaker = &self.breaker;
        breaker.record_failure_at(now);
    }
}
154
#[cfg(test)]
mod tests {
    //! State-machine tests for the breaker. Tests that depend on the cooldown
    //! pass explicit instants to the `_at` helpers instead of sleeping.

    use super::*;
    use bobbin_runtime::SystemClock;

    /// Builds a shared breaker with the given threshold and cooldown (in ms),
    /// backed by the system clock.
    fn breaker(threshold: u32, cooldown_ms: u64) -> Arc<Breaker> {
        Arc::new(Breaker::new(
            FailureThreshold::new(threshold).unwrap(),
            Duration::from_millis(cooldown_ms),
            Arc::new(SystemClock::new()),
        ))
    }

    #[test]
    fn closed_breaker_admits_all() {
        let b = breaker(3, 100);
        b.try_acquire().unwrap().record_success();
        b.try_acquire().unwrap().record_success();
        b.try_acquire().unwrap().record_success();
    }

    #[test]
    fn opens_at_threshold() {
        let b = breaker(2, 1_000);
        b.record_failure();
        b.record_failure();
        assert!(b.try_acquire().is_err(), "must be open after 2 failures");
    }

    #[test]
    fn success_resets_failure_count() {
        let b = breaker(2, 1_000);
        b.record_failure();
        b.record_success();
        b.record_failure();
        b.try_acquire()
            .expect("successful run between failures must reset count")
            .record_success();
    }

    #[test]
    fn cooldown_admits_one_trial_only() {
        let b = breaker(1, 50);
        let t0 = Instant::now();
        let after = t0 + Duration::from_millis(60);
        b.record_failure_at(t0);
        assert!(b.try_acquire_at(t0).is_err());
        let trial = b
            .try_acquire_at(after)
            .expect("trial admitted after cooldown");
        assert!(
            b.try_acquire_at(after).is_err(),
            "second concurrent half-open call must be rejected",
        );
        trial.record_success();
    }

    #[test]
    fn half_open_failure_reopens() {
        let b = breaker(1, 50);
        let t0 = Instant::now();
        let after = t0 + Duration::from_millis(60);
        b.record_failure_at(t0);
        b.try_acquire_at(after)
            .expect("trial admitted")
            .record_failure_at(after);
        assert!(
            b.try_acquire_at(after).is_err(),
            "half-open failure must reopen breaker",
        );
    }

    #[test]
    fn half_open_success_closes() {
        let b = breaker(1, 50);
        let t0 = Instant::now();
        let after = t0 + Duration::from_millis(60);
        b.record_failure_at(t0);
        b.try_acquire_at(after)
            .expect("trial admitted")
            .record_success();
        b.try_acquire_at(after).expect("closed").record_success();
        b.try_acquire_at(after).expect("closed").record_success();
    }

    #[test]
    fn open_failure_does_not_extend_cooldown_indefinitely() {
        let b = breaker(1, 50);
        b.record_failure();
        // `mid` is taken after the breaker opened, so `mid + 60ms` is past the
        // original 50ms deadline but would fall short of an extended one.
        let mid = Instant::now();
        b.record_failure();
        b.try_acquire_at(mid + Duration::from_millis(60))
            .expect("second failure while open must not push cooldown out")
            .record_success();
    }

    #[test]
    fn dropped_permit_counts_as_success() {
        let b = breaker(2, 1_000);
        b.record_failure();
        drop(b.try_acquire().expect("admitted"));
        // With a threshold of 2, one failure alone never opens the breaker, so
        // the final acquire would succeed whether or not the drop reset the
        // count. A second failure stays below the threshold only if it did.
        b.record_failure();
        b.try_acquire()
            .expect("dropped permit must reset failure count")
            .record_success();
    }

    #[test]
    fn dropped_half_open_permit_closes_breaker() {
        let b = breaker(1, 50);
        let t0 = Instant::now();
        let after = t0 + Duration::from_millis(60);
        b.record_failure_at(t0);
        drop(b.try_acquire_at(after).expect("trial admitted"));
        b.try_acquire_at(after)
            .expect("dropped half-open permit must close breaker")
            .record_success();
    }
}