Monorepo for Tangled
tangled.org
1package repoindexer
2
3import (
4 "bufio"
5 "context"
6 "database/sql"
7 "encoding/json"
8 "fmt"
9 "io"
10 "log/slog"
11 "path/filepath"
12 "strings"
13 "time"
14
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 "github.com/go-enry/go-enry/v2"
17 "github.com/go-git/go-git/v5/plumbing"
18 "github.com/go-git/go-git/v5/plumbing/filemode"
19 "github.com/go-git/go-git/v5/plumbing/object"
20 "github.com/redis/go-redis/v9"
21 "tangled.org/core/knotmirror/config"
22 "tangled.org/core/knotmirror/db"
23 "tangled.org/core/knotmirror/knotstream"
24 "tangled.org/core/knotmirror/xrpc/gitea"
25 "tangled.org/core/log"
26)
27
// Tuning knobs and cache layout for language indexing.
const (
	// fileSizeLimit is how many leading bytes of a blob are read and handed to
	// enry for content-based language detection.
	fileSizeLimit = 16 << 10 // 16 KiB

	// bigFileSize is the cutoff above which a blob is skipped entirely: its
	// content is discarded unread and its size is not counted.
	bigFileSize = 1 << 20 // 1 MiB

	// langIndexRepoCommit is the Redis key format for cached results,
	// filled as lang_index:{did}:{oid}.
	langIndexRepoCommit = "lang_index:%s:%s"

	// langIndexRepoCommitTTL is how long a cached result is kept.
	langIndexRepoCommitTTL = 30 * 24 * time.Hour
)
34
// Language indexing strategy:
//
//	git.refUpdate on HEAD        -> store to db
//	git.refUpdate on other refs  -> calculate on demand, cache
//
// NOTE: git.refUpdate is currently the only event we receive, so the
// background indexing job is always triggered.
// TODO(boltless): don't queue an indexing job for "sync" type events while
// the repo is not active.
44
45type Indexer struct {
46 logger *slog.Logger
47 cfg *config.Config
48 rdb *redis.Client
49}
50
51func NewIndexer(l *slog.Logger, cfg *config.Config, rdb *redis.Client) *Indexer {
52 indexer := &Indexer{
53 logger: log.SubLogger(l, "indexer"),
54 cfg: cfg,
55 rdb: rdb,
56 }
57 return indexer
58}
59
60func NewBackgroundIndexScheduler(l *slog.Logger, cfg *config.Config, e *sql.DB, indexer *Indexer) *knotstream.ParallelScheduler {
61 return knotstream.NewParallelScheduler(
62 4,
63 "repo_stats_update", // NOTE: this is unused
64 func(ctx context.Context, t *knotstream.Task) error {
65 start := time.Now()
66 repoId := syntax.DID(t.Key)
67
68 // resolve HEAD to commitId
69 commit, err := gitea.GetCommit(ctx, indexer.repoPath(repoId), "HEAD")
70 if err != nil {
71 return fmt.Errorf("failed to resolve HEAD: %w", err)
72 }
73
74 l := l.With("repo", repoId, "hash", commit.Hash)
75
76 // check if (did,oid) is already indexed
77 indexed, err := db.IsLanguageIndexed(ctx, e, repoId, commit.Hash)
78 if err != nil {
79 l.Error("failed to query langs", "err", err)
80 indexed = false
81 // continue
82 }
83 if indexed {
84 return nil
85 }
86
87 langs, err := indexer.IndexLanguages(ctx, repoId, commit.Hash)
88 if err != nil {
89 return fmt.Errorf("indexing langs: %w", err)
90 }
91
92 l.Info("pre-indexed language stats", "duration", time.Since(start))
93
94 if err := db.InsertLanguages(ctx, e, repoId, commit.Hash, langs); err != nil {
95 return fmt.Errorf("failed to insert langs into db: %w", err)
96 }
97
98 return nil
99 },
100 )
101}
102
103func (i *Indexer) repoPath(repo syntax.DID) string {
104 return filepath.Join(i.cfg.GitRepoBasePath, repo.String())
105}
106
107// IndexLanguages index the repository language stats at given commit
108func (i *Indexer) IndexLanguages(ctx context.Context, repoId syntax.DID, commitId plumbing.Hash) (map[string]int64, error) {
109 if i.rdb != nil {
110 if val, err := i.rdb.Get(ctx, fmt.Sprintf(langIndexRepoCommit, repoId, commitId.String())).Result(); err == nil {
111 i.logger.Debug("serve from cache")
112 var sizes map[string]int64
113 if err := json.Unmarshal([]byte(val), &sizes); err == nil {
114 return sizes, nil
115 }
116 }
117 }
118
119 sizes, err := IndexLanguagesInner(ctx, i.repoPath(repoId), commitId)
120 if err != nil {
121 return nil, err
122 }
123
124 if i.rdb != nil {
125 if encoded, err := json.Marshal(sizes); err == nil {
126 i.logger.Debug("cache language")
127 if err := i.rdb.Set(ctx,
128 fmt.Sprintf(langIndexRepoCommit, repoId, commitId.String()),
129 encoded,
130 langIndexRepoCommitTTL,
131 ).Err(); err != nil {
132 i.logger.Error("failed to cache languages", "err", err)
133 }
134 }
135 }
136 return sizes, nil
137}
138
139func IndexLanguagesInner(ctx context.Context, repoPath string, commitId plumbing.Hash) (map[string]int64, error) {
140 tree, err := gitea.GetTree(ctx, repoPath, commitId.String()+"^{tree}")
141 if err != nil {
142 return nil, err
143 }
144
145 bw, br, close := gitea.CatFileBatch(ctx, repoPath)
146 defer close()
147
148 sizes, err := batchAnalyzeTree(ctx, bw, br, tree)
149 if err != nil {
150 return nil, err
151 }
152 return sizes, nil
153}
154
155func batchAnalyzeTree(ctx context.Context, bw io.WriteCloser, br *bufio.Reader, tree *object.Tree) (map[string]int64, error) {
156 sizes := make(map[string]int64)
157 for _, entry := range tree.Entries {
158 select {
159 case <-ctx.Done():
160 return nil, ctx.Err()
161 default:
162 }
163
164 switch entry.Mode {
165 case filemode.Dir:
166 subTree, err := gitea.BatchGetTree(bw, br, entry.Hash.String())
167 if err != nil {
168 return nil, err
169 }
170 subTreeSizes, err := batchAnalyzeTree(ctx, bw, br, subTree)
171 if err != nil {
172 return nil, err
173 }
174 for name, size := range subTreeSizes {
175 sizes[name] += size
176 }
177 case filemode.Symlink, filemode.Submodule:
178 // skip symlink/submodule
179 default:
180 _, err := bw.Write([]byte(entry.Hash.String() + "\n"))
181 if err != nil {
182 return nil, err
183 }
184 _, _, size, err := gitea.ReadBatchLine(br)
185 if err != nil {
186 return nil, err
187 }
188 // skip large file
189 if size > bigFileSize {
190 if err := gitea.DiscardFull(br, size+1); err != nil {
191 return nil, err
192 }
193 continue
194 }
195
196 sizeToRead := size
197 discard := int64(1)
198 if size > fileSizeLimit {
199 sizeToRead = fileSizeLimit
200 discard = size - fileSizeLimit + 1
201 }
202 content, err := io.ReadAll(io.LimitReader(br, sizeToRead))
203 if err != nil {
204 return nil, err
205 }
206 if err := gitea.DiscardFull(br, discard); err != nil {
207 return nil, err
208 }
209
210 language, noskip := analyzeLanguage(entry.Name, content)
211 if noskip {
212 sizes[language] += size
213 }
214 }
215 }
216 return sizes, nil
217}
218
219func analyzeLanguage(fileName string, content []byte) (string, bool) {
220 // skip generated file
221 // TODO: follow gitattributes, lazyily read content (filter by filename first)
222 if enry.IsGenerated(fileName, content) || enry.IsBinary(content) || strings.HasSuffix(fileName, "bun.lock") {
223 return "", false
224 }
225
226 language := func(fileName string, content []byte) string {
227 language, ok := enry.GetLanguageByExtension(fileName)
228 if ok {
229 return language
230 }
231 language, ok = enry.GetLanguageByFilename(fileName)
232 if ok {
233 return language
234 }
235 if len(content) == 0 {
236 return enry.OtherLanguage
237 }
238 return enry.GetLanguage(fileName, content)
239 }(fileName, content)
240 if group := enry.GetLanguageGroup(language); group != "" {
241 language = group
242 }
243
244 langType := enry.GetLanguageType(language)
245 if langType != enry.Programming && langType != enry.Markup && langType != enry.Unknown {
246 return "", false
247 }
248 return language, true
249}