Monorepo for Tangled
tangled.org
1package db
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "slices"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 "tangled.org/core/appview/models"
13 "tangled.org/core/orm"
14)
15
16func GetPipelines(e Execer, filters ...orm.Filter) ([]models.Pipeline, error) {
17 var pipelines []models.Pipeline
18
19 var conditions []string
20 var args []any
21 for _, filter := range filters {
22 conditions = append(conditions, filter.Condition())
23 args = append(args, filter.Arg()...)
24 }
25
26 whereClause := ""
27 if conditions != nil {
28 whereClause = " where " + strings.Join(conditions, " and ")
29 }
30
31 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created, repo_did from pipelines %s`, whereClause)
32
33 rows, err := e.Query(query, args...)
34
35 if err != nil {
36 return nil, err
37 }
38 defer rows.Close()
39
40 for rows.Next() {
41 var pipeline models.Pipeline
42 var createdAt string
43 var repoDid sql.NullString
44 err = rows.Scan(
45 &pipeline.Id,
46 &pipeline.Rkey,
47 &pipeline.Knot,
48 &pipeline.RepoOwner,
49 &pipeline.RepoName,
50 &pipeline.Sha,
51 &createdAt,
52 &repoDid,
53 )
54 if err != nil {
55 return nil, err
56 }
57
58 if t, err := time.Parse(time.RFC3339, createdAt); err == nil {
59 pipeline.Created = t
60 }
61 if repoDid.Valid {
62 pipeline.RepoDid = repoDid.String
63 }
64
65 pipelines = append(pipelines, pipeline)
66 }
67
68 if err = rows.Err(); err != nil {
69 return nil, err
70 }
71
72 return pipelines, nil
73}
74
75func AddPipeline(e Execer, pipeline models.Pipeline) error {
76 var repoDid *string
77 if pipeline.RepoDid != "" {
78 repoDid = &pipeline.RepoDid
79 }
80
81 args := []any{
82 pipeline.Rkey,
83 pipeline.Knot,
84 pipeline.RepoOwner,
85 pipeline.RepoName,
86 pipeline.TriggerId,
87 pipeline.Sha,
88 repoDid,
89 }
90
91 placeholders := make([]string, len(args))
92 for i := range placeholders {
93 placeholders[i] = "?"
94 }
95
96 query := fmt.Sprintf(`
97 insert or ignore into pipelines (
98 rkey,
99 knot,
100 repo_owner,
101 repo_name,
102 trigger_id,
103 sha,
104 repo_did
105 ) values (%s)
106 `, strings.Join(placeholders, ","))
107
108 _, err := e.Exec(query, args...)
109
110 return err
111}
112
113func AddTrigger(e Execer, trigger models.Trigger) (int64, error) {
114 args := []any{
115 trigger.Kind,
116 trigger.PushRef,
117 trigger.PushNewSha,
118 trigger.PushOldSha,
119 trigger.PRSourceBranch,
120 trigger.PRTargetBranch,
121 trigger.PRSourceSha,
122 trigger.PRAction,
123 }
124
125 placeholders := make([]string, len(args))
126 for i := range placeholders {
127 placeholders[i] = "?"
128 }
129
130 query := fmt.Sprintf(`insert or ignore into triggers (
131 kind,
132 push_ref,
133 push_new_sha,
134 push_old_sha,
135 pr_source_branch,
136 pr_target_branch,
137 pr_source_sha,
138 pr_action
139 ) values (%s)`, strings.Join(placeholders, ","))
140
141 res, err := e.Exec(query, args...)
142 if err != nil {
143 return 0, err
144 }
145
146 return res.LastInsertId()
147}
148
149func AddPipelineStatus(ctx context.Context, e Execer, status models.PipelineStatus) error {
150 args := []any{
151 status.Spindle,
152 status.Rkey,
153 status.PipelineKnot,
154 status.PipelineRkey,
155 status.Workflow,
156 status.Status,
157 status.Error,
158 status.ExitCode,
159 status.Created.Format(time.RFC3339),
160 }
161
162 placeholders := make([]string, len(args))
163 for i := range placeholders {
164 placeholders[i] = "?"
165 }
166
167 query := fmt.Sprintf(`
168 insert or ignore into pipeline_statuses (
169 spindle,
170 rkey,
171 pipeline_knot,
172 pipeline_rkey,
173 workflow,
174 status,
175 error,
176 exit_code,
177 created
178 ) values (%s)
179 `, strings.Join(placeholders, ","))
180
181 _, err := e.ExecContext(ctx, query, args...)
182 return err
183}
184
185// this is a mega query, but the most useful one:
186// get N pipelines, for each one get the latest status of its N workflows
187//
188// the pipelines table is aliased to `p`
189// the triggers table is aliased to `t`
190func GetPipelineStatuses(e Execer, limit int, filters ...orm.Filter) ([]models.Pipeline, error) {
191 var conditions []string
192 var args []any
193 for _, filter := range filters {
194 conditions = append(conditions, filter.Condition())
195 args = append(args, filter.Arg()...)
196 }
197
198 whereClause := ""
199 if conditions != nil {
200 whereClause = " where " + strings.Join(conditions, " and ")
201 }
202
203 query := fmt.Sprintf(`
204 select
205 p.id,
206 p.knot,
207 p.rkey,
208 p.repo_owner,
209 p.repo_name,
210 p.sha,
211 p.created,
212 p.repo_did,
213 t.id,
214 t.kind,
215 t.push_ref,
216 t.push_new_sha,
217 t.push_old_sha,
218 t.pr_source_branch,
219 t.pr_target_branch,
220 t.pr_source_sha,
221 t.pr_action
222 from
223 pipelines p
224 join
225 triggers t ON p.trigger_id = t.id
226 %s
227 order by p.created desc
228 limit %d
229 `, whereClause, limit)
230
231 rows, err := e.Query(query, args...)
232 if err != nil {
233 return nil, err
234 }
235 defer rows.Close()
236
237 pipelines := make(map[syntax.ATURI]models.Pipeline)
238 for rows.Next() {
239 var p models.Pipeline
240 var t models.Trigger
241 var created string
242 var repoDid sql.NullString
243
244 err := rows.Scan(
245 &p.Id,
246 &p.Knot,
247 &p.Rkey,
248 &p.RepoOwner,
249 &p.RepoName,
250 &p.Sha,
251 &created,
252 &repoDid,
253 &p.TriggerId,
254 &t.Kind,
255 &t.PushRef,
256 &t.PushNewSha,
257 &t.PushOldSha,
258 &t.PRSourceBranch,
259 &t.PRTargetBranch,
260 &t.PRSourceSha,
261 &t.PRAction,
262 )
263 if err != nil {
264 return nil, err
265 }
266
267 p.Created, err = time.Parse(time.RFC3339, created)
268 if err != nil {
269 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err)
270 }
271 if repoDid.Valid {
272 p.RepoDid = repoDid.String
273 }
274
275 t.Id = p.TriggerId
276 p.Trigger = &t
277 p.Statuses = make(map[string]models.WorkflowStatus)
278
279 pipelines[p.AtUri()] = p
280 }
281
282 // get all statuses
283 // the where clause here is of the form:
284 //
285 // and (
286 // (ps.pipeline_knot = k1 and ps.pipeline_rkey = r1)
287 // or (ps.pipeline_knot = k2 and ps.pipeline_rkey = r2)
288 // )
289 //
290 // the join on pipelines and repos enforces that the status was emitted
291 // by the spindle that is actually registered for the pipeline's repo.
292 conditions = nil
293 args = nil
294 for _, p := range pipelines {
295 knotFilter := orm.FilterEq("ps.pipeline_knot", p.Knot)
296 rkeyFilter := orm.FilterEq("ps.pipeline_rkey", p.Rkey)
297 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition()))
298 args = append(args, p.Knot)
299 args = append(args, p.Rkey)
300 }
301 whereClause = ""
302 if conditions != nil {
303 whereClause = "and (" + strings.Join(conditions, " or ") + ")"
304 }
305 query = fmt.Sprintf(`
306 select
307 ps.id, ps.spindle, ps.rkey, ps.pipeline_knot, ps.pipeline_rkey,
308 ps.created, ps.workflow, ps.status, ps.error, ps.exit_code
309 from
310 pipeline_statuses ps
311 join
312 pipelines p on p.knot = ps.pipeline_knot and p.rkey = ps.pipeline_rkey
313 join
314 repos r on r.repo_did = p.repo_did
315 where
316 ps.spindle = r.spindle
317 %s
318 `, whereClause)
319
320 rows, err = e.Query(query, args...)
321 if err != nil {
322 return nil, err
323 }
324 defer rows.Close()
325
326 for rows.Next() {
327 var ps models.PipelineStatus
328 var created string
329
330 err := rows.Scan(
331 &ps.ID,
332 &ps.Spindle,
333 &ps.Rkey,
334 &ps.PipelineKnot,
335 &ps.PipelineRkey,
336 &created,
337 &ps.Workflow,
338 &ps.Status,
339 &ps.Error,
340 &ps.ExitCode,
341 )
342 if err != nil {
343 return nil, err
344 }
345
346 ps.Created, err = time.Parse(time.RFC3339, created)
347 if err != nil {
348 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err)
349 }
350
351 pipelineAt := ps.PipelineAt()
352
353 // extract
354 pipeline, ok := pipelines[pipelineAt]
355 if !ok {
356 continue
357 }
358 statuses, _ := pipeline.Statuses[ps.Workflow]
359 if !ok {
360 pipeline.Statuses[ps.Workflow] = models.WorkflowStatus{}
361 }
362
363 // append
364 statuses.Data = append(statuses.Data, ps)
365
366 // reassign
367 pipeline.Statuses[ps.Workflow] = statuses
368 pipelines[pipelineAt] = pipeline
369 }
370
371 var all []models.Pipeline
372 for _, p := range pipelines {
373 for _, s := range p.Statuses {
374 slices.SortFunc(s.Data, func(a, b models.PipelineStatus) int {
375 if a.Created.After(b.Created) {
376 return 1
377 }
378 if a.Created.Before(b.Created) {
379 return -1
380 }
381 if a.ID > b.ID {
382 return 1
383 }
384 if a.ID < b.ID {
385 return -1
386 }
387 return 0
388 })
389 }
390 all = append(all, p)
391 }
392
393 // sort pipelines by date
394 slices.SortFunc(all, func(a, b models.Pipeline) int {
395 if a.Created.After(b.Created) {
396 return -1
397 }
398 return 1
399 })
400
401 return all, nil
402}
403
404// the pipelines table is aliased to `p`
405// the triggers table is aliased to `t`
406func GetPipelineCount(e Execer, filters ...orm.Filter) (int64, error) {
407 var conditions []string
408 var args []any
409 for _, filter := range filters {
410 conditions = append(conditions, filter.Condition())
411 args = append(args, filter.Arg()...)
412 }
413
414 whereClause := ""
415 if conditions != nil {
416 whereClause = " where " + strings.Join(conditions, " and ")
417 }
418
419 query := fmt.Sprintf(`
420 select
421 count(1)
422 from
423 pipelines p
424 join
425 triggers t ON p.trigger_id = t.id
426 %s
427 `, whereClause)
428
429 rows, err := e.Query(query, args...)
430 if err != nil {
431 return 0, err
432 }
433 defer rows.Close()
434
435 for rows.Next() {
436 var count int64
437 err := rows.Scan(&count)
438 if err != nil {
439 return 0, err
440 }
441
442 return count, nil
443 }
444
445 // unreachable
446 return 0, nil
447}