This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

1use std::future::Future; 2use std::pin::Pin; 3use std::sync::Arc; 4use std::time::Duration; 5 6use bytes::Bytes; 7use futures::stream; 8use http::{HeaderMap, StatusCode}; 9use tokio::sync::mpsc; 10use url::Url; 11 12use crate::clock::{Clock, SleepFuture}; 13use crate::network::{ 14 BodyStream, HttpRequest, HttpResponseFuture, HttpResponseHead, HttpResult, HttpTransport, 15 NetworkError, WsConn, WsConnectFuture, WsMessage, WsMessageFuture, WsSendFuture, WsSink, 16 WsStream, WsTransport, 17}; 18 19#[derive(Debug)] 20pub struct MemHttpResponse { 21 pub latency: Duration, 22 pub result: Result<MemHttpBody, NetworkError>, 23} 24 25#[derive(Debug)] 26pub struct MemHttpBody { 27 pub status: StatusCode, 28 pub headers: HeaderMap, 29 pub body: Bytes, 30} 31 32impl MemHttpBody { 33 pub fn ok_json(body: Bytes) -> Self { 34 let mut headers = HeaderMap::new(); 35 headers.insert( 36 http::header::CONTENT_TYPE, 37 "application/json".parse().unwrap(), 38 ); 39 Self { 40 status: StatusCode::OK, 41 headers, 42 body, 43 } 44 } 45 46 pub fn status_only(status: StatusCode) -> Self { 47 Self { 48 status, 49 headers: HeaderMap::new(), 50 body: Bytes::new(), 51 } 52 } 53} 54 55pub trait MemHttpResponder: Send + Sync + 'static { 56 fn respond(&self, request: &HttpRequest) -> MemHttpResponse; 57} 58 59#[derive(Clone)] 60pub struct MemHttpTransport { 61 responder: Arc<dyn MemHttpResponder>, 62 clock: Arc<dyn Clock>, 63} 64 65impl MemHttpTransport { 66 pub fn new(responder: Arc<dyn MemHttpResponder>, clock: Arc<dyn Clock>) -> Self { 67 Self { responder, clock } 68 } 69 70 pub fn shared( 71 responder: Arc<dyn MemHttpResponder>, 72 clock: Arc<dyn Clock>, 73 ) -> Arc<dyn HttpTransport> { 74 Arc::new(Self::new(responder, clock)) 75 } 76} 77 78impl HttpTransport for MemHttpTransport { 79 fn execute(&self, request: HttpRequest) -> HttpResponseFuture { 80 let response = self.responder.respond(&request); 81 let sleep: SleepFuture = self.clock.sleep(response.latency); 82 Box::pin(async move { 83 sleep.await; 84 into_response_head(response.result) 85 }) 86 } 87} 88 89fn into_response_head(result: Result<MemHttpBody, NetworkError>) -> HttpResult { 90 let body = result?; 91 let content_length = Some(body.body.len() as u64); 92 let body_stream: BodyStream = Box::pin(stream::once(async move { Ok(body.body) })); 93 Ok(HttpResponseHead { 94 status: body.status, 95 headers: body.headers, 96 content_length, 97 body: body_stream, 98 }) 99} 100 101pub type MemWsServerFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>; 102 103pub trait MemWsResponder: Send + Sync + 'static { 104 fn spawn_server( 105 &self, 106 url: Url, 107 recv: mpsc::UnboundedReceiver<WsMessage>, 108 send: mpsc::Sender<WsMessage>, 109 ) -> MemWsServerFuture; 110} 111 112pub const DEFAULT_MEM_WS_CAPACITY: usize = 4096; 113 114#[derive(Clone)] 115pub struct MemWsTransport { 116 responder: Arc<dyn MemWsResponder>, 117 capacity: usize, 118} 119 120impl MemWsTransport { 121 pub fn new(responder: Arc<dyn MemWsResponder>) -> Self { 122 Self::with_capacity(responder, DEFAULT_MEM_WS_CAPACITY) 123 } 124 125 pub fn with_capacity(responder: Arc<dyn MemWsResponder>, capacity: usize) -> Self { 126 assert!(capacity > 0, "MemWsTransport capacity must be > 0"); 127 Self { 128 responder, 129 capacity, 130 } 131 } 132 133 pub fn shared(responder: Arc<dyn MemWsResponder>) -> Arc<dyn WsTransport> { 134 Arc::new(Self::new(responder)) 135 } 136 137 pub fn shared_with_capacity( 138 responder: Arc<dyn MemWsResponder>, 139 capacity: usize, 140 ) -> Arc<dyn WsTransport> { 141 Arc::new(Self::with_capacity(responder, capacity)) 142 } 143} 144 145impl WsTransport for MemWsTransport { 146 fn connect(&self, url: Url) -> WsConnectFuture { 147 let responder = self.responder.clone(); 148 let capacity = self.capacity; 149 Box::pin(async move { 150 let (c2s_tx, c2s_rx) = mpsc::unbounded_channel(); 151 let (s2c_tx, s2c_rx) = mpsc::channel(capacity); 152 let server_future = responder.spawn_server(url, c2s_rx, s2c_tx); 153 tokio::spawn(server_future); 154 let sink: Box<dyn WsSink> = Box::new(MemWsSink { sender: c2s_tx }); 155 let stream: Box<dyn WsStream> = Box::new(MemWsStream { receiver: s2c_rx }); 156 Ok(WsConn { sink, stream }) 157 }) 158 } 159} 160 161struct MemWsSink { 162 sender: mpsc::UnboundedSender<WsMessage>, 163} 164 165impl WsSink for MemWsSink { 166 fn send<'a>(&'a mut self, message: WsMessage) -> WsSendFuture<'a> { 167 let result = self 168 .sender 169 .send(message) 170 .map_err(|_| NetworkError::Transport("server side closed channel".into())); 171 Box::pin(async move { result }) 172 } 173} 174 175struct MemWsStream { 176 receiver: mpsc::Receiver<WsMessage>, 177} 178 179impl WsStream for MemWsStream { 180 fn next<'a>(&'a mut self) -> WsMessageFuture<'a> { 181 Box::pin(async move { self.receiver.recv().await.map(Ok) }) 182 } 183} 184 185#[cfg(test)] 186mod tests { 187 use super::*; 188 use crate::SimClock; 189 use crate::UnixMicros; 190 use std::sync::Mutex; 191 use std::sync::atomic::{AtomicUsize, Ordering}; 192 use tokio::time::Instant; 193 194 struct ScriptedHttp { 195 responses: Mutex<Vec<MemHttpResponse>>, 196 cursor: AtomicUsize, 197 } 198 199 impl ScriptedHttp { 200 fn new(responses: Vec<MemHttpResponse>) -> Self { 201 Self { 202 responses: Mutex::new(responses), 203 cursor: AtomicUsize::new(0), 204 } 205 } 206 } 207 208 impl MemHttpResponder for ScriptedHttp { 209 fn respond(&self, _: &HttpRequest) -> MemHttpResponse { 210 let i = self.cursor.fetch_add(1, Ordering::Relaxed); 211 let mut guard = self.responses.lock().unwrap(); 212 std::mem::replace( 213 &mut guard[i], 214 MemHttpResponse { 215 latency: Duration::ZERO, 216 result: Err(NetworkError::Transport("script consumed".into())), 217 }, 218 ) 219 } 220 } 221 222 fn ok_response(body: &str, latency_ms: u64) -> MemHttpResponse { 223 MemHttpResponse { 224 latency: Duration::from_millis(latency_ms), 225 result: Ok(MemHttpBody::ok_json(Bytes::from(body.to_owned()))), 226 } 227 } 228 229 fn err_response(latency_ms: u64) -> MemHttpResponse { 230 MemHttpResponse { 231 latency: Duration::from_millis(latency_ms), 232 result: Err(NetworkError::Transport("brownout".into())), 233 } 234 } 235 236 #[tokio::test(start_paused = true)] 237 async fn http_returns_scripted_body_after_injected_latency() { 238 let clock: Arc<dyn Clock> = Arc::new(SimClock::at(UnixMicros::new(0))); 239 let responder: Arc<dyn MemHttpResponder> = 240 Arc::new(ScriptedHttp::new(vec![ok_response("{\"hello\":1}", 10)])); 241 let transport = MemHttpTransport::new(responder, clock); 242 let request = HttpRequest { 243 url: Url::parse("http://oyster.cafe/xrpc/x").unwrap(), 244 headers: HeaderMap::new(), 245 }; 246 247 let before = Instant::now(); 248 let resp = transport.execute(request).await.unwrap(); 249 let elapsed = Instant::now().saturating_duration_since(before); 250 251 assert_eq!(resp.status, StatusCode::OK); 252 assert_eq!(elapsed, Duration::from_millis(10)); 253 let chunks = collect_body(resp.body).await; 254 assert_eq!(chunks.as_ref(), b"{\"hello\":1}"); 255 } 256 257 #[tokio::test(start_paused = true)] 258 async fn http_propagates_scripted_errors_with_latency() { 259 let clock: Arc<dyn Clock> = Arc::new(SimClock::at(UnixMicros::new(0))); 260 let responder: Arc<dyn MemHttpResponder> = 261 Arc::new(ScriptedHttp::new(vec![err_response(50)])); 262 let transport = MemHttpTransport::new(responder, clock); 263 let request = HttpRequest { 264 url: Url::parse("http://oyster.cafe/xrpc/x").unwrap(), 265 headers: HeaderMap::new(), 266 }; 267 268 let before = Instant::now(); 269 let resp = transport.execute(request).await; 270 let elapsed = Instant::now().saturating_duration_since(before); 271 272 assert_eq!(elapsed, Duration::from_millis(50)); 273 assert!(matches!(resp, Err(NetworkError::Transport(_)))); 274 } 275 276 async fn collect_body(mut body: BodyStream) -> Bytes { 277 use futures::StreamExt; 278 let mut acc = Vec::new(); 279 while let Some(chunk) = body.next().await { 280 acc.extend_from_slice(&chunk.unwrap()); 281 } 282 Bytes::from(acc) 283 } 284 285 struct ScriptedWs { 286 frames: Mutex<Vec<WsMessage>>, 287 } 288 289 impl MemWsResponder for ScriptedWs { 290 fn spawn_server( 291 &self, 292 _: Url, 293 mut _recv: mpsc::UnboundedReceiver<WsMessage>, 294 send: mpsc::Sender<WsMessage>, 295 ) -> MemWsServerFuture { 296 let frames: Vec<WsMessage> = std::mem::take(&mut *self.frames.lock().unwrap()); 297 Box::pin(async move { 298 for frame in frames { 299 if send.send(frame).await.is_err() { 300 return; 301 } 302 } 303 }) 304 } 305 } 306 307 #[tokio::test(start_paused = true)] 308 async fn ws_delivers_scripted_frames_to_client() { 309 let responder: Arc<dyn MemWsResponder> = Arc::new(ScriptedWs { 310 frames: Mutex::new(vec![ 311 WsMessage::Text("frame-a".into()), 312 WsMessage::Text("frame-b".into()), 313 ]), 314 }); 315 let transport = MemWsTransport::new(responder); 316 let mut conn = transport 317 .connect(Url::parse("ws://oyster.cafe/").unwrap()) 318 .await 319 .unwrap(); 320 let a = conn.stream.next().await.unwrap().unwrap(); 321 let b = conn.stream.next().await.unwrap().unwrap(); 322 assert!(matches!(a, WsMessage::Text(t) if t == "frame-a")); 323 assert!(matches!(b, WsMessage::Text(t) if t == "frame-b")); 324 } 325 326 struct EchoWs; 327 328 impl MemWsResponder for EchoWs { 329 fn spawn_server( 330 &self, 331 _: Url, 332 mut recv: mpsc::UnboundedReceiver<WsMessage>, 333 send: mpsc::Sender<WsMessage>, 334 ) -> MemWsServerFuture { 335 Box::pin(async move { 336 while let Some(msg) = recv.recv().await { 337 if send.send(msg).await.is_err() { 338 return; 339 } 340 } 341 }) 342 } 343 } 344 345 #[tokio::test(start_paused = true)] 346 async fn ws_round_trip_via_server_echo() { 347 let transport = MemWsTransport::new(Arc::new(EchoWs)); 348 let mut conn = transport 349 .connect(Url::parse("ws://oyster.cafe/").unwrap()) 350 .await 351 .unwrap(); 352 conn.sink 353 .send(WsMessage::Text("ping".into())) 354 .await 355 .unwrap(); 356 let echoed = conn.stream.next().await.unwrap().unwrap(); 357 assert!(matches!(echoed, WsMessage::Text(t) if t == "ping")); 358 } 359}