Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

wip: appview/pipelines/ssh: use new ci models

Signed-off-by: Seongmin Lee <git@boltless.me>

author
Seongmin Lee
date (Jun 20, 2026, 7:50 PM +0900) commit 9bee953c parent 5dc27981 change-id nuqutkuv
+308 -188
+43
appview/pipelines/ssh/cihelpers.go
··· 1 + package ssh 2 + 3 + import ( 4 + "time" 5 + 6 + "tangled.org/core/api/tangled" 7 + ) 8 + 9 + // helper functions against generated code 10 + 11 + func workflowElapsed(wf *tangled.CiDefs_Workflow, now time.Time) time.Duration { 12 + if wf.StartedAt == nil { 13 + return 0 14 + } 15 + started, err := time.Parse(time.RFC3339, *wf.StartedAt) 16 + if err != nil { 17 + return 0 18 + } 19 + if wf.FinishedAt == nil { 20 + return now.Sub(started) 21 + } 22 + finished, err := time.Parse(time.RFC3339, *wf.FinishedAt) 23 + if err != nil { 24 + return 0 25 + } 26 + return finished.Sub(started) 27 + } 28 + 29 + var finishedStatuses = map[string]bool{ 30 + "failed": true, 31 + "timeout": true, 32 + "cancelled": true, 33 + "success": true, 34 + } 35 + 36 + func pipelineFinished(p *tangled.CiDefs_Pipeline) bool { 37 + for _, wf := range p.Workflows { 38 + if !finishedStatuses[wf.Status] { 39 + return false 40 + } 41 + } 42 + return true 43 + }
+21 -19
appview/pipelines/ssh/logstream.go
··· 1 1 package ssh 2 2 3 3 import ( 4 + "context" 4 5 "time" 5 6 6 - tea "github.com/charmbracelet/bubbletea" 7 - "github.com/gorilla/websocket" 8 - "tangled.org/core/appview/pipelines" 7 + "tangled.org/core/api/tangled" 9 8 ) 10 9 11 10 type step struct { 12 - id int 11 + id int64 13 12 name string 14 13 command string 15 14 lines []string ··· 19 18 } 20 19 21 20 type logDoneMsg struct { 22 - workflow string 23 - err error 21 + err error 24 22 } 25 23 26 24 type logEventMsg struct { 27 - workflow string 28 - ev pipelines.LogEvent 29 - conn *websocket.Conn 30 - ch chan pipelines.LogEvent 25 + ev *tangled.CiPipelineSubscribeLogs_Event 26 + events chan *tangled.CiPipelineSubscribeLogs_Event 27 + done chan error 31 28 } 32 29 33 - func readNextCmd(workflow string, conn *websocket.Conn, ch chan pipelines.LogEvent) tea.Cmd { 34 - return func() tea.Msg { 35 - return readNextLogEvent(workflow, conn, ch) 36 - } 30 + type eventScheduler struct { 31 + ch chan *tangled.CiPipelineSubscribeLogs_Event 37 32 } 38 33 39 - func readNextLogEvent(workflow string, conn *websocket.Conn, ch chan pipelines.LogEvent) tea.Msg { 40 - ev, ok := <-ch 41 - if !ok { 42 - return logDoneMsg{workflow: workflow} 34 + func newEventScheduler() *eventScheduler { 35 + return &eventScheduler{ch: make(chan *tangled.CiPipelineSubscribeLogs_Event, 1024)} 36 + } 37 + 38 + func (s *eventScheduler) AddWork(ctx context.Context, _ string, v *tangled.CiPipelineSubscribeLogs_Event) error { 39 + select { 40 + case s.ch <- v: 41 + return nil 42 + case <-ctx.Done(): 43 + return ctx.Err() 43 44 } 44 - return logEventMsg{workflow: workflow, ev: ev, conn: conn, ch: ch} 45 45 } 46 + 47 + func (s *eventScheduler) Shutdown() { close(s.ch) }
+30 -7
appview/pipelines/ssh/session.go
··· 1 1 package ssh 2 2 3 3 import ( 4 - "context" 5 4 "fmt" 6 5 6 + "github.com/bluesky-social/indigo/atproto/syntax" 7 + indigoxrpc "github.com/bluesky-social/indigo/xrpc" 7 8 tea "github.com/charmbracelet/bubbletea" 8 9 "github.com/charmbracelet/ssh" 9 10 wishtea "github.com/charmbracelet/wish/bubbletea" 10 - indigoxrpc "github.com/bluesky-social/indigo/xrpc" 11 11 "tangled.org/core/api/tangled" 12 + "tangled.org/core/appview/db" 13 + "tangled.org/core/hostutil" 14 + extlexutil "tangled.org/core/lexutil" 15 + "tangled.org/core/orm" 12 16 ) 13 17 14 18 func (s *Server) teaHandler(sess ssh.Session) (tea.Model, []tea.ProgramOption) { ··· 30 34 31 35 l = l.With("repoDID", repoDID, "sha", sha) 32 36 33 - spindle := "" 37 + repo, err := db.GetRepo(s.db, orm.FilterEq("repo_did", repoDID)) 38 + if err != nil { 39 + l.Warn("repo not found", "err", err) 40 + return newErrorModel(renderer, fmt.Sprintf("repo %s not found", repoDID)), wishtea.MakeOptions(sess) 41 + } 42 + if repo.Spindle == "" { 43 + l.Warn("no spindle configured") 44 + return newErrorModel(renderer, "no spindle configured for this repo"), wishtea.MakeOptions(sess) 45 + } 46 + 47 + l = l.With("spindle", repo.Spindle) 34 48 35 - l = l.With("spindle", spindle) 49 + host, err := hostutil.EnsureHttpScheme(repo.Spindle) 50 + if err != nil { 51 + l.Warn("invalid spindlie hostname", "err", err) 52 + return newErrorModel(renderer, fmt.Sprintf("invalid spindle host %q", repo.Spindle)), wishtea.MakeOptions(sess) 53 + } 36 54 37 - xrpcc := indigoxrpc.Client{Host: spindle} 38 - out, err := tangled.CiQueryPipelines(context.TODO(), &xrpcc, []string{sha}, "", 1, repoDID) 55 + xrpcc := extlexutil.Client{Client: indigoxrpc.Client{Host: host}} 56 + out, err := tangled.CiQueryPipelines(sess.Context(), &xrpcc, []string{sha}, "", 1, repoDID) 39 57 if err != nil || len(out.Pipelines) == 0 { 40 58 l.Warn("pipeline not found", "err", err) 41 59 return newErrorModel(renderer, fmt.Sprintf("pipeline not found for repo %s @ %s", repoDID, sha)), wishtea.MakeOptions(sess) 42 60 } 43 61 44 62 pipeline := out.Pipelines[0] 63 + if _, err := syntax.ParseTID(pipeline.Id); err != nil { 64 + l.Warn("invalid pipeline id", "id", pipeline.Id, "err", err) 65 + return newErrorModel(renderer, fmt.Sprintf("invalid pipeline id %q", pipeline.Id)), wishtea.MakeOptions(sess) 66 + } 67 + 45 68 l.Info("serving pipeline", "pipeline", pipeline.Id, "workflows", len(pipeline.Workflows)) 46 69 pty, _, _ := sess.Pty() 47 70 opts := append(wishtea.MakeOptions(sess), tea.WithAltScreen()) 48 - return newPipelineModel(renderer, s, pipeline, pty.Window.Width, pty.Window.Height), opts 71 + return newPipelineModel(renderer, &xrpcc, pipeline, pty.Window.Width, pty.Window.Height), opts 49 72 }
+214 -162
appview/pipelines/ssh/tui.go
··· 1 1 package ssh 2 2 3 3 import ( 4 - "encoding/json" 4 + "context" 5 + "errors" 5 6 "fmt" 6 7 "strings" 7 8 "time" ··· 11 12 tea "github.com/charmbracelet/bubbletea" 12 13 "github.com/charmbracelet/lipgloss" 13 14 "github.com/gorilla/websocket" 14 - "tangled.org/core/appview/db" 15 - "tangled.org/core/appview/models" 16 - "tangled.org/core/appview/pipelines" 17 - "tangled.org/core/orm" 18 - spindlemodel "tangled.org/core/spindle/models" 15 + "tangled.org/core/api/tangled" 16 + extlexutil "tangled.org/core/lexutil" 19 17 ) 20 18 21 19 var ( ··· 27 25 type tickMsg time.Time 28 26 29 27 type statusUpdateMsg struct { 30 - pipeline models.Pipeline 28 + pipeline *tangled.CiDefs_Pipeline 31 29 } 32 30 33 31 type statusUpdateErrMsg struct{ err error } 34 32 35 33 type pipelineModel struct { 36 - renderer *lipgloss.Renderer 37 - server *Server 38 - pipeline models.Pipeline 39 - workflows []string 40 - selected int 41 - logs map[string]*workflowLogs 42 - statusCh chan struct{} 43 - spinner spinner.Model 44 - width int 45 - height int 34 + renderer *lipgloss.Renderer 35 + xrpcc *extlexutil.Client 36 + pipeline *tangled.CiDefs_Pipeline 37 + selected int 38 + logs map[string]*workflowLogs 39 + 40 + // pipeline log stream: cancel tears down the consumer goroutine on quit. 41 + // the event/done channels are threaded through log messages, not stored here. 42 + cancel context.CancelFunc 43 + streamDone bool 44 + streamErr error 45 + 46 + spinner spinner.Model 47 + width int 48 + height int 46 49 } 47 50 48 51 type workflowLogs struct { 49 52 steps []step 50 - stepIndex map[int]int 53 + stepIndex map[int64]int // stepId -> index map 51 54 vp viewport.Model 52 55 ready bool 53 - done bool 54 - err error 55 56 } 56 57 57 - func newPipelineModel(renderer *lipgloss.Renderer, s *Server, pipeline models.Pipeline, width, height int) *pipelineModel { 58 - workflows := pipeline.Workflows() 59 - logs := make(map[string]*workflowLogs, len(workflows)) 60 - for _, wf := range workflows { 61 - logs[wf] = &workflowLogs{stepIndex: make(map[int]int)} 58 + func newPipelineModel(renderer *lipgloss.Renderer, xrpcc *extlexutil.Client, pipeline *tangled.CiDefs_Pipeline, width, height int) *pipelineModel { 59 + logs := make(map[string]*workflowLogs, len(pipeline.Workflows)) 60 + for _, wf := range pipeline.Workflows { 61 + logs[wf.Name] = &workflowLogs{stepIndex: make(map[int64]int)} 62 62 } 63 - statusCh := s.pipelineNotifier.Subscribe(pipeline.AtUri()) 64 63 sp := spinner.New(spinner.WithSpinner(spinner.Line)) 65 64 return &pipelineModel{ 66 - renderer: renderer, 67 - server: s, 68 - pipeline: pipeline, 69 - workflows: workflows, 70 - logs: logs, 71 - statusCh: statusCh, 72 - spinner: sp, 73 - width: width, 74 - height: height, 65 + renderer: renderer, 66 + xrpcc: xrpcc, 67 + pipeline: pipeline, 68 + logs: logs, 69 + spinner: sp, 70 + width: width, 71 + height: height, 75 72 } 76 73 } 77 74 78 75 func (m *pipelineModel) Init() tea.Cmd { 79 - cmds := []tea.Cmd{tick(), m.spinner.Tick, m.waitForStatusUpdate(m.statusCh)} 80 - for _, wf := range m.workflows { 81 - cmds = append(cmds, m.connectCmd(wf)) 82 - } 83 - return tea.Batch(cmds...) 76 + return tea.Batch(tick(), m.spinner.Tick, m.subscribeCmd()) 84 77 } 85 78 86 79 func tick() tea.Cmd { 87 80 return tea.Tick(time.Second, func(t time.Time) tea.Msg { return tickMsg(t) }) 88 81 } 89 82 90 - // waitForStatusUpdate blocks on the notifier channel, re-fetches pipeline statuses, and returns the result as a tea.Msg. 91 - func (m *pipelineModel) waitForStatusUpdate(ch chan struct{}) tea.Cmd { 92 - knot := m.pipeline.Knot 93 - rkey := m.pipeline.Rkey 83 + // subscribeCmd opens the ci.pipeline.subscribeLogs stream for current pipeline. 84 + // A consumer goroutine pushes decoded events onto the scheduler channel; the 85 + // returned command yields the first event into the bubbletea loop. 86 + func (m *pipelineModel) subscribeCmd() tea.Cmd { 87 + // cancel existing subscriptions just in case 88 + if m.cancel != nil { 89 + m.cancel() 90 + } 91 + sched := newEventScheduler() 92 + done := make(chan error, 1) 93 + ctx, cancel := context.WithCancel(context.Background()) 94 + m.cancel = cancel 95 + 96 + pipelineId := m.pipeline.Id 97 + go func() { 98 + err := tangled.CiPipelineSubscribeLogs(ctx, m.xrpcc, pipelineId, nil, sched) 99 + done <- err 100 + }() 101 + 102 + return readEventCmd(sched.ch, done) 103 + } 104 + 105 + func readEventCmd(events chan *tangled.CiPipelineSubscribeLogs_Event, done chan error) tea.Cmd { 94 106 return func() tea.Msg { 95 - if _, ok := <-ch; !ok { 96 - return nil 97 - } 98 - ps, err := db.GetPipelineStatuses(m.server.db, 1, 99 - orm.FilterEq("p.knot", knot), 100 - orm.FilterEq("p.rkey", rkey), 101 - ) 102 - if err != nil || len(ps) == 0 { 103 - return statusUpdateErrMsg{err: fmt.Errorf("refreshing pipeline: %w", err)} 107 + ev, ok := <-events 108 + if !ok { 109 + return logDoneMsg{err: <-done} 104 110 } 105 - return statusUpdateMsg{pipeline: ps[0]} 111 + return logEventMsg{ev: ev, events: events, done: done} 106 112 } 107 113 } 108 114 109 - // connectCmd dials the spindle websocket for the given workflow and starts streaming log events. 110 - func (m *pipelineModel) connectCmd(workflow string) tea.Cmd { 115 + // fetchStatusCmd re-fetches the pipeline 116 + func (m *pipelineModel) fetchStatusCmd() tea.Cmd { 117 + pipelineId := m.pipeline.Id 111 118 return func() tea.Msg { 112 - ws, ok := m.pipeline.Statuses[workflow] 113 - if !ok || len(ws.Data) == 0 { 114 - return logDoneMsg{workflow: workflow} 115 - } 116 - url := pipelines.SpindleURL(ws.Data[0].Spindle, m.pipeline.Knot, m.pipeline.Rkey, workflow) 117 - conn, _, err := websocket.DefaultDialer.Dial(url, nil) 119 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 120 + defer cancel() 121 + out, err := tangled.CiGetPipeline(ctx, m.xrpcc, pipelineId) 118 122 if err != nil { 119 - return logDoneMsg{workflow: workflow, err: fmt.Errorf("connecting to spindle: %w", err)} 123 + return statusUpdateErrMsg{err: fmt.Errorf("refreshing pipeline: %w", err)} 120 124 } 121 - ch := make(chan pipelines.LogEvent, 100) 122 - go pipelines.ReadLogs(conn, ch) 123 - return readNextLogEvent(workflow, conn, ch) 125 + return statusUpdateMsg{pipeline: out} 124 126 } 125 127 } 126 128 ··· 153 155 m.resizeViewports() 154 156 155 157 case tickMsg: 158 + // re-render running workflows so elapsed times advance 159 + m.refreshRunning() 156 160 return m, tick() 157 161 158 162 case spinner.TickMsg: ··· 163 167 case tea.KeyMsg: 164 168 switch msg.String() { 165 169 case "q", "ctrl+c": 166 - m.server.pipelineNotifier.Unsubscribe(m.pipeline.AtUri(), m.statusCh) 170 + if m.cancel != nil { 171 + m.cancel() 172 + } 167 173 return m, tea.Quit 168 174 case "tab", "right", "l": 169 - m.selected = (m.selected + 1) % len(m.workflows) 175 + m.selected = (m.selected + 1) % len(m.pipeline.Workflows) 170 176 return m, nil 171 177 case "shift+tab", "left", "h": 172 - m.selected = (m.selected - 1 + len(m.workflows)) % len(m.workflows) 178 + m.selected = (m.selected - 1 + len(m.pipeline.Workflows)) % len(m.pipeline.Workflows) 173 179 return m, nil 174 180 } 175 181 if wl := m.selectedLogs(); wl != nil && wl.ready { ··· 193 199 } 194 200 195 201 case logEventMsg: 196 - return m, m.handleLogEvent(msg) 202 + m.applyEvent(msg.ev) 203 + return m, readEventCmd(msg.events, msg.done) 197 204 198 205 case logDoneMsg: 199 - if wl, ok := m.logs[msg.workflow]; ok { 200 - wl.done, wl.err = true, msg.err 201 - m.initViewport(wl) 202 - wl.vp.SetContent(renderLogs(m.renderer, wl, m.width)) 203 - wl.vp.GotoBottom() 206 + m.streamDone = true 207 + if !isExpectedClose(msg.err) { 208 + m.streamErr = msg.err 204 209 } 210 + m.refreshAll() 211 + // resolve final workflow statuses once now that the stream has ended 212 + return m, m.fetchStatusCmd() 205 213 206 214 case statusUpdateMsg: 207 - // detect any workflows that are new since the last update 208 - known := make(map[string]bool, len(m.workflows)) 209 - for _, wf := range m.workflows { 210 - known[wf] = true 215 + m.pipeline = msg.pipeline 216 + known := make(map[string]bool, len(m.pipeline.Workflows)) 217 + for _, wf := range m.pipeline.Workflows { 218 + known[wf.Name] = true 211 219 } 212 - m.pipeline = msg.pipeline 213 - var newCmds []tea.Cmd 214 - for _, wf := range msg.pipeline.Workflows() { 215 - if !known[wf] { 216 - m.workflows = append(m.workflows, wf) 217 - m.logs[wf] = &workflowLogs{stepIndex: make(map[int]int)} 218 - newCmds = append(newCmds, m.connectCmd(wf)) 220 + for name := range m.logs { 221 + if !known[name] { 222 + delete(m.logs, name) 219 223 } 220 224 } 221 - // re-subscribe for the next update 222 - newCmds = append(newCmds, m.waitForStatusUpdate(m.statusCh)) 223 - return m, tea.Batch(newCmds...) 225 + if m.selected >= len(m.pipeline.Workflows) { 226 + m.selected = max(len(m.pipeline.Workflows)-1, 0) 227 + } 228 + m.refreshAll() 224 229 225 230 case statusUpdateErrMsg: 226 - // re-subscribe even on error so we don't stop listening 227 - return m, m.waitForStatusUpdate(m.statusCh) 231 + // best-effort final status refresh; ignore failures 228 232 } 229 233 230 234 return m, nil 231 235 } 232 236 233 237 func (m *pipelineModel) selectedLogs() *workflowLogs { 234 - if len(m.workflows) == 0 { 238 + if len(m.pipeline.Workflows) < 1+m.selected { 235 239 return nil 236 240 } 237 - return m.logs[m.workflows[m.selected]] 241 + return m.logs[m.pipeline.Workflows[m.selected].Name] 238 242 } 239 243 240 244 func (m *pipelineModel) initViewport(wl *workflowLogs) { ··· 245 249 wl.ready = true 246 250 } 247 251 248 - // handleLogEvent processes a single log event, updates the step state, and re-renders the viewport. 249 - func (m *pipelineModel) handleLogEvent(msg logEventMsg) tea.Cmd { 250 - wl, ok := m.logs[msg.workflow] 252 + // ensureWorkflow returns the log state for a workflow, lazily creating its routing entry. 253 + func (m *pipelineModel) ensureWorkflow(name string) *workflowLogs { 254 + wl, ok := m.logs[name] 251 255 if !ok { 252 - return nil 256 + wl = &workflowLogs{stepIndex: make(map[int64]int)} 257 + m.logs[name] = wl 253 258 } 254 - if msg.ev.Err != nil { 255 - wl.done = true 256 - if !msg.ev.IsCloseError() { 257 - wl.err = msg.ev.Err 258 - } 259 - m.initViewport(wl) 260 - wl.vp.SetContent(renderLogs(m.renderer, wl, m.width)) 261 - return nil 262 - } 263 - var line spindlemodel.LogLine 264 - if err := json.Unmarshal(msg.ev.Msg, &line); err != nil { 265 - return readNextCmd(msg.workflow, msg.conn, msg.ch) 266 - } 267 - applyLogLine(wl, line) 259 + return wl 260 + } 261 + 262 + // renderWorkflow re-renders a workflow's viewport, preserving bottom-stickiness. 263 + func (m *pipelineModel) renderWorkflow(wl *workflowLogs) { 268 264 m.initViewport(wl) 269 265 atBottom := wl.vp.AtBottom() 270 266 wl.vp.SetContent(renderLogs(m.renderer, wl, m.width)) 271 267 if atBottom { 272 268 wl.vp.GotoBottom() 273 269 } 274 - return readNextCmd(msg.workflow, msg.conn, msg.ch) 270 + } 271 + 272 + // refreshAll re-renders every initialized viewport. 273 + func (m *pipelineModel) refreshAll() { 274 + for _, wl := range m.logs { 275 + m.renderWorkflow(wl) 276 + } 275 277 } 276 278 277 - // applyLogLine mutates wl by appending the log line to the appropriate step. 278 - func applyLogLine(wl *workflowLogs, line spindlemodel.LogLine) { 279 - switch line.Kind { 280 - case spindlemodel.LogKindControl: 281 - switch line.StepStatus { 282 - case spindlemodel.StepStatusStart: 283 - idx := len(wl.steps) 284 - wl.stepIndex[line.StepId] = idx 279 + // refreshRunning re-renders workflows with unfinished steps so elapsed times advance. 280 + func (m *pipelineModel) refreshRunning() { 281 + if m.streamDone { 282 + return 283 + } 284 + for _, wl := range m.logs { 285 + if !wl.ready { 286 + continue 287 + } 288 + for i := range wl.steps { 289 + if !wl.steps[i].finished { 290 + m.renderWorkflow(wl) 291 + break 292 + } 293 + } 294 + } 295 + } 296 + 297 + // applyEvent routes a decoded subscribeLogs event into the matching workflow. 298 + func (m *pipelineModel) applyEvent(ev *tangled.CiPipelineSubscribeLogs_Event) { 299 + switch { 300 + case ev.Error != nil: 301 + if ev.Error.Message != "" { 302 + m.streamErr = fmt.Errorf("%s: %s", ev.Error.Error, ev.Error.Message) 303 + } else { 304 + m.streamErr = fmt.Errorf("%s", ev.Error.Error) 305 + } 306 + 307 + case ev.Control != nil: 308 + c := ev.Control 309 + wl := m.ensureWorkflow(c.Workflow) 310 + switch derefStr(c.Status) { 311 + case "start": 312 + wl.stepIndex[c.Step] = len(wl.steps) 285 313 wl.steps = append(wl.steps, step{ 286 - id: line.StepId, name: line.Content, command: line.StepCommand, 287 - kind: line.StepKind, startTime: line.Time, 314 + id: c.Step, name: c.Content, command: derefStr(c.Command), startTime: parseRFC3339(c.Time), 288 315 }) 289 - case spindlemodel.StepStatusEnd: 290 - if idx, ok := wl.stepIndex[line.StepId]; ok { 291 - wl.steps[idx].endTime, wl.steps[idx].finished = line.Time, true 316 + case "end": 317 + if idx, ok := wl.stepIndex[c.Step]; ok { 318 + wl.steps[idx].endTime, wl.steps[idx].finished = parseRFC3339(c.Time), true 292 319 } 293 320 } 294 - case spindlemodel.LogKindData: 295 - if idx, ok := wl.stepIndex[line.StepId]; ok { 296 - wl.steps[idx].lines = append(wl.steps[idx].lines, line.Content) 321 + m.renderWorkflow(wl) 322 + 323 + case ev.Data != nil: 324 + d := ev.Data 325 + wl := m.ensureWorkflow(d.Workflow) 326 + if idx, ok := wl.stepIndex[d.Step]; ok { 327 + wl.steps[idx].lines = append(wl.steps[idx].lines, d.Content) 297 328 } 329 + m.renderWorkflow(wl) 298 330 } 299 331 } 300 332 333 + // isExpectedClose reports whether err is a clean websocket close (or nil). 334 + func isExpectedClose(err error) bool { 335 + if err == nil { 336 + return true 337 + } 338 + var ce *websocket.CloseError 339 + if errors.As(err, &ce) { 340 + switch ce.Code { 341 + case websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure: 342 + return true 343 + } 344 + } 345 + return false 346 + } 347 + 301 348 // renderLogs builds the full log content string for a workflow, used as viewport content. 302 349 func renderLogs(r *lipgloss.Renderer, wl *workflowLogs, width int) string { 303 350 headerStyle := r.NewStyle().Foreground(colorFg).Bold(true) 304 351 cmdStyle := r.NewStyle().Foreground(colorBlue).Width(width) 305 352 dimStyle := r.NewStyle().Faint(true) 306 - now := time.Now() 307 353 var sb strings.Builder 308 354 for i := range wl.steps { 309 355 st := &wl.steps[i] ··· 311 357 if st.finished { 312 358 dur = st.endTime.Sub(st.startTime).Round(time.Millisecond).String() 313 359 } else if !st.startTime.IsZero() { 314 - dur = now.Sub(st.startTime).Round(time.Second).String() 360 + dur = time.Since(st.startTime).Round(time.Second).String() 315 361 } 316 362 // build overlay: "── name ──...── dur ──" 317 363 nameStr := headerStyle.Render(st.name + " ") ··· 330 376 } 331 377 sb.WriteString("\n") 332 378 } 333 - if wl.done && wl.err != nil { 334 - sb.WriteString("error: " + wl.err.Error() + "\n") 335 - } 336 379 return sb.String() 337 380 } 338 381 ··· 340 383 body := "" 341 384 if wl := m.selectedLogs(); wl != nil && wl.ready { 342 385 body = wl.vp.View() 386 + } 387 + if m.streamErr != nil { 388 + body = lipgloss.JoinVertical(lipgloss.Left, body, m.renderer.NewStyle().Foreground(colorBlue).Render("stream error: "+m.streamErr.Error())) 343 389 } 344 390 return lipgloss.JoinVertical(lipgloss.Left, m.topbarView(), "", body) 345 391 } ··· 352 398 now := time.Now() 353 399 354 400 var tabs strings.Builder 355 - for i, wf := range m.workflows { 356 - status := spindlemodel.StatusKindPending 357 - elapsed := "" 358 - if ws, ok := m.pipeline.Statuses[wf]; ok { 359 - latest := ws.Latest() 360 - status = latest.Status 361 - if t := ws.TimeTaken(); t > 0 { 362 - elapsed = t.Round(time.Second).String() 363 - } else { 364 - elapsed = now.Sub(latest.Created).Round(time.Second).String() 365 - } 366 - } 367 - dim := r.NewStyle().Faint(true) 368 - base := " " + statusIcon(status, m.spinner.View()) + " " + wf 401 + for i, wf := range m.pipeline.Workflows { 402 + status := wf.Status 403 + elapsed := workflowElapsed(wf, now).Round(time.Second).String() 404 + base := " " + statusIcon(status, m.spinner.View()) + " " + wf.Name 369 405 if i == m.selected { 370 406 tab := base 371 407 if elapsed != "" { ··· 376 412 } else { 377 413 tabs.WriteString(base) 378 414 if elapsed != "" { 415 + dim := r.NewStyle().Faint(true) 379 416 tabs.WriteString(" " + dim.Render(elapsed)) 380 417 } 381 418 tabs.WriteString(" ") ··· 383 420 } 384 421 385 422 tabsStr := tabs.String() 386 - infoStr := triggerLine(r, m.pipeline.Trigger, m.pipeline.Sha) + " · " + helpText(r) 423 + infoStr := triggerLine(r, m.pipeline.Trigger, m.pipeline.Commit) + " · " + helpText(r) 387 424 388 425 gap := max(m.width-lipgloss.Width(tabsStr)-lipgloss.Width(infoStr), 1) 389 426 ··· 410 447 return sha 411 448 } 412 449 413 - func triggerLine(r *lipgloss.Renderer, t *models.Trigger, sha string) string { 450 + func triggerLine(r *lipgloss.Renderer, t *tangled.CiDefs_Pipeline_Trigger, sha string) string { 414 451 hash := shortSha(sha) 415 452 dim := r.NewStyle().Faint(true) 416 453 if t == nil { 417 454 return dim.Render(hash) 418 455 } 419 - if t.IsPush() { 420 - return t.TargetRef() + dim.Render("@"+hash) + dim.Render(" (push)") 456 + if t.CiTrigger_Push != nil { 457 + return t.CiTrigger_Push.Ref + dim.Render("@"+hash) + dim.Render(" (push)") 421 458 } 422 - if t.IsPullRequest() { 459 + if t.CiTrigger_PullRequest != nil { 423 460 source := "" 424 - if t.PRSourceBranch != nil { 425 - source = *t.PRSourceBranch 461 + if t.CiTrigger_PullRequest.SourceBranch != nil { 462 + source = *t.CiTrigger_PullRequest.SourceBranch 426 463 } 427 - return t.TargetRef() + dim.Render(" <- "+source+"@"+hash) + dim.Render(" (pull-request)") 464 + return t.CiTrigger_PullRequest.TargetBranch + dim.Render(" <- "+source+"@"+hash) + dim.Render(" (pull-request)") 428 465 } 429 466 return dim.Render(hash) 430 467 } 431 468 432 - func statusIcon(status spindlemodel.StatusKind, spinnerFrame string) string { 469 + func statusIcon(status string, spinnerFrame string) string { 433 470 switch status { 434 - case spindlemodel.StatusKindSuccess: 471 + case "success": 435 472 return "✓" 436 - case spindlemodel.StatusKindFailed: 473 + case "failed": 437 474 return "×" 438 - case spindlemodel.StatusKindRunning: 475 + case "running": 439 476 return spinnerFrame 440 - case spindlemodel.StatusKindPending: 477 + case "pending": 441 478 return "·" 442 - case spindlemodel.StatusKindTimeout: 479 + case "timeout": 443 480 return "⌀" 444 - case spindlemodel.StatusKindCancelled: 481 + case "cancelled": 445 482 return "-" 446 483 default: 447 484 return "?" 448 485 } 449 486 } 487 + 488 + func derefStr(s *string) string { 489 + if s == nil { 490 + return "" 491 + } 492 + return *s 493 + } 494 + 495 + func parseRFC3339(s string) time.Time { 496 + t, err := time.Parse(time.RFC3339, s) 497 + if err != nil { 498 + return time.Time{} 499 + } 500 + return t 501 + }