Stitch any CI into Tangled
1package main
2
// HTTP surface of the spindle.
//
// Three roles to keep in mind:
//
//  1. Verification: the Tangled appview hits /xrpc/sh.tangled.owner during
//     spindle registration to confirm the operator owns this instance.
//  2. Event stream: the appview holds a long-lived websocket against
//     /events to receive sh.tangled.pipeline.status frames as builds
//     progress. Today this is effectively just a keep-alive; payloads
//     land once the Buildkite webhook receiver is wired up.
//  3. Webhooks: Buildkite POSTs build/job state changes to
//     /webhooks/buildkite, which we'll translate into pipeline.status
//     events on (2).
16
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"net"
	"net/http"
	"strconv"
	"time"

	"github.com/gorilla/websocket"
	"tangled.org/core/api/tangled"
)
30
31// runHTTP starts the spindle's HTTP server and blocks until ctx is
32// cancelled or the listener returns a fatal error. On ctx cancellation it
33// performs a graceful shutdown with a bounded timeout.
34//
35// The logger is read from ctx via loggerFrom. The broker is the
36// in-process pub/sub used by /events to fan published records out to
37// connected websocket subscribers.
38func runHTTP(ctx context.Context, cfg config, br *broker) error {
39 logger := loggerFrom(ctx)
40
41 mux := http.NewServeMux()
42 mux.HandleFunc("GET /", rootHandler())
43 mux.HandleFunc("GET /events", eventsHandler(logger, br))
44 mux.HandleFunc("GET /xrpc/"+tangled.OwnerNSID, ownerHandler(logger, cfg.OwnerDID))
45 mux.HandleFunc("POST /webhooks/buildkite", buildkiteWebhookHandler())
46
47 srv := &http.Server{
48 Addr: cfg.Addr,
49 Handler: mux,
50 ReadHeaderTimeout: 5 * time.Second,
51 }
52
53 // Run ListenAndServe on a goroutine so we can race it against ctx.Done.
54 errCh := make(chan error, 1)
55 go func() {
56 logger.Info("listening", "addr", cfg.Addr, "owner", cfg.OwnerDID)
57 errCh <- srv.ListenAndServe()
58 }()
59
60 select {
61 case <-ctx.Done():
62 logger.Info("shutting down")
63 case err := <-errCh:
64 // ErrServerClosed means we shut ourselves down cleanly elsewhere
65 // — anything else is a real failure to report.
66 if err != nil && !errors.Is(err, http.ErrServerClosed) {
67 return fmt.Errorf("http server: %w", err)
68 }
69 }
70
71 shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
72 defer cancel()
73 return srv.Shutdown(shutdownCtx)
74}
75
// rootHandler answers requests for exactly "/" with a one-line identifier,
// which makes it usable as a liveness check during deployment.
//
// runHTTP registers it under the subtree pattern "GET /", which the mux
// also uses as the fallback for every GET path no other route matches.
// Any path other than "/" is therefore rejected with 404 here instead of
// being answered with the banner and a misleading 200.
func rootHandler() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/" {
			http.NotFound(w, r)
			return
		}
		fmt.Fprintln(w, "tack: a Tangled spindle backed by Buildkite")
	}
}
83
84// ownerHandler implements sh.tangled.owner so the Tangled appview can verify
85// this spindle's owner during registration.
86func ownerHandler(logger *slog.Logger, owner string) http.HandlerFunc {
87 return func(w http.ResponseWriter, r *http.Request) {
88 w.Header().Set("Content-Type", "application/json")
89 if err := json.NewEncoder(w).Encode(tangled.Owner_Output{Owner: owner}); err != nil {
90 logger.Error("encode owner response", "err", err)
91 }
92 }
93}
94
// buildkiteWebhookHandler is a stub for the Buildkite webhook receiver.
// Until the Buildkite -> pipeline.status translation exists, every
// request is answered with 501 Not Implemented.
func buildkiteWebhookHandler() http.HandlerFunc {
	const (
		body   = "not implemented"
		status = http.StatusNotImplemented
	)
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		http.Error(w, body, status)
	})
}
102
103// eventsHandler upgrades to a WebSocket and streams persisted records
104// to the connected client. The wire protocol mirrors the upstream
105// Tangled spindle so the appview's eventconsumer treats us as a
106// drop-in source:
107//
108// - Optional ?cursor=<int64> resumes after that rowid; absent or 0
109// means "from the beginning of our retained log".
110// - We do a backfill pass first (everything with created > cursor),
111// then loop: on each broker signal, drain new rows; on a 30s
112// timer, write a websocket ping so intermediaries don't idle the
113// connection out.
114//
115// We subscribe to the broker *before* the backfill so any Publish that
116// races between the cursor read and the loop entry is captured by the
117// pending channel signal — the loop will see it on its first iteration
118// and call streamEvents again, which is idempotent on the cursor.
119func eventsHandler(logger *slog.Logger, br *broker) http.HandlerFunc {
120 upgrader := websocket.Upgrader{
121 ReadBufferSize: 1024,
122 WriteBufferSize: 1024,
123 }
124 return func(w http.ResponseWriter, r *http.Request) {
125 conn, err := upgrader.Upgrade(w, r, nil)
126 if err != nil {
127 logger.Error("websocket upgrade failed", "err", err)
128 return
129 }
130 defer conn.Close()
131
132 // Parse the resume cursor up front. An unparseable cursor is a
133 // client bug, but rather than 4xx the upgraded connection we
134 // log it and start from zero — same behaviour as the upstream
135 // spindle.
136 var cursor int64
137 if raw := r.URL.Query().Get("cursor"); raw != "" {
138 parsed, err := strconv.ParseInt(raw, 10, 64)
139 if err != nil {
140 logger.Warn("events: bad cursor, starting from 0",
141 "cursor", raw, "err", err,
142 )
143 } else {
144 cursor = parsed
145 }
146 }
147 logger.Debug("events client connected",
148 "remote", r.RemoteAddr, "cursor", cursor,
149 )
150
151 // Subscribe before the backfill so a Publish that races between
152 // the EventsAfter read and our select loop is captured by the
153 // pending channel signal — we'll re-drain on the first wake-up.
154 sig := br.Subscribe()
155 defer br.Unsubscribe(sig)
156
157 ctx, cancel := context.WithCancel(r.Context())
158 defer cancel()
159
160 // Detect client disconnect by trying to read; we don't expect
161 // any payloads from the client, so any read outcome (including
162 // EOF) signals the connection has gone away.
163 go func() {
164 for {
165 if _, _, err := conn.NextReader(); err != nil {
166 cancel()
167 return
168 }
169 }
170 }()
171
172 // Initial backfill. If this fails the connection is unusable
173 // (we can't promise ordering after a partial write) so just
174 // return and let the client reconnect with the same cursor.
175 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil {
176 logger.Debug("events backfill ended", "err", err, "cursor", cursor)
177 return
178 }
179
180 ticker := time.NewTicker(30 * time.Second)
181 defer ticker.Stop()
182 for {
183 select {
184 case <-ctx.Done():
185 logger.Debug("events client disconnected",
186 "remote", r.RemoteAddr, "cursor", cursor,
187 )
188 return
189 case <-sig:
190 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil {
191 logger.Debug("events stream ended", "err", err, "cursor", cursor)
192 return
193 }
194 case <-ticker.C:
195 if err := conn.WriteControl(
196 websocket.PingMessage, nil,
197 time.Now().Add(time.Second),
198 ); err != nil {
199 logger.Debug("events ping failed", "err", err)
200 return
201 }
202 }
203 }
204 }
205}
206
207// streamEvents drains every event row with `created > *cursor`, writes
208// each as a wire envelope frame, and advances *cursor in lockstep. The
209// cursor is updated *after* the write succeeds so a half-flushed batch
210// (interrupted by a websocket error) replays cleanly on the next
211// connection.
212//
213// It is safe to call repeatedly: when there are no new rows the query
214// returns an empty slice and we noop.
215func streamEvents(ctx context.Context, conn *websocket.Conn, st *store, cursor *int64) error {
216 rows, err := st.EventsAfter(ctx, *cursor)
217 if err != nil {
218 return fmt.Errorf("read events: %w", err)
219 }
220 for _, row := range rows {
221 frame, err := json.Marshal(eventsEnvelope{
222 Rkey: row.Rkey,
223 Nsid: row.Nsid,
224 Event: row.EventJSON,
225 Created: row.Created,
226 })
227 if err != nil {
228 return fmt.Errorf("marshal envelope: %w", err)
229 }
230 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil {
231 return fmt.Errorf("write frame: %w", err)
232 }
233 *cursor = row.Created
234 }
235 return nil
236}