Personal ATProto tools.
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 10 kB View raw
1//! Consume Jetstream events. 2use std::{str::FromStr, sync::Arc}; 3 4use atrium_api::{record::KnownRecord::AppBskyFeedPost, types::string}; 5use jetstream_oxide::{ 6 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector, 7 events::{JetstreamEvent::Commit, commit::CommitEvent}, 8}; 9use sqlx::{sqlite::{SqliteConnectOptions, SqliteJournalMode}, SqlitePool}; 10use tokio::runtime::Handle; 11 12use crate::{database::negation, types::{Profile, ProfileStats}, webrequest::Agent}; 13/// Consume Jetstream events. 14#[tracing::instrument] 15pub async fn main_jetstream() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 16 let metrics = Handle::current().metrics(); 17 let pool_opts = SqliteConnectOptions::from_str("sqlite://prod.db").expect("Expected to be able to configure the database, but failed.") 18 .journal_mode(SqliteJournalMode::Wal); 19 let pool = SqlitePool::connect_with(pool_opts).await.expect("Expected to be able to connect to the database at sqlite://prod.db but failed."); 20 let mut agent: Agent = Agent::default(); 21 let app_bsky_feed_post: string::Nsid = "app.bsky.feed.post".parse().expect("Expected to be able to parse a string, but failed"); 22 let config = JetstreamConfig { 23 endpoint: DefaultJetstreamEndpoints::USEastOne.into(), 24 wanted_collections: vec![app_bsky_feed_post], 25 wanted_dids: vec![], 26 compression: JetstreamCompression::Zstd, 27 cursor: None, 28 }; 29 let jetstream = JetstreamConnector::new(config).expect("Failed to connect to Jetstream"); 30 let receiver = jetstream 31 .connect() 32 .await 33 .expect("Failed to connect to Jetstream"); 34 let language = string::Language::from_str("en").expect("Expected to be able to parse a string, but failed"); 35 36 drop(dotenvy::dotenv().expect("Failed to load .env file")); 37 let negation_post_uri = dotenvy::var("NEGATION_POST_URI").expect("Expected to be able to read a variable, but failed."); 38 automated_removal(&pool, &mut agent, negation_post_uri.as_str()).await?; 39 let addition_post_uri = dotenvy::var("ADDITION_POST_URI").expect("Expected to be able to read a variable, but failed."); 40 manual_addition(&pool, &mut agent, addition_post_uri.as_str()).await?; 41 42 const CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(20); 43 let mut last_check = tokio::time::Instant::now(); 44 45 let mut vec_of_profiles_to_check: Vec<String> = vec![]; 46 47 while let Ok(event) = receiver.recv_async().await { 48 if let Commit(CommitEvent::Create { info, commit }) = event { 49 match commit.record { 50 AppBskyFeedPost(record) => { 51 if record.langs == Some(vec![(language).clone()]) { 52 let profile = Profile::new(info.did.clone().as_str()).insert_profile(&pool.clone()).await; 53 match profile 54 { 55 Ok(mut profile) => { 56 if profile.determine_stats_exist(&pool.clone()).await?.is_none() { 57 vec_of_profiles_to_check.push(info.did.clone().to_string()); 58 } else { 59 tracing::debug!("Stats already exist: {:?}", info.did.as_str()); 60 } 61 } 62 Err(_e) => { 63 // tracing::debug!("Duplicate profile: {:?}", info.did.as_str()); 64 } 65 } 66 } 67 } 68 // atrium_api::record::KnownRecord::AppBskyFeedLike(like) => { 69 // match like.subject.uri.as_str() { 70 // NEGATION_POST_URI => { 71 // negation(info.did.as_str(), &mut agent, &pool).await?; 72 // }, 73 // ADDITION_POST_URI => { 74 // let mut profile = Profile::new(info.did.as_str()).insert_profile(&pool).await?; 75 // let mut profile_with_stats = profile.determine_stats_exist(&pool).await?; 76 // if profile_with_stats.is_none() { 77 // profile_with_stats = Some(profile.determine_stats(&mut agent, &pool).await); 78 // } 79 // if let Some(profile_with_stats) = profile_with_stats { 80 // _ = profile_with_stats.determine_label_agnostic(&pool).await; 81 // } 82 // } 83 // _ => {} 84 // } 85 // } 86 _ => {} 87 } 88 } 89 if vec_of_profiles_to_check.len() >= 25 { 90 match_profiles(&agent, &pool, &metrics, &vec_of_profiles_to_check).await?; 91 vec_of_profiles_to_check.clear(); 92 } 93 if last_check.elapsed() > CHECK_INTERVAL { 94 automated_removal(&pool, &mut agent, negation_post_uri.as_str()).await?; 95 manual_addition(&pool, &mut agent, addition_post_uri.as_str()).await?; 96 last_check = tokio::time::Instant::now(); 97 } 98 } 99 tracing::info!("Jetstream event stream ended unexpectedly."); 100 Ok(()) 101} 102 103/// Every so often, we'll check the `app.bsky.feed.getLikes` endpoint for new likes on our provided `uri` for requested negation labels. 104async fn automated_removal( 105 pool: &SqlitePool, 106 agent: &mut Agent, 107 negation_post_uri: &str, 108) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 109 let likes = agent.get_likes(negation_post_uri).await?; 110 for like in likes { 111 let did = like["actor"]["did"].as_str().expect("Expected to be able to parse a string, but failed"); 112 negation(did, agent, pool).await?; 113 } 114 Ok(()) 115} 116async fn manual_addition( 117 pool: &SqlitePool, 118 agent: &mut Agent, 119 addition_post_uri: &str, 120) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 121 let likes = agent.get_likes(addition_post_uri).await?; 122 for like in likes { 123 let did = like["actor"]["did"].as_str().expect("Expected to be able to parse a string, but failed"); 124 let mut profile = Profile::new(did).insert_profile(pool).await?; 125 let mut profile_with_stats = profile.determine_stats_exist(pool).await?; 126 if profile_with_stats.is_none() { 127 profile_with_stats = Some(profile.determine_stats(agent, pool).await); 128 } 129 if let Some(profile_with_stats) = profile_with_stats { 130 _ = profile_with_stats.determine_label_agnostic(pool).await; 131 } 132 } 133 Ok(()) 134} 135// async fn match_profile( 136// agent: &Agent, 137// pool: &SqlitePool, 138// metrics: &tokio::runtime::RuntimeMetrics, 139// did_str: &str, 140// ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 141// let mut profile = Profile::new(did_str); 142// let num_alive_tasks = metrics.num_alive_tasks(); 143// if num_alive_tasks < 50 { 144// let spawned_pool = pool.clone(); 145// let mut spawned_agent = agent.clone(); 146// drop(tokio::spawn(async move { 147// _ = profile.determine_stats(&mut spawned_agent, &spawned_pool).await.determine_label(&spawned_pool).await; 148// })); 149// } else { 150// tracing::warn!("Too many tasks alive: {:?}", num_alive_tasks); 151// } 152// Ok(()) 153// } 154/// Match a vec of profiles 155async fn match_profiles( 156 agent: &Agent, 157 pool: &SqlitePool, 158 metrics: &tokio::runtime::RuntimeMetrics, 159 did_strs: &[String], 160) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 161 let num_alive_tasks = metrics.num_alive_tasks(); 162 if num_alive_tasks < 14 { 163 let spawned_pool = pool.clone(); 164 let mut spawned_agent = Agent { 165 access_jwt: Arc::clone(&agent.access_jwt), 166 refresh_jwt: Arc::clone(&agent.refresh_jwt), 167 client: agent.client.clone(), 168 self_did: agent.self_did.clone(), 169 }; 170 let did_strs = did_strs.to_owned(); 171 drop(tokio::spawn(async move { 172 tracing::info!("Checking profiles: {:?}", did_strs); 173 let profiles_vec = spawned_agent.get_profiles(&did_strs).await.expect("Expected to be able to get profiles, but failed."); 174 let profiles_array = profiles_vec["profiles"].as_array(); 175 if profiles_array.is_none() { 176 tracing::warn!("No profiles json found for profiles: {:?}", profiles_vec); 177 return; 178 } 179 for profile_stats in profiles_array.unwrap_or_else(|| panic!("Expected to be able to read profiles as array, but failed. Profiles: {:?}", profiles_vec)) { 180 let did = profile_stats["did"].as_str().expect("Expected to be able to parse a string, but failed"); 181 let mut profile = Profile::new(did).insert_profile(&spawned_pool).await.expect("Expected to be able to insert a profile, but failed."); 182 let checked_at = chrono::Utc::now(); 183 profile.stats = Some(ProfileStats { 184 follower_count: profile_stats["followersCount"].as_i64().expect("Expected to be able to parse an integer, but failed") as i32, 185 post_count: profile_stats["postsCount"].as_i64().expect("Expected to be able to parse an integer, but failed") as i32, 186 created_at: chrono::DateTime::parse_from_rfc3339(profile_stats["createdAt"].as_str().expect("Expected to be able to parse a string, but failed")).expect("Expected to be able to parse a string, but failed").into(), 187 checked_at, 188 }); 189 profile.insert_profile_stats(&spawned_pool).await.expect("Expected to be able to insert profile stats, but failed."); 190 _ = profile.determine_label(&spawned_pool).await; 191 } 192 })); 193 } else { 194 tracing::warn!("Too many tasks alive: {:?}", num_alive_tasks); 195 } 196 Ok(()) 197}