Monorepo for Tangled
tangled.org
1package eventconsumer
2
3import (
4 "context"
5 "encoding/json"
6 "log/slog"
7 "math/rand"
8 "net/url"
9 "sync"
10 "time"
11
12 "tangled.org/core/eventconsumer/cursor"
13 "tangled.org/core/log"
14
15 "github.com/avast/retry-go/v4"
16 "github.com/gorilla/websocket"
17)
18
// ProcessFunc handles one decoded Message that arrived from source.
//
// It is invoked from every worker goroutine, so it may run concurrently with
// itself. A returned error is only logged by the worker; it does not stop the
// consumer and does not prevent the source's cursor from being advanced.
type ProcessFunc func(ctx context.Context, source Source, message Message) error
20
// Message is the envelope decoded from each text frame read off a source's
// websocket. The event payload is kept as raw JSON (EventJson) for the
// ProcessFunc to decode.
//
// Rkey and Nsid carry no json tags, so encoding/json matches them against
// incoming object keys case-insensitively (e.g. "rkey", "nsid").
//
// After processing, the worker stores Created as the source's cursor; a zero
// Created is replaced with time.Now().UnixNano(), so Created is presumably
// Unix nanoseconds as well — TODO confirm with the event producer.
type Message struct {
	Rkey      string
	Nsid      string
	Created   int64           `json:"created"`
	EventJson json.RawMessage `json:"event"`
}
27
// ConsumerConfig configures a Consumer. NewConsumer fills in defaults for the
// unset tuning fields listed below.
//
//   - Sources: sources that Start connects to (via AddSource).
//   - ProcessFunc: called by the workers for every decoded Message. It has no
//     default and is called unconditionally, so it must be set.
//   - RetryInterval: initial delay between dial attempts (default 15m); one
//     fifth of it is also used as the retry jitter bound.
//   - MaxRetryInterval: cap on the backoff delay (default 1h).
//   - ConnectionTimeout: timeout for a single dial attempt (default 10s).
//   - WorkerCount: number of worker goroutines (default 5).
//   - QueueSize: capacity of the buffered job queue (default 100).
//   - Logger: defaults to log.New("consumer").
//   - Dev: passed through to Source.Url.
//   - CursorStore: per-source cursor storage, keyed by Source.Key();
//     defaults to an in-memory cursor.MemoryStore.
type ConsumerConfig struct {
	Sources           map[Source]struct{}
	ProcessFunc       ProcessFunc
	RetryInterval     time.Duration
	MaxRetryInterval  time.Duration
	ConnectionTimeout time.Duration
	WorkerCount       int
	QueueSize         int
	Logger            *slog.Logger
	Dev               bool
	CursorStore       cursor.Store
}
40
41func NewConsumerConfig() *ConsumerConfig {
42 return &ConsumerConfig{
43 Sources: make(map[Source]struct{}),
44 }
45}
46
// Source is an event stream the consumer can connect to.
//
// Source values are used as map keys (ConsumerConfig.Sources and the
// consumer's internal source table), so implementations must be comparable;
// a non-comparable dynamic type would panic when inserted.
type Source interface {
	// Url returns the URL to start streaming events from. cursor is the value
	// last stored under Key() in the CursorStore (how it is used — e.g. to
	// resume the stream — is up to the implementation); dev is
	// ConsumerConfig.Dev.
	Url(cursor int64, dev bool) (*url.URL, error)
	// Key returns the identifier under which this source's cursor is stored
	// in, and read back from, the CursorStore.
	Key() string
}
53
// Consumer streams events from a dynamic set of Sources over websockets and
// fans each text frame out to a pool of worker goroutines, which decode it and
// call ConsumerConfig.ProcessFunc. Build one with NewConsumer, then Start it.
//
// wg tracks both the workers and the per-source connection loops. jobQueue is
// the buffered channel between connection loops (senders) and workers
// (receivers).
//
// NOTE(review): randSource is initialised in NewConsumer but not read by any
// code visible in this file; *rand.Rand is also not safe for concurrent use.
type Consumer struct {
	wg         sync.WaitGroup
	dialer     *websocket.Dialer
	jobQueue   chan job
	logger     *slog.Logger
	randSource *rand.Rand

	// sourcesMu guards sources. It must only be held for short, non-blocking
	// map operations; never across a blocking call (dial, read, close).
	sourcesMu sync.Mutex
	sources   map[Source]*sourceState

	cfg ConsumerConfig
}
68
// sourceState is the consumer's bookkeeping for one registered source.
// cancel stops that source's connection loop; conn is the currently open
// websocket, or nil while disconnected or dialing. Both fields are read and
// written only while holding Consumer.sourcesMu.
type sourceState struct {
	cancel context.CancelFunc
	conn   *websocket.Conn
}
73
// job is one raw websocket text frame queued for a worker, tagged with the
// source it was read from.
type job struct {
	source  Source
	message []byte
}
78
79func NewConsumer(cfg ConsumerConfig) *Consumer {
80 if cfg.RetryInterval == 0 {
81 cfg.RetryInterval = 15 * time.Minute
82 }
83 if cfg.ConnectionTimeout == 0 {
84 cfg.ConnectionTimeout = 10 * time.Second
85 }
86 if cfg.WorkerCount <= 0 {
87 cfg.WorkerCount = 5
88 }
89 if cfg.MaxRetryInterval == 0 {
90 cfg.MaxRetryInterval = 1 * time.Hour
91 }
92 if cfg.Logger == nil {
93 cfg.Logger = log.New("consumer")
94 }
95 if cfg.QueueSize == 0 {
96 cfg.QueueSize = 100
97 }
98 if cfg.CursorStore == nil {
99 cfg.CursorStore = &cursor.MemoryStore{}
100 }
101 return &Consumer{
102 cfg: cfg,
103 dialer: websocket.DefaultDialer,
104 jobQueue: make(chan job, cfg.QueueSize), // buffered job queue
105 logger: cfg.Logger,
106 randSource: rand.New(rand.NewSource(time.Now().UnixNano())),
107 sources: make(map[Source]*sourceState),
108 }
109}
110
111func (c *Consumer) Start(ctx context.Context) {
112 c.cfg.Logger.Info("starting consumer", "config", c.cfg)
113
114 // start workers
115 for range c.cfg.WorkerCount {
116 c.wg.Add(1)
117 go c.worker(ctx)
118 }
119
120 // start streaming
121 for source := range c.cfg.Sources {
122 c.AddSource(ctx, source)
123 }
124}
125
126func (c *Consumer) Stop() {
127 // snapshot conns under lock so we don't hold sourcesMu across Close
128 c.sourcesMu.Lock()
129 conns := make([]*websocket.Conn, 0, len(c.sources))
130 for _, st := range c.sources {
131 if st.conn != nil {
132 conns = append(conns, st.conn)
133 }
134 }
135 c.sourcesMu.Unlock()
136
137 for _, conn := range conns {
138 conn.Close()
139 }
140
141 c.wg.Wait()
142 close(c.jobQueue)
143}
144
145func (c *Consumer) AddSource(ctx context.Context, s Source) {
146 c.sourcesMu.Lock()
147 if _, ok := c.sources[s]; ok {
148 c.sourcesMu.Unlock()
149 c.logger.Info("source already present", "source", s)
150 return
151 }
152 srcCtx, cancel := context.WithCancel(ctx)
153 c.sources[s] = &sourceState{cancel: cancel}
154 c.sourcesMu.Unlock()
155
156 c.wg.Add(1)
157 go c.startConnectionLoop(srcCtx, s)
158}
159
160func (c *Consumer) RemoveSource(s Source) {
161 c.sourcesMu.Lock()
162 st, ok := c.sources[s]
163 if !ok {
164 c.sourcesMu.Unlock()
165 c.logger.Info("source not present", "source", s)
166 return
167 }
168 delete(c.sources, s)
169 cancel := st.cancel
170 conn := st.conn
171 c.sourcesMu.Unlock()
172
173 // release lock before any potentially blocking call
174 if cancel != nil {
175 cancel()
176 }
177 if conn != nil {
178 conn.Close()
179 }
180}
181
182func (c *Consumer) worker(ctx context.Context) {
183 defer c.wg.Done()
184 for {
185 select {
186 case <-ctx.Done():
187 return
188 case j, ok := <-c.jobQueue:
189 if !ok {
190 return
191 }
192
193 var msg Message
194 err := json.Unmarshal(j.message, &msg)
195 if err != nil {
196 c.logger.Error("error deserializing message", "source", j.source.Key(), "err", err)
197 return
198 }
199
200 if err := c.cfg.ProcessFunc(ctx, j.source, msg); err != nil {
201 c.logger.Error("error processing message", "source", j.source, "err", err)
202 }
203
204 cursorVal := msg.Created
205 if cursorVal == 0 {
206 cursorVal = time.Now().UnixNano()
207 }
208 c.cfg.CursorStore.Set(j.source.Key(), cursorVal)
209 }
210 }
211}
212
213func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) {
214 defer c.wg.Done()
215
216 // attempt connection initially
217 err := c.runConnection(ctx, source)
218 if err != nil {
219 c.logger.Error("failed to run connection", "err", err)
220 }
221
222 timer := time.NewTimer(1 * time.Minute)
223 defer timer.Stop()
224
225 // every subsequent attempt is delayed by 1 minute
226 for {
227 select {
228 case <-ctx.Done():
229 return
230 case <-timer.C:
231 err := c.runConnection(ctx, source)
232 if err != nil {
233 c.logger.Error("failed to run connection", "err", err)
234 }
235 timer.Reset(1 * time.Minute)
236 }
237 }
238}
239
240func (c *Consumer) runConnection(ctx context.Context, source Source) error {
241 cursor := c.cfg.CursorStore.Get(source.Key())
242
243 u, err := source.Url(cursor, c.cfg.Dev)
244 if err != nil {
245 return err
246 }
247
248 c.logger.Info("connecting", "url", u.String())
249
250 retryOpts := []retry.Option{
251 retry.Attempts(0), // infinite attempts
252 retry.DelayType(retry.BackOffDelay),
253 retry.Delay(c.cfg.RetryInterval),
254 retry.MaxDelay(c.cfg.MaxRetryInterval),
255 retry.MaxJitter(c.cfg.RetryInterval / 5),
256 retry.OnRetry(func(n uint, err error) {
257 c.logger.Info("retrying connection",
258 "source", source,
259 "url", u.String(),
260 "attempt", n+1,
261 "err", err,
262 )
263 }),
264 retry.Context(ctx),
265 }
266
267 var conn *websocket.Conn
268
269 err = retry.Do(func() error {
270 connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
271 defer cancel()
272 conn, _, err = c.dialer.DialContext(connCtx, u.String(), nil)
273 return err
274 }, retryOpts...)
275 if err != nil {
276 return err
277 }
278
279 // Register the conn. If the source was removed (or our ctx cancelled)
280 // while we were dialing, drop this conn instead of installing it.
281 c.sourcesMu.Lock()
282 st, ok := c.sources[source]
283 if !ok || ctx.Err() != nil {
284 c.sourcesMu.Unlock()
285 conn.Close()
286 if ctx.Err() != nil {
287 return ctx.Err()
288 }
289 return nil
290 }
291 st.conn = conn
292 c.sourcesMu.Unlock()
293
294 defer func() {
295 // Clear the conn from state, but only if it's still our conn (a
296 // concurrent RemoveSource may have already done it).
297 c.sourcesMu.Lock()
298 if st, ok := c.sources[source]; ok && st.conn == conn {
299 st.conn = nil
300 }
301 c.sourcesMu.Unlock()
302 conn.Close()
303 }()
304
305 c.logger.Info("connected", "source", source)
306
307 for {
308 select {
309 case <-ctx.Done():
310 return nil
311 default:
312 msgType, msg, err := conn.ReadMessage()
313 if err != nil {
314 return err
315 }
316 if msgType != websocket.TextMessage {
317 continue
318 }
319 select {
320 case c.jobQueue <- job{source: source, message: msg}:
321 case <-ctx.Done():
322 return nil
323 }
324 }
325 }
326}