Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

1//! cached record storage 2 3use crate::{Identity, error::RecordError}; 4use atrium_api::types::string::{Cid, Did, Nsid, RecordKey}; 5use reqwest::{Client, StatusCode}; 6use serde::{Deserialize, Serialize}; 7use serde_json::value::RawValue; 8use std::str::FromStr; 9use std::time::Duration; 10use url::Url; 11 12#[derive(Debug, Serialize, Deserialize)] 13pub struct RawRecord { 14 cid: Cid, 15 record: String, 16} 17 18// TODO: should be able to do typed CID 19impl From<(Cid, Box<RawValue>)> for RawRecord { 20 fn from((cid, rv): (Cid, Box<RawValue>)) -> Self { 21 Self { 22 cid, 23 record: rv.get().to_string(), 24 } 25 } 26} 27 28/// only for use with stored (validated) values, not general strings 29impl From<&RawRecord> for (Cid, Box<RawValue>) { 30 fn from(RawRecord { cid, record }: &RawRecord) -> Self { 31 ( 32 cid.clone(), 33 RawValue::from_string(record.to_string()) 34 .expect("stored string from RawValue to be valid"), 35 ) 36 } 37} 38 39#[derive(Debug, Serialize, Deserialize)] 40pub enum CachedRecord { 41 Found(RawRecord), 42 Deleted, 43} 44 45impl CachedRecord { 46 pub(crate) fn weight(&self) -> usize { 47 let wrapping = std::mem::size_of::<Self>(); 48 let inner = match self { 49 CachedRecord::Found(RawRecord { record, .. }) => std::mem::size_of_val(record.as_str()), 50 _ => 0, 51 }; 52 wrapping + inner 53 } 54} 55 56//////// upstream record fetching 57 58#[derive(Deserialize)] 59struct RecordResponseObject { 60 #[allow(dead_code)] // expect it to be there but we ignore it 61 uri: String, 62 /// CID for this exact version of the record 63 /// 64 /// this is optional in the spec and that's potentially TODO for slingshot 65 cid: Option<String>, 66 /// the record itself as JSON 67 value: Box<RawValue>, 68} 69 70#[derive(Debug, Deserialize)] 71pub struct ErrorResponseObject { 72 pub error: String, 73 pub message: String, 74} 75 76#[derive(Clone)] 77pub struct Repo { 78 identity: Identity, 79 client: Client, 80} 81 82impl Repo { 83 pub fn new(identity: Identity) -> Self { 84 let client = Client::builder() 85 .user_agent(format!( 86 "microcosm slingshot v{} (contact: @bad-example.com)", 87 env!("CARGO_PKG_VERSION") 88 )) 89 .no_proxy() 90 .timeout(Duration::from_secs(10)) 91 .build() 92 .unwrap(); 93 Repo { identity, client } 94 } 95 96 pub async fn get_record( 97 &self, 98 did: &Did, 99 collection: &Nsid, 100 rkey: &RecordKey, 101 cid: &Option<Cid>, 102 ) -> Result<CachedRecord, RecordError> { 103 let Some(pds) = self.identity.did_to_pds(did.clone()).await? else { 104 return Err(RecordError::NotFound("could not get pds for DID")); 105 }; 106 107 // cid gets set to None for a retry, if it's Some and we got NotFound 108 let mut cid = cid; 109 110 let res = loop { 111 // TODO: throttle outgoing requests by host probably, generally guard against outgoing requests 112 let mut params = vec![ 113 ("repo", did.to_string()), 114 ("collection", collection.to_string()), 115 ("rkey", rkey.to_string()), 116 ]; 117 if let Some(cid) = cid { 118 params.push(("cid", cid.as_ref().to_string())); 119 } 120 let mut url = Url::parse_with_params(&pds, &params)?; 121 url.set_path("/xrpc/com.atproto.repo.getRecord"); 122 123 let res = self 124 .client 125 .get(url.clone()) 126 .send() 127 .await 128 .map_err(RecordError::SendError)?; 129 130 if res.status() == StatusCode::BAD_REQUEST { 131 // 1. if we're not able to parse json, it's not something we can handle 132 let err = res 133 .json::<ErrorResponseObject>() 134 .await 135 .map_err(RecordError::UpstreamBadBadNotGoodRequest)?; 136 // 2. if we are, is it a NotFound? and if so, did we try with a CID? 137 // if so, retry with no CID (api handler will reject for mismatch but 138 // with a nice error + warm cache) 139 if err.error == "NotFound" && cid.is_some() { 140 cid = &None; 141 continue; 142 } else { 143 return Err(RecordError::UpstreamBadRequest(err)); 144 } 145 } 146 break res; 147 }; 148 149 let data = res 150 .error_for_status() 151 .map_err(RecordError::StatusError)? // TODO atproto error handling (think about handling not found) 152 .json::<RecordResponseObject>() 153 .await 154 .map_err(RecordError::ParseJsonError)?; // todo... 155 156 let Some(cid) = data.cid else { 157 return Err(RecordError::MissingUpstreamCid); 158 }; 159 let cid = Cid::from_str(&cid).map_err(|e| RecordError::BadUpstreamCid(e.to_string()))?; 160 161 Ok(CachedRecord::Found(RawRecord { 162 cid, 163 record: data.value.to_string(), 164 })) 165 } 166}