Monorepo for Tangled
tangled.org
1package migration
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "net/http"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/atproto/atclient"
12 "github.com/bluesky-social/indigo/atproto/identity"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14
15 "tangled.org/core/appview/db"
16 "tangled.org/core/appview/models"
17 "tangled.org/core/appview/oauth"
18)
19
20type Migration struct {
21 db *db.DB
22 oauth *oauth.OAuth
23 dir identity.Directory
24 logger *slog.Logger
25}
26
27func NewMigration(db *db.DB, oauth *oauth.OAuth, dir identity.Directory, logger *slog.Logger) *Migration {
28 return &Migration{
29 db, oauth, dir, logger,
30 }
31}
32
33func (s *Migration) BackgroundMigrationMiddleware(next http.Handler) http.Handler {
34 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
35 defer next.ServeHTTP(w, r)
36
37 client, err := s.oauth.AuthorizedClient(r)
38 if err != nil {
39 return
40 }
41 if client.AccountDID == nil {
42 return
43 }
44
45 go s.runPendingMigrations(context.Background(), *client.AccountDID, client)
46 })
47}
48
49func (s *Migration) runPendingMigrations(ctx context.Context, did syntax.DID, client *atclient.APIClient) {
50 l := s.logger.With("did", did)
51 migrations, err := db.ListPendingPdsRecordMigrations(ctx, s.db, did)
52 if err != nil {
53 l.Error("failed to query pending migrations", "err", err)
54 return
55 }
56
57 for _, migration := range migrations {
58 if err := s.migrate(ctx, client, migration); err != nil {
59 l.Error("migration failed", "err", err)
60 }
61 }
62}
63
64func (s *Migration) migrate(ctx context.Context, client *atclient.APIClient, migration *models.PDSMigration) error {
65 l := s.logger.With(
66 "name", migration.Name,
67 "aturi", migration.RecordAtUri(),
68 )
69
70 var err error
71 switch migration.Name {
72 case "add-repo-did":
73 err = s.migrateAddRepoDid(ctx, client, migration.Did, migration.RecordAtUri())
74 default:
75 return fmt.Errorf("unexpected migration name %s", migration.Name)
76 }
77
78 if err == nil {
79 l.Info("migrated")
80 migration.Status = models.PDSMigrationStatusDone
81 } else {
82 l.Warn("failed to migrate", "err", err)
83
84 errMsg := err.Error()
85 var retryCount = migration.RetryCount + 1
86 var retryAfter = time.Now().Add(3 * time.Second).Unix()
87
88 // remove null bytes
89 errMsg = strings.ReplaceAll(errMsg, "\x00", "")
90
91 migration.Status = models.PDSMigrationStatusPending
92 migration.ErrorMsg = &errMsg
93 migration.RetryCount = retryCount
94 migration.RetryAfter = retryAfter
95 }
96 if err := db.UpdatePdsRecordMigration(ctx, s.db, migration); err != nil {
97 return fmt.Errorf("failed to update migration status: %w", err)
98 }
99 return nil
100}