Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package microvm 2 3import ( 4 "context" 5 "crypto/rand" 6 "encoding/binary" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "math" 13 "net" 14 "os" 15 "os/exec" 16 "path/filepath" 17 "slices" 18 "strings" 19 "sync/atomic" 20 "time" 21 22 "tangled.org/core/spindle/models" 23) 24 25const ( 26 minGuestCID = 3 27 vmCrashLogTailBytes = 4096 28) 29 30func AllocateCID() (uint32, error) { 31 var data [4]byte 32 if _, err := rand.Read(data[:]); err != nil { 33 return 0, fmt.Errorf("allocate guest CID: %w", err) 34 } 35 return minGuestCID + binary.BigEndian.Uint32(data[:])%60000, nil 36} 37 38func prepareWorkDir(workDir string) error { 39 if workDir == "" { 40 return fmt.Errorf("microvm work directory is required") 41 } 42 if err := os.MkdirAll(workDir, 0o755); err != nil { 43 return fmt.Errorf("create microvm work directory: %w", err) 44 } 45 return nil 46} 47 48func prepareVolumes(ctx context.Context, workDir string, volumes []Volume, mkfsExt4 string) (map[string]string, error) { 49 paths := make(map[string]string, len(volumes)) 50 for _, volume := range volumes { 51 if volume.ReadOnly { 52 return nil, fmt.Errorf("read-only microvm volume %q is not supported yet", volume.Image) 53 } 54 if volume.FSType != "ext4" { 55 return nil, fmt.Errorf("microvm volume %q uses unsupported fsType %q", volume.Image, volume.FSType) 56 } 57 if volume.ImageType != "" && volume.ImageType != "raw" { 58 return nil, fmt.Errorf("microvm volume %q uses unsupported imageType %q", volume.Image, volume.ImageType) 59 } 60 61 path := filepath.Join(workDir, filepath.Base(volume.Image)) 62 if err := createSparseFile(path, volume.SizeMiB); err != nil { 63 return nil, err 64 } 65 noJournal := volume.MountPoint == "/workspace" 66 if err := runMkfsExt4(ctx, mkfsExt4, path, noJournal); err != nil { 67 return nil, err 68 } 69 paths[volume.Image] = path 70 } 71 return paths, nil 72} 73 74func createSparseFile(path string, sizeMiB int64) error { 75 if sizeMiB <= 0 { 76 return fmt.Errorf("sparse file %q size must be positive", path) 77 } 78 if sizeMiB > math.MaxInt64/(1024*1024) { 79 return fmt.Errorf("sparse file %q size is too large", path) 80 } 81 file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o600) 82 if err != nil { 83 return fmt.Errorf("create sparse file %q: %w", path, err) 84 } 85 defer file.Close() 86 87 if err := file.Truncate(sizeMiB * 1024 * 1024); err != nil { 88 return fmt.Errorf("resize sparse file %q: %w", path, err) 89 } 90 return nil 91} 92 93func runMkfsExt4(ctx context.Context, mkfsExt4, path string, noJournal bool) error { 94 if mkfsExt4 == "" { 95 return fmt.Errorf("mkfs.ext4 path is required") 96 } 97 args := []string{"-F"} 98 if noJournal { 99 args = append(args, "-O", "^has_journal") 100 } 101 args = append(args, path) 102 103 cmd := exec.CommandContext(ctx, mkfsExt4, args...) 104 output, err := cmd.CombinedOutput() 105 if err != nil { 106 return fmt.Errorf("mkfs.ext4 %q: %w: %s", path, err, strings.TrimSpace(string(output))) 107 } 108 return nil 109} 110 111func createParentedFile(path string) (*os.File, error) { 112 if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { 113 return nil, fmt.Errorf("create log directory: %w", err) 114 } 115 file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644) 116 if err != nil { 117 return nil, fmt.Errorf("create log file %q: %w", path, err) 118 } 119 return file, nil 120} 121 122type VMLogs struct { 123 Serial string 124 Extra map[string]string 125} 126 127type VMHandle interface { 128 Shutdown(ctx context.Context) error 129 WaitContext(ctx context.Context) error 130 Close() error 131 Logs() VMLogs 132 CID() uint32 133 WorkDir() string 134 OOMKilled() bool 135} 136 137type VMConfig struct { 138 Image ImageSpec 139 CID uint32 140 EnableKVM bool 141 WorkDir string 142 Cgroup CgroupLimits 143 144 BootTimeout time.Duration 145 MkfsExt4 string 146 Dev bool 147} 148 149type workflowState struct { 150 ImageSpec ImageSpec 151 ImageSpecPath string 152 Config manifestConfig 153 ConfigKey string 154 Image string 155 CacheReadURLs []string 156 CacheTrustedPublicKeys []string 157 VM VMHandle 158 Agent *AgentSession 159 ReadCache *ReadCacheProxy 160 UploadCache *UploadCacheProxy 161 DNSProxy *DNSProxy 162 WorkDir string 163 NixOSToplevelCache nixosToplevelCacheStore 164} 165 166func (e *Engine) cleanupState(ctx context.Context, wid models.WorkflowId, state *workflowState) error { 167 if state == nil { 168 return nil 169 } 170 171 ctx = context.WithoutCancel(ctx) 172 173 var err error 174 // todo(dawn): expose this error to the user as a warning 175 if drainErr := e.drainNixCache(ctx, state); drainErr != nil { 176 e.l.Warn("cache drain failed during cleanup; continuing", "workflow", wid, "error", drainErr) 177 } 178 err = errors.Join(err, e.shutdownVM(ctx, wid, state)) 179 err = errors.Join(err, closeIO(&state.Agent)) 180 err = errors.Join(err, closeIO(&state.ReadCache)) 181 err = errors.Join(err, closeIO(&state.UploadCache)) 182 err = errors.Join(err, closeIO(&state.DNSProxy)) 183 err = errors.Join(err, removeWorkDir(state)) 184 return err 185} 186 187func (e *Engine) drainNixCache(ctx context.Context, state *workflowState) error { 188 if e.cfg.NixCache.UploadURL == "" { 189 return nil 190 } 191 192 drainCtx, cancel := context.WithTimeout(ctx, cacheDrainTimeout) 193 defer cancel() 194 195 if state.Agent != nil { 196 if _, err := state.Agent.Drain(drainCtx); err != nil { 197 return fmt.Errorf("drain guest nix cache uploads: %w", err) 198 } 199 } 200 return nil 201} 202 203func (e *Engine) shutdownVM(ctx context.Context, wid models.WorkflowId, state *workflowState) error { 204 if state.VM == nil { 205 return nil 206 } 207 208 var err error 209 210 if state.Agent != nil { 211 gracefulCtx, cancel := context.WithTimeout(ctx, vmShutdownTimeout) 212 poweredOff, poweroffErr := e.poweroffViaAgent(gracefulCtx, wid, state) 213 cancel() 214 215 err = errors.Join(err, poweroffErr) 216 if poweredOff { 217 return errors.Join(err, closeIO(&state.VM)) 218 } 219 } 220 221 fallbackCtx, cancel := context.WithTimeout(ctx, vmShutdownTimeout) 222 defer cancel() 223 224 if shutdownErr := state.VM.Shutdown(fallbackCtx); shutdownErr != nil { 225 e.l.Warn("microVM shutdown fallback failed", "workflow", wid, "error", shutdownErr) 226 err = errors.Join(err, shutdownErr) 227 } 228 229 return errors.Join(err, closeIO(&state.VM)) 230} 231 232func (e *Engine) poweroffViaAgent(ctx context.Context, wid models.WorkflowId, state *workflowState) (bool, error) { 233 if err := state.Agent.Poweroff(ctx); err != nil { 234 e.l.Warn("agent poweroff request failed", "workflow", wid, "error", err) 235 return false, err 236 } 237 238 if err := state.VM.WaitContext(ctx); err != nil { 239 e.l.Warn("agent poweroff did not stop microVM", "workflow", wid, "error", err) 240 return false, nil 241 } 242 243 return true, nil 244} 245 246// helper for closing io interfaces, sets to nil to prevent double-close 247func closeIO[T io.Closer](field *T) error { 248 closer := *field 249 var zero T 250 *field = zero 251 if any(closer) == any(zero) { 252 return nil 253 } 254 return closer.Close() 255} 256 257func removeWorkDir(state *workflowState) error { 258 if state.WorkDir == "" { 259 return nil 260 } 261 262 err := os.RemoveAll(state.WorkDir) 263 state.WorkDir = "" 264 return err 265} 266 267// returns a context derived from ctx that is cancelled either when ctx itself 268// is cancelled or when the microVM exits on its own. the returned flag reports 269// whether the VM exited (as opposed to ctx being cancelled for another reason, 270// e.g. the workflow timeout), letting callers tell a crash apart from a 271// timeout. cancel must be called to release the watcher goroutine. 272func watchVMExit(ctx context.Context, vm VMHandle) (context.Context, *atomic.Bool, context.CancelFunc) { 273 exited := &atomic.Bool{} 274 watchCtx, cancel := context.WithCancel(ctx) 275 if vm == nil { 276 return watchCtx, exited, cancel 277 } 278 go func() { 279 _ = vm.WaitContext(watchCtx) // returns when VM exits or watchCtx is cancelled 280 if watchCtx.Err() == nil { 281 exited.Store(true) 282 cancel() // don't forget to cancel the watchCtx... 283 } 284 }() 285 return watchCtx, exited, cancel 286} 287 288func vmCrashLog(vm VMHandle) string { 289 if vm == nil { 290 return "" 291 } 292 logs := vm.Logs() 293 294 var b strings.Builder 295 if tail := tailFile(logs.Serial, vmCrashLogTailBytes); tail != "" { 296 fmt.Fprintf(&b, "==== serial log ====\n%s\n", tail) 297 } 298 for _, name := range slices.Sorted(maps.Keys(logs.Extra)) { 299 if tail := tailFile(logs.Extra[name], vmCrashLogTailBytes); tail != "" { 300 fmt.Fprintf(&b, "==== %s log ====\n%s\n", name, tail) 301 } 302 } 303 return strings.TrimRight(b.String(), "\n") 304} 305 306func tailFile(path string, max int64) string { 307 if path == "" { 308 return "" 309 } 310 f, err := os.Open(path) 311 if err != nil { 312 return "" 313 } 314 defer f.Close() 315 if info, err := f.Stat(); err == nil && info.Size() > max { 316 if _, err := f.Seek(-max, io.SeekEnd); err != nil { 317 return "" 318 } 319 } 320 data, err := io.ReadAll(f) 321 if err != nil { 322 return "" 323 } 324 return strings.TrimSpace(string(data)) 325} 326 327func waitAgentConn(ctx context.Context, connCh <-chan net.Conn) (net.Conn, error) { 328 select { 329 case conn := <-connCh: 330 if conn == nil { 331 return nil, fmt.Errorf("agent connection closed before setup") 332 } 333 return conn, nil 334 case <-ctx.Done(): 335 return nil, fmt.Errorf("waiting for agent: %w", ctx.Err()) 336 } 337} 338 339func StartVM(ctx context.Context, cfg VMConfig, logger *slog.Logger) (VMHandle, error) { 340 if logger == nil { 341 logger = slog.Default() 342 } 343 344 runner, err := runnerFor(cfg.Image.RunnerType) 345 if err != nil { 346 return nil, err 347 } 348 if err := cfg.Image.Validate(); err != nil { 349 return nil, err 350 } 351 if err := cfg.Image.validateImageFiles(); err != nil { 352 return nil, err 353 } 354 if err := runner.Validate(cfg.Image, cfg.EnableKVM); err != nil { 355 return nil, err 356 } 357 358 if err := prepareWorkDir(cfg.WorkDir); err != nil { 359 return nil, err 360 } 361 362 mkfsExt4 := cfg.MkfsExt4 363 if mkfsExt4 == "" { 364 mkfsExt4, err = exec.LookPath("mkfs.ext4") 365 if err != nil { 366 return nil, fmt.Errorf("mkfs.ext4 command not found in PATH: %w", err) 367 } 368 } 369 volumePaths, err := prepareVolumes(ctx, cfg.WorkDir, cfg.Image.Volumes, mkfsExt4) 370 if err != nil { 371 return nil, err 372 } 373 374 return runner.Start(ctx, cfg, volumePaths, logger) 375}