Monorepo for Tangled
tangled.org
1package db
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "tangled.org/core/appview/pagination"
11 "tangled.org/core/knotmirror/models"
12)
13
14func UpsertRepo(ctx context.Context, e DBTX, repo *models.Repo) error {
15 if repo.RepoDid == "" {
16 return fmt.Errorf("upsert repo: repo_did is required")
17 }
18 if _, err := e.ExecContext(ctx,
19 `insert into repos (did, rkey, cid, name, knot_domain, repo_did, git_rev, repo_sha, state, error_msg, retry_count, retry_after)
20 values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
21 on conflict(repo_did) do update set
22 did = excluded.did,
23 rkey = excluded.rkey,
24 cid = excluded.cid,
25 name = excluded.name,
26 knot_domain = excluded.knot_domain,
27 git_rev = excluded.git_rev,
28 repo_sha = excluded.repo_sha,
29 state = excluded.state,
30 error_msg = excluded.error_msg,
31 retry_count = excluded.retry_count,
32 retry_after = excluded.retry_after`,
33 repo.Did,
34 repo.Rkey,
35 repo.Cid,
36 repo.Name,
37 repo.KnotDomain,
38 repo.RepoDid,
39 repo.GitRev,
40 repo.RepoSha,
41 repo.State,
42 repo.ErrorMsg,
43 repo.RetryCount,
44 repo.RetryAfter,
45 ); err != nil {
46 return fmt.Errorf("upserting repo: %w", err)
47 }
48 return nil
49}
50
51func UpdateRepoState(ctx context.Context, e DBTX, repoDid syntax.DID, state models.RepoState) error {
52 if _, err := e.ExecContext(ctx,
53 `update repos
54 set state = $1
55 where repo_did = $2`,
56 state,
57 repoDid,
58 ); err != nil {
59 return fmt.Errorf("updating repo: %w", err)
60 }
61 return nil
62}
63
64func DeleteRepo(ctx context.Context, e DBTX, did syntax.DID, rkey syntax.RecordKey) error {
65 if _, err := e.ExecContext(ctx,
66 `delete from repos where did = $1 and rkey = $2`,
67 did,
68 rkey,
69 ); err != nil {
70 return fmt.Errorf("deleting repo: %w", err)
71 }
72 return nil
73}
74
// repoColumns is the select list shared by every query in this file that
// reads whole rows from the repos table. It starts with a newline so it can be
// concatenated directly after the keyword `select`.
//
// The column order is a contract with scanRepo, which scans into the fields of
// models.Repo positionally; change both together.
const repoColumns = `
	did,
	rkey,
	cid,
	name,
	knot_domain,
	repo_did,
	git_rev,
	repo_sha,
	state,
	error_msg,
	retry_count,
	retry_after`
88
89func scanRepo(row interface{ Scan(...any) error }) (*models.Repo, error) {
90 var repo models.Repo
91 if err := row.Scan(
92 &repo.Did,
93 &repo.Rkey,
94 &repo.Cid,
95 &repo.Name,
96 &repo.KnotDomain,
97 &repo.RepoDid,
98 &repo.GitRev,
99 &repo.RepoSha,
100 &repo.State,
101 &repo.ErrorMsg,
102 &repo.RetryCount,
103 &repo.RetryAfter,
104 ); err != nil {
105 return nil, err
106 }
107 return &repo, nil
108}
109
110func GetRepoByRepoDid(ctx context.Context, e DBTX, repoDid syntax.DID) (*models.Repo, error) {
111 row := e.QueryRowContext(ctx,
112 `select`+repoColumns+`
113 from repos
114 where repo_did = $1`,
115 repoDid,
116 )
117 repo, err := scanRepo(row)
118 if err != nil {
119 if errors.Is(err, sql.ErrNoRows) {
120 return nil, nil
121 }
122 return nil, fmt.Errorf("querying repo: %w", err)
123 }
124 return repo, nil
125}
126
127func GetRepoByAtUri(ctx context.Context, e DBTX, aturi syntax.ATURI) (*models.Repo, error) {
128 row := e.QueryRowContext(ctx,
129 `select`+repoColumns+`
130 from repos
131 where at_uri = $1`,
132 aturi,
133 )
134 repo, err := scanRepo(row)
135 if err != nil {
136 if errors.Is(err, sql.ErrNoRows) {
137 return nil, nil
138 }
139 return nil, fmt.Errorf("querying repo: %w", err)
140 }
141 return repo, nil
142}
143
144func ListRepos(ctx context.Context, e DBTX, page pagination.Page, did, knot, state, name string) ([]models.Repo, error) {
145 var conditions []string
146 var args []any
147
148 pageClause := ""
149 if page.Limit > 0 {
150 pageClause = " limit $1 offset $2 "
151 args = append(args, page.Limit, page.Offset)
152 }
153
154 whereClause := ""
155 if did != "" {
156 conditions = append(conditions, fmt.Sprintf("did = $%d", len(args)+1))
157 args = append(args, did)
158 }
159 if knot != "" {
160 conditions = append(conditions, fmt.Sprintf("knot_domain = $%d", len(args)+1))
161 args = append(args, knot)
162 }
163 if state != "" {
164 conditions = append(conditions, fmt.Sprintf("state = $%d", len(args)+1))
165 args = append(args, state)
166 }
167 if name != "" {
168 conditions = append(conditions, fmt.Sprintf("name ilike $%d", len(args)+1))
169 args = append(args, "%"+name+"%")
170 }
171 if len(conditions) > 0 {
172 whereClause = "WHERE " + conditions[0]
173 for _, condition := range conditions[1:] {
174 whereClause += " AND " + condition
175 }
176 }
177
178 query := `
179 select` + repoColumns + `
180 from repos
181 ` + whereClause + pageClause
182 rows, err := e.QueryContext(ctx, query, args...)
183 if err != nil {
184 return nil, err
185 }
186 defer rows.Close()
187
188 var repos []models.Repo
189 for rows.Next() {
190 repo, err := scanRepo(rows)
191 if err != nil {
192 return nil, fmt.Errorf("scanning row: %w", err)
193 }
194 repos = append(repos, *repo)
195 }
196 if err := rows.Err(); err != nil {
197 return nil, fmt.Errorf("scanning rows: %w ", err)
198 }
199
200 return repos, nil
201}
202
203func GetRepoCountsByState(ctx context.Context, e DBTX) (map[models.RepoState]int64, error) {
204 const q = `
205 SELECT state, COUNT(*)
206 FROM repos
207 GROUP BY state
208 `
209
210 rows, err := e.QueryContext(ctx, q)
211 if err != nil {
212 return nil, err
213 }
214 defer rows.Close()
215
216 counts := make(map[models.RepoState]int64)
217
218 for rows.Next() {
219 var state string
220 var count int64
221
222 if err := rows.Scan(&state, &count); err != nil {
223 return nil, err
224 }
225
226 counts[models.RepoState(state)] = count
227 }
228
229 if err := rows.Err(); err != nil {
230 return nil, err
231 }
232
233 for _, s := range models.AllRepoStates {
234 if _, ok := counts[s]; !ok {
235 counts[s] = 0
236 }
237 }
238
239 return counts, nil
240}