Now let's take a silly one
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 17 kB View raw
1use std::collections::{BTreeSet, HashMap, HashSet}; 2use std::sync::{Arc, Mutex}; 3use std::time::Duration; 4 5use axum::{Json, Router, routing::get}; 6use bytes::Bytes; 7use serde_json::json; 8use tempfile::TempDir; 9use url::Url; 10 11use knot_atproto::{Atproto, knot_did_document}; 12use knot_cob::{CobHome, CobStore}; 13use knot_cobs::{Grant, Registration, RegistryChange}; 14use knot_events::{EventLog, SubscriberGate}; 15use knot_git::{ 16 EntryKind, Identity, Layout, NewCommit, RefUpdate, Repo, StagedAction, StagedChange, 17}; 18use knot_index::{Index, Resolved}; 19use knot_runtime::{ 20 Clock, Entropy, FakeHttp, HttpRequest, HttpResponse, K256Signer, ManualClock, NetworkError, 21 SeededEntropy, Signer, UnixMicros, 22}; 23use knot_secrets::SealedStore; 24use knot_types::{ 25 AccountDid, AdmissionPolicy, KnotId, Oid, OwnerDid, RefName, RepoDid, RepoName, RepoRkey, 26 UnixSeconds, 27}; 28use knot_xrpc::{ 29 CobLocks, Committer, LimitConfig, PreAuthLimiter, ReadBudget, Reservations, XrpcState, 30}; 31 32use crate::trace::{RepoCollaborators, Snapshot}; 33 34const EMPTY_TREE: &str = "4b825dc642cb6eb9a060e54bf8d69288fbee4904"; 35const KNOT_HOST: &str = "knot.nel.pet"; 36const ADMIN_HOST: &str = "admin.nel.pet"; 37const STRANGER_SEED_BASE: u64 = 1_000; 38const PDS_ENDPOINT: &str = "https://pds.nel.pet"; 39const START_MICROS: u64 = 1_000_000_000; 40const NO_REFLOG_EXPIRY_FLOOR_SECS: i64 = i64::MAX / 4; 41 42pub(crate) const SUBJECT_DIDS: [&str; 8] = [ 43 "did:plc:limpet", 44 "did:plc:whelk", 45 "did:plc:mussel", 46 "did:plc:conch", 47 "did:plc:scallop", 48 "did:plc:cuttle", 49 "did:plc:periwinkle", 50 "did:plc:nautilus", 51]; 52 53pub(crate) type Responder = 54 Box<dyn Fn(&HttpRequest) -> Result<HttpResponse, NetworkError> + Send + Sync>; 55 56pub(crate) struct Actor { 57 pub host: String, 58 pub did: AccountDid, 59 pub signer: K256Signer, 60} 61 62#[derive(Default)] 63pub(crate) struct Faults { 64 dropped: Mutex<HashSet<String>>, 65} 66 67impl Faults { 68 fn is_dropped(&self, host: &str) -> bool { 69 self.dropped.lock().expect("faults lock").contains(host) 70 } 71 72 pub(crate) fn drop_host(&self, host: &str) { 73 self.dropped 74 .lock() 75 .expect("faults lock") 76 .insert(host.to_string()); 77 } 78 79 pub(crate) fn clear_host(&self, host: &str) { 80 self.dropped.lock().expect("faults lock").remove(host); 81 } 82} 83 84pub(crate) struct Harness { 85 _dir: TempDir, 86 pub clock: Arc<ManualClock>, 87 pub faults: Arc<Faults>, 88 pub layout: Layout, 89 pub index: Arc<Index>, 90 pub knot_aud: String, 91 router: Router, 92 pub admin: Actor, 93 pub strangers: Vec<Actor>, 94 pub subjects: Vec<AccountDid>, 95 pub seed_repo: RepoDid, 96 options: knot_maintenance::Options, 97} 98 99impl Harness { 100 pub(crate) fn build(seed: u64, stranger_pool: usize) -> Self { 101 let dir = tempfile::tempdir().expect("sim tempdir"); 102 let knot = KnotId::new(format!("did:web:{KNOT_HOST}")).expect("knot did"); 103 let layout = Layout::new(dir.path().join("repos")) 104 .reserving_meta(&knot) 105 .expect("reserve meta"); 106 layout.bootstrap_meta(&knot).expect("bootstrap meta"); 107 let meta_path = layout.meta_path(&knot).expect("meta path"); 108 109 let entropy = Arc::new(SeededEntropy::new(seed ^ 0x5eed_0050)); 110 let secrets = Arc::new( 111 SealedStore::open( 112 dir.path().join("keys.sealed"), 113 &[7u8; 32], 114 Box::new(SeededEntropy::new(seed ^ 0x5ec0)), 115 ) 116 .expect("sealed store"), 117 ); 118 let knot_pubkey = secrets.ensure(&knot).expect("seal knot key"); 119 let knot_service_url = format!("https://{KNOT_HOST}"); 120 121 let admin = actor(ADMIN_HOST, 1); 122 let strangers: Vec<Actor> = (0..stranger_pool) 123 .map(|index| { 124 let host = format!("stranger{index}.nel.pet"); 125 actor(&host, STRANGER_SEED_BASE + index as u64) 126 }) 127 .collect(); 128 let subjects: Vec<AccountDid> = SUBJECT_DIDS 129 .iter() 130 .map(|did| AccountDid::new(*did).expect("subject did")) 131 .collect(); 132 133 let seed_repo = RepoDid::new("did:plc:squid").expect("seed repo did"); 134 seed_on_disk(&layout, &seed_repo); 135 register_seed_repo(&meta_path, &knot, &secrets, &admin.did, &seed_repo); 136 137 let index = Arc::new(Index::new(meta_path.clone(), layout.clone())); 138 index.rebuild().expect("rebuild index"); 139 index.warm_collaborators(); 140 141 let clock = Arc::new(ManualClock::new(UnixMicros::new(START_MICROS))); 142 let pubkeys = pubkey_map(&admin, &strangers); 143 let faults = Arc::new(Faults::default()); 144 let responder = build_responder(Arc::clone(&faults), pubkeys); 145 let atproto = Arc::new(Atproto::new( 146 FakeHttp::new(responder), 147 Arc::clone(&clock), 148 knot.clone(), 149 Url::parse("https://plc.directory/").expect("plc url"), 150 )); 151 152 let state = Arc::new(XrpcState { 153 layout: layout.clone(), 154 index: Arc::clone(&index), 155 atproto, 156 secrets, 157 entropy: entropy as Arc<dyn Entropy>, 158 admins: BTreeSet::from([admin.did.clone()]), 159 admission: AdmissionPolicy::Closed, 160 knot_did: knot.clone(), 161 meta_path, 162 knot_service_url: knot_service_url.clone(), 163 limiter: Arc::new(PreAuthLimiter::with_config(LimitConfig { 164 burst: 1_000_000, 165 refill_micros: 1, 166 per_peer_inflight: 4096, 167 global_inflight: 4096, 168 })), 169 cob_locks: Arc::new(CobLocks::default()), 170 reservations: Arc::new(Reservations::new(1_000_000, 256, 256)), 171 trusted_proxy_header: None, 172 committer: Committer { 173 name: "knot".to_string(), 174 email: "knot@nel.pet".to_string(), 175 }, 176 max_body_bytes: 256 * 1024, 177 max_patch_bytes: 16 * 1024 * 1024, 178 max_patch_decompressed_bytes: 128 * 1024 * 1024, 179 max_response_bytes: 8 * 1024 * 1024, 180 max_archive_bytes: 1024 * 1024 * 1024, 181 tree_last_commit_budget: ReadBudget::Unbounded, 182 blob_last_commit_budget: ReadBudget::Unbounded, 183 languages_budget: ReadBudget::Unbounded, 184 languages_push_budget: Duration::from_secs(120), 185 git_http: Arc::new(FakeHttp::new(|_request: &HttpRequest| { 186 Err(NetworkError::Connect( 187 "simulation serves no git upstream".to_string(), 188 )) 189 })), 190 fork_max_pack_bytes: 1024 * 1024 * 1024, 191 service_owner: admin.did.clone(), 192 events: Arc::new(EventLog::new(Arc::clone(&clock), 4096)), 193 subscriber_gate: Arc::new(SubscriberGate::new(256, 64)), 194 }); 195 196 let resolver: Arc<dyn knot_pack::RepoResolver> = { 197 let index = Arc::clone(&index); 198 Arc::new(move |target: &knot_pack::RepoTarget| match target { 199 knot_pack::RepoTarget::Did(did) => match index.owner_of(did) { 200 Resolved::Ready(Some(_)) => knot_pack::RepoLookup::Hosted(did.clone()), 201 Resolved::Ready(None) => knot_pack::RepoLookup::Unhosted, 202 Resolved::Warming => knot_pack::RepoLookup::Unavailable, 203 }, 204 knot_pack::RepoTarget::OwnerRkey(owner, rkey) => { 205 match index.resolve_repo(owner, rkey) { 206 Resolved::Ready(Some(found)) => knot_pack::RepoLookup::Hosted(found), 207 Resolved::Ready(None) => knot_pack::RepoLookup::Unhosted, 208 Resolved::Warming => knot_pack::RepoLookup::Unavailable, 209 } 210 } 211 }) 212 }; 213 let did_document = knot_did_document(&knot, &knot_pubkey, &knot_service_url); 214 let knot_aud = knot.as_str().to_string(); 215 let router = knot_pack::router(layout.clone(), resolver) 216 .merge(knot_xrpc::router(Arc::clone(&state))) 217 .route( 218 "/.well-known/did.json", 219 get(move || { 220 let document = did_document.clone(); 221 async move { Json(document) } 222 }), 223 ); 224 let options = knot_maintenance::Options { 225 repack_max_objects: 1_000_000, 226 prune_grace_secs: 0, 227 reflog_floor_secs: NO_REFLOG_EXPIRY_FLOOR_SECS, 228 commit_graph: true, 229 }; 230 231 Self { 232 _dir: dir, 233 clock, 234 faults, 235 layout, 236 index, 237 knot_aud, 238 router, 239 admin, 240 strangers, 241 subjects, 242 seed_repo, 243 options, 244 } 245 } 246 247 pub(crate) fn router(&self) -> Router { 248 self.router.clone() 249 } 250 251 pub(crate) fn now_seconds(&self) -> i64 { 252 (self.clock.now_unix_micros().get() / 1_000_000) as i64 253 } 254 255 pub(crate) fn advance(&self, micros: u64) { 256 self.clock.advance(micros); 257 } 258 259 pub(crate) fn maintain(&self, repo: &RepoDid) -> Result<(), String> { 260 let now = self.now_seconds(); 261 self.layout 262 .open(repo) 263 .map_err(knot_maintenance::MaintError::from) 264 .and_then(|opened| knot_maintenance::run_repo(&opened, now, &self.options)) 265 .map(|_| ()) 266 .map_err(|error| error.to_string()) 267 } 268 269 pub(crate) fn populate(&self, repo: &RepoDid) { 270 let opened = self.layout.open(repo).expect("open created repo"); 271 write_history(&opened, repo.as_str()); 272 } 273 274 pub(crate) fn snapshot(&self, round: u32, repos: &[RepoDid]) -> Snapshot { 275 let collaborators = repos 276 .iter() 277 .map(|repo| { 278 let _ = self.index.ensure_collaborators(repo); 279 RepoCollaborators { 280 repo: repo.as_str().to_string(), 281 subjects: sorted(grant_subjects(self.index.collaborator_entries(repo))), 282 } 283 }) 284 .collect(); 285 let repo_list = { 286 let mut repos: Vec<String> = self 287 .index 288 .hosted_repos() 289 .iter() 290 .map(|repo| repo.as_str().to_string()) 291 .collect(); 292 repos.sort(); 293 repos 294 }; 295 Snapshot { 296 round, 297 clock_micros: self.clock.now_unix_micros().get(), 298 members: sorted(grant_subjects(self.index.member_entries())), 299 blocked: sorted(grant_subjects(self.index.blocked_entries())), 300 repos: repo_list, 301 collaborators, 302 } 303 } 304} 305 306fn actor(host: &str, seed: u64) -> Actor { 307 Actor { 308 host: host.to_string(), 309 did: AccountDid::new(format!("did:web:{host}")).expect("actor did"), 310 signer: K256Signer::generate(&SeededEntropy::new(seed)), 311 } 312} 313 314fn pubkey_map(admin: &Actor, strangers: &[Actor]) -> HashMap<String, Vec<u8>> { 315 std::iter::once(( 316 admin.host.clone(), 317 admin.signer.public_key().as_bytes().to_vec(), 318 )) 319 .chain(strangers.iter().map(|actor| { 320 ( 321 actor.host.clone(), 322 actor.signer.public_key().as_bytes().to_vec(), 323 ) 324 })) 325 .collect() 326} 327 328fn build_responder(faults: Arc<Faults>, pubkeys: HashMap<String, Vec<u8>>) -> Responder { 329 let plc_signer = K256Signer::generate(&SeededEntropy::new(7)); 330 Box::new(move |request: &HttpRequest| { 331 if request.method == http::Method::POST { 332 return Ok(ok_body(Bytes::new())); 333 } 334 let host = request.url.host_str().unwrap_or_default().to_string(); 335 if faults.is_dropped(&host) { 336 return Err(NetworkError::Timeout( 337 "identity resolution dropped by simulation".to_string(), 338 )); 339 } 340 if host == "plc.directory" { 341 let did = request.url.path().trim_start_matches('/').to_string(); 342 return Ok(ok_body(did_doc(&did, plc_signer.public_key().as_bytes()))); 343 } 344 match pubkeys.get(&host) { 345 Some(sec1) => Ok(ok_body(did_doc(&format!("did:web:{host}"), sec1))), 346 None => Ok(HttpResponse { 347 status: http::StatusCode::NOT_FOUND, 348 headers: http::HeaderMap::new(), 349 body: Bytes::new(), 350 }), 351 } 352 }) 353} 354 355fn did_doc(did: &str, sec1: &[u8]) -> Bytes { 356 let multikey = knot_types::crypto::multikey(0xe7, sec1); 357 Bytes::from( 358 serde_json::to_vec(&json!({ 359 "id": did, 360 "alsoKnownAs": [], 361 "verificationMethod": [{ 362 "id": format!("{did}#atproto"), 363 "type": "Multikey", 364 "controller": did, 365 "publicKeyMultibase": multikey 366 }], 367 "service": [{ 368 "id": "#atproto_pds", 369 "type": "AtprotoPersonalDataServer", 370 "serviceEndpoint": PDS_ENDPOINT 371 }] 372 })) 373 .expect("did doc serializes"), 374 ) 375} 376 377fn ok_body(body: Bytes) -> HttpResponse { 378 HttpResponse { 379 status: http::StatusCode::OK, 380 headers: http::HeaderMap::new(), 381 body, 382 } 383} 384 385fn seed_on_disk(layout: &Layout, did: &RepoDid) { 386 let repo = layout.create(did).expect("create seed repo"); 387 write_history(&repo, "reef"); 388} 389 390fn write_history(repo: &Repo, marker: &str) { 391 let identity = Identity { 392 name: "nel".to_string(), 393 email: "nel@oyster.cafe".to_string(), 394 time: UnixSeconds::new(1_700_000_000), 395 offset_seconds: 0, 396 }; 397 let main = RefName::new("refs/heads/main").expect("main ref"); 398 let first_tree = repo 399 .write_staged_tree( 400 Oid::from_hex(EMPTY_TREE).expect("empty tree"), 401 &[StagedChange { 402 path: "README.md".to_string(), 403 action: StagedAction::Put { 404 content: format!("# {marker}\n").into_bytes(), 405 kind: EntryKind::Blob, 406 }, 407 }], 408 ) 409 .expect("first tree"); 410 let root = repo 411 .write_commit(&NewCommit { 412 tree: first_tree, 413 parents: Vec::new(), 414 author: identity.clone(), 415 committer: identity.clone(), 416 message: "root".to_string(), 417 extra_headers: Vec::new(), 418 }) 419 .expect("root commit"); 420 let second_tree = repo 421 .write_staged_tree( 422 first_tree, 423 &[StagedChange { 424 path: "src/main.rs".to_string(), 425 action: StagedAction::Put { 426 content: b"fn main() {}\n".to_vec(), 427 kind: EntryKind::Blob, 428 }, 429 }], 430 ) 431 .expect("second tree"); 432 let tip = repo 433 .write_commit(&NewCommit { 434 tree: second_tree, 435 parents: vec![root], 436 author: identity.clone(), 437 committer: identity, 438 message: "add main".to_string(), 439 extra_headers: Vec::new(), 440 }) 441 .expect("second commit"); 442 repo.update_ref(&RefUpdate::Create { 443 name: main.clone(), 444 new: tip, 445 }) 446 .expect("create main"); 447 repo.set_head(&main).expect("set head"); 448} 449 450fn register_seed_repo( 451 meta_path: &std::path::Path, 452 knot: &KnotId, 453 secrets: &SealedStore, 454 owner: &AccountDid, 455 repo: &RepoDid, 456) { 457 let meta = Repo::open(meta_path).expect("open meta"); 458 let store = CobStore::new(&meta); 459 let signer = secrets.signer(knot).expect("knot signer"); 460 store 461 .create( 462 &CobHome::from(knot), 463 &RegistryChange::Register(Registration { 464 owner: OwnerDid::new(owner.as_str()).expect("owner did"), 465 rkey: RepoRkey::new("anemone").expect("rkey"), 466 name: RepoName::new("anemone").expect("name"), 467 repo: repo.clone(), 468 created_at: UnixSeconds::new(1), 469 }), 470 &signer, 471 UnixSeconds::new(1), 472 ) 473 .expect("register seed repo"); 474} 475 476fn grant_subjects(resolved: Resolved<Vec<Grant>>) -> Vec<String> { 477 match resolved { 478 Resolved::Ready(grants) => grants 479 .into_iter() 480 .map(|grant| grant.subject.as_str().to_string()) 481 .collect(), 482 Resolved::Warming => Vec::new(), 483 } 484} 485 486fn sorted(mut values: Vec<String>) -> Vec<String> { 487 values.sort(); 488 values 489}