// Stitch any CI into Tangled

1package main
2
// HTTP surface of the spindle.
//
// Four roles to keep in mind:
//
//  1. Verification: the Tangled appview hits /xrpc/sh.tangled.owner during
//     spindle registration to confirm the operator owns this instance.
//  2. Event stream: the appview holds a long-lived websocket against
//     /events to receive sh.tangled.pipeline.status frames as builds
//     progress. Between frames the connection is kept alive with
//     periodic websocket pings.
//  3. Webhooks: Buildkite POSTs build/job state changes to
//     /webhooks/buildkite, which the Buildkite provider translates into
//     pipeline.status events on (2).
//  4. Logs: GET /logs/{knot}/{pipelineRkey}/{workflow} delegates to the
//     configured Provider so the appview (or a curling operator) can
//     pull captured workflow output for a specific run.
19
20import (
21 "context"
22 "encoding/json"
23 "errors"
24 "fmt"
25 "io"
26 "log/slog"
27 "net/http"
28 "strconv"
29 "time"
30
31 "github.com/gorilla/websocket"
32 "tangled.org/core/api/tangled"
33
34 "go.mitchellh.com/tack/internal/buildkite"
35)
36
37// runHTTP starts the spindle's HTTP server and blocks until ctx is
38// cancelled or the listener returns a fatal error. On ctx cancellation it
39// performs a graceful shutdown with a bounded timeout.
40//
41// The logger is read from ctx via loggerFrom. The broker is the
42// in-process pub/sub used by /events to fan published records out to
43// connected websocket subscribers. bkProvider may be nil — when a
44// deployment runs the fake provider, /webhooks/buildkite still
45// registers but responds 503, so a misdirected Buildkite webhook
46// gets a clear "this spindle isn't accepting Buildkite events" rather
47// than a misleading 200.
48func runHTTP(ctx context.Context, cfg config, br *broker, provider Provider, bkProvider *buildkiteProvider) error {
49 logger := loggerFrom(ctx)
50
51 mux := http.NewServeMux()
52 mux.HandleFunc("GET /", rootHandler())
53 mux.HandleFunc("GET /events", eventsHandler(logger, br))
54 mux.HandleFunc("GET /logs/{knot}/{pipelineRkey}/{workflow}", logsHandler(logger, provider))
55 mux.HandleFunc("GET /xrpc/"+tangled.OwnerNSID, ownerHandler(logger, cfg.OwnerDID))
56 mux.HandleFunc("POST /webhooks/buildkite", buildkiteWebhookHandler(logger, bkProvider))
57
58 srv := &http.Server{
59 Addr: cfg.Addr,
60 Handler: mux,
61 ReadHeaderTimeout: 5 * time.Second,
62 }
63
64 // Run ListenAndServe on a goroutine so we can race it against ctx.Done.
65 errCh := make(chan error, 1)
66 go func() {
67 logger.Info("listening", "addr", cfg.Addr, "owner", cfg.OwnerDID)
68 errCh <- srv.ListenAndServe()
69 }()
70
71 select {
72 case <-ctx.Done():
73 logger.Info("shutting down")
74 case err := <-errCh:
75 // ErrServerClosed means we shut ourselves down cleanly elsewhere
76 // — anything else is a real failure to report.
77 if err != nil && !errors.Is(err, http.ErrServerClosed) {
78 return fmt.Errorf("http server: %w", err)
79 }
80 }
81
82 shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
83 defer cancel()
84 return srv.Shutdown(shutdownCtx)
85}
86
// rootHandler returns the handler for "/": a plain-text identification
// page. It doubles as a liveness probe during deployment and as a
// calling card for anyone who opens the root URL in a browser.
//
// The handler answers only the exact path "/". Every other path gets a
// standard 404, because a "GET /" ServeMux pattern also receives any
// request that no other route matched.
func rootHandler() http.HandlerFunc {
	// "tack" in figlet's larry3d font, followed by a short blurb. The
	// art contains two backticks, which cannot appear inside a Go raw
	// string, so they are spliced in as interpreted "`" literals.
	//
	// NOTE(review): whitespace inside the art is significant; confirm
	// the spacing below still matches `figlet -f larry3d tack`.
	const banner = ` __ __
/\ \__ /\ \
\ \ ,_\ __ ___\ \ \/'\
 \ \ \/ /'__` + "`" + `\ /'___\ \ , <
 \ \ \_/\ \L\.\_/\ \__/\ \ \\` + "`" + `\
 \ \__\ \__/.\_\ \____\\ \_\ \_\
 \/__/\/__/\/_/\/____/ \/_/\/_/


This is a tack server: a tangled spindle for other CI services.

Most API routes are under /xrpc/

 Code: https://github.com/mitchellh/tack
`
	return func(w http.ResponseWriter, req *http.Request) {
		if req.URL.Path == "/" {
			w.Header().Set("Content-Type", "text/plain; charset=utf-8")
			fmt.Fprint(w, banner)
			return
		}
		// Anything else landed here only because no other route
		// matched; don't pretend to be the root page.
		http.NotFound(w, req)
	}
}
124
125// ownerHandler implements sh.tangled.owner so the Tangled appview can verify
126// this spindle's owner during registration.
127func ownerHandler(logger *slog.Logger, owner string) http.HandlerFunc {
128 return func(w http.ResponseWriter, r *http.Request) {
129 w.Header().Set("Content-Type", "application/json")
130 if err := json.NewEncoder(w).Encode(tangled.Owner_Output{Owner: owner}); err != nil {
131 logger.Error("encode owner response", "err", err)
132 }
133 }
134}
135
136// buildkiteWebhookHandler receives Buildkite Pipelines webhook events,
137// authenticates the request against whichever scheme the provider was
138// configured with, and hands the decoded payload to the provider for
139// translation into a sh.tangled.pipeline.status publish.
140//
141// Authentication is intentionally fail-closed: when bk is nil (no
142// Buildkite provider configured) we 503 instead of accepting events
143// silently. The body is buffered up front because signature mode
144// HMACs the raw bytes — we can't rely on the JSON decoder reading
145// the request body before verification.
146//
147// Acknowledgement contract with Buildkite: we 200 on any well-formed
148// event we accepted (including events we deliberately ignore, like
149// job.* or builds we don't track), and 5xx only on internal failure
150// the operator should look at. A 4xx/5xx makes Buildkite retry,
151// which we don't want for "this isn't an event we care about".
152func buildkiteWebhookHandler(logger *slog.Logger, bk *buildkiteProvider) http.HandlerFunc {
153 return func(w http.ResponseWriter, r *http.Request) {
154 if bk == nil {
155 http.Error(w, "buildkite provider not configured",
156 http.StatusServiceUnavailable)
157 return
158 }
159
160 // Cap body size so a malicious sender can't exhaust
161 // memory; Buildkite payloads in practice are well under
162 // 64 KiB but a generous-but-bounded ceiling is the
163 // right shape here.
164 body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20))
165 if err != nil {
166 logger.Warn("buildkite webhook: read body", "err", err)
167 http.Error(w, "read body", http.StatusBadRequest)
168 return
169 }
170
171 if err := bk.VerifyWebhook(r.Header, body); err != nil {
172 logger.Warn("buildkite webhook: verify failed",
173 "err", err,
174 "remote", r.RemoteAddr,
175 )
176 http.Error(w, "unauthorized", http.StatusUnauthorized)
177 return
178 }
179
180 var payload buildkite.WebhookPayload
181 if err := json.Unmarshal(body, &payload); err != nil {
182 logger.Warn("buildkite webhook: decode body", "err", err)
183 http.Error(w, "bad payload", http.StatusBadRequest)
184 return
185 }
186 // The X-Buildkite-Event header is authoritative for the
187 // event name; the body field is convenience but doesn't
188 // always match exactly. Prefer the header.
189 if h := r.Header.Get("X-Buildkite-Event"); h != "" {
190 payload.Event = h
191 }
192
193 // Translate + publish on the request context so a slow
194 // store/broker doesn't outlive an aborted webhook
195 // connection.
196 if err := bk.HandleWebhook(r.Context(), payload); err != nil {
197 logger.Error("buildkite webhook: handle", "err", err,
198 "event", payload.Event,
199 "build_uuid", payload.Build.ID,
200 )
201 http.Error(w, "internal error", http.StatusInternalServerError)
202 return
203 }
204 w.WriteHeader(http.StatusOK)
205 }
206}
207
208// logsHandler serves captured workflow logs over a WebSocket,
209// matching the wire protocol of the upstream Tangled spindle so the
210// appview's log proxy (appview/pipelines.Logs) treats us as a drop-in
211// source. The path shape is
212//
213// GET /logs/{knot}/{pipelineRkey}/{workflow}
214//
215// which matches the (knot, pipelineRkey, workflow) tuple
216// Provider.Spawn is invoked with — the same identity used in the
217// pipeline ATURI. Workflow names commonly contain a dot (e.g.
218// "test.yml"); ServeMux path patterns match a single segment, so the
219// literal value flows straight through r.PathValue.
220//
221// Wire shape per frame: a single TextMessage carrying one JSON
222// LogLine record (defined in provider.go; byte-compatible with
223// tangled.org/core/spindle/models.LogLine). The Provider hands us a
224// channel of LogLine values; we marshal each one and forward it as
225// one frame so the appview's per-line decode path works unchanged.
226//
227// Error mapping is intentionally done *before* the WebSocket upgrade:
228// ErrLogsNotFound becomes 404 and any other Logs() error becomes 500
229// so the appview's websocket.DefaultDialer surfaces a real HTTP
230// status rather than an immediate close.
231func logsHandler(logger *slog.Logger, provider Provider) http.HandlerFunc {
232 upgrader := websocket.Upgrader{
233 ReadBufferSize: 1024,
234 WriteBufferSize: 1024,
235 }
236 return func(w http.ResponseWriter, r *http.Request) {
237 knot := r.PathValue("knot")
238 pipelineRkey := r.PathValue("pipelineRkey")
239 workflow := r.PathValue("workflow")
240
241 // Defensive: ServeMux won't match an empty segment, but a
242 // future router change shouldn't be allowed to silently
243 // produce an "all logs" query.
244 if knot == "" || pipelineRkey == "" || workflow == "" {
245 http.Error(w, "missing path component", http.StatusBadRequest)
246 return
247 }
248
249 // Establish the log channel BEFORE the WebSocket upgrade so
250 // ErrLogsNotFound / backend errors surface as a real HTTP
251 // status to the appview's dialer rather than as an immediate
252 // post-upgrade close. ctx scopes the producer's lifetime —
253 // it's cancelled below the moment the client disconnects.
254 ctx, cancel := context.WithCancel(r.Context())
255 defer cancel()
256
257 ch, err := provider.Logs(ctx, knot, pipelineRkey, workflow)
258 if err != nil {
259 if errors.Is(err, ErrLogsNotFound) {
260 http.Error(w, "logs not found", http.StatusNotFound)
261 return
262 }
263 logger.Error("logs fetch failed",
264 "err", err,
265 "knot", knot,
266 "pipeline_rkey", pipelineRkey,
267 "workflow", workflow,
268 )
269 http.Error(w, "logs unavailable", http.StatusInternalServerError)
270 return
271 }
272
273 conn, err := upgrader.Upgrade(w, r, nil)
274 if err != nil {
275 // Upgrade already wrote a response; just record the
276 // failure for diagnostics.
277 logger.Error("logs websocket upgrade failed",
278 "err", err,
279 "knot", knot,
280 "pipeline_rkey", pipelineRkey,
281 "workflow", workflow,
282 )
283 return
284 }
285 defer func() {
286 // Send a close frame on the way out so the appview proxy
287 // sees a clean shutdown. Mirrors upstream
288 // spindle.(*Spindle).Logs.
289 _ = conn.WriteControl(
290 websocket.CloseMessage,
291 websocket.FormatCloseMessage(
292 websocket.CloseNormalClosure, "log stream complete",
293 ),
294 time.Now().Add(time.Second),
295 )
296 conn.Close()
297 }()
298
299 // Detect client disconnect by trying to read; we don't expect
300 // any payloads from the client, so any read outcome (including
301 // EOF) signals the connection has gone away. The cancel hits
302 // the producer goroutine inside the Provider, which stops
303 // sending and closes ch — our drain loop then exits cleanly.
304 go func() {
305 for {
306 if _, _, err := conn.NextReader(); err != nil {
307 cancel()
308 return
309 }
310 }
311 }()
312
313 // Drain the channel; closure means the run is complete (or
314 // the producer hit ctx). Marshal-then-write each LogLine as
315 // a single TextMessage frame.
316 for {
317 select {
318 case <-ctx.Done():
319 return
320 case line, ok := <-ch:
321 if !ok {
322 return
323 }
324 frame, err := json.Marshal(line)
325 if err != nil {
326 // The struct is fully internal; a marshal failure
327 // is a programmer bug. Log and bail rather than
328 // poison the stream with a half-frame.
329 logger.Error("marshal log line",
330 "err", err,
331 "knot", knot,
332 "pipeline_rkey", pipelineRkey,
333 "workflow", workflow,
334 )
335 return
336 }
337 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil {
338 logger.Debug("logs frame write failed",
339 "err", err,
340 "knot", knot,
341 "pipeline_rkey", pipelineRkey,
342 "workflow", workflow,
343 )
344 return
345 }
346 }
347 }
348 }
349}
350
351// eventsHandler upgrades to a WebSocket and streams persisted records
352// to the connected client. The wire protocol mirrors the upstream
353// Tangled spindle so the appview's eventconsumer treats us as a
354// drop-in source:
355//
356// - Optional ?cursor=<int64> resumes after that rowid; absent or 0
357// means "from the beginning of our retained log".
358// - We do a backfill pass first (everything with created > cursor),
359// then loop: on each broker signal, drain new rows; on a 30s
360// timer, write a websocket ping so intermediaries don't idle the
361// connection out.
362//
363// We subscribe to the broker *before* the backfill so any Publish that
364// races between the cursor read and the loop entry is captured by the
365// pending channel signal — the loop will see it on its first iteration
366// and call streamEvents again, which is idempotent on the cursor.
367func eventsHandler(logger *slog.Logger, br *broker) http.HandlerFunc {
368 upgrader := websocket.Upgrader{
369 ReadBufferSize: 1024,
370 WriteBufferSize: 1024,
371 }
372 return func(w http.ResponseWriter, r *http.Request) {
373 conn, err := upgrader.Upgrade(w, r, nil)
374 if err != nil {
375 logger.Error("websocket upgrade failed", "err", err)
376 return
377 }
378 defer conn.Close()
379
380 // Parse the resume cursor up front. An unparseable cursor is a
381 // client bug, but rather than 4xx the upgraded connection we
382 // log it and start from zero — same behaviour as the upstream
383 // spindle.
384 var cursor int64
385 if raw := r.URL.Query().Get("cursor"); raw != "" {
386 parsed, err := strconv.ParseInt(raw, 10, 64)
387 if err != nil {
388 logger.Warn("events: bad cursor, starting from 0",
389 "cursor", raw, "err", err,
390 )
391 } else {
392 cursor = parsed
393 }
394 }
395 logger.Debug("events client connected",
396 "remote", r.RemoteAddr, "cursor", cursor,
397 )
398
399 // Subscribe before the backfill so a Publish that races between
400 // the EventsAfter read and our select loop is captured by the
401 // pending channel signal — we'll re-drain on the first wake-up.
402 sig := br.Subscribe()
403 defer br.Unsubscribe(sig)
404
405 ctx, cancel := context.WithCancel(r.Context())
406 defer cancel()
407
408 // Detect client disconnect by trying to read; we don't expect
409 // any payloads from the client, so any read outcome (including
410 // EOF) signals the connection has gone away.
411 go func() {
412 for {
413 if _, _, err := conn.NextReader(); err != nil {
414 cancel()
415 return
416 }
417 }
418 }()
419
420 // Initial backfill. If this fails the connection is unusable
421 // (we can't promise ordering after a partial write) so just
422 // return and let the client reconnect with the same cursor.
423 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil {
424 logger.Debug("events backfill ended", "err", err, "cursor", cursor)
425 return
426 }
427
428 ticker := time.NewTicker(30 * time.Second)
429 defer ticker.Stop()
430 for {
431 select {
432 case <-ctx.Done():
433 logger.Debug("events client disconnected",
434 "remote", r.RemoteAddr, "cursor", cursor,
435 )
436 return
437 case <-sig:
438 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil {
439 logger.Debug("events stream ended", "err", err, "cursor", cursor)
440 return
441 }
442 case <-ticker.C:
443 if err := conn.WriteControl(
444 websocket.PingMessage, nil,
445 time.Now().Add(time.Second),
446 ); err != nil {
447 logger.Debug("events ping failed", "err", err)
448 return
449 }
450 }
451 }
452 }
453}
454
455// streamEvents drains every event row with `created > *cursor`, writes
456// each as a wire envelope frame, and advances *cursor in lockstep. The
457// cursor is updated *after* the write succeeds so a half-flushed batch
458// (interrupted by a websocket error) replays cleanly on the next
459// connection.
460//
461// It is safe to call repeatedly: when there are no new rows the query
462// returns an empty slice and we noop.
463func streamEvents(ctx context.Context, conn *websocket.Conn, st *store, cursor *int64) error {
464 rows, err := st.EventsAfter(ctx, *cursor)
465 if err != nil {
466 return fmt.Errorf("read events: %w", err)
467 }
468 for _, row := range rows {
469 frame, err := json.Marshal(eventsEnvelope{
470 Rkey: row.Rkey,
471 Nsid: row.Nsid,
472 Event: row.EventJSON,
473 Created: row.Created,
474 })
475 if err != nil {
476 return fmt.Errorf("marshal envelope: %w", err)
477 }
478 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil {
479 return fmt.Errorf("write frame: %w", err)
480 }
481 *cursor = row.Created
482 }
483 return nil
484}