Monorepo for Tangled tangled.org
10

Configure Feed

Select the types of activity you want to include in your feed.

1use std::num::NonZeroUsize; 2use std::sync::Arc; 3use std::sync::atomic::{AtomicU64, Ordering}; 4use std::time::Duration; 5 6use bobbin_edge_index::{CoverageWatch, EdgeStore}; 7use bobbin_ingest::{ 8 DEFAULT_INGEST_PARALLELISM, DisconnectSink, IngestConfig, IngestRuntime, RepoIdResolver, 9 WarmingBuffer, WarmingShadowBuffer, run as run_ingest, 10}; 11use bobbin_record_lru::NoopRecordStore; 12use bobbin_runtime::{ 13 Clock, DEFAULT_MEM_WS_CAPACITY, MemHttpTransport, MemWsTransport, RuntimeHasher, SeededEntropy, 14 SimClock, UnixMicros, 15}; 16use bobbin_slingshot_client::SlingshotClient; 17use bobbin_types::search::NoopSearchSink; 18use tokio_util::sync::CancellationToken; 19use url::Url; 20 21use crate::report::{SimOutcome, SimReport}; 22use crate::workload::{Workload, WorkloadCtx}; 23 24#[derive(Clone, Debug)] 25pub struct SimConfig { 26 pub seed: u64, 27 pub base_unix: UnixMicros, 28 pub max_virtual_runtime: Duration, 29 pub parallelism: NonZeroUsize, 30 pub hydrant_base: Url, 31 pub slingshot_base: Url, 32 pub mem_ws_capacity: usize, 33 pub warming_buffer_enabled: bool, 34} 35 36impl SimConfig { 37 pub fn new(seed: u64) -> Self { 38 Self { 39 seed, 40 base_unix: UnixMicros::new(1_700_000_000_000_000), 41 max_virtual_runtime: Duration::from_secs(60), 42 parallelism: DEFAULT_INGEST_PARALLELISM, 43 hydrant_base: Url::parse("ws://hydrant.sim/").unwrap(), 44 slingshot_base: Url::parse("http://slingshot.sim/").unwrap(), 45 mem_ws_capacity: DEFAULT_MEM_WS_CAPACITY, 46 warming_buffer_enabled: true, 47 } 48 } 49} 50 51pub struct Sim { 52 config: SimConfig, 53 workload: Box<dyn Workload>, 54} 55 56impl Sim { 57 pub fn new(config: SimConfig, workload: Box<dyn Workload>) -> Self { 58 Self { config, workload } 59 } 60 61 pub async fn run(self) -> SimReport { 62 let SimConfig { 63 seed, 64 base_unix, 65 max_virtual_runtime, 66 parallelism, 67 hydrant_base, 68 slingshot_base, 69 mem_ws_capacity, 70 warming_buffer_enabled, 71 } = self.config; 72 73 let entropy = Arc::new(SeededEntropy::new(seed)); 74 let hasher = RuntimeHasher::from_entropy(&*entropy); 75 let clock: Arc<dyn Clock> = Arc::new(SimClock::at(base_unix)); 76 77 let store = Arc::new(EdgeStore::new(hasher.clone())); 78 let coverage = Arc::new(CoverageWatch::new()); 79 let records = Arc::new(NoopRecordStore); 80 let cancel = CancellationToken::new(); 81 let consumer_too_slow_count = Arc::new(AtomicU64::new(0)); 82 let disconnects = Arc::new(DisconnectSink::new()); 83 let warming_shadow = Arc::new(WarmingShadowBuffer::new(hasher.clone())); 84 let warming_buffer = Arc::new(WarmingBuffer::new(hasher.clone())); 85 86 let workload_name = self.workload.name(); 87 let ctx = WorkloadCtx { 88 seed, 89 clock: clock.clone(), 90 entropy: entropy.clone(), 91 store: store.clone(), 92 coverage: coverage.clone(), 93 records: records.clone(), 94 cancel: cancel.clone(), 95 consumer_too_slow_count: consumer_too_slow_count.clone(), 96 }; 97 let hooks = self.workload.build(ctx); 98 99 let slingshot_http = MemHttpTransport::shared(hooks.slingshot.clone(), clock.clone()); 100 let slingshot_client = SlingshotClient::new(slingshot_base, slingshot_http) 101 .expect("slingshot base url is valid"); 102 let resolver = Arc::new(RepoIdResolver::with_slingshot( 103 slingshot_client, 104 clock.clone(), 105 hasher.clone(), 106 )); 107 108 let mem_ws = MemWsTransport::shared_with_capacity(hooks.hydrant.clone(), mem_ws_capacity); 109 110 let ingest_runtime: IngestRuntime<NoopSearchSink> = IngestRuntime { 111 store: store.clone(), 112 issue_states: Arc::new(bobbin_edge_index::StateIndex::new(hasher.clone())), 113 pull_statuses: Arc::new(bobbin_edge_index::StateIndex::new(hasher.clone())), 114 coverage: coverage.clone(), 115 search: Arc::new(NoopSearchSink), 116 records: records.clone() as Arc<dyn bobbin_record_lru::RecordStore>, 117 resolver: resolver.clone(), 118 clock: clock.clone(), 119 entropy: entropy.clone(), 120 ws: mem_ws, 121 cancel: cancel.clone(), 122 disconnects: Some(disconnects.clone()), 123 warming_shadow: Some(warming_shadow.clone()), 124 warming_buffer: warming_buffer_enabled.then(|| warming_buffer.clone()), 125 }; 126 let ingest_config = IngestConfig { 127 hydrant_base, 128 start_cursor: bobbin_edge_index::HydrantCursor::new(0), 129 parallelism, 130 }; 131 let mut ingest_handle = tokio::spawn(async move { 132 let _ = run_ingest(ingest_config, ingest_runtime).await; 133 }); 134 135 let script = hooks.script; 136 let initial_report = tokio::select! { 137 biased; 138 _ = clock.sleep(max_virtual_runtime) => SimReport { 139 workload: workload_name, 140 seed, 141 outcome: SimOutcome::TimedOut, 142 virtual_runtime: max_virtual_runtime, 143 virtual_clock_end: clock.now_unix_micros(), 144 events_processed: coverage.snapshot().events_processed(), 145 last_cursor: coverage.snapshot().last_cursor().raw(), 146 edge_count: store.key_count() as u64, 147 resolver_hits: 0, 148 resolver_misses: 0, 149 consumer_too_slow_count: consumer_too_slow_count.load(Ordering::Relaxed), 150 disconnect_count: disconnects.count(), 151 last_disconnect: disconnects.snapshot(), 152 warming_shadow: warming_shadow.snapshot(), 153 warming_buffer: warming_buffer.snapshot(), 154 failure_reason: Some(format!( 155 "max_virtual_runtime {max_virtual_runtime:?} exhausted", 156 )), 157 }, 158 r = script => r, 159 }; 160 161 cancel.cancel(); 162 let drain_deadline = clock.sleep(Duration::from_secs(30)); 163 tokio::pin!(drain_deadline); 164 tokio::select! { 165 _ = &mut drain_deadline => { 166 ingest_handle.abort(); 167 let _ = ingest_handle.await; 168 } 169 res = &mut ingest_handle => { 170 let _ = res; 171 } 172 } 173 174 let stats = resolver.stats(); 175 SimReport { 176 edge_count: store.key_count() as u64, 177 events_processed: coverage.snapshot().events_processed(), 178 last_cursor: coverage.snapshot().last_cursor().raw(), 179 resolver_hits: stats.hits, 180 resolver_misses: stats.miss_count(), 181 consumer_too_slow_count: consumer_too_slow_count.load(Ordering::Relaxed), 182 disconnect_count: disconnects.count(), 183 last_disconnect: disconnects.snapshot(), 184 warming_shadow: warming_shadow.snapshot(), 185 warming_buffer: warming_buffer.snapshot(), 186 ..initial_report 187 } 188 } 189}