Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// HTTP surface of the spindle. 4// 5// Four roles to keep in mind: 6// 7// 1. Verification: the Tangled appview hits /xrpc/sh.tangled.owner during 8// spindle registration to confirm the operator owns this instance. 9// 2. Event stream: the appview holds a long-lived websocket against 10// /events to receive sh.tangled.pipeline.status frames as builds 11// progress. Today this is just a keep-alive; payloads land once the 12// Buildkite webhook receiver is wired up. 13// 3. Webhooks: Buildkite POSTs build/job state changes to 14// /webhooks/buildkite, which we'll translate into pipeline.status 15// events on (2). 16// 4. Logs: GET /logs/{knot}/{pipelineRkey}/{workflow} delegates to the 17// configured Provider so the appview (or a curling operator) can 18// pull captured workflow output for a specific run. 19 20import ( 21 "context" 22 "encoding/json" 23 "errors" 24 "fmt" 25 "io" 26 "log/slog" 27 "net/http" 28 "strconv" 29 "time" 30 31 "github.com/gorilla/websocket" 32 "tangled.org/core/api/tangled" 33 34 "go.mitchellh.com/tack/internal/buildkite" 35) 36 37// runHTTP starts the spindle's HTTP server and blocks until ctx is 38// cancelled or the listener returns a fatal error. On ctx cancellation it 39// performs a graceful shutdown with a bounded timeout. 40// 41// The logger is read from ctx via loggerFrom. The broker is the 42// in-process pub/sub used by /events to fan published records out to 43// connected websocket subscribers. bkProvider may be nil — when a 44// deployment runs the fake provider, /webhooks/buildkite still 45// registers but responds 503, so a misdirected Buildkite webhook 46// gets a clear "this spindle isn't accepting Buildkite events" rather 47// than a misleading 200. 48func runHTTP(ctx context.Context, cfg config, br *broker, provider Provider, bkProvider *buildkiteProvider) error { 49 logger := loggerFrom(ctx) 50 51 mux := http.NewServeMux() 52 mux.HandleFunc("GET /", rootHandler()) 53 mux.HandleFunc("GET /events", eventsHandler(logger, br)) 54 mux.HandleFunc("GET /logs/{knot}/{pipelineRkey}/{workflow}", logsHandler(logger, provider)) 55 mux.HandleFunc("GET /xrpc/"+tangled.OwnerNSID, ownerHandler(logger, cfg.OwnerDID)) 56 mux.HandleFunc("POST /webhooks/buildkite", buildkiteWebhookHandler(logger, bkProvider)) 57 58 srv := &http.Server{ 59 Addr: cfg.Addr, 60 Handler: mux, 61 ReadHeaderTimeout: 5 * time.Second, 62 } 63 64 // Run ListenAndServe on a goroutine so we can race it against ctx.Done. 65 errCh := make(chan error, 1) 66 go func() { 67 logger.Info("listening", "addr", cfg.Addr, "owner", cfg.OwnerDID) 68 errCh <- srv.ListenAndServe() 69 }() 70 71 select { 72 case <-ctx.Done(): 73 logger.Info("shutting down") 74 case err := <-errCh: 75 // ErrServerClosed means we shut ourselves down cleanly elsewhere 76 // — anything else is a real failure to report. 77 if err != nil && !errors.Is(err, http.ErrServerClosed) { 78 return fmt.Errorf("http server: %w", err) 79 } 80 } 81 82 shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 83 defer cancel() 84 return srv.Shutdown(shutdownCtx) 85} 86 87// rootHandler responds at "/" with a friendly identifier page. Mainly 88// useful as a liveness check during deployment, and as a calling card 89// for the curious operator who hits the root in a browser. 90func rootHandler() http.HandlerFunc { 91 // Plain-text banner. "tack" rendered in figlet's larry3d font; the 92 // shape mirrors the style the bluesky PDS serves at its own root, 93 // which feels like the right vibe for an atproto-adjacent service. 94 const banner = ` __ __ 95/\ \__ /\ \ 96\ \ ,_\ __ ___\ \ \/'\ 97 \ \ \/ /'__'\ /'___\ \ , < 98 \ \ \_/\ \L\.\_/\ \__/\ \ \\'\ 99 \ \__\ \__/.\_\ \____\\ \_\ \_\ 100 \/__/\/__/\/_/\/____/ \/_/\/_/ 101 102 103This is a tack server: a tangled spindle for other CI services. 104 105 Code: https://github.com/mitchellh/tack 106` 107 return func(w http.ResponseWriter, r *http.Request) { 108 // Only respond at exactly "/" — without this guard, the 109 // "GET /" pattern would also catch arbitrary unmatched 110 // paths like "/foo" and lie about being the root. 111 if r.URL.Path != "/" { 112 http.NotFound(w, r) 113 return 114 } 115 w.Header().Set("Content-Type", "text/plain; charset=utf-8") 116 fmt.Fprint(w, banner) 117 } 118} 119 120// ownerHandler implements sh.tangled.owner so the Tangled appview can verify 121// this spindle's owner during registration. 122func ownerHandler(logger *slog.Logger, owner string) http.HandlerFunc { 123 return func(w http.ResponseWriter, r *http.Request) { 124 w.Header().Set("Content-Type", "application/json") 125 if err := json.NewEncoder(w).Encode(tangled.Owner_Output{Owner: owner}); err != nil { 126 logger.Error("encode owner response", "err", err) 127 } 128 } 129} 130 131// buildkiteWebhookHandler receives Buildkite Pipelines webhook events, 132// authenticates the request against whichever scheme the provider was 133// configured with, and hands the decoded payload to the provider for 134// translation into a sh.tangled.pipeline.status publish. 135// 136// Authentication is intentionally fail-closed: when bk is nil (no 137// Buildkite provider configured) we 503 instead of accepting events 138// silently. The body is buffered up front because signature mode 139// HMACs the raw bytes — we can't rely on the JSON decoder reading 140// the request body before verification. 141// 142// Acknowledgement contract with Buildkite: we 200 on any well-formed 143// event we accepted (including events we deliberately ignore, like 144// job.* or builds we don't track), and 5xx only on internal failure 145// the operator should look at. A 4xx/5xx makes Buildkite retry, 146// which we don't want for "this isn't an event we care about". 147func buildkiteWebhookHandler(logger *slog.Logger, bk *buildkiteProvider) http.HandlerFunc { 148 return func(w http.ResponseWriter, r *http.Request) { 149 if bk == nil { 150 http.Error(w, "buildkite provider not configured", 151 http.StatusServiceUnavailable) 152 return 153 } 154 155 // Cap body size so a malicious sender can't exhaust 156 // memory; Buildkite payloads in practice are well under 157 // 64 KiB but a generous-but-bounded ceiling is the 158 // right shape here. 159 body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) 160 if err != nil { 161 logger.Warn("buildkite webhook: read body", "err", err) 162 http.Error(w, "read body", http.StatusBadRequest) 163 return 164 } 165 166 if err := bk.VerifyWebhook(r.Header, body); err != nil { 167 logger.Warn("buildkite webhook: verify failed", 168 "err", err, 169 "remote", r.RemoteAddr, 170 ) 171 http.Error(w, "unauthorized", http.StatusUnauthorized) 172 return 173 } 174 175 var payload buildkite.WebhookPayload 176 if err := json.Unmarshal(body, &payload); err != nil { 177 logger.Warn("buildkite webhook: decode body", "err", err) 178 http.Error(w, "bad payload", http.StatusBadRequest) 179 return 180 } 181 // The X-Buildkite-Event header is authoritative for the 182 // event name; the body field is convenience but doesn't 183 // always match exactly. Prefer the header. 184 if h := r.Header.Get("X-Buildkite-Event"); h != "" { 185 payload.Event = h 186 } 187 188 // Translate + publish on the request context so a slow 189 // store/broker doesn't outlive an aborted webhook 190 // connection. 191 if err := bk.HandleWebhook(r.Context(), payload); err != nil { 192 logger.Error("buildkite webhook: handle", "err", err, 193 "event", payload.Event, 194 "build_uuid", payload.Build.ID, 195 ) 196 http.Error(w, "internal error", http.StatusInternalServerError) 197 return 198 } 199 w.WriteHeader(http.StatusOK) 200 } 201} 202 203// logsHandler serves captured workflow logs over a WebSocket, 204// matching the wire protocol of the upstream Tangled spindle so the 205// appview's log proxy (appview/pipelines.Logs) treats us as a drop-in 206// source. The path shape is 207// 208// GET /logs/{knot}/{pipelineRkey}/{workflow} 209// 210// which matches the (knot, pipelineRkey, workflow) tuple 211// Provider.Spawn is invoked with — the same identity used in the 212// pipeline ATURI. Workflow names commonly contain a dot (e.g. 213// "test.yml"); ServeMux path patterns match a single segment, so the 214// literal value flows straight through r.PathValue. 215// 216// Wire shape per frame: a single TextMessage carrying one JSON 217// LogLine record (defined in provider.go; byte-compatible with 218// tangled.org/core/spindle/models.LogLine). The Provider hands us a 219// channel of LogLine values; we marshal each one and forward it as 220// one frame so the appview's per-line decode path works unchanged. 221// 222// Error mapping is intentionally done *before* the WebSocket upgrade: 223// ErrLogsNotFound becomes 404 and any other Logs() error becomes 500 224// so the appview's websocket.DefaultDialer surfaces a real HTTP 225// status rather than an immediate close. 226func logsHandler(logger *slog.Logger, provider Provider) http.HandlerFunc { 227 upgrader := websocket.Upgrader{ 228 ReadBufferSize: 1024, 229 WriteBufferSize: 1024, 230 } 231 return func(w http.ResponseWriter, r *http.Request) { 232 knot := r.PathValue("knot") 233 pipelineRkey := r.PathValue("pipelineRkey") 234 workflow := r.PathValue("workflow") 235 236 // Defensive: ServeMux won't match an empty segment, but a 237 // future router change shouldn't be allowed to silently 238 // produce an "all logs" query. 239 if knot == "" || pipelineRkey == "" || workflow == "" { 240 http.Error(w, "missing path component", http.StatusBadRequest) 241 return 242 } 243 244 // Establish the log channel BEFORE the WebSocket upgrade so 245 // ErrLogsNotFound / backend errors surface as a real HTTP 246 // status to the appview's dialer rather than as an immediate 247 // post-upgrade close. ctx scopes the producer's lifetime — 248 // it's cancelled below the moment the client disconnects. 249 ctx, cancel := context.WithCancel(r.Context()) 250 defer cancel() 251 252 ch, err := provider.Logs(ctx, knot, pipelineRkey, workflow) 253 if err != nil { 254 if errors.Is(err, ErrLogsNotFound) { 255 http.Error(w, "logs not found", http.StatusNotFound) 256 return 257 } 258 logger.Error("logs fetch failed", 259 "err", err, 260 "knot", knot, 261 "pipeline_rkey", pipelineRkey, 262 "workflow", workflow, 263 ) 264 http.Error(w, "logs unavailable", http.StatusInternalServerError) 265 return 266 } 267 268 conn, err := upgrader.Upgrade(w, r, nil) 269 if err != nil { 270 // Upgrade already wrote a response; just record the 271 // failure for diagnostics. 272 logger.Error("logs websocket upgrade failed", 273 "err", err, 274 "knot", knot, 275 "pipeline_rkey", pipelineRkey, 276 "workflow", workflow, 277 ) 278 return 279 } 280 defer func() { 281 // Send a close frame on the way out so the appview proxy 282 // sees a clean shutdown. Mirrors upstream 283 // spindle.(*Spindle).Logs. 284 _ = conn.WriteControl( 285 websocket.CloseMessage, 286 websocket.FormatCloseMessage( 287 websocket.CloseNormalClosure, "log stream complete", 288 ), 289 time.Now().Add(time.Second), 290 ) 291 conn.Close() 292 }() 293 294 // Detect client disconnect by trying to read; we don't expect 295 // any payloads from the client, so any read outcome (including 296 // EOF) signals the connection has gone away. The cancel hits 297 // the producer goroutine inside the Provider, which stops 298 // sending and closes ch — our drain loop then exits cleanly. 299 go func() { 300 for { 301 if _, _, err := conn.NextReader(); err != nil { 302 cancel() 303 return 304 } 305 } 306 }() 307 308 // Drain the channel; closure means the run is complete (or 309 // the producer hit ctx). Marshal-then-write each LogLine as 310 // a single TextMessage frame. 311 for { 312 select { 313 case <-ctx.Done(): 314 return 315 case line, ok := <-ch: 316 if !ok { 317 return 318 } 319 frame, err := json.Marshal(line) 320 if err != nil { 321 // The struct is fully internal; a marshal failure 322 // is a programmer bug. Log and bail rather than 323 // poison the stream with a half-frame. 324 logger.Error("marshal log line", 325 "err", err, 326 "knot", knot, 327 "pipeline_rkey", pipelineRkey, 328 "workflow", workflow, 329 ) 330 return 331 } 332 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil { 333 logger.Debug("logs frame write failed", 334 "err", err, 335 "knot", knot, 336 "pipeline_rkey", pipelineRkey, 337 "workflow", workflow, 338 ) 339 return 340 } 341 } 342 } 343 } 344} 345 346// eventsHandler upgrades to a WebSocket and streams persisted records 347// to the connected client. The wire protocol mirrors the upstream 348// Tangled spindle so the appview's eventconsumer treats us as a 349// drop-in source: 350// 351// - Optional ?cursor=<int64> resumes after that rowid; absent or 0 352// means "from the beginning of our retained log". 353// - We do a backfill pass first (everything with created > cursor), 354// then loop: on each broker signal, drain new rows; on a 30s 355// timer, write a websocket ping so intermediaries don't idle the 356// connection out. 357// 358// We subscribe to the broker *before* the backfill so any Publish that 359// races between the cursor read and the loop entry is captured by the 360// pending channel signal — the loop will see it on its first iteration 361// and call streamEvents again, which is idempotent on the cursor. 362func eventsHandler(logger *slog.Logger, br *broker) http.HandlerFunc { 363 upgrader := websocket.Upgrader{ 364 ReadBufferSize: 1024, 365 WriteBufferSize: 1024, 366 } 367 return func(w http.ResponseWriter, r *http.Request) { 368 conn, err := upgrader.Upgrade(w, r, nil) 369 if err != nil { 370 logger.Error("websocket upgrade failed", "err", err) 371 return 372 } 373 defer conn.Close() 374 375 // Parse the resume cursor up front. An unparseable cursor is a 376 // client bug, but rather than 4xx the upgraded connection we 377 // log it and start from zero — same behaviour as the upstream 378 // spindle. 379 var cursor int64 380 if raw := r.URL.Query().Get("cursor"); raw != "" { 381 parsed, err := strconv.ParseInt(raw, 10, 64) 382 if err != nil { 383 logger.Warn("events: bad cursor, starting from 0", 384 "cursor", raw, "err", err, 385 ) 386 } else { 387 cursor = parsed 388 } 389 } 390 logger.Debug("events client connected", 391 "remote", r.RemoteAddr, "cursor", cursor, 392 ) 393 394 // Subscribe before the backfill so a Publish that races between 395 // the EventsAfter read and our select loop is captured by the 396 // pending channel signal — we'll re-drain on the first wake-up. 397 sig := br.Subscribe() 398 defer br.Unsubscribe(sig) 399 400 ctx, cancel := context.WithCancel(r.Context()) 401 defer cancel() 402 403 // Detect client disconnect by trying to read; we don't expect 404 // any payloads from the client, so any read outcome (including 405 // EOF) signals the connection has gone away. 406 go func() { 407 for { 408 if _, _, err := conn.NextReader(); err != nil { 409 cancel() 410 return 411 } 412 } 413 }() 414 415 // Initial backfill. If this fails the connection is unusable 416 // (we can't promise ordering after a partial write) so just 417 // return and let the client reconnect with the same cursor. 418 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil { 419 logger.Debug("events backfill ended", "err", err, "cursor", cursor) 420 return 421 } 422 423 ticker := time.NewTicker(30 * time.Second) 424 defer ticker.Stop() 425 for { 426 select { 427 case <-ctx.Done(): 428 logger.Debug("events client disconnected", 429 "remote", r.RemoteAddr, "cursor", cursor, 430 ) 431 return 432 case <-sig: 433 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil { 434 logger.Debug("events stream ended", "err", err, "cursor", cursor) 435 return 436 } 437 case <-ticker.C: 438 if err := conn.WriteControl( 439 websocket.PingMessage, nil, 440 time.Now().Add(time.Second), 441 ); err != nil { 442 logger.Debug("events ping failed", "err", err) 443 return 444 } 445 } 446 } 447 } 448} 449 450// streamEvents drains every event row with `created > *cursor`, writes 451// each as a wire envelope frame, and advances *cursor in lockstep. The 452// cursor is updated *after* the write succeeds so a half-flushed batch 453// (interrupted by a websocket error) replays cleanly on the next 454// connection. 455// 456// It is safe to call repeatedly: when there are no new rows the query 457// returns an empty slice and we noop. 458func streamEvents(ctx context.Context, conn *websocket.Conn, st *store, cursor *int64) error { 459 rows, err := st.EventsAfter(ctx, *cursor) 460 if err != nil { 461 return fmt.Errorf("read events: %w", err) 462 } 463 for _, row := range rows { 464 frame, err := json.Marshal(eventsEnvelope{ 465 Rkey: row.Rkey, 466 Nsid: row.Nsid, 467 Event: row.EventJSON, 468 Created: row.Created, 469 }) 470 if err != nil { 471 return fmt.Errorf("marshal envelope: %w", err) 472 } 473 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil { 474 return fmt.Errorf("write frame: %w", err) 475 } 476 *cursor = row.Created 477 } 478 return nil 479}