Monorepo for Tangled
0

Configure Feed

Select the types of activity you want to include in your feed.

appview: smarter pds rewrites per req

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
committer
Tangled
date (May 12, 2026, 9:38 PM +0300) commit ba18ec20 parent 21212008 change-id lksulkuu
+79 -12
+18
appview/db/migration.go
··· 58 58 return migrations, nil 59 59 } 60 60 61 + func HasPendingPdsRecordMigration(ctx context.Context, e Execer, user syntax.DID) (bool, error) { 62 + var exists bool 63 + err := e.QueryRowContext(ctx, 64 + `select exists( 65 + select 1 from pds_migration 66 + where did = ? 67 + and status = 'pending' 68 + and retry_after < ? 69 + )`, 70 + user, 71 + time.Now().Unix(), 72 + ).Scan(&exists) 73 + if err != nil { 74 + return false, err 75 + } 76 + return exists, nil 77 + } 78 + 61 79 func EnqueuePdsRecordMigration(ctx context.Context, e Execer, name string, did syntax.DID, collection syntax.NSID, rkey syntax.RecordKey) error { 62 80 _, err := e.ExecContext(ctx, 63 81 `insert into pds_migration (name, did, collection, rkey)
+42 -9
appview/migration/migration.go
··· 6 6 "log/slog" 7 7 "net/http" 8 8 "strings" 9 + "sync" 9 10 "time" 10 11 11 12 "github.com/bluesky-social/indigo/atproto/atclient" ··· 17 18 "tangled.org/core/appview/oauth" 18 19 ) 19 20 21 + const maxConcurrentMigrations = 8 22 + 20 23 type Migration struct { 21 - db *db.DB 22 - oauth *oauth.OAuth 23 - dir identity.Directory 24 - logger *slog.Logger 24 + db *db.DB 25 + oauth *oauth.OAuth 26 + dir identity.Directory 27 + logger *slog.Logger 28 + inflight sync.Map 29 + sem chan struct{} 25 30 } 26 31 27 32 func NewMigration(db *db.DB, oauth *oauth.OAuth, dir identity.Directory, logger *slog.Logger) *Migration { 28 33 return &Migration{ 29 - db, oauth, dir, logger, 34 + db: db, 35 + oauth: oauth, 36 + dir: dir, 37 + logger: logger, 38 + sem: make(chan struct{}, maxConcurrentMigrations), 30 39 } 31 40 } 32 41 ··· 34 43 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 35 44 defer next.ServeHTTP(w, r) 36 45 37 - client, err := s.oauth.AuthorizedClient(r) 38 - if err != nil { 46 + did := s.oauth.GetDidFromCookie(r) 47 + if did == "" { 48 + return 49 + } 50 + 51 + hasPending, err := db.HasPendingPdsRecordMigration(r.Context(), s.db, did) 52 + if err != nil || !hasPending { 53 + return 54 + } 55 + 56 + if _, loaded := s.inflight.LoadOrStore(did, struct{}{}); loaded { 57 + return 58 + } 59 + 60 + select { 61 + case s.sem <- struct{}{}: 62 + default: 63 + s.inflight.Delete(did) 39 64 return 40 65 } 41 - if client.AccountDID == nil { 66 + 67 + client, err := s.oauth.AuthorizedClient(r) 68 + if err != nil || client.AccountDID == nil { 69 + <-s.sem 70 + s.inflight.Delete(did) 42 71 return 43 72 } 44 73 45 - go s.runPendingMigrations(context.Background(), *client.AccountDID, client) 74 + go func() { 75 + defer s.inflight.Delete(did) 76 + defer func() { <-s.sem }() 77 + s.runPendingMigrations(context.Background(), *client.AccountDID, client) 78 + }() 46 79 }) 47 80 } 48 81
+16
appview/oauth/oauth.go
··· 245 245 return "" 246 246 } 247 247 248 + func (o *OAuth) GetDidFromCookie(r *http.Request) syntax.DID { 249 + userSession, err := o.SessStore.Get(r, SessionName) 250 + if err != nil || userSession.IsNew { 251 + return "" 252 + } 253 + d, ok := userSession.Values[SessionDid].(string) 254 + if !ok { 255 + return "" 256 + } 257 + parsed, err := syntax.ParseDID(d) 258 + if err != nil { 259 + return "" 260 + } 261 + return parsed 262 + } 263 + 248 264 func (o *OAuth) AuthorizedClient(r *http.Request) (*atclient.APIClient, error) { 249 265 session, err := o.ResumeSession(r) 250 266 if err != nil {
+3 -3
appview/state/router.go
··· 13 13 "tangled.org/core/appview/labels" 14 14 "tangled.org/core/appview/metrics" 15 15 "tangled.org/core/appview/middleware" 16 - // "tangled.org/core/appview/migration" 16 + "tangled.org/core/appview/migration" 17 17 "tangled.org/core/appview/notifications" 18 18 "tangled.org/core/appview/pipelines" 19 19 "tangled.org/core/appview/pulls" ··· 41 41 42 42 router.Use(metrics.Middleware) 43 43 44 - // m := migration.NewMigration(s.db, s.oauth, s.idResolver.Directory(), s.logger) 45 - // router.Use(m.BackgroundMigrationMiddleware) 44 + m := migration.NewMigration(s.db, s.oauth, s.idResolver.Directory(), s.logger) 45 + router.Use(m.BackgroundMigrationMiddleware) 46 46 47 47 router.Get("/pwa-manifest.json", s.WebAppManifest) 48 48 router.Get("/robots.txt", s.RobotsTxt)