Monorepo for Tangled
tangled.org
1use std::num::NonZeroUsize;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::Duration;
5
6use bobbin_edge_index::{CoverageWatch, EdgeStore};
7use bobbin_ingest::{
8 DEFAULT_INGEST_PARALLELISM, DisconnectSink, IngestConfig, IngestRuntime, RepoIdResolver,
9 WarmingBuffer, WarmingShadowBuffer, run as run_ingest,
10};
11use bobbin_record_lru::NoopRecordStore;
12use bobbin_runtime::{
13 Clock, DEFAULT_MEM_WS_CAPACITY, MemHttpTransport, MemWsTransport, RuntimeHasher, SeededEntropy,
14 SimClock, UnixMicros,
15};
16use bobbin_slingshot_client::SlingshotClient;
17use bobbin_types::search::NoopSearchSink;
18use tokio_util::sync::CancellationToken;
19use url::Url;
20
21use crate::report::{SimOutcome, SimReport};
22use crate::workload::{Workload, WorkloadCtx};
23
24#[derive(Clone, Debug)]
25pub struct SimConfig {
26 pub seed: u64,
27 pub base_unix: UnixMicros,
28 pub max_virtual_runtime: Duration,
29 pub parallelism: NonZeroUsize,
30 pub hydrant_base: Url,
31 pub slingshot_base: Url,
32 pub mem_ws_capacity: usize,
33 pub warming_buffer_enabled: bool,
34}
35
36impl SimConfig {
37 pub fn new(seed: u64) -> Self {
38 Self {
39 seed,
40 base_unix: UnixMicros::new(1_700_000_000_000_000),
41 max_virtual_runtime: Duration::from_secs(60),
42 parallelism: DEFAULT_INGEST_PARALLELISM,
43 hydrant_base: Url::parse("ws://hydrant.sim/").unwrap(),
44 slingshot_base: Url::parse("http://slingshot.sim/").unwrap(),
45 mem_ws_capacity: DEFAULT_MEM_WS_CAPACITY,
46 warming_buffer_enabled: true,
47 }
48 }
49}
50
51pub struct Sim {
52 config: SimConfig,
53 workload: Box<dyn Workload>,
54}
55
56impl Sim {
57 pub fn new(config: SimConfig, workload: Box<dyn Workload>) -> Self {
58 Self { config, workload }
59 }
60
61 pub async fn run(self) -> SimReport {
62 let SimConfig {
63 seed,
64 base_unix,
65 max_virtual_runtime,
66 parallelism,
67 hydrant_base,
68 slingshot_base,
69 mem_ws_capacity,
70 warming_buffer_enabled,
71 } = self.config;
72
73 let entropy = Arc::new(SeededEntropy::new(seed));
74 let hasher = RuntimeHasher::from_entropy(&*entropy);
75 let clock: Arc<dyn Clock> = Arc::new(SimClock::at(base_unix));
76
77 let store = Arc::new(EdgeStore::new(hasher.clone()));
78 let coverage = Arc::new(CoverageWatch::new());
79 let records = Arc::new(NoopRecordStore);
80 let cancel = CancellationToken::new();
81 let consumer_too_slow_count = Arc::new(AtomicU64::new(0));
82 let disconnects = Arc::new(DisconnectSink::new());
83 let warming_shadow = Arc::new(WarmingShadowBuffer::new(hasher.clone()));
84 let warming_buffer = Arc::new(WarmingBuffer::new(hasher.clone()));
85
86 let workload_name = self.workload.name();
87 let ctx = WorkloadCtx {
88 seed,
89 clock: clock.clone(),
90 entropy: entropy.clone(),
91 store: store.clone(),
92 coverage: coverage.clone(),
93 records: records.clone(),
94 cancel: cancel.clone(),
95 consumer_too_slow_count: consumer_too_slow_count.clone(),
96 };
97 let hooks = self.workload.build(ctx);
98
99 let slingshot_http = MemHttpTransport::shared(hooks.slingshot.clone(), clock.clone());
100 let slingshot_client = SlingshotClient::new(slingshot_base, slingshot_http)
101 .expect("slingshot base url is valid");
102 let resolver = Arc::new(RepoIdResolver::with_slingshot(
103 slingshot_client,
104 clock.clone(),
105 hasher.clone(),
106 ));
107
108 let mem_ws = MemWsTransport::shared_with_capacity(hooks.hydrant.clone(), mem_ws_capacity);
109
110 let ingest_runtime: IngestRuntime<NoopSearchSink> = IngestRuntime {
111 store: store.clone(),
112 issue_states: Arc::new(bobbin_edge_index::StateIndex::new(hasher.clone())),
113 pull_statuses: Arc::new(bobbin_edge_index::StateIndex::new(hasher.clone())),
114 coverage: coverage.clone(),
115 search: Arc::new(NoopSearchSink),
116 records: records.clone() as Arc<dyn bobbin_record_lru::RecordStore>,
117 resolver: resolver.clone(),
118 clock: clock.clone(),
119 entropy: entropy.clone(),
120 ws: mem_ws,
121 cancel: cancel.clone(),
122 disconnects: Some(disconnects.clone()),
123 warming_shadow: Some(warming_shadow.clone()),
124 warming_buffer: warming_buffer_enabled.then(|| warming_buffer.clone()),
125 };
126 let ingest_config = IngestConfig {
127 hydrant_base,
128 start_cursor: bobbin_edge_index::HydrantCursor::new(0),
129 parallelism,
130 };
131 let mut ingest_handle = tokio::spawn(async move {
132 let _ = run_ingest(ingest_config, ingest_runtime).await;
133 });
134
135 let script = hooks.script;
136 let initial_report = tokio::select! {
137 biased;
138 _ = clock.sleep(max_virtual_runtime) => SimReport {
139 workload: workload_name,
140 seed,
141 outcome: SimOutcome::TimedOut,
142 virtual_runtime: max_virtual_runtime,
143 virtual_clock_end: clock.now_unix_micros(),
144 events_processed: coverage.snapshot().events_processed(),
145 last_cursor: coverage.snapshot().last_cursor().raw(),
146 edge_count: store.key_count() as u64,
147 resolver_hits: 0,
148 resolver_misses: 0,
149 consumer_too_slow_count: consumer_too_slow_count.load(Ordering::Relaxed),
150 disconnect_count: disconnects.count(),
151 last_disconnect: disconnects.snapshot(),
152 warming_shadow: warming_shadow.snapshot(),
153 warming_buffer: warming_buffer.snapshot(),
154 failure_reason: Some(format!(
155 "max_virtual_runtime {max_virtual_runtime:?} exhausted",
156 )),
157 },
158 r = script => r,
159 };
160
161 cancel.cancel();
162 let drain_deadline = clock.sleep(Duration::from_secs(30));
163 tokio::pin!(drain_deadline);
164 tokio::select! {
165 _ = &mut drain_deadline => {
166 ingest_handle.abort();
167 let _ = ingest_handle.await;
168 }
169 res = &mut ingest_handle => {
170 let _ = res;
171 }
172 }
173
174 let stats = resolver.stats();
175 SimReport {
176 edge_count: store.key_count() as u64,
177 events_processed: coverage.snapshot().events_processed(),
178 last_cursor: coverage.snapshot().last_cursor().raw(),
179 resolver_hits: stats.hits,
180 resolver_misses: stats.miss_count(),
181 consumer_too_slow_count: consumer_too_slow_count.load(Ordering::Relaxed),
182 disconnect_count: disconnects.count(),
183 last_disconnect: disconnects.snapshot(),
184 warming_shadow: warming_shadow.snapshot(),
185 warming_buffer: warming_buffer.snapshot(),
186 ..initial_report
187 }
188 }
189}