Monorepo for Tangled
tangled.org
1package knotmirror
2
3import (
4 "bytes"
5 "context"
6 "database/sql"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "log/slog"
12 "math/rand"
13 "net/http"
14 "net/url"
15 "strings"
16 "sync"
17 "time"
18
19 "github.com/bluesky-social/indigo/atproto/syntax"
20 "tangled.org/core/knotmirror/config"
21 "tangled.org/core/knotmirror/db"
22 "tangled.org/core/knotmirror/knotstream"
23 "tangled.org/core/knotmirror/models"
24 "tangled.org/core/log"
25)
26
27type Resyncer struct {
28 logger *slog.Logger
29 db *sql.DB
30 gitm GitMirrorManager
31 cfg *config.Config
32 indexer *knotstream.ParallelScheduler
33
34 claimJobMu sync.Mutex
35
36 runningJobs map[syntax.DID]context.CancelFunc
37 runningJobsMu sync.Mutex
38
39 repoFetchTimeout time.Duration
40 manualResyncTimeout time.Duration
41 parallelism int
42
43 knotBackoff map[string]time.Time
44 knotBackoffMu sync.RWMutex
45
46 httpClient *http.Client
47}
48
49func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, indexer *knotstream.ParallelScheduler, cfg *config.Config) *Resyncer {
50 return &Resyncer{
51 logger: log.SubLogger(l, "resyncer"),
52 db: db,
53 gitm: gitm,
54 cfg: cfg,
55 indexer: indexer,
56
57 runningJobs: make(map[syntax.DID]context.CancelFunc),
58
59 repoFetchTimeout: cfg.GitRepoFetchTimeout,
60 manualResyncTimeout: 30 * time.Minute,
61 parallelism: cfg.ResyncParallelism,
62
63 knotBackoff: make(map[string]time.Time),
64
65 httpClient: &http.Client{Timeout: 30 * time.Second},
66 }
67}
68
69func (r *Resyncer) Start(ctx context.Context) {
70 for i := 0; i < r.parallelism; i++ {
71 go r.runResyncWorker(ctx, i)
72 }
73}
74
75func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) {
76 l := r.logger.With("worker", workerID)
77 for {
78 select {
79 case <-ctx.Done():
80 l.Info("resync worker shutting down", "error", ctx.Err())
81 return
82 default:
83 }
84 repoDid, found, err := r.claimResyncJob(ctx)
85 if err != nil {
86 l.Error("failed to claim resync job", "error", err)
87 time.Sleep(time.Second)
88 continue
89 }
90 if !found {
91 time.Sleep(time.Second)
92 continue
93 }
94 l.Info("processing resync", "did", repoDid)
95 if err := r.resyncRepo(ctx, repoDid); err != nil {
96 l.Error("resync failed", "did", repoDid, "error", err)
97 }
98 }
99}
100
101func (r *Resyncer) registerRunning(repo syntax.DID, cancel context.CancelFunc) {
102 r.runningJobsMu.Lock()
103 defer r.runningJobsMu.Unlock()
104
105 if _, exists := r.runningJobs[repo]; exists {
106 return
107 }
108 r.runningJobs[repo] = cancel
109}
110
111func (r *Resyncer) unregisterRunning(repo syntax.DID) {
112 r.runningJobsMu.Lock()
113 defer r.runningJobsMu.Unlock()
114
115 delete(r.runningJobs, repo)
116}
117
118func (r *Resyncer) CancelResyncJob(repo syntax.DID) {
119 r.runningJobsMu.Lock()
120 defer r.runningJobsMu.Unlock()
121
122 cancel, ok := r.runningJobs[repo]
123 if !ok {
124 return
125 }
126 delete(r.runningJobs, repo)
127 cancel()
128}
129
130// TriggerResyncJob manually triggers the resync job
131func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoDid syntax.DID) error {
132 repo, err := db.GetRepoByRepoDid(ctx, r.db, repoDid)
133 if err != nil {
134 return fmt.Errorf("failed to get repo: %w", err)
135 }
136 if repo == nil {
137 return fmt.Errorf("repo not found: %s", repoDid)
138 }
139
140 if repo.State == models.RepoStateResyncing {
141 return fmt.Errorf("repo already resyncing")
142 }
143
144 repo.State = models.RepoStatePending
145 repo.RetryAfter = -1 // resyncer will prioritize this
146
147 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
148 return fmt.Errorf("updating repo state to pending %w", err)
149 }
150 return nil
151}
152
153func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.DID, bool, error) {
154 // use mutex to prevent duplicated jobs
155 r.claimJobMu.Lock()
156 defer r.claimJobMu.Unlock()
157
158 var repoDid syntax.DID
159 now := time.Now().Unix()
160 if err := r.db.QueryRowContext(ctx,
161 `update repos
162 set state = $1
163 where repo_did = (
164 select repo_did from repos
165 where state in ($2, $3, $4)
166 and (retry_after = -1 or retry_after = 0 or retry_after < $5)
167 order by
168 (retry_after = -1) desc,
169 (retry_after = 0) desc,
170 retry_after
171 limit 1
172 )
173 returning repo_did
174 `,
175 models.RepoStateResyncing,
176 models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError,
177 now,
178 ).Scan(&repoDid); err != nil {
179 if errors.Is(err, sql.ErrNoRows) {
180 return "", false, nil
181 }
182 return "", false, err
183 }
184
185 return repoDid, true, nil
186}
187
188func (r *Resyncer) resyncRepo(ctx context.Context, repoDid syntax.DID) error {
189 // ctx, span := tracer.Start(ctx, "resyncRepo")
190 // span.SetAttributes(attribute.String("aturi", repoAt))
191 // defer span.End()
192
193 resyncsStarted.Inc()
194 startTime := time.Now()
195
196 jobCtx, cancel := context.WithCancel(ctx)
197 r.registerRunning(repoDid, cancel)
198 defer r.unregisterRunning(repoDid)
199
200 success, err := r.doResync(jobCtx, repoDid)
201 if !success {
202 resyncsFailed.Inc()
203 resyncDuration.Observe(time.Since(startTime).Seconds())
204 return r.handleResyncFailure(ctx, repoDid, err)
205 }
206
207 resyncsCompleted.Inc()
208 resyncDuration.Observe(time.Since(startTime).Seconds())
209 return nil
210}
211
212func (r *Resyncer) doResync(ctx context.Context, repoDid syntax.DID) (bool, error) {
213 // ctx, span := tracer.Start(ctx, "doResync")
214 // span.SetAttributes(attribute.String("aturi", repoAt))
215 // defer span.End()
216
217 repo, err := db.GetRepoByRepoDid(ctx, r.db, repoDid)
218 if err != nil {
219 return false, fmt.Errorf("failed to get repo: %w", err)
220 }
221 if repo == nil { // untracked repo, skip
222 return false, nil
223 }
224
225 r.knotBackoffMu.RLock()
226 backoffUntil, inBackoff := r.knotBackoff[repo.KnotDomain]
227 r.knotBackoffMu.RUnlock()
228 if inBackoff && time.Now().Before(backoffUntil) {
229 return false, nil
230 }
231
232 // HACK: check knot reachability with short timeout before running actual fetch.
233 // This is crucial as git-cli doesn't support http connection timeout.
234 // `http.lowSpeedTime` is only applied _after_ the connection.
235 format, err := r.checkKnot(ctx, repo)
236 if err != nil {
237 if isRateLimitError(err) {
238 r.knotBackoffMu.Lock()
239 r.knotBackoff[repo.KnotDomain] = time.Now().Add(10 * time.Second)
240 r.knotBackoffMu.Unlock()
241 return false, nil
242 }
243 // TODO: suspend repo on 404. KnotStream updates will change the repo state back online
244 return false, fmt.Errorf("knot unreachable: %w", err)
245 }
246
247 if format == models.ObjectFormatSHA256 {
248 return r.suspendUnsupported(ctx, repo)
249 }
250
251 timeout := r.repoFetchTimeout
252 if repo.RetryAfter == -1 {
253 timeout = r.manualResyncTimeout
254 }
255 fetchCtx, cancel := context.WithTimeout(ctx, timeout)
256 defer cancel()
257
258 if err := r.gitm.Sync(fetchCtx, repo); err != nil {
259 return false, err
260 }
261
262 // request index to zoekt server
263 // NOTE: indexing after full git resync is bad design. We are doing _after_ the sync because knotstream event doesn't include repository refs.
264 // NOTE: and zoekt indexer should directly subscribe to the knot. remove this when we have knotrelay.
265 if r.cfg.Search.ZoektUrl != "" {
266 go func() {
267 idxCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
268 defer cancel()
269 defaultBranch, err := r.gitm.DefaultBranch(idxCtx, repo)
270 if err != nil {
271 r.logger.Warn("resolving default branch for indexing failed", "did", repo.RepoDid, "error", err)
272 return
273 }
274 if err := r.requestIndex(idxCtx, repo.RepoDid, []branch{defaultBranch}); err != nil {
275 r.logger.Warn("requesting zoekt index failed", "did", repo.RepoDid, "err", err)
276 }
277 }()
278 }
279
280 // queue repo_stats_update job
281 r.indexer.AddTask(context.TODO(), &knotstream.Task{Key: repo.RepoDid.String()})
282
283 // repo.GitRev = <processed git.refUpdate revision>
284 // repo.RepoSha = <sha256 sum of git refs>
285 repo.State = models.RepoStateActive
286 repo.ErrorMsg = ""
287 repo.RetryCount = 0
288 repo.RetryAfter = 0
289 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
290 return false, fmt.Errorf("updating repo state to active %w", err)
291 }
292 return true, nil
293}
294
// knotStatusError is returned when a knot answers an HTTP request with an
// unexpected status code.
type knotStatusError struct {
	StatusCode int
}

// Error implements the error interface and includes the HTTP status code.
func (e *knotStatusError) Error() string {
	const layout = "request failed with status code (HTTP %d)"
	return fmt.Sprintf(layout, e.StatusCode)
}
302
303func isRateLimitError(err error) bool {
304 var knotErr *knotStatusError
305 if errors.As(err, &knotErr) {
306 return knotErr.StatusCode == http.StatusTooManyRequests
307 }
308 return false
309}
310
311func (r *Resyncer) checkKnot(ctx context.Context, repo *models.Repo) (models.ObjectFormat, error) {
312 repoUrl, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), r.cfg.KnotUseSSL)
313 if err != nil {
314 return "", err
315 }
316
317 repoUrl += "/info/refs?service=git-upload-pack"
318
319 r.logger.Debug("checking knot reachability", "url", repoUrl)
320
321 req, err := http.NewRequestWithContext(ctx, "GET", repoUrl, nil)
322 if err != nil {
323 return "", err
324 }
325 req.Header.Set("User-Agent", "git/2.x")
326 req.Header.Set("Accept", "*/*")
327
328 resp, err := r.httpClient.Do(req)
329 if err != nil {
330 var uerr *url.Error
331 if errors.As(err, &uerr) {
332 return "", fmt.Errorf("request failed: %w", uerr.Unwrap())
333 }
334 return "", fmt.Errorf("request failed: %w", err)
335 }
336 defer resp.Body.Close()
337
338 if resp.StatusCode != http.StatusOK {
339 return "", &knotStatusError{resp.StatusCode}
340 }
341
342 // check if target is git server
343 ct := resp.Header.Get("Content-Type")
344 if !strings.Contains(ct, "application/x-git-upload-pack-advertisement") {
345 return "", fmt.Errorf("unexpected content-type: %s", ct)
346 }
347
348 advertisement, err := io.ReadAll(io.LimitReader(resp.Body, 64*1024))
349 if err != nil {
350 return "", fmt.Errorf("reading upload-pack advertisement: %w", err)
351 }
352 if bytes.Contains(advertisement, []byte("object-format=sha256")) {
353 return models.ObjectFormatSHA256, nil
354 }
355
356 return models.ObjectFormatSHA1, nil
357}
358
359func (r *Resyncer) suspendUnsupported(ctx context.Context, repo *models.Repo) (bool, error) {
360 if err := r.gitm.Delete(repo); err != nil {
361 r.logger.Warn("failed to remove local clone of suspended repo", "did", repo.RepoDid, "err", err)
362 }
363
364 repo.State = models.RepoStateSuspended
365 repo.ErrorMsg = "unsupported sha256 object format"
366 repo.RetryCount = 0
367 repo.RetryAfter = 0
368 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
369 return false, fmt.Errorf("suspending sha256 repo: %w", err)
370 }
371
372 r.logger.Info("suspended sha256 repo, reads forwarded to knot", "did", repo.RepoDid, "knot", repo.KnotDomain)
373 return true, nil
374}
375
376func (r *Resyncer) handleResyncFailure(ctx context.Context, repoDid syntax.DID, err error) error {
377 r.logger.Debug("handleResyncFailure", "at_uri", repoDid, "err", err)
378 var state models.RepoState
379 var errMsg string
380 if err == nil {
381 state = models.RepoStateDesynchronized
382 errMsg = ""
383 } else {
384 state = models.RepoStateError
385 errMsg = err.Error()
386 }
387
388 repo, err := db.GetRepoByRepoDid(ctx, r.db, repoDid)
389 if err != nil {
390 return fmt.Errorf("failed to get repo: %w", err)
391 }
392 if repo == nil {
393 return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoDid)
394 }
395
396 // start a 1 min & go up to 1 hr between retries
397 var retryCount = repo.RetryCount + 1
398 var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix()
399
400 // remove null bytes
401 errMsg = strings.ReplaceAll(errMsg, "\x00", "")
402
403 repo.State = state
404 repo.ErrorMsg = errMsg
405 repo.RetryCount = retryCount
406 repo.RetryAfter = retryAfter
407 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
408 return fmt.Errorf("failed to update repo state: %w", err)
409 }
410 return nil
411}
412
// backoff returns an exponential delay of min(2^retries, max) seconds plus a
// random jitter of up to one second.
//
// The exponent saturates: once 2^retries would no longer fit in a signed
// 64-bit integer the delay stays at max instead of wrapping around to zero or
// a negative value. A negative retries is treated as 0 rather than panicking
// on a negative shift count.
func backoff(retries int, max int) time.Duration {
	limit := int64(max)

	secs := limit
	if retries < 0 {
		retries = 0
	}
	// 1<<62 is the largest power of two that is still positive as an int64;
	// anything beyond that is capped at max.
	if retries < 63 {
		secs = min(int64(1)<<retries, limit)
	}

	jitter := time.Millisecond * time.Duration(rand.Intn(1000))
	return time.Second*time.Duration(secs) + jitter
}
418
419func (r *Resyncer) requestIndex(ctx context.Context, repoDid syntax.DID, branches []branch) error {
420 r.logger.Info("requesting index", "repo", repoDid, "branches", branches)
421 body, err := json.Marshal(map[string]any{
422 "repo": repoDid.String(),
423 "branches": branches,
424 })
425 if err != nil {
426 return fmt.Errorf("marshaling index request: %w", err)
427 }
428
429 endpoint := r.cfg.Search.ZoektUrl + "/indexserver/admin/enqueueIndex"
430 req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
431 if err != nil {
432 return err
433 }
434 req.Header.Set("Content-Type", "application/json")
435
436 resp, err := r.httpClient.Do(req)
437 if err != nil {
438 return fmt.Errorf("requesting zoekt index: %w", err)
439 }
440 defer resp.Body.Close()
441
442 if resp.StatusCode < 200 || resp.StatusCode >= 300 {
443 return fmt.Errorf("non-ok status: %d", resp.StatusCode)
444 }
445 return nil
446}