Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

extract a record iterator

+89 -44
+89 -44
ufos/src/storage_fjall.rs
··· 207 207 } 208 208 } 209 209 210 + type FjallRKV = fjall::Result<(fjall::Slice, fjall::Slice)>; 211 + 210 212 #[derive(Clone)] 211 213 pub struct FjallReader { 212 214 keyspace: Keyspace, ··· 214 216 feeds: PartitionHandle, 215 217 records: PartitionHandle, 216 218 rollups: PartitionHandle, 219 + } 220 + 221 + /// An iterator that knows how to skip over deleted/invalidated records 222 + struct RecordIterator { 223 + db_iter: Box<dyn Iterator<Item = FjallRKV>>, 224 + records: PartitionHandle, 225 + limit: usize, 226 + fetched: usize, 227 + } 228 + impl RecordIterator { 229 + pub fn new( 230 + feeds: &PartitionHandle, 231 + records: PartitionHandle, 232 + collection: &Nsid, 233 + limit: usize, 234 + ) -> StorageResult<Self> { 235 + let prefix = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?; 236 + let db_iter = feeds.prefix(prefix).rev(); 237 + Ok(Self { 238 + db_iter: Box::new(db_iter), 239 + records, 240 + limit, 241 + fetched: 0, 242 + }) 243 + } 244 + fn get_record(&self, db_next: FjallRKV) -> StorageResult<Option<UFOsRecord>> { 245 + let (key_bytes, val_bytes) = db_next?; 246 + let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?; 247 + let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?; 248 + let location_key: RecordLocationKey = (&feed_key, &feed_val).into(); 249 + 250 + let Some(location_val_bytes) = self.records.get(location_key.to_db_bytes()?)? else { 251 + // record was deleted (hopefully) 252 + return Ok(None); 253 + }; 254 + 255 + let (meta, n) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?; 256 + 257 + if meta.cursor() != feed_key.cursor() { 258 + // older/different version 259 + return Ok(None); 260 + } 261 + if meta.rev != feed_val.rev() { 262 + // weird... 263 + log::warn!("record lookup: cursor match but rev did not...? excluding."); 264 + return Ok(None); 265 + } 266 + let Some(raw_value_bytes) = location_val_bytes.get(n..) else { 267 + log::warn!( 268 + "record lookup: found record but could not get bytes to decode the record??" 269 + ); 270 + return Ok(None); 271 + }; 272 + let rawval = db_complete::<RecordRawValue>(raw_value_bytes)?; 273 + Ok(Some(UFOsRecord { 274 + collection: feed_key.collection().clone(), 275 + cursor: feed_key.cursor(), 276 + did: feed_val.did().clone(), 277 + rkey: feed_val.rkey().clone(), 278 + rev: meta.rev.to_string(), 279 + record: rawval.try_into()?, 280 + is_update: meta.is_update, 281 + })) 282 + } 283 + } 284 + impl Iterator for RecordIterator { 285 + type Item = StorageResult<Option<UFOsRecord>>; 286 + fn next(&mut self) -> Option<Self::Item> { 287 + if self.fetched == self.limit { 288 + return Some(Ok(None)); 289 + } 290 + let record = loop { 291 + let db_next = self.db_iter.next()?; // None short-circuits here 292 + match self.get_record(db_next) { 293 + Err(e) => return Some(Err(e)), 294 + Ok(Some(record)) => break record, 295 + Ok(None) => continue, 296 + } 297 + }; 298 + self.fetched += 1; 299 + Some(Ok(Some(record))) 300 + } 217 301 } 218 302 219 303 impl StoreReader<FjallStats> for FjallReader { ··· 348 432 349 433 let collection = collections[0]; 350 434 351 - let prefix = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?; 352 - let collected = 0; 353 - let mut out = vec![]; 354 - for kv in self.feeds.prefix(prefix).rev() { 355 - let (key_bytes, val_bytes) = kv?; 356 - let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?; 357 - let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?; 358 - let location_key: RecordLocationKey = (&feed_key, &feed_val).into(); 359 - 360 - let Some(location_val_bytes) = self.records.get(location_key.to_db_bytes()?)? else { 361 - // record was deleted (hopefully) 362 - continue; 363 - }; 364 - 365 - let (meta, n) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?; 366 - 367 - if meta.cursor() != feed_key.cursor() { 368 - // older/different version 369 - continue; 370 - } 371 - if meta.rev != feed_val.rev() { 372 - // weird... 373 - log::warn!("record lookup: cursor match but rev did not...? excluding."); 374 - continue; 375 - } 376 - let Some(raw_value_bytes) = location_val_bytes.get(n..) else { 377 - log::warn!( 378 - "record lookup: found record but could not get bytes to decode the record??" 379 - ); 380 - continue; 381 - }; 382 - let rawval = db_complete::<RecordRawValue>(raw_value_bytes)?; 383 - out.push(UFOsRecord { 384 - collection: feed_key.collection().clone(), 385 - cursor: feed_key.cursor(), 386 - did: feed_val.did().clone(), 387 - rkey: feed_val.rkey().clone(), 388 - rev: meta.rev.to_string(), 389 - record: rawval.try_into()?, 390 - is_update: meta.is_update, 391 - }); 392 - 393 - if collected >= limit { 435 + let mut out = Vec::new(); 436 + for rec in RecordIterator::new(&self.feeds, self.records.clone(), collection, limit)? { 437 + if let Some(r) = rec? { 438 + out.push(r) 439 + } else { 394 440 break; 395 441 } 396 442 } 397 - 398 443 Ok(out) 399 444 } 400 445 }