Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1use std::num::NonZeroUsize; 2use std::time::Duration; 3 4use bobbin_ingest::{DisconnectSnapshot, WarmingBufferSnapshot, WarmingShadowSnapshot}; 5use tokio::runtime::Builder as TokioBuilder; 6use tracing_subscriber::Registry; 7use tracing_subscriber::layer::SubscriberExt; 8 9use crate::report::{SimOutcome, SimReport}; 10use crate::runtime::{Sim, SimConfig}; 11use crate::trace_capture::TraceCapture; 12use crate::workload::Workload; 13 14#[derive(Clone, Debug)] 15pub struct LeakRunConfig { 16 pub seed: u64, 17 pub parallelism: NonZeroUsize, 18 pub max_virtual_runtime: Duration, 19 pub mem_ws_capacity: usize, 20 pub warming_buffer_enabled: bool, 21} 22 23impl LeakRunConfig { 24 pub fn new( 25 seed: u64, 26 parallelism: NonZeroUsize, 27 max_virtual_runtime: Duration, 28 mem_ws_capacity: usize, 29 ) -> Self { 30 Self { 31 seed, 32 parallelism, 33 max_virtual_runtime, 34 mem_ws_capacity, 35 warming_buffer_enabled: true, 36 } 37 } 38} 39 40#[derive(Clone, Debug, Eq, PartialEq)] 41pub enum LeakOutcome { 42 Match, 43 OutcomeMismatch { 44 first: SimOutcome, 45 second: SimOutcome, 46 }, 47 EventCountMismatch { 48 first: u64, 49 second: u64, 50 }, 51 EdgeCountMismatch { 52 first: u64, 53 second: u64, 54 }, 55 LastCursorMismatch { 56 first: u64, 57 second: u64, 58 }, 59 ResolverHitsMismatch { 60 first: u64, 61 second: u64, 62 }, 63 ResolverMissesMismatch { 64 first: u64, 65 second: u64, 66 }, 67 ConsumerTooSlowMismatch { 68 first: u64, 69 second: u64, 70 }, 71 DisconnectCountMismatch { 72 first: u64, 73 second: u64, 74 }, 75 LastDisconnectMismatch { 76 first: Option<DisconnectSnapshot>, 77 second: Option<DisconnectSnapshot>, 78 }, 79 WarmingShadowMismatch { 80 first: WarmingShadowSnapshot, 81 second: WarmingShadowSnapshot, 82 }, 83 WarmingBufferMismatch { 84 first: WarmingBufferSnapshot, 85 second: WarmingBufferSnapshot, 86 }, 87 TraceLengthMismatch { 88 first: usize, 89 second: usize, 90 }, 91 TraceLineMismatch { 92 index: usize, 93 first: String, 94 second: String, 95 }, 96} 97 98#[derive(Debug)] 99pub struct LeakRunResult { 100 pub config: LeakRunConfig, 101 pub workload: &'static str, 102 pub outcome: LeakOutcome, 103 pub first_report: SimReport, 104 pub second_report: SimReport, 105} 106 107impl LeakRunResult { 108 pub fn passed(&self) -> bool { 109 matches!(self.outcome, LeakOutcome::Match) 110 } 111} 112 113pub fn run_leak_check<F>(config: LeakRunConfig, workload_factory: F) -> LeakRunResult 114where 115 F: Fn() -> Box<dyn Workload>, 116{ 117 let workload_name = { 118 let probe = workload_factory(); 119 probe.name() 120 }; 121 122 let (first_report, first_lines) = run_once(&config, workload_factory()); 123 let (second_report, second_lines) = run_once(&config, workload_factory()); 124 125 let outcome = compare_runs(&first_report, &first_lines, &second_report, &second_lines); 126 127 LeakRunResult { 128 config, 129 workload: workload_name, 130 outcome, 131 first_report, 132 second_report, 133 } 134} 135 136fn run_once(config: &LeakRunConfig, workload: Box<dyn Workload>) -> (SimReport, Vec<String>) { 137 let capture = TraceCapture::new(); 138 let subscriber = Registry::default().with(capture.layer()); 139 140 let mut sim_config = SimConfig::new(config.seed); 141 sim_config.max_virtual_runtime = config.max_virtual_runtime; 142 sim_config.parallelism = config.parallelism; 143 sim_config.mem_ws_capacity = config.mem_ws_capacity; 144 sim_config.warming_buffer_enabled = config.warming_buffer_enabled; 145 146 let runtime = TokioBuilder::new_current_thread() 147 .enable_all() 148 .start_paused(true) 149 .build() 150 .expect("build current_thread runtime with paused time"); 151 152 let report = tracing::subscriber::with_default(subscriber, || { 153 runtime.block_on(Sim::new(sim_config, workload).run()) 154 }); 155 drop(runtime); 156 157 let lines = capture.into_lines(); 158 (report, lines) 159} 160 161fn compare_runs( 162 a: &SimReport, 163 a_lines: &[String], 164 b: &SimReport, 165 b_lines: &[String], 166) -> LeakOutcome { 167 if a.outcome != b.outcome { 168 return LeakOutcome::OutcomeMismatch { 169 first: a.outcome, 170 second: b.outcome, 171 }; 172 } 173 if a.events_processed != b.events_processed { 174 return LeakOutcome::EventCountMismatch { 175 first: a.events_processed, 176 second: b.events_processed, 177 }; 178 } 179 if a.edge_count != b.edge_count { 180 return LeakOutcome::EdgeCountMismatch { 181 first: a.edge_count, 182 second: b.edge_count, 183 }; 184 } 185 if a.last_cursor != b.last_cursor { 186 return LeakOutcome::LastCursorMismatch { 187 first: a.last_cursor, 188 second: b.last_cursor, 189 }; 190 } 191 if a.resolver_hits != b.resolver_hits { 192 return LeakOutcome::ResolverHitsMismatch { 193 first: a.resolver_hits, 194 second: b.resolver_hits, 195 }; 196 } 197 if a.resolver_misses != b.resolver_misses { 198 return LeakOutcome::ResolverMissesMismatch { 199 first: a.resolver_misses, 200 second: b.resolver_misses, 201 }; 202 } 203 if a.consumer_too_slow_count != b.consumer_too_slow_count { 204 return LeakOutcome::ConsumerTooSlowMismatch { 205 first: a.consumer_too_slow_count, 206 second: b.consumer_too_slow_count, 207 }; 208 } 209 if a.disconnect_count != b.disconnect_count { 210 return LeakOutcome::DisconnectCountMismatch { 211 first: a.disconnect_count, 212 second: b.disconnect_count, 213 }; 214 } 215 if a.last_disconnect != b.last_disconnect { 216 return LeakOutcome::LastDisconnectMismatch { 217 first: a.last_disconnect.clone(), 218 second: b.last_disconnect.clone(), 219 }; 220 } 221 if a.warming_shadow != b.warming_shadow { 222 return LeakOutcome::WarmingShadowMismatch { 223 first: a.warming_shadow, 224 second: b.warming_shadow, 225 }; 226 } 227 if a.warming_buffer != b.warming_buffer { 228 return LeakOutcome::WarmingBufferMismatch { 229 first: a.warming_buffer, 230 second: b.warming_buffer, 231 }; 232 } 233 if a_lines.len() != b_lines.len() { 234 return LeakOutcome::TraceLengthMismatch { 235 first: a_lines.len(), 236 second: b_lines.len(), 237 }; 238 } 239 for (i, (x, y)) in a_lines.iter().zip(b_lines.iter()).enumerate() { 240 if x != y { 241 return LeakOutcome::TraceLineMismatch { 242 index: i, 243 first: x.clone(), 244 second: y.clone(), 245 }; 246 } 247 } 248 LeakOutcome::Match 249}