···5757This listens for posts that *I personally make*. You can substitute your own DID and make a few test posts yourself if
5858you'd
5959like of course!
6060+6161+6262+## Running `rustfmt` (requires nightly)
6363+6464+```bash
6565+# get nightly set up
6666+rustup toolchain install nightly --allow-downgrade -c rustfmt
6767+# run the nightly version of fmt
6868+cargo +nightly fmt
6969+```
+11-8
jetstream/examples/arbitrary_record.rs
···11-//! An example of how to listen for create/delete events on a specific DID and potentialy unknown NSID
11+//! An example of how to listen for create/delete events on a specific DID and potentialy unknown
22+//! NSID
2334use atrium_api::types::string;
45use clap::Parser;
56use jetstream::{
66- events::{commit::CommitEvent, JetstreamEvent::Commit},
77- DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
77+ events::{
88+ commit::CommitEvent,
99+ JetstreamEvent::Commit,
1010+ },
1111+ DefaultJetstreamEndpoints,
1212+ JetstreamCompression,
1313+ JetstreamConfig,
1414+ JetstreamConnector,
815};
9161017#[derive(Parser, Debug)]
···3441 let jetstream: JetstreamConnector<serde_json::Value> = JetstreamConnector::new(config)?;
3542 let receiver = jetstream.connect().await?;
36433737- println!(
3838- "Listening for '{}' events on DIDs: {:?}",
3939- &*args.nsid,
4040- dids,
4141- );
4444+ println!("Listening for '{}' events on DIDs: {:?}", &*args.nsid, dids);
42454346 while let Ok(event) = receiver.recv_async().await {
4447 if let Commit(CommitEvent::Create { commit, .. }) = event {
+13-8
jetstream/examples/basic.rs
···11//! A very basic example of how to listen for create/delete events on a specific DID and NSID.
2233-use atrium_api::{record::KnownRecord::AppBskyFeedPost, types::string};
33+use atrium_api::{
44+ record::KnownRecord::AppBskyFeedPost,
55+ types::string,
66+};
47use clap::Parser;
58use jetstream::{
66- events::{commit::CommitEvent, JetstreamEvent::Commit},
77- DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
99+ events::{
1010+ commit::CommitEvent,
1111+ JetstreamEvent::Commit,
1212+ },
1313+ DefaultJetstreamEndpoints,
1414+ JetstreamCompression,
1515+ JetstreamConfig,
1616+ JetstreamConnector,
817};
9181019#[derive(Parser, Debug)]
···3443 let jetstream = JetstreamConnector::new(config)?;
3544 let receiver = jetstream.connect().await?;
36453737- println!(
3838- "Listening for '{}' events on DIDs: {:?}",
3939- &*args.nsid,
4040- dids,
4141- );
4646+ println!("Listening for '{}' events on DIDs: {:?}", &*args.nsid, dids);
42474348 while let Ok(event) = receiver.recv_async().await {
4449 if let Commit(commit) = event {
+32-10
jetstream/src/lib.rs
···33pub mod exports;
4455use std::{
66- io::{Cursor, Read},
66+ io::{
77+ Cursor,
88+ Read,
99+ },
710 marker::PhantomData,
811 sync::Arc,
99- time::{Instant, Duration},
1212+ time::{
1313+ Duration,
1414+ Instant,
1515+ },
1016};
1717+1118use atrium_api::record::KnownRecord;
1219use chrono::Utc;
1313-use futures_util::{stream::StreamExt, SinkExt};
2020+use futures_util::{
2121+ stream::StreamExt,
2222+ SinkExt,
2323+};
1424use serde::de::DeserializeOwned;
1515-use tokio::{net::TcpStream, sync::Mutex};
1616-use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
2525+use tokio::{
2626+ net::TcpStream,
2727+ sync::Mutex,
2828+};
2929+use tokio_tungstenite::{
3030+ connect_async,
3131+ tungstenite::Message,
3232+ MaybeTlsStream,
3333+ WebSocketStream,
3434+};
1735use tokio_util::sync::CancellationToken;
1836use url::Url;
1937use zstd::dict::DecoderDictionary;
20382139use crate::{
2222- error::{ConfigValidationError, ConnectionError, JetstreamEventError},
4040+ error::{
4141+ ConfigValidationError,
4242+ ConnectionError,
4343+ JetstreamEventError,
4444+ },
2345 events::JetstreamEvent,
2446};
2547···318340 .map_err(JetstreamEventError::ReceivedMalformedJSON)?;
319341320342 if send_channel.send(event).is_err() {
321321- // We can assume that all receivers have been dropped, so we can close the
322322- // connection and exit the task.
343343+ // We can assume that all receivers have been dropped, so we can close
344344+ // the connection and exit the task.
323345 log::info!(
324346 "All receivers for the Jetstream connection have been dropped, closing connection."
325347 );
···343365 .map_err(JetstreamEventError::ReceivedMalformedJSON)?;
344366345367 if send_channel.send(event).is_err() {
346346- // We can assume that all receivers have been dropped, so we can close the
347347- // connection and exit the task.
368368+ // We can assume that all receivers have been dropped, so we can close
369369+ // the connection and exit the task.
348370 log::info!(
349371 "All receivers for the Jetstream connection have been dropped, closing connection..."
350372 );