Monorepo for Tangled
tangled.org
1package ssh
2
3import (
4 "encoding/json"
5 "fmt"
6 "strings"
7 "time"
8
9 "github.com/charmbracelet/bubbles/spinner"
10 "github.com/charmbracelet/bubbles/viewport"
11 tea "github.com/charmbracelet/bubbletea"
12 "github.com/charmbracelet/lipgloss"
13 "github.com/gorilla/websocket"
14 "tangled.org/core/appview/db"
15 "tangled.org/core/appview/models"
16 "tangled.org/core/appview/pipelines"
17 "tangled.org/core/orm"
18 spindlemodel "tangled.org/core/spindle/models"
19)
20
21var (
22 colorFg lipgloss.NoColor = lipgloss.NoColor{}
23 colorBlue lipgloss.ANSIColor = 4
24 colorBrightBlack lipgloss.ANSIColor = 8
25)
26
27type tickMsg time.Time
28
29type statusUpdateMsg struct {
30 pipeline models.Pipeline
31}
32
33type statusUpdateErrMsg struct{ err error }
34
35type pipelineModel struct {
36 renderer *lipgloss.Renderer
37 server *Server
38 pipeline models.Pipeline
39 workflows []string
40 selected int
41 logs map[string]*workflowLogs
42 statusCh chan struct{}
43 spinner spinner.Model
44 width int
45 height int
46}
47
48type workflowLogs struct {
49 steps []step
50 stepIndex map[int]int
51 vp viewport.Model
52 ready bool
53 done bool
54 err error
55}
56
57func newPipelineModel(renderer *lipgloss.Renderer, s *Server, pipeline models.Pipeline, width, height int) *pipelineModel {
58 workflows := pipeline.Workflows()
59 logs := make(map[string]*workflowLogs, len(workflows))
60 for _, wf := range workflows {
61 logs[wf] = &workflowLogs{stepIndex: make(map[int]int)}
62 }
63 statusCh := s.pipelineNotifier.Subscribe(pipeline.AtUri())
64 sp := spinner.New(spinner.WithSpinner(spinner.Line))
65 return &pipelineModel{
66 renderer: renderer,
67 server: s,
68 pipeline: pipeline,
69 workflows: workflows,
70 logs: logs,
71 statusCh: statusCh,
72 spinner: sp,
73 width: width,
74 height: height,
75 }
76}
77
78func (m *pipelineModel) Init() tea.Cmd {
79 cmds := []tea.Cmd{tick(), m.spinner.Tick, m.waitForStatusUpdate(m.statusCh)}
80 for _, wf := range m.workflows {
81 cmds = append(cmds, m.connectCmd(wf))
82 }
83 return tea.Batch(cmds...)
84}
85
86func tick() tea.Cmd {
87 return tea.Tick(time.Second, func(t time.Time) tea.Msg { return tickMsg(t) })
88}
89
90// waitForStatusUpdate blocks on the notifier channel, re-fetches pipeline statuses, and returns the result as a tea.Msg.
91func (m *pipelineModel) waitForStatusUpdate(ch chan struct{}) tea.Cmd {
92 knot := m.pipeline.Knot
93 rkey := m.pipeline.Rkey
94 return func() tea.Msg {
95 if _, ok := <-ch; !ok {
96 return nil
97 }
98 ps, err := db.GetPipelineStatuses(m.server.db, 1,
99 orm.FilterEq("p.knot", knot),
100 orm.FilterEq("p.rkey", rkey),
101 )
102 if err != nil || len(ps) == 0 {
103 return statusUpdateErrMsg{err: fmt.Errorf("refreshing pipeline: %w", err)}
104 }
105 return statusUpdateMsg{pipeline: ps[0]}
106 }
107}
108
109// connectCmd dials the spindle websocket for the given workflow and starts streaming log events.
110func (m *pipelineModel) connectCmd(workflow string) tea.Cmd {
111 return func() tea.Msg {
112 ws, ok := m.pipeline.Statuses[workflow]
113 if !ok || len(ws.Data) == 0 {
114 return logDoneMsg{workflow: workflow}
115 }
116 url := pipelines.SpindleURL(m.server.config.Core.Dev, ws.Data[0].Spindle, m.pipeline.Knot, m.pipeline.Rkey, workflow)
117 conn, _, err := websocket.DefaultDialer.Dial(url, nil)
118 if err != nil {
119 return logDoneMsg{workflow: workflow, err: fmt.Errorf("connecting to spindle: %w", err)}
120 }
121 ch := make(chan pipelines.LogEvent, 100)
122 go pipelines.ReadLogs(conn, ch)
123 return readNextLogEvent(workflow, conn, ch)
124 }
125}
126
127func (m *pipelineModel) vpHeight() int {
128 return max(m.height-2, 1) // topbar + empty line take 2 lines
129}
130
131// resizeViewports updates all viewport dimensions and re-renders their content after a terminal resize.
132//
133// TODO: can be tedious if we have logs of logs
134func (m *pipelineModel) resizeViewports() {
135 for _, wl := range m.logs {
136 if !wl.ready {
137 continue
138 }
139 atBottom := wl.vp.AtBottom()
140 wl.vp.Width = m.width
141 wl.vp.Height = m.vpHeight()
142 wl.vp.SetContent(renderLogs(m.renderer, wl, m.width))
143 if atBottom {
144 wl.vp.GotoBottom()
145 }
146 }
147}
148
149func (m *pipelineModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
150 switch msg := msg.(type) {
151 case tea.WindowSizeMsg:
152 m.width, m.height = msg.Width, msg.Height
153 m.resizeViewports()
154
155 case tickMsg:
156 return m, tick()
157
158 case spinner.TickMsg:
159 var cmd tea.Cmd
160 m.spinner, cmd = m.spinner.Update(msg)
161 return m, cmd
162
163 case tea.KeyMsg:
164 switch msg.String() {
165 case "q", "ctrl+c":
166 m.server.pipelineNotifier.Unsubscribe(m.pipeline.AtUri(), m.statusCh)
167 return m, tea.Quit
168 case "tab", "right", "l":
169 m.selected = (m.selected + 1) % len(m.workflows)
170 return m, nil
171 case "shift+tab", "left", "h":
172 m.selected = (m.selected - 1 + len(m.workflows)) % len(m.workflows)
173 return m, nil
174 }
175 if wl := m.selectedLogs(); wl != nil && wl.ready {
176 switch msg.String() {
177 case "g":
178 wl.vp.GotoTop()
179 return m, nil
180 case "G":
181 wl.vp.GotoBottom()
182 return m, nil
183 case "ctrl+e":
184 wl.vp.ScrollDown(1)
185 return m, nil
186 case "ctrl+y":
187 wl.vp.ScrollUp(1)
188 return m, nil
189 }
190 var cmd tea.Cmd
191 wl.vp, cmd = wl.vp.Update(msg)
192 return m, cmd
193 }
194
195 case logEventMsg:
196 return m, m.handleLogEvent(msg)
197
198 case logDoneMsg:
199 if wl, ok := m.logs[msg.workflow]; ok {
200 wl.done, wl.err = true, msg.err
201 m.initViewport(wl)
202 wl.vp.SetContent(renderLogs(m.renderer, wl, m.width))
203 wl.vp.GotoBottom()
204 }
205
206 case statusUpdateMsg:
207 // detect any workflows that are new since the last update
208 known := make(map[string]bool, len(m.workflows))
209 for _, wf := range m.workflows {
210 known[wf] = true
211 }
212 m.pipeline = msg.pipeline
213 var newCmds []tea.Cmd
214 for _, wf := range msg.pipeline.Workflows() {
215 if !known[wf] {
216 m.workflows = append(m.workflows, wf)
217 m.logs[wf] = &workflowLogs{stepIndex: make(map[int]int)}
218 newCmds = append(newCmds, m.connectCmd(wf))
219 }
220 }
221 // re-subscribe for the next update
222 newCmds = append(newCmds, m.waitForStatusUpdate(m.statusCh))
223 return m, tea.Batch(newCmds...)
224
225 case statusUpdateErrMsg:
226 // re-subscribe even on error so we don't stop listening
227 return m, m.waitForStatusUpdate(m.statusCh)
228 }
229
230 return m, nil
231}
232
233func (m *pipelineModel) selectedLogs() *workflowLogs {
234 if len(m.workflows) == 0 {
235 return nil
236 }
237 return m.logs[m.workflows[m.selected]]
238}
239
240func (m *pipelineModel) initViewport(wl *workflowLogs) {
241 if wl.ready {
242 return
243 }
244 wl.vp = viewport.New(m.width, m.vpHeight())
245 wl.ready = true
246}
247
248// handleLogEvent processes a single log event, updates the step state, and re-renders the viewport.
249func (m *pipelineModel) handleLogEvent(msg logEventMsg) tea.Cmd {
250 wl, ok := m.logs[msg.workflow]
251 if !ok {
252 return nil
253 }
254 if msg.ev.Err != nil {
255 wl.done = true
256 if !msg.ev.IsCloseError() {
257 wl.err = msg.ev.Err
258 }
259 m.initViewport(wl)
260 wl.vp.SetContent(renderLogs(m.renderer, wl, m.width))
261 return nil
262 }
263 var line spindlemodel.LogLine
264 if err := json.Unmarshal(msg.ev.Msg, &line); err != nil {
265 return readNextCmd(msg.workflow, msg.conn, msg.ch)
266 }
267 applyLogLine(wl, line)
268 m.initViewport(wl)
269 atBottom := wl.vp.AtBottom()
270 wl.vp.SetContent(renderLogs(m.renderer, wl, m.width))
271 if atBottom {
272 wl.vp.GotoBottom()
273 }
274 return readNextCmd(msg.workflow, msg.conn, msg.ch)
275}
276
277// applyLogLine mutates wl by appending the log line to the appropriate step.
278func applyLogLine(wl *workflowLogs, line spindlemodel.LogLine) {
279 switch line.Kind {
280 case spindlemodel.LogKindControl:
281 switch line.StepStatus {
282 case spindlemodel.StepStatusStart:
283 idx := len(wl.steps)
284 wl.stepIndex[line.StepId] = idx
285 wl.steps = append(wl.steps, step{
286 id: line.StepId, name: line.Content, command: line.StepCommand,
287 kind: line.StepKind, startTime: line.Time,
288 })
289 case spindlemodel.StepStatusEnd:
290 if idx, ok := wl.stepIndex[line.StepId]; ok {
291 wl.steps[idx].endTime, wl.steps[idx].finished = line.Time, true
292 }
293 }
294 case spindlemodel.LogKindData:
295 if idx, ok := wl.stepIndex[line.StepId]; ok {
296 wl.steps[idx].lines = append(wl.steps[idx].lines, line.Content)
297 }
298 }
299}
300
301// renderLogs builds the full log content string for a workflow, used as viewport content.
302func renderLogs(r *lipgloss.Renderer, wl *workflowLogs, width int) string {
303 headerStyle := r.NewStyle().Foreground(colorFg).Bold(true)
304 cmdStyle := r.NewStyle().Foreground(colorBlue).Width(width)
305 dimStyle := r.NewStyle().Faint(true)
306 now := time.Now()
307 var sb strings.Builder
308 for i := range wl.steps {
309 st := &wl.steps[i]
310 dur := ""
311 if st.finished {
312 dur = st.endTime.Sub(st.startTime).Round(time.Millisecond).String()
313 } else if !st.startTime.IsZero() {
314 dur = now.Sub(st.startTime).Round(time.Second).String()
315 }
316 // build overlay: "── name ──...── dur ──"
317 nameStr := headerStyle.Render(st.name + " ")
318 durStr := headerStyle.Render(" " + dur + " ")
319 nameW := lipgloss.Width(nameStr)
320 durW := lipgloss.Width(durStr)
321 fillW := max(width-nameW-durW, 0)
322 fill := dimStyle.Render(strings.Repeat("─", fillW))
323 header := nameStr + fill + durStr
324 sb.WriteString(header + "\n")
325 if st.command != "" {
326 sb.WriteString(cmdStyle.Render(st.command) + "\n")
327 }
328 for _, l := range st.lines {
329 sb.WriteString(l + "\n")
330 }
331 sb.WriteString("\n")
332 }
333 if wl.done && wl.err != nil {
334 sb.WriteString("error: " + wl.err.Error() + "\n")
335 }
336 return sb.String()
337}
338
339func (m *pipelineModel) View() string {
340 body := ""
341 if wl := m.selectedLogs(); wl != nil && wl.ready {
342 body = wl.vp.View()
343 }
344 return lipgloss.JoinVertical(lipgloss.Left, m.topbarView(), "", body)
345}
346
347// topbarView renders the single-line tab bar with workflow tabs left and trigger info + help right.
348func (m *pipelineModel) topbarView() string {
349 r := m.renderer
350 activeStyle := r.NewStyle().Background(colorBlue).Foreground(colorFg).Bold(true)
351
352 now := time.Now()
353
354 var tabs strings.Builder
355 for i, wf := range m.workflows {
356 status := spindlemodel.StatusKindPending
357 elapsed := ""
358 if ws, ok := m.pipeline.Statuses[wf]; ok {
359 latest := ws.Latest()
360 status = latest.Status
361 if t := ws.TimeTaken(); t > 0 {
362 elapsed = t.Round(time.Second).String()
363 } else {
364 elapsed = now.Sub(latest.Created).Round(time.Second).String()
365 }
366 }
367 dim := r.NewStyle().Faint(true)
368 base := " " + statusIcon(status, m.spinner.View()) + " " + wf
369 if i == m.selected {
370 tab := base
371 if elapsed != "" {
372 tab += " " + elapsed
373 }
374 tab += " "
375 tabs.WriteString(activeStyle.Render(tab))
376 } else {
377 tabs.WriteString(base)
378 if elapsed != "" {
379 tabs.WriteString(" " + dim.Render(elapsed))
380 }
381 tabs.WriteString(" ")
382 }
383 }
384
385 tabsStr := tabs.String()
386 infoStr := triggerLine(r, m.pipeline.Trigger, m.pipeline.Sha) + " · " + helpText(r)
387
388 gap := max(m.width-lipgloss.Width(tabsStr)-lipgloss.Width(infoStr), 1)
389
390 return tabsStr + strings.Repeat(" ", gap) + infoStr
391}
392
393func helpText(r *lipgloss.Renderer) string {
394 key := r.NewStyle().Foreground(colorFg)
395 action := r.NewStyle().Faint(true)
396 sep := action.Render(" · ")
397
398 items := []string{
399 key.Render("←/→") + " " + action.Render("switch"),
400 key.Render("↑/↓") + " " + action.Render("scroll"),
401 key.Render("q") + " " + action.Render("quit"),
402 }
403 return strings.Join(items, sep)
404}
405
// shortSha returns the first 8 bytes of sha, or sha unchanged when it is
// shorter than that.
func shortSha(sha string) string {
	const shortLen = 8

	if len(sha) < shortLen {
		return sha
	}
	return sha[:shortLen]
}
412
413func triggerLine(r *lipgloss.Renderer, t *models.Trigger, sha string) string {
414 hash := shortSha(sha)
415 dim := r.NewStyle().Faint(true)
416 if t == nil {
417 return dim.Render(hash)
418 }
419 if t.IsPush() {
420 return t.TargetRef() + dim.Render("@"+hash) + dim.Render(" (push)")
421 }
422 if t.IsPullRequest() {
423 source := ""
424 if t.PRSourceBranch != nil {
425 source = *t.PRSourceBranch
426 }
427 return t.TargetRef() + dim.Render(" <- "+source+"@"+hash) + dim.Render(" (pull-request)")
428 }
429 return dim.Render(hash)
430}
431
432func statusIcon(status spindlemodel.StatusKind, spinnerFrame string) string {
433 switch status {
434 case spindlemodel.StatusKindSuccess:
435 return "✓"
436 case spindlemodel.StatusKindFailed:
437 return "×"
438 case spindlemodel.StatusKindRunning:
439 return spinnerFrame
440 case spindlemodel.StatusKindPending:
441 return "·"
442 case spindlemodel.StatusKindTimeout:
443 return "⌀"
444 case spindlemodel.StatusKindCancelled:
445 return "-"
446 default:
447 return "?"
448 }
449}