Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package microvm 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "os" 11 "slices" 12 "sync" 13 "sync/atomic" 14 "time" 15 16 "gopkg.in/yaml.v3" 17 18 "tangled.org/core/api/tangled" 19 "tangled.org/core/log" 20 "tangled.org/core/spindle/agentproto" 21 agentv1 "tangled.org/core/spindle/agentproto/gen" 22 "tangled.org/core/spindle/config" 23 "tangled.org/core/spindle/db" 24 "tangled.org/core/spindle/engine" 25 "tangled.org/core/spindle/models" 26 "tangled.org/core/spindle/secrets" 27) 28 29const ( 30 guestWorkDir = "/workspace/repo" 31 activationStepAction = "activate-config" 32 agentAcceptTimeout = 2 * time.Minute 33 agentHandshakeTimeout = 30 * time.Second 34 cacheDrainTimeout = 5 * time.Minute 35 vmShutdownTimeout = 10 * time.Second 36 guestTimeoutGrace = 5 * time.Second 37) 38 39type cleanupFunc func(context.Context) error 40 41type Engine struct { 42 l *slog.Logger 43 cfg *config.Config 44 db *db.DB 45 agent *agentHub 46 scheduler *engine.ResourceScheduler[Resources] 47 cgroupParent *CgroupParent 48 49 cleanupMu sync.Mutex 50 cleanup map[string][]cleanupFunc 51} 52 53type Step struct { 54 name string 55 kind models.StepKind 56 command string 57 environment map[string]string 58 action string 59 config manifestConfig 60 configKey string 61} 62 63func (s Step) Name() string { return s.name } 64func (s Step) Command() string { return s.command } 65func (s Step) Kind() models.StepKind { return s.kind } 66 67func New(ctx context.Context, cfg *config.Config, d *db.DB) (*Engine, error) { 68 l := log.FromContext(ctx).With("component", "engine.microvm") 69 port := cfg.MicroVMPipelines.AgentPort 70 if port == 0 { 71 port = agentproto.DefaultPort 72 } 73 agent, err := newAgentHub(port, l) 74 if err != nil { 75 return nil, err 76 } 77 budget, max, agingThreshold := newVMBudgetConfig(cfg.MicroVMPipelines) 78 l.Info("initialized microVM workflow budget", "budget", budget.String(), "maxWorkflow", max.String(), "agingThreshold", agingThreshold) 79 80 var cgroupParent *CgroupParent 81 if cfg.MicroVMPipelines.EnableCgroups { 82 cgroupParent, err = initCgroupParent(cfg.MicroVMPipelines.CgroupParent, cfg.MicroVMPipelines.CgroupSupervisorMemoryMinMiB, l) 83 if err != nil { 84 return nil, err 85 } 86 } 87 88 return &Engine{ 89 l: l, 90 cfg: cfg, 91 db: d, 92 agent: agent, 93 scheduler: engine.NewResourceScheduler[Resources](budget, max, agingThreshold), 94 cgroupParent: cgroupParent, 95 cleanup: make(map[string][]cleanupFunc), 96 }, nil 97} 98 99func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) { 100 swf := &models.Workflow{} 101 var dwf manifestWorkflow 102 103 if err := yaml.Unmarshal([]byte(twf.Raw), &dwf); err != nil { 104 return nil, err 105 } 106 107 for _, dstep := range dwf.Steps { 108 swf.Steps = append(swf.Steps, Step{ 109 name: dstep.Name, 110 kind: models.StepKindUser, 111 command: dstep.Command, 112 environment: dstep.Environment, 113 }) 114 } 115 swf.Name = twf.Name 116 swf.Environment = dwf.Environment 117 118 if tpl.TriggerMetadata != nil { 119 if clone := models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev); clone.Command() != "" { 120 swf.Steps = append([]models.Step{clone}, swf.Steps...) 121 } 122 } 123 124 imageSpec, imageSpecPath, imageName, err := e.resolveImage(dwf.Image) 125 if err != nil { 126 return nil, err 127 } 128 configKey := "" 129 config := manifestConfig{ 130 Services: dwf.Services, 131 Virtualisation: dwf.Virtualisation, 132 Dependencies: dwf.Dependencies, 133 Registry: dwf.Registry, 134 } 135 if config.Enabled() { 136 if !imageSpec.SupportsConfigActivation() { 137 return nil, fmt.Errorf( 138 "microVM image %q is not a NixOS image: services, virtualisation, dependencies and registry workflow options require a NixOS image", 139 imageName, 140 ) 141 } 142 var err error 143 configKey, err = buildConfigKey(imageSpec, config) 144 if err != nil { 145 return nil, fmt.Errorf("build config key: %w", err) 146 } 147 activationStep := Step{ 148 name: "NixOS config activation", 149 kind: models.StepKindSystem, 150 command: "activate nixos config", 151 action: activationStepAction, 152 config: config, 153 configKey: configKey, 154 } 155 156 insertAt := 0 157 if len(swf.Steps) > 0 && swf.Steps[0].Kind() == models.StepKindSystem { 158 insertAt = 1 159 } 160 swf.Steps = append(swf.Steps, nil) 161 copy(swf.Steps[insertAt+1:], swf.Steps[insertAt:]) 162 swf.Steps[insertAt] = activationStep 163 } 164 165 cacheURLs, cacheKeys, err := workflowCaches(dwf.Caches) 166 if err != nil { 167 return nil, err 168 } 169 170 swf.Data = &workflowState{ 171 ImageSpec: imageSpec, 172 ImageSpecPath: imageSpecPath, 173 Config: config, 174 ConfigKey: configKey, 175 Image: imageName, 176 CacheReadURLs: cacheURLs, 177 CacheTrustedPublicKeys: cacheKeys, 178 NixOSToplevelCache: newNixOSToplevelCacheStore(e.db), 179 } 180 return swf, nil 181} 182 183func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error { 184 l := e.l.With("workflow", wid) 185 setupStep := Step{name: "microVM setup", kind: models.StepKindSystem} 186 187 wfLogger.ControlWriter(-1, setupStep, models.StepStatusStart).Write([]byte{0}) 188 defer wfLogger.ControlWriter(-1, setupStep, models.StepStatusEnd).Write([]byte{0}) 189 190 state, ok := wf.Data.(*workflowState) 191 if !ok || state == nil { 192 return fmt.Errorf("workflow state is not initialized") 193 } 194 195 cid, err := AllocateCID() 196 if err != nil { 197 return err 198 } 199 connCh, unregister, err := e.agent.expect(cid) 200 if err != nil { 201 return err 202 } 203 defer unregister() 204 205 workDirBase := e.cfg.MicroVMPipelines.OverlayDir 206 if workDirBase == "" { 207 workDirBase = os.TempDir() 208 } 209 workDir, err := os.MkdirTemp(workDirBase, "spindle-microvm-"+wid.String()+"-*") 210 if err != nil { 211 return fmt.Errorf("create workflow microVM directory: %w", err) 212 } 213 state.WorkDir = workDir 214 215 setupDone := false 216 defer func() { 217 if setupDone { 218 return 219 } 220 if err := e.cleanupState(context.Background(), wid, state); err != nil { 221 l.Error("failed to cleanup failed setup", "error", err) 222 } 223 }() 224 225 upstreams, err := BuildCacheUpstreams(e.cfg.NixCache.ReadURLs, state.CacheReadURLs) 226 if err != nil { 227 return err 228 } 229 readCache, err := StartReadCacheProxy(ctx, cid, upstreams, l) 230 if err != nil { 231 return err 232 } 233 state.ReadCache = readCache 234 uploadCache, err := StartUploadCacheProxy(ctx, cid, e.cfg.NixCache.UploadURL, upstreams, l) 235 if err != nil { 236 return err 237 } 238 state.UploadCache = uploadCache 239 dnsProxy, err := StartDNSProxy(ctx, cid, l) 240 if err != nil { 241 return err 242 } 243 state.DNSProxy = dnsProxy 244 245 port := e.cfg.MicroVMPipelines.AgentPort 246 if port == 0 { 247 port = agentproto.DefaultPort 248 } 249 state.ImageSpec.BootArgs = fmt.Sprintf("%s shuttle.vsock_port=%d", state.ImageSpec.BootArgs, port) 250 251 fmt.Fprintf(wfLogger.DataWriter(-1, "stdout"), "starting microVM image %s\n", state.Image) 252 l.Info("starting microVM workflow", "image", state.Image, "imageSpec", state.ImageSpecPath, "cid", cid, "workDir", workDir) 253 254 var vm VMHandle 255 vm, err = StartVM(ctx, VMConfig{ 256 Image: state.ImageSpec, 257 CID: cid, 258 EnableKVM: e.cfg.MicroVMPipelines.EnableKVM, 259 WorkDir: workDir, 260 Cgroup: e.cgroupLimits(wid, state.ImageSpec), 261 Dev: e.cfg.Server.Dev, 262 }, l) 263 if err != nil { 264 return err 265 } 266 state.VM = vm 267 268 acceptCtx, cancelAccept := context.WithTimeout(ctx, agentAcceptTimeout) 269 defer cancelAccept() 270 conn, err := waitAgentConn(acceptCtx, connCh) 271 if err != nil { 272 return err 273 } 274 275 agentSession := NewAgentSession(conn, l) 276 initCtx, cancelInit := context.WithTimeout(ctx, agentHandshakeTimeout) 277 defer cancelInit() 278 if err := agentSession.Init(initCtx, &agentv1.Init{ 279 JobId: wid.String(), 280 CacheTrustedPublicKeys: append(slices.Clone(e.cfg.NixCache.TrustedPublicKeys), state.CacheTrustedPublicKeys...), 281 CacheReadProxyPort: readCache.Port(), 282 CacheUploadProxyPort: uploadCache.Port(), 283 DnsProxyPort: dnsProxy.Port(), 284 }); err != nil { 285 _ = agentSession.Close() 286 return err 287 } 288 state.Agent = agentSession 289 wf.Data = state 290 291 e.registerCleanup(wid, func(ctx context.Context) error { 292 return e.cleanupState(ctx, wid, state) 293 }) 294 setupDone = true 295 296 fmt.Fprintf(wfLogger.DataWriter(-1, "stdout"), 297 "agent connected; serial log: %s\n", vm.Logs().Serial, 298 ) 299 return nil 300} 301 302func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error { 303 state, ok := w.Data.(*workflowState) 304 if !ok || state == nil || state.Agent == nil { 305 return fmt.Errorf("microVM workflow is not connected to agent") 306 } 307 308 stderr := wfLogger.DataWriter(idx, "stderr") 309 310 execCtx, vmExited, cancelWatch := watchVMExit(ctx, state.VM) 311 defer cancelWatch() 312 313 step := w.Steps[idx] 314 if s, ok := step.(Step); ok && s.action == activationStepAction { 315 err := e.activateConfig(execCtx, wid, state, s, wfLogger.DataWriter(idx, "stdout")) 316 return e.classifyStepError(ctx, wid, step, state, stderr, vmExited, err) 317 } 318 env := []string{ 319 "HOME=/workspace", 320 "LOGNAME=" + guestWorkflowUser, 321 "PATH=/run/current-system/sw/bin:/nix/var/nix/profiles/default/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", 322 "USER=" + guestWorkflowUser, 323 } 324 for k, v := range w.Environment { 325 env = append(env, k+"="+v) 326 } 327 for _, s := range secrets { 328 env = append(env, s.Key+"="+s.Value) 329 } 330 if s, ok := step.(Step); ok { 331 for k, v := range s.environment { 332 env = append(env, k+"="+v) 333 } 334 } 335 336 stdout := wfLogger.DataWriter(idx, "stdout") 337 exitCode, err := state.Agent.Exec(execCtx, AgentExec{ 338 ID: fmt.Sprintf("%s-%d", wid.String(), idx), 339 ExecStart: &agentv1.ExecStart{ 340 Argv: []string{state.ImageSpec.Shell, "-lc", step.Command()}, 341 Env: env, 342 Cwd: guestWorkDir, 343 User: guestWorkflowUser, 344 // timeout not set here, Exec will fill it 345 }, 346 Stdout: stdout, 347 Stderr: stderr, 348 }) 349 if err != nil { 350 return e.classifyStepError(ctx, wid, step, state, stderr, vmExited, err) 351 } 352 353 if exitCode != 0 { 354 e.l.Debug("step exited non-zero", "workflow", wid, "step", step.Name(), "exitCode", exitCode) 355 return engine.ErrWorkflowFailed 356 } 357 return nil 358} 359 360// reads the vm serial logs so we report the tail of that as an error instead of 361// just "guest agent connection lost: EOF" 362func (e *Engine) classifyStepError(ctx context.Context, wid models.WorkflowId, step models.Step, state *workflowState, stderr io.Writer, vmExited *atomic.Bool, err error) error { 363 if err == nil { 364 return nil 365 } 366 l := e.l.With("workflow", wid, "step", step.Name()) 367 368 if vmExited != nil && vmExited.Load() { 369 reason := "microVM exited unexpectedly" 370 oom := state.VM != nil && state.VM.OOMKilled() 371 if oom { 372 reason = "microVM killed by OOM (cgroup memory limit exceeded)" 373 } 374 if detail := vmCrashLog(state.VM); detail != "" { 375 fmt.Fprintf(stderr, "%s:\n%s\n", reason, detail) 376 l.Debug(reason, "oom", oom, "detail", detail) 377 } else { 378 fmt.Fprintln(stderr, reason) 379 l.Debug(reason, "oom", oom) 380 } 381 return errors.New(reason + "; see workflow logs for serial output") 382 } 383 384 if errors.Is(err, errGuestTimedOut) || ctx.Err() != nil { 385 l.Debug("step timed out", "guestReported", errors.Is(err, errGuestTimedOut)) 386 return engine.ErrTimedOut 387 } 388 389 // the agent connection dropped while qemu stayed up (eg. the guest kernel 390 // OOM-killed the agent or a guest panic), so surface serial logs, those 391 // will be more helpful. 392 if detail := vmCrashLog(state.VM); detail != "" { 393 fmt.Fprintf(stderr, "step failed (%v):\n%s\n", err, detail) 394 l.Debug("step failed", "error", err, "detail", detail) 395 } else { 396 l.Debug("step failed", "error", err) 397 } 398 return err 399} 400 401func (e *Engine) activateConfig(ctx context.Context, wid models.WorkflowId, state *workflowState, step Step, out io.Writer) error { 402 cfg := step.config 403 if !cfg.Enabled() { 404 return nil 405 } 406 407 configKey := step.configKey 408 if configKey == "" { 409 configKey = state.ConfigKey 410 } 411 412 userConfigJSON, err := json.Marshal(cfg) 413 if err != nil { 414 return fmt.Errorf("encode user config: %w", err) 415 } 416 417 var cachedToplevel string 418 if configKey != "" { 419 if record, ok, err := state.NixOSToplevelCache.Lookup(configKey); err != nil { 420 return err 421 } else if ok { 422 cachedToplevel = record.Toplevel 423 fmt.Fprintf(out, "realizing cached NixOS config %s\n", cachedToplevel) 424 } 425 } 426 if cachedToplevel == "" { 427 fmt.Fprintf(out, "building NixOS config from user config\n") 428 } 429 430 baseHash, err := BaseConfigHash(state.ImageSpec) 431 if err != nil { 432 return fmt.Errorf("calculate base config hash: %w", err) 433 } 434 435 result, err := state.Agent.ActivateConfig(ctx, fmt.Sprintf("%s-config", wid.String()), &agentv1.ActivateConfig{ 436 ConfigKey: configKey, 437 BaseConfigHash: baseHash, 438 UserConfig: string(userConfigJSON), 439 Toplevel: cachedToplevel, 440 }) 441 if err != nil { 442 return err 443 } 444 fmt.Fprintf(out, "activated NixOS config toplevel %s\n", result.Toplevel) 445 446 if cachedToplevel != "" || configKey == "" { 447 return nil 448 } 449 if e.cfg.NixCache.UploadURL == "" { 450 e.l.Warn("not committing config cache metadata: no upload URL configured", "workflow", wid, "configKey", configKey, "toplevel", result.Toplevel) 451 return nil 452 } 453 454 drainCtx, cancel := context.WithTimeout(ctx, cacheDrainTimeout) 455 defer cancel() 456 if _, err := state.Agent.Drain(drainCtx); err != nil { 457 return fmt.Errorf("drain config cache uploads before metadata commit: %w", err) 458 } 459 if err := state.NixOSToplevelCache.Commit(configKey, result.Toplevel); err != nil { 460 return err 461 } 462 fmt.Fprintf(out, "committed config cache metadata %s -> %s\n", configKey, result.Toplevel) 463 return nil 464} 465 466func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 467 fns := e.drainCleanups(wid) 468 469 var cleanupErr error 470 for i := len(fns) - 1; i >= 0; i-- { 471 if err := fns[i](ctx); err != nil { 472 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) 473 cleanupErr = errors.Join(cleanupErr, err) 474 } 475 } 476 return cleanupErr 477} 478 479func (e *Engine) FinalizeWorkflow(ctx context.Context, wid models.WorkflowId, w *models.Workflow, wfLogger models.WorkflowLogger) error { 480 return nil 481} 482 483func (e *Engine) WorkflowTimeout() time.Duration { 484 d, err := time.ParseDuration(e.cfg.MicroVMPipelines.WorkflowTimeout) 485 if err != nil { 486 d = 5 * time.Minute 487 } 488 return d + guestTimeoutGrace 489} 490 491func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 492 e.cleanupMu.Lock() 493 defer e.cleanupMu.Unlock() 494 key := wid.String() 495 e.cleanup[key] = append(e.cleanup[key], fn) 496} 497 498func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc { 499 e.cleanupMu.Lock() 500 defer e.cleanupMu.Unlock() 501 key := wid.String() 502 fns := e.cleanup[key] 503 delete(e.cleanup, key) 504 return fns 505} 506 507func (e *Engine) cgroupLimits(wid models.WorkflowId, spec ImageSpec) CgroupLimits { 508 cfg := e.cfg.MicroVMPipelines 509 return CgroupLimits{ 510 Enabled: cfg.EnableCgroups, 511 Parent: e.cgroupParent, 512 Name: "workflow-" + wid.String(), 513 MemoryMaxMiB: resourcesForImage(spec).MemoryMiB, 514 SwapMaxMiB: cfg.CgroupSwapMaxMiB, 515 PidsMax: cfg.CgroupPidsMax, 516 } 517}