Monorepo for Tangled tangled.org
4

Configure Feed

Select the types of activity you want to include in your feed.

at icy/yovxsu 8.9 kB View raw
1package db 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "slices" 8 "strings" 9 "time" 10 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 "tangled.org/core/appview/models" 13 "tangled.org/core/orm" 14) 15 16func GetPipelines(e Execer, filters ...orm.Filter) ([]models.Pipeline, error) { 17 var pipelines []models.Pipeline 18 19 var conditions []string 20 var args []any 21 for _, filter := range filters { 22 conditions = append(conditions, filter.Condition()) 23 args = append(args, filter.Arg()...) 24 } 25 26 whereClause := "" 27 if conditions != nil { 28 whereClause = " where " + strings.Join(conditions, " and ") 29 } 30 31 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created, repo_did from pipelines %s`, whereClause) 32 33 rows, err := e.Query(query, args...) 34 35 if err != nil { 36 return nil, err 37 } 38 defer rows.Close() 39 40 for rows.Next() { 41 var pipeline models.Pipeline 42 var createdAt string 43 var repoDid sql.NullString 44 err = rows.Scan( 45 &pipeline.Id, 46 &pipeline.Rkey, 47 &pipeline.Knot, 48 &pipeline.RepoOwner, 49 &pipeline.RepoName, 50 &pipeline.Sha, 51 &createdAt, 52 &repoDid, 53 ) 54 if err != nil { 55 return nil, err 56 } 57 58 if t, err := time.Parse(time.RFC3339, createdAt); err == nil { 59 pipeline.Created = t 60 } 61 if repoDid.Valid { 62 pipeline.RepoDid = repoDid.String 63 } 64 65 pipelines = append(pipelines, pipeline) 66 } 67 68 if err = rows.Err(); err != nil { 69 return nil, err 70 } 71 72 return pipelines, nil 73} 74 75func AddPipeline(e Execer, pipeline models.Pipeline) error { 76 var repoDid *string 77 if pipeline.RepoDid != "" { 78 repoDid = &pipeline.RepoDid 79 } 80 81 args := []any{ 82 pipeline.Rkey, 83 pipeline.Knot, 84 pipeline.RepoOwner, 85 pipeline.RepoName, 86 pipeline.TriggerId, 87 pipeline.Sha, 88 repoDid, 89 } 90 91 placeholders := make([]string, len(args)) 92 for i := range placeholders { 93 placeholders[i] = "?" 94 } 95 96 query := fmt.Sprintf(` 97 insert or ignore into pipelines ( 98 rkey, 99 knot, 100 repo_owner, 101 repo_name, 102 trigger_id, 103 sha, 104 repo_did 105 ) values (%s) 106 `, strings.Join(placeholders, ",")) 107 108 _, err := e.Exec(query, args...) 109 110 return err 111} 112 113func AddTrigger(e Execer, trigger models.Trigger) (int64, error) { 114 args := []any{ 115 trigger.Kind, 116 trigger.PushRef, 117 trigger.PushNewSha, 118 trigger.PushOldSha, 119 trigger.PRSourceBranch, 120 trigger.PRTargetBranch, 121 trigger.PRSourceSha, 122 trigger.PRAction, 123 } 124 125 placeholders := make([]string, len(args)) 126 for i := range placeholders { 127 placeholders[i] = "?" 128 } 129 130 query := fmt.Sprintf(`insert or ignore into triggers ( 131 kind, 132 push_ref, 133 push_new_sha, 134 push_old_sha, 135 pr_source_branch, 136 pr_target_branch, 137 pr_source_sha, 138 pr_action 139 ) values (%s)`, strings.Join(placeholders, ",")) 140 141 res, err := e.Exec(query, args...) 142 if err != nil { 143 return 0, err 144 } 145 146 return res.LastInsertId() 147} 148 149func AddPipelineStatus(ctx context.Context, e Execer, status models.PipelineStatus) error { 150 args := []any{ 151 status.Spindle, 152 status.Rkey, 153 status.PipelineKnot, 154 status.PipelineRkey, 155 status.Workflow, 156 status.Status, 157 status.Error, 158 status.ExitCode, 159 status.Created.Format(time.RFC3339), 160 } 161 162 placeholders := make([]string, len(args)) 163 for i := range placeholders { 164 placeholders[i] = "?" 165 } 166 167 query := fmt.Sprintf(` 168 insert or ignore into pipeline_statuses ( 169 spindle, 170 rkey, 171 pipeline_knot, 172 pipeline_rkey, 173 workflow, 174 status, 175 error, 176 exit_code, 177 created 178 ) values (%s) 179 `, strings.Join(placeholders, ",")) 180 181 _, err := e.ExecContext(ctx, query, args...) 182 return err 183} 184 185// this is a mega query, but the most useful one: 186// get N pipelines, for each one get the latest status of its N workflows 187// 188// the pipelines table is aliased to `p` 189// the triggers table is aliased to `t` 190func GetPipelineStatuses(e Execer, limit int, filters ...orm.Filter) ([]models.Pipeline, error) { 191 var conditions []string 192 var args []any 193 for _, filter := range filters { 194 conditions = append(conditions, filter.Condition()) 195 args = append(args, filter.Arg()...) 196 } 197 198 whereClause := "" 199 if conditions != nil { 200 whereClause = " where " + strings.Join(conditions, " and ") 201 } 202 203 query := fmt.Sprintf(` 204 select 205 p.id, 206 p.knot, 207 p.rkey, 208 p.repo_owner, 209 p.repo_name, 210 p.sha, 211 p.created, 212 p.repo_did, 213 t.id, 214 t.kind, 215 t.push_ref, 216 t.push_new_sha, 217 t.push_old_sha, 218 t.pr_source_branch, 219 t.pr_target_branch, 220 t.pr_source_sha, 221 t.pr_action 222 from 223 pipelines p 224 join 225 triggers t ON p.trigger_id = t.id 226 %s 227 order by p.created desc 228 limit %d 229 `, whereClause, limit) 230 231 rows, err := e.Query(query, args...) 232 if err != nil { 233 return nil, err 234 } 235 defer rows.Close() 236 237 pipelines := make(map[syntax.ATURI]models.Pipeline) 238 for rows.Next() { 239 var p models.Pipeline 240 var t models.Trigger 241 var created string 242 var repoDid sql.NullString 243 244 err := rows.Scan( 245 &p.Id, 246 &p.Knot, 247 &p.Rkey, 248 &p.RepoOwner, 249 &p.RepoName, 250 &p.Sha, 251 &created, 252 &repoDid, 253 &p.TriggerId, 254 &t.Kind, 255 &t.PushRef, 256 &t.PushNewSha, 257 &t.PushOldSha, 258 &t.PRSourceBranch, 259 &t.PRTargetBranch, 260 &t.PRSourceSha, 261 &t.PRAction, 262 ) 263 if err != nil { 264 return nil, err 265 } 266 267 p.Created, err = time.Parse(time.RFC3339, created) 268 if err != nil { 269 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err) 270 } 271 if repoDid.Valid { 272 p.RepoDid = repoDid.String 273 } 274 275 t.Id = p.TriggerId 276 p.Trigger = &t 277 p.Statuses = make(map[string]models.WorkflowStatus) 278 279 pipelines[p.AtUri()] = p 280 } 281 282 // get all statuses 283 // the where clause here is of the form: 284 // 285 // and ( 286 // (ps.pipeline_knot = k1 and ps.pipeline_rkey = r1) 287 // or (ps.pipeline_knot = k2 and ps.pipeline_rkey = r2) 288 // ) 289 // 290 // the join on pipelines and repos enforces that the status was emitted 291 // by the spindle that is actually registered for the pipeline's repo. 292 conditions = nil 293 args = nil 294 for _, p := range pipelines { 295 knotFilter := orm.FilterEq("ps.pipeline_knot", p.Knot) 296 rkeyFilter := orm.FilterEq("ps.pipeline_rkey", p.Rkey) 297 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition())) 298 args = append(args, p.Knot) 299 args = append(args, p.Rkey) 300 } 301 whereClause = "" 302 if conditions != nil { 303 whereClause = "and (" + strings.Join(conditions, " or ") + ")" 304 } 305 query = fmt.Sprintf(` 306 select 307 ps.id, ps.spindle, ps.rkey, ps.pipeline_knot, ps.pipeline_rkey, 308 ps.created, ps.workflow, ps.status, ps.error, ps.exit_code 309 from 310 pipeline_statuses ps 311 join 312 pipelines p on p.knot = ps.pipeline_knot and p.rkey = ps.pipeline_rkey 313 join 314 repos r on r.repo_did = p.repo_did 315 where 316 ps.spindle = r.spindle 317 %s 318 `, whereClause) 319 320 rows, err = e.Query(query, args...) 321 if err != nil { 322 return nil, err 323 } 324 defer rows.Close() 325 326 for rows.Next() { 327 var ps models.PipelineStatus 328 var created string 329 330 err := rows.Scan( 331 &ps.ID, 332 &ps.Spindle, 333 &ps.Rkey, 334 &ps.PipelineKnot, 335 &ps.PipelineRkey, 336 &created, 337 &ps.Workflow, 338 &ps.Status, 339 &ps.Error, 340 &ps.ExitCode, 341 ) 342 if err != nil { 343 return nil, err 344 } 345 346 ps.Created, err = time.Parse(time.RFC3339, created) 347 if err != nil { 348 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err) 349 } 350 351 pipelineAt := ps.PipelineAt() 352 353 // extract 354 pipeline, ok := pipelines[pipelineAt] 355 if !ok { 356 continue 357 } 358 statuses, _ := pipeline.Statuses[ps.Workflow] 359 if !ok { 360 pipeline.Statuses[ps.Workflow] = models.WorkflowStatus{} 361 } 362 363 // append 364 statuses.Data = append(statuses.Data, ps) 365 366 // reassign 367 pipeline.Statuses[ps.Workflow] = statuses 368 pipelines[pipelineAt] = pipeline 369 } 370 371 var all []models.Pipeline 372 for _, p := range pipelines { 373 for _, s := range p.Statuses { 374 slices.SortFunc(s.Data, func(a, b models.PipelineStatus) int { 375 if a.Created.After(b.Created) { 376 return 1 377 } 378 if a.Created.Before(b.Created) { 379 return -1 380 } 381 if a.ID > b.ID { 382 return 1 383 } 384 if a.ID < b.ID { 385 return -1 386 } 387 return 0 388 }) 389 } 390 all = append(all, p) 391 } 392 393 // sort pipelines by date 394 slices.SortFunc(all, func(a, b models.Pipeline) int { 395 if a.Created.After(b.Created) { 396 return -1 397 } 398 return 1 399 }) 400 401 return all, nil 402} 403 404// the pipelines table is aliased to `p` 405// the triggers table is aliased to `t` 406func GetPipelineCount(e Execer, filters ...orm.Filter) (int64, error) { 407 var conditions []string 408 var args []any 409 for _, filter := range filters { 410 conditions = append(conditions, filter.Condition()) 411 args = append(args, filter.Arg()...) 412 } 413 414 whereClause := "" 415 if conditions != nil { 416 whereClause = " where " + strings.Join(conditions, " and ") 417 } 418 419 query := fmt.Sprintf(` 420 select 421 count(1) 422 from 423 pipelines p 424 join 425 triggers t ON p.trigger_id = t.id 426 %s 427 `, whereClause) 428 429 rows, err := e.Query(query, args...) 430 if err != nil { 431 return 0, err 432 } 433 defer rows.Close() 434 435 for rows.Next() { 436 var count int64 437 err := rows.Scan(&count) 438 if err != nil { 439 return 0, err 440 } 441 442 return count, nil 443 } 444 445 // unreachable 446 return 0, nil 447}