Monorepo for Tangled
tangled.org
1use std::sync::Arc;
2use std::sync::Mutex;
3
4use jacquard_common::DefaultStr;
5use jacquard_common::types::nsid::Nsid;
6use tracing::field::{Field, Visit};
7use tracing::{Event, Subscriber};
8use tracing_subscriber::Layer;
9use tracing_subscriber::layer::Context;
10use tracing_subscriber::registry::LookupSpan;
11
12#[derive(Clone, Default)]
13pub struct TraceCapture {
14 inner: Arc<Mutex<Vec<String>>>,
15}
16
17impl TraceCapture {
18 pub fn new() -> Self {
19 Self::default()
20 }
21
22 pub fn lines(&self) -> Vec<String> {
23 self.inner.lock().unwrap().clone()
24 }
25
26 pub fn into_lines(self) -> Vec<String> {
27 std::mem::take(&mut *self.inner.lock().unwrap())
28 }
29
30 pub fn layer(&self) -> StageLayer {
31 StageLayer {
32 sink: self.inner.clone(),
33 }
34 }
35}
36
37pub struct StageLayer {
38 sink: Arc<Mutex<Vec<String>>>,
39}
40
41impl<S> Layer<S> for StageLayer
42where
43 S: Subscriber + for<'a> LookupSpan<'a>,
44{
45 fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
46 let meta = event.metadata();
47 if meta.target() != "bobbin_ingest::stage" {
48 return;
49 }
50 let mut visitor = StageVisitor::default();
51 event.record(&mut visitor);
52 let line = format!(
53 "cursor={} regime={} nsid={} edge_count={} parallelism={}",
54 visitor.cursor.unwrap_or(u64::MAX),
55 visitor.regime.as_deref().unwrap_or(""),
56 visitor.nsid.as_ref().map(Nsid::as_ref).unwrap_or(""),
57 visitor.edge_count.unwrap_or(0),
58 visitor.parallelism.unwrap_or(0),
59 );
60 self.sink.lock().unwrap().push(line);
61 }
62}
63
64#[derive(Default)]
65struct StageVisitor {
66 cursor: Option<u64>,
67 regime: Option<String>,
68 nsid: Option<Nsid<DefaultStr>>,
69 edge_count: Option<u64>,
70 parallelism: Option<usize>,
71}
72
73impl Visit for StageVisitor {
74 fn record_u64(&mut self, field: &Field, value: u64) {
75 match field.name() {
76 "cursor" => self.cursor = Some(value),
77 "edge_count" => self.edge_count = Some(value),
78 "parallelism" => self.parallelism = Some(value as usize),
79 _ => {}
80 }
81 }
82
83 fn record_i64(&mut self, field: &Field, value: i64) {
84 if field.name() == "cursor" {
85 self.cursor = Some(value as u64);
86 }
87 }
88
89 fn record_str(&mut self, field: &Field, value: &str) {
90 match field.name() {
91 "regime" => self.regime = Some(value.to_owned()),
92 "nsid" => self.nsid = Nsid::new_owned(value).ok(),
93 _ => {}
94 }
95 }
96
97 fn record_debug(&mut self, _field: &Field, _value: &dyn std::fmt::Debug) {}
98}