Now let's take a silly one
1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use axum::{Json, Router, routing::get};
6use bytes::Bytes;
7use serde_json::json;
8use tempfile::TempDir;
9use url::Url;
10
11use knot_atproto::{Atproto, knot_did_document};
12use knot_cob::{CobHome, CobStore};
13use knot_cobs::{Grant, Registration, RegistryChange};
14use knot_events::{EventLog, SubscriberGate};
15use knot_git::{
16 EntryKind, Identity, Layout, NewCommit, RefUpdate, Repo, StagedAction, StagedChange,
17};
18use knot_index::{Index, Resolved};
19use knot_runtime::{
20 Clock, Entropy, FakeHttp, HttpRequest, HttpResponse, K256Signer, ManualClock, NetworkError,
21 SeededEntropy, Signer, UnixMicros,
22};
23use knot_secrets::SealedStore;
24use knot_types::{
25 AccountDid, AdmissionPolicy, KnotId, Oid, OwnerDid, RefName, RepoDid, RepoName, RepoRkey,
26 UnixSeconds,
27};
28use knot_xrpc::{
29 CobLocks, Committer, LimitConfig, PreAuthLimiter, ReadBudget, Reservations, XrpcState,
30};
31
32use crate::trace::{RepoCollaborators, Snapshot};
33
34const EMPTY_TREE: &str = "4b825dc642cb6eb9a060e54bf8d69288fbee4904";
35const KNOT_HOST: &str = "knot.nel.pet";
36const ADMIN_HOST: &str = "admin.nel.pet";
37const STRANGER_SEED_BASE: u64 = 1_000;
38const PDS_ENDPOINT: &str = "https://pds.nel.pet";
39const START_MICROS: u64 = 1_000_000_000;
40const NO_REFLOG_EXPIRY_FLOOR_SECS: i64 = i64::MAX / 4;
41
42pub(crate) const SUBJECT_DIDS: [&str; 8] = [
43 "did:plc:limpet",
44 "did:plc:whelk",
45 "did:plc:mussel",
46 "did:plc:conch",
47 "did:plc:scallop",
48 "did:plc:cuttle",
49 "did:plc:periwinkle",
50 "did:plc:nautilus",
51];
52
53pub(crate) type Responder =
54 Box<dyn Fn(&HttpRequest) -> Result<HttpResponse, NetworkError> + Send + Sync>;
55
56pub(crate) struct Actor {
57 pub host: String,
58 pub did: AccountDid,
59 pub signer: K256Signer,
60}
61
62#[derive(Default)]
63pub(crate) struct Faults {
64 dropped: Mutex<HashSet<String>>,
65}
66
67impl Faults {
68 fn is_dropped(&self, host: &str) -> bool {
69 self.dropped.lock().expect("faults lock").contains(host)
70 }
71
72 pub(crate) fn drop_host(&self, host: &str) {
73 self.dropped
74 .lock()
75 .expect("faults lock")
76 .insert(host.to_string());
77 }
78
79 pub(crate) fn clear_host(&self, host: &str) {
80 self.dropped.lock().expect("faults lock").remove(host);
81 }
82}
83
84pub(crate) struct Harness {
85 _dir: TempDir,
86 pub clock: Arc<ManualClock>,
87 pub faults: Arc<Faults>,
88 pub layout: Layout,
89 pub index: Arc<Index>,
90 pub knot_aud: String,
91 router: Router,
92 pub admin: Actor,
93 pub strangers: Vec<Actor>,
94 pub subjects: Vec<AccountDid>,
95 pub seed_repo: RepoDid,
96 options: knot_maintenance::Options,
97}
98
99impl Harness {
100 pub(crate) fn build(seed: u64, stranger_pool: usize) -> Self {
101 let dir = tempfile::tempdir().expect("sim tempdir");
102 let knot = KnotId::new(format!("did:web:{KNOT_HOST}")).expect("knot did");
103 let layout = Layout::new(dir.path().join("repos"))
104 .reserving_meta(&knot)
105 .expect("reserve meta");
106 layout.bootstrap_meta(&knot).expect("bootstrap meta");
107 let meta_path = layout.meta_path(&knot).expect("meta path");
108
109 let entropy = Arc::new(SeededEntropy::new(seed ^ 0x5eed_0050));
110 let secrets = Arc::new(
111 SealedStore::open(
112 dir.path().join("keys.sealed"),
113 &[7u8; 32],
114 Box::new(SeededEntropy::new(seed ^ 0x5ec0)),
115 )
116 .expect("sealed store"),
117 );
118 let knot_pubkey = secrets.ensure(&knot).expect("seal knot key");
119 let knot_service_url = format!("https://{KNOT_HOST}");
120
121 let admin = actor(ADMIN_HOST, 1);
122 let strangers: Vec<Actor> = (0..stranger_pool)
123 .map(|index| {
124 let host = format!("stranger{index}.nel.pet");
125 actor(&host, STRANGER_SEED_BASE + index as u64)
126 })
127 .collect();
128 let subjects: Vec<AccountDid> = SUBJECT_DIDS
129 .iter()
130 .map(|did| AccountDid::new(*did).expect("subject did"))
131 .collect();
132
133 let seed_repo = RepoDid::new("did:plc:squid").expect("seed repo did");
134 seed_on_disk(&layout, &seed_repo);
135 register_seed_repo(&meta_path, &knot, &secrets, &admin.did, &seed_repo);
136
137 let index = Arc::new(Index::new(meta_path.clone(), layout.clone()));
138 index.rebuild().expect("rebuild index");
139 index.warm_collaborators();
140
141 let clock = Arc::new(ManualClock::new(UnixMicros::new(START_MICROS)));
142 let pubkeys = pubkey_map(&admin, &strangers);
143 let faults = Arc::new(Faults::default());
144 let responder = build_responder(Arc::clone(&faults), pubkeys);
145 let atproto = Arc::new(Atproto::new(
146 FakeHttp::new(responder),
147 Arc::clone(&clock),
148 knot.clone(),
149 Url::parse("https://plc.directory/").expect("plc url"),
150 ));
151
152 let state = Arc::new(XrpcState {
153 layout: layout.clone(),
154 index: Arc::clone(&index),
155 atproto,
156 secrets,
157 entropy: entropy as Arc<dyn Entropy>,
158 admins: BTreeSet::from([admin.did.clone()]),
159 admission: AdmissionPolicy::Closed,
160 knot_did: knot.clone(),
161 meta_path,
162 knot_service_url: knot_service_url.clone(),
163 limiter: Arc::new(PreAuthLimiter::with_config(LimitConfig {
164 burst: 1_000_000,
165 refill_micros: 1,
166 per_peer_inflight: 4096,
167 global_inflight: 4096,
168 })),
169 cob_locks: Arc::new(CobLocks::default()),
170 reservations: Arc::new(Reservations::new(1_000_000, 256, 256)),
171 trusted_proxy_header: None,
172 committer: Committer {
173 name: "knot".to_string(),
174 email: "knot@nel.pet".to_string(),
175 },
176 max_body_bytes: 256 * 1024,
177 max_patch_bytes: 16 * 1024 * 1024,
178 max_patch_decompressed_bytes: 128 * 1024 * 1024,
179 max_response_bytes: 8 * 1024 * 1024,
180 max_archive_bytes: 1024 * 1024 * 1024,
181 tree_last_commit_budget: ReadBudget::Unbounded,
182 blob_last_commit_budget: ReadBudget::Unbounded,
183 languages_budget: ReadBudget::Unbounded,
184 languages_push_budget: Duration::from_secs(120),
185 git_http: Arc::new(FakeHttp::new(|_request: &HttpRequest| {
186 Err(NetworkError::Connect(
187 "simulation serves no git upstream".to_string(),
188 ))
189 })),
190 fork_max_pack_bytes: 1024 * 1024 * 1024,
191 service_owner: admin.did.clone(),
192 events: Arc::new(EventLog::new(Arc::clone(&clock), 4096)),
193 subscriber_gate: Arc::new(SubscriberGate::new(256, 64)),
194 });
195
196 let resolver: Arc<dyn knot_pack::RepoResolver> = {
197 let index = Arc::clone(&index);
198 Arc::new(move |target: &knot_pack::RepoTarget| match target {
199 knot_pack::RepoTarget::Did(did) => match index.owner_of(did) {
200 Resolved::Ready(Some(_)) => knot_pack::RepoLookup::Hosted(did.clone()),
201 Resolved::Ready(None) => knot_pack::RepoLookup::Unhosted,
202 Resolved::Warming => knot_pack::RepoLookup::Unavailable,
203 },
204 knot_pack::RepoTarget::OwnerRkey(owner, rkey) => {
205 match index.resolve_repo(owner, rkey) {
206 Resolved::Ready(Some(found)) => knot_pack::RepoLookup::Hosted(found),
207 Resolved::Ready(None) => knot_pack::RepoLookup::Unhosted,
208 Resolved::Warming => knot_pack::RepoLookup::Unavailable,
209 }
210 }
211 })
212 };
213 let did_document = knot_did_document(&knot, &knot_pubkey, &knot_service_url);
214 let knot_aud = knot.as_str().to_string();
215 let router = knot_pack::router(layout.clone(), resolver)
216 .merge(knot_xrpc::router(Arc::clone(&state)))
217 .route(
218 "/.well-known/did.json",
219 get(move || {
220 let document = did_document.clone();
221 async move { Json(document) }
222 }),
223 );
224 let options = knot_maintenance::Options {
225 repack_max_objects: 1_000_000,
226 prune_grace_secs: 0,
227 reflog_floor_secs: NO_REFLOG_EXPIRY_FLOOR_SECS,
228 commit_graph: true,
229 };
230
231 Self {
232 _dir: dir,
233 clock,
234 faults,
235 layout,
236 index,
237 knot_aud,
238 router,
239 admin,
240 strangers,
241 subjects,
242 seed_repo,
243 options,
244 }
245 }
246
247 pub(crate) fn router(&self) -> Router {
248 self.router.clone()
249 }
250
251 pub(crate) fn now_seconds(&self) -> i64 {
252 (self.clock.now_unix_micros().get() / 1_000_000) as i64
253 }
254
255 pub(crate) fn advance(&self, micros: u64) {
256 self.clock.advance(micros);
257 }
258
259 pub(crate) fn maintain(&self, repo: &RepoDid) -> Result<(), String> {
260 let now = self.now_seconds();
261 self.layout
262 .open(repo)
263 .map_err(knot_maintenance::MaintError::from)
264 .and_then(|opened| knot_maintenance::run_repo(&opened, now, &self.options))
265 .map(|_| ())
266 .map_err(|error| error.to_string())
267 }
268
269 pub(crate) fn populate(&self, repo: &RepoDid) {
270 let opened = self.layout.open(repo).expect("open created repo");
271 write_history(&opened, repo.as_str());
272 }
273
274 pub(crate) fn snapshot(&self, round: u32, repos: &[RepoDid]) -> Snapshot {
275 let collaborators = repos
276 .iter()
277 .map(|repo| {
278 let _ = self.index.ensure_collaborators(repo);
279 RepoCollaborators {
280 repo: repo.as_str().to_string(),
281 subjects: sorted(grant_subjects(self.index.collaborator_entries(repo))),
282 }
283 })
284 .collect();
285 let repo_list = {
286 let mut repos: Vec<String> = self
287 .index
288 .hosted_repos()
289 .iter()
290 .map(|repo| repo.as_str().to_string())
291 .collect();
292 repos.sort();
293 repos
294 };
295 Snapshot {
296 round,
297 clock_micros: self.clock.now_unix_micros().get(),
298 members: sorted(grant_subjects(self.index.member_entries())),
299 blocked: sorted(grant_subjects(self.index.blocked_entries())),
300 repos: repo_list,
301 collaborators,
302 }
303 }
304}
305
306fn actor(host: &str, seed: u64) -> Actor {
307 Actor {
308 host: host.to_string(),
309 did: AccountDid::new(format!("did:web:{host}")).expect("actor did"),
310 signer: K256Signer::generate(&SeededEntropy::new(seed)),
311 }
312}
313
314fn pubkey_map(admin: &Actor, strangers: &[Actor]) -> HashMap<String, Vec<u8>> {
315 std::iter::once((
316 admin.host.clone(),
317 admin.signer.public_key().as_bytes().to_vec(),
318 ))
319 .chain(strangers.iter().map(|actor| {
320 (
321 actor.host.clone(),
322 actor.signer.public_key().as_bytes().to_vec(),
323 )
324 }))
325 .collect()
326}
327
328fn build_responder(faults: Arc<Faults>, pubkeys: HashMap<String, Vec<u8>>) -> Responder {
329 let plc_signer = K256Signer::generate(&SeededEntropy::new(7));
330 Box::new(move |request: &HttpRequest| {
331 if request.method == http::Method::POST {
332 return Ok(ok_body(Bytes::new()));
333 }
334 let host = request.url.host_str().unwrap_or_default().to_string();
335 if faults.is_dropped(&host) {
336 return Err(NetworkError::Timeout(
337 "identity resolution dropped by simulation".to_string(),
338 ));
339 }
340 if host == "plc.directory" {
341 let did = request.url.path().trim_start_matches('/').to_string();
342 return Ok(ok_body(did_doc(&did, plc_signer.public_key().as_bytes())));
343 }
344 match pubkeys.get(&host) {
345 Some(sec1) => Ok(ok_body(did_doc(&format!("did:web:{host}"), sec1))),
346 None => Ok(HttpResponse {
347 status: http::StatusCode::NOT_FOUND,
348 headers: http::HeaderMap::new(),
349 body: Bytes::new(),
350 }),
351 }
352 })
353}
354
355fn did_doc(did: &str, sec1: &[u8]) -> Bytes {
356 let multikey = knot_types::crypto::multikey(0xe7, sec1);
357 Bytes::from(
358 serde_json::to_vec(&json!({
359 "id": did,
360 "alsoKnownAs": [],
361 "verificationMethod": [{
362 "id": format!("{did}#atproto"),
363 "type": "Multikey",
364 "controller": did,
365 "publicKeyMultibase": multikey
366 }],
367 "service": [{
368 "id": "#atproto_pds",
369 "type": "AtprotoPersonalDataServer",
370 "serviceEndpoint": PDS_ENDPOINT
371 }]
372 }))
373 .expect("did doc serializes"),
374 )
375}
376
377fn ok_body(body: Bytes) -> HttpResponse {
378 HttpResponse {
379 status: http::StatusCode::OK,
380 headers: http::HeaderMap::new(),
381 body,
382 }
383}
384
385fn seed_on_disk(layout: &Layout, did: &RepoDid) {
386 let repo = layout.create(did).expect("create seed repo");
387 write_history(&repo, "reef");
388}
389
390fn write_history(repo: &Repo, marker: &str) {
391 let identity = Identity {
392 name: "nel".to_string(),
393 email: "nel@oyster.cafe".to_string(),
394 time: UnixSeconds::new(1_700_000_000),
395 offset_seconds: 0,
396 };
397 let main = RefName::new("refs/heads/main").expect("main ref");
398 let first_tree = repo
399 .write_staged_tree(
400 Oid::from_hex(EMPTY_TREE).expect("empty tree"),
401 &[StagedChange {
402 path: "README.md".to_string(),
403 action: StagedAction::Put {
404 content: format!("# {marker}\n").into_bytes(),
405 kind: EntryKind::Blob,
406 },
407 }],
408 )
409 .expect("first tree");
410 let root = repo
411 .write_commit(&NewCommit {
412 tree: first_tree,
413 parents: Vec::new(),
414 author: identity.clone(),
415 committer: identity.clone(),
416 message: "root".to_string(),
417 extra_headers: Vec::new(),
418 })
419 .expect("root commit");
420 let second_tree = repo
421 .write_staged_tree(
422 first_tree,
423 &[StagedChange {
424 path: "src/main.rs".to_string(),
425 action: StagedAction::Put {
426 content: b"fn main() {}\n".to_vec(),
427 kind: EntryKind::Blob,
428 },
429 }],
430 )
431 .expect("second tree");
432 let tip = repo
433 .write_commit(&NewCommit {
434 tree: second_tree,
435 parents: vec![root],
436 author: identity.clone(),
437 committer: identity,
438 message: "add main".to_string(),
439 extra_headers: Vec::new(),
440 })
441 .expect("second commit");
442 repo.update_ref(&RefUpdate::Create {
443 name: main.clone(),
444 new: tip,
445 })
446 .expect("create main");
447 repo.set_head(&main).expect("set head");
448}
449
450fn register_seed_repo(
451 meta_path: &std::path::Path,
452 knot: &KnotId,
453 secrets: &SealedStore,
454 owner: &AccountDid,
455 repo: &RepoDid,
456) {
457 let meta = Repo::open(meta_path).expect("open meta");
458 let store = CobStore::new(&meta);
459 let signer = secrets.signer(knot).expect("knot signer");
460 store
461 .create(
462 &CobHome::from(knot),
463 &RegistryChange::Register(Registration {
464 owner: OwnerDid::new(owner.as_str()).expect("owner did"),
465 rkey: RepoRkey::new("anemone").expect("rkey"),
466 name: RepoName::new("anemone").expect("name"),
467 repo: repo.clone(),
468 created_at: UnixSeconds::new(1),
469 }),
470 &signer,
471 UnixSeconds::new(1),
472 )
473 .expect("register seed repo");
474}
475
476fn grant_subjects(resolved: Resolved<Vec<Grant>>) -> Vec<String> {
477 match resolved {
478 Resolved::Ready(grants) => grants
479 .into_iter()
480 .map(|grant| grant.subject.as_str().to_string())
481 .collect(),
482 Resolved::Warming => Vec::new(),
483 }
484}
485
486fn sorted(mut values: Vec<String>) -> Vec<String> {
487 values.sort();
488 values
489}