Monorepo for Tangled
tangled.org
1use std::num::NonZeroUsize;
2use std::path::PathBuf;
3use std::process::ExitCode;
4use std::time::Duration;
5
6use bobbin_sim::workloads::{
7 CancelMidHydration, CancelMidHydrationConfig, ColdStartUnderLiveLoad,
8 ColdStartUnderLiveLoadConfig, ConcurrentReadsDuringReplay, ConcurrentReadsDuringReplayConfig,
9 FrameBurst, HydrantDisconnectBarrage, HydrantDisconnectBarrageConfig, SlingshotFlap,
10 SlingshotFlapConfig,
11};
12use bobbin_sim::{
13 LeakOutcome, LeakRunConfig, Sim, SimConfig, SimOutcome, SimReport, Workload, run_leak_check,
14};
15use clap::{Parser, ValueEnum};
16
17#[derive(Parser, Debug)]
18#[command(name = "bobbin-sim", version)]
19struct Cli {
20 #[arg(long, default_value_t = 0)]
21 seed: u64,
22 #[arg(long, value_enum, default_value_t = WorkloadName::FrameBurst)]
23 workload: WorkloadName,
24 #[arg(long, default_value_t = 64)]
25 frames: usize,
26 #[arg(long, default_value_t = 60)]
27 max_virtual_seconds: u64,
28 #[arg(long)]
29 no_brownout: bool,
30 #[arg(long, default_value_t = 200)]
31 cross_did_stars: usize,
32 #[arg(long, default_value_t = 1_000)]
33 brownout_start_ms: u64,
34 #[arg(long, default_value_t = 5_000)]
35 brownout_duration_ms: u64,
36 #[arg(long, default_value_t = 200)]
37 brownout_latency_ms: u64,
38 #[arg(long, default_value_t = 2)]
39 normal_latency_ms: u64,
40 #[arg(long, default_value_t = 16)]
41 parallelism: usize,
42 #[arg(long, default_value_t = 4096)]
43 mem_ws_capacity: usize,
44 #[arg(long, default_value_t = 30_000)]
45 hydrant_send_timeout_ms: u64,
46 #[arg(long, default_value_t = 0)]
47 hydrant_frame_pace_us: u64,
48 #[arg(long, default_value_t = 1)]
49 seeds: u64,
50 #[arg(long)]
51 quarantine_out: Option<PathBuf>,
52 #[arg(long)]
53 leak_check: bool,
54}
55
56#[derive(Clone, Copy, Debug, ValueEnum, Eq, PartialEq)]
57enum WorkloadName {
58 FrameBurst,
59 SlingshotFlap,
60 CancelMidHydration,
61 HydrantDisconnectBarrage,
62 ColdStartUnderLiveLoad,
63 ConcurrentReadsDuringReplay,
64}
65
66fn main() -> ExitCode {
67 let cli = Cli::parse();
68 tracing_subscriber::fmt()
69 .with_env_filter(
70 tracing_subscriber::EnvFilter::try_from_default_env()
71 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("warn,bobbin_sim=info")),
72 )
73 .with_writer(std::io::stderr)
74 .init();
75
76 if cli.seeds <= 1 && !cli.leak_check {
77 let report = run_single(&cli, cli.seed);
78 println!(
79 "{}",
80 serde_json::to_string_pretty(&report_as_json(&report)).unwrap()
81 );
82 exit_code_from(report.outcome)
83 } else if cli.leak_check && cli.seeds <= 1 {
84 let result = run_leak(&cli, cli.seed);
85 let json = leak_result_as_json(&result);
86 println!("{}", serde_json::to_string_pretty(&json).unwrap());
87 if matches!(result.outcome, LeakOutcome::Match)
88 && matches!(result.first_report.outcome, SimOutcome::Passed)
89 {
90 ExitCode::from(0)
91 } else {
92 ExitCode::from(1)
93 }
94 } else {
95 run_sweep(&cli)
96 }
97}
98
99fn run_sweep(cli: &Cli) -> ExitCode {
100 let mut quarantined: Vec<serde_json::Value> = Vec::new();
101 let mut passed = 0u64;
102 let total = cli.seeds;
103 for offset in 0..total {
104 let seed = cli.seed.wrapping_add(offset);
105 let outcome_record = if cli.leak_check {
106 let result = run_leak(cli, seed);
107 let leak_match = matches!(result.outcome, LeakOutcome::Match);
108 let workload_passed = matches!(result.first_report.outcome, SimOutcome::Passed);
109 let ok = leak_match && workload_passed;
110 if ok {
111 passed += 1;
112 None
113 } else {
114 Some(leak_result_as_json(&result))
115 }
116 } else {
117 let report = run_single(cli, seed);
118 if matches!(report.outcome, SimOutcome::Passed) {
119 passed += 1;
120 None
121 } else {
122 Some(report_as_json(&report))
123 }
124 };
125 if let Some(rec) = outcome_record {
126 quarantined.push(rec);
127 }
128 }
129 let summary = serde_json::json!({
130 "workload": format!("{:?}", cli.workload),
131 "seeds_run": total,
132 "passed": passed,
133 "failed": total - passed,
134 "leak_check": cli.leak_check,
135 "parallelism": cli.parallelism,
136 "quarantine_count": quarantined.len(),
137 });
138 println!("{}", serde_json::to_string_pretty(&summary).unwrap());
139 if let Some(path) = &cli.quarantine_out {
140 let payload = serde_json::json!({
141 "summary": summary,
142 "failures": quarantined,
143 });
144 std::fs::write(path, serde_json::to_vec_pretty(&payload).unwrap())
145 .expect("write quarantine output");
146 eprintln!("quarantine written to {}", path.display());
147 }
148 if quarantined.is_empty() {
149 ExitCode::from(0)
150 } else {
151 ExitCode::from(1)
152 }
153}
154
155fn run_single(cli: &Cli, seed: u64) -> SimReport {
156 let workload = build_workload(cli);
157 let mut config = SimConfig::new(seed);
158 config.max_virtual_runtime = Duration::from_secs(cli.max_virtual_seconds);
159 config.parallelism = NonZeroUsize::new(cli.parallelism).expect("parallelism > 0");
160 config.mem_ws_capacity = cli.mem_ws_capacity;
161 let runtime = tokio::runtime::Builder::new_current_thread()
162 .enable_all()
163 .start_paused(true)
164 .build()
165 .expect("build current_thread runtime with paused time");
166 runtime.block_on(Sim::new(config, workload).run())
167}
168
169fn run_leak(cli: &Cli, seed: u64) -> bobbin_sim::LeakRunResult {
170 let leak_config = LeakRunConfig {
171 seed,
172 parallelism: NonZeroUsize::new(cli.parallelism).expect("parallelism > 0"),
173 max_virtual_runtime: Duration::from_secs(cli.max_virtual_seconds),
174 mem_ws_capacity: cli.mem_ws_capacity,
175 warming_buffer_enabled: true,
176 };
177 let cli_snapshot = CliSnapshot::from(cli);
178 let factory = move || -> Box<dyn Workload> { build_workload_from(&cli_snapshot) };
179 run_leak_check(leak_config, factory)
180}
181
182#[derive(Clone)]
183struct CliSnapshot {
184 workload: WorkloadName,
185 frames: usize,
186 cross_did_stars: usize,
187 normal_latency_ms: u64,
188 brownout_start_ms: u64,
189 brownout_duration_ms: u64,
190 brownout_latency_ms: u64,
191 brownout_enabled: bool,
192 hydrant_send_timeout_ms: u64,
193 hydrant_frame_pace_us: u64,
194}
195
196impl From<&Cli> for CliSnapshot {
197 fn from(cli: &Cli) -> Self {
198 Self {
199 workload: cli.workload,
200 frames: cli.frames,
201 cross_did_stars: cli.cross_did_stars,
202 normal_latency_ms: cli.normal_latency_ms,
203 brownout_start_ms: cli.brownout_start_ms,
204 brownout_duration_ms: cli.brownout_duration_ms,
205 brownout_latency_ms: cli.brownout_latency_ms,
206 brownout_enabled: !cli.no_brownout,
207 hydrant_send_timeout_ms: cli.hydrant_send_timeout_ms,
208 hydrant_frame_pace_us: cli.hydrant_frame_pace_us,
209 }
210 }
211}
212
213fn build_workload(cli: &Cli) -> Box<dyn Workload> {
214 build_workload_from(&CliSnapshot::from(cli))
215}
216
217fn build_workload_from(snap: &CliSnapshot) -> Box<dyn Workload> {
218 match snap.workload {
219 WorkloadName::FrameBurst => Box::new(FrameBurst::new(snap.frames)),
220 WorkloadName::SlingshotFlap => Box::new(SlingshotFlap::new(SlingshotFlapConfig {
221 cross_did_stars: snap.cross_did_stars,
222 normal_latency_ms: snap.normal_latency_ms,
223 brownout_start_ms: snap.brownout_start_ms,
224 brownout_duration_ms: snap.brownout_duration_ms,
225 brownout_latency_ms: snap.brownout_latency_ms,
226 brownout_enabled: snap.brownout_enabled,
227 hydrant_send_timeout_ms: snap.hydrant_send_timeout_ms,
228 hydrant_frame_pace_us: snap.hydrant_frame_pace_us,
229 omit_target_repos: false,
230 emit_live_promotion_frame: false,
231 })),
232 WorkloadName::CancelMidHydration => {
233 Box::new(CancelMidHydration::new(CancelMidHydrationConfig::default()))
234 }
235 WorkloadName::HydrantDisconnectBarrage => Box::new(HydrantDisconnectBarrage::new(
236 HydrantDisconnectBarrageConfig::default(),
237 )),
238 WorkloadName::ColdStartUnderLiveLoad => Box::new(ColdStartUnderLiveLoad::new(
239 ColdStartUnderLiveLoadConfig::default(),
240 )),
241 WorkloadName::ConcurrentReadsDuringReplay => Box::new(ConcurrentReadsDuringReplay::new(
242 ConcurrentReadsDuringReplayConfig::default(),
243 )),
244 }
245}
246
247fn report_as_json(r: &SimReport) -> serde_json::Value {
248 serde_json::json!({
249 "workload": r.workload,
250 "seed": r.seed,
251 "outcome": format!("{:?}", r.outcome),
252 "virtual_runtime_micros": r.virtual_runtime.as_micros() as u64,
253 "virtual_clock_end_unix_micros": r.virtual_clock_end.raw(),
254 "events_processed": r.events_processed,
255 "last_cursor": r.last_cursor,
256 "edge_count": r.edge_count,
257 "resolver_hits": r.resolver_hits,
258 "resolver_misses": r.resolver_misses,
259 "consumer_too_slow_count": r.consumer_too_slow_count,
260 "disconnect_count": r.disconnect_count,
261 "last_disconnect": r.last_disconnect.as_ref().map(|d| serde_json::json!({
262 "kind": format!("{:?}", d.kind),
263 "message": d.message,
264 "at_unix_micros": d.at_unix_micros.raw(),
265 "last_cursor": d.last_cursor.raw(),
266 })),
267 "warming_shadow": {
268 "enqueued_total": r.warming_shadow.enqueued_total,
269 "drained_via_observe_total": r.warming_shadow.drained_via_observe_total,
270 "max_concurrent": r.warming_shadow.max_concurrent,
271 "residual": r.warming_shadow.residual,
272 "distinct_keys_seen": r.warming_shadow.distinct_keys_seen,
273 },
274 "warming_buffer": {
275 "enqueued_total": r.warming_buffer.enqueued_total,
276 "drained_observe_total": r.warming_buffer.drained_observe_total,
277 "drained_promote_total": r.warming_buffer.drained_promote_total,
278 "evicted_total": r.warming_buffer.evicted_total,
279 "rejected_after_seal": r.warming_buffer.rejected_after_seal,
280 "distinct_keys_seen": r.warming_buffer.distinct_keys_seen,
281 "current_entries": r.warming_buffer.current_entries,
282 "max_concurrent_entries": r.warming_buffer.max_concurrent_entries,
283 "dep_enqueued_total": r.warming_buffer.dep_enqueued_total,
284 "dep_drained_observe_total": r.warming_buffer.dep_drained_observe_total,
285 },
286 "failure_reason": r.failure_reason,
287 })
288}
289
290fn leak_result_as_json(r: &bobbin_sim::LeakRunResult) -> serde_json::Value {
291 serde_json::json!({
292 "workload": r.workload,
293 "seed": r.config.seed,
294 "parallelism": r.config.parallelism.get(),
295 "leak_outcome": leak_outcome_as_json(&r.outcome),
296 "first": report_as_json(&r.first_report),
297 "second": report_as_json(&r.second_report),
298 })
299}
300
301fn leak_outcome_as_json(outcome: &LeakOutcome) -> serde_json::Value {
302 match outcome {
303 LeakOutcome::Match => serde_json::json!({ "kind": "match" }),
304 LeakOutcome::OutcomeMismatch { first, second } => serde_json::json!({
305 "kind": "outcome_mismatch",
306 "first": format!("{first:?}"),
307 "second": format!("{second:?}"),
308 }),
309 LeakOutcome::EventCountMismatch { first, second } => serde_json::json!({
310 "kind": "event_count_mismatch",
311 "first": first,
312 "second": second,
313 }),
314 LeakOutcome::EdgeCountMismatch { first, second } => serde_json::json!({
315 "kind": "edge_count_mismatch",
316 "first": first,
317 "second": second,
318 }),
319 LeakOutcome::LastCursorMismatch { first, second } => serde_json::json!({
320 "kind": "last_cursor_mismatch",
321 "first": first,
322 "second": second,
323 }),
324 LeakOutcome::ResolverHitsMismatch { first, second } => serde_json::json!({
325 "kind": "resolver_hits_mismatch",
326 "first": first,
327 "second": second,
328 }),
329 LeakOutcome::ResolverMissesMismatch { first, second } => serde_json::json!({
330 "kind": "resolver_misses_mismatch",
331 "first": first,
332 "second": second,
333 }),
334 LeakOutcome::ConsumerTooSlowMismatch { first, second } => serde_json::json!({
335 "kind": "consumer_too_slow_mismatch",
336 "first": first,
337 "second": second,
338 }),
339 LeakOutcome::DisconnectCountMismatch { first, second } => serde_json::json!({
340 "kind": "disconnect_count_mismatch",
341 "first": first,
342 "second": second,
343 }),
344 LeakOutcome::LastDisconnectMismatch { first, second } => serde_json::json!({
345 "kind": "last_disconnect_mismatch",
346 "first": first.as_ref().map(|d| format!("{:?}: {}", d.kind, d.message)),
347 "second": second.as_ref().map(|d| format!("{:?}: {}", d.kind, d.message)),
348 }),
349 LeakOutcome::WarmingShadowMismatch { first, second } => serde_json::json!({
350 "kind": "warming_shadow_mismatch",
351 "first": format!("{first:?}"),
352 "second": format!("{second:?}"),
353 }),
354 LeakOutcome::WarmingBufferMismatch { first, second } => serde_json::json!({
355 "kind": "warming_buffer_mismatch",
356 "first": format!("{first:?}"),
357 "second": format!("{second:?}"),
358 }),
359 LeakOutcome::TraceLengthMismatch { first, second } => serde_json::json!({
360 "kind": "trace_length_mismatch",
361 "first": first,
362 "second": second,
363 }),
364 LeakOutcome::TraceLineMismatch {
365 index,
366 first,
367 second,
368 } => serde_json::json!({
369 "kind": "trace_line_mismatch",
370 "index": index,
371 "first": first,
372 "second": second,
373 }),
374 }
375}
376
377fn exit_code_from(outcome: SimOutcome) -> ExitCode {
378 match outcome {
379 SimOutcome::Passed => ExitCode::from(0),
380 SimOutcome::Failed | SimOutcome::TimedOut => ExitCode::from(1),
381 }
382}