Monorepo for Tangled tangled.org
4

Configure Feed

Select the types of activity you want to include in your feed.

1package pipelines 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "time" 11 12 "tangled.org/core/api/tangled" 13 "tangled.org/core/appview/config" 14 "tangled.org/core/appview/db" 15 "tangled.org/core/appview/middleware" 16 "tangled.org/core/appview/models" 17 "tangled.org/core/appview/oauth" 18 "tangled.org/core/appview/pages" 19 "tangled.org/core/appview/reporesolver" 20 "tangled.org/core/eventconsumer" 21 "tangled.org/core/hostutil" 22 "tangled.org/core/idresolver" 23 "tangled.org/core/orm" 24 "tangled.org/core/rbac" 25 spindlemodel "tangled.org/core/spindle/models" 26 27 "github.com/go-chi/chi/v5" 28 "github.com/gorilla/websocket" 29) 30 31type Pipelines struct { 32 repoResolver *reporesolver.RepoResolver 33 idResolver *idresolver.Resolver 34 config *config.Config 35 oauth *oauth.OAuth 36 pages *pages.Pages 37 spindlestream *eventconsumer.Consumer 38 pipelineNotifier *StatusNotifier 39 db *db.DB 40 enforcer *rbac.Enforcer 41 logger *slog.Logger 42} 43 44func (p *Pipelines) Router(mw *middleware.Middleware) http.Handler { 45 r := chi.NewRouter() 46 r.Get("/", p.Index) 47 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow) 48 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs) 49 r. 50 With(mw.RepoPermissionMiddleware("repo:owner")). 51 Post("/{pipeline}/workflow/{workflow}/cancel", p.Cancel) 52 53 return r 54} 55 56func New( 57 oauth *oauth.OAuth, 58 repoResolver *reporesolver.RepoResolver, 59 pages *pages.Pages, 60 spindlestream *eventconsumer.Consumer, 61 pipelineNotifier *StatusNotifier, 62 idResolver *idresolver.Resolver, 63 db *db.DB, 64 config *config.Config, 65 enforcer *rbac.Enforcer, 66 logger *slog.Logger, 67) *Pipelines { 68 return &Pipelines{ 69 oauth: oauth, 70 repoResolver: repoResolver, 71 pages: pages, 72 idResolver: idResolver, 73 config: config, 74 spindlestream: spindlestream, 75 pipelineNotifier: pipelineNotifier, 76 db: db, 77 enforcer: enforcer, 78 logger: logger, 79 } 80} 81 82func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) { 83 user := p.oauth.GetMultiAccountUser(r) 84 l := p.logger.With("handler", "Index") 85 86 f, err := p.repoResolver.Resolve(r) 87 if err != nil { 88 l.Error("failed to get repo and knot", "err", err) 89 return 90 } 91 92 filterKind := r.URL.Query().Get("trigger") 93 filters := []orm.Filter{ 94 orm.FilterEq("p.repo_did", f.RepoDid), 95 } 96 switch filterKind { 97 case "push": 98 filters = append(filters, orm.FilterEq("t.kind", "push")) 99 case "pull_request": 100 filters = append(filters, orm.FilterEq("t.kind", "pull_request")) 101 default: 102 // no filters otherwise, default to "all" 103 filterKind = "all" 104 } 105 106 ps, err := db.GetPipelineStatuses( 107 p.db, 108 30, 109 filters..., 110 ) 111 if err != nil { 112 l.Error("failed to query db", "err", err) 113 return 114 } 115 116 total, err := db.GetPipelineCount(p.db, filters...) 117 if err != nil { 118 l.Error("failed to query db", "err", err) 119 return 120 } 121 122 p.pages.Pipelines(w, pages.PipelinesParams{ 123 BaseParams: pages.BaseParamsFromContext(r.Context()), 124 RepoInfo: p.repoResolver.GetRepoInfo(r, user), 125 Pipelines: ps, 126 FilterKind: filterKind, 127 Total: total, 128 }) 129} 130 131func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) { 132 user := p.oauth.GetMultiAccountUser(r) 133 l := p.logger.With("handler", "Workflow") 134 135 f, err := p.repoResolver.Resolve(r) 136 if err != nil { 137 l.Error("failed to get repo and knot", "err", err) 138 return 139 } 140 141 pipelineId := chi.URLParam(r, "pipeline") 142 if pipelineId == "" { 143 l.Error("empty pipeline ID") 144 return 145 } 146 147 workflow := chi.URLParam(r, "workflow") 148 if workflow == "" { 149 l.Error("empty workflow name") 150 return 151 } 152 153 ps, err := db.GetPipelineStatuses( 154 p.db, 155 1, 156 orm.FilterEq("p.repo_did", f.RepoDid), 157 orm.FilterEq("p.id", pipelineId), 158 ) 159 if err != nil { 160 l.Error("failed to query db", "err", err) 161 return 162 } 163 164 if len(ps) != 1 { 165 l.Error("invalid number of pipelines", "len", len(ps)) 166 return 167 } 168 169 singlePipeline := ps[0] 170 171 p.pages.Workflow(w, pages.WorkflowParams{ 172 BaseParams: pages.BaseParamsFromContext(r.Context()), 173 RepoInfo: p.repoResolver.GetRepoInfo(r, user), 174 Pipeline: singlePipeline, 175 Workflow: workflow, 176 }) 177} 178 179var upgrader = websocket.Upgrader{ 180 ReadBufferSize: 1024, 181 WriteBufferSize: 1024, 182} 183 184func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) { 185 l := p.logger.With("handler", "logs") 186 187 f, err := p.repoResolver.Resolve(r) 188 if err != nil { 189 l.Error("failed to get repo and knot", "err", err) 190 http.Error(w, "bad repo/knot", http.StatusBadRequest) 191 return 192 } 193 194 pipelineId := chi.URLParam(r, "pipeline") 195 workflow := chi.URLParam(r, "workflow") 196 if pipelineId == "" || workflow == "" { 197 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 198 return 199 } 200 201 ps, err := db.GetPipelineStatuses( 202 p.db, 203 1, 204 orm.FilterEq("p.repo_did", f.RepoDid), 205 orm.FilterEq("p.id", pipelineId), 206 ) 207 if err != nil || len(ps) != 1 { 208 l.Error("pipeline query failed", "err", err, "count", len(ps)) 209 http.Error(w, "pipeline not found", http.StatusNotFound) 210 return 211 } 212 213 singlePipeline := ps[0] 214 spindle := f.Spindle 215 knot := f.Knot 216 rkey := singlePipeline.Rkey 217 218 statusCh := p.pipelineNotifier.Subscribe(singlePipeline.AtUri()) 219 defer p.pipelineNotifier.Unsubscribe(singlePipeline.AtUri(), statusCh) 220 221 if spindle == "" || knot == "" || rkey == "" { 222 http.Error(w, "invalid repo info", http.StatusBadRequest) 223 return 224 } 225 226 url := SpindleURL(spindle, knot, rkey, workflow) 227 if url == "" { 228 http.Error(w, "invalid spindle hostname", http.StatusBadRequest) 229 return 230 } 231 l = l.With("url", url) 232 233 clientConn, err := upgrader.Upgrade(w, r, nil) 234 if err != nil { 235 l.Error("websocket upgrade failed", "err", err) 236 return 237 } 238 defer func() { 239 _ = clientConn.WriteControl( 240 websocket.CloseMessage, 241 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 242 time.Now().Add(time.Second), 243 ) 244 clientConn.Close() 245 }() 246 247 ctx, cancel := context.WithCancel(r.Context()) 248 defer cancel() 249 250 l.Info("logs endpoint hit") 251 252 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil) 253 if err != nil { 254 l.Error("websocket dial failed", "err", err) 255 return 256 } 257 defer spindleConn.Close() 258 259 // create a channel for incoming messages 260 evChan := make(chan LogEvent, 100) 261 // start a goroutine to read from spindle 262 go ReadLogs(spindleConn, evChan) 263 264 stepStartTimes := make(map[int]time.Time) 265 stepAnsi := make(map[int]*ansiState) 266 var fragment bytes.Buffer 267 for { 268 select { 269 case <-ctx.Done(): 270 l.Info("client disconnected") 271 return 272 273 case ev, ok := <-evChan: 274 if !ok { 275 continue 276 } 277 278 if ev.Err != nil && ev.IsCloseError() { 279 l.Debug("graceful shutdown, tail complete", "err", err) 280 return 281 } 282 if ev.Err != nil { 283 l.Error("error reading from spindle", "err", err) 284 return 285 } 286 287 var logLine spindlemodel.LogLine 288 if err = json.Unmarshal(ev.Msg, &logLine); err != nil { 289 l.Error("failed to parse logline", "err", err) 290 continue 291 } 292 293 fragment.Reset() 294 295 switch logLine.Kind { 296 case spindlemodel.LogKindControl: 297 switch logLine.StepStatus { 298 case spindlemodel.StepStatusStart: 299 stepStartTimes[logLine.StepId] = logLine.Time 300 collapsed := false 301 if logLine.StepKind == spindlemodel.StepKindSystem { 302 collapsed = true 303 } 304 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{ 305 Id: logLine.StepId, 306 Name: logLine.Content, 307 Command: logLine.StepCommand, 308 Collapsed: collapsed, 309 StartTime: logLine.Time, 310 }) 311 case spindlemodel.StepStatusEnd: 312 startTime := stepStartTimes[logLine.StepId] 313 endTime := logLine.Time 314 err = p.pages.LogBlockEnd(&fragment, pages.LogBlockEndParams{ 315 Id: logLine.StepId, 316 StartTime: startTime, 317 EndTime: endTime, 318 }) 319 } 320 321 case spindlemodel.LogKindData: 322 ansi, ok := stepAnsi[logLine.StepId] 323 if !ok { 324 ansi = NewAnsiState() 325 stepAnsi[logLine.StepId] = ansi 326 } 327 err = p.pages.LogLine(&fragment, pages.LogLineParams{ 328 Id: logLine.StepId, 329 Content: ansi.Render(logLine.Content), 330 }) 331 } 332 if err != nil { 333 l.Error("failed to render log line", "err", err) 334 return 335 } 336 337 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil { 338 l.Error("error writing to client", "err", err) 339 return 340 } 341 342 case _, ok := <-statusCh: 343 if !ok { 344 continue 345 } 346 fresh, err := db.GetPipelineStatuses( 347 p.db, 348 1, 349 orm.FilterEq("p.repo_did", f.RepoDid), 350 orm.FilterEq("p.id", pipelineId), 351 ) 352 if err != nil || len(fresh) == 0 { 353 continue 354 } 355 for name, ws := range fresh[0].Statuses { 356 fragment.Reset() 357 if err = p.pages.WorkflowSymbolOOB(&fragment, pages.WorkflowSymbolOOBParams{ 358 Name: name, 359 Statuses: ws, 360 }); err != nil { 361 l.Error("failed to render workflow symbol OOB", "err", err) 362 continue 363 } 364 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil { 365 l.Error("error writing workflow symbol to client", "err", err) 366 return 367 } 368 } 369 370 case <-time.After(30 * time.Second): 371 l.Debug("sent keepalive") 372 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 373 l.Error("failed to write control", "err", err) 374 return 375 } 376 } 377 } 378} 379 380func (p *Pipelines) Cancel(w http.ResponseWriter, r *http.Request) { 381 l := p.logger.With("handler", "Cancel") 382 383 var ( 384 pipelineId = chi.URLParam(r, "pipeline") 385 workflow = chi.URLParam(r, "workflow") 386 ) 387 if pipelineId == "" || workflow == "" { 388 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 389 return 390 } 391 392 f, err := p.repoResolver.Resolve(r) 393 if err != nil { 394 l.Error("failed to get repo and knot", "err", err) 395 http.Error(w, "bad repo/knot", http.StatusBadRequest) 396 return 397 } 398 399 pipeline, err := func() (models.Pipeline, error) { 400 ps, err := db.GetPipelineStatuses( 401 p.db, 402 1, 403 orm.FilterEq("p.repo_did", f.RepoDid), 404 orm.FilterEq("p.id", pipelineId), 405 ) 406 if err != nil { 407 return models.Pipeline{}, err 408 } 409 if len(ps) != 1 { 410 return models.Pipeline{}, fmt.Errorf("wrong pipeline count %d", len(ps)) 411 } 412 return ps[0], nil 413 }() 414 if err != nil { 415 l.Error("pipeline query failed", "err", err) 416 http.Error(w, "pipeline not found", http.StatusNotFound) 417 } 418 var ( 419 spindle = f.Spindle 420 knot = f.Knot 421 rkey = pipeline.Rkey 422 ) 423 424 if spindle == "" || knot == "" || rkey == "" { 425 http.Error(w, "invalid repo info", http.StatusBadRequest) 426 return 427 } 428 429 hostname, noTLS, err := hostutil.ParseHostname(spindle) 430 if err != nil { 431 http.Error(w, "invalid spindle hostname", http.StatusBadRequest) 432 return 433 } 434 435 spindleClient, err := p.oauth.ServiceClient( 436 r, 437 oauth.WithService(hostname), 438 oauth.WithLxm(tangled.PipelineCancelPipelineNSID), 439 oauth.WithDev(noTLS), 440 oauth.WithTimeout(time.Second*30), // workflow cleanup usually takes time 441 ) 442 443 err = tangled.PipelineCancelPipeline( 444 r.Context(), 445 spindleClient, 446 &tangled.PipelineCancelPipeline_Input{ 447 Repo: string(f.RepoAt()), 448 Pipeline: pipeline.AtUri().String(), 449 Workflow: workflow, 450 }, 451 ) 452 errorId := "workflow-error" 453 if err != nil { 454 l.Error("failed to cancel workflow", "err", err) 455 p.pages.Notice(w, errorId, "Failed to cancel workflow") 456 return 457 } 458 l.Debug("canceled pipeline", "uri", pipeline.AtUri()) 459}