Monorepo for Tangled
tangled.org
1package engine
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "log/slog"
8 "path/filepath"
9 "sync"
10
11 "tangled.org/core/notifier"
12 "tangled.org/core/spindle/config"
13 "tangled.org/core/spindle/db"
14 "tangled.org/core/spindle/models"
15 "tangled.org/core/spindle/secrets"
16)
17
// Sentinel errors for workflow execution. Match them with errors.Is.
var (
	// ErrWorkflowFailed signals that a workflow did not succeed.
	// NOTE(review): not referenced in this part of the file; presumably
	// returned by engines when a step fails — confirm against the engines.
	ErrWorkflowFailed = errors.New("workflow failed")

	// ErrTimedOut signals that a workflow ran out of time. StartWorkflows
	// records a timeout status instead of a failure when a step's error
	// matches it.
	ErrTimedOut = errors.New("timed out")
)
22
23type workflowFinalizer interface {
24 FinalizeWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error
25}
26
27func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, db *db.DB, n *notifier.Notifier, ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
28 l.Info("starting all workflows in parallel", "pipeline", pipelineId)
29
30 // extract secrets
31 var allSecrets []secrets.UnlockedSecret
32 if pipeline.RepoDid != "" {
33 if res, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(pipeline.RepoDid.String())); err == nil {
34 allSecrets = res
35 }
36 }
37
38 secretValues := make([]string, len(allSecrets))
39 for i, s := range allSecrets {
40 secretValues[i] = s.Value
41 }
42
43 s3, err := NewS3(cfg.S3.LogBucket)
44 if err != nil {
45 l.Error("error creating s3 client", "err", err)
46 }
47
48 var wg sync.WaitGroup
49 for eng, wfs := range pipeline.Workflows {
50 workflowTimeout := eng.WorkflowTimeout()
51 l.Info("using workflow timeout", "timeout", workflowTimeout)
52
53 for _, w := range wfs {
54 wg.Add(1)
55 go func() {
56 defer wg.Done()
57
58 wid := models.WorkflowId{
59 PipelineId: pipelineId,
60 Name: w.Name,
61 }
62
63 defer func() {
64 if s3 != nil {
65 logFile := filepath.Join(cfg.Server.LogDir, fmt.Sprintf("%s.log", wid.String()))
66 if err := s3.WriteFile(ctx, logFile); err != nil {
67 l.Error("error uploading logs", "err", err)
68 }
69 }
70 }()
71
72 wfLogger, err := models.NewFileWorkflowLogger(cfg.Server.LogDir, wid, secretValues)
73 if err != nil {
74 l.Warn("failed to setup step logger; logs will not be persisted", "error", err)
75 wfLogger = models.NullLogger{}
76 } else {
77 l.Info("setup step logger; logs will be persisted", "logDir", cfg.Server.LogDir, "wid", wid)
78 defer wfLogger.Close()
79 }
80
81 l.Info("waiting for slot", "wid", wid)
82 slot := WorkflowSlot(NoopSlot{})
83 if s, ok := eng.(WorkflowSlotter); ok {
84 var err error
85 slot, err = s.AcquireWorkflowSlot(ctx, wid, &w)
86 if err != nil {
87 l.Error("failed to acquire slot", "wid", wid, "err", err)
88 dbErr := db.StatusFailed(wid, err.Error(), -1, n)
89 if dbErr != nil {
90 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr)
91 }
92 return
93 }
94 }
95 defer slot.Release()
96
97 err = db.StatusRunning(wid, n)
98 if err != nil {
99 l.Error("failed to set workflow status to running", "wid", wid, "err", err)
100 return
101 }
102
103 err = eng.SetupWorkflow(ctx, wid, &w, wfLogger)
104 if err != nil {
105 // TODO(winter): Should this always set StatusFailed?
106 // In the original, we only do in a subset of cases.
107 l.Error("setting up workflow", "wid", wid, "err", err)
108
109 destroyErr := eng.DestroyWorkflow(ctx, wid)
110 if destroyErr != nil {
111 l.Error("failed to destroy workflow after setup failure", "error", destroyErr)
112 }
113
114 dbErr := db.StatusFailed(wid, err.Error(), -1, n)
115 if dbErr != nil {
116 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr)
117 }
118 return
119 }
120 defer eng.DestroyWorkflow(ctx, wid)
121
122 ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
123 defer cancel()
124
125 for stepIdx, step := range w.Steps {
126 // log start of step
127 if wfLogger != nil {
128 wfLogger.
129 ControlWriter(stepIdx, step, models.StepStatusStart).
130 Write([]byte{0})
131 }
132
133 err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger)
134
135 // log end of step
136 if wfLogger != nil {
137 wfLogger.
138 ControlWriter(stepIdx, step, models.StepStatusEnd).
139 Write([]byte{0})
140 }
141
142 if err != nil {
143 if errors.Is(err, ErrTimedOut) {
144 dbErr := db.StatusTimeout(wid, n)
145 if dbErr != nil {
146 l.Error("failed to set workflow status to timeout", "wid", wid, "err", dbErr)
147 }
148 } else {
149 dbErr := db.StatusFailed(wid, err.Error(), -1, n)
150 if dbErr != nil {
151 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr)
152 }
153 }
154 return
155 }
156 }
157
158 if finalizer, ok := eng.(workflowFinalizer); ok {
159 if err := finalizer.FinalizeWorkflow(ctx, wid, &w, wfLogger); err != nil {
160 dbErr := db.StatusFailed(wid, err.Error(), -1, n)
161 if dbErr != nil {
162 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr)
163 }
164 return
165 }
166 }
167
168 err = db.StatusSuccess(wid, n)
169 if err != nil {
170 l.Error("failed to set workflow status to success", "wid", wid, "err", err)
171 }
172 }()
173 }
174 }
175
176 wg.Wait()
177 l.Info("all workflows completed")
178}