Monorepo for Tangled
tangled.org
1package pipelines
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "time"
11
12 "tangled.org/core/api/tangled"
13 "tangled.org/core/appview/config"
14 "tangled.org/core/appview/db"
15 "tangled.org/core/appview/middleware"
16 "tangled.org/core/appview/models"
17 "tangled.org/core/appview/oauth"
18 "tangled.org/core/appview/pages"
19 "tangled.org/core/appview/reporesolver"
20 "tangled.org/core/eventconsumer"
21 "tangled.org/core/idresolver"
22 "tangled.org/core/orm"
23 "tangled.org/core/rbac"
24 spindlemodel "tangled.org/core/spindle/models"
25
26 "github.com/go-chi/chi/v5"
27 "github.com/gorilla/websocket"
28)
29
30type Pipelines struct {
31 repoResolver *reporesolver.RepoResolver
32 idResolver *idresolver.Resolver
33 config *config.Config
34 oauth *oauth.OAuth
35 pages *pages.Pages
36 spindlestream *eventconsumer.Consumer
37 pipelineNotifier *StatusNotifier
38 db *db.DB
39 enforcer *rbac.Enforcer
40 logger *slog.Logger
41}
42
43func (p *Pipelines) Router(mw *middleware.Middleware) http.Handler {
44 r := chi.NewRouter()
45 r.Get("/", p.Index)
46 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow)
47 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs)
48 r.
49 With(mw.RepoPermissionMiddleware("repo:owner")).
50 Post("/{pipeline}/workflow/{workflow}/cancel", p.Cancel)
51
52 return r
53}
54
55func New(
56 oauth *oauth.OAuth,
57 repoResolver *reporesolver.RepoResolver,
58 pages *pages.Pages,
59 spindlestream *eventconsumer.Consumer,
60 pipelineNotifier *StatusNotifier,
61 idResolver *idresolver.Resolver,
62 db *db.DB,
63 config *config.Config,
64 enforcer *rbac.Enforcer,
65 logger *slog.Logger,
66) *Pipelines {
67 return &Pipelines{
68 oauth: oauth,
69 repoResolver: repoResolver,
70 pages: pages,
71 idResolver: idResolver,
72 config: config,
73 spindlestream: spindlestream,
74 pipelineNotifier: pipelineNotifier,
75 db: db,
76 enforcer: enforcer,
77 logger: logger,
78 }
79}
80
81func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) {
82 user := p.oauth.GetMultiAccountUser(r)
83 l := p.logger.With("handler", "Index")
84
85 f, err := p.repoResolver.Resolve(r)
86 if err != nil {
87 l.Error("failed to get repo and knot", "err", err)
88 return
89 }
90
91 filterKind := r.URL.Query().Get("trigger")
92 filters := []orm.Filter{
93 orm.FilterEq("p.repo_did", f.RepoDid),
94 }
95 switch filterKind {
96 case "push":
97 filters = append(filters, orm.FilterEq("t.kind", "push"))
98 case "pull_request":
99 filters = append(filters, orm.FilterEq("t.kind", "pull_request"))
100 default:
101 // no filters otherwise, default to "all"
102 filterKind = "all"
103 }
104
105 ps, err := db.GetPipelineStatuses(
106 p.db,
107 30,
108 filters...,
109 )
110 if err != nil {
111 l.Error("failed to query db", "err", err)
112 return
113 }
114
115 total, err := db.GetPipelineCount(p.db, filters...)
116 if err != nil {
117 l.Error("failed to query db", "err", err)
118 return
119 }
120
121 p.pages.Pipelines(w, pages.PipelinesParams{
122 BaseParams: pages.BaseParamsFromContext(r.Context()),
123 RepoInfo: p.repoResolver.GetRepoInfo(r, user),
124 Pipelines: ps,
125 FilterKind: filterKind,
126 Total: total,
127 })
128}
129
130func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) {
131 user := p.oauth.GetMultiAccountUser(r)
132 l := p.logger.With("handler", "Workflow")
133
134 f, err := p.repoResolver.Resolve(r)
135 if err != nil {
136 l.Error("failed to get repo and knot", "err", err)
137 return
138 }
139
140 pipelineId := chi.URLParam(r, "pipeline")
141 if pipelineId == "" {
142 l.Error("empty pipeline ID")
143 return
144 }
145
146 workflow := chi.URLParam(r, "workflow")
147 if workflow == "" {
148 l.Error("empty workflow name")
149 return
150 }
151
152 ps, err := db.GetPipelineStatuses(
153 p.db,
154 1,
155 orm.FilterEq("p.repo_did", f.RepoDid),
156 orm.FilterEq("p.id", pipelineId),
157 )
158 if err != nil {
159 l.Error("failed to query db", "err", err)
160 return
161 }
162
163 if len(ps) != 1 {
164 l.Error("invalid number of pipelines", "len", len(ps))
165 return
166 }
167
168 singlePipeline := ps[0]
169
170 p.pages.Workflow(w, pages.WorkflowParams{
171 BaseParams: pages.BaseParamsFromContext(r.Context()),
172 RepoInfo: p.repoResolver.GetRepoInfo(r, user),
173 Pipeline: singlePipeline,
174 Workflow: workflow,
175 })
176}
177
178var upgrader = websocket.Upgrader{
179 ReadBufferSize: 1024,
180 WriteBufferSize: 1024,
181}
182
183func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) {
184 l := p.logger.With("handler", "logs")
185
186 f, err := p.repoResolver.Resolve(r)
187 if err != nil {
188 l.Error("failed to get repo and knot", "err", err)
189 http.Error(w, "bad repo/knot", http.StatusBadRequest)
190 return
191 }
192
193 pipelineId := chi.URLParam(r, "pipeline")
194 workflow := chi.URLParam(r, "workflow")
195 if pipelineId == "" || workflow == "" {
196 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
197 return
198 }
199
200 ps, err := db.GetPipelineStatuses(
201 p.db,
202 1,
203 orm.FilterEq("p.repo_did", f.RepoDid),
204 orm.FilterEq("p.id", pipelineId),
205 )
206 if err != nil || len(ps) != 1 {
207 l.Error("pipeline query failed", "err", err, "count", len(ps))
208 http.Error(w, "pipeline not found", http.StatusNotFound)
209 return
210 }
211
212 singlePipeline := ps[0]
213 spindle := f.Spindle
214 knot := f.Knot
215 rkey := singlePipeline.Rkey
216
217 statusCh := p.pipelineNotifier.Subscribe(singlePipeline.AtUri())
218 defer p.pipelineNotifier.Unsubscribe(singlePipeline.AtUri(), statusCh)
219
220 if spindle == "" || knot == "" || rkey == "" {
221 http.Error(w, "invalid repo info", http.StatusBadRequest)
222 return
223 }
224
225 url := SpindleURL(p.config.Core.Dev, spindle, knot, rkey, workflow)
226 l = l.With("url", url)
227
228 clientConn, err := upgrader.Upgrade(w, r, nil)
229 if err != nil {
230 l.Error("websocket upgrade failed", "err", err)
231 return
232 }
233 defer func() {
234 _ = clientConn.WriteControl(
235 websocket.CloseMessage,
236 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
237 time.Now().Add(time.Second),
238 )
239 clientConn.Close()
240 }()
241
242 ctx, cancel := context.WithCancel(r.Context())
243 defer cancel()
244
245 l.Info("logs endpoint hit")
246
247 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil)
248 if err != nil {
249 l.Error("websocket dial failed", "err", err)
250 return
251 }
252 defer spindleConn.Close()
253
254 // create a channel for incoming messages
255 evChan := make(chan LogEvent, 100)
256 // start a goroutine to read from spindle
257 go ReadLogs(spindleConn, evChan)
258
259 stepStartTimes := make(map[int]time.Time)
260 stepAnsi := make(map[int]*ansiState)
261 var fragment bytes.Buffer
262 for {
263 select {
264 case <-ctx.Done():
265 l.Info("client disconnected")
266 return
267
268 case ev, ok := <-evChan:
269 if !ok {
270 continue
271 }
272
273 if ev.Err != nil && ev.IsCloseError() {
274 l.Debug("graceful shutdown, tail complete", "err", err)
275 return
276 }
277 if ev.Err != nil {
278 l.Error("error reading from spindle", "err", err)
279 return
280 }
281
282 var logLine spindlemodel.LogLine
283 if err = json.Unmarshal(ev.Msg, &logLine); err != nil {
284 l.Error("failed to parse logline", "err", err)
285 continue
286 }
287
288 fragment.Reset()
289
290 switch logLine.Kind {
291 case spindlemodel.LogKindControl:
292 switch logLine.StepStatus {
293 case spindlemodel.StepStatusStart:
294 stepStartTimes[logLine.StepId] = logLine.Time
295 collapsed := false
296 if logLine.StepKind == spindlemodel.StepKindSystem {
297 collapsed = true
298 }
299 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{
300 Id: logLine.StepId,
301 Name: logLine.Content,
302 Command: logLine.StepCommand,
303 Collapsed: collapsed,
304 StartTime: logLine.Time,
305 })
306 case spindlemodel.StepStatusEnd:
307 startTime := stepStartTimes[logLine.StepId]
308 endTime := logLine.Time
309 err = p.pages.LogBlockEnd(&fragment, pages.LogBlockEndParams{
310 Id: logLine.StepId,
311 StartTime: startTime,
312 EndTime: endTime,
313 })
314 }
315
316 case spindlemodel.LogKindData:
317 ansi, ok := stepAnsi[logLine.StepId]
318 if !ok {
319 ansi = NewAnsiState()
320 stepAnsi[logLine.StepId] = ansi
321 }
322 err = p.pages.LogLine(&fragment, pages.LogLineParams{
323 Id: logLine.StepId,
324 Content: ansi.Render(logLine.Content),
325 })
326 }
327 if err != nil {
328 l.Error("failed to render log line", "err", err)
329 return
330 }
331
332 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil {
333 l.Error("error writing to client", "err", err)
334 return
335 }
336
337 case _, ok := <-statusCh:
338 if !ok {
339 continue
340 }
341 fresh, err := db.GetPipelineStatuses(
342 p.db,
343 1,
344 orm.FilterEq("p.repo_did", f.RepoDid),
345 orm.FilterEq("p.id", pipelineId),
346 )
347 if err != nil || len(fresh) == 0 {
348 continue
349 }
350 for name, ws := range fresh[0].Statuses {
351 fragment.Reset()
352 if err = p.pages.WorkflowSymbolOOB(&fragment, pages.WorkflowSymbolOOBParams{
353 Name: name,
354 Statuses: ws,
355 }); err != nil {
356 l.Error("failed to render workflow symbol OOB", "err", err)
357 continue
358 }
359 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil {
360 l.Error("error writing workflow symbol to client", "err", err)
361 return
362 }
363 }
364
365 case <-time.After(30 * time.Second):
366 l.Debug("sent keepalive")
367 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
368 l.Error("failed to write control", "err", err)
369 return
370 }
371 }
372 }
373}
374
375func (p *Pipelines) Cancel(w http.ResponseWriter, r *http.Request) {
376 l := p.logger.With("handler", "Cancel")
377
378 var (
379 pipelineId = chi.URLParam(r, "pipeline")
380 workflow = chi.URLParam(r, "workflow")
381 )
382 if pipelineId == "" || workflow == "" {
383 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
384 return
385 }
386
387 f, err := p.repoResolver.Resolve(r)
388 if err != nil {
389 l.Error("failed to get repo and knot", "err", err)
390 http.Error(w, "bad repo/knot", http.StatusBadRequest)
391 return
392 }
393
394 pipeline, err := func() (models.Pipeline, error) {
395 ps, err := db.GetPipelineStatuses(
396 p.db,
397 1,
398 orm.FilterEq("p.repo_did", f.RepoDid),
399 orm.FilterEq("p.id", pipelineId),
400 )
401 if err != nil {
402 return models.Pipeline{}, err
403 }
404 if len(ps) != 1 {
405 return models.Pipeline{}, fmt.Errorf("wrong pipeline count %d", len(ps))
406 }
407 return ps[0], nil
408 }()
409 if err != nil {
410 l.Error("pipeline query failed", "err", err)
411 http.Error(w, "pipeline not found", http.StatusNotFound)
412 }
413 var (
414 spindle = f.Spindle
415 knot = f.Knot
416 rkey = pipeline.Rkey
417 )
418
419 if spindle == "" || knot == "" || rkey == "" {
420 http.Error(w, "invalid repo info", http.StatusBadRequest)
421 return
422 }
423
424 spindleClient, err := p.oauth.ServiceClient(
425 r,
426 oauth.WithService(f.Spindle),
427 oauth.WithLxm(tangled.PipelineCancelPipelineNSID),
428 oauth.WithDev(p.config.Core.Dev),
429 oauth.WithTimeout(time.Second*30), // workflow cleanup usually takes time
430 )
431
432 err = tangled.PipelineCancelPipeline(
433 r.Context(),
434 spindleClient,
435 &tangled.PipelineCancelPipeline_Input{
436 Repo: string(f.RepoAt()),
437 Pipeline: pipeline.AtUri().String(),
438 Workflow: workflow,
439 },
440 )
441 errorId := "workflow-error"
442 if err != nil {
443 l.Error("failed to cancel workflow", "err", err)
444 p.pages.Notice(w, errorId, "Failed to cancel workflow")
445 return
446 }
447 l.Debug("canceled pipeline", "uri", pipeline.AtUri())
448}