Monorepo for Tangled tangled.org
11

Configure Feed

Select the types of activity you want to include in your feed.

1package microvm 2 3import ( 4 "context" 5 "crypto/rand" 6 "encoding/binary" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "math" 13 "net" 14 "os" 15 "os/exec" 16 "path/filepath" 17 "slices" 18 "strings" 19 "sync/atomic" 20 "time" 21 22 "tangled.org/core/spindle/models" 23) 24 25const ( 26 minGuestCID = 3 27 vmCrashLogTailBytes = 4096 28) 29 30func AllocateCID() (uint32, error) { 31 var data [4]byte 32 if _, err := rand.Read(data[:]); err != nil { 33 return 0, fmt.Errorf("allocate guest CID: %w", err) 34 } 35 return minGuestCID + binary.BigEndian.Uint32(data[:])%60000, nil 36} 37 38func prepareWorkDir(workDir string) error { 39 if workDir == "" { 40 return fmt.Errorf("microvm work directory is required") 41 } 42 if err := os.MkdirAll(workDir, 0o755); err != nil { 43 return fmt.Errorf("create microvm work directory: %w", err) 44 } 45 return nil 46} 47 48func prepareVolumes(ctx context.Context, workDir string, volumes []Volume, mkfsExt4 string) (map[string]string, error) { 49 paths := make(map[string]string, len(volumes)) 50 for _, volume := range volumes { 51 if volume.ReadOnly { 52 return nil, fmt.Errorf("read-only microvm volume %q is not supported yet", volume.Image) 53 } 54 if volume.FSType != "ext4" { 55 return nil, fmt.Errorf("microvm volume %q uses unsupported fsType %q", volume.Image, volume.FSType) 56 } 57 if volume.ImageType != "" && volume.ImageType != "raw" { 58 return nil, fmt.Errorf("microvm volume %q uses unsupported imageType %q", volume.Image, volume.ImageType) 59 } 60 61 path := filepath.Join(workDir, filepath.Base(volume.Image)) 62 if err := createSparseFile(path, volume.SizeMiB); err != nil { 63 return nil, err 64 } 65 noJournal := volume.MountPoint == "/workspace" 66 if err := runMkfsExt4(ctx, mkfsExt4, path, noJournal); err != nil { 67 return nil, err 68 } 69 paths[volume.Image] = path 70 } 71 return paths, nil 72} 73 74func createSparseFile(path string, sizeMiB int64) error { 75 if sizeMiB <= 0 { 76 return fmt.Errorf("sparse file %q size must be positive", path) 77 } 78 if sizeMiB > math.MaxInt64/(1024*1024) { 79 return fmt.Errorf("sparse file %q size is too large", path) 80 } 81 file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o600) 82 if err != nil { 83 return fmt.Errorf("create sparse file %q: %w", path, err) 84 } 85 defer file.Close() 86 87 if err := file.Truncate(sizeMiB * 1024 * 1024); err != nil { 88 return fmt.Errorf("resize sparse file %q: %w", path, err) 89 } 90 return nil 91} 92 93func runMkfsExt4(ctx context.Context, mkfsExt4, path string, noJournal bool) error { 94 if mkfsExt4 == "" { 95 return fmt.Errorf("mkfs.ext4 path is required") 96 } 97 args := []string{"-F"} 98 if noJournal { 99 args = append(args, "-O", "^has_journal") 100 } 101 args = append(args, path) 102 103 cmd := exec.CommandContext(ctx, mkfsExt4, args...) 104 output, err := cmd.CombinedOutput() 105 if err != nil { 106 return fmt.Errorf("mkfs.ext4 %q: %w: %s", path, err, strings.TrimSpace(string(output))) 107 } 108 return nil 109} 110 111func createParentedFile(path string) (*os.File, error) { 112 if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { 113 return nil, fmt.Errorf("create log directory: %w", err) 114 } 115 file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644) 116 if err != nil { 117 return nil, fmt.Errorf("create log file %q: %w", path, err) 118 } 119 return file, nil 120} 121 122type VMLogs struct { 123 Serial string 124 Extra map[string]string 125} 126 127type VMHandle interface { 128 Shutdown(ctx context.Context) error 129 WaitContext(ctx context.Context) error 130 Close() error 131 Logs() VMLogs 132 CID() uint32 133 WorkDir() string 134 OOMKilled() bool 135} 136 137type VMConfig struct { 138 Image ImageSpec 139 CID uint32 140 EnableKVM bool 141 WorkDir string 142 Cgroup CgroupLimits 143 144 BootTimeout time.Duration 145 MkfsExt4 string 146 Dev bool 147} 148 149type workflowState struct { 150 ImageSpec ImageSpec 151 ImageSpecPath string 152 Config manifestConfig 153 ConfigKey string 154 Image string 155 CacheReadURLs []string 156 CacheTrustedPublicKeys []string 157 VM VMHandle 158 CID uint32 159 Agent *AgentSession 160 ReadCache *ReadCacheProxy 161 UploadCache *UploadCacheProxy 162 DNSProxy *DNSProxy 163 WorkDir string 164 NixOSToplevelCache nixosToplevelCacheStore 165 StartedAt time.Time // when the VM booted, for the max-lifetime cap 166} 167 168func (e *Engine) cleanupState(ctx context.Context, wid models.WorkflowId, state *workflowState) error { 169 if state == nil { 170 return nil 171 } 172 173 // stop advertising this VM for debug shells before we tear it down 174 e.unregisterDebugTarget(wid) 175 176 ctx = context.WithoutCancel(ctx) 177 178 var err error 179 // todo(dawn): expose this error to the user as a warning 180 if drainErr := e.drainNixCache(ctx, state); drainErr != nil { 181 e.l.Warn("cache drain failed during cleanup; continuing", "workflow", wid, "error", drainErr) 182 } 183 err = errors.Join(err, e.shutdownVM(ctx, wid, state)) 184 err = errors.Join(err, closeIO(&state.Agent)) 185 err = errors.Join(err, closeIO(&state.ReadCache)) 186 err = errors.Join(err, closeIO(&state.UploadCache)) 187 err = errors.Join(err, closeIO(&state.DNSProxy)) 188 err = errors.Join(err, removeWorkDir(state)) 189 return err 190} 191 192func (e *Engine) drainNixCache(ctx context.Context, state *workflowState) error { 193 if e.cfg.NixCache.UploadURL == "" { 194 return nil 195 } 196 197 drainCtx, cancel := context.WithTimeout(ctx, cacheDrainTimeout) 198 defer cancel() 199 200 if state.Agent != nil { 201 if _, err := state.Agent.Drain(drainCtx); err != nil { 202 return fmt.Errorf("drain guest nix cache uploads: %w", err) 203 } 204 } 205 return nil 206} 207 208func (e *Engine) shutdownVM(ctx context.Context, wid models.WorkflowId, state *workflowState) error { 209 if state.VM == nil { 210 return nil 211 } 212 213 var err error 214 215 if state.Agent != nil { 216 gracefulCtx, cancel := context.WithTimeout(ctx, vmShutdownTimeout) 217 poweredOff, poweroffErr := e.poweroffViaAgent(gracefulCtx, wid, state) 218 cancel() 219 220 err = errors.Join(err, poweroffErr) 221 if poweredOff { 222 return errors.Join(err, closeIO(&state.VM)) 223 } 224 } 225 226 fallbackCtx, cancel := context.WithTimeout(ctx, vmShutdownTimeout) 227 defer cancel() 228 229 if shutdownErr := state.VM.Shutdown(fallbackCtx); shutdownErr != nil { 230 e.l.Warn("microVM shutdown fallback failed", "workflow", wid, "error", shutdownErr) 231 err = errors.Join(err, shutdownErr) 232 } 233 234 return errors.Join(err, closeIO(&state.VM)) 235} 236 237func (e *Engine) poweroffViaAgent(ctx context.Context, wid models.WorkflowId, state *workflowState) (bool, error) { 238 if err := state.Agent.Poweroff(ctx); err != nil { 239 e.l.Warn("agent poweroff request failed", "workflow", wid, "error", err) 240 return false, err 241 } 242 243 if err := state.VM.WaitContext(ctx); err != nil { 244 e.l.Warn("agent poweroff did not stop microVM", "workflow", wid, "error", err) 245 return false, nil 246 } 247 248 return true, nil 249} 250 251// helper for closing io interfaces, sets to nil to prevent double-close 252func closeIO[T io.Closer](field *T) error { 253 closer := *field 254 var zero T 255 *field = zero 256 if any(closer) == any(zero) { 257 return nil 258 } 259 return closer.Close() 260} 261 262func removeWorkDir(state *workflowState) error { 263 if state.WorkDir == "" { 264 return nil 265 } 266 267 err := os.RemoveAll(state.WorkDir) 268 state.WorkDir = "" 269 return err 270} 271 272// returns a context derived from ctx that is cancelled either when ctx itself 273// is cancelled or when the microVM exits on its own. the returned flag reports 274// whether the VM exited (as opposed to ctx being cancelled for another reason, 275// e.g. the workflow timeout), letting callers tell a crash apart from a 276// timeout. cancel must be called to release the watcher goroutine. 277func watchVMExit(ctx context.Context, vm VMHandle) (context.Context, *atomic.Bool, context.CancelFunc) { 278 exited := &atomic.Bool{} 279 watchCtx, cancel := context.WithCancel(ctx) 280 if vm == nil { 281 return watchCtx, exited, cancel 282 } 283 go func() { 284 _ = vm.WaitContext(watchCtx) // returns when VM exits or watchCtx is cancelled 285 if watchCtx.Err() == nil { 286 exited.Store(true) 287 cancel() // don't forget to cancel the watchCtx... 288 } 289 }() 290 return watchCtx, exited, cancel 291} 292 293func vmCrashLog(vm VMHandle) string { 294 if vm == nil { 295 return "" 296 } 297 logs := vm.Logs() 298 299 var b strings.Builder 300 if tail := tailFile(logs.Serial, vmCrashLogTailBytes); tail != "" { 301 fmt.Fprintf(&b, "==== serial log ====\n%s\n", tail) 302 } 303 for _, name := range slices.Sorted(maps.Keys(logs.Extra)) { 304 if tail := tailFile(logs.Extra[name], vmCrashLogTailBytes); tail != "" { 305 fmt.Fprintf(&b, "==== %s log ====\n%s\n", name, tail) 306 } 307 } 308 return strings.TrimRight(b.String(), "\n") 309} 310 311func tailFile(path string, max int64) string { 312 if path == "" { 313 return "" 314 } 315 f, err := os.Open(path) 316 if err != nil { 317 return "" 318 } 319 defer f.Close() 320 if info, err := f.Stat(); err == nil && info.Size() > max { 321 if _, err := f.Seek(-max, io.SeekEnd); err != nil { 322 return "" 323 } 324 } 325 data, err := io.ReadAll(f) 326 if err != nil { 327 return "" 328 } 329 return strings.TrimSpace(string(data)) 330} 331 332func waitAgentConn(ctx context.Context, connCh <-chan net.Conn) (net.Conn, error) { 333 select { 334 case conn := <-connCh: 335 if conn == nil { 336 return nil, fmt.Errorf("agent connection closed before setup") 337 } 338 return conn, nil 339 case <-ctx.Done(): 340 return nil, fmt.Errorf("waiting for agent: %w", ctx.Err()) 341 } 342} 343 344func StartVM(ctx context.Context, cfg VMConfig, logger *slog.Logger) (VMHandle, error) { 345 if logger == nil { 346 logger = slog.Default() 347 } 348 349 runner, err := runnerFor(cfg.Image.RunnerType) 350 if err != nil { 351 return nil, err 352 } 353 if err := cfg.Image.Validate(); err != nil { 354 return nil, err 355 } 356 if err := cfg.Image.validateImageFiles(); err != nil { 357 return nil, err 358 } 359 if err := runner.Validate(cfg.Image, cfg.EnableKVM); err != nil { 360 return nil, err 361 } 362 363 if err := prepareWorkDir(cfg.WorkDir); err != nil { 364 return nil, err 365 } 366 367 mkfsExt4 := cfg.MkfsExt4 368 if mkfsExt4 == "" { 369 mkfsExt4, err = exec.LookPath("mkfs.ext4") 370 if err != nil { 371 return nil, fmt.Errorf("mkfs.ext4 command not found in PATH: %w", err) 372 } 373 } 374 volumePaths, err := prepareVolumes(ctx, cfg.WorkDir, cfg.Image.Volumes, mkfsExt4) 375 if err != nil { 376 return nil, err 377 } 378 379 return runner.Start(ctx, cfg, volumePaths, logger) 380}