Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package eventconsumer 2 3import ( 4 "context" 5 "io" 6 "log/slog" 7 "path/filepath" 8 "sync" 9 "testing" 10 "time" 11 12 "tangled.org/core/eventconsumer/cursor" 13 "tangled.org/core/eventstream" 14) 15 16func sqliteCursorStore(t *testing.T) cursor.Store { 17 t.Helper() 18 store, err := cursor.NewSQLiteStore(filepath.Join(t.TempDir(), "spindle.db")) 19 if err != nil { 20 t.Fatalf("new sqlite cursor store: %v", err) 21 } 22 return store 23} 24 25func drainProcessed(t *testing.T, store cursor.Store, source Source) []int64 { 26 t.Helper() 27 28 var mu sync.Mutex 29 var seen []int64 30 31 c := NewConsumer(ConsumerConfig{ 32 ProcessFunc: func(_ context.Context, _ Source, msg eventstream.Event) error { 33 mu.Lock() 34 seen = append(seen, msg.Created) 35 mu.Unlock() 36 return nil 37 }, 38 WorkerCount: 1, 39 QueueSize: 16, 40 ConnectionTimeout: 2 * time.Second, 41 CursorStore: store, 42 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 43 }) 44 45 ctx, cancel := context.WithCancel(context.Background()) 46 t.Cleanup(cancel) 47 c.Start(ctx) 48 c.AddSource(ctx, source) 49 50 deadline := time.Now().Add(3 * time.Second) 51 last, stable := -1, 0 52 for time.Now().Before(deadline) { 53 time.Sleep(100 * time.Millisecond) 54 mu.Lock() 55 n := len(seen) 56 mu.Unlock() 57 if n == last { 58 if stable++; stable >= 3 && n > 0 { 59 break 60 } 61 } else { 62 last, stable = n, 0 63 } 64 } 65 66 mu.Lock() 67 defer mu.Unlock() 68 return append([]int64(nil), seen...) 69} 70 71func TestSpindleUpgrade_OrphanedCursorReplaysFromZero(t *testing.T) { 72 src := &memSrc{} 73 for i := range 8 { 74 src.add(mkEv(i)) 75 } 76 source, _ := startEventServer(t, src) 77 78 store := sqliteCursorStore(t) 79 store.Set(source.Host, 5) 80 81 seen := drainProcessed(t, store, source) 82 83 if len(seen) != 8 { 84 t.Fatalf("orphaned bare-host cursor processed %d events, want a full replay of 8: %v", len(seen), seen) 85 } 86} 87 88func TestSpindleUpgrade_MigratedCursorResumesNoReplay(t *testing.T) { 89 src := &memSrc{} 90 for i := range 8 { 91 src.add(mkEv(i)) 92 } 93 source, _ := startEventServer(t, src) 94 95 store := sqliteCursorStore(t) 96 store.Set(source.Host, 5) 97 98 MigrateLegacyCursor(store, source) 99 100 seen := drainProcessed(t, store, source) 101 102 if len(seen) != 3 { 103 t.Fatalf("migrated cursor processed %d events, want a resume of 3: %v", len(seen), seen) 104 } 105 if seen[0] != 6 || seen[2] != 8 { 106 t.Fatalf("resumed events = %v, want [6 7 8]", seen) 107 } 108}