Monorepo for Tangled
tangled.org
1package eventstream
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "log/slog"
8 "net/http"
9 "strconv"
10 "time"
11
12 "github.com/gorilla/websocket"
13 "tangled.org/core/notifier"
14)
15
16type Event struct {
17 Rkey string `json:"rkey"`
18 Nsid string `json:"nsid"`
19 EventJson json.RawMessage `json:"event"`
20 Created int64 `json:"created"`
21}
22
23type Backend interface {
24 GetEvents(cursor int64, limit int) ([]Event, error)
25}
26
27const (
28 defaultBatchSize = 100
29 defaultMaxBatchesPerDrain = 1_000
30 keepAliveInterval = 30 * time.Second
31 writeDeadline = 10 * time.Second
32)
33
34var ErrDrainCap = errors.New("eventstream: drain cap reached, reconnect to continue")
35
36var upgrader = websocket.Upgrader{
37 ReadBufferSize: 1024,
38 WriteBufferSize: 1024,
39}
40
41type StreamConfig struct {
42 Backend Backend
43 Notifier *notifier.Notifier
44 Logger *slog.Logger
45
46 BatchSize int
47 MaxBatchesPerDrain int
48}
49
50func (c *StreamConfig) batchSize() int {
51 if c.BatchSize > 0 {
52 return c.BatchSize
53 }
54 return defaultBatchSize
55}
56
57func (c *StreamConfig) maxBatchesPerDrain() int {
58 if c.MaxBatchesPerDrain > 0 {
59 return c.MaxBatchesPerDrain
60 }
61 return defaultMaxBatchesPerDrain
62}
63
64func Stream(w http.ResponseWriter, r *http.Request, cfg StreamConfig) error {
65 conn, err := upgrader.Upgrade(w, r, nil)
66 if err != nil {
67 return err
68 }
69 defer conn.Close()
70
71 var cursor int64
72 if raw := r.URL.Query().Get("cursor"); raw != "" {
73 parsed, perr := strconv.ParseInt(raw, 10, 64)
74 if perr != nil {
75 if cfg.Logger != nil {
76 cfg.Logger.Warn("invalid cursor, starting from head", "cursor", raw, "err", perr)
77 }
78 } else {
79 cursor = parsed
80 }
81 }
82
83 ch := cfg.Notifier.Subscribe()
84 defer cfg.Notifier.Unsubscribe(ch)
85
86 ctx, cancel := context.WithCancel(r.Context())
87 defer cancel()
88
89 go func() {
90 for {
91 if _, _, err := conn.NextReader(); err != nil {
92 cancel()
93 return
94 }
95 }
96 }()
97
98 drain := func() error {
99 err := drainUntilShort(conn, cfg, &cursor)
100 if errors.Is(err, ErrDrainCap) {
101 _ = conn.WriteControl(
102 websocket.CloseMessage,
103 websocket.FormatCloseMessage(websocket.CloseTryAgainLater, "drain cap reached, reconnect to continue"),
104 time.Now().Add(writeDeadline),
105 )
106 }
107 return err
108 }
109
110 if err := drain(); err != nil {
111 return err
112 }
113
114 for {
115 select {
116 case <-ctx.Done():
117 return nil
118 case <-ch:
119 if err := drain(); err != nil {
120 return err
121 }
122 case <-time.After(keepAliveInterval):
123 if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(writeDeadline)); err != nil {
124 return err
125 }
126 }
127 }
128}
129
130func drainUntilShort(conn *websocket.Conn, cfg StreamConfig, cursor *int64) error {
131 limit := cfg.batchSize()
132 for range cfg.maxBatchesPerDrain() {
133 n, err := streamBatch(conn, cfg, cursor)
134 if err != nil {
135 return err
136 }
137 if n < limit {
138 return nil
139 }
140 }
141 if cfg.Logger != nil {
142 cfg.Logger.Warn("drain hit batch cap", "cursor", *cursor, "cap", cfg.maxBatchesPerDrain())
143 }
144 return ErrDrainCap
145}
146
147func streamBatch(conn *websocket.Conn, cfg StreamConfig, cursor *int64) (int, error) {
148 events, err := cfg.Backend.GetEvents(*cursor, cfg.batchSize())
149 if err != nil {
150 return 0, err
151 }
152 for _, ev := range events {
153 msg, err := json.Marshal(ev)
154 if err != nil {
155 return 0, err
156 }
157 if err := conn.SetWriteDeadline(time.Now().Add(writeDeadline)); err != nil {
158 return 0, err
159 }
160 if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
161 return 0, err
162 }
163 *cursor = ev.Created
164 }
165 return len(events), nil
166}