Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

at icy/lqyotq 3.5 kB View raw
1package eventstream 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "log/slog" 8 "net/http" 9 "strconv" 10 "time" 11 12 "github.com/gorilla/websocket" 13 "tangled.org/core/notifier" 14) 15 16type Event struct { 17 Rkey string `json:"rkey"` 18 Nsid string `json:"nsid"` 19 EventJson json.RawMessage `json:"event"` 20 Created int64 `json:"created"` 21} 22 23type Backend interface { 24 GetEvents(cursor int64, limit int) ([]Event, error) 25} 26 27const ( 28 defaultBatchSize = 100 29 defaultMaxBatchesPerDrain = 1_000 30 keepAliveInterval = 30 * time.Second 31 writeDeadline = 10 * time.Second 32) 33 34var ErrDrainCap = errors.New("eventstream: drain cap reached, reconnect to continue") 35 36var upgrader = websocket.Upgrader{ 37 ReadBufferSize: 1024, 38 WriteBufferSize: 1024, 39} 40 41type StreamConfig struct { 42 Backend Backend 43 Notifier *notifier.Notifier 44 Logger *slog.Logger 45 46 BatchSize int 47 MaxBatchesPerDrain int 48} 49 50func (c *StreamConfig) batchSize() int { 51 if c.BatchSize > 0 { 52 return c.BatchSize 53 } 54 return defaultBatchSize 55} 56 57func (c *StreamConfig) maxBatchesPerDrain() int { 58 if c.MaxBatchesPerDrain > 0 { 59 return c.MaxBatchesPerDrain 60 } 61 return defaultMaxBatchesPerDrain 62} 63 64func Stream(w http.ResponseWriter, r *http.Request, cfg StreamConfig) error { 65 conn, err := upgrader.Upgrade(w, r, nil) 66 if err != nil { 67 return err 68 } 69 defer conn.Close() 70 71 var cursor int64 72 if raw := r.URL.Query().Get("cursor"); raw != "" { 73 parsed, perr := strconv.ParseInt(raw, 10, 64) 74 if perr != nil { 75 if cfg.Logger != nil { 76 cfg.Logger.Warn("invalid cursor, starting from head", "cursor", raw, "err", perr) 77 } 78 } else { 79 cursor = parsed 80 } 81 } 82 83 ch := cfg.Notifier.Subscribe() 84 defer cfg.Notifier.Unsubscribe(ch) 85 86 ctx, cancel := context.WithCancel(r.Context()) 87 defer cancel() 88 89 go func() { 90 for { 91 if _, _, err := conn.NextReader(); err != nil { 92 cancel() 93 return 94 } 95 } 96 }() 97 98 drain := func() error { 99 err := drainUntilShort(conn, cfg, &cursor) 100 if errors.Is(err, ErrDrainCap) { 101 _ = conn.WriteControl( 102 websocket.CloseMessage, 103 websocket.FormatCloseMessage(websocket.CloseTryAgainLater, "drain cap reached, reconnect to continue"), 104 time.Now().Add(writeDeadline), 105 ) 106 } 107 return err 108 } 109 110 if err := drain(); err != nil { 111 return err 112 } 113 114 for { 115 select { 116 case <-ctx.Done(): 117 return nil 118 case <-ch: 119 if err := drain(); err != nil { 120 return err 121 } 122 case <-time.After(keepAliveInterval): 123 if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(writeDeadline)); err != nil { 124 return err 125 } 126 } 127 } 128} 129 130func drainUntilShort(conn *websocket.Conn, cfg StreamConfig, cursor *int64) error { 131 limit := cfg.batchSize() 132 for range cfg.maxBatchesPerDrain() { 133 n, err := streamBatch(conn, cfg, cursor) 134 if err != nil { 135 return err 136 } 137 if n < limit { 138 return nil 139 } 140 } 141 if cfg.Logger != nil { 142 cfg.Logger.Warn("drain hit batch cap", "cursor", *cursor, "cap", cfg.maxBatchesPerDrain()) 143 } 144 return ErrDrainCap 145} 146 147func streamBatch(conn *websocket.Conn, cfg StreamConfig, cursor *int64) (int, error) { 148 events, err := cfg.Backend.GetEvents(*cursor, cfg.batchSize()) 149 if err != nil { 150 return 0, err 151 } 152 for _, ev := range events { 153 msg, err := json.Marshal(ev) 154 if err != nil { 155 return 0, err 156 } 157 if err := conn.SetWriteDeadline(time.Now().Add(writeDeadline)); err != nil { 158 return 0, err 159 } 160 if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil { 161 return 0, err 162 } 163 *cursor = ev.Created 164 } 165 return len(events), nil 166}