Monorepo for Tangled
tangled.org
1use std::collections::{HashSet, VecDeque};
2use std::num::NonZeroUsize;
3use std::sync::Arc;
4use std::time::Duration;
5
6use bobbin_edge_index::{
7 ApplyOutcome, Coverage, CoverageWatch, EdgeStore, HydrantCursor, IssueStateKind,
8 PromotionSignal, PullStatusKind, StateIndex, apply_record_state,
9};
10use bobbin_knot_ingest::{CapabilityGate, KnotRegistry};
11use bobbin_record_lru::RecordStore;
12use bobbin_resolver::{NormalizeRepoRefs, decode_canon_or_upgrade_bytes, synthesize_created_at};
13use bobbin_runtime::{
14 Clock, Entropy, NetworkError, RuntimeHasher, UnixMicros, WsConn, WsMessage, WsStream,
15 WsTransport,
16};
17use bobbin_types::edges::{Edge, ExtractError, Record};
18use bobbin_types::ids::{RepoIdent, SubjectRef};
19use bobbin_types::knot_acl::KnotHostKey;
20use bobbin_types::record::RecordBody;
21use bobbin_types::search::{SearchSink, SearchableRecord};
22use bytes::Bytes;
23use futures::StreamExt;
24use jacquard_common::DefaultStr;
25use jacquard_common::types::did::Did;
26use jacquard_common::types::ident::AtIdentifier;
27use jacquard_common::types::nsid::Nsid;
28use jacquard_common::types::recordkey::Rkey;
29use jacquard_common::types::string::{AtStrError, AtUri, Cid};
30use thiserror::Error;
31use tokio::time::Instant;
32use tokio_stream::wrappers::ReceiverStream;
33use tokio_util::sync::CancellationToken;
34use tracing::{debug, info, warn};
35use url::Url;
36
37mod frame;
38mod resolver;
39mod shadow;
40mod warming;
41use frame::HydrantStreamErrorFrame;
42pub use frame::{FrameKind, HydrantFrame, RecordAction, RecordFrame};
43pub use resolver::{RepoIdResolver, Resolution};
44pub use shadow::{WarmingShadowBuffer, WarmingShadowSnapshot};
45pub use warming::{ParkedUpsert, WarmingBuffer, WarmingBufferSnapshot};
46
/// NSID prefix a record's collection must start with; `prepare_record` turns any other collection into a no-op.
47const TANGLED_PREFIX: &str = "sh.tangled.";
/// Starting reconnect delay; `run_inner` doubles it after every session that made no progress.
48const RECONNECT_INITIAL_DELAY: Duration = Duration::from_millis(500);
/// Upper bound for the doubled reconnect delay (jitter from `jittered` is added on top).
49const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(30);
/// Spacing between keepalive pings scheduled by the writer loop in `run_session`.
50const PING_INTERVAL: Duration = Duration::from_secs(20);
/// How long `run_session` waits for a pong after sending a ping before failing with `PongTimeout`.
51const PONG_TIMEOUT: Duration = Duration::from_secs(15);
// NOTE(review): not referenced in the visible part of this file; presumably a
// tolerance used when deciding readiness — confirm at the use site.
52const READY_SKEW: Duration = Duration::from_secs(60);
/// Capacity of the reader -> processor frame channel.
53const FRAME_CHANNEL_DEPTH: usize = 256;
/// Capacity of the reader -> writer ping/pong control channel.
54const CONTROL_CHANNEL_DEPTH: usize = 16;
/// Once this many frames are held locally, `reader_loop` stops reading the socket and only waits for channel capacity.
55const READER_HOLD_LIMIT: usize = 64;
/// Per-message deadline applied by `timed_send`.
56const SEND_TIMEOUT: Duration = Duration::from_secs(10);
/// Close code sent to hydrant when the session ends because of shutdown.
57const NORMAL_CLOSE: u16 = 1000;
/// Period of the resolver-stats log line emitted by the metrics dumper task.
58const METRICS_DUMP_INTERVAL: Duration = Duration::from_secs(10);
/// Concurrency limit for the resolver lookups issued while flushing the warming buffer.
59const WARMING_FLUSH_PARALLELISM: usize = 64;
/// Default for `IngestConfig::parallelism`. Built with a `match` in const context,
/// so the zero case can only surface as a compile-time failure.
60pub const DEFAULT_INGEST_PARALLELISM: NonZeroUsize = match NonZeroUsize::new(16) {
61 Some(n) => n,
62 None => unreachable!(),
63};
64
65#[derive(Clone, Debug)]
66pub struct IngestConfig {
67 pub hydrant_base: Url,
68 pub start_cursor: HydrantCursor,
69 pub parallelism: NonZeroUsize,
70}
71
72impl IngestConfig {
73 pub fn new(hydrant_base: Url) -> Self {
74 Self {
75 hydrant_base,
76 start_cursor: HydrantCursor::new(0),
77 parallelism: DEFAULT_INGEST_PARALLELISM,
78 }
79 }
80
81 fn stream_url(&self, cursor: HydrantCursor) -> Result<Url, IngestError> {
82 let mut url = self.hydrant_base.clone();
83 match url.scheme() {
84 "http" => url
85 .set_scheme("ws")
86 .map_err(|_| IngestError::Url("set ws scheme"))?,
87 "https" => url
88 .set_scheme("wss")
89 .map_err(|_| IngestError::Url("set wss scheme"))?,
90 "ws" | "wss" => {}
91 other => return Err(IngestError::UnknownScheme(other.to_owned())),
92 }
93 url.set_path("/stream");
94 url.query_pairs_mut()
95 .clear()
96 .append_pair("cursor", &cursor.raw().to_string());
97 Ok(url)
98 }
99}
100
/// Errors that can end an ingest session (or prevent one from starting).
101#[derive(Debug, Error)]
102pub enum IngestError {
 /// The scheme rewrite in `IngestConfig::stream_url` was rejected; the payload names the failed step.
103 #[error("invalid hydrant url: {0}")]
104 Url(&'static str),
 /// `hydrant_base` uses a scheme other than http, https, ws or wss.
105 #[error("unsupported url scheme: {0}")]
106 UnknownScheme(String),
 /// Transport-level failure while connecting, reading or sending.
107 #[error("network: {0}")]
108 Network(#[from] NetworkError),
 /// A text frame could not be deserialized.
109 #[error("frame decode: {0}")]
110 Decode(#[from] serde_json::Error),
111 #[error("invalid at-uri synthesized from frame: {0}")]
112 InvalidAtUri(#[from] AtStrError),
113 #[error("record extraction: {0}")]
114 Extract(#[from] ExtractError),
 /// No pong arrived within the given duration after a keepalive ping.
115 #[error("hydrant did not respond to ping within {0:?}")]
116 PongTimeout(Duration),
 /// A websocket send did not complete within the given duration.
117 #[error("websocket send blocked for at least {0:?}, treating link as dead")]
118 SendTimeout(Duration),
 /// Hydrant sent an error frame whose code is `ConsumerTooSlow`.
119 #[error("hydrant disconnected because bobbin's stream consumer fell behind: {message}")]
120 ConsumerTooSlow { message: String },
 /// Hydrant sent an error frame with any other code.
121 #[error("hydrant signaled stream error {code}: {message}")]
122 HydrantStream { code: String, message: String },
123}
124
125#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
126pub enum DisconnectKind {
127 Url,
128 UnknownScheme,
129 Network,
130 Decode,
131 InvalidAtUri,
132 Extract,
133 PongTimeout,
134 SendTimeout,
135 ConsumerTooSlow,
136 HydrantStream,
137}
138
139impl DisconnectKind {
140 pub fn from_error(err: &IngestError) -> Self {
141 match err {
142 IngestError::Url(_) => Self::Url,
143 IngestError::UnknownScheme(_) => Self::UnknownScheme,
144 IngestError::Network(_) => Self::Network,
145 IngestError::Decode(_) => Self::Decode,
146 IngestError::InvalidAtUri(_) => Self::InvalidAtUri,
147 IngestError::Extract(_) => Self::Extract,
148 IngestError::PongTimeout(_) => Self::PongTimeout,
149 IngestError::SendTimeout(_) => Self::SendTimeout,
150 IngestError::ConsumerTooSlow { .. } => Self::ConsumerTooSlow,
151 IngestError::HydrantStream { .. } => Self::HydrantStream,
152 }
153 }
154}
155
/// Description of one session-ending error, as recorded by `run_inner`.
156#[derive(Clone, Debug, Eq, PartialEq)]
157pub struct DisconnectSnapshot {
 /// Classification of the error.
158 pub kind: DisconnectKind,
 /// The error rendered through its `Display` impl.
159 pub message: String,
 /// Clock reading taken when the snapshot was recorded.
160 pub at_unix_micros: UnixMicros,
 /// Coverage's last cursor at the time of recording.
161 pub last_cursor: HydrantCursor,
162}
163
164#[derive(Default)]
165pub struct DisconnectSink {
166 last: std::sync::Mutex<Option<DisconnectSnapshot>>,
167 count: std::sync::atomic::AtomicU64,
168}
169
170impl DisconnectSink {
171 pub fn new() -> Self {
172 Self::default()
173 }
174
175 pub fn record(&self, snap: DisconnectSnapshot) {
176 *self.last.lock().expect("disconnect sink mutex poisoned") = Some(snap);
177 self.count
178 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
179 }
180
181 pub fn snapshot(&self) -> Option<DisconnectSnapshot> {
182 self.last
183 .lock()
184 .expect("disconnect sink mutex poisoned")
185 .clone()
186 }
187
188 pub fn count(&self) -> u64 {
189 self.count.load(std::sync::atomic::Ordering::Relaxed)
190 }
191}
192
/// Whether a session handed at least one frame to the processor.
/// `run_inner` resets its backoff on `Progressed` and sleeps on `Empty`.
193#[derive(Clone, Copy, Debug, Eq, PartialEq)]
194enum SessionOutcome {
 // Set by `reader_loop` once a frame was sent into the frame channel.
195 Progressed,
 // Initial value; no frame was forwarded.
196 Empty,
197}
198
/// Result of one session (also used as the reader task's return value).
199#[derive(Debug)]
200struct SessionEnd {
 // Whether any frame was forwarded during the session.
201 outcome: SessionOutcome,
 // The error that ended the session, or `None` for a clean end / cancellation.
202 error: Option<IngestError>,
203}
204
205pub struct IngestRuntime<S: SearchSink + 'static> {
206 pub store: Arc<EdgeStore>,
207 pub issue_states: Arc<StateIndex<IssueStateKind>>,
208 pub pull_statuses: Arc<StateIndex<PullStatusKind>>,
209 pub coverage: Arc<CoverageWatch>,
210 pub search: Arc<S>,
211 pub records: Arc<dyn RecordStore>,
212 pub resolver: Arc<RepoIdResolver>,
213 pub clock: Arc<dyn Clock>,
214 pub entropy: Arc<dyn Entropy>,
215 pub ws: Arc<dyn WsTransport>,
216 pub cancel: CancellationToken,
217 pub disconnects: Option<Arc<DisconnectSink>>,
218 pub warming_shadow: Option<Arc<WarmingShadowBuffer>>,
219 pub warming_buffer: Option<Arc<WarmingBuffer>>,
220 pub knot_registry: Option<Arc<KnotRegistry>>,
221 pub knot_gate: Option<Arc<CapabilityGate>>,
222}
223
224impl<S: SearchSink + 'static> Clone for IngestRuntime<S> {
225 fn clone(&self) -> Self {
226 Self {
227 store: self.store.clone(),
228 issue_states: self.issue_states.clone(),
229 pull_statuses: self.pull_statuses.clone(),
230 coverage: self.coverage.clone(),
231 search: self.search.clone(),
232 records: self.records.clone(),
233 resolver: self.resolver.clone(),
234 clock: self.clock.clone(),
235 entropy: self.entropy.clone(),
236 ws: self.ws.clone(),
237 cancel: self.cancel.clone(),
238 disconnects: self.disconnects.clone(),
239 warming_shadow: self.warming_shadow.clone(),
240 warming_buffer: self.warming_buffer.clone(),
241 knot_registry: self.knot_registry.clone(),
242 knot_gate: self.knot_gate.clone(),
243 }
244 }
245}
246
247impl<S: SearchSink + 'static> IngestRuntime<S> {
248 fn pipeline_ctx(&self) -> PipelineCtx<'_, S> {
249 PipelineCtx {
250 resolver: &self.resolver,
251 store: &self.store,
252 issue_states: &self.issue_states,
253 pull_statuses: &self.pull_statuses,
254 coverage: &self.coverage,
255 records: &*self.records,
256 search: &self.search,
257 shadow: self.warming_shadow.as_deref(),
258 buffer: self.warming_buffer.as_deref(),
259 knot_registry: self.knot_registry.as_deref(),
260 knot_gate: self.knot_gate.as_deref(),
261 }
262 }
263}
264
/// Borrowed view of the `IngestRuntime` handles used by the frame pipeline;
/// built by `IngestRuntime::pipeline_ctx`.
265struct PipelineCtx<'a, S: SearchSink + 'static> {
266 resolver: &'a RepoIdResolver,
267 store: &'a EdgeStore,
268 issue_states: &'a StateIndex<IssueStateKind>,
269 pull_statuses: &'a StateIndex<PullStatusKind>,
270 coverage: &'a CoverageWatch,
271 records: &'a dyn RecordStore,
272 search: &'a S,
 // Borrowed from `IngestRuntime::warming_shadow`.
273 shadow: Option<&'a WarmingShadowBuffer>,
 // Borrowed from `IngestRuntime::warming_buffer`.
274 buffer: Option<&'a WarmingBuffer>,
275 knot_registry: Option<&'a KnotRegistry>,
276 knot_gate: Option<&'a CapabilityGate>,
277}
278
279pub async fn run<S: SearchSink + 'static>(
280 config: IngestConfig,
281 runtime: IngestRuntime<S>,
282) -> Result<(), IngestError> {
283 let metrics_dumper = spawn_metrics_dumper(&runtime);
284 let idle_promoter = spawn_idle_promoter(&runtime);
285 let warming_flusher = spawn_warming_flusher(&runtime);
286 let result = run_inner(config, &runtime).await;
287 if let Err(join) = metrics_dumper.await {
288 warn!(?join, "metrics dumper task panicked");
289 }
290 if let Err(join) = idle_promoter.await {
291 warn!(?join, "idle promoter task panicked");
292 }
293 if let Some(handle) = warming_flusher
294 && let Err(join) = handle.await
295 {
296 warn!(?join, "warming flusher task panicked");
297 }
298 result
299}
300
301async fn run_inner<S: SearchSink + 'static>(
302 config: IngestConfig,
303 runtime: &IngestRuntime<S>,
304) -> Result<(), IngestError> {
305 let mut backoff = RECONNECT_INITIAL_DELAY;
306 loop {
307 let cursor = next_connect_cursor(runtime.coverage.snapshot(), config.start_cursor);
308 let SessionEnd { outcome, error } = run_session(&config, cursor, runtime).await;
309 if runtime.cancel.is_cancelled() {
310 info!(
311 last_cursor = runtime.coverage.snapshot().last_cursor().raw(),
312 "ingest stopped after shutdown signal"
313 );
314 return Ok(());
315 }
316 match (outcome, &error) {
317 (SessionOutcome::Progressed, None) => {
318 info!("hydrant stream closed after delivering frames, reconnecting")
319 }
320 (SessionOutcome::Empty, None) => {
321 warn!("hydrant stream closed without delivering frames")
322 }
323 (_, Some(err)) => warn!(?err, "hydrant stream errored"),
324 }
325 if let (Some(sink), Some(err)) = (runtime.disconnects.as_ref(), error.as_ref()) {
326 sink.record(DisconnectSnapshot {
327 kind: DisconnectKind::from_error(err),
328 message: err.to_string(),
329 at_unix_micros: runtime.clock.now_unix_micros(),
330 last_cursor: runtime.coverage.snapshot().last_cursor(),
331 });
332 }
333 let made_progress = matches!(outcome, SessionOutcome::Progressed);
334 if made_progress {
335 backoff = RECONNECT_INITIAL_DELAY;
336 } else {
337 tokio::select! {
338 biased;
339 _ = runtime.cancel.cancelled() => return Ok(()),
340 _ = runtime.clock.sleep(jittered(backoff, &*runtime.entropy)) => {}
341 }
342 backoff = (backoff * 2).min(RECONNECT_MAX_DELAY);
343 }
344 }
345}
346
347fn spawn_warming_flusher<S: SearchSink + 'static>(
348 runtime: &IngestRuntime<S>,
349) -> Option<tokio::task::JoinHandle<()>> {
350 runtime.warming_buffer.as_ref()?;
351 let rt = runtime.clone();
352 Some(tokio::spawn(async move {
353 let buffer = rt
354 .warming_buffer
355 .as_deref()
356 .expect("warming flusher only spawns when buffer is set");
357 let mut rx = rt.coverage.subscribe();
358 let reason = loop {
359 if rx.borrow_and_update().is_ready() {
360 break FlushReason::Ready;
361 }
362 tokio::select! {
363 biased;
364 _ = rt.cancel.cancelled() => break FlushReason::Cancelled,
365 res = rx.changed() => match res {
366 Ok(()) => continue,
367 Err(_) => break FlushReason::CoverageDropped,
368 },
369 }
370 };
371 flush_warming_buffer(&rt, buffer, reason).await;
372 }))
373}
374
/// Why the warming flusher stopped waiting and ran its flush.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum FlushReason {
    /// Coverage reported ready.
    Ready,
    /// Shutdown was requested first.
    Cancelled,
    /// The coverage watch channel was closed first.
    CoverageDropped,
}

impl FlushReason {
    /// Stable lowercase label used as the `reason` field in log lines.
    fn as_str(self) -> &'static str {
        match self {
            Self::Ready => "ready",
            Self::Cancelled => "cancelled",
            Self::CoverageDropped => "coverage_dropped",
        }
    }
}
391
392async fn flush_warming_buffer<S: SearchSink + 'static>(
393 runtime: &IngestRuntime<S>,
394 buffer: &WarmingBuffer,
395 reason: FlushReason,
396) {
397 let drained = buffer.drain_for_promote().await;
398 if drained.is_empty() {
399 return;
400 }
401 if reason != FlushReason::Ready {
402 info!(
403 target: "bobbin_ingest::warming",
404 abandoned_entries = drained.len(),
405 reason = reason.as_str(),
406 "abandoning parked items on non-ready flush",
407 );
408 return;
409 }
410 let hasher = buffer.hasher().clone();
411 let unique: HashSet<RepoIdent, RuntimeHasher> = drained
412 .iter()
413 .flat_map(|(_, deps)| deps.iter().cloned())
414 .fold(HashSet::with_hasher(hasher), |mut acc, dep| {
415 acc.insert(dep);
416 acc
417 });
418 if !unique.is_empty() {
419 let resolver = runtime.resolver.clone();
420 let _: Vec<()> = futures::stream::iter(unique)
421 .map(|key| {
422 let resolver = resolver.clone();
423 async move {
424 let _ = resolver.resolve(&key.owner, &key.rkey).await;
425 }
426 })
427 .buffer_unordered(WARMING_FLUSH_PARALLELISM)
428 .collect()
429 .await;
430 }
431 let upserts: Vec<ParkedUpsert> = drained.into_iter().map(|(u, _)| u).collect();
432 let count = upserts.len();
433 let ctx = runtime.pipeline_ctx();
434 finalize_drained(&ctx, upserts).await;
435 info!(
436 target: "bobbin_ingest::warming",
437 flushed_entries = count,
438 reason = reason.as_str(),
439 "drained warming buffer",
440 );
441}
442
/// Interval at which the idle promoter samples coverage.
443const IDLE_PROMOTE_WINDOW: Duration = Duration::from_secs(15);
/// Minimum processed-event count before an unchanged count is treated as "stream idle".
444const IDLE_PROMOTE_MIN_EVENTS: u64 = 256;
445
446fn spawn_idle_promoter<S: SearchSink + 'static>(
447 runtime: &IngestRuntime<S>,
448) -> tokio::task::JoinHandle<()> {
449 let rt = runtime.clone();
450 tokio::spawn(async move {
451 let mut prev = rt.coverage.snapshot().events_processed();
452 loop {
453 tokio::select! {
454 biased;
455 _ = rt.cancel.cancelled() => return,
456 _ = rt.clock.sleep(IDLE_PROMOTE_WINDOW) => {}
457 }
458 let snap = rt.coverage.snapshot();
459 if snap.is_ready() {
460 return;
461 }
462 let processed = snap.events_processed();
463 if processed >= IDLE_PROMOTE_MIN_EVENTS && processed == prev {
464 rt.coverage.update(|c| c.force_ready());
465 info!(
466 target: "bobbin_ingest::coverage",
467 events_processed = processed,
468 last_cursor = snap.last_cursor().raw(),
469 "stream idle, promoting coverage to ready",
470 );
471 return;
472 }
473 prev = processed;
474 }
475 })
476}
477
478fn spawn_metrics_dumper<S: SearchSink + 'static>(
479 runtime: &IngestRuntime<S>,
480) -> tokio::task::JoinHandle<()> {
481 let rt = runtime.clone();
482 tokio::spawn(async move {
483 loop {
484 tokio::select! {
485 biased;
486 _ = rt.cancel.cancelled() => break,
487 _ = rt.clock.sleep(METRICS_DUMP_INTERVAL) => {
488 let s = rt.resolver.stats();
489 info!(
490 target: "bobbin_ingest::metrics",
491 resolver_hits = s.hits,
492 resolver_misses_mapped = s.misses_mapped,
493 resolver_misses_no_repo_did = s.misses_no_repo_did,
494 resolver_misses_unresolvable = s.misses_unresolvable,
495 resolver_misses_transient = s.misses_transient,
496 resolver_misses_no_client = s.misses_no_client,
497 resolver_miss_latency_micros_avg = s.miss_latency_micros_avg().unwrap_or(0),
498 resolver_miss_latency_micros_max = s.miss_latency_micros_max,
499 resolver_total = s.total(),
500 "resolver stats",
501 );
502 }
503 }
504 }
505 })
506}
507
508fn next_connect_cursor(snapshot: Coverage, start: HydrantCursor) -> HydrantCursor {
509 if snapshot.events_processed() == 0 {
510 start
511 } else {
512 HydrantCursor::new(snapshot.last_cursor().raw().saturating_add(1))
513 }
514}
515
516fn jittered(base: Duration, entropy: &dyn Entropy) -> Duration {
517 let base_ms = u64::try_from(base.as_millis()).unwrap_or(u64::MAX);
518 let cap_ms = (base_ms / 4).max(1);
519 base + Duration::from_millis(entropy.next_u64() % cap_ms)
520}
521
522async fn run_session<S: SearchSink + 'static>(
523 config: &IngestConfig,
524 cursor: HydrantCursor,
525 runtime: &IngestRuntime<S>,
526) -> SessionEnd {
527 let url = match config.stream_url(cursor) {
528 Ok(u) => u,
529 Err(e) => {
530 return SessionEnd {
531 outcome: SessionOutcome::Empty,
532 error: Some(e),
533 };
534 }
535 };
536 info!(%url, "connecting to hydrant /stream");
537 let connect = tokio::select! {
538 biased;
539 _ = runtime.cancel.cancelled() => {
540 return SessionEnd { outcome: SessionOutcome::Empty, error: None };
541 }
542 res = runtime.ws.connect(url) => res,
543 };
544 let WsConn {
545 sink: mut ws_sink,
546 stream: ws_stream,
547 } = match connect {
548 Ok(c) => c,
549 Err(e) => {
550 return SessionEnd {
551 outcome: SessionOutcome::Empty,
552 error: Some(IngestError::Network(e)),
553 };
554 }
555 };
556
557 let (frame_tx, frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(FRAME_CHANNEL_DEPTH);
558 let parallelism = config.parallelism.get();
559 let processor_runtime = runtime.clone();
560 let processor = tokio::spawn(async move {
561 let cancel = processor_runtime.cancel.clone();
562 let prep_rt = processor_runtime.clone();
563 let resolve_rt = processor_runtime.clone();
564 let commit_rt = processor_runtime;
565
566 let pipeline = ReceiverStream::new(frame_rx)
567 .map(move |frame| prep_stage(frame, prep_rt.clone()))
568 .buffered(parallelism)
569 .map(move |staged| resolve_stage(staged, resolve_rt.clone()))
570 .buffered(parallelism)
571 .for_each(move |staged| commit_stage(staged, commit_rt.clone(), parallelism));
572
573 tokio::select! {
574 biased;
575 _ = cancel.cancelled() => {},
576 _ = pipeline => {},
577 }
578 });
579
580 let (control_tx, mut control_rx) = tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH);
581 let session_cancel = runtime.cancel.child_token();
582 let reader_cancel = session_cancel.clone();
583 let reader = tokio::spawn(reader_loop(ws_stream, frame_tx, control_tx, reader_cancel));
584
585 let mut next_ping = runtime.clock.now_instant() + PING_INTERVAL;
586 let mut pong_deadline: Option<Instant> = None;
587
588 let writer_error: Option<IngestError> = loop {
589 tokio::select! {
590 biased;
591 _ = runtime.cancel.cancelled() => {
592 let _ = timed_send(
593 &mut ws_sink,
594 WsMessage::Close { code: NORMAL_CLOSE, reason: "bobbin shutdown".to_owned() },
595 ).await;
596 break None;
597 }
598 _ = runtime.clock.sleep_until(next_ping) => {
599 next_ping = runtime.clock.now_instant() + PING_INTERVAL;
600 if pong_deadline.is_none() {
601 if let Err(e) = timed_send(&mut ws_sink, WsMessage::Ping(Bytes::new())).await {
602 break Some(e);
603 }
604 pong_deadline = Some(runtime.clock.now_instant() + PONG_TIMEOUT);
605 }
606 }
607 _ = wait_until(pong_deadline, runtime.clock.as_ref()) => {
608 break Some(IngestError::PongTimeout(PONG_TIMEOUT));
609 }
610 evt = control_rx.recv() => {
611 let Some(evt) = evt else { break None; };
612 match evt {
613 WsEvent::IncomingPing(payload) => {
614 if let Err(e) = timed_send(&mut ws_sink, WsMessage::Pong(payload)).await {
615 break Some(e);
616 }
617 }
618 WsEvent::IncomingPong => {
619 pong_deadline = None;
620 }
621 }
622 }
623 }
624 };
625
626 session_cancel.cancel();
627 drop(ws_sink);
628 drop(control_rx);
629
630 let reader_end = reader.await.unwrap_or(SessionEnd {
631 outcome: SessionOutcome::Empty,
632 error: None,
633 });
634 if let Err(join) = processor.await {
635 warn!(?join, "frame processor task panicked");
636 }
637 SessionEnd {
638 outcome: reader_end.outcome,
639 error: writer_error.or(reader_end.error),
640 }
641}
642
643async fn timed_send(
644 sink: &mut Box<dyn bobbin_runtime::WsSink>,
645 msg: WsMessage,
646) -> Result<(), IngestError> {
647 match tokio::time::timeout(SEND_TIMEOUT, sink.send(msg)).await {
648 Ok(Ok(())) => Ok(()),
649 Ok(Err(e)) => Err(IngestError::Network(e)),
650 Err(_elapsed) => Err(IngestError::SendTimeout(SEND_TIMEOUT)),
651 }
652}
653
/// Read half of a session: pulls websocket messages, decodes text frames and
/// hands them to the processor through `frame_tx`, and relays ping/pong events
/// to the writer through `control_tx`.
///
/// Decoded frames are first queued in a local `held` deque and moved into the
/// frame channel as capacity becomes available, so ping/pong messages can keep
/// being read while the channel is full — until `READER_HOLD_LIMIT` frames are
/// held, at which point the socket is no longer read.
///
/// Returns `Progressed` if at least one frame was handed to the channel. The
/// error is `None` for cancellation, a closed channel, stream end or a close
/// message, and `Some` for a network error or a frame that failed to classify.
654async fn reader_loop(
655 mut ws_stream: Box<dyn WsStream>,
656 frame_tx: tokio::sync::mpsc::Sender<HydrantFrame>,
657 control_tx: tokio::sync::mpsc::Sender<WsEvent>,
658 cancel: CancellationToken,
659) -> SessionEnd {
660 let mut outcome = SessionOutcome::Empty;
 // Frames decoded from the socket but not yet accepted by the frame channel.
661 let mut held: VecDeque<HydrantFrame> = VecDeque::new();
662
663 let error: Option<IngestError> = loop {
 // Nothing held: the only work available is reading the socket.
664 let step = if held.is_empty() {
665 tokio::select! {
666 biased;
667 _ = cancel.cancelled() => ReaderStep::Cancelled,
668 msg = ws_stream.next() => ReaderStep::WsMessage(msg),
669 }
 // Some frames held, below the limit: prefer draining into the channel
 // (biased order), but keep reading the socket when no capacity is ready.
670 } else if held.len() < READER_HOLD_LIMIT {
671 tokio::select! {
672 biased;
673 _ = cancel.cancelled() => ReaderStep::Cancelled,
674 permit_result = frame_tx.reserve() => match permit_result {
675 Ok(permit) => {
676 let frame = held
677 .pop_front()
678 .expect("non-empty held when reserve succeeds");
679 permit.send(frame);
680 ReaderStep::Sent
681 }
682 Err(_) => ReaderStep::FrameSinkClosed,
683 },
684 msg = ws_stream.next() => ReaderStep::WsMessage(msg),
685 }
 // Hold limit reached: stop reading the socket and wait for channel capacity only.
686 } else {
687 tokio::select! {
688 biased;
689 _ = cancel.cancelled() => ReaderStep::Cancelled,
690 permit_result = frame_tx.reserve() => match permit_result {
691 Ok(permit) => {
692 let frame = held
693 .pop_front()
694 .expect("held at limit when reserve succeeds");
695 permit.send(frame);
696 ReaderStep::Sent
697 }
698 Err(_) => ReaderStep::FrameSinkClosed,
699 },
700 }
701 };
702
703 match step {
704 ReaderStep::Cancelled => break None,
 // The receiving side of the frame channel is gone; nothing more can be delivered.
705 ReaderStep::FrameSinkClosed => break None,
706 ReaderStep::Sent => {
707 outcome = SessionOutcome::Progressed;
708 }
709 ReaderStep::WsMessage(msg) => {
 // `None` means the websocket stream ended.
710 let Some(msg) = msg else {
711 break None;
712 };
713 let parsed = match msg {
714 Ok(m) => m,
715 Err(e) => break Some(IngestError::Network(e)),
716 };
717 match parsed {
718 WsMessage::Text(text) => {
 // A decode failure or a hydrant error frame ends the session.
719 let frame = match classify_text_frame(&text) {
720 Ok(f) => f,
721 Err(e) => break Some(e),
722 };
723 held.push_back(frame);
724 }
725 WsMessage::Binary(_) => {
726 debug!("hydrant sent unexpected binary frame, ignoring");
727 }
728 WsMessage::Ping(payload) => {
 // The writer owns the sink, so the pong is sent from there.
729 if control_tx
730 .send(WsEvent::IncomingPing(payload))
731 .await
732 .is_err()
733 {
734 break None;
735 }
736 }
737 WsMessage::Pong(_) => {
738 if control_tx.send(WsEvent::IncomingPong).await.is_err() {
739 break None;
740 }
741 }
742 WsMessage::Close { code, reason } => {
743 debug!(code, %reason, "hydrant closed stream");
744 break None;
745 }
746 }
747 }
748 }
749 };
750 SessionEnd { outcome, error }
751}
752
/// What a single iteration of `reader_loop`'s select produced.
753enum ReaderStep {
 // The session's cancellation token fired.
754 Cancelled,
 // Reserving a slot failed because the frame channel is closed.
755 FrameSinkClosed,
 // One held frame was moved into the frame channel.
756 Sent,
 // Result of polling the websocket stream; `None` is end of stream.
757 WsMessage(Option<Result<WsMessage, NetworkError>>),
758}
759
760fn classify_text_frame(text: &str) -> Result<HydrantFrame, IngestError> {
761 #[derive(serde::Deserialize)]
762 struct PeekType<'a> {
763 #[serde(rename = "type", borrow)]
764 kind: Option<std::borrow::Cow<'a, str>>,
765 }
766 let is_error_frame = serde_json::from_str::<PeekType>(text)
767 .ok()
768 .and_then(|p| p.kind)
769 .as_deref()
770 == Some("error");
771 if is_error_frame {
772 return match serde_json::from_str::<HydrantStreamErrorFrame>(text) {
773 Ok(err_frame) => Err(classify_hydrant_error(err_frame)),
774 Err(decode_err) => Err(IngestError::Decode(decode_err)),
775 };
776 }
777 serde_json::from_str::<HydrantFrame>(text).map_err(IngestError::Decode)
778}
779
780fn classify_hydrant_error(frame: HydrantStreamErrorFrame) -> IngestError {
781 let HydrantStreamErrorFrame { error, message } = frame;
782 let message = message.unwrap_or_default();
783 match error.as_str() {
784 "ConsumerTooSlow" => IngestError::ConsumerTooSlow { message },
785 _ => IngestError::HydrantStream {
786 code: error,
787 message,
788 },
789 }
790}
791
/// Control events the reader forwards to the writer loop in `run_session`.
792#[derive(Debug)]
793enum WsEvent {
 // Hydrant sent a ping; the writer answers with a pong carrying the same payload.
794 IncomingPing(Bytes),
 // Hydrant sent a pong; the writer clears its pending pong deadline.
795 IncomingPong,
796}
797
798async fn wait_until(deadline: Option<Instant>, clock: &dyn Clock) {
799 match deadline {
800 Some(d) => clock.sleep_until(d).await,
801 None => std::future::pending::<()>().await,
802 }
803}
804
/// Kind of frame as seen by the pipeline, derived in `prepare_frame` from the
/// frame's record payload.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Regime {
    /// Record payload whose `live` flag is not set.
    Replay,
    /// Record payload whose `live` flag is set.
    Live,
    /// Frame without a record payload.
    NonRecord,
}

impl Regime {
    /// Stable lowercase label used in stage-timing traces.
    fn as_str(self) -> &'static str {
        match self {
            Regime::Replay => "replay",
            Regime::Live => "live",
            Regime::NonRecord => "non_record",
        }
    }
}
821
/// A frame after preparation: its cursor and metadata plus the operation to apply.
822struct Pending {
 // Built from the frame's `id`.
823 cursor: HydrantCursor,
 // Computed by `promotion_signal` from the frame's record payload and the current time.
824 signal: PromotionSignal,
825 regime: Regime,
826 op: PendingOp,
827}
828
/// Operation produced by `prepare_record` for a single frame.
829enum PendingOp {
 // Nothing to apply (non-record frames, foreign collections, unknown actions, invalid source).
830 Noop,
 // Produced when a create/update cannot be decoded or has no body.
831 ClearCache {
832 source: AtUri<DefaultStr>,
833 },
 // A successfully decoded create/update together with its extracted edges.
834 Upsert {
835 source: AtUri<DefaultStr>,
836 nsid: Nsid<DefaultStr>,
837 parsed: Box<Record>,
838 bytes: Bytes,
839 cid: Option<Cid<DefaultStr>>,
840 edges: Vec<Edge>,
841 },
 // NOTE(review): not constructed in the visible part of this file; presumably an
 // upsert that was parked in the warming buffer — confirm in `resolve_pending`.
842 Parked {
843 nsid: Nsid<DefaultStr>,
844 },
 // A delete action, or a knot-member record that the ACL gate classifies as native.
845 Delete {
846 source: AtUri<DefaultStr>,
847 nsid: Nsid<DefaultStr>,
848 },
849}
850
/// Output of `prep_stage`: the prepared operation plus the instants that bracket preparation.
851struct Prepared {
852 pending: Pending,
853 prepare_start: Instant,
854 prepare_end: Instant,
855}
856
/// Output of `resolve_stage`: the resolved operation plus the timing marks of
/// both the prepare and the resolve stage, consumed by `commit_stage` for tracing.
857struct Resolved {
858 pending: Pending,
859 prepare_start: Instant,
860 prepare_end: Instant,
861 resolve_start: Instant,
862 resolve_end: Instant,
863}
864
865fn pending_nsid(op: &PendingOp) -> Option<&Nsid<DefaultStr>> {
866 match op {
867 PendingOp::Upsert { nsid, .. } => Some(nsid),
868 PendingOp::Delete { nsid, .. } => Some(nsid),
869 PendingOp::Parked { nsid, .. } => Some(nsid),
870 PendingOp::Noop | PendingOp::ClearCache { .. } => None,
871 }
872}
873
874fn pending_edge_count(op: &PendingOp) -> u64 {
875 match op {
876 PendingOp::Upsert { edges, .. } => edges.len() as u64,
877 _ => 0,
878 }
879}
880
881async fn prep_stage<S: SearchSink + 'static>(
882 frame: HydrantFrame,
883 rt: IngestRuntime<S>,
884) -> Prepared {
885 let now = rt.clock.now_unix_micros();
886 let prepare_start = rt.clock.now_instant();
887 let ctx = rt.pipeline_ctx();
888 let pending = prepare_frame(frame, &ctx, now).await;
889 let prepare_end = rt.clock.now_instant();
890 Prepared {
891 pending,
892 prepare_start,
893 prepare_end,
894 }
895}
896
897async fn resolve_stage<S: SearchSink + 'static>(
898 staged: Prepared,
899 rt: IngestRuntime<S>,
900) -> Resolved {
901 let resolve_start = rt.clock.now_instant();
902 let ctx = rt.pipeline_ctx();
903 let pending = resolve_pending(staged.pending, &ctx).await;
904 let resolve_end = rt.clock.now_instant();
905 Resolved {
906 pending,
907 prepare_start: staged.prepare_start,
908 prepare_end: staged.prepare_end,
909 resolve_start,
910 resolve_end,
911 }
912}
913
914async fn commit_stage<S: SearchSink + 'static>(
915 staged: Resolved,
916 rt: IngestRuntime<S>,
917 parallelism: usize,
918) {
919 let nsid = pending_nsid(&staged.pending.op).cloned();
920 let edge_count = pending_edge_count(&staged.pending.op);
921 let regime = staged.pending.regime;
922 let cursor = staged.pending.cursor.raw();
923 let Resolved {
924 pending,
925 prepare_start,
926 prepare_end,
927 resolve_start,
928 resolve_end,
929 } = staged;
930 let commit_start = rt.clock.now_instant();
931 commit_pending(
932 pending,
933 &rt.store,
934 &rt.issue_states,
935 &rt.pull_statuses,
936 &rt.coverage,
937 &*rt.search,
938 &*rt.records,
939 &rt.resolver,
940 )
941 .await;
942 let commit_end = rt.clock.now_instant();
943 tracing::trace!(
944 target: "bobbin_ingest::stage",
945 cursor,
946 regime = regime.as_str(),
947 nsid = %nsid.as_ref().map(Nsid::as_str).unwrap_or(""),
948 edge_count,
949 prepare_us = prepare_end.duration_since(prepare_start).as_micros() as u64,
950 queue_resolve_wait_us = resolve_start.duration_since(prepare_end).as_micros() as u64,
951 resolve_us = resolve_end.duration_since(resolve_start).as_micros() as u64,
952 queue_commit_wait_us = commit_start.duration_since(resolve_end).as_micros() as u64,
953 commit_us = commit_end.duration_since(commit_start).as_micros() as u64,
954 total_us = commit_end.duration_since(prepare_start).as_micros() as u64,
955 parallelism,
956 "pipeline stage timings",
957 );
958}
959
960async fn prepare_frame<S: SearchSink + 'static>(
961 frame: HydrantFrame,
962 ctx: &PipelineCtx<'_, S>,
963 now: UnixMicros,
964) -> Pending {
965 let cursor = HydrantCursor::new(frame.id);
966 let signal = promotion_signal(frame.record.as_ref(), now);
967 let regime = match frame.record.as_ref() {
968 Some(r) if r.live => Regime::Live,
969 Some(_) => Regime::Replay,
970 None => Regime::NonRecord,
971 };
972 let op = match frame.kind {
973 FrameKind::Record => prepare_record(frame.record, ctx).await,
974 FrameKind::Identity | FrameKind::Account => PendingOp::Noop,
975 FrameKind::Other => {
976 debug!(id = frame.id, "ignoring unknown hydrant frame kind");
977 PendingOp::Noop
978 }
979 };
980 Pending {
981 cursor,
982 signal,
983 regime,
984 op,
985 }
986}
987
/// Prepares the operation for a record frame.
///
/// Frames without a payload, frames for collections outside `TANGLED_PREFIX`,
/// frames with an invalid source URI and frames with an unknown action become
/// `Noop`. Creates/updates are decoded and turned into `Upsert` (or
/// `ClearCache` when the body is missing or cannot be decoded / have its edges
/// extracted, or `Delete` when the ACL gate classifies a knot-member record as
/// native). Deletes become `Delete`.
///
/// Besides returning the operation, this function has side effects on the
/// warming buffer, the resolver, the warming shadow, the knot registry, the
/// edge store, the record store and the search sink, as annotated below.
988async fn prepare_record<S: SearchSink + 'static>(
989 record: Option<RecordFrame>,
990 ctx: &PipelineCtx<'_, S>,
991) -> PendingOp {
992 let Some(record) = record else {
993 debug!("record-typed frame missing payload, skipping");
994 return PendingOp::Noop;
995 };
 // Only `sh.tangled.*` collections are ingested.
996 if !record.collection.as_ref().starts_with(TANGLED_PREFIX) {
997 return PendingOp::Noop;
998 }
999 let nsid = record.collection.clone();
1000 let source = match build_source_uri(&record) {
1001 Ok(s) => s,
1002 Err(e) => {
1003 warn!(?e, "invalid frame source, dropping record");
1004 return PendingOp::Noop;
1005 }
1006 };
1007 match record.action {
1008 RecordAction::Create | RecordAction::Update => {
 // Any entry for this source in the warming buffer is evicted first.
1009 evict_from_buffer(ctx.buffer, &source).await;
1010 let Some(raw) = record.record else {
1011 debug!(collection = %nsid, "create/update missing record body, clearing cache");
1012 return PendingOp::ClearCache { source };
1013 };
1014 let raw_bytes = Bytes::copy_from_slice(raw.get().as_bytes());
 // Use the patched bytes from `synthesize_created_at` when a fallback timestamp
 // is available and patching yields a value; otherwise keep the raw bytes.
1015 let wire_bytes = match fallback_rfc3339(&record.rkey, &record.rev)
1016 .and_then(|fallback| synthesize_created_at(&raw_bytes, &fallback))
1017 {
1018 Some(patched) => Bytes::from(patched),
1019 None => raw_bytes,
1020 };
1021 let (parsed, bytes) = match decode_canon_or_upgrade_bytes(
1022 &record.collection,
1023 &wire_bytes,
1024 ctx.resolver,
1025 )
1026 .await
1027 {
1028 Ok((parsed, canon_bytes)) => {
 // Borrowed: keep the wire bytes as they are. Owned: use the newly produced bytes.
1029 let bytes = match canon_bytes {
1030 std::borrow::Cow::Borrowed(_) => wire_bytes,
1031 std::borrow::Cow::Owned(v) => Bytes::from(v),
1032 };
1033 (parsed, bytes)
1034 }
1035 Err(ExtractError::UnknownCollection(name)) => {
1036 debug!(collection = %name, "unknown sh.tangled.* collection, clearing cache");
1037 return PendingOp::ClearCache { source };
1038 }
1039 Err(e) => {
1040 warn!(?e, collection = %record.collection, "record decode failed, clearing cache");
1041 return PendingOp::ClearCache { source };
1042 }
1043 };
 // Repo records additionally update the shadow, the resolver, the warming
 // buffer and the knot registry.
1044 if let Record::Repo(repo) = &parsed {
1045 if let Some(shadow) = ctx.shadow {
1046 shadow.note_observed(&record.did, &record.rkey).await;
1047 }
1048 let superseded = ctx
1049 .resolver
1050 .observe(
1051 record.did.clone(),
1052 record.rkey.clone(),
1053 repo.repo_did.clone(),
1054 )
1055 .await;
 // When `observe` reports a prior entry, that entry's repo record is removed
 // from the warming buffer, the edge store, the record store and search.
1056 if let Some(prior) = superseded {
1057 let prior_uri = format!(
1058 "at://{}/sh.tangled.repo/{}",
1059 prior.owner.as_ref(),
1060 prior.rkey.as_ref(),
1061 );
 // A prior URI that fails to parse is skipped silently.
1062 if let Ok(prior_at_uri) = AtUri::<DefaultStr>::new_owned(&prior_uri) {
1063 evict_from_buffer(ctx.buffer, &prior_at_uri).await;
1064 ctx.store.remove_source(&prior_at_uri);
1065 ctx.records.remove(&prior_at_uri);
1066 ctx.search.remove(&prior_at_uri).await;
1067 }
1068 }
 // Entries the buffer hands back for this repo are finalized right away.
1069 if let Some(buffer) = ctx.buffer {
1070 let drained = buffer.take_observed(&record.did, &record.rkey).await;
1071 if !drained.is_empty() {
1072 finalize_drained(ctx, drained).await;
1073 }
1074 }
1075 if let Some(registry) = ctx.knot_registry {
1076 let host = KnotHostKey::new(repo.knot.as_ref());
1077 match repo.repo_did.clone() {
1078 Some(repo_did) => registry.observe_repo(&host, repo_did),
1079 None => registry.observe_host(&host),
1080 }
1081 }
1082 }
1083 match acl_disposition(&parsed, ctx.knot_gate, ctx.knot_registry) {
 // `NativeSkip`: the record is not upserted; it is forgotten as a legacy
 // member and turned into a delete instead.
1084 AclDisposition::NativeSkip => {
1085 if let Some(registry) = ctx.knot_registry {
1086 registry.forget_legacy_member(&source);
1087 }
1088 return PendingOp::Delete { source, nsid };
1089 }
1090 AclDisposition::LegacyMember { host } => {
1091 if let Some(registry) = ctx.knot_registry {
1092 registry.observe_host(&host);
1093 registry.note_legacy_member(source.clone(), &host);
1094 }
1095 }
1096 AclDisposition::Other => {}
1097 }
1098 let edges = match parsed.extract_edges(&source) {
1099 Ok(es) => es,
1100 Err(e) => {
1101 warn!(?e, "edge extraction failed, clearing cache");
1102 return PendingOp::ClearCache { source };
1103 }
1104 };
 // The returned value is not needed here; the call is made for its effect on the store.
1105 let _ = ctx.store.intern_source(&source);
1106 PendingOp::Upsert {
1107 source,
1108 nsid,
1109 parsed: Box::new(parsed),
1110 bytes,
1111 cid: record.cid,
1112 edges,
1113 }
1114 }
1115 RecordAction::Delete => {
1116 evict_from_buffer(ctx.buffer, &source).await;
 // Deleting a repo record also drops its resolver entry.
1117 if nsid.as_ref() == "sh.tangled.repo" {
1118 ctx.resolver.forget(&record.did, &record.rkey).await;
1119 }
 // Deleting a knot-member record also drops it from the registry's legacy members.
1120 if nsid.as_ref() == "sh.tangled.knot.member"
1121 && let Some(registry) = ctx.knot_registry
1122 {
1123 registry.forget_legacy_member(&source);
1124 }
1125 PendingOp::Delete { source, nsid }
1126 }
1127 RecordAction::Other => {
1128 debug!(collection = %nsid, "ignoring unknown record action");
1129 PendingOp::Noop
1130 }
1131 }
1132}
1133
/// How a decoded record should be treated with respect to knot ACL handling,
/// as decided by `acl_disposition`.
enum AclDisposition {
    /// No ACL-specific handling applies to this record.
    Other,
    /// The record targets a knot host the capability gate reports as native;
    /// the caller turns the frame into a delete instead of an upsert.
    NativeSkip,
    /// A knot-member record for a host the gate does not report as native;
    /// carries that host so the caller can record it in the knot registry.
    LegacyMember { host: KnotHostKey },
}
1139
1140fn acl_disposition(
1141 parsed: &Record,
1142 gate: Option<&CapabilityGate>,
1143 registry: Option<&KnotRegistry>,
1144) -> AclDisposition {
1145 let Some(gate) = gate else {
1146 return AclDisposition::Other;
1147 };
1148 match parsed {
1149 Record::KnotMember(member) => {
1150 let host = KnotHostKey::new(member.domain.as_ref());
1151 if gate.is_native(&host) {
1152 AclDisposition::NativeSkip
1153 } else {
1154 AclDisposition::LegacyMember { host }
1155 }
1156 }
1157 Record::Collaborator(collaborator) => {
1158 let native = registry
1159 .and_then(|registry| registry.host_of_repo(&collaborator.repo))
1160 .is_some_and(|host| gate.is_native(&host));
1161 if native {
1162 AclDisposition::NativeSkip
1163 } else {
1164 AclDisposition::Other
1165 }
1166 }
1167 _ => AclDisposition::Other,
1168 }
1169}
1170
1171fn fallback_rfc3339(
1172 rkey: &Rkey<DefaultStr>,
1173 rev: &jacquard_common::types::tid::Tid,
1174) -> Option<String> {
1175 let tid = jacquard_common::types::tid::Tid::new(rkey.as_ref())
1176 .ok()
1177 .unwrap_or_else(|| rev.clone());
1178 let micros = i64::try_from(tid.timestamp()).ok()?;
1179 let dt = chrono::DateTime::<chrono::Utc>::from_timestamp_micros(micros)?;
1180 Some(dt.to_rfc3339_opts(chrono::SecondsFormat::Micros, true))
1181}
1182
1183async fn evict_from_buffer(buffer: Option<&WarmingBuffer>, source: &AtUri<DefaultStr>) {
1184 if let Some(buffer) = buffer
1185 && !buffer.is_sealed()
1186 {
1187 buffer.evict_source(source).await;
1188 }
1189}
1190
1191async fn resolve_pending<S: SearchSink + 'static>(
1192 pending: Pending,
1193 ctx: &PipelineCtx<'_, S>,
1194) -> Pending {
1195 let Pending {
1196 cursor,
1197 signal,
1198 regime,
1199 op,
1200 } = pending;
1201 let op = match op {
1202 PendingOp::Upsert {
1203 source,
1204 nsid,
1205 parsed,
1206 bytes,
1207 cid,
1208 edges,
1209 } => {
1210 let pieces = UpsertPieces {
1211 source,
1212 nsid,
1213 parsed: *parsed,
1214 bytes,
1215 cid,
1216 edges,
1217 };
1218 let pieces = match try_park_warming(ctx, cursor, pieces).await {
1219 ParkOutcome::Parked { nsid } => {
1220 return Pending {
1221 cursor,
1222 signal,
1223 regime,
1224 op: PendingOp::Parked { nsid },
1225 };
1226 }
1227 ParkOutcome::Passthrough(pieces) => *pieces,
1228 };
1229 let UpsertPieces {
1230 source,
1231 nsid,
1232 parsed,
1233 bytes,
1234 cid,
1235 edges,
1236 } = pieces;
1237 let edges = normalize_subjects(edges, ctx.resolver, ctx.coverage, ctx.shadow).await;
1238 PendingOp::Upsert {
1239 source,
1240 nsid,
1241 parsed: Box::new(parsed),
1242 bytes,
1243 cid,
1244 edges,
1245 }
1246 }
1247 other => other,
1248 };
1249 Pending {
1250 cursor,
1251 signal,
1252 regime,
1253 op,
1254 }
1255}
1256
/// The payload of an upsert with the parsed record held by value, used while
/// an upsert is passed to and from the warming buffer.
struct UpsertPieces {
    /// AT-URI of the record being upserted.
    source: AtUri<DefaultStr>,
    /// Collection NSID of the record.
    nsid: Nsid<DefaultStr>,
    /// Decoded record.
    parsed: Record,
    /// Record body bytes that get cached alongside the CID.
    bytes: Bytes,
    /// CID of the record, when one is available.
    cid: Option<Cid<DefaultStr>>,
    /// Edges extracted from the record.
    edges: Vec<Edge>,
}
1265
1266impl From<ParkedUpsert> for UpsertPieces {
1267 fn from(u: ParkedUpsert) -> Self {
1268 Self {
1269 source: u.source,
1270 nsid: u.nsid,
1271 parsed: u.parsed,
1272 bytes: u.bytes,
1273 cid: u.cid,
1274 edges: u.edges,
1275 }
1276 }
1277}
1278
/// Result of offering an upsert to the warming buffer.
enum ParkOutcome {
    /// The buffer accepted the upsert; only the record's NSID is kept.
    Parked { nsid: Nsid<DefaultStr> },
    /// The upsert was not parked and is handed back to the caller.
    Passthrough(Box<UpsertPieces>),
}
1283
1284async fn try_park_warming<S: SearchSink + 'static>(
1285 ctx: &PipelineCtx<'_, S>,
1286 cursor: HydrantCursor,
1287 pieces: UpsertPieces,
1288) -> ParkOutcome {
1289 let Some(buffer) = ctx.buffer else {
1290 return ParkOutcome::Passthrough(Box::new(pieces));
1291 };
1292 if ctx.coverage.snapshot().is_ready() || buffer.is_sealed() {
1293 return ParkOutcome::Passthrough(Box::new(pieces));
1294 }
1295 let deps = collect_unresolved_deps(&pieces.edges, ctx.resolver).await;
1296 if deps.is_empty() {
1297 return ParkOutcome::Passthrough(Box::new(pieces));
1298 }
1299 let nsid = pieces.nsid.clone();
1300 let upsert = ParkedUpsert {
1301 cursor,
1302 source: pieces.source,
1303 nsid: pieces.nsid,
1304 parsed: pieces.parsed,
1305 bytes: pieces.bytes,
1306 cid: pieces.cid,
1307 edges: pieces.edges,
1308 };
1309 let deps_for_shadow = ctx.shadow.is_some().then(|| deps.clone());
1310 match buffer.try_park(upsert, deps).await {
1311 Ok(()) => {
1312 if let Some((shadow, noted)) = ctx.shadow.zip(deps_for_shadow) {
1313 let _ = futures::future::join_all(noted.into_iter().map(|dep| async move {
1314 shadow.note_unresolved(dep.owner, dep.rkey).await;
1315 }))
1316 .await;
1317 }
1318 ParkOutcome::Parked { nsid }
1319 }
1320 Err(returned) => ParkOutcome::Passthrough(Box::new(returned.into())),
1321 }
1322}
1323
1324async fn collect_unresolved_deps(edges: &[Edge], resolver: &RepoIdResolver) -> Vec<RepoIdent> {
1325 futures::stream::iter(edges)
1326 .fold(Vec::new(), |mut acc, edge| async move {
1327 let Some(uri) = edge.subject.as_uri() else {
1328 return acc;
1329 };
1330 let Some((owner, rkey)) = parse_repo_subject_uri(uri) else {
1331 return acc;
1332 };
1333 if resolver.cached_resolution(&owner, &rkey).await.is_some() {
1334 return acc;
1335 }
1336 let candidate = RepoIdent::new(owner, rkey);
1337 if !acc.contains(&candidate) {
1338 acc.push(candidate);
1339 }
1340 acc
1341 })
1342 .await
1343}
1344
1345async fn finalize_drained<S: SearchSink + 'static>(
1346 ctx: &PipelineCtx<'_, S>,
1347 drained: Vec<ParkedUpsert>,
1348) {
1349 for upsert in drained {
1350 let ParkedUpsert {
1351 cursor: _,
1352 source,
1353 nsid: _,
1354 parsed,
1355 bytes,
1356 cid,
1357 edges,
1358 } = upsert;
1359 let edges = normalize_subjects(edges, ctx.resolver, ctx.coverage, None).await;
1360 cache_body(ctx.records, &source, cid, bytes);
1361 ctx.store.upsert_source(&source, edges);
1362 let outcome = apply_record_state(ctx.issue_states, ctx.pull_statuses, &source, &parsed);
1363 log_unknown_state_variant(outcome, &source);
1364 index_search(ctx.search, ctx.resolver, &source, parsed).await;
1365 }
1366}
1367
1368#[allow(clippy::too_many_arguments)]
1369async fn commit_pending<S: SearchSink>(
1370 pending: Pending,
1371 store: &EdgeStore,
1372 issue_states: &StateIndex<IssueStateKind>,
1373 pull_statuses: &StateIndex<PullStatusKind>,
1374 coverage: &CoverageWatch,
1375 search: &S,
1376 records: &dyn RecordStore,
1377 resolver: &RepoIdResolver,
1378) {
1379 let Pending {
1380 cursor,
1381 signal,
1382 regime: _,
1383 op,
1384 } = pending;
1385 match op {
1386 PendingOp::Noop | PendingOp::Parked { .. } => {}
1387 PendingOp::ClearCache { source } => records.remove(&source),
1388 PendingOp::Upsert {
1389 source,
1390 nsid: _,
1391 parsed,
1392 bytes,
1393 cid,
1394 edges,
1395 } => {
1396 cache_body(records, &source, cid, bytes);
1397 store.upsert_source(&source, edges);
1398 let outcome = apply_record_state(issue_states, pull_statuses, &source, &parsed);
1399 log_unknown_state_variant(outcome, &source);
1400 index_search(search, resolver, &source, *parsed).await;
1401 }
1402 PendingOp::Delete { source, nsid } => {
1403 store.remove_source(&source);
1404 apply_delete_to_state_index(issue_states, pull_statuses, &source, &nsid);
1405 records.remove(&source);
1406 search.remove(&source).await;
1407 }
1408 }
1409 coverage.update(|c| c.advance(cursor).maybe_promote(signal));
1410}
1411
1412async fn index_search<S: SearchSink>(
1413 search: &S,
1414 resolver: &RepoIdResolver,
1415 source: &AtUri<DefaultStr>,
1416 parsed: Record,
1417) {
1418 let Some(searchable) = SearchableRecord::try_from_record(parsed) else {
1419 return;
1420 };
1421 let Some(searchable) = searchable.normalize(resolver).await else {
1422 return;
1423 };
1424 search.upsert(searchable.to_search_doc(source)).await;
1425}
1426
/// Test-only helper that pushes one frame through the whole pipeline:
/// prepare, resolve, then commit.
///
/// The pipeline context is built without a shadow buffer, warming buffer,
/// knot registry or capability gate. `clock` is accepted but not used.
#[cfg(test)]
#[allow(clippy::too_many_arguments)]
async fn handle_frame<S: SearchSink + 'static>(
    frame: HydrantFrame,
    store: &EdgeStore,
    issue_states: &StateIndex<IssueStateKind>,
    pull_statuses: &StateIndex<PullStatusKind>,
    coverage: &CoverageWatch,
    search: &S,
    records: &dyn RecordStore,
    resolver: &RepoIdResolver,
    clock: &dyn Clock,
    now: UnixMicros,
) {
    let _ = clock;
    let ctx = PipelineCtx {
        store,
        issue_states,
        pull_statuses,
        coverage,
        search,
        records,
        resolver,
        shadow: None,
        buffer: None,
        knot_registry: None,
        knot_gate: None,
    };
    let prepared = prepare_frame(frame, &ctx, now).await;
    let resolved = resolve_pending(prepared, &ctx).await;
    commit_pending(
        resolved,
        ctx.store,
        ctx.issue_states,
        ctx.pull_statuses,
        ctx.coverage,
        ctx.search,
        ctx.records,
        ctx.resolver,
    )
    .await;
}
1469
1470fn log_unknown_state_variant(outcome: ApplyOutcome, source: &AtUri<DefaultStr>) {
1471 if matches!(outcome, ApplyOutcome::UnknownVariant) {
1472 warn!(
1473 target: "bobbin_ingest::state_index",
1474 %source,
1475 "state record has unknown wire variant, skipping index update",
1476 );
1477 }
1478}
1479
1480fn apply_delete_to_state_index(
1481 issue_states: &StateIndex<IssueStateKind>,
1482 pull_statuses: &StateIndex<PullStatusKind>,
1483 source: &AtUri<DefaultStr>,
1484 nsid: &Nsid<DefaultStr>,
1485) {
1486 match nsid.as_ref() {
1487 "sh.tangled.repo.issue" => issue_states.remove_entity(source),
1488 "sh.tangled.repo.pull" => pull_statuses.remove_entity(source),
1489 "sh.tangled.repo.issue.state" => issue_states.remove_source(source),
1490 "sh.tangled.repo.pull.status" => pull_statuses.remove_source(source),
1491 _ => {}
1492 }
1493}
1494
1495fn promotion_signal(record: Option<&RecordFrame>, now: UnixMicros) -> PromotionSignal {
1496 PromotionSignal {
1497 rev_micros: record.map(|r| r.rev.timestamp()),
1498 now_micros: now.raw(),
1499 skew_micros: READY_SKEW.as_micros() as u64,
1500 }
1501}
1502
1503fn cache_body(
1504 records: &dyn RecordStore,
1505 source: &AtUri<DefaultStr>,
1506 cid: Option<Cid<DefaultStr>>,
1507 bytes: Bytes,
1508) {
1509 match cid {
1510 Some(cid) => records.put(
1511 source.clone(),
1512 Arc::new(RecordBody {
1513 uri: source.clone(),
1514 cid,
1515 value: bytes,
1516 }),
1517 ),
1518 None => records.remove(source),
1519 }
1520}
1521
1522async fn normalize_subjects(
1523 edges: Vec<Edge>,
1524 resolver: &RepoIdResolver,
1525 coverage: &CoverageWatch,
1526 shadow: Option<&WarmingShadowBuffer>,
1527) -> Vec<Edge> {
1528 let warming = shadow.is_some() && !coverage.snapshot().is_ready();
1529 futures::stream::iter(edges)
1530 .filter_map(|edge| async move {
1531 let Some(uri) = edge.subject.as_uri() else {
1532 return Some(edge);
1533 };
1534 let Some((owner, rkey)) = parse_repo_subject_uri(uri) else {
1535 return Some(edge);
1536 };
1537 if warming
1538 && let Some(shadow) = shadow
1539 && resolver.cached_resolution(&owner, &rkey).await.is_none()
1540 {
1541 shadow
1542 .note_unresolved(owner.clone(), rkey.clone())
1543 .await;
1544 }
1545 match resolver.resolve(&owner, &rkey).await {
1546 Resolution::Mapped(repo_did) => Some(Edge {
1547 subject: SubjectRef::Did(repo_did),
1548 ..edge
1549 }),
1550 Resolution::NoRepoDid => {
1551 warn!(
1552 target: "bobbin_ingest::normalize",
1553 kind = %edge.kind,
1554 owner = owner.as_ref(),
1555 rkey = rkey.as_ref(),
1556 source = edge.source.as_ref(),
1557 "dropping edge: target repo has no repoDid, no canonical DID subject available",
1558 );
1559 None
1560 }
1561 Resolution::Unresolvable => {
1562 warn!(
1563 target: "bobbin_ingest::normalize",
1564 kind = %edge.kind,
1565 owner = owner.as_ref(),
1566 rkey = rkey.as_ref(),
1567 source = edge.source.as_ref(),
1568 "dropping edge: repo unresolvable, rkey-form subject will not match bare-DID queries",
1569 );
1570 None
1571 }
1572 }
1573 })
1574 .collect()
1575 .await
1576}
1577
1578fn parse_repo_subject_uri(uri: &AtUri<DefaultStr>) -> Option<(Did<DefaultStr>, Rkey<DefaultStr>)> {
1579 let collection = uri.collection()?;
1580 if collection.as_ref() != "sh.tangled.repo" {
1581 return None;
1582 }
1583 let AtIdentifier::Did(authority) = uri.authority() else {
1584 return None;
1585 };
1586 let rkey = uri.rkey()?;
1587 let owner = Did::new_owned(authority.as_ref()).ok()?;
1588 let rkey = Rkey::new_owned(rkey.as_ref()).ok()?;
1589 Some((owner, rkey))
1590}
1591
1592fn build_source_uri(r: &RecordFrame) -> Result<AtUri<DefaultStr>, IngestError> {
1593 Ok(AtUri::from_parts_owned(
1594 r.did.as_ref(),
1595 r.collection.as_ref(),
1596 r.rkey.as_ref(),
1597 )?)
1598}
1599
1600#[cfg(test)]
1601mod tests {
1602 use super::*;
1603 use bobbin_edge_index::Coverage;
1604 use bobbin_record_lru::{CacheCapacity, LruRecordStore, NoopRecordStore, RecordStore};
1605 use bobbin_runtime::{OsEntropy, RuntimeHasher, SystemClock, TungsteniteWs};
1606 use bobbin_types::search::NoopSearchSink;
1607 use jacquard_common::types::nsid::Nsid;
1608 use jacquard_common::types::tid::Tid;
1609 use serde_json::json;
1610
    // CID string used by the fixtures below, both as a frame's `cid` field and
    // parsed into a `Cid` when seeding the record cache.
    const VALID_CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i";
1612
1613 fn did_subj(s: &str) -> SubjectRef {
1614 SubjectRef::Did(Did::new_owned(s).unwrap())
1615 }
1616
1617 fn uri_subj(s: &str) -> SubjectRef {
1618 SubjectRef::Uri(AtUri::new_owned(s).unwrap())
1619 }
1620
    /// Builds an owned record key from `s`; panics if `s` is not a valid rkey.
    fn rkey(s: &str) -> Rkey<DefaultStr> {
        Rkey::new_owned(s).unwrap()
    }
1624
1625 #[allow(clippy::type_complexity)]
1626 fn fresh() -> (
1627 Arc<EdgeStore>,
1628 Arc<StateIndex<IssueStateKind>>,
1629 Arc<StateIndex<PullStatusKind>>,
1630 Arc<CoverageWatch>,
1631 Arc<RepoIdResolver>,
1632 ) {
1633 (
1634 Arc::new(EdgeStore::new(RuntimeHasher::default())),
1635 Arc::new(StateIndex::new(RuntimeHasher::default())),
1636 Arc::new(StateIndex::new(RuntimeHasher::default())),
1637 Arc::new(CoverageWatch::new()),
1638 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())),
1639 )
1640 }
1641
1642 fn now() -> UnixMicros {
1643 SystemClock::new().now_unix_micros()
1644 }
1645
    /// Returns a new `SystemClock` for tests that need a `Clock`.
    fn sys_clock() -> SystemClock {
        SystemClock::new()
    }
1649
1650 fn parse_frame(value: serde_json::Value) -> HydrantFrame {
1651 let text = serde_json::to_string(&value).expect("serialize fixture");
1652 serde_json::from_str(&text).expect("deserialize fixture")
1653 }
1654
    /// Returns a TID generated via `Tid::now_0()`, used as the `rev` of fixture frames.
    fn fresh_tid() -> Tid {
        Tid::now_0()
    }
1658
1659 #[tokio::test]
1660 async fn ignores_non_tangled_collections() {
1661 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
1662 let frame: HydrantFrame = parse_frame(json!({
1663 "id": 1,
1664 "type": "record",
1665 "record": {
1666 "live": false,
1667 "did": "did:plc:nel",
1668 "rev": fresh_tid().as_str(),
1669 "collection": "app.bsky.feed.post",
1670 "rkey": "abcabcabcabcz",
1671 "action": "create",
1672 "record": {"$type": "app.bsky.feed.post", "text": "hi"}
1673 }
1674 }));
1675 handle_frame(
1676 frame,
1677 &store,
1678 &issue_states,
1679 &pull_statuses,
1680 &cov,
1681 &NoopSearchSink,
1682 &NoopRecordStore,
1683 &resolver,
1684 &sys_clock(),
1685 now(),
1686 )
1687 .await;
1688 assert_eq!(store.key_count(), 0);
1689 assert_eq!(cov.snapshot().events_processed(), 1);
1690 }
1691
1692 #[tokio::test]
1693 async fn native_knot_member_skipped_legacy_indexed() {
1694 use bobbin_knot_ingest::{CapabilityGate, KnotClient, KnotRegistry};
1695 use wiremock::matchers::{method, path};
1696 use wiremock::{Mock, MockServer, ResponseTemplate};
1697
1698 let server = MockServer::start().await;
1699 Mock::given(method("GET"))
1700 .and(path("/xrpc/sh.tangled.knot.version"))
1701 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1702 "version": "1.1.0",
1703 "capabilities": ["knot-acl"]
1704 })))
1705 .mount(&server)
1706 .await;
1707 let url = url::Url::parse(&server.uri()).unwrap();
1708 let native_host = format!("{}:{}", url.host_str().unwrap(), url.port().unwrap());
1709
1710 let gate = CapabilityGate::new(
1711 KnotClient::with_default_http(true).unwrap(),
1712 Arc::new(SystemClock::new()),
1713 true,
1714 true,
1715 );
1716 assert!(gate.has_knot_acl(&KnotHostKey::new(&native_host)).await);
1717
1718 let registry = KnotRegistry::new();
1719 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
1720 let ctx = PipelineCtx {
1721 resolver: &resolver,
1722 store: &store,
1723 issue_states: &issue_states,
1724 pull_statuses: &pull_statuses,
1725 coverage: &cov,
1726 records: &NoopRecordStore,
1727 search: &NoopSearchSink,
1728 shadow: None,
1729 buffer: None,
1730 knot_registry: Some(®istry),
1731 knot_gate: Some(&gate),
1732 };
1733
1734 let member_frame = |id: u64, rkey: &str, domain: &str| {
1735 parse_frame(json!({
1736 "id": id,
1737 "type": "record",
1738 "record": {
1739 "live": false,
1740 "did": "did:plc:akshay",
1741 "rev": fresh_tid().as_str(),
1742 "collection": "sh.tangled.knot.member",
1743 "rkey": rkey,
1744 "action": "create",
1745 "record": {
1746 "$type": "sh.tangled.knot.member",
1747 "subject": "did:plc:boltless",
1748 "domain": domain,
1749 "createdAt": "2026-06-01T00:00:00Z"
1750 }
1751 }
1752 }))
1753 };
1754
1755 let native =
1756 prepare_frame(member_frame(1, "aaaaaaaaaaaaz", &native_host), &ctx, now()).await;
1757 assert!(
1758 matches!(native.op, PendingOp::Delete { .. }),
1759 "member record for a native knot must be dropped"
1760 );
1761
1762 let legacy =
1763 prepare_frame(member_frame(2, "bbbbbbbbbbbbz", "legacy.knot"), &ctx, now()).await;
1764 assert!(
1765 matches!(legacy.op, PendingOp::Upsert { .. }),
1766 "member record for a legacy knot must be ingested"
1767 );
1768 assert!(
1769 registry.hosts().contains(&KnotHostKey::new("legacy.knot")),
1770 "a member record seeds host discovery even before any repo is seen"
1771 );
1772 assert_eq!(
1773 registry
1774 .drain_legacy_members(&KnotHostKey::new("legacy.knot"))
1775 .len(),
1776 1,
1777 "legacy member edge is indexed for later purge once the knot upgrades"
1778 );
1779 }
1780
1781 #[tokio::test]
1782 async fn create_then_delete_round_trips_a_star() {
1783 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
1784 let create: HydrantFrame = parse_frame(json!({
1785 "id": 10,
1786 "type": "record",
1787 "record": {
1788 "live": false,
1789 "did": "did:plc:olaren",
1790 "rev": fresh_tid().as_str(),
1791 "collection": "sh.tangled.feed.star",
1792 "rkey": "abcabcabcabcz",
1793 "action": "create",
1794 "record": {
1795 "$type": "sh.tangled.feed.star",
1796 "createdAt": "2026-05-01T00:00:00Z",
1797 "subjectDid": "did:plc:abalone"
1798 }
1799 }
1800 }));
1801 handle_frame(
1802 create,
1803 &store,
1804 &issue_states,
1805 &pull_statuses,
1806 &cov,
1807 &NoopSearchSink,
1808 &NoopRecordStore,
1809 &resolver,
1810 &sys_clock(),
1811 now(),
1812 )
1813 .await;
1814 let key = bobbin_types::ids::EdgeKey::new(
1815 Nsid::new_static("sh.tangled.feed.star").unwrap(),
1816 did_subj("did:plc:abalone"),
1817 );
1818 assert_eq!(store.count(&key), 1);
1819
1820 let delete: HydrantFrame = parse_frame(json!({
1821 "id": 11,
1822 "type": "record",
1823 "record": {
1824 "live": false,
1825 "did": "did:plc:olaren",
1826 "rev": fresh_tid().as_str(),
1827 "collection": "sh.tangled.feed.star",
1828 "rkey": "abcabcabcabcz",
1829 "action": "delete",
1830 "record": null
1831 }
1832 }));
1833 handle_frame(
1834 delete,
1835 &store,
1836 &issue_states,
1837 &pull_statuses,
1838 &cov,
1839 &NoopSearchSink,
1840 &NoopRecordStore,
1841 &resolver,
1842 &sys_clock(),
1843 now(),
1844 )
1845 .await;
1846 assert_eq!(store.count(&key), 0);
1847 }
1848
1849 #[tokio::test]
1850 async fn update_replaces_prior_edges() {
1851 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
1852 let mk = |subject_did: &Did<DefaultStr>, id: u64| -> HydrantFrame {
1853 parse_frame(json!({
1854 "id": id,
1855 "type": "record",
1856 "record": {
1857 "live": false,
1858 "did": "did:plc:olaren",
1859 "rev": fresh_tid().as_str(),
1860 "collection": "sh.tangled.feed.star",
1861 "rkey": "abcabcabcabcz",
1862 "action": "update",
1863 "record": {
1864 "$type": "sh.tangled.feed.star",
1865 "createdAt": "2026-05-01T00:00:00Z",
1866 "subjectDid": subject_did.as_ref()
1867 }
1868 }
1869 }))
1870 };
1871 handle_frame(
1872 mk(&Did::new_owned("did:plc:abalone").unwrap(), 1),
1873 &store,
1874 &issue_states,
1875 &pull_statuses,
1876 &cov,
1877 &NoopSearchSink,
1878 &NoopRecordStore,
1879 &resolver,
1880 &sys_clock(),
1881 now(),
1882 )
1883 .await;
1884 handle_frame(
1885 mk(&Did::new_owned("did:plc:uni").unwrap(), 2),
1886 &store,
1887 &issue_states,
1888 &pull_statuses,
1889 &cov,
1890 &NoopSearchSink,
1891 &NoopRecordStore,
1892 &resolver,
1893 &sys_clock(),
1894 now(),
1895 )
1896 .await;
1897
1898 let kind = Nsid::new_static("sh.tangled.feed.star").unwrap();
1899 let old = bobbin_types::ids::EdgeKey::new(kind.clone(), did_subj("did:plc:abalone"));
1900 let new = bobbin_types::ids::EdgeKey::new(kind, did_subj("did:plc:uni"));
1901 assert_eq!(store.count(&old), 0);
1902 assert_eq!(store.count(&new), 1);
1903 }
1904
1905 #[tokio::test]
1906 async fn prepare_frame_tags_regime_from_live_flag() {
1907 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
1908 let search = NoopSearchSink;
1909 let records = NoopRecordStore;
1910 let ctx = PipelineCtx {
1911 resolver: &resolver,
1912 store: &store,
1913 issue_states: &issue_states,
1914 pull_statuses: &pull_statuses,
1915 coverage: &cov,
1916 records: &records,
1917 search: &search,
1918 shadow: None,
1919 buffer: None,
1920 knot_registry: None,
1921 knot_gate: None,
1922 };
1923 let mk = |live: bool| -> HydrantFrame {
1924 parse_frame(json!({
1925 "id": 1,
1926 "type": "record",
1927 "record": {
1928 "live": live,
1929 "did": "did:plc:olaren",
1930 "rev": fresh_tid().as_str(),
1931 "collection": "sh.tangled.feed.star",
1932 "rkey": "abcabcabcabcz",
1933 "action": "create",
1934 "record": {
1935 "$type": "sh.tangled.feed.star",
1936 "createdAt": "2026-05-01T00:00:00Z",
1937 "subjectDid": "did:plc:abalone"
1938 }
1939 }
1940 }))
1941 };
1942 let live_pending = prepare_frame(mk(true), &ctx, now()).await;
1943 assert_eq!(live_pending.regime, Regime::Live);
1944 let replay_pending = prepare_frame(mk(false), &ctx, now()).await;
1945 assert_eq!(replay_pending.regime, Regime::Replay);
1946
1947 let identity: HydrantFrame = parse_frame(json!({
1948 "id": 9,
1949 "type": "identity",
1950 }));
1951 let id_pending = prepare_frame(identity, &ctx, now()).await;
1952 assert_eq!(id_pending.regime, Regime::NonRecord);
1953 }
1954
1955 #[tokio::test]
1956 async fn create_with_cid_warms_record_lru() {
1957 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
1958 let lru = LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024));
1959 let frame: HydrantFrame = parse_frame(json!({
1960 "id": 1,
1961 "type": "record",
1962 "record": {
1963 "live": false,
1964 "did": "did:plc:olaren",
1965 "rev": fresh_tid().as_str(),
1966 "collection": "sh.tangled.feed.star",
1967 "rkey": "abcabcabcabcz",
1968 "action": "create",
1969 "cid": VALID_CID,
1970 "record": {
1971 "$type": "sh.tangled.feed.star",
1972 "createdAt": "2026-05-01T00:00:00Z",
1973 "subjectDid": "did:plc:abalone"
1974 }
1975 }
1976 }));
1977 let source =
1978 AtUri::new_owned("at://did:plc:olaren/sh.tangled.feed.star/abcabcabcabcz").unwrap();
1979 handle_frame(
1980 frame,
1981 &store,
1982 &issue_states,
1983 &pull_statuses,
1984 &cov,
1985 &NoopSearchSink,
1986 &lru,
1987 &resolver,
1988 &sys_clock(),
1989 now(),
1990 )
1991 .await;
1992 let cached = lru.get(&source).expect("hydrant cid must seed the lru");
1993 assert_eq!(cached.cid.as_ref(), VALID_CID);
1994 let parsed: serde_json::Value = serde_json::from_slice(&cached.value).unwrap();
1995 assert_eq!(
1996 parsed["subject"]["did"], "did:plc:abalone",
1997 "legacy wire is upgraded to canon shape before caching so downstream readers see canonical fields"
1998 );
1999 }
2000
2001 #[tokio::test]
2002 async fn create_without_cid_clears_record_lru() {
2003 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2004 let lru = LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024));
2005 let source =
2006 AtUri::new_owned("at://did:plc:olaren/sh.tangled.feed.star/abcabcabcabcz").unwrap();
2007 let cid: Cid<DefaultStr> = VALID_CID.parse().unwrap();
2008 lru.put(
2009 source.clone(),
2010 Arc::new(RecordBody {
2011 uri: source.clone(),
2012 cid,
2013 value: bytes::Bytes::from_static(b"{\"stale\":true}"),
2014 }),
2015 );
2016 let frame: HydrantFrame = parse_frame(json!({
2017 "id": 1,
2018 "type": "record",
2019 "record": {
2020 "live": false,
2021 "did": "did:plc:olaren",
2022 "rev": fresh_tid().as_str(),
2023 "collection": "sh.tangled.feed.star",
2024 "rkey": "abcabcabcabcz",
2025 "action": "update",
2026 "record": {
2027 "$type": "sh.tangled.feed.star",
2028 "createdAt": "2026-05-01T00:00:00Z",
2029 "subjectDid": "did:plc:abalone"
2030 }
2031 }
2032 }));
2033 handle_frame(
2034 frame,
2035 &store,
2036 &issue_states,
2037 &pull_statuses,
2038 &cov,
2039 &NoopSearchSink,
2040 &lru,
2041 &resolver,
2042 &sys_clock(),
2043 now(),
2044 )
2045 .await;
2046 assert!(
2047 lru.get(&source).is_none(),
2048 "missing cid means we cannot trust the body, so the lru must be cleared",
2049 );
2050 }
2051
2052 #[tokio::test]
2053 async fn delete_evicts_record_lru() {
2054 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2055 let lru = LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024));
2056 let source =
2057 AtUri::new_owned("at://did:plc:olaren/sh.tangled.feed.star/abcabcabcabcz").unwrap();
2058 let cid: Cid<DefaultStr> = VALID_CID.parse().unwrap();
2059 lru.put(
2060 source.clone(),
2061 Arc::new(RecordBody {
2062 uri: source.clone(),
2063 cid,
2064 value: bytes::Bytes::from_static(b"{}"),
2065 }),
2066 );
2067 let frame: HydrantFrame = parse_frame(json!({
2068 "id": 1,
2069 "type": "record",
2070 "record": {
2071 "live": false,
2072 "did": "did:plc:olaren",
2073 "rev": fresh_tid().as_str(),
2074 "collection": "sh.tangled.feed.star",
2075 "rkey": "abcabcabcabcz",
2076 "action": "delete",
2077 "record": null
2078 }
2079 }));
2080 handle_frame(
2081 frame,
2082 &store,
2083 &issue_states,
2084 &pull_statuses,
2085 &cov,
2086 &NoopSearchSink,
2087 &lru,
2088 &resolver,
2089 &sys_clock(),
2090 now(),
2091 )
2092 .await;
2093 assert!(lru.get(&source).is_none());
2094 }
2095
2096 #[tokio::test]
2097 async fn live_recent_event_promotes_coverage_to_ready() {
2098 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2099 assert!(!cov.snapshot().is_ready());
2100 let frame: HydrantFrame = parse_frame(json!({
2101 "id": 99,
2102 "type": "record",
2103 "record": {
2104 "live": true,
2105 "did": "did:plc:olaren",
2106 "rev": fresh_tid().as_str(),
2107 "collection": "sh.tangled.feed.star",
2108 "rkey": "abcabcabcabcz",
2109 "action": "create",
2110 "record": {
2111 "$type": "sh.tangled.feed.star",
2112 "createdAt": "2026-05-01T00:00:00Z",
2113 "subjectDid": "did:plc:abalone"
2114 }
2115 }
2116 }));
2117 handle_frame(
2118 frame,
2119 &store,
2120 &issue_states,
2121 &pull_statuses,
2122 &cov,
2123 &NoopSearchSink,
2124 &NoopRecordStore,
2125 &resolver,
2126 &sys_clock(),
2127 now(),
2128 )
2129 .await;
2130 assert!(cov.snapshot().is_ready());
2131 assert_eq!(cov.snapshot().last_cursor(), HydrantCursor::new(99));
2132 }
2133
2134 #[tokio::test]
2135 async fn live_but_stale_rev_does_not_promote() {
2136 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2137 let stale_tid = Tid::from_time(1_000_000, 0);
2138 let frame: HydrantFrame = parse_frame(json!({
2139 "id": 7,
2140 "type": "record",
2141 "record": {
2142 "live": true,
2143 "did": "did:plc:olaren",
2144 "rev": stale_tid.as_str(),
2145 "collection": "sh.tangled.feed.star",
2146 "rkey": "abcabcabcabcz",
2147 "action": "create",
2148 "record": {
2149 "$type": "sh.tangled.feed.star",
2150 "createdAt": "2026-05-01T00:00:00Z",
2151 "subjectDid": "did:plc:abalone"
2152 }
2153 }
2154 }));
2155 handle_frame(
2156 frame,
2157 &store,
2158 &issue_states,
2159 &pull_statuses,
2160 &cov,
2161 &NoopSearchSink,
2162 &NoopRecordStore,
2163 &resolver,
2164 &sys_clock(),
2165 now(),
2166 )
2167 .await;
2168 assert!(!cov.snapshot().is_ready());
2169 assert!(matches!(cov.snapshot(), Coverage::Warming { .. }));
2170 }
2171
2172 #[test]
2173 fn classify_consumer_too_slow_frame_returns_typed_variant() {
2174 let text = r#"{"type":"error","error":"ConsumerTooSlow","message":"stream socket send blocked for at least 30 seconds"}"#;
2175 match classify_text_frame(text) {
2176 Err(IngestError::ConsumerTooSlow { message }) => {
2177 assert!(
2178 message.contains("30 seconds"),
2179 "message field preserved verbatim, got: {message}"
2180 );
2181 }
2182 other => panic!("expected ConsumerTooSlow variant, got: {other:?}"),
2183 }
2184 }
2185
2186 #[test]
2187 fn classify_unknown_hydrant_error_falls_back_to_generic_variant() {
2188 let text = r#"{"type":"error","error":"NewFutureCode","message":"some new failure mode"}"#;
2189 match classify_text_frame(text) {
2190 Err(IngestError::HydrantStream { code, message }) => {
2191 assert_eq!(code, "NewFutureCode");
2192 assert_eq!(message, "some new failure mode");
2193 }
2194 other => panic!("expected HydrantStream variant, got: {other:?}"),
2195 }
2196 }
2197
2198 #[test]
2199 fn classify_error_frame_without_message_uses_empty_string() {
2200 let text = r#"{"type":"error","error":"ConsumerTooSlow"}"#;
2201 match classify_text_frame(text) {
2202 Err(IngestError::ConsumerTooSlow { message }) => assert!(message.is_empty()),
2203 other => panic!("expected ConsumerTooSlow with empty message, got: {other:?}"),
2204 }
2205 }
2206
2207 #[test]
2208 fn classify_normal_record_frame_unchanged() {
2209 let text = r#"{"id":42,"type":"record"}"#;
2210 let frame = classify_text_frame(text).expect("normal record frame must parse");
2211 assert_eq!(frame.id, 42);
2212 assert_eq!(frame.kind, FrameKind::Record);
2213 }
2214
2215 #[test]
2216 fn classify_garbage_object_returns_decode_error() {
2217 let text = r#"{"random":"object","without":"required fields"}"#;
2218 match classify_text_frame(text) {
2219 Err(IngestError::Decode(_)) => {}
2220 other => panic!("expected Decode error, got: {other:?}"),
2221 }
2222 }
2223
    /// Test double for a websocket stream backed by a pre-filled queue of
    /// messages (or network errors).
    struct ScriptedWsStream {
        /// Remaining scripted items, consumed from the front.
        messages: std::collections::VecDeque<Result<WsMessage, NetworkError>>,
    }
2227
2228 impl WsStream for ScriptedWsStream {
2229 fn next<'a>(&'a mut self) -> bobbin_runtime::WsMessageFuture<'a> {
2230 let msg = self.messages.pop_front();
2231 Box::pin(async move { msg })
2232 }
2233 }
2234
2235 #[tokio::test]
2236 async fn reader_loop_surfaces_consumer_too_slow_from_error_frame() {
2237 let mut messages = std::collections::VecDeque::new();
2238 messages.push_back(Ok(WsMessage::Text(
2239 r#"{"type":"error","error":"ConsumerTooSlow","message":"stream delivery blocked"}"#
2240 .to_owned(),
2241 )));
2242 let stream: Box<dyn WsStream> = Box::new(ScriptedWsStream { messages });
2243 let (frame_tx, _frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(FRAME_CHANNEL_DEPTH);
2244 let (control_tx, _control_rx) =
2245 tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH);
2246 let cancel = CancellationToken::new();
2247
2248 let end = reader_loop(stream, frame_tx, control_tx, cancel).await;
2249
2250 match end.error {
2251 Some(IngestError::ConsumerTooSlow { message }) => {
2252 assert_eq!(message, "stream delivery blocked");
2253 }
2254 other => panic!("expected ConsumerTooSlow SessionEnd error, got: {other:?}"),
2255 }
2256 assert_eq!(end.outcome, SessionOutcome::Empty);
2257 }
2258
2259 #[tokio::test]
2260 async fn reader_loop_surfaces_unknown_hydrant_error_distinctly() {
2261 let mut messages = std::collections::VecDeque::new();
2262 messages.push_back(Ok(WsMessage::Text(
2263 r#"{"type":"error","error":"NewFutureCode","message":"new mode"}"#.to_owned(),
2264 )));
2265 let stream: Box<dyn WsStream> = Box::new(ScriptedWsStream { messages });
2266 let (frame_tx, _frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(FRAME_CHANNEL_DEPTH);
2267 let (control_tx, _control_rx) =
2268 tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH);
2269 let cancel = CancellationToken::new();
2270
2271 let end = reader_loop(stream, frame_tx, control_tx, cancel).await;
2272
2273 match end.error {
2274 Some(IngestError::HydrantStream { code, message }) => {
2275 assert_eq!(code, "NewFutureCode");
2276 assert_eq!(message, "new mode");
2277 }
2278 other => panic!("expected HydrantStream SessionEnd error, got: {other:?}"),
2279 }
2280 }
2281
2282 struct ChannelWsStream {
2283 rx: tokio::sync::mpsc::Receiver<Result<WsMessage, NetworkError>>,
2284 }
2285
2286 impl WsStream for ChannelWsStream {
2287 fn next<'a>(&'a mut self) -> bobbin_runtime::WsMessageFuture<'a> {
2288 Box::pin(async move { self.rx.recv().await })
2289 }
2290 }
2291
2292 fn star_frame_text(id: u64, rkey: &Rkey<DefaultStr>) -> String {
2293 json!({
2294 "id": id,
2295 "type": "record",
2296 "record": {
2297 "live": false,
2298 "did": "did:plc:olaren",
2299 "rev": fresh_tid().as_str(),
2300 "collection": "sh.tangled.feed.star",
2301 "rkey": rkey.as_ref(),
2302 "action": "create",
2303 "record": {
2304 "$type": "sh.tangled.feed.star",
2305 "createdAt": "2026-05-01T00:00:00Z",
2306 "subjectDid": "did:plc:abalone"
2307 }
2308 }
2309 })
2310 .to_string()
2311 }
2312
2313 #[tokio::test]
2314 async fn pong_forwards_promptly_when_frame_channel_is_full() {
2315 let (ws_tx, ws_rx) = tokio::sync::mpsc::channel::<Result<WsMessage, NetworkError>>(8);
2316 let stream: Box<dyn WsStream> = Box::new(ChannelWsStream { rx: ws_rx });
2317 let (frame_tx, frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(1);
2318 let (control_tx, mut control_rx) =
2319 tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH);
2320 let cancel = CancellationToken::new();
2321
2322 let prefill: HydrantFrame = parse_frame(json!({
2323 "id": 0,
2324 "type": "record",
2325 "record": {
2326 "live": false,
2327 "did": "did:plc:olaren",
2328 "rev": fresh_tid().as_str(),
2329 "collection": "sh.tangled.feed.star",
2330 "rkey": "prefilrkey001",
2331 "action": "create",
2332 "record": {
2333 "$type": "sh.tangled.feed.star",
2334 "createdAt": "2026-05-01T00:00:00Z",
2335 "subjectDid": "did:plc:abalone"
2336 }
2337 }
2338 }));
2339 frame_tx
2340 .try_send(prefill)
2341 .expect("depth-1 frame channel must accept the prefill");
2342
2343 let reader_handle = tokio::spawn(reader_loop(
2344 stream,
2345 frame_tx.clone(),
2346 control_tx.clone(),
2347 cancel.clone(),
2348 ));
2349
2350 ws_tx
2351 .send(Ok(WsMessage::Text(star_frame_text(
2352 1,
2353 &rkey("starrkeyaa001"),
2354 ))))
2355 .await
2356 .unwrap();
2357 ws_tx.send(Ok(WsMessage::Pong(Bytes::new()))).await.unwrap();
2358
2359 let pong_event = tokio::time::timeout(Duration::from_millis(500), control_rx.recv())
2360 .await
2361 .expect("pong must surface inside 500ms even when frame_tx is saturated. A reader blocked on a frame send would never poll the next ws message")
2362 .expect("control_tx was not closed");
2363 assert!(
2364 matches!(pong_event, WsEvent::IncomingPong),
2365 "first control event must be the pong, not a held text",
2366 );
2367
2368 let too_slow =
2369 "{\"type\":\"error\",\"error\":\"ConsumerTooSlow\",\"message\":\"saturated\"}"
2370 .to_string();
2371 ws_tx.send(Ok(WsMessage::Text(too_slow))).await.unwrap();
2372
2373 let end = tokio::time::timeout(Duration::from_secs(1), reader_handle)
2374 .await
2375 .expect("reader must exit promptly once ConsumerTooSlow is read")
2376 .expect("reader task must not panic");
2377 match end.error {
2378 Some(IngestError::ConsumerTooSlow { message }) => assert_eq!(message, "saturated"),
2379 other => panic!("expected ConsumerTooSlow disconnect, got: {other:?}"),
2380 }
2381
2382 drop(frame_rx);
2383 }
2384
2385 #[tokio::test]
2386 async fn reader_drains_held_frames_once_processor_catches_up() {
2387 let (ws_tx, ws_rx) = tokio::sync::mpsc::channel::<Result<WsMessage, NetworkError>>(8);
2388 let stream: Box<dyn WsStream> = Box::new(ChannelWsStream { rx: ws_rx });
2389 let (frame_tx, mut frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(1);
2390 let (control_tx, _control_rx) =
2391 tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH);
2392 let cancel = CancellationToken::new();
2393
2394 let prefill: HydrantFrame = parse_frame(json!({
2395 "id": 0,
2396 "type": "record",
2397 "record": {
2398 "live": false,
2399 "did": "did:plc:olaren",
2400 "rev": fresh_tid().as_str(),
2401 "collection": "sh.tangled.feed.star",
2402 "rkey": "prefilrkey001",
2403 "action": "create",
2404 "record": {
2405 "$type": "sh.tangled.feed.star",
2406 "createdAt": "2026-05-01T00:00:00Z",
2407 "subjectDid": "did:plc:abalone"
2408 }
2409 }
2410 }));
2411 frame_tx.try_send(prefill).unwrap();
2412
2413 let reader_handle = tokio::spawn(reader_loop(
2414 stream,
2415 frame_tx.clone(),
2416 control_tx,
2417 cancel.clone(),
2418 ));
2419
2420 ws_tx
2421 .send(Ok(WsMessage::Text(star_frame_text(
2422 1,
2423 &rkey("heldrkeyaa001"),
2424 ))))
2425 .await
2426 .unwrap();
2427 ws_tx
2428 .send(Ok(WsMessage::Text(star_frame_text(
2429 2,
2430 &rkey("heldrkeyaa002"),
2431 ))))
2432 .await
2433 .unwrap();
2434
2435 let _drained_prefill = frame_rx.recv().await.expect("prefilled frame drains");
2436 let first = tokio::time::timeout(Duration::from_millis(500), frame_rx.recv())
2437 .await
2438 .expect("first held frame must reach frame_rx after slot opens")
2439 .expect("frame_tx still open");
2440 assert_eq!(first.id, 1);
2441 let second = tokio::time::timeout(Duration::from_millis(500), frame_rx.recv())
2442 .await
2443 .expect("second held frame must reach frame_rx after slot opens")
2444 .expect("frame_tx still open");
2445 assert_eq!(second.id, 2);
2446
2447 cancel.cancel();
2448 let _ = tokio::time::timeout(Duration::from_secs(1), reader_handle)
2449 .await
2450 .expect("reader must stop after cancel");
2451 }
2452
2453 #[test]
2454 fn first_connect_uses_configured_start_cursor() {
2455 let start = HydrantCursor::new(42);
2456 assert_eq!(next_connect_cursor(Coverage::default(), start), start);
2457 }
2458
2459 #[test]
2460 fn reconnect_resumes_strictly_after_last_seen() {
2461 let snap = Coverage::default().advance(HydrantCursor::new(7));
2462 assert_eq!(
2463 next_connect_cursor(snap, HydrantCursor::new(0)),
2464 HydrantCursor::new(8),
2465 );
2466 }
2467
2468 #[test]
2469 fn reconnect_overrides_configured_start() {
2470 let snap = Coverage::default().advance(HydrantCursor::new(100));
2471 assert_eq!(
2472 next_connect_cursor(snap, HydrantCursor::new(50)),
2473 HydrantCursor::new(101),
2474 );
2475 }
2476
2477 #[test]
2478 fn first_connect_uses_start_even_when_first_frame_id_would_be_zero() {
2479 let start = HydrantCursor::new(7);
2480 let snap = Coverage::default();
2481 assert_eq!(snap.last_cursor(), HydrantCursor::new(0));
2482 assert_eq!(snap.events_processed(), 0);
2483 assert_eq!(next_connect_cursor(snap, start), start);
2484 }
2485
2486 #[test]
2487 fn reconnect_after_processing_id_zero_advances_to_one() {
2488 let snap = Coverage::default().advance(HydrantCursor::new(0));
2489 assert_eq!(snap.events_processed(), 1);
2490 assert_eq!(
2491 next_connect_cursor(snap, HydrantCursor::new(99)),
2492 HydrantCursor::new(1),
2493 "events_processed disambiguates 'never seen' from 'saw id 0'",
2494 );
2495 }
2496
2497 #[tokio::test]
2498 async fn identity_frame_advances_cursor_only() {
2499 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2500 let frame: HydrantFrame = parse_frame(json!({
2501 "id": 5,
2502 "type": "identity",
2503 "identity": {
2504 "did": "did:plc:olaren",
2505 "handle": "olaren.dev"
2506 }
2507 }));
2508 handle_frame(
2509 frame,
2510 &store,
2511 &issue_states,
2512 &pull_statuses,
2513 &cov,
2514 &NoopSearchSink,
2515 &NoopRecordStore,
2516 &resolver,
2517 &sys_clock(),
2518 now(),
2519 )
2520 .await;
2521 assert_eq!(store.key_count(), 0);
2522 assert_eq!(cov.snapshot().last_cursor(), HydrantCursor::new(5));
2523 assert!(!cov.snapshot().is_ready());
2524 }
2525
2526 #[tokio::test]
2527 async fn account_frame_advances_cursor_only() {
2528 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2529 let frame: HydrantFrame = parse_frame(json!({
2530 "id": 6,
2531 "type": "account",
2532 "account": {"did": "did:plc:olaren", "active": true}
2533 }));
2534 handle_frame(
2535 frame,
2536 &store,
2537 &issue_states,
2538 &pull_statuses,
2539 &cov,
2540 &NoopSearchSink,
2541 &NoopRecordStore,
2542 &resolver,
2543 &sys_clock(),
2544 now(),
2545 )
2546 .await;
2547 assert_eq!(store.key_count(), 0);
2548 assert_eq!(cov.snapshot().last_cursor(), HydrantCursor::new(6));
2549 }
2550
2551 #[tokio::test]
2552 async fn unknown_frame_kind_advances_cursor_without_panic() {
2553 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2554 let frame: HydrantFrame = parse_frame(json!({"id": 8, "type": "future_event"}));
2555 assert_eq!(frame.kind, FrameKind::Other);
2556 handle_frame(
2557 frame,
2558 &store,
2559 &issue_states,
2560 &pull_statuses,
2561 &cov,
2562 &NoopSearchSink,
2563 &NoopRecordStore,
2564 &resolver,
2565 &sys_clock(),
2566 now(),
2567 )
2568 .await;
2569 assert_eq!(cov.snapshot().last_cursor(), HydrantCursor::new(8));
2570 }
2571
2572 fn fresh_runtime(cancel: CancellationToken) -> IngestRuntime<NoopSearchSink> {
2573 IngestRuntime {
2574 store: Arc::new(EdgeStore::new(RuntimeHasher::default())),
2575 issue_states: Arc::new(StateIndex::new(RuntimeHasher::default())),
2576 pull_statuses: Arc::new(StateIndex::new(RuntimeHasher::default())),
2577 coverage: Arc::new(CoverageWatch::new()),
2578 search: Arc::new(NoopSearchSink),
2579 records: Arc::new(NoopRecordStore) as Arc<dyn RecordStore>,
2580 resolver: Arc::new(RepoIdResolver::detached(RuntimeHasher::default())),
2581 clock: Arc::new(SystemClock::new()),
2582 entropy: Arc::new(OsEntropy),
2583 ws: TungsteniteWs::shared(),
2584 cancel,
2585 disconnects: None,
2586 warming_shadow: None,
2587 warming_buffer: None,
2588 knot_registry: None,
2589 knot_gate: None,
2590 }
2591 }
2592
2593 #[tokio::test(start_paused = true)]
2594 async fn cancel_token_short_circuits_reconnect_sleep() {
2595 let cfg = IngestConfig::new(Url::parse("ws://127.0.0.1:1").unwrap());
2596 let cancel = CancellationToken::new();
2597 let runtime = fresh_runtime(cancel.clone());
2598 let task = tokio::spawn(async move { run(cfg, runtime).await });
2599 tokio::time::advance(Duration::from_millis(10)).await;
2600 cancel.cancel();
2601 let outcome = tokio::time::timeout(Duration::from_secs(1), task)
2602 .await
2603 .expect("ingest must stop within timeout once cancel fires");
2604 assert!(matches!(outcome, Ok(Ok(()))), "got {outcome:?}");
2605 }
2606
2607 #[test]
2608 fn jittered_stays_within_one_quarter_of_base() {
2609 let base = Duration::from_secs(1);
2610 let cap = base + Duration::from_millis(250);
2611 let entropy = OsEntropy;
2612 (0..50).for_each(|_| {
2613 let j = jittered(base, &entropy);
2614 assert!(j >= base, "jitter must not undershoot");
2615 assert!(j <= cap, "jitter must not exceed +25%, got {:?}", j);
2616 });
2617 }
2618
2619 #[tokio::test]
2620 async fn star_after_observed_repo_keys_on_repo_did() {
2621 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2622 let repo: HydrantFrame = parse_frame(json!({
2623 "id": 1,
2624 "type": "record",
2625 "record": {
2626 "live": false,
2627 "did": "did:plc:nel",
2628 "rev": fresh_tid().as_str(),
2629 "collection": "sh.tangled.repo",
2630 "rkey": "abcabcabcabcz",
2631 "action": "create",
2632 "record": {
2633 "$type": "sh.tangled.repo",
2634 "createdAt": "2026-05-01T00:00:00Z",
2635 "knot": "oyster.cafe",
2636 "name": "abalone",
2637 "repoDid": "did:plc:abalone"
2638 }
2639 }
2640 }));
2641 handle_frame(
2642 repo,
2643 &store,
2644 &issue_states,
2645 &pull_statuses,
2646 &cov,
2647 &NoopSearchSink,
2648 &NoopRecordStore,
2649 &resolver,
2650 &sys_clock(),
2651 now(),
2652 )
2653 .await;
2654
2655 let star: HydrantFrame = parse_frame(json!({
2656 "id": 2,
2657 "type": "record",
2658 "record": {
2659 "live": false,
2660 "did": "did:plc:olaren",
2661 "rev": fresh_tid().as_str(),
2662 "collection": "sh.tangled.feed.star",
2663 "rkey": "abcabcabcabcz",
2664 "action": "create",
2665 "record": {
2666 "$type": "sh.tangled.feed.star",
2667 "createdAt": "2026-05-01T00:00:00Z",
2668 "subject": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"
2669 }
2670 }
2671 }));
2672 handle_frame(
2673 star,
2674 &store,
2675 &issue_states,
2676 &pull_statuses,
2677 &cov,
2678 &NoopSearchSink,
2679 &NoopRecordStore,
2680 &resolver,
2681 &sys_clock(),
2682 now(),
2683 )
2684 .await;
2685
2686 let nsid = Nsid::new_static("sh.tangled.feed.star").unwrap();
2687 let repo_keyed = bobbin_types::ids::EdgeKey::new(nsid.clone(), did_subj("did:plc:abalone"));
2688 let owner_keyed = bobbin_types::ids::EdgeKey::new(nsid, did_subj("did:plc:nel"));
2689 assert_eq!(
2690 store.count(&repo_keyed),
2691 1,
2692 "star should be keyed on repoDID once the repo is observed"
2693 );
2694 assert_eq!(
2695 store.count(&owner_keyed),
2696 0,
2697 "owner DID should not collect the edge"
2698 );
2699 }
2700
2701 #[tokio::test]
2702 async fn unresolvable_repo_subject_drops_edge() {
2703 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2704 let star: HydrantFrame = parse_frame(json!({
2705 "id": 1,
2706 "type": "record",
2707 "record": {
2708 "live": false,
2709 "did": "did:plc:olaren",
2710 "rev": fresh_tid().as_str(),
2711 "collection": "sh.tangled.feed.star",
2712 "rkey": "abcabcabcabcz",
2713 "action": "create",
2714 "record": {
2715 "$type": "sh.tangled.feed.star",
2716 "createdAt": "2026-05-01T00:00:00Z",
2717 "subject": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"
2718 }
2719 }
2720 }));
2721 handle_frame(
2722 star,
2723 &store,
2724 &issue_states,
2725 &pull_statuses,
2726 &cov,
2727 &NoopSearchSink,
2728 &NoopRecordStore,
2729 &resolver,
2730 &sys_clock(),
2731 now(),
2732 )
2733 .await;
2734
2735 let nsid = Nsid::new_static("sh.tangled.feed.star").unwrap();
2736 let owner_keyed = bobbin_types::ids::EdgeKey::new(nsid.clone(), did_subj("did:plc:nel"));
2737 let uri_keyed = bobbin_types::ids::EdgeKey::new(
2738 nsid,
2739 uri_subj("at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"),
2740 );
2741 assert_eq!(
2742 store.count(&owner_keyed),
2743 0,
2744 "must not silently misfile under the authoring DID",
2745 );
2746 assert_eq!(
2747 store.count(&uri_keyed),
2748 0,
2749 "unresolvable rkey-form subject must drop the edge; keeping it would index against a key that never matches bare-DID queries",
2750 );
2751 }
2752
2753 #[tokio::test]
2754 async fn repo_without_repo_did_drops_edge() {
2755 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2756 let repo: HydrantFrame = parse_frame(json!({
2757 "id": 1,
2758 "type": "record",
2759 "record": {
2760 "live": false,
2761 "did": "did:plc:nel",
2762 "rev": fresh_tid().as_str(),
2763 "collection": "sh.tangled.repo",
2764 "rkey": "abcabcabcabcz",
2765 "action": "create",
2766 "record": {
2767 "$type": "sh.tangled.repo",
2768 "createdAt": "2026-05-01T00:00:00Z",
2769 "knot": "oyster.cafe",
2770 "name": "abalone"
2771 }
2772 }
2773 }));
2774 handle_frame(
2775 repo,
2776 &store,
2777 &issue_states,
2778 &pull_statuses,
2779 &cov,
2780 &NoopSearchSink,
2781 &NoopRecordStore,
2782 &resolver,
2783 &sys_clock(),
2784 now(),
2785 )
2786 .await;
2787
2788 let star: HydrantFrame = parse_frame(json!({
2789 "id": 2,
2790 "type": "record",
2791 "record": {
2792 "live": false,
2793 "did": "did:plc:olaren",
2794 "rev": fresh_tid().as_str(),
2795 "collection": "sh.tangled.feed.star",
2796 "rkey": "abcabcabcabcz",
2797 "action": "create",
2798 "record": {
2799 "$type": "sh.tangled.feed.star",
2800 "createdAt": "2026-05-01T00:00:00Z",
2801 "subject": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"
2802 }
2803 }
2804 }));
2805 handle_frame(
2806 star,
2807 &store,
2808 &issue_states,
2809 &pull_statuses,
2810 &cov,
2811 &NoopSearchSink,
2812 &NoopRecordStore,
2813 &resolver,
2814 &sys_clock(),
2815 now(),
2816 )
2817 .await;
2818
2819 let nsid = Nsid::new_static("sh.tangled.feed.star").unwrap();
2820 let uri_keyed = bobbin_types::ids::EdgeKey::new(
2821 nsid.clone(),
2822 uri_subj("at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"),
2823 );
2824 let owner_keyed = bobbin_types::ids::EdgeKey::new(nsid, did_subj("did:plc:nel"));
2825 assert_eq!(
2826 store.count(&uri_keyed),
2827 0,
2828 "no canonical DID exists for a repo without repoDID, so the edge must be dropped",
2829 );
2830 assert_eq!(
2831 store.count(&owner_keyed),
2832 0,
2833 "the authoring DID is not the canonical repo identity",
2834 );
2835 }
2836
2837 #[tokio::test]
2838 async fn explicit_subject_did_skips_normalization() {
2839 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2840 let star: HydrantFrame = parse_frame(json!({
2841 "id": 1,
2842 "type": "record",
2843 "record": {
2844 "live": false,
2845 "did": "did:plc:olaren",
2846 "rev": fresh_tid().as_str(),
2847 "collection": "sh.tangled.feed.star",
2848 "rkey": "abcabcabcabcz",
2849 "action": "create",
2850 "record": {
2851 "$type": "sh.tangled.feed.star",
2852 "createdAt": "2026-05-01T00:00:00Z",
2853 "subjectDid": "did:plc:abalone"
2854 }
2855 }
2856 }));
2857 handle_frame(
2858 star,
2859 &store,
2860 &issue_states,
2861 &pull_statuses,
2862 &cov,
2863 &NoopSearchSink,
2864 &NoopRecordStore,
2865 &resolver,
2866 &sys_clock(),
2867 now(),
2868 )
2869 .await;
2870 let key = bobbin_types::ids::EdgeKey::new(
2871 Nsid::new_static("sh.tangled.feed.star").unwrap(),
2872 did_subj("did:plc:abalone"),
2873 );
2874 assert_eq!(store.count(&key), 1);
2875 }
2876
2877 #[tokio::test]
2878 async fn issue_with_repo_uri_resolves_to_repo_did() {
2879 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2880 resolver
2881 .observe(
2882 Did::new_owned("did:plc:nel").unwrap(),
2883 Rkey::new_owned("abcabcabcabcz").unwrap(),
2884 Some(Did::new_owned("did:plc:abalone").unwrap()),
2885 )
2886 .await;
2887 let issue: HydrantFrame = parse_frame(json!({
2888 "id": 1,
2889 "type": "record",
2890 "record": {
2891 "live": false,
2892 "did": "did:plc:olaren",
2893 "rev": fresh_tid().as_str(),
2894 "collection": "sh.tangled.repo.issue",
2895 "rkey": "abcabcabcabcz",
2896 "action": "create",
2897 "record": {
2898 "$type": "sh.tangled.repo.issue",
2899 "createdAt": "2026-05-01T00:00:00Z",
2900 "title": "bug",
2901 "repo": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"
2902 }
2903 }
2904 }));
2905 handle_frame(
2906 issue,
2907 &store,
2908 &issue_states,
2909 &pull_statuses,
2910 &cov,
2911 &NoopSearchSink,
2912 &NoopRecordStore,
2913 &resolver,
2914 &sys_clock(),
2915 now(),
2916 )
2917 .await;
2918 let key = bobbin_types::ids::EdgeKey::new(
2919 Nsid::new_static("sh.tangled.repo.issue").unwrap(),
2920 did_subj("did:plc:abalone"),
2921 );
2922 assert_eq!(store.count(&key), 1);
2923 }
2924
2925 #[derive(Default)]
2926 struct RecordingSearchSink {
2927 docs: tokio::sync::Mutex<Vec<bobbin_types::search::SearchDoc>>,
2928 }
2929
2930 impl SearchSink for RecordingSearchSink {
2931 async fn upsert(&self, doc: bobbin_types::search::SearchDoc) {
2932 self.docs.lock().await.push(doc);
2933 }
2934 async fn remove(&self, _uri: &AtUri<DefaultStr>) {}
2935 }
2936
2937 #[tokio::test]
2938 async fn search_index_hydrates_repo_did_via_resolver() {
2939 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2940 resolver
2941 .observe(
2942 Did::new_owned("did:plc:nel").unwrap(),
2943 Rkey::new_owned("abcabcabcabcz").unwrap(),
2944 Some(Did::new_owned("did:plc:abalone").unwrap()),
2945 )
2946 .await;
2947 let search = RecordingSearchSink::default();
2948 let issue: HydrantFrame = parse_frame(json!({
2949 "id": 1,
2950 "type": "record",
2951 "record": {
2952 "live": false,
2953 "did": "did:plc:olaren",
2954 "rev": fresh_tid().as_str(),
2955 "collection": "sh.tangled.repo.issue",
2956 "rkey": "abcabcabcabcz",
2957 "action": "create",
2958 "record": {
2959 "$type": "sh.tangled.repo.issue",
2960 "createdAt": "2026-05-01T00:00:00Z",
2961 "title": "bug",
2962 "repo": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"
2963 }
2964 }
2965 }));
2966 handle_frame(
2967 issue,
2968 &store,
2969 &issue_states,
2970 &pull_statuses,
2971 &cov,
2972 &search,
2973 &NoopRecordStore,
2974 &resolver,
2975 &sys_clock(),
2976 now(),
2977 )
2978 .await;
2979 let docs = search.docs.lock().await;
2980 assert_eq!(docs.len(), 1, "issue should produce one search doc");
2981 assert_eq!(
2982 docs[0].repo,
2983 Some(Did::new_owned("did:plc:abalone").unwrap()),
2984 "search doc repo field must be resolved from the observed repo, not left as None",
2985 );
2986 }
2987
2988 #[tokio::test]
2989 async fn delete_repo_record_evicts_resolver_cache() {
2990 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2991 let owner = Did::new_owned("did:plc:nel").unwrap();
2992 let rkey = Rkey::new_owned("abcabcabcabcz").unwrap();
2993 resolver
2994 .observe(
2995 owner.clone(),
2996 rkey.clone(),
2997 Some(Did::new_owned("did:plc:abalone").unwrap()),
2998 )
2999 .await;
3000 assert!(
3001 resolver.cached_resolution(&owner, &rkey).await.is_some(),
3002 "observe must seed the cache",
3003 );
3004 let delete: HydrantFrame = parse_frame(json!({
3005 "id": 1,
3006 "type": "record",
3007 "record": {
3008 "live": false,
3009 "did": owner.as_ref(),
3010 "rev": fresh_tid().as_str(),
3011 "collection": "sh.tangled.repo",
3012 "rkey": rkey.as_ref(),
3013 "action": "delete"
3014 }
3015 }));
3016 handle_frame(
3017 delete,
3018 &store,
3019 &issue_states,
3020 &pull_statuses,
3021 &cov,
3022 &NoopSearchSink,
3023 &NoopRecordStore,
3024 &resolver,
3025 &sys_clock(),
3026 now(),
3027 )
3028 .await;
3029 assert!(
3030 resolver.cached_resolution(&owner, &rkey).await.is_none(),
3031 "deleting the repo record must clear the resolver cache so future observes are not blocked by a stale Authoritative entry",
3032 );
3033 }
3034
3035 #[tokio::test]
3036 async fn cancel_short_circuits_a_hung_ws_connect() {
3037 let _listener = tokio::net::TcpListener::bind("127.0.0.1:0")
3038 .await
3039 .expect("bind sink listener");
3040 let port = _listener.local_addr().expect("local addr").port();
3041 let cfg =
3042 IngestConfig::new(Url::parse(&format!("ws://127.0.0.1:{port}")).expect("hydrant url"));
3043 let cancel = CancellationToken::new();
3044 let runtime = fresh_runtime(cancel.clone());
3045 let task = tokio::spawn(async move { run(cfg, runtime).await });
3046 tokio::time::sleep(Duration::from_millis(100)).await;
3047 cancel.cancel();
3048 let outcome = tokio::time::timeout(Duration::from_secs(2), task)
3049 .await
3050 .expect("cancel must short-circuit the hung ws connect");
3051 assert!(matches!(outcome, Ok(Ok(()))), "got {outcome:?}");
3052 }
3053
3054 struct CloseOnConnectTransport {
3055 used: std::sync::Mutex<bool>,
3056 }
3057 impl bobbin_runtime::WsTransport for CloseOnConnectTransport {
3058 fn connect(&self, _url: Url) -> bobbin_runtime::WsConnectFuture {
3059 let mut used = self.used.lock().unwrap();
3060 if *used {
3061 return Box::pin(async move {
3062 Err(NetworkError::Connect("only one connect allowed".to_owned()))
3063 });
3064 }
3065 *used = true;
3066 Box::pin(async move {
3067 let mut q = std::collections::VecDeque::new();
3068 q.push_back(Ok(WsMessage::Close {
3069 code: 1000,
3070 reason: "bye".to_owned(),
3071 }));
3072 let stream: Box<dyn WsStream> = Box::new(ScriptedWsStream { messages: q });
3073 struct NoopSink;
3074 impl bobbin_runtime::WsSink for NoopSink {
3075 fn send<'a>(&'a mut self, _m: WsMessage) -> bobbin_runtime::WsSendFuture<'a> {
3076 Box::pin(async move { Ok(()) })
3077 }
3078 }
3079 let sink: Box<dyn bobbin_runtime::WsSink> = Box::new(NoopSink);
3080 Ok(bobbin_runtime::WsConn { sink, stream })
3081 })
3082 }
3083 }
3084
3085 #[tokio::test]
3086 async fn run_session_returns_after_remote_close_when_outer_cancel_unfired() {
3087 let cfg = IngestConfig::new(Url::parse("ws://127.0.0.1:1").unwrap());
3088 let cancel = CancellationToken::new();
3089 let mut runtime = fresh_runtime(cancel.clone());
3090 runtime.ws = Arc::new(CloseOnConnectTransport {
3091 used: std::sync::Mutex::new(false),
3092 });
3093
3094 let res = tokio::time::timeout(
3095 Duration::from_secs(3),
3096 run_session(&cfg, HydrantCursor::new(0), &runtime),
3097 )
3098 .await;
3099 assert!(
3100 res.is_ok(),
3101 "run_session must return after a remote Close even when outer cancel never fires - regression for a session-scoped task hanging on the parent token",
3102 );
3103 }
3104
3105 struct HangingSink;
3106 impl bobbin_runtime::WsSink for HangingSink {
3107 fn send<'a>(&'a mut self, _: WsMessage) -> bobbin_runtime::WsSendFuture<'a> {
3108 Box::pin(std::future::pending())
3109 }
3110 }
3111
3112 #[tokio::test(start_paused = true)]
3113 async fn timed_send_surfaces_send_timeout_when_sink_pends_forever() {
3114 let mut sink: Box<dyn bobbin_runtime::WsSink> = Box::new(HangingSink);
3115 let task =
3116 tokio::spawn(async move { timed_send(&mut sink, WsMessage::Ping(Bytes::new())).await });
3117 tokio::time::advance(SEND_TIMEOUT + Duration::from_secs(1)).await;
3118 let result = task.await.expect("task panicked");
3119 match result {
3120 Err(IngestError::SendTimeout(d)) => assert_eq!(d, SEND_TIMEOUT),
3121 other => panic!(
3122 "expected SendTimeout, got {other:?}. A bare ws_sink.send.await would hang forever on a half-dead socket and starve the writer's pong-deadline arm",
3123 ),
3124 }
3125 }
3126
3127 struct OkSink;
3128 impl bobbin_runtime::WsSink for OkSink {
3129 fn send<'a>(&'a mut self, _: WsMessage) -> bobbin_runtime::WsSendFuture<'a> {
3130 Box::pin(async move { Ok(()) })
3131 }
3132 }
3133
3134 #[tokio::test]
3135 async fn timed_send_returns_ok_when_sink_succeeds_promptly() {
3136 let mut sink: Box<dyn bobbin_runtime::WsSink> = Box::new(OkSink);
3137 let result = timed_send(&mut sink, WsMessage::Ping(Bytes::new())).await;
3138 assert!(matches!(result, Ok(())), "got {result:?}");
3139 }
3140
3141 #[test]
3142 fn classify_error_frame_with_id_dispatches_as_error_not_unknown_kind() {
3143 let text = r#"{"id":42,"type":"error","error":"ConsumerTooSlow","message":"slow"}"#;
3144 match classify_text_frame(text) {
3145 Err(IngestError::ConsumerTooSlow { message }) => assert_eq!(message, "slow"),
3146 other => panic!(
3147 "type=\"error\" must dispatch to the error path even when id is present, got: {other:?}"
3148 ),
3149 }
3150 }
3151
3152 #[tokio::test]
3153 async fn buffered_pipeline_preserves_cursor_order_under_resolve_latency_skew() {
3154 use bobbin_record_lru::RecordStore;
3155 use bobbin_types::record::RecordBody;
3156 use std::sync::Mutex;
3157
3158 let server = wiremock::MockServer::start().await;
3159 wiremock::Mock::given(wiremock::matchers::method("GET"))
3160 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord"))
3161 .respond_with(
3162 wiremock::ResponseTemplate::new(404).set_delay(Duration::from_millis(150)),
3163 )
3164 .mount(&server)
3165 .await;
3166
3167 let client = bobbin_slingshot_client::SlingshotClient::with_default_http(
3168 Url::parse(&server.uri()).unwrap(),
3169 )
3170 .unwrap();
3171 let clock: Arc<dyn Clock> = Arc::new(SystemClock::new());
3172 let resolver = Arc::new(RepoIdResolver::with_slingshot(
3173 client,
3174 clock.clone(),
3175 RuntimeHasher::default(),
3176 ));
3177
3178 let owner = Did::new_owned("did:plc:nel").unwrap();
3179 let abalone = Did::new_owned("did:plc:abalone").unwrap();
3180 let fast_rkeys: [Rkey<DefaultStr>; 2] = [rkey("fastrkeyaa01"), rkey("fastrkeyaa02")];
3181 let slow_rkeys: [Rkey<DefaultStr>; 2] = [rkey("slowrkeyaa01"), rkey("slowrkeyaa02")];
3182 for r in &fast_rkeys {
3183 resolver
3184 .observe(owner.clone(), r.clone(), Some(abalone.clone()))
3185 .await;
3186 }
3187
3188 #[derive(Default)]
3189 struct Capturing {
3190 urls: Mutex<Vec<AtUri<DefaultStr>>>,
3191 }
3192 impl RecordStore for Capturing {
3193 fn get(&self, _uri: &AtUri<DefaultStr>) -> Option<Arc<RecordBody>> {
3194 None
3195 }
3196 fn put(&self, uri: AtUri<DefaultStr>, _body: Arc<RecordBody>) {
3197 self.urls.lock().unwrap().push(uri);
3198 }
3199 fn remove(&self, _uri: &AtUri<DefaultStr>) {}
3200 }
3201 let capturing = Arc::new(Capturing::default());
3202
3203 let runtime: IngestRuntime<NoopSearchSink> = IngestRuntime {
3204 store: Arc::new(EdgeStore::new(RuntimeHasher::default())),
3205 issue_states: Arc::new(StateIndex::new(RuntimeHasher::default())),
3206 pull_statuses: Arc::new(StateIndex::new(RuntimeHasher::default())),
3207 coverage: Arc::new(CoverageWatch::new()),
3208 search: Arc::new(NoopSearchSink),
3209 records: capturing.clone() as Arc<dyn RecordStore>,
3210 resolver,
3211 clock,
3212 entropy: Arc::new(OsEntropy),
3213 ws: TungsteniteWs::shared(),
3214 cancel: CancellationToken::new(),
3215 disconnects: None,
3216 warming_shadow: None,
3217 warming_buffer: None,
3218 knot_registry: None,
3219 knot_gate: None,
3220 };
3221
3222 let parallelism = 4usize;
3223 let (frame_tx, frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(64);
3224 let pipeline_rt = runtime.clone();
3225 let pipeline = tokio::spawn(async move {
3226 let prep_rt = pipeline_rt.clone();
3227 let resolve_rt = pipeline_rt.clone();
3228 let commit_rt = pipeline_rt;
3229 ReceiverStream::new(frame_rx)
3230 .then(move |frame| prep_stage(frame, prep_rt.clone()))
3231 .map(move |staged| resolve_stage(staged, resolve_rt.clone()))
3232 .buffered(parallelism)
3233 .for_each(move |staged| commit_stage(staged, commit_rt.clone(), parallelism))
3234 .await;
3235 });
3236
3237 let mk = |id: u64, idx: u64, repo_rkey: &Rkey<DefaultStr>| -> HydrantFrame {
3238 parse_frame(json!({
3239 "id": id,
3240 "type": "record",
3241 "record": {
3242 "live": false,
3243 "did": "did:plc:olaren",
3244 "rev": fresh_tid().as_str(),
3245 "collection": "sh.tangled.feed.star",
3246 "rkey": format!("starrkeya{idx:04}"),
3247 "action": "create",
3248 "cid": VALID_CID,
3249 "record": {
3250 "$type": "sh.tangled.feed.star",
3251 "createdAt": "2026-05-01T00:00:00Z",
3252 "subject": format!("at://did:plc:nel/sh.tangled.repo/{}", repo_rkey.as_ref()),
3253 }
3254 }
3255 }))
3256 };
3257
3258 frame_tx.send(mk(1, 1, &slow_rkeys[0])).await.unwrap();
3259 frame_tx.send(mk(2, 2, &fast_rkeys[0])).await.unwrap();
3260 frame_tx.send(mk(3, 3, &slow_rkeys[1])).await.unwrap();
3261 frame_tx.send(mk(4, 4, &fast_rkeys[1])).await.unwrap();
3262 drop(frame_tx);
3263
3264 pipeline.await.unwrap();
3265
3266 let captured = capturing.urls.lock().unwrap();
3267 let rkeys: Vec<String> = captured
3268 .iter()
3269 .filter_map(|uri| uri.rkey().map(|r| r.as_ref().to_owned()))
3270 .collect();
3271 assert_eq!(
3272 rkeys,
3273 vec![
3274 "starrkeya0001".to_owned(),
3275 "starrkeya0002".to_owned(),
3276 "starrkeya0003".to_owned(),
3277 "starrkeya0004".to_owned(),
3278 ],
3279 "buffered(N) must preserve cursor order even when resolves complete out of order, with ~150ms slow vs cache-hit fast as the latency skew here",
3280 );
3281 assert_eq!(runtime.coverage.snapshot().last_cursor().raw(), 4);
3282 }
3283}