Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1use std::collections::{HashSet, VecDeque}; 2use std::num::NonZeroUsize; 3use std::sync::Arc; 4use std::time::Duration; 5 6use bobbin_edge_index::{ 7 ApplyOutcome, Coverage, CoverageWatch, EdgeStore, HydrantCursor, IssueStateKind, 8 PromotionSignal, PullStatusKind, StateIndex, apply_record_state, 9}; 10use bobbin_knot_ingest::{CapabilityGate, KnotRegistry}; 11use bobbin_record_lru::RecordStore; 12use bobbin_resolver::{NormalizeRepoRefs, decode_canon_or_upgrade_bytes, synthesize_created_at}; 13use bobbin_runtime::{ 14 Clock, Entropy, NetworkError, RuntimeHasher, UnixMicros, WsConn, WsMessage, WsStream, 15 WsTransport, 16}; 17use bobbin_types::edges::{Edge, ExtractError, Record}; 18use bobbin_types::ids::{RepoIdent, SubjectRef}; 19use bobbin_types::knot_acl::KnotHostKey; 20use bobbin_types::record::RecordBody; 21use bobbin_types::search::{SearchSink, SearchableRecord}; 22use bytes::Bytes; 23use futures::StreamExt; 24use jacquard_common::DefaultStr; 25use jacquard_common::types::did::Did; 26use jacquard_common::types::ident::AtIdentifier; 27use jacquard_common::types::nsid::Nsid; 28use jacquard_common::types::recordkey::Rkey; 29use jacquard_common::types::string::{AtStrError, AtUri, Cid}; 30use thiserror::Error; 31use tokio::time::Instant; 32use tokio_stream::wrappers::ReceiverStream; 33use tokio_util::sync::CancellationToken; 34use tracing::{debug, info, warn}; 35use url::Url; 36 37mod frame; 38mod resolver; 39mod shadow; 40mod warming; 41use frame::HydrantStreamErrorFrame; 42pub use frame::{FrameKind, HydrantFrame, RecordAction, RecordFrame}; 43pub use resolver::{RepoIdResolver, Resolution}; 44pub use shadow::{WarmingShadowBuffer, WarmingShadowSnapshot}; 45pub use warming::{ParkedUpsert, WarmingBuffer, WarmingBufferSnapshot}; 46 47const TANGLED_PREFIX: &str = "sh.tangled."; 48const RECONNECT_INITIAL_DELAY: Duration = Duration::from_millis(500); 49const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(30); 50const PING_INTERVAL: Duration = Duration::from_secs(20); 
51const PONG_TIMEOUT: Duration = Duration::from_secs(15); 52const READY_SKEW: Duration = Duration::from_secs(60); 53const FRAME_CHANNEL_DEPTH: usize = 256; 54const CONTROL_CHANNEL_DEPTH: usize = 16; 55const READER_HOLD_LIMIT: usize = 64; 56const SEND_TIMEOUT: Duration = Duration::from_secs(10); 57const NORMAL_CLOSE: u16 = 1000; 58const METRICS_DUMP_INTERVAL: Duration = Duration::from_secs(10); 59const WARMING_FLUSH_PARALLELISM: usize = 64; 60pub const DEFAULT_INGEST_PARALLELISM: NonZeroUsize = match NonZeroUsize::new(16) { 61 Some(n) => n, 62 None => unreachable!(), 63}; 64 65#[derive(Clone, Debug)] 66pub struct IngestConfig { 67 pub hydrant_base: Url, 68 pub start_cursor: HydrantCursor, 69 pub parallelism: NonZeroUsize, 70} 71 72impl IngestConfig { 73 pub fn new(hydrant_base: Url) -> Self { 74 Self { 75 hydrant_base, 76 start_cursor: HydrantCursor::new(0), 77 parallelism: DEFAULT_INGEST_PARALLELISM, 78 } 79 } 80 81 fn stream_url(&self, cursor: HydrantCursor) -> Result<Url, IngestError> { 82 let mut url = self.hydrant_base.clone(); 83 match url.scheme() { 84 "http" => url 85 .set_scheme("ws") 86 .map_err(|_| IngestError::Url("set ws scheme"))?, 87 "https" => url 88 .set_scheme("wss") 89 .map_err(|_| IngestError::Url("set wss scheme"))?, 90 "ws" | "wss" => {} 91 other => return Err(IngestError::UnknownScheme(other.to_owned())), 92 } 93 url.set_path("/stream"); 94 url.query_pairs_mut() 95 .clear() 96 .append_pair("cursor", &cursor.raw().to_string()); 97 Ok(url) 98 } 99} 100 101#[derive(Debug, Error)] 102pub enum IngestError { 103 #[error("invalid hydrant url: {0}")] 104 Url(&'static str), 105 #[error("unsupported url scheme: {0}")] 106 UnknownScheme(String), 107 #[error("network: {0}")] 108 Network(#[from] NetworkError), 109 #[error("frame decode: {0}")] 110 Decode(#[from] serde_json::Error), 111 #[error("invalid at-uri synthesized from frame: {0}")] 112 InvalidAtUri(#[from] AtStrError), 113 #[error("record extraction: {0}")] 114 Extract(#[from] ExtractError), 115 
#[error("hydrant did not respond to ping within {0:?}")] 116 PongTimeout(Duration), 117 #[error("websocket send blocked for at least {0:?}, treating link as dead")] 118 SendTimeout(Duration), 119 #[error("hydrant disconnected because bobbin's stream consumer fell behind: {message}")] 120 ConsumerTooSlow { message: String }, 121 #[error("hydrant signaled stream error {code}: {message}")] 122 HydrantStream { code: String, message: String }, 123} 124 125#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] 126pub enum DisconnectKind { 127 Url, 128 UnknownScheme, 129 Network, 130 Decode, 131 InvalidAtUri, 132 Extract, 133 PongTimeout, 134 SendTimeout, 135 ConsumerTooSlow, 136 HydrantStream, 137} 138 139impl DisconnectKind { 140 pub fn from_error(err: &IngestError) -> Self { 141 match err { 142 IngestError::Url(_) => Self::Url, 143 IngestError::UnknownScheme(_) => Self::UnknownScheme, 144 IngestError::Network(_) => Self::Network, 145 IngestError::Decode(_) => Self::Decode, 146 IngestError::InvalidAtUri(_) => Self::InvalidAtUri, 147 IngestError::Extract(_) => Self::Extract, 148 IngestError::PongTimeout(_) => Self::PongTimeout, 149 IngestError::SendTimeout(_) => Self::SendTimeout, 150 IngestError::ConsumerTooSlow { .. } => Self::ConsumerTooSlow, 151 IngestError::HydrantStream { .. 
} => Self::HydrantStream, 152 } 153 } 154} 155 156#[derive(Clone, Debug, Eq, PartialEq)] 157pub struct DisconnectSnapshot { 158 pub kind: DisconnectKind, 159 pub message: String, 160 pub at_unix_micros: UnixMicros, 161 pub last_cursor: HydrantCursor, 162} 163 164#[derive(Default)] 165pub struct DisconnectSink { 166 last: std::sync::Mutex<Option<DisconnectSnapshot>>, 167 count: std::sync::atomic::AtomicU64, 168} 169 170impl DisconnectSink { 171 pub fn new() -> Self { 172 Self::default() 173 } 174 175 pub fn record(&self, snap: DisconnectSnapshot) { 176 *self.last.lock().expect("disconnect sink mutex poisoned") = Some(snap); 177 self.count 178 .fetch_add(1, std::sync::atomic::Ordering::Relaxed); 179 } 180 181 pub fn snapshot(&self) -> Option<DisconnectSnapshot> { 182 self.last 183 .lock() 184 .expect("disconnect sink mutex poisoned") 185 .clone() 186 } 187 188 pub fn count(&self) -> u64 { 189 self.count.load(std::sync::atomic::Ordering::Relaxed) 190 } 191} 192 193#[derive(Clone, Copy, Debug, Eq, PartialEq)] 194enum SessionOutcome { 195 Progressed, 196 Empty, 197} 198 199#[derive(Debug)] 200struct SessionEnd { 201 outcome: SessionOutcome, 202 error: Option<IngestError>, 203} 204 205pub struct IngestRuntime<S: SearchSink + 'static> { 206 pub store: Arc<EdgeStore>, 207 pub issue_states: Arc<StateIndex<IssueStateKind>>, 208 pub pull_statuses: Arc<StateIndex<PullStatusKind>>, 209 pub coverage: Arc<CoverageWatch>, 210 pub search: Arc<S>, 211 pub records: Arc<dyn RecordStore>, 212 pub resolver: Arc<RepoIdResolver>, 213 pub clock: Arc<dyn Clock>, 214 pub entropy: Arc<dyn Entropy>, 215 pub ws: Arc<dyn WsTransport>, 216 pub cancel: CancellationToken, 217 pub disconnects: Option<Arc<DisconnectSink>>, 218 pub warming_shadow: Option<Arc<WarmingShadowBuffer>>, 219 pub warming_buffer: Option<Arc<WarmingBuffer>>, 220 pub knot_registry: Option<Arc<KnotRegistry>>, 221 pub knot_gate: Option<Arc<CapabilityGate>>, 222} 223 224impl<S: SearchSink + 'static> Clone for IngestRuntime<S> { 225 
fn clone(&self) -> Self { 226 Self { 227 store: self.store.clone(), 228 issue_states: self.issue_states.clone(), 229 pull_statuses: self.pull_statuses.clone(), 230 coverage: self.coverage.clone(), 231 search: self.search.clone(), 232 records: self.records.clone(), 233 resolver: self.resolver.clone(), 234 clock: self.clock.clone(), 235 entropy: self.entropy.clone(), 236 ws: self.ws.clone(), 237 cancel: self.cancel.clone(), 238 disconnects: self.disconnects.clone(), 239 warming_shadow: self.warming_shadow.clone(), 240 warming_buffer: self.warming_buffer.clone(), 241 knot_registry: self.knot_registry.clone(), 242 knot_gate: self.knot_gate.clone(), 243 } 244 } 245} 246 247impl<S: SearchSink + 'static> IngestRuntime<S> { 248 fn pipeline_ctx(&self) -> PipelineCtx<'_, S> { 249 PipelineCtx { 250 resolver: &self.resolver, 251 store: &self.store, 252 issue_states: &self.issue_states, 253 pull_statuses: &self.pull_statuses, 254 coverage: &self.coverage, 255 records: &*self.records, 256 search: &self.search, 257 shadow: self.warming_shadow.as_deref(), 258 buffer: self.warming_buffer.as_deref(), 259 knot_registry: self.knot_registry.as_deref(), 260 knot_gate: self.knot_gate.as_deref(), 261 } 262 } 263} 264 265struct PipelineCtx<'a, S: SearchSink + 'static> { 266 resolver: &'a RepoIdResolver, 267 store: &'a EdgeStore, 268 issue_states: &'a StateIndex<IssueStateKind>, 269 pull_statuses: &'a StateIndex<PullStatusKind>, 270 coverage: &'a CoverageWatch, 271 records: &'a dyn RecordStore, 272 search: &'a S, 273 shadow: Option<&'a WarmingShadowBuffer>, 274 buffer: Option<&'a WarmingBuffer>, 275 knot_registry: Option<&'a KnotRegistry>, 276 knot_gate: Option<&'a CapabilityGate>, 277} 278 279pub async fn run<S: SearchSink + 'static>( 280 config: IngestConfig, 281 runtime: IngestRuntime<S>, 282) -> Result<(), IngestError> { 283 let metrics_dumper = spawn_metrics_dumper(&runtime); 284 let idle_promoter = spawn_idle_promoter(&runtime); 285 let warming_flusher = 
spawn_warming_flusher(&runtime); 286 let result = run_inner(config, &runtime).await; 287 if let Err(join) = metrics_dumper.await { 288 warn!(?join, "metrics dumper task panicked"); 289 } 290 if let Err(join) = idle_promoter.await { 291 warn!(?join, "idle promoter task panicked"); 292 } 293 if let Some(handle) = warming_flusher 294 && let Err(join) = handle.await 295 { 296 warn!(?join, "warming flusher task panicked"); 297 } 298 result 299} 300 301async fn run_inner<S: SearchSink + 'static>( 302 config: IngestConfig, 303 runtime: &IngestRuntime<S>, 304) -> Result<(), IngestError> { 305 let mut backoff = RECONNECT_INITIAL_DELAY; 306 loop { 307 let cursor = next_connect_cursor(runtime.coverage.snapshot(), config.start_cursor); 308 let SessionEnd { outcome, error } = run_session(&config, cursor, runtime).await; 309 if runtime.cancel.is_cancelled() { 310 info!( 311 last_cursor = runtime.coverage.snapshot().last_cursor().raw(), 312 "ingest stopped after shutdown signal" 313 ); 314 return Ok(()); 315 } 316 match (outcome, &error) { 317 (SessionOutcome::Progressed, None) => { 318 info!("hydrant stream closed after delivering frames, reconnecting") 319 } 320 (SessionOutcome::Empty, None) => { 321 warn!("hydrant stream closed without delivering frames") 322 } 323 (_, Some(err)) => warn!(?err, "hydrant stream errored"), 324 } 325 if let (Some(sink), Some(err)) = (runtime.disconnects.as_ref(), error.as_ref()) { 326 sink.record(DisconnectSnapshot { 327 kind: DisconnectKind::from_error(err), 328 message: err.to_string(), 329 at_unix_micros: runtime.clock.now_unix_micros(), 330 last_cursor: runtime.coverage.snapshot().last_cursor(), 331 }); 332 } 333 let made_progress = matches!(outcome, SessionOutcome::Progressed); 334 if made_progress { 335 backoff = RECONNECT_INITIAL_DELAY; 336 } else { 337 tokio::select! 
{ 338 biased; 339 _ = runtime.cancel.cancelled() => return Ok(()), 340 _ = runtime.clock.sleep(jittered(backoff, &*runtime.entropy)) => {} 341 } 342 backoff = (backoff * 2).min(RECONNECT_MAX_DELAY); 343 } 344 } 345} 346 347fn spawn_warming_flusher<S: SearchSink + 'static>( 348 runtime: &IngestRuntime<S>, 349) -> Option<tokio::task::JoinHandle<()>> { 350 runtime.warming_buffer.as_ref()?; 351 let rt = runtime.clone(); 352 Some(tokio::spawn(async move { 353 let buffer = rt 354 .warming_buffer 355 .as_deref() 356 .expect("warming flusher only spawns when buffer is set"); 357 let mut rx = rt.coverage.subscribe(); 358 let reason = loop { 359 if rx.borrow_and_update().is_ready() { 360 break FlushReason::Ready; 361 } 362 tokio::select! { 363 biased; 364 _ = rt.cancel.cancelled() => break FlushReason::Cancelled, 365 res = rx.changed() => match res { 366 Ok(()) => continue, 367 Err(_) => break FlushReason::CoverageDropped, 368 }, 369 } 370 }; 371 flush_warming_buffer(&rt, buffer, reason).await; 372 })) 373} 374 375#[derive(Clone, Copy, Debug, Eq, PartialEq)] 376enum FlushReason { 377 Ready, 378 Cancelled, 379 CoverageDropped, 380} 381 382impl FlushReason { 383 fn as_str(self) -> &'static str { 384 match self { 385 FlushReason::Ready => "ready", 386 FlushReason::Cancelled => "cancelled", 387 FlushReason::CoverageDropped => "coverage_dropped", 388 } 389 } 390} 391 392async fn flush_warming_buffer<S: SearchSink + 'static>( 393 runtime: &IngestRuntime<S>, 394 buffer: &WarmingBuffer, 395 reason: FlushReason, 396) { 397 let drained = buffer.drain_for_promote().await; 398 if drained.is_empty() { 399 return; 400 } 401 if reason != FlushReason::Ready { 402 info!( 403 target: "bobbin_ingest::warming", 404 abandoned_entries = drained.len(), 405 reason = reason.as_str(), 406 "abandoning parked items on non-ready flush", 407 ); 408 return; 409 } 410 let hasher = buffer.hasher().clone(); 411 let unique: HashSet<RepoIdent, RuntimeHasher> = drained 412 .iter() 413 .flat_map(|(_, deps)| 
deps.iter().cloned()) 414 .fold(HashSet::with_hasher(hasher), |mut acc, dep| { 415 acc.insert(dep); 416 acc 417 }); 418 if !unique.is_empty() { 419 let resolver = runtime.resolver.clone(); 420 let _: Vec<()> = futures::stream::iter(unique) 421 .map(|key| { 422 let resolver = resolver.clone(); 423 async move { 424 let _ = resolver.resolve(&key.owner, &key.rkey).await; 425 } 426 }) 427 .buffer_unordered(WARMING_FLUSH_PARALLELISM) 428 .collect() 429 .await; 430 } 431 let upserts: Vec<ParkedUpsert> = drained.into_iter().map(|(u, _)| u).collect(); 432 let count = upserts.len(); 433 let ctx = runtime.pipeline_ctx(); 434 finalize_drained(&ctx, upserts).await; 435 info!( 436 target: "bobbin_ingest::warming", 437 flushed_entries = count, 438 reason = reason.as_str(), 439 "drained warming buffer", 440 ); 441} 442 443const IDLE_PROMOTE_WINDOW: Duration = Duration::from_secs(15); 444const IDLE_PROMOTE_MIN_EVENTS: u64 = 256; 445 446fn spawn_idle_promoter<S: SearchSink + 'static>( 447 runtime: &IngestRuntime<S>, 448) -> tokio::task::JoinHandle<()> { 449 let rt = runtime.clone(); 450 tokio::spawn(async move { 451 let mut prev = rt.coverage.snapshot().events_processed(); 452 loop { 453 tokio::select! 
{ 454 biased; 455 _ = rt.cancel.cancelled() => return, 456 _ = rt.clock.sleep(IDLE_PROMOTE_WINDOW) => {} 457 } 458 let snap = rt.coverage.snapshot(); 459 if snap.is_ready() { 460 return; 461 } 462 let processed = snap.events_processed(); 463 if processed >= IDLE_PROMOTE_MIN_EVENTS && processed == prev { 464 rt.coverage.update(|c| c.force_ready()); 465 info!( 466 target: "bobbin_ingest::coverage", 467 events_processed = processed, 468 last_cursor = snap.last_cursor().raw(), 469 "stream idle, promoting coverage to ready", 470 ); 471 return; 472 } 473 prev = processed; 474 } 475 }) 476} 477 478fn spawn_metrics_dumper<S: SearchSink + 'static>( 479 runtime: &IngestRuntime<S>, 480) -> tokio::task::JoinHandle<()> { 481 let rt = runtime.clone(); 482 tokio::spawn(async move { 483 loop { 484 tokio::select! { 485 biased; 486 _ = rt.cancel.cancelled() => break, 487 _ = rt.clock.sleep(METRICS_DUMP_INTERVAL) => { 488 let s = rt.resolver.stats(); 489 info!( 490 target: "bobbin_ingest::metrics", 491 resolver_hits = s.hits, 492 resolver_misses_mapped = s.misses_mapped, 493 resolver_misses_no_repo_did = s.misses_no_repo_did, 494 resolver_misses_unresolvable = s.misses_unresolvable, 495 resolver_misses_transient = s.misses_transient, 496 resolver_misses_no_client = s.misses_no_client, 497 resolver_miss_latency_micros_avg = s.miss_latency_micros_avg().unwrap_or(0), 498 resolver_miss_latency_micros_max = s.miss_latency_micros_max, 499 resolver_total = s.total(), 500 "resolver stats", 501 ); 502 } 503 } 504 } 505 }) 506} 507 508fn next_connect_cursor(snapshot: Coverage, start: HydrantCursor) -> HydrantCursor { 509 if snapshot.events_processed() == 0 { 510 start 511 } else { 512 HydrantCursor::new(snapshot.last_cursor().raw().saturating_add(1)) 513 } 514} 515 516fn jittered(base: Duration, entropy: &dyn Entropy) -> Duration { 517 let base_ms = u64::try_from(base.as_millis()).unwrap_or(u64::MAX); 518 let cap_ms = (base_ms / 4).max(1); 519 base + Duration::from_millis(entropy.next_u64() % 
cap_ms) 520} 521 522async fn run_session<S: SearchSink + 'static>( 523 config: &IngestConfig, 524 cursor: HydrantCursor, 525 runtime: &IngestRuntime<S>, 526) -> SessionEnd { 527 let url = match config.stream_url(cursor) { 528 Ok(u) => u, 529 Err(e) => { 530 return SessionEnd { 531 outcome: SessionOutcome::Empty, 532 error: Some(e), 533 }; 534 } 535 }; 536 info!(%url, "connecting to hydrant /stream"); 537 let connect = tokio::select! { 538 biased; 539 _ = runtime.cancel.cancelled() => { 540 return SessionEnd { outcome: SessionOutcome::Empty, error: None }; 541 } 542 res = runtime.ws.connect(url) => res, 543 }; 544 let WsConn { 545 sink: mut ws_sink, 546 stream: ws_stream, 547 } = match connect { 548 Ok(c) => c, 549 Err(e) => { 550 return SessionEnd { 551 outcome: SessionOutcome::Empty, 552 error: Some(IngestError::Network(e)), 553 }; 554 } 555 }; 556 557 let (frame_tx, frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(FRAME_CHANNEL_DEPTH); 558 let parallelism = config.parallelism.get(); 559 let processor_runtime = runtime.clone(); 560 let processor = tokio::spawn(async move { 561 let cancel = processor_runtime.cancel.clone(); 562 let prep_rt = processor_runtime.clone(); 563 let resolve_rt = processor_runtime.clone(); 564 let commit_rt = processor_runtime; 565 566 let pipeline = ReceiverStream::new(frame_rx) 567 .map(move |frame| prep_stage(frame, prep_rt.clone())) 568 .buffered(parallelism) 569 .map(move |staged| resolve_stage(staged, resolve_rt.clone())) 570 .buffered(parallelism) 571 .for_each(move |staged| commit_stage(staged, commit_rt.clone(), parallelism)); 572 573 tokio::select! 
{ 574 biased; 575 _ = cancel.cancelled() => {}, 576 _ = pipeline => {}, 577 } 578 }); 579 580 let (control_tx, mut control_rx) = tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH); 581 let session_cancel = runtime.cancel.child_token(); 582 let reader_cancel = session_cancel.clone(); 583 let reader = tokio::spawn(reader_loop(ws_stream, frame_tx, control_tx, reader_cancel)); 584 585 let mut next_ping = runtime.clock.now_instant() + PING_INTERVAL; 586 let mut pong_deadline: Option<Instant> = None; 587 588 let writer_error: Option<IngestError> = loop { 589 tokio::select! { 590 biased; 591 _ = runtime.cancel.cancelled() => { 592 let _ = timed_send( 593 &mut ws_sink, 594 WsMessage::Close { code: NORMAL_CLOSE, reason: "bobbin shutdown".to_owned() }, 595 ).await; 596 break None; 597 } 598 _ = runtime.clock.sleep_until(next_ping) => { 599 next_ping = runtime.clock.now_instant() + PING_INTERVAL; 600 if pong_deadline.is_none() { 601 if let Err(e) = timed_send(&mut ws_sink, WsMessage::Ping(Bytes::new())).await { 602 break Some(e); 603 } 604 pong_deadline = Some(runtime.clock.now_instant() + PONG_TIMEOUT); 605 } 606 } 607 _ = wait_until(pong_deadline, runtime.clock.as_ref()) => { 608 break Some(IngestError::PongTimeout(PONG_TIMEOUT)); 609 } 610 evt = control_rx.recv() => { 611 let Some(evt) = evt else { break None; }; 612 match evt { 613 WsEvent::IncomingPing(payload) => { 614 if let Err(e) = timed_send(&mut ws_sink, WsMessage::Pong(payload)).await { 615 break Some(e); 616 } 617 } 618 WsEvent::IncomingPong => { 619 pong_deadline = None; 620 } 621 } 622 } 623 } 624 }; 625 626 session_cancel.cancel(); 627 drop(ws_sink); 628 drop(control_rx); 629 630 let reader_end = reader.await.unwrap_or(SessionEnd { 631 outcome: SessionOutcome::Empty, 632 error: None, 633 }); 634 if let Err(join) = processor.await { 635 warn!(?join, "frame processor task panicked"); 636 } 637 SessionEnd { 638 outcome: reader_end.outcome, 639 error: writer_error.or(reader_end.error), 640 } 641} 642 
643async fn timed_send( 644 sink: &mut Box<dyn bobbin_runtime::WsSink>, 645 msg: WsMessage, 646) -> Result<(), IngestError> { 647 match tokio::time::timeout(SEND_TIMEOUT, sink.send(msg)).await { 648 Ok(Ok(())) => Ok(()), 649 Ok(Err(e)) => Err(IngestError::Network(e)), 650 Err(_elapsed) => Err(IngestError::SendTimeout(SEND_TIMEOUT)), 651 } 652} 653 654async fn reader_loop( 655 mut ws_stream: Box<dyn WsStream>, 656 frame_tx: tokio::sync::mpsc::Sender<HydrantFrame>, 657 control_tx: tokio::sync::mpsc::Sender<WsEvent>, 658 cancel: CancellationToken, 659) -> SessionEnd { 660 let mut outcome = SessionOutcome::Empty; 661 let mut held: VecDeque<HydrantFrame> = VecDeque::new(); 662 663 let error: Option<IngestError> = loop { 664 let step = if held.is_empty() { 665 tokio::select! { 666 biased; 667 _ = cancel.cancelled() => ReaderStep::Cancelled, 668 msg = ws_stream.next() => ReaderStep::WsMessage(msg), 669 } 670 } else if held.len() < READER_HOLD_LIMIT { 671 tokio::select! { 672 biased; 673 _ = cancel.cancelled() => ReaderStep::Cancelled, 674 permit_result = frame_tx.reserve() => match permit_result { 675 Ok(permit) => { 676 let frame = held 677 .pop_front() 678 .expect("non-empty held when reserve succeeds"); 679 permit.send(frame); 680 ReaderStep::Sent 681 } 682 Err(_) => ReaderStep::FrameSinkClosed, 683 }, 684 msg = ws_stream.next() => ReaderStep::WsMessage(msg), 685 } 686 } else { 687 tokio::select! 
{ 688 biased; 689 _ = cancel.cancelled() => ReaderStep::Cancelled, 690 permit_result = frame_tx.reserve() => match permit_result { 691 Ok(permit) => { 692 let frame = held 693 .pop_front() 694 .expect("held at limit when reserve succeeds"); 695 permit.send(frame); 696 ReaderStep::Sent 697 } 698 Err(_) => ReaderStep::FrameSinkClosed, 699 }, 700 } 701 }; 702 703 match step { 704 ReaderStep::Cancelled => break None, 705 ReaderStep::FrameSinkClosed => break None, 706 ReaderStep::Sent => { 707 outcome = SessionOutcome::Progressed; 708 } 709 ReaderStep::WsMessage(msg) => { 710 let Some(msg) = msg else { 711 break None; 712 }; 713 let parsed = match msg { 714 Ok(m) => m, 715 Err(e) => break Some(IngestError::Network(e)), 716 }; 717 match parsed { 718 WsMessage::Text(text) => { 719 let frame = match classify_text_frame(&text) { 720 Ok(f) => f, 721 Err(e) => break Some(e), 722 }; 723 held.push_back(frame); 724 } 725 WsMessage::Binary(_) => { 726 debug!("hydrant sent unexpected binary frame, ignoring"); 727 } 728 WsMessage::Ping(payload) => { 729 if control_tx 730 .send(WsEvent::IncomingPing(payload)) 731 .await 732 .is_err() 733 { 734 break None; 735 } 736 } 737 WsMessage::Pong(_) => { 738 if control_tx.send(WsEvent::IncomingPong).await.is_err() { 739 break None; 740 } 741 } 742 WsMessage::Close { code, reason } => { 743 debug!(code, %reason, "hydrant closed stream"); 744 break None; 745 } 746 } 747 } 748 } 749 }; 750 SessionEnd { outcome, error } 751} 752 753enum ReaderStep { 754 Cancelled, 755 FrameSinkClosed, 756 Sent, 757 WsMessage(Option<Result<WsMessage, NetworkError>>), 758} 759 760fn classify_text_frame(text: &str) -> Result<HydrantFrame, IngestError> { 761 #[derive(serde::Deserialize)] 762 struct PeekType<'a> { 763 #[serde(rename = "type", borrow)] 764 kind: Option<std::borrow::Cow<'a, str>>, 765 } 766 let is_error_frame = serde_json::from_str::<PeekType>(text) 767 .ok() 768 .and_then(|p| p.kind) 769 .as_deref() 770 == Some("error"); 771 if is_error_frame { 772 
return match serde_json::from_str::<HydrantStreamErrorFrame>(text) { 773 Ok(err_frame) => Err(classify_hydrant_error(err_frame)), 774 Err(decode_err) => Err(IngestError::Decode(decode_err)), 775 }; 776 } 777 serde_json::from_str::<HydrantFrame>(text).map_err(IngestError::Decode) 778} 779 780fn classify_hydrant_error(frame: HydrantStreamErrorFrame) -> IngestError { 781 let HydrantStreamErrorFrame { error, message } = frame; 782 let message = message.unwrap_or_default(); 783 match error.as_str() { 784 "ConsumerTooSlow" => IngestError::ConsumerTooSlow { message }, 785 _ => IngestError::HydrantStream { 786 code: error, 787 message, 788 }, 789 } 790} 791 792#[derive(Debug)] 793enum WsEvent { 794 IncomingPing(Bytes), 795 IncomingPong, 796} 797 798async fn wait_until(deadline: Option<Instant>, clock: &dyn Clock) { 799 match deadline { 800 Some(d) => clock.sleep_until(d).await, 801 None => std::future::pending::<()>().await, 802 } 803} 804 805#[derive(Clone, Copy, Debug, Eq, PartialEq)] 806enum Regime { 807 Replay, 808 Live, 809 NonRecord, 810} 811 812impl Regime { 813 fn as_str(self) -> &'static str { 814 match self { 815 Self::Replay => "replay", 816 Self::Live => "live", 817 Self::NonRecord => "non_record", 818 } 819 } 820} 821 822struct Pending { 823 cursor: HydrantCursor, 824 signal: PromotionSignal, 825 regime: Regime, 826 op: PendingOp, 827} 828 829enum PendingOp { 830 Noop, 831 ClearCache { 832 source: AtUri<DefaultStr>, 833 }, 834 Upsert { 835 source: AtUri<DefaultStr>, 836 nsid: Nsid<DefaultStr>, 837 parsed: Box<Record>, 838 bytes: Bytes, 839 cid: Option<Cid<DefaultStr>>, 840 edges: Vec<Edge>, 841 }, 842 Parked { 843 nsid: Nsid<DefaultStr>, 844 }, 845 Delete { 846 source: AtUri<DefaultStr>, 847 nsid: Nsid<DefaultStr>, 848 }, 849} 850 851struct Prepared { 852 pending: Pending, 853 prepare_start: Instant, 854 prepare_end: Instant, 855} 856 857struct Resolved { 858 pending: Pending, 859 prepare_start: Instant, 860 prepare_end: Instant, 861 resolve_start: Instant, 
862 resolve_end: Instant, 863} 864 865fn pending_nsid(op: &PendingOp) -> Option<&Nsid<DefaultStr>> { 866 match op { 867 PendingOp::Upsert { nsid, .. } => Some(nsid), 868 PendingOp::Delete { nsid, .. } => Some(nsid), 869 PendingOp::Parked { nsid, .. } => Some(nsid), 870 PendingOp::Noop | PendingOp::ClearCache { .. } => None, 871 } 872} 873 874fn pending_edge_count(op: &PendingOp) -> u64 { 875 match op { 876 PendingOp::Upsert { edges, .. } => edges.len() as u64, 877 _ => 0, 878 } 879} 880 881async fn prep_stage<S: SearchSink + 'static>( 882 frame: HydrantFrame, 883 rt: IngestRuntime<S>, 884) -> Prepared { 885 let now = rt.clock.now_unix_micros(); 886 let prepare_start = rt.clock.now_instant(); 887 let ctx = rt.pipeline_ctx(); 888 let pending = prepare_frame(frame, &ctx, now).await; 889 let prepare_end = rt.clock.now_instant(); 890 Prepared { 891 pending, 892 prepare_start, 893 prepare_end, 894 } 895} 896 897async fn resolve_stage<S: SearchSink + 'static>( 898 staged: Prepared, 899 rt: IngestRuntime<S>, 900) -> Resolved { 901 let resolve_start = rt.clock.now_instant(); 902 let ctx = rt.pipeline_ctx(); 903 let pending = resolve_pending(staged.pending, &ctx).await; 904 let resolve_end = rt.clock.now_instant(); 905 Resolved { 906 pending, 907 prepare_start: staged.prepare_start, 908 prepare_end: staged.prepare_end, 909 resolve_start, 910 resolve_end, 911 } 912} 913 914async fn commit_stage<S: SearchSink + 'static>( 915 staged: Resolved, 916 rt: IngestRuntime<S>, 917 parallelism: usize, 918) { 919 let nsid = pending_nsid(&staged.pending.op).cloned(); 920 let edge_count = pending_edge_count(&staged.pending.op); 921 let regime = staged.pending.regime; 922 let cursor = staged.pending.cursor.raw(); 923 let Resolved { 924 pending, 925 prepare_start, 926 prepare_end, 927 resolve_start, 928 resolve_end, 929 } = staged; 930 let commit_start = rt.clock.now_instant(); 931 commit_pending( 932 pending, 933 &rt.store, 934 &rt.issue_states, 935 &rt.pull_statuses, 936 &rt.coverage, 937 
&*rt.search, 938 &*rt.records, 939 &rt.resolver, 940 ) 941 .await; 942 let commit_end = rt.clock.now_instant(); 943 tracing::trace!( 944 target: "bobbin_ingest::stage", 945 cursor, 946 regime = regime.as_str(), 947 nsid = %nsid.as_ref().map(Nsid::as_str).unwrap_or(""), 948 edge_count, 949 prepare_us = prepare_end.duration_since(prepare_start).as_micros() as u64, 950 queue_resolve_wait_us = resolve_start.duration_since(prepare_end).as_micros() as u64, 951 resolve_us = resolve_end.duration_since(resolve_start).as_micros() as u64, 952 queue_commit_wait_us = commit_start.duration_since(resolve_end).as_micros() as u64, 953 commit_us = commit_end.duration_since(commit_start).as_micros() as u64, 954 total_us = commit_end.duration_since(prepare_start).as_micros() as u64, 955 parallelism, 956 "pipeline stage timings", 957 ); 958} 959 960async fn prepare_frame<S: SearchSink + 'static>( 961 frame: HydrantFrame, 962 ctx: &PipelineCtx<'_, S>, 963 now: UnixMicros, 964) -> Pending { 965 let cursor = HydrantCursor::new(frame.id); 966 let signal = promotion_signal(frame.record.as_ref(), now); 967 let regime = match frame.record.as_ref() { 968 Some(r) if r.live => Regime::Live, 969 Some(_) => Regime::Replay, 970 None => Regime::NonRecord, 971 }; 972 let op = match frame.kind { 973 FrameKind::Record => prepare_record(frame.record, ctx).await, 974 FrameKind::Identity | FrameKind::Account => PendingOp::Noop, 975 FrameKind::Other => { 976 debug!(id = frame.id, "ignoring unknown hydrant frame kind"); 977 PendingOp::Noop 978 } 979 }; 980 Pending { 981 cursor, 982 signal, 983 regime, 984 op, 985 } 986} 987 988async fn prepare_record<S: SearchSink + 'static>( 989 record: Option<RecordFrame>, 990 ctx: &PipelineCtx<'_, S>, 991) -> PendingOp { 992 let Some(record) = record else { 993 debug!("record-typed frame missing payload, skipping"); 994 return PendingOp::Noop; 995 }; 996 if !record.collection.as_ref().starts_with(TANGLED_PREFIX) { 997 return PendingOp::Noop; 998 } 999 let nsid = 
record.collection.clone(); 1000 let source = match build_source_uri(&record) { 1001 Ok(s) => s, 1002 Err(e) => { 1003 warn!(?e, "invalid frame source, dropping record"); 1004 return PendingOp::Noop; 1005 } 1006 }; 1007 match record.action { 1008 RecordAction::Create | RecordAction::Update => { 1009 evict_from_buffer(ctx.buffer, &source).await; 1010 let Some(raw) = record.record else { 1011 debug!(collection = %nsid, "create/update missing record body, clearing cache"); 1012 return PendingOp::ClearCache { source }; 1013 }; 1014 let raw_bytes = Bytes::copy_from_slice(raw.get().as_bytes()); 1015 let wire_bytes = match fallback_rfc3339(&record.rkey, &record.rev) 1016 .and_then(|fallback| synthesize_created_at(&raw_bytes, &fallback)) 1017 { 1018 Some(patched) => Bytes::from(patched), 1019 None => raw_bytes, 1020 }; 1021 let (parsed, bytes) = match decode_canon_or_upgrade_bytes( 1022 &record.collection, 1023 &wire_bytes, 1024 ctx.resolver, 1025 ) 1026 .await 1027 { 1028 Ok((parsed, canon_bytes)) => { 1029 let bytes = match canon_bytes { 1030 std::borrow::Cow::Borrowed(_) => wire_bytes, 1031 std::borrow::Cow::Owned(v) => Bytes::from(v), 1032 }; 1033 (parsed, bytes) 1034 } 1035 Err(ExtractError::UnknownCollection(name)) => { 1036 debug!(collection = %name, "unknown sh.tangled.* collection, clearing cache"); 1037 return PendingOp::ClearCache { source }; 1038 } 1039 Err(e) => { 1040 warn!(?e, collection = %record.collection, "record decode failed, clearing cache"); 1041 return PendingOp::ClearCache { source }; 1042 } 1043 }; 1044 if let Record::Repo(repo) = &parsed { 1045 if let Some(shadow) = ctx.shadow { 1046 shadow.note_observed(&record.did, &record.rkey).await; 1047 } 1048 let superseded = ctx 1049 .resolver 1050 .observe( 1051 record.did.clone(), 1052 record.rkey.clone(), 1053 repo.repo_did.clone(), 1054 ) 1055 .await; 1056 if let Some(prior) = superseded { 1057 let prior_uri = format!( 1058 "at://{}/sh.tangled.repo/{}", 1059 prior.owner.as_ref(), 1060 
prior.rkey.as_ref(), 1061 ); 1062 if let Ok(prior_at_uri) = AtUri::<DefaultStr>::new_owned(&prior_uri) { 1063 evict_from_buffer(ctx.buffer, &prior_at_uri).await; 1064 ctx.store.remove_source(&prior_at_uri); 1065 ctx.records.remove(&prior_at_uri); 1066 ctx.search.remove(&prior_at_uri).await; 1067 } 1068 } 1069 if let Some(buffer) = ctx.buffer { 1070 let drained = buffer.take_observed(&record.did, &record.rkey).await; 1071 if !drained.is_empty() { 1072 finalize_drained(ctx, drained).await; 1073 } 1074 } 1075 if let Some(registry) = ctx.knot_registry { 1076 let host = KnotHostKey::new(repo.knot.as_ref()); 1077 match repo.repo_did.clone() { 1078 Some(repo_did) => registry.observe_repo(&host, repo_did), 1079 None => registry.observe_host(&host), 1080 } 1081 } 1082 } 1083 match acl_disposition(&parsed, ctx.knot_gate, ctx.knot_registry) { 1084 AclDisposition::NativeSkip => { 1085 if let Some(registry) = ctx.knot_registry { 1086 registry.forget_legacy_member(&source); 1087 } 1088 return PendingOp::Delete { source, nsid }; 1089 } 1090 AclDisposition::LegacyMember { host } => { 1091 if let Some(registry) = ctx.knot_registry { 1092 registry.observe_host(&host); 1093 registry.note_legacy_member(source.clone(), &host); 1094 } 1095 } 1096 AclDisposition::Other => {} 1097 } 1098 let edges = match parsed.extract_edges(&source) { 1099 Ok(es) => es, 1100 Err(e) => { 1101 warn!(?e, "edge extraction failed, clearing cache"); 1102 return PendingOp::ClearCache { source }; 1103 } 1104 }; 1105 let _ = ctx.store.intern_source(&source); 1106 PendingOp::Upsert { 1107 source, 1108 nsid, 1109 parsed: Box::new(parsed), 1110 bytes, 1111 cid: record.cid, 1112 edges, 1113 } 1114 } 1115 RecordAction::Delete => { 1116 evict_from_buffer(ctx.buffer, &source).await; 1117 if nsid.as_ref() == "sh.tangled.repo" { 1118 ctx.resolver.forget(&record.did, &record.rkey).await; 1119 } 1120 if nsid.as_ref() == "sh.tangled.knot.member" 1121 && let Some(registry) = ctx.knot_registry 1122 { 1123 
registry.forget_legacy_member(&source);
            }
            PendingOp::Delete { source, nsid }
        }
        RecordAction::Other => {
            debug!(collection = %nsid, "ignoring unknown record action");
            PendingOp::Noop
        }
    }
}

/// How the knot-ACL capability gate wants a decoded record handled.
///
/// Produced by [`acl_disposition`]; only `KnotMember` and `Collaborator` records
/// can yield anything other than [`AclDisposition::Other`].
enum AclDisposition {
    /// No ACL-specific handling; the record is ingested normally.
    Other,
    /// The record targets a knot the gate reports as native; the caller turns the
    /// create/update into a `PendingOp::Delete` instead of ingesting it.
    NativeSkip,
    /// A `KnotMember` record whose knot the gate does not report as native; the
    /// caller registers the record against `host` in the knot registry.
    LegacyMember { host: KnotHostKey },
}

/// Classifies `parsed` against the knot-ACL capability gate.
///
/// With no `gate` configured every record is [`AclDisposition::Other`]. With a gate:
/// - `KnotMember`: `NativeSkip` when the member's `domain` host is native,
///   otherwise `LegacyMember` carrying that host.
/// - `Collaborator`: `NativeSkip` only when `registry` knows the host serving
///   `collaborator.repo` and the gate reports that host as native; an unknown
///   host (or no registry) yields `Other`.
/// - Any other record kind: `Other`.
fn acl_disposition(
    parsed: &Record,
    gate: Option<&CapabilityGate>,
    registry: Option<&KnotRegistry>,
) -> AclDisposition {
    let Some(gate) = gate else {
        return AclDisposition::Other;
    };
    match parsed {
        Record::KnotMember(member) => {
            let host = KnotHostKey::new(member.domain.as_ref());
            if gate.is_native(&host) {
                AclDisposition::NativeSkip
            } else {
                AclDisposition::LegacyMember { host }
            }
        }
        Record::Collaborator(collaborator) => {
            let native = registry
                .and_then(|registry| registry.host_of_repo(&collaborator.repo))
                .is_some_and(|host| gate.is_native(&host));
            if native {
                AclDisposition::NativeSkip
            } else {
                AclDisposition::Other
            }
        }
        _ => AclDisposition::Other,
    }
}

/// Builds the fallback timestamp handed to `synthesize_created_at` for a record.
///
/// The time is taken from the record key when it parses as a TID, otherwise
/// from the commit `rev` TID. It is formatted as RFC 3339 with microsecond
/// precision and a `Z` suffix.
///
/// Returns `None` when the TID timestamp does not fit in an `i64` or is outside
/// the range `chrono` can represent.
fn fallback_rfc3339(
    rkey: &Rkey<DefaultStr>,
    rev: &jacquard_common::types::tid::Tid,
) -> Option<String> {
    let tid = jacquard_common::types::tid::Tid::new(rkey.as_ref())
        .ok()
        .unwrap_or_else(|| rev.clone());
    let micros = i64::try_from(tid.timestamp()).ok()?;
    let dt = chrono::DateTime::<chrono::Utc>::from_timestamp_micros(micros)?;
    Some(dt.to_rfc3339_opts(chrono::SecondsFormat::Micros, true))
}

/// Calls `evict_source(source)` on the warming buffer.
///
/// Does nothing when no buffer is configured or the buffer is already sealed.
/// NOTE(review): presumably this drops any upsert parked for `source`, so a
/// newer write or a delete is not overridden when the buffer later drains;
/// confirm against `WarmingBuffer::evict_source`.
async fn evict_from_buffer(buffer: Option<&WarmingBuffer>, source: &AtUri<DefaultStr>) {
    if let Some(buffer) = buffer
        && !buffer.is_sealed()
    {
        buffer.evict_source(source).await;
    }
}

/// Second pipeline stage (between `prepare_frame` and `commit_pending`).
///
/// Non-upsert ops pass through unchanged. An upsert is first offered to the
/// warming buffer via [`try_park_warming`]:
/// - If it is parked, the op becomes `PendingOp::Parked`. The cursor, signal and
///   regime are kept, so `commit_pending` still advances coverage for the frame.
/// - Otherwise its edges are rewritten by [`normalize_subjects`] and it remains
///   an `Upsert`.
async fn resolve_pending<S: SearchSink + 'static>(
    pending: Pending,
    ctx: &PipelineCtx<'_, S>,
) -> Pending {
    let Pending {
        cursor,
        signal,
        regime,
        op,
    } = pending;
    let op = match op {
        PendingOp::Upsert {
            source,
            nsid,
            parsed,
            bytes,
            cid,
            edges,
        } => {
            let pieces = UpsertPieces {
                source,
                nsid,
                parsed: *parsed,
                bytes,
                cid,
                edges,
            };
            let pieces = match try_park_warming(ctx, cursor, pieces).await {
                ParkOutcome::Parked { nsid } => {
                    return Pending {
                        cursor,
                        signal,
                        regime,
                        op: PendingOp::Parked { nsid },
                    };
                }
                ParkOutcome::Passthrough(pieces) => *pieces,
            };
            let UpsertPieces {
                source,
                nsid,
                parsed,
                bytes,
                cid,
                edges,
            } = pieces;
            let edges = normalize_subjects(edges, ctx.resolver, ctx.coverage, ctx.shadow).await;
            PendingOp::Upsert {
                source,
                nsid,
                parsed: Box::new(parsed),
                bytes,
                cid,
                edges,
            }
        }
        other => other,
    };
    Pending {
        cursor,
        signal,
        regime,
        op,
    }
}

/// The fields of a `PendingOp::Upsert` with the parsed record unboxed.
///
/// Used while deciding whether the upsert is parked in the warming buffer.
struct UpsertPieces {
    source: AtUri<DefaultStr>,
    nsid: Nsid<DefaultStr>,
    parsed: Record,
    bytes: Bytes,
    cid: Option<Cid<DefaultStr>>,
    edges: Vec<Edge>,
}

/// Recovers the upsert fields from a parked entry, discarding its cursor.
impl From<ParkedUpsert> for UpsertPieces {
    fn from(u: ParkedUpsert) -> Self {
        Self {
            source: u.source,
            nsid: u.nsid,
            parsed: u.parsed,
            bytes: u.bytes,
            cid: u.cid,
            edges: u.edges,
        }
    }
}

/// Result of offering an upsert to the warming buffer.
enum ParkOutcome {
    /// The buffer kept the upsert; only its collection NSID is returned.
    Parked { nsid: Nsid<DefaultStr> },
    /// The upsert was not parked and should be processed now.
    Passthrough(Box<UpsertPieces>),
}

/// Parks an upsert in the warming buffer when its edges reference repos the
/// resolver has no cached resolution for.
///
/// The upsert is passed through unchanged when any of these hold:
/// - no buffer is configured;
/// - coverage is already ready;
/// - the buffer is sealed;
/// - the edges have no unresolved repo dependencies;
/// - `try_park` rejects the upsert, which is handed back.
///
/// After a successful park with a shadow buffer configured, each dependency is
/// also reported via `note_unresolved`.
async fn try_park_warming<S: SearchSink + 'static>(
    ctx: &PipelineCtx<'_, S>,
    cursor: HydrantCursor,
    pieces: UpsertPieces,
) -> ParkOutcome {
    let Some(buffer) = ctx.buffer else {
        return ParkOutcome::Passthrough(Box::new(pieces));
    };
    if ctx.coverage.snapshot().is_ready() || buffer.is_sealed() {
        return ParkOutcome::Passthrough(Box::new(pieces));
    }
    let deps = collect_unresolved_deps(&pieces.edges, ctx.resolver).await;
    if deps.is_empty() {
        return ParkOutcome::Passthrough(Box::new(pieces));
    }
    let nsid = pieces.nsid.clone();
    let upsert = ParkedUpsert {
        cursor,
        source: pieces.source,
        nsid: pieces.nsid,
        parsed: pieces.parsed,
        bytes: pieces.bytes,
        cid: pieces.cid,
        edges: pieces.edges,
    };
    // `try_park` consumes `deps`; keep a copy only if the shadow buffer needs it.
    let deps_for_shadow = ctx.shadow.is_some().then(|| deps.clone());
    match buffer.try_park(upsert, deps).await {
        Ok(()) => {
            if let Some((shadow, noted)) = ctx.shadow.zip(deps_for_shadow) {
                let _ = futures::future::join_all(noted.into_iter().map(|dep| async move {
                    shadow.note_unresolved(dep.owner, dep.rkey).await;
                }))
                .await;
            }
            ParkOutcome::Parked { nsid }
        }
        // The buffer handed the upsert back; fall through to immediate processing.
        Err(returned) => ParkOutcome::Passthrough(Box::new(returned.into())),
    }
}

/// Collects the distinct repo references that `edges` depend on but that have
/// no cached resolution yet.
///
/// Only subjects that are `sh.tangled.repo` AT-URIs with a DID authority are
/// considered. Results are in first-seen order.
async fn collect_unresolved_deps(edges: &[Edge], resolver: &RepoIdResolver) -> Vec<RepoIdent> {
    futures::stream::iter(edges)
        .fold(Vec::new(), |mut acc, edge| async move {
            let Some(uri) = edge.subject.as_uri() else {
                return acc;
            };
            let Some((owner, rkey)) = parse_repo_subject_uri(uri) else {
                return acc;
            };
            if resolver.cached_resolution(&owner, &rkey).await.is_some() {
                return acc;
            }
            let candidate = RepoIdent::new(owner, rkey);
            if !acc.contains(&candidate) {
                acc.push(candidate);
            }
            acc
        })
        .await
}

/// Commits upserts released from the warming buffer.
///
/// The caller drains these after observing the `sh.tangled.repo` record they
/// were waiting on. For each entry:
/// 1. edges are re-normalized, without shadow bookkeeping;
/// 2. the body is cached;
/// 3. edges and state indexes are updated;
/// 4. the record is indexed for search.
///
/// Coverage is not advanced here; each entry's parked cursor is discarded.
async fn finalize_drained<S: SearchSink + 'static>(
    ctx: &PipelineCtx<'_, S>,
    drained: Vec<ParkedUpsert>,
) {
    for upsert in drained {
        let ParkedUpsert {
            cursor: _,
            source,
            nsid: _,
            parsed,
            bytes,
            cid,
            edges,
        } = upsert;
        let edges = normalize_subjects(edges, ctx.resolver, ctx.coverage, None).await;
        cache_body(ctx.records, &source, cid, bytes);
        ctx.store.upsert_source(&source, edges);
        let outcome = apply_record_state(ctx.issue_states, ctx.pull_statuses, &source, &parsed);
        log_unknown_state_variant(outcome, &source);
        index_search(ctx.search, ctx.resolver, &source, parsed).await;
    }
}

/// Final pipeline stage: applies a resolved [`Pending`] op to the stores.
///
/// - `Noop` / `Parked`: nothing is written.
/// - `ClearCache`: only the cached record body is removed.
/// - `Upsert`: the body is cached (or evicted when there is no CID), the
///   source's edges are replaced, the state indexes are updated and the search
///   document is upserted.
/// - `Delete`: the source's edges, state-index entries, cached body and search
///   document are removed.
///
/// For every op, coverage is then advanced to the frame's cursor and given the
/// frame's promotion signal.
// Each store is passed individually, which exceeds clippy's argument-count limit.
#[allow(clippy::too_many_arguments)]
async fn commit_pending<S: SearchSink>(
    pending: Pending,
    store: &EdgeStore,
    issue_states: &StateIndex<IssueStateKind>,
    pull_statuses: &StateIndex<PullStatusKind>,
    coverage: &CoverageWatch,
    search: &S,
    records: &dyn RecordStore,
    resolver: &RepoIdResolver,
) {
    let Pending {
        cursor,
        signal,
        regime: _,
        op,
    } = pending;
    match op {
        PendingOp::Noop | PendingOp::Parked { .. } => {}
        PendingOp::ClearCache { source } => records.remove(&source),
        PendingOp::Upsert {
            source,
            nsid: _,
            parsed,
            bytes,
            cid,
            edges,
        } => {
            cache_body(records, &source, cid, bytes);
            store.upsert_source(&source, edges);
            let outcome = apply_record_state(issue_states, pull_statuses, &source, &parsed);
            log_unknown_state_variant(outcome, &source);
            index_search(search, resolver, &source, *parsed).await;
        }
        PendingOp::Delete { source, nsid } => {
            store.remove_source(&source);
            apply_delete_to_state_index(issue_states, pull_statuses, &source, &nsid);
            records.remove(&source);
            search.remove(&source).await;
        }
    }
    coverage.update(|c| c.advance(cursor).maybe_promote(signal));
}

/// Converts `parsed` to a search document for `source` and upserts it.
///
/// Records with no searchable form, or that `normalize` against `resolver`
/// rejects (returns `None`), are skipped silently.
async fn index_search<S: SearchSink>(
    search: &S,
    resolver: &RepoIdResolver,
    source: &AtUri<DefaultStr>,
    parsed: Record,
) {
    let Some(searchable) = SearchableRecord::try_from_record(parsed) else {
        return;
    };
    let Some(searchable) = searchable.normalize(resolver).await else {
        return;
    };
    search.upsert(searchable.to_search_doc(source)).await;
}

/// Test helper that runs one frame through all three pipeline stages.
///
/// Runs without a shadow buffer, a warming buffer, or knot ACL handling.
/// `clock` is accepted but unused.
#[cfg(test)]
// Mirrors the individual-store parameters of `commit_pending`.
#[allow(clippy::too_many_arguments)]
async fn handle_frame<S: SearchSink + 'static>(
    frame: HydrantFrame,
    store: &EdgeStore,
    issue_states: &StateIndex<IssueStateKind>,
    pull_statuses: &StateIndex<PullStatusKind>,
    coverage: &CoverageWatch,
    search: &S,
    records: &dyn RecordStore,
    resolver: &RepoIdResolver,
    clock: &dyn Clock,
    now: UnixMicros,
) {
    let _ = clock;
    let ctx = PipelineCtx {
        resolver,
        store,
        issue_states,
        pull_statuses,
        coverage,
        records,
        search,
        shadow: None,
        buffer: None,
        knot_registry: None,
        knot_gate: None,
    };
    let pending = prepare_frame(frame, &ctx, now).await;
    let pending = resolve_pending(pending, &ctx).await;
    commit_pending(
        pending,
        store,
        issue_states,
        pull_statuses,
        coverage,
        search,
        records,
        resolver,
    )
    .await;
}

/// Emits a warning when `apply_record_state` reported an unknown state variant
/// for `source`; any other outcome is silent.
fn log_unknown_state_variant(outcome: ApplyOutcome, source: &AtUri<DefaultStr>) {
    if matches!(outcome, ApplyOutcome::UnknownVariant) {
        warn!(
            target: "bobbin_ingest::state_index",
            %source,
            "state record has unknown wire variant, skipping index update",
        );
    }
}

/// Updates the issue/pull state indexes for a deleted record.
///
/// - Issue and pull deletions call `remove_entity(source)`.
/// - Issue-state and pull-status deletions call `remove_source(source)`.
/// - Other collections are ignored.
fn apply_delete_to_state_index(
    issue_states: &StateIndex<IssueStateKind>,
    pull_statuses: &StateIndex<PullStatusKind>,
    source: &AtUri<DefaultStr>,
    nsid: &Nsid<DefaultStr>,
) {
    match nsid.as_ref() {
        "sh.tangled.repo.issue" => issue_states.remove_entity(source),
        "sh.tangled.repo.pull" => pull_statuses.remove_entity(source),
        "sh.tangled.repo.issue.state" => issue_states.remove_source(source),
        "sh.tangled.repo.pull.status" => pull_statuses.remove_source(source),
        _ => {}
    }
}

/// Builds the coverage promotion input for a frame.
///
/// The signal contains:
/// - the record's `rev` TID timestamp (`None` when no record frame is given);
/// - the current time;
/// - the allowed clock skew `READY_SKEW`, in microseconds.
fn promotion_signal(record: Option<&RecordFrame>, now: UnixMicros) -> PromotionSignal {
    PromotionSignal {
        rev_micros: record.map(|r| r.rev.timestamp()),
        now_micros: now.raw(),
        // `as_micros` is wider than u64; saturate rather than silently truncate.
        skew_micros: u64::try_from(READY_SKEW.as_micros()).unwrap_or(u64::MAX),
    }
}

/// Caches `bytes` as the body of `source` when its CID is known.
///
/// Without a CID the body cannot be trusted, so any cached body for `source` is
/// removed instead.
fn cache_body(
    records: &dyn RecordStore,
    source: &AtUri<DefaultStr>,
    cid: Option<Cid<DefaultStr>>,
    bytes: Bytes,
) {
    match cid {
        Some(cid) => records.put(
            source.clone(),
            Arc::new(RecordBody {
                uri: source.clone(),
                cid,
                value: bytes,
            }),
        ),
        None => records.remove(source),
    }
}

/// Rewrites repo-reference edge subjects to the repo's canonical DID.
///
/// Edges whose subject is not an `sh.tangled.repo` AT-URI with a DID authority
/// pass through unchanged. For repo-URI subjects, the result of `resolve` decides:
/// - `Mapped`: the subject is replaced by `SubjectRef::Did` of the mapped DID;
/// - `NoRepoDid` / `Unresolvable`: the edge is dropped with a warning.
///
/// When a shadow buffer is given and coverage is not yet ready, each reference
/// with no cached resolution is also reported via `note_unresolved` before it is
/// resolved.
async fn normalize_subjects(
    edges: Vec<Edge>,
    resolver: &RepoIdResolver,
    coverage: &CoverageWatch,
    shadow: Option<&WarmingShadowBuffer>,
) -> Vec<Edge> {
    let warming = shadow.is_some() && !coverage.snapshot().is_ready();
    futures::stream::iter(edges)
        .filter_map(|edge| async move {
            let Some(uri) = edge.subject.as_uri() else {
                return Some(edge);
            };
            let Some((owner, rkey)) = parse_repo_subject_uri(uri) else {
                return Some(edge);
            };
            if warming
                && let Some(shadow) = shadow
                && resolver.cached_resolution(&owner, &rkey).await.is_none()
            {
                shadow
                    .note_unresolved(owner.clone(), rkey.clone())
                    .await;
            }
            match resolver.resolve(&owner, &rkey).await {
                Resolution::Mapped(repo_did) => Some(Edge {
                    subject: SubjectRef::Did(repo_did),
                    ..edge
                }),
                Resolution::NoRepoDid => {
                    warn!(
                        target: "bobbin_ingest::normalize",
                        kind = %edge.kind,
                        owner = owner.as_ref(),
                        rkey = rkey.as_ref(),
                        source = edge.source.as_ref(),
                        "dropping edge: target repo has no repoDid, no canonical DID subject available",
                    );
                    None
                }
                Resolution::Unresolvable => {
                    warn!(
                        target: "bobbin_ingest::normalize",
                        kind = %edge.kind,
                        owner = owner.as_ref(),
                        rkey = rkey.as_ref(),
                        source = edge.source.as_ref(),
                        "dropping edge: repo unresolvable, rkey-form subject will not match bare-DID queries",
                    );
                    None
                }
            }
        })
        .collect()
        .await
}

/// Splits an `at://<did>/sh.tangled.repo/<rkey>` URI into its owner DID and rkey.
///
/// Returns `None` when:
/// - the collection is not `sh.tangled.repo`;
/// - the authority is not a DID (e.g. a handle);
/// - the rkey is missing;
/// - either part fails to re-parse as an owned value.
fn parse_repo_subject_uri(uri: &AtUri<DefaultStr>) -> Option<(Did<DefaultStr>, Rkey<DefaultStr>)> {
    let collection = uri.collection()?;
    if collection.as_ref() != "sh.tangled.repo" {
        return None;
    }
    let AtIdentifier::Did(authority) = uri.authority() else {
        return None;
    };
    let rkey = uri.rkey()?;
    let owner = Did::new_owned(authority.as_ref()).ok()?;
    let rkey = Rkey::new_owned(rkey.as_ref()).ok()?;
    Some((owner, rkey))
}

/// Builds the AT-URI (`did` / `collection` / `rkey`) identifying the record
/// carried by `r`.
///
/// A malformed part surfaces as an `IngestError` through the `?` conversion.
fn build_source_uri(r: &RecordFrame) -> Result<AtUri<DefaultStr>, IngestError> {
    Ok(AtUri::from_parts_owned(
        r.did.as_ref(),
        r.collection.as_ref(),
        r.rkey.as_ref(),
    )?)
}

#[cfg(test)]
mod tests {
    use super::*;
    use bobbin_edge_index::Coverage;
    use bobbin_record_lru::{CacheCapacity, LruRecordStore, NoopRecordStore, RecordStore};
    use bobbin_runtime::{OsEntropy, RuntimeHasher, SystemClock, TungsteniteWs};
    use bobbin_types::search::NoopSearchSink;
    use jacquard_common::types::nsid::Nsid;
    use jacquard_common::types::tid::Tid;
    use serde_json::json;

    const VALID_CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i";

    fn did_subj(s: &str) -> SubjectRef {
        SubjectRef::Did(Did::new_owned(s).unwrap())
    }

    fn uri_subj(s: &str) -> SubjectRef {
        SubjectRef::Uri(AtUri::new_owned(s).unwrap())
    }

    fn rkey(s: &str) -> Rkey<DefaultStr> {
        Rkey::new_owned(s).unwrap()
    }

    #[allow(clippy::type_complexity)]
    fn fresh() -> (
        Arc<EdgeStore>,
        Arc<StateIndex<IssueStateKind>>,
        Arc<StateIndex<PullStatusKind>>,
        Arc<CoverageWatch>,
        Arc<RepoIdResolver>,
    ) {
        (
            Arc::new(EdgeStore::new(RuntimeHasher::default())),
            Arc::new(StateIndex::new(RuntimeHasher::default())),
            Arc::new(StateIndex::new(RuntimeHasher::default())),
            Arc::new(CoverageWatch::new()),
            Arc::new(RepoIdResolver::detached(RuntimeHasher::default())),
        )
    }

    fn now() ->
UnixMicros { 1643 SystemClock::new().now_unix_micros() 1644 } 1645 1646 fn sys_clock() -> SystemClock { 1647 SystemClock::new() 1648 } 1649 1650 fn parse_frame(value: serde_json::Value) -> HydrantFrame { 1651 let text = serde_json::to_string(&value).expect("serialize fixture"); 1652 serde_json::from_str(&text).expect("deserialize fixture") 1653 } 1654 1655 fn fresh_tid() -> Tid { 1656 Tid::now_0() 1657 } 1658 1659 #[tokio::test] 1660 async fn ignores_non_tangled_collections() { 1661 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 1662 let frame: HydrantFrame = parse_frame(json!({ 1663 "id": 1, 1664 "type": "record", 1665 "record": { 1666 "live": false, 1667 "did": "did:plc:nel", 1668 "rev": fresh_tid().as_str(), 1669 "collection": "app.bsky.feed.post", 1670 "rkey": "abcabcabcabcz", 1671 "action": "create", 1672 "record": {"$type": "app.bsky.feed.post", "text": "hi"} 1673 } 1674 })); 1675 handle_frame( 1676 frame, 1677 &store, 1678 &issue_states, 1679 &pull_statuses, 1680 &cov, 1681 &NoopSearchSink, 1682 &NoopRecordStore, 1683 &resolver, 1684 &sys_clock(), 1685 now(), 1686 ) 1687 .await; 1688 assert_eq!(store.key_count(), 0); 1689 assert_eq!(cov.snapshot().events_processed(), 1); 1690 } 1691 1692 #[tokio::test] 1693 async fn native_knot_member_skipped_legacy_indexed() { 1694 use bobbin_knot_ingest::{CapabilityGate, KnotClient, KnotRegistry}; 1695 use wiremock::matchers::{method, path}; 1696 use wiremock::{Mock, MockServer, ResponseTemplate}; 1697 1698 let server = MockServer::start().await; 1699 Mock::given(method("GET")) 1700 .and(path("/xrpc/sh.tangled.knot.version")) 1701 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 1702 "version": "1.1.0", 1703 "capabilities": ["knot-acl"] 1704 }))) 1705 .mount(&server) 1706 .await; 1707 let url = url::Url::parse(&server.uri()).unwrap(); 1708 let native_host = format!("{}:{}", url.host_str().unwrap(), url.port().unwrap()); 1709 1710 let gate = CapabilityGate::new( 1711 
KnotClient::with_default_http(true).unwrap(), 1712 Arc::new(SystemClock::new()), 1713 true, 1714 true, 1715 ); 1716 assert!(gate.has_knot_acl(&KnotHostKey::new(&native_host)).await); 1717 1718 let registry = KnotRegistry::new(); 1719 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 1720 let ctx = PipelineCtx { 1721 resolver: &resolver, 1722 store: &store, 1723 issue_states: &issue_states, 1724 pull_statuses: &pull_statuses, 1725 coverage: &cov, 1726 records: &NoopRecordStore, 1727 search: &NoopSearchSink, 1728 shadow: None, 1729 buffer: None, 1730 knot_registry: Some(&registry), 1731 knot_gate: Some(&gate), 1732 }; 1733 1734 let member_frame = |id: u64, rkey: &str, domain: &str| { 1735 parse_frame(json!({ 1736 "id": id, 1737 "type": "record", 1738 "record": { 1739 "live": false, 1740 "did": "did:plc:akshay", 1741 "rev": fresh_tid().as_str(), 1742 "collection": "sh.tangled.knot.member", 1743 "rkey": rkey, 1744 "action": "create", 1745 "record": { 1746 "$type": "sh.tangled.knot.member", 1747 "subject": "did:plc:boltless", 1748 "domain": domain, 1749 "createdAt": "2026-06-01T00:00:00Z" 1750 } 1751 } 1752 })) 1753 }; 1754 1755 let native = 1756 prepare_frame(member_frame(1, "aaaaaaaaaaaaz", &native_host), &ctx, now()).await; 1757 assert!( 1758 matches!(native.op, PendingOp::Delete { .. }), 1759 "member record for a native knot must be dropped" 1760 ); 1761 1762 let legacy = 1763 prepare_frame(member_frame(2, "bbbbbbbbbbbbz", "legacy.knot"), &ctx, now()).await; 1764 assert!( 1765 matches!(legacy.op, PendingOp::Upsert { .. 
}), 1766 "member record for a legacy knot must be ingested" 1767 ); 1768 assert!( 1769 registry.hosts().contains(&KnotHostKey::new("legacy.knot")), 1770 "a member record seeds host discovery even before any repo is seen" 1771 ); 1772 assert_eq!( 1773 registry 1774 .drain_legacy_members(&KnotHostKey::new("legacy.knot")) 1775 .len(), 1776 1, 1777 "legacy member edge is indexed for later purge once the knot upgrades" 1778 ); 1779 } 1780 1781 #[tokio::test] 1782 async fn create_then_delete_round_trips_a_star() { 1783 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 1784 let create: HydrantFrame = parse_frame(json!({ 1785 "id": 10, 1786 "type": "record", 1787 "record": { 1788 "live": false, 1789 "did": "did:plc:olaren", 1790 "rev": fresh_tid().as_str(), 1791 "collection": "sh.tangled.feed.star", 1792 "rkey": "abcabcabcabcz", 1793 "action": "create", 1794 "record": { 1795 "$type": "sh.tangled.feed.star", 1796 "createdAt": "2026-05-01T00:00:00Z", 1797 "subjectDid": "did:plc:abalone" 1798 } 1799 } 1800 })); 1801 handle_frame( 1802 create, 1803 &store, 1804 &issue_states, 1805 &pull_statuses, 1806 &cov, 1807 &NoopSearchSink, 1808 &NoopRecordStore, 1809 &resolver, 1810 &sys_clock(), 1811 now(), 1812 ) 1813 .await; 1814 let key = bobbin_types::ids::EdgeKey::new( 1815 Nsid::new_static("sh.tangled.feed.star").unwrap(), 1816 did_subj("did:plc:abalone"), 1817 ); 1818 assert_eq!(store.count(&key), 1); 1819 1820 let delete: HydrantFrame = parse_frame(json!({ 1821 "id": 11, 1822 "type": "record", 1823 "record": { 1824 "live": false, 1825 "did": "did:plc:olaren", 1826 "rev": fresh_tid().as_str(), 1827 "collection": "sh.tangled.feed.star", 1828 "rkey": "abcabcabcabcz", 1829 "action": "delete", 1830 "record": null 1831 } 1832 })); 1833 handle_frame( 1834 delete, 1835 &store, 1836 &issue_states, 1837 &pull_statuses, 1838 &cov, 1839 &NoopSearchSink, 1840 &NoopRecordStore, 1841 &resolver, 1842 &sys_clock(), 1843 now(), 1844 ) 1845 .await; 1846 
assert_eq!(store.count(&key), 0); 1847 } 1848 1849 #[tokio::test] 1850 async fn update_replaces_prior_edges() { 1851 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 1852 let mk = |subject_did: &Did<DefaultStr>, id: u64| -> HydrantFrame { 1853 parse_frame(json!({ 1854 "id": id, 1855 "type": "record", 1856 "record": { 1857 "live": false, 1858 "did": "did:plc:olaren", 1859 "rev": fresh_tid().as_str(), 1860 "collection": "sh.tangled.feed.star", 1861 "rkey": "abcabcabcabcz", 1862 "action": "update", 1863 "record": { 1864 "$type": "sh.tangled.feed.star", 1865 "createdAt": "2026-05-01T00:00:00Z", 1866 "subjectDid": subject_did.as_ref() 1867 } 1868 } 1869 })) 1870 }; 1871 handle_frame( 1872 mk(&Did::new_owned("did:plc:abalone").unwrap(), 1), 1873 &store, 1874 &issue_states, 1875 &pull_statuses, 1876 &cov, 1877 &NoopSearchSink, 1878 &NoopRecordStore, 1879 &resolver, 1880 &sys_clock(), 1881 now(), 1882 ) 1883 .await; 1884 handle_frame( 1885 mk(&Did::new_owned("did:plc:uni").unwrap(), 2), 1886 &store, 1887 &issue_states, 1888 &pull_statuses, 1889 &cov, 1890 &NoopSearchSink, 1891 &NoopRecordStore, 1892 &resolver, 1893 &sys_clock(), 1894 now(), 1895 ) 1896 .await; 1897 1898 let kind = Nsid::new_static("sh.tangled.feed.star").unwrap(); 1899 let old = bobbin_types::ids::EdgeKey::new(kind.clone(), did_subj("did:plc:abalone")); 1900 let new = bobbin_types::ids::EdgeKey::new(kind, did_subj("did:plc:uni")); 1901 assert_eq!(store.count(&old), 0); 1902 assert_eq!(store.count(&new), 1); 1903 } 1904 1905 #[tokio::test] 1906 async fn prepare_frame_tags_regime_from_live_flag() { 1907 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 1908 let search = NoopSearchSink; 1909 let records = NoopRecordStore; 1910 let ctx = PipelineCtx { 1911 resolver: &resolver, 1912 store: &store, 1913 issue_states: &issue_states, 1914 pull_statuses: &pull_statuses, 1915 coverage: &cov, 1916 records: &records, 1917 search: &search, 1918 shadow: None, 1919 buffer: None, 1920 
knot_registry: None, 1921 knot_gate: None, 1922 }; 1923 let mk = |live: bool| -> HydrantFrame { 1924 parse_frame(json!({ 1925 "id": 1, 1926 "type": "record", 1927 "record": { 1928 "live": live, 1929 "did": "did:plc:olaren", 1930 "rev": fresh_tid().as_str(), 1931 "collection": "sh.tangled.feed.star", 1932 "rkey": "abcabcabcabcz", 1933 "action": "create", 1934 "record": { 1935 "$type": "sh.tangled.feed.star", 1936 "createdAt": "2026-05-01T00:00:00Z", 1937 "subjectDid": "did:plc:abalone" 1938 } 1939 } 1940 })) 1941 }; 1942 let live_pending = prepare_frame(mk(true), &ctx, now()).await; 1943 assert_eq!(live_pending.regime, Regime::Live); 1944 let replay_pending = prepare_frame(mk(false), &ctx, now()).await; 1945 assert_eq!(replay_pending.regime, Regime::Replay); 1946 1947 let identity: HydrantFrame = parse_frame(json!({ 1948 "id": 9, 1949 "type": "identity", 1950 })); 1951 let id_pending = prepare_frame(identity, &ctx, now()).await; 1952 assert_eq!(id_pending.regime, Regime::NonRecord); 1953 } 1954 1955 #[tokio::test] 1956 async fn create_with_cid_warms_record_lru() { 1957 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 1958 let lru = LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024)); 1959 let frame: HydrantFrame = parse_frame(json!({ 1960 "id": 1, 1961 "type": "record", 1962 "record": { 1963 "live": false, 1964 "did": "did:plc:olaren", 1965 "rev": fresh_tid().as_str(), 1966 "collection": "sh.tangled.feed.star", 1967 "rkey": "abcabcabcabcz", 1968 "action": "create", 1969 "cid": VALID_CID, 1970 "record": { 1971 "$type": "sh.tangled.feed.star", 1972 "createdAt": "2026-05-01T00:00:00Z", 1973 "subjectDid": "did:plc:abalone" 1974 } 1975 } 1976 })); 1977 let source = 1978 AtUri::new_owned("at://did:plc:olaren/sh.tangled.feed.star/abcabcabcabcz").unwrap(); 1979 handle_frame( 1980 frame, 1981 &store, 1982 &issue_states, 1983 &pull_statuses, 1984 &cov, 1985 &NoopSearchSink, 1986 &lru, 1987 &resolver, 1988 &sys_clock(), 1989 now(), 1990 ) 1991 .await; 
1992 let cached = lru.get(&source).expect("hydrant cid must seed the lru"); 1993 assert_eq!(cached.cid.as_ref(), VALID_CID); 1994 let parsed: serde_json::Value = serde_json::from_slice(&cached.value).unwrap(); 1995 assert_eq!( 1996 parsed["subject"]["did"], "did:plc:abalone", 1997 "legacy wire is upgraded to canon shape before caching so downstream readers see canonical fields" 1998 ); 1999 } 2000 2001 #[tokio::test] 2002 async fn create_without_cid_clears_record_lru() { 2003 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 2004 let lru = LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024)); 2005 let source = 2006 AtUri::new_owned("at://did:plc:olaren/sh.tangled.feed.star/abcabcabcabcz").unwrap(); 2007 let cid: Cid<DefaultStr> = VALID_CID.parse().unwrap(); 2008 lru.put( 2009 source.clone(), 2010 Arc::new(RecordBody { 2011 uri: source.clone(), 2012 cid, 2013 value: bytes::Bytes::from_static(b"{\"stale\":true}"), 2014 }), 2015 ); 2016 let frame: HydrantFrame = parse_frame(json!({ 2017 "id": 1, 2018 "type": "record", 2019 "record": { 2020 "live": false, 2021 "did": "did:plc:olaren", 2022 "rev": fresh_tid().as_str(), 2023 "collection": "sh.tangled.feed.star", 2024 "rkey": "abcabcabcabcz", 2025 "action": "update", 2026 "record": { 2027 "$type": "sh.tangled.feed.star", 2028 "createdAt": "2026-05-01T00:00:00Z", 2029 "subjectDid": "did:plc:abalone" 2030 } 2031 } 2032 })); 2033 handle_frame( 2034 frame, 2035 &store, 2036 &issue_states, 2037 &pull_statuses, 2038 &cov, 2039 &NoopSearchSink, 2040 &lru, 2041 &resolver, 2042 &sys_clock(), 2043 now(), 2044 ) 2045 .await; 2046 assert!( 2047 lru.get(&source).is_none(), 2048 "missing cid means we cannot trust the body, so the lru must be cleared", 2049 ); 2050 } 2051 2052 #[tokio::test] 2053 async fn delete_evicts_record_lru() { 2054 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 2055 let lru = LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024)); 2056 let source = 2057 
AtUri::new_owned("at://did:plc:olaren/sh.tangled.feed.star/abcabcabcabcz").unwrap(); 2058 let cid: Cid<DefaultStr> = VALID_CID.parse().unwrap(); 2059 lru.put( 2060 source.clone(), 2061 Arc::new(RecordBody { 2062 uri: source.clone(), 2063 cid, 2064 value: bytes::Bytes::from_static(b"{}"), 2065 }), 2066 ); 2067 let frame: HydrantFrame = parse_frame(json!({ 2068 "id": 1, 2069 "type": "record", 2070 "record": { 2071 "live": false, 2072 "did": "did:plc:olaren", 2073 "rev": fresh_tid().as_str(), 2074 "collection": "sh.tangled.feed.star", 2075 "rkey": "abcabcabcabcz", 2076 "action": "delete", 2077 "record": null 2078 } 2079 })); 2080 handle_frame( 2081 frame, 2082 &store, 2083 &issue_states, 2084 &pull_statuses, 2085 &cov, 2086 &NoopSearchSink, 2087 &lru, 2088 &resolver, 2089 &sys_clock(), 2090 now(), 2091 ) 2092 .await; 2093 assert!(lru.get(&source).is_none()); 2094 } 2095 2096 #[tokio::test] 2097 async fn live_recent_event_promotes_coverage_to_ready() { 2098 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 2099 assert!(!cov.snapshot().is_ready()); 2100 let frame: HydrantFrame = parse_frame(json!({ 2101 "id": 99, 2102 "type": "record", 2103 "record": { 2104 "live": true, 2105 "did": "did:plc:olaren", 2106 "rev": fresh_tid().as_str(), 2107 "collection": "sh.tangled.feed.star", 2108 "rkey": "abcabcabcabcz", 2109 "action": "create", 2110 "record": { 2111 "$type": "sh.tangled.feed.star", 2112 "createdAt": "2026-05-01T00:00:00Z", 2113 "subjectDid": "did:plc:abalone" 2114 } 2115 } 2116 })); 2117 handle_frame( 2118 frame, 2119 &store, 2120 &issue_states, 2121 &pull_statuses, 2122 &cov, 2123 &NoopSearchSink, 2124 &NoopRecordStore, 2125 &resolver, 2126 &sys_clock(), 2127 now(), 2128 ) 2129 .await; 2130 assert!(cov.snapshot().is_ready()); 2131 assert_eq!(cov.snapshot().last_cursor(), HydrantCursor::new(99)); 2132 } 2133 2134 #[tokio::test] 2135 async fn live_but_stale_rev_does_not_promote() { 2136 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 
2137 let stale_tid = Tid::from_time(1_000_000, 0); 2138 let frame: HydrantFrame = parse_frame(json!({ 2139 "id": 7, 2140 "type": "record", 2141 "record": { 2142 "live": true, 2143 "did": "did:plc:olaren", 2144 "rev": stale_tid.as_str(), 2145 "collection": "sh.tangled.feed.star", 2146 "rkey": "abcabcabcabcz", 2147 "action": "create", 2148 "record": { 2149 "$type": "sh.tangled.feed.star", 2150 "createdAt": "2026-05-01T00:00:00Z", 2151 "subjectDid": "did:plc:abalone" 2152 } 2153 } 2154 })); 2155 handle_frame( 2156 frame, 2157 &store, 2158 &issue_states, 2159 &pull_statuses, 2160 &cov, 2161 &NoopSearchSink, 2162 &NoopRecordStore, 2163 &resolver, 2164 &sys_clock(), 2165 now(), 2166 ) 2167 .await; 2168 assert!(!cov.snapshot().is_ready()); 2169 assert!(matches!(cov.snapshot(), Coverage::Warming { .. })); 2170 } 2171 2172 #[test] 2173 fn classify_consumer_too_slow_frame_returns_typed_variant() { 2174 let text = r#"{"type":"error","error":"ConsumerTooSlow","message":"stream socket send blocked for at least 30 seconds"}"#; 2175 match classify_text_frame(text) { 2176 Err(IngestError::ConsumerTooSlow { message }) => { 2177 assert!( 2178 message.contains("30 seconds"), 2179 "message field preserved verbatim, got: {message}" 2180 ); 2181 } 2182 other => panic!("expected ConsumerTooSlow variant, got: {other:?}"), 2183 } 2184 } 2185 2186 #[test] 2187 fn classify_unknown_hydrant_error_falls_back_to_generic_variant() { 2188 let text = r#"{"type":"error","error":"NewFutureCode","message":"some new failure mode"}"#; 2189 match classify_text_frame(text) { 2190 Err(IngestError::HydrantStream { code, message }) => { 2191 assert_eq!(code, "NewFutureCode"); 2192 assert_eq!(message, "some new failure mode"); 2193 } 2194 other => panic!("expected HydrantStream variant, got: {other:?}"), 2195 } 2196 } 2197 2198 #[test] 2199 fn classify_error_frame_without_message_uses_empty_string() { 2200 let text = r#"{"type":"error","error":"ConsumerTooSlow"}"#; 2201 match classify_text_frame(text) { 2202 
Err(IngestError::ConsumerTooSlow { message }) => assert!(message.is_empty()), 2203 other => panic!("expected ConsumerTooSlow with empty message, got: {other:?}"), 2204 } 2205 } 2206 2207 #[test] 2208 fn classify_normal_record_frame_unchanged() { 2209 let text = r#"{"id":42,"type":"record"}"#; 2210 let frame = classify_text_frame(text).expect("normal record frame must parse"); 2211 assert_eq!(frame.id, 42); 2212 assert_eq!(frame.kind, FrameKind::Record); 2213 } 2214 2215 #[test] 2216 fn classify_garbage_object_returns_decode_error() { 2217 let text = r#"{"random":"object","without":"required fields"}"#; 2218 match classify_text_frame(text) { 2219 Err(IngestError::Decode(_)) => {} 2220 other => panic!("expected Decode error, got: {other:?}"), 2221 } 2222 } 2223 2224 struct ScriptedWsStream { 2225 messages: std::collections::VecDeque<Result<WsMessage, NetworkError>>, 2226 } 2227 2228 impl WsStream for ScriptedWsStream { 2229 fn next<'a>(&'a mut self) -> bobbin_runtime::WsMessageFuture<'a> { 2230 let msg = self.messages.pop_front(); 2231 Box::pin(async move { msg }) 2232 } 2233 } 2234 2235 #[tokio::test] 2236 async fn reader_loop_surfaces_consumer_too_slow_from_error_frame() { 2237 let mut messages = std::collections::VecDeque::new(); 2238 messages.push_back(Ok(WsMessage::Text( 2239 r#"{"type":"error","error":"ConsumerTooSlow","message":"stream delivery blocked"}"# 2240 .to_owned(), 2241 ))); 2242 let stream: Box<dyn WsStream> = Box::new(ScriptedWsStream { messages }); 2243 let (frame_tx, _frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(FRAME_CHANNEL_DEPTH); 2244 let (control_tx, _control_rx) = 2245 tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH); 2246 let cancel = CancellationToken::new(); 2247 2248 let end = reader_loop(stream, frame_tx, control_tx, cancel).await; 2249 2250 match end.error { 2251 Some(IngestError::ConsumerTooSlow { message }) => { 2252 assert_eq!(message, "stream delivery blocked"); 2253 } 2254 other => panic!("expected 
ConsumerTooSlow SessionEnd error, got: {other:?}"), 2255 } 2256 assert_eq!(end.outcome, SessionOutcome::Empty); 2257 } 2258 2259 #[tokio::test] 2260 async fn reader_loop_surfaces_unknown_hydrant_error_distinctly() { 2261 let mut messages = std::collections::VecDeque::new(); 2262 messages.push_back(Ok(WsMessage::Text( 2263 r#"{"type":"error","error":"NewFutureCode","message":"new mode"}"#.to_owned(), 2264 ))); 2265 let stream: Box<dyn WsStream> = Box::new(ScriptedWsStream { messages }); 2266 let (frame_tx, _frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(FRAME_CHANNEL_DEPTH); 2267 let (control_tx, _control_rx) = 2268 tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH); 2269 let cancel = CancellationToken::new(); 2270 2271 let end = reader_loop(stream, frame_tx, control_tx, cancel).await; 2272 2273 match end.error { 2274 Some(IngestError::HydrantStream { code, message }) => { 2275 assert_eq!(code, "NewFutureCode"); 2276 assert_eq!(message, "new mode"); 2277 } 2278 other => panic!("expected HydrantStream SessionEnd error, got: {other:?}"), 2279 } 2280 } 2281 2282 struct ChannelWsStream { 2283 rx: tokio::sync::mpsc::Receiver<Result<WsMessage, NetworkError>>, 2284 } 2285 2286 impl WsStream for ChannelWsStream { 2287 fn next<'a>(&'a mut self) -> bobbin_runtime::WsMessageFuture<'a> { 2288 Box::pin(async move { self.rx.recv().await }) 2289 } 2290 } 2291 2292 fn star_frame_text(id: u64, rkey: &Rkey<DefaultStr>) -> String { 2293 json!({ 2294 "id": id, 2295 "type": "record", 2296 "record": { 2297 "live": false, 2298 "did": "did:plc:olaren", 2299 "rev": fresh_tid().as_str(), 2300 "collection": "sh.tangled.feed.star", 2301 "rkey": rkey.as_ref(), 2302 "action": "create", 2303 "record": { 2304 "$type": "sh.tangled.feed.star", 2305 "createdAt": "2026-05-01T00:00:00Z", 2306 "subjectDid": "did:plc:abalone" 2307 } 2308 } 2309 }) 2310 .to_string() 2311 } 2312 2313 #[tokio::test] 2314 async fn pong_forwards_promptly_when_frame_channel_is_full() { 2315 let (ws_tx, ws_rx) 
= tokio::sync::mpsc::channel::<Result<WsMessage, NetworkError>>(8); 2316 let stream: Box<dyn WsStream> = Box::new(ChannelWsStream { rx: ws_rx }); 2317 let (frame_tx, frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(1); 2318 let (control_tx, mut control_rx) = 2319 tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH); 2320 let cancel = CancellationToken::new(); 2321 2322 let prefill: HydrantFrame = parse_frame(json!({ 2323 "id": 0, 2324 "type": "record", 2325 "record": { 2326 "live": false, 2327 "did": "did:plc:olaren", 2328 "rev": fresh_tid().as_str(), 2329 "collection": "sh.tangled.feed.star", 2330 "rkey": "prefilrkey001", 2331 "action": "create", 2332 "record": { 2333 "$type": "sh.tangled.feed.star", 2334 "createdAt": "2026-05-01T00:00:00Z", 2335 "subjectDid": "did:plc:abalone" 2336 } 2337 } 2338 })); 2339 frame_tx 2340 .try_send(prefill) 2341 .expect("depth-1 frame channel must accept the prefill"); 2342 2343 let reader_handle = tokio::spawn(reader_loop( 2344 stream, 2345 frame_tx.clone(), 2346 control_tx.clone(), 2347 cancel.clone(), 2348 )); 2349 2350 ws_tx 2351 .send(Ok(WsMessage::Text(star_frame_text( 2352 1, 2353 &rkey("starrkeyaa001"), 2354 )))) 2355 .await 2356 .unwrap(); 2357 ws_tx.send(Ok(WsMessage::Pong(Bytes::new()))).await.unwrap(); 2358 2359 let pong_event = tokio::time::timeout(Duration::from_millis(500), control_rx.recv()) 2360 .await 2361 .expect("pong must surface inside 500ms even when frame_tx is saturated. 
A reader blocked on a frame send would never poll the next ws message") 2362 .expect("control_tx was not closed"); 2363 assert!( 2364 matches!(pong_event, WsEvent::IncomingPong), 2365 "first control event must be the pong, not a held text", 2366 ); 2367 2368 let too_slow = 2369 "{\"type\":\"error\",\"error\":\"ConsumerTooSlow\",\"message\":\"saturated\"}" 2370 .to_string(); 2371 ws_tx.send(Ok(WsMessage::Text(too_slow))).await.unwrap(); 2372 2373 let end = tokio::time::timeout(Duration::from_secs(1), reader_handle) 2374 .await 2375 .expect("reader must exit promptly once ConsumerTooSlow is read") 2376 .expect("reader task must not panic"); 2377 match end.error { 2378 Some(IngestError::ConsumerTooSlow { message }) => assert_eq!(message, "saturated"), 2379 other => panic!("expected ConsumerTooSlow disconnect, got: {other:?}"), 2380 } 2381 2382 drop(frame_rx); 2383 } 2384 2385 #[tokio::test] 2386 async fn reader_drains_held_frames_once_processor_catches_up() { 2387 let (ws_tx, ws_rx) = tokio::sync::mpsc::channel::<Result<WsMessage, NetworkError>>(8); 2388 let stream: Box<dyn WsStream> = Box::new(ChannelWsStream { rx: ws_rx }); 2389 let (frame_tx, mut frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(1); 2390 let (control_tx, _control_rx) = 2391 tokio::sync::mpsc::channel::<WsEvent>(CONTROL_CHANNEL_DEPTH); 2392 let cancel = CancellationToken::new(); 2393 2394 let prefill: HydrantFrame = parse_frame(json!({ 2395 "id": 0, 2396 "type": "record", 2397 "record": { 2398 "live": false, 2399 "did": "did:plc:olaren", 2400 "rev": fresh_tid().as_str(), 2401 "collection": "sh.tangled.feed.star", 2402 "rkey": "prefilrkey001", 2403 "action": "create", 2404 "record": { 2405 "$type": "sh.tangled.feed.star", 2406 "createdAt": "2026-05-01T00:00:00Z", 2407 "subjectDid": "did:plc:abalone" 2408 } 2409 } 2410 })); 2411 frame_tx.try_send(prefill).unwrap(); 2412 2413 let reader_handle = tokio::spawn(reader_loop( 2414 stream, 2415 frame_tx.clone(), 2416 control_tx, 2417 cancel.clone(), 
2418 )); 2419 2420 ws_tx 2421 .send(Ok(WsMessage::Text(star_frame_text( 2422 1, 2423 &rkey("heldrkeyaa001"), 2424 )))) 2425 .await 2426 .unwrap(); 2427 ws_tx 2428 .send(Ok(WsMessage::Text(star_frame_text( 2429 2, 2430 &rkey("heldrkeyaa002"), 2431 )))) 2432 .await 2433 .unwrap(); 2434 2435 let _drained_prefill = frame_rx.recv().await.expect("prefilled frame drains"); 2436 let first = tokio::time::timeout(Duration::from_millis(500), frame_rx.recv()) 2437 .await 2438 .expect("first held frame must reach frame_rx after slot opens") 2439 .expect("frame_tx still open"); 2440 assert_eq!(first.id, 1); 2441 let second = tokio::time::timeout(Duration::from_millis(500), frame_rx.recv()) 2442 .await 2443 .expect("second held frame must reach frame_rx after slot opens") 2444 .expect("frame_tx still open"); 2445 assert_eq!(second.id, 2); 2446 2447 cancel.cancel(); 2448 let _ = tokio::time::timeout(Duration::from_secs(1), reader_handle) 2449 .await 2450 .expect("reader must stop after cancel"); 2451 } 2452 2453 #[test] 2454 fn first_connect_uses_configured_start_cursor() { 2455 let start = HydrantCursor::new(42); 2456 assert_eq!(next_connect_cursor(Coverage::default(), start), start); 2457 } 2458 2459 #[test] 2460 fn reconnect_resumes_strictly_after_last_seen() { 2461 let snap = Coverage::default().advance(HydrantCursor::new(7)); 2462 assert_eq!( 2463 next_connect_cursor(snap, HydrantCursor::new(0)), 2464 HydrantCursor::new(8), 2465 ); 2466 } 2467 2468 #[test] 2469 fn reconnect_overrides_configured_start() { 2470 let snap = Coverage::default().advance(HydrantCursor::new(100)); 2471 assert_eq!( 2472 next_connect_cursor(snap, HydrantCursor::new(50)), 2473 HydrantCursor::new(101), 2474 ); 2475 } 2476 2477 #[test] 2478 fn first_connect_uses_start_even_when_first_frame_id_would_be_zero() { 2479 let start = HydrantCursor::new(7); 2480 let snap = Coverage::default(); 2481 assert_eq!(snap.last_cursor(), HydrantCursor::new(0)); 2482 assert_eq!(snap.events_processed(), 0); 2483 
assert_eq!(next_connect_cursor(snap, start), start); 2484 } 2485 2486 #[test] 2487 fn reconnect_after_processing_id_zero_advances_to_one() { 2488 let snap = Coverage::default().advance(HydrantCursor::new(0)); 2489 assert_eq!(snap.events_processed(), 1); 2490 assert_eq!( 2491 next_connect_cursor(snap, HydrantCursor::new(99)), 2492 HydrantCursor::new(1), 2493 "events_processed disambiguates 'never seen' from 'saw id 0'", 2494 ); 2495 } 2496 2497 #[tokio::test] 2498 async fn identity_frame_advances_cursor_only() { 2499 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 2500 let frame: HydrantFrame = parse_frame(json!({ 2501 "id": 5, 2502 "type": "identity", 2503 "identity": { 2504 "did": "did:plc:olaren", 2505 "handle": "olaren.dev" 2506 } 2507 })); 2508 handle_frame( 2509 frame, 2510 &store, 2511 &issue_states, 2512 &pull_statuses, 2513 &cov, 2514 &NoopSearchSink, 2515 &NoopRecordStore, 2516 &resolver, 2517 &sys_clock(), 2518 now(), 2519 ) 2520 .await; 2521 assert_eq!(store.key_count(), 0); 2522 assert_eq!(cov.snapshot().last_cursor(), HydrantCursor::new(5)); 2523 assert!(!cov.snapshot().is_ready()); 2524 } 2525 2526 #[tokio::test] 2527 async fn account_frame_advances_cursor_only() { 2528 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 2529 let frame: HydrantFrame = parse_frame(json!({ 2530 "id": 6, 2531 "type": "account", 2532 "account": {"did": "did:plc:olaren", "active": true} 2533 })); 2534 handle_frame( 2535 frame, 2536 &store, 2537 &issue_states, 2538 &pull_statuses, 2539 &cov, 2540 &NoopSearchSink, 2541 &NoopRecordStore, 2542 &resolver, 2543 &sys_clock(), 2544 now(), 2545 ) 2546 .await; 2547 assert_eq!(store.key_count(), 0); 2548 assert_eq!(cov.snapshot().last_cursor(), HydrantCursor::new(6)); 2549 } 2550 2551 #[tokio::test] 2552 async fn unknown_frame_kind_advances_cursor_without_panic() { 2553 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 2554 let frame: HydrantFrame = parse_frame(json!({"id": 8, 
"type": "future_event"})); 2555 assert_eq!(frame.kind, FrameKind::Other); 2556 handle_frame( 2557 frame, 2558 &store, 2559 &issue_states, 2560 &pull_statuses, 2561 &cov, 2562 &NoopSearchSink, 2563 &NoopRecordStore, 2564 &resolver, 2565 &sys_clock(), 2566 now(), 2567 ) 2568 .await; 2569 assert_eq!(cov.snapshot().last_cursor(), HydrantCursor::new(8)); 2570 } 2571 2572 fn fresh_runtime(cancel: CancellationToken) -> IngestRuntime<NoopSearchSink> { 2573 IngestRuntime { 2574 store: Arc::new(EdgeStore::new(RuntimeHasher::default())), 2575 issue_states: Arc::new(StateIndex::new(RuntimeHasher::default())), 2576 pull_statuses: Arc::new(StateIndex::new(RuntimeHasher::default())), 2577 coverage: Arc::new(CoverageWatch::new()), 2578 search: Arc::new(NoopSearchSink), 2579 records: Arc::new(NoopRecordStore) as Arc<dyn RecordStore>, 2580 resolver: Arc::new(RepoIdResolver::detached(RuntimeHasher::default())), 2581 clock: Arc::new(SystemClock::new()), 2582 entropy: Arc::new(OsEntropy), 2583 ws: TungsteniteWs::shared(), 2584 cancel, 2585 disconnects: None, 2586 warming_shadow: None, 2587 warming_buffer: None, 2588 knot_registry: None, 2589 knot_gate: None, 2590 } 2591 } 2592 2593 #[tokio::test(start_paused = true)] 2594 async fn cancel_token_short_circuits_reconnect_sleep() { 2595 let cfg = IngestConfig::new(Url::parse("ws://127.0.0.1:1").unwrap()); 2596 let cancel = CancellationToken::new(); 2597 let runtime = fresh_runtime(cancel.clone()); 2598 let task = tokio::spawn(async move { run(cfg, runtime).await }); 2599 tokio::time::advance(Duration::from_millis(10)).await; 2600 cancel.cancel(); 2601 let outcome = tokio::time::timeout(Duration::from_secs(1), task) 2602 .await 2603 .expect("ingest must stop within timeout once cancel fires"); 2604 assert!(matches!(outcome, Ok(Ok(()))), "got {outcome:?}"); 2605 } 2606 2607 #[test] 2608 fn jittered_stays_within_one_quarter_of_base() { 2609 let base = Duration::from_secs(1); 2610 let cap = base + Duration::from_millis(250); 2611 let entropy = 
OsEntropy; 2612 (0..50).for_each(|_| { 2613 let j = jittered(base, &entropy); 2614 assert!(j >= base, "jitter must not undershoot"); 2615 assert!(j <= cap, "jitter must not exceed +25%, got {:?}", j); 2616 }); 2617 } 2618 2619 #[tokio::test] 2620 async fn star_after_observed_repo_keys_on_repo_did() { 2621 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 2622 let repo: HydrantFrame = parse_frame(json!({ 2623 "id": 1, 2624 "type": "record", 2625 "record": { 2626 "live": false, 2627 "did": "did:plc:nel", 2628 "rev": fresh_tid().as_str(), 2629 "collection": "sh.tangled.repo", 2630 "rkey": "abcabcabcabcz", 2631 "action": "create", 2632 "record": { 2633 "$type": "sh.tangled.repo", 2634 "createdAt": "2026-05-01T00:00:00Z", 2635 "knot": "oyster.cafe", 2636 "name": "abalone", 2637 "repoDid": "did:plc:abalone" 2638 } 2639 } 2640 })); 2641 handle_frame( 2642 repo, 2643 &store, 2644 &issue_states, 2645 &pull_statuses, 2646 &cov, 2647 &NoopSearchSink, 2648 &NoopRecordStore, 2649 &resolver, 2650 &sys_clock(), 2651 now(), 2652 ) 2653 .await; 2654 2655 let star: HydrantFrame = parse_frame(json!({ 2656 "id": 2, 2657 "type": "record", 2658 "record": { 2659 "live": false, 2660 "did": "did:plc:olaren", 2661 "rev": fresh_tid().as_str(), 2662 "collection": "sh.tangled.feed.star", 2663 "rkey": "abcabcabcabcz", 2664 "action": "create", 2665 "record": { 2666 "$type": "sh.tangled.feed.star", 2667 "createdAt": "2026-05-01T00:00:00Z", 2668 "subject": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz" 2669 } 2670 } 2671 })); 2672 handle_frame( 2673 star, 2674 &store, 2675 &issue_states, 2676 &pull_statuses, 2677 &cov, 2678 &NoopSearchSink, 2679 &NoopRecordStore, 2680 &resolver, 2681 &sys_clock(), 2682 now(), 2683 ) 2684 .await; 2685 2686 let nsid = Nsid::new_static("sh.tangled.feed.star").unwrap(); 2687 let repo_keyed = bobbin_types::ids::EdgeKey::new(nsid.clone(), did_subj("did:plc:abalone")); 2688 let owner_keyed = bobbin_types::ids::EdgeKey::new(nsid, 
did_subj("did:plc:nel")); 2689 assert_eq!( 2690 store.count(&repo_keyed), 2691 1, 2692 "star should be keyed on repoDID once the repo is observed" 2693 ); 2694 assert_eq!( 2695 store.count(&owner_keyed), 2696 0, 2697 "owner DID should not collect the edge" 2698 ); 2699 } 2700 2701 #[tokio::test] 2702 async fn unresolvable_repo_subject_drops_edge() { 2703 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 2704 let star: HydrantFrame = parse_frame(json!({ 2705 "id": 1, 2706 "type": "record", 2707 "record": { 2708 "live": false, 2709 "did": "did:plc:olaren", 2710 "rev": fresh_tid().as_str(), 2711 "collection": "sh.tangled.feed.star", 2712 "rkey": "abcabcabcabcz", 2713 "action": "create", 2714 "record": { 2715 "$type": "sh.tangled.feed.star", 2716 "createdAt": "2026-05-01T00:00:00Z", 2717 "subject": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz" 2718 } 2719 } 2720 })); 2721 handle_frame( 2722 star, 2723 &store, 2724 &issue_states, 2725 &pull_statuses, 2726 &cov, 2727 &NoopSearchSink, 2728 &NoopRecordStore, 2729 &resolver, 2730 &sys_clock(), 2731 now(), 2732 ) 2733 .await; 2734 2735 let nsid = Nsid::new_static("sh.tangled.feed.star").unwrap(); 2736 let owner_keyed = bobbin_types::ids::EdgeKey::new(nsid.clone(), did_subj("did:plc:nel")); 2737 let uri_keyed = bobbin_types::ids::EdgeKey::new( 2738 nsid, 2739 uri_subj("at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"), 2740 ); 2741 assert_eq!( 2742 store.count(&owner_keyed), 2743 0, 2744 "must not silently misfile under the authoring DID", 2745 ); 2746 assert_eq!( 2747 store.count(&uri_keyed), 2748 0, 2749 "unresolvable rkey-form subject must drop the edge; keeping it would index against a key that never matches bare-DID queries", 2750 ); 2751 } 2752 2753 #[tokio::test] 2754 async fn repo_without_repo_did_drops_edge() { 2755 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 2756 let repo: HydrantFrame = parse_frame(json!({ 2757 "id": 1, 2758 "type": "record", 2759 "record": { 2760 "live": 
false, 2761 "did": "did:plc:nel", 2762 "rev": fresh_tid().as_str(), 2763 "collection": "sh.tangled.repo", 2764 "rkey": "abcabcabcabcz", 2765 "action": "create", 2766 "record": { 2767 "$type": "sh.tangled.repo", 2768 "createdAt": "2026-05-01T00:00:00Z", 2769 "knot": "oyster.cafe", 2770 "name": "abalone" 2771 } 2772 } 2773 })); 2774 handle_frame( 2775 repo, 2776 &store, 2777 &issue_states, 2778 &pull_statuses, 2779 &cov, 2780 &NoopSearchSink, 2781 &NoopRecordStore, 2782 &resolver, 2783 &sys_clock(), 2784 now(), 2785 ) 2786 .await; 2787 2788 let star: HydrantFrame = parse_frame(json!({ 2789 "id": 2, 2790 "type": "record", 2791 "record": { 2792 "live": false, 2793 "did": "did:plc:olaren", 2794 "rev": fresh_tid().as_str(), 2795 "collection": "sh.tangled.feed.star", 2796 "rkey": "abcabcabcabcz", 2797 "action": "create", 2798 "record": { 2799 "$type": "sh.tangled.feed.star", 2800 "createdAt": "2026-05-01T00:00:00Z", 2801 "subject": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz" 2802 } 2803 } 2804 })); 2805 handle_frame( 2806 star, 2807 &store, 2808 &issue_states, 2809 &pull_statuses, 2810 &cov, 2811 &NoopSearchSink, 2812 &NoopRecordStore, 2813 &resolver, 2814 &sys_clock(), 2815 now(), 2816 ) 2817 .await; 2818 2819 let nsid = Nsid::new_static("sh.tangled.feed.star").unwrap(); 2820 let uri_keyed = bobbin_types::ids::EdgeKey::new( 2821 nsid.clone(), 2822 uri_subj("at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"), 2823 ); 2824 let owner_keyed = bobbin_types::ids::EdgeKey::new(nsid, did_subj("did:plc:nel")); 2825 assert_eq!( 2826 store.count(&uri_keyed), 2827 0, 2828 "no canonical DID exists for a repo without repoDID, so the edge must be dropped", 2829 ); 2830 assert_eq!( 2831 store.count(&owner_keyed), 2832 0, 2833 "the authoring DID is not the canonical repo identity", 2834 ); 2835 } 2836 2837 #[tokio::test] 2838 async fn explicit_subject_did_skips_normalization() { 2839 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 2840 let star: HydrantFrame = 
parse_frame(json!({ 2841 "id": 1, 2842 "type": "record", 2843 "record": { 2844 "live": false, 2845 "did": "did:plc:olaren", 2846 "rev": fresh_tid().as_str(), 2847 "collection": "sh.tangled.feed.star", 2848 "rkey": "abcabcabcabcz", 2849 "action": "create", 2850 "record": { 2851 "$type": "sh.tangled.feed.star", 2852 "createdAt": "2026-05-01T00:00:00Z", 2853 "subjectDid": "did:plc:abalone" 2854 } 2855 } 2856 })); 2857 handle_frame( 2858 star, 2859 &store, 2860 &issue_states, 2861 &pull_statuses, 2862 &cov, 2863 &NoopSearchSink, 2864 &NoopRecordStore, 2865 &resolver, 2866 &sys_clock(), 2867 now(), 2868 ) 2869 .await; 2870 let key = bobbin_types::ids::EdgeKey::new( 2871 Nsid::new_static("sh.tangled.feed.star").unwrap(), 2872 did_subj("did:plc:abalone"), 2873 ); 2874 assert_eq!(store.count(&key), 1); 2875 } 2876 2877 #[tokio::test] 2878 async fn issue_with_repo_uri_resolves_to_repo_did() { 2879 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 2880 resolver 2881 .observe( 2882 Did::new_owned("did:plc:nel").unwrap(), 2883 Rkey::new_owned("abcabcabcabcz").unwrap(), 2884 Some(Did::new_owned("did:plc:abalone").unwrap()), 2885 ) 2886 .await; 2887 let issue: HydrantFrame = parse_frame(json!({ 2888 "id": 1, 2889 "type": "record", 2890 "record": { 2891 "live": false, 2892 "did": "did:plc:olaren", 2893 "rev": fresh_tid().as_str(), 2894 "collection": "sh.tangled.repo.issue", 2895 "rkey": "abcabcabcabcz", 2896 "action": "create", 2897 "record": { 2898 "$type": "sh.tangled.repo.issue", 2899 "createdAt": "2026-05-01T00:00:00Z", 2900 "title": "bug", 2901 "repo": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz" 2902 } 2903 } 2904 })); 2905 handle_frame( 2906 issue, 2907 &store, 2908 &issue_states, 2909 &pull_statuses, 2910 &cov, 2911 &NoopSearchSink, 2912 &NoopRecordStore, 2913 &resolver, 2914 &sys_clock(), 2915 now(), 2916 ) 2917 .await; 2918 let key = bobbin_types::ids::EdgeKey::new( 2919 Nsid::new_static("sh.tangled.repo.issue").unwrap(), 2920 
did_subj("did:plc:abalone"), 2921 ); 2922 assert_eq!(store.count(&key), 1); 2923 } 2924 2925 #[derive(Default)] 2926 struct RecordingSearchSink { 2927 docs: tokio::sync::Mutex<Vec<bobbin_types::search::SearchDoc>>, 2928 } 2929 2930 impl SearchSink for RecordingSearchSink { 2931 async fn upsert(&self, doc: bobbin_types::search::SearchDoc) { 2932 self.docs.lock().await.push(doc); 2933 } 2934 async fn remove(&self, _uri: &AtUri<DefaultStr>) {} 2935 } 2936 2937 #[tokio::test] 2938 async fn search_index_hydrates_repo_did_via_resolver() { 2939 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 2940 resolver 2941 .observe( 2942 Did::new_owned("did:plc:nel").unwrap(), 2943 Rkey::new_owned("abcabcabcabcz").unwrap(), 2944 Some(Did::new_owned("did:plc:abalone").unwrap()), 2945 ) 2946 .await; 2947 let search = RecordingSearchSink::default(); 2948 let issue: HydrantFrame = parse_frame(json!({ 2949 "id": 1, 2950 "type": "record", 2951 "record": { 2952 "live": false, 2953 "did": "did:plc:olaren", 2954 "rev": fresh_tid().as_str(), 2955 "collection": "sh.tangled.repo.issue", 2956 "rkey": "abcabcabcabcz", 2957 "action": "create", 2958 "record": { 2959 "$type": "sh.tangled.repo.issue", 2960 "createdAt": "2026-05-01T00:00:00Z", 2961 "title": "bug", 2962 "repo": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz" 2963 } 2964 } 2965 })); 2966 handle_frame( 2967 issue, 2968 &store, 2969 &issue_states, 2970 &pull_statuses, 2971 &cov, 2972 &search, 2973 &NoopRecordStore, 2974 &resolver, 2975 &sys_clock(), 2976 now(), 2977 ) 2978 .await; 2979 let docs = search.docs.lock().await; 2980 assert_eq!(docs.len(), 1, "issue should produce one search doc"); 2981 assert_eq!( 2982 docs[0].repo, 2983 Some(Did::new_owned("did:plc:abalone").unwrap()), 2984 "search doc repo field must be resolved from the observed repo, not left as None", 2985 ); 2986 } 2987 2988 #[tokio::test] 2989 async fn delete_repo_record_evicts_resolver_cache() { 2990 let (store, issue_states, pull_statuses, cov, 
resolver) = fresh(); 2991 let owner = Did::new_owned("did:plc:nel").unwrap(); 2992 let rkey = Rkey::new_owned("abcabcabcabcz").unwrap(); 2993 resolver 2994 .observe( 2995 owner.clone(), 2996 rkey.clone(), 2997 Some(Did::new_owned("did:plc:abalone").unwrap()), 2998 ) 2999 .await; 3000 assert!( 3001 resolver.cached_resolution(&owner, &rkey).await.is_some(), 3002 "observe must seed the cache", 3003 ); 3004 let delete: HydrantFrame = parse_frame(json!({ 3005 "id": 1, 3006 "type": "record", 3007 "record": { 3008 "live": false, 3009 "did": owner.as_ref(), 3010 "rev": fresh_tid().as_str(), 3011 "collection": "sh.tangled.repo", 3012 "rkey": rkey.as_ref(), 3013 "action": "delete" 3014 } 3015 })); 3016 handle_frame( 3017 delete, 3018 &store, 3019 &issue_states, 3020 &pull_statuses, 3021 &cov, 3022 &NoopSearchSink, 3023 &NoopRecordStore, 3024 &resolver, 3025 &sys_clock(), 3026 now(), 3027 ) 3028 .await; 3029 assert!( 3030 resolver.cached_resolution(&owner, &rkey).await.is_none(), 3031 "deleting the repo record must clear the resolver cache so future observes are not blocked by a stale Authoritative entry", 3032 ); 3033 } 3034 3035 #[tokio::test] 3036 async fn cancel_short_circuits_a_hung_ws_connect() { 3037 let _listener = tokio::net::TcpListener::bind("127.0.0.1:0") 3038 .await 3039 .expect("bind sink listener"); 3040 let port = _listener.local_addr().expect("local addr").port(); 3041 let cfg = 3042 IngestConfig::new(Url::parse(&format!("ws://127.0.0.1:{port}")).expect("hydrant url")); 3043 let cancel = CancellationToken::new(); 3044 let runtime = fresh_runtime(cancel.clone()); 3045 let task = tokio::spawn(async move { run(cfg, runtime).await }); 3046 tokio::time::sleep(Duration::from_millis(100)).await; 3047 cancel.cancel(); 3048 let outcome = tokio::time::timeout(Duration::from_secs(2), task) 3049 .await 3050 .expect("cancel must short-circuit the hung ws connect"); 3051 assert!(matches!(outcome, Ok(Ok(()))), "got {outcome:?}"); 3052 } 3053 3054 struct 
CloseOnConnectTransport { 3055 used: std::sync::Mutex<bool>, 3056 } 3057 impl bobbin_runtime::WsTransport for CloseOnConnectTransport { 3058 fn connect(&self, _url: Url) -> bobbin_runtime::WsConnectFuture { 3059 let mut used = self.used.lock().unwrap(); 3060 if *used { 3061 return Box::pin(async move { 3062 Err(NetworkError::Connect("only one connect allowed".to_owned())) 3063 }); 3064 } 3065 *used = true; 3066 Box::pin(async move { 3067 let mut q = std::collections::VecDeque::new(); 3068 q.push_back(Ok(WsMessage::Close { 3069 code: 1000, 3070 reason: "bye".to_owned(), 3071 })); 3072 let stream: Box<dyn WsStream> = Box::new(ScriptedWsStream { messages: q }); 3073 struct NoopSink; 3074 impl bobbin_runtime::WsSink for NoopSink { 3075 fn send<'a>(&'a mut self, _m: WsMessage) -> bobbin_runtime::WsSendFuture<'a> { 3076 Box::pin(async move { Ok(()) }) 3077 } 3078 } 3079 let sink: Box<dyn bobbin_runtime::WsSink> = Box::new(NoopSink); 3080 Ok(bobbin_runtime::WsConn { sink, stream }) 3081 }) 3082 } 3083 } 3084 3085 #[tokio::test] 3086 async fn run_session_returns_after_remote_close_when_outer_cancel_unfired() { 3087 let cfg = IngestConfig::new(Url::parse("ws://127.0.0.1:1").unwrap()); 3088 let cancel = CancellationToken::new(); 3089 let mut runtime = fresh_runtime(cancel.clone()); 3090 runtime.ws = Arc::new(CloseOnConnectTransport { 3091 used: std::sync::Mutex::new(false), 3092 }); 3093 3094 let res = tokio::time::timeout( 3095 Duration::from_secs(3), 3096 run_session(&cfg, HydrantCursor::new(0), &runtime), 3097 ) 3098 .await; 3099 assert!( 3100 res.is_ok(), 3101 "run_session must return after a remote Close even when outer cancel never fires - regression for a session-scoped task hanging on the parent token", 3102 ); 3103 } 3104 3105 struct HangingSink; 3106 impl bobbin_runtime::WsSink for HangingSink { 3107 fn send<'a>(&'a mut self, _: WsMessage) -> bobbin_runtime::WsSendFuture<'a> { 3108 Box::pin(std::future::pending()) 3109 } 3110 } 3111 3112 #[tokio::test(start_paused 
= true)] 3113 async fn timed_send_surfaces_send_timeout_when_sink_pends_forever() { 3114 let mut sink: Box<dyn bobbin_runtime::WsSink> = Box::new(HangingSink); 3115 let task = 3116 tokio::spawn(async move { timed_send(&mut sink, WsMessage::Ping(Bytes::new())).await }); 3117 tokio::time::advance(SEND_TIMEOUT + Duration::from_secs(1)).await; 3118 let result = task.await.expect("task panicked"); 3119 match result { 3120 Err(IngestError::SendTimeout(d)) => assert_eq!(d, SEND_TIMEOUT), 3121 other => panic!( 3122 "expected SendTimeout, got {other:?}. A bare ws_sink.send.await would hang forever on a half-dead socket and starve the writer's pong-deadline arm", 3123 ), 3124 } 3125 } 3126 3127 struct OkSink; 3128 impl bobbin_runtime::WsSink for OkSink { 3129 fn send<'a>(&'a mut self, _: WsMessage) -> bobbin_runtime::WsSendFuture<'a> { 3130 Box::pin(async move { Ok(()) }) 3131 } 3132 } 3133 3134 #[tokio::test] 3135 async fn timed_send_returns_ok_when_sink_succeeds_promptly() { 3136 let mut sink: Box<dyn bobbin_runtime::WsSink> = Box::new(OkSink); 3137 let result = timed_send(&mut sink, WsMessage::Ping(Bytes::new())).await; 3138 assert!(matches!(result, Ok(())), "got {result:?}"); 3139 } 3140 3141 #[test] 3142 fn classify_error_frame_with_id_dispatches_as_error_not_unknown_kind() { 3143 let text = r#"{"id":42,"type":"error","error":"ConsumerTooSlow","message":"slow"}"#; 3144 match classify_text_frame(text) { 3145 Err(IngestError::ConsumerTooSlow { message }) => assert_eq!(message, "slow"), 3146 other => panic!( 3147 "type=\"error\" must dispatch to the error path even when id is present, got: {other:?}" 3148 ), 3149 } 3150 } 3151 3152 #[tokio::test] 3153 async fn buffered_pipeline_preserves_cursor_order_under_resolve_latency_skew() { 3154 use bobbin_record_lru::RecordStore; 3155 use bobbin_types::record::RecordBody; 3156 use std::sync::Mutex; 3157 3158 let server = wiremock::MockServer::start().await; 3159 wiremock::Mock::given(wiremock::matchers::method("GET")) 3160 
.and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord")) 3161 .respond_with( 3162 wiremock::ResponseTemplate::new(404).set_delay(Duration::from_millis(150)), 3163 ) 3164 .mount(&server) 3165 .await; 3166 3167 let client = bobbin_slingshot_client::SlingshotClient::with_default_http( 3168 Url::parse(&server.uri()).unwrap(), 3169 ) 3170 .unwrap(); 3171 let clock: Arc<dyn Clock> = Arc::new(SystemClock::new()); 3172 let resolver = Arc::new(RepoIdResolver::with_slingshot( 3173 client, 3174 clock.clone(), 3175 RuntimeHasher::default(), 3176 )); 3177 3178 let owner = Did::new_owned("did:plc:nel").unwrap(); 3179 let abalone = Did::new_owned("did:plc:abalone").unwrap(); 3180 let fast_rkeys: [Rkey<DefaultStr>; 2] = [rkey("fastrkeyaa01"), rkey("fastrkeyaa02")]; 3181 let slow_rkeys: [Rkey<DefaultStr>; 2] = [rkey("slowrkeyaa01"), rkey("slowrkeyaa02")]; 3182 for r in &fast_rkeys { 3183 resolver 3184 .observe(owner.clone(), r.clone(), Some(abalone.clone())) 3185 .await; 3186 } 3187 3188 #[derive(Default)] 3189 struct Capturing { 3190 urls: Mutex<Vec<AtUri<DefaultStr>>>, 3191 } 3192 impl RecordStore for Capturing { 3193 fn get(&self, _uri: &AtUri<DefaultStr>) -> Option<Arc<RecordBody>> { 3194 None 3195 } 3196 fn put(&self, uri: AtUri<DefaultStr>, _body: Arc<RecordBody>) { 3197 self.urls.lock().unwrap().push(uri); 3198 } 3199 fn remove(&self, _uri: &AtUri<DefaultStr>) {} 3200 } 3201 let capturing = Arc::new(Capturing::default()); 3202 3203 let runtime: IngestRuntime<NoopSearchSink> = IngestRuntime { 3204 store: Arc::new(EdgeStore::new(RuntimeHasher::default())), 3205 issue_states: Arc::new(StateIndex::new(RuntimeHasher::default())), 3206 pull_statuses: Arc::new(StateIndex::new(RuntimeHasher::default())), 3207 coverage: Arc::new(CoverageWatch::new()), 3208 search: Arc::new(NoopSearchSink), 3209 records: capturing.clone() as Arc<dyn RecordStore>, 3210 resolver, 3211 clock, 3212 entropy: Arc::new(OsEntropy), 3213 ws: TungsteniteWs::shared(), 3214 cancel: 
CancellationToken::new(), 3215 disconnects: None, 3216 warming_shadow: None, 3217 warming_buffer: None, 3218 knot_registry: None, 3219 knot_gate: None, 3220 }; 3221 3222 let parallelism = 4usize; 3223 let (frame_tx, frame_rx) = tokio::sync::mpsc::channel::<HydrantFrame>(64); 3224 let pipeline_rt = runtime.clone(); 3225 let pipeline = tokio::spawn(async move { 3226 let prep_rt = pipeline_rt.clone(); 3227 let resolve_rt = pipeline_rt.clone(); 3228 let commit_rt = pipeline_rt; 3229 ReceiverStream::new(frame_rx) 3230 .then(move |frame| prep_stage(frame, prep_rt.clone())) 3231 .map(move |staged| resolve_stage(staged, resolve_rt.clone())) 3232 .buffered(parallelism) 3233 .for_each(move |staged| commit_stage(staged, commit_rt.clone(), parallelism)) 3234 .await; 3235 }); 3236 3237 let mk = |id: u64, idx: u64, repo_rkey: &Rkey<DefaultStr>| -> HydrantFrame { 3238 parse_frame(json!({ 3239 "id": id, 3240 "type": "record", 3241 "record": { 3242 "live": false, 3243 "did": "did:plc:olaren", 3244 "rev": fresh_tid().as_str(), 3245 "collection": "sh.tangled.feed.star", 3246 "rkey": format!("starrkeya{idx:04}"), 3247 "action": "create", 3248 "cid": VALID_CID, 3249 "record": { 3250 "$type": "sh.tangled.feed.star", 3251 "createdAt": "2026-05-01T00:00:00Z", 3252 "subject": format!("at://did:plc:nel/sh.tangled.repo/{}", repo_rkey.as_ref()), 3253 } 3254 } 3255 })) 3256 }; 3257 3258 frame_tx.send(mk(1, 1, &slow_rkeys[0])).await.unwrap(); 3259 frame_tx.send(mk(2, 2, &fast_rkeys[0])).await.unwrap(); 3260 frame_tx.send(mk(3, 3, &slow_rkeys[1])).await.unwrap(); 3261 frame_tx.send(mk(4, 4, &fast_rkeys[1])).await.unwrap(); 3262 drop(frame_tx); 3263 3264 pipeline.await.unwrap(); 3265 3266 let captured = capturing.urls.lock().unwrap(); 3267 let rkeys: Vec<String> = captured 3268 .iter() 3269 .filter_map(|uri| uri.rkey().map(|r| r.as_ref().to_owned())) 3270 .collect(); 3271 assert_eq!( 3272 rkeys, 3273 vec![ 3274 "starrkeya0001".to_owned(), 3275 "starrkeya0002".to_owned(), 3276 
"starrkeya0003".to_owned(), 3277 "starrkeya0004".to_owned(), 3278 ], 3279 "buffered(N) must preserve cursor order even when resolves complete out of order, with ~150ms slow vs cache-hit fast as the latency skew here", 3280 ); 3281 assert_eq!(runtime.coverage.snapshot().last_cursor().raw(), 4); 3282 } 3283}