Monorepo for Tangled
tangled.org
1use std::future::Future;
2use std::net::SocketAddr;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use bytes::Bytes;
7use futures::stream::{Stream, StreamExt};
8use http::{HeaderMap, StatusCode};
9use thiserror::Error;
10use tokio_tungstenite::tungstenite::{
11 Bytes as WsBytes, Message as TungsteniteMessage, protocol::CloseFrame as TungsteniteClose,
12 protocol::frame::coding::CloseCode as TungsteniteCloseCode,
13};
14use url::Url;
15
16#[derive(Debug, Error)]
17pub enum NetworkError {
18 #[error("connect: {0}")]
19 Connect(String),
20 #[error("timeout: {0}")]
21 Timeout(String),
22 #[error("redirect: {0}")]
23 Redirect(String),
24 #[error("transport: {0}")]
25 Transport(String),
26 #[error("body: {0}")]
27 Body(String),
28 #[error("protocol: {0}")]
29 Protocol(String),
30}
31
32pub struct HttpRequest {
33 pub url: Url,
34 pub headers: HeaderMap,
35}
36
37pub type BodyStream = Pin<Box<dyn Stream<Item = Result<Bytes, NetworkError>> + Send + 'static>>;
38
39pub struct HttpResponseHead {
40 pub status: StatusCode,
41 pub headers: HeaderMap,
42 pub content_length: Option<u64>,
43 pub body: BodyStream,
44}
45
46pub type HttpResult = Result<HttpResponseHead, NetworkError>;
47pub type HttpResponseFuture = Pin<Box<dyn Future<Output = HttpResult> + Send + 'static>>;
48
49pub trait HttpTransport: Send + Sync + 'static {
50 fn execute(&self, request: HttpRequest) -> HttpResponseFuture;
51}
52
53#[derive(Clone, Debug)]
54pub struct ReqwestHttp {
55 client: reqwest::Client,
56}
57
58impl ReqwestHttp {
59 pub fn new(client: reqwest::Client) -> Self {
60 Self { client }
61 }
62
63 pub fn shared(client: reqwest::Client) -> Arc<dyn HttpTransport> {
64 Arc::new(Self::new(client))
65 }
66}
67
68impl HttpTransport for ReqwestHttp {
69 fn execute(&self, request: HttpRequest) -> HttpResponseFuture {
70 let client = self.client.clone();
71 Box::pin(async move {
72 let resp = client
73 .get(request.url)
74 .headers(request.headers)
75 .send()
76 .await
77 .map_err(map_reqwest)?;
78 let status = resp.status();
79 let headers = resp.headers().clone();
80 let content_length = resp.content_length();
81 let body: BodyStream = Box::pin(
82 resp.bytes_stream()
83 .map(|chunk| chunk.map_err(|e| NetworkError::Body(e.to_string()))),
84 );
85 Ok(HttpResponseHead {
86 status,
87 headers,
88 content_length,
89 body,
90 })
91 })
92 }
93}
94
95fn map_reqwest(err: reqwest::Error) -> NetworkError {
96 let msg = err.to_string();
97 if err.is_timeout() {
98 NetworkError::Timeout(msg)
99 } else if err.is_connect() {
100 NetworkError::Connect(msg)
101 } else if err.is_redirect() {
102 NetworkError::Redirect(msg)
103 } else {
104 NetworkError::Transport(msg)
105 }
106}
107
108#[derive(Clone, Debug)]
109pub enum WsMessage {
110 Text(String),
111 Binary(Bytes),
112 Ping(Bytes),
113 Pong(Bytes),
114 Close { code: u16, reason: String },
115}
116
117pub type WsSendFuture<'a> = Pin<Box<dyn Future<Output = Result<(), NetworkError>> + Send + 'a>>;
118pub type WsMessageFuture<'a> =
119 Pin<Box<dyn Future<Output = Option<Result<WsMessage, NetworkError>>> + Send + 'a>>;
120
121pub trait WsSink: Send + 'static {
122 fn send<'a>(&'a mut self, message: WsMessage) -> WsSendFuture<'a>;
123}
124
125pub trait WsStream: Send + 'static {
126 fn next<'a>(&'a mut self) -> WsMessageFuture<'a>;
127}
128
129pub struct WsConn {
130 pub sink: Box<dyn WsSink>,
131 pub stream: Box<dyn WsStream>,
132}
133
134pub type WsConnectFuture =
135 Pin<Box<dyn Future<Output = Result<WsConn, NetworkError>> + Send + 'static>>;
136
137pub trait WsTransport: Send + Sync + 'static {
138 fn connect(&self, url: Url) -> WsConnectFuture;
139}
140
141pub type AddrGuard = Arc<dyn Fn(&[SocketAddr]) -> Result<(), NetworkError> + Send + Sync>;
142
143#[derive(Clone, Copy, Debug, Default)]
144pub struct TungsteniteWs;
145
146impl TungsteniteWs {
147 pub fn shared() -> Arc<dyn WsTransport> {
148 Arc::new(Self)
149 }
150}
151
152impl WsTransport for TungsteniteWs {
153 fn connect(&self, url: Url) -> WsConnectFuture {
154 Box::pin(async move {
155 let url_str = url.as_str().to_owned();
156 let (ws, _resp) = tokio_tungstenite::connect_async(&url_str)
157 .await
158 .map_err(|e| NetworkError::Connect(e.to_string()))?;
159 let (sink_inner, stream_inner) = futures::StreamExt::split(ws);
160 let sink: Box<dyn WsSink> = Box::new(TungsteniteSink { inner: sink_inner });
161 let stream: Box<dyn WsStream> = Box::new(TungsteniteStream {
162 inner: stream_inner,
163 });
164 Ok(WsConn { sink, stream })
165 })
166 }
167}
168
169pub struct GuardedWs {
170 guard: AddrGuard,
171}
172
173impl GuardedWs {
174 pub fn shared(guard: AddrGuard) -> Arc<dyn WsTransport> {
175 Arc::new(Self { guard })
176 }
177}
178
179impl WsTransport for GuardedWs {
180 fn connect(&self, url: Url) -> WsConnectFuture {
181 let guard = self.guard.clone();
182 Box::pin(async move {
183 let host = url
184 .host_str()
185 .ok_or_else(|| NetworkError::Connect("ws url missing host".to_owned()))?
186 .to_owned();
187 let port = url
188 .port_or_known_default()
189 .ok_or_else(|| NetworkError::Connect("ws url missing port".to_owned()))?;
190 let addrs: Vec<SocketAddr> = tokio::net::lookup_host((host.as_str(), port))
191 .await
192 .map_err(|e| NetworkError::Connect(e.to_string()))?
193 .collect();
194 guard(&addrs)?;
195 let addr = addrs
196 .into_iter()
197 .next()
198 .ok_or_else(|| NetworkError::Connect(format!("no addresses for {host}")))?;
199 let tcp = tokio::net::TcpStream::connect(addr)
200 .await
201 .map_err(|e| NetworkError::Connect(e.to_string()))?;
202 let (ws, _resp) = tokio_tungstenite::client_async_tls(url.as_str(), tcp)
203 .await
204 .map_err(|e| NetworkError::Connect(e.to_string()))?;
205 let (sink_inner, stream_inner) = futures::StreamExt::split(ws);
206 let sink: Box<dyn WsSink> = Box::new(TungsteniteSink { inner: sink_inner });
207 let stream: Box<dyn WsStream> = Box::new(TungsteniteStream {
208 inner: stream_inner,
209 });
210 Ok(WsConn { sink, stream })
211 })
212 }
213}
214
215type TungsteniteWsStream =
216 tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
217
218struct TungsteniteSink {
219 inner: futures::stream::SplitSink<TungsteniteWsStream, TungsteniteMessage>,
220}
221
222impl WsSink for TungsteniteSink {
223 fn send<'a>(&'a mut self, message: WsMessage) -> WsSendFuture<'a> {
224 Box::pin(async move {
225 use futures::SinkExt;
226 self.inner
227 .send(message_to_tungstenite(message))
228 .await
229 .map_err(|e| NetworkError::Transport(e.to_string()))
230 })
231 }
232}
233
234struct TungsteniteStream {
235 inner: futures::stream::SplitStream<TungsteniteWsStream>,
236}
237
238impl WsStream for TungsteniteStream {
239 fn next<'a>(&'a mut self) -> WsMessageFuture<'a> {
240 Box::pin(async move {
241 let item = StreamExt::next(&mut self.inner).await?;
242 Some(
243 item.map_err(|e| NetworkError::Transport(e.to_string()))
244 .and_then(message_from_tungstenite),
245 )
246 })
247 }
248}
249
250fn message_to_tungstenite(message: WsMessage) -> TungsteniteMessage {
251 match message {
252 WsMessage::Text(text) => TungsteniteMessage::Text(text.into()),
253 WsMessage::Binary(bytes) => TungsteniteMessage::Binary(WsBytes::copy_from_slice(&bytes)),
254 WsMessage::Ping(bytes) => TungsteniteMessage::Ping(WsBytes::copy_from_slice(&bytes)),
255 WsMessage::Pong(bytes) => TungsteniteMessage::Pong(WsBytes::copy_from_slice(&bytes)),
256 WsMessage::Close { code, reason } => TungsteniteMessage::Close(Some(TungsteniteClose {
257 code: TungsteniteCloseCode::from(code),
258 reason: reason.into(),
259 })),
260 }
261}
262
263fn message_from_tungstenite(message: TungsteniteMessage) -> Result<WsMessage, NetworkError> {
264 match message {
265 TungsteniteMessage::Text(t) => Ok(WsMessage::Text(t.to_string())),
266 TungsteniteMessage::Binary(b) => Ok(WsMessage::Binary(Bytes::copy_from_slice(&b))),
267 TungsteniteMessage::Ping(b) => Ok(WsMessage::Ping(Bytes::copy_from_slice(&b))),
268 TungsteniteMessage::Pong(b) => Ok(WsMessage::Pong(Bytes::copy_from_slice(&b))),
269 TungsteniteMessage::Close(close) => {
270 let (code, reason) = close
271 .map(|c| (u16::from(c.code), c.reason.to_string()))
272 .unwrap_or((1000, String::new()));
273 Ok(WsMessage::Close { code, reason })
274 }
275 TungsteniteMessage::Frame(_) => Err(NetworkError::Protocol(
276 "tungstenite raw frame surfaced unexpectedly".to_owned(),
277 )),
278 }
279}