Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package knotmirror 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "math/rand" 10 "net/http" 11 "net/url" 12 "strings" 13 "sync" 14 "time" 15 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 "tangled.org/core/knotmirror/config" 18 "tangled.org/core/knotmirror/db" 19 "tangled.org/core/knotmirror/knotstream" 20 "tangled.org/core/knotmirror/models" 21 "tangled.org/core/log" 22) 23 24type Resyncer struct { 25 logger *slog.Logger 26 db *sql.DB 27 gitm GitMirrorManager 28 cfg *config.Config 29 indexer *knotstream.ParallelScheduler 30 31 claimJobMu sync.Mutex 32 33 runningJobs map[syntax.DID]context.CancelFunc 34 runningJobsMu sync.Mutex 35 36 repoFetchTimeout time.Duration 37 manualResyncTimeout time.Duration 38 parallelism int 39 40 knotBackoff map[string]time.Time 41 knotBackoffMu sync.RWMutex 42 43 httpClient *http.Client 44} 45 46func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, indexer *knotstream.ParallelScheduler, cfg *config.Config) *Resyncer { 47 return &Resyncer{ 48 logger: log.SubLogger(l, "resyncer"), 49 db: db, 50 gitm: gitm, 51 cfg: cfg, 52 indexer: indexer, 53 54 runningJobs: make(map[syntax.DID]context.CancelFunc), 55 56 repoFetchTimeout: cfg.GitRepoFetchTimeout, 57 manualResyncTimeout: 30 * time.Minute, 58 parallelism: cfg.ResyncParallelism, 59 60 knotBackoff: make(map[string]time.Time), 61 62 httpClient: &http.Client{Timeout: 30 * time.Second}, 63 } 64} 65 66func (r *Resyncer) Start(ctx context.Context) { 67 for i := 0; i < r.parallelism; i++ { 68 go r.runResyncWorker(ctx, i) 69 } 70} 71 72func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) { 73 l := r.logger.With("worker", workerID) 74 for { 75 select { 76 case <-ctx.Done(): 77 l.Info("resync worker shutting down", "error", ctx.Err()) 78 return 79 default: 80 } 81 repoDid, found, err := r.claimResyncJob(ctx) 82 if err != nil { 83 l.Error("failed to claim resync job", "error", err) 84 time.Sleep(time.Second) 85 continue 86 } 87 if !found { 88 time.Sleep(time.Second) 89 continue 90 } 91 l.Info("processing resync", "did", repoDid) 92 if err := r.resyncRepo(ctx, repoDid); err != nil { 93 l.Error("resync failed", "did", repoDid, "error", err) 94 } 95 } 96} 97 98func (r *Resyncer) registerRunning(repo syntax.DID, cancel context.CancelFunc) { 99 r.runningJobsMu.Lock() 100 defer r.runningJobsMu.Unlock() 101 102 if _, exists := r.runningJobs[repo]; exists { 103 return 104 } 105 r.runningJobs[repo] = cancel 106} 107 108func (r *Resyncer) unregisterRunning(repo syntax.DID) { 109 r.runningJobsMu.Lock() 110 defer r.runningJobsMu.Unlock() 111 112 delete(r.runningJobs, repo) 113} 114 115func (r *Resyncer) CancelResyncJob(repo syntax.DID) { 116 r.runningJobsMu.Lock() 117 defer r.runningJobsMu.Unlock() 118 119 cancel, ok := r.runningJobs[repo] 120 if !ok { 121 return 122 } 123 delete(r.runningJobs, repo) 124 cancel() 125} 126 127// TriggerResyncJob manually triggers the resync job 128func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoDid syntax.DID) error { 129 repo, err := db.GetRepoByRepoDid(ctx, r.db, repoDid) 130 if err != nil { 131 return fmt.Errorf("failed to get repo: %w", err) 132 } 133 if repo == nil { 134 return fmt.Errorf("repo not found: %s", repoDid) 135 } 136 137 if repo.State == models.RepoStateResyncing { 138 return fmt.Errorf("repo already resyncing") 139 } 140 141 repo.State = models.RepoStatePending 142 repo.RetryAfter = -1 // resyncer will prioritize this 143 144 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 145 return fmt.Errorf("updating repo state to pending %w", err) 146 } 147 return nil 148} 149 150func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.DID, bool, error) { 151 // use mutex to prevent duplicated jobs 152 r.claimJobMu.Lock() 153 defer r.claimJobMu.Unlock() 154 155 var repoDid syntax.DID 156 now := time.Now().Unix() 157 if err := r.db.QueryRowContext(ctx, 158 `update repos 159 set state = $1 160 where repo_did = ( 161 select repo_did from repos 162 where state in ($2, $3, $4) 163 and (retry_after = -1 or retry_after = 0 or retry_after < $5) 164 order by 165 (retry_after = -1) desc, 166 (retry_after = 0) desc, 167 retry_after 168 limit 1 169 ) 170 returning repo_did 171 `, 172 models.RepoStateResyncing, 173 models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError, 174 now, 175 ).Scan(&repoDid); err != nil { 176 if errors.Is(err, sql.ErrNoRows) { 177 return "", false, nil 178 } 179 return "", false, err 180 } 181 182 return repoDid, true, nil 183} 184 185func (r *Resyncer) resyncRepo(ctx context.Context, repoDid syntax.DID) error { 186 // ctx, span := tracer.Start(ctx, "resyncRepo") 187 // span.SetAttributes(attribute.String("aturi", repoAt)) 188 // defer span.End() 189 190 resyncsStarted.Inc() 191 startTime := time.Now() 192 193 jobCtx, cancel := context.WithCancel(ctx) 194 r.registerRunning(repoDid, cancel) 195 defer r.unregisterRunning(repoDid) 196 197 success, err := r.doResync(jobCtx, repoDid) 198 if !success { 199 resyncsFailed.Inc() 200 resyncDuration.Observe(time.Since(startTime).Seconds()) 201 return r.handleResyncFailure(ctx, repoDid, err) 202 } 203 204 resyncsCompleted.Inc() 205 resyncDuration.Observe(time.Since(startTime).Seconds()) 206 return nil 207} 208 209func (r *Resyncer) doResync(ctx context.Context, repoDid syntax.DID) (bool, error) { 210 // ctx, span := tracer.Start(ctx, "doResync") 211 // span.SetAttributes(attribute.String("aturi", repoAt)) 212 // defer span.End() 213 214 repo, err := db.GetRepoByRepoDid(ctx, r.db, repoDid) 215 if err != nil { 216 return false, fmt.Errorf("failed to get repo: %w", err) 217 } 218 if repo == nil { // untracked repo, skip 219 return false, nil 220 } 221 222 r.knotBackoffMu.RLock() 223 backoffUntil, inBackoff := r.knotBackoff[repo.KnotDomain] 224 r.knotBackoffMu.RUnlock() 225 if inBackoff && time.Now().Before(backoffUntil) { 226 return false, nil 227 } 228 229 // HACK: check knot reachability with short timeout before running actual fetch. 230 // This is crucial as git-cli doesn't support http connection timeout. 231 // `http.lowSpeedTime` is only applied _after_ the connection. 232 if err := r.checkKnotReachability(ctx, repo); err != nil { 233 if isRateLimitError(err) { 234 r.knotBackoffMu.Lock() 235 r.knotBackoff[repo.KnotDomain] = time.Now().Add(10 * time.Second) 236 r.knotBackoffMu.Unlock() 237 return false, nil 238 } 239 // TODO: suspend repo on 404. KnotStream updates will change the repo state back online 240 return false, fmt.Errorf("knot unreachable: %w", err) 241 } 242 243 timeout := r.repoFetchTimeout 244 if repo.RetryAfter == -1 { 245 timeout = r.manualResyncTimeout 246 } 247 fetchCtx, cancel := context.WithTimeout(ctx, timeout) 248 defer cancel() 249 250 if err := r.gitm.Sync(fetchCtx, repo); err != nil { 251 return false, err 252 } 253 254 // queue repo_stats_update job 255 r.indexer.AddTask(context.TODO(), &knotstream.Task{Key: repo.RepoDid.String()}) 256 257 // repo.GitRev = <processed git.refUpdate revision> 258 // repo.RepoSha = <sha256 sum of git refs> 259 repo.State = models.RepoStateActive 260 repo.ErrorMsg = "" 261 repo.RetryCount = 0 262 repo.RetryAfter = 0 263 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 264 return false, fmt.Errorf("updating repo state to active %w", err) 265 } 266 return true, nil 267} 268 269type knotStatusError struct { 270 StatusCode int 271} 272 273func (ke *knotStatusError) Error() string { 274 return fmt.Sprintf("request failed with status code (HTTP %d)", ke.StatusCode) 275} 276 277func isRateLimitError(err error) bool { 278 var knotErr *knotStatusError 279 if errors.As(err, &knotErr) { 280 return knotErr.StatusCode == http.StatusTooManyRequests 281 } 282 return false 283} 284 285// checkKnotReachability checks if Knot is reachable and is valid git remote server 286func (r *Resyncer) checkKnotReachability(ctx context.Context, repo *models.Repo) error { 287 repoUrl, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), r.cfg.KnotUseSSL) 288 if err != nil { 289 return err 290 } 291 292 repoUrl += "/info/refs?service=git-upload-pack" 293 294 r.logger.Debug("checking knot reachability", "url", repoUrl) 295 296 req, err := http.NewRequestWithContext(ctx, "GET", repoUrl, nil) 297 if err != nil { 298 return err 299 } 300 req.Header.Set("User-Agent", "git/2.x") 301 req.Header.Set("Accept", "*/*") 302 303 resp, err := r.httpClient.Do(req) 304 if err != nil { 305 var uerr *url.Error 306 if errors.As(err, &uerr) { 307 return fmt.Errorf("request failed: %w", uerr.Unwrap()) 308 } 309 return fmt.Errorf("request failed: %w", err) 310 } 311 defer resp.Body.Close() 312 313 if resp.StatusCode != http.StatusOK { 314 return &knotStatusError{resp.StatusCode} 315 } 316 317 // check if target is git server 318 ct := resp.Header.Get("Content-Type") 319 if !strings.Contains(ct, "application/x-git-upload-pack-advertisement") { 320 return fmt.Errorf("unexpected content-type: %s", ct) 321 } 322 323 return nil 324} 325 326func (r *Resyncer) handleResyncFailure(ctx context.Context, repoDid syntax.DID, err error) error { 327 r.logger.Debug("handleResyncFailure", "at_uri", repoDid, "err", err) 328 var state models.RepoState 329 var errMsg string 330 if err == nil { 331 state = models.RepoStateDesynchronized 332 errMsg = "" 333 } else { 334 state = models.RepoStateError 335 errMsg = err.Error() 336 } 337 338 repo, err := db.GetRepoByRepoDid(ctx, r.db, repoDid) 339 if err != nil { 340 return fmt.Errorf("failed to get repo: %w", err) 341 } 342 if repo == nil { 343 return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoDid) 344 } 345 346 // start a 1 min & go up to 1 hr between retries 347 var retryCount = repo.RetryCount + 1 348 var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix() 349 350 // remove null bytes 351 errMsg = strings.ReplaceAll(errMsg, "\x00", "") 352 353 repo.State = state 354 repo.ErrorMsg = errMsg 355 repo.RetryCount = retryCount 356 repo.RetryAfter = retryAfter 357 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 358 return fmt.Errorf("failed to update repo state: %w", err) 359 } 360 return nil 361} 362 363func backoff(retries int, max int) time.Duration { 364 dur := min(1<<retries, max) 365 jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 366 return time.Second*time.Duration(dur) + jitter 367}