Monorepo for Tangled
tangled.org
1use std::sync::Arc;
2
3use axum::body::{Body, to_bytes};
4use bobbin_edge_index::{CoverageWatch, EdgeStore, StateIndex};
5use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig};
6use bobbin_record_lru::{CacheCapacity, LruRecordStore};
7use bobbin_resolver::RepoIdResolver;
8use bobbin_runtime::{RuntimeHasher, SystemClock};
9use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader};
10use bobbin_slingshot_client::SlingshotClient;
11use bobbin_xrpc::{AppState, router};
12use futures::stream::{self, StreamExt};
13use http::{Request, StatusCode};
14use jacquard_common::DefaultStr;
15use jacquard_common::types::did::Did;
16use jacquard_common::types::nsid::Nsid;
17use jacquard_common::types::recordkey::Rkey;
18use serde_json::{Value, json};
19use tower::ServiceExt;
20use url::Url;
21use url::form_urlencoded::byte_serialize;
22use wiremock::matchers::{method, path, query_param};
23use wiremock::{Mock, MockServer, ResponseTemplate};
24
25const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i";
26
27fn did(s: &str) -> Did<DefaultStr> {
28 Did::new_owned(s).unwrap()
29}
30
31fn rkey(s: &str) -> Rkey<DefaultStr> {
32 Rkey::new_owned(s).unwrap()
33}
34
35fn nsid(s: &'static str) -> Nsid<DefaultStr> {
36 Nsid::new_static(s).unwrap()
37}
38
39async fn fresh_app(server_uri: &Url) -> AppState {
40 AppState::new(
41 Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))),
42 SlingshotClient::with_default_http(server_uri.clone()).unwrap(),
43 Arc::new(EdgeStore::new(RuntimeHasher::default())),
44 Arc::new(StateIndex::new(RuntimeHasher::default())),
45 Arc::new(StateIndex::new(RuntimeHasher::default())),
46 Arc::new(CoverageWatch::new()),
47 Arc::new(
48 KnotProxy::new(
49 KnotProxyConfig::default(),
50 KnotHttpConfig::default(),
51 Arc::new(SystemClock::new()),
52 RuntimeHasher::default(),
53 )
54 .unwrap(),
55 ),
56 Arc::new(SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap())
57 as Arc<dyn SearchReader>,
58 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())),
59 )
60}
61
62async fn mount_record(
63 server: &MockServer,
64 did: &Did<DefaultStr>,
65 collection: &Nsid<DefaultStr>,
66 rkey: &Rkey<DefaultStr>,
67 value: Value,
68) {
69 let uri = format!(
70 "at://{}/{}/{}",
71 did.as_ref(),
72 collection.as_ref(),
73 rkey.as_ref()
74 );
75 let body = json!({ "uri": uri, "cid": CID, "value": value });
76 Mock::given(method("GET"))
77 .and(path("/xrpc/com.atproto.repo.getRecord"))
78 .and(query_param("repo", did.as_ref()))
79 .and(query_param("collection", collection.as_ref()))
80 .and(query_param("rkey", rkey.as_ref()))
81 .respond_with(ResponseTemplate::new(200).set_body_json(body))
82 .mount(server)
83 .await;
84}
85
86fn xrpc_request(endpoint: &str, param: &str, value: &str) -> Request<Body> {
87 Request::builder()
88 .uri(format!("/xrpc/{endpoint}?{param}={value}"))
89 .body(Body::empty())
90 .unwrap()
91}
92
93fn xrpc_request_escaped(endpoint: &str, param: &str, value: &str) -> Request<Body> {
94 let encoded: String = byte_serialize(value.as_bytes()).collect();
95 Request::builder()
96 .uri(format!("/xrpc/{endpoint}?{param}={encoded}"))
97 .body(Body::empty())
98 .unwrap()
99}
100
101async fn json_response(resp: axum::response::Response) -> (StatusCode, Value) {
102 let status = resp.status();
103 let bytes = to_bytes(resp.into_body(), 1 << 20).await.unwrap();
104 let parsed: Value = serde_json::from_slice(&bytes).expect("response is JSON");
105 (status, parsed)
106}
107
108#[tokio::test]
109async fn cold_start_serves_all_four_point_lookups() {
110 let server = MockServer::start().await;
111 let clam = did("did:plc:clam");
112
113 mount_record(
114 &server,
115 &clam,
116 &nsid("sh.tangled.repo"),
117 &rkey("r1"),
118 json!({
119 "$type": "sh.tangled.repo",
120 "name": "clam",
121 "knot": "oyster.cafe",
122 "createdAt": "2026-05-01T00:00:00Z"
123 }),
124 )
125 .await;
126
127 mount_record(
128 &server,
129 &clam,
130 &nsid("sh.tangled.actor.profile"),
131 &rkey("self"),
132 json!({
133 "$type": "sh.tangled.actor.profile",
134 "bluesky": false,
135 "description": "clam shell"
136 }),
137 )
138 .await;
139
140 mount_record(
141 &server,
142 &clam,
143 &nsid("sh.tangled.repo.issue"),
144 &rkey("i1"),
145 json!({
146 "$type": "sh.tangled.repo.issue",
147 "repo": "did:plc:limpet",
148 "title": "broken",
149 "createdAt": "2026-05-01T00:00:00Z"
150 }),
151 )
152 .await;
153
154 mount_record(
155 &server,
156 &clam,
157 &nsid("sh.tangled.repo.pull"),
158 &rkey("p1"),
159 json!({
160 "$type": "sh.tangled.repo.pull",
161 "title": "ship",
162 "createdAt": "2026-05-01T00:00:00Z",
163 "rounds": [],
164 "target": {"repo": "did:plc:limpet", "branch": "main"}
165 }),
166 )
167 .await;
168
169 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
170 let app = router(state);
171
172 let cases = [
173 (
174 "sh.tangled.repo.getRepo",
175 "repo",
176 format!("at://{}/sh.tangled.repo/r1", clam.as_ref()),
177 "knot",
178 json!("oyster.cafe"),
179 ),
180 (
181 "sh.tangled.actor.getProfile",
182 "actor",
183 format!("at://{}/sh.tangled.actor.profile/self", clam.as_ref()),
184 "description",
185 json!("clam shell"),
186 ),
187 (
188 "sh.tangled.repo.getIssue",
189 "issue",
190 format!("at://{}/sh.tangled.repo.issue/i1", clam.as_ref()),
191 "title",
192 json!("broken"),
193 ),
194 (
195 "sh.tangled.repo.getPull",
196 "pull",
197 format!("at://{}/sh.tangled.repo.pull/p1", clam.as_ref()),
198 "title",
199 json!("ship"),
200 ),
201 ];
202
203 stream::iter(cases)
204 .for_each(|(endpoint, param, at_uri, field, expected)| {
205 let app = app.clone();
206 async move {
207 let resp = app
208 .oneshot(xrpc_request(endpoint, param, &at_uri))
209 .await
210 .unwrap();
211 let (status, body) = json_response(resp).await;
212 assert_eq!(status, StatusCode::OK, "{endpoint} status");
213 assert_eq!(body["uri"], at_uri, "{endpoint} uri");
214 assert_eq!(body["cid"], CID, "{endpoint} cid");
215 assert_eq!(
216 body["value"][field], expected,
217 "{endpoint} body field {field}"
218 );
219 }
220 })
221 .await;
222}
223
224#[tokio::test]
225async fn percent_escaped_at_uri_resolves_identically_to_raw() {
226 let server = MockServer::start().await;
227 let clam = did("did:plc:clam");
228 mount_record(
229 &server,
230 &clam,
231 &nsid("sh.tangled.actor.profile"),
232 &rkey("self"),
233 json!({
234 "$type": "sh.tangled.actor.profile",
235 "bluesky": false,
236 "description": "clam shell"
237 }),
238 )
239 .await;
240
241 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
242 let app = router(state);
243
244 let at_uri = format!("at://{}/sh.tangled.actor.profile/self", clam.as_ref());
245
246 let (raw_status, raw_body) = json_response(
247 app.clone()
248 .oneshot(xrpc_request(
249 "sh.tangled.actor.getProfile",
250 "actor",
251 &at_uri,
252 ))
253 .await
254 .unwrap(),
255 )
256 .await;
257 let (escaped_status, escaped_body) = json_response(
258 app.oneshot(xrpc_request_escaped(
259 "sh.tangled.actor.getProfile",
260 "actor",
261 &at_uri,
262 ))
263 .await
264 .unwrap(),
265 )
266 .await;
267
268 assert_eq!(raw_status, StatusCode::OK, "raw at-uri status");
269 assert_eq!(escaped_status, StatusCode::OK, "escaped at-uri status");
270 assert_eq!(
271 raw_body, escaped_body,
272 "raw and escaped must resolve identically"
273 );
274 assert_eq!(escaped_body["uri"], at_uri);
275}
276
277#[tokio::test]
278async fn second_call_is_served_from_lru() {
279 let server = MockServer::start().await;
280 let uni = did("did:plc:uni");
281 let mock = Mock::given(method("GET"))
282 .and(path("/xrpc/com.atproto.repo.getRecord"))
283 .and(query_param("repo", uni.as_ref()))
284 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
285 "uri": format!("at://{}/sh.tangled.repo/r1", uni.as_ref()),
286 "cid": CID,
287 "value": {
288 "$type": "sh.tangled.repo",
289 "name": "uni",
290 "knot": "witchcraft.systems",
291 "createdAt": "2026-05-01T00:00:00Z"
292 }
293 })))
294 .expect(1)
295 .mount_as_scoped(&server)
296 .await;
297
298 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
299 let app = router(state);
300 let req_uri = format!("at://{}/sh.tangled.repo/r1", uni.as_ref());
301
302 stream::iter(0..3)
303 .for_each(|_| {
304 let app = app.clone();
305 let req_uri = req_uri.clone();
306 async move {
307 let resp = app
308 .oneshot(xrpc_request("sh.tangled.repo.getRepo", "repo", &req_uri))
309 .await
310 .unwrap();
311 assert_eq!(resp.status(), StatusCode::OK);
312 }
313 })
314 .await;
315
316 drop(mock);
317}
318
319#[tokio::test]
320async fn collection_mismatch_is_400() {
321 let server = MockServer::start().await;
322 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
323 let app = router(state);
324 let resp = app
325 .oneshot(xrpc_request(
326 "sh.tangled.repo.getRepo",
327 "repo",
328 "at://did:plc:clam/sh.tangled.actor.profile/self",
329 ))
330 .await
331 .unwrap();
332 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
333}
334
335#[tokio::test]
336async fn handle_authority_is_400() {
337 let server = MockServer::start().await;
338 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
339 let app = router(state);
340 let resp = app
341 .oneshot(xrpc_request(
342 "sh.tangled.repo.getRepo",
343 "repo",
344 "at://witchcraft.systems/sh.tangled.repo/r1",
345 ))
346 .await
347 .unwrap();
348 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
349}
350
351#[tokio::test]
352async fn slingshot_404_propagates_as_404() {
353 let server = MockServer::start().await;
354 Mock::given(method("GET"))
355 .and(path("/xrpc/com.atproto.repo.getRecord"))
356 .respond_with(ResponseTemplate::new(404))
357 .mount(&server)
358 .await;
359 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
360 let app = router(state);
361 let resp = app
362 .oneshot(xrpc_request(
363 "sh.tangled.repo.getRepo",
364 "repo",
365 "at://did:plc:clam/sh.tangled.repo/missing",
366 ))
367 .await
368 .unwrap();
369 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
370}
371
372#[tokio::test]
373async fn wrong_record_type_is_502() {
374 let server = MockServer::start().await;
375 mount_record(
376 &server,
377 &did("did:plc:clam"),
378 &nsid("sh.tangled.repo"),
379 &rkey("r1"),
380 json!({
381 "$type": "sh.tangled.knot",
382 "knot": "oyster.cafe",
383 "createdAt": "2026-05-01T00:00:00Z"
384 }),
385 )
386 .await;
387 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
388 let app = router(state);
389 let resp = app
390 .oneshot(xrpc_request(
391 "sh.tangled.repo.getRepo",
392 "repo",
393 "at://did:plc:clam/sh.tangled.repo/r1",
394 ))
395 .await
396 .unwrap();
397 let (status, body) = json_response(resp).await;
398 assert_eq!(status, StatusCode::BAD_GATEWAY);
399 assert_eq!(body["error"], "InvalidRecord");
400}
401
402#[tokio::test]
403async fn wrong_type_does_not_poison_cache() {
404 let server = MockServer::start().await;
405 let mock = Mock::given(method("GET"))
406 .and(path("/xrpc/com.atproto.repo.getRecord"))
407 .and(query_param("repo", "did:plc:clam"))
408 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
409 "uri": "at://did:plc:clam/sh.tangled.repo/r1",
410 "cid": CID,
411 "value": {
412 "$type": "sh.tangled.knot",
413 "knot": "oyster.cafe",
414 "createdAt": "2026-05-01T00:00:00Z"
415 }
416 })))
417 .expect(2)
418 .mount_as_scoped(&server)
419 .await;
420 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
421 let app = router(state);
422 let req = || {
423 xrpc_request(
424 "sh.tangled.repo.getRepo",
425 "repo",
426 "at://did:plc:clam/sh.tangled.repo/r1",
427 )
428 };
429 let first = app.clone().oneshot(req()).await.unwrap();
430 assert_eq!(first.status(), StatusCode::BAD_GATEWAY);
431 let second = app.clone().oneshot(req()).await.unwrap();
432 assert_eq!(second.status(), StatusCode::BAD_GATEWAY);
433 drop(mock);
434}
435
436#[tokio::test]
437async fn profile_with_empty_preferred_handle_is_tolerated() {
438 let server = MockServer::start().await;
439 let nel = did("did:plc:nel");
440 mount_record(
441 &server,
442 &nel,
443 &nsid("sh.tangled.actor.profile"),
444 &rkey("self"),
445 json!({
446 "$type": "sh.tangled.actor.profile",
447 "bluesky": true,
448 "preferredHandle": "",
449 "description": "empty handle, valid profile"
450 }),
451 )
452 .await;
453 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
454 let app = router(state);
455 let at_uri = format!("at://{}/sh.tangled.actor.profile/self", nel.as_ref());
456 let resp = app
457 .oneshot(xrpc_request(
458 "sh.tangled.actor.getProfile",
459 "actor",
460 &at_uri,
461 ))
462 .await
463 .unwrap();
464 let (status, body) = json_response(resp).await;
465 assert_eq!(status, StatusCode::OK, "status: {body}");
466 assert_eq!(body["uri"], at_uri);
467 assert_eq!(body["value"]["description"], "empty handle, valid profile");
468 assert!(body["value"]["preferredHandle"].is_null());
469}
470
471#[tokio::test]
472async fn missing_uri_param_returns_json_envelope() {
473 let server = MockServer::start().await;
474 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
475 let app = router(state);
476 let req = Request::builder()
477 .uri("/xrpc/sh.tangled.repo.getRepo")
478 .body(Body::empty())
479 .unwrap();
480 let resp = app.oneshot(req).await.unwrap();
481 let (status, body) = json_response(resp).await;
482 assert_eq!(status, StatusCode::BAD_REQUEST);
483 assert_eq!(body["error"], "InvalidRequest");
484 assert!(body["message"].is_string());
485}
486
487#[tokio::test]
488async fn malformed_at_uri_returns_400_envelope() {
489 let server = MockServer::start().await;
490 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
491 let app = router(state);
492 let resp = app
493 .oneshot(xrpc_request(
494 "sh.tangled.repo.getRepo",
495 "repo",
496 "definitely-not-an-at-uri",
497 ))
498 .await
499 .unwrap();
500 let (status, body) = json_response(resp).await;
501 assert_eq!(status, StatusCode::BAD_REQUEST);
502 assert_eq!(body["error"], "InvalidRequest");
503}
504
505#[tokio::test]
506async fn upstream_uri_mismatch_routes_to_invalid_record() {
507 let server = MockServer::start().await;
508 Mock::given(method("GET"))
509 .and(path("/xrpc/com.atproto.repo.getRecord"))
510 .and(query_param("repo", "did:plc:clam"))
511 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
512 "uri": "at://did:plc:limpet/sh.tangled.repo/elsewhere",
513 "cid": CID,
514 "value": {
515 "$type": "sh.tangled.repo",
516 "knot": "oyster.cafe",
517 "createdAt": "2026-05-01T00:00:00Z"
518 }
519 })))
520 .mount(&server)
521 .await;
522 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
523 let app = router(state);
524 let resp = app
525 .oneshot(xrpc_request(
526 "sh.tangled.repo.getRepo",
527 "repo",
528 "at://did:plc:clam/sh.tangled.repo/r1",
529 ))
530 .await
531 .unwrap();
532 let (status, body) = json_response(resp).await;
533 assert_eq!(status, StatusCode::BAD_GATEWAY);
534 assert_eq!(body["error"], "InvalidRecord");
535}
536
537#[tokio::test]
538async fn upstream_garbage_cid_routes_to_invalid_record() {
539 let server = MockServer::start().await;
540 Mock::given(method("GET"))
541 .and(path("/xrpc/com.atproto.repo.getRecord"))
542 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
543 "uri": "at://did:plc:clam/sh.tangled.repo/r1",
544 "cid": "not-a-real-cid",
545 "value": {
546 "$type": "sh.tangled.repo",
547 "knot": "oyster.cafe",
548 "createdAt": "2026-05-01T00:00:00Z"
549 }
550 })))
551 .mount(&server)
552 .await;
553 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
554 let app = router(state);
555 let resp = app
556 .oneshot(xrpc_request(
557 "sh.tangled.repo.getRepo",
558 "repo",
559 "at://did:plc:clam/sh.tangled.repo/r1",
560 ))
561 .await
562 .unwrap();
563 let (status, body) = json_response(resp).await;
564 assert_eq!(status, StatusCode::BAD_GATEWAY);
565 assert_eq!(body["error"], "InvalidRecord");
566}
567
568#[tokio::test]
569async fn oversize_upstream_body_routes_to_upstream_failed() {
570 let server = MockServer::start().await;
571 let payload = vec![b'x'; 8 * 1024 * 1024];
572 Mock::given(method("GET"))
573 .and(path("/xrpc/com.atproto.repo.getRecord"))
574 .respond_with(
575 ResponseTemplate::new(200)
576 .insert_header("content-type", "application/json")
577 .set_body_bytes(payload),
578 )
579 .mount(&server)
580 .await;
581 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
582 let app = router(state);
583 let resp = app
584 .oneshot(xrpc_request(
585 "sh.tangled.repo.getRepo",
586 "repo",
587 "at://did:plc:clam/sh.tangled.repo/r1",
588 ))
589 .await
590 .unwrap();
591 let (status, body) = json_response(resp).await;
592 assert_eq!(status, StatusCode::BAD_GATEWAY);
593 assert_eq!(body["error"], "UpstreamFailed");
594}
595
596#[tokio::test]
597async fn upstream_503_routes_to_upstream_failed() {
598 let server = MockServer::start().await;
599 Mock::given(method("GET"))
600 .and(path("/xrpc/com.atproto.repo.getRecord"))
601 .respond_with(ResponseTemplate::new(503))
602 .mount(&server)
603 .await;
604 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
605 let app = router(state);
606 let resp = app
607 .oneshot(xrpc_request(
608 "sh.tangled.repo.getRepo",
609 "repo",
610 "at://did:plc:clam/sh.tangled.repo/r1",
611 ))
612 .await
613 .unwrap();
614 let (status, body) = json_response(resp).await;
615 assert_eq!(status, StatusCode::BAD_GATEWAY);
616 assert_eq!(body["error"], "UpstreamFailed");
617}
618
619#[tokio::test]
620async fn get_repo_by_repo_did_returns_observed_record() {
621 let server = MockServer::start().await;
622 let owner_did = did("did:plc:scallop");
623 let rk = rkey("r1");
624 let repo_did = did("did:plc:limpet");
625 mount_record(
626 &server,
627 &owner_did,
628 &nsid("sh.tangled.repo"),
629 &rk,
630 json!({
631 "$type": "sh.tangled.repo",
632 "name": "scallop",
633 "knot": "oyster.cafe",
634 "createdAt": "2026-05-01T00:00:00Z",
635 "repoDid": repo_did.as_ref(),
636 }),
637 )
638 .await;
639
640 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
641 state
642 .resolver
643 .observe(owner_did.clone(), rk.clone(), Some(repo_did.clone()))
644 .await;
645
646 let app = router(state);
647 let resp = app
648 .oneshot(xrpc_request(
649 "sh.tangled.repo.getRepoByRepoDid",
650 "repoDid",
651 repo_did.as_ref(),
652 ))
653 .await
654 .unwrap();
655 let (status, body) = json_response(resp).await;
656 assert_eq!(status, StatusCode::OK);
657 assert_eq!(
658 body["uri"],
659 format!(
660 "at://{}/sh.tangled.repo/{}",
661 owner_did.as_ref(),
662 rk.as_ref()
663 )
664 );
665 assert_eq!(body["value"]["name"], "scallop");
666 assert_eq!(body["value"]["repoDid"], repo_did.as_ref());
667}
668
669#[tokio::test]
670async fn get_repo_by_repo_did_404_when_unobserved() {
671 let server = MockServer::start().await;
672 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
673 let app = router(state);
674 let resp = app
675 .oneshot(xrpc_request(
676 "sh.tangled.repo.getRepoByRepoDid",
677 "repoDid",
678 "did:plc:whelk",
679 ))
680 .await
681 .unwrap();
682 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
683}
684
685#[tokio::test]
686async fn get_repo_by_repo_did_400_on_invalid_did() {
687 let server = MockServer::start().await;
688 let state = fresh_app(&Url::parse(&server.uri()).unwrap()).await;
689 let app = router(state);
690 let resp = app
691 .oneshot(xrpc_request(
692 "sh.tangled.repo.getRepoByRepoDid",
693 "repoDid",
694 "not-a-did",
695 ))
696 .await
697 .unwrap();
698 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
699}