Stitch any CI into Tangled
3

Configure Feed

Select the types of activity you want to include in your feed.

at main 4.1 kB View raw
1package jetstream 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "log/slog" 8 9 jsmodels "github.com/bluesky-social/jetstream/pkg/models" 10) 11 12// Handler applies one commit event. Returning BadRecord(err) tells the 13// Processor that the event is permanently unusable and that the cursor should 14// still advance; any other error is treated as transient and leaves the cursor 15// unchanged for a later retry. 16type Handler interface { 17 HandleJetstreamEvent(context.Context, *jsmodels.Event) error 18} 19 20// HandlerFunc adapts a function to Handler. 21type HandlerFunc func(context.Context, *jsmodels.Event) error 22 23// HandleJetstreamEvent calls f(ctx, event). 24func (f HandlerFunc) HandleJetstreamEvent(ctx context.Context, event *jsmodels.Event) error { 25 return f(ctx, event) 26} 27 28// Processor handles per-event policy that is independent of websocket IO. 29type Processor struct { 30 // Collections is the set of record collection NSIDs to process locally. An 31 // empty slice means "all collections". 32 Collections []string 33 34 // CursorStore persists progress after events are handled. 35 CursorStore CursorStore 36 37 // Handler applies commit events that pass the collection filter. 38 Handler Handler 39 40 // Logger receives apply errors and ignored-collection diagnostics. A nil 41 // Logger uses slog.Default(). 42 Logger *slog.Logger 43} 44 45// HandleEvent applies a single jetstream event and advances the cursor when it 46// is safe to do so. 47func (p *Processor) HandleEvent(ctx context.Context, event *jsmodels.Event) error { 48 if event == nil || event.Kind != jsmodels.EventKindCommit || event.Commit == nil { 49 return nil 50 } 51 52 if err := p.validate(); err != nil { 53 return err 54 } 55 56 logger := loggerOrDefault(p.Logger) 57 wanted, err := p.wantsCollection(event.Commit.Collection) 58 if err != nil { 59 return err 60 } 61 if !wanted { 62 logger.Debug("ignoring unexpected collection", 63 "collection", event.Commit.Collection) 64 return p.saveCursor(ctx, event.TimeUS) 65 } 66 67 applyErr := p.Handler.HandleJetstreamEvent(ctx, event) 68 if applyErr != nil { 69 logger.Error("apply commit", 70 "err", applyErr, 71 "did", event.Did, 72 "collection", event.Commit.Collection, 73 "op", event.Commit.Operation, 74 "rkey", event.Commit.RKey, 75 "transient", !IsBadRecord(applyErr), 76 ) 77 78 if !IsBadRecord(applyErr) { 79 return applyErr 80 } 81 } 82 83 return p.saveCursor(ctx, event.TimeUS) 84} 85 86func (p *Processor) saveCursor(ctx context.Context, cursor int64) error { 87 if err := p.CursorStore.SaveCursor(ctx, cursor); err != nil { 88 return fmt.Errorf("save cursor: %w", err) 89 } 90 return nil 91} 92 93func (p *Processor) validate() error { 94 if p.CursorStore == nil { 95 return errors.New("cursor store is required") 96 } 97 if p.Handler == nil { 98 return errors.New("handler is required") 99 } 100 for _, collection := range p.Collections { 101 if collection == "" { 102 return errors.New("collection must not be empty") 103 } 104 } 105 return nil 106} 107 108func (p *Processor) wantsCollection(collection string) (bool, error) { 109 if len(p.Collections) == 0 { 110 return true, nil 111 } 112 for _, wanted := range p.Collections { 113 if wanted == "" { 114 return false, errors.New("collection must not be empty") 115 } 116 if wanted == collection { 117 return true, nil 118 } 119 } 120 return false, nil 121} 122 123// badRecordError marks a handler failure as caused by the record itself being 124// permanently unusable, e.g. malformed JSON or an unrecoverable schema 125// violation. Processors advance the cursor past these errors so one bad event 126// cannot stall every later event on restart. 127type badRecordError struct{ err error } 128 129func (e *badRecordError) Error() string { return e.err.Error() } 130func (e *badRecordError) Unwrap() error { return e.err } 131 132// BadRecord wraps err so Processor recognizes it as a permanent, 133// cursor-advancing failure. Do not use this for storage, network, or other 134// transient infrastructure failures. 135func BadRecord(err error) error { 136 if err == nil { 137 return nil 138 } 139 return &badRecordError{err: err} 140} 141 142// IsBadRecord reports whether err, or anything it wraps, came from BadRecord. 143func IsBadRecord(err error) bool { 144 var b *badRecordError 145 return errors.As(err, &b) 146}