Monorepo for Tangled
tangled.org
1use crate::command::{self, OutKind, Spec};
2use crate::protocol::{self, Message, v1};
3use nix::unistd::{Group, User};
4use std::ffi::OsString;
5use std::path::PathBuf;
6use std::time::Duration;
7use tokio::sync::mpsc::Sender;
8use tracing::{info, warn};
9
/// Account used to run a command when the exec request leaves its `user` field empty
/// (or when the user spec is blank after trimming).
const DEFAULT_USER: &str = "spindle-workflow";
11
12pub async fn run(id: String, req: v1::ExecStart, out: Sender<Message>) {
13 let send_exit = async |exit_code: i32, error: Option<String>, timed_out: bool| {
14 let msg = Message {
15 id: id.clone(),
16 exec_exit: Some(v1::ExecExit {
17 exit_code,
18 error: protocol::error_or_empty(error),
19 timed_out,
20 }),
21 ..Default::default()
22 };
23 let _ = out.send(msg).await;
24 };
25
26 if req.argv.is_empty() {
27 send_exit(127, Some("missing argv".to_owned()), false).await;
28 return;
29 }
30
31 let user = if req.user.is_empty() {
32 DEFAULT_USER
33 } else {
34 req.user.as_str()
35 };
36 let run_as = match resolve_user(user) {
37 Ok(run_as) => run_as,
38 Err(err) => {
39 send_exit(127, Some(err), false).await;
40 return;
41 }
42 };
43
44 let mut env = run_as.login_env();
45 let runtime_dir = run_as.runtime_dir();
46 match runtime_dir.try_exists() {
47 Ok(true) => env.push((
48 OsString::from("XDG_RUNTIME_DIR"),
49 runtime_dir.into_os_string(),
50 )),
51 Ok(false) => {}
52 Err(err) => warn!(error = %err, "could not stat XDG_RUNTIME_DIR for workflow user"),
53 }
54
55 let mut spec = Spec::new(req.argv[0].clone())
56 .args(req.argv[1..].iter().cloned())
57 .envs(env)
58 .envs(parse_env(&req.env))
59 .run_as(run_as.uid, run_as.gid);
60 if !req.cwd.is_empty() {
61 spec = spec.cwd(req.cwd.clone());
62 }
63 let timeout =
64 (req.timeout_seconds > 0).then(|| Duration::from_secs(u64::from(req.timeout_seconds)));
65 if let Some(timeout) = timeout {
66 spec = spec.timeout(timeout);
67 }
68
69 info!(
70 %id,
71 user = %run_as.name,
72 uid = run_as.uid,
73 gid = run_as.gid,
74 argv = ?req.argv,
75 cwd = ?req.cwd,
76 "starting exec"
77 );
78
79 let cmd = match command::spawn_streaming(spec) {
80 Ok(cmd) => cmd,
81 Err(err) => {
82 send_exit(127, Some(err.to_string()), false).await;
83 return;
84 }
85 };
86 let (mut events, exit_task) = cmd.into_parts();
87 while let Some(event) = events.recv().await {
88 let data = String::from_utf8_lossy(&event.data).into_owned();
89 let output = match event.kind {
90 OutKind::Stdout => Message {
91 id: id.clone(),
92 exec_stdout: Some(v1::ExecStdout { data }),
93 ..Default::default()
94 },
95 OutKind::Stderr => Message {
96 id: id.clone(),
97 exec_stderr: Some(v1::ExecStderr { data }),
98 ..Default::default()
99 },
100 };
101 let _ = out.send(output).await;
102 }
103 let exit = match exit_task
104 .await
105 .unwrap_or_else(|error| Err(anyhow::anyhow!("command supervisor failed: {error}")))
106 {
107 Ok(exit) => exit,
108 Err(err) => {
109 send_exit(127, Some(err.to_string()), false).await;
110 return;
111 }
112 };
113
114 send_exit(exit.exit_code, exit.error, exit.timed_out).await
115}
116
/// Identity and login details of the account a command is executed as.
#[derive(Clone, Debug)]
struct ResolvedUser {
    /// Value exported as `USER` and `LOGNAME`.
    name: String,
    /// Numeric user id; also selects the `/run/user/<uid>` runtime directory.
    uid: u32,
    /// Numeric group id.
    gid: u32,
    /// Value exported as `HOME`.
    home: OsString,
    /// Value exported as `SHELL`.
    shell: OsString,
}

impl ResolvedUser {
    /// Builds the login environment for this account: `USER`, `LOGNAME`, `HOME` and
    /// `SHELL`, in that order.
    fn login_env(&self) -> Vec<(OsString, OsString)> {
        let entry = |key: &str, value: OsString| (OsString::from(key), value);
        let login_name = OsString::from(self.name.as_str());
        vec![
            entry("USER", login_name.clone()),
            entry("LOGNAME", login_name),
            entry("HOME", self.home.clone()),
            entry("SHELL", self.shell.clone()),
        ]
    }

    /// Returns the per-user runtime directory path, `/run/user/<uid>`.
    fn runtime_dir(&self) -> PathBuf {
        let mut dir = PathBuf::from("/run/user");
        dir.push(self.uid.to_string());
        dir
    }
}
140
141fn resolve_user(spec: &str) -> Result<ResolvedUser, String> {
142 let spec = spec.trim();
143 if spec.is_empty() {
144 return resolve_user(DEFAULT_USER);
145 }
146
147 let (user_part, group_part) = spec
148 .split_once(':')
149 .map(|(user, group)| (user, Some(group)))
150 .unwrap_or((spec, None));
151
152 let mut user = lookup_user(user_part)?;
153 if let Some(group) = group_part.filter(|group| !group.is_empty()) {
154 user.gid = lookup_group(group)?;
155 }
156
157 if user.uid == 0 || user.gid == 0 {
158 return Err(format!("refusing to run exec as privileged user {spec:?}"));
159 }
160
161 Ok(user)
162}
163
164fn lookup_user(name: &str) -> Result<ResolvedUser, String> {
165 match User::from_name(name) {
166 Ok(Some(user)) => Ok(ResolvedUser {
167 name: name.to_owned(),
168 uid: user.uid.as_raw(),
169 gid: user.gid.as_raw(),
170 home: user.dir.into_os_string(),
171 shell: user.shell.into_os_string(),
172 }),
173 Ok(None) => {
174 let uid = name
175 .parse::<u32>()
176 .map_err(|_| format!("workflow user {name:?} was not found"))?;
177 Ok(ResolvedUser {
178 name: name.to_owned(),
179 uid,
180 gid: uid,
181 home: OsString::from("/"),
182 shell: OsString::from("/bin/sh"),
183 })
184 }
185 Err(error) => Err(format!("lookup workflow user {name:?}: {error}")),
186 }
187}
188
189fn lookup_group(name: &str) -> Result<u32, String> {
190 match Group::from_name(name) {
191 Ok(Some(group)) => Ok(group.gid.as_raw()),
192 Ok(None) => name
193 .parse::<u32>()
194 .map_err(|_| format!("workflow group {name:?} was not found")),
195 Err(error) => Err(format!("lookup workflow group {name:?}: {error}")),
196 }
197}
198
/// Converts `KEY=VALUE` strings into environment pairs, preserving input order.
///
/// Each entry is split at its first `=`, so values may themselves contain `=` and may be
/// empty. Entries without `=` are skipped, and so are entries with an empty name (such as
/// `=value`), because those would reach the child as a malformed environment string.
fn parse_env(values: &[String]) -> Vec<(OsString, OsString)> {
    values
        .iter()
        .filter_map(|entry| entry.split_once('='))
        .filter(|(key, _)| !key.is_empty())
        .map(|(key, value)| (OsString::from(key), OsString::from(value)))
        .collect()
}
206
#[cfg(test)]
mod tests {
    use super::*;

    /// Fragment of the error returned when a spec resolves to uid 0 or gid 0.
    const REFUSAL: &str = "refusing to run exec as privileged user";

    /// Resolves `spec`, which is expected to fail, and returns the error text.
    fn rejection(spec: &str) -> String {
        resolve_user(spec).unwrap_err()
    }

    #[test]
    fn refuses_root_exec_user() {
        assert!(rejection("root").contains(REFUSAL));
    }

    #[test]
    fn refuses_root_exec_group() {
        // An unprivileged uid combined with an explicit gid of 0 must be rejected too.
        assert!(rejection("65534:0").contains(REFUSAL));
    }
}