Monorepo for Tangled tangled.org
3

Configure Feed

Select the types of activity you want to include in your feed.

1use anyhow::{Context, Result}; 2use nix::sys::signal::{Signal, kill}; 3use nix::unistd::{Gid, Pid, Uid, User, getgrouplist, setgid, setgroups, setuid}; 4use std::ffi::{CString, OsStr, OsString}; 5use std::io; 6use std::os::unix::process::ExitStatusExt; 7use std::path::PathBuf; 8use std::process::Stdio; 9use std::time::Duration; 10use tokio::io::{AsyncRead, AsyncReadExt}; 11use tokio::process::{Child, Command}; 12use tokio::sync::mpsc::{self, Receiver, Sender}; 13use tokio::task::JoinHandle; 14use tracing::warn; 15 16#[derive(Clone, Debug)] 17pub struct Spec { 18 pub program: OsString, 19 pub args: Vec<OsString>, 20 pub env: Vec<(OsString, OsString)>, 21 pub cwd: Option<PathBuf>, 22 pub timeout: Option<Duration>, 23 pub uid: Option<u32>, 24 pub gid: Option<u32>, 25} 26 27impl Spec { 28 pub fn new(program: impl Into<OsString>) -> Self { 29 Self { 30 program: program.into(), 31 args: Vec::new(), 32 env: Vec::new(), 33 cwd: None, 34 timeout: None, 35 uid: None, 36 gid: None, 37 } 38 } 39 40 pub fn arg(mut self, arg: impl Into<OsString>) -> Self { 41 self.args.push(arg.into()); 42 self 43 } 44 45 pub fn args<I, S>(mut self, args: I) -> Self 46 where 47 I: IntoIterator<Item = S>, 48 S: Into<OsString>, 49 { 50 self.args.extend(args.into_iter().map(Into::into)); 51 self 52 } 53 54 pub fn envs<I, K, V>(mut self, env: I) -> Self 55 where 56 I: IntoIterator<Item = (K, V)>, 57 K: Into<OsString>, 58 V: Into<OsString>, 59 { 60 self.env.extend( 61 env.into_iter() 62 .map(|(key, value)| (key.into(), value.into())), 63 ); 64 self 65 } 66 67 pub fn cwd(mut self, cwd: impl Into<PathBuf>) -> Self { 68 self.cwd = Some(cwd.into()); 69 self 70 } 71 72 pub fn timeout(mut self, timeout: Duration) -> Self { 73 self.timeout = Some(timeout); 74 self 75 } 76 77 pub fn run_as(mut self, uid: u32, gid: u32) -> Self { 78 self.uid = Some(uid); 79 self.gid = Some(gid); 80 self 81 } 82} 83 84#[derive(Clone, Debug)] 85pub struct ExitResult { 86 pub exit_code: i32, 87 pub error: Option<String>, 88 pub timed_out: bool, 89} 90 91#[derive(Clone, Debug)] 92pub struct CaptureOutput { 93 pub exit: ExitResult, 94 pub stdout: Vec<u8>, 95 pub stderr: Vec<u8>, 96} 97 98impl CaptureOutput { 99 pub fn success(&self) -> bool { 100 self.exit.exit_code == 0 && self.exit.error.is_none() 101 } 102 103 pub fn combined_lossy(&self) -> String { 104 let mut data = self.stdout.clone(); 105 data.extend_from_slice(&self.stderr); 106 String::from_utf8_lossy(&data).trim().to_owned() 107 } 108} 109 110#[derive(Clone, Copy, Debug)] 111pub enum OutKind { 112 Stdout, 113 Stderr, 114} 115 116#[derive(Clone, Debug)] 117pub struct OutData { 118 pub data: Vec<u8>, 119 pub kind: OutKind, 120} 121 122pub struct StreamingCommand { 123 events: Receiver<OutData>, 124 exit: JoinHandle<Result<ExitResult>>, 125} 126 127impl StreamingCommand { 128 pub fn into_parts(self) -> (Receiver<OutData>, JoinHandle<Result<ExitResult>>) { 129 (self.events, self.exit) 130 } 131} 132 133pub async fn run_capture(spec: Spec) -> Result<CaptureOutput> { 134 let running = spawn_streaming(spec)?; 135 let mut stdout = Vec::new(); 136 let mut stderr = Vec::new(); 137 let (mut events, exit_task) = running.into_parts(); 138 139 while let Some(event) = events.recv().await { 140 match event.kind { 141 OutKind::Stdout => stdout.extend_from_slice(&event.data), 142 OutKind::Stderr => stderr.extend_from_slice(&event.data), 143 } 144 } 145 146 let exit = exit_task 147 .await 148 .unwrap_or_else(|error| Err(anyhow::anyhow!("command supervisor failed: {error}")))?; 149 Ok(CaptureOutput { 150 exit, 151 stdout, 152 stderr, 153 }) 154} 155 156pub fn spawn_streaming(mut spec: Spec) -> Result<StreamingCommand> { 157 let mut child = spawn(&mut spec)?; 158 let stdout = child.stdout.take().context("stdout pipe missing")?; 159 let stderr = child.stderr.take().context("stderr pipe missing")?; 160 161 let (events_tx, events_rx) = mpsc::channel(64); 162 let stdout_thread = spawn_reader(stdout, events_tx.clone(), OutKind::Stdout); 163 let stderr_thread = spawn_reader(stderr, events_tx.clone(), OutKind::Stderr); 164 drop(events_tx); 165 166 let exit = tokio::spawn(async move { 167 let exit = wait_child(&mut child, spec.timeout).await; 168 169 // ensure all output is observed before exiting 170 // this assumes children dont daemonize and hold onto the stdout/err 171 stdout_thread.await.context("stdout reader task failed")?; 172 stderr_thread.await.context("stderr reader task failed")?; 173 174 Ok(exit) 175 }); 176 177 Ok(StreamingCommand { 178 events: events_rx, 179 exit, 180 }) 181} 182 183fn spawn(spec: &mut Spec) -> Result<Child> { 184 let mut cmd = Command::new(&spec.program); 185 cmd.args(&spec.args) 186 .envs(spec.env.iter().map(|(key, value)| (key, value))) 187 .stdout(Stdio::piped()) 188 .stderr(Stdio::piped()); 189 190 if let Some(cwd) = &spec.cwd { 191 cmd.current_dir(cwd); 192 } 193 194 // don't use rust's .uid() / .gid() methods here because they clear 195 // supplemantary groups, which means for example adding a user to "docker" 196 // group won't actually let it access the sock. 197 // https://github.com/rust-lang/rust/issues/90747 198 if let (Some(uid), Some(gid)) = (spec.uid, spec.gid) { 199 let username = User::from_uid(Uid::from_raw(uid)) 200 .ok() 201 .flatten() 202 .map(|u| u.name) 203 .with_context(|| format!("lookup passwd entry for uid {uid}"))?; 204 let cname = CString::new(username) 205 .with_context(|| format!("username for uid {uid} contained a null byte"))?; 206 // resolve groups beforehand so we don't have to read /etc/group in the pre_exec 207 let groups = 208 getgrouplist(&cname, Gid::from_raw(gid)).context("resolve supplementary groups")?; 209 // SAFETY: pre_exec runs between fork and execve in the child. 210 // we only call async-signal-safe syscalls and we don't touch any 211 // shared state, no allocator, no mutexes, no globals. 212 unsafe { 213 cmd.pre_exec(move || { 214 setgroups(&groups).map_err(io::Error::from)?; 215 setgid(Gid::from_raw(gid)).map_err(io::Error::from)?; 216 setuid(Uid::from_raw(uid)).map_err(io::Error::from)?; 217 Ok(()) 218 }); 219 } 220 } 221 222 // allow us to kill this whole process tree on deadline 223 cmd.process_group(0); 224 225 cmd.spawn() 226 .with_context(|| format!("spawn {}", display_os(&spec.program))) 227} 228 229async fn wait_child(child: &mut Child, timeout: Option<Duration>) -> ExitResult { 230 let wait = child.wait(); 231 let status = match timeout { 232 Some(timeout) => match tokio::time::timeout(timeout, wait).await { 233 Ok(status) => status, 234 Err(_) => { 235 if let Some(pid) = child.id() 236 && let Err(error) = kill(Pid::from_raw(-(pid as i32)), Signal::SIGKILL) 237 { 238 warn!(pid, %error, "failed to kill process group"); 239 } 240 let _ = child.wait().await; 241 return ExitResult { 242 exit_code: 124, 243 error: Some("command timed out".to_owned()), 244 timed_out: true, 245 }; 246 } 247 }, 248 None => wait.await, 249 }; 250 251 match status { 252 Ok(status) => ExitResult { 253 exit_code: status 254 .code() 255 .or_else(|| status.signal().map(|signal| 128 + signal)) 256 .unwrap_or(1), 257 error: None, 258 timed_out: false, 259 }, 260 Err(error) => ExitResult { 261 exit_code: 1, 262 error: Some(error.to_string()), 263 timed_out: false, 264 }, 265 } 266} 267 268fn spawn_reader( 269 mut reader: impl AsyncRead + Unpin + Send + 'static, 270 events: Sender<OutData>, 271 kind: OutKind, 272) -> JoinHandle<()> { 273 tokio::spawn(async move { 274 let mut buf = [0_u8; 32 * 1024]; 275 loop { 276 match reader.read(&mut buf).await { 277 Ok(0) => return, 278 Ok(n) => { 279 let event = OutData { 280 data: buf[..n].to_vec(), 281 kind, 282 }; 283 if events.send(event).await.is_err() { 284 return; 285 } 286 } 287 Err(error) => { 288 warn!(%error, "failed to read command stream"); 289 return; 290 } 291 } 292 } 293 }) 294} 295 296fn display_os(value: &OsStr) -> String { 297 value.to_string_lossy().into_owned() 298}