···11+package db
22+33+import (
44+ "context"
55+ "database/sql"
66+ "errors"
77+ "time"
88+99+ "github.com/bluesky-social/indigo/atproto/syntax"
1010+)
1111+1212+type GitRepoMigrationStatus string
1313+1414+const (
1515+ GitRepoMigrationStatusPending GitRepoMigrationStatus = "pending"
1616+ GitRepoMigrationStatusRunning GitRepoMigrationStatus = "running"
1717+ GitRepoMigrationStatusDone GitRepoMigrationStatus = "done"
1818+ GitRepoMigrationStatusFailed GitRepoMigrationStatus = "failed"
1919+)
2020+2121+const (
2222+ GitRepoMigrationSourceGitHub = "github"
2323+)
2424+2525+type GitRepoMigrations []GitRepoMigration
2626+2727+func (m GitRepoMigrations) AnyActive() bool {
2828+ for _, r := range m {
2929+ if r.Status == GitRepoMigrationStatusPending || r.Status == GitRepoMigrationStatusRunning {
3030+ return true
3131+ }
3232+ }
3333+ return false
3434+}
3535+3636+type GitRepoMigration struct {
3737+ ID int64
3838+ OwnerDid syntax.DID
3939+ SourceKind string
4040+ CloneUrl string
4141+ Name string
4242+ Knot string
4343+ Description string
4444+ Website string
4545+ Topics []string
4646+ SessionID string
4747+ Status GitRepoMigrationStatus
4848+ ErrorMsg string
4949+ UpdatedAt time.Time
5050+}
5151+5252+func InsertGitRepoMigrations(ctx context.Context, e *DB, rows []GitRepoMigration) error {
5353+ if len(rows) == 0 {
5454+ return nil
5555+ }
5656+ txx, err := e.BeginTx(ctx, nil)
5757+ if err != nil {
5858+ return err
5959+ }
6060+ defer txx.Rollback()
6161+6262+ stmt, err := txx.PrepareContext(ctx, `
6363+ insert into gitrepo_migrations
6464+ (owner_did, source_kind, clone_url, name, knot, description, session_id,
6565+ status, error_msg, updated_at)
6666+ values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
6767+ on conflict(owner_did, name) do update set
6868+ source_kind = excluded.source_kind,
6969+ clone_url = excluded.clone_url,
7070+ knot = excluded.knot,
7171+ description = excluded.description,
7272+ session_id = excluded.session_id,
7373+ status = 'pending',
7474+ error_msg = '',
7575+ updated_at = excluded.updated_at
7676+ `)
7777+ if err != nil {
7878+ return err
7979+ }
8080+ defer stmt.Close()
8181+8282+ now := time.Now().UTC().Format(time.RFC3339)
8383+ for _, r := range rows {
8484+ status := r.Status
8585+ if status == "" {
8686+ status = GitRepoMigrationStatusPending
8787+ }
8888+ if _, err := stmt.ExecContext(ctx,
8989+ r.OwnerDid, r.SourceKind, r.CloneUrl, r.Name, r.Knot, r.Description, r.SessionID,
9090+ status, r.ErrorMsg, now,
9191+ ); err != nil {
9292+ return err
9393+ }
9494+ }
9595+ return txx.Commit()
9696+}
9797+9898+func ClaimNextPending(ctx context.Context, e Execer) (*GitRepoMigration, bool, error) {
9999+ var (
100100+ m GitRepoMigration
101101+ updatedAt string
102102+ )
103103+ err := e.QueryRowContext(ctx, `
104104+ update gitrepo_migrations
105105+ set status = 'running',
106106+ updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
107107+ where id = (
108108+ select id from gitrepo_migrations gm
109109+ where status = 'pending'
110110+ and not exists (
111111+ select 1 from gitrepo_migrations gm2
112112+ where gm2.owner_did = gm.owner_did
113113+ and gm2.status = 'running'
114114+ )
115115+ order by id
116116+ limit 1
117117+ )
118118+ returning id, owner_did, source_kind, clone_url, name, knot,
119119+ description, session_id, status, error_msg, updated_at
120120+ `).Scan(
121121+ &m.ID, &m.OwnerDid, &m.SourceKind, &m.CloneUrl, &m.Name, &m.Knot,
122122+ &m.Description, &m.SessionID, &m.Status, &m.ErrorMsg,
123123+ &updatedAt,
124124+ )
125125+ if errors.Is(err, sql.ErrNoRows) {
126126+ return nil, false, nil
127127+ }
128128+ if err != nil {
129129+ return nil, false, err
130130+ }
131131+ if t, err := time.Parse(time.RFC3339, updatedAt); err == nil {
132132+ m.UpdatedAt = t
133133+ }
134134+ return &m, true, nil
135135+}
136136+137137+func MarkGitRepoMigrationDone(ctx context.Context, e Execer, id int64) error {
138138+ _, err := e.ExecContext(ctx, `
139139+ update gitrepo_migrations
140140+ set status = 'done',
141141+ error_msg = '',
142142+ updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
143143+ where id = ?
144144+ `, id)
145145+ return err
146146+}
147147+148148+func MarkGitRepoMigrationFailed(ctx context.Context, e Execer, id int64, errMsg string) error {
149149+ _, err := e.ExecContext(ctx, `
150150+ update gitrepo_migrations
151151+ set status = 'failed',
152152+ error_msg = ?,
153153+ updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
154154+ where id = ?
155155+ `, errMsg, id)
156156+ return err
157157+}
158158+159159+func ListGitRepoMigrationsForOwner(ctx context.Context, e Execer, owner string) (GitRepoMigrations, error) {
160160+ rows, err := e.QueryContext(ctx, `
161161+ select id, owner_did, source_kind, clone_url, name, knot,
162162+ description, session_id, status, coalesce(error_msg, ''),
163163+ updated_at
164164+ from gitrepo_migrations
165165+ where owner_did = ?
166166+ order by id desc
167167+ `, owner)
168168+ if err != nil {
169169+ return nil, err
170170+ }
171171+ defer rows.Close()
172172+173173+ var out GitRepoMigrations
174174+ for rows.Next() {
175175+ var (
176176+ m GitRepoMigration
177177+ updatedAt string
178178+ )
179179+ if err := rows.Scan(
180180+ &m.ID, &m.OwnerDid, &m.SourceKind, &m.CloneUrl, &m.Name, &m.Knot,
181181+ &m.Description, &m.SessionID, &m.Status, &m.ErrorMsg,
182182+ &updatedAt,
183183+ ); err != nil {
184184+ return nil, err
185185+ }
186186+ if t, err := time.Parse(time.RFC3339, updatedAt); err == nil {
187187+ m.UpdatedAt = t
188188+ }
189189+ out = append(out, m)
190190+ }
191191+ return out, rows.Err()
192192+}
193193+194194+func ListEnqueuedGitRepoNames(ctx context.Context, e Execer, owner string) (map[string]struct{}, error) {
195195+ rows, err := e.QueryContext(ctx, `
196196+ select name from gitrepo_migrations
197197+ where owner_did = ?
198198+ and status in ('pending', 'running', 'done')
199199+ `, owner)
200200+ if err != nil {
201201+ return nil, err
202202+ }
203203+ defer rows.Close()
204204+205205+ out := map[string]struct{}{}
206206+ for rows.Next() {
207207+ var n string
208208+ if err := rows.Scan(&n); err != nil {
209209+ return nil, err
210210+ }
211211+ out[n] = struct{}{}
212212+ }
213213+ return out, rows.Err()
214214+}
215215+216216+func ReapStaleRunningGitRepoMigrations(ctx context.Context, e Execer) error {
217217+ _, err := e.ExecContext(ctx, `
218218+ update gitrepo_migrations
219219+ set status = 'pending',
220220+ updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
221221+ where status = 'running'
222222+ `)
223223+ return err
224224+}
+24
appview/db/db.go
···22182218 return err
22192219 })
2220222022212221+ orm.RunMigration(conn, logger, "add-gitrepo_migrations", func(tx *sql.Tx) error {
22222222+ _, err := tx.Exec(`
22232223+ create table gitrepo_migrations (
22242224+ id integer primary key autoincrement,
22252225+ owner_did text not null,
22262226+ source_kind text not null,
22272227+ clone_url text not null,
22282228+ name text not null,
22292229+ knot text not null,
22302230+ description text not null,
22312231+ session_id text not null default '',
22322232+22332233+ status text not null default 'pending',
22342234+ error_msg text not null default '',
22352235+ updated_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
22362236+22372237+ unique(owner_did, name)
22382238+ );
22392239+ create index if not exists idx_gitrepo_migrations_status
22402240+ on gitrepo_migrations(status);
22412241+ `)
22422242+ return err
22432243+ })
22442244+22212245 return &DB{
22222246 db,
22232247 logger,