Monorepo for Tangled tangled.org
3

Configure Feed

Select the types of activity you want to include in your feed.

1package pipelines 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "time" 11 12 "tangled.org/core/api/tangled" 13 "tangled.org/core/appview/config" 14 "tangled.org/core/appview/db" 15 "tangled.org/core/appview/middleware" 16 "tangled.org/core/appview/models" 17 "tangled.org/core/appview/oauth" 18 "tangled.org/core/appview/pages" 19 "tangled.org/core/appview/reporesolver" 20 "tangled.org/core/eventconsumer" 21 "tangled.org/core/idresolver" 22 "tangled.org/core/orm" 23 "tangled.org/core/rbac" 24 spindlemodel "tangled.org/core/spindle/models" 25 26 "github.com/go-chi/chi/v5" 27 "github.com/gorilla/websocket" 28) 29 30type Pipelines struct { 31 repoResolver *reporesolver.RepoResolver 32 idResolver *idresolver.Resolver 33 config *config.Config 34 oauth *oauth.OAuth 35 pages *pages.Pages 36 spindlestream *eventconsumer.Consumer 37 pipelineNotifier *StatusNotifier 38 db *db.DB 39 enforcer *rbac.Enforcer 40 logger *slog.Logger 41} 42 43func (p *Pipelines) Router(mw *middleware.Middleware) http.Handler { 44 r := chi.NewRouter() 45 r.Get("/", p.Index) 46 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow) 47 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs) 48 r. 49 With(mw.RepoPermissionMiddleware("repo:owner")). 50 Post("/{pipeline}/workflow/{workflow}/cancel", p.Cancel) 51 52 return r 53} 54 55func New( 56 oauth *oauth.OAuth, 57 repoResolver *reporesolver.RepoResolver, 58 pages *pages.Pages, 59 spindlestream *eventconsumer.Consumer, 60 pipelineNotifier *StatusNotifier, 61 idResolver *idresolver.Resolver, 62 db *db.DB, 63 config *config.Config, 64 enforcer *rbac.Enforcer, 65 logger *slog.Logger, 66) *Pipelines { 67 return &Pipelines{ 68 oauth: oauth, 69 repoResolver: repoResolver, 70 pages: pages, 71 idResolver: idResolver, 72 config: config, 73 spindlestream: spindlestream, 74 pipelineNotifier: pipelineNotifier, 75 db: db, 76 enforcer: enforcer, 77 logger: logger, 78 } 79} 80 81func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) { 82 user := p.oauth.GetMultiAccountUser(r) 83 l := p.logger.With("handler", "Index") 84 85 f, err := p.repoResolver.Resolve(r) 86 if err != nil { 87 l.Error("failed to get repo and knot", "err", err) 88 return 89 } 90 91 filterKind := r.URL.Query().Get("trigger") 92 filters := []orm.Filter{ 93 orm.FilterEq("p.repo_did", f.RepoDid), 94 } 95 switch filterKind { 96 case "push": 97 filters = append(filters, orm.FilterEq("t.kind", "push")) 98 case "pull_request": 99 filters = append(filters, orm.FilterEq("t.kind", "pull_request")) 100 default: 101 // no filters otherwise, default to "all" 102 filterKind = "all" 103 } 104 105 ps, err := db.GetPipelineStatuses( 106 p.db, 107 30, 108 filters..., 109 ) 110 if err != nil { 111 l.Error("failed to query db", "err", err) 112 return 113 } 114 115 total, err := db.GetPipelineCount(p.db, filters...) 116 if err != nil { 117 l.Error("failed to query db", "err", err) 118 return 119 } 120 121 p.pages.Pipelines(w, pages.PipelinesParams{ 122 BaseParams: pages.BaseParamsFromContext(r.Context()), 123 RepoInfo: p.repoResolver.GetRepoInfo(r, user), 124 Pipelines: ps, 125 FilterKind: filterKind, 126 Total: total, 127 }) 128} 129 130func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) { 131 user := p.oauth.GetMultiAccountUser(r) 132 l := p.logger.With("handler", "Workflow") 133 134 f, err := p.repoResolver.Resolve(r) 135 if err != nil { 136 l.Error("failed to get repo and knot", "err", err) 137 return 138 } 139 140 pipelineId := chi.URLParam(r, "pipeline") 141 if pipelineId == "" { 142 l.Error("empty pipeline ID") 143 return 144 } 145 146 workflow := chi.URLParam(r, "workflow") 147 if workflow == "" { 148 l.Error("empty workflow name") 149 return 150 } 151 152 ps, err := db.GetPipelineStatuses( 153 p.db, 154 1, 155 orm.FilterEq("p.repo_did", f.RepoDid), 156 orm.FilterEq("p.id", pipelineId), 157 ) 158 if err != nil { 159 l.Error("failed to query db", "err", err) 160 return 161 } 162 163 if len(ps) != 1 { 164 l.Error("invalid number of pipelines", "len", len(ps)) 165 return 166 } 167 168 singlePipeline := ps[0] 169 170 p.pages.Workflow(w, pages.WorkflowParams{ 171 BaseParams: pages.BaseParamsFromContext(r.Context()), 172 RepoInfo: p.repoResolver.GetRepoInfo(r, user), 173 Pipeline: singlePipeline, 174 Workflow: workflow, 175 }) 176} 177 178var upgrader = websocket.Upgrader{ 179 ReadBufferSize: 1024, 180 WriteBufferSize: 1024, 181} 182 183func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) { 184 l := p.logger.With("handler", "logs") 185 186 f, err := p.repoResolver.Resolve(r) 187 if err != nil { 188 l.Error("failed to get repo and knot", "err", err) 189 http.Error(w, "bad repo/knot", http.StatusBadRequest) 190 return 191 } 192 193 pipelineId := chi.URLParam(r, "pipeline") 194 workflow := chi.URLParam(r, "workflow") 195 if pipelineId == "" || workflow == "" { 196 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 197 return 198 } 199 200 ps, err := db.GetPipelineStatuses( 201 p.db, 202 1, 203 orm.FilterEq("p.repo_did", f.RepoDid), 204 orm.FilterEq("p.id", pipelineId), 205 ) 206 if err != nil || len(ps) != 1 { 207 l.Error("pipeline query failed", "err", err, "count", len(ps)) 208 http.Error(w, "pipeline not found", http.StatusNotFound) 209 return 210 } 211 212 singlePipeline := ps[0] 213 spindle := f.Spindle 214 knot := f.Knot 215 rkey := singlePipeline.Rkey 216 217 statusCh := p.pipelineNotifier.Subscribe(singlePipeline.AtUri()) 218 defer p.pipelineNotifier.Unsubscribe(singlePipeline.AtUri(), statusCh) 219 220 if spindle == "" || knot == "" || rkey == "" { 221 http.Error(w, "invalid repo info", http.StatusBadRequest) 222 return 223 } 224 225 url := SpindleURL(p.config.Core.Dev, spindle, knot, rkey, workflow) 226 l = l.With("url", url) 227 228 clientConn, err := upgrader.Upgrade(w, r, nil) 229 if err != nil { 230 l.Error("websocket upgrade failed", "err", err) 231 return 232 } 233 defer func() { 234 _ = clientConn.WriteControl( 235 websocket.CloseMessage, 236 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 237 time.Now().Add(time.Second), 238 ) 239 clientConn.Close() 240 }() 241 242 ctx, cancel := context.WithCancel(r.Context()) 243 defer cancel() 244 245 l.Info("logs endpoint hit") 246 247 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil) 248 if err != nil { 249 l.Error("websocket dial failed", "err", err) 250 return 251 } 252 defer spindleConn.Close() 253 254 // create a channel for incoming messages 255 evChan := make(chan LogEvent, 100) 256 // start a goroutine to read from spindle 257 go ReadLogs(spindleConn, evChan) 258 259 stepStartTimes := make(map[int]time.Time) 260 stepAnsi := make(map[int]*ansiState) 261 var fragment bytes.Buffer 262 for { 263 select { 264 case <-ctx.Done(): 265 l.Info("client disconnected") 266 return 267 268 case ev, ok := <-evChan: 269 if !ok { 270 continue 271 } 272 273 if ev.Err != nil && ev.IsCloseError() { 274 l.Debug("graceful shutdown, tail complete", "err", err) 275 return 276 } 277 if ev.Err != nil { 278 l.Error("error reading from spindle", "err", err) 279 return 280 } 281 282 var logLine spindlemodel.LogLine 283 if err = json.Unmarshal(ev.Msg, &logLine); err != nil { 284 l.Error("failed to parse logline", "err", err) 285 continue 286 } 287 288 fragment.Reset() 289 290 switch logLine.Kind { 291 case spindlemodel.LogKindControl: 292 switch logLine.StepStatus { 293 case spindlemodel.StepStatusStart: 294 stepStartTimes[logLine.StepId] = logLine.Time 295 collapsed := false 296 if logLine.StepKind == spindlemodel.StepKindSystem { 297 collapsed = true 298 } 299 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{ 300 Id: logLine.StepId, 301 Name: logLine.Content, 302 Command: logLine.StepCommand, 303 Collapsed: collapsed, 304 StartTime: logLine.Time, 305 }) 306 case spindlemodel.StepStatusEnd: 307 startTime := stepStartTimes[logLine.StepId] 308 endTime := logLine.Time 309 err = p.pages.LogBlockEnd(&fragment, pages.LogBlockEndParams{ 310 Id: logLine.StepId, 311 StartTime: startTime, 312 EndTime: endTime, 313 }) 314 } 315 316 case spindlemodel.LogKindData: 317 ansi, ok := stepAnsi[logLine.StepId] 318 if !ok { 319 ansi = NewAnsiState() 320 stepAnsi[logLine.StepId] = ansi 321 } 322 err = p.pages.LogLine(&fragment, pages.LogLineParams{ 323 Id: logLine.StepId, 324 Content: ansi.Render(logLine.Content), 325 }) 326 } 327 if err != nil { 328 l.Error("failed to render log line", "err", err) 329 return 330 } 331 332 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil { 333 l.Error("error writing to client", "err", err) 334 return 335 } 336 337 case _, ok := <-statusCh: 338 if !ok { 339 continue 340 } 341 fresh, err := db.GetPipelineStatuses( 342 p.db, 343 1, 344 orm.FilterEq("p.repo_did", f.RepoDid), 345 orm.FilterEq("p.id", pipelineId), 346 ) 347 if err != nil || len(fresh) == 0 { 348 continue 349 } 350 for name, ws := range fresh[0].Statuses { 351 fragment.Reset() 352 if err = p.pages.WorkflowSymbolOOB(&fragment, pages.WorkflowSymbolOOBParams{ 353 Name: name, 354 Statuses: ws, 355 }); err != nil { 356 l.Error("failed to render workflow symbol OOB", "err", err) 357 continue 358 } 359 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil { 360 l.Error("error writing workflow symbol to client", "err", err) 361 return 362 } 363 } 364 365 case <-time.After(30 * time.Second): 366 l.Debug("sent keepalive") 367 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 368 l.Error("failed to write control", "err", err) 369 return 370 } 371 } 372 } 373} 374 375func (p *Pipelines) Cancel(w http.ResponseWriter, r *http.Request) { 376 l := p.logger.With("handler", "Cancel") 377 378 var ( 379 pipelineId = chi.URLParam(r, "pipeline") 380 workflow = chi.URLParam(r, "workflow") 381 ) 382 if pipelineId == "" || workflow == "" { 383 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 384 return 385 } 386 387 f, err := p.repoResolver.Resolve(r) 388 if err != nil { 389 l.Error("failed to get repo and knot", "err", err) 390 http.Error(w, "bad repo/knot", http.StatusBadRequest) 391 return 392 } 393 394 pipeline, err := func() (models.Pipeline, error) { 395 ps, err := db.GetPipelineStatuses( 396 p.db, 397 1, 398 orm.FilterEq("p.repo_did", f.RepoDid), 399 orm.FilterEq("p.id", pipelineId), 400 ) 401 if err != nil { 402 return models.Pipeline{}, err 403 } 404 if len(ps) != 1 { 405 return models.Pipeline{}, fmt.Errorf("wrong pipeline count %d", len(ps)) 406 } 407 return ps[0], nil 408 }() 409 if err != nil { 410 l.Error("pipeline query failed", "err", err) 411 http.Error(w, "pipeline not found", http.StatusNotFound) 412 } 413 var ( 414 spindle = f.Spindle 415 knot = f.Knot 416 rkey = pipeline.Rkey 417 ) 418 419 if spindle == "" || knot == "" || rkey == "" { 420 http.Error(w, "invalid repo info", http.StatusBadRequest) 421 return 422 } 423 424 spindleClient, err := p.oauth.ServiceClient( 425 r, 426 oauth.WithService(f.Spindle), 427 oauth.WithLxm(tangled.PipelineCancelPipelineNSID), 428 oauth.WithDev(p.config.Core.Dev), 429 oauth.WithTimeout(time.Second*30), // workflow cleanup usually takes time 430 ) 431 432 err = tangled.PipelineCancelPipeline( 433 r.Context(), 434 spindleClient, 435 &tangled.PipelineCancelPipeline_Input{ 436 Repo: string(f.RepoAt()), 437 Pipeline: pipeline.AtUri().String(), 438 Workflow: workflow, 439 }, 440 ) 441 errorId := "workflow-error" 442 if err != nil { 443 l.Error("failed to cancel workflow", "err", err) 444 p.pages.Notice(w, errorId, "Failed to cancel workflow") 445 return 446 } 447 l.Debug("canceled pipeline", "uri", pipeline.AtUri()) 448}