A better Rust ATProto crate
1

Configure Feed

Select the types of activity you want to include in your feed.

at main 15 kB View raw
1//! Stress tests for firehose commit validation 2//! 3//! Generates thousands of random operations to catch edge cases in v1.1 validation. 4 5use jacquard_common::IntoStatic; 6use jacquard_common::types::crypto::{KeyCodec, PublicKey}; 7use jacquard_common::types::recordkey::Rkey; 8use jacquard_common::types::string::{Datetime, Did, Nsid, RecordKey}; 9use jacquard_common::types::tid::Ticker; 10use jacquard_common::types::value::RawData; 11use jacquard_repo::Repository; 12use jacquard_repo::car::read_car_header; 13use jacquard_repo::commit::firehose::validate_v1_1; 14use jacquard_repo::mst::RecordWriteOp; 15use jacquard_repo::storage::{BlockStore, MemoryBlockStore}; 16use rand::Rng; 17use rand::seq::SliceRandom; 18use smol_str::{SmolStr, ToSmolStr}; 19use std::collections::{BTreeMap, HashMap}; 20use std::sync::Arc; 21 22// Test configuration 23const INITIAL_RECORDS: usize = 50; 24const STRESS_OPERATIONS: usize = 100; 25const BATCH_SIZE_RANGE: (usize, usize) = (1, 10); 26 27fn make_test_record(n: u32, text: &str) -> BTreeMap<SmolStr, RawData<'static>> { 28 let mut record = BTreeMap::new(); 29 record.insert( 30 SmolStr::new("$type"), 31 RawData::String("app.bsky.feed.post".into()), 32 ); 33 record.insert( 34 SmolStr::new("text"), 35 RawData::String(format!("{} #{}", text, n).into()), 36 ); 37 record.insert( 38 SmolStr::new("createdAt"), 39 RawData::String("2024-01-01T00:00:00Z".to_string().into()), 40 ); 41 record 42} 43 44fn get_public_key(signing_key: &k256::ecdsa::SigningKey) -> PublicKey<'static> { 45 let verifying_key = signing_key.verifying_key(); 46 let pubkey_bytes = verifying_key.to_encoded_point(true).as_bytes().to_vec(); 47 PublicKey { 48 codec: KeyCodec::Secp256k1, 49 bytes: pubkey_bytes.into(), 50 } 51} 52 53async fn create_test_repo(storage: Arc<MemoryBlockStore>) -> Repository<SmolStr, MemoryBlockStore> { 54 let did = Did::new("did:plc:stresstest").unwrap(); 55 let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 56 57 Repository::create(storage, did.into_static(), &signing_key, None) 58 .await 59 .unwrap() 60} 61 62/// Track existing records for generating realistic updates/deletes 63struct RecordTracker { 64 records: HashMap<SmolStr, u32>, 65 ticker: Ticker, 66} 67 68impl RecordTracker { 69 fn new() -> Self { 70 Self { 71 records: HashMap::new(), 72 ticker: Ticker::new(), 73 } 74 } 75 76 fn gen_new_rkey(&mut self) -> SmolStr { 77 self.ticker.next(None).into_static().to_smolstr() 78 } 79 80 fn pick_random_existing<R: Rng>(&self, rng: &mut R) -> Option<SmolStr> { 81 let keys: Vec<_> = self.records.keys().cloned().collect(); 82 keys.choose(rng).cloned() 83 } 84 85 fn add(&mut self, rkey: SmolStr, counter: u32) { 86 self.records.insert(rkey, counter); 87 } 88 89 fn remove(&mut self, rkey: &str) { 90 self.records.remove(rkey); 91 } 92 93 fn len(&self) -> usize { 94 self.records.len() 95 } 96} 97 98#[derive(Debug, Clone)] 99enum TestOp { 100 Create { rkey: SmolStr, counter: u32 }, 101 Update { rkey: SmolStr, counter: u32 }, 102 Delete { rkey: SmolStr }, 103} 104 105fn generate_creates_only<R: Rng>( 106 rng: &mut R, 107 tracker: &mut RecordTracker, 108 count: usize, 109) -> Vec<TestOp> { 110 let mut ops = Vec::new(); 111 for _ in 0..count { 112 let rkey = tracker.gen_new_rkey(); 113 let counter: u32 = rng.r#gen(); 114 tracker.add(rkey.clone(), counter); 115 ops.push(TestOp::Create { rkey, counter }); 116 } 117 ops 118} 119 120fn generate_random_ops<R: Rng>( 121 rng: &mut R, 122 tracker: &mut RecordTracker, 123 count: usize, 124) -> Vec<TestOp> { 125 let mut ops = Vec::new(); 126 127 for _ in 0..count { 128 // Weighted random choice: 50% create, 30% update, 20% delete 129 let action = rng.gen_range(0..100); 130 131 let op = if action < 50 || tracker.len() == 0 { 132 // Create 133 let rkey = tracker.gen_new_rkey(); 134 let counter: u32 = rng.r#gen(); 135 tracker.add(rkey.clone(), counter); 136 TestOp::Create { rkey, counter } 137 } else if action < 80 { 138 // Update 139 if let Some(rkey) = tracker.pick_random_existing(rng) { 140 let counter: u32 = rng.r#gen(); 141 tracker.add(rkey.clone(), counter); 142 TestOp::Update { rkey, counter } 143 } else { 144 // Fall back to create if no records exist 145 let rkey = tracker.gen_new_rkey(); 146 let counter: u32 = rng.r#gen(); 147 tracker.add(rkey.clone(), counter); 148 TestOp::Create { rkey, counter } 149 } 150 } else { 151 // Delete 152 if let Some(rkey) = tracker.pick_random_existing(rng) { 153 tracker.remove(&rkey); 154 TestOp::Delete { rkey } 155 } else { 156 // Fall back to create if no records exist 157 let rkey = tracker.gen_new_rkey(); 158 let counter: u32 = rng.r#gen(); 159 tracker.add(rkey.clone(), counter); 160 TestOp::Create { rkey, counter } 161 } 162 }; 163 164 ops.push(op); 165 } 166 167 ops 168} 169 170fn test_ops_to_record_writes( 171 ops: Vec<TestOp>, 172 collection: &Nsid, 173) -> Vec<RecordWriteOp<'_, SmolStr>> { 174 let collection_static = collection.clone().into_static(); 175 ops.into_iter() 176 .map(|op| match op { 177 TestOp::Create { rkey, counter } => RecordWriteOp::Create { 178 collection: collection_static.clone(), 179 rkey: RecordKey(Rkey::new(rkey).unwrap()).into_static(), 180 record: make_test_record(counter, "Random post"), 181 }, 182 TestOp::Update { rkey, counter } => RecordWriteOp::Update { 183 collection: collection_static.clone(), 184 rkey: RecordKey(Rkey::new(rkey).unwrap()).into_static(), 185 record: make_test_record(counter, "Updated post"), 186 prev: None, 187 }, 188 TestOp::Delete { rkey } => RecordWriteOp::Delete { 189 collection: collection_static.clone(), 190 rkey: RecordKey(Rkey::new(rkey).unwrap()).into_static(), 191 prev: None, 192 }, 193 }) 194 .collect() 195} 196 197#[tokio::test] 198async fn test_stress_random_operations() { 199 let storage = Arc::new(MemoryBlockStore::new()); 200 let mut repo = create_test_repo(storage.clone()).await; 201 202 let collection = Nsid::new("app.bsky.feed.post").unwrap().into_static(); 203 let did = Did::new("did:plc:stresstest").unwrap().into_static(); 204 let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 205 let pubkey = get_public_key(&signing_key); 206 207 let mut rng = rand::thread_rng(); 208 let mut tracker = RecordTracker::new(); 209 210 // Step 1: Create initial batch of records 211 println!("Creating {} initial records...", INITIAL_RECORDS); 212 println!("Repo before initial commit:\n{}", repo); 213 214 let initial_ops = generate_creates_only(&mut rng, &mut tracker, INITIAL_RECORDS); 215 let record_writes = test_ops_to_record_writes(initial_ops, &collection); 216 217 let (repo_ops, commit_data) = repo 218 .create_commit(&record_writes, &did, None, &signing_key) 219 .await 220 .unwrap(); 221 222 repo.apply_commit(commit_data.clone()).await.unwrap(); 223 println!("Repo after initial commit:\n{}", repo); 224 225 // Validate initial commit 226 let firehose_commit = commit_data 227 .to_firehose_commit(&did, 1, Datetime::now(), repo_ops, vec![]) 228 .await 229 .unwrap(); 230 231 validate_v1_1(&firehose_commit, &pubkey) 232 .await 233 .expect("Initial batch should validate"); 234 235 println!( 236 "Initial repo created with {} records", 237 tracker.records.len() 238 ); 239 240 // Step 2: Generate and apply random operations in batches 241 let mut commit_count = 1; 242 let mut total_ops = 0; 243 244 while total_ops < STRESS_OPERATIONS { 245 let batch_size = rng.gen_range(BATCH_SIZE_RANGE.0..=BATCH_SIZE_RANGE.1); 246 let remaining = STRESS_OPERATIONS - total_ops; 247 let ops_count = batch_size.min(remaining); 248 249 let ops = generate_random_ops(&mut rng, &mut tracker, ops_count); 250 let record_writes = test_ops_to_record_writes(ops, &collection); 251 252 let (repo_ops, commit_data) = repo 253 .create_commit(&record_writes, &did, None, &signing_key) 254 .await 255 .unwrap(); 256 257 repo.apply_commit(commit_data.clone()).await.unwrap(); 258 259 // Validate firehose commit 260 commit_count += 1; 261 let firehose_commit = commit_data 262 .to_firehose_commit( 263 &did, 264 commit_count, 265 Datetime::now(), 266 repo_ops.clone(), 267 vec![], 268 ) 269 .await 270 .unwrap(); 271 272 validate_v1_1(&firehose_commit, &pubkey) 273 .await 274 .unwrap_or_else(|e| { 275 eprintln!( 276 "Validation failed at commit {} (batch size {})", 277 commit_count, ops_count 278 ); 279 eprintln!("Error: {}", e); 280 eprintln!("Operations:\n{:?}", repo_ops); 281 eprintln!("Relevant blocks:\n{:?}", commit_data.relevant_blocks.keys()); 282 eprintln!("All blocks:\n{:?}", commit_data.blocks.keys()); 283 panic!( 284 "Validation failed at commit {} (batch size {}): {}", 285 commit_count, ops_count, e 286 ) 287 }); 288 289 total_ops += ops_count; 290 291 if commit_count % 50 == 0 { 292 println!( 293 "Processed {} commits, {} total operations, {} records in repo", 294 commit_count, 295 total_ops, 296 tracker.records.len() 297 ); 298 } 299 } 300 301 println!( 302 "Stress test complete: {} commits, {} operations, {} final records", 303 commit_count, 304 total_ops, 305 tracker.records.len() 306 ); 307} 308 309#[tokio::test] 310async fn test_stress_large_batches() { 311 let storage = Arc::new(MemoryBlockStore::new()); 312 let mut repo = create_test_repo(storage.clone()).await; 313 314 let collection = Nsid::new("app.bsky.feed.post").unwrap().into_static(); 315 let did = Did::new("did:plc:stresstest").unwrap().into_static(); 316 let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 317 let pubkey = get_public_key(&signing_key); 318 319 let mut rng = rand::thread_rng(); 320 let mut tracker = RecordTracker::new(); 321 322 // Create initial records 323 let initial_ops = generate_creates_only(&mut rng, &mut tracker, 100); 324 let record_writes = test_ops_to_record_writes(initial_ops, &collection); 325 let (repo_ops, commit_data) = repo 326 .create_commit( 327 &record_writes, 328 &did, 329 Some(repo.current_commit_cid().clone()), 330 &signing_key, 331 ) 332 .await 333 .unwrap(); 334 repo.apply_commit(commit_data.clone()).await.unwrap(); 335 336 let firehose_commit = commit_data 337 .to_firehose_commit(&did, 1, Datetime::now(), repo_ops, vec![]) 338 .await 339 .unwrap(); 340 341 validate_v1_1(&firehose_commit, &pubkey).await.unwrap(); 342 343 for batch_num in 1..=5000 { 344 let batch_size = rng.gen_range(1..=20); 345 let ops = generate_random_ops(&mut rng, &mut tracker, batch_size); 346 let record_writes = test_ops_to_record_writes(ops, &collection); 347 348 let (repo_ops, commit_data) = repo 349 .create_commit(&record_writes, &did, None, &signing_key) 350 .await 351 .unwrap(); 352 353 repo.apply_commit(commit_data.clone()).await.unwrap(); 354 355 let firehose_commit = commit_data 356 .to_firehose_commit(&did, batch_num + 1, Datetime::now(), repo_ops, vec![]) 357 .await 358 .unwrap(); 359 360 validate_v1_1(&firehose_commit, &pubkey) 361 .await 362 .unwrap_or_else(|e| { 363 panic!( 364 "Large batch validation failed (batch size {}): {}", 365 batch_size, e 366 ) 367 }); 368 369 repo.apply_commit(commit_data).await.unwrap(); 370 // println!( 371 // "Validated large batch {} with {} ops", 372 // batch_num, batch_size 373 // ); 374 } 375} 376 377#[tokio::test] 378async fn test_stress_with_fixture() { 379 use jacquard_repo::car::read_car; 380 use std::path::Path; 381 let fixture_path = 382 Path::new("tests/fixtures/repo-nonbinary.computer-2025-10-21T13_05_55.090Z.car"); 383 384 // Skip test in CI if fixture doesn't exist 385 if !fixture_path.exists() { 386 println!( 387 "Skipping fixture test - fixture not found at {:?}", 388 fixture_path 389 ); 390 return; 391 } 392 393 println!("Loading fixture repo from {:?}", fixture_path); 394 395 // Import CAR into storage 396 let storage = Arc::new(MemoryBlockStore::new()); 397 let header = read_car_header(fixture_path).await.unwrap(); 398 let parsed_car = read_car(fixture_path).await.unwrap(); 399 400 storage.put_many(parsed_car).await.unwrap(); 401 402 let root_cid = header.first().unwrap(); 403 404 // Load repository from fixture 405 let mut repo = Repository::from_commit(storage.clone(), root_cid) 406 .await 407 .unwrap(); 408 409 println!( 410 "Loaded fixture repo with commit at {}", 411 repo.current_commit_cid() 412 ); 413 414 let collection = Nsid::new("app.bsky.feed.post").unwrap().into_static(); 415 let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 416 let pubkey = get_public_key(&signing_key); 417 let did = repo.did().clone().into_static(); 418 419 let mut rng = rand::thread_rng(); 420 let mut tracker = RecordTracker::new(); 421 422 // Perform random operations on fixture repo 423 for batch_num in 1..=20 { 424 let batch_size = rng.gen_range(10..=50); 425 let ops = generate_random_ops(&mut rng, &mut tracker, batch_size); 426 let record_writes = test_ops_to_record_writes(ops, &collection); 427 428 let (repo_ops, commit_data) = repo 429 .create_commit( 430 &record_writes, 431 &did, 432 Some(repo.current_commit_cid().clone()), 433 &signing_key, 434 ) 435 .await 436 .unwrap(); 437 438 repo.apply_commit(commit_data.clone()).await.unwrap(); 439 440 let firehose_commit = commit_data 441 .to_firehose_commit(&did, batch_num, Datetime::now(), repo_ops, vec![]) 442 .await 443 .unwrap(); 444 445 validate_v1_1(&firehose_commit, &pubkey) 446 .await 447 .unwrap_or_else(|e| panic!("Fixture validation failed at batch {}: {}", batch_num, e)); 448 } 449 450 println!("Fixture stress test complete - 20 batches validated"); 451}