Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

at master 4.6 kB View raw
1use std::sync::Arc; 2use std::sync::atomic::{AtomicUsize, Ordering}; 3 4use bobbin_runtime::MemoryBudget; 5 6use crate::XrpcError; 7 8#[derive(Clone, Copy, Debug, Eq, PartialEq)] 9pub struct PerRequestAnonBytes(u64); 10 11impl PerRequestAnonBytes { 12 pub const fn new(bytes: u64) -> Self { 13 Self(bytes) 14 } 15 16 pub const fn bytes(self) -> u64 { 17 self.0 18 } 19} 20 21#[derive(Clone, Copy, Debug, Eq, PartialEq)] 22pub struct ReservedFloor(u64); 23 24impl ReservedFloor { 25 pub const fn new(bytes: u64) -> Self { 26 Self(bytes) 27 } 28 29 pub const fn bytes(self) -> u64 { 30 self.0 31 } 32} 33 34#[derive(Clone, Copy, Debug, Eq, PartialEq)] 35pub struct MaxInFlight(usize); 36 37impl MaxInFlight { 38 pub fn from_budget( 39 budget: MemoryBudget, 40 reserved: ReservedFloor, 41 per_request: PerRequestAnonBytes, 42 ) -> Self { 43 let usable = budget.bytes().saturating_sub(reserved.bytes()); 44 let per = per_request.bytes().max(1); 45 let slots = (usable / per).max(1); 46 Self(usize::try_from(slots).unwrap_or(usize::MAX)) 47 } 48 49 pub const fn get(self) -> usize { 50 self.0 51 } 52} 53 54#[derive(Clone, Copy, Debug, Eq, PartialEq)] 55pub enum PressureVerdict { 56 Tighten, 57 Hold, 58 Relieve, 59} 60 61struct LimiterCore { 62 in_flight: AtomicUsize, 63 limit: AtomicUsize, 64 floor: usize, 65 ceiling: usize, 66 relieve_step: usize, 67} 68 69pub struct HeavyLimiter { 70 core: Arc<LimiterCore>, 71} 72 73pub struct HeavyPermit { 74 core: Arc<LimiterCore>, 75} 76 77impl Drop for HeavyPermit { 78 fn drop(&mut self) { 79 self.core.in_flight.fetch_sub(1, Ordering::AcqRel); 80 } 81} 82 83impl HeavyLimiter { 84 pub fn new(max: MaxInFlight) -> Self { 85 let ceiling = max.get(); 86 Self { 87 core: Arc::new(LimiterCore { 88 in_flight: AtomicUsize::new(0), 89 limit: AtomicUsize::new(ceiling), 90 floor: 1, 91 ceiling, 92 relieve_step: (ceiling / 16).max(1), 93 }), 94 } 95 } 96 97 pub fn try_enter(&self) -> Result<HeavyPermit, XrpcError> { 98 let prev = self.core.in_flight.fetch_add(1, Ordering::AcqRel); 99 if prev < self.core.limit.load(Ordering::Acquire) { 100 Ok(HeavyPermit { 101 core: Arc::clone(&self.core), 102 }) 103 } else { 104 self.core.in_flight.fetch_sub(1, Ordering::AcqRel); 105 Err(XrpcError::overloaded()) 106 } 107 } 108 109 pub fn adjust(&self, verdict: PressureVerdict) { 110 let cur = self.core.limit.load(Ordering::Acquire); 111 let next = match verdict { 112 PressureVerdict::Tighten => (cur / 2).max(self.core.floor), 113 PressureVerdict::Relieve => (cur + self.core.relieve_step).min(self.core.ceiling), 114 PressureVerdict::Hold => cur, 115 }; 116 self.core.limit.store(next, Ordering::Release); 117 } 118 119 pub fn limit(&self) -> usize { 120 self.core.limit.load(Ordering::Acquire) 121 } 122} 123 124#[cfg(test)] 125mod tests { 126 use super::*; 127 128 #[test] 129 fn reserved_floor_above_budget_yields_one_slot() { 130 let max = MaxInFlight::from_budget( 131 MemoryBudget::new(100 * 1024 * 1024), 132 ReservedFloor::new(114 * 1024 * 1024), 133 PerRequestAnonBytes::new(2 * 1024 * 1024), 134 ); 135 assert_eq!(max.get(), 1); 136 } 137 138 #[test] 139 fn larger_budget_opens_more_slots() { 140 let max = MaxInFlight::from_budget( 141 MemoryBudget::new(500 * 1024 * 1024), 142 ReservedFloor::new(114 * 1024 * 1024), 143 PerRequestAnonBytes::new(2 * 1024 * 1024), 144 ); 145 assert_eq!(max.get(), (386 * 1024 * 1024) / (2 * 1024 * 1024)); 146 } 147 148 #[test] 149 fn limiter_sheds_when_at_limit() { 150 let limiter = HeavyLimiter::new(MaxInFlight(1)); 151 let held = limiter.try_enter().expect("first permit enters"); 152 assert!(matches!(limiter.try_enter(), Err(XrpcError::Overloaded))); 153 drop(held); 154 assert!(limiter.try_enter().is_ok()); 155 } 156 157 #[test] 158 fn adjust_clamps_between_floor_and_ceiling() { 159 let limiter = HeavyLimiter::new(MaxInFlight(8)); 160 limiter.adjust(PressureVerdict::Tighten); 161 assert_eq!(limiter.limit(), 4); 162 limiter.adjust(PressureVerdict::Tighten); 163 assert_eq!(limiter.limit(), 2); 164 limiter.adjust(PressureVerdict::Tighten); 165 assert_eq!(limiter.limit(), 1); 166 limiter.adjust(PressureVerdict::Tighten); 167 assert_eq!(limiter.limit(), 1); 168 (0..20).for_each(|_| limiter.adjust(PressureVerdict::Relieve)); 169 assert_eq!(limiter.limit(), 8); 170 } 171}