Now let's take a silly one
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 11 kB View raw
1use std::collections::HashSet; 2use std::error::Error; 3use std::io::Write; 4use std::path::Path; 5use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; 6use std::sync::{Arc, mpsc}; 7use std::time::{Duration, Instant}; 8 9use gix::ObjectId; 10use gix::prelude::FindExt; 11use gix::progress::Discard; 12use gix_pack::data::{Version, output}; 13use knot_types::Oid; 14 15use crate::error::PackError; 16use crate::meter::{self, PackLimits}; 17use crate::resolve; 18 19type OidStream = Box<dyn Iterator<Item = Result<ObjectId, Box<dyn Error + Send + Sync>>> + Send>; 20 21fn odb_at(objects_dir: &Path, kind: gix::hash::Kind) -> Result<gix::odb::Handle, PackError> { 22 gix::odb::at_opts( 23 objects_dir, 24 std::iter::empty(), 25 gix::odb::store::init::Options { 26 object_hash: kind, 27 ..Default::default() 28 }, 29 ) 30 .map_err(|error| PackError::Pack(error.to_string())) 31} 32 33pub fn write_pack( 34 objects_dir: &Path, 35 oids: Vec<Oid>, 36 thin_bases: Option<&HashSet<Oid>>, 37 out: &mut dyn Write, 38 kind: gix::hash::Kind, 39) -> Result<(), PackError> { 40 let mut odb = odb_at(objects_dir, kind)?; 41 odb.prevent_pack_unload(); 42 odb.refresh_never(); 43 44 let interrupt = AtomicBool::new(false); 45 let permitted_bases: Option<HashSet<ObjectId>> = 46 thin_bases.map(|bases| bases.iter().map(|oid| oid.object_id()).collect()); 47 let oids: OidStream = Box::new(oids.into_iter().map(|oid| Ok(oid.object_id()))); 48 49 let (counts, _) = output::count::objects( 50 odb.clone(), 51 oids, 52 &Discard, 53 &interrupt, 54 output::count::objects::Options { 55 thread_limit: Some(1), 56 chunk_size: 50, 57 input_object_expansion: output::count::objects::ObjectExpansion::AsIs, 58 }, 59 ) 60 .map_err(|error| PackError::Pack(error.to_string()))?; 61 62 write_counts(counts, odb, permitted_bases, out, kind) 63} 64 65pub struct ExpandedPack { 66 counts: Vec<output::Count>, 67 odb: gix::odb::Handle, 68 kind: gix::hash::Kind, 69} 70 71impl ExpandedPack { 72 pub fn len(&self) -> usize { 73 self.counts.len() 74 } 75 76 pub fn is_empty(&self) -> bool { 77 self.counts.is_empty() 78 } 79} 80 81pub fn count_expanded( 82 objects_dir: &Path, 83 roots: Vec<Oid>, 84 max_objects: usize, 85 deadline: Instant, 86 kind: gix::hash::Kind, 87) -> Result<ExpandedPack, PackError> { 88 let mut odb = odb_at(objects_dir, kind)?; 89 odb.prevent_pack_unload(); 90 odb.refresh_never(); 91 92 let interrupt = Arc::new(AtomicBool::new(false)); 93 let counter = Arc::new(AtomicUsize::new(0)); 94 let progress = SharedCount(Arc::clone(&counter)); 95 let oids: OidStream = Box::new(roots.into_iter().map(|oid| Ok(oid.object_id()))); 96 let result = { 97 let _watchdog = Watchdog::arm( 98 interrupt.clone(), 99 Arc::clone(&counter), 100 max_objects, 101 deadline, 102 ); 103 output::count::objects( 104 Interruptible { 105 inner: odb.clone(), 106 flag: Arc::clone(&interrupt), 107 }, 108 oids, 109 &progress, 110 &interrupt, 111 output::count::objects::Options { 112 thread_limit: Some(1), 113 chunk_size: 50, 114 input_object_expansion: output::count::objects::ObjectExpansion::TreeContents, 115 }, 116 ) 117 }; 118 let over_cap = counter.load(Ordering::Relaxed) > max_objects; 119 let timed_out = interrupt.load(Ordering::Relaxed); 120 let counts = match result { 121 Ok((counts, _)) => counts, 122 Err(_) if over_cap => return Err(PackError::SelectionTooLarge), 123 Err(_) if timed_out => return Err(PackError::SelectionTimeout), 124 Err(error) => return Err(PackError::Pack(error.to_string())), 125 }; 126 if counts.len() > max_objects { 127 return Err(PackError::SelectionTooLarge); 128 } 129 if timed_out { 130 return Err(PackError::SelectionTimeout); 131 } 132 Ok(ExpandedPack { counts, odb, kind }) 133} 134 135pub fn write_expanded(pack: ExpandedPack, out: &mut dyn Write) -> Result<(), PackError> { 136 write_counts(pack.counts, pack.odb, None, out, pack.kind) 137} 138 139fn write_counts( 140 counts: Vec<output::Count>, 141 odb: gix::odb::Handle, 142 permitted_bases: Option<HashSet<ObjectId>>, 143 out: &mut dyn Write, 144 kind: gix::hash::Kind, 145) -> Result<(), PackError> { 146 let num_entries = counts.len() as u32; 147 let allow_thin_pack = permitted_bases.is_some(); 148 let entries = output::entry::iter_from_counts( 149 counts, 150 odb.clone(), 151 Box::new(Discard), 152 output::entry::iter_from_counts::Options { 153 thread_limit: Some(1), 154 mode: output::entry::iter_from_counts::Mode::PackCopyAndBaseObjects, 155 allow_thin_pack, 156 chunk_size: 50, 157 version: Version::V2, 158 }, 159 ); 160 161 let entries = entries.map(move |chunk| -> Result<Vec<output::Entry>, PackError> { 162 let (_, entries) = chunk.map_err(|error| PackError::Pack(error.to_string()))?; 163 entries 164 .into_iter() 165 .map(|entry| restrict_thin_base(&odb, permitted_bases.as_ref(), entry)) 166 .collect() 167 }); 168 169 let mut writer = 170 output::bytes::FromEntriesIter::new(entries, out, num_entries, Version::V2, kind); 171 writer 172 .try_fold((), |(), written| written.map(|_| ())) 173 .map_err(|error| PackError::Pack(error.to_string()))?; 174 Ok(()) 175} 176 177struct SharedCount(Arc<AtomicUsize>); 178 179impl gix::progress::Count for SharedCount { 180 fn set(&self, step: usize) { 181 self.0.store(step, Ordering::Relaxed); 182 } 183 184 fn step(&self) -> usize { 185 self.0.load(Ordering::Relaxed) 186 } 187 188 fn inc_by(&self, step: usize) { 189 self.0.fetch_add(step, Ordering::Relaxed); 190 } 191 192 fn counter(&self) -> Arc<AtomicUsize> { 193 Arc::clone(&self.0) 194 } 195} 196 197#[derive(Debug)] 198struct Halted; 199 200impl std::fmt::Display for Halted { 201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 202 f.write_str("object enumeration halted by selection budget") 203 } 204} 205 206impl Error for Halted {} 207 208#[derive(Clone)] 209struct Interruptible { 210 inner: gix::odb::Handle, 211 flag: Arc<AtomicBool>, 212} 213 214impl gix_pack::Find for Interruptible { 215 fn contains(&self, id: &gix::hash::oid) -> bool { 216 self.inner.contains(id) 217 } 218 219 fn try_find_cached<'a>( 220 &self, 221 id: &gix::hash::oid, 222 buffer: &'a mut Vec<u8>, 223 pack_cache: &mut dyn gix_pack::cache::DecodeEntry, 224 ) -> Result< 225 Option<(gix::objs::Data<'a>, Option<gix_pack::data::entry::Location>)>, 226 gix::objs::find::Error, 227 > { 228 if self.flag.load(Ordering::Relaxed) { 229 return Err(Box::new(Halted)); 230 } 231 self.inner.try_find_cached(id, buffer, pack_cache) 232 } 233 234 fn location_by_oid( 235 &self, 236 id: &gix::hash::oid, 237 buf: &mut Vec<u8>, 238 ) -> Option<gix_pack::data::entry::Location> { 239 self.inner.location_by_oid(id, buf) 240 } 241 242 fn pack_offsets_and_oid( 243 &self, 244 pack_id: u32, 245 ) -> Option<Vec<(gix_pack::data::Offset, gix::hash::ObjectId)>> { 246 self.inner.pack_offsets_and_oid(pack_id) 247 } 248 249 fn entry_by_location( 250 &self, 251 location: &gix_pack::data::entry::Location, 252 ) -> Option<gix_pack::find::Entry> { 253 self.inner.entry_by_location(location) 254 } 255} 256 257struct Watchdog { 258 idle: Option<mpsc::Sender<()>>, 259 watch: Option<std::thread::JoinHandle<()>>, 260} 261 262impl Watchdog { 263 fn arm( 264 flag: Arc<AtomicBool>, 265 counter: Arc<AtomicUsize>, 266 max_objects: usize, 267 deadline: Instant, 268 ) -> Self { 269 let (idle, wake) = mpsc::channel::<()>(); 270 let watch = std::thread::spawn(move || { 271 let poll = Duration::from_millis(25); 272 loop { 273 if Instant::now() >= deadline || counter.load(Ordering::Relaxed) > max_objects { 274 flag.store(true, Ordering::Relaxed); 275 return; 276 } 277 if !matches!( 278 wake.recv_timeout(poll), 279 Err(mpsc::RecvTimeoutError::Timeout) 280 ) { 281 return; 282 } 283 } 284 }); 285 Self { 286 idle: Some(idle), 287 watch: Some(watch), 288 } 289 } 290} 291 292impl Drop for Watchdog { 293 fn drop(&mut self) { 294 self.idle.take(); 295 if let Some(watch) = self.watch.take() { 296 let _ = watch.join(); 297 } 298 } 299} 300 301fn restrict_thin_base( 302 odb: &gix::odb::Handle, 303 permitted: Option<&HashSet<ObjectId>>, 304 entry: output::Entry, 305) -> Result<output::Entry, PackError> { 306 let base = match &entry.kind { 307 output::entry::Kind::DeltaOid { id } => *id, 308 _ => return Ok(entry), 309 }; 310 if permitted.is_some_and(|bases| bases.contains(&base)) { 311 return Ok(entry); 312 } 313 let mut buf = Vec::new(); 314 let object = odb 315 .find(&entry.id, &mut buf) 316 .map_err(|error| PackError::Pack(error.to_string()))?; 317 let count = output::Count::from_data(entry.id, None); 318 output::Entry::from_data(&count, &object).map_err(|error| PackError::Pack(error.to_string())) 319} 320 321pub fn index_pack( 322 objects_dir: &Path, 323 pack: &[u8], 324 limits: &PackLimits, 325 kind: gix::hash::Kind, 326) -> Result<(), PackError> { 327 if pack.is_empty() { 328 return Ok(()); 329 } 330 if !pack.starts_with(b"PACK") { 331 return Err(PackError::Pack( 332 "packfile is missing its PACK signature".to_string(), 333 )); 334 } 335 meter::meter(pack, limits, kind)?; 336 write_with_gix(objects_dir, pack, kind) 337 .or_else(|_| resolve::resolve(objects_dir, pack, limits, kind)) 338} 339 340fn write_with_gix(objects_dir: &Path, pack: &[u8], kind: gix::hash::Kind) -> Result<(), PackError> { 341 let odb = odb_at(objects_dir, kind)?; 342 let interrupt = AtomicBool::new(false); 343 let mut reader = std::io::Cursor::new(pack); 344 gix_pack::Bundle::write_to_directory( 345 &mut reader, 346 Some(&objects_dir.join("pack")), 347 &mut Discard, 348 &interrupt, 349 Some(odb), 350 gix_pack::bundle::write::Options { 351 thread_limit: Some(1), 352 iteration_mode: gix_pack::data::input::Mode::Verify, 353 index_version: gix_pack::index::Version::default(), 354 object_hash: kind, 355 }, 356 ) 357 .map(|_| ()) 358 .map_err(|error| PackError::Pack(error.to_string())) 359} 360 361#[cfg(test)] 362mod tests { 363 use gix_pack::Find; 364 365 use super::*; 366 367 #[test] 368 fn interruptible_find_errors_once_the_flag_is_set() { 369 let dir = tempfile::tempdir().unwrap(); 370 let flag = Arc::new(AtomicBool::new(false)); 371 let find = Interruptible { 372 inner: gix::odb::at(dir.path()).unwrap(), 373 flag: Arc::clone(&flag), 374 }; 375 let absent = Oid::null().object_id(); 376 let mut buf = Vec::new(); 377 378 assert!( 379 find.try_find(&absent, &mut buf).unwrap().is_none(), 380 "before budget trips, lookup of an absent object is a plain miss" 381 ); 382 383 flag.store(true, Ordering::Relaxed); 384 assert!( 385 find.try_find(&absent, &mut buf).is_err(), 386 "once tripped, every decode errors, which is what aborts a breadthfirst tree walk \ 387 mid-closure instead of waiting for the next commit-root boundary" 388 ); 389 } 390}