Monorepo for Tangled
tangled.org
1use crate::cache::{CacheUploadManager, ReadCacheProxy, WriteCacheProxy};
2use crate::command::Spec;
3use crate::dns_proxy::DnsProxy;
4use crate::exec;
5use crate::nix_config::{self, SYSTEMCTL_EXECUTABLE};
6use crate::on_payload;
7use crate::protocol::{self, Message, v1};
8use crate::{activation, command};
9use anyhow::{Context, Result, bail};
10use std::time::Duration;
11use tokio::io::{AsyncWrite, BufReader};
12use tokio::sync::mpsc::{self, Sender};
13use tokio::task::{JoinError, JoinSet};
14use tokio_vsock::{VsockAddr, VsockStream};
15use tracing::{info, warn};
16
17pub async fn run(host_cid: u32, port: u32) -> Result<()> {
18 let mut conn = VsockStream::connect(VsockAddr::new(host_cid, port))
19 .await
20 .with_context(|| format!("dial host vsock cid={host_cid} port={port}"))?;
21
22 send_hello(&mut conn).await?;
23
24 let (reader_conn, writer_conn) = tokio::io::split(conn);
25 let (out_tx, out_rx) = mpsc::channel::<Message>(256);
26 let writer = tokio::spawn(async move { writer_loop(writer_conn, out_rx).await });
27 let mut reader = BufReader::new(reader_conn);
28
29 let init = match protocol::read_message(&mut reader).await? {
30 Some(Message {
31 init: Some(init), ..
32 }) => init,
33 Some(other) => bail!("expected init, got {}", protocol::kind(&other)),
34 None => bail!("read init: EOF"),
35 };
36 info!(job_id = %init.job_id, "received init");
37
38 let read_proxy = ReadCacheProxy::start(host_cid, init.cache_read_proxy_port)
39 .await
40 .context("start read cache proxy")?;
41 let write_proxy = WriteCacheProxy::start(host_cid, init.cache_upload_proxy_port)
42 .await
43 .context("start write cache proxy")?;
44 let _dns_proxy = DnsProxy::start(host_cid, init.dns_proxy_port)
45 .await
46 .context("start dns proxy")?;
47 let _cache_cfg = nix_config::configure(
48 &init,
49 read_proxy.as_ref().map(ReadCacheProxy::url).unwrap_or(""),
50 )
51 .await
52 .context("configure nix cache")?;
53 let uploader = CacheUploadManager::start(
54 write_proxy.as_ref().map(WriteCacheProxy::url).unwrap_or(""),
55 out_tx.clone(),
56 )
57 .await
58 .context("start cache upload manager")?;
59
60 let mut tasks = JoinSet::new();
61 let read_result: Result<()> = loop {
62 tokio::select! {
63 read = protocol::read_message(&mut reader) => match read {
64 Ok(Some(msg)) => spawn_message_task(&mut tasks, msg, &out_tx, uploader.clone()),
65 Ok(None) => break Ok(()),
66 Err(error) => break Err(error).context("read message"),
67 },
68 Some(result) = tasks.join_next(), if !tasks.is_empty() => {
69 log_task_result(result, false);
70 }
71 }
72 };
73
74 tasks.abort_all();
75 while let Some(result) = tasks.join_next().await {
76 log_task_result(result, true);
77 }
78
79 drop(out_tx);
80 let _ = writer.await;
81 read_result?;
82 Ok(())
83}
84
85fn spawn_message_task(
86 tasks: &mut JoinSet<()>,
87 msg: Message,
88 out_tx: &Sender<Message>,
89 uploader: Option<CacheUploadManager>,
90) {
91 let kind = protocol::kind(&msg);
92 let handle = on_payload!(msg, {
93 activate_config => tasks.spawn(activation::run(msg.id, activate_config, out_tx.clone())),
94 exec_start => tasks.spawn(exec::run(msg.id, exec_start, out_tx.clone())),
95 cache_drain => tasks.spawn(run_cache_drain(msg.id, cache_drain, out_tx.clone(), uploader)),
96 poweroff => tasks.spawn(run_poweroff(msg.id, poweroff, out_tx.clone())),
97 });
98 if handle.is_none() {
99 warn!(kind, "ignoring unsupported message");
100 }
101}
102
103fn log_task_result(result: Result<(), JoinError>, shutting_down: bool) {
104 match result {
105 Ok(()) => {}
106 Err(error) if shutting_down && error.is_cancelled() => {}
107 Err(error) => warn!(%error, "session handler task failed"),
108 }
109}
110
111async fn writer_loop<W>(mut conn: W, mut rx: mpsc::Receiver<Message>)
112where
113 W: AsyncWrite + Unpin,
114{
115 while let Some(msg) = rx.recv().await {
116 if let Err(error) = protocol::write_message(&mut conn, &msg).await {
117 warn!(%error, "failed to write protocol message");
118 break;
119 }
120 }
121}
122
123async fn send_hello(conn: &mut VsockStream) -> Result<()> {
124 let boot_id = tokio::fs::read_to_string("/proc/sys/kernel/random/boot_id")
125 .await
126 .unwrap_or_default()
127 .trim()
128 .to_owned();
129 let nix_version = nix_config::nix_version().await;
130
131 let hello_payload = v1::Hello {
132 protocol_version: protocol::PROTOCOL_VERSION,
133 agent_version: env!("CARGO_PKG_VERSION").to_string(),
134 boot_id: boot_id.clone(),
135 nix_version: nix_version.clone(),
136 };
137 info!(
138 protocol = hello_payload.protocol_version,
139 version = %hello_payload.agent_version,
140 boot = %hello_payload.boot_id,
141 nix = %hello_payload.nix_version,
142 "sent hello"
143 );
144 let hello = Message {
145 id: "hello".to_owned(),
146 hello: Some(hello_payload),
147 ..Default::default()
148 };
149
150 protocol::write_message(conn, &hello)
151 .await
152 .context("send hello")?;
153 Ok(())
154}
155
156async fn run_cache_drain(
157 id: String,
158 req: v1::CacheDrain,
159 out: Sender<Message>,
160 uploader: Option<CacheUploadManager>,
161) {
162 let timeout =
163 (req.timeout_seconds > 0).then(|| Duration::from_secs(u64::from(req.timeout_seconds)));
164 let stats = match uploader.as_ref() {
165 Some(uploader) => uploader.drain(timeout).await,
166 None => Default::default(),
167 };
168
169 if let Some(error) = &stats.last_error {
170 warn!(
171 %id,
172 pending = stats.pending,
173 active = stats.active,
174 uploaded = stats.uploaded,
175 failed = stats.failed,
176 %error,
177 "cache drain completed with error"
178 );
179 } else {
180 info!(
181 %id,
182 uploaded = stats.uploaded,
183 failed = stats.failed,
184 "cache drain completed"
185 );
186 }
187
188 let result = Message {
189 id,
190 cache_drain_result: Some(v1::CacheDrainResult {
191 error: protocol::error_or_empty(stats.last_error),
192 cache_queued: stats.pending,
193 cache_active: stats.active,
194 cache_uploaded: stats.uploaded,
195 cache_failed: stats.failed,
196 }),
197 ..Default::default()
198 };
199 let _ = out.send(result).await;
200}
201
202async fn run_poweroff(id: String, _req: v1::Poweroff, out: Sender<Message>) {
203 let result = Message {
204 id,
205 poweroff_result: Some(v1::PoweroffResult {
206 error: String::new(),
207 }),
208 ..Default::default()
209 };
210 let _ = out.send(result).await;
211
212 tokio::spawn(async move {
213 tokio::time::sleep(Duration::from_millis(100)).await;
214
215 // prefer a clean shutdown through the init system when one is around
216 // (systemd on NixOS, busybox/openrc elsewhere), then fall back to the
217 // raw reboot(2) syscall on minimal guests
218 for poweroff in [SYSTEMCTL_EXECUTABLE, "/sbin/poweroff", "/usr/sbin/poweroff"] {
219 if !std::path::Path::new(poweroff).exists() {
220 continue;
221 }
222 let mut spec = Spec::new(poweroff).timeout(Duration::from_secs(5));
223 if poweroff == SYSTEMCTL_EXECUTABLE {
224 spec = spec.args(["poweroff"]);
225 }
226 match command::run_capture(spec).await {
227 Ok(output) if output.success() => return,
228 Ok(output) => warn!(
229 %poweroff,
230 exit_code = output.exit.exit_code,
231 error = ?output.exit.error,
232 output = %output.combined_lossy(),
233 "poweroff command failed"
234 ),
235 Err(error) => warn!(%poweroff, %error, "poweroff command failed"),
236 }
237 }
238
239 // only ever returns on failure
240 let error =
241 nix::sys::reboot::reboot(nix::sys::reboot::RebootMode::RB_POWER_OFF).unwrap_err();
242 warn!(%error, "reboot(RB_POWER_OFF) syscall failed");
243 });
244}