Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

at main 25 kB View raw
1package main 2 3// Tests for handleJetstreamEvent. We exercise the full path — collection 4// dispatch, record decoding, store mutation, and cursor advancement — 5// using an in-memory store from store_test.go's newTestStore helper. 6// 7// Events are constructed by hand rather than recorded from a live 8// jetstream so the tests don't need network access and stay fast. 9 10import ( 11 "context" 12 "encoding/json" 13 "path/filepath" 14 "testing" 15 16 jsmodels "github.com/bluesky-social/jetstream/pkg/models" 17 js "go.mitchellh.com/tack/internal/jetstream" 18 "tangled.org/core/api/tangled" 19) 20 21// commitEvent builds a jetstream commit event for tests. timeUS is the 22// cursor value the handler should persist after applying. record can be 23// nil for delete operations, which carry no body. 24func commitEvent(timeUS int64, did, collection, op, rkey string, record any) *jsmodels.Event { 25 var raw json.RawMessage 26 if record != nil { 27 b, err := json.Marshal(record) 28 if err != nil { 29 panic(err) // test helper — callers control the input 30 } 31 raw = b 32 } 33 return &jsmodels.Event{ 34 Did: did, 35 TimeUS: timeUS, 36 Kind: jsmodels.EventKindCommit, 37 Commit: &jsmodels.Commit{ 38 Operation: op, 39 Collection: collection, 40 RKey: rkey, 41 Record: raw, 42 }, 43 } 44} 45 46// requireCursor asserts the persisted cursor equals want. Pulled out 47// because almost every test in this file checks it. 48func requireCursor(t *testing.T, s *store, want int64) { 49 t.Helper() 50 got, err := s.LoadCursor(context.Background()) 51 if err != nil { 52 t.Fatalf("load cursor: %v", err) 53 } 54 if got == nil || *got != want { 55 t.Fatalf("cursor = %v, want %d", got, want) 56 } 57} 58 59// handleJetstreamEvent is the testable per-event entry point. The generic 60// jetstream processor owns cursor advancement; this wrapper only supplies 61// Tack's collection list and commit application callback. 62func handleJetstreamEvent( 63 ctx context.Context, 64 st *store, 65 knots KnotConsumer, 66 hostname, ownerDID string, 67 evt *jsmodels.Event, 68) error { 69 processor := &js.Processor{ 70 Collections: jetstreamCollections, 71 CursorStore: st, 72 Handler: js.HandlerFunc(func(ctx context.Context, evt *jsmodels.Event) error { 73 return applyCommit(ctx, st, knots, hostname, ownerDID, evt) 74 }), 75 Logger: loggerFrom(ctx), 76 } 77 return processor.HandleEvent(ctx, evt) 78} 79 80// TestHandleNonCommitEvent confirms account/identity events are ignored 81// without error and without advancing the cursor — they don't have a 82// TimeUS we want to commit to. 83func TestHandleNonCommitEvent(t *testing.T) { 84 s := newTestStore(t) 85 ctx := context.Background() 86 87 evt := &jsmodels.Event{ 88 Did: "did:plc:foo", 89 TimeUS: 100, 90 Kind: jsmodels.EventKindAccount, 91 } 92 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 93 t.Fatalf("handle: %v", err) 94 } 95 got, err := s.LoadCursor(ctx) 96 if err != nil { 97 t.Fatalf("load cursor: %v", err) 98 } 99 if got != nil { 100 t.Fatalf("expected no cursor for non-commit, got %d", *got) 101 } 102} 103 104// TestHandleSpindleMemberCreate exercises the happy path: a create 105// commit lands a row in spindle_members and advances the cursor. 106func TestHandleSpindleMemberCreate(t *testing.T) { 107 s := newTestStore(t) 108 ctx := context.Background() 109 110 rec := tangled.SpindleMember{ 111 Instance: "https://spindle.example", 112 Subject: "did:plc:alice", 113 CreatedAt: "2026-01-01T00:00:00Z", 114 } 115 evt := commitEvent(12345, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk1", rec) 116 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 117 t.Fatalf("handle: %v", err) 118 } 119 120 var instance, subject string 121 err := s.db.QueryRowContext(ctx, 122 `SELECT instance, subject FROM spindle_members WHERE did = ? AND rkey = ?`, 123 "did:plc:owner", "rk1", 124 ).Scan(&instance, &subject) 125 if err != nil { 126 t.Fatalf("query: %v", err) 127 } 128 if instance != "https://spindle.example" || subject != "did:plc:alice" { 129 t.Fatalf("got (%q,%q)", instance, subject) 130 } 131 requireCursor(t, s, 12345) 132} 133 134// TestHandleSpindleMemberDelete confirms a delete commit removes the 135// previously-persisted row. 136func TestHandleSpindleMemberDelete(t *testing.T) { 137 s := newTestStore(t) 138 ctx := context.Background() 139 140 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "rk1", "i", "s", "t"); err != nil { 141 t.Fatalf("seed: %v", err) 142 } 143 evt := commitEvent(99, "did:plc:owner", tangled.SpindleMemberNSID, jsOpDelete, "rk1", nil) 144 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 145 t.Fatalf("handle: %v", err) 146 } 147 if n := countRows(t, s, "spindle_members"); n != 0 { 148 t.Fatalf("after delete: %d rows, want 0", n) 149 } 150 requireCursor(t, s, 99) 151} 152 153// TestHandleRepoCreateOptionals verifies pointer-typed optional fields 154// on tangled.Repo are derefed correctly (and nils become empty strings) 155// when written to the store. 156func TestHandleRepoCreateOptionals(t *testing.T) { 157 s := newTestStore(t) 158 ctx := context.Background() 159 160 spindle := "https://spindle.example" 161 repoDid := "did:plc:repo" 162 rec := tangled.Repo{ 163 Knot: "knot.example", 164 Name: "myrepo", 165 Spindle: &spindle, 166 RepoDid: &repoDid, 167 CreatedAt: "2026-01-01T00:00:00Z", 168 } 169 evt := commitEvent(7, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo1", rec) 170 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 171 t.Fatalf("handle: %v", err) 172 } 173 174 var gotSpindle, gotRepoDid string 175 err := s.db.QueryRowContext(ctx, 176 `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`, 177 "did:plc:owner", "repo1", 178 ).Scan(&gotSpindle, &gotRepoDid) 179 if err != nil { 180 t.Fatalf("query: %v", err) 181 } 182 if gotSpindle != spindle || gotRepoDid != repoDid { 183 t.Fatalf("got (%q,%q)", gotSpindle, gotRepoDid) 184 } 185 186 // Now a record with both optionals nil — should land as empty 187 // strings, not crash on a nil dereference in deref(). 188 rec2 := tangled.Repo{ 189 Knot: "knot.example", 190 Name: "other", 191 CreatedAt: "2026-01-01T00:00:00Z", 192 } 193 evt2 := commitEvent(8, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo2", rec2) 194 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt2); err != nil { 195 t.Fatalf("handle nil-optionals: %v", err) 196 } 197 err = s.db.QueryRowContext(ctx, 198 `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`, 199 "did:plc:owner", "repo2", 200 ).Scan(&gotSpindle, &gotRepoDid) 201 if err != nil { 202 t.Fatalf("query nil-optionals: %v", err) 203 } 204 if gotSpindle != "" || gotRepoDid != "" { 205 t.Fatalf("nil optionals: got (%q,%q), want both empty", gotSpindle, gotRepoDid) 206 } 207 requireCursor(t, s, 8) 208} 209 210// TestHandleRepoCollaboratorCreate covers the third dispatch arm so each 211// collection has at least one apply test. 212func TestHandleRepoCollaboratorCreate(t *testing.T) { 213 s := newTestStore(t) 214 ctx := context.Background() 215 216 repo := "myrepo" 217 rec := tangled.RepoCollaborator{ 218 Repo: &repo, 219 Subject: "did:plc:carol", 220 CreatedAt: "2026-01-01T00:00:00Z", 221 } 222 evt := commitEvent(55, "did:plc:owner", tangled.RepoCollaboratorNSID, jsOpCreate, "c1", rec) 223 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 224 t.Fatalf("handle: %v", err) 225 } 226 227 var subject string 228 err := s.db.QueryRowContext(ctx, 229 `SELECT subject FROM repo_collaborators WHERE did = ? AND rkey = ?`, 230 "did:plc:owner", "c1", 231 ).Scan(&subject) 232 if err != nil { 233 t.Fatalf("query: %v", err) 234 } 235 if subject != "did:plc:carol" { 236 t.Fatalf("subject = %q", subject) 237 } 238 requireCursor(t, s, 55) 239} 240 241// TestHandleUnknownCollection makes sure a collection we didn't ask for 242// (jetstream filter changes, schema drift) is silently dropped — no 243// error, no row, but cursor still advances so we don't replay it. 244func TestHandleUnknownCollection(t *testing.T) { 245 s := newTestStore(t) 246 ctx := context.Background() 247 248 evt := commitEvent(42, "did:plc:owner", "app.bsky.feed.post", jsOpCreate, "rk", map[string]string{"text": "hi"}) 249 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 250 t.Fatalf("handle: %v", err) 251 } 252 requireCursor(t, s, 42) 253} 254 255// TestHandleBadRecordAdvancesCursor is the failure-mode counterpart to 256// the happy paths: a malformed record body must be logged-and-skipped 257// (not returned as an error that pauses the scheduler) and the cursor 258// must still advance so we don't loop on the same bad event forever. 259func TestHandleBadRecordAdvancesCursor(t *testing.T) { 260 s := newTestStore(t) 261 ctx := context.Background() 262 263 evt := &jsmodels.Event{ 264 Did: "did:plc:owner", 265 TimeUS: 1000, 266 Kind: jsmodels.EventKindCommit, 267 Commit: &jsmodels.Commit{ 268 Operation: jsOpCreate, 269 Collection: tangled.SpindleMemberNSID, 270 RKey: "broken", 271 Record: json.RawMessage(`{not valid json`), 272 }, 273 } 274 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 275 t.Fatalf("handle should swallow decode error, got: %v", err) 276 } 277 if n := countRows(t, s, "spindle_members"); n != 0 { 278 t.Fatalf("bad record should not have inserted; got %d rows", n) 279 } 280 requireCursor(t, s, 1000) 281} 282 283// TestHandleTransientStoreErrorDoesNotAdvanceCursor is the partner to 284// TestHandleBadRecordAdvancesCursor: the cursor must hold its previous 285// position when applyCommit fails for an *infrastructure* reason 286// (here: store closed mid-flight, simulating SQLite busy / shutdown 287// races). Saving the cursor through such a failure would permanently 288// skip a perfectly good record and silently lose membership state. 289func TestHandleTransientStoreErrorDoesNotAdvanceCursor(t *testing.T) { 290 // Build the store inline (not via newTestStore) so the cleanup 291 // tolerates the explicit Close we do below to provoke errors. 292 path := filepath.Join(t.TempDir(), "tack.db") 293 s, err := openStore(path) 294 if err != nil { 295 t.Fatalf("openStore: %v", err) 296 } 297 ctx := context.Background() 298 299 // Seed a baseline cursor; after the failed apply it must still be 300 // 500, not 1000. 301 if err := s.SaveCursor(ctx, 500); err != nil { 302 t.Fatalf("seed cursor: %v", err) 303 } 304 305 // Close the underlying DB. Subsequent store calls will fail with 306 // "sql: database is closed", which is a transient-style error from 307 // applyCommit's perspective: the record itself is well-formed. 308 if err := s.db.Close(); err != nil { 309 t.Fatalf("close db: %v", err) 310 } 311 312 rec := tangled.SpindleMember{ 313 Instance: "https://spindle.example", 314 Subject: "did:plc:alice", 315 CreatedAt: "2026-01-01T00:00:00Z", 316 } 317 evt := commitEvent(1000, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk", rec) 318 319 // Expect a non-nil error: handler propagates transient failures. 320 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err == nil { 321 t.Fatalf("handle should return transient error, got nil") 322 } 323 324 // Re-open the same DB file to inspect the persisted cursor; the 325 // closed handle obviously can't answer queries. 326 s2, err := openStore(path) 327 if err != nil { 328 t.Fatalf("re-open: %v", err) 329 } 330 defer s2.Close() 331 got, err := s2.LoadCursor(ctx) 332 if err != nil { 333 t.Fatalf("load cursor: %v", err) 334 } 335 if got == nil || *got != 500 { 336 t.Fatalf("cursor = %v, want 500 (must NOT advance on transient error)", got) 337 } 338} 339 340// TestRepoEventSubscribesKnotForOurSpindle confirms that observing a 341// sh.tangled.repo whose .spindle field equals our hostname results in a 342// dynamic AddKnot call. This is the hot path for picking up new repos 343// without a tack restart. 344func TestRepoEventSubscribesKnotForOurSpindle(t *testing.T) { 345 s := newTestStore(t) 346 ctx := context.Background() 347 348 const ours = "tack.example" 349 spindle := ours 350 rec := tangled.Repo{ 351 Knot: "knot.example", 352 Name: "myrepo", 353 Spindle: &spindle, 354 CreatedAt: "2026-01-01T00:00:00Z", 355 } 356 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec) 357 358 fake := &fakeKnotConsumer{} 359 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 360 t.Fatalf("handle: %v", err) 361 } 362 added := fake.Added() 363 if len(added) != 1 || added[0] != "knot.example" { 364 t.Fatalf("AddKnot calls = %v, want [knot.example]", added) 365 } 366} 367 368// TestRepoEventIgnoresUnauthorizedPublisher pins the high-severity 369// security gate: a sh.tangled.repo record naming us as its spindle 370// must NOT cause an outbound knot subscription unless its publisher 371// is the spindle owner or an owner-vouched member. Without this gate 372// any firehose publisher could force tack to dial an attacker-chosen 373// websocket simply by minting a matching repo record. 374func TestRepoEventIgnoresUnauthorizedPublisher(t *testing.T) { 375 s := newTestStore(t) 376 ctx := context.Background() 377 378 const ours = "tack.example" 379 const owner = "did:plc:owner" 380 381 // did:plc:rando is neither the owner nor a member. 382 spindle := ours 383 rec := tangled.Repo{ 384 Knot: "evil.example", 385 Name: "myrepo", 386 Spindle: &spindle, 387 CreatedAt: "2026-01-01T00:00:00Z", 388 } 389 evt := commitEvent(1, "did:plc:rando", tangled.RepoNSID, jsOpCreate, "rk", rec) 390 391 fake := &fakeKnotConsumer{} 392 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, evt); err != nil { 393 t.Fatalf("handle: %v", err) 394 } 395 if added := fake.Added(); len(added) != 0 { 396 t.Fatalf("AddKnot calls = %v, want none (publisher is unauthorized)", added) 397 } 398} 399 400// TestSpindleMemberGrantSubscribesPendingKnots covers the case where 401// a member publishes their sh.tangled.repo *before* the owner's 402// sh.tangled.spindle.member grant arrives over the firehose. Until 403// the grant lands the publisher is unauthorized, so the repo doesn't 404// pull a subscription. Once the grant arrives, the same-rkey reconcile 405// must catch up by AddKnot-ing every knot that member's pending repos 406// already named. 407func TestSpindleMemberGrantSubscribesPendingKnots(t *testing.T) { 408 s := newTestStore(t) 409 ctx := context.Background() 410 411 const ours = "tack.example" 412 const owner = "did:plc:owner" 413 const alice = "did:plc:alice" 414 415 // Step 1: alice publishes a repo claiming us. She isn't a member 416 // yet, so no subscription should happen. 417 spindle := ours 418 repoRec := tangled.Repo{ 419 Knot: "knot.example", 420 Name: "myrepo", 421 Spindle: &spindle, 422 CreatedAt: "2026-01-01T00:00:00Z", 423 } 424 repoEvt := commitEvent(1, alice, tangled.RepoNSID, jsOpCreate, "rk", repoRec) 425 426 fake := &fakeKnotConsumer{} 427 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, repoEvt); err != nil { 428 t.Fatalf("handle repo: %v", err) 429 } 430 if added := fake.Added(); len(added) != 0 { 431 t.Fatalf("AddKnot before grant = %v, want none", added) 432 } 433 434 // Step 2: owner publishes a membership grant naming alice. The 435 // reconcile triggered by that grant must subscribe to alice's 436 // already-stored repo's knot. 437 grant := tangled.SpindleMember{ 438 Instance: ours, 439 Subject: alice, 440 CreatedAt: "2026-01-02T00:00:00Z", 441 } 442 grantEvt := commitEvent(2, owner, tangled.SpindleMemberNSID, jsOpCreate, "mk", grant) 443 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, grantEvt); err != nil { 444 t.Fatalf("handle grant: %v", err) 445 } 446 if added := fake.Added(); len(added) != 1 || added[0] != "knot.example" { 447 t.Fatalf("AddKnot after grant = %v, want [knot.example]", added) 448 } 449} 450 451// TestSpindleMemberRevokeUnsubscribesKnot verifies that revoking a 452// previously-granted membership tears down the only-held-by-that-member 453// knot subscription. Without this, an attacker who briefly held 454// membership could leave us dialing their chosen knot until restart. 455func TestSpindleMemberRevokeUnsubscribesKnot(t *testing.T) { 456 s := newTestStore(t) 457 ctx := context.Background() 458 459 const ours = "tack.example" 460 const owner = "did:plc:owner" 461 const alice = "did:plc:alice" 462 463 // Seed: owner has previously vouched for alice, and alice has 464 // already published a repo on knot.example pointing at us. 465 if err := s.UpsertSpindleMember(ctx, owner, "mk", ours, alice, "t"); err != nil { 466 t.Fatal(err) 467 } 468 if err := s.UpsertRepo(ctx, alice, "rk", "knot.example", "myrepo", ours, "", "t"); err != nil { 469 t.Fatal(err) 470 } 471 472 // Owner publishes a delete of the membership grant. 473 revoke := commitEvent(10, owner, tangled.SpindleMemberNSID, jsOpDelete, "mk", nil) 474 475 fake := &fakeKnotConsumer{} 476 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, revoke); err != nil { 477 t.Fatalf("handle revoke: %v", err) 478 } 479 if removed := fake.Removed(); len(removed) != 1 || removed[0] != "knot.example" { 480 t.Fatalf("RemoveKnot calls = %v, want [knot.example]", removed) 481 } 482} 483 484// TestSpindleMemberGrantFromNonOwnerIgnored confirms that a forged 485// membership record published by anyone other than the spindle owner 486// does NOT trigger a knot subscription, even if a matching repo for 487// the named subject exists. This mirrors AuthorizePipelineActor's 488// rule that the membership grant's publisher must equal the spindle 489// owner. 490func TestSpindleMemberGrantFromNonOwnerIgnored(t *testing.T) { 491 s := newTestStore(t) 492 ctx := context.Background() 493 494 const ours = "tack.example" 495 const owner = "did:plc:owner" 496 const alice = "did:plc:alice" 497 498 if err := s.UpsertRepo(ctx, alice, "rk", "knot.example", "myrepo", ours, "", "t"); err != nil { 499 t.Fatal(err) 500 } 501 502 // Forged grant: alice "vouches" for herself. Stored, but must 503 // not flip her into authorized status. 504 forgery := tangled.SpindleMember{ 505 Instance: ours, 506 Subject: alice, 507 CreatedAt: "2026-01-02T00:00:00Z", 508 } 509 evt := commitEvent(2, alice, tangled.SpindleMemberNSID, jsOpCreate, "mk", forgery) 510 511 fake := &fakeKnotConsumer{} 512 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, evt); err != nil { 513 t.Fatalf("handle forged grant: %v", err) 514 } 515 if added := fake.Added(); len(added) != 0 { 516 t.Fatalf("AddKnot calls = %v, want none (grant publisher != owner)", added) 517 } 518} 519 520// TestRepoEventIgnoresKnotForOtherSpindle confirms repos pointing at a 521// *different* spindle do not pull us into watching their knot. Without 522// this guard, tack would dial every knot named in any sh.tangled.repo 523// it sees over the firehose, which is most of them. 524func TestRepoEventIgnoresKnotForOtherSpindle(t *testing.T) { 525 s := newTestStore(t) 526 ctx := context.Background() 527 528 other := "other-spindle.example" 529 rec := tangled.Repo{ 530 Knot: "knot.example", 531 Name: "myrepo", 532 Spindle: &other, 533 CreatedAt: "2026-01-01T00:00:00Z", 534 } 535 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec) 536 537 fake := &fakeKnotConsumer{} 538 if err := handleJetstreamEvent(ctx, s, fake, "tack.example", "did:plc:owner", evt); err != nil { 539 t.Fatalf("handle: %v", err) 540 } 541 if added := fake.Added(); len(added) != 0 { 542 t.Fatalf("AddKnot calls = %v, want none", added) 543 } 544} 545 546// TestRepoUpdateSpindleAwayFromUsRemovesKnot covers the case where a 547// repo we'd previously been watching gets its .spindle field flipped to 548// some other spindle. Once that's the only repo we had on that knot, 549// the reconciliation must call RemoveKnot. 550func TestRepoUpdateSpindleAwayFromUsRemovesKnot(t *testing.T) { 551 s := newTestStore(t) 552 ctx := context.Background() 553 554 const ours = "tack.example" 555 const knot = "knot.example" 556 557 // Seed: a repo that names us as its spindle on `knot`. 558 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "repo-a", ours, "", "t"); err != nil { 559 t.Fatal(err) 560 } 561 562 // Update: same record, now points at a different spindle. 563 other := "other.example" 564 rec := tangled.Repo{ 565 Knot: knot, 566 Name: "repo-a", 567 Spindle: &other, 568 CreatedAt: "2026-01-01T00:00:00Z", 569 } 570 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec) 571 572 fake := &fakeKnotConsumer{} 573 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 574 t.Fatalf("handle: %v", err) 575 } 576 if added := fake.Added(); len(added) != 0 { 577 t.Fatalf("AddKnot calls = %v, want none", added) 578 } 579 if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot { 580 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot) 581 } 582} 583 584// TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared ensures we don't 585// over-eagerly unsubscribe from a knot when one of multiple repos on 586// that knot moves away from us. The other repo's subscription must keep 587// the knot in our wanted set. 588func TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared(t *testing.T) { 589 s := newTestStore(t) 590 ctx := context.Background() 591 592 const ours = "tack.example" 593 const knot = "knot.example" 594 595 // Both publishers must be vouched for; otherwise IsKnotWanted's 596 // membership filter would zero them out and we'd unsubscribe 597 // regardless. The reconciliation we want to test only matters 598 // when there is a real authorized hold to keep. 599 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk1", ours, "did:plc:a", "t"); err != nil { 600 t.Fatal(err) 601 } 602 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk2", ours, "did:plc:b", "t"); err != nil { 603 t.Fatal(err) 604 } 605 606 // Two repos sharing one knot, both pointed at us. 607 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil { 608 t.Fatal(err) 609 } 610 if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil { 611 t.Fatal(err) 612 } 613 614 // Repo A flips to a different spindle. B still wants us. 615 other := "other.example" 616 rec := tangled.Repo{ 617 Knot: knot, 618 Name: "a", 619 Spindle: &other, 620 CreatedAt: "2026-01-01T00:00:00Z", 621 } 622 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk1", rec) 623 624 fake := &fakeKnotConsumer{} 625 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 626 t.Fatalf("handle: %v", err) 627 } 628 if removed := fake.Removed(); len(removed) != 0 { 629 t.Fatalf("RemoveKnot calls = %v, want none (B still wants us on %s)", removed, knot) 630 } 631} 632 633// TestRepoUpdateChangingKnotSwapsSubscription verifies that a repo 634// staying with us but changing its .knot field unsubscribes the old 635// knot (if no other repo holds it) and subscribes the new one. 636func TestRepoUpdateChangingKnotSwapsSubscription(t *testing.T) { 637 s := newTestStore(t) 638 ctx := context.Background() 639 640 const ours = "tack.example" 641 const oldKnot = "old.example" 642 const newKnot = "new.example" 643 644 // Vouch for did:plc:a so reconcileKnot will actually call 645 // AddKnot for the new host. Without the grant the publisher 646 // is unauthorized and the AddKnot half is (correctly) skipped. 647 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk1", ours, "did:plc:a", "t"); err != nil { 648 t.Fatal(err) 649 } 650 651 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", oldKnot, "a", ours, "", "t"); err != nil { 652 t.Fatal(err) 653 } 654 655 spindle := ours 656 rec := tangled.Repo{ 657 Knot: newKnot, 658 Name: "a", 659 Spindle: &spindle, 660 CreatedAt: "2026-01-01T00:00:00Z", 661 } 662 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec) 663 664 fake := &fakeKnotConsumer{} 665 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 666 t.Fatalf("handle: %v", err) 667 } 668 if added := fake.Added(); len(added) != 1 || added[0] != newKnot { 669 t.Fatalf("AddKnot calls = %v, want [%s]", added, newKnot) 670 } 671 if removed := fake.Removed(); len(removed) != 1 || removed[0] != oldKnot { 672 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, oldKnot) 673 } 674} 675 676// TestRepoDeleteRemovesKnotWhenLast confirms deleting the last repo on 677// a knot we cared about triggers RemoveKnot. 678func TestRepoDeleteRemovesKnotWhenLast(t *testing.T) { 679 s := newTestStore(t) 680 ctx := context.Background() 681 682 const ours = "tack.example" 683 const knot = "knot.example" 684 685 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "a", ours, "", "t"); err != nil { 686 t.Fatal(err) 687 } 688 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk", nil) 689 690 fake := &fakeKnotConsumer{} 691 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 692 t.Fatalf("handle: %v", err) 693 } 694 if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot { 695 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot) 696 } 697} 698 699// TestRepoDeleteKeepsKnotIfShared ensures deleting one of multiple 700// repos on a knot does not unsubscribe — the survivors still want it. 701func TestRepoDeleteKeepsKnotIfShared(t *testing.T) { 702 s := newTestStore(t) 703 ctx := context.Background() 704 705 const ours = "tack.example" 706 const knot = "knot.example" 707 708 // Both publishers vouched for; otherwise IsKnotWanted would 709 // (correctly) report neither holds the knot and we'd unsubscribe. 710 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk1", ours, "did:plc:a", "t"); err != nil { 711 t.Fatal(err) 712 } 713 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk2", ours, "did:plc:b", "t"); err != nil { 714 t.Fatal(err) 715 } 716 717 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil { 718 t.Fatal(err) 719 } 720 if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil { 721 t.Fatal(err) 722 } 723 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk1", nil) 724 725 fake := &fakeKnotConsumer{} 726 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 727 t.Fatalf("handle: %v", err) 728 } 729 if removed := fake.Removed(); len(removed) != 0 { 730 t.Fatalf("RemoveKnot calls = %v, want none", removed) 731 } 732}