Now let's take a silly one
1use std::collections::HashMap;
2
3use knot_git::{Filter, RefUpdate, Repo};
4use knot_types::{ObjectFormat, Oid, RefName};
5
6use crate::error::PackError;
7use crate::meter::PackLimits;
8use crate::objects;
9use crate::pkt;
10use crate::quarantine::Quarantine;
11
/// Verdict a [`ReceiveGuard`] hands back for one ref command.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RefDecision {
    /// The command may continue on to the remaining push checks.
    Allow,
    /// The command is refused; the payload is the reason reported for that ref.
    Reject(String),
}
17
/// Result of a guarded receive, returned by `handle_guarded`.
///
/// Derives the common traits so callers can log, copy and compare outcomes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReceiveOutcome {
    /// Report-status bytes as assembled by this module, before any side-band framing.
    pub report: Vec<u8>,
    /// Whether the request carried the `side-band-64k` capability.
    pub side_band: bool,
    /// Push options sent by the client, with surrounding CR/LF stripped and
    /// empty entries dropped.
    pub push_options: Vec<String>,
}
23
24pub trait ReceiveGuard {
25 fn authorize(&self, staged: &Repo, commands: &[ReceiveCommand]) -> Vec<RefDecision>;
26}
27
/// Capability list advertised for every repository; the object-format
/// capability is appended separately by `caps`.
const CAPS_BASE: &str =
    "report-status delete-refs atomic ofs-delta side-band-64k push-options agent=knot2/0";
30
31fn caps(format: ObjectFormat) -> String {
32 format!("{CAPS_BASE} object-format={}", format.capability())
33}
34
35pub fn advertise(repo: &Repo) -> Result<Vec<u8>, PackError> {
36 let mut buf = Vec::new();
37 pkt::write_data(&mut buf, b"# service=git-receive-pack\n")?;
38 pkt::write_flush(&mut buf)?;
39 write_advert(&mut buf, repo)?;
40 Ok(buf)
41}
42
43pub fn advertise_ssh(repo: &Repo) -> Result<Vec<u8>, PackError> {
44 let mut buf = Vec::new();
45 write_advert(&mut buf, repo)?;
46 Ok(buf)
47}
48
49fn write_advert(buf: &mut Vec<u8>, repo: &Repo) -> Result<(), PackError> {
50 let format = repo.object_format();
51 let caps = caps(format);
52 let refs = repo.advertised_refs_for(knot_git::AdvertScope::Receive)?;
53 match refs.split_first() {
54 Some((first, rest)) => {
55 let mut line = format!("{} {}", first.target, first.name).into_bytes();
56 line.push(0);
57 line.extend_from_slice(caps.as_bytes());
58 line.push(b'\n');
59 pkt::write_data(buf, &line)?;
60 rest.iter().try_fold(&mut *buf, |buf, record| {
61 pkt::write_data(
62 buf,
63 format!("{} {}\n", record.target, record.name).as_bytes(),
64 )?;
65 Ok::<_, PackError>(buf)
66 })?;
67 }
68 None => {
69 let mut line = format!("{} capabilities^{{}}", format.null_oid()).into_bytes();
70 line.push(0);
71 line.extend_from_slice(caps.as_bytes());
72 line.push(b'\n');
73 pkt::write_data(buf, &line)?;
74 }
75 }
76 pkt::write_flush(buf)?;
77 Ok(())
78}
79
80pub struct ReceiveCommand {
81 old: Oid,
82 new: Oid,
83 refname: String,
84}
85
86impl ReceiveCommand {
87 pub fn refname(&self) -> &str {
88 &self.refname
89 }
90
91 pub fn is_delete(&self) -> bool {
92 self.new.is_null()
93 }
94
95 pub fn is_create(&self) -> bool {
96 self.old.is_null()
97 }
98
99 fn parse(line: &[u8], first: bool) -> Option<ReceiveCommand> {
100 let line = if first {
101 line.split(|byte| *byte == 0).next().unwrap_or(line)
102 } else {
103 line
104 };
105 let text = std::str::from_utf8(line).ok()?;
106 let mut parts = text.trim_end().split(' ');
107 Some(ReceiveCommand {
108 old: Oid::from_hex(parts.next()?).ok()?,
109 new: Oid::from_hex(parts.next()?).ok()?,
110 refname: parts.next()?.to_string(),
111 })
112 }
113
114 fn to_update(&self) -> Result<RefUpdate, PackError> {
115 let name = RefName::new(self.refname.clone())
116 .map_err(|error| PackError::Protocol(error.to_string()))?;
117 Ok(match (self.old.is_null(), self.new.is_null()) {
118 (_, true) => RefUpdate::Delete {
119 name,
120 old: self.old,
121 },
122 (true, false) => RefUpdate::Create {
123 name,
124 new: self.new,
125 },
126 (false, false) => RefUpdate::Update {
127 name,
128 old: self.old,
129 new: self.new,
130 },
131 })
132 }
133}
134
135fn forbidden_ref(refname: &str) -> Option<String> {
136 (!knot_git::is_public_ref(refname))
137 .then(|| "refs/cobs/* and refs/hidden/* are reserved and cannot be pushed".to_string())
138}
139
140fn reserved_create_only(command: &ReceiveCommand) -> Option<String> {
141 (knot_git::is_reserved(command.refname()) && !command.is_create()).then(|| {
142 "existing refs/cobs/* object cannot be modified or deleted over the wire".to_string()
143 })
144}
145
146struct RefSnapshot {
147 by_name: HashMap<String, Oid>,
148}
149
150impl RefSnapshot {
151 fn capture(repo: &Repo) -> Result<RefSnapshot, PackError> {
152 let by_name = repo
153 .references()?
154 .into_iter()
155 .map(|record| (record.name.as_str().to_string(), record.target))
156 .collect();
157 Ok(RefSnapshot { by_name })
158 }
159
160 fn tips(&self) -> Vec<Oid> {
161 self.by_name.values().copied().collect()
162 }
163
164 fn conflict(&self, command: &ReceiveCommand) -> Option<String> {
165 match (
166 command.old.is_null(),
167 self.by_name.get(&command.refname).copied(),
168 ) {
169 (true, Some(_)) => Some("reference already exists".to_string()),
170 (false, found) if found != Some(command.old) => {
171 Some("stale info: old value does not match".to_string())
172 }
173 _ => None,
174 }
175 }
176}
177
178fn first_conflict(snapshot: &RefSnapshot, commands: &[ReceiveCommand]) -> Option<(String, String)> {
179 commands.iter().find_map(|command| {
180 snapshot
181 .conflict(command)
182 .map(|reason| (command.refname.clone(), reason))
183 })
184}
185
186fn objects_present(repo: &Repo, wants: &[Oid], haves: &[Oid]) -> bool {
187 matches!(
188 repo.select_pack_objects_filtered(
189 wants,
190 haves,
191 Filter::None,
192 crate::upload::selection_budget(),
193 ),
194 Ok(selection) if selection.send.iter().all(|oid| repo.contains(*oid))
195 )
196}
197
198fn connectivity_reasons(
199 repo: &Repo,
200 commands: &[ReceiveCommand],
201 haves: &[Oid],
202) -> Vec<Option<String>> {
203 let news: Vec<Oid> = commands
204 .iter()
205 .map(|command| command.new)
206 .filter(|new| !new.is_null())
207 .collect();
208 let batched_ok = news.is_empty() || objects_present(repo, &news, haves);
209 commands
210 .iter()
211 .map(|command| {
212 if command.new.is_null() || batched_ok || objects_present(repo, &[command.new], haves) {
213 None
214 } else {
215 Some("missing necessary objects".to_string())
216 }
217 })
218 .collect()
219}
220
221pub(crate) fn fuzz(body: &[u8]) {
222 if let Ok(parsed) = pkt::split_receive(body) {
223 parsed
224 .commands
225 .iter()
226 .enumerate()
227 .for_each(|(index, line)| {
228 if let Some(command) = ReceiveCommand::parse(line, index == 0) {
229 let _ = command.to_update();
230 }
231 });
232 }
233}
234
235pub fn handle(repo: &Repo, body: &[u8], limits: &PackLimits) -> Result<Vec<u8>, PackError> {
236 let parsed = pkt::split_receive(body)?;
237 let atomic = parsed.caps.atomic;
238 let commands = parse_commands(&parsed);
239
240 let unpack = objects::index_pack(
241 &repo.objects_dir(),
242 parsed.pack,
243 limits,
244 repo.object_format().kind(),
245 );
246 let results: Vec<(String, Option<String>)> = match &unpack {
247 Err(_) => all_failed(&commands, "unpacker error"),
248 Ok(()) => match RefSnapshot::capture(repo) {
249 Err(_) => all_failed(&commands, "ref snapshot unavailable"),
250 Ok(snapshot) => {
251 let haves = snapshot.tips();
252 if atomic {
253 apply_atomic(repo, &snapshot, &commands, &haves)
254 } else {
255 commands
256 .iter()
257 .zip(connectivity_reasons(repo, &commands, &haves))
258 .map(|(command, connectivity)| {
259 (
260 command.refname.clone(),
261 apply_one(repo, command, connectivity),
262 )
263 })
264 .collect()
265 }
266 }
267 },
268 };
269 report(&unpack, &results)
270 .map(|report| pkt::frame_report(&report, &[], parsed.caps.side_band_64k))
271}
272
273fn apply_one(
274 repo: &Repo,
275 command: &ReceiveCommand,
276 connectivity: Option<String>,
277) -> Option<String> {
278 if let Some(reason) = forbidden_ref(&command.refname) {
279 return Some(reason);
280 }
281 if let Some(reason) = connectivity {
282 return Some(reason);
283 }
284 command
285 .to_update()
286 .and_then(|update| repo.update_ref(&update).map_err(PackError::from))
287 .err()
288 .map(|error| error.to_string().replace('\n', " "))
289}
290
291fn atomic_failure(
292 snapshot: &RefSnapshot,
293 commands: &[ReceiveCommand],
294) -> Vec<(String, Option<String>)> {
295 let conflict = first_conflict(snapshot, commands);
296 commands
297 .iter()
298 .map(|command| match &conflict {
299 Some((refname, reason)) if *refname == command.refname => {
300 (command.refname.clone(), Some(reason.clone()))
301 }
302 _ => (
303 command.refname.clone(),
304 Some("atomic transaction failed".to_string()),
305 ),
306 })
307 .collect()
308}
309
310fn apply_atomic(
311 repo: &Repo,
312 snapshot: &RefSnapshot,
313 commands: &[ReceiveCommand],
314 haves: &[Oid],
315) -> Vec<(String, Option<String>)> {
316 let fail = |reason: String| -> Vec<(String, Option<String>)> {
317 commands
318 .iter()
319 .map(|command| (command.refname.clone(), Some(reason.clone())))
320 .collect()
321 };
322 if commands
323 .iter()
324 .any(|command| forbidden_ref(&command.refname).is_some())
325 {
326 return commands
327 .iter()
328 .map(|command| {
329 let reason = forbidden_ref(&command.refname)
330 .unwrap_or_else(|| "atomic push aborted".to_string());
331 (command.refname.clone(), Some(reason))
332 })
333 .collect();
334 }
335 if let Some(command) = commands
336 .iter()
337 .zip(connectivity_reasons(repo, commands, haves))
338 .find_map(|(command, reason)| reason.map(|_| command))
339 {
340 return fail(format!("missing necessary objects for {}", command.refname));
341 }
342 let updates = match commands
343 .iter()
344 .map(ReceiveCommand::to_update)
345 .collect::<Result<Vec<_>, _>>()
346 {
347 Ok(updates) => updates,
348 Err(error) => return fail(error.to_string().replace('\n', " ")),
349 };
350 match repo.update_refs(&updates) {
351 Ok(()) => commands
352 .iter()
353 .map(|command| (command.refname.clone(), None))
354 .collect(),
355 Err(_) => atomic_failure(snapshot, commands),
356 }
357}
358
359fn report(
360 unpack: &Result<(), PackError>,
361 results: &[(String, Option<String>)],
362) -> Result<Vec<u8>, PackError> {
363 let mut buf = Vec::new();
364 match unpack {
365 Ok(()) => pkt::write_data(&mut buf, b"unpack ok\n")?,
366 Err(error) => pkt::write_data(
367 &mut buf,
368 format!("unpack {}\n", error.to_string().replace('\n', " ")).as_bytes(),
369 )?,
370 }
371 results
372 .iter()
373 .try_fold(&mut buf, |buf, (refname, failure)| {
374 let line = match failure {
375 None => format!("ok {refname}\n"),
376 Some(reason) => format!("ng {refname} {reason}\n"),
377 };
378 pkt::write_data(buf, line.as_bytes())?;
379 Ok::<_, PackError>(buf)
380 })?;
381 pkt::write_flush(&mut buf)?;
382 Ok(buf)
383}
384
385fn parse_commands(parsed: &pkt::Receive) -> Vec<ReceiveCommand> {
386 parsed
387 .commands
388 .iter()
389 .enumerate()
390 .filter_map(|(index, line)| ReceiveCommand::parse(line, index == 0))
391 .collect()
392}
393
/// Facts about a push request gathered without applying it.
///
/// The default value (`creates_branch: false`) is what an unparseable request yields.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct Preflight {
    /// True when at least one command creates a ref that `knot_git::is_branch` accepts.
    pub creates_branch: bool,
}
397
398pub(crate) fn preflight(body: &[u8]) -> Preflight {
399 pkt::split_receive(body)
400 .map(|parsed| {
401 let commands = parse_commands(&parsed);
402 Preflight {
403 creates_branch: commands
404 .iter()
405 .any(|command| command.is_create() && knot_git::is_branch(command.refname())),
406 }
407 })
408 .unwrap_or(Preflight {
409 creates_branch: false,
410 })
411}
412
413fn all_failed(commands: &[ReceiveCommand], reason: &str) -> Vec<(String, Option<String>)> {
414 commands
415 .iter()
416 .map(|command| (command.refname.clone(), Some(reason.to_string())))
417 .collect()
418}
419
420fn stage_to_quarantine(staged: &Repo, commands: &[ReceiveCommand]) {
421 commands
422 .iter()
423 .filter(|command| !command.is_delete())
424 .for_each(|command| {
425 if let Ok(name) = RefName::new(command.refname.clone()) {
426 let _ = staged.update_ref(&RefUpdate::Create {
427 name,
428 new: command.new,
429 });
430 }
431 });
432}
433
434fn apply_guarded_atomic(
435 live: &Repo,
436 snapshot: &RefSnapshot,
437 commands: &[ReceiveCommand],
438 seal: &dyn Fn(&[RefUpdate]),
439) -> Vec<(String, Option<String>)> {
440 let updates = match commands
441 .iter()
442 .map(ReceiveCommand::to_update)
443 .collect::<Result<Vec<_>, _>>()
444 {
445 Ok(updates) => updates,
446 Err(error) => return all_failed(commands, &error.to_string().replace('\n', " ")),
447 };
448 match live.update_refs_sealed(&updates, || seal(&updates)) {
449 Ok(()) => commands
450 .iter()
451 .map(|command| (command.refname.clone(), None))
452 .collect(),
453 Err(_) => atomic_failure(snapshot, commands),
454 }
455}
456
457fn apply_guarded_update(
458 live: &Repo,
459 command: &ReceiveCommand,
460 seal: &dyn Fn(&[RefUpdate]),
461) -> Option<String> {
462 let update = match command.to_update() {
463 Ok(update) => update,
464 Err(error) => return Some(error.to_string().replace('\n', " ")),
465 };
466 match live.update_ref_sealed(&update, || seal(std::slice::from_ref(&update))) {
467 Ok(()) => None,
468 Err(error) => Some(PackError::from(error).to_string().replace('\n', " ")),
469 }
470}
471
472fn parse_push_options(parsed: &pkt::Receive) -> Vec<String> {
473 parsed
474 .options
475 .iter()
476 .map(|option| {
477 String::from_utf8_lossy(option)
478 .trim_matches(|byte: char| byte == '\n' || byte == '\r')
479 .to_string()
480 })
481 .filter(|option| !option.is_empty())
482 .collect()
483}
484
485pub fn handle_guarded(
486 live: &Repo,
487 body: &[u8],
488 limits: &PackLimits,
489 guard: &dyn ReceiveGuard,
490 seal: &dyn Fn(&[RefUpdate]),
491) -> Result<ReceiveOutcome, PackError> {
492 let parsed = pkt::split_receive(body)?;
493 let atomic = parsed.caps.atomic;
494 let side_band = parsed.caps.side_band_64k;
495 let push_options = parse_push_options(&parsed);
496 let build = |report: Vec<u8>| ReceiveOutcome {
497 report,
498 side_band,
499 push_options: push_options.clone(),
500 };
501 let commands = parse_commands(&parsed);
502 if commands.is_empty() {
503 return report(&Ok(()), &[]).map(build);
504 }
505
506 let quarantine = match Quarantine::stage(live, parsed.pack, limits, live.object_format().kind())
507 {
508 Ok(quarantine) => quarantine,
509 Err(error) => {
510 let results = all_failed(&commands, "unpacker error");
511 return report(&Err(error), &results).map(build);
512 }
513 };
514 let staged = quarantine.repo();
515 stage_to_quarantine(staged, &commands);
516 let snapshot = match RefSnapshot::capture(live) {
517 Ok(snapshot) => snapshot,
518 Err(_) => {
519 let results = all_failed(&commands, "ref snapshot unavailable");
520 return report(&Ok(()), &results).map(build);
521 }
522 };
523 let haves = snapshot.tips();
524
525 let verdicts = guard.authorize(staged, &commands);
526 let reasons: Vec<Option<String>> = if verdicts.len() == commands.len() {
527 commands
528 .iter()
529 .zip(verdicts)
530 .zip(connectivity_reasons(staged, &commands, &haves))
531 .map(|((command, verdict), connectivity)| match verdict {
532 RefDecision::Reject(reason) => Some(reason),
533 RefDecision::Allow => reserved_create_only(command)
534 .or(connectivity)
535 .or_else(|| snapshot.conflict(command)),
536 })
537 .collect()
538 } else {
539 commands
540 .iter()
541 .map(|_| Some("authorization unavailable".to_string()))
542 .collect()
543 };
544
545 let any_reject = reasons.iter().any(Option::is_some);
546 if atomic && any_reject {
547 let results = commands
548 .iter()
549 .zip(reasons)
550 .map(|(command, reason)| {
551 (
552 command.refname.clone(),
553 Some(reason.unwrap_or_else(|| "atomic push aborted".to_string())),
554 )
555 })
556 .collect::<Vec<_>>();
557 return report(&Ok(()), &results).map(build);
558 }
559
560 if reasons.iter().any(Option::is_none)
561 && let Err(error) = quarantine.migrate_into(live)
562 {
563 let results = all_failed(&commands, "object migration failed");
564 return report(&Err(error), &results).map(build);
565 }
566
567 let results = if atomic {
568 apply_guarded_atomic(live, &snapshot, &commands, seal)
569 } else {
570 commands
571 .iter()
572 .zip(reasons)
573 .map(|(command, reason)| match reason {
574 Some(reason) => (command.refname.clone(), Some(reason)),
575 None => (
576 command.refname.clone(),
577 apply_guarded_update(live, command, seal),
578 ),
579 })
580 .collect()
581 };
582 report(&Ok(()), &results).map(build)
583}