Now let's take a silly one
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 18 kB View raw
1use std::collections::HashMap; 2 3use knot_git::{Filter, RefUpdate, Repo}; 4use knot_types::{ObjectFormat, Oid, RefName}; 5 6use crate::error::PackError; 7use crate::meter::PackLimits; 8use crate::objects; 9use crate::pkt; 10use crate::quarantine::Quarantine; 11 12#[derive(Debug, Clone, PartialEq, Eq)] 13pub enum RefDecision { 14 Allow, 15 Reject(String), 16} 17 18pub struct ReceiveOutcome { 19 pub report: Vec<u8>, 20 pub side_band: bool, 21 pub push_options: Vec<String>, 22} 23 24pub trait ReceiveGuard { 25 fn authorize(&self, staged: &Repo, commands: &[ReceiveCommand]) -> Vec<RefDecision>; 26} 27 28const CAPS_BASE: &str = 29 "report-status delete-refs atomic ofs-delta side-band-64k push-options agent=knot2/0"; 30 31fn caps(format: ObjectFormat) -> String { 32 format!("{CAPS_BASE} object-format={}", format.capability()) 33} 34 35pub fn advertise(repo: &Repo) -> Result<Vec<u8>, PackError> { 36 let mut buf = Vec::new(); 37 pkt::write_data(&mut buf, b"# service=git-receive-pack\n")?; 38 pkt::write_flush(&mut buf)?; 39 write_advert(&mut buf, repo)?; 40 Ok(buf) 41} 42 43pub fn advertise_ssh(repo: &Repo) -> Result<Vec<u8>, PackError> { 44 let mut buf = Vec::new(); 45 write_advert(&mut buf, repo)?; 46 Ok(buf) 47} 48 49fn write_advert(buf: &mut Vec<u8>, repo: &Repo) -> Result<(), PackError> { 50 let format = repo.object_format(); 51 let caps = caps(format); 52 let refs = repo.advertised_refs_for(knot_git::AdvertScope::Receive)?; 53 match refs.split_first() { 54 Some((first, rest)) => { 55 let mut line = format!("{} {}", first.target, first.name).into_bytes(); 56 line.push(0); 57 line.extend_from_slice(caps.as_bytes()); 58 line.push(b'\n'); 59 pkt::write_data(buf, &line)?; 60 rest.iter().try_fold(&mut *buf, |buf, record| { 61 pkt::write_data( 62 buf, 63 format!("{} {}\n", record.target, record.name).as_bytes(), 64 )?; 65 Ok::<_, PackError>(buf) 66 })?; 67 } 68 None => { 69 let mut line = format!("{} capabilities^{{}}", format.null_oid()).into_bytes(); 70 line.push(0); 71 line.extend_from_slice(caps.as_bytes()); 72 line.push(b'\n'); 73 pkt::write_data(buf, &line)?; 74 } 75 } 76 pkt::write_flush(buf)?; 77 Ok(()) 78} 79 80pub struct ReceiveCommand { 81 old: Oid, 82 new: Oid, 83 refname: String, 84} 85 86impl ReceiveCommand { 87 pub fn refname(&self) -> &str { 88 &self.refname 89 } 90 91 pub fn is_delete(&self) -> bool { 92 self.new.is_null() 93 } 94 95 pub fn is_create(&self) -> bool { 96 self.old.is_null() 97 } 98 99 fn parse(line: &[u8], first: bool) -> Option<ReceiveCommand> { 100 let line = if first { 101 line.split(|byte| *byte == 0).next().unwrap_or(line) 102 } else { 103 line 104 }; 105 let text = std::str::from_utf8(line).ok()?; 106 let mut parts = text.trim_end().split(' '); 107 Some(ReceiveCommand { 108 old: Oid::from_hex(parts.next()?).ok()?, 109 new: Oid::from_hex(parts.next()?).ok()?, 110 refname: parts.next()?.to_string(), 111 }) 112 } 113 114 fn to_update(&self) -> Result<RefUpdate, PackError> { 115 let name = RefName::new(self.refname.clone()) 116 .map_err(|error| PackError::Protocol(error.to_string()))?; 117 Ok(match (self.old.is_null(), self.new.is_null()) { 118 (_, true) => RefUpdate::Delete { 119 name, 120 old: self.old, 121 }, 122 (true, false) => RefUpdate::Create { 123 name, 124 new: self.new, 125 }, 126 (false, false) => RefUpdate::Update { 127 name, 128 old: self.old, 129 new: self.new, 130 }, 131 }) 132 } 133} 134 135fn forbidden_ref(refname: &str) -> Option<String> { 136 (!knot_git::is_public_ref(refname)) 137 .then(|| "refs/cobs/* and refs/hidden/* are reserved and cannot be pushed".to_string()) 138} 139 140fn reserved_create_only(command: &ReceiveCommand) -> Option<String> { 141 (knot_git::is_reserved(command.refname()) && !command.is_create()).then(|| { 142 "existing refs/cobs/* object cannot be modified or deleted over the wire".to_string() 143 }) 144} 145 146struct RefSnapshot { 147 by_name: HashMap<String, Oid>, 148} 149 150impl RefSnapshot { 151 fn capture(repo: &Repo) -> Result<RefSnapshot, PackError> { 152 let by_name = repo 153 .references()? 154 .into_iter() 155 .map(|record| (record.name.as_str().to_string(), record.target)) 156 .collect(); 157 Ok(RefSnapshot { by_name }) 158 } 159 160 fn tips(&self) -> Vec<Oid> { 161 self.by_name.values().copied().collect() 162 } 163 164 fn conflict(&self, command: &ReceiveCommand) -> Option<String> { 165 match ( 166 command.old.is_null(), 167 self.by_name.get(&command.refname).copied(), 168 ) { 169 (true, Some(_)) => Some("reference already exists".to_string()), 170 (false, found) if found != Some(command.old) => { 171 Some("stale info: old value does not match".to_string()) 172 } 173 _ => None, 174 } 175 } 176} 177 178fn first_conflict(snapshot: &RefSnapshot, commands: &[ReceiveCommand]) -> Option<(String, String)> { 179 commands.iter().find_map(|command| { 180 snapshot 181 .conflict(command) 182 .map(|reason| (command.refname.clone(), reason)) 183 }) 184} 185 186fn objects_present(repo: &Repo, wants: &[Oid], haves: &[Oid]) -> bool { 187 matches!( 188 repo.select_pack_objects_filtered( 189 wants, 190 haves, 191 Filter::None, 192 crate::upload::selection_budget(), 193 ), 194 Ok(selection) if selection.send.iter().all(|oid| repo.contains(*oid)) 195 ) 196} 197 198fn connectivity_reasons( 199 repo: &Repo, 200 commands: &[ReceiveCommand], 201 haves: &[Oid], 202) -> Vec<Option<String>> { 203 let news: Vec<Oid> = commands 204 .iter() 205 .map(|command| command.new) 206 .filter(|new| !new.is_null()) 207 .collect(); 208 let batched_ok = news.is_empty() || objects_present(repo, &news, haves); 209 commands 210 .iter() 211 .map(|command| { 212 if command.new.is_null() || batched_ok || objects_present(repo, &[command.new], haves) { 213 None 214 } else { 215 Some("missing necessary objects".to_string()) 216 } 217 }) 218 .collect() 219} 220 221pub(crate) fn fuzz(body: &[u8]) { 222 if let Ok(parsed) = pkt::split_receive(body) { 223 parsed 224 .commands 225 .iter() 226 .enumerate() 227 .for_each(|(index, line)| { 228 if let Some(command) = ReceiveCommand::parse(line, index == 0) { 229 let _ = command.to_update(); 230 } 231 }); 232 } 233} 234 235pub fn handle(repo: &Repo, body: &[u8], limits: &PackLimits) -> Result<Vec<u8>, PackError> { 236 let parsed = pkt::split_receive(body)?; 237 let atomic = parsed.caps.atomic; 238 let commands = parse_commands(&parsed); 239 240 let unpack = objects::index_pack( 241 &repo.objects_dir(), 242 parsed.pack, 243 limits, 244 repo.object_format().kind(), 245 ); 246 let results: Vec<(String, Option<String>)> = match &unpack { 247 Err(_) => all_failed(&commands, "unpacker error"), 248 Ok(()) => match RefSnapshot::capture(repo) { 249 Err(_) => all_failed(&commands, "ref snapshot unavailable"), 250 Ok(snapshot) => { 251 let haves = snapshot.tips(); 252 if atomic { 253 apply_atomic(repo, &snapshot, &commands, &haves) 254 } else { 255 commands 256 .iter() 257 .zip(connectivity_reasons(repo, &commands, &haves)) 258 .map(|(command, connectivity)| { 259 ( 260 command.refname.clone(), 261 apply_one(repo, command, connectivity), 262 ) 263 }) 264 .collect() 265 } 266 } 267 }, 268 }; 269 report(&unpack, &results) 270 .map(|report| pkt::frame_report(&report, &[], parsed.caps.side_band_64k)) 271} 272 273fn apply_one( 274 repo: &Repo, 275 command: &ReceiveCommand, 276 connectivity: Option<String>, 277) -> Option<String> { 278 if let Some(reason) = forbidden_ref(&command.refname) { 279 return Some(reason); 280 } 281 if let Some(reason) = connectivity { 282 return Some(reason); 283 } 284 command 285 .to_update() 286 .and_then(|update| repo.update_ref(&update).map_err(PackError::from)) 287 .err() 288 .map(|error| error.to_string().replace('\n', " ")) 289} 290 291fn atomic_failure( 292 snapshot: &RefSnapshot, 293 commands: &[ReceiveCommand], 294) -> Vec<(String, Option<String>)> { 295 let conflict = first_conflict(snapshot, commands); 296 commands 297 .iter() 298 .map(|command| match &conflict { 299 Some((refname, reason)) if *refname == command.refname => { 300 (command.refname.clone(), Some(reason.clone())) 301 } 302 _ => ( 303 command.refname.clone(), 304 Some("atomic transaction failed".to_string()), 305 ), 306 }) 307 .collect() 308} 309 310fn apply_atomic( 311 repo: &Repo, 312 snapshot: &RefSnapshot, 313 commands: &[ReceiveCommand], 314 haves: &[Oid], 315) -> Vec<(String, Option<String>)> { 316 let fail = |reason: String| -> Vec<(String, Option<String>)> { 317 commands 318 .iter() 319 .map(|command| (command.refname.clone(), Some(reason.clone()))) 320 .collect() 321 }; 322 if commands 323 .iter() 324 .any(|command| forbidden_ref(&command.refname).is_some()) 325 { 326 return commands 327 .iter() 328 .map(|command| { 329 let reason = forbidden_ref(&command.refname) 330 .unwrap_or_else(|| "atomic push aborted".to_string()); 331 (command.refname.clone(), Some(reason)) 332 }) 333 .collect(); 334 } 335 if let Some(command) = commands 336 .iter() 337 .zip(connectivity_reasons(repo, commands, haves)) 338 .find_map(|(command, reason)| reason.map(|_| command)) 339 { 340 return fail(format!("missing necessary objects for {}", command.refname)); 341 } 342 let updates = match commands 343 .iter() 344 .map(ReceiveCommand::to_update) 345 .collect::<Result<Vec<_>, _>>() 346 { 347 Ok(updates) => updates, 348 Err(error) => return fail(error.to_string().replace('\n', " ")), 349 }; 350 match repo.update_refs(&updates) { 351 Ok(()) => commands 352 .iter() 353 .map(|command| (command.refname.clone(), None)) 354 .collect(), 355 Err(_) => atomic_failure(snapshot, commands), 356 } 357} 358 359fn report( 360 unpack: &Result<(), PackError>, 361 results: &[(String, Option<String>)], 362) -> Result<Vec<u8>, PackError> { 363 let mut buf = Vec::new(); 364 match unpack { 365 Ok(()) => pkt::write_data(&mut buf, b"unpack ok\n")?, 366 Err(error) => pkt::write_data( 367 &mut buf, 368 format!("unpack {}\n", error.to_string().replace('\n', " ")).as_bytes(), 369 )?, 370 } 371 results 372 .iter() 373 .try_fold(&mut buf, |buf, (refname, failure)| { 374 let line = match failure { 375 None => format!("ok {refname}\n"), 376 Some(reason) => format!("ng {refname} {reason}\n"), 377 }; 378 pkt::write_data(buf, line.as_bytes())?; 379 Ok::<_, PackError>(buf) 380 })?; 381 pkt::write_flush(&mut buf)?; 382 Ok(buf) 383} 384 385fn parse_commands(parsed: &pkt::Receive) -> Vec<ReceiveCommand> { 386 parsed 387 .commands 388 .iter() 389 .enumerate() 390 .filter_map(|(index, line)| ReceiveCommand::parse(line, index == 0)) 391 .collect() 392} 393 394pub struct Preflight { 395 pub creates_branch: bool, 396} 397 398pub(crate) fn preflight(body: &[u8]) -> Preflight { 399 pkt::split_receive(body) 400 .map(|parsed| { 401 let commands = parse_commands(&parsed); 402 Preflight { 403 creates_branch: commands 404 .iter() 405 .any(|command| command.is_create() && knot_git::is_branch(command.refname())), 406 } 407 }) 408 .unwrap_or(Preflight { 409 creates_branch: false, 410 }) 411} 412 413fn all_failed(commands: &[ReceiveCommand], reason: &str) -> Vec<(String, Option<String>)> { 414 commands 415 .iter() 416 .map(|command| (command.refname.clone(), Some(reason.to_string()))) 417 .collect() 418} 419 420fn stage_to_quarantine(staged: &Repo, commands: &[ReceiveCommand]) { 421 commands 422 .iter() 423 .filter(|command| !command.is_delete()) 424 .for_each(|command| { 425 if let Ok(name) = RefName::new(command.refname.clone()) { 426 let _ = staged.update_ref(&RefUpdate::Create { 427 name, 428 new: command.new, 429 }); 430 } 431 }); 432} 433 434fn apply_guarded_atomic( 435 live: &Repo, 436 snapshot: &RefSnapshot, 437 commands: &[ReceiveCommand], 438 seal: &dyn Fn(&[RefUpdate]), 439) -> Vec<(String, Option<String>)> { 440 let updates = match commands 441 .iter() 442 .map(ReceiveCommand::to_update) 443 .collect::<Result<Vec<_>, _>>() 444 { 445 Ok(updates) => updates, 446 Err(error) => return all_failed(commands, &error.to_string().replace('\n', " ")), 447 }; 448 match live.update_refs_sealed(&updates, || seal(&updates)) { 449 Ok(()) => commands 450 .iter() 451 .map(|command| (command.refname.clone(), None)) 452 .collect(), 453 Err(_) => atomic_failure(snapshot, commands), 454 } 455} 456 457fn apply_guarded_update( 458 live: &Repo, 459 command: &ReceiveCommand, 460 seal: &dyn Fn(&[RefUpdate]), 461) -> Option<String> { 462 let update = match command.to_update() { 463 Ok(update) => update, 464 Err(error) => return Some(error.to_string().replace('\n', " ")), 465 }; 466 match live.update_ref_sealed(&update, || seal(std::slice::from_ref(&update))) { 467 Ok(()) => None, 468 Err(error) => Some(PackError::from(error).to_string().replace('\n', " ")), 469 } 470} 471 472fn parse_push_options(parsed: &pkt::Receive) -> Vec<String> { 473 parsed 474 .options 475 .iter() 476 .map(|option| { 477 String::from_utf8_lossy(option) 478 .trim_matches(|byte: char| byte == '\n' || byte == '\r') 479 .to_string() 480 }) 481 .filter(|option| !option.is_empty()) 482 .collect() 483} 484 485pub fn handle_guarded( 486 live: &Repo, 487 body: &[u8], 488 limits: &PackLimits, 489 guard: &dyn ReceiveGuard, 490 seal: &dyn Fn(&[RefUpdate]), 491) -> Result<ReceiveOutcome, PackError> { 492 let parsed = pkt::split_receive(body)?; 493 let atomic = parsed.caps.atomic; 494 let side_band = parsed.caps.side_band_64k; 495 let push_options = parse_push_options(&parsed); 496 let build = |report: Vec<u8>| ReceiveOutcome { 497 report, 498 side_band, 499 push_options: push_options.clone(), 500 }; 501 let commands = parse_commands(&parsed); 502 if commands.is_empty() { 503 return report(&Ok(()), &[]).map(build); 504 } 505 506 let quarantine = match Quarantine::stage(live, parsed.pack, limits, live.object_format().kind()) 507 { 508 Ok(quarantine) => quarantine, 509 Err(error) => { 510 let results = all_failed(&commands, "unpacker error"); 511 return report(&Err(error), &results).map(build); 512 } 513 }; 514 let staged = quarantine.repo(); 515 stage_to_quarantine(staged, &commands); 516 let snapshot = match RefSnapshot::capture(live) { 517 Ok(snapshot) => snapshot, 518 Err(_) => { 519 let results = all_failed(&commands, "ref snapshot unavailable"); 520 return report(&Ok(()), &results).map(build); 521 } 522 }; 523 let haves = snapshot.tips(); 524 525 let verdicts = guard.authorize(staged, &commands); 526 let reasons: Vec<Option<String>> = if verdicts.len() == commands.len() { 527 commands 528 .iter() 529 .zip(verdicts) 530 .zip(connectivity_reasons(staged, &commands, &haves)) 531 .map(|((command, verdict), connectivity)| match verdict { 532 RefDecision::Reject(reason) => Some(reason), 533 RefDecision::Allow => reserved_create_only(command) 534 .or(connectivity) 535 .or_else(|| snapshot.conflict(command)), 536 }) 537 .collect() 538 } else { 539 commands 540 .iter() 541 .map(|_| Some("authorization unavailable".to_string())) 542 .collect() 543 }; 544 545 let any_reject = reasons.iter().any(Option::is_some); 546 if atomic && any_reject { 547 let results = commands 548 .iter() 549 .zip(reasons) 550 .map(|(command, reason)| { 551 ( 552 command.refname.clone(), 553 Some(reason.unwrap_or_else(|| "atomic push aborted".to_string())), 554 ) 555 }) 556 .collect::<Vec<_>>(); 557 return report(&Ok(()), &results).map(build); 558 } 559 560 if reasons.iter().any(Option::is_none) 561 && let Err(error) = quarantine.migrate_into(live) 562 { 563 let results = all_failed(&commands, "object migration failed"); 564 return report(&Err(error), &results).map(build); 565 } 566 567 let results = if atomic { 568 apply_guarded_atomic(live, &snapshot, &commands, seal) 569 } else { 570 commands 571 .iter() 572 .zip(reasons) 573 .map(|(command, reason)| match reason { 574 Some(reason) => (command.refname.clone(), Some(reason)), 575 None => ( 576 command.refname.clone(), 577 apply_guarded_update(live, command, seal), 578 ), 579 }) 580 .collect() 581 }; 582 report(&Ok(()), &results).map(build) 583}