Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1use crate::command::{self, OutKind, Spec}; 2use crate::protocol::{self, Message, v1}; 3use nix::unistd::{Group, User}; 4use std::ffi::OsString; 5use std::time::Duration; 6use tokio::sync::mpsc::Sender; 7use tracing::info; 8 9const DEFAULT_USER: &str = "spindle-workflow"; 10 11pub async fn run(id: String, req: v1::ExecStart, out: Sender<Message>) { 12 let send_exit = async |exit_code: i32, error: Option<String>, timed_out: bool| { 13 let msg = Message { 14 id: id.clone(), 15 exec_exit: Some(v1::ExecExit { 16 exit_code, 17 error: protocol::error_or_empty(error), 18 timed_out, 19 }), 20 ..Default::default() 21 }; 22 let _ = out.send(msg).await; 23 }; 24 25 if req.argv.is_empty() { 26 send_exit(127, Some("missing argv".to_owned()), false).await; 27 return; 28 } 29 30 let user = if req.user.is_empty() { 31 DEFAULT_USER 32 } else { 33 req.user.as_str() 34 }; 35 let run_as = match resolve_user(user) { 36 Ok(run_as) => run_as, 37 Err(err) => { 38 send_exit(127, Some(err), false).await; 39 return; 40 } 41 }; 42 43 let mut spec = Spec::new(req.argv[0].clone()) 44 .args(req.argv[1..].iter().cloned()) 45 .envs(parse_env(&req.env)) 46 .run_as(run_as.uid, run_as.gid); 47 if !req.cwd.is_empty() { 48 spec = spec.cwd(req.cwd.clone()); 49 } 50 let timeout = 51 (req.timeout_seconds > 0).then(|| Duration::from_secs(u64::from(req.timeout_seconds))); 52 if let Some(timeout) = timeout { 53 spec = spec.timeout(timeout); 54 } 55 56 info!( 57 %id, 58 user = %run_as.name, 59 uid = run_as.uid, 60 gid = run_as.gid, 61 argv = ?req.argv, 62 cwd = ?req.cwd, 63 "starting exec" 64 ); 65 66 let cmd = match command::spawn_streaming(spec) { 67 Ok(cmd) => cmd, 68 Err(err) => { 69 send_exit(127, Some(err.to_string()), false).await; 70 return; 71 } 72 }; 73 let (mut events, exit_task) = cmd.into_parts(); 74 while let Some(event) = events.recv().await { 75 let data = String::from_utf8_lossy(&event.data).into_owned(); 76 let output = match event.kind { 77 OutKind::Stdout => Message { 78 id: id.clone(), 79 exec_stdout: Some(v1::ExecStdout { data }), 80 ..Default::default() 81 }, 82 OutKind::Stderr => Message { 83 id: id.clone(), 84 exec_stderr: Some(v1::ExecStderr { data }), 85 ..Default::default() 86 }, 87 }; 88 let _ = out.send(output).await; 89 } 90 let exit = match exit_task 91 .await 92 .unwrap_or_else(|error| Err(anyhow::anyhow!("command supervisor failed: {error}"))) 93 { 94 Ok(exit) => exit, 95 Err(err) => { 96 send_exit(127, Some(err.to_string()), false).await; 97 return; 98 } 99 }; 100 101 send_exit(exit.exit_code, exit.error, exit.timed_out).await 102} 103 104#[derive(Clone, Debug)] 105struct ResolvedUser { 106 name: String, 107 uid: u32, 108 gid: u32, 109} 110 111fn resolve_user(spec: &str) -> Result<ResolvedUser, String> { 112 let spec = spec.trim(); 113 if spec.is_empty() { 114 return resolve_user(DEFAULT_USER); 115 } 116 117 let (user_part, group_part) = spec 118 .split_once(':') 119 .map(|(user, group)| (user, Some(group))) 120 .unwrap_or((spec, None)); 121 122 let mut user = lookup_user(user_part)?; 123 if let Some(group) = group_part.filter(|group| !group.is_empty()) { 124 user.gid = lookup_group(group)?; 125 } 126 127 if user.uid == 0 || user.gid == 0 { 128 return Err(format!("refusing to run exec as privileged user {spec:?}")); 129 } 130 131 Ok(user) 132} 133 134fn lookup_user(name: &str) -> Result<ResolvedUser, String> { 135 match User::from_name(name) { 136 Ok(Some(user)) => Ok(ResolvedUser { 137 name: name.to_owned(), 138 uid: user.uid.as_raw(), 139 gid: user.gid.as_raw(), 140 }), 141 Ok(None) => { 142 let uid = name 143 .parse::<u32>() 144 .map_err(|_| format!("workflow user {name:?} was not found"))?; 145 Ok(ResolvedUser { 146 name: name.to_owned(), 147 uid, 148 gid: uid, 149 }) 150 } 151 Err(error) => Err(format!("lookup workflow user {name:?}: {error}")), 152 } 153} 154 155fn lookup_group(name: &str) -> Result<u32, String> { 156 match Group::from_name(name) { 157 Ok(Some(group)) => Ok(group.gid.as_raw()), 158 Ok(None) => name 159 .parse::<u32>() 160 .map_err(|_| format!("workflow group {name:?} was not found")), 161 Err(error) => Err(format!("lookup workflow group {name:?}: {error}")), 162 } 163} 164 165fn parse_env(values: &[String]) -> Vec<(OsString, OsString)> { 166 values 167 .iter() 168 .filter_map(|value| value.split_once('=')) 169 .map(|(key, value)| (OsString::from(key), OsString::from(value))) 170 .collect() 171} 172 173#[cfg(test)] 174mod tests { 175 use super::*; 176 177 #[test] 178 fn refuses_root_exec_user() { 179 let err = resolve_user("root").unwrap_err(); 180 assert!(err.contains("refusing to run exec as privileged user")); 181 } 182 183 #[test] 184 fn refuses_root_exec_group() { 185 let err = resolve_user("65534:0").unwrap_err(); 186 assert!(err.contains("refusing to run exec as privileged user")); 187 } 188}