// This repository has no description.
1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::time::Duration;
5
6use bytes::Bytes;
7use futures::stream;
8use http::{HeaderMap, StatusCode};
9use tokio::sync::mpsc;
10use url::Url;
11
12use crate::clock::{Clock, SleepFuture};
13use crate::network::{
14 BodyStream, HttpRequest, HttpResponseFuture, HttpResponseHead, HttpResult, HttpTransport,
15 NetworkError, WsConn, WsConnectFuture, WsMessage, WsMessageFuture, WsSendFuture, WsSink,
16 WsStream, WsTransport,
17};
18
19#[derive(Debug)]
20pub struct MemHttpResponse {
21 pub latency: Duration,
22 pub result: Result<MemHttpBody, NetworkError>,
23}
24
25#[derive(Debug)]
26pub struct MemHttpBody {
27 pub status: StatusCode,
28 pub headers: HeaderMap,
29 pub body: Bytes,
30}
31
32impl MemHttpBody {
33 pub fn ok_json(body: Bytes) -> Self {
34 let mut headers = HeaderMap::new();
35 headers.insert(
36 http::header::CONTENT_TYPE,
37 "application/json".parse().unwrap(),
38 );
39 Self {
40 status: StatusCode::OK,
41 headers,
42 body,
43 }
44 }
45
46 pub fn status_only(status: StatusCode) -> Self {
47 Self {
48 status,
49 headers: HeaderMap::new(),
50 body: Bytes::new(),
51 }
52 }
53}
54
55pub trait MemHttpResponder: Send + Sync + 'static {
56 fn respond(&self, request: &HttpRequest) -> MemHttpResponse;
57}
58
59#[derive(Clone)]
60pub struct MemHttpTransport {
61 responder: Arc<dyn MemHttpResponder>,
62 clock: Arc<dyn Clock>,
63}
64
65impl MemHttpTransport {
66 pub fn new(responder: Arc<dyn MemHttpResponder>, clock: Arc<dyn Clock>) -> Self {
67 Self { responder, clock }
68 }
69
70 pub fn shared(
71 responder: Arc<dyn MemHttpResponder>,
72 clock: Arc<dyn Clock>,
73 ) -> Arc<dyn HttpTransport> {
74 Arc::new(Self::new(responder, clock))
75 }
76}
77
78impl HttpTransport for MemHttpTransport {
79 fn execute(&self, request: HttpRequest) -> HttpResponseFuture {
80 let response = self.responder.respond(&request);
81 let sleep: SleepFuture = self.clock.sleep(response.latency);
82 Box::pin(async move {
83 sleep.await;
84 into_response_head(response.result)
85 })
86 }
87}
88
89fn into_response_head(result: Result<MemHttpBody, NetworkError>) -> HttpResult {
90 let body = result?;
91 let content_length = Some(body.body.len() as u64);
92 let body_stream: BodyStream = Box::pin(stream::once(async move { Ok(body.body) }));
93 Ok(HttpResponseHead {
94 status: body.status,
95 headers: body.headers,
96 content_length,
97 body: body_stream,
98 })
99}
100
101pub type MemWsServerFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
102
103pub trait MemWsResponder: Send + Sync + 'static {
104 fn spawn_server(
105 &self,
106 url: Url,
107 recv: mpsc::UnboundedReceiver<WsMessage>,
108 send: mpsc::Sender<WsMessage>,
109 ) -> MemWsServerFuture;
110}
111
112pub const DEFAULT_MEM_WS_CAPACITY: usize = 4096;
113
114#[derive(Clone)]
115pub struct MemWsTransport {
116 responder: Arc<dyn MemWsResponder>,
117 capacity: usize,
118}
119
120impl MemWsTransport {
121 pub fn new(responder: Arc<dyn MemWsResponder>) -> Self {
122 Self::with_capacity(responder, DEFAULT_MEM_WS_CAPACITY)
123 }
124
125 pub fn with_capacity(responder: Arc<dyn MemWsResponder>, capacity: usize) -> Self {
126 assert!(capacity > 0, "MemWsTransport capacity must be > 0");
127 Self {
128 responder,
129 capacity,
130 }
131 }
132
133 pub fn shared(responder: Arc<dyn MemWsResponder>) -> Arc<dyn WsTransport> {
134 Arc::new(Self::new(responder))
135 }
136
137 pub fn shared_with_capacity(
138 responder: Arc<dyn MemWsResponder>,
139 capacity: usize,
140 ) -> Arc<dyn WsTransport> {
141 Arc::new(Self::with_capacity(responder, capacity))
142 }
143}
144
145impl WsTransport for MemWsTransport {
146 fn connect(&self, url: Url) -> WsConnectFuture {
147 let responder = self.responder.clone();
148 let capacity = self.capacity;
149 Box::pin(async move {
150 let (c2s_tx, c2s_rx) = mpsc::unbounded_channel();
151 let (s2c_tx, s2c_rx) = mpsc::channel(capacity);
152 let server_future = responder.spawn_server(url, c2s_rx, s2c_tx);
153 tokio::spawn(server_future);
154 let sink: Box<dyn WsSink> = Box::new(MemWsSink { sender: c2s_tx });
155 let stream: Box<dyn WsStream> = Box::new(MemWsStream { receiver: s2c_rx });
156 Ok(WsConn { sink, stream })
157 })
158 }
159}
160
161struct MemWsSink {
162 sender: mpsc::UnboundedSender<WsMessage>,
163}
164
165impl WsSink for MemWsSink {
166 fn send<'a>(&'a mut self, message: WsMessage) -> WsSendFuture<'a> {
167 let result = self
168 .sender
169 .send(message)
170 .map_err(|_| NetworkError::Transport("server side closed channel".into()));
171 Box::pin(async move { result })
172 }
173}
174
175struct MemWsStream {
176 receiver: mpsc::Receiver<WsMessage>,
177}
178
179impl WsStream for MemWsStream {
180 fn next<'a>(&'a mut self) -> WsMessageFuture<'a> {
181 Box::pin(async move { self.receiver.recv().await.map(Ok) })
182 }
183}
184
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SimClock;
    use crate::UnixMicros;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::time::Instant;

    /// Responder that hands out pre-recorded responses in order, one per request.
    struct ScriptedHttp {
        responses: Mutex<Vec<MemHttpResponse>>,
        cursor: AtomicUsize,
    }

    impl ScriptedHttp {
        fn new(responses: Vec<MemHttpResponse>) -> Self {
            Self {
                responses: Mutex::new(responses),
                cursor: AtomicUsize::new(0),
            }
        }
    }

    impl MemHttpResponder for ScriptedHttp {
        /// Moves the next scripted response out of the script.
        ///
        /// The vacated slot is filled with a placeholder error. Asking for more
        /// responses than were scripted panics on the out-of-range index.
        fn respond(&self, _: &HttpRequest) -> MemHttpResponse {
            let slot = self.cursor.fetch_add(1, Ordering::Relaxed);
            let spent = MemHttpResponse {
                latency: Duration::ZERO,
                result: Err(NetworkError::Transport("script consumed".into())),
            };
            let mut script = self.responses.lock().unwrap();
            std::mem::replace(&mut script[slot], spent)
        }
    }

    /// Successful JSON response delivered after `latency_ms` milliseconds.
    fn ok_response(body: &str, latency_ms: u64) -> MemHttpResponse {
        let payload = Bytes::from(body.to_owned());
        MemHttpResponse {
            latency: Duration::from_millis(latency_ms),
            result: Ok(MemHttpBody::ok_json(payload)),
        }
    }

    /// Transport failure delivered after `latency_ms` milliseconds.
    fn err_response(latency_ms: u64) -> MemHttpResponse {
        MemHttpResponse {
            latency: Duration::from_millis(latency_ms),
            result: Err(NetworkError::Transport("brownout".into())),
        }
    }

    /// Transport backed by a fresh simulated clock and the given script.
    fn scripted_transport(script: Vec<MemHttpResponse>) -> MemHttpTransport {
        let clock: Arc<dyn Clock> = Arc::new(SimClock::at(UnixMicros::new(0)));
        let responder: Arc<dyn MemHttpResponder> = Arc::new(ScriptedHttp::new(script));
        MemHttpTransport::new(responder, clock)
    }

    /// Header-less request used by the HTTP tests.
    fn sample_request() -> HttpRequest {
        HttpRequest {
            url: Url::parse("http://oyster.cafe/xrpc/x").unwrap(),
            headers: HeaderMap::new(),
        }
    }

    #[tokio::test(start_paused = true)]
    async fn http_returns_scripted_body_after_injected_latency() {
        let transport = scripted_transport(vec![ok_response("{\"hello\":1}", 10)]);
        let request = sample_request();

        let before = Instant::now();
        let resp = transport.execute(request).await.unwrap();
        let elapsed = Instant::now().saturating_duration_since(before);

        assert_eq!(resp.status, StatusCode::OK);
        assert_eq!(elapsed, Duration::from_millis(10));
        let chunks = collect_body(resp.body).await;
        assert_eq!(chunks.as_ref(), b"{\"hello\":1}");
    }

    #[tokio::test(start_paused = true)]
    async fn http_propagates_scripted_errors_with_latency() {
        let transport = scripted_transport(vec![err_response(50)]);
        let request = sample_request();

        let before = Instant::now();
        let resp = transport.execute(request).await;
        let elapsed = Instant::now().saturating_duration_since(before);

        assert_eq!(elapsed, Duration::from_millis(50));
        assert!(matches!(resp, Err(NetworkError::Transport(_))));
    }

    /// Drains `body` and concatenates every chunk; panics on a chunk error.
    async fn collect_body(mut body: BodyStream) -> Bytes {
        use futures::StreamExt;
        let mut buf = Vec::new();
        while let Some(chunk) = body.next().await {
            buf.extend_from_slice(&chunk.unwrap());
        }
        Bytes::from(buf)
    }

    /// Server that pushes a fixed list of frames to the first client to connect.
    struct ScriptedWs {
        frames: Mutex<Vec<WsMessage>>,
    }

    impl MemWsResponder for ScriptedWs {
        fn spawn_server(
            &self,
            _: Url,
            _recv: mpsc::UnboundedReceiver<WsMessage>,
            send: mpsc::Sender<WsMessage>,
        ) -> MemWsServerFuture {
            // Take the frames out so a later connection starts with an empty script.
            let script: Vec<WsMessage> = std::mem::take(&mut *self.frames.lock().unwrap());
            Box::pin(async move {
                for frame in script {
                    // Stop as soon as the client side has gone away.
                    if send.send(frame).await.is_err() {
                        break;
                    }
                }
            })
        }
    }

    /// Address used by the WebSocket tests.
    fn ws_url() -> Url {
        Url::parse("ws://oyster.cafe/").unwrap()
    }

    #[tokio::test(start_paused = true)]
    async fn ws_delivers_scripted_frames_to_client() {
        let script = vec![
            WsMessage::Text("frame-a".into()),
            WsMessage::Text("frame-b".into()),
        ];
        let responder: Arc<dyn MemWsResponder> = Arc::new(ScriptedWs {
            frames: Mutex::new(script),
        });
        let transport = MemWsTransport::new(responder);
        let mut conn = transport.connect(ws_url()).await.unwrap();
        let a = conn.stream.next().await.unwrap().unwrap();
        let b = conn.stream.next().await.unwrap().unwrap();
        assert!(matches!(a, WsMessage::Text(t) if t == "frame-a"));
        assert!(matches!(b, WsMessage::Text(t) if t == "frame-b"));
    }

    /// Server that sends every received message straight back to the client.
    struct EchoWs;

    impl MemWsResponder for EchoWs {
        fn spawn_server(
            &self,
            _: Url,
            mut recv: mpsc::UnboundedReceiver<WsMessage>,
            send: mpsc::Sender<WsMessage>,
        ) -> MemWsServerFuture {
            Box::pin(async move {
                while let Some(msg) = recv.recv().await {
                    // Stop as soon as the client side has gone away.
                    if send.send(msg).await.is_err() {
                        break;
                    }
                }
            })
        }
    }

    #[tokio::test(start_paused = true)]
    async fn ws_round_trip_via_server_echo() {
        let transport = MemWsTransport::new(Arc::new(EchoWs));
        let mut conn = transport.connect(ws_url()).await.unwrap();
        conn.sink
            .send(WsMessage::Text("ping".into()))
            .await
            .unwrap();
        let echoed = conn.stream.next().await.unwrap().unwrap();
        assert!(matches!(echoed, WsMessage::Text(t) if t == "ping"));
    }
}