// Stitch any CI into Tangled
1package main
2
// Tests for the /events fan-out: the store's event log methods, the
// in-process broker, and the eventsHandler's wire output. The handler
// tests boot a real httptest server and a real gorilla websocket client
// so that the upgrade and the envelope codec are exercised end to end.
7
8import (
9 "context"
10 "encoding/json"
11 "io"
12 "log/slog"
13 "net/http"
14 "net/http/httptest"
15 "net/url"
16 "strconv"
17 "strings"
18 "testing"
19 "time"
20
21 "github.com/gorilla/websocket"
22)
23
24// TestEventsLogRoundtrip covers InsertEvent / EventsAfter together
25// because they're a tightly-coupled pair: the cursor returned by Insert
26// is the same value EventsAfter must accept to skip past that row.
27func TestEventsLogRoundtrip(t *testing.T) {
28 s := newTestStore(t)
29 ctx := context.Background()
30
31 // Empty log → empty slice (not nil), so callers can range freely.
32 got, err := s.EventsAfter(ctx, 0)
33 if err != nil {
34 t.Fatalf("EventsAfter empty: %v", err)
35 }
36 if got == nil || len(got) != 0 {
37 t.Fatalf("empty log: got %v, want empty non-nil slice", got)
38 }
39
40 c1, err := s.InsertEvent(ctx, "rk1", "sh.tangled.pipeline.status", []byte(`{"a":1}`))
41 if err != nil {
42 t.Fatalf("insert 1: %v", err)
43 }
44 c2, err := s.InsertEvent(ctx, "rk2", "sh.tangled.pipeline.status", []byte(`{"a":2}`))
45 if err != nil {
46 t.Fatalf("insert 2: %v", err)
47 }
48 if c2 <= c1 {
49 t.Fatalf("cursors must be monotonically increasing: c1=%d c2=%d", c1, c2)
50 }
51
52 // cursor=0 returns everything; cursor=c1 skips the first row.
53 got, err = s.EventsAfter(ctx, 0)
54 if err != nil {
55 t.Fatalf("EventsAfter 0: %v", err)
56 }
57 if len(got) != 2 || got[0].Created != c1 || got[1].Created != c2 {
58 t.Fatalf("EventsAfter(0) = %+v, want both rows in order", got)
59 }
60 if got[0].Rkey != "rk1" || got[1].Rkey != "rk2" {
61 t.Fatalf("rkey order wrong: %q %q", got[0].Rkey, got[1].Rkey)
62 }
63 // json.RawMessage round-trip — matters because the /events handler
64 // splices these straight into the envelope.
65 if string(got[0].EventJSON) != `{"a":1}` {
66 t.Fatalf("event_json round-trip = %q", got[0].EventJSON)
67 }
68
69 got, err = s.EventsAfter(ctx, c1)
70 if err != nil {
71 t.Fatalf("EventsAfter c1: %v", err)
72 }
73 if len(got) != 1 || got[0].Created != c2 {
74 t.Fatalf("EventsAfter(c1) = %+v, want only row c2", got)
75 }
76}
77
78// TestBrokerPublishWakesSubscribers asserts the core invariant: a
79// Publish causes Subscribe()'d channels to fire (at least once) and the
80// row is durably visible via EventsAfter so the subscriber can drain
81// it. Two subscribers cover the multi-fanout case.
82func TestBrokerPublishWakesSubscribers(t *testing.T) {
83 s := newTestStore(t)
84 ctx := context.Background()
85 br := newBroker(s)
86
87 a := br.Subscribe()
88 b := br.Subscribe()
89 defer br.Unsubscribe(a)
90 defer br.Unsubscribe(b)
91
92 cursor, err := br.Publish(ctx, "rk", "sh.tangled.pipeline.status", []byte(`{}`))
93 if err != nil {
94 t.Fatalf("publish: %v", err)
95 }
96 if cursor <= 0 {
97 t.Fatalf("publish cursor = %d, want > 0", cursor)
98 }
99
100 // Both subscribers must receive the wake-up promptly. Use a
101 // generous timeout so flaky CI doesn't false-alarm.
102 for name, ch := range map[string]chan struct{}{"a": a, "b": b} {
103 select {
104 case <-ch:
105 case <-time.After(time.Second):
106 t.Fatalf("subscriber %s did not receive signal", name)
107 }
108 }
109
110 rows, err := s.EventsAfter(ctx, 0)
111 if err != nil {
112 t.Fatalf("EventsAfter: %v", err)
113 }
114 if len(rows) != 1 || rows[0].Created != cursor {
115 t.Fatalf("after publish, EventsAfter(0) = %+v, want one row with cursor=%d", rows, cursor)
116 }
117}
118
119// TestBrokerCoalescesPendingSignal ensures Publish never blocks on a
120// subscriber that hasn't drained its channel: a second Publish while
121// the first signal is still pending must coalesce, not deadlock. This
122// is the property that lets slow clients lag without backpressuring
123// the rest of the system.
124func TestBrokerCoalescesPendingSignal(t *testing.T) {
125 s := newTestStore(t)
126 ctx := context.Background()
127 br := newBroker(s)
128
129 ch := br.Subscribe()
130 defer br.Unsubscribe(ch)
131
132 // First publish lands a pending signal in ch (cap=1).
133 if _, err := br.Publish(ctx, "rk1", "n", []byte(`{}`)); err != nil {
134 t.Fatalf("publish 1: %v", err)
135 }
136 // Second publish must succeed immediately — without a default
137 // branch in notify(), this would block forever waiting on the
138 // unread ch.
139 done := make(chan error, 1)
140 go func() {
141 _, err := br.Publish(ctx, "rk2", "n", []byte(`{}`))
142 done <- err
143 }()
144 select {
145 case err := <-done:
146 if err != nil {
147 t.Fatalf("publish 2: %v", err)
148 }
149 case <-time.After(time.Second):
150 t.Fatal("Publish blocked on un-drained subscriber")
151 }
152
153 // One drain is enough — cursor-based catch-up will pick up *both*
154 // rows in a single EventsAfter call.
155 <-ch
156 rows, err := s.EventsAfter(ctx, 0)
157 if err != nil {
158 t.Fatalf("EventsAfter: %v", err)
159 }
160 if len(rows) != 2 {
161 t.Fatalf("expected 2 rows after 2 publishes, got %d", len(rows))
162 }
163}
164
165// TestBrokerUnsubscribeStopsDelivery confirms an unsubscribed channel
166// no longer receives wake-ups. Without this we'd leak signals to dead
167// websockets and (worse) hold their channels in the broker map.
168func TestBrokerUnsubscribeStopsDelivery(t *testing.T) {
169 s := newTestStore(t)
170 ctx := context.Background()
171 br := newBroker(s)
172
173 ch := br.Subscribe()
174 br.Unsubscribe(ch)
175
176 if _, err := br.Publish(ctx, "rk", "n", []byte(`{}`)); err != nil {
177 t.Fatalf("publish: %v", err)
178 }
179 select {
180 case <-ch:
181 t.Fatal("unsubscribed channel still received signal")
182 case <-time.After(50 * time.Millisecond):
183 }
184}
185
186// TestEventsHandlerStreamsLiveAndBackfill exercises the full HTTP
187// surface: open a websocket, observe a backfill of pre-existing rows,
188// publish a new row, observe it arrive live, then reconnect with a
189// cursor and observe only events strictly after that cursor.
190func TestEventsHandlerStreamsLiveAndBackfill(t *testing.T) {
191 s := newTestStore(t)
192 br := newBroker(s)
193 ctx := context.Background()
194
195 // Seed two rows so the first connection has something to backfill.
196 pre1, err := br.Publish(ctx, "rk-pre1", "sh.tangled.pipeline.status", []byte(`{"i":1}`))
197 if err != nil {
198 t.Fatalf("seed 1: %v", err)
199 }
200 pre2, err := br.Publish(ctx, "rk-pre2", "sh.tangled.pipeline.status", []byte(`{"i":2}`))
201 if err != nil {
202 t.Fatalf("seed 2: %v", err)
203 }
204
205 // Boot the handler behind an httptest server. Using a discarding
206 // logger keeps test output quiet.
207 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
208 srv := httptest.NewServer(eventsHandler(logger, br))
209 t.Cleanup(srv.Close)
210
211 // First connection: no cursor, expect both seeded rows plus a
212 // freshly published live row.
213 c1 := dialEvents(t, srv.URL, 0)
214 defer c1.Close()
215
216 got1 := readEnvelope(t, c1)
217 if got1.Created != pre1 || got1.Rkey != "rk-pre1" {
218 t.Fatalf("first frame = %+v, want pre1 (cursor=%d)", got1, pre1)
219 }
220 got2 := readEnvelope(t, c1)
221 if got2.Created != pre2 || got2.Rkey != "rk-pre2" {
222 t.Fatalf("second frame = %+v, want pre2 (cursor=%d)", got2, pre2)
223 }
224 // Verify the wire envelope's `event` field round-trips as the raw
225 // record body, not a re-encoded blob.
226 if strings.TrimSpace(string(got2.Event)) != `{"i":2}` {
227 t.Fatalf("event body = %q, want %q", got2.Event, `{"i":2}`)
228 }
229
230 // Live publish — handler should wake on broker signal and emit it.
231 live, err := br.Publish(ctx, "rk-live", "sh.tangled.pipeline.status", []byte(`{"i":3}`))
232 if err != nil {
233 t.Fatalf("live publish: %v", err)
234 }
235 got3 := readEnvelope(t, c1)
236 if got3.Created != live || got3.Rkey != "rk-live" {
237 t.Fatalf("live frame = %+v, want rk-live (cursor=%d)", got3, live)
238 }
239
240 // Second connection with cursor=pre2: must skip pre1 and pre2,
241 // receive only the live row. No timeout is set; if the handler
242 // over-delivers we'll fail in readEnvelope's assert below.
243 c2 := dialEvents(t, srv.URL, pre2)
244 defer c2.Close()
245 got := readEnvelope(t, c2)
246 if got.Created != live || got.Rkey != "rk-live" {
247 t.Fatalf("cursor-resume frame = %+v, want rk-live (cursor=%d)", got, live)
248 }
249}
250
251// TestEventsHandlerBadCursorStartsFromZero confirms a malformed cursor
252// query parameter doesn't 4xx the upgrade — the handler logs and falls
253// back to the full backfill, matching the upstream spindle's behaviour.
254func TestEventsHandlerBadCursorStartsFromZero(t *testing.T) {
255 s := newTestStore(t)
256 br := newBroker(s)
257 ctx := context.Background()
258
259 if _, err := br.Publish(ctx, "rk", "sh.tangled.pipeline.status", []byte(`{}`)); err != nil {
260 t.Fatalf("publish: %v", err)
261 }
262
263 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
264 srv := httptest.NewServer(eventsHandler(logger, br))
265 t.Cleanup(srv.Close)
266
267 // Build the URL by hand so we can inject a non-numeric cursor.
268 u, _ := url.Parse(srv.URL)
269 u.Scheme = "ws"
270 q := u.Query()
271 q.Set("cursor", "not-a-number")
272 u.RawQuery = q.Encode()
273
274 conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
275 if err != nil {
276 t.Fatalf("dial: %v", err)
277 }
278 defer conn.Close()
279
280 conn.SetReadDeadline(time.Now().Add(2 * time.Second))
281 _, msg, err := conn.ReadMessage()
282 if err != nil {
283 t.Fatalf("read: %v", err)
284 }
285 var env eventsEnvelope
286 if err := json.Unmarshal(msg, &env); err != nil {
287 t.Fatalf("unmarshal: %v", err)
288 }
289 if env.Rkey != "rk" {
290 t.Fatalf("expected backfill of seeded row, got envelope %+v", env)
291 }
292}
293
294// dialEvents opens a websocket against an httptest server (which
295// returns http://) by rewriting the scheme to ws://. cursor=0 omits the
296// query parameter entirely so we exercise the "no cursor" code path.
297func dialEvents(t *testing.T, base string, cursor int64) *websocket.Conn {
298 t.Helper()
299 u, err := url.Parse(base)
300 if err != nil {
301 t.Fatalf("parse url: %v", err)
302 }
303 u.Scheme = "ws"
304 if cursor != 0 {
305 q := u.Query()
306 q.Set("cursor", strconv.FormatInt(cursor, 10))
307 u.RawQuery = q.Encode()
308 }
309 conn, _, err := websocket.DefaultDialer.Dial(u.String(), http.Header{})
310 if err != nil {
311 t.Fatalf("dial %s: %v", u, err)
312 }
313 return conn
314}
315
316// readEnvelope reads one TextMessage frame, decodes it as the spindle
317// wire envelope, and returns it. It enforces a read deadline so a
318// handler bug that fails to flush events doesn't hang the test forever.
319func readEnvelope(t *testing.T, conn *websocket.Conn) eventsEnvelope {
320 t.Helper()
321 conn.SetReadDeadline(time.Now().Add(2 * time.Second))
322 mt, msg, err := conn.ReadMessage()
323 if err != nil {
324 t.Fatalf("read: %v", err)
325 }
326 if mt != websocket.TextMessage {
327 t.Fatalf("frame type = %d, want TextMessage", mt)
328 }
329 var env eventsEnvelope
330 if err := json.Unmarshal(msg, &env); err != nil {
331 t.Fatalf("decode envelope: %v (raw: %s)", err, msg)
332 }
333 return env
334}
335