Monorepo for Tangled
tangled.org
1use crate::command::{self, Spec};
2use crate::nix_config::{SPINDLE_RUN_DIR, clean_store_paths, nix_executable};
3use crate::protocol::{Message, v1};
4use anyhow::{Context, Result};
5use nix::unistd::{Group, chown};
6use serde::de::DeserializeOwned;
7use serde::{Deserialize, Serialize};
8use std::fmt::{self, Write as FmtWrite};
9use std::fs::{self, File, OpenOptions};
10use std::io::{self, Read};
11use std::net::Shutdown;
12use std::os::unix::fs::OpenOptionsExt;
13use std::sync::{Arc, Mutex};
14use std::time::Duration;
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tokio::sync::{Semaphore, mpsc, oneshot, watch};
17use tokio::task::{JoinError, JoinHandle, JoinSet};
18use tokio_vsock::{VMADDR_CID_LOCAL, VsockAddr, VsockListener, VsockStream};
19use tracing::{info, warn};
20
21mod read_proxy;
22mod write_proxy;
23
24pub use read_proxy::ReadCacheProxy;
25pub use write_proxy::WriteCacheProxy;
26
/// Capacity of both the manager command channel and the upload job channel.
const UPLOAD_QUEUE_CAPACITY: usize = 128;
/// Number of enqueue connections that may be handled at the same time.
const CONNECTION_WORKERS: usize = 4;
/// Time limit for reading or writing one JSON line on an enqueue connection.
const CACHE_ENQUEUE_IO_TIMEOUT: Duration = Duration::from_secs(2);
/// Vsock port used when the environment does not provide a usable override.
const DEFAULT_CACHE_ENQUEUE_PORT: u32 = 10_241;
/// Name of the environment variable that overrides the enqueue vsock port.
const SHUTTLE_CACHE_ENQUEUE_PORT_ENV: &str = "SHUTTLE_CACHE_VSOCK_PORT";
/// Group that receives group ownership of the hook token file.
const NIX_BUILD_GROUP: &str = "nixbld";
/// Path of the file holding the hook token.
const SPINDLE_HOOK_TOKEN: &str = "/run/spindle/hook-token";
34
/// Upload progress counters handed back to callers that drain the queue.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Paths accepted into the queue whose upload has not started yet.
    pub pending: u32,
    /// Paths belonging to an upload that is currently running.
    pub active: u32,
    /// Paths whose upload finished without an error.
    pub uploaded: u32,
    /// Paths whose upload finished with an error.
    pub failed: u32,
    /// Most recent error message recorded, if any.
    pub last_error: Option<String>,
}
43
44#[derive(Debug, Default)]
45struct CacheState {
46 stats: CacheStats,
47 enqueue_active: u32,
48 stopped: bool,
49}
50
51impl CacheState {
52 fn snapshot(&self) -> CacheSnapshot {
53 CacheSnapshot {
54 stats: self.stats.clone(),
55 enqueue_active: self.enqueue_active,
56 stopped: self.stopped,
57 }
58 }
59}
60
61#[derive(Clone, Debug, Default)]
62struct CacheSnapshot {
63 stats: CacheStats,
64 enqueue_active: u32,
65 stopped: bool,
66}
67
68impl CacheSnapshot {
69 fn is_idle(&self) -> bool {
70 self.stats.pending == 0 && self.stats.active == 0 && self.enqueue_active == 0
71 }
72}
73
74#[derive(Clone)]
75pub struct CacheUploadManager {
76 inner: Arc<CacheUploadInner>,
77}
78
79struct CacheUploadInner {
80 cmd_tx: mpsc::Sender<Cmd>,
81 stats_rx: watch::Receiver<CacheSnapshot>,
82 handles: Mutex<Vec<JoinHandle<()>>>,
83}
84
/// One batch of store paths handed to the upload worker.
struct UploadJob {
    /// Store paths to upload.
    paths: Vec<String>,
    /// Number of paths in the batch, reported back in the upload commands.
    count: u32,
}
89
90enum Cmd {
91 EnqueueStarted,
92 EnqueueFinished,
93 Enqueue {
94 paths: Vec<String>,
95 reply: oneshot::Sender<Result<usize, String>>,
96 },
97 UploadStarted {
98 count: u32,
99 },
100 UploadFinished {
101 count: u32,
102 error: Option<String>,
103 },
104 UploadWorkerStopped,
105 Stop,
106}
107
108impl CacheUploadManager {
109 pub async fn start(upload_url: &str, event_tx: mpsc::Sender<Message>) -> Result<Option<Self>> {
110 if upload_url.is_empty() {
111 // nothing to upload to, so don't require the guest-local vsock
112 // listener (vsock_loopback) or the nix post-build hook
113 info!("no cache upload url configured, cache uploads disabled");
114 return Ok(None);
115 }
116 let token = create_hook_token().context("create cache hook token")?;
117 let port = cache_enqueue_port();
118 let listener = VsockListener::bind(VsockAddr::new(VMADDR_CID_LOCAL, port))
119 .with_context(|| format!("listen on guest-local vsock port {port}"))?;
120
121 let (cmd_tx, cmd_rx) = mpsc::channel::<Cmd>(UPLOAD_QUEUE_CAPACITY);
122 let (upload_tx, upload_rx) = mpsc::channel::<UploadJob>(UPLOAD_QUEUE_CAPACITY);
123 let (stats_tx, stats_rx) = watch::channel(CacheSnapshot::default());
124
125 let mut handles = Vec::with_capacity(3);
126
127 handles.push(tokio::spawn(async move {
128 cache_manager_loop(cmd_rx, upload_tx, stats_tx).await;
129 }));
130
131 let upload_cmd_tx = cmd_tx.clone();
132 let upload_url = upload_url.to_owned();
133 handles.push(tokio::spawn(async move {
134 upload_loop(upload_rx, upload_cmd_tx, upload_url).await;
135 }));
136
137 let accept_cmd_tx = cmd_tx.clone();
138 handles.push(tokio::spawn(async move {
139 accept_loop(listener, token, event_tx, accept_cmd_tx).await;
140 }));
141
142 info!(
143 port,
144 workers = CONNECTION_WORKERS,
145 "cache upload queue ready"
146 );
147 let inner = Arc::new(CacheUploadInner {
148 cmd_tx,
149 stats_rx,
150 handles: Mutex::new(handles),
151 });
152 Ok(Some(Self { inner }))
153 }
154
155 pub async fn drain(&self, timeout: Option<Duration>) -> CacheStats {
156 let mut stats_rx = self.inner.stats_rx.clone();
157 let wait = async {
158 loop {
159 let snapshot = stats_rx.borrow_and_update().clone();
160 if snapshot.is_idle() || snapshot.stopped {
161 return snapshot.stats;
162 }
163 if stats_rx.changed().await.is_err() {
164 let mut stats = stats_rx.borrow().stats.clone();
165 stats.last_error = Some("cache manager stopped".to_owned());
166 return stats;
167 }
168 }
169 };
170
171 match timeout {
172 Some(timeout) => match tokio::time::timeout(timeout, wait).await {
173 Ok(stats) => stats,
174 Err(_) => {
175 let mut stats = self.inner.stats_rx.borrow().stats.clone();
176 stats.last_error = Some("cache drain timed out".to_owned());
177 stats
178 }
179 },
180 None => wait.await,
181 }
182 }
183}
184
185impl Drop for CacheUploadInner {
186 fn drop(&mut self) {
187 let _ = self.cmd_tx.try_send(Cmd::Stop);
188 if let Ok(mut handles) = self.handles.lock() {
189 for handle in handles.drain(..) {
190 handle.abort();
191 }
192 }
193 let _ = fs::remove_file(SPINDLE_HOOK_TOKEN);
194 }
195}
196
197#[derive(Debug, Deserialize, Serialize)]
198struct EnqueueBuiltPathsRequest {
199 token: String,
200 paths: Vec<String>,
201}
202
203#[derive(Debug, Deserialize, Serialize)]
204struct EnqueueBuiltPathsResponse {
205 queued: usize,
206 #[serde(default, skip_serializing_if = "String::is_empty")]
207 error: String,
208}
209
210#[derive(Debug)]
211enum JsonLineError {
212 Empty,
213 TimedOut,
214 Io(io::Error),
215 Json(serde_json::Error),
216}
217
218impl JsonLineError {
219 fn enqueue_request_message(self) -> String {
220 match self {
221 Self::Empty => "empty cache enqueue request".to_owned(),
222 Self::TimedOut => "cache enqueue read timed out".to_owned(),
223 Self::Io(error) => error.to_string(),
224 Self::Json(error) => error.to_string(),
225 }
226 }
227}
228
229impl fmt::Display for JsonLineError {
230 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
231 match self {
232 Self::Empty => f.write_str("empty message"),
233 Self::TimedOut => f.write_str("timed out"),
234 Self::Io(error) => error.fmt(f),
235 Self::Json(error) => error.fmt(f),
236 }
237 }
238}
239
240async fn cache_manager_loop(
241 mut cmd_rx: mpsc::Receiver<Cmd>,
242 upload_tx: mpsc::Sender<UploadJob>,
243 stats_tx: watch::Sender<CacheSnapshot>,
244) {
245 let mut state = CacheState::default();
246
247 let publish_state = |state: &CacheState| {
248 let _ = stats_tx.send(state.snapshot());
249 };
250
251 while let Some(command) = cmd_rx.recv().await {
252 match command {
253 Cmd::EnqueueStarted => {
254 state.enqueue_active += 1;
255 publish_state(&state);
256 }
257 Cmd::EnqueueFinished => {
258 decrement_counter(&mut state.enqueue_active, 1, "cache enqueue active");
259 publish_state(&state);
260 }
261 Cmd::Enqueue { paths, reply } => {
262 let result = enqueue_upload_job(&mut state, &upload_tx, paths);
263 publish_state(&state);
264 let _ = reply.send(result);
265 }
266 Cmd::UploadStarted { count } => {
267 decrement_counter(&mut state.stats.pending, count, "cache pending uploads");
268 state.stats.active += count;
269 publish_state(&state);
270 }
271 Cmd::UploadFinished { count, error } => {
272 decrement_counter(&mut state.stats.active, count, "cache active uploads");
273 if let Some(error) = error {
274 state.stats.failed += count;
275 state.stats.last_error = Some(error);
276 } else {
277 state.stats.uploaded += count;
278 }
279 publish_state(&state);
280 }
281 Cmd::UploadWorkerStopped => {
282 state.stopped = true;
283 publish_state(&state);
284 }
285 Cmd::Stop => {
286 state.stopped = true;
287 publish_state(&state);
288 break;
289 }
290 }
291 }
292
293 state.stopped = true;
294 publish_state(&state);
295}
296
297fn enqueue_upload_job(
298 state: &mut CacheState,
299 upload_tx: &mpsc::Sender<UploadJob>,
300 paths: Vec<String>,
301) -> Result<usize, String> {
302 if state.stopped {
303 return Err("cache manager stopped".to_owned());
304 }
305
306 let count = paths.len() as u32;
307 if count == 0 {
308 return Ok(0);
309 }
310
311 match upload_tx.try_send(UploadJob { paths, count }) {
312 Ok(()) => {
313 state.stats.pending += count;
314 Ok(count as usize)
315 }
316 Err(mpsc::error::TrySendError::Full(_)) => Err("cache upload queue is full".to_owned()),
317 Err(mpsc::error::TrySendError::Closed(_)) => {
318 state.stopped = true;
319 Err("cache upload worker stopped".to_owned())
320 }
321 }
322}
323
324fn decrement_counter(counter: &mut u32, count: u32, name: &'static str) {
325 match counter.checked_sub(count) {
326 Some(value) => *counter = value,
327 None => {
328 warn!(name, current = *counter, count, "cache counter underflow");
329 *counter = 0;
330 }
331 }
332}
333
334async fn upload_loop(
335 mut upload_rx: mpsc::Receiver<UploadJob>,
336 cmd_tx: mpsc::Sender<Cmd>,
337 upload_url: String,
338) {
339 while let Some(job) = upload_rx.recv().await {
340 let cmd = Cmd::UploadStarted { count: job.count };
341 if cmd_tx.send(cmd).await.is_err() {
342 break;
343 }
344
345 let error = upload_paths(&upload_url, &job.paths)
346 .await
347 .err()
348 .map(|error| error.to_string());
349
350 let cmd = Cmd::UploadFinished {
351 count: job.count,
352 error,
353 };
354 if cmd_tx.send(cmd).await.is_err() {
355 break;
356 }
357 }
358
359 let _ = cmd_tx.send(Cmd::UploadWorkerStopped).await;
360}
361
362// runs nix copy against the write cache proxy, which goes to the spindle
363// and spindle will then forward the request to the actual binary cache
364async fn upload_paths(upload_url: &str, paths: &[String]) -> Result<()> {
365 fn add_query_param(url: &str, key: &str, value: &str) -> String {
366 let separator = url.contains('?').then_some('&').unwrap_or('?');
367 format!("{}{}{}={}", url, separator, key, value)
368 }
369
370 if paths.is_empty() || upload_url.is_empty() {
371 return Ok(());
372 }
373
374 // we use zstd 3 because its the best usually. it is faster than no
375 // compression also because of IO savings
376 let dest_url = add_query_param(upload_url, "compression", "zstd");
377 let dest_url = add_query_param(&dest_url, "compression-level", "3");
378 let dest_url = add_query_param(&dest_url, "parallel-compression", "true");
379
380 let spec = Spec::new(nix_executable())
381 .args(["copy", "--to", &dest_url])
382 .args(paths.iter().cloned())
383 .timeout(Duration::from_secs(10 * 60));
384
385 let output = command::run_capture(spec).await.context("run nix copy")?;
386 if !output.success() {
387 anyhow::bail!(
388 "nix copy failed: exit={} error={:?} output={}",
389 output.exit.exit_code,
390 output.exit.error,
391 output.combined_lossy(),
392 );
393 }
394
395 info!(paths = paths.len(), %upload_url, "uploaded cache paths");
396 Ok(())
397}
398
399async fn accept_loop(
400 listener: VsockListener,
401 token: String,
402 event_tx: mpsc::Sender<Message>,
403 cmd_tx: mpsc::Sender<Cmd>,
404) {
405 let permits = Arc::new(Semaphore::new(CONNECTION_WORKERS));
406 let mut tasks = JoinSet::new();
407 loop {
408 tokio::select! {
409 accepted = listener.accept() => match accepted {
410 Ok((conn, _addr)) => {
411 let Ok(permit) = permits.clone().try_acquire_owned() else {
412 tasks.spawn(async move {
413 let mut conn = conn;
414 write_enqueue_response(
415 &mut conn,
416 0,
417 Some("cache enqueue workers are busy".to_owned()),
418 )
419 .await;
420 });
421 warn!("cache enqueue dropped because workers are busy");
422 continue;
423 };
424
425 let worker_cmd_tx = cmd_tx.clone();
426 if let Err(error) = start_enqueue_request(&worker_cmd_tx) {
427 tasks.spawn(async move {
428 let mut conn = conn;
429 write_enqueue_response(&mut conn, 0, Some(error)).await;
430 });
431 continue;
432 }
433
434 let worker_token = token.clone();
435 let worker_event_tx = event_tx.clone();
436 tasks.spawn(async move {
437 let _permit = permit;
438 handle_enqueue_conn(
439 conn,
440 &worker_token,
441 &worker_event_tx,
442 &worker_cmd_tx,
443 )
444 .await;
445 let _ = worker_cmd_tx.send(Cmd::EnqueueFinished).await;
446 });
447 }
448 Err(error) => {
449 if error.kind() != io::ErrorKind::Interrupted {
450 warn!(%error, "cache enqueue accept failed");
451 }
452 }
453 },
454 Some(result) = tasks.join_next(), if !tasks.is_empty() => {
455 log_enqueue_task_result(result);
456 }
457 }
458 }
459}
460
461fn log_enqueue_task_result(result: Result<(), JoinError>) {
462 if let Err(error) = result {
463 warn!(%error, "cache enqueue task failed");
464 }
465}
466
467async fn handle_enqueue_conn(
468 mut conn: VsockStream,
469 expected_token: &str,
470 event_tx: &mpsc::Sender<Message>,
471 cmd_tx: &mpsc::Sender<Cmd>,
472) {
473 let req: EnqueueBuiltPathsRequest = match read_enqueue_request(&mut conn).await {
474 Ok(req) => req,
475 Err(error) => {
476 write_enqueue_response(&mut conn, 0, Some(error)).await;
477 return;
478 }
479 };
480
481 if req.token != expected_token {
482 write_enqueue_response(&mut conn, 0, Some("invalid cache enqueue token".to_owned())).await;
483 return;
484 }
485
486 match enqueue_paths(cmd_tx, req.paths).await {
487 Ok(queued) => {
488 send_built_paths_event(event_tx, queued.event_paths).await;
489 write_enqueue_response(&mut conn, queued.count, None).await;
490 }
491 Err(error) => write_enqueue_response(&mut conn, 0, Some(error)).await,
492 }
493}
494
495fn start_enqueue_request(cmd_tx: &mpsc::Sender<Cmd>) -> Result<(), String> {
496 match cmd_tx.try_send(Cmd::EnqueueStarted) {
497 Ok(()) => Ok(()),
498 Err(mpsc::error::TrySendError::Full(_)) => Err("cache upload queue is full".to_owned()),
499 Err(mpsc::error::TrySendError::Closed(_)) => Err("cache upload worker stopped".to_owned()),
500 }
501}
502
503async fn read_enqueue_request(conn: &mut VsockStream) -> Result<EnqueueBuiltPathsRequest, String> {
504 read_json_line(conn)
505 .await
506 .map_err(JsonLineError::enqueue_request_message)
507}
508
509async fn read_json_line<T>(conn: &mut VsockStream) -> Result<T, JsonLineError>
510where
511 T: DeserializeOwned,
512{
513 let mut data = Vec::new();
514 let mut reader = BufReader::new(conn);
515 let bytes_read = tokio::time::timeout(
516 CACHE_ENQUEUE_IO_TIMEOUT,
517 reader.read_until(b'\n', &mut data),
518 )
519 .await
520 .map_err(|_| JsonLineError::TimedOut)?
521 .map_err(JsonLineError::Io)?;
522
523 if bytes_read == 0 {
524 return Err(JsonLineError::Empty);
525 }
526
527 serde_json::from_slice(&data).map_err(JsonLineError::Json)
528}
529
/// Result of a successful enqueue request.
struct QueuedPaths {
    /// Number of paths the manager reported as queued.
    count: usize,
    /// Cleaned paths to announce in the built-paths event.
    event_paths: Vec<String>,
}
534
535async fn enqueue_paths(
536 cmd_tx: &mpsc::Sender<Cmd>,
537 paths: Vec<String>,
538) -> Result<QueuedPaths, String> {
539 let paths = clean_store_paths(&paths);
540 let event_paths = paths.clone();
541 if paths.is_empty() {
542 return Ok(QueuedPaths {
543 count: 0,
544 event_paths,
545 });
546 }
547
548 let (reply, queued) = oneshot::channel();
549 match cmd_tx.try_send(Cmd::Enqueue { paths, reply }) {
550 Ok(()) => match queued.await {
551 Ok(Ok(count)) => Ok(QueuedPaths { count, event_paths }),
552 Ok(Err(error)) => Err(error),
553 Err(_) => Err("cache upload worker stopped".to_owned()),
554 },
555 Err(mpsc::error::TrySendError::Full(_)) => Err("cache upload queue is full".to_owned()),
556 Err(mpsc::error::TrySendError::Closed(_)) => Err("cache upload worker stopped".to_owned()),
557 }
558}
559
560async fn send_built_paths_event(event_tx: &mpsc::Sender<Message>, paths: Vec<String>) {
561 if paths.is_empty() {
562 return;
563 }
564
565 let msg = Message {
566 id: "built-paths".to_owned(),
567 built_paths: Some(v1::BuiltPaths {
568 paths,
569 reason: "post_build_hook".to_owned(),
570 }),
571 ..Default::default()
572 };
573 let _ = event_tx.send(msg).await;
574}
575
576async fn write_enqueue_response(conn: &mut VsockStream, queued: usize, error: Option<String>) {
577 let response = EnqueueBuiltPathsResponse {
578 queued,
579 error: error.unwrap_or_default(),
580 };
581 let _ = write_json_line(conn, &response).await;
582}
583
584async fn write_json_line<T>(conn: &mut VsockStream, value: &T) -> Result<(), JsonLineError>
585where
586 T: Serialize + ?Sized,
587{
588 let data = serde_json::to_vec(value).map_err(JsonLineError::Json)?;
589 tokio::time::timeout(CACHE_ENQUEUE_IO_TIMEOUT, async {
590 conn.write_all(&data).await?;
591 conn.write_all(b"\n").await?;
592 VsockStream::shutdown(conn, Shutdown::Write)
593 })
594 .await
595 .map_err(|_| JsonLineError::TimedOut)?
596 .map_err(JsonLineError::Io)
597}
598
599// we use a loopback vsock here since its better than having to do the whole http song and dance!
600pub async fn enqueue_built_paths(paths: &[String]) {
601 let paths = clean_store_paths(paths);
602 if paths.is_empty() {
603 return;
604 }
605
606 let token = match read_hook_token() {
607 Ok(token) => token,
608 Err(_) => return,
609 };
610
611 if token.is_empty() {
612 return;
613 }
614
615 let mut conn =
616 match VsockStream::connect(VsockAddr::new(VMADDR_CID_LOCAL, cache_enqueue_port())).await {
617 Ok(conn) => conn,
618 Err(error) => {
619 warn!(paths = paths.len(), %error, "cache enqueue unavailable");
620 return;
621 }
622 };
623
624 let request = EnqueueBuiltPathsRequest { token, paths };
625 match write_json_line(&mut conn, &request).await {
626 Ok(()) => {}
627 Err(JsonLineError::Json(error)) => {
628 warn!(%error, "cache enqueue encode failed");
629 return;
630 }
631 Err(JsonLineError::TimedOut) => {
632 warn!("cache enqueue write timed out");
633 return;
634 }
635 Err(error) => {
636 warn!(%error, "cache enqueue write failed");
637 return;
638 }
639 }
640
641 let response: EnqueueBuiltPathsResponse = match read_json_line(&mut conn).await {
642 Ok(response) => response,
643 Err(JsonLineError::Empty) => {
644 warn!("cache enqueue ack was empty");
645 return;
646 }
647 Err(JsonLineError::TimedOut) => {
648 warn!("cache enqueue ack timed out");
649 return;
650 }
651 Err(error) => {
652 warn!(%error, "cache enqueue ack failed");
653 return;
654 }
655 };
656
657 if !response.error.is_empty() {
658 warn!(error = %response.error, "cache enqueue rejected");
659 return;
660 }
661
662 info!(queued = response.queued, "cache paths enqueued");
663}
664
665fn cache_enqueue_port() -> u32 {
666 std::env::var(SHUTTLE_CACHE_ENQUEUE_PORT_ENV)
667 .ok()
668 .and_then(|value| value.parse().ok())
669 .unwrap_or(DEFAULT_CACHE_ENQUEUE_PORT)
670}
671
672fn create_hook_token() -> Result<String> {
673 use std::io::Write;
674
675 fs::create_dir_all(SPINDLE_RUN_DIR).with_context(|| format!("create {SPINDLE_RUN_DIR}"))?;
676 let token = random_token().context("generate hook token")?;
677 let mut file = OpenOptions::new()
678 .create(true)
679 .truncate(true)
680 .write(true)
681 .mode(0o640)
682 .open(SPINDLE_HOOK_TOKEN)
683 .with_context(|| format!("create {SPINDLE_HOOK_TOKEN}"))?;
684 allow_nix_build_group(SPINDLE_HOOK_TOKEN)?;
685 file.write_all(token.as_bytes())
686 .with_context(|| format!("write {SPINDLE_HOOK_TOKEN}"))?;
687 file.write_all(b"\n")
688 .with_context(|| format!("write {SPINDLE_HOOK_TOKEN}"))?;
689 Ok(token)
690}
691
692fn allow_nix_build_group(path: &str) -> Result<()> {
693 let Some(group) =
694 Group::from_name(NIX_BUILD_GROUP).with_context(|| format!("lookup {NIX_BUILD_GROUP}"))?
695 else {
696 warn!(
697 group = NIX_BUILD_GROUP,
698 "nix build group not found; cache hook token remains root-only"
699 );
700 return Ok(());
701 };
702
703 chown(path, None, Some(group.gid)).with_context(|| format!("chown {path} to {NIX_BUILD_GROUP}"))
704}
705
706fn read_hook_token() -> Result<String> {
707 fs::read_to_string(SPINDLE_HOOK_TOKEN)
708 .map(|token| token.trim().to_owned())
709 .with_context(|| format!("read {SPINDLE_HOOK_TOKEN}"))
710}
711
712fn random_token() -> Result<String> {
713 let mut bytes = [0_u8; 32];
714 File::open("/dev/urandom")
715 .context("open /dev/urandom")?
716 .read_exact(&mut bytes)
717 .context("read /dev/urandom")?;
718
719 let mut token = String::with_capacity(bytes.len() * 2);
720 for byte in bytes {
721 write!(&mut token, "{byte:02x}").unwrap();
722 }
723 Ok(token)
724}