Now let's take a silly one
1use std::collections::HashSet;
2use std::error::Error;
3use std::io::Write;
4use std::path::Path;
5use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
6use std::sync::{Arc, mpsc};
7use std::time::{Duration, Instant};
8
9use gix::ObjectId;
10use gix::prelude::FindExt;
11use gix::progress::Discard;
12use gix_pack::data::{Version, output};
13use knot_types::Oid;
14
15use crate::error::PackError;
16use crate::meter::{self, PackLimits};
17use crate::resolve;
18
/// Boxed, sendable stream of object ids fed to `output::count::objects`; each item is either
/// an `ObjectId` or a boxed error.
type OidStream = Box<dyn Iterator<Item = Result<ObjectId, Box<dyn Error + Send + Sync>>> + Send>;
20
21fn odb_at(objects_dir: &Path, kind: gix::hash::Kind) -> Result<gix::odb::Handle, PackError> {
22 gix::odb::at_opts(
23 objects_dir,
24 std::iter::empty(),
25 gix::odb::store::init::Options {
26 object_hash: kind,
27 ..Default::default()
28 },
29 )
30 .map_err(|error| PackError::Pack(error.to_string()))
31}
32
33pub fn write_pack(
34 objects_dir: &Path,
35 oids: Vec<Oid>,
36 thin_bases: Option<&HashSet<Oid>>,
37 out: &mut dyn Write,
38 kind: gix::hash::Kind,
39) -> Result<(), PackError> {
40 let mut odb = odb_at(objects_dir, kind)?;
41 odb.prevent_pack_unload();
42 odb.refresh_never();
43
44 let interrupt = AtomicBool::new(false);
45 let permitted_bases: Option<HashSet<ObjectId>> =
46 thin_bases.map(|bases| bases.iter().map(|oid| oid.object_id()).collect());
47 let oids: OidStream = Box::new(oids.into_iter().map(|oid| Ok(oid.object_id())));
48
49 let (counts, _) = output::count::objects(
50 odb.clone(),
51 oids,
52 &Discard,
53 &interrupt,
54 output::count::objects::Options {
55 thread_limit: Some(1),
56 chunk_size: 50,
57 input_object_expansion: output::count::objects::ObjectExpansion::AsIs,
58 },
59 )
60 .map_err(|error| PackError::Pack(error.to_string()))?;
61
62 write_counts(counts, odb, permitted_bases, out, kind)
63}
64
65pub struct ExpandedPack {
66 counts: Vec<output::Count>,
67 odb: gix::odb::Handle,
68 kind: gix::hash::Kind,
69}
70
71impl ExpandedPack {
72 pub fn len(&self) -> usize {
73 self.counts.len()
74 }
75
76 pub fn is_empty(&self) -> bool {
77 self.counts.is_empty()
78 }
79}
80
81pub fn count_expanded(
82 objects_dir: &Path,
83 roots: Vec<Oid>,
84 max_objects: usize,
85 deadline: Instant,
86 kind: gix::hash::Kind,
87) -> Result<ExpandedPack, PackError> {
88 let mut odb = odb_at(objects_dir, kind)?;
89 odb.prevent_pack_unload();
90 odb.refresh_never();
91
92 let interrupt = Arc::new(AtomicBool::new(false));
93 let counter = Arc::new(AtomicUsize::new(0));
94 let progress = SharedCount(Arc::clone(&counter));
95 let oids: OidStream = Box::new(roots.into_iter().map(|oid| Ok(oid.object_id())));
96 let result = {
97 let _watchdog = Watchdog::arm(
98 interrupt.clone(),
99 Arc::clone(&counter),
100 max_objects,
101 deadline,
102 );
103 output::count::objects(
104 Interruptible {
105 inner: odb.clone(),
106 flag: Arc::clone(&interrupt),
107 },
108 oids,
109 &progress,
110 &interrupt,
111 output::count::objects::Options {
112 thread_limit: Some(1),
113 chunk_size: 50,
114 input_object_expansion: output::count::objects::ObjectExpansion::TreeContents,
115 },
116 )
117 };
118 let over_cap = counter.load(Ordering::Relaxed) > max_objects;
119 let timed_out = interrupt.load(Ordering::Relaxed);
120 let counts = match result {
121 Ok((counts, _)) => counts,
122 Err(_) if over_cap => return Err(PackError::SelectionTooLarge),
123 Err(_) if timed_out => return Err(PackError::SelectionTimeout),
124 Err(error) => return Err(PackError::Pack(error.to_string())),
125 };
126 if counts.len() > max_objects {
127 return Err(PackError::SelectionTooLarge);
128 }
129 if timed_out {
130 return Err(PackError::SelectionTimeout);
131 }
132 Ok(ExpandedPack { counts, odb, kind })
133}
134
135pub fn write_expanded(pack: ExpandedPack, out: &mut dyn Write) -> Result<(), PackError> {
136 write_counts(pack.counts, pack.odb, None, out, pack.kind)
137}
138
139fn write_counts(
140 counts: Vec<output::Count>,
141 odb: gix::odb::Handle,
142 permitted_bases: Option<HashSet<ObjectId>>,
143 out: &mut dyn Write,
144 kind: gix::hash::Kind,
145) -> Result<(), PackError> {
146 let num_entries = counts.len() as u32;
147 let allow_thin_pack = permitted_bases.is_some();
148 let entries = output::entry::iter_from_counts(
149 counts,
150 odb.clone(),
151 Box::new(Discard),
152 output::entry::iter_from_counts::Options {
153 thread_limit: Some(1),
154 mode: output::entry::iter_from_counts::Mode::PackCopyAndBaseObjects,
155 allow_thin_pack,
156 chunk_size: 50,
157 version: Version::V2,
158 },
159 );
160
161 let entries = entries.map(move |chunk| -> Result<Vec<output::Entry>, PackError> {
162 let (_, entries) = chunk.map_err(|error| PackError::Pack(error.to_string()))?;
163 entries
164 .into_iter()
165 .map(|entry| restrict_thin_base(&odb, permitted_bases.as_ref(), entry))
166 .collect()
167 });
168
169 let mut writer =
170 output::bytes::FromEntriesIter::new(entries, out, num_entries, Version::V2, kind);
171 writer
172 .try_fold((), |(), written| written.map(|_| ()))
173 .map_err(|error| PackError::Pack(error.to_string()))?;
174 Ok(())
175}
176
177struct SharedCount(Arc<AtomicUsize>);
178
179impl gix::progress::Count for SharedCount {
180 fn set(&self, step: usize) {
181 self.0.store(step, Ordering::Relaxed);
182 }
183
184 fn step(&self) -> usize {
185 self.0.load(Ordering::Relaxed)
186 }
187
188 fn inc_by(&self, step: usize) {
189 self.0.fetch_add(step, Ordering::Relaxed);
190 }
191
192 fn counter(&self) -> Arc<AtomicUsize> {
193 Arc::clone(&self.0)
194 }
195}
196
/// Marker error reported when an object lookup is refused because the selection budget has
/// already been spent.
#[derive(Debug)]
struct Halted;

impl std::fmt::Display for Halted {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "object enumeration halted by selection budget")
    }
}

// No underlying cause: the default `source()` (returning `None`) is what we want.
impl Error for Halted {}
207
208#[derive(Clone)]
209struct Interruptible {
210 inner: gix::odb::Handle,
211 flag: Arc<AtomicBool>,
212}
213
214impl gix_pack::Find for Interruptible {
215 fn contains(&self, id: &gix::hash::oid) -> bool {
216 self.inner.contains(id)
217 }
218
219 fn try_find_cached<'a>(
220 &self,
221 id: &gix::hash::oid,
222 buffer: &'a mut Vec<u8>,
223 pack_cache: &mut dyn gix_pack::cache::DecodeEntry,
224 ) -> Result<
225 Option<(gix::objs::Data<'a>, Option<gix_pack::data::entry::Location>)>,
226 gix::objs::find::Error,
227 > {
228 if self.flag.load(Ordering::Relaxed) {
229 return Err(Box::new(Halted));
230 }
231 self.inner.try_find_cached(id, buffer, pack_cache)
232 }
233
234 fn location_by_oid(
235 &self,
236 id: &gix::hash::oid,
237 buf: &mut Vec<u8>,
238 ) -> Option<gix_pack::data::entry::Location> {
239 self.inner.location_by_oid(id, buf)
240 }
241
242 fn pack_offsets_and_oid(
243 &self,
244 pack_id: u32,
245 ) -> Option<Vec<(gix_pack::data::Offset, gix::hash::ObjectId)>> {
246 self.inner.pack_offsets_and_oid(pack_id)
247 }
248
249 fn entry_by_location(
250 &self,
251 location: &gix_pack::data::entry::Location,
252 ) -> Option<gix_pack::find::Entry> {
253 self.inner.entry_by_location(location)
254 }
255}
256
/// Background thread that raises a shared flag once a deadline passes or a shared counter
/// exceeds a cap; dropping the guard stops and joins the thread.
struct Watchdog {
    /// Sender half of the wake channel; dropping it tells the thread to exit.
    idle: Option<mpsc::Sender<()>>,
    /// Handle of the polling thread, joined on drop.
    watch: Option<std::thread::JoinHandle<()>>,
}

impl Watchdog {
    /// Spawns the polling thread.
    ///
    /// The thread checks the budget immediately and then again every 25 ms. It sets `flag` and
    /// exits as soon as `deadline` has been reached or `counter` is strictly greater than
    /// `max_objects`; it exits without touching `flag` when the guard is dropped first.
    fn arm(
        flag: Arc<AtomicBool>,
        counter: Arc<AtomicUsize>,
        max_objects: usize,
        deadline: Instant,
    ) -> Self {
        let (idle, wake) = mpsc::channel::<()>();
        let watch = std::thread::spawn(move || {
            let poll = Duration::from_millis(25);
            let budget_spent =
                || Instant::now() >= deadline || counter.load(Ordering::Relaxed) > max_objects;
            while !budget_spent() {
                match wake.recv_timeout(poll) {
                    // Nothing happened during this poll interval: check the budget again.
                    Err(mpsc::RecvTimeoutError::Timeout) => {}
                    // A message or a hang-up both mean the guard no longer needs us.
                    Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => return,
                }
            }
            flag.store(true, Ordering::Relaxed);
        });
        Self {
            idle: Some(idle),
            watch: Some(watch),
        }
    }
}

impl Drop for Watchdog {
    fn drop(&mut self) {
        // Hang up first so the thread wakes from `recv_timeout` right away, then wait for it.
        drop(self.idle.take());
        if let Some(handle) = self.watch.take() {
            let _ = handle.join();
        }
    }
}
300
301fn restrict_thin_base(
302 odb: &gix::odb::Handle,
303 permitted: Option<&HashSet<ObjectId>>,
304 entry: output::Entry,
305) -> Result<output::Entry, PackError> {
306 let base = match &entry.kind {
307 output::entry::Kind::DeltaOid { id } => *id,
308 _ => return Ok(entry),
309 };
310 if permitted.is_some_and(|bases| bases.contains(&base)) {
311 return Ok(entry);
312 }
313 let mut buf = Vec::new();
314 let object = odb
315 .find(&entry.id, &mut buf)
316 .map_err(|error| PackError::Pack(error.to_string()))?;
317 let count = output::Count::from_data(entry.id, None);
318 output::Entry::from_data(&count, &object).map_err(|error| PackError::Pack(error.to_string()))
319}
320
321pub fn index_pack(
322 objects_dir: &Path,
323 pack: &[u8],
324 limits: &PackLimits,
325 kind: gix::hash::Kind,
326) -> Result<(), PackError> {
327 if pack.is_empty() {
328 return Ok(());
329 }
330 if !pack.starts_with(b"PACK") {
331 return Err(PackError::Pack(
332 "packfile is missing its PACK signature".to_string(),
333 ));
334 }
335 meter::meter(pack, limits, kind)?;
336 write_with_gix(objects_dir, pack, kind)
337 .or_else(|_| resolve::resolve(objects_dir, pack, limits, kind))
338}
339
340fn write_with_gix(objects_dir: &Path, pack: &[u8], kind: gix::hash::Kind) -> Result<(), PackError> {
341 let odb = odb_at(objects_dir, kind)?;
342 let interrupt = AtomicBool::new(false);
343 let mut reader = std::io::Cursor::new(pack);
344 gix_pack::Bundle::write_to_directory(
345 &mut reader,
346 Some(&objects_dir.join("pack")),
347 &mut Discard,
348 &interrupt,
349 Some(odb),
350 gix_pack::bundle::write::Options {
351 thread_limit: Some(1),
352 iteration_mode: gix_pack::data::input::Mode::Verify,
353 index_version: gix_pack::index::Version::default(),
354 object_hash: kind,
355 },
356 )
357 .map(|_| ())
358 .map_err(|error| PackError::Pack(error.to_string()))
359}
360
#[cfg(test)]
mod tests {
    use gix_pack::Find;

    use super::*;

    /// An `Interruptible` over an empty store reports a plain miss until the flag is raised,
    /// and an error for the very same lookup afterwards.
    #[test]
    fn interruptible_find_errors_once_the_flag_is_set() {
        let scratch = tempfile::tempdir().unwrap();
        let tripped = Arc::new(AtomicBool::new(false));
        let lookup = Interruptible {
            inner: gix::odb::at(scratch.path()).unwrap(),
            flag: Arc::clone(&tripped),
        };
        let missing = Oid::null().object_id();
        let mut buffer = Vec::new();

        let missed_before_trip = lookup.try_find(&missing, &mut buffer).unwrap().is_none();
        assert!(
            missed_before_trip,
            "before budget trips, lookup of an absent object is a plain miss"
        );

        tripped.store(true, Ordering::Relaxed);
        let failed_after_trip = lookup.try_find(&missing, &mut buffer).is_err();
        assert!(
            failed_after_trip,
            "once tripped, every decode errors, which is what aborts a breadthfirst tree walk \
             mid-closure instead of waiting for the next commit-root boundary"
        );
    }
}