Monorepo for Tangled
tangled.org
1package microvm
2
import (
	"context"
	"crypto/rand"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"maps"
	"math"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"slices"
	"strings"
	"sync/atomic"
	"time"
	"unicode/utf8"

	"tangled.org/core/spindle/models"
)
24
const (
	// minGuestCID is the smallest CID AllocateCID hands out. Presumably chosen
	// because vsock reserves CIDs 0-2 (2 being the host) — confirm.
	minGuestCID = 3
	// vmCrashLogTailBytes caps how many trailing bytes of each VM log file
	// vmCrashLog includes in a crash report.
	vmCrashLogTailBytes = 4096
)
29
30func AllocateCID() (uint32, error) {
31 var data [4]byte
32 if _, err := rand.Read(data[:]); err != nil {
33 return 0, fmt.Errorf("allocate guest CID: %w", err)
34 }
35 return minGuestCID + binary.BigEndian.Uint32(data[:])%60000, nil
36}
37
// prepareWorkDir makes sure the microVM work directory exists. It creates
// the directory and any missing parents with mode 0755. An empty path is
// rejected. An already-existing directory is not an error.
func prepareWorkDir(workDir string) error {
	if len(workDir) == 0 {
		return fmt.Errorf("microvm work directory is required")
	}
	err := os.MkdirAll(workDir, 0o755)
	if err == nil {
		return nil
	}
	return fmt.Errorf("create microvm work directory: %w", err)
}
47
48func prepareVolumes(ctx context.Context, workDir string, volumes []Volume, mkfsExt4 string) (map[string]string, error) {
49 paths := make(map[string]string, len(volumes))
50 for _, volume := range volumes {
51 if volume.ReadOnly {
52 return nil, fmt.Errorf("read-only microvm volume %q is not supported yet", volume.Image)
53 }
54 if volume.FSType != "ext4" {
55 return nil, fmt.Errorf("microvm volume %q uses unsupported fsType %q", volume.Image, volume.FSType)
56 }
57 if volume.ImageType != "" && volume.ImageType != "raw" {
58 return nil, fmt.Errorf("microvm volume %q uses unsupported imageType %q", volume.Image, volume.ImageType)
59 }
60
61 path := filepath.Join(workDir, filepath.Base(volume.Image))
62 if err := createSparseFile(path, volume.SizeMiB); err != nil {
63 return nil, err
64 }
65 noJournal := volume.MountPoint == "/workspace"
66 if err := runMkfsExt4(ctx, mkfsExt4, path, noJournal); err != nil {
67 return nil, err
68 }
69 paths[volume.Image] = path
70 }
71 return paths, nil
72}
73
// createSparseFile creates a new file at path and extends it to sizeMiB
// mebibytes with Truncate, without writing any data.
//
// It refuses to touch an existing file (O_EXCL). sizeMiB must be positive
// and small enough that sizeMiB*1MiB fits in an int64.
//
// If resizing or closing fails, the file this call just created is removed.
// That way a retry is not blocked by the O_EXCL check. The close error is
// reported because the file was opened for writing.
func createSparseFile(path string, sizeMiB int64) (err error) {
	if sizeMiB <= 0 {
		return fmt.Errorf("sparse file %q size must be positive", path)
	}
	if sizeMiB > math.MaxInt64/(1024*1024) {
		return fmt.Errorf("sparse file %q size is too large", path)
	}
	file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o600)
	if err != nil {
		// Nothing was created (or the path already existed): leave it alone.
		return fmt.Errorf("create sparse file %q: %w", path, err)
	}
	defer func() {
		if closeErr := file.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("close sparse file %q: %w", path, closeErr)
		}
		if err != nil {
			// Best effort: the error being returned is what matters to callers.
			_ = os.Remove(path)
		}
	}()

	if err := file.Truncate(sizeMiB * 1024 * 1024); err != nil {
		return fmt.Errorf("resize sparse file %q: %w", path, err)
	}
	return nil
}
92
// runMkfsExt4 formats the image at path as ext4 by running the binary at
// mkfsExt4.
//
// -F is always passed. Per the mke2fs man page it forces formatting even when
// the target is not a block device, which is presumably why it is needed for
// image files. When noJournal is set, "-O ^has_journal" disables the journal.
// The process is tied to ctx via exec.CommandContext. On failure, the
// command's combined stdout/stderr is appended to the error.
func runMkfsExt4(ctx context.Context, mkfsExt4, path string, noJournal bool) error {
	if mkfsExt4 == "" {
		return fmt.Errorf("mkfs.ext4 path is required")
	}

	argv := make([]string, 0, 4)
	argv = append(argv, "-F")
	if noJournal {
		argv = append(argv, "-O", "^has_journal")
	}
	argv = append(argv, path)

	out, runErr := exec.CommandContext(ctx, mkfsExt4, argv...).CombinedOutput()
	if runErr == nil {
		return nil
	}
	detail := strings.TrimSpace(string(out))
	return fmt.Errorf("mkfs.ext4 %q: %w: %s", path, runErr, detail)
}
110
// createParentedFile opens path for writing. Missing parent directories are
// created first with mode 0755. An existing file is truncated; a new one is
// created with mode 0644 (before umask). The caller owns the returned file
// and must close it.
func createParentedFile(path string) (*os.File, error) {
	dir := filepath.Dir(path)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return nil, fmt.Errorf("create log directory: %w", err)
	}

	const flags = os.O_WRONLY | os.O_CREATE | os.O_TRUNC
	f, err := os.OpenFile(path, flags, 0o644)
	if err != nil {
		return nil, fmt.Errorf("create log file %q: %w", path, err)
	}
	return f, nil
}
121
// VMLogs lists the log files exposed by a VMHandle. vmCrashLog reads the
// tail of each path listed here and skips empty paths.
type VMLogs struct {
	// Serial is the path of the VM's serial log. Presumably this is the
	// captured guest console output — confirm against the runners.
	Serial string
	// Extra maps a log name to a file path. vmCrashLog emits these in
	// sorted name order under a "==== <name> log ====" header.
	Extra  map[string]string
}
126
// VMHandle is a started microVM as seen by the engine. StartVM returns one
// (from the selected runner). shutdownVM stops it (Shutdown is the fallback
// after an agent poweroff) and then releases it with Close.
type VMHandle interface {
	// Shutdown stops the VM. shutdownVM calls it with a context bounded by
	// vmShutdownTimeout.
	Shutdown(ctx context.Context) error
	// WaitContext returns when the VM exits or ctx is done (see watchVMExit).
	WaitContext(ctx context.Context) error
	// Close releases the handle; closeIO calls it at most once per field.
	Close() error
	// Logs returns the paths of the VM's log files.
	Logs() VMLogs
	// CID returns the guest context ID. Presumably this is VMConfig.CID —
	// confirm.
	CID() uint32
	// WorkDir returns the VM's working directory. Presumably this is
	// VMConfig.WorkDir — confirm.
	WorkDir() string
	// OOMKilled reports whether the VM was killed for running out of memory.
	// NOTE(review): exact detection mechanism (cgroup OOM events?) not
	// visible here — confirm.
	OOMKilled() bool
}
136
// VMConfig describes a microVM to be launched by StartVM.
type VMConfig struct {
	// Image is validated by StartVM. Its RunnerType selects the runner and
	// its Volumes are provisioned in WorkDir.
	Image     ImageSpec
	// CID is the guest context ID, e.g. from AllocateCID.
	CID       uint32
	// EnableKVM is passed to the runner's Validate. Presumably it requests
	// KVM acceleration — confirm.
	EnableKVM bool
	// WorkDir must be non-empty. StartVM creates it and places volume images
	// in it.
	WorkDir   string
	// Cgroup is not read in this file; presumably the runner applies these
	// limits — confirm.
	Cgroup    CgroupLimits

	// BootTimeout is not read in this file. TODO: confirm what it bounds
	// (boot vs. agent connection).
	BootTimeout time.Duration
	// MkfsExt4 is the mkfs.ext4 binary to use; empty means look it up on PATH.
	MkfsExt4    string
	// Dev is not read in this file; its meaning is unknown here — TODO
	// confirm.
	Dev         bool
}
148
// workflowState holds the per-workflow resources the engine owns.
// cleanupState releases them. closeIO and removeWorkDir reset each field
// after releasing it, so a second cleanup does not double-close.
type workflowState struct {
	ImageSpec              ImageSpec
	ImageSpecPath          string
	Config                 manifestConfig
	ConfigKey              string
	Image                  string
	CacheReadURLs          []string
	CacheTrustedPublicKeys []string
	// VM is the running microVM; shutdownVM stops and closes it.
	VM                     VMHandle
	// Agent is the guest agent session. It is used to drain the Nix cache
	// and to request a graceful poweroff, and is closed after the VM.
	Agent                  *AgentSession
	// ReadCache, UploadCache and DNSProxy are closed by cleanupState.
	ReadCache              *ReadCacheProxy
	UploadCache            *UploadCacheProxy
	DNSProxy               *DNSProxy
	// WorkDir is deleted recursively by removeWorkDir.
	WorkDir                string
	NixOSToplevelCache     nixosToplevelCacheStore
}
165
166func (e *Engine) cleanupState(ctx context.Context, wid models.WorkflowId, state *workflowState) error {
167 if state == nil {
168 return nil
169 }
170
171 ctx = context.WithoutCancel(ctx)
172
173 var err error
174 err = errors.Join(err, e.drainNixCache(ctx, state))
175 err = errors.Join(err, e.shutdownVM(ctx, wid, state))
176 err = errors.Join(err, closeIO(&state.Agent))
177 err = errors.Join(err, closeIO(&state.ReadCache))
178 err = errors.Join(err, closeIO(&state.UploadCache))
179 err = errors.Join(err, closeIO(&state.DNSProxy))
180 err = errors.Join(err, removeWorkDir(state))
181 return err
182}
183
184func (e *Engine) drainNixCache(ctx context.Context, state *workflowState) error {
185 if state.Agent == nil || e.cfg.NixCache.UploadURL == "" {
186 return nil
187 }
188
189 drainCtx, cancel := context.WithTimeout(ctx, cacheDrainTimeout)
190 defer cancel()
191
192 if _, err := state.Agent.Drain(drainCtx); err != nil {
193 return fmt.Errorf("drain nix cache: %w", err)
194 }
195 return nil
196}
197
198func (e *Engine) shutdownVM(ctx context.Context, wid models.WorkflowId, state *workflowState) error {
199 if state.VM == nil {
200 return nil
201 }
202
203 var err error
204
205 if state.Agent != nil {
206 gracefulCtx, cancel := context.WithTimeout(ctx, vmShutdownTimeout)
207 poweredOff, poweroffErr := e.poweroffViaAgent(gracefulCtx, wid, state)
208 cancel()
209
210 err = errors.Join(err, poweroffErr)
211 if poweredOff {
212 return errors.Join(err, closeIO(&state.VM))
213 }
214 }
215
216 fallbackCtx, cancel := context.WithTimeout(ctx, vmShutdownTimeout)
217 defer cancel()
218
219 if shutdownErr := state.VM.Shutdown(fallbackCtx); shutdownErr != nil {
220 e.l.Warn("microVM shutdown fallback failed", "workflow", wid, "error", shutdownErr)
221 err = errors.Join(err, shutdownErr)
222 }
223
224 return errors.Join(err, closeIO(&state.VM))
225}
226
227func (e *Engine) poweroffViaAgent(ctx context.Context, wid models.WorkflowId, state *workflowState) (bool, error) {
228 if err := state.Agent.Poweroff(ctx); err != nil {
229 e.l.Warn("agent poweroff request failed", "workflow", wid, "error", err)
230 return false, err
231 }
232
233 if err := state.VM.WaitContext(ctx); err != nil {
234 e.l.Warn("agent poweroff did not stop microVM", "workflow", wid, "error", err)
235 return false, nil
236 }
237
238 return true, nil
239}
240
// closeIO closes the Closer stored in *field.
//
// *field is reset to its zero value before Close is called. Calling closeIO
// again on the same field (e.g. from a repeated cleanup) is therefore a
// no-op rather than a double close. A field already holding the zero value
// (a nil pointer or nil interface) is skipped and nil is returned.
//
// T is constrained to comparable so the zero-value check is a plain == that
// cannot panic. Comparing through any() would panic at run time for a
// non-comparable T; such instantiations are now rejected at compile time
// instead. Pointer and interface types satisfy comparable.
//
// Caveat: an interface field holding a typed nil pointer is not the zero
// value, so its Close method is still invoked.
func closeIO[T interface{ comparable; io.Closer }](field *T) error {
	closer := *field
	var zero T
	*field = zero
	if closer == zero {
		return nil
	}
	return closer.Close()
}
251
252func removeWorkDir(state *workflowState) error {
253 if state.WorkDir == "" {
254 return nil
255 }
256
257 err := os.RemoveAll(state.WorkDir)
258 state.WorkDir = ""
259 return err
260}
261
262// returns a context derived from ctx that is cancelled either when ctx itself
263// is cancelled or when the microVM exits on its own. the returned flag reports
264// whether the VM exited (as opposed to ctx being cancelled for another reason,
265// e.g. the workflow timeout), letting callers tell a crash apart from a
266// timeout. cancel must be called to release the watcher goroutine.
267func watchVMExit(ctx context.Context, vm VMHandle) (context.Context, *atomic.Bool, context.CancelFunc) {
268 exited := &atomic.Bool{}
269 watchCtx, cancel := context.WithCancel(ctx)
270 if vm == nil {
271 return watchCtx, exited, cancel
272 }
273 go func() {
274 _ = vm.WaitContext(watchCtx) // returns when VM exits or watchCtx is cancelled
275 if watchCtx.Err() == nil {
276 exited.Store(true)
277 cancel() // don't forget to cancel the watchCtx...
278 }
279 }()
280 return watchCtx, exited, cancel
281}
282
283func vmCrashLog(vm VMHandle) string {
284 if vm == nil {
285 return ""
286 }
287 logs := vm.Logs()
288
289 var b strings.Builder
290 if tail := tailFile(logs.Serial, vmCrashLogTailBytes); tail != "" {
291 fmt.Fprintf(&b, "==== serial log ====\n%s\n", tail)
292 }
293 for _, name := range slices.Sorted(maps.Keys(logs.Extra)) {
294 if tail := tailFile(logs.Extra[name], vmCrashLogTailBytes); tail != "" {
295 fmt.Fprintf(&b, "==== %s log ====\n%s\n", name, tail)
296 }
297 }
298 return strings.TrimRight(b.String(), "\n")
299}
300
// tailFile returns the last maxBytes bytes of the file at path, with
// surrounding whitespace trimmed. It is best-effort: an empty path, a
// non-positive limit, or any open/seek/read error yields "".
//
// When the file is larger than maxBytes, the seek can land inside a
// multi-byte UTF-8 sequence. The orphaned continuation bytes are dropped so
// the tail starts on a rune boundary.
//
// At most maxBytes bytes are ever read, even if the file grows after Stat.
// If Stat itself fails, the first maxBytes bytes are returned instead.
func tailFile(path string, maxBytes int64) string {
	if path == "" || maxBytes <= 0 {
		return ""
	}
	f, err := os.Open(path)
	if err != nil {
		return ""
	}
	defer f.Close()

	truncated := false
	if info, err := f.Stat(); err == nil && info.Size() > maxBytes {
		if _, err := f.Seek(-maxBytes, io.SeekEnd); err != nil {
			return ""
		}
		truncated = true
	}

	data, err := io.ReadAll(io.LimitReader(f, maxBytes))
	if err != nil {
		return ""
	}
	if truncated {
		// A rune spans at most utf8.UTFMax bytes, so at most UTFMax-1
		// continuation bytes can precede the first rune start.
		for i := 0; i < utf8.UTFMax-1 && len(data) > 0 && !utf8.RuneStart(data[0]); i++ {
			data = data[1:]
		}
	}
	return strings.TrimSpace(string(data))
}
321
// waitAgentConn blocks until a connection arrives on connCh or ctx is done.
// A nil value, including the zero value received from a closed channel, is
// reported as an error rather than returned as a connection. A ctx error is
// wrapped, so errors.Is(err, context.Canceled) and similar checks still work.
func waitAgentConn(ctx context.Context, connCh <-chan net.Conn) (net.Conn, error) {
	var conn net.Conn
	select {
	case <-ctx.Done():
		return nil, fmt.Errorf("waiting for agent: %w", ctx.Err())
	case conn = <-connCh:
	}

	if conn == nil {
		return nil, fmt.Errorf("agent connection closed before setup")
	}
	return conn, nil
}
333
334func StartVM(ctx context.Context, cfg VMConfig, logger *slog.Logger) (VMHandle, error) {
335 if logger == nil {
336 logger = slog.Default()
337 }
338
339 runner, err := runnerFor(cfg.Image.RunnerType)
340 if err != nil {
341 return nil, err
342 }
343 if err := cfg.Image.Validate(); err != nil {
344 return nil, err
345 }
346 if err := cfg.Image.validateImageFiles(); err != nil {
347 return nil, err
348 }
349 if err := runner.Validate(cfg.Image, cfg.EnableKVM); err != nil {
350 return nil, err
351 }
352
353 if err := prepareWorkDir(cfg.WorkDir); err != nil {
354 return nil, err
355 }
356
357 mkfsExt4 := cfg.MkfsExt4
358 if mkfsExt4 == "" {
359 mkfsExt4, err = exec.LookPath("mkfs.ext4")
360 if err != nil {
361 return nil, fmt.Errorf("mkfs.ext4 command not found in PATH: %w", err)
362 }
363 }
364 volumePaths, err := prepareVolumes(ctx, cfg.WorkDir, cfg.Image.Volumes, mkfsExt4)
365 if err != nil {
366 return nil, err
367 }
368
369 return runner.Start(ctx, cfg, volumePaths, logger)
370}