Monorepo for Tangled
tangled.org
1package eventstream
2
3import (
4 "database/sql"
5 "encoding/json"
6 "sync"
7 "time"
8
9 "tangled.org/core/notifier"
10)
11
// Store is the small slice of the database/sql API that this package needs:
// one method for statements that return rows and one for statements that do
// not. Both *sql.DB and *sql.Tx provide these methods with these signatures.
type Store interface {
	// Query runs a statement that returns rows, binding args to its
	// placeholders.
	Query(query string, args ...any) (*sql.Rows, error)

	// Exec runs a statement that does not return rows, binding args to its
	// placeholders.
	Exec(query string, args ...any) (sql.Result, error)
}
16
// clockMu is locked by Insert for the whole call; it guards lastNanos.
var clockMu sync.Mutex

// lastNanos is the largest event timestamp (Unix nanoseconds) Insert has
// handled so far. Insert uses it to keep generated timestamps increasing.
var lastNanos int64
21
22func Insert(s Store, ev Event, n *notifier.Notifier) error {
23 clockMu.Lock()
24 defer clockMu.Unlock()
25
26 if ev.Created == 0 {
27 now := time.Now().UnixNano()
28 if now <= lastNanos {
29 now = lastNanos + 1
30 }
31 ev.Created = now
32 }
33 if ev.Created > lastNanos {
34 lastNanos = ev.Created
35 }
36
37 if _, err := s.Exec(
38 `insert into events (rkey, nsid, event, created) values (?, ?, ?, ?)`,
39 ev.Rkey,
40 ev.Nsid,
41 []byte(ev.EventJson),
42 ev.Created,
43 ); err != nil {
44 return err
45 }
46 n.NotifyAll()
47 return nil
48}
49
50func List(s Store, cursor int64, limit int) ([]Event, error) {
51 rows, err := s.Query(`
52 select rkey, nsid, event, created
53 from events
54 where created > ?
55 order by created asc
56 limit ?
57 `, cursor, limit)
58 if err != nil {
59 return nil, err
60 }
61 defer rows.Close()
62
63 var out []Event
64 for rows.Next() {
65 var ev Event
66 var eventJsonStr string
67 if err := rows.Scan(&ev.Rkey, &ev.Nsid, &eventJsonStr, &ev.Created); err != nil {
68 return nil, err
69 }
70 ev.EventJson = json.RawMessage(eventJsonStr)
71 out = append(out, ev)
72 }
73
74 if err := rows.Err(); err != nil {
75 return nil, err
76 }
77
78 return out, nil
79}