// Monorepo for Tangled
// tangled.org
1use anyhow::{Context, Result};
2use nix::sys::signal::{Signal, kill};
3use nix::unistd::{Gid, Pid, Uid, User, getgrouplist, setgid, setgroups, setuid};
4use std::ffi::{CString, OsStr, OsString};
5use std::io;
6use std::os::unix::process::ExitStatusExt;
7use std::path::PathBuf;
8use std::process::Stdio;
9use std::time::Duration;
10use tokio::io::{AsyncRead, AsyncReadExt};
11use tokio::process::{Child, Command};
12use tokio::sync::mpsc::{self, Receiver, Sender};
13use tokio::task::JoinHandle;
14use tracing::warn;
15
/// Description of an external command: what to execute and the environment,
/// working directory, deadline, and identity to run it under.
#[derive(Debug, Clone)]
pub struct Spec {
    /// Executable to launch.
    pub program: OsString,
    /// Arguments handed to `program`, in order.
    pub args: Vec<OsString>,
    /// `(key, value)` environment variables set on the child.
    pub env: Vec<(OsString, OsString)>,
    /// Working directory for the child; `None` leaves it unset.
    pub cwd: Option<PathBuf>,
    /// How long to wait before the command is killed; `None` waits forever.
    pub timeout: Option<Duration>,
    /// User id the child switches to before exec; meant to be set together
    /// with `gid`.
    pub uid: Option<u32>,
    /// Group id the child switches to before exec; meant to be set together
    /// with `uid`.
    pub gid: Option<u32>,
}
26
27impl Spec {
28 pub fn new(program: impl Into<OsString>) -> Self {
29 Self {
30 program: program.into(),
31 args: Vec::new(),
32 env: Vec::new(),
33 cwd: None,
34 timeout: None,
35 uid: None,
36 gid: None,
37 }
38 }
39
40 pub fn arg(mut self, arg: impl Into<OsString>) -> Self {
41 self.args.push(arg.into());
42 self
43 }
44
45 pub fn args<I, S>(mut self, args: I) -> Self
46 where
47 I: IntoIterator<Item = S>,
48 S: Into<OsString>,
49 {
50 self.args.extend(args.into_iter().map(Into::into));
51 self
52 }
53
54 pub fn envs<I, K, V>(mut self, env: I) -> Self
55 where
56 I: IntoIterator<Item = (K, V)>,
57 K: Into<OsString>,
58 V: Into<OsString>,
59 {
60 self.env.extend(
61 env.into_iter()
62 .map(|(key, value)| (key.into(), value.into())),
63 );
64 self
65 }
66
67 pub fn cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
68 self.cwd = Some(cwd.into());
69 self
70 }
71
72 pub fn timeout(mut self, timeout: Duration) -> Self {
73 self.timeout = Some(timeout);
74 self
75 }
76
77 pub fn run_as(mut self, uid: u32, gid: u32) -> Self {
78 self.uid = Some(uid);
79 self.gid = Some(gid);
80 self
81 }
82}
83
/// How a command ended.
#[derive(Debug, Clone)]
pub struct ExitResult {
    /// Exit code of the process; `128 + signal` when it was terminated by a
    /// signal, `124` when it timed out, `1` when no better value is known.
    pub exit_code: i32,
    /// Description of a supervision failure (waiting failed or timed out).
    pub error: Option<String>,
    /// `true` when the command was killed for exceeding its deadline.
    pub timed_out: bool,
}
90
91#[derive(Clone, Debug)]
92pub struct CaptureOutput {
93 pub exit: ExitResult,
94 pub stdout: Vec<u8>,
95 pub stderr: Vec<u8>,
96}
97
98impl CaptureOutput {
99 pub fn success(&self) -> bool {
100 self.exit.exit_code == 0 && self.exit.error.is_none()
101 }
102
103 pub fn combined_lossy(&self) -> String {
104 let mut data = self.stdout.clone();
105 data.extend_from_slice(&self.stderr);
106 String::from_utf8_lossy(&data).trim().to_owned()
107 }
108}
109
/// Identifies which of the child's output streams a chunk came from.
#[derive(Debug, Clone, Copy)]
pub enum OutKind {
    /// The child's standard output.
    Stdout,
    /// The child's standard error.
    Stderr,
}
115
116#[derive(Clone, Debug)]
117pub struct OutData {
118 pub data: Vec<u8>,
119 pub kind: OutKind,
120}
121
122pub struct StreamingCommand {
123 events: Receiver<OutData>,
124 exit: JoinHandle<Result<ExitResult>>,
125}
126
127impl StreamingCommand {
128 pub fn into_parts(self) -> (Receiver<OutData>, JoinHandle<Result<ExitResult>>) {
129 (self.events, self.exit)
130 }
131}
132
133pub async fn run_capture(spec: Spec) -> Result<CaptureOutput> {
134 let running = spawn_streaming(spec)?;
135 let mut stdout = Vec::new();
136 let mut stderr = Vec::new();
137 let (mut events, exit_task) = running.into_parts();
138
139 while let Some(event) = events.recv().await {
140 match event.kind {
141 OutKind::Stdout => stdout.extend_from_slice(&event.data),
142 OutKind::Stderr => stderr.extend_from_slice(&event.data),
143 }
144 }
145
146 let exit = exit_task
147 .await
148 .unwrap_or_else(|error| Err(anyhow::anyhow!("command supervisor failed: {error}")))?;
149 Ok(CaptureOutput {
150 exit,
151 stdout,
152 stderr,
153 })
154}
155
156pub fn spawn_streaming(mut spec: Spec) -> Result<StreamingCommand> {
157 let mut child = spawn(&mut spec)?;
158 let stdout = child.stdout.take().context("stdout pipe missing")?;
159 let stderr = child.stderr.take().context("stderr pipe missing")?;
160
161 let (events_tx, events_rx) = mpsc::channel(64);
162 let stdout_thread = spawn_reader(stdout, events_tx.clone(), OutKind::Stdout);
163 let stderr_thread = spawn_reader(stderr, events_tx.clone(), OutKind::Stderr);
164 drop(events_tx);
165
166 let exit = tokio::spawn(async move {
167 let exit = wait_child(&mut child, spec.timeout).await;
168
169 // ensure all output is observed before exiting
170 // this assumes children dont daemonize and hold onto the stdout/err
171 stdout_thread.await.context("stdout reader task failed")?;
172 stderr_thread.await.context("stderr reader task failed")?;
173
174 Ok(exit)
175 });
176
177 Ok(StreamingCommand {
178 events: events_rx,
179 exit,
180 })
181}
182
183fn spawn(spec: &mut Spec) -> Result<Child> {
184 let mut cmd = Command::new(&spec.program);
185 cmd.args(&spec.args)
186 .envs(spec.env.iter().map(|(key, value)| (key, value)))
187 .stdout(Stdio::piped())
188 .stderr(Stdio::piped());
189
190 if let Some(cwd) = &spec.cwd {
191 cmd.current_dir(cwd);
192 }
193
194 // don't use rust's .uid() / .gid() methods here because they clear
195 // supplemantary groups, which means for example adding a user to "docker"
196 // group won't actually let it access the sock.
197 // https://github.com/rust-lang/rust/issues/90747
198 if let (Some(uid), Some(gid)) = (spec.uid, spec.gid) {
199 let username = User::from_uid(Uid::from_raw(uid))
200 .ok()
201 .flatten()
202 .map(|u| u.name)
203 .with_context(|| format!("lookup passwd entry for uid {uid}"))?;
204 let cname = CString::new(username)
205 .with_context(|| format!("username for uid {uid} contained a null byte"))?;
206 // resolve groups beforehand so we don't have to read /etc/group in the pre_exec
207 let groups =
208 getgrouplist(&cname, Gid::from_raw(gid)).context("resolve supplementary groups")?;
209 // SAFETY: pre_exec runs between fork and execve in the child.
210 // we only call async-signal-safe syscalls and we don't touch any
211 // shared state, no allocator, no mutexes, no globals.
212 unsafe {
213 cmd.pre_exec(move || {
214 setgroups(&groups).map_err(io::Error::from)?;
215 setgid(Gid::from_raw(gid)).map_err(io::Error::from)?;
216 setuid(Uid::from_raw(uid)).map_err(io::Error::from)?;
217 Ok(())
218 });
219 }
220 }
221
222 // allow us to kill this whole process tree on deadline
223 cmd.process_group(0);
224
225 cmd.spawn()
226 .with_context(|| format!("spawn {}", display_os(&spec.program)))
227}
228
229async fn wait_child(child: &mut Child, timeout: Option<Duration>) -> ExitResult {
230 let wait = child.wait();
231 let status = match timeout {
232 Some(timeout) => match tokio::time::timeout(timeout, wait).await {
233 Ok(status) => status,
234 Err(_) => {
235 if let Some(pid) = child.id()
236 && let Err(error) = kill(Pid::from_raw(-(pid as i32)), Signal::SIGKILL)
237 {
238 warn!(pid, %error, "failed to kill process group");
239 }
240 let _ = child.wait().await;
241 return ExitResult {
242 exit_code: 124,
243 error: Some("command timed out".to_owned()),
244 timed_out: true,
245 };
246 }
247 },
248 None => wait.await,
249 };
250
251 match status {
252 Ok(status) => ExitResult {
253 exit_code: status
254 .code()
255 .or_else(|| status.signal().map(|signal| 128 + signal))
256 .unwrap_or(1),
257 error: None,
258 timed_out: false,
259 },
260 Err(error) => ExitResult {
261 exit_code: 1,
262 error: Some(error.to_string()),
263 timed_out: false,
264 },
265 }
266}
267
268fn spawn_reader(
269 mut reader: impl AsyncRead + Unpin + Send + 'static,
270 events: Sender<OutData>,
271 kind: OutKind,
272) -> JoinHandle<()> {
273 tokio::spawn(async move {
274 let mut buf = [0_u8; 32 * 1024];
275 loop {
276 match reader.read(&mut buf).await {
277 Ok(0) => return,
278 Ok(n) => {
279 let event = OutData {
280 data: buf[..n].to_vec(),
281 kind,
282 };
283 if events.send(event).await.is_err() {
284 return;
285 }
286 }
287 Err(error) => {
288 warn!(%error, "failed to read command stream");
289 return;
290 }
291 }
292 }
293 })
294}
295
/// Renders an OS string for messages, substituting U+FFFD for any bytes that
/// are not valid UTF-8.
fn display_os(value: &OsStr) -> String {
    String::from(value.to_string_lossy())
}