Another project
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 6.0 kB View raw
1use std::path::{Path, PathBuf}; 2use std::sync::Arc; 3use std::sync::atomic::{AtomicBool, Ordering}; 4use std::sync::mpsc::{Receiver, TryRecvError}; 5 6use bone_document::Document; 7use bone_interop::{CancelFlag, StepError}; 8use bone_types::StepSchema; 9 10const PROGRESS_DIALOG_THRESHOLD_BYTES: u64 = 1_000_000; 11 12pub struct JobMeta { 13 cancel: Arc<AtomicBool>, 14 pub file_name: String, 15 pub show_progress: bool, 16} 17 18impl JobMeta { 19 pub fn request_cancel(&self) { 20 self.cancel.store(true, Ordering::Relaxed); 21 } 22 23 #[must_use] 24 pub fn cancel_requested(&self) -> bool { 25 self.cancel.load(Ordering::Relaxed) 26 } 27} 28 29#[derive(Debug)] 30pub enum JobResult<T> { 31 Finished(T), 32 Failed(StepError), 33 WorkerLost, 34} 35 36pub enum StepJob { 37 Import { 38 rx: Receiver<Result<Box<Document>, StepError>>, 39 baseline: Box<Document>, 40 meta: JobMeta, 41 }, 42 Export { 43 rx: Receiver<Result<(), StepError>>, 44 meta: JobMeta, 45 }, 46} 47 48impl StepJob { 49 #[must_use] 50 pub fn meta(&self) -> &JobMeta { 51 match self { 52 Self::Import { meta, .. } | Self::Export { meta, .. } => meta, 53 } 54 } 55} 56 57pub fn poll<T>(rx: &Receiver<Result<T, StepError>>) -> std::task::Poll<JobResult<T>> { 58 match rx.try_recv() { 59 Ok(Ok(value)) => std::task::Poll::Ready(JobResult::Finished(value)), 60 Ok(Err(e)) => std::task::Poll::Ready(JobResult::Failed(e)), 61 Err(TryRecvError::Empty) => std::task::Poll::Pending, 62 Err(TryRecvError::Disconnected) => std::task::Poll::Ready(JobResult::WorkerLost), 63 } 64} 65 66#[derive(Debug, thiserror::Error)] 67pub enum SpawnError { 68 #[error("step worker thread failed to start: {0}")] 69 Thread(std::io::Error), 70} 71 72pub fn spawn_import(path: PathBuf, baseline: Document) -> Result<StepJob, SpawnError> { 73 let show_progress = 74 std::fs::metadata(&path).is_ok_and(|m| m.len() > PROGRESS_DIALOG_THRESHOLD_BYTES); 75 let (rx, meta) = spawn(display_name(&path), show_progress, move |cancel| { 76 bone_interop::read(&path, cancel).map(Box::new) 77 })?; 78 Ok(StepJob::Import { 79 rx, 80 baseline: Box::new(baseline), 81 meta, 82 }) 83} 84 85pub fn spawn_export(document: Document, path: PathBuf) -> Result<StepJob, SpawnError> { 86 let (rx, meta) = spawn(display_name(&path), false, move |cancel| { 87 bone_interop::write(&document, &path, StepSchema::Ap214, cancel) 88 })?; 89 Ok(StepJob::Export { rx, meta }) 90} 91 92fn display_name(path: &Path) -> String { 93 path.file_name() 94 .map(|s| s.to_string_lossy().into_owned()) 95 .unwrap_or_default() 96} 97 98fn spawn<T: Send + 'static>( 99 file_name: String, 100 show_progress: bool, 101 work: impl FnOnce(CancelFlag<'_>) -> Result<T, StepError> + Send + 'static, 102) -> Result<(Receiver<Result<T, StepError>>, JobMeta), SpawnError> { 103 let cancel = Arc::new(AtomicBool::new(false)); 104 let cancel_for_worker = Arc::clone(&cancel); 105 let (tx, rx) = std::sync::mpsc::channel(); 106 std::thread::Builder::new() 107 .name("bone-step-job".to_owned()) 108 .spawn(move || { 109 let result = work(CancelFlag::new(cancel_for_worker.as_ref())); 110 let _ = tx.send(result); 111 }) 112 .map_err(SpawnError::Thread)?; 113 Ok(( 114 rx, 115 JobMeta { 116 cancel, 117 file_name, 118 show_progress, 119 }, 120 )) 121} 122 123#[cfg(test)] 124mod tests { 125 use super::{JobResult, StepJob, poll, spawn_export, spawn_import}; 126 use bone_interop::StepError; 127 use std::path::PathBuf; 128 use std::sync::mpsc::Receiver; 129 130 fn wait<T>(rx: &Receiver<Result<T, StepError>>) -> JobResult<T> { 131 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10); 132 loop { 133 match poll(rx) { 134 std::task::Poll::Ready(result) => return result, 135 std::task::Poll::Pending => { 136 assert!(std::time::Instant::now() < deadline, "step job timed out"); 137 std::thread::yield_now(); 138 } 139 } 140 } 141 } 142 143 #[test] 144 fn import_of_missing_file_reports_failure() { 145 let baseline = 146 bone_document::Document::new(bone_types::DocumentId::default(), "scallop".to_owned()); 147 let job = match spawn_import(PathBuf::from("/nonexistent/scallop.step"), baseline) { 148 Ok(job) => job, 149 Err(e) => panic!("spawn import: {e}"), 150 }; 151 let StepJob::Import { rx, .. } = job else { 152 panic!("spawn_import yields an import job"); 153 }; 154 assert!(matches!(wait(&rx), JobResult::Failed(StepError::Io { .. }))); 155 } 156 157 #[test] 158 fn export_failure_leaves_no_step_file() { 159 let dir = 160 std::env::temp_dir().join(format!("bone-step-job-export-fail-{}", std::process::id())); 161 if let Err(e) = std::fs::create_dir_all(&dir) { 162 panic!("temp dir: {e}"); 163 } 164 let target = dir.join("nautilus.step"); 165 let document = 166 bone_document::Document::new(bone_types::DocumentId::default(), "nautilus".to_owned()); 167 let job = match spawn_export(document, target.clone()) { 168 Ok(job) => job, 169 Err(e) => panic!("spawn export: {e}"), 170 }; 171 let StepJob::Export { rx, .. } = job else { 172 panic!("spawn_export yields an export job"); 173 }; 174 match wait(&rx) { 175 JobResult::Failed(_) => {} 176 other => panic!("empty document export must fail, got {other:?}"), 177 } 178 assert!(!target.exists()); 179 if let Err(e) = std::fs::remove_dir_all(&dir) { 180 panic!("temp dir cleanup: {e}"); 181 } 182 } 183 184 #[test] 185 fn worker_panic_surfaces_as_worker_lost() { 186 let (rx, _meta) = match super::spawn("limpet.step".to_owned(), false, |_| { 187 panic!("worker dies before reporting") 188 }) { 189 Ok(spawned) => spawned, 190 Err(e) => panic!("spawn: {e}"), 191 }; 192 assert!(matches!(wait::<()>(&rx), JobResult::WorkerLost)); 193 } 194}