Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package knotacl 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "slices" 10 "sync" 11 "sync/atomic" 12 "time" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "golang.org/x/sync/singleflight" 16 17 "tangled.org/core/appview/db" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/orm" 20) 21 22const ( 23 reconcileTTL = 5 * time.Minute 24 reconcileBackoff = 15 * time.Second 25 cursorRetention = 1 * time.Hour 26) 27 28var errReconcileBackoff = errors.New("reconcile suppressed during backoff") 29 30type Cursor int64 31 32type scopeState struct { 33 mu sync.Mutex 34 gen atomic.Uint64 35 failedAt time.Time 36 refs int 37} 38 39type roster struct { 40 store *db.DB 41 src lister 42 ttl time.Duration 43 now func() time.Time 44 log *slog.Logger 45 group singleflight.Group 46 47 mu sync.Mutex 48 scopes map[string]*scopeState 49} 50 51func newRoster(store *db.DB, src lister, ttl time.Duration, now func() time.Time, logger *slog.Logger) *roster { 52 if now == nil { 53 now = time.Now 54 } 55 if logger == nil { 56 logger = slog.Default() 57 } 58 return &roster{ 59 store: store, 60 src: src, 61 ttl: ttl, 62 now: now, 63 log: logger, 64 scopes: map[string]*scopeState{}, 65 } 66} 67 68func (r *roster) GetKnotMembers(ctx context.Context, host string) ([]string, error) { 69 return r.serve(ctx, memberScope(host), 70 func() error { return r.reconcileMembers(ctx, host) }, 71 func() ([]string, error) { 72 rows, err := db.GetKnotMembers(r.store, orm.FilterEq("domain", host)) 73 if err != nil { 74 return nil, err 75 } 76 return mapSlice(rows, func(km models.KnotMember) string { return km.Subject.String() }), nil 77 }, 78 ) 79} 80 81func (r *roster) GetRepoCollaborators(ctx context.Context, host, repoDid string) ([]string, error) { 82 return r.serve(ctx, collabScope(repoDid), 83 func() error { return r.reconcileCollaborators(ctx, host, repoDid) }, 84 func() ([]string, error) { 85 rows, err := db.GetCollaborators(r.store, orm.FilterEq("repo_did", repoDid)) 86 if err != nil { 87 return nil, err 88 } 89 return mapSlice(rows, func(c models.Collaborator) string { return c.SubjectDid.String() }), nil 90 }, 91 ) 92} 93 94func (r *roster) AddKnotMember(host string, subject syntax.DID, cursor Cursor) error { 95 return r.applyDelta(memberScope(host), subject, cursor, func(tx *sql.Tx) error { 96 if err := db.RemoveKnotMember(tx, 97 orm.FilterEq("domain", host), 98 orm.FilterEq("subject", subject.String()), 99 ); err != nil { 100 return err 101 } 102 return db.AddKnotMember(tx, models.KnotMember{Domain: host, Subject: subject}) 103 }) 104} 105 106func (r *roster) RemoveKnotMember(host string, subject syntax.DID, cursor Cursor) error { 107 return r.applyDelta(memberScope(host), subject, cursor, func(tx *sql.Tx) error { 108 return db.RemoveKnotMember(tx, 109 orm.FilterEq("domain", host), 110 orm.FilterEq("subject", subject.String()), 111 ) 112 }) 113} 114 115// NOTE: maybe TODO or not, but no Did of adder means no ability to suggest a vouch for the person they just added as collaborator. 116func (r *roster) AddCollaborator(repoDid, subject syntax.DID, cursor Cursor) error { 117 return r.applyDelta(collabScope(repoDid.String()), subject, cursor, func(tx *sql.Tx) error { 118 return db.AddCollaborator(tx, models.Collaborator{SubjectDid: subject, RepoDid: repoDid}) 119 }) 120} 121 122func (r *roster) RemoveCollaborator(repoDid, subject syntax.DID, cursor Cursor) error { 123 return r.applyDelta(collabScope(repoDid.String()), subject, cursor, func(tx *sql.Tx) error { 124 return db.DeleteCollaborator(tx, 125 orm.FilterEq("repo_did", repoDid.String()), 126 orm.FilterEq("subject_did", subject.String()), 127 ) 128 }) 129} 130 131func (r *roster) applyDelta(scope string, subject syntax.DID, cursor Cursor, mutate func(*sql.Tx) error) error { 132 st := r.acquire(scope) 133 defer r.release(scope, st) 134 135 st.mu.Lock() 136 defer st.mu.Unlock() 137 138 tx, err := r.store.Begin() 139 if err != nil { 140 return err 141 } 142 defer tx.Rollback() 143 144 seen, ok, err := seenCursor(tx, scope, subject) 145 if err != nil { 146 return err 147 } 148 if ok && cursor <= seen { 149 return nil 150 } 151 152 if err := mutate(tx); err != nil { 153 return err 154 } 155 if err := recordCursor(tx, scope, subject, cursor); err != nil { 156 return err 157 } 158 if err := tx.Commit(); err != nil { 159 return err 160 } 161 162 st.gen.Add(1) 163 return nil 164} 165 166func (r *roster) InvalidateMembers(host string) { 167 _ = clearSyncedAt(r.store, memberScope(host)) 168} 169 170func (r *roster) InvalidateCollaborators(host, repoDid string) { 171 _ = clearSyncedAt(r.store, collabScope(repoDid)) 172} 173 174func (r *roster) serve(ctx context.Context, key string, reconcile func() error, read func() ([]string, error)) ([]string, error) { 175 if memo := memoFrom(ctx); memo != nil { 176 if v, ok := memo.get(key); ok { 177 return slices.Clone(v), nil 178 } 179 } 180 181 recErr := r.maybeReconcile(key, reconcile) 182 183 subjects, err := read() 184 if err != nil { 185 return nil, err 186 } 187 188 if len(subjects) == 0 && recErr != nil && !r.everSynced(key) { 189 return nil, fmt.Errorf("%w: %v", ErrKnotUnreachable, recErr) 190 } 191 192 subjects = dedup(subjects) 193 if memo := memoFrom(ctx); memo != nil { 194 memo.put(key, subjects) 195 } 196 return slices.Clone(subjects), nil 197} 198 199func (r *roster) maybeReconcile(key string, reconcile func() error) error { 200 if r.fresh(key) { 201 return nil 202 } 203 if r.backingOff(key) { 204 return errReconcileBackoff 205 } 206 _, err, _ := r.group.Do(key, func() (any, error) { 207 if r.fresh(key) { 208 return nil, nil 209 } 210 if r.backingOff(key) { 211 return nil, errReconcileBackoff 212 } 213 return nil, reconcile() 214 }) 215 return err 216} 217 218func (r *roster) reconcileMembers(ctx context.Context, host string) error { 219 scope := memberScope(host) 220 st := r.acquire(scope) 221 defer r.release(scope, st) 222 223 genBefore := st.gen.Load() 224 subjects, err := r.src.GetKnotMembers(ctx, host) 225 if err != nil { 226 r.markFailed(st) 227 return err 228 } 229 return r.commitReconcile(st, scope, genBefore, func(tx *sql.Tx) error { 230 if err := db.RemoveKnotMember(tx, orm.FilterEq("domain", host)); err != nil { 231 return err 232 } 233 for _, s := range subjects { 234 did, perr := syntax.ParseDID(s) 235 if perr != nil { 236 r.log.Warn("dropping malformed member DID from reconcile", "host", host, "subject", s, "error", perr) 237 continue 238 } 239 if err := db.AddKnotMember(tx, models.KnotMember{Domain: host, Subject: did}); err != nil { 240 return err 241 } 242 } 243 return nil 244 }) 245} 246 247func (r *roster) reconcileCollaborators(ctx context.Context, host, repoDid string) error { 248 repo, perr := syntax.ParseDID(repoDid) 249 if perr != nil { 250 return perr 251 } 252 scope := collabScope(repoDid) 253 st := r.acquire(scope) 254 defer r.release(scope, st) 255 256 genBefore := st.gen.Load() 257 subjects, err := r.src.GetRepoCollaborators(ctx, host, repoDid) 258 if err != nil { 259 r.markFailed(st) 260 return err 261 } 262 return r.commitReconcile(st, scope, genBefore, func(tx *sql.Tx) error { 263 if err := db.DeleteCollaborator(tx, orm.FilterEq("repo_did", repoDid)); err != nil { 264 return err 265 } 266 for _, s := range subjects { 267 did, perr := syntax.ParseDID(s) 268 if perr != nil { 269 r.log.Warn("dropping malformed collaborator DID from reconcile", "repo_did", repoDid, "subject", s, "error", perr) 270 continue 271 } 272 if err := db.AddCollaborator(tx, models.Collaborator{SubjectDid: did, RepoDid: repo}); err != nil { 273 return err 274 } 275 } 276 return nil 277 }) 278} 279 280func (r *roster) commitReconcile(st *scopeState, scope string, genBefore uint64, replace func(*sql.Tx) error) error { 281 st.mu.Lock() 282 defer st.mu.Unlock() 283 284 if st.gen.Load() != genBefore { 285 if err := setSyncedAt(r.store, scope, r.now()); err != nil { 286 return err 287 } 288 r.clearFailed(st) 289 return nil 290 } 291 292 tx, err := r.store.Begin() 293 if err != nil { 294 return err 295 } 296 defer tx.Rollback() 297 if err := replace(tx); err != nil { 298 return err 299 } 300 if err := setSyncedAt(tx, scope, r.now()); err != nil { 301 return err 302 } 303 if err := pruneCursors(tx, scope, int64(cursorRetention)); err != nil { 304 return err 305 } 306 if err := tx.Commit(); err != nil { 307 return err 308 } 309 r.clearFailed(st) 310 return nil 311} 312 313func (r *roster) acquire(scope string) *scopeState { 314 r.mu.Lock() 315 defer r.mu.Unlock() 316 st := r.scopes[scope] 317 if st == nil { 318 st = &scopeState{} 319 r.scopes[scope] = st 320 } 321 st.refs++ 322 return st 323} 324 325func (r *roster) release(scope string, st *scopeState) { 326 r.mu.Lock() 327 defer r.mu.Unlock() 328 st.refs-- 329 if st.refs == 0 && st.failedAt.IsZero() { 330 delete(r.scopes, scope) 331 } 332} 333 334func (r *roster) markFailed(st *scopeState) { 335 r.mu.Lock() 336 defer r.mu.Unlock() 337 st.failedAt = r.now() 338} 339 340func (r *roster) clearFailed(st *scopeState) { 341 r.mu.Lock() 342 defer r.mu.Unlock() 343 st.failedAt = time.Time{} 344} 345 346func (r *roster) backingOff(scope string) bool { 347 r.mu.Lock() 348 defer r.mu.Unlock() 349 st, ok := r.scopes[scope] 350 if !ok { 351 return false 352 } 353 return !st.failedAt.IsZero() && r.now().Sub(st.failedAt) < reconcileBackoff 354} 355 356func (r *roster) fresh(key string) bool { 357 at, ok, err := getSyncedAt(r.store, key) 358 if err != nil || !ok { 359 return false 360 } 361 return r.now().Sub(at) < r.ttl 362} 363 364func (r *roster) everSynced(key string) bool { 365 _, ok, _ := getSyncedAt(r.store, key) 366 return ok 367} 368 369func memberScope(host string) string { return "m\x00" + host } 370 371func collabScope(repoDid string) string { return "c\x00" + repoDid } 372 373func getSyncedAt(e db.Execer, key string) (time.Time, bool, error) { 374 var raw string 375 err := e.QueryRow(`select synced_at from knotacl_sync where scope_key = ?`, key).Scan(&raw) 376 if errors.Is(err, sql.ErrNoRows) { 377 return time.Time{}, false, nil 378 } 379 if err != nil { 380 return time.Time{}, false, err 381 } 382 at, err := time.Parse(time.RFC3339, raw) 383 if err != nil { 384 return time.Time{}, false, err 385 } 386 return at, true, nil 387} 388 389func setSyncedAt(e db.Execer, key string, at time.Time) error { 390 _, err := e.Exec( 391 `insert into knotacl_sync (scope_key, synced_at) values (?, ?) 392 on conflict(scope_key) do update set synced_at = excluded.synced_at`, 393 key, at.UTC().Format(time.RFC3339), 394 ) 395 return err 396} 397 398func clearSyncedAt(e db.Execer, key string) error { 399 _, err := e.Exec(`delete from knotacl_sync where scope_key = ?`, key) 400 return err 401} 402 403func seenCursor(e db.Execer, scope string, subject syntax.DID) (Cursor, bool, error) { 404 var raw int64 405 err := e.QueryRow( 406 `select cursor from knotacl_delta_cursor where scope_key = ? and subject = ?`, 407 scope, subject.String(), 408 ).Scan(&raw) 409 if errors.Is(err, sql.ErrNoRows) { 410 return 0, false, nil 411 } 412 if err != nil { 413 return 0, false, err 414 } 415 return Cursor(raw), true, nil 416} 417 418func recordCursor(e db.Execer, scope string, subject syntax.DID, cursor Cursor) error { 419 _, err := e.Exec( 420 `insert into knotacl_delta_cursor (scope_key, subject, cursor) values (?, ?, ?) 421 on conflict(scope_key, subject) do update set cursor = excluded.cursor`, 422 scope, subject.String(), int64(cursor), 423 ) 424 return err 425} 426 427func pruneCursors(e db.Execer, scope string, retentionNanos int64) error { 428 _, err := e.Exec( 429 `delete from knotacl_delta_cursor 430 where scope_key = ? 431 and cursor < (select max(cursor) from knotacl_delta_cursor where scope_key = ?) - ?`, 432 scope, scope, retentionNanos, 433 ) 434 return err 435}