Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package microvm 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "log/slog" 8 "net" 9 "net/http" 10 "net/url" 11 "strings" 12 "time" 13 14 "github.com/mdlayher/vsock" 15) 16 17type UploadCacheBackend interface { 18 http.Handler 19 Close() error 20} 21 22type UploadCacheProxy struct { 23 port uint32 24 25 ln *vsock.Listener 26 server *http.Server 27 backend UploadCacheBackend 28} 29 30func StartUploadCacheProxy(ctx context.Context, cid uint32, uploadURL string, readUpstreams []CacheUpstream, stagingDir string, logger *slog.Logger) (*UploadCacheProxy, error) { 31 if strings.TrimSpace(uploadURL) == "" { 32 return nil, nil 33 } 34 35 if logger == nil { 36 logger = slog.Default() 37 } 38 logger = logger.With("where", "upload_cache_proxy", "cid", cid, "uploadURL", uploadURL) 39 40 backend, err := newUploadCacheBackend(uploadURL, readUpstreams, stagingDir, logger) 41 if err != nil { 42 return nil, err 43 } 44 45 ln, port, err := listenRandomVsockUploadPort(ctx) 46 if err != nil { 47 return nil, fmt.Errorf("listen for cache upload proxy: %w", err) 48 } 49 50 proxy := &UploadCacheProxy{ 51 port: port, 52 ln: ln, 53 backend: backend, 54 } 55 proxy.server = &http.Server{ 56 Handler: backend, 57 Protocols: cacheProxyProtocols(), 58 ReadHeaderTimeout: 30 * time.Second, 59 } 60 61 filtered := &cidFilteredVsockListener{ 62 Listener: ln, 63 cid: cid, 64 logger: logger, 65 } 66 go func() { 67 if err := proxy.server.Serve(filtered); err != nil && !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, net.ErrClosed) { 68 logger.Warn("upload cache proxy stopped", "port", port, "error", err) 69 } 70 }() 71 72 logger.Info("started upload cache proxy", "port", port, "target", uploadURL, "readUpstreams", len(readUpstreams)) 73 return proxy, nil 74} 75 76func newUploadCacheBackend(uploadURL string, readUpstreams []CacheUpstream, stagingDir string, logger *slog.Logger) (UploadCacheBackend, error) { 77 if strings.TrimSpace(uploadURL) == "" { 78 return nil, nil 79 } 80 81 target, err := url.Parse(uploadURL) 82 if err != nil { 83 return nil, fmt.Errorf("parse upload URL %q: %w", uploadURL, err) 84 } 85 86 switch target.Scheme { 87 case "http", "https": 88 if target.Host == "" { 89 return nil, fmt.Errorf("upload URL %q is missing host", uploadURL) 90 } 91 return newHTTPUploadProxyBackend(target, readUpstreams, logger), nil 92 93 case "ssh", "ssh-ng": 94 return newNixStoreUploadBackend(target.String(), stagingDir, readUpstreams, logger, nil) 95 96 case "": 97 switch uploadURL { 98 case "daemon", "local": 99 return newNixStoreUploadBackend(uploadURL, stagingDir, readUpstreams, logger, nil) 100 default: 101 return nil, fmt.Errorf("unsupported upload URL %q", uploadURL) 102 } 103 104 default: 105 return nil, fmt.Errorf("upload URL %q uses unsupported scheme %q", uploadURL, target.Scheme) 106 } 107} 108 109func (p *UploadCacheProxy) Port() uint32 { 110 if p == nil { 111 return 0 112 } 113 return p.port 114} 115 116func (p *UploadCacheProxy) Close() error { 117 if p == nil { 118 return nil 119 } 120 121 var closeErr error 122 if p.server != nil { 123 ctx, cancel := context.WithTimeout(context.Background(), time.Second) 124 closeErr = errors.Join(closeErr, p.server.Shutdown(ctx)) 125 cancel() 126 p.server = nil 127 } 128 if p.ln != nil { 129 closeErr = errors.Join(closeErr, p.ln.Close()) 130 p.ln = nil 131 } 132 if p.backend != nil { 133 closeErr = errors.Join(closeErr, p.backend.Close()) 134 } 135 return closeErr 136} 137 138func listenRandomVsockUploadPort(ctx context.Context) (*vsock.Listener, uint32, error) { 139 var lastErr error 140 for range 32 { 141 port, err := randomVsockPort() 142 if err != nil { 143 return nil, 0, err 144 } 145 ln, err := vsock.Listen(port, nil) 146 if err == nil { 147 return ln, port, nil 148 } 149 lastErr = err 150 151 select { 152 case <-ctx.Done(): 153 return nil, 0, ctx.Err() 154 default: 155 } 156 } 157 return nil, 0, fmt.Errorf("listen on random vsock upload port: %w", lastErr) 158}