Monorepo for Tangled
tangled.org
1package microvm
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "log/slog"
8 "net"
9 "net/http"
10 "net/url"
11 "strings"
12 "time"
13
14 "github.com/mdlayher/vsock"
15)
16
17type UploadCacheBackend interface {
18 http.Handler
19 Close() error
20}
21
22type UploadCacheProxy struct {
23 port uint32
24
25 ln *vsock.Listener
26 server *http.Server
27 backend UploadCacheBackend
28}
29
30func StartUploadCacheProxy(ctx context.Context, cid uint32, uploadURL string, readUpstreams []CacheUpstream, stagingDir string, logger *slog.Logger) (*UploadCacheProxy, error) {
31 if strings.TrimSpace(uploadURL) == "" {
32 return nil, nil
33 }
34
35 if logger == nil {
36 logger = slog.Default()
37 }
38 logger = logger.With("where", "upload_cache_proxy", "cid", cid, "uploadURL", uploadURL)
39
40 backend, err := newUploadCacheBackend(uploadURL, readUpstreams, stagingDir, logger)
41 if err != nil {
42 return nil, err
43 }
44
45 ln, port, err := listenRandomVsockUploadPort(ctx)
46 if err != nil {
47 return nil, fmt.Errorf("listen for cache upload proxy: %w", err)
48 }
49
50 proxy := &UploadCacheProxy{
51 port: port,
52 ln: ln,
53 backend: backend,
54 }
55 proxy.server = &http.Server{
56 Handler: backend,
57 Protocols: cacheProxyProtocols(),
58 ReadHeaderTimeout: 30 * time.Second,
59 }
60
61 filtered := &cidFilteredVsockListener{
62 Listener: ln,
63 cid: cid,
64 logger: logger,
65 }
66 go func() {
67 if err := proxy.server.Serve(filtered); err != nil && !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, net.ErrClosed) {
68 logger.Warn("upload cache proxy stopped", "port", port, "error", err)
69 }
70 }()
71
72 logger.Info("started upload cache proxy", "port", port, "target", uploadURL, "readUpstreams", len(readUpstreams))
73 return proxy, nil
74}
75
76func newUploadCacheBackend(uploadURL string, readUpstreams []CacheUpstream, stagingDir string, logger *slog.Logger) (UploadCacheBackend, error) {
77 if strings.TrimSpace(uploadURL) == "" {
78 return nil, nil
79 }
80
81 target, err := url.Parse(uploadURL)
82 if err != nil {
83 return nil, fmt.Errorf("parse upload URL %q: %w", uploadURL, err)
84 }
85
86 switch target.Scheme {
87 case "http", "https":
88 if target.Host == "" {
89 return nil, fmt.Errorf("upload URL %q is missing host", uploadURL)
90 }
91 return newHTTPUploadProxyBackend(target, readUpstreams, logger), nil
92
93 case "ssh", "ssh-ng":
94 return newNixStoreUploadBackend(target.String(), stagingDir, readUpstreams, logger, nil)
95
96 case "":
97 switch uploadURL {
98 case "daemon", "local":
99 return newNixStoreUploadBackend(uploadURL, stagingDir, readUpstreams, logger, nil)
100 default:
101 return nil, fmt.Errorf("unsupported upload URL %q", uploadURL)
102 }
103
104 default:
105 return nil, fmt.Errorf("upload URL %q uses unsupported scheme %q", uploadURL, target.Scheme)
106 }
107}
108
109func (p *UploadCacheProxy) Port() uint32 {
110 if p == nil {
111 return 0
112 }
113 return p.port
114}
115
116func (p *UploadCacheProxy) Close() error {
117 if p == nil {
118 return nil
119 }
120
121 var closeErr error
122 if p.server != nil {
123 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
124 closeErr = errors.Join(closeErr, p.server.Shutdown(ctx))
125 cancel()
126 p.server = nil
127 }
128 if p.ln != nil {
129 closeErr = errors.Join(closeErr, p.ln.Close())
130 p.ln = nil
131 }
132 if p.backend != nil {
133 closeErr = errors.Join(closeErr, p.backend.Close())
134 }
135 return closeErr
136}
137
138func listenRandomVsockUploadPort(ctx context.Context) (*vsock.Listener, uint32, error) {
139 var lastErr error
140 for range 32 {
141 port, err := randomVsockPort()
142 if err != nil {
143 return nil, 0, err
144 }
145 ln, err := vsock.Listen(port, nil)
146 if err == nil {
147 return ln, port, nil
148 }
149 lastErr = err
150
151 select {
152 case <-ctx.Done():
153 return nil, 0, ctx.Err()
154 default:
155 }
156 }
157 return nil, 0, fmt.Errorf("listen on random vsock upload port: %w", lastErr)
158}