Monorepo for Tangled
tangled.org
1package microvm
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "os"
11 "path/filepath"
12 "slices"
13 "sync"
14 "sync/atomic"
15 "time"
16
17 "gopkg.in/yaml.v3"
18
19 "tangled.org/core/api/tangled"
20 "tangled.org/core/log"
21 "tangled.org/core/spindle/agentproto"
22 agentv1 "tangled.org/core/spindle/agentproto/gen"
23 "tangled.org/core/spindle/config"
24 "tangled.org/core/spindle/db"
25 "tangled.org/core/spindle/engine"
26 "tangled.org/core/spindle/models"
27 "tangled.org/core/spindle/secrets"
28)
29
const (
	// guestWorkDir is the working directory (Cwd) used for commands executed
	// inside the guest by RunStep.
	guestWorkDir = "/workspace/repo"
	// guestBasePATH is the PATH handed to every step; applyDepsSource also
	// appends it after sourcing the dev-shell environment file.
	guestBasePATH = "/run/current-system/sw/bin:/nix/var/nix/profiles/default/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
	// guestDevShellEnvPath is the guest-side shell file that applyDepsSource
	// sources before a step's command when the file exists.
	guestDevShellEnvPath = "/run/spindle/devshell-env.sh"
	// activationStepAction marks the system step that RunStep dispatches to
	// activateConfig instead of executing a shell command.
	activationStepAction = "activate-config"
	// agentAcceptTimeout bounds how long SetupWorkflow waits for the guest
	// agent to connect after the VM has been started.
	agentAcceptTimeout = 2 * time.Minute
	// agentHandshakeTimeout bounds the agent Init call in SetupWorkflow.
	agentHandshakeTimeout = 30 * time.Second
	// cacheDrainTimeout is not referenced in this part of the file.
	// NOTE(review): presumably bounds drainNixCache; confirm at its use site.
	cacheDrainTimeout = 5 * time.Minute
	// vmShutdownTimeout is not referenced in this part of the file.
	// NOTE(review): presumably bounds VM shutdown during cleanup; confirm.
	vmShutdownTimeout = 10 * time.Second
	// guestTimeoutGrace is added on top of the configured workflow timeout by
	// WorkflowTimeout.
	guestTimeoutGrace = 5 * time.Second
)
41
// cleanupFunc releases one workflow resource. Functions of this type are
// queued by registerCleanup and run by DestroyWorkflow.
type cleanupFunc func(context.Context) error
43
// Engine runs workflows inside microVMs. Construct it with New so that the
// agent hub, the scheduler and the cleanup registry are initialized.
type Engine struct {
	// l is the engine-scoped logger (tagged component=engine.microvm by New).
	l *slog.Logger
	// cfg is the spindle configuration the engine was created with.
	cfg *config.Config
	// db backs the NixOS toplevel cache store handed to each workflow.
	db *db.DB
	// agent is the hub created by newAgentHub; SetupWorkflow registers the
	// expected guest CID with it and waits for the agent connection.
	agent *agentHub
	// scheduler is built by New from the configured VM budget.
	scheduler *engine.ResourceScheduler[Resources]
	// cgroupParent is nil unless cgroups are enabled in the configuration.
	cgroupParent *CgroupParent

	// cleanupMu guards cleanup.
	cleanupMu sync.Mutex
	// cleanup maps a workflow id string to the cleanup functions registered
	// for it; DestroyWorkflow drains and runs them in reverse order.
	cleanup map[string][]cleanupFunc
}
55
// Step is the microVM engine's workflow step. It is stored in
// models.Workflow.Steps and recovered in RunStep with a type assertion.
type Step struct {
	// name is the display name returned by Name.
	name string
	// kind distinguishes user steps from engine-inserted system steps.
	kind models.StepKind
	// command is the shell command returned by Command.
	command string
	// environment holds per-step variables; RunStep appends them after the
	// workflow environment and the secrets.
	environment map[string]string
	// action is activationStepAction for the config activation step and empty
	// for the steps built from the workflow manifest.
	action string
	// config and configKey are only set on the activation step and are
	// consumed by activateConfig.
	config    manifestConfig
	configKey string
}
65
66func (s Step) Name() string { return s.name }
67func (s Step) Command() string { return s.command }
68func (s Step) Kind() models.StepKind { return s.kind }
69
70func New(ctx context.Context, cfg *config.Config, d *db.DB) (*Engine, error) {
71 l := log.FromContext(ctx).With("component", "engine.microvm")
72 port := cfg.MicroVMPipelines.AgentPort
73 if port == 0 {
74 port = agentproto.DefaultPort
75 }
76 agent, err := newAgentHub(port, l)
77 if err != nil {
78 return nil, err
79 }
80 budget, max, agingThreshold := newVMBudgetConfig(cfg.MicroVMPipelines)
81 l.Info("initialized microVM workflow budget", "budget", budget.String(), "maxWorkflow", max.String(), "agingThreshold", agingThreshold)
82
83 var cgroupParent *CgroupParent
84 if cfg.MicroVMPipelines.EnableCgroups {
85 cgroupParent, err = initCgroupParent(cfg.MicroVMPipelines.CgroupParent, cfg.MicroVMPipelines.CgroupSupervisorMemoryMinMiB, l)
86 if err != nil {
87 return nil, err
88 }
89 }
90
91 return &Engine{
92 l: l,
93 cfg: cfg,
94 db: d,
95 agent: agent,
96 scheduler: engine.NewResourceScheduler(budget, max, agingThreshold),
97 cgroupParent: cgroupParent,
98 cleanup: make(map[string][]cleanupFunc),
99 }, nil
100}
101
102func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
103 swf := &models.Workflow{}
104 var dwf manifestWorkflow
105
106 if err := yaml.Unmarshal([]byte(twf.Raw), &dwf); err != nil {
107 return nil, err
108 }
109
110 for _, dstep := range dwf.Steps {
111 swf.Steps = append(swf.Steps, Step{
112 name: dstep.Name,
113 kind: models.StepKindUser,
114 command: dstep.Command,
115 environment: dstep.Environment,
116 })
117 }
118 swf.Name = twf.Name
119 swf.Environment = dwf.Environment
120
121 if tpl.TriggerMetadata != nil {
122 if clone := models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev); clone.Command() != "" {
123 swf.Steps = append([]models.Step{clone}, swf.Steps...)
124 }
125 }
126
127 imageSpec, imageSpecPath, imageName, err := e.resolveImage(dwf.Image)
128 if err != nil {
129 return nil, err
130 }
131 configKey := ""
132 config := manifestConfig{
133 Services: dwf.Services,
134 Virtualisation: dwf.Virtualisation,
135 Dependencies: dwf.Dependencies,
136 Registry: dwf.Registry,
137 }
138 if config.Enabled() {
139 if !imageSpec.SupportsConfigActivation() {
140 return nil, fmt.Errorf(
141 "microVM image %q is not a NixOS image: services, virtualisation, dependencies and registry workflow options require a NixOS image",
142 imageName,
143 )
144 }
145 var err error
146 configKey, err = buildConfigKey(imageSpec, config)
147 if err != nil {
148 return nil, fmt.Errorf("build config key: %w", err)
149 }
150 activationStep := Step{
151 name: "NixOS config activation",
152 kind: models.StepKindSystem,
153 command: "activate nixos config",
154 action: activationStepAction,
155 config: config,
156 configKey: configKey,
157 }
158
159 insertAt := 0
160 if len(swf.Steps) > 0 && swf.Steps[0].Kind() == models.StepKindSystem {
161 insertAt = 1
162 }
163 swf.Steps = append(swf.Steps, nil)
164 copy(swf.Steps[insertAt+1:], swf.Steps[insertAt:])
165 swf.Steps[insertAt] = activationStep
166 }
167
168 cacheURLs, cacheKeys, err := workflowCaches(dwf.Caches)
169 if err != nil {
170 return nil, err
171 }
172
173 swf.Data = &workflowState{
174 ImageSpec: imageSpec,
175 ImageSpecPath: imageSpecPath,
176 Config: config,
177 ConfigKey: configKey,
178 Image: imageName,
179 CacheReadURLs: cacheURLs,
180 CacheTrustedPublicKeys: cacheKeys,
181 NixOSToplevelCache: newNixOSToplevelCacheStore(e.db),
182 }
183 return swf, nil
184}
185
// SetupWorkflow boots the microVM for a workflow and connects to its guest
// agent.
//
// It expects wf.Data to hold the *workflowState produced by InitWorkflow. In
// order it: allocates a CID and registers it with the agent hub, creates a
// per-workflow work directory, starts the read cache, upload cache and DNS
// proxies, starts the VM, waits for the agent connection and performs the Init
// handshake. On success a cleanup function is registered for DestroyWorkflow.
// On any failure after the work directory was created, the partially
// initialized state is cleaned up before returning.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error {
	l := e.l.With("workflow", wid)
	// Synthetic system step, logged at index -1, that brackets the setup phase.
	setupStep := Step{name: "microVM setup", kind: models.StepKindSystem}

	wfLogger.ControlWriter(-1, setupStep, models.StepStatusStart).Write([]byte{0})
	// The control writer is obtained here (defer evaluates the receiver
	// expression immediately); only the Write runs when the function returns.
	defer wfLogger.ControlWriter(-1, setupStep, models.StepStatusEnd).Write([]byte{0})

	state, ok := wf.Data.(*workflowState)
	if !ok || state == nil {
		return fmt.Errorf("workflow state is not initialized")
	}

	cid, err := AllocateCID()
	if err != nil {
		return err
	}
	// Tell the hub to expect a connection from this CID before the VM boots.
	connCh, unregister, err := e.agent.expect(cid)
	if err != nil {
		return err
	}
	defer unregister()

	// Fall back to the OS temp dir when no overlay directory is configured.
	workDirBase := e.cfg.MicroVMPipelines.OverlayDir
	if workDirBase == "" {
		workDirBase = os.TempDir()
	}
	workDir, err := os.MkdirTemp(workDirBase, "spindle-microvm-"+wid.String()+"-*")
	if err != nil {
		return fmt.Errorf("create workflow microVM directory: %w", err)
	}
	state.WorkDir = workDir

	// From here on every early return must release what has been started so
	// far; setupDone is flipped only once cleanup has been handed over to
	// DestroyWorkflow via registerCleanup.
	setupDone := false
	defer func() {
		if setupDone {
			return
		}
		if detail := vmCrashLog(state.VM); detail != "" {
			l.Error("microVM setup failed", "detail", detail)
		}
		// The caller's ctx may already be cancelled, so clean up with a fresh one.
		if err := e.cleanupState(context.Background(), wid, state); err != nil {
			l.Error("failed to cleanup failed setup", "error", err)
		}
	}()

	// Upstreams combine the engine-wide cache read URLs with the workflow's own.
	upstreams, err := BuildCacheUpstreams(e.cfg.NixCache.ReadURLs, state.CacheReadURLs)
	if err != nil {
		return err
	}
	readCache, err := StartReadCacheProxy(ctx, cid, upstreams, l)
	if err != nil {
		return err
	}
	state.ReadCache = readCache
	stagingDir := filepath.Join(workDir, "upload-cache")
	uploadCache, err := StartUploadCacheProxy(ctx, cid, e.cfg.NixCache.UploadURL, upstreams, stagingDir, l)
	if err != nil {
		return err
	}
	state.UploadCache = uploadCache
	dnsProxy, err := StartDNSProxy(ctx, cid, l)
	if err != nil {
		return err
	}
	state.DNSProxy = dnsProxy

	// Same port defaulting as in New: zero means agentproto.DefaultPort.
	port := e.cfg.MicroVMPipelines.AgentPort
	if port == 0 {
		port = agentproto.DefaultPort
	}
	// Pass the agent port to the guest on the kernel command line. Note that
	// this appends to the stored BootArgs each time it runs.
	state.ImageSpec.BootArgs = fmt.Sprintf("%s shuttle.vsock_port=%d", state.ImageSpec.BootArgs, port)

	fmt.Fprintf(wfLogger.DataWriter(-1, "stdout"), "starting microVM image %s\n", state.Image)
	l.Info("starting microVM workflow", "image", state.Image, "imageSpec", state.ImageSpecPath, "cid", cid, "workDir", workDir)

	var vm VMHandle
	vm, err = StartVM(ctx, VMConfig{
		Image:     state.ImageSpec,
		CID:       cid,
		EnableKVM: e.cfg.MicroVMPipelines.EnableKVM,
		WorkDir:   workDir,
		Cgroup:    e.cgroupLimits(wid, state.ImageSpec),
		Dev:       e.cfg.Server.Dev,
	}, l)
	if err != nil {
		return err
	}
	state.VM = vm

	// Wait (bounded by agentAcceptTimeout) for the guest agent to connect.
	acceptCtx, cancelAccept := context.WithTimeout(ctx, agentAcceptTimeout)
	defer cancelAccept()
	conn, err := waitAgentConn(acceptCtx, connCh)
	if err != nil {
		return err
	}

	agentSession := NewAgentSession(conn, l)
	initCtx, cancelInit := context.WithTimeout(ctx, agentHandshakeTimeout)
	defer cancelInit()
	if err := agentSession.Init(initCtx, &agentv1.Init{
		JobId: wid.String(),
		// Clone first so the engine-wide key slice is never appended to in place.
		CacheTrustedPublicKeys: append(slices.Clone(e.cfg.NixCache.TrustedPublicKeys), state.CacheTrustedPublicKeys...),
		CacheReadProxyPort:     readCache.Port(),
		CacheUploadProxyPort:   uploadCache.Port(),
		DnsProxyPort:           dnsProxy.Port(),
	}); err != nil {
		// The session is not stored in state yet, so close it here.
		_ = agentSession.Close()
		return err
	}
	state.Agent = agentSession
	// state is the same pointer that was read from wf.Data above.
	wf.Data = state

	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.cleanupState(ctx, wid, state)
	})
	setupDone = true

	fmt.Fprintf(wfLogger.DataWriter(-1, "stdout"),
		"agent connected; serial log: %s\n", vm.Logs().Serial,
	)
	return nil
}
308
309func applyDepsSource(command string) string {
310 return fmt.Sprintf(
311 // check if it exists because not all images have this
312 `if [ -f %s ]; then . %s; export PATH="$PATH:%s"; fi; %s`,
313 guestDevShellEnvPath, guestDevShellEnvPath, guestBasePATH, command,
314 )
315}
316
317func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error {
318 state, ok := w.Data.(*workflowState)
319 if !ok || state == nil || state.Agent == nil {
320 return fmt.Errorf("microVM workflow is not connected to agent")
321 }
322
323 stderr := wfLogger.DataWriter(idx, "stderr")
324
325 execCtx, vmExited, cancelWatch := watchVMExit(ctx, state.VM)
326 defer cancelWatch()
327
328 step := w.Steps[idx]
329 if s, ok := step.(Step); ok && s.action == activationStepAction {
330 err := e.activateConfig(execCtx, wid, state, s, wfLogger.DataWriter(idx, "stdout"))
331 return e.classifyStepError(ctx, wid, step, state, stderr, vmExited, err)
332 }
333 env := []string{
334 "HOME=/workspace",
335 "LOGNAME=" + guestWorkflowUser,
336 "PATH=" + guestBasePATH,
337 "USER=" + guestWorkflowUser,
338 }
339 for k, v := range w.Environment {
340 env = append(env, k+"="+v)
341 }
342 for _, s := range secrets {
343 env = append(env, s.Key+"="+s.Value)
344 }
345 if s, ok := step.(Step); ok {
346 for k, v := range s.environment {
347 env = append(env, k+"="+v)
348 }
349 }
350
351 stdout := wfLogger.DataWriter(idx, "stdout")
352 exitCode, err := state.Agent.Exec(execCtx, AgentExec{
353 ID: fmt.Sprintf("%s-%d", wid.String(), idx),
354 ExecStart: &agentv1.ExecStart{
355 Argv: []string{state.ImageSpec.Shell, "-lc", applyDepsSource(step.Command())},
356 Env: env,
357 Cwd: guestWorkDir,
358 User: guestWorkflowUser,
359 // timeout not set here, Exec will fill it
360 },
361 Stdout: stdout,
362 Stderr: stderr,
363 })
364 if err != nil {
365 return e.classifyStepError(ctx, wid, step, state, stderr, vmExited, err)
366 }
367
368 if exitCode != 0 {
369 e.l.Debug("step exited non-zero", "workflow", wid, "step", step.Name(), "exitCode", exitCode)
370 return engine.ErrWorkflowFailed
371 }
372 return nil
373}
374
375// reads the vm serial logs so we report the tail of that as an error instead of
376// just "guest agent connection lost: EOF"
377func (e *Engine) classifyStepError(ctx context.Context, wid models.WorkflowId, step models.Step, state *workflowState, stderr io.Writer, vmExited *atomic.Bool, err error) error {
378 if err == nil {
379 return nil
380 }
381 l := e.l.With("workflow", wid, "step", step.Name())
382
383 if vmExited != nil && vmExited.Load() {
384 reason := "microVM exited unexpectedly"
385 oom := state.VM != nil && state.VM.OOMKilled()
386 if oom {
387 reason = "microVM killed by OOM (cgroup memory limit exceeded)"
388 }
389 if detail := vmCrashLog(state.VM); detail != "" {
390 fmt.Fprintf(stderr, "%s:\n%s\n", reason, detail)
391 l.Error(reason, "oom", oom, "detail", detail)
392 } else {
393 fmt.Fprintln(stderr, reason)
394 l.Error(reason, "oom", oom)
395 }
396 return errors.New(reason + "; see workflow logs for serial output")
397 }
398
399 if errors.Is(err, errGuestTimedOut) || ctx.Err() != nil {
400 l.Debug("step timed out", "guestReported", errors.Is(err, errGuestTimedOut))
401 return engine.ErrTimedOut
402 }
403
404 // the agent connection dropped while qemu stayed up (eg. the guest kernel
405 // OOM-killed the agent or a guest panic), so surface serial logs, those
406 // will be more helpful.
407 if detail := vmCrashLog(state.VM); detail != "" {
408 fmt.Fprintf(stderr, "step failed (%v):\n%s\n", err, detail)
409 l.Error("step failed", "error", err, "detail", detail)
410 } else {
411 l.Error("step failed", "error", err)
412 }
413 return err
414}
415
416func (e *Engine) activateConfig(ctx context.Context, wid models.WorkflowId, state *workflowState, step Step, out io.Writer) error {
417 cfg := step.config
418 if !cfg.Enabled() {
419 return nil
420 }
421
422 configKey := step.configKey
423 if configKey == "" {
424 configKey = state.ConfigKey
425 }
426
427 userConfigJSON, err := json.Marshal(cfg)
428 if err != nil {
429 return fmt.Errorf("encode user config: %w", err)
430 }
431
432 var cachedToplevel string
433 if configKey != "" {
434 if record, ok, err := state.NixOSToplevelCache.Lookup(configKey); err != nil {
435 return err
436 } else if ok {
437 cachedToplevel = record.Toplevel
438 fmt.Fprintf(out, "realizing cached NixOS config %s\n", cachedToplevel)
439 }
440 }
441 if cachedToplevel == "" {
442 fmt.Fprintf(out, "building NixOS config from user config\n")
443 }
444
445 baseHash, err := BaseConfigHash(state.ImageSpec)
446 if err != nil {
447 return fmt.Errorf("calculate base config hash: %w", err)
448 }
449
450 result, err := state.Agent.ActivateConfig(ctx, fmt.Sprintf("%s-config", wid.String()), &agentv1.ActivateConfig{
451 ConfigKey: configKey,
452 BaseConfigHash: baseHash,
453 UserConfig: string(userConfigJSON),
454 Toplevel: cachedToplevel,
455 }, out)
456 if err != nil {
457 return err
458 }
459 fmt.Fprintf(out, "activated NixOS config toplevel %s\n", result.Toplevel)
460
461 if cachedToplevel != "" || configKey == "" {
462 return nil
463 }
464 if e.cfg.NixCache.UploadURL == "" {
465 e.l.Warn("not committing config cache metadata: no upload URL configured", "workflow", wid, "configKey", configKey, "toplevel", result.Toplevel)
466 return nil
467 }
468
469 if err := e.drainNixCache(ctx, state); err != nil {
470 // a partial upload would leave the cache unable to realize this toplevel,
471 // so skip the metadata commit rather than poison it with an un-realizable
472 // key. the config still activated fine, so don't fail the workflow.
473 e.l.Warn("cache drain failed; skipping config cache metadata commit", "workflow", wid, "configKey", configKey, "toplevel", result.Toplevel, "error", err)
474 return nil
475 }
476 if err := state.NixOSToplevelCache.Commit(configKey, result.Toplevel); err != nil {
477 return err
478 }
479 fmt.Fprintf(out, "committed config cache metadata %s -> %s\n", configKey, result.Toplevel)
480 return nil
481}
482
483func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
484 fns := e.drainCleanups(wid)
485
486 var cleanupErr error
487 for i := len(fns) - 1; i >= 0; i-- {
488 if err := fns[i](ctx); err != nil {
489 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
490 cleanupErr = errors.Join(cleanupErr, err)
491 }
492 }
493 return cleanupErr
494}
495
496func (e *Engine) FinalizeWorkflow(ctx context.Context, wid models.WorkflowId, w *models.Workflow, wfLogger models.WorkflowLogger) error {
497 return nil
498}
499
500func (e *Engine) WorkflowTimeout() time.Duration {
501 d, err := time.ParseDuration(e.cfg.MicroVMPipelines.WorkflowTimeout)
502 if err != nil {
503 d = 5 * time.Minute
504 }
505 return d + guestTimeoutGrace
506}
507
508func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
509 e.cleanupMu.Lock()
510 defer e.cleanupMu.Unlock()
511 key := wid.String()
512 e.cleanup[key] = append(e.cleanup[key], fn)
513}
514
515func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc {
516 e.cleanupMu.Lock()
517 defer e.cleanupMu.Unlock()
518 key := wid.String()
519 fns := e.cleanup[key]
520 delete(e.cleanup, key)
521 return fns
522}
523
524func (e *Engine) cgroupLimits(wid models.WorkflowId, spec ImageSpec) CgroupLimits {
525 cfg := e.cfg.MicroVMPipelines
526 return CgroupLimits{
527 Enabled: cfg.EnableCgroups,
528 Parent: e.cgroupParent,
529 Name: "workflow-" + wid.String(),
530 MemoryMaxMiB: resourcesForImage(spec).MemoryMiB,
531 SwapMaxMiB: cfg.CgroupSwapMaxMiB,
532 PidsMax: cfg.CgroupPidsMax,
533 }
534}