Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

spindle/microvm: add ssh debug into failed job VMs

Signed-off-by: dawn <dawn@tangled.org>

author
dawn
date (Jun 25, 2026, 8:34 PM +0300) commit f23d8633 parent 6c3753a7 change-id uykwpnll
+1241 -61
+11
Cargo.lock
··· 2975 2975 ] 2976 2976 2977 2977 [[package]] 2978 + name = "pty-process" 2979 + version = "0.5.3" 2980 + source = "registry+https://github.com/rust-lang/crates.io-index" 2981 + checksum = "71cec9e2670207c5ebb9e477763c74436af3b9091dd550b9fb3c1bec7f3ea266" 2982 + dependencies = [ 2983 + "rustix", 2984 + "tokio", 2985 + ] 2986 + 2987 + [[package]] 2978 2988 name = "quick_cache" 2979 2989 version = "0.6.22" 2980 2990 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3680 3690 "prost", 3681 3691 "prost-protovalidate", 3682 3692 "prost-reflect", 3693 + "pty-process", 3683 3694 "serde", 3684 3695 "serde_json", 3685 3696 "tempfile",
+5
docker-compose.yml
··· 152 152 SPINDLE_MICROVM_PIPELINES_AGENT_PORT: "11240" 153 153 SPINDLE_S3_LOG_BUCKET: "" 154 154 SPINDLE_MICROVM_PIPELINES_ENABLE_CGROUPS: "false" 155 + SPINDLE_MICROVM_PIPELINES_DEBUG_SSH_ENABLED: true 156 + SPINDLE_MICROVM_PIPELINES_DEBUG_SSH_LISTEN_ADDR: 0.0.0.0:2223 157 + SPINDLE_MICROVM_PIPELINES_DEBUG_SSH_GRACE_PERIOD: 10m 155 158 # these two are required for cgroups, uncomment if testing 156 159 # privileged: true 157 160 # cgroup: host ··· 166 169 security_opt: 167 170 - label=disable 168 171 - seccomp=unconfined 172 + ports: 173 + - "2223:2223" 169 174 volumes: 170 175 - spindle-data:/var/lib/spindle 171 176 - spindle-logs:/var/log/spindle
+1
nix/microvm/base.nix
··· 199 199 group = "spindle-workflow"; 200 200 home = "/workspace"; 201 201 createHome = false; 202 + shell = pkgs.bashInteractive; 202 203 }; 203 204 users.users.spindle-workflow.extraGroups = lib.mkIf config.virtualisation.docker.enable [ 204 205 "docker"
+37
nix/modules/spindle.nix
··· 257 257 ''; 258 258 }; 259 259 }; 260 + 261 + debugSsh = { 262 + enable = mkOption { 263 + type = types.bool; 264 + default = false; 265 + description = '' 266 + Enable the debug ssh server that lets authorized users ssh into a 267 + failed microVM to debug it. 268 + ''; 269 + }; 270 + listenAddr = mkOption { 271 + type = types.str; 272 + default = "0.0.0.0:2222"; 273 + example = "0.0.0.0:2225"; 274 + description = "Address for the debug ssh server to listen on."; 275 + }; 276 + hostKeyPath = mkOption { 277 + type = with types; nullOr path; 278 + default = null; 279 + example = "/var/lib/spindle/debug_ssh_host_key"; 280 + description = '' 281 + Path to the ssh host key for the debug server. If null, one is generated 282 + once and persisted next to the spindle db. 283 + ''; 284 + }; 285 + gracePeriod = mkOption { 286 + type = types.str; 287 + default = "5m"; 288 + description = '' 289 + How long a failed workflow's microVM is kept alive for the user to ssh in. 290 + ''; 291 + }; 292 + }; 260 293 }; 261 294 262 295 nixCache = { ··· 377 410 "SPINDLE_MICROVM_PIPELINES_CGROUP_PIDS_MAX=${toString cfg.pipelines.microvm.cgroup.pidsMax}" 378 411 "SPINDLE_MICROVM_PIPELINES_CGROUP_SWAP_MAX_MIB=${toString cfg.pipelines.microvm.cgroup.swapMaxMiB}" 379 412 "SPINDLE_MICROVM_PIPELINES_CGROUP_SUPERVISOR_MEMORY_MIN_MIB=${toString cfg.pipelines.microvm.cgroup.supervisorMinMiB}" 413 + "SPINDLE_MICROVM_PIPELINES_DEBUG_SSH_ENABLED=${lib.boolToString cfg.pipelines.microvm.debugSsh.enable}" 414 + "SPINDLE_MICROVM_PIPELINES_DEBUG_SSH_LISTEN_ADDR=${cfg.pipelines.microvm.debugSsh.listenAddr}" 415 + "SPINDLE_MICROVM_PIPELINES_DEBUG_SSH_HOST_KEY_PATH=${optionalString (cfg.pipelines.microvm.debugSsh.hostKeyPath != null) (toString cfg.pipelines.microvm.debugSsh.hostKeyPath)}" 416 + "SPINDLE_MICROVM_PIPELINES_DEBUG_SSH_GRACE_PERIOD=${cfg.pipelines.microvm.debugSsh.gracePeriod}" 380 417 "SPINDLE_NIX_CACHE_READ_URLS=${concatStringsSep "," cfg.pipelines.nixCache.readUrls}" 381 418 "SPINDLE_NIX_CACHE_TRUSTED_PUBLIC_KEYS=${concatStringsSep "," cfg.pipelines.nixCache.trustedPublicKeys}" 382 419 "SPINDLE_NIX_CACHE_UPLOAD_URL=${cfg.pipelines.nixCache.uploadUrl}"
+2 -1
shuttle/Cargo.toml
··· 8 8 [dependencies] 9 9 anyhow = "1" 10 10 base64 = "0.22" 11 - nix = { version = "0.31", features = ["fs", "process", "reboot", "signal", "user"] } 11 + nix = { version = "0.31", features = ["fs", "process", "reboot", "signal", "term", "user"] } 12 12 prost = "0.14" 13 13 prost-reflect = "0.16" 14 14 prost-protovalidate = "0.3" 15 15 once_cell = "1" 16 + pty-process = { version = "0.5.3", features = ["async"] } 16 17 serde = { version = "1", features = ["derive"] } 17 18 serde_json = "1" 18 19 tempfile = "3"
+99 -14
shuttle/src/command.rs
··· 1 1 use anyhow::{Context, Result}; 2 2 use nix::sys::signal::{Signal, kill}; 3 3 use nix::unistd::{Gid, Pid, Uid, User, getgrouplist, setgid, setgroups, setuid}; 4 - use std::ffi::{CString, OsStr, OsString}; 4 + use pty_process::{Command as PtyCommand, OwnedReadPty, OwnedWritePty, Size}; 5 + use std::ffi::{CString, OsString}; 5 6 use std::io; 6 7 use std::os::unix::process::ExitStatusExt; 7 8 use std::path::PathBuf; ··· 196 197 // group won't actually let it access the sock. 197 198 // https://github.com/rust-lang/rust/issues/90747 198 199 if let (Some(uid), Some(gid)) = (spec.uid, spec.gid) { 199 - let username = User::from_uid(Uid::from_raw(uid)) 200 - .ok() 201 - .flatten() 202 - .map(|u| u.name) 203 - .with_context(|| format!("lookup passwd entry for uid {uid}"))?; 204 - let cname = CString::new(username) 205 - .with_context(|| format!("username for uid {uid} contained a null byte"))?; 206 - // resolve groups beforehand so we don't have to read /etc/group in the pre_exec 207 - let groups = 208 - getgrouplist(&cname, Gid::from_raw(gid)).context("resolve supplementary groups")?; 200 + let groups = resolve_supplementary_groups(uid, gid)?; 209 201 // SAFETY: pre_exec runs between fork and execve in the child. 210 202 // we only call async-signal-safe syscalls and we don't touch any 211 203 // shared state, no allocator, no mutexes, no globals. ··· 223 215 cmd.process_group(0); 224 216 225 217 cmd.spawn() 226 - .with_context(|| format!("spawn {}", display_os(&spec.program))) 218 + .with_context(|| format!("spawn {:?}", &spec.program)) 219 + } 220 + 221 + // resolve the supplementary group list up front so the pre_exec hook never has 222 + // to read /etc/group (which is not async-signal-safe) between fork and exec. 223 + fn resolve_supplementary_groups(uid: u32, gid: u32) -> Result<Vec<Gid>> { 224 + let username = User::from_uid(Uid::from_raw(uid)) 225 + .ok() 226 + .flatten() 227 + .map(|u| u.name) 228 + .with_context(|| format!("lookup passwd entry for uid {uid}"))?; 229 + let cname = CString::new(username) 230 + .with_context(|| format!("username for uid {uid} contained a null byte"))?; 231 + getgrouplist(&cname, Gid::from_raw(gid)).context("resolve supplementary groups") 232 + } 233 + 234 + pub fn spawn_pty(spec: Spec, rows: u16, cols: u16) -> Result<(OwnedReadPty, OwnedWritePty, Child)> { 235 + let (pty, pts) = pty_process::open().context("open pty")?; 236 + pty.resize(Size::new(rows, cols)).context("set pty size")?; 237 + 238 + let mut cmd = PtyCommand::new(&spec.program) 239 + .args(&spec.args) 240 + .envs(spec.env.iter().map(|(key, value)| (key, value))); 241 + if let Some(cwd) = &spec.cwd { 242 + cmd = cmd.current_dir(cwd); 243 + } 244 + 245 + // drop privileges in the child. this RELIES on pty-process composing our 246 + // pre_exec hook *after* its own session setup: it wraps us as `move || { 247 + // session_leader()?; ours()?; }`, so setsid + TIOCSCTTY run first (while 248 + // still privileged) and only then do we drop to the workflow user. that 249 + // ordering is what we want and we depend on it. if pty-process ever ran our 250 + // hook first, the session setup would happen post-drop. (it'd likely still 251 + // work, since setsid/TIOCSCTTY on our own pty need no privilege, but it is 252 + // not the behaviour we're assuming here) 253 + // don't use .uid()/.gid() here, they clear supplementary groups (see L195). 254 + if let (Some(uid), Some(gid)) = (spec.uid, spec.gid) { 255 + let groups = resolve_supplementary_groups(uid, gid)?; 256 + // SAFETY: pre_exec runs between fork and execve in the child. every call 257 + // below is async-signal-safe and touches no shared state. 258 + cmd = unsafe { 259 + cmd.pre_exec(move || { 260 + setgroups(&groups).map_err(io::Error::from)?; 261 + setgid(Gid::from_raw(gid)).map_err(io::Error::from)?; 262 + setuid(Uid::from_raw(uid)).map_err(io::Error::from)?; 263 + Ok(()) 264 + }) 265 + }; 266 + } 267 + 268 + // spawn consumes the slave (dup'd onto the child's 0/1/2 and then closed in 269 + // the parent), so the master reports EOF once the shell and all its children 270 + // have exited. 271 + let child = cmd 272 + .spawn(pts) 273 + .with_context(|| format!("spawn pty shell {:?}", &spec.program))?; 274 + 275 + let (reader, writer) = pty.into_split(); 276 + Ok((reader, writer, child)) 227 277 } 228 278 229 279 async fn wait_child(child: &mut Child, timeout: Option<Duration>) -> ExitResult { ··· 293 343 }) 294 344 } 295 345 296 - fn display_os(value: &OsStr) -> String { 297 - value.to_string_lossy().into_owned() 346 + #[cfg(test)] 347 + mod tests { 348 + use super::*; 349 + use tokio::io::AsyncReadExt; 350 + 351 + #[tokio::test] 352 + async fn pty_runs_a_shell_and_reports_exit() { 353 + let spec = Spec::new("/bin/sh") 354 + .arg("-c") 355 + .arg("printf 'hello pty'; exit 7"); 356 + let (mut reader, _writer, mut child) = spawn_pty(spec, 24, 80).expect("spawn pty"); 357 + 358 + let mut output = Vec::new(); 359 + let mut chunk = [0u8; 1024]; 360 + loop { 361 + match reader.read(&mut chunk).await { 362 + Ok(0) => break, 363 + Ok(n) => output.extend_from_slice(&chunk[..n]), 364 + // linux signals slave-closed with EIO rather than EOF 365 + Err(error) if error.raw_os_error() == Some(nix::libc::EIO) => break, 366 + Err(error) => panic!("read pty master: {error}"), 367 + } 368 + } 369 + 370 + let status = child.wait().await.expect("wait child"); 371 + let text = String::from_utf8_lossy(&output); 372 + assert!(text.contains("hello pty"), "unexpected output: {text:?}"); 373 + assert_eq!(status.code(), Some(7)); 374 + } 375 + 376 + #[tokio::test] 377 + async fn pty_resize_succeeds() { 378 + let spec = Spec::new("/bin/sh").arg("-c").arg("sleep 0.2"); 379 + let (_reader, writer, mut child) = spawn_pty(spec, 24, 80).expect("spawn pty"); 380 + writer.resize(Size::new(40, 120)).expect("resize"); 381 + let _ = child.wait().await; 382 + } 298 383 }
shuttle/src/gen/file_descriptor_set.bin

This is a binary file and will not be displayed.

+33
shuttle/src/gen/spindle/agent/v1/spindle.agent.v1.rs
··· 113 113 pub error: ::prost::alloc::string::String, 114 114 } 115 115 #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] 116 + pub struct OpenDebugShell { 117 + #[prost(uint32, tag = "1")] 118 + pub vsock_port: u32, 119 + #[prost(string, tag = "2")] 120 + pub term: ::prost::alloc::string::String, 121 + #[prost(uint32, tag = "3")] 122 + pub rows: u32, 123 + /// debug shells always run as the spindle-workflow user, with that user's 124 + /// login shell, starting in its home dir. nothing here is client-specifiable. 125 + #[prost(uint32, tag = "4")] 126 + pub cols: u32, 127 + } 128 + /// changes meaning based on who sends this: 129 + /// guest->host is shell output, host->guest is keyboard input 130 + #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] 131 + pub struct PtyData { 132 + #[prost(bytes = "bytes", tag = "1")] 133 + pub data: ::prost::bytes::Bytes, 134 + } 135 + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] 136 + pub struct PtyResize { 137 + #[prost(uint32, tag = "1")] 138 + pub rows: u32, 139 + #[prost(uint32, tag = "2")] 140 + pub cols: u32, 141 + } 142 + #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] 116 143 pub struct Message { 117 144 #[prost(string, tag = "1")] 118 145 pub id: ::prost::alloc::string::String, ··· 142 169 pub poweroff: ::core::option::Option<Poweroff>, 143 170 #[prost(message, optional, tag = "14")] 144 171 pub poweroff_result: ::core::option::Option<PoweroffResult>, 172 + #[prost(message, optional, tag = "15")] 173 + pub open_debug_shell: ::core::option::Option<OpenDebugShell>, 174 + #[prost(message, optional, tag = "16")] 175 + pub pty_data: ::core::option::Option<PtyData>, 176 + #[prost(message, optional, tag = "17")] 177 + pub pty_resize: ::core::option::Option<PtyResize>, 145 178 } 146 179 // @@protoc_insertion_point(module)
+1
shuttle/src/main.rs
··· 9 9 mod logging; 10 10 mod nix_config; 11 11 mod protocol; 12 + mod pty; 12 13 mod session; 13 14 14 15 use std::env;
+6
shuttle/src/protocol.rs
··· 43 43 CacheDrainResult, 44 44 Poweroff, 45 45 PoweroffResult, 46 + OpenDebugShell, 47 + PtyData, 48 + PtyResize, 46 49 Message, 47 50 ); 48 51 ··· 79 82 cache_drain_result => "cache_drain_result", 80 83 poweroff => "poweroff", 81 84 poweroff_result => "poweroff_result", 85 + open_debug_shell => "open_debug_shell", 86 + pty_data => "pty_data", 87 + pty_resize => "pty_resize", 82 88 }) 83 89 .unwrap_or_else(|| unreachable!("validated message has no payload")) 84 90 }
+182
shuttle/src/pty.rs
··· 1 + use std::path::Path; 2 + 3 + use crate::command::{self, Spec}; 4 + use crate::protocol::{self, Message, v1}; 5 + use anyhow::{Context, Result, bail}; 6 + use nix::sys::signal::{Signal, kill}; 7 + use nix::unistd::{Pid, User}; 8 + use pty_process::Size; 9 + use tokio::io::{AsyncReadExt, BufReader}; 10 + use tokio_vsock::{VsockAddr, VsockStream}; 11 + use tracing::{info, warn}; 12 + 13 + const WF_USER: &str = "spindle-workflow"; 14 + const READ_CHUNK: usize = 32 * 1024; 15 + 16 + pub async fn run(host_cid: u32, open: v1::OpenDebugShell) { 17 + if let Err(error) = serve(host_cid, open).await { 18 + warn!(%error, "debug shell session failed"); 19 + } 20 + } 21 + 22 + async fn serve(host_cid: u32, open: v1::OpenDebugShell) -> Result<()> { 23 + let conn = VsockStream::connect(VsockAddr::new(host_cid, open.vsock_port)) 24 + .await 25 + .with_context(|| format!("dial host debug vsock port {}", open.vsock_port))?; 26 + info!(port = open.vsock_port, "debug shell connected"); 27 + 28 + let user = resolve_user(WF_USER)?; 29 + 30 + let rows = clamp_tty_dim(open.rows); 31 + let cols = clamp_tty_dim(open.cols); 32 + 33 + let spec = Spec::new(&user.shell) 34 + .arg("-l") 35 + .envs(user.env(&open.term)) 36 + .run_as(user.uid, user.gid) 37 + .cwd(Path::new(&user.home).join("repo")); // /workflow/repo 38 + 39 + let (mut pty_reader, mut pty_writer, mut child) = 40 + command::spawn_pty(spec, rows, cols).context("spawn pty shell")?; 41 + let pid = child.id(); 42 + 43 + let (conn_reader, conn_writer) = tokio::io::split(conn); 44 + let mut conn_reader = BufReader::new(conn_reader); 45 + let mut conn_writer = conn_writer; 46 + 47 + let mut buf = vec![0u8; READ_CHUNK]; 48 + let client_gone = loop { 49 + tokio::select! { 50 + read = pty_reader.read(&mut buf) => match read { 51 + Ok(0) => break false, // shell exited 52 + Ok(n) => { 53 + let msg = Message { 54 + id: "pty".to_owned(), 55 + pty_data: Some(v1::PtyData { data: buf[..n].to_vec().into() }), 56 + ..Default::default() 57 + }; 58 + if protocol::write_message(&mut conn_writer, &msg).await.is_err() { 59 + break true; 60 + } 61 + } 62 + Err(error) => { 63 + // linux returns EIO (not a clean EOF) on the master once the 64 + // slave side is fully closed, so treat that as the shell 65 + // exiting normally rather than a real read failure. 66 + if error.raw_os_error() != Some(nix::libc::EIO) { 67 + warn!(%error, "pty master read failed"); 68 + } 69 + break false; 70 + } 71 + }, 72 + incoming = protocol::read_message(&mut conn_reader) => match incoming { 73 + Ok(Some(msg)) => { 74 + if let Some(data) = msg.pty_data { 75 + use tokio::io::AsyncWriteExt; 76 + if pty_writer.write_all(&data.data).await.is_err() { 77 + break false; 78 + } 79 + } else if let Some(resize) = msg.pty_resize { 80 + let size = Size::new(clamp_tty_dim(resize.rows), clamp_tty_dim(resize.cols)); 81 + if let Err(error) = pty_writer.resize(size) { 82 + warn!(%error, "pty resize failed"); 83 + } 84 + } 85 + // anything else on the debug channel is ignored 86 + } 87 + Ok(None) => break true, // client closed the connection 88 + Err(error) => { 89 + warn!(%error, "debug channel read failed"); 90 + break true; 91 + } 92 + }, 93 + } 94 + }; 95 + 96 + // if the client disconnected first, hang up the shell's process group so we 97 + // don't leak a detached session. (pty-process calls setsid in the child, so 98 + // it leads a new session and process group => pgid == pid.) 99 + if client_gone && let Some(pid) = pid { 100 + let _ = kill(Pid::from_raw(-(pid as i32)), Signal::SIGHUP); 101 + } 102 + 103 + let exit_code = match child.wait().await { 104 + Ok(status) => { 105 + use std::os::unix::process::ExitStatusExt; 106 + status 107 + .code() 108 + .or_else(|| status.signal().map(|signal| 128 + signal)) 109 + .unwrap_or(1) 110 + } 111 + Err(error) => { 112 + warn!(%error, "waiting on debug shell failed"); 113 + 1 114 + } 115 + }; 116 + 117 + let exit = Message { 118 + id: "pty".to_owned(), 119 + exec_exit: Some(v1::ExecExit { 120 + exit_code, 121 + error: String::new(), 122 + timed_out: false, 123 + }), 124 + ..Default::default() 125 + }; 126 + let _ = protocol::write_message(&mut conn_writer, &exit).await; 127 + info!(exit_code, "debug shell session ended"); 128 + Ok(()) 129 + } 130 + 131 + struct ResolvedUser { 132 + uid: u32, 133 + gid: u32, 134 + name: String, 135 + home: String, 136 + shell: String, 137 + } 138 + 139 + impl ResolvedUser { 140 + fn env(&self, term: &str) -> Vec<(String, String)> { 141 + let term = if term.is_empty() { 142 + "xterm-256color" 143 + } else { 144 + term 145 + }; 146 + vec![ 147 + ("TERM".to_owned(), term.to_owned()), 148 + ("HOME".to_owned(), self.home.clone()), 149 + ("USER".to_owned(), self.name.clone()), 150 + ("LOGNAME".to_owned(), self.name.clone()), 151 + ("SHELL".to_owned(), self.shell.clone()), 152 + ( 153 + "PATH".to_owned(), 154 + "/run/current-system/sw/bin:/usr/bin:/bin".to_owned(), 155 + ), 156 + ] 157 + } 158 + } 159 + 160 + fn resolve_user(name: &str) -> Result<ResolvedUser> { 161 + let user = User::from_name(name) 162 + .with_context(|| format!("lookup user {name:?}"))? 163 + .with_context(|| format!("debug shell user {name:?} not found"))?; 164 + if user.uid.as_raw() == 0 || user.gid.as_raw() == 0 { 165 + bail!("refusing to open a debug shell as privileged user {name:?}"); 166 + } 167 + let shell = user.shell.to_string_lossy().into_owned(); 168 + if shell.is_empty() { 169 + bail!("debug shell user {name:?} has no login shell set in the image"); 170 + } 171 + Ok(ResolvedUser { 172 + uid: user.uid.as_raw(), 173 + gid: user.gid.as_raw(), 174 + name: user.name, 175 + home: user.dir.to_string_lossy().into_owned(), 176 + shell, 177 + }) 178 + } 179 + 180 + fn clamp_tty_dim(value: u32) -> u16 { 181 + value.clamp(1, u16::MAX as u32) as u16 182 + }
+4 -1
shuttle/src/session.rs
··· 5 5 use crate::nix_config::{self, SYSTEMCTL_EXECUTABLE}; 6 6 use crate::on_payload; 7 7 use crate::protocol::{self, Message, v1}; 8 + use crate::pty; 8 9 use crate::{activation, command}; 9 10 use anyhow::{Context, Result, bail}; 10 11 use std::time::Duration; ··· 61 62 let read_result: Result<()> = loop { 62 63 tokio::select! { 63 64 read = protocol::read_message(&mut reader) => match read { 64 - Ok(Some(msg)) => spawn_message_task(&mut tasks, msg, &out_tx, uploader.clone()), 65 + Ok(Some(msg)) => spawn_message_task(&mut tasks, host_cid, msg, &out_tx, uploader.clone()), 65 66 Ok(None) => break Ok(()), 66 67 Err(error) => break Err(error).context("read message"), 67 68 }, ··· 84 85 85 86 fn spawn_message_task( 86 87 tasks: &mut JoinSet<()>, 88 + host_cid: u32, 87 89 msg: Message, 88 90 out_tx: &Sender<Message>, 89 91 uploader: Option<CacheUploadManager>, ··· 94 96 exec_start => tasks.spawn(exec::run(msg.id, exec_start, out_tx.clone())), 95 97 cache_drain => tasks.spawn(run_cache_drain(msg.id, cache_drain, out_tx.clone(), uploader)), 96 98 poweroff => tasks.spawn(run_poweroff(msg.id, poweroff, out_tx.clone())), 99 + open_debug_shell => tasks.spawn(pty::run(host_cid, open_debug_shell)), 97 100 }); 98 101 if handle.is_none() { 99 102 warn!(kind, "ignoring unsupported message");
+230 -14
spindle/agentproto/gen/agent.pb.go
··· 780 780 return "" 781 781 } 782 782 783 + type OpenDebugShell struct { 784 + state protoimpl.MessageState `protogen:"open.v1"` 785 + VsockPort uint32 `protobuf:"varint,1,opt,name=vsock_port,json=vsockPort,proto3" json:"vsock_port,omitempty"` 786 + Term string `protobuf:"bytes,2,opt,name=term,proto3" json:"term,omitempty"` 787 + Rows uint32 `protobuf:"varint,3,opt,name=rows,proto3" json:"rows,omitempty"` 788 + Cols uint32 `protobuf:"varint,4,opt,name=cols,proto3" json:"cols,omitempty"` 789 + unknownFields protoimpl.UnknownFields 790 + sizeCache protoimpl.SizeCache 791 + } 792 + 793 + func (x *OpenDebugShell) Reset() { 794 + *x = OpenDebugShell{} 795 + mi := &file_spindle_agent_v1_agent_proto_msgTypes[13] 796 + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) 797 + ms.StoreMessageInfo(mi) 798 + } 799 + 800 + func (x *OpenDebugShell) String() string { 801 + return protoimpl.X.MessageStringOf(x) 802 + } 803 + 804 + func (*OpenDebugShell) ProtoMessage() {} 805 + 806 + func (x *OpenDebugShell) ProtoReflect() protoreflect.Message { 807 + mi := &file_spindle_agent_v1_agent_proto_msgTypes[13] 808 + if x != nil { 809 + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) 810 + if ms.LoadMessageInfo() == nil { 811 + ms.StoreMessageInfo(mi) 812 + } 813 + return ms 814 + } 815 + return mi.MessageOf(x) 816 + } 817 + 818 + // Deprecated: Use OpenDebugShell.ProtoReflect.Descriptor instead. 819 + func (*OpenDebugShell) Descriptor() ([]byte, []int) { 820 + return file_spindle_agent_v1_agent_proto_rawDescGZIP(), []int{13} 821 + } 822 + 823 + func (x *OpenDebugShell) GetVsockPort() uint32 { 824 + if x != nil { 825 + return x.VsockPort 826 + } 827 + return 0 828 + } 829 + 830 + func (x *OpenDebugShell) GetTerm() string { 831 + if x != nil { 832 + return x.Term 833 + } 834 + return "" 835 + } 836 + 837 + func (x *OpenDebugShell) GetRows() uint32 { 838 + if x != nil { 839 + return x.Rows 840 + } 841 + return 0 842 + } 843 + 844 + func (x *OpenDebugShell) GetCols() uint32 { 845 + if x != nil { 846 + return x.Cols 847 + } 848 + return 0 849 + } 850 + 851 + // changes meaning based on who sends this: 852 + // guest->host is shell output, host->guest is keyboard input 853 + type PtyData struct { 854 + state protoimpl.MessageState `protogen:"open.v1"` 855 + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` 856 + unknownFields protoimpl.UnknownFields 857 + sizeCache protoimpl.SizeCache 858 + } 859 + 860 + func (x *PtyData) Reset() { 861 + *x = PtyData{} 862 + mi := &file_spindle_agent_v1_agent_proto_msgTypes[14] 863 + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) 864 + ms.StoreMessageInfo(mi) 865 + } 866 + 867 + func (x *PtyData) String() string { 868 + return protoimpl.X.MessageStringOf(x) 869 + } 870 + 871 + func (*PtyData) ProtoMessage() {} 872 + 873 + func (x *PtyData) ProtoReflect() protoreflect.Message { 874 + mi := &file_spindle_agent_v1_agent_proto_msgTypes[14] 875 + if x != nil { 876 + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) 877 + if ms.LoadMessageInfo() == nil { 878 + ms.StoreMessageInfo(mi) 879 + } 880 + return ms 881 + } 882 + return mi.MessageOf(x) 883 + } 884 + 885 + // Deprecated: Use PtyData.ProtoReflect.Descriptor instead. 886 + func (*PtyData) Descriptor() ([]byte, []int) { 887 + return file_spindle_agent_v1_agent_proto_rawDescGZIP(), []int{14} 888 + } 889 + 890 + func (x *PtyData) GetData() []byte { 891 + if x != nil { 892 + return x.Data 893 + } 894 + return nil 895 + } 896 + 897 + type PtyResize struct { 898 + state protoimpl.MessageState `protogen:"open.v1"` 899 + Rows uint32 `protobuf:"varint,1,opt,name=rows,proto3" json:"rows,omitempty"` 900 + Cols uint32 `protobuf:"varint,2,opt,name=cols,proto3" json:"cols,omitempty"` 901 + unknownFields protoimpl.UnknownFields 902 + sizeCache protoimpl.SizeCache 903 + } 904 + 905 + func (x *PtyResize) Reset() { 906 + *x = PtyResize{} 907 + mi := &file_spindle_agent_v1_agent_proto_msgTypes[15] 908 + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) 909 + ms.StoreMessageInfo(mi) 910 + } 911 + 912 + func (x *PtyResize) String() string { 913 + return protoimpl.X.MessageStringOf(x) 914 + } 915 + 916 + func (*PtyResize) ProtoMessage() {} 917 + 918 + func (x *PtyResize) ProtoReflect() protoreflect.Message { 919 + mi := &file_spindle_agent_v1_agent_proto_msgTypes[15] 920 + if x != nil { 921 + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) 922 + if ms.LoadMessageInfo() == nil { 923 + ms.StoreMessageInfo(mi) 924 + } 925 + return ms 926 + } 927 + return mi.MessageOf(x) 928 + } 929 + 930 + // Deprecated: Use PtyResize.ProtoReflect.Descriptor instead. 931 + func (*PtyResize) Descriptor() ([]byte, []int) { 932 + return file_spindle_agent_v1_agent_proto_rawDescGZIP(), []int{15} 933 + } 934 + 935 + func (x *PtyResize) GetRows() uint32 { 936 + if x != nil { 937 + return x.Rows 938 + } 939 + return 0 940 + } 941 + 942 + func (x *PtyResize) GetCols() uint32 { 943 + if x != nil { 944 + return x.Cols 945 + } 946 + return 0 947 + } 948 + 783 949 type Message struct { 784 950 state protoimpl.MessageState `protogen:"open.v1"` 785 951 Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` ··· 796 962 CacheDrainResult *CacheDrainResult `protobuf:"bytes,12,opt,name=cache_drain_result,json=cacheDrainResult,proto3" json:"cache_drain_result,omitempty"` 797 963 Poweroff *Poweroff `protobuf:"bytes,13,opt,name=poweroff,proto3" json:"poweroff,omitempty"` 798 964 PoweroffResult *PoweroffResult `protobuf:"bytes,14,opt,name=poweroff_result,json=poweroffResult,proto3" json:"poweroff_result,omitempty"` 965 + OpenDebugShell *OpenDebugShell `protobuf:"bytes,15,opt,name=open_debug_shell,json=openDebugShell,proto3" json:"open_debug_shell,omitempty"` 966 + PtyData *PtyData `protobuf:"bytes,16,opt,name=pty_data,json=ptyData,proto3" json:"pty_data,omitempty"` 967 + PtyResize *PtyResize `protobuf:"bytes,17,opt,name=pty_resize,json=ptyResize,proto3" json:"pty_resize,omitempty"` 799 968 unknownFields protoimpl.UnknownFields 800 969 sizeCache protoimpl.SizeCache 801 970 } 802 971 803 972 func (x *Message) Reset() { 804 973 *x = Message{} 805 - mi := &file_spindle_agent_v1_agent_proto_msgTypes[13] 974 + mi := &file_spindle_agent_v1_agent_proto_msgTypes[16] 806 975 ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) 807 976 ms.StoreMessageInfo(mi) 808 977 } ··· 814 983 func (*Message) ProtoMessage() {} 815 984 816 985 func (x *Message) ProtoReflect() protoreflect.Message { 817 - mi := &file_spindle_agent_v1_agent_proto_msgTypes[13] 986 + mi := &file_spindle_agent_v1_agent_proto_msgTypes[16] 818 987 if x != nil { 819 988 ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) 820 989 if ms.LoadMessageInfo() == nil { ··· 827 996 828 997 // Deprecated: Use Message.ProtoReflect.Descriptor instead. 829 998 func (*Message) Descriptor() ([]byte, []int) { 830 - return file_spindle_agent_v1_agent_proto_rawDescGZIP(), []int{13} 999 + return file_spindle_agent_v1_agent_proto_rawDescGZIP(), []int{16} 831 1000 } 832 1001 833 1002 func (x *Message) GetId() string { ··· 928 1097 return nil 929 1098 } 930 1099 1100 + func (x *Message) GetOpenDebugShell() *OpenDebugShell { 1101 + if x != nil { 1102 + return x.OpenDebugShell 1103 + } 1104 + return nil 1105 + } 1106 + 1107 + func (x *Message) GetPtyData() *PtyData { 1108 + if x != nil { 1109 + return x.PtyData 1110 + } 1111 + return nil 1112 + } 1113 + 1114 + func (x *Message) GetPtyResize() *PtyResize { 1115 + if x != nil { 1116 + return x.PtyResize 1117 + } 1118 + return nil 1119 + } 1120 + 931 1121 var File_spindle_agent_v1_agent_proto protoreflect.FileDescriptor 932 1122 933 1123 const file_spindle_agent_v1_agent_proto_rawDesc = "" + ··· 990 1180 "\n" + 991 1181 "\bPoweroff\"&\n" + 992 1182 "\x0ePoweroffResult\x12\x14\n" + 993 - "\x05error\x18\x01 \x01(\tR\x05error\"\xa8\b\n" + 1183 + "\x05error\x18\x01 \x01(\tR\x05error\"k\n" + 1184 + "\x0eOpenDebugShell\x12\x1d\n" + 1185 + "\n" + 1186 + "vsock_port\x18\x01 \x01(\rR\tvsockPort\x12\x12\n" + 1187 + "\x04term\x18\x02 \x01(\tR\x04term\x12\x12\n" + 1188 + "\x04rows\x18\x03 \x01(\rR\x04rows\x12\x12\n" + 1189 + "\x04cols\x18\x04 \x01(\rR\x04cols\"\x1d\n" + 1190 + "\aPtyData\x12\x12\n" + 1191 + "\x04data\x18\x01 \x01(\fR\x04data\"3\n" + 1192 + "\tPtyResize\x12\x12\n" + 1193 + "\x04rows\x18\x01 \x01(\rR\x04rows\x12\x12\n" + 1194 + "\x04cols\x18\x02 \x01(\rR\x04cols\"\x8e\n" + 1195 + "\n" + 994 1196 "\aMessage\x12\x17\n" + 995 1197 "\x02id\x18\x01 \x01(\tB\a\xbaH\x04r\x02\x10\x01R\x02id\x12-\n" + 996 1198 "\x05hello\x18\x02 \x01(\v2\x17.spindle.agent.v1.HelloR\x05hello\x12*\n" + ··· 1011 1213 "cacheDrain\x12P\n" + 1012 1214 "\x12cache_drain_result\x18\f \x01(\v2\".spindle.agent.v1.CacheDrainResultR\x10cacheDrainResult\x126\n" + 1013 1215 "\bpoweroff\x18\r \x01(\v2\x1a.spindle.agent.v1.PoweroffR\bpoweroff\x12I\n" + 1014 - "\x0fpoweroff_result\x18\x0e \x01(\v2 .spindle.agent.v1.PoweroffResultR\x0epoweroffResult:\xb9\x01\xbaH\xb5\x01\"\xb2\x01\n" + 1216 + "\x0fpoweroff_result\x18\x0e \x01(\v2 .spindle.agent.v1.PoweroffResultR\x0epoweroffResult\x12J\n" + 1217 + "\x10open_debug_shell\x18\x0f \x01(\v2 .spindle.agent.v1.OpenDebugShellR\x0eopenDebugShell\x124\n" + 1218 + "\bpty_data\x18\x10 \x01(\v2\x19.spindle.agent.v1.PtyDataR\aptyData\x12:\n" + 1219 + "\n" + 1220 + "pty_resize\x18\x11 \x01(\v2\x1b.spindle.agent.v1.PtyResizeR\tptyResize:\xe1\x01\xbaH\xdd\x01\"\xda\x01\n" + 1015 1221 "\x05hello\n" + 1016 1222 "\x04init\n" + 1017 1223 "\n" + ··· 1025 1231 "\vcache_drain\n" + 1026 1232 "\x12cache_drain_result\n" + 1027 1233 "\bpoweroff\n" + 1028 - "\x0fpoweroff_result\x10\x01B1Z/tangled.org/core/spindle/agentproto/gen;agentv1b\x06proto3" 1234 + "\x0fpoweroff_result\n" + 1235 + "\x10open_debug_shell\n" + 1236 + "\bpty_data\n" + 1237 + "\n" + 1238 + "pty_resize\x10\x01B1Z/tangled.org/core/spindle/agentproto/gen;agentv1b\x06proto3" 1029 1239 1030 1240 var ( 1031 1241 file_spindle_agent_v1_agent_proto_rawDescOnce sync.Once ··· 1039 1249 return file_spindle_agent_v1_agent_proto_rawDescData 1040 1250 } 1041 1251 1042 - var file_spindle_agent_v1_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 14) 1252 + var file_spindle_agent_v1_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 17) 1043 1253 var file_spindle_agent_v1_agent_proto_goTypes = []any{ 1044 1254 (*Hello)(nil), // 0: spindle.agent.v1.Hello 1045 1255 (*Init)(nil), // 1: spindle.agent.v1.Init ··· 1054 1264 (*CacheDrainResult)(nil), // 10: spindle.agent.v1.CacheDrainResult 1055 1265 (*Poweroff)(nil), // 11: spindle.agent.v1.Poweroff 1056 1266 (*PoweroffResult)(nil), // 12: spindle.agent.v1.PoweroffResult 1057 - (*Message)(nil), // 13: spindle.agent.v1.Message 1267 + (*OpenDebugShell)(nil), // 13: spindle.agent.v1.OpenDebugShell 1268 + (*PtyData)(nil), // 14: spindle.agent.v1.PtyData 1269 + (*PtyResize)(nil), // 15: spindle.agent.v1.PtyResize 1270 + (*Message)(nil), // 16: spindle.agent.v1.Message 1058 1271 } 1059 1272 var file_spindle_agent_v1_agent_proto_depIdxs = []int32{ 1060 1273 0, // 0: spindle.agent.v1.Message.hello:type_name -> spindle.agent.v1.Hello ··· 1070 1283 10, // 10: spindle.agent.v1.Message.cache_drain_result:type_name -> spindle.agent.v1.CacheDrainResult 1071 1284 11, // 11: spindle.agent.v1.Message.poweroff:type_name -> spindle.agent.v1.Poweroff 1072 1285 12, // 12: spindle.agent.v1.Message.poweroff_result:type_name -> spindle.agent.v1.PoweroffResult 1073 - 13, // [13:13] is the sub-list for method output_type 1074 - 13, // [13:13] is the sub-list for method input_type 1075 - 13, // [13:13] is the sub-list for extension type_name 1076 - 13, // [13:13] is the sub-list for extension extendee 1077 - 0, // [0:13] is the sub-list for field type_name 1286 + 13, // 13: spindle.agent.v1.Message.open_debug_shell:type_name -> spindle.agent.v1.OpenDebugShell 1287 + 14, // 14: spindle.agent.v1.Message.pty_data:type_name -> spindle.agent.v1.PtyData 1288 + 15, // 15: spindle.agent.v1.Message.pty_resize:type_name -> spindle.agent.v1.PtyResize 1289 + 16, // [16:16] is the sub-list for method output_type 1290 + 16, // [16:16] is the sub-list for method input_type 1291 + 16, // [16:16] is the sub-list for extension type_name 1292 + 16, // [16:16] is the sub-list for extension extendee 1293 + 0, // [0:16] is the sub-list for field type_name 1078 1294 } 1079 1295 1080 1296 func init() { file_spindle_agent_v1_agent_proto_init() } ··· 1088 1304 GoPackagePath: reflect.TypeOf(x{}).PkgPath(), 1089 1305 RawDescriptor: unsafe.Slice(unsafe.StringData(file_spindle_agent_v1_agent_proto_rawDesc), len(file_spindle_agent_v1_agent_proto_rawDesc)), 1090 1306 NumEnums: 0, 1091 - NumMessages: 14, 1307 + NumMessages: 17, 1092 1308 NumExtensions: 0, 1093 1309 NumServices: 0, 1094 1310 },
+38 -4
spindle/agentproto/spindle/agent/v1/agent.proto
··· 82 82 string error = 1; 83 83 } 84 84 85 + message OpenDebugShell { 86 + uint32 vsock_port = 1; 87 + string term = 2; 88 + uint32 rows = 3; 89 + uint32 cols = 4; 90 + } 91 + 92 + // changes meaning based on who sends this: 93 + // guest->host is shell output, host->guest is keyboard input 94 + message PtyData { 95 + bytes data = 1; 96 + } 97 + 98 + message PtyResize { 99 + uint32 rows = 1; 100 + uint32 cols = 2; 101 + } 102 + 85 103 message Message { 86 104 option (buf.validate.message).oneof = { 87 105 fields: [ 88 - "hello", "init", "exec_start", "exec_stdout", "exec_stderr", "exec_exit", 89 - "activate_config", "activate_config_result", "built_paths", "cache_drain", 90 - "cache_drain_result", "poweroff", "poweroff_result" 91 - ], 106 + "hello", 107 + "init", 108 + "exec_start", 109 + "exec_stdout", 110 + "exec_stderr", 111 + "exec_exit", 112 + "activate_config", 113 + "activate_config_result", 114 + "built_paths", 115 + "cache_drain", 116 + "cache_drain_result", 117 + "poweroff", 118 + "poweroff_result", 119 + "open_debug_shell", 120 + "pty_data", 121 + "pty_resize" 122 + ] 92 123 required: true 93 124 }; 94 125 ··· 107 138 CacheDrainResult cache_drain_result = 12; 108 139 Poweroff poweroff = 13; 109 140 PoweroffResult poweroff_result = 14; 141 + OpenDebugShell open_debug_shell = 15; 142 + PtyData pty_data = 16; 143 + PtyResize pty_resize = 17; 110 144 }
+11
spindle/config/config.go
··· 78 78 79 79 AgingThreshold time.Duration `env:"AGING_THRESHOLD, default=30s"` 80 80 81 + DebugSSH DebugSSH `env:",prefix=DEBUG_SSH_"` 82 + 81 83 EnableCgroups bool `env:"ENABLE_CGROUPS, default=false"` 82 84 CgroupParent string `env:"CGROUP_PARENT, default=self"` 83 85 CgroupPidsMax int64 `env:"CGROUP_PIDS_MAX, default=4096"` 84 86 CgroupSwapMaxMiB *int64 `env:"CGROUP_SWAP_MAX_MIB"` 85 87 // memory.min that will get assigned to the supervisor (spindle itself) cgroup 86 88 CgroupSupervisorMemoryMinMiB int64 `env:"CGROUP_SUPERVISOR_MEMORY_MIN_MIB, default=512"` 89 + } 90 + 91 + type DebugSSH struct { 92 + Enabled bool `env:"ENABLED, default=false"` 93 + ListenAddr string `env:"LISTEN_ADDR, default=0.0.0.0:2222"` 94 + // path to private key; if empty, spindle will generate one next to the db 95 + HostKeyPath string `env:"HOST_KEY_PATH"` 96 + // how long to keep a failed wf alive after failure, for sshing in 97 + GracePeriod time.Duration `env:"GRACE_PERIOD, default=5m"` 87 98 } 88 99 89 100 type NixCache struct {
+5 -19
spindle/engine/engine.go
··· 20 20 ErrWorkflowFailed = errors.New("workflow failed") 21 21 ) 22 22 23 - type workflowFinalizer interface { 24 - FinalizeWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error 25 - } 26 - 27 23 func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, db *db.DB, n *notifier.Notifier, ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { 28 24 l.Info("starting all workflows in parallel", "pipeline", pipelineId) 29 25 ··· 51 47 l.Info("using workflow timeout", "timeout", workflowTimeout) 52 48 53 49 for _, w := range wfs { 54 - wg.Add(1) 55 - go func() { 56 - defer wg.Done() 57 - 50 + wg.Go(func() { 58 51 wid := models.WorkflowId{ 59 52 PipelineId: pipelineId, 60 53 Name: w.Name, ··· 117 110 } 118 111 return 119 112 } 113 + // don't put this after the workflowTimeout deadline assignment 114 + // below. engines that implement "ssh-after-fail" rely on the 115 + // unbounded ctx for retaining the workflow after it fails. 120 116 defer eng.DestroyWorkflow(ctx, wid) 121 117 122 118 ctx, cancel := context.WithTimeout(ctx, workflowTimeout) ··· 155 151 } 156 152 } 157 153 158 - if finalizer, ok := eng.(workflowFinalizer); ok { 159 - if err := finalizer.FinalizeWorkflow(ctx, wid, &w, wfLogger); err != nil { 160 - dbErr := db.StatusFailed(wid, err.Error(), -1, n) 161 - if dbErr != nil { 162 - l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr) 163 - } 164 - return 165 - } 166 - } 167 - 168 154 err = db.StatusSuccess(wid, n) 169 155 if err != nil { 170 156 l.Error("failed to set workflow status to success", "wid", wid, "err", err) 171 157 } 172 - }() 158 + }) 173 159 } 174 160 } 175 161
+31 -2
spindle/engines/microvm/README.md
··· 41 41 spindle expects, they should work. That is: 42 42 - a guest agent is present inside of the image and when that image boots it will 43 43 get started, 44 - - `spindle-workflow` user exists, 45 - - and the work directory is configured (`/workspace`). 44 + - the `spindle-workflow` user exists, is unprivileged (non-zero uid/gid), and has 45 + a usable login shell and home dir set in the image's passwd: workflow steps run 46 + as this user, and the debug shell (see below) launches its passwd shell as a 47 + login shell in its home dir. an unset or `nologin`/`false` shell breaks debug 48 + ssh, 49 + - and the work directory is configured (`/workspace`, with `/workspace/repo` as 50 + the per-step working dir). 46 51 47 52 ## Image discovery 48 53 ··· 233 238 never made it to the destination store. The guest still only ever sees the same 234 239 HTTP binary-cache upload protocol over vsock; it never gets direct access to 235 240 SSH credentials or the destination store itself. 241 + 242 + ### Debug ssh 243 + 244 + When a workflow fails, spindle can keep its microVM alive for a configured grace 245 + window (`MicroVMPipelines.SSH`) and print an `ssh` invocation so you can poke at 246 + the failed VM interactively. Spindle terminates the ssh connection itself and 247 + bridges a pty into the live guest over the agent's vsock; the guest stays 248 + keyless and never runs an ssh daemon. 249 + 250 + Access mirrors a git push: the ssh username is the job id, and the offered 251 + public key is sent to the job's repo knot (`sh.tangled.repo.checkPushAllowed`). 252 + The session is accepted only if that key is allowed to push to the job's repo. 253 + 254 + The shell is deliberately not configurable from either end. It always: 255 + - runs as the `spindle-workflow` user (the ssh username selects the *job*, not a 256 + unix user), 257 + - uses that user's login shell from the image's passwd, launched as a login 258 + shell (`-l`), and 259 + - starts in the dir where the repo was cloned to. 260 + 261 + The only things the client influences are the terminal type and window size 262 + (forwarded from the ssh pty request, and on resize). This relies on the image 263 + configuring `spindle-workflow` properly per the expectations above; in 264 + particular a missing or `nologin`/`false` won't work of course.
+13
spindle/engines/microvm/agent.go
··· 245 245 } 246 246 } 247 247 248 + func (s *AgentSession) OpenDebugShell(req *agentv1.OpenDebugShell) error { 249 + s.mu.Lock() 250 + defer s.mu.Unlock() 251 + 252 + if err := s.enc.Encode(&agentproto.Message{ 253 + Id: "debug-shell", 254 + OpenDebugShell: req, 255 + }); err != nil { 256 + return fmt.Errorf("send open_debug_shell: %w", err) 257 + } 258 + return nil 259 + } 260 + 248 261 func (s *AgentSession) Poweroff(ctx context.Context) error { 249 262 s.mu.Lock() 250 263 defer s.mu.Unlock()
+326
spindle/engines/microvm/debug.go
··· 1 + package microvm 2 + 3 + import ( 4 + "context" 5 + "encoding/hex" 6 + "errors" 7 + "fmt" 8 + "hash/fnv" 9 + "io" 10 + "log/slog" 11 + "net" 12 + "regexp" 13 + "strings" 14 + "sync" 15 + "time" 16 + 17 + "tangled.org/core/spindle/agentproto" 18 + agentv1 "tangled.org/core/spindle/agentproto/gen" 19 + "tangled.org/core/spindle/models" 20 + ) 21 + 22 + const debugAcceptTimeout = 15 * time.Second 23 + 24 + type debugTarget struct { 25 + cid uint32 26 + agent *AgentSession 27 + knot string 28 + repoDid string 29 + wfLogger models.WorkflowLogger 30 + maxAliveAt time.Time 31 + stepCount int // index to emit the debug step at 32 + connected chan struct{} // closed when the user first ssh's in, ending the grace window 33 + released chan struct{} // closed when the user exits the debug shell, to tear down early 34 + } 35 + 36 + var debugHandleSafe = regexp.MustCompile(`[^a-zA-Z0-9_.-]`) 37 + 38 + func newDebugHandle(wid models.WorkflowId) string { 39 + h := fnv.New32a() 40 + _, _ = io.WriteString(h, wid.String()) 41 + token := hex.EncodeToString(h.Sum(nil)) 42 + name := strings.Trim(debugHandleSafe.ReplaceAllString(wid.Name, "-"), "-") 43 + if name == "" { 44 + return token 45 + } 46 + return name + "-" + token 47 + } 48 + 49 + func (e *Engine) registerDebugTarget(wid models.WorkflowId, t debugTarget) { 50 + e.debugMu.Lock() 51 + defer e.debugMu.Unlock() 52 + e.debug[newDebugHandle(wid)] = t 53 + } 54 + 55 + func (e *Engine) unregisterDebugTarget(wid models.WorkflowId) { 56 + e.debugMu.Lock() 57 + defer e.debugMu.Unlock() 58 + delete(e.debug, newDebugHandle(wid)) 59 + } 60 + 61 + // ends the grace window so we hold the VM until the shell exits instead. 62 + func (e *Engine) markDebugConnected(handle string) { 63 + e.debugMu.Lock() 64 + defer e.debugMu.Unlock() 65 + if t, ok := e.debug[handle]; ok && t.connected != nil { 66 + select { 67 + case <-t.connected: // already signalled 68 + default: 69 + close(t.connected) 70 + } 71 + } 72 + } 73 + 74 + func (e *Engine) releaseDebugTarget(handle string) { 75 + e.debugMu.Lock() 76 + defer e.debugMu.Unlock() 77 + t, ok := e.debug[handle] 78 + if !ok { 79 + return 80 + } 81 + delete(e.debug, handle) 82 + if t.released != nil { 83 + close(t.released) 84 + } 85 + } 86 + 87 + func (e *Engine) lookupDebugTarget(handle string) (debugTarget, bool) { 88 + e.debugMu.Lock() 89 + defer e.debugMu.Unlock() 90 + t, ok := e.debug[handle] 91 + return t, ok 92 + } 93 + 94 + func (e *Engine) RepoForJob(jobID string) (knot, repoDid string, ok bool) { 95 + t, found := e.lookupDebugTarget(jobID) 96 + if !found || t.knot == "" || t.repoDid == "" { 97 + return "", "", false 98 + } 99 + return t.knot, t.repoDid, true 100 + } 101 + 102 + func (e *Engine) OpenDebugSession(ctx context.Context, jobID, term string, rows, cols int) (*DebugSession, error) { 103 + target, ok := e.lookupDebugTarget(jobID) 104 + if !ok { 105 + return nil, fmt.Errorf("no live microVM for job %q", jobID) 106 + } 107 + 108 + ln, port, err := listenRandomVsockPort(ctx) 109 + if err != nil { 110 + return nil, fmt.Errorf("listen for debug shell: %w", err) 111 + } 112 + filtered := &cidFilteredVsockListener{Listener: ln, cid: target.cid, logger: e.l} 113 + 114 + if err := target.agent.OpenDebugShell(&agentv1.OpenDebugShell{ 115 + VsockPort: port, 116 + Term: term, 117 + Rows: clampDim(rows), 118 + Cols: clampDim(cols), 119 + }); err != nil { 120 + _ = ln.Close() 121 + return nil, fmt.Errorf("ask guest to open debug shell: %w", err) 122 + } 123 + 124 + conn, err := acceptWithTimeout(ctx, filtered, debugAcceptTimeout) 125 + if err != nil { 126 + _ = ln.Close() 127 + return nil, fmt.Errorf("accept debug shell connection: %w", err) 128 + } 129 + 130 + e.markDebugConnected(jobID) 131 + 132 + return newDebugSession(conn, ln, e.l), nil 133 + } 134 + 135 + func (e *Engine) maybeRetainForDebug(ctx context.Context, wid models.WorkflowId) { 136 + handle := newDebugHandle(wid) 137 + target, ok := e.lookupDebugTarget(handle) 138 + if !ok { 139 + // no target registered means the workflow didn't fail; nothing to retain 140 + return 141 + } 142 + 143 + wfLogger := target.wfLogger 144 + if wfLogger == nil { 145 + wfLogger = models.NullLogger{} 146 + } 147 + step := Step{name: "Debug shell", kind: models.StepKindSystem} 148 + idx := target.stepCount 149 + 150 + wfLogger.ControlWriter(idx, step, models.StepStatusStart).Write([]byte{0}) 151 + defer wfLogger.ControlWriter(idx, step, models.StepStatusEnd).Write([]byte{0}) 152 + 153 + ssh := e.cfg.MicroVMPipelines.DebugSSH 154 + grace := ssh.GracePeriod 155 + 156 + cmd := debugSSHCommand(ssh.ListenAddr, e.cfg.Server.Hostname, handle) 157 + out := wfLogger.DataWriter(idx, "stdout") 158 + fmt.Fprintf(out, "Workflow failed, connect within %s to debug until shell exit or workflow timeout:\n", grace) 159 + fmt.Fprintf(out, " %s\n", cmd) 160 + e.l.Info("retaining failed microVM for debug", "workflow", wid, "grace", grace.String()) 161 + 162 + maxAlive := time.NewTimer(time.Until(target.maxAliveAt)) 163 + defer maxAlive.Stop() 164 + graceTimer := time.NewTimer(grace) 165 + defer graceTimer.Stop() 166 + 167 + // wait for the user to ssh in within the grace window 168 + select { 169 + case <-ctx.Done(): 170 + return 171 + case <-maxAlive.C: 172 + e.l.Info("debug retention hit max VM lifetime; tearing down microVM", "workflow", wid) 173 + return 174 + case <-graceTimer.C: 175 + e.l.Info("nobody ssh'd in within grace; tearing down microVM", "workflow", wid) 176 + return 177 + case <-target.connected: 178 + e.l.Info("debug shell connected; holding microVM until exit", "workflow", wid) 179 + } 180 + 181 + // connected: hold the VM until the user exits or it hits its max lifetime 182 + select { 183 + case <-ctx.Done(): 184 + case <-maxAlive.C: 185 + e.l.Info("debug session hit max VM lifetime; tearing down microVM", "workflow", wid) 186 + case <-target.released: 187 + e.l.Info("debug shell exited; tearing down microVM", "workflow", wid) 188 + } 189 + } 190 + 191 + func debugSSHCommand(listenAddr, hostname, jobID string) string { 192 + host, port := hostname, "" 193 + if h, p, err := net.SplitHostPort(listenAddr); err == nil { 194 + port = p 195 + if h != "" && h != "0.0.0.0" && h != "::" { 196 + host = h 197 + } 198 + } 199 + cmd := "ssh -tt " 200 + if port != "" && port != "22" { 201 + cmd += "-p " + port + " " 202 + } 203 + return cmd + jobID + "@" + host 204 + } 205 + 206 + // bridges an interactive shell over the agentproto vsock. 207 + // Read to it gets the shell output from guest, Write sends the keyboard input from user. 208 + type DebugSession struct { 209 + conn net.Conn 210 + ln net.Listener 211 + enc *agentproto.Encoder 212 + dec *agentproto.Decoder 213 + l *slog.Logger 214 + 215 + out chan []byte 216 + leftover []byte 217 + exitCode int 218 + closeOne sync.Once 219 + } 220 + 221 + func newDebugSession(conn net.Conn, ln net.Listener, l *slog.Logger) *DebugSession { 222 + d := &DebugSession{ 223 + conn: conn, 224 + ln: ln, 225 + enc: agentproto.NewEncoder(conn), 226 + dec: agentproto.NewDecoder(conn), 227 + l: l, 228 + out: make(chan []byte, 16), 229 + } 230 + go d.readLoop() 231 + return d 232 + } 233 + 234 + func (d *DebugSession) readLoop() { 235 + defer close(d.out) 236 + for { 237 + msg, err := d.dec.Decode() 238 + if err != nil { 239 + if !errors.Is(err, io.EOF) { 240 + d.l.Debug("debug shell decode ended", "error", err) 241 + } 242 + return 243 + } 244 + if p := msg.PtyData; p != nil && len(p.Data) > 0 { 245 + d.out <- p.Data 246 + } else if p := msg.ExecExit; p != nil { 247 + d.exitCode = int(p.ExitCode) 248 + return 249 + } 250 + } 251 + } 252 + 253 + func (d *DebugSession) Read(p []byte) (int, error) { 254 + if len(d.leftover) == 0 { 255 + chunk, ok := <-d.out 256 + if !ok { 257 + return 0, io.EOF 258 + } 259 + d.leftover = chunk 260 + } 261 + n := copy(p, d.leftover) 262 + d.leftover = d.leftover[n:] 263 + return n, nil 264 + } 265 + 266 + func (d *DebugSession) Write(p []byte) (int, error) { 267 + if err := d.enc.Encode(&agentproto.Message{ 268 + Id: "pty", 269 + PtyData: &agentv1.PtyData{Data: append([]byte(nil), p...)}, 270 + }); err != nil { 271 + return 0, err 272 + } 273 + return len(p), nil 274 + } 275 + 276 + func (d *DebugSession) Resize(rows, cols int) error { 277 + return d.enc.Encode(&agentproto.Message{ 278 + Id: "pty", 279 + PtyResize: &agentv1.PtyResize{Rows: clampDim(rows), Cols: clampDim(cols)}, 280 + }) 281 + } 282 + 283 + func (d *DebugSession) ExitCode() int { return d.exitCode } 284 + 285 + func (d *DebugSession) Close() error { 286 + var err error 287 + d.closeOne.Do(func() { 288 + err = d.conn.Close() 289 + if d.ln != nil { 290 + _ = d.ln.Close() 291 + } 292 + }) 293 + return err 294 + } 295 + 296 + func acceptWithTimeout(ctx context.Context, ln net.Listener, timeout time.Duration) (net.Conn, error) { 297 + ctx, cancel := context.WithTimeout(ctx, timeout) 298 + defer cancel() 299 + 300 + type result struct { 301 + conn net.Conn 302 + err error 303 + } 304 + ch := make(chan result, 1) 305 + go func() { 306 + conn, err := ln.Accept() 307 + ch <- result{conn, err} 308 + }() 309 + 310 + select { 311 + case <-ctx.Done(): 312 + return nil, ctx.Err() 313 + case r := <-ch: 314 + return r.conn, r.err 315 + } 316 + } 317 + 318 + func clampDim(v int) uint32 { 319 + if v < 1 { 320 + return 1 321 + } 322 + if v > 65535 { 323 + return 65535 324 + } 325 + return uint32(v) 326 + }
+171
spindle/engines/microvm/debugssh.go
··· 1 + package microvm 2 + 3 + import ( 4 + "context" 5 + "crypto/ed25519" 6 + "crypto/rand" 7 + "encoding/pem" 8 + "fmt" 9 + "io" 10 + "net/http" 11 + "os" 12 + "path/filepath" 13 + "time" 14 + 15 + indigoxrpc "github.com/bluesky-social/indigo/xrpc" 16 + "github.com/gliderlabs/ssh" 17 + gossh "golang.org/x/crypto/ssh" 18 + "tangled.org/core/api/tangled" 19 + "tangled.org/core/hostutil" 20 + ) 21 + 22 + // debug ssh: terminates ssh here, bridges a pty into a live (failed) microVM 23 + // over the guest's agent conn. the guest stays keyless. access mirrors git 24 + // push: the offered key goes to the repo's knot 25 + // (sh.tangled.repo.checkPushAllowed) and is accepted only if it can push to the 26 + // job's repo. 27 + 28 + const debugAuthTimeout = 5 * time.Second 29 + 30 + func (e *Engine) serveDebugSSH(ctx context.Context) { 31 + dbg := e.cfg.MicroVMPipelines.DebugSSH 32 + if !dbg.Enabled || dbg.ListenAddr == "" { 33 + return 34 + } 35 + addr := dbg.ListenAddr 36 + httpc := &http.Client{Timeout: debugAuthTimeout} 37 + 38 + srv := &ssh.Server{ 39 + Addr: addr, 40 + Handler: e.debugHandle, 41 + PublicKeyHandler: func(c ssh.Context, key ssh.PublicKey) bool { 42 + return e.checkDebugAuth(c, c.User(), key, httpc) 43 + }, 44 + } 45 + keyPath := e.cfg.MicroVMPipelines.DebugSSH.HostKeyPath 46 + if keyPath == "" { 47 + keyPath = filepath.Join(filepath.Dir(e.cfg.Server.DBPath), "debug_ssh_host_key") 48 + } 49 + if err := ensureDebugHostKey(keyPath); err != nil { 50 + e.l.Error("debug ssh: ensure host key", "path", keyPath, "err", err) 51 + return 52 + } 53 + if err := srv.SetOption(ssh.HostKeyFile(keyPath)); err != nil { 54 + e.l.Error("debug ssh: load host key", "path", keyPath, "err", err) 55 + return 56 + } 57 + 58 + go func() { 59 + <-ctx.Done() 60 + _ = srv.Close() 61 + }() 62 + 63 + e.l.Info("starting debug ssh server", "address", addr) 64 + if err := srv.ListenAndServe(); err != nil && err != ssh.ErrServerClosed { 65 + e.l.Error("debug ssh server stopped", "err", err) 66 + } 67 + } 68 + 69 + func ensureDebugHostKey(path string) error { 70 + if _, err := os.Stat(path); err == nil { 71 + return nil 72 + } else if !os.IsNotExist(err) { 73 + return fmt.Errorf("stat host key: %w", err) 74 + } 75 + 76 + _, priv, err := ed25519.GenerateKey(rand.Reader) 77 + if err != nil { 78 + return fmt.Errorf("generate host key: %w", err) 79 + } 80 + block, err := gossh.MarshalPrivateKey(priv, "") 81 + if err != nil { 82 + return fmt.Errorf("marshal host key: %w", err) 83 + } 84 + 85 + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { 86 + return fmt.Errorf("create host key dir: %w", err) 87 + } 88 + if err := os.WriteFile(path, pem.EncodeToMemory(block), 0o600); err != nil { 89 + return fmt.Errorf("write host key: %w", err) 90 + } 91 + return nil 92 + } 93 + 94 + func (e *Engine) debugHandle(sess ssh.Session) { 95 + ptyReq, winCh, isPty := sess.Pty() 96 + if !isPty { 97 + io.WriteString(sess.Stderr(), "error: no terminal allocated; use `ssh -t`\n") 98 + _ = sess.Exit(1) 99 + return 100 + } 101 + 102 + jobID := sess.User() 103 + l := e.l.With("component", "debugssh", "job", jobID) 104 + 105 + debug, err := e.OpenDebugSession(sess.Context(), jobID, ptyReq.Term, ptyReq.Window.Height, ptyReq.Window.Width) 106 + if err != nil { 107 + fmt.Fprintf(sess.Stderr(), "error: %v\n", err) 108 + _ = sess.Exit(1) 109 + return 110 + } 111 + defer debug.Close() 112 + l.Info("debug shell opened") 113 + 114 + go func() { 115 + for win := range winCh { 116 + if err := debug.Resize(win.Height, win.Width); err != nil { 117 + l.Debug("debug ssh resize failed", "error", err) 118 + } 119 + } 120 + }() 121 + 122 + // keyboard -> shell, runs until the client hangs up 123 + go func() { _, _ = io.Copy(debug, sess) }() 124 + // shell -> client, returns when the shell exits (Read hits EOF) 125 + _, _ = io.Copy(sess, debug) 126 + 127 + code := debug.ExitCode() 128 + l.Info("debug shell closed", "exitCode", code) 129 + // the user is done; let retention tear the VM down now instead of waiting 130 + // out the rest of the grace period 131 + e.releaseDebugTarget(jobID) 132 + _ = sess.Exit(code) 133 + } 134 + 135 + func (e *Engine) checkDebugAuth(ctx context.Context, jobID string, key ssh.PublicKey, httpc *http.Client) bool { 136 + l := e.l.With("component", "debugssh", "job", jobID, "keyType", key.Type()) 137 + 138 + knot, repoDid, ok := e.RepoForJob(jobID) 139 + if !ok { 140 + l.Warn("debug ssh: no live job / unknown repo") 141 + return false 142 + } 143 + 144 + host, noSSL, err := hostutil.ParseHostname(knot) 145 + if err != nil { 146 + l.Error("debug ssh: bad knot host", "knot", knot, "error", err) 147 + return false 148 + } 149 + scheme := "https" 150 + if noSSL { 151 + scheme = "http" 152 + } 153 + xc := &indigoxrpc.Client{Host: fmt.Sprintf("%s://%s", scheme, host), Client: httpc} 154 + 155 + reqCtx, cancel := context.WithTimeout(ctx, debugAuthTimeout) 156 + defer cancel() 157 + 158 + out, err := tangled.RepoCheckPushAllowed(reqCtx, xc, string(gossh.MarshalAuthorizedKey(key)), repoDid) 159 + if err != nil { 160 + l.Error("debug ssh: push-allowed check failed", "knot", knot, "repo", repoDid, "error", err) 161 + return false 162 + } 163 + if !out.Allowed { 164 + l.Warn("debug ssh: key not allowed to push", "knot", knot, "repo", repoDid) 165 + return false 166 + } 167 + if out.Did != nil { 168 + l.Info("debug ssh: authorized", "did", *out.Did, "knot", knot, "repo", repoDid) 169 + } 170 + return true 171 + }
+30 -6
spindle/engines/microvm/engine.go
··· 52 52 53 53 cleanupMu sync.Mutex 54 54 cleanup map[string][]cleanupFunc 55 + 56 + debugMu sync.Mutex 57 + debug map[string]debugTarget 55 58 } 56 59 57 60 type Step struct { ··· 89 92 } 90 93 } 91 94 92 - return &Engine{ 95 + e := &Engine{ 93 96 l: l, 94 97 cfg: cfg, 95 98 db: d, ··· 97 100 scheduler: engine.NewResourceScheduler(budget, max, agingThreshold), 98 101 cgroupParent: cgroupParent, 99 102 cleanup: make(map[string][]cleanupFunc), 100 - }, nil 103 + debug: make(map[string]debugTarget), 104 + } 105 + 106 + if cfg.MicroVMPipelines.DebugSSH.ListenAddr != "" { 107 + go e.serveDebugSSH(ctx) 108 + } 109 + 110 + return e, nil 101 111 } 102 112 103 113 func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) { ··· 272 282 return err 273 283 } 274 284 state.VM = vm 285 + state.StartedAt = time.Now() 275 286 276 287 acceptCtx, cancelAccept := context.WithTimeout(ctx, agentAcceptTimeout) 277 288 defer cancelAccept() ··· 294 305 return err 295 306 } 296 307 state.Agent = agentSession 308 + state.CID = cid 297 309 wf.Data = state 298 310 299 311 e.registerCleanup(wid, func(ctx context.Context) error { ··· 368 380 369 381 if exitCode != 0 { 370 382 e.l.Debug("step exited non-zero", "workflow", wid, "step", step.Name(), "exitCode", exitCode) 383 + e.registerDebugTarget(wid, debugTarget{ 384 + cid: state.CID, 385 + agent: state.Agent, 386 + knot: w.Environment["TANGLED_REPO_KNOT"], 387 + repoDid: w.Environment["TANGLED_REPO_REPO_DID"], 388 + wfLogger: wfLogger, 389 + stepCount: len(w.Steps), 390 + maxAliveAt: state.StartedAt.Add(e.WorkflowTimeout()), 391 + connected: make(chan struct{}), 392 + released: make(chan struct{}), 393 + }) 371 394 return engine.ErrWorkflowFailed 372 395 } 373 396 return nil ··· 518 541 } 519 542 520 543 func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 544 + if e.cfg.MicroVMPipelines.DebugSSH.Enabled { 545 + // keep a failed VM alive for the grace period before we tear it down 546 + e.maybeRetainForDebug(ctx, wid) 547 + } 548 + 521 549 fns := e.drainCleanups(wid) 522 550 523 551 var cleanupErr error ··· 528 556 } 529 557 } 530 558 return cleanupErr 531 - } 532 - 533 - func (e *Engine) FinalizeWorkflow(ctx context.Context, wid models.WorkflowId, w *models.Workflow, wfLogger models.WorkflowLogger) error { 534 - return nil 535 559 } 536 560 537 561 func (e *Engine) WorkflowTimeout() time.Duration {
+5
spindle/engines/microvm/vm.go
··· 155 155 CacheReadURLs []string 156 156 CacheTrustedPublicKeys []string 157 157 VM VMHandle 158 + CID uint32 158 159 Agent *AgentSession 159 160 ReadCache *ReadCacheProxy 160 161 UploadCache *UploadCacheProxy 161 162 DNSProxy *DNSProxy 162 163 WorkDir string 163 164 NixOSToplevelCache nixosToplevelCacheStore 165 + StartedAt time.Time // when the VM booted, for the max-lifetime cap 164 166 } 165 167 166 168 func (e *Engine) cleanupState(ctx context.Context, wid models.WorkflowId, state *workflowState) error { 167 169 if state == nil { 168 170 return nil 169 171 } 172 + 173 + // stop advertising this VM for debug shells before we tear it down 174 + e.unregisterDebugTarget(wid) 170 175 171 176 ctx = context.WithoutCancel(ctx) 172 177