Another project
1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::mpsc::{Receiver, TryRecvError};
5
6use bone_document::Document;
7use bone_interop::{CancelFlag, StepError};
8use bone_types::StepSchema;
9
10const PROGRESS_DIALOG_THRESHOLD_BYTES: u64 = 1_000_000;
11
12pub struct JobMeta {
13 cancel: Arc<AtomicBool>,
14 pub file_name: String,
15 pub show_progress: bool,
16}
17
18impl JobMeta {
19 pub fn request_cancel(&self) {
20 self.cancel.store(true, Ordering::Relaxed);
21 }
22
23 #[must_use]
24 pub fn cancel_requested(&self) -> bool {
25 self.cancel.load(Ordering::Relaxed)
26 }
27}
28
29#[derive(Debug)]
30pub enum JobResult<T> {
31 Finished(T),
32 Failed(StepError),
33 WorkerLost,
34}
35
36pub enum StepJob {
37 Import {
38 rx: Receiver<Result<Box<Document>, StepError>>,
39 baseline: Box<Document>,
40 meta: JobMeta,
41 },
42 Export {
43 rx: Receiver<Result<(), StepError>>,
44 meta: JobMeta,
45 },
46}
47
48impl StepJob {
49 #[must_use]
50 pub fn meta(&self) -> &JobMeta {
51 match self {
52 Self::Import { meta, .. } | Self::Export { meta, .. } => meta,
53 }
54 }
55}
56
57pub fn poll<T>(rx: &Receiver<Result<T, StepError>>) -> std::task::Poll<JobResult<T>> {
58 match rx.try_recv() {
59 Ok(Ok(value)) => std::task::Poll::Ready(JobResult::Finished(value)),
60 Ok(Err(e)) => std::task::Poll::Ready(JobResult::Failed(e)),
61 Err(TryRecvError::Empty) => std::task::Poll::Pending,
62 Err(TryRecvError::Disconnected) => std::task::Poll::Ready(JobResult::WorkerLost),
63 }
64}
65
66#[derive(Debug, thiserror::Error)]
67pub enum SpawnError {
68 #[error("step worker thread failed to start: {0}")]
69 Thread(std::io::Error),
70}
71
72pub fn spawn_import(path: PathBuf, baseline: Document) -> Result<StepJob, SpawnError> {
73 let show_progress =
74 std::fs::metadata(&path).is_ok_and(|m| m.len() > PROGRESS_DIALOG_THRESHOLD_BYTES);
75 let (rx, meta) = spawn(display_name(&path), show_progress, move |cancel| {
76 bone_interop::read(&path, cancel).map(Box::new)
77 })?;
78 Ok(StepJob::Import {
79 rx,
80 baseline: Box::new(baseline),
81 meta,
82 })
83}
84
85pub fn spawn_export(document: Document, path: PathBuf) -> Result<StepJob, SpawnError> {
86 let (rx, meta) = spawn(display_name(&path), false, move |cancel| {
87 bone_interop::write(&document, &path, StepSchema::Ap214, cancel)
88 })?;
89 Ok(StepJob::Export { rx, meta })
90}
91
92fn display_name(path: &Path) -> String {
93 path.file_name()
94 .map(|s| s.to_string_lossy().into_owned())
95 .unwrap_or_default()
96}
97
98fn spawn<T: Send + 'static>(
99 file_name: String,
100 show_progress: bool,
101 work: impl FnOnce(CancelFlag<'_>) -> Result<T, StepError> + Send + 'static,
102) -> Result<(Receiver<Result<T, StepError>>, JobMeta), SpawnError> {
103 let cancel = Arc::new(AtomicBool::new(false));
104 let cancel_for_worker = Arc::clone(&cancel);
105 let (tx, rx) = std::sync::mpsc::channel();
106 std::thread::Builder::new()
107 .name("bone-step-job".to_owned())
108 .spawn(move || {
109 let result = work(CancelFlag::new(cancel_for_worker.as_ref()));
110 let _ = tx.send(result);
111 })
112 .map_err(SpawnError::Thread)?;
113 Ok((
114 rx,
115 JobMeta {
116 cancel,
117 file_name,
118 show_progress,
119 },
120 ))
121}
122
123#[cfg(test)]
124mod tests {
125 use super::{JobResult, StepJob, poll, spawn_export, spawn_import};
126 use bone_interop::StepError;
127 use std::path::PathBuf;
128 use std::sync::mpsc::Receiver;
129
130 fn wait<T>(rx: &Receiver<Result<T, StepError>>) -> JobResult<T> {
131 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
132 loop {
133 match poll(rx) {
134 std::task::Poll::Ready(result) => return result,
135 std::task::Poll::Pending => {
136 assert!(std::time::Instant::now() < deadline, "step job timed out");
137 std::thread::yield_now();
138 }
139 }
140 }
141 }
142
143 #[test]
144 fn import_of_missing_file_reports_failure() {
145 let baseline =
146 bone_document::Document::new(bone_types::DocumentId::default(), "scallop".to_owned());
147 let job = match spawn_import(PathBuf::from("/nonexistent/scallop.step"), baseline) {
148 Ok(job) => job,
149 Err(e) => panic!("spawn import: {e}"),
150 };
151 let StepJob::Import { rx, .. } = job else {
152 panic!("spawn_import yields an import job");
153 };
154 assert!(matches!(wait(&rx), JobResult::Failed(StepError::Io { .. })));
155 }
156
157 #[test]
158 fn export_failure_leaves_no_step_file() {
159 let dir =
160 std::env::temp_dir().join(format!("bone-step-job-export-fail-{}", std::process::id()));
161 if let Err(e) = std::fs::create_dir_all(&dir) {
162 panic!("temp dir: {e}");
163 }
164 let target = dir.join("nautilus.step");
165 let document =
166 bone_document::Document::new(bone_types::DocumentId::default(), "nautilus".to_owned());
167 let job = match spawn_export(document, target.clone()) {
168 Ok(job) => job,
169 Err(e) => panic!("spawn export: {e}"),
170 };
171 let StepJob::Export { rx, .. } = job else {
172 panic!("spawn_export yields an export job");
173 };
174 match wait(&rx) {
175 JobResult::Failed(_) => {}
176 other => panic!("empty document export must fail, got {other:?}"),
177 }
178 assert!(!target.exists());
179 if let Err(e) = std::fs::remove_dir_all(&dir) {
180 panic!("temp dir cleanup: {e}");
181 }
182 }
183
184 #[test]
185 fn worker_panic_surfaces_as_worker_lost() {
186 let (rx, _meta) = match super::spawn("limpet.step".to_owned(), false, |_| {
187 panic!("worker dies before reporting")
188 }) {
189 Ok(spawned) => spawned,
190 Err(e) => panic!("spawn: {e}"),
191 };
192 assert!(matches!(wait::<()>(&rx), JobResult::WorkerLost));
193 }
194}