Monorepo for Tangled
tangled.org
1package migration
2
3import (
4 "context"
5 "errors"
6 "io"
7 "log/slog"
8 "path/filepath"
9 "sync/atomic"
10 "testing"
11
12 "github.com/bluesky-social/indigo/atproto/atclient"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14
15 "tangled.org/core/appview/db"
16 "tangled.org/core/appview/models"
17 "tangled.org/core/appview/oauth"
18)
19
20func newTestDB(t *testing.T) *db.DB {
21 t.Helper()
22 d, err := db.Make(context.Background(), filepath.Join(t.TempDir(), "test.db"))
23 if err != nil {
24 t.Fatalf("db.Make: %v", err)
25 }
26 t.Cleanup(func() { d.Close() })
27 return d
28}
29
30func seedMigration(t *testing.T, d *db.DB, mig *models.PDSMigration) {
31 t.Helper()
32 if err := db.EnqueuePdsRecordMigration(context.Background(), d, mig.Name, mig.Did, mig.Collection, mig.Rkey); err != nil {
33 t.Fatalf("EnqueuePdsRecordMigration: %v", err)
34 }
35}
36
37func fetch(t *testing.T, d *db.DB, did syntax.DID) *models.PDSMigration {
38 t.Helper()
39 rows, err := d.QueryContext(context.Background(),
40 `select name, did, collection, rkey, status, error_msg, retry_count, retry_after from pds_migration where did = ?`, did)
41 if err != nil {
42 t.Fatalf("query: %v", err)
43 }
44 defer rows.Close()
45 if !rows.Next() {
46 t.Fatalf("no row for did %s", did)
47 }
48 var m models.PDSMigration
49 if err := rows.Scan(&m.Name, &m.Did, &m.Collection, &m.Rkey, &m.Status, &m.ErrorMsg, &m.RetryCount, &m.RetryAfter); err != nil {
50 t.Fatalf("scan: %v", err)
51 }
52 return &m
53}
54
55func newTestMigration(t *testing.T, mig migrator, onPerm permAuthErrHandler) *Migration {
56 t.Helper()
57 return &Migration{
58 db: newTestDB(t),
59 logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
60 sem: make(chan struct{}, maxConcurrentMigrations),
61 migrators: map[string]migrator{"add-repo-did": mig},
62 onPermAuthErr: onPerm,
63 }
64}
65
66func newPDSMigration(did syntax.DID) *models.PDSMigration {
67 return &models.PDSMigration{
68 Name: "add-repo-did",
69 Did: did,
70 Collection: "sh.tangled.repo",
71 Rkey: "abc",
72 Status: models.PDSMigrationStatusRunning,
73 }
74}
75
76func TestMigrateInvalidGrantMarksFailed(t *testing.T) {
77 did := syntax.DID("did:plc:boltless")
78 var permCalled atomic.Int32
79 m := newTestMigration(t,
80 func(context.Context, *atclient.APIClient, syntax.DID, syntax.ATURI) error {
81 return errors.New("put record: failed to refresh OAuth tokens: token refresh failed: auth server request failed (HTTP 400): invalid_grant")
82 },
83 func(_ context.Context, _ syntax.DID, _ string, err error) bool {
84 permCalled.Add(1)
85 return oauth.IsPermanentAuthErr(err)
86 },
87 )
88 seedMigration(t, m.db, newPDSMigration(did))
89 pm := newPDSMigration(did)
90
91 if err := m.migrate(context.Background(), &atclient.APIClient{}, "sess1", pm); err != nil {
92 t.Fatalf("migrate: %v", err)
93 }
94
95 got := fetch(t, m.db, did)
96 if got.Status != models.PDSMigrationStatusFailed {
97 t.Fatalf("status = %s, want failed", got.Status)
98 }
99 if got.RetryAfter != 0 {
100 t.Fatalf("RetryAfter = %d, want 0", got.RetryAfter)
101 }
102 if permCalled.Load() != 1 {
103 t.Fatalf("onPermAuthErr called %d times, want 1", permCalled.Load())
104 }
105}
106
107func TestMigrateTransientErrorStaysPending(t *testing.T) {
108 did := syntax.DID("did:plc:akshay")
109 m := newTestMigration(t,
110 func(context.Context, *atclient.APIClient, syntax.DID, syntax.ATURI) error {
111 return errors.New("put record: failed to refresh OAuth tokens: token refresh failed (HTTP 429): rate_limited")
112 },
113 func(_ context.Context, _ syntax.DID, _ string, err error) bool {
114 return oauth.IsPermanentAuthErr(err)
115 },
116 )
117 seedMigration(t, m.db, newPDSMigration(did))
118 pm := newPDSMigration(did)
119
120 if err := m.migrate(context.Background(), &atclient.APIClient{}, "sess1", pm); err != nil {
121 t.Fatalf("migrate: %v", err)
122 }
123
124 got := fetch(t, m.db, did)
125 if got.Status != models.PDSMigrationStatusPending {
126 t.Fatalf("status = %s, want pending", got.Status)
127 }
128 if got.RetryCount != 1 {
129 t.Fatalf("RetryCount = %d, want 1", got.RetryCount)
130 }
131 if got.RetryAfter == 0 {
132 t.Fatalf("RetryAfter not scheduled")
133 }
134}
135
136func TestMigrateSuccess(t *testing.T) {
137 did := syntax.DID("did:plc:boltless")
138 m := newTestMigration(t,
139 func(context.Context, *atclient.APIClient, syntax.DID, syntax.ATURI) error { return nil },
140 func(context.Context, syntax.DID, string, error) bool { return false },
141 )
142 seedMigration(t, m.db, newPDSMigration(did))
143 pm := newPDSMigration(did)
144
145 if err := m.migrate(context.Background(), &atclient.APIClient{}, "sess1", pm); err != nil {
146 t.Fatalf("migrate: %v", err)
147 }
148
149 got := fetch(t, m.db, did)
150 if got.Status != models.PDSMigrationStatusDone {
151 t.Fatalf("status = %s, want done", got.Status)
152 }
153}
154
155func TestEnqueueResetsFailedToPending(t *testing.T) {
156 d := newTestDB(t)
157 did := syntax.DID("did:plc:boltless")
158 seed := newPDSMigration(did)
159 seedMigration(t, d, seed)
160
161 errMsg := "some prior failure"
162 failed := *seed
163 failed.Status = models.PDSMigrationStatusFailed
164 failed.RetryCount = 7
165 failed.ErrorMsg = &errMsg
166 if err := db.UpdatePdsRecordMigration(context.Background(), d, &failed); err != nil {
167 t.Fatalf("UpdatePdsRecordMigration: %v", err)
168 }
169
170 if err := db.EnqueuePdsRecordMigration(context.Background(), d, seed.Name, seed.Did, seed.Collection, seed.Rkey); err != nil {
171 t.Fatalf("re-enqueue: %v", err)
172 }
173
174 got := fetch(t, d, did)
175 if got.Status != models.PDSMigrationStatusPending {
176 t.Fatalf("status = %s, want pending", got.Status)
177 }
178 if got.RetryCount != 0 {
179 t.Fatalf("RetryCount = %d, want 0", got.RetryCount)
180 }
181 if got.ErrorMsg != nil {
182 t.Fatalf("ErrorMsg = %v, want nil", got.ErrorMsg)
183 }
184}
185
186func TestReapStaleRunning(t *testing.T) {
187 d := newTestDB(t)
188 did := syntax.DID("did:plc:akshay")
189 seed := newPDSMigration(did)
190 seedMigration(t, d, seed)
191 running := *seed
192 running.Status = models.PDSMigrationStatusRunning
193 if err := db.UpdatePdsRecordMigration(context.Background(), d, &running); err != nil {
194 t.Fatalf("update: %v", err)
195 }
196
197 if err := db.ReapStaleRunningMigrations(context.Background(), d); err != nil {
198 t.Fatalf("reap: %v", err)
199 }
200
201 got := fetch(t, d, did)
202 if got.Status != models.PDSMigrationStatusPending {
203 t.Fatalf("status = %s, want pending", got.Status)
204 }
205}