Stitch any CI into Tangled
1package jetstream
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "log/slog"
8 "time"
9
10 "github.com/bluesky-social/jetstream/pkg/client"
11 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
12)
13
const (
	// DefaultRewind is how far before the saved cursor a reconnect resumes
	// when no explicit rewind is configured. Jetstream cursors are
	// time-based, so backing up a few seconds guards against gaps at the
	// exact cursor boundary; the price is duplicate deliveries, which are
	// harmless for idempotent handlers.
	DefaultRewind = time.Second * 5

	// DefaultReconnectDelay is how long to wait after a failed websocket read
	// before connecting again.
	DefaultReconnectDelay = time.Second * 2
)
24
25// Config configures a reusable jetstream consumer. Use Processor directly when
26// you already have events from another source and do not need websocket setup.
27type Config struct {
28 // WebsocketURL is the jetstream endpoint used by Consumer.
29 WebsocketURL string
30
31 // Collections is the set of record collection NSIDs to request from
32 // jetstream and process locally. An empty slice means "all collections",
33 // matching the upstream client behavior.
34 Collections []string
35
36 CursorStore CursorStore
37 Handler Handler
38 Logger *slog.Logger
39
40 // SchedulerIdent names the upstream sequential scheduler. It defaults to
41 // "jetstream".
42 SchedulerIdent string
43
44 // Rewind controls how far before a saved cursor the Consumer reconnects.
45 // Zero uses DefaultRewind.
46 Rewind time.Duration
47
48 // ReconnectDelay controls how long the Consumer waits before reconnecting
49 // after a failed read loop. Zero uses DefaultReconnectDelay.
50 ReconnectDelay time.Duration
51}
52
53// Consumer owns the upstream jetstream client and reconnect loop.
54type Consumer struct {
55 client *client.Client
56 processor *Processor
57 cursorStore CursorStore
58 logger *slog.Logger
59 rewind time.Duration
60 reconnectDelay time.Duration
61}
62
63// NewConsumer builds a Consumer. Call Run to enter the blocking reconnect loop,
64// or use Start to run it in a background goroutine.
65func NewConsumer(cfg Config) (*Consumer, error) {
66 if cfg.WebsocketURL == "" {
67 return nil, errors.New("websocket URL is required")
68 }
69
70 processor := &Processor{
71 Collections: cfg.Collections,
72 CursorStore: cfg.CursorStore,
73 Handler: cfg.Handler,
74 Logger: cfg.Logger,
75 }
76 if err := processor.validate(); err != nil {
77 return nil, err
78 }
79
80 cfg = withDefaults(cfg)
81 clientCfg := client.DefaultClientConfig()
82 clientCfg.WebsocketURL = cfg.WebsocketURL
83 clientCfg.WantedCollections = append([]string(nil), cfg.Collections...)
84
85 c, err := client.NewClient(
86 clientCfg,
87 cfg.Logger,
88 sequential.NewScheduler(cfg.SchedulerIdent, cfg.Logger, processor.HandleEvent),
89 )
90 if err != nil {
91 return nil, fmt.Errorf("new jetstream client: %w", err)
92 }
93
94 return &Consumer{
95 client: c,
96 processor: processor,
97 cursorStore: cfg.CursorStore,
98 logger: cfg.Logger,
99 rewind: cfg.Rewind,
100 reconnectDelay: cfg.ReconnectDelay,
101 }, nil
102}
103
104// Start creates a Consumer and runs it in a background goroutine for the
105// lifetime of ctx.
106func Start(ctx context.Context, cfg Config) (*Consumer, error) {
107 c, err := NewConsumer(cfg)
108 if err != nil {
109 return nil, err
110 }
111 go c.Run(ctx)
112 return c, nil
113}
114
115// Run consumes jetstream until ctx is cancelled. Connection failures are logged
116// and retried after ReconnectDelay because the websocket is expected to be
117// long-lived but not permanent.
118func (c *Consumer) Run(ctx context.Context) {
119 for {
120 if ctx.Err() != nil {
121 return
122 }
123
124 cur, err := c.cursorStore.LoadCursor(ctx)
125 if err != nil {
126 c.logger.Warn("ignoring unreadable cursor; resuming from now", "err", err)
127 cur = nil
128 }
129
130 cursorForLog := cur
131 cursorForRead := c.rewindCursor(cur)
132 if cursorForRead != nil {
133 c.logger.Info("connecting to jetstream",
134 "cursor_us", *cursorForLog,
135 "rewound_us", *cursorForRead,
136 "rewind", c.rewind,
137 )
138 } else {
139 c.logger.Info("connecting to jetstream from now (no cursor)")
140 }
141
142 if err := c.client.ConnectAndRead(ctx, cursorForRead); err != nil {
143 if ctx.Err() != nil {
144 return
145 }
146 c.logger.Error("jetstream read loop", "err", err)
147 select {
148 case <-ctx.Done():
149 return
150 case <-time.After(c.reconnectDelay):
151 }
152 continue
153 }
154 if ctx.Err() != nil {
155 return
156 }
157 }
158}
159
160// rewindCursor returns the cursor value to hand to jetstream on reconnect.
161// Jetstream cursors are microsecond timestamps and exact-boundary replay is not
162// guaranteed, so we intentionally resume slightly before the last saved cursor.
163// The duplicate window is expected to be safe for idempotent handlers; clamping
164// at zero keeps brand-new or very-early cursors valid.
165func (c *Consumer) rewindCursor(cur *int64) *int64 {
166 if cur == nil {
167 return nil
168 }
169 rewound := *cur - int64(c.rewind/time.Microsecond)
170 if rewound < 0 {
171 rewound = 0
172 }
173 return &rewound
174}
175
176func withDefaults(cfg Config) Config {
177 cfg.Logger = loggerOrDefault(cfg.Logger)
178 if cfg.SchedulerIdent == "" {
179 cfg.SchedulerIdent = "jetstream"
180 }
181 if cfg.Rewind == 0 {
182 cfg.Rewind = DefaultRewind
183 }
184 if cfg.ReconnectDelay == 0 {
185 cfg.ReconnectDelay = DefaultReconnectDelay
186 }
187 return cfg
188}
189
// loggerOrDefault returns logger unchanged when it is set and falls back to
// slog.Default() when it is nil.
func loggerOrDefault(logger *slog.Logger) *slog.Logger {
	if logger == nil {
		return slog.Default()
	}
	return logger
}