Stitch any CI into Tangled
1package jetstream
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "log/slog"
8
9 jsmodels "github.com/bluesky-social/jetstream/pkg/models"
10)
11
12// Handler applies one commit event. Returning BadRecord(err) tells the
13// Processor that the event is permanently unusable and that the cursor should
14// still advance; any other error is treated as transient and leaves the cursor
15// unchanged for a later retry.
16type Handler interface {
17 HandleJetstreamEvent(context.Context, *jsmodels.Event) error
18}
19
20// HandlerFunc adapts a function to Handler.
21type HandlerFunc func(context.Context, *jsmodels.Event) error
22
23// HandleJetstreamEvent calls f(ctx, event).
24func (f HandlerFunc) HandleJetstreamEvent(ctx context.Context, event *jsmodels.Event) error {
25 return f(ctx, event)
26}
27
// Processor handles per-event policy that is independent of websocket IO: it
// filters commit events by collection, passes wanted commits to Handler, and
// persists progress through CursorStore once it is safe to move past an event.
//
// CursorStore and Handler must be set; HandleEvent returns an error for
// commit events when either is nil.
type Processor struct {
	// Collections is the set of record collection NSIDs to process locally. An
	// empty slice means "all collections". Entries must be non-empty strings.
	// Commits for collections not listed here are skipped, but the cursor
	// still advances past them.
	Collections []string

	// CursorStore persists progress after events are handled. Required.
	CursorStore CursorStore

	// Handler applies commit events that pass the collection filter. Required.
	Handler Handler

	// Logger receives apply errors and ignored-collection diagnostics. A nil
	// Logger uses slog.Default().
	Logger *slog.Logger
}
44
// HandleEvent applies a single jetstream event and records progress when it
// is safe to do so.
//
// Events that are nil, are not commits, or carry no commit payload are
// ignored and leave the cursor untouched. Commits for collections outside
// p.Collections are skipped (logged at debug level) and the cursor advances.
// Every other commit is passed to p.Handler:
//   - nil error: the cursor advances to event.TimeUS.
//   - BadRecord error: the failure is logged and the cursor still advances,
//     so one unusable record cannot block every later event.
//   - any other error: the failure is logged and returned unchanged, and the
//     cursor stays put so the event can be retried.
//
// Configuration problems (see validate) and cursor-store failures are also
// returned to the caller.
func (p *Processor) HandleEvent(ctx context.Context, event *jsmodels.Event) error {
	isCommit := event != nil && event.Kind == jsmodels.EventKindCommit && event.Commit != nil
	if !isCommit {
		return nil
	}
	if err := p.validate(); err != nil {
		return err
	}

	log := loggerOrDefault(p.Logger)

	ok, err := p.wantsCollection(event.Commit.Collection)
	switch {
	case err != nil:
		return err
	case !ok:
		log.Debug("ignoring unexpected collection",
			"collection", event.Commit.Collection)
		return p.saveCursor(ctx, event.TimeUS)
	}

	if handleErr := p.Handler.HandleJetstreamEvent(ctx, event); handleErr != nil {
		transient := !IsBadRecord(handleErr)
		log.Error("apply commit",
			"err", handleErr,
			"did", event.Did,
			"collection", event.Commit.Collection,
			"op", event.Commit.Operation,
			"rkey", event.Commit.RKey,
			"transient", transient,
		)
		if transient {
			// Keep the cursor where it is so this event is replayed later.
			return handleErr
		}
	}

	return p.saveCursor(ctx, event.TimeUS)
}
85
// saveCursor persists cursor through p.CursorStore. Any store failure is
// wrapped with %w under a "save cursor" prefix, so the underlying error stays
// reachable through errors.Is and errors.As.
func (p *Processor) saveCursor(ctx context.Context, cursor int64) error {
	err := p.CursorStore.SaveCursor(ctx, cursor)
	if err == nil {
		return nil
	}
	return fmt.Errorf("save cursor: %w", err)
}
92
// validate reports the first configuration problem that would stop p from
// processing commits. The checks run in this order: a missing CursorStore, a
// missing Handler, then an empty entry in Collections. It returns nil when the
// configuration is usable.
func (p *Processor) validate() error {
	switch {
	case p.CursorStore == nil:
		return errors.New("cursor store is required")
	case p.Handler == nil:
		return errors.New("handler is required")
	}
	for i := range p.Collections {
		if len(p.Collections[i]) == 0 {
			return errors.New("collection must not be empty")
		}
	}
	return nil
}
107
// wantsCollection reports whether commits for collection should be passed to
// the Handler. An empty Collections list accepts every collection. Entries
// are scanned in order, and an empty entry reached before a match is reported
// as an error. HandleEvent calls validate first, so on that path this check is
// a defensive duplicate.
func (p *Processor) wantsCollection(collection string) (bool, error) {
	if len(p.Collections) == 0 {
		return true, nil
	}
	for i := 0; i < len(p.Collections); i++ {
		switch p.Collections[i] {
		case "":
			return false, errors.New("collection must not be empty")
		case collection:
			return true, nil
		}
	}
	return false, nil
}
122
123// badRecordError marks a handler failure as caused by the record itself being
124// permanently unusable, e.g. malformed JSON or an unrecoverable schema
125// violation. Processors advance the cursor past these errors so one bad event
126// cannot stall every later event on restart.
127type badRecordError struct{ err error }
128
129func (e *badRecordError) Error() string { return e.err.Error() }
130func (e *badRecordError) Unwrap() error { return e.err }
131
132// BadRecord wraps err so Processor recognizes it as a permanent,
133// cursor-advancing failure. Do not use this for storage, network, or other
134// transient infrastructure failures.
135func BadRecord(err error) error {
136 if err == nil {
137 return nil
138 }
139 return &badRecordError{err: err}
140}
141
142// IsBadRecord reports whether err, or anything it wraps, came from BadRecord.
143func IsBadRecord(err error) bool {
144 var b *badRecordError
145 return errors.As(err, &b)
146}