Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package engine 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "log/slog" 8 "path/filepath" 9 "sync" 10 11 "tangled.org/core/notifier" 12 "tangled.org/core/spindle/config" 13 "tangled.org/core/spindle/db" 14 "tangled.org/core/spindle/models" 15 "tangled.org/core/spindle/secrets" 16) 17 18var ( 19 ErrTimedOut = errors.New("timed out") 20 ErrWorkflowFailed = errors.New("workflow failed") 21) 22 23type workflowFinalizer interface { 24 FinalizeWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error 25} 26 27func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, db *db.DB, n *notifier.Notifier, ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { 28 l.Info("starting all workflows in parallel", "pipeline", pipelineId) 29 30 // extract secrets 31 var allSecrets []secrets.UnlockedSecret 32 if pipeline.RepoDid != "" { 33 if res, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(pipeline.RepoDid.String())); err == nil { 34 allSecrets = res 35 } 36 } 37 38 secretValues := make([]string, len(allSecrets)) 39 for i, s := range allSecrets { 40 secretValues[i] = s.Value 41 } 42 43 s3, err := NewS3(cfg.S3.LogBucket) 44 if err != nil { 45 l.Error("error creating s3 client", "err", err) 46 } 47 48 var wg sync.WaitGroup 49 for eng, wfs := range pipeline.Workflows { 50 workflowTimeout := eng.WorkflowTimeout() 51 l.Info("using workflow timeout", "timeout", workflowTimeout) 52 53 for _, w := range wfs { 54 wg.Add(1) 55 go func() { 56 defer wg.Done() 57 58 wid := models.WorkflowId{ 59 PipelineId: pipelineId, 60 Name: w.Name, 61 } 62 63 defer func() { 64 if s3 != nil { 65 logFile := filepath.Join(cfg.Server.LogDir, fmt.Sprintf("%s.log", wid.String())) 66 if err := s3.WriteFile(ctx, logFile); err != nil { 67 l.Error("error uploading logs", "err", err) 68 } 69 } 70 }() 71 72 wfLogger, err := models.NewFileWorkflowLogger(cfg.Server.LogDir, wid, secretValues) 73 if err != nil { 74 l.Warn("failed to setup step logger; logs will not be persisted", "error", err) 75 wfLogger = models.NullLogger{} 76 } else { 77 l.Info("setup step logger; logs will be persisted", "logDir", cfg.Server.LogDir, "wid", wid) 78 defer wfLogger.Close() 79 } 80 81 l.Info("waiting for slot", "wid", wid) 82 slot := WorkflowSlot(NoopSlot{}) 83 if s, ok := eng.(WorkflowSlotter); ok { 84 var err error 85 slot, err = s.AcquireWorkflowSlot(ctx, wid, &w) 86 if err != nil { 87 l.Error("failed to acquire slot", "wid", wid, "err", err) 88 dbErr := db.StatusFailed(wid, err.Error(), -1, n) 89 if dbErr != nil { 90 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr) 91 } 92 return 93 } 94 } 95 defer slot.Release() 96 97 err = db.StatusRunning(wid, n) 98 if err != nil { 99 l.Error("failed to set workflow status to running", "wid", wid, "err", err) 100 return 101 } 102 103 err = eng.SetupWorkflow(ctx, wid, &w, wfLogger) 104 if err != nil { 105 // TODO(winter): Should this always set StatusFailed? 106 // In the original, we only do in a subset of cases. 107 l.Error("setting up workflow", "wid", wid, "err", err) 108 109 destroyErr := eng.DestroyWorkflow(ctx, wid) 110 if destroyErr != nil { 111 l.Error("failed to destroy workflow after setup failure", "error", destroyErr) 112 } 113 114 dbErr := db.StatusFailed(wid, err.Error(), -1, n) 115 if dbErr != nil { 116 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr) 117 } 118 return 119 } 120 defer eng.DestroyWorkflow(ctx, wid) 121 122 ctx, cancel := context.WithTimeout(ctx, workflowTimeout) 123 defer cancel() 124 125 for stepIdx, step := range w.Steps { 126 // log start of step 127 if wfLogger != nil { 128 wfLogger. 129 ControlWriter(stepIdx, step, models.StepStatusStart). 130 Write([]byte{0}) 131 } 132 133 err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger) 134 135 // log end of step 136 if wfLogger != nil { 137 wfLogger. 138 ControlWriter(stepIdx, step, models.StepStatusEnd). 139 Write([]byte{0}) 140 } 141 142 if err != nil { 143 if errors.Is(err, ErrTimedOut) { 144 dbErr := db.StatusTimeout(wid, n) 145 if dbErr != nil { 146 l.Error("failed to set workflow status to timeout", "wid", wid, "err", dbErr) 147 } 148 } else { 149 dbErr := db.StatusFailed(wid, err.Error(), -1, n) 150 if dbErr != nil { 151 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr) 152 } 153 } 154 return 155 } 156 } 157 158 if finalizer, ok := eng.(workflowFinalizer); ok { 159 if err := finalizer.FinalizeWorkflow(ctx, wid, &w, wfLogger); err != nil { 160 dbErr := db.StatusFailed(wid, err.Error(), -1, n) 161 if dbErr != nil { 162 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr) 163 } 164 return 165 } 166 } 167 168 err = db.StatusSuccess(wid, n) 169 if err != nil { 170 l.Error("failed to set workflow status to success", "wid", wid, "err", err) 171 } 172 }() 173 } 174 } 175 176 wg.Wait() 177 l.Info("all workflows completed") 178}