Monorepo for Tangled
tangled.org
1package engine
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "log/slog"
8 "path/filepath"
9 "sync"
10
11 "tangled.org/core/notifier"
12 "tangled.org/core/spindle/config"
13 "tangled.org/core/spindle/db"
14 "tangled.org/core/spindle/models"
15 "tangled.org/core/spindle/secrets"
16)
17
// ErrTimedOut is the sentinel error for a step that ran out of time.
// StartWorkflows checks step errors against it with errors.Is and records a
// timeout status instead of a failure when it matches.
var ErrTimedOut = errors.New("timed out")

// ErrWorkflowFailed is the sentinel error for a workflow that failed.
// NOTE(review): it is not referenced by StartWorkflows; presumably engine
// implementations return it — verify against callers.
var ErrWorkflowFailed = errors.New("workflow failed")
22
23func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, db *db.DB, n *notifier.Notifier, workflowSem chan struct{}, ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
24 l.Info("starting all workflows in parallel", "pipeline", pipelineId)
25
26 // extract secrets
27 var allSecrets []secrets.UnlockedSecret
28 if pipeline.RepoDid != "" {
29 if res, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(pipeline.RepoDid.String())); err == nil {
30 allSecrets = res
31 }
32 }
33
34 secretValues := make([]string, len(allSecrets))
35 for i, s := range allSecrets {
36 secretValues[i] = s.Value
37 }
38
39 s3, err := NewS3(cfg.S3.LogBucket)
40 if err != nil {
41 l.Error("error creating s3 client", "err", err)
42 }
43
44 var wg sync.WaitGroup
45 for eng, wfs := range pipeline.Workflows {
46 workflowTimeout := eng.WorkflowTimeout()
47 l.Info("using workflow timeout", "timeout", workflowTimeout)
48
49 for _, w := range wfs {
50 wg.Add(1)
51 go func() {
52 defer wg.Done()
53
54 wid := models.WorkflowId{
55 PipelineId: pipelineId,
56 Name: w.Name,
57 }
58
59 defer func() {
60 if s3 != nil {
61 logFile := filepath.Join(cfg.Server.LogDir, fmt.Sprintf("%s.log", wid.String()))
62 if err := s3.WriteFile(ctx, logFile); err != nil {
63 l.Error("error uploading logs", "err", err)
64 }
65 }
66 }()
67
68 wfLogger, err := models.NewFileWorkflowLogger(cfg.Server.LogDir, wid, secretValues)
69 if err != nil {
70 l.Warn("failed to setup step logger; logs will not be persisted", "error", err)
71 wfLogger = models.NullLogger{}
72 } else {
73 l.Info("setup step logger; logs will be persisted", "logDir", cfg.Server.LogDir, "wid", wid)
74 defer wfLogger.Close()
75 }
76
77 err = db.StatusRunning(wid, n)
78 if err != nil {
79 l.Error("failed to set workflow status to running", "wid", wid, "err", err)
80 return
81 }
82
83 // acquire semaphore slot before starting the container
84 workflowSem <- struct{}{}
85 defer func() { <-workflowSem }()
86
87 err = eng.SetupWorkflow(ctx, wid, &w, wfLogger)
88 if err != nil {
89 // TODO(winter): Should this always set StatusFailed?
90 // In the original, we only do in a subset of cases.
91 l.Error("setting up workflow", "wid", wid, "err", err)
92
93 destroyErr := eng.DestroyWorkflow(ctx, wid)
94 if destroyErr != nil {
95 l.Error("failed to destroy workflow after setup failure", "error", destroyErr)
96 }
97
98 dbErr := db.StatusFailed(wid, err.Error(), -1, n)
99 if dbErr != nil {
100 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr)
101 }
102 return
103 }
104 defer eng.DestroyWorkflow(ctx, wid)
105
106 ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
107 defer cancel()
108
109 for stepIdx, step := range w.Steps {
110 // log start of step
111 if wfLogger != nil {
112 wfLogger.
113 ControlWriter(stepIdx, step, models.StepStatusStart).
114 Write([]byte{0})
115 }
116
117 err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger)
118
119 // log end of step
120 if wfLogger != nil {
121 wfLogger.
122 ControlWriter(stepIdx, step, models.StepStatusEnd).
123 Write([]byte{0})
124 }
125
126 if err != nil {
127 if errors.Is(err, ErrTimedOut) {
128 dbErr := db.StatusTimeout(wid, n)
129 if dbErr != nil {
130 l.Error("failed to set workflow status to timeout", "wid", wid, "err", dbErr)
131 }
132 } else {
133 dbErr := db.StatusFailed(wid, err.Error(), -1, n)
134 if dbErr != nil {
135 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr)
136 }
137 }
138 return
139 }
140 }
141
142 err = db.StatusSuccess(wid, n)
143 if err != nil {
144 l.Error("failed to set workflow status to success", "wid", wid, "err", err)
145 }
146 }()
147 }
148 }
149
150 wg.Wait()
151 l.Info("all workflows completed")
152}