Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1use anyhow::{Context, Result}; 2use tokio::net::{TcpListener, TcpStream}; 3use tokio::task::{JoinError, JoinHandle, JoinSet}; 4use tokio_vsock::{VsockAddr, VsockStream}; 5use tracing::{info, warn}; 6 7// this implements a vsock <-> tcp proxy for communicating with spindle 8pub struct VsockTcpProxy { 9 url: String, 10 handle: JoinHandle<()>, 11} 12 13impl VsockTcpProxy { 14 pub async fn start( 15 name: &'static str, 16 bind_addr: &str, 17 host_cid: u32, 18 host_port: u32, 19 ) -> Result<Self> { 20 if host_port == 0 { 21 anyhow::bail!("port 0 cant be requested"); 22 } 23 24 let listener = TcpListener::bind(bind_addr) 25 .await 26 .with_context(|| format!("bind {name} listener {bind_addr}"))?; 27 let local_addr = listener 28 .local_addr() 29 .with_context(|| format!("{name} local address"))?; 30 let url = format!("http://{local_addr}"); 31 32 let handle = tokio::spawn(async move { 33 accept_loop(name, listener, host_cid, host_port).await; 34 }); 35 36 info!(%url, host_cid, host_port, "{name} ready"); 37 Ok(Self { url, handle }) 38 } 39 40 pub fn url(&self) -> &str { 41 &self.url 42 } 43} 44 45impl Drop for VsockTcpProxy { 46 fn drop(&mut self) { 47 self.handle.abort(); 48 } 49} 50 51async fn accept_loop(name: &'static str, listener: TcpListener, host_cid: u32, host_port: u32) { 52 let mut tasks = JoinSet::new(); 53 loop { 54 tokio::select! { 55 accepted = listener.accept() => match accepted { 56 Ok((conn, _addr)) => { 57 tasks.spawn(async move { 58 if let Err(error) = proxy_conn(name, conn, host_cid, host_port).await { 59 warn!(%error, "{name} connection failed"); 60 } 61 }); 62 } 63 Err(error) => warn!(%error, "{name} accept failed"), 64 }, 65 Some(result) = tasks.join_next(), if !tasks.is_empty() => { 66 log_proxy_task_result(result); 67 } 68 } 69 } 70} 71 72fn log_proxy_task_result(result: Result<(), JoinError>) { 73 if let Err(error) = result { 74 warn!(%error, "proxy task failed"); 75 } 76} 77 78async fn proxy_conn( 79 name: &'static str, 80 mut tcp: TcpStream, 81 host_cid: u32, 82 host_port: u32, 83) -> Result<()> { 84 let mut host = VsockStream::connect(VsockAddr::new(host_cid, host_port)) 85 .await 86 .with_context(|| format!("dial host {name} cid={host_cid} port={host_port}"))?; 87 88 tokio::io::copy_bidirectional(&mut tcp, &mut host) 89 .await 90 .context("proxy connection copy")?; 91 Ok(()) 92}