Monorepo for Tangled (tangled.org)
5

Configure Feed

Select the types of activity you want to include in your feed.

1package microvm 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "os" 11 "path/filepath" 12 "slices" 13 "sync" 14 "sync/atomic" 15 "time" 16 17 "gopkg.in/yaml.v3" 18 19 "tangled.org/core/api/tangled" 20 "tangled.org/core/log" 21 "tangled.org/core/spindle/agentproto" 22 agentv1 "tangled.org/core/spindle/agentproto/gen" 23 "tangled.org/core/spindle/config" 24 "tangled.org/core/spindle/db" 25 "tangled.org/core/spindle/engine" 26 "tangled.org/core/spindle/models" 27 "tangled.org/core/spindle/secrets" 28) 29 30const ( 31 guestWorkDir = "/workspace/repo" 32 guestBasePATH = "/run/current-system/sw/bin:/nix/var/nix/profiles/default/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" 33 guestDevShellEnvPath = "/run/spindle/devshell-env.sh" 34 activationStepAction = "activate-config" 35 agentAcceptTimeout = 2 * time.Minute 36 agentHandshakeTimeout = 30 * time.Second 37 cacheDrainTimeout = 5 * time.Minute 38 vmShutdownTimeout = 10 * time.Second 39 guestTimeoutGrace = 5 * time.Second 40) 41 42type cleanupFunc func(context.Context) error 43 44type Engine struct { 45 l *slog.Logger 46 cfg *config.Config 47 db *db.DB 48 agent *agentHub 49 scheduler *engine.ResourceScheduler[Resources] 50 cgroupParent *CgroupParent 51 52 cleanupMu sync.Mutex 53 cleanup map[string][]cleanupFunc 54} 55 56type Step struct { 57 name string 58 kind models.StepKind 59 command string 60 environment map[string]string 61 action string 62 config manifestConfig 63 configKey string 64} 65 66func (s Step) Name() string { return s.name } 67func (s Step) Command() string { return s.command } 68func (s Step) Kind() models.StepKind { return s.kind } 69 70func New(ctx context.Context, cfg *config.Config, d *db.DB) (*Engine, error) { 71 l := log.FromContext(ctx).With("component", "engine.microvm") 72 port := cfg.MicroVMPipelines.AgentPort 73 if port == 0 { 74 port = agentproto.DefaultPort 75 } 76 agent, err := newAgentHub(port, l) 77 if err != nil { 78 return nil, 
err 79 } 80 budget, max, agingThreshold := newVMBudgetConfig(cfg.MicroVMPipelines) 81 l.Info("initialized microVM workflow budget", "budget", budget.String(), "maxWorkflow", max.String(), "agingThreshold", agingThreshold) 82 83 var cgroupParent *CgroupParent 84 if cfg.MicroVMPipelines.EnableCgroups { 85 cgroupParent, err = initCgroupParent(cfg.MicroVMPipelines.CgroupParent, cfg.MicroVMPipelines.CgroupSupervisorMemoryMinMiB, l) 86 if err != nil { 87 return nil, err 88 } 89 } 90 91 return &Engine{ 92 l: l, 93 cfg: cfg, 94 db: d, 95 agent: agent, 96 scheduler: engine.NewResourceScheduler(budget, max, agingThreshold), 97 cgroupParent: cgroupParent, 98 cleanup: make(map[string][]cleanupFunc), 99 }, nil 100} 101 102func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) { 103 swf := &models.Workflow{} 104 var dwf manifestWorkflow 105 106 if err := yaml.Unmarshal([]byte(twf.Raw), &dwf); err != nil { 107 return nil, err 108 } 109 110 for _, dstep := range dwf.Steps { 111 swf.Steps = append(swf.Steps, Step{ 112 name: dstep.Name, 113 kind: models.StepKindUser, 114 command: dstep.Command, 115 environment: dstep.Environment, 116 }) 117 } 118 swf.Name = twf.Name 119 swf.Environment = dwf.Environment 120 121 if tpl.TriggerMetadata != nil { 122 if clone := models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev); clone.Command() != "" { 123 swf.Steps = append([]models.Step{clone}, swf.Steps...) 
124 } 125 } 126 127 imageSpec, imageSpecPath, imageName, err := e.resolveImage(dwf.Image) 128 if err != nil { 129 return nil, err 130 } 131 configKey := "" 132 config := manifestConfig{ 133 Services: dwf.Services, 134 Virtualisation: dwf.Virtualisation, 135 Dependencies: dwf.Dependencies, 136 Registry: dwf.Registry, 137 } 138 if config.Enabled() { 139 if !imageSpec.SupportsConfigActivation() { 140 return nil, fmt.Errorf( 141 "microVM image %q is not a NixOS image: services, virtualisation, dependencies and registry workflow options require a NixOS image", 142 imageName, 143 ) 144 } 145 var err error 146 configKey, err = buildConfigKey(imageSpec, config) 147 if err != nil { 148 return nil, fmt.Errorf("build config key: %w", err) 149 } 150 activationStep := Step{ 151 name: "NixOS config activation", 152 kind: models.StepKindSystem, 153 command: "activate nixos config", 154 action: activationStepAction, 155 config: config, 156 configKey: configKey, 157 } 158 159 insertAt := 0 160 if len(swf.Steps) > 0 && swf.Steps[0].Kind() == models.StepKindSystem { 161 insertAt = 1 162 } 163 swf.Steps = append(swf.Steps, nil) 164 copy(swf.Steps[insertAt+1:], swf.Steps[insertAt:]) 165 swf.Steps[insertAt] = activationStep 166 } 167 168 cacheURLs, cacheKeys, err := workflowCaches(dwf.Caches) 169 if err != nil { 170 return nil, err 171 } 172 173 swf.Data = &workflowState{ 174 ImageSpec: imageSpec, 175 ImageSpecPath: imageSpecPath, 176 Config: config, 177 ConfigKey: configKey, 178 Image: imageName, 179 CacheReadURLs: cacheURLs, 180 CacheTrustedPublicKeys: cacheKeys, 181 NixOSToplevelCache: newNixOSToplevelCacheStore(e.db), 182 } 183 return swf, nil 184} 185 186func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error { 187 l := e.l.With("workflow", wid) 188 setupStep := Step{name: "microVM setup", kind: models.StepKindSystem} 189 190 wfLogger.ControlWriter(-1, setupStep, models.StepStatusStart).Write([]byte{0}) 
191 defer wfLogger.ControlWriter(-1, setupStep, models.StepStatusEnd).Write([]byte{0}) 192 193 state, ok := wf.Data.(*workflowState) 194 if !ok || state == nil { 195 return fmt.Errorf("workflow state is not initialized") 196 } 197 198 cid, err := AllocateCID() 199 if err != nil { 200 return err 201 } 202 connCh, unregister, err := e.agent.expect(cid) 203 if err != nil { 204 return err 205 } 206 defer unregister() 207 208 workDirBase := e.cfg.MicroVMPipelines.OverlayDir 209 if workDirBase == "" { 210 workDirBase = os.TempDir() 211 } 212 workDir, err := os.MkdirTemp(workDirBase, "spindle-microvm-"+wid.String()+"-*") 213 if err != nil { 214 return fmt.Errorf("create workflow microVM directory: %w", err) 215 } 216 state.WorkDir = workDir 217 218 setupDone := false 219 defer func() { 220 if setupDone { 221 return 222 } 223 if detail := vmCrashLog(state.VM); detail != "" { 224 l.Error("microVM setup failed", "detail", detail) 225 } 226 if err := e.cleanupState(context.Background(), wid, state); err != nil { 227 l.Error("failed to cleanup failed setup", "error", err) 228 } 229 }() 230 231 upstreams, err := BuildCacheUpstreams(e.cfg.NixCache.ReadURLs, state.CacheReadURLs) 232 if err != nil { 233 return err 234 } 235 readCache, err := StartReadCacheProxy(ctx, cid, upstreams, l) 236 if err != nil { 237 return err 238 } 239 state.ReadCache = readCache 240 stagingDir := filepath.Join(workDir, "upload-cache") 241 uploadCache, err := StartUploadCacheProxy(ctx, cid, e.cfg.NixCache.UploadURL, upstreams, stagingDir, l) 242 if err != nil { 243 return err 244 } 245 state.UploadCache = uploadCache 246 dnsProxy, err := StartDNSProxy(ctx, cid, l) 247 if err != nil { 248 return err 249 } 250 state.DNSProxy = dnsProxy 251 252 port := e.cfg.MicroVMPipelines.AgentPort 253 if port == 0 { 254 port = agentproto.DefaultPort 255 } 256 state.ImageSpec.BootArgs = fmt.Sprintf("%s shuttle.vsock_port=%d", state.ImageSpec.BootArgs, port) 257 258 fmt.Fprintf(wfLogger.DataWriter(-1, "stdout"), "starting 
microVM image %s\n", state.Image) 259 l.Info("starting microVM workflow", "image", state.Image, "imageSpec", state.ImageSpecPath, "cid", cid, "workDir", workDir) 260 261 var vm VMHandle 262 vm, err = StartVM(ctx, VMConfig{ 263 Image: state.ImageSpec, 264 CID: cid, 265 EnableKVM: e.cfg.MicroVMPipelines.EnableKVM, 266 WorkDir: workDir, 267 Cgroup: e.cgroupLimits(wid, state.ImageSpec), 268 Dev: e.cfg.Server.Dev, 269 }, l) 270 if err != nil { 271 return err 272 } 273 state.VM = vm 274 275 acceptCtx, cancelAccept := context.WithTimeout(ctx, agentAcceptTimeout) 276 defer cancelAccept() 277 conn, err := waitAgentConn(acceptCtx, connCh) 278 if err != nil { 279 return err 280 } 281 282 agentSession := NewAgentSession(conn, l) 283 initCtx, cancelInit := context.WithTimeout(ctx, agentHandshakeTimeout) 284 defer cancelInit() 285 if err := agentSession.Init(initCtx, &agentv1.Init{ 286 JobId: wid.String(), 287 CacheTrustedPublicKeys: append(slices.Clone(e.cfg.NixCache.TrustedPublicKeys), state.CacheTrustedPublicKeys...), 288 CacheReadProxyPort: readCache.Port(), 289 CacheUploadProxyPort: uploadCache.Port(), 290 DnsProxyPort: dnsProxy.Port(), 291 }); err != nil { 292 _ = agentSession.Close() 293 return err 294 } 295 state.Agent = agentSession 296 wf.Data = state 297 298 e.registerCleanup(wid, func(ctx context.Context) error { 299 return e.cleanupState(ctx, wid, state) 300 }) 301 setupDone = true 302 303 fmt.Fprintf(wfLogger.DataWriter(-1, "stdout"), 304 "agent connected; serial log: %s\n", vm.Logs().Serial, 305 ) 306 return nil 307} 308 309func applyDepsSource(command string) string { 310 return fmt.Sprintf( 311 // check if it exists because not all images have this 312 `if [ -f %s ]; then . 
%s; export PATH="$PATH:%s"; fi; %s`, 313 guestDevShellEnvPath, guestDevShellEnvPath, guestBasePATH, command, 314 ) 315} 316 317func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error { 318 state, ok := w.Data.(*workflowState) 319 if !ok || state == nil || state.Agent == nil { 320 return fmt.Errorf("microVM workflow is not connected to agent") 321 } 322 323 stderr := wfLogger.DataWriter(idx, "stderr") 324 325 execCtx, vmExited, cancelWatch := watchVMExit(ctx, state.VM) 326 defer cancelWatch() 327 328 step := w.Steps[idx] 329 if s, ok := step.(Step); ok && s.action == activationStepAction { 330 err := e.activateConfig(execCtx, wid, state, s, wfLogger.DataWriter(idx, "stdout")) 331 return e.classifyStepError(ctx, wid, step, state, stderr, vmExited, err) 332 } 333 env := []string{ 334 "HOME=/workspace", 335 "LOGNAME=" + guestWorkflowUser, 336 "PATH=" + guestBasePATH, 337 "USER=" + guestWorkflowUser, 338 } 339 for k, v := range w.Environment { 340 env = append(env, k+"="+v) 341 } 342 for _, s := range secrets { 343 env = append(env, s.Key+"="+s.Value) 344 } 345 if s, ok := step.(Step); ok { 346 for k, v := range s.environment { 347 env = append(env, k+"="+v) 348 } 349 } 350 351 stdout := wfLogger.DataWriter(idx, "stdout") 352 exitCode, err := state.Agent.Exec(execCtx, AgentExec{ 353 ID: fmt.Sprintf("%s-%d", wid.String(), idx), 354 ExecStart: &agentv1.ExecStart{ 355 Argv: []string{state.ImageSpec.Shell, "-lc", applyDepsSource(step.Command())}, 356 Env: env, 357 Cwd: guestWorkDir, 358 User: guestWorkflowUser, 359 // timeout not set here, Exec will fill it 360 }, 361 Stdout: stdout, 362 Stderr: stderr, 363 }) 364 if err != nil { 365 return e.classifyStepError(ctx, wid, step, state, stderr, vmExited, err) 366 } 367 368 if exitCode != 0 { 369 e.l.Debug("step exited non-zero", "workflow", wid, "step", step.Name(), "exitCode", exitCode) 370 return 
engine.ErrWorkflowFailed 371 } 372 return nil 373} 374 375// reads the vm serial logs so we report the tail of that as an error instead of 376// just "guest agent connection lost: EOF" 377func (e *Engine) classifyStepError(ctx context.Context, wid models.WorkflowId, step models.Step, state *workflowState, stderr io.Writer, vmExited *atomic.Bool, err error) error { 378 if err == nil { 379 return nil 380 } 381 l := e.l.With("workflow", wid, "step", step.Name()) 382 383 if vmExited != nil && vmExited.Load() { 384 reason := "microVM exited unexpectedly" 385 oom := state.VM != nil && state.VM.OOMKilled() 386 if oom { 387 reason = "microVM killed by OOM (cgroup memory limit exceeded)" 388 } 389 if detail := vmCrashLog(state.VM); detail != "" { 390 fmt.Fprintf(stderr, "%s:\n%s\n", reason, detail) 391 l.Error(reason, "oom", oom, "detail", detail) 392 } else { 393 fmt.Fprintln(stderr, reason) 394 l.Error(reason, "oom", oom) 395 } 396 return errors.New(reason + "; see workflow logs for serial output") 397 } 398 399 if errors.Is(err, errGuestTimedOut) || ctx.Err() != nil { 400 l.Debug("step timed out", "guestReported", errors.Is(err, errGuestTimedOut)) 401 return engine.ErrTimedOut 402 } 403 404 // the agent connection dropped while qemu stayed up (eg. the guest kernel 405 // OOM-killed the agent or a guest panic), so surface serial logs, those 406 // will be more helpful. 
407 if detail := vmCrashLog(state.VM); detail != "" { 408 fmt.Fprintf(stderr, "step failed (%v):\n%s\n", err, detail) 409 l.Error("step failed", "error", err, "detail", detail) 410 } else { 411 l.Error("step failed", "error", err) 412 } 413 return err 414} 415 416func (e *Engine) activateConfig(ctx context.Context, wid models.WorkflowId, state *workflowState, step Step, out io.Writer) error { 417 cfg := step.config 418 if !cfg.Enabled() { 419 return nil 420 } 421 422 configKey := step.configKey 423 if configKey == "" { 424 configKey = state.ConfigKey 425 } 426 427 userConfigJSON, err := json.Marshal(cfg) 428 if err != nil { 429 return fmt.Errorf("encode user config: %w", err) 430 } 431 432 var cachedToplevel string 433 if configKey != "" { 434 if record, ok, err := state.NixOSToplevelCache.Lookup(configKey); err != nil { 435 return err 436 } else if ok { 437 cachedToplevel = record.Toplevel 438 fmt.Fprintf(out, "realizing cached NixOS config %s\n", cachedToplevel) 439 } 440 } 441 if cachedToplevel == "" { 442 fmt.Fprintf(out, "building NixOS config from user config\n") 443 } 444 445 baseHash, err := BaseConfigHash(state.ImageSpec) 446 if err != nil { 447 return fmt.Errorf("calculate base config hash: %w", err) 448 } 449 450 result, err := state.Agent.ActivateConfig(ctx, fmt.Sprintf("%s-config", wid.String()), &agentv1.ActivateConfig{ 451 ConfigKey: configKey, 452 BaseConfigHash: baseHash, 453 UserConfig: string(userConfigJSON), 454 Toplevel: cachedToplevel, 455 }, out) 456 if err != nil { 457 return err 458 } 459 fmt.Fprintf(out, "activated NixOS config toplevel %s\n", result.Toplevel) 460 461 if cachedToplevel != "" || configKey == "" { 462 return nil 463 } 464 if e.cfg.NixCache.UploadURL == "" { 465 e.l.Warn("not committing config cache metadata: no upload URL configured", "workflow", wid, "configKey", configKey, "toplevel", result.Toplevel) 466 return nil 467 } 468 469 if err := e.drainNixCache(ctx, state); err != nil { 470 // a partial upload would leave the 
cache unable to realize this toplevel, 471 // so skip the metadata commit rather than poison it with an un-realizable 472 // key. the config still activated fine, so don't fail the workflow. 473 e.l.Warn("cache drain failed; skipping config cache metadata commit", "workflow", wid, "configKey", configKey, "toplevel", result.Toplevel, "error", err) 474 return nil 475 } 476 if err := state.NixOSToplevelCache.Commit(configKey, result.Toplevel); err != nil { 477 return err 478 } 479 fmt.Fprintf(out, "committed config cache metadata %s -> %s\n", configKey, result.Toplevel) 480 return nil 481} 482 483func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 484 fns := e.drainCleanups(wid) 485 486 var cleanupErr error 487 for i := len(fns) - 1; i >= 0; i-- { 488 if err := fns[i](ctx); err != nil { 489 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) 490 cleanupErr = errors.Join(cleanupErr, err) 491 } 492 } 493 return cleanupErr 494} 495 496func (e *Engine) FinalizeWorkflow(ctx context.Context, wid models.WorkflowId, w *models.Workflow, wfLogger models.WorkflowLogger) error { 497 return nil 498} 499 500func (e *Engine) WorkflowTimeout() time.Duration { 501 d, err := time.ParseDuration(e.cfg.MicroVMPipelines.WorkflowTimeout) 502 if err != nil { 503 d = 5 * time.Minute 504 } 505 return d + guestTimeoutGrace 506} 507 508func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 509 e.cleanupMu.Lock() 510 defer e.cleanupMu.Unlock() 511 key := wid.String() 512 e.cleanup[key] = append(e.cleanup[key], fn) 513} 514 515func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc { 516 e.cleanupMu.Lock() 517 defer e.cleanupMu.Unlock() 518 key := wid.String() 519 fns := e.cleanup[key] 520 delete(e.cleanup, key) 521 return fns 522} 523 524func (e *Engine) cgroupLimits(wid models.WorkflowId, spec ImageSpec) CgroupLimits { 525 cfg := e.cfg.MicroVMPipelines 526 return CgroupLimits{ 527 Enabled: 
cfg.EnableCgroups, 528 Parent: e.cgroupParent, 529 Name: "workflow-" + wid.String(), 530 MemoryMaxMiB: resourcesForImage(spec).MemoryMiB, 531 SwapMaxMiB: cfg.CgroupSwapMaxMiB, 532 PidsMax: cfg.CgroupPidsMax, 533 } 534}