Undisclosed project number 1234
1use bytes::Bytes;
2use jacquard_common::session::SessionStore;
3use jacquard_lexicon::schema::LexiconSchema;
4use jacquard_lexicon::validation::ConstraintError;
5use reqwest::StatusCode;
6use serde::{Serialize, de::DeserializeOwned};
7use superjam_core::{Did, Nsid, XrpcErrorBody};
8use superjam_oauth::dpop::{DpopBinding, HttpMethod};
9use superjam_oauth::session::DpopNonce;
10use superjam_oauth::{RefreshLock, Session};
11use url::Url;
12
13use crate::error::{Error, Result};
14use crate::newtype::{APPLICATION_JSON, MimeType};
15
16#[derive(Debug)]
17pub struct Pds<S>
18where
19 S: SessionStore<Did, Session> + Clone + Send + Sync + 'static,
20{
21 http: reqwest::Client,
22 session: Session,
23 store: S,
24}
25
26impl<S> Pds<S>
27where
28 S: SessionStore<Did, Session> + Clone + Send + Sync + 'static,
29{
30 pub fn new(session: Session, store: S, http: reqwest::Client) -> Self {
31 Self {
32 session,
33 store,
34 http,
35 }
36 }
37
38 pub fn session(&self) -> &Session {
39 &self.session
40 }
41
42 pub fn did(&self) -> &Did {
43 &self.session.did
44 }
45
46 fn endpoint(&self) -> &Url {
47 self.session.pds_url.as_url()
48 }
49
50 pub(crate) async fn ensure_fresh(&mut self) -> Result<()> {
51 if self.session.is_fresh() {
52 return Ok(());
53 }
54 self.refresh_with_persist().await
55 }
56
57 async fn refresh_with_persist(&mut self) -> Result<()> {
58 let _lock = RefreshLock::acquire(&self.session.did)
59 .await
60 .map_err(Error::RefreshLock)?;
61 if let Some(stored) = self.store.get(&self.session.did).await
62 && stored.is_fresh()
63 {
64 self.session = stored;
65 return Ok(());
66 }
67 self.session.refresh(&self.http).await?;
68 self.store
69 .set(self.session.did.clone(), self.session.clone())
70 .await?;
71 Ok(())
72 }
73
74 pub(crate) fn xrpc_url(&self, nsid: &Nsid) -> Result<Url> {
75 let base = self.endpoint().as_str().trim_end_matches('/');
76 Ok(Url::parse(&format!("{base}/xrpc/{}", nsid.as_str()))?)
77 }
78
79 pub(crate) fn validate_record<R: LexiconSchema>(&self, record: &R) -> Result<()> {
80 record
81 .validate()
82 .map_err(|e: ConstraintError| Error::LexiconValidation(e.to_string()))
83 }
84
85 pub(crate) async fn post_json<B, O>(&mut self, url: Url, body: &B) -> Result<O>
86 where
87 B: Serialize + ?Sized,
88 O: DeserializeOwned,
89 {
90 let raw = Bytes::from(serde_json::to_vec(body)?);
91 let resp = self.send_post(url, raw, &APPLICATION_JSON).await?;
92 Ok(serde_json::from_slice(&resp)?)
93 }
94
95 pub(crate) async fn post_bytes<O>(
96 &mut self,
97 url: Url,
98 body: Bytes,
99 content_type: &MimeType,
100 ) -> Result<O>
101 where
102 O: DeserializeOwned,
103 {
104 let resp = self.send_post(url, body, content_type).await?;
105 Ok(serde_json::from_slice(&resp)?)
106 }
107
108 async fn send_post(&mut self, url: Url, body: Bytes, content_type: &MimeType) -> Result<Bytes> {
109 self.ensure_fresh().await?;
110 let mut nonce_retried = false;
111 let mut token_refreshed = false;
112 loop {
113 let proof =
114 self.session
115 .dpop_proof_for(HttpMethod::Post, &url, DpopBinding::Resource)?;
116 let resp = self
117 .http
118 .post(url.clone())
119 .header(
120 "Authorization",
121 format!("DPoP {}", self.session.access_token.as_str()),
122 )
123 .header("DPoP", proof.as_str())
124 .header("Content-Type", content_type.as_str())
125 .body(body.clone())
126 .send()
127 .await?;
128
129 let received_nonce = resp
130 .headers()
131 .get("DPoP-Nonce")
132 .and_then(|h| h.to_str().ok())
133 .map(DpopNonce::new);
134 if let Some(n) = received_nonce {
135 self.session.dpop_nonces.set(&url, n);
136 }
137
138 let status = resp.status();
139 if status.is_success() {
140 return Ok(resp.bytes().await?);
141 }
142
143 let body_bytes = resp.bytes().await.unwrap_or_default();
144 let parsed: core::result::Result<XrpcErrorBody, _> =
145 serde_json::from_slice(&body_bytes);
146 let code = parsed.as_ref().map(|b| b.error.as_str()).unwrap_or("");
147
148 if code == "use_dpop_nonce" {
149 if self.session.dpop_nonces.get(&url).is_none() {
150 return Err(Error::DpopNonceMissing);
151 }
152 if !nonce_retried {
153 nonce_retried = true;
154 continue;
155 }
156 return Err(Error::DpopNonceLoop);
157 }
158
159 if status == StatusCode::UNAUTHORIZED
160 && matches!(code, "ExpiredToken" | "invalid_token" | "InvalidToken")
161 && !token_refreshed
162 {
163 token_refreshed = true;
164 self.refresh_with_persist().await?;
165 nonce_retried = false;
166 continue;
167 }
168
169 return Err(match parsed {
170 Ok(b) => Error::Xrpc {
171 code: b.error,
172 message: b.message,
173 },
174 Err(_) => Error::XrpcUnparsed {
175 status,
176 body: String::from_utf8_lossy(&body_bytes).into_owned(),
177 },
178 });
179 }
180 }
181}