Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

appview/knotacl: knot acl client & cache

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
committer
Tangled
date (Jun 8, 2026, 4:18 PM +0300) commit 7e1a1e76 parent 2bbbfdde change-id zqsmlyzu
+318
+181
appview/knotacl/cache.go
··· 1 + package knotacl 2 + 3 + import ( 4 + "context" 5 + "maps" 6 + "net/http" 7 + "slices" 8 + "sync" 9 + "time" 10 + 11 + "golang.org/x/sync/singleflight" 12 + ) 13 + 14 + const ( 15 + cacheTTL = 15 * time.Second 16 + cacheMaxEntries = 4096 17 + ) 18 + 19 + type lister interface { 20 + GetKnotMembers(ctx context.Context, host string) ([]string, error) 21 + GetRepoCollaborators(ctx context.Context, host, repoDid string) ([]string, error) 22 + } 23 + 24 + type cacheEntry struct { 25 + subjects []string 26 + storedAt time.Time 27 + } 28 + 29 + type cache struct { 30 + inner lister 31 + ttl time.Duration 32 + now func() time.Time 33 + 34 + mu sync.Mutex 35 + entries map[string]cacheEntry 36 + group singleflight.Group 37 + } 38 + 39 + func newCache(inner lister, ttl time.Duration, now func() time.Time) *cache { 40 + if now == nil { 41 + now = time.Now 42 + } 43 + return &cache{inner: inner, ttl: ttl, now: now, entries: map[string]cacheEntry{}} 44 + } 45 + 46 + func (c *cache) GetKnotMembers(ctx context.Context, host string) ([]string, error) { 47 + return c.fetch(ctx, memberCacheKey(host), func() ([]string, error) { 48 + return c.inner.GetKnotMembers(ctx, host) 49 + }) 50 + } 51 + 52 + func (c *cache) GetRepoCollaborators(ctx context.Context, host, repoDid string) ([]string, error) { 53 + return c.fetch(ctx, collabCacheKey(host, repoDid), func() ([]string, error) { 54 + return c.inner.GetRepoCollaborators(ctx, host, repoDid) 55 + }) 56 + } 57 + 58 + func memberCacheKey(host string) string { return "m\x00" + host } 59 + 60 + func collabCacheKey(host, repoDid string) string { return "c\x00" + host + "\x00" + repoDid } 61 + 62 + func (c *cache) InvalidateMembers(host string) { 63 + c.forget(memberCacheKey(host)) 64 + } 65 + 66 + func (c *cache) InvalidateCollaborators(host, repoDid string) { 67 + c.forget(collabCacheKey(host, repoDid)) 68 + } 69 + 70 + func (c *cache) forget(key string) { 71 + c.mu.Lock() 72 + defer c.mu.Unlock() 73 + delete(c.entries, key) 74 + } 75 + 76 + func (c *cache) fetch(ctx context.Context, key string, load func() ([]string, error)) ([]string, error) { 77 + if memo := memoFrom(ctx); memo != nil { 78 + if v, ok := memo.get(key); ok { 79 + return slices.Clone(v), nil 80 + } 81 + } 82 + v, err := c.load(key, load) 83 + if err != nil { 84 + return nil, err 85 + } 86 + if memo := memoFrom(ctx); memo != nil { 87 + memo.put(key, v) 88 + } 89 + return slices.Clone(v), nil 90 + } 91 + 92 + func (c *cache) load(key string, load func() ([]string, error)) ([]string, error) { 93 + if v, ok := c.lookup(key); ok { 94 + return v, nil 95 + } 96 + v, err, _ := c.group.Do(key, func() (any, error) { 97 + if v, ok := c.lookup(key); ok { 98 + return v, nil 99 + } 100 + fresh, err := load() 101 + if err != nil { 102 + return nil, err 103 + } 104 + c.store(key, fresh) 105 + return fresh, nil 106 + }) 107 + if err != nil { 108 + return nil, err 109 + } 110 + return v.([]string), nil 111 + } 112 + 113 + func (c *cache) lookup(key string) ([]string, bool) { 114 + c.mu.Lock() 115 + defer c.mu.Unlock() 116 + e, ok := c.entries[key] 117 + if !ok || c.now().Sub(e.storedAt) >= c.ttl { 118 + return nil, false 119 + } 120 + return e.subjects, true 121 + } 122 + 123 + func (c *cache) store(key string, subjects []string) { 124 + c.mu.Lock() 125 + defer c.mu.Unlock() 126 + if _, exists := c.entries[key]; !exists && len(c.entries) >= cacheMaxEntries { 127 + maps.DeleteFunc(c.entries, func(_ string, e cacheEntry) bool { 128 + return c.now().Sub(e.storedAt) >= c.ttl 129 + }) 130 + c.evictOldestLocked() 131 + } 132 + c.entries[key] = cacheEntry{subjects: subjects, storedAt: c.now()} 133 + } 134 + 135 + func (c *cache) evictOldestLocked() { 136 + oldestKey := "" 137 + var oldestAt time.Time 138 + for k, e := range c.entries { 139 + if oldestKey == "" || e.storedAt.Before(oldestAt) { 140 + oldestKey, oldestAt = k, e.storedAt 141 + } 142 + } 143 + if len(c.entries) >= cacheMaxEntries && oldestKey != "" { 144 + delete(c.entries, oldestKey) 145 + } 146 + } 147 + 148 + type requestMemo struct { 149 + mu sync.Mutex 150 + entries map[string][]string 151 + } 152 + 153 + type memoCtxKey struct{} 154 + 155 + func WithMemo(ctx context.Context) context.Context { 156 + return context.WithValue(ctx, memoCtxKey{}, &requestMemo{entries: map[string][]string{}}) 157 + } 158 + 159 + func memoFrom(ctx context.Context) *requestMemo { 160 + memo, _ := ctx.Value(memoCtxKey{}).(*requestMemo) 161 + return memo 162 + } 163 + 164 + func (m *requestMemo) get(key string) ([]string, bool) { 165 + m.mu.Lock() 166 + defer m.mu.Unlock() 167 + v, ok := m.entries[key] 168 + return v, ok 169 + } 170 + 171 + func (m *requestMemo) put(key string, subjects []string) { 172 + m.mu.Lock() 173 + defer m.mu.Unlock() 174 + m.entries[key] = subjects 175 + } 176 + 177 + func MemoMiddleware(next http.Handler) http.Handler { 178 + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 179 + next.ServeHTTP(w, r.WithContext(WithMemo(r.Context()))) 180 + }) 181 + }
+134
appview/knotacl/client.go
··· 1 + package knotacl 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "net/http" 8 + "slices" 9 + "time" 10 + 11 + indigoxrpc "github.com/bluesky-social/indigo/xrpc" 12 + "tangled.org/core/api/tangled" 13 + ) 14 + 15 + const ( 16 + listPageLimit = 1000 17 + maxListPages = 256 18 + requestTimeout = 5 * time.Second 19 + listDrainBudget = 30 * time.Second 20 + ) 21 + 22 + type Client struct { 23 + dev bool 24 + http *http.Client 25 + logger *slog.Logger 26 + } 27 + 28 + func NewClient(dev bool, logger *slog.Logger) *Client { 29 + return &Client{dev: dev, http: &http.Client{Timeout: requestTimeout}, logger: logger} 30 + } 31 + 32 + func (c *Client) xrpcClient(host string) *indigoxrpc.Client { 33 + scheme := "https" 34 + if c.dev { 35 + scheme = "http" 36 + } 37 + return &indigoxrpc.Client{ 38 + Host: fmt.Sprintf("%s://%s", scheme, host), 39 + Client: c.http, 40 + } 41 + } 42 + 43 + func (c *Client) GetKnotMembers(ctx context.Context, host string) ([]string, error) { 44 + ctx, cancel := context.WithTimeout(ctx, listDrainBudget) 45 + defer cancel() 46 + 47 + xc := c.xrpcClient(host) 48 + subjects, truncated, err := drainList( 49 + "", 50 + make(map[string]struct{}), 51 + func(cursor string) ([]*tangled.KnotListMembers_ListItem, *string, error) { 52 + out, err := tangled.KnotListMembers(ctx, xc, cursor, listPageLimit, "", host) 53 + if err != nil { 54 + return nil, nil, err 55 + } 56 + return out.Items, out.Cursor, nil 57 + }, 58 + func(i *tangled.KnotListMembers_ListItem) string { return i.Subject }, 59 + ) 60 + if err != nil { 61 + return nil, err 62 + } 63 + if truncated { 64 + c.logger.Warn("knot member list truncated before draining all pages", "host", host, "limit", maxListPages) 65 + } 66 + return dedup(subjects), nil 67 + } 68 + 69 + func (c *Client) GetRepoCollaborators(ctx context.Context, host, repoDid string) ([]string, error) { 70 + ctx, cancel := context.WithTimeout(ctx, listDrainBudget) 71 + defer cancel() 72 + 73 + xc := c.xrpcClient(host) 74 + subjects, truncated, err := drainList( 75 + "", 76 + make(map[string]struct{}), 77 + func(cursor string) ([]*tangled.RepoListCollaborators_ListItem, *string, error) { 78 + out, err := tangled.RepoListCollaborators(ctx, xc, cursor, listPageLimit, "", repoDid) 79 + if err != nil { 80 + return nil, nil, err 81 + } 82 + return out.Items, out.Cursor, nil 83 + }, 84 + func(i *tangled.RepoListCollaborators_ListItem) string { return i.Subject }, 85 + ) 86 + if err != nil { 87 + return nil, err 88 + } 89 + if truncated { 90 + c.logger.Warn("repo collaborator list truncated before draining all pages", "host", host, "repoDid", repoDid, "limit", maxListPages) 91 + } 92 + return dedup(subjects), nil 93 + } 94 + 95 + func drainList[T any]( 96 + cursor string, 97 + seen map[string]struct{}, 98 + page func(cursor string) ([]*T, *string, error), 99 + subject func(*T) string, 100 + ) (subjects []string, truncated bool, err error) { 101 + if len(seen) >= maxListPages { 102 + return nil, true, nil 103 + } 104 + if _, repeated := seen[cursor]; repeated { 105 + return nil, true, nil 106 + } 107 + seen[cursor] = struct{}{} 108 + items, next, err := page(cursor) 109 + if err != nil { 110 + return nil, false, err 111 + } 112 + subjects = mapSlice(items, subject) 113 + if len(items) == 0 || next == nil || *next == "" { 114 + return subjects, false, nil 115 + } 116 + rest, truncated, err := drainList(*next, seen, page, subject) 117 + if err != nil { 118 + return nil, false, err 119 + } 120 + return append(subjects, rest...), truncated, nil 121 + } 122 + 123 + func dedup(subjects []string) []string { 124 + slices.Sort(subjects) 125 + return slices.Compact(subjects) 126 + } 127 + 128 + func mapSlice[T, U any](items []T, f func(T) U) []U { 129 + out := make([]U, len(items)) 130 + for i, it := range items { 131 + out[i] = f(it) 132 + } 133 + return out 134 + }
+3
appview/xrpcclient/xrpc.go
··· 10 10 var ( 11 11 ErrXrpcUnsupported = errors.New("xrpc not supported on this knot") 12 12 ErrXrpcUnauthorized = errors.New("unauthorized xrpc request") 13 + ErrXrpcForbidden = errors.New("forbidden xrpc request") 13 14 ErrXrpcFailed = errors.New("xrpc request failed") 14 15 ErrXrpcInvalid = errors.New("invalid xrpc request") 15 16 ) ··· 30 31 return ErrXrpcUnsupported 31 32 case http.StatusUnauthorized: 32 33 return ErrXrpcUnauthorized 34 + case http.StatusForbidden: 35 + return ErrXrpcForbidden 33 36 default: 34 37 return ErrXrpcFailed 35 38 }