Monorepo for Tangled
tangled.org
1package knotacl
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "slices"
10 "sync"
11 "sync/atomic"
12 "time"
13
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "golang.org/x/sync/singleflight"
16
17 "tangled.org/core/appview/db"
18 "tangled.org/core/appview/models"
19 "tangled.org/core/orm"
20)
21
// Timing parameters for roster reconciliation.
const (
	// reconcileTTL is a five-minute freshness window. Nothing in this part of
	// the file reads it directly; newRoster receives its TTL as an argument.
	reconcileTTL = 5 * time.Minute

	// reconcileBackoff is how long backingOff keeps suppressing reconciles
	// after a scope's upstream fetch has failed.
	reconcileBackoff = 15 * time.Second

	// cursorRetention is converted to an int64 and handed to pruneCursors as
	// the distance behind the newest cursor beyond which cursors are deleted.
	cursorRetention = time.Hour
)
27
// errReconcileBackoff is returned by maybeReconcile when a reconcile is skipped
// because backingOff reports that the scope failed less than reconcileBackoff ago.
var errReconcileBackoff = errors.New("reconcile suppressed during backoff")
29
// Cursor is the position carried by a roster delta. applyDelta skips a delta
// whose Cursor is not greater than the one already recorded for the same
// scope and subject.
type Cursor int64
31
// scopeState is the in-memory bookkeeping the roster keeps for one scope key.
type scopeState struct {
	// mu is held by applyDelta and commitReconcile while they write the
	// scope's rows.
	mu sync.Mutex
	// gen is incremented by applyDelta after a delta commits. Reconciles read
	// it before fetching and compare it again in commitReconcile.
	gen atomic.Uint64
	// failedAt is set by markFailed and zeroed by clearFailed; it is accessed
	// while holding roster.mu.
	failedAt time.Time
	// refs counts acquire calls not yet matched by release; it is accessed
	// while holding roster.mu.
	refs int
}
38
// roster serves knot member and repo collaborator lists from the local store,
// refreshing them from src when the stored sync timestamp is missing or older
// than ttl, and applies incremental add/remove deltas.
type roster struct {
	// store is the database that holds the roster rows, sync timestamps and
	// delta cursors.
	store *db.DB
	// src is the upstream queried by the reconcile methods.
	src lister
	// ttl is the freshness window checked by fresh.
	ttl time.Duration
	// now is the clock; newRoster defaults it to time.Now.
	now func() time.Time
	// log receives warnings about malformed DIDs dropped during reconcile.
	log *slog.Logger
	// group collapses concurrent reconciles of the same scope key.
	group singleflight.Group

	// mu guards scopes as well as the refs and failedAt fields of every
	// scopeState stored in it.
	mu sync.Mutex
	// scopes maps a scope key to its state; entries are created by acquire
	// and removed by release.
	scopes map[string]*scopeState
}
50
51func newRoster(store *db.DB, src lister, ttl time.Duration, now func() time.Time, logger *slog.Logger) *roster {
52 if now == nil {
53 now = time.Now
54 }
55 if logger == nil {
56 logger = slog.Default()
57 }
58 return &roster{
59 store: store,
60 src: src,
61 ttl: ttl,
62 now: now,
63 log: logger,
64 scopes: map[string]*scopeState{},
65 }
66}
67
68func (r *roster) GetKnotMembers(ctx context.Context, host string) ([]string, error) {
69 return r.serve(ctx, memberScope(host),
70 func() error { return r.reconcileMembers(ctx, host) },
71 func() ([]string, error) {
72 rows, err := db.GetKnotMembers(r.store, orm.FilterEq("domain", host))
73 if err != nil {
74 return nil, err
75 }
76 return mapSlice(rows, func(km models.KnotMember) string { return km.Subject.String() }), nil
77 },
78 )
79}
80
81func (r *roster) GetRepoCollaborators(ctx context.Context, host, repoDid string) ([]string, error) {
82 return r.serve(ctx, collabScope(repoDid),
83 func() error { return r.reconcileCollaborators(ctx, host, repoDid) },
84 func() ([]string, error) {
85 rows, err := db.GetCollaborators(r.store, orm.FilterEq("repo_did", repoDid))
86 if err != nil {
87 return nil, err
88 }
89 return mapSlice(rows, func(c models.Collaborator) string { return c.SubjectDid.String() }), nil
90 },
91 )
92}
93
94func (r *roster) AddKnotMember(host string, subject syntax.DID, cursor Cursor) error {
95 return r.applyDelta(memberScope(host), subject, cursor, func(tx *sql.Tx) error {
96 if err := db.RemoveKnotMember(tx,
97 orm.FilterEq("domain", host),
98 orm.FilterEq("subject", subject.String()),
99 ); err != nil {
100 return err
101 }
102 return db.AddKnotMember(tx, models.KnotMember{Domain: host, Subject: subject})
103 })
104}
105
106func (r *roster) RemoveKnotMember(host string, subject syntax.DID, cursor Cursor) error {
107 return r.applyDelta(memberScope(host), subject, cursor, func(tx *sql.Tx) error {
108 return db.RemoveKnotMember(tx,
109 orm.FilterEq("domain", host),
110 orm.FilterEq("subject", subject.String()),
111 )
112 })
113}
114
115// NOTE: maybe TODO or not, but no Did of adder means no ability to suggest a vouch for the person they just added as collaborator.
116func (r *roster) AddCollaborator(repoDid, subject syntax.DID, cursor Cursor) error {
117 return r.applyDelta(collabScope(repoDid.String()), subject, cursor, func(tx *sql.Tx) error {
118 return db.AddCollaborator(tx, models.Collaborator{SubjectDid: subject, RepoDid: repoDid})
119 })
120}
121
122func (r *roster) RemoveCollaborator(repoDid, subject syntax.DID, cursor Cursor) error {
123 return r.applyDelta(collabScope(repoDid.String()), subject, cursor, func(tx *sql.Tx) error {
124 return db.DeleteCollaborator(tx,
125 orm.FilterEq("repo_did", repoDid.String()),
126 orm.FilterEq("subject_did", subject.String()),
127 )
128 })
129}
130
131func (r *roster) applyDelta(scope string, subject syntax.DID, cursor Cursor, mutate func(*sql.Tx) error) error {
132 st := r.acquire(scope)
133 defer r.release(scope, st)
134
135 st.mu.Lock()
136 defer st.mu.Unlock()
137
138 tx, err := r.store.Begin()
139 if err != nil {
140 return err
141 }
142 defer tx.Rollback()
143
144 seen, ok, err := seenCursor(tx, scope, subject)
145 if err != nil {
146 return err
147 }
148 if ok && cursor <= seen {
149 return nil
150 }
151
152 if err := mutate(tx); err != nil {
153 return err
154 }
155 if err := recordCursor(tx, scope, subject, cursor); err != nil {
156 return err
157 }
158 if err := tx.Commit(); err != nil {
159 return err
160 }
161
162 st.gen.Add(1)
163 return nil
164}
165
166func (r *roster) InvalidateMembers(host string) {
167 _ = clearSyncedAt(r.store, memberScope(host))
168}
169
170func (r *roster) InvalidateCollaborators(host, repoDid string) {
171 _ = clearSyncedAt(r.store, collabScope(repoDid))
172}
173
174func (r *roster) serve(ctx context.Context, key string, reconcile func() error, read func() ([]string, error)) ([]string, error) {
175 if memo := memoFrom(ctx); memo != nil {
176 if v, ok := memo.get(key); ok {
177 return slices.Clone(v), nil
178 }
179 }
180
181 recErr := r.maybeReconcile(key, reconcile)
182
183 subjects, err := read()
184 if err != nil {
185 return nil, err
186 }
187
188 if len(subjects) == 0 && recErr != nil && !r.everSynced(key) {
189 return nil, fmt.Errorf("%w: %v", ErrKnotUnreachable, recErr)
190 }
191
192 subjects = dedup(subjects)
193 if memo := memoFrom(ctx); memo != nil {
194 memo.put(key, subjects)
195 }
196 return slices.Clone(subjects), nil
197}
198
199func (r *roster) maybeReconcile(key string, reconcile func() error) error {
200 if r.fresh(key) {
201 return nil
202 }
203 if r.backingOff(key) {
204 return errReconcileBackoff
205 }
206 _, err, _ := r.group.Do(key, func() (any, error) {
207 if r.fresh(key) {
208 return nil, nil
209 }
210 if r.backingOff(key) {
211 return nil, errReconcileBackoff
212 }
213 return nil, reconcile()
214 })
215 return err
216}
217
218func (r *roster) reconcileMembers(ctx context.Context, host string) error {
219 scope := memberScope(host)
220 st := r.acquire(scope)
221 defer r.release(scope, st)
222
223 genBefore := st.gen.Load()
224 subjects, err := r.src.GetKnotMembers(ctx, host)
225 if err != nil {
226 r.markFailed(st)
227 return err
228 }
229 return r.commitReconcile(st, scope, genBefore, func(tx *sql.Tx) error {
230 if err := db.RemoveKnotMember(tx, orm.FilterEq("domain", host)); err != nil {
231 return err
232 }
233 for _, s := range subjects {
234 did, perr := syntax.ParseDID(s)
235 if perr != nil {
236 r.log.Warn("dropping malformed member DID from reconcile", "host", host, "subject", s, "error", perr)
237 continue
238 }
239 if err := db.AddKnotMember(tx, models.KnotMember{Domain: host, Subject: did}); err != nil {
240 return err
241 }
242 }
243 return nil
244 })
245}
246
247func (r *roster) reconcileCollaborators(ctx context.Context, host, repoDid string) error {
248 repo, perr := syntax.ParseDID(repoDid)
249 if perr != nil {
250 return perr
251 }
252 scope := collabScope(repoDid)
253 st := r.acquire(scope)
254 defer r.release(scope, st)
255
256 genBefore := st.gen.Load()
257 subjects, err := r.src.GetRepoCollaborators(ctx, host, repoDid)
258 if err != nil {
259 r.markFailed(st)
260 return err
261 }
262 return r.commitReconcile(st, scope, genBefore, func(tx *sql.Tx) error {
263 if err := db.DeleteCollaborator(tx, orm.FilterEq("repo_did", repoDid)); err != nil {
264 return err
265 }
266 for _, s := range subjects {
267 did, perr := syntax.ParseDID(s)
268 if perr != nil {
269 r.log.Warn("dropping malformed collaborator DID from reconcile", "repo_did", repoDid, "subject", s, "error", perr)
270 continue
271 }
272 if err := db.AddCollaborator(tx, models.Collaborator{SubjectDid: did, RepoDid: repo}); err != nil {
273 return err
274 }
275 }
276 return nil
277 })
278}
279
280func (r *roster) commitReconcile(st *scopeState, scope string, genBefore uint64, replace func(*sql.Tx) error) error {
281 st.mu.Lock()
282 defer st.mu.Unlock()
283
284 if st.gen.Load() != genBefore {
285 if err := setSyncedAt(r.store, scope, r.now()); err != nil {
286 return err
287 }
288 r.clearFailed(st)
289 return nil
290 }
291
292 tx, err := r.store.Begin()
293 if err != nil {
294 return err
295 }
296 defer tx.Rollback()
297 if err := replace(tx); err != nil {
298 return err
299 }
300 if err := setSyncedAt(tx, scope, r.now()); err != nil {
301 return err
302 }
303 if err := pruneCursors(tx, scope, int64(cursorRetention)); err != nil {
304 return err
305 }
306 if err := tx.Commit(); err != nil {
307 return err
308 }
309 r.clearFailed(st)
310 return nil
311}
312
313func (r *roster) acquire(scope string) *scopeState {
314 r.mu.Lock()
315 defer r.mu.Unlock()
316 st := r.scopes[scope]
317 if st == nil {
318 st = &scopeState{}
319 r.scopes[scope] = st
320 }
321 st.refs++
322 return st
323}
324
325func (r *roster) release(scope string, st *scopeState) {
326 r.mu.Lock()
327 defer r.mu.Unlock()
328 st.refs--
329 if st.refs == 0 && st.failedAt.IsZero() {
330 delete(r.scopes, scope)
331 }
332}
333
334func (r *roster) markFailed(st *scopeState) {
335 r.mu.Lock()
336 defer r.mu.Unlock()
337 st.failedAt = r.now()
338}
339
340func (r *roster) clearFailed(st *scopeState) {
341 r.mu.Lock()
342 defer r.mu.Unlock()
343 st.failedAt = time.Time{}
344}
345
346func (r *roster) backingOff(scope string) bool {
347 r.mu.Lock()
348 defer r.mu.Unlock()
349 st, ok := r.scopes[scope]
350 if !ok {
351 return false
352 }
353 return !st.failedAt.IsZero() && r.now().Sub(st.failedAt) < reconcileBackoff
354}
355
356func (r *roster) fresh(key string) bool {
357 at, ok, err := getSyncedAt(r.store, key)
358 if err != nil || !ok {
359 return false
360 }
361 return r.now().Sub(at) < r.ttl
362}
363
364func (r *roster) everSynced(key string) bool {
365 _, ok, _ := getSyncedAt(r.store, key)
366 return ok
367}
368
// memberScope builds the scope key for a knot's member list: the letter "m",
// a NUL byte, then host.
func memberScope(host string) string {
	const prefix = "m\x00"
	return prefix + host
}
370
// collabScope builds the scope key for a repo's collaborator list: the letter
// "c", a NUL byte, then repoDid.
func collabScope(repoDid string) string {
	const prefix = "c\x00"
	return prefix + repoDid
}
372
373func getSyncedAt(e db.Execer, key string) (time.Time, bool, error) {
374 var raw string
375 err := e.QueryRow(`select synced_at from knotacl_sync where scope_key = ?`, key).Scan(&raw)
376 if errors.Is(err, sql.ErrNoRows) {
377 return time.Time{}, false, nil
378 }
379 if err != nil {
380 return time.Time{}, false, err
381 }
382 at, err := time.Parse(time.RFC3339, raw)
383 if err != nil {
384 return time.Time{}, false, err
385 }
386 return at, true, nil
387}
388
389func setSyncedAt(e db.Execer, key string, at time.Time) error {
390 _, err := e.Exec(
391 `insert into knotacl_sync (scope_key, synced_at) values (?, ?)
392 on conflict(scope_key) do update set synced_at = excluded.synced_at`,
393 key, at.UTC().Format(time.RFC3339),
394 )
395 return err
396}
397
398func clearSyncedAt(e db.Execer, key string) error {
399 _, err := e.Exec(`delete from knotacl_sync where scope_key = ?`, key)
400 return err
401}
402
403func seenCursor(e db.Execer, scope string, subject syntax.DID) (Cursor, bool, error) {
404 var raw int64
405 err := e.QueryRow(
406 `select cursor from knotacl_delta_cursor where scope_key = ? and subject = ?`,
407 scope, subject.String(),
408 ).Scan(&raw)
409 if errors.Is(err, sql.ErrNoRows) {
410 return 0, false, nil
411 }
412 if err != nil {
413 return 0, false, err
414 }
415 return Cursor(raw), true, nil
416}
417
418func recordCursor(e db.Execer, scope string, subject syntax.DID, cursor Cursor) error {
419 _, err := e.Exec(
420 `insert into knotacl_delta_cursor (scope_key, subject, cursor) values (?, ?, ?)
421 on conflict(scope_key, subject) do update set cursor = excluded.cursor`,
422 scope, subject.String(), int64(cursor),
423 )
424 return err
425}
426
427func pruneCursors(e db.Execer, scope string, retentionNanos int64) error {
428 _, err := e.Exec(
429 `delete from knotacl_delta_cursor
430 where scope_key = ?
431 and cursor < (select max(cursor) from knotacl_delta_cursor where scope_key = ?) - ?`,
432 scope, scope, retentionNanos,
433 )
434 return err
435}