Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

spindle/microvm: support ssh/ssh-ng and local nix stores as upload cache target

Signed-off-by: dawn <dawn@tangled.org>

author
dawn
committer
Tangled
date (Jun 19, 2026, 2:27 PM +0300) commit 07598d9e parent 099bdb5b change-id ktvvvntn
+1296 -115
+27 -16
cmd/spindle-microvm-run/main_linux.go
··· 10 10 "log/slog" 11 11 "net" 12 12 "os" 13 + "path/filepath" 13 14 "time" 14 15 15 16 "github.com/mdlayher/vsock" ··· 74 75 Usage: "timeout for the guest command", 75 76 }, 76 77 &cli.DurationFlag{ 77 - Name: "cache-drain-timeout", 78 - Value: 5 * time.Minute, 79 - Usage: "how long to wait for queued cache uploads after the guest command exits", 78 + Name: "cache-upload-wait-timeout", 79 + Aliases: []string{"cache-drain-timeout"}, 80 + Value: 5 * time.Minute, 81 + Usage: "how long to wait for guest cache uploads to finish after the command exits", 80 82 }, 81 83 &cli.DurationFlag{ 82 84 Name: "shutdown-timeout", ··· 177 179 } 178 180 jobID := "spindle-microvm-run" 179 181 execID := "dev-1" 182 + var pendingConfigKey string 183 + var pendingConfigToplevel string 184 + var configCacheDB *db.DB 180 185 181 186 fmt.Fprintf(os.Stderr, "listening for agent on %s\n", ln.Addr()) 182 187 conn, err := acceptExpectedVsockConn(ln, vm.CID(), logger) ··· 203 208 var uploadCache *microvm.UploadCacheProxy 204 209 if cmd.String("cache-upload-url") != "" { 205 210 var err error 206 - uploadCache, err = microvm.StartUploadCacheProxy(ctx, vm.CID(), cmd.String("cache-upload-url"), upstreams, logger) 211 + uploadCache, err = microvm.StartUploadCacheProxy(ctx, vm.CID(), cmd.String("cache-upload-url"), upstreams, filepath.Join(vm.WorkDir(), "upload-cache"), logger) 207 212 if err != nil { 208 213 return fmt.Errorf("start upload cache proxy: %w", err) 209 214 } ··· 243 248 return fmt.Errorf("calculate base config hash: %w", err) 244 249 } 245 250 246 - var d *db.DB 247 251 var configKey string 248 252 var cachedToplevel string 249 253 if cmd.String("db") != "" { 250 - d, err = db.Make(ctx, cmd.String("db")) 254 + configCacheDB, err = db.Make(ctx, cmd.String("db")) 251 255 if err != nil { 252 256 return fmt.Errorf("failed to open database: %w", err) 253 257 } 254 - defer d.Close() 258 + defer configCacheDB.Close() 255 259 256 260 configKey, err = microvm.BuildConfigKey(imageSpec, cmd.String("activate-config")) 257 261 if err != nil { 258 262 return fmt.Errorf("calculate config key: %w", err) 259 263 } 260 264 261 - record, err := d.GetNixOSToplevelCacheRecord(configKey) 265 + record, err := configCacheDB.GetNixOSToplevelCacheRecord(configKey) 262 266 if err != nil { 263 267 if !errors.Is(err, sql.ErrNoRows) { 264 268 return fmt.Errorf("lookup config cache: %w", err) ··· 280 284 } 281 285 fmt.Fprintf(os.Stderr, "activated config toplevel: %s\n", result.Toplevel) 282 286 283 - if d != nil && cachedToplevel == "" && result.Toplevel != "" && configKey != "" { 284 - err = d.SaveNixOSToplevelCacheRecord(configKey, result.Toplevel) 285 - if err != nil { 286 - return fmt.Errorf("save config cache: %w", err) 287 + if configCacheDB != nil && cachedToplevel == "" && result.Toplevel != "" && configKey != "" { 288 + if uploadCache == nil { 289 + fmt.Fprintln(os.Stderr, "skipping config cache metadata commit: no cache upload url configured") 290 + } else { 291 + pendingConfigKey = configKey 292 + pendingConfigToplevel = result.Toplevel 287 293 } 288 294 } 289 295 } ··· 302 308 } 303 309 304 310 if uploadCache != nil { 305 - drainCtx := ctx 306 - if cmd.Duration("cache-drain-timeout") > 0 { 311 + uploadWaitCtx := ctx 312 + if cmd.Duration("cache-upload-wait-timeout") > 0 { 307 313 var cancel context.CancelFunc 308 - drainCtx, cancel = context.WithTimeout(ctx, cmd.Duration("cache-drain-timeout")) 314 + uploadWaitCtx, cancel = context.WithTimeout(ctx, cmd.Duration("cache-upload-wait-timeout")) 309 315 defer cancel() 310 316 } 311 - uploaded, err := session.Drain(drainCtx) 317 + uploaded, err := session.Drain(uploadWaitCtx) 312 318 if err != nil { 313 319 return err 314 320 } 315 321 fmt.Printf("cache uploaded: %d\n", uploaded) 322 + if configCacheDB != nil && pendingConfigKey != "" && pendingConfigToplevel != "" { 323 + if err := configCacheDB.SaveNixOSToplevelCacheRecord(pendingConfigKey, pendingConfigToplevel); err != nil { 324 + return fmt.Errorf("save config cache: %w", err) 325 + } 326 + } 316 327 } 317 328 318 329 // mirror the engine shutdown order: ask the agent to power off first,
+30 -3
spindle/engines/microvm/README.md
··· 188 188 Teardown is same whether the workflow succeeded, failed or timed out: drain the 189 189 guest's pending Nix cache uploads, ask the agent to power off and wait for QEMU 190 190 to exit (falling back to QMP `system_powerdown` and finally a kill if it 191 - doesn't), then close the proxies and remove the work directory. 191 + doesn't), then close the proxies and remove the work directory. For non-HTTP 192 + upload targets the host-side import already happened synchronously when the 193 + guest committed each narinfo, so there is no second host-side cache drain step 194 + at teardown. 192 195 193 196 ### Nix cache 194 197 ··· 204 207 The upload proxy goes the other way: paths built inside the guest are pushed to 205 208 spindle's configured upload cache (if any) so the next workflow that needs them 206 209 doesn't rebuild. Paths already present on any configured read cache are skipped. 207 - The agent queues built paths and they're uploaded eagerly as they appear; any 208 - still in flight at teardown block the drain step until they finish. 210 + 211 + For `http://` and `https://` upload targets the proxy just reverse-proxies the 212 + guest's binary-cache upload traffic to the configured remote cache, while still 213 + answering narinfo existence checks across the upload target plus the read 214 + caches. 215 + 216 + For `ssh://`, `ssh-ng://`, `daemon`, and `local` targets spindle implements the 217 + small HTTP binary-cache upload surface itself. It stages uploaded `nar/` objects 218 + and narinfos under the workflow workdir, validates the narinfo, then treats the 219 + narinfo upload as the commit point: once `<hash>.narinfo` is written spindle 220 + runs: 221 + 222 + ```bash 223 + nix copy \ 224 + --from file://<staging-dir> \ 225 + --to <target-store> \ 226 + --no-check-sigs \ 227 + --substitute-on-destination \ 228 + <store-path> 229 + ``` 230 + 231 + That copy is synchronous. If it fails, spindle removes the staged narinfo again 232 + so future `GET`/`HEAD <hash>.narinfo` requests do not falsely dedupe a path that 233 + never made it to the destination store. The guest still only ever sees the same 234 + HTTP binary-cache upload protocol over vsock; it never gets direct access to 235 + SSH credentials or the destination store itself.
+4 -4
spindle/engines/microvm/engine.go
··· 8 8 "io" 9 9 "log/slog" 10 10 "os" 11 + "path/filepath" 11 12 "slices" 12 13 "sync" 13 14 "sync/atomic" ··· 231 232 return err 232 233 } 233 234 state.ReadCache = readCache 234 - uploadCache, err := StartUploadCacheProxy(ctx, cid, e.cfg.NixCache.UploadURL, upstreams, l) 235 + stagingDir := filepath.Join(workDir, "upload-cache") 236 + uploadCache, err := StartUploadCacheProxy(ctx, cid, e.cfg.NixCache.UploadURL, upstreams, stagingDir, l) 235 237 if err != nil { 236 238 return err 237 239 } ··· 451 453 return nil 452 454 } 453 455 454 - drainCtx, cancel := context.WithTimeout(ctx, cacheDrainTimeout) 455 - defer cancel() 456 - if _, err := state.Agent.Drain(drainCtx); err != nil { 456 + if err := e.drainNixCache(ctx, state); err != nil { 457 457 return fmt.Errorf("drain config cache uploads before metadata commit: %w", err) 458 458 } 459 459 if err := state.NixOSToplevelCache.Commit(configKey, result.Toplevel); err != nil {
+80 -4
spindle/engines/microvm/test-spindle-microvm.sh
··· 15 15 declare -a TEST_NAMES=() 16 16 declare -a TEST_STATUSES=() 17 17 declare -a TEST_TIMES=() 18 + SKIP_TEST_RC=200 18 19 19 20 get_time_ms() { 20 21 local t="${EPOCHREALTIME:-}" ··· 46 47 log "test summary" 47 48 echo "=========================================" 48 49 local passed_count=0 50 + local skipped_count=0 49 51 local failed_count=0 50 52 local total_time=0 51 53 for i in "${!TEST_NAMES[@]}"; do ··· 59 61 if [ "$status" = "Failed" ]; then 60 62 status_color="\033[0;31m" 61 63 failed_count=$((failed_count + 1)) 64 + elif [ "$status" = "Skipped" ]; then 65 + status_color="\033[0;33m" 66 + skipped_count=$((skipped_count + 1)) 62 67 else 63 68 passed_count=$((passed_count + 1)) 64 69 fi ··· 70 75 local total_tests="${#TEST_NAMES[@]}" 71 76 local total_time_str 72 77 total_time_str=$(format_duration "$total_time") 73 - printf " total: %d tests, %d passed, %d failed\n" "$total_tests" "$passed_count" "$failed_count" 78 + printf " total: %d tests, %d passed, %d skipped, %d failed\n" "$total_tests" "$passed_count" "$skipped_count" "$failed_count" 74 79 printf " total execution time: %s\n" "$total_time_str" 75 80 echo "=========================================" 76 81 } 77 82 83 + skip_test() { 84 + echo "skipped: $*" 85 + return "$SKIP_TEST_RC" 86 + } 87 + 88 + host_is_nixos() { 89 + [ -e /etc/NIXOS ] && return 0 90 + [ -r /etc/os-release ] && grep -q '^ID=nixos$' /etc/os-release 91 + } 92 + 78 93 JOBS="${JOBS:-4}" 79 94 while [[ $# -gt 0 ]]; do 80 95 case "$1" in ··· 125 140 log "setup local cache & temp environment" 126 141 TEMP_DIR=$(mktemp -d -t test-spindle-microvm-XXXXXX) 127 142 143 + if [ ! -e /dev/vsock ]; then 144 + echo "error: /dev/vsock is missing; run sudo modprobe vhost_vsock" >&2 145 + exit 1 146 + fi 147 + 128 148 log "build spindle & microvm image tarball" 129 149 nix develop --command go build -o spindle/spindle-microvm-run ./cmd/spindle-microvm-run 130 150 TARBALL_PATH=$(nix build .#spindle-nixos-image-tarball --no-link --print-out-paths) ··· 201 221 local name="" 202 222 local timeout="60s" 203 223 local upload=0 224 + local upload_url="" 204 225 local activate="" 205 226 local no_cache=0 206 227 local db="" ··· 224 245 upload=1 225 246 shift 226 247 ;; 248 + --upload-url) 249 + upload=1 250 + upload_url="$2" 251 + shift 2 252 + ;; 227 253 --activate) 228 254 activate="$2" 229 255 shift 2 ··· 266 292 fi 267 293 268 294 if [ "$upload" -eq 1 ]; then 295 + if [ -z "$upload_url" ]; then 296 + upload_url="$CACHE_UPLOAD_URL?secret-key=$CACHE_SECRET_KEY_PATH" 297 + fi 269 298 args+=( 270 - --cache-upload-url "$CACHE_UPLOAD_URL?secret-key=$CACHE_SECRET_KEY_PATH" 299 + --cache-upload-url "$upload_url" 271 300 ) 272 301 fi 273 302 ··· 305 334 log "[$name] start (vsock port $port)" 306 335 307 336 local status="Passed" 308 - if ! "$func" > "$logfile" 2>&1; then 309 - status="Failed" 337 + if "$func" > "$logfile" 2>&1; then 338 + status="Passed" 339 + else 340 + local rc=$? 341 + if [ "$rc" -eq "$SKIP_TEST_RC" ]; then 342 + status="Skipped" 343 + else 344 + status="Failed" 345 + fi 310 346 fi 311 347 312 348 local duration_ms=$(($(get_time_ms) - start)) ··· 316 352 duration_str=$(format_duration "$duration_ms") 317 353 if [ "$status" = "Failed" ]; then 318 354 printf "\n\033[0;31m>>> [%s] FAILED (%s)\033[0m\n" "$name" "$duration_str" 355 + strip_ansi "$logfile" || true 356 + elif [ "$status" = "Skipped" ]; then 357 + printf "\n\033[0;33m>>> [%s] skipped (%s)\033[0m\n" "$name" "$duration_str" 319 358 strip_ansi "$logfile" || true 320 359 else 321 360 printf "\n\033[0;32m>>> [%s] passed (%s)\033[0m\n" "$name" "$duration_str" ··· 435 474 return 1 436 475 fi 437 476 echo "success: store path uploaded to cache" 477 + } 478 + 479 + test_ssh_store_upload() { 480 + if ! host_is_nixos; then 481 + skip_test "ssh store upload smoke only runs on nixos hosts" 482 + fi 483 + 484 + run_ssh_store_upload_case() { 485 + local target="$1" 486 + local label="$2" 487 + local name="uploaded-test-file-${label}" 488 + local content="hello from vm upload via ${label}" 489 + local out 490 + out=$(run_vm --name "$label" --timeout "180s" --upload-url "$target" -- /run/current-system/sw/bin/bash -l -c "nix-build -E 'with import <nixpkgs> {}; writeText \"$name\" \"$content\"' --no-out-link") || return 1 491 + 492 + local built_path 493 + built_path=$(echo "$out" | strip_ansi | grep -v '\.drv' | grep -o "/nix/store/[a-z0-9]*-${name}" | head -n 1 || true) 494 + if [ -z "$built_path" ]; then 495 + echo "error: could not find built store path in vm output for $label" >&2 496 + echo "$out" | strip_ansi >&2 497 + return 1 498 + fi 499 + if [ ! -e "$built_path" ]; then 500 + echo "error: uploaded path missing from host store for $label: $built_path" >&2 501 + return 1 502 + fi 503 + if [ "$(cat "$built_path")" != "$content" ]; then 504 + echo "error: uploaded path content mismatch for $label" >&2 505 + echo "path=$built_path" >&2 506 + return 1 507 + fi 508 + echo "success: store path uploaded to $target via spindle" 509 + } 510 + 511 + run_ssh_store_upload_case "ssh-ng://localhost" "ssh-ng-upload" 512 + run_ssh_store_upload_case "ssh://localhost" "ssh-upload" 438 513 } 439 514 440 515 test_networking() { ··· 831 906 test_alpine_nix 832 907 test_realize 833 908 test_build_upload 909 + test_ssh_store_upload 834 910 test_networking 835 911 test_substitution_and_no_upload 836 912 test_activation_services
+101
spindle/engines/microvm/upload_cache_http.go
··· 1 + package microvm 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "io" 7 + "log/slog" 8 + "net/http" 9 + "net/http/httputil" 10 + "net/url" 11 + "strings" 12 + ) 13 + 14 + // httpUploadBackend reverse-proxies guest binary-cache upload traffic to an 15 + // http(s) upload cache such as ncps. 16 + type httpUploadBackend struct { 17 + handler http.Handler 18 + } 19 + 20 + func newHTTPUploadProxyBackend(target *url.URL, readUpstreams []CacheUpstream, logger *slog.Logger) *httpUploadBackend { 21 + return &httpUploadBackend{handler: uploadProxyHandler(target, readUpstreams, logger)} 22 + } 23 + 24 + func (b *httpUploadBackend) ServeHTTP(w http.ResponseWriter, r *http.Request) { 25 + b.handler.ServeHTTP(w, r) 26 + } 27 + 28 + func (b *httpUploadBackend) Close() error { return nil } 29 + 30 + func uploadProxyHandler(target *url.URL, readUpstreams []CacheUpstream, logger *slog.Logger) http.Handler { 31 + rp := httputil.NewSingleHostReverseProxy(target) 32 + rp.ErrorLog = slog.NewLogLogger(logger.Handler(), slog.LevelError) 33 + 34 + origDirector := rp.Director 35 + rp.Director = func(req *http.Request) { 36 + origDirector(req) 37 + // ensure host matches target 38 + req.Host = target.Host 39 + // the transport doesn't turn URL userinfo into basic auth, only 40 + // http.Client does, so do it ourselves 41 + if user := target.User; user != nil { 42 + password, _ := user.Password() 43 + req.SetBasicAuth(user.Username(), password) 44 + } 45 + } 46 + 47 + // before uploading, nix copy asks the destination whether it already has each 48 + // path by GET/HEAD-ing <hash>.narinfo and skips the ones it does. we answer 49 + // that check across the upload target *and* the read caches: if any of them 50 + // already serves the path there is no point uploading it (the guest would 51 + // just substitute it from there anyway). 52 + narinfoUpstreams := append([]CacheUpstream{{url: target}}, readUpstreams...) 53 + exists := newNarinfoExistenceTransport(narinfoUpstreams, logger) 54 + 55 + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 56 + if isNarinfoExistenceCheck(r) { 57 + serveNarinfoExistence(w, r, exists, logger) 58 + return 59 + } 60 + rp.ServeHTTP(w, r) 61 + }) 62 + } 63 + 64 + func newNarinfoExistenceTransport(upstreams []CacheUpstream, logger *slog.Logger) http.RoundTripper { 65 + return &parallelRacingTransport{ 66 + upstreams: upstreams, 67 + underlying: proxyTransport, 68 + guardedUnderlying: guardedProxyTransport, 69 + logger: logger, 70 + } 71 + } 72 + 73 + func isNarinfoExistenceCheck(r *http.Request) bool { 74 + if r.Method != http.MethodGet && r.Method != http.MethodHead { 75 + return false 76 + } 77 + return strings.HasSuffix(r.URL.Path, ".narinfo") 78 + } 79 + 80 + func serveNarinfoExistence(w http.ResponseWriter, r *http.Request, exists http.RoundTripper, logger *slog.Logger) { 81 + probe := r.Clone(r.Context()) 82 + probe.RequestURI = "" 83 + 84 + resp, err := exists.RoundTrip(probe) 85 + if err != nil { 86 + logger.Warn("upload proxy narinfo check failed, treating as not present", "path", r.URL.Path, "error", err) 87 + w.WriteHeader(http.StatusNotFound) 88 + return 89 + } 90 + defer resp.Body.Close() 91 + 92 + for key, values := range resp.Header { 93 + for _, value := range values { 94 + w.Header().Add(key, value) 95 + } 96 + } 97 + w.WriteHeader(resp.StatusCode) 98 + if _, err := io.Copy(w, resp.Body); err != nil && !errors.Is(err, context.Canceled) { 99 + logger.Warn("upload proxy narinfo copy failed", "path", r.URL.Path, "error", err) 100 + } 101 + }
+124
spindle/engines/microvm/upload_cache_narinfo.go
··· 1 + package microvm 2 + 3 + import ( 4 + "bufio" 5 + "fmt" 6 + "io" 7 + "path/filepath" 8 + "regexp" 9 + "strconv" 10 + "strings" 11 + ) 12 + 13 + type narinfo struct { 14 + StorePath string 15 + URL string 16 + NarHash string 17 + NarSize int64 18 + } 19 + 20 + const ( 21 + maxNarinfoSize = 1 << 20 // 1 MiB 22 + storePrefix = "/nix/store/" 23 + maxNarinfoLineLen = maxNarinfoSize 24 + ) 25 + 26 + var nixStorePathBaseRe = regexp.MustCompile(`^[0-9abcdfghijklmnpqrsvwxyz]{32}-[^/]+$`) 27 + 28 + // parseNarinfo parses and validates a narinfo body. 29 + // - required fields must be present 30 + // - StorePath must be under /nix/store/ 31 + // - URL must be a relative, traversal-safe path referencing a NAR in the 32 + // same staging cache 33 + // - NarSize must be a non-negative integer 34 + func parseNarinfo(r io.Reader) (*narinfo, error) { 35 + lr := io.LimitReader(r, maxNarinfoSize+1) 36 + scanner := bufio.NewScanner(lr) 37 + scanner.Buffer(make([]byte, 4096), maxNarinfoLineLen) 38 + 39 + var info narinfo 40 + for scanner.Scan() { 41 + line := scanner.Text() 42 + if line == "" { 43 + continue 44 + } 45 + key, value, ok := strings.Cut(line, ":") 46 + if !ok { 47 + return nil, fmt.Errorf("invalid narinfo line %q", line) 48 + } 49 + key = strings.TrimSpace(key) 50 + value = strings.TrimSpace(value) 51 + 52 + switch key { 53 + case "StorePath": 54 + info.StorePath = value 55 + case "URL": 56 + info.URL = value 57 + case "NarHash": 58 + info.NarHash = value 59 + case "NarSize": 60 + n, err := strconv.ParseInt(value, 10, 64) 61 + if err != nil { 62 + return nil, fmt.Errorf("invalid NarSize %q: %w", value, err) 63 + } 64 + info.NarSize = n 65 + } 66 + } 67 + if err := scanner.Err(); err != nil { 68 + return nil, fmt.Errorf("read narinfo: %w", err) 69 + } 70 + 71 + if err := validateNarinfo(&info); err != nil { 72 + return nil, err 73 + } 74 + return &info, nil 75 + } 76 + 77 + func validateNarinfo(info *narinfo) error { 78 + if info.StorePath == "" { 79 + return fmt.Errorf("narinfo missing StorePath") 80 + } 81 + if _, _, err := parseStorePath(info.StorePath); err != nil { 82 + return fmt.Errorf("invalid StorePath: %w", err) 83 + } 84 + if info.URL == "" { 85 + return fmt.Errorf("narinfo missing URL") 86 + } 87 + if strings.HasPrefix(info.URL, "/") || strings.Contains(info.URL, "..") { 88 + return fmt.Errorf("narinfo URL %q is not a safe relative path", info.URL) 89 + } 90 + if !strings.HasPrefix(info.URL, "nar/") { 91 + return fmt.Errorf("narinfo URL %q must reference a staged nar/ object", info.URL) 92 + } 93 + name := strings.TrimPrefix(info.URL, "nar/") 94 + if name == "" || name == "." || name != filepath.Base(name) || strings.Contains(name, "/") { 95 + return fmt.Errorf("narinfo URL %q is not a safe nar object path", info.URL) 96 + } 97 + if info.NarHash == "" { 98 + return fmt.Errorf("narinfo missing NarHash") 99 + } 100 + if info.NarSize < 0 { 101 + return fmt.Errorf("narinfo NarSize must be non-negative") 102 + } 103 + return nil 104 + } 105 + 106 + func parseStorePath(path string) (hash string, name string, err error) { 107 + if !strings.HasPrefix(path, storePrefix) { 108 + return "", "", fmt.Errorf("store path %q does not start with %q", path, storePrefix) 109 + } 110 + 111 + base := strings.TrimPrefix(path, storePrefix) 112 + if base == "" || strings.Contains(base, "/") { 113 + return "", "", fmt.Errorf("store path %q has invalid base name", path) 114 + } 115 + if !nixStorePathBaseRe.MatchString(base) { 116 + return "", "", fmt.Errorf("store path %q is not a valid nix store path", path) 117 + } 118 + 119 + hash, name, ok := strings.Cut(base, "-") 120 + if !ok || hash == "" || name == "" { 121 + return "", "", fmt.Errorf("store path %q is missing hash or name", path) 122 + } 123 + return hash, name, nil 124 + }
+436
spindle/engines/microvm/upload_cache_nix_store.go
··· 1 + package microvm 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "errors" 7 + "fmt" 8 + "io" 9 + "log/slog" 10 + "net/http" 11 + "net/url" 12 + "os" 13 + "os/exec" 14 + "path/filepath" 15 + "strings" 16 + ) 17 + 18 + // we have an interface for running commands so we can swap it in tests 19 + type CommandRunner interface { 20 + Run(ctx context.Context, name string, args ...string) error 21 + } 22 + 23 + type execRunner struct{} 24 + 25 + func (execRunner) Run(ctx context.Context, name string, args ...string) error { 26 + // nosemgrep: go.lang.security.audit.dangerous-exec-command.dangerous-exec-command 27 + cmd := exec.CommandContext(ctx, name, args...) 28 + out, err := cmd.CombinedOutput() 29 + if err != nil { 30 + return fmt.Errorf("%s %s: %w\n%s", name, strings.Join(args, " "), err, string(out)) 31 + } 32 + return nil 33 + } 34 + 35 + const ( 36 + nixStoreCacheInfo = "StoreDir: /nix/store\nWantMassQuery: 1\nPriority: 50\n" 37 + maxNarUploadSize = 5 << 30 // 5gib 38 + ) 39 + 40 + type NixStoreUploadBackend struct { 41 + stagingDir string 42 + targetStore string 43 + readUpstreams []CacheUpstream 44 + logger *slog.Logger 45 + runner CommandRunner 46 + maxNarUploadSize int64 47 + } 48 + 49 + func newNixStoreUploadBackend(targetStore, stagingDir string, readUpstreams []CacheUpstream, logger *slog.Logger, runner CommandRunner) (*NixStoreUploadBackend, error) { 50 + absStaging, err := filepath.Abs(stagingDir) 51 + if err != nil { 52 + return nil, fmt.Errorf("resolve staging dir %q: %w", stagingDir, err) 53 + } 54 + if logger == nil { 55 + logger = slog.Default() 56 + } 57 + 58 + if err := os.MkdirAll(filepath.Join(absStaging, "nar"), 0o755); err != nil { 59 + return nil, fmt.Errorf("create staging cache directories: %w", err) 60 + } 61 + infoPath := filepath.Join(absStaging, "nix-cache-info") 62 + if _, err := os.Stat(infoPath); errors.Is(err, os.ErrNotExist) { 63 + if err := os.WriteFile(infoPath, []byte(nixStoreCacheInfo), 0o644); err != nil { 64 + return nil, fmt.Errorf("write nix-cache-info: %w", err) 65 + } 66 + } 67 + 68 + if runner == nil { 69 + runner = execRunner{} 70 + } 71 + 72 + return &NixStoreUploadBackend{ 73 + stagingDir: absStaging, 74 + targetStore: targetStore, 75 + readUpstreams: readUpstreams, 76 + logger: logger, 77 + runner: runner, 78 + maxNarUploadSize: maxNarUploadSize, 79 + }, nil 80 + } 81 + 82 + func (b *NixStoreUploadBackend) Close() error { return nil } 83 + 84 + func (b *NixStoreUploadBackend) ServeHTTP(w http.ResponseWriter, r *http.Request) { 85 + relPath, err := normalizeUploadCachePath(r.URL.Path) 86 + if err != nil { 87 + b.logger.Warn("refusing upload cache request with unsafe path", "path", r.URL.Path, "error", err) 88 + http.Error(w, "invalid path", http.StatusBadRequest) 89 + return 90 + } 91 + 92 + switch r.Method { 93 + case http.MethodGet, http.MethodHead: 94 + switch { 95 + case relPath == "nix-cache-info": 96 + b.serveCacheInfo(w, r) 97 + return 98 + case isNarinfoObjectPath(relPath): 99 + b.serveNarinfo(w, r, relPath) 100 + return 101 + } 102 + 103 + case http.MethodPut: 104 + switch { 105 + case relPath == "nix-cache-info": 106 + b.putCacheInfo(w, r) 107 + return 108 + case isNarObjectPath(relPath): 109 + b.putNar(w, r, relPath) 110 + return 111 + case isNarinfoObjectPath(relPath): 112 + b.putNarinfo(w, r, relPath) 113 + return 114 + } 115 + } 116 + 117 + http.Error(w, "not found", http.StatusNotFound) 118 + } 119 + 120 + func (b *NixStoreUploadBackend) serveCacheInfo(w http.ResponseWriter, r *http.Request) { 121 + w.Header().Set("Content-Type", "text/x-nix-cache-info") 122 + w.Header().Set("Content-Length", fmt.Sprintf("%d", len(nixStoreCacheInfo))) 123 + if r.Method == http.MethodHead { 124 + w.WriteHeader(http.StatusOK) 125 + return 126 + } 127 + _, _ = w.Write([]byte(nixStoreCacheInfo)) 128 + } 129 + 130 + func (b *NixStoreUploadBackend) putCacheInfo(w http.ResponseWriter, r *http.Request) { 131 + _, _ = io.Copy(io.Discard, io.LimitReader(r.Body, int64(len(nixStoreCacheInfo))+1)) 132 + w.WriteHeader(http.StatusOK) 133 + } 134 + 135 + func (b *NixStoreUploadBackend) serveNarinfo(w http.ResponseWriter, r *http.Request, relPath string) { 136 + localPath, err := b.stagingObjectPath(relPath) 137 + if err != nil { 138 + b.logger.Warn("refusing narinfo request with unsafe path", "path", relPath, "error", err) 139 + http.Error(w, "invalid path", http.StatusBadRequest) 140 + return 141 + } 142 + 143 + fi, err := os.Stat(localPath) 144 + if err == nil && !fi.IsDir() { 145 + if _, err := readNarinfoFile(localPath); err != nil { 146 + b.logger.Warn("staged narinfo is invalid", "path", relPath, "error", err) 147 + http.Error(w, "invalid staged narinfo", http.StatusInternalServerError) 148 + return 149 + } 150 + b.serveLocalFile(w, r, localPath, fi) 151 + return 152 + } 153 + if !errors.Is(err, os.ErrNotExist) { 154 + b.logger.Warn("stat staged narinfo failed", "path", relPath, "error", err) 155 + } 156 + 157 + if len(b.readUpstreams) > 0 { 158 + probe := r.Clone(r.Context()) 159 + probe.URL.Path = "/" + relPath 160 + serveNarinfoExistence(w, probe, newNarinfoExistenceTransport(b.readUpstreams, b.logger), b.logger) 161 + return 162 + } 163 + 164 + http.Error(w, "not found", http.StatusNotFound) 165 + } 166 + 167 + func (b *NixStoreUploadBackend) serveLocalFile(w http.ResponseWriter, r *http.Request, localPath string, fi os.FileInfo) { 168 + w.Header().Set("Content-Type", "text/x-nix-narinfo") 169 + w.Header().Set("Content-Length", fmt.Sprintf("%d", fi.Size())) 170 + w.Header().Set("Last-Modified", fi.ModTime().UTC().Format(http.TimeFormat)) 171 + 172 + if r.Method == http.MethodHead { 173 + w.WriteHeader(http.StatusOK) 174 + return 175 + } 176 + 177 + f, err := os.Open(localPath) 178 + if err != nil { 179 + b.logger.Warn("open staged narinfo failed", "path", localPath, "error", err) 180 + http.Error(w, "not found", http.StatusNotFound) 181 + return 182 + } 183 + defer f.Close() 184 + w.WriteHeader(http.StatusOK) 185 + if _, err := io.Copy(w, f); err != nil && !errors.Is(err, context.Canceled) { 186 + b.logger.Warn("copy staged narinfo failed", "path", localPath, "error", err) 187 + } 188 + } 189 + 190 + func (b *NixStoreUploadBackend) putNar(w http.ResponseWriter, r *http.Request, relPath string) { 191 + name := strings.TrimPrefix(relPath, "nar/") 192 + dst, err := b.stagingObjectPath(relPath) 193 + if err != nil { 194 + b.logger.Warn("refusing nar upload with unsafe path", "name", name, "error", err) 195 + http.Error(w, "invalid nar path", http.StatusBadRequest) 196 + return 197 + } 198 + r.Body = http.MaxBytesReader(w, r.Body, b.maxNarUploadSize) 199 + 200 + var copyErr error 201 + written, err := writeFileAtomic(dst, ".tmp-nar", func(f *os.File) (int64, error) { 202 + n, err := io.Copy(f, r.Body) 203 + copyErr = err 204 + return n, err 205 + }) 206 + if err != nil { 207 + b.logger.Warn("stage nar upload failed", "name", name, "error", err) 208 + var maxErr *http.MaxBytesError 209 + if errors.As(err, &maxErr) { 210 + http.Error(w, "nar too large", http.StatusRequestEntityTooLarge) 211 + return 212 + } 213 + if copyErr != nil { 214 + http.Error(w, "upload failed", http.StatusBadRequest) 215 + return 216 + } 217 + http.Error(w, "internal error", http.StatusInternalServerError) 218 + return 219 + } 220 + 221 + b.logger.Debug("staged nar", "name", name, "bytes", written) 222 + w.WriteHeader(http.StatusOK) 223 + } 224 + 225 + func (b *NixStoreUploadBackend) putNarinfo(w http.ResponseWriter, r *http.Request, relPath string) { 226 + body, err := io.ReadAll(io.LimitReader(r.Body, maxNarinfoSize+1)) 227 + if err != nil { 228 + b.logger.Warn("read narinfo body failed", "path", relPath, "error", err) 229 + http.Error(w, "upload failed", http.StatusBadRequest) 230 + return 231 + } 232 + if len(body) > maxNarinfoSize { 233 + b.logger.Warn("narinfo body exceeds maximum size", "path", relPath, "bytes", len(body)) 234 + http.Error(w, "narinfo too large", http.StatusBadRequest) 235 + return 236 + } 237 + 238 + info, err := parseNarinfo(bytes.NewReader(body)) 239 + if err != nil { 240 + b.logger.Warn("refusing narinfo upload with invalid body", "path", relPath, "error", err) 241 + http.Error(w, "invalid narinfo: "+err.Error(), http.StatusBadRequest) 242 + return 243 + } 244 + storePathHash, _, err := parseStorePath(info.StorePath) 245 + if err != nil { 246 + b.logger.Warn("refusing narinfo upload with invalid store path", "path", relPath, "storePath", info.StorePath, "error", err) 247 + http.Error(w, "invalid StorePath", http.StatusBadRequest) 248 + return 249 + } 250 + fileHash := strings.TrimSuffix(filepath.Base(relPath), ".narinfo") 251 + if fileHash != storePathHash { 252 + b.logger.Warn("refusing narinfo upload with mismatched filename hash", "path", relPath, "storePath", info.StorePath) 253 + http.Error(w, "narinfo filename does not match StorePath hash", http.StatusBadRequest) 254 + return 255 + } 256 + if !isNarObjectPath(info.URL) { 257 + b.logger.Warn("narinfo references invalid nar URL", "path", relPath, "url", info.URL) 258 + http.Error(w, "invalid nar URL", http.StatusBadRequest) 259 + return 260 + } 261 + 262 + narPath, err := b.stagingObjectPath(info.URL) 263 + if err != nil { 264 + b.logger.Warn("narinfo references unsafe nar URL", "path", relPath, "url", info.URL, "error", err) 265 + http.Error(w, "invalid nar URL", http.StatusBadRequest) 266 + return 267 + } 268 + if _, err := os.Stat(narPath); err != nil { 269 + b.logger.Warn("narinfo references missing nar", "path", relPath, "url", info.URL, "error", err) 270 + http.Error(w, "referenced nar does not exist", http.StatusBadRequest) 271 + return 272 + } 273 + 274 + dst, err := b.stagingObjectPath(relPath) 275 + if err != nil { 276 + b.logger.Warn("refusing narinfo upload with unsafe path", "path", relPath, "error", err) 277 + http.Error(w, "invalid path", http.StatusBadRequest) 278 + return 279 + } 280 + 281 + if _, err := writeFileAtomic(dst, ".tmp-narinfo", func(f *os.File) (int64, error) { 282 + n, err := f.Write(body) 283 + return int64(n), err 284 + }); err != nil { 285 + b.logger.Warn("stage narinfo upload failed", "path", relPath, "error", err) 286 + http.Error(w, "internal error", http.StatusInternalServerError) 287 + return 288 + } 289 + 290 + if err := b.importStorePath(r.Context(), info.StorePath); err != nil { 291 + b.logger.Warn("import staged narinfo failed", "path", relPath, "storePath", info.StorePath, "error", err) 292 + if cleanupErr := removeFileAndSyncDir(dst); cleanupErr != nil { 293 + b.logger.Error("remove staged narinfo after failed import", "path", relPath, "error", cleanupErr) 294 + } 295 + http.Error(w, "import failed", http.StatusBadGateway) 296 + return 297 + } 298 + 299 + b.logger.Debug("staged narinfo", "path", relPath, "storePath", info.StorePath) 300 + w.WriteHeader(http.StatusOK) 301 + } 302 + 303 + func normalizeUploadCachePath(path string) (string, error) { 304 + if path == "" || path == "/" { 305 + return "", fmt.Errorf("empty path") 306 + } 307 + if !strings.HasPrefix(path, "/") { 308 + return "", fmt.Errorf("path must start with /") 309 + } 310 + if strings.Contains(path, "..") { 311 + return "", fmt.Errorf("path traversal") 312 + } 313 + 314 + return strings.TrimPrefix(path, "/"), nil 315 + } 316 + 317 + func isNarinfoObjectPath(relPath string) bool { 318 + if !strings.HasSuffix(relPath, ".narinfo") { 319 + return false 320 + } 321 + if relPath != filepath.Base(relPath) { 322 + return false 323 + } 324 + base := filepath.Base(relPath) 325 + return base != "" && base != "." && base != ".narinfo" 326 + } 327 + 328 + func isNarObjectPath(relPath string) bool { 329 + if !strings.HasPrefix(relPath, "nar/") { 330 + return false 331 + } 332 + name := strings.TrimPrefix(relPath, "nar/") 333 + return name != "" && name != "." && name == filepath.Base(name) && !strings.Contains(name, "/") 334 + } 335 + 336 + func (b *NixStoreUploadBackend) stagingObjectPath(relPath string) (string, error) { 337 + if !isNarObjectPath(relPath) && !isNarinfoObjectPath(relPath) { 338 + return "", fmt.Errorf("invalid cache object path %q", relPath) 339 + } 340 + 341 + local, err := filepath.Localize(relPath) 342 + if err != nil { 343 + return "", fmt.Errorf("unsafe cache object path %q: %w", relPath, err) 344 + } 345 + 346 + return filepath.Join(b.stagingDir, local), nil 347 + } 348 + 349 + // todo(dawn): ideally we don't use `nix copy` here but instead have our own 350 + // `nix copy` impl so we don't need nix on host. but that's a far stretch goal :p 351 + func (b *NixStoreUploadBackend) importStorePath(ctx context.Context, storePath string) error { 352 + fromURL := url.URL{Scheme: "file", Path: b.stagingDir} 353 + args := []string{ 354 + "copy", 355 + "--from", fromURL.String(), 356 + "--to", b.targetStore, 357 + // todo(dawn): ideally we support signing in spindle itself. 358 + // but for now harmonia can sign things on serve so this is ok. 359 + "--no-check-sigs", 360 + "--substitute-on-destination", 361 + storePath, 362 + } 363 + 364 + b.logger.Info("importing staged cache path", "target", b.targetStore, "storePath", storePath) 365 + if err := b.runner.Run(ctx, "nix", args...); err != nil { 366 + return fmt.Errorf("nix copy to %s: %w", b.targetStore, err) 367 + } 368 + return nil 369 + } 370 + 371 + func readNarinfoFile(path string) (*narinfo, error) { 372 + f, err := os.Open(path) 373 + if err != nil { 374 + return nil, err 375 + } 376 + defer f.Close() 377 + return parseNarinfo(f) 378 + } 379 + 380 + func writeFileAtomic(dst, tempPrefix string, write func(*os.File) (int64, error)) (written int64, err error) { 381 + dir := filepath.Dir(dst) 382 + if err := os.MkdirAll(dir, 0o755); err != nil { 383 + return 0, fmt.Errorf("create directory %q: %w", dir, err) 384 + } 385 + 386 + tmp, err := os.CreateTemp(dir, tempPrefix+"-*-"+filepath.Base(dst)) 387 + if err != nil { 388 + return 0, fmt.Errorf("create temporary file in %q: %w", dir, err) 389 + } 390 + tmpName := tmp.Name() 391 + defer func() { 392 + if err != nil { 393 + _ = tmp.Close() 394 + _ = os.Remove(tmpName) 395 + } 396 + }() 397 + 398 + written, err = write(tmp) 399 + if err != nil { 400 + return 0, err 401 + } 402 + if err := tmp.Sync(); err != nil { 403 + return 0, fmt.Errorf("fsync temporary file %q: %w", tmpName, err) 404 + } 405 + if err := tmp.Close(); err != nil { 406 + return 0, fmt.Errorf("close temporary file %q: %w", tmpName, err) 407 + } 408 + if err := os.Rename(tmpName, dst); err != nil { 409 + return 0, fmt.Errorf("rename %q to %q: %w", tmpName, dst, err) 410 + } 411 + 412 + if err := syncDir(dir); err != nil { 413 + return 0, err 414 + } 415 + 416 + return written, nil 417 + } 418 + 419 + func removeFileAndSyncDir(path string) error { 420 + if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) { 421 + return fmt.Errorf("remove %q: %w", path, err) 422 + } 423 + return syncDir(filepath.Dir(path)) 424 + } 425 + 426 + func syncDir(dir string) error { 427 + dirFile, err := os.Open(dir) 428 + if err != nil { 429 + return fmt.Errorf("open directory %q: %w", dir, err) 430 + } 431 + defer dirFile.Close() 432 + if err := dirFile.Sync(); err != nil { 433 + return fmt.Errorf("sync directory %q: %w", dir, err) 434 + } 435 + return nil 436 + }
+438
spindle/engines/microvm/upload_cache_nix_store_test.go
··· 1 + package microvm 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "errors" 7 + "fmt" 8 + "io" 9 + "log/slog" 10 + "net/http" 11 + "net/http/httptest" 12 + "net/url" 13 + "os" 14 + "path/filepath" 15 + "slices" 16 + "strings" 17 + "sync" 18 + "testing" 19 + ) 20 + 21 + const ( 22 + testStoreHash = "0123456789abcdfghijklmnpqrsvwxyz" 23 + testStorePath = "/nix/store/" + testStoreHash + "-abc-output" 24 + ) 25 + 26 + func TestUploadCacheBackendSchemeDispatch(t *testing.T) { 27 + staging := t.TempDir() 28 + logger := slog.Default() 29 + 30 + cases := []struct { 31 + uploadURL string 32 + wantErr bool 33 + wantType string 34 + }{ 35 + {"https://cache.example/upload", false, "*microvm.httpUploadBackend"}, 36 + {"http://cache.example/upload", false, "*microvm.httpUploadBackend"}, 37 + {"ssh://cache-host", false, "*microvm.NixStoreUploadBackend"}, 38 + {"ssh-ng://cache-host", false, "*microvm.NixStoreUploadBackend"}, 39 + {"daemon", false, "*microvm.NixStoreUploadBackend"}, 40 + {"local", false, "*microvm.NixStoreUploadBackend"}, 41 + {"ftp://cache.example", true, ""}, 42 + {"/some/path", true, ""}, 43 + } 44 + 45 + for _, tc := range cases { 46 + t.Run(tc.uploadURL, func(t *testing.T) { 47 + backend, err := newUploadCacheBackend(tc.uploadURL, nil, staging, logger) 48 + if tc.wantErr { 49 + if err == nil { 50 + t.Fatalf("expected error for %q", tc.uploadURL) 51 + } 52 + return 53 + } 54 + if err != nil { 55 + t.Fatalf("unexpected error: %v", err) 56 + } 57 + got := fmt.Sprintf("%T", backend) 58 + if got != tc.wantType { 59 + t.Fatalf("backend type: got %s, want %s", got, tc.wantType) 60 + } 61 + }) 62 + } 63 + } 64 + 65 + func TestUploadCacheBackendEmptyURL(t *testing.T) { 66 + backend, err := newUploadCacheBackend("", nil, t.TempDir(), slog.Default()) 67 + if err != nil { 68 + t.Fatalf("unexpected error: %v", err) 69 + } 70 + if backend != nil { 71 + t.Fatalf("expected nil backend for empty URL, got %T", backend) 72 + } 73 + } 74 + 75 + func newTestNixStoreBackend(t *testing.T, target string, runner CommandRunner) (*NixStoreUploadBackend, string) { 76 + t.Helper() 77 + staging := t.TempDir() 78 + if target == "" { 79 + target = "ssh-ng://cache-host" 80 + } 81 + b, err := newNixStoreUploadBackend(target, staging, nil, slog.Default(), runner) 82 + if err != nil { 83 + t.Fatalf("newNixStoreUploadBackend: %v", err) 84 + } 85 + return b, staging 86 + } 87 + 88 + func mustUploadNar(t *testing.T, b *NixStoreUploadBackend, name, body string) { 89 + t.Helper() 90 + rec := httptest.NewRecorder() 91 + b.ServeHTTP(rec, httptest.NewRequest(http.MethodPut, "/nar/"+name, strings.NewReader(body))) 92 + if rec.Code != http.StatusOK { 93 + t.Fatalf("upload nar %q: got %d, want 200; body=%q", name, rec.Code, rec.Body.String()) 94 + } 95 + } 96 + 97 + func TestNixStoreBackendNixCacheInfo(t *testing.T) { 98 + b, _ := newTestNixStoreBackend(t, "", nil) 99 + 100 + rec := httptest.NewRecorder() 101 + b.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/nix-cache-info", nil)) 102 + if rec.Code != http.StatusOK { 103 + t.Fatalf("GET status: got %d, want 200", rec.Code) 104 + } 105 + if !strings.Contains(rec.Body.String(), "StoreDir: /nix/store") { 106 + t.Fatalf("cache info missing StoreDir: %q", rec.Body.String()) 107 + } 108 + 109 + rec = httptest.NewRecorder() 110 + b.ServeHTTP(rec, httptest.NewRequest(http.MethodHead, "/nix-cache-info", nil)) 111 + if rec.Code != http.StatusOK { 112 + t.Fatalf("HEAD status: got %d, want 200", rec.Code) 113 + } 114 + if rec.Body.Len() != 0 { 115 + t.Fatalf("HEAD body should be empty, got %q", rec.Body.String()) 116 + } 117 + 118 + rec = httptest.NewRecorder() 119 + b.ServeHTTP(rec, httptest.NewRequest(http.MethodPut, "/nix-cache-info", strings.NewReader("ignored"))) 120 + if rec.Code != http.StatusOK { 121 + t.Fatalf("PUT /nix-cache-info status: got %d, want 200", rec.Code) 122 + } 123 + } 124 + 125 + func TestNixStoreBackendRejectsTraversalNar(t *testing.T) { 126 + b, staging := newTestNixStoreBackend(t, "", nil) 127 + 128 + rec := httptest.NewRecorder() 129 + b.ServeHTTP(rec, httptest.NewRequest(http.MethodPut, "/nar/../../evil", strings.NewReader("bad"))) 130 + if rec.Code != http.StatusBadRequest { 131 + t.Fatalf("traversal nar status: got %d, want 400", rec.Code) 132 + } 133 + 134 + if _, err := os.Stat(filepath.Join(filepath.Dir(staging), "evil")); !errors.Is(err, os.ErrNotExist) { 135 + t.Fatalf("traversal nar escaped staging dir: %v", err) 136 + } 137 + } 138 + 139 + func TestNixStoreBackendRejectsOversizedNarUpload(t *testing.T) { 140 + b, staging := newTestNixStoreBackend(t, "", nil) 141 + b.maxNarUploadSize = 3 142 + 143 + rec := httptest.NewRecorder() 144 + b.ServeHTTP(rec, httptest.NewRequest(http.MethodPut, "/nar/foo.nar", strings.NewReader("four"))) 145 + if rec.Code != http.StatusRequestEntityTooLarge { 146 + t.Fatalf("oversized nar status: got %d, want 413; body=%q", rec.Code, rec.Body.String()) 147 + } 148 + 149 + if _, err := os.Stat(filepath.Join(staging, "nar", "foo.nar")); !errors.Is(err, os.ErrNotExist) { 150 + t.Fatalf("oversized nar should not have been staged: %v", err) 151 + } 152 + } 153 + 154 + func TestNixStoreBackendNarinfoRequiresExistingNar(t *testing.T) { 155 + b, _ := newTestNixStoreBackend(t, "", nil) 156 + 157 + narinfo := "StorePath: " + testStorePath + "\nURL: nar/abc.nar.zst\nNarHash: sha256:abc\nNarSize: 123\n" 158 + rec := httptest.NewRecorder() 159 + b.ServeHTTP(rec, httptest.NewRequest(http.MethodPut, "/"+testStoreHash+".narinfo", strings.NewReader(narinfo))) 160 + if rec.Code != http.StatusBadRequest { 161 + t.Fatalf("narinfo before nar status: got %d, want 400; body=%q", rec.Code, rec.Body.String()) 162 + } 163 + } 164 + 165 + type fakeRunner struct { 166 + mu sync.Mutex 167 + calls [][]string 168 + nextErr error 169 + } 170 + 171 + func (f *fakeRunner) Run(ctx context.Context, name string, args ...string) error { 172 + f.mu.Lock() 173 + defer f.mu.Unlock() 174 + call := append([]string{name}, args...) 175 + f.calls = append(f.calls, call) 176 + return f.nextErr 177 + } 178 + 179 + func (f *fakeRunner) Calls() [][]string { 180 + f.mu.Lock() 181 + defer f.mu.Unlock() 182 + return slices.Clone(f.calls) 183 + } 184 + 185 + func TestNixStoreBackendImportsNarinfoImmediately(t *testing.T) { 186 + runner := &fakeRunner{} 187 + b, staging := newTestNixStoreBackend(t, "ssh-ng://spindle-upload@cache-host", runner) 188 + 189 + mustUploadNar(t, b, "foo.nar.zst", "nar-body") 190 + 191 + narinfo := "StorePath: " + testStorePath + "\nURL: nar/foo.nar.zst\nNarHash: sha256:abc\nNarSize: 123\n" 192 + rec := httptest.NewRecorder() 193 + b.ServeHTTP(rec, httptest.NewRequest(http.MethodPut, "/"+testStoreHash+".narinfo", strings.NewReader(narinfo))) 194 + if rec.Code != http.StatusOK { 195 + t.Fatalf("PUT narinfo status: got %d, want 200; body=%q", rec.Code, rec.Body.String()) 196 + } 197 + 198 + calls := runner.Calls() 199 + if len(calls) != 1 { 200 + t.Fatalf("expected 1 nix copy call, got %d", len(calls)) 201 + } 202 + call := calls[0] 203 + wantFrom := (&url.URL{Scheme: "file", Path: staging}).String() 204 + want := []string{ 205 + "nix", 206 + "copy", 207 + "--from", wantFrom, 208 + "--to", "ssh-ng://spindle-upload@cache-host", 209 + "--no-check-sigs", 210 + "--substitute-on-destination", 211 + testStorePath, 212 + } 213 + if !slices.Equal(call, want) { 214 + t.Fatalf("nix copy args:\n got: %v\nwant: %v", call, want) 215 + } 216 + 217 + data, err := os.ReadFile(filepath.Join(staging, testStoreHash+".narinfo")) 218 + if err != nil { 219 + t.Fatalf("staged narinfo missing: %v", err) 220 + } 221 + if string(data) != narinfo { 222 + t.Fatalf("staged narinfo contents: got %q, want %q", string(data), narinfo) 223 + } 224 + } 225 + 226 + func TestNixStoreBackendRemovesNarinfoOnImportFailure(t *testing.T) { 227 + runner := &fakeRunner{nextErr: errors.New("nix copy failed")} 228 + b, staging := newTestNixStoreBackend(t, "ssh://cache-host", runner) 229 + 230 + mustUploadNar(t, b, "foo.nar.zst", "nar-body") 231 + 232 + narinfo := "StorePath: " + testStorePath + "\nURL: nar/foo.nar.zst\nNarHash: sha256:abc\nNarSize: 123\n" 233 + rec := httptest.NewRecorder() 234 + b.ServeHTTP(rec, httptest.NewRequest(http.MethodPut, "/"+testStoreHash+".narinfo", strings.NewReader(narinfo))) 235 + if rec.Code != http.StatusBadGateway { 236 + t.Fatalf("failed import status: got %d, want 502; body=%q", rec.Code, rec.Body.String()) 237 + } 238 + 239 + if _, err := os.Stat(filepath.Join(staging, testStoreHash+".narinfo")); !errors.Is(err, os.ErrNotExist) { 240 + t.Fatalf("narinfo should be removed after failed import: %v", err) 241 + } 242 + } 243 + 244 + func TestNixStoreBackendNarinfoReadUpstream(t *testing.T) { 245 + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 246 + if r.URL.Path == "/miss.narinfo" { 247 + w.WriteHeader(http.StatusNotFound) 248 + return 249 + } 250 + _, _ = io.WriteString(w, "StorePath: /nix/store/upstream\nURL: nar/upstream.nar\nNarHash: sha256:up\nNarSize: 1\n") 251 + })) 252 + defer upstream.Close() 253 + 254 + upURL, err := url.Parse(upstream.URL) 255 + if err != nil { 256 + t.Fatal(err) 257 + } 258 + 259 + staging := t.TempDir() 260 + b, err := newNixStoreUploadBackend("ssh://cache-host", staging, []CacheUpstream{{url: upURL}}, slog.Default(), nil) 261 + if err != nil { 262 + t.Fatalf("newNixStoreUploadBackend: %v", err) 263 + } 264 + 265 + rec := httptest.NewRecorder() 266 + b.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/present.narinfo", nil)) 267 + if rec.Code != http.StatusOK { 268 + t.Fatalf("GET upstream-present narinfo status: got %d, want 200", rec.Code) 269 + } 270 + if !strings.Contains(rec.Body.String(), "/nix/store/upstream") { 271 + t.Fatalf("unexpected upstream narinfo body: %q", rec.Body.String()) 272 + } 273 + 274 + rec = httptest.NewRecorder() 275 + b.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/miss.narinfo", nil)) 276 + if rec.Code != http.StatusNotFound { 277 + t.Fatalf("GET upstream-missing narinfo status: got %d, want 404", rec.Code) 278 + } 279 + 280 + } 281 + 282 + func TestNixStoreBackendRejectsInvalidLocalNarinfo(t *testing.T) { 283 + b, staging := newTestNixStoreBackend(t, "", nil) 284 + 285 + if err := os.WriteFile(filepath.Join(staging, testStoreHash+".narinfo"), []byte("not-a-narinfo\n"), 0o644); err != nil { 286 + t.Fatalf("write invalid staged narinfo: %v", err) 287 + } 288 + 289 + rec := httptest.NewRecorder() 290 + b.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/"+testStoreHash+".narinfo", nil)) 291 + if rec.Code != http.StatusInternalServerError { 292 + t.Fatalf("invalid local narinfo status: got %d, want 500; body=%q", rec.Code, rec.Body.String()) 293 + } 294 + } 295 + 296 + func TestNixStoreBackendNarinfoValidation(t *testing.T) { 297 + b, _ := newTestNixStoreBackend(t, "", nil) 298 + 299 + mustUploadNar(t, b, "x.nar", "x") 300 + 301 + cases := []struct { 302 + name string 303 + body string 304 + wantErr string 305 + }{ 306 + { 307 + name: "missing StorePath", 308 + body: "URL: nar/x.nar\nNarHash: sha256:x\nNarSize: 1\n", 309 + wantErr: "StorePath", 310 + }, 311 + { 312 + name: "bad StorePath", 313 + body: "StorePath: /tmp/evil\nURL: nar/x.nar\nNarHash: sha256:x\nNarSize: 1\n", 314 + wantErr: "invalid StorePath", 315 + }, 316 + { 317 + name: "malformed StorePath", 318 + body: "StorePath: /nix/store/not-a-real-store-path\nURL: nar/x.nar\nNarHash: sha256:x\nNarSize: 1\n", 319 + wantErr: "invalid StorePath", 320 + }, 321 + { 322 + name: "missing URL", 323 + body: "StorePath: " + testStorePath + "\nNarHash: sha256:x\nNarSize: 1\n", 324 + wantErr: "URL", 325 + }, 326 + { 327 + name: "absolute URL", 328 + body: "StorePath: " + testStorePath + "\nURL: /etc/passwd\nNarHash: sha256:x\nNarSize: 1\n", 329 + wantErr: "URL", 330 + }, 331 + { 332 + name: "traversal URL", 333 + body: "StorePath: " + testStorePath + "\nURL: nar/../../etc/passwd\nNarHash: sha256:x\nNarSize: 1\n", 334 + wantErr: "URL", 335 + }, 336 + { 337 + name: "non nar URL", 338 + body: "StorePath: " + testStorePath + "\nURL: nix-cache-info\nNarHash: sha256:x\nNarSize: 1\n", 339 + wantErr: "nar/", 340 + }, 341 + { 342 + name: "nested nar URL", 343 + body: "StorePath: " + testStorePath + "\nURL: nar/dir/x.nar\nNarHash: sha256:x\nNarSize: 1\n", 344 + wantErr: "safe nar object path", 345 + }, 346 + { 347 + name: "missing NarHash", 348 + body: "StorePath: " + testStorePath + "\nURL: nar/x.nar\nNarSize: 1\n", 349 + wantErr: "NarHash", 350 + }, 351 + { 352 + name: "bad NarSize", 353 + body: "StorePath: " + testStorePath + "\nURL: nar/x.nar\nNarHash: sha256:x\nNarSize: huge\n", 354 + wantErr: "NarSize", 355 + }, 356 + } 357 + 358 + for _, tc := range cases { 359 + t.Run(tc.name, func(t *testing.T) { 360 + rec := httptest.NewRecorder() 361 + b.ServeHTTP(rec, httptest.NewRequest(http.MethodPut, "/"+testStoreHash+".narinfo", strings.NewReader(tc.body))) 362 + if rec.Code != http.StatusBadRequest { 363 + t.Fatalf("status: got %d, want 400; body=%q", rec.Code, rec.Body.String()) 364 + } 365 + if !strings.Contains(rec.Body.String(), tc.wantErr) { 366 + t.Fatalf("body %q should mention %q", rec.Body.String(), tc.wantErr) 367 + } 368 + }) 369 + } 370 + } 371 + 372 + func TestNixStoreBackendUploadCacheInfoFileExists(t *testing.T) { 373 + staging := t.TempDir() 374 + if _, err := newNixStoreUploadBackend("ssh://host", staging, nil, slog.Default(), nil); err != nil { 375 + t.Fatalf("newNixStoreUploadBackend: %v", err) 376 + } 377 + data, err := os.ReadFile(filepath.Join(staging, "nix-cache-info")) 378 + if err != nil { 379 + t.Fatalf("nix-cache-info missing: %v", err) 380 + } 381 + if !bytes.Contains(data, []byte("StoreDir: /nix/store")) { 382 + t.Fatalf("unexpected nix-cache-info: %q", string(data)) 383 + } 384 + } 385 + 386 + func TestNixStoreBackendRejectsTraversalNarinfoPath(t *testing.T) { 387 + b, staging := newTestNixStoreBackend(t, "", nil) 388 + 389 + body := "StorePath: " + testStorePath + "\nURL: nar/x.nar\nNarHash: sha256:x\nNarSize: 1\n" 390 + rec := httptest.NewRecorder() 391 + b.ServeHTTP(rec, httptest.NewRequest(http.MethodPut, "/../etc/passwd.narinfo", strings.NewReader(body))) 392 + if rec.Code != http.StatusBadRequest { 393 + t.Fatalf("traversal narinfo status: got %d, want 400", rec.Code) 394 + } 395 + 396 + if _, err := os.Stat(filepath.Join(filepath.Dir(staging), "etc", "passwd.narinfo")); !errors.Is(err, os.ErrNotExist) { 397 + t.Fatalf("traversal narinfo escaped staging dir: %v", err) 398 + } 399 + } 400 + 401 + func TestNixStoreBackendRejectsNarinfoFilenameHashMismatch(t *testing.T) { 402 + b, _ := newTestNixStoreBackend(t, "", nil) 403 + mustUploadNar(t, b, "x.nar", "x") 404 + 405 + body := "StorePath: " + testStorePath + "\nURL: nar/x.nar\nNarHash: sha256:x\nNarSize: 1\n" 406 + rec := httptest.NewRecorder() 407 + b.ServeHTTP(rec, httptest.NewRequest(http.MethodPut, "/11111111111111111111111111111111.narinfo", strings.NewReader(body))) 408 + if rec.Code != http.StatusBadRequest { 409 + t.Fatalf("mismatched narinfo status: got %d, want 400; body=%q", rec.Code, rec.Body.String()) 410 + } 411 + if !strings.Contains(rec.Body.String(), "filename does not match") { 412 + t.Fatalf("unexpected body: %q", rec.Body.String()) 413 + } 414 + } 415 + 416 + func TestParseNarinfoAcceptsLargeReferencesLine(t *testing.T) { 417 + var refs []string 418 + for range 12000 { 419 + refs = append(refs, "0123456789abcdfghijklmnpqrsvwxy-ref") 420 + } 421 + 422 + body := strings.Join([]string{ 423 + "StorePath: " + testStorePath, 424 + "URL: nar/x.nar", 425 + "NarHash: sha256:abc", 426 + "NarSize: 1", 427 + "References: " + strings.Join(refs, " "), 428 + "", 429 + }, "\n") 430 + 431 + info, err := parseNarinfo(strings.NewReader(body)) 432 + if err != nil { 433 + t.Fatalf("parseNarinfo failed for large references line: %v", err) 434 + } 435 + if info.StorePath != testStorePath { 436 + t.Fatalf("StorePath: got %q, want %q", info.StorePath, testStorePath) 437 + } 438 + }
+51 -85
spindle/engines/microvm/upload_cache_proxy.go
··· 4 4 "context" 5 5 "errors" 6 6 "fmt" 7 - "io" 8 7 "log/slog" 9 8 "net" 10 9 "net/http" 11 - "net/http/httputil" 12 10 "net/url" 13 11 "strings" 14 12 "time" ··· 16 14 "github.com/mdlayher/vsock" 17 15 ) 18 16 17 + type UploadCacheBackend interface { 18 + http.Handler 19 + Close() error 20 + } 21 + 19 22 type UploadCacheProxy struct { 20 23 port uint32 21 24 22 - ln *vsock.Listener 23 - server *http.Server 25 + ln *vsock.Listener 26 + server *http.Server 27 + backend UploadCacheBackend 24 28 } 25 29 26 - func StartUploadCacheProxy(ctx context.Context, cid uint32, uploadURL string, readUpstreams []CacheUpstream, logger *slog.Logger) (*UploadCacheProxy, error) { 30 + func StartUploadCacheProxy(ctx context.Context, cid uint32, uploadURL string, readUpstreams []CacheUpstream, stagingDir string, logger *slog.Logger) (*UploadCacheProxy, error) { 27 31 if strings.TrimSpace(uploadURL) == "" { 28 32 return nil, nil 29 33 } ··· 33 37 } 34 38 logger = logger.With("where", "upload_cache_proxy", "cid", cid, "uploadURL", uploadURL) 35 39 36 - target, err := url.Parse(uploadURL) 40 + backend, err := newUploadCacheBackend(uploadURL, readUpstreams, stagingDir, logger) 37 41 if err != nil { 38 - return nil, fmt.Errorf("parse upload URL %q: %w", uploadURL, err) 39 - } 40 - if target.Scheme != "http" && target.Scheme != "https" { 41 - return nil, fmt.Errorf("upload URL %q uses unsupported scheme %q (must be http or https)", uploadURL, target.Scheme) 42 - } 43 - if target.Host == "" { 44 - return nil, fmt.Errorf("upload URL %q is missing host", uploadURL) 42 + return nil, err 45 43 } 46 44 47 45 ln, port, err := listenRandomVsockUploadPort(ctx) ··· 50 48 } 51 49 52 50 proxy := &UploadCacheProxy{ 53 - port: port, 54 - ln: ln, 51 + port: port, 52 + ln: ln, 53 + backend: backend, 55 54 } 56 55 proxy.server = &http.Server{ 57 - Handler: uploadProxyHandler(target, readUpstreams, logger), 56 + Handler: backend, 58 57 Protocols: cacheProxyProtocols(), 59 58 ReadHeaderTimeout: 30 * time.Second, 60 59 } ··· 74 73 return proxy, nil 75 74 } 76 75 76 + func newUploadCacheBackend(uploadURL string, readUpstreams []CacheUpstream, stagingDir string, logger *slog.Logger) (UploadCacheBackend, error) { 77 + if strings.TrimSpace(uploadURL) == "" { 78 + return nil, nil 79 + } 80 + 81 + target, err := url.Parse(uploadURL) 82 + if err != nil { 83 + return nil, fmt.Errorf("parse upload URL %q: %w", uploadURL, err) 84 + } 85 + 86 + switch target.Scheme { 87 + case "http", "https": 88 + if target.Host == "" { 89 + return nil, fmt.Errorf("upload URL %q is missing host", uploadURL) 90 + } 91 + return newHTTPUploadProxyBackend(target, readUpstreams, logger), nil 92 + 93 + case "ssh", "ssh-ng": 94 + return newNixStoreUploadBackend(target.String(), stagingDir, readUpstreams, logger, nil) 95 + 96 + case "": 97 + switch uploadURL { 98 + case "daemon", "local": 99 + return newNixStoreUploadBackend(uploadURL, stagingDir, readUpstreams, logger, nil) 100 + default: 101 + return nil, fmt.Errorf("unsupported upload URL %q", uploadURL) 102 + } 103 + 104 + default: 105 + return nil, fmt.Errorf("upload URL %q uses unsupported scheme %q", uploadURL, target.Scheme) 106 + } 107 + } 108 + 77 109 func (p *UploadCacheProxy) Port() uint32 { 78 110 if p == nil { 79 111 return 0 ··· 97 129 closeErr = errors.Join(closeErr, p.ln.Close()) 98 130 p.ln = nil 99 131 } 100 - return closeErr 101 - } 102 - 103 - func uploadProxyHandler(target *url.URL, readUpstreams []CacheUpstream, logger *slog.Logger) http.Handler { 104 - rp := httputil.NewSingleHostReverseProxy(target) 105 - rp.ErrorLog = slog.NewLogLogger(logger.Handler(), slog.LevelError) 106 - 107 - origDirector := rp.Director 108 - rp.Director = func(req *http.Request) { 109 - origDirector(req) 110 - // ensure host matches target 111 - req.Host = target.Host 112 - // the transport doesn't turn URL userinfo into basic auth, only 113 - // http.Client does, so do it ourselves 114 - if user := target.User; user != nil { 115 - password, _ := user.Password() 116 - req.SetBasicAuth(user.Username(), password) 117 - } 118 - } 119 - 120 - // before uploading, nix copy asks the destination whether it already has each 121 - // path by GET/HEAD-ing <hash>.narinfo and skips the ones it does. we answer 122 - // that check across the upload target *and* the read caches: if any of them 123 - // already serves the path there is no point uploading it (the guest would 124 - // just substitute it from there anyway). 125 - narinfoUpstreams := append([]CacheUpstream{{url: target}}, readUpstreams...) 126 - exists := &parallelRacingTransport{ 127 - upstreams: narinfoUpstreams, 128 - underlying: proxyTransport, 129 - guardedUnderlying: guardedProxyTransport, 130 - logger: logger, 131 - } 132 - 133 - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 134 - if isNarinfoExistenceCheck(r) { 135 - serveNarinfoExistence(w, r, exists, logger) 136 - return 137 - } 138 - rp.ServeHTTP(w, r) 139 - }) 140 - } 141 - 142 - func isNarinfoExistenceCheck(r *http.Request) bool { 143 - if r.Method != http.MethodGet && r.Method != http.MethodHead { 144 - return false 145 - } 146 - return strings.HasSuffix(r.URL.Path, ".narinfo") 147 - } 148 - 149 - func serveNarinfoExistence(w http.ResponseWriter, r *http.Request, exists http.RoundTripper, logger *slog.Logger) { 150 - probe := r.Clone(r.Context()) 151 - probe.RequestURI = "" 152 - 153 - resp, err := exists.RoundTrip(probe) 154 - if err != nil { 155 - logger.Warn("upload proxy narinfo check failed, treating as not present", "path", r.URL.Path, "error", err) 156 - w.WriteHeader(http.StatusNotFound) 157 - return 132 + if p.backend != nil { 133 + closeErr = errors.Join(closeErr, p.backend.Close()) 158 134 } 159 - defer resp.Body.Close() 160 - 161 - for key, values := range resp.Header { 162 - for _, value := range values { 163 - w.Header().Add(key, value) 164 - } 165 - } 166 - w.WriteHeader(resp.StatusCode) 167 - if _, err := io.Copy(w, resp.Body); err != nil && !errors.Is(err, context.Canceled) { 168 - logger.Warn("upload proxy narinfo copy failed", "path", r.URL.Path, "error", err) 169 - } 135 + return closeErr 170 136 } 171 137 172 138 func listenRandomVsockUploadPort(ctx context.Context) (*vsock.Listener, uint32, error) {
+5 -3
spindle/engines/microvm/vm.go
··· 182 182 } 183 183 184 184 func (e *Engine) drainNixCache(ctx context.Context, state *workflowState) error { 185 - if state.Agent == nil || e.cfg.NixCache.UploadURL == "" { 185 + if e.cfg.NixCache.UploadURL == "" { 186 186 return nil 187 187 } 188 188 189 189 drainCtx, cancel := context.WithTimeout(ctx, cacheDrainTimeout) 190 190 defer cancel() 191 191 192 - if _, err := state.Agent.Drain(drainCtx); err != nil { 193 - return fmt.Errorf("drain nix cache: %w", err) 192 + if state.Agent != nil { 193 + if _, err := state.Agent.Drain(drainCtx); err != nil { 194 + return fmt.Errorf("drain guest nix cache uploads: %w", err) 195 + } 194 196 } 195 197 return nil 196 198 }