Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// Tests for the /events fan-out: the store's event log methods, the 4// in-process broker, and the eventsHandler's wire output. The handler 5// test boots a real httptest server and a real gorilla websocket client 6// so we exercise the upgrade + envelope codec end to end. 7 8import ( 9 "context" 10 "encoding/json" 11 "io" 12 "log/slog" 13 "net/http" 14 "net/http/httptest" 15 "net/url" 16 "strconv" 17 "strings" 18 "testing" 19 "time" 20 21 "github.com/gorilla/websocket" 22) 23 24// TestEventsLogRoundtrip covers InsertEvent / EventsAfter together 25// because they're a tightly-coupled pair: the cursor returned by Insert 26// is the same value EventsAfter must accept to skip past that row. 27func TestEventsLogRoundtrip(t *testing.T) { 28 s := newTestStore(t) 29 ctx := context.Background() 30 31 // Empty log → empty slice (not nil), so callers can range freely. 32 got, err := s.EventsAfter(ctx, 0) 33 if err != nil { 34 t.Fatalf("EventsAfter empty: %v", err) 35 } 36 if got == nil || len(got) != 0 { 37 t.Fatalf("empty log: got %v, want empty non-nil slice", got) 38 } 39 40 c1, err := s.InsertEvent(ctx, "rk1", "sh.tangled.pipeline.status", []byte(`{"a":1}`)) 41 if err != nil { 42 t.Fatalf("insert 1: %v", err) 43 } 44 c2, err := s.InsertEvent(ctx, "rk2", "sh.tangled.pipeline.status", []byte(`{"a":2}`)) 45 if err != nil { 46 t.Fatalf("insert 2: %v", err) 47 } 48 if c2 <= c1 { 49 t.Fatalf("cursors must be monotonically increasing: c1=%d c2=%d", c1, c2) 50 } 51 52 // cursor=0 returns everything; cursor=c1 skips the first row. 53 got, err = s.EventsAfter(ctx, 0) 54 if err != nil { 55 t.Fatalf("EventsAfter 0: %v", err) 56 } 57 if len(got) != 2 || got[0].Created != c1 || got[1].Created != c2 { 58 t.Fatalf("EventsAfter(0) = %+v, want both rows in order", got) 59 } 60 if got[0].Rkey != "rk1" || got[1].Rkey != "rk2" { 61 t.Fatalf("rkey order wrong: %q %q", got[0].Rkey, got[1].Rkey) 62 } 63 // json.RawMessage round-trip — matters because the /events handler 64 // splices these straight into the envelope. 65 if string(got[0].EventJSON) != `{"a":1}` { 66 t.Fatalf("event_json round-trip = %q", got[0].EventJSON) 67 } 68 69 got, err = s.EventsAfter(ctx, c1) 70 if err != nil { 71 t.Fatalf("EventsAfter c1: %v", err) 72 } 73 if len(got) != 1 || got[0].Created != c2 { 74 t.Fatalf("EventsAfter(c1) = %+v, want only row c2", got) 75 } 76} 77 78// TestBrokerPublishWakesSubscribers asserts the core invariant: a 79// Publish causes Subscribe()'d channels to fire (at least once) and the 80// row is durably visible via EventsAfter so the subscriber can drain 81// it. Two subscribers cover the multi-fanout case. 82func TestBrokerPublishWakesSubscribers(t *testing.T) { 83 s := newTestStore(t) 84 ctx := context.Background() 85 br := newBroker(s) 86 87 a := br.Subscribe() 88 b := br.Subscribe() 89 defer br.Unsubscribe(a) 90 defer br.Unsubscribe(b) 91 92 cursor, err := br.Publish(ctx, "rk", "sh.tangled.pipeline.status", []byte(`{}`)) 93 if err != nil { 94 t.Fatalf("publish: %v", err) 95 } 96 if cursor <= 0 { 97 t.Fatalf("publish cursor = %d, want > 0", cursor) 98 } 99 100 // Both subscribers must receive the wake-up promptly. Use a 101 // generous timeout so flaky CI doesn't false-alarm. 102 for name, ch := range map[string]chan struct{}{"a": a, "b": b} { 103 select { 104 case <-ch: 105 case <-time.After(time.Second): 106 t.Fatalf("subscriber %s did not receive signal", name) 107 } 108 } 109 110 rows, err := s.EventsAfter(ctx, 0) 111 if err != nil { 112 t.Fatalf("EventsAfter: %v", err) 113 } 114 if len(rows) != 1 || rows[0].Created != cursor { 115 t.Fatalf("after publish, EventsAfter(0) = %+v, want one row with cursor=%d", rows, cursor) 116 } 117} 118 119// TestBrokerCoalescesPendingSignal ensures Publish never blocks on a 120// subscriber that hasn't drained its channel: a second Publish while 121// the first signal is still pending must coalesce, not deadlock. This 122// is the property that lets slow clients lag without backpressuring 123// the rest of the system. 124func TestBrokerCoalescesPendingSignal(t *testing.T) { 125 s := newTestStore(t) 126 ctx := context.Background() 127 br := newBroker(s) 128 129 ch := br.Subscribe() 130 defer br.Unsubscribe(ch) 131 132 // First publish lands a pending signal in ch (cap=1). 133 if _, err := br.Publish(ctx, "rk1", "n", []byte(`{}`)); err != nil { 134 t.Fatalf("publish 1: %v", err) 135 } 136 // Second publish must succeed immediately — without a default 137 // branch in notify(), this would block forever waiting on the 138 // unread ch. 139 done := make(chan error, 1) 140 go func() { 141 _, err := br.Publish(ctx, "rk2", "n", []byte(`{}`)) 142 done <- err 143 }() 144 select { 145 case err := <-done: 146 if err != nil { 147 t.Fatalf("publish 2: %v", err) 148 } 149 case <-time.After(time.Second): 150 t.Fatal("Publish blocked on un-drained subscriber") 151 } 152 153 // One drain is enough — cursor-based catch-up will pick up *both* 154 // rows in a single EventsAfter call. 155 <-ch 156 rows, err := s.EventsAfter(ctx, 0) 157 if err != nil { 158 t.Fatalf("EventsAfter: %v", err) 159 } 160 if len(rows) != 2 { 161 t.Fatalf("expected 2 rows after 2 publishes, got %d", len(rows)) 162 } 163} 164 165// TestBrokerUnsubscribeStopsDelivery confirms an unsubscribed channel 166// no longer receives wake-ups. Without this we'd leak signals to dead 167// websockets and (worse) hold their channels in the broker map. 168func TestBrokerUnsubscribeStopsDelivery(t *testing.T) { 169 s := newTestStore(t) 170 ctx := context.Background() 171 br := newBroker(s) 172 173 ch := br.Subscribe() 174 br.Unsubscribe(ch) 175 176 if _, err := br.Publish(ctx, "rk", "n", []byte(`{}`)); err != nil { 177 t.Fatalf("publish: %v", err) 178 } 179 select { 180 case <-ch: 181 t.Fatal("unsubscribed channel still received signal") 182 case <-time.After(50 * time.Millisecond): 183 } 184} 185 186// TestEventsHandlerStreamsLiveAndBackfill exercises the full HTTP 187// surface: open a websocket, observe a backfill of pre-existing rows, 188// publish a new row, observe it arrive live, then reconnect with a 189// cursor and observe only events strictly after that cursor. 190func TestEventsHandlerStreamsLiveAndBackfill(t *testing.T) { 191 s := newTestStore(t) 192 br := newBroker(s) 193 ctx := context.Background() 194 195 // Seed two rows so the first connection has something to backfill. 196 pre1, err := br.Publish(ctx, "rk-pre1", "sh.tangled.pipeline.status", []byte(`{"i":1}`)) 197 if err != nil { 198 t.Fatalf("seed 1: %v", err) 199 } 200 pre2, err := br.Publish(ctx, "rk-pre2", "sh.tangled.pipeline.status", []byte(`{"i":2}`)) 201 if err != nil { 202 t.Fatalf("seed 2: %v", err) 203 } 204 205 // Boot the handler behind an httptest server. Using a discarding 206 // logger keeps test output quiet. 207 logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 208 srv := httptest.NewServer(eventsHandler(logger, br)) 209 t.Cleanup(srv.Close) 210 211 // First connection: no cursor, expect both seeded rows plus a 212 // freshly published live row. 213 c1 := dialEvents(t, srv.URL, 0) 214 defer c1.Close() 215 216 got1 := readEnvelope(t, c1) 217 if got1.Created != pre1 || got1.Rkey != "rk-pre1" { 218 t.Fatalf("first frame = %+v, want pre1 (cursor=%d)", got1, pre1) 219 } 220 got2 := readEnvelope(t, c1) 221 if got2.Created != pre2 || got2.Rkey != "rk-pre2" { 222 t.Fatalf("second frame = %+v, want pre2 (cursor=%d)", got2, pre2) 223 } 224 // Verify the wire envelope's `event` field round-trips as the raw 225 // record body, not a re-encoded blob. 226 if strings.TrimSpace(string(got2.Event)) != `{"i":2}` { 227 t.Fatalf("event body = %q, want %q", got2.Event, `{"i":2}`) 228 } 229 230 // Live publish — handler should wake on broker signal and emit it. 231 live, err := br.Publish(ctx, "rk-live", "sh.tangled.pipeline.status", []byte(`{"i":3}`)) 232 if err != nil { 233 t.Fatalf("live publish: %v", err) 234 } 235 got3 := readEnvelope(t, c1) 236 if got3.Created != live || got3.Rkey != "rk-live" { 237 t.Fatalf("live frame = %+v, want rk-live (cursor=%d)", got3, live) 238 } 239 240 // Second connection with cursor=pre2: must skip pre1 and pre2, 241 // receive only the live row. No timeout is set; if the handler 242 // over-delivers we'll fail in readEnvelope's assert below. 243 c2 := dialEvents(t, srv.URL, pre2) 244 defer c2.Close() 245 got := readEnvelope(t, c2) 246 if got.Created != live || got.Rkey != "rk-live" { 247 t.Fatalf("cursor-resume frame = %+v, want rk-live (cursor=%d)", got, live) 248 } 249} 250 251// TestEventsHandlerBadCursorStartsFromZero confirms a malformed cursor 252// query parameter doesn't 4xx the upgrade — the handler logs and falls 253// back to the full backfill, matching the upstream spindle's behaviour. 254func TestEventsHandlerBadCursorStartsFromZero(t *testing.T) { 255 s := newTestStore(t) 256 br := newBroker(s) 257 ctx := context.Background() 258 259 if _, err := br.Publish(ctx, "rk", "sh.tangled.pipeline.status", []byte(`{}`)); err != nil { 260 t.Fatalf("publish: %v", err) 261 } 262 263 logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 264 srv := httptest.NewServer(eventsHandler(logger, br)) 265 t.Cleanup(srv.Close) 266 267 // Build the URL by hand so we can inject a non-numeric cursor. 268 u, _ := url.Parse(srv.URL) 269 u.Scheme = "ws" 270 q := u.Query() 271 q.Set("cursor", "not-a-number") 272 u.RawQuery = q.Encode() 273 274 conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil) 275 if err != nil { 276 t.Fatalf("dial: %v", err) 277 } 278 defer conn.Close() 279 280 conn.SetReadDeadline(time.Now().Add(2 * time.Second)) 281 _, msg, err := conn.ReadMessage() 282 if err != nil { 283 t.Fatalf("read: %v", err) 284 } 285 var env eventsEnvelope 286 if err := json.Unmarshal(msg, &env); err != nil { 287 t.Fatalf("unmarshal: %v", err) 288 } 289 if env.Rkey != "rk" { 290 t.Fatalf("expected backfill of seeded row, got envelope %+v", env) 291 } 292} 293 294// dialEvents opens a websocket against an httptest server (which 295// returns http://) by rewriting the scheme to ws://. cursor=0 omits the 296// query parameter entirely so we exercise the "no cursor" code path. 297func dialEvents(t *testing.T, base string, cursor int64) *websocket.Conn { 298 t.Helper() 299 u, err := url.Parse(base) 300 if err != nil { 301 t.Fatalf("parse url: %v", err) 302 } 303 u.Scheme = "ws" 304 if cursor != 0 { 305 q := u.Query() 306 q.Set("cursor", strconv.FormatInt(cursor, 10)) 307 u.RawQuery = q.Encode() 308 } 309 conn, _, err := websocket.DefaultDialer.Dial(u.String(), http.Header{}) 310 if err != nil { 311 t.Fatalf("dial %s: %v", u, err) 312 } 313 return conn 314} 315 316// readEnvelope reads one TextMessage frame, decodes it as the spindle 317// wire envelope, and returns it. It enforces a read deadline so a 318// handler bug that fails to flush events doesn't hang the test forever. 319func readEnvelope(t *testing.T, conn *websocket.Conn) eventsEnvelope { 320 t.Helper() 321 conn.SetReadDeadline(time.Now().Add(2 * time.Second)) 322 mt, msg, err := conn.ReadMessage() 323 if err != nil { 324 t.Fatalf("read: %v", err) 325 } 326 if mt != websocket.TextMessage { 327 t.Fatalf("frame type = %d, want TextMessage", mt) 328 } 329 var env eventsEnvelope 330 if err := json.Unmarshal(msg, &env); err != nil { 331 t.Fatalf("decode envelope: %v (raw: %s)", err, msg) 332 } 333 return env 334} 335