Monorepo for Tangled tangled.org
8

Configure Feed

Select the types of activity you want to include in your feed.

1package engine 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "log/slog" 8 "path/filepath" 9 "sync" 10 11 securejoin "github.com/cyphar/filepath-securejoin" 12 "tangled.org/core/notifier" 13 "tangled.org/core/spindle/config" 14 "tangled.org/core/spindle/db" 15 "tangled.org/core/spindle/models" 16 "tangled.org/core/spindle/secrets" 17) 18 19var ( 20 ErrTimedOut = errors.New("timed out") 21 ErrWorkflowFailed = errors.New("workflow failed") 22) 23 24func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, db *db.DB, n *notifier.Notifier, workflowSem chan struct{}, ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { 25 l.Info("starting all workflows in parallel", "pipeline", pipelineId) 26 27 // extract secrets 28 var allSecrets []secrets.UnlockedSecret 29 if didSlashRepo, err := securejoin.SecureJoin(pipeline.RepoOwner, pipeline.RepoName); err == nil { 30 if res, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(didSlashRepo)); err == nil { 31 allSecrets = res 32 } 33 } 34 35 secretValues := make([]string, len(allSecrets)) 36 for i, s := range allSecrets { 37 secretValues[i] = s.Value 38 } 39 40 s3, err := NewS3(cfg.S3.LogBucket) 41 if err != nil { 42 l.Error("error creating s3 client", "err", err) 43 } 44 45 var wg sync.WaitGroup 46 for eng, wfs := range pipeline.Workflows { 47 workflowTimeout := eng.WorkflowTimeout() 48 l.Info("using workflow timeout", "timeout", workflowTimeout) 49 50 for _, w := range wfs { 51 wg.Add(1) 52 go func() { 53 defer wg.Done() 54 55 wid := models.WorkflowId{ 56 PipelineId: pipelineId, 57 Name: w.Name, 58 } 59 60 defer func() { 61 if s3 != nil { 62 logFile := filepath.Join(cfg.Server.LogDir, fmt.Sprintf("%s.log", wid.String())) 63 if err := s3.WriteFile(ctx, logFile); err != nil { 64 l.Error("error uploading logs", "err", err) 65 } 66 } 67 }() 68 69 wfLogger, err := models.NewFileWorkflowLogger(cfg.Server.LogDir, wid, secretValues) 70 if err != nil { 71 l.Warn("failed to setup step logger; logs will not be persisted", "error", err) 72 wfLogger = models.NullLogger{} 73 } else { 74 l.Info("setup step logger; logs will be persisted", "logDir", cfg.Server.LogDir, "wid", wid) 75 defer wfLogger.Close() 76 } 77 78 err = db.StatusRunning(wid, n) 79 if err != nil { 80 l.Error("failed to set workflow status to running", "wid", wid, "err", err) 81 return 82 } 83 84 // acquire semaphore slot before starting the container 85 workflowSem <- struct{}{} 86 defer func() { <-workflowSem }() 87 88 err = eng.SetupWorkflow(ctx, wid, &w, wfLogger) 89 if err != nil { 90 // TODO(winter): Should this always set StatusFailed? 91 // In the original, we only do in a subset of cases. 92 l.Error("setting up workflow", "wid", wid, "err", err) 93 94 destroyErr := eng.DestroyWorkflow(ctx, wid) 95 if destroyErr != nil { 96 l.Error("failed to destroy workflow after setup failure", "error", destroyErr) 97 } 98 99 dbErr := db.StatusFailed(wid, err.Error(), -1, n) 100 if dbErr != nil { 101 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr) 102 } 103 return 104 } 105 defer eng.DestroyWorkflow(ctx, wid) 106 107 ctx, cancel := context.WithTimeout(ctx, workflowTimeout) 108 defer cancel() 109 110 for stepIdx, step := range w.Steps { 111 // log start of step 112 if wfLogger != nil { 113 wfLogger. 114 ControlWriter(stepIdx, step, models.StepStatusStart). 115 Write([]byte{0}) 116 } 117 118 err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger) 119 120 // log end of step 121 if wfLogger != nil { 122 wfLogger. 123 ControlWriter(stepIdx, step, models.StepStatusEnd). 124 Write([]byte{0}) 125 } 126 127 if err != nil { 128 if errors.Is(err, ErrTimedOut) { 129 dbErr := db.StatusTimeout(wid, n) 130 if dbErr != nil { 131 l.Error("failed to set workflow status to timeout", "wid", wid, "err", dbErr) 132 } 133 } else { 134 dbErr := db.StatusFailed(wid, err.Error(), -1, n) 135 if dbErr != nil { 136 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr) 137 } 138 } 139 return 140 } 141 } 142 143 err = db.StatusSuccess(wid, n) 144 if err != nil { 145 l.Error("failed to set workflow status to success", "wid", wid, "err", err) 146 } 147 }() 148 } 149 } 150 151 wg.Wait() 152 l.Info("all workflows completed") 153}