Monorepo for Tangled (tangled.org)
6

Configure Feed

Select the types of activity you want to include in your feed.

1package knotmirror 2 3import ( 4 "bytes" 5 "context" 6 "database/sql" 7 "encoding/json" 8 "errors" 9 "fmt" 10 "io" 11 "log/slog" 12 "math/rand" 13 "net/http" 14 "net/url" 15 "strings" 16 "sync" 17 "time" 18 19 "github.com/bluesky-social/indigo/atproto/syntax" 20 "tangled.org/core/knotmirror/config" 21 "tangled.org/core/knotmirror/db" 22 "tangled.org/core/knotmirror/knotstream" 23 "tangled.org/core/knotmirror/models" 24 "tangled.org/core/log" 25) 26 27type Resyncer struct { 28 logger *slog.Logger 29 db *sql.DB 30 gitm GitMirrorManager 31 cfg *config.Config 32 indexer *knotstream.ParallelScheduler 33 34 claimJobMu sync.Mutex 35 36 runningJobs map[syntax.DID]context.CancelFunc 37 runningJobsMu sync.Mutex 38 39 repoFetchTimeout time.Duration 40 manualResyncTimeout time.Duration 41 parallelism int 42 43 knotBackoff map[string]time.Time 44 knotBackoffMu sync.RWMutex 45 46 httpClient *http.Client 47} 48 49func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, indexer *knotstream.ParallelScheduler, cfg *config.Config) *Resyncer { 50 return &Resyncer{ 51 logger: log.SubLogger(l, "resyncer"), 52 db: db, 53 gitm: gitm, 54 cfg: cfg, 55 indexer: indexer, 56 57 runningJobs: make(map[syntax.DID]context.CancelFunc), 58 59 repoFetchTimeout: cfg.GitRepoFetchTimeout, 60 manualResyncTimeout: 30 * time.Minute, 61 parallelism: cfg.ResyncParallelism, 62 63 knotBackoff: make(map[string]time.Time), 64 65 httpClient: &http.Client{Timeout: 30 * time.Second}, 66 } 67} 68 69func (r *Resyncer) Start(ctx context.Context) { 70 for i := 0; i < r.parallelism; i++ { 71 go r.runResyncWorker(ctx, i) 72 } 73} 74 75func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) { 76 l := r.logger.With("worker", workerID) 77 for { 78 select { 79 case <-ctx.Done(): 80 l.Info("resync worker shutting down", "error", ctx.Err()) 81 return 82 default: 83 } 84 repoDid, found, err := r.claimResyncJob(ctx) 85 if err != nil { 86 l.Error("failed to claim resync job", "error", err) 87 
time.Sleep(time.Second) 88 continue 89 } 90 if !found { 91 time.Sleep(time.Second) 92 continue 93 } 94 l.Info("processing resync", "did", repoDid) 95 if err := r.resyncRepo(ctx, repoDid); err != nil { 96 l.Error("resync failed", "did", repoDid, "error", err) 97 } 98 } 99} 100 101func (r *Resyncer) registerRunning(repo syntax.DID, cancel context.CancelFunc) { 102 r.runningJobsMu.Lock() 103 defer r.runningJobsMu.Unlock() 104 105 if _, exists := r.runningJobs[repo]; exists { 106 return 107 } 108 r.runningJobs[repo] = cancel 109} 110 111func (r *Resyncer) unregisterRunning(repo syntax.DID) { 112 r.runningJobsMu.Lock() 113 defer r.runningJobsMu.Unlock() 114 115 delete(r.runningJobs, repo) 116} 117 118func (r *Resyncer) CancelResyncJob(repo syntax.DID) { 119 r.runningJobsMu.Lock() 120 defer r.runningJobsMu.Unlock() 121 122 cancel, ok := r.runningJobs[repo] 123 if !ok { 124 return 125 } 126 delete(r.runningJobs, repo) 127 cancel() 128} 129 130// TriggerResyncJob manually triggers the resync job 131func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoDid syntax.DID) error { 132 repo, err := db.GetRepoByRepoDid(ctx, r.db, repoDid) 133 if err != nil { 134 return fmt.Errorf("failed to get repo: %w", err) 135 } 136 if repo == nil { 137 return fmt.Errorf("repo not found: %s", repoDid) 138 } 139 140 if repo.State == models.RepoStateResyncing { 141 return fmt.Errorf("repo already resyncing") 142 } 143 144 repo.State = models.RepoStatePending 145 repo.RetryAfter = -1 // resyncer will prioritize this 146 147 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 148 return fmt.Errorf("updating repo state to pending %w", err) 149 } 150 return nil 151} 152 153func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.DID, bool, error) { 154 // use mutex to prevent duplicated jobs 155 r.claimJobMu.Lock() 156 defer r.claimJobMu.Unlock() 157 158 var repoDid syntax.DID 159 now := time.Now().Unix() 160 if err := r.db.QueryRowContext(ctx, 161 `update repos 162 set state = $1 
163 where repo_did = ( 164 select repo_did from repos 165 where state in ($2, $3, $4) 166 and (retry_after = -1 or retry_after = 0 or retry_after < $5) 167 order by 168 (retry_after = -1) desc, 169 (retry_after = 0) desc, 170 retry_after 171 limit 1 172 ) 173 returning repo_did 174 `, 175 models.RepoStateResyncing, 176 models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError, 177 now, 178 ).Scan(&repoDid); err != nil { 179 if errors.Is(err, sql.ErrNoRows) { 180 return "", false, nil 181 } 182 return "", false, err 183 } 184 185 return repoDid, true, nil 186} 187 188func (r *Resyncer) resyncRepo(ctx context.Context, repoDid syntax.DID) error { 189 // ctx, span := tracer.Start(ctx, "resyncRepo") 190 // span.SetAttributes(attribute.String("aturi", repoAt)) 191 // defer span.End() 192 193 resyncsStarted.Inc() 194 startTime := time.Now() 195 196 jobCtx, cancel := context.WithCancel(ctx) 197 r.registerRunning(repoDid, cancel) 198 defer r.unregisterRunning(repoDid) 199 200 success, err := r.doResync(jobCtx, repoDid) 201 if !success { 202 resyncsFailed.Inc() 203 resyncDuration.Observe(time.Since(startTime).Seconds()) 204 return r.handleResyncFailure(ctx, repoDid, err) 205 } 206 207 resyncsCompleted.Inc() 208 resyncDuration.Observe(time.Since(startTime).Seconds()) 209 return nil 210} 211 212func (r *Resyncer) doResync(ctx context.Context, repoDid syntax.DID) (bool, error) { 213 // ctx, span := tracer.Start(ctx, "doResync") 214 // span.SetAttributes(attribute.String("aturi", repoAt)) 215 // defer span.End() 216 217 repo, err := db.GetRepoByRepoDid(ctx, r.db, repoDid) 218 if err != nil { 219 return false, fmt.Errorf("failed to get repo: %w", err) 220 } 221 if repo == nil { // untracked repo, skip 222 return false, nil 223 } 224 225 r.knotBackoffMu.RLock() 226 backoffUntil, inBackoff := r.knotBackoff[repo.KnotDomain] 227 r.knotBackoffMu.RUnlock() 228 if inBackoff && time.Now().Before(backoffUntil) { 229 return false, nil 230 } 231 232 // HACK: check knot 
reachability with short timeout before running actual fetch. 233 // This is crucial as git-cli doesn't support http connection timeout. 234 // `http.lowSpeedTime` is only applied _after_ the connection. 235 format, err := r.checkKnot(ctx, repo) 236 if err != nil { 237 if isRateLimitError(err) { 238 r.knotBackoffMu.Lock() 239 r.knotBackoff[repo.KnotDomain] = time.Now().Add(10 * time.Second) 240 r.knotBackoffMu.Unlock() 241 return false, nil 242 } 243 // TODO: suspend repo on 404. KnotStream updates will change the repo state back online 244 return false, fmt.Errorf("knot unreachable: %w", err) 245 } 246 247 if format == models.ObjectFormatSHA256 { 248 return r.suspendUnsupported(ctx, repo) 249 } 250 251 timeout := r.repoFetchTimeout 252 if repo.RetryAfter == -1 { 253 timeout = r.manualResyncTimeout 254 } 255 fetchCtx, cancel := context.WithTimeout(ctx, timeout) 256 defer cancel() 257 258 if err := r.gitm.Sync(fetchCtx, repo); err != nil { 259 return false, err 260 } 261 262 // request index to zoekt server 263 // NOTE: indexing after full git resync is bad design. We are doing _after_ the sync because knotstream event doesn't include repository refs. 264 // NOTE: and zoekt indexer should directly subscribe to the knot. remove this when we have knotrelay. 
265 if r.cfg.Search.ZoektUrl != "" { 266 go func() { 267 idxCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 268 defer cancel() 269 defaultBranch, err := r.gitm.DefaultBranch(idxCtx, repo) 270 if err != nil { 271 r.logger.Warn("resolving default branch for indexing failed", "did", repo.RepoDid, "error", err) 272 return 273 } 274 if err := r.requestIndex(idxCtx, repo.RepoDid, []branch{defaultBranch}); err != nil { 275 r.logger.Warn("requesting zoekt index failed", "did", repo.RepoDid, "err", err) 276 } 277 }() 278 } 279 280 // queue repo_stats_update job 281 r.indexer.AddTask(context.TODO(), &knotstream.Task{Key: repo.RepoDid.String()}) 282 283 // repo.GitRev = <processed git.refUpdate revision> 284 // repo.RepoSha = <sha256 sum of git refs> 285 repo.State = models.RepoStateActive 286 repo.ErrorMsg = "" 287 repo.RetryCount = 0 288 repo.RetryAfter = 0 289 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 290 return false, fmt.Errorf("updating repo state to active %w", err) 291 } 292 return true, nil 293} 294 295type knotStatusError struct { 296 StatusCode int 297} 298 299func (ke *knotStatusError) Error() string { 300 return fmt.Sprintf("request failed with status code (HTTP %d)", ke.StatusCode) 301} 302 303func isRateLimitError(err error) bool { 304 var knotErr *knotStatusError 305 if errors.As(err, &knotErr) { 306 return knotErr.StatusCode == http.StatusTooManyRequests 307 } 308 return false 309} 310 311func (r *Resyncer) checkKnot(ctx context.Context, repo *models.Repo) (models.ObjectFormat, error) { 312 repoUrl, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), r.cfg.KnotUseSSL) 313 if err != nil { 314 return "", err 315 } 316 317 repoUrl += "/info/refs?service=git-upload-pack" 318 319 r.logger.Debug("checking knot reachability", "url", repoUrl) 320 321 req, err := http.NewRequestWithContext(ctx, "GET", repoUrl, nil) 322 if err != nil { 323 return "", err 324 } 325 req.Header.Set("User-Agent", "git/2.x") 326 req.Header.Set("Accept", 
"*/*") 327 328 resp, err := r.httpClient.Do(req) 329 if err != nil { 330 var uerr *url.Error 331 if errors.As(err, &uerr) { 332 return "", fmt.Errorf("request failed: %w", uerr.Unwrap()) 333 } 334 return "", fmt.Errorf("request failed: %w", err) 335 } 336 defer resp.Body.Close() 337 338 if resp.StatusCode != http.StatusOK { 339 return "", &knotStatusError{resp.StatusCode} 340 } 341 342 // check if target is git server 343 ct := resp.Header.Get("Content-Type") 344 if !strings.Contains(ct, "application/x-git-upload-pack-advertisement") { 345 return "", fmt.Errorf("unexpected content-type: %s", ct) 346 } 347 348 advertisement, err := io.ReadAll(io.LimitReader(resp.Body, 64*1024)) 349 if err != nil { 350 return "", fmt.Errorf("reading upload-pack advertisement: %w", err) 351 } 352 if bytes.Contains(advertisement, []byte("object-format=sha256")) { 353 return models.ObjectFormatSHA256, nil 354 } 355 356 return models.ObjectFormatSHA1, nil 357} 358 359func (r *Resyncer) suspendUnsupported(ctx context.Context, repo *models.Repo) (bool, error) { 360 if err := r.gitm.Delete(repo); err != nil { 361 r.logger.Warn("failed to remove local clone of suspended repo", "did", repo.RepoDid, "err", err) 362 } 363 364 repo.State = models.RepoStateSuspended 365 repo.ErrorMsg = "unsupported sha256 object format" 366 repo.RetryCount = 0 367 repo.RetryAfter = 0 368 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 369 return false, fmt.Errorf("suspending sha256 repo: %w", err) 370 } 371 372 r.logger.Info("suspended sha256 repo, reads forwarded to knot", "did", repo.RepoDid, "knot", repo.KnotDomain) 373 return true, nil 374} 375 376func (r *Resyncer) handleResyncFailure(ctx context.Context, repoDid syntax.DID, err error) error { 377 r.logger.Debug("handleResyncFailure", "at_uri", repoDid, "err", err) 378 var state models.RepoState 379 var errMsg string 380 if err == nil { 381 state = models.RepoStateDesynchronized 382 errMsg = "" 383 } else { 384 state = models.RepoStateError 385 errMsg 
= err.Error() 386 } 387 388 repo, err := db.GetRepoByRepoDid(ctx, r.db, repoDid) 389 if err != nil { 390 return fmt.Errorf("failed to get repo: %w", err) 391 } 392 if repo == nil { 393 return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoDid) 394 } 395 396 // start a 1 min & go up to 1 hr between retries 397 var retryCount = repo.RetryCount + 1 398 var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix() 399 400 // remove null bytes 401 errMsg = strings.ReplaceAll(errMsg, "\x00", "") 402 403 repo.State = state 404 repo.ErrorMsg = errMsg 405 repo.RetryCount = retryCount 406 repo.RetryAfter = retryAfter 407 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 408 return fmt.Errorf("failed to update repo state: %w", err) 409 } 410 return nil 411} 412 413func backoff(retries int, max int) time.Duration { 414 dur := min(1<<retries, max) 415 jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 416 return time.Second*time.Duration(dur) + jitter 417} 418 419func (r *Resyncer) requestIndex(ctx context.Context, repoDid syntax.DID, branches []branch) error { 420 r.logger.Info("requesting index", "repo", repoDid, "branches", branches) 421 body, err := json.Marshal(map[string]any{ 422 "repo": repoDid.String(), 423 "branches": branches, 424 }) 425 if err != nil { 426 return fmt.Errorf("marshaling index request: %w", err) 427 } 428 429 endpoint := r.cfg.Search.ZoektUrl + "/indexserver/admin/enqueueIndex" 430 req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) 431 if err != nil { 432 return err 433 } 434 req.Header.Set("Content-Type", "application/json") 435 436 resp, err := r.httpClient.Do(req) 437 if err != nil { 438 return fmt.Errorf("requesting zoekt index: %w", err) 439 } 440 defer resp.Body.Close() 441 442 if resp.StatusCode < 200 || resp.StatusCode >= 300 { 443 return fmt.Errorf("non-ok status: %d", resp.StatusCode) 444 } 445 return nil 446}