Monorepo for Tangled
tangled.org
1package db
2
3import (
4 "context"
5 "time"
6
7 "github.com/bluesky-social/indigo/atproto/syntax"
8 "tangled.org/core/appview/models"
9)
10
11// "migration" for records stored in user's PDS, not AppView DB
12
13// ListPendingPdsRecordMigrations queries list of pending PDS migrations for given user.
14// Only pending migrations whose `retry_after` has elapsed are returned.
15func ListPendingPdsRecordMigrations(ctx context.Context, e Execer, user syntax.DID) ([]*models.PDSMigration, error) {
16 rows, err := e.QueryContext(ctx,
17 `with picked as (
18 select rowid
19 from pds_migration
20 where did = ?
21 and status = 'pending'
22 and retry_after < ?
23 )
24 update pds_migration
25 set status = ?
26 where rowid in (select rowid from picked)
27 returning name, did, collection, rkey, status, error_msg, retry_count, retry_after`,
28 user,
29 time.Now().Unix(),
30 models.PDSMigrationStatusRunning,
31 )
32 if err != nil {
33 return nil, err
34 }
35 defer rows.Close()
36
37 var migrations []*models.PDSMigration
38 for rows.Next() {
39 var migration models.PDSMigration
40 if err := rows.Scan(
41 &migration.Name,
42 &migration.Did,
43 &migration.Collection,
44 &migration.Rkey,
45 &migration.Status,
46 &migration.ErrorMsg,
47 &migration.RetryCount,
48 &migration.RetryAfter,
49 ); err != nil {
50 return nil, err
51 }
52 migrations = append(migrations, &migration)
53 }
54 if err := rows.Err(); err != nil {
55 return nil, err
56 }
57
58 return migrations, nil
59}
60
61func EnqueuePdsRecordMigration(ctx context.Context, e Execer, name string, did syntax.DID, collection syntax.NSID, rkey syntax.RecordKey) error {
62 _, err := e.ExecContext(ctx,
63 `insert into pds_migration (name, did, collection, rkey)
64 values (?, ?, ?, ?)`,
65 name, did, collection, rkey,
66 )
67 return err
68}
69
70func UpdatePdsRecordMigration(ctx context.Context, e Execer, migration *models.PDSMigration) error {
71 _, err := e.ExecContext(ctx,
72 `update pds_migration
73 set status = ?,
74 error_msg = ?,
75 retry_count = ?,
76 retry_after = ?,
77 updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
78 where name = ? and did = ? and collection = ? and rkey = ?`,
79 migration.Status,
80 migration.ErrorMsg,
81 migration.RetryCount,
82 migration.RetryAfter,
83 migration.Name,
84 migration.Did,
85 migration.Collection,
86 migration.Rkey,
87 )
88 return err
89}