Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1use std::future::Future; 2use std::pin::Pin; 3use std::sync::Arc; 4 5use bytes::Bytes; 6use futures::stream::{Stream, StreamExt}; 7use http::{HeaderMap, StatusCode}; 8use thiserror::Error; 9use tokio_tungstenite::tungstenite::{ 10 Bytes as WsBytes, Message as TungsteniteMessage, protocol::CloseFrame as TungsteniteClose, 11 protocol::frame::coding::CloseCode as TungsteniteCloseCode, 12}; 13use url::Url; 14 15#[derive(Debug, Error)] 16pub enum NetworkError { 17 #[error("connect: {0}")] 18 Connect(String), 19 #[error("timeout: {0}")] 20 Timeout(String), 21 #[error("redirect: {0}")] 22 Redirect(String), 23 #[error("transport: {0}")] 24 Transport(String), 25 #[error("body: {0}")] 26 Body(String), 27 #[error("protocol: {0}")] 28 Protocol(String), 29} 30 31pub struct HttpRequest { 32 pub url: Url, 33 pub headers: HeaderMap, 34} 35 36pub type BodyStream = Pin<Box<dyn Stream<Item = Result<Bytes, NetworkError>> + Send + 'static>>; 37 38pub struct HttpResponseHead { 39 pub status: StatusCode, 40 pub headers: HeaderMap, 41 pub content_length: Option<u64>, 42 pub body: BodyStream, 43} 44 45pub type HttpResult = Result<HttpResponseHead, NetworkError>; 46pub type HttpResponseFuture = Pin<Box<dyn Future<Output = HttpResult> + Send + 'static>>; 47 48pub trait HttpTransport: Send + Sync + 'static { 49 fn execute(&self, request: HttpRequest) -> HttpResponseFuture; 50} 51 52#[derive(Clone, Debug)] 53pub struct ReqwestHttp { 54 client: reqwest::Client, 55} 56 57impl ReqwestHttp { 58 pub fn new(client: reqwest::Client) -> Self { 59 Self { client } 60 } 61 62 pub fn shared(client: reqwest::Client) -> Arc<dyn HttpTransport> { 63 Arc::new(Self::new(client)) 64 } 65} 66 67impl HttpTransport for ReqwestHttp { 68 fn execute(&self, request: HttpRequest) -> HttpResponseFuture { 69 let client = self.client.clone(); 70 Box::pin(async move { 71 let resp = client 72 .get(request.url) 73 .headers(request.headers) 74 .send() 75 .await 76 .map_err(map_reqwest)?; 77 let status = resp.status(); 78 let headers = resp.headers().clone(); 79 let content_length = resp.content_length(); 80 let body: BodyStream = Box::pin( 81 resp.bytes_stream() 82 .map(|chunk| chunk.map_err(|e| NetworkError::Body(e.to_string()))), 83 ); 84 Ok(HttpResponseHead { 85 status, 86 headers, 87 content_length, 88 body, 89 }) 90 }) 91 } 92} 93 94fn map_reqwest(err: reqwest::Error) -> NetworkError { 95 let msg = err.to_string(); 96 if err.is_timeout() { 97 NetworkError::Timeout(msg) 98 } else if err.is_connect() { 99 NetworkError::Connect(msg) 100 } else if err.is_redirect() { 101 NetworkError::Redirect(msg) 102 } else { 103 NetworkError::Transport(msg) 104 } 105} 106 107#[derive(Clone, Debug)] 108pub enum WsMessage { 109 Text(String), 110 Binary(Bytes), 111 Ping(Bytes), 112 Pong(Bytes), 113 Close { code: u16, reason: String }, 114} 115 116pub type WsSendFuture<'a> = Pin<Box<dyn Future<Output = Result<(), NetworkError>> + Send + 'a>>; 117pub type WsMessageFuture<'a> = 118 Pin<Box<dyn Future<Output = Option<Result<WsMessage, NetworkError>>> + Send + 'a>>; 119 120pub trait WsSink: Send + 'static { 121 fn send<'a>(&'a mut self, message: WsMessage) -> WsSendFuture<'a>; 122} 123 124pub trait WsStream: Send + 'static { 125 fn next<'a>(&'a mut self) -> WsMessageFuture<'a>; 126} 127 128pub struct WsConn { 129 pub sink: Box<dyn WsSink>, 130 pub stream: Box<dyn WsStream>, 131} 132 133pub type WsConnectFuture = 134 Pin<Box<dyn Future<Output = Result<WsConn, NetworkError>> + Send + 'static>>; 135 136pub trait WsTransport: Send + Sync + 'static { 137 fn connect(&self, url: Url) -> WsConnectFuture; 138} 139 140#[derive(Clone, Copy, Debug, Default)] 141pub struct TungsteniteWs; 142 143impl TungsteniteWs { 144 pub fn shared() -> Arc<dyn WsTransport> { 145 Arc::new(Self) 146 } 147} 148 149impl WsTransport for TungsteniteWs { 150 fn connect(&self, url: Url) -> WsConnectFuture { 151 Box::pin(async move { 152 let url_str = url.as_str().to_owned(); 153 let (ws, _resp) = tokio_tungstenite::connect_async(&url_str) 154 .await 155 .map_err(|e| NetworkError::Connect(e.to_string()))?; 156 let (sink_inner, stream_inner) = futures::StreamExt::split(ws); 157 let sink: Box<dyn WsSink> = Box::new(TungsteniteSink { inner: sink_inner }); 158 let stream: Box<dyn WsStream> = Box::new(TungsteniteStream { 159 inner: stream_inner, 160 }); 161 Ok(WsConn { sink, stream }) 162 }) 163 } 164} 165 166type TungsteniteWsStream = 167 tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>; 168 169struct TungsteniteSink { 170 inner: futures::stream::SplitSink<TungsteniteWsStream, TungsteniteMessage>, 171} 172 173impl WsSink for TungsteniteSink { 174 fn send<'a>(&'a mut self, message: WsMessage) -> WsSendFuture<'a> { 175 Box::pin(async move { 176 use futures::SinkExt; 177 self.inner 178 .send(message_to_tungstenite(message)) 179 .await 180 .map_err(|e| NetworkError::Transport(e.to_string())) 181 }) 182 } 183} 184 185struct TungsteniteStream { 186 inner: futures::stream::SplitStream<TungsteniteWsStream>, 187} 188 189impl WsStream for TungsteniteStream { 190 fn next<'a>(&'a mut self) -> WsMessageFuture<'a> { 191 Box::pin(async move { 192 let item = StreamExt::next(&mut self.inner).await?; 193 Some( 194 item.map_err(|e| NetworkError::Transport(e.to_string())) 195 .and_then(message_from_tungstenite), 196 ) 197 }) 198 } 199} 200 201fn message_to_tungstenite(message: WsMessage) -> TungsteniteMessage { 202 match message { 203 WsMessage::Text(text) => TungsteniteMessage::Text(text.into()), 204 WsMessage::Binary(bytes) => TungsteniteMessage::Binary(WsBytes::copy_from_slice(&bytes)), 205 WsMessage::Ping(bytes) => TungsteniteMessage::Ping(WsBytes::copy_from_slice(&bytes)), 206 WsMessage::Pong(bytes) => TungsteniteMessage::Pong(WsBytes::copy_from_slice(&bytes)), 207 WsMessage::Close { code, reason } => TungsteniteMessage::Close(Some(TungsteniteClose { 208 code: TungsteniteCloseCode::from(code), 209 reason: reason.into(), 210 })), 211 } 212} 213 214fn message_from_tungstenite(message: TungsteniteMessage) -> Result<WsMessage, NetworkError> { 215 match message { 216 TungsteniteMessage::Text(t) => Ok(WsMessage::Text(t.to_string())), 217 TungsteniteMessage::Binary(b) => Ok(WsMessage::Binary(Bytes::copy_from_slice(&b))), 218 TungsteniteMessage::Ping(b) => Ok(WsMessage::Ping(Bytes::copy_from_slice(&b))), 219 TungsteniteMessage::Pong(b) => Ok(WsMessage::Pong(Bytes::copy_from_slice(&b))), 220 TungsteniteMessage::Close(close) => { 221 let (code, reason) = close 222 .map(|c| (u16::from(c.code), c.reason.to_string())) 223 .unwrap_or((1000, String::new())); 224 Ok(WsMessage::Close { code, reason }) 225 } 226 TungsteniteMessage::Frame(_) => Err(NetworkError::Protocol( 227 "tungstenite raw frame surfaced unexpectedly".to_owned(), 228 )), 229 } 230}