Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package repoindexer 2 3import ( 4 "bufio" 5 "context" 6 "database/sql" 7 "encoding/json" 8 "fmt" 9 "io" 10 "log/slog" 11 "path/filepath" 12 "strings" 13 "time" 14 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 "github.com/go-enry/go-enry/v2" 17 "github.com/go-git/go-git/v5/plumbing" 18 "github.com/go-git/go-git/v5/plumbing/filemode" 19 "github.com/go-git/go-git/v5/plumbing/object" 20 "github.com/redis/go-redis/v9" 21 "tangled.org/core/knotmirror/config" 22 "tangled.org/core/knotmirror/db" 23 "tangled.org/core/knotmirror/knotstream" 24 "tangled.org/core/knotmirror/xrpc/gitea" 25 "tangled.org/core/log" 26) 27 28const ( 29 fileSizeLimit = 16 * 1024 // read up to 16 KiB for language detection 30 bigFileSize = 1024 * 1024 // skip content read for blobs over 1 MiB 31 langIndexRepoCommit = "lang_index:%s:%s" // lang_index:{did}:{oid} 32 langIndexRepoCommitTTL = 30 * 24 * time.Hour 33) 34 35// Language indexing strategy: 36// 37// git.refUpdate HEAD -> store to db 38// git.refUpdate other -> on-demand calculation, cache 39// 40// NOTE: currently all event we have is git.refUpdate, so background indexing 41// job will always be triggered. 42// TODO(boltless): don't queue indexing job on "sync" type event while repo is 43// not active. 44 45type Indexer struct { 46 logger *slog.Logger 47 cfg *config.Config 48 rdb *redis.Client 49} 50 51func NewIndexer(l *slog.Logger, cfg *config.Config, rdb *redis.Client) *Indexer { 52 indexer := &Indexer{ 53 logger: log.SubLogger(l, "indexer"), 54 cfg: cfg, 55 rdb: rdb, 56 } 57 return indexer 58} 59 60func NewBackgroundIndexScheduler(l *slog.Logger, cfg *config.Config, e *sql.DB, indexer *Indexer) *knotstream.ParallelScheduler { 61 return knotstream.NewParallelScheduler( 62 4, 63 "repo_stats_update", // NOTE: this is unused 64 func(ctx context.Context, t *knotstream.Task) error { 65 start := time.Now() 66 repoId := syntax.DID(t.Key) 67 68 // resolve HEAD to commitId 69 commit, err := gitea.GetCommit(ctx, indexer.repoPath(repoId), "HEAD") 70 if err != nil { 71 return fmt.Errorf("failed to resolve HEAD: %w", err) 72 } 73 74 l := l.With("repo", repoId, "hash", commit.Hash) 75 76 // check if (did,oid) is already indexed 77 indexed, err := db.IsLanguageIndexed(ctx, e, repoId, commit.Hash) 78 if err != nil { 79 l.Error("failed to query langs", "err", err) 80 indexed = false 81 // continue 82 } 83 if indexed { 84 return nil 85 } 86 87 langs, err := indexer.IndexLanguages(ctx, repoId, commit.Hash) 88 if err != nil { 89 return fmt.Errorf("indexing langs: %w", err) 90 } 91 92 l.Info("pre-indexed language stats", "duration", time.Since(start)) 93 94 if err := db.InsertLanguages(ctx, e, repoId, commit.Hash, langs); err != nil { 95 return fmt.Errorf("failed to insert langs into db: %w", err) 96 } 97 98 return nil 99 }, 100 ) 101} 102 103func (i *Indexer) repoPath(repo syntax.DID) string { 104 return filepath.Join(i.cfg.GitRepoBasePath, repo.String()) 105} 106 107// IndexLanguages index the repository language stats at given commit 108func (i *Indexer) IndexLanguages(ctx context.Context, repoId syntax.DID, commitId plumbing.Hash) (map[string]int64, error) { 109 if i.rdb != nil { 110 if val, err := i.rdb.Get(ctx, fmt.Sprintf(langIndexRepoCommit, repoId, commitId.String())).Result(); err == nil { 111 i.logger.Debug("serve from cache") 112 var sizes map[string]int64 113 if err := json.Unmarshal([]byte(val), &sizes); err == nil { 114 return sizes, nil 115 } 116 } 117 } 118 119 sizes, err := IndexLanguagesInner(ctx, i.repoPath(repoId), commitId) 120 if err != nil { 121 return nil, err 122 } 123 124 if i.rdb != nil { 125 if encoded, err := json.Marshal(sizes); err == nil { 126 i.logger.Debug("cache language") 127 if err := i.rdb.Set(ctx, 128 fmt.Sprintf(langIndexRepoCommit, repoId, commitId.String()), 129 encoded, 130 langIndexRepoCommitTTL, 131 ).Err(); err != nil { 132 i.logger.Error("failed to cache languages", "err", err) 133 } 134 } 135 } 136 return sizes, nil 137} 138 139func IndexLanguagesInner(ctx context.Context, repoPath string, commitId plumbing.Hash) (map[string]int64, error) { 140 tree, err := gitea.GetTree(ctx, repoPath, commitId.String()+"^{tree}") 141 if err != nil { 142 return nil, err 143 } 144 145 bw, br, close := gitea.CatFileBatch(ctx, repoPath) 146 defer close() 147 148 sizes, err := batchAnalyzeTree(ctx, bw, br, tree) 149 if err != nil { 150 return nil, err 151 } 152 return sizes, nil 153} 154 155func batchAnalyzeTree(ctx context.Context, bw io.WriteCloser, br *bufio.Reader, tree *object.Tree) (map[string]int64, error) { 156 sizes := make(map[string]int64) 157 for _, entry := range tree.Entries { 158 select { 159 case <-ctx.Done(): 160 return nil, ctx.Err() 161 default: 162 } 163 164 switch entry.Mode { 165 case filemode.Dir: 166 subTree, err := gitea.BatchGetTree(bw, br, entry.Hash.String()) 167 if err != nil { 168 return nil, err 169 } 170 subTreeSizes, err := batchAnalyzeTree(ctx, bw, br, subTree) 171 if err != nil { 172 return nil, err 173 } 174 for name, size := range subTreeSizes { 175 sizes[name] += size 176 } 177 case filemode.Symlink, filemode.Submodule: 178 // skip symlink/submodule 179 default: 180 _, err := bw.Write([]byte(entry.Hash.String() + "\n")) 181 if err != nil { 182 return nil, err 183 } 184 _, _, size, err := gitea.ReadBatchLine(br) 185 if err != nil { 186 return nil, err 187 } 188 // skip large file 189 if size > bigFileSize { 190 if err := gitea.DiscardFull(br, size+1); err != nil { 191 return nil, err 192 } 193 continue 194 } 195 196 sizeToRead := size 197 discard := int64(1) 198 if size > fileSizeLimit { 199 sizeToRead = fileSizeLimit 200 discard = size - fileSizeLimit + 1 201 } 202 content, err := io.ReadAll(io.LimitReader(br, sizeToRead)) 203 if err != nil { 204 return nil, err 205 } 206 if err := gitea.DiscardFull(br, discard); err != nil { 207 return nil, err 208 } 209 210 language, noskip := analyzeLanguage(entry.Name, content) 211 if noskip { 212 sizes[language] += size 213 } 214 } 215 } 216 return sizes, nil 217} 218 219func analyzeLanguage(fileName string, content []byte) (string, bool) { 220 // skip generated file 221 // TODO: follow gitattributes, lazyily read content (filter by filename first) 222 if enry.IsGenerated(fileName, content) || enry.IsBinary(content) || strings.HasSuffix(fileName, "bun.lock") { 223 return "", false 224 } 225 226 language := func(fileName string, content []byte) string { 227 language, ok := enry.GetLanguageByExtension(fileName) 228 if ok { 229 return language 230 } 231 language, ok = enry.GetLanguageByFilename(fileName) 232 if ok { 233 return language 234 } 235 if len(content) == 0 { 236 return enry.OtherLanguage 237 } 238 return enry.GetLanguage(fileName, content) 239 }(fileName, content) 240 if group := enry.GetLanguageGroup(language); group != "" { 241 language = group 242 } 243 244 langType := enry.GetLanguageType(language) 245 if langType != enry.Programming && langType != enry.Markup && langType != enry.Unknown { 246 return "", false 247 } 248 return language, true 249}