Monorepo for Tangled tangled.org
9

Configure Feed

Select the types of activity you want to include in your feed.

at icy/ytnwlw 24 kB View raw
1use std::pin::Pin; 2use std::sync::Arc; 3use std::task::{Context, Poll}; 4use std::time::Duration; 5 6use bobbin_runtime::{ 7 BodyStream as InnerBodyStream, Clock, HttpRequest, HttpResponseHead, HttpTransport, 8 NetworkError, ReqwestHttp, RuntimeHasher, 9}; 10use bytes::Bytes; 11use futures::Stream; 12use http::{HeaderMap, StatusCode}; 13use jacquard_common::BosStr; 14use jacquard_common::types::nsid::Nsid; 15use reqwest::{Client, redirect::Policy}; 16use scc::HashMap as SccMap; 17use thiserror::Error; 18use url::Url; 19 20mod breaker; 21mod dns; 22mod host; 23 24pub use breaker::{Breaker, BreakerPermit, CircuitOpen, FailureThreshold, ThresholdError}; 25pub use dns::PrivateAddressFilter; 26pub use host::{KnotHost, KnotHostError, PrivateHostReason, RepoSlug, RepoSlugError, classify_ip}; 27 28const USER_AGENT: &str = concat!("bobbin/", env!("CARGO_PKG_VERSION")); 29const HTTPS_SCHEME: &str = "https"; 30 31#[derive(Clone, Debug)] 32pub struct KnotProxyConfig { 33 pub failure_threshold: FailureThreshold, 34 pub cooldown: Duration, 35 pub allow_private_hosts: bool, 36 pub require_https: bool, 37} 38 39impl Default for KnotProxyConfig { 40 fn default() -> Self { 41 Self { 42 failure_threshold: FailureThreshold::new(5).expect("nonzero literal"), 43 cooldown: Duration::from_secs(30), 44 allow_private_hosts: false, 45 require_https: true, 46 } 47 } 48} 49 50#[derive(Clone, Copy, Debug)] 51pub struct KnotHttpConfig { 52 pub connect_timeout: Duration, 53 pub read_timeout: Duration, 54} 55 56impl Default for KnotHttpConfig { 57 fn default() -> Self { 58 Self { 59 connect_timeout: Duration::from_secs(5), 60 read_timeout: Duration::from_secs(60), 61 } 62 } 63} 64 65#[derive(Debug, Error)] 66pub enum KnotProxyError { 67 #[error("circuit breaker open")] 68 CircuitOpen, 69 #[error("blocked: host {host} resolves to {reason} address space")] 70 BlockedHost { 71 host: String, 72 reason: PrivateHostReason, 73 }, 74 #[error("blocked: knot {host} requires https, got plaintext http")] 75 PlaintextHttp { host: String }, 76 #[error("connect failed: {0}")] 77 Connect(String), 78 #[error("upstream read timed out: {0}")] 79 Timeout(String), 80 #[error("redirect refused: {0}")] 81 Redirect(String), 82 #[error("transport: {0}")] 83 Transport(String), 84 #[error("upstream returned status {0}")] 85 Upstream(StatusCode), 86} 87 88pub struct KnotProxy { 89 http: Arc<dyn HttpTransport>, 90 breakers: SccMap<KnotHost, Arc<Breaker>, RuntimeHasher>, 91 threshold: FailureThreshold, 92 cooldown: Duration, 93 allow_private_hosts: bool, 94 require_https: bool, 95 clock: Arc<dyn Clock>, 96} 97 98impl KnotProxy { 99 pub fn new( 100 config: KnotProxyConfig, 101 http: KnotHttpConfig, 102 clock: Arc<dyn Clock>, 103 hasher: RuntimeHasher, 104 ) -> Result<Self, reqwest::Error> { 105 let resolver = Arc::new(dns::PrivateAddressFilter::new(config.allow_private_hosts)); 106 let client = Client::builder() 107 .user_agent(USER_AGENT) 108 .connect_timeout(http.connect_timeout) 109 .read_timeout(http.read_timeout) 110 .redirect(Policy::none()) 111 .no_gzip() 112 .no_brotli() 113 .no_deflate() 114 .dns_resolver(resolver) 115 .build()?; 116 Ok(Self::with_transport( 117 ReqwestHttp::shared(client), 118 config, 119 clock, 120 hasher, 121 )) 122 } 123 124 pub fn with_transport( 125 http: Arc<dyn HttpTransport>, 126 config: KnotProxyConfig, 127 clock: Arc<dyn Clock>, 128 hasher: RuntimeHasher, 129 ) -> Self { 130 Self { 131 http, 132 breakers: SccMap::with_hasher(hasher), 133 threshold: config.failure_threshold, 134 cooldown: config.cooldown, 135 allow_private_hosts: config.allow_private_hosts, 136 require_https: config.require_https, 137 clock, 138 } 139 } 140 141 pub fn allows_private_hosts(&self) -> bool { 142 self.allow_private_hosts 143 } 144 145 pub fn requires_https(&self) -> bool { 146 self.require_https 147 } 148 149 pub async fn forward<S: BosStr + AsRef<str>>( 150 &self, 151 host: &KnotHost, 152 nsid: &Nsid<S>, 153 query: &[(&str, &str)], 154 headers: HeaderMap, 155 ) -> Result<ProxyResponse, KnotProxyError> { 156 self.guard_host(host)?; 157 let breaker = self.breaker_for(host).await; 158 let permit = breaker 159 .try_acquire() 160 .map_err(|_: CircuitOpen| KnotProxyError::CircuitOpen)?; 161 let url = build_xrpc_url(host, nsid, query); 162 let outcome = self.http.execute(HttpRequest { url, headers }).await; 163 classify(outcome, permit) 164 } 165 166 fn guard_host(&self, host: &KnotHost) -> Result<(), KnotProxyError> { 167 let host_str = || host.url().host_str().unwrap_or_default().to_owned(); 168 if self.require_https && host.url().scheme() != HTTPS_SCHEME { 169 return Err(KnotProxyError::PlaintextHttp { host: host_str() }); 170 } 171 if self.allow_private_hosts { 172 return Ok(()); 173 } 174 match host.private_literal_reason() { 175 None => Ok(()), 176 Some(reason) => Err(KnotProxyError::BlockedHost { 177 host: host_str(), 178 reason, 179 }), 180 } 181 } 182 183 async fn breaker_for(&self, host: &KnotHost) -> Arc<Breaker> { 184 if let Some(existing) = self.breakers.read_async(host, |_, v| Arc::clone(v)).await { 185 return existing; 186 } 187 let entry = self.breakers.entry_async(host.clone()).await; 188 Arc::clone( 189 entry 190 .or_insert_with(|| { 191 Arc::new(Breaker::new( 192 self.threshold, 193 self.cooldown, 194 self.clock.clone(), 195 )) 196 }) 197 .get(), 198 ) 199 } 200} 201 202pub struct ProxyResponse { 203 status: StatusCode, 204 headers: HeaderMap, 205 body: InnerBodyStream, 206 permit: BreakerPermit, 207} 208 209impl std::fmt::Debug for ProxyResponse { 210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 211 f.debug_struct("ProxyResponse") 212 .field("status", &self.status) 213 .field("headers", &self.headers) 214 .finish_non_exhaustive() 215 } 216} 217 218impl ProxyResponse { 219 pub fn status(&self) -> StatusCode { 220 self.status 221 } 222 223 pub fn headers(&self) -> &HeaderMap { 224 &self.headers 225 } 226 227 pub fn into_body_stream(self) -> BodyStream { 228 BodyStream::new(self.body, self.permit) 229 } 230} 231 232pub struct BodyStream { 233 inner: InnerBodyStream, 234 permit: Option<BreakerPermit>, 235} 236 237impl BodyStream { 238 fn new(inner: InnerBodyStream, permit: BreakerPermit) -> Self { 239 Self { 240 inner, 241 permit: Some(permit), 242 } 243 } 244 245 fn resolve(&mut self, success: bool) { 246 if let Some(permit) = self.permit.take() { 247 if success { 248 permit.record_success(); 249 } else { 250 permit.record_failure(); 251 } 252 } 253 } 254} 255 256impl Stream for BodyStream { 257 type Item = Result<Bytes, NetworkError>; 258 259 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 260 let next = match self.inner.as_mut().poll_next(cx) { 261 Poll::Pending => return Poll::Pending, 262 Poll::Ready(item) => item, 263 }; 264 match &next { 265 Some(Ok(_)) => {} 266 Some(Err(_)) => self.resolve(false), 267 None => self.resolve(true), 268 } 269 Poll::Ready(next) 270 } 271} 272 273fn build_xrpc_url<S: BosStr + AsRef<str>>( 274 host: &KnotHost, 275 nsid: &Nsid<S>, 276 query: &[(&str, &str)], 277) -> Url { 278 let mut url = host.xrpc_url(nsid); 279 { 280 let mut pairs = url.query_pairs_mut(); 281 query.iter().for_each(|(k, v)| { 282 pairs.append_pair(k, v); 283 }); 284 } 285 url 286} 287 288fn classify( 289 outcome: Result<HttpResponseHead, NetworkError>, 290 permit: BreakerPermit, 291) -> Result<ProxyResponse, KnotProxyError> { 292 match outcome { 293 Ok(head) if is_upstream_failure(head.status) => { 294 let status = head.status; 295 permit.record_failure(); 296 Err(KnotProxyError::Upstream(status)) 297 } 298 Ok(head) => Ok(ProxyResponse { 299 status: head.status, 300 headers: head.headers, 301 body: head.body, 302 permit, 303 }), 304 Err(err) => { 305 permit.record_failure(); 306 Err(map_network(err)) 307 } 308 } 309} 310 311fn is_upstream_failure(status: StatusCode) -> bool { 312 status.is_server_error() || is_unfollowable_redirect(status) 313} 314 315fn is_unfollowable_redirect(status: StatusCode) -> bool { 316 matches!(status.as_u16(), 301 | 302 | 303 | 307 | 308) 317} 318 319fn map_network(err: NetworkError) -> KnotProxyError { 320 match err { 321 NetworkError::Timeout(msg) => KnotProxyError::Timeout(msg), 322 NetworkError::Connect(msg) => KnotProxyError::Connect(msg), 323 NetworkError::Redirect(msg) => KnotProxyError::Redirect(msg), 324 NetworkError::Transport(msg) | NetworkError::Body(msg) | NetworkError::Protocol(msg) => { 325 KnotProxyError::Transport(msg) 326 } 327 } 328} 329 330#[cfg(test)] 331mod tests { 332 use super::*; 333 use bobbin_runtime::SystemClock; 334 use futures::stream::TryStreamExt; 335 use jacquard_common::DefaultStr; 336 use tokio::io::AsyncWriteExt; 337 use wiremock::matchers::{method, path, query_param}; 338 use wiremock::{Mock, MockServer, ResponseTemplate}; 339 340 fn nsid(s: &'static str) -> Nsid<DefaultStr> { 341 Nsid::new_static(s).unwrap() 342 } 343 344 pub(crate) fn config_for_test() -> KnotProxyConfig { 345 KnotProxyConfig { 346 failure_threshold: FailureThreshold::new(2).unwrap(), 347 cooldown: Duration::from_millis(80), 348 allow_private_hosts: true, 349 require_https: false, 350 } 351 } 352 353 pub(crate) fn http_config_for_test() -> KnotHttpConfig { 354 KnotHttpConfig { 355 connect_timeout: Duration::from_millis(500), 356 read_timeout: Duration::from_secs(2), 357 } 358 } 359 360 fn proxy_for_test() -> KnotProxy { 361 KnotProxy::new( 362 config_for_test(), 363 http_config_for_test(), 364 Arc::new(SystemClock::new()), 365 RuntimeHasher::default(), 366 ) 367 .unwrap() 368 } 369 370 async fn server() -> MockServer { 371 MockServer::start().await 372 } 373 374 fn host_of(server: &MockServer) -> KnotHost { 375 KnotHost::parse(&server.uri()).unwrap() 376 } 377 378 pub(crate) async fn drain(stream: BodyStream) -> Result<Bytes, NetworkError> { 379 let chunks: Vec<Bytes> = stream.try_collect().await?; 380 let total: usize = chunks.iter().map(|b| b.len()).sum(); 381 let mut buf = bytes::BytesMut::with_capacity(total); 382 chunks.iter().for_each(|c| buf.extend_from_slice(c)); 383 Ok(buf.freeze()) 384 } 385 386 #[tokio::test] 387 async fn forwards_query_params_and_returns_body() { 388 let server = server().await; 389 Mock::given(method("GET")) 390 .and(path("/xrpc/sh.tangled.repo.blob")) 391 .and(query_param("repo", "did:plc:squid/barnacle")) 392 .and(query_param("ref", "main")) 393 .and(query_param("path", "README.md")) 394 .respond_with( 395 ResponseTemplate::new(200) 396 .insert_header("content-type", "application/json") 397 .set_body_string(r#"{"path":"README.md"}"#), 398 ) 399 .mount(&server) 400 .await; 401 402 let proxy = proxy_for_test(); 403 let resp = proxy 404 .forward( 405 &host_of(&server), 406 &nsid("sh.tangled.repo.blob"), 407 &[ 408 ("repo", "did:plc:squid/barnacle"), 409 ("ref", "main"), 410 ("path", "README.md"), 411 ], 412 HeaderMap::new(), 413 ) 414 .await 415 .expect("happy path"); 416 assert_eq!(resp.status(), 200); 417 let body = drain(resp.into_body_stream()).await.unwrap(); 418 assert_eq!(&body[..], br#"{"path":"README.md"}"#); 419 } 420 421 #[tokio::test] 422 async fn five_hundreds_open_breaker() { 423 let server = server().await; 424 Mock::given(method("GET")) 425 .and(path("/xrpc/sh.tangled.repo.blob")) 426 .respond_with(ResponseTemplate::new(503)) 427 .mount(&server) 428 .await; 429 430 let proxy = proxy_for_test(); 431 let host = host_of(&server); 432 let r1 = proxy 433 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 434 .await; 435 assert!(matches!(r1, Err(KnotProxyError::Upstream(_)))); 436 let r2 = proxy 437 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 438 .await; 439 assert!(matches!(r2, Err(KnotProxyError::Upstream(_)))); 440 let r3 = proxy 441 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 442 .await; 443 assert!(matches!(r3, Err(KnotProxyError::CircuitOpen))); 444 } 445 446 #[tokio::test] 447 async fn four_hundreds_do_not_open_breaker() { 448 let server = server().await; 449 Mock::given(method("GET")) 450 .and(path("/xrpc/sh.tangled.repo.blob")) 451 .respond_with(ResponseTemplate::new(404).set_body_string("not found")) 452 .mount(&server) 453 .await; 454 455 let proxy = proxy_for_test(); 456 let host = host_of(&server); 457 let r1 = proxy 458 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 459 .await; 460 assert_eq!(r1.unwrap().status(), 404); 461 let r2 = proxy 462 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 463 .await; 464 assert_eq!(r2.unwrap().status(), 404); 465 let r3 = proxy 466 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 467 .await; 468 assert_eq!( 469 r3.unwrap().status(), 470 404, 471 "client errors must not trip breaker", 472 ); 473 } 474 475 #[tokio::test] 476 async fn breaker_recovers_after_cooldown() { 477 let server = server().await; 478 Mock::given(method("GET")) 479 .and(path("/xrpc/sh.tangled.repo.blob")) 480 .respond_with(ResponseTemplate::new(503)) 481 .up_to_n_times(2) 482 .mount(&server) 483 .await; 484 Mock::given(method("GET")) 485 .and(path("/xrpc/sh.tangled.repo.blob")) 486 .respond_with( 487 ResponseTemplate::new(200) 488 .insert_header("content-type", "application/json") 489 .set_body_string("ok"), 490 ) 491 .mount(&server) 492 .await; 493 494 let proxy = proxy_for_test(); 495 let host = host_of(&server); 496 let _ = proxy 497 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 498 .await; 499 let _ = proxy 500 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 501 .await; 502 assert!(matches!( 503 proxy 504 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 505 .await, 506 Err(KnotProxyError::CircuitOpen), 507 )); 508 tokio::time::sleep(Duration::from_millis(120)).await; 509 let recovered = proxy 510 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 511 .await 512 .expect("must recover after cooldown"); 513 assert_eq!(recovered.status(), 200); 514 let body = drain(recovered.into_body_stream()).await.unwrap(); 515 assert_eq!(&body[..], b"ok"); 516 } 517 518 #[tokio::test] 519 async fn breakers_are_isolated_per_host() { 520 let bad = server().await; 521 let good = server().await; 522 Mock::given(method("GET")) 523 .and(path("/xrpc/sh.tangled.repo.blob")) 524 .respond_with(ResponseTemplate::new(503)) 525 .mount(&bad) 526 .await; 527 Mock::given(method("GET")) 528 .and(path("/xrpc/sh.tangled.repo.blob")) 529 .respond_with( 530 ResponseTemplate::new(200) 531 .insert_header("content-type", "application/json") 532 .set_body_string("ok"), 533 ) 534 .mount(&good) 535 .await; 536 537 let proxy = proxy_for_test(); 538 let bad_host = host_of(&bad); 539 let good_host = host_of(&good); 540 let _ = proxy 541 .forward( 542 &bad_host, 543 &nsid("sh.tangled.repo.blob"), 544 &[], 545 HeaderMap::new(), 546 ) 547 .await; 548 let _ = proxy 549 .forward( 550 &bad_host, 551 &nsid("sh.tangled.repo.blob"), 552 &[], 553 HeaderMap::new(), 554 ) 555 .await; 556 assert!(matches!( 557 proxy 558 .forward( 559 &bad_host, 560 &nsid("sh.tangled.repo.blob"), 561 &[], 562 HeaderMap::new() 563 ) 564 .await, 565 Err(KnotProxyError::CircuitOpen), 566 )); 567 let resp = proxy 568 .forward( 569 &good_host, 570 &nsid("sh.tangled.repo.blob"), 571 &[], 572 HeaderMap::new(), 573 ) 574 .await 575 .expect("healthy host stays open"); 576 assert_eq!(resp.status(), 200); 577 } 578 579 #[tokio::test] 580 async fn build_xrpc_url_appends_query() { 581 let host = KnotHost::parse("https://oyster.cafe").unwrap(); 582 let url = build_xrpc_url( 583 &host, 584 &nsid("sh.tangled.repo.tree"), 585 &[("repo", "did:plc:squid/barnacle"), ("ref", "main")], 586 ); 587 assert_eq!( 588 url.as_str(), 589 "https://oyster.cafe/xrpc/sh.tangled.repo.tree?repo=did%3Aplc%3Asquid%2Fbarnacle&ref=main", 590 ); 591 } 592 593 #[tokio::test] 594 async fn rejects_private_host_by_default() { 595 let server = server().await; 596 let strict = KnotProxyConfig { 597 allow_private_hosts: false, 598 ..config_for_test() 599 }; 600 let proxy = KnotProxy::new( 601 strict, 602 http_config_for_test(), 603 Arc::new(SystemClock::new()), 604 RuntimeHasher::default(), 605 ) 606 .unwrap(); 607 let err = proxy 608 .forward( 609 &host_of(&server), 610 &nsid("sh.tangled.repo.blob"), 611 &[], 612 HeaderMap::new(), 613 ) 614 .await 615 .expect_err("loopback must be blocked under strict config"); 616 assert!(matches!(err, KnotProxyError::BlockedHost { .. })); 617 } 618 619 #[tokio::test] 620 async fn rejects_plaintext_when_https_required() { 621 let server = server().await; 622 let strict = KnotProxyConfig { 623 require_https: true, 624 ..config_for_test() 625 }; 626 let proxy = KnotProxy::new( 627 strict, 628 http_config_for_test(), 629 Arc::new(SystemClock::new()), 630 RuntimeHasher::default(), 631 ) 632 .unwrap(); 633 let err = proxy 634 .forward( 635 &host_of(&server), 636 &nsid("sh.tangled.repo.blob"), 637 &[], 638 HeaderMap::new(), 639 ) 640 .await 641 .expect_err("plaintext http must be rejected under https-required"); 642 assert!( 643 matches!(err, KnotProxyError::PlaintextHttp { .. }), 644 "got {err:?}", 645 ); 646 } 647 648 #[tokio::test] 649 async fn transport_error_trips_breaker() { 650 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); 651 let addr = listener.local_addr().unwrap(); 652 drop(listener); 653 let dead = KnotHost::parse(&format!("http://{addr}")).unwrap(); 654 655 let proxy = proxy_for_test(); 656 let r1 = proxy 657 .forward(&dead, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 658 .await; 659 assert!( 660 r1.is_err(), 661 "transport must fail against closed port: {r1:?}" 662 ); 663 let r2 = proxy 664 .forward(&dead, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 665 .await; 666 assert!(r2.is_err(), "second transport must fail: {r2:?}"); 667 let r3 = proxy 668 .forward(&dead, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 669 .await; 670 assert!( 671 matches!(r3, Err(KnotProxyError::CircuitOpen)), 672 "transport failures must trip breaker, got {r3:?}", 673 ); 674 } 675 676 #[tokio::test] 677 async fn redirects_surface_as_upstream_failure() { 678 let primary = server().await; 679 let secondary = server().await; 680 Mock::given(method("GET")) 681 .and(path("/xrpc/sh.tangled.repo.blob")) 682 .respond_with( 683 ResponseTemplate::new(302) 684 .insert_header("location", &format!("{}/secret", secondary.uri())), 685 ) 686 .mount(&primary) 687 .await; 688 Mock::given(method("GET")) 689 .and(path("/secret")) 690 .respond_with(ResponseTemplate::new(200).set_body_string("leaked")) 691 .mount(&secondary) 692 .await; 693 694 let proxy = proxy_for_test(); 695 let err = proxy 696 .forward( 697 &host_of(&primary), 698 &nsid("sh.tangled.repo.blob"), 699 &[], 700 HeaderMap::new(), 701 ) 702 .await 703 .expect_err("302 must surface as upstream failure"); 704 assert!( 705 matches!(err, KnotProxyError::Upstream(s) if s.as_u16() == 302), 706 "got {err:?}", 707 ); 708 let received = secondary.received_requests().await.unwrap(); 709 assert!(received.is_empty(), "secondary must never be dialled"); 710 } 711 712 #[tokio::test] 713 async fn not_modified_passes_through() { 714 let server = server().await; 715 Mock::given(method("GET")) 716 .and(path("/xrpc/sh.tangled.repo.blob")) 717 .respond_with(ResponseTemplate::new(304).insert_header("etag", "\"v1\"")) 718 .mount(&server) 719 .await; 720 let proxy = proxy_for_test(); 721 let resp = proxy 722 .forward( 723 &host_of(&server), 724 &nsid("sh.tangled.repo.blob"), 725 &[], 726 HeaderMap::new(), 727 ) 728 .await 729 .expect("304 is a cache validator, not a redirect"); 730 assert_eq!(resp.status(), 304); 731 } 732 733 #[tokio::test] 734 async fn mid_stream_drop_records_breaker_failure() { 735 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); 736 let addr = listener.local_addr().unwrap(); 737 let server = tokio::spawn(async move { 738 async fn drop_after_partial(mut socket: tokio::net::TcpStream) { 739 let _ = socket 740 .write_all( 741 b"HTTP/1.1 200 OK\r\nContent-Length: 1024\r\nContent-Type: application/octet-stream\r\n\r\nabcd", 742 ) 743 .await; 744 drop(socket); 745 } 746 let admit = || async { 747 let (socket, _) = listener.accept().await.ok()?; 748 drop_after_partial(socket).await; 749 Some(()) 750 }; 751 admit().await; 752 admit().await; 753 }); 754 755 let host = KnotHost::parse(&format!("http://{addr}")).unwrap(); 756 let proxy = proxy_for_test(); 757 758 let r1 = proxy 759 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 760 .await 761 .expect("headers arrive even when body is truncated"); 762 assert_eq!(r1.status(), 200); 763 let _ = drain(r1.into_body_stream()).await; 764 765 let r2 = proxy 766 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 767 .await 768 .expect("second call still gets headers"); 769 let _ = drain(r2.into_body_stream()).await; 770 771 let r3 = proxy 772 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 773 .await; 774 assert!( 775 matches!(r3, Err(KnotProxyError::CircuitOpen)), 776 "two truncated streams must trip the breaker, got {r3:?}", 777 ); 778 server.abort(); 779 } 780}