Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

eventstream: shared events package, monotonic unix-nanos cursors

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
date (May 29, 2026, 2:50 PM +0300) commit f8c74ae4 parent 5794021d change-id mxnrkypx
+551
+166
eventstream/eventstream.go
··· 1 + package eventstream 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "errors" 7 + "log/slog" 8 + "net/http" 9 + "strconv" 10 + "time" 11 + 12 + "github.com/gorilla/websocket" 13 + "tangled.org/core/notifier" 14 + ) 15 + 16 + type Event struct { 17 + Rkey string `json:"rkey"` 18 + Nsid string `json:"nsid"` 19 + EventJson json.RawMessage `json:"event"` 20 + Created int64 `json:"created"` 21 + } 22 + 23 + type Backend interface { 24 + GetEvents(cursor int64, limit int) ([]Event, error) 25 + } 26 + 27 + const ( 28 + defaultBatchSize = 100 29 + defaultMaxBatchesPerDrain = 1_000 30 + keepAliveInterval = 30 * time.Second 31 + writeDeadline = 10 * time.Second 32 + ) 33 + 34 + var ErrDrainCap = errors.New("eventstream: drain cap reached, reconnect to continue") 35 + 36 + var upgrader = websocket.Upgrader{ 37 + ReadBufferSize: 1024, 38 + WriteBufferSize: 1024, 39 + } 40 + 41 + type StreamConfig struct { 42 + Backend Backend 43 + Notifier *notifier.Notifier 44 + Logger *slog.Logger 45 + 46 + BatchSize int 47 + MaxBatchesPerDrain int 48 + } 49 + 50 + func (c *StreamConfig) batchSize() int { 51 + if c.BatchSize > 0 { 52 + return c.BatchSize 53 + } 54 + return defaultBatchSize 55 + } 56 + 57 + func (c *StreamConfig) maxBatchesPerDrain() int { 58 + if c.MaxBatchesPerDrain > 0 { 59 + return c.MaxBatchesPerDrain 60 + } 61 + return defaultMaxBatchesPerDrain 62 + } 63 + 64 + func Stream(w http.ResponseWriter, r *http.Request, cfg StreamConfig) error { 65 + conn, err := upgrader.Upgrade(w, r, nil) 66 + if err != nil { 67 + return err 68 + } 69 + defer conn.Close() 70 + 71 + var cursor int64 72 + if raw := r.URL.Query().Get("cursor"); raw != "" { 73 + parsed, perr := strconv.ParseInt(raw, 10, 64) 74 + if perr != nil { 75 + if cfg.Logger != nil { 76 + cfg.Logger.Warn("invalid cursor, starting from head", "cursor", raw, "err", perr) 77 + } 78 + } else { 79 + cursor = parsed 80 + } 81 + } 82 + 83 + ch := cfg.Notifier.Subscribe() 84 + defer cfg.Notifier.Unsubscribe(ch) 85 + 86 + ctx, cancel := context.WithCancel(r.Context()) 87 + defer cancel() 88 + 89 + go func() { 90 + for { 91 + if _, _, err := conn.NextReader(); err != nil { 92 + cancel() 93 + return 94 + } 95 + } 96 + }() 97 + 98 + drain := func() error { 99 + err := drainUntilShort(conn, cfg, &cursor) 100 + if errors.Is(err, ErrDrainCap) { 101 + _ = conn.WriteControl( 102 + websocket.CloseMessage, 103 + websocket.FormatCloseMessage(websocket.CloseTryAgainLater, "drain cap reached, reconnect to continue"), 104 + time.Now().Add(writeDeadline), 105 + ) 106 + } 107 + return err 108 + } 109 + 110 + if err := drain(); err != nil { 111 + return err 112 + } 113 + 114 + for { 115 + select { 116 + case <-ctx.Done(): 117 + return nil 118 + case <-ch: 119 + if err := drain(); err != nil { 120 + return err 121 + } 122 + case <-time.After(keepAliveInterval): 123 + if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(writeDeadline)); err != nil { 124 + return err 125 + } 126 + } 127 + } 128 + } 129 + 130 + func drainUntilShort(conn *websocket.Conn, cfg StreamConfig, cursor *int64) error { 131 + limit := cfg.batchSize() 132 + for range cfg.maxBatchesPerDrain() { 133 + n, err := streamBatch(conn, cfg, cursor) 134 + if err != nil { 135 + return err 136 + } 137 + if n < limit { 138 + return nil 139 + } 140 + } 141 + if cfg.Logger != nil { 142 + cfg.Logger.Warn("drain hit batch cap", "cursor", *cursor, "cap", cfg.maxBatchesPerDrain()) 143 + } 144 + return ErrDrainCap 145 + } 146 + 147 + func streamBatch(conn *websocket.Conn, cfg StreamConfig, cursor *int64) (int, error) { 148 + events, err := cfg.Backend.GetEvents(*cursor, cfg.batchSize()) 149 + if err != nil { 150 + return 0, err 151 + } 152 + for _, ev := range events { 153 + msg, err := json.Marshal(ev) 154 + if err != nil { 155 + return 0, err 156 + } 157 + if err := conn.SetWriteDeadline(time.Now().Add(writeDeadline)); err != nil { 158 + return 0, err 159 + } 160 + if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil { 161 + return 0, err 162 + } 163 + *cursor = ev.Created 164 + } 165 + return len(events), nil 166 + }
+306
eventstream/eventstream_test.go
··· 1 + package eventstream 2 + 3 + import ( 4 + "database/sql" 5 + "encoding/json" 6 + "errors" 7 + "fmt" 8 + "io" 9 + "log/slog" 10 + "net/http" 11 + "net/http/httptest" 12 + "strconv" 13 + "strings" 14 + "sync" 15 + "testing" 16 + "time" 17 + 18 + "github.com/gorilla/websocket" 19 + _ "github.com/mattn/go-sqlite3" 20 + "tangled.org/core/notifier" 21 + ) 22 + 23 + type memSource struct { 24 + mu sync.Mutex 25 + events []Event 26 + } 27 + 28 + func (s *memSource) add(ev Event) { 29 + s.mu.Lock() 30 + defer s.mu.Unlock() 31 + s.events = append(s.events, ev) 32 + } 33 + 34 + func (s *memSource) GetEvents(cursor int64, limit int) ([]Event, error) { 35 + s.mu.Lock() 36 + defer s.mu.Unlock() 37 + out := []Event{} 38 + for _, ev := range s.events { 39 + if ev.Created > cursor { 40 + out = append(out, ev) 41 + if len(out) == limit { 42 + break 43 + } 44 + } 45 + } 46 + return out, nil 47 + } 48 + 49 + func mkEvent(i int) Event { 50 + return Event{ 51 + Rkey: fmt.Sprintf("rk-%04d", i), 52 + Nsid: "sh.tangled.test", 53 + EventJson: json.RawMessage(fmt.Sprintf(`{"i":%d}`, i)), 54 + Created: int64(i + 1), 55 + } 56 + } 57 + 58 + func startServer(t *testing.T, src Backend, cfg StreamConfig) (string, *notifier.Notifier, <-chan error) { 59 + t.Helper() 60 + n := notifier.New() 61 + cfg.Backend = src 62 + cfg.Notifier = &n 63 + cfg.Logger = slog.New(slog.NewTextHandler(io.Discard, nil)) 64 + 65 + errCh := make(chan error, 1) 66 + mux := http.NewServeMux() 67 + mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) { 68 + errCh <- Stream(w, r, cfg) 69 + }) 70 + srv := httptest.NewServer(mux) 71 + t.Cleanup(srv.Close) 72 + wsURL := "ws" + strings.TrimPrefix(srv.URL, "http") + "/events" 73 + return wsURL, &n, errCh 74 + } 75 + 76 + func dial(t *testing.T, wsURL string, cursor int64) *websocket.Conn { 77 + t.Helper() 78 + if cursor != 0 { 79 + wsURL += "?cursor=" + strconv.FormatInt(cursor, 10) 80 + } 81 + c, _, err := websocket.DefaultDialer.Dial(wsURL, nil) 82 + if err != nil { 83 + t.Fatalf("dial: %v", err) 84 + } 85 + t.Cleanup(func() { c.Close() }) 86 + return c 87 + } 88 + 89 + func readN(t *testing.T, c *websocket.Conn, n int) []Event { 90 + t.Helper() 91 + c.SetReadDeadline(time.Now().Add(2 * time.Second)) 92 + out := make([]Event, 0, n) 93 + for range n { 94 + _, msg, err := c.ReadMessage() 95 + if err != nil { 96 + t.Fatalf("read message at %d/%d: %v", len(out), n, err) 97 + } 98 + var ev Event 99 + if err := json.Unmarshal(msg, &ev); err != nil { 100 + t.Fatalf("unmarshal: %v", err) 101 + } 102 + out = append(out, ev) 103 + } 104 + return out 105 + } 106 + 107 + func TestStream_DrainStopsOnShortBatch(t *testing.T) { 108 + src := &memSource{} 109 + for i := range 7 { 110 + src.add(mkEvent(i)) 111 + } 112 + 113 + wsURL, _, errCh := startServer(t, src, StreamConfig{ 114 + BatchSize: 3, 115 + MaxBatchesPerDrain: 10, 116 + }) 117 + c := dial(t, wsURL, 0) 118 + 119 + got := readN(t, c, 7) 120 + for i, ev := range got { 121 + if ev.Created != int64(i+1) { 122 + t.Fatalf("event %d: got created=%d", i, ev.Created) 123 + } 124 + } 125 + 126 + c.Close() 127 + select { 128 + case err := <-errCh: 129 + if err != nil && !isCloseErr(err) { 130 + t.Fatalf("server error: %v", err) 131 + } 132 + case <-time.After(2 * time.Second): 133 + t.Fatal("server did not exit") 134 + } 135 + } 136 + 137 + func TestStream_DrainHitsCap_ReturnsErrDrainCap(t *testing.T) { 138 + src := &memSource{} 139 + for i := range 5 { 140 + src.add(mkEvent(i)) 141 + } 142 + 143 + wsURL, _, errCh := startServer(t, src, StreamConfig{ 144 + BatchSize: 2, 145 + MaxBatchesPerDrain: 2, 146 + }) 147 + c := dial(t, wsURL, 0) 148 + 149 + got := readN(t, c, 4) 150 + if len(got) != 4 { 151 + t.Fatalf("want 4 events before cap, got %d", len(got)) 152 + } 153 + if got[3].Created != 4 { 154 + t.Fatalf("last delivered created = %d, want 4", got[3].Created) 155 + } 156 + 157 + select { 158 + case err := <-errCh: 159 + if !errors.Is(err, ErrDrainCap) { 160 + t.Fatalf("want ErrDrainCap, got %v", err) 161 + } 162 + case <-time.After(2 * time.Second): 163 + t.Fatal("server did not return cap error") 164 + } 165 + } 166 + 167 + func TestStream_CursorResume(t *testing.T) { 168 + src := &memSource{} 169 + for i := range 5 { 170 + src.add(mkEvent(i)) 171 + } 172 + 173 + wsURL, _, errCh := startServer(t, src, StreamConfig{ 174 + BatchSize: 10, 175 + MaxBatchesPerDrain: 10, 176 + }) 177 + c := dial(t, wsURL, 3) 178 + 179 + got := readN(t, c, 2) 180 + if got[0].Created != 4 || got[1].Created != 5 { 181 + t.Fatalf("resume from cursor: got %d,%d want 4,5", got[0].Created, got[1].Created) 182 + } 183 + 184 + c.Close() 185 + <-errCh 186 + } 187 + 188 + func TestStream_LiveDelivery(t *testing.T) { 189 + src := &memSource{} 190 + 191 + wsURL, n, errCh := startServer(t, src, StreamConfig{ 192 + BatchSize: 10, 193 + MaxBatchesPerDrain: 10, 194 + }) 195 + c := dial(t, wsURL, 0) 196 + 197 + src.add(mkEvent(42)) 198 + n.NotifyAll() 199 + 200 + got := readN(t, c, 1) 201 + if got[0].Created != 43 { 202 + t.Fatalf("live event created = %d, want 43", got[0].Created) 203 + } 204 + 205 + c.Close() 206 + <-errCh 207 + } 208 + 209 + func TestStream_LiveBurstExceedsBatchSize_DrainsAll(t *testing.T) { 210 + src := &memSource{} 211 + 212 + wsURL, n, errCh := startServer(t, src, StreamConfig{ 213 + BatchSize: 5, 214 + MaxBatchesPerDrain: 100, 215 + }) 216 + c := dial(t, wsURL, 0) 217 + 218 + const burst = 17 219 + for i := range burst { 220 + src.add(mkEvent(i)) 221 + } 222 + n.NotifyAll() 223 + 224 + got := readN(t, c, burst) 225 + if len(got) != burst { 226 + t.Fatalf("got %d events, want %d", len(got), burst) 227 + } 228 + for i, ev := range got { 229 + if ev.Created != int64(i+1) { 230 + t.Fatalf("event %d: got created=%d want %d", i, ev.Created, i+1) 231 + } 232 + } 233 + 234 + c.Close() 235 + <-errCh 236 + } 237 + 238 + func TestInsert_MonotonicCreatedUnderConcurrency(t *testing.T) { 239 + db, err := sql.Open("sqlite3", t.TempDir()+"/events.db") 240 + if err != nil { 241 + t.Fatalf("open: %v", err) 242 + } 243 + t.Cleanup(func() { db.Close() }) 244 + if _, err := db.Exec(`create table events ( 245 + rkey text not null, 246 + nsid text not null, 247 + event text not null, 248 + created integer not null, 249 + primary key (rkey, nsid) 250 + )`); err != nil { 251 + t.Fatalf("schema: %v", err) 252 + } 253 + 254 + n := notifier.New() 255 + const total = 300 256 + var wg sync.WaitGroup 257 + for i := range total { 258 + wg.Add(1) 259 + go func(i int) { 260 + defer wg.Done() 261 + if err := Insert(db, Event{ 262 + Rkey: fmt.Sprintf("rk-%d", i), 263 + Nsid: "sh.tangled.test", 264 + EventJson: json.RawMessage("{}"), 265 + }, &n); err != nil { 266 + t.Errorf("insert %d: %v", i, err) 267 + } 268 + }(i) 269 + } 270 + wg.Wait() 271 + 272 + rows, err := db.Query(`select created from events order by created asc`) 273 + if err != nil { 274 + t.Fatalf("read: %v", err) 275 + } 276 + defer rows.Close() 277 + 278 + var prev int64 279 + count := 0 280 + for rows.Next() { 281 + var c int64 282 + if err := rows.Scan(&c); err != nil { 283 + t.Fatalf("scan: %v", err) 284 + } 285 + if count > 0 && c <= prev { 286 + t.Fatalf("created not strictly increasing: %d <= %d", c, prev) 287 + } 288 + prev = c 289 + count++ 290 + } 291 + if count != total { 292 + t.Fatalf("got %d rows, want %d", count, total) 293 + } 294 + } 295 + 296 + func isCloseErr(err error) bool { 297 + if err == nil { 298 + return false 299 + } 300 + if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { 301 + return true 302 + } 303 + return strings.Contains(err.Error(), "use of closed network connection") || 304 + strings.Contains(err.Error(), "websocket: close") || 305 + strings.Contains(err.Error(), "broken pipe") 306 + }
+79
eventstream/store.go
··· 1 + package eventstream 2 + 3 + import ( 4 + "database/sql" 5 + "encoding/json" 6 + "sync" 7 + "time" 8 + 9 + "tangled.org/core/notifier" 10 + ) 11 + 12 + type Store interface { 13 + Exec(query string, args ...any) (sql.Result, error) 14 + Query(query string, args ...any) (*sql.Rows, error) 15 + } 16 + 17 + var ( 18 + clockMu sync.Mutex 19 + lastNanos int64 20 + ) 21 + 22 + func Insert(s Store, ev Event, n *notifier.Notifier) error { 23 + clockMu.Lock() 24 + defer clockMu.Unlock() 25 + 26 + if ev.Created == 0 { 27 + now := time.Now().UnixNano() 28 + if now <= lastNanos { 29 + now = lastNanos + 1 30 + } 31 + ev.Created = now 32 + } 33 + if ev.Created > lastNanos { 34 + lastNanos = ev.Created 35 + } 36 + 37 + if _, err := s.Exec( 38 + `insert into events (rkey, nsid, event, created) values (?, ?, ?, ?)`, 39 + ev.Rkey, 40 + ev.Nsid, 41 + []byte(ev.EventJson), 42 + ev.Created, 43 + ); err != nil { 44 + return err 45 + } 46 + n.NotifyAll() 47 + return nil 48 + } 49 + 50 + func List(s Store, cursor int64, limit int) ([]Event, error) { 51 + rows, err := s.Query(` 52 + select rkey, nsid, event, created 53 + from events 54 + where created > ? 55 + order by created asc 56 + limit ? 57 + `, cursor, limit) 58 + if err != nil { 59 + return nil, err 60 + } 61 + defer rows.Close() 62 + 63 + var out []Event 64 + for rows.Next() { 65 + var ev Event 66 + var eventJsonStr string 67 + if err := rows.Scan(&ev.Rkey, &ev.Nsid, &eventJsonStr, &ev.Created); err != nil { 68 + return nil, err 69 + } 70 + ev.EventJson = json.RawMessage(eventJsonStr) 71 + out = append(out, ev) 72 + } 73 + 74 + if err := rows.Err(); err != nil { 75 + return nil, err 76 + } 77 + 78 + return out, nil 79 + }