Monorepo for Tangled tangled.org
4

Configure Feed

Select the types of activity you want to include in your feed.

at icy/ytnwlw 5.2 kB View raw
1package db 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "tangled.org/core/appview/pagination" 11 "tangled.org/core/knotmirror/models" 12) 13 14func UpsertRepo(ctx context.Context, e DBTX, repo *models.Repo) error { 15 if repo.RepoDid == "" { 16 return fmt.Errorf("upsert repo: repo_did is required") 17 } 18 if _, err := e.ExecContext(ctx, 19 `insert into repos (did, rkey, cid, name, knot_domain, repo_did, git_rev, repo_sha, state, error_msg, retry_count, retry_after) 20 values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) 21 on conflict(repo_did) do update set 22 did = excluded.did, 23 rkey = excluded.rkey, 24 cid = excluded.cid, 25 name = excluded.name, 26 knot_domain = excluded.knot_domain, 27 git_rev = excluded.git_rev, 28 repo_sha = excluded.repo_sha, 29 state = excluded.state, 30 error_msg = excluded.error_msg, 31 retry_count = excluded.retry_count, 32 retry_after = excluded.retry_after`, 33 repo.Did, 34 repo.Rkey, 35 repo.Cid, 36 repo.Name, 37 repo.KnotDomain, 38 repo.RepoDid, 39 repo.GitRev, 40 repo.RepoSha, 41 repo.State, 42 repo.ErrorMsg, 43 repo.RetryCount, 44 repo.RetryAfter, 45 ); err != nil { 46 return fmt.Errorf("upserting repo: %w", err) 47 } 48 return nil 49} 50 51func UpdateRepoState(ctx context.Context, e DBTX, repoDid syntax.DID, state models.RepoState) error { 52 if _, err := e.ExecContext(ctx, 53 `update repos 54 set state = $1 55 where repo_did = $2`, 56 state, 57 repoDid, 58 ); err != nil { 59 return fmt.Errorf("updating repo: %w", err) 60 } 61 return nil 62} 63 64func DeleteRepo(ctx context.Context, e DBTX, did syntax.DID, rkey syntax.RecordKey) error { 65 if _, err := e.ExecContext(ctx, 66 `delete from repos where did = $1 and rkey = $2`, 67 did, 68 rkey, 69 ); err != nil { 70 return fmt.Errorf("deleting repo: %w", err) 71 } 72 return nil 73} 74 75const repoColumns = ` 76 did, 77 rkey, 78 cid, 79 name, 80 knot_domain, 81 repo_did, 82 git_rev, 83 repo_sha, 84 state, 85 error_msg, 86 retry_count, 87 retry_after` 88 89func scanRepo(row interface{ Scan(...any) error }) (*models.Repo, error) { 90 var repo models.Repo 91 if err := row.Scan( 92 &repo.Did, 93 &repo.Rkey, 94 &repo.Cid, 95 &repo.Name, 96 &repo.KnotDomain, 97 &repo.RepoDid, 98 &repo.GitRev, 99 &repo.RepoSha, 100 &repo.State, 101 &repo.ErrorMsg, 102 &repo.RetryCount, 103 &repo.RetryAfter, 104 ); err != nil { 105 return nil, err 106 } 107 return &repo, nil 108} 109 110func GetRepoByRepoDid(ctx context.Context, e DBTX, repoDid syntax.DID) (*models.Repo, error) { 111 row := e.QueryRowContext(ctx, 112 `select`+repoColumns+` 113 from repos 114 where repo_did = $1`, 115 repoDid, 116 ) 117 repo, err := scanRepo(row) 118 if err != nil { 119 if errors.Is(err, sql.ErrNoRows) { 120 return nil, nil 121 } 122 return nil, fmt.Errorf("querying repo: %w", err) 123 } 124 return repo, nil 125} 126 127func GetRepoByAtUri(ctx context.Context, e DBTX, aturi syntax.ATURI) (*models.Repo, error) { 128 row := e.QueryRowContext(ctx, 129 `select`+repoColumns+` 130 from repos 131 where at_uri = $1`, 132 aturi, 133 ) 134 repo, err := scanRepo(row) 135 if err != nil { 136 if errors.Is(err, sql.ErrNoRows) { 137 return nil, nil 138 } 139 return nil, fmt.Errorf("querying repo: %w", err) 140 } 141 return repo, nil 142} 143 144func ListRepos(ctx context.Context, e DBTX, page pagination.Page, did, knot, state, name string) ([]models.Repo, error) { 145 var conditions []string 146 var args []any 147 148 pageClause := "" 149 if page.Limit > 0 { 150 pageClause = " limit $1 offset $2 " 151 args = append(args, page.Limit, page.Offset) 152 } 153 154 whereClause := "" 155 if did != "" { 156 conditions = append(conditions, fmt.Sprintf("did = $%d", len(args)+1)) 157 args = append(args, did) 158 } 159 if knot != "" { 160 conditions = append(conditions, fmt.Sprintf("knot_domain = $%d", len(args)+1)) 161 args = append(args, knot) 162 } 163 if state != "" { 164 conditions = append(conditions, fmt.Sprintf("state = $%d", len(args)+1)) 165 args = append(args, state) 166 } 167 if name != "" { 168 conditions = append(conditions, fmt.Sprintf("name ilike $%d", len(args)+1)) 169 args = append(args, "%"+name+"%") 170 } 171 if len(conditions) > 0 { 172 whereClause = "WHERE " + conditions[0] 173 for _, condition := range conditions[1:] { 174 whereClause += " AND " + condition 175 } 176 } 177 178 query := ` 179 select` + repoColumns + ` 180 from repos 181 ` + whereClause + pageClause 182 rows, err := e.QueryContext(ctx, query, args...) 183 if err != nil { 184 return nil, err 185 } 186 defer rows.Close() 187 188 var repos []models.Repo 189 for rows.Next() { 190 repo, err := scanRepo(rows) 191 if err != nil { 192 return nil, fmt.Errorf("scanning row: %w", err) 193 } 194 repos = append(repos, *repo) 195 } 196 if err := rows.Err(); err != nil { 197 return nil, fmt.Errorf("scanning rows: %w ", err) 198 } 199 200 return repos, nil 201} 202 203func GetRepoCountsByState(ctx context.Context, e DBTX) (map[models.RepoState]int64, error) { 204 const q = ` 205 SELECT state, COUNT(*) 206 FROM repos 207 GROUP BY state 208 ` 209 210 rows, err := e.QueryContext(ctx, q) 211 if err != nil { 212 return nil, err 213 } 214 defer rows.Close() 215 216 counts := make(map[models.RepoState]int64) 217 218 for rows.Next() { 219 var state string 220 var count int64 221 222 if err := rows.Scan(&state, &count); err != nil { 223 return nil, err 224 } 225 226 counts[models.RepoState(state)] = count 227 } 228 229 if err := rows.Err(); err != nil { 230 return nil, err 231 } 232 233 for _, s := range models.AllRepoStates { 234 if _, ok := counts[s]; !ok { 235 counts[s] = 0 236 } 237 } 238 239 return counts, nil 240}