Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1pub mod error;
2pub mod events;
3pub mod exports;
4
5use std::{
6 io::Cursor as IoCursor,
7 time::{
8 Duration,
9 Instant,
10 },
11};
12
13use futures_util::{
14 stream::StreamExt,
15 SinkExt,
16};
17use tokio::{
18 net::TcpStream,
19 sync::mpsc::{
20 channel,
21 Receiver,
22 Sender,
23 },
24};
25use tokio_tungstenite::{
26 connect_async,
27 tungstenite::{
28 client::{
29 ClientRequestBuilder,
30 IntoClientRequest,
31 },
32 handshake::client::Request,
33 Message,
34 },
35 MaybeTlsStream,
36 WebSocketStream,
37};
38use url::Url;
39use zstd::dict::DecoderDictionary;
40
41use crate::{
42 error::{
43 ConfigValidationError,
44 ConnectionError,
45 JetstreamEventError,
46 },
47 events::{
48 Cursor,
49 JetstreamEvent,
50 },
51};
52
/// The Jetstream endpoints officially provided by Bluesky themselves.
///
/// There are no guarantees that these endpoints will always be available, but you are free
/// to run your own Jetstream instance in any case.
///
/// Convert a variant into its `wss://…/subscribe` URL with `String::from` / `.into()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DefaultJetstreamEndpoints {
    /// `jetstream1.us-east.bsky.network`
    USEastOne,
    /// `jetstream2.us-east.bsky.network`
    USEastTwo,
    /// `jetstream1.us-west.bsky.network`
    USWestOne,
    /// `jetstream2.us-west.bsky.network`
    USWestTwo,
}

impl DefaultJetstreamEndpoints {
    /// Helper to reference official jetstream instances by shortcut.
    ///
    /// A recognised shortcut is rewritten to the URL of the corresponding Bluesky-operated
    /// Jetstream endpoint. Any other input is returned unchanged, so a full endpoint URL can
    /// be passed straight through.
    ///
    /// The shortcuts available are
    /// - 'us-east-1'
    /// - 'us-east-2'
    /// - 'us-west-1'
    /// - 'us-west-2'
    pub fn endpoint_or_shortcut(s: &str) -> String {
        let official = match s {
            "us-east-1" => Self::USEastOne,
            "us-east-2" => Self::USEastTwo,
            "us-west-1" => Self::USWestOne,
            "us-west-2" => Self::USWestTwo,
            // Not a shortcut: treat the input as an endpoint URL and pass it through.
            custom => return custom.to_owned(),
        };
        official.into()
    }
}

impl From<DefaultJetstreamEndpoints> for String {
    /// Returns the full `wss://` subscribe URL of the given official endpoint.
    fn from(endpoint: DefaultJetstreamEndpoints) -> Self {
        let url = match endpoint {
            DefaultJetstreamEndpoints::USEastOne => {
                "wss://jetstream1.us-east.bsky.network/subscribe"
            }
            DefaultJetstreamEndpoints::USEastTwo => {
                "wss://jetstream2.us-east.bsky.network/subscribe"
            }
            DefaultJetstreamEndpoints::USWestOne => {
                "wss://jetstream1.us-west.bsky.network/subscribe"
            }
            DefaultJetstreamEndpoints::USWestTwo => {
                "wss://jetstream2.us-west.bsky.network/subscribe"
            }
        };
        url.to_owned()
    }
}
109
/// Upper bound on the number of wanted collections a single Jetstream connection may request.
const MAX_WANTED_COLLECTIONS: usize = 100;
/// Upper bound on the number of wanted DIDs a single Jetstream connection may request.
const MAX_WANTED_DIDS: usize = 10_000;
114
115/// The custom `zstd` dictionary used for decoding compressed Jetstream messages.
116///
117/// Sourced from the [official Bluesky Jetstream repo.](https://github.com/bluesky-social/jetstream/tree/main/pkg/models)
118const JETSTREAM_ZSTD_DICTIONARY: &[u8] = include_bytes!("../zstd/dictionary");
119
120/// A receiver channel for consuming Jetstream events.
121pub type JetstreamReceiver = Receiver<JetstreamEvent>;
122
123/// An internal sender channel for sending Jetstream events to [JetstreamReceiver]'s.
124type JetstreamSender = Sender<JetstreamEvent>;
125
126/// A wrapper connector type for working with a WebSocket connection to a Jetstream instance to
127/// receive and consume events. See [JetstreamConnector::connect] for more info.
128pub struct JetstreamConnector {
129 /// The configuration for the Jetstream connection.
130 config: JetstreamConfig,
131}
132
/// The compression scheme requested for messages on a Jetstream connection.
///
/// Converts to and from `bool`, where `true` means "compress with `zstd`".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JetstreamCompression {
    /// No compression, just raw plaintext JSON.
    None,
    /// Use the `zstd` compression algorithm, which can result in ~56% smaller messages on
    /// average. See [here](https://github.com/bluesky-social/jetstream?tab=readme-ov-file#compression) for more info.
    Zstd,
}

impl From<JetstreamCompression> for bool {
    /// `true` when compression is enabled ([JetstreamCompression::Zstd]).
    fn from(compression: JetstreamCompression) -> Self {
        match compression {
            JetstreamCompression::None => false,
            JetstreamCompression::Zstd => true,
        }
    }
}

impl From<bool> for JetstreamCompression {
    /// `true` selects [JetstreamCompression::Zstd], `false` selects [JetstreamCompression::None].
    fn from(compress: bool) -> Self {
        if compress {
            JetstreamCompression::Zstd
        } else {
            JetstreamCompression::None
        }
    }
}
159
160pub struct JetstreamConfig {
161 /// A Jetstream endpoint to connect to with a WebSocket Scheme i.e.
162 /// `wss://jetstream1.us-east.bsky.network/subscribe`.
163 pub endpoint: String,
164 /// A list of collection [NSIDs](https://atproto.com/specs/nsid) to filter events for.
165 ///
166 /// An empty list will receive events for *all* collections.
167 ///
168 /// Regardless of desired collections, all subscribers receive
169 /// [AccountEvent](events::account::AccountEvent) and
170 /// [IdentityEvent](events::identity::Identity) events.
171 pub wanted_collections: Vec<exports::Nsid>,
172 /// A list of repo [DIDs](https://atproto.com/specs/did) to filter events for.
173 ///
174 /// An empty list will receive events for *all* repos, which is a lot of events!
175 pub wanted_dids: Vec<exports::Did>,
176 /// The compression algorithm to request and use for the WebSocket connection (if any).
177 pub compression: JetstreamCompression,
178 /// User agent string to include with the jetstream connection request
179 pub user_agent: Option<String>,
180 /// Do not append jetstream client info to user agent string
181 pub omit_user_agent_jetstream_info: bool,
182 /// Enable automatic cursor for auto-reconnect
183 ///
184 /// By default, reconnects will never set a cursor for the connection, so a small number of
185 /// events will always be dropped.
186 ///
187 /// If you want gapless playback across reconnects, set this to `true`. If you always want
188 /// the latest available events and can tolerate missing some: `false`.
189 pub replay_on_reconnect: bool,
190 /// Maximum size of send channel for jetstream events.
191 ///
192 /// If your consuming task can't keep up with every new jetstream event in real-time,
193 /// you might get disconnected from the server as a "slow consumer". Increasing channel_size
194 /// can help prevent that if your consumer sometimes pauses, at a cost of higher memory
195 /// usage while events are buffered.
196 pub channel_size: usize,
197}
198
199impl Default for JetstreamConfig {
200 fn default() -> Self {
201 JetstreamConfig {
202 endpoint: DefaultJetstreamEndpoints::USEastOne.into(),
203 wanted_collections: Vec::new(),
204 wanted_dids: Vec::new(),
205 compression: JetstreamCompression::None,
206 user_agent: None,
207 omit_user_agent_jetstream_info: false,
208 replay_on_reconnect: false,
209 channel_size: 4096, // a few seconds of firehose buffer
210 }
211 }
212}
213
214impl JetstreamConfig {
215 /// Constructs a new endpoint URL with the given [JetstreamConfig] applied.
216 pub fn get_request_builder(
217 &self,
218 ) -> impl Fn(Option<Cursor>) -> Result<Request, ConnectionError> {
219 let did_search_query = self
220 .wanted_dids
221 .iter()
222 .map(|s| ("wantedDids", s.to_string()));
223
224 let collection_search_query = self
225 .wanted_collections
226 .iter()
227 .map(|s| ("wantedCollections", s.to_string()));
228
229 let compression = (
230 "compress",
231 match self.compression {
232 JetstreamCompression::None => "false".to_owned(),
233 JetstreamCompression::Zstd => "true".to_owned(),
234 },
235 );
236
237 let base_params = did_search_query
238 .chain(collection_search_query)
239 .chain(std::iter::once(compression))
240 .collect::<Vec<(&'static str, String)>>();
241
242 let ua_info: Option<String> = if self.omit_user_agent_jetstream_info {
243 None
244 } else {
245 Some(format!(
246 "v{} via jetstream-oxide (microcosm/links fork)",
247 env!("CARGO_PKG_VERSION")
248 ))
249 };
250 let maybe_ua = match (&self.user_agent, ua_info) {
251 (Some(ua), Some(info)) => Some(format!("{ua} {info}")),
252 (Some(ua), None) => Some(ua.clone()),
253 (None, Some(info)) => Some(info.clone()),
254 (None, None) => None,
255 };
256
257 let endpoint = self.endpoint.clone();
258 move |maybe_cursor: Option<Cursor>| {
259 let mut params = base_params.clone();
260 if let Some(ref cursor) = maybe_cursor {
261 params.push(("cursor", cursor.to_jetstream()));
262 }
263 let url = Url::parse_with_params(&endpoint, params)?;
264
265 let mut req = ClientRequestBuilder::new(url.as_str().parse()?);
266 if let Some(ua) = &maybe_ua {
267 req = req.with_header("user-agent", ua)
268 };
269 Ok(req.into_client_request()?)
270 }
271 }
272
273 /// Validates the configuration to make sure it is within the limits of the Jetstream API.
274 ///
275 /// # Constants
276 /// The following constants are used to validate the configuration and should only be changed
277 /// if the Jetstream API has itself changed.
278 /// - [MAX_WANTED_COLLECTIONS]
279 /// - [MAX_WANTED_DIDS]
280 ///
281 /// # Endpoint
282 ///
283 /// The provided `endpoint` is attempted to be parsed so that any errors occur early.
284 pub fn validate(&self) -> Result<(), ConfigValidationError> {
285 let collections = self.wanted_collections.len();
286 let dids = self.wanted_dids.len();
287
288 if collections > MAX_WANTED_COLLECTIONS {
289 return Err(ConfigValidationError::TooManyWantedCollections(collections));
290 }
291
292 if dids > MAX_WANTED_DIDS {
293 return Err(ConfigValidationError::TooManyDids(dids));
294 }
295
296 let _ = self.endpoint.parse::<Url>()?;
297
298 Ok(())
299 }
300}
301
302impl JetstreamConnector {
303 /// Create a Jetstream connector with a valid [JetstreamConfig].
304 ///
305 /// After creation, you can call [connect] to connect to the provided Jetstream instance.
306 pub fn new(config: JetstreamConfig) -> Result<Self, ConfigValidationError> {
307 // We validate the configuration here so any issues are caught early.
308 config.validate()?;
309 Ok(JetstreamConnector { config })
310 }
311
312 /// Connects to a Jetstream instance as defined in the [JetstreamConfig].
313 ///
314 /// A [JetstreamReceiver] is returned which can be used to respond to events. When all instances
315 /// of this receiver are dropped, the connection and task are automatically closed.
316 pub async fn connect(&self) -> Result<JetstreamReceiver, ConnectionError> {
317 self.connect_cursor(None).await
318 }
319
320 /// Connects to a Jetstream instance as defined in the [JetstreamConfig] with playback from a
321 /// cursor
322 ///
323 /// A cursor from the future will result in live-tail operation.
324 ///
325 /// The cursor is only used for first successfull connection -- on auto-reconnect it will
326 /// live-tail by default. Set `replay_on_reconnect: true` in the config if you need to
327 /// receive every event, which will keep track of the last-seen cursor and reconnect from
328 /// there.
329 pub async fn connect_cursor(
330 &self,
331 cursor: Option<Cursor>,
332 ) -> Result<JetstreamReceiver, ConnectionError> {
333 // We validate the config again for good measure. Probably not necessary but it can't hurt.
334 self.config
335 .validate()
336 .map_err(ConnectionError::InvalidConfig)?;
337
338 let (send_channel, receive_channel) = channel(self.config.channel_size);
339 let replay_on_reconnect = self.config.replay_on_reconnect;
340 let build_request = self.config.get_request_builder();
341
342 tokio::task::spawn(async move {
343 // TODO: maybe return the task handle so we can surface any errors
344 let max_retries = 300;
345 let base_delay_ms = 1_000; // 1 second
346 let max_delay_ms = 30_000; // 30 seconds
347 let success_threshold_s = 15; // 15 seconds, retry count is reset if we were connected at least this long
348
349 let mut retry_attempt = 0;
350 let mut connect_cursor = cursor;
351 loop {
352 let dict = DecoderDictionary::copy(JETSTREAM_ZSTD_DICTIONARY);
353
354 let req = match build_request(connect_cursor) {
355 Ok(req) => req,
356 Err(e) => {
357 log::error!("Could not build jetstream websocket request: {e:?}");
358 break; // this is always fatal? no retry.
359 }
360 };
361
362 let mut last_cursor = connect_cursor;
363 retry_attempt += 1;
364 if let Ok((ws_stream, _)) = connect_async(req).await {
365 let t_connected = Instant::now();
366 log::info!("jetstream connected. starting websocket task...");
367 if let Err(e) =
368 websocket_task(dict, ws_stream, send_channel.clone(), &mut last_cursor)
369 .await
370 {
371 if let JetstreamEventError::ReceiverClosedError = e {
372 log::error!("Jetstream receiver channel closed. Exiting consumer.");
373 return;
374 }
375 log::error!("Jetstream closed after encountering error: {e:?}");
376 } else {
377 log::warn!("Jetstream connection closed cleanly");
378 }
379 if t_connected.elapsed() > Duration::from_secs(success_threshold_s) {
380 log::warn!("Jetstream: more than {success_threshold_s}s since last reconnect, reconnecting immediately.");
381 retry_attempt = 0;
382 }
383 }
384
385 if retry_attempt >= max_retries {
386 log::error!("jetstream: hit max retries, bye");
387 break;
388 }
389
390 connect_cursor = if replay_on_reconnect {
391 last_cursor
392 } else {
393 None
394 };
395
396 if retry_attempt > 0 {
397 // Exponential backoff
398 let delay =
399 (base_delay_ms * (2_u64.saturating_pow(retry_attempt))).min(max_delay_ms);
400 log::error!("Connection failed, retry #{retry_attempt} in {delay}ms...");
401 tokio::time::sleep(Duration::from_millis(delay)).await;
402 log::info!("Attempting to reconnect...");
403 }
404 }
405 log::error!("Connection retries exhausted. Jetstream is disconnected.");
406 });
407
408 Ok(receive_channel)
409 }
410}
411
412/// The main task that handles the WebSocket connection and sends [JetstreamEvent]'s to any
413/// receivers that are listening for them.
414async fn websocket_task(
415 dictionary: DecoderDictionary<'_>,
416 ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
417 send_channel: JetstreamSender,
418 last_cursor: &mut Option<Cursor>,
419) -> Result<(), JetstreamEventError> {
420 // TODO: Use the write half to allow the user to change configuration settings on the fly.
421 let (mut socket_write, mut socket_read) = ws.split();
422
423 let mut closing_connection = false;
424 loop {
425 match socket_read.next().await {
426 Some(Ok(message)) => match message {
427 Message::Text(json) => {
428 let event: JetstreamEvent = match serde_json::from_str(&json) {
429 Ok(ev) => ev,
430 Err(e) => {
431 log::warn!(
432 "failed to parse json: {e:?} (from {})",
433 json.get(..24).unwrap_or(&json)
434 );
435 continue;
436 }
437 };
438 let event_cursor = event.cursor;
439
440 if let Some(last) = last_cursor {
441 if event_cursor <= *last {
442 log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event.");
443 continue;
444 }
445 }
446
447 if send_channel.send(event).await.is_err() {
448 log::warn!(
449 "All receivers for the Jetstream connection have been dropped, closing connection."
450 );
451 socket_write.close().await?;
452 return Err(JetstreamEventError::ReceiverClosedError);
453 } else if let Some(last) = last_cursor.as_mut() {
454 *last = event_cursor;
455 }
456 }
457 Message::Binary(zstd_json) => {
458 let mut cursor = IoCursor::new(zstd_json);
459 let decoder =
460 zstd::stream::Decoder::with_prepared_dictionary(&mut cursor, &dictionary)
461 .map_err(JetstreamEventError::CompressionDictionaryError)?;
462
463 let event: JetstreamEvent = match serde_json::from_reader(decoder) {
464 Ok(ev) => ev,
465 Err(e) => {
466 log::warn!("failed to parse json: {e:?}");
467 continue;
468 }
469 };
470 let event_cursor = event.cursor;
471
472 if let Some(last) = last_cursor {
473 if event_cursor <= *last {
474 log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event.");
475 continue;
476 }
477 }
478
479 if send_channel.send(event).await.is_err() {
480 log::warn!(
481 "All receivers for the Jetstream connection have been dropped, closing connection."
482 );
483 socket_write.close().await?;
484 return Err(JetstreamEventError::ReceiverClosedError);
485 } else if let Some(last) = last_cursor.as_mut() {
486 *last = event_cursor;
487 }
488 }
489 Message::Ping(vec) => {
490 log::trace!("Ping recieved, responding");
491 socket_write
492 .send(Message::Pong(vec))
493 .await
494 .map_err(JetstreamEventError::PingPongError)?;
495 }
496 Message::Close(close_frame) => {
497 log::trace!("Close recieved. I guess we just log here?");
498 if let Some(close_frame) = close_frame {
499 let reason = close_frame.reason;
500 let code = close_frame.code;
501 log::trace!("Connection closed. Reason: {reason}, Code: {code}");
502 }
503 }
504 Message::Pong(pong) => {
505 let pong_payload =
506 String::from_utf8(pong.to_vec()).unwrap_or("Invalid payload".to_string());
507 log::trace!("Pong recieved. Payload: {pong_payload}");
508 }
509 Message::Frame(_) => (),
510 },
511 Some(Err(error)) => {
512 log::error!("Web socket error: {error}");
513 closing_connection = true;
514 }
515 None => {
516 log::error!("No web socket result");
517 closing_connection = true;
518 }
519 }
520 if closing_connection {
521 log::trace!("closing connection");
522 _ = socket_write.close().await;
523 return Ok(());
524 }
525 }
526}