Monorepo for Tangled tangled.org
9

Configure Feed

Select the types of activity you want to include in your feed.

1package microvm 2 3import ( 4 "context" 5 "crypto/rand" 6 "encoding/binary" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "math" 13 "net" 14 "os" 15 "os/exec" 16 "path/filepath" 17 "slices" 18 "strings" 19 "sync/atomic" 20 "time" 21 22 "tangled.org/core/spindle/models" 23) 24 25const ( 26 minGuestCID = 3 27 vmCrashLogTailBytes = 4096 28) 29 30func AllocateCID() (uint32, error) { 31 var data [4]byte 32 if _, err := rand.Read(data[:]); err != nil { 33 return 0, fmt.Errorf("allocate guest CID: %w", err) 34 } 35 return minGuestCID + binary.BigEndian.Uint32(data[:])%60000, nil 36} 37 38func prepareWorkDir(workDir string) error { 39 if workDir == "" { 40 return fmt.Errorf("microvm work directory is required") 41 } 42 if err := os.MkdirAll(workDir, 0o755); err != nil { 43 return fmt.Errorf("create microvm work directory: %w", err) 44 } 45 return nil 46} 47 48func prepareVolumes(ctx context.Context, workDir string, volumes []Volume, mkfsExt4 string) (map[string]string, error) { 49 paths := make(map[string]string, len(volumes)) 50 for _, volume := range volumes { 51 if volume.ReadOnly { 52 return nil, fmt.Errorf("read-only microvm volume %q is not supported yet", volume.Image) 53 } 54 if volume.FSType != "ext4" { 55 return nil, fmt.Errorf("microvm volume %q uses unsupported fsType %q", volume.Image, volume.FSType) 56 } 57 if volume.ImageType != "" && volume.ImageType != "raw" { 58 return nil, fmt.Errorf("microvm volume %q uses unsupported imageType %q", volume.Image, volume.ImageType) 59 } 60 61 path := filepath.Join(workDir, filepath.Base(volume.Image)) 62 if err := createSparseFile(path, volume.SizeMiB); err != nil { 63 return nil, err 64 } 65 noJournal := volume.MountPoint == "/workspace" 66 if err := runMkfsExt4(ctx, mkfsExt4, path, noJournal); err != nil { 67 return nil, err 68 } 69 paths[volume.Image] = path 70 } 71 return paths, nil 72} 73 74func createSparseFile(path string, sizeMiB int64) error { 75 if sizeMiB <= 0 { 76 return fmt.Errorf("sparse file %q size must be positive", path) 77 } 78 if sizeMiB > math.MaxInt64/(1024*1024) { 79 return fmt.Errorf("sparse file %q size is too large", path) 80 } 81 file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o600) 82 if err != nil { 83 return fmt.Errorf("create sparse file %q: %w", path, err) 84 } 85 defer file.Close() 86 87 if err := file.Truncate(sizeMiB * 1024 * 1024); err != nil { 88 return fmt.Errorf("resize sparse file %q: %w", path, err) 89 } 90 return nil 91} 92 93func runMkfsExt4(ctx context.Context, mkfsExt4, path string, noJournal bool) error { 94 if mkfsExt4 == "" { 95 return fmt.Errorf("mkfs.ext4 path is required") 96 } 97 args := []string{"-F"} 98 if noJournal { 99 args = append(args, "-O", "^has_journal") 100 } 101 args = append(args, path) 102 103 cmd := exec.CommandContext(ctx, mkfsExt4, args...) 104 output, err := cmd.CombinedOutput() 105 if err != nil { 106 return fmt.Errorf("mkfs.ext4 %q: %w: %s", path, err, strings.TrimSpace(string(output))) 107 } 108 return nil 109} 110 111func createParentedFile(path string) (*os.File, error) { 112 if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { 113 return nil, fmt.Errorf("create log directory: %w", err) 114 } 115 file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644) 116 if err != nil { 117 return nil, fmt.Errorf("create log file %q: %w", path, err) 118 } 119 return file, nil 120} 121 122type VMLogs struct { 123 Serial string 124 Extra map[string]string 125} 126 127type VMHandle interface { 128 Shutdown(ctx context.Context) error 129 WaitContext(ctx context.Context) error 130 Close() error 131 Logs() VMLogs 132 CID() uint32 133 WorkDir() string 134 OOMKilled() bool 135} 136 137type VMConfig struct { 138 Image ImageSpec 139 CID uint32 140 EnableKVM bool 141 WorkDir string 142 Cgroup CgroupLimits 143 144 BootTimeout time.Duration 145 MkfsExt4 string 146 Dev bool 147} 148 149type workflowState struct { 150 ImageSpec ImageSpec 151 ImageSpecPath string 152 Config manifestConfig 153 ConfigKey string 154 Image string 155 CacheReadURLs []string 156 CacheTrustedPublicKeys []string 157 VM VMHandle 158 Agent *AgentSession 159 ReadCache *ReadCacheProxy 160 UploadCache *UploadCacheProxy 161 DNSProxy *DNSProxy 162 WorkDir string 163 NixOSToplevelCache nixosToplevelCacheStore 164} 165 166func (e *Engine) cleanupState(ctx context.Context, wid models.WorkflowId, state *workflowState) error { 167 if state == nil { 168 return nil 169 } 170 171 ctx = context.WithoutCancel(ctx) 172 173 var err error 174 err = errors.Join(err, e.drainNixCache(ctx, state)) 175 err = errors.Join(err, e.shutdownVM(ctx, wid, state)) 176 err = errors.Join(err, closeIO(&state.Agent)) 177 err = errors.Join(err, closeIO(&state.ReadCache)) 178 err = errors.Join(err, closeIO(&state.UploadCache)) 179 err = errors.Join(err, closeIO(&state.DNSProxy)) 180 err = errors.Join(err, removeWorkDir(state)) 181 return err 182} 183 184func (e *Engine) drainNixCache(ctx context.Context, state *workflowState) error { 185 if e.cfg.NixCache.UploadURL == "" { 186 return nil 187 } 188 189 drainCtx, cancel := context.WithTimeout(ctx, cacheDrainTimeout) 190 defer cancel() 191 192 if state.Agent != nil { 193 if _, err := state.Agent.Drain(drainCtx); err != nil { 194 return fmt.Errorf("drain guest nix cache uploads: %w", err) 195 } 196 } 197 return nil 198} 199 200func (e *Engine) shutdownVM(ctx context.Context, wid models.WorkflowId, state *workflowState) error { 201 if state.VM == nil { 202 return nil 203 } 204 205 var err error 206 207 if state.Agent != nil { 208 gracefulCtx, cancel := context.WithTimeout(ctx, vmShutdownTimeout) 209 poweredOff, poweroffErr := e.poweroffViaAgent(gracefulCtx, wid, state) 210 cancel() 211 212 err = errors.Join(err, poweroffErr) 213 if poweredOff { 214 return errors.Join(err, closeIO(&state.VM)) 215 } 216 } 217 218 fallbackCtx, cancel := context.WithTimeout(ctx, vmShutdownTimeout) 219 defer cancel() 220 221 if shutdownErr := state.VM.Shutdown(fallbackCtx); shutdownErr != nil { 222 e.l.Warn("microVM shutdown fallback failed", "workflow", wid, "error", shutdownErr) 223 err = errors.Join(err, shutdownErr) 224 } 225 226 return errors.Join(err, closeIO(&state.VM)) 227} 228 229func (e *Engine) poweroffViaAgent(ctx context.Context, wid models.WorkflowId, state *workflowState) (bool, error) { 230 if err := state.Agent.Poweroff(ctx); err != nil { 231 e.l.Warn("agent poweroff request failed", "workflow", wid, "error", err) 232 return false, err 233 } 234 235 if err := state.VM.WaitContext(ctx); err != nil { 236 e.l.Warn("agent poweroff did not stop microVM", "workflow", wid, "error", err) 237 return false, nil 238 } 239 240 return true, nil 241} 242 243// helper for closing io interfaces, sets to nil to prevent double-close 244func closeIO[T io.Closer](field *T) error { 245 closer := *field 246 var zero T 247 *field = zero 248 if any(closer) == any(zero) { 249 return nil 250 } 251 return closer.Close() 252} 253 254func removeWorkDir(state *workflowState) error { 255 if state.WorkDir == "" { 256 return nil 257 } 258 259 err := os.RemoveAll(state.WorkDir) 260 state.WorkDir = "" 261 return err 262} 263 264// returns a context derived from ctx that is cancelled either when ctx itself 265// is cancelled or when the microVM exits on its own. the returned flag reports 266// whether the VM exited (as opposed to ctx being cancelled for another reason, 267// e.g. the workflow timeout), letting callers tell a crash apart from a 268// timeout. cancel must be called to release the watcher goroutine. 269func watchVMExit(ctx context.Context, vm VMHandle) (context.Context, *atomic.Bool, context.CancelFunc) { 270 exited := &atomic.Bool{} 271 watchCtx, cancel := context.WithCancel(ctx) 272 if vm == nil { 273 return watchCtx, exited, cancel 274 } 275 go func() { 276 _ = vm.WaitContext(watchCtx) // returns when VM exits or watchCtx is cancelled 277 if watchCtx.Err() == nil { 278 exited.Store(true) 279 cancel() // don't forget to cancel the watchCtx... 280 } 281 }() 282 return watchCtx, exited, cancel 283} 284 285func vmCrashLog(vm VMHandle) string { 286 if vm == nil { 287 return "" 288 } 289 logs := vm.Logs() 290 291 var b strings.Builder 292 if tail := tailFile(logs.Serial, vmCrashLogTailBytes); tail != "" { 293 fmt.Fprintf(&b, "==== serial log ====\n%s\n", tail) 294 } 295 for _, name := range slices.Sorted(maps.Keys(logs.Extra)) { 296 if tail := tailFile(logs.Extra[name], vmCrashLogTailBytes); tail != "" { 297 fmt.Fprintf(&b, "==== %s log ====\n%s\n", name, tail) 298 } 299 } 300 return strings.TrimRight(b.String(), "\n") 301} 302 303func tailFile(path string, max int64) string { 304 if path == "" { 305 return "" 306 } 307 f, err := os.Open(path) 308 if err != nil { 309 return "" 310 } 311 defer f.Close() 312 if info, err := f.Stat(); err == nil && info.Size() > max { 313 if _, err := f.Seek(-max, io.SeekEnd); err != nil { 314 return "" 315 } 316 } 317 data, err := io.ReadAll(f) 318 if err != nil { 319 return "" 320 } 321 return strings.TrimSpace(string(data)) 322} 323 324func waitAgentConn(ctx context.Context, connCh <-chan net.Conn) (net.Conn, error) { 325 select { 326 case conn := <-connCh: 327 if conn == nil { 328 return nil, fmt.Errorf("agent connection closed before setup") 329 } 330 return conn, nil 331 case <-ctx.Done(): 332 return nil, fmt.Errorf("waiting for agent: %w", ctx.Err()) 333 } 334} 335 336func StartVM(ctx context.Context, cfg VMConfig, logger *slog.Logger) (VMHandle, error) { 337 if logger == nil { 338 logger = slog.Default() 339 } 340 341 runner, err := runnerFor(cfg.Image.RunnerType) 342 if err != nil { 343 return nil, err 344 } 345 if err := cfg.Image.Validate(); err != nil { 346 return nil, err 347 } 348 if err := cfg.Image.validateImageFiles(); err != nil { 349 return nil, err 350 } 351 if err := runner.Validate(cfg.Image, cfg.EnableKVM); err != nil { 352 return nil, err 353 } 354 355 if err := prepareWorkDir(cfg.WorkDir); err != nil { 356 return nil, err 357 } 358 359 mkfsExt4 := cfg.MkfsExt4 360 if mkfsExt4 == "" { 361 mkfsExt4, err = exec.LookPath("mkfs.ext4") 362 if err != nil { 363 return nil, fmt.Errorf("mkfs.ext4 command not found in PATH: %w", err) 364 } 365 } 366 volumePaths, err := prepareVolumes(ctx, cfg.WorkDir, cfg.Image.Volumes, mkfsExt4) 367 if err != nil { 368 return nil, err 369 } 370 371 return runner.Start(ctx, cfg, volumePaths, logger) 372}