Monorepo for Tangled tangled.org
4

Configure Feed

Select the types of activity you want to include in your feed.

1//go:build linux 2 3package main 4 5import ( 6 "context" 7 "database/sql" 8 "errors" 9 "fmt" 10 "log/slog" 11 "net" 12 "os" 13 "path/filepath" 14 "time" 15 16 "github.com/mdlayher/vsock" 17 "github.com/urfave/cli/v3" 18 agentv1 "tangled.org/core/spindle/agentproto/gen" 19 "tangled.org/core/spindle/db" 20 "tangled.org/core/spindle/engines/microvm" 21) 22 23func SpindleMicroVMRunCommand() *cli.Command { 24 return &cli.Command{ 25 Name: "spindle-microvm-run", 26 Usage: "launch the Spindle base microVM and run one command over vsock", 27 Flags: []cli.Flag{ 28 &cli.StringFlag{ 29 Name: "image-spec", 30 Sources: cli.EnvVars("SPINDLE_MICROVM_IMAGE_SPEC"), 31 Usage: "path to microVM image spec JSON", 32 }, 33 &cli.StringFlag{ 34 Name: "mkfs-ext4", 35 Usage: "override mkfs.ext4 binary", 36 }, 37 &cli.StringFlag{ 38 Name: "work-dir", 39 Usage: "directory for per-run disks and sockets", 40 }, 41 &cli.UintFlag{ 42 Name: "cid", 43 Usage: "guest vsock CID; defaults to a random high CID", 44 }, 45 &cli.UintFlag{ 46 Name: "port", 47 Value: 10240, 48 Usage: "host vsock port to listen on", 49 }, 50 &cli.UintFlag{ 51 Name: "memory-mib", 52 Usage: "override the guest memory size in MiB (defaults to the image spec)", 53 }, 54 &cli.BoolFlag{ 55 Name: "disable-kvm", 56 Usage: "run without -enable-kvm even if /dev/kvm is available", 57 }, 58 &cli.BoolFlag{ 59 Name: "dev", 60 Usage: "enable dev mode (allows host network access, disables SSL verification)", 61 }, 62 &cli.DurationFlag{ 63 Name: "qmp-timeout", 64 Value: 10 * time.Second, 65 Usage: "how long to wait for qmp to become ready", 66 }, 67 &cli.DurationFlag{ 68 Name: "accept-timeout", 69 Value: 15 * time.Second, 70 Usage: "how long to wait for the guest agent after qemu starts", 71 }, 72 &cli.DurationFlag{ 73 Name: "exec-timeout", 74 Value: 30 * time.Second, 75 Usage: "timeout for the guest command", 76 }, 77 &cli.DurationFlag{ 78 Name: "cache-upload-wait-timeout", 79 Aliases: []string{"cache-drain-timeout"}, 80 Value: 5 * time.Minute, 81 Usage: "how long to wait for guest cache uploads to finish after the command exits", 82 }, 83 &cli.DurationFlag{ 84 Name: "shutdown-timeout", 85 Value: 10 * time.Second, 86 Usage: "how long to wait for qemu to exit after guest powerdown", 87 }, 88 &cli.StringFlag{ 89 Name: "cwd", 90 Usage: "guest working directory", 91 }, 92 &cli.StringSliceFlag{ 93 Name: "cache-read-url", 94 Sources: cli.EnvVars("SPINDLE_NIX_CACHE_READ_URLS"), 95 Usage: "Nix binary cache URL to pass to the guest; repeatable", 96 }, 97 &cli.StringSliceFlag{ 98 Name: "cache-trusted-public-key", 99 Sources: cli.EnvVars("SPINDLE_NIX_CACHE_TRUSTED_PUBLIC_KEYS"), 100 Usage: "Nix binary cache public key to trust in the guest; repeatable", 101 }, 102 &cli.StringFlag{ 103 Name: "cache-upload-url", 104 Sources: cli.EnvVars("SPINDLE_NIX_CACHE_UPLOAD_URL"), 105 Usage: "optional cache upload URL for guest-built store paths", 106 }, 107 &cli.StringFlag{ 108 Name: "activate-config", 109 Usage: "JSON user config to activate before exec (e.g. '{\"services\":{\"openssh\":{\"enable\":true}}}')", 110 }, 111 &cli.StringFlag{ 112 Name: "db", 113 Usage: "path to sqlite database for config cache", 114 }, 115 }, 116 Action: runMicroVMRunDev, 117 } 118} 119 120func runMicroVMRunDev(ctx context.Context, cmd *cli.Command) error { 121 imageSpecPath := cmd.String("image-spec") 122 if imageSpecPath == "" { 123 return fmt.Errorf("--image-spec or SPINDLE_MICROVM_IMAGE_SPEC is required") 124 } 125 126 imageSpec, err := microvm.LoadImageSpec(imageSpecPath) 127 if err != nil { 128 return err 129 } 130 131 port := uint32(cmd.Uint("port")) 132 // tell the guest which host vsock port to dial back on. shuttle reads the 133 // cmdline params this is so we can run multiple of this process 134 // concurrently, because otherwise it listens on a specific vsock port, and 135 // we cant bind to the same port twice... 136 imageSpec.BootArgs = fmt.Sprintf("%s shuttle.vsock_port=%d", imageSpec.BootArgs, port) 137 if mib := cmd.Uint("memory-mib"); mib > 0 { 138 imageSpec.MemoryMiB = int(mib) 139 } 140 ln, err := vsock.Listen(port, nil) 141 if err != nil { 142 return fmt.Errorf("listen on vsock port %d: %w", port, err) 143 } 144 defer ln.Close() 145 146 vm, err := microvm.StartVM(ctx, microvm.VMConfig{ 147 Image: imageSpec, 148 BootTimeout: cmd.Duration("qmp-timeout"), 149 CID: uint32(cmd.Uint("cid")), 150 EnableKVM: !cmd.Bool("disable-kvm"), 151 MkfsExt4: cmd.String("mkfs-ext4"), 152 WorkDir: cmd.String("work-dir"), 153 Dev: cmd.Bool("dev"), 154 }, slog.Default()) 155 if err != nil { 156 return err 157 } 158 defer vm.Close() 159 160 logs := vm.Logs() 161 fmt.Fprintf(os.Stderr, "microvm started: cid=%d work-dir=%s serial-log=%s qemu-log=%s\n", 162 vm.CID(), 163 vm.WorkDir(), 164 logs.Serial, 165 logs.Extra["qemu"], 166 ) 167 168 logger := slog.Default() 169 170 if cmd.Duration("accept-timeout") > 0 { 171 if err := ln.SetDeadline(time.Now().Add(cmd.Duration("accept-timeout"))); err != nil { 172 return fmt.Errorf("set accept deadline: %w", err) 173 } 174 } 175 176 argv := cmd.Args().Slice() 177 if len(argv) == 0 { 178 argv = []string{"/run/current-system/sw/bin/echo", "hello-from-spindle"} 179 } 180 jobID := "spindle-microvm-run" 181 execID := "dev-1" 182 var pendingConfigKey string 183 var pendingConfigToplevel string 184 var configCacheDB *db.DB 185 186 fmt.Fprintf(os.Stderr, "listening for agent on %s\n", ln.Addr()) 187 conn, err := acceptExpectedVsockConn(ln, vm.CID(), logger) 188 if err != nil { 189 return fmt.Errorf("accept agent connection: %w", err) 190 } 191 defer conn.Close() 192 193 upstreams, err := microvm.BuildCacheUpstreams(cmd.StringSlice("cache-read-url"), nil) 194 if err != nil { 195 return fmt.Errorf("build cache upstreams: %w", err) 196 } 197 198 var readCache *microvm.ReadCacheProxy 199 if len(cmd.StringSlice("cache-read-url")) > 0 { 200 var err error 201 readCache, err = microvm.StartReadCacheProxy(ctx, vm.CID(), upstreams, logger) 202 if err != nil { 203 return fmt.Errorf("start read cache proxy: %w", err) 204 } 205 defer readCache.Close() 206 } 207 208 var uploadCache *microvm.UploadCacheProxy 209 if cmd.String("cache-upload-url") != "" { 210 var err error 211 uploadCache, err = microvm.StartUploadCacheProxy(ctx, vm.CID(), cmd.String("cache-upload-url"), upstreams, filepath.Join(vm.WorkDir(), "upload-cache"), logger) 212 if err != nil { 213 return fmt.Errorf("start upload cache proxy: %w", err) 214 } 215 defer uploadCache.Close() 216 } 217 dnsProxy, err := microvm.StartDNSProxy(ctx, vm.CID(), logger) 218 if err != nil { 219 return fmt.Errorf("start dns proxy: %w", err) 220 } 221 defer dnsProxy.Close() 222 223 session := microvm.NewAgentSession(conn, logger) 224 225 initCtx, cancelInit := context.WithTimeout(ctx, 30*time.Second) 226 defer cancelInit() 227 if err := session.Init(initCtx, &agentv1.Init{ 228 JobId: jobID, 229 CacheTrustedPublicKeys: cmd.StringSlice("cache-trusted-public-key"), 230 CacheReadProxyPort: readCache.Port(), 231 CacheUploadProxyPort: uploadCache.Port(), 232 DnsProxyPort: dnsProxy.Port(), 233 }); err != nil { 234 return fmt.Errorf("init agent: %w", err) 235 } 236 237 execCtx := ctx 238 if cmd.Duration("exec-timeout") > 0 { 239 var cancel context.CancelFunc 240 execCtx, cancel = context.WithTimeout(ctx, cmd.Duration("exec-timeout")) 241 defer cancel() 242 } 243 244 if cmd.String("activate-config") != "" { 245 actCtx := execCtx 246 baseHash, err := microvm.BaseConfigHash(imageSpec) 247 if err != nil { 248 return fmt.Errorf("calculate base config hash: %w", err) 249 } 250 251 var configKey string 252 var cachedToplevel string 253 if cmd.String("db") != "" { 254 configCacheDB, err = db.Make(ctx, cmd.String("db")) 255 if err != nil { 256 return fmt.Errorf("failed to open database: %w", err) 257 } 258 defer configCacheDB.Close() 259 260 configKey, err = microvm.BuildConfigKey(imageSpec, cmd.String("activate-config")) 261 if err != nil { 262 return fmt.Errorf("calculate config key: %w", err) 263 } 264 265 record, err := configCacheDB.GetNixOSToplevelCacheRecord(configKey) 266 if err != nil { 267 if !errors.Is(err, sql.ErrNoRows) { 268 return fmt.Errorf("lookup config cache: %w", err) 269 } 270 } else { 271 cachedToplevel = record.Toplevel 272 fmt.Printf("realizing cached NixOS config %s\n", cachedToplevel) 273 } 274 } 275 276 result, err := session.ActivateConfig(actCtx, "dev-activate", &agentv1.ActivateConfig{ 277 ConfigKey: configKey, 278 BaseConfigHash: baseHash, 279 UserConfig: cmd.String("activate-config"), 280 Toplevel: cachedToplevel, 281 }) 282 if err != nil { 283 return fmt.Errorf("activate config: %w", err) 284 } 285 fmt.Fprintf(os.Stderr, "activated config toplevel: %s\n", result.Toplevel) 286 287 if configCacheDB != nil && cachedToplevel == "" && result.Toplevel != "" && configKey != "" { 288 if uploadCache == nil { 289 fmt.Fprintln(os.Stderr, "skipping config cache metadata commit: no cache upload url configured") 290 } else { 291 pendingConfigKey = configKey 292 pendingConfigToplevel = result.Toplevel 293 } 294 } 295 } 296 297 exitCode, err := session.Exec(execCtx, microvm.AgentExec{ 298 ID: execID, 299 ExecStart: &agentv1.ExecStart{ 300 Argv: argv, 301 Cwd: cmd.String("cwd"), 302 }, 303 Stdout: os.Stdout, 304 Stderr: os.Stderr, 305 }) 306 if err != nil { 307 return err 308 } 309 310 if uploadCache != nil { 311 uploadWaitCtx := ctx 312 if cmd.Duration("cache-upload-wait-timeout") > 0 { 313 var cancel context.CancelFunc 314 uploadWaitCtx, cancel = context.WithTimeout(ctx, cmd.Duration("cache-upload-wait-timeout")) 315 defer cancel() 316 } 317 uploaded, err := session.Drain(uploadWaitCtx) 318 if err != nil { 319 return err 320 } 321 fmt.Printf("cache uploaded: %d\n", uploaded) 322 if configCacheDB != nil && pendingConfigKey != "" && pendingConfigToplevel != "" { 323 if err := configCacheDB.SaveNixOSToplevelCacheRecord(pendingConfigKey, pendingConfigToplevel); err != nil { 324 return fmt.Errorf("save config cache: %w", err) 325 } 326 } 327 } 328 329 // mirror the engine shutdown order: ask the agent to power off first, 330 // then fall back to qemu powerdown / kill 331 shutdownCtx, cancel := context.WithTimeout(context.Background(), cmd.Duration("shutdown-timeout")) 332 defer cancel() 333 poweredOff := false 334 if err := session.Poweroff(shutdownCtx); err != nil { 335 fmt.Fprintf(os.Stderr, "agent poweroff: %s\n", err) 336 } else if err := vm.WaitContext(shutdownCtx); err == nil { 337 poweredOff = true 338 } 339 if !poweredOff { 340 if err := vm.Shutdown(shutdownCtx); err != nil { 341 fmt.Fprintf(os.Stderr, "microvm shutdown fallback: %s\n", err) 342 } 343 } 344 345 if exitCode != 0 { 346 return fmt.Errorf("guest command exited with code %d", exitCode) 347 } 348 return nil 349} 350 351func acceptExpectedVsockConn(ln *vsock.Listener, allowedCID uint32, logger *slog.Logger) (net.Conn, error) { 352 for { 353 conn, err := ln.Accept() 354 if err != nil { 355 return nil, err 356 } 357 if allowedCID == 0 { 358 return conn, nil 359 } 360 addr, ok := conn.RemoteAddr().(*vsock.Addr) 361 if ok && addr.ContextID == allowedCID { 362 return conn, nil 363 } 364 remote := conn.RemoteAddr() 365 _ = conn.Close() 366 logger.Warn("dropped agent connection from unexpected cid", "remote", remote, "expected", allowedCID) 367 } 368}