Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

at icy/yovxsu 6.1 kB View raw
1package migration 2 3import ( 4 "context" 5 "errors" 6 "io" 7 "log/slog" 8 "path/filepath" 9 "sync/atomic" 10 "testing" 11 12 "github.com/bluesky-social/indigo/atproto/atclient" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 15 "tangled.org/core/appview/db" 16 "tangled.org/core/appview/models" 17 "tangled.org/core/appview/oauth" 18) 19 20func newTestDB(t *testing.T) *db.DB { 21 t.Helper() 22 d, err := db.Make(context.Background(), filepath.Join(t.TempDir(), "test.db")) 23 if err != nil { 24 t.Fatalf("db.Make: %v", err) 25 } 26 t.Cleanup(func() { d.Close() }) 27 return d 28} 29 30func seedMigration(t *testing.T, d *db.DB, mig *models.PDSMigration) { 31 t.Helper() 32 if err := db.EnqueuePdsRecordMigration(context.Background(), d, mig.Name, mig.Did, mig.Collection, mig.Rkey); err != nil { 33 t.Fatalf("EnqueuePdsRecordMigration: %v", err) 34 } 35} 36 37func fetch(t *testing.T, d *db.DB, did syntax.DID) *models.PDSMigration { 38 t.Helper() 39 rows, err := d.QueryContext(context.Background(), 40 `select name, did, collection, rkey, status, error_msg, retry_count, retry_after from pds_migration where did = ?`, did) 41 if err != nil { 42 t.Fatalf("query: %v", err) 43 } 44 defer rows.Close() 45 if !rows.Next() { 46 t.Fatalf("no row for did %s", did) 47 } 48 var m models.PDSMigration 49 if err := rows.Scan(&m.Name, &m.Did, &m.Collection, &m.Rkey, &m.Status, &m.ErrorMsg, &m.RetryCount, &m.RetryAfter); err != nil { 50 t.Fatalf("scan: %v", err) 51 } 52 return &m 53} 54 55func newTestMigration(t *testing.T, mig migrator, onPerm permAuthErrHandler) *Migration { 56 t.Helper() 57 return &Migration{ 58 db: newTestDB(t), 59 logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 60 sem: make(chan struct{}, maxConcurrentMigrations), 61 migrators: map[string]migrator{"add-repo-did": mig}, 62 onPermAuthErr: onPerm, 63 } 64} 65 66func newPDSMigration(did syntax.DID) *models.PDSMigration { 67 return &models.PDSMigration{ 68 Name: "add-repo-did", 69 Did: did, 70 Collection: "sh.tangled.repo", 71 Rkey: "abc", 72 Status: models.PDSMigrationStatusRunning, 73 } 74} 75 76func TestMigrateInvalidGrantMarksFailed(t *testing.T) { 77 did := syntax.DID("did:plc:boltless") 78 var permCalled atomic.Int32 79 m := newTestMigration(t, 80 func(context.Context, *atclient.APIClient, syntax.DID, syntax.ATURI) error { 81 return errors.New("put record: failed to refresh OAuth tokens: token refresh failed: auth server request failed (HTTP 400): invalid_grant") 82 }, 83 func(_ context.Context, _ syntax.DID, _ string, err error) bool { 84 permCalled.Add(1) 85 return oauth.IsPermanentAuthErr(err) 86 }, 87 ) 88 seedMigration(t, m.db, newPDSMigration(did)) 89 pm := newPDSMigration(did) 90 91 if err := m.migrate(context.Background(), &atclient.APIClient{}, "sess1", pm); err != nil { 92 t.Fatalf("migrate: %v", err) 93 } 94 95 got := fetch(t, m.db, did) 96 if got.Status != models.PDSMigrationStatusFailed { 97 t.Fatalf("status = %s, want failed", got.Status) 98 } 99 if got.RetryAfter != 0 { 100 t.Fatalf("RetryAfter = %d, want 0", got.RetryAfter) 101 } 102 if permCalled.Load() != 1 { 103 t.Fatalf("onPermAuthErr called %d times, want 1", permCalled.Load()) 104 } 105} 106 107func TestMigrateTransientErrorStaysPending(t *testing.T) { 108 did := syntax.DID("did:plc:akshay") 109 m := newTestMigration(t, 110 func(context.Context, *atclient.APIClient, syntax.DID, syntax.ATURI) error { 111 return errors.New("put record: failed to refresh OAuth tokens: token refresh failed (HTTP 429): rate_limited") 112 }, 113 func(_ context.Context, _ syntax.DID, _ string, err error) bool { 114 return oauth.IsPermanentAuthErr(err) 115 }, 116 ) 117 seedMigration(t, m.db, newPDSMigration(did)) 118 pm := newPDSMigration(did) 119 120 if err := m.migrate(context.Background(), &atclient.APIClient{}, "sess1", pm); err != nil { 121 t.Fatalf("migrate: %v", err) 122 } 123 124 got := fetch(t, m.db, did) 125 if got.Status != models.PDSMigrationStatusPending { 126 t.Fatalf("status = %s, want pending", got.Status) 127 } 128 if got.RetryCount != 1 { 129 t.Fatalf("RetryCount = %d, want 1", got.RetryCount) 130 } 131 if got.RetryAfter == 0 { 132 t.Fatalf("RetryAfter not scheduled") 133 } 134} 135 136func TestMigrateSuccess(t *testing.T) { 137 did := syntax.DID("did:plc:boltless") 138 m := newTestMigration(t, 139 func(context.Context, *atclient.APIClient, syntax.DID, syntax.ATURI) error { return nil }, 140 func(context.Context, syntax.DID, string, error) bool { return false }, 141 ) 142 seedMigration(t, m.db, newPDSMigration(did)) 143 pm := newPDSMigration(did) 144 145 if err := m.migrate(context.Background(), &atclient.APIClient{}, "sess1", pm); err != nil { 146 t.Fatalf("migrate: %v", err) 147 } 148 149 got := fetch(t, m.db, did) 150 if got.Status != models.PDSMigrationStatusDone { 151 t.Fatalf("status = %s, want done", got.Status) 152 } 153} 154 155func TestEnqueueResetsFailedToPending(t *testing.T) { 156 d := newTestDB(t) 157 did := syntax.DID("did:plc:boltless") 158 seed := newPDSMigration(did) 159 seedMigration(t, d, seed) 160 161 errMsg := "some prior failure" 162 failed := *seed 163 failed.Status = models.PDSMigrationStatusFailed 164 failed.RetryCount = 7 165 failed.ErrorMsg = &errMsg 166 if err := db.UpdatePdsRecordMigration(context.Background(), d, &failed); err != nil { 167 t.Fatalf("UpdatePdsRecordMigration: %v", err) 168 } 169 170 if err := db.EnqueuePdsRecordMigration(context.Background(), d, seed.Name, seed.Did, seed.Collection, seed.Rkey); err != nil { 171 t.Fatalf("re-enqueue: %v", err) 172 } 173 174 got := fetch(t, d, did) 175 if got.Status != models.PDSMigrationStatusPending { 176 t.Fatalf("status = %s, want pending", got.Status) 177 } 178 if got.RetryCount != 0 { 179 t.Fatalf("RetryCount = %d, want 0", got.RetryCount) 180 } 181 if got.ErrorMsg != nil { 182 t.Fatalf("ErrorMsg = %v, want nil", got.ErrorMsg) 183 } 184} 185 186func TestReapStaleRunning(t *testing.T) { 187 d := newTestDB(t) 188 did := syntax.DID("did:plc:akshay") 189 seed := newPDSMigration(did) 190 seedMigration(t, d, seed) 191 running := *seed 192 running.Status = models.PDSMigrationStatusRunning 193 if err := db.UpdatePdsRecordMigration(context.Background(), d, &running); err != nil { 194 t.Fatalf("update: %v", err) 195 } 196 197 if err := db.ReapStaleRunningMigrations(context.Background(), d); err != nil { 198 t.Fatalf("reap: %v", err) 199 } 200 201 got := fetch(t, d, did) 202 if got.Status != models.PDSMigrationStatusPending { 203 t.Fatalf("status = %s, want pending", got.Status) 204 } 205}