Monorepo for Tangled
tangled.org
1package db
2
3import (
4 "context"
5 "time"
6
7 "github.com/bluesky-social/indigo/atproto/syntax"
8 "tangled.org/core/appview/models"
9)
10
11// "migration" for records stored in user's PDS, not AppView DB
12
13// ListPendingPdsRecordMigrations queries list of pending PDS migrations for given user.
14// Only pending migrations whose `retry_after` has elapsed are returned.
15func ListPendingPdsRecordMigrations(ctx context.Context, e Execer, user syntax.DID) ([]*models.PDSMigration, error) {
16 rows, err := e.QueryContext(ctx,
17 `with picked as (
18 select rowid
19 from pds_migration
20 where did = ?
21 and status = 'pending'
22 and retry_after < ?
23 )
24 update pds_migration
25 set status = ?
26 where rowid in (select rowid from picked)
27 returning name, did, collection, rkey, status, error_msg, retry_count, retry_after`,
28 user,
29 time.Now().Unix(),
30 models.PDSMigrationStatusRunning,
31 )
32 if err != nil {
33 return nil, err
34 }
35 defer rows.Close()
36
37 var migrations []*models.PDSMigration
38 for rows.Next() {
39 var migration models.PDSMigration
40 if err := rows.Scan(
41 &migration.Name,
42 &migration.Did,
43 &migration.Collection,
44 &migration.Rkey,
45 &migration.Status,
46 &migration.ErrorMsg,
47 &migration.RetryCount,
48 &migration.RetryAfter,
49 ); err != nil {
50 return nil, err
51 }
52 migrations = append(migrations, &migration)
53 }
54 if err := rows.Err(); err != nil {
55 return nil, err
56 }
57
58 return migrations, nil
59}
60
61func HasPendingPdsRecordMigration(ctx context.Context, e Execer, user syntax.DID) (bool, error) {
62 var exists bool
63 err := e.QueryRowContext(ctx,
64 `select exists(
65 select 1 from pds_migration
66 where did = ?
67 and status = 'pending'
68 and retry_after < ?
69 )`,
70 user,
71 time.Now().Unix(),
72 ).Scan(&exists)
73 if err != nil {
74 return false, err
75 }
76 return exists, nil
77}
78
79func EnqueuePdsRecordMigration(ctx context.Context, e Execer, name string, did syntax.DID, collection syntax.NSID, rkey syntax.RecordKey) error {
80 _, err := e.ExecContext(ctx,
81 `insert into pds_migration (name, did, collection, rkey)
82 values (?, ?, ?, ?)
83 on conflict(name, did, collection, rkey) do update set
84 status = case when pds_migration.status = 'failed' then 'pending' else pds_migration.status end,
85 retry_count = case when pds_migration.status = 'failed' then 0 else pds_migration.retry_count end,
86 retry_after = case when pds_migration.status = 'failed' then 0 else pds_migration.retry_after end,
87 error_msg = case when pds_migration.status = 'failed' then null else pds_migration.error_msg end`,
88 name, did, collection, rkey,
89 )
90 return err
91}
92
93func ReapStaleRunningMigrations(ctx context.Context, e Execer) error {
94 _, err := e.ExecContext(ctx,
95 `update pds_migration set status = 'pending' where status = 'running'`,
96 )
97 return err
98}
99
100func UpdatePdsRecordMigration(ctx context.Context, e Execer, migration *models.PDSMigration) error {
101 _, err := e.ExecContext(ctx,
102 `update pds_migration
103 set status = ?,
104 error_msg = ?,
105 retry_count = ?,
106 retry_after = ?,
107 updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
108 where name = ? and did = ? and collection = ? and rkey = ?`,
109 migration.Status,
110 migration.ErrorMsg,
111 migration.RetryCount,
112 migration.RetryAfter,
113 migration.Name,
114 migration.Did,
115 migration.Collection,
116 migration.Rkey,
117 )
118 return err
119}