Monorepo for Tangled (tangled.org)
6

Configure Feed

Select the types of activity you want to include in your feed.

1package nixery 2 3import ( 4 "bufio" 5 "context" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "path" 11 "runtime" 12 "sync" 13 "time" 14 15 "github.com/docker/docker/api/types/container" 16 "github.com/docker/docker/api/types/image" 17 "github.com/docker/docker/api/types/mount" 18 "github.com/docker/docker/api/types/network" 19 "github.com/docker/docker/client" 20 "github.com/docker/docker/pkg/stdcopy" 21 "gopkg.in/yaml.v3" 22 "tangled.org/core/api/tangled" 23 "tangled.org/core/log" 24 "tangled.org/core/spindle/config" 25 "tangled.org/core/spindle/engine" 26 "tangled.org/core/spindle/models" 27 "tangled.org/core/spindle/secrets" 28) 29 30const ( 31 workspaceDir = "/tangled/workspace" 32 homeDir = "/tangled/home" 33) 34 35type cleanupFunc func(context.Context) error 36 37type Engine struct { 38 docker client.APIClient 39 l *slog.Logger 40 cfg *config.Config 41 42 cleanupMu sync.Mutex 43 cleanup map[string][]cleanupFunc 44} 45 46type Step struct { 47 name string 48 kind models.StepKind 49 command string 50 environment map[string]string 51} 52 53func (s Step) Name() string { 54 return s.name 55} 56 57func (s Step) Command() string { 58 return s.command 59} 60 61func (s Step) Kind() models.StepKind { 62 return s.kind 63} 64 65// setupSteps get added to start of Steps 66type setupSteps []models.Step 67 68// addStep adds a step to the beginning of the workflow's steps. 
69func (ss *setupSteps) addStep(step models.Step) { 70 *ss = append(*ss, step) 71} 72 73type addlFields struct { 74 image string 75 container string 76 mounts []mount.Mount 77} 78 79func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) { 80 swf := &models.Workflow{} 81 addl := addlFields{} 82 83 dwf := &struct { 84 Steps []struct { 85 Command string `yaml:"command"` 86 Name string `yaml:"name"` 87 Environment map[string]string `yaml:"environment"` 88 } `yaml:"steps"` 89 Dependencies map[string][]string `yaml:"dependencies"` 90 Environment map[string]string `yaml:"environment"` 91 }{} 92 err := yaml.Unmarshal([]byte(twf.Raw), &dwf) 93 if err != nil { 94 return nil, err 95 } 96 97 for _, dstep := range dwf.Steps { 98 sstep := Step{} 99 sstep.environment = dstep.Environment 100 sstep.command = dstep.Command 101 sstep.name = dstep.Name 102 sstep.kind = models.StepKindUser 103 swf.Steps = append(swf.Steps, sstep) 104 } 105 swf.Name = twf.Name 106 swf.Environment = dwf.Environment 107 addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery) 108 109 if sock := e.cfg.Server.DockerSocket; sock != "" { 110 addl.mounts = append(addl.mounts, mount.Mount{ 111 Type: mount.TypeBind, 112 Source: sock, 113 Target: sock, 114 ReadOnly: false, 115 }) 116 } 117 setup := &setupSteps{} 118 119 setup.addStep(nixConfStep()) 120 setup.addStep(models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev)) 121 // this step could be empty 122 if s := dependencyStep(dwf.Dependencies); s != nil { 123 setup.addStep(*s) 124 } 125 126 // append setup steps in order to the start of workflow steps 127 swf.Steps = append(*setup, swf.Steps...) 
128 swf.Data = addl 129 130 return swf, nil 131} 132 133func (e *Engine) WorkflowTimeout() time.Duration { 134 workflowTimeoutStr := e.cfg.NixeryPipelines.WorkflowTimeout 135 workflowTimeout, err := time.ParseDuration(workflowTimeoutStr) 136 if err != nil { 137 e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr) 138 workflowTimeout = 5 * time.Minute 139 } 140 141 return workflowTimeout 142} 143 144func workflowImage(deps map[string][]string, nixery string) string { 145 var dependencies string 146 for reg, ds := range deps { 147 if reg == "nixpkgs" { 148 dependencies = path.Join(ds...) 149 } 150 } 151 152 // load defaults from somewhere else 153 dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix") 154 155 if runtime.GOARCH == "arm64" { 156 dependencies = path.Join("arm64", dependencies) 157 } 158 159 return path.Join(nixery, dependencies) 160} 161 162func New(ctx context.Context, cfg *config.Config) (*Engine, error) { 163 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 164 if err != nil { 165 return nil, err 166 } 167 168 l := log.FromContext(ctx).With("component", "spindle") 169 170 e := &Engine{ 171 docker: dcli, 172 l: l, 173 cfg: cfg, 174 } 175 176 e.cleanup = make(map[string][]cleanupFunc) 177 178 return e, nil 179} 180 181func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error { 182 /// -------------------------INITIAL SETUP------------------------------------------ 183 l := e.l.With("workflow", wid) 184 l.Info("setting up workflow") 185 186 setupStep := Step{ 187 name: "nixery image pull", 188 kind: models.StepKindSystem, 189 } 190 setupStepIdx := -1 191 192 wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusStart).Write([]byte{0}) 193 defer wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusEnd).Write([]byte{0}) 194 195 /// 
-------------------------NETWORK CREATION--------------------------------------- 196 _, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ 197 Driver: "bridge", 198 }) 199 if err != nil { 200 return err 201 } 202 203 e.registerCleanup(wid, func(ctx context.Context) error { 204 if err := e.docker.NetworkRemove(ctx, networkName(wid)); err != nil { 205 return fmt.Errorf("removing network: %w", err) 206 } 207 return nil 208 }) 209 210 /// -------------------------IMAGE PULL--------------------------------------------- 211 addl := wf.Data.(addlFields) 212 l.Info("pulling image", "image", addl.image) 213 fmt.Fprintf( 214 wfLogger.DataWriter(setupStepIdx, "stdout"), 215 "pulling image: %s", 216 addl.image, 217 ) 218 219 reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{}) 220 if err != nil { 221 l.Error("pipeline image pull failed!", "error", err.Error()) 222 fmt.Fprintf(wfLogger.DataWriter(setupStepIdx, "stderr"), "image pull failed: %s", err) 223 return fmt.Errorf("pulling image: %w", err) 224 } 225 defer reader.Close() 226 227 scanner := bufio.NewScanner(reader) 228 for scanner.Scan() { 229 line := scanner.Text() 230 wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte(line)) 231 l.Info("image pull progress", "stdout", line) 232 } 233 234 /// -------------------------CONTAINER CREATION------------------------------------- 235 l.Info("creating container") 236 wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte("creating container...")) 237 238 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 239 Image: addl.image, 240 Cmd: []string{"cat"}, 241 OpenStdin: true, // so cat stays alive :3 242 Tty: false, 243 Hostname: "spindle", 244 WorkingDir: workspaceDir, 245 Labels: map[string]string{ 246 "sh.tangled.pipeline/workflow_id": wid.String(), 247 }, 248 // TODO(winter): investigate whether environment variables passed here 249 // get propagated to ContainerExec processes 250 }, &container.HostConfig{ 251 
Mounts: append([]mount.Mount{ 252 { 253 Type: mount.TypeTmpfs, 254 Target: "/tmp", 255 ReadOnly: false, 256 TmpfsOptions: &mount.TmpfsOptions{ 257 Mode: 0o1777, // world-writable sticky bit 258 Options: [][]string{ 259 {"exec"}, 260 }, 261 }, 262 }, 263 }, addl.mounts...), 264 ReadonlyRootfs: false, 265 CapDrop: []string{"ALL"}, 266 CapAdd: []string{"CAP_DAC_OVERRIDE", "CAP_CHOWN", "CAP_FOWNER", "CAP_SETUID", "CAP_SETGID"}, 267 SecurityOpt: []string{"no-new-privileges"}, 268 ExtraHosts: []string{"host.docker.internal:host-gateway"}, 269 Resources: container.Resources{ 270 Memory: e.cfg.NixeryPipelines.MaxJobMemoryMB * 1024 * 1024, 271 }, 272 }, nil, nil, "") 273 if err != nil { 274 fmt.Fprintf( 275 wfLogger.DataWriter(setupStepIdx, "stderr"), 276 "container creation failed: %s", 277 err, 278 ) 279 return fmt.Errorf("creating container: %w", err) 280 } 281 282 e.registerCleanup(wid, func(ctx context.Context) error { 283 if err := e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}); err != nil { 284 return fmt.Errorf("stopping container: %w", err) 285 } 286 287 err := e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{ 288 RemoveVolumes: true, 289 RemoveLinks: false, 290 Force: false, 291 }) 292 if err != nil { 293 return fmt.Errorf("removing container: %w", err) 294 } 295 296 return nil 297 }) 298 299 /// -------------------------CONTAINER START---------------------------------------- 300 wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte("starting container...")) 301 if err := e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil { 302 return fmt.Errorf("starting container: %w", err) 303 } 304 305 mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{ 306 Cmd: []string{"mkdir", "-p", workspaceDir, homeDir}, 307 AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe?? 
308 AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default") 309 }) 310 if err != nil { 311 return err 312 } 313 314 // This actually *starts* the command. Thanks, Docker! 315 execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{}) 316 if err != nil { 317 return err 318 } 319 defer execResp.Close() 320 321 // This is apparently best way to wait for the command to complete. 322 _, err = io.ReadAll(execResp.Reader) 323 if err != nil { 324 return err 325 } 326 327 /// -----------------------------------FINISH--------------------------------------- 328 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID) 329 if err != nil { 330 return err 331 } 332 333 if execInspectResp.ExitCode != 0 { 334 return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode) 335 } else if execInspectResp.Running { 336 return errors.New("mkdir is somehow still running??") 337 } 338 339 addl.container = resp.ID 340 wf.Data = addl 341 342 return nil 343} 344 345func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error { 346 addl := w.Data.(addlFields) 347 workflowEnvs := ConstructEnvs(w.Environment) 348 // TODO(winter): should SetupWorkflow also have secret access? 349 // IMO yes, but probably worth thinking on. 350 for _, s := range secrets { 351 workflowEnvs.AddEnv(s.Key, s.Value) 352 } 353 354 step := w.Steps[idx] 355 356 select { 357 case <-ctx.Done(): 358 return ctx.Err() 359 default: 360 } 361 362 envs := append(EnvVars(nil), workflowEnvs...) 
363 if nixStep, ok := step.(Step); ok { 364 for k, v := range nixStep.environment { 365 envs.AddEnv(k, v) 366 } 367 } 368 369 envs.AddEnv("HOME", homeDir) 370 existingPath := "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" 371 envs.AddEnv("PATH", fmt.Sprintf("%s/.nix-profile/bin:/nix/var/nix/profiles/default/bin:%s", homeDir, existingPath)) 372 if sock := e.cfg.Server.DockerSocket; sock != "" { 373 envs.AddEnv("DOCKER_HOST", fmt.Sprintf("unix://%s", sock)) 374 } 375 376 mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{ 377 Cmd: []string{"bash", "-c", step.Command()}, 378 AttachStdout: true, 379 AttachStderr: true, 380 Env: envs, 381 }) 382 if err != nil { 383 return fmt.Errorf("creating exec: %w", err) 384 } 385 386 // start tailing logs in background 387 tailDone := make(chan error, 1) 388 go func() { 389 tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, idx) 390 }() 391 392 select { 393 case <-tailDone: 394 395 case <-ctx.Done(): 396 // cleanup will be handled by DestroyWorkflow, since 397 // Docker doesn't provide an API to kill an exec run 398 // (sure, we could grab the PID and kill it ourselves, 399 // but that's wasted effort) 400 e.l.Warn("step timed out", "step", step.Name()) 401 402 <-tailDone 403 404 return engine.ErrTimedOut 405 } 406 407 select { 408 case <-ctx.Done(): 409 return ctx.Err() 410 default: 411 } 412 413 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID) 414 if err != nil { 415 return err 416 } 417 418 if execInspectResp.ExitCode != 0 { 419 inspectResp, err := e.docker.ContainerInspect(ctx, addl.container) 420 if err != nil { 421 return err 422 } 423 424 e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled) 425 426 if inspectResp.State.OOMKilled { 427 return ErrOOMKilled 428 } 429 return engine.ErrWorkflowFailed 430 } 431 432 return nil 433} 434 435func (e *Engine) 
tailStep(ctx context.Context, wfLogger models.WorkflowLogger, execID string, stepIdx int) error { 436 if wfLogger == nil { 437 return nil 438 } 439 440 // This actually *starts* the command. Thanks, Docker! 441 logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{}) 442 if err != nil { 443 return err 444 } 445 defer logs.Close() 446 447 _, err = stdcopy.StdCopy( 448 wfLogger.DataWriter(stepIdx, "stdout"), 449 wfLogger.DataWriter(stepIdx, "stderr"), 450 logs.Reader, 451 ) 452 if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) { 453 return fmt.Errorf("failed to copy logs: %w", err) 454 } 455 456 return nil 457} 458 459func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 460 fns := e.drainCleanups(wid) 461 462 for _, fn := range fns { 463 if err := fn(ctx); err != nil { 464 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) 465 } 466 } 467 return nil 468} 469 470func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 471 e.cleanupMu.Lock() 472 defer e.cleanupMu.Unlock() 473 474 key := wid.String() 475 e.cleanup[key] = append(e.cleanup[key], fn) 476} 477 478func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc { 479 e.cleanupMu.Lock() 480 key := wid.String() 481 482 fns := e.cleanup[key] 483 delete(e.cleanup, key) 484 e.cleanupMu.Unlock() 485 486 return fns 487} 488 489func networkName(wid models.WorkflowId) string { 490 return fmt.Sprintf("workflow-network-%s", wid) 491}