Monorepo for Tangled
tangled.org
1package eventconsumer
2
3import (
4 "context"
5 "io"
6 "log/slog"
7 "path/filepath"
8 "sync"
9 "testing"
10 "time"
11
12 "tangled.org/core/eventconsumer/cursor"
13 "tangled.org/core/eventstream"
14)
15
16func sqliteCursorStore(t *testing.T) cursor.Store {
17 t.Helper()
18 store, err := cursor.NewSQLiteStore(filepath.Join(t.TempDir(), "spindle.db"))
19 if err != nil {
20 t.Fatalf("new sqlite cursor store: %v", err)
21 }
22 return store
23}
24
25func drainProcessed(t *testing.T, store cursor.Store, source Source) []int64 {
26 t.Helper()
27
28 var mu sync.Mutex
29 var seen []int64
30
31 c := NewConsumer(ConsumerConfig{
32 ProcessFunc: func(_ context.Context, _ Source, msg eventstream.Event) error {
33 mu.Lock()
34 seen = append(seen, msg.Created)
35 mu.Unlock()
36 return nil
37 },
38 WorkerCount: 1,
39 QueueSize: 16,
40 ConnectionTimeout: 2 * time.Second,
41 CursorStore: store,
42 URLFunc: DefaultURL(true),
43 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
44 })
45
46 ctx, cancel := context.WithCancel(context.Background())
47 t.Cleanup(cancel)
48 c.Start(ctx)
49 c.AddSource(ctx, source)
50
51 deadline := time.Now().Add(3 * time.Second)
52 last, stable := -1, 0
53 for time.Now().Before(deadline) {
54 time.Sleep(100 * time.Millisecond)
55 mu.Lock()
56 n := len(seen)
57 mu.Unlock()
58 if n == last {
59 if stable++; stable >= 3 && n > 0 {
60 break
61 }
62 } else {
63 last, stable = n, 0
64 }
65 }
66
67 mu.Lock()
68 defer mu.Unlock()
69 return append([]int64(nil), seen...)
70}
71
72func TestSpindleUpgrade_OrphanedCursorReplaysFromZero(t *testing.T) {
73 src := &memSrc{}
74 for i := range 8 {
75 src.add(mkEv(i))
76 }
77 source, _ := startEventServer(t, src)
78
79 store := sqliteCursorStore(t)
80 store.Set(source.Host, 5)
81
82 seen := drainProcessed(t, store, source)
83
84 if len(seen) != 8 {
85 t.Fatalf("orphaned bare-host cursor processed %d events, want a full replay of 8: %v", len(seen), seen)
86 }
87}
88
89func TestSpindleUpgrade_MigratedCursorResumesNoReplay(t *testing.T) {
90 src := &memSrc{}
91 for i := range 8 {
92 src.add(mkEv(i))
93 }
94 source, _ := startEventServer(t, src)
95
96 store := sqliteCursorStore(t)
97 store.Set(source.Host, 5)
98
99 MigrateLegacyCursor(store, source)
100
101 seen := drainProcessed(t, store, source)
102
103 if len(seen) != 3 {
104 t.Fatalf("migrated cursor processed %d events, want a resume of 3: %v", len(seen), seen)
105 }
106 if seen[0] != 6 || seen[2] != 8 {
107 t.Fatalf("resumed events = %v, want [6 7 8]", seen)
108 }
109}