Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

wip: appview/pipelines: use new ci models

Signed-off-by: Seongmin Lee <git@boltless.me>

author
Seongmin Lee
date (Jun 20, 2026, 7:50 PM +0900) commit fb886a6d parent 9bee953c change-id pystknpk
+134 -111
+23 -37
appview/pipelines/logs.go
··· 1 1 package pipelines 2 2 3 3 import ( 4 - "fmt" 4 + "errors" 5 5 "html/template" 6 - "net/url" 7 6 "regexp" 8 7 "strings" 8 + "time" 9 9 10 - "github.com/bluesky-social/indigo/atproto/syntax" 11 10 terminal "github.com/buildkite/terminal-to-html/v3" 12 11 "github.com/gorilla/websocket" 13 - "tangled.org/core/api/tangled" 14 12 "tangled.org/core/appview/pages/markup" 15 - "tangled.org/core/hostutil" 16 13 ) 17 14 18 15 // matches any ANSI escape sequence: ESC [ <params> m ··· 55 52 return template.HTML(sanitized) 56 53 } 57 54 58 - type LogEvent struct { 59 - Msg []byte 60 - Err error 61 - } 62 - 63 - func (ev *LogEvent) IsCloseError() bool { 64 - return websocket.IsCloseError( 65 - ev.Err, 66 - websocket.CloseNormalClosure, 67 - websocket.CloseGoingAway, 68 - websocket.CloseAbnormalClosure, 69 - ) 70 - } 71 - 72 - func ReadLogs(conn *websocket.Conn, ch chan LogEvent) { 73 - defer close(ch) 74 - for { 75 - if conn == nil { 76 - return 77 - } 78 - _, msg, err := conn.ReadMessage() 79 - if err != nil { 80 - ch <- LogEvent{Err: err} 81 - return 55 + // isExpectedClose reports whether err is a clean websocket close (or nil). 56 + func isExpectedClose(err error) bool { 57 + if err == nil { 58 + return true 59 + } 60 + var ce *websocket.CloseError 61 + if errors.As(err, &ce) { 62 + switch ce.Code { 63 + case websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure: 64 + return true 82 65 } 83 - ch <- LogEvent{Msg: msg} 84 66 } 67 + return false 85 68 } 86 69 87 - func SubscribeLogsUrl(spindle string, pipeline syntax.TID, workflow string) string { 88 - u, err := hostutil.EnsureWsScheme(spindle) 89 - if err != nil { 70 + func derefStr(s *string) string { 71 + if s == nil { 90 72 return "" 91 73 } 74 + return *s 75 + } 92 76 93 - query := url.Values{} 94 - query.Set("pipeline", pipeline.String()) 95 - query.Set("workflow", workflow) 96 - return fmt.Sprintf("%s/xrpc/%s?%s", u, tangled.CiPipelineSubscribeLogsNSID, query.Encode()) 77 + func parseRFC3339(s string) time.Time { 78 + t, err := time.Parse(time.RFC3339, s) 79 + if err != nil { 80 + return time.Time{} 81 + } 82 + return t 97 83 }
+111 -74
appview/pipelines/pipelines.go
··· 3 3 import ( 4 4 "bytes" 5 5 "context" 6 - "encoding/json" 7 6 "log/slog" 8 7 "net/http" 8 + "sync" 9 9 "time" 10 10 11 11 "tangled.org/core/api/tangled" ··· 17 17 "tangled.org/core/appview/reporesolver" 18 18 "tangled.org/core/hostutil" 19 19 "tangled.org/core/idresolver" 20 + "tangled.org/core/lexutil" 20 21 "tangled.org/core/orm" 21 22 "tangled.org/core/rbac" 22 - spindlemodel "tangled.org/core/spindle/models" 23 23 24 24 "github.com/bluesky-social/indigo/atproto/syntax" 25 25 indigoxrpc "github.com/bluesky-social/indigo/xrpc" ··· 180 180 WriteBufferSize: 1024, 181 181 } 182 182 183 + type webLogScheduler struct { 184 + ch chan *tangled.CiPipelineSubscribeLogs_Event 185 + } 186 + 187 + var _ lexutil.Scheduler[tangled.CiPipelineSubscribeLogs_Event] = (*webLogScheduler)(nil) 188 + 189 + // AddWork implements [lexutil.Scheduler]. 190 + func (w *webLogScheduler) AddWork(ctx context.Context, _ string, val *tangled.CiPipelineSubscribeLogs_Event) error { 191 + select { 192 + case w.ch <- val: 193 + return nil 194 + case <-ctx.Done(): 195 + return ctx.Err() 196 + } 197 + } 198 + 199 + // Shutdown implements [lexutil.Scheduler]. 200 + func (w *webLogScheduler) Shutdown() { close(w.ch) } 201 + 183 202 func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) { 184 203 l := p.logger.With("handler", "logs") 185 204 ··· 209 228 return 210 229 } 211 230 212 - logsUrl := SubscribeLogsUrl(f.Spindle, pipelineId, workflowName) 213 - if logsUrl == "" { 214 - http.Error(w, "invalid spindle hostname", http.StatusBadRequest) 215 - return 216 - } 217 - l = l.With("url", logsUrl) 218 - 219 231 clientConn, err := upgrader.Upgrade(w, r, nil) 220 232 if err != nil { 221 233 l.Error("websocket upgrade failed", "err", err) 222 234 return 223 235 } 224 - defer func() { 225 - _ = clientConn.WriteControl( 226 - websocket.CloseMessage, 227 - websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 228 - time.Now().Add(time.Second), 229 - ) 230 - clientConn.Close() 231 - }() 236 + defer clientConn.Close() 232 237 233 238 ctx, cancel := context.WithCancel(r.Context()) 234 239 defer cancel() 235 240 236 - l.Info("logs endpoint hit") 241 + evChan := make(chan *tangled.CiPipelineSubscribeLogs_Event, 100) 242 + done := make(chan error, 1) 243 + sched := &webLogScheduler{ch: evChan} 244 + xrpcc := &lexutil.Client{Client: indigoxrpc.Client{Host: f.Spindle}} 245 + go func() { 246 + done <- tangled.CiPipelineSubscribeLogs(ctx, xrpcc, pipelineId.String(), []string{workflowName}, sched) 247 + }() 248 + 249 + var lastWriteLk sync.Mutex 250 + lastWrite := time.Now() 237 251 238 - spindleConn, _, err := websocket.DefaultDialer.Dial(logsUrl, nil) 239 - if err != nil { 240 - l.Error("websocket dial failed", "err", err) 241 - return 242 - } 243 - defer spindleConn.Close() 252 + // Start a goroutine to ping the client periodically to check if it's still 253 + // alive. If the client doesn't respond to a ping within 5 seconds, we'll 254 + // close the connection and teardown the consumer. 255 + go func() { 256 + ticker := time.NewTicker(30 * time.Second) 257 + defer ticker.Stop() 258 + for { 259 + select { 260 + case <-ticker.C: 261 + lastWriteLk.Lock() 262 + lw := lastWrite 263 + lastWriteLk.Unlock() 264 + if time.Since(lw) < 30*time.Second { 265 + continue 266 + } 267 + if err := clientConn.WriteControl(websocket.PingMessage, nil, time.Now().Add(5*time.Second)); err != nil { 268 + l.Warn("failed to ping client", "err", err) 269 + cancel() 270 + return 271 + } 272 + case <-ctx.Done(): 273 + return 274 + } 275 + } 276 + }() 244 277 245 - // create a channel for incoming messages 246 - evChan := make(chan LogEvent, 100) 247 - // start a goroutine to read from spindle 248 - go ReadLogs(spindleConn, evChan) 278 + clientConn.SetPingHandler(func(message string) error { 279 + err := clientConn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(60*time.Second)) 280 + if err == websocket.ErrCloseSent { 281 + return nil 282 + } 283 + return err 284 + }) 285 + 286 + // Start a goroutine to read messages from the client and discard them. 287 + go func() { 288 + for { 289 + if _, _, err := clientConn.ReadMessage(); err != nil { 290 + cancel() 291 + return 292 + } 293 + } 294 + }() 249 295 296 + // Main loop: sole writer of data frames to the client. 250 297 stepStartTimes := make(map[int]time.Time) 251 298 stepAnsi := make(map[int]*ansiState) 252 299 var fragment bytes.Buffer ··· 258 305 259 306 case ev, ok := <-evChan: 260 307 if !ok { 261 - continue 262 - } 263 - 264 - if ev.Err != nil && ev.IsCloseError() { 265 - l.Debug("graceful shutdown, tail complete", "err", err) 266 - return 267 - } 268 - if ev.Err != nil { 269 - l.Error("error reading from spindle", "err", err) 308 + // Stream ended: Shutdown closed the upstream channel. 309 + if err := <-done; !isExpectedClose(err) { 310 + l.Error("spindle stream error", "err", err) 311 + } 270 312 return 271 313 } 272 314 273 - var logLine spindlemodel.LogLine 274 - if err = json.Unmarshal(ev.Msg, &logLine); err != nil { 275 - l.Error("failed to parse logline", "err", err) 276 - continue 277 - } 278 - 279 315 fragment.Reset() 280 316 281 - switch logLine.Kind { 282 - case spindlemodel.LogKindControl: 283 - switch logLine.StepStatus { 284 - case spindlemodel.StepStatusStart: 285 - stepStartTimes[logLine.StepId] = logLine.Time 286 - collapsed := false 287 - if logLine.StepKind == spindlemodel.StepKindSystem { 288 - collapsed = true 289 - } 317 + switch { 318 + case ev.Error != nil: 319 + l.Error("spindle error frame", "err", ev.Error.Error, "msg", ev.Error.Message) 320 + return 321 + 322 + case ev.Control != nil: 323 + c := ev.Control 324 + step := int(c.Step) 325 + switch derefStr(c.Status) { 326 + case "start": 327 + t := parseRFC3339(c.Time) 328 + stepStartTimes[step] = t 329 + // "system" steps are injected by the CI runner; collapse them. 330 + collapsed := derefStr(c.Kind) == "system" 290 331 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{ 291 - Id: logLine.StepId, 292 - Name: logLine.Content, 293 - Command: logLine.StepCommand, 332 + Id: step, 333 + Name: c.Content, 334 + Command: derefStr(c.Command), 294 335 Collapsed: collapsed, 295 - StartTime: logLine.Time, 336 + StartTime: t, 296 337 }) 297 - case spindlemodel.StepStatusEnd: 298 - startTime := stepStartTimes[logLine.StepId] 299 - endTime := logLine.Time 338 + case "end": 300 339 err = p.pages.LogBlockEnd(&fragment, pages.LogBlockEndParams{ 301 - Id: logLine.StepId, 302 - StartTime: startTime, 303 - EndTime: endTime, 340 + Id: step, 341 + StartTime: stepStartTimes[step], 342 + EndTime: parseRFC3339(c.Time), 304 343 }) 305 344 } 306 345 307 - case spindlemodel.LogKindData: 308 - ansi, ok := stepAnsi[logLine.StepId] 346 + case ev.Data != nil: 347 + d := ev.Data 348 + step := int(d.Step) 349 + ansi, ok := stepAnsi[step] 309 350 if !ok { 310 351 ansi = NewAnsiState() 311 - stepAnsi[logLine.StepId] = ansi 352 + stepAnsi[step] = ansi 312 353 } 313 354 err = p.pages.LogLine(&fragment, pages.LogLineParams{ 314 - Id: logLine.StepId, 315 - Content: ansi.Render(logLine.Content), 355 + Id: step, 356 + Content: ansi.Render(d.Content), 316 357 }) 317 358 } 318 359 if err != nil { ··· 324 365 l.Error("error writing to client", "err", err) 325 366 return 326 367 } 327 - 328 - case <-time.After(30 * time.Second): 329 - l.Debug("sent keepalive") 330 - if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 331 - l.Error("failed to write control", "err", err) 332 - return 333 - } 368 + lastWriteLk.Lock() 369 + lastWrite = time.Now() 370 + lastWriteLk.Unlock() 334 371 } 335 372 } 336 373 }