Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// HTTP surface of the spindle. 4// 5// Four roles to keep in mind: 6// 7// 1. Verification: the Tangled appview hits /xrpc/sh.tangled.owner during 8// spindle registration to confirm the operator owns this instance. 9// 2. Event stream: the appview holds a long-lived websocket against 10// /events to receive sh.tangled.pipeline.status frames as builds 11// progress. Today this is just a keep-alive; payloads land once the 12// Buildkite webhook receiver is wired up. 13// 3. Webhooks: Buildkite POSTs build/job state changes to 14// /webhooks/buildkite, which we'll translate into pipeline.status 15// events on (2). 16// 4. Logs: GET /logs/{knot}/{pipelineRkey}/{workflow} delegates to the 17// configured Provider so the appview (or a curling operator) can 18// pull captured workflow output for a specific run. 19 20import ( 21 "context" 22 "encoding/json" 23 "errors" 24 "fmt" 25 "log/slog" 26 "net/http" 27 "strconv" 28 "time" 29 30 "github.com/gorilla/websocket" 31 "tangled.org/core/api/tangled" 32) 33 34// runHTTP starts the spindle's HTTP server and blocks until ctx is 35// cancelled or the listener returns a fatal error. On ctx cancellation it 36// performs a graceful shutdown with a bounded timeout. 37// 38// The logger is read from ctx via loggerFrom. The broker is the 39// in-process pub/sub used by /events to fan published records out to 40// connected websocket subscribers. 41func runHTTP(ctx context.Context, cfg config, br *broker, provider Provider) error { 42 logger := loggerFrom(ctx) 43 44 mux := http.NewServeMux() 45 mux.HandleFunc("GET /", rootHandler()) 46 mux.HandleFunc("GET /events", eventsHandler(logger, br)) 47 mux.HandleFunc("GET /logs/{knot}/{pipelineRkey}/{workflow}", logsHandler(logger, provider)) 48 mux.HandleFunc("GET /xrpc/"+tangled.OwnerNSID, ownerHandler(logger, cfg.OwnerDID)) 49 mux.HandleFunc("POST /webhooks/buildkite", buildkiteWebhookHandler()) 50 51 srv := &http.Server{ 52 Addr: cfg.Addr, 53 Handler: mux, 54 ReadHeaderTimeout: 5 * time.Second, 55 } 56 57 // Run ListenAndServe on a goroutine so we can race it against ctx.Done. 58 errCh := make(chan error, 1) 59 go func() { 60 logger.Info("listening", "addr", cfg.Addr, "owner", cfg.OwnerDID) 61 errCh <- srv.ListenAndServe() 62 }() 63 64 select { 65 case <-ctx.Done(): 66 logger.Info("shutting down") 67 case err := <-errCh: 68 // ErrServerClosed means we shut ourselves down cleanly elsewhere 69 // — anything else is a real failure to report. 70 if err != nil && !errors.Is(err, http.ErrServerClosed) { 71 return fmt.Errorf("http server: %w", err) 72 } 73 } 74 75 shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 76 defer cancel() 77 return srv.Shutdown(shutdownCtx) 78} 79 80// rootHandler responds at "/" with a small identifier. Mainly useful as a 81// liveness check during deployment. 82func rootHandler() http.HandlerFunc { 83 return func(w http.ResponseWriter, r *http.Request) { 84 fmt.Fprintln(w, "tack: a Tangled spindle backed by Buildkite") 85 } 86} 87 88// ownerHandler implements sh.tangled.owner so the Tangled appview can verify 89// this spindle's owner during registration. 90func ownerHandler(logger *slog.Logger, owner string) http.HandlerFunc { 91 return func(w http.ResponseWriter, r *http.Request) { 92 w.Header().Set("Content-Type", "application/json") 93 if err := json.NewEncoder(w).Encode(tangled.Owner_Output{Owner: owner}); err != nil { 94 logger.Error("encode owner response", "err", err) 95 } 96 } 97} 98 99// buildkiteWebhookHandler is a placeholder until we implement Buildkite -> 100// pipeline.status translation. 101func buildkiteWebhookHandler() http.HandlerFunc { 102 return func(w http.ResponseWriter, r *http.Request) { 103 http.Error(w, "not implemented", http.StatusNotImplemented) 104 } 105} 106 107// logsHandler serves captured workflow logs over a WebSocket, 108// matching the wire protocol of the upstream Tangled spindle so the 109// appview's log proxy (appview/pipelines.Logs) treats us as a drop-in 110// source. The path shape is 111// 112// GET /logs/{knot}/{pipelineRkey}/{workflow} 113// 114// which matches the (knot, pipelineRkey, workflow) tuple 115// Provider.Spawn is invoked with — the same identity used in the 116// pipeline ATURI. Workflow names commonly contain a dot (e.g. 117// "test.yml"); ServeMux path patterns match a single segment, so the 118// literal value flows straight through r.PathValue. 119// 120// Wire shape per frame: a single TextMessage carrying one JSON 121// LogLine record (defined in provider.go; byte-compatible with 122// tangled.org/core/spindle/models.LogLine). The Provider hands us a 123// channel of LogLine values; we marshal each one and forward it as 124// one frame so the appview's per-line decode path works unchanged. 125// 126// Error mapping is intentionally done *before* the WebSocket upgrade: 127// ErrLogsNotFound becomes 404 and any other Logs() error becomes 500 128// so the appview's websocket.DefaultDialer surfaces a real HTTP 129// status rather than an immediate close. 130func logsHandler(logger *slog.Logger, provider Provider) http.HandlerFunc { 131 upgrader := websocket.Upgrader{ 132 ReadBufferSize: 1024, 133 WriteBufferSize: 1024, 134 } 135 return func(w http.ResponseWriter, r *http.Request) { 136 knot := r.PathValue("knot") 137 pipelineRkey := r.PathValue("pipelineRkey") 138 workflow := r.PathValue("workflow") 139 140 // Defensive: ServeMux won't match an empty segment, but a 141 // future router change shouldn't be allowed to silently 142 // produce an "all logs" query. 143 if knot == "" || pipelineRkey == "" || workflow == "" { 144 http.Error(w, "missing path component", http.StatusBadRequest) 145 return 146 } 147 148 // Establish the log channel BEFORE the WebSocket upgrade so 149 // ErrLogsNotFound / backend errors surface as a real HTTP 150 // status to the appview's dialer rather than as an immediate 151 // post-upgrade close. ctx scopes the producer's lifetime — 152 // it's cancelled below the moment the client disconnects. 153 ctx, cancel := context.WithCancel(r.Context()) 154 defer cancel() 155 156 ch, err := provider.Logs(ctx, knot, pipelineRkey, workflow) 157 if err != nil { 158 if errors.Is(err, ErrLogsNotFound) { 159 http.Error(w, "logs not found", http.StatusNotFound) 160 return 161 } 162 logger.Error("logs fetch failed", 163 "err", err, 164 "knot", knot, 165 "pipeline_rkey", pipelineRkey, 166 "workflow", workflow, 167 ) 168 http.Error(w, "logs unavailable", http.StatusInternalServerError) 169 return 170 } 171 172 conn, err := upgrader.Upgrade(w, r, nil) 173 if err != nil { 174 // Upgrade already wrote a response; just record the 175 // failure for diagnostics. 176 logger.Error("logs websocket upgrade failed", 177 "err", err, 178 "knot", knot, 179 "pipeline_rkey", pipelineRkey, 180 "workflow", workflow, 181 ) 182 return 183 } 184 defer func() { 185 // Send a close frame on the way out so the appview proxy 186 // sees a clean shutdown. Mirrors upstream 187 // spindle.(*Spindle).Logs. 188 _ = conn.WriteControl( 189 websocket.CloseMessage, 190 websocket.FormatCloseMessage( 191 websocket.CloseNormalClosure, "log stream complete", 192 ), 193 time.Now().Add(time.Second), 194 ) 195 conn.Close() 196 }() 197 198 // Detect client disconnect by trying to read; we don't expect 199 // any payloads from the client, so any read outcome (including 200 // EOF) signals the connection has gone away. The cancel hits 201 // the producer goroutine inside the Provider, which stops 202 // sending and closes ch — our drain loop then exits cleanly. 203 go func() { 204 for { 205 if _, _, err := conn.NextReader(); err != nil { 206 cancel() 207 return 208 } 209 } 210 }() 211 212 // Drain the channel; closure means the run is complete (or 213 // the producer hit ctx). Marshal-then-write each LogLine as 214 // a single TextMessage frame. 215 for { 216 select { 217 case <-ctx.Done(): 218 return 219 case line, ok := <-ch: 220 if !ok { 221 return 222 } 223 frame, err := json.Marshal(line) 224 if err != nil { 225 // The struct is fully internal; a marshal failure 226 // is a programmer bug. Log and bail rather than 227 // poison the stream with a half-frame. 228 logger.Error("marshal log line", 229 "err", err, 230 "knot", knot, 231 "pipeline_rkey", pipelineRkey, 232 "workflow", workflow, 233 ) 234 return 235 } 236 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil { 237 logger.Debug("logs frame write failed", 238 "err", err, 239 "knot", knot, 240 "pipeline_rkey", pipelineRkey, 241 "workflow", workflow, 242 ) 243 return 244 } 245 } 246 } 247 } 248} 249 250// eventsHandler upgrades to a WebSocket and streams persisted records 251// to the connected client. The wire protocol mirrors the upstream 252// Tangled spindle so the appview's eventconsumer treats us as a 253// drop-in source: 254// 255// - Optional ?cursor=<int64> resumes after that rowid; absent or 0 256// means "from the beginning of our retained log". 257// - We do a backfill pass first (everything with created > cursor), 258// then loop: on each broker signal, drain new rows; on a 30s 259// timer, write a websocket ping so intermediaries don't idle the 260// connection out. 261// 262// We subscribe to the broker *before* the backfill so any Publish that 263// races between the cursor read and the loop entry is captured by the 264// pending channel signal — the loop will see it on its first iteration 265// and call streamEvents again, which is idempotent on the cursor. 266func eventsHandler(logger *slog.Logger, br *broker) http.HandlerFunc { 267 upgrader := websocket.Upgrader{ 268 ReadBufferSize: 1024, 269 WriteBufferSize: 1024, 270 } 271 return func(w http.ResponseWriter, r *http.Request) { 272 conn, err := upgrader.Upgrade(w, r, nil) 273 if err != nil { 274 logger.Error("websocket upgrade failed", "err", err) 275 return 276 } 277 defer conn.Close() 278 279 // Parse the resume cursor up front. An unparseable cursor is a 280 // client bug, but rather than 4xx the upgraded connection we 281 // log it and start from zero — same behaviour as the upstream 282 // spindle. 283 var cursor int64 284 if raw := r.URL.Query().Get("cursor"); raw != "" { 285 parsed, err := strconv.ParseInt(raw, 10, 64) 286 if err != nil { 287 logger.Warn("events: bad cursor, starting from 0", 288 "cursor", raw, "err", err, 289 ) 290 } else { 291 cursor = parsed 292 } 293 } 294 logger.Debug("events client connected", 295 "remote", r.RemoteAddr, "cursor", cursor, 296 ) 297 298 // Subscribe before the backfill so a Publish that races between 299 // the EventsAfter read and our select loop is captured by the 300 // pending channel signal — we'll re-drain on the first wake-up. 301 sig := br.Subscribe() 302 defer br.Unsubscribe(sig) 303 304 ctx, cancel := context.WithCancel(r.Context()) 305 defer cancel() 306 307 // Detect client disconnect by trying to read; we don't expect 308 // any payloads from the client, so any read outcome (including 309 // EOF) signals the connection has gone away. 310 go func() { 311 for { 312 if _, _, err := conn.NextReader(); err != nil { 313 cancel() 314 return 315 } 316 } 317 }() 318 319 // Initial backfill. If this fails the connection is unusable 320 // (we can't promise ordering after a partial write) so just 321 // return and let the client reconnect with the same cursor. 322 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil { 323 logger.Debug("events backfill ended", "err", err, "cursor", cursor) 324 return 325 } 326 327 ticker := time.NewTicker(30 * time.Second) 328 defer ticker.Stop() 329 for { 330 select { 331 case <-ctx.Done(): 332 logger.Debug("events client disconnected", 333 "remote", r.RemoteAddr, "cursor", cursor, 334 ) 335 return 336 case <-sig: 337 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil { 338 logger.Debug("events stream ended", "err", err, "cursor", cursor) 339 return 340 } 341 case <-ticker.C: 342 if err := conn.WriteControl( 343 websocket.PingMessage, nil, 344 time.Now().Add(time.Second), 345 ); err != nil { 346 logger.Debug("events ping failed", "err", err) 347 return 348 } 349 } 350 } 351 } 352} 353 354// streamEvents drains every event row with `created > *cursor`, writes 355// each as a wire envelope frame, and advances *cursor in lockstep. The 356// cursor is updated *after* the write succeeds so a half-flushed batch 357// (interrupted by a websocket error) replays cleanly on the next 358// connection. 359// 360// It is safe to call repeatedly: when there are no new rows the query 361// returns an empty slice and we noop. 362func streamEvents(ctx context.Context, conn *websocket.Conn, st *store, cursor *int64) error { 363 rows, err := st.EventsAfter(ctx, *cursor) 364 if err != nil { 365 return fmt.Errorf("read events: %w", err) 366 } 367 for _, row := range rows { 368 frame, err := json.Marshal(eventsEnvelope{ 369 Rkey: row.Rkey, 370 Nsid: row.Nsid, 371 Event: row.EventJSON, 372 Created: row.Created, 373 }) 374 if err != nil { 375 return fmt.Errorf("marshal envelope: %w", err) 376 } 377 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil { 378 return fmt.Errorf("write frame: %w", err) 379 } 380 *cursor = row.Created 381 } 382 return nil 383}