Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

appview/knotacl: replace cache poll w/ event-driven roster

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
committer
Tangled
date (Jun 16, 2026, 9:04 PM +0300) commit 929c6330 parent f8c6b8c6 change-id vussqxwt
+987 -368
+22
appview/db/db.go
··· 2218 2218 return err 2219 2219 }) 2220 2220 2221 + orm.RunMigration(conn, logger, "add-knotacl-sync-table", func(tx *sql.Tx) error { 2222 + _, err := tx.Exec(` 2223 + create table if not exists knotacl_sync ( 2224 + scope_key text primary key, 2225 + synced_at text not null 2226 + ); 2227 + `) 2228 + return err 2229 + }) 2230 + 2231 + orm.RunMigration(conn, logger, "add-knotacl-delta-cursor-table", func(tx *sql.Tx) error { 2232 + _, err := tx.Exec(` 2233 + create table if not exists knotacl_delta_cursor ( 2234 + scope_key text not null, 2235 + subject text not null, 2236 + cursor integer not null, 2237 + primary key (scope_key, subject) 2238 + ); 2239 + `) 2240 + return err 2241 + }) 2242 + 2221 2243 return &DB{ 2222 2244 db, 2223 2245 logger,
-134
appview/knotacl/cache.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "maps" 6 5 "net/http" 7 - "slices" 8 6 "sync" 9 - "time" 10 - 11 - "golang.org/x/sync/singleflight" 12 - ) 13 - 14 - const ( 15 - cacheTTL = 15 * time.Second 16 - cacheMaxEntries = 4096 17 7 ) 18 8 19 9 type lister interface { 20 10 GetKnotMembers(ctx context.Context, host string) ([]string, error) 21 11 GetRepoCollaborators(ctx context.Context, host, repoDid string) ([]string, error) 22 - } 23 - 24 - type cacheEntry struct { 25 - subjects []string 26 - storedAt time.Time 27 - } 28 - 29 - type cache struct { 30 - inner lister 31 - ttl time.Duration 32 - now func() time.Time 33 - 34 - mu sync.Mutex 35 - entries map[string]cacheEntry 36 - group singleflight.Group 37 - } 38 - 39 - func newCache(inner lister, ttl time.Duration, now func() time.Time) *cache { 40 - if now == nil { 41 - now = time.Now 42 - } 43 - return &cache{inner: inner, ttl: ttl, now: now, entries: map[string]cacheEntry{}} 44 - } 45 - 46 - func (c *cache) GetKnotMembers(ctx context.Context, host string) ([]string, error) { 47 - return c.fetch(ctx, memberCacheKey(host), func() ([]string, error) { 48 - return c.inner.GetKnotMembers(ctx, host) 49 - }) 50 - } 51 - 52 - func (c *cache) GetRepoCollaborators(ctx context.Context, host, repoDid string) ([]string, error) { 53 - return c.fetch(ctx, collabCacheKey(host, repoDid), func() ([]string, error) { 54 - return c.inner.GetRepoCollaborators(ctx, host, repoDid) 55 - }) 56 - } 57 - 58 - func memberCacheKey(host string) string { return "m\x00" + host } 59 - 60 - func collabCacheKey(host, repoDid string) string { return "c\x00" + host + "\x00" + repoDid } 61 - 62 - func (c *cache) InvalidateMembers(host string) { 63 - c.forget(memberCacheKey(host)) 64 - } 65 - 66 - func (c *cache) InvalidateCollaborators(host, repoDid string) { 67 - c.forget(collabCacheKey(host, repoDid)) 68 - } 69 - 70 - func (c *cache) forget(key string) { 71 - c.mu.Lock() 72 - defer c.mu.Unlock() 73 - delete(c.entries, key) 74 - } 75 - 76 - func (c *cache) fetch(ctx context.Context, key string, load func() ([]string, error)) ([]string, error) { 77 - if memo := memoFrom(ctx); memo != nil { 78 - if v, ok := memo.get(key); ok { 79 - return slices.Clone(v), nil 80 - } 81 - } 82 - v, err := c.load(key, load) 83 - if err != nil { 84 - return nil, err 85 - } 86 - if memo := memoFrom(ctx); memo != nil { 87 - memo.put(key, v) 88 - } 89 - return slices.Clone(v), nil 90 - } 91 - 92 - func (c *cache) load(key string, load func() ([]string, error)) ([]string, error) { 93 - if v, ok := c.lookup(key); ok { 94 - return v, nil 95 - } 96 - v, err, _ := c.group.Do(key, func() (any, error) { 97 - if v, ok := c.lookup(key); ok { 98 - return v, nil 99 - } 100 - fresh, err := load() 101 - if err != nil { 102 - return nil, err 103 - } 104 - c.store(key, fresh) 105 - return fresh, nil 106 - }) 107 - if err != nil { 108 - return nil, err 109 - } 110 - return v.([]string), nil 111 - } 112 - 113 - func (c *cache) lookup(key string) ([]string, bool) { 114 - c.mu.Lock() 115 - defer c.mu.Unlock() 116 - e, ok := c.entries[key] 117 - if !ok || c.now().Sub(e.storedAt) >= c.ttl { 118 - return nil, false 119 - } 120 - return e.subjects, true 121 - } 122 - 123 - func (c *cache) store(key string, subjects []string) { 124 - c.mu.Lock() 125 - defer c.mu.Unlock() 126 - if _, exists := c.entries[key]; !exists && len(c.entries) >= cacheMaxEntries { 127 - maps.DeleteFunc(c.entries, func(_ string, e cacheEntry) bool { 128 - return c.now().Sub(e.storedAt) >= c.ttl 129 - }) 130 - c.evictOldestLocked() 131 - } 132 - c.entries[key] = cacheEntry{subjects: subjects, storedAt: c.now()} 133 - } 134 - 135 - func (c *cache) evictOldestLocked() { 136 - oldestKey := "" 137 - var oldestAt time.Time 138 - for k, e := range c.entries { 139 - if oldestKey == "" || e.storedAt.Before(oldestAt) { 140 - oldestKey, oldestAt = k, e.storedAt 141 - } 142 - } 143 - if len(c.entries) >= cacheMaxEntries && oldestKey != "" { 144 - delete(c.entries, oldestKey) 145 - } 146 12 } 147 13 148 14 type requestMemo struct {
-226
appview/knotacl/cache_test.go
··· 1 - package knotacl 2 - 3 - import ( 4 - "context" 5 - "errors" 6 - "fmt" 7 - "slices" 8 - "sync" 9 - "testing" 10 - "time" 11 - ) 12 - 13 - var cacheTestBase = time.Unix(1700000000, 0) 14 - 15 - type fakeLister struct { 16 - mu sync.Mutex 17 - memberCalls int 18 - members []string 19 - err error 20 - started chan struct{} 21 - block chan struct{} 22 - } 23 - 24 - func (f *fakeLister) GetKnotMembers(ctx context.Context, host string) ([]string, error) { 25 - f.mu.Lock() 26 - f.memberCalls++ 27 - members, err, started, block := f.members, f.err, f.started, f.block 28 - f.mu.Unlock() 29 - if started != nil { 30 - close(started) 31 - } 32 - if block != nil { 33 - <-block 34 - } 35 - if err != nil { 36 - return nil, err 37 - } 38 - return members, nil 39 - } 40 - 41 - func (f *fakeLister) GetRepoCollaborators(ctx context.Context, host, repoDid string) ([]string, error) { 42 - f.mu.Lock() 43 - defer f.mu.Unlock() 44 - return f.members, f.err 45 - } 46 - 47 - func (f *fakeLister) calls() int { 48 - f.mu.Lock() 49 - defer f.mu.Unlock() 50 - return f.memberCalls 51 - } 52 - 53 - func (f *fakeLister) set(members []string, err error) { 54 - f.mu.Lock() 55 - defer f.mu.Unlock() 56 - f.members, f.err = members, err 57 - } 58 - 59 - type fakeClock struct { 60 - mu sync.Mutex 61 - t time.Time 62 - } 63 - 64 - func (c *fakeClock) now() time.Time { 65 - c.mu.Lock() 66 - defer c.mu.Unlock() 67 - return c.t 68 - } 69 - 70 - func (c *fakeClock) advance(d time.Duration) { 71 - c.mu.Lock() 72 - defer c.mu.Unlock() 73 - c.t = c.t.Add(d) 74 - } 75 - 76 - func TestCache_TTLCollapsesThenExpires(t *testing.T) { 77 - clk := &fakeClock{t: cacheTestBase} 78 - f := &fakeLister{members: []string{"did:plc:boltless"}} 79 - c := newCache(f, cacheTTL, clk.now) 80 - ctx := context.Background() 81 - 82 - if _, err := c.GetKnotMembers(ctx, "knot.nel.pet"); err != nil { 83 - t.Fatal(err) 84 - } 85 - if _, err := c.GetKnotMembers(ctx, "knot.nel.pet"); err != nil { 86 - t.Fatal(err) 87 - } 88 - if f.calls() != 1 { 89 - t.Errorf("memberCalls=%d, want 1 within the TTL window", f.calls()) 90 - } 91 - 92 - clk.advance(cacheTTL) 93 - if _, err := c.GetKnotMembers(ctx, "knot.nel.pet"); err != nil { 94 - t.Fatal(err) 95 - } 96 - if f.calls() != 2 { 97 - t.Errorf("memberCalls=%d, want 2 once the entry expired", f.calls()) 98 - } 99 - } 100 - 101 - func TestCache_ErrorsNotCached(t *testing.T) { 102 - clk := &fakeClock{t: cacheTestBase} 103 - f := &fakeLister{err: errors.New("knot unreachable")} 104 - c := newCache(f, cacheTTL, clk.now) 105 - ctx := context.Background() 106 - 107 - if _, err := c.GetKnotMembers(ctx, "knot.nel.pet"); err == nil { 108 - t.Fatal("want error on the first call") 109 - } 110 - f.set([]string{"did:plc:boltless"}, nil) 111 - got, err := c.GetKnotMembers(ctx, "knot.nel.pet") 112 - if err != nil { 113 - t.Fatal(err) 114 - } 115 - if !slices.Equal(got, []string{"did:plc:boltless"}) { 116 - t.Errorf("got %v after recovery, want the live value", got) 117 - } 118 - if f.calls() != 2 { 119 - t.Errorf("memberCalls=%d, want 2; a failed fetch must not be cached", f.calls()) 120 - } 121 - } 122 - 123 - func TestCache_MemoShortCircuitsWithinRequest(t *testing.T) { 124 - clk := &fakeClock{t: cacheTestBase} 125 - f := &fakeLister{members: []string{"did:plc:boltless"}} 126 - c := newCache(f, cacheTTL, clk.now) 127 - ctx := WithMemo(context.Background()) 128 - 129 - first, err := c.GetKnotMembers(ctx, "knot.nel.pet") 130 - if err != nil { 131 - t.Fatal(err) 132 - } 133 - 134 - clk.advance(2 * cacheTTL) 135 - f.set([]string{"did:plc:akshay"}, nil) 136 - 137 - second, err := c.GetKnotMembers(ctx, "knot.nel.pet") 138 - if err != nil { 139 - t.Fatal(err) 140 - } 141 - if !slices.Equal(first, second) { 142 - t.Errorf("memo must hold one snapshot per request: first=%v second=%v", first, second) 143 - } 144 - if f.calls() != 1 { 145 - t.Errorf("memberCalls=%d, want 1; the request memo must not re-query even past the TTL", f.calls()) 146 - } 147 - } 148 - 149 - func TestCache_ReturnedSliceCannotCorruptCache(t *testing.T) { 150 - clk := &fakeClock{t: cacheTestBase} 151 - f := &fakeLister{members: []string{"did:plc:boltless", "did:plc:akshay"}} 152 - c := newCache(f, cacheTTL, clk.now) 153 - ctx := context.Background() 154 - 155 - got, err := c.GetKnotMembers(ctx, "knot.nel.pet") 156 - if err != nil { 157 - t.Fatal(err) 158 - } 159 - for i := range got { 160 - got[i] = "did:plc:squid" 161 - } 162 - 163 - again, err := c.GetKnotMembers(ctx, "knot.nel.pet") 164 - if err != nil { 165 - t.Fatal(err) 166 - } 167 - if slices.Contains(again, "did:plc:squid") { 168 - t.Errorf("a caller mutating its returned slice corrupted the cached entry: %v", again) 169 - } 170 - if f.calls() != 1 { 171 - t.Errorf("memberCalls=%d, want 1; the second read should be served from cache", f.calls()) 172 - } 173 - } 174 - 175 - func TestCache_SingleflightCollapsesConcurrentMisses(t *testing.T) { 176 - clk := &fakeClock{t: cacheTestBase} 177 - started := make(chan struct{}) 178 - release := make(chan struct{}) 179 - f := &fakeLister{members: []string{"did:plc:boltless"}, started: started, block: release} 180 - c := newCache(f, cacheTTL, clk.now) 181 - ctx := context.Background() 182 - 183 - var wg sync.WaitGroup 184 - call := func() { 185 - wg.Add(1) 186 - go func() { 187 - defer wg.Done() 188 - if _, err := c.GetKnotMembers(ctx, "knot.nel.pet"); err != nil { 189 - t.Errorf("GetKnotMembers: %v", err) 190 - } 191 - }() 192 - } 193 - 194 - call() 195 - <-started 196 - for range make([]struct{}, 8) { 197 - call() 198 - } 199 - time.Sleep(20 * time.Millisecond) 200 - close(release) 201 - wg.Wait() 202 - 203 - if f.calls() != 1 { 204 - t.Errorf("memberCalls=%d, want 1; concurrent misses must collapse into a single knot query", f.calls()) 205 - } 206 - } 207 - 208 - func TestCache_CapIsHardUnderFreshFlood(t *testing.T) { 209 - clk := &fakeClock{t: cacheTestBase} 210 - f := &fakeLister{members: []string{"did:plc:limpet"}} 211 - c := newCache(f, cacheTTL, clk.now) 212 - ctx := context.Background() 213 - 214 - for i := 0; i < cacheMaxEntries+100; i++ { 215 - if _, err := c.GetKnotMembers(ctx, fmt.Sprintf("knot-%d.nel.pet", i)); err != nil { 216 - t.Fatal(err) 217 - } 218 - } 219 - 220 - c.mu.Lock() 221 - n := len(c.entries) 222 - c.mu.Unlock() 223 - if n > cacheMaxEntries { 224 - t.Errorf("entries=%d, want <= %d; all-fresh keys must not grow past the cap", n, cacheMaxEntries) 225 - } 226 - }
+1 -1
appview/knotacl/reader.go
··· 65 65 } 66 66 67 67 type nativeReader struct { 68 - client *cache 68 + client *roster 69 69 execer db.Execer 70 70 } 71 71
+435
appview/knotacl/roster.go
··· 1 + package knotacl 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "errors" 7 + "fmt" 8 + "log/slog" 9 + "slices" 10 + "sync" 11 + "sync/atomic" 12 + "time" 13 + 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + "golang.org/x/sync/singleflight" 16 + 17 + "tangled.org/core/appview/db" 18 + "tangled.org/core/appview/models" 19 + "tangled.org/core/orm" 20 + ) 21 + 22 + const ( 23 + reconcileTTL = 5 * time.Minute 24 + reconcileBackoff = 15 * time.Second 25 + cursorRetention = 1 * time.Hour 26 + ) 27 + 28 + var errReconcileBackoff = errors.New("reconcile suppressed during backoff") 29 + 30 + type Cursor int64 31 + 32 + type scopeState struct { 33 + mu sync.Mutex 34 + gen atomic.Uint64 35 + failedAt time.Time 36 + refs int 37 + } 38 + 39 + type roster struct { 40 + store *db.DB 41 + src lister 42 + ttl time.Duration 43 + now func() time.Time 44 + log *slog.Logger 45 + group singleflight.Group 46 + 47 + mu sync.Mutex 48 + scopes map[string]*scopeState 49 + } 50 + 51 + func newRoster(store *db.DB, src lister, ttl time.Duration, now func() time.Time, logger *slog.Logger) *roster { 52 + if now == nil { 53 + now = time.Now 54 + } 55 + if logger == nil { 56 + logger = slog.Default() 57 + } 58 + return &roster{ 59 + store: store, 60 + src: src, 61 + ttl: ttl, 62 + now: now, 63 + log: logger, 64 + scopes: map[string]*scopeState{}, 65 + } 66 + } 67 + 68 + func (r *roster) GetKnotMembers(ctx context.Context, host string) ([]string, error) { 69 + return r.serve(ctx, memberScope(host), 70 + func() error { return r.reconcileMembers(ctx, host) }, 71 + func() ([]string, error) { 72 + rows, err := db.GetKnotMembers(r.store, orm.FilterEq("domain", host)) 73 + if err != nil { 74 + return nil, err 75 + } 76 + return mapSlice(rows, func(km models.KnotMember) string { return km.Subject.String() }), nil 77 + }, 78 + ) 79 + } 80 + 81 + func (r *roster) GetRepoCollaborators(ctx context.Context, host, repoDid string) ([]string, error) { 82 + return r.serve(ctx, collabScope(repoDid), 83 + func() error { return r.reconcileCollaborators(ctx, host, repoDid) }, 84 + func() ([]string, error) { 85 + rows, err := db.GetCollaborators(r.store, orm.FilterEq("repo_did", repoDid)) 86 + if err != nil { 87 + return nil, err 88 + } 89 + return mapSlice(rows, func(c models.Collaborator) string { return c.SubjectDid.String() }), nil 90 + }, 91 + ) 92 + } 93 + 94 + func (r *roster) AddKnotMember(host string, subject syntax.DID, cursor Cursor) error { 95 + return r.applyDelta(memberScope(host), subject, cursor, func(tx *sql.Tx) error { 96 + if err := db.RemoveKnotMember(tx, 97 + orm.FilterEq("domain", host), 98 + orm.FilterEq("subject", subject.String()), 99 + ); err != nil { 100 + return err 101 + } 102 + return db.AddKnotMember(tx, models.KnotMember{Domain: host, Subject: subject}) 103 + }) 104 + } 105 + 106 + func (r *roster) RemoveKnotMember(host string, subject syntax.DID, cursor Cursor) error { 107 + return r.applyDelta(memberScope(host), subject, cursor, func(tx *sql.Tx) error { 108 + return db.RemoveKnotMember(tx, 109 + orm.FilterEq("domain", host), 110 + orm.FilterEq("subject", subject.String()), 111 + ) 112 + }) 113 + } 114 + 115 + // NOTE: maybe TODO or not, but no Did of adder means no ability to suggest a vouch for the person they just added as collaborator. 116 + func (r *roster) AddCollaborator(repoDid, subject syntax.DID, cursor Cursor) error { 117 + return r.applyDelta(collabScope(repoDid.String()), subject, cursor, func(tx *sql.Tx) error { 118 + return db.AddCollaborator(tx, models.Collaborator{SubjectDid: subject, RepoDid: repoDid}) 119 + }) 120 + } 121 + 122 + func (r *roster) RemoveCollaborator(repoDid, subject syntax.DID, cursor Cursor) error { 123 + return r.applyDelta(collabScope(repoDid.String()), subject, cursor, func(tx *sql.Tx) error { 124 + return db.DeleteCollaborator(tx, 125 + orm.FilterEq("repo_did", repoDid.String()), 126 + orm.FilterEq("subject_did", subject.String()), 127 + ) 128 + }) 129 + } 130 + 131 + func (r *roster) applyDelta(scope string, subject syntax.DID, cursor Cursor, mutate func(*sql.Tx) error) error { 132 + st := r.acquire(scope) 133 + defer r.release(scope, st) 134 + 135 + st.mu.Lock() 136 + defer st.mu.Unlock() 137 + 138 + tx, err := r.store.Begin() 139 + if err != nil { 140 + return err 141 + } 142 + defer tx.Rollback() 143 + 144 + seen, ok, err := seenCursor(tx, scope, subject) 145 + if err != nil { 146 + return err 147 + } 148 + if ok && cursor <= seen { 149 + return nil 150 + } 151 + 152 + if err := mutate(tx); err != nil { 153 + return err 154 + } 155 + if err := recordCursor(tx, scope, subject, cursor); err != nil { 156 + return err 157 + } 158 + if err := tx.Commit(); err != nil { 159 + return err 160 + } 161 + 162 + st.gen.Add(1) 163 + return nil 164 + } 165 + 166 + func (r *roster) InvalidateMembers(host string) { 167 + _ = clearSyncedAt(r.store, memberScope(host)) 168 + } 169 + 170 + func (r *roster) InvalidateCollaborators(host, repoDid string) { 171 + _ = clearSyncedAt(r.store, collabScope(repoDid)) 172 + } 173 + 174 + func (r *roster) serve(ctx context.Context, key string, reconcile func() error, read func() ([]string, error)) ([]string, error) { 175 + if memo := memoFrom(ctx); memo != nil { 176 + if v, ok := memo.get(key); ok { 177 + return slices.Clone(v), nil 178 + } 179 + } 180 + 181 + recErr := r.maybeReconcile(key, reconcile) 182 + 183 + subjects, err := read() 184 + if err != nil { 185 + return nil, err 186 + } 187 + 188 + if len(subjects) == 0 && recErr != nil && !r.everSynced(key) { 189 + return nil, fmt.Errorf("%w: %v", ErrKnotUnreachable, recErr) 190 + } 191 + 192 + subjects = dedup(subjects) 193 + if memo := memoFrom(ctx); memo != nil { 194 + memo.put(key, subjects) 195 + } 196 + return slices.Clone(subjects), nil 197 + } 198 + 199 + func (r *roster) maybeReconcile(key string, reconcile func() error) error { 200 + if r.fresh(key) { 201 + return nil 202 + } 203 + if r.backingOff(key) { 204 + return errReconcileBackoff 205 + } 206 + _, err, _ := r.group.Do(key, func() (any, error) { 207 + if r.fresh(key) { 208 + return nil, nil 209 + } 210 + if r.backingOff(key) { 211 + return nil, errReconcileBackoff 212 + } 213 + return nil, reconcile() 214 + }) 215 + return err 216 + } 217 + 218 + func (r *roster) reconcileMembers(ctx context.Context, host string) error { 219 + scope := memberScope(host) 220 + st := r.acquire(scope) 221 + defer r.release(scope, st) 222 + 223 + genBefore := st.gen.Load() 224 + subjects, err := r.src.GetKnotMembers(ctx, host) 225 + if err != nil { 226 + r.markFailed(st) 227 + return err 228 + } 229 + return r.commitReconcile(st, scope, genBefore, func(tx *sql.Tx) error { 230 + if err := db.RemoveKnotMember(tx, orm.FilterEq("domain", host)); err != nil { 231 + return err 232 + } 233 + for _, s := range subjects { 234 + did, perr := syntax.ParseDID(s) 235 + if perr != nil { 236 + r.log.Warn("dropping malformed member DID from reconcile", "host", host, "subject", s, "error", perr) 237 + continue 238 + } 239 + if err := db.AddKnotMember(tx, models.KnotMember{Domain: host, Subject: did}); err != nil { 240 + return err 241 + } 242 + } 243 + return nil 244 + }) 245 + } 246 + 247 + func (r *roster) reconcileCollaborators(ctx context.Context, host, repoDid string) error { 248 + repo, perr := syntax.ParseDID(repoDid) 249 + if perr != nil { 250 + return perr 251 + } 252 + scope := collabScope(repoDid) 253 + st := r.acquire(scope) 254 + defer r.release(scope, st) 255 + 256 + genBefore := st.gen.Load() 257 + subjects, err := r.src.GetRepoCollaborators(ctx, host, repoDid) 258 + if err != nil { 259 + r.markFailed(st) 260 + return err 261 + } 262 + return r.commitReconcile(st, scope, genBefore, func(tx *sql.Tx) error { 263 + if err := db.DeleteCollaborator(tx, orm.FilterEq("repo_did", repoDid)); err != nil { 264 + return err 265 + } 266 + for _, s := range subjects { 267 + did, perr := syntax.ParseDID(s) 268 + if perr != nil { 269 + r.log.Warn("dropping malformed collaborator DID from reconcile", "repo_did", repoDid, "subject", s, "error", perr) 270 + continue 271 + } 272 + if err := db.AddCollaborator(tx, models.Collaborator{SubjectDid: did, RepoDid: repo}); err != nil { 273 + return err 274 + } 275 + } 276 + return nil 277 + }) 278 + } 279 + 280 + func (r *roster) commitReconcile(st *scopeState, scope string, genBefore uint64, replace func(*sql.Tx) error) error { 281 + st.mu.Lock() 282 + defer st.mu.Unlock() 283 + 284 + if st.gen.Load() != genBefore { 285 + if err := setSyncedAt(r.store, scope, r.now()); err != nil { 286 + return err 287 + } 288 + r.clearFailed(st) 289 + return nil 290 + } 291 + 292 + tx, err := r.store.Begin() 293 + if err != nil { 294 + return err 295 + } 296 + defer tx.Rollback() 297 + if err := replace(tx); err != nil { 298 + return err 299 + } 300 + if err := setSyncedAt(tx, scope, r.now()); err != nil { 301 + return err 302 + } 303 + if err := pruneCursors(tx, scope, int64(cursorRetention)); err != nil { 304 + return err 305 + } 306 + if err := tx.Commit(); err != nil { 307 + return err 308 + } 309 + r.clearFailed(st) 310 + return nil 311 + } 312 + 313 + func (r *roster) acquire(scope string) *scopeState { 314 + r.mu.Lock() 315 + defer r.mu.Unlock() 316 + st := r.scopes[scope] 317 + if st == nil { 318 + st = &scopeState{} 319 + r.scopes[scope] = st 320 + } 321 + st.refs++ 322 + return st 323 + } 324 + 325 + func (r *roster) release(scope string, st *scopeState) { 326 + r.mu.Lock() 327 + defer r.mu.Unlock() 328 + st.refs-- 329 + if st.refs == 0 && st.failedAt.IsZero() { 330 + delete(r.scopes, scope) 331 + } 332 + } 333 + 334 + func (r *roster) markFailed(st *scopeState) { 335 + r.mu.Lock() 336 + defer r.mu.Unlock() 337 + st.failedAt = r.now() 338 + } 339 + 340 + func (r *roster) clearFailed(st *scopeState) { 341 + r.mu.Lock() 342 + defer r.mu.Unlock() 343 + st.failedAt = time.Time{} 344 + } 345 + 346 + func (r *roster) backingOff(scope string) bool { 347 + r.mu.Lock() 348 + defer r.mu.Unlock() 349 + st, ok := r.scopes[scope] 350 + if !ok { 351 + return false 352 + } 353 + return !st.failedAt.IsZero() && r.now().Sub(st.failedAt) < reconcileBackoff 354 + } 355 + 356 + func (r *roster) fresh(key string) bool { 357 + at, ok, err := getSyncedAt(r.store, key) 358 + if err != nil || !ok { 359 + return false 360 + } 361 + return r.now().Sub(at) < r.ttl 362 + } 363 + 364 + func (r *roster) everSynced(key string) bool { 365 + _, ok, _ := getSyncedAt(r.store, key) 366 + return ok 367 + } 368 + 369 + func memberScope(host string) string { return "m\x00" + host } 370 + 371 + func collabScope(repoDid string) string { return "c\x00" + repoDid } 372 + 373 + func getSyncedAt(e db.Execer, key string) (time.Time, bool, error) { 374 + var raw string 375 + err := e.QueryRow(`select synced_at from knotacl_sync where scope_key = ?`, key).Scan(&raw) 376 + if errors.Is(err, sql.ErrNoRows) { 377 + return time.Time{}, false, nil 378 + } 379 + if err != nil { 380 + return time.Time{}, false, err 381 + } 382 + at, err := time.Parse(time.RFC3339, raw) 383 + if err != nil { 384 + return time.Time{}, false, err 385 + } 386 + return at, true, nil 387 + } 388 + 389 + func setSyncedAt(e db.Execer, key string, at time.Time) error { 390 + _, err := e.Exec( 391 + `insert into knotacl_sync (scope_key, synced_at) values (?, ?) 392 + on conflict(scope_key) do update set synced_at = excluded.synced_at`, 393 + key, at.UTC().Format(time.RFC3339), 394 + ) 395 + return err 396 + } 397 + 398 + func clearSyncedAt(e db.Execer, key string) error { 399 + _, err := e.Exec(`delete from knotacl_sync where scope_key = ?`, key) 400 + return err 401 + } 402 + 403 + func seenCursor(e db.Execer, scope string, subject syntax.DID) (Cursor, bool, error) { 404 + var raw int64 405 + err := e.QueryRow( 406 + `select cursor from knotacl_delta_cursor where scope_key = ? and subject = ?`, 407 + scope, subject.String(), 408 + ).Scan(&raw) 409 + if errors.Is(err, sql.ErrNoRows) { 410 + return 0, false, nil 411 + } 412 + if err != nil { 413 + return 0, false, err 414 + } 415 + return Cursor(raw), true, nil 416 + } 417 + 418 + func recordCursor(e db.Execer, scope string, subject syntax.DID, cursor Cursor) error { 419 + _, err := e.Exec( 420 + `insert into knotacl_delta_cursor (scope_key, subject, cursor) values (?, ?, ?) 421 + on conflict(scope_key, subject) do update set cursor = excluded.cursor`, 422 + scope, subject.String(), int64(cursor), 423 + ) 424 + return err 425 + } 426 + 427 + func pruneCursors(e db.Execer, scope string, retentionNanos int64) error { 428 + _, err := e.Exec( 429 + `delete from knotacl_delta_cursor 430 + where scope_key = ? 431 + and cursor < (select max(cursor) from knotacl_delta_cursor where scope_key = ?) - ?`, 432 + scope, scope, retentionNanos, 433 + ) 434 + return err 435 + }
+486
appview/knotacl/roster_test.go
··· 1 + package knotacl 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "path/filepath" 7 + "slices" 8 + "sync" 9 + "testing" 10 + "time" 11 + 12 + "github.com/bluesky-social/indigo/atproto/syntax" 13 + 14 + "tangled.org/core/appview/db" 15 + "tangled.org/core/appview/models" 16 + "tangled.org/core/orm" 17 + ) 18 + 19 + var rosterTestBase = time.Unix(1700000000, 0) 20 + 21 + const rosterTTL = 5 * time.Minute 22 + 23 + type fakeLister struct { 24 + mu sync.Mutex 25 + memberCalls int 26 + members []string 27 + err error 28 + started chan struct{} 29 + block chan struct{} 30 + } 31 + 32 + func (f *fakeLister) GetKnotMembers(ctx context.Context, host string) ([]string, error) { 33 + f.mu.Lock() 34 + f.memberCalls++ 35 + members, err, started, block := f.members, f.err, f.started, f.block 36 + f.mu.Unlock() 37 + if started != nil { 38 + close(started) 39 + } 40 + if block != nil { 41 + <-block 42 + } 43 + if err != nil { 44 + return nil, err 45 + } 46 + return members, nil 47 + } 48 + 49 + func (f *fakeLister) GetRepoCollaborators(ctx context.Context, host, repoDid string) ([]string, error) { 50 + f.mu.Lock() 51 + defer f.mu.Unlock() 52 + return f.members, f.err 53 + } 54 + 55 + func (f *fakeLister) calls() int { 56 + f.mu.Lock() 57 + defer f.mu.Unlock() 58 + return f.memberCalls 59 + } 60 + 61 + func (f *fakeLister) set(members []string, err error) { 62 + f.mu.Lock() 63 + defer f.mu.Unlock() 64 + f.members, f.err = members, err 65 + } 66 + 67 + func (f *fakeLister) arm(started, block chan struct{}) { 68 + f.mu.Lock() 69 + defer f.mu.Unlock() 70 + f.started, f.block = started, block 71 + } 72 + 73 + type fakeClock struct { 74 + mu sync.Mutex 75 + t time.Time 76 + } 77 + 78 + func (c *fakeClock) now() time.Time { 79 + c.mu.Lock() 80 + defer c.mu.Unlock() 81 + return c.t 82 + } 83 + 84 + func (c *fakeClock) advance(d time.Duration) { 85 + c.mu.Lock() 86 + defer c.mu.Unlock() 87 + c.t = c.t.Add(d) 88 + } 89 + 90 + func rosterTestDB(t *testing.T) *db.DB { 91 + t.Helper() 92 + d, err := db.Make(context.Background(), filepath.Join(t.TempDir(), "appview.db")) 93 + if err != nil { 94 + t.Fatalf("db.Make: %v", err) 95 + } 96 + t.Cleanup(func() { d.Close() }) 97 + return d 98 + } 99 + 100 + const rosterHost = "knot.nel.pet" 101 + 102 + func membersForHost(t *testing.T, d *db.DB, host string) []models.KnotMember { 103 + t.Helper() 104 + rows, err := db.GetKnotMembers(d, orm.FilterEq("domain", host)) 105 + if err != nil { 106 + t.Fatalf("GetKnotMembers: %v", err) 107 + } 108 + return rows 109 + } 110 + 111 + func TestRoster_BootstrapThenServesFromSqlite(t *testing.T) { 112 + clk := &fakeClock{t: rosterTestBase} 113 + f := &fakeLister{members: []string{"did:plc:boltless"}} 114 + r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil) 115 + ctx := context.Background() 116 + 117 + got, err := r.GetKnotMembers(ctx, rosterHost) 118 + if err != nil { 119 + t.Fatal(err) 120 + } 121 + if !slices.Equal(got, []string{"did:plc:boltless"}) { 122 + t.Errorf("bootstrap read = %v, want the drained roster", got) 123 + } 124 + 125 + if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 126 + t.Fatal(err) 127 + } 128 + if f.calls() != 1 { 129 + t.Errorf("memberCalls=%d, want 1; a read within the reconcile TTL must be served from sqlite", f.calls()) 130 + } 131 + 132 + clk.advance(rosterTTL) 133 + if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 134 + t.Fatal(err) 135 + } 136 + if f.calls() != 2 { 137 + t.Errorf("memberCalls=%d, want 2; a stale scope must reconcile from XRPC", f.calls()) 138 + } 139 + } 140 + 141 + func TestRoster_EventDeltaVisibleWithoutXRPC(t *testing.T) { 142 + clk := &fakeClock{t: rosterTestBase} 143 + f := &fakeLister{members: []string{"did:plc:boltless"}} 144 + d := rosterTestDB(t) 145 + r := newRoster(d, f, rosterTTL, clk.now, nil) 146 + ctx := context.Background() 147 + 148 + if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 149 + t.Fatal(err) 150 + } 151 + 152 + if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:akshay"), 1); err != nil { 153 + t.Fatalf("AddKnotMember: %v", err) 154 + } 155 + 156 + got, err := r.GetKnotMembers(ctx, rosterHost) 157 + if err != nil { 158 + t.Fatal(err) 159 + } 160 + if !slices.Equal(got, []string{"did:plc:akshay", "did:plc:boltless"}) { 161 + t.Errorf("post-delta read = %v, want the pushed member reflected", got) 162 + } 163 + if f.calls() != 1 { 164 + t.Errorf("memberCalls=%d, want 1; a pushed delta must not trigger an XRPC reconcile", f.calls()) 165 + } 166 + } 167 + 168 + func TestRoster_ColdUnreachableErrors(t *testing.T) { 169 + clk := &fakeClock{t: rosterTestBase} 170 + f := &fakeLister{err: errors.New("knot unreachable")} 171 + r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil) 172 + 173 + if _, err := r.GetKnotMembers(context.Background(), rosterHost); !errors.Is(err, ErrKnotUnreachable) { 174 + t.Fatalf("err=%v, want ErrKnotUnreachable when cold and the bootstrap drain fails", err) 175 + } 176 + } 177 + 178 + func TestRoster_StaleServedWhenUnreachable(t *testing.T) { 179 + clk := &fakeClock{t: rosterTestBase} 180 + f := &fakeLister{members: []string{"did:plc:boltless"}} 181 + r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil) 182 + ctx := context.Background() 183 + 184 + if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 185 + t.Fatal(err) 186 + } 187 + 188 + clk.advance(2 * rosterTTL) 189 + f.set(nil, errors.New("knot unreachable")) 190 + 191 + got, err := r.GetKnotMembers(ctx, rosterHost) 192 + if err != nil { 193 + t.Fatalf("err=%v, want stale rows served when a once-synced scope goes unreachable", err) 194 + } 195 + if !slices.Equal(got, []string{"did:plc:boltless"}) { 196 + t.Errorf("stale read = %v, want the last good roster", got) 197 + } 198 + } 199 + 200 + func TestRoster_InvalidateForcesReconcile(t *testing.T) { 201 + clk := &fakeClock{t: rosterTestBase} 202 + f := &fakeLister{members: []string{"did:plc:boltless"}} 203 + r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil) 204 + ctx := context.Background() 205 + 206 + if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 207 + t.Fatal(err) 208 + } 209 + r.InvalidateMembers(rosterHost) 210 + 211 + f.set([]string{"did:plc:akshay"}, nil) 212 + got, err := r.GetKnotMembers(ctx, rosterHost) 213 + if err != nil { 214 + t.Fatal(err) 215 + } 216 + if !slices.Equal(got, []string{"did:plc:akshay"}) { 217 + t.Errorf("post-invalidate read = %v, want a fresh reconcile within the TTL", got) 218 + } 219 + if f.calls() != 2 { 220 + t.Errorf("memberCalls=%d, want 2; invalidation must force the next read to reconcile", f.calls()) 221 + } 222 + } 223 + 224 + func TestRoster_SingleflightCollapsesConcurrentColdReads(t *testing.T) { 225 + clk := &fakeClock{t: rosterTestBase} 226 + started := make(chan struct{}) 227 + release := make(chan struct{}) 228 + f := &fakeLister{members: []string{"did:plc:boltless"}, started: started, block: release} 229 + r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil) 230 + ctx := context.Background() 231 + 232 + var wg sync.WaitGroup 233 + call := func() { 234 + wg.Add(1) 235 + go func() { 236 + defer wg.Done() 237 + if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 238 + t.Errorf("GetKnotMembers: %v", err) 239 + } 240 + }() 241 + } 242 + 243 + call() 244 + <-started 245 + for range make([]struct{}, 8) { 246 + call() 247 + } 248 + time.Sleep(20 * time.Millisecond) 249 + close(release) 250 + wg.Wait() 251 + 252 + if f.calls() != 1 { 253 + t.Errorf("memberCalls=%d, want 1; concurrent cold reads must collapse into a single reconcile", f.calls()) 254 + } 255 + } 256 + 257 + func TestRoster_MemberDeltaIdempotentAndScoped(t *testing.T) { 258 + clk := &fakeClock{t: rosterTestBase} 259 + d := rosterTestDB(t) 260 + r := newRoster(d, &fakeLister{}, rosterTTL, clk.now, nil) 261 + 262 + if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 1); err != nil { 263 + t.Fatalf("AddKnotMember: %v", err) 264 + } 265 + if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 2); err != nil { 266 + t.Fatalf("AddKnotMember: %v", err) 267 + } 268 + if err := r.RemoveKnotMember("other.nel.pet", syntax.DID("did:plc:boltless"), 1); err != nil { 269 + t.Fatalf("RemoveKnotMember other host: %v", err) 270 + } 271 + 272 + if rows := membersForHost(t, d, rosterHost); len(rows) != 1 { 273 + t.Fatalf("members = %v, want a single row after a duplicate add and an unrelated-host remove", rows) 274 + } 275 + 276 + if err := r.RemoveKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 3); err != nil { 277 + t.Fatalf("RemoveKnotMember: %v", err) 278 + } 279 + if rows := membersForHost(t, d, rosterHost); len(rows) != 0 { 280 + t.Fatalf("members = %v, want empty after remove", rows) 281 + } 282 + } 283 + 284 + func TestRoster_NativeAddReplacesLegacyRow(t *testing.T) { 285 + clk := &fakeClock{t: rosterTestBase} 286 + d := rosterTestDB(t) 287 + r := newRoster(d, &fakeLister{}, rosterTTL, clk.now, nil) 288 + 289 + if err := db.AddKnotMember(d, models.KnotMember{ 290 + Did: syntax.DID("did:plc:akshay"), 291 + Rkey: "legacy-rkey", 292 + Domain: rosterHost, 293 + Subject: syntax.DID("did:plc:boltless"), 294 + }); err != nil { 295 + t.Fatalf("seed legacy row: %v", err) 296 + } 297 + 298 + if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 1); err != nil { 299 + t.Fatalf("AddKnotMember: %v", err) 300 + } 301 + 302 + rows := membersForHost(t, d, rosterHost) 303 + if len(rows) != 1 { 304 + t.Fatalf("members = %v, want a single canonical row after a native add over a legacy row", rows) 305 + } 306 + if rows[0].Did != "" { 307 + t.Errorf("row did = %q, want empty; the native write must own the (domain, subject) identity", rows[0].Did) 308 + } 309 + } 310 + 311 + func TestRoster_StaleDeltaIgnored(t *testing.T) { 312 + clk := &fakeClock{t: rosterTestBase} 313 + d := rosterTestDB(t) 314 + r := newRoster(d, &fakeLister{}, rosterTTL, clk.now, nil) 315 + 316 + if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 100); err != nil { 317 + t.Fatalf("AddKnotMember: %v", err) 318 + } 319 + if err := r.RemoveKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 50); err != nil { 320 + t.Fatalf("RemoveKnotMember: %v", err) 321 + } 322 + 323 + if rows := membersForHost(t, d, rosterHost); len(rows) != 1 { 324 + t.Fatalf("members = %v, want the add preserved; a lower-cursor remove arriving late must be ignored", rows) 325 + } 326 + } 327 + 328 + func TestRoster_NewerRemoveWins(t *testing.T) { 329 + clk := &fakeClock{t: rosterTestBase} 330 + d := rosterTestDB(t) 331 + r := newRoster(d, &fakeLister{}, rosterTTL, clk.now, nil) 332 + 333 + if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 100); err != nil { 334 + t.Fatalf("AddKnotMember: %v", err) 335 + } 336 + if err := r.RemoveKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 101); err != nil { 337 + t.Fatalf("RemoveKnotMember: %v", err) 338 + } 339 + 340 + if rows := membersForHost(t, d, rosterHost); len(rows) != 0 { 341 + t.Fatalf("members = %v, want empty; a higher-cursor remove must win", rows) 342 + } 343 + } 344 + 345 + func TestRoster_LateAddAfterRemoveIgnored(t *testing.T) { 346 + clk := &fakeClock{t: rosterTestBase} 347 + d := rosterTestDB(t) 348 + r := newRoster(d, &fakeLister{}, rosterTTL, clk.now, nil) 349 + 350 + if err := r.RemoveKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 101); err != nil { 351 + t.Fatalf("RemoveKnotMember: %v", err) 352 + } 353 + if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 100); err != nil { 354 + t.Fatalf("AddKnotMember: %v", err) 355 + } 356 + 357 + if rows := membersForHost(t, d, rosterHost); len(rows) != 0 { 358 + t.Fatalf("members = %v, want empty; an add older than the applied remove must be ignored", rows) 359 + } 360 + } 361 + 362 + func TestRoster_ReconcilePrunesStaleCursors(t *testing.T) { 363 + clk := &fakeClock{t: rosterTestBase} 364 + d := rosterTestDB(t) 365 + r := newRoster(d, &fakeLister{}, rosterTTL, clk.now, nil) 366 + ctx := context.Background() 367 + 368 + stale := Cursor(rosterTestBase.Add(-2 * cursorRetention).UnixNano()) 369 + fresh := Cursor(rosterTestBase.UnixNano()) 370 + 371 + if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:clam"), stale); err != nil { 372 + t.Fatalf("AddKnotMember stale: %v", err) 373 + } 374 + if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:whelk"), fresh); err != nil { 375 + t.Fatalf("AddKnotMember fresh: %v", err) 376 + } 377 + 378 + if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 379 + t.Fatalf("reconcile: %v", err) 380 + } 381 + 382 + scope := memberScope(rosterHost) 383 + if _, ok, err := seenCursor(d, scope, syntax.DID("did:plc:clam")); err != nil || ok { 384 + t.Errorf("stale cursor present (ok=%v err=%v); reconcile must prune cursors older than the retention window", ok, err) 385 + } 386 + if _, ok, err := seenCursor(d, scope, syntax.DID("did:plc:whelk")); err != nil || !ok { 387 + t.Errorf("fresh cursor missing (ok=%v err=%v); reconcile must keep cursors within the retention window", ok, err) 388 + } 389 + } 390 + 391 + func TestRoster_ReconcileDoesNotClobberConcurrentDelta(t *testing.T) { 392 + clk := &fakeClock{t: rosterTestBase} 393 + f := &fakeLister{members: []string{"did:plc:boltless"}} 394 + r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil) 395 + ctx := context.Background() 396 + 397 + if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 398 + t.Fatal(err) 399 + } 400 + 401 + clk.advance(rosterTTL) 402 + 403 + started := make(chan struct{}) 404 + release := make(chan struct{}) 405 + f.arm(started, release) 406 + 407 + done := make(chan []string, 1) 408 + go func() { 409 + got, err := r.GetKnotMembers(ctx, rosterHost) 410 + if err != nil { 411 + t.Errorf("reconcile read: %v", err) 412 + } 413 + done <- got 414 + }() 415 + 416 + <-started 417 + if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:akshay"), 1); err != nil { 418 + t.Fatalf("AddKnotMember: %v", err) 419 + } 420 + close(release) 421 + 422 + got := <-done 423 + if !slices.Contains(got, "did:plc:akshay") { 424 + t.Errorf("post-reconcile read = %v, want the delta that landed during the drain preserved", got) 425 + } 426 + } 427 + 428 + func TestRoster_BackoffSuppressesRepeatedDrains(t *testing.T) { 429 + clk := &fakeClock{t: rosterTestBase} 430 + f := &fakeLister{members: []string{"did:plc:boltless"}} 431 + r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil) 432 + ctx := context.Background() 433 + 434 + if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 435 + t.Fatal(err) 436 + } 437 + if f.calls() != 1 { 438 + t.Fatalf("memberCalls=%d, want 1 after bootstrap", f.calls()) 439 + } 440 + 441 + clk.advance(rosterTTL) 442 + f.set(nil, errors.New("knot unreachable")) 443 + 444 + if got, err := r.GetKnotMembers(ctx, rosterHost); err != nil || !slices.Equal(got, []string{"did:plc:boltless"}) { 445 + t.Fatalf("got %v err %v, want the stale roster served", got, err) 446 + } 447 + if f.calls() != 2 { 448 + t.Fatalf("memberCalls=%d, want 2 after the first stale drain", f.calls()) 449 + } 450 + 451 + if got, err := r.GetKnotMembers(ctx, rosterHost); err != nil || !slices.Equal(got, []string{"did:plc:boltless"}) { 452 + t.Fatalf("got %v err %v, want the stale roster served", got, err) 453 + } 454 + if f.calls() != 2 { 455 + t.Errorf("memberCalls=%d, want 2; a down knot must not be re-probed within the backoff window", f.calls()) 456 + } 457 + 458 + clk.advance(reconcileBackoff) 459 + if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 460 + t.Fatal(err) 461 + } 462 + if f.calls() != 3 { 463 + t.Errorf("memberCalls=%d, want 3; the probe must resume once the backoff window elapses", f.calls()) 464 + } 465 + } 466 + 467 + func TestRoster_BackoffStillErrorsWhenCold(t *testing.T) { 468 + clk := &fakeClock{t: rosterTestBase} 469 + f := &fakeLister{err: errors.New("knot unreachable")} 470 + r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil) 471 + ctx := context.Background() 472 + 473 + if _, err := r.GetKnotMembers(ctx, rosterHost); !errors.Is(err, ErrKnotUnreachable) { 474 + t.Fatalf("err=%v, want ErrKnotUnreachable on the cold drain failure", err) 475 + } 476 + if f.calls() != 1 { 477 + t.Fatalf("memberCalls=%d, want 1", f.calls()) 478 + } 479 + 480 + if _, err := r.GetKnotMembers(ctx, rosterHost); !errors.Is(err, ErrKnotUnreachable) { 481 + t.Fatalf("err=%v, want ErrKnotUnreachable during backoff for a cold scope", err) 482 + } 483 + if f.calls() != 1 { 484 + t.Errorf("memberCalls=%d, want 1; a cold scope must not be re-probed within the backoff window", f.calls()) 485 + } 486 + }
+19 -2
appview/knotacl/service.go
··· 8 8 "sync" 9 9 "time" 10 10 11 + "github.com/bluesky-social/indigo/atproto/syntax" 11 12 "golang.org/x/sync/errgroup" 12 13 13 14 "tangled.org/core/appview/db" ··· 33 34 nat *nativeReader 34 35 } 35 36 36 - func NewService(enforcer *rbac.Enforcer, execer db.Execer, dev bool, logger *slog.Logger) *Service { 37 + func NewService(enforcer *rbac.Enforcer, store *db.DB, dev bool, logger *slog.Logger) *Service { 37 38 return &Service{ 38 39 dev: dev, 39 40 log: logger, 40 41 leg: &legacyReader{enforcer: enforcer}, 41 - nat: &nativeReader{client: newCache(NewClient(dev, logger), cacheTTL, nil), execer: execer}, 42 + nat: &nativeReader{client: newRoster(store, NewClient(dev, logger), reconcileTTL, nil, logger), execer: store}, 42 43 } 43 44 } 44 45 ··· 87 88 88 89 func (s *Service) InvalidateCollaborators(host, repoDid string) { 89 90 s.nat.client.InvalidateCollaborators(host, repoDid) 91 + } 92 + 93 + func (s *Service) AddKnotMember(host string, subject syntax.DID, cursor Cursor) error { 94 + return s.nat.client.AddKnotMember(host, subject, cursor) 95 + } 96 + 97 + func (s *Service) RemoveKnotMember(host string, subject syntax.DID, cursor Cursor) error { 98 + return s.nat.client.RemoveKnotMember(host, subject, cursor) 99 + } 100 + 101 + func (s *Service) AddCollaborator(repoDid, subject syntax.DID, cursor Cursor) error { 102 + return s.nat.client.AddCollaborator(repoDid, subject, cursor) 103 + } 104 + 105 + func (s *Service) RemoveCollaborator(repoDid, subject syntax.DID, cursor Cursor) error { 106 + return s.nat.client.RemoveCollaborator(repoDid, subject, cursor) 90 107 } 91 108 92 109 func (s *Service) KnotsForUser(ctx context.Context, userDid string) []string {
+24 -5
appview/knotacl/service_test.go
··· 105 105 return &models.Repo{Did: testOwner, Knot: host, RepoDid: testRepoDid, Name: "anemone"} 106 106 } 107 107 108 + func seedRepoRow(t *testing.T, d *db.DB, repo *models.Repo) { 109 + t.Helper() 110 + tx, err := d.Begin() 111 + if err != nil { 112 + t.Fatalf("begin: %v", err) 113 + } 114 + if err := db.AddRepo(tx, repo); err != nil { 115 + t.Fatalf("AddRepo: %v", err) 116 + } 117 + if err := tx.Commit(); err != nil { 118 + t.Fatalf("commit: %v", err) 119 + } 120 + } 121 + 108 122 func seedRepoPolicies(t *testing.T, e *rbac.Enforcer, host string) { 109 123 t.Helper() 110 124 if err := e.AddRepo(testOwner, host, testRepoDid); err != nil { ··· 141 155 func TestService_ParityOldVsNew(t *testing.T) { 142 156 ctx := context.Background() 143 157 oldSvc, _, oldHost := newServiceEnv(t, &fakeKnot{version: "v1.14.0"}, func(e *rbac.Enforcer, h string) { seedRepoPolicies(t, e, h) }) 144 - newSvc, _, newHost := newServiceEnv(t, &fakeKnot{version: "v1.15.0", capabilities: capsKnotACL, collaborators: []string{testCollab}}, nil) 158 + newSvc, newDb, newHost := newServiceEnv(t, &fakeKnot{version: "v1.15.0", capabilities: capsKnotACL, collaborators: []string{testCollab}}, nil) 159 + seedRepoRow(t, newDb, testRepo(newHost)) 145 160 146 161 for _, did := range []string{testOwner, testCollab} { 147 162 oldRoles := sortedRoles(oldSvc.RolesInRepo(ctx, testRepo(oldHost), did).Roles) ··· 167 182 168 183 func TestService_NewKnotCollaboratorFromList(t *testing.T) { 169 184 ctx := context.Background() 170 - svc, _, host := newServiceEnv(t, &fakeKnot{version: "v1.15.0", capabilities: capsKnotACL, collaborators: []string{testCollab}}, nil) 185 + svc, d, host := newServiceEnv(t, &fakeKnot{version: "v1.15.0", capabilities: capsKnotACL, collaborators: []string{testCollab}}, nil) 186 + seedRepoRow(t, d, testRepo(host)) 171 187 172 188 collab := svc.RolesInRepo(ctx, testRepo(host), testCollab) 173 189 if !collab.IsCollaborator() || !collab.IsPushAllowed() { ··· 181 197 func TestService_MixedFleet(t *testing.T) { 182 198 ctx := context.Background() 183 199 oldSvc, _, oldHost := newServiceEnv(t, &fakeKnot{version: "v1.14.0"}, func(e *rbac.Enforcer, h string) { seedRepoPolicies(t, e, h) }) 184 - newSvc, _, newHost := newServiceEnv(t, &fakeKnot{version: "v1.15.0", capabilities: capsKnotACL, collaborators: []string{testCollab}}, nil) 200 + newSvc, newDb, newHost := newServiceEnv(t, &fakeKnot{version: "v1.15.0", capabilities: capsKnotACL, collaborators: []string{testCollab}}, nil) 201 + seedRepoRow(t, newDb, testRepo(newHost)) 185 202 186 203 if !newSvc.RolesInRepo(ctx, testRepo(newHost), testCollab).IsCollaborator() { 187 204 t.Error("new-knot collaborator must resolve from the live query with an empty casbin") ··· 307 324 func TestService_CollaboratorsNewKnot(t *testing.T) { 308 325 ctx := context.Background() 309 326 310 - svc, _, host := newServiceEnv(t, &fakeKnot{version: "v1.15.0", capabilities: capsKnotACL, collaborators: []string{testCollab}}, nil) 327 + svc, d, host := newServiceEnv(t, &fakeKnot{version: "v1.15.0", capabilities: capsKnotACL, collaborators: []string{testCollab}}, nil) 328 + seedRepoRow(t, d, testRepo(host)) 311 329 collabs := svc.Collaborators(ctx, testRepo(host)) 312 330 if len(collabs) != 2 { 313 331 t.Fatalf("Collaborators = %v, want owner + one collaborator", collabs) ··· 319 337 t.Errorf("second row = %v, want the collaborator", collabs[1]) 320 338 } 321 339 322 - dupSvc, _, dupHost := newServiceEnv(t, &fakeKnot{version: "v1.15.0", capabilities: capsKnotACL, collaborators: []string{testCollab, testOwner}}, nil) 340 + dupSvc, dupDb, dupHost := newServiceEnv(t, &fakeKnot{version: "v1.15.0", capabilities: capsKnotACL, collaborators: []string{testCollab, testOwner}}, nil) 341 + seedRepoRow(t, dupDb, testRepo(dupHost)) 323 342 rows := dupSvc.Collaborators(ctx, testRepo(dupHost)) 324 343 ownerRows := 0 325 344 for _, c := range rows {