Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

at master 8.6 kB View raw
1use std::cell::RefCell; 2use std::sync::Arc; 3use std::sync::OnceLock; 4 5use bobbin_types::record::RecordBody; 6use bytes::Bytes; 7use jacquard_common::DefaultStr; 8use jacquard_common::types::string::{AtUri, Cid}; 9use quick_cache::Weighter; 10use quick_cache::sync::Cache; 11use zstd::bulk::{Compressor, Decompressor}; 12use zstd::dict::{DecoderDictionary, EncoderDictionary}; 13 14static RECORD_DICT: &[u8] = include_bytes!("record.dict"); 15const COMPRESS_LEVEL: i32 = 6; 16const ENTRY_OVERHEAD: u64 = 64; 17 18static ENCODER_DICT: OnceLock<EncoderDictionary<'static>> = OnceLock::new(); 19static DECODER_DICT: OnceLock<DecoderDictionary<'static>> = OnceLock::new(); 20 21fn encoder_dict() -> &'static EncoderDictionary<'static> { 22 ENCODER_DICT.get_or_init(|| EncoderDictionary::copy(RECORD_DICT, COMPRESS_LEVEL)) 23} 24 25fn decoder_dict() -> &'static DecoderDictionary<'static> { 26 DECODER_DICT.get_or_init(|| DecoderDictionary::copy(RECORD_DICT)) 27} 28 29thread_local! { 30 static COMPRESSOR: RefCell<Compressor<'static>> = RefCell::new( 31 Compressor::with_prepared_dictionary(encoder_dict()).expect("zstd compressor init"), 32 ); 33 static DECOMPRESSOR: RefCell<Decompressor<'static>> = RefCell::new( 34 Decompressor::with_prepared_dictionary(decoder_dict()).expect("zstd decompressor init"), 35 ); 36} 37 38pub trait RecordStore: Send + Sync { 39 fn get(&self, uri: &AtUri<DefaultStr>) -> Option<Arc<RecordBody>>; 40 fn put(&self, uri: AtUri<DefaultStr>, body: Arc<RecordBody>); 41 fn remove(&self, uri: &AtUri<DefaultStr>); 42 fn cache_stats(&self) -> Option<CacheStats> { 43 None 44 } 45} 46 47#[derive(Clone, Copy, Debug, Default)] 48pub struct CacheStats { 49 pub weight: u64, 50 pub len: u64, 51 pub capacity: u64, 52} 53 54pub struct NoopRecordStore; 55 56impl RecordStore for NoopRecordStore { 57 fn get(&self, _uri: &AtUri<DefaultStr>) -> Option<Arc<RecordBody>> { 58 None 59 } 60 fn put(&self, _uri: AtUri<DefaultStr>, _body: Arc<RecordBody>) {} 61 fn remove(&self, _uri: &AtUri<DefaultStr>) {} 62} 63 64#[derive(Clone, Debug)] 65pub struct CacheCapacity { 66 pub bytes: u64, 67 pub estimated_items: usize, 68} 69 70impl CacheCapacity { 71 pub fn from_bytes(bytes: u64) -> Self { 72 const TYPICAL_RECORD_BYTES: u64 = 512; 73 let estimated = bytes.div_ceil(TYPICAL_RECORD_BYTES).max(64) as usize; 74 Self { 75 bytes, 76 estimated_items: estimated, 77 } 78 } 79} 80 81#[derive(Clone)] 82enum Payload { 83 Raw(Bytes), 84 Zstd { compressed: Bytes, plain_len: usize }, 85} 86 87#[derive(Clone)] 88struct Stored { 89 cid: Cid<DefaultStr>, 90 payload: Payload, 91} 92 93#[derive(Clone)] 94struct ByteWeighter; 95 96impl Weighter<AtUri<DefaultStr>, Stored> for ByteWeighter { 97 fn weight(&self, key: &AtUri<DefaultStr>, val: &Stored) -> u64 { 98 let payload = match &val.payload { 99 Payload::Raw(bytes) => bytes.len(), 100 Payload::Zstd { compressed, .. } => compressed.len(), 101 }; 102 ENTRY_OVERHEAD + payload as u64 + key.as_ref().len() as u64 + val.cid.as_ref().len() as u64 103 } 104} 105 106pub struct LruRecordStore { 107 cache: Cache<AtUri<DefaultStr>, Stored, ByteWeighter>, 108} 109 110impl LruRecordStore { 111 pub fn new(capacity: CacheCapacity) -> Self { 112 Self { 113 cache: Cache::with_weighter(capacity.estimated_items, capacity.bytes, ByteWeighter), 114 } 115 } 116} 117 118impl RecordStore for LruRecordStore { 119 fn get(&self, uri: &AtUri<DefaultStr>) -> Option<Arc<RecordBody>> { 120 let stored = self.cache.get(uri)?; 121 let value = match stored.payload { 122 Payload::Raw(bytes) => bytes, 123 Payload::Zstd { 124 compressed, 125 plain_len, 126 } => { 127 let plain = DECOMPRESSOR 128 .with_borrow_mut(|d| d.decompress(&compressed, plain_len)) 129 .ok()?; 130 Bytes::from(plain) 131 } 132 }; 133 Some(Arc::new(RecordBody { 134 uri: uri.clone(), 135 cid: stored.cid, 136 value, 137 })) 138 } 139 140 fn put(&self, uri: AtUri<DefaultStr>, body: Arc<RecordBody>) { 141 let plain_len = body.value.len(); 142 let payload = match COMPRESSOR.with_borrow_mut(|c| c.compress(&body.value)) { 143 Ok(mut compressed) if compressed.len() < plain_len => { 144 compressed.shrink_to_fit(); 145 Payload::Zstd { 146 compressed: Bytes::from(compressed), 147 plain_len, 148 } 149 } 150 _ => Payload::Raw(body.value.clone()), 151 }; 152 self.cache.insert( 153 uri, 154 Stored { 155 cid: body.cid.clone(), 156 payload, 157 }, 158 ); 159 } 160 161 fn remove(&self, uri: &AtUri<DefaultStr>) { 162 self.cache.remove(uri); 163 } 164 165 fn cache_stats(&self) -> Option<CacheStats> { 166 Some(CacheStats { 167 weight: self.cache.weight(), 168 len: self.cache.len() as u64, 169 capacity: self.cache.capacity(), 170 }) 171 } 172} 173 174#[cfg(test)] 175mod tests { 176 use super::*; 177 use jacquard_common::types::string::Cid; 178 179 fn at(s: &str) -> AtUri<DefaultStr> { 180 AtUri::new_owned(s).unwrap() 181 } 182 183 fn body(uri: AtUri<DefaultStr>, payload: &[u8]) -> Arc<RecordBody> { 184 let cid: Cid<DefaultStr> = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i" 185 .parse() 186 .unwrap(); 187 Arc::new(RecordBody { 188 uri, 189 cid, 190 value: Bytes::copy_from_slice(payload), 191 }) 192 } 193 194 fn high_entropy(seed: u64, len: usize) -> Vec<u8> { 195 use std::hash::{Hash, Hasher}; 196 let mut out = Vec::with_capacity(len); 197 let mut counter = seed; 198 while out.len() < len { 199 let mut hasher = std::collections::hash_map::DefaultHasher::new(); 200 counter.hash(&mut hasher); 201 out.extend_from_slice(&hasher.finish().to_le_bytes()); 202 counter = counter.wrapping_add(1); 203 } 204 out.truncate(len); 205 out 206 } 207 208 #[test] 209 fn put_then_get_round_trips_through_compression() { 210 let store = LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024)); 211 let uri = at("at://did:plc:nel/sh.tangled.repo/r1"); 212 let payload = br#"{"$type":"sh.tangled.repo","name":"oyster","knot":"oyster.cafe"}"#; 213 let b = body(uri.clone(), payload); 214 store.put(uri.clone(), b.clone()); 215 let got = store.get(&uri).expect("hit"); 216 assert_eq!(got.value, b.value, "decompressed body must equal original"); 217 assert_eq!(got.cid, b.cid); 218 } 219 220 #[test] 221 fn remove_evicts_entry() { 222 let store = LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024)); 223 let uri = at("at://did:plc:nel/sh.tangled.repo/r1"); 224 store.put(uri.clone(), body(uri.clone(), br#"{"v":1}"#)); 225 assert!(store.get(&uri).is_some()); 226 store.remove(&uri); 227 assert!(store.get(&uri).is_none()); 228 } 229 230 #[test] 231 fn miss_returns_none() { 232 let store = LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024)); 233 assert!( 234 store 235 .get(&at("at://did:plc:abalone/sh.tangled.repo/r1")) 236 .is_none() 237 ); 238 } 239 240 #[test] 241 fn byte_capacity_evicts_incompressible_entries() { 242 let store = LruRecordStore::new(CacheCapacity::from_bytes(2_048)); 243 let resident = (0..16) 244 .map(|i| { 245 let uri = at(&format!("at://did:plc:olaren/sh.tangled.string/r{i}")); 246 store.put(uri.clone(), body(uri.clone(), &high_entropy(i, 900))); 247 uri 248 }) 249 .collect::<Vec<_>>() 250 .iter() 251 .filter(|uri| store.get(uri).is_some()) 252 .count(); 253 assert!( 254 resident < 16, 255 "byte cap must evict incompressible bodies, observed {resident} of 16 resident" 256 ); 257 } 258 259 #[test] 260 fn capacity_estimates_items_floor() { 261 let cap = CacheCapacity::from_bytes(0); 262 assert_eq!(cap.estimated_items, 64, "tiny caches still need a floor"); 263 } 264 265 #[test] 266 fn incompressible_body_round_trips_via_raw() { 267 let store = LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024)); 268 let uri = at("at://did:plc:olaren/sh.tangled.string/r1"); 269 let payload = high_entropy(7, 300); 270 let stored = body(uri.clone(), &payload); 271 store.put(uri.clone(), stored.clone()); 272 let got = store.get(&uri).expect("hit"); 273 assert_eq!( 274 got.value, stored.value, 275 "incompressible body must round-trip byte-exact through the raw path" 276 ); 277 } 278}