Monorepo for Tangled
tangled.org
1use crate::command::{self, OutKind, Spec};
2use crate::protocol::{self, Message, v1};
3use nix::unistd::{Group, User};
4use std::ffi::OsString;
5use std::time::Duration;
6use tokio::sync::mpsc::Sender;
7use tracing::info;
8
/// Account used for an exec request whose `user` field is empty.
const DEFAULT_USER: &str = "spindle-workflow";
10
11pub async fn run(id: String, req: v1::ExecStart, out: Sender<Message>) {
12 let send_exit = async |exit_code: i32, error: Option<String>, timed_out: bool| {
13 let msg = Message {
14 id: id.clone(),
15 exec_exit: Some(v1::ExecExit {
16 exit_code,
17 error: protocol::error_or_empty(error),
18 timed_out,
19 }),
20 ..Default::default()
21 };
22 let _ = out.send(msg).await;
23 };
24
25 if req.argv.is_empty() {
26 send_exit(127, Some("missing argv".to_owned()), false).await;
27 return;
28 }
29
30 let user = if req.user.is_empty() {
31 DEFAULT_USER
32 } else {
33 req.user.as_str()
34 };
35 let run_as = match resolve_user(user) {
36 Ok(run_as) => run_as,
37 Err(err) => {
38 send_exit(127, Some(err), false).await;
39 return;
40 }
41 };
42
43 let mut spec = Spec::new(req.argv[0].clone())
44 .args(req.argv[1..].iter().cloned())
45 .envs(parse_env(&req.env))
46 .run_as(run_as.uid, run_as.gid);
47 if !req.cwd.is_empty() {
48 spec = spec.cwd(req.cwd.clone());
49 }
50 let timeout =
51 (req.timeout_seconds > 0).then(|| Duration::from_secs(u64::from(req.timeout_seconds)));
52 if let Some(timeout) = timeout {
53 spec = spec.timeout(timeout);
54 }
55
56 info!(
57 %id,
58 user = %run_as.name,
59 uid = run_as.uid,
60 gid = run_as.gid,
61 argv = ?req.argv,
62 cwd = ?req.cwd,
63 "starting exec"
64 );
65
66 let cmd = match command::spawn_streaming(spec) {
67 Ok(cmd) => cmd,
68 Err(err) => {
69 send_exit(127, Some(err.to_string()), false).await;
70 return;
71 }
72 };
73 let (mut events, exit_task) = cmd.into_parts();
74 while let Some(event) = events.recv().await {
75 let data = String::from_utf8_lossy(&event.data).into_owned();
76 let output = match event.kind {
77 OutKind::Stdout => Message {
78 id: id.clone(),
79 exec_stdout: Some(v1::ExecStdout { data }),
80 ..Default::default()
81 },
82 OutKind::Stderr => Message {
83 id: id.clone(),
84 exec_stderr: Some(v1::ExecStderr { data }),
85 ..Default::default()
86 },
87 };
88 let _ = out.send(output).await;
89 }
90 let exit = match exit_task
91 .await
92 .unwrap_or_else(|error| Err(anyhow::anyhow!("command supervisor failed: {error}")))
93 {
94 Ok(exit) => exit,
95 Err(err) => {
96 send_exit(127, Some(err.to_string()), false).await;
97 return;
98 }
99 };
100
101 send_exit(exit.exit_code, exit.error, exit.timed_out).await
102}
103
/// Numeric identity an exec runs under, plus the user name it was resolved from.
#[derive(Debug, Clone)]
struct ResolvedUser {
    /// User part of the spec exactly as it was requested (a name or a numeric id).
    name: String,
    /// User id the command runs as.
    uid: u32,
    /// Group id the command runs as.
    gid: u32,
}
110
111fn resolve_user(spec: &str) -> Result<ResolvedUser, String> {
112 let spec = spec.trim();
113 if spec.is_empty() {
114 return resolve_user(DEFAULT_USER);
115 }
116
117 let (user_part, group_part) = spec
118 .split_once(':')
119 .map(|(user, group)| (user, Some(group)))
120 .unwrap_or((spec, None));
121
122 let mut user = lookup_user(user_part)?;
123 if let Some(group) = group_part.filter(|group| !group.is_empty()) {
124 user.gid = lookup_group(group)?;
125 }
126
127 if user.uid == 0 || user.gid == 0 {
128 return Err(format!("refusing to run exec as privileged user {spec:?}"));
129 }
130
131 Ok(user)
132}
133
134fn lookup_user(name: &str) -> Result<ResolvedUser, String> {
135 match User::from_name(name) {
136 Ok(Some(user)) => Ok(ResolvedUser {
137 name: name.to_owned(),
138 uid: user.uid.as_raw(),
139 gid: user.gid.as_raw(),
140 }),
141 Ok(None) => {
142 let uid = name
143 .parse::<u32>()
144 .map_err(|_| format!("workflow user {name:?} was not found"))?;
145 Ok(ResolvedUser {
146 name: name.to_owned(),
147 uid,
148 gid: uid,
149 })
150 }
151 Err(error) => Err(format!("lookup workflow user {name:?}: {error}")),
152 }
153}
154
155fn lookup_group(name: &str) -> Result<u32, String> {
156 match Group::from_name(name) {
157 Ok(Some(group)) => Ok(group.gid.as_raw()),
158 Ok(None) => name
159 .parse::<u32>()
160 .map_err(|_| format!("workflow group {name:?} was not found")),
161 Err(error) => Err(format!("lookup workflow group {name:?}: {error}")),
162 }
163}
164
/// Splits `KEY=VALUE` strings into environment pairs.
///
/// Each entry is split at the first `=`, so values may themselves contain `=` and may
/// be empty. Entries without `=` and entries with an empty key (such as `=value`) are
/// skipped, because an empty name is not a valid environment variable.
fn parse_env(values: &[String]) -> Vec<(OsString, OsString)> {
    values
        .iter()
        .filter_map(|entry| entry.split_once('='))
        .filter(|(key, _)| !key.is_empty())
        .map(|(key, value)| (OsString::from(key), OsString::from(value)))
        .collect()
}
172
#[cfg(test)]
mod tests {
    use super::resolve_user;

    /// Fragment every privileged-identity rejection is expected to contain.
    const REFUSAL: &str = "refusing to run exec as privileged user";

    /// Asserts that `spec` is rejected as a privileged identity.
    fn assert_refused(spec: &str) {
        let message = resolve_user(spec).unwrap_err();
        assert!(message.contains(REFUSAL));
    }

    #[test]
    fn refuses_root_exec_user() {
        assert_refused("root");
    }

    #[test]
    fn refuses_root_exec_group() {
        assert_refused("65534:0");
    }
}