Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package eventconsumer 2 3import ( 4 "context" 5 "io" 6 "log/slog" 7 "path/filepath" 8 "sync" 9 "testing" 10 "time" 11 12 "tangled.org/core/eventconsumer/cursor" 13 "tangled.org/core/eventstream" 14) 15 16func sqliteCursorStore(t *testing.T) cursor.Store { 17 t.Helper() 18 store, err := cursor.NewSQLiteStore(filepath.Join(t.TempDir(), "spindle.db")) 19 if err != nil { 20 t.Fatalf("new sqlite cursor store: %v", err) 21 } 22 return store 23} 24 25func drainProcessed(t *testing.T, store cursor.Store, source Source) []int64 { 26 t.Helper() 27 28 var mu sync.Mutex 29 var seen []int64 30 31 c := NewConsumer(ConsumerConfig{ 32 ProcessFunc: func(_ context.Context, _ Source, msg eventstream.Event) error { 33 mu.Lock() 34 seen = append(seen, msg.Created) 35 mu.Unlock() 36 return nil 37 }, 38 WorkerCount: 1, 39 QueueSize: 16, 40 ConnectionTimeout: 2 * time.Second, 41 CursorStore: store, 42 URLFunc: DefaultURL(true), 43 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 44 }) 45 46 ctx, cancel := context.WithCancel(context.Background()) 47 t.Cleanup(cancel) 48 c.Start(ctx) 49 c.AddSource(ctx, source) 50 51 deadline := time.Now().Add(3 * time.Second) 52 last, stable := -1, 0 53 for time.Now().Before(deadline) { 54 time.Sleep(100 * time.Millisecond) 55 mu.Lock() 56 n := len(seen) 57 mu.Unlock() 58 if n == last { 59 if stable++; stable >= 3 && n > 0 { 60 break 61 } 62 } else { 63 last, stable = n, 0 64 } 65 } 66 67 mu.Lock() 68 defer mu.Unlock() 69 return append([]int64(nil), seen...) 70} 71 72func TestSpindleUpgrade_OrphanedCursorReplaysFromZero(t *testing.T) { 73 src := &memSrc{} 74 for i := range 8 { 75 src.add(mkEv(i)) 76 } 77 source, _ := startEventServer(t, src) 78 79 store := sqliteCursorStore(t) 80 store.Set(source.Host, 5) 81 82 seen := drainProcessed(t, store, source) 83 84 if len(seen) != 8 { 85 t.Fatalf("orphaned bare-host cursor processed %d events, want a full replay of 8: %v", len(seen), seen) 86 } 87} 88 89func TestSpindleUpgrade_MigratedCursorResumesNoReplay(t *testing.T) { 90 src := &memSrc{} 91 for i := range 8 { 92 src.add(mkEv(i)) 93 } 94 source, _ := startEventServer(t, src) 95 96 store := sqliteCursorStore(t) 97 store.Set(source.Host, 5) 98 99 MigrateLegacyCursor(store, source) 100 101 seen := drainProcessed(t, store, source) 102 103 if len(seen) != 3 { 104 t.Fatalf("migrated cursor processed %d events, want a resume of 3: %v", len(seen), seen) 105 } 106 if seen[0] != 6 || seen[2] != 8 { 107 t.Fatalf("resumed events = %v, want [6 7 8]", seen) 108 } 109}