Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

at icy/yovxsu 3.3 kB View raw
1package db 2 3import ( 4 "context" 5 "time" 6 7 "github.com/bluesky-social/indigo/atproto/syntax" 8 "tangled.org/core/appview/models" 9) 10 11// "migration" for records stored in user's PDS, not AppView DB 12 13// ListPendingPdsRecordMigrations queries list of pending PDS migrations for given user. 14// Only pending migrations whose `retry_after` has elapsed are returned. 15func ListPendingPdsRecordMigrations(ctx context.Context, e Execer, user syntax.DID) ([]*models.PDSMigration, error) { 16 rows, err := e.QueryContext(ctx, 17 `with picked as ( 18 select rowid 19 from pds_migration 20 where did = ? 21 and status = 'pending' 22 and retry_after < ? 23 ) 24 update pds_migration 25 set status = ? 26 where rowid in (select rowid from picked) 27 returning name, did, collection, rkey, status, error_msg, retry_count, retry_after`, 28 user, 29 time.Now().Unix(), 30 models.PDSMigrationStatusRunning, 31 ) 32 if err != nil { 33 return nil, err 34 } 35 defer rows.Close() 36 37 var migrations []*models.PDSMigration 38 for rows.Next() { 39 var migration models.PDSMigration 40 if err := rows.Scan( 41 &migration.Name, 42 &migration.Did, 43 &migration.Collection, 44 &migration.Rkey, 45 &migration.Status, 46 &migration.ErrorMsg, 47 &migration.RetryCount, 48 &migration.RetryAfter, 49 ); err != nil { 50 return nil, err 51 } 52 migrations = append(migrations, &migration) 53 } 54 if err := rows.Err(); err != nil { 55 return nil, err 56 } 57 58 return migrations, nil 59} 60 61func HasPendingPdsRecordMigration(ctx context.Context, e Execer, user syntax.DID) (bool, error) { 62 var exists bool 63 err := e.QueryRowContext(ctx, 64 `select exists( 65 select 1 from pds_migration 66 where did = ? 67 and status = 'pending' 68 and retry_after < ? 69 )`, 70 user, 71 time.Now().Unix(), 72 ).Scan(&exists) 73 if err != nil { 74 return false, err 75 } 76 return exists, nil 77} 78 79func EnqueuePdsRecordMigration(ctx context.Context, e Execer, name string, did syntax.DID, collection syntax.NSID, rkey syntax.RecordKey) error { 80 _, err := e.ExecContext(ctx, 81 `insert into pds_migration (name, did, collection, rkey) 82 values (?, ?, ?, ?) 83 on conflict(name, did, collection, rkey) do update set 84 status = case when pds_migration.status = 'failed' then 'pending' else pds_migration.status end, 85 retry_count = case when pds_migration.status = 'failed' then 0 else pds_migration.retry_count end, 86 retry_after = case when pds_migration.status = 'failed' then 0 else pds_migration.retry_after end, 87 error_msg = case when pds_migration.status = 'failed' then null else pds_migration.error_msg end`, 88 name, did, collection, rkey, 89 ) 90 return err 91} 92 93func ReapStaleRunningMigrations(ctx context.Context, e Execer) error { 94 _, err := e.ExecContext(ctx, 95 `update pds_migration set status = 'pending' where status = 'running'`, 96 ) 97 return err 98} 99 100func UpdatePdsRecordMigration(ctx context.Context, e Execer, migration *models.PDSMigration) error { 101 _, err := e.ExecContext(ctx, 102 `update pds_migration 103 set status = ?, 104 error_msg = ?, 105 retry_count = ?, 106 retry_after = ?, 107 updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 108 where name = ? and did = ? and collection = ? and rkey = ?`, 109 migration.Status, 110 migration.ErrorMsg, 111 migration.RetryCount, 112 migration.RetryAfter, 113 migration.Name, 114 migration.Did, 115 migration.Collection, 116 migration.Rkey, 117 ) 118 return err 119}