Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

at icy/kxpzqo 6.3 kB View raw
1package eventconsumer 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "log/slog" 9 "net/http" 10 "net/http/httptest" 11 "strings" 12 "sync" 13 "testing" 14 "time" 15 16 "tangled.org/core/eventconsumer/cursor" 17 "tangled.org/core/eventstream" 18 "tangled.org/core/notifier" 19) 20 21type memSrc struct { 22 mu sync.Mutex 23 events []eventstream.Event 24} 25 26func (s *memSrc) add(ev eventstream.Event) { 27 s.mu.Lock() 28 defer s.mu.Unlock() 29 s.events = append(s.events, ev) 30} 31 32func (s *memSrc) GetEvents(cursor int64, limit int) ([]eventstream.Event, error) { 33 s.mu.Lock() 34 defer s.mu.Unlock() 35 out := []eventstream.Event{} 36 for _, ev := range s.events { 37 if ev.Created > cursor { 38 out = append(out, ev) 39 if len(out) == limit { 40 break 41 } 42 } 43 } 44 return out, nil 45} 46 47func mkEv(i int) eventstream.Event { 48 return eventstream.Event{ 49 Rkey: fmt.Sprintf("rk-%04d", i), 50 Nsid: "sh.tangled.test", 51 EventJson: json.RawMessage(fmt.Sprintf(`{"i":%d}`, i)), 52 Created: int64(i + 1), 53 } 54} 55 56func startEventServer(t *testing.T, src *memSrc) (Source, *notifier.Notifier) { 57 t.Helper() 58 n := notifier.New() 59 mux := http.NewServeMux() 60 mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) { 61 _ = eventstream.Stream(w, r, eventstream.StreamConfig{ 62 Backend: src, 63 Notifier: &n, 64 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 65 BatchSize: 5, 66 MaxBatchesPerDrain: 100, 67 }) 68 }) 69 srv := httptest.NewServer(mux) 70 t.Cleanup(srv.Close) 71 addr := strings.TrimPrefix(srv.URL, "http://") 72 return Source{Kind: "test", Host: addr}, &n 73} 74 75func TestConsumer_DrainAdvancesCursor(t *testing.T) { 76 src := &memSrc{} 77 for i := range 8 { 78 src.add(mkEv(i)) 79 } 80 81 source, _ := startEventServer(t, src) 82 83 store := &cursor.MemoryStore{} 84 seenMu := sync.Mutex{} 85 seen := []int64{} 86 87 cfg := ConsumerConfig{ 88 ProcessFunc: func(ctx context.Context, _ Source, msg eventstream.Event) error { 89 seenMu.Lock() 90 seen = append(seen, msg.Created) 91 seenMu.Unlock() 92 return nil 93 }, 94 WorkerCount: 1, 95 QueueSize: 16, 96 ConnectionTimeout: 2 * time.Second, 97 CursorStore: store, 98 URLFunc: DefaultURL(true), 99 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 100 } 101 c := NewConsumer(cfg) 102 103 ctx, cancel := context.WithCancel(context.Background()) 104 defer cancel() 105 106 c.Start(ctx) 107 c.AddSource(ctx, source) 108 109 deadline := time.Now().Add(3 * time.Second) 110 for time.Now().Before(deadline) { 111 seenMu.Lock() 112 n := len(seen) 113 seenMu.Unlock() 114 if n >= 8 { 115 break 116 } 117 time.Sleep(20 * time.Millisecond) 118 } 119 120 seenMu.Lock() 121 defer seenMu.Unlock() 122 if len(seen) != 8 { 123 t.Fatalf("processed %d events, want 8: %v", len(seen), seen) 124 } 125 for i, got := range seen { 126 if got != int64(i+1) { 127 t.Fatalf("event %d: got created=%d want %d", i, got, i+1) 128 } 129 } 130 131 if final := store.Get(source.Key()); final != 8 { 132 t.Fatalf("cursor = %d, want 8", final) 133 } 134} 135 136func TestConsumer_CursorMonotonic_OutOfOrderWorkers(t *testing.T) { 137 src := &memSrc{} 138 for i := range 4 { 139 src.add(mkEv(i)) 140 } 141 142 source, _ := startEventServer(t, src) 143 144 store := &cursor.MemoryStore{} 145 146 releaseFirst := make(chan struct{}) 147 processed := make(chan int64, 4) 148 149 cfg := ConsumerConfig{ 150 ProcessFunc: func(ctx context.Context, _ Source, msg eventstream.Event) error { 151 if msg.Created == 1 { 152 <-releaseFirst 153 } 154 processed <- msg.Created 155 return nil 156 }, 157 WorkerCount: 4, 158 QueueSize: 16, 159 ConnectionTimeout: 2 * time.Second, 160 CursorStore: store, 161 URLFunc: DefaultURL(true), 162 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 163 } 164 c := NewConsumer(cfg) 165 166 ctx, cancel := context.WithCancel(context.Background()) 167 defer cancel() 168 169 c.Start(ctx) 170 c.AddSource(ctx, source) 171 172 for range 3 { 173 select { 174 case <-processed: 175 case <-time.After(3 * time.Second): 176 t.Fatal("timed out waiting for events 2-4 to be processed") 177 } 178 } 179 180 if cur := store.Get(source.Key()); cur != 4 { 181 t.Fatalf("cursor before slow worker finished = %d, want 4", cur) 182 } 183 184 close(releaseFirst) 185 select { 186 case <-processed: 187 case <-time.After(3 * time.Second): 188 t.Fatal("timed out waiting for slow worker") 189 } 190 191 if cur := store.Get(source.Key()); cur != 4 { 192 t.Fatalf("cursor regressed after slow worker: %d, want 4", cur) 193 } 194} 195 196func TestConsumer_StopTerminatesWithoutCtxCancel(t *testing.T) { 197 src := &memSrc{} 198 source, _ := startEventServer(t, src) 199 200 cfg := ConsumerConfig{ 201 ProcessFunc: func(ctx context.Context, _ Source, _ eventstream.Event) error { return nil }, 202 WorkerCount: 2, 203 QueueSize: 8, 204 ConnectionTimeout: 2 * time.Second, 205 CursorStore: &cursor.MemoryStore{}, 206 URLFunc: DefaultURL(true), 207 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 208 } 209 c := NewConsumer(cfg) 210 211 c.Start(context.Background()) 212 c.AddSource(context.Background(), source) 213 214 done := make(chan struct{}) 215 go func() { 216 c.Stop() 217 close(done) 218 }() 219 220 select { 221 case <-done: 222 case <-time.After(5 * time.Second): 223 t.Fatal("Stop did not return within 5s") 224 } 225} 226 227func TestConsumer_ResumesFromStoredCursor(t *testing.T) { 228 src := &memSrc{} 229 for i := range 5 { 230 src.add(mkEv(i)) 231 } 232 233 source, _ := startEventServer(t, src) 234 235 store := &cursor.MemoryStore{} 236 store.Set(source.Key(), 3) 237 238 seenMu := sync.Mutex{} 239 seen := []int64{} 240 241 cfg := ConsumerConfig{ 242 ProcessFunc: func(ctx context.Context, _ Source, msg eventstream.Event) error { 243 seenMu.Lock() 244 seen = append(seen, msg.Created) 245 seenMu.Unlock() 246 return nil 247 }, 248 WorkerCount: 1, 249 QueueSize: 16, 250 ConnectionTimeout: 2 * time.Second, 251 CursorStore: store, 252 URLFunc: DefaultURL(true), 253 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 254 } 255 c := NewConsumer(cfg) 256 257 ctx, cancel := context.WithCancel(context.Background()) 258 defer cancel() 259 260 c.Start(ctx) 261 c.AddSource(ctx, source) 262 263 deadline := time.Now().Add(3 * time.Second) 264 for time.Now().Before(deadline) { 265 seenMu.Lock() 266 n := len(seen) 267 seenMu.Unlock() 268 if n >= 2 { 269 break 270 } 271 time.Sleep(20 * time.Millisecond) 272 } 273 274 seenMu.Lock() 275 defer seenMu.Unlock() 276 if len(seen) < 2 { 277 t.Fatalf("processed %d events, want 2: %v", len(seen), seen) 278 } 279 if seen[0] != 4 || seen[1] != 5 { 280 t.Fatalf("resumed events = %v, want [4 5]", seen) 281 } 282}