Monorepo for Tangled
tangled.org
1package nixery
2
3import (
4 "bufio"
5 "context"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "path"
11 "runtime"
12 "sync"
13 "time"
14
15 "github.com/docker/docker/api/types/container"
16 "github.com/docker/docker/api/types/image"
17 "github.com/docker/docker/api/types/mount"
18 "github.com/docker/docker/api/types/network"
19 "github.com/docker/docker/client"
20 "github.com/docker/docker/pkg/stdcopy"
21 "gopkg.in/yaml.v3"
22 "tangled.org/core/api/tangled"
23 "tangled.org/core/log"
24 "tangled.org/core/spindle/config"
25 "tangled.org/core/spindle/engine"
26 "tangled.org/core/spindle/models"
27 "tangled.org/core/spindle/secrets"
28)
29
// workspaceDir is the directory inside the workflow container that is used
// as the container's working directory.
const workspaceDir = "/tangled/workspace"

// homeDir is the directory inside the workflow container that steps see
// as $HOME.
const homeDir = "/tangled/home"
34
// cleanupFunc releases one resource that was created for a workflow. It
// receives the context of the teardown call and reports why the release
// failed, if it did.
type cleanupFunc func(ctx context.Context) error
36
37type Engine struct {
38 docker client.APIClient
39 l *slog.Logger
40 cfg *config.Config
41
42 cleanupMu sync.Mutex
43 cleanup map[string][]cleanupFunc
44}
45
46type Step struct {
47 name string
48 kind models.StepKind
49 command string
50 environment map[string]string
51}
52
53func (s Step) Name() string {
54 return s.name
55}
56
57func (s Step) Command() string {
58 return s.command
59}
60
61func (s Step) Kind() models.StepKind {
62 return s.kind
63}
64
65// setupSteps get added to start of Steps
66type setupSteps []models.Step
67
68// addStep adds a step to the beginning of the workflow's steps.
69func (ss *setupSteps) addStep(step models.Step) {
70 *ss = append(*ss, step)
71}
72
73type addlFields struct {
74 image string
75 container string
76 mounts []mount.Mount
77}
78
79func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
80 swf := &models.Workflow{}
81 addl := addlFields{}
82
83 dwf := &struct {
84 Steps []struct {
85 Command string `yaml:"command"`
86 Name string `yaml:"name"`
87 Environment map[string]string `yaml:"environment"`
88 } `yaml:"steps"`
89 Dependencies map[string][]string `yaml:"dependencies"`
90 Environment map[string]string `yaml:"environment"`
91 }{}
92 err := yaml.Unmarshal([]byte(twf.Raw), &dwf)
93 if err != nil {
94 return nil, err
95 }
96
97 for _, dstep := range dwf.Steps {
98 sstep := Step{}
99 sstep.environment = dstep.Environment
100 sstep.command = dstep.Command
101 sstep.name = dstep.Name
102 sstep.kind = models.StepKindUser
103 swf.Steps = append(swf.Steps, sstep)
104 }
105 swf.Name = twf.Name
106 swf.Environment = dwf.Environment
107 addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery)
108
109 if sock := e.cfg.Server.DockerSocket; sock != "" {
110 addl.mounts = append(addl.mounts, mount.Mount{
111 Type: mount.TypeBind,
112 Source: sock,
113 Target: sock,
114 ReadOnly: false,
115 })
116 }
117 setup := &setupSteps{}
118
119 setup.addStep(nixConfStep())
120 setup.addStep(models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev))
121 // this step could be empty
122 if s := dependencyStep(dwf.Dependencies); s != nil {
123 setup.addStep(*s)
124 }
125
126 // append setup steps in order to the start of workflow steps
127 swf.Steps = append(*setup, swf.Steps...)
128 swf.Data = addl
129
130 return swf, nil
131}
132
133func (e *Engine) WorkflowTimeout() time.Duration {
134 workflowTimeoutStr := e.cfg.NixeryPipelines.WorkflowTimeout
135 workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
136 if err != nil {
137 e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
138 workflowTimeout = 5 * time.Minute
139 }
140
141 return workflowTimeout
142}
143
// workflowImage builds the image reference for a workflow.
//
// The reference is a slash-separated path: the nixery host, an "arm64"
// segment when the engine itself runs on arm64, the packages listed under the
// "nixpkgs" key of deps, and finally the default packages bash, git,
// coreutils and nix. Entries of deps under any other key are ignored.
func workflowImage(deps map[string][]string, nixery string) string {
	// Indexing a nil or empty map yields a nil slice, which joins to "".
	dependencies := path.Join(deps["nixpkgs"]...)

	// load defaults from somewhere else
	dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix")

	if runtime.GOARCH == "arm64" {
		dependencies = path.Join("arm64", dependencies)
	}

	return path.Join(nixery, dependencies)
}
161
162func New(ctx context.Context, cfg *config.Config) (*Engine, error) {
163 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
164 if err != nil {
165 return nil, err
166 }
167
168 l := log.FromContext(ctx).With("component", "spindle")
169
170 e := &Engine{
171 docker: dcli,
172 l: l,
173 cfg: cfg,
174 }
175
176 e.cleanup = make(map[string][]cleanupFunc)
177
178 return e, nil
179}
180
181func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error {
182 /// -------------------------INITIAL SETUP------------------------------------------
183 l := e.l.With("workflow", wid)
184 l.Info("setting up workflow")
185
186 setupStep := Step{
187 name: "nixery image pull",
188 kind: models.StepKindSystem,
189 }
190 setupStepIdx := -1
191
192 wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusStart).Write([]byte{0})
193 defer wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusEnd).Write([]byte{0})
194
195 /// -------------------------NETWORK CREATION---------------------------------------
196 _, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
197 Driver: "bridge",
198 })
199 if err != nil {
200 return err
201 }
202
203 e.registerCleanup(wid, func(ctx context.Context) error {
204 if err := e.docker.NetworkRemove(ctx, networkName(wid)); err != nil {
205 return fmt.Errorf("removing network: %w", err)
206 }
207 return nil
208 })
209
210 /// -------------------------IMAGE PULL---------------------------------------------
211 addl := wf.Data.(addlFields)
212 l.Info("pulling image", "image", addl.image)
213 fmt.Fprintf(
214 wfLogger.DataWriter(setupStepIdx, "stdout"),
215 "pulling image: %s",
216 addl.image,
217 )
218
219 reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
220 if err != nil {
221 l.Error("pipeline image pull failed!", "error", err.Error())
222 fmt.Fprintf(wfLogger.DataWriter(setupStepIdx, "stderr"), "image pull failed: %s", err)
223 return fmt.Errorf("pulling image: %w", err)
224 }
225 defer reader.Close()
226
227 scanner := bufio.NewScanner(reader)
228 for scanner.Scan() {
229 line := scanner.Text()
230 wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte(line))
231 l.Info("image pull progress", "stdout", line)
232 }
233
234 /// -------------------------CONTAINER CREATION-------------------------------------
235 l.Info("creating container")
236 wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte("creating container..."))
237
238 resp, err := e.docker.ContainerCreate(ctx, &container.Config{
239 Image: addl.image,
240 Cmd: []string{"cat"},
241 OpenStdin: true, // so cat stays alive :3
242 Tty: false,
243 Hostname: "spindle",
244 WorkingDir: workspaceDir,
245 Labels: map[string]string{
246 "sh.tangled.pipeline/workflow_id": wid.String(),
247 },
248 // TODO(winter): investigate whether environment variables passed here
249 // get propagated to ContainerExec processes
250 }, &container.HostConfig{
251 Mounts: append([]mount.Mount{
252 {
253 Type: mount.TypeTmpfs,
254 Target: "/tmp",
255 ReadOnly: false,
256 TmpfsOptions: &mount.TmpfsOptions{
257 Mode: 0o1777, // world-writable sticky bit
258 Options: [][]string{
259 {"exec"},
260 },
261 },
262 },
263 }, addl.mounts...),
264 ReadonlyRootfs: false,
265 CapDrop: []string{"ALL"},
266 CapAdd: []string{"CAP_DAC_OVERRIDE", "CAP_CHOWN", "CAP_FOWNER", "CAP_SETUID", "CAP_SETGID"},
267 SecurityOpt: []string{"no-new-privileges"},
268 ExtraHosts: []string{"host.docker.internal:host-gateway"},
269 Resources: container.Resources{
270 Memory: e.cfg.NixeryPipelines.MaxJobMemoryMB * 1024 * 1024,
271 },
272 }, nil, nil, "")
273 if err != nil {
274 fmt.Fprintf(
275 wfLogger.DataWriter(setupStepIdx, "stderr"),
276 "container creation failed: %s",
277 err,
278 )
279 return fmt.Errorf("creating container: %w", err)
280 }
281
282 e.registerCleanup(wid, func(ctx context.Context) error {
283 if err := e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}); err != nil {
284 return fmt.Errorf("stopping container: %w", err)
285 }
286
287 err := e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{
288 RemoveVolumes: true,
289 RemoveLinks: false,
290 Force: false,
291 })
292 if err != nil {
293 return fmt.Errorf("removing container: %w", err)
294 }
295
296 return nil
297 })
298
299 /// -------------------------CONTAINER START----------------------------------------
300 wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte("starting container..."))
301 if err := e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil {
302 return fmt.Errorf("starting container: %w", err)
303 }
304
305 mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
306 Cmd: []string{"mkdir", "-p", workspaceDir, homeDir},
307 AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe??
308 AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default")
309 })
310 if err != nil {
311 return err
312 }
313
314 // This actually *starts* the command. Thanks, Docker!
315 execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{})
316 if err != nil {
317 return err
318 }
319 defer execResp.Close()
320
321 // This is apparently best way to wait for the command to complete.
322 _, err = io.ReadAll(execResp.Reader)
323 if err != nil {
324 return err
325 }
326
327 /// -----------------------------------FINISH---------------------------------------
328 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
329 if err != nil {
330 return err
331 }
332
333 if execInspectResp.ExitCode != 0 {
334 return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode)
335 } else if execInspectResp.Running {
336 return errors.New("mkdir is somehow still running??")
337 }
338
339 addl.container = resp.ID
340 wf.Data = addl
341
342 return nil
343}
344
345func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error {
346 addl := w.Data.(addlFields)
347 workflowEnvs := ConstructEnvs(w.Environment)
348 // TODO(winter): should SetupWorkflow also have secret access?
349 // IMO yes, but probably worth thinking on.
350 for _, s := range secrets {
351 workflowEnvs.AddEnv(s.Key, s.Value)
352 }
353
354 step := w.Steps[idx]
355
356 select {
357 case <-ctx.Done():
358 return ctx.Err()
359 default:
360 }
361
362 envs := append(EnvVars(nil), workflowEnvs...)
363 if nixStep, ok := step.(Step); ok {
364 for k, v := range nixStep.environment {
365 envs.AddEnv(k, v)
366 }
367 }
368
369 envs.AddEnv("HOME", homeDir)
370 existingPath := "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
371 envs.AddEnv("PATH", fmt.Sprintf("%s/.nix-profile/bin:/nix/var/nix/profiles/default/bin:%s", homeDir, existingPath))
372 if sock := e.cfg.Server.DockerSocket; sock != "" {
373 envs.AddEnv("DOCKER_HOST", fmt.Sprintf("unix://%s", sock))
374 }
375
376 mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{
377 Cmd: []string{"bash", "-c", step.Command()},
378 AttachStdout: true,
379 AttachStderr: true,
380 Env: envs,
381 })
382 if err != nil {
383 return fmt.Errorf("creating exec: %w", err)
384 }
385
386 // start tailing logs in background
387 tailDone := make(chan error, 1)
388 go func() {
389 tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, idx)
390 }()
391
392 select {
393 case <-tailDone:
394
395 case <-ctx.Done():
396 // cleanup will be handled by DestroyWorkflow, since
397 // Docker doesn't provide an API to kill an exec run
398 // (sure, we could grab the PID and kill it ourselves,
399 // but that's wasted effort)
400 e.l.Warn("step timed out", "step", step.Name())
401
402 <-tailDone
403
404 return engine.ErrTimedOut
405 }
406
407 select {
408 case <-ctx.Done():
409 return ctx.Err()
410 default:
411 }
412
413 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
414 if err != nil {
415 return err
416 }
417
418 if execInspectResp.ExitCode != 0 {
419 inspectResp, err := e.docker.ContainerInspect(ctx, addl.container)
420 if err != nil {
421 return err
422 }
423
424 e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled)
425
426 if inspectResp.State.OOMKilled {
427 return ErrOOMKilled
428 }
429 return engine.ErrWorkflowFailed
430 }
431
432 return nil
433}
434
435func (e *Engine) tailStep(ctx context.Context, wfLogger models.WorkflowLogger, execID string, stepIdx int) error {
436 if wfLogger == nil {
437 return nil
438 }
439
440 // This actually *starts* the command. Thanks, Docker!
441 logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{})
442 if err != nil {
443 return err
444 }
445 defer logs.Close()
446
447 _, err = stdcopy.StdCopy(
448 wfLogger.DataWriter(stepIdx, "stdout"),
449 wfLogger.DataWriter(stepIdx, "stderr"),
450 logs.Reader,
451 )
452 if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
453 return fmt.Errorf("failed to copy logs: %w", err)
454 }
455
456 return nil
457}
458
459func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
460 fns := e.drainCleanups(wid)
461
462 for _, fn := range fns {
463 if err := fn(ctx); err != nil {
464 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
465 }
466 }
467 return nil
468}
469
470func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
471 e.cleanupMu.Lock()
472 defer e.cleanupMu.Unlock()
473
474 key := wid.String()
475 e.cleanup[key] = append(e.cleanup[key], fn)
476}
477
478func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc {
479 e.cleanupMu.Lock()
480 key := wid.String()
481
482 fns := e.cleanup[key]
483 delete(e.cleanup, key)
484 e.cleanupMu.Unlock()
485
486 return fns
487}
488
489func networkName(wid models.WorkflowId) string {
490 return fmt.Sprintf("workflow-network-%s", wid)
491}