Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5
6 "tangled.org/core/appview/cache"
7 "tangled.org/core/appview/config"
8 ec "tangled.org/core/eventconsumer"
9 "tangled.org/core/eventconsumer/cursor"
10 "tangled.org/core/log"
11)
12
13func bootstrapStream(
14 ctx context.Context,
15 name string,
16 kind ec.Kind,
17 hosts []string,
18 redisAddr string,
19 streamCfg config.ConsumerConfig,
20 processFn ec.ProcessFunc,
21) *ec.Consumer {
22 logger := log.SubLogger(log.FromContext(ctx), name)
23
24 redisCache := cache.New(redisAddr)
25 cursorStore := cursor.NewRedisCursorStore(redisCache)
26
27 srcs := make(map[ec.Source]struct{}, len(hosts))
28 for _, h := range hosts {
29 src := ec.Source{Kind: kind, Host: h}
30 ec.MigrateLegacyCursor(&cursorStore, src)
31 srcs[src] = struct{}{}
32 }
33
34 return ec.NewConsumer(ec.ConsumerConfig{
35 Sources: srcs,
36 ProcessFunc: processFn,
37 RetryInterval: streamCfg.RetryInterval,
38 MaxRetryInterval: streamCfg.MaxRetryInterval,
39 ConnectionTimeout: streamCfg.ConnectionTimeout,
40 WorkerCount: streamCfg.WorkerCount,
41 QueueSize: streamCfg.QueueSize,
42 Logger: logger,
43 CursorStore: &cursorStore,
44 })
45}