Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1use crate::cache::{CacheUploadManager, ReadCacheProxy, WriteCacheProxy}; 2use crate::command::Spec; 3use crate::dns_proxy::DnsProxy; 4use crate::exec; 5use crate::nix_config::{self, SYSTEMCTL_EXECUTABLE}; 6use crate::on_payload; 7use crate::protocol::{self, Message, v1}; 8use crate::pty; 9use crate::{activation, command}; 10use anyhow::{Context, Result, bail}; 11use std::time::Duration; 12use tokio::io::{AsyncWrite, BufReader}; 13use tokio::sync::mpsc::{self, Sender}; 14use tokio::task::{JoinError, JoinSet}; 15use tokio_vsock::{VsockAddr, VsockStream}; 16use tracing::{info, warn}; 17 18pub async fn run(host_cid: u32, port: u32) -> Result<()> { 19 let mut conn = VsockStream::connect(VsockAddr::new(host_cid, port)) 20 .await 21 .with_context(|| format!("dial host vsock cid={host_cid} port={port}"))?; 22 23 send_hello(&mut conn).await?; 24 25 let (reader_conn, writer_conn) = tokio::io::split(conn); 26 let (out_tx, out_rx) = mpsc::channel::<Message>(256); 27 let writer = tokio::spawn(async move { writer_loop(writer_conn, out_rx).await }); 28 let mut reader = BufReader::new(reader_conn); 29 30 let init = match protocol::read_message(&mut reader).await? { 31 Some(Message { 32 init: Some(init), .. 33 }) => init, 34 Some(other) => bail!("expected init, got {}", protocol::kind(&other)), 35 None => bail!("read init: EOF"), 36 }; 37 info!(job_id = %init.job_id, "received init"); 38 39 let read_proxy = ReadCacheProxy::start(host_cid, init.cache_read_proxy_port) 40 .await 41 .context("start read cache proxy")?; 42 let write_proxy = WriteCacheProxy::start(host_cid, init.cache_upload_proxy_port) 43 .await 44 .context("start write cache proxy")?; 45 let _dns_proxy = DnsProxy::start(host_cid, init.dns_proxy_port) 46 .await 47 .context("start dns proxy")?; 48 let _cache_cfg = nix_config::configure( 49 &init, 50 read_proxy.as_ref().map(ReadCacheProxy::url).unwrap_or(""), 51 ) 52 .await 53 .context("configure nix cache")?; 54 let uploader = CacheUploadManager::start( 55 write_proxy.as_ref().map(WriteCacheProxy::url).unwrap_or(""), 56 out_tx.clone(), 57 ) 58 .await 59 .context("start cache upload manager")?; 60 61 let mut tasks = JoinSet::new(); 62 let read_result: Result<()> = loop { 63 tokio::select! { 64 read = protocol::read_message(&mut reader) => match read { 65 Ok(Some(msg)) => spawn_message_task(&mut tasks, host_cid, msg, &out_tx, uploader.clone()), 66 Ok(None) => break Ok(()), 67 Err(error) => break Err(error).context("read message"), 68 }, 69 Some(result) = tasks.join_next(), if !tasks.is_empty() => { 70 log_task_result(result, false); 71 } 72 } 73 }; 74 75 tasks.abort_all(); 76 while let Some(result) = tasks.join_next().await { 77 log_task_result(result, true); 78 } 79 80 drop(out_tx); 81 let _ = writer.await; 82 read_result?; 83 Ok(()) 84} 85 86fn spawn_message_task( 87 tasks: &mut JoinSet<()>, 88 host_cid: u32, 89 msg: Message, 90 out_tx: &Sender<Message>, 91 uploader: Option<CacheUploadManager>, 92) { 93 let kind = protocol::kind(&msg); 94 let handle = on_payload!(msg, { 95 activate_config => tasks.spawn(activation::run(msg.id, activate_config, out_tx.clone())), 96 exec_start => tasks.spawn(exec::run(msg.id, exec_start, out_tx.clone())), 97 cache_drain => tasks.spawn(run_cache_drain(msg.id, cache_drain, out_tx.clone(), uploader)), 98 poweroff => tasks.spawn(run_poweroff(msg.id, poweroff, out_tx.clone())), 99 open_debug_shell => tasks.spawn(pty::run(host_cid, open_debug_shell)), 100 }); 101 if handle.is_none() { 102 warn!(kind, "ignoring unsupported message"); 103 } 104} 105 106fn log_task_result(result: Result<(), JoinError>, shutting_down: bool) { 107 match result { 108 Ok(()) => {} 109 Err(error) if shutting_down && error.is_cancelled() => {} 110 Err(error) => warn!(%error, "session handler task failed"), 111 } 112} 113 114async fn writer_loop<W>(mut conn: W, mut rx: mpsc::Receiver<Message>) 115where 116 W: AsyncWrite + Unpin, 117{ 118 while let Some(msg) = rx.recv().await { 119 if let Err(error) = protocol::write_message(&mut conn, &msg).await { 120 warn!(%error, "failed to write protocol message"); 121 break; 122 } 123 } 124} 125 126async fn send_hello(conn: &mut VsockStream) -> Result<()> { 127 let boot_id = tokio::fs::read_to_string("/proc/sys/kernel/random/boot_id") 128 .await 129 .unwrap_or_default() 130 .trim() 131 .to_owned(); 132 let nix_version = nix_config::nix_version().await; 133 134 let hello_payload = v1::Hello { 135 protocol_version: protocol::PROTOCOL_VERSION, 136 agent_version: env!("CARGO_PKG_VERSION").to_string(), 137 boot_id: boot_id.clone(), 138 nix_version: nix_version.clone(), 139 }; 140 info!( 141 protocol = hello_payload.protocol_version, 142 version = %hello_payload.agent_version, 143 boot = %hello_payload.boot_id, 144 nix = %hello_payload.nix_version, 145 "sent hello" 146 ); 147 let hello = Message { 148 id: "hello".to_owned(), 149 hello: Some(hello_payload), 150 ..Default::default() 151 }; 152 153 protocol::write_message(conn, &hello) 154 .await 155 .context("send hello")?; 156 Ok(()) 157} 158 159async fn run_cache_drain( 160 id: String, 161 req: v1::CacheDrain, 162 out: Sender<Message>, 163 uploader: Option<CacheUploadManager>, 164) { 165 let timeout = 166 (req.timeout_seconds > 0).then(|| Duration::from_secs(u64::from(req.timeout_seconds))); 167 let stats = match uploader.as_ref() { 168 Some(uploader) => uploader.drain(timeout).await, 169 None => Default::default(), 170 }; 171 172 if let Some(error) = &stats.last_error { 173 warn!( 174 %id, 175 pending = stats.pending, 176 active = stats.active, 177 uploaded = stats.uploaded, 178 failed = stats.failed, 179 %error, 180 "cache drain completed with error" 181 ); 182 } else { 183 info!( 184 %id, 185 uploaded = stats.uploaded, 186 failed = stats.failed, 187 "cache drain completed" 188 ); 189 } 190 191 let result = Message { 192 id, 193 cache_drain_result: Some(v1::CacheDrainResult { 194 error: protocol::error_or_empty(stats.last_error), 195 cache_queued: stats.pending, 196 cache_active: stats.active, 197 cache_uploaded: stats.uploaded, 198 cache_failed: stats.failed, 199 }), 200 ..Default::default() 201 }; 202 let _ = out.send(result).await; 203} 204 205async fn run_poweroff(id: String, _req: v1::Poweroff, out: Sender<Message>) { 206 let result = Message { 207 id, 208 poweroff_result: Some(v1::PoweroffResult { 209 error: String::new(), 210 }), 211 ..Default::default() 212 }; 213 let _ = out.send(result).await; 214 215 tokio::spawn(async move { 216 tokio::time::sleep(Duration::from_millis(100)).await; 217 218 // prefer a clean shutdown through the init system when one is around 219 // (systemd on NixOS, busybox/openrc elsewhere), then fall back to the 220 // raw reboot(2) syscall on minimal guests 221 for poweroff in [SYSTEMCTL_EXECUTABLE, "/sbin/poweroff", "/usr/sbin/poweroff"] { 222 if !std::path::Path::new(poweroff).exists() { 223 continue; 224 } 225 let mut spec = Spec::new(poweroff).timeout(Duration::from_secs(5)); 226 if poweroff == SYSTEMCTL_EXECUTABLE { 227 spec = spec.args(["poweroff"]); 228 } 229 match command::run_capture(spec).await { 230 Ok(output) if output.success() => return, 231 Ok(output) => warn!( 232 %poweroff, 233 exit_code = output.exit.exit_code, 234 error = ?output.exit.error, 235 output = %output.combined_lossy(), 236 "poweroff command failed" 237 ), 238 Err(error) => warn!(%poweroff, %error, "poweroff command failed"), 239 } 240 } 241 242 // only ever returns on failure 243 let error = 244 nix::sys::reboot::reboot(nix::sys::reboot::RebootMode::RB_POWER_OFF).unwrap_err(); 245 warn!(%error, "reboot(RB_POWER_OFF) syscall failed"); 246 }); 247}