Monorepo for Tangled
tangled.org
1use std::num::NonZeroUsize;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::Duration;
5
6use bobbin_edge_index::{CoverageWatch, EdgeStore};
7use bobbin_ingest::{
8 DEFAULT_INGEST_PARALLELISM, DisconnectSink, IngestConfig, IngestRuntime, RepoIdResolver,
9 WarmingBuffer, WarmingShadowBuffer, run as run_ingest,
10};
11use bobbin_record_lru::NoopRecordStore;
12use bobbin_runtime::{
13 Clock, DEFAULT_MEM_WS_CAPACITY, MemHttpTransport, MemWsTransport, RuntimeHasher, SeededEntropy,
14 SimClock, UnixMicros,
15};
16use bobbin_slingshot_client::SlingshotClient;
17use bobbin_types::search::NoopSearchSink;
18use tokio_util::sync::CancellationToken;
19use url::Url;
20
21use crate::report::{SimOutcome, SimReport};
22use crate::workload::{Workload, WorkloadCtx};
23
24#[derive(Clone, Debug)]
25pub struct SimConfig {
26 pub seed: u64,
27 pub base_unix: UnixMicros,
28 pub max_virtual_runtime: Duration,
29 pub parallelism: NonZeroUsize,
30 pub hydrant_base: Url,
31 pub slingshot_base: Url,
32 pub mem_ws_capacity: usize,
33 pub warming_buffer_enabled: bool,
34}
35
36impl SimConfig {
37 pub fn new(seed: u64) -> Self {
38 Self {
39 seed,
40 base_unix: UnixMicros::new(1_700_000_000_000_000),
41 max_virtual_runtime: Duration::from_secs(60),
42 parallelism: DEFAULT_INGEST_PARALLELISM,
43 hydrant_base: Url::parse("ws://hydrant.sim/").unwrap(),
44 slingshot_base: Url::parse("http://slingshot.sim/").unwrap(),
45 mem_ws_capacity: DEFAULT_MEM_WS_CAPACITY,
46 warming_buffer_enabled: true,
47 }
48 }
49}
50
51pub struct Sim {
52 config: SimConfig,
53 workload: Box<dyn Workload>,
54}
55
56impl Sim {
57 pub fn new(config: SimConfig, workload: Box<dyn Workload>) -> Self {
58 Self { config, workload }
59 }
60
61 pub async fn run(self) -> SimReport {
62 let SimConfig {
63 seed,
64 base_unix,
65 max_virtual_runtime,
66 parallelism,
67 hydrant_base,
68 slingshot_base,
69 mem_ws_capacity,
70 warming_buffer_enabled,
71 } = self.config;
72
73 let entropy = Arc::new(SeededEntropy::new(seed));
74 let hasher = RuntimeHasher::from_entropy(&*entropy);
75 let clock: Arc<dyn Clock> = Arc::new(SimClock::at(base_unix));
76
77 let store = Arc::new(EdgeStore::new(hasher.clone()));
78 let coverage = Arc::new(CoverageWatch::new());
79 let records = Arc::new(NoopRecordStore);
80 let cancel = CancellationToken::new();
81 let consumer_too_slow_count = Arc::new(AtomicU64::new(0));
82 let disconnects = Arc::new(DisconnectSink::new());
83 let warming_shadow = Arc::new(WarmingShadowBuffer::new(hasher.clone()));
84 let warming_buffer = Arc::new(WarmingBuffer::new(hasher.clone()));
85
86 let workload_name = self.workload.name();
87 let ctx = WorkloadCtx {
88 seed,
89 clock: clock.clone(),
90 entropy: entropy.clone(),
91 store: store.clone(),
92 coverage: coverage.clone(),
93 records: records.clone(),
94 cancel: cancel.clone(),
95 consumer_too_slow_count: consumer_too_slow_count.clone(),
96 };
97 let hooks = self.workload.build(ctx);
98
99 let slingshot_http = MemHttpTransport::shared(hooks.slingshot.clone(), clock.clone());
100 let slingshot_client = SlingshotClient::new(slingshot_base, slingshot_http)
101 .expect("slingshot base url is valid");
102 let resolver = Arc::new(RepoIdResolver::with_slingshot(
103 slingshot_client,
104 clock.clone(),
105 hasher.clone(),
106 ));
107
108 let mem_ws = MemWsTransport::shared_with_capacity(hooks.hydrant.clone(), mem_ws_capacity);
109
110 let ingest_runtime: IngestRuntime<NoopSearchSink> = IngestRuntime {
111 store: store.clone(),
112 issue_states: Arc::new(bobbin_edge_index::StateIndex::new(hasher.clone())),
113 pull_statuses: Arc::new(bobbin_edge_index::StateIndex::new(hasher.clone())),
114 coverage: coverage.clone(),
115 search: Arc::new(NoopSearchSink),
116 records: records.clone() as Arc<dyn bobbin_record_lru::RecordStore>,
117 resolver: resolver.clone(),
118 clock: clock.clone(),
119 entropy: entropy.clone(),
120 ws: mem_ws,
121 cancel: cancel.clone(),
122 disconnects: Some(disconnects.clone()),
123 warming_shadow: Some(warming_shadow.clone()),
124 warming_buffer: warming_buffer_enabled.then(|| warming_buffer.clone()),
125 knot_registry: None,
126 knot_gate: None,
127 };
128 let ingest_config = IngestConfig {
129 hydrant_base,
130 start_cursor: bobbin_edge_index::HydrantCursor::new(0),
131 parallelism,
132 };
133 let mut ingest_handle = tokio::spawn(async move {
134 let _ = run_ingest(ingest_config, ingest_runtime).await;
135 });
136
137 let script = hooks.script;
138 let initial_report = tokio::select! {
139 biased;
140 _ = clock.sleep(max_virtual_runtime) => SimReport {
141 workload: workload_name,
142 seed,
143 outcome: SimOutcome::TimedOut,
144 virtual_runtime: max_virtual_runtime,
145 virtual_clock_end: clock.now_unix_micros(),
146 events_processed: coverage.snapshot().events_processed(),
147 last_cursor: coverage.snapshot().last_cursor().raw(),
148 edge_count: store.key_count() as u64,
149 resolver_hits: 0,
150 resolver_misses: 0,
151 consumer_too_slow_count: consumer_too_slow_count.load(Ordering::Relaxed),
152 disconnect_count: disconnects.count(),
153 last_disconnect: disconnects.snapshot(),
154 warming_shadow: warming_shadow.snapshot(),
155 warming_buffer: warming_buffer.snapshot(),
156 failure_reason: Some(format!(
157 "max_virtual_runtime {max_virtual_runtime:?} exhausted",
158 )),
159 },
160 r = script => r,
161 };
162
163 cancel.cancel();
164 let drain_deadline = clock.sleep(Duration::from_secs(30));
165 tokio::pin!(drain_deadline);
166 tokio::select! {
167 _ = &mut drain_deadline => {
168 ingest_handle.abort();
169 let _ = ingest_handle.await;
170 }
171 res = &mut ingest_handle => {
172 let _ = res;
173 }
174 }
175
176 let stats = resolver.stats();
177 SimReport {
178 edge_count: store.key_count() as u64,
179 events_processed: coverage.snapshot().events_processed(),
180 last_cursor: coverage.snapshot().last_cursor().raw(),
181 resolver_hits: stats.hits,
182 resolver_misses: stats.miss_count(),
183 consumer_too_slow_count: consumer_too_slow_count.load(Ordering::Relaxed),
184 disconnect_count: disconnects.count(),
185 last_disconnect: disconnects.snapshot(),
186 warming_shadow: warming_shadow.snapshot(),
187 warming_buffer: warming_buffer.snapshot(),
188 ..initial_report
189 }
190 }
191}