Monorepo for Tangled
0

Configure Feed

Select the types of activity you want to include in your feed.

at master 4.0 kB View raw
1package knotmirror 2 3import ( 4 "context" 5 "fmt" 6 "net/http" 7 _ "net/http/pprof" 8 "time" 9 10 "github.com/go-chi/chi/v5" 11 "github.com/prometheus/client_golang/prometheus/promhttp" 12 "github.com/redis/go-redis/v9" 13 "tangled.org/core/idresolver" 14 "tangled.org/core/knotmirror/config" 15 "tangled.org/core/knotmirror/db" 16 "tangled.org/core/knotmirror/knotstream" 17 "tangled.org/core/knotmirror/models" 18 "tangled.org/core/knotmirror/xrpc" 19 "tangled.org/core/log" 20) 21 22func Run(ctx context.Context, cfg *config.Config) error { 23 // make sure every services are cleaned up on fast return 24 ctx, cancel := context.WithCancel(ctx) 25 defer cancel() 26 27 logger := log.FromContext(ctx) 28 29 db, err := db.Make(ctx, cfg.DbUrl, 32) 30 if err != nil { 31 return fmt.Errorf("initializing db: %w", err) 32 } 33 34 rdb := redis.NewClient(&redis.Options{Addr: cfg.RedisAddr}) 35 36 resolver := idresolver.DefaultResolver(cfg.PlcUrl) 37 38 // NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive. 39 gitm := NewCliGitMirrorManager(cfg.GitRepoBasePath, cfg.KnotUseSSL) 40 41 res, err := db.ExecContext(ctx, 42 `update repos set state = $1 where state = $2`, 43 models.RepoStateDesynchronized, 44 models.RepoStateResyncing, 45 ) 46 if err != nil { 47 return fmt.Errorf("clearing resyning states: %w", err) 48 } 49 rows, err := res.RowsAffected() 50 if err != nil { 51 return fmt.Errorf("getting affected rows: %w", err) 52 } 53 logger.Info(fmt.Sprintf("clearing resyning states: %d records updated", rows)) 54 55 knotstream := knotstream.NewKnotStream(logger, db, cfg) 56 crawler := NewCrawler(logger, db) 57 resyncer := NewResyncer(logger, db, gitm, cfg) 58 xrpc := xrpc.New(logger, cfg, db, rdb, resolver, knotstream) 59 adminpage := NewAdminServer(logger, db, resyncer, xrpc) 60 61 // maintain repository list with tap 62 // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events. 63 tap := NewTapClient(logger, cfg, db, gitm, knotstream) 64 65 // start http server 66 go func() { 67 logger.Info("starting http server", "addr", cfg.Listen) 68 69 mux := chi.NewRouter() 70 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 71 w.Write([]byte("Welcome to a knotmirror server.\n")) 72 }) 73 mux.Mount("/xrpc", xrpc.Router()) 74 75 if err := http.ListenAndServe(cfg.Listen, mux); err != nil { 76 logger.Error("xrpc server failed", "error", err) 77 } 78 }() 79 80 // start metrics endpoint 81 go func() { 82 metricsAddr := cfg.MetricsListen 83 logger.Info("starting metrics server", "addr", metricsAddr) 84 http.Handle("/metrics", promhttp.Handler()) 85 if err := http.ListenAndServe(metricsAddr, nil); err != nil { 86 logger.Error("metrics server failed", "error", err) 87 } 88 }() 89 90 // start admin page endpoint 91 go func() { 92 logger.Info("starting admin server", "addr", cfg.AdminListen) 93 if err := http.ListenAndServe(cfg.AdminListen, adminpage.Router()); err != nil { 94 logger.Error("admin server failed", "error", err) 95 } 96 }() 97 98 tap.Start(ctx) 99 100 resyncer.Start(ctx) 101 102 // periodically crawl the entire network to mirror the repositories 103 crawler.Start(ctx) 104 105 // listen to knotstream (currently we don't have relay for knots, so subscribe every known knots) 106 knotstream.Start(ctx) 107 108 svcErr := make(chan error, 1) 109 if err := knotstream.ResubscribeAllHosts(ctx); err != nil { 110 svcErr <- fmt.Errorf("resubscribing known hosts: %w", err) 111 } 112 113 logger.Info("startup complete") 114 select { 115 case <-ctx.Done(): 116 logger.Info("received shutdown signal", "reason", ctx.Err()) 117 case err := <-svcErr: 118 if err != nil { 119 logger.Error("service error", "error", err) 120 } 121 cancel() 122 } 123 124 logger.Info("shutting down knotmirror") 125 shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) 126 defer shutdownCancel() 127 128 var errs []error 129 if err := knotstream.Shutdown(shutdownCtx); err != nil { 130 errs = append(errs, err) 131 } 132 if err := db.Close(); err != nil { 133 errs = append(errs, err) 134 } 135 for _, err := range errs { 136 logger.Error("error during shutdown", "err", err) 137 } 138 139 logger.Info("shutdown complete") 140 return nil 141}