Monorepo for Tangled
tangled.org
1package microvm
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "os"
11 "slices"
12 "sync"
13 "sync/atomic"
14 "time"
15
16 "gopkg.in/yaml.v3"
17
18 "tangled.org/core/api/tangled"
19 "tangled.org/core/log"
20 "tangled.org/core/spindle/agentproto"
21 agentv1 "tangled.org/core/spindle/agentproto/gen"
22 "tangled.org/core/spindle/config"
23 "tangled.org/core/spindle/db"
24 "tangled.org/core/spindle/engine"
25 "tangled.org/core/spindle/models"
26 "tangled.org/core/spindle/secrets"
27)
28
// Guest-side locations and step tags used by the microVM engine.
const (
	// guestWorkDir is the working directory inside the guest for workflow steps.
	guestWorkDir = "/workspace/repo"

	// activationStepAction marks the synthetic step that activates the
	// workflow's NixOS config instead of running a shell command.
	activationStepAction = "activate-config"
)

// Timeouts bounding how long the host waits at each lifecycle phase.
const (
	// agentAcceptTimeout bounds the wait for the guest agent to connect after boot.
	agentAcceptTimeout = 2 * time.Minute
	// agentHandshakeTimeout bounds the agent Init exchange.
	agentHandshakeTimeout = 30 * time.Second
	// cacheDrainTimeout bounds waiting for guest cache uploads to drain before
	// config cache metadata is committed.
	cacheDrainTimeout = 5 * time.Minute
	// vmShutdownTimeout is not referenced in this section; presumably it bounds
	// VM shutdown during cleanup — TODO confirm.
	vmShutdownTimeout = 10 * time.Second
	// guestTimeoutGrace is added to the configured workflow timeout on the host.
	guestTimeoutGrace = 5 * time.Second
)
38
39type cleanupFunc func(context.Context) error
40
41type Engine struct {
42 l *slog.Logger
43 cfg *config.Config
44 db *db.DB
45 agent *agentHub
46 scheduler *engine.ResourceScheduler[Resources]
47 cgroupParent *CgroupParent
48
49 cleanupMu sync.Mutex
50 cleanup map[string][]cleanupFunc
51}
52
53type Step struct {
54 name string
55 kind models.StepKind
56 command string
57 environment map[string]string
58 action string
59 config manifestConfig
60 configKey string
61}
62
63func (s Step) Name() string { return s.name }
64func (s Step) Command() string { return s.command }
65func (s Step) Kind() models.StepKind { return s.kind }
66
67func New(ctx context.Context, cfg *config.Config, d *db.DB) (*Engine, error) {
68 l := log.FromContext(ctx).With("component", "engine.microvm")
69 port := cfg.MicroVMPipelines.AgentPort
70 if port == 0 {
71 port = agentproto.DefaultPort
72 }
73 agent, err := newAgentHub(port, l)
74 if err != nil {
75 return nil, err
76 }
77 budget, max, agingThreshold := newVMBudgetConfig(cfg.MicroVMPipelines)
78 l.Info("initialized microVM workflow budget", "budget", budget.String(), "maxWorkflow", max.String(), "agingThreshold", agingThreshold)
79
80 var cgroupParent *CgroupParent
81 if cfg.MicroVMPipelines.EnableCgroups {
82 cgroupParent, err = initCgroupParent(cfg.MicroVMPipelines.CgroupParent, cfg.MicroVMPipelines.CgroupSupervisorMemoryMinMiB, l)
83 if err != nil {
84 return nil, err
85 }
86 }
87
88 return &Engine{
89 l: l,
90 cfg: cfg,
91 db: d,
92 agent: agent,
93 scheduler: engine.NewResourceScheduler[Resources](budget, max, agingThreshold),
94 cgroupParent: cgroupParent,
95 cleanup: make(map[string][]cleanupFunc),
96 }, nil
97}
98
99func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
100 swf := &models.Workflow{}
101 var dwf manifestWorkflow
102
103 if err := yaml.Unmarshal([]byte(twf.Raw), &dwf); err != nil {
104 return nil, err
105 }
106
107 for _, dstep := range dwf.Steps {
108 swf.Steps = append(swf.Steps, Step{
109 name: dstep.Name,
110 kind: models.StepKindUser,
111 command: dstep.Command,
112 environment: dstep.Environment,
113 })
114 }
115 swf.Name = twf.Name
116 swf.Environment = dwf.Environment
117
118 if tpl.TriggerMetadata != nil {
119 if clone := models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev); clone.Command() != "" {
120 swf.Steps = append([]models.Step{clone}, swf.Steps...)
121 }
122 }
123
124 imageSpec, imageSpecPath, imageName, err := e.resolveImage(dwf.Image)
125 if err != nil {
126 return nil, err
127 }
128 configKey := ""
129 config := manifestConfig{
130 Services: dwf.Services,
131 Virtualisation: dwf.Virtualisation,
132 Dependencies: dwf.Dependencies,
133 Registry: dwf.Registry,
134 }
135 if config.Enabled() {
136 if !imageSpec.SupportsConfigActivation() {
137 return nil, fmt.Errorf(
138 "microVM image %q is not a NixOS image: services, virtualisation, dependencies and registry workflow options require a NixOS image",
139 imageName,
140 )
141 }
142 var err error
143 configKey, err = buildConfigKey(imageSpec, config)
144 if err != nil {
145 return nil, fmt.Errorf("build config key: %w", err)
146 }
147 activationStep := Step{
148 name: "NixOS config activation",
149 kind: models.StepKindSystem,
150 command: "activate nixos config",
151 action: activationStepAction,
152 config: config,
153 configKey: configKey,
154 }
155
156 insertAt := 0
157 if len(swf.Steps) > 0 && swf.Steps[0].Kind() == models.StepKindSystem {
158 insertAt = 1
159 }
160 swf.Steps = append(swf.Steps, nil)
161 copy(swf.Steps[insertAt+1:], swf.Steps[insertAt:])
162 swf.Steps[insertAt] = activationStep
163 }
164
165 cacheURLs, cacheKeys, err := workflowCaches(dwf.Caches)
166 if err != nil {
167 return nil, err
168 }
169
170 swf.Data = &workflowState{
171 ImageSpec: imageSpec,
172 ImageSpecPath: imageSpecPath,
173 Config: config,
174 ConfigKey: configKey,
175 Image: imageName,
176 CacheReadURLs: cacheURLs,
177 CacheTrustedPublicKeys: cacheKeys,
178 NixOSToplevelCache: newNixOSToplevelCacheStore(e.db),
179 }
180 return swf, nil
181}
182
183func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error {
184 l := e.l.With("workflow", wid)
185 setupStep := Step{name: "microVM setup", kind: models.StepKindSystem}
186
187 wfLogger.ControlWriter(-1, setupStep, models.StepStatusStart).Write([]byte{0})
188 defer wfLogger.ControlWriter(-1, setupStep, models.StepStatusEnd).Write([]byte{0})
189
190 state, ok := wf.Data.(*workflowState)
191 if !ok || state == nil {
192 return fmt.Errorf("workflow state is not initialized")
193 }
194
195 cid, err := AllocateCID()
196 if err != nil {
197 return err
198 }
199 connCh, unregister, err := e.agent.expect(cid)
200 if err != nil {
201 return err
202 }
203 defer unregister()
204
205 workDirBase := e.cfg.MicroVMPipelines.OverlayDir
206 if workDirBase == "" {
207 workDirBase = os.TempDir()
208 }
209 workDir, err := os.MkdirTemp(workDirBase, "spindle-microvm-"+wid.String()+"-*")
210 if err != nil {
211 return fmt.Errorf("create workflow microVM directory: %w", err)
212 }
213 state.WorkDir = workDir
214
215 setupDone := false
216 defer func() {
217 if setupDone {
218 return
219 }
220 if err := e.cleanupState(context.Background(), wid, state); err != nil {
221 l.Error("failed to cleanup failed setup", "error", err)
222 }
223 }()
224
225 upstreams, err := BuildCacheUpstreams(e.cfg.NixCache.ReadURLs, state.CacheReadURLs)
226 if err != nil {
227 return err
228 }
229 readCache, err := StartReadCacheProxy(ctx, cid, upstreams, l)
230 if err != nil {
231 return err
232 }
233 state.ReadCache = readCache
234 uploadCache, err := StartUploadCacheProxy(ctx, cid, e.cfg.NixCache.UploadURL, upstreams, l)
235 if err != nil {
236 return err
237 }
238 state.UploadCache = uploadCache
239 dnsProxy, err := StartDNSProxy(ctx, cid, l)
240 if err != nil {
241 return err
242 }
243 state.DNSProxy = dnsProxy
244
245 port := e.cfg.MicroVMPipelines.AgentPort
246 if port == 0 {
247 port = agentproto.DefaultPort
248 }
249 state.ImageSpec.BootArgs = fmt.Sprintf("%s shuttle.vsock_port=%d", state.ImageSpec.BootArgs, port)
250
251 fmt.Fprintf(wfLogger.DataWriter(-1, "stdout"), "starting microVM image %s\n", state.Image)
252 l.Info("starting microVM workflow", "image", state.Image, "imageSpec", state.ImageSpecPath, "cid", cid, "workDir", workDir)
253
254 var vm VMHandle
255 vm, err = StartVM(ctx, VMConfig{
256 Image: state.ImageSpec,
257 CID: cid,
258 EnableKVM: e.cfg.MicroVMPipelines.EnableKVM,
259 WorkDir: workDir,
260 Cgroup: e.cgroupLimits(wid, state.ImageSpec),
261 Dev: e.cfg.Server.Dev,
262 }, l)
263 if err != nil {
264 return err
265 }
266 state.VM = vm
267
268 acceptCtx, cancelAccept := context.WithTimeout(ctx, agentAcceptTimeout)
269 defer cancelAccept()
270 conn, err := waitAgentConn(acceptCtx, connCh)
271 if err != nil {
272 return err
273 }
274
275 agentSession := NewAgentSession(conn, l)
276 initCtx, cancelInit := context.WithTimeout(ctx, agentHandshakeTimeout)
277 defer cancelInit()
278 if err := agentSession.Init(initCtx, &agentv1.Init{
279 JobId: wid.String(),
280 CacheTrustedPublicKeys: append(slices.Clone(e.cfg.NixCache.TrustedPublicKeys), state.CacheTrustedPublicKeys...),
281 CacheReadProxyPort: readCache.Port(),
282 CacheUploadProxyPort: uploadCache.Port(),
283 DnsProxyPort: dnsProxy.Port(),
284 }); err != nil {
285 _ = agentSession.Close()
286 return err
287 }
288 state.Agent = agentSession
289 wf.Data = state
290
291 e.registerCleanup(wid, func(ctx context.Context) error {
292 return e.cleanupState(ctx, wid, state)
293 })
294 setupDone = true
295
296 fmt.Fprintf(wfLogger.DataWriter(-1, "stdout"),
297 "agent connected; serial log: %s\n", vm.Logs().Serial,
298 )
299 return nil
300}
301
302func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error {
303 state, ok := w.Data.(*workflowState)
304 if !ok || state == nil || state.Agent == nil {
305 return fmt.Errorf("microVM workflow is not connected to agent")
306 }
307
308 stderr := wfLogger.DataWriter(idx, "stderr")
309
310 execCtx, vmExited, cancelWatch := watchVMExit(ctx, state.VM)
311 defer cancelWatch()
312
313 step := w.Steps[idx]
314 if s, ok := step.(Step); ok && s.action == activationStepAction {
315 err := e.activateConfig(execCtx, wid, state, s, wfLogger.DataWriter(idx, "stdout"))
316 return e.classifyStepError(ctx, wid, step, state, stderr, vmExited, err)
317 }
318 env := []string{
319 "HOME=/workspace",
320 "LOGNAME=" + guestWorkflowUser,
321 "PATH=/run/current-system/sw/bin:/nix/var/nix/profiles/default/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
322 "USER=" + guestWorkflowUser,
323 }
324 for k, v := range w.Environment {
325 env = append(env, k+"="+v)
326 }
327 for _, s := range secrets {
328 env = append(env, s.Key+"="+s.Value)
329 }
330 if s, ok := step.(Step); ok {
331 for k, v := range s.environment {
332 env = append(env, k+"="+v)
333 }
334 }
335
336 stdout := wfLogger.DataWriter(idx, "stdout")
337 exitCode, err := state.Agent.Exec(execCtx, AgentExec{
338 ID: fmt.Sprintf("%s-%d", wid.String(), idx),
339 ExecStart: &agentv1.ExecStart{
340 Argv: []string{state.ImageSpec.Shell, "-lc", step.Command()},
341 Env: env,
342 Cwd: guestWorkDir,
343 User: guestWorkflowUser,
344 // timeout not set here, Exec will fill it
345 },
346 Stdout: stdout,
347 Stderr: stderr,
348 })
349 if err != nil {
350 return e.classifyStepError(ctx, wid, step, state, stderr, vmExited, err)
351 }
352
353 if exitCode != 0 {
354 e.l.Debug("step exited non-zero", "workflow", wid, "step", step.Name(), "exitCode", exitCode)
355 return engine.ErrWorkflowFailed
356 }
357 return nil
358}
359
360// reads the vm serial logs so we report the tail of that as an error instead of
361// just "guest agent connection lost: EOF"
362func (e *Engine) classifyStepError(ctx context.Context, wid models.WorkflowId, step models.Step, state *workflowState, stderr io.Writer, vmExited *atomic.Bool, err error) error {
363 if err == nil {
364 return nil
365 }
366 l := e.l.With("workflow", wid, "step", step.Name())
367
368 if vmExited != nil && vmExited.Load() {
369 reason := "microVM exited unexpectedly"
370 oom := state.VM != nil && state.VM.OOMKilled()
371 if oom {
372 reason = "microVM killed by OOM (cgroup memory limit exceeded)"
373 }
374 if detail := vmCrashLog(state.VM); detail != "" {
375 fmt.Fprintf(stderr, "%s:\n%s\n", reason, detail)
376 l.Debug(reason, "oom", oom, "detail", detail)
377 } else {
378 fmt.Fprintln(stderr, reason)
379 l.Debug(reason, "oom", oom)
380 }
381 return errors.New(reason + "; see workflow logs for serial output")
382 }
383
384 if errors.Is(err, errGuestTimedOut) || ctx.Err() != nil {
385 l.Debug("step timed out", "guestReported", errors.Is(err, errGuestTimedOut))
386 return engine.ErrTimedOut
387 }
388
389 // the agent connection dropped while qemu stayed up (eg. the guest kernel
390 // OOM-killed the agent or a guest panic), so surface serial logs, those
391 // will be more helpful.
392 if detail := vmCrashLog(state.VM); detail != "" {
393 fmt.Fprintf(stderr, "step failed (%v):\n%s\n", err, detail)
394 l.Debug("step failed", "error", err, "detail", detail)
395 } else {
396 l.Debug("step failed", "error", err)
397 }
398 return err
399}
400
401func (e *Engine) activateConfig(ctx context.Context, wid models.WorkflowId, state *workflowState, step Step, out io.Writer) error {
402 cfg := step.config
403 if !cfg.Enabled() {
404 return nil
405 }
406
407 configKey := step.configKey
408 if configKey == "" {
409 configKey = state.ConfigKey
410 }
411
412 userConfigJSON, err := json.Marshal(cfg)
413 if err != nil {
414 return fmt.Errorf("encode user config: %w", err)
415 }
416
417 var cachedToplevel string
418 if configKey != "" {
419 if record, ok, err := state.NixOSToplevelCache.Lookup(configKey); err != nil {
420 return err
421 } else if ok {
422 cachedToplevel = record.Toplevel
423 fmt.Fprintf(out, "realizing cached NixOS config %s\n", cachedToplevel)
424 }
425 }
426 if cachedToplevel == "" {
427 fmt.Fprintf(out, "building NixOS config from user config\n")
428 }
429
430 baseHash, err := BaseConfigHash(state.ImageSpec)
431 if err != nil {
432 return fmt.Errorf("calculate base config hash: %w", err)
433 }
434
435 result, err := state.Agent.ActivateConfig(ctx, fmt.Sprintf("%s-config", wid.String()), &agentv1.ActivateConfig{
436 ConfigKey: configKey,
437 BaseConfigHash: baseHash,
438 UserConfig: string(userConfigJSON),
439 Toplevel: cachedToplevel,
440 })
441 if err != nil {
442 return err
443 }
444 fmt.Fprintf(out, "activated NixOS config toplevel %s\n", result.Toplevel)
445
446 if cachedToplevel != "" || configKey == "" {
447 return nil
448 }
449 if e.cfg.NixCache.UploadURL == "" {
450 e.l.Warn("not committing config cache metadata: no upload URL configured", "workflow", wid, "configKey", configKey, "toplevel", result.Toplevel)
451 return nil
452 }
453
454 drainCtx, cancel := context.WithTimeout(ctx, cacheDrainTimeout)
455 defer cancel()
456 if _, err := state.Agent.Drain(drainCtx); err != nil {
457 return fmt.Errorf("drain config cache uploads before metadata commit: %w", err)
458 }
459 if err := state.NixOSToplevelCache.Commit(configKey, result.Toplevel); err != nil {
460 return err
461 }
462 fmt.Fprintf(out, "committed config cache metadata %s -> %s\n", configKey, result.Toplevel)
463 return nil
464}
465
466func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
467 fns := e.drainCleanups(wid)
468
469 var cleanupErr error
470 for i := len(fns) - 1; i >= 0; i-- {
471 if err := fns[i](ctx); err != nil {
472 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
473 cleanupErr = errors.Join(cleanupErr, err)
474 }
475 }
476 return cleanupErr
477}
478
479func (e *Engine) FinalizeWorkflow(ctx context.Context, wid models.WorkflowId, w *models.Workflow, wfLogger models.WorkflowLogger) error {
480 return nil
481}
482
483func (e *Engine) WorkflowTimeout() time.Duration {
484 d, err := time.ParseDuration(e.cfg.MicroVMPipelines.WorkflowTimeout)
485 if err != nil {
486 d = 5 * time.Minute
487 }
488 return d + guestTimeoutGrace
489}
490
491func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
492 e.cleanupMu.Lock()
493 defer e.cleanupMu.Unlock()
494 key := wid.String()
495 e.cleanup[key] = append(e.cleanup[key], fn)
496}
497
498func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc {
499 e.cleanupMu.Lock()
500 defer e.cleanupMu.Unlock()
501 key := wid.String()
502 fns := e.cleanup[key]
503 delete(e.cleanup, key)
504 return fns
505}
506
507func (e *Engine) cgroupLimits(wid models.WorkflowId, spec ImageSpec) CgroupLimits {
508 cfg := e.cfg.MicroVMPipelines
509 return CgroupLimits{
510 Enabled: cfg.EnableCgroups,
511 Parent: e.cgroupParent,
512 Name: "workflow-" + wid.String(),
513 MemoryMaxMiB: resourcesForImage(spec).MemoryMiB,
514 SwapMaxMiB: cfg.CgroupSwapMaxMiB,
515 PidsMax: cfg.CgroupPidsMax,
516 }
517}