Monorepo for Tangled
tangled.org
1package microvm
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "os"
11 "path/filepath"
12 "slices"
13 "sync"
14 "sync/atomic"
15 "time"
16
17 "gopkg.in/yaml.v3"
18
19 "tangled.org/core/api/tangled"
20 "tangled.org/core/log"
21 "tangled.org/core/spindle/agentproto"
22 agentv1 "tangled.org/core/spindle/agentproto/gen"
23 "tangled.org/core/spindle/config"
24 "tangled.org/core/spindle/db"
25 "tangled.org/core/spindle/engine"
26 "tangled.org/core/spindle/models"
27 "tangled.org/core/spindle/secrets"
28)
29
// Timeouts for host-side phases of a microVM workflow.
const (
	// agentAcceptTimeout bounds how long SetupWorkflow waits for the guest
	// agent to connect after the VM has been started.
	agentAcceptTimeout = 2 * time.Minute
	// agentHandshakeTimeout bounds the agent Init handshake in SetupWorkflow.
	agentHandshakeTimeout = 30 * time.Second
	// cacheDrainTimeout is not referenced in this part of the file;
	// presumably it bounds draining the nix upload cache — TODO confirm.
	cacheDrainTimeout = 5 * time.Minute
	// vmShutdownTimeout is not referenced in this part of the file;
	// presumably it bounds VM teardown — TODO confirm.
	vmShutdownTimeout = 10 * time.Second
	// guestTimeoutGrace is added on top of the configured workflow timeout
	// by WorkflowTimeout.
	guestTimeoutGrace = 5 * time.Second
)

// Guest-side layout and step markers.
const (
	// guestWorkDir is the working directory RunStep uses for step commands.
	guestWorkDir = "/workspace/repo"
	// activationStepAction tags the synthetic Step that RunStep hands to
	// activateConfig instead of executing it as a shell command.
	activationStepAction = "activate-config"
)

// cleanupFunc releases one resource owned by a workflow. Functions are
// recorded with registerCleanup and run in reverse registration order by
// DestroyWorkflow.
type cleanupFunc func(context.Context) error
41
42type Engine struct {
43 l *slog.Logger
44 cfg *config.Config
45 db *db.DB
46 agent *agentHub
47 scheduler *engine.ResourceScheduler[Resources]
48 cgroupParent *CgroupParent
49
50 cleanupMu sync.Mutex
51 cleanup map[string][]cleanupFunc
52}
53
54type Step struct {
55 name string
56 kind models.StepKind
57 command string
58 environment map[string]string
59 action string
60 config manifestConfig
61 configKey string
62}
63
64func (s Step) Name() string { return s.name }
65func (s Step) Command() string { return s.command }
66func (s Step) Kind() models.StepKind { return s.kind }
67
68func New(ctx context.Context, cfg *config.Config, d *db.DB) (*Engine, error) {
69 l := log.FromContext(ctx).With("component", "engine.microvm")
70 port := cfg.MicroVMPipelines.AgentPort
71 if port == 0 {
72 port = agentproto.DefaultPort
73 }
74 agent, err := newAgentHub(port, l)
75 if err != nil {
76 return nil, err
77 }
78 budget, max, agingThreshold := newVMBudgetConfig(cfg.MicroVMPipelines)
79 l.Info("initialized microVM workflow budget", "budget", budget.String(), "maxWorkflow", max.String(), "agingThreshold", agingThreshold)
80
81 var cgroupParent *CgroupParent
82 if cfg.MicroVMPipelines.EnableCgroups {
83 cgroupParent, err = initCgroupParent(cfg.MicroVMPipelines.CgroupParent, cfg.MicroVMPipelines.CgroupSupervisorMemoryMinMiB, l)
84 if err != nil {
85 return nil, err
86 }
87 }
88
89 return &Engine{
90 l: l,
91 cfg: cfg,
92 db: d,
93 agent: agent,
94 scheduler: engine.NewResourceScheduler[Resources](budget, max, agingThreshold),
95 cgroupParent: cgroupParent,
96 cleanup: make(map[string][]cleanupFunc),
97 }, nil
98}
99
100func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
101 swf := &models.Workflow{}
102 var dwf manifestWorkflow
103
104 if err := yaml.Unmarshal([]byte(twf.Raw), &dwf); err != nil {
105 return nil, err
106 }
107
108 for _, dstep := range dwf.Steps {
109 swf.Steps = append(swf.Steps, Step{
110 name: dstep.Name,
111 kind: models.StepKindUser,
112 command: dstep.Command,
113 environment: dstep.Environment,
114 })
115 }
116 swf.Name = twf.Name
117 swf.Environment = dwf.Environment
118
119 if tpl.TriggerMetadata != nil {
120 if clone := models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev); clone.Command() != "" {
121 swf.Steps = append([]models.Step{clone}, swf.Steps...)
122 }
123 }
124
125 imageSpec, imageSpecPath, imageName, err := e.resolveImage(dwf.Image)
126 if err != nil {
127 return nil, err
128 }
129 configKey := ""
130 config := manifestConfig{
131 Services: dwf.Services,
132 Virtualisation: dwf.Virtualisation,
133 Dependencies: dwf.Dependencies,
134 Registry: dwf.Registry,
135 }
136 if config.Enabled() {
137 if !imageSpec.SupportsConfigActivation() {
138 return nil, fmt.Errorf(
139 "microVM image %q is not a NixOS image: services, virtualisation, dependencies and registry workflow options require a NixOS image",
140 imageName,
141 )
142 }
143 var err error
144 configKey, err = buildConfigKey(imageSpec, config)
145 if err != nil {
146 return nil, fmt.Errorf("build config key: %w", err)
147 }
148 activationStep := Step{
149 name: "NixOS config activation",
150 kind: models.StepKindSystem,
151 command: "activate nixos config",
152 action: activationStepAction,
153 config: config,
154 configKey: configKey,
155 }
156
157 insertAt := 0
158 if len(swf.Steps) > 0 && swf.Steps[0].Kind() == models.StepKindSystem {
159 insertAt = 1
160 }
161 swf.Steps = append(swf.Steps, nil)
162 copy(swf.Steps[insertAt+1:], swf.Steps[insertAt:])
163 swf.Steps[insertAt] = activationStep
164 }
165
166 cacheURLs, cacheKeys, err := workflowCaches(dwf.Caches)
167 if err != nil {
168 return nil, err
169 }
170
171 swf.Data = &workflowState{
172 ImageSpec: imageSpec,
173 ImageSpecPath: imageSpecPath,
174 Config: config,
175 ConfigKey: configKey,
176 Image: imageName,
177 CacheReadURLs: cacheURLs,
178 CacheTrustedPublicKeys: cacheKeys,
179 NixOSToplevelCache: newNixOSToplevelCacheStore(e.db),
180 }
181 return swf, nil
182}
183
// SetupWorkflow boots the microVM for workflow wid and connects to its guest
// agent. It expects wf.Data to already hold the *workflowState built by
// InitWorkflow.
//
// In order, it:
//   - allocates a guest CID and registers interest in that CID's agent
//     connection;
//   - creates a per-workflow directory under OverlayDir (or os.TempDir());
//   - starts the read-cache, upload-cache and DNS proxies;
//   - appends the agent port to the image boot args;
//   - starts the VM;
//   - waits up to agentAcceptTimeout for the agent to connect, then performs
//     the Init handshake within agentHandshakeTimeout.
//
// Each acquired resource is recorded on state as it is created. If setup
// fails part-way, a deferred cleanupState tears down whatever was recorded.
// On success, the same cleanup is registered for DestroyWorkflow instead.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error {
	l := e.l.With("workflow", wid)
	setupStep := Step{name: "microVM setup", kind: models.StepKindSystem}

	wfLogger.ControlWriter(-1, setupStep, models.StepStatusStart).Write([]byte{0})
	// NOTE(review): per Go defer semantics, ControlWriter(...) below is called
	// now; only the Write runs at function return.
	defer wfLogger.ControlWriter(-1, setupStep, models.StepStatusEnd).Write([]byte{0})

	state, ok := wf.Data.(*workflowState)
	if !ok || state == nil {
		return fmt.Errorf("workflow state is not initialized")
	}

	cid, err := AllocateCID()
	if err != nil {
		return err
	}
	// Register for the agent connection before the VM exists, presumably so an
	// early dial-in from the guest is not missed.
	connCh, unregister, err := e.agent.expect(cid)
	if err != nil {
		return err
	}
	defer unregister()

	workDirBase := e.cfg.MicroVMPipelines.OverlayDir
	if workDirBase == "" {
		workDirBase = os.TempDir()
	}
	workDir, err := os.MkdirTemp(workDirBase, "spindle-microvm-"+wid.String()+"-*")
	if err != nil {
		return fmt.Errorf("create workflow microVM directory: %w", err)
	}
	state.WorkDir = workDir

	// Until setupDone is set, any early return tears down the resources
	// recorded on state so far. Background is used so cleanup still runs if
	// ctx has been cancelled.
	setupDone := false
	defer func() {
		if setupDone {
			return
		}
		if err := e.cleanupState(context.Background(), wid, state); err != nil {
			l.Error("failed to cleanup failed setup", "error", err)
		}
	}()

	upstreams, err := BuildCacheUpstreams(e.cfg.NixCache.ReadURLs, state.CacheReadURLs)
	if err != nil {
		return err
	}
	readCache, err := StartReadCacheProxy(ctx, cid, upstreams, l)
	if err != nil {
		return err
	}
	state.ReadCache = readCache
	stagingDir := filepath.Join(workDir, "upload-cache")
	uploadCache, err := StartUploadCacheProxy(ctx, cid, e.cfg.NixCache.UploadURL, upstreams, stagingDir, l)
	if err != nil {
		return err
	}
	state.UploadCache = uploadCache
	dnsProxy, err := StartDNSProxy(ctx, cid, l)
	if err != nil {
		return err
	}
	state.DNSProxy = dnsProxy

	port := e.cfg.MicroVMPipelines.AgentPort
	if port == 0 {
		port = agentproto.DefaultPort
	}
	// Pass the agent port to the guest on the kernel command line.
	// NOTE(review): this mutates state.ImageSpec in place, so calling
	// SetupWorkflow twice with the same state would append the flag twice.
	state.ImageSpec.BootArgs = fmt.Sprintf("%s shuttle.vsock_port=%d", state.ImageSpec.BootArgs, port)

	fmt.Fprintf(wfLogger.DataWriter(-1, "stdout"), "starting microVM image %s\n", state.Image)
	l.Info("starting microVM workflow", "image", state.Image, "imageSpec", state.ImageSpecPath, "cid", cid, "workDir", workDir)

	var vm VMHandle
	vm, err = StartVM(ctx, VMConfig{
		Image:     state.ImageSpec,
		CID:       cid,
		EnableKVM: e.cfg.MicroVMPipelines.EnableKVM,
		WorkDir:   workDir,
		Cgroup:    e.cgroupLimits(wid, state.ImageSpec),
		Dev:       e.cfg.Server.Dev,
	}, l)
	if err != nil {
		return err
	}
	state.VM = vm

	acceptCtx, cancelAccept := context.WithTimeout(ctx, agentAcceptTimeout)
	defer cancelAccept()
	conn, err := waitAgentConn(acceptCtx, connCh)
	if err != nil {
		return err
	}

	agentSession := NewAgentSession(conn, l)
	initCtx, cancelInit := context.WithTimeout(ctx, agentHandshakeTimeout)
	defer cancelInit()
	if err := agentSession.Init(initCtx, &agentv1.Init{
		JobId: wid.String(),
		// Clone first so appending the workflow keys cannot write into the
		// backing array of the shared config slice.
		CacheTrustedPublicKeys: append(slices.Clone(e.cfg.NixCache.TrustedPublicKeys), state.CacheTrustedPublicKeys...),
		CacheReadProxyPort:     readCache.Port(),
		CacheUploadProxyPort:   uploadCache.Port(),
		DnsProxyPort:           dnsProxy.Port(),
	}); err != nil {
		// The session is not stored on state yet, so close it here.
		_ = agentSession.Close()
		return err
	}
	state.Agent = agentSession
	wf.Data = state

	// Hand ownership of teardown to DestroyWorkflow and disarm the
	// failed-setup cleanup above.
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.cleanupState(ctx, wid, state)
	})
	setupDone = true

	fmt.Fprintf(wfLogger.DataWriter(-1, "stdout"),
		"agent connected; serial log: %s\n", vm.Logs().Serial,
	)
	return nil
}
303
304func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error {
305 state, ok := w.Data.(*workflowState)
306 if !ok || state == nil || state.Agent == nil {
307 return fmt.Errorf("microVM workflow is not connected to agent")
308 }
309
310 stderr := wfLogger.DataWriter(idx, "stderr")
311
312 execCtx, vmExited, cancelWatch := watchVMExit(ctx, state.VM)
313 defer cancelWatch()
314
315 step := w.Steps[idx]
316 if s, ok := step.(Step); ok && s.action == activationStepAction {
317 err := e.activateConfig(execCtx, wid, state, s, wfLogger.DataWriter(idx, "stdout"))
318 return e.classifyStepError(ctx, wid, step, state, stderr, vmExited, err)
319 }
320 env := []string{
321 "HOME=/workspace",
322 "LOGNAME=" + guestWorkflowUser,
323 "PATH=/run/current-system/sw/bin:/nix/var/nix/profiles/default/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
324 "USER=" + guestWorkflowUser,
325 }
326 for k, v := range w.Environment {
327 env = append(env, k+"="+v)
328 }
329 for _, s := range secrets {
330 env = append(env, s.Key+"="+s.Value)
331 }
332 if s, ok := step.(Step); ok {
333 for k, v := range s.environment {
334 env = append(env, k+"="+v)
335 }
336 }
337
338 stdout := wfLogger.DataWriter(idx, "stdout")
339 exitCode, err := state.Agent.Exec(execCtx, AgentExec{
340 ID: fmt.Sprintf("%s-%d", wid.String(), idx),
341 ExecStart: &agentv1.ExecStart{
342 Argv: []string{state.ImageSpec.Shell, "-lc", step.Command()},
343 Env: env,
344 Cwd: guestWorkDir,
345 User: guestWorkflowUser,
346 // timeout not set here, Exec will fill it
347 },
348 Stdout: stdout,
349 Stderr: stderr,
350 })
351 if err != nil {
352 return e.classifyStepError(ctx, wid, step, state, stderr, vmExited, err)
353 }
354
355 if exitCode != 0 {
356 e.l.Debug("step exited non-zero", "workflow", wid, "step", step.Name(), "exitCode", exitCode)
357 return engine.ErrWorkflowFailed
358 }
359 return nil
360}
361
362// reads the vm serial logs so we report the tail of that as an error instead of
363// just "guest agent connection lost: EOF"
364func (e *Engine) classifyStepError(ctx context.Context, wid models.WorkflowId, step models.Step, state *workflowState, stderr io.Writer, vmExited *atomic.Bool, err error) error {
365 if err == nil {
366 return nil
367 }
368 l := e.l.With("workflow", wid, "step", step.Name())
369
370 if vmExited != nil && vmExited.Load() {
371 reason := "microVM exited unexpectedly"
372 oom := state.VM != nil && state.VM.OOMKilled()
373 if oom {
374 reason = "microVM killed by OOM (cgroup memory limit exceeded)"
375 }
376 if detail := vmCrashLog(state.VM); detail != "" {
377 fmt.Fprintf(stderr, "%s:\n%s\n", reason, detail)
378 l.Debug(reason, "oom", oom, "detail", detail)
379 } else {
380 fmt.Fprintln(stderr, reason)
381 l.Debug(reason, "oom", oom)
382 }
383 return errors.New(reason + "; see workflow logs for serial output")
384 }
385
386 if errors.Is(err, errGuestTimedOut) || ctx.Err() != nil {
387 l.Debug("step timed out", "guestReported", errors.Is(err, errGuestTimedOut))
388 return engine.ErrTimedOut
389 }
390
391 // the agent connection dropped while qemu stayed up (eg. the guest kernel
392 // OOM-killed the agent or a guest panic), so surface serial logs, those
393 // will be more helpful.
394 if detail := vmCrashLog(state.VM); detail != "" {
395 fmt.Fprintf(stderr, "step failed (%v):\n%s\n", err, detail)
396 l.Debug("step failed", "error", err, "detail", detail)
397 } else {
398 l.Debug("step failed", "error", err)
399 }
400 return err
401}
402
403func (e *Engine) activateConfig(ctx context.Context, wid models.WorkflowId, state *workflowState, step Step, out io.Writer) error {
404 cfg := step.config
405 if !cfg.Enabled() {
406 return nil
407 }
408
409 configKey := step.configKey
410 if configKey == "" {
411 configKey = state.ConfigKey
412 }
413
414 userConfigJSON, err := json.Marshal(cfg)
415 if err != nil {
416 return fmt.Errorf("encode user config: %w", err)
417 }
418
419 var cachedToplevel string
420 if configKey != "" {
421 if record, ok, err := state.NixOSToplevelCache.Lookup(configKey); err != nil {
422 return err
423 } else if ok {
424 cachedToplevel = record.Toplevel
425 fmt.Fprintf(out, "realizing cached NixOS config %s\n", cachedToplevel)
426 }
427 }
428 if cachedToplevel == "" {
429 fmt.Fprintf(out, "building NixOS config from user config\n")
430 }
431
432 baseHash, err := BaseConfigHash(state.ImageSpec)
433 if err != nil {
434 return fmt.Errorf("calculate base config hash: %w", err)
435 }
436
437 result, err := state.Agent.ActivateConfig(ctx, fmt.Sprintf("%s-config", wid.String()), &agentv1.ActivateConfig{
438 ConfigKey: configKey,
439 BaseConfigHash: baseHash,
440 UserConfig: string(userConfigJSON),
441 Toplevel: cachedToplevel,
442 })
443 if err != nil {
444 return err
445 }
446 fmt.Fprintf(out, "activated NixOS config toplevel %s\n", result.Toplevel)
447
448 if cachedToplevel != "" || configKey == "" {
449 return nil
450 }
451 if e.cfg.NixCache.UploadURL == "" {
452 e.l.Warn("not committing config cache metadata: no upload URL configured", "workflow", wid, "configKey", configKey, "toplevel", result.Toplevel)
453 return nil
454 }
455
456 if err := e.drainNixCache(ctx, state); err != nil {
457 // a partial upload would leave the cache unable to realize this toplevel,
458 // so skip the metadata commit rather than poison it with an un-realizable
459 // key. the config still activated fine, so don't fail the workflow.
460 e.l.Warn("cache drain failed; skipping config cache metadata commit", "workflow", wid, "configKey", configKey, "toplevel", result.Toplevel, "error", err)
461 return nil
462 }
463 if err := state.NixOSToplevelCache.Commit(configKey, result.Toplevel); err != nil {
464 return err
465 }
466 fmt.Fprintf(out, "committed config cache metadata %s -> %s\n", configKey, result.Toplevel)
467 return nil
468}
469
470func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
471 fns := e.drainCleanups(wid)
472
473 var cleanupErr error
474 for i := len(fns) - 1; i >= 0; i-- {
475 if err := fns[i](ctx); err != nil {
476 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
477 cleanupErr = errors.Join(cleanupErr, err)
478 }
479 }
480 return cleanupErr
481}
482
483func (e *Engine) FinalizeWorkflow(ctx context.Context, wid models.WorkflowId, w *models.Workflow, wfLogger models.WorkflowLogger) error {
484 return nil
485}
486
487func (e *Engine) WorkflowTimeout() time.Duration {
488 d, err := time.ParseDuration(e.cfg.MicroVMPipelines.WorkflowTimeout)
489 if err != nil {
490 d = 5 * time.Minute
491 }
492 return d + guestTimeoutGrace
493}
494
495func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
496 e.cleanupMu.Lock()
497 defer e.cleanupMu.Unlock()
498 key := wid.String()
499 e.cleanup[key] = append(e.cleanup[key], fn)
500}
501
502func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc {
503 e.cleanupMu.Lock()
504 defer e.cleanupMu.Unlock()
505 key := wid.String()
506 fns := e.cleanup[key]
507 delete(e.cleanup, key)
508 return fns
509}
510
511func (e *Engine) cgroupLimits(wid models.WorkflowId, spec ImageSpec) CgroupLimits {
512 cfg := e.cfg.MicroVMPipelines
513 return CgroupLimits{
514 Enabled: cfg.EnableCgroups,
515 Parent: e.cgroupParent,
516 Name: "workflow-" + wid.String(),
517 MemoryMaxMiB: resourcesForImage(spec).MemoryMiB,
518 SwapMaxMiB: cfg.CgroupSwapMaxMiB,
519 PidsMax: cfg.CgroupPidsMax,
520 }
521}