Stitch any CI into Tangled
1package main
2
3// HTTP surface of the spindle.
4//
5// Four roles to keep in mind:
6//
7// 1. Verification: the Tangled appview hits /xrpc/sh.tangled.owner during
8// spindle registration to confirm the operator owns this instance.
9// 2. Event stream: the appview holds a long-lived websocket against
10// /events to receive sh.tangled.pipeline.status frames as builds
11// progress. Today this is just a keep-alive; payloads land once the
12// Buildkite webhook receiver is wired up.
13// 3. Webhooks: Buildkite POSTs build/job state changes to
14// /webhooks/buildkite, which we'll translate into pipeline.status
15// events on (2).
16// 4. Logs: GET /logs/{knot}/{pipelineRkey}/{workflow} delegates to the
17// configured Provider so the appview (or a curling operator) can
18// pull captured workflow output for a specific run.
19
20import (
21 "context"
22 "encoding/json"
23 "errors"
24 "fmt"
25 "io"
26 "log/slog"
27 "net/http"
28 "strconv"
29 "time"
30
31 "github.com/gorilla/websocket"
32 "tangled.org/core/api/tangled"
33
34 "go.mitchellh.com/tack/internal/buildkite"
35)
36
37// runHTTP starts the spindle's HTTP server and blocks until ctx is
38// cancelled or the listener returns a fatal error. On ctx cancellation it
39// performs a graceful shutdown with a bounded timeout.
40//
41// The logger is read from ctx via loggerFrom. The broker is the
42// in-process pub/sub used by /events to fan published records out to
43// connected websocket subscribers. bkProvider may be nil — when a
44// deployment runs the fake provider, /webhooks/buildkite still
45// registers but responds 503, so a misdirected Buildkite webhook
46// gets a clear "this spindle isn't accepting Buildkite events" rather
47// than a misleading 200.
48func runHTTP(ctx context.Context, cfg config, br *broker, provider Provider, bkProvider *buildkiteProvider) error {
49 logger := loggerFrom(ctx)
50
51 mux := http.NewServeMux()
52 mux.HandleFunc("GET /", rootHandler())
53 mux.HandleFunc("GET /events", eventsHandler(logger, br))
54 mux.HandleFunc("GET /logs/{knot}/{pipelineRkey}/{workflow}", logsHandler(logger, provider))
55 mux.HandleFunc("GET /xrpc/"+tangled.OwnerNSID, ownerHandler(logger, cfg.OwnerDID))
56 mux.HandleFunc("POST /webhooks/buildkite", buildkiteWebhookHandler(logger, bkProvider))
57
58 srv := &http.Server{
59 Addr: cfg.Addr,
60 Handler: mux,
61 ReadHeaderTimeout: 5 * time.Second,
62 }
63
64 // Run ListenAndServe on a goroutine so we can race it against ctx.Done.
65 errCh := make(chan error, 1)
66 go func() {
67 logger.Info("listening", "addr", cfg.Addr, "owner", cfg.OwnerDID)
68 errCh <- srv.ListenAndServe()
69 }()
70
71 select {
72 case <-ctx.Done():
73 logger.Info("shutting down")
74 case err := <-errCh:
75 // ErrServerClosed means we shut ourselves down cleanly elsewhere
76 // — anything else is a real failure to report.
77 if err != nil && !errors.Is(err, http.ErrServerClosed) {
78 return fmt.Errorf("http server: %w", err)
79 }
80 }
81
82 shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
83 defer cancel()
84 return srv.Shutdown(shutdownCtx)
85}
86
// rootHandler returns the handler for "/": a plain-text banner that
// identifies the service. It doubles as a liveness check during
// deployment and as a calling card for anyone who opens the root in a
// browser.
func rootHandler() http.HandlerFunc {
	// "tack" in figlet's larry3d font, followed by a one-line
	// description and a link to the code. The look mirrors what the
	// bluesky PDS serves at its own root.
	const banner = ` __ __
/\ \__ /\ \
\ \ ,_\ __ ___\ \ \/'\
 \ \ \/ /'__'\ /'___\ \ , <
 \ \ \_/\ \L\.\_/\ \__/\ \ \\'\
 \ \__\ \__/.\_\ \____\\ \_\ \_\
 \/__/\/__/\/_/\/____/ \/_/\/_/


This is a tack server: a tangled spindle for other CI services.

 Code: https://github.com/mitchellh/tack
`
	return func(w http.ResponseWriter, r *http.Request) {
		// The "GET /" pattern is a catch-all for otherwise unmatched
		// paths, so the banner is served only for the exact root;
		// anything else (e.g. "/foo") is a 404.
		if r.URL.Path == "/" {
			w.Header().Set("Content-Type", "text/plain; charset=utf-8")
			// Best-effort write: nothing useful can be done if the
			// client has already gone away.
			_, _ = io.WriteString(w, banner)
			return
		}
		http.NotFound(w, r)
	}
}
119
120// ownerHandler implements sh.tangled.owner so the Tangled appview can verify
121// this spindle's owner during registration.
122func ownerHandler(logger *slog.Logger, owner string) http.HandlerFunc {
123 return func(w http.ResponseWriter, r *http.Request) {
124 w.Header().Set("Content-Type", "application/json")
125 if err := json.NewEncoder(w).Encode(tangled.Owner_Output{Owner: owner}); err != nil {
126 logger.Error("encode owner response", "err", err)
127 }
128 }
129}
130
131// buildkiteWebhookHandler receives Buildkite Pipelines webhook events,
132// authenticates the request against whichever scheme the provider was
133// configured with, and hands the decoded payload to the provider for
134// translation into a sh.tangled.pipeline.status publish.
135//
136// Authentication is intentionally fail-closed: when bk is nil (no
137// Buildkite provider configured) we 503 instead of accepting events
138// silently. The body is buffered up front because signature mode
139// HMACs the raw bytes — we can't rely on the JSON decoder reading
140// the request body before verification.
141//
142// Acknowledgement contract with Buildkite: we 200 on any well-formed
143// event we accepted (including events we deliberately ignore, like
144// job.* or builds we don't track), and 5xx only on internal failure
145// the operator should look at. A 4xx/5xx makes Buildkite retry,
146// which we don't want for "this isn't an event we care about".
147func buildkiteWebhookHandler(logger *slog.Logger, bk *buildkiteProvider) http.HandlerFunc {
148 return func(w http.ResponseWriter, r *http.Request) {
149 if bk == nil {
150 http.Error(w, "buildkite provider not configured",
151 http.StatusServiceUnavailable)
152 return
153 }
154
155 // Cap body size so a malicious sender can't exhaust
156 // memory; Buildkite payloads in practice are well under
157 // 64 KiB but a generous-but-bounded ceiling is the
158 // right shape here.
159 body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20))
160 if err != nil {
161 logger.Warn("buildkite webhook: read body", "err", err)
162 http.Error(w, "read body", http.StatusBadRequest)
163 return
164 }
165
166 if err := bk.VerifyWebhook(r.Header, body); err != nil {
167 logger.Warn("buildkite webhook: verify failed",
168 "err", err,
169 "remote", r.RemoteAddr,
170 )
171 http.Error(w, "unauthorized", http.StatusUnauthorized)
172 return
173 }
174
175 var payload buildkite.WebhookPayload
176 if err := json.Unmarshal(body, &payload); err != nil {
177 logger.Warn("buildkite webhook: decode body", "err", err)
178 http.Error(w, "bad payload", http.StatusBadRequest)
179 return
180 }
181 // The X-Buildkite-Event header is authoritative for the
182 // event name; the body field is convenience but doesn't
183 // always match exactly. Prefer the header.
184 if h := r.Header.Get("X-Buildkite-Event"); h != "" {
185 payload.Event = h
186 }
187
188 // Translate + publish on the request context so a slow
189 // store/broker doesn't outlive an aborted webhook
190 // connection.
191 if err := bk.HandleWebhook(r.Context(), payload); err != nil {
192 logger.Error("buildkite webhook: handle", "err", err,
193 "event", payload.Event,
194 "build_uuid", payload.Build.ID,
195 )
196 http.Error(w, "internal error", http.StatusInternalServerError)
197 return
198 }
199 w.WriteHeader(http.StatusOK)
200 }
201}
202
203// logsHandler serves captured workflow logs over a WebSocket,
204// matching the wire protocol of the upstream Tangled spindle so the
205// appview's log proxy (appview/pipelines.Logs) treats us as a drop-in
206// source. The path shape is
207//
208// GET /logs/{knot}/{pipelineRkey}/{workflow}
209//
210// which matches the (knot, pipelineRkey, workflow) tuple
211// Provider.Spawn is invoked with — the same identity used in the
212// pipeline ATURI. Workflow names commonly contain a dot (e.g.
213// "test.yml"); ServeMux path patterns match a single segment, so the
214// literal value flows straight through r.PathValue.
215//
216// Wire shape per frame: a single TextMessage carrying one JSON
217// LogLine record (defined in provider.go; byte-compatible with
218// tangled.org/core/spindle/models.LogLine). The Provider hands us a
219// channel of LogLine values; we marshal each one and forward it as
220// one frame so the appview's per-line decode path works unchanged.
221//
222// Error mapping is intentionally done *before* the WebSocket upgrade:
223// ErrLogsNotFound becomes 404 and any other Logs() error becomes 500
224// so the appview's websocket.DefaultDialer surfaces a real HTTP
225// status rather than an immediate close.
226func logsHandler(logger *slog.Logger, provider Provider) http.HandlerFunc {
227 upgrader := websocket.Upgrader{
228 ReadBufferSize: 1024,
229 WriteBufferSize: 1024,
230 }
231 return func(w http.ResponseWriter, r *http.Request) {
232 knot := r.PathValue("knot")
233 pipelineRkey := r.PathValue("pipelineRkey")
234 workflow := r.PathValue("workflow")
235
236 // Defensive: ServeMux won't match an empty segment, but a
237 // future router change shouldn't be allowed to silently
238 // produce an "all logs" query.
239 if knot == "" || pipelineRkey == "" || workflow == "" {
240 http.Error(w, "missing path component", http.StatusBadRequest)
241 return
242 }
243
244 // Establish the log channel BEFORE the WebSocket upgrade so
245 // ErrLogsNotFound / backend errors surface as a real HTTP
246 // status to the appview's dialer rather than as an immediate
247 // post-upgrade close. ctx scopes the producer's lifetime —
248 // it's cancelled below the moment the client disconnects.
249 ctx, cancel := context.WithCancel(r.Context())
250 defer cancel()
251
252 ch, err := provider.Logs(ctx, knot, pipelineRkey, workflow)
253 if err != nil {
254 if errors.Is(err, ErrLogsNotFound) {
255 http.Error(w, "logs not found", http.StatusNotFound)
256 return
257 }
258 logger.Error("logs fetch failed",
259 "err", err,
260 "knot", knot,
261 "pipeline_rkey", pipelineRkey,
262 "workflow", workflow,
263 )
264 http.Error(w, "logs unavailable", http.StatusInternalServerError)
265 return
266 }
267
268 conn, err := upgrader.Upgrade(w, r, nil)
269 if err != nil {
270 // Upgrade already wrote a response; just record the
271 // failure for diagnostics.
272 logger.Error("logs websocket upgrade failed",
273 "err", err,
274 "knot", knot,
275 "pipeline_rkey", pipelineRkey,
276 "workflow", workflow,
277 )
278 return
279 }
280 defer func() {
281 // Send a close frame on the way out so the appview proxy
282 // sees a clean shutdown. Mirrors upstream
283 // spindle.(*Spindle).Logs.
284 _ = conn.WriteControl(
285 websocket.CloseMessage,
286 websocket.FormatCloseMessage(
287 websocket.CloseNormalClosure, "log stream complete",
288 ),
289 time.Now().Add(time.Second),
290 )
291 conn.Close()
292 }()
293
294 // Detect client disconnect by trying to read; we don't expect
295 // any payloads from the client, so any read outcome (including
296 // EOF) signals the connection has gone away. The cancel hits
297 // the producer goroutine inside the Provider, which stops
298 // sending and closes ch — our drain loop then exits cleanly.
299 go func() {
300 for {
301 if _, _, err := conn.NextReader(); err != nil {
302 cancel()
303 return
304 }
305 }
306 }()
307
308 // Drain the channel; closure means the run is complete (or
309 // the producer hit ctx). Marshal-then-write each LogLine as
310 // a single TextMessage frame.
311 for {
312 select {
313 case <-ctx.Done():
314 return
315 case line, ok := <-ch:
316 if !ok {
317 return
318 }
319 frame, err := json.Marshal(line)
320 if err != nil {
321 // The struct is fully internal; a marshal failure
322 // is a programmer bug. Log and bail rather than
323 // poison the stream with a half-frame.
324 logger.Error("marshal log line",
325 "err", err,
326 "knot", knot,
327 "pipeline_rkey", pipelineRkey,
328 "workflow", workflow,
329 )
330 return
331 }
332 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil {
333 logger.Debug("logs frame write failed",
334 "err", err,
335 "knot", knot,
336 "pipeline_rkey", pipelineRkey,
337 "workflow", workflow,
338 )
339 return
340 }
341 }
342 }
343 }
344}
345
346// eventsHandler upgrades to a WebSocket and streams persisted records
347// to the connected client. The wire protocol mirrors the upstream
348// Tangled spindle so the appview's eventconsumer treats us as a
349// drop-in source:
350//
351// - Optional ?cursor=<int64> resumes after that rowid; absent or 0
352// means "from the beginning of our retained log".
353// - We do a backfill pass first (everything with created > cursor),
354// then loop: on each broker signal, drain new rows; on a 30s
355// timer, write a websocket ping so intermediaries don't idle the
356// connection out.
357//
358// We subscribe to the broker *before* the backfill so any Publish that
359// races between the cursor read and the loop entry is captured by the
360// pending channel signal — the loop will see it on its first iteration
361// and call streamEvents again, which is idempotent on the cursor.
362func eventsHandler(logger *slog.Logger, br *broker) http.HandlerFunc {
363 upgrader := websocket.Upgrader{
364 ReadBufferSize: 1024,
365 WriteBufferSize: 1024,
366 }
367 return func(w http.ResponseWriter, r *http.Request) {
368 conn, err := upgrader.Upgrade(w, r, nil)
369 if err != nil {
370 logger.Error("websocket upgrade failed", "err", err)
371 return
372 }
373 defer conn.Close()
374
375 // Parse the resume cursor up front. An unparseable cursor is a
376 // client bug, but rather than 4xx the upgraded connection we
377 // log it and start from zero — same behaviour as the upstream
378 // spindle.
379 var cursor int64
380 if raw := r.URL.Query().Get("cursor"); raw != "" {
381 parsed, err := strconv.ParseInt(raw, 10, 64)
382 if err != nil {
383 logger.Warn("events: bad cursor, starting from 0",
384 "cursor", raw, "err", err,
385 )
386 } else {
387 cursor = parsed
388 }
389 }
390 logger.Debug("events client connected",
391 "remote", r.RemoteAddr, "cursor", cursor,
392 )
393
394 // Subscribe before the backfill so a Publish that races between
395 // the EventsAfter read and our select loop is captured by the
396 // pending channel signal — we'll re-drain on the first wake-up.
397 sig := br.Subscribe()
398 defer br.Unsubscribe(sig)
399
400 ctx, cancel := context.WithCancel(r.Context())
401 defer cancel()
402
403 // Detect client disconnect by trying to read; we don't expect
404 // any payloads from the client, so any read outcome (including
405 // EOF) signals the connection has gone away.
406 go func() {
407 for {
408 if _, _, err := conn.NextReader(); err != nil {
409 cancel()
410 return
411 }
412 }
413 }()
414
415 // Initial backfill. If this fails the connection is unusable
416 // (we can't promise ordering after a partial write) so just
417 // return and let the client reconnect with the same cursor.
418 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil {
419 logger.Debug("events backfill ended", "err", err, "cursor", cursor)
420 return
421 }
422
423 ticker := time.NewTicker(30 * time.Second)
424 defer ticker.Stop()
425 for {
426 select {
427 case <-ctx.Done():
428 logger.Debug("events client disconnected",
429 "remote", r.RemoteAddr, "cursor", cursor,
430 )
431 return
432 case <-sig:
433 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil {
434 logger.Debug("events stream ended", "err", err, "cursor", cursor)
435 return
436 }
437 case <-ticker.C:
438 if err := conn.WriteControl(
439 websocket.PingMessage, nil,
440 time.Now().Add(time.Second),
441 ); err != nil {
442 logger.Debug("events ping failed", "err", err)
443 return
444 }
445 }
446 }
447 }
448}
449
450// streamEvents drains every event row with `created > *cursor`, writes
451// each as a wire envelope frame, and advances *cursor in lockstep. The
452// cursor is updated *after* the write succeeds so a half-flushed batch
453// (interrupted by a websocket error) replays cleanly on the next
454// connection.
455//
456// It is safe to call repeatedly: when there are no new rows the query
457// returns an empty slice and we noop.
458func streamEvents(ctx context.Context, conn *websocket.Conn, st *store, cursor *int64) error {
459 rows, err := st.EventsAfter(ctx, *cursor)
460 if err != nil {
461 return fmt.Errorf("read events: %w", err)
462 }
463 for _, row := range rows {
464 frame, err := json.Marshal(eventsEnvelope{
465 Rkey: row.Rkey,
466 Nsid: row.Nsid,
467 Event: row.EventJSON,
468 Created: row.Created,
469 })
470 if err != nil {
471 return fmt.Errorf("marshal envelope: %w", err)
472 }
473 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil {
474 return fmt.Errorf("write frame: %w", err)
475 }
476 *cursor = row.Created
477 }
478 return nil
479}