Stitch any CI into Tangled
1package jetstream
2
3import (
4 "context"
5 "errors"
6 "io"
7 "log/slog"
8 "testing"
9
10 jsmodels "github.com/bluesky-social/jetstream/pkg/models"
11)
12
// testLogger returns a text logger whose output goes to io.Discard, so tests
// can supply a Logger without producing any log output.
func testLogger() *slog.Logger {
	discard := slog.NewTextHandler(io.Discard, nil)
	return slog.New(discard)
}
16
17func requireMemoryCursor(t *testing.T, store CursorStore, want *int64) {
18 t.Helper()
19
20 got, err := store.LoadCursor(context.Background())
21 if err != nil {
22 t.Fatalf("load cursor: %v", err)
23 }
24 if want == nil {
25 if got != nil {
26 t.Fatalf("cursor = %v, want nil", *got)
27 }
28 return
29 }
30 if got == nil || *got != *want {
31 t.Fatalf("cursor = %v, want %d", got, *want)
32 }
33}
34
35func testCommit(timeUS int64, collection string) *jsmodels.Event {
36 return &jsmodels.Event{
37 Did: "did:plc:test",
38 TimeUS: timeUS,
39 Kind: jsmodels.EventKindCommit,
40 Commit: &jsmodels.Commit{
41 Operation: "create",
42 Collection: collection,
43 RKey: "rk",
44 },
45 }
46}
47
48func TestProcessorCommitAdvancesCursor(t *testing.T) {
49 store := NewMemoryCursorStore(nil)
50 called := false
51
52 processor := &Processor{
53 Collections: []string{"example.collection"},
54 CursorStore: store,
55 Handler: HandlerFunc(func(ctx context.Context, event *jsmodels.Event) error {
56 called = true
57 return nil
58 }),
59 Logger: testLogger(),
60 }
61
62 if err := processor.HandleEvent(context.Background(), testCommit(123, "example.collection")); err != nil {
63 t.Fatalf("handle: %v", err)
64 }
65 if !called {
66 t.Fatalf("handler was not called")
67 }
68 want := int64(123)
69 requireMemoryCursor(t, store, &want)
70}
71
72func TestProcessorIgnoresNonCommit(t *testing.T) {
73 store := NewMemoryCursorStore(nil)
74 processor := &Processor{
75 CursorStore: store,
76 Handler: HandlerFunc(func(ctx context.Context, event *jsmodels.Event) error {
77 t.Fatalf("handler should not be called")
78 return nil
79 }),
80 Logger: testLogger(),
81 }
82
83 event := &jsmodels.Event{
84 Did: "did:plc:test",
85 TimeUS: 123,
86 Kind: jsmodels.EventKindAccount,
87 }
88 if err := processor.HandleEvent(context.Background(), event); err != nil {
89 t.Fatalf("handle: %v", err)
90 }
91 requireMemoryCursor(t, store, nil)
92}
93
94func TestProcessorUnexpectedCollectionAdvancesCursor(t *testing.T) {
95 store := NewMemoryCursorStore(nil)
96 processor := &Processor{
97 Collections: []string{"wanted.collection"},
98 CursorStore: store,
99 Handler: HandlerFunc(func(ctx context.Context, event *jsmodels.Event) error {
100 t.Fatalf("handler should not be called")
101 return nil
102 }),
103 Logger: testLogger(),
104 }
105
106 if err := processor.HandleEvent(context.Background(), testCommit(456, "other.collection")); err != nil {
107 t.Fatalf("handle: %v", err)
108 }
109 want := int64(456)
110 requireMemoryCursor(t, store, &want)
111}
112
113func TestProcessorBadRecordAdvancesCursor(t *testing.T) {
114 store := NewMemoryCursorStore(nil)
115 processor := &Processor{
116 Collections: []string{"example.collection"},
117 CursorStore: store,
118 Handler: HandlerFunc(func(ctx context.Context, event *jsmodels.Event) error {
119 return BadRecord(errors.New("decode failed"))
120 }),
121 Logger: testLogger(),
122 }
123
124 if err := processor.HandleEvent(context.Background(), testCommit(789, "example.collection")); err != nil {
125 t.Fatalf("handle: %v", err)
126 }
127 want := int64(789)
128 requireMemoryCursor(t, store, &want)
129}
130
131func TestProcessorTransientErrorDoesNotAdvanceCursor(t *testing.T) {
132 cursor := int64(100)
133 store := NewMemoryCursorStore(&cursor)
134 transientErr := errors.New("database busy")
135 processor := &Processor{
136 Collections: []string{"example.collection"},
137 CursorStore: store,
138 Handler: HandlerFunc(func(ctx context.Context, event *jsmodels.Event) error {
139 return transientErr
140 }),
141 Logger: testLogger(),
142 }
143
144 err := processor.HandleEvent(context.Background(), testCommit(200, "example.collection"))
145 if !errors.Is(err, transientErr) {
146 t.Fatalf("handle error = %v, want %v", err, transientErr)
147 }
148 requireMemoryCursor(t, store, &cursor)
149}