Stitch any CI into Tangled
3

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// Tests for handleJetstreamEvent. We exercise the full path — collection 4// dispatch, record decoding, store mutation, and cursor advancement — 5// using an in-memory store from store_test.go's newTestStore helper. 6// 7// Events are constructed by hand rather than recorded from a live 8// jetstream so the tests don't need network access and stay fast. 9 10import ( 11 "context" 12 "encoding/json" 13 "path/filepath" 14 "testing" 15 16 jsmodels "github.com/bluesky-social/jetstream/pkg/models" 17 "tangled.org/core/api/tangled" 18) 19 20// commitEvent builds a jetstream commit event for tests. timeUS is the 21// cursor value the handler should persist after applying. record can be 22// nil for delete operations, which carry no body. 23func commitEvent(timeUS int64, did, collection, op, rkey string, record any) *jsmodels.Event { 24 var raw json.RawMessage 25 if record != nil { 26 b, err := json.Marshal(record) 27 if err != nil { 28 panic(err) // test helper — callers control the input 29 } 30 raw = b 31 } 32 return &jsmodels.Event{ 33 Did: did, 34 TimeUS: timeUS, 35 Kind: jsmodels.EventKindCommit, 36 Commit: &jsmodels.Commit{ 37 Operation: op, 38 Collection: collection, 39 RKey: rkey, 40 Record: raw, 41 }, 42 } 43} 44 45// requireCursor asserts the persisted cursor equals want. Pulled out 46// because almost every test in this file checks it. 47func requireCursor(t *testing.T, s *store, want int64) { 48 t.Helper() 49 got, err := s.LoadCursor(context.Background()) 50 if err != nil { 51 t.Fatalf("load cursor: %v", err) 52 } 53 if got == nil || *got != want { 54 t.Fatalf("cursor = %v, want %d", got, want) 55 } 56} 57 58// TestHandleNonCommitEvent confirms account/identity events are ignored 59// without error and without advancing the cursor — they don't have a 60// TimeUS we want to commit to. 61func TestHandleNonCommitEvent(t *testing.T) { 62 s := newTestStore(t) 63 ctx := context.Background() 64 65 evt := &jsmodels.Event{ 66 Did: "did:plc:foo", 67 TimeUS: 100, 68 Kind: jsmodels.EventKindAccount, 69 } 70 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 71 t.Fatalf("handle: %v", err) 72 } 73 got, err := s.LoadCursor(ctx) 74 if err != nil { 75 t.Fatalf("load cursor: %v", err) 76 } 77 if got != nil { 78 t.Fatalf("expected no cursor for non-commit, got %d", *got) 79 } 80} 81 82// TestHandleSpindleMemberCreate exercises the happy path: a create 83// commit lands a row in spindle_members and advances the cursor. 84func TestHandleSpindleMemberCreate(t *testing.T) { 85 s := newTestStore(t) 86 ctx := context.Background() 87 88 rec := tangled.SpindleMember{ 89 Instance: "https://spindle.example", 90 Subject: "did:plc:alice", 91 CreatedAt: "2026-01-01T00:00:00Z", 92 } 93 evt := commitEvent(12345, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk1", rec) 94 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 95 t.Fatalf("handle: %v", err) 96 } 97 98 var instance, subject string 99 err := s.db.QueryRowContext(ctx, 100 `SELECT instance, subject FROM spindle_members WHERE did = ? AND rkey = ?`, 101 "did:plc:owner", "rk1", 102 ).Scan(&instance, &subject) 103 if err != nil { 104 t.Fatalf("query: %v", err) 105 } 106 if instance != "https://spindle.example" || subject != "did:plc:alice" { 107 t.Fatalf("got (%q,%q)", instance, subject) 108 } 109 requireCursor(t, s, 12345) 110} 111 112// TestHandleSpindleMemberDelete confirms a delete commit removes the 113// previously-persisted row. 114func TestHandleSpindleMemberDelete(t *testing.T) { 115 s := newTestStore(t) 116 ctx := context.Background() 117 118 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "rk1", "i", "s", "t"); err != nil { 119 t.Fatalf("seed: %v", err) 120 } 121 evt := commitEvent(99, "did:plc:owner", tangled.SpindleMemberNSID, jsOpDelete, "rk1", nil) 122 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 123 t.Fatalf("handle: %v", err) 124 } 125 if n := countRows(t, s, "spindle_members"); n != 0 { 126 t.Fatalf("after delete: %d rows, want 0", n) 127 } 128 requireCursor(t, s, 99) 129} 130 131// TestHandleRepoCreateOptionals verifies pointer-typed optional fields 132// on tangled.Repo are derefed correctly (and nils become empty strings) 133// when written to the store. 134func TestHandleRepoCreateOptionals(t *testing.T) { 135 s := newTestStore(t) 136 ctx := context.Background() 137 138 spindle := "https://spindle.example" 139 repoDid := "did:plc:repo" 140 rec := tangled.Repo{ 141 Knot: "knot.example", 142 Name: "myrepo", 143 Spindle: &spindle, 144 RepoDid: &repoDid, 145 CreatedAt: "2026-01-01T00:00:00Z", 146 } 147 evt := commitEvent(7, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo1", rec) 148 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 149 t.Fatalf("handle: %v", err) 150 } 151 152 var gotSpindle, gotRepoDid string 153 err := s.db.QueryRowContext(ctx, 154 `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`, 155 "did:plc:owner", "repo1", 156 ).Scan(&gotSpindle, &gotRepoDid) 157 if err != nil { 158 t.Fatalf("query: %v", err) 159 } 160 if gotSpindle != spindle || gotRepoDid != repoDid { 161 t.Fatalf("got (%q,%q)", gotSpindle, gotRepoDid) 162 } 163 164 // Now a record with both optionals nil — should land as empty 165 // strings, not crash on a nil dereference in deref(). 166 rec2 := tangled.Repo{ 167 Knot: "knot.example", 168 Name: "other", 169 CreatedAt: "2026-01-01T00:00:00Z", 170 } 171 evt2 := commitEvent(8, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo2", rec2) 172 if err := handleJetstreamEvent(ctx, s, nil, "", evt2); err != nil { 173 t.Fatalf("handle nil-optionals: %v", err) 174 } 175 err = s.db.QueryRowContext(ctx, 176 `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`, 177 "did:plc:owner", "repo2", 178 ).Scan(&gotSpindle, &gotRepoDid) 179 if err != nil { 180 t.Fatalf("query nil-optionals: %v", err) 181 } 182 if gotSpindle != "" || gotRepoDid != "" { 183 t.Fatalf("nil optionals: got (%q,%q), want both empty", gotSpindle, gotRepoDid) 184 } 185 requireCursor(t, s, 8) 186} 187 188// TestHandleRepoCollaboratorCreate covers the third dispatch arm so each 189// collection has at least one apply test. 190func TestHandleRepoCollaboratorCreate(t *testing.T) { 191 s := newTestStore(t) 192 ctx := context.Background() 193 194 repo := "myrepo" 195 rec := tangled.RepoCollaborator{ 196 Repo: &repo, 197 Subject: "did:plc:carol", 198 CreatedAt: "2026-01-01T00:00:00Z", 199 } 200 evt := commitEvent(55, "did:plc:owner", tangled.RepoCollaboratorNSID, jsOpCreate, "c1", rec) 201 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 202 t.Fatalf("handle: %v", err) 203 } 204 205 var subject string 206 err := s.db.QueryRowContext(ctx, 207 `SELECT subject FROM repo_collaborators WHERE did = ? AND rkey = ?`, 208 "did:plc:owner", "c1", 209 ).Scan(&subject) 210 if err != nil { 211 t.Fatalf("query: %v", err) 212 } 213 if subject != "did:plc:carol" { 214 t.Fatalf("subject = %q", subject) 215 } 216 requireCursor(t, s, 55) 217} 218 219// TestHandleUnknownCollection makes sure a collection we didn't ask for 220// (jetstream filter changes, schema drift) is silently dropped — no 221// error, no row, but cursor still advances so we don't replay it. 222func TestHandleUnknownCollection(t *testing.T) { 223 s := newTestStore(t) 224 ctx := context.Background() 225 226 evt := commitEvent(42, "did:plc:owner", "app.bsky.feed.post", jsOpCreate, "rk", map[string]string{"text": "hi"}) 227 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 228 t.Fatalf("handle: %v", err) 229 } 230 requireCursor(t, s, 42) 231} 232 233// TestHandleBadRecordAdvancesCursor is the failure-mode counterpart to 234// the happy paths: a malformed record body must be logged-and-skipped 235// (not returned as an error that pauses the scheduler) and the cursor 236// must still advance so we don't loop on the same bad event forever. 237func TestHandleBadRecordAdvancesCursor(t *testing.T) { 238 s := newTestStore(t) 239 ctx := context.Background() 240 241 evt := &jsmodels.Event{ 242 Did: "did:plc:owner", 243 TimeUS: 1000, 244 Kind: jsmodels.EventKindCommit, 245 Commit: &jsmodels.Commit{ 246 Operation: jsOpCreate, 247 Collection: tangled.SpindleMemberNSID, 248 RKey: "broken", 249 Record: json.RawMessage(`{not valid json`), 250 }, 251 } 252 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 253 t.Fatalf("handle should swallow decode error, got: %v", err) 254 } 255 if n := countRows(t, s, "spindle_members"); n != 0 { 256 t.Fatalf("bad record should not have inserted; got %d rows", n) 257 } 258 requireCursor(t, s, 1000) 259} 260 261// TestHandleTransientStoreErrorDoesNotAdvanceCursor is the partner to 262// TestHandleBadRecordAdvancesCursor: the cursor must hold its previous 263// position when applyCommit fails for an *infrastructure* reason 264// (here: store closed mid-flight, simulating SQLite busy / shutdown 265// races). Saving the cursor through such a failure would permanently 266// skip a perfectly good record and silently lose membership state. 267func TestHandleTransientStoreErrorDoesNotAdvanceCursor(t *testing.T) { 268 // Build the store inline (not via newTestStore) so the cleanup 269 // tolerates the explicit Close we do below to provoke errors. 270 path := filepath.Join(t.TempDir(), "tack.db") 271 s, err := openStore(path) 272 if err != nil { 273 t.Fatalf("openStore: %v", err) 274 } 275 ctx := context.Background() 276 277 // Seed a baseline cursor; after the failed apply it must still be 278 // 500, not 1000. 279 if err := s.SaveCursor(ctx, 500); err != nil { 280 t.Fatalf("seed cursor: %v", err) 281 } 282 283 // Close the underlying DB. Subsequent store calls will fail with 284 // "sql: database is closed", which is a transient-style error from 285 // applyCommit's perspective: the record itself is well-formed. 286 if err := s.db.Close(); err != nil { 287 t.Fatalf("close db: %v", err) 288 } 289 290 rec := tangled.SpindleMember{ 291 Instance: "https://spindle.example", 292 Subject: "did:plc:alice", 293 CreatedAt: "2026-01-01T00:00:00Z", 294 } 295 evt := commitEvent(1000, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk", rec) 296 297 // Expect a non-nil error: handler propagates transient failures. 298 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err == nil { 299 t.Fatalf("handle should return transient error, got nil") 300 } 301 302 // Re-open the same DB file to inspect the persisted cursor; the 303 // closed handle obviously can't answer queries. 304 s2, err := openStore(path) 305 if err != nil { 306 t.Fatalf("re-open: %v", err) 307 } 308 defer s2.Close() 309 got, err := s2.LoadCursor(ctx) 310 if err != nil { 311 t.Fatalf("load cursor: %v", err) 312 } 313 if got == nil || *got != 500 { 314 t.Fatalf("cursor = %v, want 500 (must NOT advance on transient error)", got) 315 } 316} 317 318// TestRepoEventSubscribesKnotForOurSpindle confirms that observing a 319// sh.tangled.repo whose .spindle field equals our hostname results in a 320// dynamic AddKnot call. This is the hot path for picking up new repos 321// without a tack restart. 322func TestRepoEventSubscribesKnotForOurSpindle(t *testing.T) { 323 s := newTestStore(t) 324 ctx := context.Background() 325 326 const ours = "tack.example" 327 spindle := ours 328 rec := tangled.Repo{ 329 Knot: "knot.example", 330 Name: "myrepo", 331 Spindle: &spindle, 332 CreatedAt: "2026-01-01T00:00:00Z", 333 } 334 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec) 335 336 fake := &fakeKnotConsumer{} 337 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil { 338 t.Fatalf("handle: %v", err) 339 } 340 added := fake.Added() 341 if len(added) != 1 || added[0] != "knot.example" { 342 t.Fatalf("AddKnot calls = %v, want [knot.example]", added) 343 } 344} 345 346// TestRepoEventIgnoresKnotForOtherSpindle confirms repos pointing at a 347// *different* spindle do not pull us into watching their knot. Without 348// this guard, tack would dial every knot named in any sh.tangled.repo 349// it sees over the firehose, which is most of them. 350func TestRepoEventIgnoresKnotForOtherSpindle(t *testing.T) { 351 s := newTestStore(t) 352 ctx := context.Background() 353 354 other := "other-spindle.example" 355 rec := tangled.Repo{ 356 Knot: "knot.example", 357 Name: "myrepo", 358 Spindle: &other, 359 CreatedAt: "2026-01-01T00:00:00Z", 360 } 361 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec) 362 363 fake := &fakeKnotConsumer{} 364 if err := handleJetstreamEvent(ctx, s, fake, "tack.example", evt); err != nil { 365 t.Fatalf("handle: %v", err) 366 } 367 if added := fake.Added(); len(added) != 0 { 368 t.Fatalf("AddKnot calls = %v, want none", added) 369 } 370} 371 372// TestRepoUpdateSpindleAwayFromUsRemovesKnot covers the case where a 373// repo we'd previously been watching gets its .spindle field flipped to 374// some other spindle. Once that's the only repo we had on that knot, 375// the reconciliation must call RemoveKnot. 376func TestRepoUpdateSpindleAwayFromUsRemovesKnot(t *testing.T) { 377 s := newTestStore(t) 378 ctx := context.Background() 379 380 const ours = "tack.example" 381 const knot = "knot.example" 382 383 // Seed: a repo that names us as its spindle on `knot`. 384 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "repo-a", ours, "", "t"); err != nil { 385 t.Fatal(err) 386 } 387 388 // Update: same record, now points at a different spindle. 389 other := "other.example" 390 rec := tangled.Repo{ 391 Knot: knot, 392 Name: "repo-a", 393 Spindle: &other, 394 CreatedAt: "2026-01-01T00:00:00Z", 395 } 396 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec) 397 398 fake := &fakeKnotConsumer{} 399 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil { 400 t.Fatalf("handle: %v", err) 401 } 402 if added := fake.Added(); len(added) != 0 { 403 t.Fatalf("AddKnot calls = %v, want none", added) 404 } 405 if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot { 406 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot) 407 } 408} 409 410// TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared ensures we don't 411// over-eagerly unsubscribe from a knot when one of multiple repos on 412// that knot moves away from us. The other repo's subscription must keep 413// the knot in our wanted set. 414func TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared(t *testing.T) { 415 s := newTestStore(t) 416 ctx := context.Background() 417 418 const ours = "tack.example" 419 const knot = "knot.example" 420 421 // Two repos sharing one knot, both pointed at us. 422 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil { 423 t.Fatal(err) 424 } 425 if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil { 426 t.Fatal(err) 427 } 428 429 // Repo A flips to a different spindle. B still wants us. 430 other := "other.example" 431 rec := tangled.Repo{ 432 Knot: knot, 433 Name: "a", 434 Spindle: &other, 435 CreatedAt: "2026-01-01T00:00:00Z", 436 } 437 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk1", rec) 438 439 fake := &fakeKnotConsumer{} 440 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil { 441 t.Fatalf("handle: %v", err) 442 } 443 if removed := fake.Removed(); len(removed) != 0 { 444 t.Fatalf("RemoveKnot calls = %v, want none (B still wants us on %s)", removed, knot) 445 } 446} 447 448// TestRepoUpdateChangingKnotSwapsSubscription verifies that a repo 449// staying with us but changing its .knot field unsubscribes the old 450// knot (if no other repo holds it) and subscribes the new one. 451func TestRepoUpdateChangingKnotSwapsSubscription(t *testing.T) { 452 s := newTestStore(t) 453 ctx := context.Background() 454 455 const ours = "tack.example" 456 const oldKnot = "old.example" 457 const newKnot = "new.example" 458 459 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", oldKnot, "a", ours, "", "t"); err != nil { 460 t.Fatal(err) 461 } 462 463 spindle := ours 464 rec := tangled.Repo{ 465 Knot: newKnot, 466 Name: "a", 467 Spindle: &spindle, 468 CreatedAt: "2026-01-01T00:00:00Z", 469 } 470 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec) 471 472 fake := &fakeKnotConsumer{} 473 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil { 474 t.Fatalf("handle: %v", err) 475 } 476 if added := fake.Added(); len(added) != 1 || added[0] != newKnot { 477 t.Fatalf("AddKnot calls = %v, want [%s]", added, newKnot) 478 } 479 if removed := fake.Removed(); len(removed) != 1 || removed[0] != oldKnot { 480 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, oldKnot) 481 } 482} 483 484// TestRepoDeleteRemovesKnotWhenLast confirms deleting the last repo on 485// a knot we cared about triggers RemoveKnot. 486func TestRepoDeleteRemovesKnotWhenLast(t *testing.T) { 487 s := newTestStore(t) 488 ctx := context.Background() 489 490 const ours = "tack.example" 491 const knot = "knot.example" 492 493 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "a", ours, "", "t"); err != nil { 494 t.Fatal(err) 495 } 496 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk", nil) 497 498 fake := &fakeKnotConsumer{} 499 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil { 500 t.Fatalf("handle: %v", err) 501 } 502 if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot { 503 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot) 504 } 505} 506 507// TestRepoDeleteKeepsKnotIfShared ensures deleting one of multiple 508// repos on a knot does not unsubscribe — the survivors still want it. 509func TestRepoDeleteKeepsKnotIfShared(t *testing.T) { 510 s := newTestStore(t) 511 ctx := context.Background() 512 513 const ours = "tack.example" 514 const knot = "knot.example" 515 516 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil { 517 t.Fatal(err) 518 } 519 if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil { 520 t.Fatal(err) 521 } 522 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk1", nil) 523 524 fake := &fakeKnotConsumer{} 525 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil { 526 t.Fatalf("handle: %v", err) 527 } 528 if removed := fake.Removed(); len(removed) != 0 { 529 t.Fatalf("RemoveKnot calls = %v, want none", removed) 530 } 531}