Monorepo for Tangled
tangled.org
1package microvm
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "io"
8 "log/slog"
9 "net"
10 "net/http"
11 "net/http/httputil"
12 "net/url"
13 "strings"
14 "time"
15
16 "github.com/mdlayher/vsock"
17)
18
19type UploadCacheProxy struct {
20 port uint32
21
22 ln *vsock.Listener
23 server *http.Server
24}
25
26func StartUploadCacheProxy(ctx context.Context, cid uint32, uploadURL string, readUpstreams []CacheUpstream, logger *slog.Logger) (*UploadCacheProxy, error) {
27 if strings.TrimSpace(uploadURL) == "" {
28 return nil, nil
29 }
30
31 if logger == nil {
32 logger = slog.Default()
33 }
34 logger = logger.With("where", "upload_cache_proxy", "cid", cid, "uploadURL", uploadURL)
35
36 target, err := url.Parse(uploadURL)
37 if err != nil {
38 return nil, fmt.Errorf("parse upload URL %q: %w", uploadURL, err)
39 }
40 if target.Scheme != "http" && target.Scheme != "https" {
41 return nil, fmt.Errorf("upload URL %q uses unsupported scheme %q (must be http or https)", uploadURL, target.Scheme)
42 }
43 if target.Host == "" {
44 return nil, fmt.Errorf("upload URL %q is missing host", uploadURL)
45 }
46
47 ln, port, err := listenRandomVsockUploadPort(ctx)
48 if err != nil {
49 return nil, fmt.Errorf("listen for cache upload proxy: %w", err)
50 }
51
52 proxy := &UploadCacheProxy{
53 port: port,
54 ln: ln,
55 }
56 proxy.server = &http.Server{
57 Handler: uploadProxyHandler(target, readUpstreams, logger),
58 Protocols: cacheProxyProtocols(),
59 ReadHeaderTimeout: 30 * time.Second,
60 }
61
62 filtered := &cidFilteredVsockListener{
63 Listener: ln,
64 cid: cid,
65 logger: logger,
66 }
67 go func() {
68 if err := proxy.server.Serve(filtered); err != nil && !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, net.ErrClosed) {
69 logger.Warn("upload cache proxy stopped", "port", port, "error", err)
70 }
71 }()
72
73 logger.Info("started upload cache proxy", "port", port, "target", uploadURL, "readUpstreams", len(readUpstreams))
74 return proxy, nil
75}
76
77func (p *UploadCacheProxy) Port() uint32 {
78 if p == nil {
79 return 0
80 }
81 return p.port
82}
83
84func (p *UploadCacheProxy) Close() error {
85 if p == nil {
86 return nil
87 }
88
89 var closeErr error
90 if p.server != nil {
91 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
92 closeErr = errors.Join(closeErr, p.server.Shutdown(ctx))
93 cancel()
94 p.server = nil
95 }
96 if p.ln != nil {
97 closeErr = errors.Join(closeErr, p.ln.Close())
98 p.ln = nil
99 }
100 return closeErr
101}
102
103func uploadProxyHandler(target *url.URL, readUpstreams []CacheUpstream, logger *slog.Logger) http.Handler {
104 rp := httputil.NewSingleHostReverseProxy(target)
105 rp.ErrorLog = slog.NewLogLogger(logger.Handler(), slog.LevelError)
106
107 origDirector := rp.Director
108 rp.Director = func(req *http.Request) {
109 origDirector(req)
110 // ensure host matches target
111 req.Host = target.Host
112 // the transport doesn't turn URL userinfo into basic auth, only
113 // http.Client does, so do it ourselves
114 if user := target.User; user != nil {
115 password, _ := user.Password()
116 req.SetBasicAuth(user.Username(), password)
117 }
118 }
119
120 // before uploading, nix copy asks the destination whether it already has each
121 // path by GET/HEAD-ing <hash>.narinfo and skips the ones it does. we answer
122 // that check across the upload target *and* the read caches: if any of them
123 // already serves the path there is no point uploading it (the guest would
124 // just substitute it from there anyway).
125 narinfoUpstreams := append([]CacheUpstream{{url: target}}, readUpstreams...)
126 exists := ¶llelRacingTransport{
127 upstreams: narinfoUpstreams,
128 underlying: proxyTransport,
129 guardedUnderlying: guardedProxyTransport,
130 logger: logger,
131 }
132
133 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
134 if isNarinfoExistenceCheck(r) {
135 serveNarinfoExistence(w, r, exists, logger)
136 return
137 }
138 rp.ServeHTTP(w, r)
139 })
140}
141
// isNarinfoExistenceCheck reports whether r is a GET or HEAD request whose URL
// path ends in ".narinfo".
func isNarinfoExistenceCheck(r *http.Request) bool {
	if r.Method != http.MethodGet && r.Method != http.MethodHead {
		return false
	}
	return strings.HasSuffix(r.URL.Path, ".narinfo")
}
148
// serveNarinfoExistence answers a narinfo existence check by sending a copy of
// r through exists and relaying the result to w.
//
// If the round trip fails, the failure is logged and the client receives a
// bare 404, i.e. the path is treated as not present. Otherwise the upstream
// response's headers, status code and body are copied to w unchanged.
func serveNarinfoExistence(w http.ResponseWriter, r *http.Request, exists http.RoundTripper, logger *slog.Logger) {
	probe := r.Clone(r.Context())
	// RequestURI is only meaningful on server-side requests and must be
	// empty on a request handed to a client transport
	probe.RequestURI = ""

	resp, err := exists.RoundTrip(probe)
	if err != nil {
		logger.Warn("upload proxy narinfo check failed, treating as not present", "path", r.URL.Path, "error", err)
		w.WriteHeader(http.StatusNotFound)
		return
	}
	defer resp.Body.Close()

	for key, values := range resp.Header {
		for _, value := range values {
			w.Header().Add(key, value)
		}
	}
	w.WriteHeader(resp.StatusCode)
	// a cancelled request just means the client went away; don't log that
	if _, err := io.Copy(w, resp.Body); err != nil && !errors.Is(err, context.Canceled) {
		logger.Warn("upload proxy narinfo copy failed", "path", r.URL.Path, "error", err)
	}
}
171
172func listenRandomVsockUploadPort(ctx context.Context) (*vsock.Listener, uint32, error) {
173 var lastErr error
174 for range 32 {
175 port, err := randomVsockPort()
176 if err != nil {
177 return nil, 0, err
178 }
179 ln, err := vsock.Listen(port, nil)
180 if err == nil {
181 return ln, port, nil
182 }
183 lastErr = err
184
185 select {
186 case <-ctx.Done():
187 return nil, 0, ctx.Err()
188 default:
189 }
190 }
191 return nil, 0, fmt.Errorf("listen on random vsock upload port: %w", lastErr)
192}