Monorepo for Tangled
tangled.org
1package knotmirror
2
3import (
4 "bytes"
5 "context"
6 "database/sql"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "math/rand"
12 "net/http"
13 "net/url"
14 "strings"
15 "sync"
16 "time"
17
18 "github.com/bluesky-social/indigo/atproto/syntax"
19 "tangled.org/core/knotmirror/config"
20 "tangled.org/core/knotmirror/db"
21 "tangled.org/core/knotmirror/knotstream"
22 "tangled.org/core/knotmirror/models"
23 "tangled.org/core/log"
24)
25
// Resyncer runs background workers that claim repos needing a resync from the
// database and mirror them through the GitMirrorManager.
type Resyncer struct {
	// Collaborators handed in through NewResyncer.
	logger  *slog.Logger
	db      *sql.DB
	gitm    GitMirrorManager
	cfg     *config.Config
	indexer *knotstream.ParallelScheduler

	// claimJobMu serializes claimResyncJob so workers do not claim duplicated jobs.
	claimJobMu sync.Mutex

	// runningJobs maps a repo DID to the cancel func of its in-flight resync.
	// Every access in this file happens with runningJobsMu held.
	runningJobs   map[syntax.DID]context.CancelFunc
	runningJobsMu sync.Mutex

	// repoFetchTimeout bounds a regular git sync; manualResyncTimeout is used
	// instead when the repo's RetryAfter is -1 (manually triggered resync).
	// parallelism is the number of worker goroutines launched by Start.
	repoFetchTimeout    time.Duration
	manualResyncTimeout time.Duration
	parallelism         int

	// knotBackoff maps a knot domain to the time until which resyncs against
	// that knot are skipped (set after a rate-limit response). Every access in
	// this file happens with knotBackoffMu held.
	knotBackoff   map[string]time.Time
	knotBackoffMu sync.RWMutex

	// httpClient performs the reachability probe in checkKnot.
	httpClient *http.Client
}
47
48func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, indexer *knotstream.ParallelScheduler, cfg *config.Config) *Resyncer {
49 return &Resyncer{
50 logger: log.SubLogger(l, "resyncer"),
51 db: db,
52 gitm: gitm,
53 cfg: cfg,
54 indexer: indexer,
55
56 runningJobs: make(map[syntax.DID]context.CancelFunc),
57
58 repoFetchTimeout: cfg.GitRepoFetchTimeout,
59 manualResyncTimeout: 30 * time.Minute,
60 parallelism: cfg.ResyncParallelism,
61
62 knotBackoff: make(map[string]time.Time),
63
64 httpClient: &http.Client{Timeout: 30 * time.Second},
65 }
66}
67
68func (r *Resyncer) Start(ctx context.Context) {
69 for i := 0; i < r.parallelism; i++ {
70 go r.runResyncWorker(ctx, i)
71 }
72}
73
74func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) {
75 l := r.logger.With("worker", workerID)
76 for {
77 select {
78 case <-ctx.Done():
79 l.Info("resync worker shutting down", "error", ctx.Err())
80 return
81 default:
82 }
83 repoDid, found, err := r.claimResyncJob(ctx)
84 if err != nil {
85 l.Error("failed to claim resync job", "error", err)
86 time.Sleep(time.Second)
87 continue
88 }
89 if !found {
90 time.Sleep(time.Second)
91 continue
92 }
93 l.Info("processing resync", "did", repoDid)
94 if err := r.resyncRepo(ctx, repoDid); err != nil {
95 l.Error("resync failed", "did", repoDid, "error", err)
96 }
97 }
98}
99
100func (r *Resyncer) registerRunning(repo syntax.DID, cancel context.CancelFunc) {
101 r.runningJobsMu.Lock()
102 defer r.runningJobsMu.Unlock()
103
104 if _, exists := r.runningJobs[repo]; exists {
105 return
106 }
107 r.runningJobs[repo] = cancel
108}
109
110func (r *Resyncer) unregisterRunning(repo syntax.DID) {
111 r.runningJobsMu.Lock()
112 defer r.runningJobsMu.Unlock()
113
114 delete(r.runningJobs, repo)
115}
116
117func (r *Resyncer) CancelResyncJob(repo syntax.DID) {
118 r.runningJobsMu.Lock()
119 defer r.runningJobsMu.Unlock()
120
121 cancel, ok := r.runningJobs[repo]
122 if !ok {
123 return
124 }
125 delete(r.runningJobs, repo)
126 cancel()
127}
128
129// TriggerResyncJob manually triggers the resync job
130func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoDid syntax.DID) error {
131 repo, err := db.GetRepoByRepoDid(ctx, r.db, repoDid)
132 if err != nil {
133 return fmt.Errorf("failed to get repo: %w", err)
134 }
135 if repo == nil {
136 return fmt.Errorf("repo not found: %s", repoDid)
137 }
138
139 if repo.State == models.RepoStateResyncing {
140 return fmt.Errorf("repo already resyncing")
141 }
142
143 repo.State = models.RepoStatePending
144 repo.RetryAfter = -1 // resyncer will prioritize this
145
146 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
147 return fmt.Errorf("updating repo state to pending %w", err)
148 }
149 return nil
150}
151
152func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.DID, bool, error) {
153 // use mutex to prevent duplicated jobs
154 r.claimJobMu.Lock()
155 defer r.claimJobMu.Unlock()
156
157 var repoDid syntax.DID
158 now := time.Now().Unix()
159 if err := r.db.QueryRowContext(ctx,
160 `update repos
161 set state = $1
162 where repo_did = (
163 select repo_did from repos
164 where state in ($2, $3, $4)
165 and (retry_after = -1 or retry_after = 0 or retry_after < $5)
166 order by
167 (retry_after = -1) desc,
168 (retry_after = 0) desc,
169 retry_after
170 limit 1
171 )
172 returning repo_did
173 `,
174 models.RepoStateResyncing,
175 models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError,
176 now,
177 ).Scan(&repoDid); err != nil {
178 if errors.Is(err, sql.ErrNoRows) {
179 return "", false, nil
180 }
181 return "", false, err
182 }
183
184 return repoDid, true, nil
185}
186
187func (r *Resyncer) resyncRepo(ctx context.Context, repoDid syntax.DID) error {
188 // ctx, span := tracer.Start(ctx, "resyncRepo")
189 // span.SetAttributes(attribute.String("aturi", repoAt))
190 // defer span.End()
191
192 resyncsStarted.Inc()
193 startTime := time.Now()
194
195 jobCtx, cancel := context.WithCancel(ctx)
196 r.registerRunning(repoDid, cancel)
197 defer r.unregisterRunning(repoDid)
198
199 success, err := r.doResync(jobCtx, repoDid)
200 if !success {
201 resyncsFailed.Inc()
202 resyncDuration.Observe(time.Since(startTime).Seconds())
203 return r.handleResyncFailure(ctx, repoDid, err)
204 }
205
206 resyncsCompleted.Inc()
207 resyncDuration.Observe(time.Since(startTime).Seconds())
208 return nil
209}
210
211func (r *Resyncer) doResync(ctx context.Context, repoDid syntax.DID) (bool, error) {
212 // ctx, span := tracer.Start(ctx, "doResync")
213 // span.SetAttributes(attribute.String("aturi", repoAt))
214 // defer span.End()
215
216 repo, err := db.GetRepoByRepoDid(ctx, r.db, repoDid)
217 if err != nil {
218 return false, fmt.Errorf("failed to get repo: %w", err)
219 }
220 if repo == nil { // untracked repo, skip
221 return false, nil
222 }
223
224 r.knotBackoffMu.RLock()
225 backoffUntil, inBackoff := r.knotBackoff[repo.KnotDomain]
226 r.knotBackoffMu.RUnlock()
227 if inBackoff && time.Now().Before(backoffUntil) {
228 return false, nil
229 }
230
231 // HACK: check knot reachability with short timeout before running actual fetch.
232 // This is crucial as git-cli doesn't support http connection timeout.
233 // `http.lowSpeedTime` is only applied _after_ the connection.
234 format, err := r.checkKnot(ctx, repo)
235 if err != nil {
236 if isRateLimitError(err) {
237 r.knotBackoffMu.Lock()
238 r.knotBackoff[repo.KnotDomain] = time.Now().Add(10 * time.Second)
239 r.knotBackoffMu.Unlock()
240 return false, nil
241 }
242 // TODO: suspend repo on 404. KnotStream updates will change the repo state back online
243 return false, fmt.Errorf("knot unreachable: %w", err)
244 }
245
246 if format == models.ObjectFormatSHA256 {
247 return r.suspendUnsupported(ctx, repo)
248 }
249
250 timeout := r.repoFetchTimeout
251 if repo.RetryAfter == -1 {
252 timeout = r.manualResyncTimeout
253 }
254 fetchCtx, cancel := context.WithTimeout(ctx, timeout)
255 defer cancel()
256
257 if err := r.gitm.Sync(fetchCtx, repo); err != nil {
258 return false, err
259 }
260
261 // queue repo_stats_update job
262 r.indexer.AddTask(context.TODO(), &knotstream.Task{Key: repo.RepoDid.String()})
263
264 // repo.GitRev = <processed git.refUpdate revision>
265 // repo.RepoSha = <sha256 sum of git refs>
266 repo.State = models.RepoStateActive
267 repo.ErrorMsg = ""
268 repo.RetryCount = 0
269 repo.RetryAfter = 0
270 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
271 return false, fmt.Errorf("updating repo state to active %w", err)
272 }
273 return true, nil
274}
275
// knotStatusError is returned when a knot answers with a non-200 HTTP status.
type knotStatusError struct {
	StatusCode int
}

// Error implements the error interface.
func (ke *knotStatusError) Error() string {
	const format = "request failed with status code (HTTP %d)"
	return fmt.Sprintf(format, ke.StatusCode)
}

// isRateLimitError reports whether err is, or wraps, a knotStatusError
// carrying HTTP 429 (Too Many Requests).
func isRateLimitError(err error) bool {
	var statusErr *knotStatusError
	return errors.As(err, &statusErr) && statusErr.StatusCode == http.StatusTooManyRequests
}
291
292func (r *Resyncer) checkKnot(ctx context.Context, repo *models.Repo) (models.ObjectFormat, error) {
293 repoUrl, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), r.cfg.KnotUseSSL)
294 if err != nil {
295 return "", err
296 }
297
298 repoUrl += "/info/refs?service=git-upload-pack"
299
300 r.logger.Debug("checking knot reachability", "url", repoUrl)
301
302 req, err := http.NewRequestWithContext(ctx, "GET", repoUrl, nil)
303 if err != nil {
304 return "", err
305 }
306 req.Header.Set("User-Agent", "git/2.x")
307 req.Header.Set("Accept", "*/*")
308
309 resp, err := r.httpClient.Do(req)
310 if err != nil {
311 var uerr *url.Error
312 if errors.As(err, &uerr) {
313 return "", fmt.Errorf("request failed: %w", uerr.Unwrap())
314 }
315 return "", fmt.Errorf("request failed: %w", err)
316 }
317 defer resp.Body.Close()
318
319 if resp.StatusCode != http.StatusOK {
320 return "", &knotStatusError{resp.StatusCode}
321 }
322
323 // check if target is git server
324 ct := resp.Header.Get("Content-Type")
325 if !strings.Contains(ct, "application/x-git-upload-pack-advertisement") {
326 return "", fmt.Errorf("unexpected content-type: %s", ct)
327 }
328
329 advertisement, err := io.ReadAll(io.LimitReader(resp.Body, 64*1024))
330 if err != nil {
331 return "", fmt.Errorf("reading upload-pack advertisement: %w", err)
332 }
333 if bytes.Contains(advertisement, []byte("object-format=sha256")) {
334 return models.ObjectFormatSHA256, nil
335 }
336
337 return models.ObjectFormatSHA1, nil
338}
339
340func (r *Resyncer) suspendUnsupported(ctx context.Context, repo *models.Repo) (bool, error) {
341 if err := r.gitm.Delete(repo); err != nil {
342 r.logger.Warn("failed to remove local clone of suspended repo", "did", repo.RepoDid, "err", err)
343 }
344
345 repo.State = models.RepoStateSuspended
346 repo.ErrorMsg = "unsupported sha256 object format"
347 repo.RetryCount = 0
348 repo.RetryAfter = 0
349 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
350 return false, fmt.Errorf("suspending sha256 repo: %w", err)
351 }
352
353 r.logger.Info("suspended sha256 repo, reads forwarded to knot", "did", repo.RepoDid, "knot", repo.KnotDomain)
354 return true, nil
355}
356
357func (r *Resyncer) handleResyncFailure(ctx context.Context, repoDid syntax.DID, err error) error {
358 r.logger.Debug("handleResyncFailure", "at_uri", repoDid, "err", err)
359 var state models.RepoState
360 var errMsg string
361 if err == nil {
362 state = models.RepoStateDesynchronized
363 errMsg = ""
364 } else {
365 state = models.RepoStateError
366 errMsg = err.Error()
367 }
368
369 repo, err := db.GetRepoByRepoDid(ctx, r.db, repoDid)
370 if err != nil {
371 return fmt.Errorf("failed to get repo: %w", err)
372 }
373 if repo == nil {
374 return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoDid)
375 }
376
377 // start a 1 min & go up to 1 hr between retries
378 var retryCount = repo.RetryCount + 1
379 var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix()
380
381 // remove null bytes
382 errMsg = strings.ReplaceAll(errMsg, "\x00", "")
383
384 repo.State = state
385 repo.ErrorMsg = errMsg
386 repo.RetryCount = retryCount
387 repo.RetryAfter = retryAfter
388 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
389 return fmt.Errorf("failed to update repo state: %w", err)
390 }
391 return nil
392}
393
// backoff returns an exponential delay of min(2^retries, max) seconds plus up
// to one second of random jitter.
//
// The exponent saturates at max instead of being computed as 1<<retries: that
// shift overflows int once retries reaches the word size, which would turn the
// delay into jitter only for repos that have failed many times. A negative
// retries value is treated like 0.
func backoff(retries int, max int) time.Duration {
	dur := 1
	for i := 0; i < retries; i++ {
		// Doubling would reach or pass max (or overflow): clamp and stop.
		if dur > max/2 {
			dur = max
			break
		}
		dur *= 2
	}
	dur = min(dur, max)

	jitter := time.Millisecond * time.Duration(rand.Intn(1000))
	return time.Second*time.Duration(dur) + jitter
}