Monorepo for Tangled (tangled.org)
12

Configure Feed

Select the types of activity you want to include in your feed.

1package nixery 2 3import ( 4 "bufio" 5 "context" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "path" 11 "runtime" 12 "sync" 13 "time" 14 15 "github.com/docker/docker/api/types/container" 16 "github.com/docker/docker/api/types/image" 17 "github.com/docker/docker/api/types/mount" 18 "github.com/docker/docker/api/types/network" 19 "github.com/docker/docker/client" 20 "github.com/docker/docker/pkg/stdcopy" 21 "gopkg.in/yaml.v3" 22 "tangled.org/core/api/tangled" 23 "tangled.org/core/log" 24 "tangled.org/core/spindle/config" 25 "tangled.org/core/spindle/engine" 26 "tangled.org/core/spindle/models" 27 "tangled.org/core/spindle/secrets" 28) 29 30const ( 31 workspaceDir = "/tangled/workspace" 32 homeDir = "/tangled/home" 33) 34 35type cleanupFunc func(context.Context) error 36 37type Engine struct { 38 docker client.APIClient 39 l *slog.Logger 40 cfg *config.Config 41 42 slotter engine.WorkflowSlotter 43 44 cleanupMu sync.Mutex 45 cleanup map[string][]cleanupFunc 46} 47 48type Step struct { 49 name string 50 kind models.StepKind 51 command string 52 environment map[string]string 53} 54 55func (s Step) Name() string { 56 return s.name 57} 58 59func (s Step) Command() string { 60 return s.command 61} 62 63func (s Step) Kind() models.StepKind { 64 return s.kind 65} 66 67// setupSteps get added to start of Steps 68type setupSteps []models.Step 69 70// addStep adds a step to the beginning of the workflow's steps. 
71func (ss *setupSteps) addStep(step models.Step) { 72 *ss = append(*ss, step) 73} 74 75type addlFields struct { 76 image string 77 container string 78 mounts []mount.Mount 79} 80 81func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) { 82 swf := &models.Workflow{} 83 addl := addlFields{} 84 85 dwf := &struct { 86 Steps []struct { 87 Command string `yaml:"command"` 88 Name string `yaml:"name"` 89 Environment map[string]string `yaml:"environment"` 90 } `yaml:"steps"` 91 Dependencies map[string][]string `yaml:"dependencies"` 92 Environment map[string]string `yaml:"environment"` 93 }{} 94 err := yaml.Unmarshal([]byte(twf.Raw), &dwf) 95 if err != nil { 96 return nil, err 97 } 98 99 for _, dstep := range dwf.Steps { 100 sstep := Step{} 101 sstep.environment = dstep.Environment 102 sstep.command = dstep.Command 103 sstep.name = dstep.Name 104 sstep.kind = models.StepKindUser 105 swf.Steps = append(swf.Steps, sstep) 106 } 107 swf.Name = twf.Name 108 swf.Environment = dwf.Environment 109 addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery) 110 111 if sock := e.cfg.Server.DockerSocket; sock != "" { 112 addl.mounts = append(addl.mounts, mount.Mount{ 113 Type: mount.TypeBind, 114 Source: sock, 115 Target: sock, 116 ReadOnly: false, 117 }) 118 } 119 setup := &setupSteps{} 120 121 setup.addStep(nixConfStep()) 122 setup.addStep(models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev)) 123 // this step could be empty 124 if s := dependencyStep(dwf.Dependencies); s != nil { 125 setup.addStep(*s) 126 } 127 128 // append setup steps in order to the start of workflow steps 129 swf.Steps = append(*setup, swf.Steps...) 
130 swf.Data = addl 131 132 return swf, nil 133} 134 135func (e *Engine) WorkflowTimeout() time.Duration { 136 workflowTimeoutStr := e.cfg.NixeryPipelines.WorkflowTimeout 137 workflowTimeout, err := time.ParseDuration(workflowTimeoutStr) 138 if err != nil { 139 e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr) 140 workflowTimeout = 5 * time.Minute 141 } 142 143 return workflowTimeout 144} 145 146func workflowImage(deps map[string][]string, nixery string) string { 147 var dependencies string 148 for reg, ds := range deps { 149 if reg == "nixpkgs" { 150 dependencies = path.Join(ds...) 151 } 152 } 153 154 // load defaults from somewhere else 155 dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix") 156 157 if runtime.GOARCH == "arm64" { 158 dependencies = path.Join("arm64", dependencies) 159 } 160 161 return path.Join(nixery, dependencies) 162} 163 164func New(ctx context.Context, cfg *config.Config) (*Engine, error) { 165 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 166 if err != nil { 167 return nil, err 168 } 169 170 l := log.FromContext(ctx).With("component", "spindle") 171 172 e := &Engine{ 173 docker: dcli, 174 l: l, 175 cfg: cfg, 176 slotter: engine.NewSemaphoreSlotter(cfg.NixeryPipelines.MaxConcurrentWorkflows), 177 } 178 179 e.cleanup = make(map[string][]cleanupFunc) 180 181 return e, nil 182} 183 184func (e *Engine) AcquireWorkflowSlot( 185 ctx context.Context, 186 wid models.WorkflowId, 187 wf *models.Workflow, 188) (engine.WorkflowSlot, error) { 189 if e.slotter == nil { 190 return engine.NoopSlot{}, nil 191 } 192 193 return e.slotter.AcquireWorkflowSlot(ctx, wid, wf) 194} 195 196func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error { 197 /// -------------------------INITIAL SETUP------------------------------------------ 198 l := e.l.With("workflow", wid) 199 
l.Info("setting up workflow") 200 201 setupStep := Step{ 202 name: "Pull image from Nixery", 203 kind: models.StepKindSystem, 204 } 205 setupStepIdx := -1 206 207 wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusStart).Write([]byte{0}) 208 defer wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusEnd).Write([]byte{0}) 209 210 /// -------------------------NETWORK CREATION--------------------------------------- 211 _, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ 212 Driver: "bridge", 213 }) 214 if err != nil { 215 return err 216 } 217 218 e.registerCleanup(wid, func(ctx context.Context) error { 219 if err := e.docker.NetworkRemove(ctx, networkName(wid)); err != nil { 220 return fmt.Errorf("removing network: %w", err) 221 } 222 return nil 223 }) 224 225 /// -------------------------IMAGE PULL--------------------------------------------- 226 addl := wf.Data.(addlFields) 227 l.Info("pulling image", "image", addl.image) 228 fmt.Fprintf( 229 wfLogger.DataWriter(setupStepIdx, "stdout"), 230 "Pulling image: %s", 231 addl.image, 232 ) 233 234 reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{}) 235 if err != nil { 236 l.Error("pipeline image pull failed!", "error", err.Error()) 237 fmt.Fprintf(wfLogger.DataWriter(setupStepIdx, "stderr"), "image pull failed: %s", err) 238 return fmt.Errorf("pulling image: %w", err) 239 } 240 defer reader.Close() 241 242 scanner := bufio.NewScanner(reader) 243 for scanner.Scan() { 244 line := scanner.Text() 245 wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte(line)) 246 l.Info("image pull progress", "stdout", line) 247 } 248 249 /// -------------------------CONTAINER CREATION------------------------------------- 250 l.Info("creating container") 251 wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte("Creating container...")) 252 253 extraHosts := []string{"host.docker.internal:host-gateway"} 254 for _, h := range e.cfg.Server.DevExtraHosts { 255 
extraHosts = append(extraHosts, h+":host-gateway") 256 } 257 258 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 259 Image: addl.image, 260 Cmd: []string{"cat"}, 261 OpenStdin: true, // so cat stays alive :3 262 Tty: false, 263 Hostname: "spindle", 264 WorkingDir: workspaceDir, 265 Labels: map[string]string{ 266 "sh.tangled.pipeline/workflow_id": wid.String(), 267 }, 268 // TODO(winter): investigate whether environment variables passed here 269 // get propagated to ContainerExec processes 270 }, &container.HostConfig{ 271 Mounts: append([]mount.Mount{ 272 { 273 Type: mount.TypeTmpfs, 274 Target: "/tmp", 275 ReadOnly: false, 276 TmpfsOptions: &mount.TmpfsOptions{ 277 Mode: 0o1777, // world-writable sticky bit 278 Options: [][]string{ 279 {"exec"}, 280 }, 281 }, 282 }, 283 }, addl.mounts...), 284 ReadonlyRootfs: false, 285 CapDrop: []string{"ALL"}, 286 CapAdd: []string{"CAP_DAC_OVERRIDE", "CAP_CHOWN", "CAP_FOWNER", "CAP_SETUID", "CAP_SETGID"}, 287 SecurityOpt: []string{"no-new-privileges"}, 288 ExtraHosts: extraHosts, 289 Resources: container.Resources{ 290 Memory: e.cfg.NixeryPipelines.MaxJobMemoryMB * 1024 * 1024, 291 }, 292 }, nil, nil, "") 293 if err != nil { 294 fmt.Fprintf( 295 wfLogger.DataWriter(setupStepIdx, "stderr"), 296 "container creation failed: %s", 297 err, 298 ) 299 return fmt.Errorf("creating container: %w", err) 300 } 301 302 e.registerCleanup(wid, func(ctx context.Context) error { 303 if err := e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}); err != nil { 304 return fmt.Errorf("stopping container: %w", err) 305 } 306 307 err := e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{ 308 RemoveVolumes: true, 309 RemoveLinks: false, 310 Force: false, 311 }) 312 if err != nil { 313 return fmt.Errorf("removing container: %w", err) 314 } 315 316 return nil 317 }) 318 319 /// -------------------------CONTAINER START---------------------------------------- 320 wfLogger.DataWriter(setupStepIdx, 
"stdout").Write([]byte("Starting container...")) 321 if err := e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil { 322 return fmt.Errorf("starting container: %w", err) 323 } 324 325 mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{ 326 Cmd: []string{"mkdir", "-p", workspaceDir, homeDir}, 327 AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe?? 328 AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default") 329 }) 330 if err != nil { 331 return err 332 } 333 334 // This actually *starts* the command. Thanks, Docker! 335 execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{}) 336 if err != nil { 337 return err 338 } 339 defer execResp.Close() 340 341 // This is apparently best way to wait for the command to complete. 342 _, err = io.ReadAll(execResp.Reader) 343 if err != nil { 344 return err 345 } 346 347 /// -----------------------------------FINISH--------------------------------------- 348 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID) 349 if err != nil { 350 return err 351 } 352 353 if execInspectResp.ExitCode != 0 { 354 return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode) 355 } else if execInspectResp.Running { 356 return errors.New("mkdir is somehow still running??") 357 } 358 359 addl.container = resp.ID 360 wf.Data = addl 361 362 return nil 363} 364 365func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error { 366 addl := w.Data.(addlFields) 367 workflowEnvs := ConstructEnvs(w.Environment) 368 // TODO(winter): should SetupWorkflow also have secret access? 369 // IMO yes, but probably worth thinking on. 
370 for _, s := range secrets { 371 workflowEnvs.AddEnv(s.Key, s.Value) 372 } 373 374 step := w.Steps[idx] 375 376 select { 377 case <-ctx.Done(): 378 return ctx.Err() 379 default: 380 } 381 382 envs := append(EnvVars(nil), workflowEnvs...) 383 if nixStep, ok := step.(Step); ok { 384 for k, v := range nixStep.environment { 385 envs.AddEnv(k, v) 386 } 387 } 388 389 envs.AddEnv("HOME", homeDir) 390 existingPath := "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" 391 envs.AddEnv("PATH", fmt.Sprintf("%s/.nix-profile/bin:/nix/var/nix/profiles/default/bin:%s", homeDir, existingPath)) 392 if sock := e.cfg.Server.DockerSocket; sock != "" { 393 envs.AddEnv("DOCKER_HOST", fmt.Sprintf("unix://%s", sock)) 394 } 395 396 mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{ 397 Cmd: []string{"bash", "-c", step.Command()}, 398 AttachStdout: true, 399 AttachStderr: true, 400 Env: envs, 401 }) 402 if err != nil { 403 return fmt.Errorf("creating exec: %w", err) 404 } 405 406 // start tailing logs in background 407 tailDone := make(chan error, 1) 408 go func() { 409 tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, idx) 410 }() 411 412 select { 413 case <-tailDone: 414 415 case <-ctx.Done(): 416 // cleanup will be handled by DestroyWorkflow, since 417 // Docker doesn't provide an API to kill an exec run 418 // (sure, we could grab the PID and kill it ourselves, 419 // but that's wasted effort) 420 e.l.Warn("step timed out", "step", step.Name()) 421 422 <-tailDone 423 424 return engine.ErrTimedOut 425 } 426 427 select { 428 case <-ctx.Done(): 429 return ctx.Err() 430 default: 431 } 432 433 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID) 434 if err != nil { 435 return err 436 } 437 438 if execInspectResp.ExitCode != 0 { 439 inspectResp, err := e.docker.ContainerInspect(ctx, addl.container) 440 if err != nil { 441 return err 442 } 443 444 e.l.Error("workflow failed!", "workflow_id", wid.String(), 
"exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled) 445 446 if inspectResp.State.OOMKilled { 447 return ErrOOMKilled 448 } 449 return engine.ErrWorkflowFailed 450 } 451 452 return nil 453} 454 455func (e *Engine) tailStep(ctx context.Context, wfLogger models.WorkflowLogger, execID string, stepIdx int) error { 456 if wfLogger == nil { 457 return nil 458 } 459 460 // This actually *starts* the command. Thanks, Docker! 461 logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{}) 462 if err != nil { 463 return err 464 } 465 defer logs.Close() 466 467 _, err = stdcopy.StdCopy( 468 wfLogger.DataWriter(stepIdx, "stdout"), 469 wfLogger.DataWriter(stepIdx, "stderr"), 470 logs.Reader, 471 ) 472 if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) { 473 return fmt.Errorf("failed to copy logs: %w", err) 474 } 475 476 return nil 477} 478 479func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 480 fns := e.drainCleanups(wid) 481 482 for _, fn := range fns { 483 if err := fn(ctx); err != nil { 484 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) 485 } 486 } 487 return nil 488} 489 490func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 491 e.cleanupMu.Lock() 492 defer e.cleanupMu.Unlock() 493 494 key := wid.String() 495 e.cleanup[key] = append(e.cleanup[key], fn) 496} 497 498func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc { 499 e.cleanupMu.Lock() 500 key := wid.String() 501 502 fns := e.cleanup[key] 503 delete(e.cleanup, key) 504 e.cleanupMu.Unlock() 505 506 return fns 507} 508 509func networkName(wid models.WorkflowId) string { 510 return fmt.Sprintf("workflow-network-%s", wid) 511}