Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1mod legacy_upgrade; 2mod normalize; 3 4pub use legacy_upgrade::{ 5 DecodedRecord, decode_canon_or_upgrade, decode_canon_or_upgrade_bytes, normalize_record_fields, 6 scrub_record_bytes, synthesize_created_at, upgrade, upgrade_wire_bytes, 7}; 8pub use normalize::NormalizeRepoRefs; 9 10use std::sync::Arc; 11use std::sync::atomic::{AtomicU64, Ordering}; 12use std::time::Duration; 13 14use tokio::time::Instant; 15 16use bobbin_runtime::{Clock, RuntimeHasher}; 17use bobbin_slingshot_client::{SlingshotClient, SlingshotError}; 18use bobbin_types::edges::{ExtractError, Record}; 19use bobbin_types::ids::{RepoIdent, nsid_static}; 20use jacquard_common::DefaultStr; 21use jacquard_common::types::did::Did; 22use jacquard_common::types::nsid::Nsid; 23use jacquard_common::types::recordkey::Rkey; 24use scc::HashMap as SccMap; 25use tokio::sync::OnceCell; 26use tracing::warn; 27 28const REPO_COLLECTION: &str = "sh.tangled.repo"; 29const TRANSIENT_TTL: Duration = Duration::from_secs(60); 30 31#[derive(Clone, Debug, Eq, PartialEq)] 32pub enum Resolution { 33 Mapped(Did<DefaultStr>), 34 NoRepoDid, 35 Unresolvable, 36} 37 38#[derive(Clone, Debug, Eq, PartialEq)] 39enum AuthoritativeResolution { 40 Mapped(Did<DefaultStr>), 41 NoRepoDid, 42} 43 44impl AuthoritativeResolution { 45 fn from_repo_did(repo_did: Option<Did<DefaultStr>>) -> Self { 46 match repo_did { 47 Some(did) => Self::Mapped(did), 48 None => Self::NoRepoDid, 49 } 50 } 51} 52 53#[derive(Clone, Debug, Eq, PartialEq)] 54enum CacheEntry { 55 Authoritative(AuthoritativeResolution), 56 Provisional(Resolution), 57 Transient { expires_at: Instant }, 58} 59 60impl CacheEntry { 61 fn into_resolution(self) -> Resolution { 62 match self { 63 Self::Authoritative(AuthoritativeResolution::Mapped(did)) => Resolution::Mapped(did), 64 Self::Authoritative(AuthoritativeResolution::NoRepoDid) => Resolution::NoRepoDid, 65 Self::Provisional(r) => r, 66 Self::Transient { .. } => Resolution::Unresolvable, 67 } 68 } 69 70 fn is_expired_transient(&self, now: Instant) -> bool { 71 matches!(self, Self::Transient { expires_at } if *expires_at <= now) 72 } 73} 74 75#[derive(Default)] 76pub struct ResolverStats { 77 hits: AtomicU64, 78 misses_mapped: AtomicU64, 79 misses_no_repo_did: AtomicU64, 80 misses_unresolvable: AtomicU64, 81 misses_transient: AtomicU64, 82 misses_no_client: AtomicU64, 83 miss_latency_micros_sum: AtomicU64, 84 miss_latency_micros_max: AtomicU64, 85} 86 87#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] 88pub struct ResolverStatsSnapshot { 89 pub hits: u64, 90 pub misses_mapped: u64, 91 pub misses_no_repo_did: u64, 92 pub misses_unresolvable: u64, 93 pub misses_transient: u64, 94 pub misses_no_client: u64, 95 pub miss_latency_micros_sum: u64, 96 pub miss_latency_micros_max: u64, 97} 98 99impl ResolverStatsSnapshot { 100 pub fn miss_count(&self) -> u64 { 101 self.misses_mapped 102 + self.misses_no_repo_did 103 + self.misses_unresolvable 104 + self.misses_transient 105 + self.misses_no_client 106 } 107 108 pub fn total(&self) -> u64 { 109 self.hits + self.miss_count() 110 } 111 112 pub fn miss_latency_micros_avg(&self) -> Option<u64> { 113 let misses = self.miss_count() - self.misses_no_client; 114 (misses > 0).then(|| self.miss_latency_micros_sum / misses) 115 } 116} 117 118#[derive(Clone, Copy)] 119enum MissKind { 120 Mapped, 121 NoRepoDid, 122 Unresolvable, 123 Transient, 124 NoClient, 125} 126 127impl ResolverStats { 128 fn record_hit(&self) { 129 self.hits.fetch_add(1, Ordering::Relaxed); 130 } 131 132 fn record_miss(&self, kind: MissKind, latency: Option<Duration>) { 133 let counter = match kind { 134 MissKind::Mapped => &self.misses_mapped, 135 MissKind::NoRepoDid => &self.misses_no_repo_did, 136 MissKind::Unresolvable => &self.misses_unresolvable, 137 MissKind::Transient => &self.misses_transient, 138 MissKind::NoClient => &self.misses_no_client, 139 }; 140 counter.fetch_add(1, Ordering::Relaxed); 141 if let Some(latency) = latency { 142 let micros = u64::try_from(latency.as_micros()).unwrap_or(u64::MAX); 143 self.miss_latency_micros_sum 144 .fetch_add(micros, Ordering::Relaxed); 145 self.miss_latency_micros_max 146 .fetch_max(micros, Ordering::Relaxed); 147 } 148 } 149 150 pub fn snapshot(&self) -> ResolverStatsSnapshot { 151 ResolverStatsSnapshot { 152 hits: self.hits.load(Ordering::Relaxed), 153 misses_mapped: self.misses_mapped.load(Ordering::Relaxed), 154 misses_no_repo_did: self.misses_no_repo_did.load(Ordering::Relaxed), 155 misses_unresolvable: self.misses_unresolvable.load(Ordering::Relaxed), 156 misses_transient: self.misses_transient.load(Ordering::Relaxed), 157 misses_no_client: self.misses_no_client.load(Ordering::Relaxed), 158 miss_latency_micros_sum: self.miss_latency_micros_sum.load(Ordering::Relaxed), 159 miss_latency_micros_max: self.miss_latency_micros_max.load(Ordering::Relaxed), 160 } 161 } 162} 163 164struct SlingshotProbe { 165 client: SlingshotClient, 166 clock: Arc<dyn Clock>, 167} 168 169pub struct RepoIdResolver { 170 cache: SccMap<RepoIdent, CacheEntry, RuntimeHasher>, 171 by_repo_did: SccMap<Did<DefaultStr>, RepoIdent, RuntimeHasher>, 172 in_flight: SccMap<RepoIdent, Arc<OnceCell<Resolution>>, RuntimeHasher>, 173 probe: Option<SlingshotProbe>, 174 stats: ResolverStats, 175} 176 177impl RepoIdResolver { 178 pub fn with_slingshot( 179 client: SlingshotClient, 180 clock: Arc<dyn Clock>, 181 hasher: RuntimeHasher, 182 ) -> Self { 183 Self { 184 cache: SccMap::with_hasher(hasher.clone()), 185 by_repo_did: SccMap::with_hasher(hasher.clone()), 186 in_flight: SccMap::with_hasher(hasher), 187 probe: Some(SlingshotProbe { client, clock }), 188 stats: ResolverStats::default(), 189 } 190 } 191 192 pub fn detached(hasher: RuntimeHasher) -> Self { 193 Self { 194 cache: SccMap::with_hasher(hasher.clone()), 195 by_repo_did: SccMap::with_hasher(hasher.clone()), 196 in_flight: SccMap::with_hasher(hasher), 197 probe: None, 198 stats: ResolverStats::default(), 199 } 200 } 201 202 pub fn stats(&self) -> ResolverStatsSnapshot { 203 self.stats.snapshot() 204 } 205 206 pub async fn cached_resolution( 207 &self, 208 owner: &Did<DefaultStr>, 209 rkey: &Rkey<DefaultStr>, 210 ) -> Option<Resolution> { 211 let key = RepoIdent::new(owner.clone(), rkey.clone()); 212 let entry = self.cache.get_async(&key).await?; 213 let now = self.probe.as_ref().map(|p| p.clock.now_instant()); 214 if let Some(now) = now 215 && entry.get().is_expired_transient(now) 216 { 217 return None; 218 } 219 Some(entry.get().clone().into_resolution()) 220 } 221 222 pub async fn lookup_by_repo_did(&self, repo_did: &Did<DefaultStr>) -> Option<RepoIdent> { 223 self.by_repo_did 224 .get_async(repo_did) 225 .await 226 .map(|e| e.get().clone()) 227 } 228 229 pub async fn observe( 230 &self, 231 owner: Did<DefaultStr>, 232 rkey: Rkey<DefaultStr>, 233 repo_did: Option<Did<DefaultStr>>, 234 ) -> Option<RepoIdent> { 235 let ident = RepoIdent::new(owner, rkey); 236 let entry = 237 CacheEntry::Authoritative(AuthoritativeResolution::from_repo_did(repo_did.clone())); 238 self.cache 239 .entry_async(ident.clone()) 240 .await 241 .and_modify(|existing| *existing = entry.clone()) 242 .or_insert(entry); 243 244 let repo_did = repo_did?; 245 let mut prior: Option<RepoIdent> = None; 246 self.by_repo_did 247 .entry_async(repo_did) 248 .await 249 .and_modify(|existing| { 250 if *existing != ident { 251 prior = Some(existing.clone()); 252 *existing = ident.clone(); 253 } 254 }) 255 .or_insert(ident); 256 prior 257 } 258 259 pub async fn forget(&self, owner: &Did<DefaultStr>, rkey: &Rkey<DefaultStr>) { 260 let ident = RepoIdent::new(owner.clone(), rkey.clone()); 261 let prior_resolution = self 262 .cache 263 .remove_async(&ident) 264 .await 265 .map(|(_, entry)| entry.into_resolution()); 266 if let Some(Resolution::Mapped(repo_did)) = prior_resolution { 267 self.by_repo_did 268 .remove_if_async(&repo_did, |existing| *existing == ident) 269 .await; 270 } 271 } 272 273 async fn fill_provisional(&self, key: RepoIdent, resolution: Resolution) { 274 let entry = CacheEntry::Provisional(resolution); 275 self.cache 276 .entry_async(key) 277 .await 278 .and_modify(|existing| { 279 if matches!(existing, CacheEntry::Authoritative(_)) { 280 return; 281 } 282 *existing = entry.clone(); 283 }) 284 .or_insert(entry); 285 } 286 287 async fn fill_transient(&self, key: RepoIdent, expires_at: Instant) { 288 let entry = CacheEntry::Transient { expires_at }; 289 self.cache 290 .entry_async(key) 291 .await 292 .and_modify(|existing| { 293 if matches!(existing, CacheEntry::Authoritative(_)) { 294 return; 295 } 296 *existing = entry.clone(); 297 }) 298 .or_insert(entry); 299 } 300 301 pub async fn resolve(&self, owner: &Did<DefaultStr>, rkey: &Rkey<DefaultStr>) -> Resolution { 302 let key = RepoIdent::new(owner.clone(), rkey.clone()); 303 304 let Some(probe) = self.probe.as_ref() else { 305 if let Some(entry) = self.cache.get_async(&key).await { 306 self.stats.record_hit(); 307 return entry.get().clone().into_resolution(); 308 } 309 self.stats.record_miss(MissKind::NoClient, None); 310 return Resolution::Unresolvable; 311 }; 312 313 let now = probe.clock.now_instant(); 314 if let Some(entry) = self.cache.get_async(&key).await 315 && !entry.get().is_expired_transient(now) 316 { 317 self.stats.record_hit(); 318 return entry.get().clone().into_resolution(); 319 } 320 321 let cell: Arc<OnceCell<Resolution>> = self 322 .in_flight 323 .entry_async(key.clone()) 324 .await 325 .or_insert_with(|| Arc::new(OnceCell::new())) 326 .get() 327 .clone(); 328 329 let result = cell 330 .get_or_init(|| async { self.fetch_repo_did(owner, rkey, &key).await }) 331 .await 332 .clone(); 333 334 self.in_flight.remove_async(&key).await; 335 336 result 337 } 338 339 async fn fetch_repo_did( 340 &self, 341 owner: &Did<DefaultStr>, 342 rkey: &Rkey<DefaultStr>, 343 key: &RepoIdent, 344 ) -> Resolution { 345 let probe = self 346 .probe 347 .as_ref() 348 .expect("fetch_repo_did is only called when a probe is present"); 349 let started = probe.clock.now_instant(); 350 let nsid: Nsid<DefaultStr> = nsid_static(REPO_COLLECTION); 351 let provisional = match probe.client.get_record(owner, &nsid, rkey).await { 352 Ok(body) => match repo_did_from_body(&nsid, &body.value) { 353 Ok(Some(did)) => Resolution::Mapped(did), 354 Ok(None) => Resolution::NoRepoDid, 355 Err(e) => { 356 warn!( 357 error = ?e, 358 owner = owner.as_ref(), 359 rkey = rkey.as_ref(), 360 "slingshot returned unparseable repo body, caching as unresolvable", 361 ); 362 Resolution::Unresolvable 363 } 364 }, 365 Err(SlingshotError::NotFound) => { 366 warn!( 367 owner = owner.as_ref(), 368 rkey = rkey.as_ref(), 369 "no repo record on slingshot, caching as unresolvable", 370 ); 371 Resolution::Unresolvable 372 } 373 Err(ref e) if is_garbage_response(e) => { 374 warn!( 375 error = ?e, 376 owner = owner.as_ref(), 377 rkey = rkey.as_ref(), 378 "slingshot returned malformed response, caching as unresolvable", 379 ); 380 Resolution::Unresolvable 381 } 382 Err(e) => { 383 warn!( 384 error = ?e, 385 owner = owner.as_ref(), 386 rkey = rkey.as_ref(), 387 "caching transient slingshot failure for repoDID lookup under short TTL", 388 ); 389 let elapsed = probe.clock.now_instant().duration_since(started); 390 self.stats.record_miss(MissKind::Transient, Some(elapsed)); 391 let expires_at = probe.clock.now_instant() + TRANSIENT_TTL; 392 self.fill_transient(key.clone(), expires_at).await; 393 return Resolution::Unresolvable; 394 } 395 }; 396 let elapsed = probe.clock.now_instant().duration_since(started); 397 let kind = match &provisional { 398 Resolution::Mapped(_) => MissKind::Mapped, 399 Resolution::NoRepoDid => MissKind::NoRepoDid, 400 Resolution::Unresolvable => MissKind::Unresolvable, 401 }; 402 self.stats.record_miss(kind, Some(elapsed)); 403 self.fill_provisional(key.clone(), provisional.clone()) 404 .await; 405 provisional 406 } 407} 408 409fn is_garbage_response(err: &SlingshotError) -> bool { 410 matches!( 411 err, 412 SlingshotError::Decode(_) 413 | SlingshotError::MissingField(_) 414 | SlingshotError::InvalidAtUri(_) 415 | SlingshotError::InvalidCid(_) 416 | SlingshotError::UriMismatch { .. }, 417 ) 418} 419 420fn repo_did_from_body( 421 nsid: &Nsid<DefaultStr>, 422 body: &[u8], 423) -> Result<Option<Did<DefaultStr>>, ExtractError> { 424 match DecodedRecord::try_decode(nsid, body)? { 425 DecodedRecord::Canon(Record::Repo(repo)) => Ok(repo.repo_did), 426 DecodedRecord::Canon(_) | DecodedRecord::Legacy(_) => Ok(None), 427 } 428} 429 430#[cfg(test)] 431mod tests { 432 use super::*; 433 use bobbin_runtime::SystemClock; 434 use jacquard_common::types::did::Did; 435 use jacquard_common::types::recordkey::Rkey; 436 437 fn did(s: &str) -> Did<DefaultStr> { 438 Did::new_owned(s).unwrap() 439 } 440 441 fn rkey(s: &str) -> Rkey<DefaultStr> { 442 Rkey::new_owned(s).unwrap() 443 } 444 445 fn test_clock() -> Arc<dyn Clock> { 446 Arc::new(SystemClock::new()) 447 } 448 449 #[tokio::test] 450 async fn observation_returns_prior_ident_when_repo_did_moves() { 451 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 452 let prior = resolver 453 .observe( 454 did("did:plc:nel"), 455 rkey("3liuighjy2h22"), 456 Some(did("did:plc:clam")), 457 ) 458 .await; 459 assert!(prior.is_none(), "first observation has no prior"); 460 461 let prior = resolver 462 .observe(did("did:plc:nel"), rkey("core"), Some(did("did:plc:clam"))) 463 .await; 464 assert_eq!( 465 prior, 466 Some(RepoIdent::new(did("did:plc:nel"), rkey("3liuighjy2h22"))), 467 "same repoDID at a new (owner, rkey) returns the prior ident so callers can evict the stale at-uri", 468 ); 469 470 let prior = resolver 471 .observe(did("did:plc:nel"), rkey("core"), Some(did("did:plc:clam"))) 472 .await; 473 assert!(prior.is_none(), "re-observing the same ident is a no-op"); 474 } 475 476 #[tokio::test] 477 async fn observation_without_repo_did_does_not_track_reverse() { 478 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 479 let prior = resolver 480 .observe(did("did:plc:nel"), rkey("abcabcabcabcz"), None) 481 .await; 482 assert!(prior.is_none()); 483 } 484 485 #[tokio::test] 486 async fn forget_clears_reverse_only_when_still_owned() { 487 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 488 resolver 489 .observe( 490 did("did:plc:nel"), 491 rkey("3liuighjy2h22"), 492 Some(did("did:plc:clam")), 493 ) 494 .await; 495 resolver 496 .observe(did("did:plc:nel"), rkey("core"), Some(did("did:plc:clam"))) 497 .await; 498 499 resolver 500 .forget(&did("did:plc:nel"), &rkey("3liuighjy2h22")) 501 .await; 502 503 let prior = resolver 504 .observe( 505 did("did:plc:nel"), 506 rkey("core-renamed"), 507 Some(did("did:plc:clam")), 508 ) 509 .await; 510 assert_eq!( 511 prior, 512 Some(RepoIdent::new(did("did:plc:nel"), rkey("core"))), 513 "stale at-uri's forget must not displace the live owner of did:plc:clam", 514 ); 515 } 516 517 #[tokio::test] 518 async fn observation_with_repo_did_resolves_mapped() { 519 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 520 resolver 521 .observe( 522 did("did:plc:nel"), 523 rkey("abcabcabcabcz"), 524 Some(did("did:plc:clam")), 525 ) 526 .await; 527 let got = resolver 528 .resolve(&did("did:plc:nel"), &rkey("abcabcabcabcz")) 529 .await; 530 assert_eq!(got, Resolution::Mapped(did("did:plc:clam"))); 531 } 532 533 #[tokio::test] 534 async fn observation_without_repo_did_resolves_no_repo_did() { 535 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 536 resolver 537 .observe(did("did:plc:nel"), rkey("abcabcabcabcz"), None) 538 .await; 539 let got = resolver 540 .resolve(&did("did:plc:nel"), &rkey("abcabcabcabcz")) 541 .await; 542 assert_eq!( 543 got, 544 Resolution::NoRepoDid, 545 "observed but empty repoDID is a definitive answer not a lookup failure", 546 ); 547 } 548 549 #[tokio::test] 550 async fn cache_miss_without_client_is_unresolvable() { 551 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 552 let got = resolver 553 .resolve(&did("did:plc:nel"), &rkey("abcabcabcabcz")) 554 .await; 555 assert_eq!(got, Resolution::Unresolvable); 556 } 557 558 #[tokio::test] 559 async fn lookup_by_repo_did_finds_observed_ident() { 560 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 561 resolver 562 .observe( 563 did("did:plc:nel"), 564 rkey("abcabcabcabcz"), 565 Some(did("did:plc:limpet")), 566 ) 567 .await; 568 let got = resolver.lookup_by_repo_did(&did("did:plc:limpet")).await; 569 assert_eq!( 570 got, 571 Some(RepoIdent::new(did("did:plc:nel"), rkey("abcabcabcabcz"))), 572 ); 573 } 574 575 #[tokio::test] 576 async fn lookup_by_repo_did_misses_when_unobserved() { 577 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 578 let got = resolver.lookup_by_repo_did(&did("did:plc:limpet")).await; 579 assert_eq!(got, None); 580 } 581 582 #[tokio::test] 583 async fn lookup_by_repo_did_misses_when_repo_did_was_none() { 584 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 585 resolver 586 .observe(did("did:plc:nel"), rkey("abcabcabcabcz"), None) 587 .await; 588 let got = resolver.lookup_by_repo_did(&did("did:plc:limpet")).await; 589 assert_eq!(got, None); 590 } 591 592 #[tokio::test] 593 async fn lookup_by_repo_did_follows_move_to_new_ident() { 594 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 595 resolver 596 .observe( 597 did("did:plc:nel"), 598 rkey("abcabcabcabcz"), 599 Some(did("did:plc:limpet")), 600 ) 601 .await; 602 resolver 603 .observe( 604 did("did:plc:olaren"), 605 rkey("xyzxyzxyzxyzx"), 606 Some(did("did:plc:limpet")), 607 ) 608 .await; 609 let got = resolver.lookup_by_repo_did(&did("did:plc:limpet")).await; 610 assert_eq!( 611 got, 612 Some(RepoIdent::new(did("did:plc:olaren"), rkey("xyzxyzxyzxyzx"))), 613 ); 614 } 615 616 #[tokio::test] 617 async fn observation_overwrites_prior_value() { 618 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 619 resolver 620 .observe( 621 did("did:plc:nel"), 622 rkey("abcabcabcabcz"), 623 Some(did("did:plc:clam")), 624 ) 625 .await; 626 resolver 627 .observe( 628 did("did:plc:nel"), 629 rkey("abcabcabcabcz"), 630 Some(did("did:plc:uni")), 631 ) 632 .await; 633 let got = resolver 634 .resolve(&did("did:plc:nel"), &rkey("abcabcabcabcz")) 635 .await; 636 assert_eq!(got, Resolution::Mapped(did("did:plc:uni"))); 637 } 638 639 #[tokio::test] 640 async fn fill_provisional_does_not_downgrade_authoritative_mapped() { 641 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 642 let owner = did("did:plc:nel"); 643 let key = rkey("abcabcabcabcz"); 644 resolver 645 .observe(owner.clone(), key.clone(), Some(did("did:plc:clam"))) 646 .await; 647 resolver 648 .fill_provisional( 649 RepoIdent::new(owner.clone(), key.clone()), 650 Resolution::Unresolvable, 651 ) 652 .await; 653 let got = resolver.resolve(&owner, &key).await; 654 assert_eq!( 655 got, 656 Resolution::Mapped(did("did:plc:clam")), 657 "firehose-observed mapping must outrank provisional slingshot info", 658 ); 659 } 660 661 #[tokio::test] 662 async fn fill_provisional_does_not_downgrade_authoritative_no_repo_did() { 663 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 664 let owner = did("did:plc:nel"); 665 let key = rkey("abcabcabcabcz"); 666 resolver.observe(owner.clone(), key.clone(), None).await; 667 resolver 668 .fill_provisional( 669 RepoIdent::new(owner.clone(), key.clone()), 670 Resolution::Mapped(did("did:plc:clam")), 671 ) 672 .await; 673 let got = resolver.resolve(&owner, &key).await; 674 assert_eq!( 675 got, 676 Resolution::NoRepoDid, 677 "an authoritative empty observation must outrank provisional slingshot info even when slingshot disagrees", 678 ); 679 } 680 681 #[tokio::test] 682 async fn slingshot_404_caches_as_unresolvable() { 683 let server = wiremock::MockServer::start().await; 684 wiremock::Mock::given(wiremock::matchers::method("GET")) 685 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord")) 686 .respond_with(wiremock::ResponseTemplate::new(404)) 687 .expect(1) 688 .mount(&server) 689 .await; 690 691 let client = 692 SlingshotClient::with_default_http(url::Url::parse(&server.uri()).unwrap()).unwrap(); 693 let resolver = 694 RepoIdResolver::with_slingshot(client, test_clock(), RuntimeHasher::default()); 695 696 let owner = did("did:plc:nel"); 697 let key = rkey("abcabcabcabcz"); 698 let first = resolver.resolve(&owner, &key).await; 699 let second = resolver.resolve(&owner, &key).await; 700 assert_eq!(first, Resolution::Unresolvable); 701 assert_eq!(second, Resolution::Unresolvable); 702 } 703 704 #[tokio::test] 705 async fn slingshot_malformed_envelope_caches_as_unresolvable() { 706 let server = wiremock::MockServer::start().await; 707 wiremock::Mock::given(wiremock::matchers::method("GET")) 708 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord")) 709 .respond_with( 710 wiremock::ResponseTemplate::new(200) 711 .insert_header("content-type", "application/json") 712 .set_body_string("not json"), 713 ) 714 .expect(1) 715 .mount(&server) 716 .await; 717 718 let client = 719 SlingshotClient::with_default_http(url::Url::parse(&server.uri()).unwrap()).unwrap(); 720 let resolver = 721 RepoIdResolver::with_slingshot(client, test_clock(), RuntimeHasher::default()); 722 723 let owner = did("did:plc:nel"); 724 let key = rkey("abcabcabcabcz"); 725 let first = resolver.resolve(&owner, &key).await; 726 let second = resolver.resolve(&owner, &key).await; 727 assert_eq!(first, Resolution::Unresolvable); 728 assert_eq!( 729 second, 730 Resolution::Unresolvable, 731 "garbage envelopes are stable across retries, so caching avoids hammering slingshot", 732 ); 733 } 734 735 #[tokio::test] 736 async fn slingshot_uri_mismatch_caches_as_unresolvable() { 737 let server = wiremock::MockServer::start().await; 738 let body = serde_json::json!({ 739 "uri": "at://did:plc:limpet/sh.tangled.repo/elsewhere", 740 "cid": "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i", 741 "value": {"$type": "sh.tangled.repo", "knot": "oyster.cafe", "createdAt": "2026-05-01T00:00:00Z"} 742 }); 743 wiremock::Mock::given(wiremock::matchers::method("GET")) 744 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord")) 745 .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(body)) 746 .expect(1) 747 .mount(&server) 748 .await; 749 750 let client = 751 SlingshotClient::with_default_http(url::Url::parse(&server.uri()).unwrap()).unwrap(); 752 let resolver = 753 RepoIdResolver::with_slingshot(client, test_clock(), RuntimeHasher::default()); 754 755 let owner = did("did:plc:nel"); 756 let key = rkey("abcabcabcabcz"); 757 let first = resolver.resolve(&owner, &key).await; 758 let second = resolver.resolve(&owner, &key).await; 759 assert_eq!(first, Resolution::Unresolvable); 760 assert_eq!(second, Resolution::Unresolvable); 761 } 762 763 #[tokio::test] 764 async fn slingshot_legacy_repo_body_resolves_no_repo_did_not_unresolvable() { 765 let server = wiremock::MockServer::start().await; 766 let body = serde_json::json!({ 767 "uri": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz", 768 "cid": "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i", 769 "value": { 770 "$type": "sh.tangled.repo", 771 "addedAt": "2025-03-07T21:47:53Z", 772 "knot": "knot1.tangled.sh", 773 "name": "scallop", 774 "owner": "did:plc:nel", 775 }, 776 }); 777 wiremock::Mock::given(wiremock::matchers::method("GET")) 778 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord")) 779 .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(body)) 780 .expect(1) 781 .mount(&server) 782 .await; 783 784 let client = 785 SlingshotClient::with_default_http(url::Url::parse(&server.uri()).unwrap()).unwrap(); 786 let resolver = 787 RepoIdResolver::with_slingshot(client, test_clock(), RuntimeHasher::default()); 788 789 let owner = did("did:plc:nel"); 790 let key = rkey("abcabcabcabcz"); 791 let got = resolver.resolve(&owner, &key).await; 792 assert_eq!( 793 got, 794 Resolution::NoRepoDid, 795 "legacy repo wires without a repo_did parse via legacy upgrade and resolve as NoRepoDid, not Unresolvable", 796 ); 797 } 798 799 #[tokio::test] 800 async fn slingshot_unparseable_repo_value_caches_as_unresolvable() { 801 let server = wiremock::MockServer::start().await; 802 let body = serde_json::json!({ 803 "uri": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz", 804 "cid": "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i", 805 "value": {"$type": "sh.tangled.repo"} 806 }); 807 wiremock::Mock::given(wiremock::matchers::method("GET")) 808 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord")) 809 .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(body)) 810 .expect(1) 811 .mount(&server) 812 .await; 813 814 let client = 815 SlingshotClient::with_default_http(url::Url::parse(&server.uri()).unwrap()).unwrap(); 816 let resolver = 817 RepoIdResolver::with_slingshot(client, test_clock(), RuntimeHasher::default()); 818 819 let owner = did("did:plc:nel"); 820 let key = rkey("abcabcabcabcz"); 821 let first = resolver.resolve(&owner, &key).await; 822 let second = resolver.resolve(&owner, &key).await; 823 assert_eq!( 824 first, 825 Resolution::Unresolvable, 826 "a repo body that fails lexicon validation must not be conflated with NoRepoDid", 827 ); 828 assert_eq!(second, Resolution::Unresolvable); 829 } 830 831 #[tokio::test] 832 async fn slingshot_transport_error_caches_with_short_ttl() { 833 let server = wiremock::MockServer::start().await; 834 wiremock::Mock::given(wiremock::matchers::method("GET")) 835 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord")) 836 .respond_with(wiremock::ResponseTemplate::new(503)) 837 .expect(1) 838 .mount(&server) 839 .await; 840 841 let client = 842 SlingshotClient::with_default_http(url::Url::parse(&server.uri()).unwrap()).unwrap(); 843 let resolver = 844 RepoIdResolver::with_slingshot(client, test_clock(), RuntimeHasher::default()); 845 846 let owner = did("did:plc:nel"); 847 let key = rkey("abcabcabcabcz"); 848 let first = resolver.resolve(&owner, &key).await; 849 let second = resolver.resolve(&owner, &key).await; 850 assert_eq!(first, Resolution::Unresolvable); 851 assert_eq!( 852 second, 853 Resolution::Unresolvable, 854 "transient TTL must suppress immediate re-hammering of a sick upstream", 855 ); 856 } 857 858 #[tokio::test] 859 async fn slingshot_transient_recorded_separately_from_unresolvable() { 860 let server = wiremock::MockServer::start().await; 861 wiremock::Mock::given(wiremock::matchers::method("GET")) 862 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord")) 863 .respond_with(wiremock::ResponseTemplate::new(503)) 864 .expect(1) 865 .mount(&server) 866 .await; 867 868 let client = 869 SlingshotClient::with_default_http(url::Url::parse(&server.uri()).unwrap()).unwrap(); 870 let resolver = 871 RepoIdResolver::with_slingshot(client, test_clock(), RuntimeHasher::default()); 872 873 let owner = did("did:plc:nel"); 874 let key = rkey("abcabcabcabcz"); 875 resolver.resolve(&owner, &key).await; 876 resolver.resolve(&owner, &key).await; 877 878 let snap = resolver.stats(); 879 assert_eq!( 880 snap.misses_transient, 1, 881 "second resolve must hit the short-TTL cache instead of re-firing the transient miss", 882 ); 883 assert_eq!(snap.hits, 1, "second call hits cached transient entry"); 884 assert_eq!( 885 snap.misses_unresolvable, 0, 886 "canonical unresolvable counter is reserved for cached terminal answers", 887 ); 888 assert!( 889 snap.miss_latency_micros_sum > 0, 890 "transient misses still have latency contributions", 891 ); 892 assert_eq!(snap.miss_count(), 1); 893 } 894 895 #[tokio::test] 896 async fn slingshot_in_flight_requests_coalesce() { 897 let server = wiremock::MockServer::start().await; 898 let body = serde_json::json!({ 899 "uri": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz", 900 "cid": "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i", 901 "value": {"$type": "sh.tangled.repo", "knot": "oyster.cafe", "createdAt": "2026-05-01T00:00:00Z", "repoDid": "did:plc:limpet"} 902 }); 903 wiremock::Mock::given(wiremock::matchers::method("GET")) 904 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord")) 905 .respond_with( 906 wiremock::ResponseTemplate::new(200) 907 .set_body_json(body) 908 .set_delay(Duration::from_millis(200)), 909 ) 910 .expect(1) 911 .mount(&server) 912 .await; 913 914 let client = 915 SlingshotClient::with_default_http(url::Url::parse(&server.uri()).unwrap()).unwrap(); 916 let resolver = Arc::new(RepoIdResolver::with_slingshot( 917 client, 918 test_clock(), 919 RuntimeHasher::default(), 920 )); 921 922 let owner = did("did:plc:nel"); 923 let key = rkey("abcabcabcabcz"); 924 let r0 = resolver.clone(); 925 let r1 = resolver.clone(); 926 let r2 = resolver.clone(); 927 let o0 = owner.clone(); 928 let o1 = owner.clone(); 929 let o2 = owner.clone(); 930 let k0 = key.clone(); 931 let k1 = key.clone(); 932 let k2 = key.clone(); 933 let (a, b, c) = tokio::join!( 934 tokio::spawn(async move { r0.resolve(&o0, &k0).await }), 935 tokio::spawn(async move { r1.resolve(&o1, &k1).await }), 936 tokio::spawn(async move { r2.resolve(&o2, &k2).await }), 937 ); 938 let expected = Resolution::Mapped(did("did:plc:limpet")); 939 assert_eq!(a.unwrap(), expected); 940 assert_eq!(b.unwrap(), expected); 941 assert_eq!(c.unwrap(), expected); 942 943 let snap = resolver.stats(); 944 assert_eq!( 945 snap.misses_mapped, 1, 946 "only the winning task pays the slingshot RTT", 947 ); 948 } 949 950 #[tokio::test] 951 async fn stats_count_hits_misses_and_latency() { 952 let server = wiremock::MockServer::start().await; 953 wiremock::Mock::given(wiremock::matchers::method("GET")) 954 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord")) 955 .respond_with(wiremock::ResponseTemplate::new(404)) 956 .mount(&server) 957 .await; 958 let client = 959 SlingshotClient::with_default_http(url::Url::parse(&server.uri()).unwrap()).unwrap(); 960 let resolver = 961 RepoIdResolver::with_slingshot(client, test_clock(), RuntimeHasher::default()); 962 963 let owner = did("did:plc:nel"); 964 let key = rkey("abcabcabcabcz"); 965 resolver.resolve(&owner, &key).await; 966 resolver.resolve(&owner, &key).await; 967 968 let snap = resolver.stats(); 969 assert_eq!( 970 snap.misses_unresolvable, 1, 971 "first call is the slingshot miss" 972 ); 973 assert_eq!(snap.hits, 1, "second call hits the unresolvable cache"); 974 assert_eq!(snap.miss_count(), 1); 975 assert_eq!(snap.total(), 2); 976 assert!( 977 snap.miss_latency_micros_sum > 0, 978 "latency recorded for slingshot miss" 979 ); 980 assert!(snap.miss_latency_micros_avg().unwrap() > 0); 981 } 982 983 #[tokio::test] 984 async fn stats_no_client_miss_recorded_without_latency() { 985 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 986 resolver 987 .resolve(&did("did:plc:nel"), &rkey("abcabcabcabcz")) 988 .await; 989 let snap = resolver.stats(); 990 assert_eq!(snap.misses_no_client, 1); 991 assert_eq!(snap.miss_latency_micros_sum, 0); 992 assert_eq!(snap.miss_latency_micros_avg(), None); 993 } 994 995 #[tokio::test] 996 async fn firehose_observe_can_demote_provisional() { 997 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 998 let owner = did("did:plc:nel"); 999 let key = rkey("abcabcabcabcz"); 1000 resolver 1001 .fill_provisional( 1002 RepoIdent::new(owner.clone(), key.clone()), 1003 Resolution::Mapped(did("did:plc:clam")), 1004 ) 1005 .await; 1006 resolver.observe(owner.clone(), key.clone(), None).await; 1007 let got = resolver.resolve(&owner, &key).await; 1008 assert_eq!( 1009 got, 1010 Resolution::NoRepoDid, 1011 "firehose update is canonical and may legitimately remove repoDID", 1012 ); 1013 } 1014 1015 #[tokio::test] 1016 async fn forget_removes_cache_entry() { 1017 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 1018 let owner = did("did:plc:nel"); 1019 let key = rkey("abcabcabcabcz"); 1020 resolver 1021 .observe(owner.clone(), key.clone(), Some(did("did:plc:clam"))) 1022 .await; 1023 assert_eq!( 1024 resolver.cached_resolution(&owner, &key).await, 1025 Some(Resolution::Mapped(did("did:plc:clam"))), 1026 ); 1027 resolver.forget(&owner, &key).await; 1028 assert_eq!( 1029 resolver.cached_resolution(&owner, &key).await, 1030 None, 1031 "forget must drop the entry entirely so a subsequent observe can supply fresh state", 1032 ); 1033 } 1034}