Monorepo for Tangled tangled.org
10

Configure Feed

Select the types of activity you want to include in your feed.

at icy/kxpzqo 24 kB View raw
1use std::pin::Pin; 2use std::sync::Arc; 3use std::task::{Context, Poll}; 4use std::time::Duration; 5 6use bobbin_runtime::{ 7 BodyStream as InnerBodyStream, Clock, HttpRequest, HttpResponseHead, HttpTransport, 8 NetworkError, ReqwestHttp, RuntimeHasher, 9}; 10use bytes::Bytes; 11use futures::Stream; 12use http::{HeaderMap, StatusCode}; 13use jacquard_common::BosStr; 14use jacquard_common::types::nsid::Nsid; 15use reqwest::{Client, redirect::Policy}; 16use scc::HashMap as SccMap; 17use thiserror::Error; 18use url::Url; 19 20mod breaker; 21mod dns; 22mod host; 23 24pub use breaker::{Breaker, BreakerPermit, CircuitOpen, FailureThreshold, ThresholdError}; 25pub use host::{KnotHost, KnotHostError, PrivateHostReason, RepoSlug, RepoSlugError}; 26 27const USER_AGENT: &str = concat!("bobbin/", env!("CARGO_PKG_VERSION")); 28const HTTPS_SCHEME: &str = "https"; 29 30#[derive(Clone, Debug)] 31pub struct KnotProxyConfig { 32 pub failure_threshold: FailureThreshold, 33 pub cooldown: Duration, 34 pub allow_private_hosts: bool, 35 pub require_https: bool, 36} 37 38impl Default for KnotProxyConfig { 39 fn default() -> Self { 40 Self { 41 failure_threshold: FailureThreshold::new(5).expect("nonzero literal"), 42 cooldown: Duration::from_secs(30), 43 allow_private_hosts: false, 44 require_https: true, 45 } 46 } 47} 48 49#[derive(Clone, Copy, Debug)] 50pub struct KnotHttpConfig { 51 pub connect_timeout: Duration, 52 pub read_timeout: Duration, 53} 54 55impl Default for KnotHttpConfig { 56 fn default() -> Self { 57 Self { 58 connect_timeout: Duration::from_secs(5), 59 read_timeout: Duration::from_secs(60), 60 } 61 } 62} 63 64#[derive(Debug, Error)] 65pub enum KnotProxyError { 66 #[error("circuit breaker open")] 67 CircuitOpen, 68 #[error("blocked: host {host} resolves to {reason} address space")] 69 BlockedHost { 70 host: String, 71 reason: PrivateHostReason, 72 }, 73 #[error("blocked: knot {host} requires https, got plaintext http")] 74 PlaintextHttp { host: String }, 75 #[error("connect failed: {0}")] 76 Connect(String), 77 #[error("upstream read timed out: {0}")] 78 Timeout(String), 79 #[error("redirect refused: {0}")] 80 Redirect(String), 81 #[error("transport: {0}")] 82 Transport(String), 83 #[error("upstream returned status {0}")] 84 Upstream(StatusCode), 85} 86 87pub struct KnotProxy { 88 http: Arc<dyn HttpTransport>, 89 breakers: SccMap<KnotHost, Arc<Breaker>, RuntimeHasher>, 90 threshold: FailureThreshold, 91 cooldown: Duration, 92 allow_private_hosts: bool, 93 require_https: bool, 94 clock: Arc<dyn Clock>, 95} 96 97impl KnotProxy { 98 pub fn new( 99 config: KnotProxyConfig, 100 http: KnotHttpConfig, 101 clock: Arc<dyn Clock>, 102 hasher: RuntimeHasher, 103 ) -> Result<Self, reqwest::Error> { 104 let resolver = Arc::new(dns::PrivateAddressFilter::new(config.allow_private_hosts)); 105 let client = Client::builder() 106 .user_agent(USER_AGENT) 107 .connect_timeout(http.connect_timeout) 108 .read_timeout(http.read_timeout) 109 .redirect(Policy::none()) 110 .no_gzip() 111 .no_brotli() 112 .no_deflate() 113 .dns_resolver(resolver) 114 .build()?; 115 Ok(Self::with_transport( 116 ReqwestHttp::shared(client), 117 config, 118 clock, 119 hasher, 120 )) 121 } 122 123 pub fn with_transport( 124 http: Arc<dyn HttpTransport>, 125 config: KnotProxyConfig, 126 clock: Arc<dyn Clock>, 127 hasher: RuntimeHasher, 128 ) -> Self { 129 Self { 130 http, 131 breakers: SccMap::with_hasher(hasher), 132 threshold: config.failure_threshold, 133 cooldown: config.cooldown, 134 allow_private_hosts: config.allow_private_hosts, 135 require_https: config.require_https, 136 clock, 137 } 138 } 139 140 pub fn allows_private_hosts(&self) -> bool { 141 self.allow_private_hosts 142 } 143 144 pub fn requires_https(&self) -> bool { 145 self.require_https 146 } 147 148 pub async fn forward<S: BosStr + AsRef<str>>( 149 &self, 150 host: &KnotHost, 151 nsid: &Nsid<S>, 152 query: &[(&str, &str)], 153 headers: HeaderMap, 154 ) -> Result<ProxyResponse, KnotProxyError> { 155 self.guard_host(host)?; 156 let breaker = self.breaker_for(host).await; 157 let permit = breaker 158 .try_acquire() 159 .map_err(|_: CircuitOpen| KnotProxyError::CircuitOpen)?; 160 let url = build_xrpc_url(host, nsid, query); 161 let outcome = self.http.execute(HttpRequest { url, headers }).await; 162 classify(outcome, permit) 163 } 164 165 fn guard_host(&self, host: &KnotHost) -> Result<(), KnotProxyError> { 166 let host_str = || host.url().host_str().unwrap_or_default().to_owned(); 167 if self.require_https && host.url().scheme() != HTTPS_SCHEME { 168 return Err(KnotProxyError::PlaintextHttp { host: host_str() }); 169 } 170 if self.allow_private_hosts { 171 return Ok(()); 172 } 173 match host.private_literal_reason() { 174 None => Ok(()), 175 Some(reason) => Err(KnotProxyError::BlockedHost { 176 host: host_str(), 177 reason, 178 }), 179 } 180 } 181 182 async fn breaker_for(&self, host: &KnotHost) -> Arc<Breaker> { 183 if let Some(existing) = self.breakers.read_async(host, |_, v| Arc::clone(v)).await { 184 return existing; 185 } 186 let entry = self.breakers.entry_async(host.clone()).await; 187 Arc::clone( 188 entry 189 .or_insert_with(|| { 190 Arc::new(Breaker::new( 191 self.threshold, 192 self.cooldown, 193 self.clock.clone(), 194 )) 195 }) 196 .get(), 197 ) 198 } 199} 200 201pub struct ProxyResponse { 202 status: StatusCode, 203 headers: HeaderMap, 204 body: InnerBodyStream, 205 permit: BreakerPermit, 206} 207 208impl std::fmt::Debug for ProxyResponse { 209 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 210 f.debug_struct("ProxyResponse") 211 .field("status", &self.status) 212 .field("headers", &self.headers) 213 .finish_non_exhaustive() 214 } 215} 216 217impl ProxyResponse { 218 pub fn status(&self) -> StatusCode { 219 self.status 220 } 221 222 pub fn headers(&self) -> &HeaderMap { 223 &self.headers 224 } 225 226 pub fn into_body_stream(self) -> BodyStream { 227 BodyStream::new(self.body, self.permit) 228 } 229} 230 231pub struct BodyStream { 232 inner: InnerBodyStream, 233 permit: Option<BreakerPermit>, 234} 235 236impl BodyStream { 237 fn new(inner: InnerBodyStream, permit: BreakerPermit) -> Self { 238 Self { 239 inner, 240 permit: Some(permit), 241 } 242 } 243 244 fn resolve(&mut self, success: bool) { 245 if let Some(permit) = self.permit.take() { 246 if success { 247 permit.record_success(); 248 } else { 249 permit.record_failure(); 250 } 251 } 252 } 253} 254 255impl Stream for BodyStream { 256 type Item = Result<Bytes, NetworkError>; 257 258 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 259 let next = match self.inner.as_mut().poll_next(cx) { 260 Poll::Pending => return Poll::Pending, 261 Poll::Ready(item) => item, 262 }; 263 match &next { 264 Some(Ok(_)) => {} 265 Some(Err(_)) => self.resolve(false), 266 None => self.resolve(true), 267 } 268 Poll::Ready(next) 269 } 270} 271 272fn build_xrpc_url<S: BosStr + AsRef<str>>( 273 host: &KnotHost, 274 nsid: &Nsid<S>, 275 query: &[(&str, &str)], 276) -> Url { 277 let mut url = host.xrpc_url(nsid); 278 { 279 let mut pairs = url.query_pairs_mut(); 280 query.iter().for_each(|(k, v)| { 281 pairs.append_pair(k, v); 282 }); 283 } 284 url 285} 286 287fn classify( 288 outcome: Result<HttpResponseHead, NetworkError>, 289 permit: BreakerPermit, 290) -> Result<ProxyResponse, KnotProxyError> { 291 match outcome { 292 Ok(head) if is_upstream_failure(head.status) => { 293 let status = head.status; 294 permit.record_failure(); 295 Err(KnotProxyError::Upstream(status)) 296 } 297 Ok(head) => Ok(ProxyResponse { 298 status: head.status, 299 headers: head.headers, 300 body: head.body, 301 permit, 302 }), 303 Err(err) => { 304 permit.record_failure(); 305 Err(map_network(err)) 306 } 307 } 308} 309 310fn is_upstream_failure(status: StatusCode) -> bool { 311 status.is_server_error() || is_unfollowable_redirect(status) 312} 313 314fn is_unfollowable_redirect(status: StatusCode) -> bool { 315 matches!(status.as_u16(), 301 | 302 | 303 | 307 | 308) 316} 317 318fn map_network(err: NetworkError) -> KnotProxyError { 319 match err { 320 NetworkError::Timeout(msg) => KnotProxyError::Timeout(msg), 321 NetworkError::Connect(msg) => KnotProxyError::Connect(msg), 322 NetworkError::Redirect(msg) => KnotProxyError::Redirect(msg), 323 NetworkError::Transport(msg) | NetworkError::Body(msg) | NetworkError::Protocol(msg) => { 324 KnotProxyError::Transport(msg) 325 } 326 } 327} 328 329#[cfg(test)] 330mod tests { 331 use super::*; 332 use bobbin_runtime::SystemClock; 333 use futures::stream::TryStreamExt; 334 use jacquard_common::DefaultStr; 335 use tokio::io::AsyncWriteExt; 336 use wiremock::matchers::{method, path, query_param}; 337 use wiremock::{Mock, MockServer, ResponseTemplate}; 338 339 fn nsid(s: &'static str) -> Nsid<DefaultStr> { 340 Nsid::new_static(s).unwrap() 341 } 342 343 pub(crate) fn config_for_test() -> KnotProxyConfig { 344 KnotProxyConfig { 345 failure_threshold: FailureThreshold::new(2).unwrap(), 346 cooldown: Duration::from_millis(80), 347 allow_private_hosts: true, 348 require_https: false, 349 } 350 } 351 352 pub(crate) fn http_config_for_test() -> KnotHttpConfig { 353 KnotHttpConfig { 354 connect_timeout: Duration::from_millis(500), 355 read_timeout: Duration::from_secs(2), 356 } 357 } 358 359 fn proxy_for_test() -> KnotProxy { 360 KnotProxy::new( 361 config_for_test(), 362 http_config_for_test(), 363 Arc::new(SystemClock::new()), 364 RuntimeHasher::default(), 365 ) 366 .unwrap() 367 } 368 369 async fn server() -> MockServer { 370 MockServer::start().await 371 } 372 373 fn host_of(server: &MockServer) -> KnotHost { 374 KnotHost::parse(&server.uri()).unwrap() 375 } 376 377 pub(crate) async fn drain(stream: BodyStream) -> Result<Bytes, NetworkError> { 378 let chunks: Vec<Bytes> = stream.try_collect().await?; 379 let total: usize = chunks.iter().map(|b| b.len()).sum(); 380 let mut buf = bytes::BytesMut::with_capacity(total); 381 chunks.iter().for_each(|c| buf.extend_from_slice(c)); 382 Ok(buf.freeze()) 383 } 384 385 #[tokio::test] 386 async fn forwards_query_params_and_returns_body() { 387 let server = server().await; 388 Mock::given(method("GET")) 389 .and(path("/xrpc/sh.tangled.repo.blob")) 390 .and(query_param("repo", "did:plc:squid/barnacle")) 391 .and(query_param("ref", "main")) 392 .and(query_param("path", "README.md")) 393 .respond_with( 394 ResponseTemplate::new(200) 395 .insert_header("content-type", "application/json") 396 .set_body_string(r#"{"path":"README.md"}"#), 397 ) 398 .mount(&server) 399 .await; 400 401 let proxy = proxy_for_test(); 402 let resp = proxy 403 .forward( 404 &host_of(&server), 405 &nsid("sh.tangled.repo.blob"), 406 &[ 407 ("repo", "did:plc:squid/barnacle"), 408 ("ref", "main"), 409 ("path", "README.md"), 410 ], 411 HeaderMap::new(), 412 ) 413 .await 414 .expect("happy path"); 415 assert_eq!(resp.status(), 200); 416 let body = drain(resp.into_body_stream()).await.unwrap(); 417 assert_eq!(&body[..], br#"{"path":"README.md"}"#); 418 } 419 420 #[tokio::test] 421 async fn five_hundreds_open_breaker() { 422 let server = server().await; 423 Mock::given(method("GET")) 424 .and(path("/xrpc/sh.tangled.repo.blob")) 425 .respond_with(ResponseTemplate::new(503)) 426 .mount(&server) 427 .await; 428 429 let proxy = proxy_for_test(); 430 let host = host_of(&server); 431 let r1 = proxy 432 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 433 .await; 434 assert!(matches!(r1, Err(KnotProxyError::Upstream(_)))); 435 let r2 = proxy 436 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 437 .await; 438 assert!(matches!(r2, Err(KnotProxyError::Upstream(_)))); 439 let r3 = proxy 440 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 441 .await; 442 assert!(matches!(r3, Err(KnotProxyError::CircuitOpen))); 443 } 444 445 #[tokio::test] 446 async fn four_hundreds_do_not_open_breaker() { 447 let server = server().await; 448 Mock::given(method("GET")) 449 .and(path("/xrpc/sh.tangled.repo.blob")) 450 .respond_with(ResponseTemplate::new(404).set_body_string("not found")) 451 .mount(&server) 452 .await; 453 454 let proxy = proxy_for_test(); 455 let host = host_of(&server); 456 let r1 = proxy 457 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 458 .await; 459 assert_eq!(r1.unwrap().status(), 404); 460 let r2 = proxy 461 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 462 .await; 463 assert_eq!(r2.unwrap().status(), 404); 464 let r3 = proxy 465 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 466 .await; 467 assert_eq!( 468 r3.unwrap().status(), 469 404, 470 "client errors must not trip breaker", 471 ); 472 } 473 474 #[tokio::test] 475 async fn breaker_recovers_after_cooldown() { 476 let server = server().await; 477 Mock::given(method("GET")) 478 .and(path("/xrpc/sh.tangled.repo.blob")) 479 .respond_with(ResponseTemplate::new(503)) 480 .up_to_n_times(2) 481 .mount(&server) 482 .await; 483 Mock::given(method("GET")) 484 .and(path("/xrpc/sh.tangled.repo.blob")) 485 .respond_with( 486 ResponseTemplate::new(200) 487 .insert_header("content-type", "application/json") 488 .set_body_string("ok"), 489 ) 490 .mount(&server) 491 .await; 492 493 let proxy = proxy_for_test(); 494 let host = host_of(&server); 495 let _ = proxy 496 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 497 .await; 498 let _ = proxy 499 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 500 .await; 501 assert!(matches!( 502 proxy 503 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 504 .await, 505 Err(KnotProxyError::CircuitOpen), 506 )); 507 tokio::time::sleep(Duration::from_millis(120)).await; 508 let recovered = proxy 509 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 510 .await 511 .expect("must recover after cooldown"); 512 assert_eq!(recovered.status(), 200); 513 let body = drain(recovered.into_body_stream()).await.unwrap(); 514 assert_eq!(&body[..], b"ok"); 515 } 516 517 #[tokio::test] 518 async fn breakers_are_isolated_per_host() { 519 let bad = server().await; 520 let good = server().await; 521 Mock::given(method("GET")) 522 .and(path("/xrpc/sh.tangled.repo.blob")) 523 .respond_with(ResponseTemplate::new(503)) 524 .mount(&bad) 525 .await; 526 Mock::given(method("GET")) 527 .and(path("/xrpc/sh.tangled.repo.blob")) 528 .respond_with( 529 ResponseTemplate::new(200) 530 .insert_header("content-type", "application/json") 531 .set_body_string("ok"), 532 ) 533 .mount(&good) 534 .await; 535 536 let proxy = proxy_for_test(); 537 let bad_host = host_of(&bad); 538 let good_host = host_of(&good); 539 let _ = proxy 540 .forward( 541 &bad_host, 542 &nsid("sh.tangled.repo.blob"), 543 &[], 544 HeaderMap::new(), 545 ) 546 .await; 547 let _ = proxy 548 .forward( 549 &bad_host, 550 &nsid("sh.tangled.repo.blob"), 551 &[], 552 HeaderMap::new(), 553 ) 554 .await; 555 assert!(matches!( 556 proxy 557 .forward( 558 &bad_host, 559 &nsid("sh.tangled.repo.blob"), 560 &[], 561 HeaderMap::new() 562 ) 563 .await, 564 Err(KnotProxyError::CircuitOpen), 565 )); 566 let resp = proxy 567 .forward( 568 &good_host, 569 &nsid("sh.tangled.repo.blob"), 570 &[], 571 HeaderMap::new(), 572 ) 573 .await 574 .expect("healthy host stays open"); 575 assert_eq!(resp.status(), 200); 576 } 577 578 #[tokio::test] 579 async fn build_xrpc_url_appends_query() { 580 let host = KnotHost::parse("https://oyster.cafe").unwrap(); 581 let url = build_xrpc_url( 582 &host, 583 &nsid("sh.tangled.repo.tree"), 584 &[("repo", "did:plc:squid/barnacle"), ("ref", "main")], 585 ); 586 assert_eq!( 587 url.as_str(), 588 "https://oyster.cafe/xrpc/sh.tangled.repo.tree?repo=did%3Aplc%3Asquid%2Fbarnacle&ref=main", 589 ); 590 } 591 592 #[tokio::test] 593 async fn rejects_private_host_by_default() { 594 let server = server().await; 595 let strict = KnotProxyConfig { 596 allow_private_hosts: false, 597 ..config_for_test() 598 }; 599 let proxy = KnotProxy::new( 600 strict, 601 http_config_for_test(), 602 Arc::new(SystemClock::new()), 603 RuntimeHasher::default(), 604 ) 605 .unwrap(); 606 let err = proxy 607 .forward( 608 &host_of(&server), 609 &nsid("sh.tangled.repo.blob"), 610 &[], 611 HeaderMap::new(), 612 ) 613 .await 614 .expect_err("loopback must be blocked under strict config"); 615 assert!(matches!(err, KnotProxyError::BlockedHost { .. })); 616 } 617 618 #[tokio::test] 619 async fn rejects_plaintext_when_https_required() { 620 let server = server().await; 621 let strict = KnotProxyConfig { 622 require_https: true, 623 ..config_for_test() 624 }; 625 let proxy = KnotProxy::new( 626 strict, 627 http_config_for_test(), 628 Arc::new(SystemClock::new()), 629 RuntimeHasher::default(), 630 ) 631 .unwrap(); 632 let err = proxy 633 .forward( 634 &host_of(&server), 635 &nsid("sh.tangled.repo.blob"), 636 &[], 637 HeaderMap::new(), 638 ) 639 .await 640 .expect_err("plaintext http must be rejected under https-required"); 641 assert!( 642 matches!(err, KnotProxyError::PlaintextHttp { .. }), 643 "got {err:?}", 644 ); 645 } 646 647 #[tokio::test] 648 async fn transport_error_trips_breaker() { 649 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); 650 let addr = listener.local_addr().unwrap(); 651 drop(listener); 652 let dead = KnotHost::parse(&format!("http://{addr}")).unwrap(); 653 654 let proxy = proxy_for_test(); 655 let r1 = proxy 656 .forward(&dead, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 657 .await; 658 assert!( 659 r1.is_err(), 660 "transport must fail against closed port: {r1:?}" 661 ); 662 let r2 = proxy 663 .forward(&dead, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 664 .await; 665 assert!(r2.is_err(), "second transport must fail: {r2:?}"); 666 let r3 = proxy 667 .forward(&dead, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 668 .await; 669 assert!( 670 matches!(r3, Err(KnotProxyError::CircuitOpen)), 671 "transport failures must trip breaker, got {r3:?}", 672 ); 673 } 674 675 #[tokio::test] 676 async fn redirects_surface_as_upstream_failure() { 677 let primary = server().await; 678 let secondary = server().await; 679 Mock::given(method("GET")) 680 .and(path("/xrpc/sh.tangled.repo.blob")) 681 .respond_with( 682 ResponseTemplate::new(302) 683 .insert_header("location", &format!("{}/secret", secondary.uri())), 684 ) 685 .mount(&primary) 686 .await; 687 Mock::given(method("GET")) 688 .and(path("/secret")) 689 .respond_with(ResponseTemplate::new(200).set_body_string("leaked")) 690 .mount(&secondary) 691 .await; 692 693 let proxy = proxy_for_test(); 694 let err = proxy 695 .forward( 696 &host_of(&primary), 697 &nsid("sh.tangled.repo.blob"), 698 &[], 699 HeaderMap::new(), 700 ) 701 .await 702 .expect_err("302 must surface as upstream failure"); 703 assert!( 704 matches!(err, KnotProxyError::Upstream(s) if s.as_u16() == 302), 705 "got {err:?}", 706 ); 707 let received = secondary.received_requests().await.unwrap(); 708 assert!(received.is_empty(), "secondary must never be dialled"); 709 } 710 711 #[tokio::test] 712 async fn not_modified_passes_through() { 713 let server = server().await; 714 Mock::given(method("GET")) 715 .and(path("/xrpc/sh.tangled.repo.blob")) 716 .respond_with(ResponseTemplate::new(304).insert_header("etag", "\"v1\"")) 717 .mount(&server) 718 .await; 719 let proxy = proxy_for_test(); 720 let resp = proxy 721 .forward( 722 &host_of(&server), 723 &nsid("sh.tangled.repo.blob"), 724 &[], 725 HeaderMap::new(), 726 ) 727 .await 728 .expect("304 is a cache validator, not a redirect"); 729 assert_eq!(resp.status(), 304); 730 } 731 732 #[tokio::test] 733 async fn mid_stream_drop_records_breaker_failure() { 734 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); 735 let addr = listener.local_addr().unwrap(); 736 let server = tokio::spawn(async move { 737 async fn drop_after_partial(mut socket: tokio::net::TcpStream) { 738 let _ = socket 739 .write_all( 740 b"HTTP/1.1 200 OK\r\nContent-Length: 1024\r\nContent-Type: application/octet-stream\r\n\r\nabcd", 741 ) 742 .await; 743 drop(socket); 744 } 745 let admit = || async { 746 let (socket, _) = listener.accept().await.ok()?; 747 drop_after_partial(socket).await; 748 Some(()) 749 }; 750 admit().await; 751 admit().await; 752 }); 753 754 let host = KnotHost::parse(&format!("http://{addr}")).unwrap(); 755 let proxy = proxy_for_test(); 756 757 let r1 = proxy 758 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 759 .await 760 .expect("headers arrive even when body is truncated"); 761 assert_eq!(r1.status(), 200); 762 let _ = drain(r1.into_body_stream()).await; 763 764 let r2 = proxy 765 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 766 .await 767 .expect("second call still gets headers"); 768 let _ = drain(r2.into_body_stream()).await; 769 770 let r3 = proxy 771 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new()) 772 .await; 773 assert!( 774 matches!(r3, Err(KnotProxyError::CircuitOpen)), 775 "two truncated streams must trip the breaker, got {r3:?}", 776 ); 777 server.abort(); 778 } 779}