Monorepo for Tangled
0

Configure Feed

Select the types of activity you want to include in your feed.

knotmirror: add inflight api

Signed-off-by: Seongmin Lee <git@boltless.me>

author
Seongmin Lee
committer
Tangled
date (May 12, 2026, 11:47 AM +0300) commit 29583954 parent 0cf5cbe5 change-id rqpntlsp
+127 -17
+14 -1
knotmirror/adminpage.go
··· 3 3 import ( 4 4 "database/sql" 5 5 "embed" 6 + "encoding/json" 6 7 "fmt" 7 8 "html" 8 9 "html/template" ··· 16 17 "tangled.org/core/appview/pagination" 17 18 "tangled.org/core/knotmirror/db" 18 19 "tangled.org/core/knotmirror/models" 20 + "tangled.org/core/knotmirror/xrpc" 19 21 ) 20 22 21 23 //go:embed templates/*.html ··· 26 28 type AdminServer struct { 27 29 db *sql.DB 28 30 resyncer *Resyncer 31 + xrpc *xrpc.Xrpc 29 32 logger *slog.Logger 30 33 } 31 34 32 - func NewAdminServer(l *slog.Logger, database *sql.DB, resyncer *Resyncer) *AdminServer { 35 + func NewAdminServer(l *slog.Logger, database *sql.DB, resyncer *Resyncer, x *xrpc.Xrpc) *AdminServer { 33 36 return &AdminServer{ 34 37 db: database, 35 38 resyncer: resyncer, 39 + xrpc: x, 36 40 logger: l, 37 41 } 38 42 } ··· 44 48 45 49 r.Post("/api/triggerRepoResync", s.handleRepoResyncTrigger()) 46 50 r.Post("/api/cancelRepoResync", s.handleRepoResyncCancel()) 51 + r.Get("/api/inflight", s.handleInflight()) 47 52 return r 53 + } 54 + 55 + func (s *AdminServer) handleInflight() http.HandlerFunc { 56 + return func(w http.ResponseWriter, r *http.Request) { 57 + entries := s.xrpc.Inflight() 58 + w.Header().Set("Content-Type", "application/json") 59 + _ = json.NewEncoder(w).Encode(entries) 60 + } 48 61 } 49 62 50 63 func funcmap() template.FuncMap {
+1 -1
knotmirror/knotmirror.go
··· 55 55 knotstream := knotstream.NewKnotStream(logger, db, cfg) 56 56 crawler := NewCrawler(logger, db) 57 57 resyncer := NewResyncer(logger, db, gitm, cfg) 58 - adminpage := NewAdminServer(logger, db, resyncer) 59 58 xrpc := xrpc.New(logger, cfg, db, rdb, resolver, knotstream) 59 + adminpage := NewAdminServer(logger, db, resyncer, xrpc) 60 60 61 61 // maintain repository list with tap 62 62 // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events.
+91
knotmirror/xrpc/inflight.go
··· 1 + package xrpc 2 + 3 + import ( 4 + "net/http" 5 + "strings" 6 + "sync" 7 + "sync/atomic" 8 + "time" 9 + ) 10 + 11 + type InflightEntry struct { 12 + ID uint64 `json:"id"` 13 + Method string `json:"method"` 14 + Path string `json:"path"` 15 + RawQuery string `json:"raw_query,omitempty"` 16 + URL string `json:"url"` 17 + Repo string `json:"repo,omitempty"` 18 + RemoteAddr string `json:"remote_addr,omitempty"` 19 + StartedAt time.Time `json:"started_at"` 20 + DurationMs int64 `json:"duration_ms"` 21 + } 22 + 23 + type inflightTracker struct { 24 + mu sync.Mutex 25 + nextID atomic.Uint64 26 + active map[uint64]*InflightEntry 27 + } 28 + 29 + func newInflightTracker() *inflightTracker { 30 + return &inflightTracker{active: make(map[uint64]*InflightEntry)} 31 + } 32 + 33 + func (t *inflightTracker) add(r *http.Request) uint64 { 34 + id := t.nextID.Add(1) 35 + q := r.URL.Query() 36 + entry := &InflightEntry{ 37 + ID: id, 38 + Method: r.Method, 39 + Path: r.URL.Path, 40 + RawQuery: r.URL.RawQuery, 41 + URL: r.URL.RequestURI(), 42 + Repo: q.Get("repo"), 43 + RemoteAddr: clientAddr(r), 44 + StartedAt: time.Now(), 45 + } 46 + t.mu.Lock() 47 + t.active[id] = entry 48 + t.mu.Unlock() 49 + return id 50 + } 51 + 52 + func (t *inflightTracker) remove(id uint64) { 53 + t.mu.Lock() 54 + delete(t.active, id) 55 + t.mu.Unlock() 56 + } 57 + 58 + func (t *inflightTracker) snapshot() []InflightEntry { 59 + t.mu.Lock() 60 + defer t.mu.Unlock() 61 + out := make([]InflightEntry, 0, len(t.active)) 62 + now := time.Now() 63 + for _, e := range t.active { 64 + cp := *e 65 + cp.DurationMs = now.Sub(e.StartedAt).Milliseconds() 66 + out = append(out, cp) 67 + } 68 + return out 69 + } 70 + 71 + func clientAddr(r *http.Request) string { 72 + if xff := r.Header.Get("X-Forwarded-For"); xff != "" { 73 + if i := strings.IndexByte(xff, ','); i >= 0 { 74 + return strings.TrimSpace(xff[:i]) 75 + } 76 + return strings.TrimSpace(xff) 77 + } 78 + return r.RemoteAddr 79 + } 80 + 81 + func (t *inflightTracker) middleware(next http.Handler) http.Handler { 82 + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 83 + id := t.add(r) 84 + defer t.remove(id) 85 + next.ServeHTTP(w, r) 86 + }) 87 + } 88 + 89 + func (x *Xrpc) Inflight() []InflightEntry { 90 + return x.inflight.snapshot() 91 + }
+21 -15
knotmirror/xrpc/xrpc.go
··· 26 26 ks *knotstream.KnotStream 27 27 logger *slog.Logger 28 28 httpClient *http.Client 29 + inflight *inflightTracker 29 30 } 30 31 31 32 func New(logger *slog.Logger, cfg *config.Config, db *sql.DB, rdb *redis.Client, resolver *idresolver.Resolver, ks *knotstream.KnotStream) *Xrpc { ··· 39 40 httpClient: &http.Client{ 40 41 Timeout: 30 * time.Second, 41 42 }, 43 + inflight: newInflightTracker(), 42 44 } 43 45 } 44 46 45 47 func (x *Xrpc) Router() http.Handler { 46 48 r := chi.NewRouter() 47 49 48 - r.Get("/"+tangled.GitTempGetArchiveNSID, x.GetArchive) 49 - r.Get("/"+tangled.GitTempGetBlobNSID, x.GetBlob) 50 - r.Get("/"+tangled.GitTempGetBranchNSID, x.GetBranch) 51 - // r.Get("/"+tangled.GitTempGetCommitNSID, x.GetCommit) // todo 52 - // r.Get("/"+tangled.GitTempGetDiffNSID, x.GetDiff) // todo 53 - // r.Get("/"+tangled.GitTempGetEntityNSID, x.GetEntity) // todo 54 - // r.Get("/"+tangled.GitTempGetHeadNSID, x.GetHead) // todo 55 - r.Get("/"+tangled.GitTempGetTagNSID, x.GetTag) // using types.Response 56 - r.Get("/"+tangled.GitTempGetTreeNSID, x.GetTree) 57 - r.Get("/"+tangled.GitTempListBranchesNSID, x.ListBranches) // wip, unknown output 58 - r.Get("/"+tangled.GitTempListCommitsNSID, x.ListCommits) 59 - r.Get("/"+tangled.GitTempListLanguagesNSID, x.ListLanguages) 60 - r.Get("/"+tangled.GitTempListTagsNSID, x.ListTags) 61 - r.Get("/"+tangled.RepoBlobNSID, x.RepoBlob) 62 - r.Post("/"+tangled.SyncRequestCrawlNSID, x.RequestCrawl) 50 + r.Group(func(r chi.Router) { 51 + r.Use(x.inflight.middleware) 52 + 53 + r.Get("/"+tangled.GitTempGetArchiveNSID, x.GetArchive) 54 + r.Get("/"+tangled.GitTempGetBlobNSID, x.GetBlob) 55 + r.Get("/"+tangled.GitTempGetBranchNSID, x.GetBranch) 56 + // r.Get("/"+tangled.GitTempGetCommitNSID, x.GetCommit) // todo 57 + // r.Get("/"+tangled.GitTempGetDiffNSID, x.GetDiff) // todo 58 + // r.Get("/"+tangled.GitTempGetEntityNSID, x.GetEntity) // todo 59 + // r.Get("/"+tangled.GitTempGetHeadNSID, x.GetHead) // todo 60 + r.Get("/"+tangled.GitTempGetTagNSID, x.GetTag) // using types.Response 61 + r.Get("/"+tangled.GitTempGetTreeNSID, x.GetTree) 62 + r.Get("/"+tangled.GitTempListBranchesNSID, x.ListBranches) // wip, unknown output 63 + r.Get("/"+tangled.GitTempListCommitsNSID, x.ListCommits) 64 + r.Get("/"+tangled.GitTempListLanguagesNSID, x.ListLanguages) 65 + r.Get("/"+tangled.GitTempListTagsNSID, x.ListTags) 66 + r.Get("/"+tangled.RepoBlobNSID, x.RepoBlob) 67 + r.Post("/"+tangled.SyncRequestCrawlNSID, x.RequestCrawl) 68 + }) 63 69 64 70 return r 65 71 }