Stitch any CI into Tangled
1package main
2
3// Tests for handleJetstreamEvent. We exercise the full path — collection
4// dispatch, record decoding, store mutation, and cursor advancement —
5// using an in-memory store from store_test.go's newTestStore helper.
6//
7// Events are constructed by hand rather than recorded from a live
8// jetstream so the tests don't need network access and stay fast.
9
10import (
11 "context"
12 "encoding/json"
13 "path/filepath"
14 "testing"
15
16 jsmodels "github.com/bluesky-social/jetstream/pkg/models"
17 "tangled.org/core/api/tangled"
18)
19
20// commitEvent builds a jetstream commit event for tests. timeUS is the
21// cursor value the handler should persist after applying. record can be
22// nil for delete operations, which carry no body.
23func commitEvent(timeUS int64, did, collection, op, rkey string, record any) *jsmodels.Event {
24 var raw json.RawMessage
25 if record != nil {
26 b, err := json.Marshal(record)
27 if err != nil {
28 panic(err) // test helper — callers control the input
29 }
30 raw = b
31 }
32 return &jsmodels.Event{
33 Did: did,
34 TimeUS: timeUS,
35 Kind: jsmodels.EventKindCommit,
36 Commit: &jsmodels.Commit{
37 Operation: op,
38 Collection: collection,
39 RKey: rkey,
40 Record: raw,
41 },
42 }
43}
44
45// requireCursor asserts the persisted cursor equals want. Pulled out
46// because almost every test in this file checks it.
47func requireCursor(t *testing.T, s *store, want int64) {
48 t.Helper()
49 got, err := s.LoadCursor(context.Background())
50 if err != nil {
51 t.Fatalf("load cursor: %v", err)
52 }
53 if got == nil || *got != want {
54 t.Fatalf("cursor = %v, want %d", got, want)
55 }
56}
57
58// TestHandleNonCommitEvent confirms account/identity events are ignored
59// without error and without advancing the cursor — they don't have a
60// TimeUS we want to commit to.
61func TestHandleNonCommitEvent(t *testing.T) {
62 s := newTestStore(t)
63 ctx := context.Background()
64
65 evt := &jsmodels.Event{
66 Did: "did:plc:foo",
67 TimeUS: 100,
68 Kind: jsmodels.EventKindAccount,
69 }
70 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil {
71 t.Fatalf("handle: %v", err)
72 }
73 got, err := s.LoadCursor(ctx)
74 if err != nil {
75 t.Fatalf("load cursor: %v", err)
76 }
77 if got != nil {
78 t.Fatalf("expected no cursor for non-commit, got %d", *got)
79 }
80}
81
82// TestHandleSpindleMemberCreate exercises the happy path: a create
83// commit lands a row in spindle_members and advances the cursor.
84func TestHandleSpindleMemberCreate(t *testing.T) {
85 s := newTestStore(t)
86 ctx := context.Background()
87
88 rec := tangled.SpindleMember{
89 Instance: "https://spindle.example",
90 Subject: "did:plc:alice",
91 CreatedAt: "2026-01-01T00:00:00Z",
92 }
93 evt := commitEvent(12345, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk1", rec)
94 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil {
95 t.Fatalf("handle: %v", err)
96 }
97
98 var instance, subject string
99 err := s.db.QueryRowContext(ctx,
100 `SELECT instance, subject FROM spindle_members WHERE did = ? AND rkey = ?`,
101 "did:plc:owner", "rk1",
102 ).Scan(&instance, &subject)
103 if err != nil {
104 t.Fatalf("query: %v", err)
105 }
106 if instance != "https://spindle.example" || subject != "did:plc:alice" {
107 t.Fatalf("got (%q,%q)", instance, subject)
108 }
109 requireCursor(t, s, 12345)
110}
111
112// TestHandleSpindleMemberDelete confirms a delete commit removes the
113// previously-persisted row.
114func TestHandleSpindleMemberDelete(t *testing.T) {
115 s := newTestStore(t)
116 ctx := context.Background()
117
118 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "rk1", "i", "s", "t"); err != nil {
119 t.Fatalf("seed: %v", err)
120 }
121 evt := commitEvent(99, "did:plc:owner", tangled.SpindleMemberNSID, jsOpDelete, "rk1", nil)
122 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil {
123 t.Fatalf("handle: %v", err)
124 }
125 if n := countRows(t, s, "spindle_members"); n != 0 {
126 t.Fatalf("after delete: %d rows, want 0", n)
127 }
128 requireCursor(t, s, 99)
129}
130
131// TestHandleRepoCreateOptionals verifies pointer-typed optional fields
132// on tangled.Repo are derefed correctly (and nils become empty strings)
133// when written to the store.
134func TestHandleRepoCreateOptionals(t *testing.T) {
135 s := newTestStore(t)
136 ctx := context.Background()
137
138 spindle := "https://spindle.example"
139 repoDid := "did:plc:repo"
140 rec := tangled.Repo{
141 Knot: "knot.example",
142 Name: "myrepo",
143 Spindle: &spindle,
144 RepoDid: &repoDid,
145 CreatedAt: "2026-01-01T00:00:00Z",
146 }
147 evt := commitEvent(7, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo1", rec)
148 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil {
149 t.Fatalf("handle: %v", err)
150 }
151
152 var gotSpindle, gotRepoDid string
153 err := s.db.QueryRowContext(ctx,
154 `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`,
155 "did:plc:owner", "repo1",
156 ).Scan(&gotSpindle, &gotRepoDid)
157 if err != nil {
158 t.Fatalf("query: %v", err)
159 }
160 if gotSpindle != spindle || gotRepoDid != repoDid {
161 t.Fatalf("got (%q,%q)", gotSpindle, gotRepoDid)
162 }
163
164 // Now a record with both optionals nil — should land as empty
165 // strings, not crash on a nil dereference in deref().
166 rec2 := tangled.Repo{
167 Knot: "knot.example",
168 Name: "other",
169 CreatedAt: "2026-01-01T00:00:00Z",
170 }
171 evt2 := commitEvent(8, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo2", rec2)
172 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt2); err != nil {
173 t.Fatalf("handle nil-optionals: %v", err)
174 }
175 err = s.db.QueryRowContext(ctx,
176 `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`,
177 "did:plc:owner", "repo2",
178 ).Scan(&gotSpindle, &gotRepoDid)
179 if err != nil {
180 t.Fatalf("query nil-optionals: %v", err)
181 }
182 if gotSpindle != "" || gotRepoDid != "" {
183 t.Fatalf("nil optionals: got (%q,%q), want both empty", gotSpindle, gotRepoDid)
184 }
185 requireCursor(t, s, 8)
186}
187
188// TestHandleRepoCollaboratorCreate covers the third dispatch arm so each
189// collection has at least one apply test.
190func TestHandleRepoCollaboratorCreate(t *testing.T) {
191 s := newTestStore(t)
192 ctx := context.Background()
193
194 repo := "myrepo"
195 rec := tangled.RepoCollaborator{
196 Repo: &repo,
197 Subject: "did:plc:carol",
198 CreatedAt: "2026-01-01T00:00:00Z",
199 }
200 evt := commitEvent(55, "did:plc:owner", tangled.RepoCollaboratorNSID, jsOpCreate, "c1", rec)
201 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil {
202 t.Fatalf("handle: %v", err)
203 }
204
205 var subject string
206 err := s.db.QueryRowContext(ctx,
207 `SELECT subject FROM repo_collaborators WHERE did = ? AND rkey = ?`,
208 "did:plc:owner", "c1",
209 ).Scan(&subject)
210 if err != nil {
211 t.Fatalf("query: %v", err)
212 }
213 if subject != "did:plc:carol" {
214 t.Fatalf("subject = %q", subject)
215 }
216 requireCursor(t, s, 55)
217}
218
219// TestHandleUnknownCollection makes sure a collection we didn't ask for
220// (jetstream filter changes, schema drift) is silently dropped — no
221// error, no row, but cursor still advances so we don't replay it.
222func TestHandleUnknownCollection(t *testing.T) {
223 s := newTestStore(t)
224 ctx := context.Background()
225
226 evt := commitEvent(42, "did:plc:owner", "app.bsky.feed.post", jsOpCreate, "rk", map[string]string{"text": "hi"})
227 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil {
228 t.Fatalf("handle: %v", err)
229 }
230 requireCursor(t, s, 42)
231}
232
233// TestHandleBadRecordAdvancesCursor is the failure-mode counterpart to
234// the happy paths: a malformed record body must be logged-and-skipped
235// (not returned as an error that pauses the scheduler) and the cursor
236// must still advance so we don't loop on the same bad event forever.
237func TestHandleBadRecordAdvancesCursor(t *testing.T) {
238 s := newTestStore(t)
239 ctx := context.Background()
240
241 evt := &jsmodels.Event{
242 Did: "did:plc:owner",
243 TimeUS: 1000,
244 Kind: jsmodels.EventKindCommit,
245 Commit: &jsmodels.Commit{
246 Operation: jsOpCreate,
247 Collection: tangled.SpindleMemberNSID,
248 RKey: "broken",
249 Record: json.RawMessage(`{not valid json`),
250 },
251 }
252 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil {
253 t.Fatalf("handle should swallow decode error, got: %v", err)
254 }
255 if n := countRows(t, s, "spindle_members"); n != 0 {
256 t.Fatalf("bad record should not have inserted; got %d rows", n)
257 }
258 requireCursor(t, s, 1000)
259}
260
261// TestHandleTransientStoreErrorDoesNotAdvanceCursor is the partner to
262// TestHandleBadRecordAdvancesCursor: the cursor must hold its previous
263// position when applyCommit fails for an *infrastructure* reason
264// (here: store closed mid-flight, simulating SQLite busy / shutdown
265// races). Saving the cursor through such a failure would permanently
266// skip a perfectly good record and silently lose membership state.
267func TestHandleTransientStoreErrorDoesNotAdvanceCursor(t *testing.T) {
268 // Build the store inline (not via newTestStore) so the cleanup
269 // tolerates the explicit Close we do below to provoke errors.
270 path := filepath.Join(t.TempDir(), "tack.db")
271 s, err := openStore(path)
272 if err != nil {
273 t.Fatalf("openStore: %v", err)
274 }
275 ctx := context.Background()
276
277 // Seed a baseline cursor; after the failed apply it must still be
278 // 500, not 1000.
279 if err := s.SaveCursor(ctx, 500); err != nil {
280 t.Fatalf("seed cursor: %v", err)
281 }
282
283 // Close the underlying DB. Subsequent store calls will fail with
284 // "sql: database is closed", which is a transient-style error from
285 // applyCommit's perspective: the record itself is well-formed.
286 if err := s.db.Close(); err != nil {
287 t.Fatalf("close db: %v", err)
288 }
289
290 rec := tangled.SpindleMember{
291 Instance: "https://spindle.example",
292 Subject: "did:plc:alice",
293 CreatedAt: "2026-01-01T00:00:00Z",
294 }
295 evt := commitEvent(1000, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk", rec)
296
297 // Expect a non-nil error: handler propagates transient failures.
298 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err == nil {
299 t.Fatalf("handle should return transient error, got nil")
300 }
301
302 // Re-open the same DB file to inspect the persisted cursor; the
303 // closed handle obviously can't answer queries.
304 s2, err := openStore(path)
305 if err != nil {
306 t.Fatalf("re-open: %v", err)
307 }
308 defer s2.Close()
309 got, err := s2.LoadCursor(ctx)
310 if err != nil {
311 t.Fatalf("load cursor: %v", err)
312 }
313 if got == nil || *got != 500 {
314 t.Fatalf("cursor = %v, want 500 (must NOT advance on transient error)", got)
315 }
316}
317
318// TestRepoEventSubscribesKnotForOurSpindle confirms that observing a
319// sh.tangled.repo whose .spindle field equals our hostname results in a
320// dynamic AddKnot call. This is the hot path for picking up new repos
321// without a tack restart.
322func TestRepoEventSubscribesKnotForOurSpindle(t *testing.T) {
323 s := newTestStore(t)
324 ctx := context.Background()
325
326 const ours = "tack.example"
327 spindle := ours
328 rec := tangled.Repo{
329 Knot: "knot.example",
330 Name: "myrepo",
331 Spindle: &spindle,
332 CreatedAt: "2026-01-01T00:00:00Z",
333 }
334 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec)
335
336 fake := &fakeKnotConsumer{}
337 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil {
338 t.Fatalf("handle: %v", err)
339 }
340 added := fake.Added()
341 if len(added) != 1 || added[0] != "knot.example" {
342 t.Fatalf("AddKnot calls = %v, want [knot.example]", added)
343 }
344}
345
346// TestRepoEventIgnoresUnauthorizedPublisher pins the high-severity
347// security gate: a sh.tangled.repo record naming us as its spindle
348// must NOT cause an outbound knot subscription unless its publisher
349// is the spindle owner or an owner-vouched member. Without this gate
350// any firehose publisher could force tack to dial an attacker-chosen
351// websocket simply by minting a matching repo record.
352func TestRepoEventIgnoresUnauthorizedPublisher(t *testing.T) {
353 s := newTestStore(t)
354 ctx := context.Background()
355
356 const ours = "tack.example"
357 const owner = "did:plc:owner"
358
359 // did:plc:rando is neither the owner nor a member.
360 spindle := ours
361 rec := tangled.Repo{
362 Knot: "evil.example",
363 Name: "myrepo",
364 Spindle: &spindle,
365 CreatedAt: "2026-01-01T00:00:00Z",
366 }
367 evt := commitEvent(1, "did:plc:rando", tangled.RepoNSID, jsOpCreate, "rk", rec)
368
369 fake := &fakeKnotConsumer{}
370 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, evt); err != nil {
371 t.Fatalf("handle: %v", err)
372 }
373 if added := fake.Added(); len(added) != 0 {
374 t.Fatalf("AddKnot calls = %v, want none (publisher is unauthorized)", added)
375 }
376}
377
378// TestSpindleMemberGrantSubscribesPendingKnots covers the case where
379// a member publishes their sh.tangled.repo *before* the owner's
380// sh.tangled.spindle.member grant arrives over the firehose. Until
381// the grant lands the publisher is unauthorized, so the repo doesn't
382// pull a subscription. Once the grant arrives, the same-rkey reconcile
383// must catch up by AddKnot-ing every knot that member's pending repos
384// already named.
385func TestSpindleMemberGrantSubscribesPendingKnots(t *testing.T) {
386 s := newTestStore(t)
387 ctx := context.Background()
388
389 const ours = "tack.example"
390 const owner = "did:plc:owner"
391 const alice = "did:plc:alice"
392
393 // Step 1: alice publishes a repo claiming us. She isn't a member
394 // yet, so no subscription should happen.
395 spindle := ours
396 repoRec := tangled.Repo{
397 Knot: "knot.example",
398 Name: "myrepo",
399 Spindle: &spindle,
400 CreatedAt: "2026-01-01T00:00:00Z",
401 }
402 repoEvt := commitEvent(1, alice, tangled.RepoNSID, jsOpCreate, "rk", repoRec)
403
404 fake := &fakeKnotConsumer{}
405 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, repoEvt); err != nil {
406 t.Fatalf("handle repo: %v", err)
407 }
408 if added := fake.Added(); len(added) != 0 {
409 t.Fatalf("AddKnot before grant = %v, want none", added)
410 }
411
412 // Step 2: owner publishes a membership grant naming alice. The
413 // reconcile triggered by that grant must subscribe to alice's
414 // already-stored repo's knot.
415 grant := tangled.SpindleMember{
416 Instance: ours,
417 Subject: alice,
418 CreatedAt: "2026-01-02T00:00:00Z",
419 }
420 grantEvt := commitEvent(2, owner, tangled.SpindleMemberNSID, jsOpCreate, "mk", grant)
421 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, grantEvt); err != nil {
422 t.Fatalf("handle grant: %v", err)
423 }
424 if added := fake.Added(); len(added) != 1 || added[0] != "knot.example" {
425 t.Fatalf("AddKnot after grant = %v, want [knot.example]", added)
426 }
427}
428
429// TestSpindleMemberRevokeUnsubscribesKnot verifies that revoking a
430// previously-granted membership tears down the only-held-by-that-member
431// knot subscription. Without this, an attacker who briefly held
432// membership could leave us dialing their chosen knot until restart,
433// the exact lingering-subscription concern called out in KNOWN_ISSUES.
434func TestSpindleMemberRevokeUnsubscribesKnot(t *testing.T) {
435 s := newTestStore(t)
436 ctx := context.Background()
437
438 const ours = "tack.example"
439 const owner = "did:plc:owner"
440 const alice = "did:plc:alice"
441
442 // Seed: owner has previously vouched for alice, and alice has
443 // already published a repo on knot.example pointing at us.
444 if err := s.UpsertSpindleMember(ctx, owner, "mk", ours, alice, "t"); err != nil {
445 t.Fatal(err)
446 }
447 if err := s.UpsertRepo(ctx, alice, "rk", "knot.example", "myrepo", ours, "", "t"); err != nil {
448 t.Fatal(err)
449 }
450
451 // Owner publishes a delete of the membership grant.
452 revoke := commitEvent(10, owner, tangled.SpindleMemberNSID, jsOpDelete, "mk", nil)
453
454 fake := &fakeKnotConsumer{}
455 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, revoke); err != nil {
456 t.Fatalf("handle revoke: %v", err)
457 }
458 if removed := fake.Removed(); len(removed) != 1 || removed[0] != "knot.example" {
459 t.Fatalf("RemoveKnot calls = %v, want [knot.example]", removed)
460 }
461}
462
463// TestSpindleMemberGrantFromNonOwnerIgnored confirms that a forged
464// membership record published by anyone other than the spindle owner
465// does NOT trigger a knot subscription, even if a matching repo for
466// the named subject exists. This mirrors AuthorizePipelineActor's
467// rule that the membership grant's publisher must equal the spindle
468// owner.
469func TestSpindleMemberGrantFromNonOwnerIgnored(t *testing.T) {
470 s := newTestStore(t)
471 ctx := context.Background()
472
473 const ours = "tack.example"
474 const owner = "did:plc:owner"
475 const alice = "did:plc:alice"
476
477 if err := s.UpsertRepo(ctx, alice, "rk", "knot.example", "myrepo", ours, "", "t"); err != nil {
478 t.Fatal(err)
479 }
480
481 // Forged grant: alice "vouches" for herself. Stored, but must
482 // not flip her into authorized status.
483 forgery := tangled.SpindleMember{
484 Instance: ours,
485 Subject: alice,
486 CreatedAt: "2026-01-02T00:00:00Z",
487 }
488 evt := commitEvent(2, alice, tangled.SpindleMemberNSID, jsOpCreate, "mk", forgery)
489
490 fake := &fakeKnotConsumer{}
491 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, evt); err != nil {
492 t.Fatalf("handle forged grant: %v", err)
493 }
494 if added := fake.Added(); len(added) != 0 {
495 t.Fatalf("AddKnot calls = %v, want none (grant publisher != owner)", added)
496 }
497}
498
499// TestRepoEventIgnoresKnotForOtherSpindle confirms repos pointing at a
500// *different* spindle do not pull us into watching their knot. Without
501// this guard, tack would dial every knot named in any sh.tangled.repo
502// it sees over the firehose, which is most of them.
503func TestRepoEventIgnoresKnotForOtherSpindle(t *testing.T) {
504 s := newTestStore(t)
505 ctx := context.Background()
506
507 other := "other-spindle.example"
508 rec := tangled.Repo{
509 Knot: "knot.example",
510 Name: "myrepo",
511 Spindle: &other,
512 CreatedAt: "2026-01-01T00:00:00Z",
513 }
514 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec)
515
516 fake := &fakeKnotConsumer{}
517 if err := handleJetstreamEvent(ctx, s, fake, "tack.example", "did:plc:owner", evt); err != nil {
518 t.Fatalf("handle: %v", err)
519 }
520 if added := fake.Added(); len(added) != 0 {
521 t.Fatalf("AddKnot calls = %v, want none", added)
522 }
523}
524
525// TestRepoUpdateSpindleAwayFromUsRemovesKnot covers the case where a
526// repo we'd previously been watching gets its .spindle field flipped to
527// some other spindle. Once that's the only repo we had on that knot,
528// the reconciliation must call RemoveKnot.
529func TestRepoUpdateSpindleAwayFromUsRemovesKnot(t *testing.T) {
530 s := newTestStore(t)
531 ctx := context.Background()
532
533 const ours = "tack.example"
534 const knot = "knot.example"
535
536 // Seed: a repo that names us as its spindle on `knot`.
537 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "repo-a", ours, "", "t"); err != nil {
538 t.Fatal(err)
539 }
540
541 // Update: same record, now points at a different spindle.
542 other := "other.example"
543 rec := tangled.Repo{
544 Knot: knot,
545 Name: "repo-a",
546 Spindle: &other,
547 CreatedAt: "2026-01-01T00:00:00Z",
548 }
549 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec)
550
551 fake := &fakeKnotConsumer{}
552 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil {
553 t.Fatalf("handle: %v", err)
554 }
555 if added := fake.Added(); len(added) != 0 {
556 t.Fatalf("AddKnot calls = %v, want none", added)
557 }
558 if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot {
559 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot)
560 }
561}
562
563// TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared ensures we don't
564// over-eagerly unsubscribe from a knot when one of multiple repos on
565// that knot moves away from us. The other repo's subscription must keep
566// the knot in our wanted set.
567func TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared(t *testing.T) {
568 s := newTestStore(t)
569 ctx := context.Background()
570
571 const ours = "tack.example"
572 const knot = "knot.example"
573
574 // Both publishers must be vouched for; otherwise IsKnotWanted's
575 // membership filter would zero them out and we'd unsubscribe
576 // regardless. The reconciliation we want to test only matters
577 // when there is a real authorized hold to keep.
578 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk1", ours, "did:plc:a", "t"); err != nil {
579 t.Fatal(err)
580 }
581 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk2", ours, "did:plc:b", "t"); err != nil {
582 t.Fatal(err)
583 }
584
585 // Two repos sharing one knot, both pointed at us.
586 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil {
587 t.Fatal(err)
588 }
589 if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil {
590 t.Fatal(err)
591 }
592
593 // Repo A flips to a different spindle. B still wants us.
594 other := "other.example"
595 rec := tangled.Repo{
596 Knot: knot,
597 Name: "a",
598 Spindle: &other,
599 CreatedAt: "2026-01-01T00:00:00Z",
600 }
601 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk1", rec)
602
603 fake := &fakeKnotConsumer{}
604 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil {
605 t.Fatalf("handle: %v", err)
606 }
607 if removed := fake.Removed(); len(removed) != 0 {
608 t.Fatalf("RemoveKnot calls = %v, want none (B still wants us on %s)", removed, knot)
609 }
610}
611
612// TestRepoUpdateChangingKnotSwapsSubscription verifies that a repo
613// staying with us but changing its .knot field unsubscribes the old
614// knot (if no other repo holds it) and subscribes the new one.
615func TestRepoUpdateChangingKnotSwapsSubscription(t *testing.T) {
616 s := newTestStore(t)
617 ctx := context.Background()
618
619 const ours = "tack.example"
620 const oldKnot = "old.example"
621 const newKnot = "new.example"
622
623 // Vouch for did:plc:a so reconcileKnot will actually call
624 // AddKnot for the new host. Without the grant the publisher
625 // is unauthorized and the AddKnot half is (correctly) skipped.
626 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk1", ours, "did:plc:a", "t"); err != nil {
627 t.Fatal(err)
628 }
629
630 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", oldKnot, "a", ours, "", "t"); err != nil {
631 t.Fatal(err)
632 }
633
634 spindle := ours
635 rec := tangled.Repo{
636 Knot: newKnot,
637 Name: "a",
638 Spindle: &spindle,
639 CreatedAt: "2026-01-01T00:00:00Z",
640 }
641 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec)
642
643 fake := &fakeKnotConsumer{}
644 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil {
645 t.Fatalf("handle: %v", err)
646 }
647 if added := fake.Added(); len(added) != 1 || added[0] != newKnot {
648 t.Fatalf("AddKnot calls = %v, want [%s]", added, newKnot)
649 }
650 if removed := fake.Removed(); len(removed) != 1 || removed[0] != oldKnot {
651 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, oldKnot)
652 }
653}
654
655// TestRepoDeleteRemovesKnotWhenLast confirms deleting the last repo on
656// a knot we cared about triggers RemoveKnot.
657func TestRepoDeleteRemovesKnotWhenLast(t *testing.T) {
658 s := newTestStore(t)
659 ctx := context.Background()
660
661 const ours = "tack.example"
662 const knot = "knot.example"
663
664 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "a", ours, "", "t"); err != nil {
665 t.Fatal(err)
666 }
667 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk", nil)
668
669 fake := &fakeKnotConsumer{}
670 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil {
671 t.Fatalf("handle: %v", err)
672 }
673 if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot {
674 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot)
675 }
676}
677
678// TestRepoDeleteKeepsKnotIfShared ensures deleting one of multiple
679// repos on a knot does not unsubscribe — the survivors still want it.
680func TestRepoDeleteKeepsKnotIfShared(t *testing.T) {
681 s := newTestStore(t)
682 ctx := context.Background()
683
684 const ours = "tack.example"
685 const knot = "knot.example"
686
687 // Both publishers vouched for; otherwise IsKnotWanted would
688 // (correctly) report neither holds the knot and we'd unsubscribe.
689 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk1", ours, "did:plc:a", "t"); err != nil {
690 t.Fatal(err)
691 }
692 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk2", ours, "did:plc:b", "t"); err != nil {
693 t.Fatal(err)
694 }
695
696 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil {
697 t.Fatal(err)
698 }
699 if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil {
700 t.Fatal(err)
701 }
702 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk1", nil)
703
704 fake := &fakeKnotConsumer{}
705 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil {
706 t.Fatalf("handle: %v", err)
707 }
708 if removed := fake.Removed(); len(removed) != 0 {
709 t.Fatalf("RemoveKnot calls = %v, want none", removed)
710 }
711}