Monorepo for Tangled
tangled.org
1package nixery
2
3import (
4 "bufio"
5 "context"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "path"
11 "runtime"
12 "sync"
13 "time"
14
15 "github.com/docker/docker/api/types/container"
16 "github.com/docker/docker/api/types/image"
17 "github.com/docker/docker/api/types/mount"
18 "github.com/docker/docker/api/types/network"
19 "github.com/docker/docker/client"
20 "github.com/docker/docker/pkg/stdcopy"
21 "gopkg.in/yaml.v3"
22 "tangled.org/core/api/tangled"
23 "tangled.org/core/log"
24 "tangled.org/core/spindle/config"
25 "tangled.org/core/spindle/engine"
26 "tangled.org/core/spindle/models"
27 "tangled.org/core/spindle/secrets"
28)
29
// workspaceDir is the working directory of the workflow container; SetupWorkflow
// creates it with `mkdir -p` before any step runs.
const workspaceDir = "/tangled/workspace"

// homeDir is created alongside workspaceDir and exported to every step
// process as $HOME.
const homeDir = "/tangled/home"
34
// cleanupFunc tears down a single resource created for a workflow. Functions
// are queued per workflow by registerCleanup and invoked by DestroyWorkflow.
type cleanupFunc func(ctx context.Context) error
36
// Engine runs pipeline workflows inside Docker containers whose image
// reference is built from the configured Nixery host and the workflow's
// declared dependencies.
type Engine struct {
	// docker is the Docker API client used for networks, images, containers
	// and execs.
	docker client.APIClient
	// l is the engine-scoped logger.
	l *slog.Logger
	// cfg is the spindle configuration read by the engine (Nixery host,
	// timeouts, memory limit, Docker socket, ...).
	cfg *config.Config

	// slotter hands out concurrency slots; AcquireWorkflowSlot falls back to
	// a no-op slot when it is nil.
	slotter engine.WorkflowSlotter

	// cleanupMu guards cleanup.
	cleanupMu sync.Mutex
	// cleanup maps a workflow ID string to the teardown functions registered
	// for that workflow, in registration order.
	cleanup map[string][]cleanupFunc
}
47
48type Step struct {
49 name string
50 kind models.StepKind
51 command string
52 environment map[string]string
53}
54
55func (s Step) Name() string {
56 return s.name
57}
58
59func (s Step) Command() string {
60 return s.command
61}
62
63func (s Step) Kind() models.StepKind {
64 return s.kind
65}
66
67// setupSteps get added to start of Steps
68type setupSteps []models.Step
69
70// addStep adds a step to the beginning of the workflow's steps.
71func (ss *setupSteps) addStep(step models.Step) {
72 *ss = append(*ss, step)
73}
74
// addlFields is the engine-specific state stored in models.Workflow.Data.
type addlFields struct {
	// image is the Nixery image reference the workflow container is created from.
	image string
	// container is the workflow container's ID; SetupWorkflow fills it in
	// once the container has been created, started and prepared.
	container string
	// mounts are extra mounts (e.g. the Docker socket bind mount) added to
	// the container after its /tmp tmpfs.
	mounts []mount.Mount
}
80
81func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
82 swf := &models.Workflow{}
83 addl := addlFields{}
84
85 dwf := &struct {
86 Steps []struct {
87 Command string `yaml:"command"`
88 Name string `yaml:"name"`
89 Environment map[string]string `yaml:"environment"`
90 } `yaml:"steps"`
91 Dependencies map[string][]string `yaml:"dependencies"`
92 Environment map[string]string `yaml:"environment"`
93 }{}
94 err := yaml.Unmarshal([]byte(twf.Raw), &dwf)
95 if err != nil {
96 return nil, err
97 }
98
99 for _, dstep := range dwf.Steps {
100 sstep := Step{}
101 sstep.environment = dstep.Environment
102 sstep.command = dstep.Command
103 sstep.name = dstep.Name
104 sstep.kind = models.StepKindUser
105 swf.Steps = append(swf.Steps, sstep)
106 }
107 swf.Name = twf.Name
108 swf.Environment = dwf.Environment
109 addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery)
110
111 if sock := e.cfg.Server.DockerSocket; sock != "" {
112 addl.mounts = append(addl.mounts, mount.Mount{
113 Type: mount.TypeBind,
114 Source: sock,
115 Target: sock,
116 ReadOnly: false,
117 })
118 }
119 setup := &setupSteps{}
120
121 setup.addStep(nixConfStep())
122 setup.addStep(models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev))
123 // this step could be empty
124 if s := dependencyStep(dwf.Dependencies); s != nil {
125 setup.addStep(*s)
126 }
127
128 // append setup steps in order to the start of workflow steps
129 swf.Steps = append(*setup, swf.Steps...)
130 swf.Data = addl
131
132 return swf, nil
133}
134
135func (e *Engine) WorkflowTimeout() time.Duration {
136 workflowTimeoutStr := e.cfg.NixeryPipelines.WorkflowTimeout
137 workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
138 if err != nil {
139 e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
140 workflowTimeout = 5 * time.Minute
141 }
142
143 return workflowTimeout
144}
145
// workflowImage builds the Nixery image reference for a workflow.
//
// Nixery assembles an image from the packages named by the path components
// after its host, so the result has the form <nixery>/<pkg>/<pkg>/.... The
// packages are deps["nixpkgs"] (entries under any other key are ignored),
// followed by the defaults every workflow needs: bash, git, coreutils and
// nix. When this binary runs on arm64, an "arm64" component is put first so
// Nixery serves an arm64 image.
//
// A nil deps map or a missing "nixpkgs" key yields just the defaults.
// path.Join drops empty components and normalizes slashes, so a trailing
// slash on nixery is harmless.
func workflowImage(deps map[string][]string, nixery string) string {
	// Indexing a nil map or a missing key yields nil, which path.Join treats
	// as "no components".
	dependencies := path.Join(deps["nixpkgs"]...)

	// TODO: load defaults from somewhere else.
	dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix")

	if runtime.GOARCH == "arm64" {
		dependencies = path.Join("arm64", dependencies)
	}

	return path.Join(nixery, dependencies)
}
163
164func New(ctx context.Context, cfg *config.Config) (*Engine, error) {
165 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
166 if err != nil {
167 return nil, err
168 }
169
170 l := log.FromContext(ctx).With("component", "spindle")
171
172 e := &Engine{
173 docker: dcli,
174 l: l,
175 cfg: cfg,
176 slotter: engine.NewSemaphoreSlotter(cfg.NixeryPipelines.MaxConcurrentWorkflows),
177 }
178
179 e.cleanup = make(map[string][]cleanupFunc)
180
181 return e, nil
182}
183
184func (e *Engine) AcquireWorkflowSlot(
185 ctx context.Context,
186 wid models.WorkflowId,
187 wf *models.Workflow,
188) (engine.WorkflowSlot, error) {
189 if e.slotter == nil {
190 return engine.NoopSlot{}, nil
191 }
192
193 return e.slotter.AcquireWorkflowSlot(ctx, wid, wf)
194}
195
196func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error {
197 /// -------------------------INITIAL SETUP------------------------------------------
198 l := e.l.With("workflow", wid)
199 l.Info("setting up workflow")
200
201 setupStep := Step{
202 name: "Pull image from Nixery",
203 kind: models.StepKindSystem,
204 }
205 setupStepIdx := -1
206
207 wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusStart).Write([]byte{0})
208 defer wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusEnd).Write([]byte{0})
209
210 /// -------------------------NETWORK CREATION---------------------------------------
211 _, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
212 Driver: "bridge",
213 })
214 if err != nil {
215 return err
216 }
217
218 e.registerCleanup(wid, func(ctx context.Context) error {
219 if err := e.docker.NetworkRemove(ctx, networkName(wid)); err != nil {
220 return fmt.Errorf("removing network: %w", err)
221 }
222 return nil
223 })
224
225 /// -------------------------IMAGE PULL---------------------------------------------
226 addl := wf.Data.(addlFields)
227 l.Info("pulling image", "image", addl.image)
228 fmt.Fprintf(
229 wfLogger.DataWriter(setupStepIdx, "stdout"),
230 "Pulling image: %s",
231 addl.image,
232 )
233
234 reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
235 if err != nil {
236 l.Error("pipeline image pull failed!", "error", err.Error())
237 fmt.Fprintf(wfLogger.DataWriter(setupStepIdx, "stderr"), "image pull failed: %s", err)
238 return fmt.Errorf("pulling image: %w", err)
239 }
240 defer reader.Close()
241
242 scanner := bufio.NewScanner(reader)
243 for scanner.Scan() {
244 line := scanner.Text()
245 wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte(line))
246 l.Info("image pull progress", "stdout", line)
247 }
248
249 /// -------------------------CONTAINER CREATION-------------------------------------
250 l.Info("creating container")
251 wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte("Creating container..."))
252
253 extraHosts := []string{"host.docker.internal:host-gateway"}
254 for _, h := range e.cfg.Server.DevExtraHosts {
255 extraHosts = append(extraHosts, h+":host-gateway")
256 }
257
258 resp, err := e.docker.ContainerCreate(ctx, &container.Config{
259 Image: addl.image,
260 Cmd: []string{"cat"},
261 OpenStdin: true, // so cat stays alive :3
262 Tty: false,
263 Hostname: "spindle",
264 WorkingDir: workspaceDir,
265 Labels: map[string]string{
266 "sh.tangled.pipeline/workflow_id": wid.String(),
267 },
268 // TODO(winter): investigate whether environment variables passed here
269 // get propagated to ContainerExec processes
270 }, &container.HostConfig{
271 Mounts: append([]mount.Mount{
272 {
273 Type: mount.TypeTmpfs,
274 Target: "/tmp",
275 ReadOnly: false,
276 TmpfsOptions: &mount.TmpfsOptions{
277 Mode: 0o1777, // world-writable sticky bit
278 Options: [][]string{
279 {"exec"},
280 },
281 },
282 },
283 }, addl.mounts...),
284 ReadonlyRootfs: false,
285 CapDrop: []string{"ALL"},
286 CapAdd: []string{"CAP_DAC_OVERRIDE", "CAP_CHOWN", "CAP_FOWNER", "CAP_SETUID", "CAP_SETGID"},
287 SecurityOpt: []string{"no-new-privileges"},
288 ExtraHosts: extraHosts,
289 Resources: container.Resources{
290 Memory: e.cfg.NixeryPipelines.MaxJobMemoryMB * 1024 * 1024,
291 },
292 }, nil, nil, "")
293 if err != nil {
294 fmt.Fprintf(
295 wfLogger.DataWriter(setupStepIdx, "stderr"),
296 "container creation failed: %s",
297 err,
298 )
299 return fmt.Errorf("creating container: %w", err)
300 }
301
302 e.registerCleanup(wid, func(ctx context.Context) error {
303 if err := e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}); err != nil {
304 return fmt.Errorf("stopping container: %w", err)
305 }
306
307 err := e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{
308 RemoveVolumes: true,
309 RemoveLinks: false,
310 Force: false,
311 })
312 if err != nil {
313 return fmt.Errorf("removing container: %w", err)
314 }
315
316 return nil
317 })
318
319 /// -------------------------CONTAINER START----------------------------------------
320 wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte("Starting container..."))
321 if err := e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil {
322 return fmt.Errorf("starting container: %w", err)
323 }
324
325 mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
326 Cmd: []string{"mkdir", "-p", workspaceDir, homeDir},
327 AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe??
328 AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default")
329 })
330 if err != nil {
331 return err
332 }
333
334 // This actually *starts* the command. Thanks, Docker!
335 execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{})
336 if err != nil {
337 return err
338 }
339 defer execResp.Close()
340
341 // This is apparently best way to wait for the command to complete.
342 _, err = io.ReadAll(execResp.Reader)
343 if err != nil {
344 return err
345 }
346
347 /// -----------------------------------FINISH---------------------------------------
348 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
349 if err != nil {
350 return err
351 }
352
353 if execInspectResp.ExitCode != 0 {
354 return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode)
355 } else if execInspectResp.Running {
356 return errors.New("mkdir is somehow still running??")
357 }
358
359 addl.container = resp.ID
360 wf.Data = addl
361
362 return nil
363}
364
365func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error {
366 addl := w.Data.(addlFields)
367 workflowEnvs := ConstructEnvs(w.Environment)
368 // TODO(winter): should SetupWorkflow also have secret access?
369 // IMO yes, but probably worth thinking on.
370 for _, s := range secrets {
371 workflowEnvs.AddEnv(s.Key, s.Value)
372 }
373
374 step := w.Steps[idx]
375
376 select {
377 case <-ctx.Done():
378 return ctx.Err()
379 default:
380 }
381
382 envs := append(EnvVars(nil), workflowEnvs...)
383 if nixStep, ok := step.(Step); ok {
384 for k, v := range nixStep.environment {
385 envs.AddEnv(k, v)
386 }
387 }
388
389 envs.AddEnv("HOME", homeDir)
390 existingPath := "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
391 envs.AddEnv("PATH", fmt.Sprintf("%s/.nix-profile/bin:/nix/var/nix/profiles/default/bin:%s", homeDir, existingPath))
392 if sock := e.cfg.Server.DockerSocket; sock != "" {
393 envs.AddEnv("DOCKER_HOST", fmt.Sprintf("unix://%s", sock))
394 }
395
396 mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{
397 Cmd: []string{"bash", "-c", step.Command()},
398 AttachStdout: true,
399 AttachStderr: true,
400 Env: envs,
401 })
402 if err != nil {
403 return fmt.Errorf("creating exec: %w", err)
404 }
405
406 // start tailing logs in background
407 tailDone := make(chan error, 1)
408 go func() {
409 tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, idx)
410 }()
411
412 select {
413 case <-tailDone:
414
415 case <-ctx.Done():
416 // cleanup will be handled by DestroyWorkflow, since
417 // Docker doesn't provide an API to kill an exec run
418 // (sure, we could grab the PID and kill it ourselves,
419 // but that's wasted effort)
420 e.l.Warn("step timed out", "step", step.Name())
421
422 <-tailDone
423
424 return engine.ErrTimedOut
425 }
426
427 select {
428 case <-ctx.Done():
429 return ctx.Err()
430 default:
431 }
432
433 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
434 if err != nil {
435 return err
436 }
437
438 if execInspectResp.ExitCode != 0 {
439 inspectResp, err := e.docker.ContainerInspect(ctx, addl.container)
440 if err != nil {
441 return err
442 }
443
444 e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled)
445
446 if inspectResp.State.OOMKilled {
447 return ErrOOMKilled
448 }
449 return engine.ErrWorkflowFailed
450 }
451
452 return nil
453}
454
455func (e *Engine) tailStep(ctx context.Context, wfLogger models.WorkflowLogger, execID string, stepIdx int) error {
456 if wfLogger == nil {
457 return nil
458 }
459
460 // This actually *starts* the command. Thanks, Docker!
461 logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{})
462 if err != nil {
463 return err
464 }
465 defer logs.Close()
466
467 _, err = stdcopy.StdCopy(
468 wfLogger.DataWriter(stepIdx, "stdout"),
469 wfLogger.DataWriter(stepIdx, "stderr"),
470 logs.Reader,
471 )
472 if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
473 return fmt.Errorf("failed to copy logs: %w", err)
474 }
475
476 return nil
477}
478
479func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
480 fns := e.drainCleanups(wid)
481
482 for _, fn := range fns {
483 if err := fn(ctx); err != nil {
484 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
485 }
486 }
487 return nil
488}
489
490func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
491 e.cleanupMu.Lock()
492 defer e.cleanupMu.Unlock()
493
494 key := wid.String()
495 e.cleanup[key] = append(e.cleanup[key], fn)
496}
497
498func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc {
499 e.cleanupMu.Lock()
500 key := wid.String()
501
502 fns := e.cleanup[key]
503 delete(e.cleanup, key)
504 e.cleanupMu.Unlock()
505
506 return fns
507}
508
509func networkName(wid models.WorkflowId) string {
510 return fmt.Sprintf("workflow-network-%s", wid)
511}