Monorepo for Tangled
tangled.org
1//go:build linux
2
3package main
4
5import (
6 "context"
7 "database/sql"
8 "errors"
9 "fmt"
10 "log/slog"
11 "net"
12 "os"
13 "path/filepath"
14 "time"
15
16 "github.com/mdlayher/vsock"
17 "github.com/urfave/cli/v3"
18 agentv1 "tangled.org/core/spindle/agentproto/gen"
19 "tangled.org/core/spindle/db"
20 "tangled.org/core/spindle/engines/microvm"
21)
22
23func SpindleMicroVMRunCommand() *cli.Command {
24 return &cli.Command{
25 Name: "spindle-microvm-run",
26 Usage: "launch the Spindle base microVM and run one command over vsock",
27 Flags: []cli.Flag{
28 &cli.StringFlag{
29 Name: "image-spec",
30 Sources: cli.EnvVars("SPINDLE_MICROVM_IMAGE_SPEC"),
31 Usage: "path to microVM image spec JSON",
32 },
33 &cli.StringFlag{
34 Name: "mkfs-ext4",
35 Usage: "override mkfs.ext4 binary",
36 },
37 &cli.StringFlag{
38 Name: "work-dir",
39 Usage: "directory for per-run disks and sockets",
40 },
41 &cli.UintFlag{
42 Name: "cid",
43 Usage: "guest vsock CID; defaults to a random high CID",
44 },
45 &cli.UintFlag{
46 Name: "port",
47 Value: 10240,
48 Usage: "host vsock port to listen on",
49 },
50 &cli.UintFlag{
51 Name: "memory-mib",
52 Usage: "override the guest memory size in MiB (defaults to the image spec)",
53 },
54 &cli.BoolFlag{
55 Name: "disable-kvm",
56 Usage: "run without -enable-kvm even if /dev/kvm is available",
57 },
58 &cli.BoolFlag{
59 Name: "dev",
60 Usage: "enable dev mode (allows host network access, disables SSL verification)",
61 },
62 &cli.DurationFlag{
63 Name: "qmp-timeout",
64 Value: 10 * time.Second,
65 Usage: "how long to wait for qmp to become ready",
66 },
67 &cli.DurationFlag{
68 Name: "accept-timeout",
69 Value: 15 * time.Second,
70 Usage: "how long to wait for the guest agent after qemu starts",
71 },
72 &cli.DurationFlag{
73 Name: "exec-timeout",
74 Value: 30 * time.Second,
75 Usage: "timeout for the guest command",
76 },
77 &cli.DurationFlag{
78 Name: "cache-upload-wait-timeout",
79 Aliases: []string{"cache-drain-timeout"},
80 Value: 5 * time.Minute,
81 Usage: "how long to wait for guest cache uploads to finish after the command exits",
82 },
83 &cli.DurationFlag{
84 Name: "shutdown-timeout",
85 Value: 10 * time.Second,
86 Usage: "how long to wait for qemu to exit after guest powerdown",
87 },
88 &cli.StringFlag{
89 Name: "cwd",
90 Usage: "guest working directory",
91 },
92 &cli.StringSliceFlag{
93 Name: "cache-read-url",
94 Sources: cli.EnvVars("SPINDLE_NIX_CACHE_READ_URLS"),
95 Usage: "Nix binary cache URL to pass to the guest; repeatable",
96 },
97 &cli.StringSliceFlag{
98 Name: "cache-trusted-public-key",
99 Sources: cli.EnvVars("SPINDLE_NIX_CACHE_TRUSTED_PUBLIC_KEYS"),
100 Usage: "Nix binary cache public key to trust in the guest; repeatable",
101 },
102 &cli.StringFlag{
103 Name: "cache-upload-url",
104 Sources: cli.EnvVars("SPINDLE_NIX_CACHE_UPLOAD_URL"),
105 Usage: "optional cache upload URL for guest-built store paths",
106 },
107 &cli.StringFlag{
108 Name: "activate-config",
109 Usage: "JSON user config to activate before exec (e.g. '{\"services\":{\"openssh\":{\"enable\":true}}}')",
110 },
111 &cli.StringFlag{
112 Name: "db",
113 Usage: "path to sqlite database for config cache",
114 },
115 },
116 Action: runMicroVMRunDev,
117 }
118}
119
120func runMicroVMRunDev(ctx context.Context, cmd *cli.Command) error {
121 imageSpecPath := cmd.String("image-spec")
122 if imageSpecPath == "" {
123 return fmt.Errorf("--image-spec or SPINDLE_MICROVM_IMAGE_SPEC is required")
124 }
125
126 imageSpec, err := microvm.LoadImageSpec(imageSpecPath)
127 if err != nil {
128 return err
129 }
130
131 port := uint32(cmd.Uint("port"))
132 // tell the guest which host vsock port to dial back on. shuttle reads the
133 // cmdline params this is so we can run multiple of this process
134 // concurrently, because otherwise it listens on a specific vsock port, and
135 // we cant bind to the same port twice...
136 imageSpec.BootArgs = fmt.Sprintf("%s shuttle.vsock_port=%d", imageSpec.BootArgs, port)
137 if mib := cmd.Uint("memory-mib"); mib > 0 {
138 imageSpec.MemoryMiB = int(mib)
139 }
140 ln, err := vsock.Listen(port, nil)
141 if err != nil {
142 return fmt.Errorf("listen on vsock port %d: %w", port, err)
143 }
144 defer ln.Close()
145
146 vm, err := microvm.StartVM(ctx, microvm.VMConfig{
147 Image: imageSpec,
148 BootTimeout: cmd.Duration("qmp-timeout"),
149 CID: uint32(cmd.Uint("cid")),
150 EnableKVM: !cmd.Bool("disable-kvm"),
151 MkfsExt4: cmd.String("mkfs-ext4"),
152 WorkDir: cmd.String("work-dir"),
153 Dev: cmd.Bool("dev"),
154 }, slog.Default())
155 if err != nil {
156 return err
157 }
158 defer vm.Close()
159
160 logs := vm.Logs()
161 fmt.Fprintf(os.Stderr, "microvm started: cid=%d work-dir=%s serial-log=%s qemu-log=%s\n",
162 vm.CID(),
163 vm.WorkDir(),
164 logs.Serial,
165 logs.Extra["qemu"],
166 )
167
168 logger := slog.Default()
169
170 if cmd.Duration("accept-timeout") > 0 {
171 if err := ln.SetDeadline(time.Now().Add(cmd.Duration("accept-timeout"))); err != nil {
172 return fmt.Errorf("set accept deadline: %w", err)
173 }
174 }
175
176 argv := cmd.Args().Slice()
177 if len(argv) == 0 {
178 argv = []string{"/run/current-system/sw/bin/echo", "hello-from-spindle"}
179 }
180 jobID := "spindle-microvm-run"
181 execID := "dev-1"
182 var pendingConfigKey string
183 var pendingConfigToplevel string
184 var configCacheDB *db.DB
185
186 fmt.Fprintf(os.Stderr, "listening for agent on %s\n", ln.Addr())
187 conn, err := acceptExpectedVsockConn(ln, vm.CID(), logger)
188 if err != nil {
189 return fmt.Errorf("accept agent connection: %w", err)
190 }
191 defer conn.Close()
192
193 upstreams, err := microvm.BuildCacheUpstreams(cmd.StringSlice("cache-read-url"), nil)
194 if err != nil {
195 return fmt.Errorf("build cache upstreams: %w", err)
196 }
197
198 var readCache *microvm.ReadCacheProxy
199 if len(cmd.StringSlice("cache-read-url")) > 0 {
200 var err error
201 readCache, err = microvm.StartReadCacheProxy(ctx, vm.CID(), upstreams, logger)
202 if err != nil {
203 return fmt.Errorf("start read cache proxy: %w", err)
204 }
205 defer readCache.Close()
206 }
207
208 var uploadCache *microvm.UploadCacheProxy
209 if cmd.String("cache-upload-url") != "" {
210 var err error
211 uploadCache, err = microvm.StartUploadCacheProxy(ctx, vm.CID(), cmd.String("cache-upload-url"), upstreams, filepath.Join(vm.WorkDir(), "upload-cache"), logger)
212 if err != nil {
213 return fmt.Errorf("start upload cache proxy: %w", err)
214 }
215 defer uploadCache.Close()
216 }
217 dnsProxy, err := microvm.StartDNSProxy(ctx, vm.CID(), logger)
218 if err != nil {
219 return fmt.Errorf("start dns proxy: %w", err)
220 }
221 defer dnsProxy.Close()
222
223 session := microvm.NewAgentSession(conn, logger)
224
225 initCtx, cancelInit := context.WithTimeout(ctx, 30*time.Second)
226 defer cancelInit()
227 if err := session.Init(initCtx, &agentv1.Init{
228 JobId: jobID,
229 CacheTrustedPublicKeys: cmd.StringSlice("cache-trusted-public-key"),
230 CacheReadProxyPort: readCache.Port(),
231 CacheUploadProxyPort: uploadCache.Port(),
232 DnsProxyPort: dnsProxy.Port(),
233 }); err != nil {
234 return fmt.Errorf("init agent: %w", err)
235 }
236
237 execCtx := ctx
238 if cmd.Duration("exec-timeout") > 0 {
239 var cancel context.CancelFunc
240 execCtx, cancel = context.WithTimeout(ctx, cmd.Duration("exec-timeout"))
241 defer cancel()
242 }
243
244 if cmd.String("activate-config") != "" {
245 actCtx := execCtx
246 baseHash, err := microvm.BaseConfigHash(imageSpec)
247 if err != nil {
248 return fmt.Errorf("calculate base config hash: %w", err)
249 }
250
251 var configKey string
252 var cachedToplevel string
253 if cmd.String("db") != "" {
254 configCacheDB, err = db.Make(ctx, cmd.String("db"))
255 if err != nil {
256 return fmt.Errorf("failed to open database: %w", err)
257 }
258 defer configCacheDB.Close()
259
260 configKey, err = microvm.BuildConfigKey(imageSpec, cmd.String("activate-config"))
261 if err != nil {
262 return fmt.Errorf("calculate config key: %w", err)
263 }
264
265 record, err := configCacheDB.GetNixOSToplevelCacheRecord(configKey)
266 if err != nil {
267 if !errors.Is(err, sql.ErrNoRows) {
268 return fmt.Errorf("lookup config cache: %w", err)
269 }
270 } else {
271 cachedToplevel = record.Toplevel
272 fmt.Printf("realizing cached NixOS config %s\n", cachedToplevel)
273 }
274 }
275
276 result, err := session.ActivateConfig(actCtx, "dev-activate", &agentv1.ActivateConfig{
277 ConfigKey: configKey,
278 BaseConfigHash: baseHash,
279 UserConfig: cmd.String("activate-config"),
280 Toplevel: cachedToplevel,
281 })
282 if err != nil {
283 return fmt.Errorf("activate config: %w", err)
284 }
285 fmt.Fprintf(os.Stderr, "activated config toplevel: %s\n", result.Toplevel)
286
287 if configCacheDB != nil && cachedToplevel == "" && result.Toplevel != "" && configKey != "" {
288 if uploadCache == nil {
289 fmt.Fprintln(os.Stderr, "skipping config cache metadata commit: no cache upload url configured")
290 } else {
291 pendingConfigKey = configKey
292 pendingConfigToplevel = result.Toplevel
293 }
294 }
295 }
296
297 exitCode, err := session.Exec(execCtx, microvm.AgentExec{
298 ID: execID,
299 ExecStart: &agentv1.ExecStart{
300 Argv: argv,
301 Cwd: cmd.String("cwd"),
302 },
303 Stdout: os.Stdout,
304 Stderr: os.Stderr,
305 })
306 if err != nil {
307 return err
308 }
309
310 if uploadCache != nil {
311 uploadWaitCtx := ctx
312 if cmd.Duration("cache-upload-wait-timeout") > 0 {
313 var cancel context.CancelFunc
314 uploadWaitCtx, cancel = context.WithTimeout(ctx, cmd.Duration("cache-upload-wait-timeout"))
315 defer cancel()
316 }
317 uploaded, err := session.Drain(uploadWaitCtx)
318 if err != nil {
319 return err
320 }
321 fmt.Printf("cache uploaded: %d\n", uploaded)
322 if configCacheDB != nil && pendingConfigKey != "" && pendingConfigToplevel != "" {
323 if err := configCacheDB.SaveNixOSToplevelCacheRecord(pendingConfigKey, pendingConfigToplevel); err != nil {
324 return fmt.Errorf("save config cache: %w", err)
325 }
326 }
327 }
328
329 // mirror the engine shutdown order: ask the agent to power off first,
330 // then fall back to qemu powerdown / kill
331 shutdownCtx, cancel := context.WithTimeout(context.Background(), cmd.Duration("shutdown-timeout"))
332 defer cancel()
333 poweredOff := false
334 if err := session.Poweroff(shutdownCtx); err != nil {
335 fmt.Fprintf(os.Stderr, "agent poweroff: %s\n", err)
336 } else if err := vm.WaitContext(shutdownCtx); err == nil {
337 poweredOff = true
338 }
339 if !poweredOff {
340 if err := vm.Shutdown(shutdownCtx); err != nil {
341 fmt.Fprintf(os.Stderr, "microvm shutdown fallback: %s\n", err)
342 }
343 }
344
345 if exitCode != 0 {
346 return fmt.Errorf("guest command exited with code %d", exitCode)
347 }
348 return nil
349}
350
351func acceptExpectedVsockConn(ln *vsock.Listener, allowedCID uint32, logger *slog.Logger) (net.Conn, error) {
352 for {
353 conn, err := ln.Accept()
354 if err != nil {
355 return nil, err
356 }
357 if allowedCID == 0 {
358 return conn, nil
359 }
360 addr, ok := conn.RemoteAddr().(*vsock.Addr)
361 if ok && addr.ContextID == allowedCID {
362 return conn, nil
363 }
364 remote := conn.RemoteAddr()
365 _ = conn.Close()
366 logger.Warn("dropped agent connection from unexpected cid", "remote", remote, "expected", allowedCID)
367 }
368}