Monorepo for Tangled
tangled.org
1package engine
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "log/slog"
8 "path/filepath"
9 "sync"
10
11 securejoin "github.com/cyphar/filepath-securejoin"
12 "tangled.org/core/notifier"
13 "tangled.org/core/spindle/config"
14 "tangled.org/core/spindle/db"
15 "tangled.org/core/spindle/models"
16 "tangled.org/core/spindle/secrets"
17)
18
// Sentinel errors describing how a workflow run can end unsuccessfully.
var (
	// ErrTimedOut is the sentinel for a timed-out run. StartWorkflows matches
	// step errors against it with errors.Is to record a timeout status
	// instead of a generic failure.
	ErrTimedOut = errors.New("timed out")
	// ErrWorkflowFailed is the sentinel for a failed workflow.
	// NOTE(review): not referenced in the visible part of this file —
	// presumably returned by engine implementations; verify against callers.
	ErrWorkflowFailed = errors.New("workflow failed")
)
23
24func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, db *db.DB, n *notifier.Notifier, workflowSem chan struct{}, ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
25 l.Info("starting all workflows in parallel", "pipeline", pipelineId)
26
27 // extract secrets
28 var allSecrets []secrets.UnlockedSecret
29 if didSlashRepo, err := securejoin.SecureJoin(pipeline.RepoOwner, pipeline.RepoName); err == nil {
30 if res, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(didSlashRepo)); err == nil {
31 allSecrets = res
32 }
33 }
34
35 secretValues := make([]string, len(allSecrets))
36 for i, s := range allSecrets {
37 secretValues[i] = s.Value
38 }
39
40 s3, err := NewS3(cfg.S3.LogBucket)
41 if err != nil {
42 l.Error("error creating s3 client", "err", err)
43 }
44
45 var wg sync.WaitGroup
46 for eng, wfs := range pipeline.Workflows {
47 workflowTimeout := eng.WorkflowTimeout()
48 l.Info("using workflow timeout", "timeout", workflowTimeout)
49
50 for _, w := range wfs {
51 wg.Add(1)
52 go func() {
53 defer wg.Done()
54
55 wid := models.WorkflowId{
56 PipelineId: pipelineId,
57 Name: w.Name,
58 }
59
60 defer func() {
61 if s3 != nil {
62 logFile := filepath.Join(cfg.Server.LogDir, fmt.Sprintf("%s.log", wid.String()))
63 if err := s3.WriteFile(ctx, logFile); err != nil {
64 l.Error("error uploading logs", "err", err)
65 }
66 }
67 }()
68
69 wfLogger, err := models.NewFileWorkflowLogger(cfg.Server.LogDir, wid, secretValues)
70 if err != nil {
71 l.Warn("failed to setup step logger; logs will not be persisted", "error", err)
72 wfLogger = models.NullLogger{}
73 } else {
74 l.Info("setup step logger; logs will be persisted", "logDir", cfg.Server.LogDir, "wid", wid)
75 defer wfLogger.Close()
76 }
77
78 err = db.StatusRunning(wid, n)
79 if err != nil {
80 l.Error("failed to set workflow status to running", "wid", wid, "err", err)
81 return
82 }
83
84 // acquire semaphore slot before starting the container
85 workflowSem <- struct{}{}
86 defer func() { <-workflowSem }()
87
88 err = eng.SetupWorkflow(ctx, wid, &w, wfLogger)
89 if err != nil {
90 // TODO(winter): Should this always set StatusFailed?
91 // In the original, we only do in a subset of cases.
92 l.Error("setting up workflow", "wid", wid, "err", err)
93
94 destroyErr := eng.DestroyWorkflow(ctx, wid)
95 if destroyErr != nil {
96 l.Error("failed to destroy workflow after setup failure", "error", destroyErr)
97 }
98
99 dbErr := db.StatusFailed(wid, err.Error(), -1, n)
100 if dbErr != nil {
101 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr)
102 }
103 return
104 }
105 defer eng.DestroyWorkflow(ctx, wid)
106
107 ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
108 defer cancel()
109
110 for stepIdx, step := range w.Steps {
111 // log start of step
112 if wfLogger != nil {
113 wfLogger.
114 ControlWriter(stepIdx, step, models.StepStatusStart).
115 Write([]byte{0})
116 }
117
118 err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger)
119
120 // log end of step
121 if wfLogger != nil {
122 wfLogger.
123 ControlWriter(stepIdx, step, models.StepStatusEnd).
124 Write([]byte{0})
125 }
126
127 if err != nil {
128 if errors.Is(err, ErrTimedOut) {
129 dbErr := db.StatusTimeout(wid, n)
130 if dbErr != nil {
131 l.Error("failed to set workflow status to timeout", "wid", wid, "err", dbErr)
132 }
133 } else {
134 dbErr := db.StatusFailed(wid, err.Error(), -1, n)
135 if dbErr != nil {
136 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr)
137 }
138 }
139 return
140 }
141 }
142
143 err = db.StatusSuccess(wid, n)
144 if err != nil {
145 l.Error("failed to set workflow status to success", "wid", wid, "err", err)
146 }
147 }()
148 }
149 }
150
151 wg.Wait()
152 l.Info("all workflows completed")
153}