Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package engine 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "log/slog" 8 "path/filepath" 9 "sync" 10 11 "tangled.org/core/notifier" 12 "tangled.org/core/spindle/config" 13 "tangled.org/core/spindle/db" 14 "tangled.org/core/spindle/models" 15 "tangled.org/core/spindle/secrets" 16) 17 18var ( 19 ErrTimedOut = errors.New("timed out") 20 ErrWorkflowFailed = errors.New("workflow failed") 21) 22 23func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, db *db.DB, n *notifier.Notifier, workflowSem chan struct{}, ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { 24 l.Info("starting all workflows in parallel", "pipeline", pipelineId) 25 26 // extract secrets 27 var allSecrets []secrets.UnlockedSecret 28 if pipeline.RepoDid != "" { 29 if res, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(pipeline.RepoDid.String())); err == nil { 30 allSecrets = res 31 } 32 } 33 34 secretValues := make([]string, len(allSecrets)) 35 for i, s := range allSecrets { 36 secretValues[i] = s.Value 37 } 38 39 s3, err := NewS3(cfg.S3.LogBucket) 40 if err != nil { 41 l.Error("error creating s3 client", "err", err) 42 } 43 44 var wg sync.WaitGroup 45 for eng, wfs := range pipeline.Workflows { 46 workflowTimeout := eng.WorkflowTimeout() 47 l.Info("using workflow timeout", "timeout", workflowTimeout) 48 49 for _, w := range wfs { 50 wg.Add(1) 51 go func() { 52 defer wg.Done() 53 54 wid := models.WorkflowId{ 55 PipelineId: pipelineId, 56 Name: w.Name, 57 } 58 59 defer func() { 60 if s3 != nil { 61 logFile := filepath.Join(cfg.Server.LogDir, fmt.Sprintf("%s.log", wid.String())) 62 if err := s3.WriteFile(ctx, logFile); err != nil { 63 l.Error("error uploading logs", "err", err) 64 } 65 } 66 }() 67 68 wfLogger, err := models.NewFileWorkflowLogger(cfg.Server.LogDir, wid, secretValues) 69 if err != nil { 70 l.Warn("failed to setup step logger; logs will not be persisted", "error", err) 71 wfLogger = models.NullLogger{} 72 } else { 73 l.Info("setup step logger; logs will be persisted", "logDir", cfg.Server.LogDir, "wid", wid) 74 defer wfLogger.Close() 75 } 76 77 err = db.StatusRunning(wid, n) 78 if err != nil { 79 l.Error("failed to set workflow status to running", "wid", wid, "err", err) 80 return 81 } 82 83 // acquire semaphore slot before starting the container 84 workflowSem <- struct{}{} 85 defer func() { <-workflowSem }() 86 87 err = eng.SetupWorkflow(ctx, wid, &w, wfLogger) 88 if err != nil { 89 // TODO(winter): Should this always set StatusFailed? 90 // In the original, we only do in a subset of cases. 91 l.Error("setting up workflow", "wid", wid, "err", err) 92 93 destroyErr := eng.DestroyWorkflow(ctx, wid) 94 if destroyErr != nil { 95 l.Error("failed to destroy workflow after setup failure", "error", destroyErr) 96 } 97 98 dbErr := db.StatusFailed(wid, err.Error(), -1, n) 99 if dbErr != nil { 100 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr) 101 } 102 return 103 } 104 defer eng.DestroyWorkflow(ctx, wid) 105 106 ctx, cancel := context.WithTimeout(ctx, workflowTimeout) 107 defer cancel() 108 109 for stepIdx, step := range w.Steps { 110 // log start of step 111 if wfLogger != nil { 112 wfLogger. 113 ControlWriter(stepIdx, step, models.StepStatusStart). 114 Write([]byte{0}) 115 } 116 117 err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger) 118 119 // log end of step 120 if wfLogger != nil { 121 wfLogger. 122 ControlWriter(stepIdx, step, models.StepStatusEnd). 123 Write([]byte{0}) 124 } 125 126 if err != nil { 127 if errors.Is(err, ErrTimedOut) { 128 dbErr := db.StatusTimeout(wid, n) 129 if dbErr != nil { 130 l.Error("failed to set workflow status to timeout", "wid", wid, "err", dbErr) 131 } 132 } else { 133 dbErr := db.StatusFailed(wid, err.Error(), -1, n) 134 if dbErr != nil { 135 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr) 136 } 137 } 138 return 139 } 140 } 141 142 err = db.StatusSuccess(wid, n) 143 if err != nil { 144 l.Error("failed to set workflow status to success", "wid", wid, "err", err) 145 } 146 }() 147 } 148 } 149 150 wg.Wait() 151 l.Info("all workflows completed") 152}