Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5
6 "tangled.org/core/appview/cache"
7 "tangled.org/core/appview/config"
8 ec "tangled.org/core/eventconsumer"
9 "tangled.org/core/eventconsumer/cursor"
10 "tangled.org/core/log"
11)
12
13func bootstrapStream(
14 ctx context.Context,
15 name string,
16 kind ec.Kind,
17 hosts []string,
18 redisAddr string,
19 streamCfg config.ConsumerConfig,
20 dev bool,
21 processFn ec.ProcessFunc,
22) *ec.Consumer {
23 logger := log.SubLogger(log.FromContext(ctx), name)
24
25 redisCache := cache.New(redisAddr)
26 cursorStore := cursor.NewRedisCursorStore(redisCache)
27
28 srcs := make(map[ec.Source]struct{}, len(hosts))
29 for _, h := range hosts {
30 src := ec.Source{Kind: kind, Host: h}
31 ec.MigrateLegacyCursor(&cursorStore, src)
32 srcs[src] = struct{}{}
33 }
34
35 return ec.NewConsumer(ec.ConsumerConfig{
36 Sources: srcs,
37 ProcessFunc: processFn,
38 RetryInterval: streamCfg.RetryInterval,
39 MaxRetryInterval: streamCfg.MaxRetryInterval,
40 ConnectionTimeout: streamCfg.ConnectionTimeout,
41 WorkerCount: streamCfg.WorkerCount,
42 QueueSize: streamCfg.QueueSize,
43 Logger: logger,
44 URLFunc: ec.DefaultURL(dev),
45 CursorStore: &cursorStore,
46 })
47}