Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
//! cached record storage
2
3use crate::{Identity, error::RecordError};
4use atrium_api::types::string::{Cid, Did, Nsid, RecordKey};
5use reqwest::{Client, StatusCode};
6use serde::{Deserialize, Serialize};
7use serde_json::value::RawValue;
8use std::str::FromStr;
9use std::time::Duration;
10use url::Url;
11
12#[derive(Debug, Serialize, Deserialize)]
13pub struct RawRecord {
14 pub cid: Cid,
15 pub record: String,
16}
17
18// TODO: should be able to do typed CID
19impl From<(Cid, Box<RawValue>)> for RawRecord {
20 fn from((cid, rv): (Cid, Box<RawValue>)) -> Self {
21 Self {
22 cid,
23 record: rv.get().to_string(),
24 }
25 }
26}
27
28/// only for use with stored (validated) values, not general strings
29impl From<&RawRecord> for (Cid, Box<RawValue>) {
30 fn from(RawRecord { cid, record }: &RawRecord) -> Self {
31 (
32 cid.clone(),
33 RawValue::from_string(record.to_string())
34 .expect("stored string from RawValue to be valid"),
35 )
36 }
37}
38
39#[derive(Debug, Serialize, Deserialize)]
40pub enum CachedRecord {
41 Found(RawRecord),
42 Deleted,
43}
44
45impl CachedRecord {
46 pub(crate) fn weight(&self) -> usize {
47 let wrapping = std::mem::size_of::<Self>();
48 let inner = match self {
49 CachedRecord::Found(RawRecord { record, .. }) => std::mem::size_of_val(record.as_str()),
50 _ => 0,
51 };
52 wrapping + inner
53 }
54}
55
//////// upstream record fetching
57
58#[derive(Deserialize)]
59struct RecordResponseObject {
60 #[allow(dead_code)] // expect it to be there but we ignore it
61 uri: String,
62 /// CID for this exact version of the record
63 ///
64 /// this is optional in the spec and that's potentially TODO for slingshot
65 cid: Option<String>,
66 /// the record itself as JSON
67 value: Box<RawValue>,
68}
69
70#[derive(Debug, Deserialize)]
71pub struct ErrorResponseObject {
72 pub error: String,
73 pub message: String,
74}
75
76#[derive(Clone)]
77pub struct Repo {
78 identity: Identity,
79 client: Client,
80}
81
82impl Repo {
83 pub fn new(identity: Identity) -> Self {
84 let client = Client::builder()
85 .user_agent(format!(
86 "microcosm slingshot v{} (contact: @bad-example.com)",
87 env!("CARGO_PKG_VERSION")
88 ))
89 .no_proxy()
90 .timeout(Duration::from_secs(10))
91 .build()
92 .unwrap();
93 Repo { identity, client }
94 }
95
96 pub async fn get_record(
97 &self,
98 did: &Did,
99 collection: &Nsid,
100 rkey: &RecordKey,
101 cid: &Option<Cid>,
102 ) -> Result<CachedRecord, RecordError> {
103 let Some(pds) = self.identity.did_to_pds(did.clone()).await? else {
104 return Err(RecordError::NotFound("could not get pds for DID"));
105 };
106
107 // cid gets set to None for a retry, if it's Some and we got NotFound
108 let mut cid = cid;
109
110 let res = loop {
111 // TODO: throttle outgoing requests by host probably, generally guard against outgoing requests
112 let mut params = vec![
113 ("repo", did.to_string()),
114 ("collection", collection.to_string()),
115 ("rkey", rkey.to_string()),
116 ];
117 if let Some(cid) = cid {
118 params.push(("cid", cid.as_ref().to_string()));
119 }
120 let mut url = Url::parse_with_params(&pds, ¶ms)?;
121 url.set_path("/xrpc/com.atproto.repo.getRecord");
122
123 let res = self
124 .client
125 .get(url.clone())
126 .send()
127 .await
128 .map_err(RecordError::SendError)?;
129
130 if res.status() == StatusCode::BAD_REQUEST {
131 // 1. if we're not able to parse json, it's not something we can handle
132 let err = res
133 .json::<ErrorResponseObject>()
134 .await
135 .map_err(RecordError::UpstreamBadBadNotGoodRequest)?;
136 // 2. if we are, is it a NotFound? and if so, did we try with a CID?
137 // if so, retry with no CID (api handler will reject for mismatch but
138 // with a nice error + warm cache)
139 if err.error == "NotFound" && cid.is_some() {
140 cid = &None;
141 continue;
142 } else {
143 return Err(RecordError::UpstreamBadRequest(err));
144 }
145 }
146 break res;
147 };
148
149 let data = res
150 .error_for_status()
151 .map_err(RecordError::StatusError)? // TODO atproto error handling (think about handling not found)
152 .json::<RecordResponseObject>()
153 .await
154 .map_err(RecordError::ParseJsonError)?; // todo...
155
156 let Some(cid) = data.cid else {
157 return Err(RecordError::MissingUpstreamCid);
158 };
159 let cid = Cid::from_str(&cid).map_err(|e| RecordError::BadUpstreamCid(e.to_string()))?;
160
161 Ok(CachedRecord::Found(RawRecord {
162 cid,
163 record: data.value.to_string(),
164 }))
165 }
166}