Monorepo for Tangled
tangled.org
1mod legacy_upgrade;
2mod normalize;
3
4pub use legacy_upgrade::{
5 DecodedRecord, decode_canon_or_upgrade, decode_canon_or_upgrade_bytes, normalize_record_fields,
6 scrub_record_bytes, synthesize_created_at, upgrade, upgrade_wire_bytes,
7};
8pub use normalize::NormalizeRepoRefs;
9
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::Duration;
13
14use tokio::time::Instant;
15
16use bobbin_runtime::{Clock, RuntimeHasher};
17use bobbin_slingshot_client::{SlingshotClient, SlingshotError};
18use bobbin_types::edges::{ExtractError, Record};
19use bobbin_types::ids::{RepoIdent, nsid_static};
20use jacquard_common::DefaultStr;
21use jacquard_common::types::did::Did;
22use jacquard_common::types::nsid::Nsid;
23use jacquard_common::types::recordkey::Rkey;
24use scc::HashMap as SccMap;
25use tokio::sync::OnceCell;
26use tracing::warn;
27
28const REPO_COLLECTION: &str = "sh.tangled.repo";
29const TRANSIENT_TTL: Duration = Duration::from_secs(60);
30
31#[derive(Clone, Debug, Eq, PartialEq)]
32pub enum Resolution {
33 Mapped(Did<DefaultStr>),
34 NoRepoDid,
35 Unresolvable,
36}
37
38#[derive(Clone, Debug, Eq, PartialEq)]
39enum AuthoritativeResolution {
40 Mapped(Did<DefaultStr>),
41 NoRepoDid,
42}
43
44impl AuthoritativeResolution {
45 fn from_repo_did(repo_did: Option<Did<DefaultStr>>) -> Self {
46 match repo_did {
47 Some(did) => Self::Mapped(did),
48 None => Self::NoRepoDid,
49 }
50 }
51}
52
53#[derive(Clone, Debug, Eq, PartialEq)]
54enum CacheEntry {
55 Authoritative(AuthoritativeResolution),
56 Provisional(Resolution),
57 Transient { expires_at: Instant },
58}
59
60impl CacheEntry {
61 fn into_resolution(self) -> Resolution {
62 match self {
63 Self::Authoritative(AuthoritativeResolution::Mapped(did)) => Resolution::Mapped(did),
64 Self::Authoritative(AuthoritativeResolution::NoRepoDid) => Resolution::NoRepoDid,
65 Self::Provisional(r) => r,
66 Self::Transient { .. } => Resolution::Unresolvable,
67 }
68 }
69
70 fn is_expired_transient(&self, now: Instant) -> bool {
71 matches!(self, Self::Transient { expires_at } if *expires_at <= now)
72 }
73}
74
75#[derive(Default)]
76pub struct ResolverStats {
77 hits: AtomicU64,
78 misses_mapped: AtomicU64,
79 misses_no_repo_did: AtomicU64,
80 misses_unresolvable: AtomicU64,
81 misses_transient: AtomicU64,
82 misses_no_client: AtomicU64,
83 miss_latency_micros_sum: AtomicU64,
84 miss_latency_micros_max: AtomicU64,
85}
86
87#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
88pub struct ResolverStatsSnapshot {
89 pub hits: u64,
90 pub misses_mapped: u64,
91 pub misses_no_repo_did: u64,
92 pub misses_unresolvable: u64,
93 pub misses_transient: u64,
94 pub misses_no_client: u64,
95 pub miss_latency_micros_sum: u64,
96 pub miss_latency_micros_max: u64,
97}
98
99impl ResolverStatsSnapshot {
100 pub fn miss_count(&self) -> u64 {
101 self.misses_mapped
102 + self.misses_no_repo_did
103 + self.misses_unresolvable
104 + self.misses_transient
105 + self.misses_no_client
106 }
107
108 pub fn total(&self) -> u64 {
109 self.hits + self.miss_count()
110 }
111
112 pub fn miss_latency_micros_avg(&self) -> Option<u64> {
113 let misses = self.miss_count() - self.misses_no_client;
114 (misses > 0).then(|| self.miss_latency_micros_sum / misses)
115 }
116}
117
118#[derive(Clone, Copy)]
119enum MissKind {
120 Mapped,
121 NoRepoDid,
122 Unresolvable,
123 Transient,
124 NoClient,
125}
126
127impl ResolverStats {
128 fn record_hit(&self) {
129 self.hits.fetch_add(1, Ordering::Relaxed);
130 }
131
132 fn record_miss(&self, kind: MissKind, latency: Option<Duration>) {
133 let counter = match kind {
134 MissKind::Mapped => &self.misses_mapped,
135 MissKind::NoRepoDid => &self.misses_no_repo_did,
136 MissKind::Unresolvable => &self.misses_unresolvable,
137 MissKind::Transient => &self.misses_transient,
138 MissKind::NoClient => &self.misses_no_client,
139 };
140 counter.fetch_add(1, Ordering::Relaxed);
141 if let Some(latency) = latency {
142 let micros = u64::try_from(latency.as_micros()).unwrap_or(u64::MAX);
143 self.miss_latency_micros_sum
144 .fetch_add(micros, Ordering::Relaxed);
145 self.miss_latency_micros_max
146 .fetch_max(micros, Ordering::Relaxed);
147 }
148 }
149
150 pub fn snapshot(&self) -> ResolverStatsSnapshot {
151 ResolverStatsSnapshot {
152 hits: self.hits.load(Ordering::Relaxed),
153 misses_mapped: self.misses_mapped.load(Ordering::Relaxed),
154 misses_no_repo_did: self.misses_no_repo_did.load(Ordering::Relaxed),
155 misses_unresolvable: self.misses_unresolvable.load(Ordering::Relaxed),
156 misses_transient: self.misses_transient.load(Ordering::Relaxed),
157 misses_no_client: self.misses_no_client.load(Ordering::Relaxed),
158 miss_latency_micros_sum: self.miss_latency_micros_sum.load(Ordering::Relaxed),
159 miss_latency_micros_max: self.miss_latency_micros_max.load(Ordering::Relaxed),
160 }
161 }
162}
163
164struct SlingshotProbe {
165 client: SlingshotClient,
166 clock: Arc<dyn Clock>,
167}
168
169pub struct RepoIdResolver {
170 cache: SccMap<RepoIdent, CacheEntry, RuntimeHasher>,
171 by_repo_did: SccMap<Did<DefaultStr>, RepoIdent, RuntimeHasher>,
172 in_flight: SccMap<RepoIdent, Arc<OnceCell<Resolution>>, RuntimeHasher>,
173 probe: Option<SlingshotProbe>,
174 stats: ResolverStats,
175}
176
177impl RepoIdResolver {
178 pub fn with_slingshot(
179 client: SlingshotClient,
180 clock: Arc<dyn Clock>,
181 hasher: RuntimeHasher,
182 ) -> Self {
183 Self {
184 cache: SccMap::with_hasher(hasher.clone()),
185 by_repo_did: SccMap::with_hasher(hasher.clone()),
186 in_flight: SccMap::with_hasher(hasher),
187 probe: Some(SlingshotProbe { client, clock }),
188 stats: ResolverStats::default(),
189 }
190 }
191
192 pub fn detached(hasher: RuntimeHasher) -> Self {
193 Self {
194 cache: SccMap::with_hasher(hasher.clone()),
195 by_repo_did: SccMap::with_hasher(hasher.clone()),
196 in_flight: SccMap::with_hasher(hasher),
197 probe: None,
198 stats: ResolverStats::default(),
199 }
200 }
201
202 pub fn stats(&self) -> ResolverStatsSnapshot {
203 self.stats.snapshot()
204 }
205
206 pub async fn cached_resolution(
207 &self,
208 owner: &Did<DefaultStr>,
209 rkey: &Rkey<DefaultStr>,
210 ) -> Option<Resolution> {
211 let key = RepoIdent::new(owner.clone(), rkey.clone());
212 let entry = self.cache.get_async(&key).await?;
213 let now = self.probe.as_ref().map(|p| p.clock.now_instant());
214 if let Some(now) = now
215 && entry.get().is_expired_transient(now)
216 {
217 return None;
218 }
219 Some(entry.get().clone().into_resolution())
220 }
221
222 pub async fn lookup_by_repo_did(&self, repo_did: &Did<DefaultStr>) -> Option<RepoIdent> {
223 self.by_repo_did
224 .get_async(repo_did)
225 .await
226 .map(|e| e.get().clone())
227 }
228
229 pub async fn observe(
230 &self,
231 owner: Did<DefaultStr>,
232 rkey: Rkey<DefaultStr>,
233 repo_did: Option<Did<DefaultStr>>,
234 ) -> Option<RepoIdent> {
235 let ident = RepoIdent::new(owner, rkey);
236 let entry =
237 CacheEntry::Authoritative(AuthoritativeResolution::from_repo_did(repo_did.clone()));
238 self.cache
239 .entry_async(ident.clone())
240 .await
241 .and_modify(|existing| *existing = entry.clone())
242 .or_insert(entry);
243
244 let repo_did = repo_did?;
245 let mut prior: Option<RepoIdent> = None;
246 self.by_repo_did
247 .entry_async(repo_did)
248 .await
249 .and_modify(|existing| {
250 if *existing != ident {
251 prior = Some(existing.clone());
252 *existing = ident.clone();
253 }
254 })
255 .or_insert(ident);
256 prior
257 }
258
259 pub async fn forget(&self, owner: &Did<DefaultStr>, rkey: &Rkey<DefaultStr>) {
260 let ident = RepoIdent::new(owner.clone(), rkey.clone());
261 let prior_resolution = self
262 .cache
263 .remove_async(&ident)
264 .await
265 .map(|(_, entry)| entry.into_resolution());
266 if let Some(Resolution::Mapped(repo_did)) = prior_resolution {
267 self.by_repo_did
268 .remove_if_async(&repo_did, |existing| *existing == ident)
269 .await;
270 }
271 }
272
273 async fn fill_provisional(&self, key: RepoIdent, resolution: Resolution) {
274 let entry = CacheEntry::Provisional(resolution);
275 self.cache
276 .entry_async(key)
277 .await
278 .and_modify(|existing| {
279 if matches!(existing, CacheEntry::Authoritative(_)) {
280 return;
281 }
282 *existing = entry.clone();
283 })
284 .or_insert(entry);
285 }
286
287 async fn fill_transient(&self, key: RepoIdent, expires_at: Instant) {
288 let entry = CacheEntry::Transient { expires_at };
289 self.cache
290 .entry_async(key)
291 .await
292 .and_modify(|existing| {
293 if matches!(existing, CacheEntry::Authoritative(_)) {
294 return;
295 }
296 *existing = entry.clone();
297 })
298 .or_insert(entry);
299 }
300
301 pub async fn resolve(&self, owner: &Did<DefaultStr>, rkey: &Rkey<DefaultStr>) -> Resolution {
302 let key = RepoIdent::new(owner.clone(), rkey.clone());
303
304 let Some(probe) = self.probe.as_ref() else {
305 if let Some(entry) = self.cache.get_async(&key).await {
306 self.stats.record_hit();
307 return entry.get().clone().into_resolution();
308 }
309 self.stats.record_miss(MissKind::NoClient, None);
310 return Resolution::Unresolvable;
311 };
312
313 let now = probe.clock.now_instant();
314 if let Some(entry) = self.cache.get_async(&key).await
315 && !entry.get().is_expired_transient(now)
316 {
317 self.stats.record_hit();
318 return entry.get().clone().into_resolution();
319 }
320
321 let cell: Arc<OnceCell<Resolution>> = self
322 .in_flight
323 .entry_async(key.clone())
324 .await
325 .or_insert_with(|| Arc::new(OnceCell::new()))
326 .get()
327 .clone();
328
329 let result = cell
330 .get_or_init(|| async { self.fetch_repo_did(owner, rkey, &key).await })
331 .await
332 .clone();
333
334 self.in_flight.remove_async(&key).await;
335
336 result
337 }
338
339 async fn fetch_repo_did(
340 &self,
341 owner: &Did<DefaultStr>,
342 rkey: &Rkey<DefaultStr>,
343 key: &RepoIdent,
344 ) -> Resolution {
345 let probe = self
346 .probe
347 .as_ref()
348 .expect("fetch_repo_did is only called when a probe is present");
349 let started = probe.clock.now_instant();
350 let nsid: Nsid<DefaultStr> = nsid_static(REPO_COLLECTION);
351 let provisional = match probe.client.get_record(owner, &nsid, rkey).await {
352 Ok(body) => match repo_did_from_body(&nsid, &body.value) {
353 Ok(Some(did)) => Resolution::Mapped(did),
354 Ok(None) => Resolution::NoRepoDid,
355 Err(e) => {
356 warn!(
357 error = ?e,
358 owner = owner.as_ref(),
359 rkey = rkey.as_ref(),
360 "slingshot returned unparseable repo body, caching as unresolvable",
361 );
362 Resolution::Unresolvable
363 }
364 },
365 Err(SlingshotError::NotFound) => {
366 warn!(
367 owner = owner.as_ref(),
368 rkey = rkey.as_ref(),
369 "no repo record on slingshot, caching as unresolvable",
370 );
371 Resolution::Unresolvable
372 }
373 Err(ref e) if is_garbage_response(e) => {
374 warn!(
375 error = ?e,
376 owner = owner.as_ref(),
377 rkey = rkey.as_ref(),
378 "slingshot returned malformed response, caching as unresolvable",
379 );
380 Resolution::Unresolvable
381 }
382 Err(e) => {
383 warn!(
384 error = ?e,
385 owner = owner.as_ref(),
386 rkey = rkey.as_ref(),
387 "caching transient slingshot failure for repoDID lookup under short TTL",
388 );
389 let elapsed = probe.clock.now_instant().duration_since(started);
390 self.stats.record_miss(MissKind::Transient, Some(elapsed));
391 let expires_at = probe.clock.now_instant() + TRANSIENT_TTL;
392 self.fill_transient(key.clone(), expires_at).await;
393 return Resolution::Unresolvable;
394 }
395 };
396 let elapsed = probe.clock.now_instant().duration_since(started);
397 let kind = match &provisional {
398 Resolution::Mapped(_) => MissKind::Mapped,
399 Resolution::NoRepoDid => MissKind::NoRepoDid,
400 Resolution::Unresolvable => MissKind::Unresolvable,
401 };
402 self.stats.record_miss(kind, Some(elapsed));
403 self.fill_provisional(key.clone(), provisional.clone())
404 .await;
405 provisional
406 }
407}
408
409fn is_garbage_response(err: &SlingshotError) -> bool {
410 matches!(
411 err,
412 SlingshotError::Decode(_)
413 | SlingshotError::MissingField(_)
414 | SlingshotError::InvalidAtUri(_)
415 | SlingshotError::InvalidCid(_)
416 | SlingshotError::UriMismatch { .. },
417 )
418}
419
420fn repo_did_from_body(
421 nsid: &Nsid<DefaultStr>,
422 body: &[u8],
423) -> Result<Option<Did<DefaultStr>>, ExtractError> {
424 match DecodedRecord::try_decode(nsid, body)? {
425 DecodedRecord::Canon(Record::Repo(repo)) => Ok(repo.repo_did),
426 DecodedRecord::Canon(_) | DecodedRecord::Legacy(_) => Ok(None),
427 }
428}
429
430#[cfg(test)]
431mod tests {
432 use super::*;
433 use bobbin_runtime::SystemClock;
434 use jacquard_common::types::did::Did;
435 use jacquard_common::types::recordkey::Rkey;
436
437 fn did(s: &str) -> Did<DefaultStr> {
438 Did::new_owned(s).unwrap()
439 }
440
441 fn rkey(s: &str) -> Rkey<DefaultStr> {
442 Rkey::new_owned(s).unwrap()
443 }
444
445 fn test_clock() -> Arc<dyn Clock> {
446 Arc::new(SystemClock::new())
447 }
448
449 #[tokio::test]
450 async fn observation_returns_prior_ident_when_repo_did_moves() {
451 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
452 let prior = resolver
453 .observe(
454 did("did:plc:nel"),
455 rkey("3liuighjy2h22"),
456 Some(did("did:plc:clam")),
457 )
458 .await;
459 assert!(prior.is_none(), "first observation has no prior");
460
461 let prior = resolver
462 .observe(did("did:plc:nel"), rkey("core"), Some(did("did:plc:clam")))
463 .await;
464 assert_eq!(
465 prior,
466 Some(RepoIdent::new(did("did:plc:nel"), rkey("3liuighjy2h22"))),
467 "same repoDID at a new (owner, rkey) returns the prior ident so callers can evict the stale at-uri",
468 );
469
470 let prior = resolver
471 .observe(did("did:plc:nel"), rkey("core"), Some(did("did:plc:clam")))
472 .await;
473 assert!(prior.is_none(), "re-observing the same ident is a no-op");
474 }
475
476 #[tokio::test]
477 async fn observation_without_repo_did_does_not_track_reverse() {
478 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
479 let prior = resolver
480 .observe(did("did:plc:nel"), rkey("abcabcabcabcz"), None)
481 .await;
482 assert!(prior.is_none());
483 }
484
485 #[tokio::test]
486 async fn forget_clears_reverse_only_when_still_owned() {
487 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
488 resolver
489 .observe(
490 did("did:plc:nel"),
491 rkey("3liuighjy2h22"),
492 Some(did("did:plc:clam")),
493 )
494 .await;
495 resolver
496 .observe(did("did:plc:nel"), rkey("core"), Some(did("did:plc:clam")))
497 .await;
498
499 resolver
500 .forget(&did("did:plc:nel"), &rkey("3liuighjy2h22"))
501 .await;
502
503 let prior = resolver
504 .observe(
505 did("did:plc:nel"),
506 rkey("core-renamed"),
507 Some(did("did:plc:clam")),
508 )
509 .await;
510 assert_eq!(
511 prior,
512 Some(RepoIdent::new(did("did:plc:nel"), rkey("core"))),
513 "stale at-uri's forget must not displace the live owner of did:plc:clam",
514 );
515 }
516
517 #[tokio::test]
518 async fn observation_with_repo_did_resolves_mapped() {
519 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
520 resolver
521 .observe(
522 did("did:plc:nel"),
523 rkey("abcabcabcabcz"),
524 Some(did("did:plc:clam")),
525 )
526 .await;
527 let got = resolver
528 .resolve(&did("did:plc:nel"), &rkey("abcabcabcabcz"))
529 .await;
530 assert_eq!(got, Resolution::Mapped(did("did:plc:clam")));
531 }
532
533 #[tokio::test]
534 async fn observation_without_repo_did_resolves_no_repo_did() {
535 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
536 resolver
537 .observe(did("did:plc:nel"), rkey("abcabcabcabcz"), None)
538 .await;
539 let got = resolver
540 .resolve(&did("did:plc:nel"), &rkey("abcabcabcabcz"))
541 .await;
542 assert_eq!(
543 got,
544 Resolution::NoRepoDid,
545 "observed but empty repoDID is a definitive answer not a lookup failure",
546 );
547 }
548
549 #[tokio::test]
550 async fn cache_miss_without_client_is_unresolvable() {
551 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
552 let got = resolver
553 .resolve(&did("did:plc:nel"), &rkey("abcabcabcabcz"))
554 .await;
555 assert_eq!(got, Resolution::Unresolvable);
556 }
557
558 #[tokio::test]
559 async fn lookup_by_repo_did_finds_observed_ident() {
560 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
561 resolver
562 .observe(
563 did("did:plc:nel"),
564 rkey("abcabcabcabcz"),
565 Some(did("did:plc:limpet")),
566 )
567 .await;
568 let got = resolver.lookup_by_repo_did(&did("did:plc:limpet")).await;
569 assert_eq!(
570 got,
571 Some(RepoIdent::new(did("did:plc:nel"), rkey("abcabcabcabcz"))),
572 );
573 }
574
575 #[tokio::test]
576 async fn lookup_by_repo_did_misses_when_unobserved() {
577 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
578 let got = resolver.lookup_by_repo_did(&did("did:plc:limpet")).await;
579 assert_eq!(got, None);
580 }
581
582 #[tokio::test]
583 async fn lookup_by_repo_did_misses_when_repo_did_was_none() {
584 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
585 resolver
586 .observe(did("did:plc:nel"), rkey("abcabcabcabcz"), None)
587 .await;
588 let got = resolver.lookup_by_repo_did(&did("did:plc:limpet")).await;
589 assert_eq!(got, None);
590 }
591
592 #[tokio::test]
593 async fn lookup_by_repo_did_follows_move_to_new_ident() {
594 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
595 resolver
596 .observe(
597 did("did:plc:nel"),
598 rkey("abcabcabcabcz"),
599 Some(did("did:plc:limpet")),
600 )
601 .await;
602 resolver
603 .observe(
604 did("did:plc:olaren"),
605 rkey("xyzxyzxyzxyzx"),
606 Some(did("did:plc:limpet")),
607 )
608 .await;
609 let got = resolver.lookup_by_repo_did(&did("did:plc:limpet")).await;
610 assert_eq!(
611 got,
612 Some(RepoIdent::new(did("did:plc:olaren"), rkey("xyzxyzxyzxyzx"))),
613 );
614 }
615
616 #[tokio::test]
617 async fn observation_overwrites_prior_value() {
618 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
619 resolver
620 .observe(
621 did("did:plc:nel"),
622 rkey("abcabcabcabcz"),
623 Some(did("did:plc:clam")),
624 )
625 .await;
626 resolver
627 .observe(
628 did("did:plc:nel"),
629 rkey("abcabcabcabcz"),
630 Some(did("did:plc:uni")),
631 )
632 .await;
633 let got = resolver
634 .resolve(&did("did:plc:nel"), &rkey("abcabcabcabcz"))
635 .await;
636 assert_eq!(got, Resolution::Mapped(did("did:plc:uni")));
637 }
638
639 #[tokio::test]
640 async fn fill_provisional_does_not_downgrade_authoritative_mapped() {
641 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
642 let owner = did("did:plc:nel");
643 let key = rkey("abcabcabcabcz");
644 resolver
645 .observe(owner.clone(), key.clone(), Some(did("did:plc:clam")))
646 .await;
647 resolver
648 .fill_provisional(
649 RepoIdent::new(owner.clone(), key.clone()),
650 Resolution::Unresolvable,
651 )
652 .await;
653 let got = resolver.resolve(&owner, &key).await;
654 assert_eq!(
655 got,
656 Resolution::Mapped(did("did:plc:clam")),
657 "firehose-observed mapping must outrank provisional slingshot info",
658 );
659 }
660
661 #[tokio::test]
662 async fn fill_provisional_does_not_downgrade_authoritative_no_repo_did() {
663 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
664 let owner = did("did:plc:nel");
665 let key = rkey("abcabcabcabcz");
666 resolver.observe(owner.clone(), key.clone(), None).await;
667 resolver
668 .fill_provisional(
669 RepoIdent::new(owner.clone(), key.clone()),
670 Resolution::Mapped(did("did:plc:clam")),
671 )
672 .await;
673 let got = resolver.resolve(&owner, &key).await;
674 assert_eq!(
675 got,
676 Resolution::NoRepoDid,
677 "an authoritative empty observation must outrank provisional slingshot info even when slingshot disagrees",
678 );
679 }
680
681 #[tokio::test]
682 async fn slingshot_404_caches_as_unresolvable() {
683 let server = wiremock::MockServer::start().await;
684 wiremock::Mock::given(wiremock::matchers::method("GET"))
685 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord"))
686 .respond_with(wiremock::ResponseTemplate::new(404))
687 .expect(1)
688 .mount(&server)
689 .await;
690
691 let client =
692 SlingshotClient::with_default_http(url::Url::parse(&server.uri()).unwrap()).unwrap();
693 let resolver =
694 RepoIdResolver::with_slingshot(client, test_clock(), RuntimeHasher::default());
695
696 let owner = did("did:plc:nel");
697 let key = rkey("abcabcabcabcz");
698 let first = resolver.resolve(&owner, &key).await;
699 let second = resolver.resolve(&owner, &key).await;
700 assert_eq!(first, Resolution::Unresolvable);
701 assert_eq!(second, Resolution::Unresolvable);
702 }
703
704 #[tokio::test]
705 async fn slingshot_malformed_envelope_caches_as_unresolvable() {
706 let server = wiremock::MockServer::start().await;
707 wiremock::Mock::given(wiremock::matchers::method("GET"))
708 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord"))
709 .respond_with(
710 wiremock::ResponseTemplate::new(200)
711 .insert_header("content-type", "application/json")
712 .set_body_string("not json"),
713 )
714 .expect(1)
715 .mount(&server)
716 .await;
717
718 let client =
719 SlingshotClient::with_default_http(url::Url::parse(&server.uri()).unwrap()).unwrap();
720 let resolver =
721 RepoIdResolver::with_slingshot(client, test_clock(), RuntimeHasher::default());
722
723 let owner = did("did:plc:nel");
724 let key = rkey("abcabcabcabcz");
725 let first = resolver.resolve(&owner, &key).await;
726 let second = resolver.resolve(&owner, &key).await;
727 assert_eq!(first, Resolution::Unresolvable);
728 assert_eq!(
729 second,
730 Resolution::Unresolvable,
731 "garbage envelopes are stable across retries, so caching avoids hammering slingshot",
732 );
733 }
734
735 #[tokio::test]
736 async fn slingshot_uri_mismatch_caches_as_unresolvable() {
737 let server = wiremock::MockServer::start().await;
738 let body = serde_json::json!({
739 "uri": "at://did:plc:limpet/sh.tangled.repo/elsewhere",
740 "cid": "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i",
741 "value": {"$type": "sh.tangled.repo", "knot": "oyster.cafe", "createdAt": "2026-05-01T00:00:00Z"}
742 });
743 wiremock::Mock::given(wiremock::matchers::method("GET"))
744 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord"))
745 .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(body))
746 .expect(1)
747 .mount(&server)
748 .await;
749
750 let client =
751 SlingshotClient::with_default_http(url::Url::parse(&server.uri()).unwrap()).unwrap();
752 let resolver =
753 RepoIdResolver::with_slingshot(client, test_clock(), RuntimeHasher::default());
754
755 let owner = did("did:plc:nel");
756 let key = rkey("abcabcabcabcz");
757 let first = resolver.resolve(&owner, &key).await;
758 let second = resolver.resolve(&owner, &key).await;
759 assert_eq!(first, Resolution::Unresolvable);
760 assert_eq!(second, Resolution::Unresolvable);
761 }
762
763 #[tokio::test]
764 async fn slingshot_legacy_repo_body_resolves_no_repo_did_not_unresolvable() {
765 let server = wiremock::MockServer::start().await;
766 let body = serde_json::json!({
767 "uri": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz",
768 "cid": "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i",
769 "value": {
770 "$type": "sh.tangled.repo",
771 "addedAt": "2025-03-07T21:47:53Z",
772 "knot": "knot1.tangled.sh",
773 "name": "scallop",
774 "owner": "did:plc:nel",
775 },
776 });
777 wiremock::Mock::given(wiremock::matchers::method("GET"))
778 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord"))
779 .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(body))
780 .expect(1)
781 .mount(&server)
782 .await;
783
784 let client =
785 SlingshotClient::with_default_http(url::Url::parse(&server.uri()).unwrap()).unwrap();
786 let resolver =
787 RepoIdResolver::with_slingshot(client, test_clock(), RuntimeHasher::default());
788
789 let owner = did("did:plc:nel");
790 let key = rkey("abcabcabcabcz");
791 let got = resolver.resolve(&owner, &key).await;
792 assert_eq!(
793 got,
794 Resolution::NoRepoDid,
795 "legacy repo wires without a repo_did parse via legacy upgrade and resolve as NoRepoDid, not Unresolvable",
796 );
797 }
798
799 #[tokio::test]
800 async fn slingshot_unparseable_repo_value_caches_as_unresolvable() {
801 let server = wiremock::MockServer::start().await;
802 let body = serde_json::json!({
803 "uri": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz",
804 "cid": "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i",
805 "value": {"$type": "sh.tangled.repo"}
806 });
807 wiremock::Mock::given(wiremock::matchers::method("GET"))
808 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord"))
809 .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(body))
810 .expect(1)
811 .mount(&server)
812 .await;
813
814 let client =
815 SlingshotClient::with_default_http(url::Url::parse(&server.uri()).unwrap()).unwrap();
816 let resolver =
817 RepoIdResolver::with_slingshot(client, test_clock(), RuntimeHasher::default());
818
819 let owner = did("did:plc:nel");
820 let key = rkey("abcabcabcabcz");
821 let first = resolver.resolve(&owner, &key).await;
822 let second = resolver.resolve(&owner, &key).await;
823 assert_eq!(
824 first,
825 Resolution::Unresolvable,
826 "a repo body that fails lexicon validation must not be conflated with NoRepoDid",
827 );
828 assert_eq!(second, Resolution::Unresolvable);
829 }
830
831 #[tokio::test]
832 async fn slingshot_transport_error_caches_with_short_ttl() {
833 let server = wiremock::MockServer::start().await;
834 wiremock::Mock::given(wiremock::matchers::method("GET"))
835 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord"))
836 .respond_with(wiremock::ResponseTemplate::new(503))
837 .expect(1)
838 .mount(&server)
839 .await;
840
841 let client =
842 SlingshotClient::with_default_http(url::Url::parse(&server.uri()).unwrap()).unwrap();
843 let resolver =
844 RepoIdResolver::with_slingshot(client, test_clock(), RuntimeHasher::default());
845
846 let owner = did("did:plc:nel");
847 let key = rkey("abcabcabcabcz");
848 let first = resolver.resolve(&owner, &key).await;
849 let second = resolver.resolve(&owner, &key).await;
850 assert_eq!(first, Resolution::Unresolvable);
851 assert_eq!(
852 second,
853 Resolution::Unresolvable,
854 "transient TTL must suppress immediate re-hammering of a sick upstream",
855 );
856 }
857
858 #[tokio::test]
859 async fn slingshot_transient_recorded_separately_from_unresolvable() {
860 let server = wiremock::MockServer::start().await;
861 wiremock::Mock::given(wiremock::matchers::method("GET"))
862 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord"))
863 .respond_with(wiremock::ResponseTemplate::new(503))
864 .expect(1)
865 .mount(&server)
866 .await;
867
868 let client =
869 SlingshotClient::with_default_http(url::Url::parse(&server.uri()).unwrap()).unwrap();
870 let resolver =
871 RepoIdResolver::with_slingshot(client, test_clock(), RuntimeHasher::default());
872
873 let owner = did("did:plc:nel");
874 let key = rkey("abcabcabcabcz");
875 resolver.resolve(&owner, &key).await;
876 resolver.resolve(&owner, &key).await;
877
878 let snap = resolver.stats();
879 assert_eq!(
880 snap.misses_transient, 1,
881 "second resolve must hit the short-TTL cache instead of re-firing the transient miss",
882 );
883 assert_eq!(snap.hits, 1, "second call hits cached transient entry");
884 assert_eq!(
885 snap.misses_unresolvable, 0,
886 "canonical unresolvable counter is reserved for cached terminal answers",
887 );
888 assert!(
889 snap.miss_latency_micros_sum > 0,
890 "transient misses still have latency contributions",
891 );
892 assert_eq!(snap.miss_count(), 1);
893 }
894
895 #[tokio::test]
896 async fn slingshot_in_flight_requests_coalesce() {
897 let server = wiremock::MockServer::start().await;
898 let body = serde_json::json!({
899 "uri": "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz",
900 "cid": "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i",
901 "value": {"$type": "sh.tangled.repo", "knot": "oyster.cafe", "createdAt": "2026-05-01T00:00:00Z", "repoDid": "did:plc:limpet"}
902 });
903 wiremock::Mock::given(wiremock::matchers::method("GET"))
904 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord"))
905 .respond_with(
906 wiremock::ResponseTemplate::new(200)
907 .set_body_json(body)
908 .set_delay(Duration::from_millis(200)),
909 )
910 .expect(1)
911 .mount(&server)
912 .await;
913
914 let client =
915 SlingshotClient::with_default_http(url::Url::parse(&server.uri()).unwrap()).unwrap();
916 let resolver = Arc::new(RepoIdResolver::with_slingshot(
917 client,
918 test_clock(),
919 RuntimeHasher::default(),
920 ));
921
922 let owner = did("did:plc:nel");
923 let key = rkey("abcabcabcabcz");
924 let r0 = resolver.clone();
925 let r1 = resolver.clone();
926 let r2 = resolver.clone();
927 let o0 = owner.clone();
928 let o1 = owner.clone();
929 let o2 = owner.clone();
930 let k0 = key.clone();
931 let k1 = key.clone();
932 let k2 = key.clone();
933 let (a, b, c) = tokio::join!(
934 tokio::spawn(async move { r0.resolve(&o0, &k0).await }),
935 tokio::spawn(async move { r1.resolve(&o1, &k1).await }),
936 tokio::spawn(async move { r2.resolve(&o2, &k2).await }),
937 );
938 let expected = Resolution::Mapped(did("did:plc:limpet"));
939 assert_eq!(a.unwrap(), expected);
940 assert_eq!(b.unwrap(), expected);
941 assert_eq!(c.unwrap(), expected);
942
943 let snap = resolver.stats();
944 assert_eq!(
945 snap.misses_mapped, 1,
946 "only the winning task pays the slingshot RTT",
947 );
948 }
949
950 #[tokio::test]
951 async fn stats_count_hits_misses_and_latency() {
952 let server = wiremock::MockServer::start().await;
953 wiremock::Mock::given(wiremock::matchers::method("GET"))
954 .and(wiremock::matchers::path("/xrpc/com.atproto.repo.getRecord"))
955 .respond_with(wiremock::ResponseTemplate::new(404))
956 .mount(&server)
957 .await;
958 let client =
959 SlingshotClient::with_default_http(url::Url::parse(&server.uri()).unwrap()).unwrap();
960 let resolver =
961 RepoIdResolver::with_slingshot(client, test_clock(), RuntimeHasher::default());
962
963 let owner = did("did:plc:nel");
964 let key = rkey("abcabcabcabcz");
965 resolver.resolve(&owner, &key).await;
966 resolver.resolve(&owner, &key).await;
967
968 let snap = resolver.stats();
969 assert_eq!(
970 snap.misses_unresolvable, 1,
971 "first call is the slingshot miss"
972 );
973 assert_eq!(snap.hits, 1, "second call hits the unresolvable cache");
974 assert_eq!(snap.miss_count(), 1);
975 assert_eq!(snap.total(), 2);
976 assert!(
977 snap.miss_latency_micros_sum > 0,
978 "latency recorded for slingshot miss"
979 );
980 assert!(snap.miss_latency_micros_avg().unwrap() > 0);
981 }
982
983 #[tokio::test]
984 async fn stats_no_client_miss_recorded_without_latency() {
985 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
986 resolver
987 .resolve(&did("did:plc:nel"), &rkey("abcabcabcabcz"))
988 .await;
989 let snap = resolver.stats();
990 assert_eq!(snap.misses_no_client, 1);
991 assert_eq!(snap.miss_latency_micros_sum, 0);
992 assert_eq!(snap.miss_latency_micros_avg(), None);
993 }
994
995 #[tokio::test]
996 async fn firehose_observe_can_demote_provisional() {
997 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
998 let owner = did("did:plc:nel");
999 let key = rkey("abcabcabcabcz");
1000 resolver
1001 .fill_provisional(
1002 RepoIdent::new(owner.clone(), key.clone()),
1003 Resolution::Mapped(did("did:plc:clam")),
1004 )
1005 .await;
1006 resolver.observe(owner.clone(), key.clone(), None).await;
1007 let got = resolver.resolve(&owner, &key).await;
1008 assert_eq!(
1009 got,
1010 Resolution::NoRepoDid,
1011 "firehose update is canonical and may legitimately remove repoDID",
1012 );
1013 }
1014
1015 #[tokio::test]
1016 async fn forget_removes_cache_entry() {
1017 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
1018 let owner = did("did:plc:nel");
1019 let key = rkey("abcabcabcabcz");
1020 resolver
1021 .observe(owner.clone(), key.clone(), Some(did("did:plc:clam")))
1022 .await;
1023 assert_eq!(
1024 resolver.cached_resolution(&owner, &key).await,
1025 Some(Resolution::Mapped(did("did:plc:clam"))),
1026 );
1027 resolver.forget(&owner, &key).await;
1028 assert_eq!(
1029 resolver.cached_resolution(&owner, &key).await,
1030 None,
1031 "forget must drop the entry entirely so a subsequent observe can supply fresh state",
1032 );
1033 }
1034}