Monorepo for Tangled
0

Configure Feed

Select the types of activity you want to include in your feed.

at master 2.0 kB View raw
1package xrpc 2 3import ( 4 "net/http" 5 "strings" 6 "sync" 7 "sync/atomic" 8 "time" 9) 10 11type InflightEntry struct { 12 ID uint64 `json:"id"` 13 Method string `json:"method"` 14 Path string `json:"path"` 15 RawQuery string `json:"raw_query,omitempty"` 16 URL string `json:"url"` 17 Repo string `json:"repo,omitempty"` 18 RemoteAddr string `json:"remote_addr,omitempty"` 19 StartedAt time.Time `json:"started_at"` 20 DurationMs int64 `json:"duration_ms"` 21} 22 23type inflightTracker struct { 24 mu sync.Mutex 25 nextID atomic.Uint64 26 active map[uint64]*InflightEntry 27} 28 29func newInflightTracker() *inflightTracker { 30 return &inflightTracker{active: make(map[uint64]*InflightEntry)} 31} 32 33func (t *inflightTracker) add(r *http.Request) uint64 { 34 id := t.nextID.Add(1) 35 q := r.URL.Query() 36 entry := &InflightEntry{ 37 ID: id, 38 Method: r.Method, 39 Path: r.URL.Path, 40 RawQuery: r.URL.RawQuery, 41 URL: r.URL.RequestURI(), 42 Repo: q.Get("repo"), 43 RemoteAddr: clientAddr(r), 44 StartedAt: time.Now(), 45 } 46 t.mu.Lock() 47 t.active[id] = entry 48 t.mu.Unlock() 49 return id 50} 51 52func (t *inflightTracker) remove(id uint64) { 53 t.mu.Lock() 54 delete(t.active, id) 55 t.mu.Unlock() 56} 57 58func (t *inflightTracker) snapshot() []InflightEntry { 59 t.mu.Lock() 60 defer t.mu.Unlock() 61 out := make([]InflightEntry, 0, len(t.active)) 62 now := time.Now() 63 for _, e := range t.active { 64 cp := *e 65 cp.DurationMs = now.Sub(e.StartedAt).Milliseconds() 66 out = append(out, cp) 67 } 68 return out 69} 70 71func clientAddr(r *http.Request) string { 72 if xff := r.Header.Get("X-Forwarded-For"); xff != "" { 73 if i := strings.IndexByte(xff, ','); i >= 0 { 74 return strings.TrimSpace(xff[:i]) 75 } 76 return strings.TrimSpace(xff) 77 } 78 return r.RemoteAddr 79} 80 81func (t *inflightTracker) middleware(next http.Handler) http.Handler { 82 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 83 id := t.add(r) 84 defer t.remove(id) 85 next.ServeHTTP(w, r) 86 }) 87} 88 89func (x *Xrpc) Inflight() []InflightEntry { 90 return x.inflight.snapshot() 91}