Monorepo for Tangled
tangled.org
1use std::sync::{Arc, Mutex};
2use std::time::Duration;
3
4use bobbin_knot_proxy::KnotHost;
5use bobbin_runtime::{Clock, WsConn, WsMessage, WsTransport};
6use jacquard_common::DefaultStr;
7use jacquard_common::types::did::Did;
8use serde::Deserialize;
9use serde_json::value::RawValue;
10use tokio_util::sync::CancellationToken;
11use url::Url;
12
13use crate::client::authority;
14use crate::roster::{AclOp, Cursor, Roster};
15
/// NSID carried by frames that describe a knot membership change.
const KNOT_MEMBER_UPDATE_NSID: &str = "sh.tangled.knot.memberUpdate";
/// NSID carried by frames that describe a repo collaborator change.
const REPO_COLLABORATOR_UPDATE_NSID: &str = "sh.tangled.repo.collaboratorUpdate";
/// Starting reconnect backoff, and the value the backoff returns to after a healthy session.
const RECONNECT_INITIAL: Duration = Duration::from_secs(1);
/// Ceiling for the doubling reconnect backoff.
const RECONNECT_MAX: Duration = Duration::from_secs(60);
/// Minimum duration a progressing session must last before the backoff is reset.
const HEALTHY_SESSION_MIN: Duration = Duration::from_secs(15);
21
/// Shared dependencies for the knot event stream loop.
#[derive(Clone)]
pub struct StreamConfig {
    /// Transport used to open the websocket connection for each session.
    pub ws: Arc<dyn WsTransport>,
    /// Time source used for measuring session length, jitter entropy and reconnect sleeps.
    pub clock: Arc<dyn Clock>,
    /// Token that stops the stream; it is observed before connecting, while reading and while sleeping.
    pub cancel: CancellationToken,
}
28
/// How a single websocket session ended.
///
/// The derives make the value loggable with `{:?}` and comparable in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SessionEnd {
    /// The cancellation token fired; the caller should stop reconnecting.
    Cancelled,
    /// The connection was established and later ended (peer close, end of
    /// stream, or read error). `progressed` records whether any text frame or
    /// ping was received before that.
    Closed { progressed: bool },
    /// No connection was established (URL could not be built or connect failed).
    ConnectFailed,
}
34
35pub async fn run_stream(
36 cfg: &StreamConfig,
37 host: &KnotHost,
38 roster: &Mutex<Roster>,
39 initial_cursor: i64,
40) {
41 let mut cursor = initial_cursor;
42 let mut backoff = RECONNECT_INITIAL;
43 loop {
44 if cfg.cancel.is_cancelled() {
45 return;
46 }
47 let started = cfg.clock.now_instant();
48 let end = run_session(cfg, host, roster, &mut cursor).await;
49 if matches!(&end, SessionEnd::Cancelled) {
50 return;
51 }
52 let elapsed = cfg.clock.now_instant().saturating_duration_since(started);
53 if matches!(&end, SessionEnd::Closed { progressed: false }) && cursor != 0 {
54 cursor = 0;
55 }
56 if session_was_healthy(&end, elapsed) {
57 backoff = RECONNECT_INITIAL;
58 }
59 let delay = jitter_delay(backoff, cfg.clock.now_unix_micros().raw());
60 tokio::select! {
61 _ = cfg.cancel.cancelled() => return,
62 _ = cfg.clock.sleep(delay) => {}
63 }
64 backoff = (backoff * 2).min(RECONNECT_MAX);
65 }
66}
67
68fn session_was_healthy(end: &SessionEnd, elapsed: Duration) -> bool {
69 matches!(end, SessionEnd::Closed { progressed: true }) && elapsed >= HEALTHY_SESSION_MIN
70}
71
/// Scales `base` by a factor in `[0.5, 1.0)` chosen from `entropy`.
///
/// Only the low ten bits of `entropy` are used, giving 1024 evenly spaced
/// steps across the upper half of the window.
fn jitter_delay(base: Duration, entropy: u64) -> Duration {
    let step = entropy & 1023;
    let fraction = step as f64 / 1024.0;
    let scale = 0.5 + fraction * 0.5;
    base.mul_f64(scale)
}
76
77async fn run_session(
78 cfg: &StreamConfig,
79 host: &KnotHost,
80 roster: &Mutex<Roster>,
81 cursor: &mut i64,
82) -> SessionEnd {
83 let Some(url) = events_url(host, *cursor) else {
84 return SessionEnd::ConnectFailed;
85 };
86 let conn = tokio::select! {
87 _ = cfg.cancel.cancelled() => return SessionEnd::Cancelled,
88 res = cfg.ws.connect(url) => match res {
89 Ok(conn) => conn,
90 Err(err) => {
91 tracing::warn!(host = %authority(host), error = %err, "knot eventstream connect failed");
92 return SessionEnd::ConnectFailed;
93 }
94 },
95 };
96 let WsConn {
97 mut sink,
98 mut stream,
99 } = conn;
100 let mut progressed = false;
101 loop {
102 let message = tokio::select! {
103 _ = cfg.cancel.cancelled() => return SessionEnd::Cancelled,
104 message = stream.next() => message,
105 };
106 match message {
107 None => return SessionEnd::Closed { progressed },
108 Some(Ok(WsMessage::Text(text))) => {
109 progressed = true;
110 process_frame(&text, roster, cursor);
111 }
112 Some(Ok(WsMessage::Ping(payload))) => {
113 progressed = true;
114 let _ = sink.send(WsMessage::Pong(payload)).await;
115 }
116 Some(Ok(WsMessage::Close { .. })) => return SessionEnd::Closed { progressed },
117 Some(Ok(_)) => {}
118 Some(Err(err)) => {
119 tracing::warn!(host = %authority(host), error = %err, "knot eventstream read error");
120 return SessionEnd::Closed { progressed };
121 }
122 }
123 }
124}
125
126fn process_frame(text: &str, roster: &Mutex<Roster>, cursor: &mut i64) {
127 let Ok(frame) = serde_json::from_str::<FrameWire>(text) else {
128 return;
129 };
130 *cursor = frame.created;
131 match frame.nsid.as_str() {
132 KNOT_MEMBER_UPDATE_NSID => {
133 if let Ok(update) = serde_json::from_str::<MemberUpdate>(frame.event.get()) {
134 roster.lock().unwrap().apply_member(
135 update.op,
136 update.subject,
137 Cursor(frame.created),
138 );
139 }
140 }
141 REPO_COLLABORATOR_UPDATE_NSID => {
142 if let Ok(update) = serde_json::from_str::<CollaboratorUpdate>(frame.event.get()) {
143 roster.lock().unwrap().apply_collaborator(
144 update.op,
145 update.repo,
146 update.subject,
147 Cursor(frame.created),
148 );
149 }
150 }
151 _ => {}
152 }
153}
154
155fn events_url(host: &KnotHost, cursor: i64) -> Option<Url> {
156 let scheme = if host.url().scheme() == "https" {
157 "wss"
158 } else {
159 "ws"
160 };
161 let base = format!("{scheme}://{}/events", authority(host));
162 let full = if cursor != 0 {
163 format!("{base}?cursor={cursor}")
164 } else {
165 base
166 };
167 Url::parse(&full).ok()
168}
169
/// Envelope of a single text frame on the event stream.
#[derive(Deserialize)]
struct FrameWire {
    /// Event type identifier; selects how `event` is decoded.
    nsid: String,
    /// Event body, kept as raw JSON until the NSID is known.
    event: Box<RawValue>,
    /// Value stored as the stream cursor and passed to the roster with the event.
    created: i64,
}
176
/// Body of a `sh.tangled.knot.memberUpdate` event.
#[derive(Deserialize)]
struct MemberUpdate {
    /// Operation forwarded to `Roster::apply_member`.
    op: AclOp,
    /// DID the operation applies to.
    subject: Did<DefaultStr>,
}
182
/// Body of a `sh.tangled.repo.collaboratorUpdate` event.
#[derive(Deserialize)]
struct CollaboratorUpdate {
    /// Operation forwarded to `Roster::apply_collaborator`.
    op: AclOp,
    /// DID the operation applies to.
    subject: Did<DefaultStr>,
    /// DID of the repo the operation applies to.
    repo: Did<DefaultStr>,
}
189
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::VecDeque;
    use std::sync::Mutex;

    use bobbin_edge_index::EdgeStore;
    use bobbin_runtime::{
        NetworkError, SystemClock, WsConnectFuture, WsMessageFuture, WsSendFuture, WsSink, WsStream,
    };
    use bobbin_types::ids::{EdgeKey, SubjectRef, nsid_static};
    use bobbin_types::knot_acl;
    use bytes::Bytes;

    use crate::registry::KnotRegistry;

    /// Stream half that replays a fixed queue of messages and then ends.
    struct ScriptStream {
        msgs: VecDeque<WsMessage>,
    }
    impl WsStream for ScriptStream {
        fn next<'a>(&'a mut self) -> WsMessageFuture<'a> {
            Box::pin(async move { self.msgs.pop_front().map(Ok) })
        }
    }

    /// Sink half that records every message it is asked to send.
    struct RecordSink {
        sent: Arc<Mutex<Vec<WsMessage>>>,
    }
    impl WsSink for RecordSink {
        fn send<'a>(&'a mut self, message: WsMessage) -> WsSendFuture<'a> {
            let sent = self.sent.clone();
            Box::pin(async move {
                sent.lock().unwrap().push(message);
                Ok(())
            })
        }
    }

    /// Transport that either fails to connect or hands out the scripted
    /// messages on the first connection (later connections get an empty stream).
    struct ScriptWs {
        msgs: Mutex<Option<VecDeque<WsMessage>>>,
        sent: Arc<Mutex<Vec<WsMessage>>>,
        fail: bool,
    }
    impl WsTransport for ScriptWs {
        fn connect(&self, _url: Url) -> WsConnectFuture {
            if self.fail {
                return Box::pin(async {
                    Err(NetworkError::Connect("scripted failure".to_owned()))
                });
            }
            let msgs = self.msgs.lock().unwrap().take().unwrap_or_default();
            let sent = self.sent.clone();
            Box::pin(async move {
                Ok(WsConn {
                    sink: Box::new(RecordSink { sent }),
                    stream: Box::new(ScriptStream { msgs }),
                })
            })
        }
    }

    /// Transport that fails the test as soon as a connection is attempted.
    struct UnreachableWs;
    impl WsTransport for UnreachableWs {
        fn connect(&self, _url: Url) -> WsConnectFuture {
            panic!("a pre-cancelled stream must not attempt to connect");
        }
    }

    /// Builds a member-update frame as the knot would send it.
    fn member_frame(op: &str, subject: &str, created: i64) -> String {
        format!(
            r#"{{"rkey":"r{created}","nsid":"sh.tangled.knot.memberUpdate","event":{{"op":"{op}","subject":"{subject}"}},"created":{created}}}"#
        )
    }

    /// Builds a collaborator-update frame as the knot would send it.
    fn collab_frame(op: &str, subject: &str, repo: &str, created: i64) -> String {
        format!(
            r#"{{"rkey":"r{created}","nsid":"sh.tangled.repo.collaboratorUpdate","event":{{"op":"{op}","subject":"{subject}","repo":"{repo}"}},"created":{created}}}"#
        )
    }

    fn did(s: &str) -> Did<DefaultStr> {
        Did::new_owned(s).unwrap()
    }

    /// Number of knot-member edges stored for `subject`.
    fn member_count(store: &EdgeStore, subject: &str) -> u64 {
        store.count(&EdgeKey::new(
            nsid_static("sh.tangled.knot.member"),
            SubjectRef::Did(did(subject)),
        ))
    }

    /// Number of collaborator edges stored for `repo`.
    fn collaborator_count(store: &EdgeStore, repo: &str) -> u64 {
        store.count(&EdgeKey::new(
            nsid_static("sh.tangled.repo.collaborator"),
            SubjectRef::Did(did(repo)),
        ))
    }

    fn cfg(ws: Arc<dyn WsTransport>, cancel: CancellationToken) -> StreamConfig {
        StreamConfig {
            ws,
            clock: Arc::new(SystemClock::new()),
            cancel,
        }
    }

    #[tokio::test]
    async fn session_dispatches_deltas_pongs_and_advances_cursor() {
        let store = Arc::new(EdgeStore::new(bobbin_runtime::RuntimeHasher::default()));
        let knot = knot_acl::host_to_knot_did("oyster.cafe").unwrap();
        let registry = Arc::new(KnotRegistry::new());
        registry.observe_repo(
            &knot_acl::KnotHostKey::new("oyster.cafe"),
            Did::new_owned("did:plc:scallop").unwrap(),
        );
        let roster = Mutex::new(Roster::new(
            store.clone(),
            knot,
            registry,
            knot_acl::KnotHostKey::new("oyster.cafe"),
        ));
        let frames = VecDeque::from(vec![
            WsMessage::Text(member_frame("add", "did:plc:boltless", 100)),
            WsMessage::Text(collab_frame(
                "add",
                "did:plc:olaren",
                "did:plc:scallop",
                200,
            )),
            WsMessage::Ping(Bytes::from_static(b"ka")),
            WsMessage::Text(member_frame("remove", "did:plc:boltless", 300)),
            WsMessage::Close {
                code: 1000,
                reason: String::new(),
            },
        ]);
        let sent = Arc::new(Mutex::new(Vec::new()));
        let ws: Arc<dyn WsTransport> = Arc::new(ScriptWs {
            msgs: Mutex::new(Some(frames)),
            sent: sent.clone(),
            fail: false,
        });
        let config = cfg(ws, CancellationToken::new());
        let host = KnotHost::parse("http://oyster.cafe").unwrap();
        let mut cursor = 0i64;

        let end = run_session(&config, &host, &roster, &mut cursor).await;

        assert!(matches!(end, SessionEnd::Closed { progressed: true }));
        assert_eq!(cursor, 300);
        assert_eq!(member_count(&store, "did:plc:boltless"), 0);
        assert_eq!(collaborator_count(&store, "did:plc:scallop"), 1);
        let sent = sent.lock().unwrap();
        assert_eq!(sent.len(), 1);
        assert!(matches!(&sent[0], WsMessage::Pong(p) if p.as_ref() == b"ka"));
    }

    #[tokio::test]
    async fn session_reports_no_progress_on_immediate_close() {
        let store = Arc::new(EdgeStore::new(bobbin_runtime::RuntimeHasher::default()));
        let knot = knot_acl::host_to_knot_did("oyster.cafe").unwrap();
        let roster = Mutex::new(Roster::new(
            store,
            knot,
            Arc::new(KnotRegistry::new()),
            knot_acl::KnotHostKey::new("oyster.cafe"),
        ));
        let frames = VecDeque::from(vec![WsMessage::Close {
            code: 1000,
            reason: String::new(),
        }]);
        let ws: Arc<dyn WsTransport> = Arc::new(ScriptWs {
            msgs: Mutex::new(Some(frames)),
            sent: Arc::new(Mutex::new(Vec::new())),
            fail: false,
        });
        let config = cfg(ws, CancellationToken::new());
        let host = KnotHost::parse("http://oyster.cafe").unwrap();
        let mut cursor = 99i64;

        let end = run_session(&config, &host, &roster, &mut cursor).await;

        assert!(
            matches!(end, SessionEnd::Closed { progressed: false }),
            "a session that delivers no frames before closing reports no progress"
        );
    }

    #[tokio::test]
    async fn session_reports_connect_failure_and_keeps_cursor() {
        let store = Arc::new(EdgeStore::new(bobbin_runtime::RuntimeHasher::default()));
        let knot = knot_acl::host_to_knot_did("oyster.cafe").unwrap();
        let roster = Mutex::new(Roster::new(
            store,
            knot,
            Arc::new(KnotRegistry::new()),
            knot_acl::KnotHostKey::new("oyster.cafe"),
        ));
        let ws: Arc<dyn WsTransport> = Arc::new(ScriptWs {
            msgs: Mutex::new(None),
            sent: Arc::new(Mutex::new(Vec::new())),
            fail: true,
        });
        let config = cfg(ws, CancellationToken::new());
        let host = KnotHost::parse("http://oyster.cafe").unwrap();
        let mut cursor = 7i64;

        let end = run_session(&config, &host, &roster, &mut cursor).await;

        assert!(matches!(end, SessionEnd::ConnectFailed));
        assert_eq!(cursor, 7, "a failed connect must not move the cursor");
    }

    #[tokio::test]
    async fn pre_cancelled_stream_returns_without_connecting() {
        let store = Arc::new(EdgeStore::new(bobbin_runtime::RuntimeHasher::default()));
        let knot = knot_acl::host_to_knot_did("oyster.cafe").unwrap();
        let roster = Mutex::new(Roster::new(
            store,
            knot,
            Arc::new(KnotRegistry::new()),
            knot_acl::KnotHostKey::new("oyster.cafe"),
        ));
        // The transport panics on connect, so the test fails if a connection
        // is attempted instead of merely tolerating a failed one.
        let ws: Arc<dyn WsTransport> = Arc::new(UnreachableWs);
        let cancel = CancellationToken::new();
        cancel.cancel();
        let config = cfg(ws, cancel);
        let host = KnotHost::parse("http://oyster.cafe").unwrap();

        run_stream(&config, &host, &roster, 0).await;
    }

    #[test]
    fn events_url_carries_scheme_and_cursor() {
        let host = KnotHost::parse("http://oyster.cafe").unwrap();
        assert_eq!(
            events_url(&host, 0).unwrap().as_str(),
            "ws://oyster.cafe/events"
        );
        assert_eq!(
            events_url(&host, 42).unwrap().as_str(),
            "ws://oyster.cafe/events?cursor=42"
        );
        let secure = KnotHost::parse("https://nel.pet").unwrap();
        assert_eq!(
            events_url(&secure, 7).unwrap().as_str(),
            "wss://nel.pet/events?cursor=7"
        );
    }

    #[test]
    fn only_a_long_lived_session_resets_backoff() {
        assert!(session_was_healthy(
            &SessionEnd::Closed { progressed: true },
            HEALTHY_SESSION_MIN
        ));
        assert!(
            !session_was_healthy(
                &SessionEnd::Closed { progressed: true },
                HEALTHY_SESSION_MIN - Duration::from_millis(1)
            ),
            "a knot that flaps faster than the healthy floor must keep backing off"
        );
        assert!(!session_was_healthy(
            &SessionEnd::Closed { progressed: false },
            Duration::from_secs(3600)
        ));
        assert!(!session_was_healthy(
            &SessionEnd::ConnectFailed,
            Duration::from_secs(3600)
        ));
    }

    #[test]
    fn jitter_delay_stays_within_half_to_full_window() {
        let base = Duration::from_secs(8);
        [0u64, 1, 511, 512, 1023, 1024, u64::MAX]
            .into_iter()
            .for_each(|entropy| {
                let delay = jitter_delay(base, entropy);
                assert!(delay >= base / 2, "delay {delay:?} below half of {base:?}");
                assert!(delay <= base, "delay {delay:?} above {base:?}");
            });
    }
}