Monorepo for Tangled tangled.org
3

Configure Feed

Select the types of activity you want to include in your feed.

1package eventconsumer 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "log/slog" 9 "net/http" 10 "net/http/httptest" 11 "strings" 12 "sync" 13 "testing" 14 "time" 15 16 "tangled.org/core/eventconsumer/cursor" 17 "tangled.org/core/eventstream" 18 "tangled.org/core/notifier" 19) 20 21type memSrc struct { 22 mu sync.Mutex 23 events []eventstream.Event 24} 25 26func (s *memSrc) add(ev eventstream.Event) { 27 s.mu.Lock() 28 defer s.mu.Unlock() 29 s.events = append(s.events, ev) 30} 31 32func (s *memSrc) GetEvents(cursor int64, limit int) ([]eventstream.Event, error) { 33 s.mu.Lock() 34 defer s.mu.Unlock() 35 out := []eventstream.Event{} 36 for _, ev := range s.events { 37 if ev.Created > cursor { 38 out = append(out, ev) 39 if len(out) == limit { 40 break 41 } 42 } 43 } 44 return out, nil 45} 46 47func mkEv(i int) eventstream.Event { 48 return eventstream.Event{ 49 Rkey: fmt.Sprintf("rk-%04d", i), 50 Nsid: "sh.tangled.test", 51 EventJson: json.RawMessage(fmt.Sprintf(`{"i":%d}`, i)), 52 Created: int64(i + 1), 53 } 54} 55 56func startEventServer(t *testing.T, src *memSrc) (Source, *notifier.Notifier) { 57 t.Helper() 58 n := notifier.New() 59 mux := http.NewServeMux() 60 mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) { 61 _ = eventstream.Stream(w, r, eventstream.StreamConfig{ 62 Backend: src, 63 Notifier: &n, 64 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 65 BatchSize: 5, 66 MaxBatchesPerDrain: 100, 67 }) 68 }) 69 srv := httptest.NewServer(mux) 70 t.Cleanup(srv.Close) 71 addr := strings.TrimPrefix(srv.URL, "http://") 72 return Source{Kind: "test", Host: addr, NoTLS: true}, &n 73} 74 75func TestConsumer_DrainAdvancesCursor(t *testing.T) { 76 src := &memSrc{} 77 for i := range 8 { 78 src.add(mkEv(i)) 79 } 80 81 source, _ := startEventServer(t, src) 82 83 store := &cursor.MemoryStore{} 84 seenMu := sync.Mutex{} 85 seen := []int64{} 86 87 cfg := ConsumerConfig{ 88 ProcessFunc: func(ctx context.Context, _ Source, msg eventstream.Event) error { 89 seenMu.Lock() 90 seen = append(seen, msg.Created) 91 seenMu.Unlock() 92 return nil 93 }, 94 WorkerCount: 1, 95 QueueSize: 16, 96 ConnectionTimeout: 2 * time.Second, 97 CursorStore: store, 98 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 99 } 100 c := NewConsumer(cfg) 101 102 ctx, cancel := context.WithCancel(context.Background()) 103 defer cancel() 104 105 c.Start(ctx) 106 c.AddSource(ctx, source) 107 108 deadline := time.Now().Add(3 * time.Second) 109 for time.Now().Before(deadline) { 110 seenMu.Lock() 111 n := len(seen) 112 seenMu.Unlock() 113 if n >= 8 { 114 break 115 } 116 time.Sleep(20 * time.Millisecond) 117 } 118 119 seenMu.Lock() 120 defer seenMu.Unlock() 121 if len(seen) != 8 { 122 t.Fatalf("processed %d events, want 8: %v", len(seen), seen) 123 } 124 for i, got := range seen { 125 if got != int64(i+1) { 126 t.Fatalf("event %d: got created=%d want %d", i, got, i+1) 127 } 128 } 129 130 if final := store.Get(source.Key()); final != 8 { 131 t.Fatalf("cursor = %d, want 8", final) 132 } 133} 134 135func TestConsumer_CursorMonotonic_OutOfOrderWorkers(t *testing.T) { 136 src := &memSrc{} 137 for i := range 4 { 138 src.add(mkEv(i)) 139 } 140 141 source, _ := startEventServer(t, src) 142 143 store := &cursor.MemoryStore{} 144 145 releaseFirst := make(chan struct{}) 146 processed := make(chan int64, 4) 147 148 cfg := ConsumerConfig{ 149 ProcessFunc: func(ctx context.Context, _ Source, msg eventstream.Event) error { 150 if msg.Created == 1 { 151 <-releaseFirst 152 } 153 processed <- msg.Created 154 return nil 155 }, 156 WorkerCount: 4, 157 QueueSize: 16, 158 ConnectionTimeout: 2 * time.Second, 159 CursorStore: store, 160 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 161 } 162 c := NewConsumer(cfg) 163 164 ctx, cancel := context.WithCancel(context.Background()) 165 defer cancel() 166 167 c.Start(ctx) 168 c.AddSource(ctx, source) 169 170 for range 3 { 171 select { 172 case <-processed: 173 case <-time.After(3 * time.Second): 174 t.Fatal("timed out waiting for events 2-4 to be processed") 175 } 176 } 177 178 if cur := store.Get(source.Key()); cur != 4 { 179 t.Fatalf("cursor before slow worker finished = %d, want 4", cur) 180 } 181 182 close(releaseFirst) 183 select { 184 case <-processed: 185 case <-time.After(3 * time.Second): 186 t.Fatal("timed out waiting for slow worker") 187 } 188 189 if cur := store.Get(source.Key()); cur != 4 { 190 t.Fatalf("cursor regressed after slow worker: %d, want 4", cur) 191 } 192} 193 194func TestConsumer_StopTerminatesWithoutCtxCancel(t *testing.T) { 195 src := &memSrc{} 196 source, _ := startEventServer(t, src) 197 198 cfg := ConsumerConfig{ 199 ProcessFunc: func(ctx context.Context, _ Source, _ eventstream.Event) error { return nil }, 200 WorkerCount: 2, 201 QueueSize: 8, 202 ConnectionTimeout: 2 * time.Second, 203 CursorStore: &cursor.MemoryStore{}, 204 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 205 } 206 c := NewConsumer(cfg) 207 208 c.Start(context.Background()) 209 c.AddSource(context.Background(), source) 210 211 done := make(chan struct{}) 212 go func() { 213 c.Stop() 214 close(done) 215 }() 216 217 select { 218 case <-done: 219 case <-time.After(5 * time.Second): 220 t.Fatal("Stop did not return within 5s") 221 } 222} 223 224func TestConsumer_ResumesFromStoredCursor(t *testing.T) { 225 src := &memSrc{} 226 for i := range 5 { 227 src.add(mkEv(i)) 228 } 229 230 source, _ := startEventServer(t, src) 231 232 store := &cursor.MemoryStore{} 233 store.Set(source.Key(), 3) 234 235 seenMu := sync.Mutex{} 236 seen := []int64{} 237 238 cfg := ConsumerConfig{ 239 ProcessFunc: func(ctx context.Context, _ Source, msg eventstream.Event) error { 240 seenMu.Lock() 241 seen = append(seen, msg.Created) 242 seenMu.Unlock() 243 return nil 244 }, 245 WorkerCount: 1, 246 QueueSize: 16, 247 ConnectionTimeout: 2 * time.Second, 248 CursorStore: store, 249 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 250 } 251 c := NewConsumer(cfg) 252 253 ctx, cancel := context.WithCancel(context.Background()) 254 defer cancel() 255 256 c.Start(ctx) 257 c.AddSource(ctx, source) 258 259 deadline := time.Now().Add(3 * time.Second) 260 for time.Now().Before(deadline) { 261 seenMu.Lock() 262 n := len(seen) 263 seenMu.Unlock() 264 if n >= 2 { 265 break 266 } 267 time.Sleep(20 * time.Millisecond) 268 } 269 270 seenMu.Lock() 271 defer seenMu.Unlock() 272 if len(seen) < 2 { 273 t.Fatalf("processed %d events, want 2: %v", len(seen), seen) 274 } 275 if seen[0] != 4 || seen[1] != 5 { 276 t.Fatalf("resumed events = %v, want [4 5]", seen) 277 } 278}