Stitch any CI into Tangled
1package main
2
3// Tests for handleJetstreamEvent. We exercise the full path — collection
4// dispatch, record decoding, store mutation, and cursor advancement —
5// using an in-memory store from store_test.go's newTestStore helper.
6//
7// Events are constructed by hand rather than recorded from a live
8// jetstream so the tests don't need network access and stay fast.
9
10import (
11 "context"
12 "encoding/json"
13 "path/filepath"
14 "testing"
15
16 jsmodels "github.com/bluesky-social/jetstream/pkg/models"
17 "tangled.org/core/api/tangled"
18)
19
20// commitEvent builds a jetstream commit event for tests. timeUS is the
21// cursor value the handler should persist after applying. record can be
22// nil for delete operations, which carry no body.
23func commitEvent(timeUS int64, did, collection, op, rkey string, record any) *jsmodels.Event {
24 var raw json.RawMessage
25 if record != nil {
26 b, err := json.Marshal(record)
27 if err != nil {
28 panic(err) // test helper — callers control the input
29 }
30 raw = b
31 }
32 return &jsmodels.Event{
33 Did: did,
34 TimeUS: timeUS,
35 Kind: jsmodels.EventKindCommit,
36 Commit: &jsmodels.Commit{
37 Operation: op,
38 Collection: collection,
39 RKey: rkey,
40 Record: raw,
41 },
42 }
43}
44
45// requireCursor asserts the persisted cursor equals want. Pulled out
46// because almost every test in this file checks it.
47func requireCursor(t *testing.T, s *store, want int64) {
48 t.Helper()
49 got, err := s.LoadCursor(context.Background())
50 if err != nil {
51 t.Fatalf("load cursor: %v", err)
52 }
53 if got == nil || *got != want {
54 t.Fatalf("cursor = %v, want %d", got, want)
55 }
56}
57
58// TestHandleNonCommitEvent confirms account/identity events are ignored
59// without error and without advancing the cursor — they don't have a
60// TimeUS we want to commit to.
61func TestHandleNonCommitEvent(t *testing.T) {
62 s := newTestStore(t)
63 ctx := context.Background()
64
65 evt := &jsmodels.Event{
66 Did: "did:plc:foo",
67 TimeUS: 100,
68 Kind: jsmodels.EventKindAccount,
69 }
70 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil {
71 t.Fatalf("handle: %v", err)
72 }
73 got, err := s.LoadCursor(ctx)
74 if err != nil {
75 t.Fatalf("load cursor: %v", err)
76 }
77 if got != nil {
78 t.Fatalf("expected no cursor for non-commit, got %d", *got)
79 }
80}
81
82// TestHandleSpindleMemberCreate exercises the happy path: a create
83// commit lands a row in spindle_members and advances the cursor.
84func TestHandleSpindleMemberCreate(t *testing.T) {
85 s := newTestStore(t)
86 ctx := context.Background()
87
88 rec := tangled.SpindleMember{
89 Instance: "https://spindle.example",
90 Subject: "did:plc:alice",
91 CreatedAt: "2026-01-01T00:00:00Z",
92 }
93 evt := commitEvent(12345, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk1", rec)
94 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil {
95 t.Fatalf("handle: %v", err)
96 }
97
98 var instance, subject string
99 err := s.db.QueryRowContext(ctx,
100 `SELECT instance, subject FROM spindle_members WHERE did = ? AND rkey = ?`,
101 "did:plc:owner", "rk1",
102 ).Scan(&instance, &subject)
103 if err != nil {
104 t.Fatalf("query: %v", err)
105 }
106 if instance != "https://spindle.example" || subject != "did:plc:alice" {
107 t.Fatalf("got (%q,%q)", instance, subject)
108 }
109 requireCursor(t, s, 12345)
110}
111
112// TestHandleSpindleMemberDelete confirms a delete commit removes the
113// previously-persisted row.
114func TestHandleSpindleMemberDelete(t *testing.T) {
115 s := newTestStore(t)
116 ctx := context.Background()
117
118 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "rk1", "i", "s", "t"); err != nil {
119 t.Fatalf("seed: %v", err)
120 }
121 evt := commitEvent(99, "did:plc:owner", tangled.SpindleMemberNSID, jsOpDelete, "rk1", nil)
122 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil {
123 t.Fatalf("handle: %v", err)
124 }
125 if n := countRows(t, s, "spindle_members"); n != 0 {
126 t.Fatalf("after delete: %d rows, want 0", n)
127 }
128 requireCursor(t, s, 99)
129}
130
131// TestHandleRepoCreateOptionals verifies pointer-typed optional fields
132// on tangled.Repo are derefed correctly (and nils become empty strings)
133// when written to the store.
134func TestHandleRepoCreateOptionals(t *testing.T) {
135 s := newTestStore(t)
136 ctx := context.Background()
137
138 spindle := "https://spindle.example"
139 repoDid := "did:plc:repo"
140 rec := tangled.Repo{
141 Knot: "knot.example",
142 Name: "myrepo",
143 Spindle: &spindle,
144 RepoDid: &repoDid,
145 CreatedAt: "2026-01-01T00:00:00Z",
146 }
147 evt := commitEvent(7, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo1", rec)
148 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil {
149 t.Fatalf("handle: %v", err)
150 }
151
152 var gotSpindle, gotRepoDid string
153 err := s.db.QueryRowContext(ctx,
154 `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`,
155 "did:plc:owner", "repo1",
156 ).Scan(&gotSpindle, &gotRepoDid)
157 if err != nil {
158 t.Fatalf("query: %v", err)
159 }
160 if gotSpindle != spindle || gotRepoDid != repoDid {
161 t.Fatalf("got (%q,%q)", gotSpindle, gotRepoDid)
162 }
163
164 // Now a record with both optionals nil — should land as empty
165 // strings, not crash on a nil dereference in deref().
166 rec2 := tangled.Repo{
167 Knot: "knot.example",
168 Name: "other",
169 CreatedAt: "2026-01-01T00:00:00Z",
170 }
171 evt2 := commitEvent(8, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo2", rec2)
172 if err := handleJetstreamEvent(ctx, s, nil, "", evt2); err != nil {
173 t.Fatalf("handle nil-optionals: %v", err)
174 }
175 err = s.db.QueryRowContext(ctx,
176 `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`,
177 "did:plc:owner", "repo2",
178 ).Scan(&gotSpindle, &gotRepoDid)
179 if err != nil {
180 t.Fatalf("query nil-optionals: %v", err)
181 }
182 if gotSpindle != "" || gotRepoDid != "" {
183 t.Fatalf("nil optionals: got (%q,%q), want both empty", gotSpindle, gotRepoDid)
184 }
185 requireCursor(t, s, 8)
186}
187
188// TestHandleRepoCollaboratorCreate covers the third dispatch arm so each
189// collection has at least one apply test.
190func TestHandleRepoCollaboratorCreate(t *testing.T) {
191 s := newTestStore(t)
192 ctx := context.Background()
193
194 repo := "myrepo"
195 rec := tangled.RepoCollaborator{
196 Repo: &repo,
197 Subject: "did:plc:carol",
198 CreatedAt: "2026-01-01T00:00:00Z",
199 }
200 evt := commitEvent(55, "did:plc:owner", tangled.RepoCollaboratorNSID, jsOpCreate, "c1", rec)
201 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil {
202 t.Fatalf("handle: %v", err)
203 }
204
205 var subject string
206 err := s.db.QueryRowContext(ctx,
207 `SELECT subject FROM repo_collaborators WHERE did = ? AND rkey = ?`,
208 "did:plc:owner", "c1",
209 ).Scan(&subject)
210 if err != nil {
211 t.Fatalf("query: %v", err)
212 }
213 if subject != "did:plc:carol" {
214 t.Fatalf("subject = %q", subject)
215 }
216 requireCursor(t, s, 55)
217}
218
219// TestHandleUnknownCollection makes sure a collection we didn't ask for
220// (jetstream filter changes, schema drift) is silently dropped — no
221// error, no row, but cursor still advances so we don't replay it.
222func TestHandleUnknownCollection(t *testing.T) {
223 s := newTestStore(t)
224 ctx := context.Background()
225
226 evt := commitEvent(42, "did:plc:owner", "app.bsky.feed.post", jsOpCreate, "rk", map[string]string{"text": "hi"})
227 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil {
228 t.Fatalf("handle: %v", err)
229 }
230 requireCursor(t, s, 42)
231}
232
233// TestHandleBadRecordAdvancesCursor is the failure-mode counterpart to
234// the happy paths: a malformed record body must be logged-and-skipped
235// (not returned as an error that pauses the scheduler) and the cursor
236// must still advance so we don't loop on the same bad event forever.
237func TestHandleBadRecordAdvancesCursor(t *testing.T) {
238 s := newTestStore(t)
239 ctx := context.Background()
240
241 evt := &jsmodels.Event{
242 Did: "did:plc:owner",
243 TimeUS: 1000,
244 Kind: jsmodels.EventKindCommit,
245 Commit: &jsmodels.Commit{
246 Operation: jsOpCreate,
247 Collection: tangled.SpindleMemberNSID,
248 RKey: "broken",
249 Record: json.RawMessage(`{not valid json`),
250 },
251 }
252 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil {
253 t.Fatalf("handle should swallow decode error, got: %v", err)
254 }
255 if n := countRows(t, s, "spindle_members"); n != 0 {
256 t.Fatalf("bad record should not have inserted; got %d rows", n)
257 }
258 requireCursor(t, s, 1000)
259}
260
261// TestHandleTransientStoreErrorDoesNotAdvanceCursor is the partner to
262// TestHandleBadRecordAdvancesCursor: the cursor must hold its previous
263// position when applyCommit fails for an *infrastructure* reason
264// (here: store closed mid-flight, simulating SQLite busy / shutdown
265// races). Saving the cursor through such a failure would permanently
266// skip a perfectly good record and silently lose membership state.
267func TestHandleTransientStoreErrorDoesNotAdvanceCursor(t *testing.T) {
268 // Build the store inline (not via newTestStore) so the cleanup
269 // tolerates the explicit Close we do below to provoke errors.
270 path := filepath.Join(t.TempDir(), "tack.db")
271 s, err := openStore(path)
272 if err != nil {
273 t.Fatalf("openStore: %v", err)
274 }
275 ctx := context.Background()
276
277 // Seed a baseline cursor; after the failed apply it must still be
278 // 500, not 1000.
279 if err := s.SaveCursor(ctx, 500); err != nil {
280 t.Fatalf("seed cursor: %v", err)
281 }
282
283 // Close the underlying DB. Subsequent store calls will fail with
284 // "sql: database is closed", which is a transient-style error from
285 // applyCommit's perspective: the record itself is well-formed.
286 if err := s.db.Close(); err != nil {
287 t.Fatalf("close db: %v", err)
288 }
289
290 rec := tangled.SpindleMember{
291 Instance: "https://spindle.example",
292 Subject: "did:plc:alice",
293 CreatedAt: "2026-01-01T00:00:00Z",
294 }
295 evt := commitEvent(1000, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk", rec)
296
297 // Expect a non-nil error: handler propagates transient failures.
298 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err == nil {
299 t.Fatalf("handle should return transient error, got nil")
300 }
301
302 // Re-open the same DB file to inspect the persisted cursor; the
303 // closed handle obviously can't answer queries.
304 s2, err := openStore(path)
305 if err != nil {
306 t.Fatalf("re-open: %v", err)
307 }
308 defer s2.Close()
309 got, err := s2.LoadCursor(ctx)
310 if err != nil {
311 t.Fatalf("load cursor: %v", err)
312 }
313 if got == nil || *got != 500 {
314 t.Fatalf("cursor = %v, want 500 (must NOT advance on transient error)", got)
315 }
316}
317
318// TestRepoEventSubscribesKnotForOurSpindle confirms that observing a
319// sh.tangled.repo whose .spindle field equals our hostname results in a
320// dynamic AddKnot call. This is the hot path for picking up new repos
321// without a tack restart.
322func TestRepoEventSubscribesKnotForOurSpindle(t *testing.T) {
323 s := newTestStore(t)
324 ctx := context.Background()
325
326 const ours = "tack.example"
327 spindle := ours
328 rec := tangled.Repo{
329 Knot: "knot.example",
330 Name: "myrepo",
331 Spindle: &spindle,
332 CreatedAt: "2026-01-01T00:00:00Z",
333 }
334 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec)
335
336 fake := &fakeKnotConsumer{}
337 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil {
338 t.Fatalf("handle: %v", err)
339 }
340 added := fake.Added()
341 if len(added) != 1 || added[0] != "knot.example" {
342 t.Fatalf("AddKnot calls = %v, want [knot.example]", added)
343 }
344}
345
346// TestRepoEventIgnoresKnotForOtherSpindle confirms repos pointing at a
347// *different* spindle do not pull us into watching their knot. Without
348// this guard, tack would dial every knot named in any sh.tangled.repo
349// it sees over the firehose, which is most of them.
350func TestRepoEventIgnoresKnotForOtherSpindle(t *testing.T) {
351 s := newTestStore(t)
352 ctx := context.Background()
353
354 other := "other-spindle.example"
355 rec := tangled.Repo{
356 Knot: "knot.example",
357 Name: "myrepo",
358 Spindle: &other,
359 CreatedAt: "2026-01-01T00:00:00Z",
360 }
361 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec)
362
363 fake := &fakeKnotConsumer{}
364 if err := handleJetstreamEvent(ctx, s, fake, "tack.example", evt); err != nil {
365 t.Fatalf("handle: %v", err)
366 }
367 if added := fake.Added(); len(added) != 0 {
368 t.Fatalf("AddKnot calls = %v, want none", added)
369 }
370}
371
372// TestRepoUpdateSpindleAwayFromUsRemovesKnot covers the case where a
373// repo we'd previously been watching gets its .spindle field flipped to
374// some other spindle. Once that's the only repo we had on that knot,
375// the reconciliation must call RemoveKnot.
376func TestRepoUpdateSpindleAwayFromUsRemovesKnot(t *testing.T) {
377 s := newTestStore(t)
378 ctx := context.Background()
379
380 const ours = "tack.example"
381 const knot = "knot.example"
382
383 // Seed: a repo that names us as its spindle on `knot`.
384 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "repo-a", ours, "", "t"); err != nil {
385 t.Fatal(err)
386 }
387
388 // Update: same record, now points at a different spindle.
389 other := "other.example"
390 rec := tangled.Repo{
391 Knot: knot,
392 Name: "repo-a",
393 Spindle: &other,
394 CreatedAt: "2026-01-01T00:00:00Z",
395 }
396 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec)
397
398 fake := &fakeKnotConsumer{}
399 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil {
400 t.Fatalf("handle: %v", err)
401 }
402 if added := fake.Added(); len(added) != 0 {
403 t.Fatalf("AddKnot calls = %v, want none", added)
404 }
405 if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot {
406 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot)
407 }
408}
409
410// TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared ensures we don't
411// over-eagerly unsubscribe from a knot when one of multiple repos on
412// that knot moves away from us. The other repo's subscription must keep
413// the knot in our wanted set.
414func TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared(t *testing.T) {
415 s := newTestStore(t)
416 ctx := context.Background()
417
418 const ours = "tack.example"
419 const knot = "knot.example"
420
421 // Two repos sharing one knot, both pointed at us.
422 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil {
423 t.Fatal(err)
424 }
425 if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil {
426 t.Fatal(err)
427 }
428
429 // Repo A flips to a different spindle. B still wants us.
430 other := "other.example"
431 rec := tangled.Repo{
432 Knot: knot,
433 Name: "a",
434 Spindle: &other,
435 CreatedAt: "2026-01-01T00:00:00Z",
436 }
437 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk1", rec)
438
439 fake := &fakeKnotConsumer{}
440 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil {
441 t.Fatalf("handle: %v", err)
442 }
443 if removed := fake.Removed(); len(removed) != 0 {
444 t.Fatalf("RemoveKnot calls = %v, want none (B still wants us on %s)", removed, knot)
445 }
446}
447
448// TestRepoUpdateChangingKnotSwapsSubscription verifies that a repo
449// staying with us but changing its .knot field unsubscribes the old
450// knot (if no other repo holds it) and subscribes the new one.
451func TestRepoUpdateChangingKnotSwapsSubscription(t *testing.T) {
452 s := newTestStore(t)
453 ctx := context.Background()
454
455 const ours = "tack.example"
456 const oldKnot = "old.example"
457 const newKnot = "new.example"
458
459 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", oldKnot, "a", ours, "", "t"); err != nil {
460 t.Fatal(err)
461 }
462
463 spindle := ours
464 rec := tangled.Repo{
465 Knot: newKnot,
466 Name: "a",
467 Spindle: &spindle,
468 CreatedAt: "2026-01-01T00:00:00Z",
469 }
470 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec)
471
472 fake := &fakeKnotConsumer{}
473 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil {
474 t.Fatalf("handle: %v", err)
475 }
476 if added := fake.Added(); len(added) != 1 || added[0] != newKnot {
477 t.Fatalf("AddKnot calls = %v, want [%s]", added, newKnot)
478 }
479 if removed := fake.Removed(); len(removed) != 1 || removed[0] != oldKnot {
480 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, oldKnot)
481 }
482}
483
484// TestRepoDeleteRemovesKnotWhenLast confirms deleting the last repo on
485// a knot we cared about triggers RemoveKnot.
486func TestRepoDeleteRemovesKnotWhenLast(t *testing.T) {
487 s := newTestStore(t)
488 ctx := context.Background()
489
490 const ours = "tack.example"
491 const knot = "knot.example"
492
493 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "a", ours, "", "t"); err != nil {
494 t.Fatal(err)
495 }
496 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk", nil)
497
498 fake := &fakeKnotConsumer{}
499 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil {
500 t.Fatalf("handle: %v", err)
501 }
502 if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot {
503 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot)
504 }
505}
506
507// TestRepoDeleteKeepsKnotIfShared ensures deleting one of multiple
508// repos on a knot does not unsubscribe — the survivors still want it.
509func TestRepoDeleteKeepsKnotIfShared(t *testing.T) {
510 s := newTestStore(t)
511 ctx := context.Background()
512
513 const ours = "tack.example"
514 const knot = "knot.example"
515
516 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil {
517 t.Fatal(err)
518 }
519 if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil {
520 t.Fatal(err)
521 }
522 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk1", nil)
523
524 fake := &fakeKnotConsumer{}
525 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil {
526 t.Fatalf("handle: %v", err)
527 }
528 if removed := fake.Removed(); len(removed) != 0 {
529 t.Fatalf("RemoveKnot calls = %v, want none", removed)
530 }
531}