Now let's take a silly one
1use axum::body::Bytes;
2use base64::Engine;
3use base64::engine::general_purpose::URL_SAFE_NO_PAD;
4use futures::future::join_all;
5use futures::stream::StreamExt;
6use http::Method;
7use http::header::AUTHORIZATION;
8use serde_json::{Value, json};
9use std::collections::BTreeSet;
10use std::net::SocketAddr;
11use std::sync::Arc;
12use tower::ServiceExt;
13
14use knot_runtime::{Entropy, K256Signer, SeededEntropy, Signer};
15use knot_types::RepoDid;
16
17use crate::harness::{Harness, SUBJECT_DIDS};
18use crate::trace::{Outcome, Projection, Step, Trace, fnv1a};
19
20const SKEW_BACKDATE_SECS: i64 = 600;
21const SKEW_LIFETIME_SECS: i64 = 60;
22
23#[derive(Clone, Copy)]
24enum ReadOp {
25 Version,
26 Owner,
27 ListMembers,
28 DidJson,
29 InfoRefs(usize),
30 Branches(usize),
31 Log(usize),
32 DescribeRepo(usize),
33 Tree(usize),
34 Blob(usize),
35 Languages(usize),
36}
37
38impl ReadOp {
39 fn name(self) -> &'static str {
40 match self {
41 ReadOp::Version => "version",
42 ReadOp::Owner => "owner",
43 ReadOp::ListMembers => "listMembers",
44 ReadOp::DidJson => "didJson",
45 ReadOp::InfoRefs(_) => "infoRefs",
46 ReadOp::Branches(_) => "branches",
47 ReadOp::Log(_) => "log",
48 ReadOp::DescribeRepo(_) => "describeRepo",
49 ReadOp::Tree(_) => "tree",
50 ReadOp::Blob(_) => "blob",
51 ReadOp::Languages(_) => "languages",
52 }
53 }
54}
55
56#[derive(Clone, Copy)]
57enum AdminOp {
58 AddMember(usize),
59 RemoveMember(usize),
60 Ban(usize),
61 Unban(usize),
62 CreateRepo(u32),
63 AddCollaborator(usize, usize),
64}
65
66impl AdminOp {
67 fn name(self) -> &'static str {
68 match self {
69 AdminOp::AddMember(_) => "addMember",
70 AdminOp::RemoveMember(_) => "removeMember",
71 AdminOp::Ban(_) => "ban",
72 AdminOp::Unban(_) => "unban",
73 AdminOp::CreateRepo(_) => "createRepo",
74 AdminOp::AddCollaborator(_, _) => "addCollaborator",
75 }
76 }
77}
78
79#[derive(Clone, Copy)]
80enum Planned {
81 Read { op: ReadOp, killed: bool },
82 Admin { op: AdminOp, skew: bool },
83 Probe { stranger: usize, drop: bool },
84 Maintain { repo: usize },
85}
86
87pub(crate) struct Round {
88 ops: Vec<Planned>,
89 advance_micros: u64,
90}
91
92struct Rng(SeededEntropy);
93
94impl Rng {
95 fn new(seed: u64) -> Self {
96 Self(SeededEntropy::new(seed ^ 0x57ee_d000))
97 }
98
99 fn below(&self, n: u64) -> u64 {
100 self.0.next_u64() % n.max(1)
101 }
102
103 fn chance(&self, num: u64, den: u64) -> bool {
104 self.below(den) < num
105 }
106}
107
108pub(crate) fn plan(seed: u64, rounds: u32, subjects: usize) -> Vec<Round> {
109 let rng = Rng::new(seed);
110 let mut available: u32 = 1;
111 let mut rkey: u32 = 0;
112 let mut stranger: usize = 0;
113 (0..rounds)
114 .map(|round| {
115 let (ops, fresh_repos) = if round % 2 == 0 {
116 mutate_round(&rng, subjects, available, &mut rkey, &mut stranger)
117 } else {
118 (read_round(&rng, available), 0)
119 };
120 let advance_micros = rng.below(3_000_000);
121 available += fresh_repos;
122 Round {
123 ops,
124 advance_micros,
125 }
126 })
127 .collect()
128}
129
130pub(crate) fn predict(seed: u64, rounds: u32) -> Projection {
131 let (members, blocked) = plan(seed, rounds, SUBJECT_DIDS.len())
132 .iter()
133 .flat_map(|round| round.ops.iter())
134 .fold(
135 (BTreeSet::<String>::new(), BTreeSet::<String>::new()),
136 |(mut members, mut blocked), planned| {
137 if let Planned::Admin { op, skew: false } = planned {
138 match op {
139 AdminOp::AddMember(subject) => {
140 members.insert(SUBJECT_DIDS[*subject].to_string());
141 }
142 AdminOp::RemoveMember(subject) => {
143 members.remove(SUBJECT_DIDS[*subject]);
144 }
145 AdminOp::Ban(subject) => {
146 blocked.insert(SUBJECT_DIDS[*subject].to_string());
147 }
148 AdminOp::Unban(subject) => {
149 blocked.remove(SUBJECT_DIDS[*subject]);
150 }
151 AdminOp::CreateRepo(_) | AdminOp::AddCollaborator(_, _) => {}
152 }
153 }
154 (members, blocked)
155 },
156 );
157 Projection {
158 members: members.into_iter().collect(),
159 blocked: blocked.into_iter().collect(),
160 }
161}
162
163fn mutate_round(
164 rng: &Rng,
165 subjects: usize,
166 available: u32,
167 rkey: &mut u32,
168 stranger: &mut usize,
169) -> (Vec<Planned>, u32) {
170 let mut ops: Vec<Planned> = (0..subjects)
171 .filter(|_| rng.chance(2, 3))
172 .map(|subject| {
173 let op = match rng.below(4) {
174 0 => AdminOp::AddMember(subject),
175 1 => AdminOp::RemoveMember(subject),
176 2 => AdminOp::Ban(subject),
177 _ => AdminOp::Unban(subject),
178 };
179 Planned::Admin {
180 op,
181 skew: rng.chance(1, 5),
182 }
183 })
184 .collect();
185
186 (0..available)
187 .filter(|_| rng.chance(1, 2))
188 .for_each(|repo| {
189 let planned = if rng.chance(1, 2) {
190 let subject = rng.below(subjects as u64) as usize;
191 Planned::Admin {
192 op: AdminOp::AddCollaborator(repo as usize, subject),
193 skew: rng.chance(1, 6),
194 }
195 } else {
196 Planned::Maintain {
197 repo: repo as usize,
198 }
199 };
200 ops.push(planned);
201 });
202
203 let mut fresh_repos = 0;
204 (0..rng.below(3)).for_each(|_| {
205 let key = *rkey;
206 *rkey += 1;
207 let skew = rng.chance(1, 8);
208 if !skew {
209 fresh_repos += 1;
210 }
211 ops.push(Planned::Admin {
212 op: AdminOp::CreateRepo(key),
213 skew,
214 });
215 });
216
217 (0..rng.below(3)).for_each(|_| {
218 let stranger_index = *stranger;
219 *stranger += 1;
220 ops.push(Planned::Probe {
221 stranger: stranger_index,
222 drop: rng.chance(1, 2),
223 });
224 });
225
226 (ops, fresh_repos)
227}
228
229fn read_round(rng: &Rng, available: u32) -> Vec<Planned> {
230 let mut ops: Vec<Planned> = [
231 ReadOp::Version,
232 ReadOp::Owner,
233 ReadOp::ListMembers,
234 ReadOp::DidJson,
235 ]
236 .into_iter()
237 .map(|op| Planned::Read {
238 op,
239 killed: rng.chance(1, 5),
240 })
241 .collect();
242 (0..available)
243 .filter(|_| rng.chance(2, 3))
244 .for_each(|repo| {
245 let repo = repo as usize;
246 let op = match rng.below(7) {
247 0 => ReadOp::Branches(repo),
248 1 => ReadOp::Log(repo),
249 2 => ReadOp::DescribeRepo(repo),
250 3 => ReadOp::InfoRefs(repo),
251 4 => ReadOp::Tree(repo),
252 5 => ReadOp::Blob(repo),
253 _ => ReadOp::Languages(repo),
254 };
255 ops.push(Planned::Read {
256 op,
257 killed: rng.chance(1, 5),
258 });
259 });
260 ops
261}
262
263struct OpResult {
264 step: Step,
265 created: Option<RepoDid>,
266}
267
268pub(crate) async fn execute(harness: Arc<Harness>, seed: u64, plan: Vec<Round>) -> Trace {
269 let initial = (
270 vec![harness.seed_repo.clone()],
271 Vec::<Step>::new(),
272 Vec::new(),
273 );
274 let harness = &harness;
275 let (repos, steps, snapshots) = futures::stream::iter(plan.into_iter().enumerate())
276 .fold(
277 initial,
278 |(repos, mut steps, mut snapshots), (round_index, round)| {
279 let harness = Arc::clone(harness);
280 async move {
281 let round_no = round_index as u32;
282 let drops = arm_drops(&harness, &round.ops);
283 let repos = Arc::new(repos);
284 let tasks = round
285 .ops
286 .iter()
287 .enumerate()
288 .map(|(index, planned)| {
289 let harness = Arc::clone(&harness);
290 let repos = Arc::clone(&repos);
291 let planned = *planned;
292 tokio::spawn(async move {
293 run_op(&harness, &repos, round_no, index as u32, planned).await
294 })
295 })
296 .collect::<Vec<_>>();
297 let results: Vec<OpResult> = join_all(tasks)
298 .await
299 .into_iter()
300 .map(|joined| joined.expect("sim op task must not panic"))
301 .collect();
302 drops
303 .iter()
304 .for_each(|host| harness.faults.clear_host(host));
305
306 let created: Vec<RepoDid> = results
307 .iter()
308 .filter_map(|result| result.created.clone())
309 .collect();
310 let planned_creates = round
311 .ops
312 .iter()
313 .filter(|planned| {
314 matches!(
315 planned,
316 Planned::Admin {
317 op: AdminOp::CreateRepo(_),
318 skew: false,
319 }
320 )
321 })
322 .count();
323 assert_eq!(
324 planned_creates,
325 created.len(),
326 "round {round_no}: {planned_creates} non-skew creates planned but \
327 {} materialized, so plan/execute repo indices have drifted apart",
328 created.len()
329 );
330 created.iter().for_each(|did| harness.populate(did));
331 steps.extend(results.into_iter().map(|result| result.step));
332 let mut repos = Arc::into_inner(repos)
333 .expect("all op tasks released the round repo snapshot");
334 repos.extend(created);
335
336 harness.advance(round.advance_micros);
337 snapshots.push(harness.snapshot(round_no, &repos));
338 (repos, steps, snapshots)
339 }
340 },
341 )
342 .await;
343 let no_fault_creates = steps
344 .iter()
345 .filter(|step| step.op == "createRepo" && step.fault == "none")
346 .count();
347 let materialized = repos.len() - 1;
348 assert_eq!(
349 no_fault_creates, materialized,
350 "no-fault createRepo count {no_fault_creates} does not match the {materialized} repos \
351 materialized: a planned create silently failed and repo_at would have masked the drift"
352 );
353 Trace {
354 seed,
355 steps,
356 snapshots,
357 }
358}
359
360fn arm_drops(harness: &Harness, ops: &[Planned]) -> Vec<String> {
361 let hosts: Vec<String> = ops
362 .iter()
363 .filter_map(|planned| match planned {
364 Planned::Probe {
365 stranger,
366 drop: true,
367 } => Some(harness.strangers[*stranger].host.clone()),
368 _ => None,
369 })
370 .collect();
371 hosts.iter().for_each(|host| harness.faults.drop_host(host));
372 hosts
373}
374
375async fn run_op(
376 harness: &Harness,
377 repos: &[RepoDid],
378 round: u32,
379 index: u32,
380 planned: Planned,
381) -> OpResult {
382 let make = |op: &'static str, actor: String, fault: &'static str, outcome: Outcome| Step {
383 round,
384 index,
385 op,
386 actor,
387 fault,
388 outcome,
389 };
390
391 match planned {
392 Planned::Maintain { repo } => {
393 let repo = repo_at(repos, repo);
394 let outcome = match harness.maintain(repo) {
395 Ok(()) => Outcome::Answered {
396 status: 200,
397 body: 0,
398 },
399 Err(message) => Outcome::Answered {
400 status: 500,
401 body: fnv1a(message.as_bytes()),
402 },
403 };
404 OpResult {
405 step: make("maintain", "knot".to_string(), "none", outcome),
406 created: None,
407 }
408 }
409 Planned::Read { op, killed } => {
410 let request = read_request(repos, op);
411 if killed {
412 drive_kill(harness.router(), request.method, &request.uri, request.body).await;
413 return OpResult {
414 step: make(op.name(), request.actor, "killed", Outcome::Killed),
415 created: None,
416 };
417 }
418 let (status, body) = http_call(
419 harness.router(),
420 request.method,
421 &request.uri,
422 None,
423 request.body,
424 )
425 .await;
426 OpResult {
427 step: make(
428 op.name(),
429 request.actor,
430 "none",
431 Outcome::Answered {
432 status,
433 body: body_digest(&body),
434 },
435 ),
436 created: None,
437 }
438 }
439 Planned::Admin { op, skew } => {
440 let request = admin_request(harness, repos, op, skew, round, index);
441 let (status, body) = http_call(
442 harness.router(),
443 request.method,
444 &request.uri,
445 request.token.as_deref(),
446 request.body,
447 )
448 .await;
449 let created = match op {
450 AdminOp::CreateRepo(_) if status == 200 => repo_did_of(&body),
451 _ => None,
452 };
453 OpResult {
454 step: make(
455 op.name(),
456 request.actor,
457 if skew { "clock_skew" } else { "none" },
458 Outcome::Answered {
459 status,
460 body: body_digest(&body),
461 },
462 ),
463 created,
464 }
465 }
466 Planned::Probe { stranger, drop } => {
467 let request = probe_request(harness, stranger, round, index);
468 let (status, body) = http_call(
469 harness.router(),
470 request.method,
471 &request.uri,
472 request.token.as_deref(),
473 request.body,
474 )
475 .await;
476 OpResult {
477 step: make(
478 "probe",
479 request.actor,
480 if drop { "drop_identity" } else { "none" },
481 Outcome::Answered {
482 status,
483 body: body_digest(&body),
484 },
485 ),
486 created: None,
487 }
488 }
489 }
490}
491
492struct Request {
493 method: Method,
494 uri: String,
495 token: Option<String>,
496 body: Bytes,
497 actor: String,
498}
499
500fn read_request(repos: &[RepoDid], op: ReadOp) -> Request {
501 match op {
502 ReadOp::Version => get("/xrpc/sh.tangled.knot.version"),
503 ReadOp::Owner => get("/xrpc/sh.tangled.owner"),
504 ReadOp::ListMembers => get("/xrpc/sh.tangled.knot.listMembers?subject=knot"),
505 ReadOp::DidJson => get("/.well-known/did.json"),
506 ReadOp::InfoRefs(repo) => Request {
507 method: Method::GET,
508 uri: format!(
509 "/{}/info/refs?service=git-upload-pack",
510 repo_at(repos, repo).as_str()
511 ),
512 token: None,
513 body: Bytes::new(),
514 actor: "anon".to_string(),
515 },
516 ReadOp::Branches(repo) => repo_get("branches", "repo", repo_at(repos, repo)),
517 ReadOp::Log(repo) => repo_get("log", "repo", repo_at(repos, repo)),
518 ReadOp::DescribeRepo(repo) => repo_get("describeRepo", "repoDid", repo_at(repos, repo)),
519 ReadOp::Tree(repo) => repo_get("tree", "repo", repo_at(repos, repo)),
520 ReadOp::Languages(repo) => repo_get("languages", "repo", repo_at(repos, repo)),
521 ReadOp::Blob(repo) => Request {
522 method: Method::GET,
523 uri: format!(
524 "/xrpc/sh.tangled.repo.blob?repo={}&path=README.md",
525 enc(repo_at(repos, repo).as_str())
526 ),
527 token: None,
528 body: Bytes::new(),
529 actor: "anon".to_string(),
530 },
531 }
532}
533
534fn admin_request(
535 harness: &Harness,
536 repos: &[RepoDid],
537 op: AdminOp,
538 skew: bool,
539 round: u32,
540 index: u32,
541) -> Request {
542 let subjects = &harness.subjects;
543 match op {
544 AdminOp::AddMember(subject) => admin_post(
545 harness,
546 "addMember",
547 "sh.tangled.knot.addMember",
548 json!({ "subject": subjects[subject].as_str() }),
549 skew,
550 round,
551 index,
552 ),
553 AdminOp::RemoveMember(subject) => admin_post(
554 harness,
555 "removeMember",
556 "sh.tangled.knot.removeMember",
557 json!({ "subject": subjects[subject].as_str() }),
558 skew,
559 round,
560 index,
561 ),
562 AdminOp::Ban(subject) => admin_post(
563 harness,
564 "ban",
565 "sh.tangled.knot.ban",
566 json!({ "subject": subjects[subject].as_str() }),
567 skew,
568 round,
569 index,
570 ),
571 AdminOp::Unban(subject) => admin_post(
572 harness,
573 "unban",
574 "sh.tangled.knot.unban",
575 json!({ "subject": subjects[subject].as_str() }),
576 skew,
577 round,
578 index,
579 ),
580 AdminOp::CreateRepo(key) => {
581 let name = format!("repo{key}");
582 admin_post(
583 harness,
584 "create",
585 "sh.tangled.repo.create",
586 json!({ "rkey": name, "name": name }),
587 skew,
588 round,
589 index,
590 )
591 }
592 AdminOp::AddCollaborator(repo, subject) => admin_post(
593 harness,
594 "addCollaborator",
595 "sh.tangled.repo.addCollaborator",
596 json!({
597 "repo": repo_at(repos, repo).as_str(),
598 "subject": subjects[subject].as_str(),
599 }),
600 skew,
601 round,
602 index,
603 ),
604 }
605}
606
607fn probe_request(harness: &Harness, stranger: usize, round: u32, index: u32) -> Request {
608 let actor = &harness.strangers[stranger];
609 let token = mint(
610 &actor.signer,
611 actor.did.as_str(),
612 &harness.knot_aud,
613 "sh.tangled.knot.addMember",
614 jwt_window(harness, false),
615 round,
616 index,
617 );
618 Request {
619 method: Method::POST,
620 uri: "/xrpc/sh.tangled.knot.addMember".to_string(),
621 token: Some(token),
622 body: encode_body(json!({ "subject": harness.subjects[0].as_str() })),
623 actor: actor.host.clone(),
624 }
625}
626
627fn get(path: &str) -> Request {
628 Request {
629 method: Method::GET,
630 uri: path.to_string(),
631 token: None,
632 body: Bytes::new(),
633 actor: "anon".to_string(),
634 }
635}
636
637fn repo_get(method: &str, param: &str, repo: &RepoDid) -> Request {
638 Request {
639 method: Method::GET,
640 uri: format!(
641 "/xrpc/sh.tangled.repo.{method}?{param}={}",
642 enc(repo.as_str())
643 ),
644 token: None,
645 body: Bytes::new(),
646 actor: "anon".to_string(),
647 }
648}
649
650fn admin_post(
651 harness: &Harness,
652 method_short: &str,
653 nsid: &'static str,
654 body: Value,
655 skew: bool,
656 round: u32,
657 index: u32,
658) -> Request {
659 let admin = &harness.admin;
660 let token = mint(
661 &admin.signer,
662 admin.did.as_str(),
663 &harness.knot_aud,
664 nsid,
665 jwt_window(harness, skew),
666 round,
667 index,
668 );
669 Request {
670 method: Method::POST,
671 uri: format!("/xrpc/{nsid}"),
672 token: Some(token),
673 body: encode_body(body),
674 actor: format!("admin:{method_short}"),
675 }
676}
677
678fn jwt_window(harness: &Harness, skew: bool) -> (i64, i64) {
679 let now = harness.now_seconds();
680 if skew {
681 (
682 now - SKEW_BACKDATE_SECS - SKEW_LIFETIME_SECS,
683 now - SKEW_BACKDATE_SECS,
684 )
685 } else {
686 (now, now + 60)
687 }
688}
689
690fn mint(
691 signer: &K256Signer,
692 issuer: &str,
693 aud: &str,
694 nsid: &str,
695 window: (i64, i64),
696 round: u32,
697 index: u32,
698) -> String {
699 let header = URL_SAFE_NO_PAD.encode(br#"{"alg":"ES256K","typ":"JWT"}"#);
700 let payload = URL_SAFE_NO_PAD.encode(
701 serde_json::to_vec(&json!({
702 "iss": issuer,
703 "aud": aud,
704 "iat": window.0,
705 "exp": window.1,
706 "jti": format!("sim-{round}-{index}"),
707 "lxm": nsid,
708 }))
709 .expect("claims serialize"),
710 );
711 let signing_input = format!("{header}.{payload}");
712 let signature = signer.sign(signing_input.as_bytes());
713 format!(
714 "{signing_input}.{}",
715 URL_SAFE_NO_PAD.encode(signature.as_bytes())
716 )
717}
718
719async fn http_call(
720 router: axum::Router,
721 method: Method,
722 uri: &str,
723 token: Option<&str>,
724 body: Bytes,
725) -> (u16, Bytes) {
726 let mut request = http::Request::builder()
727 .method(method)
728 .uri(uri)
729 .body(axum::body::Body::from(body))
730 .expect("request builds");
731 if let Some(token) = token {
732 request.headers_mut().insert(
733 AUTHORIZATION,
734 http::HeaderValue::from_str(&format!("Bearer {token}")).expect("bearer header"),
735 );
736 }
737 request
738 .extensions_mut()
739 .insert(axum::extract::ConnectInfo(SocketAddr::from((
740 [127, 0, 0, 1],
741 4242,
742 ))));
743 let response = router.oneshot(request).await.expect("router answers");
744 let status = response.status().as_u16();
745 let bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
746 .await
747 .expect("response body");
748 (status, bytes)
749}
750
751async fn drive_kill(router: axum::Router, method: Method, uri: &str, body: Bytes) {
752 let call = http_call(router, method, uri, None, body);
753 futures::pin_mut!(call);
754 tokio::select! {
755 biased;
756 _ = &mut call => {}
757 _ = tokio::task::yield_now() => {}
758 }
759}
760
761fn repo_did_of(body: &Bytes) -> Option<RepoDid> {
762 serde_json::from_slice::<Value>(body)
763 .ok()
764 .and_then(|value| {
765 value
766 .get("repoDid")
767 .and_then(Value::as_str)
768 .map(str::to_string)
769 })
770 .and_then(|did| RepoDid::new(did).ok())
771}
772
773fn body_digest(body: &Bytes) -> u64 {
774 match serde_json::from_slice::<Value>(body) {
775 Ok(mut value) => {
776 canonicalize(&mut value);
777 fnv1a(&serde_json::to_vec(&value).expect("canonical body serializes"))
778 }
779 Err(_) => fnv1a(body),
780 }
781}
782
783fn canonicalize(value: &mut Value) {
784 match value {
785 Value::Array(items) => {
786 items.iter_mut().for_each(canonicalize);
787 items.sort_by_cached_key(|item| serde_json::to_string(item).expect("array item"));
788 }
789 Value::Object(map) => map.values_mut().for_each(canonicalize),
790 _ => {}
791 }
792}
793
794fn encode_body(value: Value) -> Bytes {
795 Bytes::from(serde_json::to_vec(&value).expect("request body serializes"))
796}
797
798fn enc(did: &str) -> String {
799 did.replace(':', "%3A")
800}
801
802fn repo_at(repos: &[RepoDid], index: usize) -> &RepoDid {
803 repos.get(index).unwrap_or_else(|| {
804 panic!(
805 "plan/execute repo drift: index {index} exceeds {} repos created so far",
806 repos.len()
807 )
808 })
809}