Monorepo for Tangled
tangled.org
1package microvm
2
3import (
4 "context"
5 "crypto/rand"
6 "encoding/binary"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "math"
13 "net"
14 "os"
15 "os/exec"
16 "path/filepath"
17 "slices"
18 "strings"
19 "sync/atomic"
20 "time"
21
22 "tangled.org/core/spindle/models"
23)
24
// minGuestCID is the smallest guest context ID AllocateCID will return.
// NOTE(review): presumably chosen to skip the reserved vsock CIDs 0-2.
const minGuestCID = 3

// vmCrashLogTailBytes caps how many trailing bytes of each log file
// vmCrashLog includes in a crash report.
const vmCrashLogTailBytes = 4 << 10
29
30func AllocateCID() (uint32, error) {
31 var data [4]byte
32 if _, err := rand.Read(data[:]); err != nil {
33 return 0, fmt.Errorf("allocate guest CID: %w", err)
34 }
35 return minGuestCID + binary.BigEndian.Uint32(data[:])%60000, nil
36}
37
// prepareWorkDir ensures the microVM work directory exists, creating it and
// any missing parents with mode 0o755. An empty workDir is rejected.
func prepareWorkDir(workDir string) error {
	if workDir == "" {
		return errors.New("microvm work directory is required")
	}
	err := os.MkdirAll(workDir, 0o755)
	if err == nil {
		return nil
	}
	return fmt.Errorf("create microvm work directory: %w", err)
}
47
48func prepareVolumes(ctx context.Context, workDir string, volumes []Volume, mkfsExt4 string) (map[string]string, error) {
49 paths := make(map[string]string, len(volumes))
50 for _, volume := range volumes {
51 if volume.ReadOnly {
52 return nil, fmt.Errorf("read-only microvm volume %q is not supported yet", volume.Image)
53 }
54 if volume.FSType != "ext4" {
55 return nil, fmt.Errorf("microvm volume %q uses unsupported fsType %q", volume.Image, volume.FSType)
56 }
57 if volume.ImageType != "" && volume.ImageType != "raw" {
58 return nil, fmt.Errorf("microvm volume %q uses unsupported imageType %q", volume.Image, volume.ImageType)
59 }
60
61 path := filepath.Join(workDir, filepath.Base(volume.Image))
62 if err := createSparseFile(path, volume.SizeMiB); err != nil {
63 return nil, err
64 }
65 noJournal := volume.MountPoint == "/workspace"
66 if err := runMkfsExt4(ctx, mkfsExt4, path, noJournal); err != nil {
67 return nil, err
68 }
69 paths[volume.Image] = path
70 }
71 return paths, nil
72}
73
// createSparseFile creates a new file at path and extends it to sizeMiB
// mebibytes with Truncate. On filesystems that support holes, the file is
// sparse. It fails if path already exists (O_EXCL).
//
// sizeMiB must be positive and small enough that sizeMiB*1MiB fits in an
// int64. If resizing or closing fails, the newly created file is removed so
// that a retry is not blocked by the exclusive create.
func createSparseFile(path string, sizeMiB int64) error {
	const mib = 1024 * 1024
	if sizeMiB <= 0 {
		return fmt.Errorf("sparse file %q size must be positive", path)
	}
	// Guard the multiplication below against int64 overflow.
	if sizeMiB > math.MaxInt64/mib {
		return fmt.Errorf("sparse file %q size is too large", path)
	}
	file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o600)
	if err != nil {
		return fmt.Errorf("create sparse file %q: %w", path, err)
	}

	if err := file.Truncate(sizeMiB * mib); err != nil {
		// Best-effort cleanup; the resize error is the one worth reporting.
		_ = file.Close()
		_ = os.Remove(path)
		return fmt.Errorf("resize sparse file %q: %w", path, err)
	}
	if err := file.Close(); err != nil {
		_ = os.Remove(path)
		return fmt.Errorf("close sparse file %q: %w", path, err)
	}
	return nil
}
92
// runMkfsExt4 formats the image file at path as ext4 by running the binary at
// mkfsExt4 with -F. When noJournal is true, the has_journal feature is
// disabled. The command is bound to ctx. On failure the returned error
// includes the tool's trimmed combined stdout/stderr.
func runMkfsExt4(ctx context.Context, mkfsExt4, path string, noJournal bool) error {
	if mkfsExt4 == "" {
		return fmt.Errorf("mkfs.ext4 path is required")
	}

	var featureOpts []string
	if noJournal {
		featureOpts = []string{"-O", "^has_journal"}
	}
	args := append(append([]string{"-F"}, featureOpts...), path)

	out, runErr := exec.CommandContext(ctx, mkfsExt4, args...).CombinedOutput()
	if runErr == nil {
		return nil
	}
	return fmt.Errorf("mkfs.ext4 %q: %w: %s", path, runErr, strings.TrimSpace(string(out)))
}
110
// createParentedFile opens path write-only, creating it with mode 0o644 or
// truncating it if it exists. Any missing parent directories are created
// first with mode 0o755. The caller owns the returned file and must close it.
func createParentedFile(path string) (*os.File, error) {
	var f *os.File
	err := os.MkdirAll(filepath.Dir(path), 0o755)
	if err != nil {
		return nil, fmt.Errorf("create log directory: %w", err)
	}
	if f, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644); err != nil {
		return nil, fmt.Errorf("create log file %q: %w", path, err)
	}
	return f, nil
}
121
// VMLogs holds host paths of a microVM's log files. vmCrashLog reads the tail
// of each one to build a crash report; an empty path means "no log".
type VMLogs struct {
	// Serial is the path of the VM's serial log.
	Serial string
	// Extra maps a log name (used as the section title in crash reports)
	// to the path of an additional log file.
	Extra map[string]string
}
126
127type VMHandle interface {
128 Shutdown(ctx context.Context) error
129 WaitContext(ctx context.Context) error
130 Close() error
131 Logs() VMLogs
132 CID() uint32
133 WorkDir() string
134 OOMKilled() bool
135}
136
137type VMConfig struct {
138 Image ImageSpec
139 CID uint32
140 EnableKVM bool
141 WorkDir string
142 Cgroup CgroupLimits
143
144 BootTimeout time.Duration
145 MkfsExt4 string
146 Dev bool
147}
148
149type workflowState struct {
150 ImageSpec ImageSpec
151 ImageSpecPath string
152 Config manifestConfig
153 ConfigKey string
154 Image string
155 CacheReadURLs []string
156 CacheTrustedPublicKeys []string
157 VM VMHandle
158 CID uint32
159 Agent *AgentSession
160 ReadCache *ReadCacheProxy
161 UploadCache *UploadCacheProxy
162 DNSProxy *DNSProxy
163 WorkDir string
164 NixOSToplevelCache nixosToplevelCacheStore
165 StartedAt time.Time // when the VM booted, for the max-lifetime cap
166}
167
168func (e *Engine) cleanupState(ctx context.Context, wid models.WorkflowId, state *workflowState) error {
169 if state == nil {
170 return nil
171 }
172
173 // stop advertising this VM for debug shells before we tear it down
174 e.unregisterDebugTarget(wid)
175
176 ctx = context.WithoutCancel(ctx)
177
178 var err error
179 // todo(dawn): expose this error to the user as a warning
180 if drainErr := e.drainNixCache(ctx, state); drainErr != nil {
181 e.l.Warn("cache drain failed during cleanup; continuing", "workflow", wid, "error", drainErr)
182 }
183 err = errors.Join(err, e.shutdownVM(ctx, wid, state))
184 err = errors.Join(err, closeIO(&state.Agent))
185 err = errors.Join(err, closeIO(&state.ReadCache))
186 err = errors.Join(err, closeIO(&state.UploadCache))
187 err = errors.Join(err, closeIO(&state.DNSProxy))
188 err = errors.Join(err, removeWorkDir(state))
189 return err
190}
191
192func (e *Engine) drainNixCache(ctx context.Context, state *workflowState) error {
193 if e.cfg.NixCache.UploadURL == "" {
194 return nil
195 }
196
197 drainCtx, cancel := context.WithTimeout(ctx, cacheDrainTimeout)
198 defer cancel()
199
200 if state.Agent != nil {
201 if _, err := state.Agent.Drain(drainCtx); err != nil {
202 return fmt.Errorf("drain guest nix cache uploads: %w", err)
203 }
204 }
205 return nil
206}
207
208func (e *Engine) shutdownVM(ctx context.Context, wid models.WorkflowId, state *workflowState) error {
209 if state.VM == nil {
210 return nil
211 }
212
213 var err error
214
215 if state.Agent != nil {
216 gracefulCtx, cancel := context.WithTimeout(ctx, vmShutdownTimeout)
217 poweredOff, poweroffErr := e.poweroffViaAgent(gracefulCtx, wid, state)
218 cancel()
219
220 err = errors.Join(err, poweroffErr)
221 if poweredOff {
222 return errors.Join(err, closeIO(&state.VM))
223 }
224 }
225
226 fallbackCtx, cancel := context.WithTimeout(ctx, vmShutdownTimeout)
227 defer cancel()
228
229 if shutdownErr := state.VM.Shutdown(fallbackCtx); shutdownErr != nil {
230 e.l.Warn("microVM shutdown fallback failed", "workflow", wid, "error", shutdownErr)
231 err = errors.Join(err, shutdownErr)
232 }
233
234 return errors.Join(err, closeIO(&state.VM))
235}
236
237func (e *Engine) poweroffViaAgent(ctx context.Context, wid models.WorkflowId, state *workflowState) (bool, error) {
238 if err := state.Agent.Poweroff(ctx); err != nil {
239 e.l.Warn("agent poweroff request failed", "workflow", wid, "error", err)
240 return false, err
241 }
242
243 if err := state.VM.WaitContext(ctx); err != nil {
244 e.l.Warn("agent poweroff did not stop microVM", "workflow", wid, "error", err)
245 return false, nil
246 }
247
248 return true, nil
249}
250
251// helper for closing io interfaces, sets to nil to prevent double-close
252func closeIO[T io.Closer](field *T) error {
253 closer := *field
254 var zero T
255 *field = zero
256 if any(closer) == any(zero) {
257 return nil
258 }
259 return closer.Close()
260}
261
262func removeWorkDir(state *workflowState) error {
263 if state.WorkDir == "" {
264 return nil
265 }
266
267 err := os.RemoveAll(state.WorkDir)
268 state.WorkDir = ""
269 return err
270}
271
272// returns a context derived from ctx that is cancelled either when ctx itself
273// is cancelled or when the microVM exits on its own. the returned flag reports
274// whether the VM exited (as opposed to ctx being cancelled for another reason,
275// e.g. the workflow timeout), letting callers tell a crash apart from a
276// timeout. cancel must be called to release the watcher goroutine.
277func watchVMExit(ctx context.Context, vm VMHandle) (context.Context, *atomic.Bool, context.CancelFunc) {
278 exited := &atomic.Bool{}
279 watchCtx, cancel := context.WithCancel(ctx)
280 if vm == nil {
281 return watchCtx, exited, cancel
282 }
283 go func() {
284 _ = vm.WaitContext(watchCtx) // returns when VM exits or watchCtx is cancelled
285 if watchCtx.Err() == nil {
286 exited.Store(true)
287 cancel() // don't forget to cancel the watchCtx...
288 }
289 }()
290 return watchCtx, exited, cancel
291}
292
293func vmCrashLog(vm VMHandle) string {
294 if vm == nil {
295 return ""
296 }
297 logs := vm.Logs()
298
299 var b strings.Builder
300 if tail := tailFile(logs.Serial, vmCrashLogTailBytes); tail != "" {
301 fmt.Fprintf(&b, "==== serial log ====\n%s\n", tail)
302 }
303 for _, name := range slices.Sorted(maps.Keys(logs.Extra)) {
304 if tail := tailFile(logs.Extra[name], vmCrashLogTailBytes); tail != "" {
305 fmt.Fprintf(&b, "==== %s log ====\n%s\n", name, tail)
306 }
307 }
308 return strings.TrimRight(b.String(), "\n")
309}
310
// tailFile returns up to the last max bytes of the file at path, with
// surrounding whitespace trimmed. It is best-effort: an empty path, a
// non-positive max, or any open/seek/read error yields "".
//
// When the file is longer than max, reading starts mid-file. Leading UTF-8
// continuation bytes (the rest of a multi-byte character split by the cut)
// are dropped so the result does not start with an invalid sequence.
//
// The read is capped at max bytes, so a file that grows while being read
// cannot make the result unbounded. If the size cannot be determined, at most
// the first max bytes are returned.
func tailFile(path string, max int64) string {
	if path == "" || max <= 0 {
		return ""
	}
	f, err := os.Open(path)
	if err != nil {
		return ""
	}
	defer f.Close()

	truncated := false
	if info, err := f.Stat(); err == nil && info.Size() > max {
		if _, err := f.Seek(-max, io.SeekEnd); err != nil {
			return ""
		}
		truncated = true
	}
	data, err := io.ReadAll(io.LimitReader(f, max))
	if err != nil {
		return ""
	}
	if truncated {
		// A UTF-8 character is at most 4 bytes, so at most 3 continuation
		// bytes (10xxxxxx) can belong to a character split by the cut.
		for i := 0; i < 3 && len(data) > 0 && data[0]&0xC0 == 0x80; i++ {
			data = data[1:]
		}
	}
	return strings.TrimSpace(string(data))
}
331
// waitAgentConn waits for the guest agent's connection on connCh, or for ctx
// to be done. A nil value (including one received from a closed channel) is
// reported as an error. Cancellation errors wrap ctx.Err().
func waitAgentConn(ctx context.Context, connCh <-chan net.Conn) (net.Conn, error) {
	var conn net.Conn
	select {
	case <-ctx.Done():
		return nil, fmt.Errorf("waiting for agent: %w", ctx.Err())
	case conn = <-connCh:
	}

	if conn == nil {
		return nil, fmt.Errorf("agent connection closed before setup")
	}
	return conn, nil
}
343
344func StartVM(ctx context.Context, cfg VMConfig, logger *slog.Logger) (VMHandle, error) {
345 if logger == nil {
346 logger = slog.Default()
347 }
348
349 runner, err := runnerFor(cfg.Image.RunnerType)
350 if err != nil {
351 return nil, err
352 }
353 if err := cfg.Image.Validate(); err != nil {
354 return nil, err
355 }
356 if err := cfg.Image.validateImageFiles(); err != nil {
357 return nil, err
358 }
359 if err := runner.Validate(cfg.Image, cfg.EnableKVM); err != nil {
360 return nil, err
361 }
362
363 if err := prepareWorkDir(cfg.WorkDir); err != nil {
364 return nil, err
365 }
366
367 mkfsExt4 := cfg.MkfsExt4
368 if mkfsExt4 == "" {
369 mkfsExt4, err = exec.LookPath("mkfs.ext4")
370 if err != nil {
371 return nil, fmt.Errorf("mkfs.ext4 command not found in PATH: %w", err)
372 }
373 }
374 volumePaths, err := prepareVolumes(ctx, cfg.WorkDir, cfg.Image.Volumes, mkfsExt4)
375 if err != nil {
376 return nil, err
377 }
378
379 return runner.Start(ctx, cfg, volumePaths, logger)
380}