Monorepo for Tangled tangled.org
11

Configure Feed

Select the types of activity you want to include in your feed.

1use crate::command::{self, OutKind, Spec}; 2use crate::protocol::{self, Message, v1}; 3use nix::unistd::{Group, User}; 4use std::ffi::OsString; 5use std::path::PathBuf; 6use std::time::Duration; 7use tokio::sync::mpsc::Sender; 8use tracing::{info, warn}; 9 10const DEFAULT_USER: &str = "spindle-workflow"; 11 12pub async fn run(id: String, req: v1::ExecStart, out: Sender<Message>) { 13 let send_exit = async |exit_code: i32, error: Option<String>, timed_out: bool| { 14 let msg = Message { 15 id: id.clone(), 16 exec_exit: Some(v1::ExecExit { 17 exit_code, 18 error: protocol::error_or_empty(error), 19 timed_out, 20 }), 21 ..Default::default() 22 }; 23 let _ = out.send(msg).await; 24 }; 25 26 if req.argv.is_empty() { 27 send_exit(127, Some("missing argv".to_owned()), false).await; 28 return; 29 } 30 31 let user = if req.user.is_empty() { 32 DEFAULT_USER 33 } else { 34 req.user.as_str() 35 }; 36 let run_as = match resolve_user(user) { 37 Ok(run_as) => run_as, 38 Err(err) => { 39 send_exit(127, Some(err), false).await; 40 return; 41 } 42 }; 43 44 let mut env = run_as.login_env(); 45 let runtime_dir = run_as.runtime_dir(); 46 match runtime_dir.try_exists() { 47 Ok(true) => env.push(( 48 OsString::from("XDG_RUNTIME_DIR"), 49 runtime_dir.into_os_string(), 50 )), 51 Ok(false) => {} 52 Err(err) => warn!(error = %err, "could not stat XDG_RUNTIME_DIR for workflow user"), 53 } 54 55 let mut spec = Spec::new(req.argv[0].clone()) 56 .args(req.argv[1..].iter().cloned()) 57 .envs(env) 58 .envs(parse_env(&req.env)) 59 .run_as(run_as.uid, run_as.gid); 60 if !req.cwd.is_empty() { 61 spec = spec.cwd(req.cwd.clone()); 62 } 63 let timeout = 64 (req.timeout_seconds > 0).then(|| Duration::from_secs(u64::from(req.timeout_seconds))); 65 if let Some(timeout) = timeout { 66 spec = spec.timeout(timeout); 67 } 68 69 info!( 70 %id, 71 user = %run_as.name, 72 uid = run_as.uid, 73 gid = run_as.gid, 74 argv = ?req.argv, 75 cwd = ?req.cwd, 76 "starting exec" 77 ); 78 79 let cmd = match command::spawn_streaming(spec) { 80 Ok(cmd) => cmd, 81 Err(err) => { 82 send_exit(127, Some(err.to_string()), false).await; 83 return; 84 } 85 }; 86 let (mut events, exit_task) = cmd.into_parts(); 87 while let Some(event) = events.recv().await { 88 let data = String::from_utf8_lossy(&event.data).into_owned(); 89 let output = match event.kind { 90 OutKind::Stdout => Message { 91 id: id.clone(), 92 exec_stdout: Some(v1::ExecStdout { data }), 93 ..Default::default() 94 }, 95 OutKind::Stderr => Message { 96 id: id.clone(), 97 exec_stderr: Some(v1::ExecStderr { data }), 98 ..Default::default() 99 }, 100 }; 101 let _ = out.send(output).await; 102 } 103 let exit = match exit_task 104 .await 105 .unwrap_or_else(|error| Err(anyhow::anyhow!("command supervisor failed: {error}"))) 106 { 107 Ok(exit) => exit, 108 Err(err) => { 109 send_exit(127, Some(err.to_string()), false).await; 110 return; 111 } 112 }; 113 114 send_exit(exit.exit_code, exit.error, exit.timed_out).await 115} 116 117#[derive(Clone, Debug)] 118struct ResolvedUser { 119 name: String, 120 uid: u32, 121 gid: u32, 122 home: OsString, 123 shell: OsString, 124} 125 126impl ResolvedUser { 127 fn login_env(&self) -> Vec<(OsString, OsString)> { 128 vec![ 129 (OsString::from("USER"), OsString::from(&self.name)), 130 (OsString::from("LOGNAME"), OsString::from(&self.name)), 131 (OsString::from("HOME"), self.home.clone()), 132 (OsString::from("SHELL"), self.shell.clone()), 133 ] 134 } 135 136 fn runtime_dir(&self) -> PathBuf { 137 PathBuf::from(format!("/run/user/{}", self.uid)) 138 } 139} 140 141fn resolve_user(spec: &str) -> Result<ResolvedUser, String> { 142 let spec = spec.trim(); 143 if spec.is_empty() { 144 return resolve_user(DEFAULT_USER); 145 } 146 147 let (user_part, group_part) = spec 148 .split_once(':') 149 .map(|(user, group)| (user, Some(group))) 150 .unwrap_or((spec, None)); 151 152 let mut user = lookup_user(user_part)?; 153 if let Some(group) = group_part.filter(|group| !group.is_empty()) { 154 user.gid = lookup_group(group)?; 155 } 156 157 if user.uid == 0 || user.gid == 0 { 158 return Err(format!("refusing to run exec as privileged user {spec:?}")); 159 } 160 161 Ok(user) 162} 163 164fn lookup_user(name: &str) -> Result<ResolvedUser, String> { 165 match User::from_name(name) { 166 Ok(Some(user)) => Ok(ResolvedUser { 167 name: name.to_owned(), 168 uid: user.uid.as_raw(), 169 gid: user.gid.as_raw(), 170 home: user.dir.into_os_string(), 171 shell: user.shell.into_os_string(), 172 }), 173 Ok(None) => { 174 let uid = name 175 .parse::<u32>() 176 .map_err(|_| format!("workflow user {name:?} was not found"))?; 177 Ok(ResolvedUser { 178 name: name.to_owned(), 179 uid, 180 gid: uid, 181 home: OsString::from("/"), 182 shell: OsString::from("/bin/sh"), 183 }) 184 } 185 Err(error) => Err(format!("lookup workflow user {name:?}: {error}")), 186 } 187} 188 189fn lookup_group(name: &str) -> Result<u32, String> { 190 match Group::from_name(name) { 191 Ok(Some(group)) => Ok(group.gid.as_raw()), 192 Ok(None) => name 193 .parse::<u32>() 194 .map_err(|_| format!("workflow group {name:?} was not found")), 195 Err(error) => Err(format!("lookup workflow group {name:?}: {error}")), 196 } 197} 198 199fn parse_env(values: &[String]) -> Vec<(OsString, OsString)> { 200 values 201 .iter() 202 .filter_map(|value| value.split_once('=')) 203 .map(|(key, value)| (OsString::from(key), OsString::from(value))) 204 .collect() 205} 206 207#[cfg(test)] 208mod tests { 209 use super::*; 210 211 #[test] 212 fn refuses_root_exec_user() { 213 let err = resolve_user("root").unwrap_err(); 214 assert!(err.contains("refusing to run exec as privileged user")); 215 } 216 217 #[test] 218 fn refuses_root_exec_group() { 219 let err = resolve_user("65534:0").unwrap_err(); 220 assert!(err.contains("refusing to run exec as privileged user")); 221 } 222}