Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

package main

// HTTP surface of the spindle.
//
// Four roles to keep in mind:
//
// 1. Verification: the Tangled appview hits /xrpc/sh.tangled.owner during
//    spindle registration to confirm the operator owns this instance.
// 2. Event stream: the appview holds a long-lived websocket against
//    /events. The handler first backfills persisted records newer than
//    the client's ?cursor. It then streams new ones (e.g.
//    sh.tangled.pipeline.status frames as builds progress) whenever the
//    broker signals, and pings every 30s so intermediaries don't idle
//    the connection out.
// 3. Webhooks: Buildkite POSTs build/job state changes to
//    /webhooks/buildkite. The Buildkite provider translates them into
//    sh.tangled.pipeline.status publishes, which the broker fans out to
//    the /events subscribers of (2).
// 4. Logs: GET /logs/{knot}/{pipelineRkey}/{workflow} upgrades to a
//    websocket and streams the captured workflow output that the
//    configured Provider returns for that run, one JSON log line per
//    text frame.

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"strconv"
	"time"

	"github.com/gorilla/websocket"
	"tangled.org/core/api/tangled"

	"go.mitchellh.com/tack/internal/buildkite"
)

// runHTTP starts the spindle's HTTP server and blocks until ctx is
// cancelled or the listener returns a fatal error. On ctx cancellation it
// performs a graceful shutdown with a bounded timeout. net/http's
// Server.Shutdown neither closes nor waits for hijacked connections, so
// that shutdown does not drain open /events and /logs websockets.
//
// The logger is read from ctx via loggerFrom. The broker is the
// in-process pub/sub used by /events to fan published records out to
// connected websocket subscribers. bkProvider may be nil. When a
// deployment runs the fake provider, /webhooks/buildkite is still
// registered but responds 503. A misdirected Buildkite webhook therefore
// gets a clear "this spindle isn't accepting Buildkite events" rather
// than a misleading 200.
48func runHTTP(ctx context.Context, cfg config, br *broker, provider Provider, bkProvider *buildkiteProvider) error { 49 logger := loggerFrom(ctx) 50 51 mux := http.NewServeMux() 52 mux.HandleFunc("GET /", rootHandler()) 53 mux.HandleFunc("GET /events", eventsHandler(logger, br)) 54 mux.HandleFunc("GET /logs/{knot}/{pipelineRkey}/{workflow}", logsHandler(logger, provider)) 55 mux.HandleFunc("GET /xrpc/"+tangled.OwnerNSID, ownerHandler(logger, cfg.OwnerDID)) 56 mux.HandleFunc("POST /webhooks/buildkite", buildkiteWebhookHandler(logger, bkProvider)) 57 58 srv := &http.Server{ 59 Addr: cfg.Addr, 60 Handler: mux, 61 ReadHeaderTimeout: 5 * time.Second, 62 } 63 64 // Run ListenAndServe on a goroutine so we can race it against ctx.Done. 65 errCh := make(chan error, 1) 66 go func() { 67 logger.Info("listening", "addr", cfg.Addr, "owner", cfg.OwnerDID) 68 errCh <- srv.ListenAndServe() 69 }() 70 71 select { 72 case <-ctx.Done(): 73 logger.Info("shutting down") 74 case err := <-errCh: 75 // ErrServerClosed means we shut ourselves down cleanly elsewhere 76 // — anything else is a real failure to report. 77 if err != nil && !errors.Is(err, http.ErrServerClosed) { 78 return fmt.Errorf("http server: %w", err) 79 } 80 } 81 82 shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 83 defer cancel() 84 return srv.Shutdown(shutdownCtx) 85} 86 87// rootHandler responds at "/" with a small identifier. Mainly useful as a 88// liveness check during deployment. 89func rootHandler() http.HandlerFunc { 90 return func(w http.ResponseWriter, r *http.Request) { 91 fmt.Fprintln(w, "tack: a Tangled spindle backed by Buildkite") 92 } 93} 94 95// ownerHandler implements sh.tangled.owner so the Tangled appview can verify 96// this spindle's owner during registration. 
97func ownerHandler(logger *slog.Logger, owner string) http.HandlerFunc { 98 return func(w http.ResponseWriter, r *http.Request) { 99 w.Header().Set("Content-Type", "application/json") 100 if err := json.NewEncoder(w).Encode(tangled.Owner_Output{Owner: owner}); err != nil { 101 logger.Error("encode owner response", "err", err) 102 } 103 } 104} 105 106// buildkiteWebhookHandler receives Buildkite Pipelines webhook events, 107// authenticates the request against whichever scheme the provider was 108// configured with, and hands the decoded payload to the provider for 109// translation into a sh.tangled.pipeline.status publish. 110// 111// Authentication is intentionally fail-closed: when bk is nil (no 112// Buildkite provider configured) we 503 instead of accepting events 113// silently. The body is buffered up front because signature mode 114// HMACs the raw bytes — we can't rely on the JSON decoder reading 115// the request body before verification. 116// 117// Acknowledgement contract with Buildkite: we 200 on any well-formed 118// event we accepted (including events we deliberately ignore, like 119// job.* or builds we don't track), and 5xx only on internal failure 120// the operator should look at. A 4xx/5xx makes Buildkite retry, 121// which we don't want for "this isn't an event we care about". 122func buildkiteWebhookHandler(logger *slog.Logger, bk *buildkiteProvider) http.HandlerFunc { 123 return func(w http.ResponseWriter, r *http.Request) { 124 if bk == nil { 125 http.Error(w, "buildkite provider not configured", 126 http.StatusServiceUnavailable) 127 return 128 } 129 130 // Cap body size so a malicious sender can't exhaust 131 // memory; Buildkite payloads in practice are well under 132 // 64 KiB but a generous-but-bounded ceiling is the 133 // right shape here. 
134 body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) 135 if err != nil { 136 logger.Warn("buildkite webhook: read body", "err", err) 137 http.Error(w, "read body", http.StatusBadRequest) 138 return 139 } 140 141 if err := bk.VerifyWebhook(r.Header, body); err != nil { 142 logger.Warn("buildkite webhook: verify failed", 143 "err", err, 144 "remote", r.RemoteAddr, 145 ) 146 http.Error(w, "unauthorized", http.StatusUnauthorized) 147 return 148 } 149 150 var payload buildkite.WebhookPayload 151 if err := json.Unmarshal(body, &payload); err != nil { 152 logger.Warn("buildkite webhook: decode body", "err", err) 153 http.Error(w, "bad payload", http.StatusBadRequest) 154 return 155 } 156 // The X-Buildkite-Event header is authoritative for the 157 // event name; the body field is convenience but doesn't 158 // always match exactly. Prefer the header. 159 if h := r.Header.Get("X-Buildkite-Event"); h != "" { 160 payload.Event = h 161 } 162 163 // Translate + publish on the request context so a slow 164 // store/broker doesn't outlive an aborted webhook 165 // connection. 166 if err := bk.HandleWebhook(r.Context(), payload); err != nil { 167 logger.Error("buildkite webhook: handle", "err", err, 168 "event", payload.Event, 169 "build_uuid", payload.Build.ID, 170 ) 171 http.Error(w, "internal error", http.StatusInternalServerError) 172 return 173 } 174 w.WriteHeader(http.StatusOK) 175 } 176} 177 178// logsHandler serves captured workflow logs over a WebSocket, 179// matching the wire protocol of the upstream Tangled spindle so the 180// appview's log proxy (appview/pipelines.Logs) treats us as a drop-in 181// source. The path shape is 182// 183// GET /logs/{knot}/{pipelineRkey}/{workflow} 184// 185// which matches the (knot, pipelineRkey, workflow) tuple 186// Provider.Spawn is invoked with — the same identity used in the 187// pipeline ATURI. Workflow names commonly contain a dot (e.g. 
188// "test.yml"); ServeMux path patterns match a single segment, so the 189// literal value flows straight through r.PathValue. 190// 191// Wire shape per frame: a single TextMessage carrying one JSON 192// LogLine record (defined in provider.go; byte-compatible with 193// tangled.org/core/spindle/models.LogLine). The Provider hands us a 194// channel of LogLine values; we marshal each one and forward it as 195// one frame so the appview's per-line decode path works unchanged. 196// 197// Error mapping is intentionally done *before* the WebSocket upgrade: 198// ErrLogsNotFound becomes 404 and any other Logs() error becomes 500 199// so the appview's websocket.DefaultDialer surfaces a real HTTP 200// status rather than an immediate close. 201func logsHandler(logger *slog.Logger, provider Provider) http.HandlerFunc { 202 upgrader := websocket.Upgrader{ 203 ReadBufferSize: 1024, 204 WriteBufferSize: 1024, 205 } 206 return func(w http.ResponseWriter, r *http.Request) { 207 knot := r.PathValue("knot") 208 pipelineRkey := r.PathValue("pipelineRkey") 209 workflow := r.PathValue("workflow") 210 211 // Defensive: ServeMux won't match an empty segment, but a 212 // future router change shouldn't be allowed to silently 213 // produce an "all logs" query. 214 if knot == "" || pipelineRkey == "" || workflow == "" { 215 http.Error(w, "missing path component", http.StatusBadRequest) 216 return 217 } 218 219 // Establish the log channel BEFORE the WebSocket upgrade so 220 // ErrLogsNotFound / backend errors surface as a real HTTP 221 // status to the appview's dialer rather than as an immediate 222 // post-upgrade close. ctx scopes the producer's lifetime — 223 // it's cancelled below the moment the client disconnects. 
224 ctx, cancel := context.WithCancel(r.Context()) 225 defer cancel() 226 227 ch, err := provider.Logs(ctx, knot, pipelineRkey, workflow) 228 if err != nil { 229 if errors.Is(err, ErrLogsNotFound) { 230 http.Error(w, "logs not found", http.StatusNotFound) 231 return 232 } 233 logger.Error("logs fetch failed", 234 "err", err, 235 "knot", knot, 236 "pipeline_rkey", pipelineRkey, 237 "workflow", workflow, 238 ) 239 http.Error(w, "logs unavailable", http.StatusInternalServerError) 240 return 241 } 242 243 conn, err := upgrader.Upgrade(w, r, nil) 244 if err != nil { 245 // Upgrade already wrote a response; just record the 246 // failure for diagnostics. 247 logger.Error("logs websocket upgrade failed", 248 "err", err, 249 "knot", knot, 250 "pipeline_rkey", pipelineRkey, 251 "workflow", workflow, 252 ) 253 return 254 } 255 defer func() { 256 // Send a close frame on the way out so the appview proxy 257 // sees a clean shutdown. Mirrors upstream 258 // spindle.(*Spindle).Logs. 259 _ = conn.WriteControl( 260 websocket.CloseMessage, 261 websocket.FormatCloseMessage( 262 websocket.CloseNormalClosure, "log stream complete", 263 ), 264 time.Now().Add(time.Second), 265 ) 266 conn.Close() 267 }() 268 269 // Detect client disconnect by trying to read; we don't expect 270 // any payloads from the client, so any read outcome (including 271 // EOF) signals the connection has gone away. The cancel hits 272 // the producer goroutine inside the Provider, which stops 273 // sending and closes ch — our drain loop then exits cleanly. 274 go func() { 275 for { 276 if _, _, err := conn.NextReader(); err != nil { 277 cancel() 278 return 279 } 280 } 281 }() 282 283 // Drain the channel; closure means the run is complete (or 284 // the producer hit ctx). Marshal-then-write each LogLine as 285 // a single TextMessage frame. 
286 for { 287 select { 288 case <-ctx.Done(): 289 return 290 case line, ok := <-ch: 291 if !ok { 292 return 293 } 294 frame, err := json.Marshal(line) 295 if err != nil { 296 // The struct is fully internal; a marshal failure 297 // is a programmer bug. Log and bail rather than 298 // poison the stream with a half-frame. 299 logger.Error("marshal log line", 300 "err", err, 301 "knot", knot, 302 "pipeline_rkey", pipelineRkey, 303 "workflow", workflow, 304 ) 305 return 306 } 307 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil { 308 logger.Debug("logs frame write failed", 309 "err", err, 310 "knot", knot, 311 "pipeline_rkey", pipelineRkey, 312 "workflow", workflow, 313 ) 314 return 315 } 316 } 317 } 318 } 319} 320 321// eventsHandler upgrades to a WebSocket and streams persisted records 322// to the connected client. The wire protocol mirrors the upstream 323// Tangled spindle so the appview's eventconsumer treats us as a 324// drop-in source: 325// 326// - Optional ?cursor=<int64> resumes after that rowid; absent or 0 327// means "from the beginning of our retained log". 328// - We do a backfill pass first (everything with created > cursor), 329// then loop: on each broker signal, drain new rows; on a 30s 330// timer, write a websocket ping so intermediaries don't idle the 331// connection out. 332// 333// We subscribe to the broker *before* the backfill so any Publish that 334// races between the cursor read and the loop entry is captured by the 335// pending channel signal — the loop will see it on its first iteration 336// and call streamEvents again, which is idempotent on the cursor. 
337func eventsHandler(logger *slog.Logger, br *broker) http.HandlerFunc { 338 upgrader := websocket.Upgrader{ 339 ReadBufferSize: 1024, 340 WriteBufferSize: 1024, 341 } 342 return func(w http.ResponseWriter, r *http.Request) { 343 conn, err := upgrader.Upgrade(w, r, nil) 344 if err != nil { 345 logger.Error("websocket upgrade failed", "err", err) 346 return 347 } 348 defer conn.Close() 349 350 // Parse the resume cursor up front. An unparseable cursor is a 351 // client bug, but rather than 4xx the upgraded connection we 352 // log it and start from zero — same behaviour as the upstream 353 // spindle. 354 var cursor int64 355 if raw := r.URL.Query().Get("cursor"); raw != "" { 356 parsed, err := strconv.ParseInt(raw, 10, 64) 357 if err != nil { 358 logger.Warn("events: bad cursor, starting from 0", 359 "cursor", raw, "err", err, 360 ) 361 } else { 362 cursor = parsed 363 } 364 } 365 logger.Debug("events client connected", 366 "remote", r.RemoteAddr, "cursor", cursor, 367 ) 368 369 // Subscribe before the backfill so a Publish that races between 370 // the EventsAfter read and our select loop is captured by the 371 // pending channel signal — we'll re-drain on the first wake-up. 372 sig := br.Subscribe() 373 defer br.Unsubscribe(sig) 374 375 ctx, cancel := context.WithCancel(r.Context()) 376 defer cancel() 377 378 // Detect client disconnect by trying to read; we don't expect 379 // any payloads from the client, so any read outcome (including 380 // EOF) signals the connection has gone away. 381 go func() { 382 for { 383 if _, _, err := conn.NextReader(); err != nil { 384 cancel() 385 return 386 } 387 } 388 }() 389 390 // Initial backfill. If this fails the connection is unusable 391 // (we can't promise ordering after a partial write) so just 392 // return and let the client reconnect with the same cursor. 
393 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil { 394 logger.Debug("events backfill ended", "err", err, "cursor", cursor) 395 return 396 } 397 398 ticker := time.NewTicker(30 * time.Second) 399 defer ticker.Stop() 400 for { 401 select { 402 case <-ctx.Done(): 403 logger.Debug("events client disconnected", 404 "remote", r.RemoteAddr, "cursor", cursor, 405 ) 406 return 407 case <-sig: 408 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil { 409 logger.Debug("events stream ended", "err", err, "cursor", cursor) 410 return 411 } 412 case <-ticker.C: 413 if err := conn.WriteControl( 414 websocket.PingMessage, nil, 415 time.Now().Add(time.Second), 416 ); err != nil { 417 logger.Debug("events ping failed", "err", err) 418 return 419 } 420 } 421 } 422 } 423} 424 425// streamEvents drains every event row with `created > *cursor`, writes 426// each as a wire envelope frame, and advances *cursor in lockstep. The 427// cursor is updated *after* the write succeeds so a half-flushed batch 428// (interrupted by a websocket error) replays cleanly on the next 429// connection. 430// 431// It is safe to call repeatedly: when there are no new rows the query 432// returns an empty slice and we noop. 433func streamEvents(ctx context.Context, conn *websocket.Conn, st *store, cursor *int64) error { 434 rows, err := st.EventsAfter(ctx, *cursor) 435 if err != nil { 436 return fmt.Errorf("read events: %w", err) 437 } 438 for _, row := range rows { 439 frame, err := json.Marshal(eventsEnvelope{ 440 Rkey: row.Rkey, 441 Nsid: row.Nsid, 442 Event: row.EventJSON, 443 Created: row.Created, 444 }) 445 if err != nil { 446 return fmt.Errorf("marshal envelope: %w", err) 447 } 448 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil { 449 return fmt.Errorf("write frame: %w", err) 450 } 451 *cursor = row.Created 452 } 453 return nil 454}