Now let's take a silly one
1use std::io;
2
3use gix_packetline::{Channel, blocking_io::encode};
4
/// Largest chunk of payload bytes placed into a single side-band packet.
///
/// Derived from the 65520-byte pkt-line size cap: 4 bytes go to the hex length
/// prefix and 1 byte to the band code, leaving 65515 bytes for the chunk itself.
pub const MAX_BAND: usize = 65520 - 4 - 1;
6
/// One decoded pkt-line frame, borrowing its payload from the scanned input.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Frame<'a> {
    /// A data line; the slice is the payload that follows the 4-byte length prefix.
    Data(&'a [u8]),
    /// The flush packet (length prefix `0000`).
    Flush,
    /// The delimiter packet (length prefix `0001`).
    Delim,
    /// The response-end packet (length prefix `0002`).
    ResponseEnd,
}
13
/// Decodes a pkt-line length prefix: exactly four ASCII hexadecimal digits.
///
/// Both lowercase and uppercase digits are accepted.
///
/// # Errors
/// Returns an error when `prefix` is not exactly four hex digits. The digits are
/// validated explicitly because `u16::from_str_radix` on its own also accepts a
/// leading `+` sign and shorter inputs, neither of which is a valid prefix.
fn hex4(prefix: &[u8]) -> io::Result<u16> {
    let invalid = || io::Error::other("invalid pkt-line length prefix");
    if prefix.len() != 4 || !prefix.iter().all(u8::is_ascii_hexdigit) {
        return Err(invalid());
    }
    std::str::from_utf8(prefix)
        .ok()
        .and_then(|text| u16::from_str_radix(text, 16).ok())
        .ok_or_else(invalid)
}
20
21pub fn frames(
22 input: &[u8],
23 stop_after_flushes: Option<usize>,
24) -> impl Iterator<Item = io::Result<(Frame<'_>, usize)>> + '_ {
25 let mut pos = 0usize;
26 let mut flushes = 0usize;
27 let mut stopped = false;
28 std::iter::from_fn(move || {
29 (!stopped && pos + 4 <= input.len()).then(|| {
30 let frame = hex4(&input[pos..pos + 4]).and_then(|len| {
31 pos += 4;
32 match len {
33 0 => {
34 flushes += 1;
35 stopped = stop_after_flushes == Some(flushes);
36 Ok((Frame::Flush, pos))
37 }
38 1 => Ok((Frame::Delim, pos)),
39 2 => Ok((Frame::ResponseEnd, pos)),
40 3 => Err(io::Error::other("invalid pkt-line length 3")),
41 n => {
42 let end = pos - 4 + usize::from(n);
43 (end <= input.len())
44 .then(|| {
45 let payload = &input[pos..end];
46 pos = end;
47 (Frame::Data(payload), end)
48 })
49 .ok_or_else(|| io::Error::other("truncated pkt-line"))
50 }
51 }
52 });
53 stopped |= frame.is_err();
54 frame
55 })
56 })
57}
58
59pub fn data_payloads(input: &[u8]) -> io::Result<Vec<&[u8]>> {
60 collect_data(input, Some(1))
61}
62
63pub fn data_payloads_all(input: &[u8]) -> io::Result<Vec<&[u8]>> {
64 collect_data(input, None)
65}
66
67fn collect_data(input: &[u8], stop_after_flushes: Option<usize>) -> io::Result<Vec<&[u8]>> {
68 frames(input, stop_after_flushes)
69 .filter_map(|item| match item {
70 Ok((Frame::Data(payload), _)) => Some(Ok(payload)),
71 Ok(_) => None,
72 Err(err) => Some(Err(err)),
73 })
74 .collect()
75}
76
/// The capabilities this module looks for on the first command line of a push.
///
/// Every flag defaults to `false`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct Caps {
    /// The `atomic` capability token was present.
    pub atomic: bool,
    /// The `side-band-64k` capability token was present.
    pub side_band_64k: bool,
    /// The `push-options` capability token was present.
    pub push_options: bool,
}
83
84pub(crate) fn first_command(input: &[u8]) -> Option<&[u8]> {
85 frames(input, Some(1)).find_map(|item| match item {
86 Ok((Frame::Data(payload), _)) => Some(payload),
87 _ => None,
88 })
89}
90
91pub(crate) fn parse_caps(first_command: &[u8]) -> Caps {
92 let caps = first_command
93 .split(|byte| *byte == 0)
94 .nth(1)
95 .and_then(|caps| std::str::from_utf8(caps).ok())
96 .unwrap_or_default();
97 let has = |needle: &str| caps.split_whitespace().any(|cap| cap == needle);
98 Caps {
99 atomic: has("atomic"),
100 side_band_64k: has("side-band-64k"),
101 push_options: has("push-options"),
102 }
103}
104
105pub struct Receive<'a> {
106 pub commands: Vec<&'a [u8]>,
107 pub options: Vec<&'a [u8]>,
108 pub pack: &'a [u8],
109 pub caps: Caps,
110}
111
112pub fn split_receive(input: &[u8]) -> io::Result<Receive<'_>> {
113 let caps = first_command(input).map(parse_caps).unwrap_or_default();
114 let boundary = if caps.push_options { 2 } else { 1 };
115 frames(input, Some(boundary))
116 .try_fold(
117 (Vec::new(), Vec::new(), 0usize, None),
118 |(mut commands, mut options, flushes, end), item| {
119 item.map(|(frame, at)| match frame {
120 Frame::Data(payload) if flushes == 0 => {
121 commands.push(payload);
122 (commands, options, flushes, end)
123 }
124 Frame::Data(payload) if flushes == 1 && caps.push_options => {
125 options.push(payload);
126 (commands, options, flushes, end)
127 }
128 Frame::Data(_) => (commands, options, flushes, end),
129 Frame::Flush => (commands, options, flushes + 1, Some(at)),
130 _ => (commands, options, flushes, end),
131 })
132 },
133 )
134 .map(|(commands, options, _flushes, end)| Receive {
135 commands,
136 options,
137 caps,
138 pack: &input[end.unwrap_or(input.len())..],
139 })
140}
141
142pub fn write_data(buf: &mut Vec<u8>, payload: &[u8]) -> io::Result<()> {
143 encode::data_to_write(payload, buf).map(|_| ())
144}
145
146pub fn write_flush(buf: &mut Vec<u8>) -> io::Result<()> {
147 encode::flush_to_write(buf).map(|_| ())
148}
149
150pub fn write_delim(buf: &mut Vec<u8>) -> io::Result<()> {
151 encode::delim_to_write(buf).map(|_| ())
152}
153
154pub fn write_band(buf: &mut Vec<u8>, chunk: &[u8]) -> io::Result<()> {
155 encode::band_to_write(Channel::Data, chunk, buf).map(|_| ())
156}
157
158pub fn write_band_progress(buf: &mut Vec<u8>, message: &[u8]) -> io::Result<()> {
159 encode::band_to_write(Channel::Progress, message, buf).map(|_| ())
160}
161
162pub fn write_band_error(buf: &mut Vec<u8>, message: &[u8]) -> io::Result<()> {
163 encode::band_to_write(Channel::Error, message, buf).map(|_| ())
164}
165
166pub fn frame_report(report: &[u8], messages: &[String], side_band: bool) -> Vec<u8> {
167 if !side_band {
168 return report.to_vec();
169 }
170 let mut buf = Vec::new();
171 report
172 .chunks(MAX_BAND)
173 .for_each(|chunk| write_band(&mut buf, chunk).expect("band write to vec never fails"));
174 messages.iter().for_each(|message| {
175 format!("{message}\n")
176 .into_bytes()
177 .chunks(MAX_BAND)
178 .for_each(|chunk| {
179 write_band_progress(&mut buf, chunk).expect("band write to vec never fails")
180 });
181 });
182 write_flush(&mut buf).expect("flush write to vec never fails");
183 buf
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a ref-update command line for `refs/heads/main`, followed by a
    /// NUL byte, the given capability list and a trailing newline.
    fn command_line(caps: &str) -> Vec<u8> {
        let update = b"\
            0000000000000000000000000000000000000000 \
            1111111111111111111111111111111111111111 refs/heads/main";
        let mut line = Vec::with_capacity(update.len() + caps.len() + 2);
        line.extend_from_slice(update);
        line.push(0);
        line.extend_from_slice(caps.as_bytes());
        line.push(b'\n');
        line
    }

    /// Input that does not start with a hex length must produce one error and
    /// then end, for every entry point that scans frames.
    #[test]
    fn a_malformed_length_prefix_terminates_instead_of_spinning() {
        let garbage = b"zzzz this is not a pkt-line stream at all";
        let yielded = frames(garbage, None).count();
        assert_eq!(
            yielded, 1,
            "bad length prefix yields one error frame then the stream ends"
        );
        assert!(
            first_command(garbage).is_none(),
            "no command is parsed out of garbage, and scan does not loop"
        );
        assert!(
            split_receive(garbage).is_err(),
            "malformed prefix is a parse error, never an infinite loop"
        );
    }

    /// With `push-options` advertised, the section between the two flushes is
    /// collected as options and the pack starts after the second flush.
    #[test]
    fn split_receive_skips_the_push_options_section_before_the_pack() {
        let mut wire = Vec::new();
        let command = command_line("report-status side-band-64k push-options");
        write_data(&mut wire, &command).unwrap();
        write_flush(&mut wire).unwrap();
        for option in [&b"ci-skip"[..], &b"verbose-ci"[..]] {
            write_data(&mut wire, option).unwrap();
        }
        write_flush(&mut wire).unwrap();
        wire.extend_from_slice(b"PACKreal-pack-bytes");

        let parsed = split_receive(&wire).unwrap();
        assert!(parsed.caps.push_options);
        assert!(parsed.caps.side_band_64k);
        assert_eq!(parsed.commands.len(), 1);
        assert_eq!(parsed.options, vec![&b"ci-skip"[..], &b"verbose-ci"[..]]);
        assert_eq!(parsed.pack, b"PACKreal-pack-bytes");
    }

    /// Without `push-options`, everything after the command flush is the pack.
    #[test]
    fn split_receive_without_push_options_starts_the_pack_after_the_command_flush() {
        let mut wire = Vec::new();
        let command = command_line("report-status side-band-64k");
        write_data(&mut wire, &command).unwrap();
        write_flush(&mut wire).unwrap();
        wire.extend_from_slice(b"PACKbytes");

        let parsed = split_receive(&wire).unwrap();
        assert!(!parsed.caps.push_options);
        assert!(parsed.options.is_empty());
        assert_eq!(parsed.pack, b"PACKbytes");
    }

    /// The first payload byte of each frame is the band code; the rest is the
    /// band's content.
    #[test]
    fn frame_report_muxes_the_report_on_band_one_and_messages_on_band_two() {
        let report = b"unpack ok\n";
        let messages = vec!["hello there".to_string()];
        let framed = frame_report(report, &messages, true);

        let mut bands: Vec<(u8, Vec<u8>)> = Vec::new();
        for item in frames(&framed, None) {
            if let Ok((Frame::Data(payload), _)) = item {
                bands.push((payload[0], payload[1..].to_vec()));
            }
        }
        assert_eq!(bands[0].0, 1, "report rides band 1");
        assert_eq!(bands[0].1, report);
        assert_eq!(bands[1].0, 2, "message rides band 2");
        assert_eq!(bands[1].1, b"hello there\n");
        assert!(framed.ends_with(b"0000"), "outer flush closes the stream");
    }

    /// Without side-band the report is returned verbatim and messages vanish.
    #[test]
    fn frame_report_passes_through_raw_without_side_band() {
        let report = b"unpack ok\n0000";
        let framed = frame_report(report, &["dropped".to_string()], false);
        assert_eq!(framed, report);
    }
}