Monorepo for Tangled
tangled.org
1package pipelines
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "time"
11
12 "tangled.org/core/api/tangled"
13 "tangled.org/core/appview/config"
14 "tangled.org/core/appview/db"
15 "tangled.org/core/appview/middleware"
16 "tangled.org/core/appview/models"
17 "tangled.org/core/appview/oauth"
18 "tangled.org/core/appview/pages"
19 "tangled.org/core/appview/reporesolver"
20 "tangled.org/core/eventconsumer"
21 "tangled.org/core/hostutil"
22 "tangled.org/core/idresolver"
23 "tangled.org/core/orm"
24 "tangled.org/core/rbac"
25 spindlemodel "tangled.org/core/spindle/models"
26
27 "github.com/go-chi/chi/v5"
28 "github.com/gorilla/websocket"
29)
30
31type Pipelines struct {
32 repoResolver *reporesolver.RepoResolver
33 idResolver *idresolver.Resolver
34 config *config.Config
35 oauth *oauth.OAuth
36 pages *pages.Pages
37 spindlestream *eventconsumer.Consumer
38 pipelineNotifier *StatusNotifier
39 db *db.DB
40 enforcer *rbac.Enforcer
41 logger *slog.Logger
42}
43
44func (p *Pipelines) Router(mw *middleware.Middleware) http.Handler {
45 r := chi.NewRouter()
46 r.Get("/", p.Index)
47 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow)
48 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs)
49 r.
50 With(mw.RepoPermissionMiddleware("repo:owner")).
51 Post("/{pipeline}/workflow/{workflow}/cancel", p.Cancel)
52
53 return r
54}
55
56func New(
57 oauth *oauth.OAuth,
58 repoResolver *reporesolver.RepoResolver,
59 pages *pages.Pages,
60 spindlestream *eventconsumer.Consumer,
61 pipelineNotifier *StatusNotifier,
62 idResolver *idresolver.Resolver,
63 db *db.DB,
64 config *config.Config,
65 enforcer *rbac.Enforcer,
66 logger *slog.Logger,
67) *Pipelines {
68 return &Pipelines{
69 oauth: oauth,
70 repoResolver: repoResolver,
71 pages: pages,
72 idResolver: idResolver,
73 config: config,
74 spindlestream: spindlestream,
75 pipelineNotifier: pipelineNotifier,
76 db: db,
77 enforcer: enforcer,
78 logger: logger,
79 }
80}
81
82func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) {
83 user := p.oauth.GetMultiAccountUser(r)
84 l := p.logger.With("handler", "Index")
85
86 f, err := p.repoResolver.Resolve(r)
87 if err != nil {
88 l.Error("failed to get repo and knot", "err", err)
89 return
90 }
91
92 filterKind := r.URL.Query().Get("trigger")
93 filters := []orm.Filter{
94 orm.FilterEq("p.repo_did", f.RepoDid),
95 }
96 switch filterKind {
97 case "push":
98 filters = append(filters, orm.FilterEq("t.kind", "push"))
99 case "pull_request":
100 filters = append(filters, orm.FilterEq("t.kind", "pull_request"))
101 default:
102 // no filters otherwise, default to "all"
103 filterKind = "all"
104 }
105
106 ps, err := db.GetPipelineStatuses(
107 p.db,
108 30,
109 filters...,
110 )
111 if err != nil {
112 l.Error("failed to query db", "err", err)
113 return
114 }
115
116 total, err := db.GetPipelineCount(p.db, filters...)
117 if err != nil {
118 l.Error("failed to query db", "err", err)
119 return
120 }
121
122 p.pages.Pipelines(w, pages.PipelinesParams{
123 BaseParams: pages.BaseParamsFromContext(r.Context()),
124 RepoInfo: p.repoResolver.GetRepoInfo(r, user),
125 Pipelines: ps,
126 FilterKind: filterKind,
127 Total: total,
128 })
129}
130
131func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) {
132 user := p.oauth.GetMultiAccountUser(r)
133 l := p.logger.With("handler", "Workflow")
134
135 f, err := p.repoResolver.Resolve(r)
136 if err != nil {
137 l.Error("failed to get repo and knot", "err", err)
138 return
139 }
140
141 pipelineId := chi.URLParam(r, "pipeline")
142 if pipelineId == "" {
143 l.Error("empty pipeline ID")
144 return
145 }
146
147 workflow := chi.URLParam(r, "workflow")
148 if workflow == "" {
149 l.Error("empty workflow name")
150 return
151 }
152
153 ps, err := db.GetPipelineStatuses(
154 p.db,
155 1,
156 orm.FilterEq("p.repo_did", f.RepoDid),
157 orm.FilterEq("p.id", pipelineId),
158 )
159 if err != nil {
160 l.Error("failed to query db", "err", err)
161 return
162 }
163
164 if len(ps) != 1 {
165 l.Error("invalid number of pipelines", "len", len(ps))
166 return
167 }
168
169 singlePipeline := ps[0]
170
171 p.pages.Workflow(w, pages.WorkflowParams{
172 BaseParams: pages.BaseParamsFromContext(r.Context()),
173 RepoInfo: p.repoResolver.GetRepoInfo(r, user),
174 Pipeline: singlePipeline,
175 Workflow: workflow,
176 })
177}
178
179var upgrader = websocket.Upgrader{
180 ReadBufferSize: 1024,
181 WriteBufferSize: 1024,
182}
183
184func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) {
185 l := p.logger.With("handler", "logs")
186
187 f, err := p.repoResolver.Resolve(r)
188 if err != nil {
189 l.Error("failed to get repo and knot", "err", err)
190 http.Error(w, "bad repo/knot", http.StatusBadRequest)
191 return
192 }
193
194 pipelineId := chi.URLParam(r, "pipeline")
195 workflow := chi.URLParam(r, "workflow")
196 if pipelineId == "" || workflow == "" {
197 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
198 return
199 }
200
201 ps, err := db.GetPipelineStatuses(
202 p.db,
203 1,
204 orm.FilterEq("p.repo_did", f.RepoDid),
205 orm.FilterEq("p.id", pipelineId),
206 )
207 if err != nil || len(ps) != 1 {
208 l.Error("pipeline query failed", "err", err, "count", len(ps))
209 http.Error(w, "pipeline not found", http.StatusNotFound)
210 return
211 }
212
213 singlePipeline := ps[0]
214 spindle := f.Spindle
215 knot := f.Knot
216 rkey := singlePipeline.Rkey
217
218 statusCh := p.pipelineNotifier.Subscribe(singlePipeline.AtUri())
219 defer p.pipelineNotifier.Unsubscribe(singlePipeline.AtUri(), statusCh)
220
221 if spindle == "" || knot == "" || rkey == "" {
222 http.Error(w, "invalid repo info", http.StatusBadRequest)
223 return
224 }
225
226 url := SpindleURL(spindle, knot, rkey, workflow)
227 if url == "" {
228 http.Error(w, "invalid spindle hostname", http.StatusBadRequest)
229 return
230 }
231 l = l.With("url", url)
232
233 clientConn, err := upgrader.Upgrade(w, r, nil)
234 if err != nil {
235 l.Error("websocket upgrade failed", "err", err)
236 return
237 }
238 defer func() {
239 _ = clientConn.WriteControl(
240 websocket.CloseMessage,
241 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
242 time.Now().Add(time.Second),
243 )
244 clientConn.Close()
245 }()
246
247 ctx, cancel := context.WithCancel(r.Context())
248 defer cancel()
249
250 l.Info("logs endpoint hit")
251
252 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil)
253 if err != nil {
254 l.Error("websocket dial failed", "err", err)
255 return
256 }
257 defer spindleConn.Close()
258
259 // create a channel for incoming messages
260 evChan := make(chan LogEvent, 100)
261 // start a goroutine to read from spindle
262 go ReadLogs(spindleConn, evChan)
263
264 stepStartTimes := make(map[int]time.Time)
265 stepAnsi := make(map[int]*ansiState)
266 var fragment bytes.Buffer
267 for {
268 select {
269 case <-ctx.Done():
270 l.Info("client disconnected")
271 return
272
273 case ev, ok := <-evChan:
274 if !ok {
275 continue
276 }
277
278 if ev.Err != nil && ev.IsCloseError() {
279 l.Debug("graceful shutdown, tail complete", "err", err)
280 return
281 }
282 if ev.Err != nil {
283 l.Error("error reading from spindle", "err", err)
284 return
285 }
286
287 var logLine spindlemodel.LogLine
288 if err = json.Unmarshal(ev.Msg, &logLine); err != nil {
289 l.Error("failed to parse logline", "err", err)
290 continue
291 }
292
293 fragment.Reset()
294
295 switch logLine.Kind {
296 case spindlemodel.LogKindControl:
297 switch logLine.StepStatus {
298 case spindlemodel.StepStatusStart:
299 stepStartTimes[logLine.StepId] = logLine.Time
300 collapsed := false
301 if logLine.StepKind == spindlemodel.StepKindSystem {
302 collapsed = true
303 }
304 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{
305 Id: logLine.StepId,
306 Name: logLine.Content,
307 Command: logLine.StepCommand,
308 Collapsed: collapsed,
309 StartTime: logLine.Time,
310 })
311 case spindlemodel.StepStatusEnd:
312 startTime := stepStartTimes[logLine.StepId]
313 endTime := logLine.Time
314 err = p.pages.LogBlockEnd(&fragment, pages.LogBlockEndParams{
315 Id: logLine.StepId,
316 StartTime: startTime,
317 EndTime: endTime,
318 })
319 }
320
321 case spindlemodel.LogKindData:
322 ansi, ok := stepAnsi[logLine.StepId]
323 if !ok {
324 ansi = NewAnsiState()
325 stepAnsi[logLine.StepId] = ansi
326 }
327 err = p.pages.LogLine(&fragment, pages.LogLineParams{
328 Id: logLine.StepId,
329 Content: ansi.Render(logLine.Content),
330 })
331 }
332 if err != nil {
333 l.Error("failed to render log line", "err", err)
334 return
335 }
336
337 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil {
338 l.Error("error writing to client", "err", err)
339 return
340 }
341
342 case _, ok := <-statusCh:
343 if !ok {
344 continue
345 }
346 fresh, err := db.GetPipelineStatuses(
347 p.db,
348 1,
349 orm.FilterEq("p.repo_did", f.RepoDid),
350 orm.FilterEq("p.id", pipelineId),
351 )
352 if err != nil || len(fresh) == 0 {
353 continue
354 }
355 for name, ws := range fresh[0].Statuses {
356 fragment.Reset()
357 if err = p.pages.WorkflowSymbolOOB(&fragment, pages.WorkflowSymbolOOBParams{
358 Name: name,
359 Statuses: ws,
360 }); err != nil {
361 l.Error("failed to render workflow symbol OOB", "err", err)
362 continue
363 }
364 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil {
365 l.Error("error writing workflow symbol to client", "err", err)
366 return
367 }
368 }
369
370 case <-time.After(30 * time.Second):
371 l.Debug("sent keepalive")
372 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
373 l.Error("failed to write control", "err", err)
374 return
375 }
376 }
377 }
378}
379
380func (p *Pipelines) Cancel(w http.ResponseWriter, r *http.Request) {
381 l := p.logger.With("handler", "Cancel")
382
383 var (
384 pipelineId = chi.URLParam(r, "pipeline")
385 workflow = chi.URLParam(r, "workflow")
386 )
387 if pipelineId == "" || workflow == "" {
388 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
389 return
390 }
391
392 f, err := p.repoResolver.Resolve(r)
393 if err != nil {
394 l.Error("failed to get repo and knot", "err", err)
395 http.Error(w, "bad repo/knot", http.StatusBadRequest)
396 return
397 }
398
399 pipeline, err := func() (models.Pipeline, error) {
400 ps, err := db.GetPipelineStatuses(
401 p.db,
402 1,
403 orm.FilterEq("p.repo_did", f.RepoDid),
404 orm.FilterEq("p.id", pipelineId),
405 )
406 if err != nil {
407 return models.Pipeline{}, err
408 }
409 if len(ps) != 1 {
410 return models.Pipeline{}, fmt.Errorf("wrong pipeline count %d", len(ps))
411 }
412 return ps[0], nil
413 }()
414 if err != nil {
415 l.Error("pipeline query failed", "err", err)
416 http.Error(w, "pipeline not found", http.StatusNotFound)
417 }
418 var (
419 spindle = f.Spindle
420 knot = f.Knot
421 rkey = pipeline.Rkey
422 )
423
424 if spindle == "" || knot == "" || rkey == "" {
425 http.Error(w, "invalid repo info", http.StatusBadRequest)
426 return
427 }
428
429 hostname, noTLS, err := hostutil.ParseHostname(spindle)
430 if err != nil {
431 http.Error(w, "invalid spindle hostname", http.StatusBadRequest)
432 return
433 }
434
435 spindleClient, err := p.oauth.ServiceClient(
436 r,
437 oauth.WithService(hostname),
438 oauth.WithLxm(tangled.PipelineCancelPipelineNSID),
439 oauth.WithDev(noTLS),
440 oauth.WithTimeout(time.Second*30), // workflow cleanup usually takes time
441 )
442
443 err = tangled.PipelineCancelPipeline(
444 r.Context(),
445 spindleClient,
446 &tangled.PipelineCancelPipeline_Input{
447 Repo: string(f.RepoAt()),
448 Pipeline: pipeline.AtUri().String(),
449 Workflow: workflow,
450 },
451 )
452 errorId := "workflow-error"
453 if err != nil {
454 l.Error("failed to cancel workflow", "err", err)
455 p.pages.Notice(w, errorId, "Failed to cancel workflow")
456 return
457 }
458 l.Debug("canceled pipeline", "uri", pipeline.AtUri())
459}