Monorepo for Tangled
tangled.org
1use crate::cache::{CacheUploadManager, ReadCacheProxy, WriteCacheProxy};
2use crate::command::Spec;
3use crate::dns_proxy::DnsProxy;
4use crate::exec;
5use crate::nix_config::{self, SYSTEMCTL_EXECUTABLE};
6use crate::on_payload;
7use crate::protocol::{self, Message, v1};
8use crate::pty;
9use crate::{activation, command};
10use anyhow::{Context, Result, bail};
11use std::time::Duration;
12use tokio::io::{AsyncWrite, BufReader};
13use tokio::sync::mpsc::{self, Sender};
14use tokio::task::{JoinError, JoinSet};
15use tokio_vsock::{VsockAddr, VsockStream};
16use tracing::{info, warn};
17
18pub async fn run(host_cid: u32, port: u32) -> Result<()> {
19 let mut conn = VsockStream::connect(VsockAddr::new(host_cid, port))
20 .await
21 .with_context(|| format!("dial host vsock cid={host_cid} port={port}"))?;
22
23 send_hello(&mut conn).await?;
24
25 let (reader_conn, writer_conn) = tokio::io::split(conn);
26 let (out_tx, out_rx) = mpsc::channel::<Message>(256);
27 let writer = tokio::spawn(async move { writer_loop(writer_conn, out_rx).await });
28 let mut reader = BufReader::new(reader_conn);
29
30 let init = match protocol::read_message(&mut reader).await? {
31 Some(Message {
32 init: Some(init), ..
33 }) => init,
34 Some(other) => bail!("expected init, got {}", protocol::kind(&other)),
35 None => bail!("read init: EOF"),
36 };
37 info!(job_id = %init.job_id, "received init");
38
39 let read_proxy = ReadCacheProxy::start(host_cid, init.cache_read_proxy_port)
40 .await
41 .context("start read cache proxy")?;
42 let write_proxy = WriteCacheProxy::start(host_cid, init.cache_upload_proxy_port)
43 .await
44 .context("start write cache proxy")?;
45 let _dns_proxy = DnsProxy::start(host_cid, init.dns_proxy_port)
46 .await
47 .context("start dns proxy")?;
48 let _cache_cfg = nix_config::configure(
49 &init,
50 read_proxy.as_ref().map(ReadCacheProxy::url).unwrap_or(""),
51 )
52 .await
53 .context("configure nix cache")?;
54 let uploader = CacheUploadManager::start(
55 write_proxy.as_ref().map(WriteCacheProxy::url).unwrap_or(""),
56 out_tx.clone(),
57 )
58 .await
59 .context("start cache upload manager")?;
60
61 let mut tasks = JoinSet::new();
62 let read_result: Result<()> = loop {
63 tokio::select! {
64 read = protocol::read_message(&mut reader) => match read {
65 Ok(Some(msg)) => spawn_message_task(&mut tasks, host_cid, msg, &out_tx, uploader.clone()),
66 Ok(None) => break Ok(()),
67 Err(error) => break Err(error).context("read message"),
68 },
69 Some(result) = tasks.join_next(), if !tasks.is_empty() => {
70 log_task_result(result, false);
71 }
72 }
73 };
74
75 tasks.abort_all();
76 while let Some(result) = tasks.join_next().await {
77 log_task_result(result, true);
78 }
79
80 drop(out_tx);
81 let _ = writer.await;
82 read_result?;
83 Ok(())
84}
85
86fn spawn_message_task(
87 tasks: &mut JoinSet<()>,
88 host_cid: u32,
89 msg: Message,
90 out_tx: &Sender<Message>,
91 uploader: Option<CacheUploadManager>,
92) {
93 let kind = protocol::kind(&msg);
94 let handle = on_payload!(msg, {
95 activate_config => tasks.spawn(activation::run(msg.id, activate_config, out_tx.clone())),
96 exec_start => tasks.spawn(exec::run(msg.id, exec_start, out_tx.clone())),
97 cache_drain => tasks.spawn(run_cache_drain(msg.id, cache_drain, out_tx.clone(), uploader)),
98 poweroff => tasks.spawn(run_poweroff(msg.id, poweroff, out_tx.clone())),
99 open_debug_shell => tasks.spawn(pty::run(host_cid, open_debug_shell)),
100 });
101 if handle.is_none() {
102 warn!(kind, "ignoring unsupported message");
103 }
104}
105
106fn log_task_result(result: Result<(), JoinError>, shutting_down: bool) {
107 match result {
108 Ok(()) => {}
109 Err(error) if shutting_down && error.is_cancelled() => {}
110 Err(error) => warn!(%error, "session handler task failed"),
111 }
112}
113
114async fn writer_loop<W>(mut conn: W, mut rx: mpsc::Receiver<Message>)
115where
116 W: AsyncWrite + Unpin,
117{
118 while let Some(msg) = rx.recv().await {
119 if let Err(error) = protocol::write_message(&mut conn, &msg).await {
120 warn!(%error, "failed to write protocol message");
121 break;
122 }
123 }
124}
125
126async fn send_hello(conn: &mut VsockStream) -> Result<()> {
127 let boot_id = tokio::fs::read_to_string("/proc/sys/kernel/random/boot_id")
128 .await
129 .unwrap_or_default()
130 .trim()
131 .to_owned();
132 let nix_version = nix_config::nix_version().await;
133
134 let hello_payload = v1::Hello {
135 protocol_version: protocol::PROTOCOL_VERSION,
136 agent_version: env!("CARGO_PKG_VERSION").to_string(),
137 boot_id: boot_id.clone(),
138 nix_version: nix_version.clone(),
139 };
140 info!(
141 protocol = hello_payload.protocol_version,
142 version = %hello_payload.agent_version,
143 boot = %hello_payload.boot_id,
144 nix = %hello_payload.nix_version,
145 "sent hello"
146 );
147 let hello = Message {
148 id: "hello".to_owned(),
149 hello: Some(hello_payload),
150 ..Default::default()
151 };
152
153 protocol::write_message(conn, &hello)
154 .await
155 .context("send hello")?;
156 Ok(())
157}
158
159async fn run_cache_drain(
160 id: String,
161 req: v1::CacheDrain,
162 out: Sender<Message>,
163 uploader: Option<CacheUploadManager>,
164) {
165 let timeout =
166 (req.timeout_seconds > 0).then(|| Duration::from_secs(u64::from(req.timeout_seconds)));
167 let stats = match uploader.as_ref() {
168 Some(uploader) => uploader.drain(timeout).await,
169 None => Default::default(),
170 };
171
172 if let Some(error) = &stats.last_error {
173 warn!(
174 %id,
175 pending = stats.pending,
176 active = stats.active,
177 uploaded = stats.uploaded,
178 failed = stats.failed,
179 %error,
180 "cache drain completed with error"
181 );
182 } else {
183 info!(
184 %id,
185 uploaded = stats.uploaded,
186 failed = stats.failed,
187 "cache drain completed"
188 );
189 }
190
191 let result = Message {
192 id,
193 cache_drain_result: Some(v1::CacheDrainResult {
194 error: protocol::error_or_empty(stats.last_error),
195 cache_queued: stats.pending,
196 cache_active: stats.active,
197 cache_uploaded: stats.uploaded,
198 cache_failed: stats.failed,
199 }),
200 ..Default::default()
201 };
202 let _ = out.send(result).await;
203}
204
205async fn run_poweroff(id: String, _req: v1::Poweroff, out: Sender<Message>) {
206 let result = Message {
207 id,
208 poweroff_result: Some(v1::PoweroffResult {
209 error: String::new(),
210 }),
211 ..Default::default()
212 };
213 let _ = out.send(result).await;
214
215 tokio::spawn(async move {
216 tokio::time::sleep(Duration::from_millis(100)).await;
217
218 // prefer a clean shutdown through the init system when one is around
219 // (systemd on NixOS, busybox/openrc elsewhere), then fall back to the
220 // raw reboot(2) syscall on minimal guests
221 for poweroff in [SYSTEMCTL_EXECUTABLE, "/sbin/poweroff", "/usr/sbin/poweroff"] {
222 if !std::path::Path::new(poweroff).exists() {
223 continue;
224 }
225 let mut spec = Spec::new(poweroff).timeout(Duration::from_secs(5));
226 if poweroff == SYSTEMCTL_EXECUTABLE {
227 spec = spec.args(["poweroff"]);
228 }
229 match command::run_capture(spec).await {
230 Ok(output) if output.success() => return,
231 Ok(output) => warn!(
232 %poweroff,
233 exit_code = output.exit.exit_code,
234 error = ?output.exit.error,
235 output = %output.combined_lossy(),
236 "poweroff command failed"
237 ),
238 Err(error) => warn!(%poweroff, %error, "poweroff command failed"),
239 }
240 }
241
242 // only ever returns on failure
243 let error =
244 nix::sys::reboot::reboot(nix::sys::reboot::RebootMode::RB_POWER_OFF).unwrap_err();
245 warn!(%error, "reboot(RB_POWER_OFF) syscall failed");
246 });
247}