Stitch any CI into Tangled
3

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// HTTP surface of the spindle. 4// 5// Three roles to keep in mind: 6// 7// 1. Verification: the Tangled appview hits /xrpc/sh.tangled.owner during 8// spindle registration to confirm the operator owns this instance. 9// 2. Event stream: the appview holds a long-lived websocket against 10// /events to receive sh.tangled.pipeline.status frames as builds 11// progress. Today this is just a keep-alive; payloads land once the 12// Buildkite webhook receiver is wired up. 13// 3. Webhooks: Buildkite POSTs build/job state changes to 14// /webhooks/buildkite, which we'll translate into pipeline.status 15// events on (2). 16 17import ( 18 "context" 19 "encoding/json" 20 "errors" 21 "fmt" 22 "log/slog" 23 "net/http" 24 "strconv" 25 "time" 26 27 "github.com/gorilla/websocket" 28 "tangled.org/core/api/tangled" 29) 30 31// runHTTP starts the spindle's HTTP server and blocks until ctx is 32// cancelled or the listener returns a fatal error. On ctx cancellation it 33// performs a graceful shutdown with a bounded timeout. 34// 35// The logger is read from ctx via loggerFrom. The broker is the 36// in-process pub/sub used by /events to fan published records out to 37// connected websocket subscribers. 38func runHTTP(ctx context.Context, cfg config, br *broker) error { 39 logger := loggerFrom(ctx) 40 41 mux := http.NewServeMux() 42 mux.HandleFunc("GET /", rootHandler()) 43 mux.HandleFunc("GET /events", eventsHandler(logger, br)) 44 mux.HandleFunc("GET /xrpc/"+tangled.OwnerNSID, ownerHandler(logger, cfg.OwnerDID)) 45 mux.HandleFunc("POST /webhooks/buildkite", buildkiteWebhookHandler()) 46 47 srv := &http.Server{ 48 Addr: cfg.Addr, 49 Handler: mux, 50 ReadHeaderTimeout: 5 * time.Second, 51 } 52 53 // Run ListenAndServe on a goroutine so we can race it against ctx.Done. 54 errCh := make(chan error, 1) 55 go func() { 56 logger.Info("listening", "addr", cfg.Addr, "owner", cfg.OwnerDID) 57 errCh <- srv.ListenAndServe() 58 }() 59 60 select { 61 case <-ctx.Done(): 62 logger.Info("shutting down") 63 case err := <-errCh: 64 // ErrServerClosed means we shut ourselves down cleanly elsewhere 65 // — anything else is a real failure to report. 66 if err != nil && !errors.Is(err, http.ErrServerClosed) { 67 return fmt.Errorf("http server: %w", err) 68 } 69 } 70 71 shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 72 defer cancel() 73 return srv.Shutdown(shutdownCtx) 74} 75 76// rootHandler responds at "/" with a small identifier. Mainly useful as a 77// liveness check during deployment. 78func rootHandler() http.HandlerFunc { 79 return func(w http.ResponseWriter, r *http.Request) { 80 fmt.Fprintln(w, "tack: a Tangled spindle backed by Buildkite") 81 } 82} 83 84// ownerHandler implements sh.tangled.owner so the Tangled appview can verify 85// this spindle's owner during registration. 86func ownerHandler(logger *slog.Logger, owner string) http.HandlerFunc { 87 return func(w http.ResponseWriter, r *http.Request) { 88 w.Header().Set("Content-Type", "application/json") 89 if err := json.NewEncoder(w).Encode(tangled.Owner_Output{Owner: owner}); err != nil { 90 logger.Error("encode owner response", "err", err) 91 } 92 } 93} 94 95// buildkiteWebhookHandler is a placeholder until we implement Buildkite -> 96// pipeline.status translation. 97func buildkiteWebhookHandler() http.HandlerFunc { 98 return func(w http.ResponseWriter, r *http.Request) { 99 http.Error(w, "not implemented", http.StatusNotImplemented) 100 } 101} 102 103// eventsHandler upgrades to a WebSocket and streams persisted records 104// to the connected client. The wire protocol mirrors the upstream 105// Tangled spindle so the appview's eventconsumer treats us as a 106// drop-in source: 107// 108// - Optional ?cursor=<int64> resumes after that rowid; absent or 0 109// means "from the beginning of our retained log". 110// - We do a backfill pass first (everything with created > cursor), 111// then loop: on each broker signal, drain new rows; on a 30s 112// timer, write a websocket ping so intermediaries don't idle the 113// connection out. 114// 115// We subscribe to the broker *before* the backfill so any Publish that 116// races between the cursor read and the loop entry is captured by the 117// pending channel signal — the loop will see it on its first iteration 118// and call streamEvents again, which is idempotent on the cursor. 119func eventsHandler(logger *slog.Logger, br *broker) http.HandlerFunc { 120 upgrader := websocket.Upgrader{ 121 ReadBufferSize: 1024, 122 WriteBufferSize: 1024, 123 } 124 return func(w http.ResponseWriter, r *http.Request) { 125 conn, err := upgrader.Upgrade(w, r, nil) 126 if err != nil { 127 logger.Error("websocket upgrade failed", "err", err) 128 return 129 } 130 defer conn.Close() 131 132 // Parse the resume cursor up front. An unparseable cursor is a 133 // client bug, but rather than 4xx the upgraded connection we 134 // log it and start from zero — same behaviour as the upstream 135 // spindle. 136 var cursor int64 137 if raw := r.URL.Query().Get("cursor"); raw != "" { 138 parsed, err := strconv.ParseInt(raw, 10, 64) 139 if err != nil { 140 logger.Warn("events: bad cursor, starting from 0", 141 "cursor", raw, "err", err, 142 ) 143 } else { 144 cursor = parsed 145 } 146 } 147 logger.Debug("events client connected", 148 "remote", r.RemoteAddr, "cursor", cursor, 149 ) 150 151 // Subscribe before the backfill so a Publish that races between 152 // the EventsAfter read and our select loop is captured by the 153 // pending channel signal — we'll re-drain on the first wake-up. 154 sig := br.Subscribe() 155 defer br.Unsubscribe(sig) 156 157 ctx, cancel := context.WithCancel(r.Context()) 158 defer cancel() 159 160 // Detect client disconnect by trying to read; we don't expect 161 // any payloads from the client, so any read outcome (including 162 // EOF) signals the connection has gone away. 163 go func() { 164 for { 165 if _, _, err := conn.NextReader(); err != nil { 166 cancel() 167 return 168 } 169 } 170 }() 171 172 // Initial backfill. If this fails the connection is unusable 173 // (we can't promise ordering after a partial write) so just 174 // return and let the client reconnect with the same cursor. 175 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil { 176 logger.Debug("events backfill ended", "err", err, "cursor", cursor) 177 return 178 } 179 180 ticker := time.NewTicker(30 * time.Second) 181 defer ticker.Stop() 182 for { 183 select { 184 case <-ctx.Done(): 185 logger.Debug("events client disconnected", 186 "remote", r.RemoteAddr, "cursor", cursor, 187 ) 188 return 189 case <-sig: 190 if err := streamEvents(ctx, conn, br.st, &cursor); err != nil { 191 logger.Debug("events stream ended", "err", err, "cursor", cursor) 192 return 193 } 194 case <-ticker.C: 195 if err := conn.WriteControl( 196 websocket.PingMessage, nil, 197 time.Now().Add(time.Second), 198 ); err != nil { 199 logger.Debug("events ping failed", "err", err) 200 return 201 } 202 } 203 } 204 } 205} 206 207// streamEvents drains every event row with `created > *cursor`, writes 208// each as a wire envelope frame, and advances *cursor in lockstep. The 209// cursor is updated *after* the write succeeds so a half-flushed batch 210// (interrupted by a websocket error) replays cleanly on the next 211// connection. 212// 213// It is safe to call repeatedly: when there are no new rows the query 214// returns an empty slice and we noop. 215func streamEvents(ctx context.Context, conn *websocket.Conn, st *store, cursor *int64) error { 216 rows, err := st.EventsAfter(ctx, *cursor) 217 if err != nil { 218 return fmt.Errorf("read events: %w", err) 219 } 220 for _, row := range rows { 221 frame, err := json.Marshal(eventsEnvelope{ 222 Rkey: row.Rkey, 223 Nsid: row.Nsid, 224 Event: row.EventJSON, 225 Created: row.Created, 226 }) 227 if err != nil { 228 return fmt.Errorf("marshal envelope: %w", err) 229 } 230 if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil { 231 return fmt.Errorf("write frame: %w", err) 232 } 233 *cursor = row.Created 234 } 235 return nil 236}