Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

spindle/microvm: fix "path is not valid" on nix cache pushes, make cache drain not fatal

Signed-off-by: dawn <dawn@tangled.org>

+176 -6
+5 -1
spindle/engines/microvm/engine.go
··· 454 454 } 455 455 456 456 if err := e.drainNixCache(ctx, state); err != nil { 457 - return fmt.Errorf("drain config cache uploads before metadata commit: %w", err) 457 + // a partial upload would leave the cache unable to realize this toplevel, 458 + // so skip the metadata commit rather than poison it with an un-realizable 459 + // key. the config still activated fine, so don't fail the workflow. 460 + e.l.Warn("cache drain failed; skipping config cache metadata commit", "workflow", wid, "configKey", configKey, "toplevel", result.Toplevel, "error", err) 461 + return nil 458 462 } 459 463 if err := state.NixOSToplevelCache.Commit(configKey, result.Toplevel); err != nil { 460 464 return err
+4
spindle/engines/microvm/upload_cache_narinfo.go
··· 15 15 URL string 16 16 NarHash string 17 17 NarSize int64 18 + // paths this path depends on 19 + References []string 18 20 } 19 21 20 22 const ( ··· 62 64 return nil, fmt.Errorf("invalid NarSize %q: %w", value, err) 63 65 } 64 66 info.NarSize = n 67 + case "References": 68 + info.References = strings.Fields(value) 65 69 } 66 70 } 67 71 if err := scanner.Err(); err != nil {
+107 -4
spindle/engines/microvm/upload_cache_nix_store.go
··· 278 278 return 279 279 } 280 280 281 - if _, err := writeFileAtomic(dst, ".tmp-narinfo", func(f *os.File) (int64, error) { 282 - n, err := f.Write(body) 283 - return int64(n), err 284 - }); err != nil { 281 + if _, err := writeNarinfoFile(dst, body); err != nil { 285 282 b.logger.Warn("stage narinfo upload failed", "path", relPath, "error", err) 286 283 http.Error(w, "internal error", http.StatusInternalServerError) 287 284 return ··· 346 343 return filepath.Join(b.stagingDir, local), nil 347 344 } 348 345 346 + // makes the full reference graph of rootStorePath resolvable in the staging 347 + // cache. `nix copy` computes the closure from the --from store, so every 348 + // referenced narinfo must be present there or the walk fails with "path ... is 349 + // not valid". newly-built deps are already staged by the guest, but deps that 350 + // live only in a read cache were skipped during upload, so we backfill their 351 + // narinfos here. only the narinfos (the reference graph) are needed: the 352 + // destination supplies the NAR data via --substitute-on-destination. 353 + func (b *NixStoreUploadBackend) ensureClosureStaged(ctx context.Context, rootStorePath string) error { 354 + visited := map[string]bool{} 355 + queue := []string{rootStorePath} 356 + for len(queue) > 0 { 357 + storePath := queue[0] 358 + queue = queue[1:] 359 + if visited[storePath] { 360 + continue 361 + } 362 + visited[storePath] = true 363 + 364 + info, err := b.resolveStagedNarinfo(ctx, storePath) 365 + if err != nil { 366 + // the root must resolve (the guest just staged it); a dep we can't 367 + // find anywhere is left for `nix copy` to surface with its own error. 368 + if storePath == rootStorePath { 369 + return fmt.Errorf("resolve narinfo for %s: %w", storePath, err) 370 + } 371 + b.logger.Warn("closure dep narinfo unresolved; leaving to nix copy", "storePath", storePath, "error", err) 372 + continue 373 + } 374 + 375 + for _, ref := range info.References { 376 + refPath := storePrefix + ref 377 + if refPath == storePath { 378 + continue // self-reference 379 + } 380 + if !visited[refPath] { 381 + queue = append(queue, refPath) 382 + } 383 + } 384 + } 385 + return nil 386 + } 387 + 388 + // returns parsed narinfo for store path, backfilling from readUpstreams if not found 389 + func (b *NixStoreUploadBackend) resolveStagedNarinfo(ctx context.Context, storePath string) (*narinfo, error) { 390 + hash, _, err := parseStorePath(storePath) 391 + if err != nil { 392 + return nil, err 393 + } 394 + localPath := filepath.Join(b.stagingDir, hash+".narinfo") 395 + info, err := readNarinfoFile(localPath) 396 + if err == nil { 397 + return info, nil 398 + } 399 + if !errors.Is(err, os.ErrNotExist) { 400 + return nil, err 401 + } 402 + 403 + // dep missing from staging because it was skipped during upload 404 + // (lives on a read cache) so we backfill it from readUpstreams. 405 + body, err := b.fetchUpstreamNarinfo(ctx, hash) 406 + if err != nil { 407 + return nil, err 408 + } 409 + written, err := writeNarinfoFile(localPath, body) 410 + if err != nil { 411 + return nil, err 412 + } 413 + b.logger.Debug("backfilled narinfo", "hash", hash, "bytes", written) 414 + 415 + return parseNarinfo(bytes.NewReader(body)) 416 + } 417 + 418 + // fetches narinfo from readUpstreams 419 + func (b *NixStoreUploadBackend) fetchUpstreamNarinfo(ctx context.Context, hash string) ([]byte, error) { 420 + if len(b.readUpstreams) == 0 { 421 + return nil, os.ErrNotExist 422 + } 423 + req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://upstream/"+hash+".narinfo", nil) 424 + if err != nil { 425 + return nil, err 426 + } 427 + resp, err := newNarinfoExistenceTransport(b.readUpstreams, b.logger).RoundTrip(req) 428 + if err != nil { 429 + return nil, err 430 + } 431 + defer resp.Body.Close() 432 + if resp.StatusCode == http.StatusNotFound { 433 + return nil, os.ErrNotExist 434 + } 435 + if resp.StatusCode != http.StatusOK { 436 + return nil, fmt.Errorf("upstream narinfo %s: status %d", hash, resp.StatusCode) 437 + } 438 + return io.ReadAll(io.LimitReader(resp.Body, maxNarinfoSize+1)) 439 + } 440 + 349 441 // todo(dawn): ideally we don't use `nix copy` here but instead have our own 350 442 // `nix copy` impl so we don't need nix on host. but that's a far stretch goal :p 351 443 func (b *NixStoreUploadBackend) importStorePath(ctx context.Context, storePath string) error { 444 + if err := b.ensureClosureStaged(ctx, storePath); err != nil { 445 + return fmt.Errorf("stage closure for %s: %w", storePath, err) 446 + } 447 + 352 448 fromURL := url.URL{Scheme: "file", Path: b.stagingDir} 353 449 args := []string{ 354 450 "copy", ··· 375 471 } 376 472 defer f.Close() 377 473 return parseNarinfo(f) 474 + } 475 + 476 + func writeNarinfoFile(path string, body []byte) (int64, error) { 477 + return writeFileAtomic(path, ".tmp-narinfo", func(f *os.File) (int64, error) { 478 + n, err := f.Write(body) 479 + return int64(n), err 480 + }) 378 481 } 379 482 380 483 func writeFileAtomic(dst, tempPrefix string, write func(*os.File) (int64, error)) (written int64, err error) {
+56
spindle/engines/microvm/upload_cache_nix_store_test.go
··· 223 223 } 224 224 } 225 225 226 + func TestNixStoreBackendBackfillsClosureDepNarinfo(t *testing.T) { 227 + const depHash = "abcdfghijklmnpqrsvwxyz0123456789" 228 + depNarinfo := "StorePath: /nix/store/" + depHash + "-dep\nURL: nar/dep.nar.zst\nNarHash: sha256:dep\nNarSize: 1\n" 229 + 230 + var depRequests int 231 + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 232 + if r.URL.Path == "/"+depHash+".narinfo" { 233 + depRequests++ 234 + _, _ = io.WriteString(w, depNarinfo) 235 + return 236 + } 237 + w.WriteHeader(http.StatusNotFound) 238 + })) 239 + defer upstream.Close() 240 + 241 + upURL, err := url.Parse(upstream.URL) 242 + if err != nil { 243 + t.Fatal(err) 244 + } 245 + 246 + runner := &fakeRunner{} 247 + staging := t.TempDir() 248 + b, err := newNixStoreUploadBackend("ssh-ng://cache-host", staging, []CacheUpstream{{url: upURL}}, slog.Default(), runner) 249 + if err != nil { 250 + t.Fatalf("newNixStoreUploadBackend: %v", err) 251 + } 252 + 253 + mustUploadNar(t, b, "foo.nar.zst", "nar-body") 254 + 255 + // the top path references the dep, which is absent from staging. 256 + narinfo := "StorePath: " + testStorePath + "\nURL: nar/foo.nar.zst\nNarHash: sha256:abc\nNarSize: 123\nReferences: " + depHash + "-dep\n" 257 + rec := httptest.NewRecorder() 258 + b.ServeHTTP(rec, httptest.NewRequest(http.MethodPut, "/"+testStoreHash+".narinfo", strings.NewReader(narinfo))) 259 + if rec.Code != http.StatusOK { 260 + t.Fatalf("PUT narinfo status: got %d, want 200; body=%q", rec.Code, rec.Body.String()) 261 + } 262 + 263 + // the dep narinfo must have been fetched from the upstream and staged... 264 + if depRequests == 0 { 265 + t.Fatalf("expected the dep narinfo to be fetched from the upstream") 266 + } 267 + staged, err := os.ReadFile(filepath.Join(staging, depHash+".narinfo")) 268 + if err != nil { 269 + t.Fatalf("dep narinfo not backfilled into staging: %v", err) 270 + } 271 + if string(staged) != depNarinfo { 272 + t.Fatalf("backfilled dep narinfo contents: got %q, want %q", string(staged), depNarinfo) 273 + } 274 + 275 + // ...and the import still copies just the requested top path. 276 + calls := runner.Calls() 277 + if len(calls) != 1 || calls[0][len(calls[0])-1] != testStorePath { 278 + t.Fatalf("expected a single nix copy for %s, got %v", testStorePath, calls) 279 + } 280 + } 281 + 226 282 func TestNixStoreBackendRemovesNarinfoOnImportFailure(t *testing.T) { 227 283 runner := &fakeRunner{nextErr: errors.New("nix copy failed")} 228 284 b, staging := newTestNixStoreBackend(t, "ssh://cache-host", runner)
+4 -1
spindle/engines/microvm/vm.go
··· 171 171 ctx = context.WithoutCancel(ctx) 172 172 173 173 var err error 174 - err = errors.Join(err, e.drainNixCache(ctx, state)) 174 + // todo(dawn): expose this error to the user as a warning 175 + if drainErr := e.drainNixCache(ctx, state); drainErr != nil { 176 + e.l.Warn("cache drain failed during cleanup; continuing", "workflow", wid, "error", drainErr) 177 + } 175 178 err = errors.Join(err, e.shutdownVM(ctx, wid, state)) 176 179 err = errors.Join(err, closeIO(&state.Agent)) 177 180 err = errors.Join(err, closeIO(&state.ReadCache))