Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use crate::error::ConsumerError;
2use crate::{CachedRecord, Identity, IdentityKey};
3use foyer::HybridCache;
4use jetstream::{
5 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
6 events::{CommitOp, Cursor, EventKind},
7};
8use tokio_util::sync::CancellationToken;
9
10pub async fn consume(
11 jetstream_endpoint: String,
12 cursor: Option<Cursor>,
13 no_zstd: bool,
14 identity: Identity,
15 shutdown: CancellationToken,
16 cache: HybridCache<String, CachedRecord>,
17) -> Result<(), ConsumerError> {
18 let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(&jetstream_endpoint);
19 if endpoint == jetstream_endpoint {
20 log::info!("consumer: connecting jetstream at {endpoint}");
21 } else {
22 log::info!("consumer: connecting jetstream at {jetstream_endpoint} => {endpoint}");
23 }
24 let config: JetstreamConfig = JetstreamConfig {
25 endpoint,
26 compression: if no_zstd {
27 JetstreamCompression::None
28 } else {
29 JetstreamCompression::Zstd
30 },
31 replay_on_reconnect: true,
32 channel_size: 1024, // buffer up to ~1s of jetstream events
33 ..Default::default()
34 };
35 let mut receiver = JetstreamConnector::new(config)?
36 .connect_cursor(cursor)
37 .await?;
38
39 log::info!("consumer: receiving messages..");
40 loop {
41 if shutdown.is_cancelled() {
42 log::info!("consumer: exiting for shutdown");
43 return Ok(());
44 }
45 let Some(mut event) = receiver.recv().await else {
46 log::error!("consumer: could not receive event, bailing");
47 break;
48 };
49
50 match event.kind {
51 EventKind::Commit => {
52 let Some(ref mut commit) = event.commit else {
53 log::warn!("consumer: commit event missing commit data, ignoring");
54 continue;
55 };
56
57 // TODO: something a bit more robust
58 let at_uri = format!(
59 "at://{}/{}/{}",
60 &*event.did, &*commit.collection, &*commit.rkey
61 );
62
63 if commit.operation == CommitOp::Delete {
64 cache.insert(at_uri, CachedRecord::Deleted);
65 } else {
66 let Some(record) = commit.record.take() else {
67 log::warn!("consumer: commit insert or update missing record, ignoring");
68 continue;
69 };
70 let Some(cid) = commit.cid.take() else {
71 log::warn!("consumer: commit insert or update missing CID, ignoring");
72 continue;
73 };
74
75 cache.insert(at_uri, CachedRecord::Found((cid, record).into()));
76 }
77 }
78 EventKind::Identity => {
79 let Some(ident) = event.identity else {
80 log::warn!("consumer: identity event missing identity data, ignoring");
81 continue;
82 };
83 if let Some(handle) = ident.handle {
84 metrics::counter!("identity_handle_refresh_queued", "reason" => "identity event").increment(1);
85 identity.queue_refresh(IdentityKey::Handle(handle)).await;
86 }
87 metrics::counter!("identity_did_refresh_queued", "reason" => "identity event")
88 .increment(1);
89 identity.queue_refresh(IdentityKey::Did(ident.did)).await;
90 }
91 EventKind::Account => {} // TODO: handle account events (esp hiding content on deactivate, clearing on delete)
92 }
93 }
94
95 Err(ConsumerError::JetstreamEnded)
96}