Monorepo for Tangled (tangled.org)
8

Configure Feed

Select the types of activity you want to include in your feed.

1package microvm 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "os" 11 "path/filepath" 12 "slices" 13 "sync" 14 "sync/atomic" 15 "time" 16 17 "gopkg.in/yaml.v3" 18 19 "tangled.org/core/api/tangled" 20 "tangled.org/core/log" 21 "tangled.org/core/spindle/agentproto" 22 agentv1 "tangled.org/core/spindle/agentproto/gen" 23 "tangled.org/core/spindle/config" 24 "tangled.org/core/spindle/db" 25 "tangled.org/core/spindle/engine" 26 "tangled.org/core/spindle/models" 27 "tangled.org/core/spindle/secrets" 28) 29 30const ( 31 guestWorkDir = "/workspace/repo" 32 activationStepAction = "activate-config" 33 agentAcceptTimeout = 2 * time.Minute 34 agentHandshakeTimeout = 30 * time.Second 35 cacheDrainTimeout = 5 * time.Minute 36 vmShutdownTimeout = 10 * time.Second 37 guestTimeoutGrace = 5 * time.Second 38) 39 40type cleanupFunc func(context.Context) error 41 42type Engine struct { 43 l *slog.Logger 44 cfg *config.Config 45 db *db.DB 46 agent *agentHub 47 scheduler *engine.ResourceScheduler[Resources] 48 cgroupParent *CgroupParent 49 50 cleanupMu sync.Mutex 51 cleanup map[string][]cleanupFunc 52} 53 54type Step struct { 55 name string 56 kind models.StepKind 57 command string 58 environment map[string]string 59 action string 60 config manifestConfig 61 configKey string 62} 63 64func (s Step) Name() string { return s.name } 65func (s Step) Command() string { return s.command } 66func (s Step) Kind() models.StepKind { return s.kind } 67 68func New(ctx context.Context, cfg *config.Config, d *db.DB) (*Engine, error) { 69 l := log.FromContext(ctx).With("component", "engine.microvm") 70 port := cfg.MicroVMPipelines.AgentPort 71 if port == 0 { 72 port = agentproto.DefaultPort 73 } 74 agent, err := newAgentHub(port, l) 75 if err != nil { 76 return nil, err 77 } 78 budget, max, agingThreshold := newVMBudgetConfig(cfg.MicroVMPipelines) 79 l.Info("initialized microVM workflow budget", "budget", budget.String(), "maxWorkflow", max.String(), 
"agingThreshold", agingThreshold) 80 81 var cgroupParent *CgroupParent 82 if cfg.MicroVMPipelines.EnableCgroups { 83 cgroupParent, err = initCgroupParent(cfg.MicroVMPipelines.CgroupParent, cfg.MicroVMPipelines.CgroupSupervisorMemoryMinMiB, l) 84 if err != nil { 85 return nil, err 86 } 87 } 88 89 return &Engine{ 90 l: l, 91 cfg: cfg, 92 db: d, 93 agent: agent, 94 scheduler: engine.NewResourceScheduler[Resources](budget, max, agingThreshold), 95 cgroupParent: cgroupParent, 96 cleanup: make(map[string][]cleanupFunc), 97 }, nil 98} 99 100func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) { 101 swf := &models.Workflow{} 102 var dwf manifestWorkflow 103 104 if err := yaml.Unmarshal([]byte(twf.Raw), &dwf); err != nil { 105 return nil, err 106 } 107 108 for _, dstep := range dwf.Steps { 109 swf.Steps = append(swf.Steps, Step{ 110 name: dstep.Name, 111 kind: models.StepKindUser, 112 command: dstep.Command, 113 environment: dstep.Environment, 114 }) 115 } 116 swf.Name = twf.Name 117 swf.Environment = dwf.Environment 118 119 if tpl.TriggerMetadata != nil { 120 if clone := models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev); clone.Command() != "" { 121 swf.Steps = append([]models.Step{clone}, swf.Steps...) 
122 } 123 } 124 125 imageSpec, imageSpecPath, imageName, err := e.resolveImage(dwf.Image) 126 if err != nil { 127 return nil, err 128 } 129 configKey := "" 130 config := manifestConfig{ 131 Services: dwf.Services, 132 Virtualisation: dwf.Virtualisation, 133 Dependencies: dwf.Dependencies, 134 Registry: dwf.Registry, 135 } 136 if config.Enabled() { 137 if !imageSpec.SupportsConfigActivation() { 138 return nil, fmt.Errorf( 139 "microVM image %q is not a NixOS image: services, virtualisation, dependencies and registry workflow options require a NixOS image", 140 imageName, 141 ) 142 } 143 var err error 144 configKey, err = buildConfigKey(imageSpec, config) 145 if err != nil { 146 return nil, fmt.Errorf("build config key: %w", err) 147 } 148 activationStep := Step{ 149 name: "NixOS config activation", 150 kind: models.StepKindSystem, 151 command: "activate nixos config", 152 action: activationStepAction, 153 config: config, 154 configKey: configKey, 155 } 156 157 insertAt := 0 158 if len(swf.Steps) > 0 && swf.Steps[0].Kind() == models.StepKindSystem { 159 insertAt = 1 160 } 161 swf.Steps = append(swf.Steps, nil) 162 copy(swf.Steps[insertAt+1:], swf.Steps[insertAt:]) 163 swf.Steps[insertAt] = activationStep 164 } 165 166 cacheURLs, cacheKeys, err := workflowCaches(dwf.Caches) 167 if err != nil { 168 return nil, err 169 } 170 171 swf.Data = &workflowState{ 172 ImageSpec: imageSpec, 173 ImageSpecPath: imageSpecPath, 174 Config: config, 175 ConfigKey: configKey, 176 Image: imageName, 177 CacheReadURLs: cacheURLs, 178 CacheTrustedPublicKeys: cacheKeys, 179 NixOSToplevelCache: newNixOSToplevelCacheStore(e.db), 180 } 181 return swf, nil 182} 183 184func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error { 185 l := e.l.With("workflow", wid) 186 setupStep := Step{name: "microVM setup", kind: models.StepKindSystem} 187 188 wfLogger.ControlWriter(-1, setupStep, models.StepStatusStart).Write([]byte{0}) 
189 defer wfLogger.ControlWriter(-1, setupStep, models.StepStatusEnd).Write([]byte{0}) 190 191 state, ok := wf.Data.(*workflowState) 192 if !ok || state == nil { 193 return fmt.Errorf("workflow state is not initialized") 194 } 195 196 cid, err := AllocateCID() 197 if err != nil { 198 return err 199 } 200 connCh, unregister, err := e.agent.expect(cid) 201 if err != nil { 202 return err 203 } 204 defer unregister() 205 206 workDirBase := e.cfg.MicroVMPipelines.OverlayDir 207 if workDirBase == "" { 208 workDirBase = os.TempDir() 209 } 210 workDir, err := os.MkdirTemp(workDirBase, "spindle-microvm-"+wid.String()+"-*") 211 if err != nil { 212 return fmt.Errorf("create workflow microVM directory: %w", err) 213 } 214 state.WorkDir = workDir 215 216 setupDone := false 217 defer func() { 218 if setupDone { 219 return 220 } 221 if err := e.cleanupState(context.Background(), wid, state); err != nil { 222 l.Error("failed to cleanup failed setup", "error", err) 223 } 224 }() 225 226 upstreams, err := BuildCacheUpstreams(e.cfg.NixCache.ReadURLs, state.CacheReadURLs) 227 if err != nil { 228 return err 229 } 230 readCache, err := StartReadCacheProxy(ctx, cid, upstreams, l) 231 if err != nil { 232 return err 233 } 234 state.ReadCache = readCache 235 stagingDir := filepath.Join(workDir, "upload-cache") 236 uploadCache, err := StartUploadCacheProxy(ctx, cid, e.cfg.NixCache.UploadURL, upstreams, stagingDir, l) 237 if err != nil { 238 return err 239 } 240 state.UploadCache = uploadCache 241 dnsProxy, err := StartDNSProxy(ctx, cid, l) 242 if err != nil { 243 return err 244 } 245 state.DNSProxy = dnsProxy 246 247 port := e.cfg.MicroVMPipelines.AgentPort 248 if port == 0 { 249 port = agentproto.DefaultPort 250 } 251 state.ImageSpec.BootArgs = fmt.Sprintf("%s shuttle.vsock_port=%d", state.ImageSpec.BootArgs, port) 252 253 fmt.Fprintf(wfLogger.DataWriter(-1, "stdout"), "starting microVM image %s\n", state.Image) 254 l.Info("starting microVM workflow", "image", state.Image, "imageSpec", 
state.ImageSpecPath, "cid", cid, "workDir", workDir) 255 256 var vm VMHandle 257 vm, err = StartVM(ctx, VMConfig{ 258 Image: state.ImageSpec, 259 CID: cid, 260 EnableKVM: e.cfg.MicroVMPipelines.EnableKVM, 261 WorkDir: workDir, 262 Cgroup: e.cgroupLimits(wid, state.ImageSpec), 263 Dev: e.cfg.Server.Dev, 264 }, l) 265 if err != nil { 266 return err 267 } 268 state.VM = vm 269 270 acceptCtx, cancelAccept := context.WithTimeout(ctx, agentAcceptTimeout) 271 defer cancelAccept() 272 conn, err := waitAgentConn(acceptCtx, connCh) 273 if err != nil { 274 return err 275 } 276 277 agentSession := NewAgentSession(conn, l) 278 initCtx, cancelInit := context.WithTimeout(ctx, agentHandshakeTimeout) 279 defer cancelInit() 280 if err := agentSession.Init(initCtx, &agentv1.Init{ 281 JobId: wid.String(), 282 CacheTrustedPublicKeys: append(slices.Clone(e.cfg.NixCache.TrustedPublicKeys), state.CacheTrustedPublicKeys...), 283 CacheReadProxyPort: readCache.Port(), 284 CacheUploadProxyPort: uploadCache.Port(), 285 DnsProxyPort: dnsProxy.Port(), 286 }); err != nil { 287 _ = agentSession.Close() 288 return err 289 } 290 state.Agent = agentSession 291 wf.Data = state 292 293 e.registerCleanup(wid, func(ctx context.Context) error { 294 return e.cleanupState(ctx, wid, state) 295 }) 296 setupDone = true 297 298 fmt.Fprintf(wfLogger.DataWriter(-1, "stdout"), 299 "agent connected; serial log: %s\n", vm.Logs().Serial, 300 ) 301 return nil 302} 303 304func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error { 305 state, ok := w.Data.(*workflowState) 306 if !ok || state == nil || state.Agent == nil { 307 return fmt.Errorf("microVM workflow is not connected to agent") 308 } 309 310 stderr := wfLogger.DataWriter(idx, "stderr") 311 312 execCtx, vmExited, cancelWatch := watchVMExit(ctx, state.VM) 313 defer cancelWatch() 314 315 step := w.Steps[idx] 316 if s, ok := step.(Step); ok && s.action 
== activationStepAction { 317 err := e.activateConfig(execCtx, wid, state, s, wfLogger.DataWriter(idx, "stdout")) 318 return e.classifyStepError(ctx, wid, step, state, stderr, vmExited, err) 319 } 320 env := []string{ 321 "HOME=/workspace", 322 "LOGNAME=" + guestWorkflowUser, 323 "PATH=/run/current-system/sw/bin:/nix/var/nix/profiles/default/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", 324 "USER=" + guestWorkflowUser, 325 } 326 for k, v := range w.Environment { 327 env = append(env, k+"="+v) 328 } 329 for _, s := range secrets { 330 env = append(env, s.Key+"="+s.Value) 331 } 332 if s, ok := step.(Step); ok { 333 for k, v := range s.environment { 334 env = append(env, k+"="+v) 335 } 336 } 337 338 stdout := wfLogger.DataWriter(idx, "stdout") 339 exitCode, err := state.Agent.Exec(execCtx, AgentExec{ 340 ID: fmt.Sprintf("%s-%d", wid.String(), idx), 341 ExecStart: &agentv1.ExecStart{ 342 Argv: []string{state.ImageSpec.Shell, "-lc", step.Command()}, 343 Env: env, 344 Cwd: guestWorkDir, 345 User: guestWorkflowUser, 346 // timeout not set here, Exec will fill it 347 }, 348 Stdout: stdout, 349 Stderr: stderr, 350 }) 351 if err != nil { 352 return e.classifyStepError(ctx, wid, step, state, stderr, vmExited, err) 353 } 354 355 if exitCode != 0 { 356 e.l.Debug("step exited non-zero", "workflow", wid, "step", step.Name(), "exitCode", exitCode) 357 return engine.ErrWorkflowFailed 358 } 359 return nil 360} 361 362// reads the vm serial logs so we report the tail of that as an error instead of 363// just "guest agent connection lost: EOF" 364func (e *Engine) classifyStepError(ctx context.Context, wid models.WorkflowId, step models.Step, state *workflowState, stderr io.Writer, vmExited *atomic.Bool, err error) error { 365 if err == nil { 366 return nil 367 } 368 l := e.l.With("workflow", wid, "step", step.Name()) 369 370 if vmExited != nil && vmExited.Load() { 371 reason := "microVM exited unexpectedly" 372 oom := state.VM != nil && state.VM.OOMKilled() 373 if 
oom { 374 reason = "microVM killed by OOM (cgroup memory limit exceeded)" 375 } 376 if detail := vmCrashLog(state.VM); detail != "" { 377 fmt.Fprintf(stderr, "%s:\n%s\n", reason, detail) 378 l.Debug(reason, "oom", oom, "detail", detail) 379 } else { 380 fmt.Fprintln(stderr, reason) 381 l.Debug(reason, "oom", oom) 382 } 383 return errors.New(reason + "; see workflow logs for serial output") 384 } 385 386 if errors.Is(err, errGuestTimedOut) || ctx.Err() != nil { 387 l.Debug("step timed out", "guestReported", errors.Is(err, errGuestTimedOut)) 388 return engine.ErrTimedOut 389 } 390 391 // the agent connection dropped while qemu stayed up (eg. the guest kernel 392 // OOM-killed the agent or a guest panic), so surface serial logs, those 393 // will be more helpful. 394 if detail := vmCrashLog(state.VM); detail != "" { 395 fmt.Fprintf(stderr, "step failed (%v):\n%s\n", err, detail) 396 l.Debug("step failed", "error", err, "detail", detail) 397 } else { 398 l.Debug("step failed", "error", err) 399 } 400 return err 401} 402 403func (e *Engine) activateConfig(ctx context.Context, wid models.WorkflowId, state *workflowState, step Step, out io.Writer) error { 404 cfg := step.config 405 if !cfg.Enabled() { 406 return nil 407 } 408 409 configKey := step.configKey 410 if configKey == "" { 411 configKey = state.ConfigKey 412 } 413 414 userConfigJSON, err := json.Marshal(cfg) 415 if err != nil { 416 return fmt.Errorf("encode user config: %w", err) 417 } 418 419 var cachedToplevel string 420 if configKey != "" { 421 if record, ok, err := state.NixOSToplevelCache.Lookup(configKey); err != nil { 422 return err 423 } else if ok { 424 cachedToplevel = record.Toplevel 425 fmt.Fprintf(out, "realizing cached NixOS config %s\n", cachedToplevel) 426 } 427 } 428 if cachedToplevel == "" { 429 fmt.Fprintf(out, "building NixOS config from user config\n") 430 } 431 432 baseHash, err := BaseConfigHash(state.ImageSpec) 433 if err != nil { 434 return fmt.Errorf("calculate base config hash: %w", 
err) 435 } 436 437 result, err := state.Agent.ActivateConfig(ctx, fmt.Sprintf("%s-config", wid.String()), &agentv1.ActivateConfig{ 438 ConfigKey: configKey, 439 BaseConfigHash: baseHash, 440 UserConfig: string(userConfigJSON), 441 Toplevel: cachedToplevel, 442 }) 443 if err != nil { 444 return err 445 } 446 fmt.Fprintf(out, "activated NixOS config toplevel %s\n", result.Toplevel) 447 448 if cachedToplevel != "" || configKey == "" { 449 return nil 450 } 451 if e.cfg.NixCache.UploadURL == "" { 452 e.l.Warn("not committing config cache metadata: no upload URL configured", "workflow", wid, "configKey", configKey, "toplevel", result.Toplevel) 453 return nil 454 } 455 456 if err := e.drainNixCache(ctx, state); err != nil { 457 // a partial upload would leave the cache unable to realize this toplevel, 458 // so skip the metadata commit rather than poison it with an un-realizable 459 // key. the config still activated fine, so don't fail the workflow. 460 e.l.Warn("cache drain failed; skipping config cache metadata commit", "workflow", wid, "configKey", configKey, "toplevel", result.Toplevel, "error", err) 461 return nil 462 } 463 if err := state.NixOSToplevelCache.Commit(configKey, result.Toplevel); err != nil { 464 return err 465 } 466 fmt.Fprintf(out, "committed config cache metadata %s -> %s\n", configKey, result.Toplevel) 467 return nil 468} 469 470func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 471 fns := e.drainCleanups(wid) 472 473 var cleanupErr error 474 for i := len(fns) - 1; i >= 0; i-- { 475 if err := fns[i](ctx); err != nil { 476 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) 477 cleanupErr = errors.Join(cleanupErr, err) 478 } 479 } 480 return cleanupErr 481} 482 483func (e *Engine) FinalizeWorkflow(ctx context.Context, wid models.WorkflowId, w *models.Workflow, wfLogger models.WorkflowLogger) error { 484 return nil 485} 486 487func (e *Engine) WorkflowTimeout() time.Duration { 488 d, err 
:= time.ParseDuration(e.cfg.MicroVMPipelines.WorkflowTimeout) 489 if err != nil { 490 d = 5 * time.Minute 491 } 492 return d + guestTimeoutGrace 493} 494 495func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 496 e.cleanupMu.Lock() 497 defer e.cleanupMu.Unlock() 498 key := wid.String() 499 e.cleanup[key] = append(e.cleanup[key], fn) 500} 501 502func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc { 503 e.cleanupMu.Lock() 504 defer e.cleanupMu.Unlock() 505 key := wid.String() 506 fns := e.cleanup[key] 507 delete(e.cleanup, key) 508 return fns 509} 510 511func (e *Engine) cgroupLimits(wid models.WorkflowId, spec ImageSpec) CgroupLimits { 512 cfg := e.cfg.MicroVMPipelines 513 return CgroupLimits{ 514 Enabled: cfg.EnableCgroups, 515 Parent: e.cgroupParent, 516 Name: "workflow-" + wid.String(), 517 MemoryMaxMiB: resourcesForImage(spec).MemoryMiB, 518 SwapMaxMiB: cfg.CgroupSwapMaxMiB, 519 PidsMax: cfg.CgroupPidsMax, 520 } 521}