Monorepo for Tangled
tangled.org
1package knotmirror
2
3import (
4 "context"
5 "fmt"
6 "net/http"
7 _ "net/http/pprof"
8 "time"
9
10 "github.com/go-chi/chi/v5"
11 "github.com/prometheus/client_golang/prometheus/promhttp"
12 "github.com/redis/go-redis/v9"
13 "tangled.org/core/idresolver"
14 "tangled.org/core/knotmirror/config"
15 "tangled.org/core/knotmirror/db"
16 "tangled.org/core/knotmirror/knotstream"
17 "tangled.org/core/knotmirror/models"
18 "tangled.org/core/knotmirror/repoindexer"
19 "tangled.org/core/knotmirror/xrpc"
20 "tangled.org/core/log"
21)
22
23func Run(ctx context.Context, cfg *config.Config) error {
24 // make sure every services are cleaned up on fast return
25 ctx, cancel := context.WithCancel(ctx)
26 defer cancel()
27
28 logger := log.FromContext(ctx)
29
30 db, err := db.Make(ctx, cfg.DbUrl, 32)
31 if err != nil {
32 return fmt.Errorf("initializing db: %w", err)
33 }
34
35 rdb := redis.NewClient(&redis.Options{Addr: cfg.RedisAddr})
36
37 resolver, err := idresolver.RedisResolver("redis://"+cfg.RedisAddr, cfg.PlcUrl)
38 if err != nil {
39 logger.Error("failed to create redis resolver for admin, falling back to default", "err", err)
40 resolver = idresolver.DefaultResolver(cfg.PlcUrl)
41 }
42
43 // NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive.
44 gitm := NewCliGitMirrorManager(cfg.GitRepoBasePath, cfg.KnotUseSSL)
45
46 res, err := db.ExecContext(ctx,
47 `update repos set state = $1 where state = $2`,
48 models.RepoStateDesynchronized,
49 models.RepoStateResyncing,
50 )
51 if err != nil {
52 return fmt.Errorf("clearing resyning states: %w", err)
53 }
54 rows, err := res.RowsAffected()
55 if err != nil {
56 return fmt.Errorf("getting affected rows: %w", err)
57 }
58 logger.Info(fmt.Sprintf("clearing resyning states: %d records updated", rows))
59
60 indexer := repoindexer.NewIndexer(logger, cfg, rdb)
61 indexScheduler := repoindexer.NewBackgroundIndexScheduler(logger, cfg, db, indexer)
62 indexScheduler.Start(ctx)
63
64 knotstream := knotstream.NewKnotStream(logger, db, cfg)
65 crawler := NewCrawler(logger, db)
66 resyncer := NewResyncer(logger, db, gitm, indexScheduler, cfg)
67 xrpc := xrpc.New(logger, cfg, db, rdb, indexer, resolver, knotstream)
68 adminpage := NewAdminServer(logger, db, resyncer, xrpc, resolver)
69
70 // maintain repository list with tap
71 // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events.
72 tap := NewTapClient(logger, cfg, db, gitm, knotstream)
73
74 // start http server
75 go func() {
76 logger.Info("starting http server", "addr", cfg.Listen)
77
78 mux := chi.NewRouter()
79 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
80 w.Write([]byte("Welcome to a knotmirror server.\n"))
81 })
82 mux.Mount("/xrpc", xrpc.Router())
83
84 if err := http.ListenAndServe(cfg.Listen, mux); err != nil {
85 logger.Error("xrpc server failed", "error", err)
86 }
87 }()
88
89 // start metrics endpoint
90 go func() {
91 metricsAddr := cfg.MetricsListen
92 logger.Info("starting metrics server", "addr", metricsAddr)
93 http.Handle("/metrics", promhttp.Handler())
94 if err := http.ListenAndServe(metricsAddr, nil); err != nil {
95 logger.Error("metrics server failed", "error", err)
96 }
97 }()
98
99 // start admin page endpoint
100 go func() {
101 logger.Info("starting admin server", "addr", cfg.AdminListen)
102 if err := http.ListenAndServe(cfg.AdminListen, adminpage.Router()); err != nil {
103 logger.Error("admin server failed", "error", err)
104 }
105 }()
106
107 tap.Start(ctx)
108
109 resyncer.Start(ctx)
110
111 // periodically crawl the entire network to mirror the repositories
112 crawler.Start(ctx)
113
114 // listen to knotstream (currently we don't have relay for knots, so subscribe every known knots)
115 knotstream.Start(ctx)
116
117 svcErr := make(chan error, 1)
118 if err := knotstream.ResubscribeAllHosts(ctx); err != nil {
119 svcErr <- fmt.Errorf("resubscribing known hosts: %w", err)
120 }
121
122 logger.Info("startup complete")
123 select {
124 case <-ctx.Done():
125 logger.Info("received shutdown signal", "reason", ctx.Err())
126 case err := <-svcErr:
127 if err != nil {
128 logger.Error("service error", "error", err)
129 }
130 cancel()
131 }
132
133 logger.Info("shutting down knotmirror")
134 shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
135 defer shutdownCancel()
136
137 var errs []error
138 if err := knotstream.Shutdown(shutdownCtx); err != nil {
139 errs = append(errs, err)
140 }
141 if err := db.Close(); err != nil {
142 errs = append(errs, err)
143 }
144 for _, err := range errs {
145 logger.Error("error during shutdown", "err", err)
146 }
147
148 logger.Info("shutdown complete")
149 return nil
150}