Monorepo for Tangled tangled.org
3

Configure Feed

Select the types of activity you want to include in your feed.

1package knotmirror 2 3import ( 4 "context" 5 "fmt" 6 "net/http" 7 _ "net/http/pprof" 8 "time" 9 10 "github.com/go-chi/chi/v5" 11 "github.com/prometheus/client_golang/prometheus/promhttp" 12 "github.com/redis/go-redis/v9" 13 "tangled.org/core/idresolver" 14 "tangled.org/core/knotmirror/config" 15 "tangled.org/core/knotmirror/db" 16 "tangled.org/core/knotmirror/knotstream" 17 "tangled.org/core/knotmirror/models" 18 "tangled.org/core/knotmirror/repoindexer" 19 "tangled.org/core/knotmirror/xrpc" 20 "tangled.org/core/log" 21) 22 23func Run(ctx context.Context, cfg *config.Config) error { 24 // make sure every services are cleaned up on fast return 25 ctx, cancel := context.WithCancel(ctx) 26 defer cancel() 27 28 logger := log.FromContext(ctx) 29 30 db, err := db.Make(ctx, cfg.DbUrl, 32) 31 if err != nil { 32 return fmt.Errorf("initializing db: %w", err) 33 } 34 35 rdb := redis.NewClient(&redis.Options{Addr: cfg.RedisAddr}) 36 37 resolver, err := idresolver.RedisResolver("redis://"+cfg.RedisAddr, cfg.PlcUrl) 38 if err != nil { 39 logger.Error("failed to create redis resolver for admin, falling back to default", "err", err) 40 resolver = idresolver.DefaultResolver(cfg.PlcUrl) 41 } 42 43 // NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive. 44 gitm := NewCliGitMirrorManager(cfg.GitRepoBasePath, cfg.KnotUseSSL) 45 46 res, err := db.ExecContext(ctx, 47 `update repos set state = $1 where state = $2`, 48 models.RepoStateDesynchronized, 49 models.RepoStateResyncing, 50 ) 51 if err != nil { 52 return fmt.Errorf("clearing resyning states: %w", err) 53 } 54 rows, err := res.RowsAffected() 55 if err != nil { 56 return fmt.Errorf("getting affected rows: %w", err) 57 } 58 logger.Info(fmt.Sprintf("clearing resyning states: %d records updated", rows)) 59 60 indexer := repoindexer.NewIndexer(logger, cfg, rdb) 61 indexScheduler := repoindexer.NewBackgroundIndexScheduler(logger, cfg, db, indexer) 62 indexScheduler.Start(ctx) 63 64 knotstream := knotstream.NewKnotStream(logger, db, cfg) 65 crawler := NewCrawler(logger, db) 66 resyncer := NewResyncer(logger, db, gitm, indexScheduler, cfg) 67 xrpc := xrpc.New(logger, cfg, db, rdb, indexer, resolver, knotstream) 68 adminpage := NewAdminServer(logger, db, resyncer, xrpc, resolver) 69 70 // maintain repository list with tap 71 // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events. 72 tap := NewTapClient(logger, cfg, db, gitm, knotstream) 73 74 // start http server 75 go func() { 76 logger.Info("starting http server", "addr", cfg.Listen) 77 78 mux := chi.NewRouter() 79 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 80 w.Write([]byte("Welcome to a knotmirror server.\n")) 81 }) 82 mux.Mount("/xrpc", xrpc.Router()) 83 84 if err := http.ListenAndServe(cfg.Listen, mux); err != nil { 85 logger.Error("xrpc server failed", "error", err) 86 } 87 }() 88 89 // start metrics endpoint 90 go func() { 91 metricsAddr := cfg.MetricsListen 92 logger.Info("starting metrics server", "addr", metricsAddr) 93 http.Handle("/metrics", promhttp.Handler()) 94 if err := http.ListenAndServe(metricsAddr, nil); err != nil { 95 logger.Error("metrics server failed", "error", err) 96 } 97 }() 98 99 // start admin page endpoint 100 go func() { 101 logger.Info("starting admin server", "addr", cfg.AdminListen) 102 if err := http.ListenAndServe(cfg.AdminListen, adminpage.Router()); err != nil { 103 logger.Error("admin server failed", "error", err) 104 } 105 }() 106 107 tap.Start(ctx) 108 109 resyncer.Start(ctx) 110 111 // periodically crawl the entire network to mirror the repositories 112 crawler.Start(ctx) 113 114 // listen to knotstream (currently we don't have relay for knots, so subscribe every known knots) 115 knotstream.Start(ctx) 116 117 svcErr := make(chan error, 1) 118 if err := knotstream.ResubscribeAllHosts(ctx); err != nil { 119 svcErr <- fmt.Errorf("resubscribing known hosts: %w", err) 120 } 121 122 logger.Info("startup complete") 123 select { 124 case <-ctx.Done(): 125 logger.Info("received shutdown signal", "reason", ctx.Err()) 126 case err := <-svcErr: 127 if err != nil { 128 logger.Error("service error", "error", err) 129 } 130 cancel() 131 } 132 133 logger.Info("shutting down knotmirror") 134 shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) 135 defer shutdownCancel() 136 137 var errs []error 138 if err := knotstream.Shutdown(shutdownCtx); err != nil { 139 errs = append(errs, err) 140 } 141 if err := db.Close(); err != nil { 142 errs = append(errs, err) 143 } 144 for _, err := range errs { 145 logger.Error("error during shutdown", "err", err) 146 } 147 148 logger.Info("shutdown complete") 149 return nil 150}