Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// HTTP surface of the spindle. 4// 5// Four roles to keep in mind: 6// 7// 1. Verification: the Tangled appview hits /xrpc/sh.tangled.owner during 8// spindle registration to confirm the operator owns this instance. 9// 2. Event stream: the appview holds a long-lived websocket against 10// /events to receive sh.tangled.pipeline.status frames as builds 11// progress. Today this is just a keep-alive; payloads land once the 12// Buildkite webhook receiver is wired up. 13// 3. Webhooks: Buildkite POSTs build/job state changes to 14// /webhooks/buildkite, which we'll translate into pipeline.status 15// events on (2). 16// 4. Logs: GET /logs/{knot}/{pipelineRkey}/{workflow} delegates to the 17// configured Provider so the appview (or a curling operator) can 18// pull captured workflow output for a specific run. 19 20import ( 21 "context" 22 "encoding/json" 23 "errors" 24 "fmt" 25 "io" 26 "log/slog" 27 "net/http" 28 "strconv" 29 "time" 30 31 "github.com/gorilla/websocket" 32 "tangled.org/core/api/tangled" 33 34 "go.mitchellh.com/tack/internal/buildkite" 35) 36 37// runHTTP starts the spindle's HTTP server and blocks until ctx is 38// cancelled or the listener returns a fatal error. On ctx cancellation it 39// performs a graceful shutdown with a bounded timeout. 40// 41// The logger is read from ctx via loggerFrom. The broker is the 42// in-process pub/sub used by /events to fan published records out to 43// connected websocket subscribers. bkProvider may be nil — when a 44// deployment runs the fake provider, /webhooks/buildkite still 45// registers but responds 503, so a misdirected Buildkite webhook 46// gets a clear "this spindle isn't accepting Buildkite events" rather 47// than a misleading 200. 48func runHTTP(ctx context.Context, cfg config, br *broker, provider Provider, bkProvider *buildkiteProvider) error { 49 logger := loggerFrom(ctx) 50 51 mux := http.NewServeMux() 52 mux.HandleFunc("GET /", rootHandler()) 53 mux.HandleFunc("GET /events", eventsHandler(logger, br)) 54 mux.HandleFunc("GET /logs/{knot}/{pipelineRkey}/{workflow}", logsHandler(logger, provider)) 55 mux.HandleFunc("GET /xrpc/"+tangled.OwnerNSID, ownerHandler(logger, cfg.OwnerDID)) 56 mux.HandleFunc("POST /webhooks/buildkite", buildkiteWebhookHandler(logger, bkProvider)) 57 58 srv := &http.Server{ 59 Addr: cfg.Addr, 60 Handler: mux, 61 ReadHeaderTimeout: 5 * time.Second, 62 } 63 64 // Run ListenAndServe on a goroutine so we can race it against ctx.Done. 65 errCh := make(chan error, 1) 66 go func() { 67 logger.Info("listening", "addr", cfg.Addr, "owner", cfg.OwnerDID) 68 errCh <- srv.ListenAndServe() 69 }() 70 71 select { 72 case <-ctx.Done(): 73 logger.Info("shutting down") 74 case err := <-errCh: 75 // ErrServerClosed means we shut ourselves down cleanly elsewhere 76 // — anything else is a real failure to report. 77 if err != nil && !errors.Is(err, http.ErrServerClosed) { 78 return fmt.Errorf("http server: %w", err) 79 } 80 } 81 82 shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 83 defer cancel() 84 return srv.Shutdown(shutdownCtx) 85} 86 87// rootHandler responds at "/" with a friendly identifier page. Mainly 88// useful as a liveness check during deployment, and as a calling card 89// for the curious operator who hits the root in a browser. 90func rootHandler() http.HandlerFunc { 91 // Plain-text banner. "tack" rendered in figlet's larry3d font; the 92 // shape mirrors the style the bluesky PDS serves at its own root, 93 // which feels like the right vibe for an atproto-adjacent service. 94 // The two backticks in the larry3d output collide with Go's raw 95 // string delimiter, so we concatenate them in as interpreted 96 // substrings to keep the art byte-identical to figlet's output. 97 const banner = ` __ __ 98/\ \__ /\ \ 99\ \ ,_\ __ ___\ \ \/'\ 100 \ \ \/ /'__` + "`" + `\ /'___\ \ , < 101 \ \ \_/\ \L\.\_/\ \__/\ \ \\` + "`" + `\ 102 \ \__\ \__/.\_\ \____\\ \_\ \_\ 103 \/__/\/__/\/_/\/____/ \/_/\/_/ 104 105 106This is a tack server: a tangled spindle for other CI services. 107 108Most API routes are under /xrpc/ 109 110 Code: https://github.com/mitchellh/tack 111` 112 return func(w http.ResponseWriter, r *http.Request) { 113 // Only respond at exactly "/" — without this guard, the 114 // "GET /" pattern would also catch arbitrary unmatched 115 // paths like "/foo" and lie about being the root. 116 if r.URL.Path != "/" { 117 http.NotFound(w, r) 118 return 119 } 120 w.Header().Set("Content-Type", "text/plain; charset=utf-8") 121 fmt.Fprint(w, banner) 122 } 123} 124 125// ownerHandler implements sh.tangled.owner so the Tangled appview can verify 126// this spindle's owner during registration. 127func ownerHandler(logger *slog.Logger, owner string) http.HandlerFunc { 128 return func(w http.ResponseWriter, r *http.Request) { 129 w.Header().Set("Content-Type", "application/json") 130 if err := json.NewEncoder(w).Encode(tangled.Owner_Output{Owner: owner}); err != nil { 131 logger.Error("encode owner response", "err", err) 132 } 133 } 134} 135 136// buildkiteWebhookHandler receives Buildkite Pipelines webhook events, 137// authenticates the request against whichever scheme the provider was 138// configured with, and hands the decoded payload to the provider for 139// translation into a sh.tangled.pipeline.status publish. 140// 141// Authentication is intentionally fail-closed: when bk is nil (no 142// Buildkite provider configured) we 503 instead of accepting events 143// silently. The body is buffered up front because signature mode 144// HMACs the raw bytes — we can't rely on the JSON decoder reading 145// the request body before verification. 146// 147// Acknowledgement contract with Buildkite: we 200 on any well-formed 148// event we accepted (including events we deliberately ignore, like 149// job.* or builds we don't track), and 5xx only on internal failure 150// the operator should look at. A 4xx/5xx makes Buildkite retry, 151// which we don't want for "this isn't an event we care about". 152func buildkiteWebhookHandler(logger *slog.Logger, bk *buildkiteProvider) http.HandlerFunc { 153 return func(w http.ResponseWriter, r *http.Request) { 154 if bk == nil { 155 http.Error(w, "buildkite provider not configured", 156 http.StatusServiceUnavailable) 157 return 158 } 159 160 // Cap body size so a malicious sender can't exhaust 161 // memory; Buildkite payloads in practice are well under 162 // 64 KiB but a generous-but-bounded ceiling is the 163 // right shape here. 164 body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) 165 if err != nil { 166 logger.Warn("buildkite webhook: read body", "err", err) 167 http.Error(w, "read body", http.StatusBadRequest) 168 return 169 } 170 171 if err := bk.VerifyWebhook(r.Header, body); err != nil { 172 logger.Warn("buildkite webhook: verify failed", 173 "err", err, 174 "remote", r.RemoteAddr, 175 ) 176 http.Error(w, "unauthorized", http.StatusUnauthorized) 177 return 178 } 179 180 var payload buildkite.WebhookPayload 181 if err := json.Unmarshal(body, &payload); err != nil { 182 logger.Warn("buildkite webhook: decode body", "err", err) 183 http.Error(w, "bad payload", http.StatusBadRequest) 184 return 185 } 186 // The X-Buildkite-Event header is authoritative for the 187 // event name; the body field is convenience but doesn't 188 // always match exactly. Prefer the header. 189 if h := r.Header.Get("X-Buildkite-Event"); h != "" { 190 payload.Event = h 191 } 192 193 // Translate + publish on the request context so a slow 194 // store/broker doesn't outlive an aborted webhook 195 // connection. 196 if err := bk.HandleWebhook(r.Context(), payload); err != nil { 197 logger.Error("buildkite webhook: handle", "err", err, 198 "event", payload.Event, 199 "build_uuid", payload.Build.ID, 200 ) 201 http.Error(w, "internal error", http.StatusInternalServerError) 202 return 203 } 204 w.WriteHeader(http.StatusOK) 205 } 206} 207 208// logsHandler serves captured workflow logs over a WebSocket, 209// matching the wire protocol of the upstream Tangled spindle so the 210// appview's log proxy (appview/pipelines.Logs) treats us as a drop-in 211// source. The path shape is 212// 213// GET /logs/{knot}/{pipelineRkey}/{workflow} 214// 215// which matches the (knot, pipelineRkey, workflow) tuple 216// Provider.Spawn is invoked with — the same identity used in the 217// pipeline ATURI. Workflow names commonly contain a dot (e.g. 218// "test.yml"); ServeMux path patterns match a single segment, so the 219// literal value flows straight through r.PathValue. 220// 221// Wire shape per frame: a single TextMessage carrying one JSON 222// LogLine record (defined in provider.go; byte-compatible with 223// tangled.org/core/spindle/models.LogLine). The Provider hands us a 224// channel of LogLine values; we marshal each one and forward it as 225// one frame so the appview's per-line decode path works unchanged. 226// 227// Error mapping is intentionally done *before* the WebSocket upgrade: 228// ErrLogsNotFound becomes 404 and any other Logs() error becomes 500 229// so the appview's websocket.DefaultDialer surfaces a real HTTP 230// status rather than an immediate close. 231func logsHandler(logger *slog.Logger, provider Provider) http.HandlerFunc { 232 upgrader := websocket.Upgrader{ 233 ReadBufferSize: 1024, 234 WriteBufferSize: 1024, 235 } 236 return func(w http.ResponseWriter, r *http.Request) { 237 knot := r.PathValue("knot") 238 pipelineRkey := r.PathValue("pipelineRkey") 239 workflow := r.PathValue("workflow") 240 241 // Defensive: ServeMux won't match an empty segment, but a 242 // future router change shouldn't be allowed to silently 243 // produce an "all logs" query. 244 if knot == "" || pipelineRkey == "" || workflow == "" { 245 http.Error(w, "missing path component", http.StatusBadRequest) 246 return 247 } 248 249 // Establish the log channel BEFORE the WebSocket upgrade so 250 // ErrLogsNotFound / backend errors surface as a real HTTP 251 // status to the appview's dialer rather than as an immediate 252 // post-upgrade close. ctx scopes the producer's lifetime — 253 // it's cancelled below the moment the client disconnects. 254 ctx, cancel := context.WithCancel(r.Context()) 255 defer cancel() 256 257 ch, err := provider.Logs(ctx, knot, pipelineRkey, workflow) 258 if err != nil { 259 if errors.Is(err, ErrLogsNotFound) { 260 http.Error(w, "logs not found", http.StatusNotFound) 261 return 262 } 263 logger.Error("logs fetch failed", 264 "err", err, 265 "knot", knot, 266 "pipeline_rkey", pipelineRkey, 267 "workflow", workflow, 268 ) 269 http.Error(w, "logs unavailable", http.StatusInternalServerError) 270 return 271 } 272 273 conn, err := upgrader.Upgrade(w, r, nil) 274 if err != nil { 275 // Upgrade already wrote a response; just record the 276 // failure for diagnostics. 277 logger.Error("logs websocket upgrade failed", 278 "err", err, 279 "knot", knot, 280 "pipeline_rkey", pipelineRkey, 281 "workflow", workflow, 282 ) 283 return 284 } 285 defer func() { 286 // Send a close frame on the way out so the appview proxy 287 // sees a clean shutdown. Mirrors upstream 288 // spindle.(*Spindle).Logs. 289 _ = conn.WriteControl( 290 websocket.CloseMessage, 291 websocket.FormatCloseMessage( 292 websocket.CloseNormalClosure, "log stream complete", 293 ), 294 time.Now().Add(time.Second), 295 ) 296 conn.Close() 297 }() 298 299 // Detect client disconnect by trying to read; we don't expect 300 // any payloads from the client, so any read outcome (including 301 // EOF) signals the connection has gone away. The cancel hits 302 // the producer goroutine inside the Provider, which stops 303 // sending and closes ch — our drain loop then exits cleanly. 304 go func() { 305 for { 306 if _, _, err := conn.NextReader(); err != nil { 307 cancel() 308 return 309 } 310 } 311 }() 312 313 // Drain the channel; closure means the run is complete (or 314 // the producer hit ctx). Marshal-then-write each LogLine as 315 // a single TextMessage frame. 316 for { 317 select { 318 case <-ctx.Done(): 319 return 320 case line, ok := <-ch: 321 if !ok { 322 return 323 } 324 frame, err := json.Marshal(line) 325 if err != nil { 326 // The struct is fully internal; a marshal failure 327 // is a programmer bug. Log and bail rather than 328 // poison the stream with a half-frame. 329 logger.Error("marshal log line", 330 "err", err, 331 "knot", knot, 332 "pipeline_rkey", pipelineRkey, 333 "workflow", workflow, 334 ) 335 return 336 } 337 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil { 338 logger.Debug("logs frame write failed", 339 "err", err, 340 "knot", knot, 341 "pipeline_rkey", pipelineRkey, 342 "workflow", workflow, 343 ) 344 return 345 } 346 } 347 } 348 } 349} 350 351// eventsHandler upgrades to a WebSocket and streams persisted records 352// to the connected client. The wire protocol mirrors the upstream 353// Tangled spindle so the appview's eventconsumer treats us as a 354// drop-in source: 355// 356// - Optional ?cursor=<int64> resumes after that rowid; absent or 0 357// means "from the beginning of our retained log". 358// - We do a backfill pass first (everything with created > cursor), 359// then loop: on each broker signal, drain new rows; on a 30s 360// timer, write a websocket ping so intermediaries don't idle the 361// connection out. 362// 363// We subscribe to the broker *before* the backfill so any Publish that 364// races between the cursor read and the loop entry is captured by the 365// pending channel signal — the loop will see it on its first iteration 366// and call streamEvents again, which is idempotent on the cursor. 367func eventsHandler(logger *slog.Logger, br *broker) http.HandlerFunc { 368 upgrader := websocket.Upgrader{ 369 ReadBufferSize: 1024, 370 WriteBufferSize: 1024, 371 } 372 return func(w http.ResponseWriter, r *http.Request) { 373 conn, err := upgrader.Upgrade(w, r, nil) 374 if err != nil { 375 logger.Error("websocket upgrade failed", "err", err) 376 return 377 } 378 defer conn.Close() 379 380 // Parse the resume cursor up front. An unparseable cursor is a 381 // client bug, but rather than 4xx the upgraded connection we 382 // log it and start from zero — same behaviour as the upstream 383 // spindle. 384 var cursor int64 385 if raw := r.URL.Query().Get("cursor"); raw != "" { 386 parsed, err := strconv.ParseInt(raw, 10, 64) 387 if err != nil { 388 logger.Warn("events: bad cursor, starting from 0", 389 "cursor", raw, "err", err, 390 ) 391 } else { 392 cursor = parsed 393 } 394 } 395 logger.Debug("events client connected", 396 "remote", r.RemoteAddr, "cursor", cursor, 397 ) 398 399 // Subscribe before the backfill so a Publish that races between 400 // the EventsAfter read and our select loop is captured by the 401 // pending channel signal — we'll re-drain on the first wake-up. 402 sig := br.Subscribe() 403 defer br.Unsubscribe(sig) 404 405 ctx, cancel := context.WithCancel(r.Context()) 406 defer cancel() 407 408 // Detect client disconnect by trying to read; we don't expect 409 // any payloads from the client, so any read outcome (including 410 // EOF) signals the connection has gone away. 411 go func() { 412 for { 413 if _, _, err := conn.NextReader(); err != nil { 414 cancel() 415 return 416 } 417 } 418 }() 419 420 // Initial backfill. If this fails the connection is unusable 421 // (we can't promise ordering after a partial write) so just 422 // return and let the client reconnect with the same cursor. 423 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil { 424 logger.Debug("events backfill ended", "err", err, "cursor", cursor) 425 return 426 } 427 428 ticker := time.NewTicker(30 * time.Second) 429 defer ticker.Stop() 430 for { 431 select { 432 case <-ctx.Done(): 433 logger.Debug("events client disconnected", 434 "remote", r.RemoteAddr, "cursor", cursor, 435 ) 436 return 437 case <-sig: 438 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil { 439 logger.Debug("events stream ended", "err", err, "cursor", cursor) 440 return 441 } 442 case <-ticker.C: 443 if err := conn.WriteControl( 444 websocket.PingMessage, nil, 445 time.Now().Add(time.Second), 446 ); err != nil { 447 logger.Debug("events ping failed", "err", err) 448 return 449 } 450 } 451 } 452 } 453} 454 455// streamEvents drains every event row with `created > *cursor`, writes 456// each as a wire envelope frame, and advances *cursor in lockstep. The 457// cursor is updated *after* the write succeeds so a half-flushed batch 458// (interrupted by a websocket error) replays cleanly on the next 459// connection. 460// 461// It is safe to call repeatedly: when there are no new rows the query 462// returns an empty slice and we noop. 463func streamEvents(ctx context.Context, conn *websocket.Conn, st *store, cursor *int64) error { 464 rows, err := st.EventsAfter(ctx, *cursor) 465 if err != nil { 466 return fmt.Errorf("read events: %w", err) 467 } 468 for _, row := range rows { 469 frame, err := json.Marshal(eventsEnvelope{ 470 Rkey: row.Rkey, 471 Nsid: row.Nsid, 472 Event: row.EventJSON, 473 Created: row.Created, 474 }) 475 if err != nil { 476 return fmt.Errorf("marshal envelope: %w", err) 477 } 478 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil { 479 return fmt.Errorf("write frame: %w", err) 480 } 481 *cursor = row.Created 482 } 483 return nil 484}