Now let's take a silly one
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 9.4 kB View raw
1use std::io; 2 3use gix_packetline::{Channel, blocking_io::encode}; 4 5pub const MAX_BAND: usize = 65515; 6 7pub enum Frame<'a> { 8 Data(&'a [u8]), 9 Flush, 10 Delim, 11 ResponseEnd, 12} 13 14fn hex4(prefix: &[u8]) -> io::Result<u16> { 15 std::str::from_utf8(prefix) 16 .ok() 17 .and_then(|text| u16::from_str_radix(text, 16).ok()) 18 .ok_or_else(|| io::Error::other("invalid pkt-line length prefix")) 19} 20 21pub fn frames( 22 input: &[u8], 23 stop_after_flushes: Option<usize>, 24) -> impl Iterator<Item = io::Result<(Frame<'_>, usize)>> + '_ { 25 let mut pos = 0usize; 26 let mut flushes = 0usize; 27 let mut stopped = false; 28 std::iter::from_fn(move || { 29 (!stopped && pos + 4 <= input.len()).then(|| { 30 let frame = hex4(&input[pos..pos + 4]).and_then(|len| { 31 pos += 4; 32 match len { 33 0 => { 34 flushes += 1; 35 stopped = stop_after_flushes == Some(flushes); 36 Ok((Frame::Flush, pos)) 37 } 38 1 => Ok((Frame::Delim, pos)), 39 2 => Ok((Frame::ResponseEnd, pos)), 40 3 => Err(io::Error::other("invalid pkt-line length 3")), 41 n => { 42 let end = pos - 4 + usize::from(n); 43 (end <= input.len()) 44 .then(|| { 45 let payload = &input[pos..end]; 46 pos = end; 47 (Frame::Data(payload), end) 48 }) 49 .ok_or_else(|| io::Error::other("truncated pkt-line")) 50 } 51 } 52 }); 53 stopped |= frame.is_err(); 54 frame 55 }) 56 }) 57} 58 59pub fn data_payloads(input: &[u8]) -> io::Result<Vec<&[u8]>> { 60 collect_data(input, Some(1)) 61} 62 63pub fn data_payloads_all(input: &[u8]) -> io::Result<Vec<&[u8]>> { 64 collect_data(input, None) 65} 66 67fn collect_data(input: &[u8], stop_after_flushes: Option<usize>) -> io::Result<Vec<&[u8]>> { 68 frames(input, stop_after_flushes) 69 .filter_map(|item| match item { 70 Ok((Frame::Data(payload), _)) => Some(Ok(payload)), 71 Ok(_) => None, 72 Err(err) => Some(Err(err)), 73 }) 74 .collect() 75} 76 77#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] 78pub struct Caps { 79 pub atomic: bool, 80 pub side_band_64k: bool, 81 pub push_options: bool, 82} 83 84pub(crate) fn first_command(input: &[u8]) -> Option<&[u8]> { 85 frames(input, Some(1)).find_map(|item| match item { 86 Ok((Frame::Data(payload), _)) => Some(payload), 87 _ => None, 88 }) 89} 90 91pub(crate) fn parse_caps(first_command: &[u8]) -> Caps { 92 let caps = first_command 93 .split(|byte| *byte == 0) 94 .nth(1) 95 .and_then(|caps| std::str::from_utf8(caps).ok()) 96 .unwrap_or_default(); 97 let has = |needle: &str| caps.split_whitespace().any(|cap| cap == needle); 98 Caps { 99 atomic: has("atomic"), 100 side_band_64k: has("side-band-64k"), 101 push_options: has("push-options"), 102 } 103} 104 105pub struct Receive<'a> { 106 pub commands: Vec<&'a [u8]>, 107 pub options: Vec<&'a [u8]>, 108 pub pack: &'a [u8], 109 pub caps: Caps, 110} 111 112pub fn split_receive(input: &[u8]) -> io::Result<Receive<'_>> { 113 let caps = first_command(input).map(parse_caps).unwrap_or_default(); 114 let boundary = if caps.push_options { 2 } else { 1 }; 115 frames(input, Some(boundary)) 116 .try_fold( 117 (Vec::new(), Vec::new(), 0usize, None), 118 |(mut commands, mut options, flushes, end), item| { 119 item.map(|(frame, at)| match frame { 120 Frame::Data(payload) if flushes == 0 => { 121 commands.push(payload); 122 (commands, options, flushes, end) 123 } 124 Frame::Data(payload) if flushes == 1 && caps.push_options => { 125 options.push(payload); 126 (commands, options, flushes, end) 127 } 128 Frame::Data(_) => (commands, options, flushes, end), 129 Frame::Flush => (commands, options, flushes + 1, Some(at)), 130 _ => (commands, options, flushes, end), 131 }) 132 }, 133 ) 134 .map(|(commands, options, _flushes, end)| Receive { 135 commands, 136 options, 137 caps, 138 pack: &input[end.unwrap_or(input.len())..], 139 }) 140} 141 142pub fn write_data(buf: &mut Vec<u8>, payload: &[u8]) -> io::Result<()> { 143 encode::data_to_write(payload, buf).map(|_| ()) 144} 145 146pub fn write_flush(buf: &mut Vec<u8>) -> io::Result<()> { 147 encode::flush_to_write(buf).map(|_| ()) 148} 149 150pub fn write_delim(buf: &mut Vec<u8>) -> io::Result<()> { 151 encode::delim_to_write(buf).map(|_| ()) 152} 153 154pub fn write_band(buf: &mut Vec<u8>, chunk: &[u8]) -> io::Result<()> { 155 encode::band_to_write(Channel::Data, chunk, buf).map(|_| ()) 156} 157 158pub fn write_band_progress(buf: &mut Vec<u8>, message: &[u8]) -> io::Result<()> { 159 encode::band_to_write(Channel::Progress, message, buf).map(|_| ()) 160} 161 162pub fn write_band_error(buf: &mut Vec<u8>, message: &[u8]) -> io::Result<()> { 163 encode::band_to_write(Channel::Error, message, buf).map(|_| ()) 164} 165 166pub fn frame_report(report: &[u8], messages: &[String], side_band: bool) -> Vec<u8> { 167 if !side_band { 168 return report.to_vec(); 169 } 170 let mut buf = Vec::new(); 171 report 172 .chunks(MAX_BAND) 173 .for_each(|chunk| write_band(&mut buf, chunk).expect("band write to vec never fails")); 174 messages.iter().for_each(|message| { 175 format!("{message}\n") 176 .into_bytes() 177 .chunks(MAX_BAND) 178 .for_each(|chunk| { 179 write_band_progress(&mut buf, chunk).expect("band write to vec never fails") 180 }); 181 }); 182 write_flush(&mut buf).expect("flush write to vec never fails"); 183 buf 184} 185 186#[cfg(test)] 187mod tests { 188 use super::*; 189 190 fn command_line(caps: &str) -> Vec<u8> { 191 let mut line = b"\ 192 0000000000000000000000000000000000000000 \ 193 1111111111111111111111111111111111111111 refs/heads/main" 194 .to_vec(); 195 line.push(0); 196 line.extend_from_slice(caps.as_bytes()); 197 line.push(b'\n'); 198 line 199 } 200 201 #[test] 202 fn a_malformed_length_prefix_terminates_instead_of_spinning() { 203 let garbage = b"zzzz this is not a pkt-line stream at all"; 204 assert_eq!( 205 frames(garbage, None).count(), 206 1, 207 "bad length prefix yields one error frame then the stream ends" 208 ); 209 assert!( 210 first_command(garbage).is_none(), 211 "no command is parsed out of garbage, and scan does not loop" 212 ); 213 assert!( 214 split_receive(garbage).is_err(), 215 "malformed prefix is a parse error, never an infinite loop" 216 ); 217 } 218 219 #[test] 220 fn split_receive_skips_the_push_options_section_before_the_pack() { 221 let mut body = Vec::new(); 222 write_data( 223 &mut body, 224 &command_line("report-status side-band-64k push-options"), 225 ) 226 .unwrap(); 227 write_flush(&mut body).unwrap(); 228 write_data(&mut body, b"ci-skip").unwrap(); 229 write_data(&mut body, b"verbose-ci").unwrap(); 230 write_flush(&mut body).unwrap(); 231 body.extend_from_slice(b"PACKreal-pack-bytes"); 232 233 let parsed = split_receive(&body).unwrap(); 234 assert!(parsed.caps.push_options); 235 assert!(parsed.caps.side_band_64k); 236 assert_eq!(parsed.commands.len(), 1); 237 assert_eq!(parsed.options, vec![&b"ci-skip"[..], &b"verbose-ci"[..]]); 238 assert_eq!(parsed.pack, b"PACKreal-pack-bytes"); 239 } 240 241 #[test] 242 fn split_receive_without_push_options_starts_the_pack_after_the_command_flush() { 243 let mut body = Vec::new(); 244 write_data(&mut body, &command_line("report-status side-band-64k")).unwrap(); 245 write_flush(&mut body).unwrap(); 246 body.extend_from_slice(b"PACKbytes"); 247 248 let parsed = split_receive(&body).unwrap(); 249 assert!(!parsed.caps.push_options); 250 assert!(parsed.options.is_empty()); 251 assert_eq!(parsed.pack, b"PACKbytes"); 252 } 253 254 #[test] 255 fn frame_report_muxes_the_report_on_band_one_and_messages_on_band_two() { 256 let report = b"unpack ok\n"; 257 let messages = vec!["hello there".to_string()]; 258 let framed = frame_report(report, &messages, true); 259 260 let bands: Vec<(u8, Vec<u8>)> = frames(&framed, None) 261 .filter_map(|item| match item { 262 Ok((Frame::Data(payload), _)) => Some((payload[0], payload[1..].to_vec())), 263 _ => None, 264 }) 265 .collect(); 266 assert_eq!(bands[0].0, 1, "report rides band 1"); 267 assert_eq!(bands[0].1, report); 268 assert_eq!(bands[1].0, 2, "message rides band 2"); 269 assert_eq!(bands[1].1, b"hello there\n"); 270 assert!(framed.ends_with(b"0000"), "outer flush closes the stream"); 271 } 272 273 #[test] 274 fn frame_report_passes_through_raw_without_side_band() { 275 let report = b"unpack ok\n0000"; 276 assert_eq!( 277 frame_report(report, &["dropped".to_string()], false), 278 report 279 ); 280 } 281}