Stitch any CI into Tangled
1package main
2
3// HTTP surface of the spindle.
4//
5// Four roles to keep in mind:
6//
7// 1. Verification: the Tangled appview hits /xrpc/sh.tangled.owner during
8// spindle registration to confirm the operator owns this instance.
// 2. Event stream: the appview holds a long-lived websocket against
// /events to receive sh.tangled.pipeline.status frames as builds
// progress. Stored events are backfilled from the client's cursor,
// then streamed as new ones are published, with periodic pings to
// keep the connection alive.
// 3. Webhooks: Buildkite POSTs build/job state changes to
// /webhooks/buildkite, which are translated into pipeline.status
// events on (2).
// 4. Logs: GET /logs/{knot}/{pipelineRkey}/{workflow} delegates to the
// configured Provider and streams the result over a websocket, so the
// appview (or an operator with a websocket client) can pull captured
// workflow output for a specific run.
19
20import (
21 "context"
22 "encoding/json"
23 "errors"
24 "fmt"
25 "io"
26 "log/slog"
27 "net/http"
28 "strconv"
29 "time"
30
31 "github.com/gorilla/websocket"
32 "tangled.org/core/api/tangled"
33
34 "go.mitchellh.com/tack/internal/buildkite"
35)
36
37// runHTTP starts the spindle's HTTP server and blocks until ctx is
38// cancelled or the listener returns a fatal error. On ctx cancellation it
39// performs a graceful shutdown with a bounded timeout.
40//
41// The logger is read from ctx via loggerFrom. The broker is the
42// in-process pub/sub used by /events to fan published records out to
43// connected websocket subscribers. bkProvider may be nil — when a
44// deployment runs the fake provider, /webhooks/buildkite still
45// registers but responds 503, so a misdirected Buildkite webhook
46// gets a clear "this spindle isn't accepting Buildkite events" rather
47// than a misleading 200.
48func runHTTP(ctx context.Context, cfg config, br *broker, provider Provider, bkProvider *buildkiteProvider) error {
49 logger := loggerFrom(ctx)
50
51 mux := http.NewServeMux()
52 mux.HandleFunc("GET /", rootHandler())
53 mux.HandleFunc("GET /events", eventsHandler(logger, br))
54 mux.HandleFunc("GET /logs/{knot}/{pipelineRkey}/{workflow}", logsHandler(logger, provider))
55 mux.HandleFunc("GET /xrpc/"+tangled.OwnerNSID, ownerHandler(logger, cfg.OwnerDID))
56 mux.HandleFunc("POST /webhooks/buildkite", buildkiteWebhookHandler(logger, bkProvider))
57
58 srv := &http.Server{
59 Addr: cfg.Addr,
60 Handler: mux,
61 ReadHeaderTimeout: 5 * time.Second,
62 }
63
64 // Run ListenAndServe on a goroutine so we can race it against ctx.Done.
65 errCh := make(chan error, 1)
66 go func() {
67 logger.Info("listening", "addr", cfg.Addr, "owner", cfg.OwnerDID)
68 errCh <- srv.ListenAndServe()
69 }()
70
71 select {
72 case <-ctx.Done():
73 logger.Info("shutting down")
74 case err := <-errCh:
75 // ErrServerClosed means we shut ourselves down cleanly elsewhere
76 // — anything else is a real failure to report.
77 if err != nil && !errors.Is(err, http.ErrServerClosed) {
78 return fmt.Errorf("http server: %w", err)
79 }
80 }
81
82 shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
83 defer cancel()
84 return srv.Shutdown(shutdownCtx)
85}
86
// rootHandler returns a handler that writes a one-line banner naming
// this service. It is primarily a cheap liveness probe for deployments.
func rootHandler() http.HandlerFunc {
	const banner = "tack: a Tangled spindle backed by Buildkite"
	return func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprintln(w, banner)
	}
}
94
95// ownerHandler implements sh.tangled.owner so the Tangled appview can verify
96// this spindle's owner during registration.
97func ownerHandler(logger *slog.Logger, owner string) http.HandlerFunc {
98 return func(w http.ResponseWriter, r *http.Request) {
99 w.Header().Set("Content-Type", "application/json")
100 if err := json.NewEncoder(w).Encode(tangled.Owner_Output{Owner: owner}); err != nil {
101 logger.Error("encode owner response", "err", err)
102 }
103 }
104}
105
106// buildkiteWebhookHandler receives Buildkite Pipelines webhook events,
107// authenticates the request against whichever scheme the provider was
108// configured with, and hands the decoded payload to the provider for
109// translation into a sh.tangled.pipeline.status publish.
110//
111// Authentication is intentionally fail-closed: when bk is nil (no
112// Buildkite provider configured) we 503 instead of accepting events
113// silently. The body is buffered up front because signature mode
114// HMACs the raw bytes — we can't rely on the JSON decoder reading
115// the request body before verification.
116//
117// Acknowledgement contract with Buildkite: we 200 on any well-formed
118// event we accepted (including events we deliberately ignore, like
119// job.* or builds we don't track), and 5xx only on internal failure
120// the operator should look at. A 4xx/5xx makes Buildkite retry,
121// which we don't want for "this isn't an event we care about".
122func buildkiteWebhookHandler(logger *slog.Logger, bk *buildkiteProvider) http.HandlerFunc {
123 return func(w http.ResponseWriter, r *http.Request) {
124 if bk == nil {
125 http.Error(w, "buildkite provider not configured",
126 http.StatusServiceUnavailable)
127 return
128 }
129
130 // Cap body size so a malicious sender can't exhaust
131 // memory; Buildkite payloads in practice are well under
132 // 64 KiB but a generous-but-bounded ceiling is the
133 // right shape here.
134 body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20))
135 if err != nil {
136 logger.Warn("buildkite webhook: read body", "err", err)
137 http.Error(w, "read body", http.StatusBadRequest)
138 return
139 }
140
141 if err := bk.VerifyWebhook(r.Header, body); err != nil {
142 logger.Warn("buildkite webhook: verify failed",
143 "err", err,
144 "remote", r.RemoteAddr,
145 )
146 http.Error(w, "unauthorized", http.StatusUnauthorized)
147 return
148 }
149
150 var payload buildkite.WebhookPayload
151 if err := json.Unmarshal(body, &payload); err != nil {
152 logger.Warn("buildkite webhook: decode body", "err", err)
153 http.Error(w, "bad payload", http.StatusBadRequest)
154 return
155 }
156 // The X-Buildkite-Event header is authoritative for the
157 // event name; the body field is convenience but doesn't
158 // always match exactly. Prefer the header.
159 if h := r.Header.Get("X-Buildkite-Event"); h != "" {
160 payload.Event = h
161 }
162
163 // Translate + publish on the request context so a slow
164 // store/broker doesn't outlive an aborted webhook
165 // connection.
166 if err := bk.HandleWebhook(r.Context(), payload); err != nil {
167 logger.Error("buildkite webhook: handle", "err", err,
168 "event", payload.Event,
169 "build_uuid", payload.Build.ID,
170 )
171 http.Error(w, "internal error", http.StatusInternalServerError)
172 return
173 }
174 w.WriteHeader(http.StatusOK)
175 }
176}
177
178// logsHandler serves captured workflow logs over a WebSocket,
179// matching the wire protocol of the upstream Tangled spindle so the
180// appview's log proxy (appview/pipelines.Logs) treats us as a drop-in
181// source. The path shape is
182//
183// GET /logs/{knot}/{pipelineRkey}/{workflow}
184//
185// which matches the (knot, pipelineRkey, workflow) tuple
186// Provider.Spawn is invoked with — the same identity used in the
187// pipeline ATURI. Workflow names commonly contain a dot (e.g.
188// "test.yml"); ServeMux path patterns match a single segment, so the
189// literal value flows straight through r.PathValue.
190//
191// Wire shape per frame: a single TextMessage carrying one JSON
192// LogLine record (defined in provider.go; byte-compatible with
193// tangled.org/core/spindle/models.LogLine). The Provider hands us a
194// channel of LogLine values; we marshal each one and forward it as
195// one frame so the appview's per-line decode path works unchanged.
196//
197// Error mapping is intentionally done *before* the WebSocket upgrade:
198// ErrLogsNotFound becomes 404 and any other Logs() error becomes 500
199// so the appview's websocket.DefaultDialer surfaces a real HTTP
200// status rather than an immediate close.
201func logsHandler(logger *slog.Logger, provider Provider) http.HandlerFunc {
202 upgrader := websocket.Upgrader{
203 ReadBufferSize: 1024,
204 WriteBufferSize: 1024,
205 }
206 return func(w http.ResponseWriter, r *http.Request) {
207 knot := r.PathValue("knot")
208 pipelineRkey := r.PathValue("pipelineRkey")
209 workflow := r.PathValue("workflow")
210
211 // Defensive: ServeMux won't match an empty segment, but a
212 // future router change shouldn't be allowed to silently
213 // produce an "all logs" query.
214 if knot == "" || pipelineRkey == "" || workflow == "" {
215 http.Error(w, "missing path component", http.StatusBadRequest)
216 return
217 }
218
219 // Establish the log channel BEFORE the WebSocket upgrade so
220 // ErrLogsNotFound / backend errors surface as a real HTTP
221 // status to the appview's dialer rather than as an immediate
222 // post-upgrade close. ctx scopes the producer's lifetime —
223 // it's cancelled below the moment the client disconnects.
224 ctx, cancel := context.WithCancel(r.Context())
225 defer cancel()
226
227 ch, err := provider.Logs(ctx, knot, pipelineRkey, workflow)
228 if err != nil {
229 if errors.Is(err, ErrLogsNotFound) {
230 http.Error(w, "logs not found", http.StatusNotFound)
231 return
232 }
233 logger.Error("logs fetch failed",
234 "err", err,
235 "knot", knot,
236 "pipeline_rkey", pipelineRkey,
237 "workflow", workflow,
238 )
239 http.Error(w, "logs unavailable", http.StatusInternalServerError)
240 return
241 }
242
243 conn, err := upgrader.Upgrade(w, r, nil)
244 if err != nil {
245 // Upgrade already wrote a response; just record the
246 // failure for diagnostics.
247 logger.Error("logs websocket upgrade failed",
248 "err", err,
249 "knot", knot,
250 "pipeline_rkey", pipelineRkey,
251 "workflow", workflow,
252 )
253 return
254 }
255 defer func() {
256 // Send a close frame on the way out so the appview proxy
257 // sees a clean shutdown. Mirrors upstream
258 // spindle.(*Spindle).Logs.
259 _ = conn.WriteControl(
260 websocket.CloseMessage,
261 websocket.FormatCloseMessage(
262 websocket.CloseNormalClosure, "log stream complete",
263 ),
264 time.Now().Add(time.Second),
265 )
266 conn.Close()
267 }()
268
269 // Detect client disconnect by trying to read; we don't expect
270 // any payloads from the client, so any read outcome (including
271 // EOF) signals the connection has gone away. The cancel hits
272 // the producer goroutine inside the Provider, which stops
273 // sending and closes ch — our drain loop then exits cleanly.
274 go func() {
275 for {
276 if _, _, err := conn.NextReader(); err != nil {
277 cancel()
278 return
279 }
280 }
281 }()
282
283 // Drain the channel; closure means the run is complete (or
284 // the producer hit ctx). Marshal-then-write each LogLine as
285 // a single TextMessage frame.
286 for {
287 select {
288 case <-ctx.Done():
289 return
290 case line, ok := <-ch:
291 if !ok {
292 return
293 }
294 frame, err := json.Marshal(line)
295 if err != nil {
296 // The struct is fully internal; a marshal failure
297 // is a programmer bug. Log and bail rather than
298 // poison the stream with a half-frame.
299 logger.Error("marshal log line",
300 "err", err,
301 "knot", knot,
302 "pipeline_rkey", pipelineRkey,
303 "workflow", workflow,
304 )
305 return
306 }
307 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil {
308 logger.Debug("logs frame write failed",
309 "err", err,
310 "knot", knot,
311 "pipeline_rkey", pipelineRkey,
312 "workflow", workflow,
313 )
314 return
315 }
316 }
317 }
318 }
319}
320
321// eventsHandler upgrades to a WebSocket and streams persisted records
322// to the connected client. The wire protocol mirrors the upstream
323// Tangled spindle so the appview's eventconsumer treats us as a
324// drop-in source:
325//
326// - Optional ?cursor=<int64> resumes after that rowid; absent or 0
327// means "from the beginning of our retained log".
328// - We do a backfill pass first (everything with created > cursor),
329// then loop: on each broker signal, drain new rows; on a 30s
330// timer, write a websocket ping so intermediaries don't idle the
331// connection out.
332//
333// We subscribe to the broker *before* the backfill so any Publish that
334// races between the cursor read and the loop entry is captured by the
335// pending channel signal — the loop will see it on its first iteration
336// and call streamEvents again, which is idempotent on the cursor.
337func eventsHandler(logger *slog.Logger, br *broker) http.HandlerFunc {
338 upgrader := websocket.Upgrader{
339 ReadBufferSize: 1024,
340 WriteBufferSize: 1024,
341 }
342 return func(w http.ResponseWriter, r *http.Request) {
343 conn, err := upgrader.Upgrade(w, r, nil)
344 if err != nil {
345 logger.Error("websocket upgrade failed", "err", err)
346 return
347 }
348 defer conn.Close()
349
350 // Parse the resume cursor up front. An unparseable cursor is a
351 // client bug, but rather than 4xx the upgraded connection we
352 // log it and start from zero — same behaviour as the upstream
353 // spindle.
354 var cursor int64
355 if raw := r.URL.Query().Get("cursor"); raw != "" {
356 parsed, err := strconv.ParseInt(raw, 10, 64)
357 if err != nil {
358 logger.Warn("events: bad cursor, starting from 0",
359 "cursor", raw, "err", err,
360 )
361 } else {
362 cursor = parsed
363 }
364 }
365 logger.Debug("events client connected",
366 "remote", r.RemoteAddr, "cursor", cursor,
367 )
368
369 // Subscribe before the backfill so a Publish that races between
370 // the EventsAfter read and our select loop is captured by the
371 // pending channel signal — we'll re-drain on the first wake-up.
372 sig := br.Subscribe()
373 defer br.Unsubscribe(sig)
374
375 ctx, cancel := context.WithCancel(r.Context())
376 defer cancel()
377
378 // Detect client disconnect by trying to read; we don't expect
379 // any payloads from the client, so any read outcome (including
380 // EOF) signals the connection has gone away.
381 go func() {
382 for {
383 if _, _, err := conn.NextReader(); err != nil {
384 cancel()
385 return
386 }
387 }
388 }()
389
390 // Initial backfill. If this fails the connection is unusable
391 // (we can't promise ordering after a partial write) so just
392 // return and let the client reconnect with the same cursor.
393 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil {
394 logger.Debug("events backfill ended", "err", err, "cursor", cursor)
395 return
396 }
397
398 ticker := time.NewTicker(30 * time.Second)
399 defer ticker.Stop()
400 for {
401 select {
402 case <-ctx.Done():
403 logger.Debug("events client disconnected",
404 "remote", r.RemoteAddr, "cursor", cursor,
405 )
406 return
407 case <-sig:
408 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil {
409 logger.Debug("events stream ended", "err", err, "cursor", cursor)
410 return
411 }
412 case <-ticker.C:
413 if err := conn.WriteControl(
414 websocket.PingMessage, nil,
415 time.Now().Add(time.Second),
416 ); err != nil {
417 logger.Debug("events ping failed", "err", err)
418 return
419 }
420 }
421 }
422 }
423}
424
425// streamEvents drains every event row with `created > *cursor`, writes
426// each as a wire envelope frame, and advances *cursor in lockstep. The
427// cursor is updated *after* the write succeeds so a half-flushed batch
428// (interrupted by a websocket error) replays cleanly on the next
429// connection.
430//
431// It is safe to call repeatedly: when there are no new rows the query
432// returns an empty slice and we noop.
433func streamEvents(ctx context.Context, conn *websocket.Conn, st *store, cursor *int64) error {
434 rows, err := st.EventsAfter(ctx, *cursor)
435 if err != nil {
436 return fmt.Errorf("read events: %w", err)
437 }
438 for _, row := range rows {
439 frame, err := json.Marshal(eventsEnvelope{
440 Rkey: row.Rkey,
441 Nsid: row.Nsid,
442 Event: row.EventJSON,
443 Created: row.Created,
444 })
445 if err != nil {
446 return fmt.Errorf("marshal envelope: %w", err)
447 }
448 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil {
449 return fmt.Errorf("write frame: %w", err)
450 }
451 *cursor = row.Created
452 }
453 return nil
454}