Monorepo for Tangled
tangled.org
1package microvm
2
3import (
4 "context"
5 "crypto/rand"
6 "encoding/binary"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "math"
13 "net"
14 "os"
15 "os/exec"
16 "path/filepath"
17 "slices"
18 "strings"
19 "sync/atomic"
20 "time"
21
22 "tangled.org/core/spindle/models"
23)
24
const (
	// minGuestCID is the smallest guest CID that AllocateCID can return.
	// NOTE(review): presumably 3 because lower CIDs are reserved by the
	// transport — confirm against the vsock documentation.
	minGuestCID = 3
	// vmCrashLogTailBytes is the per-file byte limit vmCrashLog passes to
	// tailFile when it collects the end of each VM log.
	vmCrashLogTailBytes = 4096
)
29
30func AllocateCID() (uint32, error) {
31 var data [4]byte
32 if _, err := rand.Read(data[:]); err != nil {
33 return 0, fmt.Errorf("allocate guest CID: %w", err)
34 }
35 return minGuestCID + binary.BigEndian.Uint32(data[:])%60000, nil
36}
37
// prepareWorkDir makes sure the microVM work directory exists, creating it
// and any missing parents with mode 0o755. An empty workDir is rejected.
func prepareWorkDir(workDir string) error {
	if len(workDir) == 0 {
		return fmt.Errorf("microvm work directory is required")
	}
	mkdirErr := os.MkdirAll(workDir, 0o755)
	if mkdirErr == nil {
		return nil
	}
	return fmt.Errorf("create microvm work directory: %w", mkdirErr)
}
47
48func prepareVolumes(ctx context.Context, workDir string, volumes []Volume, mkfsExt4 string) (map[string]string, error) {
49 paths := make(map[string]string, len(volumes))
50 for _, volume := range volumes {
51 if volume.ReadOnly {
52 return nil, fmt.Errorf("read-only microvm volume %q is not supported yet", volume.Image)
53 }
54 if volume.FSType != "ext4" {
55 return nil, fmt.Errorf("microvm volume %q uses unsupported fsType %q", volume.Image, volume.FSType)
56 }
57 if volume.ImageType != "" && volume.ImageType != "raw" {
58 return nil, fmt.Errorf("microvm volume %q uses unsupported imageType %q", volume.Image, volume.ImageType)
59 }
60
61 path := filepath.Join(workDir, filepath.Base(volume.Image))
62 if err := createSparseFile(path, volume.SizeMiB); err != nil {
63 return nil, err
64 }
65 noJournal := volume.MountPoint == "/workspace"
66 if err := runMkfsExt4(ctx, mkfsExt4, path, noJournal); err != nil {
67 return nil, err
68 }
69 paths[volume.Image] = path
70 }
71 return paths, nil
72}
73
// createSparseFile creates a new file at path (mode 0o600) and extends it to
// sizeMiB mebibytes with Truncate, without writing any data.
//
// sizeMiB must be positive and small enough that the byte count fits in an
// int64. The file is opened with O_EXCL, so an existing file at path is an
// error and is left untouched. If resizing or closing the new file fails, the
// partially initialised file is removed so that a retry does not trip over
// O_EXCL.
func createSparseFile(path string, sizeMiB int64) (err error) {
	const bytesPerMiB = 1024 * 1024

	if sizeMiB <= 0 {
		return fmt.Errorf("sparse file %q size must be positive", path)
	}
	if sizeMiB > math.MaxInt64/bytesPerMiB {
		return fmt.Errorf("sparse file %q size is too large", path)
	}

	file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o600)
	if err != nil {
		return fmt.Errorf("create sparse file %q: %w", path, err)
	}
	// From here on the file was created by this call, so it is safe to delete
	// it again when anything below fails.
	defer func() {
		if closeErr := file.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("close sparse file %q: %w", path, closeErr)
		}
		if err != nil {
			// Best effort: the original failure is what the caller needs to see.
			_ = os.Remove(path)
		}
	}()

	if truncErr := file.Truncate(sizeMiB * bytesPerMiB); truncErr != nil {
		return fmt.Errorf("resize sparse file %q: %w", path, truncErr)
	}
	return nil
}
92
// runMkfsExt4 formats the file at path as ext4 by running the binary at
// mkfsExt4 with -F, adding "-O ^has_journal" when noJournal is set.
//
// An empty mkfsExt4 is rejected. When the command fails, the returned error
// wraps the execution error and appends the command's trimmed combined
// output.
func runMkfsExt4(ctx context.Context, mkfsExt4, path string, noJournal bool) error {
	if len(mkfsExt4) == 0 {
		return fmt.Errorf("mkfs.ext4 path is required")
	}

	var featureArgs []string
	if noJournal {
		featureArgs = []string{"-O", "^has_journal"}
	}
	argv := make([]string, 0, len(featureArgs)+2)
	argv = append(argv, "-F")
	argv = append(argv, featureArgs...)
	argv = append(argv, path)

	combined, runErr := exec.CommandContext(ctx, mkfsExt4, argv...).CombinedOutput()
	if runErr == nil {
		return nil
	}
	return fmt.Errorf("mkfs.ext4 %q: %w: %s", path, runErr, strings.TrimSpace(string(combined)))
}
110
// createParentedFile opens path for writing after creating its parent
// directories (mode 0o755). The file is created with mode 0o644 when missing
// and truncated when it already exists. The caller owns the returned file
// and must close it.
func createParentedFile(path string) (*os.File, error) {
	parent := filepath.Dir(path)
	if mkdirErr := os.MkdirAll(parent, 0o755); mkdirErr != nil {
		return nil, fmt.Errorf("create log directory: %w", mkdirErr)
	}

	const flags = os.O_CREATE | os.O_WRONLY | os.O_TRUNC
	logFile, openErr := os.OpenFile(path, flags, 0o644)
	if openErr == nil {
		return logFile, nil
	}
	return nil, fmt.Errorf("create log file %q: %w", path, openErr)
}
121
// VMLogs holds the log file locations reported by VMHandle.Logs.
type VMLogs struct {
	// Serial is the path of the serial log file; vmCrashLog tails it first.
	Serial string
	// Extra maps a log name to the path of an additional log file.
	// vmCrashLog tails these in sorted name order.
	Extra map[string]string
}
126
// VMHandle is the control surface of a started microVM, as returned by
// StartVM.
type VMHandle interface {
	// Shutdown stops the VM. shutdownVM uses it as the fallback when no agent
	// is attached or the agent-driven poweroff did not stop the VM.
	Shutdown(ctx context.Context) error
	// WaitContext blocks until the VM exits or ctx is cancelled (this is how
	// watchVMExit and poweroffViaAgent rely on it).
	WaitContext(ctx context.Context) error
	// Close is called by shutdownVM, via closeIO, after the shutdown attempt.
	// NOTE(review): presumably releases host-side resources — confirm with
	// the implementations.
	Close() error
	// Logs reports where the VM's log files live.
	Logs() VMLogs
	// CID returns the VM's guest CID.
	// NOTE(review): presumably the value passed in VMConfig.CID — confirm.
	CID() uint32
	// WorkDir returns the VM's work directory.
	// NOTE(review): presumably VMConfig.WorkDir — confirm.
	WorkDir() string
	// OOMKilled reports whether the VM was killed for running out of memory.
	// NOTE(review): exact detection is not visible here — confirm with the
	// implementations.
	OOMKilled() bool
}
136
// VMConfig carries the inputs StartVM needs to launch a microVM.
type VMConfig struct {
	// Image describes the VM image. StartVM validates it, selects the runner
	// from Image.RunnerType and prepares Image.Volumes.
	Image ImageSpec
	// CID is the guest CID for the VM.
	// NOTE(review): presumably obtained from AllocateCID — confirm.
	CID uint32
	// EnableKVM is handed to the runner's Validate together with Image.
	EnableKVM bool
	// WorkDir is the directory StartVM creates (if needed) and places volume
	// images in; it must not be empty.
	WorkDir string
	// Cgroup holds cgroup limits.
	// NOTE(review): not read in the visible code; presumably consumed by the
	// runner — confirm.
	Cgroup CgroupLimits

	// BootTimeout is not read in the visible code.
	// NOTE(review): presumably bounds how long the VM may take to boot —
	// confirm.
	BootTimeout time.Duration
	// MkfsExt4 is the path to the mkfs.ext4 binary; when empty StartVM looks
	// up "mkfs.ext4" in PATH.
	MkfsExt4 string
	// Dev is not read in the visible code.
	// NOTE(review): presumably toggles development-mode behaviour — confirm.
	Dev bool
}
148
// workflowState groups the per-workflow data and resources that cleanupState
// tears down when a workflow finishes.
type workflowState struct {
	ImageSpec ImageSpec
	ImageSpecPath string
	Config manifestConfig
	ConfigKey string
	Image string
	CacheReadURLs []string
	CacheTrustedPublicKeys []string
	// VM is the running microVM, or nil; shutdownVM stops and closes it.
	VM VMHandle
	// Agent is the guest agent session, or nil. It is used to drain cache
	// uploads and to request a poweroff, and is closed by cleanupState.
	Agent *AgentSession
	// ReadCache, UploadCache and DNSProxy are closed by cleanupState when set.
	ReadCache *ReadCacheProxy
	UploadCache *UploadCacheProxy
	DNSProxy *DNSProxy
	// WorkDir is removed recursively by removeWorkDir; empty means there is
	// nothing to remove.
	WorkDir string
	NixOSToplevelCache nixosToplevelCacheStore
}
165
166func (e *Engine) cleanupState(ctx context.Context, wid models.WorkflowId, state *workflowState) error {
167 if state == nil {
168 return nil
169 }
170
171 ctx = context.WithoutCancel(ctx)
172
173 var err error
174 // todo(dawn): expose this error to the user as a warning
175 if drainErr := e.drainNixCache(ctx, state); drainErr != nil {
176 e.l.Warn("cache drain failed during cleanup; continuing", "workflow", wid, "error", drainErr)
177 }
178 err = errors.Join(err, e.shutdownVM(ctx, wid, state))
179 err = errors.Join(err, closeIO(&state.Agent))
180 err = errors.Join(err, closeIO(&state.ReadCache))
181 err = errors.Join(err, closeIO(&state.UploadCache))
182 err = errors.Join(err, closeIO(&state.DNSProxy))
183 err = errors.Join(err, removeWorkDir(state))
184 return err
185}
186
187func (e *Engine) drainNixCache(ctx context.Context, state *workflowState) error {
188 if e.cfg.NixCache.UploadURL == "" {
189 return nil
190 }
191
192 drainCtx, cancel := context.WithTimeout(ctx, cacheDrainTimeout)
193 defer cancel()
194
195 if state.Agent != nil {
196 if _, err := state.Agent.Drain(drainCtx); err != nil {
197 return fmt.Errorf("drain guest nix cache uploads: %w", err)
198 }
199 }
200 return nil
201}
202
203func (e *Engine) shutdownVM(ctx context.Context, wid models.WorkflowId, state *workflowState) error {
204 if state.VM == nil {
205 return nil
206 }
207
208 var err error
209
210 if state.Agent != nil {
211 gracefulCtx, cancel := context.WithTimeout(ctx, vmShutdownTimeout)
212 poweredOff, poweroffErr := e.poweroffViaAgent(gracefulCtx, wid, state)
213 cancel()
214
215 err = errors.Join(err, poweroffErr)
216 if poweredOff {
217 return errors.Join(err, closeIO(&state.VM))
218 }
219 }
220
221 fallbackCtx, cancel := context.WithTimeout(ctx, vmShutdownTimeout)
222 defer cancel()
223
224 if shutdownErr := state.VM.Shutdown(fallbackCtx); shutdownErr != nil {
225 e.l.Warn("microVM shutdown fallback failed", "workflow", wid, "error", shutdownErr)
226 err = errors.Join(err, shutdownErr)
227 }
228
229 return errors.Join(err, closeIO(&state.VM))
230}
231
232func (e *Engine) poweroffViaAgent(ctx context.Context, wid models.WorkflowId, state *workflowState) (bool, error) {
233 if err := state.Agent.Poweroff(ctx); err != nil {
234 e.l.Warn("agent poweroff request failed", "workflow", wid, "error", err)
235 return false, err
236 }
237
238 if err := state.VM.WaitContext(ctx); err != nil {
239 e.l.Warn("agent poweroff did not stop microVM", "workflow", wid, "error", err)
240 return false, nil
241 }
242
243 return true, nil
244}
245
// closeIO closes the value stored in *field and resets the field to its zero
// value first, so a second call on the same field does not close it again.
// A field that already holds the zero value (nil pointer or nil interface)
// is left alone and nil is returned.
func closeIO[T io.Closer](field *T) error {
	var zero T
	current := *field
	*field = zero
	if any(current) != any(zero) {
		return current.Close()
	}
	return nil
}
256
257func removeWorkDir(state *workflowState) error {
258 if state.WorkDir == "" {
259 return nil
260 }
261
262 err := os.RemoveAll(state.WorkDir)
263 state.WorkDir = ""
264 return err
265}
266
267// returns a context derived from ctx that is cancelled either when ctx itself
268// is cancelled or when the microVM exits on its own. the returned flag reports
269// whether the VM exited (as opposed to ctx being cancelled for another reason,
270// e.g. the workflow timeout), letting callers tell a crash apart from a
271// timeout. cancel must be called to release the watcher goroutine.
272func watchVMExit(ctx context.Context, vm VMHandle) (context.Context, *atomic.Bool, context.CancelFunc) {
273 exited := &atomic.Bool{}
274 watchCtx, cancel := context.WithCancel(ctx)
275 if vm == nil {
276 return watchCtx, exited, cancel
277 }
278 go func() {
279 _ = vm.WaitContext(watchCtx) // returns when VM exits or watchCtx is cancelled
280 if watchCtx.Err() == nil {
281 exited.Store(true)
282 cancel() // don't forget to cancel the watchCtx...
283 }
284 }()
285 return watchCtx, exited, cancel
286}
287
288func vmCrashLog(vm VMHandle) string {
289 if vm == nil {
290 return ""
291 }
292 logs := vm.Logs()
293
294 var b strings.Builder
295 if tail := tailFile(logs.Serial, vmCrashLogTailBytes); tail != "" {
296 fmt.Fprintf(&b, "==== serial log ====\n%s\n", tail)
297 }
298 for _, name := range slices.Sorted(maps.Keys(logs.Extra)) {
299 if tail := tailFile(logs.Extra[name], vmCrashLogTailBytes); tail != "" {
300 fmt.Fprintf(&b, "==== %s log ====\n%s\n", name, tail)
301 }
302 }
303 return strings.TrimRight(b.String(), "\n")
304}
305
// tailFile returns at most the last max bytes of the file at path, with
// surrounding whitespace trimmed.
//
// It is best-effort: an empty path, a non-positive max, or any failure to
// open, seek or read the file yields "". The read is capped at max bytes, so
// a file that keeps growing cannot make the result unbounded. If the file
// size cannot be determined, the first max bytes are returned instead.
//
// When the file is longer than max, the cut is made at a byte offset and may
// land inside a multi-byte UTF-8 sequence. Orphaned continuation bytes at
// the start of the tail are dropped so the result does not begin with
// invalid UTF-8.
func tailFile(path string, max int64) string {
	if path == "" || max <= 0 {
		return ""
	}
	f, err := os.Open(path)
	if err != nil {
		return ""
	}
	defer f.Close()

	truncated := false
	if info, err := f.Stat(); err == nil && info.Size() > max {
		if _, err := f.Seek(-max, io.SeekEnd); err != nil {
			return ""
		}
		truncated = true
	}

	data, err := io.ReadAll(io.LimitReader(f, max))
	if err != nil {
		return ""
	}

	if truncated {
		// A UTF-8 sequence has at most three continuation bytes (10xxxxxx).
		for i := 0; i < 3 && len(data) > 0 && data[0]&0xC0 == 0x80; i++ {
			data = data[1:]
		}
	}
	return strings.TrimSpace(string(data))
}
326
// waitAgentConn blocks until the agent connection arrives on connCh or ctx
// is done.
//
// A nil connection (including the zero value read from a closed channel) is
// reported as "agent connection closed before setup". When ctx ends first,
// the returned error wraps ctx.Err().
func waitAgentConn(ctx context.Context, connCh <-chan net.Conn) (net.Conn, error) {
	select {
	case <-ctx.Done():
		return nil, fmt.Errorf("waiting for agent: %w", ctx.Err())
	case conn := <-connCh:
		if conn != nil {
			return conn, nil
		}
		return nil, fmt.Errorf("agent connection closed before setup")
	}
}
338
339func StartVM(ctx context.Context, cfg VMConfig, logger *slog.Logger) (VMHandle, error) {
340 if logger == nil {
341 logger = slog.Default()
342 }
343
344 runner, err := runnerFor(cfg.Image.RunnerType)
345 if err != nil {
346 return nil, err
347 }
348 if err := cfg.Image.Validate(); err != nil {
349 return nil, err
350 }
351 if err := cfg.Image.validateImageFiles(); err != nil {
352 return nil, err
353 }
354 if err := runner.Validate(cfg.Image, cfg.EnableKVM); err != nil {
355 return nil, err
356 }
357
358 if err := prepareWorkDir(cfg.WorkDir); err != nil {
359 return nil, err
360 }
361
362 mkfsExt4 := cfg.MkfsExt4
363 if mkfsExt4 == "" {
364 mkfsExt4, err = exec.LookPath("mkfs.ext4")
365 if err != nil {
366 return nil, fmt.Errorf("mkfs.ext4 command not found in PATH: %w", err)
367 }
368 }
369 volumePaths, err := prepareVolumes(ctx, cfg.WorkDir, cfg.Image.Volumes, mkfsExt4)
370 if err != nil {
371 return nil, err
372 }
373
374 return runner.Start(ctx, cfg, volumePaths, logger)
375}