This repository has no description
1use std::sync::Arc;
2
3use axum::body::{Body, to_bytes};
4use bobbin_edge_index::{CoverageWatch, EdgeStore, StateIndex};
5use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig};
6use bobbin_record_lru::{CacheCapacity, LruRecordStore};
7use bobbin_resolver::RepoIdResolver;
8use bobbin_runtime::{RuntimeHasher, SystemClock};
9use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader};
10use bobbin_slingshot_client::SlingshotClient;
11use bobbin_xrpc::{AppState, router};
12use futures::stream::{self, StreamExt};
13use http::{Request, StatusCode};
14use jacquard_common::DefaultStr;
15use jacquard_common::types::did::Did;
16use jacquard_common::types::nsid::Nsid;
17use jacquard_common::types::recordkey::Rkey;
18use serde_json::{Value, json};
19use tower::ServiceExt;
20use url::Url;
21use url::form_urlencoded::byte_serialize;
22use wiremock::matchers::{method, path, query_param};
23use wiremock::{Mock, MockServer, ResponseTemplate};
24
/// Fixture CID placed in the `cid` field of mocked `getRecord` responses and
/// compared against the `cid` the service returns.
const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i";
26
27fn did(s: &str) -> Did<DefaultStr> {
28 Did::new_owned(s).unwrap()
29}
30
31fn rkey(s: &str) -> Rkey<DefaultStr> {
32 Rkey::new_owned(s).unwrap()
33}
34
35fn nsid(s: &'static str) -> Nsid<DefaultStr> {
36 Nsid::new_static(s).unwrap()
37}
38
39async fn fresh_app(server_uri: &Url) -> AppState {
40 AppState::new(
41 Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))),
42 SlingshotClient::with_default_http(server_uri.clone()).unwrap(),
43 Arc::new(EdgeStore::new(RuntimeHasher::default())),
44 Arc::new(StateIndex::new(RuntimeHasher::default())),
45 Arc::new(StateIndex::new(RuntimeHasher::default())),
46 Arc::new(CoverageWatch::new()),
47 Arc::new(
48 KnotProxy::new(
49 KnotProxyConfig::default(),
50 KnotHttpConfig::default(),
51 Arc::new(SystemClock::new()),
52 RuntimeHasher::default(),
53 )
54 .unwrap(),
55 ),
56 Arc::new(SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap())
57 as Arc<dyn SearchReader>,
58 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())),
59 )
60}
61
62async fn mount_record(
63 server: &MockServer,
64 did: &Did<DefaultStr>,
65 collection: &Nsid<DefaultStr>,
66 rkey: &Rkey<DefaultStr>,
67 value: Value,
68) {
69 let uri = format!(
70 "at://{}/{}/{}",
71 did.as_ref(),
72 collection.as_ref(),
73 rkey.as_ref()
74 );
75 let body = json!({ "uri": uri, "cid": CID, "value": value });
76 Mock::given(method("GET"))
77 .and(path("/xrpc/com.atproto.repo.getRecord"))
78 .and(query_param("repo", did.as_ref()))
79 .and(query_param("collection", collection.as_ref()))
80 .and(query_param("rkey", rkey.as_ref()))
81 .respond_with(ResponseTemplate::new(200).set_body_json(body))
82 .mount(server)
83 .await;
84}
85
86fn xrpc_request(endpoint: &str, param: &str, value: &str) -> Request<Body> {
87 let encoded: String = byte_serialize(value.as_bytes()).collect();
88 Request::builder()
89 .uri(format!("/xrpc/{endpoint}?{param}={encoded}"))
90 .body(Body::empty())
91 .unwrap()
92}
93
94async fn json_response(resp: axum::response::Response) -> (StatusCode, Value) {
95 let status = resp.status();
96 let bytes = to_bytes(resp.into_body(), 1 << 20).await.unwrap();
97 let parsed: Value = serde_json::from_slice(&bytes).expect("response is JSON");
98 (status, parsed)
99}
100
101#[tokio::test]
102async fn cold_start_serves_all_four_point_lookups() {
103 let server = MockServer::start().await;
104 let clam = did("did:plc:clam");
105
106 mount_record(
107 &server,
108 &clam,
109 &nsid("sh.tangled.repo"),
110 &rkey("r1"),
111 json!({
112 "$type": "sh.tangled.repo",
113 "name": "clam",
114 "knot": "oyster.cafe",
115 "createdAt": "2026-05-01T00:00:00Z"
116 }),
117 )
118 .await;
119
120 mount_record(
121 &server,
122 &clam,
123 &nsid("sh.tangled.actor.profile"),
124 &rkey("self"),
125 json!({
126 "$type": "sh.tangled.actor.profile",
127 "bluesky": false,
128 "description": "clam shell"
129 }),
130 )
131 .await;
132
133 mount_record(
134 &server,
135 &clam,
136 &nsid("sh.tangled.repo.issue"),
137 &rkey("i1"),
138 json!({
139 "$type": "sh.tangled.repo.issue",
140 "repo": "did:plc:limpet",
141 "title": "broken",
142 "createdAt": "2026-05-01T00:00:00Z"
143 }),
144 )
145 .await;
146
147 mount_record(
148 &server,
149 &clam,
150 &nsid("sh.tangled.repo.pull"),
151 &rkey("p1"),
152 json!({
153 "$type": "sh.tangled.repo.pull",
154 "title": "ship",
155 "createdAt": "2026-05-01T00:00:00Z",
156 "rounds": [],
157 "target": {"repo": "did:plc:limpet", "branch": "main"}
158 }),
159 )
160 .await;
161
162 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
163 let app = router(state);
164
165 let cases = [
166 (
167 "sh.tangled.repo.getRepo",
168 "repo",
169 format!("at://{}/sh.tangled.repo/r1", clam.as_ref()),
170 "knot",
171 json!("oyster.cafe"),
172 ),
173 (
174 "sh.tangled.actor.getProfile",
175 "actor",
176 format!("at://{}/sh.tangled.actor.profile/self", clam.as_ref()),
177 "description",
178 json!("clam shell"),
179 ),
180 (
181 "sh.tangled.repo.getIssue",
182 "issue",
183 format!("at://{}/sh.tangled.repo.issue/i1", clam.as_ref()),
184 "title",
185 json!("broken"),
186 ),
187 (
188 "sh.tangled.repo.getPull",
189 "pull",
190 format!("at://{}/sh.tangled.repo.pull/p1", clam.as_ref()),
191 "title",
192 json!("ship"),
193 ),
194 ];
195
196 stream::iter(cases)
197 .for_each(|(endpoint, param, at_uri, field, expected)| {
198 let app = app.clone();
199 async move {
200 let resp = app
201 .oneshot(xrpc_request(endpoint, param, &at_uri))
202 .await
203 .unwrap();
204 let (status, body) = json_response(resp).await;
205 assert_eq!(status, StatusCode::OK, "{endpoint} status");
206 assert_eq!(body["uri"], at_uri, "{endpoint} uri");
207 assert_eq!(body["cid"], CID, "{endpoint} cid");
208 assert_eq!(
209 body["value"][field], expected,
210 "{endpoint} body field {field}"
211 );
212 }
213 })
214 .await;
215}
216
217#[tokio::test]
218async fn second_call_is_served_from_lru() {
219 let server = MockServer::start().await;
220 let uni = did("did:plc:uni");
221 let mock = Mock::given(method("GET"))
222 .and(path("/xrpc/com.atproto.repo.getRecord"))
223 .and(query_param("repo", uni.as_ref()))
224 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
225 "uri": format!("at://{}/sh.tangled.repo/r1", uni.as_ref()),
226 "cid": CID,
227 "value": {
228 "$type": "sh.tangled.repo",
229 "name": "uni",
230 "knot": "witchcraft.systems",
231 "createdAt": "2026-05-01T00:00:00Z"
232 }
233 })))
234 .expect(1)
235 .mount_as_scoped(&server)
236 .await;
237
238 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
239 let app = router(state);
240 let req_uri = format!("at://{}/sh.tangled.repo/r1", uni.as_ref());
241
242 stream::iter(0..3)
243 .for_each(|_| {
244 let app = app.clone();
245 let req_uri = req_uri.clone();
246 async move {
247 let resp = app
248 .oneshot(xrpc_request("sh.tangled.repo.getRepo", "repo", &req_uri))
249 .await
250 .unwrap();
251 assert_eq!(resp.status(), StatusCode::OK);
252 }
253 })
254 .await;
255
256 drop(mock);
257}
258
259#[tokio::test]
260async fn collection_mismatch_is_400() {
261 let server = MockServer::start().await;
262 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
263 let app = router(state);
264 let resp = app
265 .oneshot(xrpc_request(
266 "sh.tangled.repo.getRepo",
267 "repo",
268 "at://did:plc:clam/sh.tangled.actor.profile/self",
269 ))
270 .await
271 .unwrap();
272 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
273}
274
275#[tokio::test]
276async fn handle_authority_is_400() {
277 let server = MockServer::start().await;
278 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
279 let app = router(state);
280 let resp = app
281 .oneshot(xrpc_request(
282 "sh.tangled.repo.getRepo",
283 "repo",
284 "at://witchcraft.systems/sh.tangled.repo/r1",
285 ))
286 .await
287 .unwrap();
288 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
289}
290
291#[tokio::test]
292async fn slingshot_404_propagates_as_404() {
293 let server = MockServer::start().await;
294 Mock::given(method("GET"))
295 .and(path("/xrpc/com.atproto.repo.getRecord"))
296 .respond_with(ResponseTemplate::new(404))
297 .mount(&server)
298 .await;
299 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
300 let app = router(state);
301 let resp = app
302 .oneshot(xrpc_request(
303 "sh.tangled.repo.getRepo",
304 "repo",
305 "at://did:plc:clam/sh.tangled.repo/missing",
306 ))
307 .await
308 .unwrap();
309 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
310}
311
312#[tokio::test]
313async fn wrong_record_type_is_502() {
314 let server = MockServer::start().await;
315 mount_record(
316 &server,
317 &did("did:plc:clam"),
318 &nsid("sh.tangled.repo"),
319 &rkey("r1"),
320 json!({
321 "$type": "sh.tangled.knot",
322 "knot": "oyster.cafe",
323 "createdAt": "2026-05-01T00:00:00Z"
324 }),
325 )
326 .await;
327 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
328 let app = router(state);
329 let resp = app
330 .oneshot(xrpc_request(
331 "sh.tangled.repo.getRepo",
332 "repo",
333 "at://did:plc:clam/sh.tangled.repo/r1",
334 ))
335 .await
336 .unwrap();
337 let (status, body) = json_response(resp).await;
338 assert_eq!(status, StatusCode::BAD_GATEWAY);
339 assert_eq!(body["error"], "InvalidRecord");
340}
341
342#[tokio::test]
343async fn wrong_type_does_not_poison_cache() {
344 let server = MockServer::start().await;
345 let mock = Mock::given(method("GET"))
346 .and(path("/xrpc/com.atproto.repo.getRecord"))
347 .and(query_param("repo", "did:plc:clam"))
348 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
349 "uri": "at://did:plc:clam/sh.tangled.repo/r1",
350 "cid": CID,
351 "value": {
352 "$type": "sh.tangled.knot",
353 "knot": "oyster.cafe",
354 "createdAt": "2026-05-01T00:00:00Z"
355 }
356 })))
357 .expect(2)
358 .mount_as_scoped(&server)
359 .await;
360 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
361 let app = router(state);
362 let req = || {
363 xrpc_request(
364 "sh.tangled.repo.getRepo",
365 "repo",
366 "at://did:plc:clam/sh.tangled.repo/r1",
367 )
368 };
369 let first = app.clone().oneshot(req()).await.unwrap();
370 assert_eq!(first.status(), StatusCode::BAD_GATEWAY);
371 let second = app.clone().oneshot(req()).await.unwrap();
372 assert_eq!(second.status(), StatusCode::BAD_GATEWAY);
373 drop(mock);
374}
375
376#[tokio::test]
377async fn profile_with_empty_preferred_handle_is_tolerated() {
378 let server = MockServer::start().await;
379 let nel = did("did:plc:nel");
380 mount_record(
381 &server,
382 &nel,
383 &nsid("sh.tangled.actor.profile"),
384 &rkey("self"),
385 json!({
386 "$type": "sh.tangled.actor.profile",
387 "bluesky": true,
388 "preferredHandle": "",
389 "description": "empty handle, valid profile"
390 }),
391 )
392 .await;
393 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
394 let app = router(state);
395 let at_uri = format!("at://{}/sh.tangled.actor.profile/self", nel.as_ref());
396 let resp = app
397 .oneshot(xrpc_request(
398 "sh.tangled.actor.getProfile",
399 "actor",
400 &at_uri,
401 ))
402 .await
403 .unwrap();
404 let (status, body) = json_response(resp).await;
405 assert_eq!(status, StatusCode::OK, "status: {body}");
406 assert_eq!(body["uri"], at_uri);
407 assert_eq!(body["value"]["description"], "empty handle, valid profile");
408 assert!(body["value"]["preferredHandle"].is_null());
409}
410
411#[tokio::test]
412async fn missing_uri_param_returns_json_envelope() {
413 let server = MockServer::start().await;
414 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
415 let app = router(state);
416 let req = Request::builder()
417 .uri("/xrpc/sh.tangled.repo.getRepo")
418 .body(Body::empty())
419 .unwrap();
420 let resp = app.oneshot(req).await.unwrap();
421 let (status, body) = json_response(resp).await;
422 assert_eq!(status, StatusCode::BAD_REQUEST);
423 assert_eq!(body["error"], "InvalidRequest");
424 assert!(body["message"].is_string());
425}
426
427#[tokio::test]
428async fn malformed_at_uri_returns_400_envelope() {
429 let server = MockServer::start().await;
430 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
431 let app = router(state);
432 let resp = app
433 .oneshot(xrpc_request(
434 "sh.tangled.repo.getRepo",
435 "repo",
436 "definitely-not-an-at-uri",
437 ))
438 .await
439 .unwrap();
440 let (status, body) = json_response(resp).await;
441 assert_eq!(status, StatusCode::BAD_REQUEST);
442 assert_eq!(body["error"], "InvalidRequest");
443}
444
445#[tokio::test]
446async fn upstream_uri_mismatch_routes_to_invalid_record() {
447 let server = MockServer::start().await;
448 Mock::given(method("GET"))
449 .and(path("/xrpc/com.atproto.repo.getRecord"))
450 .and(query_param("repo", "did:plc:clam"))
451 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
452 "uri": "at://did:plc:limpet/sh.tangled.repo/elsewhere",
453 "cid": CID,
454 "value": {
455 "$type": "sh.tangled.repo",
456 "knot": "oyster.cafe",
457 "createdAt": "2026-05-01T00:00:00Z"
458 }
459 })))
460 .mount(&server)
461 .await;
462 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
463 let app = router(state);
464 let resp = app
465 .oneshot(xrpc_request(
466 "sh.tangled.repo.getRepo",
467 "repo",
468 "at://did:plc:clam/sh.tangled.repo/r1",
469 ))
470 .await
471 .unwrap();
472 let (status, body) = json_response(resp).await;
473 assert_eq!(status, StatusCode::BAD_GATEWAY);
474 assert_eq!(body["error"], "InvalidRecord");
475}
476
477#[tokio::test]
478async fn upstream_garbage_cid_routes_to_invalid_record() {
479 let server = MockServer::start().await;
480 Mock::given(method("GET"))
481 .and(path("/xrpc/com.atproto.repo.getRecord"))
482 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
483 "uri": "at://did:plc:clam/sh.tangled.repo/r1",
484 "cid": "not-a-real-cid",
485 "value": {
486 "$type": "sh.tangled.repo",
487 "knot": "oyster.cafe",
488 "createdAt": "2026-05-01T00:00:00Z"
489 }
490 })))
491 .mount(&server)
492 .await;
493 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
494 let app = router(state);
495 let resp = app
496 .oneshot(xrpc_request(
497 "sh.tangled.repo.getRepo",
498 "repo",
499 "at://did:plc:clam/sh.tangled.repo/r1",
500 ))
501 .await
502 .unwrap();
503 let (status, body) = json_response(resp).await;
504 assert_eq!(status, StatusCode::BAD_GATEWAY);
505 assert_eq!(body["error"], "InvalidRecord");
506}
507
508#[tokio::test]
509async fn oversize_upstream_body_routes_to_upstream_failed() {
510 let server = MockServer::start().await;
511 let payload = vec![b'x'; 8 * 1024 * 1024];
512 Mock::given(method("GET"))
513 .and(path("/xrpc/com.atproto.repo.getRecord"))
514 .respond_with(
515 ResponseTemplate::new(200)
516 .insert_header("content-type", "application/json")
517 .set_body_bytes(payload),
518 )
519 .mount(&server)
520 .await;
521 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
522 let app = router(state);
523 let resp = app
524 .oneshot(xrpc_request(
525 "sh.tangled.repo.getRepo",
526 "repo",
527 "at://did:plc:clam/sh.tangled.repo/r1",
528 ))
529 .await
530 .unwrap();
531 let (status, body) = json_response(resp).await;
532 assert_eq!(status, StatusCode::BAD_GATEWAY);
533 assert_eq!(body["error"], "UpstreamFailed");
534}
535
536#[tokio::test]
537async fn upstream_503_routes_to_upstream_failed() {
538 let server = MockServer::start().await;
539 Mock::given(method("GET"))
540 .and(path("/xrpc/com.atproto.repo.getRecord"))
541 .respond_with(ResponseTemplate::new(503))
542 .mount(&server)
543 .await;
544 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
545 let app = router(state);
546 let resp = app
547 .oneshot(xrpc_request(
548 "sh.tangled.repo.getRepo",
549 "repo",
550 "at://did:plc:clam/sh.tangled.repo/r1",
551 ))
552 .await
553 .unwrap();
554 let (status, body) = json_response(resp).await;
555 assert_eq!(status, StatusCode::BAD_GATEWAY);
556 assert_eq!(body["error"], "UpstreamFailed");
557}
558
559#[tokio::test]
560async fn get_repo_by_repo_did_returns_observed_record() {
561 let server = MockServer::start().await;
562 let owner_did = did("did:plc:scallop");
563 let rk = rkey("r1");
564 let repo_did = did("did:plc:limpet");
565 mount_record(
566 &server,
567 &owner_did,
568 &nsid("sh.tangled.repo"),
569 &rk,
570 json!({
571 "$type": "sh.tangled.repo",
572 "name": "scallop",
573 "knot": "oyster.cafe",
574 "createdAt": "2026-05-01T00:00:00Z",
575 "repoDid": repo_did.as_ref(),
576 }),
577 )
578 .await;
579
580 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
581 state
582 .resolver
583 .observe(owner_did.clone(), rk.clone(), Some(repo_did.clone()))
584 .await;
585
586 let app = router(state);
587 let resp = app
588 .oneshot(xrpc_request(
589 "sh.tangled.repo.getRepoByRepoDid",
590 "repoDid",
591 repo_did.as_ref(),
592 ))
593 .await
594 .unwrap();
595 let (status, body) = json_response(resp).await;
596 assert_eq!(status, StatusCode::OK);
597 assert_eq!(
598 body["uri"],
599 format!(
600 "at://{}/sh.tangled.repo/{}",
601 owner_did.as_ref(),
602 rk.as_ref()
603 )
604 );
605 assert_eq!(body["value"]["name"], "scallop");
606 assert_eq!(body["value"]["repoDid"], repo_did.as_ref());
607}
608
609#[tokio::test]
610async fn get_repo_by_repo_did_404_when_unobserved() {
611 let server = MockServer::start().await;
612 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
613 let app = router(state);
614 let resp = app
615 .oneshot(xrpc_request(
616 "sh.tangled.repo.getRepoByRepoDid",
617 "repoDid",
618 "did:plc:whelk",
619 ))
620 .await
621 .unwrap();
622 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
623}
624
625#[tokio::test]
626async fn get_repo_by_repo_did_400_on_invalid_did() {
627 let server = MockServer::start().await;
628 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
629 let app = router(state);
630 let resp = app
631 .oneshot(xrpc_request(
632 "sh.tangled.repo.getRepoByRepoDid",
633 "repoDid",
634 "not-a-did",
635 ))
636 .await
637 .unwrap();
638 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
639}