Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

knotmirror/resyncer: always forward sha256

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
committer
Tangled
date (Jun 22, 2026, 6:51 PM +0300) commit 4d65209a parent ff8c2343 change-id vuolwwvw
+306 -10
+7
knotmirror/models/models.go
··· 56 56 return s == RepoStateResyncing 57 57 } 58 58 59 + type ObjectFormat string 60 + 61 + const ( 62 + ObjectFormatSHA1 ObjectFormat = "sha1" 63 + ObjectFormatSHA256 ObjectFormat = "sha256" 64 + ) 65 + 59 66 type HostCursor struct { 60 67 Hostname string 61 68 LastSeq int64
+41 -10
knotmirror/resyncer.go
··· 1 1 package knotmirror 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 "database/sql" 6 7 "errors" 7 8 "fmt" 9 + "io" 8 10 "log/slog" 9 11 "math/rand" 10 12 "net/http" ··· 229 231 // HACK: check knot reachability with short timeout before running actual fetch. 230 232 // This is crucial as git-cli doesn't support http connection timeout. 231 233 // `http.lowSpeedTime` is only applied _after_ the connection. 232 - if err := r.checkKnotReachability(ctx, repo); err != nil { 234 + format, err := r.checkKnot(ctx, repo) 235 + if err != nil { 233 236 if isRateLimitError(err) { 234 237 r.knotBackoffMu.Lock() 235 238 r.knotBackoff[repo.KnotDomain] = time.Now().Add(10 * time.Second) ··· 238 241 } 239 242 // TODO: suspend repo on 404. KnotStream updates will change the repo state back online 240 243 return false, fmt.Errorf("knot unreachable: %w", err) 244 + } 245 + 246 + if format == models.ObjectFormatSHA256 { 247 + return r.suspendUnsupported(ctx, repo) 241 248 } 242 249 243 250 timeout := r.repoFetchTimeout ··· 282 289 return false 283 290 } 284 291 285 - // checkKnotReachability checks if Knot is reachable and is valid git remote server 286 - func (r *Resyncer) checkKnotReachability(ctx context.Context, repo *models.Repo) error { 292 + func (r *Resyncer) checkKnot(ctx context.Context, repo *models.Repo) (models.ObjectFormat, error) { 287 293 repoUrl, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), r.cfg.KnotUseSSL) 288 294 if err != nil { 289 - return err 295 + return "", err 290 296 } 291 297 292 298 repoUrl += "/info/refs?service=git-upload-pack" ··· 295 301 296 302 req, err := http.NewRequestWithContext(ctx, "GET", repoUrl, nil) 297 303 if err != nil { 298 - return err 304 + return "", err 299 305 } 300 306 req.Header.Set("User-Agent", "git/2.x") 301 307 req.Header.Set("Accept", "*/*") ··· 304 310 if err != nil { 305 311 var uerr *url.Error 306 312 if errors.As(err, &uerr) { 307 - return fmt.Errorf("request failed: %w", uerr.Unwrap()) 313 + return "", fmt.Errorf("request failed: %w", uerr.Unwrap()) 308 314 } 309 - return fmt.Errorf("request failed: %w", err) 315 + return "", fmt.Errorf("request failed: %w", err) 310 316 } 311 317 defer resp.Body.Close() 312 318 313 319 if resp.StatusCode != http.StatusOK { 314 - return &knotStatusError{resp.StatusCode} 320 + return "", &knotStatusError{resp.StatusCode} 315 321 } 316 322 317 323 // check if target is git server 318 324 ct := resp.Header.Get("Content-Type") 319 325 if !strings.Contains(ct, "application/x-git-upload-pack-advertisement") { 320 - return fmt.Errorf("unexpected content-type: %s", ct) 326 + return "", fmt.Errorf("unexpected content-type: %s", ct) 327 + } 328 + 329 + advertisement, err := io.ReadAll(io.LimitReader(resp.Body, 64*1024)) 330 + if err != nil { 331 + return "", fmt.Errorf("reading upload-pack advertisement: %w", err) 332 + } 333 + if bytes.Contains(advertisement, []byte("object-format=sha256")) { 334 + return models.ObjectFormatSHA256, nil 321 335 } 322 336 323 - return nil 337 + return models.ObjectFormatSHA1, nil 338 + } 339 + 340 + func (r *Resyncer) suspendUnsupported(ctx context.Context, repo *models.Repo) (bool, error) { 341 + if err := r.gitm.Delete(repo); err != nil { 342 + r.logger.Warn("failed to remove local clone of suspended repo", "did", repo.RepoDid, "err", err) 343 + } 344 + 345 + repo.State = models.RepoStateSuspended 346 + repo.ErrorMsg = "unsupported sha256 object format" 347 + repo.RetryCount = 0 348 + repo.RetryAfter = 0 349 + if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 350 + return false, fmt.Errorf("suspending sha256 repo: %w", err) 351 + } 352 + 353 + r.logger.Info("suspended sha256 repo, reads forwarded to knot", "did", repo.RepoDid, "knot", repo.KnotDomain) 354 + return true, nil 324 355 } 325 356 326 357 func (r *Resyncer) handleResyncFailure(ctx context.Context, repoDid syntax.DID, err error) error {
+132
knotmirror/resyncer_test.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "io" 6 + "log/slog" 7 + "net/http" 8 + "net/http/httptest" 9 + "strings" 10 + "testing" 11 + 12 + "tangled.org/core/knotmirror/config" 13 + "tangled.org/core/knotmirror/models" 14 + ) 15 + 16 + func uploadPackAdvert(capabilities string) string { 17 + return "001e# service=git-upload-pack\n" + 18 + "0000" + 19 + "0000000000000000000000000000000000000000 capabilities^{}\x00" + capabilities + "\n" + 20 + "0000" 21 + } 22 + 23 + func TestCheckKnotObjectFormat(t *testing.T) { 24 + const gitContentType = "application/x-git-upload-pack-advertisement" 25 + 26 + tests := []struct { 27 + name string 28 + status int 29 + contentType string 30 + body string 31 + wantFormat models.ObjectFormat 32 + wantErr bool 33 + wantRateLimit bool 34 + }{ 35 + { 36 + name: "sha256 repo is detected", 37 + status: http.StatusOK, 38 + contentType: gitContentType, 39 + body: uploadPackAdvert("multi_ack thin-pack side-band-64k ofs-delta object-format=sha256 agent=git/2.45.0"), 40 + wantFormat: models.ObjectFormatSHA256, 41 + }, 42 + { 43 + name: "explicit sha1 repo stays on sha1", 44 + status: http.StatusOK, 45 + contentType: gitContentType, 46 + body: uploadPackAdvert("multi_ack thin-pack side-band-64k ofs-delta object-format=sha1 agent=git/2.45.0"), 47 + wantFormat: models.ObjectFormatSHA1, 48 + }, 49 + { 50 + name: "advertisement without object-format defaults to sha1", 51 + status: http.StatusOK, 52 + contentType: gitContentType, 53 + body: uploadPackAdvert("multi_ack thin-pack side-band-64k ofs-delta agent=git/2.34.0"), 54 + wantFormat: models.ObjectFormatSHA1, 55 + }, 56 + { 57 + name: "sha256 detected with content-type parameters", 58 + status: http.StatusOK, 59 + contentType: gitContentType + "; charset=utf-8", 60 + body: uploadPackAdvert("object-format=sha256"), 61 + wantFormat: models.ObjectFormatSHA256, 62 + }, 63 + { 64 + name: "rate limited knot", 65 + status: http.StatusTooManyRequests, 66 + wantErr: true, 67 + wantRateLimit: true, 68 + }, 69 + { 70 + name: "missing repo on knot", 71 + status: http.StatusNotFound, 72 + wantErr: true, 73 + }, 74 + { 75 + name: "non git content-type", 76 + status: http.StatusOK, 77 + contentType: "text/html", 78 + body: "<html>not a git server</html>", 79 + wantErr: true, 80 + }, 81 + } 82 + 83 + for _, tt := range tests { 84 + t.Run(tt.name, func(t *testing.T) { 85 + var gotPath string 86 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 87 + gotPath = r.URL.Path 88 + if tt.contentType != "" { 89 + w.Header().Set("Content-Type", tt.contentType) 90 + } 91 + w.WriteHeader(tt.status) 92 + io.WriteString(w, tt.body) 93 + })) 94 + defer srv.Close() 95 + 96 + r := &Resyncer{ 97 + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 98 + cfg: &config.Config{}, 99 + httpClient: srv.Client(), 100 + } 101 + repo := &models.Repo{ 102 + RepoDid: "did:plc:boltless", 103 + KnotDomain: srv.URL, 104 + } 105 + 106 + format, err := r.checkKnot(context.Background(), repo) 107 + 108 + if tt.wantErr { 109 + if err == nil { 110 + t.Fatalf("expected error, got format %q", format) 111 + } 112 + if format != "" { 113 + t.Errorf("expected empty format on error, got %q", format) 114 + } 115 + if got := isRateLimitError(err); got != tt.wantRateLimit { 116 + t.Errorf("isRateLimitError = %v, want %v", got, tt.wantRateLimit) 117 + } 118 + return 119 + } 120 + 121 + if err != nil { 122 + t.Fatalf("unexpected error: %v", err) 123 + } 124 + if format != tt.wantFormat { 125 + t.Errorf("format = %q, want %q", format, tt.wantFormat) 126 + } 127 + if !strings.HasSuffix(gotPath, "/info/refs") { 128 + t.Errorf("expected upstream request to /info/refs, got %q", gotPath) 129 + } 130 + }) 131 + } 132 + }
+125
knotmirror/xrpc/proxy.go
··· 1 1 package xrpc 2 2 3 3 import ( 4 + "cmp" 4 5 "context" 5 6 "errors" 6 7 "fmt" ··· 8 9 "maps" 9 10 "net/http" 10 11 "net/url" 12 + "path" 11 13 "strings" 12 14 15 + "github.com/bluesky-social/indigo/atproto/atclient" 13 16 "github.com/bluesky-social/indigo/atproto/syntax" 14 17 indigoxrpc "github.com/bluesky-social/indigo/xrpc" 18 + "github.com/go-git/go-git/v5/plumbing/filemode" 15 19 "tangled.org/core/api/tangled" 16 20 "tangled.org/core/knotmirror/db" 17 21 "tangled.org/core/knotmirror/models" ··· 26 30 tangled.GitTempGetTagNSID: tangled.RepoTagNSID, 27 31 tangled.GitTempGetArchiveNSID: tangled.RepoArchiveNSID, 28 32 tangled.GitTempListLanguagesNSID: tangled.RepoLanguagesNSID, 33 + tangled.GitTempGetBlobNSID: tangled.RepoBlobNSID, 29 34 } 30 35 31 36 var hopByHopHeaders = map[string]bool{ ··· 182 187 x.logger.Info("proxy: served from knot", "repo", repoDid, "knot", knot.baseURL, "status", resp.StatusCode) 183 188 return true 184 189 } 190 + 191 + func (x *Xrpc) forwardSuspended(next http.Handler) http.Handler { 192 + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 193 + repoDid, err := syntax.ParseDID(r.URL.Query().Get("repo")) 194 + if err != nil { 195 + next.ServeHTTP(w, r) 196 + return 197 + } 198 + 199 + repo, err := db.GetRepoByRepoDid(r.Context(), x.db, repoDid) 200 + if err != nil || repo == nil || repo.State != models.RepoStateSuspended { 201 + next.ServeHTTP(w, r) 202 + return 203 + } 204 + 205 + nsid := strings.TrimPrefix(r.URL.Path, "/xrpc/") 206 + switch nsid { 207 + case tangled.GitTempGetEntryNSID: 208 + x.serveSuspendedEntry(w, r, repoDid) 209 + case tangled.GitTempGetBlobNSID: 210 + q := r.URL.Query() 211 + q.Set("raw", "true") 212 + r.URL.RawQuery = q.Encode() 213 + x.forwardOrFail(w, r, repoDid) 214 + default: 215 + if _, ok := mirrorToKnotNSID[nsid]; !ok { 216 + next.ServeHTTP(w, r) 217 + return 218 + } 219 + x.forwardOrFail(w, r, repoDid) 220 + } 221 + }) 222 + } 223 + 224 + func (x *Xrpc) forwardOrFail(w http.ResponseWriter, r *http.Request, repoDid syntax.DID) { 225 + if x.proxyToKnot(w, r, repoDid) { 226 + return 227 + } 228 + writeJson(w, http.StatusBadGateway, atclient.ErrorBody{Name: "BadGateway", Message: "failed to reach knot for suspended repo"}) 229 + } 230 + 231 + func (x *Xrpc) serveSuspendedEntry(w http.ResponseWriter, r *http.Request, repoDid syntax.DID) { 232 + ref := cmp.Or(r.URL.Query().Get("ref"), "HEAD") 233 + filePath := r.URL.Query().Get("path") 234 + if filePath == "" { 235 + writeJson(w, http.StatusBadRequest, atclient.ErrorBody{Name: "BadRequest", Message: "missing path parameter"}) 236 + return 237 + } 238 + 239 + knot, err := x.resolveKnot(r.Context(), repoDid) 240 + if err != nil { 241 + x.logger.Warn("suspended entry: failed to resolve knot", "repo", repoDid, "err", err) 242 + writeJson(w, http.StatusBadGateway, atclient.ErrorBody{Name: "BadGateway", Message: "failed to resolve knot for suspended repo"}) 243 + return 244 + } 245 + 246 + client := &indigoxrpc.Client{Host: knot.baseURL, Client: x.httpClient} 247 + out, err := tangled.RepoBlob(r.Context(), client, filePath, false, ref, knot.repoIdentifier) 248 + if err != nil { 249 + x.logger.Warn("suspended entry: knot repo.blob failed", "repo", repoDid, "err", err) 250 + writeJson(w, http.StatusBadGateway, atclient.ErrorBody{Name: "BadGateway", Message: "failed to read entry from knot"}) 251 + return 252 + } 253 + 254 + mode := filemode.Regular 255 + if out.Submodule != nil { 256 + mode = filemode.Submodule 257 + } 258 + 259 + writeJson(w, http.StatusOK, tangled.GitTempGetEntry_Output{ 260 + Name: path.Base(filePath), 261 + Mode: mode.String(), 262 + Size: derefInt64(out.Size), 263 + LastCommit: suspendedLastCommit(out.LastCommit), 264 + Submodule: suspendedSubmodule(out.Submodule), 265 + }) 266 + } 267 + 268 + func suspendedLastCommit(c *tangled.RepoBlob_LastCommit) *tangled.GitTempDefs_Commit { 269 + if c == nil || c.Author == nil { 270 + return nil 271 + } 272 + sig := suspendedSignature(c.Author) 273 + hash := c.Hash 274 + return &tangled.GitTempDefs_Commit{ 275 + Author: sig, 276 + Committer: sig, 277 + Hash: &hash, 278 + Message: c.Message, 279 + } 280 + } 281 + 282 + func suspendedSignature(s *tangled.RepoBlob_Signature) *tangled.GitTempDefs_Signature { 283 + if s == nil { 284 + return nil 285 + } 286 + return &tangled.GitTempDefs_Signature{ 287 + Name: s.Name, 288 + Email: s.Email, 289 + When: s.When, 290 + } 291 + } 292 + 293 + func suspendedSubmodule(s *tangled.RepoBlob_Submodule) *tangled.GitTempDefs_Submodule { 294 + if s == nil { 295 + return nil 296 + } 297 + return &tangled.GitTempDefs_Submodule{ 298 + Name: s.Name, 299 + Url: s.Url, 300 + Branch: s.Branch, 301 + } 302 + } 303 + 304 + func derefInt64(v *int64) int64 { 305 + if v == nil { 306 + return 0 307 + } 308 + return *v 309 + }
+1
knotmirror/xrpc/xrpc.go
··· 58 58 59 59 r.Group(func(r chi.Router) { 60 60 r.Use(x.inflight.middleware) 61 + r.Use(x.forwardSuspended) 61 62 62 63 r.Get("/"+tangled.GitTempGetArchiveNSID, x.GetArchive) 63 64 r.Get("/"+tangled.GitTempGetBlobNSID, x.GetBlob)