Monorepo for Tangled
tangled.org
1package eventconsumer
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "log/slog"
9 "net/http"
10 "net/http/httptest"
11 "strings"
12 "sync"
13 "testing"
14 "time"
15
16 "tangled.org/core/eventconsumer/cursor"
17 "tangled.org/core/eventstream"
18 "tangled.org/core/notifier"
19)
20
21type memSrc struct {
22 mu sync.Mutex
23 events []eventstream.Event
24}
25
26func (s *memSrc) add(ev eventstream.Event) {
27 s.mu.Lock()
28 defer s.mu.Unlock()
29 s.events = append(s.events, ev)
30}
31
32func (s *memSrc) GetEvents(cursor int64, limit int) ([]eventstream.Event, error) {
33 s.mu.Lock()
34 defer s.mu.Unlock()
35 out := []eventstream.Event{}
36 for _, ev := range s.events {
37 if ev.Created > cursor {
38 out = append(out, ev)
39 if len(out) == limit {
40 break
41 }
42 }
43 }
44 return out, nil
45}
46
47func mkEv(i int) eventstream.Event {
48 return eventstream.Event{
49 Rkey: fmt.Sprintf("rk-%04d", i),
50 Nsid: "sh.tangled.test",
51 EventJson: json.RawMessage(fmt.Sprintf(`{"i":%d}`, i)),
52 Created: int64(i + 1),
53 }
54}
55
56func startEventServer(t *testing.T, src *memSrc) (Source, *notifier.Notifier) {
57 t.Helper()
58 n := notifier.New()
59 mux := http.NewServeMux()
60 mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
61 _ = eventstream.Stream(w, r, eventstream.StreamConfig{
62 Backend: src,
63 Notifier: &n,
64 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
65 BatchSize: 5,
66 MaxBatchesPerDrain: 100,
67 })
68 })
69 srv := httptest.NewServer(mux)
70 t.Cleanup(srv.Close)
71 addr := strings.TrimPrefix(srv.URL, "http://")
72 return Source{Kind: "test", Host: addr}, &n
73}
74
75func TestConsumer_DrainAdvancesCursor(t *testing.T) {
76 src := &memSrc{}
77 for i := range 8 {
78 src.add(mkEv(i))
79 }
80
81 source, _ := startEventServer(t, src)
82
83 store := &cursor.MemoryStore{}
84 seenMu := sync.Mutex{}
85 seen := []int64{}
86
87 cfg := ConsumerConfig{
88 ProcessFunc: func(ctx context.Context, _ Source, msg eventstream.Event) error {
89 seenMu.Lock()
90 seen = append(seen, msg.Created)
91 seenMu.Unlock()
92 return nil
93 },
94 WorkerCount: 1,
95 QueueSize: 16,
96 ConnectionTimeout: 2 * time.Second,
97 CursorStore: store,
98 URLFunc: DefaultURL(true),
99 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
100 }
101 c := NewConsumer(cfg)
102
103 ctx, cancel := context.WithCancel(context.Background())
104 defer cancel()
105
106 c.Start(ctx)
107 c.AddSource(ctx, source)
108
109 deadline := time.Now().Add(3 * time.Second)
110 for time.Now().Before(deadline) {
111 seenMu.Lock()
112 n := len(seen)
113 seenMu.Unlock()
114 if n >= 8 {
115 break
116 }
117 time.Sleep(20 * time.Millisecond)
118 }
119
120 seenMu.Lock()
121 defer seenMu.Unlock()
122 if len(seen) != 8 {
123 t.Fatalf("processed %d events, want 8: %v", len(seen), seen)
124 }
125 for i, got := range seen {
126 if got != int64(i+1) {
127 t.Fatalf("event %d: got created=%d want %d", i, got, i+1)
128 }
129 }
130
131 if final := store.Get(source.Key()); final != 8 {
132 t.Fatalf("cursor = %d, want 8", final)
133 }
134}
135
136func TestConsumer_CursorMonotonic_OutOfOrderWorkers(t *testing.T) {
137 src := &memSrc{}
138 for i := range 4 {
139 src.add(mkEv(i))
140 }
141
142 source, _ := startEventServer(t, src)
143
144 store := &cursor.MemoryStore{}
145
146 releaseFirst := make(chan struct{})
147 processed := make(chan int64, 4)
148
149 cfg := ConsumerConfig{
150 ProcessFunc: func(ctx context.Context, _ Source, msg eventstream.Event) error {
151 if msg.Created == 1 {
152 <-releaseFirst
153 }
154 processed <- msg.Created
155 return nil
156 },
157 WorkerCount: 4,
158 QueueSize: 16,
159 ConnectionTimeout: 2 * time.Second,
160 CursorStore: store,
161 URLFunc: DefaultURL(true),
162 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
163 }
164 c := NewConsumer(cfg)
165
166 ctx, cancel := context.WithCancel(context.Background())
167 defer cancel()
168
169 c.Start(ctx)
170 c.AddSource(ctx, source)
171
172 for range 3 {
173 select {
174 case <-processed:
175 case <-time.After(3 * time.Second):
176 t.Fatal("timed out waiting for events 2-4 to be processed")
177 }
178 }
179
180 if cur := store.Get(source.Key()); cur != 4 {
181 t.Fatalf("cursor before slow worker finished = %d, want 4", cur)
182 }
183
184 close(releaseFirst)
185 select {
186 case <-processed:
187 case <-time.After(3 * time.Second):
188 t.Fatal("timed out waiting for slow worker")
189 }
190
191 if cur := store.Get(source.Key()); cur != 4 {
192 t.Fatalf("cursor regressed after slow worker: %d, want 4", cur)
193 }
194}
195
196func TestConsumer_StopTerminatesWithoutCtxCancel(t *testing.T) {
197 src := &memSrc{}
198 source, _ := startEventServer(t, src)
199
200 cfg := ConsumerConfig{
201 ProcessFunc: func(ctx context.Context, _ Source, _ eventstream.Event) error { return nil },
202 WorkerCount: 2,
203 QueueSize: 8,
204 ConnectionTimeout: 2 * time.Second,
205 CursorStore: &cursor.MemoryStore{},
206 URLFunc: DefaultURL(true),
207 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
208 }
209 c := NewConsumer(cfg)
210
211 c.Start(context.Background())
212 c.AddSource(context.Background(), source)
213
214 done := make(chan struct{})
215 go func() {
216 c.Stop()
217 close(done)
218 }()
219
220 select {
221 case <-done:
222 case <-time.After(5 * time.Second):
223 t.Fatal("Stop did not return within 5s")
224 }
225}
226
227func TestConsumer_ResumesFromStoredCursor(t *testing.T) {
228 src := &memSrc{}
229 for i := range 5 {
230 src.add(mkEv(i))
231 }
232
233 source, _ := startEventServer(t, src)
234
235 store := &cursor.MemoryStore{}
236 store.Set(source.Key(), 3)
237
238 seenMu := sync.Mutex{}
239 seen := []int64{}
240
241 cfg := ConsumerConfig{
242 ProcessFunc: func(ctx context.Context, _ Source, msg eventstream.Event) error {
243 seenMu.Lock()
244 seen = append(seen, msg.Created)
245 seenMu.Unlock()
246 return nil
247 },
248 WorkerCount: 1,
249 QueueSize: 16,
250 ConnectionTimeout: 2 * time.Second,
251 CursorStore: store,
252 URLFunc: DefaultURL(true),
253 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
254 }
255 c := NewConsumer(cfg)
256
257 ctx, cancel := context.WithCancel(context.Background())
258 defer cancel()
259
260 c.Start(ctx)
261 c.AddSource(ctx, source)
262
263 deadline := time.Now().Add(3 * time.Second)
264 for time.Now().Before(deadline) {
265 seenMu.Lock()
266 n := len(seen)
267 seenMu.Unlock()
268 if n >= 2 {
269 break
270 }
271 time.Sleep(20 * time.Millisecond)
272 }
273
274 seenMu.Lock()
275 defer seenMu.Unlock()
276 if len(seen) < 2 {
277 t.Fatalf("processed %d events, want 2: %v", len(seen), seen)
278 }
279 if seen[0] != 4 || seen[1] != 5 {
280 t.Fatalf("resumed events = %v, want [4 5]", seen)
281 }
282}