Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

at main 3.8 kB View raw
1package jetstream 2 3import ( 4 "context" 5 "errors" 6 "io" 7 "log/slog" 8 "testing" 9 10 jsmodels "github.com/bluesky-social/jetstream/pkg/models" 11) 12 13func testLogger() *slog.Logger { 14 return slog.New(slog.NewTextHandler(io.Discard, nil)) 15} 16 17func requireMemoryCursor(t *testing.T, store CursorStore, want *int64) { 18 t.Helper() 19 20 got, err := store.LoadCursor(context.Background()) 21 if err != nil { 22 t.Fatalf("load cursor: %v", err) 23 } 24 if want == nil { 25 if got != nil { 26 t.Fatalf("cursor = %v, want nil", *got) 27 } 28 return 29 } 30 if got == nil || *got != *want { 31 t.Fatalf("cursor = %v, want %d", got, *want) 32 } 33} 34 35func testCommit(timeUS int64, collection string) *jsmodels.Event { 36 return &jsmodels.Event{ 37 Did: "did:plc:test", 38 TimeUS: timeUS, 39 Kind: jsmodels.EventKindCommit, 40 Commit: &jsmodels.Commit{ 41 Operation: "create", 42 Collection: collection, 43 RKey: "rk", 44 }, 45 } 46} 47 48func TestProcessorCommitAdvancesCursor(t *testing.T) { 49 store := NewMemoryCursorStore(nil) 50 called := false 51 52 processor := &Processor{ 53 Collections: []string{"example.collection"}, 54 CursorStore: store, 55 Handler: HandlerFunc(func(ctx context.Context, event *jsmodels.Event) error { 56 called = true 57 return nil 58 }), 59 Logger: testLogger(), 60 } 61 62 if err := processor.HandleEvent(context.Background(), testCommit(123, "example.collection")); err != nil { 63 t.Fatalf("handle: %v", err) 64 } 65 if !called { 66 t.Fatalf("handler was not called") 67 } 68 want := int64(123) 69 requireMemoryCursor(t, store, &want) 70} 71 72func TestProcessorIgnoresNonCommit(t *testing.T) { 73 store := NewMemoryCursorStore(nil) 74 processor := &Processor{ 75 CursorStore: store, 76 Handler: HandlerFunc(func(ctx context.Context, event *jsmodels.Event) error { 77 t.Fatalf("handler should not be called") 78 return nil 79 }), 80 Logger: testLogger(), 81 } 82 83 event := &jsmodels.Event{ 84 Did: "did:plc:test", 85 TimeUS: 123, 86 Kind: jsmodels.EventKindAccount, 87 } 88 if err := processor.HandleEvent(context.Background(), event); err != nil { 89 t.Fatalf("handle: %v", err) 90 } 91 requireMemoryCursor(t, store, nil) 92} 93 94func TestProcessorUnexpectedCollectionAdvancesCursor(t *testing.T) { 95 store := NewMemoryCursorStore(nil) 96 processor := &Processor{ 97 Collections: []string{"wanted.collection"}, 98 CursorStore: store, 99 Handler: HandlerFunc(func(ctx context.Context, event *jsmodels.Event) error { 100 t.Fatalf("handler should not be called") 101 return nil 102 }), 103 Logger: testLogger(), 104 } 105 106 if err := processor.HandleEvent(context.Background(), testCommit(456, "other.collection")); err != nil { 107 t.Fatalf("handle: %v", err) 108 } 109 want := int64(456) 110 requireMemoryCursor(t, store, &want) 111} 112 113func TestProcessorBadRecordAdvancesCursor(t *testing.T) { 114 store := NewMemoryCursorStore(nil) 115 processor := &Processor{ 116 Collections: []string{"example.collection"}, 117 CursorStore: store, 118 Handler: HandlerFunc(func(ctx context.Context, event *jsmodels.Event) error { 119 return BadRecord(errors.New("decode failed")) 120 }), 121 Logger: testLogger(), 122 } 123 124 if err := processor.HandleEvent(context.Background(), testCommit(789, "example.collection")); err != nil { 125 t.Fatalf("handle: %v", err) 126 } 127 want := int64(789) 128 requireMemoryCursor(t, store, &want) 129} 130 131func TestProcessorTransientErrorDoesNotAdvanceCursor(t *testing.T) { 132 cursor := int64(100) 133 store := NewMemoryCursorStore(&cursor) 134 transientErr := errors.New("database busy") 135 processor := &Processor{ 136 Collections: []string{"example.collection"}, 137 CursorStore: store, 138 Handler: HandlerFunc(func(ctx context.Context, event *jsmodels.Event) error { 139 return transientErr 140 }), 141 Logger: testLogger(), 142 } 143 144 err := processor.HandleEvent(context.Background(), testCommit(200, "example.collection")) 145 if !errors.Is(err, transientErr) { 146 t.Fatalf("handle error = %v, want %v", err, transientErr) 147 } 148 requireMemoryCursor(t, store, &cursor) 149}