Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1use std::num::NonZeroUsize; 2use std::path::PathBuf; 3use std::process::ExitCode; 4use std::time::Duration; 5 6use bobbin_sim::workloads::{ 7 CancelMidHydration, CancelMidHydrationConfig, ColdStartUnderLiveLoad, 8 ColdStartUnderLiveLoadConfig, ConcurrentReadsDuringReplay, ConcurrentReadsDuringReplayConfig, 9 FrameBurst, HydrantDisconnectBarrage, HydrantDisconnectBarrageConfig, SlingshotFlap, 10 SlingshotFlapConfig, 11}; 12use bobbin_sim::{ 13 LeakOutcome, LeakRunConfig, Sim, SimConfig, SimOutcome, SimReport, Workload, run_leak_check, 14}; 15use clap::{Parser, ValueEnum}; 16 17#[derive(Parser, Debug)] 18#[command(name = "bobbin-sim", version)] 19struct Cli { 20 #[arg(long, default_value_t = 0)] 21 seed: u64, 22 #[arg(long, value_enum, default_value_t = WorkloadName::FrameBurst)] 23 workload: WorkloadName, 24 #[arg(long, default_value_t = 64)] 25 frames: usize, 26 #[arg(long, default_value_t = 60)] 27 max_virtual_seconds: u64, 28 #[arg(long)] 29 no_brownout: bool, 30 #[arg(long, default_value_t = 200)] 31 cross_did_stars: usize, 32 #[arg(long, default_value_t = 1_000)] 33 brownout_start_ms: u64, 34 #[arg(long, default_value_t = 5_000)] 35 brownout_duration_ms: u64, 36 #[arg(long, default_value_t = 200)] 37 brownout_latency_ms: u64, 38 #[arg(long, default_value_t = 2)] 39 normal_latency_ms: u64, 40 #[arg(long, default_value_t = 16)] 41 parallelism: usize, 42 #[arg(long, default_value_t = 4096)] 43 mem_ws_capacity: usize, 44 #[arg(long, default_value_t = 30_000)] 45 hydrant_send_timeout_ms: u64, 46 #[arg(long, default_value_t = 0)] 47 hydrant_frame_pace_us: u64, 48 #[arg(long, default_value_t = 1)] 49 seeds: u64, 50 #[arg(long)] 51 quarantine_out: Option<PathBuf>, 52 #[arg(long)] 53 leak_check: bool, 54} 55 56#[derive(Clone, Copy, Debug, ValueEnum, Eq, PartialEq)] 57enum WorkloadName { 58 FrameBurst, 59 SlingshotFlap, 60 CancelMidHydration, 61 HydrantDisconnectBarrage, 62 ColdStartUnderLiveLoad, 63 ConcurrentReadsDuringReplay, 64} 65 66fn main() -> ExitCode { 67 let cli = Cli::parse(); 68 tracing_subscriber::fmt() 69 .with_env_filter( 70 tracing_subscriber::EnvFilter::try_from_default_env() 71 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("warn,bobbin_sim=info")), 72 ) 73 .with_writer(std::io::stderr) 74 .init(); 75 76 if cli.seeds <= 1 && !cli.leak_check { 77 let report = run_single(&cli, cli.seed); 78 println!( 79 "{}", 80 serde_json::to_string_pretty(&report_as_json(&report)).unwrap() 81 ); 82 exit_code_from(report.outcome) 83 } else if cli.leak_check && cli.seeds <= 1 { 84 let result = run_leak(&cli, cli.seed); 85 let json = leak_result_as_json(&result); 86 println!("{}", serde_json::to_string_pretty(&json).unwrap()); 87 if matches!(result.outcome, LeakOutcome::Match) 88 && matches!(result.first_report.outcome, SimOutcome::Passed) 89 { 90 ExitCode::from(0) 91 } else { 92 ExitCode::from(1) 93 } 94 } else { 95 run_sweep(&cli) 96 } 97} 98 99fn run_sweep(cli: &Cli) -> ExitCode { 100 let mut quarantined: Vec<serde_json::Value> = Vec::new(); 101 let mut passed = 0u64; 102 let total = cli.seeds; 103 for offset in 0..total { 104 let seed = cli.seed.wrapping_add(offset); 105 let outcome_record = if cli.leak_check { 106 let result = run_leak(cli, seed); 107 let leak_match = matches!(result.outcome, LeakOutcome::Match); 108 let workload_passed = matches!(result.first_report.outcome, SimOutcome::Passed); 109 let ok = leak_match && workload_passed; 110 if ok { 111 passed += 1; 112 None 113 } else { 114 Some(leak_result_as_json(&result)) 115 } 116 } else { 117 let report = run_single(cli, seed); 118 if matches!(report.outcome, SimOutcome::Passed) { 119 passed += 1; 120 None 121 } else { 122 Some(report_as_json(&report)) 123 } 124 }; 125 if let Some(rec) = outcome_record { 126 quarantined.push(rec); 127 } 128 } 129 let summary = serde_json::json!({ 130 "workload": format!("{:?}", cli.workload), 131 "seeds_run": total, 132 "passed": passed, 133 "failed": total - passed, 134 "leak_check": cli.leak_check, 135 "parallelism": cli.parallelism, 136 "quarantine_count": quarantined.len(), 137 }); 138 println!("{}", serde_json::to_string_pretty(&summary).unwrap()); 139 if let Some(path) = &cli.quarantine_out { 140 let payload = serde_json::json!({ 141 "summary": summary, 142 "failures": quarantined, 143 }); 144 std::fs::write(path, serde_json::to_vec_pretty(&payload).unwrap()) 145 .expect("write quarantine output"); 146 eprintln!("quarantine written to {}", path.display()); 147 } 148 if quarantined.is_empty() { 149 ExitCode::from(0) 150 } else { 151 ExitCode::from(1) 152 } 153} 154 155fn run_single(cli: &Cli, seed: u64) -> SimReport { 156 let workload = build_workload(cli); 157 let mut config = SimConfig::new(seed); 158 config.max_virtual_runtime = Duration::from_secs(cli.max_virtual_seconds); 159 config.parallelism = NonZeroUsize::new(cli.parallelism).expect("parallelism > 0"); 160 config.mem_ws_capacity = cli.mem_ws_capacity; 161 let runtime = tokio::runtime::Builder::new_current_thread() 162 .enable_all() 163 .start_paused(true) 164 .build() 165 .expect("build current_thread runtime with paused time"); 166 runtime.block_on(Sim::new(config, workload).run()) 167} 168 169fn run_leak(cli: &Cli, seed: u64) -> bobbin_sim::LeakRunResult { 170 let leak_config = LeakRunConfig { 171 seed, 172 parallelism: NonZeroUsize::new(cli.parallelism).expect("parallelism > 0"), 173 max_virtual_runtime: Duration::from_secs(cli.max_virtual_seconds), 174 mem_ws_capacity: cli.mem_ws_capacity, 175 warming_buffer_enabled: true, 176 }; 177 let cli_snapshot = CliSnapshot::from(cli); 178 let factory = move || -> Box<dyn Workload> { build_workload_from(&cli_snapshot) }; 179 run_leak_check(leak_config, factory) 180} 181 182#[derive(Clone)] 183struct CliSnapshot { 184 workload: WorkloadName, 185 frames: usize, 186 cross_did_stars: usize, 187 normal_latency_ms: u64, 188 brownout_start_ms: u64, 189 brownout_duration_ms: u64, 190 brownout_latency_ms: u64, 191 brownout_enabled: bool, 192 hydrant_send_timeout_ms: u64, 193 hydrant_frame_pace_us: u64, 194} 195 196impl From<&Cli> for CliSnapshot { 197 fn from(cli: &Cli) -> Self { 198 Self { 199 workload: cli.workload, 200 frames: cli.frames, 201 cross_did_stars: cli.cross_did_stars, 202 normal_latency_ms: cli.normal_latency_ms, 203 brownout_start_ms: cli.brownout_start_ms, 204 brownout_duration_ms: cli.brownout_duration_ms, 205 brownout_latency_ms: cli.brownout_latency_ms, 206 brownout_enabled: !cli.no_brownout, 207 hydrant_send_timeout_ms: cli.hydrant_send_timeout_ms, 208 hydrant_frame_pace_us: cli.hydrant_frame_pace_us, 209 } 210 } 211} 212 213fn build_workload(cli: &Cli) -> Box<dyn Workload> { 214 build_workload_from(&CliSnapshot::from(cli)) 215} 216 217fn build_workload_from(snap: &CliSnapshot) -> Box<dyn Workload> { 218 match snap.workload { 219 WorkloadName::FrameBurst => Box::new(FrameBurst::new(snap.frames)), 220 WorkloadName::SlingshotFlap => Box::new(SlingshotFlap::new(SlingshotFlapConfig { 221 cross_did_stars: snap.cross_did_stars, 222 normal_latency_ms: snap.normal_latency_ms, 223 brownout_start_ms: snap.brownout_start_ms, 224 brownout_duration_ms: snap.brownout_duration_ms, 225 brownout_latency_ms: snap.brownout_latency_ms, 226 brownout_enabled: snap.brownout_enabled, 227 hydrant_send_timeout_ms: snap.hydrant_send_timeout_ms, 228 hydrant_frame_pace_us: snap.hydrant_frame_pace_us, 229 omit_target_repos: false, 230 emit_live_promotion_frame: false, 231 })), 232 WorkloadName::CancelMidHydration => { 233 Box::new(CancelMidHydration::new(CancelMidHydrationConfig::default())) 234 } 235 WorkloadName::HydrantDisconnectBarrage => Box::new(HydrantDisconnectBarrage::new( 236 HydrantDisconnectBarrageConfig::default(), 237 )), 238 WorkloadName::ColdStartUnderLiveLoad => Box::new(ColdStartUnderLiveLoad::new( 239 ColdStartUnderLiveLoadConfig::default(), 240 )), 241 WorkloadName::ConcurrentReadsDuringReplay => Box::new(ConcurrentReadsDuringReplay::new( 242 ConcurrentReadsDuringReplayConfig::default(), 243 )), 244 } 245} 246 247fn report_as_json(r: &SimReport) -> serde_json::Value { 248 serde_json::json!({ 249 "workload": r.workload, 250 "seed": r.seed, 251 "outcome": format!("{:?}", r.outcome), 252 "virtual_runtime_micros": r.virtual_runtime.as_micros() as u64, 253 "virtual_clock_end_unix_micros": r.virtual_clock_end.raw(), 254 "events_processed": r.events_processed, 255 "last_cursor": r.last_cursor, 256 "edge_count": r.edge_count, 257 "resolver_hits": r.resolver_hits, 258 "resolver_misses": r.resolver_misses, 259 "consumer_too_slow_count": r.consumer_too_slow_count, 260 "disconnect_count": r.disconnect_count, 261 "last_disconnect": r.last_disconnect.as_ref().map(|d| serde_json::json!({ 262 "kind": format!("{:?}", d.kind), 263 "message": d.message, 264 "at_unix_micros": d.at_unix_micros.raw(), 265 "last_cursor": d.last_cursor.raw(), 266 })), 267 "warming_shadow": { 268 "enqueued_total": r.warming_shadow.enqueued_total, 269 "drained_via_observe_total": r.warming_shadow.drained_via_observe_total, 270 "max_concurrent": r.warming_shadow.max_concurrent, 271 "residual": r.warming_shadow.residual, 272 "distinct_keys_seen": r.warming_shadow.distinct_keys_seen, 273 }, 274 "warming_buffer": { 275 "enqueued_total": r.warming_buffer.enqueued_total, 276 "drained_observe_total": r.warming_buffer.drained_observe_total, 277 "drained_promote_total": r.warming_buffer.drained_promote_total, 278 "evicted_total": r.warming_buffer.evicted_total, 279 "rejected_after_seal": r.warming_buffer.rejected_after_seal, 280 "distinct_keys_seen": r.warming_buffer.distinct_keys_seen, 281 "current_entries": r.warming_buffer.current_entries, 282 "max_concurrent_entries": r.warming_buffer.max_concurrent_entries, 283 "dep_enqueued_total": r.warming_buffer.dep_enqueued_total, 284 "dep_drained_observe_total": r.warming_buffer.dep_drained_observe_total, 285 }, 286 "failure_reason": r.failure_reason, 287 }) 288} 289 290fn leak_result_as_json(r: &bobbin_sim::LeakRunResult) -> serde_json::Value { 291 serde_json::json!({ 292 "workload": r.workload, 293 "seed": r.config.seed, 294 "parallelism": r.config.parallelism.get(), 295 "leak_outcome": leak_outcome_as_json(&r.outcome), 296 "first": report_as_json(&r.first_report), 297 "second": report_as_json(&r.second_report), 298 }) 299} 300 301fn leak_outcome_as_json(outcome: &LeakOutcome) -> serde_json::Value { 302 match outcome { 303 LeakOutcome::Match => serde_json::json!({ "kind": "match" }), 304 LeakOutcome::OutcomeMismatch { first, second } => serde_json::json!({ 305 "kind": "outcome_mismatch", 306 "first": format!("{first:?}"), 307 "second": format!("{second:?}"), 308 }), 309 LeakOutcome::EventCountMismatch { first, second } => serde_json::json!({ 310 "kind": "event_count_mismatch", 311 "first": first, 312 "second": second, 313 }), 314 LeakOutcome::EdgeCountMismatch { first, second } => serde_json::json!({ 315 "kind": "edge_count_mismatch", 316 "first": first, 317 "second": second, 318 }), 319 LeakOutcome::LastCursorMismatch { first, second } => serde_json::json!({ 320 "kind": "last_cursor_mismatch", 321 "first": first, 322 "second": second, 323 }), 324 LeakOutcome::ResolverHitsMismatch { first, second } => serde_json::json!({ 325 "kind": "resolver_hits_mismatch", 326 "first": first, 327 "second": second, 328 }), 329 LeakOutcome::ResolverMissesMismatch { first, second } => serde_json::json!({ 330 "kind": "resolver_misses_mismatch", 331 "first": first, 332 "second": second, 333 }), 334 LeakOutcome::ConsumerTooSlowMismatch { first, second } => serde_json::json!({ 335 "kind": "consumer_too_slow_mismatch", 336 "first": first, 337 "second": second, 338 }), 339 LeakOutcome::DisconnectCountMismatch { first, second } => serde_json::json!({ 340 "kind": "disconnect_count_mismatch", 341 "first": first, 342 "second": second, 343 }), 344 LeakOutcome::LastDisconnectMismatch { first, second } => serde_json::json!({ 345 "kind": "last_disconnect_mismatch", 346 "first": first.as_ref().map(|d| format!("{:?}: {}", d.kind, d.message)), 347 "second": second.as_ref().map(|d| format!("{:?}: {}", d.kind, d.message)), 348 }), 349 LeakOutcome::WarmingShadowMismatch { first, second } => serde_json::json!({ 350 "kind": "warming_shadow_mismatch", 351 "first": format!("{first:?}"), 352 "second": format!("{second:?}"), 353 }), 354 LeakOutcome::WarmingBufferMismatch { first, second } => serde_json::json!({ 355 "kind": "warming_buffer_mismatch", 356 "first": format!("{first:?}"), 357 "second": format!("{second:?}"), 358 }), 359 LeakOutcome::TraceLengthMismatch { first, second } => serde_json::json!({ 360 "kind": "trace_length_mismatch", 361 "first": first, 362 "second": second, 363 }), 364 LeakOutcome::TraceLineMismatch { 365 index, 366 first, 367 second, 368 } => serde_json::json!({ 369 "kind": "trace_line_mismatch", 370 "index": index, 371 "first": first, 372 "second": second, 373 }), 374 } 375} 376 377fn exit_code_from(outcome: SimOutcome) -> ExitCode { 378 match outcome { 379 SimOutcome::Passed => ExitCode::from(0), 380 SimOutcome::Failed | SimOutcome::TimedOut => ExitCode::from(1), 381 } 382}