Monorepo for Tangled
tangled.org
1use std::sync::Arc;
2use std::time::Duration;
3
4use bobbin_edge_index::{CoverageWatch, EdgeStore, StateIndex};
5use bobbin_ingest::{IngestConfig, IngestRuntime, RepoIdResolver, run};
6use bobbin_record_lru::{NoopRecordStore, RecordStore};
7use bobbin_runtime::{OsEntropy, RuntimeHasher, SystemClock, TungsteniteWs};
8use bobbin_types::search::NoopSearchSink;
9use futures::stream::{self, StreamExt};
10use tokio_util::sync::CancellationToken;
11use url::Url;
12
13const MIN_EVENTS_PER_RUN: u64 = 100;
14
15#[tokio::main(flavor = "multi_thread")]
16async fn main() {
17 tracing_subscriber::fmt::try_init().ok();
18
19 let endpoint = std::env::args()
20 .nth(1)
21 .unwrap_or_else(|| "ws://127.0.0.1:3010".to_owned());
22 let ticks: u32 = std::env::args()
23 .nth(2)
24 .and_then(|s| s.parse().ok())
25 .unwrap_or(6);
26
27 let url = Url::parse(&endpoint).expect("valid hydrant base url");
28 let hasher = RuntimeHasher::from_entropy(&OsEntropy);
29 let store = Arc::new(EdgeStore::new(hasher.clone()));
30 let coverage = Arc::new(CoverageWatch::new());
31
32 let cfg = IngestConfig::new(url);
33 let cancel = CancellationToken::new();
34 let runtime = IngestRuntime {
35 store: store.clone(),
36 issue_states: Arc::new(StateIndex::new(hasher.clone())),
37 pull_statuses: Arc::new(StateIndex::new(hasher.clone())),
38 coverage: coverage.clone(),
39 search: Arc::new(NoopSearchSink),
40 records: Arc::new(NoopRecordStore) as Arc<dyn RecordStore>,
41 resolver: Arc::new(RepoIdResolver::detached(hasher)),
42 clock: Arc::new(SystemClock::new()),
43 entropy: Arc::new(OsEntropy),
44 ws: TungsteniteWs::shared(),
45 cancel: cancel.clone(),
46 disconnects: None,
47 warming_shadow: None,
48 warming_buffer: None,
49 knot_registry: None,
50 knot_gate: None,
51 };
52 let task = tokio::spawn(async move {
53 let _ = run(cfg, runtime).await;
54 });
55
56 stream::iter(0..ticks)
57 .for_each(|i| {
58 let store = store.clone();
59 let coverage = coverage.clone();
60 async move {
61 tokio::time::sleep(Duration::from_secs(5)).await;
62 let snap = coverage.snapshot();
63 println!(
64 "[{i:02}] coverage={snap:?} edge_keys={} sources={}",
65 store.key_count(),
66 store.source_count()
67 );
68 }
69 })
70 .await;
71
72 cancel.cancel();
73 let _ = task.await;
74
75 let final_snap = coverage.snapshot();
76 let events = final_snap.events_processed();
77 println!(
78 "done: events={events} edge_keys={} sources={} ready={}",
79 store.key_count(),
80 store.source_count(),
81 final_snap.is_ready(),
82 );
83 assert!(
84 events >= MIN_EVENTS_PER_RUN,
85 "smoke received {events} events from {endpoint} across {ticks} five-second ticks, below threshold of {MIN_EVENTS_PER_RUN}; hydrant may be silent or unreachable",
86 );
87}