Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1use std::sync::Arc; 2use std::sync::Mutex; 3 4use jacquard_common::DefaultStr; 5use jacquard_common::types::nsid::Nsid; 6use tracing::field::{Field, Visit}; 7use tracing::{Event, Subscriber}; 8use tracing_subscriber::Layer; 9use tracing_subscriber::layer::Context; 10use tracing_subscriber::registry::LookupSpan; 11 12#[derive(Clone, Default)] 13pub struct TraceCapture { 14 inner: Arc<Mutex<Vec<String>>>, 15} 16 17impl TraceCapture { 18 pub fn new() -> Self { 19 Self::default() 20 } 21 22 pub fn lines(&self) -> Vec<String> { 23 self.inner.lock().unwrap().clone() 24 } 25 26 pub fn into_lines(self) -> Vec<String> { 27 std::mem::take(&mut *self.inner.lock().unwrap()) 28 } 29 30 pub fn layer(&self) -> StageLayer { 31 StageLayer { 32 sink: self.inner.clone(), 33 } 34 } 35} 36 37pub struct StageLayer { 38 sink: Arc<Mutex<Vec<String>>>, 39} 40 41impl<S> Layer<S> for StageLayer 42where 43 S: Subscriber + for<'a> LookupSpan<'a>, 44{ 45 fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) { 46 let meta = event.metadata(); 47 if meta.target() != "bobbin_ingest::stage" { 48 return; 49 } 50 let mut visitor = StageVisitor::default(); 51 event.record(&mut visitor); 52 let line = format!( 53 "cursor={} regime={} nsid={} edge_count={} parallelism={}", 54 visitor.cursor.unwrap_or(u64::MAX), 55 visitor.regime.as_deref().unwrap_or(""), 56 visitor.nsid.as_ref().map(Nsid::as_ref).unwrap_or(""), 57 visitor.edge_count.unwrap_or(0), 58 visitor.parallelism.unwrap_or(0), 59 ); 60 self.sink.lock().unwrap().push(line); 61 } 62} 63 64#[derive(Default)] 65struct StageVisitor { 66 cursor: Option<u64>, 67 regime: Option<String>, 68 nsid: Option<Nsid<DefaultStr>>, 69 edge_count: Option<u64>, 70 parallelism: Option<usize>, 71} 72 73impl Visit for StageVisitor { 74 fn record_u64(&mut self, field: &Field, value: u64) { 75 match field.name() { 76 "cursor" => self.cursor = Some(value), 77 "edge_count" => self.edge_count = Some(value), 78 "parallelism" => self.parallelism = Some(value as usize), 79 _ => {} 80 } 81 } 82 83 fn record_i64(&mut self, field: &Field, value: i64) { 84 if field.name() == "cursor" { 85 self.cursor = Some(value as u64); 86 } 87 } 88 89 fn record_str(&mut self, field: &Field, value: &str) { 90 match field.name() { 91 "regime" => self.regime = Some(value.to_owned()), 92 "nsid" => self.nsid = Nsid::new_owned(value).ok(), 93 _ => {} 94 } 95 } 96 97 fn record_debug(&mut self, _field: &Field, _value: &dyn std::fmt::Debug) {} 98}