// Monorepo for Tangled
// tangled.org
1use std::pin::Pin;
2use std::sync::Arc;
3use std::task::{Context, Poll};
4use std::time::Duration;
5
6use bobbin_runtime::{
7 BodyStream as InnerBodyStream, Clock, HttpRequest, HttpResponseHead, HttpTransport,
8 NetworkError, ReqwestHttp, RuntimeHasher,
9};
10use bytes::Bytes;
11use futures::Stream;
12use http::{HeaderMap, StatusCode};
13use jacquard_common::BosStr;
14use jacquard_common::types::nsid::Nsid;
15use reqwest::{Client, redirect::Policy};
16use scc::HashMap as SccMap;
17use thiserror::Error;
18use url::Url;
19
20mod breaker;
21mod dns;
22mod host;
23
24pub use breaker::{Breaker, BreakerPermit, CircuitOpen, FailureThreshold, ThresholdError};
25pub use host::{KnotHost, KnotHostError, PrivateHostReason, RepoSlug, RepoSlugError};
26
27const USER_AGENT: &str = concat!("bobbin/", env!("CARGO_PKG_VERSION"));
28const HTTPS_SCHEME: &str = "https";
29
30#[derive(Clone, Debug)]
31pub struct KnotProxyConfig {
32 pub failure_threshold: FailureThreshold,
33 pub cooldown: Duration,
34 pub allow_private_hosts: bool,
35 pub require_https: bool,
36}
37
38impl Default for KnotProxyConfig {
39 fn default() -> Self {
40 Self {
41 failure_threshold: FailureThreshold::new(5).expect("nonzero literal"),
42 cooldown: Duration::from_secs(30),
43 allow_private_hosts: false,
44 require_https: true,
45 }
46 }
47}
48
/// Timeouts applied to the reqwest client that `KnotProxy::new` builds.
#[derive(Clone, Copy, Debug)]
pub struct KnotHttpConfig {
    /// Passed to the client builder's `connect_timeout`.
    pub connect_timeout: Duration,
    /// Passed to the client builder's `read_timeout`.
    pub read_timeout: Duration,
}

impl Default for KnotHttpConfig {
    /// Defaults to a 5 second connect timeout and a 60 second read timeout.
    fn default() -> Self {
        let connect_timeout = Duration::from_secs(5);
        let read_timeout = Duration::from_secs(60);
        Self {
            connect_timeout,
            read_timeout,
        }
    }
}
63
64#[derive(Debug, Error)]
65pub enum KnotProxyError {
66 #[error("circuit breaker open")]
67 CircuitOpen,
68 #[error("blocked: host {host} resolves to {reason} address space")]
69 BlockedHost {
70 host: String,
71 reason: PrivateHostReason,
72 },
73 #[error("blocked: knot {host} requires https, got plaintext http")]
74 PlaintextHttp { host: String },
75 #[error("connect failed: {0}")]
76 Connect(String),
77 #[error("upstream read timed out: {0}")]
78 Timeout(String),
79 #[error("redirect refused: {0}")]
80 Redirect(String),
81 #[error("transport: {0}")]
82 Transport(String),
83 #[error("upstream returned status {0}")]
84 Upstream(StatusCode),
85}
86
87pub struct KnotProxy {
88 http: Arc<dyn HttpTransport>,
89 breakers: SccMap<KnotHost, Arc<Breaker>, RuntimeHasher>,
90 threshold: FailureThreshold,
91 cooldown: Duration,
92 allow_private_hosts: bool,
93 require_https: bool,
94 clock: Arc<dyn Clock>,
95}
96
97impl KnotProxy {
98 pub fn new(
99 config: KnotProxyConfig,
100 http: KnotHttpConfig,
101 clock: Arc<dyn Clock>,
102 hasher: RuntimeHasher,
103 ) -> Result<Self, reqwest::Error> {
104 let resolver = Arc::new(dns::PrivateAddressFilter::new(config.allow_private_hosts));
105 let client = Client::builder()
106 .user_agent(USER_AGENT)
107 .connect_timeout(http.connect_timeout)
108 .read_timeout(http.read_timeout)
109 .redirect(Policy::none())
110 .no_gzip()
111 .no_brotli()
112 .no_deflate()
113 .dns_resolver(resolver)
114 .build()?;
115 Ok(Self::with_transport(
116 ReqwestHttp::shared(client),
117 config,
118 clock,
119 hasher,
120 ))
121 }
122
123 pub fn with_transport(
124 http: Arc<dyn HttpTransport>,
125 config: KnotProxyConfig,
126 clock: Arc<dyn Clock>,
127 hasher: RuntimeHasher,
128 ) -> Self {
129 Self {
130 http,
131 breakers: SccMap::with_hasher(hasher),
132 threshold: config.failure_threshold,
133 cooldown: config.cooldown,
134 allow_private_hosts: config.allow_private_hosts,
135 require_https: config.require_https,
136 clock,
137 }
138 }
139
140 pub fn allows_private_hosts(&self) -> bool {
141 self.allow_private_hosts
142 }
143
144 pub fn requires_https(&self) -> bool {
145 self.require_https
146 }
147
148 pub async fn forward<S: BosStr + AsRef<str>>(
149 &self,
150 host: &KnotHost,
151 nsid: &Nsid<S>,
152 query: &[(&str, &str)],
153 headers: HeaderMap,
154 ) -> Result<ProxyResponse, KnotProxyError> {
155 self.guard_host(host)?;
156 let breaker = self.breaker_for(host).await;
157 let permit = breaker
158 .try_acquire()
159 .map_err(|_: CircuitOpen| KnotProxyError::CircuitOpen)?;
160 let url = build_xrpc_url(host, nsid, query);
161 let outcome = self.http.execute(HttpRequest { url, headers }).await;
162 classify(outcome, permit)
163 }
164
165 fn guard_host(&self, host: &KnotHost) -> Result<(), KnotProxyError> {
166 let host_str = || host.url().host_str().unwrap_or_default().to_owned();
167 if self.require_https && host.url().scheme() != HTTPS_SCHEME {
168 return Err(KnotProxyError::PlaintextHttp { host: host_str() });
169 }
170 if self.allow_private_hosts {
171 return Ok(());
172 }
173 match host.private_literal_reason() {
174 None => Ok(()),
175 Some(reason) => Err(KnotProxyError::BlockedHost {
176 host: host_str(),
177 reason,
178 }),
179 }
180 }
181
182 async fn breaker_for(&self, host: &KnotHost) -> Arc<Breaker> {
183 if let Some(existing) = self.breakers.read_async(host, |_, v| Arc::clone(v)).await {
184 return existing;
185 }
186 let entry = self.breakers.entry_async(host.clone()).await;
187 Arc::clone(
188 entry
189 .or_insert_with(|| {
190 Arc::new(Breaker::new(
191 self.threshold,
192 self.cooldown,
193 self.clock.clone(),
194 ))
195 })
196 .get(),
197 )
198 }
199}
200
201pub struct ProxyResponse {
202 status: StatusCode,
203 headers: HeaderMap,
204 body: InnerBodyStream,
205 permit: BreakerPermit,
206}
207
208impl std::fmt::Debug for ProxyResponse {
209 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210 f.debug_struct("ProxyResponse")
211 .field("status", &self.status)
212 .field("headers", &self.headers)
213 .finish_non_exhaustive()
214 }
215}
216
217impl ProxyResponse {
218 pub fn status(&self) -> StatusCode {
219 self.status
220 }
221
222 pub fn headers(&self) -> &HeaderMap {
223 &self.headers
224 }
225
226 pub fn into_body_stream(self) -> BodyStream {
227 BodyStream::new(self.body, self.permit)
228 }
229}
230
231pub struct BodyStream {
232 inner: InnerBodyStream,
233 permit: Option<BreakerPermit>,
234}
235
236impl BodyStream {
237 fn new(inner: InnerBodyStream, permit: BreakerPermit) -> Self {
238 Self {
239 inner,
240 permit: Some(permit),
241 }
242 }
243
244 fn resolve(&mut self, success: bool) {
245 if let Some(permit) = self.permit.take() {
246 if success {
247 permit.record_success();
248 } else {
249 permit.record_failure();
250 }
251 }
252 }
253}
254
255impl Stream for BodyStream {
256 type Item = Result<Bytes, NetworkError>;
257
258 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
259 let next = match self.inner.as_mut().poll_next(cx) {
260 Poll::Pending => return Poll::Pending,
261 Poll::Ready(item) => item,
262 };
263 match &next {
264 Some(Ok(_)) => {}
265 Some(Err(_)) => self.resolve(false),
266 None => self.resolve(true),
267 }
268 Poll::Ready(next)
269 }
270}
271
272fn build_xrpc_url<S: BosStr + AsRef<str>>(
273 host: &KnotHost,
274 nsid: &Nsid<S>,
275 query: &[(&str, &str)],
276) -> Url {
277 let mut url = host.xrpc_url(nsid);
278 {
279 let mut pairs = url.query_pairs_mut();
280 query.iter().for_each(|(k, v)| {
281 pairs.append_pair(k, v);
282 });
283 }
284 url
285}
286
287fn classify(
288 outcome: Result<HttpResponseHead, NetworkError>,
289 permit: BreakerPermit,
290) -> Result<ProxyResponse, KnotProxyError> {
291 match outcome {
292 Ok(head) if is_upstream_failure(head.status) => {
293 let status = head.status;
294 permit.record_failure();
295 Err(KnotProxyError::Upstream(status))
296 }
297 Ok(head) => Ok(ProxyResponse {
298 status: head.status,
299 headers: head.headers,
300 body: head.body,
301 permit,
302 }),
303 Err(err) => {
304 permit.record_failure();
305 Err(map_network(err))
306 }
307 }
308}
309
310fn is_upstream_failure(status: StatusCode) -> bool {
311 status.is_server_error() || is_unfollowable_redirect(status)
312}
313
314fn is_unfollowable_redirect(status: StatusCode) -> bool {
315 matches!(status.as_u16(), 301 | 302 | 303 | 307 | 308)
316}
317
318fn map_network(err: NetworkError) -> KnotProxyError {
319 match err {
320 NetworkError::Timeout(msg) => KnotProxyError::Timeout(msg),
321 NetworkError::Connect(msg) => KnotProxyError::Connect(msg),
322 NetworkError::Redirect(msg) => KnotProxyError::Redirect(msg),
323 NetworkError::Transport(msg) | NetworkError::Body(msg) | NetworkError::Protocol(msg) => {
324 KnotProxyError::Transport(msg)
325 }
326 }
327}
328
#[cfg(test)]
mod tests {
    use super::*;
    use bobbin_runtime::SystemClock;
    use futures::stream::TryStreamExt;
    use jacquard_common::DefaultStr;
    use tokio::io::AsyncWriteExt;
    use wiremock::matchers::{method, path, query_param};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// NSID requested by most tests.
    const BLOB_NSID: &str = "sh.tangled.repo.blob";
    /// Path the mock knot serves for `BLOB_NSID`.
    const BLOB_PATH: &str = "/xrpc/sh.tangled.repo.blob";

    fn nsid(s: &'static str) -> Nsid<DefaultStr> {
        Nsid::new_static(s).unwrap()
    }

    /// Lenient config: threshold 2, 80 ms cooldown, private hosts and plain HTTP
    /// allowed so the loopback mock servers are reachable.
    pub(crate) fn config_for_test() -> KnotProxyConfig {
        KnotProxyConfig {
            failure_threshold: FailureThreshold::new(2).unwrap(),
            cooldown: Duration::from_millis(80),
            allow_private_hosts: true,
            require_https: false,
        }
    }

    pub(crate) fn http_config_for_test() -> KnotHttpConfig {
        KnotHttpConfig {
            connect_timeout: Duration::from_millis(500),
            read_timeout: Duration::from_secs(2),
        }
    }

    /// Proxy using `config` with the test HTTP timeouts and the system clock.
    fn proxy_with(config: KnotProxyConfig) -> KnotProxy {
        KnotProxy::new(
            config,
            http_config_for_test(),
            Arc::new(SystemClock::new()),
            RuntimeHasher::default(),
        )
        .unwrap()
    }

    fn proxy_for_test() -> KnotProxy {
        proxy_with(config_for_test())
    }

    async fn server() -> MockServer {
        MockServer::start().await
    }

    fn host_of(server: &MockServer) -> KnotHost {
        KnotHost::parse(&server.uri()).unwrap()
    }

    /// Issues the blob request with no query parameters and no extra headers.
    async fn fetch_blob(
        proxy: &KnotProxy,
        host: &KnotHost,
    ) -> Result<ProxyResponse, KnotProxyError> {
        proxy
            .forward(host, &nsid(BLOB_NSID), &[], HeaderMap::new())
            .await
    }

    /// Mounts a GET handler for the blob path that answers with `reply`.
    async fn mount_blob(server: &MockServer, reply: ResponseTemplate) {
        Mock::given(method("GET"))
            .and(path(BLOB_PATH))
            .respond_with(reply)
            .mount(server)
            .await;
    }

    /// 200 response with a JSON content type and the body `ok`.
    fn ok_reply() -> ResponseTemplate {
        ResponseTemplate::new(200)
            .insert_header("content-type", "application/json")
            .set_body_string("ok")
    }

    /// Collects the whole stream into one contiguous buffer.
    pub(crate) async fn drain(stream: BodyStream) -> Result<Bytes, NetworkError> {
        let chunks: Vec<Bytes> = stream.try_collect().await?;
        let total: usize = chunks.iter().map(Bytes::len).sum();
        let mut buf = bytes::BytesMut::with_capacity(total);
        for chunk in &chunks {
            buf.extend_from_slice(chunk);
        }
        Ok(buf.freeze())
    }

    #[tokio::test]
    async fn forwards_query_params_and_returns_body() {
        let server = server().await;
        let reply = ResponseTemplate::new(200)
            .insert_header("content-type", "application/json")
            .set_body_string(r#"{"path":"README.md"}"#);
        Mock::given(method("GET"))
            .and(path(BLOB_PATH))
            .and(query_param("repo", "did:plc:squid/barnacle"))
            .and(query_param("ref", "main"))
            .and(query_param("path", "README.md"))
            .respond_with(reply)
            .mount(&server)
            .await;

        let proxy = proxy_for_test();
        let params = [
            ("repo", "did:plc:squid/barnacle"),
            ("ref", "main"),
            ("path", "README.md"),
        ];
        let resp = proxy
            .forward(
                &host_of(&server),
                &nsid(BLOB_NSID),
                &params,
                HeaderMap::new(),
            )
            .await
            .expect("happy path");
        assert_eq!(resp.status(), 200);
        let body = drain(resp.into_body_stream()).await.unwrap();
        assert_eq!(&body[..], br#"{"path":"README.md"}"#);
    }

    #[tokio::test]
    async fn five_hundreds_open_breaker() {
        let server = server().await;
        mount_blob(&server, ResponseTemplate::new(503)).await;

        let proxy = proxy_for_test();
        let host = host_of(&server);
        // Threshold is 2: two upstream failures, then the breaker refuses.
        for _ in 0..2 {
            let attempt = fetch_blob(&proxy, &host).await;
            assert!(matches!(attempt, Err(KnotProxyError::Upstream(_))));
        }
        let r3 = fetch_blob(&proxy, &host).await;
        assert!(matches!(r3, Err(KnotProxyError::CircuitOpen)));
    }

    #[tokio::test]
    async fn four_hundreds_do_not_open_breaker() {
        let server = server().await;
        mount_blob(
            &server,
            ResponseTemplate::new(404).set_body_string("not found"),
        )
        .await;

        let proxy = proxy_for_test();
        let host = host_of(&server);
        for _ in 0..2 {
            let attempt = fetch_blob(&proxy, &host).await;
            assert_eq!(attempt.unwrap().status(), 404);
        }
        let r3 = fetch_blob(&proxy, &host).await;
        assert_eq!(
            r3.unwrap().status(),
            404,
            "client errors must not trip breaker",
        );
    }

    #[tokio::test]
    async fn breaker_recovers_after_cooldown() {
        let server = server().await;
        // The first two requests get 503; everything after falls through to the 200.
        Mock::given(method("GET"))
            .and(path(BLOB_PATH))
            .respond_with(ResponseTemplate::new(503))
            .up_to_n_times(2)
            .mount(&server)
            .await;
        mount_blob(&server, ok_reply()).await;

        let proxy = proxy_for_test();
        let host = host_of(&server);
        for _ in 0..2 {
            let _ = fetch_blob(&proxy, &host).await;
        }
        assert!(matches!(
            fetch_blob(&proxy, &host).await,
            Err(KnotProxyError::CircuitOpen),
        ));
        tokio::time::sleep(Duration::from_millis(120)).await;
        let recovered = fetch_blob(&proxy, &host)
            .await
            .expect("must recover after cooldown");
        assert_eq!(recovered.status(), 200);
        let body = drain(recovered.into_body_stream()).await.unwrap();
        assert_eq!(&body[..], b"ok");
    }

    #[tokio::test]
    async fn breakers_are_isolated_per_host() {
        let bad = server().await;
        let good = server().await;
        mount_blob(&bad, ResponseTemplate::new(503)).await;
        mount_blob(&good, ok_reply()).await;

        let proxy = proxy_for_test();
        let bad_host = host_of(&bad);
        let good_host = host_of(&good);
        for _ in 0..2 {
            let _ = fetch_blob(&proxy, &bad_host).await;
        }
        assert!(matches!(
            fetch_blob(&proxy, &bad_host).await,
            Err(KnotProxyError::CircuitOpen),
        ));
        let resp = fetch_blob(&proxy, &good_host)
            .await
            .expect("healthy host stays open");
        assert_eq!(resp.status(), 200);
    }

    #[tokio::test]
    async fn build_xrpc_url_appends_query() {
        let host = KnotHost::parse("https://oyster.cafe").unwrap();
        let params = [("repo", "did:plc:squid/barnacle"), ("ref", "main")];
        let url = build_xrpc_url(&host, &nsid("sh.tangled.repo.tree"), &params);
        assert_eq!(
            url.as_str(),
            "https://oyster.cafe/xrpc/sh.tangled.repo.tree?repo=did%3Aplc%3Asquid%2Fbarnacle&ref=main",
        );
    }

    #[tokio::test]
    async fn rejects_private_host_by_default() {
        let server = server().await;
        let proxy = proxy_with(KnotProxyConfig {
            allow_private_hosts: false,
            ..config_for_test()
        });
        let err = fetch_blob(&proxy, &host_of(&server))
            .await
            .expect_err("loopback must be blocked under strict config");
        assert!(matches!(err, KnotProxyError::BlockedHost { .. }));
    }

    #[tokio::test]
    async fn rejects_plaintext_when_https_required() {
        let server = server().await;
        let proxy = proxy_with(KnotProxyConfig {
            require_https: true,
            ..config_for_test()
        });
        let err = fetch_blob(&proxy, &host_of(&server))
            .await
            .expect_err("plaintext http must be rejected under https-required");
        assert!(
            matches!(err, KnotProxyError::PlaintextHttp { .. }),
            "got {err:?}",
        );
    }

    #[tokio::test]
    async fn transport_error_trips_breaker() {
        // Bind to grab a free port, then drop the listener before any request is sent.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        drop(listener);
        let dead = KnotHost::parse(&format!("http://{addr}")).unwrap();

        let proxy = proxy_for_test();
        let r1 = fetch_blob(&proxy, &dead).await;
        assert!(
            r1.is_err(),
            "transport must fail against closed port: {r1:?}"
        );
        let r2 = fetch_blob(&proxy, &dead).await;
        assert!(r2.is_err(), "second transport must fail: {r2:?}");
        let r3 = fetch_blob(&proxy, &dead).await;
        assert!(
            matches!(r3, Err(KnotProxyError::CircuitOpen)),
            "transport failures must trip breaker, got {r3:?}",
        );
    }

    #[tokio::test]
    async fn redirects_surface_as_upstream_failure() {
        let primary = server().await;
        let secondary = server().await;
        mount_blob(
            &primary,
            ResponseTemplate::new(302)
                .insert_header("location", &format!("{}/secret", secondary.uri())),
        )
        .await;
        Mock::given(method("GET"))
            .and(path("/secret"))
            .respond_with(ResponseTemplate::new(200).set_body_string("leaked"))
            .mount(&secondary)
            .await;

        let proxy = proxy_for_test();
        let err = fetch_blob(&proxy, &host_of(&primary))
            .await
            .expect_err("302 must surface as upstream failure");
        assert!(
            matches!(err, KnotProxyError::Upstream(s) if s.as_u16() == 302),
            "got {err:?}",
        );
        let received = secondary.received_requests().await.unwrap();
        assert!(received.is_empty(), "secondary must never be dialled");
    }

    #[tokio::test]
    async fn not_modified_passes_through() {
        let server = server().await;
        mount_blob(
            &server,
            ResponseTemplate::new(304).insert_header("etag", "\"v1\""),
        )
        .await;
        let proxy = proxy_for_test();
        let resp = fetch_blob(&proxy, &host_of(&server))
            .await
            .expect("304 is a cache validator, not a redirect");
        assert_eq!(resp.status(), 304);
    }

    #[tokio::test]
    async fn mid_stream_drop_records_breaker_failure() {
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let server = tokio::spawn(async move {
            // Promise 1024 body bytes, send four, then close the connection.
            async fn drop_after_partial(mut socket: tokio::net::TcpStream) {
                let _ = socket
                    .write_all(
                        b"HTTP/1.1 200 OK\r\nContent-Length: 1024\r\nContent-Type: application/octet-stream\r\n\r\nabcd",
                    )
                    .await;
                drop(socket);
            }
            // Serve exactly two connections; a failed accept is skipped.
            for _ in 0..2 {
                if let Ok((socket, _)) = listener.accept().await {
                    drop_after_partial(socket).await;
                }
            }
        });

        let host = KnotHost::parse(&format!("http://{addr}")).unwrap();
        let proxy = proxy_for_test();

        let r1 = fetch_blob(&proxy, &host)
            .await
            .expect("headers arrive even when body is truncated");
        assert_eq!(r1.status(), 200);
        let _ = drain(r1.into_body_stream()).await;

        let r2 = fetch_blob(&proxy, &host)
            .await
            .expect("second call still gets headers");
        let _ = drain(r2.into_body_stream()).await;

        let r3 = fetch_blob(&proxy, &host).await;
        assert!(
            matches!(r3, Err(KnotProxyError::CircuitOpen)),
            "two truncated streams must trip the breaker, got {r3:?}",
        );
        server.abort();
    }
}