Monorepo for Tangled
tangled.org
1use std::sync::Arc;
2use std::sync::atomic::{AtomicUsize, Ordering};
3
4use bobbin_runtime::MemoryBudget;
5
6use crate::XrpcError;
7
/// Per-request byte estimate.
///
/// [`MaxInFlight::from_budget`] divides the usable part of a memory budget by
/// this value to obtain a slot count.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct PerRequestAnonBytes(u64);

impl PerRequestAnonBytes {
    /// Wraps a raw byte count.
    pub const fn new(bytes: u64) -> Self {
        PerRequestAnonBytes(bytes)
    }

    /// Returns the wrapped byte count.
    pub const fn bytes(self) -> u64 {
        let PerRequestAnonBytes(raw) = self;
        raw
    }
}
20
/// Number of bytes withheld from a memory budget.
///
/// [`MaxInFlight::from_budget`] subtracts this amount from the budget before
/// dividing what remains into request slots.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ReservedFloor(u64);

impl ReservedFloor {
    /// Wraps a raw byte count.
    pub const fn new(bytes: u64) -> Self {
        ReservedFloor(bytes)
    }

    /// Returns the wrapped byte count.
    pub const fn bytes(self) -> u64 {
        let ReservedFloor(raw) = self;
        raw
    }
}
33
34#[derive(Clone, Copy, Debug, Eq, PartialEq)]
35pub struct MaxInFlight(usize);
36
37impl MaxInFlight {
38 pub fn from_budget(
39 budget: MemoryBudget,
40 reserved: ReservedFloor,
41 per_request: PerRequestAnonBytes,
42 ) -> Self {
43 let usable = budget.bytes().saturating_sub(reserved.bytes());
44 let per = per_request.bytes().max(1);
45 let slots = (usable / per).max(1);
46 Self(usize::try_from(slots).unwrap_or(usize::MAX))
47 }
48
49 pub const fn get(self) -> usize {
50 self.0
51 }
52}
53
/// Direction in which [`HeavyLimiter::adjust`] should move the admission limit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PressureVerdict {
    /// Halve the current limit, never going below the limiter's floor.
    Tighten,
    /// Leave the limit where it is.
    Hold,
    /// Raise the limit by one relieve step, never going above the ceiling.
    Relieve,
}
60
61struct LimiterCore {
62 in_flight: AtomicUsize,
63 limit: AtomicUsize,
64 floor: usize,
65 ceiling: usize,
66 relieve_step: usize,
67}
68
69pub struct HeavyLimiter {
70 core: Arc<LimiterCore>,
71}
72
73pub struct HeavyPermit {
74 core: Arc<LimiterCore>,
75}
76
77impl Drop for HeavyPermit {
78 fn drop(&mut self) {
79 self.core.in_flight.fetch_sub(1, Ordering::AcqRel);
80 }
81}
82
83impl HeavyLimiter {
84 pub fn new(max: MaxInFlight) -> Self {
85 let ceiling = max.get();
86 Self {
87 core: Arc::new(LimiterCore {
88 in_flight: AtomicUsize::new(0),
89 limit: AtomicUsize::new(ceiling),
90 floor: 1,
91 ceiling,
92 relieve_step: (ceiling / 16).max(1),
93 }),
94 }
95 }
96
97 pub fn try_enter(&self) -> Result<HeavyPermit, XrpcError> {
98 let prev = self.core.in_flight.fetch_add(1, Ordering::AcqRel);
99 if prev < self.core.limit.load(Ordering::Acquire) {
100 Ok(HeavyPermit {
101 core: Arc::clone(&self.core),
102 })
103 } else {
104 self.core.in_flight.fetch_sub(1, Ordering::AcqRel);
105 Err(XrpcError::overloaded())
106 }
107 }
108
109 pub fn adjust(&self, verdict: PressureVerdict) {
110 let cur = self.core.limit.load(Ordering::Acquire);
111 let next = match verdict {
112 PressureVerdict::Tighten => (cur / 2).max(self.core.floor),
113 PressureVerdict::Relieve => (cur + self.core.relieve_step).min(self.core.ceiling),
114 PressureVerdict::Hold => cur,
115 };
116 self.core.limit.store(next, Ordering::Release);
117 }
118
119 pub fn limit(&self) -> usize {
120 self.core.limit.load(Ordering::Acquire)
121 }
122}
123
#[cfg(test)]
mod tests {
    use super::*;

    /// A reservation larger than the whole budget must still leave one slot.
    #[test]
    fn reserved_floor_above_budget_yields_one_slot() {
        let budget = MemoryBudget::new(100 * 1024 * 1024);
        let reserved = ReservedFloor::new(114 * 1024 * 1024);
        let per_request = PerRequestAnonBytes::new(2 * 1024 * 1024);

        let slots = MaxInFlight::from_budget(budget, reserved, per_request).get();

        assert_eq!(slots, 1);
    }

    /// With room above the reservation, slots scale with the usable bytes.
    #[test]
    fn larger_budget_opens_more_slots() {
        let budget = MemoryBudget::new(500 * 1024 * 1024);
        let reserved = ReservedFloor::new(114 * 1024 * 1024);
        let per_request = PerRequestAnonBytes::new(2 * 1024 * 1024);

        let slots = MaxInFlight::from_budget(budget, reserved, per_request).get();

        // 500 MiB - 114 MiB = 386 MiB usable, at 2 MiB per request.
        assert_eq!(slots, 193);
    }

    /// A full limiter refuses entry until an outstanding permit is dropped.
    #[test]
    fn limiter_sheds_when_at_limit() {
        let limiter = HeavyLimiter::new(MaxInFlight(1));
        {
            let _held = limiter.try_enter().expect("first permit enters");
            assert!(matches!(limiter.try_enter(), Err(XrpcError::Overloaded)));
        }
        assert!(limiter.try_enter().is_ok());
    }

    /// Tightening bottoms out at the floor; relieving tops out at the ceiling.
    #[test]
    fn adjust_clamps_between_floor_and_ceiling() {
        let limiter = HeavyLimiter::new(MaxInFlight(8));

        for expected in [4, 2, 1, 1] {
            limiter.adjust(PressureVerdict::Tighten);
            assert_eq!(limiter.limit(), expected);
        }

        for _ in 0..20 {
            limiter.adjust(PressureVerdict::Relieve);
        }
        assert_eq!(limiter.limit(), 8);
    }
}