Monorepo for Tangled tangled.org
4

Configure Feed

Select the types of activity you want to include in your feed.

1use crate::cache::{CacheUploadManager, ReadCacheProxy, WriteCacheProxy}; 2use crate::command::Spec; 3use crate::dns_proxy::DnsProxy; 4use crate::exec; 5use crate::nix_config::{self, SYSTEMCTL_EXECUTABLE}; 6use crate::on_payload; 7use crate::protocol::{self, Message, v1}; 8use crate::{activation, command}; 9use anyhow::{Context, Result, bail}; 10use std::time::Duration; 11use tokio::io::{AsyncWrite, BufReader}; 12use tokio::sync::mpsc::{self, Sender}; 13use tokio::task::{JoinError, JoinSet}; 14use tokio_vsock::{VsockAddr, VsockStream}; 15use tracing::{info, warn}; 16 17pub async fn run(host_cid: u32, port: u32) -> Result<()> { 18 let mut conn = VsockStream::connect(VsockAddr::new(host_cid, port)) 19 .await 20 .with_context(|| format!("dial host vsock cid={host_cid} port={port}"))?; 21 22 send_hello(&mut conn).await?; 23 24 let (reader_conn, writer_conn) = tokio::io::split(conn); 25 let (out_tx, out_rx) = mpsc::channel::<Message>(256); 26 let writer = tokio::spawn(async move { writer_loop(writer_conn, out_rx).await }); 27 let mut reader = BufReader::new(reader_conn); 28 29 let init = match protocol::read_message(&mut reader).await? { 30 Some(Message { 31 init: Some(init), .. 32 }) => init, 33 Some(other) => bail!("expected init, got {}", protocol::kind(&other)), 34 None => bail!("read init: EOF"), 35 }; 36 info!(job_id = %init.job_id, "received init"); 37 38 let read_proxy = ReadCacheProxy::start(host_cid, init.cache_read_proxy_port) 39 .await 40 .context("start read cache proxy")?; 41 let write_proxy = WriteCacheProxy::start(host_cid, init.cache_upload_proxy_port) 42 .await 43 .context("start write cache proxy")?; 44 let _dns_proxy = DnsProxy::start(host_cid, init.dns_proxy_port) 45 .await 46 .context("start dns proxy")?; 47 let _cache_cfg = nix_config::configure( 48 &init, 49 read_proxy.as_ref().map(ReadCacheProxy::url).unwrap_or(""), 50 ) 51 .await 52 .context("configure nix cache")?; 53 let uploader = CacheUploadManager::start( 54 write_proxy.as_ref().map(WriteCacheProxy::url).unwrap_or(""), 55 out_tx.clone(), 56 ) 57 .await 58 .context("start cache upload manager")?; 59 60 let mut tasks = JoinSet::new(); 61 let read_result: Result<()> = loop { 62 tokio::select! { 63 read = protocol::read_message(&mut reader) => match read { 64 Ok(Some(msg)) => spawn_message_task(&mut tasks, msg, &out_tx, uploader.clone()), 65 Ok(None) => break Ok(()), 66 Err(error) => break Err(error).context("read message"), 67 }, 68 Some(result) = tasks.join_next(), if !tasks.is_empty() => { 69 log_task_result(result, false); 70 } 71 } 72 }; 73 74 tasks.abort_all(); 75 while let Some(result) = tasks.join_next().await { 76 log_task_result(result, true); 77 } 78 79 drop(out_tx); 80 let _ = writer.await; 81 read_result?; 82 Ok(()) 83} 84 85fn spawn_message_task( 86 tasks: &mut JoinSet<()>, 87 msg: Message, 88 out_tx: &Sender<Message>, 89 uploader: Option<CacheUploadManager>, 90) { 91 let kind = protocol::kind(&msg); 92 let handle = on_payload!(msg, { 93 activate_config => tasks.spawn(activation::run(msg.id, activate_config, out_tx.clone())), 94 exec_start => tasks.spawn(exec::run(msg.id, exec_start, out_tx.clone())), 95 cache_drain => tasks.spawn(run_cache_drain(msg.id, cache_drain, out_tx.clone(), uploader)), 96 poweroff => tasks.spawn(run_poweroff(msg.id, poweroff, out_tx.clone())), 97 }); 98 if handle.is_none() { 99 warn!(kind, "ignoring unsupported message"); 100 } 101} 102 103fn log_task_result(result: Result<(), JoinError>, shutting_down: bool) { 104 match result { 105 Ok(()) => {} 106 Err(error) if shutting_down && error.is_cancelled() => {} 107 Err(error) => warn!(%error, "session handler task failed"), 108 } 109} 110 111async fn writer_loop<W>(mut conn: W, mut rx: mpsc::Receiver<Message>) 112where 113 W: AsyncWrite + Unpin, 114{ 115 while let Some(msg) = rx.recv().await { 116 if let Err(error) = protocol::write_message(&mut conn, &msg).await { 117 warn!(%error, "failed to write protocol message"); 118 break; 119 } 120 } 121} 122 123async fn send_hello(conn: &mut VsockStream) -> Result<()> { 124 let boot_id = tokio::fs::read_to_string("/proc/sys/kernel/random/boot_id") 125 .await 126 .unwrap_or_default() 127 .trim() 128 .to_owned(); 129 let nix_version = nix_config::nix_version().await; 130 131 let hello_payload = v1::Hello { 132 protocol_version: protocol::PROTOCOL_VERSION, 133 agent_version: env!("CARGO_PKG_VERSION").to_string(), 134 boot_id: boot_id.clone(), 135 nix_version: nix_version.clone(), 136 }; 137 info!( 138 protocol = hello_payload.protocol_version, 139 version = %hello_payload.agent_version, 140 boot = %hello_payload.boot_id, 141 nix = %hello_payload.nix_version, 142 "sent hello" 143 ); 144 let hello = Message { 145 id: "hello".to_owned(), 146 hello: Some(hello_payload), 147 ..Default::default() 148 }; 149 150 protocol::write_message(conn, &hello) 151 .await 152 .context("send hello")?; 153 Ok(()) 154} 155 156async fn run_cache_drain( 157 id: String, 158 req: v1::CacheDrain, 159 out: Sender<Message>, 160 uploader: Option<CacheUploadManager>, 161) { 162 let timeout = 163 (req.timeout_seconds > 0).then(|| Duration::from_secs(u64::from(req.timeout_seconds))); 164 let stats = match uploader.as_ref() { 165 Some(uploader) => uploader.drain(timeout).await, 166 None => Default::default(), 167 }; 168 169 if let Some(error) = &stats.last_error { 170 warn!( 171 %id, 172 pending = stats.pending, 173 active = stats.active, 174 uploaded = stats.uploaded, 175 failed = stats.failed, 176 %error, 177 "cache drain completed with error" 178 ); 179 } else { 180 info!( 181 %id, 182 uploaded = stats.uploaded, 183 failed = stats.failed, 184 "cache drain completed" 185 ); 186 } 187 188 let result = Message { 189 id, 190 cache_drain_result: Some(v1::CacheDrainResult { 191 error: protocol::error_or_empty(stats.last_error), 192 cache_queued: stats.pending, 193 cache_active: stats.active, 194 cache_uploaded: stats.uploaded, 195 cache_failed: stats.failed, 196 }), 197 ..Default::default() 198 }; 199 let _ = out.send(result).await; 200} 201 202async fn run_poweroff(id: String, _req: v1::Poweroff, out: Sender<Message>) { 203 let result = Message { 204 id, 205 poweroff_result: Some(v1::PoweroffResult { 206 error: String::new(), 207 }), 208 ..Default::default() 209 }; 210 let _ = out.send(result).await; 211 212 tokio::spawn(async move { 213 tokio::time::sleep(Duration::from_millis(100)).await; 214 215 // prefer a clean shutdown through the init system when one is around 216 // (systemd on NixOS, busybox/openrc elsewhere), then fall back to the 217 // raw reboot(2) syscall on minimal guests 218 for poweroff in [SYSTEMCTL_EXECUTABLE, "/sbin/poweroff", "/usr/sbin/poweroff"] { 219 if !std::path::Path::new(poweroff).exists() { 220 continue; 221 } 222 let mut spec = Spec::new(poweroff).timeout(Duration::from_secs(5)); 223 if poweroff == SYSTEMCTL_EXECUTABLE { 224 spec = spec.args(["poweroff"]); 225 } 226 match command::run_capture(spec).await { 227 Ok(output) if output.success() => return, 228 Ok(output) => warn!( 229 %poweroff, 230 exit_code = output.exit.exit_code, 231 error = ?output.exit.error, 232 output = %output.combined_lossy(), 233 "poweroff command failed" 234 ), 235 Err(error) => warn!(%poweroff, %error, "poweroff command failed"), 236 } 237 } 238 239 // only ever returns on failure 240 let error = 241 nix::sys::reboot::reboot(nix::sys::reboot::RebootMode::RB_POWER_OFF).unwrap_err(); 242 warn!(%error, "reboot(RB_POWER_OFF) syscall failed"); 243 }); 244}