Monorepo for Tangled
tangled.org
1use anyhow::{Context, Result};
2use tokio::net::{TcpListener, TcpStream};
3use tokio::task::{JoinError, JoinHandle, JoinSet};
4use tokio_vsock::{VsockAddr, VsockStream};
5use tracing::{info, warn};
6
7// this implements a vsock <-> tcp proxy for communicating with spindle
8pub struct VsockTcpProxy {
9 url: String,
10 handle: JoinHandle<()>,
11}
12
13impl VsockTcpProxy {
14 pub async fn start(
15 name: &'static str,
16 bind_addr: &str,
17 host_cid: u32,
18 host_port: u32,
19 ) -> Result<Self> {
20 if host_port == 0 {
21 anyhow::bail!("port 0 cant be requested");
22 }
23
24 let listener = TcpListener::bind(bind_addr)
25 .await
26 .with_context(|| format!("bind {name} listener {bind_addr}"))?;
27 let local_addr = listener
28 .local_addr()
29 .with_context(|| format!("{name} local address"))?;
30 let url = format!("http://{local_addr}");
31
32 let handle = tokio::spawn(async move {
33 accept_loop(name, listener, host_cid, host_port).await;
34 });
35
36 info!(%url, host_cid, host_port, "{name} ready");
37 Ok(Self { url, handle })
38 }
39
40 pub fn url(&self) -> &str {
41 &self.url
42 }
43}
44
45impl Drop for VsockTcpProxy {
46 fn drop(&mut self) {
47 self.handle.abort();
48 }
49}
50
51async fn accept_loop(name: &'static str, listener: TcpListener, host_cid: u32, host_port: u32) {
52 let mut tasks = JoinSet::new();
53 loop {
54 tokio::select! {
55 accepted = listener.accept() => match accepted {
56 Ok((conn, _addr)) => {
57 tasks.spawn(async move {
58 if let Err(error) = proxy_conn(name, conn, host_cid, host_port).await {
59 warn!(%error, "{name} connection failed");
60 }
61 });
62 }
63 Err(error) => warn!(%error, "{name} accept failed"),
64 },
65 Some(result) = tasks.join_next(), if !tasks.is_empty() => {
66 log_proxy_task_result(result);
67 }
68 }
69 }
70}
71
72fn log_proxy_task_result(result: Result<(), JoinError>) {
73 if let Err(error) = result {
74 warn!(%error, "proxy task failed");
75 }
76}
77
78async fn proxy_conn(
79 name: &'static str,
80 mut tcp: TcpStream,
81 host_cid: u32,
82 host_port: u32,
83) -> Result<()> {
84 let mut host = VsockStream::connect(VsockAddr::new(host_cid, host_port))
85 .await
86 .with_context(|| format!("dial host {name} cid={host_cid} port={host_port}"))?;
87
88 tokio::io::copy_bidirectional(&mut tcp, &mut host)
89 .await
90 .context("proxy connection copy")?;
91 Ok(())
92}