Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

knotmirror: performant language indexer

`git.listLanguages` has been one of the method that fails most often.
Opening multiple git repos simultaneously can eaily cause OOM and
language indexing itself usually takes super long.

So several changes:

- use `gitea.CatFileBatch` instead of go-git to avoid OOM
- skip files larger than 16KB
- sync HEAD ref language stats in knotmirror db
- cache language stats info by commits (30d TTL)

When syncing HEAD ref language stats, we do indexing on background.
KnotMirror maintains internal "repo_stats_update" queue and right after
`doResync` is done, enqueue the language stat indexing job so we can
pre-index the language stats of HEAD ref.

It's ok to spam this queue because all later events will be eventually
ignored as we are resolving HEAD lazily.

Signed-off-by: Seongmin Lee <git@boltless.me>

author
Seongmin Lee
committer
Tangled
date (May 19, 2026, 11:38 AM +0300) commit 817fd58d parent b7d12ed1 change-id muqmmutp
+398 -102
+1
appview/state/knotstream.go
··· 98 98 } 99 99 } 100 100 101 + // TODO(boltless): remove this. knotmirror should do all sort of indexing 101 102 func ingestRefUpdate(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client, source ec.Source, msg ec.Message) error { 102 103 logger := log.FromContext(ctx) 103 104
+14
knotmirror/db/db.go
··· 69 69 constraint hosts_pkey primary key (hostname) 70 70 ); 71 71 72 + -- repo language stats at HEAD 73 + create table if not exists repo_head_languages ( 74 + repo text not null, -- repo identifier (did) 75 + commit text not null, -- commit id (oid) 76 + language text not null, 77 + size integer not null check (size >= 0), 78 + 79 + constraint repo_head_languages_pkey 80 + primary key (repo, commit, language) 81 + ); 82 + 72 83 create index if not exists idx_repos_aturi on repos (at_uri); 73 84 create index if not exists idx_repos_db_updated_at on repos (db_updated_at desc); 74 85 create index if not exists idx_hosts_db_updated_at on hosts (db_updated_at desc); 86 + 87 + create index if not exists idx_repo_head_languages_repo_commit 88 + on repo_head_languages (repo, commit); 75 89 76 90 create or replace function set_updated_at() 77 91 returns trigger as $$
+68
knotmirror/db/repo_index.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "fmt" 7 + 8 + "github.com/bluesky-social/indigo/atproto/syntax" 9 + "github.com/go-git/go-git/v5/plumbing" 10 + ) 11 + 12 + func IsLanguageIndexed(ctx context.Context, e *sql.DB, repoId syntax.DID, commitId plumbing.Hash) (bool, error) { 13 + var exists bool 14 + err := e.QueryRow(`select exists(select 1 from repo_head_languages where repo = $1)`, repoId).Scan(&exists) 15 + return exists, err 16 + } 17 + 18 + func InsertLanguages(ctx context.Context, e *sql.DB, repoId syntax.DID, commitId plumbing.Hash, langs map[string]int64) error { 19 + tx, err := e.BeginTx(ctx, nil) 20 + if err != nil { 21 + return fmt.Errorf("BeginTx: %w", err) 22 + } 23 + defer tx.Rollback() 24 + 25 + if _, err := tx.Exec(`delete from repo_head_languages where repo = $1`, repoId); err != nil { 26 + return fmt.Errorf("deleting old languages: %w", err) 27 + } 28 + 29 + for lang, size := range langs { 30 + if _, err := tx.Exec( 31 + `insert into repo_head_languages (repo, commit, language, size) 32 + values ($1, $2, $3, $4)`, 33 + repoId, commitId.String(), lang, size, 34 + ); err != nil { 35 + return fmt.Errorf("inserting language: %w", err) 36 + } 37 + } 38 + 39 + if err := tx.Commit(); err != nil { 40 + return fmt.Errorf("tx.Commit: %w", err) 41 + } 42 + return nil 43 + } 44 + 45 + func ListLanguages(ctx context.Context, e *sql.DB, repoId syntax.DID, commitId plumbing.Hash) (map[string]int64, error) { 46 + sizes := make(map[string]int64) 47 + 48 + rows, err := e.Query(`select language, size from repo_head_languages where repo = $1 and commit = $2`, repoId, commitId.String()) 49 + if err != nil { 50 + return nil, err 51 + } 52 + defer rows.Close() 53 + 54 + for rows.Next() { 55 + var lang string 56 + var size int64 57 + if err := rows.Scan(&lang, &size); err != nil { 58 + return nil, err 59 + } 60 + sizes[lang] = size 61 + } 62 + 63 + if err := rows.Err(); err != nil { 64 + return nil, err 65 + } 66 + 67 + return sizes, nil 68 + }
+7 -2
knotmirror/knotmirror.go
··· 14 14 "tangled.org/core/knotmirror/config" 15 15 "tangled.org/core/knotmirror/db" 16 16 "tangled.org/core/knotmirror/knotstream" 17 + "tangled.org/core/knotmirror/repoindexer" 17 18 "tangled.org/core/knotmirror/models" 18 19 "tangled.org/core/knotmirror/xrpc" 19 20 "tangled.org/core/log" ··· 56 57 } 57 58 logger.Info(fmt.Sprintf("clearing resyning states: %d records updated", rows)) 58 59 60 + indexer := repoindexer.NewIndexer(logger, cfg, rdb) 61 + indexScheduler := repoindexer.NewBackgroundIndexScheduler(logger, cfg, db, indexer) 62 + indexScheduler.Start(ctx) 63 + 59 64 knotstream := knotstream.NewKnotStream(logger, db, cfg) 60 65 crawler := NewCrawler(logger, db) 61 - resyncer := NewResyncer(logger, db, gitm, cfg) 62 - xrpc := xrpc.New(logger, cfg, db, rdb, resolver, knotstream) 66 + resyncer := NewResyncer(logger, db, gitm, indexScheduler, cfg) 67 + xrpc := xrpc.New(logger, cfg, db, rdb, indexer, resolver, knotstream) 63 68 adminpage := NewAdminServer(logger, db, resyncer, xrpc, resolver) 64 69 65 70 // maintain repository list with tap
+7 -7
knotmirror/knotstream/scheduler.go
··· 24 24 } 25 25 26 26 type Task struct { 27 - key string 27 + Key string 28 28 message []byte 29 29 } 30 30 ··· 46 46 47 47 func (s *ParallelScheduler) AddTask(ctx context.Context, task *Task) { 48 48 s.lk.Lock() 49 - if st, ok := s.scheduled[task.key]; ok { 49 + if st, ok := s.scheduled[task.Key]; ok { 50 50 // schedule task 51 - s.scheduled[task.key] = append(st, task) 51 + s.scheduled[task.Key] = append(st, task) 52 52 s.lk.Unlock() 53 53 return 54 54 } 55 - s.scheduled[task.key] = []*Task{} 55 + s.scheduled[task.Key] = []*Task{} 56 56 s.lk.Unlock() 57 57 58 58 select { ··· 77 77 78 78 s.lk.Lock() 79 79 func() { 80 - rem, ok := s.scheduled[task.key] 80 + rem, ok := s.scheduled[task.Key] 81 81 if !ok { 82 82 s.logger.Error("should always have an 'active' entry if a worker is processing a job") 83 83 } 84 84 if len(rem) == 0 { 85 - delete(s.scheduled, task.key) 85 + delete(s.scheduled, task.Key) 86 86 task = nil 87 87 } else { 88 88 task = rem[0] 89 - s.scheduled[task.key] = rem[1:] 89 + s.scheduled[task.Key] = rem[1:] 90 90 } 91 91 92 92 // TODO: update seq from received message
+2 -2
knotmirror/knotstream/slurper.go
··· 257 257 } 258 258 259 259 sub.scheduler.AddTask(ctx, &Task{ 260 - key: sub.hostname, // TODO: replace to repository AT-URI for better concurrency 260 + Key: sub.hostname, // TODO: replace to repository AT-URI for better concurrency 261 261 message: msg, 262 262 }) 263 263 } ··· 281 281 return fmt.Errorf("unmarshaling message: %w", err) 282 282 } 283 283 284 - if err := s.ProcessLegacyGitRefUpdate(ctx, task.key, &legacyMessage); err != nil { 284 + if err := s.ProcessLegacyGitRefUpdate(ctx, task.Key, &legacyMessage); err != nil { 285 285 return fmt.Errorf("processing gitRefUpdate: %w", err) 286 286 } 287 287 return nil
+249
knotmirror/repoindexer/language.go
··· 1 + package repoindexer 2 + 3 + import ( 4 + "bufio" 5 + "context" 6 + "database/sql" 7 + "encoding/json" 8 + "fmt" 9 + "io" 10 + "log/slog" 11 + "path/filepath" 12 + "strings" 13 + "time" 14 + 15 + "github.com/bluesky-social/indigo/atproto/syntax" 16 + "github.com/go-enry/go-enry/v2" 17 + "github.com/go-git/go-git/v5/plumbing" 18 + "github.com/go-git/go-git/v5/plumbing/filemode" 19 + "github.com/go-git/go-git/v5/plumbing/object" 20 + "github.com/redis/go-redis/v9" 21 + "tangled.org/core/knotmirror/config" 22 + "tangled.org/core/knotmirror/db" 23 + "tangled.org/core/knotmirror/knotstream" 24 + "tangled.org/core/knotmirror/xrpc/gitea" 25 + "tangled.org/core/log" 26 + ) 27 + 28 + const ( 29 + fileSizeLimit = 16 * 1024 // read up to 16 KiB for language detection 30 + bigFileSize = 1024 * 1024 // skip content read for blobs over 1 MiB 31 + langIndexRepoCommit = "lang_index:%s:%s" // lang_index:{did}:{oid} 32 + langIndexRepoCommitTTL = 30 * 24 * time.Hour 33 + ) 34 + 35 + // Language indexing strategy: 36 + // 37 + // git.refUpdate HEAD -> store to db 38 + // git.refUpdate other -> on-demand calculation, cache 39 + // 40 + // NOTE: currently all event we have is git.refUpdate, so background indexing 41 + // job will always be triggered. 42 + // TODO(boltless): don't queue indexing job on "sync" type event while repo is 43 + // not active. 44 + 45 + type Indexer struct { 46 + logger *slog.Logger 47 + cfg *config.Config 48 + rdb *redis.Client 49 + } 50 + 51 + func NewIndexer(l *slog.Logger, cfg *config.Config, rdb *redis.Client) *Indexer { 52 + indexer := &Indexer{ 53 + logger: log.SubLogger(l, "indexer"), 54 + cfg: cfg, 55 + rdb: rdb, 56 + } 57 + return indexer 58 + } 59 + 60 + func NewBackgroundIndexScheduler(l *slog.Logger, cfg *config.Config, e *sql.DB, indexer *Indexer) *knotstream.ParallelScheduler { 61 + return knotstream.NewParallelScheduler( 62 + 4, 63 + "repo_stats_update", // NOTE: this is unused 64 + func(ctx context.Context, t *knotstream.Task) error { 65 + start := time.Now() 66 + repoId := syntax.DID(t.Key) 67 + 68 + // resolve HEAD to commitId 69 + commit, err := gitea.GetCommit(ctx, indexer.repoPath(repoId), "HEAD") 70 + if err != nil { 71 + return fmt.Errorf("failed to resolve HEAD: %w", err) 72 + } 73 + 74 + l := l.With("repo", repoId, "hash", commit.Hash) 75 + 76 + // check if (did,oid) is already indexed 77 + indexed, err := db.IsLanguageIndexed(ctx, e, repoId, commit.Hash) 78 + if err != nil { 79 + l.Error("failed to query langs", "err", err) 80 + indexed = false 81 + // continue 82 + } 83 + if indexed { 84 + return nil 85 + } 86 + 87 + langs, err := indexer.IndexLanguages(ctx, repoId, commit.Hash) 88 + if err != nil { 89 + return fmt.Errorf("indexing langs: %w", err) 90 + } 91 + 92 + l.Info("pre-indexed language stats", "duration", time.Since(start)) 93 + 94 + if err := db.InsertLanguages(ctx, e, repoId, commit.Hash, langs); err != nil { 95 + return fmt.Errorf("failed to insert langs into db: %w", err) 96 + } 97 + 98 + return nil 99 + }, 100 + ) 101 + } 102 + 103 + func (i *Indexer) repoPath(repo syntax.DID) string { 104 + return filepath.Join(i.cfg.GitRepoBasePath, repo.String()) 105 + } 106 + 107 + // IndexLanguages index the repository language stats at given commit 108 + func (i *Indexer) IndexLanguages(ctx context.Context, repoId syntax.DID, commitId plumbing.Hash) (map[string]int64, error) { 109 + if i.rdb != nil { 110 + if val, err := i.rdb.Get(ctx, fmt.Sprintf(langIndexRepoCommit, repoId, commitId.String())).Result(); err == nil { 111 + i.logger.Debug("serve from cache") 112 + var sizes map[string]int64 113 + if err := json.Unmarshal([]byte(val), &sizes); err == nil { 114 + return sizes, nil 115 + } 116 + } 117 + } 118 + 119 + sizes, err := IndexLanguagesInner(ctx, i.repoPath(repoId), commitId) 120 + if err != nil { 121 + return nil, err 122 + } 123 + 124 + if i.rdb != nil { 125 + if encoded, err := json.Marshal(sizes); err == nil { 126 + i.logger.Debug("cache language") 127 + if err := i.rdb.Set(ctx, 128 + fmt.Sprintf(langIndexRepoCommit, repoId, commitId.String()), 129 + encoded, 130 + langIndexRepoCommitTTL, 131 + ).Err(); err != nil { 132 + i.logger.Error("failed to cache languages", "err", err) 133 + } 134 + } 135 + } 136 + return sizes, nil 137 + } 138 + 139 + func IndexLanguagesInner(ctx context.Context, repoPath string, commitId plumbing.Hash) (map[string]int64, error) { 140 + tree, err := gitea.GetTree(ctx, repoPath, commitId.String()+"^{tree}") 141 + if err != nil { 142 + return nil, err 143 + } 144 + 145 + bw, br, close := gitea.CatFileBatch(ctx, repoPath) 146 + defer close() 147 + 148 + sizes, err := batchAnalyzeTree(ctx, bw, br, tree) 149 + if err != nil { 150 + return nil, err 151 + } 152 + return sizes, nil 153 + } 154 + 155 + func batchAnalyzeTree(ctx context.Context, bw io.WriteCloser, br *bufio.Reader, tree *object.Tree) (map[string]int64, error) { 156 + sizes := make(map[string]int64) 157 + for _, entry := range tree.Entries { 158 + select { 159 + case <-ctx.Done(): 160 + return nil, ctx.Err() 161 + default: 162 + } 163 + 164 + switch entry.Mode { 165 + case filemode.Dir: 166 + subTree, err := gitea.BatchGetTree(bw, br, entry.Hash.String()) 167 + if err != nil { 168 + return nil, err 169 + } 170 + subTreeSizes, err := batchAnalyzeTree(ctx, bw, br, subTree) 171 + if err != nil { 172 + return nil, err 173 + } 174 + for name, size := range subTreeSizes { 175 + sizes[name] += size 176 + } 177 + case filemode.Symlink, filemode.Submodule: 178 + // skip symlink/submodule 179 + default: 180 + _, err := bw.Write([]byte(entry.Hash.String() + "\n")) 181 + if err != nil { 182 + return nil, err 183 + } 184 + _, _, size, err := gitea.ReadBatchLine(br) 185 + if err != nil { 186 + return nil, err 187 + } 188 + // skip large file 189 + if size > bigFileSize { 190 + if err := gitea.DiscardFull(br, size+1); err != nil { 191 + return nil, err 192 + } 193 + continue 194 + } 195 + 196 + sizeToRead := size 197 + discard := int64(1) 198 + if size > fileSizeLimit { 199 + sizeToRead = fileSizeLimit 200 + discard = size - fileSizeLimit + 1 201 + } 202 + content, err := io.ReadAll(io.LimitReader(br, sizeToRead)) 203 + if err != nil { 204 + return nil, err 205 + } 206 + if err := gitea.DiscardFull(br, discard); err != nil { 207 + return nil, err 208 + } 209 + 210 + language, noskip := analyzeLanguage(entry.Name, content) 211 + if noskip { 212 + sizes[language] += size 213 + } 214 + } 215 + } 216 + return sizes, nil 217 + } 218 + 219 + func analyzeLanguage(fileName string, content []byte) (string, bool) { 220 + // skip generated file 221 + // TODO: follow gitattributes, lazyily read content (filter by filename first) 222 + if enry.IsGenerated(fileName, content) || enry.IsBinary(content) || strings.HasSuffix(fileName, "bun.lock") { 223 + return "", false 224 + } 225 + 226 + language := func(fileName string, content []byte) string { 227 + language, ok := enry.GetLanguageByExtension(fileName) 228 + if ok { 229 + return language 230 + } 231 + language, ok = enry.GetLanguageByFilename(fileName) 232 + if ok { 233 + return language 234 + } 235 + if len(content) == 0 { 236 + return enry.OtherLanguage 237 + } 238 + return enry.GetLanguage(fileName, content) 239 + }(fileName, content) 240 + if group := enry.GetLanguageGroup(language); group != "" { 241 + language = group 242 + } 243 + 244 + langType := enry.GetLanguageType(language) 245 + if langType != enry.Programming && langType != enry.Markup && langType != enry.Unknown { 246 + return "", false 247 + } 248 + return language, true 249 + }
+15 -9
knotmirror/resyncer.go
··· 16 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 17 "tangled.org/core/knotmirror/config" 18 18 "tangled.org/core/knotmirror/db" 19 + "tangled.org/core/knotmirror/knotstream" 19 20 "tangled.org/core/knotmirror/models" 20 21 "tangled.org/core/log" 21 22 ) 22 23 23 24 type Resyncer struct { 24 - logger *slog.Logger 25 - db *sql.DB 26 - gitm GitMirrorManager 27 - cfg *config.Config 25 + logger *slog.Logger 26 + db *sql.DB 27 + gitm GitMirrorManager 28 + cfg *config.Config 29 + indexer *knotstream.ParallelScheduler 28 30 29 31 claimJobMu sync.Mutex 30 32 ··· 41 43 httpClient *http.Client 42 44 } 43 45 44 - func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, cfg *config.Config) *Resyncer { 46 + func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, indexer *knotstream.ParallelScheduler, cfg *config.Config) *Resyncer { 45 47 return &Resyncer{ 46 - logger: log.SubLogger(l, "resyncer"), 47 - db: db, 48 - gitm: gitm, 49 - cfg: cfg, 48 + logger: log.SubLogger(l, "resyncer"), 49 + db: db, 50 + gitm: gitm, 51 + cfg: cfg, 52 + indexer: indexer, 50 53 51 54 runningJobs: make(map[syntax.ATURI]context.CancelFunc), 52 55 ··· 247 250 if err := r.gitm.Sync(fetchCtx, repo); err != nil { 248 251 return false, err 249 252 } 253 + 254 + // queue repo_stats_update job 255 + r.indexer.AddTask(context.TODO(), &knotstream.Task{Key: repo.RepoDid.String()}) 250 256 251 257 // repo.GitRev = <processed git.refUpdate revision> 252 258 // repo.RepoSha = <sha256 sum of git refs>
+22 -76
knotmirror/xrpc/git_list_languages.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "encoding/json" 6 5 "fmt" 7 - "math" 8 6 "net/http" 9 7 "time" 10 8 11 9 "github.com/bluesky-social/indigo/atproto/atclient" 12 10 "github.com/bluesky-social/indigo/atproto/syntax" 13 11 "tangled.org/core/api/tangled" 14 - "tangled.org/core/knotserver/git" 15 - ) 16 - 17 - const ( 18 - RepoLanguagesByDid = "git_list_languages:repo:%s:%s" 19 - RepoLanguagesTTL = 24 * time.Hour 12 + "tangled.org/core/knotmirror/xrpc/gitea" 20 13 ) 21 14 22 15 func (x *Xrpc) ListLanguages(w http.ResponseWriter, r *http.Request) { ··· 34 27 return 35 28 } 36 29 37 - if val, err := x.rdb.Get(r.Context(), fmt.Sprintf(RepoLanguagesByDid, repo, ref)).Result(); err == nil { 38 - l.Debug("served from cache") 39 - var langs []*tangled.GitTempListLanguages_Language 40 - err = json.Unmarshal([]byte(val), &langs) 41 - if err == nil { 42 - writeJson(w, http.StatusOK, &tangled.GitTempListLanguages_Output{ 43 - Ref: ref, 44 - Languages: langs, 45 - }) 46 - return 47 - } 48 - } 30 + ctx := r.Context() 49 31 50 - var out *tangled.GitTempListLanguages_Output 51 - out, err = x.listLanguages(r.Context(), repo, ref) 52 - if err != nil { 53 - l.Warn("local mirror failed, trying proxy", "err", err) 54 - if x.proxyToKnot(w, r, repo) { 55 - return 56 - } 57 - writeErr(w, err) 58 - return 59 - } 60 - 61 - go func() { 62 - ctx := context.Background() 63 - encoded, err := json.Marshal(out.Languages) 64 - if err != nil { 65 - return 66 - } 67 - x.rdb.Set(ctx, fmt.Sprintf(RepoLanguagesByDid, repo, ref), encoded, RepoLanguagesTTL) 68 - }() 69 - 70 - writeJson(w, http.StatusOK, out) 71 - } 72 - 73 - func (x *Xrpc) listLanguages(ctx context.Context, repo syntax.DID, ref string) (*tangled.GitTempListLanguages_Output, error) { 74 32 repoPath, err := x.makeRepoPath(ctx, repo) 75 33 if err != nil { 76 - return nil, fmt.Errorf("resolving repo did: %w", err) 34 + l.Error("failed to make repo path", "err", err) 35 + writeJson(w, http.StatusNotFound, atclient.ErrorBody{Name: "RepoNotFound", Message: fmt.Sprintf("unknown repository: %s", repo)}) 36 + return 77 37 } 78 38 79 - gr, err := git.Open(repoPath, ref) 39 + commit, err := gitea.GetCommit(ctx, repoPath, ref) 80 40 if err != nil { 81 - return nil, &atclient.APIError{StatusCode: http.StatusNotFound, Name: "RepoNotFound", Message: "failed to find git repo"} 41 + l.Error("failed to get commit", "err", err) 42 + writeJson(w, http.StatusNotFound, atclient.ErrorBody{Name: "RefNotFound", Message: fmt.Sprintf("unknown git ref: %s", repo)}) 43 + return 82 44 } 83 45 84 - ctx, cancel := context.WithTimeout(ctx, 1*time.Second) 46 + indexCtx, cancel := context.WithTimeout(ctx, 1 * time.Second) 85 47 defer cancel() 86 - 87 - sizes, err := gr.AnalyzeLanguages(ctx) 48 + sizes, err := x.indexer.IndexLanguages(indexCtx, repo, commit.Hash) 88 49 if err != nil { 89 - return nil, fmt.Errorf("analyzing languages: %w", err) 50 + l.Error("failed to serve languages", "err", err) 51 + writeJson(w, http.StatusNotFound, atclient.ErrorBody{Name: "InternalServerError", Message: "failed to serve languages"}) 52 + return 90 53 } 91 54 92 - return &tangled.GitTempListLanguages_Output{ 93 - Ref: ref, 94 - Languages: sizesToLanguages(sizes), 95 - }, nil 96 - } 97 - 98 - func sizesToLanguages(sizes git.LangBreakdown) []*tangled.GitTempListLanguages_Language { 99 - var apiLanguages []*tangled.GitTempListLanguages_Language 100 - var totalSize int64 101 - for _, size := range sizes { 102 - totalSize += size 55 + var out tangled.GitTempListLanguages_Output 56 + for lang, size := range sizes { 57 + out.Total += size 58 + out.Languages = append(out.Languages, &tangled.GitTempListLanguages_Language{ 59 + Name: lang, 60 + Size: size, 61 + }) 103 62 } 104 63 105 - for name, size := range sizes { 106 - percentagef64 := float64(size) / float64(totalSize) * 100 107 - percentage := math.Round(percentagef64) 108 - 109 - lang := &tangled.GitTempListLanguages_Language{ 110 - Name: name, 111 - Size: size, 112 - Percentage: int64(percentage), 113 - } 114 - 115 - apiLanguages = append(apiLanguages, lang) 116 - } 117 - 118 - return apiLanguages 64 + writeJson(w, http.StatusOK, &out) 119 65 }
+9 -5
knotmirror/xrpc/gitea/batch.go
··· 49 49 } 50 50 51 51 func GetTree(ctx context.Context, repoPath, rev string) (*object.Tree, error) { 52 - wr, rd, cancel := CatFileBatch(ctx, repoPath) 52 + bw, br, cancel := CatFileBatch(ctx, repoPath) 53 53 defer cancel() 54 54 55 - if _, err := wr.Write([]byte(rev + "\n")); err != nil { 55 + return BatchGetTree(bw, br, rev) 56 + } 57 + 58 + func BatchGetTree(bw io.WriteCloser, br *bufio.Reader, rev string) (*object.Tree, error) { 59 + if _, err := bw.Write([]byte(rev + "\n")); err != nil { 56 60 return nil, fmt.Errorf("write rev: %w", err) 57 61 } 58 - sha, typ, size, err := ReadBatchLine(rd) 62 + sha, typ, size, err := ReadBatchLine(br) 59 63 if err != nil { 60 64 return nil, fmt.Errorf("resolve %s: %w", rev, err) 61 65 } 62 66 if typ != "tree" { 63 - if err := DiscardFull(rd, size+1); err != nil { 67 + if err := DiscardFull(br, size+1); err != nil { 64 68 return nil, err 65 69 } 66 70 return nil, fmt.Errorf("unexpected type: %s for tree: %s", typ, rev) 67 71 } 68 72 69 - entries, err := catBatchParseTreeEntries(rd, size) 73 + entries, err := catBatchParseTreeEntries(br, size) 70 74 if err != nil { 71 75 return nil, fmt.Errorf("read tree %s: %w", rev, err) 72 76 }
+4 -1
knotmirror/xrpc/xrpc.go
··· 15 15 "tangled.org/core/api/tangled" 16 16 "tangled.org/core/idresolver" 17 17 "tangled.org/core/knotmirror/config" 18 + "tangled.org/core/knotmirror/repoindexer" 18 19 "tangled.org/core/knotmirror/knotstream" 19 20 "tangled.org/core/log" 20 21 ) ··· 23 24 cfg *config.Config 24 25 db *sql.DB 25 26 rdb *redis.Client 27 + indexer *repoindexer.Indexer 26 28 resolver *idresolver.Resolver 27 29 ks *knotstream.KnotStream 28 30 logger *slog.Logger ··· 30 32 inflight *inflightTracker 31 33 } 32 34 33 - func New(logger *slog.Logger, cfg *config.Config, db *sql.DB, rdb *redis.Client, resolver *idresolver.Resolver, ks *knotstream.KnotStream) *Xrpc { 35 + func New(logger *slog.Logger, cfg *config.Config, db *sql.DB, rdb *redis.Client, indexer *repoindexer.Indexer, resolver *idresolver.Resolver, ks *knotstream.KnotStream) *Xrpc { 34 36 httpClient := &http.Client{ 35 37 Timeout: 30 * time.Second, 36 38 } ··· 41 43 cfg: cfg, 42 44 db: db, 43 45 rdb: rdb, 46 + indexer: indexer, 44 47 resolver: resolver, 45 48 ks: ks, 46 49 logger: log.SubLogger(logger, "xrpc"),