Monorepo for Tangled tangled.org
9

Configure Feed

Select the types of activity you want to include in your feed.

at master 9.0 kB View raw
1use std::future::Future; 2use std::net::SocketAddr; 3use std::pin::Pin; 4use std::sync::Arc; 5 6use bytes::Bytes; 7use futures::stream::{Stream, StreamExt}; 8use http::{HeaderMap, StatusCode}; 9use thiserror::Error; 10use tokio_tungstenite::tungstenite::{ 11 Bytes as WsBytes, Message as TungsteniteMessage, protocol::CloseFrame as TungsteniteClose, 12 protocol::frame::coding::CloseCode as TungsteniteCloseCode, 13}; 14use url::Url; 15 16#[derive(Debug, Error)] 17pub enum NetworkError { 18 #[error("connect: {0}")] 19 Connect(String), 20 #[error("timeout: {0}")] 21 Timeout(String), 22 #[error("redirect: {0}")] 23 Redirect(String), 24 #[error("transport: {0}")] 25 Transport(String), 26 #[error("body: {0}")] 27 Body(String), 28 #[error("protocol: {0}")] 29 Protocol(String), 30} 31 32pub struct HttpRequest { 33 pub url: Url, 34 pub headers: HeaderMap, 35} 36 37pub type BodyStream = Pin<Box<dyn Stream<Item = Result<Bytes, NetworkError>> + Send + 'static>>; 38 39pub struct HttpResponseHead { 40 pub status: StatusCode, 41 pub headers: HeaderMap, 42 pub content_length: Option<u64>, 43 pub body: BodyStream, 44} 45 46pub type HttpResult = Result<HttpResponseHead, NetworkError>; 47pub type HttpResponseFuture = Pin<Box<dyn Future<Output = HttpResult> + Send + 'static>>; 48 49pub trait HttpTransport: Send + Sync + 'static { 50 fn execute(&self, request: HttpRequest) -> HttpResponseFuture; 51} 52 53#[derive(Clone, Debug)] 54pub struct ReqwestHttp { 55 client: reqwest::Client, 56} 57 58impl ReqwestHttp { 59 pub fn new(client: reqwest::Client) -> Self { 60 Self { client } 61 } 62 63 pub fn shared(client: reqwest::Client) -> Arc<dyn HttpTransport> { 64 Arc::new(Self::new(client)) 65 } 66} 67 68impl HttpTransport for ReqwestHttp { 69 fn execute(&self, request: HttpRequest) -> HttpResponseFuture { 70 let client = self.client.clone(); 71 Box::pin(async move { 72 let resp = client 73 .get(request.url) 74 .headers(request.headers) 75 .send() 76 .await 77 .map_err(map_reqwest)?; 78 let status = resp.status(); 79 let headers = resp.headers().clone(); 80 let content_length = resp.content_length(); 81 let body: BodyStream = Box::pin( 82 resp.bytes_stream() 83 .map(|chunk| chunk.map_err(|e| NetworkError::Body(e.to_string()))), 84 ); 85 Ok(HttpResponseHead { 86 status, 87 headers, 88 content_length, 89 body, 90 }) 91 }) 92 } 93} 94 95fn map_reqwest(err: reqwest::Error) -> NetworkError { 96 let msg = err.to_string(); 97 if err.is_timeout() { 98 NetworkError::Timeout(msg) 99 } else if err.is_connect() { 100 NetworkError::Connect(msg) 101 } else if err.is_redirect() { 102 NetworkError::Redirect(msg) 103 } else { 104 NetworkError::Transport(msg) 105 } 106} 107 108#[derive(Clone, Debug)] 109pub enum WsMessage { 110 Text(String), 111 Binary(Bytes), 112 Ping(Bytes), 113 Pong(Bytes), 114 Close { code: u16, reason: String }, 115} 116 117pub type WsSendFuture<'a> = Pin<Box<dyn Future<Output = Result<(), NetworkError>> + Send + 'a>>; 118pub type WsMessageFuture<'a> = 119 Pin<Box<dyn Future<Output = Option<Result<WsMessage, NetworkError>>> + Send + 'a>>; 120 121pub trait WsSink: Send + 'static { 122 fn send<'a>(&'a mut self, message: WsMessage) -> WsSendFuture<'a>; 123} 124 125pub trait WsStream: Send + 'static { 126 fn next<'a>(&'a mut self) -> WsMessageFuture<'a>; 127} 128 129pub struct WsConn { 130 pub sink: Box<dyn WsSink>, 131 pub stream: Box<dyn WsStream>, 132} 133 134pub type WsConnectFuture = 135 Pin<Box<dyn Future<Output = Result<WsConn, NetworkError>> + Send + 'static>>; 136 137pub trait WsTransport: Send + Sync + 'static { 138 fn connect(&self, url: Url) -> WsConnectFuture; 139} 140 141pub type AddrGuard = Arc<dyn Fn(&[SocketAddr]) -> Result<(), NetworkError> + Send + Sync>; 142 143#[derive(Clone, Copy, Debug, Default)] 144pub struct TungsteniteWs; 145 146impl TungsteniteWs { 147 pub fn shared() -> Arc<dyn WsTransport> { 148 Arc::new(Self) 149 } 150} 151 152impl WsTransport for TungsteniteWs { 153 fn connect(&self, url: Url) -> WsConnectFuture { 154 Box::pin(async move { 155 let url_str = url.as_str().to_owned(); 156 let (ws, _resp) = tokio_tungstenite::connect_async(&url_str) 157 .await 158 .map_err(|e| NetworkError::Connect(e.to_string()))?; 159 let (sink_inner, stream_inner) = futures::StreamExt::split(ws); 160 let sink: Box<dyn WsSink> = Box::new(TungsteniteSink { inner: sink_inner }); 161 let stream: Box<dyn WsStream> = Box::new(TungsteniteStream { 162 inner: stream_inner, 163 }); 164 Ok(WsConn { sink, stream }) 165 }) 166 } 167} 168 169pub struct GuardedWs { 170 guard: AddrGuard, 171} 172 173impl GuardedWs { 174 pub fn shared(guard: AddrGuard) -> Arc<dyn WsTransport> { 175 Arc::new(Self { guard }) 176 } 177} 178 179impl WsTransport for GuardedWs { 180 fn connect(&self, url: Url) -> WsConnectFuture { 181 let guard = self.guard.clone(); 182 Box::pin(async move { 183 let host = url 184 .host_str() 185 .ok_or_else(|| NetworkError::Connect("ws url missing host".to_owned()))? 186 .to_owned(); 187 let port = url 188 .port_or_known_default() 189 .ok_or_else(|| NetworkError::Connect("ws url missing port".to_owned()))?; 190 let addrs: Vec<SocketAddr> = tokio::net::lookup_host((host.as_str(), port)) 191 .await 192 .map_err(|e| NetworkError::Connect(e.to_string()))? 193 .collect(); 194 guard(&addrs)?; 195 let addr = addrs 196 .into_iter() 197 .next() 198 .ok_or_else(|| NetworkError::Connect(format!("no addresses for {host}")))?; 199 let tcp = tokio::net::TcpStream::connect(addr) 200 .await 201 .map_err(|e| NetworkError::Connect(e.to_string()))?; 202 let (ws, _resp) = tokio_tungstenite::client_async_tls(url.as_str(), tcp) 203 .await 204 .map_err(|e| NetworkError::Connect(e.to_string()))?; 205 let (sink_inner, stream_inner) = futures::StreamExt::split(ws); 206 let sink: Box<dyn WsSink> = Box::new(TungsteniteSink { inner: sink_inner }); 207 let stream: Box<dyn WsStream> = Box::new(TungsteniteStream { 208 inner: stream_inner, 209 }); 210 Ok(WsConn { sink, stream }) 211 }) 212 } 213} 214 215type TungsteniteWsStream = 216 tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>; 217 218struct TungsteniteSink { 219 inner: futures::stream::SplitSink<TungsteniteWsStream, TungsteniteMessage>, 220} 221 222impl WsSink for TungsteniteSink { 223 fn send<'a>(&'a mut self, message: WsMessage) -> WsSendFuture<'a> { 224 Box::pin(async move { 225 use futures::SinkExt; 226 self.inner 227 .send(message_to_tungstenite(message)) 228 .await 229 .map_err(|e| NetworkError::Transport(e.to_string())) 230 }) 231 } 232} 233 234struct TungsteniteStream { 235 inner: futures::stream::SplitStream<TungsteniteWsStream>, 236} 237 238impl WsStream for TungsteniteStream { 239 fn next<'a>(&'a mut self) -> WsMessageFuture<'a> { 240 Box::pin(async move { 241 let item = StreamExt::next(&mut self.inner).await?; 242 Some( 243 item.map_err(|e| NetworkError::Transport(e.to_string())) 244 .and_then(message_from_tungstenite), 245 ) 246 }) 247 } 248} 249 250fn message_to_tungstenite(message: WsMessage) -> TungsteniteMessage { 251 match message { 252 WsMessage::Text(text) => TungsteniteMessage::Text(text.into()), 253 WsMessage::Binary(bytes) => TungsteniteMessage::Binary(WsBytes::copy_from_slice(&bytes)), 254 WsMessage::Ping(bytes) => TungsteniteMessage::Ping(WsBytes::copy_from_slice(&bytes)), 255 WsMessage::Pong(bytes) => TungsteniteMessage::Pong(WsBytes::copy_from_slice(&bytes)), 256 WsMessage::Close { code, reason } => TungsteniteMessage::Close(Some(TungsteniteClose { 257 code: TungsteniteCloseCode::from(code), 258 reason: reason.into(), 259 })), 260 } 261} 262 263fn message_from_tungstenite(message: TungsteniteMessage) -> Result<WsMessage, NetworkError> { 264 match message { 265 TungsteniteMessage::Text(t) => Ok(WsMessage::Text(t.to_string())), 266 TungsteniteMessage::Binary(b) => Ok(WsMessage::Binary(Bytes::copy_from_slice(&b))), 267 TungsteniteMessage::Ping(b) => Ok(WsMessage::Ping(Bytes::copy_from_slice(&b))), 268 TungsteniteMessage::Pong(b) => Ok(WsMessage::Pong(Bytes::copy_from_slice(&b))), 269 TungsteniteMessage::Close(close) => { 270 let (code, reason) = close 271 .map(|c| (u16::from(c.code), c.reason.to_string())) 272 .unwrap_or((1000, String::new())); 273 Ok(WsMessage::Close { code, reason }) 274 } 275 TungsteniteMessage::Frame(_) => Err(NetworkError::Protocol( 276 "tungstenite raw frame surfaced unexpectedly".to_owned(), 277 )), 278 } 279}