Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// HTTP surface of the spindle. 4// 5// Four roles to keep in mind: 6// 7// 1. Verification: the Tangled appview hits /xrpc/sh.tangled.owner during 8// spindle registration to confirm the operator owns this instance. 9// 2. Event stream: the appview holds a long-lived websocket against 10// /events to receive sh.tangled.pipeline.status frames as builds 11// progress. Today this is just a keep-alive; payloads land once the 12// Buildkite webhook receiver is wired up. 13// 3. Webhooks: Buildkite POSTs build/job state changes to 14// /webhooks/buildkite, which we'll translate into pipeline.status 15// events on (2). 16// 4. Logs: GET /logs/{knot}/{pipelineRkey}/{workflow} delegates to the 17// configured Provider so the appview (or a curling operator) can 18// pull captured workflow output for a specific run. 19 20import ( 21 "context" 22 "encoding/json" 23 "errors" 24 "fmt" 25 "io" 26 "log/slog" 27 "net/http" 28 "strconv" 29 "time" 30 31 "github.com/gorilla/websocket" 32 "tangled.org/core/api/tangled" 33 34 "go.mitchellh.com/tack/internal/buildkite" 35) 36 37// wsWriteWait bounds how long any single websocket write (frame or 38// control) is allowed to block before we treat the peer as dead. A 39// client that stops reading but keeps the TCP connection open would 40// otherwise hang the handler indefinitely on a full kernel send buffer. 41// 10s is intentionally generous: real backpressure resolves in 42// milliseconds, so anything past that is a stuck peer we'd rather drop 43// than serve. 44const wsWriteWait = 10 * time.Second 45 46// runHTTP starts the spindle's HTTP server and blocks until ctx is 47// cancelled or the listener returns a fatal error. On ctx cancellation it 48// performs a graceful shutdown with a bounded timeout. 49// 50// The logger is read from ctx via loggerFrom. The broker is the 51// in-process pub/sub used by /events to fan published records out to 52// connected websocket subscribers. bkProvider may be nil — when a 53// deployment runs the fake provider, /webhooks/buildkite still 54// registers but responds 503, so a misdirected Buildkite webhook 55// gets a clear "this spindle isn't accepting Buildkite events" rather 56// than a misleading 200. 57func runHTTP(ctx context.Context, cfg config, br *broker, provider Provider, bkProvider *buildkiteProvider) error { 58 logger := loggerFrom(ctx) 59 60 mux := http.NewServeMux() 61 mux.HandleFunc("GET /", rootHandler()) 62 mux.HandleFunc("GET /events", eventsHandler(logger, br)) 63 mux.HandleFunc("GET /logs/{knot}/{pipelineRkey}/{workflow}", logsHandler(logger, provider)) 64 mux.HandleFunc("GET /xrpc/"+tangled.OwnerNSID, ownerHandler(logger, cfg.OwnerDID)) 65 mux.HandleFunc("POST /webhooks/buildkite", buildkiteWebhookHandler(logger, bkProvider)) 66 67 srv := &http.Server{ 68 Addr: cfg.Addr, 69 Handler: mux, 70 ReadHeaderTimeout: 5 * time.Second, 71 } 72 73 // Run ListenAndServe on a goroutine so we can race it against ctx.Done. 74 errCh := make(chan error, 1) 75 go func() { 76 logger.Info("listening", "addr", cfg.Addr, "owner", cfg.OwnerDID) 77 errCh <- srv.ListenAndServe() 78 }() 79 80 select { 81 case <-ctx.Done(): 82 logger.Info("shutting down") 83 case err := <-errCh: 84 // ErrServerClosed means we shut ourselves down cleanly elsewhere 85 // — anything else is a real failure to report. 86 if err != nil && !errors.Is(err, http.ErrServerClosed) { 87 return fmt.Errorf("http server: %w", err) 88 } 89 } 90 91 shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 92 defer cancel() 93 return srv.Shutdown(shutdownCtx) 94} 95 96// rootHandler responds at "/" with a friendly identifier page. Mainly 97// useful as a liveness check during deployment, and as a calling card 98// for the curious operator who hits the root in a browser. 99func rootHandler() http.HandlerFunc { 100 // Plain-text banner. "tack" rendered in figlet's larry3d font; the 101 // shape mirrors the style the bluesky PDS serves at its own root, 102 // which feels like the right vibe for an atproto-adjacent service. 103 // The two backticks in the larry3d output collide with Go's raw 104 // string delimiter, so we concatenate them in as interpreted 105 // substrings to keep the art byte-identical to figlet's output. 106 const banner = ` __ __ 107/\ \__ /\ \ 108\ \ ,_\ __ ___\ \ \/'\ 109 \ \ \/ /'__` + "`" + `\ /'___\ \ , < 110 \ \ \_/\ \L\.\_/\ \__/\ \ \\` + "`" + `\ 111 \ \__\ \__/.\_\ \____\\ \_\ \_\ 112 \/__/\/__/\/_/\/____/ \/_/\/_/ 113 114 115This is a tack server: a tangled spindle for other CI services. 116 117Most API routes are under /xrpc/ 118 119 Code: https://github.com/mitchellh/tack 120` 121 return func(w http.ResponseWriter, r *http.Request) { 122 // Only respond at exactly "/" — without this guard, the 123 // "GET /" pattern would also catch arbitrary unmatched 124 // paths like "/foo" and lie about being the root. 125 if r.URL.Path != "/" { 126 http.NotFound(w, r) 127 return 128 } 129 w.Header().Set("Content-Type", "text/plain; charset=utf-8") 130 fmt.Fprint(w, banner) 131 } 132} 133 134// ownerHandler implements sh.tangled.owner so the Tangled appview can verify 135// this spindle's owner during registration. 136func ownerHandler(logger *slog.Logger, owner string) http.HandlerFunc { 137 return func(w http.ResponseWriter, r *http.Request) { 138 w.Header().Set("Content-Type", "application/json") 139 if err := json.NewEncoder(w).Encode(tangled.Owner_Output{Owner: owner}); err != nil { 140 logger.Error("encode owner response", "err", err) 141 } 142 } 143} 144 145// buildkiteWebhookHandler receives Buildkite Pipelines webhook events, 146// authenticates the request against whichever scheme the provider was 147// configured with, and hands the decoded payload to the provider for 148// translation into a sh.tangled.pipeline.status publish. 149// 150// Authentication is intentionally fail-closed: when bk is nil (no 151// Buildkite provider configured) we 503 instead of accepting events 152// silently. The body is buffered up front because signature mode 153// HMACs the raw bytes — we can't rely on the JSON decoder reading 154// the request body before verification. 155// 156// Acknowledgement contract with Buildkite: we 200 on any well-formed 157// event we accepted (including events we deliberately ignore, like 158// job.* or builds we don't track), and 5xx only on internal failure 159// the operator should look at. A 4xx/5xx makes Buildkite retry, 160// which we don't want for "this isn't an event we care about". 161func buildkiteWebhookHandler(logger *slog.Logger, bk *buildkiteProvider) http.HandlerFunc { 162 return func(w http.ResponseWriter, r *http.Request) { 163 if bk == nil { 164 http.Error(w, "buildkite provider not configured", 165 http.StatusServiceUnavailable) 166 return 167 } 168 169 // Cap body size so a malicious sender can't exhaust 170 // memory; Buildkite payloads in practice are well under 171 // 64 KiB but a generous-but-bounded ceiling is the 172 // right shape here. 173 body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) 174 if err != nil { 175 logger.Warn("buildkite webhook: read body", "err", err) 176 http.Error(w, "read body", http.StatusBadRequest) 177 return 178 } 179 180 if err := bk.VerifyWebhook(r.Header, body); err != nil { 181 logger.Warn("buildkite webhook: verify failed", 182 "err", err, 183 "remote", r.RemoteAddr, 184 ) 185 http.Error(w, "unauthorized", http.StatusUnauthorized) 186 return 187 } 188 189 var payload buildkite.WebhookPayload 190 if err := json.Unmarshal(body, &payload); err != nil { 191 logger.Warn("buildkite webhook: decode body", "err", err) 192 http.Error(w, "bad payload", http.StatusBadRequest) 193 return 194 } 195 // The X-Buildkite-Event header is authoritative for the 196 // event name; the body field is convenience but doesn't 197 // always match exactly. Prefer the header. 198 if h := r.Header.Get("X-Buildkite-Event"); h != "" { 199 payload.Event = h 200 } 201 202 // Translate + publish on the request context so a slow 203 // store/broker doesn't outlive an aborted webhook 204 // connection. 205 if err := bk.HandleWebhook(r.Context(), payload); err != nil { 206 logger.Error("buildkite webhook: handle", "err", err, 207 "event", payload.Event, 208 "build_uuid", payload.Build.ID, 209 ) 210 http.Error(w, "internal error", http.StatusInternalServerError) 211 return 212 } 213 w.WriteHeader(http.StatusOK) 214 } 215} 216 217// logsHandler serves captured workflow logs over a WebSocket, 218// matching the wire protocol of the upstream Tangled spindle so the 219// appview's log proxy (appview/pipelines.Logs) treats us as a drop-in 220// source. The path shape is 221// 222// GET /logs/{knot}/{pipelineRkey}/{workflow} 223// 224// which matches the (knot, pipelineRkey, workflow) tuple 225// Provider.Spawn is invoked with — the same identity used in the 226// pipeline ATURI. Workflow names commonly contain a dot (e.g. 227// "test.yml"); ServeMux path patterns match a single segment, so the 228// literal value flows straight through r.PathValue. 229// 230// Wire shape per frame: a single TextMessage carrying one JSON 231// LogLine record (defined in provider.go; byte-compatible with 232// tangled.org/core/spindle/models.LogLine). The Provider hands us a 233// channel of LogLine values; we marshal each one and forward it as 234// one frame so the appview's per-line decode path works unchanged. 235// 236// Error mapping is intentionally done *before* the WebSocket upgrade: 237// ErrLogsNotFound becomes 404 and any other Logs() error becomes 500 238// so the appview's websocket.DefaultDialer surfaces a real HTTP 239// status rather than an immediate close. 240func logsHandler(logger *slog.Logger, provider Provider) http.HandlerFunc { 241 upgrader := websocket.Upgrader{ 242 ReadBufferSize: 1024, 243 WriteBufferSize: 1024, 244 } 245 return func(w http.ResponseWriter, r *http.Request) { 246 knot := r.PathValue("knot") 247 pipelineRkey := r.PathValue("pipelineRkey") 248 workflow := r.PathValue("workflow") 249 250 // Defensive: ServeMux won't match an empty segment, but a 251 // future router change shouldn't be allowed to silently 252 // produce an "all logs" query. 253 if knot == "" || pipelineRkey == "" || workflow == "" { 254 http.Error(w, "missing path component", http.StatusBadRequest) 255 return 256 } 257 258 // Establish the log channel BEFORE the WebSocket upgrade so 259 // ErrLogsNotFound / backend errors surface as a real HTTP 260 // status to the appview's dialer rather than as an immediate 261 // post-upgrade close. ctx scopes the producer's lifetime — 262 // it's cancelled below the moment the client disconnects. 263 ctx, cancel := context.WithCancel(r.Context()) 264 defer cancel() 265 266 ch, err := provider.Logs(ctx, knot, pipelineRkey, workflow) 267 if err != nil { 268 if errors.Is(err, ErrLogsNotFound) { 269 http.Error(w, "logs not found", http.StatusNotFound) 270 return 271 } 272 logger.Error("logs fetch failed", 273 "err", err, 274 "knot", knot, 275 "pipeline_rkey", pipelineRkey, 276 "workflow", workflow, 277 ) 278 http.Error(w, "logs unavailable", http.StatusInternalServerError) 279 return 280 } 281 282 conn, err := upgrader.Upgrade(w, r, nil) 283 if err != nil { 284 // Upgrade already wrote a response; just record the 285 // failure for diagnostics. 286 logger.Error("logs websocket upgrade failed", 287 "err", err, 288 "knot", knot, 289 "pipeline_rkey", pipelineRkey, 290 "workflow", workflow, 291 ) 292 return 293 } 294 defer func() { 295 // Send a close frame on the way out so the appview proxy 296 // sees a clean shutdown. Mirrors upstream 297 // spindle.(*Spindle).Logs. WriteControl honours the 298 // deadline argument directly, so a stuck peer can't hang 299 // us here. 300 _ = conn.WriteControl( 301 websocket.CloseMessage, 302 websocket.FormatCloseMessage( 303 websocket.CloseNormalClosure, "log stream complete", 304 ), 305 time.Now().Add(wsWriteWait), 306 ) 307 conn.Close() 308 }() 309 310 // Detect client disconnect by trying to read; we don't expect 311 // any payloads from the client, so any read outcome (including 312 // EOF) signals the connection has gone away. The cancel hits 313 // the producer goroutine inside the Provider, which stops 314 // sending and closes ch — our drain loop then exits cleanly. 315 go func() { 316 for { 317 if _, _, err := conn.NextReader(); err != nil { 318 cancel() 319 return 320 } 321 } 322 }() 323 324 // Drain the channel; closure means the run is complete (or 325 // the producer hit ctx). Marshal-then-write each LogLine as 326 // a single TextMessage frame. 327 for { 328 select { 329 case <-ctx.Done(): 330 return 331 case line, ok := <-ch: 332 if !ok { 333 return 334 } 335 frame, err := json.Marshal(line) 336 if err != nil { 337 // The struct is fully internal; a marshal failure 338 // is a programmer bug. Log and bail rather than 339 // poison the stream with a half-frame. 340 logger.Error("marshal log line", 341 "err", err, 342 "knot", knot, 343 "pipeline_rkey", pipelineRkey, 344 "workflow", workflow, 345 ) 346 return 347 } 348 // Bound the write so a client that stopped reading 349 // but kept the TCP connection open can't hang us on a 350 // full kernel send buffer. WriteMessage doesn't take a 351 // deadline argument the way WriteControl does — we 352 // have to set it on the conn before each frame. 353 if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil { 354 logger.Debug("logs set write deadline failed", 355 "err", err, 356 "knot", knot, 357 "pipeline_rkey", pipelineRkey, 358 "workflow", workflow, 359 ) 360 return 361 } 362 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil { 363 logger.Debug("logs frame write failed", 364 "err", err, 365 "knot", knot, 366 "pipeline_rkey", pipelineRkey, 367 "workflow", workflow, 368 ) 369 return 370 } 371 } 372 } 373 } 374} 375 376// eventsHandler upgrades to a WebSocket and streams persisted records 377// to the connected client. The wire protocol mirrors the upstream 378// Tangled spindle so the appview's eventconsumer treats us as a 379// drop-in source: 380// 381// - Optional ?cursor=<int64> resumes after that rowid; absent or 0 382// means "from the beginning of our retained log". 383// - We do a backfill pass first (everything with created > cursor), 384// then loop: on each broker signal, drain new rows; on a 30s 385// timer, write a websocket ping so intermediaries don't idle the 386// connection out. 387// 388// We subscribe to the broker *before* the backfill so any Publish that 389// races between the cursor read and the loop entry is captured by the 390// pending channel signal — the loop will see it on its first iteration 391// and call streamEvents again, which is idempotent on the cursor. 392func eventsHandler(logger *slog.Logger, br *broker) http.HandlerFunc { 393 upgrader := websocket.Upgrader{ 394 ReadBufferSize: 1024, 395 WriteBufferSize: 1024, 396 } 397 return func(w http.ResponseWriter, r *http.Request) { 398 conn, err := upgrader.Upgrade(w, r, nil) 399 if err != nil { 400 logger.Error("websocket upgrade failed", "err", err) 401 return 402 } 403 defer conn.Close() 404 405 // Parse the resume cursor up front. An unparseable cursor is a 406 // client bug, but rather than 4xx the upgraded connection we 407 // log it and start from zero — same behaviour as the upstream 408 // spindle. 409 var cursor int64 410 if raw := r.URL.Query().Get("cursor"); raw != "" { 411 parsed, err := strconv.ParseInt(raw, 10, 64) 412 if err != nil { 413 logger.Warn("events: bad cursor, starting from 0", 414 "cursor", raw, "err", err, 415 ) 416 } else { 417 cursor = parsed 418 } 419 } 420 logger.Debug("events client connected", 421 "remote", r.RemoteAddr, "cursor", cursor, 422 ) 423 424 // Subscribe before the backfill so a Publish that races between 425 // the EventsAfter read and our select loop is captured by the 426 // pending channel signal — we'll re-drain on the first wake-up. 427 sig := br.Subscribe() 428 defer br.Unsubscribe(sig) 429 430 ctx, cancel := context.WithCancel(r.Context()) 431 defer cancel() 432 433 // Detect client disconnect by trying to read; we don't expect 434 // any payloads from the client, so any read outcome (including 435 // EOF) signals the connection has gone away. 436 go func() { 437 for { 438 if _, _, err := conn.NextReader(); err != nil { 439 cancel() 440 return 441 } 442 } 443 }() 444 445 // Initial backfill. If this fails the connection is unusable 446 // (we can't promise ordering after a partial write) so just 447 // return and let the client reconnect with the same cursor. 448 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil { 449 logger.Debug("events backfill ended", "err", err, "cursor", cursor) 450 return 451 } 452 453 ticker := time.NewTicker(30 * time.Second) 454 defer ticker.Stop() 455 for { 456 select { 457 case <-ctx.Done(): 458 logger.Debug("events client disconnected", 459 "remote", r.RemoteAddr, "cursor", cursor, 460 ) 461 return 462 case <-sig: 463 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil { 464 logger.Debug("events stream ended", "err", err, "cursor", cursor) 465 return 466 } 467 case <-ticker.C: 468 // WriteControl takes its own deadline argument, so 469 // the ping itself can't hang us — but we still want a 470 // generous-but-bounded ceiling to match the per-frame 471 // write timeout. 472 if err := conn.WriteControl( 473 websocket.PingMessage, nil, 474 time.Now().Add(wsWriteWait), 475 ); err != nil { 476 logger.Debug("events ping failed", "err", err) 477 return 478 } 479 } 480 } 481 } 482} 483 484// streamEvents drains every event row with `created > *cursor`, writes 485// each as a wire envelope frame, and advances *cursor in lockstep. The 486// cursor is updated *after* the write succeeds so a half-flushed batch 487// (interrupted by a websocket error) replays cleanly on the next 488// connection. 489// 490// It is safe to call repeatedly: when there are no new rows the query 491// returns an empty slice and we noop. 492func streamEvents(ctx context.Context, conn *websocket.Conn, st *store, cursor *int64) error { 493 rows, err := st.EventsAfter(ctx, *cursor) 494 if err != nil { 495 return fmt.Errorf("read events: %w", err) 496 } 497 for _, row := range rows { 498 frame, err := json.Marshal(eventsEnvelope{ 499 Rkey: row.Rkey, 500 Nsid: row.Nsid, 501 Event: row.EventJSON, 502 Created: row.Created, 503 }) 504 if err != nil { 505 return fmt.Errorf("marshal envelope: %w", err) 506 } 507 // Bound the per-frame write so a client that stopped reading 508 // (but didn't close the TCP connection) can't hang the 509 // handler on a full kernel send buffer. WriteMessage has no 510 // deadline argument of its own — we set it on the conn. 511 if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil { 512 return fmt.Errorf("set write deadline: %w", err) 513 } 514 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil { 515 return fmt.Errorf("write frame: %w", err) 516 } 517 *cursor = row.Created 518 } 519 return nil 520}