Now let's take a silly one
0

Configure Feed

Select the types of activity you want to include in your feed.

File on branch `main` (13 kB) · View raw
1use std::collections::HashMap; 2 3use flate2::{Decompress, FlushDecompress, Status}; 4use gix_pack::data::{Entry, entry::Header, header}; 5 6use crate::error::{PackError, PackLimit}; 7use crate::meter::{PackLimits, check_depth, malformed}; 8use crate::pkt::{self, Frame}; 9 10const HEADER_SLACK: usize = 64; 11 12fn pack_start(buf: &[u8]) -> Option<usize> { 13 let caps = pkt::first_command(buf) 14 .map(pkt::parse_caps) 15 .unwrap_or_default(); 16 let boundary = if caps.push_options { 2 } else { 1 }; 17 pkt::frames(buf, Some(boundary)) 18 .filter_map(|item| match item { 19 Ok((Frame::Flush, at)) => Some(at), 20 _ => None, 21 }) 22 .nth(boundary - 1) 23} 24 25fn new_oid_field(line: &[u8]) -> Option<&[u8]> { 26 let line = line.split(|byte| *byte == 0).next().unwrap_or(line); 27 line.split(|byte| *byte == b' ').nth(1) 28} 29 30fn no_pack_needed(buf: &[u8]) -> bool { 31 pkt::frames(buf, Some(1)) 32 .filter_map(|item| match item { 33 Ok((Frame::Data(payload), _)) => Some(payload), 34 _ => None, 35 }) 36 .all(|line| { 37 new_oid_field(line) 38 .map(|oid| oid.iter().all(|byte| *byte == b'0')) 39 .unwrap_or(false) 40 }) 41} 42 43struct EntryInflate { 44 data_offset: u64, 45 decompressed_size: u64, 46 decompress: Decompress, 47 produced: u64, 48} 49 50impl EntryInflate { 51 fn feed(&mut self, pack: &[u8], scratch: &mut [u8]) -> Result<Option<u64>, PackError> { 52 loop { 53 let consumed = self.decompress.total_in() as usize; 54 let out_before = self.decompress.total_out(); 55 let input = pack 56 .get(self.data_offset as usize + consumed..) 
57 .unwrap_or_default(); 58 let status = self 59 .decompress 60 .decompress(input, scratch, FlushDecompress::None) 61 .map_err(|error| PackError::Pack(format!("inflate: {error}")))?; 62 self.produced += self.decompress.total_out() - out_before; 63 if self.produced > self.decompressed_size { 64 return Err(malformed("object inflates beyond its declared size")); 65 } 66 match status { 67 Status::StreamEnd => { 68 return if self.produced == self.decompressed_size { 69 self.data_offset 70 .checked_add(self.decompress.total_in()) 71 .map(Some) 72 .ok_or_else(|| malformed("pack offset overflow")) 73 } else { 74 Err(malformed("object decompressed size mismatch")) 75 }; 76 } 77 Status::Ok | Status::BufError => { 78 if self.decompress.total_in() as usize == consumed 79 && self.decompress.total_out() == out_before 80 { 81 return Ok(None); 82 } 83 } 84 } 85 } 86 } 87} 88 89#[derive(Default)] 90struct PackProgress { 91 num_objects: u32, 92 objects_done: u32, 93 next_offset: u64, 94 total_decompressed: u64, 95 base_of: HashMap<u64, u64>, 96 current: Option<EntryInflate>, 97 depth_checked: bool, 98} 99 100impl PackProgress { 101 fn scan( 102 &mut self, 103 pack: &[u8], 104 limits: &PackLimits, 105 kind: gix::hash::Kind, 106 ) -> Result<Option<usize>, PackError> { 107 let hash_len = kind.len_in_bytes(); 108 let mut scratch = [0u8; 8192]; 109 loop { 110 if self.objects_done == self.num_objects { 111 if !self.depth_checked { 112 check_depth(&self.base_of, limits.max_delta_depth)?; 113 self.depth_checked = true; 114 } 115 let total_len = (self.next_offset as usize) 116 .checked_add(hash_len) 117 .ok_or_else(|| malformed("pack length overflow"))?; 118 return Ok((pack.len() >= total_len).then_some(total_len)); 119 } 120 match self.current.as_mut() { 121 Some(entry) => match entry.feed(pack, &mut scratch)? 
{ 122 Some(next_offset) => { 123 self.next_offset = next_offset; 124 self.objects_done += 1; 125 self.current = None; 126 } 127 None => return Ok(None), 128 }, 129 None => { 130 let start = self.next_offset as usize; 131 let Some(mut reader) = pack.get(start..) else { 132 return Ok(None); 133 }; 134 let entry = match Entry::from_read(&mut reader, self.next_offset, hash_len) { 135 Ok(entry) => entry, 136 Err(error) => { 137 return if pack.len().saturating_sub(start) < HEADER_SLACK { 138 Ok(None) 139 } else { 140 Err(PackError::Pack(error.to_string())) 141 }; 142 } 143 }; 144 if entry.decompressed_size > limits.max_object_bytes { 145 return Err(PackError::LimitExceeded(PackLimit::ObjectBytes)); 146 } 147 self.total_decompressed = self 148 .total_decompressed 149 .checked_add(entry.decompressed_size) 150 .ok_or_else(|| malformed("decompressed size overflow"))?; 151 if self.total_decompressed > limits.max_total_bytes { 152 return Err(PackError::LimitExceeded(PackLimit::TotalBytes)); 153 } 154 if let Header::OfsDelta { base_distance } = entry.header { 155 let base = entry 156 .checked_base_pack_offset(base_distance) 157 .ok_or_else(|| malformed("ofs-delta base out of range"))?; 158 self.base_of.insert(self.next_offset, base); 159 } 160 self.current = Some(EntryInflate { 161 data_offset: entry.data_offset, 162 decompressed_size: entry.decompressed_size, 163 decompress: Decompress::new(true), 164 produced: 0, 165 }); 166 } 167 } 168 } 169 } 170} 171 172pub struct ReceiveFramer { 173 limits: PackLimits, 174 kind: gix::hash::Kind, 175 pack_start: Option<usize>, 176 pack: Option<PackProgress>, 177} 178 179impl ReceiveFramer { 180 pub fn new(limits: PackLimits, kind: gix::hash::Kind) -> Self { 181 Self { 182 limits, 183 kind, 184 pack_start: None, 185 pack: None, 186 } 187 } 188 189 pub fn advance(&mut self, buf: &[u8]) -> Result<Option<usize>, PackError> { 190 let pack_start = match self.pack_start { 191 Some(start) => start, 192 None => match pack_start(buf) { 193 
Some(start) => { 194 self.pack_start = Some(start); 195 start 196 } 197 None => return Ok(None), 198 }, 199 }; 200 let pack = &buf[pack_start..]; 201 if self.pack.is_none() { 202 if pack.is_empty() { 203 return Ok(no_pack_needed(buf).then_some(pack_start)); 204 } 205 if pack.len() < 12 { 206 return Ok(None); 207 } 208 if !pack.starts_with(b"PACK") { 209 return Err(malformed("packfile is missing its PACK signature")); 210 } 211 let head: [u8; 12] = pack[..12].try_into().expect("checked length"); 212 let (_version, num_objects) = 213 header::decode(&head).map_err(|error| PackError::Pack(error.to_string()))?; 214 if num_objects > self.limits.max_objects { 215 return Err(PackError::LimitExceeded(PackLimit::Objects)); 216 } 217 self.pack = Some(PackProgress { 218 num_objects, 219 next_offset: 12, 220 ..PackProgress::default() 221 }); 222 } 223 let kind = self.kind; 224 self.pack 225 .as_mut() 226 .expect("pack progress initialized") 227 .scan(pack, &self.limits, kind) 228 .map(|done| done.map(|len| pack_start + len)) 229 } 230} 231 232pub fn receive_request_complete( 233 buf: &[u8], 234 limits: &PackLimits, 235 kind: gix::hash::Kind, 236) -> Result<Option<usize>, PackError> { 237 ReceiveFramer::new(*limits, kind).advance(buf) 238} 239 240pub fn archive_request_complete(buf: &[u8]) -> Option<usize> { 241 pkt::frames(buf, Some(1)).find_map(|item| match item { 242 Ok((Frame::Flush, end)) => Some(end), 243 _ => None, 244 }) 245} 246 247#[derive(Default)] 248pub struct UploadFramer { 249 scanned: usize, 250 v2: bool, 251 flushes: usize, 252 complete: Option<usize>, 253} 254 255impl UploadFramer { 256 pub fn new() -> Self { 257 Self::default() 258 } 259 260 pub fn advance(&mut self, buf: &[u8]) -> Option<usize> { 261 if self.complete.is_some() { 262 return self.complete; 263 } 264 let base = self.scanned; 265 for item in pkt::frames(&buf[base..], None) { 266 let Ok((frame, at)) = item else { break }; 267 let boundary = base + at; 268 match frame { 269 Frame::Data(payload) => 
{ 270 if payload.starts_with(b"command=") { 271 self.v2 = true; 272 } 273 let trimmed = payload 274 .iter() 275 .rposition(|byte| !byte.is_ascii_whitespace()) 276 .map(|end| &payload[..=end]) 277 .unwrap_or(payload); 278 if !self.v2 && trimmed == b"done" { 279 self.complete = Some(boundary); 280 return self.complete; 281 } 282 } 283 Frame::Flush => { 284 self.flushes += 1; 285 if self.v2 { 286 self.complete = Some(boundary); 287 return self.complete; 288 } 289 } 290 _ => {} 291 } 292 self.scanned = boundary; 293 } 294 None 295 } 296 297 pub fn unanswered_flushes(&self) -> usize { 298 self.flushes.saturating_sub(1) 299 } 300} 301 302pub fn upload_v0_nak() -> Vec<u8> { 303 let mut buf = Vec::new(); 304 pkt::write_data(&mut buf, b"NAK\n").expect("write to in-memory buffer never fails"); 305 buf 306} 307 308#[cfg(test)] 309mod tests { 310 use super::*; 311 312 fn v2_fetch_request() -> Vec<u8> { 313 let mut buf = Vec::new(); 314 pkt::write_data(&mut buf, b"command=fetch\n").unwrap(); 315 pkt::write_delim(&mut buf).unwrap(); 316 pkt::write_data(&mut buf, b"want 1111111111111111111111111111111111111111\n").unwrap(); 317 pkt::write_data(&mut buf, b"want 2222222222222222222222222222222222222222\n").unwrap(); 318 pkt::write_data(&mut buf, b"done\n").unwrap(); 319 pkt::write_flush(&mut buf).unwrap(); 320 buf 321 } 322 323 #[test] 324 fn upload_framer_completes_a_v2_request_at_the_terminating_flush() { 325 let request = v2_fetch_request(); 326 assert_eq!(UploadFramer::new().advance(&request), Some(request.len())); 327 } 328 329 #[test] 330 fn upload_framer_fed_one_byte_at_a_time_never_overruns_the_buffer() { 331 let request = v2_fetch_request(); 332 let mut framer = UploadFramer::new(); 333 let mut buf = Vec::new(); 334 let mut completed = None; 335 for byte in &request { 336 buf.push(*byte); 337 if let Some(len) = framer.advance(&buf) { 338 assert!( 339 len <= buf.len(), 340 "advance returned {len} past buffer of {}", 341 buf.len() 342 ); 343 completed = Some(len); 344 break; 
345 } 346 } 347 assert_eq!( 348 completed, 349 Some(request.len()), 350 "incrementally fed request completes exactly once the whole buffer has arrived" 351 ); 352 } 353 354 #[test] 355 fn upload_framer_counts_v0_have_batch_flushes_without_completing() { 356 let mut buf = Vec::new(); 357 pkt::write_data(&mut buf, b"want 1111111111111111111111111111111111111111\n").unwrap(); 358 pkt::write_flush(&mut buf).unwrap(); 359 pkt::write_data(&mut buf, b"have 2222222222222222222222222222222222222222\n").unwrap(); 360 pkt::write_flush(&mut buf).unwrap(); 361 let mut framer = UploadFramer::new(); 362 assert_eq!(framer.advance(&buf), None, "v0 request is open until done"); 363 assert_eq!( 364 framer.unanswered_flushes(), 365 1, 366 "two flushes seen, one have-batch awaits NAK" 367 ); 368 } 369}