Monorepo for Tangled tangled.org
12

Configure Feed

Select the types of activity you want to include in your feed.

1use anyhow::{Context, Result}; 2use nix::sys::signal::{Signal, kill}; 3use nix::unistd::{Gid, Pid, Uid, User, getgrouplist, setgid, setgroups, setuid}; 4use pty_process::{Command as PtyCommand, OwnedReadPty, OwnedWritePty, Size}; 5use std::ffi::{CString, OsString}; 6use std::io; 7use std::os::unix::process::ExitStatusExt; 8use std::path::PathBuf; 9use std::process::Stdio; 10use std::time::Duration; 11use tokio::io::{AsyncRead, AsyncReadExt}; 12use tokio::process::{Child, Command}; 13use tokio::sync::mpsc::{self, Receiver, Sender}; 14use tokio::task::JoinHandle; 15use tracing::warn; 16 17#[derive(Clone, Debug)] 18pub struct Spec { 19 pub program: OsString, 20 pub args: Vec<OsString>, 21 pub env: Vec<(OsString, OsString)>, 22 pub cwd: Option<PathBuf>, 23 pub timeout: Option<Duration>, 24 pub uid: Option<u32>, 25 pub gid: Option<u32>, 26} 27 28impl Spec { 29 pub fn new(program: impl Into<OsString>) -> Self { 30 Self { 31 program: program.into(), 32 args: Vec::new(), 33 env: Vec::new(), 34 cwd: None, 35 timeout: None, 36 uid: None, 37 gid: None, 38 } 39 } 40 41 pub fn arg(mut self, arg: impl Into<OsString>) -> Self { 42 self.args.push(arg.into()); 43 self 44 } 45 46 pub fn args<I, S>(mut self, args: I) -> Self 47 where 48 I: IntoIterator<Item = S>, 49 S: Into<OsString>, 50 { 51 self.args.extend(args.into_iter().map(Into::into)); 52 self 53 } 54 55 pub fn envs<I, K, V>(mut self, env: I) -> Self 56 where 57 I: IntoIterator<Item = (K, V)>, 58 K: Into<OsString>, 59 V: Into<OsString>, 60 { 61 self.env.extend( 62 env.into_iter() 63 .map(|(key, value)| (key.into(), value.into())), 64 ); 65 self 66 } 67 68 pub fn cwd(mut self, cwd: impl Into<PathBuf>) -> Self { 69 self.cwd = Some(cwd.into()); 70 self 71 } 72 73 pub fn timeout(mut self, timeout: Duration) -> Self { 74 self.timeout = Some(timeout); 75 self 76 } 77 78 pub fn run_as(mut self, uid: u32, gid: u32) -> Self { 79 self.uid = Some(uid); 80 self.gid = Some(gid); 81 self 82 } 83} 84 85#[derive(Clone, Debug)] 86pub struct ExitResult { 87 pub exit_code: i32, 88 pub error: Option<String>, 89 pub timed_out: bool, 90} 91 92#[derive(Clone, Debug)] 93pub struct CaptureOutput { 94 pub exit: ExitResult, 95 pub stdout: Vec<u8>, 96 pub stderr: Vec<u8>, 97} 98 99impl CaptureOutput { 100 pub fn success(&self) -> bool { 101 self.exit.exit_code == 0 && self.exit.error.is_none() 102 } 103 104 pub fn combined_lossy(&self) -> String { 105 let mut data = self.stdout.clone(); 106 data.extend_from_slice(&self.stderr); 107 String::from_utf8_lossy(&data).trim().to_owned() 108 } 109} 110 111#[derive(Clone, Copy, Debug)] 112pub enum OutKind { 113 Stdout, 114 Stderr, 115} 116 117#[derive(Clone, Debug)] 118pub struct OutData { 119 pub data: Vec<u8>, 120 pub kind: OutKind, 121} 122 123pub struct StreamingCommand { 124 events: Receiver<OutData>, 125 exit: JoinHandle<Result<ExitResult>>, 126} 127 128impl StreamingCommand { 129 pub fn into_parts(self) -> (Receiver<OutData>, JoinHandle<Result<ExitResult>>) { 130 (self.events, self.exit) 131 } 132} 133 134pub async fn run_capture(spec: Spec) -> Result<CaptureOutput> { 135 let running = spawn_streaming(spec)?; 136 let mut stdout = Vec::new(); 137 let mut stderr = Vec::new(); 138 let (mut events, exit_task) = running.into_parts(); 139 140 while let Some(event) = events.recv().await { 141 match event.kind { 142 OutKind::Stdout => stdout.extend_from_slice(&event.data), 143 OutKind::Stderr => stderr.extend_from_slice(&event.data), 144 } 145 } 146 147 let exit = exit_task 148 .await 149 .unwrap_or_else(|error| Err(anyhow::anyhow!("command supervisor failed: {error}")))?; 150 Ok(CaptureOutput { 151 exit, 152 stdout, 153 stderr, 154 }) 155} 156 157pub fn spawn_streaming(mut spec: Spec) -> Result<StreamingCommand> { 158 let mut child = spawn(&mut spec)?; 159 let stdout = child.stdout.take().context("stdout pipe missing")?; 160 let stderr = child.stderr.take().context("stderr pipe missing")?; 161 162 let (events_tx, events_rx) = mpsc::channel(64); 163 let stdout_thread = spawn_reader(stdout, events_tx.clone(), OutKind::Stdout); 164 let stderr_thread = spawn_reader(stderr, events_tx.clone(), OutKind::Stderr); 165 drop(events_tx); 166 167 let exit = tokio::spawn(async move { 168 let exit = wait_child(&mut child, spec.timeout).await; 169 170 // ensure all output is observed before exiting 171 // this assumes children dont daemonize and hold onto the stdout/err 172 stdout_thread.await.context("stdout reader task failed")?; 173 stderr_thread.await.context("stderr reader task failed")?; 174 175 Ok(exit) 176 }); 177 178 Ok(StreamingCommand { 179 events: events_rx, 180 exit, 181 }) 182} 183 184fn spawn(spec: &mut Spec) -> Result<Child> { 185 let mut cmd = Command::new(&spec.program); 186 cmd.args(&spec.args) 187 .envs(spec.env.iter().map(|(key, value)| (key, value))) 188 .stdout(Stdio::piped()) 189 .stderr(Stdio::piped()); 190 191 if let Some(cwd) = &spec.cwd { 192 cmd.current_dir(cwd); 193 } 194 195 // don't use rust's .uid() / .gid() methods here because they clear 196 // supplemantary groups, which means for example adding a user to "docker" 197 // group won't actually let it access the sock. 198 // https://github.com/rust-lang/rust/issues/90747 199 if let (Some(uid), Some(gid)) = (spec.uid, spec.gid) { 200 let groups = resolve_supplementary_groups(uid, gid)?; 201 // SAFETY: pre_exec runs between fork and execve in the child. 202 // we only call async-signal-safe syscalls and we don't touch any 203 // shared state, no allocator, no mutexes, no globals. 204 unsafe { 205 cmd.pre_exec(move || { 206 setgroups(&groups).map_err(io::Error::from)?; 207 setgid(Gid::from_raw(gid)).map_err(io::Error::from)?; 208 setuid(Uid::from_raw(uid)).map_err(io::Error::from)?; 209 Ok(()) 210 }); 211 } 212 } 213 214 // allow us to kill this whole process tree on deadline 215 cmd.process_group(0); 216 217 cmd.spawn() 218 .with_context(|| format!("spawn {:?}", &spec.program)) 219} 220 221// resolve the supplementary group list up front so the pre_exec hook never has 222// to read /etc/group (which is not async-signal-safe) between fork and exec. 223fn resolve_supplementary_groups(uid: u32, gid: u32) -> Result<Vec<Gid>> { 224 let username = User::from_uid(Uid::from_raw(uid)) 225 .ok() 226 .flatten() 227 .map(|u| u.name) 228 .with_context(|| format!("lookup passwd entry for uid {uid}"))?; 229 let cname = CString::new(username) 230 .with_context(|| format!("username for uid {uid} contained a null byte"))?; 231 getgrouplist(&cname, Gid::from_raw(gid)).context("resolve supplementary groups") 232} 233 234pub fn spawn_pty(spec: Spec, rows: u16, cols: u16) -> Result<(OwnedReadPty, OwnedWritePty, Child)> { 235 let (pty, pts) = pty_process::open().context("open pty")?; 236 pty.resize(Size::new(rows, cols)).context("set pty size")?; 237 238 let mut cmd = PtyCommand::new(&spec.program) 239 .args(&spec.args) 240 .envs(spec.env.iter().map(|(key, value)| (key, value))); 241 if let Some(cwd) = &spec.cwd { 242 cmd = cmd.current_dir(cwd); 243 } 244 245 // drop privileges in the child. this RELIES on pty-process composing our 246 // pre_exec hook *after* its own session setup: it wraps us as `move || { 247 // session_leader()?; ours()?; }`, so setsid + TIOCSCTTY run first (while 248 // still privileged) and only then do we drop to the workflow user. that 249 // ordering is what we want and we depend on it. if pty-process ever ran our 250 // hook first, the session setup would happen post-drop. (it'd likely still 251 // work, since setsid/TIOCSCTTY on our own pty need no privilege, but it is 252 // not the behaviour we're assuming here) 253 // don't use .uid()/.gid() here, they clear supplementary groups (see L195). 254 if let (Some(uid), Some(gid)) = (spec.uid, spec.gid) { 255 let groups = resolve_supplementary_groups(uid, gid)?; 256 // SAFETY: pre_exec runs between fork and execve in the child. every call 257 // below is async-signal-safe and touches no shared state. 258 cmd = unsafe { 259 cmd.pre_exec(move || { 260 setgroups(&groups).map_err(io::Error::from)?; 261 setgid(Gid::from_raw(gid)).map_err(io::Error::from)?; 262 setuid(Uid::from_raw(uid)).map_err(io::Error::from)?; 263 Ok(()) 264 }) 265 }; 266 } 267 268 // spawn consumes the slave (dup'd onto the child's 0/1/2 and then closed in 269 // the parent), so the master reports EOF once the shell and all its children 270 // have exited. 271 let child = cmd 272 .spawn(pts) 273 .with_context(|| format!("spawn pty shell {:?}", &spec.program))?; 274 275 let (reader, writer) = pty.into_split(); 276 Ok((reader, writer, child)) 277} 278 279async fn wait_child(child: &mut Child, timeout: Option<Duration>) -> ExitResult { 280 let wait = child.wait(); 281 let status = match timeout { 282 Some(timeout) => match tokio::time::timeout(timeout, wait).await { 283 Ok(status) => status, 284 Err(_) => { 285 if let Some(pid) = child.id() 286 && let Err(error) = kill(Pid::from_raw(-(pid as i32)), Signal::SIGKILL) 287 { 288 warn!(pid, %error, "failed to kill process group"); 289 } 290 let _ = child.wait().await; 291 return ExitResult { 292 exit_code: 124, 293 error: Some("command timed out".to_owned()), 294 timed_out: true, 295 }; 296 } 297 }, 298 None => wait.await, 299 }; 300 301 match status { 302 Ok(status) => ExitResult { 303 exit_code: status 304 .code() 305 .or_else(|| status.signal().map(|signal| 128 + signal)) 306 .unwrap_or(1), 307 error: None, 308 timed_out: false, 309 }, 310 Err(error) => ExitResult { 311 exit_code: 1, 312 error: Some(error.to_string()), 313 timed_out: false, 314 }, 315 } 316} 317 318fn spawn_reader( 319 mut reader: impl AsyncRead + Unpin + Send + 'static, 320 events: Sender<OutData>, 321 kind: OutKind, 322) -> JoinHandle<()> { 323 tokio::spawn(async move { 324 let mut buf = [0_u8; 32 * 1024]; 325 loop { 326 match reader.read(&mut buf).await { 327 Ok(0) => return, 328 Ok(n) => { 329 let event = OutData { 330 data: buf[..n].to_vec(), 331 kind, 332 }; 333 if events.send(event).await.is_err() { 334 return; 335 } 336 } 337 Err(error) => { 338 warn!(%error, "failed to read command stream"); 339 return; 340 } 341 } 342 } 343 }) 344} 345 346#[cfg(test)] 347mod tests { 348 use super::*; 349 use tokio::io::AsyncReadExt; 350 351 #[tokio::test] 352 async fn pty_runs_a_shell_and_reports_exit() { 353 let spec = Spec::new("/bin/sh") 354 .arg("-c") 355 .arg("printf 'hello pty'; exit 7"); 356 let (mut reader, _writer, mut child) = spawn_pty(spec, 24, 80).expect("spawn pty"); 357 358 let mut output = Vec::new(); 359 let mut chunk = [0u8; 1024]; 360 loop { 361 match reader.read(&mut chunk).await { 362 Ok(0) => break, 363 Ok(n) => output.extend_from_slice(&chunk[..n]), 364 // linux signals slave-closed with EIO rather than EOF 365 Err(error) if error.raw_os_error() == Some(nix::libc::EIO) => break, 366 Err(error) => panic!("read pty master: {error}"), 367 } 368 } 369 370 let status = child.wait().await.expect("wait child"); 371 let text = String::from_utf8_lossy(&output); 372 assert!(text.contains("hello pty"), "unexpected output: {text:?}"); 373 assert_eq!(status.code(), Some(7)); 374 } 375 376 #[tokio::test] 377 async fn pty_resize_succeeds() { 378 let spec = Spec::new("/bin/sh").arg("-c").arg("sleep 0.2"); 379 let (_reader, writer, mut child) = spawn_pty(spec, 24, 80).expect("spawn pty"); 380 writer.resize(Size::new(40, 120)).expect("resize"); 381 let _ = child.wait().await; 382 } 383}