Personal ATProto tools.
1//! Consume Jetstream events.
2use std::{str::FromStr, sync::Arc};
3
4use atrium_api::{record::KnownRecord::AppBskyFeedPost, types::string};
5use jetstream_oxide::{
6 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
7 events::{JetstreamEvent::Commit, commit::CommitEvent},
8};
9use sqlx::{sqlite::{SqliteConnectOptions, SqliteJournalMode}, SqlitePool};
10use tokio::runtime::Handle;
11
12use crate::{database::negation, types::{Profile, ProfileStats}, webrequest::Agent};
13/// Consume Jetstream events.
14#[tracing::instrument]
15pub async fn main_jetstream() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
16 let metrics = Handle::current().metrics();
17 let pool_opts = SqliteConnectOptions::from_str("sqlite://prod.db").expect("Expected to be able to configure the database, but failed.")
18 .journal_mode(SqliteJournalMode::Wal);
19 let pool = SqlitePool::connect_with(pool_opts).await.expect("Expected to be able to connect to the database at sqlite://prod.db but failed.");
20 let mut agent: Agent = Agent::default();
21 let app_bsky_feed_post: string::Nsid = "app.bsky.feed.post".parse().expect("Expected to be able to parse a string, but failed");
22 let config = JetstreamConfig {
23 endpoint: DefaultJetstreamEndpoints::USEastOne.into(),
24 wanted_collections: vec![app_bsky_feed_post],
25 wanted_dids: vec![],
26 compression: JetstreamCompression::Zstd,
27 cursor: None,
28 };
29 let jetstream = JetstreamConnector::new(config).expect("Failed to connect to Jetstream");
30 let receiver = jetstream
31 .connect()
32 .await
33 .expect("Failed to connect to Jetstream");
34 let language = string::Language::from_str("en").expect("Expected to be able to parse a string, but failed");
35
36 drop(dotenvy::dotenv().expect("Failed to load .env file"));
37 let negation_post_uri = dotenvy::var("NEGATION_POST_URI").expect("Expected to be able to read a variable, but failed.");
38 automated_removal(&pool, &mut agent, negation_post_uri.as_str()).await?;
39 let addition_post_uri = dotenvy::var("ADDITION_POST_URI").expect("Expected to be able to read a variable, but failed.");
40 manual_addition(&pool, &mut agent, addition_post_uri.as_str()).await?;
41
42 const CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(20);
43 let mut last_check = tokio::time::Instant::now();
44
45 let mut vec_of_profiles_to_check: Vec<String> = vec![];
46
47 while let Ok(event) = receiver.recv_async().await {
48 if let Commit(CommitEvent::Create { info, commit }) = event {
49 match commit.record {
50 AppBskyFeedPost(record) => {
51 if record.langs == Some(vec![(language).clone()]) {
52 let profile = Profile::new(info.did.clone().as_str()).insert_profile(&pool.clone()).await;
53 match profile
54 {
55 Ok(mut profile) => {
56 if profile.determine_stats_exist(&pool.clone()).await?.is_none() {
57 vec_of_profiles_to_check.push(info.did.clone().to_string());
58 } else {
59 tracing::debug!("Stats already exist: {:?}", info.did.as_str());
60 }
61 }
62 Err(_e) => {
63 // tracing::debug!("Duplicate profile: {:?}", info.did.as_str());
64 }
65 }
66 }
67 }
68 // atrium_api::record::KnownRecord::AppBskyFeedLike(like) => {
69 // match like.subject.uri.as_str() {
70 // NEGATION_POST_URI => {
71 // negation(info.did.as_str(), &mut agent, &pool).await?;
72 // },
73 // ADDITION_POST_URI => {
74 // let mut profile = Profile::new(info.did.as_str()).insert_profile(&pool).await?;
75 // let mut profile_with_stats = profile.determine_stats_exist(&pool).await?;
76 // if profile_with_stats.is_none() {
77 // profile_with_stats = Some(profile.determine_stats(&mut agent, &pool).await);
78 // }
79 // if let Some(profile_with_stats) = profile_with_stats {
80 // _ = profile_with_stats.determine_label_agnostic(&pool).await;
81 // }
82 // }
83 // _ => {}
84 // }
85 // }
86 _ => {}
87 }
88 }
89 if vec_of_profiles_to_check.len() >= 25 {
90 match_profiles(&agent, &pool, &metrics, &vec_of_profiles_to_check).await?;
91 vec_of_profiles_to_check.clear();
92 }
93 if last_check.elapsed() > CHECK_INTERVAL {
94 automated_removal(&pool, &mut agent, negation_post_uri.as_str()).await?;
95 manual_addition(&pool, &mut agent, addition_post_uri.as_str()).await?;
96 last_check = tokio::time::Instant::now();
97 }
98 }
99 tracing::info!("Jetstream event stream ended unexpectedly.");
100 Ok(())
101}
102
103/// Every so often, we'll check the `app.bsky.feed.getLikes` endpoint for new likes on our provided `uri` for requested negation labels.
104async fn automated_removal(
105 pool: &SqlitePool,
106 agent: &mut Agent,
107 negation_post_uri: &str,
108) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
109 let likes = agent.get_likes(negation_post_uri).await?;
110 for like in likes {
111 let did = like["actor"]["did"].as_str().expect("Expected to be able to parse a string, but failed");
112 negation(did, agent, pool).await?;
113 }
114 Ok(())
115}
116async fn manual_addition(
117 pool: &SqlitePool,
118 agent: &mut Agent,
119 addition_post_uri: &str,
120) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
121 let likes = agent.get_likes(addition_post_uri).await?;
122 for like in likes {
123 let did = like["actor"]["did"].as_str().expect("Expected to be able to parse a string, but failed");
124 let mut profile = Profile::new(did).insert_profile(pool).await?;
125 let mut profile_with_stats = profile.determine_stats_exist(pool).await?;
126 if profile_with_stats.is_none() {
127 profile_with_stats = Some(profile.determine_stats(agent, pool).await);
128 }
129 if let Some(profile_with_stats) = profile_with_stats {
130 _ = profile_with_stats.determine_label_agnostic(pool).await;
131 }
132 }
133 Ok(())
134}
135// async fn match_profile(
136// agent: &Agent,
137// pool: &SqlitePool,
138// metrics: &tokio::runtime::RuntimeMetrics,
139// did_str: &str,
140// ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
141// let mut profile = Profile::new(did_str);
142// let num_alive_tasks = metrics.num_alive_tasks();
143// if num_alive_tasks < 50 {
144// let spawned_pool = pool.clone();
145// let mut spawned_agent = agent.clone();
146// drop(tokio::spawn(async move {
147// _ = profile.determine_stats(&mut spawned_agent, &spawned_pool).await.determine_label(&spawned_pool).await;
148// }));
149// } else {
150// tracing::warn!("Too many tasks alive: {:?}", num_alive_tasks);
151// }
152// Ok(())
153// }
154/// Match a vec of profiles
155async fn match_profiles(
156 agent: &Agent,
157 pool: &SqlitePool,
158 metrics: &tokio::runtime::RuntimeMetrics,
159 did_strs: &[String],
160) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
161 let num_alive_tasks = metrics.num_alive_tasks();
162 if num_alive_tasks < 14 {
163 let spawned_pool = pool.clone();
164 let mut spawned_agent = Agent {
165 access_jwt: Arc::clone(&agent.access_jwt),
166 refresh_jwt: Arc::clone(&agent.refresh_jwt),
167 client: agent.client.clone(),
168 self_did: agent.self_did.clone(),
169 };
170 let did_strs = did_strs.to_owned();
171 drop(tokio::spawn(async move {
172 tracing::info!("Checking profiles: {:?}", did_strs);
173 let profiles_vec = spawned_agent.get_profiles(&did_strs).await.expect("Expected to be able to get profiles, but failed.");
174 let profiles_array = profiles_vec["profiles"].as_array();
175 if profiles_array.is_none() {
176 tracing::warn!("No profiles json found for profiles: {:?}", profiles_vec);
177 return;
178 }
179 for profile_stats in profiles_array.unwrap_or_else(|| panic!("Expected to be able to read profiles as array, but failed. Profiles: {:?}", profiles_vec)) {
180 let did = profile_stats["did"].as_str().expect("Expected to be able to parse a string, but failed");
181 let mut profile = Profile::new(did).insert_profile(&spawned_pool).await.expect("Expected to be able to insert a profile, but failed.");
182 let checked_at = chrono::Utc::now();
183 profile.stats = Some(ProfileStats {
184 follower_count: profile_stats["followersCount"].as_i64().expect("Expected to be able to parse an integer, but failed") as i32,
185 post_count: profile_stats["postsCount"].as_i64().expect("Expected to be able to parse an integer, but failed") as i32,
186 created_at: chrono::DateTime::parse_from_rfc3339(profile_stats["createdAt"].as_str().expect("Expected to be able to parse a string, but failed")).expect("Expected to be able to parse a string, but failed").into(),
187 checked_at,
188 });
189 profile.insert_profile_stats(&spawned_pool).await.expect("Expected to be able to insert profile stats, but failed.");
190 _ = profile.determine_label(&spawned_pool).await;
191 }
192 }));
193 } else {
194 tracing::warn!("Too many tasks alive: {:?}", num_alive_tasks);
195 }
196 Ok(())
197}