Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

at main 5.3 kB View raw
1package jetstream 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "log/slog" 8 "time" 9 10 "github.com/bluesky-social/jetstream/pkg/client" 11 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 12) 13 14const ( 15 // DefaultRewind is the cursor safety buffer used on reconnect. Jetstream 16 // cursors are time-based, so reconnecting a few seconds before the saved 17 // cursor avoids exact-boundary gaps at the cost of harmless duplicate 18 // deliveries for idempotent handlers. 19 DefaultRewind = 5 * time.Second 20 21 // DefaultReconnectDelay is the pause between failed websocket reads. 22 DefaultReconnectDelay = 2 * time.Second 23) 24 25// Config configures a reusable jetstream consumer. Use Processor directly when 26// you already have events from another source and do not need websocket setup. 27type Config struct { 28 // WebsocketURL is the jetstream endpoint used by Consumer. 29 WebsocketURL string 30 31 // Collections is the set of record collection NSIDs to request from 32 // jetstream and process locally. An empty slice means "all collections", 33 // matching the upstream client behavior. 34 Collections []string 35 36 CursorStore CursorStore 37 Handler Handler 38 Logger *slog.Logger 39 40 // SchedulerIdent names the upstream sequential scheduler. It defaults to 41 // "jetstream". 42 SchedulerIdent string 43 44 // Rewind controls how far before a saved cursor the Consumer reconnects. 45 // Zero uses DefaultRewind. 46 Rewind time.Duration 47 48 // ReconnectDelay controls how long the Consumer waits before reconnecting 49 // after a failed read loop. Zero uses DefaultReconnectDelay. 50 ReconnectDelay time.Duration 51} 52 53// Consumer owns the upstream jetstream client and reconnect loop. 54type Consumer struct { 55 client *client.Client 56 processor *Processor 57 cursorStore CursorStore 58 logger *slog.Logger 59 rewind time.Duration 60 reconnectDelay time.Duration 61} 62 63// NewConsumer builds a Consumer. Call Run to enter the blocking reconnect loop, 64// or use Start to run it in a background goroutine. 65func NewConsumer(cfg Config) (*Consumer, error) { 66 if cfg.WebsocketURL == "" { 67 return nil, errors.New("websocket URL is required") 68 } 69 70 processor := &Processor{ 71 Collections: cfg.Collections, 72 CursorStore: cfg.CursorStore, 73 Handler: cfg.Handler, 74 Logger: cfg.Logger, 75 } 76 if err := processor.validate(); err != nil { 77 return nil, err 78 } 79 80 cfg = withDefaults(cfg) 81 clientCfg := client.DefaultClientConfig() 82 clientCfg.WebsocketURL = cfg.WebsocketURL 83 clientCfg.WantedCollections = append([]string(nil), cfg.Collections...) 84 85 c, err := client.NewClient( 86 clientCfg, 87 cfg.Logger, 88 sequential.NewScheduler(cfg.SchedulerIdent, cfg.Logger, processor.HandleEvent), 89 ) 90 if err != nil { 91 return nil, fmt.Errorf("new jetstream client: %w", err) 92 } 93 94 return &Consumer{ 95 client: c, 96 processor: processor, 97 cursorStore: cfg.CursorStore, 98 logger: cfg.Logger, 99 rewind: cfg.Rewind, 100 reconnectDelay: cfg.ReconnectDelay, 101 }, nil 102} 103 104// Start creates a Consumer and runs it in a background goroutine for the 105// lifetime of ctx. 106func Start(ctx context.Context, cfg Config) (*Consumer, error) { 107 c, err := NewConsumer(cfg) 108 if err != nil { 109 return nil, err 110 } 111 go c.Run(ctx) 112 return c, nil 113} 114 115// Run consumes jetstream until ctx is cancelled. Connection failures are logged 116// and retried after ReconnectDelay because the websocket is expected to be 117// long-lived but not permanent. 118func (c *Consumer) Run(ctx context.Context) { 119 for { 120 if ctx.Err() != nil { 121 return 122 } 123 124 cur, err := c.cursorStore.LoadCursor(ctx) 125 if err != nil { 126 c.logger.Warn("ignoring unreadable cursor; resuming from now", "err", err) 127 cur = nil 128 } 129 130 cursorForLog := cur 131 cursorForRead := c.rewindCursor(cur) 132 if cursorForRead != nil { 133 c.logger.Info("connecting to jetstream", 134 "cursor_us", *cursorForLog, 135 "rewound_us", *cursorForRead, 136 "rewind", c.rewind, 137 ) 138 } else { 139 c.logger.Info("connecting to jetstream from now (no cursor)") 140 } 141 142 if err := c.client.ConnectAndRead(ctx, cursorForRead); err != nil { 143 if ctx.Err() != nil { 144 return 145 } 146 c.logger.Error("jetstream read loop", "err", err) 147 select { 148 case <-ctx.Done(): 149 return 150 case <-time.After(c.reconnectDelay): 151 } 152 continue 153 } 154 if ctx.Err() != nil { 155 return 156 } 157 } 158} 159 160// rewindCursor returns the cursor value to hand to jetstream on reconnect. 161// Jetstream cursors are microsecond timestamps and exact-boundary replay is not 162// guaranteed, so we intentionally resume slightly before the last saved cursor. 163// The duplicate window is expected to be safe for idempotent handlers; clamping 164// at zero keeps brand-new or very-early cursors valid. 165func (c *Consumer) rewindCursor(cur *int64) *int64 { 166 if cur == nil { 167 return nil 168 } 169 rewound := *cur - int64(c.rewind/time.Microsecond) 170 if rewound < 0 { 171 rewound = 0 172 } 173 return &rewound 174} 175 176func withDefaults(cfg Config) Config { 177 cfg.Logger = loggerOrDefault(cfg.Logger) 178 if cfg.SchedulerIdent == "" { 179 cfg.SchedulerIdent = "jetstream" 180 } 181 if cfg.Rewind == 0 { 182 cfg.Rewind = DefaultRewind 183 } 184 if cfg.ReconnectDelay == 0 { 185 cfg.ReconnectDelay = DefaultReconnectDelay 186 } 187 return cfg 188} 189 190func loggerOrDefault(logger *slog.Logger) *slog.Logger { 191 if logger != nil { 192 return logger 193 } 194 return slog.Default() 195}