Monorepo for Tangled
tangled.org
1use std::sync::Arc;
2use std::time::Duration;
3
4use axum::body::{Body, to_bytes};
5use bobbin_edge_index::{CoverageWatch, EdgeStore, StateIndex};
6use bobbin_knot_proxy::{FailureThreshold, KnotHttpConfig, KnotProxy, KnotProxyConfig};
7use bobbin_record_lru::{CacheCapacity, LruRecordStore};
8use bobbin_resolver::RepoIdResolver;
9use bobbin_runtime::{RuntimeHasher, SystemClock};
10use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader};
11use bobbin_slingshot_client::SlingshotClient;
12use bobbin_xrpc::{AppState, router};
13use http::{Request, StatusCode};
14use jacquard_common::DefaultStr;
15use jacquard_common::types::did::Did;
16use jacquard_common::types::recordkey::Rkey;
17use serde_json::{Value, json};
18use tower::ServiceExt;
19use url::Url;
20use url::form_urlencoded::byte_serialize;
21use wiremock::matchers::{header_exists, method, path, query_param};
22use wiremock::{Mock, MockServer, ResponseTemplate};
23
24const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i";
25
26fn did(s: &str) -> Did<DefaultStr> {
27 Did::new_owned(s).unwrap()
28}
29
30fn rkey(s: &str) -> Rkey<DefaultStr> {
31 Rkey::new_owned(s).unwrap()
32}
33
34fn test_config() -> KnotProxyConfig {
35 KnotProxyConfig {
36 failure_threshold: FailureThreshold::new(2).unwrap(),
37 cooldown: Duration::from_millis(80),
38 allow_private_hosts: true,
39 require_https: false,
40 }
41}
42
43fn test_http_config() -> KnotHttpConfig {
44 KnotHttpConfig {
45 connect_timeout: Duration::from_millis(500),
46 read_timeout: Duration::from_secs(2),
47 }
48}
49
50struct Harness {
51 slingshot: MockServer,
52 knot: MockServer,
53 state: AppState,
54}
55
56impl Harness {
57 async fn new() -> Self {
58 Self::with_config(test_config()).await
59 }
60
61 async fn with_config(config: KnotProxyConfig) -> Self {
62 let slingshot_server = MockServer::start().await;
63 let knot_server = MockServer::start().await;
64 let state = AppState::new(
65 Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))),
66 SlingshotClient::with_default_http(Url::parse(&slingshot_server.uri()).unwrap())
67 .unwrap(),
68 Arc::new(EdgeStore::new(RuntimeHasher::default())),
69 Arc::new(StateIndex::new(RuntimeHasher::default())),
70 Arc::new(StateIndex::new(RuntimeHasher::default())),
71 Arc::new(CoverageWatch::new()),
72 Arc::new(
73 KnotProxy::new(
74 config,
75 test_http_config(),
76 Arc::new(SystemClock::new()),
77 RuntimeHasher::default(),
78 )
79 .unwrap(),
80 ),
81 Arc::new(
82 SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap(),
83 ) as Arc<dyn SearchReader>,
84 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())),
85 );
86 Self {
87 slingshot: slingshot_server,
88 knot: knot_server,
89 state,
90 }
91 }
92
93 async fn mount_repo_record(&self, did: &Did<DefaultStr>, rkey: &Rkey<DefaultStr>, name: &str) {
94 self.mount_repo_record_inner(did, rkey, Some(name)).await;
95 }
96
97 async fn mount_repo_record_rkey_as_name(&self, did: &Did<DefaultStr>, rkey: &Rkey<DefaultStr>) {
98 self.mount_repo_record_inner(did, rkey, None).await;
99 }
100
101 async fn mount_repo_record_inner(
102 &self,
103 did: &Did<DefaultStr>,
104 rkey: &Rkey<DefaultStr>,
105 name: Option<&str>,
106 ) {
107 let knot_value = self.knot.uri();
108 let mut record = json!({
109 "$type": "sh.tangled.repo",
110 "createdAt": "2026-05-01T00:00:00Z",
111 "knot": knot_value,
112 });
113 if let Some(n) = name {
114 record["name"] = json!(n);
115 }
116 let uri = format!("at://{}/sh.tangled.repo/{}", did.as_ref(), rkey.as_ref());
117 Mock::given(method("GET"))
118 .and(path("/xrpc/com.atproto.repo.getRecord"))
119 .and(query_param("repo", did.as_ref()))
120 .and(query_param("collection", "sh.tangled.repo"))
121 .and(query_param("rkey", rkey.as_ref()))
122 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
123 "uri": uri,
124 "cid": CID,
125 "value": record,
126 })))
127 .mount(&self.slingshot)
128 .await;
129 }
130
131 async fn call(&self, path_and_query: &str) -> http::Response<Body> {
132 self.call_with_headers(path_and_query, &[]).await
133 }
134
135 async fn call_with_headers(
136 &self,
137 path_and_query: &str,
138 client_headers: &[(&str, &str)],
139 ) -> http::Response<Body> {
140 let builder = client_headers
141 .iter()
142 .fold(Request::builder().uri(path_and_query), |b, (k, v)| {
143 b.header(*k, *v)
144 });
145 router(self.state.clone())
146 .oneshot(builder.body(Body::empty()).unwrap())
147 .await
148 .expect("router infallible")
149 }
150}
151
152fn enc(s: &str) -> String {
153 byte_serialize(s.as_bytes()).collect()
154}
155
156async fn body_string(resp: http::Response<Body>) -> String {
157 let body = to_bytes(resp.into_body(), 64 * 1024).await.unwrap();
158 String::from_utf8(body.to_vec()).expect("response body is utf-8")
159}
160
161async fn body_value(resp: http::Response<Body>) -> Value {
162 let s = body_string(resp).await;
163 serde_json::from_str(&s).unwrap_or_else(|e| panic!("body not json: {e}: {s}"))
164}
165
166#[tokio::test]
167async fn proxies_repo_blob_with_did_slash_name_repo_param() {
168 let h = Harness::new().await;
169 let tid = "3jzfcijpj2z2a";
170 h.mount_repo_record(&did("did:plc:abalone"), &rkey(tid), "barnacle")
171 .await;
172 Mock::given(method("GET"))
173 .and(path("/xrpc/sh.tangled.repo.blob"))
174 .and(query_param("repo", "did:plc:abalone/barnacle"))
175 .and(query_param("ref", "main"))
176 .and(query_param("path", "README.md"))
177 .respond_with(
178 ResponseTemplate::new(200)
179 .set_body_raw(r#"{"path":"README.md","content":"hi"}"#, "application/json"),
180 )
181 .mount(&h.knot)
182 .await;
183
184 let target = format!(
185 "/xrpc/sh.tangled.repo.blob?repo={}&ref=main&path=README.md",
186 enc(&format!("at://did:plc:abalone/sh.tangled.repo/{tid}")),
187 );
188 let resp = h.call(&target).await;
189 assert_eq!(resp.status(), StatusCode::OK);
190 assert_eq!(
191 resp.headers().get("content-type").unwrap(),
192 "application/json",
193 );
194 let v = body_value(resp).await;
195 assert_eq!(v["path"], "README.md");
196 assert_eq!(v["content"], "hi");
197}
198
199#[tokio::test]
200async fn modern_rkey_as_name_uses_rkey_even_when_name_field_set() {
201 let h = Harness::new().await;
202 h.mount_repo_record(&did("did:plc:abalone"), &rkey("core"), "Tangled Core")
203 .await;
204 Mock::given(method("GET"))
205 .and(path("/xrpc/sh.tangled.repo.getDefaultBranch"))
206 .and(query_param("repo", "did:plc:abalone/core"))
207 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
208 "hash": "abc",
209 "name": "main",
210 "when": "2026-05-01T00:00:00Z",
211 })))
212 .mount(&h.knot)
213 .await;
214
215 let target = format!(
216 "/xrpc/sh.tangled.repo.getDefaultBranch?repo={}",
217 enc("at://did:plc:abalone/sh.tangled.repo/core"),
218 );
219 let resp = h.call(&target).await;
220 assert_eq!(resp.status(), StatusCode::OK);
221 let v = body_value(resp).await;
222 assert_eq!(v["name"], "main");
223}
224
225#[tokio::test]
226async fn modern_rkey_as_name_works_when_name_field_null() {
227 let h = Harness::new().await;
228 h.mount_repo_record_rkey_as_name(&did("did:plc:abalone"), &rkey("core"))
229 .await;
230 Mock::given(method("GET"))
231 .and(path("/xrpc/sh.tangled.repo.getDefaultBranch"))
232 .and(query_param("repo", "did:plc:abalone/core"))
233 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
234 "hash": "abc",
235 "name": "main",
236 "when": "2026-05-01T00:00:00Z",
237 })))
238 .mount(&h.knot)
239 .await;
240
241 let target = format!(
242 "/xrpc/sh.tangled.repo.getDefaultBranch?repo={}",
243 enc("at://did:plc:abalone/sh.tangled.repo/core"),
244 );
245 let resp = h.call(&target).await;
246 assert_eq!(resp.status(), StatusCode::OK);
247}
248
249#[tokio::test]
250async fn legacy_tid_rkey_falls_back_to_name_field() {
251 let h = Harness::new().await;
252 let tid_rkey = "3jzfcijpj2z2a";
253 h.mount_repo_record(&did("did:plc:abalone"), &rkey(tid_rkey), "dotfiles")
254 .await;
255 Mock::given(method("GET"))
256 .and(path("/xrpc/sh.tangled.repo.getDefaultBranch"))
257 .and(query_param("repo", "did:plc:abalone/dotfiles"))
258 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
259 "hash": "abc",
260 "name": "main",
261 "when": "2026-05-01T00:00:00Z",
262 })))
263 .mount(&h.knot)
264 .await;
265
266 let target = format!(
267 "/xrpc/sh.tangled.repo.getDefaultBranch?repo={}",
268 enc(&format!("at://did:plc:abalone/sh.tangled.repo/{tid_rkey}")),
269 );
270 let resp = h.call(&target).await;
271 assert_eq!(resp.status(), StatusCode::OK);
272}
273
274#[tokio::test]
275async fn tid_rkey_without_name_falls_back_to_tid() {
276 let h = Harness::new().await;
277 let tid_rkey = "3jzfcijpj2z2a";
278 h.mount_repo_record_rkey_as_name(&did("did:plc:abalone"), &rkey(tid_rkey))
279 .await;
280 Mock::given(method("GET"))
281 .and(path("/xrpc/sh.tangled.repo.getDefaultBranch"))
282 .and(query_param("repo", format!("did:plc:abalone/{tid_rkey}")))
283 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
284 "hash": "abc",
285 "name": "main",
286 "when": "2026-05-01T00:00:00Z",
287 })))
288 .mount(&h.knot)
289 .await;
290 let target = format!(
291 "/xrpc/sh.tangled.repo.getDefaultBranch?repo={}",
292 enc(&format!("at://did:plc:abalone/sh.tangled.repo/{tid_rkey}")),
293 );
294 let resp = h.call(&target).await;
295 assert_eq!(resp.status(), StatusCode::OK);
296}
297
298#[tokio::test]
299async fn streams_binary_archive_through_proxy() {
300 let h = Harness::new().await;
301 let tid = "3jzfcijpj2z2b";
302 h.mount_repo_record(&did("did:plc:limpet"), &rkey(tid), "kelp")
303 .await;
304 let payload: Vec<u8> = (0u8..=255).collect();
305 Mock::given(method("GET"))
306 .and(path("/xrpc/sh.tangled.repo.archive"))
307 .and(query_param("repo", "did:plc:limpet/kelp"))
308 .and(query_param("ref", "v1"))
309 .respond_with(
310 ResponseTemplate::new(200)
311 .insert_header("content-type", "application/gzip")
312 .set_body_bytes(payload.clone()),
313 )
314 .mount(&h.knot)
315 .await;
316
317 let target = format!(
318 "/xrpc/sh.tangled.repo.archive?repo={}&ref=v1",
319 enc(&format!("at://did:plc:limpet/sh.tangled.repo/{tid}")),
320 );
321 let resp = h.call(&target).await;
322 assert_eq!(resp.status(), StatusCode::OK);
323 assert_eq!(
324 resp.headers().get("content-type").unwrap(),
325 "application/gzip",
326 );
327 let body = to_bytes(resp.into_body(), 4 * 1024).await.unwrap();
328 assert_eq!(body.as_ref(), payload.as_slice());
329}
330
331#[tokio::test]
332async fn missing_repo_param_returns_400() {
333 let h = Harness::new().await;
334 let resp = h.call("/xrpc/sh.tangled.repo.blob?ref=main").await;
335 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
336 let v = body_value(resp).await;
337 assert_eq!(v["error"], "InvalidRequest");
338}
339
340#[tokio::test]
341async fn unknown_repo_propagates_404_from_slingshot() {
342 let h = Harness::new().await;
343 Mock::given(method("GET"))
344 .and(path("/xrpc/com.atproto.repo.getRecord"))
345 .respond_with(ResponseTemplate::new(404).set_body_string("not found"))
346 .mount(&h.slingshot)
347 .await;
348 let target = format!(
349 "/xrpc/sh.tangled.repo.blob?repo={}&ref=main",
350 enc("at://did:plc:abalone/sh.tangled.repo/missing"),
351 );
352 let resp = h.call(&target).await;
353 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
354 let v = body_value(resp).await;
355 assert_eq!(v["error"], "RecordNotFound");
356}
357
358#[tokio::test]
359async fn knot_5xx_routes_to_upstream_failed() {
360 let h = Harness::new().await;
361 h.mount_repo_record(&did("did:plc:abalone"), &rkey("r1"), "barnacle")
362 .await;
363 Mock::given(method("GET"))
364 .and(path("/xrpc/sh.tangled.repo.blob"))
365 .respond_with(ResponseTemplate::new(503))
366 .mount(&h.knot)
367 .await;
368 let target = format!(
369 "/xrpc/sh.tangled.repo.blob?repo={}&ref=main&path=x",
370 enc("at://did:plc:abalone/sh.tangled.repo/r1"),
371 );
372 let resp = h.call(&target).await;
373 assert_eq!(resp.status(), StatusCode::BAD_GATEWAY);
374 let v = body_value(resp).await;
375 assert_eq!(v["error"], "UpstreamFailed");
376}
377
378#[tokio::test]
379async fn knot_4xx_passes_through_unchanged() {
380 let h = Harness::new().await;
381 h.mount_repo_record(&did("did:plc:abalone"), &rkey("r1"), "barnacle")
382 .await;
383 Mock::given(method("GET"))
384 .and(path("/xrpc/sh.tangled.repo.blob"))
385 .respond_with(ResponseTemplate::new(404).set_body_raw(
386 r#"{"error":"FileNotFound","message":"nope"}"#,
387 "application/json",
388 ))
389 .mount(&h.knot)
390 .await;
391 let target = format!(
392 "/xrpc/sh.tangled.repo.blob?repo={}&ref=main&path=missing",
393 enc("at://did:plc:abalone/sh.tangled.repo/r1"),
394 );
395 let resp = h.call(&target).await;
396 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
397 let v = body_value(resp).await;
398 assert_eq!(v["error"], "FileNotFound");
399}
400
401#[tokio::test]
402async fn breaker_opens_after_threshold_then_short_circuits() {
403 let h = Harness::new().await;
404 h.mount_repo_record(&did("did:plc:abalone"), &rkey("r1"), "barnacle")
405 .await;
406 Mock::given(method("GET"))
407 .and(path("/xrpc/sh.tangled.repo.blob"))
408 .respond_with(ResponseTemplate::new(503))
409 .mount(&h.knot)
410 .await;
411 let target = format!(
412 "/xrpc/sh.tangled.repo.blob?repo={}&ref=main&path=x",
413 enc("at://did:plc:abalone/sh.tangled.repo/r1"),
414 );
415 let r1 = h.call(&target).await;
416 assert_eq!(r1.status(), StatusCode::BAD_GATEWAY);
417 let _ = body_string(r1).await;
418 let r2 = h.call(&target).await;
419 assert_eq!(r2.status(), StatusCode::BAD_GATEWAY);
420 let _ = body_string(r2).await;
421 let r3 = h.call(&target).await;
422 assert_eq!(r3.status(), StatusCode::BAD_GATEWAY);
423 let v = body_value(r3).await;
424 assert!(
425 v["message"]
426 .as_str()
427 .unwrap_or_default()
428 .contains("circuit breaker open"),
429 "third call must be short-circuited by breaker, got {v}",
430 );
431}
432
433#[tokio::test]
434async fn proxy_owner_uses_knot_query_param() {
435 let h = Harness::new().await;
436 Mock::given(method("GET"))
437 .and(path("/xrpc/sh.tangled.owner"))
438 .respond_with(
439 ResponseTemplate::new(200)
440 .set_body_raw(r#"{"owner":"did:plc:nautilus"}"#, "application/json"),
441 )
442 .mount(&h.knot)
443 .await;
444 let target = format!("/xrpc/sh.tangled.owner?knot={}", enc(&h.knot.uri()));
445 let resp = h.call(&target).await;
446 assert_eq!(resp.status(), StatusCode::OK);
447 let v = body_value(resp).await;
448 assert_eq!(v["owner"], "did:plc:nautilus");
449}
450
451#[tokio::test]
452async fn proxy_knot_version_uses_knot_query_param() {
453 let h = Harness::new().await;
454 Mock::given(method("GET"))
455 .and(path("/xrpc/sh.tangled.knot.version"))
456 .respond_with(
457 ResponseTemplate::new(200).set_body_raw(r#"{"version":"0.42"}"#, "application/json"),
458 )
459 .mount(&h.knot)
460 .await;
461 let target = format!("/xrpc/sh.tangled.knot.version?knot={}", enc(&h.knot.uri()));
462 let resp = h.call(&target).await;
463 assert_eq!(resp.status(), StatusCode::OK);
464 let v = body_value(resp).await;
465 assert_eq!(v["version"], "0.42");
466}
467
468#[tokio::test]
469async fn proxy_knot_list_keys_forwards_pagination_params() {
470 let h = Harness::new().await;
471 Mock::given(method("GET"))
472 .and(path("/xrpc/sh.tangled.knot.listKeys"))
473 .and(query_param("limit", "5"))
474 .and(query_param("cursor", "abc"))
475 .respond_with(ResponseTemplate::new(200).set_body_raw(r#"{"keys":[]}"#, "application/json"))
476 .mount(&h.knot)
477 .await;
478 let target = format!(
479 "/xrpc/sh.tangled.knot.listKeys?knot={}&limit=5&cursor=abc",
480 enc(&h.knot.uri()),
481 );
482 let resp = h.call(&target).await;
483 assert_eq!(resp.status(), StatusCode::OK);
484}
485
486#[tokio::test]
487async fn missing_knot_param_on_knot_route_returns_400() {
488 let h = Harness::new().await;
489 let resp = h.call("/xrpc/sh.tangled.knot.version").await;
490 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
491 let v = body_value(resp).await;
492 assert_eq!(v["error"], "InvalidRequest");
493}
494
495#[tokio::test]
496async fn second_proxy_call_skips_slingshot_via_lru() {
497 let h = Harness::new().await;
498 let tid = "3jzfcijpj2z2c";
499 h.mount_repo_record(&did("did:plc:abalone"), &rkey(tid), "barnacle")
500 .await;
501 Mock::given(method("GET"))
502 .and(path("/xrpc/sh.tangled.repo.tree"))
503 .and(query_param("repo", "did:plc:abalone/barnacle"))
504 .and(query_param("ref", "main"))
505 .respond_with(
506 ResponseTemplate::new(200)
507 .set_body_raw(r#"{"ref":"main","files":[]}"#, "application/json"),
508 )
509 .mount(&h.knot)
510 .await;
511 let target = format!(
512 "/xrpc/sh.tangled.repo.tree?repo={}&ref=main",
513 enc(&format!("at://did:plc:abalone/sh.tangled.repo/{tid}")),
514 );
515 let r1 = h.call(&target).await;
516 assert_eq!(r1.status(), StatusCode::OK);
517 let _ = body_string(r1).await;
518 let r2 = h.call(&target).await;
519 assert_eq!(r2.status(), StatusCode::OK);
520 let _ = body_string(r2).await;
521 let received = h.slingshot.received_requests().await.unwrap();
522 let getrecord = received
523 .iter()
524 .filter(|r| r.url.path() == "/xrpc/com.atproto.repo.getRecord")
525 .count();
526 assert_eq!(
527 getrecord, 1,
528 "slingshot must be hit exactly once because the LRU serves the second proxy call",
529 );
530}
531
532#[tokio::test]
533async fn does_not_inject_auth_or_atproto_proxy_headers() {
534 let h = Harness::new().await;
535 h.mount_repo_record(&did("did:plc:abalone"), &rkey("r1"), "barnacle")
536 .await;
537 Mock::given(method("GET"))
538 .and(path("/xrpc/sh.tangled.repo.blob"))
539 .and(header_exists("user-agent"))
540 .respond_with(
541 ResponseTemplate::new(200).set_body_raw(r#"{"path":"x"}"#, "application/json"),
542 )
543 .mount(&h.knot)
544 .await;
545 let target = format!(
546 "/xrpc/sh.tangled.repo.blob?repo={}&ref=main&path=x",
547 enc("at://did:plc:abalone/sh.tangled.repo/r1"),
548 );
549 let resp = h.call(&target).await;
550 assert_eq!(resp.status(), StatusCode::OK);
551 let received = h.knot.received_requests().await.unwrap();
552 let knot_call = received
553 .iter()
554 .find(|r| r.url.path() == "/xrpc/sh.tangled.repo.blob")
555 .expect("knot received the proxied call");
556 assert!(
557 knot_call.headers.get("authorization").is_none(),
558 "bobbin must not inject auth, anonymous read by design",
559 );
560 assert!(
561 knot_call.headers.get("atproto-proxy").is_none(),
562 "bobbin is not an atproto-proxy chain",
563 );
564 assert!(
565 knot_call.headers.get("atproto-accept-labelers").is_none(),
566 "bobbin does not negotiate labelers with knots",
567 );
568}
569
570#[tokio::test]
571async fn forwards_range_and_conditional_request_headers() {
572 let h = Harness::new().await;
573 let tid = "3jzfcijpj2z2d";
574 h.mount_repo_record(&did("did:plc:limpet"), &rkey(tid), "kelp")
575 .await;
576 Mock::given(method("GET"))
577 .and(path("/xrpc/sh.tangled.repo.archive"))
578 .and(query_param("repo", "did:plc:limpet/kelp"))
579 .respond_with(
580 ResponseTemplate::new(206)
581 .insert_header("content-type", "application/octet-stream")
582 .insert_header("content-range", "bytes 0-99/2048")
583 .insert_header("accept-ranges", "bytes")
584 .insert_header("etag", "\"v1\"")
585 .set_body_bytes(vec![0u8; 100]),
586 )
587 .mount(&h.knot)
588 .await;
589
590 let target = format!(
591 "/xrpc/sh.tangled.repo.archive?repo={}&ref=v1",
592 enc(&format!("at://did:plc:limpet/sh.tangled.repo/{tid}")),
593 );
594 let resp = h
595 .call_with_headers(
596 &target,
597 &[
598 ("range", "bytes=0-99"),
599 ("if-none-match", "\"old\""),
600 ("if-modified-since", "Wed, 01 May 2026 00:00:00 GMT"),
601 ],
602 )
603 .await;
604 assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT);
605 assert_eq!(
606 resp.headers().get("content-range").unwrap(),
607 "bytes 0-99/2048"
608 );
609 assert_eq!(resp.headers().get("accept-ranges").unwrap(), "bytes");
610 assert_eq!(resp.headers().get("etag").unwrap(), "\"v1\"");
611
612 let received = h.knot.received_requests().await.unwrap();
613 let knot_call = received
614 .iter()
615 .find(|r| r.url.path() == "/xrpc/sh.tangled.repo.archive")
616 .expect("knot received the proxied call");
617 assert_eq!(knot_call.headers.get("range").unwrap(), "bytes=0-99");
618 assert_eq!(knot_call.headers.get("if-none-match").unwrap(), "\"old\"");
619 assert_eq!(
620 knot_call.headers.get("if-modified-since").unwrap(),
621 "Wed, 01 May 2026 00:00:00 GMT",
622 );
623}
624
625#[tokio::test]
626async fn drops_disallowed_client_headers() {
627 let h = Harness::new().await;
628 h.mount_repo_record(&did("did:plc:abalone"), &rkey("r1"), "barnacle")
629 .await;
630 Mock::given(method("GET"))
631 .and(path("/xrpc/sh.tangled.repo.blob"))
632 .respond_with(
633 ResponseTemplate::new(200).set_body_raw(r#"{"path":"x"}"#, "application/json"),
634 )
635 .mount(&h.knot)
636 .await;
637 let target = format!(
638 "/xrpc/sh.tangled.repo.blob?repo={}&path=x",
639 enc("at://did:plc:abalone/sh.tangled.repo/r1"),
640 );
641 let resp = h
642 .call_with_headers(
643 &target,
644 &[
645 ("authorization", "Bearer secret"),
646 ("cookie", "sid=evil"),
647 ("x-custom", "should-not-pass"),
648 ],
649 )
650 .await;
651 assert_eq!(resp.status(), StatusCode::OK);
652 let received = h.knot.received_requests().await.unwrap();
653 let knot_call = received
654 .iter()
655 .find(|r| r.url.path() == "/xrpc/sh.tangled.repo.blob")
656 .expect("knot received the proxied call");
657 assert!(knot_call.headers.get("authorization").is_none());
658 assert!(knot_call.headers.get("cookie").is_none());
659 assert!(knot_call.headers.get("x-custom").is_none());
660}
661
662#[tokio::test]
663async fn rejects_client_supplied_loopback_under_strict_config() {
664 let strict = KnotProxyConfig {
665 allow_private_hosts: false,
666 ..test_config()
667 };
668 let h = Harness::with_config(strict).await;
669 let resp = h
670 .call(&format!(
671 "/xrpc/sh.tangled.knot.version?knot={}",
672 enc("http://127.0.0.1:9"),
673 ))
674 .await;
675 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
676 let v = body_value(resp).await;
677 assert_eq!(v["error"], "InvalidRequest");
678 let msg = v["message"].as_str().unwrap_or_default().to_owned();
679 assert!(
680 msg.contains("loopback") || msg.contains("blocked"),
681 "message should explain block reason, got {msg}",
682 );
683}
684
685#[tokio::test]
686async fn rejects_client_supplied_link_local_metadata_endpoint() {
687 let strict = KnotProxyConfig {
688 allow_private_hosts: false,
689 ..test_config()
690 };
691 let h = Harness::with_config(strict).await;
692 let resp = h
693 .call(&format!(
694 "/xrpc/sh.tangled.knot.version?knot={}",
695 enc("http://169.254.169.254"),
696 ))
697 .await;
698 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
699 let v = body_value(resp).await;
700 assert_eq!(v["error"], "InvalidRequest");
701}
702
703#[tokio::test]
704async fn record_with_private_knot_returns_invalid_record() {
705 let strict = KnotProxyConfig {
706 allow_private_hosts: false,
707 ..test_config()
708 };
709 let h = Harness::with_config(strict).await;
710 let owner = did("did:plc:abalone");
711 let rk = rkey("r1");
712 let record = json!({
713 "$type": "sh.tangled.repo",
714 "createdAt": "2026-05-01T00:00:00Z",
715 "knot": "http://10.0.0.5:3000",
716 "name": "barnacle",
717 });
718 let uri = format!("at://{}/sh.tangled.repo/{}", owner.as_ref(), rk.as_ref());
719 Mock::given(method("GET"))
720 .and(path("/xrpc/com.atproto.repo.getRecord"))
721 .and(query_param("repo", owner.as_ref()))
722 .and(query_param("collection", "sh.tangled.repo"))
723 .and(query_param("rkey", rk.as_ref()))
724 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
725 "uri": uri,
726 "cid": CID,
727 "value": record,
728 })))
729 .mount(&h.slingshot)
730 .await;
731 let resp = h
732 .call(&format!(
733 "/xrpc/sh.tangled.repo.blob?repo={}&path=x",
734 enc(&uri),
735 ))
736 .await;
737 assert_eq!(resp.status(), StatusCode::BAD_GATEWAY);
738 let v = body_value(resp).await;
739 assert_eq!(v["error"], "InvalidRecord");
740}
741
742#[tokio::test]
743async fn strips_basic_auth_from_credentialed_knot_url() {
744 let h = Harness::new().await;
745 let parsed = Url::parse(&h.knot.uri()).unwrap();
746 let knot_with_creds = format!(
747 "{}://attacker:secret@{}:{}/",
748 parsed.scheme(),
749 parsed.host_str().unwrap(),
750 parsed.port().unwrap(),
751 );
752 let owner = did("did:plc:abalone");
753 let rk = rkey("r1");
754 let record = json!({
755 "$type": "sh.tangled.repo",
756 "createdAt": "2026-05-01T00:00:00Z",
757 "knot": knot_with_creds,
758 "name": "barnacle",
759 });
760 let uri = format!("at://{}/sh.tangled.repo/{}", owner.as_ref(), rk.as_ref());
761 Mock::given(method("GET"))
762 .and(path("/xrpc/com.atproto.repo.getRecord"))
763 .and(query_param("repo", owner.as_ref()))
764 .and(query_param("collection", "sh.tangled.repo"))
765 .and(query_param("rkey", rk.as_ref()))
766 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
767 "uri": uri,
768 "cid": CID,
769 "value": record,
770 })))
771 .mount(&h.slingshot)
772 .await;
773 Mock::given(method("GET"))
774 .and(path("/xrpc/sh.tangled.repo.blob"))
775 .respond_with(
776 ResponseTemplate::new(200).set_body_raw(r#"{"path":"x"}"#, "application/json"),
777 )
778 .mount(&h.knot)
779 .await;
780 let target = format!("/xrpc/sh.tangled.repo.blob?repo={}&path=x", enc(&uri));
781 let resp = h.call(&target).await;
782 assert_eq!(resp.status(), StatusCode::OK);
783 let received = h.knot.received_requests().await.unwrap();
784 let knot_call = received
785 .iter()
786 .find(|r| r.url.path() == "/xrpc/sh.tangled.repo.blob")
787 .expect("knot received the proxied call");
788 assert!(
789 knot_call.headers.get("authorization").is_none(),
790 "userinfo in knot field must not become an Authorization header",
791 );
792}
793
794#[tokio::test]
795async fn knot_redirect_surfaces_as_upstream_failed() {
796 let h = Harness::new().await;
797 let secondary = MockServer::start().await;
798 h.mount_repo_record(&did("did:plc:abalone"), &rkey("r1"), "barnacle")
799 .await;
800 Mock::given(method("GET"))
801 .and(path("/xrpc/sh.tangled.repo.blob"))
802 .respond_with(
803 ResponseTemplate::new(302)
804 .insert_header("location", &format!("{}/secret", secondary.uri())),
805 )
806 .mount(&h.knot)
807 .await;
808 Mock::given(method("GET"))
809 .and(path("/secret"))
810 .respond_with(ResponseTemplate::new(200).set_body_string("leaked"))
811 .mount(&secondary)
812 .await;
813 let resp = h
814 .call(&format!(
815 "/xrpc/sh.tangled.repo.blob?repo={}&path=x",
816 enc("at://did:plc:abalone/sh.tangled.repo/r1"),
817 ))
818 .await;
819 assert_eq!(resp.status(), StatusCode::BAD_GATEWAY);
820 let v = body_value(resp).await;
821 assert_eq!(v["error"], "UpstreamFailed");
822 let received = secondary.received_requests().await.unwrap();
823 assert!(received.is_empty(), "redirect target must not be dialled");
824}
825
826#[tokio::test]
827async fn forwards_repeated_query_params() {
828 let h = Harness::new().await;
829 h.mount_repo_record(&did("did:plc:limpet"), &rkey("r4"), "kelp")
830 .await;
831 Mock::given(method("GET"))
832 .and(path("/xrpc/sh.tangled.repo.tags"))
833 .respond_with(ResponseTemplate::new(200).set_body_raw(r#"{"tags":[]}"#, "application/json"))
834 .mount(&h.knot)
835 .await;
836 let target = format!(
837 "/xrpc/sh.tangled.repo.tags?repo={}&filter=alpha&filter=beta",
838 enc("at://did:plc:limpet/sh.tangled.repo/r4"),
839 );
840 let resp = h.call(&target).await;
841 assert_eq!(resp.status(), StatusCode::OK);
842 let received = h.knot.received_requests().await.unwrap();
843 let knot_call = received
844 .iter()
845 .find(|r| r.url.path() == "/xrpc/sh.tangled.repo.tags")
846 .expect("knot received the proxied call");
847 let filters: Vec<String> = knot_call
848 .url
849 .query_pairs()
850 .filter(|(k, _)| k == "filter")
851 .map(|(_, v)| v.into_owned())
852 .collect();
853 assert_eq!(filters, vec!["alpha".to_owned(), "beta".to_owned()]);
854}
855
856#[tokio::test]
857async fn duplicate_repo_param_rejected_as_invalid_request() {
858 let h = Harness::new().await;
859 let target = format!(
860 "/xrpc/sh.tangled.repo.blob?repo={}&repo={}",
861 enc("at://did:plc:abalone/sh.tangled.repo/r1"),
862 enc("at://did:plc:limpet/sh.tangled.repo/r2"),
863 );
864 let resp = h.call(&target).await;
865 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
866 let v = body_value(resp).await;
867 assert_eq!(v["error"], "InvalidRequest");
868 assert!(
869 v["message"]
870 .as_str()
871 .unwrap_or_default()
872 .contains("repo parameter must appear at most once"),
873 "got {v}",
874 );
875}
876
877#[tokio::test]
878async fn duplicate_knot_param_rejected_as_invalid_request() {
879 let h = Harness::new().await;
880 let target = format!(
881 "/xrpc/sh.tangled.knot.version?knot={}&knot={}",
882 enc("https://oyster.cafe"),
883 enc("https://nel.pet"),
884 );
885 let resp = h.call(&target).await;
886 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
887 let v = body_value(resp).await;
888 assert_eq!(v["error"], "InvalidRequest");
889}
890
891#[tokio::test]
892async fn rejects_client_supplied_plaintext_when_https_required() {
893 let strict = KnotProxyConfig {
894 require_https: true,
895 ..test_config()
896 };
897 let h = Harness::with_config(strict).await;
898 let resp = h
899 .call(&format!(
900 "/xrpc/sh.tangled.knot.version?knot={}",
901 enc("http://oyster.cafe"),
902 ))
903 .await;
904 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
905 let v = body_value(resp).await;
906 assert_eq!(v["error"], "InvalidRequest");
907 assert!(
908 v["message"]
909 .as_str()
910 .unwrap_or_default()
911 .contains("must be https"),
912 "got {v}",
913 );
914}
915
916#[tokio::test]
917async fn record_with_plaintext_knot_returns_invalid_record_when_https_required() {
918 let strict = KnotProxyConfig {
919 require_https: true,
920 allow_private_hosts: true,
921 ..test_config()
922 };
923 let h = Harness::with_config(strict).await;
924 let owner = did("did:plc:abalone");
925 let rk = rkey("r1");
926 let record = json!({
927 "$type": "sh.tangled.repo",
928 "createdAt": "2026-05-01T00:00:00Z",
929 "knot": "http://oyster.cafe",
930 "name": "barnacle",
931 });
932 let uri = format!("at://{}/sh.tangled.repo/{}", owner.as_ref(), rk.as_ref());
933 Mock::given(method("GET"))
934 .and(path("/xrpc/com.atproto.repo.getRecord"))
935 .and(query_param("repo", owner.as_ref()))
936 .and(query_param("collection", "sh.tangled.repo"))
937 .and(query_param("rkey", rk.as_ref()))
938 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
939 "uri": uri,
940 "cid": CID,
941 "value": record,
942 })))
943 .mount(&h.slingshot)
944 .await;
945 let resp = h
946 .call(&format!(
947 "/xrpc/sh.tangled.repo.blob?repo={}&path=x",
948 enc(&uri),
949 ))
950 .await;
951 assert_eq!(resp.status(), StatusCode::BAD_GATEWAY);
952 let v = body_value(resp).await;
953 assert_eq!(v["error"], "InvalidRecord");
954 assert!(
955 v["message"]
956 .as_str()
957 .unwrap_or_default()
958 .contains("requires https"),
959 "got {v}",
960 );
961}
962
963#[tokio::test]
964async fn knot_not_modified_passes_through() {
965 let h = Harness::new().await;
966 h.mount_repo_record(&did("did:plc:limpet"), &rkey("r5"), "kelp")
967 .await;
968 Mock::given(method("GET"))
969 .and(path("/xrpc/sh.tangled.repo.archive"))
970 .respond_with(ResponseTemplate::new(304).insert_header("etag", "\"v1\""))
971 .mount(&h.knot)
972 .await;
973 let target = format!(
974 "/xrpc/sh.tangled.repo.archive?repo={}&ref=v1",
975 enc("at://did:plc:limpet/sh.tangled.repo/r5"),
976 );
977 let resp = h
978 .call_with_headers(&target, &[("if-none-match", "\"v1\"")])
979 .await;
980 assert_eq!(resp.status(), StatusCode::NOT_MODIFIED);
981 assert_eq!(resp.headers().get("etag").unwrap(), "\"v1\"");
982}