Monorepo for Tangled tangled.org
8

Configure Feed

Select the types of activity you want to include in your feed.

at master 3.0 kB View raw
1use std::sync::Arc; 2use std::time::Duration; 3 4use bobbin_edge_index::{CoverageWatch, EdgeStore, StateIndex}; 5use bobbin_ingest::{IngestConfig, IngestRuntime, RepoIdResolver, run}; 6use bobbin_record_lru::{NoopRecordStore, RecordStore}; 7use bobbin_runtime::{OsEntropy, RuntimeHasher, SystemClock, TungsteniteWs}; 8use bobbin_types::search::NoopSearchSink; 9use futures::stream::{self, StreamExt}; 10use tokio_util::sync::CancellationToken; 11use url::Url; 12 13const MIN_EVENTS_PER_RUN: u64 = 100; 14 15#[tokio::main(flavor = "multi_thread")] 16async fn main() { 17 tracing_subscriber::fmt::try_init().ok(); 18 19 let endpoint = std::env::args() 20 .nth(1) 21 .unwrap_or_else(|| "ws://127.0.0.1:3010".to_owned()); 22 let ticks: u32 = std::env::args() 23 .nth(2) 24 .and_then(|s| s.parse().ok()) 25 .unwrap_or(6); 26 27 let url = Url::parse(&endpoint).expect("valid hydrant base url"); 28 let hasher = RuntimeHasher::from_entropy(&OsEntropy); 29 let store = Arc::new(EdgeStore::new(hasher.clone())); 30 let coverage = Arc::new(CoverageWatch::new()); 31 32 let cfg = IngestConfig::new(url); 33 let cancel = CancellationToken::new(); 34 let runtime = IngestRuntime { 35 store: store.clone(), 36 issue_states: Arc::new(StateIndex::new(hasher.clone())), 37 pull_statuses: Arc::new(StateIndex::new(hasher.clone())), 38 coverage: coverage.clone(), 39 search: Arc::new(NoopSearchSink), 40 records: Arc::new(NoopRecordStore) as Arc<dyn RecordStore>, 41 resolver: Arc::new(RepoIdResolver::detached(hasher)), 42 clock: Arc::new(SystemClock::new()), 43 entropy: Arc::new(OsEntropy), 44 ws: TungsteniteWs::shared(), 45 cancel: cancel.clone(), 46 disconnects: None, 47 warming_shadow: None, 48 warming_buffer: None, 49 knot_registry: None, 50 knot_gate: None, 51 }; 52 let task = tokio::spawn(async move { 53 let _ = run(cfg, runtime).await; 54 }); 55 56 stream::iter(0..ticks) 57 .for_each(|i| { 58 let store = store.clone(); 59 let coverage = coverage.clone(); 60 async move { 61 tokio::time::sleep(Duration::from_secs(5)).await; 62 let snap = coverage.snapshot(); 63 println!( 64 "[{i:02}] coverage={snap:?} edge_keys={} sources={}", 65 store.key_count(), 66 store.source_count() 67 ); 68 } 69 }) 70 .await; 71 72 cancel.cancel(); 73 let _ = task.await; 74 75 let final_snap = coverage.snapshot(); 76 let events = final_snap.events_processed(); 77 println!( 78 "done: events={events} edge_keys={} sources={} ready={}", 79 store.key_count(), 80 store.source_count(), 81 final_snap.is_ready(), 82 ); 83 assert!( 84 events >= MIN_EVENTS_PER_RUN, 85 "smoke received {events} events from {endpoint} across {ticks} five-second ticks, below threshold of {MIN_EVENTS_PER_RUN}; hydrant may be silent or unreachable", 86 ); 87}