Monorepo for Tangled
tangled.org
1package microvm
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "os"
11 "path/filepath"
12 "slices"
13 "sync"
14 "sync/atomic"
15 "time"
16
17 "gopkg.in/yaml.v3"
18
19 "tangled.org/core/api/tangled"
20 "tangled.org/core/log"
21 "tangled.org/core/spindle/agentproto"
22 agentv1 "tangled.org/core/spindle/agentproto/gen"
23 "tangled.org/core/spindle/config"
24 "tangled.org/core/spindle/db"
25 "tangled.org/core/spindle/engine"
26 "tangled.org/core/spindle/models"
27 "tangled.org/core/spindle/secrets"
28)
29
// Guest-side locations and the marker that identifies the config activation
// step.
const (
	// guestWorkDir is the working directory steps are executed in inside the guest.
	guestWorkDir = "/workspace/repo"

	// guestBasePATH is the PATH given to every step. It is also appended to
	// PATH again after the devshell env file has been sourced.
	guestBasePATH = "/run/current-system/sw/bin:/nix/var/nix/profiles/default/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"

	// guestDevShellEnvPath is sourced before a step's command when it exists
	// in the guest; not every image ships it.
	guestDevShellEnvPath = "/run/spindle/devshell-env.sh"

	// activationStepAction marks the system step that activates the NixOS
	// config instead of running a shell command.
	activationStepAction = "activate-config"
)

// Host-side time limits.
const (
	// agentAcceptTimeout bounds the wait for the guest agent to connect after
	// the VM has been started.
	agentAcceptTimeout = 2 * time.Minute

	// agentHandshakeTimeout bounds the agent Init exchange.
	agentHandshakeTimeout = 30 * time.Second

	// cacheDrainTimeout is not referenced in this part of the file;
	// presumably it bounds draining the nix cache upload -- confirm at its use site.
	cacheDrainTimeout = 5 * time.Minute

	// vmShutdownTimeout is not referenced in this part of the file;
	// presumably it bounds VM shutdown -- confirm at its use site.
	vmShutdownTimeout = 10 * time.Second

	// guestTimeoutGrace is added on top of the configured workflow timeout by
	// WorkflowTimeout.
	guestTimeoutGrace = 5 * time.Second
)
41
// cleanupFunc releases one resource held on behalf of a workflow. Functions of
// this type are registered per workflow and run by DestroyWorkflow.
type cleanupFunc func(ctx context.Context) error
43
44type Engine struct {
45 l *slog.Logger
46 cfg *config.Config
47 db *db.DB
48 agent *agentHub
49 scheduler *engine.ResourceScheduler[Resources]
50 cgroupParent *CgroupParent
51
52 cleanupMu sync.Mutex
53 cleanup map[string][]cleanupFunc
54}
55
56type Step struct {
57 name string
58 kind models.StepKind
59 command string
60 environment map[string]string
61 action string
62 config manifestConfig
63 configKey string
64}
65
66func (s Step) Name() string { return s.name }
67func (s Step) Command() string { return s.command }
68func (s Step) Kind() models.StepKind { return s.kind }
69
70func New(ctx context.Context, cfg *config.Config, d *db.DB) (*Engine, error) {
71 l := log.FromContext(ctx).With("component", "engine.microvm")
72 port := cfg.MicroVMPipelines.AgentPort
73 if port == 0 {
74 port = agentproto.DefaultPort
75 }
76 agent, err := newAgentHub(port, l)
77 if err != nil {
78 return nil, err
79 }
80 budget, max, agingThreshold := newVMBudgetConfig(cfg.MicroVMPipelines)
81 l.Info("initialized microVM workflow budget", "budget", budget.String(), "maxWorkflow", max.String(), "agingThreshold", agingThreshold)
82
83 var cgroupParent *CgroupParent
84 if cfg.MicroVMPipelines.EnableCgroups {
85 cgroupParent, err = initCgroupParent(cfg.MicroVMPipelines.CgroupParent, cfg.MicroVMPipelines.CgroupSupervisorMemoryMinMiB, l)
86 if err != nil {
87 return nil, err
88 }
89 }
90
91 return &Engine{
92 l: l,
93 cfg: cfg,
94 db: d,
95 agent: agent,
96 scheduler: engine.NewResourceScheduler(budget, max, agingThreshold),
97 cgroupParent: cgroupParent,
98 cleanup: make(map[string][]cleanupFunc),
99 }, nil
100}
101
102func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
103 swf := &models.Workflow{}
104 var dwf manifestWorkflow
105
106 if err := yaml.Unmarshal([]byte(twf.Raw), &dwf); err != nil {
107 return nil, err
108 }
109
110 for _, dstep := range dwf.Steps {
111 swf.Steps = append(swf.Steps, Step{
112 name: dstep.Name,
113 kind: models.StepKindUser,
114 command: dstep.Command,
115 environment: dstep.Environment,
116 })
117 }
118 swf.Name = twf.Name
119 swf.Environment = dwf.Environment
120
121 if tpl.TriggerMetadata != nil {
122 if clone := models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev); clone.Command() != "" {
123 swf.Steps = append([]models.Step{clone}, swf.Steps...)
124 }
125 }
126
127 imageSpec, imageSpecPath, imageName, err := e.resolveImage(dwf.Image)
128 if err != nil {
129 return nil, err
130 }
131 configKey := ""
132 config := manifestConfig{
133 Services: dwf.Services,
134 Virtualisation: dwf.Virtualisation,
135 Dependencies: dwf.Dependencies,
136 Registry: dwf.Registry,
137 }
138 if config.Enabled() {
139 if !imageSpec.SupportsConfigActivation() {
140 return nil, fmt.Errorf(
141 "microVM image %q is not a NixOS image: services, virtualisation, dependencies and registry workflow options require a NixOS image",
142 imageName,
143 )
144 }
145 var err error
146 configKey, err = buildConfigKey(imageSpec, config)
147 if err != nil {
148 return nil, fmt.Errorf("build config key: %w", err)
149 }
150 activationStep := Step{
151 name: "NixOS config activation",
152 kind: models.StepKindSystem,
153 command: "activate nixos config",
154 action: activationStepAction,
155 config: config,
156 configKey: configKey,
157 }
158
159 insertAt := 0
160 if len(swf.Steps) > 0 && swf.Steps[0].Kind() == models.StepKindSystem {
161 insertAt = 1
162 }
163 swf.Steps = append(swf.Steps, nil)
164 copy(swf.Steps[insertAt+1:], swf.Steps[insertAt:])
165 swf.Steps[insertAt] = activationStep
166 }
167
168 cacheURLs, cacheKeys, err := workflowCaches(dwf.Caches)
169 if err != nil {
170 return nil, err
171 }
172
173 swf.Data = &workflowState{
174 ImageSpec: imageSpec,
175 ImageSpecPath: imageSpecPath,
176 Config: config,
177 ConfigKey: configKey,
178 Image: imageName,
179 CacheReadURLs: cacheURLs,
180 CacheTrustedPublicKeys: cacheKeys,
181 NixOSToplevelCache: newNixOSToplevelCacheStore(e.db),
182 }
183 return swf, nil
184}
185
186func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error {
187 l := e.l.With("workflow", wid)
188 setupStep := Step{name: "microVM setup", kind: models.StepKindSystem}
189
190 wfLogger.ControlWriter(-1, setupStep, models.StepStatusStart).Write([]byte{0})
191 defer wfLogger.ControlWriter(-1, setupStep, models.StepStatusEnd).Write([]byte{0})
192
193 state, ok := wf.Data.(*workflowState)
194 if !ok || state == nil {
195 return fmt.Errorf("workflow state is not initialized")
196 }
197
198 cid, err := AllocateCID()
199 if err != nil {
200 return err
201 }
202 connCh, unregister, err := e.agent.expect(cid)
203 if err != nil {
204 return err
205 }
206 defer unregister()
207
208 workDirBase := e.cfg.MicroVMPipelines.OverlayDir
209 if workDirBase == "" {
210 workDirBase = os.TempDir()
211 }
212 workDir, err := os.MkdirTemp(workDirBase, "spindle-microvm-"+wid.String()+"-*")
213 if err != nil {
214 return fmt.Errorf("create workflow microVM directory: %w", err)
215 }
216 state.WorkDir = workDir
217
218 setupDone := false
219 defer func() {
220 if setupDone {
221 return
222 }
223 if err := e.cleanupState(context.Background(), wid, state); err != nil {
224 l.Error("failed to cleanup failed setup", "error", err)
225 }
226 }()
227
228 upstreams, err := BuildCacheUpstreams(e.cfg.NixCache.ReadURLs, state.CacheReadURLs)
229 if err != nil {
230 return err
231 }
232 readCache, err := StartReadCacheProxy(ctx, cid, upstreams, l)
233 if err != nil {
234 return err
235 }
236 state.ReadCache = readCache
237 stagingDir := filepath.Join(workDir, "upload-cache")
238 uploadCache, err := StartUploadCacheProxy(ctx, cid, e.cfg.NixCache.UploadURL, upstreams, stagingDir, l)
239 if err != nil {
240 return err
241 }
242 state.UploadCache = uploadCache
243 dnsProxy, err := StartDNSProxy(ctx, cid, l)
244 if err != nil {
245 return err
246 }
247 state.DNSProxy = dnsProxy
248
249 port := e.cfg.MicroVMPipelines.AgentPort
250 if port == 0 {
251 port = agentproto.DefaultPort
252 }
253 state.ImageSpec.BootArgs = fmt.Sprintf("%s shuttle.vsock_port=%d", state.ImageSpec.BootArgs, port)
254
255 fmt.Fprintf(wfLogger.DataWriter(-1, "stdout"), "starting microVM image %s\n", state.Image)
256 l.Info("starting microVM workflow", "image", state.Image, "imageSpec", state.ImageSpecPath, "cid", cid, "workDir", workDir)
257
258 var vm VMHandle
259 vm, err = StartVM(ctx, VMConfig{
260 Image: state.ImageSpec,
261 CID: cid,
262 EnableKVM: e.cfg.MicroVMPipelines.EnableKVM,
263 WorkDir: workDir,
264 Cgroup: e.cgroupLimits(wid, state.ImageSpec),
265 Dev: e.cfg.Server.Dev,
266 }, l)
267 if err != nil {
268 return err
269 }
270 state.VM = vm
271
272 acceptCtx, cancelAccept := context.WithTimeout(ctx, agentAcceptTimeout)
273 defer cancelAccept()
274 conn, err := waitAgentConn(acceptCtx, connCh)
275 if err != nil {
276 return err
277 }
278
279 agentSession := NewAgentSession(conn, l)
280 initCtx, cancelInit := context.WithTimeout(ctx, agentHandshakeTimeout)
281 defer cancelInit()
282 if err := agentSession.Init(initCtx, &agentv1.Init{
283 JobId: wid.String(),
284 CacheTrustedPublicKeys: append(slices.Clone(e.cfg.NixCache.TrustedPublicKeys), state.CacheTrustedPublicKeys...),
285 CacheReadProxyPort: readCache.Port(),
286 CacheUploadProxyPort: uploadCache.Port(),
287 DnsProxyPort: dnsProxy.Port(),
288 }); err != nil {
289 _ = agentSession.Close()
290 return err
291 }
292 state.Agent = agentSession
293 wf.Data = state
294
295 e.registerCleanup(wid, func(ctx context.Context) error {
296 return e.cleanupState(ctx, wid, state)
297 })
298 setupDone = true
299
300 fmt.Fprintf(wfLogger.DataWriter(-1, "stdout"),
301 "agent connected; serial log: %s\n", vm.Logs().Serial,
302 )
303 return nil
304}
305
306func applyDepsSource(command string) string {
307 return fmt.Sprintf(
308 // check if it exists because not all images have this
309 `if [ -f %s ]; then . %s; export PATH="$PATH:%s"; fi; %s`,
310 guestDevShellEnvPath, guestDevShellEnvPath, guestBasePATH, command,
311 )
312}
313
314func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error {
315 state, ok := w.Data.(*workflowState)
316 if !ok || state == nil || state.Agent == nil {
317 return fmt.Errorf("microVM workflow is not connected to agent")
318 }
319
320 stderr := wfLogger.DataWriter(idx, "stderr")
321
322 execCtx, vmExited, cancelWatch := watchVMExit(ctx, state.VM)
323 defer cancelWatch()
324
325 step := w.Steps[idx]
326 if s, ok := step.(Step); ok && s.action == activationStepAction {
327 err := e.activateConfig(execCtx, wid, state, s, wfLogger.DataWriter(idx, "stdout"))
328 return e.classifyStepError(ctx, wid, step, state, stderr, vmExited, err)
329 }
330 env := []string{
331 "HOME=/workspace",
332 "LOGNAME=" + guestWorkflowUser,
333 "PATH=" + guestBasePATH,
334 "USER=" + guestWorkflowUser,
335 }
336 for k, v := range w.Environment {
337 env = append(env, k+"="+v)
338 }
339 for _, s := range secrets {
340 env = append(env, s.Key+"="+s.Value)
341 }
342 if s, ok := step.(Step); ok {
343 for k, v := range s.environment {
344 env = append(env, k+"="+v)
345 }
346 }
347
348 stdout := wfLogger.DataWriter(idx, "stdout")
349 exitCode, err := state.Agent.Exec(execCtx, AgentExec{
350 ID: fmt.Sprintf("%s-%d", wid.String(), idx),
351 ExecStart: &agentv1.ExecStart{
352 Argv: []string{state.ImageSpec.Shell, "-lc", applyDepsSource(step.Command())},
353 Env: env,
354 Cwd: guestWorkDir,
355 User: guestWorkflowUser,
356 // timeout not set here, Exec will fill it
357 },
358 Stdout: stdout,
359 Stderr: stderr,
360 })
361 if err != nil {
362 return e.classifyStepError(ctx, wid, step, state, stderr, vmExited, err)
363 }
364
365 if exitCode != 0 {
366 e.l.Debug("step exited non-zero", "workflow", wid, "step", step.Name(), "exitCode", exitCode)
367 return engine.ErrWorkflowFailed
368 }
369 return nil
370}
371
372// reads the vm serial logs so we report the tail of that as an error instead of
373// just "guest agent connection lost: EOF"
374func (e *Engine) classifyStepError(ctx context.Context, wid models.WorkflowId, step models.Step, state *workflowState, stderr io.Writer, vmExited *atomic.Bool, err error) error {
375 if err == nil {
376 return nil
377 }
378 l := e.l.With("workflow", wid, "step", step.Name())
379
380 if vmExited != nil && vmExited.Load() {
381 reason := "microVM exited unexpectedly"
382 oom := state.VM != nil && state.VM.OOMKilled()
383 if oom {
384 reason = "microVM killed by OOM (cgroup memory limit exceeded)"
385 }
386 if detail := vmCrashLog(state.VM); detail != "" {
387 fmt.Fprintf(stderr, "%s:\n%s\n", reason, detail)
388 l.Debug(reason, "oom", oom, "detail", detail)
389 } else {
390 fmt.Fprintln(stderr, reason)
391 l.Debug(reason, "oom", oom)
392 }
393 return errors.New(reason + "; see workflow logs for serial output")
394 }
395
396 if errors.Is(err, errGuestTimedOut) || ctx.Err() != nil {
397 l.Debug("step timed out", "guestReported", errors.Is(err, errGuestTimedOut))
398 return engine.ErrTimedOut
399 }
400
401 // the agent connection dropped while qemu stayed up (eg. the guest kernel
402 // OOM-killed the agent or a guest panic), so surface serial logs, those
403 // will be more helpful.
404 if detail := vmCrashLog(state.VM); detail != "" {
405 fmt.Fprintf(stderr, "step failed (%v):\n%s\n", err, detail)
406 l.Debug("step failed", "error", err, "detail", detail)
407 } else {
408 l.Debug("step failed", "error", err)
409 }
410 return err
411}
412
413func (e *Engine) activateConfig(ctx context.Context, wid models.WorkflowId, state *workflowState, step Step, out io.Writer) error {
414 cfg := step.config
415 if !cfg.Enabled() {
416 return nil
417 }
418
419 configKey := step.configKey
420 if configKey == "" {
421 configKey = state.ConfigKey
422 }
423
424 userConfigJSON, err := json.Marshal(cfg)
425 if err != nil {
426 return fmt.Errorf("encode user config: %w", err)
427 }
428
429 var cachedToplevel string
430 if configKey != "" {
431 if record, ok, err := state.NixOSToplevelCache.Lookup(configKey); err != nil {
432 return err
433 } else if ok {
434 cachedToplevel = record.Toplevel
435 fmt.Fprintf(out, "realizing cached NixOS config %s\n", cachedToplevel)
436 }
437 }
438 if cachedToplevel == "" {
439 fmt.Fprintf(out, "building NixOS config from user config\n")
440 }
441
442 baseHash, err := BaseConfigHash(state.ImageSpec)
443 if err != nil {
444 return fmt.Errorf("calculate base config hash: %w", err)
445 }
446
447 result, err := state.Agent.ActivateConfig(ctx, fmt.Sprintf("%s-config", wid.String()), &agentv1.ActivateConfig{
448 ConfigKey: configKey,
449 BaseConfigHash: baseHash,
450 UserConfig: string(userConfigJSON),
451 Toplevel: cachedToplevel,
452 })
453 if err != nil {
454 return err
455 }
456 fmt.Fprintf(out, "activated NixOS config toplevel %s\n", result.Toplevel)
457
458 if cachedToplevel != "" || configKey == "" {
459 return nil
460 }
461 if e.cfg.NixCache.UploadURL == "" {
462 e.l.Warn("not committing config cache metadata: no upload URL configured", "workflow", wid, "configKey", configKey, "toplevel", result.Toplevel)
463 return nil
464 }
465
466 if err := e.drainNixCache(ctx, state); err != nil {
467 // a partial upload would leave the cache unable to realize this toplevel,
468 // so skip the metadata commit rather than poison it with an un-realizable
469 // key. the config still activated fine, so don't fail the workflow.
470 e.l.Warn("cache drain failed; skipping config cache metadata commit", "workflow", wid, "configKey", configKey, "toplevel", result.Toplevel, "error", err)
471 return nil
472 }
473 if err := state.NixOSToplevelCache.Commit(configKey, result.Toplevel); err != nil {
474 return err
475 }
476 fmt.Fprintf(out, "committed config cache metadata %s -> %s\n", configKey, result.Toplevel)
477 return nil
478}
479
480func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
481 fns := e.drainCleanups(wid)
482
483 var cleanupErr error
484 for i := len(fns) - 1; i >= 0; i-- {
485 if err := fns[i](ctx); err != nil {
486 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
487 cleanupErr = errors.Join(cleanupErr, err)
488 }
489 }
490 return cleanupErr
491}
492
493func (e *Engine) FinalizeWorkflow(ctx context.Context, wid models.WorkflowId, w *models.Workflow, wfLogger models.WorkflowLogger) error {
494 return nil
495}
496
497func (e *Engine) WorkflowTimeout() time.Duration {
498 d, err := time.ParseDuration(e.cfg.MicroVMPipelines.WorkflowTimeout)
499 if err != nil {
500 d = 5 * time.Minute
501 }
502 return d + guestTimeoutGrace
503}
504
505func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
506 e.cleanupMu.Lock()
507 defer e.cleanupMu.Unlock()
508 key := wid.String()
509 e.cleanup[key] = append(e.cleanup[key], fn)
510}
511
512func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc {
513 e.cleanupMu.Lock()
514 defer e.cleanupMu.Unlock()
515 key := wid.String()
516 fns := e.cleanup[key]
517 delete(e.cleanup, key)
518 return fns
519}
520
521func (e *Engine) cgroupLimits(wid models.WorkflowId, spec ImageSpec) CgroupLimits {
522 cfg := e.cfg.MicroVMPipelines
523 return CgroupLimits{
524 Enabled: cfg.EnableCgroups,
525 Parent: e.cgroupParent,
526 Name: "workflow-" + wid.String(),
527 MemoryMaxMiB: resourcesForImage(spec).MemoryMiB,
528 SwapMaxMiB: cfg.CgroupSwapMaxMiB,
529 PidsMax: cfg.CgroupPidsMax,
530 }
531}