Monorepo for Tangled
tangled.org
1package knotmirror
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "math/rand"
10 "net/http"
11 "net/url"
12 "strings"
13 "sync"
14 "time"
15
16 "github.com/bluesky-social/indigo/atproto/syntax"
17 "tangled.org/core/knotmirror/config"
18 "tangled.org/core/knotmirror/db"
19 "tangled.org/core/knotmirror/knotstream"
20 "tangled.org/core/knotmirror/models"
21 "tangled.org/core/log"
22)
23
24type Resyncer struct {
25 logger *slog.Logger
26 db *sql.DB
27 gitm GitMirrorManager
28 cfg *config.Config
29 indexer *knotstream.ParallelScheduler
30
31 claimJobMu sync.Mutex
32
33 runningJobs map[syntax.ATURI]context.CancelFunc
34 runningJobsMu sync.Mutex
35
36 repoFetchTimeout time.Duration
37 manualResyncTimeout time.Duration
38 parallelism int
39
40 knotBackoff map[string]time.Time
41 knotBackoffMu sync.RWMutex
42
43 httpClient *http.Client
44}
45
46func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, indexer *knotstream.ParallelScheduler, cfg *config.Config) *Resyncer {
47 return &Resyncer{
48 logger: log.SubLogger(l, "resyncer"),
49 db: db,
50 gitm: gitm,
51 cfg: cfg,
52 indexer: indexer,
53
54 runningJobs: make(map[syntax.ATURI]context.CancelFunc),
55
56 repoFetchTimeout: cfg.GitRepoFetchTimeout,
57 manualResyncTimeout: 30 * time.Minute,
58 parallelism: cfg.ResyncParallelism,
59
60 knotBackoff: make(map[string]time.Time),
61
62 httpClient: &http.Client{Timeout: 30 * time.Second},
63 }
64}
65
66func (r *Resyncer) Start(ctx context.Context) {
67 for i := 0; i < r.parallelism; i++ {
68 go r.runResyncWorker(ctx, i)
69 }
70}
71
72func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) {
73 l := r.logger.With("worker", workerID)
74 for {
75 select {
76 case <-ctx.Done():
77 l.Info("resync worker shutting down", "error", ctx.Err())
78 return
79 default:
80 }
81 repoAt, found, err := r.claimResyncJob(ctx)
82 if err != nil {
83 l.Error("failed to claim resync job", "error", err)
84 time.Sleep(time.Second)
85 continue
86 }
87 if !found {
88 time.Sleep(time.Second)
89 continue
90 }
91 l.Info("processing resync", "aturi", repoAt)
92 if err := r.resyncRepo(ctx, repoAt); err != nil {
93 l.Error("resync failed", "aturi", repoAt, "error", err)
94 }
95 }
96}
97
98func (r *Resyncer) registerRunning(repo syntax.ATURI, cancel context.CancelFunc) {
99 r.runningJobsMu.Lock()
100 defer r.runningJobsMu.Unlock()
101
102 if _, exists := r.runningJobs[repo]; exists {
103 return
104 }
105 r.runningJobs[repo] = cancel
106}
107
108func (r *Resyncer) unregisterRunning(repo syntax.ATURI) {
109 r.runningJobsMu.Lock()
110 defer r.runningJobsMu.Unlock()
111
112 delete(r.runningJobs, repo)
113}
114
115func (r *Resyncer) CancelResyncJob(repo syntax.ATURI) {
116 r.runningJobsMu.Lock()
117 defer r.runningJobsMu.Unlock()
118
119 cancel, ok := r.runningJobs[repo]
120 if !ok {
121 return
122 }
123 delete(r.runningJobs, repo)
124 cancel()
125}
126
127// TriggerResyncJob manually triggers the resync job
128func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoAt syntax.ATURI) error {
129 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
130 if err != nil {
131 return fmt.Errorf("failed to get repo: %w", err)
132 }
133 if repo == nil {
134 return fmt.Errorf("repo not found: %s", repoAt)
135 }
136
137 if repo.State == models.RepoStateResyncing {
138 return fmt.Errorf("repo already resyncing")
139 }
140
141 repo.State = models.RepoStatePending
142 repo.RetryAfter = -1 // resyncer will prioritize this
143
144 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
145 return fmt.Errorf("updating repo state to pending %w", err)
146 }
147 return nil
148}
149
150func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) {
151 // use mutex to prevent duplicated jobs
152 r.claimJobMu.Lock()
153 defer r.claimJobMu.Unlock()
154
155 var repoAt syntax.ATURI
156 now := time.Now().Unix()
157 if err := r.db.QueryRowContext(ctx,
158 `update repos
159 set state = $1
160 where at_uri = (
161 select at_uri from repos
162 where state in ($2, $3, $4)
163 and (retry_after = -1 or retry_after = 0 or retry_after < $5)
164 order by
165 (retry_after = -1) desc,
166 (retry_after = 0) desc,
167 retry_after
168 limit 1
169 )
170 returning at_uri
171 `,
172 models.RepoStateResyncing,
173 models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError,
174 now,
175 ).Scan(&repoAt); err != nil {
176 if errors.Is(err, sql.ErrNoRows) {
177 return "", false, nil
178 }
179 return "", false, err
180 }
181
182 return repoAt, true, nil
183}
184
185func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error {
186 // ctx, span := tracer.Start(ctx, "resyncRepo")
187 // span.SetAttributes(attribute.String("aturi", repoAt))
188 // defer span.End()
189
190 resyncsStarted.Inc()
191 startTime := time.Now()
192
193 jobCtx, cancel := context.WithCancel(ctx)
194 r.registerRunning(repoAt, cancel)
195 defer r.unregisterRunning(repoAt)
196
197 success, err := r.doResync(jobCtx, repoAt)
198 if !success {
199 resyncsFailed.Inc()
200 resyncDuration.Observe(time.Since(startTime).Seconds())
201 return r.handleResyncFailure(ctx, repoAt, err)
202 }
203
204 resyncsCompleted.Inc()
205 resyncDuration.Observe(time.Since(startTime).Seconds())
206 return nil
207}
208
209func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) {
210 // ctx, span := tracer.Start(ctx, "doResync")
211 // span.SetAttributes(attribute.String("aturi", repoAt))
212 // defer span.End()
213
214 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
215 if err != nil {
216 return false, fmt.Errorf("failed to get repo: %w", err)
217 }
218 if repo == nil { // untracked repo, skip
219 return false, nil
220 }
221
222 r.knotBackoffMu.RLock()
223 backoffUntil, inBackoff := r.knotBackoff[repo.KnotDomain]
224 r.knotBackoffMu.RUnlock()
225 if inBackoff && time.Now().Before(backoffUntil) {
226 return false, nil
227 }
228
229 // HACK: check knot reachability with short timeout before running actual fetch.
230 // This is crucial as git-cli doesn't support http connection timeout.
231 // `http.lowSpeedTime` is only applied _after_ the connection.
232 if err := r.checkKnotReachability(ctx, repo); err != nil {
233 if isRateLimitError(err) {
234 r.knotBackoffMu.Lock()
235 r.knotBackoff[repo.KnotDomain] = time.Now().Add(10 * time.Second)
236 r.knotBackoffMu.Unlock()
237 return false, nil
238 }
239 // TODO: suspend repo on 404. KnotStream updates will change the repo state back online
240 return false, fmt.Errorf("knot unreachable: %w", err)
241 }
242
243 timeout := r.repoFetchTimeout
244 if repo.RetryAfter == -1 {
245 timeout = r.manualResyncTimeout
246 }
247 fetchCtx, cancel := context.WithTimeout(ctx, timeout)
248 defer cancel()
249
250 if err := r.gitm.Sync(fetchCtx, repo); err != nil {
251 return false, err
252 }
253
254 // queue repo_stats_update job
255 r.indexer.AddTask(context.TODO(), &knotstream.Task{Key: repo.RepoDid.String()})
256
257 // repo.GitRev = <processed git.refUpdate revision>
258 // repo.RepoSha = <sha256 sum of git refs>
259 repo.State = models.RepoStateActive
260 repo.ErrorMsg = ""
261 repo.RetryCount = 0
262 repo.RetryAfter = 0
263 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
264 return false, fmt.Errorf("updating repo state to active %w", err)
265 }
266 return true, nil
267}
268
// knotStatusError reports that a knot answered a request with an unexpected
// (non-200) HTTP status code.
type knotStatusError struct {
	StatusCode int
}

// Error implements the error interface.
func (e *knotStatusError) Error() string {
	return fmt.Sprintf("request failed with status code (HTTP %d)", e.StatusCode)
}

// isRateLimitError reports whether err is, or wraps, a *knotStatusError whose
// status is HTTP 429 Too Many Requests.
func isRateLimitError(err error) bool {
	var statusErr *knotStatusError
	return errors.As(err, &statusErr) && statusErr.StatusCode == http.StatusTooManyRequests
}
284
285// checkKnotReachability checks if Knot is reachable and is valid git remote server
286func (r *Resyncer) checkKnotReachability(ctx context.Context, repo *models.Repo) error {
287 repoUrl, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), r.cfg.KnotUseSSL)
288 if err != nil {
289 return err
290 }
291
292 repoUrl += "/info/refs?service=git-upload-pack"
293
294 r.logger.Debug("checking knot reachability", "url", repoUrl)
295
296 req, err := http.NewRequestWithContext(ctx, "GET", repoUrl, nil)
297 if err != nil {
298 return err
299 }
300 req.Header.Set("User-Agent", "git/2.x")
301 req.Header.Set("Accept", "*/*")
302
303 resp, err := r.httpClient.Do(req)
304 if err != nil {
305 var uerr *url.Error
306 if errors.As(err, &uerr) {
307 return fmt.Errorf("request failed: %w", uerr.Unwrap())
308 }
309 return fmt.Errorf("request failed: %w", err)
310 }
311 defer resp.Body.Close()
312
313 if resp.StatusCode != http.StatusOK {
314 return &knotStatusError{resp.StatusCode}
315 }
316
317 // check if target is git server
318 ct := resp.Header.Get("Content-Type")
319 if !strings.Contains(ct, "application/x-git-upload-pack-advertisement") {
320 return fmt.Errorf("unexpected content-type: %s", ct)
321 }
322
323 return nil
324}
325
326func (r *Resyncer) handleResyncFailure(ctx context.Context, repoAt syntax.ATURI, err error) error {
327 r.logger.Debug("handleResyncFailure", "at_uri", repoAt, "err", err)
328 var state models.RepoState
329 var errMsg string
330 if err == nil {
331 state = models.RepoStateDesynchronized
332 errMsg = ""
333 } else {
334 state = models.RepoStateError
335 errMsg = err.Error()
336 }
337
338 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
339 if err != nil {
340 return fmt.Errorf("failed to get repo: %w", err)
341 }
342 if repo == nil {
343 return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoAt)
344 }
345
346 // start a 1 min & go up to 1 hr between retries
347 var retryCount = repo.RetryCount + 1
348 var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix()
349
350 // remove null bytes
351 errMsg = strings.ReplaceAll(errMsg, "\x00", "")
352
353 repo.State = state
354 repo.ErrorMsg = errMsg
355 repo.RetryCount = retryCount
356 repo.RetryAfter = retryAfter
357 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
358 return fmt.Errorf("failed to update repo state: %w", err)
359 }
360 return nil
361}
362
// backoff returns an exponential delay of 2^retries seconds, capped at
// maxSecs seconds, plus a random jitter of up to 999ms.
//
// retries is clamped so the shift can never overflow. Without the clamp,
// 1<<63 is negative and 1<<64 is 0, so a long-failing job would silently drop
// back to near-zero delays. A negative shift count would panic, so negative
// retries are treated as 0.
func backoff(retries int, maxSecs int) time.Duration {
	const intBits = 32 << (^uint(0) >> 63) // 32 or 64, the width of int

	if retries < 0 {
		retries = 0
	}
	// Once 1<<retries no longer fits in a positive int it certainly exceeds
	// any representable cap, so use the cap directly.
	secs := maxSecs
	if retries < intBits-1 {
		secs = min(1<<retries, maxSecs)
	}

	jitter := time.Millisecond * time.Duration(rand.Intn(1000))
	return time.Second*time.Duration(secs) + jitter
}