Now let's take a silly one
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 24 kB View raw
1use axum::body::Bytes; 2use base64::Engine; 3use base64::engine::general_purpose::URL_SAFE_NO_PAD; 4use futures::future::join_all; 5use futures::stream::StreamExt; 6use http::Method; 7use http::header::AUTHORIZATION; 8use serde_json::{Value, json}; 9use std::collections::BTreeSet; 10use std::net::SocketAddr; 11use std::sync::Arc; 12use tower::ServiceExt; 13 14use knot_runtime::{Entropy, K256Signer, SeededEntropy, Signer}; 15use knot_types::RepoDid; 16 17use crate::harness::{Harness, SUBJECT_DIDS}; 18use crate::trace::{Outcome, Projection, Step, Trace, fnv1a}; 19 20const SKEW_BACKDATE_SECS: i64 = 600; 21const SKEW_LIFETIME_SECS: i64 = 60; 22 23#[derive(Clone, Copy)] 24enum ReadOp { 25 Version, 26 Owner, 27 ListMembers, 28 DidJson, 29 InfoRefs(usize), 30 Branches(usize), 31 Log(usize), 32 DescribeRepo(usize), 33 Tree(usize), 34 Blob(usize), 35 Languages(usize), 36} 37 38impl ReadOp { 39 fn name(self) -> &'static str { 40 match self { 41 ReadOp::Version => "version", 42 ReadOp::Owner => "owner", 43 ReadOp::ListMembers => "listMembers", 44 ReadOp::DidJson => "didJson", 45 ReadOp::InfoRefs(_) => "infoRefs", 46 ReadOp::Branches(_) => "branches", 47 ReadOp::Log(_) => "log", 48 ReadOp::DescribeRepo(_) => "describeRepo", 49 ReadOp::Tree(_) => "tree", 50 ReadOp::Blob(_) => "blob", 51 ReadOp::Languages(_) => "languages", 52 } 53 } 54} 55 56#[derive(Clone, Copy)] 57enum AdminOp { 58 AddMember(usize), 59 RemoveMember(usize), 60 Ban(usize), 61 Unban(usize), 62 CreateRepo(u32), 63 AddCollaborator(usize, usize), 64} 65 66impl AdminOp { 67 fn name(self) -> &'static str { 68 match self { 69 AdminOp::AddMember(_) => "addMember", 70 AdminOp::RemoveMember(_) => "removeMember", 71 AdminOp::Ban(_) => "ban", 72 AdminOp::Unban(_) => "unban", 73 AdminOp::CreateRepo(_) => "createRepo", 74 AdminOp::AddCollaborator(_, _) => "addCollaborator", 75 } 76 } 77} 78 79#[derive(Clone, Copy)] 80enum Planned { 81 Read { op: ReadOp, killed: bool }, 82 Admin { op: AdminOp, skew: bool }, 83 Probe { stranger: usize, drop: bool }, 84 Maintain { repo: usize }, 85} 86 87pub(crate) struct Round { 88 ops: Vec<Planned>, 89 advance_micros: u64, 90} 91 92struct Rng(SeededEntropy); 93 94impl Rng { 95 fn new(seed: u64) -> Self { 96 Self(SeededEntropy::new(seed ^ 0x57ee_d000)) 97 } 98 99 fn below(&self, n: u64) -> u64 { 100 self.0.next_u64() % n.max(1) 101 } 102 103 fn chance(&self, num: u64, den: u64) -> bool { 104 self.below(den) < num 105 } 106} 107 108pub(crate) fn plan(seed: u64, rounds: u32, subjects: usize) -> Vec<Round> { 109 let rng = Rng::new(seed); 110 let mut available: u32 = 1; 111 let mut rkey: u32 = 0; 112 let mut stranger: usize = 0; 113 (0..rounds) 114 .map(|round| { 115 let (ops, fresh_repos) = if round % 2 == 0 { 116 mutate_round(&rng, subjects, available, &mut rkey, &mut stranger) 117 } else { 118 (read_round(&rng, available), 0) 119 }; 120 let advance_micros = rng.below(3_000_000); 121 available += fresh_repos; 122 Round { 123 ops, 124 advance_micros, 125 } 126 }) 127 .collect() 128} 129 130pub(crate) fn predict(seed: u64, rounds: u32) -> Projection { 131 let (members, blocked) = plan(seed, rounds, SUBJECT_DIDS.len()) 132 .iter() 133 .flat_map(|round| round.ops.iter()) 134 .fold( 135 (BTreeSet::<String>::new(), BTreeSet::<String>::new()), 136 |(mut members, mut blocked), planned| { 137 if let Planned::Admin { op, skew: false } = planned { 138 match op { 139 AdminOp::AddMember(subject) => { 140 members.insert(SUBJECT_DIDS[*subject].to_string()); 141 } 142 AdminOp::RemoveMember(subject) => { 143 members.remove(SUBJECT_DIDS[*subject]); 144 } 145 AdminOp::Ban(subject) => { 146 blocked.insert(SUBJECT_DIDS[*subject].to_string()); 147 } 148 AdminOp::Unban(subject) => { 149 blocked.remove(SUBJECT_DIDS[*subject]); 150 } 151 AdminOp::CreateRepo(_) | AdminOp::AddCollaborator(_, _) => {} 152 } 153 } 154 (members, blocked) 155 }, 156 ); 157 Projection { 158 members: members.into_iter().collect(), 159 blocked: blocked.into_iter().collect(), 160 } 161} 162 163fn mutate_round( 164 rng: &Rng, 165 subjects: usize, 166 available: u32, 167 rkey: &mut u32, 168 stranger: &mut usize, 169) -> (Vec<Planned>, u32) { 170 let mut ops: Vec<Planned> = (0..subjects) 171 .filter(|_| rng.chance(2, 3)) 172 .map(|subject| { 173 let op = match rng.below(4) { 174 0 => AdminOp::AddMember(subject), 175 1 => AdminOp::RemoveMember(subject), 176 2 => AdminOp::Ban(subject), 177 _ => AdminOp::Unban(subject), 178 }; 179 Planned::Admin { 180 op, 181 skew: rng.chance(1, 5), 182 } 183 }) 184 .collect(); 185 186 (0..available) 187 .filter(|_| rng.chance(1, 2)) 188 .for_each(|repo| { 189 let planned = if rng.chance(1, 2) { 190 let subject = rng.below(subjects as u64) as usize; 191 Planned::Admin { 192 op: AdminOp::AddCollaborator(repo as usize, subject), 193 skew: rng.chance(1, 6), 194 } 195 } else { 196 Planned::Maintain { 197 repo: repo as usize, 198 } 199 }; 200 ops.push(planned); 201 }); 202 203 let mut fresh_repos = 0; 204 (0..rng.below(3)).for_each(|_| { 205 let key = *rkey; 206 *rkey += 1; 207 let skew = rng.chance(1, 8); 208 if !skew { 209 fresh_repos += 1; 210 } 211 ops.push(Planned::Admin { 212 op: AdminOp::CreateRepo(key), 213 skew, 214 }); 215 }); 216 217 (0..rng.below(3)).for_each(|_| { 218 let stranger_index = *stranger; 219 *stranger += 1; 220 ops.push(Planned::Probe { 221 stranger: stranger_index, 222 drop: rng.chance(1, 2), 223 }); 224 }); 225 226 (ops, fresh_repos) 227} 228 229fn read_round(rng: &Rng, available: u32) -> Vec<Planned> { 230 let mut ops: Vec<Planned> = [ 231 ReadOp::Version, 232 ReadOp::Owner, 233 ReadOp::ListMembers, 234 ReadOp::DidJson, 235 ] 236 .into_iter() 237 .map(|op| Planned::Read { 238 op, 239 killed: rng.chance(1, 5), 240 }) 241 .collect(); 242 (0..available) 243 .filter(|_| rng.chance(2, 3)) 244 .for_each(|repo| { 245 let repo = repo as usize; 246 let op = match rng.below(7) { 247 0 => ReadOp::Branches(repo), 248 1 => ReadOp::Log(repo), 249 2 => ReadOp::DescribeRepo(repo), 250 3 => ReadOp::InfoRefs(repo), 251 4 => ReadOp::Tree(repo), 252 5 => ReadOp::Blob(repo), 253 _ => ReadOp::Languages(repo), 254 }; 255 ops.push(Planned::Read { 256 op, 257 killed: rng.chance(1, 5), 258 }); 259 }); 260 ops 261} 262 263struct OpResult { 264 step: Step, 265 created: Option<RepoDid>, 266} 267 268pub(crate) async fn execute(harness: Arc<Harness>, seed: u64, plan: Vec<Round>) -> Trace { 269 let initial = ( 270 vec![harness.seed_repo.clone()], 271 Vec::<Step>::new(), 272 Vec::new(), 273 ); 274 let harness = &harness; 275 let (repos, steps, snapshots) = futures::stream::iter(plan.into_iter().enumerate()) 276 .fold( 277 initial, 278 |(repos, mut steps, mut snapshots), (round_index, round)| { 279 let harness = Arc::clone(harness); 280 async move { 281 let round_no = round_index as u32; 282 let drops = arm_drops(&harness, &round.ops); 283 let repos = Arc::new(repos); 284 let tasks = round 285 .ops 286 .iter() 287 .enumerate() 288 .map(|(index, planned)| { 289 let harness = Arc::clone(&harness); 290 let repos = Arc::clone(&repos); 291 let planned = *planned; 292 tokio::spawn(async move { 293 run_op(&harness, &repos, round_no, index as u32, planned).await 294 }) 295 }) 296 .collect::<Vec<_>>(); 297 let results: Vec<OpResult> = join_all(tasks) 298 .await 299 .into_iter() 300 .map(|joined| joined.expect("sim op task must not panic")) 301 .collect(); 302 drops 303 .iter() 304 .for_each(|host| harness.faults.clear_host(host)); 305 306 let created: Vec<RepoDid> = results 307 .iter() 308 .filter_map(|result| result.created.clone()) 309 .collect(); 310 let planned_creates = round 311 .ops 312 .iter() 313 .filter(|planned| { 314 matches!( 315 planned, 316 Planned::Admin { 317 op: AdminOp::CreateRepo(_), 318 skew: false, 319 } 320 ) 321 }) 322 .count(); 323 assert_eq!( 324 planned_creates, 325 created.len(), 326 "round {round_no}: {planned_creates} non-skew creates planned but \ 327 {} materialized, so plan/execute repo indices have drifted apart", 328 created.len() 329 ); 330 created.iter().for_each(|did| harness.populate(did)); 331 steps.extend(results.into_iter().map(|result| result.step)); 332 let mut repos = Arc::into_inner(repos) 333 .expect("all op tasks released the round repo snapshot"); 334 repos.extend(created); 335 336 harness.advance(round.advance_micros); 337 snapshots.push(harness.snapshot(round_no, &repos)); 338 (repos, steps, snapshots) 339 } 340 }, 341 ) 342 .await; 343 let no_fault_creates = steps 344 .iter() 345 .filter(|step| step.op == "createRepo" && step.fault == "none") 346 .count(); 347 let materialized = repos.len() - 1; 348 assert_eq!( 349 no_fault_creates, materialized, 350 "no-fault createRepo count {no_fault_creates} does not match the {materialized} repos \ 351 materialized: a planned create silently failed and repo_at would have masked the drift" 352 ); 353 Trace { 354 seed, 355 steps, 356 snapshots, 357 } 358} 359 360fn arm_drops(harness: &Harness, ops: &[Planned]) -> Vec<String> { 361 let hosts: Vec<String> = ops 362 .iter() 363 .filter_map(|planned| match planned { 364 Planned::Probe { 365 stranger, 366 drop: true, 367 } => Some(harness.strangers[*stranger].host.clone()), 368 _ => None, 369 }) 370 .collect(); 371 hosts.iter().for_each(|host| harness.faults.drop_host(host)); 372 hosts 373} 374 375async fn run_op( 376 harness: &Harness, 377 repos: &[RepoDid], 378 round: u32, 379 index: u32, 380 planned: Planned, 381) -> OpResult { 382 let make = |op: &'static str, actor: String, fault: &'static str, outcome: Outcome| Step { 383 round, 384 index, 385 op, 386 actor, 387 fault, 388 outcome, 389 }; 390 391 match planned { 392 Planned::Maintain { repo } => { 393 let repo = repo_at(repos, repo); 394 let outcome = match harness.maintain(repo) { 395 Ok(()) => Outcome::Answered { 396 status: 200, 397 body: 0, 398 }, 399 Err(message) => Outcome::Answered { 400 status: 500, 401 body: fnv1a(message.as_bytes()), 402 }, 403 }; 404 OpResult { 405 step: make("maintain", "knot".to_string(), "none", outcome), 406 created: None, 407 } 408 } 409 Planned::Read { op, killed } => { 410 let request = read_request(repos, op); 411 if killed { 412 drive_kill(harness.router(), request.method, &request.uri, request.body).await; 413 return OpResult { 414 step: make(op.name(), request.actor, "killed", Outcome::Killed), 415 created: None, 416 }; 417 } 418 let (status, body) = http_call( 419 harness.router(), 420 request.method, 421 &request.uri, 422 None, 423 request.body, 424 ) 425 .await; 426 OpResult { 427 step: make( 428 op.name(), 429 request.actor, 430 "none", 431 Outcome::Answered { 432 status, 433 body: body_digest(&body), 434 }, 435 ), 436 created: None, 437 } 438 } 439 Planned::Admin { op, skew } => { 440 let request = admin_request(harness, repos, op, skew, round, index); 441 let (status, body) = http_call( 442 harness.router(), 443 request.method, 444 &request.uri, 445 request.token.as_deref(), 446 request.body, 447 ) 448 .await; 449 let created = match op { 450 AdminOp::CreateRepo(_) if status == 200 => repo_did_of(&body), 451 _ => None, 452 }; 453 OpResult { 454 step: make( 455 op.name(), 456 request.actor, 457 if skew { "clock_skew" } else { "none" }, 458 Outcome::Answered { 459 status, 460 body: body_digest(&body), 461 }, 462 ), 463 created, 464 } 465 } 466 Planned::Probe { stranger, drop } => { 467 let request = probe_request(harness, stranger, round, index); 468 let (status, body) = http_call( 469 harness.router(), 470 request.method, 471 &request.uri, 472 request.token.as_deref(), 473 request.body, 474 ) 475 .await; 476 OpResult { 477 step: make( 478 "probe", 479 request.actor, 480 if drop { "drop_identity" } else { "none" }, 481 Outcome::Answered { 482 status, 483 body: body_digest(&body), 484 }, 485 ), 486 created: None, 487 } 488 } 489 } 490} 491 492struct Request { 493 method: Method, 494 uri: String, 495 token: Option<String>, 496 body: Bytes, 497 actor: String, 498} 499 500fn read_request(repos: &[RepoDid], op: ReadOp) -> Request { 501 match op { 502 ReadOp::Version => get("/xrpc/sh.tangled.knot.version"), 503 ReadOp::Owner => get("/xrpc/sh.tangled.owner"), 504 ReadOp::ListMembers => get("/xrpc/sh.tangled.knot.listMembers?subject=knot"), 505 ReadOp::DidJson => get("/.well-known/did.json"), 506 ReadOp::InfoRefs(repo) => Request { 507 method: Method::GET, 508 uri: format!( 509 "/{}/info/refs?service=git-upload-pack", 510 repo_at(repos, repo).as_str() 511 ), 512 token: None, 513 body: Bytes::new(), 514 actor: "anon".to_string(), 515 }, 516 ReadOp::Branches(repo) => repo_get("branches", "repo", repo_at(repos, repo)), 517 ReadOp::Log(repo) => repo_get("log", "repo", repo_at(repos, repo)), 518 ReadOp::DescribeRepo(repo) => repo_get("describeRepo", "repoDid", repo_at(repos, repo)), 519 ReadOp::Tree(repo) => repo_get("tree", "repo", repo_at(repos, repo)), 520 ReadOp::Languages(repo) => repo_get("languages", "repo", repo_at(repos, repo)), 521 ReadOp::Blob(repo) => Request { 522 method: Method::GET, 523 uri: format!( 524 "/xrpc/sh.tangled.repo.blob?repo={}&path=README.md", 525 enc(repo_at(repos, repo).as_str()) 526 ), 527 token: None, 528 body: Bytes::new(), 529 actor: "anon".to_string(), 530 }, 531 } 532} 533 534fn admin_request( 535 harness: &Harness, 536 repos: &[RepoDid], 537 op: AdminOp, 538 skew: bool, 539 round: u32, 540 index: u32, 541) -> Request { 542 let subjects = &harness.subjects; 543 match op { 544 AdminOp::AddMember(subject) => admin_post( 545 harness, 546 "addMember", 547 "sh.tangled.knot.addMember", 548 json!({ "subject": subjects[subject].as_str() }), 549 skew, 550 round, 551 index, 552 ), 553 AdminOp::RemoveMember(subject) => admin_post( 554 harness, 555 "removeMember", 556 "sh.tangled.knot.removeMember", 557 json!({ "subject": subjects[subject].as_str() }), 558 skew, 559 round, 560 index, 561 ), 562 AdminOp::Ban(subject) => admin_post( 563 harness, 564 "ban", 565 "sh.tangled.knot.ban", 566 json!({ "subject": subjects[subject].as_str() }), 567 skew, 568 round, 569 index, 570 ), 571 AdminOp::Unban(subject) => admin_post( 572 harness, 573 "unban", 574 "sh.tangled.knot.unban", 575 json!({ "subject": subjects[subject].as_str() }), 576 skew, 577 round, 578 index, 579 ), 580 AdminOp::CreateRepo(key) => { 581 let name = format!("repo{key}"); 582 admin_post( 583 harness, 584 "create", 585 "sh.tangled.repo.create", 586 json!({ "rkey": name, "name": name }), 587 skew, 588 round, 589 index, 590 ) 591 } 592 AdminOp::AddCollaborator(repo, subject) => admin_post( 593 harness, 594 "addCollaborator", 595 "sh.tangled.repo.addCollaborator", 596 json!({ 597 "repo": repo_at(repos, repo).as_str(), 598 "subject": subjects[subject].as_str(), 599 }), 600 skew, 601 round, 602 index, 603 ), 604 } 605} 606 607fn probe_request(harness: &Harness, stranger: usize, round: u32, index: u32) -> Request { 608 let actor = &harness.strangers[stranger]; 609 let token = mint( 610 &actor.signer, 611 actor.did.as_str(), 612 &harness.knot_aud, 613 "sh.tangled.knot.addMember", 614 jwt_window(harness, false), 615 round, 616 index, 617 ); 618 Request { 619 method: Method::POST, 620 uri: "/xrpc/sh.tangled.knot.addMember".to_string(), 621 token: Some(token), 622 body: encode_body(json!({ "subject": harness.subjects[0].as_str() })), 623 actor: actor.host.clone(), 624 } 625} 626 627fn get(path: &str) -> Request { 628 Request { 629 method: Method::GET, 630 uri: path.to_string(), 631 token: None, 632 body: Bytes::new(), 633 actor: "anon".to_string(), 634 } 635} 636 637fn repo_get(method: &str, param: &str, repo: &RepoDid) -> Request { 638 Request { 639 method: Method::GET, 640 uri: format!( 641 "/xrpc/sh.tangled.repo.{method}?{param}={}", 642 enc(repo.as_str()) 643 ), 644 token: None, 645 body: Bytes::new(), 646 actor: "anon".to_string(), 647 } 648} 649 650fn admin_post( 651 harness: &Harness, 652 method_short: &str, 653 nsid: &'static str, 654 body: Value, 655 skew: bool, 656 round: u32, 657 index: u32, 658) -> Request { 659 let admin = &harness.admin; 660 let token = mint( 661 &admin.signer, 662 admin.did.as_str(), 663 &harness.knot_aud, 664 nsid, 665 jwt_window(harness, skew), 666 round, 667 index, 668 ); 669 Request { 670 method: Method::POST, 671 uri: format!("/xrpc/{nsid}"), 672 token: Some(token), 673 body: encode_body(body), 674 actor: format!("admin:{method_short}"), 675 } 676} 677 678fn jwt_window(harness: &Harness, skew: bool) -> (i64, i64) { 679 let now = harness.now_seconds(); 680 if skew { 681 ( 682 now - SKEW_BACKDATE_SECS - SKEW_LIFETIME_SECS, 683 now - SKEW_BACKDATE_SECS, 684 ) 685 } else { 686 (now, now + 60) 687 } 688} 689 690fn mint( 691 signer: &K256Signer, 692 issuer: &str, 693 aud: &str, 694 nsid: &str, 695 window: (i64, i64), 696 round: u32, 697 index: u32, 698) -> String { 699 let header = URL_SAFE_NO_PAD.encode(br#"{"alg":"ES256K","typ":"JWT"}"#); 700 let payload = URL_SAFE_NO_PAD.encode( 701 serde_json::to_vec(&json!({ 702 "iss": issuer, 703 "aud": aud, 704 "iat": window.0, 705 "exp": window.1, 706 "jti": format!("sim-{round}-{index}"), 707 "lxm": nsid, 708 })) 709 .expect("claims serialize"), 710 ); 711 let signing_input = format!("{header}.{payload}"); 712 let signature = signer.sign(signing_input.as_bytes()); 713 format!( 714 "{signing_input}.{}", 715 URL_SAFE_NO_PAD.encode(signature.as_bytes()) 716 ) 717} 718 719async fn http_call( 720 router: axum::Router, 721 method: Method, 722 uri: &str, 723 token: Option<&str>, 724 body: Bytes, 725) -> (u16, Bytes) { 726 let mut request = http::Request::builder() 727 .method(method) 728 .uri(uri) 729 .body(axum::body::Body::from(body)) 730 .expect("request builds"); 731 if let Some(token) = token { 732 request.headers_mut().insert( 733 AUTHORIZATION, 734 http::HeaderValue::from_str(&format!("Bearer {token}")).expect("bearer header"), 735 ); 736 } 737 request 738 .extensions_mut() 739 .insert(axum::extract::ConnectInfo(SocketAddr::from(( 740 [127, 0, 0, 1], 741 4242, 742 )))); 743 let response = router.oneshot(request).await.expect("router answers"); 744 let status = response.status().as_u16(); 745 let bytes = axum::body::to_bytes(response.into_body(), usize::MAX) 746 .await 747 .expect("response body"); 748 (status, bytes) 749} 750 751async fn drive_kill(router: axum::Router, method: Method, uri: &str, body: Bytes) { 752 let call = http_call(router, method, uri, None, body); 753 futures::pin_mut!(call); 754 tokio::select! { 755 biased; 756 _ = &mut call => {} 757 _ = tokio::task::yield_now() => {} 758 } 759} 760 761fn repo_did_of(body: &Bytes) -> Option<RepoDid> { 762 serde_json::from_slice::<Value>(body) 763 .ok() 764 .and_then(|value| { 765 value 766 .get("repoDid") 767 .and_then(Value::as_str) 768 .map(str::to_string) 769 }) 770 .and_then(|did| RepoDid::new(did).ok()) 771} 772 773fn body_digest(body: &Bytes) -> u64 { 774 match serde_json::from_slice::<Value>(body) { 775 Ok(mut value) => { 776 canonicalize(&mut value); 777 fnv1a(&serde_json::to_vec(&value).expect("canonical body serializes")) 778 } 779 Err(_) => fnv1a(body), 780 } 781} 782 783fn canonicalize(value: &mut Value) { 784 match value { 785 Value::Array(items) => { 786 items.iter_mut().for_each(canonicalize); 787 items.sort_by_cached_key(|item| serde_json::to_string(item).expect("array item")); 788 } 789 Value::Object(map) => map.values_mut().for_each(canonicalize), 790 _ => {} 791 } 792} 793 794fn encode_body(value: Value) -> Bytes { 795 Bytes::from(serde_json::to_vec(&value).expect("request body serializes")) 796} 797 798fn enc(did: &str) -> String { 799 did.replace(':', "%3A") 800} 801 802fn repo_at(repos: &[RepoDid], index: usize) -> &RepoDid { 803 repos.get(index).unwrap_or_else(|| { 804 panic!( 805 "plan/execute repo drift: index {index} exceeds {} repos created so far", 806 repos.len() 807 ) 808 }) 809}