Monorepo for Tangled
tangled.org
1use anyhow::{Context, Result};
2use nix::sys::signal::{Signal, kill};
3use nix::unistd::{Gid, Pid, Uid, User, getgrouplist, setgid, setgroups, setuid};
4use pty_process::{Command as PtyCommand, OwnedReadPty, OwnedWritePty, Size};
5use std::ffi::{CString, OsString};
6use std::io;
7use std::os::unix::process::ExitStatusExt;
8use std::path::PathBuf;
9use std::process::Stdio;
10use std::time::Duration;
11use tokio::io::{AsyncRead, AsyncReadExt};
12use tokio::process::{Child, Command};
13use tokio::sync::mpsc::{self, Receiver, Sender};
14use tokio::task::JoinHandle;
15use tracing::warn;
16
/// Description of a command to run: program, arguments, environment, working
/// directory, deadline and target user.
///
/// Built with the chaining methods below; each one consumes the spec and
/// returns the updated value, so the result must be kept.
#[derive(Clone, Debug)]
pub struct Spec {
    /// Program to execute.
    pub program: OsString,
    /// Arguments handed to the program, in order.
    pub args: Vec<OsString>,
    /// `(key, value)` environment pairs set on the child command.
    pub env: Vec<(OsString, OsString)>,
    /// Working directory for the child; `None` leaves it unset.
    pub cwd: Option<PathBuf>,
    /// Deadline for the command; `None` means no deadline is configured.
    pub timeout: Option<Duration>,
    /// User id to run as. Set together with `gid` (see [`Spec::run_as`]).
    pub uid: Option<u32>,
    /// Group id to run as. Set together with `uid` (see [`Spec::run_as`]).
    pub gid: Option<u32>,
}

impl Spec {
    /// Creates a spec for `program` with no arguments, no extra environment,
    /// no working directory, no timeout and no uid/gid.
    #[must_use]
    pub fn new(program: impl Into<OsString>) -> Self {
        Self {
            program: program.into(),
            args: Vec::new(),
            env: Vec::new(),
            cwd: None,
            timeout: None,
            uid: None,
            gid: None,
        }
    }

    /// Appends a single argument.
    #[must_use]
    pub fn arg(mut self, arg: impl Into<OsString>) -> Self {
        self.args.push(arg.into());
        self
    }

    /// Appends every argument yielded by `args`, preserving order.
    #[must_use]
    pub fn args<I, S>(mut self, args: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<OsString>,
    {
        self.args.extend(args.into_iter().map(Into::into));
        self
    }

    /// Appends every `(key, value)` pair yielded by `env`.
    #[must_use]
    pub fn envs<I, K, V>(mut self, env: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<OsString>,
        V: Into<OsString>,
    {
        self.env.extend(
            env.into_iter()
                .map(|(key, value)| (key.into(), value.into())),
        );
        self
    }

    /// Sets the working directory, replacing any previous value.
    #[must_use]
    pub fn cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
        self.cwd = Some(cwd.into());
        self
    }

    /// Sets the deadline, replacing any previous value.
    #[must_use]
    pub fn timeout(mut self, timeout: Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Sets both the uid and the gid the command should run as.
    #[must_use]
    pub fn run_as(mut self, uid: u32, gid: u32) -> Self {
        self.uid = Some(uid);
        self.gid = Some(gid);
        self
    }
}
84
/// Outcome of waiting on a child process.
///
/// Comparable with `==` so callers and tests can check a whole outcome at
/// once.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExitResult {
    /// Exit code reported for the command.
    pub exit_code: i32,
    /// Description of what went wrong while waiting, if anything did.
    pub error: Option<String>,
    /// Whether the command was stopped because it exceeded its deadline.
    pub timed_out: bool,
}
91
92#[derive(Clone, Debug)]
93pub struct CaptureOutput {
94 pub exit: ExitResult,
95 pub stdout: Vec<u8>,
96 pub stderr: Vec<u8>,
97}
98
99impl CaptureOutput {
100 pub fn success(&self) -> bool {
101 self.exit.exit_code == 0 && self.exit.error.is_none()
102 }
103
104 pub fn combined_lossy(&self) -> String {
105 let mut data = self.stdout.clone();
106 data.extend_from_slice(&self.stderr);
107 String::from_utf8_lossy(&data).trim().to_owned()
108 }
109}
110
/// Identifies which of a command's output streams a chunk came from.
///
/// Comparable and hashable so it can be matched with `==` or used as a map
/// or set key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum OutKind {
    /// The chunk was read from the child's stdout.
    Stdout,
    /// The chunk was read from the child's stderr.
    Stderr,
}
116
/// One chunk of bytes read from a command's output, tagged with its stream.
#[derive(Clone, Debug)]
pub struct OutData {
    /// Raw bytes of the chunk.
    pub data: Vec<u8>,
    /// Stream the chunk was read from.
    pub kind: OutKind,
}
122
123pub struct StreamingCommand {
124 events: Receiver<OutData>,
125 exit: JoinHandle<Result<ExitResult>>,
126}
127
128impl StreamingCommand {
129 pub fn into_parts(self) -> (Receiver<OutData>, JoinHandle<Result<ExitResult>>) {
130 (self.events, self.exit)
131 }
132}
133
134pub async fn run_capture(spec: Spec) -> Result<CaptureOutput> {
135 let running = spawn_streaming(spec)?;
136 let mut stdout = Vec::new();
137 let mut stderr = Vec::new();
138 let (mut events, exit_task) = running.into_parts();
139
140 while let Some(event) = events.recv().await {
141 match event.kind {
142 OutKind::Stdout => stdout.extend_from_slice(&event.data),
143 OutKind::Stderr => stderr.extend_from_slice(&event.data),
144 }
145 }
146
147 let exit = exit_task
148 .await
149 .unwrap_or_else(|error| Err(anyhow::anyhow!("command supervisor failed: {error}")))?;
150 Ok(CaptureOutput {
151 exit,
152 stdout,
153 stderr,
154 })
155}
156
157pub fn spawn_streaming(mut spec: Spec) -> Result<StreamingCommand> {
158 let mut child = spawn(&mut spec)?;
159 let stdout = child.stdout.take().context("stdout pipe missing")?;
160 let stderr = child.stderr.take().context("stderr pipe missing")?;
161
162 let (events_tx, events_rx) = mpsc::channel(64);
163 let stdout_thread = spawn_reader(stdout, events_tx.clone(), OutKind::Stdout);
164 let stderr_thread = spawn_reader(stderr, events_tx.clone(), OutKind::Stderr);
165 drop(events_tx);
166
167 let exit = tokio::spawn(async move {
168 let exit = wait_child(&mut child, spec.timeout).await;
169
170 // ensure all output is observed before exiting
171 // this assumes children dont daemonize and hold onto the stdout/err
172 stdout_thread.await.context("stdout reader task failed")?;
173 stderr_thread.await.context("stderr reader task failed")?;
174
175 Ok(exit)
176 });
177
178 Ok(StreamingCommand {
179 events: events_rx,
180 exit,
181 })
182}
183
184fn spawn(spec: &mut Spec) -> Result<Child> {
185 let mut cmd = Command::new(&spec.program);
186 cmd.args(&spec.args)
187 .envs(spec.env.iter().map(|(key, value)| (key, value)))
188 .stdout(Stdio::piped())
189 .stderr(Stdio::piped());
190
191 if let Some(cwd) = &spec.cwd {
192 cmd.current_dir(cwd);
193 }
194
195 // don't use rust's .uid() / .gid() methods here because they clear
196 // supplemantary groups, which means for example adding a user to "docker"
197 // group won't actually let it access the sock.
198 // https://github.com/rust-lang/rust/issues/90747
199 if let (Some(uid), Some(gid)) = (spec.uid, spec.gid) {
200 let groups = resolve_supplementary_groups(uid, gid)?;
201 // SAFETY: pre_exec runs between fork and execve in the child.
202 // we only call async-signal-safe syscalls and we don't touch any
203 // shared state, no allocator, no mutexes, no globals.
204 unsafe {
205 cmd.pre_exec(move || {
206 setgroups(&groups).map_err(io::Error::from)?;
207 setgid(Gid::from_raw(gid)).map_err(io::Error::from)?;
208 setuid(Uid::from_raw(uid)).map_err(io::Error::from)?;
209 Ok(())
210 });
211 }
212 }
213
214 // allow us to kill this whole process tree on deadline
215 cmd.process_group(0);
216
217 cmd.spawn()
218 .with_context(|| format!("spawn {:?}", &spec.program))
219}
220
221// resolve the supplementary group list up front so the pre_exec hook never has
222// to read /etc/group (which is not async-signal-safe) between fork and exec.
223fn resolve_supplementary_groups(uid: u32, gid: u32) -> Result<Vec<Gid>> {
224 let username = User::from_uid(Uid::from_raw(uid))
225 .ok()
226 .flatten()
227 .map(|u| u.name)
228 .with_context(|| format!("lookup passwd entry for uid {uid}"))?;
229 let cname = CString::new(username)
230 .with_context(|| format!("username for uid {uid} contained a null byte"))?;
231 getgrouplist(&cname, Gid::from_raw(gid)).context("resolve supplementary groups")
232}
233
234pub fn spawn_pty(spec: Spec, rows: u16, cols: u16) -> Result<(OwnedReadPty, OwnedWritePty, Child)> {
235 let (pty, pts) = pty_process::open().context("open pty")?;
236 pty.resize(Size::new(rows, cols)).context("set pty size")?;
237
238 let mut cmd = PtyCommand::new(&spec.program)
239 .args(&spec.args)
240 .envs(spec.env.iter().map(|(key, value)| (key, value)));
241 if let Some(cwd) = &spec.cwd {
242 cmd = cmd.current_dir(cwd);
243 }
244
245 // drop privileges in the child. this RELIES on pty-process composing our
246 // pre_exec hook *after* its own session setup: it wraps us as `move || {
247 // session_leader()?; ours()?; }`, so setsid + TIOCSCTTY run first (while
248 // still privileged) and only then do we drop to the workflow user. that
249 // ordering is what we want and we depend on it. if pty-process ever ran our
250 // hook first, the session setup would happen post-drop. (it'd likely still
251 // work, since setsid/TIOCSCTTY on our own pty need no privilege, but it is
252 // not the behaviour we're assuming here)
253 // don't use .uid()/.gid() here, they clear supplementary groups (see L195).
254 if let (Some(uid), Some(gid)) = (spec.uid, spec.gid) {
255 let groups = resolve_supplementary_groups(uid, gid)?;
256 // SAFETY: pre_exec runs between fork and execve in the child. every call
257 // below is async-signal-safe and touches no shared state.
258 cmd = unsafe {
259 cmd.pre_exec(move || {
260 setgroups(&groups).map_err(io::Error::from)?;
261 setgid(Gid::from_raw(gid)).map_err(io::Error::from)?;
262 setuid(Uid::from_raw(uid)).map_err(io::Error::from)?;
263 Ok(())
264 })
265 };
266 }
267
268 // spawn consumes the slave (dup'd onto the child's 0/1/2 and then closed in
269 // the parent), so the master reports EOF once the shell and all its children
270 // have exited.
271 let child = cmd
272 .spawn(pts)
273 .with_context(|| format!("spawn pty shell {:?}", &spec.program))?;
274
275 let (reader, writer) = pty.into_split();
276 Ok((reader, writer, child))
277}
278
279async fn wait_child(child: &mut Child, timeout: Option<Duration>) -> ExitResult {
280 let wait = child.wait();
281 let status = match timeout {
282 Some(timeout) => match tokio::time::timeout(timeout, wait).await {
283 Ok(status) => status,
284 Err(_) => {
285 if let Some(pid) = child.id()
286 && let Err(error) = kill(Pid::from_raw(-(pid as i32)), Signal::SIGKILL)
287 {
288 warn!(pid, %error, "failed to kill process group");
289 }
290 let _ = child.wait().await;
291 return ExitResult {
292 exit_code: 124,
293 error: Some("command timed out".to_owned()),
294 timed_out: true,
295 };
296 }
297 },
298 None => wait.await,
299 };
300
301 match status {
302 Ok(status) => ExitResult {
303 exit_code: status
304 .code()
305 .or_else(|| status.signal().map(|signal| 128 + signal))
306 .unwrap_or(1),
307 error: None,
308 timed_out: false,
309 },
310 Err(error) => ExitResult {
311 exit_code: 1,
312 error: Some(error.to_string()),
313 timed_out: false,
314 },
315 }
316}
317
318fn spawn_reader(
319 mut reader: impl AsyncRead + Unpin + Send + 'static,
320 events: Sender<OutData>,
321 kind: OutKind,
322) -> JoinHandle<()> {
323 tokio::spawn(async move {
324 let mut buf = [0_u8; 32 * 1024];
325 loop {
326 match reader.read(&mut buf).await {
327 Ok(0) => return,
328 Ok(n) => {
329 let event = OutData {
330 data: buf[..n].to_vec(),
331 kind,
332 };
333 if events.send(event).await.is_err() {
334 return;
335 }
336 }
337 Err(error) => {
338 warn!(%error, "failed to read command stream");
339 return;
340 }
341 }
342 }
343 })
344}
345
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::AsyncReadExt;

    // a shell run under a pty should have its output readable from the master
    // and its exit code reported through the child handle
    #[tokio::test]
    async fn pty_runs_a_shell_and_reports_exit() {
        let script = "printf 'hello pty'; exit 7";
        let spec = Spec::new("/bin/sh").args(["-c", script]);
        let (mut master, _writer, mut child) = spawn_pty(spec, 24, 80).expect("spawn pty");

        let mut captured = Vec::new();
        let mut buf = [0u8; 1024];
        loop {
            let n = match master.read(&mut buf).await {
                Ok(0) => break,
                Ok(n) => n,
                // linux signals slave-closed with EIO rather than EOF
                Err(error) if error.raw_os_error() == Some(nix::libc::EIO) => break,
                Err(error) => panic!("read pty master: {error}"),
            };
            captured.extend_from_slice(&buf[..n]);
        }

        let status = child.wait().await.expect("wait child");
        let text = String::from_utf8_lossy(&captured);
        assert!(text.contains("hello pty"), "unexpected output: {text:?}");
        assert_eq!(status.code(), Some(7));
    }

    // resizing through the write half must succeed while the child is alive
    #[tokio::test]
    async fn pty_resize_succeeds() {
        let spec = Spec::new("/bin/sh").args(["-c", "sleep 0.2"]);
        let (_reader, writer, mut child) = spawn_pty(spec, 24, 80).expect("spawn pty");

        let bigger = Size::new(40, 120);
        writer.resize(bigger).expect("resize");

        let _ = child.wait().await;
    }
}