Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

1pub mod error; 2pub mod events; 3pub mod exports; 4 5use std::{ 6 io::Cursor as IoCursor, 7 time::{ 8 Duration, 9 Instant, 10 }, 11}; 12 13use futures_util::{ 14 stream::StreamExt, 15 SinkExt, 16}; 17use tokio::{ 18 net::TcpStream, 19 sync::mpsc::{ 20 channel, 21 Receiver, 22 Sender, 23 }, 24}; 25use tokio_tungstenite::{ 26 connect_async, 27 tungstenite::{ 28 client::{ 29 ClientRequestBuilder, 30 IntoClientRequest, 31 }, 32 handshake::client::Request, 33 Message, 34 }, 35 MaybeTlsStream, 36 WebSocketStream, 37}; 38use url::Url; 39use zstd::dict::DecoderDictionary; 40 41use crate::{ 42 error::{ 43 ConfigValidationError, 44 ConnectionError, 45 JetstreamEventError, 46 }, 47 events::{ 48 Cursor, 49 JetstreamEvent, 50 }, 51}; 52 53/// The Jetstream endpoints officially provided by Bluesky themselves. 54/// 55/// There are no guarantees that these endpoints will always be available, but you are free 56/// to run your own Jetstream instance in any case. 57pub enum DefaultJetstreamEndpoints { 58 /// `jetstream1.us-east.bsky.network` 59 USEastOne, 60 /// `jetstream2.us-east.bsky.network` 61 USEastTwo, 62 /// `jetstream1.us-west.bsky.network` 63 USWestOne, 64 /// `jetstream2.us-west.bsky.network` 65 USWestTwo, 66} 67 68impl DefaultJetstreamEndpoints { 69 /// Helper to reference official jetstream instances by shortcut 70 /// 71 /// This function will pass through a jetstream endpoint URL unless it matches a shortcut, 72 /// in which case it will be rewritten to the corresponding bluesky-operated jetstream endpoint 73 /// URL. 74 /// 75 /// The shortcuts available are 76 /// - 'us-east-1' 77 /// - 'us-east-2' 78 /// - 'us-west-1' 79 /// - 'us-west-2' 80 pub fn endpoint_or_shortcut(s: &str) -> String { 81 match s { 82 "us-east-1" => DefaultJetstreamEndpoints::USEastOne.into(), 83 "us-east-2" => DefaultJetstreamEndpoints::USEastTwo.into(), 84 "us-west-1" => DefaultJetstreamEndpoints::USWestOne.into(), 85 "us-west-2" => DefaultJetstreamEndpoints::USWestTwo.into(), 86 custom => custom.into(), 87 } 88 } 89} 90 91impl From<DefaultJetstreamEndpoints> for String { 92 fn from(endpoint: DefaultJetstreamEndpoints) -> Self { 93 match endpoint { 94 DefaultJetstreamEndpoints::USEastOne => { 95 "wss://jetstream1.us-east.bsky.network/subscribe".to_owned() 96 } 97 DefaultJetstreamEndpoints::USEastTwo => { 98 "wss://jetstream2.us-east.bsky.network/subscribe".to_owned() 99 } 100 DefaultJetstreamEndpoints::USWestOne => { 101 "wss://jetstream1.us-west.bsky.network/subscribe".to_owned() 102 } 103 DefaultJetstreamEndpoints::USWestTwo => { 104 "wss://jetstream2.us-west.bsky.network/subscribe".to_owned() 105 } 106 } 107 } 108} 109 110/// The maximum number of wanted collections that can be requested on a single Jetstream connection. 111const MAX_WANTED_COLLECTIONS: usize = 100; 112/// The maximum number of wanted DIDs that can be requested on a single Jetstream connection. 113const MAX_WANTED_DIDS: usize = 10_000; 114 115/// The custom `zstd` dictionary used for decoding compressed Jetstream messages. 116/// 117/// Sourced from the [official Bluesky Jetstream repo.](https://github.com/bluesky-social/jetstream/tree/main/pkg/models) 118const JETSTREAM_ZSTD_DICTIONARY: &[u8] = include_bytes!("../zstd/dictionary"); 119 120/// A receiver channel for consuming Jetstream events. 121pub type JetstreamReceiver = Receiver<JetstreamEvent>; 122 123/// An internal sender channel for sending Jetstream events to [JetstreamReceiver]'s. 124type JetstreamSender = Sender<JetstreamEvent>; 125 126/// A wrapper connector type for working with a WebSocket connection to a Jetstream instance to 127/// receive and consume events. See [JetstreamConnector::connect] for more info. 128pub struct JetstreamConnector { 129 /// The configuration for the Jetstream connection. 130 config: JetstreamConfig, 131} 132 133pub enum JetstreamCompression { 134 /// No compression, just raw plaintext JSON. 135 None, 136 /// Use the `zstd` compression algorithm, which can result in a ~56% smaller messages on 137 /// average. See [here](https://github.com/bluesky-social/jetstream?tab=readme-ov-file#compression) for more info. 138 Zstd, 139} 140 141impl From<JetstreamCompression> for bool { 142 fn from(compression: JetstreamCompression) -> Self { 143 match compression { 144 JetstreamCompression::None => false, 145 JetstreamCompression::Zstd => true, 146 } 147 } 148} 149 150impl From<bool> for JetstreamCompression { 151 fn from(compress: bool) -> Self { 152 if compress { 153 JetstreamCompression::Zstd 154 } else { 155 JetstreamCompression::None 156 } 157 } 158} 159 160pub struct JetstreamConfig { 161 /// A Jetstream endpoint to connect to with a WebSocket Scheme i.e. 162 /// `wss://jetstream1.us-east.bsky.network/subscribe`. 163 pub endpoint: String, 164 /// A list of collection [NSIDs](https://atproto.com/specs/nsid) to filter events for. 165 /// 166 /// An empty list will receive events for *all* collections. 167 /// 168 /// Regardless of desired collections, all subscribers receive 169 /// [AccountEvent](events::account::AccountEvent) and 170 /// [IdentityEvent](events::identity::Identity) events. 171 pub wanted_collections: Vec<exports::Nsid>, 172 /// A list of repo [DIDs](https://atproto.com/specs/did) to filter events for. 173 /// 174 /// An empty list will receive events for *all* repos, which is a lot of events! 175 pub wanted_dids: Vec<exports::Did>, 176 /// The compression algorithm to request and use for the WebSocket connection (if any). 177 pub compression: JetstreamCompression, 178 /// User agent string to include with the jetstream connection request 179 pub user_agent: Option<String>, 180 /// Do not append jetstream client info to user agent string 181 pub omit_user_agent_jetstream_info: bool, 182 /// Enable automatic cursor for auto-reconnect 183 /// 184 /// By default, reconnects will never set a cursor for the connection, so a small number of 185 /// events will always be dropped. 186 /// 187 /// If you want gapless playback across reconnects, set this to `true`. If you always want 188 /// the latest available events and can tolerate missing some: `false`. 189 pub replay_on_reconnect: bool, 190 /// Maximum size of send channel for jetstream events. 191 /// 192 /// If your consuming task can't keep up with every new jetstream event in real-time, 193 /// you might get disconnected from the server as a "slow consumer". Increasing channel_size 194 /// can help prevent that if your consumer sometimes pauses, at a cost of higher memory 195 /// usage while events are buffered. 196 pub channel_size: usize, 197} 198 199impl Default for JetstreamConfig { 200 fn default() -> Self { 201 JetstreamConfig { 202 endpoint: DefaultJetstreamEndpoints::USEastOne.into(), 203 wanted_collections: Vec::new(), 204 wanted_dids: Vec::new(), 205 compression: JetstreamCompression::None, 206 user_agent: None, 207 omit_user_agent_jetstream_info: false, 208 replay_on_reconnect: false, 209 channel_size: 4096, // a few seconds of firehose buffer 210 } 211 } 212} 213 214impl JetstreamConfig { 215 /// Constructs a new endpoint URL with the given [JetstreamConfig] applied. 216 pub fn get_request_builder( 217 &self, 218 ) -> impl Fn(Option<Cursor>) -> Result<Request, ConnectionError> { 219 let did_search_query = self 220 .wanted_dids 221 .iter() 222 .map(|s| ("wantedDids", s.to_string())); 223 224 let collection_search_query = self 225 .wanted_collections 226 .iter() 227 .map(|s| ("wantedCollections", s.to_string())); 228 229 let compression = ( 230 "compress", 231 match self.compression { 232 JetstreamCompression::None => "false".to_owned(), 233 JetstreamCompression::Zstd => "true".to_owned(), 234 }, 235 ); 236 237 let base_params = did_search_query 238 .chain(collection_search_query) 239 .chain(std::iter::once(compression)) 240 .collect::<Vec<(&'static str, String)>>(); 241 242 let ua_info: Option<String> = if self.omit_user_agent_jetstream_info { 243 None 244 } else { 245 Some(format!( 246 "v{} via jetstream-oxide (microcosm/links fork)", 247 env!("CARGO_PKG_VERSION") 248 )) 249 }; 250 let maybe_ua = match (&self.user_agent, ua_info) { 251 (Some(ua), Some(info)) => Some(format!("{ua} {info}")), 252 (Some(ua), None) => Some(ua.clone()), 253 (None, Some(info)) => Some(info.clone()), 254 (None, None) => None, 255 }; 256 257 let endpoint = self.endpoint.clone(); 258 move |maybe_cursor: Option<Cursor>| { 259 let mut params = base_params.clone(); 260 if let Some(ref cursor) = maybe_cursor { 261 params.push(("cursor", cursor.to_jetstream())); 262 } 263 let url = Url::parse_with_params(&endpoint, params)?; 264 265 let mut req = ClientRequestBuilder::new(url.as_str().parse()?); 266 if let Some(ua) = &maybe_ua { 267 req = req.with_header("user-agent", ua) 268 }; 269 Ok(req.into_client_request()?) 270 } 271 } 272 273 /// Validates the configuration to make sure it is within the limits of the Jetstream API. 274 /// 275 /// # Constants 276 /// The following constants are used to validate the configuration and should only be changed 277 /// if the Jetstream API has itself changed. 278 /// - [MAX_WANTED_COLLECTIONS] 279 /// - [MAX_WANTED_DIDS] 280 /// 281 /// # Endpoint 282 /// 283 /// The provided `endpoint` is attempted to be parsed so that any errors occur early. 284 pub fn validate(&self) -> Result<(), ConfigValidationError> { 285 let collections = self.wanted_collections.len(); 286 let dids = self.wanted_dids.len(); 287 288 if collections > MAX_WANTED_COLLECTIONS { 289 return Err(ConfigValidationError::TooManyWantedCollections(collections)); 290 } 291 292 if dids > MAX_WANTED_DIDS { 293 return Err(ConfigValidationError::TooManyDids(dids)); 294 } 295 296 let _ = self.endpoint.parse::<Url>()?; 297 298 Ok(()) 299 } 300} 301 302impl JetstreamConnector { 303 /// Create a Jetstream connector with a valid [JetstreamConfig]. 304 /// 305 /// After creation, you can call [connect] to connect to the provided Jetstream instance. 306 pub fn new(config: JetstreamConfig) -> Result<Self, ConfigValidationError> { 307 // We validate the configuration here so any issues are caught early. 308 config.validate()?; 309 Ok(JetstreamConnector { config }) 310 } 311 312 /// Connects to a Jetstream instance as defined in the [JetstreamConfig]. 313 /// 314 /// A [JetstreamReceiver] is returned which can be used to respond to events. When all instances 315 /// of this receiver are dropped, the connection and task are automatically closed. 316 pub async fn connect(&self) -> Result<JetstreamReceiver, ConnectionError> { 317 self.connect_cursor(None).await 318 } 319 320 /// Connects to a Jetstream instance as defined in the [JetstreamConfig] with playback from a 321 /// cursor 322 /// 323 /// A cursor from the future will result in live-tail operation. 324 /// 325 /// The cursor is only used for first successfull connection -- on auto-reconnect it will 326 /// live-tail by default. Set `replay_on_reconnect: true` in the config if you need to 327 /// receive every event, which will keep track of the last-seen cursor and reconnect from 328 /// there. 329 pub async fn connect_cursor( 330 &self, 331 cursor: Option<Cursor>, 332 ) -> Result<JetstreamReceiver, ConnectionError> { 333 // We validate the config again for good measure. Probably not necessary but it can't hurt. 334 self.config 335 .validate() 336 .map_err(ConnectionError::InvalidConfig)?; 337 338 let (send_channel, receive_channel) = channel(self.config.channel_size); 339 let replay_on_reconnect = self.config.replay_on_reconnect; 340 let build_request = self.config.get_request_builder(); 341 342 tokio::task::spawn(async move { 343 // TODO: maybe return the task handle so we can surface any errors 344 let max_retries = 300; 345 let base_delay_ms = 1_000; // 1 second 346 let max_delay_ms = 30_000; // 30 seconds 347 let success_threshold_s = 15; // 15 seconds, retry count is reset if we were connected at least this long 348 349 let mut retry_attempt = 0; 350 let mut connect_cursor = cursor; 351 loop { 352 let dict = DecoderDictionary::copy(JETSTREAM_ZSTD_DICTIONARY); 353 354 let req = match build_request(connect_cursor) { 355 Ok(req) => req, 356 Err(e) => { 357 log::error!("Could not build jetstream websocket request: {e:?}"); 358 break; // this is always fatal? no retry. 359 } 360 }; 361 362 let mut last_cursor = connect_cursor; 363 retry_attempt += 1; 364 if let Ok((ws_stream, _)) = connect_async(req).await { 365 let t_connected = Instant::now(); 366 log::info!("jetstream connected. starting websocket task..."); 367 if let Err(e) = 368 websocket_task(dict, ws_stream, send_channel.clone(), &mut last_cursor) 369 .await 370 { 371 if let JetstreamEventError::ReceiverClosedError = e { 372 log::error!("Jetstream receiver channel closed. Exiting consumer."); 373 return; 374 } 375 log::error!("Jetstream closed after encountering error: {e:?}"); 376 } else { 377 log::warn!("Jetstream connection closed cleanly"); 378 } 379 if t_connected.elapsed() > Duration::from_secs(success_threshold_s) { 380 log::warn!("Jetstream: more than {success_threshold_s}s since last reconnect, reconnecting immediately."); 381 retry_attempt = 0; 382 } 383 } 384 385 if retry_attempt >= max_retries { 386 log::error!("jetstream: hit max retries, bye"); 387 break; 388 } 389 390 connect_cursor = if replay_on_reconnect { 391 last_cursor 392 } else { 393 None 394 }; 395 396 if retry_attempt > 0 { 397 // Exponential backoff 398 let delay = 399 (base_delay_ms * (2_u64.saturating_pow(retry_attempt))).min(max_delay_ms); 400 log::error!("Connection failed, retry #{retry_attempt} in {delay}ms..."); 401 tokio::time::sleep(Duration::from_millis(delay)).await; 402 log::info!("Attempting to reconnect..."); 403 } 404 } 405 log::error!("Connection retries exhausted. Jetstream is disconnected."); 406 }); 407 408 Ok(receive_channel) 409 } 410} 411 412/// The main task that handles the WebSocket connection and sends [JetstreamEvent]'s to any 413/// receivers that are listening for them. 414async fn websocket_task( 415 dictionary: DecoderDictionary<'_>, 416 ws: WebSocketStream<MaybeTlsStream<TcpStream>>, 417 send_channel: JetstreamSender, 418 last_cursor: &mut Option<Cursor>, 419) -> Result<(), JetstreamEventError> { 420 // TODO: Use the write half to allow the user to change configuration settings on the fly. 421 let (mut socket_write, mut socket_read) = ws.split(); 422 423 let mut closing_connection = false; 424 loop { 425 match socket_read.next().await { 426 Some(Ok(message)) => match message { 427 Message::Text(json) => { 428 let event: JetstreamEvent = match serde_json::from_str(&json) { 429 Ok(ev) => ev, 430 Err(e) => { 431 log::warn!( 432 "failed to parse json: {e:?} (from {})", 433 json.get(..24).unwrap_or(&json) 434 ); 435 continue; 436 } 437 }; 438 let event_cursor = event.cursor; 439 440 if let Some(last) = last_cursor { 441 if event_cursor <= *last { 442 log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event."); 443 continue; 444 } 445 } 446 447 if send_channel.send(event).await.is_err() { 448 log::warn!( 449 "All receivers for the Jetstream connection have been dropped, closing connection." 450 ); 451 socket_write.close().await?; 452 return Err(JetstreamEventError::ReceiverClosedError); 453 } else if let Some(last) = last_cursor.as_mut() { 454 *last = event_cursor; 455 } 456 } 457 Message::Binary(zstd_json) => { 458 let mut cursor = IoCursor::new(zstd_json); 459 let decoder = 460 zstd::stream::Decoder::with_prepared_dictionary(&mut cursor, &dictionary) 461 .map_err(JetstreamEventError::CompressionDictionaryError)?; 462 463 let event: JetstreamEvent = match serde_json::from_reader(decoder) { 464 Ok(ev) => ev, 465 Err(e) => { 466 log::warn!("failed to parse json: {e:?}"); 467 continue; 468 } 469 }; 470 let event_cursor = event.cursor; 471 472 if let Some(last) = last_cursor { 473 if event_cursor <= *last { 474 log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event."); 475 continue; 476 } 477 } 478 479 if send_channel.send(event).await.is_err() { 480 log::warn!( 481 "All receivers for the Jetstream connection have been dropped, closing connection." 482 ); 483 socket_write.close().await?; 484 return Err(JetstreamEventError::ReceiverClosedError); 485 } else if let Some(last) = last_cursor.as_mut() { 486 *last = event_cursor; 487 } 488 } 489 Message::Ping(vec) => { 490 log::trace!("Ping recieved, responding"); 491 socket_write 492 .send(Message::Pong(vec)) 493 .await 494 .map_err(JetstreamEventError::PingPongError)?; 495 } 496 Message::Close(close_frame) => { 497 log::trace!("Close recieved. I guess we just log here?"); 498 if let Some(close_frame) = close_frame { 499 let reason = close_frame.reason; 500 let code = close_frame.code; 501 log::trace!("Connection closed. Reason: {reason}, Code: {code}"); 502 } 503 } 504 Message::Pong(pong) => { 505 let pong_payload = 506 String::from_utf8(pong.to_vec()).unwrap_or("Invalid payload".to_string()); 507 log::trace!("Pong recieved. Payload: {pong_payload}"); 508 } 509 Message::Frame(_) => (), 510 }, 511 Some(Err(error)) => { 512 log::error!("Web socket error: {error}"); 513 closing_connection = true; 514 } 515 None => { 516 log::error!("No web socket result"); 517 closing_connection = true; 518 } 519 } 520 if closing_connection { 521 log::trace!("closing connection"); 522 _ = socket_write.close().await; 523 return Ok(()); 524 } 525 } 526}