This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 8.6 kB View raw
1use std::num::NonZeroUsize; 2use std::time::Duration; 3 4use bobbin_sim::workloads::{SlingshotFlap, SlingshotFlapConfig}; 5use bobbin_sim::{Sim, SimConfig, SimOutcome}; 6use tokio::runtime::Builder as TokioBuilder; 7 8#[test] 9fn lever_b_absorbs_sustained_brownout_without_disconnect() { 10 let cfg = SlingshotFlapConfig { 11 cross_did_stars: 5000, 12 normal_latency_ms: 2, 13 brownout_start_ms: 0, 14 brownout_duration_ms: 60_000, 15 brownout_latency_ms: 200, 16 brownout_enabled: true, 17 hydrant_send_timeout_ms: 30_000, 18 hydrant_frame_pace_us: 1_000, 19 omit_target_repos: false, 20 emit_live_promotion_frame: false, 21 }; 22 let mut sim_config = SimConfig::new(7); 23 sim_config.parallelism = NonZeroUsize::new(4).unwrap(); 24 sim_config.max_virtual_runtime = Duration::from_secs(600); 25 26 let workload = Box::new(SlingshotFlap::new(cfg.clone())); 27 let runtime = TokioBuilder::new_current_thread() 28 .enable_all() 29 .start_paused(true) 30 .build() 31 .expect("build current_thread runtime"); 32 let report = runtime.block_on(Sim::new(sim_config, workload).run()); 33 34 let total_frames = (cfg.cross_did_stars as u64) * 2; 35 assert_eq!( 36 report.outcome, 37 SimOutcome::Passed, 38 "Lever B should keep slingshot off the hot path during cold replay, got {:?} reason={:?}", 39 report.outcome, 40 report.failure_reason, 41 ); 42 assert_eq!(report.events_processed, total_frames); 43 assert_eq!( 44 report.disconnect_count, 0, 45 "expected zero disconnects under sustained brownout once Lever B parks the cohort, got {} (last={:?})", 46 report.disconnect_count, report.last_disconnect, 47 ); 48 49 let b = report.warming_buffer; 50 assert_eq!( 51 b.enqueued_total, cfg.cross_did_stars as u64, 52 "every cross-DID star must park exactly once: {b:?}", 53 ); 54 assert_eq!( 55 b.drained_observe_total, cfg.cross_did_stars as u64, 56 "every observed repo must drain its parked star: {b:?}", 57 ); 58 assert_eq!(b.drained_promote_total, 0, "no residual at promote: {b:?}"); 59 assert_eq!(b.current_entries, 0, "buffer empty after drain: {b:?}"); 60 assert_eq!(b.evicted_total, 0, "no evictions in this workload: {b:?}"); 61 assert!( 62 b.max_concurrent_entries > 0 && b.max_concurrent_entries <= cfg.cross_did_stars as u64, 63 "peak depth bounded by cohort size, got {} for {} stars", 64 b.max_concurrent_entries, 65 cfg.cross_did_stars, 66 ); 67} 68 69#[test] 70fn shadow_and_buffer_observe_the_same_population() { 71 let cfg = SlingshotFlapConfig { 72 cross_did_stars: 2000, 73 normal_latency_ms: 2, 74 brownout_start_ms: 0, 75 brownout_duration_ms: 0, 76 brownout_latency_ms: 0, 77 brownout_enabled: false, 78 hydrant_send_timeout_ms: 30_000, 79 hydrant_frame_pace_us: 1_000, 80 omit_target_repos: false, 81 emit_live_promotion_frame: false, 82 }; 83 let mut sim_config = SimConfig::new(7); 84 sim_config.parallelism = NonZeroUsize::new(16).unwrap(); 85 sim_config.max_virtual_runtime = Duration::from_secs(120); 86 87 let workload = Box::new(SlingshotFlap::new(cfg.clone())); 88 let runtime = TokioBuilder::new_current_thread() 89 .enable_all() 90 .start_paused(true) 91 .build() 92 .expect("build current_thread runtime"); 93 let report = runtime.block_on(Sim::new(sim_config, workload).run()); 94 95 assert_eq!( 96 report.outcome, 97 SimOutcome::Passed, 98 "{:?}", 99 report.failure_reason 100 ); 101 102 let s = report.warming_shadow; 103 let b = report.warming_buffer; 104 assert_eq!( 105 s.enqueued_total, b.dep_enqueued_total, 106 "shadow note_unresolved per dep must equal buffer dep_enqueued_total: shadow={s:?} buffer={b:?}", 107 ); 108 assert_eq!( 109 s.drained_via_observe_total, b.dep_drained_observe_total, 110 "shadow note_observed per dep must equal buffer dep_drained_observe_total: shadow={s:?} buffer={b:?}", 111 ); 112 assert_eq!( 113 s.distinct_keys_seen, b.distinct_keys_seen, 114 "shadow and buffer must see the same distinct (owner, rkey) keys: shadow={s:?} buffer={b:?}", 115 ); 116 assert_eq!( 117 s.residual, 118 b.dep_enqueued_total - b.dep_drained_observe_total, 119 "shadow residual must equal buffer un-observed deps: shadow={s:?} buffer={b:?}", 120 ); 121} 122 123#[test] 124fn warming_to_ready_promote_drains_residual_via_parallel_slingshot_wave() { 125 let cfg = SlingshotFlapConfig { 126 cross_did_stars: 200, 127 normal_latency_ms: 2, 128 brownout_start_ms: 0, 129 brownout_duration_ms: 0, 130 brownout_latency_ms: 0, 131 brownout_enabled: false, 132 hydrant_send_timeout_ms: 30_000, 133 hydrant_frame_pace_us: 1_000, 134 omit_target_repos: true, 135 emit_live_promotion_frame: true, 136 }; 137 let mut sim_config = SimConfig::new(7); 138 sim_config.parallelism = NonZeroUsize::new(16).unwrap(); 139 sim_config.max_virtual_runtime = Duration::from_secs(120); 140 141 let workload = Box::new(SlingshotFlap::new(cfg.clone())); 142 let runtime = TokioBuilder::new_current_thread() 143 .enable_all() 144 .start_paused(true) 145 .build() 146 .expect("build current_thread runtime"); 147 let report = runtime.block_on(Sim::new(sim_config, workload).run()); 148 149 assert_eq!( 150 report.outcome, 151 SimOutcome::Passed, 152 "promote-flush workload must complete, got {:?} reason={:?}", 153 report.outcome, 154 report.failure_reason, 155 ); 156 157 let b = report.warming_buffer; 158 let stars = cfg.cross_did_stars as u64; 159 assert_eq!( 160 b.enqueued_total, stars, 161 "every cross-DID star must park (no repos arrive): {b:?}", 162 ); 163 assert_eq!( 164 b.drained_observe_total, 0, 165 "no repos arrive on the stream so observe-drain stays at zero: {b:?}", 166 ); 167 assert_eq!( 168 b.drained_promote_total, stars, 169 "Warming->Ready promote must drain every parked entry: {b:?}", 170 ); 171 assert_eq!( 172 b.current_entries, 0, 173 "buffer must be empty after promote drain: {b:?}", 174 ); 175 assert!( 176 report.resolver_hits + report.resolver_misses >= stars, 177 "promote wave must fan out one slingshot resolve per distinct dep, got hits={} misses={} for {stars} stars", 178 report.resolver_hits, 179 report.resolver_misses, 180 ); 181 assert_eq!( 182 report.edge_count, 183 stars * 2 + 1, 184 "each drained star contributes a primary edge plus a sh.tangled.feed.star.by mirror edge; promoter repo adds one. got {} for {stars} stars", 185 report.edge_count, 186 ); 187} 188 189#[test] 190fn buffer_enabled_and_disabled_produce_identical_edge_index() { 191 let cfg = SlingshotFlapConfig { 192 cross_did_stars: 200, 193 normal_latency_ms: 2, 194 brownout_start_ms: 0, 195 brownout_duration_ms: 0, 196 brownout_latency_ms: 0, 197 brownout_enabled: false, 198 hydrant_send_timeout_ms: 30_000, 199 hydrant_frame_pace_us: 1_000, 200 omit_target_repos: false, 201 emit_live_promotion_frame: false, 202 }; 203 204 let run_with_buffer = |enabled: bool| { 205 let mut sim_config = SimConfig::new(7); 206 sim_config.parallelism = NonZeroUsize::new(16).unwrap(); 207 sim_config.max_virtual_runtime = Duration::from_secs(120); 208 sim_config.warming_buffer_enabled = enabled; 209 let workload = Box::new(SlingshotFlap::new(cfg.clone())); 210 let runtime = TokioBuilder::new_current_thread() 211 .enable_all() 212 .start_paused(true) 213 .build() 214 .expect("build current_thread runtime"); 215 runtime.block_on(Sim::new(sim_config, workload).run()) 216 }; 217 218 let with_buffer = run_with_buffer(true); 219 let without_buffer = run_with_buffer(false); 220 221 assert_eq!( 222 with_buffer.outcome, 223 SimOutcome::Passed, 224 "{:?}", 225 with_buffer.failure_reason 226 ); 227 assert_eq!( 228 without_buffer.outcome, 229 SimOutcome::Passed, 230 "{:?}", 231 without_buffer.failure_reason 232 ); 233 assert_eq!( 234 with_buffer.events_processed, without_buffer.events_processed, 235 "events_processed must match across buffer modes", 236 ); 237 assert_eq!( 238 with_buffer.edge_count, without_buffer.edge_count, 239 "edge_count must match: lever B is correctness-preserving (with={} without={})", 240 with_buffer.edge_count, without_buffer.edge_count, 241 ); 242 assert_eq!( 243 with_buffer.last_cursor, without_buffer.last_cursor, 244 "final cursor must match across buffer modes", 245 ); 246}