Monorepo for Tangled
tangled.org
1use std::cell::RefCell;
2use std::sync::Arc;
3use std::sync::OnceLock;
4
5use bobbin_types::record::RecordBody;
6use bytes::Bytes;
7use jacquard_common::DefaultStr;
8use jacquard_common::types::string::{AtUri, Cid};
9use quick_cache::Weighter;
10use quick_cache::sync::Cache;
11use zstd::bulk::{Compressor, Decompressor};
12use zstd::dict::{DecoderDictionary, EncoderDictionary};
13
/// Raw zstd dictionary bytes, embedded at compile time from `record.dict`
/// (resolved relative to this source file by `include_bytes!`).
static RECORD_DICT: &[u8] = include_bytes!("record.dict");
/// Compression level handed to `EncoderDictionary::copy` when the encoder
/// dictionary is prepared.
const COMPRESS_LEVEL: i32 = 6;
/// Fixed per-entry charge that `ByteWeighter::weight` adds on top of the
/// payload, key and CID lengths.
// NOTE(review): presumably approximates per-entry bookkeeping bytes — confirm
// how the value 64 was chosen.
const ENTRY_OVERHEAD: u64 = 64;
17
18static ENCODER_DICT: OnceLock<EncoderDictionary<'static>> = OnceLock::new();
19static DECODER_DICT: OnceLock<DecoderDictionary<'static>> = OnceLock::new();
20
21fn encoder_dict() -> &'static EncoderDictionary<'static> {
22 ENCODER_DICT.get_or_init(|| EncoderDictionary::copy(RECORD_DICT, COMPRESS_LEVEL))
23}
24
25fn decoder_dict() -> &'static DecoderDictionary<'static> {
26 DECODER_DICT.get_or_init(|| DecoderDictionary::copy(RECORD_DICT))
27}
28
29thread_local! {
30 static COMPRESSOR: RefCell<Compressor<'static>> = RefCell::new(
31 Compressor::with_prepared_dictionary(encoder_dict()).expect("zstd compressor init"),
32 );
33 static DECOMPRESSOR: RefCell<Decompressor<'static>> = RefCell::new(
34 Decompressor::with_prepared_dictionary(decoder_dict()).expect("zstd decompressor init"),
35 );
36}
37
/// Keyed storage for record bodies, addressed by AT-URI.
///
/// Implementors must be `Send + Sync`; every method takes `&self`.
pub trait RecordStore: Send + Sync {
    /// Returns the body held for `uri`, or `None` when the store has nothing
    /// to return for it.
    fn get(&self, uri: &AtUri<DefaultStr>) -> Option<Arc<RecordBody>>;
    /// Stores `body` under `uri`.
    fn put(&self, uri: AtUri<DefaultStr>, body: Arc<RecordBody>);
    /// Removes whatever is held for `uri`.
    fn remove(&self, uri: &AtUri<DefaultStr>);
    /// Reports occupancy figures for the store.
    ///
    /// The default implementation returns `None`; implementors override it
    /// when they have figures to report.
    fn cache_stats(&self) -> Option<CacheStats> {
        None
    }
}
46
/// Snapshot of a store's occupancy, as returned by `RecordStore::cache_stats`.
///
/// `LruRecordStore` fills these fields from its cache's `weight()`, `len()`
/// and `capacity()` respectively.
#[derive(Clone, Copy, Debug, Default)]
pub struct CacheStats {
    /// Total weight of the resident entries.
    pub weight: u64,
    /// Number of resident entries.
    pub len: u64,
    /// Weight capacity reported by the cache.
    pub capacity: u64,
}
53
/// A [`RecordStore`] that holds nothing.
///
/// `get` always returns `None`; `put` and `remove` do nothing. It does not
/// override `cache_stats`, so that keeps the trait default of `None`.
pub struct NoopRecordStore;

impl RecordStore for NoopRecordStore {
    // Always a miss: nothing is ever stored.
    fn get(&self, _uri: &AtUri<DefaultStr>) -> Option<Arc<RecordBody>> {
        None
    }
    // Discards the body.
    fn put(&self, _uri: AtUri<DefaultStr>, _body: Arc<RecordBody>) {}
    // Nothing to remove.
    fn remove(&self, _uri: &AtUri<DefaultStr>) {}
}
63
/// Sizing parameters for a weight-bounded record cache.
#[derive(Clone, Debug)]
pub struct CacheCapacity {
    /// Total weight budget handed to the cache.
    pub bytes: u64,
    /// Expected number of resident entries, used as the cache's item-count hint.
    pub estimated_items: usize,
}

impl CacheCapacity {
    /// Derives a capacity from a byte budget.
    ///
    /// The item estimate assumes 512 bytes per record, rounds up, and never
    /// drops below 64 so that tiny budgets still get a usable hint.
    pub fn from_bytes(bytes: u64) -> Self {
        const TYPICAL_RECORD_BYTES: u64 = 512;
        const MIN_ESTIMATED_ITEMS: u64 = 64;

        let estimated = bytes
            .div_ceil(TYPICAL_RECORD_BYTES)
            .max(MIN_ESTIMATED_ITEMS);
        Self {
            bytes,
            // Saturate rather than `as`-cast: where `usize` is narrower than
            // `u64` a plain cast would wrap, and could even yield 0, silently
            // breaking the floor applied above.
            estimated_items: usize::try_from(estimated).unwrap_or(usize::MAX),
        }
    }
}
80
/// How a record body is held inside the cache.
#[derive(Clone)]
enum Payload {
    /// Body bytes kept exactly as supplied.
    Raw(Bytes),
    /// zstd-compressed body. `plain_len` is the length of the original body
    /// and is passed to the decompressor as its output capacity.
    Zstd { compressed: Bytes, plain_len: usize },
}
86
/// Cache value for one record: its CID plus the (possibly compressed) body.
///
/// The URI is not stored here; it is the cache key, and `LruRecordStore::get`
/// rebuilds the `RecordBody` from the URI it was called with.
#[derive(Clone)]
struct Stored {
    cid: Cid<DefaultStr>,
    payload: Payload,
}
92
93#[derive(Clone)]
94struct ByteWeighter;
95
96impl Weighter<AtUri<DefaultStr>, Stored> for ByteWeighter {
97 fn weight(&self, key: &AtUri<DefaultStr>, val: &Stored) -> u64 {
98 let payload = match &val.payload {
99 Payload::Raw(bytes) => bytes.len(),
100 Payload::Zstd { compressed, .. } => compressed.len(),
101 };
102 ENTRY_OVERHEAD + payload as u64 + key.as_ref().len() as u64 + val.cid.as_ref().len() as u64
103 }
104}
105
106pub struct LruRecordStore {
107 cache: Cache<AtUri<DefaultStr>, Stored, ByteWeighter>,
108}
109
110impl LruRecordStore {
111 pub fn new(capacity: CacheCapacity) -> Self {
112 Self {
113 cache: Cache::with_weighter(capacity.estimated_items, capacity.bytes, ByteWeighter),
114 }
115 }
116}
117
118impl RecordStore for LruRecordStore {
119 fn get(&self, uri: &AtUri<DefaultStr>) -> Option<Arc<RecordBody>> {
120 let stored = self.cache.get(uri)?;
121 let value = match stored.payload {
122 Payload::Raw(bytes) => bytes,
123 Payload::Zstd {
124 compressed,
125 plain_len,
126 } => {
127 let plain = DECOMPRESSOR
128 .with_borrow_mut(|d| d.decompress(&compressed, plain_len))
129 .ok()?;
130 Bytes::from(plain)
131 }
132 };
133 Some(Arc::new(RecordBody {
134 uri: uri.clone(),
135 cid: stored.cid,
136 value,
137 }))
138 }
139
140 fn put(&self, uri: AtUri<DefaultStr>, body: Arc<RecordBody>) {
141 let plain_len = body.value.len();
142 let payload = match COMPRESSOR.with_borrow_mut(|c| c.compress(&body.value)) {
143 Ok(mut compressed) if compressed.len() < plain_len => {
144 compressed.shrink_to_fit();
145 Payload::Zstd {
146 compressed: Bytes::from(compressed),
147 plain_len,
148 }
149 }
150 _ => Payload::Raw(body.value.clone()),
151 };
152 self.cache.insert(
153 uri,
154 Stored {
155 cid: body.cid.clone(),
156 payload,
157 },
158 );
159 }
160
161 fn remove(&self, uri: &AtUri<DefaultStr>) {
162 self.cache.remove(uri);
163 }
164
165 fn cache_stats(&self) -> Option<CacheStats> {
166 Some(CacheStats {
167 weight: self.cache.weight(),
168 len: self.cache.len() as u64,
169 capacity: self.cache.capacity(),
170 })
171 }
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use jacquard_common::types::string::Cid;

    /// Parses `s` into an owned AT-URI, panicking on malformed input.
    fn at(s: &str) -> AtUri<DefaultStr> {
        AtUri::new_owned(s).unwrap()
    }

    /// Wraps `payload` in a record body for `uri` with a fixed, valid CID.
    fn body(uri: AtUri<DefaultStr>, payload: &[u8]) -> Arc<RecordBody> {
        let value = Bytes::copy_from_slice(payload);
        let cid: Cid<DefaultStr> = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i"
            .parse()
            .unwrap();
        Arc::new(RecordBody { uri, cid, value })
    }

    /// Produces `len` bytes by concatenating the little-endian hashes of
    /// `seed`, `seed + 1`, ... (wrapping) and cutting the stream at `len`.
    fn high_entropy(seed: u64, len: usize) -> Vec<u8> {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        std::iter::successors(Some(seed), |counter| Some(counter.wrapping_add(1)))
            .flat_map(|counter| {
                let mut hasher = DefaultHasher::new();
                counter.hash(&mut hasher);
                hasher.finish().to_le_bytes()
            })
            .take(len)
            .collect()
    }

    /// Builds a store with the given byte budget.
    fn store_with(bytes: u64) -> LruRecordStore {
        LruRecordStore::new(CacheCapacity::from_bytes(bytes))
    }

    #[test]
    fn put_then_get_round_trips_through_compression() {
        let store = store_with(64 * 1024);
        let uri = at("at://did:plc:nel/sh.tangled.repo/r1");
        let original = body(
            uri.clone(),
            br#"{"$type":"sh.tangled.repo","name":"oyster","knot":"oyster.cafe"}"#,
        );

        store.put(uri.clone(), original.clone());
        let fetched = store.get(&uri).expect("hit");

        assert_eq!(
            fetched.value, original.value,
            "decompressed body must equal original"
        );
        assert_eq!(fetched.cid, original.cid);
    }

    #[test]
    fn remove_evicts_entry() {
        let store = store_with(64 * 1024);
        let uri = at("at://did:plc:nel/sh.tangled.repo/r1");
        let record = body(uri.clone(), br#"{"v":1}"#);

        store.put(uri.clone(), record);
        assert!(store.get(&uri).is_some());

        store.remove(&uri);
        assert!(store.get(&uri).is_none());
    }

    #[test]
    fn miss_returns_none() {
        let store = store_with(64 * 1024);
        let absent = at("at://did:plc:abalone/sh.tangled.repo/r1");
        assert!(store.get(&absent).is_none());
    }

    #[test]
    fn byte_capacity_evicts_incompressible_entries() {
        let store = store_with(2_048);

        // Insert all sixteen bodies first, then probe which ones survived.
        let uris: Vec<_> = (0..16)
            .map(|i| {
                let uri = at(&format!("at://did:plc:olaren/sh.tangled.string/r{i}"));
                store.put(uri.clone(), body(uri.clone(), &high_entropy(i, 900)));
                uri
            })
            .collect();
        let resident = uris.iter().filter(|uri| store.get(uri).is_some()).count();

        assert!(
            resident < 16,
            "byte cap must evict incompressible bodies, observed {resident} of 16 resident"
        );
    }

    #[test]
    fn capacity_estimates_items_floor() {
        let CacheCapacity {
            estimated_items, ..
        } = CacheCapacity::from_bytes(0);
        assert_eq!(estimated_items, 64, "tiny caches still need a floor");
    }

    #[test]
    fn incompressible_body_round_trips_via_raw() {
        let store = store_with(64 * 1024);
        let uri = at("at://did:plc:olaren/sh.tangled.string/r1");
        let original = body(uri.clone(), &high_entropy(7, 300));

        store.put(uri.clone(), original.clone());
        let fetched = store.get(&uri).expect("hit");

        assert_eq!(
            fetched.value, original.value,
            "incompressible body must round-trip byte-exact through the raw path"
        );
    }
}