Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

at master 34 kB View raw
1use std::sync::Arc; 2use std::time::Duration; 3 4use axum::body::{Body, to_bytes}; 5use bobbin_edge_index::{CoverageWatch, EdgeStore, StateIndex}; 6use bobbin_knot_proxy::{FailureThreshold, KnotHttpConfig, KnotProxy, KnotProxyConfig}; 7use bobbin_record_lru::{CacheCapacity, LruRecordStore}; 8use bobbin_resolver::RepoIdResolver; 9use bobbin_runtime::{RuntimeHasher, SystemClock}; 10use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader}; 11use bobbin_slingshot_client::SlingshotClient; 12use bobbin_xrpc::{AppState, router}; 13use http::{Request, StatusCode}; 14use jacquard_common::DefaultStr; 15use jacquard_common::types::did::Did; 16use jacquard_common::types::recordkey::Rkey; 17use serde_json::{Value, json}; 18use tower::ServiceExt; 19use url::Url; 20use url::form_urlencoded::byte_serialize; 21use wiremock::matchers::{header_exists, method, path, query_param}; 22use wiremock::{Mock, MockServer, ResponseTemplate}; 23 24const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i"; 25 26fn did(s: &str) -> Did<DefaultStr> { 27 Did::new_owned(s).unwrap() 28} 29 30fn rkey(s: &str) -> Rkey<DefaultStr> { 31 Rkey::new_owned(s).unwrap() 32} 33 34fn test_config() -> KnotProxyConfig { 35 KnotProxyConfig { 36 failure_threshold: FailureThreshold::new(2).unwrap(), 37 cooldown: Duration::from_millis(80), 38 allow_private_hosts: true, 39 require_https: false, 40 } 41} 42 43fn test_http_config() -> KnotHttpConfig { 44 KnotHttpConfig { 45 connect_timeout: Duration::from_millis(500), 46 read_timeout: Duration::from_secs(2), 47 } 48} 49 50struct Harness { 51 slingshot: MockServer, 52 knot: MockServer, 53 state: AppState, 54} 55 56impl Harness { 57 async fn new() -> Self { 58 Self::with_config(test_config()).await 59 } 60 61 async fn with_config(config: KnotProxyConfig) -> Self { 62 let slingshot_server = MockServer::start().await; 63 let knot_server = MockServer::start().await; 64 let state = AppState::new( 65 Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))), 66 SlingshotClient::with_default_http(Url::parse(&slingshot_server.uri()).unwrap()) 67 .unwrap(), 68 Arc::new(EdgeStore::new(RuntimeHasher::default())), 69 Arc::new(StateIndex::new(RuntimeHasher::default())), 70 Arc::new(StateIndex::new(RuntimeHasher::default())), 71 Arc::new(CoverageWatch::new()), 72 Arc::new( 73 KnotProxy::new( 74 config, 75 test_http_config(), 76 Arc::new(SystemClock::new()), 77 RuntimeHasher::default(), 78 ) 79 .unwrap(), 80 ), 81 Arc::new( 82 SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap(), 83 ) as Arc<dyn SearchReader>, 84 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())), 85 ); 86 Self { 87 slingshot: slingshot_server, 88 knot: knot_server, 89 state, 90 } 91 } 92 93 async fn mount_repo_record(&self, did: &Did<DefaultStr>, rkey: &Rkey<DefaultStr>, name: &str) { 94 self.mount_repo_record_inner(did, rkey, Some(name)).await; 95 } 96 97 async fn mount_repo_record_rkey_as_name(&self, did: &Did<DefaultStr>, rkey: &Rkey<DefaultStr>) { 98 self.mount_repo_record_inner(did, rkey, None).await; 99 } 100 101 async fn mount_repo_record_inner( 102 &self, 103 did: &Did<DefaultStr>, 104 rkey: &Rkey<DefaultStr>, 105 name: Option<&str>, 106 ) { 107 let knot_value = self.knot.uri(); 108 let mut record = json!({ 109 "$type": "sh.tangled.repo", 110 "createdAt": "2026-05-01T00:00:00Z", 111 "knot": knot_value, 112 }); 113 if let Some(n) = name { 114 record["name"] = json!(n); 115 } 116 let uri = format!("at://{}/sh.tangled.repo/{}", did.as_ref(), rkey.as_ref()); 117 Mock::given(method("GET")) 118 .and(path("/xrpc/com.atproto.repo.getRecord")) 119 .and(query_param("repo", did.as_ref())) 120 .and(query_param("collection", "sh.tangled.repo")) 121 .and(query_param("rkey", rkey.as_ref())) 122 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 123 "uri": uri, 124 "cid": CID, 125 "value": record, 126 }))) 127 .mount(&self.slingshot) 128 .await; 129 } 130 131 async fn call(&self, path_and_query: &str) -> http::Response<Body> { 132 self.call_with_headers(path_and_query, &[]).await 133 } 134 135 async fn call_with_headers( 136 &self, 137 path_and_query: &str, 138 client_headers: &[(&str, &str)], 139 ) -> http::Response<Body> { 140 let builder = client_headers 141 .iter() 142 .fold(Request::builder().uri(path_and_query), |b, (k, v)| { 143 b.header(*k, *v) 144 }); 145 router(self.state.clone()) 146 .oneshot(builder.body(Body::empty()).unwrap()) 147 .await 148 .expect("router infallible") 149 } 150} 151 152fn enc(s: &str) -> String { 153 byte_serialize(s.as_bytes()).collect() 154} 155 156async fn body_string(resp: http::Response<Body>) -> String { 157 let body = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); 158 String::from_utf8(body.to_vec()).expect("response body is utf-8") 159} 160 161async fn body_value(resp: http::Response<Body>) -> Value { 162 let s = body_string(resp).await; 163 serde_json::from_str(&s).unwrap_or_else(|e| panic!("body not json: {e}: {s}")) 164} 165 166#[tokio::test] 167async fn proxies_repo_blob_with_did_slash_name_repo_param() { 168 let h = Harness::new().await; 169 let tid = "3jzfcijpj2z2a"; 170 h.mount_repo_record(&did("did:plc:abalone"), &rkey(tid), "barnacle") 171 .await; 172 Mock::given(method("GET")) 173 .and(path("/xrpc/sh.tangled.repo.blob")) 174 .and(query_param("repo", "did:plc:abalone/barnacle")) 175 .and(query_param("ref", "main")) 176 .and(query_param("path", "README.md")) 177 .respond_with( 178 ResponseTemplate::new(200) 179 .set_body_raw(r#"{"path":"README.md","content":"hi"}"#, "application/json"), 180 ) 181 .mount(&h.knot) 182 .await; 183 184 let target = format!( 185 "/xrpc/sh.tangled.repo.blob?repo={}&ref=main&path=README.md", 186 enc(&format!("at://did:plc:abalone/sh.tangled.repo/{tid}")), 187 ); 188 let resp = h.call(&target).await; 189 assert_eq!(resp.status(), StatusCode::OK); 190 assert_eq!( 191 resp.headers().get("content-type").unwrap(), 192 "application/json", 193 ); 194 let v = body_value(resp).await; 195 assert_eq!(v["path"], "README.md"); 196 assert_eq!(v["content"], "hi"); 197} 198 199#[tokio::test] 200async fn modern_rkey_as_name_uses_rkey_even_when_name_field_set() { 201 let h = Harness::new().await; 202 h.mount_repo_record(&did("did:plc:abalone"), &rkey("core"), "Tangled Core") 203 .await; 204 Mock::given(method("GET")) 205 .and(path("/xrpc/sh.tangled.repo.getDefaultBranch")) 206 .and(query_param("repo", "did:plc:abalone/core")) 207 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 208 "hash": "abc", 209 "name": "main", 210 "when": "2026-05-01T00:00:00Z", 211 }))) 212 .mount(&h.knot) 213 .await; 214 215 let target = format!( 216 "/xrpc/sh.tangled.repo.getDefaultBranch?repo={}", 217 enc("at://did:plc:abalone/sh.tangled.repo/core"), 218 ); 219 let resp = h.call(&target).await; 220 assert_eq!(resp.status(), StatusCode::OK); 221 let v = body_value(resp).await; 222 assert_eq!(v["name"], "main"); 223} 224 225#[tokio::test] 226async fn modern_rkey_as_name_works_when_name_field_null() { 227 let h = Harness::new().await; 228 h.mount_repo_record_rkey_as_name(&did("did:plc:abalone"), &rkey("core")) 229 .await; 230 Mock::given(method("GET")) 231 .and(path("/xrpc/sh.tangled.repo.getDefaultBranch")) 232 .and(query_param("repo", "did:plc:abalone/core")) 233 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 234 "hash": "abc", 235 "name": "main", 236 "when": "2026-05-01T00:00:00Z", 237 }))) 238 .mount(&h.knot) 239 .await; 240 241 let target = format!( 242 "/xrpc/sh.tangled.repo.getDefaultBranch?repo={}", 243 enc("at://did:plc:abalone/sh.tangled.repo/core"), 244 ); 245 let resp = h.call(&target).await; 246 assert_eq!(resp.status(), StatusCode::OK); 247} 248 249#[tokio::test] 250async fn legacy_tid_rkey_falls_back_to_name_field() { 251 let h = Harness::new().await; 252 let tid_rkey = "3jzfcijpj2z2a"; 253 h.mount_repo_record(&did("did:plc:abalone"), &rkey(tid_rkey), "dotfiles") 254 .await; 255 Mock::given(method("GET")) 256 .and(path("/xrpc/sh.tangled.repo.getDefaultBranch")) 257 .and(query_param("repo", "did:plc:abalone/dotfiles")) 258 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 259 "hash": "abc", 260 "name": "main", 261 "when": "2026-05-01T00:00:00Z", 262 }))) 263 .mount(&h.knot) 264 .await; 265 266 let target = format!( 267 "/xrpc/sh.tangled.repo.getDefaultBranch?repo={}", 268 enc(&format!("at://did:plc:abalone/sh.tangled.repo/{tid_rkey}")), 269 ); 270 let resp = h.call(&target).await; 271 assert_eq!(resp.status(), StatusCode::OK); 272} 273 274#[tokio::test] 275async fn tid_rkey_without_name_falls_back_to_tid() { 276 let h = Harness::new().await; 277 let tid_rkey = "3jzfcijpj2z2a"; 278 h.mount_repo_record_rkey_as_name(&did("did:plc:abalone"), &rkey(tid_rkey)) 279 .await; 280 Mock::given(method("GET")) 281 .and(path("/xrpc/sh.tangled.repo.getDefaultBranch")) 282 .and(query_param("repo", format!("did:plc:abalone/{tid_rkey}"))) 283 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 284 "hash": "abc", 285 "name": "main", 286 "when": "2026-05-01T00:00:00Z", 287 }))) 288 .mount(&h.knot) 289 .await; 290 let target = format!( 291 "/xrpc/sh.tangled.repo.getDefaultBranch?repo={}", 292 enc(&format!("at://did:plc:abalone/sh.tangled.repo/{tid_rkey}")), 293 ); 294 let resp = h.call(&target).await; 295 assert_eq!(resp.status(), StatusCode::OK); 296} 297 298#[tokio::test] 299async fn streams_binary_archive_through_proxy() { 300 let h = Harness::new().await; 301 let tid = "3jzfcijpj2z2b"; 302 h.mount_repo_record(&did("did:plc:limpet"), &rkey(tid), "kelp") 303 .await; 304 let payload: Vec<u8> = (0u8..=255).collect(); 305 Mock::given(method("GET")) 306 .and(path("/xrpc/sh.tangled.repo.archive")) 307 .and(query_param("repo", "did:plc:limpet/kelp")) 308 .and(query_param("ref", "v1")) 309 .respond_with( 310 ResponseTemplate::new(200) 311 .insert_header("content-type", "application/gzip") 312 .set_body_bytes(payload.clone()), 313 ) 314 .mount(&h.knot) 315 .await; 316 317 let target = format!( 318 "/xrpc/sh.tangled.repo.archive?repo={}&ref=v1", 319 enc(&format!("at://did:plc:limpet/sh.tangled.repo/{tid}")), 320 ); 321 let resp = h.call(&target).await; 322 assert_eq!(resp.status(), StatusCode::OK); 323 assert_eq!( 324 resp.headers().get("content-type").unwrap(), 325 "application/gzip", 326 ); 327 let body = to_bytes(resp.into_body(), 4 * 1024).await.unwrap(); 328 assert_eq!(body.as_ref(), payload.as_slice()); 329} 330 331#[tokio::test] 332async fn missing_repo_param_returns_400() { 333 let h = Harness::new().await; 334 let resp = h.call("/xrpc/sh.tangled.repo.blob?ref=main").await; 335 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 336 let v = body_value(resp).await; 337 assert_eq!(v["error"], "InvalidRequest"); 338} 339 340#[tokio::test] 341async fn unknown_repo_propagates_404_from_slingshot() { 342 let h = Harness::new().await; 343 Mock::given(method("GET")) 344 .and(path("/xrpc/com.atproto.repo.getRecord")) 345 .respond_with(ResponseTemplate::new(404).set_body_string("not found")) 346 .mount(&h.slingshot) 347 .await; 348 let target = format!( 349 "/xrpc/sh.tangled.repo.blob?repo={}&ref=main", 350 enc("at://did:plc:abalone/sh.tangled.repo/missing"), 351 ); 352 let resp = h.call(&target).await; 353 assert_eq!(resp.status(), StatusCode::NOT_FOUND); 354 let v = body_value(resp).await; 355 assert_eq!(v["error"], "RecordNotFound"); 356} 357 358#[tokio::test] 359async fn knot_5xx_routes_to_upstream_failed() { 360 let h = Harness::new().await; 361 h.mount_repo_record(&did("did:plc:abalone"), &rkey("r1"), "barnacle") 362 .await; 363 Mock::given(method("GET")) 364 .and(path("/xrpc/sh.tangled.repo.blob")) 365 .respond_with(ResponseTemplate::new(503)) 366 .mount(&h.knot) 367 .await; 368 let target = format!( 369 "/xrpc/sh.tangled.repo.blob?repo={}&ref=main&path=x", 370 enc("at://did:plc:abalone/sh.tangled.repo/r1"), 371 ); 372 let resp = h.call(&target).await; 373 assert_eq!(resp.status(), StatusCode::BAD_GATEWAY); 374 let v = body_value(resp).await; 375 assert_eq!(v["error"], "UpstreamFailed"); 376} 377 378#[tokio::test] 379async fn knot_4xx_passes_through_unchanged() { 380 let h = Harness::new().await; 381 h.mount_repo_record(&did("did:plc:abalone"), &rkey("r1"), "barnacle") 382 .await; 383 Mock::given(method("GET")) 384 .and(path("/xrpc/sh.tangled.repo.blob")) 385 .respond_with(ResponseTemplate::new(404).set_body_raw( 386 r#"{"error":"FileNotFound","message":"nope"}"#, 387 "application/json", 388 )) 389 .mount(&h.knot) 390 .await; 391 let target = format!( 392 "/xrpc/sh.tangled.repo.blob?repo={}&ref=main&path=missing", 393 enc("at://did:plc:abalone/sh.tangled.repo/r1"), 394 ); 395 let resp = h.call(&target).await; 396 assert_eq!(resp.status(), StatusCode::NOT_FOUND); 397 let v = body_value(resp).await; 398 assert_eq!(v["error"], "FileNotFound"); 399} 400 401#[tokio::test] 402async fn breaker_opens_after_threshold_then_short_circuits() { 403 let h = Harness::new().await; 404 h.mount_repo_record(&did("did:plc:abalone"), &rkey("r1"), "barnacle") 405 .await; 406 Mock::given(method("GET")) 407 .and(path("/xrpc/sh.tangled.repo.blob")) 408 .respond_with(ResponseTemplate::new(503)) 409 .mount(&h.knot) 410 .await; 411 let target = format!( 412 "/xrpc/sh.tangled.repo.blob?repo={}&ref=main&path=x", 413 enc("at://did:plc:abalone/sh.tangled.repo/r1"), 414 ); 415 let r1 = h.call(&target).await; 416 assert_eq!(r1.status(), StatusCode::BAD_GATEWAY); 417 let _ = body_string(r1).await; 418 let r2 = h.call(&target).await; 419 assert_eq!(r2.status(), StatusCode::BAD_GATEWAY); 420 let _ = body_string(r2).await; 421 let r3 = h.call(&target).await; 422 assert_eq!(r3.status(), StatusCode::BAD_GATEWAY); 423 let v = body_value(r3).await; 424 assert!( 425 v["message"] 426 .as_str() 427 .unwrap_or_default() 428 .contains("circuit breaker open"), 429 "third call must be short-circuited by breaker, got {v}", 430 ); 431} 432 433#[tokio::test] 434async fn proxy_owner_uses_knot_query_param() { 435 let h = Harness::new().await; 436 Mock::given(method("GET")) 437 .and(path("/xrpc/sh.tangled.owner")) 438 .respond_with( 439 ResponseTemplate::new(200) 440 .set_body_raw(r#"{"owner":"did:plc:nautilus"}"#, "application/json"), 441 ) 442 .mount(&h.knot) 443 .await; 444 let target = format!("/xrpc/sh.tangled.owner?knot={}", enc(&h.knot.uri())); 445 let resp = h.call(&target).await; 446 assert_eq!(resp.status(), StatusCode::OK); 447 let v = body_value(resp).await; 448 assert_eq!(v["owner"], "did:plc:nautilus"); 449} 450 451#[tokio::test] 452async fn proxy_knot_version_uses_knot_query_param() { 453 let h = Harness::new().await; 454 Mock::given(method("GET")) 455 .and(path("/xrpc/sh.tangled.knot.version")) 456 .respond_with( 457 ResponseTemplate::new(200).set_body_raw(r#"{"version":"0.42"}"#, "application/json"), 458 ) 459 .mount(&h.knot) 460 .await; 461 let target = format!("/xrpc/sh.tangled.knot.version?knot={}", enc(&h.knot.uri())); 462 let resp = h.call(&target).await; 463 assert_eq!(resp.status(), StatusCode::OK); 464 let v = body_value(resp).await; 465 assert_eq!(v["version"], "0.42"); 466} 467 468#[tokio::test] 469async fn proxy_knot_list_keys_forwards_pagination_params() { 470 let h = Harness::new().await; 471 Mock::given(method("GET")) 472 .and(path("/xrpc/sh.tangled.knot.listKeys")) 473 .and(query_param("limit", "5")) 474 .and(query_param("cursor", "abc")) 475 .respond_with(ResponseTemplate::new(200).set_body_raw(r#"{"keys":[]}"#, "application/json")) 476 .mount(&h.knot) 477 .await; 478 let target = format!( 479 "/xrpc/sh.tangled.knot.listKeys?knot={}&limit=5&cursor=abc", 480 enc(&h.knot.uri()), 481 ); 482 let resp = h.call(&target).await; 483 assert_eq!(resp.status(), StatusCode::OK); 484} 485 486#[tokio::test] 487async fn missing_knot_param_on_knot_route_returns_400() { 488 let h = Harness::new().await; 489 let resp = h.call("/xrpc/sh.tangled.knot.version").await; 490 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 491 let v = body_value(resp).await; 492 assert_eq!(v["error"], "InvalidRequest"); 493} 494 495#[tokio::test] 496async fn second_proxy_call_skips_slingshot_via_lru() { 497 let h = Harness::new().await; 498 let tid = "3jzfcijpj2z2c"; 499 h.mount_repo_record(&did("did:plc:abalone"), &rkey(tid), "barnacle") 500 .await; 501 Mock::given(method("GET")) 502 .and(path("/xrpc/sh.tangled.repo.tree")) 503 .and(query_param("repo", "did:plc:abalone/barnacle")) 504 .and(query_param("ref", "main")) 505 .respond_with( 506 ResponseTemplate::new(200) 507 .set_body_raw(r#"{"ref":"main","files":[]}"#, "application/json"), 508 ) 509 .mount(&h.knot) 510 .await; 511 let target = format!( 512 "/xrpc/sh.tangled.repo.tree?repo={}&ref=main", 513 enc(&format!("at://did:plc:abalone/sh.tangled.repo/{tid}")), 514 ); 515 let r1 = h.call(&target).await; 516 assert_eq!(r1.status(), StatusCode::OK); 517 let _ = body_string(r1).await; 518 let r2 = h.call(&target).await; 519 assert_eq!(r2.status(), StatusCode::OK); 520 let _ = body_string(r2).await; 521 let received = h.slingshot.received_requests().await.unwrap(); 522 let getrecord = received 523 .iter() 524 .filter(|r| r.url.path() == "/xrpc/com.atproto.repo.getRecord") 525 .count(); 526 assert_eq!( 527 getrecord, 1, 528 "slingshot must be hit exactly once because the LRU serves the second proxy call", 529 ); 530} 531 532#[tokio::test] 533async fn does_not_inject_auth_or_atproto_proxy_headers() { 534 let h = Harness::new().await; 535 h.mount_repo_record(&did("did:plc:abalone"), &rkey("r1"), "barnacle") 536 .await; 537 Mock::given(method("GET")) 538 .and(path("/xrpc/sh.tangled.repo.blob")) 539 .and(header_exists("user-agent")) 540 .respond_with( 541 ResponseTemplate::new(200).set_body_raw(r#"{"path":"x"}"#, "application/json"), 542 ) 543 .mount(&h.knot) 544 .await; 545 let target = format!( 546 "/xrpc/sh.tangled.repo.blob?repo={}&ref=main&path=x", 547 enc("at://did:plc:abalone/sh.tangled.repo/r1"), 548 ); 549 let resp = h.call(&target).await; 550 assert_eq!(resp.status(), StatusCode::OK); 551 let received = h.knot.received_requests().await.unwrap(); 552 let knot_call = received 553 .iter() 554 .find(|r| r.url.path() == "/xrpc/sh.tangled.repo.blob") 555 .expect("knot received the proxied call"); 556 assert!( 557 knot_call.headers.get("authorization").is_none(), 558 "bobbin must not inject auth, anonymous read by design", 559 ); 560 assert!( 561 knot_call.headers.get("atproto-proxy").is_none(), 562 "bobbin is not an atproto-proxy chain", 563 ); 564 assert!( 565 knot_call.headers.get("atproto-accept-labelers").is_none(), 566 "bobbin does not negotiate labelers with knots", 567 ); 568} 569 570#[tokio::test] 571async fn forwards_range_and_conditional_request_headers() { 572 let h = Harness::new().await; 573 let tid = "3jzfcijpj2z2d"; 574 h.mount_repo_record(&did("did:plc:limpet"), &rkey(tid), "kelp") 575 .await; 576 Mock::given(method("GET")) 577 .and(path("/xrpc/sh.tangled.repo.archive")) 578 .and(query_param("repo", "did:plc:limpet/kelp")) 579 .respond_with( 580 ResponseTemplate::new(206) 581 .insert_header("content-type", "application/octet-stream") 582 .insert_header("content-range", "bytes 0-99/2048") 583 .insert_header("accept-ranges", "bytes") 584 .insert_header("etag", "\"v1\"") 585 .set_body_bytes(vec![0u8; 100]), 586 ) 587 .mount(&h.knot) 588 .await; 589 590 let target = format!( 591 "/xrpc/sh.tangled.repo.archive?repo={}&ref=v1", 592 enc(&format!("at://did:plc:limpet/sh.tangled.repo/{tid}")), 593 ); 594 let resp = h 595 .call_with_headers( 596 &target, 597 &[ 598 ("range", "bytes=0-99"), 599 ("if-none-match", "\"old\""), 600 ("if-modified-since", "Wed, 01 May 2026 00:00:00 GMT"), 601 ], 602 ) 603 .await; 604 assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT); 605 assert_eq!( 606 resp.headers().get("content-range").unwrap(), 607 "bytes 0-99/2048" 608 ); 609 assert_eq!(resp.headers().get("accept-ranges").unwrap(), "bytes"); 610 assert_eq!(resp.headers().get("etag").unwrap(), "\"v1\""); 611 612 let received = h.knot.received_requests().await.unwrap(); 613 let knot_call = received 614 .iter() 615 .find(|r| r.url.path() == "/xrpc/sh.tangled.repo.archive") 616 .expect("knot received the proxied call"); 617 assert_eq!(knot_call.headers.get("range").unwrap(), "bytes=0-99"); 618 assert_eq!(knot_call.headers.get("if-none-match").unwrap(), "\"old\""); 619 assert_eq!( 620 knot_call.headers.get("if-modified-since").unwrap(), 621 "Wed, 01 May 2026 00:00:00 GMT", 622 ); 623} 624 625#[tokio::test] 626async fn drops_disallowed_client_headers() { 627 let h = Harness::new().await; 628 h.mount_repo_record(&did("did:plc:abalone"), &rkey("r1"), "barnacle") 629 .await; 630 Mock::given(method("GET")) 631 .and(path("/xrpc/sh.tangled.repo.blob")) 632 .respond_with( 633 ResponseTemplate::new(200).set_body_raw(r#"{"path":"x"}"#, "application/json"), 634 ) 635 .mount(&h.knot) 636 .await; 637 let target = format!( 638 "/xrpc/sh.tangled.repo.blob?repo={}&path=x", 639 enc("at://did:plc:abalone/sh.tangled.repo/r1"), 640 ); 641 let resp = h 642 .call_with_headers( 643 &target, 644 &[ 645 ("authorization", "Bearer secret"), 646 ("cookie", "sid=evil"), 647 ("x-custom", "should-not-pass"), 648 ], 649 ) 650 .await; 651 assert_eq!(resp.status(), StatusCode::OK); 652 let received = h.knot.received_requests().await.unwrap(); 653 let knot_call = received 654 .iter() 655 .find(|r| r.url.path() == "/xrpc/sh.tangled.repo.blob") 656 .expect("knot received the proxied call"); 657 assert!(knot_call.headers.get("authorization").is_none()); 658 assert!(knot_call.headers.get("cookie").is_none()); 659 assert!(knot_call.headers.get("x-custom").is_none()); 660} 661 662#[tokio::test] 663async fn rejects_client_supplied_loopback_under_strict_config() { 664 let strict = KnotProxyConfig { 665 allow_private_hosts: false, 666 ..test_config() 667 }; 668 let h = Harness::with_config(strict).await; 669 let resp = h 670 .call(&format!( 671 "/xrpc/sh.tangled.knot.version?knot={}", 672 enc("http://127.0.0.1:9"), 673 )) 674 .await; 675 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 676 let v = body_value(resp).await; 677 assert_eq!(v["error"], "InvalidRequest"); 678 let msg = v["message"].as_str().unwrap_or_default().to_owned(); 679 assert!( 680 msg.contains("loopback") || msg.contains("blocked"), 681 "message should explain block reason, got {msg}", 682 ); 683} 684 685#[tokio::test] 686async fn rejects_client_supplied_link_local_metadata_endpoint() { 687 let strict = KnotProxyConfig { 688 allow_private_hosts: false, 689 ..test_config() 690 }; 691 let h = Harness::with_config(strict).await; 692 let resp = h 693 .call(&format!( 694 "/xrpc/sh.tangled.knot.version?knot={}", 695 enc("http://169.254.169.254"), 696 )) 697 .await; 698 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 699 let v = body_value(resp).await; 700 assert_eq!(v["error"], "InvalidRequest"); 701} 702 703#[tokio::test] 704async fn record_with_private_knot_returns_invalid_record() { 705 let strict = KnotProxyConfig { 706 allow_private_hosts: false, 707 ..test_config() 708 }; 709 let h = Harness::with_config(strict).await; 710 let owner = did("did:plc:abalone"); 711 let rk = rkey("r1"); 712 let record = json!({ 713 "$type": "sh.tangled.repo", 714 "createdAt": "2026-05-01T00:00:00Z", 715 "knot": "http://10.0.0.5:3000", 716 "name": "barnacle", 717 }); 718 let uri = format!("at://{}/sh.tangled.repo/{}", owner.as_ref(), rk.as_ref()); 719 Mock::given(method("GET")) 720 .and(path("/xrpc/com.atproto.repo.getRecord")) 721 .and(query_param("repo", owner.as_ref())) 722 .and(query_param("collection", "sh.tangled.repo")) 723 .and(query_param("rkey", rk.as_ref())) 724 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 725 "uri": uri, 726 "cid": CID, 727 "value": record, 728 }))) 729 .mount(&h.slingshot) 730 .await; 731 let resp = h 732 .call(&format!( 733 "/xrpc/sh.tangled.repo.blob?repo={}&path=x", 734 enc(&uri), 735 )) 736 .await; 737 assert_eq!(resp.status(), StatusCode::BAD_GATEWAY); 738 let v = body_value(resp).await; 739 assert_eq!(v["error"], "InvalidRecord"); 740} 741 742#[tokio::test] 743async fn strips_basic_auth_from_credentialed_knot_url() { 744 let h = Harness::new().await; 745 let parsed = Url::parse(&h.knot.uri()).unwrap(); 746 let knot_with_creds = format!( 747 "{}://attacker:secret@{}:{}/", 748 parsed.scheme(), 749 parsed.host_str().unwrap(), 750 parsed.port().unwrap(), 751 ); 752 let owner = did("did:plc:abalone"); 753 let rk = rkey("r1"); 754 let record = json!({ 755 "$type": "sh.tangled.repo", 756 "createdAt": "2026-05-01T00:00:00Z", 757 "knot": knot_with_creds, 758 "name": "barnacle", 759 }); 760 let uri = format!("at://{}/sh.tangled.repo/{}", owner.as_ref(), rk.as_ref()); 761 Mock::given(method("GET")) 762 .and(path("/xrpc/com.atproto.repo.getRecord")) 763 .and(query_param("repo", owner.as_ref())) 764 .and(query_param("collection", "sh.tangled.repo")) 765 .and(query_param("rkey", rk.as_ref())) 766 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 767 "uri": uri, 768 "cid": CID, 769 "value": record, 770 }))) 771 .mount(&h.slingshot) 772 .await; 773 Mock::given(method("GET")) 774 .and(path("/xrpc/sh.tangled.repo.blob")) 775 .respond_with( 776 ResponseTemplate::new(200).set_body_raw(r#"{"path":"x"}"#, "application/json"), 777 ) 778 .mount(&h.knot) 779 .await; 780 let target = format!("/xrpc/sh.tangled.repo.blob?repo={}&path=x", enc(&uri)); 781 let resp = h.call(&target).await; 782 assert_eq!(resp.status(), StatusCode::OK); 783 let received = h.knot.received_requests().await.unwrap(); 784 let knot_call = received 785 .iter() 786 .find(|r| r.url.path() == "/xrpc/sh.tangled.repo.blob") 787 .expect("knot received the proxied call"); 788 assert!( 789 knot_call.headers.get("authorization").is_none(), 790 "userinfo in knot field must not become an Authorization header", 791 ); 792} 793 794#[tokio::test] 795async fn knot_redirect_surfaces_as_upstream_failed() { 796 let h = Harness::new().await; 797 let secondary = MockServer::start().await; 798 h.mount_repo_record(&did("did:plc:abalone"), &rkey("r1"), "barnacle") 799 .await; 800 Mock::given(method("GET")) 801 .and(path("/xrpc/sh.tangled.repo.blob")) 802 .respond_with( 803 ResponseTemplate::new(302) 804 .insert_header("location", &format!("{}/secret", secondary.uri())), 805 ) 806 .mount(&h.knot) 807 .await; 808 Mock::given(method("GET")) 809 .and(path("/secret")) 810 .respond_with(ResponseTemplate::new(200).set_body_string("leaked")) 811 .mount(&secondary) 812 .await; 813 let resp = h 814 .call(&format!( 815 "/xrpc/sh.tangled.repo.blob?repo={}&path=x", 816 enc("at://did:plc:abalone/sh.tangled.repo/r1"), 817 )) 818 .await; 819 assert_eq!(resp.status(), StatusCode::BAD_GATEWAY); 820 let v = body_value(resp).await; 821 assert_eq!(v["error"], "UpstreamFailed"); 822 let received = secondary.received_requests().await.unwrap(); 823 assert!(received.is_empty(), "redirect target must not be dialled"); 824} 825 826#[tokio::test] 827async fn forwards_repeated_query_params() { 828 let h = Harness::new().await; 829 h.mount_repo_record(&did("did:plc:limpet"), &rkey("r4"), "kelp") 830 .await; 831 Mock::given(method("GET")) 832 .and(path("/xrpc/sh.tangled.repo.tags")) 833 .respond_with(ResponseTemplate::new(200).set_body_raw(r#"{"tags":[]}"#, "application/json")) 834 .mount(&h.knot) 835 .await; 836 let target = format!( 837 "/xrpc/sh.tangled.repo.tags?repo={}&filter=alpha&filter=beta", 838 enc("at://did:plc:limpet/sh.tangled.repo/r4"), 839 ); 840 let resp = h.call(&target).await; 841 assert_eq!(resp.status(), StatusCode::OK); 842 let received = h.knot.received_requests().await.unwrap(); 843 let knot_call = received 844 .iter() 845 .find(|r| r.url.path() == "/xrpc/sh.tangled.repo.tags") 846 .expect("knot received the proxied call"); 847 let filters: Vec<String> = knot_call 848 .url 849 .query_pairs() 850 .filter(|(k, _)| k == "filter") 851 .map(|(_, v)| v.into_owned()) 852 .collect(); 853 assert_eq!(filters, vec!["alpha".to_owned(), "beta".to_owned()]); 854} 855 856#[tokio::test] 857async fn duplicate_repo_param_rejected_as_invalid_request() { 858 let h = Harness::new().await; 859 let target = format!( 860 "/xrpc/sh.tangled.repo.blob?repo={}&repo={}", 861 enc("at://did:plc:abalone/sh.tangled.repo/r1"), 862 enc("at://did:plc:limpet/sh.tangled.repo/r2"), 863 ); 864 let resp = h.call(&target).await; 865 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 866 let v = body_value(resp).await; 867 assert_eq!(v["error"], "InvalidRequest"); 868 assert!( 869 v["message"] 870 .as_str() 871 .unwrap_or_default() 872 .contains("repo parameter must appear at most once"), 873 "got {v}", 874 ); 875} 876 877#[tokio::test] 878async fn duplicate_knot_param_rejected_as_invalid_request() { 879 let h = Harness::new().await; 880 let target = format!( 881 "/xrpc/sh.tangled.knot.version?knot={}&knot={}", 882 enc("https://oyster.cafe"), 883 enc("https://nel.pet"), 884 ); 885 let resp = h.call(&target).await; 886 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 887 let v = body_value(resp).await; 888 assert_eq!(v["error"], "InvalidRequest"); 889} 890 891#[tokio::test] 892async fn rejects_client_supplied_plaintext_when_https_required() { 893 let strict = KnotProxyConfig { 894 require_https: true, 895 ..test_config() 896 }; 897 let h = Harness::with_config(strict).await; 898 let resp = h 899 .call(&format!( 900 "/xrpc/sh.tangled.knot.version?knot={}", 901 enc("http://oyster.cafe"), 902 )) 903 .await; 904 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 905 let v = body_value(resp).await; 906 assert_eq!(v["error"], "InvalidRequest"); 907 assert!( 908 v["message"] 909 .as_str() 910 .unwrap_or_default() 911 .contains("must be https"), 912 "got {v}", 913 ); 914} 915 916#[tokio::test] 917async fn record_with_plaintext_knot_returns_invalid_record_when_https_required() { 918 let strict = KnotProxyConfig { 919 require_https: true, 920 allow_private_hosts: true, 921 ..test_config() 922 }; 923 let h = Harness::with_config(strict).await; 924 let owner = did("did:plc:abalone"); 925 let rk = rkey("r1"); 926 let record = json!({ 927 "$type": "sh.tangled.repo", 928 "createdAt": "2026-05-01T00:00:00Z", 929 "knot": "http://oyster.cafe", 930 "name": "barnacle", 931 }); 932 let uri = format!("at://{}/sh.tangled.repo/{}", owner.as_ref(), rk.as_ref()); 933 Mock::given(method("GET")) 934 .and(path("/xrpc/com.atproto.repo.getRecord")) 935 .and(query_param("repo", owner.as_ref())) 936 .and(query_param("collection", "sh.tangled.repo")) 937 .and(query_param("rkey", rk.as_ref())) 938 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 939 "uri": uri, 940 "cid": CID, 941 "value": record, 942 }))) 943 .mount(&h.slingshot) 944 .await; 945 let resp = h 946 .call(&format!( 947 "/xrpc/sh.tangled.repo.blob?repo={}&path=x", 948 enc(&uri), 949 )) 950 .await; 951 assert_eq!(resp.status(), StatusCode::BAD_GATEWAY); 952 let v = body_value(resp).await; 953 assert_eq!(v["error"], "InvalidRecord"); 954 assert!( 955 v["message"] 956 .as_str() 957 .unwrap_or_default() 958 .contains("requires https"), 959 "got {v}", 960 ); 961} 962 963#[tokio::test] 964async fn knot_not_modified_passes_through() { 965 let h = Harness::new().await; 966 h.mount_repo_record(&did("did:plc:limpet"), &rkey("r5"), "kelp") 967 .await; 968 Mock::given(method("GET")) 969 .and(path("/xrpc/sh.tangled.repo.archive")) 970 .respond_with(ResponseTemplate::new(304).insert_header("etag", "\"v1\"")) 971 .mount(&h.knot) 972 .await; 973 let target = format!( 974 "/xrpc/sh.tangled.repo.archive?repo={}&ref=v1", 975 enc("at://did:plc:limpet/sh.tangled.repo/r5"), 976 ); 977 let resp = h 978 .call_with_headers(&target, &[("if-none-match", "\"v1\"")]) 979 .await; 980 assert_eq!(resp.status(), StatusCode::NOT_MODIFIED); 981 assert_eq!(resp.headers().get("etag").unwrap(), "\"v1\""); 982}