Monorepo for Tangled
tangled.org
1package eventconsumer
2
3import (
4 "context"
5 "io"
6 "log/slog"
7 "path/filepath"
8 "sync"
9 "testing"
10 "time"
11
12 "tangled.org/core/eventconsumer/cursor"
13 "tangled.org/core/eventstream"
14)
15
16func sqliteCursorStore(t *testing.T) cursor.Store {
17 t.Helper()
18 store, err := cursor.NewSQLiteStore(filepath.Join(t.TempDir(), "spindle.db"))
19 if err != nil {
20 t.Fatalf("new sqlite cursor store: %v", err)
21 }
22 return store
23}
24
25func drainProcessed(t *testing.T, store cursor.Store, source Source) []int64 {
26 t.Helper()
27
28 var mu sync.Mutex
29 var seen []int64
30
31 c := NewConsumer(ConsumerConfig{
32 ProcessFunc: func(_ context.Context, _ Source, msg eventstream.Event) error {
33 mu.Lock()
34 seen = append(seen, msg.Created)
35 mu.Unlock()
36 return nil
37 },
38 WorkerCount: 1,
39 QueueSize: 16,
40 ConnectionTimeout: 2 * time.Second,
41 CursorStore: store,
42 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
43 })
44
45 ctx, cancel := context.WithCancel(context.Background())
46 t.Cleanup(cancel)
47 c.Start(ctx)
48 c.AddSource(ctx, source)
49
50 deadline := time.Now().Add(3 * time.Second)
51 last, stable := -1, 0
52 for time.Now().Before(deadline) {
53 time.Sleep(100 * time.Millisecond)
54 mu.Lock()
55 n := len(seen)
56 mu.Unlock()
57 if n == last {
58 if stable++; stable >= 3 && n > 0 {
59 break
60 }
61 } else {
62 last, stable = n, 0
63 }
64 }
65
66 mu.Lock()
67 defer mu.Unlock()
68 return append([]int64(nil), seen...)
69}
70
71func TestSpindleUpgrade_OrphanedCursorReplaysFromZero(t *testing.T) {
72 src := &memSrc{}
73 for i := range 8 {
74 src.add(mkEv(i))
75 }
76 source, _ := startEventServer(t, src)
77
78 store := sqliteCursorStore(t)
79 store.Set(source.Host, 5)
80
81 seen := drainProcessed(t, store, source)
82
83 if len(seen) != 8 {
84 t.Fatalf("orphaned bare-host cursor processed %d events, want a full replay of 8: %v", len(seen), seen)
85 }
86}
87
88func TestSpindleUpgrade_MigratedCursorResumesNoReplay(t *testing.T) {
89 src := &memSrc{}
90 for i := range 8 {
91 src.add(mkEv(i))
92 }
93 source, _ := startEventServer(t, src)
94
95 store := sqliteCursorStore(t)
96 store.Set(source.Host, 5)
97
98 MigrateLegacyCursor(store, source)
99
100 seen := drainProcessed(t, store, source)
101
102 if len(seen) != 3 {
103 t.Fatalf("migrated cursor processed %d events, want a resume of 3: %v", len(seen), seen)
104 }
105 if seen[0] != 6 || seen[2] != 8 {
106 t.Fatalf("resumed events = %v, want [6 7 8]", seen)
107 }
108}