Monorepo for Tangled
tangled.org
1use std::pin::Pin;
2use std::sync::Arc;
3use std::task::{Context, Poll};
4use std::time::Duration;
5
6use bobbin_runtime::{
7 BodyStream as InnerBodyStream, Clock, HttpRequest, HttpResponseHead, HttpTransport,
8 NetworkError, ReqwestHttp, RuntimeHasher,
9};
10use bytes::Bytes;
11use futures::Stream;
12use http::{HeaderMap, StatusCode};
13use jacquard_common::BosStr;
14use jacquard_common::types::nsid::Nsid;
15use reqwest::{Client, redirect::Policy};
16use scc::HashMap as SccMap;
17use thiserror::Error;
18use url::Url;
19
20mod breaker;
21mod dns;
22mod host;
23
24pub use breaker::{Breaker, BreakerPermit, CircuitOpen, FailureThreshold, ThresholdError};
25pub use dns::PrivateAddressFilter;
26pub use host::{KnotHost, KnotHostError, PrivateHostReason, RepoSlug, RepoSlugError, classify_ip};
27
/// `User-Agent` value installed on the reqwest client: `bobbin/` followed by this crate's
/// Cargo package version, resolved at compile time.
const USER_AGENT: &str = concat!("bobbin/", env!("CARGO_PKG_VERSION"));
/// URL scheme a knot host must use when `require_https` is enabled.
const HTTPS_SCHEME: &str = "https";
30
/// Policy settings for [`KnotProxy`]: circuit-breaker tuning plus host safety checks.
#[derive(Clone, Debug)]
pub struct KnotProxyConfig {
    /// Failure threshold handed to every per-host [`Breaker`] the proxy creates.
    pub failure_threshold: FailureThreshold,
    /// Cooldown duration handed to every per-host [`Breaker`] the proxy creates.
    pub cooldown: Duration,
    /// When `true`, the proxy skips its private-address literal check; the flag is also
    /// passed to the [`PrivateAddressFilter`] DNS resolver built in `KnotProxy::new`.
    pub allow_private_hosts: bool,
    /// When `true`, hosts whose URL scheme is not `https` are rejected before any request
    /// is sent.
    pub require_https: bool,
}
38
39impl Default for KnotProxyConfig {
40 fn default() -> Self {
41 Self {
42 failure_threshold: FailureThreshold::new(5).expect("nonzero literal"),
43 cooldown: Duration::from_secs(30),
44 allow_private_hosts: false,
45 require_https: true,
46 }
47 }
48}
49
/// Timeouts applied to the reqwest client that `KnotProxy::new` builds.
#[derive(Clone, Copy, Debug)]
pub struct KnotHttpConfig {
    /// Passed to the client builder's `connect_timeout`.
    pub connect_timeout: Duration,
    /// Passed to the client builder's `read_timeout`.
    pub read_timeout: Duration,
}
55
56impl Default for KnotHttpConfig {
57 fn default() -> Self {
58 Self {
59 connect_timeout: Duration::from_secs(5),
60 read_timeout: Duration::from_secs(60),
61 }
62 }
63}
64
/// Everything that can go wrong while forwarding a request to a knot.
#[derive(Debug, Error)]
pub enum KnotProxyError {
    /// The host's circuit breaker refused to hand out a permit.
    #[error("circuit breaker open")]
    CircuitOpen,
    /// The host was classified as a private address literal and private hosts are not
    /// allowed by the proxy configuration.
    #[error("blocked: host {host} resolves to {reason} address space")]
    BlockedHost {
        host: String,
        reason: PrivateHostReason,
    },
    /// HTTPS is required by the proxy configuration but the host URL uses another scheme.
    #[error("blocked: knot {host} requires https, got plaintext http")]
    PlaintextHttp { host: String },
    /// Mapped from `NetworkError::Connect`.
    #[error("connect failed: {0}")]
    Connect(String),
    /// Mapped from `NetworkError::Timeout`.
    #[error("upstream read timed out: {0}")]
    Timeout(String),
    /// Mapped from `NetworkError::Redirect`.
    #[error("redirect refused: {0}")]
    Redirect(String),
    /// Mapped from `NetworkError::Transport`, `NetworkError::Body` and
    /// `NetworkError::Protocol`.
    #[error("transport: {0}")]
    Transport(String),
    /// The upstream answered with a 5xx status or one of the redirect statuses
    /// 301/302/303/307/308, which the proxy does not follow.
    #[error("upstream returned status {0}")]
    Upstream(StatusCode),
}
87
/// Forwards XRPC requests to knot hosts, guarding each host with its own circuit breaker.
pub struct KnotProxy {
    /// Transport used to execute every upstream request.
    http: Arc<dyn HttpTransport>,
    /// One breaker per host, created on first use.
    breakers: SccMap<KnotHost, Arc<Breaker>, RuntimeHasher>,
    /// Failure threshold given to each newly created breaker.
    threshold: FailureThreshold,
    /// Cooldown given to each newly created breaker.
    cooldown: Duration,
    /// When `true`, the private-address literal check is skipped.
    allow_private_hosts: bool,
    /// When `true`, hosts with a non-`https` scheme are rejected.
    require_https: bool,
    /// Clock shared with every breaker this proxy creates.
    clock: Arc<dyn Clock>,
}
97
98impl KnotProxy {
99 pub fn new(
100 config: KnotProxyConfig,
101 http: KnotHttpConfig,
102 clock: Arc<dyn Clock>,
103 hasher: RuntimeHasher,
104 ) -> Result<Self, reqwest::Error> {
105 let resolver = Arc::new(dns::PrivateAddressFilter::new(config.allow_private_hosts));
106 let client = Client::builder()
107 .user_agent(USER_AGENT)
108 .connect_timeout(http.connect_timeout)
109 .read_timeout(http.read_timeout)
110 .redirect(Policy::none())
111 .no_gzip()
112 .no_brotli()
113 .no_deflate()
114 .dns_resolver(resolver)
115 .build()?;
116 Ok(Self::with_transport(
117 ReqwestHttp::shared(client),
118 config,
119 clock,
120 hasher,
121 ))
122 }
123
124 pub fn with_transport(
125 http: Arc<dyn HttpTransport>,
126 config: KnotProxyConfig,
127 clock: Arc<dyn Clock>,
128 hasher: RuntimeHasher,
129 ) -> Self {
130 Self {
131 http,
132 breakers: SccMap::with_hasher(hasher),
133 threshold: config.failure_threshold,
134 cooldown: config.cooldown,
135 allow_private_hosts: config.allow_private_hosts,
136 require_https: config.require_https,
137 clock,
138 }
139 }
140
141 pub fn allows_private_hosts(&self) -> bool {
142 self.allow_private_hosts
143 }
144
145 pub fn requires_https(&self) -> bool {
146 self.require_https
147 }
148
149 pub async fn forward<S: BosStr + AsRef<str>>(
150 &self,
151 host: &KnotHost,
152 nsid: &Nsid<S>,
153 query: &[(&str, &str)],
154 headers: HeaderMap,
155 ) -> Result<ProxyResponse, KnotProxyError> {
156 self.guard_host(host)?;
157 let breaker = self.breaker_for(host).await;
158 let permit = breaker
159 .try_acquire()
160 .map_err(|_: CircuitOpen| KnotProxyError::CircuitOpen)?;
161 let url = build_xrpc_url(host, nsid, query);
162 let outcome = self.http.execute(HttpRequest { url, headers }).await;
163 classify(outcome, permit)
164 }
165
166 fn guard_host(&self, host: &KnotHost) -> Result<(), KnotProxyError> {
167 let host_str = || host.url().host_str().unwrap_or_default().to_owned();
168 if self.require_https && host.url().scheme() != HTTPS_SCHEME {
169 return Err(KnotProxyError::PlaintextHttp { host: host_str() });
170 }
171 if self.allow_private_hosts {
172 return Ok(());
173 }
174 match host.private_literal_reason() {
175 None => Ok(()),
176 Some(reason) => Err(KnotProxyError::BlockedHost {
177 host: host_str(),
178 reason,
179 }),
180 }
181 }
182
183 async fn breaker_for(&self, host: &KnotHost) -> Arc<Breaker> {
184 if let Some(existing) = self.breakers.read_async(host, |_, v| Arc::clone(v)).await {
185 return existing;
186 }
187 let entry = self.breakers.entry_async(host.clone()).await;
188 Arc::clone(
189 entry
190 .or_insert_with(|| {
191 Arc::new(Breaker::new(
192 self.threshold,
193 self.cooldown,
194 self.clock.clone(),
195 ))
196 })
197 .get(),
198 )
199 }
200}
201
/// Head of a forwarded response plus the not-yet-consumed body.
pub struct ProxyResponse {
    /// Status code reported by the upstream.
    status: StatusCode,
    /// Response headers reported by the upstream.
    headers: HeaderMap,
    /// Body stream as produced by the transport.
    body: InnerBodyStream,
    /// Breaker permit taken for this request; handed on to the [`BodyStream`] by
    /// `into_body_stream`.
    permit: BreakerPermit,
}
208
209impl std::fmt::Debug for ProxyResponse {
210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 f.debug_struct("ProxyResponse")
212 .field("status", &self.status)
213 .field("headers", &self.headers)
214 .finish_non_exhaustive()
215 }
216}
217
218impl ProxyResponse {
219 pub fn status(&self) -> StatusCode {
220 self.status
221 }
222
223 pub fn headers(&self) -> &HeaderMap {
224 &self.headers
225 }
226
227 pub fn into_body_stream(self) -> BodyStream {
228 BodyStream::new(self.body, self.permit)
229 }
230}
231
/// Response body stream that reports the final outcome of the transfer to the breaker.
pub struct BodyStream {
    /// Underlying body stream from the transport.
    inner: InnerBodyStream,
    /// Permit still awaiting an outcome; `None` once success or failure has been recorded.
    // NOTE(review): if the stream is dropped before it ends or errors, the permit is dropped
    // without an explicit outcome; what the breaker does then depends on `BreakerPermit`,
    // which is defined in the `breaker` module — confirm.
    permit: Option<BreakerPermit>,
}
236
237impl BodyStream {
238 fn new(inner: InnerBodyStream, permit: BreakerPermit) -> Self {
239 Self {
240 inner,
241 permit: Some(permit),
242 }
243 }
244
245 fn resolve(&mut self, success: bool) {
246 if let Some(permit) = self.permit.take() {
247 if success {
248 permit.record_success();
249 } else {
250 permit.record_failure();
251 }
252 }
253 }
254}
255
256impl Stream for BodyStream {
257 type Item = Result<Bytes, NetworkError>;
258
259 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
260 let next = match self.inner.as_mut().poll_next(cx) {
261 Poll::Pending => return Poll::Pending,
262 Poll::Ready(item) => item,
263 };
264 match &next {
265 Some(Ok(_)) => {}
266 Some(Err(_)) => self.resolve(false),
267 None => self.resolve(true),
268 }
269 Poll::Ready(next)
270 }
271}
272
273fn build_xrpc_url<S: BosStr + AsRef<str>>(
274 host: &KnotHost,
275 nsid: &Nsid<S>,
276 query: &[(&str, &str)],
277) -> Url {
278 let mut url = host.xrpc_url(nsid);
279 {
280 let mut pairs = url.query_pairs_mut();
281 query.iter().for_each(|(k, v)| {
282 pairs.append_pair(k, v);
283 });
284 }
285 url
286}
287
288fn classify(
289 outcome: Result<HttpResponseHead, NetworkError>,
290 permit: BreakerPermit,
291) -> Result<ProxyResponse, KnotProxyError> {
292 match outcome {
293 Ok(head) if is_upstream_failure(head.status) => {
294 let status = head.status;
295 permit.record_failure();
296 Err(KnotProxyError::Upstream(status))
297 }
298 Ok(head) => Ok(ProxyResponse {
299 status: head.status,
300 headers: head.headers,
301 body: head.body,
302 permit,
303 }),
304 Err(err) => {
305 permit.record_failure();
306 Err(map_network(err))
307 }
308 }
309}
310
311fn is_upstream_failure(status: StatusCode) -> bool {
312 status.is_server_error() || is_unfollowable_redirect(status)
313}
314
315fn is_unfollowable_redirect(status: StatusCode) -> bool {
316 matches!(status.as_u16(), 301 | 302 | 303 | 307 | 308)
317}
318
319fn map_network(err: NetworkError) -> KnotProxyError {
320 match err {
321 NetworkError::Timeout(msg) => KnotProxyError::Timeout(msg),
322 NetworkError::Connect(msg) => KnotProxyError::Connect(msg),
323 NetworkError::Redirect(msg) => KnotProxyError::Redirect(msg),
324 NetworkError::Transport(msg) | NetworkError::Body(msg) | NetworkError::Protocol(msg) => {
325 KnotProxyError::Transport(msg)
326 }
327 }
328}
329
330#[cfg(test)]
331mod tests {
332 use super::*;
333 use bobbin_runtime::SystemClock;
334 use futures::stream::TryStreamExt;
335 use jacquard_common::DefaultStr;
336 use tokio::io::AsyncWriteExt;
337 use wiremock::matchers::{method, path, query_param};
338 use wiremock::{Mock, MockServer, ResponseTemplate};
339
    /// Builds an NSID from a static string, panicking if `Nsid::new_static` rejects it.
    fn nsid(s: &'static str) -> Nsid<DefaultStr> {
        Nsid::new_static(s).unwrap()
    }
343
344 pub(crate) fn config_for_test() -> KnotProxyConfig {
345 KnotProxyConfig {
346 failure_threshold: FailureThreshold::new(2).unwrap(),
347 cooldown: Duration::from_millis(80),
348 allow_private_hosts: true,
349 require_https: false,
350 }
351 }
352
353 pub(crate) fn http_config_for_test() -> KnotHttpConfig {
354 KnotHttpConfig {
355 connect_timeout: Duration::from_millis(500),
356 read_timeout: Duration::from_secs(2),
357 }
358 }
359
360 fn proxy_for_test() -> KnotProxy {
361 KnotProxy::new(
362 config_for_test(),
363 http_config_for_test(),
364 Arc::new(SystemClock::new()),
365 RuntimeHasher::default(),
366 )
367 .unwrap()
368 }
369
    /// Starts a fresh wiremock server.
    async fn server() -> MockServer {
        MockServer::start().await
    }
373
374 fn host_of(server: &MockServer) -> KnotHost {
375 KnotHost::parse(&server.uri()).unwrap()
376 }
377
378 pub(crate) async fn drain(stream: BodyStream) -> Result<Bytes, NetworkError> {
379 let chunks: Vec<Bytes> = stream.try_collect().await?;
380 let total: usize = chunks.iter().map(|b| b.len()).sum();
381 let mut buf = bytes::BytesMut::with_capacity(total);
382 chunks.iter().for_each(|c| buf.extend_from_slice(c));
383 Ok(buf.freeze())
384 }
385
386 #[tokio::test]
387 async fn forwards_query_params_and_returns_body() {
388 let server = server().await;
389 Mock::given(method("GET"))
390 .and(path("/xrpc/sh.tangled.repo.blob"))
391 .and(query_param("repo", "did:plc:squid/barnacle"))
392 .and(query_param("ref", "main"))
393 .and(query_param("path", "README.md"))
394 .respond_with(
395 ResponseTemplate::new(200)
396 .insert_header("content-type", "application/json")
397 .set_body_string(r#"{"path":"README.md"}"#),
398 )
399 .mount(&server)
400 .await;
401
402 let proxy = proxy_for_test();
403 let resp = proxy
404 .forward(
405 &host_of(&server),
406 &nsid("sh.tangled.repo.blob"),
407 &[
408 ("repo", "did:plc:squid/barnacle"),
409 ("ref", "main"),
410 ("path", "README.md"),
411 ],
412 HeaderMap::new(),
413 )
414 .await
415 .expect("happy path");
416 assert_eq!(resp.status(), 200);
417 let body = drain(resp.into_body_stream()).await.unwrap();
418 assert_eq!(&body[..], br#"{"path":"README.md"}"#);
419 }
420
421 #[tokio::test]
422 async fn five_hundreds_open_breaker() {
423 let server = server().await;
424 Mock::given(method("GET"))
425 .and(path("/xrpc/sh.tangled.repo.blob"))
426 .respond_with(ResponseTemplate::new(503))
427 .mount(&server)
428 .await;
429
430 let proxy = proxy_for_test();
431 let host = host_of(&server);
432 let r1 = proxy
433 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new())
434 .await;
435 assert!(matches!(r1, Err(KnotProxyError::Upstream(_))));
436 let r2 = proxy
437 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new())
438 .await;
439 assert!(matches!(r2, Err(KnotProxyError::Upstream(_))));
440 let r3 = proxy
441 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new())
442 .await;
443 assert!(matches!(r3, Err(KnotProxyError::CircuitOpen)));
444 }
445
446 #[tokio::test]
447 async fn four_hundreds_do_not_open_breaker() {
448 let server = server().await;
449 Mock::given(method("GET"))
450 .and(path("/xrpc/sh.tangled.repo.blob"))
451 .respond_with(ResponseTemplate::new(404).set_body_string("not found"))
452 .mount(&server)
453 .await;
454
455 let proxy = proxy_for_test();
456 let host = host_of(&server);
457 let r1 = proxy
458 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new())
459 .await;
460 assert_eq!(r1.unwrap().status(), 404);
461 let r2 = proxy
462 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new())
463 .await;
464 assert_eq!(r2.unwrap().status(), 404);
465 let r3 = proxy
466 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new())
467 .await;
468 assert_eq!(
469 r3.unwrap().status(),
470 404,
471 "client errors must not trip breaker",
472 );
473 }
474
475 #[tokio::test]
476 async fn breaker_recovers_after_cooldown() {
477 let server = server().await;
478 Mock::given(method("GET"))
479 .and(path("/xrpc/sh.tangled.repo.blob"))
480 .respond_with(ResponseTemplate::new(503))
481 .up_to_n_times(2)
482 .mount(&server)
483 .await;
484 Mock::given(method("GET"))
485 .and(path("/xrpc/sh.tangled.repo.blob"))
486 .respond_with(
487 ResponseTemplate::new(200)
488 .insert_header("content-type", "application/json")
489 .set_body_string("ok"),
490 )
491 .mount(&server)
492 .await;
493
494 let proxy = proxy_for_test();
495 let host = host_of(&server);
496 let _ = proxy
497 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new())
498 .await;
499 let _ = proxy
500 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new())
501 .await;
502 assert!(matches!(
503 proxy
504 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new())
505 .await,
506 Err(KnotProxyError::CircuitOpen),
507 ));
508 tokio::time::sleep(Duration::from_millis(120)).await;
509 let recovered = proxy
510 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new())
511 .await
512 .expect("must recover after cooldown");
513 assert_eq!(recovered.status(), 200);
514 let body = drain(recovered.into_body_stream()).await.unwrap();
515 assert_eq!(&body[..], b"ok");
516 }
517
518 #[tokio::test]
519 async fn breakers_are_isolated_per_host() {
520 let bad = server().await;
521 let good = server().await;
522 Mock::given(method("GET"))
523 .and(path("/xrpc/sh.tangled.repo.blob"))
524 .respond_with(ResponseTemplate::new(503))
525 .mount(&bad)
526 .await;
527 Mock::given(method("GET"))
528 .and(path("/xrpc/sh.tangled.repo.blob"))
529 .respond_with(
530 ResponseTemplate::new(200)
531 .insert_header("content-type", "application/json")
532 .set_body_string("ok"),
533 )
534 .mount(&good)
535 .await;
536
537 let proxy = proxy_for_test();
538 let bad_host = host_of(&bad);
539 let good_host = host_of(&good);
540 let _ = proxy
541 .forward(
542 &bad_host,
543 &nsid("sh.tangled.repo.blob"),
544 &[],
545 HeaderMap::new(),
546 )
547 .await;
548 let _ = proxy
549 .forward(
550 &bad_host,
551 &nsid("sh.tangled.repo.blob"),
552 &[],
553 HeaderMap::new(),
554 )
555 .await;
556 assert!(matches!(
557 proxy
558 .forward(
559 &bad_host,
560 &nsid("sh.tangled.repo.blob"),
561 &[],
562 HeaderMap::new()
563 )
564 .await,
565 Err(KnotProxyError::CircuitOpen),
566 ));
567 let resp = proxy
568 .forward(
569 &good_host,
570 &nsid("sh.tangled.repo.blob"),
571 &[],
572 HeaderMap::new(),
573 )
574 .await
575 .expect("healthy host stays open");
576 assert_eq!(resp.status(), 200);
577 }
578
579 #[tokio::test]
580 async fn build_xrpc_url_appends_query() {
581 let host = KnotHost::parse("https://oyster.cafe").unwrap();
582 let url = build_xrpc_url(
583 &host,
584 &nsid("sh.tangled.repo.tree"),
585 &[("repo", "did:plc:squid/barnacle"), ("ref", "main")],
586 );
587 assert_eq!(
588 url.as_str(),
589 "https://oyster.cafe/xrpc/sh.tangled.repo.tree?repo=did%3Aplc%3Asquid%2Fbarnacle&ref=main",
590 );
591 }
592
593 #[tokio::test]
594 async fn rejects_private_host_by_default() {
595 let server = server().await;
596 let strict = KnotProxyConfig {
597 allow_private_hosts: false,
598 ..config_for_test()
599 };
600 let proxy = KnotProxy::new(
601 strict,
602 http_config_for_test(),
603 Arc::new(SystemClock::new()),
604 RuntimeHasher::default(),
605 )
606 .unwrap();
607 let err = proxy
608 .forward(
609 &host_of(&server),
610 &nsid("sh.tangled.repo.blob"),
611 &[],
612 HeaderMap::new(),
613 )
614 .await
615 .expect_err("loopback must be blocked under strict config");
616 assert!(matches!(err, KnotProxyError::BlockedHost { .. }));
617 }
618
619 #[tokio::test]
620 async fn rejects_plaintext_when_https_required() {
621 let server = server().await;
622 let strict = KnotProxyConfig {
623 require_https: true,
624 ..config_for_test()
625 };
626 let proxy = KnotProxy::new(
627 strict,
628 http_config_for_test(),
629 Arc::new(SystemClock::new()),
630 RuntimeHasher::default(),
631 )
632 .unwrap();
633 let err = proxy
634 .forward(
635 &host_of(&server),
636 &nsid("sh.tangled.repo.blob"),
637 &[],
638 HeaderMap::new(),
639 )
640 .await
641 .expect_err("plaintext http must be rejected under https-required");
642 assert!(
643 matches!(err, KnotProxyError::PlaintextHttp { .. }),
644 "got {err:?}",
645 );
646 }
647
648 #[tokio::test]
649 async fn transport_error_trips_breaker() {
650 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
651 let addr = listener.local_addr().unwrap();
652 drop(listener);
653 let dead = KnotHost::parse(&format!("http://{addr}")).unwrap();
654
655 let proxy = proxy_for_test();
656 let r1 = proxy
657 .forward(&dead, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new())
658 .await;
659 assert!(
660 r1.is_err(),
661 "transport must fail against closed port: {r1:?}"
662 );
663 let r2 = proxy
664 .forward(&dead, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new())
665 .await;
666 assert!(r2.is_err(), "second transport must fail: {r2:?}");
667 let r3 = proxy
668 .forward(&dead, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new())
669 .await;
670 assert!(
671 matches!(r3, Err(KnotProxyError::CircuitOpen)),
672 "transport failures must trip breaker, got {r3:?}",
673 );
674 }
675
676 #[tokio::test]
677 async fn redirects_surface_as_upstream_failure() {
678 let primary = server().await;
679 let secondary = server().await;
680 Mock::given(method("GET"))
681 .and(path("/xrpc/sh.tangled.repo.blob"))
682 .respond_with(
683 ResponseTemplate::new(302)
684 .insert_header("location", &format!("{}/secret", secondary.uri())),
685 )
686 .mount(&primary)
687 .await;
688 Mock::given(method("GET"))
689 .and(path("/secret"))
690 .respond_with(ResponseTemplate::new(200).set_body_string("leaked"))
691 .mount(&secondary)
692 .await;
693
694 let proxy = proxy_for_test();
695 let err = proxy
696 .forward(
697 &host_of(&primary),
698 &nsid("sh.tangled.repo.blob"),
699 &[],
700 HeaderMap::new(),
701 )
702 .await
703 .expect_err("302 must surface as upstream failure");
704 assert!(
705 matches!(err, KnotProxyError::Upstream(s) if s.as_u16() == 302),
706 "got {err:?}",
707 );
708 let received = secondary.received_requests().await.unwrap();
709 assert!(received.is_empty(), "secondary must never be dialled");
710 }
711
712 #[tokio::test]
713 async fn not_modified_passes_through() {
714 let server = server().await;
715 Mock::given(method("GET"))
716 .and(path("/xrpc/sh.tangled.repo.blob"))
717 .respond_with(ResponseTemplate::new(304).insert_header("etag", "\"v1\""))
718 .mount(&server)
719 .await;
720 let proxy = proxy_for_test();
721 let resp = proxy
722 .forward(
723 &host_of(&server),
724 &nsid("sh.tangled.repo.blob"),
725 &[],
726 HeaderMap::new(),
727 )
728 .await
729 .expect("304 is a cache validator, not a redirect");
730 assert_eq!(resp.status(), 304);
731 }
732
733 #[tokio::test]
734 async fn mid_stream_drop_records_breaker_failure() {
735 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
736 let addr = listener.local_addr().unwrap();
737 let server = tokio::spawn(async move {
738 async fn drop_after_partial(mut socket: tokio::net::TcpStream) {
739 let _ = socket
740 .write_all(
741 b"HTTP/1.1 200 OK\r\nContent-Length: 1024\r\nContent-Type: application/octet-stream\r\n\r\nabcd",
742 )
743 .await;
744 drop(socket);
745 }
746 let admit = || async {
747 let (socket, _) = listener.accept().await.ok()?;
748 drop_after_partial(socket).await;
749 Some(())
750 };
751 admit().await;
752 admit().await;
753 });
754
755 let host = KnotHost::parse(&format!("http://{addr}")).unwrap();
756 let proxy = proxy_for_test();
757
758 let r1 = proxy
759 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new())
760 .await
761 .expect("headers arrive even when body is truncated");
762 assert_eq!(r1.status(), 200);
763 let _ = drain(r1.into_body_stream()).await;
764
765 let r2 = proxy
766 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new())
767 .await
768 .expect("second call still gets headers");
769 let _ = drain(r2.into_body_stream()).await;
770
771 let r3 = proxy
772 .forward(&host, &nsid("sh.tangled.repo.blob"), &[], HeaderMap::new())
773 .await;
774 assert!(
775 matches!(r3, Err(KnotProxyError::CircuitOpen)),
776 "two truncated streams must trip the breaker, got {r3:?}",
777 );
778 server.abort();
779 }
780}