Monorepo for Tangled
tangled.org
1package microvm
2
3import (
4 "context"
5 "crypto/rand"
6 "encoding/binary"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "math"
13 "net"
14 "os"
15 "os/exec"
16 "path/filepath"
17 "slices"
18 "strings"
19 "sync/atomic"
20 "time"
21
22 "tangled.org/core/spindle/models"
23)
24
const (
	// minGuestCID is the smallest guest context ID AllocateCID hands out.
	// NOTE(review): presumably 3 because vsock reserves CIDs 0-2 — confirm.
	minGuestCID = 3

	// vmCrashLogTailBytes caps how many trailing bytes of each VM log file
	// are included in the report built by vmCrashLog.
	vmCrashLogTailBytes = 4096
)
29
30func AllocateCID() (uint32, error) {
31 var data [4]byte
32 if _, err := rand.Read(data[:]); err != nil {
33 return 0, fmt.Errorf("allocate guest CID: %w", err)
34 }
35 return minGuestCID + binary.BigEndian.Uint32(data[:])%60000, nil
36}
37
// prepareWorkDir ensures the microVM work directory exists, creating any
// missing parents with mode 0755. An empty workDir is rejected.
func prepareWorkDir(workDir string) error {
	if len(workDir) == 0 {
		return fmt.Errorf("microvm work directory is required")
	}
	if mkErr := os.MkdirAll(workDir, 0o755); mkErr != nil {
		return fmt.Errorf("create microvm work directory: %w", mkErr)
	}
	return nil
}
47
48func prepareVolumes(ctx context.Context, workDir string, volumes []Volume, mkfsExt4 string) (map[string]string, error) {
49 paths := make(map[string]string, len(volumes))
50 for _, volume := range volumes {
51 if volume.ReadOnly {
52 return nil, fmt.Errorf("read-only microvm volume %q is not supported yet", volume.Image)
53 }
54 if volume.FSType != "ext4" {
55 return nil, fmt.Errorf("microvm volume %q uses unsupported fsType %q", volume.Image, volume.FSType)
56 }
57 if volume.ImageType != "" && volume.ImageType != "raw" {
58 return nil, fmt.Errorf("microvm volume %q uses unsupported imageType %q", volume.Image, volume.ImageType)
59 }
60
61 path := filepath.Join(workDir, filepath.Base(volume.Image))
62 if err := createSparseFile(path, volume.SizeMiB); err != nil {
63 return nil, err
64 }
65 noJournal := volume.MountPoint == "/workspace"
66 if err := runMkfsExt4(ctx, mkfsExt4, path, noJournal); err != nil {
67 return nil, err
68 }
69 paths[volume.Image] = path
70 }
71 return paths, nil
72}
73
// createSparseFile creates a new file at path and extends it to sizeMiB MiB
// with Truncate, so no data is written. On filesystems with sparse-file
// support, no blocks are allocated up front. It fails if path already
// exists (O_EXCL) or if sizeMiB is non-positive or would overflow int64
// bytes.
//
// If the file is created but sizing or closing it fails, the file is
// removed again so a retry is not blocked by O_EXCL.
func createSparseFile(path string, sizeMiB int64) error {
	const mib = 1024 * 1024

	if sizeMiB <= 0 {
		return fmt.Errorf("sparse file %q size must be positive", path)
	}
	if sizeMiB > math.MaxInt64/mib {
		return fmt.Errorf("sparse file %q size is too large", path)
	}
	file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o600)
	if err != nil {
		return fmt.Errorf("create sparse file %q: %w", path, err)
	}

	if err := file.Truncate(sizeMiB * mib); err != nil {
		_ = file.Close()   // best-effort: the resize error is what matters
		_ = os.Remove(path) // we created it (O_EXCL), so it is ours to remove
		return fmt.Errorf("resize sparse file %q: %w", path, err)
	}
	if err := file.Close(); err != nil {
		_ = os.Remove(path)
		return fmt.Errorf("close sparse file %q: %w", path, err)
	}
	return nil
}
92
// runMkfsExt4 formats the image at path as ext4 by running the mkfs.ext4
// binary at mkfsExt4 with -F. When noJournal is set, it adds
// "-O ^has_journal" to disable the journal. The command is killed if ctx
// is cancelled. On failure, the returned error includes the tool's trimmed
// combined output.
func runMkfsExt4(ctx context.Context, mkfsExt4, path string, noJournal bool) error {
	if mkfsExt4 == "" {
		return fmt.Errorf("mkfs.ext4 path is required")
	}

	var args []string
	if noJournal {
		args = []string{"-F", "-O", "^has_journal", path}
	} else {
		args = []string{"-F", path}
	}

	out, runErr := exec.CommandContext(ctx, mkfsExt4, args...).CombinedOutput()
	if runErr == nil {
		return nil
	}
	return fmt.Errorf("mkfs.ext4 %q: %w: %s", path, runErr, strings.TrimSpace(string(out)))
}
110
// createParentedFile opens path for writing, truncating any existing
// content. Missing parent directories are created first (mode 0755). New
// files get mode 0644.
func createParentedFile(path string) (*os.File, error) {
	parent := filepath.Dir(path)
	if mkErr := os.MkdirAll(parent, 0o755); mkErr != nil {
		return nil, fmt.Errorf("create log directory: %w", mkErr)
	}

	f, openErr := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
	if openErr != nil {
		return nil, fmt.Errorf("create log file %q: %w", path, openErr)
	}
	return f, nil
}
121
// VMLogs lists the log files written for a microVM.
type VMLogs struct {
	// Serial is the path of the serial console log. vmCrashLog skips it
	// when empty.
	Serial string
	// Extra maps a log name, used as a section label in crash reports, to
	// the path of that log file.
	Extra map[string]string
}
126
127type VMHandle interface {
128 Shutdown(ctx context.Context) error
129 WaitContext(ctx context.Context) error
130 Close() error
131 Logs() VMLogs
132 CID() uint32
133 WorkDir() string
134 OOMKilled() bool
135}
136
137type VMConfig struct {
138 Image ImageSpec
139 CID uint32
140 EnableKVM bool
141 WorkDir string
142 Cgroup CgroupLimits
143
144 BootTimeout time.Duration
145 MkfsExt4 string
146 Dev bool
147}
148
149type workflowState struct {
150 ImageSpec ImageSpec
151 ImageSpecPath string
152 Config manifestConfig
153 ConfigKey string
154 Image string
155 CacheReadURLs []string
156 CacheTrustedPublicKeys []string
157 VM VMHandle
158 Agent *AgentSession
159 ReadCache *ReadCacheProxy
160 UploadCache *UploadCacheProxy
161 DNSProxy *DNSProxy
162 WorkDir string
163 NixOSToplevelCache nixosToplevelCacheStore
164}
165
166func (e *Engine) cleanupState(ctx context.Context, wid models.WorkflowId, state *workflowState) error {
167 if state == nil {
168 return nil
169 }
170
171 ctx = context.WithoutCancel(ctx)
172
173 var err error
174 err = errors.Join(err, e.drainNixCache(ctx, state))
175 err = errors.Join(err, e.shutdownVM(ctx, wid, state))
176 err = errors.Join(err, closeIO(&state.Agent))
177 err = errors.Join(err, closeIO(&state.ReadCache))
178 err = errors.Join(err, closeIO(&state.UploadCache))
179 err = errors.Join(err, closeIO(&state.DNSProxy))
180 err = errors.Join(err, removeWorkDir(state))
181 return err
182}
183
184func (e *Engine) drainNixCache(ctx context.Context, state *workflowState) error {
185 if e.cfg.NixCache.UploadURL == "" {
186 return nil
187 }
188
189 drainCtx, cancel := context.WithTimeout(ctx, cacheDrainTimeout)
190 defer cancel()
191
192 if state.Agent != nil {
193 if _, err := state.Agent.Drain(drainCtx); err != nil {
194 return fmt.Errorf("drain guest nix cache uploads: %w", err)
195 }
196 }
197 return nil
198}
199
200func (e *Engine) shutdownVM(ctx context.Context, wid models.WorkflowId, state *workflowState) error {
201 if state.VM == nil {
202 return nil
203 }
204
205 var err error
206
207 if state.Agent != nil {
208 gracefulCtx, cancel := context.WithTimeout(ctx, vmShutdownTimeout)
209 poweredOff, poweroffErr := e.poweroffViaAgent(gracefulCtx, wid, state)
210 cancel()
211
212 err = errors.Join(err, poweroffErr)
213 if poweredOff {
214 return errors.Join(err, closeIO(&state.VM))
215 }
216 }
217
218 fallbackCtx, cancel := context.WithTimeout(ctx, vmShutdownTimeout)
219 defer cancel()
220
221 if shutdownErr := state.VM.Shutdown(fallbackCtx); shutdownErr != nil {
222 e.l.Warn("microVM shutdown fallback failed", "workflow", wid, "error", shutdownErr)
223 err = errors.Join(err, shutdownErr)
224 }
225
226 return errors.Join(err, closeIO(&state.VM))
227}
228
229func (e *Engine) poweroffViaAgent(ctx context.Context, wid models.WorkflowId, state *workflowState) (bool, error) {
230 if err := state.Agent.Poweroff(ctx); err != nil {
231 e.l.Warn("agent poweroff request failed", "workflow", wid, "error", err)
232 return false, err
233 }
234
235 if err := state.VM.WaitContext(ctx); err != nil {
236 e.l.Warn("agent poweroff did not stop microVM", "workflow", wid, "error", err)
237 return false, nil
238 }
239
240 return true, nil
241}
242
// closeIO closes the value stored in *field and resets *field to its zero
// value. The reset happens before Close, so a second call is a no-op even
// if the first Close failed. A zero (nil) value is not closed and yields
// nil.
func closeIO[T io.Closer](field *T) error {
	var none T
	current := *field
	*field = none

	if any(current) != any(none) {
		return current.Close()
	}
	return nil
}
253
254func removeWorkDir(state *workflowState) error {
255 if state.WorkDir == "" {
256 return nil
257 }
258
259 err := os.RemoveAll(state.WorkDir)
260 state.WorkDir = ""
261 return err
262}
263
264// returns a context derived from ctx that is cancelled either when ctx itself
265// is cancelled or when the microVM exits on its own. the returned flag reports
266// whether the VM exited (as opposed to ctx being cancelled for another reason,
267// e.g. the workflow timeout), letting callers tell a crash apart from a
268// timeout. cancel must be called to release the watcher goroutine.
269func watchVMExit(ctx context.Context, vm VMHandle) (context.Context, *atomic.Bool, context.CancelFunc) {
270 exited := &atomic.Bool{}
271 watchCtx, cancel := context.WithCancel(ctx)
272 if vm == nil {
273 return watchCtx, exited, cancel
274 }
275 go func() {
276 _ = vm.WaitContext(watchCtx) // returns when VM exits or watchCtx is cancelled
277 if watchCtx.Err() == nil {
278 exited.Store(true)
279 cancel() // don't forget to cancel the watchCtx...
280 }
281 }()
282 return watchCtx, exited, cancel
283}
284
285func vmCrashLog(vm VMHandle) string {
286 if vm == nil {
287 return ""
288 }
289 logs := vm.Logs()
290
291 var b strings.Builder
292 if tail := tailFile(logs.Serial, vmCrashLogTailBytes); tail != "" {
293 fmt.Fprintf(&b, "==== serial log ====\n%s\n", tail)
294 }
295 for _, name := range slices.Sorted(maps.Keys(logs.Extra)) {
296 if tail := tailFile(logs.Extra[name], vmCrashLogTailBytes); tail != "" {
297 fmt.Fprintf(&b, "==== %s log ====\n%s\n", name, tail)
298 }
299 }
300 return strings.TrimRight(b.String(), "\n")
301}
302
// tailFile returns at most maxBytes bytes from the end of the file at path
// (the whole file if it is shorter), trimmed of surrounding whitespace.
//
// It is best-effort for diagnostics. An empty path, a non-positive limit,
// or any I/O error yields "". Because the read starts at a byte offset, the
// first line may be partial.
func tailFile(path string, maxBytes int64) string {
	if path == "" || maxBytes <= 0 {
		return ""
	}
	f, err := os.Open(path)
	if err != nil {
		return ""
	}
	defer f.Close()

	if info, err := f.Stat(); err == nil && info.Size() > maxBytes {
		if _, err := f.Seek(-maxBytes, io.SeekEnd); err != nil {
			return ""
		}
	}
	// Cap the read as well. The file may still be growing after Stat, or
	// Stat may have failed, and we must never return more than maxBytes.
	data, err := io.ReadAll(io.LimitReader(f, maxBytes))
	if err != nil {
		return ""
	}
	return strings.TrimSpace(string(data))
}
323
// waitAgentConn waits for the guest agent's connection on connCh.
//
// A nil value, including the zero value received from a closed channel, is
// reported as an error. If ctx is done first, the returned error wraps
// ctx.Err().
func waitAgentConn(ctx context.Context, connCh <-chan net.Conn) (net.Conn, error) {
	var conn net.Conn
	select {
	case <-ctx.Done():
		return nil, fmt.Errorf("waiting for agent: %w", ctx.Err())
	case conn = <-connCh:
	}

	if conn == nil {
		return nil, fmt.Errorf("agent connection closed before setup")
	}
	return conn, nil
}
335
336func StartVM(ctx context.Context, cfg VMConfig, logger *slog.Logger) (VMHandle, error) {
337 if logger == nil {
338 logger = slog.Default()
339 }
340
341 runner, err := runnerFor(cfg.Image.RunnerType)
342 if err != nil {
343 return nil, err
344 }
345 if err := cfg.Image.Validate(); err != nil {
346 return nil, err
347 }
348 if err := cfg.Image.validateImageFiles(); err != nil {
349 return nil, err
350 }
351 if err := runner.Validate(cfg.Image, cfg.EnableKVM); err != nil {
352 return nil, err
353 }
354
355 if err := prepareWorkDir(cfg.WorkDir); err != nil {
356 return nil, err
357 }
358
359 mkfsExt4 := cfg.MkfsExt4
360 if mkfsExt4 == "" {
361 mkfsExt4, err = exec.LookPath("mkfs.ext4")
362 if err != nil {
363 return nil, fmt.Errorf("mkfs.ext4 command not found in PATH: %w", err)
364 }
365 }
366 volumePaths, err := prepareVolumes(ctx, cfg.WorkDir, cfg.Image.Volumes, mkfsExt4)
367 if err != nil {
368 return nil, err
369 }
370
371 return runner.Start(ctx, cfg, volumePaths, logger)
372}