Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

appview: oauth session cache, backoff on stuck migrations

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
committer
Tangled
date (May 13, 2026, 4:51 PM +0300) commit 1212e6d1 parent 69b60c66 change-id kxlornww
+634 -50
+13 -1
appview/db/migration.go
··· 79 79 func EnqueuePdsRecordMigration(ctx context.Context, e Execer, name string, did syntax.DID, collection syntax.NSID, rkey syntax.RecordKey) error { 80 80 _, err := e.ExecContext(ctx, 81 81 `insert into pds_migration (name, did, collection, rkey) 82 - values (?, ?, ?, ?)`, 82 + values (?, ?, ?, ?) 83 + on conflict(name, did, collection, rkey) do update set 84 + status = case when pds_migration.status = 'failed' then 'pending' else pds_migration.status end, 85 + retry_count = case when pds_migration.status = 'failed' then 0 else pds_migration.retry_count end, 86 + retry_after = case when pds_migration.status = 'failed' then 0 else pds_migration.retry_after end, 87 + error_msg = case when pds_migration.status = 'failed' then null else pds_migration.error_msg end`, 83 88 name, did, collection, rkey, 89 + ) 90 + return err 91 + } 92 + 93 + func ReapStaleRunningMigrations(ctx context.Context, e Execer) error { 94 + _, err := e.ExecContext(ctx, 95 + `update pds_migration set status = 'pending' where status = 'running'`, 84 96 ) 85 97 return err 86 98 }
+45 -31
appview/migration/migration.go
··· 20 20 21 21 const maxConcurrentMigrations = 8 22 22 23 + type migrator func(ctx context.Context, client *atclient.APIClient, did syntax.DID, aturi syntax.ATURI) error 24 + 25 + type permAuthErrHandler func(ctx context.Context, did syntax.DID, sessId string, err error) bool 26 + 23 27 type Migration struct { 24 - db *db.DB 25 - oauth *oauth.OAuth 26 - dir identity.Directory 27 - logger *slog.Logger 28 - inflight sync.Map 29 - sem chan struct{} 28 + db *db.DB 29 + oauth *oauth.OAuth 30 + dir identity.Directory 31 + logger *slog.Logger 32 + inflight sync.Map 33 + sem chan struct{} 34 + migrators map[string]migrator 35 + onPermAuthErr permAuthErrHandler 30 36 } 31 37 32 38 func NewMigration(db *db.DB, oauth *oauth.OAuth, dir identity.Directory, logger *slog.Logger) *Migration { 33 - return &Migration{ 34 - db: db, 35 - oauth: oauth, 36 - dir: dir, 37 - logger: logger, 38 - sem: make(chan struct{}, maxConcurrentMigrations), 39 + m := &Migration{ 40 + db: db, 41 + oauth: oauth, 42 + dir: dir, 43 + logger: logger, 44 + sem: make(chan struct{}, maxConcurrentMigrations), 45 + onPermAuthErr: oauth.HandlePermanentAuthErr, 39 46 } 47 + m.migrators = map[string]migrator{ 48 + "add-repo-did": m.migrateAddRepoDid, 49 + } 50 + return m 40 51 } 41 52 42 53 func (s *Migration) BackgroundMigrationMiddleware(next http.Handler) http.Handler { ··· 64 75 return 65 76 } 66 77 78 + sessId := s.oauth.GetSessIdFromCookie(r) 67 79 client, err := s.oauth.AuthorizedClient(r) 68 80 if err != nil || client.AccountDID == nil { 69 81 <-s.sem ··· 74 86 go func() { 75 87 defer s.inflight.Delete(did) 76 88 defer func() { <-s.sem }() 77 - s.runPendingMigrations(context.Background(), *client.AccountDID, client) 89 + s.runPendingMigrations(context.Background(), *client.AccountDID, sessId, client) 78 90 }() 79 91 }) 80 92 } 81 93 82 - func (s *Migration) runPendingMigrations(ctx context.Context, did syntax.DID, client *atclient.APIClient) { 94 + func (s *Migration) runPendingMigrations(ctx context.Context, did syntax.DID, sessId string, client *atclient.APIClient) { 83 95 l := s.logger.With("did", did) 84 96 migrations, err := db.ListPendingPdsRecordMigrations(ctx, s.db, did) 85 97 if err != nil { ··· 88 100 } 89 101 90 102 for _, migration := range migrations { 91 - if err := s.migrate(ctx, client, migration); err != nil { 103 + if err := s.migrate(ctx, client, sessId, migration); err != nil { 92 104 l.Error("migration failed", "err", err) 93 105 } 94 106 } 95 107 } 96 108 97 - func (s *Migration) migrate(ctx context.Context, client *atclient.APIClient, migration *models.PDSMigration) error { 109 + func (s *Migration) migrate(ctx context.Context, client *atclient.APIClient, sessId string, migration *models.PDSMigration) error { 98 110 l := s.logger.With( 99 111 "name", migration.Name, 100 112 "aturi", migration.RecordAtUri(), 101 113 ) 102 114 103 - var err error 104 - switch migration.Name { 105 - case "add-repo-did": 106 - err = s.migrateAddRepoDid(ctx, client, migration.Did, migration.RecordAtUri()) 107 - default: 115 + mig, ok := s.migrators[migration.Name] 116 + if !ok { 108 117 return fmt.Errorf("unexpected migration name %s", migration.Name) 109 118 } 119 + err := mig(ctx, client, migration.Did, migration.RecordAtUri()) 110 120 111 121 if err == nil { 112 122 l.Info("migrated") ··· 114 124 } else { 115 125 l.Warn("failed to migrate", "err", err) 116 126 117 - errMsg := err.Error() 118 - var retryCount = migration.RetryCount + 1 119 - var retryAfter = time.Now().Add(3 * time.Second).Unix() 120 - 121 - // remove null bytes 122 - errMsg = strings.ReplaceAll(errMsg, "\x00", "") 127 + errMsg := strings.ReplaceAll(err.Error(), "\x00", "") 128 + migration.ErrorMsg = &errMsg 129 + migration.RetryCount++ 123 130 124 - migration.Status = models.PDSMigrationStatusPending 125 - migration.ErrorMsg = &errMsg 126 - migration.RetryCount = retryCount 127 - migration.RetryAfter = retryAfter 131 + if s.onPermAuthErr(ctx, migration.Did, sessId, err) { 132 + migration.Status = models.PDSMigrationStatusFailed 133 + migration.RetryAfter = 0 134 + } else { 135 + migration.Status = models.PDSMigrationStatusPending 136 + migration.RetryAfter = time.Now().Add(retryBackoff(migration.RetryCount)).Unix() 137 + } 128 138 } 129 139 if err := db.UpdatePdsRecordMigration(ctx, s.db, migration); err != nil { 130 140 return fmt.Errorf("failed to update migration status: %w", err) 131 141 } 132 142 return nil 133 143 } 144 + 145 + func retryBackoff(retries int) time.Duration { 146 + return min(time.Duration(retries)*5*time.Second, time.Hour) 147 + }
+205
appview/migration/migration_test.go
··· 1 + package migration 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "io" 7 + "log/slog" 8 + "path/filepath" 9 + "sync/atomic" 10 + "testing" 11 + 12 + "github.com/bluesky-social/indigo/atproto/atclient" 13 + "github.com/bluesky-social/indigo/atproto/syntax" 14 + 15 + "tangled.org/core/appview/db" 16 + "tangled.org/core/appview/models" 17 + "tangled.org/core/appview/oauth" 18 + ) 19 + 20 + func newTestDB(t *testing.T) *db.DB { 21 + t.Helper() 22 + d, err := db.Make(context.Background(), filepath.Join(t.TempDir(), "test.db")) 23 + if err != nil { 24 + t.Fatalf("db.Make: %v", err) 25 + } 26 + t.Cleanup(func() { d.Close() }) 27 + return d 28 + } 29 + 30 + func seedMigration(t *testing.T, d *db.DB, mig *models.PDSMigration) { 31 + t.Helper() 32 + if err := db.EnqueuePdsRecordMigration(context.Background(), d, mig.Name, mig.Did, mig.Collection, mig.Rkey); err != nil { 33 + t.Fatalf("EnqueuePdsRecordMigration: %v", err) 34 + } 35 + } 36 + 37 + func fetch(t *testing.T, d *db.DB, did syntax.DID) *models.PDSMigration { 38 + t.Helper() 39 + rows, err := d.QueryContext(context.Background(), 40 + `select name, did, collection, rkey, status, error_msg, retry_count, retry_after from pds_migration where did = ?`, did) 41 + if err != nil { 42 + t.Fatalf("query: %v", err) 43 + } 44 + defer rows.Close() 45 + if !rows.Next() { 46 + t.Fatalf("no row for did %s", did) 47 + } 48 + var m models.PDSMigration 49 + if err := rows.Scan(&m.Name, &m.Did, &m.Collection, &m.Rkey, &m.Status, &m.ErrorMsg, &m.RetryCount, &m.RetryAfter); err != nil { 50 + t.Fatalf("scan: %v", err) 51 + } 52 + return &m 53 + } 54 + 55 + func newTestMigration(t *testing.T, mig migrator, onPerm permAuthErrHandler) *Migration { 56 + t.Helper() 57 + return &Migration{ 58 + db: newTestDB(t), 59 + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 60 + sem: make(chan struct{}, maxConcurrentMigrations), 61 + migrators: map[string]migrator{"add-repo-did": mig}, 62 + onPermAuthErr: onPerm, 63 + } 64 + } 65 + 66 + func newPDSMigration(did syntax.DID) *models.PDSMigration { 67 + return &models.PDSMigration{ 68 + Name: "add-repo-did", 69 + Did: did, 70 + Collection: "sh.tangled.repo", 71 + Rkey: "abc", 72 + Status: models.PDSMigrationStatusRunning, 73 + } 74 + } 75 + 76 + func TestMigrateInvalidGrantMarksFailed(t *testing.T) { 77 + did := syntax.DID("did:plc:boltless") 78 + var permCalled atomic.Int32 79 + m := newTestMigration(t, 80 + func(context.Context, *atclient.APIClient, syntax.DID, syntax.ATURI) error { 81 + return errors.New("put record: failed to refresh OAuth tokens: token refresh failed: auth server request failed (HTTP 400): invalid_grant") 82 + }, 83 + func(_ context.Context, _ syntax.DID, _ string, err error) bool { 84 + permCalled.Add(1) 85 + return oauth.IsPermanentAuthErr(err) 86 + }, 87 + ) 88 + seedMigration(t, m.db, newPDSMigration(did)) 89 + pm := newPDSMigration(did) 90 + 91 + if err := m.migrate(context.Background(), &atclient.APIClient{}, "sess1", pm); err != nil { 92 + t.Fatalf("migrate: %v", err) 93 + } 94 + 95 + got := fetch(t, m.db, did) 96 + if got.Status != models.PDSMigrationStatusFailed { 97 + t.Fatalf("status = %s, want failed", got.Status) 98 + } 99 + if got.RetryAfter != 0 { 100 + t.Fatalf("RetryAfter = %d, want 0", got.RetryAfter) 101 + } 102 + if permCalled.Load() != 1 { 103 + t.Fatalf("onPermAuthErr called %d times, want 1", permCalled.Load()) 104 + } 105 + } 106 + 107 + func TestMigrateTransientErrorStaysPending(t *testing.T) { 108 + did := syntax.DID("did:plc:akshay") 109 + m := newTestMigration(t, 110 + func(context.Context, *atclient.APIClient, syntax.DID, syntax.ATURI) error { 111 + return errors.New("put record: failed to refresh OAuth tokens: token refresh failed (HTTP 429): rate_limited") 112 + }, 113 + func(_ context.Context, _ syntax.DID, _ string, err error) bool { 114 + return oauth.IsPermanentAuthErr(err) 115 + }, 116 + ) 117 + seedMigration(t, m.db, newPDSMigration(did)) 118 + pm := newPDSMigration(did) 119 + 120 + if err := m.migrate(context.Background(), &atclient.APIClient{}, "sess1", pm); err != nil { 121 + t.Fatalf("migrate: %v", err) 122 + } 123 + 124 + got := fetch(t, m.db, did) 125 + if got.Status != models.PDSMigrationStatusPending { 126 + t.Fatalf("status = %s, want pending", got.Status) 127 + } 128 + if got.RetryCount != 1 { 129 + t.Fatalf("RetryCount = %d, want 1", got.RetryCount) 130 + } 131 + if got.RetryAfter == 0 { 132 + t.Fatalf("RetryAfter not scheduled") 133 + } 134 + } 135 + 136 + func TestMigrateSuccess(t *testing.T) { 137 + did := syntax.DID("did:plc:boltless") 138 + m := newTestMigration(t, 139 + func(context.Context, *atclient.APIClient, syntax.DID, syntax.ATURI) error { return nil }, 140 + func(context.Context, syntax.DID, string, error) bool { return false }, 141 + ) 142 + seedMigration(t, m.db, newPDSMigration(did)) 143 + pm := newPDSMigration(did) 144 + 145 + if err := m.migrate(context.Background(), &atclient.APIClient{}, "sess1", pm); err != nil { 146 + t.Fatalf("migrate: %v", err) 147 + } 148 + 149 + got := fetch(t, m.db, did) 150 + if got.Status != models.PDSMigrationStatusDone { 151 + t.Fatalf("status = %s, want done", got.Status) 152 + } 153 + } 154 + 155 + func TestEnqueueResetsFailedToPending(t *testing.T) { 156 + d := newTestDB(t) 157 + did := syntax.DID("did:plc:boltless") 158 + seed := newPDSMigration(did) 159 + seedMigration(t, d, seed) 160 + 161 + errMsg := "some prior failure" 162 + failed := *seed 163 + failed.Status = models.PDSMigrationStatusFailed 164 + failed.RetryCount = 7 165 + failed.ErrorMsg = &errMsg 166 + if err := db.UpdatePdsRecordMigration(context.Background(), d, &failed); err != nil { 167 + t.Fatalf("UpdatePdsRecordMigration: %v", err) 168 + } 169 + 170 + if err := db.EnqueuePdsRecordMigration(context.Background(), d, seed.Name, seed.Did, seed.Collection, seed.Rkey); err != nil { 171 + t.Fatalf("re-enqueue: %v", err) 172 + } 173 + 174 + got := fetch(t, d, did) 175 + if got.Status != models.PDSMigrationStatusPending { 176 + t.Fatalf("status = %s, want pending", got.Status) 177 + } 178 + if got.RetryCount != 0 { 179 + t.Fatalf("RetryCount = %d, want 0", got.RetryCount) 180 + } 181 + if got.ErrorMsg != nil { 182 + t.Fatalf("ErrorMsg = %v, want nil", got.ErrorMsg) 183 + } 184 + } 185 + 186 + func TestReapStaleRunning(t *testing.T) { 187 + d := newTestDB(t) 188 + did := syntax.DID("did:plc:akshay") 189 + seed := newPDSMigration(did) 190 + seedMigration(t, d, seed) 191 + running := *seed 192 + running.Status = models.PDSMigrationStatusRunning 193 + if err := db.UpdatePdsRecordMigration(context.Background(), d, &running); err != nil { 194 + t.Fatalf("update: %v", err) 195 + } 196 + 197 + if err := db.ReapStaleRunningMigrations(context.Background(), d); err != nil { 198 + t.Fatalf("reap: %v", err) 199 + } 200 + 201 + got := fetch(t, d, did) 202 + if got.Status != models.PDSMigrationStatusPending { 203 + t.Fatalf("status = %s, want pending", got.Status) 204 + } 205 + }
+1
appview/models/migration.go
··· 30 30 PDSMigrationStatusPending PDSMigrationStatus = "pending" 31 31 PDSMigrationStatusRunning PDSMigrationStatus = "running" 32 32 PDSMigrationStatusDone PDSMigrationStatus = "done" 33 + PDSMigrationStatusFailed PDSMigrationStatus = "failed" 33 34 ) 34 35 35 36 func (m *PDSMigration) RecordAtUri() syntax.ATURI {
+198
appview/oauth/cache_test.go
··· 1 + package oauth 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "io" 7 + "log/slog" 8 + "sync" 9 + "sync/atomic" 10 + "testing" 11 + 12 + "github.com/bluesky-social/indigo/atproto/atcrypto" 13 + "github.com/bluesky-social/indigo/atproto/auth/oauth" 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + "github.com/hashicorp/golang-lru/v2/expirable" 16 + ) 17 + 18 + func discardLogger(t *testing.T) *slog.Logger { 19 + t.Helper() 20 + return slog.New(slog.NewTextHandler(io.Discard, nil)) 21 + } 22 + 23 + type stubStore struct { 24 + mu sync.Mutex 25 + data map[string]oauth.ClientSessionData 26 + getSessionCalls atomic.Int32 27 + deleteCalls atomic.Int32 28 + } 29 + 30 + func (s *stubStore) key(did syntax.DID, sessId string) string { 31 + return string(did) + ":" + sessId 32 + } 33 + 34 + func (s *stubStore) GetSession(_ context.Context, did syntax.DID, sessId string) (*oauth.ClientSessionData, error) { 35 + s.getSessionCalls.Add(1) 36 + s.mu.Lock() 37 + defer s.mu.Unlock() 38 + v, ok := s.data[s.key(did, sessId)] 39 + if !ok { 40 + return nil, errors.New("no such session") 41 + } 42 + clone := v 43 + return &clone, nil 44 + } 45 + 46 + func (s *stubStore) SaveSession(_ context.Context, sess oauth.ClientSessionData) error { 47 + s.mu.Lock() 48 + defer s.mu.Unlock() 49 + s.data[s.key(sess.AccountDID, sess.SessionID)] = sess 50 + return nil 51 + } 52 + 53 + func (s *stubStore) DeleteSession(_ context.Context, did syntax.DID, sessId string) error { 54 + s.deleteCalls.Add(1) 55 + s.mu.Lock() 56 + defer s.mu.Unlock() 57 + delete(s.data, s.key(did, sessId)) 58 + return nil 59 + } 60 + 61 + func (s *stubStore) GetAuthRequestInfo(context.Context, string) (*oauth.AuthRequestData, error) { 62 + return nil, errors.New("not used") 63 + } 64 + func (s *stubStore) SaveAuthRequestInfo(context.Context, oauth.AuthRequestData) error { 65 + return nil 66 + } 67 + func (s *stubStore) DeleteAuthRequestInfo(context.Context, string) error { return nil } 68 + 69 + func newTestOAuth(t *testing.T) (*OAuth, *stubStore) { 70 + t.Helper() 71 + priv, err := atcrypto.GeneratePrivateKeyP256() 72 + if err != nil { 73 + t.Fatalf("generate key: %v", err) 74 + } 75 + store := &stubStore{data: map[string]oauth.ClientSessionData{}} 76 + store.data[store.key("did:plc:boltless", "sess1")] = oauth.ClientSessionData{ 77 + AccountDID: "did:plc:boltless", 78 + SessionID: "sess1", 79 + HostURL: "https://pds.example", 80 + AuthServerURL: "https://pds.example", 81 + AuthServerTokenEndpoint: "https://pds.example/oauth/token", 82 + DPoPPrivateKeyMultibase: priv.Multibase(), 83 + } 84 + 85 + cfg := oauth.NewLocalhostConfig("http://127.0.0.1/cb", []string{"atproto"}) 86 + app := oauth.NewClientApp(&cfg, store) 87 + o := &OAuth{ 88 + ClientApp: app, 89 + Logger: discardLogger(t), 90 + sessionCache: expirable.NewLRU[string, *oauth.ClientSession](sessionCacheSize, nil, sessionCacheTTL), 91 + } 92 + return o, store 93 + } 94 + 95 + func TestResumeSessionSingleflightDedupes(t *testing.T) { 96 + o, store := newTestOAuth(t) 97 + 98 + const n = 32 99 + var wg sync.WaitGroup 100 + results := make([]*oauth.ClientSession, n) 101 + errs := make([]error, n) 102 + wg.Add(n) 103 + for i := range n { 104 + go func() { 105 + defer wg.Done() 106 + sess, err := o.resumeSession(context.Background(), "did:plc:boltless", "sess1") 107 + results[i] = sess 108 + errs[i] = err 109 + }() 110 + } 111 + wg.Wait() 112 + 113 + for i, err := range errs { 114 + if err != nil { 115 + t.Fatalf("goroutine %d: %v", i, err) 116 + } 117 + } 118 + first := results[0] 119 + if first == nil { 120 + t.Fatal("first session is nil") 121 + } 122 + for i, s := range results { 123 + if s != first { 124 + t.Fatalf("goroutine %d got different *ClientSession (%p vs %p)", i, s, first) 125 + } 126 + } 127 + calls := store.getSessionCalls.Load() 128 + if calls > 1 { 129 + t.Fatalf("GetSession called %d times, want 1", calls) 130 + } 131 + } 132 + 133 + func TestResumeSessionReuseAfterCache(t *testing.T) { 134 + o, store := newTestOAuth(t) 135 + 136 + a, err := o.resumeSession(context.Background(), "did:plc:boltless", "sess1") 137 + if err != nil { 138 + t.Fatalf("first: %v", err) 139 + } 140 + b, err := o.resumeSession(context.Background(), "did:plc:boltless", "sess1") 141 + if err != nil { 142 + t.Fatalf("second: %v", err) 143 + } 144 + if a != b { 145 + t.Fatalf("expected same pointer across cache hit") 146 + } 147 + if got := store.getSessionCalls.Load(); got != 1 { 148 + t.Fatalf("GetSession called %d times, want 1", got) 149 + } 150 + } 151 + 152 + func TestHandlePermanentAuthErrEvictsAndLogsOut(t *testing.T) { 153 + o, store := newTestOAuth(t) 154 + 155 + if _, err := o.resumeSession(context.Background(), "did:plc:boltless", "sess1"); err != nil { 156 + t.Fatalf("seed: %v", err) 157 + } 158 + if _, ok := o.sessionCache.Get(sessionCacheKey("did:plc:boltless", "sess1")); !ok { 159 + t.Fatal("cache missing after resume") 160 + } 161 + 162 + handled := o.HandlePermanentAuthErr( 163 + context.Background(), "did:plc:boltless", "sess1", 164 + errors.New("auth server request failed (HTTP 400): invalid_grant"), 165 + ) 166 + if !handled { 167 + t.Fatal("HandlePermanentAuthErr returned false") 168 + } 169 + if _, ok := o.sessionCache.Get(sessionCacheKey("did:plc:boltless", "sess1")); ok { 170 + t.Fatal("cache still holds entry after HandlePermanentAuthErr") 171 + } 172 + if got := store.deleteCalls.Load(); got != 1 { 173 + t.Fatalf("store.DeleteSession called %d times, want 1", got) 174 + } 175 + if _, ok := store.data[store.key("did:plc:boltless", "sess1")]; ok { 176 + t.Fatal("store still holds session after Logout") 177 + } 178 + } 179 + 180 + func TestHandlePermanentAuthErrIgnoresTransient(t *testing.T) { 181 + o, store := newTestOAuth(t) 182 + if _, err := o.resumeSession(context.Background(), "did:plc:boltless", "sess1"); err != nil { 183 + t.Fatalf("seed: %v", err) 184 + } 185 + handled := o.HandlePermanentAuthErr( 186 + context.Background(), "did:plc:boltless", "sess1", 187 + errors.New("token refresh failed (HTTP 429): rate_limited"), 188 + ) 189 + if handled { 190 + t.Fatal("HandlePermanentAuthErr matched a transient error") 191 + } 192 + if _, ok := o.sessionCache.Get(sessionCacheKey("did:plc:boltless", "sess1")); !ok { 193 + t.Fatal("transient error evicted cache") 194 + } 195 + if got := store.deleteCalls.Load(); got != 0 { 196 + t.Fatalf("store.DeleteSession called %d times, want 0", got) 197 + } 198 + }
+22
appview/oauth/errors.go
··· 1 + package oauth 2 + 3 + import "regexp" 4 + 5 + var ( 6 + permanentAuthErrorRe = regexp.MustCompile(`\b(invalid_grant|invalid_client|unauthorized_client)\b`) 7 + staleAccessTokenErrRe = regexp.MustCompile(`HTTP 401\b.*(AuthenticationRequired|Invalid OAuth access token|invalid_token)`) 8 + ) 9 + 10 + func IsPermanentAuthErr(err error) bool { 11 + if err == nil { 12 + return false 13 + } 14 + return permanentAuthErrorRe.MatchString(err.Error()) 15 + } 16 + 17 + func IsStaleAccessTokenErr(err error) bool { 18 + if err == nil { 19 + return false 20 + } 21 + return staleAccessTokenErrRe.MatchString(err.Error()) 22 + }
+58
appview/oauth/errors_test.go
··· 1 + package oauth 2 + 3 + import ( 4 + "errors" 5 + "fmt" 6 + "testing" 7 + ) 8 + 9 + func TestIsPermanentAuthErr(t *testing.T) { 10 + cases := []struct { 11 + name string 12 + err error 13 + want bool 14 + }{ 15 + {"nil", nil, false}, 16 + {"empty", errors.New(""), false}, 17 + {"random", errors.New("network unreachable"), false}, 18 + {"rate limited", errors.New("token refresh failed (HTTP 429): rate_limited"), false}, 19 + {"invalid grant direct", errors.New("token refresh failed (HTTP 400): invalid_grant"), true}, 20 + {"invalid grant wrapped", fmt.Errorf("put record: %w", errors.New("failed to refresh OAuth tokens: token refresh failed: auth server request failed (HTTP 400): invalid_grant")), true}, 21 + {"invalid client", errors.New("auth server request failed (HTTP 401): invalid_client"), true}, 22 + {"unauthorized client", errors.New("token refresh failed (HTTP 400): unauthorized_client"), true}, 23 + {"substring trap", errors.New("our invalid_grant_alternative ran out"), false}, 24 + {"case-sensitive", errors.New("INVALID_GRANT"), false}, 25 + } 26 + for _, c := range cases { 27 + t.Run(c.name, func(t *testing.T) { 28 + got := IsPermanentAuthErr(c.err) 29 + if got != c.want { 30 + t.Fatalf("got %v want %v", got, c.want) 31 + } 32 + }) 33 + } 34 + } 35 + 36 + func TestIsStaleAccessTokenErr(t *testing.T) { 37 + cases := []struct { 38 + name string 39 + err error 40 + want bool 41 + }{ 42 + {"nil", nil, false}, 43 + {"random", errors.New("hello"), false}, 44 + {"500", errors.New("API request failed (HTTP 500): InternalError"), false}, 45 + {"401 auth required", errors.New("API request failed (HTTP 401): AuthenticationRequired: Invalid OAuth access token"), true}, 46 + {"401 invalid token", errors.New("API request failed (HTTP 401): invalid_token"), true}, 47 + {"401 wrapped", fmt.Errorf("put record: %w", errors.New("API request failed (HTTP 401): AuthenticationRequired")), true}, 48 + {"403 forbidden", errors.New("API request failed (HTTP 403): Forbidden"), false}, 49 + } 50 + for _, c := range cases { 51 + t.Run(c.name, func(t *testing.T) { 52 + got := IsStaleAccessTokenErr(c.err) 53 + if got != c.want { 54 + t.Fatalf("got %v want %v", got, c.want) 55 + } 56 + }) 57 + } 58 + }
+1 -1
appview/oauth/handler.go
··· 238 238 239 239 l.Debug("creating empty Tangled profile") 240 240 241 - sess, err := o.ClientApp.ResumeSession(ctx, sessData.AccountDID, sessData.SessionID) 241 + sess, err := o.resumeSession(ctx, sessData.AccountDID, sessData.SessionID) 242 242 if err != nil { 243 243 l.Error("failed to resume session for profile creation", "err", err) 244 244 return
+82 -13
appview/oauth/oauth.go
··· 17 17 "github.com/bluesky-social/indigo/atproto/syntax" 18 18 xrpc "github.com/bluesky-social/indigo/xrpc" 19 19 "github.com/gorilla/sessions" 20 + "github.com/hashicorp/golang-lru/v2/expirable" 20 21 "github.com/posthog/posthog-go" 22 + "golang.org/x/sync/singleflight" 21 23 "tangled.org/core/appview/config" 22 24 "tangled.org/core/appview/db" 23 25 "tangled.org/core/idresolver" 24 26 "tangled.org/core/rbac" 25 27 ) 26 28 29 + const ( 30 + sessionCacheSize = 10000 31 + sessionCacheTTL = time.Hour 32 + ) 33 + 27 34 type OAuth struct { 28 35 ClientApp *oauth.ClientApp 29 36 SessStore *sessions.CookieStore ··· 39 46 40 47 appPasswordSession *AppPasswordSession 41 48 appPasswordSessionMu sync.Mutex 49 + 50 + sessionCache *expirable.LRU[string, *oauth.ClientSession] 51 + sessionSF singleflight.Group 52 + } 53 + 54 + func sessionCacheKey(did syntax.DID, sessionId string) string { 55 + return string(did) + ":" + sessionId 56 + } 57 + 58 + func (o *OAuth) resumeSession(ctx context.Context, did syntax.DID, sessionId string) (*oauth.ClientSession, error) { 59 + key := sessionCacheKey(did, sessionId) 60 + if v, ok := o.sessionCache.Get(key); ok { 61 + return v, nil 62 + } 63 + v, err, _ := o.sessionSF.Do(key, func() (any, error) { 64 + if v, ok := o.sessionCache.Get(key); ok { 65 + return v, nil 66 + } 67 + sess, err := o.ClientApp.ResumeSession(ctx, did, sessionId) 68 + if err != nil { 69 + return nil, err 70 + } 71 + o.sessionCache.Add(key, sess) 72 + return sess, nil 73 + }) 74 + if err != nil { 75 + return nil, err 76 + } 77 + return v.(*oauth.ClientSession), nil 78 + } 79 + 80 + func (o *OAuth) EvictSession(did syntax.DID, sessionId string) { 81 + o.sessionCache.Remove(sessionCacheKey(did, sessionId)) 82 + } 83 + 84 + func (o *OAuth) HandlePermanentAuthErr(ctx context.Context, did syntax.DID, sessionId string, err error) bool { 85 + if !IsPermanentAuthErr(err) { 86 + return false 87 + } 88 + o.EvictSession(did, sessionId) 89 + if logoutErr := o.ClientApp.Logout(ctx, did, sessionId); logoutErr != nil { 90 + o.Logger.Warn("store logout after permanent auth error failed", "did", did, "err", logoutErr) 91 + } 92 + return true 42 93 } 43 94 44 95 func New(config *config.Config, ph posthog.Client, db *db.DB, enforcer *rbac.Enforcer, res *idresolver.Resolver, logger *slog.Logger) (*OAuth, error) { ··· 89 140 90 141 logger.Info("oauth setup successfully", "IsConfidential", clientApp.Config.IsConfidential()) 91 142 return &OAuth{ 92 - ClientApp: clientApp, 93 - Config: config, 94 - SessStore: sessStore, 95 - JwksUri: jwksUri, 96 - ClientName: clientName, 97 - ClientUri: clientUri, 98 - Posthog: ph, 99 - Db: db, 100 - Enforcer: enforcer, 101 - IdResolver: res, 102 - Logger: logger, 143 + ClientApp: clientApp, 144 + Config: config, 145 + SessStore: sessStore, 146 + JwksUri: jwksUri, 147 + ClientName: clientName, 148 + ClientUri: clientUri, 149 + Posthog: ph, 150 + Db: db, 151 + Enforcer: enforcer, 152 + IdResolver: res, 153 + Logger: logger, 154 + sessionCache: expirable.NewLRU[string, *oauth.ClientSession](sessionCacheSize, nil, sessionCacheTTL), 103 155 }, nil 104 156 } 105 157 ··· 148 200 149 201 sessId := userSession.Values[SessionId].(string) 150 202 151 - clientSess, err := o.ClientApp.ResumeSession(r.Context(), sessDid, sessId) 203 + clientSess, err := o.resumeSession(r.Context(), sessDid, sessId) 152 204 if err != nil { 153 205 return nil, fmt.Errorf("failed to resume session: %w", err) 154 206 } ··· 172 224 } 173 225 174 226 sessId := userSession.Values[SessionId].(string) 227 + 228 + o.EvictSession(sessDid, sessId) 175 229 176 230 // delete the session 177 231 err1 := o.ClientApp.Logout(r.Context(), sessDid, sessId) 178 232 if err1 != nil { 179 233 err1 = fmt.Errorf("failed to logout: %w", err1) 180 234 } 235 + o.EvictSession(sessDid, sessId) 181 236 182 237 // remove the cookie 183 238 userSession.Options.MaxAge = -1 ··· 201 256 return fmt.Errorf("invalid DID: %w", err) 202 257 } 203 258 204 - sess, err := o.ClientApp.ResumeSession(r.Context(), did, account.SessionId) 259 + sess, err := o.resumeSession(r.Context(), did, account.SessionId) 205 260 if err != nil { 206 261 registry.RemoveAccount(targetDid) 207 262 _ = o.saveAccounts(w, r, registry) ··· 230 285 231 286 did, err := syntax.ParseDID(targetDid) 232 287 if err == nil { 288 + o.EvictSession(did, account.SessionId) 233 289 _ = o.ClientApp.Logout(r.Context(), did, account.SessionId) 290 + o.EvictSession(did, account.SessionId) 234 291 } 235 292 236 293 registry.RemoveAccount(targetDid) ··· 259 316 return "" 260 317 } 261 318 return parsed 319 + } 320 + 321 + func (o *OAuth) GetSessIdFromCookie(r *http.Request) string { 322 + userSession, err := o.SessStore.Get(r, SessionName) 323 + if err != nil || userSession.IsNew { 324 + return "" 325 + } 326 + s, ok := userSession.Values[SessionId].(string) 327 + if !ok { 328 + return "" 329 + } 330 + return s 262 331 } 263 332 264 333 func (o *OAuth) AuthorizedClient(r *http.Request) (*atclient.APIClient, error) {
+8 -3
appview/state/router.go
··· 1 1 package state 2 2 3 3 import ( 4 + "context" 4 5 "database/sql" 5 6 "errors" 6 7 "net/http" ··· 13 14 "tangled.org/core/appview/labels" 14 15 "tangled.org/core/appview/metrics" 15 16 "tangled.org/core/appview/middleware" 16 - "tangled.org/core/appview/migration" 17 + // "tangled.org/core/appview/migration" 17 18 "tangled.org/core/appview/notifications" 18 19 "tangled.org/core/appview/pipelines" 19 20 "tangled.org/core/appview/pulls" ··· 41 42 42 43 router.Use(metrics.Middleware) 43 44 44 - m := migration.NewMigration(s.db, s.oauth, s.idResolver.Directory(), s.logger) 45 - router.Use(m.BackgroundMigrationMiddleware) 45 + if err := db.ReapStaleRunningMigrations(context.Background(), s.db); err != nil { 46 + s.logger.Warn("failed to reap stale running migrations", "err", err) 47 + } 48 + // PDS record migrator disabled while we isolate OAuth refresh behaviour. 49 + // m := migration.NewMigration(s.db, s.oauth, s.idResolver.Directory(), s.logger) 50 + // router.Use(m.BackgroundMigrationMiddleware) 46 51 47 52 router.Get("/pwa-manifest.json", s.WebAppManifest) 48 53 router.Get("/robots.txt", s.RobotsTxt)
+1 -1
go.mod
··· 34 34 github.com/gorilla/feeds v1.2.0 35 35 github.com/gorilla/sessions v1.4.0 36 36 github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 37 + github.com/hashicorp/golang-lru/v2 v2.0.7 37 38 github.com/hiddeco/sshsig v0.2.0 38 39 github.com/hpcloud/tail v1.0.0 39 40 github.com/ipfs/go-cid v0.6.0 ··· 159 160 github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 // indirect 160 161 github.com/hashicorp/go-sockaddr v1.0.7 // indirect 161 162 github.com/hashicorp/golang-lru v1.0.2 // indirect 162 - github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect 163 163 github.com/hashicorp/hcl v1.0.1-vault-7 // indirect 164 164 github.com/hexops/gotextdiff v1.0.3 // indirect 165 165 github.com/ipfs/bbloom v0.0.4 // indirect