Monorepo for Tangled
tangled.org
1use std::collections::{HashSet, VecDeque};
2use std::num::NonZeroUsize;
3use std::sync::Arc;
4use std::time::Duration;
5
6use bobbin_edge_index::{
7 ApplyOutcome, Coverage, CoverageWatch, EdgeStore, HydrantCursor, IssueStateKind,
8 PromotionSignal, PullStatusKind, StateIndex, apply_record_state,
9};
10use bobbin_record_lru::RecordStore;
11use bobbin_resolver::{NormalizeRepoRefs, decode_canon_or_upgrade_bytes, synthesize_created_at};
12use bobbin_runtime::{
13 Clock, Entropy, NetworkError, RuntimeHasher, UnixMicros, WsConn, WsMessage, WsStream,
14 WsTransport,
15};
16use bobbin_types::edges::{Edge, ExtractError, Record};
17use bobbin_types::ids::{RepoIdent, SubjectRef};
18use bobbin_types::record::RecordBody;
19use bobbin_types::search::{SearchSink, SearchableRecord};
20use bytes::Bytes;
21use futures::StreamExt;
22use jacquard_common::DefaultStr;
23use jacquard_common::types::did::Did;
24use jacquard_common::types::ident::AtIdentifier;
25use jacquard_common::types::nsid::Nsid;
26use jacquard_common::types::recordkey::Rkey;
27use jacquard_common::types::string::{AtStrError, AtUri, Cid};
28use thiserror::Error;
29use tokio::time::Instant;
30use tokio_stream::wrappers::ReceiverStream;
31use tokio_util::sync::CancellationToken;
32use tracing::{debug, info, warn};
33use url::Url;
34
35mod frame;
36mod resolver;
37mod shadow;
38mod warming;
39use frame::HydrantStreamErrorFrame;
40pub use frame::{FrameKind, HydrantFrame, RecordAction, RecordFrame};
41pub use resolver::{RepoIdResolver, Resolution};
42pub use shadow::{WarmingShadowBuffer, WarmingShadowSnapshot};
43pub use warming::{ParkedUpsert, WarmingBuffer, WarmingBufferSnapshot};
44
/// Collection-NSID prefix a record must carry to be ingested; other collections are skipped.
const TANGLED_PREFIX: &str = "sh.tangled.";
/// First reconnect delay, and the value the backoff resets to after a productive session.
const RECONNECT_INITIAL_DELAY: Duration = Duration::from_millis(500);
/// Ceiling for the doubling reconnect backoff (jitter is added on top of it).
const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(30);
/// Spacing between keepalive pings sent on the websocket.
const PING_INTERVAL: Duration = Duration::from_secs(20);
/// How long an outstanding ping may stay unanswered before the session is failed.
const PONG_TIMEOUT: Duration = Duration::from_secs(15);
// NOTE(review): not referenced in the visible part of this file; presumably a
// readiness tolerance used elsewhere — confirm.
const READY_SKEW: Duration = Duration::from_secs(60);
/// Capacity of the reader -> processor frame channel.
const FRAME_CHANNEL_DEPTH: usize = 256;
/// Capacity of the reader -> writer control channel (ping/pong events).
const CONTROL_CHANNEL_DEPTH: usize = 16;
/// Number of decoded frames the reader may hold locally before it stops reading the socket.
const READER_HOLD_LIMIT: usize = 64;
/// Upper bound on a single websocket send before the link is treated as dead.
const SEND_TIMEOUT: Duration = Duration::from_secs(10);
/// Close code sent with the shutdown close frame.
const NORMAL_CLOSE: u16 = 1000;
/// Period of the resolver-stats log line.
const METRICS_DUMP_INTERVAL: Duration = Duration::from_secs(10);
/// Maximum concurrent repo resolutions while flushing the warming buffer.
const WARMING_FLUSH_PARALLELISM: usize = 64;
/// Pipeline parallelism used by [`IngestConfig::new`].
pub const DEFAULT_INGEST_PARALLELISM: NonZeroUsize = if let Some(n) = NonZeroUsize::new(16) {
    n
} else {
    // 16 is a non-zero literal, so this arm can never be evaluated.
    unreachable!()
};
62
63#[derive(Clone, Debug)]
64pub struct IngestConfig {
65 pub hydrant_base: Url,
66 pub start_cursor: HydrantCursor,
67 pub parallelism: NonZeroUsize,
68}
69
70impl IngestConfig {
71 pub fn new(hydrant_base: Url) -> Self {
72 Self {
73 hydrant_base,
74 start_cursor: HydrantCursor::new(0),
75 parallelism: DEFAULT_INGEST_PARALLELISM,
76 }
77 }
78
79 fn stream_url(&self, cursor: HydrantCursor) -> Result<Url, IngestError> {
80 let mut url = self.hydrant_base.clone();
81 match url.scheme() {
82 "http" => url
83 .set_scheme("ws")
84 .map_err(|_| IngestError::Url("set ws scheme"))?,
85 "https" => url
86 .set_scheme("wss")
87 .map_err(|_| IngestError::Url("set wss scheme"))?,
88 "ws" | "wss" => {}
89 other => return Err(IngestError::UnknownScheme(other.to_owned())),
90 }
91 url.set_path("/stream");
92 url.query_pairs_mut()
93 .clear()
94 .append_pair("cursor", &cursor.raw().to_string());
95 Ok(url)
96 }
97}
98
99#[derive(Debug, Error)]
100pub enum IngestError {
101 #[error("invalid hydrant url: {0}")]
102 Url(&'static str),
103 #[error("unsupported url scheme: {0}")]
104 UnknownScheme(String),
105 #[error("network: {0}")]
106 Network(#[from] NetworkError),
107 #[error("frame decode: {0}")]
108 Decode(#[from] serde_json::Error),
109 #[error("invalid at-uri synthesized from frame: {0}")]
110 InvalidAtUri(#[from] AtStrError),
111 #[error("record extraction: {0}")]
112 Extract(#[from] ExtractError),
113 #[error("hydrant did not respond to ping within {0:?}")]
114 PongTimeout(Duration),
115 #[error("websocket send blocked for at least {0:?}, treating link as dead")]
116 SendTimeout(Duration),
117 #[error("hydrant disconnected because bobbin's stream consumer fell behind: {message}")]
118 ConsumerTooSlow { message: String },
119 #[error("hydrant signaled stream error {code}: {message}")]
120 HydrantStream { code: String, message: String },
121}
122
/// Payload-free mirror of [`IngestError`], one variant per error variant.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub enum DisconnectKind {
    Url,
    UnknownScheme,
    Network,
    Decode,
    InvalidAtUri,
    Extract,
    PongTimeout,
    SendTimeout,
    ConsumerTooSlow,
    HydrantStream,
}
136
137impl DisconnectKind {
138 pub fn from_error(err: &IngestError) -> Self {
139 match err {
140 IngestError::Url(_) => Self::Url,
141 IngestError::UnknownScheme(_) => Self::UnknownScheme,
142 IngestError::Network(_) => Self::Network,
143 IngestError::Decode(_) => Self::Decode,
144 IngestError::InvalidAtUri(_) => Self::InvalidAtUri,
145 IngestError::Extract(_) => Self::Extract,
146 IngestError::PongTimeout(_) => Self::PongTimeout,
147 IngestError::SendTimeout(_) => Self::SendTimeout,
148 IngestError::ConsumerTooSlow { .. } => Self::ConsumerTooSlow,
149 IngestError::HydrantStream { .. } => Self::HydrantStream,
150 }
151 }
152}
153
154#[derive(Clone, Debug, Eq, PartialEq)]
155pub struct DisconnectSnapshot {
156 pub kind: DisconnectKind,
157 pub message: String,
158 pub at_unix_micros: UnixMicros,
159 pub last_cursor: HydrantCursor,
160}
161
162#[derive(Default)]
163pub struct DisconnectSink {
164 last: std::sync::Mutex<Option<DisconnectSnapshot>>,
165 count: std::sync::atomic::AtomicU64,
166}
167
168impl DisconnectSink {
169 pub fn new() -> Self {
170 Self::default()
171 }
172
173 pub fn record(&self, snap: DisconnectSnapshot) {
174 *self.last.lock().expect("disconnect sink mutex poisoned") = Some(snap);
175 self.count
176 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
177 }
178
179 pub fn snapshot(&self) -> Option<DisconnectSnapshot> {
180 self.last
181 .lock()
182 .expect("disconnect sink mutex poisoned")
183 .clone()
184 }
185
186 pub fn count(&self) -> u64 {
187 self.count.load(std::sync::atomic::Ordering::Relaxed)
188 }
189}
190
/// Whether a session forwarded at least one frame to the processor.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SessionOutcome {
    /// At least one frame was handed to the processing pipeline.
    Progressed,
    /// Nothing was forwarded; the reconnect loop backs off in this case.
    Empty,
}
196
197#[derive(Debug)]
198struct SessionEnd {
199 outcome: SessionOutcome,
200 error: Option<IngestError>,
201}
202
203pub struct IngestRuntime<S: SearchSink + 'static> {
204 pub store: Arc<EdgeStore>,
205 pub issue_states: Arc<StateIndex<IssueStateKind>>,
206 pub pull_statuses: Arc<StateIndex<PullStatusKind>>,
207 pub coverage: Arc<CoverageWatch>,
208 pub search: Arc<S>,
209 pub records: Arc<dyn RecordStore>,
210 pub resolver: Arc<RepoIdResolver>,
211 pub clock: Arc<dyn Clock>,
212 pub entropy: Arc<dyn Entropy>,
213 pub ws: Arc<dyn WsTransport>,
214 pub cancel: CancellationToken,
215 pub disconnects: Option<Arc<DisconnectSink>>,
216 pub warming_shadow: Option<Arc<WarmingShadowBuffer>>,
217 pub warming_buffer: Option<Arc<WarmingBuffer>>,
218}
219
220impl<S: SearchSink + 'static> Clone for IngestRuntime<S> {
221 fn clone(&self) -> Self {
222 Self {
223 store: self.store.clone(),
224 issue_states: self.issue_states.clone(),
225 pull_statuses: self.pull_statuses.clone(),
226 coverage: self.coverage.clone(),
227 search: self.search.clone(),
228 records: self.records.clone(),
229 resolver: self.resolver.clone(),
230 clock: self.clock.clone(),
231 entropy: self.entropy.clone(),
232 ws: self.ws.clone(),
233 cancel: self.cancel.clone(),
234 disconnects: self.disconnects.clone(),
235 warming_shadow: self.warming_shadow.clone(),
236 warming_buffer: self.warming_buffer.clone(),
237 }
238 }
239}
240
241impl<S: SearchSink + 'static> IngestRuntime<S> {
242 fn pipeline_ctx(&self) -> PipelineCtx<'_, S> {
243 PipelineCtx {
244 resolver: &self.resolver,
245 store: &self.store,
246 issue_states: &self.issue_states,
247 pull_statuses: &self.pull_statuses,
248 coverage: &self.coverage,
249 records: &*self.records,
250 search: &self.search,
251 shadow: self.warming_shadow.as_deref(),
252 buffer: self.warming_buffer.as_deref(),
253 }
254 }
255}
256
257struct PipelineCtx<'a, S: SearchSink + 'static> {
258 resolver: &'a RepoIdResolver,
259 store: &'a EdgeStore,
260 issue_states: &'a StateIndex<IssueStateKind>,
261 pull_statuses: &'a StateIndex<PullStatusKind>,
262 coverage: &'a CoverageWatch,
263 records: &'a dyn RecordStore,
264 search: &'a S,
265 shadow: Option<&'a WarmingShadowBuffer>,
266 buffer: Option<&'a WarmingBuffer>,
267}
268
269pub async fn run<S: SearchSink + 'static>(
270 config: IngestConfig,
271 runtime: IngestRuntime<S>,
272) -> Result<(), IngestError> {
273 let metrics_dumper = spawn_metrics_dumper(&runtime);
274 let idle_promoter = spawn_idle_promoter(&runtime);
275 let warming_flusher = spawn_warming_flusher(&runtime);
276 let result = run_inner(config, &runtime).await;
277 if let Err(join) = metrics_dumper.await {
278 warn!(?join, "metrics dumper task panicked");
279 }
280 if let Err(join) = idle_promoter.await {
281 warn!(?join, "idle promoter task panicked");
282 }
283 if let Some(handle) = warming_flusher
284 && let Err(join) = handle.await
285 {
286 warn!(?join, "warming flusher task panicked");
287 }
288 result
289}
290
291async fn run_inner<S: SearchSink + 'static>(
292 config: IngestConfig,
293 runtime: &IngestRuntime<S>,
294) -> Result<(), IngestError> {
295 let mut backoff = RECONNECT_INITIAL_DELAY;
296 loop {
297 let cursor = next_connect_cursor(runtime.coverage.snapshot(), config.start_cursor);
298 let SessionEnd { outcome, error } = run_session(&config, cursor, runtime).await;
299 if runtime.cancel.is_cancelled() {
300 info!(
301 last_cursor = runtime.coverage.snapshot().last_cursor().raw(),
302 "ingest stopped after shutdown signal"
303 );
304 return Ok(());
305 }
306 match (outcome, &error) {
307 (SessionOutcome::Progressed, None) => {
308 info!("hydrant stream closed after delivering frames, reconnecting")
309 }
310 (SessionOutcome::Empty, None) => {
311 warn!("hydrant stream closed without delivering frames")
312 }
313 (_, Some(err)) => warn!(?err, "hydrant stream errored"),
314 }
315 if let (Some(sink), Some(err)) = (runtime.disconnects.as_ref(), error.as_ref()) {
316 sink.record(DisconnectSnapshot {
317 kind: DisconnectKind::from_error(err),
318 message: err.to_string(),
319 at_unix_micros: runtime.clock.now_unix_micros(),
320 last_cursor: runtime.coverage.snapshot().last_cursor(),
321 });
322 }
323 let made_progress = matches!(outcome, SessionOutcome::Progressed);
324 if made_progress {
325 backoff = RECONNECT_INITIAL_DELAY;
326 } else {
327 tokio::select! {
328 biased;
329 _ = runtime.cancel.cancelled() => return Ok(()),
330 _ = runtime.clock.sleep(jittered(backoff, &*runtime.entropy)) => {}
331 }
332 backoff = (backoff * 2).min(RECONNECT_MAX_DELAY);
333 }
334 }
335}
336
337fn spawn_warming_flusher<S: SearchSink + 'static>(
338 runtime: &IngestRuntime<S>,
339) -> Option<tokio::task::JoinHandle<()>> {
340 runtime.warming_buffer.as_ref()?;
341 let rt = runtime.clone();
342 Some(tokio::spawn(async move {
343 let buffer = rt
344 .warming_buffer
345 .as_deref()
346 .expect("warming flusher only spawns when buffer is set");
347 let mut rx = rt.coverage.subscribe();
348 let reason = loop {
349 if rx.borrow_and_update().is_ready() {
350 break FlushReason::Ready;
351 }
352 tokio::select! {
353 biased;
354 _ = rt.cancel.cancelled() => break FlushReason::Cancelled,
355 res = rx.changed() => match res {
356 Ok(()) => continue,
357 Err(_) => break FlushReason::CoverageDropped,
358 },
359 }
360 };
361 flush_warming_buffer(&rt, buffer, reason).await;
362 }))
363}
364
/// Why the warming flusher stopped waiting.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum FlushReason {
    /// Coverage reported ready.
    Ready,
    /// The cancel token fired first.
    Cancelled,
    /// The coverage watch channel closed.
    CoverageDropped,
}

impl FlushReason {
    /// Stable label used as a log field value.
    fn as_str(self) -> &'static str {
        match self {
            Self::Ready => "ready",
            Self::Cancelled => "cancelled",
            Self::CoverageDropped => "coverage_dropped",
        }
    }
}
381
382async fn flush_warming_buffer<S: SearchSink + 'static>(
383 runtime: &IngestRuntime<S>,
384 buffer: &WarmingBuffer,
385 reason: FlushReason,
386) {
387 let drained = buffer.drain_for_promote().await;
388 if drained.is_empty() {
389 return;
390 }
391 if reason != FlushReason::Ready {
392 info!(
393 target: "bobbin_ingest::warming",
394 abandoned_entries = drained.len(),
395 reason = reason.as_str(),
396 "abandoning parked items on non-ready flush",
397 );
398 return;
399 }
400 let hasher = buffer.hasher().clone();
401 let unique: HashSet<RepoIdent, RuntimeHasher> = drained
402 .iter()
403 .flat_map(|(_, deps)| deps.iter().cloned())
404 .fold(HashSet::with_hasher(hasher), |mut acc, dep| {
405 acc.insert(dep);
406 acc
407 });
408 if !unique.is_empty() {
409 let resolver = runtime.resolver.clone();
410 let _: Vec<()> = futures::stream::iter(unique)
411 .map(|key| {
412 let resolver = resolver.clone();
413 async move {
414 let _ = resolver.resolve(&key.owner, &key.rkey).await;
415 }
416 })
417 .buffer_unordered(WARMING_FLUSH_PARALLELISM)
418 .collect()
419 .await;
420 }
421 let upserts: Vec<ParkedUpsert> = drained.into_iter().map(|(u, _)| u).collect();
422 let count = upserts.len();
423 let ctx = runtime.pipeline_ctx();
424 finalize_drained(&ctx, upserts).await;
425 info!(
426 target: "bobbin_ingest::warming",
427 flushed_entries = count,
428 reason = reason.as_str(),
429 "drained warming buffer",
430 );
431}
432
/// Interval at which the idle promoter compares the processed-event counter.
const IDLE_PROMOTE_WINDOW: Duration = Duration::from_secs(15);
/// Minimum processed events before an idle stream may be promoted to ready.
const IDLE_PROMOTE_MIN_EVENTS: u64 = 256;
435
436fn spawn_idle_promoter<S: SearchSink + 'static>(
437 runtime: &IngestRuntime<S>,
438) -> tokio::task::JoinHandle<()> {
439 let rt = runtime.clone();
440 tokio::spawn(async move {
441 let mut prev = rt.coverage.snapshot().events_processed();
442 loop {
443 tokio::select! {
444 biased;
445 _ = rt.cancel.cancelled() => return,
446 _ = rt.clock.sleep(IDLE_PROMOTE_WINDOW) => {}
447 }
448 let snap = rt.coverage.snapshot();
449 if snap.is_ready() {
450 return;
451 }
452 let processed = snap.events_processed();
453 if processed >= IDLE_PROMOTE_MIN_EVENTS && processed == prev {
454 rt.coverage.update(|c| c.force_ready());
455 info!(
456 target: "bobbin_ingest::coverage",
457 events_processed = processed,
458 last_cursor = snap.last_cursor().raw(),
459 "stream idle, promoting coverage to ready",
460 );
461 return;
462 }
463 prev = processed;
464 }
465 })
466}
467
468fn spawn_metrics_dumper<S: SearchSink + 'static>(
469 runtime: &IngestRuntime<S>,
470) -> tokio::task::JoinHandle<()> {
471 let rt = runtime.clone();
472 tokio::spawn(async move {
473 loop {
474 tokio::select! {
475 biased;
476 _ = rt.cancel.cancelled() => break,
477 _ = rt.clock.sleep(METRICS_DUMP_INTERVAL) => {
478 let s = rt.resolver.stats();
479 info!(
480 target: "bobbin_ingest::metrics",
481 resolver_hits = s.hits,
482 resolver_misses_mapped = s.misses_mapped,
483 resolver_misses_no_repo_did = s.misses_no_repo_did,
484 resolver_misses_unresolvable = s.misses_unresolvable,
485 resolver_misses_transient = s.misses_transient,
486 resolver_misses_no_client = s.misses_no_client,
487 resolver_miss_latency_micros_avg = s.miss_latency_micros_avg().unwrap_or(0),
488 resolver_miss_latency_micros_max = s.miss_latency_micros_max,
489 resolver_total = s.total(),
490 "resolver stats",
491 );
492 }
493 }
494 }
495 })
496}
497
498fn next_connect_cursor(snapshot: Coverage, start: HydrantCursor) -> HydrantCursor {
499 if snapshot.events_processed() == 0 {
500 start
501 } else {
502 HydrantCursor::new(snapshot.last_cursor().raw().saturating_add(1))
503 }
504}
505
506fn jittered(base: Duration, entropy: &dyn Entropy) -> Duration {
507 let base_ms = u64::try_from(base.as_millis()).unwrap_or(u64::MAX);
508 let cap_ms = (base_ms / 4).max(1);
509 base + Duration::from_millis(entropy.next_u64() % cap_ms)
510}
511
512async fn run_session<S: SearchSink + 'static>(
513 config: &IngestConfig,
514 cursor: HydrantCursor,
515 runtime: &IngestRuntime<S>,
516) -> SessionEnd {
517 let url = match config.stream_url(cursor) {
518 Ok(u) => u,
519 Err(e) => {
520 return SessionEnd {
521 outcome: SessionOutcome::Empty,
522 error: Some(e),
523 };
524 }
525 };
526 info!(%url, "connecting to hydrant /stream");
527 let connect = tokio::select! {
528 biased;
529 _ = runtime.cancel.cancelled() => {
530 return SessionEnd { outcome: SessionOutcome::Empty, error: None };
531 }
532 res = runtime.ws.connect(url) => res,
533 };
534 let WsConn {
535 sink: mut ws_sink,
536 stream: ws_stream,
537 } = match connect {
538 Ok(c) => c,
539 Err(e) => {
540 return SessionEnd {
541 outcome: SessionOutcome::Empty,
542 error: Some(IngestError::Network(e)),
543 };
544 }
545 };
546
547 let (frame_tx, frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(FRAME_CHANNEL_DEPTH);
548 let parallelism = config.parallelism.get();
549 let processor_runtime = runtime.clone();
550 let processor = tokio::spawn(async move {
551 let cancel = processor_runtime.cancel.clone();
552 let prep_rt = processor_runtime.clone();
553 let resolve_rt = processor_runtime.clone();
554 let commit_rt = processor_runtime;
555
556 let pipeline = ReceiverStream::new(frame_rx)
557 .map(move |frame| prep_stage(frame, prep_rt.clone()))
558 .buffered(parallelism)
559 .map(move |staged| resolve_stage(staged, resolve_rt.clone()))
560 .buffered(parallelism)
561 .for_each(move |staged| commit_stage(staged, commit_rt.clone(), parallelism));
562
563 tokio::select! {
564 biased;
565 _ = cancel.cancelled() => {},
566 _ = pipeline => {},
567 }
568 });
569
570 let (control_tx, mut control_rx) = tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH);
571 let session_cancel = runtime.cancel.child_token();
572 let reader_cancel = session_cancel.clone();
573 let reader = tokio::spawn(reader_loop(ws_stream, frame_tx, control_tx, reader_cancel));
574
575 let mut next_ping = runtime.clock.now_instant() + PING_INTERVAL;
576 let mut pong_deadline: Option<Instant> = None;
577
578 let writer_error: Option<IngestError> = loop {
579 tokio::select! {
580 biased;
581 _ = runtime.cancel.cancelled() => {
582 let _ = timed_send(
583 &mut ws_sink,
584 WsMessage::Close { code: NORMAL_CLOSE, reason: "bobbin shutdown".to_owned() },
585 ).await;
586 break None;
587 }
588 _ = runtime.clock.sleep_until(next_ping) => {
589 next_ping = runtime.clock.now_instant() + PING_INTERVAL;
590 if pong_deadline.is_none() {
591 if let Err(e) = timed_send(&mut ws_sink, WsMessage::Ping(Bytes::new())).await {
592 break Some(e);
593 }
594 pong_deadline = Some(runtime.clock.now_instant() + PONG_TIMEOUT);
595 }
596 }
597 _ = wait_until(pong_deadline, runtime.clock.as_ref()) => {
598 break Some(IngestError::PongTimeout(PONG_TIMEOUT));
599 }
600 evt = control_rx.recv() => {
601 let Some(evt) = evt else { break None; };
602 match evt {
603 WsEvent::IncomingPing(payload) => {
604 if let Err(e) = timed_send(&mut ws_sink, WsMessage::Pong(payload)).await {
605 break Some(e);
606 }
607 }
608 WsEvent::IncomingPong => {
609 pong_deadline = None;
610 }
611 }
612 }
613 }
614 };
615
616 session_cancel.cancel();
617 drop(ws_sink);
618 drop(control_rx);
619
620 let reader_end = reader.await.unwrap_or(SessionEnd {
621 outcome: SessionOutcome::Empty,
622 error: None,
623 });
624 if let Err(join) = processor.await {
625 warn!(?join, "frame processor task panicked");
626 }
627 SessionEnd {
628 outcome: reader_end.outcome,
629 error: writer_error.or(reader_end.error),
630 }
631}
632
633async fn timed_send(
634 sink: &mut Box<dyn bobbin_runtime::WsSink>,
635 msg: WsMessage,
636) -> Result<(), IngestError> {
637 match tokio::time::timeout(SEND_TIMEOUT, sink.send(msg)).await {
638 Ok(Ok(())) => Ok(()),
639 Ok(Err(e)) => Err(IngestError::Network(e)),
640 Err(_elapsed) => Err(IngestError::SendTimeout(SEND_TIMEOUT)),
641 }
642}
643
644async fn reader_loop(
645 mut ws_stream: Box<dyn WsStream>,
646 frame_tx: tokio::sync::mpsc::Sender<HydrantFrame>,
647 control_tx: tokio::sync::mpsc::Sender<WsEvent>,
648 cancel: CancellationToken,
649) -> SessionEnd {
650 let mut outcome = SessionOutcome::Empty;
651 let mut held: VecDeque<HydrantFrame> = VecDeque::new();
652
653 let error: Option<IngestError> = loop {
654 let step = if held.is_empty() {
655 tokio::select! {
656 biased;
657 _ = cancel.cancelled() => ReaderStep::Cancelled,
658 msg = ws_stream.next() => ReaderStep::WsMessage(msg),
659 }
660 } else if held.len() < READER_HOLD_LIMIT {
661 tokio::select! {
662 biased;
663 _ = cancel.cancelled() => ReaderStep::Cancelled,
664 permit_result = frame_tx.reserve() => match permit_result {
665 Ok(permit) => {
666 let frame = held
667 .pop_front()
668 .expect("non-empty held when reserve succeeds");
669 permit.send(frame);
670 ReaderStep::Sent
671 }
672 Err(_) => ReaderStep::FrameSinkClosed,
673 },
674 msg = ws_stream.next() => ReaderStep::WsMessage(msg),
675 }
676 } else {
677 tokio::select! {
678 biased;
679 _ = cancel.cancelled() => ReaderStep::Cancelled,
680 permit_result = frame_tx.reserve() => match permit_result {
681 Ok(permit) => {
682 let frame = held
683 .pop_front()
684 .expect("held at limit when reserve succeeds");
685 permit.send(frame);
686 ReaderStep::Sent
687 }
688 Err(_) => ReaderStep::FrameSinkClosed,
689 },
690 }
691 };
692
693 match step {
694 ReaderStep::Cancelled => break None,
695 ReaderStep::FrameSinkClosed => break None,
696 ReaderStep::Sent => {
697 outcome = SessionOutcome::Progressed;
698 }
699 ReaderStep::WsMessage(msg) => {
700 let Some(msg) = msg else {
701 break None;
702 };
703 let parsed = match msg {
704 Ok(m) => m,
705 Err(e) => break Some(IngestError::Network(e)),
706 };
707 match parsed {
708 WsMessage::Text(text) => {
709 let frame = match classify_text_frame(&text) {
710 Ok(f) => f,
711 Err(e) => break Some(e),
712 };
713 held.push_back(frame);
714 }
715 WsMessage::Binary(_) => {
716 debug!("hydrant sent unexpected binary frame, ignoring");
717 }
718 WsMessage::Ping(payload) => {
719 if control_tx
720 .send(WsEvent::IncomingPing(payload))
721 .await
722 .is_err()
723 {
724 break None;
725 }
726 }
727 WsMessage::Pong(_) => {
728 if control_tx.send(WsEvent::IncomingPong).await.is_err() {
729 break None;
730 }
731 }
732 WsMessage::Close { code, reason } => {
733 debug!(code, %reason, "hydrant closed stream");
734 break None;
735 }
736 }
737 }
738 }
739 };
740 SessionEnd { outcome, error }
741}
742
743enum ReaderStep {
744 Cancelled,
745 FrameSinkClosed,
746 Sent,
747 WsMessage(Option<Result<WsMessage, NetworkError>>),
748}
749
750fn classify_text_frame(text: &str) -> Result<HydrantFrame, IngestError> {
751 #[derive(serde::Deserialize)]
752 struct PeekType<'a> {
753 #[serde(rename = "type", borrow)]
754 kind: Option<std::borrow::Cow<'a, str>>,
755 }
756 let is_error_frame = serde_json::from_str::<PeekType>(text)
757 .ok()
758 .and_then(|p| p.kind)
759 .as_deref()
760 == Some("error");
761 if is_error_frame {
762 return match serde_json::from_str::<HydrantStreamErrorFrame>(text) {
763 Ok(err_frame) => Err(classify_hydrant_error(err_frame)),
764 Err(decode_err) => Err(IngestError::Decode(decode_err)),
765 };
766 }
767 serde_json::from_str::<HydrantFrame>(text).map_err(IngestError::Decode)
768}
769
770fn classify_hydrant_error(frame: HydrantStreamErrorFrame) -> IngestError {
771 let HydrantStreamErrorFrame { error, message } = frame;
772 let message = message.unwrap_or_default();
773 match error.as_str() {
774 "ConsumerTooSlow" => IngestError::ConsumerTooSlow { message },
775 _ => IngestError::HydrantStream {
776 code: error,
777 message,
778 },
779 }
780}
781
782#[derive(Debug)]
783enum WsEvent {
784 IncomingPing(Bytes),
785 IncomingPong,
786}
787
788async fn wait_until(deadline: Option<Instant>, clock: &dyn Clock) {
789 match deadline {
790 Some(d) => clock.sleep_until(d).await,
791 None => std::future::pending::<()>().await,
792 }
793}
794
/// Coarse classification of a frame, used as a log field.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Regime {
    /// Record frame not flagged live.
    Replay,
    /// Record frame flagged live.
    Live,
    /// Frame without a record payload.
    NonRecord,
}

impl Regime {
    /// Stable label used as a log field value.
    fn as_str(self) -> &'static str {
        match self {
            Regime::Replay => "replay",
            Regime::Live => "live",
            Regime::NonRecord => "non_record",
        }
    }
}
811
812struct Pending {
813 cursor: HydrantCursor,
814 signal: PromotionSignal,
815 regime: Regime,
816 op: PendingOp,
817}
818
819enum PendingOp {
820 Noop,
821 ClearCache {
822 source: AtUri<DefaultStr>,
823 },
824 Upsert {
825 source: AtUri<DefaultStr>,
826 nsid: Nsid<DefaultStr>,
827 parsed: Box<Record>,
828 bytes: Bytes,
829 cid: Option<Cid<DefaultStr>>,
830 edges: Vec<Edge>,
831 },
832 Parked {
833 nsid: Nsid<DefaultStr>,
834 },
835 Delete {
836 source: AtUri<DefaultStr>,
837 nsid: Nsid<DefaultStr>,
838 },
839}
840
841struct Prepared {
842 pending: Pending,
843 prepare_start: Instant,
844 prepare_end: Instant,
845}
846
847struct Resolved {
848 pending: Pending,
849 prepare_start: Instant,
850 prepare_end: Instant,
851 resolve_start: Instant,
852 resolve_end: Instant,
853}
854
855fn pending_nsid(op: &PendingOp) -> Option<&Nsid<DefaultStr>> {
856 match op {
857 PendingOp::Upsert { nsid, .. } => Some(nsid),
858 PendingOp::Delete { nsid, .. } => Some(nsid),
859 PendingOp::Parked { nsid, .. } => Some(nsid),
860 PendingOp::Noop | PendingOp::ClearCache { .. } => None,
861 }
862}
863
864fn pending_edge_count(op: &PendingOp) -> u64 {
865 match op {
866 PendingOp::Upsert { edges, .. } => edges.len() as u64,
867 _ => 0,
868 }
869}
870
871async fn prep_stage<S: SearchSink + 'static>(
872 frame: HydrantFrame,
873 rt: IngestRuntime<S>,
874) -> Prepared {
875 let now = rt.clock.now_unix_micros();
876 let prepare_start = rt.clock.now_instant();
877 let ctx = rt.pipeline_ctx();
878 let pending = prepare_frame(frame, &ctx, now).await;
879 let prepare_end = rt.clock.now_instant();
880 Prepared {
881 pending,
882 prepare_start,
883 prepare_end,
884 }
885}
886
887async fn resolve_stage<S: SearchSink + 'static>(
888 staged: Prepared,
889 rt: IngestRuntime<S>,
890) -> Resolved {
891 let resolve_start = rt.clock.now_instant();
892 let ctx = rt.pipeline_ctx();
893 let pending = resolve_pending(staged.pending, &ctx).await;
894 let resolve_end = rt.clock.now_instant();
895 Resolved {
896 pending,
897 prepare_start: staged.prepare_start,
898 prepare_end: staged.prepare_end,
899 resolve_start,
900 resolve_end,
901 }
902}
903
904async fn commit_stage<S: SearchSink + 'static>(
905 staged: Resolved,
906 rt: IngestRuntime<S>,
907 parallelism: usize,
908) {
909 let nsid = pending_nsid(&staged.pending.op).cloned();
910 let edge_count = pending_edge_count(&staged.pending.op);
911 let regime = staged.pending.regime;
912 let cursor = staged.pending.cursor.raw();
913 let Resolved {
914 pending,
915 prepare_start,
916 prepare_end,
917 resolve_start,
918 resolve_end,
919 } = staged;
920 let commit_start = rt.clock.now_instant();
921 commit_pending(
922 pending,
923 &rt.store,
924 &rt.issue_states,
925 &rt.pull_statuses,
926 &rt.coverage,
927 &*rt.search,
928 &*rt.records,
929 &rt.resolver,
930 )
931 .await;
932 let commit_end = rt.clock.now_instant();
933 tracing::trace!(
934 target: "bobbin_ingest::stage",
935 cursor,
936 regime = regime.as_str(),
937 nsid = %nsid.as_ref().map(Nsid::as_str).unwrap_or(""),
938 edge_count,
939 prepare_us = prepare_end.duration_since(prepare_start).as_micros() as u64,
940 queue_resolve_wait_us = resolve_start.duration_since(prepare_end).as_micros() as u64,
941 resolve_us = resolve_end.duration_since(resolve_start).as_micros() as u64,
942 queue_commit_wait_us = commit_start.duration_since(resolve_end).as_micros() as u64,
943 commit_us = commit_end.duration_since(commit_start).as_micros() as u64,
944 total_us = commit_end.duration_since(prepare_start).as_micros() as u64,
945 parallelism,
946 "pipeline stage timings",
947 );
948}
949
950async fn prepare_frame<S: SearchSink + 'static>(
951 frame: HydrantFrame,
952 ctx: &PipelineCtx<'_, S>,
953 now: UnixMicros,
954) -> Pending {
955 let cursor = HydrantCursor::new(frame.id);
956 let signal = promotion_signal(frame.record.as_ref(), now);
957 let regime = match frame.record.as_ref() {
958 Some(r) if r.live => Regime::Live,
959 Some(_) => Regime::Replay,
960 None => Regime::NonRecord,
961 };
962 let op = match frame.kind {
963 FrameKind::Record => prepare_record(frame.record, ctx).await,
964 FrameKind::Identity | FrameKind::Account => PendingOp::Noop,
965 FrameKind::Other => {
966 debug!(id = frame.id, "ignoring unknown hydrant frame kind");
967 PendingOp::Noop
968 }
969 };
970 Pending {
971 cursor,
972 signal,
973 regime,
974 op,
975 }
976}
977
978async fn prepare_record<S: SearchSink + 'static>(
979 record: Option<RecordFrame>,
980 ctx: &PipelineCtx<'_, S>,
981) -> PendingOp {
982 let Some(record) = record else {
983 debug!("record-typed frame missing payload, skipping");
984 return PendingOp::Noop;
985 };
986 if !record.collection.as_ref().starts_with(TANGLED_PREFIX) {
987 return PendingOp::Noop;
988 }
989 let nsid = record.collection.clone();
990 let source = match build_source_uri(&record) {
991 Ok(s) => s,
992 Err(e) => {
993 warn!(?e, "invalid frame source, dropping record");
994 return PendingOp::Noop;
995 }
996 };
997 match record.action {
998 RecordAction::Create | RecordAction::Update => {
999 evict_from_buffer(ctx.buffer, &source).await;
1000 let Some(raw) = record.record else {
1001 debug!(collection = %nsid, "create/update missing record body, clearing cache");
1002 return PendingOp::ClearCache { source };
1003 };
1004 let raw_bytes = Bytes::copy_from_slice(raw.get().as_bytes());
1005 let wire_bytes = match fallback_rfc3339(&record.rkey, &record.rev)
1006 .and_then(|fallback| synthesize_created_at(&raw_bytes, &fallback))
1007 {
1008 Some(patched) => Bytes::from(patched),
1009 None => raw_bytes,
1010 };
1011 let (parsed, bytes) = match decode_canon_or_upgrade_bytes(
1012 &record.collection,
1013 &wire_bytes,
1014 ctx.resolver,
1015 )
1016 .await
1017 {
1018 Ok((parsed, canon_bytes)) => {
1019 let bytes = match canon_bytes {
1020 std::borrow::Cow::Borrowed(_) => wire_bytes,
1021 std::borrow::Cow::Owned(v) => Bytes::from(v),
1022 };
1023 (parsed, bytes)
1024 }
1025 Err(ExtractError::UnknownCollection(name)) => {
1026 debug!(collection = %name, "unknown sh.tangled.* collection, clearing cache");
1027 return PendingOp::ClearCache { source };
1028 }
1029 Err(e) => {
1030 warn!(?e, collection = %record.collection, "record decode failed, clearing cache");
1031 return PendingOp::ClearCache { source };
1032 }
1033 };
1034 if let Record::Repo(repo) = &parsed {
1035 if let Some(shadow) = ctx.shadow {
1036 shadow.note_observed(&record.did, &record.rkey).await;
1037 }
1038 let superseded = ctx
1039 .resolver
1040 .observe(
1041 record.did.clone(),
1042 record.rkey.clone(),
1043 repo.repo_did.clone(),
1044 )
1045 .await;
1046 if let Some(prior) = superseded {
1047 let prior_uri = format!(
1048 "at://{}/sh.tangled.repo/{}",
1049 prior.owner.as_ref(),
1050 prior.rkey.as_ref(),
1051 );
1052 if let Ok(prior_at_uri) = AtUri::<DefaultStr>::new_owned(&prior_uri) {
1053 evict_from_buffer(ctx.buffer, &prior_at_uri).await;
1054 ctx.store.remove_source(&prior_at_uri);
1055 ctx.records.remove(&prior_at_uri);
1056 ctx.search.remove(&prior_at_uri).await;
1057 }
1058 }
1059 if let Some(buffer) = ctx.buffer {
1060 let drained = buffer.take_observed(&record.did, &record.rkey).await;
1061 if !drained.is_empty() {
1062 finalize_drained(ctx, drained).await;
1063 }
1064 }
1065 }
1066 let edges = match parsed.extract_edges(&source) {
1067 Ok(es) => es,
1068 Err(e) => {
1069 warn!(?e, "edge extraction failed, clearing cache");
1070 return PendingOp::ClearCache { source };
1071 }
1072 };
1073 let _ = ctx.store.intern_source(&source);
1074 PendingOp::Upsert {
1075 source,
1076 nsid,
1077 parsed: Box::new(parsed),
1078 bytes,
1079 cid: record.cid,
1080 edges,
1081 }
1082 }
1083 RecordAction::Delete => {
1084 evict_from_buffer(ctx.buffer, &source).await;
1085 if nsid.as_ref() == "sh.tangled.repo" {
1086 ctx.resolver.forget(&record.did, &record.rkey).await;
1087 }
1088 PendingOp::Delete { source, nsid }
1089 }
1090 RecordAction::Other => {
1091 debug!(collection = %nsid, "ignoring unknown record action");
1092 PendingOp::Noop
1093 }
1094 }
1095}
1096
1097fn fallback_rfc3339(
1098 rkey: &Rkey<DefaultStr>,
1099 rev: &jacquard_common::types::tid::Tid,
1100) -> Option<String> {
1101 let tid = jacquard_common::types::tid::Tid::new(rkey.as_ref())
1102 .ok()
1103 .unwrap_or_else(|| rev.clone());
1104 let micros = i64::try_from(tid.timestamp()).ok()?;
1105 let dt = chrono::DateTime::<chrono::Utc>::from_timestamp_micros(micros)?;
1106 Some(dt.to_rfc3339_opts(chrono::SecondsFormat::Micros, true))
1107}
1108
1109async fn evict_from_buffer(buffer: Option<&WarmingBuffer>, source: &AtUri<DefaultStr>) {
1110 if let Some(buffer) = buffer
1111 && !buffer.is_sealed()
1112 {
1113 buffer.evict_source(source).await;
1114 }
1115}
1116
1117async fn resolve_pending<S: SearchSink + 'static>(
1118 pending: Pending,
1119 ctx: &PipelineCtx<'_, S>,
1120) -> Pending {
1121 let Pending {
1122 cursor,
1123 signal,
1124 regime,
1125 op,
1126 } = pending;
1127 let op = match op {
1128 PendingOp::Upsert {
1129 source,
1130 nsid,
1131 parsed,
1132 bytes,
1133 cid,
1134 edges,
1135 } => {
1136 let pieces = UpsertPieces {
1137 source,
1138 nsid,
1139 parsed: *parsed,
1140 bytes,
1141 cid,
1142 edges,
1143 };
1144 let pieces = match try_park_warming(ctx, cursor, pieces).await {
1145 ParkOutcome::Parked { nsid } => {
1146 return Pending {
1147 cursor,
1148 signal,
1149 regime,
1150 op: PendingOp::Parked { nsid },
1151 };
1152 }
1153 ParkOutcome::Passthrough(pieces) => *pieces,
1154 };
1155 let UpsertPieces {
1156 source,
1157 nsid,
1158 parsed,
1159 bytes,
1160 cid,
1161 edges,
1162 } = pieces;
1163 let edges = normalize_subjects(edges, ctx.resolver, ctx.coverage, ctx.shadow).await;
1164 PendingOp::Upsert {
1165 source,
1166 nsid,
1167 parsed: Box::new(parsed),
1168 bytes,
1169 cid,
1170 edges,
1171 }
1172 }
1173 other => other,
1174 };
1175 Pending {
1176 cursor,
1177 signal,
1178 regime,
1179 op,
1180 }
1181}
1182
/// The owned parts of an upsert while it is being resolved.
///
/// Carries the same fields as `PendingOp::Upsert`, with `parsed` held
/// unboxed.
struct UpsertPieces {
    /// AT-URI of the record being upserted.
    source: AtUri<DefaultStr>,
    /// NSID carried alongside the record (compared against collection names
    /// such as `sh.tangled.repo` elsewhere in this file).
    nsid: Nsid<DefaultStr>,
    /// The decoded record.
    parsed: Record,
    /// Record body bytes, stored in the record cache when a CID is present.
    bytes: Bytes,
    /// CID of the record body, if one was supplied.
    cid: Option<Cid<DefaultStr>>,
    /// Edges associated with the record, not yet subject-normalized.
    edges: Vec<Edge>,
}
1191
1192impl From<ParkedUpsert> for UpsertPieces {
1193 fn from(u: ParkedUpsert) -> Self {
1194 Self {
1195 source: u.source,
1196 nsid: u.nsid,
1197 parsed: u.parsed,
1198 bytes: u.bytes,
1199 cid: u.cid,
1200 edges: u.edges,
1201 }
1202 }
1203}
1204
/// Result of offering an upsert to the warming buffer.
enum ParkOutcome {
    /// The upsert was parked in the buffer; only its NSID is reported back.
    Parked { nsid: Nsid<DefaultStr> },
    /// The upsert was not parked and is handed back to the caller (boxed).
    Passthrough(Box<UpsertPieces>),
}
1209
1210async fn try_park_warming<S: SearchSink + 'static>(
1211 ctx: &PipelineCtx<'_, S>,
1212 cursor: HydrantCursor,
1213 pieces: UpsertPieces,
1214) -> ParkOutcome {
1215 let Some(buffer) = ctx.buffer else {
1216 return ParkOutcome::Passthrough(Box::new(pieces));
1217 };
1218 if ctx.coverage.snapshot().is_ready() || buffer.is_sealed() {
1219 return ParkOutcome::Passthrough(Box::new(pieces));
1220 }
1221 let deps = collect_unresolved_deps(&pieces.edges, ctx.resolver).await;
1222 if deps.is_empty() {
1223 return ParkOutcome::Passthrough(Box::new(pieces));
1224 }
1225 let nsid = pieces.nsid.clone();
1226 let upsert = ParkedUpsert {
1227 cursor,
1228 source: pieces.source,
1229 nsid: pieces.nsid,
1230 parsed: pieces.parsed,
1231 bytes: pieces.bytes,
1232 cid: pieces.cid,
1233 edges: pieces.edges,
1234 };
1235 let deps_for_shadow = ctx.shadow.is_some().then(|| deps.clone());
1236 match buffer.try_park(upsert, deps).await {
1237 Ok(()) => {
1238 if let Some((shadow, noted)) = ctx.shadow.zip(deps_for_shadow) {
1239 let _ = futures::future::join_all(noted.into_iter().map(|dep| async move {
1240 shadow.note_unresolved(dep.owner, dep.rkey).await;
1241 }))
1242 .await;
1243 }
1244 ParkOutcome::Parked { nsid }
1245 }
1246 Err(returned) => ParkOutcome::Passthrough(Box::new(returned.into())),
1247 }
1248}
1249
1250async fn collect_unresolved_deps(edges: &[Edge], resolver: &RepoIdResolver) -> Vec<RepoIdent> {
1251 futures::stream::iter(edges)
1252 .fold(Vec::new(), |mut acc, edge| async move {
1253 let Some(uri) = edge.subject.as_uri() else {
1254 return acc;
1255 };
1256 let Some((owner, rkey)) = parse_repo_subject_uri(uri) else {
1257 return acc;
1258 };
1259 if resolver.cached_resolution(&owner, &rkey).await.is_some() {
1260 return acc;
1261 }
1262 let candidate = RepoIdent::new(owner, rkey);
1263 if !acc.contains(&candidate) {
1264 acc.push(candidate);
1265 }
1266 acc
1267 })
1268 .await
1269}
1270
1271async fn finalize_drained<S: SearchSink + 'static>(
1272 ctx: &PipelineCtx<'_, S>,
1273 drained: Vec<ParkedUpsert>,
1274) {
1275 for upsert in drained {
1276 let ParkedUpsert {
1277 cursor: _,
1278 source,
1279 nsid: _,
1280 parsed,
1281 bytes,
1282 cid,
1283 edges,
1284 } = upsert;
1285 let edges = normalize_subjects(edges, ctx.resolver, ctx.coverage, None).await;
1286 cache_body(ctx.records, &source, cid, bytes);
1287 ctx.store.upsert_source(&source, edges);
1288 let outcome = apply_record_state(ctx.issue_states, ctx.pull_statuses, &source, &parsed);
1289 log_unknown_state_variant(outcome, &source);
1290 index_search(ctx.search, ctx.resolver, &source, parsed).await;
1291 }
1292}
1293
1294#[allow(clippy::too_many_arguments)]
1295async fn commit_pending<S: SearchSink>(
1296 pending: Pending,
1297 store: &EdgeStore,
1298 issue_states: &StateIndex<IssueStateKind>,
1299 pull_statuses: &StateIndex<PullStatusKind>,
1300 coverage: &CoverageWatch,
1301 search: &S,
1302 records: &dyn RecordStore,
1303 resolver: &RepoIdResolver,
1304) {
1305 let Pending {
1306 cursor,
1307 signal,
1308 regime: _,
1309 op,
1310 } = pending;
1311 match op {
1312 PendingOp::Noop | PendingOp::Parked { .. } => {}
1313 PendingOp::ClearCache { source } => records.remove(&source),
1314 PendingOp::Upsert {
1315 source,
1316 nsid: _,
1317 parsed,
1318 bytes,
1319 cid,
1320 edges,
1321 } => {
1322 cache_body(records, &source, cid, bytes);
1323 store.upsert_source(&source, edges);
1324 let outcome = apply_record_state(issue_states, pull_statuses, &source, &parsed);
1325 log_unknown_state_variant(outcome, &source);
1326 index_search(search, resolver, &source, *parsed).await;
1327 }
1328 PendingOp::Delete { source, nsid } => {
1329 store.remove_source(&source);
1330 apply_delete_to_state_index(issue_states, pull_statuses, &source, &nsid);
1331 records.remove(&source);
1332 search.remove(&source).await;
1333 }
1334 }
1335 coverage.update(|c| c.advance(cursor).maybe_promote(signal));
1336}
1337
1338async fn index_search<S: SearchSink>(
1339 search: &S,
1340 resolver: &RepoIdResolver,
1341 source: &AtUri<DefaultStr>,
1342 parsed: Record,
1343) {
1344 let Some(searchable) = SearchableRecord::try_from_record(parsed) else {
1345 return;
1346 };
1347 let Some(searchable) = searchable.normalize(resolver).await else {
1348 return;
1349 };
1350 search.upsert(searchable.to_search_doc(source)).await;
1351}
1352
/// Test-only driver that runs one frame through prepare, resolve and commit
/// with no warming buffer and no shadow buffer.
///
/// `clock` is unused; the current time is taken from `now`.
#[cfg(test)]
#[allow(clippy::too_many_arguments)]
async fn handle_frame<S: SearchSink + 'static>(
    frame: HydrantFrame,
    store: &EdgeStore,
    issue_states: &StateIndex<IssueStateKind>,
    pull_statuses: &StateIndex<PullStatusKind>,
    coverage: &CoverageWatch,
    search: &S,
    records: &dyn RecordStore,
    resolver: &RepoIdResolver,
    clock: &dyn Clock,
    now: UnixMicros,
) {
    let _ = clock;
    let ctx = PipelineCtx {
        store,
        resolver,
        coverage,
        records,
        search,
        issue_states,
        pull_statuses,
        buffer: None,
        shadow: None,
    };
    let prepared = prepare_frame(frame, &ctx, now).await;
    let resolved = resolve_pending(prepared, &ctx).await;
    commit_pending(
        resolved,
        store,
        issue_states,
        pull_statuses,
        coverage,
        search,
        records,
        resolver,
    )
    .await;
}
1393
1394fn log_unknown_state_variant(outcome: ApplyOutcome, source: &AtUri<DefaultStr>) {
1395 if matches!(outcome, ApplyOutcome::UnknownVariant) {
1396 warn!(
1397 target: "bobbin_ingest::state_index",
1398 %source,
1399 "state record has unknown wire variant, skipping index update",
1400 );
1401 }
1402}
1403
1404fn apply_delete_to_state_index(
1405 issue_states: &StateIndex<IssueStateKind>,
1406 pull_statuses: &StateIndex<PullStatusKind>,
1407 source: &AtUri<DefaultStr>,
1408 nsid: &Nsid<DefaultStr>,
1409) {
1410 match nsid.as_ref() {
1411 "sh.tangled.repo.issue" => issue_states.remove_entity(source),
1412 "sh.tangled.repo.pull" => pull_statuses.remove_entity(source),
1413 "sh.tangled.repo.issue.state" => issue_states.remove_source(source),
1414 "sh.tangled.repo.pull.status" => pull_statuses.remove_source(source),
1415 _ => {}
1416 }
1417}
1418
1419fn promotion_signal(record: Option<&RecordFrame>, now: UnixMicros) -> PromotionSignal {
1420 PromotionSignal {
1421 rev_micros: record.map(|r| r.rev.timestamp()),
1422 now_micros: now.raw(),
1423 skew_micros: READY_SKEW.as_micros() as u64,
1424 }
1425}
1426
1427fn cache_body(
1428 records: &dyn RecordStore,
1429 source: &AtUri<DefaultStr>,
1430 cid: Option<Cid<DefaultStr>>,
1431 bytes: Bytes,
1432) {
1433 match cid {
1434 Some(cid) => records.put(
1435 source.clone(),
1436 Arc::new(RecordBody {
1437 uri: source.clone(),
1438 cid,
1439 value: bytes,
1440 }),
1441 ),
1442 None => records.remove(source),
1443 }
1444}
1445
1446async fn normalize_subjects(
1447 edges: Vec<Edge>,
1448 resolver: &RepoIdResolver,
1449 coverage: &CoverageWatch,
1450 shadow: Option<&WarmingShadowBuffer>,
1451) -> Vec<Edge> {
1452 let warming = shadow.is_some() && !coverage.snapshot().is_ready();
1453 futures::stream::iter(edges)
1454 .filter_map(|edge| async move {
1455 let Some(uri) = edge.subject.as_uri() else {
1456 return Some(edge);
1457 };
1458 let Some((owner, rkey)) = parse_repo_subject_uri(uri) else {
1459 return Some(edge);
1460 };
1461 if warming
1462 && let Some(shadow) = shadow
1463 && resolver.cached_resolution(&owner, &rkey).await.is_none()
1464 {
1465 shadow
1466 .note_unresolved(owner.clone(), rkey.clone())
1467 .await;
1468 }
1469 match resolver.resolve(&owner, &rkey).await {
1470 Resolution::Mapped(repo_did) => Some(Edge {
1471 subject: SubjectRef::Did(repo_did),
1472 ..edge
1473 }),
1474 Resolution::NoRepoDid => {
1475 warn!(
1476 target: "bobbin_ingest::normalize",
1477 kind = %edge.kind,
1478 owner = owner.as_ref(),
1479 rkey = rkey.as_ref(),
1480 source = edge.source.as_ref(),
1481 "dropping edge: target repo has no repoDid, no canonical DID subject available",
1482 );
1483 None
1484 }
1485 Resolution::Unresolvable => {
1486 warn!(
1487 target: "bobbin_ingest::normalize",
1488 kind = %edge.kind,
1489 owner = owner.as_ref(),
1490 rkey = rkey.as_ref(),
1491 source = edge.source.as_ref(),
1492 "dropping edge: repo unresolvable, rkey-form subject will not match bare-DID queries",
1493 );
1494 None
1495 }
1496 }
1497 })
1498 .collect()
1499 .await
1500}
1501
1502fn parse_repo_subject_uri(uri: &AtUri<DefaultStr>) -> Option<(Did<DefaultStr>, Rkey<DefaultStr>)> {
1503 let collection = uri.collection()?;
1504 if collection.as_ref() != "sh.tangled.repo" {
1505 return None;
1506 }
1507 let AtIdentifier::Did(authority) = uri.authority() else {
1508 return None;
1509 };
1510 let rkey = uri.rkey()?;
1511 let owner = Did::new_owned(authority.as_ref()).ok()?;
1512 let rkey = Rkey::new_owned(rkey.as_ref()).ok()?;
1513 Some((owner, rkey))
1514}
1515
1516fn build_source_uri(r: &RecordFrame) -> Result<AtUri<DefaultStr>, IngestError> {
1517 Ok(AtUri::from_parts_owned(
1518 r.did.as_ref(),
1519 r.collection.as_ref(),
1520 r.rkey.as_ref(),
1521 )?)
1522}
1523
1524#[cfg(test)]
1525mod tests {
1526 use super::*;
1527 use bobbin_edge_index::Coverage;
1528 use bobbin_record_lru::{CacheCapacity, LruRecordStore, NoopRecordStore, RecordStore};
1529 use bobbin_runtime::{OsEntropy, RuntimeHasher, SystemClock, TungsteniteWs};
1530 use bobbin_types::search::NoopSearchSink;
1531 use jacquard_common::types::nsid::Nsid;
1532 use jacquard_common::types::tid::Tid;
1533 use serde_json::json;
1534
// CID literal used by the record-cache tests; it is parsed into a `Cid` and
// compared against cached bodies.
const VALID_CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i";
1536
1537 fn did_subj(s: &str) -> SubjectRef {
1538 SubjectRef::Did(Did::new_owned(s).unwrap())
1539 }
1540
1541 fn uri_subj(s: &str) -> SubjectRef {
1542 SubjectRef::Uri(AtUri::new_owned(s).unwrap())
1543 }
1544
1545 fn rkey(s: &str) -> Rkey<DefaultStr> {
1546 Rkey::new_owned(s).unwrap()
1547 }
1548
1549 #[allow(clippy::type_complexity)]
1550 fn fresh() -> (
1551 Arc<EdgeStore>,
1552 Arc<StateIndex<IssueStateKind>>,
1553 Arc<StateIndex<PullStatusKind>>,
1554 Arc<CoverageWatch>,
1555 Arc<RepoIdResolver>,
1556 ) {
1557 (
1558 Arc::new(EdgeStore::new(RuntimeHasher::default())),
1559 Arc::new(StateIndex::new(RuntimeHasher::default())),
1560 Arc::new(StateIndex::new(RuntimeHasher::default())),
1561 Arc::new(CoverageWatch::new()),
1562 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())),
1563 )
1564 }
1565
1566 fn now() -> UnixMicros {
1567 SystemClock::new().now_unix_micros()
1568 }
1569
/// Returns a new `SystemClock` to pass as the clock argument of
/// `handle_frame`.
fn sys_clock() -> SystemClock {
    SystemClock::new()
}
1573
1574 fn parse_frame(value: serde_json::Value) -> HydrantFrame {
1575 let text = serde_json::to_string(&value).expect("serialize fixture");
1576 serde_json::from_str(&text).expect("deserialize fixture")
1577 }
1578
/// Returns a TID from `Tid::now_0()`, used as the `rev` of fixture frames.
fn fresh_tid() -> Tid {
    Tid::now_0()
}
1582
1583 #[tokio::test]
1584 async fn ignores_non_tangled_collections() {
1585 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
1586 let frame: HydrantFrame = parse_frame(json!({
1587 "id": 1,
1588 "type": "record",
1589 "record": {
1590 "live": false,
1591 "did": "did:plc:nel",
1592 "rev": fresh_tid().as_str(),
1593 "collection": "app.bsky.feed.post",
1594 "rkey": "abcabcabcabcz",
1595 "action": "create",
1596 "record": {"$type": "app.bsky.feed.post", "text": "hi"}
1597 }
1598 }));
1599 handle_frame(
1600 frame,
1601 &store,
1602 &issue_states,
1603 &pull_statuses,
1604 &cov,
1605 &NoopSearchSink,
1606 &NoopRecordStore,
1607 &resolver,
1608 &sys_clock(),
1609 now(),
1610 )
1611 .await;
1612 assert_eq!(store.key_count(), 0);
1613 assert_eq!(cov.snapshot().events_processed(), 1);
1614 }
1615
1616 #[tokio::test]
1617 async fn create_then_delete_round_trips_a_star() {
1618 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
1619 let create: HydrantFrame = parse_frame(json!({
1620 "id": 10,
1621 "type": "record",
1622 "record": {
1623 "live": false,
1624 "did": "did:plc:olaren",
1625 "rev": fresh_tid().as_str(),
1626 "collection": "sh.tangled.feed.star",
1627 "rkey": "abcabcabcabcz",
1628 "action": "create",
1629 "record": {
1630 "$type": "sh.tangled.feed.star",
1631 "createdAt": "2026-05-01T00:00:00Z",
1632 "subjectDid": "did:plc:abalone"
1633 }
1634 }
1635 }));
1636 handle_frame(
1637 create,
1638 &store,
1639 &issue_states,
1640 &pull_statuses,
1641 &cov,
1642 &NoopSearchSink,
1643 &NoopRecordStore,
1644 &resolver,
1645 &sys_clock(),
1646 now(),
1647 )
1648 .await;
1649 let key = bobbin_types::ids::EdgeKey::new(
1650 Nsid::new_static("sh.tangled.feed.star").unwrap(),
1651 did_subj("did:plc:abalone"),
1652 );
1653 assert_eq!(store.count(&key), 1);
1654
1655 let delete: HydrantFrame = parse_frame(json!({
1656 "id": 11,
1657 "type": "record",
1658 "record": {
1659 "live": false,
1660 "did": "did:plc:olaren",
1661 "rev": fresh_tid().as_str(),
1662 "collection": "sh.tangled.feed.star",
1663 "rkey": "abcabcabcabcz",
1664 "action": "delete",
1665 "record": null
1666 }
1667 }));
1668 handle_frame(
1669 delete,
1670 &store,
1671 &issue_states,
1672 &pull_statuses,
1673 &cov,
1674 &NoopSearchSink,
1675 &NoopRecordStore,
1676 &resolver,
1677 &sys_clock(),
1678 now(),
1679 )
1680 .await;
1681 assert_eq!(store.count(&key), 0);
1682 }
1683
1684 #[tokio::test]
1685 async fn update_replaces_prior_edges() {
1686 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
1687 let mk = |subject_did: &Did<DefaultStr>, id: u64| -> HydrantFrame {
1688 parse_frame(json!({
1689 "id": id,
1690 "type": "record",
1691 "record": {
1692 "live": false,
1693 "did": "did:plc:olaren",
1694 "rev": fresh_tid().as_str(),
1695 "collection": "sh.tangled.feed.star",
1696 "rkey": "abcabcabcabcz",
1697 "action": "update",
1698 "record": {
1699 "$type": "sh.tangled.feed.star",
1700 "createdAt": "2026-05-01T00:00:00Z",
1701 "subjectDid": subject_did.as_ref()
1702 }
1703 }
1704 }))
1705 };
1706 handle_frame(
1707 mk(&Did::new_owned("did:plc:abalone").unwrap(), 1),
1708 &store,
1709 &issue_states,
1710 &pull_statuses,
1711 &cov,
1712 &NoopSearchSink,
1713 &NoopRecordStore,
1714 &resolver,
1715 &sys_clock(),
1716 now(),
1717 )
1718 .await;
1719 handle_frame(
1720 mk(&Did::new_owned("did:plc:uni").unwrap(), 2),
1721 &store,
1722 &issue_states,
1723 &pull_statuses,
1724 &cov,
1725 &NoopSearchSink,
1726 &NoopRecordStore,
1727 &resolver,
1728 &sys_clock(),
1729 now(),
1730 )
1731 .await;
1732
1733 let kind = Nsid::new_static("sh.tangled.feed.star").unwrap();
1734 let old = bobbin_types::ids::EdgeKey::new(kind.clone(), did_subj("did:plc:abalone"));
1735 let new = bobbin_types::ids::EdgeKey::new(kind, did_subj("did:plc:uni"));
1736 assert_eq!(store.count(&old), 0);
1737 assert_eq!(store.count(&new), 1);
1738 }
1739
1740 #[tokio::test]
1741 async fn prepare_frame_tags_regime_from_live_flag() {
1742 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
1743 let search = NoopSearchSink;
1744 let records = NoopRecordStore;
1745 let ctx = PipelineCtx {
1746 resolver: &resolver,
1747 store: &store,
1748 issue_states: &issue_states,
1749 pull_statuses: &pull_statuses,
1750 coverage: &cov,
1751 records: &records,
1752 search: &search,
1753 shadow: None,
1754 buffer: None,
1755 };
1756 let mk = |live: bool| -> HydrantFrame {
1757 parse_frame(json!({
1758 "id": 1,
1759 "type": "record",
1760 "record": {
1761 "live": live,
1762 "did": "did:plc:olaren",
1763 "rev": fresh_tid().as_str(),
1764 "collection": "sh.tangled.feed.star",
1765 "rkey": "abcabcabcabcz",
1766 "action": "create",
1767 "record": {
1768 "$type": "sh.tangled.feed.star",
1769 "createdAt": "2026-05-01T00:00:00Z",
1770 "subjectDid": "did:plc:abalone"
1771 }
1772 }
1773 }))
1774 };
1775 let live_pending = prepare_frame(mk(true), &ctx, now()).await;
1776 assert_eq!(live_pending.regime, Regime::Live);
1777 let replay_pending = prepare_frame(mk(false), &ctx, now()).await;
1778 assert_eq!(replay_pending.regime, Regime::Replay);
1779
1780 let identity: HydrantFrame = parse_frame(json!({
1781 "id": 9,
1782 "type": "identity",
1783 }));
1784 let id_pending = prepare_frame(identity, &ctx, now()).await;
1785 assert_eq!(id_pending.regime, Regime::NonRecord);
1786 }
1787
1788 #[tokio::test]
1789 async fn create_with_cid_warms_record_lru() {
1790 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
1791 let lru = LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024));
1792 let frame: HydrantFrame = parse_frame(json!({
1793 "id": 1,
1794 "type": "record",
1795 "record": {
1796 "live": false,
1797 "did": "did:plc:olaren",
1798 "rev": fresh_tid().as_str(),
1799 "collection": "sh.tangled.feed.star",
1800 "rkey": "abcabcabcabcz",
1801 "action": "create",
1802 "cid": VALID_CID,
1803 "record": {
1804 "$type": "sh.tangled.feed.star",
1805 "createdAt": "2026-05-01T00:00:00Z",
1806 "subjectDid": "did:plc:abalone"
1807 }
1808 }
1809 }));
1810 let source =
1811 AtUri::new_owned("at://did:plc:olaren/sh.tangled.feed.star/abcabcabcabcz").unwrap();
1812 handle_frame(
1813 frame,
1814 &store,
1815 &issue_states,
1816 &pull_statuses,
1817 &cov,
1818 &NoopSearchSink,
1819 &lru,
1820 &resolver,
1821 &sys_clock(),
1822 now(),
1823 )
1824 .await;
1825 let cached = lru.get(&source).expect("hydrant cid must seed the lru");
1826 assert_eq!(cached.cid.as_ref(), VALID_CID);
1827 let parsed: serde_json::Value = serde_json::from_slice(&cached.value).unwrap();
1828 assert_eq!(
1829 parsed["subject"]["did"], "did:plc:abalone",
1830 "legacy wire is upgraded to canon shape before caching so downstream readers see canonical fields"
1831 );
1832 }
1833
1834 #[tokio::test]
1835 async fn create_without_cid_clears_record_lru() {
1836 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
1837 let lru = LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024));
1838 let source =
1839 AtUri::new_owned("at://did:plc:olaren/sh.tangled.feed.star/abcabcabcabcz").unwrap();
1840 let cid: Cid<DefaultStr> = VALID_CID.parse().unwrap();
1841 lru.put(
1842 source.clone(),
1843 Arc::new(RecordBody {
1844 uri: source.clone(),
1845 cid,
1846 value: bytes::Bytes::from_static(b"{\"stale\":true}"),
1847 }),
1848 );
1849 let frame: HydrantFrame = parse_frame(json!({
1850 "id": 1,
1851 "type": "record",
1852 "record": {
1853 "live": false,
1854 "did": "did:plc:olaren",
1855 "rev": fresh_tid().as_str(),
1856 "collection": "sh.tangled.feed.star",
1857 "rkey": "abcabcabcabcz",
1858 "action": "update",
1859 "record": {
1860 "$type": "sh.tangled.feed.star",
1861 "createdAt": "2026-05-01T00:00:00Z",
1862 "subjectDid": "did:plc:abalone"
1863 }
1864 }
1865 }));
1866 handle_frame(
1867 frame,
1868 &store,
1869 &issue_states,
1870 &pull_statuses,
1871 &cov,
1872 &NoopSearchSink,
1873 &lru,
1874 &resolver,
1875 &sys_clock(),
1876 now(),
1877 )
1878 .await;
1879 assert!(
1880 lru.get(&source).is_none(),
1881 "missing cid means we cannot trust the body, so the lru must be cleared",
1882 );
1883 }
1884
1885 #[tokio::test]
1886 async fn delete_evicts_record_lru() {
1887 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
1888 let lru = LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024));
1889 let source =
1890 AtUri::new_owned("at://did:plc:olaren/sh.tangled.feed.star/abcabcabcabcz").unwrap();
1891 let cid: Cid<DefaultStr> = VALID_CID.parse().unwrap();
1892 lru.put(
1893 source.clone(),
1894 Arc::new(RecordBody {
1895 uri: source.clone(),
1896 cid,
1897 value: bytes::Bytes::from_static(b"{}"),
1898 }),
1899 );
1900 let frame: HydrantFrame = parse_frame(json!({
1901 "id": 1,
1902 "type": "record",
1903 "record": {
1904 "live": false,
1905 "did": "did:plc:olaren",
1906 "rev": fresh_tid().as_str(),
1907 "collection": "sh.tangled.feed.star",
1908 "rkey": "abcabcabcabcz",
1909 "action": "delete",
1910 "record": null
1911 }
1912 }));
1913 handle_frame(
1914 frame,
1915 &store,
1916 &issue_states,
1917 &pull_statuses,
1918 &cov,
1919 &NoopSearchSink,
1920 &lru,
1921 &resolver,
1922 &sys_clock(),
1923 now(),
1924 )
1925 .await;
1926 assert!(lru.get(&source).is_none());
1927 }
1928
1929 #[tokio::test]
1930 async fn live_recent_event_promotes_coverage_to_ready() {
1931 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
1932 assert!(!cov.snapshot().is_ready());
1933 let frame: HydrantFrame = parse_frame(json!({
1934 "id": 99,
1935 "type": "record",
1936 "record": {
1937 "live": true,
1938 "did": "did:plc:olaren",
1939 "rev": fresh_tid().as_str(),
1940 "collection": "sh.tangled.feed.star",
1941 "rkey": "abcabcabcabcz",
1942 "action": "create",
1943 "record": {
1944 "$type": "sh.tangled.feed.star",
1945 "createdAt": "2026-05-01T00:00:00Z",
1946 "subjectDid": "did:plc:abalone"
1947 }
1948 }
1949 }));
1950 handle_frame(
1951 frame,
1952 &store,
1953 &issue_states,
1954 &pull_statuses,
1955 &cov,
1956 &NoopSearchSink,
1957 &NoopRecordStore,
1958 &resolver,
1959 &sys_clock(),
1960 now(),
1961 )
1962 .await;
1963 assert!(cov.snapshot().is_ready());
1964 assert_eq!(cov.snapshot().last_cursor(), HydrantCursor::new(99));
1965 }
1966
1967 #[tokio::test]
1968 async fn live_but_stale_rev_does_not_promote() {
1969 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
1970 let stale_tid = Tid::from_time(1_000_000, 0);
1971 let frame: HydrantFrame = parse_frame(json!({
1972 "id": 7,
1973 "type": "record",
1974 "record": {
1975 "live": true,
1976 "did": "did:plc:olaren",
1977 "rev": stale_tid.as_str(),
1978 "collection": "sh.tangled.feed.star",
1979 "rkey": "abcabcabcabcz",
1980 "action": "create",
1981 "record": {
1982 "$type": "sh.tangled.feed.star",
1983 "createdAt": "2026-05-01T00:00:00Z",
1984 "subjectDid": "did:plc:abalone"
1985 }
1986 }
1987 }));
1988 handle_frame(
1989 frame,
1990 &store,
1991 &issue_states,
1992 &pull_statuses,
1993 &cov,
1994 &NoopSearchSink,
1995 &NoopRecordStore,
1996 &resolver,
1997 &sys_clock(),
1998 now(),
1999 )
2000 .await;
2001 assert!(!cov.snapshot().is_ready());
2002 assert!(matches!(cov.snapshot(), Coverage::Warming { .. }));
2003 }
2004
/// An error frame with code `ConsumerTooSlow` classifies to the dedicated
/// variant and keeps its message text.
#[test]
fn classify_consumer_too_slow_frame_returns_typed_variant() {
    let wire = r#"{"type":"error","error":"ConsumerTooSlow","message":"stream socket send blocked for at least 30 seconds"}"#;
    let classified = classify_text_frame(wire);
    match classified {
        Err(IngestError::ConsumerTooSlow { message }) => {
            assert!(
                message.contains("30 seconds"),
                "message field preserved verbatim, got: {message}"
            );
        }
        other => panic!("expected ConsumerTooSlow variant, got: {other:?}"),
    }
}
2018
/// An error frame with an unrecognized code classifies to the generic
/// `HydrantStream` variant, carrying both code and message.
#[test]
fn classify_unknown_hydrant_error_falls_back_to_generic_variant() {
    let wire = r#"{"type":"error","error":"NewFutureCode","message":"some new failure mode"}"#;
    let classified = classify_text_frame(wire);
    match classified {
        Err(IngestError::HydrantStream { code, message }) => {
            assert_eq!(code, "NewFutureCode");
            assert_eq!(message, "some new failure mode");
        }
        other => panic!("expected HydrantStream variant, got: {other:?}"),
    }
}
2030
/// An error frame with no `message` field yields an empty message string.
#[test]
fn classify_error_frame_without_message_uses_empty_string() {
    let wire = r#"{"type":"error","error":"ConsumerTooSlow"}"#;
    let classified = classify_text_frame(wire);
    match classified {
        Err(IngestError::ConsumerTooSlow { message }) => {
            assert!(message.is_empty());
        }
        other => panic!("expected ConsumerTooSlow with empty message, got: {other:?}"),
    }
}
2039
/// A minimal record frame still parses, with its id and kind intact.
#[test]
fn classify_normal_record_frame_unchanged() {
    let wire = r#"{"id":42,"type":"record"}"#;
    let parsed = classify_text_frame(wire).expect("normal record frame must parse");
    assert_eq!(parsed.id, 42);
    assert_eq!(parsed.kind, FrameKind::Record);
}
2047
/// A JSON object that is neither a frame nor an error frame is reported as a
/// decode error.
#[test]
fn classify_garbage_object_returns_decode_error() {
    let wire = r#"{"random":"object","without":"required fields"}"#;
    let classified = classify_text_frame(wire);
    match classified {
        Err(IngestError::Decode(_)) => {}
        other => panic!("expected Decode error, got: {other:?}"),
    }
}
2056
2057 struct ScriptedWsStream {
2058 messages: std::collections::VecDeque<Result<WsMessage, NetworkError>>,
2059 }
2060
2061 impl WsStream for ScriptedWsStream {
2062 fn next<'a>(&'a mut self) -> bobbin_runtime::WsMessageFuture<'a> {
2063 let msg = self.messages.pop_front();
2064 Box::pin(async move { msg })
2065 }
2066 }
2067
2068 #[tokio::test]
2069 async fn reader_loop_surfaces_consumer_too_slow_from_error_frame() {
2070 let mut messages = std::collections::VecDeque::new();
2071 messages.push_back(Ok(WsMessage::Text(
2072 r#"{"type":"error","error":"ConsumerTooSlow","message":"stream delivery blocked"}"#
2073 .to_owned(),
2074 )));
2075 let stream: Box<dyn WsStream> = Box::new(ScriptedWsStream { messages });
2076 let (frame_tx, _frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(FRAME_CHANNEL_DEPTH);
2077 let (control_tx, _control_rx) =
2078 tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH);
2079 let cancel = CancellationToken::new();
2080
2081 let end = reader_loop(stream, frame_tx, control_tx, cancel).await;
2082
2083 match end.error {
2084 Some(IngestError::ConsumerTooSlow { message }) => {
2085 assert_eq!(message, "stream delivery blocked");
2086 }
2087 other => panic!("expected ConsumerTooSlow SessionEnd error, got: {other:?}"),
2088 }
2089 assert_eq!(end.outcome, SessionOutcome::Empty);
2090 }
2091
2092 #[tokio::test]
2093 async fn reader_loop_surfaces_unknown_hydrant_error_distinctly() {
2094 let mut messages = std::collections::VecDeque::new();
2095 messages.push_back(Ok(WsMessage::Text(
2096 r#"{"type":"error","error":"NewFutureCode","message":"new mode"}"#.to_owned(),
2097 )));
2098 let stream: Box<dyn WsStream> = Box::new(ScriptedWsStream { messages });
2099 let (frame_tx, _frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(FRAME_CHANNEL_DEPTH);
2100 let (control_tx, _control_rx) =
2101 tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH);
2102 let cancel = CancellationToken::new();
2103
2104 let end = reader_loop(stream, frame_tx, control_tx, cancel).await;
2105
2106 match end.error {
2107 Some(IngestError::HydrantStream { code, message }) => {
2108 assert_eq!(code, "NewFutureCode");
2109 assert_eq!(message, "new mode");
2110 }
2111 other => panic!("expected HydrantStream SessionEnd error, got: {other:?}"),
2112 }
2113 }
2114
2115 struct ChannelWsStream {
2116 rx: tokio::sync::mpsc::Receiver<Result<WsMessage, NetworkError>>,
2117 }
2118
2119 impl WsStream for ChannelWsStream {
2120 fn next<'a>(&'a mut self) -> bobbin_runtime::WsMessageFuture<'a> {
2121 Box::pin(async move { self.rx.recv().await })
2122 }
2123 }
2124
2125 fn star_frame_text(id: u64, rkey: &Rkey<DefaultStr>) -> String {
2126 json!({
2127 "id": id,
2128 "type": "record",
2129 "record": {
2130 "live": false,
2131 "did": "did:plc:olaren",
2132 "rev": fresh_tid().as_str(),
2133 "collection": "sh.tangled.feed.star",
2134 "rkey": rkey.as_ref(),
2135 "action": "create",
2136 "record": {
2137 "$type": "sh.tangled.feed.star",
2138 "createdAt": "2026-05-01T00:00:00Z",
2139 "subjectDid": "did:plc:abalone"
2140 }
2141 }
2142 })
2143 .to_string()
2144 }
2145
2146 #[tokio::test]
2147 async fn pong_forwards_promptly_when_frame_channel_is_full() {
2148 let (ws_tx, ws_rx) = tokio::sync::mpsc::channel::<Result<WsMessage, NetworkError>>(8);
2149 let stream: Box<dyn WsStream> = Box::new(ChannelWsStream { rx: ws_rx });
2150 let (frame_tx, frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(1);
2151 let (control_tx, mut control_rx) =
2152 tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH);
2153 let cancel = CancellationToken::new();
2154
2155 let prefill: HydrantFrame = parse_frame(json!({
2156 "id": 0,
2157 "type": "record",
2158 "record": {
2159 "live": false,
2160 "did": "did:plc:olaren",
2161 "rev": fresh_tid().as_str(),
2162 "collection": "sh.tangled.feed.star",
2163 "rkey": "prefilrkey001",
2164 "action": "create",
2165 "record": {
2166 "$type": "sh.tangled.feed.star",
2167 "createdAt": "2026-05-01T00:00:00Z",
2168 "subjectDid": "did:plc:abalone"
2169 }
2170 }
2171 }));
2172 frame_tx
2173 .try_send(prefill)
2174 .expect("depth-1 frame channel must accept the prefill");
2175
2176 let reader_handle = tokio::spawn(reader_loop(
2177 stream,
2178 frame_tx.clone(),
2179 control_tx.clone(),
2180 cancel.clone(),
2181 ));
2182
2183 ws_tx
2184 .send(Ok(WsMessage::Text(star_frame_text(
2185 1,
2186 &rkey("starrkeyaa001"),
2187 ))))
2188 .await
2189 .unwrap();
2190 ws_tx.send(Ok(WsMessage::Pong(Bytes::new()))).await.unwrap();
2191
2192 let pong_event = tokio::time::timeout(Duration::from_millis(500), control_rx.recv())
2193 .await
2194 .expect("pong must surface inside 500ms even when frame_tx is saturated. A reader blocked on a frame send would never poll the next ws message")
2195 .expect("control_tx was not closed");
2196 assert!(
2197 matches!(pong_event, WsEvent::IncomingPong),
2198 "first control event must be the pong, not a held text",
2199 );
2200
2201 let too_slow =
2202 "{\"type\":\"error\",\"error\":\"ConsumerTooSlow\",\"message\":\"saturated\"}"
2203 .to_string();
2204 ws_tx.send(Ok(WsMessage::Text(too_slow))).await.unwrap();
2205
2206 let end = tokio::time::timeout(Duration::from_secs(1), reader_handle)
2207 .await
2208 .expect("reader must exit promptly once ConsumerTooSlow is read")
2209 .expect("reader task must not panic");
2210 match end.error {
2211 Some(IngestError::ConsumerTooSlow { message }) => assert_eq!(message, "saturated"),
2212 other => panic!("expected ConsumerTooSlow disconnect, got: {other:?}"),
2213 }
2214
2215 drop(frame_rx);
2216 }
2217
2218 #[tokio::test]
2219 async fn reader_drains_held_frames_once_processor_catches_up() {
2220 let (ws_tx, ws_rx) = tokio::sync::mpsc::channel::<Result<WsMessage, NetworkError>>(8);
2221 let stream: Box<dyn WsStream> = Box::new(ChannelWsStream { rx: ws_rx });
2222 let (frame_tx, mut frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(1);
2223 let (control_tx, _control_rx) =
2224 tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH);
2225 let cancel = CancellationToken::new();
2226
2227 let prefill: HydrantFrame = parse_frame(json!({
2228 "id": 0,
2229 "type": "record",
2230 "record": {
2231 "live": false,
2232 "did": "did:plc:olaren",
2233 "rev": fresh_tid().as_str(),
2234 "collection": "sh.tangled.feed.star",
2235 "rkey": "prefilrkey001",
2236 "action": "create",
2237 "record": {
2238 "$type": "sh.tangled.feed.star",
2239 "createdAt": "2026-05-01T00:00:00Z",
2240 "subjectDid": "did:plc:abalone"
2241 }
2242 }
2243 }));
2244 frame_tx.try_send(prefill).unwrap();
2245
2246 let reader_handle = tokio::spawn(reader_loop(
2247 stream,
2248 frame_tx.clone(),
2249 control_tx,
2250 cancel.clone(),
2251 ));
2252
2253 ws_tx
2254 .send(Ok(WsMessage::Text(star_frame_text(
2255 1,
2256 &rkey("heldrkeyaa001"),
2257 ))))
2258 .await
2259 .unwrap();
2260 ws_tx
2261 .send(Ok(WsMessage::Text(star_frame_text(
2262 2,
2263 &rkey("heldrkeyaa002"),
2264 ))))
2265 .await
2266 .unwrap();
2267
2268 let _drained_prefill = frame_rx.recv().await.expect("prefilled frame drains");
2269 let first = tokio::time::timeout(Duration::from_millis(500), frame_rx.recv())
2270 .await
2271 .expect("first held frame must reach frame_rx after slot opens")
2272 .expect("frame_tx still open");
2273 assert_eq!(first.id, 1);
2274 let second = tokio::time::timeout(Duration::from_millis(500), frame_rx.recv())
2275 .await
2276 .expect("second held frame must reach frame_rx after slot opens")
2277 .expect("frame_tx still open");
2278 assert_eq!(second.id, 2);
2279
2280 cancel.cancel();
2281 let _ = tokio::time::timeout(Duration::from_secs(1), reader_handle)
2282 .await
2283 .expect("reader must stop after cancel");
2284 }
2285
/// With default (empty) coverage, the connect cursor is the configured start.
#[test]
fn first_connect_uses_configured_start_cursor() {
    let configured = HydrantCursor::new(42);
    let chosen = next_connect_cursor(Coverage::default(), configured);
    assert_eq!(chosen, configured);
}
2291
/// After coverage has advanced to cursor 7, a reconnect resumes at 8.
#[test]
fn reconnect_resumes_strictly_after_last_seen() {
    let coverage = Coverage::default().advance(HydrantCursor::new(7));
    let resumed = next_connect_cursor(coverage, HydrantCursor::new(0));
    assert_eq!(resumed, HydrantCursor::new(8));
}
2300
/// Once coverage has advanced (here to 100), the configured start (50) is
/// ignored and the reconnect cursor is last-seen + 1.
#[test]
fn reconnect_overrides_configured_start() {
    let coverage = Coverage::default().advance(HydrantCursor::new(100));
    let resumed = next_connect_cursor(coverage, HydrantCursor::new(50));
    assert_eq!(resumed, HydrantCursor::new(101));
}
2309
/// Default coverage reports cursor 0 with zero events processed; that state
/// must still select the configured start cursor.
#[test]
fn first_connect_uses_start_even_when_first_frame_id_would_be_zero() {
    let configured = HydrantCursor::new(7);
    let untouched = Coverage::default();
    assert_eq!(untouched.last_cursor(), HydrantCursor::new(0));
    assert_eq!(untouched.events_processed(), 0);
    let chosen = next_connect_cursor(untouched, configured);
    assert_eq!(chosen, configured);
}
2318
/// Coverage that has actually processed frame id 0 must resume at 1 rather
/// than fall back to the configured start.
#[test]
fn reconnect_after_processing_id_zero_advances_to_one() {
    let coverage = Coverage::default().advance(HydrantCursor::new(0));
    assert_eq!(coverage.events_processed(), 1);
    let resumed = next_connect_cursor(coverage, HydrantCursor::new(99));
    assert_eq!(
        resumed,
        HydrantCursor::new(1),
        "events_processed disambiguates 'never seen' from 'saw id 0'",
    );
}
2329
2330 #[tokio::test]
2331 async fn identity_frame_advances_cursor_only() {
2332 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2333 let frame: HydrantFrame = parse_frame(json!({
2334 "id": 5,
2335 "type": "identity",
2336 "identity": {
2337 "did": "did:plc:olaren",
2338 "handle": "olaren.dev"
2339 }
2340 }));
2341 handle_frame(
2342 frame,
2343 &store,
2344 &issue_states,
2345 &pull_statuses,
2346 &cov,
2347 &NoopSearchSink,
2348 &NoopRecordStore,
2349 &resolver,
2350 &sys_clock(),
2351 now(),
2352 )
2353 .await;
2354 assert_eq!(store.key_count(), 0);
2355 assert_eq!(cov.snapshot().last_cursor(), HydrantCursor::new(5));
2356 assert!(!cov.snapshot().is_ready());
2357 }
2358
2359 #[tokio::test]
2360 async fn account_frame_advances_cursor_only() {
2361 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2362 let frame: HydrantFrame = parse_frame(json!({
2363 "id": 6,
2364 "type": "account",
2365 "account": {"did": "did:plc:olaren", "active": true}
2366 }));
2367 handle_frame(
2368 frame,
2369 &store,
2370 &issue_states,
2371 &pull_statuses,
2372 &cov,
2373 &NoopSearchSink,
2374 &NoopRecordStore,
2375 &resolver,
2376 &sys_clock(),
2377 now(),
2378 )
2379 .await;
2380 assert_eq!(store.key_count(), 0);
2381 assert_eq!(cov.snapshot().last_cursor(), HydrantCursor::new(6));
2382 }
2383
2384 #[tokio::test]
2385 async fn unknown_frame_kind_advances_cursor_without_panic() {
2386 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2387 let frame: HydrantFrame = parse_frame(json!({"id": 8, "type": "future_event"}));
2388 assert_eq!(frame.kind, FrameKind::Other);
2389 handle_frame(
2390 frame,
2391 &store,
2392 &issue_states,
2393 &pull_statuses,
2394 &cov,
2395 &NoopSearchSink,
2396 &NoopRecordStore,
2397 &resolver,
2398 &sys_clock(),
2399 now(),
2400 )
2401 .await;
2402 assert_eq!(cov.snapshot().last_cursor(), HydrantCursor::new(8));
2403 }
2404
2405 fn fresh_runtime(cancel: CancellationToken) -> IngestRuntime<NoopSearchSink> {
2406 IngestRuntime {
2407 store: Arc::new(EdgeStore::new(RuntimeHasher::default())),
2408 issue_states: Arc::new(StateIndex::new(RuntimeHasher::default())),
2409 pull_statuses: Arc::new(StateIndex::new(RuntimeHasher::default())),
2410 coverage: Arc::new(CoverageWatch::new()),
2411 search: Arc::new(NoopSearchSink),
2412 records: Arc::new(NoopRecordStore) as Arc<dyn RecordStore>,
2413 resolver: Arc::new(RepoIdResolver::detached(RuntimeHasher::default())),
2414 clock: Arc::new(SystemClock::new()),
2415 entropy: Arc::new(OsEntropy),
2416 ws: TungsteniteWs::shared(),
2417 cancel,
2418 disconnects: None,
2419 warming_shadow: None,
2420 warming_buffer: None,
2421 }
2422 }
2423
2424 #[tokio::test(start_paused = true)]
2425 async fn cancel_token_short_circuits_reconnect_sleep() {
2426 let cfg = IngestConfig::new(Url::parse("ws://127.0.0.1:1").unwrap());
2427 let cancel = CancellationToken::new();
2428 let runtime = fresh_runtime(cancel.clone());
2429 let task = tokio::spawn(async move { run(cfg, runtime).await });
2430 tokio::time::advance(Duration::from_millis(10)).await;
2431 cancel.cancel();
2432 let outcome = tokio::time::timeout(Duration::from_secs(1), task)
2433 .await
2434 .expect("ingest must stop within timeout once cancel fires");
2435 assert!(matches!(outcome, Ok(Ok(()))), "got {outcome:?}");
2436 }
2437
/// Samples `jittered` 50 times and checks every result lies in
/// `[base, base + 250ms]` for a one-second base.
#[test]
fn jittered_stays_within_one_quarter_of_base() {
    let base = Duration::from_secs(1);
    let cap = base + Duration::from_millis(250);
    let entropy = OsEntropy;
    for _ in 0..50 {
        let sample = jittered(base, &entropy);
        assert!(sample >= base, "jitter must not undershoot");
        assert!(
            sample <= cap,
            "jitter must not exceed +25%, got {:?}",
            sample
        );
    }
}
2449
2450 #[tokio::test]
2451 async fn star_after_observed_repo_keys_on_repo_did() {
2452 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2453 let repo: HydrantFrame = parse_frame(json!({
2454 "id": 1,
2455 "type": "record",
2456 "record": {
2457 "live": false,
2458 "did": "did:plc:nel",
2459 "rev": fresh_tid().as_str(),
2460 "collection": "sh.tangled.repo",
2461 "rkey": "abcabcabcabcz",
2462 "action": "create",
2463 "record": {
2464 "$type": "sh.tangled.repo",
2465 "createdAt": "2026-05-01T00:00:00Z",
2466 "knot": "oyster.cafe",
2467 "name": "abalone",
2468 "repoDid": "did:plc:abalone"
2469 }
2470 }
2471 }));
2472 handle_frame(
2473 repo,
2474 &store,
2475 &issue_states,
2476 &pull_statuses,
2477 &cov,
2478 &NoopSearchSink,
2479 &NoopRecordStore,
2480 &resolver,
2481 &sys_clock(),
2482 now(),
2483 )
2484 .await;
2485
2486 let star: HydrantFrame = parse_frame(json!({
2487 "id": 2,
2488 "type": "record",
2489 "record": {
2490 "live": false,
2491 "did": "did:plc:olaren",
2492 "rev": fresh_tid().as_str(),
2493 "collection": "sh.tangled.feed.star",
2494 "rkey": "abcabcabcabcz",
2495 "action": "create",
2496 "record": {
2497 "$type": "sh.tangled.feed.star",
2498 "createdAt": "2026-05-01T00:00:00Z",
2499 "subject": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"
2500 }
2501 }
2502 }));
2503 handle_frame(
2504 star,
2505 &store,
2506 &issue_states,
2507 &pull_statuses,
2508 &cov,
2509 &NoopSearchSink,
2510 &NoopRecordStore,
2511 &resolver,
2512 &sys_clock(),
2513 now(),
2514 )
2515 .await;
2516
2517 let nsid = Nsid::new_static("sh.tangled.feed.star").unwrap();
2518 let repo_keyed = bobbin_types::ids::EdgeKey::new(nsid.clone(), did_subj("did:plc:abalone"));
2519 let owner_keyed = bobbin_types::ids::EdgeKey::new(nsid, did_subj("did:plc:nel"));
2520 assert_eq!(
2521 store.count(&repo_keyed),
2522 1,
2523 "star should be keyed on repoDID once the repo is observed"
2524 );
2525 assert_eq!(
2526 store.count(&owner_keyed),
2527 0,
2528 "owner DID should not collect the edge"
2529 );
2530 }
2531
2532 #[tokio::test]
2533 async fn unresolvable_repo_subject_drops_edge() {
2534 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2535 let star: HydrantFrame = parse_frame(json!({
2536 "id": 1,
2537 "type": "record",
2538 "record": {
2539 "live": false,
2540 "did": "did:plc:olaren",
2541 "rev": fresh_tid().as_str(),
2542 "collection": "sh.tangled.feed.star",
2543 "rkey": "abcabcabcabcz",
2544 "action": "create",
2545 "record": {
2546 "$type": "sh.tangled.feed.star",
2547 "createdAt": "2026-05-01T00:00:00Z",
2548 "subject": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"
2549 }
2550 }
2551 }));
2552 handle_frame(
2553 star,
2554 &store,
2555 &issue_states,
2556 &pull_statuses,
2557 &cov,
2558 &NoopSearchSink,
2559 &NoopRecordStore,
2560 &resolver,
2561 &sys_clock(),
2562 now(),
2563 )
2564 .await;
2565
2566 let nsid = Nsid::new_static("sh.tangled.feed.star").unwrap();
2567 let owner_keyed = bobbin_types::ids::EdgeKey::new(nsid.clone(), did_subj("did:plc:nel"));
2568 let uri_keyed = bobbin_types::ids::EdgeKey::new(
2569 nsid,
2570 uri_subj("at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"),
2571 );
2572 assert_eq!(
2573 store.count(&owner_keyed),
2574 0,
2575 "must not silently misfile under the authoring DID",
2576 );
2577 assert_eq!(
2578 store.count(&uri_keyed),
2579 0,
2580 "unresolvable rkey-form subject must drop the edge; keeping it would index against a key that never matches bare-DID queries",
2581 );
2582 }
2583
2584 #[tokio::test]
2585 async fn repo_without_repo_did_drops_edge() {
2586 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2587 let repo: HydrantFrame = parse_frame(json!({
2588 "id": 1,
2589 "type": "record",
2590 "record": {
2591 "live": false,
2592 "did": "did:plc:nel",
2593 "rev": fresh_tid().as_str(),
2594 "collection": "sh.tangled.repo",
2595 "rkey": "abcabcabcabcz",
2596 "action": "create",
2597 "record": {
2598 "$type": "sh.tangled.repo",
2599 "createdAt": "2026-05-01T00:00:00Z",
2600 "knot": "oyster.cafe",
2601 "name": "abalone"
2602 }
2603 }
2604 }));
2605 handle_frame(
2606 repo,
2607 &store,
2608 &issue_states,
2609 &pull_statuses,
2610 &cov,
2611 &NoopSearchSink,
2612 &NoopRecordStore,
2613 &resolver,
2614 &sys_clock(),
2615 now(),
2616 )
2617 .await;
2618
2619 let star: HydrantFrame = parse_frame(json!({
2620 "id": 2,
2621 "type": "record",
2622 "record": {
2623 "live": false,
2624 "did": "did:plc:olaren",
2625 "rev": fresh_tid().as_str(),
2626 "collection": "sh.tangled.feed.star",
2627 "rkey": "abcabcabcabcz",
2628 "action": "create",
2629 "record": {
2630 "$type": "sh.tangled.feed.star",
2631 "createdAt": "2026-05-01T00:00:00Z",
2632 "subject": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"
2633 }
2634 }
2635 }));
2636 handle_frame(
2637 star,
2638 &store,
2639 &issue_states,
2640 &pull_statuses,
2641 &cov,
2642 &NoopSearchSink,
2643 &NoopRecordStore,
2644 &resolver,
2645 &sys_clock(),
2646 now(),
2647 )
2648 .await;
2649
2650 let nsid = Nsid::new_static("sh.tangled.feed.star").unwrap();
2651 let uri_keyed = bobbin_types::ids::EdgeKey::new(
2652 nsid.clone(),
2653 uri_subj("at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"),
2654 );
2655 let owner_keyed = bobbin_types::ids::EdgeKey::new(nsid, did_subj("did:plc:nel"));
2656 assert_eq!(
2657 store.count(&uri_keyed),
2658 0,
2659 "no canonical DID exists for a repo without repoDID, so the edge must be dropped",
2660 );
2661 assert_eq!(
2662 store.count(&owner_keyed),
2663 0,
2664 "the authoring DID is not the canonical repo identity",
2665 );
2666 }
2667
2668 #[tokio::test]
2669 async fn explicit_subject_did_skips_normalization() {
2670 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2671 let star: HydrantFrame = parse_frame(json!({
2672 "id": 1,
2673 "type": "record",
2674 "record": {
2675 "live": false,
2676 "did": "did:plc:olaren",
2677 "rev": fresh_tid().as_str(),
2678 "collection": "sh.tangled.feed.star",
2679 "rkey": "abcabcabcabcz",
2680 "action": "create",
2681 "record": {
2682 "$type": "sh.tangled.feed.star",
2683 "createdAt": "2026-05-01T00:00:00Z",
2684 "subjectDid": "did:plc:abalone"
2685 }
2686 }
2687 }));
2688 handle_frame(
2689 star,
2690 &store,
2691 &issue_states,
2692 &pull_statuses,
2693 &cov,
2694 &NoopSearchSink,
2695 &NoopRecordStore,
2696 &resolver,
2697 &sys_clock(),
2698 now(),
2699 )
2700 .await;
2701 let key = bobbin_types::ids::EdgeKey::new(
2702 Nsid::new_static("sh.tangled.feed.star").unwrap(),
2703 did_subj("did:plc:abalone"),
2704 );
2705 assert_eq!(store.count(&key), 1);
2706 }
2707
2708 #[tokio::test]
2709 async fn issue_with_repo_uri_resolves_to_repo_did() {
2710 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2711 resolver
2712 .observe(
2713 Did::new_owned("did:plc:nel").unwrap(),
2714 Rkey::new_owned("abcabcabcabcz").unwrap(),
2715 Some(Did::new_owned("did:plc:abalone").unwrap()),
2716 )
2717 .await;
2718 let issue: HydrantFrame = parse_frame(json!({
2719 "id": 1,
2720 "type": "record",
2721 "record": {
2722 "live": false,
2723 "did": "did:plc:olaren",
2724 "rev": fresh_tid().as_str(),
2725 "collection": "sh.tangled.repo.issue",
2726 "rkey": "abcabcabcabcz",
2727 "action": "create",
2728 "record": {
2729 "$type": "sh.tangled.repo.issue",
2730 "createdAt": "2026-05-01T00:00:00Z",
2731 "title": "bug",
2732 "repo": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"
2733 }
2734 }
2735 }));
2736 handle_frame(
2737 issue,
2738 &store,
2739 &issue_states,
2740 &pull_statuses,
2741 &cov,
2742 &NoopSearchSink,
2743 &NoopRecordStore,
2744 &resolver,
2745 &sys_clock(),
2746 now(),
2747 )
2748 .await;
2749 let key = bobbin_types::ids::EdgeKey::new(
2750 Nsid::new_static("sh.tangled.repo.issue").unwrap(),
2751 did_subj("did:plc:abalone"),
2752 );
2753 assert_eq!(store.count(&key), 1);
2754 }
2755
2756 #[derive(Default)]
2757 struct RecordingSearchSink {
2758 docs: tokio::sync::Mutex<Vec<bobbin_types::search::SearchDoc>>,
2759 }
2760
2761 impl SearchSink for RecordingSearchSink {
2762 async fn upsert(&self, doc: bobbin_types::search::SearchDoc) {
2763 self.docs.lock().await.push(doc);
2764 }
2765 async fn remove(&self, _uri: &AtUri<DefaultStr>) {}
2766 }
2767
2768 #[tokio::test]
2769 async fn search_index_hydrates_repo_did_via_resolver() {
2770 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2771 resolver
2772 .observe(
2773 Did::new_owned("did:plc:nel").unwrap(),
2774 Rkey::new_owned("abcabcabcabcz").unwrap(),
2775 Some(Did::new_owned("did:plc:abalone").unwrap()),
2776 )
2777 .await;
2778 let search = RecordingSearchSink::default();
2779 let issue: HydrantFrame = parse_frame(json!({
2780 "id": 1,
2781 "type": "record",
2782 "record": {
2783 "live": false,
2784 "did": "did:plc:olaren",
2785 "rev": fresh_tid().as_str(),
2786 "collection": "sh.tangled.repo.issue",
2787 "rkey": "abcabcabcabcz",
2788 "action": "create",
2789 "record": {
2790 "$type": "sh.tangled.repo.issue",
2791 "createdAt": "2026-05-01T00:00:00Z",
2792 "title": "bug",
2793 "repo": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"
2794 }
2795 }
2796 }));
2797 handle_frame(
2798 issue,
2799 &store,
2800 &issue_states,
2801 &pull_statuses,
2802 &cov,
2803 &search,
2804 &NoopRecordStore,
2805 &resolver,
2806 &sys_clock(),
2807 now(),
2808 )
2809 .await;
2810 let docs = search.docs.lock().await;
2811 assert_eq!(docs.len(), 1, "issue should produce one search doc");
2812 assert_eq!(
2813 docs[0].repo,
2814 Some(Did::new_owned("did:plc:abalone").unwrap()),
2815 "search doc repo field must be resolved from the observed repo, not left as None",
2816 );
2817 }
2818
2819 #[tokio::test]
2820 async fn delete_repo_record_evicts_resolver_cache() {
2821 let (store, issue_states, pull_statuses, cov, resolver) = fresh();
2822 let owner = Did::new_owned("did:plc:nel").unwrap();
2823 let rkey = Rkey::new_owned("abcabcabcabcz").unwrap();
2824 resolver
2825 .observe(
2826 owner.clone(),
2827 rkey.clone(),
2828 Some(Did::new_owned("did:plc:abalone").unwrap()),
2829 )
2830 .await;
2831 assert!(
2832 resolver.cached_resolution(&owner, &rkey).await.is_some(),
2833 "observe must seed the cache",
2834 );
2835 let delete: HydrantFrame = parse_frame(json!({
2836 "id": 1,
2837 "type": "record",
2838 "record": {
2839 "live": false,
2840 "did": owner.as_ref(),
2841 "rev": fresh_tid().as_str(),
2842 "collection": "sh.tangled.repo",
2843 "rkey": rkey.as_ref(),
2844 "action": "delete"
2845 }
2846 }));
2847 handle_frame(
2848 delete,
2849 &store,
2850 &issue_states,
2851 &pull_statuses,
2852 &cov,
2853 &NoopSearchSink,
2854 &NoopRecordStore,
2855 &resolver,
2856 &sys_clock(),
2857 now(),
2858 )
2859 .await;
2860 assert!(
2861 resolver.cached_resolution(&owner, &rkey).await.is_none(),
2862 "deleting the repo record must clear the resolver cache so future observes are not blocked by a stale Authoritative entry",
2863 );
2864 }
2865
2866 #[tokio::test]
2867 async fn cancel_short_circuits_a_hung_ws_connect() {
2868 let _listener = tokio::net::TcpListener::bind("127.0.0.1:0")
2869 .await
2870 .expect("bind sink listener");
2871 let port = _listener.local_addr().expect("local addr").port();
2872 let cfg =
2873 IngestConfig::new(Url::parse(&format!("ws://127.0.0.1:{port}")).expect("hydrant url"));
2874 let cancel = CancellationToken::new();
2875 let runtime = fresh_runtime(cancel.clone());
2876 let task = tokio::spawn(async move { run(cfg, runtime).await });
2877 tokio::time::sleep(Duration::from_millis(100)).await;
2878 cancel.cancel();
2879 let outcome = tokio::time::timeout(Duration::from_secs(2), task)
2880 .await
2881 .expect("cancel must short-circuit the hung ws connect");
2882 assert!(matches!(outcome, Ok(Ok(()))), "got {outcome:?}");
2883 }
2884
2885 struct CloseOnConnectTransport {
2886 used: std::sync::Mutex<bool>,
2887 }
2888 impl bobbin_runtime::WsTransport for CloseOnConnectTransport {
2889 fn connect(&self, _url: Url) -> bobbin_runtime::WsConnectFuture {
2890 let mut used = self.used.lock().unwrap();
2891 if *used {
2892 return Box::pin(async move {
2893 Err(NetworkError::Connect("only one connect allowed".to_owned()))
2894 });
2895 }
2896 *used = true;
2897 Box::pin(async move {
2898 let mut q = std::collections::VecDeque::new();
2899 q.push_back(Ok(WsMessage::Close {
2900 code: 1000,
2901 reason: "bye".to_owned(),
2902 }));
2903 let stream: Box<dyn WsStream> = Box::new(ScriptedWsStream { messages: q });
2904 struct NoopSink;
2905 impl bobbin_runtime::WsSink for NoopSink {
2906 fn send<'a>(&'a mut self, _m: WsMessage) -> bobbin_runtime::WsSendFuture<'a> {
2907 Box::pin(async move { Ok(()) })
2908 }
2909 }
2910 let sink: Box<dyn bobbin_runtime::WsSink> = Box::new(NoopSink);
2911 Ok(bobbin_runtime::WsConn { sink, stream })
2912 })
2913 }
2914 }
2915
2916 #[tokio::test]
2917 async fn run_session_returns_after_remote_close_when_outer_cancel_unfired() {
2918 let cfg = IngestConfig::new(Url::parse("ws://127.0.0.1:1").unwrap());
2919 let cancel = CancellationToken::new();
2920 let mut runtime = fresh_runtime(cancel.clone());
2921 runtime.ws = Arc::new(CloseOnConnectTransport {
2922 used: std::sync::Mutex::new(false),
2923 });
2924
2925 let res = tokio::time::timeout(
2926 Duration::from_secs(3),
2927 run_session(&cfg, HydrantCursor::new(0), &runtime),
2928 )
2929 .await;
2930 assert!(
2931 res.is_ok(),
2932 "run_session must return after a remote Close even when outer cancel never fires - regression for a session-scoped task hanging on the parent token",
2933 );
2934 }
2935
2936 struct HangingSink;
2937 impl bobbin_runtime::WsSink for HangingSink {
2938 fn send<'a>(&'a mut self, _: WsMessage) -> bobbin_runtime::WsSendFuture<'a> {
2939 Box::pin(std::future::pending())
2940 }
2941 }
2942
2943 #[tokio::test(start_paused = true)]
2944 async fn timed_send_surfaces_send_timeout_when_sink_pends_forever() {
2945 let mut sink: Box<dyn bobbin_runtime::WsSink> = Box::new(HangingSink);
2946 let task =
2947 tokio::spawn(async move { timed_send(&mut sink, WsMessage::Ping(Bytes::new())).await });
2948 tokio::time::advance(SEND_TIMEOUT + Duration::from_secs(1)).await;
2949 let result = task.await.expect("task panicked");
2950 match result {
2951 Err(IngestError::SendTimeout(d)) => assert_eq!(d, SEND_TIMEOUT),
2952 other => panic!(
2953 "expected SendTimeout, got {other:?}. A bare ws_sink.send.await would hang forever on a half-dead socket and starve the writer's pong-deadline arm",
2954 ),
2955 }
2956 }
2957
2958 struct OkSink;
2959 impl bobbin_runtime::WsSink for OkSink {
2960 fn send<'a>(&'a mut self, _: WsMessage) -> bobbin_runtime::WsSendFuture<'a> {
2961 Box::pin(async move { Ok(()) })
2962 }
2963 }
2964
2965 #[tokio::test]
2966 async fn timed_send_returns_ok_when_sink_succeeds_promptly() {
2967 let mut sink: Box<dyn bobbin_runtime::WsSink> = Box::new(OkSink);
2968 let result = timed_send(&mut sink, WsMessage::Ping(Bytes::new())).await;
2969 assert!(matches!(result, Ok(())), "got {result:?}");
2970 }
2971
/// A text frame with `type: "error"` must classify as the matching
/// `IngestError` even when it also carries an `id` field.
#[test]
fn classify_error_frame_with_id_dispatches_as_error_not_unknown_kind() {
    let error_frame = r#"{"id":42,"type":"error","error":"ConsumerTooSlow","message":"slow"}"#;
    let classified = classify_text_frame(error_frame);
    match classified {
        Err(IngestError::ConsumerTooSlow { message }) => assert_eq!(message, "slow"),
        other => panic!(
            "type=\"error\" must dispatch to the error path even when id is present, got: {other:?}"
        ),
    }
}
2982
2983 #[tokio::test]
2984 async fn buffered_pipeline_preserves_cursor_order_under_resolve_latency_skew() {
2985 use bobbin_record_lru::RecordStore;
2986 use bobbin_types::record::RecordBody;
2987 use std::sync::Mutex;
2988
2989 let server = wiremock::MockServer::start().await;
2990 wiremock::Mock::given(wiremock::matchers::method("GET"))
2991 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord"))
2992 .respond_with(
2993 wiremock::ResponseTemplate::new(404).set_delay(Duration::from_millis(150)),
2994 )
2995 .mount(&server)
2996 .await;
2997
2998 let client = bobbin_slingshot_client::SlingshotClient::with_default_http(
2999 Url::parse(&server.uri()).unwrap(),
3000 )
3001 .unwrap();
3002 let clock: Arc<dyn Clock> = Arc::new(SystemClock::new());
3003 let resolver = Arc::new(RepoIdResolver::with_slingshot(
3004 client,
3005 clock.clone(),
3006 RuntimeHasher::default(),
3007 ));
3008
3009 let owner = Did::new_owned("did:plc:nel").unwrap();
3010 let abalone = Did::new_owned("did:plc:abalone").unwrap();
3011 let fast_rkeys: [Rkey<DefaultStr>; 2] = [rkey("fastrkeyaa01"), rkey("fastrkeyaa02")];
3012 let slow_rkeys: [Rkey<DefaultStr>; 2] = [rkey("slowrkeyaa01"), rkey("slowrkeyaa02")];
3013 for r in &fast_rkeys {
3014 resolver
3015 .observe(owner.clone(), r.clone(), Some(abalone.clone()))
3016 .await;
3017 }
3018
3019 #[derive(Default)]
3020 struct Capturing {
3021 urls: Mutex<Vec<AtUri<DefaultStr>>>,
3022 }
3023 impl RecordStore for Capturing {
3024 fn get(&self, _uri: &AtUri<DefaultStr>) -> Option<Arc<RecordBody>> {
3025 None
3026 }
3027 fn put(&self, uri: AtUri<DefaultStr>, _body: Arc<RecordBody>) {
3028 self.urls.lock().unwrap().push(uri);
3029 }
3030 fn remove(&self, _uri: &AtUri<DefaultStr>) {}
3031 }
3032 let capturing = Arc::new(Capturing::default());
3033
3034 let runtime: IngestRuntime<NoopSearchSink> = IngestRuntime {
3035 store: Arc::new(EdgeStore::new(RuntimeHasher::default())),
3036 issue_states: Arc::new(StateIndex::new(RuntimeHasher::default())),
3037 pull_statuses: Arc::new(StateIndex::new(RuntimeHasher::default())),
3038 coverage: Arc::new(CoverageWatch::new()),
3039 search: Arc::new(NoopSearchSink),
3040 records: capturing.clone() as Arc<dyn RecordStore>,
3041 resolver,
3042 clock,
3043 entropy: Arc::new(OsEntropy),
3044 ws: TungsteniteWs::shared(),
3045 cancel: CancellationToken::new(),
3046 disconnects: None,
3047 warming_shadow: None,
3048 warming_buffer: None,
3049 };
3050
3051 let parallelism = 4usize;
3052 let (frame_tx, frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(64);
3053 let pipeline_rt = runtime.clone();
3054 let pipeline = tokio::spawn(async move {
3055 let prep_rt = pipeline_rt.clone();
3056 let resolve_rt = pipeline_rt.clone();
3057 let commit_rt = pipeline_rt;
3058 ReceiverStream::new(frame_rx)
3059 .then(move |frame| prep_stage(frame, prep_rt.clone()))
3060 .map(move |staged| resolve_stage(staged, resolve_rt.clone()))
3061 .buffered(parallelism)
3062 .for_each(move |staged| commit_stage(staged, commit_rt.clone(), parallelism))
3063 .await;
3064 });
3065
3066 let mk = |id: u64, idx: u64, repo_rkey: &Rkey<DefaultStr>| -> HydrantFrame {
3067 parse_frame(json!({
3068 "id": id,
3069 "type": "record",
3070 "record": {
3071 "live": false,
3072 "did": "did:plc:olaren",
3073 "rev": fresh_tid().as_str(),
3074 "collection": "sh.tangled.feed.star",
3075 "rkey": format!("starrkeya{idx:04}"),
3076 "action": "create",
3077 "cid": VALID_CID,
3078 "record": {
3079 "$type": "sh.tangled.feed.star",
3080 "createdAt": "2026-05-01T00:00:00Z",
3081 "subject": format!("at://did:plc:nel/sh.tangled.repo/{}", repo_rkey.as_ref()),
3082 }
3083 }
3084 }))
3085 };
3086
3087 frame_tx.send(mk(1, 1, &slow_rkeys[0])).await.unwrap();
3088 frame_tx.send(mk(2, 2, &fast_rkeys[0])).await.unwrap();
3089 frame_tx.send(mk(3, 3, &slow_rkeys[1])).await.unwrap();
3090 frame_tx.send(mk(4, 4, &fast_rkeys[1])).await.unwrap();
3091 drop(frame_tx);
3092
3093 pipeline.await.unwrap();
3094
3095 let captured = capturing.urls.lock().unwrap();
3096 let rkeys: Vec<String> = captured
3097 .iter()
3098 .filter_map(|uri| uri.rkey().map(|r| r.as_ref().to_owned()))
3099 .collect();
3100 assert_eq!(
3101 rkeys,
3102 vec![
3103 "starrkeya0001".to_owned(),
3104 "starrkeya0002".to_owned(),
3105 "starrkeya0003".to_owned(),
3106 "starrkeya0004".to_owned(),
3107 ],
3108 "buffered(N) must preserve cursor order even when resolves complete out of order, with ~150ms slow vs cache-hit fast as the latency skew here",
3109 );
3110 assert_eq!(runtime.coverage.snapshot().last_cursor().raw(), 4);
3111 }
3112}