This repository has no description
1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4
5use bytes::Bytes;
6use futures::stream::{Stream, StreamExt};
7use http::{HeaderMap, StatusCode};
8use thiserror::Error;
9use tokio_tungstenite::tungstenite::{
10 Bytes as WsBytes, Message as TungsteniteMessage, protocol::CloseFrame as TungsteniteClose,
11 protocol::frame::coding::CloseCode as TungsteniteCloseCode,
12};
13use url::Url;
14
15#[derive(Debug, Error)]
16pub enum NetworkError {
17 #[error("connect: {0}")]
18 Connect(String),
19 #[error("timeout: {0}")]
20 Timeout(String),
21 #[error("redirect: {0}")]
22 Redirect(String),
23 #[error("transport: {0}")]
24 Transport(String),
25 #[error("body: {0}")]
26 Body(String),
27 #[error("protocol: {0}")]
28 Protocol(String),
29}
30
31pub struct HttpRequest {
32 pub url: Url,
33 pub headers: HeaderMap,
34}
35
36pub type BodyStream = Pin<Box<dyn Stream<Item = Result<Bytes, NetworkError>> + Send + 'static>>;
37
38pub struct HttpResponseHead {
39 pub status: StatusCode,
40 pub headers: HeaderMap,
41 pub content_length: Option<u64>,
42 pub body: BodyStream,
43}
44
45pub type HttpResult = Result<HttpResponseHead, NetworkError>;
46pub type HttpResponseFuture = Pin<Box<dyn Future<Output = HttpResult> + Send + 'static>>;
47
48pub trait HttpTransport: Send + Sync + 'static {
49 fn execute(&self, request: HttpRequest) -> HttpResponseFuture;
50}
51
52#[derive(Clone, Debug)]
53pub struct ReqwestHttp {
54 client: reqwest::Client,
55}
56
57impl ReqwestHttp {
58 pub fn new(client: reqwest::Client) -> Self {
59 Self { client }
60 }
61
62 pub fn shared(client: reqwest::Client) -> Arc<dyn HttpTransport> {
63 Arc::new(Self::new(client))
64 }
65}
66
67impl HttpTransport for ReqwestHttp {
68 fn execute(&self, request: HttpRequest) -> HttpResponseFuture {
69 let client = self.client.clone();
70 Box::pin(async move {
71 let resp = client
72 .get(request.url)
73 .headers(request.headers)
74 .send()
75 .await
76 .map_err(map_reqwest)?;
77 let status = resp.status();
78 let headers = resp.headers().clone();
79 let content_length = resp.content_length();
80 let body: BodyStream = Box::pin(
81 resp.bytes_stream()
82 .map(|chunk| chunk.map_err(|e| NetworkError::Body(e.to_string()))),
83 );
84 Ok(HttpResponseHead {
85 status,
86 headers,
87 content_length,
88 body,
89 })
90 })
91 }
92}
93
94fn map_reqwest(err: reqwest::Error) -> NetworkError {
95 let msg = err.to_string();
96 if err.is_timeout() {
97 NetworkError::Timeout(msg)
98 } else if err.is_connect() {
99 NetworkError::Connect(msg)
100 } else if err.is_redirect() {
101 NetworkError::Redirect(msg)
102 } else {
103 NetworkError::Transport(msg)
104 }
105}
106
107#[derive(Clone, Debug)]
108pub enum WsMessage {
109 Text(String),
110 Binary(Bytes),
111 Ping(Bytes),
112 Pong(Bytes),
113 Close { code: u16, reason: String },
114}
115
116pub type WsSendFuture<'a> = Pin<Box<dyn Future<Output = Result<(), NetworkError>> + Send + 'a>>;
117pub type WsMessageFuture<'a> =
118 Pin<Box<dyn Future<Output = Option<Result<WsMessage, NetworkError>>> + Send + 'a>>;
119
120pub trait WsSink: Send + 'static {
121 fn send<'a>(&'a mut self, message: WsMessage) -> WsSendFuture<'a>;
122}
123
124pub trait WsStream: Send + 'static {
125 fn next<'a>(&'a mut self) -> WsMessageFuture<'a>;
126}
127
128pub struct WsConn {
129 pub sink: Box<dyn WsSink>,
130 pub stream: Box<dyn WsStream>,
131}
132
133pub type WsConnectFuture =
134 Pin<Box<dyn Future<Output = Result<WsConn, NetworkError>> + Send + 'static>>;
135
136pub trait WsTransport: Send + Sync + 'static {
137 fn connect(&self, url: Url) -> WsConnectFuture;
138}
139
140#[derive(Clone, Copy, Debug, Default)]
141pub struct TungsteniteWs;
142
143impl TungsteniteWs {
144 pub fn shared() -> Arc<dyn WsTransport> {
145 Arc::new(Self)
146 }
147}
148
149impl WsTransport for TungsteniteWs {
150 fn connect(&self, url: Url) -> WsConnectFuture {
151 Box::pin(async move {
152 let url_str = url.as_str().to_owned();
153 let (ws, _resp) = tokio_tungstenite::connect_async(&url_str)
154 .await
155 .map_err(|e| NetworkError::Connect(e.to_string()))?;
156 let (sink_inner, stream_inner) = futures::StreamExt::split(ws);
157 let sink: Box<dyn WsSink> = Box::new(TungsteniteSink { inner: sink_inner });
158 let stream: Box<dyn WsStream> = Box::new(TungsteniteStream {
159 inner: stream_inner,
160 });
161 Ok(WsConn { sink, stream })
162 })
163 }
164}
165
166type TungsteniteWsStream =
167 tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
168
169struct TungsteniteSink {
170 inner: futures::stream::SplitSink<TungsteniteWsStream, TungsteniteMessage>,
171}
172
173impl WsSink for TungsteniteSink {
174 fn send<'a>(&'a mut self, message: WsMessage) -> WsSendFuture<'a> {
175 Box::pin(async move {
176 use futures::SinkExt;
177 self.inner
178 .send(message_to_tungstenite(message))
179 .await
180 .map_err(|e| NetworkError::Transport(e.to_string()))
181 })
182 }
183}
184
185struct TungsteniteStream {
186 inner: futures::stream::SplitStream<TungsteniteWsStream>,
187}
188
189impl WsStream for TungsteniteStream {
190 fn next<'a>(&'a mut self) -> WsMessageFuture<'a> {
191 Box::pin(async move {
192 let item = StreamExt::next(&mut self.inner).await?;
193 Some(
194 item.map_err(|e| NetworkError::Transport(e.to_string()))
195 .and_then(message_from_tungstenite),
196 )
197 })
198 }
199}
200
201fn message_to_tungstenite(message: WsMessage) -> TungsteniteMessage {
202 match message {
203 WsMessage::Text(text) => TungsteniteMessage::Text(text.into()),
204 WsMessage::Binary(bytes) => TungsteniteMessage::Binary(WsBytes::copy_from_slice(&bytes)),
205 WsMessage::Ping(bytes) => TungsteniteMessage::Ping(WsBytes::copy_from_slice(&bytes)),
206 WsMessage::Pong(bytes) => TungsteniteMessage::Pong(WsBytes::copy_from_slice(&bytes)),
207 WsMessage::Close { code, reason } => TungsteniteMessage::Close(Some(TungsteniteClose {
208 code: TungsteniteCloseCode::from(code),
209 reason: reason.into(),
210 })),
211 }
212}
213
214fn message_from_tungstenite(message: TungsteniteMessage) -> Result<WsMessage, NetworkError> {
215 match message {
216 TungsteniteMessage::Text(t) => Ok(WsMessage::Text(t.to_string())),
217 TungsteniteMessage::Binary(b) => Ok(WsMessage::Binary(Bytes::copy_from_slice(&b))),
218 TungsteniteMessage::Ping(b) => Ok(WsMessage::Ping(Bytes::copy_from_slice(&b))),
219 TungsteniteMessage::Pong(b) => Ok(WsMessage::Pong(Bytes::copy_from_slice(&b))),
220 TungsteniteMessage::Close(close) => {
221 let (code, reason) = close
222 .map(|c| (u16::from(c.code), c.reason.to_string()))
223 .unwrap_or((1000, String::new()));
224 Ok(WsMessage::Close { code, reason })
225 }
226 TungsteniteMessage::Frame(_) => Err(NetworkError::Protocol(
227 "tungstenite raw frame surfaced unexpectedly".to_owned(),
228 )),
229 }
230}