Stitch any CI into Tangled
3

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// Tests for handleJetstreamEvent. We exercise the full path — collection 4// dispatch, record decoding, store mutation, and cursor advancement — 5// using an in-memory store from store_test.go's newTestStore helper. 6// 7// Events are constructed by hand rather than recorded from a live 8// jetstream so the tests don't need network access and stay fast. 9 10import ( 11 "context" 12 "encoding/json" 13 "path/filepath" 14 "testing" 15 16 jsmodels "github.com/bluesky-social/jetstream/pkg/models" 17 "tangled.org/core/api/tangled" 18) 19 20// commitEvent builds a jetstream commit event for tests. timeUS is the 21// cursor value the handler should persist after applying. record can be 22// nil for delete operations, which carry no body. 23func commitEvent(timeUS int64, did, collection, op, rkey string, record any) *jsmodels.Event { 24 var raw json.RawMessage 25 if record != nil { 26 b, err := json.Marshal(record) 27 if err != nil { 28 panic(err) // test helper — callers control the input 29 } 30 raw = b 31 } 32 return &jsmodels.Event{ 33 Did: did, 34 TimeUS: timeUS, 35 Kind: jsmodels.EventKindCommit, 36 Commit: &jsmodels.Commit{ 37 Operation: op, 38 Collection: collection, 39 RKey: rkey, 40 Record: raw, 41 }, 42 } 43} 44 45// requireCursor asserts the persisted cursor equals want. Pulled out 46// because almost every test in this file checks it. 47func requireCursor(t *testing.T, s *store, want int64) { 48 t.Helper() 49 got, err := s.LoadCursor(context.Background()) 50 if err != nil { 51 t.Fatalf("load cursor: %v", err) 52 } 53 if got == nil || *got != want { 54 t.Fatalf("cursor = %v, want %d", got, want) 55 } 56} 57 58// TestHandleNonCommitEvent confirms account/identity events are ignored 59// without error and without advancing the cursor — they don't have a 60// TimeUS we want to commit to. 61func TestHandleNonCommitEvent(t *testing.T) { 62 s := newTestStore(t) 63 ctx := context.Background() 64 65 evt := &jsmodels.Event{ 66 Did: "did:plc:foo", 67 TimeUS: 100, 68 Kind: jsmodels.EventKindAccount, 69 } 70 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 71 t.Fatalf("handle: %v", err) 72 } 73 got, err := s.LoadCursor(ctx) 74 if err != nil { 75 t.Fatalf("load cursor: %v", err) 76 } 77 if got != nil { 78 t.Fatalf("expected no cursor for non-commit, got %d", *got) 79 } 80} 81 82// TestHandleSpindleMemberCreate exercises the happy path: a create 83// commit lands a row in spindle_members and advances the cursor. 84func TestHandleSpindleMemberCreate(t *testing.T) { 85 s := newTestStore(t) 86 ctx := context.Background() 87 88 rec := tangled.SpindleMember{ 89 Instance: "https://spindle.example", 90 Subject: "did:plc:alice", 91 CreatedAt: "2026-01-01T00:00:00Z", 92 } 93 evt := commitEvent(12345, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk1", rec) 94 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 95 t.Fatalf("handle: %v", err) 96 } 97 98 var instance, subject string 99 err := s.db.QueryRowContext(ctx, 100 `SELECT instance, subject FROM spindle_members WHERE did = ? AND rkey = ?`, 101 "did:plc:owner", "rk1", 102 ).Scan(&instance, &subject) 103 if err != nil { 104 t.Fatalf("query: %v", err) 105 } 106 if instance != "https://spindle.example" || subject != "did:plc:alice" { 107 t.Fatalf("got (%q,%q)", instance, subject) 108 } 109 requireCursor(t, s, 12345) 110} 111 112// TestHandleSpindleMemberDelete confirms a delete commit removes the 113// previously-persisted row. 114func TestHandleSpindleMemberDelete(t *testing.T) { 115 s := newTestStore(t) 116 ctx := context.Background() 117 118 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "rk1", "i", "s", "t"); err != nil { 119 t.Fatalf("seed: %v", err) 120 } 121 evt := commitEvent(99, "did:plc:owner", tangled.SpindleMemberNSID, jsOpDelete, "rk1", nil) 122 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 123 t.Fatalf("handle: %v", err) 124 } 125 if n := countRows(t, s, "spindle_members"); n != 0 { 126 t.Fatalf("after delete: %d rows, want 0", n) 127 } 128 requireCursor(t, s, 99) 129} 130 131// TestHandleRepoCreateOptionals verifies pointer-typed optional fields 132// on tangled.Repo are derefed correctly (and nils become empty strings) 133// when written to the store. 134func TestHandleRepoCreateOptionals(t *testing.T) { 135 s := newTestStore(t) 136 ctx := context.Background() 137 138 spindle := "https://spindle.example" 139 repoDid := "did:plc:repo" 140 rec := tangled.Repo{ 141 Knot: "knot.example", 142 Name: "myrepo", 143 Spindle: &spindle, 144 RepoDid: &repoDid, 145 CreatedAt: "2026-01-01T00:00:00Z", 146 } 147 evt := commitEvent(7, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo1", rec) 148 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 149 t.Fatalf("handle: %v", err) 150 } 151 152 var gotSpindle, gotRepoDid string 153 err := s.db.QueryRowContext(ctx, 154 `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`, 155 "did:plc:owner", "repo1", 156 ).Scan(&gotSpindle, &gotRepoDid) 157 if err != nil { 158 t.Fatalf("query: %v", err) 159 } 160 if gotSpindle != spindle || gotRepoDid != repoDid { 161 t.Fatalf("got (%q,%q)", gotSpindle, gotRepoDid) 162 } 163 164 // Now a record with both optionals nil — should land as empty 165 // strings, not crash on a nil dereference in deref(). 166 rec2 := tangled.Repo{ 167 Knot: "knot.example", 168 Name: "other", 169 CreatedAt: "2026-01-01T00:00:00Z", 170 } 171 evt2 := commitEvent(8, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo2", rec2) 172 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt2); err != nil { 173 t.Fatalf("handle nil-optionals: %v", err) 174 } 175 err = s.db.QueryRowContext(ctx, 176 `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`, 177 "did:plc:owner", "repo2", 178 ).Scan(&gotSpindle, &gotRepoDid) 179 if err != nil { 180 t.Fatalf("query nil-optionals: %v", err) 181 } 182 if gotSpindle != "" || gotRepoDid != "" { 183 t.Fatalf("nil optionals: got (%q,%q), want both empty", gotSpindle, gotRepoDid) 184 } 185 requireCursor(t, s, 8) 186} 187 188// TestHandleRepoCollaboratorCreate covers the third dispatch arm so each 189// collection has at least one apply test. 190func TestHandleRepoCollaboratorCreate(t *testing.T) { 191 s := newTestStore(t) 192 ctx := context.Background() 193 194 repo := "myrepo" 195 rec := tangled.RepoCollaborator{ 196 Repo: &repo, 197 Subject: "did:plc:carol", 198 CreatedAt: "2026-01-01T00:00:00Z", 199 } 200 evt := commitEvent(55, "did:plc:owner", tangled.RepoCollaboratorNSID, jsOpCreate, "c1", rec) 201 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 202 t.Fatalf("handle: %v", err) 203 } 204 205 var subject string 206 err := s.db.QueryRowContext(ctx, 207 `SELECT subject FROM repo_collaborators WHERE did = ? AND rkey = ?`, 208 "did:plc:owner", "c1", 209 ).Scan(&subject) 210 if err != nil { 211 t.Fatalf("query: %v", err) 212 } 213 if subject != "did:plc:carol" { 214 t.Fatalf("subject = %q", subject) 215 } 216 requireCursor(t, s, 55) 217} 218 219// TestHandleUnknownCollection makes sure a collection we didn't ask for 220// (jetstream filter changes, schema drift) is silently dropped — no 221// error, no row, but cursor still advances so we don't replay it. 222func TestHandleUnknownCollection(t *testing.T) { 223 s := newTestStore(t) 224 ctx := context.Background() 225 226 evt := commitEvent(42, "did:plc:owner", "app.bsky.feed.post", jsOpCreate, "rk", map[string]string{"text": "hi"}) 227 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 228 t.Fatalf("handle: %v", err) 229 } 230 requireCursor(t, s, 42) 231} 232 233// TestHandleBadRecordAdvancesCursor is the failure-mode counterpart to 234// the happy paths: a malformed record body must be logged-and-skipped 235// (not returned as an error that pauses the scheduler) and the cursor 236// must still advance so we don't loop on the same bad event forever. 237func TestHandleBadRecordAdvancesCursor(t *testing.T) { 238 s := newTestStore(t) 239 ctx := context.Background() 240 241 evt := &jsmodels.Event{ 242 Did: "did:plc:owner", 243 TimeUS: 1000, 244 Kind: jsmodels.EventKindCommit, 245 Commit: &jsmodels.Commit{ 246 Operation: jsOpCreate, 247 Collection: tangled.SpindleMemberNSID, 248 RKey: "broken", 249 Record: json.RawMessage(`{not valid json`), 250 }, 251 } 252 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 253 t.Fatalf("handle should swallow decode error, got: %v", err) 254 } 255 if n := countRows(t, s, "spindle_members"); n != 0 { 256 t.Fatalf("bad record should not have inserted; got %d rows", n) 257 } 258 requireCursor(t, s, 1000) 259} 260 261// TestHandleTransientStoreErrorDoesNotAdvanceCursor is the partner to 262// TestHandleBadRecordAdvancesCursor: the cursor must hold its previous 263// position when applyCommit fails for an *infrastructure* reason 264// (here: store closed mid-flight, simulating SQLite busy / shutdown 265// races). Saving the cursor through such a failure would permanently 266// skip a perfectly good record and silently lose membership state. 267func TestHandleTransientStoreErrorDoesNotAdvanceCursor(t *testing.T) { 268 // Build the store inline (not via newTestStore) so the cleanup 269 // tolerates the explicit Close we do below to provoke errors. 270 path := filepath.Join(t.TempDir(), "tack.db") 271 s, err := openStore(path) 272 if err != nil { 273 t.Fatalf("openStore: %v", err) 274 } 275 ctx := context.Background() 276 277 // Seed a baseline cursor; after the failed apply it must still be 278 // 500, not 1000. 279 if err := s.SaveCursor(ctx, 500); err != nil { 280 t.Fatalf("seed cursor: %v", err) 281 } 282 283 // Close the underlying DB. Subsequent store calls will fail with 284 // "sql: database is closed", which is a transient-style error from 285 // applyCommit's perspective: the record itself is well-formed. 286 if err := s.db.Close(); err != nil { 287 t.Fatalf("close db: %v", err) 288 } 289 290 rec := tangled.SpindleMember{ 291 Instance: "https://spindle.example", 292 Subject: "did:plc:alice", 293 CreatedAt: "2026-01-01T00:00:00Z", 294 } 295 evt := commitEvent(1000, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk", rec) 296 297 // Expect a non-nil error: handler propagates transient failures. 298 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err == nil { 299 t.Fatalf("handle should return transient error, got nil") 300 } 301 302 // Re-open the same DB file to inspect the persisted cursor; the 303 // closed handle obviously can't answer queries. 304 s2, err := openStore(path) 305 if err != nil { 306 t.Fatalf("re-open: %v", err) 307 } 308 defer s2.Close() 309 got, err := s2.LoadCursor(ctx) 310 if err != nil { 311 t.Fatalf("load cursor: %v", err) 312 } 313 if got == nil || *got != 500 { 314 t.Fatalf("cursor = %v, want 500 (must NOT advance on transient error)", got) 315 } 316} 317 318// TestRepoEventSubscribesKnotForOurSpindle confirms that observing a 319// sh.tangled.repo whose .spindle field equals our hostname results in a 320// dynamic AddKnot call. This is the hot path for picking up new repos 321// without a tack restart. 322func TestRepoEventSubscribesKnotForOurSpindle(t *testing.T) { 323 s := newTestStore(t) 324 ctx := context.Background() 325 326 const ours = "tack.example" 327 spindle := ours 328 rec := tangled.Repo{ 329 Knot: "knot.example", 330 Name: "myrepo", 331 Spindle: &spindle, 332 CreatedAt: "2026-01-01T00:00:00Z", 333 } 334 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec) 335 336 fake := &fakeKnotConsumer{} 337 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 338 t.Fatalf("handle: %v", err) 339 } 340 added := fake.Added() 341 if len(added) != 1 || added[0] != "knot.example" { 342 t.Fatalf("AddKnot calls = %v, want [knot.example]", added) 343 } 344} 345 346// TestRepoEventIgnoresUnauthorizedPublisher pins the high-severity 347// security gate: a sh.tangled.repo record naming us as its spindle 348// must NOT cause an outbound knot subscription unless its publisher 349// is the spindle owner or an owner-vouched member. Without this gate 350// any firehose publisher could force tack to dial an attacker-chosen 351// websocket simply by minting a matching repo record. 352func TestRepoEventIgnoresUnauthorizedPublisher(t *testing.T) { 353 s := newTestStore(t) 354 ctx := context.Background() 355 356 const ours = "tack.example" 357 const owner = "did:plc:owner" 358 359 // did:plc:rando is neither the owner nor a member. 360 spindle := ours 361 rec := tangled.Repo{ 362 Knot: "evil.example", 363 Name: "myrepo", 364 Spindle: &spindle, 365 CreatedAt: "2026-01-01T00:00:00Z", 366 } 367 evt := commitEvent(1, "did:plc:rando", tangled.RepoNSID, jsOpCreate, "rk", rec) 368 369 fake := &fakeKnotConsumer{} 370 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, evt); err != nil { 371 t.Fatalf("handle: %v", err) 372 } 373 if added := fake.Added(); len(added) != 0 { 374 t.Fatalf("AddKnot calls = %v, want none (publisher is unauthorized)", added) 375 } 376} 377 378// TestSpindleMemberGrantSubscribesPendingKnots covers the case where 379// a member publishes their sh.tangled.repo *before* the owner's 380// sh.tangled.spindle.member grant arrives over the firehose. Until 381// the grant lands the publisher is unauthorized, so the repo doesn't 382// pull a subscription. Once the grant arrives, the same-rkey reconcile 383// must catch up by AddKnot-ing every knot that member's pending repos 384// already named. 385func TestSpindleMemberGrantSubscribesPendingKnots(t *testing.T) { 386 s := newTestStore(t) 387 ctx := context.Background() 388 389 const ours = "tack.example" 390 const owner = "did:plc:owner" 391 const alice = "did:plc:alice" 392 393 // Step 1: alice publishes a repo claiming us. She isn't a member 394 // yet, so no subscription should happen. 395 spindle := ours 396 repoRec := tangled.Repo{ 397 Knot: "knot.example", 398 Name: "myrepo", 399 Spindle: &spindle, 400 CreatedAt: "2026-01-01T00:00:00Z", 401 } 402 repoEvt := commitEvent(1, alice, tangled.RepoNSID, jsOpCreate, "rk", repoRec) 403 404 fake := &fakeKnotConsumer{} 405 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, repoEvt); err != nil { 406 t.Fatalf("handle repo: %v", err) 407 } 408 if added := fake.Added(); len(added) != 0 { 409 t.Fatalf("AddKnot before grant = %v, want none", added) 410 } 411 412 // Step 2: owner publishes a membership grant naming alice. The 413 // reconcile triggered by that grant must subscribe to alice's 414 // already-stored repo's knot. 415 grant := tangled.SpindleMember{ 416 Instance: ours, 417 Subject: alice, 418 CreatedAt: "2026-01-02T00:00:00Z", 419 } 420 grantEvt := commitEvent(2, owner, tangled.SpindleMemberNSID, jsOpCreate, "mk", grant) 421 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, grantEvt); err != nil { 422 t.Fatalf("handle grant: %v", err) 423 } 424 if added := fake.Added(); len(added) != 1 || added[0] != "knot.example" { 425 t.Fatalf("AddKnot after grant = %v, want [knot.example]", added) 426 } 427} 428 429// TestSpindleMemberRevokeUnsubscribesKnot verifies that revoking a 430// previously-granted membership tears down the only-held-by-that-member 431// knot subscription. Without this, an attacker who briefly held 432// membership could leave us dialing their chosen knot until restart, 433// the exact lingering-subscription concern called out in KNOWN_ISSUES. 434func TestSpindleMemberRevokeUnsubscribesKnot(t *testing.T) { 435 s := newTestStore(t) 436 ctx := context.Background() 437 438 const ours = "tack.example" 439 const owner = "did:plc:owner" 440 const alice = "did:plc:alice" 441 442 // Seed: owner has previously vouched for alice, and alice has 443 // already published a repo on knot.example pointing at us. 444 if err := s.UpsertSpindleMember(ctx, owner, "mk", ours, alice, "t"); err != nil { 445 t.Fatal(err) 446 } 447 if err := s.UpsertRepo(ctx, alice, "rk", "knot.example", "myrepo", ours, "", "t"); err != nil { 448 t.Fatal(err) 449 } 450 451 // Owner publishes a delete of the membership grant. 452 revoke := commitEvent(10, owner, tangled.SpindleMemberNSID, jsOpDelete, "mk", nil) 453 454 fake := &fakeKnotConsumer{} 455 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, revoke); err != nil { 456 t.Fatalf("handle revoke: %v", err) 457 } 458 if removed := fake.Removed(); len(removed) != 1 || removed[0] != "knot.example" { 459 t.Fatalf("RemoveKnot calls = %v, want [knot.example]", removed) 460 } 461} 462 463// TestSpindleMemberGrantFromNonOwnerIgnored confirms that a forged 464// membership record published by anyone other than the spindle owner 465// does NOT trigger a knot subscription, even if a matching repo for 466// the named subject exists. This mirrors AuthorizePipelineActor's 467// rule that the membership grant's publisher must equal the spindle 468// owner. 469func TestSpindleMemberGrantFromNonOwnerIgnored(t *testing.T) { 470 s := newTestStore(t) 471 ctx := context.Background() 472 473 const ours = "tack.example" 474 const owner = "did:plc:owner" 475 const alice = "did:plc:alice" 476 477 if err := s.UpsertRepo(ctx, alice, "rk", "knot.example", "myrepo", ours, "", "t"); err != nil { 478 t.Fatal(err) 479 } 480 481 // Forged grant: alice "vouches" for herself. Stored, but must 482 // not flip her into authorized status. 483 forgery := tangled.SpindleMember{ 484 Instance: ours, 485 Subject: alice, 486 CreatedAt: "2026-01-02T00:00:00Z", 487 } 488 evt := commitEvent(2, alice, tangled.SpindleMemberNSID, jsOpCreate, "mk", forgery) 489 490 fake := &fakeKnotConsumer{} 491 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, evt); err != nil { 492 t.Fatalf("handle forged grant: %v", err) 493 } 494 if added := fake.Added(); len(added) != 0 { 495 t.Fatalf("AddKnot calls = %v, want none (grant publisher != owner)", added) 496 } 497} 498 499// TestRepoEventIgnoresKnotForOtherSpindle confirms repos pointing at a 500// *different* spindle do not pull us into watching their knot. Without 501// this guard, tack would dial every knot named in any sh.tangled.repo 502// it sees over the firehose, which is most of them. 503func TestRepoEventIgnoresKnotForOtherSpindle(t *testing.T) { 504 s := newTestStore(t) 505 ctx := context.Background() 506 507 other := "other-spindle.example" 508 rec := tangled.Repo{ 509 Knot: "knot.example", 510 Name: "myrepo", 511 Spindle: &other, 512 CreatedAt: "2026-01-01T00:00:00Z", 513 } 514 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec) 515 516 fake := &fakeKnotConsumer{} 517 if err := handleJetstreamEvent(ctx, s, fake, "tack.example", "did:plc:owner", evt); err != nil { 518 t.Fatalf("handle: %v", err) 519 } 520 if added := fake.Added(); len(added) != 0 { 521 t.Fatalf("AddKnot calls = %v, want none", added) 522 } 523} 524 525// TestRepoUpdateSpindleAwayFromUsRemovesKnot covers the case where a 526// repo we'd previously been watching gets its .spindle field flipped to 527// some other spindle. Once that's the only repo we had on that knot, 528// the reconciliation must call RemoveKnot. 529func TestRepoUpdateSpindleAwayFromUsRemovesKnot(t *testing.T) { 530 s := newTestStore(t) 531 ctx := context.Background() 532 533 const ours = "tack.example" 534 const knot = "knot.example" 535 536 // Seed: a repo that names us as its spindle on `knot`. 537 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "repo-a", ours, "", "t"); err != nil { 538 t.Fatal(err) 539 } 540 541 // Update: same record, now points at a different spindle. 542 other := "other.example" 543 rec := tangled.Repo{ 544 Knot: knot, 545 Name: "repo-a", 546 Spindle: &other, 547 CreatedAt: "2026-01-01T00:00:00Z", 548 } 549 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec) 550 551 fake := &fakeKnotConsumer{} 552 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 553 t.Fatalf("handle: %v", err) 554 } 555 if added := fake.Added(); len(added) != 0 { 556 t.Fatalf("AddKnot calls = %v, want none", added) 557 } 558 if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot { 559 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot) 560 } 561} 562 563// TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared ensures we don't 564// over-eagerly unsubscribe from a knot when one of multiple repos on 565// that knot moves away from us. The other repo's subscription must keep 566// the knot in our wanted set. 567func TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared(t *testing.T) { 568 s := newTestStore(t) 569 ctx := context.Background() 570 571 const ours = "tack.example" 572 const knot = "knot.example" 573 574 // Both publishers must be vouched for; otherwise IsKnotWanted's 575 // membership filter would zero them out and we'd unsubscribe 576 // regardless. The reconciliation we want to test only matters 577 // when there is a real authorized hold to keep. 578 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk1", ours, "did:plc:a", "t"); err != nil { 579 t.Fatal(err) 580 } 581 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk2", ours, "did:plc:b", "t"); err != nil { 582 t.Fatal(err) 583 } 584 585 // Two repos sharing one knot, both pointed at us. 586 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil { 587 t.Fatal(err) 588 } 589 if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil { 590 t.Fatal(err) 591 } 592 593 // Repo A flips to a different spindle. B still wants us. 594 other := "other.example" 595 rec := tangled.Repo{ 596 Knot: knot, 597 Name: "a", 598 Spindle: &other, 599 CreatedAt: "2026-01-01T00:00:00Z", 600 } 601 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk1", rec) 602 603 fake := &fakeKnotConsumer{} 604 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 605 t.Fatalf("handle: %v", err) 606 } 607 if removed := fake.Removed(); len(removed) != 0 { 608 t.Fatalf("RemoveKnot calls = %v, want none (B still wants us on %s)", removed, knot) 609 } 610} 611 612// TestRepoUpdateChangingKnotSwapsSubscription verifies that a repo 613// staying with us but changing its .knot field unsubscribes the old 614// knot (if no other repo holds it) and subscribes the new one. 615func TestRepoUpdateChangingKnotSwapsSubscription(t *testing.T) { 616 s := newTestStore(t) 617 ctx := context.Background() 618 619 const ours = "tack.example" 620 const oldKnot = "old.example" 621 const newKnot = "new.example" 622 623 // Vouch for did:plc:a so reconcileKnot will actually call 624 // AddKnot for the new host. Without the grant the publisher 625 // is unauthorized and the AddKnot half is (correctly) skipped. 626 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk1", ours, "did:plc:a", "t"); err != nil { 627 t.Fatal(err) 628 } 629 630 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", oldKnot, "a", ours, "", "t"); err != nil { 631 t.Fatal(err) 632 } 633 634 spindle := ours 635 rec := tangled.Repo{ 636 Knot: newKnot, 637 Name: "a", 638 Spindle: &spindle, 639 CreatedAt: "2026-01-01T00:00:00Z", 640 } 641 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec) 642 643 fake := &fakeKnotConsumer{} 644 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 645 t.Fatalf("handle: %v", err) 646 } 647 if added := fake.Added(); len(added) != 1 || added[0] != newKnot { 648 t.Fatalf("AddKnot calls = %v, want [%s]", added, newKnot) 649 } 650 if removed := fake.Removed(); len(removed) != 1 || removed[0] != oldKnot { 651 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, oldKnot) 652 } 653} 654 655// TestRepoDeleteRemovesKnotWhenLast confirms deleting the last repo on 656// a knot we cared about triggers RemoveKnot. 657func TestRepoDeleteRemovesKnotWhenLast(t *testing.T) { 658 s := newTestStore(t) 659 ctx := context.Background() 660 661 const ours = "tack.example" 662 const knot = "knot.example" 663 664 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "a", ours, "", "t"); err != nil { 665 t.Fatal(err) 666 } 667 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk", nil) 668 669 fake := &fakeKnotConsumer{} 670 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 671 t.Fatalf("handle: %v", err) 672 } 673 if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot { 674 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot) 675 } 676} 677 678// TestRepoDeleteKeepsKnotIfShared ensures deleting one of multiple 679// repos on a knot does not unsubscribe — the survivors still want it. 680func TestRepoDeleteKeepsKnotIfShared(t *testing.T) { 681 s := newTestStore(t) 682 ctx := context.Background() 683 684 const ours = "tack.example" 685 const knot = "knot.example" 686 687 // Both publishers vouched for; otherwise IsKnotWanted would 688 // (correctly) report neither holds the knot and we'd unsubscribe. 689 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk1", ours, "did:plc:a", "t"); err != nil { 690 t.Fatal(err) 691 } 692 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk2", ours, "did:plc:b", "t"); err != nil { 693 t.Fatal(err) 694 } 695 696 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil { 697 t.Fatal(err) 698 } 699 if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil { 700 t.Fatal(err) 701 } 702 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk1", nil) 703 704 fake := &fakeKnotConsumer{} 705 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 706 t.Fatalf("handle: %v", err) 707 } 708 if removed := fake.Removed(); len(removed) != 0 { 709 t.Fatalf("RemoveKnot calls = %v, want none", removed) 710 } 711}