A better Rust ATProto crate
1//! Stress tests for firehose commit validation
2//!
3//! Generates thousands of random operations to catch edge cases in v1.1 validation.
4
5use jacquard_common::IntoStatic;
6use jacquard_common::types::crypto::{KeyCodec, PublicKey};
7use jacquard_common::types::recordkey::Rkey;
8use jacquard_common::types::string::{Datetime, Did, Nsid, RecordKey};
9use jacquard_common::types::tid::Ticker;
10use jacquard_common::types::value::RawData;
11use jacquard_repo::Repository;
12use jacquard_repo::car::read_car_header;
13use jacquard_repo::commit::firehose::validate_v1_1;
14use jacquard_repo::mst::RecordWriteOp;
15use jacquard_repo::storage::{BlockStore, MemoryBlockStore};
16use rand::Rng;
17use rand::seq::SliceRandom;
18use smol_str::{SmolStr, ToSmolStr};
19use std::collections::{BTreeMap, HashMap};
20use std::sync::Arc;
21
22// Test configuration
23const INITIAL_RECORDS: usize = 50;
24const STRESS_OPERATIONS: usize = 100;
25const BATCH_SIZE_RANGE: (usize, usize) = (1, 10);
26
27fn make_test_record(n: u32, text: &str) -> BTreeMap<SmolStr, RawData<'static>> {
28 let mut record = BTreeMap::new();
29 record.insert(
30 SmolStr::new("$type"),
31 RawData::String("app.bsky.feed.post".into()),
32 );
33 record.insert(
34 SmolStr::new("text"),
35 RawData::String(format!("{} #{}", text, n).into()),
36 );
37 record.insert(
38 SmolStr::new("createdAt"),
39 RawData::String("2024-01-01T00:00:00Z".to_string().into()),
40 );
41 record
42}
43
44fn get_public_key(signing_key: &k256::ecdsa::SigningKey) -> PublicKey<'static> {
45 let verifying_key = signing_key.verifying_key();
46 let pubkey_bytes = verifying_key.to_encoded_point(true).as_bytes().to_vec();
47 PublicKey {
48 codec: KeyCodec::Secp256k1,
49 bytes: pubkey_bytes.into(),
50 }
51}
52
53async fn create_test_repo(storage: Arc<MemoryBlockStore>) -> Repository<SmolStr, MemoryBlockStore> {
54 let did = Did::new("did:plc:stresstest").unwrap();
55 let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng);
56
57 Repository::create(storage, did.into_static(), &signing_key, None)
58 .await
59 .unwrap()
60}
61
62/// Track existing records for generating realistic updates/deletes
63struct RecordTracker {
64 records: HashMap<SmolStr, u32>,
65 ticker: Ticker,
66}
67
68impl RecordTracker {
69 fn new() -> Self {
70 Self {
71 records: HashMap::new(),
72 ticker: Ticker::new(),
73 }
74 }
75
76 fn gen_new_rkey(&mut self) -> SmolStr {
77 self.ticker.next(None).into_static().to_smolstr()
78 }
79
80 fn pick_random_existing<R: Rng>(&self, rng: &mut R) -> Option<SmolStr> {
81 let keys: Vec<_> = self.records.keys().cloned().collect();
82 keys.choose(rng).cloned()
83 }
84
85 fn add(&mut self, rkey: SmolStr, counter: u32) {
86 self.records.insert(rkey, counter);
87 }
88
89 fn remove(&mut self, rkey: &str) {
90 self.records.remove(rkey);
91 }
92
93 fn len(&self) -> usize {
94 self.records.len()
95 }
96}
97
98#[derive(Debug, Clone)]
99enum TestOp {
100 Create { rkey: SmolStr, counter: u32 },
101 Update { rkey: SmolStr, counter: u32 },
102 Delete { rkey: SmolStr },
103}
104
105fn generate_creates_only<R: Rng>(
106 rng: &mut R,
107 tracker: &mut RecordTracker,
108 count: usize,
109) -> Vec<TestOp> {
110 let mut ops = Vec::new();
111 for _ in 0..count {
112 let rkey = tracker.gen_new_rkey();
113 let counter: u32 = rng.r#gen();
114 tracker.add(rkey.clone(), counter);
115 ops.push(TestOp::Create { rkey, counter });
116 }
117 ops
118}
119
120fn generate_random_ops<R: Rng>(
121 rng: &mut R,
122 tracker: &mut RecordTracker,
123 count: usize,
124) -> Vec<TestOp> {
125 let mut ops = Vec::new();
126
127 for _ in 0..count {
128 // Weighted random choice: 50% create, 30% update, 20% delete
129 let action = rng.gen_range(0..100);
130
131 let op = if action < 50 || tracker.len() == 0 {
132 // Create
133 let rkey = tracker.gen_new_rkey();
134 let counter: u32 = rng.r#gen();
135 tracker.add(rkey.clone(), counter);
136 TestOp::Create { rkey, counter }
137 } else if action < 80 {
138 // Update
139 if let Some(rkey) = tracker.pick_random_existing(rng) {
140 let counter: u32 = rng.r#gen();
141 tracker.add(rkey.clone(), counter);
142 TestOp::Update { rkey, counter }
143 } else {
144 // Fall back to create if no records exist
145 let rkey = tracker.gen_new_rkey();
146 let counter: u32 = rng.r#gen();
147 tracker.add(rkey.clone(), counter);
148 TestOp::Create { rkey, counter }
149 }
150 } else {
151 // Delete
152 if let Some(rkey) = tracker.pick_random_existing(rng) {
153 tracker.remove(&rkey);
154 TestOp::Delete { rkey }
155 } else {
156 // Fall back to create if no records exist
157 let rkey = tracker.gen_new_rkey();
158 let counter: u32 = rng.r#gen();
159 tracker.add(rkey.clone(), counter);
160 TestOp::Create { rkey, counter }
161 }
162 };
163
164 ops.push(op);
165 }
166
167 ops
168}
169
170fn test_ops_to_record_writes(
171 ops: Vec<TestOp>,
172 collection: &Nsid,
173) -> Vec<RecordWriteOp<'_, SmolStr>> {
174 let collection_static = collection.clone().into_static();
175 ops.into_iter()
176 .map(|op| match op {
177 TestOp::Create { rkey, counter } => RecordWriteOp::Create {
178 collection: collection_static.clone(),
179 rkey: RecordKey(Rkey::new(rkey).unwrap()).into_static(),
180 record: make_test_record(counter, "Random post"),
181 },
182 TestOp::Update { rkey, counter } => RecordWriteOp::Update {
183 collection: collection_static.clone(),
184 rkey: RecordKey(Rkey::new(rkey).unwrap()).into_static(),
185 record: make_test_record(counter, "Updated post"),
186 prev: None,
187 },
188 TestOp::Delete { rkey } => RecordWriteOp::Delete {
189 collection: collection_static.clone(),
190 rkey: RecordKey(Rkey::new(rkey).unwrap()).into_static(),
191 prev: None,
192 },
193 })
194 .collect()
195}
196
197#[tokio::test]
198async fn test_stress_random_operations() {
199 let storage = Arc::new(MemoryBlockStore::new());
200 let mut repo = create_test_repo(storage.clone()).await;
201
202 let collection = Nsid::new("app.bsky.feed.post").unwrap().into_static();
203 let did = Did::new("did:plc:stresstest").unwrap().into_static();
204 let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng);
205 let pubkey = get_public_key(&signing_key);
206
207 let mut rng = rand::thread_rng();
208 let mut tracker = RecordTracker::new();
209
210 // Step 1: Create initial batch of records
211 println!("Creating {} initial records...", INITIAL_RECORDS);
212 println!("Repo before initial commit:\n{}", repo);
213
214 let initial_ops = generate_creates_only(&mut rng, &mut tracker, INITIAL_RECORDS);
215 let record_writes = test_ops_to_record_writes(initial_ops, &collection);
216
217 let (repo_ops, commit_data) = repo
218 .create_commit(&record_writes, &did, None, &signing_key)
219 .await
220 .unwrap();
221
222 repo.apply_commit(commit_data.clone()).await.unwrap();
223 println!("Repo after initial commit:\n{}", repo);
224
225 // Validate initial commit
226 let firehose_commit = commit_data
227 .to_firehose_commit(&did, 1, Datetime::now(), repo_ops, vec![])
228 .await
229 .unwrap();
230
231 validate_v1_1(&firehose_commit, &pubkey)
232 .await
233 .expect("Initial batch should validate");
234
235 println!(
236 "Initial repo created with {} records",
237 tracker.records.len()
238 );
239
240 // Step 2: Generate and apply random operations in batches
241 let mut commit_count = 1;
242 let mut total_ops = 0;
243
244 while total_ops < STRESS_OPERATIONS {
245 let batch_size = rng.gen_range(BATCH_SIZE_RANGE.0..=BATCH_SIZE_RANGE.1);
246 let remaining = STRESS_OPERATIONS - total_ops;
247 let ops_count = batch_size.min(remaining);
248
249 let ops = generate_random_ops(&mut rng, &mut tracker, ops_count);
250 let record_writes = test_ops_to_record_writes(ops, &collection);
251
252 let (repo_ops, commit_data) = repo
253 .create_commit(&record_writes, &did, None, &signing_key)
254 .await
255 .unwrap();
256
257 repo.apply_commit(commit_data.clone()).await.unwrap();
258
259 // Validate firehose commit
260 commit_count += 1;
261 let firehose_commit = commit_data
262 .to_firehose_commit(
263 &did,
264 commit_count,
265 Datetime::now(),
266 repo_ops.clone(),
267 vec![],
268 )
269 .await
270 .unwrap();
271
272 validate_v1_1(&firehose_commit, &pubkey)
273 .await
274 .unwrap_or_else(|e| {
275 eprintln!(
276 "Validation failed at commit {} (batch size {})",
277 commit_count, ops_count
278 );
279 eprintln!("Error: {}", e);
280 eprintln!("Operations:\n{:?}", repo_ops);
281 eprintln!("Relevant blocks:\n{:?}", commit_data.relevant_blocks.keys());
282 eprintln!("All blocks:\n{:?}", commit_data.blocks.keys());
283 panic!(
284 "Validation failed at commit {} (batch size {}): {}",
285 commit_count, ops_count, e
286 )
287 });
288
289 total_ops += ops_count;
290
291 if commit_count % 50 == 0 {
292 println!(
293 "Processed {} commits, {} total operations, {} records in repo",
294 commit_count,
295 total_ops,
296 tracker.records.len()
297 );
298 }
299 }
300
301 println!(
302 "Stress test complete: {} commits, {} operations, {} final records",
303 commit_count,
304 total_ops,
305 tracker.records.len()
306 );
307}
308
309#[tokio::test]
310async fn test_stress_large_batches() {
311 let storage = Arc::new(MemoryBlockStore::new());
312 let mut repo = create_test_repo(storage.clone()).await;
313
314 let collection = Nsid::new("app.bsky.feed.post").unwrap().into_static();
315 let did = Did::new("did:plc:stresstest").unwrap().into_static();
316 let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng);
317 let pubkey = get_public_key(&signing_key);
318
319 let mut rng = rand::thread_rng();
320 let mut tracker = RecordTracker::new();
321
322 // Create initial records
323 let initial_ops = generate_creates_only(&mut rng, &mut tracker, 100);
324 let record_writes = test_ops_to_record_writes(initial_ops, &collection);
325 let (repo_ops, commit_data) = repo
326 .create_commit(
327 &record_writes,
328 &did,
329 Some(repo.current_commit_cid().clone()),
330 &signing_key,
331 )
332 .await
333 .unwrap();
334 repo.apply_commit(commit_data.clone()).await.unwrap();
335
336 let firehose_commit = commit_data
337 .to_firehose_commit(&did, 1, Datetime::now(), repo_ops, vec![])
338 .await
339 .unwrap();
340
341 validate_v1_1(&firehose_commit, &pubkey).await.unwrap();
342
343 for batch_num in 1..=5000 {
344 let batch_size = rng.gen_range(1..=20);
345 let ops = generate_random_ops(&mut rng, &mut tracker, batch_size);
346 let record_writes = test_ops_to_record_writes(ops, &collection);
347
348 let (repo_ops, commit_data) = repo
349 .create_commit(&record_writes, &did, None, &signing_key)
350 .await
351 .unwrap();
352
353 repo.apply_commit(commit_data.clone()).await.unwrap();
354
355 let firehose_commit = commit_data
356 .to_firehose_commit(&did, batch_num + 1, Datetime::now(), repo_ops, vec![])
357 .await
358 .unwrap();
359
360 validate_v1_1(&firehose_commit, &pubkey)
361 .await
362 .unwrap_or_else(|e| {
363 panic!(
364 "Large batch validation failed (batch size {}): {}",
365 batch_size, e
366 )
367 });
368
369 repo.apply_commit(commit_data).await.unwrap();
370 // println!(
371 // "Validated large batch {} with {} ops",
372 // batch_num, batch_size
373 // );
374 }
375}
376
377#[tokio::test]
378async fn test_stress_with_fixture() {
379 use jacquard_repo::car::read_car;
380 use std::path::Path;
381 let fixture_path =
382 Path::new("tests/fixtures/repo-nonbinary.computer-2025-10-21T13_05_55.090Z.car");
383
384 // Skip test in CI if fixture doesn't exist
385 if !fixture_path.exists() {
386 println!(
387 "Skipping fixture test - fixture not found at {:?}",
388 fixture_path
389 );
390 return;
391 }
392
393 println!("Loading fixture repo from {:?}", fixture_path);
394
395 // Import CAR into storage
396 let storage = Arc::new(MemoryBlockStore::new());
397 let header = read_car_header(fixture_path).await.unwrap();
398 let parsed_car = read_car(fixture_path).await.unwrap();
399
400 storage.put_many(parsed_car).await.unwrap();
401
402 let root_cid = header.first().unwrap();
403
404 // Load repository from fixture
405 let mut repo = Repository::from_commit(storage.clone(), root_cid)
406 .await
407 .unwrap();
408
409 println!(
410 "Loaded fixture repo with commit at {}",
411 repo.current_commit_cid()
412 );
413
414 let collection = Nsid::new("app.bsky.feed.post").unwrap().into_static();
415 let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng);
416 let pubkey = get_public_key(&signing_key);
417 let did = repo.did().clone().into_static();
418
419 let mut rng = rand::thread_rng();
420 let mut tracker = RecordTracker::new();
421
422 // Perform random operations on fixture repo
423 for batch_num in 1..=20 {
424 let batch_size = rng.gen_range(10..=50);
425 let ops = generate_random_ops(&mut rng, &mut tracker, batch_size);
426 let record_writes = test_ops_to_record_writes(ops, &collection);
427
428 let (repo_ops, commit_data) = repo
429 .create_commit(
430 &record_writes,
431 &did,
432 Some(repo.current_commit_cid().clone()),
433 &signing_key,
434 )
435 .await
436 .unwrap();
437
438 repo.apply_commit(commit_data.clone()).await.unwrap();
439
440 let firehose_commit = commit_data
441 .to_firehose_commit(&did, batch_num, Datetime::now(), repo_ops, vec![])
442 .await
443 .unwrap();
444
445 validate_v1_1(&firehose_commit, &pubkey)
446 .await
447 .unwrap_or_else(|e| panic!("Fixture validation failed at batch {}: {}", batch_num, e));
448 }
449
450 println!("Fixture stress test complete - 20 batches validated");
451}