This repository has no description
1use std::num::NonZeroUsize;
2use std::time::Duration;
3
4use bobbin_sim::workloads::{SlingshotFlap, SlingshotFlapConfig};
5use bobbin_sim::{Sim, SimConfig, SimOutcome};
6use tokio::runtime::Builder as TokioBuilder;
7
/// Drives the `SlingshotFlap` workload with 5000 cross-DID stars while a
/// brownout (200 ms latency instead of the normal 2 ms) is enabled from
/// `brownout_start_ms = 0` for 60 000 ms.
///
/// Asserts that the simulation passes, that `events_processed` is twice the
/// star count, that no disconnects were recorded, and that the warming-buffer
/// counters show `enqueued_total` and `drained_observe_total` equal to the
/// star count with no promote-time drains, evictions, or leftover entries.
#[test]
fn lever_b_absorbs_sustained_brownout_without_disconnect() {
    let flap_cfg = SlingshotFlapConfig {
        cross_did_stars: 5000,
        normal_latency_ms: 2,
        brownout_start_ms: 0,
        brownout_duration_ms: 60_000,
        brownout_latency_ms: 200,
        brownout_enabled: true,
        hydrant_send_timeout_ms: 30_000,
        hydrant_frame_pace_us: 1_000,
        omit_target_repos: false,
        emit_live_promotion_frame: false,
    };

    let mut sim_cfg = SimConfig::new(7);
    sim_cfg.max_virtual_runtime = Duration::from_secs(600);
    sim_cfg.parallelism = NonZeroUsize::new(4).unwrap();

    let workload = Box::new(SlingshotFlap::new(flap_cfg.clone()));
    // Current-thread runtime with the tokio clock paused from the start.
    let rt = TokioBuilder::new_current_thread()
        .enable_all()
        .start_paused(true)
        .build()
        .expect("build current_thread runtime");
    let sim_report = rt.block_on(Sim::new(sim_cfg, workload).run());

    // Cohort size in the integer width the report counters are compared against.
    let stars = flap_cfg.cross_did_stars as u64;

    assert_eq!(
        sim_report.outcome,
        SimOutcome::Passed,
        "Lever B should keep slingshot off the hot path during cold replay, got {:?} reason={:?}",
        sim_report.outcome,
        sim_report.failure_reason,
    );
    // The test expects exactly two processed events per star.
    assert_eq!(sim_report.events_processed, stars * 2);
    assert_eq!(
        sim_report.disconnect_count, 0,
        "expected zero disconnects under sustained brownout once Lever B parks the cohort, got {} (last={:?})",
        sim_report.disconnect_count, sim_report.last_disconnect,
    );

    // Borrow the buffer stats; `b` is the name captured by the messages below.
    let b = &sim_report.warming_buffer;
    assert_eq!(
        b.enqueued_total, stars,
        "every cross-DID star must park exactly once: {b:?}",
    );
    assert_eq!(
        b.drained_observe_total, stars,
        "every observed repo must drain its parked star: {b:?}",
    );
    assert_eq!(b.drained_promote_total, 0, "no residual at promote: {b:?}");
    assert_eq!(b.current_entries, 0, "buffer empty after drain: {b:?}");
    assert_eq!(b.evicted_total, 0, "no evictions in this workload: {b:?}");

    // Peak occupancy must be non-zero and cannot exceed the cohort size.
    let peak = b.max_concurrent_entries;
    assert!(
        peak > 0 && peak <= stars,
        "peak depth bounded by cohort size, got {} for {} stars",
        peak,
        stars,
    );
}
68
/// Runs the `SlingshotFlap` workload with 2000 cross-DID stars and the
/// brownout disabled, then cross-checks the report's `warming_shadow`
/// counters against the `warming_buffer` per-dep counters.
///
/// The two sets of statistics must agree on enqueued totals, observe-drained
/// totals, distinct keys seen, and the residual (enqueued minus
/// observe-drained).
#[test]
fn shadow_and_buffer_observe_the_same_population() {
    let flap_cfg = SlingshotFlapConfig {
        cross_did_stars: 2000,
        normal_latency_ms: 2,
        brownout_start_ms: 0,
        brownout_duration_ms: 0,
        brownout_latency_ms: 0,
        brownout_enabled: false,
        hydrant_send_timeout_ms: 30_000,
        hydrant_frame_pace_us: 1_000,
        omit_target_repos: false,
        emit_live_promotion_frame: false,
    };

    let mut sim_cfg = SimConfig::new(7);
    sim_cfg.max_virtual_runtime = Duration::from_secs(120);
    sim_cfg.parallelism = NonZeroUsize::new(16).unwrap();

    let workload = Box::new(SlingshotFlap::new(flap_cfg.clone()));
    // Current-thread runtime with the tokio clock paused from the start.
    let rt = TokioBuilder::new_current_thread()
        .enable_all()
        .start_paused(true)
        .build()
        .expect("build current_thread runtime");
    let sim_report = rt.block_on(Sim::new(sim_cfg, workload).run());

    assert_eq!(
        sim_report.outcome,
        SimOutcome::Passed,
        "{:?}",
        sim_report.failure_reason
    );

    // `s` and `b` are the names captured by the assertion messages below.
    let s = &sim_report.warming_shadow;
    let b = &sim_report.warming_buffer;

    assert_eq!(
        s.enqueued_total, b.dep_enqueued_total,
        "shadow note_unresolved per dep must equal buffer dep_enqueued_total: shadow={s:?} buffer={b:?}",
    );
    assert_eq!(
        s.drained_via_observe_total, b.dep_drained_observe_total,
        "shadow note_observed per dep must equal buffer dep_drained_observe_total: shadow={s:?} buffer={b:?}",
    );
    assert_eq!(
        s.distinct_keys_seen, b.distinct_keys_seen,
        "shadow and buffer must see the same distinct (owner, rkey) keys: shadow={s:?} buffer={b:?}",
    );

    // Deps the buffer enqueued but never drained via observe.
    let unobserved_deps = b.dep_enqueued_total - b.dep_drained_observe_total;
    assert_eq!(
        s.residual,
        unobserved_deps,
        "shadow residual must equal buffer un-observed deps: shadow={s:?} buffer={b:?}",
    );
}
122
/// Runs the `SlingshotFlap` workload with 200 cross-DID stars,
/// `omit_target_repos` and `emit_live_promotion_frame` both set, and the
/// brownout disabled.
///
/// Asserts that the simulation passes, that every star is enqueued in the
/// warming buffer, that nothing drains via observe, that the promote path
/// drains all of them leaving the buffer empty, that resolver hits plus
/// misses reach at least the star count, and that `edge_count` equals
/// `2 * stars + 1`.
#[test]
fn warming_to_ready_promote_drains_residual_via_parallel_slingshot_wave() {
    let flap_cfg = SlingshotFlapConfig {
        cross_did_stars: 200,
        normal_latency_ms: 2,
        brownout_start_ms: 0,
        brownout_duration_ms: 0,
        brownout_latency_ms: 0,
        brownout_enabled: false,
        hydrant_send_timeout_ms: 30_000,
        hydrant_frame_pace_us: 1_000,
        omit_target_repos: true,
        emit_live_promotion_frame: true,
    };
    // Star count in the integer width of the report counters; the name is
    // captured by two assertion messages below.
    let stars = flap_cfg.cross_did_stars as u64;

    let mut sim_cfg = SimConfig::new(7);
    sim_cfg.max_virtual_runtime = Duration::from_secs(120);
    sim_cfg.parallelism = NonZeroUsize::new(16).unwrap();

    let workload = Box::new(SlingshotFlap::new(flap_cfg.clone()));
    // Current-thread runtime with the tokio clock paused from the start.
    let rt = TokioBuilder::new_current_thread()
        .enable_all()
        .start_paused(true)
        .build()
        .expect("build current_thread runtime");
    let sim_report = rt.block_on(Sim::new(sim_cfg, workload).run());

    assert_eq!(
        sim_report.outcome,
        SimOutcome::Passed,
        "promote-flush workload must complete, got {:?} reason={:?}",
        sim_report.outcome,
        sim_report.failure_reason,
    );

    let b = &sim_report.warming_buffer;
    assert_eq!(
        b.enqueued_total, stars,
        "every cross-DID star must park (no repos arrive): {b:?}",
    );
    assert_eq!(
        b.drained_observe_total, 0,
        "no repos arrive on the stream so observe-drain stays at zero: {b:?}",
    );
    assert_eq!(
        b.drained_promote_total, stars,
        "Warming->Ready promote must drain every parked entry: {b:?}",
    );
    assert_eq!(
        b.current_entries, 0,
        "buffer must be empty after promote drain: {b:?}",
    );

    // Every resolve attempt counts, whether it was a hit or a miss.
    let resolves = sim_report.resolver_hits + sim_report.resolver_misses;
    assert!(
        resolves >= stars,
        "promote wave must fan out one slingshot resolve per distinct dep, got hits={} misses={} for {stars} stars",
        sim_report.resolver_hits,
        sim_report.resolver_misses,
    );

    // Two edges per star plus one extra, as spelled out in the message.
    let expected_edges = stars * 2 + 1;
    assert_eq!(
        sim_report.edge_count,
        expected_edges,
        "each drained star contributes a primary edge plus a sh.tangled.feed.star.by mirror edge; promoter repo adds one. got {} for {stars} stars",
        sim_report.edge_count,
    );
}
188
/// Runs the same 200-star `SlingshotFlap` workload twice with an identical
/// seed and configuration, once with `warming_buffer_enabled = true` and once
/// with it `false`.
///
/// Both runs must pass, and they must report the same `events_processed`,
/// `edge_count`, and `last_cursor`.
#[test]
fn buffer_enabled_and_disabled_produce_identical_edge_index() {
    let flap_cfg = SlingshotFlapConfig {
        cross_did_stars: 200,
        normal_latency_ms: 2,
        brownout_start_ms: 0,
        brownout_duration_ms: 0,
        brownout_latency_ms: 0,
        brownout_enabled: false,
        hydrant_send_timeout_ms: 30_000,
        hydrant_frame_pace_us: 1_000,
        omit_target_repos: false,
        emit_live_promotion_frame: false,
    };

    // One full simulation on a fresh runtime; only the buffer toggle varies.
    let simulate = |buffer_on: bool| {
        let mut sim_cfg = SimConfig::new(7);
        sim_cfg.warming_buffer_enabled = buffer_on;
        sim_cfg.max_virtual_runtime = Duration::from_secs(120);
        sim_cfg.parallelism = NonZeroUsize::new(16).unwrap();

        let workload = Box::new(SlingshotFlap::new(flap_cfg.clone()));
        // Current-thread runtime with the tokio clock paused from the start.
        let rt = TokioBuilder::new_current_thread()
            .enable_all()
            .start_paused(true)
            .build()
            .expect("build current_thread runtime");
        rt.block_on(Sim::new(sim_cfg, workload).run())
    };

    // Array `map` applies the closure in order: enabled run first, then disabled.
    let [with_buffer, without_buffer] = [true, false].map(simulate);

    // Both runs must pass before their results are compared.
    for run in [&with_buffer, &without_buffer] {
        assert_eq!(
            run.outcome,
            SimOutcome::Passed,
            "{:?}",
            run.failure_reason
        );
    }

    assert_eq!(
        with_buffer.events_processed, without_buffer.events_processed,
        "events_processed must match across buffer modes",
    );
    assert_eq!(
        with_buffer.edge_count, without_buffer.edge_count,
        "edge_count must match: lever B is correctness-preserving (with={} without={})",
        with_buffer.edge_count, without_buffer.edge_count,
    );
    assert_eq!(
        with_buffer.last_cursor, without_buffer.last_cursor,
        "final cursor must match across buffer modes",
    );
}
246}