Stitch any CI into Tangled
1package main
2
// HTTP surface of the spindle.
//
// Four roles to keep in mind:
//
//  1. Verification: the Tangled appview hits /xrpc/sh.tangled.owner during
//     spindle registration to confirm the operator owns this instance.
//  2. Event stream: the appview holds a long-lived websocket against
//     /events and receives records (such as sh.tangled.pipeline.status)
//     published through the in-process broker, with ?cursor-based
//     backfill when it reconnects. Buildkite-driven status updates depend
//     on the webhook receiver in (3), which is not implemented yet.
//  3. Webhooks: Buildkite POSTs build/job state changes to
//     /webhooks/buildkite, which we'll translate into pipeline.status
//     events on (2). Until then the endpoint answers 501 Not Implemented.
//  4. Logs: GET /logs/{knot}/{pipelineRkey}/{workflow} delegates to the
//     configured Provider and streams the captured output of one workflow
//     run over a websocket, one JSON frame per log line, to the appview
//     (or any other websocket client).
19
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"net"
	"net/http"
	"strconv"
	"time"

	"github.com/gorilla/websocket"
	"tangled.org/core/api/tangled"
)
33
34// runHTTP starts the spindle's HTTP server and blocks until ctx is
35// cancelled or the listener returns a fatal error. On ctx cancellation it
36// performs a graceful shutdown with a bounded timeout.
37//
38// The logger is read from ctx via loggerFrom. The broker is the
39// in-process pub/sub used by /events to fan published records out to
40// connected websocket subscribers.
41func runHTTP(ctx context.Context, cfg config, br *broker, provider Provider) error {
42 logger := loggerFrom(ctx)
43
44 mux := http.NewServeMux()
45 mux.HandleFunc("GET /", rootHandler())
46 mux.HandleFunc("GET /events", eventsHandler(logger, br))
47 mux.HandleFunc("GET /logs/{knot}/{pipelineRkey}/{workflow}", logsHandler(logger, provider))
48 mux.HandleFunc("GET /xrpc/"+tangled.OwnerNSID, ownerHandler(logger, cfg.OwnerDID))
49 mux.HandleFunc("POST /webhooks/buildkite", buildkiteWebhookHandler())
50
51 srv := &http.Server{
52 Addr: cfg.Addr,
53 Handler: mux,
54 ReadHeaderTimeout: 5 * time.Second,
55 }
56
57 // Run ListenAndServe on a goroutine so we can race it against ctx.Done.
58 errCh := make(chan error, 1)
59 go func() {
60 logger.Info("listening", "addr", cfg.Addr, "owner", cfg.OwnerDID)
61 errCh <- srv.ListenAndServe()
62 }()
63
64 select {
65 case <-ctx.Done():
66 logger.Info("shutting down")
67 case err := <-errCh:
68 // ErrServerClosed means we shut ourselves down cleanly elsewhere
69 // — anything else is a real failure to report.
70 if err != nil && !errors.Is(err, http.ErrServerClosed) {
71 return fmt.Errorf("http server: %w", err)
72 }
73 }
74
75 shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
76 defer cancel()
77 return srv.Shutdown(shutdownCtx)
78}
79
// rootHandler responds at exactly "/" with a small identifier, mainly
// useful as a liveness check during deployment.
//
// It is registered under the "GET /" pattern, which in net/http's
// ServeMux is a catch-all: it matches every GET path that no more
// specific pattern claims. Without the path check below, a request for
// an unknown path (for example an xrpc method this spindle does not
// implement) would get a 200 with the banner instead of a 404.
func rootHandler() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/" {
			http.NotFound(w, r)
			return
		}
		fmt.Fprintln(w, "tack: a Tangled spindle backed by Buildkite")
	}
}
87
88// ownerHandler implements sh.tangled.owner so the Tangled appview can verify
89// this spindle's owner during registration.
90func ownerHandler(logger *slog.Logger, owner string) http.HandlerFunc {
91 return func(w http.ResponseWriter, r *http.Request) {
92 w.Header().Set("Content-Type", "application/json")
93 if err := json.NewEncoder(w).Encode(tangled.Owner_Output{Owner: owner}); err != nil {
94 logger.Error("encode owner response", "err", err)
95 }
96 }
97}
98
// buildkiteWebhookHandler is the placeholder receiver for Buildkite
// webhook deliveries. Until build/job state changes are translated into
// pipeline.status events, every delivery is answered with
// 501 Not Implemented.
func buildkiteWebhookHandler() http.HandlerFunc {
	const status = http.StatusNotImplemented
	return func(w http.ResponseWriter, _ *http.Request) {
		http.Error(w, "not implemented", status)
	}
}
106
107// logsHandler serves captured workflow logs over a WebSocket,
108// matching the wire protocol of the upstream Tangled spindle so the
109// appview's log proxy (appview/pipelines.Logs) treats us as a drop-in
110// source. The path shape is
111//
112// GET /logs/{knot}/{pipelineRkey}/{workflow}
113//
114// which matches the (knot, pipelineRkey, workflow) tuple
115// Provider.Spawn is invoked with — the same identity used in the
116// pipeline ATURI. Workflow names commonly contain a dot (e.g.
117// "test.yml"); ServeMux path patterns match a single segment, so the
118// literal value flows straight through r.PathValue.
119//
120// Wire shape per frame: a single TextMessage carrying one JSON
121// LogLine record (defined in provider.go; byte-compatible with
122// tangled.org/core/spindle/models.LogLine). The Provider hands us a
123// channel of LogLine values; we marshal each one and forward it as
124// one frame so the appview's per-line decode path works unchanged.
125//
126// Error mapping is intentionally done *before* the WebSocket upgrade:
127// ErrLogsNotFound becomes 404 and any other Logs() error becomes 500
128// so the appview's websocket.DefaultDialer surfaces a real HTTP
129// status rather than an immediate close.
130func logsHandler(logger *slog.Logger, provider Provider) http.HandlerFunc {
131 upgrader := websocket.Upgrader{
132 ReadBufferSize: 1024,
133 WriteBufferSize: 1024,
134 }
135 return func(w http.ResponseWriter, r *http.Request) {
136 knot := r.PathValue("knot")
137 pipelineRkey := r.PathValue("pipelineRkey")
138 workflow := r.PathValue("workflow")
139
140 // Defensive: ServeMux won't match an empty segment, but a
141 // future router change shouldn't be allowed to silently
142 // produce an "all logs" query.
143 if knot == "" || pipelineRkey == "" || workflow == "" {
144 http.Error(w, "missing path component", http.StatusBadRequest)
145 return
146 }
147
148 // Establish the log channel BEFORE the WebSocket upgrade so
149 // ErrLogsNotFound / backend errors surface as a real HTTP
150 // status to the appview's dialer rather than as an immediate
151 // post-upgrade close. ctx scopes the producer's lifetime —
152 // it's cancelled below the moment the client disconnects.
153 ctx, cancel := context.WithCancel(r.Context())
154 defer cancel()
155
156 ch, err := provider.Logs(ctx, knot, pipelineRkey, workflow)
157 if err != nil {
158 if errors.Is(err, ErrLogsNotFound) {
159 http.Error(w, "logs not found", http.StatusNotFound)
160 return
161 }
162 logger.Error("logs fetch failed",
163 "err", err,
164 "knot", knot,
165 "pipeline_rkey", pipelineRkey,
166 "workflow", workflow,
167 )
168 http.Error(w, "logs unavailable", http.StatusInternalServerError)
169 return
170 }
171
172 conn, err := upgrader.Upgrade(w, r, nil)
173 if err != nil {
174 // Upgrade already wrote a response; just record the
175 // failure for diagnostics.
176 logger.Error("logs websocket upgrade failed",
177 "err", err,
178 "knot", knot,
179 "pipeline_rkey", pipelineRkey,
180 "workflow", workflow,
181 )
182 return
183 }
184 defer func() {
185 // Send a close frame on the way out so the appview proxy
186 // sees a clean shutdown. Mirrors upstream
187 // spindle.(*Spindle).Logs.
188 _ = conn.WriteControl(
189 websocket.CloseMessage,
190 websocket.FormatCloseMessage(
191 websocket.CloseNormalClosure, "log stream complete",
192 ),
193 time.Now().Add(time.Second),
194 )
195 conn.Close()
196 }()
197
198 // Detect client disconnect by trying to read; we don't expect
199 // any payloads from the client, so any read outcome (including
200 // EOF) signals the connection has gone away. The cancel hits
201 // the producer goroutine inside the Provider, which stops
202 // sending and closes ch — our drain loop then exits cleanly.
203 go func() {
204 for {
205 if _, _, err := conn.NextReader(); err != nil {
206 cancel()
207 return
208 }
209 }
210 }()
211
212 // Drain the channel; closure means the run is complete (or
213 // the producer hit ctx). Marshal-then-write each LogLine as
214 // a single TextMessage frame.
215 for {
216 select {
217 case <-ctx.Done():
218 return
219 case line, ok := <-ch:
220 if !ok {
221 return
222 }
223 frame, err := json.Marshal(line)
224 if err != nil {
225 // The struct is fully internal; a marshal failure
226 // is a programmer bug. Log and bail rather than
227 // poison the stream with a half-frame.
228 logger.Error("marshal log line",
229 "err", err,
230 "knot", knot,
231 "pipeline_rkey", pipelineRkey,
232 "workflow", workflow,
233 )
234 return
235 }
236 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil {
237 logger.Debug("logs frame write failed",
238 "err", err,
239 "knot", knot,
240 "pipeline_rkey", pipelineRkey,
241 "workflow", workflow,
242 )
243 return
244 }
245 }
246 }
247 }
248}
249
250// eventsHandler upgrades to a WebSocket and streams persisted records
251// to the connected client. The wire protocol mirrors the upstream
252// Tangled spindle so the appview's eventconsumer treats us as a
253// drop-in source:
254//
255// - Optional ?cursor=<int64> resumes after that rowid; absent or 0
256// means "from the beginning of our retained log".
257// - We do a backfill pass first (everything with created > cursor),
258// then loop: on each broker signal, drain new rows; on a 30s
259// timer, write a websocket ping so intermediaries don't idle the
260// connection out.
261//
262// We subscribe to the broker *before* the backfill so any Publish that
263// races between the cursor read and the loop entry is captured by the
264// pending channel signal — the loop will see it on its first iteration
265// and call streamEvents again, which is idempotent on the cursor.
266func eventsHandler(logger *slog.Logger, br *broker) http.HandlerFunc {
267 upgrader := websocket.Upgrader{
268 ReadBufferSize: 1024,
269 WriteBufferSize: 1024,
270 }
271 return func(w http.ResponseWriter, r *http.Request) {
272 conn, err := upgrader.Upgrade(w, r, nil)
273 if err != nil {
274 logger.Error("websocket upgrade failed", "err", err)
275 return
276 }
277 defer conn.Close()
278
279 // Parse the resume cursor up front. An unparseable cursor is a
280 // client bug, but rather than 4xx the upgraded connection we
281 // log it and start from zero — same behaviour as the upstream
282 // spindle.
283 var cursor int64
284 if raw := r.URL.Query().Get("cursor"); raw != "" {
285 parsed, err := strconv.ParseInt(raw, 10, 64)
286 if err != nil {
287 logger.Warn("events: bad cursor, starting from 0",
288 "cursor", raw, "err", err,
289 )
290 } else {
291 cursor = parsed
292 }
293 }
294 logger.Debug("events client connected",
295 "remote", r.RemoteAddr, "cursor", cursor,
296 )
297
298 // Subscribe before the backfill so a Publish that races between
299 // the EventsAfter read and our select loop is captured by the
300 // pending channel signal — we'll re-drain on the first wake-up.
301 sig := br.Subscribe()
302 defer br.Unsubscribe(sig)
303
304 ctx, cancel := context.WithCancel(r.Context())
305 defer cancel()
306
307 // Detect client disconnect by trying to read; we don't expect
308 // any payloads from the client, so any read outcome (including
309 // EOF) signals the connection has gone away.
310 go func() {
311 for {
312 if _, _, err := conn.NextReader(); err != nil {
313 cancel()
314 return
315 }
316 }
317 }()
318
319 // Initial backfill. If this fails the connection is unusable
320 // (we can't promise ordering after a partial write) so just
321 // return and let the client reconnect with the same cursor.
322 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil {
323 logger.Debug("events backfill ended", "err", err, "cursor", cursor)
324 return
325 }
326
327 ticker := time.NewTicker(30 * time.Second)
328 defer ticker.Stop()
329 for {
330 select {
331 case <-ctx.Done():
332 logger.Debug("events client disconnected",
333 "remote", r.RemoteAddr, "cursor", cursor,
334 )
335 return
336 case <-sig:
337 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil {
338 logger.Debug("events stream ended", "err", err, "cursor", cursor)
339 return
340 }
341 case <-ticker.C:
342 if err := conn.WriteControl(
343 websocket.PingMessage, nil,
344 time.Now().Add(time.Second),
345 ); err != nil {
346 logger.Debug("events ping failed", "err", err)
347 return
348 }
349 }
350 }
351 }
352}
353
354// streamEvents drains every event row with `created > *cursor`, writes
355// each as a wire envelope frame, and advances *cursor in lockstep. The
356// cursor is updated *after* the write succeeds so a half-flushed batch
357// (interrupted by a websocket error) replays cleanly on the next
358// connection.
359//
360// It is safe to call repeatedly: when there are no new rows the query
361// returns an empty slice and we noop.
362func streamEvents(ctx context.Context, conn *websocket.Conn, st *store, cursor *int64) error {
363 rows, err := st.EventsAfter(ctx, *cursor)
364 if err != nil {
365 return fmt.Errorf("read events: %w", err)
366 }
367 for _, row := range rows {
368 frame, err := json.Marshal(eventsEnvelope{
369 Rkey: row.Rkey,
370 Nsid: row.Nsid,
371 Event: row.EventJSON,
372 Created: row.Created,
373 })
374 if err != nil {
375 return fmt.Errorf("marshal envelope: %w", err)
376 }
377 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil {
378 return fmt.Errorf("write frame: %w", err)
379 }
380 *cursor = row.Created
381 }
382 return nil
383}