Undisclosed project number 1234
0

Configure Feed

Select the types of activity you want to include in your feed.

1use bytes::Bytes; 2use jacquard_common::session::SessionStore; 3use jacquard_lexicon::schema::LexiconSchema; 4use jacquard_lexicon::validation::ConstraintError; 5use reqwest::StatusCode; 6use serde::{Serialize, de::DeserializeOwned}; 7use superjam_core::{Did, Nsid, XrpcErrorBody}; 8use superjam_oauth::dpop::{DpopBinding, HttpMethod}; 9use superjam_oauth::session::DpopNonce; 10use superjam_oauth::{RefreshLock, Session}; 11use url::Url; 12 13use crate::error::{Error, Result}; 14use crate::newtype::{APPLICATION_JSON, MimeType}; 15 16#[derive(Debug)] 17pub struct Pds<S> 18where 19 S: SessionStore<Did, Session> + Clone + Send + Sync + 'static, 20{ 21 http: reqwest::Client, 22 session: Session, 23 store: S, 24} 25 26impl<S> Pds<S> 27where 28 S: SessionStore<Did, Session> + Clone + Send + Sync + 'static, 29{ 30 pub fn new(session: Session, store: S, http: reqwest::Client) -> Self { 31 Self { 32 session, 33 store, 34 http, 35 } 36 } 37 38 pub fn session(&self) -> &Session { 39 &self.session 40 } 41 42 pub fn did(&self) -> &Did { 43 &self.session.did 44 } 45 46 fn endpoint(&self) -> &Url { 47 self.session.pds_url.as_url() 48 } 49 50 pub(crate) async fn ensure_fresh(&mut self) -> Result<()> { 51 if self.session.is_fresh() { 52 return Ok(()); 53 } 54 self.refresh_with_persist().await 55 } 56 57 async fn refresh_with_persist(&mut self) -> Result<()> { 58 let _lock = RefreshLock::acquire(&self.session.did) 59 .await 60 .map_err(Error::RefreshLock)?; 61 if let Some(stored) = self.store.get(&self.session.did).await 62 && stored.is_fresh() 63 { 64 self.session = stored; 65 return Ok(()); 66 } 67 self.session.refresh(&self.http).await?; 68 self.store 69 .set(self.session.did.clone(), self.session.clone()) 70 .await?; 71 Ok(()) 72 } 73 74 pub(crate) fn xrpc_url(&self, nsid: &Nsid) -> Result<Url> { 75 let base = self.endpoint().as_str().trim_end_matches('/'); 76 Ok(Url::parse(&format!("{base}/xrpc/{}", nsid.as_str()))?) 77 } 78 79 pub(crate) fn validate_record<R: LexiconSchema>(&self, record: &R) -> Result<()> { 80 record 81 .validate() 82 .map_err(|e: ConstraintError| Error::LexiconValidation(e.to_string())) 83 } 84 85 pub(crate) async fn post_json<B, O>(&mut self, url: Url, body: &B) -> Result<O> 86 where 87 B: Serialize + ?Sized, 88 O: DeserializeOwned, 89 { 90 let raw = Bytes::from(serde_json::to_vec(body)?); 91 let resp = self.send_post(url, raw, &APPLICATION_JSON).await?; 92 Ok(serde_json::from_slice(&resp)?) 93 } 94 95 pub(crate) async fn post_bytes<O>( 96 &mut self, 97 url: Url, 98 body: Bytes, 99 content_type: &MimeType, 100 ) -> Result<O> 101 where 102 O: DeserializeOwned, 103 { 104 let resp = self.send_post(url, body, content_type).await?; 105 Ok(serde_json::from_slice(&resp)?) 106 } 107 108 async fn send_post(&mut self, url: Url, body: Bytes, content_type: &MimeType) -> Result<Bytes> { 109 self.ensure_fresh().await?; 110 let mut nonce_retried = false; 111 let mut token_refreshed = false; 112 loop { 113 let proof = 114 self.session 115 .dpop_proof_for(HttpMethod::Post, &url, DpopBinding::Resource)?; 116 let resp = self 117 .http 118 .post(url.clone()) 119 .header( 120 "Authorization", 121 format!("DPoP {}", self.session.access_token.as_str()), 122 ) 123 .header("DPoP", proof.as_str()) 124 .header("Content-Type", content_type.as_str()) 125 .body(body.clone()) 126 .send() 127 .await?; 128 129 let received_nonce = resp 130 .headers() 131 .get("DPoP-Nonce") 132 .and_then(|h| h.to_str().ok()) 133 .map(DpopNonce::new); 134 if let Some(n) = received_nonce { 135 self.session.dpop_nonces.set(&url, n); 136 } 137 138 let status = resp.status(); 139 if status.is_success() { 140 return Ok(resp.bytes().await?); 141 } 142 143 let body_bytes = resp.bytes().await.unwrap_or_default(); 144 let parsed: core::result::Result<XrpcErrorBody, _> = 145 serde_json::from_slice(&body_bytes); 146 let code = parsed.as_ref().map(|b| b.error.as_str()).unwrap_or(""); 147 148 if code == "use_dpop_nonce" { 149 if self.session.dpop_nonces.get(&url).is_none() { 150 return Err(Error::DpopNonceMissing); 151 } 152 if !nonce_retried { 153 nonce_retried = true; 154 continue; 155 } 156 return Err(Error::DpopNonceLoop); 157 } 158 159 if status == StatusCode::UNAUTHORIZED 160 && matches!(code, "ExpiredToken" | "invalid_token" | "InvalidToken") 161 && !token_refreshed 162 { 163 token_refreshed = true; 164 self.refresh_with_persist().await?; 165 nonce_retried = false; 166 continue; 167 } 168 169 return Err(match parsed { 170 Ok(b) => Error::Xrpc { 171 code: b.error, 172 message: b.message, 173 }, 174 Err(_) => Error::XrpcUnparsed { 175 status, 176 body: String::from_utf8_lossy(&body_bytes).into_owned(), 177 }, 178 }); 179 } 180 } 181}