Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

jetstream: no ping, more retries

fix retry delay reporting, increase max retries

entirely remove the ping loop, it was not doing much

+11 -47
-1
Cargo.lock
··· 1827 1827 "thiserror 2.0.12", 1828 1828 "tokio", 1829 1829 "tokio-tungstenite", 1830 - "tokio-util", 1831 1830 "url", 1832 1831 "zstd", 1833 1832 ]
-1
jetstream/Cargo.toml
··· 27 27 zstd = "0.13.2" 28 28 thiserror = "2.0.3" 29 29 log = "0.4.22" 30 - tokio-util = "0.7.13" 31 30 32 31 [dev-dependencies] 33 32 anyhow = "1.0.93"
+11 -45
jetstream/src/lib.rs
··· 8 8 Read, 9 9 }, 10 10 marker::PhantomData, 11 - sync::Arc, 12 11 time::{ 13 12 Duration, 14 13 Instant, ··· 23 22 use serde::de::DeserializeOwned; 24 23 use tokio::{ 25 24 net::TcpStream, 26 - sync::{ 27 - mpsc::{ 28 - channel, 29 - Receiver, 30 - Sender, 31 - }, 32 - Mutex, 25 + sync::mpsc::{ 26 + channel, 27 + Receiver, 28 + Sender, 33 29 }, 34 30 }; 35 31 use tokio_tungstenite::{ ··· 45 41 MaybeTlsStream, 46 42 WebSocketStream, 47 43 }; 48 - use tokio_util::sync::CancellationToken; 49 44 use url::Url; 50 45 use zstd::dict::DecoderDictionary; 51 46 ··· 360 355 361 356 tokio::task::spawn(async move { 362 357 // TODO: maybe return the task handle so we can surface any errors 363 - let max_retries = 30; 358 + let max_retries = 300; 364 359 let base_delay_ms = 1_000; // 1 second 365 360 let max_delay_ms = 30_000; // 30 seconds 366 361 let success_threshold_s = 15; // 15 seconds, retry count is reset if we were connected at least this long ··· 409 404 410 405 if retry_attempt > 0 { 411 406 // Exponential backoff 412 - let delay_ms = base_delay_ms * (2_u64.pow(retry_attempt)); 413 - log::error!("Connection failed, retrying in {delay_ms}ms..."); 414 - tokio::time::sleep(Duration::from_millis(delay_ms.min(max_delay_ms))).await; 407 + let delay = (base_delay_ms * (2_u64.pow(retry_attempt))).min(max_delay_ms); 408 + log::error!("Connection failed, retrying in {delay}ms..."); 409 + tokio::time::sleep(Duration::from_millis(delay)).await; 415 410 log::info!("Attempting to reconnect..."); 416 411 } 417 412 } ··· 431 426 last_cursor: &mut Option<Cursor>, 432 427 ) -> Result<(), JetstreamEventError> { 433 428 // TODO: Use the write half to allow the user to change configuration settings on the fly. 434 - let (socket_write, mut socket_read) = ws.split(); 435 - let shared_socket_write = Arc::new(Mutex::new(socket_write)); 436 - 437 - let ping_cancellation_token = CancellationToken::new(); 438 - let mut ping_interval = tokio::time::interval(Duration::from_secs(30)); 439 - let ping_cancelled = ping_cancellation_token.clone(); 440 - let ping_shared_socket_write = shared_socket_write.clone(); 441 - tokio::spawn(async move { 442 - log::trace!("starting ping task"); 443 - loop { 444 - ping_interval.tick().await; 445 - let false = ping_cancelled.is_cancelled() else { 446 - break; 447 - }; 448 - if let Err(error) = ping_shared_socket_write 449 - .lock() 450 - .await 451 - .send(Message::Ping("ping".into())) 452 - .await 453 - { 454 - log::error!("Ping failed: {error}"); 455 - break; 456 - } 457 - } 458 - eprintln!("oh this is bad news."); 459 - }); 429 + let (mut socket_write, mut socket_read) = ws.split(); 460 430 461 431 let mut closing_connection = false; 462 432 loop { ··· 523 493 } 524 494 Message::Ping(vec) => { 525 495 log::trace!("Ping recieved, responding"); 526 - shared_socket_write 527 - .lock() 528 - .await 496 + socket_write 529 497 .send(Message::Pong(vec)) 530 498 .await 531 499 .map_err(JetstreamEventError::PingPongError)?; ··· 548 516 } 549 517 Some(Err(error)) => { 550 518 log::error!("Web socket error: {error}"); 551 - ping_cancellation_token.cancel(); 552 519 closing_connection = true; 553 520 } 554 521 None => { 555 522 log::error!("No web socket result"); 556 - ping_cancellation_token.cancel(); 557 523 closing_connection = true; 558 524 } 559 525 } 560 526 if closing_connection { 561 527 log::trace!("closing connection"); 562 - _ = shared_socket_write.lock().await.close().await; 528 + _ = socket_write.close().await; 563 529 return Ok(()); 564 530 } 565 531 }