Monorepo for Tangled
tangled.org
1package microvm
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net/http"
11 "os"
12 "path/filepath"
13 "slices"
14 "sync"
15 "sync/atomic"
16 "time"
17
18 "gopkg.in/yaml.v3"
19
20 "tangled.org/core/api/tangled"
21 "tangled.org/core/log"
22 "tangled.org/core/spindle/agentproto"
23 agentv1 "tangled.org/core/spindle/agentproto/gen"
24 "tangled.org/core/spindle/config"
25 "tangled.org/core/spindle/db"
26 "tangled.org/core/spindle/engine"
27 "tangled.org/core/spindle/models"
28 "tangled.org/core/spindle/secrets"
29)
30
// Guest-side locations and well-known names shared with the guest image.
const (
	// guestWorkDir is the working directory user steps run in.
	guestWorkDir = "/workspace/repo"
	// guestBasePATH is the PATH given to every step (and appended after the
	// devshell PATH when that environment file exists).
	guestBasePATH = "/run/current-system/sw/bin:/nix/var/nix/profiles/default/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
	// guestDevShellEnvPath is sourced before each user command when present.
	guestDevShellEnvPath = "/run/spindle/devshell-env.sh"
	// activationStepAction marks the synthetic NixOS config activation step.
	activationStepAction = "activate-config"
)

// Host-side timeouts for talking to and tearing down a microVM.
const (
	agentAcceptTimeout    = 2 * time.Minute
	agentHandshakeTimeout = 30 * time.Second
	cacheDrainTimeout     = 5 * time.Minute
	vmShutdownTimeout     = 10 * time.Second
	// guestTimeoutGrace is added on top of the configured workflow timeout.
	guestTimeoutGrace = 5 * time.Second
)
42
// cleanupFunc tears down one workflow resource. DestroyWorkflow runs a
// workflow's registered cleanupFuncs in reverse registration order.
type cleanupFunc func(context.Context) error

// Engine runs pipeline workflows inside microVMs, driving each guest through
// an agent connection.
type Engine struct {
	l   *slog.Logger
	cfg *config.Config
	// db backs the NixOS toplevel cache store handed to each workflow.
	db *db.DB
	// agent accepts guest agent connections; SetupWorkflow registers the
	// expected CID with it before booting a VM.
	agent *agentHub
	// scheduler is built from the microVM budget config in New.
	scheduler *engine.ResourceScheduler[Resources]
	// cgroupParent is nil unless MicroVMPipelines.EnableCgroups is set.
	cgroupParent *CgroupParent

	// cleanupMu guards cleanup, which maps a workflow ID string to the
	// teardown functions registered for that workflow.
	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}
56
// Step is this engine's models.Step implementation. User steps carry a shell
// command plus per-step environment. The synthetic NixOS activation step is
// identified by action == activationStepAction and carries the config to
// activate.
type Step struct {
	name    string
	kind    models.StepKind
	command string
	// environment is appended after the workflow environment and secrets
	// when RunStep builds the guest environment.
	environment map[string]string
	// action selects special handling in RunStep; activationStepAction is the
	// only value recognised in this file.
	action string
	// config and configKey are only populated on the activation step.
	config    manifestConfig
	configKey string
}
66
67func (s Step) Name() string { return s.name }
68func (s Step) Command() string { return s.command }
69func (s Step) Kind() models.StepKind { return s.kind }
70
71func New(ctx context.Context, cfg *config.Config, d *db.DB) (*Engine, error) {
72 l := log.FromContext(ctx).With("component", "engine.microvm")
73 port := cfg.MicroVMPipelines.AgentPort
74 if port == 0 {
75 port = agentproto.DefaultPort
76 }
77 agent, err := newAgentHub(port, l)
78 if err != nil {
79 return nil, err
80 }
81 budget, max, agingThreshold := newVMBudgetConfig(cfg.MicroVMPipelines)
82 l.Info("initialized microVM workflow budget", "budget", budget.String(), "maxWorkflow", max.String(), "agingThreshold", agingThreshold)
83
84 var cgroupParent *CgroupParent
85 if cfg.MicroVMPipelines.EnableCgroups {
86 cgroupParent, err = initCgroupParent(cfg.MicroVMPipelines.CgroupParent, cfg.MicroVMPipelines.CgroupSupervisorMemoryMinMiB, l)
87 if err != nil {
88 return nil, err
89 }
90 }
91
92 return &Engine{
93 l: l,
94 cfg: cfg,
95 db: d,
96 agent: agent,
97 scheduler: engine.NewResourceScheduler(budget, max, agingThreshold),
98 cgroupParent: cgroupParent,
99 cleanup: make(map[string][]cleanupFunc),
100 }, nil
101}
102
103func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
104 swf := &models.Workflow{}
105 var dwf manifestWorkflow
106
107 if err := yaml.Unmarshal([]byte(twf.Raw), &dwf); err != nil {
108 return nil, err
109 }
110
111 for _, dstep := range dwf.Steps {
112 swf.Steps = append(swf.Steps, Step{
113 name: dstep.Name,
114 kind: models.StepKindUser,
115 command: dstep.Command,
116 environment: dstep.Environment,
117 })
118 }
119 swf.Name = twf.Name
120 swf.Environment = dwf.Environment
121
122 if tpl.TriggerMetadata != nil {
123 if clone := models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev); clone.Command() != "" {
124 swf.Steps = append([]models.Step{clone}, swf.Steps...)
125 }
126 }
127
128 imageSpec, imageSpecPath, imageName, err := e.resolveImage(dwf.Image)
129 if err != nil {
130 return nil, err
131 }
132 configKey := ""
133 config := manifestConfig{
134 Services: dwf.Services,
135 Virtualisation: dwf.Virtualisation,
136 Dependencies: dwf.Dependencies,
137 Registry: dwf.Registry,
138 }
139 if config.Enabled() {
140 if !imageSpec.SupportsConfigActivation() {
141 return nil, fmt.Errorf(
142 "microVM image %q is not a NixOS image: services, virtualisation, dependencies and registry workflow options require a NixOS image",
143 imageName,
144 )
145 }
146 var err error
147 configKey, err = buildConfigKey(imageSpec, config)
148 if err != nil {
149 return nil, fmt.Errorf("build config key: %w", err)
150 }
151 activationStep := Step{
152 name: "NixOS config activation",
153 kind: models.StepKindSystem,
154 command: "activate nixos config",
155 action: activationStepAction,
156 config: config,
157 configKey: configKey,
158 }
159
160 insertAt := 0
161 if len(swf.Steps) > 0 && swf.Steps[0].Kind() == models.StepKindSystem {
162 insertAt = 1
163 }
164 swf.Steps = append(swf.Steps, nil)
165 copy(swf.Steps[insertAt+1:], swf.Steps[insertAt:])
166 swf.Steps[insertAt] = activationStep
167 }
168
169 cacheURLs, cacheKeys, err := workflowCaches(dwf.Caches)
170 if err != nil {
171 return nil, err
172 }
173
174 swf.Data = &workflowState{
175 ImageSpec: imageSpec,
176 ImageSpecPath: imageSpecPath,
177 Config: config,
178 ConfigKey: configKey,
179 Image: imageName,
180 CacheReadURLs: cacheURLs,
181 CacheTrustedPublicKeys: cacheKeys,
182 NixOSToplevelCache: newNixOSToplevelCacheStore(e.db),
183 }
184 return swf, nil
185}
186
// SetupWorkflow boots a microVM for the workflow and connects to its guest
// agent. Progress is reported to the workflow log as a synthetic system step
// named "microVM setup".
//
// In order, it:
//   - allocates a CID and registers with the agent hub to expect a
//     connection from it;
//   - creates a per-workflow work directory under MicroVMPipelines.OverlayDir
//     (os.TempDir when unset);
//   - starts the read-cache, upload-cache and DNS proxies;
//   - starts the VM;
//   - waits up to agentAcceptTimeout for the agent to connect;
//   - performs the Init handshake within agentHandshakeTimeout.
//
// On success, a cleanupState call is registered for DestroyWorkflow. If any
// step fails after the work directory exists, a deferred handler logs the VM
// crash log (when there is one) and passes the partially populated state to
// cleanupState.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error {
	l := e.l.With("workflow", wid)
	setupStep := Step{name: "microVM setup", kind: models.StepKindSystem}

	wfLogger.ControlWriter(-1, setupStep, models.StepStatusStart).Write([]byte{0})
	// NOTE(review): defer evaluates ControlWriter(...) immediately; only the
	// Write is postponed to return. Confirm ControlWriter has no
	// time-sensitive side effects at creation.
	defer wfLogger.ControlWriter(-1, setupStep, models.StepStatusEnd).Write([]byte{0})

	state, ok := wf.Data.(*workflowState)
	if !ok || state == nil {
		return fmt.Errorf("workflow state is not initialized")
	}

	cid, err := AllocateCID()
	if err != nil {
		return err
	}
	// connCh receives the agent connection from the guest with this CID.
	connCh, unregister, err := e.agent.expect(cid)
	if err != nil {
		return err
	}
	defer unregister()

	workDirBase := e.cfg.MicroVMPipelines.OverlayDir
	if workDirBase == "" {
		workDirBase = os.TempDir()
	}
	workDir, err := os.MkdirTemp(workDirBase, "spindle-microvm-"+wid.String()+"-*")
	if err != nil {
		return fmt.Errorf("create workflow microVM directory: %w", err)
	}
	state.WorkDir = workDir

	// Failure path: from here on, any early return tears down whatever has
	// been recorded on state so far. setupDone is flipped only after the
	// cleanup has been handed to registerCleanup.
	setupDone := false
	defer func() {
		if setupDone {
			return
		}
		if detail := vmCrashLog(state.VM); detail != "" {
			l.Error("microVM setup failed", "detail", detail)
		}
		// Background context: teardown should still run if ctx is cancelled.
		if err := e.cleanupState(context.Background(), wid, state); err != nil {
			l.Error("failed to cleanup failed setup", "error", err)
		}
	}()

	upstreams, err := BuildCacheUpstreams(e.cfg.NixCache.ReadURLs, state.CacheReadURLs)
	if err != nil {
		return err
	}
	readCache, err := StartReadCacheProxy(ctx, cid, upstreams, l)
	if err != nil {
		return err
	}
	state.ReadCache = readCache
	// Upload staging lives inside workDir.
	stagingDir := filepath.Join(workDir, "upload-cache")
	uploadCache, err := StartUploadCacheProxy(ctx, cid, e.cfg.NixCache.UploadURL, upstreams, stagingDir, l)
	if err != nil {
		return err
	}
	state.UploadCache = uploadCache
	dnsProxy, err := StartDNSProxy(ctx, cid, l)
	if err != nil {
		return err
	}
	state.DNSProxy = dnsProxy

	port := e.cfg.MicroVMPipelines.AgentPort
	if port == 0 {
		port = agentproto.DefaultPort
	}
	// Tell the guest which port to reach the agent hub on.
	// NOTE(review): this mutates state.ImageSpec in place. A second
	// SetupWorkflow on the same state would append the arg twice, and
	// activateConfig later hashes state.ImageSpec via BaseConfigHash. Confirm
	// whether BootArgs participates in that hash.
	state.ImageSpec.BootArgs = fmt.Sprintf("%s shuttle.vsock_port=%d", state.ImageSpec.BootArgs, port)

	fmt.Fprintf(wfLogger.DataWriter(-1, "stdout"), "starting microVM image %s\n", state.Image)
	l.Info("starting microVM workflow", "image", state.Image, "imageSpec", state.ImageSpecPath, "cid", cid, "workDir", workDir)

	var vm VMHandle
	vm, err = StartVM(ctx, VMConfig{
		Image:     state.ImageSpec,
		CID:       cid,
		EnableKVM: e.cfg.MicroVMPipelines.EnableKVM,
		WorkDir:   workDir,
		Cgroup:    e.cgroupLimits(wid, state.ImageSpec),
		Dev:       e.cfg.Server.Dev,
	}, l)
	if err != nil {
		return err
	}
	state.VM = vm

	acceptCtx, cancelAccept := context.WithTimeout(ctx, agentAcceptTimeout)
	defer cancelAccept()
	conn, err := waitAgentConn(acceptCtx, connCh)
	if err != nil {
		return err
	}

	agentSession := NewAgentSession(conn, l)
	initCtx, cancelInit := context.WithTimeout(ctx, agentHandshakeTimeout)
	defer cancelInit()
	// Trusted keys = engine-wide keys plus the workflow's own cache keys.
	if err := agentSession.Init(initCtx, &agentv1.Init{
		JobId:                  wid.String(),
		CacheTrustedPublicKeys: append(slices.Clone(e.cfg.NixCache.TrustedPublicKeys), state.CacheTrustedPublicKeys...),
		CacheReadProxyPort:     readCache.Port(),
		CacheUploadProxyPort:   uploadCache.Port(),
		DnsProxyPort:           dnsProxy.Port(),
	}); err != nil {
		// The session is not yet on state, so close it here.
		_ = agentSession.Close()
		return err
	}
	state.Agent = agentSession
	// state already came from wf.Data; presumably a harmless re-assignment.
	wf.Data = state

	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.cleanupState(ctx, wid, state)
	})
	setupDone = true

	fmt.Fprintf(wfLogger.DataWriter(-1, "stdout"),
		"agent connected; serial log: %s\n", vm.Logs().Serial,
	)
	return nil
}
309
310func applyDepsSource(command string) string {
311 return fmt.Sprintf(
312 // check if it exists because not all images have this
313 `if [ -f %s ]; then . %s; export PATH="$PATH:%s"; fi; %s`,
314 guestDevShellEnvPath, guestDevShellEnvPath, guestBasePATH, command,
315 )
316}
317
318func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error {
319 state, ok := w.Data.(*workflowState)
320 if !ok || state == nil || state.Agent == nil {
321 return fmt.Errorf("microVM workflow is not connected to agent")
322 }
323
324 stderr := wfLogger.DataWriter(idx, "stderr")
325
326 execCtx, vmExited, cancelWatch := watchVMExit(ctx, state.VM)
327 defer cancelWatch()
328
329 step := w.Steps[idx]
330 if s, ok := step.(Step); ok && s.action == activationStepAction {
331 err := e.activateConfig(execCtx, wid, state, s, wfLogger.DataWriter(idx, "stdout"))
332 return e.classifyStepError(ctx, wid, step, state, stderr, vmExited, err)
333 }
334 env := []string{
335 "HOME=/workspace",
336 "LOGNAME=" + guestWorkflowUser,
337 "PATH=" + guestBasePATH,
338 "USER=" + guestWorkflowUser,
339 }
340 for k, v := range w.Environment {
341 env = append(env, k+"="+v)
342 }
343 for _, s := range secrets {
344 env = append(env, s.Key+"="+s.Value)
345 }
346 if s, ok := step.(Step); ok {
347 for k, v := range s.environment {
348 env = append(env, k+"="+v)
349 }
350 }
351
352 stdout := wfLogger.DataWriter(idx, "stdout")
353 exitCode, err := state.Agent.Exec(execCtx, AgentExec{
354 ID: fmt.Sprintf("%s-%d", wid.String(), idx),
355 ExecStart: &agentv1.ExecStart{
356 Argv: []string{state.ImageSpec.Shell, "-lc", applyDepsSource(step.Command())},
357 Env: env,
358 Cwd: guestWorkDir,
359 User: guestWorkflowUser,
360 // timeout not set here, Exec will fill it
361 },
362 Stdout: stdout,
363 Stderr: stderr,
364 })
365 if err != nil {
366 return e.classifyStepError(ctx, wid, step, state, stderr, vmExited, err)
367 }
368
369 if exitCode != 0 {
370 e.l.Debug("step exited non-zero", "workflow", wid, "step", step.Name(), "exitCode", exitCode)
371 return engine.ErrWorkflowFailed
372 }
373 return nil
374}
375
376// reads the vm serial logs so we report the tail of that as an error instead of
377// just "guest agent connection lost: EOF"
378func (e *Engine) classifyStepError(ctx context.Context, wid models.WorkflowId, step models.Step, state *workflowState, stderr io.Writer, vmExited *atomic.Bool, err error) error {
379 if err == nil {
380 return nil
381 }
382 l := e.l.With("workflow", wid, "step", step.Name())
383
384 if vmExited != nil && vmExited.Load() {
385 reason := "microVM exited unexpectedly"
386 oom := state.VM != nil && state.VM.OOMKilled()
387 if oom {
388 reason = "microVM killed by OOM (cgroup memory limit exceeded)"
389 }
390 if detail := vmCrashLog(state.VM); detail != "" {
391 fmt.Fprintf(stderr, "%s:\n%s\n", reason, detail)
392 l.Error(reason, "oom", oom, "detail", detail)
393 } else {
394 fmt.Fprintln(stderr, reason)
395 l.Error(reason, "oom", oom)
396 }
397 return errors.New(reason + "; see workflow logs for serial output")
398 }
399
400 if errors.Is(err, errGuestTimedOut) || ctx.Err() != nil {
401 l.Debug("step timed out", "guestReported", errors.Is(err, errGuestTimedOut))
402 return engine.ErrTimedOut
403 }
404
405 // the agent connection dropped while qemu stayed up (eg. the guest kernel
406 // OOM-killed the agent or a guest panic), so surface serial logs, those
407 // will be more helpful.
408 if detail := vmCrashLog(state.VM); detail != "" {
409 fmt.Fprintf(stderr, "step failed (%v):\n%s\n", err, detail)
410 l.Error("step failed", "error", err, "detail", detail)
411 } else {
412 l.Error("step failed", "error", err)
413 }
414 return err
415}
416
417func (e *Engine) activateConfig(ctx context.Context, wid models.WorkflowId, state *workflowState, step Step, out io.Writer) error {
418 cfg := step.config
419 if !cfg.Enabled() {
420 return nil
421 }
422
423 configKey := step.configKey
424 if configKey == "" {
425 configKey = state.ConfigKey
426 }
427
428 userConfigJSON, err := json.Marshal(cfg)
429 if err != nil {
430 return fmt.Errorf("encode user config: %w", err)
431 }
432
433 var cachedToplevel string
434 if configKey != "" {
435 if record, ok, err := state.NixOSToplevelCache.Lookup(configKey); err != nil {
436 return err
437 } else if ok {
438 // todo(dawn): we should probably use gc roots to eliminate TOCTOU
439 // the spindle will have to manage the gc roots, and for remote we have to
440 // ssh in to the host and add / remove gc root.
441 // we need to have this check anyway since the only check http caches can
442 // use is this one, since we cant manage gc roots there...
443 if e.anyCacheHasPath(ctx, state, record.Toplevel) {
444 cachedToplevel = record.Toplevel
445 fmt.Fprintf(out, "realizing cached NixOS config %s\n", cachedToplevel)
446 }
447 }
448 }
449 if cachedToplevel == "" {
450 fmt.Fprintf(out, "building NixOS config from user config\n")
451 }
452
453 baseHash, err := BaseConfigHash(state.ImageSpec)
454 if err != nil {
455 return fmt.Errorf("calculate base config hash: %w", err)
456 }
457
458 result, err := state.Agent.ActivateConfig(ctx, fmt.Sprintf("%s-config", wid.String()), &agentv1.ActivateConfig{
459 ConfigKey: configKey,
460 BaseConfigHash: baseHash,
461 UserConfig: string(userConfigJSON),
462 Toplevel: cachedToplevel,
463 }, out)
464 if err != nil {
465 return err
466 }
467 fmt.Fprintf(out, "activated NixOS config toplevel %s\n", result.Toplevel)
468
469 if cachedToplevel != "" || configKey == "" {
470 return nil
471 }
472 if e.cfg.NixCache.UploadURL == "" {
473 e.l.Warn("not committing config cache metadata: no upload URL configured", "workflow", wid, "configKey", configKey, "toplevel", result.Toplevel)
474 return nil
475 }
476
477 if err := e.drainNixCache(ctx, state); err != nil {
478 // a partial upload would leave the cache unable to realize this toplevel,
479 // so skip the metadata commit rather than poison it with an un-realizable
480 // key. the config still activated fine, so don't fail the workflow.
481 e.l.Warn("cache drain failed; skipping config cache metadata commit", "workflow", wid, "configKey", configKey, "toplevel", result.Toplevel, "error", err)
482 return nil
483 }
484 if err := state.NixOSToplevelCache.Commit(configKey, result.Toplevel); err != nil {
485 return err
486 }
487 fmt.Fprintf(out, "committed config cache metadata %s -> %s\n", configKey, result.Toplevel)
488 return nil
489}
490
491func (e *Engine) anyCacheHasPath(ctx context.Context, state *workflowState, storePath string) bool {
492 upstreams, err := BuildCacheUpstreams(e.cfg.NixCache.ReadURLs, state.CacheReadURLs)
493 if err != nil {
494 e.l.Warn("config cache check: build upstreams failed; treating as absent", "path", storePath, "error", err)
495 return false
496 }
497 if len(upstreams) == 0 {
498 return false
499 }
500 hash, _, err := parseStorePath(storePath)
501 if err != nil {
502 e.l.Warn("config cache check: invalid toplevel path; treating as absent", "path", storePath, "error", err)
503 return false
504 }
505 req, err := http.NewRequestWithContext(ctx, http.MethodHead, "http://upstream/"+hash+".narinfo", nil)
506 if err != nil {
507 e.l.Warn("config cache check: build request failed; treating as absent", "path", storePath, "error", err)
508 return false
509 }
510 resp, err := newNarinfoExistenceTransport(upstreams, e.l).RoundTrip(req)
511 if err != nil {
512 e.l.Warn("config cache check: narinfo probe failed; treating as absent", "path", storePath, "error", err)
513 return false
514 }
515 defer resp.Body.Close()
516 _, _ = io.Copy(io.Discard, resp.Body)
517 return resp.StatusCode == http.StatusOK
518}
519
520func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
521 fns := e.drainCleanups(wid)
522
523 var cleanupErr error
524 for i := len(fns) - 1; i >= 0; i-- {
525 if err := fns[i](ctx); err != nil {
526 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
527 cleanupErr = errors.Join(cleanupErr, err)
528 }
529 }
530 return cleanupErr
531}
532
533func (e *Engine) FinalizeWorkflow(ctx context.Context, wid models.WorkflowId, w *models.Workflow, wfLogger models.WorkflowLogger) error {
534 return nil
535}
536
537func (e *Engine) WorkflowTimeout() time.Duration {
538 d, err := time.ParseDuration(e.cfg.MicroVMPipelines.WorkflowTimeout)
539 if err != nil {
540 d = 5 * time.Minute
541 }
542 return d + guestTimeoutGrace
543}
544
545func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
546 e.cleanupMu.Lock()
547 defer e.cleanupMu.Unlock()
548 key := wid.String()
549 e.cleanup[key] = append(e.cleanup[key], fn)
550}
551
552func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc {
553 e.cleanupMu.Lock()
554 defer e.cleanupMu.Unlock()
555 key := wid.String()
556 fns := e.cleanup[key]
557 delete(e.cleanup, key)
558 return fns
559}
560
561func (e *Engine) cgroupLimits(wid models.WorkflowId, spec ImageSpec) CgroupLimits {
562 cfg := e.cfg.MicroVMPipelines
563 return CgroupLimits{
564 Enabled: cfg.EnableCgroups,
565 Parent: e.cgroupParent,
566 Name: "workflow-" + wid.String(),
567 MemoryMaxMiB: resourcesForImage(spec).MemoryMiB,
568 SwapMaxMiB: cfg.CgroupSwapMaxMiB,
569 PidsMax: cfg.CgroupPidsMax,
570 }
571}