Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 3.7 kB View raw
1use crate::error::ConsumerError; 2use crate::{CachedRecord, Identity, IdentityKey}; 3use foyer::HybridCache; 4use jetstream::{ 5 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector, 6 events::{CommitOp, Cursor, EventKind}, 7}; 8use tokio_util::sync::CancellationToken; 9 10pub async fn consume( 11 jetstream_endpoint: String, 12 cursor: Option<Cursor>, 13 no_zstd: bool, 14 identity: Identity, 15 shutdown: CancellationToken, 16 cache: HybridCache<String, CachedRecord>, 17) -> Result<(), ConsumerError> { 18 let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(&jetstream_endpoint); 19 if endpoint == jetstream_endpoint { 20 log::info!("consumer: connecting jetstream at {endpoint}"); 21 } else { 22 log::info!("consumer: connecting jetstream at {jetstream_endpoint} => {endpoint}"); 23 } 24 let config: JetstreamConfig = JetstreamConfig { 25 endpoint, 26 compression: if no_zstd { 27 JetstreamCompression::None 28 } else { 29 JetstreamCompression::Zstd 30 }, 31 replay_on_reconnect: true, 32 channel_size: 1024, // buffer up to ~1s of jetstream events 33 ..Default::default() 34 }; 35 let mut receiver = JetstreamConnector::new(config)? 36 .connect_cursor(cursor) 37 .await?; 38 39 log::info!("consumer: receiving messages.."); 40 loop { 41 if shutdown.is_cancelled() { 42 log::info!("consumer: exiting for shutdown"); 43 return Ok(()); 44 } 45 let Some(mut event) = receiver.recv().await else { 46 log::error!("consumer: could not receive event, bailing"); 47 break; 48 }; 49 50 match event.kind { 51 EventKind::Commit => { 52 let Some(ref mut commit) = event.commit else { 53 log::warn!("consumer: commit event missing commit data, ignoring"); 54 continue; 55 }; 56 57 // TODO: something a bit more robust 58 let at_uri = format!( 59 "at://{}/{}/{}", 60 &*event.did, &*commit.collection, &*commit.rkey 61 ); 62 63 if commit.operation == CommitOp::Delete { 64 cache.insert(at_uri, CachedRecord::Deleted); 65 } else { 66 let Some(record) = commit.record.take() else { 67 log::warn!("consumer: commit insert or update missing record, ignoring"); 68 continue; 69 }; 70 let Some(cid) = commit.cid.take() else { 71 log::warn!("consumer: commit insert or update missing CID, ignoring"); 72 continue; 73 }; 74 75 cache.insert(at_uri, CachedRecord::Found((cid, record).into())); 76 } 77 } 78 EventKind::Identity => { 79 let Some(ident) = event.identity else { 80 log::warn!("consumer: identity event missing identity data, ignoring"); 81 continue; 82 }; 83 if let Some(handle) = ident.handle { 84 metrics::counter!("identity_handle_refresh_queued", "reason" => "identity event").increment(1); 85 identity.queue_refresh(IdentityKey::Handle(handle)).await; 86 } 87 metrics::counter!("identity_did_refresh_queued", "reason" => "identity event") 88 .increment(1); 89 identity.queue_refresh(IdentityKey::Did(ident.did)).await; 90 } 91 EventKind::Account => {} // TODO: handle account events (esp hiding content on deactivate, clearing on delete) 92 } 93 } 94 95 Err(ConsumerError::JetstreamEnded) 96}