Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

at master 15 kB View raw
1use std::sync::{Arc, Mutex}; 2use std::time::Duration; 3 4use bobbin_knot_proxy::KnotHost; 5use bobbin_runtime::{Clock, WsConn, WsMessage, WsTransport}; 6use jacquard_common::DefaultStr; 7use jacquard_common::types::did::Did; 8use serde::Deserialize; 9use serde_json::value::RawValue; 10use tokio_util::sync::CancellationToken; 11use url::Url; 12 13use crate::client::authority; 14use crate::roster::{AclOp, Cursor, Roster}; 15 16const KNOT_MEMBER_UPDATE_NSID: &str = "sh.tangled.knot.memberUpdate"; 17const REPO_COLLABORATOR_UPDATE_NSID: &str = "sh.tangled.repo.collaboratorUpdate"; 18const RECONNECT_INITIAL: Duration = Duration::from_secs(1); 19const RECONNECT_MAX: Duration = Duration::from_secs(60); 20const HEALTHY_SESSION_MIN: Duration = Duration::from_secs(15); 21 22#[derive(Clone)] 23pub struct StreamConfig { 24 pub ws: Arc<dyn WsTransport>, 25 pub clock: Arc<dyn Clock>, 26 pub cancel: CancellationToken, 27} 28 29enum SessionEnd { 30 Cancelled, 31 Closed { progressed: bool }, 32 ConnectFailed, 33} 34 35pub async fn run_stream( 36 cfg: &StreamConfig, 37 host: &KnotHost, 38 roster: &Mutex<Roster>, 39 initial_cursor: i64, 40) { 41 let mut cursor = initial_cursor; 42 let mut backoff = RECONNECT_INITIAL; 43 loop { 44 if cfg.cancel.is_cancelled() { 45 return; 46 } 47 let started = cfg.clock.now_instant(); 48 let end = run_session(cfg, host, roster, &mut cursor).await; 49 if matches!(&end, SessionEnd::Cancelled) { 50 return; 51 } 52 let elapsed = cfg.clock.now_instant().saturating_duration_since(started); 53 if matches!(&end, SessionEnd::Closed { progressed: false }) && cursor != 0 { 54 cursor = 0; 55 } 56 if session_was_healthy(&end, elapsed) { 57 backoff = RECONNECT_INITIAL; 58 } 59 let delay = jitter_delay(backoff, cfg.clock.now_unix_micros().raw()); 60 tokio::select! { 61 _ = cfg.cancel.cancelled() => return, 62 _ = cfg.clock.sleep(delay) => {} 63 } 64 backoff = (backoff * 2).min(RECONNECT_MAX); 65 } 66} 67 68fn session_was_healthy(end: &SessionEnd, elapsed: Duration) -> bool { 69 matches!(end, SessionEnd::Closed { progressed: true }) && elapsed >= HEALTHY_SESSION_MIN 70} 71 72fn jitter_delay(base: Duration, entropy: u64) -> Duration { 73 let frac = (entropy % 1024) as f64 / 1024.0; 74 base.mul_f64(0.5 + 0.5 * frac) 75} 76 77async fn run_session( 78 cfg: &StreamConfig, 79 host: &KnotHost, 80 roster: &Mutex<Roster>, 81 cursor: &mut i64, 82) -> SessionEnd { 83 let Some(url) = events_url(host, *cursor) else { 84 return SessionEnd::ConnectFailed; 85 }; 86 let conn = tokio::select! { 87 _ = cfg.cancel.cancelled() => return SessionEnd::Cancelled, 88 res = cfg.ws.connect(url) => match res { 89 Ok(conn) => conn, 90 Err(err) => { 91 tracing::warn!(host = %authority(host), error = %err, "knot eventstream connect failed"); 92 return SessionEnd::ConnectFailed; 93 } 94 }, 95 }; 96 let WsConn { 97 mut sink, 98 mut stream, 99 } = conn; 100 let mut progressed = false; 101 loop { 102 let message = tokio::select! { 103 _ = cfg.cancel.cancelled() => return SessionEnd::Cancelled, 104 message = stream.next() => message, 105 }; 106 match message { 107 None => return SessionEnd::Closed { progressed }, 108 Some(Ok(WsMessage::Text(text))) => { 109 progressed = true; 110 process_frame(&text, roster, cursor); 111 } 112 Some(Ok(WsMessage::Ping(payload))) => { 113 progressed = true; 114 let _ = sink.send(WsMessage::Pong(payload)).await; 115 } 116 Some(Ok(WsMessage::Close { .. })) => return SessionEnd::Closed { progressed }, 117 Some(Ok(_)) => {} 118 Some(Err(err)) => { 119 tracing::warn!(host = %authority(host), error = %err, "knot eventstream read error"); 120 return SessionEnd::Closed { progressed }; 121 } 122 } 123 } 124} 125 126fn process_frame(text: &str, roster: &Mutex<Roster>, cursor: &mut i64) { 127 let Ok(frame) = serde_json::from_str::<FrameWire>(text) else { 128 return; 129 }; 130 *cursor = frame.created; 131 match frame.nsid.as_str() { 132 KNOT_MEMBER_UPDATE_NSID => { 133 if let Ok(update) = serde_json::from_str::<MemberUpdate>(frame.event.get()) { 134 roster.lock().unwrap().apply_member( 135 update.op, 136 update.subject, 137 Cursor(frame.created), 138 ); 139 } 140 } 141 REPO_COLLABORATOR_UPDATE_NSID => { 142 if let Ok(update) = serde_json::from_str::<CollaboratorUpdate>(frame.event.get()) { 143 roster.lock().unwrap().apply_collaborator( 144 update.op, 145 update.repo, 146 update.subject, 147 Cursor(frame.created), 148 ); 149 } 150 } 151 _ => {} 152 } 153} 154 155fn events_url(host: &KnotHost, cursor: i64) -> Option<Url> { 156 let scheme = if host.url().scheme() == "https" { 157 "wss" 158 } else { 159 "ws" 160 }; 161 let base = format!("{scheme}://{}/events", authority(host)); 162 let full = if cursor != 0 { 163 format!("{base}?cursor={cursor}") 164 } else { 165 base 166 }; 167 Url::parse(&full).ok() 168} 169 170#[derive(Deserialize)] 171struct FrameWire { 172 nsid: String, 173 event: Box<RawValue>, 174 created: i64, 175} 176 177#[derive(Deserialize)] 178struct MemberUpdate { 179 op: AclOp, 180 subject: Did<DefaultStr>, 181} 182 183#[derive(Deserialize)] 184struct CollaboratorUpdate { 185 op: AclOp, 186 subject: Did<DefaultStr>, 187 repo: Did<DefaultStr>, 188} 189 190#[cfg(test)] 191mod tests { 192 use super::*; 193 use std::collections::VecDeque; 194 use std::sync::Mutex; 195 196 use bobbin_edge_index::EdgeStore; 197 use bobbin_runtime::{ 198 NetworkError, SystemClock, WsConnectFuture, WsMessageFuture, WsSendFuture, WsSink, WsStream, 199 }; 200 use bobbin_types::ids::{EdgeKey, SubjectRef, nsid_static}; 201 use bobbin_types::knot_acl; 202 use bytes::Bytes; 203 204 use crate::registry::KnotRegistry; 205 206 struct ScriptStream { 207 msgs: VecDeque<WsMessage>, 208 } 209 impl WsStream for ScriptStream { 210 fn next<'a>(&'a mut self) -> WsMessageFuture<'a> { 211 Box::pin(async move { self.msgs.pop_front().map(Ok) }) 212 } 213 } 214 215 struct RecordSink { 216 sent: Arc<Mutex<Vec<WsMessage>>>, 217 } 218 impl WsSink for RecordSink { 219 fn send<'a>(&'a mut self, message: WsMessage) -> WsSendFuture<'a> { 220 let sent = self.sent.clone(); 221 Box::pin(async move { 222 sent.lock().unwrap().push(message); 223 Ok(()) 224 }) 225 } 226 } 227 228 struct ScriptWs { 229 msgs: Mutex<Option<VecDeque<WsMessage>>>, 230 sent: Arc<Mutex<Vec<WsMessage>>>, 231 fail: bool, 232 } 233 impl WsTransport for ScriptWs { 234 fn connect(&self, _url: Url) -> WsConnectFuture { 235 if self.fail { 236 return Box::pin(async { 237 Err(NetworkError::Connect("scripted failure".to_owned())) 238 }); 239 } 240 let msgs = self.msgs.lock().unwrap().take().unwrap_or_default(); 241 let sent = self.sent.clone(); 242 Box::pin(async move { 243 Ok(WsConn { 244 sink: Box::new(RecordSink { sent }), 245 stream: Box::new(ScriptStream { msgs }), 246 }) 247 }) 248 } 249 } 250 251 fn member_frame(op: &str, subject: &str, created: i64) -> String { 252 format!( 253 r#"{{"rkey":"r{created}","nsid":"sh.tangled.knot.memberUpdate","event":{{"op":"{op}","subject":"{subject}"}},"created":{created}}}"# 254 ) 255 } 256 257 fn collab_frame(op: &str, subject: &str, repo: &str, created: i64) -> String { 258 format!( 259 r#"{{"rkey":"r{created}","nsid":"sh.tangled.repo.collaboratorUpdate","event":{{"op":"{op}","subject":"{subject}","repo":"{repo}"}},"created":{created}}}"# 260 ) 261 } 262 263 fn did(s: &str) -> Did<DefaultStr> { 264 Did::new_owned(s).unwrap() 265 } 266 267 fn member_count(store: &EdgeStore, subject: &str) -> u64 { 268 store.count(&EdgeKey::new( 269 nsid_static("sh.tangled.knot.member"), 270 SubjectRef::Did(did(subject)), 271 )) 272 } 273 274 fn collaborator_count(store: &EdgeStore, repo: &str) -> u64 { 275 store.count(&EdgeKey::new( 276 nsid_static("sh.tangled.repo.collaborator"), 277 SubjectRef::Did(did(repo)), 278 )) 279 } 280 281 fn cfg(ws: Arc<dyn WsTransport>, cancel: CancellationToken) -> StreamConfig { 282 StreamConfig { 283 ws, 284 clock: Arc::new(SystemClock::new()), 285 cancel, 286 } 287 } 288 289 #[tokio::test] 290 async fn session_dispatches_deltas_pongs_and_advances_cursor() { 291 let store = Arc::new(EdgeStore::new(bobbin_runtime::RuntimeHasher::default())); 292 let knot = knot_acl::host_to_knot_did("oyster.cafe").unwrap(); 293 let registry = Arc::new(KnotRegistry::new()); 294 registry.observe_repo( 295 &knot_acl::KnotHostKey::new("oyster.cafe"), 296 Did::new_owned("did:plc:scallop").unwrap(), 297 ); 298 let roster = Mutex::new(Roster::new( 299 store.clone(), 300 knot, 301 registry, 302 knot_acl::KnotHostKey::new("oyster.cafe"), 303 )); 304 let frames = VecDeque::from(vec![ 305 WsMessage::Text(member_frame("add", "did:plc:boltless", 100)), 306 WsMessage::Text(collab_frame( 307 "add", 308 "did:plc:olaren", 309 "did:plc:scallop", 310 200, 311 )), 312 WsMessage::Ping(Bytes::from_static(b"ka")), 313 WsMessage::Text(member_frame("remove", "did:plc:boltless", 300)), 314 WsMessage::Close { 315 code: 1000, 316 reason: String::new(), 317 }, 318 ]); 319 let sent = Arc::new(Mutex::new(Vec::new())); 320 let ws: Arc<dyn WsTransport> = Arc::new(ScriptWs { 321 msgs: Mutex::new(Some(frames)), 322 sent: sent.clone(), 323 fail: false, 324 }); 325 let config = cfg(ws, CancellationToken::new()); 326 let host = KnotHost::parse("http://oyster.cafe").unwrap(); 327 let mut cursor = 0i64; 328 329 let end = run_session(&config, &host, &roster, &mut cursor).await; 330 331 assert!(matches!(end, SessionEnd::Closed { progressed: true })); 332 assert_eq!(cursor, 300); 333 assert_eq!(member_count(&store, "did:plc:boltless"), 0); 334 assert_eq!(collaborator_count(&store, "did:plc:scallop"), 1); 335 let sent = sent.lock().unwrap(); 336 assert_eq!(sent.len(), 1); 337 assert!(matches!(&sent[0], WsMessage::Pong(p) if p.as_ref() == b"ka")); 338 } 339 340 #[tokio::test] 341 async fn session_reports_no_progress_on_immediate_close() { 342 let store = Arc::new(EdgeStore::new(bobbin_runtime::RuntimeHasher::default())); 343 let knot = knot_acl::host_to_knot_did("oyster.cafe").unwrap(); 344 let roster = Mutex::new(Roster::new( 345 store, 346 knot, 347 Arc::new(KnotRegistry::new()), 348 knot_acl::KnotHostKey::new("oyster.cafe"), 349 )); 350 let frames = VecDeque::from(vec![WsMessage::Close { 351 code: 1000, 352 reason: String::new(), 353 }]); 354 let ws: Arc<dyn WsTransport> = Arc::new(ScriptWs { 355 msgs: Mutex::new(Some(frames)), 356 sent: Arc::new(Mutex::new(Vec::new())), 357 fail: false, 358 }); 359 let config = cfg(ws, CancellationToken::new()); 360 let host = KnotHost::parse("http://oyster.cafe").unwrap(); 361 let mut cursor = 99i64; 362 363 let end = run_session(&config, &host, &roster, &mut cursor).await; 364 365 assert!( 366 matches!(end, SessionEnd::Closed { progressed: false }), 367 "a session that delivers no frames before closing reports no progress" 368 ); 369 } 370 371 #[tokio::test] 372 async fn pre_cancelled_stream_returns_without_connecting() { 373 let store = Arc::new(EdgeStore::new(bobbin_runtime::RuntimeHasher::default())); 374 let knot = knot_acl::host_to_knot_did("oyster.cafe").unwrap(); 375 let roster = Mutex::new(Roster::new( 376 store, 377 knot, 378 Arc::new(KnotRegistry::new()), 379 knot_acl::KnotHostKey::new("oyster.cafe"), 380 )); 381 let ws: Arc<dyn WsTransport> = Arc::new(ScriptWs { 382 msgs: Mutex::new(None), 383 sent: Arc::new(Mutex::new(Vec::new())), 384 fail: true, 385 }); 386 let cancel = CancellationToken::new(); 387 cancel.cancel(); 388 let config = cfg(ws, cancel); 389 let host = KnotHost::parse("http://oyster.cafe").unwrap(); 390 391 run_stream(&config, &host, &roster, 0).await; 392 } 393 394 #[test] 395 fn events_url_carries_scheme_and_cursor() { 396 let host = KnotHost::parse("http://oyster.cafe").unwrap(); 397 assert_eq!( 398 events_url(&host, 0).unwrap().as_str(), 399 "ws://oyster.cafe/events" 400 ); 401 assert_eq!( 402 events_url(&host, 42).unwrap().as_str(), 403 "ws://oyster.cafe/events?cursor=42" 404 ); 405 let secure = KnotHost::parse("https://nel.pet").unwrap(); 406 assert_eq!( 407 events_url(&secure, 7).unwrap().as_str(), 408 "wss://nel.pet/events?cursor=7" 409 ); 410 } 411 412 #[test] 413 fn only_a_long_lived_session_resets_backoff() { 414 assert!(session_was_healthy( 415 &SessionEnd::Closed { progressed: true }, 416 HEALTHY_SESSION_MIN 417 )); 418 assert!( 419 !session_was_healthy( 420 &SessionEnd::Closed { progressed: true }, 421 HEALTHY_SESSION_MIN - Duration::from_millis(1) 422 ), 423 "a knot that flaps faster than the healthy floor must keep backing off" 424 ); 425 assert!(!session_was_healthy( 426 &SessionEnd::Closed { progressed: false }, 427 Duration::from_secs(3600) 428 )); 429 assert!(!session_was_healthy( 430 &SessionEnd::ConnectFailed, 431 Duration::from_secs(3600) 432 )); 433 } 434 435 #[test] 436 fn jitter_delay_stays_within_half_to_full_window() { 437 let base = Duration::from_secs(8); 438 [0u64, 1, 511, 512, 1023, 1024, u64::MAX] 439 .into_iter() 440 .for_each(|entropy| { 441 let delay = jitter_delay(base, entropy); 442 assert!(delay >= base / 2, "delay {delay:?} below half of {base:?}"); 443 assert!(delay <= base, "delay {delay:?} above {base:?}"); 444 }); 445 } 446}