Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package eventconsumer 2 3import ( 4 "context" 5 "encoding/json" 6 "log/slog" 7 "math/rand" 8 "net/url" 9 "sync" 10 "time" 11 12 "tangled.org/core/eventconsumer/cursor" 13 "tangled.org/core/log" 14 15 "github.com/avast/retry-go/v4" 16 "github.com/gorilla/websocket" 17) 18 19type ProcessFunc func(ctx context.Context, source Source, message Message) error 20 21type Message struct { 22 Rkey string 23 Nsid string 24 Created int64 `json:"created"` 25 EventJson json.RawMessage `json:"event"` 26} 27 28type ConsumerConfig struct { 29 Sources map[Source]struct{} 30 ProcessFunc ProcessFunc 31 RetryInterval time.Duration 32 MaxRetryInterval time.Duration 33 ConnectionTimeout time.Duration 34 WorkerCount int 35 QueueSize int 36 Logger *slog.Logger 37 Dev bool 38 CursorStore cursor.Store 39} 40 41func NewConsumerConfig() *ConsumerConfig { 42 return &ConsumerConfig{ 43 Sources: make(map[Source]struct{}), 44 } 45} 46 47type Source interface { 48 // url to start streaming events from 49 Url(cursor int64, dev bool) (*url.URL, error) 50 // cache key for cursor storage 51 Key() string 52} 53 54type Consumer struct { 55 wg sync.WaitGroup 56 dialer *websocket.Dialer 57 jobQueue chan job 58 logger *slog.Logger 59 randSource *rand.Rand 60 61 // sourcesMu guards sources. It must only be held for short, non-blocking 62 // map operations; never across a blocking call (dial, read, close). 63 sourcesMu sync.Mutex 64 sources map[Source]*sourceState 65 66 cfg ConsumerConfig 67} 68 69type sourceState struct { 70 cancel context.CancelFunc 71 conn *websocket.Conn 72} 73 74type job struct { 75 source Source 76 message []byte 77} 78 79func NewConsumer(cfg ConsumerConfig) *Consumer { 80 if cfg.RetryInterval == 0 { 81 cfg.RetryInterval = 15 * time.Minute 82 } 83 if cfg.ConnectionTimeout == 0 { 84 cfg.ConnectionTimeout = 10 * time.Second 85 } 86 if cfg.WorkerCount <= 0 { 87 cfg.WorkerCount = 5 88 } 89 if cfg.MaxRetryInterval == 0 { 90 cfg.MaxRetryInterval = 1 * time.Hour 91 } 92 if cfg.Logger == nil { 93 cfg.Logger = log.New("consumer") 94 } 95 if cfg.QueueSize == 0 { 96 cfg.QueueSize = 100 97 } 98 if cfg.CursorStore == nil { 99 cfg.CursorStore = &cursor.MemoryStore{} 100 } 101 return &Consumer{ 102 cfg: cfg, 103 dialer: websocket.DefaultDialer, 104 jobQueue: make(chan job, cfg.QueueSize), // buffered job queue 105 logger: cfg.Logger, 106 randSource: rand.New(rand.NewSource(time.Now().UnixNano())), 107 sources: make(map[Source]*sourceState), 108 } 109} 110 111func (c *Consumer) Start(ctx context.Context) { 112 c.cfg.Logger.Info("starting consumer", "config", c.cfg) 113 114 // start workers 115 for range c.cfg.WorkerCount { 116 c.wg.Add(1) 117 go c.worker(ctx) 118 } 119 120 // start streaming 121 for source := range c.cfg.Sources { 122 c.AddSource(ctx, source) 123 } 124} 125 126func (c *Consumer) Stop() { 127 // snapshot conns under lock so we don't hold sourcesMu across Close 128 c.sourcesMu.Lock() 129 conns := make([]*websocket.Conn, 0, len(c.sources)) 130 for _, st := range c.sources { 131 if st.conn != nil { 132 conns = append(conns, st.conn) 133 } 134 } 135 c.sourcesMu.Unlock() 136 137 for _, conn := range conns { 138 conn.Close() 139 } 140 141 c.wg.Wait() 142 close(c.jobQueue) 143} 144 145func (c *Consumer) AddSource(ctx context.Context, s Source) { 146 c.sourcesMu.Lock() 147 if _, ok := c.sources[s]; ok { 148 c.sourcesMu.Unlock() 149 c.logger.Info("source already present", "source", s) 150 return 151 } 152 srcCtx, cancel := context.WithCancel(ctx) 153 c.sources[s] = &sourceState{cancel: cancel} 154 c.sourcesMu.Unlock() 155 156 c.wg.Add(1) 157 go c.startConnectionLoop(srcCtx, s) 158} 159 160func (c *Consumer) RemoveSource(s Source) { 161 c.sourcesMu.Lock() 162 st, ok := c.sources[s] 163 if !ok { 164 c.sourcesMu.Unlock() 165 c.logger.Info("source not present", "source", s) 166 return 167 } 168 delete(c.sources, s) 169 cancel := st.cancel 170 conn := st.conn 171 c.sourcesMu.Unlock() 172 173 // release lock before any potentially blocking call 174 if cancel != nil { 175 cancel() 176 } 177 if conn != nil { 178 conn.Close() 179 } 180} 181 182func (c *Consumer) worker(ctx context.Context) { 183 defer c.wg.Done() 184 for { 185 select { 186 case <-ctx.Done(): 187 return 188 case j, ok := <-c.jobQueue: 189 if !ok { 190 return 191 } 192 193 var msg Message 194 err := json.Unmarshal(j.message, &msg) 195 if err != nil { 196 c.logger.Error("error deserializing message", "source", j.source.Key(), "err", err) 197 return 198 } 199 200 if err := c.cfg.ProcessFunc(ctx, j.source, msg); err != nil { 201 c.logger.Error("error processing message", "source", j.source, "err", err) 202 } 203 204 cursorVal := msg.Created 205 if cursorVal == 0 { 206 cursorVal = time.Now().UnixNano() 207 } 208 c.cfg.CursorStore.Set(j.source.Key(), cursorVal) 209 } 210 } 211} 212 213func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) { 214 defer c.wg.Done() 215 216 // attempt connection initially 217 err := c.runConnection(ctx, source) 218 if err != nil { 219 c.logger.Error("failed to run connection", "err", err) 220 } 221 222 timer := time.NewTimer(1 * time.Minute) 223 defer timer.Stop() 224 225 // every subsequent attempt is delayed by 1 minute 226 for { 227 select { 228 case <-ctx.Done(): 229 return 230 case <-timer.C: 231 err := c.runConnection(ctx, source) 232 if err != nil { 233 c.logger.Error("failed to run connection", "err", err) 234 } 235 timer.Reset(1 * time.Minute) 236 } 237 } 238} 239 240func (c *Consumer) runConnection(ctx context.Context, source Source) error { 241 cursor := c.cfg.CursorStore.Get(source.Key()) 242 243 u, err := source.Url(cursor, c.cfg.Dev) 244 if err != nil { 245 return err 246 } 247 248 c.logger.Info("connecting", "url", u.String()) 249 250 retryOpts := []retry.Option{ 251 retry.Attempts(0), // infinite attempts 252 retry.DelayType(retry.BackOffDelay), 253 retry.Delay(c.cfg.RetryInterval), 254 retry.MaxDelay(c.cfg.MaxRetryInterval), 255 retry.MaxJitter(c.cfg.RetryInterval / 5), 256 retry.OnRetry(func(n uint, err error) { 257 c.logger.Info("retrying connection", 258 "source", source, 259 "url", u.String(), 260 "attempt", n+1, 261 "err", err, 262 ) 263 }), 264 retry.Context(ctx), 265 } 266 267 var conn *websocket.Conn 268 269 err = retry.Do(func() error { 270 connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout) 271 defer cancel() 272 conn, _, err = c.dialer.DialContext(connCtx, u.String(), nil) 273 return err 274 }, retryOpts...) 275 if err != nil { 276 return err 277 } 278 279 // Register the conn. If the source was removed (or our ctx cancelled) 280 // while we were dialing, drop this conn instead of installing it. 281 c.sourcesMu.Lock() 282 st, ok := c.sources[source] 283 if !ok || ctx.Err() != nil { 284 c.sourcesMu.Unlock() 285 conn.Close() 286 if ctx.Err() != nil { 287 return ctx.Err() 288 } 289 return nil 290 } 291 st.conn = conn 292 c.sourcesMu.Unlock() 293 294 defer func() { 295 // Clear the conn from state, but only if it's still our conn (a 296 // concurrent RemoveSource may have already done it). 297 c.sourcesMu.Lock() 298 if st, ok := c.sources[source]; ok && st.conn == conn { 299 st.conn = nil 300 } 301 c.sourcesMu.Unlock() 302 conn.Close() 303 }() 304 305 c.logger.Info("connected", "source", source) 306 307 for { 308 select { 309 case <-ctx.Done(): 310 return nil 311 default: 312 msgType, msg, err := conn.ReadMessage() 313 if err != nil { 314 return err 315 } 316 if msgType != websocket.TextMessage { 317 continue 318 } 319 select { 320 case c.jobQueue <- job{source: source, message: msg}: 321 case <-ctx.Done(): 322 return nil 323 } 324 } 325 } 326}