Now let's take a silly one
1use std::path::Path;
2use std::time::Instant;
3
4use knot_cob::{CobHome, CobStore};
5use knot_cobs::{Grant, MembersChange};
6use knot_git::{Layout, Repo};
7use knot_pack::{
8 PackLimits, ReceiveCommand, ReceiveFramer, ReceiveGuard, RefDecision, receive_pack_guarded,
9 receive_request_complete, sweep_incoming,
10};
11use knot_runtime::{K256Signer, SeededEntropy, Signer};
12use knot_types::{AccountDid, ActorId, ObjectFormat, Oid, RepoDid, UnixSeconds};
13
14const SHA1: ObjectFormat = ObjectFormat::SHA1;
15
16mod common;
17use common::{commit, must, pack_objects, receive_request};
18
19fn limits() -> PackLimits {
20 PackLimits::default()
21}
22
23fn seeded_pack(layout: &Layout, did: &RepoDid) -> (Repo, String, Vec<u8>) {
24 let bare = layout.create(did).unwrap();
25 let work_dir = tempfile::tempdir().unwrap();
26 let work = work_dir.path();
27 must(work, &["init", "-q", "-b", "main"]);
28 commit(work, "a.txt", "x\n", "c1");
29 let c1 = must(work, &["rev-parse", "HEAD"]);
30 let oids: Vec<String> = must(work, &["rev-list", "--objects", &c1])
31 .lines()
32 .map(|line| line.split_whitespace().next().unwrap().to_string())
33 .collect();
34 let pack = pack_objects(work, &oids);
35 (bare, c1, pack)
36}
37
38fn report_text(report: &[u8]) -> String {
39 String::from_utf8_lossy(report).replace('\0', "")
40}
41
42fn dir_count(path: &Path) -> usize {
43 std::fs::read_dir(path)
44 .map(|entries| entries.filter_map(Result::ok).count())
45 .unwrap_or(0)
46}
47
48fn live_object_count(repo: &Repo) -> usize {
49 let objects = repo.objects_dir();
50 let loose: usize = std::fs::read_dir(&objects)
51 .map(|entries| {
52 entries
53 .filter_map(Result::ok)
54 .filter(|entry| {
55 entry
56 .file_name()
57 .to_str()
58 .is_some_and(|name| name.len() == 2)
59 && entry.path().is_dir()
60 })
61 .map(|shard| dir_count(&shard.path()))
62 .sum()
63 })
64 .unwrap_or(0);
65 let packs = std::fs::read_dir(objects.join("pack"))
66 .map(|entries| {
67 entries
68 .filter_map(Result::ok)
69 .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "pack"))
70 .count()
71 })
72 .unwrap_or(0);
73 loose + packs
74}
75
76struct AllowPublic;
77
78impl ReceiveGuard for AllowPublic {
79 fn authorize(&self, _staged: &Repo, commands: &[ReceiveCommand]) -> Vec<RefDecision> {
80 commands
81 .iter()
82 .map(|command| {
83 if knot_git::is_public_ref(command.refname()) {
84 RefDecision::Allow
85 } else {
86 RefDecision::Reject("not public ref".to_string())
87 }
88 })
89 .collect()
90 }
91}
92
93struct DenyAll;
94
95impl ReceiveGuard for DenyAll {
96 fn authorize(&self, _staged: &Repo, commands: &[ReceiveCommand]) -> Vec<RefDecision> {
97 commands
98 .iter()
99 .map(|_| RefDecision::Reject("unauthorized".to_string()))
100 .collect()
101 }
102}
103
104struct AllowAll;
105
106impl ReceiveGuard for AllowAll {
107 fn authorize(&self, _staged: &Repo, commands: &[ReceiveCommand]) -> Vec<RefDecision> {
108 commands.iter().map(|_| RefDecision::Allow).collect()
109 }
110}
111
112struct CobVerify {
113 owner: ActorId,
114 home: CobHome,
115}
116
117impl ReceiveGuard for CobVerify {
118 fn authorize(&self, staged: &Repo, commands: &[ReceiveCommand]) -> Vec<RefDecision> {
119 commands
120 .iter()
121 .map(|command| {
122 let store = CobStore::new(staged);
123 match knot_cobs::verify_cob_ref(&store, &self.home, command.refname(), &self.owner)
124 {
125 Ok(_) => RefDecision::Allow,
126 Err(error) => RefDecision::Reject(error.to_string()),
127 }
128 })
129 .collect()
130 }
131}
132
133#[test]
134fn an_authorized_push_migrates_objects_and_updates_the_ref() {
135 let scan = tempfile::tempdir().unwrap();
136 let layout = Layout::new(scan.path());
137 let did = RepoDid::new("did:plc:squid").unwrap();
138 let (bare, c1, pack) = seeded_pack(&layout, &did);
139
140 let request = receive_request("refs/heads/main", &Oid::null().to_hex(), &c1, &pack);
141 let report = receive_pack_guarded(&bare, &request, &limits(), &AllowPublic, &|_| {})
142 .unwrap()
143 .report;
144 let report = report_text(&report);
145 assert!(report.contains("unpack ok"), "{report}");
146 assert!(report.contains("ok refs/heads/main"), "{report}");
147
148 let refs = bare.references().unwrap();
149 assert_eq!(refs.len(), 1);
150 assert_eq!(refs[0].name.as_str(), "refs/heads/main");
151 assert_eq!(refs[0].target, Oid::from_hex(&c1).unwrap());
152 assert!(bare.contains(Oid::from_hex(&c1).unwrap()));
153}
154
155#[test]
156fn a_denied_push_leaves_no_objects_in_the_live_odb() {
157 let scan = tempfile::tempdir().unwrap();
158 let layout = Layout::new(scan.path());
159 let did = RepoDid::new("did:plc:squid").unwrap();
160 let (bare, c1, pack) = seeded_pack(&layout, &did);
161
162 assert_eq!(
163 live_object_count(&bare),
164 0,
165 "fresh bare repo has no objects"
166 );
167 let request = receive_request("refs/heads/main", &Oid::null().to_hex(), &c1, &pack);
168 let report = receive_pack_guarded(&bare, &request, &limits(), &DenyAll, &|_| {})
169 .unwrap()
170 .report;
171 let report = report_text(&report);
172 assert!(
173 report.contains("ng refs/heads/main unauthorized"),
174 "{report}"
175 );
176
177 assert!(
178 bare.references().unwrap().is_empty(),
179 "denied push must not create the ref"
180 );
181 assert_eq!(
182 live_object_count(&bare),
183 0,
184 "denied push must leave no objects in the live odb"
185 );
186 assert!(!bare.contains(Oid::from_hex(&c1).unwrap()));
187}
188
189#[test]
190fn a_stale_non_fast_forward_push_is_rejected_and_leaves_no_new_objects() {
191 let scan = tempfile::tempdir().unwrap();
192 let layout = Layout::new(scan.path());
193 let did = RepoDid::new("did:plc:squid").unwrap();
194 let (bare, c1, pack) = seeded_pack(&layout, &did);
195
196 receive_pack_guarded(
197 &bare,
198 &receive_request("refs/heads/main", &Oid::null().to_hex(), &c1, &pack),
199 &limits(),
200 &AllowPublic,
201 &|_| {},
202 )
203 .unwrap();
204 let after_first = live_object_count(&bare);
205
206 let wrong_old = "1".repeat(40);
207 let fresh = "2".repeat(40);
208 let report = receive_pack_guarded(
209 &bare,
210 &receive_request("refs/heads/main", &wrong_old, &fresh, b""),
211 &limits(),
212 &AllowPublic,
213 &|_| {},
214 )
215 .unwrap()
216 .report;
217 let report = report_text(&report);
218 assert!(report.contains("ng refs/heads/main"), "{report}");
219 assert_eq!(
220 bare.find_ref(&knot_types::RefName::new("refs/heads/main").unwrap())
221 .unwrap(),
222 Some(Oid::from_hex(&c1).unwrap()),
223 "stale push must not move the ref"
224 );
225 assert_eq!(
226 live_object_count(&bare),
227 after_first,
228 "stale push must not add objects to the live odb"
229 );
230}
231
232#[test]
233fn a_cob_ref_verifies_against_the_owner_key_and_is_refused_for_a_stranger() {
234 let scan = tempfile::tempdir().unwrap();
235 let layout = Layout::new(scan.path());
236 let signer = K256Signer::generate(&SeededEntropy::new(7));
237
238 let source_did = RepoDid::new("did:plc:source").unwrap();
239 let home = CobHome::from(&source_did);
240 let source = layout.create(&source_did).unwrap();
241 let store = CobStore::new(&source);
242 let grant = Grant {
243 subject: AccountDid::new("did:plc:nel").unwrap(),
244 added_by: AccountDid::new("did:plc:nel").unwrap(),
245 created_at: UnixSeconds::new(1),
246 };
247 let created = store
248 .create(
249 &home,
250 &MembersChange::Add(grant),
251 &signer,
252 UnixSeconds::new(1),
253 )
254 .unwrap();
255 let tip = created.tip.oid().to_hex();
256 let refname = format!(
257 "refs/cobs/sh.tangled.knot.member/{}",
258 created.object.oid().to_hex()
259 );
260
261 let reachable: Vec<String> = must(source.path(), &["rev-list", "--objects", &tip])
262 .lines()
263 .map(|line| line.split_whitespace().next().unwrap().to_string())
264 .collect();
265 let pack = pack_objects(source.path(), &reachable);
266 let owner = ActorId::from_secp256k1(signer.public_key().as_bytes());
267
268 let dest = layout
269 .create(&RepoDid::new("did:plc:dest").unwrap())
270 .unwrap();
271 let report = receive_pack_guarded(
272 &dest,
273 &receive_request(&refname, &Oid::null().to_hex(), &tip, &pack),
274 &limits(),
275 &CobVerify {
276 owner: owner.clone(),
277 home: home.clone(),
278 },
279 &|_| {},
280 )
281 .unwrap()
282 .report;
283 assert!(
284 report_text(&report).contains(&format!("ok {refname}")),
285 "owner-signed COB ref must be accepted at the receive boundary: {}",
286 report_text(&report)
287 );
288
289 let stranger_key = K256Signer::generate(&SeededEntropy::new(9));
290 let stranger = ActorId::from_secp256k1(stranger_key.public_key().as_bytes());
291 let dest2 = layout
292 .create(&RepoDid::new("did:plc:dest2").unwrap())
293 .unwrap();
294 let report = receive_pack_guarded(
295 &dest2,
296 &receive_request(&refname, &Oid::null().to_hex(), &tip, &pack),
297 &limits(),
298 &CobVerify {
299 owner: stranger,
300 home: home.clone(),
301 },
302 &|_| {},
303 )
304 .unwrap()
305 .report;
306 assert!(
307 report_text(&report).contains(&format!("ng {refname}")),
308 "COB ref not signed by the resolved owner key must be refused: {}",
309 report_text(&report)
310 );
311 assert_eq!(
312 live_object_count(&dest2),
313 0,
314 "refused COB ref push leaves no objects behind"
315 );
316}
317
318#[test]
319fn a_reserved_ref_update_is_refused_even_when_the_guard_allows_it() {
320 let scan = tempfile::tempdir().unwrap();
321 let layout = Layout::new(scan.path());
322 let did = RepoDid::new("did:plc:squid").unwrap();
323 let bare = layout.create(&did).unwrap();
324
325 let cob_ref = format!("refs/cobs/sh.tangled.knot.member/{}", "a".repeat(40));
326 let old = "1".repeat(40);
327 let new = "2".repeat(40);
328 let report = receive_pack_guarded(
329 &bare,
330 &receive_request(&cob_ref, &old, &new, b""),
331 &limits(),
332 &AllowAll,
333 &|_| {},
334 )
335 .unwrap()
336 .report;
337 let report = report_text(&report);
338
339 assert!(
340 report.contains(&format!("ng {cob_ref}")),
341 "non-create update to a reserved ref must be refused even under an allow-all guard: {report}"
342 );
343 assert!(
344 report.contains("cannot be modified"),
345 "refusal must name the create-only rule, not connectivity or compare-and-swap: {report}"
346 );
347 assert!(
348 bare.references().unwrap().is_empty(),
349 "refused reserved-ref update must land nothing"
350 );
351 assert_eq!(
352 live_object_count(&bare),
353 0,
354 "refused reserved-ref update must migrate no objects"
355 );
356}
357
358fn pseudo_random(seed: u64, len: usize) -> Vec<u8> {
359 let mut state = seed.wrapping_mul(0x9E37_79B9_7F4A_7C15).wrapping_add(1);
360 (0..len)
361 .map(|_| {
362 state ^= state << 13;
363 state ^= state >> 7;
364 state ^= state << 17;
365 (state >> 24) as u8
366 })
367 .collect()
368}
369
370fn incompressible_pack(megabytes: usize) -> Vec<u8> {
371 let work_dir = tempfile::tempdir().unwrap();
372 let work = work_dir.path();
373 must(work, &["init", "-q", "-b", "main"]);
374 (0..megabytes).for_each(|i| {
375 let blob = pseudo_random(i as u64 + 1, 1024 * 1024);
376 std::fs::write(work.join(format!("blob-{i:04}.bin")), blob).unwrap();
377 });
378 must(work, &["add", "-A"]);
379 must(work, &["commit", "-q", "-m", "bulk"]);
380 let head = must(work, &["rev-parse", "HEAD"]);
381 let oids: Vec<String> = must(work, &["rev-list", "--objects", &head])
382 .lines()
383 .map(|line| line.split_whitespace().next().unwrap().to_string())
384 .collect();
385 pack_objects(work, &oids)
386}
387
388#[test]
389fn the_receive_framer_scans_incrementally_in_linear_time() {
390 const READ_CHUNK: usize = 64 * 1024;
391 let pack = incompressible_pack(24);
392 let body = receive_request(
393 "refs/heads/main",
394 &Oid::null().to_hex(),
395 &"1".repeat(40),
396 &pack,
397 );
398 assert!(
399 pack.len() > 8 * 1024 * 1024,
400 "pack must be large enough to expose any quadratic scaling: {} bytes",
401 pack.len()
402 );
403
404 let single_start = Instant::now();
405 let complete = ReceiveFramer::new(limits(), SHA1.kind())
406 .advance(&body)
407 .unwrap();
408 let single = single_start.elapsed();
409 assert_eq!(
410 complete,
411 Some(body.len()),
412 "one advance over the full body frames whole request"
413 );
414
415 let chunks = body.len().div_ceil(READ_CHUNK);
416 let chunked_start = Instant::now();
417 let mut framer = ReceiveFramer::new(limits(), SHA1.kind());
418 let mut framed_at = None;
419 (1..=chunks).for_each(|n| {
420 let end = (n * READ_CHUNK).min(body.len());
421 if framed_at.is_none()
422 && let Some(total) = framer.advance(&body[..end]).unwrap()
423 {
424 framed_at = Some(total);
425 }
426 });
427 let chunked = chunked_start.elapsed();
428 assert_eq!(
429 framed_at,
430 Some(body.len()),
431 "feeding the same body in 64KB chunks frames it at identical length"
432 );
433
434 let ratio = chunked.as_secs_f64() / single.as_secs_f64().max(1e-6);
435 assert!(
436 ratio < 5.0,
437 "resumable framer never re-inflates settled objects, so the {chunks}-chunk feed \
438 must stay within a small constant of one pass; got {ratio:.2}x"
439 );
440}
441
442#[test]
443fn receive_request_completes_only_when_the_whole_pack_has_arrived() {
444 let scan = tempfile::tempdir().unwrap();
445 let layout = Layout::new(scan.path());
446 let did = RepoDid::new("did:plc:squid").unwrap();
447 let (_bare, c1, pack) = seeded_pack(&layout, &did);
448 let request = receive_request("refs/heads/main", &Oid::null().to_hex(), &c1, &pack);
449
450 assert_eq!(
451 receive_request_complete(&request[..request.len() / 2], &limits(), SHA1.kind()).unwrap(),
452 None,
453 "half-delivered request is not yet complete"
454 );
455 assert_eq!(
456 receive_request_complete(&request, &limits(), SHA1.kind()).unwrap(),
457 Some(request.len()),
458 "whole request reports its exact length"
459 );
460 let mut trailing = request.clone();
461 trailing.extend_from_slice(b"junk-after-the-pack");
462 assert_eq!(
463 receive_request_complete(&trailing, &limits(), SHA1.kind()).unwrap(),
464 Some(request.len()),
465 "framer stops at the pack trailer, ignoring trailing bytes"
466 );
467}
468
469#[test]
470fn the_scanner_refuses_an_object_over_the_per_object_cap() {
471 let scan = tempfile::tempdir().unwrap();
472 let layout = Layout::new(scan.path());
473 let did = RepoDid::new("did:plc:squid").unwrap();
474 let (_bare, c1, pack) = seeded_pack(&layout, &did);
475 let request = receive_request("refs/heads/main", &Oid::null().to_hex(), &c1, &pack);
476 let tight = PackLimits {
477 max_object_bytes: 1,
478 ..PackLimits::default()
479 };
480 assert!(
481 receive_request_complete(&request, &tight, SHA1.kind()).is_err(),
482 "object beyond the per-object cap is refused by the framer"
483 );
484}
485
486#[test]
487fn sweep_incoming_removes_abandoned_staging_directories() {
488 let scan = tempfile::tempdir().unwrap();
489 let layout = Layout::new(scan.path());
490 let did = RepoDid::new("did:plc:squid").unwrap();
491 let bare = layout.create(&did).unwrap();
492
493 let staging = bare.path().join(".knot2-incoming.4242.0");
494 std::fs::create_dir_all(staging.join("objects")).unwrap();
495 std::fs::write(staging.join("objects").join("leftover"), b"x").unwrap();
496 assert!(staging.exists());
497
498 let swept = sweep_incoming(scan.path());
499 assert_eq!(swept, 1, "exactly one staging directory is swept");
500 assert!(!staging.exists(), "abandoned staging directory is gone");
501 assert!(
502 bare.path().join("objects").exists(),
503 "live repository is untouched"
504 );
505}