Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package microvm 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "io" 8 "log/slog" 9 "net" 10 "net/http" 11 "net/http/httputil" 12 "net/url" 13 "strings" 14 "time" 15 16 "github.com/mdlayher/vsock" 17) 18 19type UploadCacheProxy struct { 20 port uint32 21 22 ln *vsock.Listener 23 server *http.Server 24} 25 26func StartUploadCacheProxy(ctx context.Context, cid uint32, uploadURL string, readUpstreams []CacheUpstream, logger *slog.Logger) (*UploadCacheProxy, error) { 27 if strings.TrimSpace(uploadURL) == "" { 28 return nil, nil 29 } 30 31 if logger == nil { 32 logger = slog.Default() 33 } 34 logger = logger.With("where", "upload_cache_proxy", "cid", cid, "uploadURL", uploadURL) 35 36 target, err := url.Parse(uploadURL) 37 if err != nil { 38 return nil, fmt.Errorf("parse upload URL %q: %w", uploadURL, err) 39 } 40 if target.Scheme != "http" && target.Scheme != "https" { 41 return nil, fmt.Errorf("upload URL %q uses unsupported scheme %q (must be http or https)", uploadURL, target.Scheme) 42 } 43 if target.Host == "" { 44 return nil, fmt.Errorf("upload URL %q is missing host", uploadURL) 45 } 46 47 ln, port, err := listenRandomVsockUploadPort(ctx) 48 if err != nil { 49 return nil, fmt.Errorf("listen for cache upload proxy: %w", err) 50 } 51 52 proxy := &UploadCacheProxy{ 53 port: port, 54 ln: ln, 55 } 56 proxy.server = &http.Server{ 57 Handler: uploadProxyHandler(target, readUpstreams, logger), 58 Protocols: cacheProxyProtocols(), 59 ReadHeaderTimeout: 30 * time.Second, 60 } 61 62 filtered := &cidFilteredVsockListener{ 63 Listener: ln, 64 cid: cid, 65 logger: logger, 66 } 67 go func() { 68 if err := proxy.server.Serve(filtered); err != nil && !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, net.ErrClosed) { 69 logger.Warn("upload cache proxy stopped", "port", port, "error", err) 70 } 71 }() 72 73 logger.Info("started upload cache proxy", "port", port, "target", uploadURL, "readUpstreams", len(readUpstreams)) 74 return proxy, nil 75} 76 77func (p *UploadCacheProxy) Port() uint32 { 78 if p == nil { 79 return 0 80 } 81 return p.port 82} 83 84func (p *UploadCacheProxy) Close() error { 85 if p == nil { 86 return nil 87 } 88 89 var closeErr error 90 if p.server != nil { 91 ctx, cancel := context.WithTimeout(context.Background(), time.Second) 92 closeErr = errors.Join(closeErr, p.server.Shutdown(ctx)) 93 cancel() 94 p.server = nil 95 } 96 if p.ln != nil { 97 closeErr = errors.Join(closeErr, p.ln.Close()) 98 p.ln = nil 99 } 100 return closeErr 101} 102 103func uploadProxyHandler(target *url.URL, readUpstreams []CacheUpstream, logger *slog.Logger) http.Handler { 104 rp := httputil.NewSingleHostReverseProxy(target) 105 rp.ErrorLog = slog.NewLogLogger(logger.Handler(), slog.LevelError) 106 107 origDirector := rp.Director 108 rp.Director = func(req *http.Request) { 109 origDirector(req) 110 // ensure host matches target 111 req.Host = target.Host 112 // the transport doesn't turn URL userinfo into basic auth, only 113 // http.Client does, so do it ourselves 114 if user := target.User; user != nil { 115 password, _ := user.Password() 116 req.SetBasicAuth(user.Username(), password) 117 } 118 } 119 120 // before uploading, nix copy asks the destination whether it already has each 121 // path by GET/HEAD-ing <hash>.narinfo and skips the ones it does. we answer 122 // that check across the upload target *and* the read caches: if any of them 123 // already serves the path there is no point uploading it (the guest would 124 // just substitute it from there anyway). 125 narinfoUpstreams := append([]CacheUpstream{{url: target}}, readUpstreams...) 126 exists := &parallelRacingTransport{ 127 upstreams: narinfoUpstreams, 128 underlying: proxyTransport, 129 guardedUnderlying: guardedProxyTransport, 130 logger: logger, 131 } 132 133 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 134 if isNarinfoExistenceCheck(r) { 135 serveNarinfoExistence(w, r, exists, logger) 136 return 137 } 138 rp.ServeHTTP(w, r) 139 }) 140} 141 142func isNarinfoExistenceCheck(r *http.Request) bool { 143 if r.Method != http.MethodGet && r.Method != http.MethodHead { 144 return false 145 } 146 return strings.HasSuffix(r.URL.Path, ".narinfo") 147} 148 149func serveNarinfoExistence(w http.ResponseWriter, r *http.Request, exists http.RoundTripper, logger *slog.Logger) { 150 probe := r.Clone(r.Context()) 151 probe.RequestURI = "" 152 153 resp, err := exists.RoundTrip(probe) 154 if err != nil { 155 logger.Warn("upload proxy narinfo check failed, treating as not present", "path", r.URL.Path, "error", err) 156 w.WriteHeader(http.StatusNotFound) 157 return 158 } 159 defer resp.Body.Close() 160 161 for key, values := range resp.Header { 162 for _, value := range values { 163 w.Header().Add(key, value) 164 } 165 } 166 w.WriteHeader(resp.StatusCode) 167 if _, err := io.Copy(w, resp.Body); err != nil && !errors.Is(err, context.Canceled) { 168 logger.Warn("upload proxy narinfo copy failed", "path", r.URL.Path, "error", err) 169 } 170} 171 172func listenRandomVsockUploadPort(ctx context.Context) (*vsock.Listener, uint32, error) { 173 var lastErr error 174 for range 32 { 175 port, err := randomVsockPort() 176 if err != nil { 177 return nil, 0, err 178 } 179 ln, err := vsock.Listen(port, nil) 180 if err == nil { 181 return ln, port, nil 182 } 183 lastErr = err 184 185 select { 186 case <-ctx.Done(): 187 return nil, 0, ctx.Err() 188 default: 189 } 190 } 191 return nil, 0, fmt.Errorf("listen on random vsock upload port: %w", lastErr) 192}