Monorepo for Tangled (tangled.org)
6

Configure Feed

Select the types of activity you want to include in your feed.

1package microvm 2 3import ( 4 "context" 5 "crypto/rand" 6 "encoding/binary" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "math" 13 "net" 14 "os" 15 "os/exec" 16 "path/filepath" 17 "slices" 18 "strings" 19 "sync/atomic" 20 "time" 21 22 "tangled.org/core/spindle/models" 23) 24 25const ( 26 minGuestCID = 3 27 vmCrashLogTailBytes = 4096 28) 29 30func AllocateCID() (uint32, error) { 31 var data [4]byte 32 if _, err := rand.Read(data[:]); err != nil { 33 return 0, fmt.Errorf("allocate guest CID: %w", err) 34 } 35 return minGuestCID + binary.BigEndian.Uint32(data[:])%60000, nil 36} 37 38func prepareWorkDir(workDir string) error { 39 if workDir == "" { 40 return fmt.Errorf("microvm work directory is required") 41 } 42 if err := os.MkdirAll(workDir, 0o755); err != nil { 43 return fmt.Errorf("create microvm work directory: %w", err) 44 } 45 return nil 46} 47 48func prepareVolumes(ctx context.Context, workDir string, volumes []Volume, mkfsExt4 string) (map[string]string, error) { 49 paths := make(map[string]string, len(volumes)) 50 for _, volume := range volumes { 51 if volume.ReadOnly { 52 return nil, fmt.Errorf("read-only microvm volume %q is not supported yet", volume.Image) 53 } 54 if volume.FSType != "ext4" { 55 return nil, fmt.Errorf("microvm volume %q uses unsupported fsType %q", volume.Image, volume.FSType) 56 } 57 if volume.ImageType != "" && volume.ImageType != "raw" { 58 return nil, fmt.Errorf("microvm volume %q uses unsupported imageType %q", volume.Image, volume.ImageType) 59 } 60 61 path := filepath.Join(workDir, filepath.Base(volume.Image)) 62 if err := createSparseFile(path, volume.SizeMiB); err != nil { 63 return nil, err 64 } 65 noJournal := volume.MountPoint == "/workspace" 66 if err := runMkfsExt4(ctx, mkfsExt4, path, noJournal); err != nil { 67 return nil, err 68 } 69 paths[volume.Image] = path 70 } 71 return paths, nil 72} 73 74func createSparseFile(path string, sizeMiB int64) error { 75 if sizeMiB <= 0 { 76 return fmt.Errorf("sparse file %q 
size must be positive", path) 77 } 78 if sizeMiB > math.MaxInt64/(1024*1024) { 79 return fmt.Errorf("sparse file %q size is too large", path) 80 } 81 file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o600) 82 if err != nil { 83 return fmt.Errorf("create sparse file %q: %w", path, err) 84 } 85 defer file.Close() 86 87 if err := file.Truncate(sizeMiB * 1024 * 1024); err != nil { 88 return fmt.Errorf("resize sparse file %q: %w", path, err) 89 } 90 return nil 91} 92 93func runMkfsExt4(ctx context.Context, mkfsExt4, path string, noJournal bool) error { 94 if mkfsExt4 == "" { 95 return fmt.Errorf("mkfs.ext4 path is required") 96 } 97 args := []string{"-F"} 98 if noJournal { 99 args = append(args, "-O", "^has_journal") 100 } 101 args = append(args, path) 102 103 cmd := exec.CommandContext(ctx, mkfsExt4, args...) 104 output, err := cmd.CombinedOutput() 105 if err != nil { 106 return fmt.Errorf("mkfs.ext4 %q: %w: %s", path, err, strings.TrimSpace(string(output))) 107 } 108 return nil 109} 110 111func createParentedFile(path string) (*os.File, error) { 112 if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { 113 return nil, fmt.Errorf("create log directory: %w", err) 114 } 115 file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644) 116 if err != nil { 117 return nil, fmt.Errorf("create log file %q: %w", path, err) 118 } 119 return file, nil 120} 121 122type VMLogs struct { 123 Serial string 124 Extra map[string]string 125} 126 127type VMHandle interface { 128 Shutdown(ctx context.Context) error 129 WaitContext(ctx context.Context) error 130 Close() error 131 Logs() VMLogs 132 CID() uint32 133 WorkDir() string 134 OOMKilled() bool 135} 136 137type VMConfig struct { 138 Image ImageSpec 139 CID uint32 140 EnableKVM bool 141 WorkDir string 142 Cgroup CgroupLimits 143 144 BootTimeout time.Duration 145 MkfsExt4 string 146 Dev bool 147} 148 149type workflowState struct { 150 ImageSpec ImageSpec 151 ImageSpecPath string 152 Config 
manifestConfig 153 ConfigKey string 154 Image string 155 CacheReadURLs []string 156 CacheTrustedPublicKeys []string 157 VM VMHandle 158 Agent *AgentSession 159 ReadCache *ReadCacheProxy 160 UploadCache *UploadCacheProxy 161 DNSProxy *DNSProxy 162 WorkDir string 163 NixOSToplevelCache nixosToplevelCacheStore 164} 165 166func (e *Engine) cleanupState(ctx context.Context, wid models.WorkflowId, state *workflowState) error { 167 if state == nil { 168 return nil 169 } 170 171 ctx = context.WithoutCancel(ctx) 172 173 var err error 174 err = errors.Join(err, e.drainNixCache(ctx, state)) 175 err = errors.Join(err, e.shutdownVM(ctx, wid, state)) 176 err = errors.Join(err, closeIO(&state.Agent)) 177 err = errors.Join(err, closeIO(&state.ReadCache)) 178 err = errors.Join(err, closeIO(&state.UploadCache)) 179 err = errors.Join(err, closeIO(&state.DNSProxy)) 180 err = errors.Join(err, removeWorkDir(state)) 181 return err 182} 183 184func (e *Engine) drainNixCache(ctx context.Context, state *workflowState) error { 185 if state.Agent == nil || e.cfg.NixCache.UploadURL == "" { 186 return nil 187 } 188 189 drainCtx, cancel := context.WithTimeout(ctx, cacheDrainTimeout) 190 defer cancel() 191 192 if _, err := state.Agent.Drain(drainCtx); err != nil { 193 return fmt.Errorf("drain nix cache: %w", err) 194 } 195 return nil 196} 197 198func (e *Engine) shutdownVM(ctx context.Context, wid models.WorkflowId, state *workflowState) error { 199 if state.VM == nil { 200 return nil 201 } 202 203 var err error 204 205 if state.Agent != nil { 206 gracefulCtx, cancel := context.WithTimeout(ctx, vmShutdownTimeout) 207 poweredOff, poweroffErr := e.poweroffViaAgent(gracefulCtx, wid, state) 208 cancel() 209 210 err = errors.Join(err, poweroffErr) 211 if poweredOff { 212 return errors.Join(err, closeIO(&state.VM)) 213 } 214 } 215 216 fallbackCtx, cancel := context.WithTimeout(ctx, vmShutdownTimeout) 217 defer cancel() 218 219 if shutdownErr := state.VM.Shutdown(fallbackCtx); shutdownErr != nil { 220 
e.l.Warn("microVM shutdown fallback failed", "workflow", wid, "error", shutdownErr) 221 err = errors.Join(err, shutdownErr) 222 } 223 224 return errors.Join(err, closeIO(&state.VM)) 225} 226 227func (e *Engine) poweroffViaAgent(ctx context.Context, wid models.WorkflowId, state *workflowState) (bool, error) { 228 if err := state.Agent.Poweroff(ctx); err != nil { 229 e.l.Warn("agent poweroff request failed", "workflow", wid, "error", err) 230 return false, err 231 } 232 233 if err := state.VM.WaitContext(ctx); err != nil { 234 e.l.Warn("agent poweroff did not stop microVM", "workflow", wid, "error", err) 235 return false, nil 236 } 237 238 return true, nil 239} 240 241// helper for closing io interfaces, sets to nil to prevent double-close 242func closeIO[T io.Closer](field *T) error { 243 closer := *field 244 var zero T 245 *field = zero 246 if any(closer) == any(zero) { 247 return nil 248 } 249 return closer.Close() 250} 251 252func removeWorkDir(state *workflowState) error { 253 if state.WorkDir == "" { 254 return nil 255 } 256 257 err := os.RemoveAll(state.WorkDir) 258 state.WorkDir = "" 259 return err 260} 261 262// returns a context derived from ctx that is cancelled either when ctx itself 263// is cancelled or when the microVM exits on its own. the returned flag reports 264// whether the VM exited (as opposed to ctx being cancelled for another reason, 265// e.g. the workflow timeout), letting callers tell a crash apart from a 266// timeout. cancel must be called to release the watcher goroutine. 267func watchVMExit(ctx context.Context, vm VMHandle) (context.Context, *atomic.Bool, context.CancelFunc) { 268 exited := &atomic.Bool{} 269 watchCtx, cancel := context.WithCancel(ctx) 270 if vm == nil { 271 return watchCtx, exited, cancel 272 } 273 go func() { 274 _ = vm.WaitContext(watchCtx) // returns when VM exits or watchCtx is cancelled 275 if watchCtx.Err() == nil { 276 exited.Store(true) 277 cancel() // don't forget to cancel the watchCtx... 
278 } 279 }() 280 return watchCtx, exited, cancel 281} 282 283func vmCrashLog(vm VMHandle) string { 284 if vm == nil { 285 return "" 286 } 287 logs := vm.Logs() 288 289 var b strings.Builder 290 if tail := tailFile(logs.Serial, vmCrashLogTailBytes); tail != "" { 291 fmt.Fprintf(&b, "==== serial log ====\n%s\n", tail) 292 } 293 for _, name := range slices.Sorted(maps.Keys(logs.Extra)) { 294 if tail := tailFile(logs.Extra[name], vmCrashLogTailBytes); tail != "" { 295 fmt.Fprintf(&b, "==== %s log ====\n%s\n", name, tail) 296 } 297 } 298 return strings.TrimRight(b.String(), "\n") 299} 300 301func tailFile(path string, max int64) string { 302 if path == "" { 303 return "" 304 } 305 f, err := os.Open(path) 306 if err != nil { 307 return "" 308 } 309 defer f.Close() 310 if info, err := f.Stat(); err == nil && info.Size() > max { 311 if _, err := f.Seek(-max, io.SeekEnd); err != nil { 312 return "" 313 } 314 } 315 data, err := io.ReadAll(f) 316 if err != nil { 317 return "" 318 } 319 return strings.TrimSpace(string(data)) 320} 321 322func waitAgentConn(ctx context.Context, connCh <-chan net.Conn) (net.Conn, error) { 323 select { 324 case conn := <-connCh: 325 if conn == nil { 326 return nil, fmt.Errorf("agent connection closed before setup") 327 } 328 return conn, nil 329 case <-ctx.Done(): 330 return nil, fmt.Errorf("waiting for agent: %w", ctx.Err()) 331 } 332} 333 334func StartVM(ctx context.Context, cfg VMConfig, logger *slog.Logger) (VMHandle, error) { 335 if logger == nil { 336 logger = slog.Default() 337 } 338 339 runner, err := runnerFor(cfg.Image.RunnerType) 340 if err != nil { 341 return nil, err 342 } 343 if err := cfg.Image.Validate(); err != nil { 344 return nil, err 345 } 346 if err := cfg.Image.validateImageFiles(); err != nil { 347 return nil, err 348 } 349 if err := runner.Validate(cfg.Image, cfg.EnableKVM); err != nil { 350 return nil, err 351 } 352 353 if err := prepareWorkDir(cfg.WorkDir); err != nil { 354 return nil, err 355 } 356 357 mkfsExt4 := 
cfg.MkfsExt4 358 if mkfsExt4 == "" { 359 mkfsExt4, err = exec.LookPath("mkfs.ext4") 360 if err != nil { 361 return nil, fmt.Errorf("mkfs.ext4 command not found in PATH: %w", err) 362 } 363 } 364 volumePaths, err := prepareVolumes(ctx, cfg.WorkDir, cfg.Image.Volumes, mkfsExt4) 365 if err != nil { 366 return nil, err 367 } 368 369 return runner.Start(ctx, cfg, volumePaths, logger) 370}