Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1use std::collections::{HashSet, VecDeque}; 2use std::num::NonZeroUsize; 3use std::sync::Arc; 4use std::time::Duration; 5 6use bobbin_edge_index::{ 7 ApplyOutcome, Coverage, CoverageWatch, EdgeStore, HydrantCursor, IssueStateKind, 8 PromotionSignal, PullStatusKind, StateIndex, apply_record_state, 9}; 10use bobbin_record_lru::RecordStore; 11use bobbin_resolver::{NormalizeRepoRefs, decode_canon_or_upgrade_bytes, synthesize_created_at}; 12use bobbin_runtime::{ 13 Clock, Entropy, NetworkError, RuntimeHasher, UnixMicros, WsConn, WsMessage, WsStream, 14 WsTransport, 15}; 16use bobbin_types::edges::{Edge, ExtractError, Record}; 17use bobbin_types::ids::{RepoIdent, SubjectRef}; 18use bobbin_types::record::RecordBody; 19use bobbin_types::search::{SearchSink, SearchableRecord}; 20use bytes::Bytes; 21use futures::StreamExt; 22use jacquard_common::DefaultStr; 23use jacquard_common::types::did::Did; 24use jacquard_common::types::ident::AtIdentifier; 25use jacquard_common::types::nsid::Nsid; 26use jacquard_common::types::recordkey::Rkey; 27use jacquard_common::types::string::{AtStrError, AtUri, Cid}; 28use thiserror::Error; 29use tokio::time::Instant; 30use tokio_stream::wrappers::ReceiverStream; 31use tokio_util::sync::CancellationToken; 32use tracing::{debug, info, warn}; 33use url::Url; 34 35mod frame; 36mod resolver; 37mod shadow; 38mod warming; 39use frame::HydrantStreamErrorFrame; 40pub use frame::{FrameKind, HydrantFrame, RecordAction, RecordFrame}; 41pub use resolver::{RepoIdResolver, Resolution}; 42pub use shadow::{WarmingShadowBuffer, WarmingShadowSnapshot}; 43pub use warming::{ParkedUpsert, WarmingBuffer, WarmingBufferSnapshot}; 44 45const TANGLED_PREFIX: &str = "sh.tangled."; 46const RECONNECT_INITIAL_DELAY: Duration = Duration::from_millis(500); 47const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(30); 48const PING_INTERVAL: Duration = Duration::from_secs(20); 49const PONG_TIMEOUT: Duration = Duration::from_secs(15); 50const READY_SKEW: Duration = 
Duration::from_secs(60); 51const FRAME_CHANNEL_DEPTH: usize = 256; 52const CONTROL_CHANNEL_DEPTH: usize = 16; 53const READER_HOLD_LIMIT: usize = 64; 54const SEND_TIMEOUT: Duration = Duration::from_secs(10); 55const NORMAL_CLOSE: u16 = 1000; 56const METRICS_DUMP_INTERVAL: Duration = Duration::from_secs(10); 57const WARMING_FLUSH_PARALLELISM: usize = 64; 58pub const DEFAULT_INGEST_PARALLELISM: NonZeroUsize = match NonZeroUsize::new(16) { 59 Some(n) => n, 60 None => unreachable!(), 61}; 62 63#[derive(Clone, Debug)] 64pub struct IngestConfig { 65 pub hydrant_base: Url, 66 pub start_cursor: HydrantCursor, 67 pub parallelism: NonZeroUsize, 68} 69 70impl IngestConfig { 71 pub fn new(hydrant_base: Url) -> Self { 72 Self { 73 hydrant_base, 74 start_cursor: HydrantCursor::new(0), 75 parallelism: DEFAULT_INGEST_PARALLELISM, 76 } 77 } 78 79 fn stream_url(&self, cursor: HydrantCursor) -> Result<Url, IngestError> { 80 let mut url = self.hydrant_base.clone(); 81 match url.scheme() { 82 "http" => url 83 .set_scheme("ws") 84 .map_err(|_| IngestError::Url("set ws scheme"))?, 85 "https" => url 86 .set_scheme("wss") 87 .map_err(|_| IngestError::Url("set wss scheme"))?, 88 "ws" | "wss" => {} 89 other => return Err(IngestError::UnknownScheme(other.to_owned())), 90 } 91 url.set_path("/stream"); 92 url.query_pairs_mut() 93 .clear() 94 .append_pair("cursor", &cursor.raw().to_string()); 95 Ok(url) 96 } 97} 98 99#[derive(Debug, Error)] 100pub enum IngestError { 101 #[error("invalid hydrant url: {0}")] 102 Url(&'static str), 103 #[error("unsupported url scheme: {0}")] 104 UnknownScheme(String), 105 #[error("network: {0}")] 106 Network(#[from] NetworkError), 107 #[error("frame decode: {0}")] 108 Decode(#[from] serde_json::Error), 109 #[error("invalid at-uri synthesized from frame: {0}")] 110 InvalidAtUri(#[from] AtStrError), 111 #[error("record extraction: {0}")] 112 Extract(#[from] ExtractError), 113 #[error("hydrant did not respond to ping within {0:?}")] 114 PongTimeout(Duration), 115 
#[error("websocket send blocked for at least {0:?}, treating link as dead")] 116 SendTimeout(Duration), 117 #[error("hydrant disconnected because bobbin's stream consumer fell behind: {message}")] 118 ConsumerTooSlow { message: String }, 119 #[error("hydrant signaled stream error {code}: {message}")] 120 HydrantStream { code: String, message: String }, 121} 122 123#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] 124pub enum DisconnectKind { 125 Url, 126 UnknownScheme, 127 Network, 128 Decode, 129 InvalidAtUri, 130 Extract, 131 PongTimeout, 132 SendTimeout, 133 ConsumerTooSlow, 134 HydrantStream, 135} 136 137impl DisconnectKind { 138 pub fn from_error(err: &IngestError) -> Self { 139 match err { 140 IngestError::Url(_) => Self::Url, 141 IngestError::UnknownScheme(_) => Self::UnknownScheme, 142 IngestError::Network(_) => Self::Network, 143 IngestError::Decode(_) => Self::Decode, 144 IngestError::InvalidAtUri(_) => Self::InvalidAtUri, 145 IngestError::Extract(_) => Self::Extract, 146 IngestError::PongTimeout(_) => Self::PongTimeout, 147 IngestError::SendTimeout(_) => Self::SendTimeout, 148 IngestError::ConsumerTooSlow { .. } => Self::ConsumerTooSlow, 149 IngestError::HydrantStream { .. 
} => Self::HydrantStream, 150 } 151 } 152} 153 154#[derive(Clone, Debug, Eq, PartialEq)] 155pub struct DisconnectSnapshot { 156 pub kind: DisconnectKind, 157 pub message: String, 158 pub at_unix_micros: UnixMicros, 159 pub last_cursor: HydrantCursor, 160} 161 162#[derive(Default)] 163pub struct DisconnectSink { 164 last: std::sync::Mutex<Option<DisconnectSnapshot>>, 165 count: std::sync::atomic::AtomicU64, 166} 167 168impl DisconnectSink { 169 pub fn new() -> Self { 170 Self::default() 171 } 172 173 pub fn record(&self, snap: DisconnectSnapshot) { 174 *self.last.lock().expect("disconnect sink mutex poisoned") = Some(snap); 175 self.count 176 .fetch_add(1, std::sync::atomic::Ordering::Relaxed); 177 } 178 179 pub fn snapshot(&self) -> Option<DisconnectSnapshot> { 180 self.last 181 .lock() 182 .expect("disconnect sink mutex poisoned") 183 .clone() 184 } 185 186 pub fn count(&self) -> u64 { 187 self.count.load(std::sync::atomic::Ordering::Relaxed) 188 } 189} 190 191#[derive(Clone, Copy, Debug, Eq, PartialEq)] 192enum SessionOutcome { 193 Progressed, 194 Empty, 195} 196 197#[derive(Debug)] 198struct SessionEnd { 199 outcome: SessionOutcome, 200 error: Option<IngestError>, 201} 202 203pub struct IngestRuntime<S: SearchSink + 'static> { 204 pub store: Arc<EdgeStore>, 205 pub issue_states: Arc<StateIndex<IssueStateKind>>, 206 pub pull_statuses: Arc<StateIndex<PullStatusKind>>, 207 pub coverage: Arc<CoverageWatch>, 208 pub search: Arc<S>, 209 pub records: Arc<dyn RecordStore>, 210 pub resolver: Arc<RepoIdResolver>, 211 pub clock: Arc<dyn Clock>, 212 pub entropy: Arc<dyn Entropy>, 213 pub ws: Arc<dyn WsTransport>, 214 pub cancel: CancellationToken, 215 pub disconnects: Option<Arc<DisconnectSink>>, 216 pub warming_shadow: Option<Arc<WarmingShadowBuffer>>, 217 pub warming_buffer: Option<Arc<WarmingBuffer>>, 218} 219 220impl<S: SearchSink + 'static> Clone for IngestRuntime<S> { 221 fn clone(&self) -> Self { 222 Self { 223 store: self.store.clone(), 224 issue_states: 
self.issue_states.clone(), 225 pull_statuses: self.pull_statuses.clone(), 226 coverage: self.coverage.clone(), 227 search: self.search.clone(), 228 records: self.records.clone(), 229 resolver: self.resolver.clone(), 230 clock: self.clock.clone(), 231 entropy: self.entropy.clone(), 232 ws: self.ws.clone(), 233 cancel: self.cancel.clone(), 234 disconnects: self.disconnects.clone(), 235 warming_shadow: self.warming_shadow.clone(), 236 warming_buffer: self.warming_buffer.clone(), 237 } 238 } 239} 240 241impl<S: SearchSink + 'static> IngestRuntime<S> { 242 fn pipeline_ctx(&self) -> PipelineCtx<'_, S> { 243 PipelineCtx { 244 resolver: &self.resolver, 245 store: &self.store, 246 issue_states: &self.issue_states, 247 pull_statuses: &self.pull_statuses, 248 coverage: &self.coverage, 249 records: &*self.records, 250 search: &self.search, 251 shadow: self.warming_shadow.as_deref(), 252 buffer: self.warming_buffer.as_deref(), 253 } 254 } 255} 256 257struct PipelineCtx<'a, S: SearchSink + 'static> { 258 resolver: &'a RepoIdResolver, 259 store: &'a EdgeStore, 260 issue_states: &'a StateIndex<IssueStateKind>, 261 pull_statuses: &'a StateIndex<PullStatusKind>, 262 coverage: &'a CoverageWatch, 263 records: &'a dyn RecordStore, 264 search: &'a S, 265 shadow: Option<&'a WarmingShadowBuffer>, 266 buffer: Option<&'a WarmingBuffer>, 267} 268 269pub async fn run<S: SearchSink + 'static>( 270 config: IngestConfig, 271 runtime: IngestRuntime<S>, 272) -> Result<(), IngestError> { 273 let metrics_dumper = spawn_metrics_dumper(&runtime); 274 let idle_promoter = spawn_idle_promoter(&runtime); 275 let warming_flusher = spawn_warming_flusher(&runtime); 276 let result = run_inner(config, &runtime).await; 277 if let Err(join) = metrics_dumper.await { 278 warn!(?join, "metrics dumper task panicked"); 279 } 280 if let Err(join) = idle_promoter.await { 281 warn!(?join, "idle promoter task panicked"); 282 } 283 if let Some(handle) = warming_flusher 284 && let Err(join) = handle.await 285 { 286 
warn!(?join, "warming flusher task panicked"); 287 } 288 result 289} 290 291async fn run_inner<S: SearchSink + 'static>( 292 config: IngestConfig, 293 runtime: &IngestRuntime<S>, 294) -> Result<(), IngestError> { 295 let mut backoff = RECONNECT_INITIAL_DELAY; 296 loop { 297 let cursor = next_connect_cursor(runtime.coverage.snapshot(), config.start_cursor); 298 let SessionEnd { outcome, error } = run_session(&config, cursor, runtime).await; 299 if runtime.cancel.is_cancelled() { 300 info!( 301 last_cursor = runtime.coverage.snapshot().last_cursor().raw(), 302 "ingest stopped after shutdown signal" 303 ); 304 return Ok(()); 305 } 306 match (outcome, &error) { 307 (SessionOutcome::Progressed, None) => { 308 info!("hydrant stream closed after delivering frames, reconnecting") 309 } 310 (SessionOutcome::Empty, None) => { 311 warn!("hydrant stream closed without delivering frames") 312 } 313 (_, Some(err)) => warn!(?err, "hydrant stream errored"), 314 } 315 if let (Some(sink), Some(err)) = (runtime.disconnects.as_ref(), error.as_ref()) { 316 sink.record(DisconnectSnapshot { 317 kind: DisconnectKind::from_error(err), 318 message: err.to_string(), 319 at_unix_micros: runtime.clock.now_unix_micros(), 320 last_cursor: runtime.coverage.snapshot().last_cursor(), 321 }); 322 } 323 let made_progress = matches!(outcome, SessionOutcome::Progressed); 324 if made_progress { 325 backoff = RECONNECT_INITIAL_DELAY; 326 } else { 327 tokio::select! 
{ 328 biased; 329 _ = runtime.cancel.cancelled() => return Ok(()), 330 _ = runtime.clock.sleep(jittered(backoff, &*runtime.entropy)) => {} 331 } 332 backoff = (backoff * 2).min(RECONNECT_MAX_DELAY); 333 } 334 } 335} 336 337fn spawn_warming_flusher<S: SearchSink + 'static>( 338 runtime: &IngestRuntime<S>, 339) -> Option<tokio::task::JoinHandle<()>> { 340 runtime.warming_buffer.as_ref()?; 341 let rt = runtime.clone(); 342 Some(tokio::spawn(async move { 343 let buffer = rt 344 .warming_buffer 345 .as_deref() 346 .expect("warming flusher only spawns when buffer is set"); 347 let mut rx = rt.coverage.subscribe(); 348 let reason = loop { 349 if rx.borrow_and_update().is_ready() { 350 break FlushReason::Ready; 351 } 352 tokio::select! { 353 biased; 354 _ = rt.cancel.cancelled() => break FlushReason::Cancelled, 355 res = rx.changed() => match res { 356 Ok(()) => continue, 357 Err(_) => break FlushReason::CoverageDropped, 358 }, 359 } 360 }; 361 flush_warming_buffer(&rt, buffer, reason).await; 362 })) 363} 364 365#[derive(Clone, Copy, Debug, Eq, PartialEq)] 366enum FlushReason { 367 Ready, 368 Cancelled, 369 CoverageDropped, 370} 371 372impl FlushReason { 373 fn as_str(self) -> &'static str { 374 match self { 375 FlushReason::Ready => "ready", 376 FlushReason::Cancelled => "cancelled", 377 FlushReason::CoverageDropped => "coverage_dropped", 378 } 379 } 380} 381 382async fn flush_warming_buffer<S: SearchSink + 'static>( 383 runtime: &IngestRuntime<S>, 384 buffer: &WarmingBuffer, 385 reason: FlushReason, 386) { 387 let drained = buffer.drain_for_promote().await; 388 if drained.is_empty() { 389 return; 390 } 391 if reason != FlushReason::Ready { 392 info!( 393 target: "bobbin_ingest::warming", 394 abandoned_entries = drained.len(), 395 reason = reason.as_str(), 396 "abandoning parked items on non-ready flush", 397 ); 398 return; 399 } 400 let hasher = buffer.hasher().clone(); 401 let unique: HashSet<RepoIdent, RuntimeHasher> = drained 402 .iter() 403 .flat_map(|(_, deps)| 
deps.iter().cloned()) 404 .fold(HashSet::with_hasher(hasher), |mut acc, dep| { 405 acc.insert(dep); 406 acc 407 }); 408 if !unique.is_empty() { 409 let resolver = runtime.resolver.clone(); 410 let _: Vec<()> = futures::stream::iter(unique) 411 .map(|key| { 412 let resolver = resolver.clone(); 413 async move { 414 let _ = resolver.resolve(&key.owner, &key.rkey).await; 415 } 416 }) 417 .buffer_unordered(WARMING_FLUSH_PARALLELISM) 418 .collect() 419 .await; 420 } 421 let upserts: Vec<ParkedUpsert> = drained.into_iter().map(|(u, _)| u).collect(); 422 let count = upserts.len(); 423 let ctx = runtime.pipeline_ctx(); 424 finalize_drained(&ctx, upserts).await; 425 info!( 426 target: "bobbin_ingest::warming", 427 flushed_entries = count, 428 reason = reason.as_str(), 429 "drained warming buffer", 430 ); 431} 432 433const IDLE_PROMOTE_WINDOW: Duration = Duration::from_secs(15); 434const IDLE_PROMOTE_MIN_EVENTS: u64 = 256; 435 436fn spawn_idle_promoter<S: SearchSink + 'static>( 437 runtime: &IngestRuntime<S>, 438) -> tokio::task::JoinHandle<()> { 439 let rt = runtime.clone(); 440 tokio::spawn(async move { 441 let mut prev = rt.coverage.snapshot().events_processed(); 442 loop { 443 tokio::select! 
{ 444 biased; 445 _ = rt.cancel.cancelled() => return, 446 _ = rt.clock.sleep(IDLE_PROMOTE_WINDOW) => {} 447 } 448 let snap = rt.coverage.snapshot(); 449 if snap.is_ready() { 450 return; 451 } 452 let processed = snap.events_processed(); 453 if processed >= IDLE_PROMOTE_MIN_EVENTS && processed == prev { 454 rt.coverage.update(|c| c.force_ready()); 455 info!( 456 target: "bobbin_ingest::coverage", 457 events_processed = processed, 458 last_cursor = snap.last_cursor().raw(), 459 "stream idle, promoting coverage to ready", 460 ); 461 return; 462 } 463 prev = processed; 464 } 465 }) 466} 467 468fn spawn_metrics_dumper<S: SearchSink + 'static>( 469 runtime: &IngestRuntime<S>, 470) -> tokio::task::JoinHandle<()> { 471 let rt = runtime.clone(); 472 tokio::spawn(async move { 473 loop { 474 tokio::select! { 475 biased; 476 _ = rt.cancel.cancelled() => break, 477 _ = rt.clock.sleep(METRICS_DUMP_INTERVAL) => { 478 let s = rt.resolver.stats(); 479 info!( 480 target: "bobbin_ingest::metrics", 481 resolver_hits = s.hits, 482 resolver_misses_mapped = s.misses_mapped, 483 resolver_misses_no_repo_did = s.misses_no_repo_did, 484 resolver_misses_unresolvable = s.misses_unresolvable, 485 resolver_misses_transient = s.misses_transient, 486 resolver_misses_no_client = s.misses_no_client, 487 resolver_miss_latency_micros_avg = s.miss_latency_micros_avg().unwrap_or(0), 488 resolver_miss_latency_micros_max = s.miss_latency_micros_max, 489 resolver_total = s.total(), 490 "resolver stats", 491 ); 492 } 493 } 494 } 495 }) 496} 497 498fn next_connect_cursor(snapshot: Coverage, start: HydrantCursor) -> HydrantCursor { 499 if snapshot.events_processed() == 0 { 500 start 501 } else { 502 HydrantCursor::new(snapshot.last_cursor().raw().saturating_add(1)) 503 } 504} 505 506fn jittered(base: Duration, entropy: &dyn Entropy) -> Duration { 507 let base_ms = u64::try_from(base.as_millis()).unwrap_or(u64::MAX); 508 let cap_ms = (base_ms / 4).max(1); 509 base + Duration::from_millis(entropy.next_u64() % 
cap_ms) 510} 511 512async fn run_session<S: SearchSink + 'static>( 513 config: &IngestConfig, 514 cursor: HydrantCursor, 515 runtime: &IngestRuntime<S>, 516) -> SessionEnd { 517 let url = match config.stream_url(cursor) { 518 Ok(u) => u, 519 Err(e) => { 520 return SessionEnd { 521 outcome: SessionOutcome::Empty, 522 error: Some(e), 523 }; 524 } 525 }; 526 info!(%url, "connecting to hydrant /stream"); 527 let connect = tokio::select! { 528 biased; 529 _ = runtime.cancel.cancelled() => { 530 return SessionEnd { outcome: SessionOutcome::Empty, error: None }; 531 } 532 res = runtime.ws.connect(url) => res, 533 }; 534 let WsConn { 535 sink: mut ws_sink, 536 stream: ws_stream, 537 } = match connect { 538 Ok(c) => c, 539 Err(e) => { 540 return SessionEnd { 541 outcome: SessionOutcome::Empty, 542 error: Some(IngestError::Network(e)), 543 }; 544 } 545 }; 546 547 let (frame_tx, frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(FRAME_CHANNEL_DEPTH); 548 let parallelism = config.parallelism.get(); 549 let processor_runtime = runtime.clone(); 550 let processor = tokio::spawn(async move { 551 let cancel = processor_runtime.cancel.clone(); 552 let prep_rt = processor_runtime.clone(); 553 let resolve_rt = processor_runtime.clone(); 554 let commit_rt = processor_runtime; 555 556 let pipeline = ReceiverStream::new(frame_rx) 557 .map(move |frame| prep_stage(frame, prep_rt.clone())) 558 .buffered(parallelism) 559 .map(move |staged| resolve_stage(staged, resolve_rt.clone())) 560 .buffered(parallelism) 561 .for_each(move |staged| commit_stage(staged, commit_rt.clone(), parallelism)); 562 563 tokio::select! 
{ 564 biased; 565 _ = cancel.cancelled() => {}, 566 _ = pipeline => {}, 567 } 568 }); 569 570 let (control_tx, mut control_rx) = tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH); 571 let session_cancel = runtime.cancel.child_token(); 572 let reader_cancel = session_cancel.clone(); 573 let reader = tokio::spawn(reader_loop(ws_stream, frame_tx, control_tx, reader_cancel)); 574 575 let mut next_ping = runtime.clock.now_instant() + PING_INTERVAL; 576 let mut pong_deadline: Option<Instant> = None; 577 578 let writer_error: Option<IngestError> = loop { 579 tokio::select! { 580 biased; 581 _ = runtime.cancel.cancelled() => { 582 let _ = timed_send( 583 &mut ws_sink, 584 WsMessage::Close { code: NORMAL_CLOSE, reason: "bobbin shutdown".to_owned() }, 585 ).await; 586 break None; 587 } 588 _ = runtime.clock.sleep_until(next_ping) => { 589 next_ping = runtime.clock.now_instant() + PING_INTERVAL; 590 if pong_deadline.is_none() { 591 if let Err(e) = timed_send(&mut ws_sink, WsMessage::Ping(Bytes::new())).await { 592 break Some(e); 593 } 594 pong_deadline = Some(runtime.clock.now_instant() + PONG_TIMEOUT); 595 } 596 } 597 _ = wait_until(pong_deadline, runtime.clock.as_ref()) => { 598 break Some(IngestError::PongTimeout(PONG_TIMEOUT)); 599 } 600 evt = control_rx.recv() => { 601 let Some(evt) = evt else { break None; }; 602 match evt { 603 WsEvent::IncomingPing(payload) => { 604 if let Err(e) = timed_send(&mut ws_sink, WsMessage::Pong(payload)).await { 605 break Some(e); 606 } 607 } 608 WsEvent::IncomingPong => { 609 pong_deadline = None; 610 } 611 } 612 } 613 } 614 }; 615 616 session_cancel.cancel(); 617 drop(ws_sink); 618 drop(control_rx); 619 620 let reader_end = reader.await.unwrap_or(SessionEnd { 621 outcome: SessionOutcome::Empty, 622 error: None, 623 }); 624 if let Err(join) = processor.await { 625 warn!(?join, "frame processor task panicked"); 626 } 627 SessionEnd { 628 outcome: reader_end.outcome, 629 error: writer_error.or(reader_end.error), 630 } 631} 632 
633async fn timed_send( 634 sink: &mut Box<dyn bobbin_runtime::WsSink>, 635 msg: WsMessage, 636) -> Result<(), IngestError> { 637 match tokio::time::timeout(SEND_TIMEOUT, sink.send(msg)).await { 638 Ok(Ok(())) => Ok(()), 639 Ok(Err(e)) => Err(IngestError::Network(e)), 640 Err(_elapsed) => Err(IngestError::SendTimeout(SEND_TIMEOUT)), 641 } 642} 643 644async fn reader_loop( 645 mut ws_stream: Box<dyn WsStream>, 646 frame_tx: tokio::sync::mpsc::Sender<HydrantFrame>, 647 control_tx: tokio::sync::mpsc::Sender<WsEvent>, 648 cancel: CancellationToken, 649) -> SessionEnd { 650 let mut outcome = SessionOutcome::Empty; 651 let mut held: VecDeque<HydrantFrame> = VecDeque::new(); 652 653 let error: Option<IngestError> = loop { 654 let step = if held.is_empty() { 655 tokio::select! { 656 biased; 657 _ = cancel.cancelled() => ReaderStep::Cancelled, 658 msg = ws_stream.next() => ReaderStep::WsMessage(msg), 659 } 660 } else if held.len() < READER_HOLD_LIMIT { 661 tokio::select! { 662 biased; 663 _ = cancel.cancelled() => ReaderStep::Cancelled, 664 permit_result = frame_tx.reserve() => match permit_result { 665 Ok(permit) => { 666 let frame = held 667 .pop_front() 668 .expect("non-empty held when reserve succeeds"); 669 permit.send(frame); 670 ReaderStep::Sent 671 } 672 Err(_) => ReaderStep::FrameSinkClosed, 673 }, 674 msg = ws_stream.next() => ReaderStep::WsMessage(msg), 675 } 676 } else { 677 tokio::select! 
{ 678 biased; 679 _ = cancel.cancelled() => ReaderStep::Cancelled, 680 permit_result = frame_tx.reserve() => match permit_result { 681 Ok(permit) => { 682 let frame = held 683 .pop_front() 684 .expect("held at limit when reserve succeeds"); 685 permit.send(frame); 686 ReaderStep::Sent 687 } 688 Err(_) => ReaderStep::FrameSinkClosed, 689 }, 690 } 691 }; 692 693 match step { 694 ReaderStep::Cancelled => break None, 695 ReaderStep::FrameSinkClosed => break None, 696 ReaderStep::Sent => { 697 outcome = SessionOutcome::Progressed; 698 } 699 ReaderStep::WsMessage(msg) => { 700 let Some(msg) = msg else { 701 break None; 702 }; 703 let parsed = match msg { 704 Ok(m) => m, 705 Err(e) => break Some(IngestError::Network(e)), 706 }; 707 match parsed { 708 WsMessage::Text(text) => { 709 let frame = match classify_text_frame(&text) { 710 Ok(f) => f, 711 Err(e) => break Some(e), 712 }; 713 held.push_back(frame); 714 } 715 WsMessage::Binary(_) => { 716 debug!("hydrant sent unexpected binary frame, ignoring"); 717 } 718 WsMessage::Ping(payload) => { 719 if control_tx 720 .send(WsEvent::IncomingPing(payload)) 721 .await 722 .is_err() 723 { 724 break None; 725 } 726 } 727 WsMessage::Pong(_) => { 728 if control_tx.send(WsEvent::IncomingPong).await.is_err() { 729 break None; 730 } 731 } 732 WsMessage::Close { code, reason } => { 733 debug!(code, %reason, "hydrant closed stream"); 734 break None; 735 } 736 } 737 } 738 } 739 }; 740 SessionEnd { outcome, error } 741} 742 743enum ReaderStep { 744 Cancelled, 745 FrameSinkClosed, 746 Sent, 747 WsMessage(Option<Result<WsMessage, NetworkError>>), 748} 749 750fn classify_text_frame(text: &str) -> Result<HydrantFrame, IngestError> { 751 #[derive(serde::Deserialize)] 752 struct PeekType<'a> { 753 #[serde(rename = "type", borrow)] 754 kind: Option<std::borrow::Cow<'a, str>>, 755 } 756 let is_error_frame = serde_json::from_str::<PeekType>(text) 757 .ok() 758 .and_then(|p| p.kind) 759 .as_deref() 760 == Some("error"); 761 if is_error_frame { 762 
return match serde_json::from_str::<HydrantStreamErrorFrame>(text) { 763 Ok(err_frame) => Err(classify_hydrant_error(err_frame)), 764 Err(decode_err) => Err(IngestError::Decode(decode_err)), 765 }; 766 } 767 serde_json::from_str::<HydrantFrame>(text).map_err(IngestError::Decode) 768} 769 770fn classify_hydrant_error(frame: HydrantStreamErrorFrame) -> IngestError { 771 let HydrantStreamErrorFrame { error, message } = frame; 772 let message = message.unwrap_or_default(); 773 match error.as_str() { 774 "ConsumerTooSlow" => IngestError::ConsumerTooSlow { message }, 775 _ => IngestError::HydrantStream { 776 code: error, 777 message, 778 }, 779 } 780} 781 782#[derive(Debug)] 783enum WsEvent { 784 IncomingPing(Bytes), 785 IncomingPong, 786} 787 788async fn wait_until(deadline: Option<Instant>, clock: &dyn Clock) { 789 match deadline { 790 Some(d) => clock.sleep_until(d).await, 791 None => std::future::pending::<()>().await, 792 } 793} 794 795#[derive(Clone, Copy, Debug, Eq, PartialEq)] 796enum Regime { 797 Replay, 798 Live, 799 NonRecord, 800} 801 802impl Regime { 803 fn as_str(self) -> &'static str { 804 match self { 805 Self::Replay => "replay", 806 Self::Live => "live", 807 Self::NonRecord => "non_record", 808 } 809 } 810} 811 812struct Pending { 813 cursor: HydrantCursor, 814 signal: PromotionSignal, 815 regime: Regime, 816 op: PendingOp, 817} 818 819enum PendingOp { 820 Noop, 821 ClearCache { 822 source: AtUri<DefaultStr>, 823 }, 824 Upsert { 825 source: AtUri<DefaultStr>, 826 nsid: Nsid<DefaultStr>, 827 parsed: Box<Record>, 828 bytes: Bytes, 829 cid: Option<Cid<DefaultStr>>, 830 edges: Vec<Edge>, 831 }, 832 Parked { 833 nsid: Nsid<DefaultStr>, 834 }, 835 Delete { 836 source: AtUri<DefaultStr>, 837 nsid: Nsid<DefaultStr>, 838 }, 839} 840 841struct Prepared { 842 pending: Pending, 843 prepare_start: Instant, 844 prepare_end: Instant, 845} 846 847struct Resolved { 848 pending: Pending, 849 prepare_start: Instant, 850 prepare_end: Instant, 851 resolve_start: Instant, 
852 resolve_end: Instant, 853} 854 855fn pending_nsid(op: &PendingOp) -> Option<&Nsid<DefaultStr>> { 856 match op { 857 PendingOp::Upsert { nsid, .. } => Some(nsid), 858 PendingOp::Delete { nsid, .. } => Some(nsid), 859 PendingOp::Parked { nsid, .. } => Some(nsid), 860 PendingOp::Noop | PendingOp::ClearCache { .. } => None, 861 } 862} 863 864fn pending_edge_count(op: &PendingOp) -> u64 { 865 match op { 866 PendingOp::Upsert { edges, .. } => edges.len() as u64, 867 _ => 0, 868 } 869} 870 871async fn prep_stage<S: SearchSink + 'static>( 872 frame: HydrantFrame, 873 rt: IngestRuntime<S>, 874) -> Prepared { 875 let now = rt.clock.now_unix_micros(); 876 let prepare_start = rt.clock.now_instant(); 877 let ctx = rt.pipeline_ctx(); 878 let pending = prepare_frame(frame, &ctx, now).await; 879 let prepare_end = rt.clock.now_instant(); 880 Prepared { 881 pending, 882 prepare_start, 883 prepare_end, 884 } 885} 886 887async fn resolve_stage<S: SearchSink + 'static>( 888 staged: Prepared, 889 rt: IngestRuntime<S>, 890) -> Resolved { 891 let resolve_start = rt.clock.now_instant(); 892 let ctx = rt.pipeline_ctx(); 893 let pending = resolve_pending(staged.pending, &ctx).await; 894 let resolve_end = rt.clock.now_instant(); 895 Resolved { 896 pending, 897 prepare_start: staged.prepare_start, 898 prepare_end: staged.prepare_end, 899 resolve_start, 900 resolve_end, 901 } 902} 903 904async fn commit_stage<S: SearchSink + 'static>( 905 staged: Resolved, 906 rt: IngestRuntime<S>, 907 parallelism: usize, 908) { 909 let nsid = pending_nsid(&staged.pending.op).cloned(); 910 let edge_count = pending_edge_count(&staged.pending.op); 911 let regime = staged.pending.regime; 912 let cursor = staged.pending.cursor.raw(); 913 let Resolved { 914 pending, 915 prepare_start, 916 prepare_end, 917 resolve_start, 918 resolve_end, 919 } = staged; 920 let commit_start = rt.clock.now_instant(); 921 commit_pending( 922 pending, 923 &rt.store, 924 &rt.issue_states, 925 &rt.pull_statuses, 926 &rt.coverage, 927 
&*rt.search, 928 &*rt.records, 929 &rt.resolver, 930 ) 931 .await; 932 let commit_end = rt.clock.now_instant(); 933 tracing::trace!( 934 target: "bobbin_ingest::stage", 935 cursor, 936 regime = regime.as_str(), 937 nsid = %nsid.as_ref().map(Nsid::as_str).unwrap_or(""), 938 edge_count, 939 prepare_us = prepare_end.duration_since(prepare_start).as_micros() as u64, 940 queue_resolve_wait_us = resolve_start.duration_since(prepare_end).as_micros() as u64, 941 resolve_us = resolve_end.duration_since(resolve_start).as_micros() as u64, 942 queue_commit_wait_us = commit_start.duration_since(resolve_end).as_micros() as u64, 943 commit_us = commit_end.duration_since(commit_start).as_micros() as u64, 944 total_us = commit_end.duration_since(prepare_start).as_micros() as u64, 945 parallelism, 946 "pipeline stage timings", 947 ); 948} 949 950async fn prepare_frame<S: SearchSink + 'static>( 951 frame: HydrantFrame, 952 ctx: &PipelineCtx<'_, S>, 953 now: UnixMicros, 954) -> Pending { 955 let cursor = HydrantCursor::new(frame.id); 956 let signal = promotion_signal(frame.record.as_ref(), now); 957 let regime = match frame.record.as_ref() { 958 Some(r) if r.live => Regime::Live, 959 Some(_) => Regime::Replay, 960 None => Regime::NonRecord, 961 }; 962 let op = match frame.kind { 963 FrameKind::Record => prepare_record(frame.record, ctx).await, 964 FrameKind::Identity | FrameKind::Account => PendingOp::Noop, 965 FrameKind::Other => { 966 debug!(id = frame.id, "ignoring unknown hydrant frame kind"); 967 PendingOp::Noop 968 } 969 }; 970 Pending { 971 cursor, 972 signal, 973 regime, 974 op, 975 } 976} 977 978async fn prepare_record<S: SearchSink + 'static>( 979 record: Option<RecordFrame>, 980 ctx: &PipelineCtx<'_, S>, 981) -> PendingOp { 982 let Some(record) = record else { 983 debug!("record-typed frame missing payload, skipping"); 984 return PendingOp::Noop; 985 }; 986 if !record.collection.as_ref().starts_with(TANGLED_PREFIX) { 987 return PendingOp::Noop; 988 } 989 let nsid = 
record.collection.clone(); 990 let source = match build_source_uri(&record) { 991 Ok(s) => s, 992 Err(e) => { 993 warn!(?e, "invalid frame source, dropping record"); 994 return PendingOp::Noop; 995 } 996 }; 997 match record.action { 998 RecordAction::Create | RecordAction::Update => { 999 evict_from_buffer(ctx.buffer, &source).await; 1000 let Some(raw) = record.record else { 1001 debug!(collection = %nsid, "create/update missing record body, clearing cache"); 1002 return PendingOp::ClearCache { source }; 1003 }; 1004 let raw_bytes = Bytes::copy_from_slice(raw.get().as_bytes()); 1005 let wire_bytes = match fallback_rfc3339(&record.rkey, &record.rev) 1006 .and_then(|fallback| synthesize_created_at(&raw_bytes, &fallback)) 1007 { 1008 Some(patched) => Bytes::from(patched), 1009 None => raw_bytes, 1010 }; 1011 let (parsed, bytes) = match decode_canon_or_upgrade_bytes( 1012 &record.collection, 1013 &wire_bytes, 1014 ctx.resolver, 1015 ) 1016 .await 1017 { 1018 Ok((parsed, canon_bytes)) => { 1019 let bytes = match canon_bytes { 1020 std::borrow::Cow::Borrowed(_) => wire_bytes, 1021 std::borrow::Cow::Owned(v) => Bytes::from(v), 1022 }; 1023 (parsed, bytes) 1024 } 1025 Err(ExtractError::UnknownCollection(name)) => { 1026 debug!(collection = %name, "unknown sh.tangled.* collection, clearing cache"); 1027 return PendingOp::ClearCache { source }; 1028 } 1029 Err(e) => { 1030 warn!(?e, collection = %record.collection, "record decode failed, clearing cache"); 1031 return PendingOp::ClearCache { source }; 1032 } 1033 }; 1034 if let Record::Repo(repo) = &parsed { 1035 if let Some(shadow) = ctx.shadow { 1036 shadow.note_observed(&record.did, &record.rkey).await; 1037 } 1038 let superseded = ctx 1039 .resolver 1040 .observe( 1041 record.did.clone(), 1042 record.rkey.clone(), 1043 repo.repo_did.clone(), 1044 ) 1045 .await; 1046 if let Some(prior) = superseded { 1047 let prior_uri = format!( 1048 "at://{}/sh.tangled.repo/{}", 1049 prior.owner.as_ref(), 1050 prior.rkey.as_ref(), 1051 
); 1052 if let Ok(prior_at_uri) = AtUri::<DefaultStr>::new_owned(&prior_uri) { 1053 evict_from_buffer(ctx.buffer, &prior_at_uri).await; 1054 ctx.store.remove_source(&prior_at_uri); 1055 ctx.records.remove(&prior_at_uri); 1056 ctx.search.remove(&prior_at_uri).await; 1057 } 1058 } 1059 if let Some(buffer) = ctx.buffer { 1060 let drained = buffer.take_observed(&record.did, &record.rkey).await; 1061 if !drained.is_empty() { 1062 finalize_drained(ctx, drained).await; 1063 } 1064 } 1065 } 1066 let edges = match parsed.extract_edges(&source) { 1067 Ok(es) => es, 1068 Err(e) => { 1069 warn!(?e, "edge extraction failed, clearing cache"); 1070 return PendingOp::ClearCache { source }; 1071 } 1072 }; 1073 let _ = ctx.store.intern_source(&source); 1074 PendingOp::Upsert { 1075 source, 1076 nsid, 1077 parsed: Box::new(parsed), 1078 bytes, 1079 cid: record.cid, 1080 edges, 1081 } 1082 } 1083 RecordAction::Delete => { 1084 evict_from_buffer(ctx.buffer, &source).await; 1085 if nsid.as_ref() == "sh.tangled.repo" { 1086 ctx.resolver.forget(&record.did, &record.rkey).await; 1087 } 1088 PendingOp::Delete { source, nsid } 1089 } 1090 RecordAction::Other => { 1091 debug!(collection = %nsid, "ignoring unknown record action"); 1092 PendingOp::Noop 1093 } 1094 } 1095} 1096 1097fn fallback_rfc3339( 1098 rkey: &Rkey<DefaultStr>, 1099 rev: &jacquard_common::types::tid::Tid, 1100) -> Option<String> { 1101 let tid = jacquard_common::types::tid::Tid::new(rkey.as_ref()) 1102 .ok() 1103 .unwrap_or_else(|| rev.clone()); 1104 let micros = i64::try_from(tid.timestamp()).ok()?; 1105 let dt = chrono::DateTime::<chrono::Utc>::from_timestamp_micros(micros)?; 1106 Some(dt.to_rfc3339_opts(chrono::SecondsFormat::Micros, true)) 1107} 1108 1109async fn evict_from_buffer(buffer: Option<&WarmingBuffer>, source: &AtUri<DefaultStr>) { 1110 if let Some(buffer) = buffer 1111 && !buffer.is_sealed() 1112 { 1113 buffer.evict_source(source).await; 1114 } 1115} 1116 1117async fn resolve_pending<S: SearchSink + 'static>( 
1118 pending: Pending, 1119 ctx: &PipelineCtx<'_, S>, 1120) -> Pending { 1121 let Pending { 1122 cursor, 1123 signal, 1124 regime, 1125 op, 1126 } = pending; 1127 let op = match op { 1128 PendingOp::Upsert { 1129 source, 1130 nsid, 1131 parsed, 1132 bytes, 1133 cid, 1134 edges, 1135 } => { 1136 let pieces = UpsertPieces { 1137 source, 1138 nsid, 1139 parsed: *parsed, 1140 bytes, 1141 cid, 1142 edges, 1143 }; /* A parked record short-circuits here: only its nsid travels on, inside `PendingOp::Parked`. */ 1144 let pieces = match try_park_warming(ctx, cursor, pieces).await { 1145 ParkOutcome::Parked { nsid } => { 1146 return Pending { 1147 cursor, 1148 signal, 1149 regime, 1150 op: PendingOp::Parked { nsid }, 1151 }; 1152 } 1153 ParkOutcome::Passthrough(pieces) => *pieces, 1154 }; 1155 let UpsertPieces { 1156 source, 1157 nsid, 1158 parsed, 1159 bytes, 1160 cid, 1161 edges, 1162 } = pieces; 1163 let edges = normalize_subjects(edges, ctx.resolver, ctx.coverage, ctx.shadow).await; 1164 PendingOp::Upsert { 1165 source, 1166 nsid, 1167 parsed: Box::new(parsed), 1168 bytes, 1169 cid, 1170 edges, 1171 } 1172 } 1173 other => other, 1174 }; 1175 Pending { 1176 cursor, 1177 signal, 1178 regime, 1179 op, 1180 } 1181} 1182 /** The owned fields of an upsert, bundled so they can be handed to `try_park_warming` by value and given back intact when the record is not parked. */ 1183struct UpsertPieces { 1184 source: AtUri<DefaultStr>, 1185 nsid: Nsid<DefaultStr>, 1186 parsed: Record, 1187 bytes: Bytes, 1188 cid: Option<Cid<DefaultStr>>, 1189 edges: Vec<Edge>, 1190} 1191 /** Field-for-field conversion from a `ParkedUpsert`; no `cursor` is carried over because `UpsertPieces` has no such field. */ 1192impl From<ParkedUpsert> for UpsertPieces { 1193 fn from(u: ParkedUpsert) -> Self { 1194 Self { 1195 source: u.source, 1196 nsid: u.nsid, 1197 parsed: u.parsed, 1198 bytes: u.bytes, 1199 cid: u.cid, 1200 edges: u.edges, 1201 } 1202 } 1203} 1204 /** Result of `try_park_warming`: `Parked` keeps only the record's nsid, `Passthrough` hands the (boxed) upsert back to the caller. */ 1205enum ParkOutcome { 1206 Parked { nsid: Nsid<DefaultStr> }, 1207 Passthrough(Box<UpsertPieces>), 1208} 1209 /** Tries to park an upsert in the warming buffer. The upsert is passed straight through when there is no buffer, when coverage is already ready or the buffer is sealed, when `collect_unresolved_deps` finds nothing unresolved, or when `buffer.try_park` returns the upsert as an error. After a successful park, the unresolved dependencies are also noted on the shadow buffer if one is configured. */ 1210async fn try_park_warming<S: SearchSink + 'static>( 1211 ctx: &PipelineCtx<'_, S>, 1212 cursor: HydrantCursor, 1213 pieces: UpsertPieces, 1214) -> ParkOutcome { 1215 let Some(buffer) = ctx.buffer else { 1216 return ParkOutcome::Passthrough(Box::new(pieces)); 1217 }; 1218 if 
ctx.coverage.snapshot().is_ready() || buffer.is_sealed() { 1219 return ParkOutcome::Passthrough(Box::new(pieces)); 1220 } 1221 let deps = collect_unresolved_deps(&pieces.edges, ctx.resolver).await; 1222 if deps.is_empty() { 1223 return ParkOutcome::Passthrough(Box::new(pieces)); 1224 } /* Cloned up front because `pieces.nsid` is moved into the parked upsert just below. */ 1225 let nsid = pieces.nsid.clone(); 1226 let upsert = ParkedUpsert { 1227 cursor, 1228 source: pieces.source, 1229 nsid: pieces.nsid, 1230 parsed: pieces.parsed, 1231 bytes: pieces.bytes, 1232 cid: pieces.cid, 1233 edges: pieces.edges, 1234 }; /* `deps` is moved into `try_park`, so a copy is kept only when a shadow buffer will need it afterwards. */ 1235 let deps_for_shadow = ctx.shadow.is_some().then(|| deps.clone()); 1236 match buffer.try_park(upsert, deps).await { 1237 Ok(()) => { 1238 if let Some((shadow, noted)) = ctx.shadow.zip(deps_for_shadow) { 1239 let _ = futures::future::join_all(noted.into_iter().map(|dep| async move { 1240 shadow.note_unresolved(dep.owner, dep.rkey).await; 1241 })) 1242 .await; 1243 } 1244 ParkOutcome::Parked { nsid } 1245 } /* The buffer handed the upsert back; convert it and let it continue as a normal upsert. */ 1246 Err(returned) => ParkOutcome::Passthrough(Box::new(returned.into())), 1247 } 1248} 1249 /** Collects, in first-seen order and without duplicates, a `RepoIdent` for every edge whose subject is a URI accepted by `parse_repo_subject_uri` and for which `resolver.cached_resolution` returns `None`. Edges with any other subject, or with an already cached resolution, contribute nothing. */ 1250async fn collect_unresolved_deps(edges: &[Edge], resolver: &RepoIdResolver) -> Vec<RepoIdent> { 1251 futures::stream::iter(edges) 1252 .fold(Vec::new(), |mut acc, edge| async move { 1253 let Some(uri) = edge.subject.as_uri() else { 1254 return acc; 1255 }; 1256 let Some((owner, rkey)) = parse_repo_subject_uri(uri) else { 1257 return acc; 1258 }; 1259 if resolver.cached_resolution(&owner, &rkey).await.is_some() { 1260 return acc; 1261 } 1262 let candidate = RepoIdent::new(owner, rkey); 1263 if !acc.contains(&candidate) { 1264 acc.push(candidate); 1265 } 1266 acc 1267 }) 1268 .await 1269} 1270 /** Applies upserts drained from the warming buffer, one at a time: normalizes the edge subjects (passing no shadow buffer), then caches the body, upserts the edges, applies record state (warning on an unknown variant) and indexes the record for search. The parked `cursor` and `nsid` are discarded. */ 1271async fn finalize_drained<S: SearchSink + 'static>( 1272 ctx: &PipelineCtx<'_, S>, 1273 drained: Vec<ParkedUpsert>, 1274) { 1275 for upsert in drained { 1276 let ParkedUpsert { 1277 cursor: _, 1278 source, 1279 nsid: _, 1280 parsed, 1281 bytes, 1282 cid, 1283 edges, 1284 } = upsert; 1285 let edges = normalize_subjects(edges, ctx.resolver, ctx.coverage, 
None).await; 1286 cache_body(ctx.records, &source, cid, bytes); 1287 ctx.store.upsert_source(&source, edges); 1288 let outcome = apply_record_state(ctx.issue_states, ctx.pull_statuses, &source, &parsed); 1289 log_unknown_state_variant(outcome, &source); 1290 index_search(ctx.search, ctx.resolver, &source, parsed).await; 1291 } 1292} 1293 /** Applies a resolved `Pending` to the stores, then updates coverage. `Noop` and `Parked` touch no store; `ClearCache` only removes the cached record body; `Upsert` caches the body, upserts the source's edges, applies record state and indexes the record for search; `Delete` removes the source from the edge store, the state indexes, the record cache and the search sink. The `regime` field is ignored. */ 1294#[allow(clippy::too_many_arguments)] 1295async fn commit_pending<S: SearchSink>( 1296 pending: Pending, 1297 store: &EdgeStore, 1298 issue_states: &StateIndex<IssueStateKind>, 1299 pull_statuses: &StateIndex<PullStatusKind>, 1300 coverage: &CoverageWatch, 1301 search: &S, 1302 records: &dyn RecordStore, 1303 resolver: &RepoIdResolver, 1304) { 1305 let Pending { 1306 cursor, 1307 signal, 1308 regime: _, 1309 op, 1310 } = pending; 1311 match op { 1312 PendingOp::Noop | PendingOp::Parked { .. } => {} 1313 PendingOp::ClearCache { source } => records.remove(&source), 1314 PendingOp::Upsert { 1315 source, 1316 nsid: _, 1317 parsed, 1318 bytes, 1319 cid, 1320 edges, 1321 } => { 1322 cache_body(records, &source, cid, bytes); 1323 store.upsert_source(&source, edges); 1324 let outcome = apply_record_state(issue_states, pull_statuses, &source, &parsed); 1325 log_unknown_state_variant(outcome, &source); 1326 index_search(search, resolver, &source, *parsed).await; 1327 } 1328 PendingOp::Delete { source, nsid } => { 1329 store.remove_source(&source); 1330 apply_delete_to_state_index(issue_states, pull_statuses, &source, &nsid); 1331 records.remove(&source); 1332 search.remove(&source).await; 1333 } 1334 } /* Runs for every op, so the cursor advances even when nothing was stored. */ 1335 coverage.update(|c| c.advance(cursor).maybe_promote(signal)); 1336} 1337 /** Upserts `parsed` into the search sink as the document for `source`. Does nothing when `SearchableRecord::try_from_record` yields `None` or when `normalize` yields `None`. */ 1338async fn index_search<S: SearchSink>( 1339 search: &S, 1340 resolver: &RepoIdResolver, 1341 source: &AtUri<DefaultStr>, 1342 parsed: Record, 1343) { 1344 let Some(searchable) = SearchableRecord::try_from_record(parsed) else { 1345 return; 1346 }; 1347 let Some(searchable) = searchable.normalize(resolver).await else { 1348 return; 1349 }; 1350 
search.upsert(searchable.to_search_doc(source)).await; 1351} 1352 /** Test-only helper that runs one frame through `prepare_frame`, `resolve_pending` and `commit_pending`, in that order, using a `PipelineCtx` with no shadow buffer and no warming buffer. */ 1353#[cfg(test)] 1354#[allow(clippy::too_many_arguments)] 1355async fn handle_frame<S: SearchSink + 'static>( 1356 frame: HydrantFrame, 1357 store: &EdgeStore, 1358 issue_states: &StateIndex<IssueStateKind>, 1359 pull_statuses: &StateIndex<PullStatusKind>, 1360 coverage: &CoverageWatch, 1361 search: &S, 1362 records: &dyn RecordStore, 1363 resolver: &RepoIdResolver, 1364 clock: &dyn Clock, 1365 now: UnixMicros, 1366) { /* `clock` is accepted but not used by this helper. */ 1367 let _ = clock; 1368 let ctx = PipelineCtx { 1369 resolver, 1370 store, 1371 issue_states, 1372 pull_statuses, 1373 coverage, 1374 records, 1375 search, 1376 shadow: None, 1377 buffer: None, 1378 }; 1379 let pending = prepare_frame(frame, &ctx, now).await; 1380 let pending = resolve_pending(pending, &ctx).await; 1381 commit_pending( 1382 pending, 1383 store, 1384 issue_states, 1385 pull_statuses, 1386 coverage, 1387 search, 1388 records, 1389 resolver, 1390 ) 1391 .await; 1392} 1393 /** Emits a warning naming `source` when `outcome` is `ApplyOutcome::UnknownVariant`; every other outcome is silent. */ 1394fn log_unknown_state_variant(outcome: ApplyOutcome, source: &AtUri<DefaultStr>) { 1395 if matches!(outcome, ApplyOutcome::UnknownVariant) { 1396 warn!( 1397 target: "bobbin_ingest::state_index", 1398 %source, 1399 "state record has unknown wire variant, skipping index update", 1400 ); 1401 } 1402} 1403 /** Routes a delete to the state indexes by collection nsid: an issue or pull calls `remove_entity` on the matching index, an issue-state or pull-status record calls `remove_source`; any other collection is ignored. */ 1404fn apply_delete_to_state_index( 1405 issue_states: &StateIndex<IssueStateKind>, 1406 pull_statuses: &StateIndex<PullStatusKind>, 1407 source: &AtUri<DefaultStr>, 1408 nsid: &Nsid<DefaultStr>, 1409) { 1410 match nsid.as_ref() { 1411 "sh.tangled.repo.issue" => issue_states.remove_entity(source), 1412 "sh.tangled.repo.pull" => pull_statuses.remove_entity(source), 1413 "sh.tangled.repo.issue.state" => issue_states.remove_source(source), 1414 "sh.tangled.repo.pull.status" => pull_statuses.remove_source(source), 1415 _ => {} 1416 } 1417} 1418 /** Builds a `PromotionSignal` from the record's `rev` timestamp (`None` when `record` is `None`), the raw value of `now`, and `READY_SKEW` expressed in microseconds. */ 1419fn promotion_signal(record: Option<&RecordFrame>, now: UnixMicros) -> PromotionSignal { 1420 PromotionSignal { 1421 rev_micros: record.map(|r| 
r.rev.timestamp()), 1422 now_micros: now.raw(), /* NOTE(review): `as u64` narrows the u128 returned by `as_micros` without a check; presumably READY_SKEW is far below that limit — confirm. */ 1423 skew_micros: READY_SKEW.as_micros() as u64, 1424 } 1425} 1426 /** Stores the record body under `source` when a CID is present; with no CID the entry for `source` is removed from the record store instead. */ 1427fn cache_body( 1428 records: &dyn RecordStore, 1429 source: &AtUri<DefaultStr>, 1430 cid: Option<Cid<DefaultStr>>, 1431 bytes: Bytes, 1432) { 1433 match cid { 1434 Some(cid) => records.put( 1435 source.clone(), 1436 Arc::new(RecordBody { 1437 uri: source.clone(), 1438 cid, 1439 value: bytes, 1440 }), 1441 ), 1442 None => records.remove(source), 1443 } 1444} 1445 /** Rewrites edge subjects that are repo URIs (as accepted by `parse_repo_subject_uri`) into the repo DID returned by `resolver.resolve`. Edges whose subject is not such a URI are kept unchanged; edges whose repo resolves to `NoRepoDid` or `Unresolvable` are dropped with a warning. When a shadow buffer is supplied and coverage is not ready, a repo with no cached resolution is noted on the shadow buffer before it is resolved. */ 1446async fn normalize_subjects( 1447 edges: Vec<Edge>, 1448 resolver: &RepoIdResolver, 1449 coverage: &CoverageWatch, 1450 shadow: Option<&WarmingShadowBuffer>, 1451) -> Vec<Edge> { /* Evaluated once, before the stream runs: shadow noting needs a shadow buffer and not-yet-ready coverage. */ 1452 let warming = shadow.is_some() && !coverage.snapshot().is_ready(); 1453 futures::stream::iter(edges) 1454 .filter_map(|edge| async move { 1455 let Some(uri) = edge.subject.as_uri() else { 1456 return Some(edge); 1457 }; 1458 let Some((owner, rkey)) = parse_repo_subject_uri(uri) else { 1459 return Some(edge); 1460 }; 1461 if warming 1462 && let Some(shadow) = shadow 1463 && resolver.cached_resolution(&owner, &rkey).await.is_none() 1464 { 1465 shadow 1466 .note_unresolved(owner.clone(), rkey.clone()) 1467 .await; 1468 } 1469 match resolver.resolve(&owner, &rkey).await { 1470 Resolution::Mapped(repo_did) => Some(Edge { 1471 subject: SubjectRef::Did(repo_did), 1472 ..edge 1473 }), 1474 Resolution::NoRepoDid => { 1475 warn!( 1476 target: "bobbin_ingest::normalize", 1477 kind = %edge.kind, 1478 owner = owner.as_ref(), 1479 rkey = rkey.as_ref(), 1480 source = edge.source.as_ref(), 1481 "dropping edge: target repo has no repoDid, no canonical DID subject available", 1482 ); 1483 None 1484 } 1485 Resolution::Unresolvable => { 1486 warn!( 1487 target: "bobbin_ingest::normalize", 1488 kind = %edge.kind, 1489 owner = owner.as_ref(), 1490 rkey = rkey.as_ref(), 1491 source = edge.source.as_ref(), 1492 "dropping edge: repo unresolvable, rkey-form subject will not match bare-DID queries", 1493 ); 1494 None 1495 } 1496 } 
1497 }) 1498 .collect() 1499 .await 1500} 1501 /** Splits a `sh.tangled.repo` AT-URI into its owner DID and record key. Returns `None` when the URI has no collection, the collection is anything other than `sh.tangled.repo`, the authority is not a DID, the URI has no record key, or either part fails to construct as an owned `Did` / `Rkey`. */ 1502fn parse_repo_subject_uri(uri: &AtUri<DefaultStr>) -> Option<(Did<DefaultStr>, Rkey<DefaultStr>)> { 1503 let collection = uri.collection()?; 1504 if collection.as_ref() != "sh.tangled.repo" { 1505 return None; 1506 } 1507 let AtIdentifier::Did(authority) = uri.authority() else { 1508 return None; 1509 }; 1510 let rkey = uri.rkey()?; 1511 let owner = Did::new_owned(authority.as_ref()).ok()?; 1512 let rkey = Rkey::new_owned(rkey.as_ref()).ok()?; 1513 Some((owner, rkey)) 1514} 1515 /** Builds the source AT-URI of a record frame from its DID, collection and record key; a construction failure is converted into `IngestError` by the `?` operator. */ 1516fn build_source_uri(r: &RecordFrame) -> Result<AtUri<DefaultStr>, IngestError> { 1517 Ok(AtUri::from_parts_owned( 1518 r.did.as_ref(), 1519 r.collection.as_ref(), 1520 r.rkey.as_ref(), 1521 )?) 1522} 1523 1524#[cfg(test)] 1525mod tests { 1526 use super::*; 1527 use bobbin_edge_index::Coverage; 1528 use bobbin_record_lru::{CacheCapacity, LruRecordStore, NoopRecordStore, RecordStore}; 1529 use bobbin_runtime::{OsEntropy, RuntimeHasher, SystemClock, TungsteniteWs}; 1530 use bobbin_types::search::NoopSearchSink; 1531 use jacquard_common::types::nsid::Nsid; 1532 use jacquard_common::types::tid::Tid; 1533 use serde_json::json; 1534 1535 const VALID_CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i"; 1536 1537 fn did_subj(s: &str) -> SubjectRef { 1538 SubjectRef::Did(Did::new_owned(s).unwrap()) 1539 } 1540 1541 fn uri_subj(s: &str) -> SubjectRef { 1542 SubjectRef::Uri(AtUri::new_owned(s).unwrap()) 1543 } 1544 1545 fn rkey(s: &str) -> Rkey<DefaultStr> { 1546 Rkey::new_owned(s).unwrap() 1547 } 1548 1549 #[allow(clippy::type_complexity)] 1550 fn fresh() -> ( 1551 Arc<EdgeStore>, 1552 Arc<StateIndex<IssueStateKind>>, 1553 Arc<StateIndex<PullStatusKind>>, 1554 Arc<CoverageWatch>, 1555 Arc<RepoIdResolver>, 1556 ) { 1557 ( 1558 Arc::new(EdgeStore::new(RuntimeHasher::default())), 1559 Arc::new(StateIndex::new(RuntimeHasher::default())), 1560 Arc::new(StateIndex::new(RuntimeHasher::default())), 1561 
Arc::new(CoverageWatch::new()), 1562 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())), 1563 ) 1564 } 1565 1566 fn now() -> UnixMicros { 1567 SystemClock::new().now_unix_micros() 1568 } 1569 1570 fn sys_clock() -> SystemClock { 1571 SystemClock::new() 1572 } 1573 1574 fn parse_frame(value: serde_json::Value) -> HydrantFrame { 1575 let text = serde_json::to_string(&value).expect("serialize fixture"); 1576 serde_json::from_str(&text).expect("deserialize fixture") 1577 } 1578 1579 fn fresh_tid() -> Tid { 1580 Tid::now_0() 1581 } 1582 1583 #[tokio::test] 1584 async fn ignores_non_tangled_collections() { 1585 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 1586 let frame: HydrantFrame = parse_frame(json!({ 1587 "id": 1, 1588 "type": "record", 1589 "record": { 1590 "live": false, 1591 "did": "did:plc:nel", 1592 "rev": fresh_tid().as_str(), 1593 "collection": "app.bsky.feed.post", 1594 "rkey": "abcabcabcabcz", 1595 "action": "create", 1596 "record": {"$type": "app.bsky.feed.post", "text": "hi"} 1597 } 1598 })); 1599 handle_frame( 1600 frame, 1601 &store, 1602 &issue_states, 1603 &pull_statuses, 1604 &cov, 1605 &NoopSearchSink, 1606 &NoopRecordStore, 1607 &resolver, 1608 &sys_clock(), 1609 now(), 1610 ) 1611 .await; 1612 assert_eq!(store.key_count(), 0); 1613 assert_eq!(cov.snapshot().events_processed(), 1); 1614 } 1615 1616 #[tokio::test] 1617 async fn create_then_delete_round_trips_a_star() { 1618 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 1619 let create: HydrantFrame = parse_frame(json!({ 1620 "id": 10, 1621 "type": "record", 1622 "record": { 1623 "live": false, 1624 "did": "did:plc:olaren", 1625 "rev": fresh_tid().as_str(), 1626 "collection": "sh.tangled.feed.star", 1627 "rkey": "abcabcabcabcz", 1628 "action": "create", 1629 "record": { 1630 "$type": "sh.tangled.feed.star", 1631 "createdAt": "2026-05-01T00:00:00Z", 1632 "subjectDid": "did:plc:abalone" 1633 } 1634 } 1635 })); 1636 handle_frame( 1637 create, 1638 
&store, 1639 &issue_states, 1640 &pull_statuses, 1641 &cov, 1642 &NoopSearchSink, 1643 &NoopRecordStore, 1644 &resolver, 1645 &sys_clock(), 1646 now(), 1647 ) 1648 .await; 1649 let key = bobbin_types::ids::EdgeKey::new( 1650 Nsid::new_static("sh.tangled.feed.star").unwrap(), 1651 did_subj("did:plc:abalone"), 1652 ); 1653 assert_eq!(store.count(&key), 1); 1654 1655 let delete: HydrantFrame = parse_frame(json!({ 1656 "id": 11, 1657 "type": "record", 1658 "record": { 1659 "live": false, 1660 "did": "did:plc:olaren", 1661 "rev": fresh_tid().as_str(), 1662 "collection": "sh.tangled.feed.star", 1663 "rkey": "abcabcabcabcz", 1664 "action": "delete", 1665 "record": null 1666 } 1667 })); 1668 handle_frame( 1669 delete, 1670 &store, 1671 &issue_states, 1672 &pull_statuses, 1673 &cov, 1674 &NoopSearchSink, 1675 &NoopRecordStore, 1676 &resolver, 1677 &sys_clock(), 1678 now(), 1679 ) 1680 .await; 1681 assert_eq!(store.count(&key), 0); 1682 } 1683 1684 #[tokio::test] 1685 async fn update_replaces_prior_edges() { 1686 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 1687 let mk = |subject_did: &Did<DefaultStr>, id: u64| -> HydrantFrame { 1688 parse_frame(json!({ 1689 "id": id, 1690 "type": "record", 1691 "record": { 1692 "live": false, 1693 "did": "did:plc:olaren", 1694 "rev": fresh_tid().as_str(), 1695 "collection": "sh.tangled.feed.star", 1696 "rkey": "abcabcabcabcz", 1697 "action": "update", 1698 "record": { 1699 "$type": "sh.tangled.feed.star", 1700 "createdAt": "2026-05-01T00:00:00Z", 1701 "subjectDid": subject_did.as_ref() 1702 } 1703 } 1704 })) 1705 }; 1706 handle_frame( 1707 mk(&Did::new_owned("did:plc:abalone").unwrap(), 1), 1708 &store, 1709 &issue_states, 1710 &pull_statuses, 1711 &cov, 1712 &NoopSearchSink, 1713 &NoopRecordStore, 1714 &resolver, 1715 &sys_clock(), 1716 now(), 1717 ) 1718 .await; 1719 handle_frame( 1720 mk(&Did::new_owned("did:plc:uni").unwrap(), 2), 1721 &store, 1722 &issue_states, 1723 &pull_statuses, 1724 &cov, 1725 
&NoopSearchSink, 1726 &NoopRecordStore, 1727 &resolver, 1728 &sys_clock(), 1729 now(), 1730 ) 1731 .await; 1732 1733 let kind = Nsid::new_static("sh.tangled.feed.star").unwrap(); 1734 let old = bobbin_types::ids::EdgeKey::new(kind.clone(), did_subj("did:plc:abalone")); 1735 let new = bobbin_types::ids::EdgeKey::new(kind, did_subj("did:plc:uni")); 1736 assert_eq!(store.count(&old), 0); 1737 assert_eq!(store.count(&new), 1); 1738 } 1739 1740 #[tokio::test] 1741 async fn prepare_frame_tags_regime_from_live_flag() { 1742 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 1743 let search = NoopSearchSink; 1744 let records = NoopRecordStore; 1745 let ctx = PipelineCtx { 1746 resolver: &resolver, 1747 store: &store, 1748 issue_states: &issue_states, 1749 pull_statuses: &pull_statuses, 1750 coverage: &cov, 1751 records: &records, 1752 search: &search, 1753 shadow: None, 1754 buffer: None, 1755 }; 1756 let mk = |live: bool| -> HydrantFrame { 1757 parse_frame(json!({ 1758 "id": 1, 1759 "type": "record", 1760 "record": { 1761 "live": live, 1762 "did": "did:plc:olaren", 1763 "rev": fresh_tid().as_str(), 1764 "collection": "sh.tangled.feed.star", 1765 "rkey": "abcabcabcabcz", 1766 "action": "create", 1767 "record": { 1768 "$type": "sh.tangled.feed.star", 1769 "createdAt": "2026-05-01T00:00:00Z", 1770 "subjectDid": "did:plc:abalone" 1771 } 1772 } 1773 })) 1774 }; 1775 let live_pending = prepare_frame(mk(true), &ctx, now()).await; 1776 assert_eq!(live_pending.regime, Regime::Live); 1777 let replay_pending = prepare_frame(mk(false), &ctx, now()).await; 1778 assert_eq!(replay_pending.regime, Regime::Replay); 1779 1780 let identity: HydrantFrame = parse_frame(json!({ 1781 "id": 9, 1782 "type": "identity", 1783 })); 1784 let id_pending = prepare_frame(identity, &ctx, now()).await; 1785 assert_eq!(id_pending.regime, Regime::NonRecord); 1786 } 1787 1788 #[tokio::test] 1789 async fn create_with_cid_warms_record_lru() { 1790 let (store, issue_states, pull_statuses, cov, 
resolver) = fresh(); 1791 let lru = LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024)); 1792 let frame: HydrantFrame = parse_frame(json!({ 1793 "id": 1, 1794 "type": "record", 1795 "record": { 1796 "live": false, 1797 "did": "did:plc:olaren", 1798 "rev": fresh_tid().as_str(), 1799 "collection": "sh.tangled.feed.star", 1800 "rkey": "abcabcabcabcz", 1801 "action": "create", 1802 "cid": VALID_CID, 1803 "record": { 1804 "$type": "sh.tangled.feed.star", 1805 "createdAt": "2026-05-01T00:00:00Z", 1806 "subjectDid": "did:plc:abalone" 1807 } 1808 } 1809 })); 1810 let source = 1811 AtUri::new_owned("at://did:plc:olaren/sh.tangled.feed.star/abcabcabcabcz").unwrap(); 1812 handle_frame( 1813 frame, 1814 &store, 1815 &issue_states, 1816 &pull_statuses, 1817 &cov, 1818 &NoopSearchSink, 1819 &lru, 1820 &resolver, 1821 &sys_clock(), 1822 now(), 1823 ) 1824 .await; 1825 let cached = lru.get(&source).expect("hydrant cid must seed the lru"); 1826 assert_eq!(cached.cid.as_ref(), VALID_CID); 1827 let parsed: serde_json::Value = serde_json::from_slice(&cached.value).unwrap(); 1828 assert_eq!( 1829 parsed["subject"]["did"], "did:plc:abalone", 1830 "legacy wire is upgraded to canon shape before caching so downstream readers see canonical fields" 1831 ); 1832 } 1833 1834 #[tokio::test] 1835 async fn create_without_cid_clears_record_lru() { 1836 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 1837 let lru = LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024)); 1838 let source = 1839 AtUri::new_owned("at://did:plc:olaren/sh.tangled.feed.star/abcabcabcabcz").unwrap(); 1840 let cid: Cid<DefaultStr> = VALID_CID.parse().unwrap(); 1841 lru.put( 1842 source.clone(), 1843 Arc::new(RecordBody { 1844 uri: source.clone(), 1845 cid, 1846 value: bytes::Bytes::from_static(b"{\"stale\":true}"), 1847 }), 1848 ); 1849 let frame: HydrantFrame = parse_frame(json!({ 1850 "id": 1, 1851 "type": "record", 1852 "record": { 1853 "live": false, 1854 "did": "did:plc:olaren", 1855 "rev": 
fresh_tid().as_str(), 1856 "collection": "sh.tangled.feed.star", 1857 "rkey": "abcabcabcabcz", 1858 "action": "update", 1859 "record": { 1860 "$type": "sh.tangled.feed.star", 1861 "createdAt": "2026-05-01T00:00:00Z", 1862 "subjectDid": "did:plc:abalone" 1863 } 1864 } 1865 })); 1866 handle_frame( 1867 frame, 1868 &store, 1869 &issue_states, 1870 &pull_statuses, 1871 &cov, 1872 &NoopSearchSink, 1873 &lru, 1874 &resolver, 1875 &sys_clock(), 1876 now(), 1877 ) 1878 .await; 1879 assert!( 1880 lru.get(&source).is_none(), 1881 "missing cid means we cannot trust the body, so the lru must be cleared", 1882 ); 1883 } 1884 1885 #[tokio::test] 1886 async fn delete_evicts_record_lru() { 1887 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 1888 let lru = LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024)); 1889 let source = 1890 AtUri::new_owned("at://did:plc:olaren/sh.tangled.feed.star/abcabcabcabcz").unwrap(); 1891 let cid: Cid<DefaultStr> = VALID_CID.parse().unwrap(); 1892 lru.put( 1893 source.clone(), 1894 Arc::new(RecordBody { 1895 uri: source.clone(), 1896 cid, 1897 value: bytes::Bytes::from_static(b"{}"), 1898 }), 1899 ); 1900 let frame: HydrantFrame = parse_frame(json!({ 1901 "id": 1, 1902 "type": "record", 1903 "record": { 1904 "live": false, 1905 "did": "did:plc:olaren", 1906 "rev": fresh_tid().as_str(), 1907 "collection": "sh.tangled.feed.star", 1908 "rkey": "abcabcabcabcz", 1909 "action": "delete", 1910 "record": null 1911 } 1912 })); 1913 handle_frame( 1914 frame, 1915 &store, 1916 &issue_states, 1917 &pull_statuses, 1918 &cov, 1919 &NoopSearchSink, 1920 &lru, 1921 &resolver, 1922 &sys_clock(), 1923 now(), 1924 ) 1925 .await; 1926 assert!(lru.get(&source).is_none()); 1927 } 1928 1929 #[tokio::test] 1930 async fn live_recent_event_promotes_coverage_to_ready() { 1931 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 1932 assert!(!cov.snapshot().is_ready()); 1933 let frame: HydrantFrame = parse_frame(json!({ 1934 "id": 99, 1935 
"type": "record", 1936 "record": { 1937 "live": true, 1938 "did": "did:plc:olaren", 1939 "rev": fresh_tid().as_str(), 1940 "collection": "sh.tangled.feed.star", 1941 "rkey": "abcabcabcabcz", 1942 "action": "create", 1943 "record": { 1944 "$type": "sh.tangled.feed.star", 1945 "createdAt": "2026-05-01T00:00:00Z", 1946 "subjectDid": "did:plc:abalone" 1947 } 1948 } 1949 })); 1950 handle_frame( 1951 frame, 1952 &store, 1953 &issue_states, 1954 &pull_statuses, 1955 &cov, 1956 &NoopSearchSink, 1957 &NoopRecordStore, 1958 &resolver, 1959 &sys_clock(), 1960 now(), 1961 ) 1962 .await; 1963 assert!(cov.snapshot().is_ready()); 1964 assert_eq!(cov.snapshot().last_cursor(), HydrantCursor::new(99)); 1965 } 1966 1967 #[tokio::test] 1968 async fn live_but_stale_rev_does_not_promote() { 1969 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 1970 let stale_tid = Tid::from_time(1_000_000, 0); 1971 let frame: HydrantFrame = parse_frame(json!({ 1972 "id": 7, 1973 "type": "record", 1974 "record": { 1975 "live": true, 1976 "did": "did:plc:olaren", 1977 "rev": stale_tid.as_str(), 1978 "collection": "sh.tangled.feed.star", 1979 "rkey": "abcabcabcabcz", 1980 "action": "create", 1981 "record": { 1982 "$type": "sh.tangled.feed.star", 1983 "createdAt": "2026-05-01T00:00:00Z", 1984 "subjectDid": "did:plc:abalone" 1985 } 1986 } 1987 })); 1988 handle_frame( 1989 frame, 1990 &store, 1991 &issue_states, 1992 &pull_statuses, 1993 &cov, 1994 &NoopSearchSink, 1995 &NoopRecordStore, 1996 &resolver, 1997 &sys_clock(), 1998 now(), 1999 ) 2000 .await; 2001 assert!(!cov.snapshot().is_ready()); 2002 assert!(matches!(cov.snapshot(), Coverage::Warming { .. 
})); 2003 } 2004 2005 #[test] 2006 fn classify_consumer_too_slow_frame_returns_typed_variant() { 2007 let text = r#"{"type":"error","error":"ConsumerTooSlow","message":"stream socket send blocked for at least 30 seconds"}"#; 2008 match classify_text_frame(text) { 2009 Err(IngestError::ConsumerTooSlow { message }) => { 2010 assert!( 2011 message.contains("30 seconds"), 2012 "message field preserved verbatim, got: {message}" 2013 ); 2014 } 2015 other => panic!("expected ConsumerTooSlow variant, got: {other:?}"), 2016 } 2017 } 2018 2019 #[test] 2020 fn classify_unknown_hydrant_error_falls_back_to_generic_variant() { 2021 let text = r#"{"type":"error","error":"NewFutureCode","message":"some new failure mode"}"#; 2022 match classify_text_frame(text) { 2023 Err(IngestError::HydrantStream { code, message }) => { 2024 assert_eq!(code, "NewFutureCode"); 2025 assert_eq!(message, "some new failure mode"); 2026 } 2027 other => panic!("expected HydrantStream variant, got: {other:?}"), 2028 } 2029 } 2030 2031 #[test] 2032 fn classify_error_frame_without_message_uses_empty_string() { 2033 let text = r#"{"type":"error","error":"ConsumerTooSlow"}"#; 2034 match classify_text_frame(text) { 2035 Err(IngestError::ConsumerTooSlow { message }) => assert!(message.is_empty()), 2036 other => panic!("expected ConsumerTooSlow with empty message, got: {other:?}"), 2037 } 2038 } 2039 2040 #[test] 2041 fn classify_normal_record_frame_unchanged() { 2042 let text = r#"{"id":42,"type":"record"}"#; 2043 let frame = classify_text_frame(text).expect("normal record frame must parse"); 2044 assert_eq!(frame.id, 42); 2045 assert_eq!(frame.kind, FrameKind::Record); 2046 } 2047 2048 #[test] 2049 fn classify_garbage_object_returns_decode_error() { 2050 let text = r#"{"random":"object","without":"required fields"}"#; 2051 match classify_text_frame(text) { 2052 Err(IngestError::Decode(_)) => {} 2053 other => panic!("expected Decode error, got: {other:?}"), 2054 } 2055 } 2056 2057 struct ScriptedWsStream { 
2058 messages: std::collections::VecDeque<Result<WsMessage, NetworkError>>, 2059 } 2060 2061 impl WsStream for ScriptedWsStream { 2062 fn next<'a>(&'a mut self) -> bobbin_runtime::WsMessageFuture<'a> { 2063 let msg = self.messages.pop_front(); 2064 Box::pin(async move { msg }) 2065 } 2066 } 2067 2068 #[tokio::test] 2069 async fn reader_loop_surfaces_consumer_too_slow_from_error_frame() { 2070 let mut messages = std::collections::VecDeque::new(); 2071 messages.push_back(Ok(WsMessage::Text( 2072 r#"{"type":"error","error":"ConsumerTooSlow","message":"stream delivery blocked"}"# 2073 .to_owned(), 2074 ))); 2075 let stream: Box<dyn WsStream> = Box::new(ScriptedWsStream { messages }); 2076 let (frame_tx, _frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(FRAME_CHANNEL_DEPTH); 2077 let (control_tx, _control_rx) = 2078 tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH); 2079 let cancel = CancellationToken::new(); 2080 2081 let end = reader_loop(stream, frame_tx, control_tx, cancel).await; 2082 2083 match end.error { 2084 Some(IngestError::ConsumerTooSlow { message }) => { 2085 assert_eq!(message, "stream delivery blocked"); 2086 } 2087 other => panic!("expected ConsumerTooSlow SessionEnd error, got: {other:?}"), 2088 } 2089 assert_eq!(end.outcome, SessionOutcome::Empty); 2090 } 2091 2092 #[tokio::test] 2093 async fn reader_loop_surfaces_unknown_hydrant_error_distinctly() { 2094 let mut messages = std::collections::VecDeque::new(); 2095 messages.push_back(Ok(WsMessage::Text( 2096 r#"{"type":"error","error":"NewFutureCode","message":"new mode"}"#.to_owned(), 2097 ))); 2098 let stream: Box<dyn WsStream> = Box::new(ScriptedWsStream { messages }); 2099 let (frame_tx, _frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(FRAME_CHANNEL_DEPTH); 2100 let (control_tx, _control_rx) = 2101 tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH); 2102 let cancel = CancellationToken::new(); 2103 2104 let end = reader_loop(stream, frame_tx, control_tx, 
cancel).await; 2105 2106 match end.error { 2107 Some(IngestError::HydrantStream { code, message }) => { 2108 assert_eq!(code, "NewFutureCode"); 2109 assert_eq!(message, "new mode"); 2110 } 2111 other => panic!("expected HydrantStream SessionEnd error, got: {other:?}"), 2112 } 2113 } 2114 2115 struct ChannelWsStream { 2116 rx: tokio::sync::mpsc::Receiver<Result<WsMessage, NetworkError>>, 2117 } 2118 2119 impl WsStream for ChannelWsStream { 2120 fn next<'a>(&'a mut self) -> bobbin_runtime::WsMessageFuture<'a> { 2121 Box::pin(async move { self.rx.recv().await }) 2122 } 2123 } 2124 2125 fn star_frame_text(id: u64, rkey: &Rkey<DefaultStr>) -> String { 2126 json!({ 2127 "id": id, 2128 "type": "record", 2129 "record": { 2130 "live": false, 2131 "did": "did:plc:olaren", 2132 "rev": fresh_tid().as_str(), 2133 "collection": "sh.tangled.feed.star", 2134 "rkey": rkey.as_ref(), 2135 "action": "create", 2136 "record": { 2137 "$type": "sh.tangled.feed.star", 2138 "createdAt": "2026-05-01T00:00:00Z", 2139 "subjectDid": "did:plc:abalone" 2140 } 2141 } 2142 }) 2143 .to_string() 2144 } 2145 2146 #[tokio::test] 2147 async fn pong_forwards_promptly_when_frame_channel_is_full() { 2148 let (ws_tx, ws_rx) = tokio::sync::mpsc::channel::<Result<WsMessage, NetworkError>>(8); 2149 let stream: Box<dyn WsStream> = Box::new(ChannelWsStream { rx: ws_rx }); 2150 let (frame_tx, frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(1); 2151 let (control_tx, mut control_rx) = 2152 tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH); 2153 let cancel = CancellationToken::new(); 2154 2155 let prefill: HydrantFrame = parse_frame(json!({ 2156 "id": 0, 2157 "type": "record", 2158 "record": { 2159 "live": false, 2160 "did": "did:plc:olaren", 2161 "rev": fresh_tid().as_str(), 2162 "collection": "sh.tangled.feed.star", 2163 "rkey": "prefilrkey001", 2164 "action": "create", 2165 "record": { 2166 "$type": "sh.tangled.feed.star", 2167 "createdAt": "2026-05-01T00:00:00Z", 2168 "subjectDid": 
"did:plc:abalone" 2169 } 2170 } 2171 })); 2172 frame_tx 2173 .try_send(prefill) 2174 .expect("depth-1 frame channel must accept the prefill"); 2175 2176 let reader_handle = tokio::spawn(reader_loop( 2177 stream, 2178 frame_tx.clone(), 2179 control_tx.clone(), 2180 cancel.clone(), 2181 )); 2182 2183 ws_tx 2184 .send(Ok(WsMessage::Text(star_frame_text( 2185 1, 2186 &rkey("starrkeyaa001"), 2187 )))) 2188 .await 2189 .unwrap(); 2190 ws_tx.send(Ok(WsMessage::Pong(Bytes::new()))).await.unwrap(); 2191 2192 let pong_event = tokio::time::timeout(Duration::from_millis(500), control_rx.recv()) 2193 .await 2194 .expect("pong must surface inside 500ms even when frame_tx is saturated. A reader blocked on a frame send would never poll the next ws message") 2195 .expect("control_tx was not closed"); 2196 assert!( 2197 matches!(pong_event, WsEvent::IncomingPong), 2198 "first control event must be the pong, not a held text", 2199 ); 2200 2201 let too_slow = 2202 "{\"type\":\"error\",\"error\":\"ConsumerTooSlow\",\"message\":\"saturated\"}" 2203 .to_string(); 2204 ws_tx.send(Ok(WsMessage::Text(too_slow))).await.unwrap(); 2205 2206 let end = tokio::time::timeout(Duration::from_secs(1), reader_handle) 2207 .await 2208 .expect("reader must exit promptly once ConsumerTooSlow is read") 2209 .expect("reader task must not panic"); 2210 match end.error { 2211 Some(IngestError::ConsumerTooSlow { message }) => assert_eq!(message, "saturated"), 2212 other => panic!("expected ConsumerTooSlow disconnect, got: {other:?}"), 2213 } 2214 2215 drop(frame_rx); 2216 } 2217 2218 #[tokio::test] 2219 async fn reader_drains_held_frames_once_processor_catches_up() { 2220 let (ws_tx, ws_rx) = tokio::sync::mpsc::channel::<Result<WsMessage, NetworkError>>(8); 2221 let stream: Box<dyn WsStream> = Box::new(ChannelWsStream { rx: ws_rx }); 2222 let (frame_tx, mut frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(1); 2223 let (control_tx, _control_rx) = 2224 
tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH);
        let cancel = CancellationToken::new();

        // Put one frame into `frame_tx` before the reader task starts.
        // NOTE(review): the channel is created above this excerpt; the expect
        // messages below ("after slot opens") suggest it is full at this point.
        let prefill: HydrantFrame = parse_frame(json!({
            "id": 0,
            "type": "record",
            "record": {
                "live": false,
                "did": "did:plc:olaren",
                "rev": fresh_tid().as_str(),
                "collection": "sh.tangled.feed.star",
                "rkey": "prefilrkey001",
                "action": "create",
                "record": {
                    "$type": "sh.tangled.feed.star",
                    "createdAt": "2026-05-01T00:00:00Z",
                    "subjectDid": "did:plc:abalone"
                }
            }
        }));
        frame_tx.try_send(prefill).unwrap();

        let reader_handle = tokio::spawn(reader_loop(
            stream,
            frame_tx.clone(),
            control_tx,
            cancel.clone(),
        ));

        // Feed two text frames (ids 1 and 2) to the reader before anything is
        // drained from `frame_rx`.
        ws_tx
            .send(Ok(WsMessage::Text(star_frame_text(
                1,
                &rkey("heldrkeyaa001"),
            ))))
            .await
            .unwrap();
        ws_tx
            .send(Ok(WsMessage::Text(star_frame_text(
                2,
                &rkey("heldrkeyaa002"),
            ))))
            .await
            .unwrap();

        // Drain the prefilled frame, then require both held frames to arrive,
        // in id order, within 500ms each.
        let _drained_prefill = frame_rx.recv().await.expect("prefilled frame drains");
        let first = tokio::time::timeout(Duration::from_millis(500), frame_rx.recv())
            .await
            .expect("first held frame must reach frame_rx after slot opens")
            .expect("frame_tx still open");
        assert_eq!(first.id, 1);
        let second = tokio::time::timeout(Duration::from_millis(500), frame_rx.recv())
            .await
            .expect("second held frame must reach frame_rx after slot opens")
            .expect("frame_tx still open");
        assert_eq!(second.id, 2);

        // The reader task must finish within one second of cancellation.
        cancel.cancel();
        let _ = tokio::time::timeout(Duration::from_secs(1), reader_handle)
            .await
            .expect("reader must stop after cancel");
    }

    /// With a default (never advanced) `Coverage`, the connect cursor is the
    /// configured start cursor.
    #[test]
    fn first_connect_uses_configured_start_cursor() {
        let start = HydrantCursor::new(42);
        assert_eq!(next_connect_cursor(Coverage::default(), start), start);
    }

    /// After coverage has advanced to cursor 7, the next connect cursor is 8.
    #[test]
    fn reconnect_resumes_strictly_after_last_seen() {
        let snap = Coverage::default().advance(HydrantCursor::new(7));
        assert_eq!(
            next_connect_cursor(snap, HydrantCursor::new(0)),
            HydrantCursor::new(8),
        );
    }

    /// Once coverage has advanced (to 100), the configured start (50) is
    /// ignored and the connect cursor is 101.
    #[test]
    fn reconnect_overrides_configured_start() {
        let snap = Coverage::default().advance(HydrantCursor::new(100));
        assert_eq!(
            next_connect_cursor(snap, HydrantCursor::new(50)),
            HydrantCursor::new(101),
        );
    }

    /// A default `Coverage` reports cursor 0 with zero events processed, and
    /// in that state the configured start cursor is still used.
    #[test]
    fn first_connect_uses_start_even_when_first_frame_id_would_be_zero() {
        let start = HydrantCursor::new(7);
        let snap = Coverage::default();
        assert_eq!(snap.last_cursor(), HydrantCursor::new(0));
        assert_eq!(snap.events_processed(), 0);
        assert_eq!(next_connect_cursor(snap, start), start);
    }

    /// Advancing to cursor 0 counts as one processed event, so the next
    /// connect cursor is 1 rather than the configured start (99).
    #[test]
    fn reconnect_after_processing_id_zero_advances_to_one() {
        let snap = Coverage::default().advance(HydrantCursor::new(0));
        assert_eq!(snap.events_processed(), 1);
        assert_eq!(
            next_connect_cursor(snap, HydrantCursor::new(99)),
            HydrantCursor::new(1),
            "events_processed disambiguates 'never seen' from 'saw id 0'",
        );
    }

    /// An `identity` frame leaves the edge store empty, moves the coverage
    /// cursor to the frame id, and does not mark coverage ready.
    #[tokio::test]
    async fn identity_frame_advances_cursor_only() {
        let (store, issue_states, pull_statuses, cov, resolver) = fresh();
        let frame: HydrantFrame = parse_frame(json!({
            "id": 5,
            "type": "identity",
            "identity": {
                "did": "did:plc:olaren",
                "handle": "olaren.dev"
            }
        }));
        handle_frame(
            frame,
            &store,
            &issue_states,
            &pull_statuses,
            &cov,
            &NoopSearchSink,
            &NoopRecordStore,
            &resolver,
            &sys_clock(),
            now(),
        )
        .await;
        assert_eq!(store.key_count(), 0);
        assert_eq!(cov.snapshot().last_cursor(), HydrantCursor::new(5));
        assert!(!cov.snapshot().is_ready());
    }

    /// An `account` frame leaves the edge store empty and moves the coverage
    /// cursor to the frame id.
    #[tokio::test]
    async fn account_frame_advances_cursor_only() {
        let (store, issue_states, pull_statuses, cov, resolver) = fresh();
        let frame: HydrantFrame = parse_frame(json!({
            "id": 6,
            "type": "account",
            "account": {"did": "did:plc:olaren", "active": true}
        }));
        handle_frame(
            frame,
            &store,
            &issue_states,
            &pull_statuses,
            &cov,
            &NoopSearchSink,
            &NoopRecordStore,
            &resolver,
            &sys_clock(),
            now(),
        )
        .await;
        assert_eq!(store.key_count(), 0);
        assert_eq!(cov.snapshot().last_cursor(), HydrantCursor::new(6));
    }

    /// A frame with an unrecognised `type` parses as `FrameKind::Other` and
    /// still moves the coverage cursor to the frame id.
    #[tokio::test]
    async fn unknown_frame_kind_advances_cursor_without_panic() {
        let (store, issue_states, pull_statuses, cov, resolver) = fresh();
        let frame: HydrantFrame = parse_frame(json!({"id": 8, "type": "future_event"}));
        assert_eq!(frame.kind, FrameKind::Other);
        handle_frame(
            frame,
            &store,
            &issue_states,
            &pull_statuses,
            &cov,
            &NoopSearchSink,
            &NoopRecordStore,
            &resolver,
            &sys_clock(),
            now(),
        )
        .await;
        assert_eq!(cov.snapshot().last_cursor(), HydrantCursor::new(8));
    }

    /// Builds an `IngestRuntime` with empty stores, no-op search and record
    /// sinks, a detached resolver, and the given cancellation token. The
    /// optional `disconnects` and warming fields are left as `None`.
    fn fresh_runtime(cancel: CancellationToken) -> IngestRuntime<NoopSearchSink> {
        IngestRuntime {
            store: Arc::new(EdgeStore::new(RuntimeHasher::default())),
            issue_states: Arc::new(StateIndex::new(RuntimeHasher::default())),
            pull_statuses: Arc::new(StateIndex::new(RuntimeHasher::default())),
            coverage: Arc::new(CoverageWatch::new()),
            search: Arc::new(NoopSearchSink),
            records: Arc::new(NoopRecordStore) as Arc<dyn RecordStore>,
            resolver: Arc::new(RepoIdResolver::detached(RuntimeHasher::default())),
            clock: Arc::new(SystemClock::new()),
            entropy: Arc::new(OsEntropy),
            ws: TungsteniteWs::shared(),
            cancel,
            disconnects: None,
            warming_shadow: None,
            warming_buffer: None,
        }
    }

    /// `run` must return `Ok(())` within one second (paused clock) once the
    /// cancel token fires.
    #[tokio::test(start_paused = true)]
    async fn cancel_token_short_circuits_reconnect_sleep() {
        // NOTE(review): port 1 is presumably expected to refuse the connection
        // so `run` ends up in its reconnect backoff, per the test name.
        let cfg = IngestConfig::new(Url::parse("ws://127.0.0.1:1").unwrap());
        let cancel = CancellationToken::new();
        let runtime = fresh_runtime(cancel.clone());
        let task = tokio::spawn(async move { run(cfg, runtime).await });
        tokio::time::advance(Duration::from_millis(10)).await;
        cancel.cancel();
        let outcome = tokio::time::timeout(Duration::from_secs(1), task)
            .await
            .expect("ingest must stop within timeout once cancel fires");
        assert!(matches!(outcome, Ok(Ok(()))), "got {outcome:?}");
    }

    /// Samples `jittered` 50 times and checks every result lies in
    /// `[base, base + 25%]`.
    #[test]
    fn jittered_stays_within_one_quarter_of_base() {
        let base = Duration::from_secs(1);
        let cap = base + Duration::from_millis(250);
        let entropy = OsEntropy;
        for _ in 0..50 {
            let j = jittered(base, &entropy);
            assert!(j >= base, "jitter must not undershoot");
            assert!(j <= cap, "jitter must not exceed +25%, got {j:?}");
        }
    }

    /// After a repo record carrying `repoDid` has been handled, a star whose
    /// subject is that repo's at-URI is counted under the repo DID and not
    /// under the repo owner's DID.
    #[tokio::test]
    async fn star_after_observed_repo_keys_on_repo_did() {
        let (store, issue_states, pull_statuses, cov, resolver) = fresh();
        let repo: HydrantFrame = parse_frame(json!({
            "id": 1,
            "type": "record",
            "record": {
                "live": false,
                "did": "did:plc:nel",
                "rev": fresh_tid().as_str(),
                "collection": "sh.tangled.repo",
                "rkey": "abcabcabcabcz",
                "action": "create",
                "record": {
                    "$type": "sh.tangled.repo",
                    "createdAt": "2026-05-01T00:00:00Z",
                    "knot": "oyster.cafe",
                    "name": "abalone",
                    "repoDid": "did:plc:abalone"
                }
            }
        }));
        handle_frame(
            repo,
            &store,
            &issue_states,
            &pull_statuses,
            &cov,
            &NoopSearchSink,
            &NoopRecordStore,
            &resolver,
            &sys_clock(),
            now(),
        )
        .await;

        let star: HydrantFrame = parse_frame(json!({
            "id": 2,
            "type": "record",
            "record": {
                "live": false,
                "did": "did:plc:olaren",
                "rev": fresh_tid().as_str(),
                "collection": "sh.tangled.feed.star",
                "rkey": "abcabcabcabcz",
                "action": "create",
                "record": {
                    "$type": "sh.tangled.feed.star",
                    "createdAt": "2026-05-01T00:00:00Z",
                    "subject": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"
                }
            }
        }));
        handle_frame(
            star,
            &store,
            &issue_states,
            &pull_statuses,
            &cov,
            &NoopSearchSink,
            &NoopRecordStore,
            &resolver,
            &sys_clock(),
            now(),
        )
        .await;

        let nsid = Nsid::new_static("sh.tangled.feed.star").unwrap();
        let repo_keyed = bobbin_types::ids::EdgeKey::new(nsid.clone(), did_subj("did:plc:abalone"));
        let owner_keyed = bobbin_types::ids::EdgeKey::new(nsid, did_subj("did:plc:nel"));
        assert_eq!(
            store.count(&repo_keyed),
            1,
            "star should be keyed on repoDID once the repo is observed"
        );
        assert_eq!(
            store.count(&owner_keyed),
            0,
            "owner DID should not collect the edge"
        );
    }

    /// A star whose repo subject was never observed is stored under neither
    /// the owner DID key nor the raw at-URI key.
    #[tokio::test]
    async fn unresolvable_repo_subject_drops_edge() {
        let (store, issue_states, pull_statuses, cov, resolver) = fresh();
        let star: HydrantFrame = parse_frame(json!({
            "id": 1,
            "type": "record",
            "record": {
                "live": false,
                "did": "did:plc:olaren",
                "rev": fresh_tid().as_str(),
                "collection": "sh.tangled.feed.star",
                "rkey": "abcabcabcabcz",
                "action": "create",
                "record": {
                    "$type": "sh.tangled.feed.star",
                    "createdAt": "2026-05-01T00:00:00Z",
                    "subject": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"
                }
            }
        }));
        handle_frame(
            star,
            &store,
            &issue_states,
            &pull_statuses,
            &cov,
            &NoopSearchSink,
            &NoopRecordStore,
            &resolver,
            &sys_clock(),
            now(),
        )
        .await;

        let nsid = Nsid::new_static("sh.tangled.feed.star").unwrap();
        let owner_keyed = bobbin_types::ids::EdgeKey::new(nsid.clone(), did_subj("did:plc:nel"));
        let uri_keyed = bobbin_types::ids::EdgeKey::new(
            nsid,
            uri_subj("at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"),
        );
        assert_eq!(
            store.count(&owner_keyed),
            0,
            "must not silently misfile under the authoring DID",
        );
        assert_eq!(
            store.count(&uri_keyed),
            0,
            "unresolvable rkey-form subject must drop the edge; keeping it would index against a key that never matches bare-DID queries",
        );
    }

    /// When the observed repo record has no `repoDid`, a star pointing at it
    /// is stored under neither the at-URI key nor the owner DID key.
    #[tokio::test]
    async fn repo_without_repo_did_drops_edge() {
        let (store, issue_states, pull_statuses, cov, resolver) = fresh();
        let repo: HydrantFrame = parse_frame(json!({
            "id": 1,
            "type": "record",
            "record": {
                "live": false,
                "did": "did:plc:nel",
                "rev": fresh_tid().as_str(),
                "collection": "sh.tangled.repo",
                "rkey": "abcabcabcabcz",
                "action": "create",
                "record": {
                    "$type": "sh.tangled.repo",
                    "createdAt": "2026-05-01T00:00:00Z",
                    "knot": "oyster.cafe",
                    "name": "abalone"
                }
            }
        }));
        handle_frame(
            repo,
            &store,
            &issue_states,
            &pull_statuses,
            &cov,
            &NoopSearchSink,
            &NoopRecordStore,
            &resolver,
            &sys_clock(),
            now(),
        )
        .await;

        let star: HydrantFrame = parse_frame(json!({
            "id": 2,
            "type": "record",
            "record": {
                "live": false,
                "did": "did:plc:olaren",
                "rev": fresh_tid().as_str(),
                "collection": "sh.tangled.feed.star",
                "rkey": "abcabcabcabcz",
                "action": "create",
                "record": {
                    "$type": "sh.tangled.feed.star",
                    "createdAt": "2026-05-01T00:00:00Z",
                    "subject": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"
                }
            }
        }));
        handle_frame(
            star,
            &store,
            &issue_states,
            &pull_statuses,
            &cov,
            &NoopSearchSink,
            &NoopRecordStore,
            &resolver,
            &sys_clock(),
            now(),
        )
        .await;

        let nsid = Nsid::new_static("sh.tangled.feed.star").unwrap();
        let uri_keyed = bobbin_types::ids::EdgeKey::new(
            nsid.clone(),
            uri_subj("at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"),
        );
        let owner_keyed = bobbin_types::ids::EdgeKey::new(nsid, did_subj("did:plc:nel"));
        assert_eq!(
            store.count(&uri_keyed),
            0,
            "no canonical DID exists for a repo without repoDID, so the edge must be dropped",
        );
        assert_eq!(
            store.count(&owner_keyed),
            0,
            "the authoring DID is not the canonical repo identity",
        );
    }

    /// A star that carries `subjectDid` directly is counted under that DID
    /// even though no repo was observed beforehand.
    #[tokio::test]
    async fn explicit_subject_did_skips_normalization() {
        let (store, issue_states, pull_statuses, cov, resolver) = fresh();
        let star: HydrantFrame = parse_frame(json!({
            "id": 1,
            "type": "record",
            "record": {
                "live": false,
                "did": "did:plc:olaren",
                "rev": fresh_tid().as_str(),
                "collection": "sh.tangled.feed.star",
                "rkey": "abcabcabcabcz",
                "action": "create",
                "record": {
                    "$type": "sh.tangled.feed.star",
                    "createdAt": "2026-05-01T00:00:00Z",
                    "subjectDid": "did:plc:abalone"
                }
            }
        }));
        handle_frame(
            star,
            &store,
            &issue_states,
            &pull_statuses,
            &cov,
            &NoopSearchSink,
            &NoopRecordStore,
            &resolver,
            &sys_clock(),
            now(),
        )
        .await;
        let key = bobbin_types::ids::EdgeKey::new(
            Nsid::new_static("sh.tangled.feed.star").unwrap(),
            did_subj("did:plc:abalone"),
        );
        assert_eq!(store.count(&key), 1);
    }

    /// With the (owner, rkey) -> repo DID mapping seeded through
    /// `resolver.observe`, an issue whose `repo` is the at-URI is counted
    /// under the repo DID.
    #[tokio::test]
    async fn issue_with_repo_uri_resolves_to_repo_did() {
        let (store, issue_states, pull_statuses, cov, resolver) = fresh();
        resolver
            .observe(
                Did::new_owned("did:plc:nel").unwrap(),
                Rkey::new_owned("abcabcabcabcz").unwrap(),
                Some(Did::new_owned("did:plc:abalone").unwrap()),
            )
            .await;
        let issue: HydrantFrame = parse_frame(json!({
            "id": 1,
            "type": "record",
            "record": {
                "live": false,
                "did": "did:plc:olaren",
                "rev": fresh_tid().as_str(),
                "collection": "sh.tangled.repo.issue",
                "rkey": "abcabcabcabcz",
                "action": "create",
                "record": {
                    "$type": "sh.tangled.repo.issue",
                    "createdAt": "2026-05-01T00:00:00Z",
                    "title": "bug",
                    "repo": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"
                }
            }
        }));
        handle_frame(
            issue,
            &store,
            &issue_states,
            &pull_statuses,
            &cov,
            &NoopSearchSink,
            &NoopRecordStore,
            &resolver,
            &sys_clock(),
            now(),
        )
        .await;
        let key = bobbin_types::ids::EdgeKey::new(
            Nsid::new_static("sh.tangled.repo.issue").unwrap(),
            did_subj("did:plc:abalone"),
        );
        assert_eq!(store.count(&key), 1);
    }

    /// `SearchSink` test double that records every upserted document and
    /// ignores removals.
    #[derive(Default)]
    struct RecordingSearchSink {
        docs: tokio::sync::Mutex<Vec<bobbin_types::search::SearchDoc>>,
    }

    impl SearchSink for RecordingSearchSink {
        async fn upsert(&self, doc: bobbin_types::search::SearchDoc) {
            self.docs.lock().await.push(doc);
        }
        async fn remove(&self, _uri: &AtUri<DefaultStr>) {}
    }

    /// Handling an issue for a previously observed repo upserts exactly one
    /// search document whose `repo` field is the resolved repo DID.
    #[tokio::test]
    async fn search_index_hydrates_repo_did_via_resolver() {
        let (store, issue_states, pull_statuses, cov, resolver) = fresh();
        resolver
            .observe(
                Did::new_owned("did:plc:nel").unwrap(),
                Rkey::new_owned("abcabcabcabcz").unwrap(),
                Some(Did::new_owned("did:plc:abalone").unwrap()),
            )
            .await;
        let search = RecordingSearchSink::default();
        let issue: HydrantFrame = parse_frame(json!({
            "id": 1,
            "type": "record",
            "record": {
                "live": false,
                "did": "did:plc:olaren",
                "rev": fresh_tid().as_str(),
                "collection": "sh.tangled.repo.issue",
                "rkey": "abcabcabcabcz",
                "action": "create",
                "record": {
                    "$type": "sh.tangled.repo.issue",
                    "createdAt": "2026-05-01T00:00:00Z",
                    "title": "bug",
                    "repo": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"
                }
            }
        }));
        handle_frame(
            issue,
            &store,
            &issue_states,
            &pull_statuses,
            &cov,
            &search,
            &NoopRecordStore,
            &resolver,
            &sys_clock(),
            now(),
        )
        .await;
        let docs = search.docs.lock().await;
        assert_eq!(docs.len(), 1, "issue should produce one search doc");
        assert_eq!(
            docs[0].repo,
            Some(Did::new_owned("did:plc:abalone").unwrap()),
            "search doc repo field must be resolved from the observed repo, not left as None",
        );
    }

    /// A `delete` frame for a `sh.tangled.repo` record removes the matching
    /// (owner, rkey) entry that `observe` had put in the resolver cache.
    #[tokio::test]
    async fn delete_repo_record_evicts_resolver_cache() {
        let (store, issue_states, pull_statuses, cov, resolver) = fresh();
        let owner = Did::new_owned("did:plc:nel").unwrap();
        let rkey = Rkey::new_owned("abcabcabcabcz").unwrap();
        resolver
            .observe(
                owner.clone(),
                rkey.clone(),
                Some(Did::new_owned("did:plc:abalone").unwrap()),
            )
            .await;
        assert!(
            resolver.cached_resolution(&owner, &rkey).await.is_some(),
            "observe must seed the cache",
        );
        let delete: HydrantFrame = parse_frame(json!({
            "id": 1,
            "type": "record",
            "record": {
                "live": false,
                "did": owner.as_ref(),
                "rev": fresh_tid().as_str(),
                "collection": "sh.tangled.repo",
                "rkey": rkey.as_ref(),
                "action": "delete"
            }
        }));
        handle_frame(
            delete,
            &store,
            &issue_states,
            &pull_statuses,
            &cov,
            &NoopSearchSink,
            &NoopRecordStore,
            &resolver,
            &sys_clock(),
            now(),
        )
        .await;
        assert!(
            resolver.cached_resolution(&owner, &rkey).await.is_none(),
            "deleting the repo record must clear the resolver cache so future observes are not blocked by a stale Authoritative entry",
        );
    }

    /// `run` must return `Ok(())` within two seconds of cancellation while it
    /// is pointed at a listener that is bound but never accepts.
    #[tokio::test]
    async fn cancel_short_circuits_a_hung_ws_connect() {
        // The listener is bound but `accept` is never called on it.
        // NOTE(review): presumably this leaves the WebSocket handshake
        // unanswered so the connect attempt hangs, as the test name states.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind sink listener");
        let port = listener.local_addr().expect("local addr").port();
        let cfg =
            IngestConfig::new(Url::parse(&format!("ws://127.0.0.1:{port}")).expect("hydrant url"));
        let cancel = CancellationToken::new();
        let runtime = fresh_runtime(cancel.clone());
        let task = tokio::spawn(async move { run(cfg, runtime).await });
        tokio::time::sleep(Duration::from_millis(100)).await;
        cancel.cancel();
        let outcome = tokio::time::timeout(Duration::from_secs(2), task)
            .await
            .expect("cancel must short-circuit the hung ws connect");
        assert!(matches!(outcome, Ok(Ok(()))), "got {outcome:?}");
        // Keep the socket bound until the assertions above have run.
        drop(listener);
    }

    /// `WsTransport` test double: the first `connect` yields a connection
    /// whose stream holds a single `Close` message and whose sink accepts
    /// everything; every later `connect` fails with `NetworkError::Connect`.
    struct CloseOnConnectTransport {
        used: std::sync::Mutex<bool>,
    }
    impl bobbin_runtime::WsTransport for CloseOnConnectTransport {
        fn connect(&self, _url: Url) -> bobbin_runtime::WsConnectFuture {
            let mut used = self.used.lock().unwrap();
            if *used {
                return Box::pin(async move {
                    Err(NetworkError::Connect("only one connect allowed".to_owned()))
                });
            }
            *used = true;
            Box::pin(async move {
                let mut q = std::collections::VecDeque::new();
                q.push_back(Ok(WsMessage::Close {
                    code: 1000,
                    reason: "bye".to_owned(),
                }));
                let stream: Box<dyn WsStream> = Box::new(ScriptedWsStream { messages: q });
                struct NoopSink;
                impl bobbin_runtime::WsSink for NoopSink {
                    fn send<'a>(&'a mut self, _m: WsMessage) -> bobbin_runtime::WsSendFuture<'a> {
                        Box::pin(async move { Ok(()) })
                    }
                }
                let sink: Box<dyn bobbin_runtime::WsSink> = Box::new(NoopSink);
                Ok(bobbin_runtime::WsConn { sink, stream })
            })
        }
    }

    /// `run_session` must complete within three seconds when the remote sends
    /// `Close`, even though the runtime's cancel token is never fired. Only
    /// completion is checked, not the session's result.
    #[tokio::test]
    async fn run_session_returns_after_remote_close_when_outer_cancel_unfired() {
        let cfg = IngestConfig::new(Url::parse("ws://127.0.0.1:1").unwrap());
        let cancel = CancellationToken::new();
        let mut runtime = fresh_runtime(cancel.clone());
        runtime.ws = Arc::new(CloseOnConnectTransport {
            used: std::sync::Mutex::new(false),
        });

        let res = tokio::time::timeout(
            Duration::from_secs(3),
            run_session(&cfg, HydrantCursor::new(0), &runtime),
        )
        .await;
        assert!(
            res.is_ok(),
            "run_session must return after a remote Close even when outer cancel never fires - regression for a session-scoped task hanging on the parent token",
        );
    }

    /// `WsSink` whose `send` future never resolves.
    struct HangingSink;
    impl bobbin_runtime::WsSink for HangingSink {
        fn send<'a>(&'a mut self, _: WsMessage) -> bobbin_runtime::WsSendFuture<'a> {
            Box::pin(std::future::pending())
        }
    }

    /// With a sink that never completes, `timed_send` must return
    /// `IngestError::SendTimeout(SEND_TIMEOUT)` once the paused clock has
    /// advanced past `SEND_TIMEOUT`.
    #[tokio::test(start_paused = true)]
    async fn timed_send_surfaces_send_timeout_when_sink_pends_forever() {
        let mut sink: Box<dyn bobbin_runtime::WsSink> = Box::new(HangingSink);
        let task =
            tokio::spawn(async move { timed_send(&mut sink, WsMessage::Ping(Bytes::new())).await });
        tokio::time::advance(SEND_TIMEOUT + Duration::from_secs(1)).await;
        let result = task.await.expect("task panicked");
        match result {
            Err(IngestError::SendTimeout(d)) => assert_eq!(d, SEND_TIMEOUT),
            other => panic!(
                "expected SendTimeout, got {other:?}. A bare ws_sink.send.await would hang forever on a half-dead socket and starve the writer's pong-deadline arm",
            ),
        }
    }

    /// `WsSink` whose `send` resolves to `Ok(())` immediately.
    struct OkSink;
    impl bobbin_runtime::WsSink for OkSink {
        fn send<'a>(&'a mut self, _: WsMessage) -> bobbin_runtime::WsSendFuture<'a> {
            Box::pin(async move { Ok(()) })
        }
    }

    /// `timed_send` passes through `Ok(())` when the sink succeeds at once.
    #[tokio::test]
    async fn timed_send_returns_ok_when_sink_succeeds_promptly() {
        let mut sink: Box<dyn bobbin_runtime::WsSink> = Box::new(OkSink);
        let result = timed_send(&mut sink, WsMessage::Ping(Bytes::new())).await;
        assert!(matches!(result, Ok(())), "got {result:?}");
    }

    /// A text frame with `type: "error"` and error `ConsumerTooSlow` must
    /// classify as `IngestError::ConsumerTooSlow` carrying the frame's
    /// message, even though the frame also has an `id`.
    #[test]
    fn classify_error_frame_with_id_dispatches_as_error_not_unknown_kind() {
        let text = r#"{"id":42,"type":"error","error":"ConsumerTooSlow","message":"slow"}"#;
        match classify_text_frame(text) {
            Err(IngestError::ConsumerTooSlow { message }) => assert_eq!(message, "slow"),
            other => panic!(
                "type=\"error\" must dispatch to the error path even when id is present, got: {other:?}"
            ),
        }
    }

    /// Drives four star frames through the prep -> resolve -> commit stages
    /// with `buffered(4)` and checks that records reach the record store in
    /// frame order and that the coverage cursor ends at 4.
    #[tokio::test]
    async fn buffered_pipeline_preserves_cursor_order_under_resolve_latency_skew() {
        use bobbin_record_lru::RecordStore;
        use bobbin_types::record::RecordBody;
        use std::sync::Mutex;

        // Mock getRecord endpoint: every request gets a 404 after 150ms.
        let server = wiremock::MockServer::start().await;
        wiremock::Mock::given(wiremock::matchers::method("GET"))
            .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord"))
            .respond_with(
                wiremock::ResponseTemplate::new(404).set_delay(Duration::from_millis(150)),
            )
            .mount(&server)
            .await;

        let client = bobbin_slingshot_client::SlingshotClient::with_default_http(
            Url::parse(&server.uri()).unwrap(),
        )
        .unwrap();
        let clock: Arc<dyn Clock> = Arc::new(SystemClock::new());
        let resolver = Arc::new(RepoIdResolver::with_slingshot(
            client,
            clock.clone(),
            RuntimeHasher::default(),
        ));

        // Only the "fast" repo rkeys are pre-observed in the resolver.
        // NOTE(review): the "slow" ones presumably fall through to the mock
        // server above and pay its 150ms delay.
        let owner = Did::new_owned("did:plc:nel").unwrap();
        let abalone = Did::new_owned("did:plc:abalone").unwrap();
        let fast_rkeys: [Rkey<DefaultStr>; 2] = [rkey("fastrkeyaa01"), rkey("fastrkeyaa02")];
        let slow_rkeys: [Rkey<DefaultStr>; 2] = [rkey("slowrkeyaa01"), rkey("slowrkeyaa02")];
        for r in &fast_rkeys {
            resolver
                .observe(owner.clone(), r.clone(), Some(abalone.clone()))
                .await;
        }

        /// `RecordStore` double that records the URI of every `put`, in call
        /// order; `get` always misses and `remove` does nothing.
        #[derive(Default)]
        struct Capturing {
            urls: Mutex<Vec<AtUri<DefaultStr>>>,
        }
        impl RecordStore for Capturing {
            fn get(&self, _uri: &AtUri<DefaultStr>) -> Option<Arc<RecordBody>> {
                None
            }
            fn put(&self, uri: AtUri<DefaultStr>, _body: Arc<RecordBody>) {
                self.urls.lock().unwrap().push(uri);
            }
            fn remove(&self, _uri: &AtUri<DefaultStr>) {}
        }
        let capturing = Arc::new(Capturing::default());

        let runtime: IngestRuntime<NoopSearchSink> = IngestRuntime {
            store: Arc::new(EdgeStore::new(RuntimeHasher::default())),
            issue_states: Arc::new(StateIndex::new(RuntimeHasher::default())),
            pull_statuses: Arc::new(StateIndex::new(RuntimeHasher::default())),
            coverage: Arc::new(CoverageWatch::new()),
            search: Arc::new(NoopSearchSink),
            records: capturing.clone() as Arc<dyn RecordStore>,
            resolver,
            clock,
            entropy: Arc::new(OsEntropy),
            ws: TungsteniteWs::shared(),
            cancel: CancellationToken::new(),
            disconnects: None,
            warming_shadow: None,
            warming_buffer: None,
        };

        // Assemble the three stages over a channel-backed stream; the task
        // ends when `frame_tx` is dropped and the stream is exhausted.
        let parallelism = 4usize;
        let (frame_tx, frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(64);
        let pipeline_rt = runtime.clone();
        let pipeline = tokio::spawn(async move {
            let prep_rt = pipeline_rt.clone();
            let resolve_rt = pipeline_rt.clone();
            let commit_rt = pipeline_rt;
            ReceiverStream::new(frame_rx)
                .then(move |frame| prep_stage(frame, prep_rt.clone()))
                .map(move |staged| resolve_stage(staged, resolve_rt.clone()))
                .buffered(parallelism)
                .for_each(move |staged| commit_stage(staged, commit_rt.clone(), parallelism))
                .await;
        });

        // Builds a star frame with the given frame id, an rkey derived from
        // `idx`, and a subject pointing at `repo_rkey` under did:plc:nel.
        let mk = |id: u64, idx: u64, repo_rkey: &Rkey<DefaultStr>| -> HydrantFrame {
            parse_frame(json!({
                "id": id,
                "type": "record",
                "record": {
                    "live": false,
                    "did": "did:plc:olaren",
                    "rev": fresh_tid().as_str(),
                    "collection": "sh.tangled.feed.star",
                    "rkey": format!("starrkeya{idx:04}"),
                    "action": "create",
                    "cid": VALID_CID,
                    "record": {
                        "$type": "sh.tangled.feed.star",
                        "createdAt": "2026-05-01T00:00:00Z",
                        "subject": format!("at://did:plc:nel/sh.tangled.repo/{}", repo_rkey.as_ref()),
                    }
                }
            }))
        };

        // Interleave slow and fast subjects: slow, fast, slow, fast.
        frame_tx.send(mk(1, 1, &slow_rkeys[0])).await.unwrap();
        frame_tx.send(mk(2, 2, &fast_rkeys[0])).await.unwrap();
        frame_tx.send(mk(3, 3, &slow_rkeys[1])).await.unwrap();
        frame_tx.send(mk(4, 4, &fast_rkeys[1])).await.unwrap();
        drop(frame_tx);

        pipeline.await.unwrap();

        let captured = capturing.urls.lock().unwrap();
        let rkeys: Vec<String> = captured
            .iter()
            .filter_map(|uri| uri.rkey().map(|r| r.as_ref().to_owned()))
            .collect();
        assert_eq!(
            rkeys,
            vec![
                "starrkeya0001".to_owned(),
                "starrkeya0002".to_owned(),
                "starrkeya0003".to_owned(),
                "starrkeya0004".to_owned(),
            ],
            "buffered(N) must preserve cursor order even when resolves complete out of order, with ~150ms slow vs cache-hit fast as the latency skew here",
        );
        assert_eq!(runtime.coverage.snapshot().last_cursor().raw(), 4);
    }
}