Now let's take a silly one
1use std::collections::HashMap;
2
3use flate2::{Decompress, FlushDecompress, Status};
4use gix_pack::data::{Entry, entry::Header, header};
5
6use crate::error::{PackError, PackLimit};
7use crate::meter::{PackLimits, check_depth, malformed};
8use crate::pkt::{self, Frame};
9
/// Tail length below which a failed entry-header parse means "not fully received yet".
///
/// When fewer than this many bytes remain after an entry's start, a header that fails to
/// parse is assumed to be truncated and scanning waits for more data; with at least this
/// many bytes available, the failure is reported as a malformed pack.
///
/// NOTE(review): presumably sized to exceed the longest possible entry header (type/size
/// varint plus an ofs-delta varint or a ref-delta base id) — confirm against the pack format.
const HEADER_SLACK: usize = 64;
11
12fn pack_start(buf: &[u8]) -> Option<usize> {
13 let caps = pkt::first_command(buf)
14 .map(pkt::parse_caps)
15 .unwrap_or_default();
16 let boundary = if caps.push_options { 2 } else { 1 };
17 pkt::frames(buf, Some(boundary))
18 .filter_map(|item| match item {
19 Ok((Frame::Flush, at)) => Some(at),
20 _ => None,
21 })
22 .nth(boundary - 1)
23}
24
/// Returns the new-object-id field of a receive-pack command line.
///
/// A command line is expected to look like `<old-oid> SP <new-oid> SP <refname>`, with the
/// first line optionally followed by a NUL and a capability list. Anything after the first
/// NUL is ignored and the second space-separated field is returned.
///
/// Returns `None` when that field is absent or empty. An empty field must not be reported
/// as `Some(b"")`: a caller testing "every byte is `'0'`" would accept it vacuously and
/// mistake a malformed line for a null-id (deletion) command.
fn new_oid_field(line: &[u8]) -> Option<&[u8]> {
    let line = line.split(|byte| *byte == 0).next().unwrap_or(line);
    line.split(|byte| *byte == b' ')
        .nth(1)
        .filter(|oid| !oid.is_empty())
}
29
30fn no_pack_needed(buf: &[u8]) -> bool {
31 pkt::frames(buf, Some(1))
32 .filter_map(|item| match item {
33 Ok((Frame::Data(payload), _)) => Some(payload),
34 _ => None,
35 })
36 .all(|line| {
37 new_oid_field(line)
38 .map(|oid| oid.iter().all(|byte| *byte == b'0'))
39 .unwrap_or(false)
40 })
41}
42
43struct EntryInflate {
44 data_offset: u64,
45 decompressed_size: u64,
46 decompress: Decompress,
47 produced: u64,
48}
49
50impl EntryInflate {
51 fn feed(&mut self, pack: &[u8], scratch: &mut [u8]) -> Result<Option<u64>, PackError> {
52 loop {
53 let consumed = self.decompress.total_in() as usize;
54 let out_before = self.decompress.total_out();
55 let input = pack
56 .get(self.data_offset as usize + consumed..)
57 .unwrap_or_default();
58 let status = self
59 .decompress
60 .decompress(input, scratch, FlushDecompress::None)
61 .map_err(|error| PackError::Pack(format!("inflate: {error}")))?;
62 self.produced += self.decompress.total_out() - out_before;
63 if self.produced > self.decompressed_size {
64 return Err(malformed("object inflates beyond its declared size"));
65 }
66 match status {
67 Status::StreamEnd => {
68 return if self.produced == self.decompressed_size {
69 self.data_offset
70 .checked_add(self.decompress.total_in())
71 .map(Some)
72 .ok_or_else(|| malformed("pack offset overflow"))
73 } else {
74 Err(malformed("object decompressed size mismatch"))
75 };
76 }
77 Status::Ok | Status::BufError => {
78 if self.decompress.total_in() as usize == consumed
79 && self.decompress.total_out() == out_before
80 {
81 return Ok(None);
82 }
83 }
84 }
85 }
86 }
87}
88
89#[derive(Default)]
90struct PackProgress {
91 num_objects: u32,
92 objects_done: u32,
93 next_offset: u64,
94 total_decompressed: u64,
95 base_of: HashMap<u64, u64>,
96 current: Option<EntryInflate>,
97 depth_checked: bool,
98}
99
100impl PackProgress {
101 fn scan(
102 &mut self,
103 pack: &[u8],
104 limits: &PackLimits,
105 kind: gix::hash::Kind,
106 ) -> Result<Option<usize>, PackError> {
107 let hash_len = kind.len_in_bytes();
108 let mut scratch = [0u8; 8192];
109 loop {
110 if self.objects_done == self.num_objects {
111 if !self.depth_checked {
112 check_depth(&self.base_of, limits.max_delta_depth)?;
113 self.depth_checked = true;
114 }
115 let total_len = (self.next_offset as usize)
116 .checked_add(hash_len)
117 .ok_or_else(|| malformed("pack length overflow"))?;
118 return Ok((pack.len() >= total_len).then_some(total_len));
119 }
120 match self.current.as_mut() {
121 Some(entry) => match entry.feed(pack, &mut scratch)? {
122 Some(next_offset) => {
123 self.next_offset = next_offset;
124 self.objects_done += 1;
125 self.current = None;
126 }
127 None => return Ok(None),
128 },
129 None => {
130 let start = self.next_offset as usize;
131 let Some(mut reader) = pack.get(start..) else {
132 return Ok(None);
133 };
134 let entry = match Entry::from_read(&mut reader, self.next_offset, hash_len) {
135 Ok(entry) => entry,
136 Err(error) => {
137 return if pack.len().saturating_sub(start) < HEADER_SLACK {
138 Ok(None)
139 } else {
140 Err(PackError::Pack(error.to_string()))
141 };
142 }
143 };
144 if entry.decompressed_size > limits.max_object_bytes {
145 return Err(PackError::LimitExceeded(PackLimit::ObjectBytes));
146 }
147 self.total_decompressed = self
148 .total_decompressed
149 .checked_add(entry.decompressed_size)
150 .ok_or_else(|| malformed("decompressed size overflow"))?;
151 if self.total_decompressed > limits.max_total_bytes {
152 return Err(PackError::LimitExceeded(PackLimit::TotalBytes));
153 }
154 if let Header::OfsDelta { base_distance } = entry.header {
155 let base = entry
156 .checked_base_pack_offset(base_distance)
157 .ok_or_else(|| malformed("ofs-delta base out of range"))?;
158 self.base_of.insert(self.next_offset, base);
159 }
160 self.current = Some(EntryInflate {
161 data_offset: entry.data_offset,
162 decompressed_size: entry.decompressed_size,
163 decompress: Decompress::new(true),
164 produced: 0,
165 });
166 }
167 }
168 }
169 }
170}
171
172pub struct ReceiveFramer {
173 limits: PackLimits,
174 kind: gix::hash::Kind,
175 pack_start: Option<usize>,
176 pack: Option<PackProgress>,
177}
178
179impl ReceiveFramer {
180 pub fn new(limits: PackLimits, kind: gix::hash::Kind) -> Self {
181 Self {
182 limits,
183 kind,
184 pack_start: None,
185 pack: None,
186 }
187 }
188
189 pub fn advance(&mut self, buf: &[u8]) -> Result<Option<usize>, PackError> {
190 let pack_start = match self.pack_start {
191 Some(start) => start,
192 None => match pack_start(buf) {
193 Some(start) => {
194 self.pack_start = Some(start);
195 start
196 }
197 None => return Ok(None),
198 },
199 };
200 let pack = &buf[pack_start..];
201 if self.pack.is_none() {
202 if pack.is_empty() {
203 return Ok(no_pack_needed(buf).then_some(pack_start));
204 }
205 if pack.len() < 12 {
206 return Ok(None);
207 }
208 if !pack.starts_with(b"PACK") {
209 return Err(malformed("packfile is missing its PACK signature"));
210 }
211 let head: [u8; 12] = pack[..12].try_into().expect("checked length");
212 let (_version, num_objects) =
213 header::decode(&head).map_err(|error| PackError::Pack(error.to_string()))?;
214 if num_objects > self.limits.max_objects {
215 return Err(PackError::LimitExceeded(PackLimit::Objects));
216 }
217 self.pack = Some(PackProgress {
218 num_objects,
219 next_offset: 12,
220 ..PackProgress::default()
221 });
222 }
223 let kind = self.kind;
224 self.pack
225 .as_mut()
226 .expect("pack progress initialized")
227 .scan(pack, &self.limits, kind)
228 .map(|done| done.map(|len| pack_start + len))
229 }
230}
231
232pub fn receive_request_complete(
233 buf: &[u8],
234 limits: &PackLimits,
235 kind: gix::hash::Kind,
236) -> Result<Option<usize>, PackError> {
237 ReceiveFramer::new(*limits, kind).advance(buf)
238}
239
240pub fn archive_request_complete(buf: &[u8]) -> Option<usize> {
241 pkt::frames(buf, Some(1)).find_map(|item| match item {
242 Ok((Frame::Flush, end)) => Some(end),
243 _ => None,
244 })
245}
246
247#[derive(Default)]
248pub struct UploadFramer {
249 scanned: usize,
250 v2: bool,
251 flushes: usize,
252 complete: Option<usize>,
253}
254
255impl UploadFramer {
256 pub fn new() -> Self {
257 Self::default()
258 }
259
260 pub fn advance(&mut self, buf: &[u8]) -> Option<usize> {
261 if self.complete.is_some() {
262 return self.complete;
263 }
264 let base = self.scanned;
265 for item in pkt::frames(&buf[base..], None) {
266 let Ok((frame, at)) = item else { break };
267 let boundary = base + at;
268 match frame {
269 Frame::Data(payload) => {
270 if payload.starts_with(b"command=") {
271 self.v2 = true;
272 }
273 let trimmed = payload
274 .iter()
275 .rposition(|byte| !byte.is_ascii_whitespace())
276 .map(|end| &payload[..=end])
277 .unwrap_or(payload);
278 if !self.v2 && trimmed == b"done" {
279 self.complete = Some(boundary);
280 return self.complete;
281 }
282 }
283 Frame::Flush => {
284 self.flushes += 1;
285 if self.v2 {
286 self.complete = Some(boundary);
287 return self.complete;
288 }
289 }
290 _ => {}
291 }
292 self.scanned = boundary;
293 }
294 None
295 }
296
297 pub fn unanswered_flushes(&self) -> usize {
298 self.flushes.saturating_sub(1)
299 }
300}
301
302pub fn upload_v0_nak() -> Vec<u8> {
303 let mut buf = Vec::new();
304 pkt::write_data(&mut buf, b"NAK\n").expect("write to in-memory buffer never fails");
305 buf
306}
307
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a protocol-v2 `fetch` request: command, delim, two wants, `done`, flush.
    fn v2_fetch_request() -> Vec<u8> {
        let mut request = Vec::new();
        pkt::write_data(&mut request, b"command=fetch\n").unwrap();
        pkt::write_delim(&mut request).unwrap();
        let arguments: [&[u8]; 3] = [
            b"want 1111111111111111111111111111111111111111\n",
            b"want 2222222222222222222222222222222222222222\n",
            b"done\n",
        ];
        for line in arguments {
            pkt::write_data(&mut request, line).unwrap();
        }
        pkt::write_flush(&mut request).unwrap();
        request
    }

    #[test]
    fn upload_framer_completes_a_v2_request_at_the_terminating_flush() {
        let request = v2_fetch_request();
        let mut framer = UploadFramer::new();
        assert_eq!(framer.advance(&request), Some(request.len()));
    }

    #[test]
    fn upload_framer_fed_one_byte_at_a_time_never_overruns_the_buffer() {
        let request = v2_fetch_request();
        let mut framer = UploadFramer::new();
        // Feed ever-longer prefixes, stopping at the first reported completion.
        let completed = (1..=request.len()).find_map(|received| {
            let len = framer.advance(&request[..received])?;
            assert!(
                len <= received,
                "advance returned {len} past buffer of {}",
                received
            );
            Some(len)
        });
        assert_eq!(
            completed,
            Some(request.len()),
            "incrementally fed request completes exactly once the whole buffer has arrived"
        );
    }

    #[test]
    fn upload_framer_counts_v0_have_batch_flushes_without_completing() {
        let mut request = Vec::new();
        pkt::write_data(&mut request, b"want 1111111111111111111111111111111111111111\n").unwrap();
        pkt::write_flush(&mut request).unwrap();
        pkt::write_data(&mut request, b"have 2222222222222222222222222222222222222222\n").unwrap();
        pkt::write_flush(&mut request).unwrap();

        let mut framer = UploadFramer::new();
        assert_eq!(framer.advance(&request), None, "v0 request is open until done");
        assert_eq!(
            framer.unanswered_flushes(),
            1,
            "two flushes seen, one have-batch awaits NAK"
        );
    }
}