Now let's take a silly one
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 16 kB View raw
1use std::path::Path; 2use std::time::Instant; 3 4use knot_cob::{CobHome, CobStore}; 5use knot_cobs::{Grant, MembersChange}; 6use knot_git::{Layout, Repo}; 7use knot_pack::{ 8 PackLimits, ReceiveCommand, ReceiveFramer, ReceiveGuard, RefDecision, receive_pack_guarded, 9 receive_request_complete, sweep_incoming, 10}; 11use knot_runtime::{K256Signer, SeededEntropy, Signer}; 12use knot_types::{AccountDid, ActorId, ObjectFormat, Oid, RepoDid, UnixSeconds}; 13 14const SHA1: ObjectFormat = ObjectFormat::SHA1; 15 16mod common; 17use common::{commit, must, pack_objects, receive_request}; 18 19fn limits() -> PackLimits { 20 PackLimits::default() 21} 22 23fn seeded_pack(layout: &Layout, did: &RepoDid) -> (Repo, String, Vec<u8>) { 24 let bare = layout.create(did).unwrap(); 25 let work_dir = tempfile::tempdir().unwrap(); 26 let work = work_dir.path(); 27 must(work, &["init", "-q", "-b", "main"]); 28 commit(work, "a.txt", "x\n", "c1"); 29 let c1 = must(work, &["rev-parse", "HEAD"]); 30 let oids: Vec<String> = must(work, &["rev-list", "--objects", &c1]) 31 .lines() 32 .map(|line| line.split_whitespace().next().unwrap().to_string()) 33 .collect(); 34 let pack = pack_objects(work, &oids); 35 (bare, c1, pack) 36} 37 38fn report_text(report: &[u8]) -> String { 39 String::from_utf8_lossy(report).replace('\0', "") 40} 41 42fn dir_count(path: &Path) -> usize { 43 std::fs::read_dir(path) 44 .map(|entries| entries.filter_map(Result::ok).count()) 45 .unwrap_or(0) 46} 47 48fn live_object_count(repo: &Repo) -> usize { 49 let objects = repo.objects_dir(); 50 let loose: usize = std::fs::read_dir(&objects) 51 .map(|entries| { 52 entries 53 .filter_map(Result::ok) 54 .filter(|entry| { 55 entry 56 .file_name() 57 .to_str() 58 .is_some_and(|name| name.len() == 2) 59 && entry.path().is_dir() 60 }) 61 .map(|shard| dir_count(&shard.path())) 62 .sum() 63 }) 64 .unwrap_or(0); 65 let packs = std::fs::read_dir(objects.join("pack")) 66 .map(|entries| { 67 entries 68 .filter_map(Result::ok) 69 .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "pack")) 70 .count() 71 }) 72 .unwrap_or(0); 73 loose + packs 74} 75 76struct AllowPublic; 77 78impl ReceiveGuard for AllowPublic { 79 fn authorize(&self, _staged: &Repo, commands: &[ReceiveCommand]) -> Vec<RefDecision> { 80 commands 81 .iter() 82 .map(|command| { 83 if knot_git::is_public_ref(command.refname()) { 84 RefDecision::Allow 85 } else { 86 RefDecision::Reject("not public ref".to_string()) 87 } 88 }) 89 .collect() 90 } 91} 92 93struct DenyAll; 94 95impl ReceiveGuard for DenyAll { 96 fn authorize(&self, _staged: &Repo, commands: &[ReceiveCommand]) -> Vec<RefDecision> { 97 commands 98 .iter() 99 .map(|_| RefDecision::Reject("unauthorized".to_string())) 100 .collect() 101 } 102} 103 104struct AllowAll; 105 106impl ReceiveGuard for AllowAll { 107 fn authorize(&self, _staged: &Repo, commands: &[ReceiveCommand]) -> Vec<RefDecision> { 108 commands.iter().map(|_| RefDecision::Allow).collect() 109 } 110} 111 112struct CobVerify { 113 owner: ActorId, 114 home: CobHome, 115} 116 117impl ReceiveGuard for CobVerify { 118 fn authorize(&self, staged: &Repo, commands: &[ReceiveCommand]) -> Vec<RefDecision> { 119 commands 120 .iter() 121 .map(|command| { 122 let store = CobStore::new(staged); 123 match knot_cobs::verify_cob_ref(&store, &self.home, command.refname(), &self.owner) 124 { 125 Ok(_) => RefDecision::Allow, 126 Err(error) => RefDecision::Reject(error.to_string()), 127 } 128 }) 129 .collect() 130 } 131} 132 133#[test] 134fn an_authorized_push_migrates_objects_and_updates_the_ref() { 135 let scan = tempfile::tempdir().unwrap(); 136 let layout = Layout::new(scan.path()); 137 let did = RepoDid::new("did:plc:squid").unwrap(); 138 let (bare, c1, pack) = seeded_pack(&layout, &did); 139 140 let request = receive_request("refs/heads/main", &Oid::null().to_hex(), &c1, &pack); 141 let report = receive_pack_guarded(&bare, &request, &limits(), &AllowPublic, &|_| {}) 142 .unwrap() 143 .report; 144 let report = report_text(&report); 145 assert!(report.contains("unpack ok"), "{report}"); 146 assert!(report.contains("ok refs/heads/main"), "{report}"); 147 148 let refs = bare.references().unwrap(); 149 assert_eq!(refs.len(), 1); 150 assert_eq!(refs[0].name.as_str(), "refs/heads/main"); 151 assert_eq!(refs[0].target, Oid::from_hex(&c1).unwrap()); 152 assert!(bare.contains(Oid::from_hex(&c1).unwrap())); 153} 154 155#[test] 156fn a_denied_push_leaves_no_objects_in_the_live_odb() { 157 let scan = tempfile::tempdir().unwrap(); 158 let layout = Layout::new(scan.path()); 159 let did = RepoDid::new("did:plc:squid").unwrap(); 160 let (bare, c1, pack) = seeded_pack(&layout, &did); 161 162 assert_eq!( 163 live_object_count(&bare), 164 0, 165 "fresh bare repo has no objects" 166 ); 167 let request = receive_request("refs/heads/main", &Oid::null().to_hex(), &c1, &pack); 168 let report = receive_pack_guarded(&bare, &request, &limits(), &DenyAll, &|_| {}) 169 .unwrap() 170 .report; 171 let report = report_text(&report); 172 assert!( 173 report.contains("ng refs/heads/main unauthorized"), 174 "{report}" 175 ); 176 177 assert!( 178 bare.references().unwrap().is_empty(), 179 "denied push must not create the ref" 180 ); 181 assert_eq!( 182 live_object_count(&bare), 183 0, 184 "denied push must leave no objects in the live odb" 185 ); 186 assert!(!bare.contains(Oid::from_hex(&c1).unwrap())); 187} 188 189#[test] 190fn a_stale_non_fast_forward_push_is_rejected_and_leaves_no_new_objects() { 191 let scan = tempfile::tempdir().unwrap(); 192 let layout = Layout::new(scan.path()); 193 let did = RepoDid::new("did:plc:squid").unwrap(); 194 let (bare, c1, pack) = seeded_pack(&layout, &did); 195 196 receive_pack_guarded( 197 &bare, 198 &receive_request("refs/heads/main", &Oid::null().to_hex(), &c1, &pack), 199 &limits(), 200 &AllowPublic, 201 &|_| {}, 202 ) 203 .unwrap(); 204 let after_first = live_object_count(&bare); 205 206 let wrong_old = "1".repeat(40); 207 let fresh = "2".repeat(40); 208 let report = receive_pack_guarded( 209 &bare, 210 &receive_request("refs/heads/main", &wrong_old, &fresh, b""), 211 &limits(), 212 &AllowPublic, 213 &|_| {}, 214 ) 215 .unwrap() 216 .report; 217 let report = report_text(&report); 218 assert!(report.contains("ng refs/heads/main"), "{report}"); 219 assert_eq!( 220 bare.find_ref(&knot_types::RefName::new("refs/heads/main").unwrap()) 221 .unwrap(), 222 Some(Oid::from_hex(&c1).unwrap()), 223 "stale push must not move the ref" 224 ); 225 assert_eq!( 226 live_object_count(&bare), 227 after_first, 228 "stale push must not add objects to the live odb" 229 ); 230} 231 232#[test] 233fn a_cob_ref_verifies_against_the_owner_key_and_is_refused_for_a_stranger() { 234 let scan = tempfile::tempdir().unwrap(); 235 let layout = Layout::new(scan.path()); 236 let signer = K256Signer::generate(&SeededEntropy::new(7)); 237 238 let source_did = RepoDid::new("did:plc:source").unwrap(); 239 let home = CobHome::from(&source_did); 240 let source = layout.create(&source_did).unwrap(); 241 let store = CobStore::new(&source); 242 let grant = Grant { 243 subject: AccountDid::new("did:plc:nel").unwrap(), 244 added_by: AccountDid::new("did:plc:nel").unwrap(), 245 created_at: UnixSeconds::new(1), 246 }; 247 let created = store 248 .create( 249 &home, 250 &MembersChange::Add(grant), 251 &signer, 252 UnixSeconds::new(1), 253 ) 254 .unwrap(); 255 let tip = created.tip.oid().to_hex(); 256 let refname = format!( 257 "refs/cobs/sh.tangled.knot.member/{}", 258 created.object.oid().to_hex() 259 ); 260 261 let reachable: Vec<String> = must(source.path(), &["rev-list", "--objects", &tip]) 262 .lines() 263 .map(|line| line.split_whitespace().next().unwrap().to_string()) 264 .collect(); 265 let pack = pack_objects(source.path(), &reachable); 266 let owner = ActorId::from_secp256k1(signer.public_key().as_bytes()); 267 268 let dest = layout 269 .create(&RepoDid::new("did:plc:dest").unwrap()) 270 .unwrap(); 271 let report = receive_pack_guarded( 272 &dest, 273 &receive_request(&refname, &Oid::null().to_hex(), &tip, &pack), 274 &limits(), 275 &CobVerify { 276 owner: owner.clone(), 277 home: home.clone(), 278 }, 279 &|_| {}, 280 ) 281 .unwrap() 282 .report; 283 assert!( 284 report_text(&report).contains(&format!("ok {refname}")), 285 "owner-signed COB ref must be accepted at the receive boundary: {}", 286 report_text(&report) 287 ); 288 289 let stranger_key = K256Signer::generate(&SeededEntropy::new(9)); 290 let stranger = ActorId::from_secp256k1(stranger_key.public_key().as_bytes()); 291 let dest2 = layout 292 .create(&RepoDid::new("did:plc:dest2").unwrap()) 293 .unwrap(); 294 let report = receive_pack_guarded( 295 &dest2, 296 &receive_request(&refname, &Oid::null().to_hex(), &tip, &pack), 297 &limits(), 298 &CobVerify { 299 owner: stranger, 300 home: home.clone(), 301 }, 302 &|_| {}, 303 ) 304 .unwrap() 305 .report; 306 assert!( 307 report_text(&report).contains(&format!("ng {refname}")), 308 "COB ref not signed by the resolved owner key must be refused: {}", 309 report_text(&report) 310 ); 311 assert_eq!( 312 live_object_count(&dest2), 313 0, 314 "refused COB ref push leaves no objects behind" 315 ); 316} 317 318#[test] 319fn a_reserved_ref_update_is_refused_even_when_the_guard_allows_it() { 320 let scan = tempfile::tempdir().unwrap(); 321 let layout = Layout::new(scan.path()); 322 let did = RepoDid::new("did:plc:squid").unwrap(); 323 let bare = layout.create(&did).unwrap(); 324 325 let cob_ref = format!("refs/cobs/sh.tangled.knot.member/{}", "a".repeat(40)); 326 let old = "1".repeat(40); 327 let new = "2".repeat(40); 328 let report = receive_pack_guarded( 329 &bare, 330 &receive_request(&cob_ref, &old, &new, b""), 331 &limits(), 332 &AllowAll, 333 &|_| {}, 334 ) 335 .unwrap() 336 .report; 337 let report = report_text(&report); 338 339 assert!( 340 report.contains(&format!("ng {cob_ref}")), 341 "non-create update to a reserved ref must be refused even under an allow-all guard: {report}" 342 ); 343 assert!( 344 report.contains("cannot be modified"), 345 "refusal must name the create-only rule, not connectivity or compare-and-swap: {report}" 346 ); 347 assert!( 348 bare.references().unwrap().is_empty(), 349 "refused reserved-ref update must land nothing" 350 ); 351 assert_eq!( 352 live_object_count(&bare), 353 0, 354 "refused reserved-ref update must migrate no objects" 355 ); 356} 357 358fn pseudo_random(seed: u64, len: usize) -> Vec<u8> { 359 let mut state = seed.wrapping_mul(0x9E37_79B9_7F4A_7C15).wrapping_add(1); 360 (0..len) 361 .map(|_| { 362 state ^= state << 13; 363 state ^= state >> 7; 364 state ^= state << 17; 365 (state >> 24) as u8 366 }) 367 .collect() 368} 369 370fn incompressible_pack(megabytes: usize) -> Vec<u8> { 371 let work_dir = tempfile::tempdir().unwrap(); 372 let work = work_dir.path(); 373 must(work, &["init", "-q", "-b", "main"]); 374 (0..megabytes).for_each(|i| { 375 let blob = pseudo_random(i as u64 + 1, 1024 * 1024); 376 std::fs::write(work.join(format!("blob-{i:04}.bin")), blob).unwrap(); 377 }); 378 must(work, &["add", "-A"]); 379 must(work, &["commit", "-q", "-m", "bulk"]); 380 let head = must(work, &["rev-parse", "HEAD"]); 381 let oids: Vec<String> = must(work, &["rev-list", "--objects", &head]) 382 .lines() 383 .map(|line| line.split_whitespace().next().unwrap().to_string()) 384 .collect(); 385 pack_objects(work, &oids) 386} 387 388#[test] 389fn the_receive_framer_scans_incrementally_in_linear_time() { 390 const READ_CHUNK: usize = 64 * 1024; 391 let pack = incompressible_pack(24); 392 let body = receive_request( 393 "refs/heads/main", 394 &Oid::null().to_hex(), 395 &"1".repeat(40), 396 &pack, 397 ); 398 assert!( 399 pack.len() > 8 * 1024 * 1024, 400 "pack must be large enough to expose any quadratic scaling: {} bytes", 401 pack.len() 402 ); 403 404 let single_start = Instant::now(); 405 let complete = ReceiveFramer::new(limits(), SHA1.kind()) 406 .advance(&body) 407 .unwrap(); 408 let single = single_start.elapsed(); 409 assert_eq!( 410 complete, 411 Some(body.len()), 412 "one advance over the full body frames whole request" 413 ); 414 415 let chunks = body.len().div_ceil(READ_CHUNK); 416 let chunked_start = Instant::now(); 417 let mut framer = ReceiveFramer::new(limits(), SHA1.kind()); 418 let mut framed_at = None; 419 (1..=chunks).for_each(|n| { 420 let end = (n * READ_CHUNK).min(body.len()); 421 if framed_at.is_none() 422 && let Some(total) = framer.advance(&body[..end]).unwrap() 423 { 424 framed_at = Some(total); 425 } 426 }); 427 let chunked = chunked_start.elapsed(); 428 assert_eq!( 429 framed_at, 430 Some(body.len()), 431 "feeding the same body in 64KB chunks frames it at identical length" 432 ); 433 434 let ratio = chunked.as_secs_f64() / single.as_secs_f64().max(1e-6); 435 assert!( 436 ratio < 5.0, 437 "resumable framer never re-inflates settled objects, so the {chunks}-chunk feed \ 438 must stay within a small constant of one pass; got {ratio:.2}x" 439 ); 440} 441 442#[test] 443fn receive_request_completes_only_when_the_whole_pack_has_arrived() { 444 let scan = tempfile::tempdir().unwrap(); 445 let layout = Layout::new(scan.path()); 446 let did = RepoDid::new("did:plc:squid").unwrap(); 447 let (_bare, c1, pack) = seeded_pack(&layout, &did); 448 let request = receive_request("refs/heads/main", &Oid::null().to_hex(), &c1, &pack); 449 450 assert_eq!( 451 receive_request_complete(&request[..request.len() / 2], &limits(), SHA1.kind()).unwrap(), 452 None, 453 "half-delivered request is not yet complete" 454 ); 455 assert_eq!( 456 receive_request_complete(&request, &limits(), SHA1.kind()).unwrap(), 457 Some(request.len()), 458 "whole request reports its exact length" 459 ); 460 let mut trailing = request.clone(); 461 trailing.extend_from_slice(b"junk-after-the-pack"); 462 assert_eq!( 463 receive_request_complete(&trailing, &limits(), SHA1.kind()).unwrap(), 464 Some(request.len()), 465 "framer stops at the pack trailer, ignoring trailing bytes" 466 ); 467} 468 469#[test] 470fn the_scanner_refuses_an_object_over_the_per_object_cap() { 471 let scan = tempfile::tempdir().unwrap(); 472 let layout = Layout::new(scan.path()); 473 let did = RepoDid::new("did:plc:squid").unwrap(); 474 let (_bare, c1, pack) = seeded_pack(&layout, &did); 475 let request = receive_request("refs/heads/main", &Oid::null().to_hex(), &c1, &pack); 476 let tight = PackLimits { 477 max_object_bytes: 1, 478 ..PackLimits::default() 479 }; 480 assert!( 481 receive_request_complete(&request, &tight, SHA1.kind()).is_err(), 482 "object beyond the per-object cap is refused by the framer" 483 ); 484} 485 486#[test] 487fn sweep_incoming_removes_abandoned_staging_directories() { 488 let scan = tempfile::tempdir().unwrap(); 489 let layout = Layout::new(scan.path()); 490 let did = RepoDid::new("did:plc:squid").unwrap(); 491 let bare = layout.create(&did).unwrap(); 492 493 let staging = bare.path().join(".knot2-incoming.4242.0"); 494 std::fs::create_dir_all(staging.join("objects")).unwrap(); 495 std::fs::write(staging.join("objects").join("leftover"), b"x").unwrap(); 496 assert!(staging.exists()); 497 498 let swept = sweep_incoming(scan.path()); 499 assert_eq!(swept, 1, "exactly one staging directory is swept"); 500 assert!(!staging.exists(), "abandoned staging directory is gone"); 501 assert!( 502 bare.path().join("objects").exists(), 503 "live repository is untouched" 504 ); 505}