Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package knotmirror 2 3import ( 4 "bytes" 5 "context" 6 "database/sql" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "math/rand" 12 "net/http" 13 "net/url" 14 "strings" 15 "sync" 16 "time" 17 18 "github.com/bluesky-social/indigo/atproto/syntax" 19 "tangled.org/core/knotmirror/config" 20 "tangled.org/core/knotmirror/db" 21 "tangled.org/core/knotmirror/knotstream" 22 "tangled.org/core/knotmirror/models" 23 "tangled.org/core/log" 24) 25 26type Resyncer struct { 27 logger *slog.Logger 28 db *sql.DB 29 gitm GitMirrorManager 30 cfg *config.Config 31 indexer *knotstream.ParallelScheduler 32 33 claimJobMu sync.Mutex 34 35 runningJobs map[syntax.DID]context.CancelFunc 36 runningJobsMu sync.Mutex 37 38 repoFetchTimeout time.Duration 39 manualResyncTimeout time.Duration 40 parallelism int 41 42 knotBackoff map[string]time.Time 43 knotBackoffMu sync.RWMutex 44 45 httpClient *http.Client 46} 47 48func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, indexer *knotstream.ParallelScheduler, cfg *config.Config) *Resyncer { 49 return &Resyncer{ 50 logger: log.SubLogger(l, "resyncer"), 51 db: db, 52 gitm: gitm, 53 cfg: cfg, 54 indexer: indexer, 55 56 runningJobs: make(map[syntax.DID]context.CancelFunc), 57 58 repoFetchTimeout: cfg.GitRepoFetchTimeout, 59 manualResyncTimeout: 30 * time.Minute, 60 parallelism: cfg.ResyncParallelism, 61 62 knotBackoff: make(map[string]time.Time), 63 64 httpClient: &http.Client{Timeout: 30 * time.Second}, 65 } 66} 67 68func (r *Resyncer) Start(ctx context.Context) { 69 for i := 0; i < r.parallelism; i++ { 70 go r.runResyncWorker(ctx, i) 71 } 72} 73 74func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) { 75 l := r.logger.With("worker", workerID) 76 for { 77 select { 78 case <-ctx.Done(): 79 l.Info("resync worker shutting down", "error", ctx.Err()) 80 return 81 default: 82 } 83 repoDid, found, err := r.claimResyncJob(ctx) 84 if err != nil { 85 l.Error("failed to claim resync job", "error", err) 86 time.Sleep(time.Second) 87 continue 88 } 89 if !found { 90 time.Sleep(time.Second) 91 continue 92 } 93 l.Info("processing resync", "did", repoDid) 94 if err := r.resyncRepo(ctx, repoDid); err != nil { 95 l.Error("resync failed", "did", repoDid, "error", err) 96 } 97 } 98} 99 100func (r *Resyncer) registerRunning(repo syntax.DID, cancel context.CancelFunc) { 101 r.runningJobsMu.Lock() 102 defer r.runningJobsMu.Unlock() 103 104 if _, exists := r.runningJobs[repo]; exists { 105 return 106 } 107 r.runningJobs[repo] = cancel 108} 109 110func (r *Resyncer) unregisterRunning(repo syntax.DID) { 111 r.runningJobsMu.Lock() 112 defer r.runningJobsMu.Unlock() 113 114 delete(r.runningJobs, repo) 115} 116 117func (r *Resyncer) CancelResyncJob(repo syntax.DID) { 118 r.runningJobsMu.Lock() 119 defer r.runningJobsMu.Unlock() 120 121 cancel, ok := r.runningJobs[repo] 122 if !ok { 123 return 124 } 125 delete(r.runningJobs, repo) 126 cancel() 127} 128 129// TriggerResyncJob manually triggers the resync job 130func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoDid syntax.DID) error { 131 repo, err := db.GetRepoByRepoDid(ctx, r.db, repoDid) 132 if err != nil { 133 return fmt.Errorf("failed to get repo: %w", err) 134 } 135 if repo == nil { 136 return fmt.Errorf("repo not found: %s", repoDid) 137 } 138 139 if repo.State == models.RepoStateResyncing { 140 return fmt.Errorf("repo already resyncing") 141 } 142 143 repo.State = models.RepoStatePending 144 repo.RetryAfter = -1 // resyncer will prioritize this 145 146 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 147 return fmt.Errorf("updating repo state to pending %w", err) 148 } 149 return nil 150} 151 152func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.DID, bool, error) { 153 // use mutex to prevent duplicated jobs 154 r.claimJobMu.Lock() 155 defer r.claimJobMu.Unlock() 156 157 var repoDid syntax.DID 158 now := time.Now().Unix() 159 if err := r.db.QueryRowContext(ctx, 160 `update repos 161 set state = $1 162 where repo_did = ( 163 select repo_did from repos 164 where state in ($2, $3, $4) 165 and (retry_after = -1 or retry_after = 0 or retry_after < $5) 166 order by 167 (retry_after = -1) desc, 168 (retry_after = 0) desc, 169 retry_after 170 limit 1 171 ) 172 returning repo_did 173 `, 174 models.RepoStateResyncing, 175 models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError, 176 now, 177 ).Scan(&repoDid); err != nil { 178 if errors.Is(err, sql.ErrNoRows) { 179 return "", false, nil 180 } 181 return "", false, err 182 } 183 184 return repoDid, true, nil 185} 186 187func (r *Resyncer) resyncRepo(ctx context.Context, repoDid syntax.DID) error { 188 // ctx, span := tracer.Start(ctx, "resyncRepo") 189 // span.SetAttributes(attribute.String("aturi", repoAt)) 190 // defer span.End() 191 192 resyncsStarted.Inc() 193 startTime := time.Now() 194 195 jobCtx, cancel := context.WithCancel(ctx) 196 r.registerRunning(repoDid, cancel) 197 defer r.unregisterRunning(repoDid) 198 199 success, err := r.doResync(jobCtx, repoDid) 200 if !success { 201 resyncsFailed.Inc() 202 resyncDuration.Observe(time.Since(startTime).Seconds()) 203 return r.handleResyncFailure(ctx, repoDid, err) 204 } 205 206 resyncsCompleted.Inc() 207 resyncDuration.Observe(time.Since(startTime).Seconds()) 208 return nil 209} 210 211func (r *Resyncer) doResync(ctx context.Context, repoDid syntax.DID) (bool, error) { 212 // ctx, span := tracer.Start(ctx, "doResync") 213 // span.SetAttributes(attribute.String("aturi", repoAt)) 214 // defer span.End() 215 216 repo, err := db.GetRepoByRepoDid(ctx, r.db, repoDid) 217 if err != nil { 218 return false, fmt.Errorf("failed to get repo: %w", err) 219 } 220 if repo == nil { // untracked repo, skip 221 return false, nil 222 } 223 224 r.knotBackoffMu.RLock() 225 backoffUntil, inBackoff := r.knotBackoff[repo.KnotDomain] 226 r.knotBackoffMu.RUnlock() 227 if inBackoff && time.Now().Before(backoffUntil) { 228 return false, nil 229 } 230 231 // HACK: check knot reachability with short timeout before running actual fetch. 232 // This is crucial as git-cli doesn't support http connection timeout. 233 // `http.lowSpeedTime` is only applied _after_ the connection. 234 format, err := r.checkKnot(ctx, repo) 235 if err != nil { 236 if isRateLimitError(err) { 237 r.knotBackoffMu.Lock() 238 r.knotBackoff[repo.KnotDomain] = time.Now().Add(10 * time.Second) 239 r.knotBackoffMu.Unlock() 240 return false, nil 241 } 242 // TODO: suspend repo on 404. KnotStream updates will change the repo state back online 243 return false, fmt.Errorf("knot unreachable: %w", err) 244 } 245 246 if format == models.ObjectFormatSHA256 { 247 return r.suspendUnsupported(ctx, repo) 248 } 249 250 timeout := r.repoFetchTimeout 251 if repo.RetryAfter == -1 { 252 timeout = r.manualResyncTimeout 253 } 254 fetchCtx, cancel := context.WithTimeout(ctx, timeout) 255 defer cancel() 256 257 if err := r.gitm.Sync(fetchCtx, repo); err != nil { 258 return false, err 259 } 260 261 // queue repo_stats_update job 262 r.indexer.AddTask(context.TODO(), &knotstream.Task{Key: repo.RepoDid.String()}) 263 264 // repo.GitRev = <processed git.refUpdate revision> 265 // repo.RepoSha = <sha256 sum of git refs> 266 repo.State = models.RepoStateActive 267 repo.ErrorMsg = "" 268 repo.RetryCount = 0 269 repo.RetryAfter = 0 270 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 271 return false, fmt.Errorf("updating repo state to active %w", err) 272 } 273 return true, nil 274} 275 276type knotStatusError struct { 277 StatusCode int 278} 279 280func (ke *knotStatusError) Error() string { 281 return fmt.Sprintf("request failed with status code (HTTP %d)", ke.StatusCode) 282} 283 284func isRateLimitError(err error) bool { 285 var knotErr *knotStatusError 286 if errors.As(err, &knotErr) { 287 return knotErr.StatusCode == http.StatusTooManyRequests 288 } 289 return false 290} 291 292func (r *Resyncer) checkKnot(ctx context.Context, repo *models.Repo) (models.ObjectFormat, error) { 293 repoUrl, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), r.cfg.KnotUseSSL) 294 if err != nil { 295 return "", err 296 } 297 298 repoUrl += "/info/refs?service=git-upload-pack" 299 300 r.logger.Debug("checking knot reachability", "url", repoUrl) 301 302 req, err := http.NewRequestWithContext(ctx, "GET", repoUrl, nil) 303 if err != nil { 304 return "", err 305 } 306 req.Header.Set("User-Agent", "git/2.x") 307 req.Header.Set("Accept", "*/*") 308 309 resp, err := r.httpClient.Do(req) 310 if err != nil { 311 var uerr *url.Error 312 if errors.As(err, &uerr) { 313 return "", fmt.Errorf("request failed: %w", uerr.Unwrap()) 314 } 315 return "", fmt.Errorf("request failed: %w", err) 316 } 317 defer resp.Body.Close() 318 319 if resp.StatusCode != http.StatusOK { 320 return "", &knotStatusError{resp.StatusCode} 321 } 322 323 // check if target is git server 324 ct := resp.Header.Get("Content-Type") 325 if !strings.Contains(ct, "application/x-git-upload-pack-advertisement") { 326 return "", fmt.Errorf("unexpected content-type: %s", ct) 327 } 328 329 advertisement, err := io.ReadAll(io.LimitReader(resp.Body, 64*1024)) 330 if err != nil { 331 return "", fmt.Errorf("reading upload-pack advertisement: %w", err) 332 } 333 if bytes.Contains(advertisement, []byte("object-format=sha256")) { 334 return models.ObjectFormatSHA256, nil 335 } 336 337 return models.ObjectFormatSHA1, nil 338} 339 340func (r *Resyncer) suspendUnsupported(ctx context.Context, repo *models.Repo) (bool, error) { 341 if err := r.gitm.Delete(repo); err != nil { 342 r.logger.Warn("failed to remove local clone of suspended repo", "did", repo.RepoDid, "err", err) 343 } 344 345 repo.State = models.RepoStateSuspended 346 repo.ErrorMsg = "unsupported sha256 object format" 347 repo.RetryCount = 0 348 repo.RetryAfter = 0 349 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 350 return false, fmt.Errorf("suspending sha256 repo: %w", err) 351 } 352 353 r.logger.Info("suspended sha256 repo, reads forwarded to knot", "did", repo.RepoDid, "knot", repo.KnotDomain) 354 return true, nil 355} 356 357func (r *Resyncer) handleResyncFailure(ctx context.Context, repoDid syntax.DID, err error) error { 358 r.logger.Debug("handleResyncFailure", "at_uri", repoDid, "err", err) 359 var state models.RepoState 360 var errMsg string 361 if err == nil { 362 state = models.RepoStateDesynchronized 363 errMsg = "" 364 } else { 365 state = models.RepoStateError 366 errMsg = err.Error() 367 } 368 369 repo, err := db.GetRepoByRepoDid(ctx, r.db, repoDid) 370 if err != nil { 371 return fmt.Errorf("failed to get repo: %w", err) 372 } 373 if repo == nil { 374 return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoDid) 375 } 376 377 // start a 1 min & go up to 1 hr between retries 378 var retryCount = repo.RetryCount + 1 379 var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix() 380 381 // remove null bytes 382 errMsg = strings.ReplaceAll(errMsg, "\x00", "") 383 384 repo.State = state 385 repo.ErrorMsg = errMsg 386 repo.RetryCount = retryCount 387 repo.RetryAfter = retryAfter 388 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 389 return fmt.Errorf("failed to update repo state: %w", err) 390 } 391 return nil 392} 393 394func backoff(retries int, max int) time.Duration { 395 dur := min(1<<retries, max) 396 jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 397 return time.Second*time.Duration(dur) + jitter 398}