Monorepo for Tangled
tangled.org
1package eventconsumer
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "log/slog"
9 "net/http"
10 "net/http/httptest"
11 "strings"
12 "sync"
13 "testing"
14 "time"
15
16 "tangled.org/core/eventconsumer/cursor"
17 "tangled.org/core/eventstream"
18 "tangled.org/core/notifier"
19)
20
21type memSrc struct {
22 mu sync.Mutex
23 events []eventstream.Event
24}
25
26func (s *memSrc) add(ev eventstream.Event) {
27 s.mu.Lock()
28 defer s.mu.Unlock()
29 s.events = append(s.events, ev)
30}
31
32func (s *memSrc) GetEvents(cursor int64, limit int) ([]eventstream.Event, error) {
33 s.mu.Lock()
34 defer s.mu.Unlock()
35 out := []eventstream.Event{}
36 for _, ev := range s.events {
37 if ev.Created > cursor {
38 out = append(out, ev)
39 if len(out) == limit {
40 break
41 }
42 }
43 }
44 return out, nil
45}
46
47func mkEv(i int) eventstream.Event {
48 return eventstream.Event{
49 Rkey: fmt.Sprintf("rk-%04d", i),
50 Nsid: "sh.tangled.test",
51 EventJson: json.RawMessage(fmt.Sprintf(`{"i":%d}`, i)),
52 Created: int64(i + 1),
53 }
54}
55
56func startEventServer(t *testing.T, src *memSrc) (Source, *notifier.Notifier) {
57 t.Helper()
58 n := notifier.New()
59 mux := http.NewServeMux()
60 mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
61 _ = eventstream.Stream(w, r, eventstream.StreamConfig{
62 Backend: src,
63 Notifier: &n,
64 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
65 BatchSize: 5,
66 MaxBatchesPerDrain: 100,
67 })
68 })
69 srv := httptest.NewServer(mux)
70 t.Cleanup(srv.Close)
71 addr := strings.TrimPrefix(srv.URL, "http://")
72 return Source{Kind: "test", Host: addr, NoTLS: true}, &n
73}
74
75func TestConsumer_DrainAdvancesCursor(t *testing.T) {
76 src := &memSrc{}
77 for i := range 8 {
78 src.add(mkEv(i))
79 }
80
81 source, _ := startEventServer(t, src)
82
83 store := &cursor.MemoryStore{}
84 seenMu := sync.Mutex{}
85 seen := []int64{}
86
87 cfg := ConsumerConfig{
88 ProcessFunc: func(ctx context.Context, _ Source, msg eventstream.Event) error {
89 seenMu.Lock()
90 seen = append(seen, msg.Created)
91 seenMu.Unlock()
92 return nil
93 },
94 WorkerCount: 1,
95 QueueSize: 16,
96 ConnectionTimeout: 2 * time.Second,
97 CursorStore: store,
98 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
99 }
100 c := NewConsumer(cfg)
101
102 ctx, cancel := context.WithCancel(context.Background())
103 defer cancel()
104
105 c.Start(ctx)
106 c.AddSource(ctx, source)
107
108 deadline := time.Now().Add(3 * time.Second)
109 for time.Now().Before(deadline) {
110 seenMu.Lock()
111 n := len(seen)
112 seenMu.Unlock()
113 if n >= 8 {
114 break
115 }
116 time.Sleep(20 * time.Millisecond)
117 }
118
119 seenMu.Lock()
120 defer seenMu.Unlock()
121 if len(seen) != 8 {
122 t.Fatalf("processed %d events, want 8: %v", len(seen), seen)
123 }
124 for i, got := range seen {
125 if got != int64(i+1) {
126 t.Fatalf("event %d: got created=%d want %d", i, got, i+1)
127 }
128 }
129
130 if final := store.Get(source.Key()); final != 8 {
131 t.Fatalf("cursor = %d, want 8", final)
132 }
133}
134
135func TestConsumer_CursorMonotonic_OutOfOrderWorkers(t *testing.T) {
136 src := &memSrc{}
137 for i := range 4 {
138 src.add(mkEv(i))
139 }
140
141 source, _ := startEventServer(t, src)
142
143 store := &cursor.MemoryStore{}
144
145 releaseFirst := make(chan struct{})
146 processed := make(chan int64, 4)
147
148 cfg := ConsumerConfig{
149 ProcessFunc: func(ctx context.Context, _ Source, msg eventstream.Event) error {
150 if msg.Created == 1 {
151 <-releaseFirst
152 }
153 processed <- msg.Created
154 return nil
155 },
156 WorkerCount: 4,
157 QueueSize: 16,
158 ConnectionTimeout: 2 * time.Second,
159 CursorStore: store,
160 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
161 }
162 c := NewConsumer(cfg)
163
164 ctx, cancel := context.WithCancel(context.Background())
165 defer cancel()
166
167 c.Start(ctx)
168 c.AddSource(ctx, source)
169
170 for range 3 {
171 select {
172 case <-processed:
173 case <-time.After(3 * time.Second):
174 t.Fatal("timed out waiting for events 2-4 to be processed")
175 }
176 }
177
178 if cur := store.Get(source.Key()); cur != 4 {
179 t.Fatalf("cursor before slow worker finished = %d, want 4", cur)
180 }
181
182 close(releaseFirst)
183 select {
184 case <-processed:
185 case <-time.After(3 * time.Second):
186 t.Fatal("timed out waiting for slow worker")
187 }
188
189 if cur := store.Get(source.Key()); cur != 4 {
190 t.Fatalf("cursor regressed after slow worker: %d, want 4", cur)
191 }
192}
193
194func TestConsumer_StopTerminatesWithoutCtxCancel(t *testing.T) {
195 src := &memSrc{}
196 source, _ := startEventServer(t, src)
197
198 cfg := ConsumerConfig{
199 ProcessFunc: func(ctx context.Context, _ Source, _ eventstream.Event) error { return nil },
200 WorkerCount: 2,
201 QueueSize: 8,
202 ConnectionTimeout: 2 * time.Second,
203 CursorStore: &cursor.MemoryStore{},
204 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
205 }
206 c := NewConsumer(cfg)
207
208 c.Start(context.Background())
209 c.AddSource(context.Background(), source)
210
211 done := make(chan struct{})
212 go func() {
213 c.Stop()
214 close(done)
215 }()
216
217 select {
218 case <-done:
219 case <-time.After(5 * time.Second):
220 t.Fatal("Stop did not return within 5s")
221 }
222}
223
224func TestConsumer_ResumesFromStoredCursor(t *testing.T) {
225 src := &memSrc{}
226 for i := range 5 {
227 src.add(mkEv(i))
228 }
229
230 source, _ := startEventServer(t, src)
231
232 store := &cursor.MemoryStore{}
233 store.Set(source.Key(), 3)
234
235 seenMu := sync.Mutex{}
236 seen := []int64{}
237
238 cfg := ConsumerConfig{
239 ProcessFunc: func(ctx context.Context, _ Source, msg eventstream.Event) error {
240 seenMu.Lock()
241 seen = append(seen, msg.Created)
242 seenMu.Unlock()
243 return nil
244 },
245 WorkerCount: 1,
246 QueueSize: 16,
247 ConnectionTimeout: 2 * time.Second,
248 CursorStore: store,
249 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
250 }
251 c := NewConsumer(cfg)
252
253 ctx, cancel := context.WithCancel(context.Background())
254 defer cancel()
255
256 c.Start(ctx)
257 c.AddSource(ctx, source)
258
259 deadline := time.Now().Add(3 * time.Second)
260 for time.Now().Before(deadline) {
261 seenMu.Lock()
262 n := len(seen)
263 seenMu.Unlock()
264 if n >= 2 {
265 break
266 }
267 time.Sleep(20 * time.Millisecond)
268 }
269
270 seenMu.Lock()
271 defer seenMu.Unlock()
272 if len(seen) < 2 {
273 t.Fatalf("processed %d events, want 2: %v", len(seen), seen)
274 }
275 if seen[0] != 4 || seen[1] != 5 {
276 t.Fatalf("resumed events = %v, want [4 5]", seen)
277 }
278}