Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package microvm 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "net/http" 11 "os" 12 "path/filepath" 13 "slices" 14 "sync" 15 "sync/atomic" 16 "time" 17 18 "gopkg.in/yaml.v3" 19 20 "tangled.org/core/api/tangled" 21 "tangled.org/core/log" 22 "tangled.org/core/spindle/agentproto" 23 agentv1 "tangled.org/core/spindle/agentproto/gen" 24 "tangled.org/core/spindle/config" 25 "tangled.org/core/spindle/db" 26 "tangled.org/core/spindle/engine" 27 "tangled.org/core/spindle/models" 28 "tangled.org/core/spindle/secrets" 29) 30 31const ( 32 guestWorkDir = "/workspace/repo" 33 guestBasePATH = "/run/current-system/sw/bin:/nix/var/nix/profiles/default/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" 34 guestDevShellEnvPath = "/run/spindle/devshell-env.sh" 35 activationStepAction = "activate-config" 36 agentAcceptTimeout = 2 * time.Minute 37 agentHandshakeTimeout = 30 * time.Second 38 cacheDrainTimeout = 5 * time.Minute 39 vmShutdownTimeout = 10 * time.Second 40 guestTimeoutGrace = 5 * time.Second 41) 42 43type cleanupFunc func(context.Context) error 44 45type Engine struct { 46 l *slog.Logger 47 cfg *config.Config 48 db *db.DB 49 agent *agentHub 50 scheduler *engine.ResourceScheduler[Resources] 51 cgroupParent *CgroupParent 52 53 cleanupMu sync.Mutex 54 cleanup map[string][]cleanupFunc 55} 56 57type Step struct { 58 name string 59 kind models.StepKind 60 command string 61 environment map[string]string 62 action string 63 config manifestConfig 64 configKey string 65} 66 67func (s Step) Name() string { return s.name } 68func (s Step) Command() string { return s.command } 69func (s Step) Kind() models.StepKind { return s.kind } 70 71func New(ctx context.Context, cfg *config.Config, d *db.DB) (*Engine, error) { 72 l := log.FromContext(ctx).With("component", "engine.microvm") 73 port := cfg.MicroVMPipelines.AgentPort 74 if port == 0 { 75 port = agentproto.DefaultPort 76 } 77 agent, err := newAgentHub(port, l) 78 if err != nil { 79 return nil, err 80 } 81 budget, max, agingThreshold := newVMBudgetConfig(cfg.MicroVMPipelines) 82 l.Info("initialized microVM workflow budget", "budget", budget.String(), "maxWorkflow", max.String(), "agingThreshold", agingThreshold) 83 84 var cgroupParent *CgroupParent 85 if cfg.MicroVMPipelines.EnableCgroups { 86 cgroupParent, err = initCgroupParent(cfg.MicroVMPipelines.CgroupParent, cfg.MicroVMPipelines.CgroupSupervisorMemoryMinMiB, l) 87 if err != nil { 88 return nil, err 89 } 90 } 91 92 return &Engine{ 93 l: l, 94 cfg: cfg, 95 db: d, 96 agent: agent, 97 scheduler: engine.NewResourceScheduler(budget, max, agingThreshold), 98 cgroupParent: cgroupParent, 99 cleanup: make(map[string][]cleanupFunc), 100 }, nil 101} 102 103func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) { 104 swf := &models.Workflow{} 105 var dwf manifestWorkflow 106 107 if err := yaml.Unmarshal([]byte(twf.Raw), &dwf); err != nil { 108 return nil, err 109 } 110 111 for _, dstep := range dwf.Steps { 112 swf.Steps = append(swf.Steps, Step{ 113 name: dstep.Name, 114 kind: models.StepKindUser, 115 command: dstep.Command, 116 environment: dstep.Environment, 117 }) 118 } 119 swf.Name = twf.Name 120 swf.Environment = dwf.Environment 121 122 if tpl.TriggerMetadata != nil { 123 if clone := models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev); clone.Command() != "" { 124 swf.Steps = append([]models.Step{clone}, swf.Steps...) 125 } 126 } 127 128 imageSpec, imageSpecPath, imageName, err := e.resolveImage(dwf.Image) 129 if err != nil { 130 return nil, err 131 } 132 configKey := "" 133 config := manifestConfig{ 134 Services: dwf.Services, 135 Virtualisation: dwf.Virtualisation, 136 Dependencies: dwf.Dependencies, 137 Registry: dwf.Registry, 138 } 139 if config.Enabled() { 140 if !imageSpec.SupportsConfigActivation() { 141 return nil, fmt.Errorf( 142 "microVM image %q is not a NixOS image: services, virtualisation, dependencies and registry workflow options require a NixOS image", 143 imageName, 144 ) 145 } 146 var err error 147 configKey, err = buildConfigKey(imageSpec, config) 148 if err != nil { 149 return nil, fmt.Errorf("build config key: %w", err) 150 } 151 activationStep := Step{ 152 name: "NixOS config activation", 153 kind: models.StepKindSystem, 154 command: "activate nixos config", 155 action: activationStepAction, 156 config: config, 157 configKey: configKey, 158 } 159 160 insertAt := 0 161 if len(swf.Steps) > 0 && swf.Steps[0].Kind() == models.StepKindSystem { 162 insertAt = 1 163 } 164 swf.Steps = append(swf.Steps, nil) 165 copy(swf.Steps[insertAt+1:], swf.Steps[insertAt:]) 166 swf.Steps[insertAt] = activationStep 167 } 168 169 cacheURLs, cacheKeys, err := workflowCaches(dwf.Caches) 170 if err != nil { 171 return nil, err 172 } 173 174 swf.Data = &workflowState{ 175 ImageSpec: imageSpec, 176 ImageSpecPath: imageSpecPath, 177 Config: config, 178 ConfigKey: configKey, 179 Image: imageName, 180 CacheReadURLs: cacheURLs, 181 CacheTrustedPublicKeys: cacheKeys, 182 NixOSToplevelCache: newNixOSToplevelCacheStore(e.db), 183 } 184 return swf, nil 185} 186 187func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error { 188 l := e.l.With("workflow", wid) 189 setupStep := Step{name: "microVM setup", kind: models.StepKindSystem} 190 191 wfLogger.ControlWriter(-1, setupStep, models.StepStatusStart).Write([]byte{0}) 192 defer wfLogger.ControlWriter(-1, setupStep, models.StepStatusEnd).Write([]byte{0}) 193 194 state, ok := wf.Data.(*workflowState) 195 if !ok || state == nil { 196 return fmt.Errorf("workflow state is not initialized") 197 } 198 199 cid, err := AllocateCID() 200 if err != nil { 201 return err 202 } 203 connCh, unregister, err := e.agent.expect(cid) 204 if err != nil { 205 return err 206 } 207 defer unregister() 208 209 workDirBase := e.cfg.MicroVMPipelines.OverlayDir 210 if workDirBase == "" { 211 workDirBase = os.TempDir() 212 } 213 workDir, err := os.MkdirTemp(workDirBase, "spindle-microvm-"+wid.String()+"-*") 214 if err != nil { 215 return fmt.Errorf("create workflow microVM directory: %w", err) 216 } 217 state.WorkDir = workDir 218 219 setupDone := false 220 defer func() { 221 if setupDone { 222 return 223 } 224 if detail := vmCrashLog(state.VM); detail != "" { 225 l.Error("microVM setup failed", "detail", detail) 226 } 227 if err := e.cleanupState(context.Background(), wid, state); err != nil { 228 l.Error("failed to cleanup failed setup", "error", err) 229 } 230 }() 231 232 upstreams, err := BuildCacheUpstreams(e.cfg.NixCache.ReadURLs, state.CacheReadURLs) 233 if err != nil { 234 return err 235 } 236 readCache, err := StartReadCacheProxy(ctx, cid, upstreams, l) 237 if err != nil { 238 return err 239 } 240 state.ReadCache = readCache 241 stagingDir := filepath.Join(workDir, "upload-cache") 242 uploadCache, err := StartUploadCacheProxy(ctx, cid, e.cfg.NixCache.UploadURL, upstreams, stagingDir, l) 243 if err != nil { 244 return err 245 } 246 state.UploadCache = uploadCache 247 dnsProxy, err := StartDNSProxy(ctx, cid, l) 248 if err != nil { 249 return err 250 } 251 state.DNSProxy = dnsProxy 252 253 port := e.cfg.MicroVMPipelines.AgentPort 254 if port == 0 { 255 port = agentproto.DefaultPort 256 } 257 state.ImageSpec.BootArgs = fmt.Sprintf("%s shuttle.vsock_port=%d", state.ImageSpec.BootArgs, port) 258 259 fmt.Fprintf(wfLogger.DataWriter(-1, "stdout"), "starting microVM image %s\n", state.Image) 260 l.Info("starting microVM workflow", "image", state.Image, "imageSpec", state.ImageSpecPath, "cid", cid, "workDir", workDir) 261 262 var vm VMHandle 263 vm, err = StartVM(ctx, VMConfig{ 264 Image: state.ImageSpec, 265 CID: cid, 266 EnableKVM: e.cfg.MicroVMPipelines.EnableKVM, 267 WorkDir: workDir, 268 Cgroup: e.cgroupLimits(wid, state.ImageSpec), 269 Dev: e.cfg.Server.Dev, 270 }, l) 271 if err != nil { 272 return err 273 } 274 state.VM = vm 275 276 acceptCtx, cancelAccept := context.WithTimeout(ctx, agentAcceptTimeout) 277 defer cancelAccept() 278 conn, err := waitAgentConn(acceptCtx, connCh) 279 if err != nil { 280 return err 281 } 282 283 agentSession := NewAgentSession(conn, l) 284 initCtx, cancelInit := context.WithTimeout(ctx, agentHandshakeTimeout) 285 defer cancelInit() 286 if err := agentSession.Init(initCtx, &agentv1.Init{ 287 JobId: wid.String(), 288 CacheTrustedPublicKeys: append(slices.Clone(e.cfg.NixCache.TrustedPublicKeys), state.CacheTrustedPublicKeys...), 289 CacheReadProxyPort: readCache.Port(), 290 CacheUploadProxyPort: uploadCache.Port(), 291 DnsProxyPort: dnsProxy.Port(), 292 }); err != nil { 293 _ = agentSession.Close() 294 return err 295 } 296 state.Agent = agentSession 297 wf.Data = state 298 299 e.registerCleanup(wid, func(ctx context.Context) error { 300 return e.cleanupState(ctx, wid, state) 301 }) 302 setupDone = true 303 304 fmt.Fprintf(wfLogger.DataWriter(-1, "stdout"), 305 "agent connected; serial log: %s\n", vm.Logs().Serial, 306 ) 307 return nil 308} 309 310func applyDepsSource(command string) string { 311 return fmt.Sprintf( 312 // check if it exists because not all images have this 313 `if [ -f %s ]; then . %s; export PATH="$PATH:%s"; fi; %s`, 314 guestDevShellEnvPath, guestDevShellEnvPath, guestBasePATH, command, 315 ) 316} 317 318func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error { 319 state, ok := w.Data.(*workflowState) 320 if !ok || state == nil || state.Agent == nil { 321 return fmt.Errorf("microVM workflow is not connected to agent") 322 } 323 324 stderr := wfLogger.DataWriter(idx, "stderr") 325 326 execCtx, vmExited, cancelWatch := watchVMExit(ctx, state.VM) 327 defer cancelWatch() 328 329 step := w.Steps[idx] 330 if s, ok := step.(Step); ok && s.action == activationStepAction { 331 err := e.activateConfig(execCtx, wid, state, s, wfLogger.DataWriter(idx, "stdout")) 332 return e.classifyStepError(ctx, wid, step, state, stderr, vmExited, err) 333 } 334 env := []string{ 335 "HOME=/workspace", 336 "LOGNAME=" + guestWorkflowUser, 337 "PATH=" + guestBasePATH, 338 "USER=" + guestWorkflowUser, 339 } 340 for k, v := range w.Environment { 341 env = append(env, k+"="+v) 342 } 343 for _, s := range secrets { 344 env = append(env, s.Key+"="+s.Value) 345 } 346 if s, ok := step.(Step); ok { 347 for k, v := range s.environment { 348 env = append(env, k+"="+v) 349 } 350 } 351 352 stdout := wfLogger.DataWriter(idx, "stdout") 353 exitCode, err := state.Agent.Exec(execCtx, AgentExec{ 354 ID: fmt.Sprintf("%s-%d", wid.String(), idx), 355 ExecStart: &agentv1.ExecStart{ 356 Argv: []string{state.ImageSpec.Shell, "-lc", applyDepsSource(step.Command())}, 357 Env: env, 358 Cwd: guestWorkDir, 359 User: guestWorkflowUser, 360 // timeout not set here, Exec will fill it 361 }, 362 Stdout: stdout, 363 Stderr: stderr, 364 }) 365 if err != nil { 366 return e.classifyStepError(ctx, wid, step, state, stderr, vmExited, err) 367 } 368 369 if exitCode != 0 { 370 e.l.Debug("step exited non-zero", "workflow", wid, "step", step.Name(), "exitCode", exitCode) 371 return engine.ErrWorkflowFailed 372 } 373 return nil 374} 375 376// reads the vm serial logs so we report the tail of that as an error instead of 377// just "guest agent connection lost: EOF" 378func (e *Engine) classifyStepError(ctx context.Context, wid models.WorkflowId, step models.Step, state *workflowState, stderr io.Writer, vmExited *atomic.Bool, err error) error { 379 if err == nil { 380 return nil 381 } 382 l := e.l.With("workflow", wid, "step", step.Name()) 383 384 if vmExited != nil && vmExited.Load() { 385 reason := "microVM exited unexpectedly" 386 oom := state.VM != nil && state.VM.OOMKilled() 387 if oom { 388 reason = "microVM killed by OOM (cgroup memory limit exceeded)" 389 } 390 if detail := vmCrashLog(state.VM); detail != "" { 391 fmt.Fprintf(stderr, "%s:\n%s\n", reason, detail) 392 l.Error(reason, "oom", oom, "detail", detail) 393 } else { 394 fmt.Fprintln(stderr, reason) 395 l.Error(reason, "oom", oom) 396 } 397 return errors.New(reason + "; see workflow logs for serial output") 398 } 399 400 if errors.Is(err, errGuestTimedOut) || ctx.Err() != nil { 401 l.Debug("step timed out", "guestReported", errors.Is(err, errGuestTimedOut)) 402 return engine.ErrTimedOut 403 } 404 405 // the agent connection dropped while qemu stayed up (eg. the guest kernel 406 // OOM-killed the agent or a guest panic), so surface serial logs, those 407 // will be more helpful. 408 if detail := vmCrashLog(state.VM); detail != "" { 409 fmt.Fprintf(stderr, "step failed (%v):\n%s\n", err, detail) 410 l.Error("step failed", "error", err, "detail", detail) 411 } else { 412 l.Error("step failed", "error", err) 413 } 414 return err 415} 416 417func (e *Engine) activateConfig(ctx context.Context, wid models.WorkflowId, state *workflowState, step Step, out io.Writer) error { 418 cfg := step.config 419 if !cfg.Enabled() { 420 return nil 421 } 422 423 configKey := step.configKey 424 if configKey == "" { 425 configKey = state.ConfigKey 426 } 427 428 userConfigJSON, err := json.Marshal(cfg) 429 if err != nil { 430 return fmt.Errorf("encode user config: %w", err) 431 } 432 433 var cachedToplevel string 434 if configKey != "" { 435 if record, ok, err := state.NixOSToplevelCache.Lookup(configKey); err != nil { 436 return err 437 } else if ok { 438 // todo(dawn): we should probably use gc roots to eliminate TOCTOU 439 // the spindle will have to manage the gc roots, and for remote we have to 440 // ssh in to the host and add / remove gc root. 441 // we need to have this check anyway since the only check http caches can 442 // use is this one, since we cant manage gc roots there... 443 if e.anyCacheHasPath(ctx, state, record.Toplevel) { 444 cachedToplevel = record.Toplevel 445 fmt.Fprintf(out, "realizing cached NixOS config %s\n", cachedToplevel) 446 } 447 } 448 } 449 if cachedToplevel == "" { 450 fmt.Fprintf(out, "building NixOS config from user config\n") 451 } 452 453 baseHash, err := BaseConfigHash(state.ImageSpec) 454 if err != nil { 455 return fmt.Errorf("calculate base config hash: %w", err) 456 } 457 458 result, err := state.Agent.ActivateConfig(ctx, fmt.Sprintf("%s-config", wid.String()), &agentv1.ActivateConfig{ 459 ConfigKey: configKey, 460 BaseConfigHash: baseHash, 461 UserConfig: string(userConfigJSON), 462 Toplevel: cachedToplevel, 463 }, out) 464 if err != nil { 465 return err 466 } 467 fmt.Fprintf(out, "activated NixOS config toplevel %s\n", result.Toplevel) 468 469 if cachedToplevel != "" || configKey == "" { 470 return nil 471 } 472 if e.cfg.NixCache.UploadURL == "" { 473 e.l.Warn("not committing config cache metadata: no upload URL configured", "workflow", wid, "configKey", configKey, "toplevel", result.Toplevel) 474 return nil 475 } 476 477 if err := e.drainNixCache(ctx, state); err != nil { 478 // a partial upload would leave the cache unable to realize this toplevel, 479 // so skip the metadata commit rather than poison it with an un-realizable 480 // key. the config still activated fine, so don't fail the workflow. 481 e.l.Warn("cache drain failed; skipping config cache metadata commit", "workflow", wid, "configKey", configKey, "toplevel", result.Toplevel, "error", err) 482 return nil 483 } 484 if err := state.NixOSToplevelCache.Commit(configKey, result.Toplevel); err != nil { 485 return err 486 } 487 fmt.Fprintf(out, "committed config cache metadata %s -> %s\n", configKey, result.Toplevel) 488 return nil 489} 490 491func (e *Engine) anyCacheHasPath(ctx context.Context, state *workflowState, storePath string) bool { 492 upstreams, err := BuildCacheUpstreams(e.cfg.NixCache.ReadURLs, state.CacheReadURLs) 493 if err != nil { 494 e.l.Warn("config cache check: build upstreams failed; treating as absent", "path", storePath, "error", err) 495 return false 496 } 497 if len(upstreams) == 0 { 498 return false 499 } 500 hash, _, err := parseStorePath(storePath) 501 if err != nil { 502 e.l.Warn("config cache check: invalid toplevel path; treating as absent", "path", storePath, "error", err) 503 return false 504 } 505 req, err := http.NewRequestWithContext(ctx, http.MethodHead, "http://upstream/"+hash+".narinfo", nil) 506 if err != nil { 507 e.l.Warn("config cache check: build request failed; treating as absent", "path", storePath, "error", err) 508 return false 509 } 510 resp, err := newNarinfoExistenceTransport(upstreams, e.l).RoundTrip(req) 511 if err != nil { 512 e.l.Warn("config cache check: narinfo probe failed; treating as absent", "path", storePath, "error", err) 513 return false 514 } 515 defer resp.Body.Close() 516 _, _ = io.Copy(io.Discard, resp.Body) 517 return resp.StatusCode == http.StatusOK 518} 519 520func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 521 fns := e.drainCleanups(wid) 522 523 var cleanupErr error 524 for i := len(fns) - 1; i >= 0; i-- { 525 if err := fns[i](ctx); err != nil { 526 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) 527 cleanupErr = errors.Join(cleanupErr, err) 528 } 529 } 530 return cleanupErr 531} 532 533func (e *Engine) FinalizeWorkflow(ctx context.Context, wid models.WorkflowId, w *models.Workflow, wfLogger models.WorkflowLogger) error { 534 return nil 535} 536 537func (e *Engine) WorkflowTimeout() time.Duration { 538 d, err := time.ParseDuration(e.cfg.MicroVMPipelines.WorkflowTimeout) 539 if err != nil { 540 d = 5 * time.Minute 541 } 542 return d + guestTimeoutGrace 543} 544 545func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 546 e.cleanupMu.Lock() 547 defer e.cleanupMu.Unlock() 548 key := wid.String() 549 e.cleanup[key] = append(e.cleanup[key], fn) 550} 551 552func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc { 553 e.cleanupMu.Lock() 554 defer e.cleanupMu.Unlock() 555 key := wid.String() 556 fns := e.cleanup[key] 557 delete(e.cleanup, key) 558 return fns 559} 560 561func (e *Engine) cgroupLimits(wid models.WorkflowId, spec ImageSpec) CgroupLimits { 562 cfg := e.cfg.MicroVMPipelines 563 return CgroupLimits{ 564 Enabled: cfg.EnableCgroups, 565 Parent: e.cgroupParent, 566 Name: "workflow-" + wid.String(), 567 MemoryMaxMiB: resourcesForImage(spec).MemoryMiB, 568 SwapMaxMiB: cfg.CgroupSwapMaxMiB, 569 PidsMax: cfg.CgroupPidsMax, 570 } 571}