Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

appview, spindle: remove spindlestream

Signed-off-by: Seongmin Lee <git@boltless.me>

author
Seongmin Lee
date (Jun 26, 2026, 4:32 PM +0900) commit 1c8161d6 parent f79de854 change-id smnrqutm
+205 -1153
-447
appview/db/pipeline.go
··· 1 - package db 2 - 3 - import ( 4 - "context" 5 - "database/sql" 6 - "fmt" 7 - "slices" 8 - "strings" 9 - "time" 10 - 11 - "github.com/bluesky-social/indigo/atproto/syntax" 12 - "tangled.org/core/appview/models" 13 - "tangled.org/core/orm" 14 - ) 15 - 16 - func GetPipelines(e Execer, filters ...orm.Filter) ([]models.Pipeline, error) { 17 - var pipelines []models.Pipeline 18 - 19 - var conditions []string 20 - var args []any 21 - for _, filter := range filters { 22 - conditions = append(conditions, filter.Condition()) 23 - args = append(args, filter.Arg()...) 24 - } 25 - 26 - whereClause := "" 27 - if conditions != nil { 28 - whereClause = " where " + strings.Join(conditions, " and ") 29 - } 30 - 31 - query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created, repo_did from pipelines %s`, whereClause) 32 - 33 - rows, err := e.Query(query, args...) 34 - 35 - if err != nil { 36 - return nil, err 37 - } 38 - defer rows.Close() 39 - 40 - for rows.Next() { 41 - var pipeline models.Pipeline 42 - var createdAt string 43 - var repoDid sql.NullString 44 - err = rows.Scan( 45 - &pipeline.Id, 46 - &pipeline.Rkey, 47 - &pipeline.Knot, 48 - &pipeline.RepoOwner, 49 - &pipeline.RepoName, 50 - &pipeline.Sha, 51 - &createdAt, 52 - &repoDid, 53 - ) 54 - if err != nil { 55 - return nil, err 56 - } 57 - 58 - if t, err := time.Parse(time.RFC3339, createdAt); err == nil { 59 - pipeline.Created = t 60 - } 61 - if repoDid.Valid { 62 - pipeline.RepoDid = repoDid.String 63 - } 64 - 65 - pipelines = append(pipelines, pipeline) 66 - } 67 - 68 - if err = rows.Err(); err != nil { 69 - return nil, err 70 - } 71 - 72 - return pipelines, nil 73 - } 74 - 75 - func AddPipeline(e Execer, pipeline models.Pipeline) error { 76 - var repoDid *string 77 - if pipeline.RepoDid != "" { 78 - repoDid = &pipeline.RepoDid 79 - } 80 - 81 - args := []any{ 82 - pipeline.Rkey, 83 - pipeline.Knot, 84 - pipeline.RepoOwner, 85 - pipeline.RepoName, 86 - pipeline.TriggerId, 87 - pipeline.Sha, 88 - repoDid, 89 - } 90 - 91 - placeholders := make([]string, len(args)) 92 - for i := range placeholders { 93 - placeholders[i] = "?" 94 - } 95 - 96 - query := fmt.Sprintf(` 97 - insert or ignore into pipelines ( 98 - rkey, 99 - knot, 100 - repo_owner, 101 - repo_name, 102 - trigger_id, 103 - sha, 104 - repo_did 105 - ) values (%s) 106 - `, strings.Join(placeholders, ",")) 107 - 108 - _, err := e.Exec(query, args...) 109 - 110 - return err 111 - } 112 - 113 - func AddTrigger(e Execer, trigger models.Trigger) (int64, error) { 114 - args := []any{ 115 - trigger.Kind, 116 - trigger.PushRef, 117 - trigger.PushNewSha, 118 - trigger.PushOldSha, 119 - trigger.PRSourceBranch, 120 - trigger.PRTargetBranch, 121 - trigger.PRSourceSha, 122 - trigger.PRAction, 123 - } 124 - 125 - placeholders := make([]string, len(args)) 126 - for i := range placeholders { 127 - placeholders[i] = "?" 128 - } 129 - 130 - query := fmt.Sprintf(`insert or ignore into triggers ( 131 - kind, 132 - push_ref, 133 - push_new_sha, 134 - push_old_sha, 135 - pr_source_branch, 136 - pr_target_branch, 137 - pr_source_sha, 138 - pr_action 139 - ) values (%s)`, strings.Join(placeholders, ",")) 140 - 141 - res, err := e.Exec(query, args...) 142 - if err != nil { 143 - return 0, err 144 - } 145 - 146 - return res.LastInsertId() 147 - } 148 - 149 - func AddPipelineStatus(ctx context.Context, e Execer, status models.PipelineStatus) error { 150 - args := []any{ 151 - status.Spindle, 152 - status.Rkey, 153 - status.PipelineKnot, 154 - status.PipelineRkey, 155 - status.Workflow, 156 - status.Status, 157 - status.Error, 158 - status.ExitCode, 159 - status.Created.Format(time.RFC3339), 160 - } 161 - 162 - placeholders := make([]string, len(args)) 163 - for i := range placeholders { 164 - placeholders[i] = "?" 165 - } 166 - 167 - query := fmt.Sprintf(` 168 - insert or ignore into pipeline_statuses ( 169 - spindle, 170 - rkey, 171 - pipeline_knot, 172 - pipeline_rkey, 173 - workflow, 174 - status, 175 - error, 176 - exit_code, 177 - created 178 - ) values (%s) 179 - `, strings.Join(placeholders, ",")) 180 - 181 - _, err := e.ExecContext(ctx, query, args...) 182 - return err 183 - } 184 - 185 - // this is a mega query, but the most useful one: 186 - // get N pipelines, for each one get the latest status of its N workflows 187 - // 188 - // the pipelines table is aliased to `p` 189 - // the triggers table is aliased to `t` 190 - func GetPipelineStatuses(e Execer, limit int, filters ...orm.Filter) ([]models.Pipeline, error) { 191 - var conditions []string 192 - var args []any 193 - for _, filter := range filters { 194 - conditions = append(conditions, filter.Condition()) 195 - args = append(args, filter.Arg()...) 196 - } 197 - 198 - whereClause := "" 199 - if conditions != nil { 200 - whereClause = " where " + strings.Join(conditions, " and ") 201 - } 202 - 203 - query := fmt.Sprintf(` 204 - select 205 - p.id, 206 - p.knot, 207 - p.rkey, 208 - p.repo_owner, 209 - p.repo_name, 210 - p.sha, 211 - p.created, 212 - p.repo_did, 213 - t.id, 214 - t.kind, 215 - t.push_ref, 216 - t.push_new_sha, 217 - t.push_old_sha, 218 - t.pr_source_branch, 219 - t.pr_target_branch, 220 - t.pr_source_sha, 221 - t.pr_action 222 - from 223 - pipelines p 224 - join 225 - triggers t ON p.trigger_id = t.id 226 - %s 227 - order by p.created desc 228 - limit %d 229 - `, whereClause, limit) 230 - 231 - rows, err := e.Query(query, args...) 232 - if err != nil { 233 - return nil, err 234 - } 235 - defer rows.Close() 236 - 237 - pipelines := make(map[syntax.ATURI]models.Pipeline) 238 - for rows.Next() { 239 - var p models.Pipeline 240 - var t models.Trigger 241 - var created string 242 - var repoDid sql.NullString 243 - 244 - err := rows.Scan( 245 - &p.Id, 246 - &p.Knot, 247 - &p.Rkey, 248 - &p.RepoOwner, 249 - &p.RepoName, 250 - &p.Sha, 251 - &created, 252 - &repoDid, 253 - &p.TriggerId, 254 - &t.Kind, 255 - &t.PushRef, 256 - &t.PushNewSha, 257 - &t.PushOldSha, 258 - &t.PRSourceBranch, 259 - &t.PRTargetBranch, 260 - &t.PRSourceSha, 261 - &t.PRAction, 262 - ) 263 - if err != nil { 264 - return nil, err 265 - } 266 - 267 - p.Created, err = time.Parse(time.RFC3339, created) 268 - if err != nil { 269 - return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err) 270 - } 271 - if repoDid.Valid { 272 - p.RepoDid = repoDid.String 273 - } 274 - 275 - t.Id = p.TriggerId 276 - p.Trigger = &t 277 - p.Statuses = make(map[string]models.WorkflowStatus) 278 - 279 - pipelines[p.AtUri()] = p 280 - } 281 - 282 - // get all statuses 283 - // the where clause here is of the form: 284 - // 285 - // and ( 286 - // (ps.pipeline_knot = k1 and ps.pipeline_rkey = r1) 287 - // or (ps.pipeline_knot = k2 and ps.pipeline_rkey = r2) 288 - // ) 289 - // 290 - // the join on pipelines and repos enforces that the status was emitted 291 - // by the spindle that is actually registered for the pipeline's repo. 292 - conditions = nil 293 - args = nil 294 - for _, p := range pipelines { 295 - knotFilter := orm.FilterEq("ps.pipeline_knot", p.Knot) 296 - rkeyFilter := orm.FilterEq("ps.pipeline_rkey", p.Rkey) 297 - conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition())) 298 - args = append(args, p.Knot) 299 - args = append(args, p.Rkey) 300 - } 301 - whereClause = "" 302 - if conditions != nil { 303 - whereClause = "and (" + strings.Join(conditions, " or ") + ")" 304 - } 305 - query = fmt.Sprintf(` 306 - select 307 - ps.id, ps.spindle, ps.rkey, ps.pipeline_knot, ps.pipeline_rkey, 308 - ps.created, ps.workflow, ps.status, ps.error, ps.exit_code 309 - from 310 - pipeline_statuses ps 311 - join 312 - pipelines p on p.knot = ps.pipeline_knot and p.rkey = ps.pipeline_rkey 313 - join 314 - repos r on r.repo_did = p.repo_did 315 - where 316 - ps.spindle = r.spindle 317 - %s 318 - `, whereClause) 319 - 320 - rows, err = e.Query(query, args...) 321 - if err != nil { 322 - return nil, err 323 - } 324 - defer rows.Close() 325 - 326 - for rows.Next() { 327 - var ps models.PipelineStatus 328 - var created string 329 - 330 - err := rows.Scan( 331 - &ps.ID, 332 - &ps.Spindle, 333 - &ps.Rkey, 334 - &ps.PipelineKnot, 335 - &ps.PipelineRkey, 336 - &created, 337 - &ps.Workflow, 338 - &ps.Status, 339 - &ps.Error, 340 - &ps.ExitCode, 341 - ) 342 - if err != nil { 343 - return nil, err 344 - } 345 - 346 - ps.Created, err = time.Parse(time.RFC3339, created) 347 - if err != nil { 348 - return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err) 349 - } 350 - 351 - pipelineAt := ps.PipelineAt() 352 - 353 - // extract 354 - pipeline, ok := pipelines[pipelineAt] 355 - if !ok { 356 - continue 357 - } 358 - statuses, _ := pipeline.Statuses[ps.Workflow] 359 - if !ok { 360 - pipeline.Statuses[ps.Workflow] = models.WorkflowStatus{} 361 - } 362 - 363 - // append 364 - statuses.Data = append(statuses.Data, ps) 365 - 366 - // reassign 367 - pipeline.Statuses[ps.Workflow] = statuses 368 - pipelines[pipelineAt] = pipeline 369 - } 370 - 371 - var all []models.Pipeline 372 - for _, p := range pipelines { 373 - for _, s := range p.Statuses { 374 - slices.SortFunc(s.Data, func(a, b models.PipelineStatus) int { 375 - if a.Created.After(b.Created) { 376 - return 1 377 - } 378 - if a.Created.Before(b.Created) { 379 - return -1 380 - } 381 - if a.ID > b.ID { 382 - return 1 383 - } 384 - if a.ID < b.ID { 385 - return -1 386 - } 387 - return 0 388 - }) 389 - } 390 - all = append(all, p) 391 - } 392 - 393 - // sort pipelines by date 394 - slices.SortFunc(all, func(a, b models.Pipeline) int { 395 - if a.Created.After(b.Created) { 396 - return -1 397 - } 398 - return 1 399 - }) 400 - 401 - return all, nil 402 - } 403 - 404 - // the pipelines table is aliased to `p` 405 - // the triggers table is aliased to `t` 406 - func GetPipelineCount(e Execer, filters ...orm.Filter) (int64, error) { 407 - var conditions []string 408 - var args []any 409 - for _, filter := range filters { 410 - conditions = append(conditions, filter.Condition()) 411 - args = append(args, filter.Arg()...) 412 - } 413 - 414 - whereClause := "" 415 - if conditions != nil { 416 - whereClause = " where " + strings.Join(conditions, " and ") 417 - } 418 - 419 - query := fmt.Sprintf(` 420 - select 421 - count(1) 422 - from 423 - pipelines p 424 - join 425 - triggers t ON p.trigger_id = t.id 426 - %s 427 - `, whereClause) 428 - 429 - rows, err := e.Query(query, args...) 430 - if err != nil { 431 - return 0, err 432 - } 433 - defer rows.Close() 434 - 435 - for rows.Next() { 436 - var count int64 437 - err := rows.Scan(&count) 438 - if err != nil { 439 - return 0, err 440 - } 441 - 442 - return count, nil 443 - } 444 - 445 - // unreachable 446 - return 0, nil 447 - }
-116
appview/db/pipeline_test.go
··· 1 - package db 2 - 3 - import ( 4 - "context" 5 - "strings" 6 - "testing" 7 - "time" 8 - 9 - "github.com/bluesky-social/indigo/atproto/syntax" 10 - "tangled.org/core/appview/models" 11 - "tangled.org/core/orm" 12 - spindle "tangled.org/core/spindle/models" 13 - "tangled.org/core/workflow" 14 - ) 15 - 16 - // seedPipeline inserts a trigger + pipeline row and returns the pipeline. 17 - func seedPipeline(t *testing.T, d *DB, knot, rkey, repoDid string) models.Pipeline { 18 - t.Helper() 19 - sha := strings.Repeat("a", 40) 20 - ref := "refs/heads/main" 21 - newSha := sha 22 - oldSha := strings.Repeat("0", 40) 23 - trigger := models.Trigger{ 24 - Kind: workflow.TriggerKindPush, 25 - PushRef: &ref, 26 - PushNewSha: &newSha, 27 - PushOldSha: &oldSha, 28 - } 29 - tx, err := d.Begin() 30 - if err != nil { 31 - t.Fatalf("Begin: %v", err) 32 - } 33 - triggerID, err := AddTrigger(tx, trigger) 34 - if err != nil { 35 - tx.Rollback() 36 - t.Fatalf("AddTrigger: %v", err) 37 - } 38 - pipeline := models.Pipeline{ 39 - Knot: knot, 40 - Rkey: rkey, 41 - RepoOwner: syntax.DID("did:plc:owner"), 42 - RepoName: "repo", 43 - RepoDid: repoDid, 44 - TriggerId: int(triggerID), 45 - Sha: sha, 46 - } 47 - if err := AddPipeline(tx, pipeline); err != nil { 48 - tx.Rollback() 49 - t.Fatalf("AddPipeline: %v", err) 50 - } 51 - if err := tx.Commit(); err != nil { 52 - t.Fatalf("Commit: %v", err) 53 - } 54 - return pipeline 55 - } 56 - 57 - // seedStatus inserts a pipeline_status row directly. 58 - func seedStatus(t *testing.T, d *DB, spindleInstance, rkey, pipelineKnot, pipelineRkey, workflow string) { 59 - t.Helper() 60 - status := models.PipelineStatus{ 61 - Spindle: spindleInstance, 62 - Rkey: rkey, 63 - PipelineKnot: pipelineKnot, 64 - PipelineRkey: pipelineRkey, 65 - Workflow: workflow, 66 - Status: spindle.StatusKindSuccess, 67 - Created: time.Now(), 68 - } 69 - if err := AddPipelineStatus(context.Background(), d, status); err != nil { 70 - t.Fatalf("AddPipelineStatus: %v", err) 71 - } 72 - } 73 - 74 - // TestGetPipelineStatuses_SpindleValidation verifies that GetPipelineStatuses 75 - // only returns statuses emitted by the spindle registered for the pipeline's 76 - // repo, and silently drops statuses from a rogue spindle. 77 - func TestGetPipelineStatuses_SpindleValidation(t *testing.T) { 78 - d := newTestDB(t) 79 - 80 - const ( 81 - knot = "knot.example.com" 82 - correctSpindle = "spindle.example.com" 83 - rogueSpindle = "evil.example.com" 84 - repoDid = "did:plc:testrepo" 85 - pipelineRkey = "pipeline1" 86 - ) 87 - 88 - // seed repo with the correct spindle 89 - repo := seedRepo(t, d, "did:plc:owner", knot, "repo", "repo", repoDid) 90 - if err := UpdateSpindle(d, repo.RepoDid, &[]string{correctSpindle}[0]); err != nil { 91 - t.Fatalf("UpdateSpindle: %v", err) 92 - } 93 - 94 - // seed the pipeline for this repo 95 - seedPipeline(t, d, knot, pipelineRkey, repoDid) 96 - 97 - // insert one status from the correct spindle, one from a rogue spindle 98 - seedStatus(t, d, correctSpindle, "status-valid", knot, pipelineRkey, "build") 99 - seedStatus(t, d, rogueSpindle, "status-rogue", knot, pipelineRkey, "build") 100 - 101 - pipelines, err := GetPipelineStatuses(d, 10, orm.FilterEq("p.repo_did", repoDid)) 102 - if err != nil { 103 - t.Fatalf("GetPipelineStatuses: %v", err) 104 - } 105 - if len(pipelines) != 1 { 106 - t.Fatalf("expected 1 pipeline, got %d", len(pipelines)) 107 - } 108 - 109 - statuses := pipelines[0].Statuses["build"].Data 110 - if len(statuses) != 1 { 111 - t.Fatalf("expected 1 status (from correct spindle), got %d", len(statuses)) 112 - } 113 - if statuses[0].Spindle != correctSpindle { 114 - t.Errorf("expected spindle %q, got %q", correctSpindle, statuses[0].Spindle) 115 - } 116 - }
+7 -7
appview/pages/pages.go
··· 889 889 EmailToDid map[string]string 890 890 VerifiedCommits commitverify.VerifiedCommits 891 891 Languages []types.RepoLanguageDetails 892 - Pipelines map[string]models.Pipeline 892 + Pipelines map[string]*tangled.CiDefs_Pipeline 893 893 NeedsKnotUpgrade bool 894 894 KnotUnreachable bool 895 895 types.RepoIndexResponse ··· 936 936 Active string 937 937 EmailToDid map[string]string 938 938 VerifiedCommits commitverify.VerifiedCommits 939 - Pipelines map[string]models.Pipeline 939 + Pipelines map[string]*tangled.CiDefs_Pipeline 940 940 941 941 types.RepoLogResponse 942 942 } ··· 951 951 RepoInfo repoinfo.RepoInfo 952 952 Active string 953 953 EmailToDid map[string]string 954 - Pipeline *models.Pipeline 954 + Pipeline *tangled.CiDefs_Pipeline 955 955 DiffOpts types.DiffOpts 956 956 957 957 // singular because it's always going to be just one ··· 1351 1351 FilterQuery string 1352 1352 BaseFilterQuery string 1353 1353 Stacks []models.Stack 1354 - Pipelines map[string]models.Pipeline 1354 + Pipelines map[string]tangled.CiDefs_Pipeline 1355 1355 LabelDefs map[string]*models.LabelDefinition 1356 1356 Page pagination.Page 1357 1357 PullCount int ··· 1391 1391 BranchDeleteStatus *models.BranchDeleteStatus 1392 1392 MergeCheck types.MergeCheckResponse 1393 1393 ResubmitCheck ResubmitResult 1394 - Pipelines map[string]models.Pipeline 1394 + Pipelines map[string]tangled.CiDefs_Pipeline 1395 1395 Diff types.DiffRenderer 1396 1396 DiffOpts types.DiffOpts 1397 1397 ActiveRound int ··· 1563 1563 type PipelinesParams struct { 1564 1564 BaseParams 1565 1565 RepoInfo repoinfo.RepoInfo 1566 - Pipelines []models.Pipeline 1566 + Pipelines []*tangled.CiDefs_Pipeline 1567 1567 Active string 1568 1568 FilterKind string 1569 1569 Total int64 ··· 1617 1617 type WorkflowParams struct { 1618 1618 BaseParams 1619 1619 RepoInfo repoinfo.RepoInfo 1620 - Pipeline models.Pipeline 1620 + Pipeline *tangled.CiDefs_Pipeline 1621 1621 Workflow string 1622 1622 LogUrl string 1623 1623 Active string
+17
appview/pipelines/logs.go
··· 1 1 package pipelines 2 2 3 3 import ( 4 + "fmt" 4 5 "html/template" 6 + "net/url" 5 7 "path" 6 8 "regexp" 7 9 "strings" 8 10 11 + "github.com/bluesky-social/indigo/atproto/syntax" 9 12 terminal "github.com/buildkite/terminal-to-html/v3" 10 13 "github.com/gorilla/websocket" 14 + "tangled.org/core/api/tangled" 11 15 "tangled.org/core/appview/pages/markup" 12 16 "tangled.org/core/hostutil" 13 17 ) ··· 81 85 } 82 86 } 83 87 88 + func SubscribeLogsUrl(spindle string, pipeline syntax.TID, workflow string) string { 89 + u, err := hostutil.EnsureWsScheme(spindle) 90 + if err != nil { 91 + return "" 92 + } 93 + 94 + query := url.Values{} 95 + query.Set("pipeline", pipeline.String()) 96 + query.Set("workflow", workflow) 97 + return fmt.Sprintf("%s/xrpc/%s?%s", u, tangled.CiWorkflowSubscribeLogsNSID, query.Encode()) 98 + } 99 + 100 + // TODO(boltless): deprecate this 84 101 func SpindleURL(spindle, knot, rkey, workflow string) string { 85 102 url, err := hostutil.EnsureWsScheme(spindle) 86 103 if err != nil {
+84 -140
appview/pipelines/pipelines.go
··· 4 4 "bytes" 5 5 "context" 6 6 "encoding/json" 7 - "fmt" 8 7 "log/slog" 9 8 "net/http" 10 9 "time" ··· 13 12 "tangled.org/core/appview/config" 14 13 "tangled.org/core/appview/db" 15 14 "tangled.org/core/appview/middleware" 16 - "tangled.org/core/appview/models" 17 15 "tangled.org/core/appview/oauth" 18 16 "tangled.org/core/appview/pages" 19 17 "tangled.org/core/appview/reporesolver" 20 - "tangled.org/core/eventconsumer" 21 18 "tangled.org/core/hostutil" 22 19 "tangled.org/core/idresolver" 23 20 "tangled.org/core/orm" 24 21 "tangled.org/core/rbac" 25 22 spindlemodel "tangled.org/core/spindle/models" 26 23 24 + "github.com/bluesky-social/indigo/atproto/syntax" 25 + indigoxrpc "github.com/bluesky-social/indigo/xrpc" 27 26 "github.com/go-chi/chi/v5" 28 27 "github.com/gorilla/websocket" 29 28 ) ··· 34 33 config *config.Config 35 34 oauth *oauth.OAuth 36 35 pages *pages.Pages 37 - spindlestream *eventconsumer.Consumer 38 36 pipelineNotifier *StatusNotifier 39 37 db *db.DB 40 38 enforcer *rbac.Enforcer ··· 48 46 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs) 49 47 r. 50 48 With(mw.RepoPermissionMiddleware("repo:owner")). 51 - Post("/{pipeline}/workflow/{workflow}/cancel", p.Cancel) 49 + Post("/{pipeline}/workflow/{workflow}/cancel", p.CancelWorkflow) 52 50 53 51 return r 54 52 } ··· 57 55 oauth *oauth.OAuth, 58 56 repoResolver *reporesolver.RepoResolver, 59 57 pages *pages.Pages, 60 - spindlestream *eventconsumer.Consumer, 61 58 pipelineNotifier *StatusNotifier, 62 59 idResolver *idresolver.Resolver, 63 60 db *db.DB, ··· 71 68 pages: pages, 72 69 idResolver: idResolver, 73 70 config: config, 74 - spindlestream: spindlestream, 75 71 pipelineNotifier: pipelineNotifier, 76 72 db: db, 77 73 enforcer: enforcer, ··· 103 99 filterKind = "all" 104 100 } 105 101 106 - ps, err := db.GetPipelineStatuses( 107 - p.db, 108 - 30, 109 - filters..., 110 - ) 111 - if err != nil { 112 - l.Error("failed to query db", "err", err) 113 - return 114 - } 115 - 116 - total, err := db.GetPipelineCount(p.db, filters...) 102 + // sh.tangled.ci.queryPipelines(repo, kind, limit=30) 103 + xrpcc := indigoxrpc.Client{Host: f.Spindle} 104 + out, err := tangled.CiQueryPipelines(r.Context(), &xrpcc, nil, "", 1, f.RepoDid) 117 105 if err != nil { 118 - l.Error("failed to query db", "err", err) 119 - return 106 + l.Error("failed to fetch pipelines", "err", err) 107 + panic("unimplemented") // spindle failure, appview should not fail. 120 108 } 121 109 122 110 p.pages.Pipelines(w, pages.PipelinesParams{ 123 111 BaseParams: pages.BaseParamsFromContext(r.Context()), 124 112 RepoInfo: p.repoResolver.GetRepoInfo(r, user), 125 - Pipelines: ps, 113 + Pipelines: out.Pipelines, 126 114 FilterKind: filterKind, 127 - Total: total, 115 + Total: out.Total, 128 116 }) 129 117 } 130 118 ··· 135 123 f, err := p.repoResolver.Resolve(r) 136 124 if err != nil { 137 125 l.Error("failed to get repo and knot", "err", err) 126 + p.pages.Error404(w) 138 127 return 139 128 } 140 129 141 - pipelineId := chi.URLParam(r, "pipeline") 142 - if pipelineId == "" { 143 - l.Error("empty pipeline ID") 130 + pipelineId, err := syntax.ParseTID(chi.URLParam(r, "pipeline")) 131 + if err != nil { 132 + l.Debug("invalid pipeline id", "id", pipelineId) 133 + p.pages.Error404(w) 144 134 return 145 135 } 146 136 147 - workflow := chi.URLParam(r, "workflow") 148 - if workflow == "" { 149 - l.Error("empty workflow name") 137 + workflowName := chi.URLParam(r, "workflow") 138 + if workflowName == "" { 139 + l.Debug("empty workflow name") 140 + p.pages.Error404(w) 150 141 return 151 142 } 152 143 153 - ps, err := db.GetPipelineStatuses( 154 - p.db, 155 - 1, 156 - orm.FilterEq("p.repo_did", f.RepoDid), 157 - orm.FilterEq("p.id", pipelineId), 158 - ) 144 + l = l.With("pipeline", pipelineId, "workflow", workflowName) 145 + 146 + // TODO: change url path to: 147 + // /{owner}/{slug}/pipelines/{spindle}/{pipeline(tid)}/workflow/{workflow} 148 + // do we want spindle DID or just Hostname? <- DID will make more sense since it's not defined in DID doc. 149 + 150 + xrpcc := &indigoxrpc.Client{Host: f.Spindle} 151 + out, err := tangled.CiGetPipeline(r.Context(), xrpcc, pipelineId.String()) 159 152 if err != nil { 160 - l.Error("failed to query db", "err", err) 153 + // TODO(boltless): change behavior based on error 154 + l.Debug("failed to get pipeline", "err", err) 155 + p.pages.Error404(w) 161 156 return 162 157 } 163 158 164 - if len(ps) != 1 { 165 - l.Error("invalid number of pipelines", "len", len(ps)) 159 + // ensure workflow exists 160 + exist := false 161 + for _, workflow := range out.Workflows { 162 + if workflow.Name == workflowName { 163 + exist = true 164 + break 165 + } 166 + } 167 + if !exist { 168 + l.Debug("workflow doesn't exist in pipeline") 169 + p.pages.Error404(w) 166 170 return 167 171 } 168 - 169 - singlePipeline := ps[0] 170 172 171 173 p.pages.Workflow(w, pages.WorkflowParams{ 172 174 BaseParams: pages.BaseParamsFromContext(r.Context()), 173 175 RepoInfo: p.repoResolver.GetRepoInfo(r, user), 174 - Pipeline: singlePipeline, 175 - Workflow: workflow, 176 + Pipeline: out, 177 + Workflow: workflowName, 176 178 }) 177 179 } 178 180 ··· 191 193 return 192 194 } 193 195 194 - pipelineId := chi.URLParam(r, "pipeline") 195 - workflow := chi.URLParam(r, "workflow") 196 - if pipelineId == "" || workflow == "" { 197 - http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 196 + if f.Spindle == "" { 197 + http.Error(w, "invalid repo info", http.StatusBadRequest) 198 198 return 199 199 } 200 200 201 - ps, err := db.GetPipelineStatuses( 202 - p.db, 203 - 1, 204 - orm.FilterEq("p.repo_did", f.RepoDid), 205 - orm.FilterEq("p.id", pipelineId), 206 - ) 207 - if err != nil || len(ps) != 1 { 208 - l.Error("pipeline query failed", "err", err, "count", len(ps)) 209 - http.Error(w, "pipeline not found", http.StatusNotFound) 201 + pipelineId, err := syntax.ParseTID(chi.URLParam(r, "pipeline")) 202 + if err != nil { 203 + l.Debug("invalid pipeline id", "id", pipelineId) 204 + http.Error(w, "invalid pipeline id", http.StatusBadRequest) 210 205 return 211 206 } 212 207 213 - singlePipeline := ps[0] 214 - spindle := f.Spindle 215 - knot := f.Knot 216 - rkey := singlePipeline.Rkey 217 - 218 - statusCh := p.pipelineNotifier.Subscribe(singlePipeline.AtUri()) 219 - defer p.pipelineNotifier.Unsubscribe(singlePipeline.AtUri(), statusCh) 220 - 221 - if spindle == "" || knot == "" || rkey == "" { 222 - http.Error(w, "invalid repo info", http.StatusBadRequest) 208 + workflowName := chi.URLParam(r, "workflow") 209 + if workflowName == "" { 210 + l.Debug("empty workflow name") 211 + http.Error(w, "invalid workflow name", http.StatusBadRequest) 223 212 return 224 213 } 225 214 226 - url := SpindleURL(spindle, knot, rkey, workflow) 227 - if url == "" { 215 + logsUrl := SubscribeLogsUrl(f.Spindle, pipelineId, workflowName) 216 + if logsUrl == "" { 228 217 http.Error(w, "invalid spindle hostname", http.StatusBadRequest) 229 218 return 230 219 } 231 - l = l.With("url", url) 220 + l = l.With("url", logsUrl) 232 221 233 222 clientConn, err := upgrader.Upgrade(w, r, nil) 234 223 if err != nil { ··· 249 238 250 239 l.Info("logs endpoint hit") 251 240 252 - spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil) 241 + spindleConn, _, err := websocket.DefaultDialer.Dial(logsUrl, nil) 253 242 if err != nil { 254 243 l.Error("websocket dial failed", "err", err) 255 244 return ··· 339 328 return 340 329 } 341 330 342 - case _, ok := <-statusCh: 343 - if !ok { 344 - continue 345 - } 346 - fresh, err := db.GetPipelineStatuses( 347 - p.db, 348 - 1, 349 - orm.FilterEq("p.repo_did", f.RepoDid), 350 - orm.FilterEq("p.id", pipelineId), 351 - ) 352 - if err != nil || len(fresh) == 0 { 353 - continue 354 - } 355 - for name, ws := range fresh[0].Statuses { 356 - fragment.Reset() 357 - if err = p.pages.WorkflowSymbolOOB(&fragment, pages.WorkflowSymbolOOBParams{ 358 - Name: name, 359 - Statuses: ws, 360 - }); err != nil { 361 - l.Error("failed to render workflow symbol OOB", "err", err) 362 - continue 363 - } 364 - if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil { 365 - l.Error("error writing workflow symbol to client", "err", err) 366 - return 367 - } 368 - } 369 - 370 331 case <-time.After(30 * time.Second): 371 332 l.Debug("sent keepalive") 372 333 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { ··· 377 338 } 378 339 } 379 340 380 - func (p *Pipelines) Cancel(w http.ResponseWriter, r *http.Request) { 381 - l := p.logger.With("handler", "Cancel") 382 - 383 - var ( 384 - pipelineId = chi.URLParam(r, "pipeline") 385 - workflow = chi.URLParam(r, "workflow") 386 - ) 387 - if pipelineId == "" || workflow == "" { 388 - http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 389 - return 390 - } 341 + func (p *Pipelines) CancelWorkflow(w http.ResponseWriter, r *http.Request) { 342 + l := p.logger.With("handler", "CancelWorkflow") 343 + errorId := "workflow-error" 391 344 392 345 f, err := p.repoResolver.Resolve(r) 393 346 if err != nil { 394 347 l.Error("failed to get repo and knot", "err", err) 395 - http.Error(w, "bad repo/knot", http.StatusBadRequest) 348 + p.pages.Notice(w, errorId, "Failed to cancel workflow") 396 349 return 397 350 } 351 + l = l.With("repo", f.RepoDid) 398 352 399 - pipeline, err := func() (models.Pipeline, error) { 400 - ps, err := db.GetPipelineStatuses( 401 - p.db, 402 - 1, 403 - orm.FilterEq("p.repo_did", f.RepoDid), 404 - orm.FilterEq("p.id", pipelineId), 405 - ) 406 - if err != nil { 407 - return models.Pipeline{}, err 408 - } 409 - if len(ps) != 1 { 410 - return models.Pipeline{}, fmt.Errorf("wrong pipeline count %d", len(ps)) 411 - } 412 - return ps[0], nil 413 - }() 353 + if f.Spindle == "" { 354 + l.Debug("spindle is empty") 355 + p.pages.Notice(w, errorId, "Failed to cancel workflow") 356 + return 357 + } 358 + 359 + pipelineId, err := syntax.ParseTID(chi.URLParam(r, "pipeline")) 414 360 if err != nil { 415 - l.Error("pipeline query failed", "err", err) 416 - http.Error(w, "pipeline not found", http.StatusNotFound) 361 + l.Debug("invalid pipeline id", "id", pipelineId) 362 + p.pages.Error404(w) 363 + return 417 364 } 418 - var ( 419 - spindle = f.Spindle 420 - knot = f.Knot 421 - rkey = pipeline.Rkey 422 - ) 423 365 424 - if spindle == "" || knot == "" || rkey == "" { 425 - http.Error(w, "invalid repo info", http.StatusBadRequest) 366 + workflowName := chi.URLParam(r, "workflow") 367 + if workflowName == "" { 368 + l.Debug("empty workflow name") 369 + p.pages.Error404(w) 426 370 return 427 371 } 428 372 429 - hostname, noTLS, err := hostutil.ParseHostname(spindle) 373 + l = l.With("pipeline", pipelineId, "workflow", workflowName) 374 + 375 + hostname, noTLS, err := hostutil.ParseHostname(f.Spindle) 430 376 if err != nil { 431 377 http.Error(w, "invalid spindle hostname", http.StatusBadRequest) 432 378 return ··· 440 386 oauth.WithTimeout(time.Second*30), // workflow cleanup usually takes time 441 387 ) 442 388 443 - err = tangled.PipelineCancelPipeline( 389 + if err := tangled.PipelineCancelPipeline( 444 390 r.Context(), 445 391 spindleClient, 446 392 &tangled.PipelineCancelPipeline_Input{ 447 393 Repo: string(f.RepoAt()), 448 - Pipeline: pipeline.AtUri().String(), 449 - Workflow: workflow, 394 + Pipeline: pipelineId.String(), 395 + Workflow: workflowName, 450 396 }, 451 - ) 452 - errorId := "workflow-error" 453 - if err != nil { 397 + ); err != nil { 454 398 l.Error("failed to cancel workflow", "err", err) 455 399 p.pages.Notice(w, errorId, "Failed to cancel workflow") 456 400 return 457 401 } 458 - l.Debug("canceled pipeline", "uri", pipeline.AtUri()) 402 + l.Debug("canceled workflow") 459 403 }
+12 -9
appview/pipelines/ssh/session.go
··· 1 1 package ssh 2 2 3 3 import ( 4 + "context" 4 5 "fmt" 5 6 6 7 tea "github.com/charmbracelet/bubbletea" 7 8 "github.com/charmbracelet/ssh" 8 9 wishtea "github.com/charmbracelet/wish/bubbletea" 9 - "tangled.org/core/appview/db" 10 - "tangled.org/core/orm" 10 + indigoxrpc "github.com/bluesky-social/indigo/xrpc" 11 + "tangled.org/core/api/tangled" 11 12 ) 12 13 13 14 func (s *Server) teaHandler(sess ssh.Session) (tea.Model, []tea.ProgramOption) { ··· 29 30 30 31 l = l.With("repoDID", repoDID, "sha", sha) 31 32 32 - pipelines, err := db.GetPipelineStatuses(s.db, 1, 33 - orm.FilterEq("p.repo_did", repoDID), 34 - orm.FilterEq("p.sha", sha), 35 - ) 36 - if err != nil || len(pipelines) == 0 { 33 + spindle := "" 34 + 35 + l = l.With("spindle", spindle) 36 + 37 + xrpcc := indigoxrpc.Client{Host: spindle} 38 + out, err := tangled.CiQueryPipelines(context.TODO(), &xrpcc, []string{sha}, "", 1, repoDID) 39 + if err != nil || len(out.Pipelines) == 0 { 37 40 l.Warn("pipeline not found", "err", err) 38 41 return newErrorModel(renderer, fmt.Sprintf("pipeline not found for repo %s @ %s", repoDID, sha)), wishtea.MakeOptions(sess) 39 42 } 40 43 41 - pipeline := pipelines[0] 42 - l.Info("serving pipeline", "workflows", len(pipeline.Statuses)) 44 + pipeline := out.Pipelines[0] 45 + l.Info("serving pipeline", "pipeline", pipeline.Id, "workflows", len(pipeline.Workflows)) 43 46 pty, _, _ := sess.Pty() 44 47 opts := append(wishtea.MakeOptions(sess), tea.WithAltScreen()) 45 48 return newPipelineModel(renderer, s, pipeline, pty.Window.Width, pty.Window.Height), opts
+20 -15
appview/pulls/list.go
··· 14 14 "tangled.org/core/orm" 15 15 16 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 + indigoxrpc "github.com/bluesky-social/indigo/xrpc" 17 18 ) 18 19 19 20 func (s *Pulls) RepoPulls(w http.ResponseWriter, r *http.Request) { ··· 261 262 stacks = append(stacks, stack) 262 263 } 263 264 264 - ps, err := db.GetPipelineStatuses( 265 - s.db, 266 - len(shas), 267 - orm.FilterEq("p.repo_did", f.RepoDid), 268 - orm.FilterIn("p.sha", shas), 269 - ) 270 - if err != nil { 271 - l.Warn("failed to fetch pipeline statuses", "err", err) 272 - // non-fatal 273 - } 274 - m := make(map[string]models.Pipeline) 275 - for _, p := range ps { 276 - m[p.Sha] = p 277 - } 265 + // commitId -> latest pipeline 266 + pipelines := func(ctx context.Context, shas []string) map[string]tangled.CiDefs_Pipeline { 267 + xrpcc := &indigoxrpc.Client{Host: f.Spindle} 268 + out, err := tangled.CiQueryPipelines(ctx, xrpcc, shas, "", 0, f.RepoDid) 269 + if err != nil { 270 + l.Error("failed to fetch pipelines", "err", err) 271 + } 272 + 273 + m := make(map[string]tangled.CiDefs_Pipeline) 274 + 275 + for _, pipeline := range out.Pipelines { 276 + if pipeline == nil { 277 + continue 278 + } 279 + m[pipeline.Commit] = *pipeline 280 + } 281 + return m 282 + }(r.Context(), shas) 278 283 279 284 labelDefs, err := db.GetLabelDefinitions( 280 285 s.db, ··· 317 322 FilterState: filterState, 318 323 FilterQuery: query.String(), 319 324 Stacks: stacks, 320 - Pipelines: m, 325 + Pipelines: pipelines, 321 326 Page: page, 322 327 PullCount: totalPulls, 323 328 VouchRelationships: vouchRelationships,
+19 -16
appview/pulls/single.go
··· 1 1 package pulls 2 2 3 3 import ( 4 + "context" 4 5 "fmt" 5 6 "net/http" 6 7 "strconv" ··· 150 151 // can be nil if this pull is not stacked 151 152 stack, _ := r.Context().Value("stack").(models.Stack) 152 153 153 - m := make(map[string]models.Pipeline) 154 - 155 154 var shas []string 156 155 for _, s := range pull.Submissions { 157 156 shas = append(shas, s.SourceRev) ··· 160 159 shas = append(shas, p.LatestSha()) 161 160 } 162 161 163 - ps, err := db.GetPipelineStatuses( 164 - s.db, 165 - len(shas), 166 - orm.FilterEq("p.repo_did", f.RepoDid), 167 - orm.FilterIn("p.sha", shas), 168 - ) 169 - if err != nil { 170 - l.Error("failed to fetch pipeline statuses", "err", err) 171 - // non-fatal 172 - } 162 + // commitId -> latest pipeline 163 + pipelines := func(ctx context.Context) map[string]tangled.CiDefs_Pipeline { 164 + xrpcc := &indigoxrpc.Client{Host: f.Spindle} 165 + out, err := tangled.CiQueryPipelines(ctx, xrpcc, shas, "", 0, f.RepoDid) 166 + if err != nil { 167 + l.Error("failed to fetch pipelines", "err", err) 168 + } 173 169 174 - for _, p := range ps { 175 - m[p.Sha] = p 176 - } 170 + m := make(map[string]tangled.CiDefs_Pipeline) 171 + 172 + for _, pipeline := range out.Pipelines { 173 + if pipeline == nil { 174 + continue 175 + } 176 + m[pipeline.Commit] = *pipeline 177 + } 178 + return m 179 + }(r.Context()) 177 180 178 181 entities := []syntax.ATURI{pull.AtUri()} 179 182 for _, s := range pull.Submissions { ··· 257 260 BranchDeleteStatus: nil, 258 261 MergeCheck: types.MergeCheckResponse{}, 259 262 ResubmitCheck: pages.Unknown, 260 - Pipelines: m, 263 + Pipelines: pipelines, 261 264 Diff: diff, 262 265 DiffOpts: diffOpts, 263 266 ActiveRound: roundIdInt,
+1 -1
appview/repo/index.go
··· 164 164 for _, c := range commitsTrunc { 165 165 shas = append(shas, c.Hash.String()) 166 166 } 167 - pipelines, err := getPipelineStatuses(rp.db, f, shas) 167 + pipelines, err := getPipelineStatuses(r.Context(), f, shas) 168 168 if err != nil { 169 169 l.Error("failed to fetch pipeline statuses", "err", err) 170 170 // non-fatal
+4 -5
appview/repo/log.go
··· 11 11 "tangled.org/core/api/tangled" 12 12 "tangled.org/core/appview/commitverify" 13 13 "tangled.org/core/appview/db" 14 - "tangled.org/core/appview/models" 15 14 "tangled.org/core/appview/pages" 16 15 xrpcclient "tangled.org/core/appview/xrpcclient" 17 16 "tangled.org/core/types" ··· 178 177 for _, c := range xrpcResp.Commits { 179 178 shas = append(shas, c.Hash.String()) 180 179 } 181 - pipelines, err := getPipelineStatuses(rp.db, f, shas) 180 + pipelines, err := getPipelineStatuses(r.Context(), f, shas) 182 181 if err != nil { 183 182 l.Error("failed to getPipelineStatuses", "err", err) 184 183 // non-fatal ··· 250 249 } 251 250 252 251 user := rp.oauth.GetMultiAccountUser(r) 253 - pipelines, err := getPipelineStatuses(rp.db, f, []string{result.Diff.Commit.This}) 252 + pipelines, err := getPipelineStatuses(r.Context(), f, []string{result.Diff.Commit.This}) 254 253 if err != nil { 255 254 l.Error("failed to getPipelineStatuses", "err", err) 256 255 // non-fatal 257 256 } 258 - var pipeline *models.Pipeline 257 + var pipeline *tangled.CiDefs_Pipeline 259 258 if p, ok := pipelines[result.Diff.Commit.This]; ok { 260 - pipeline = &p 259 + pipeline = p 261 260 } 262 261 263 262 rp.pages.RepoCommit(w, pages.RepoCommitParams{
+27 -48
appview/repo/repo.go
··· 29 29 "tangled.org/core/appview/validator" 30 30 xrpcclient "tangled.org/core/appview/xrpcclient" 31 31 "tangled.org/core/consts" 32 - "tangled.org/core/eventconsumer" 33 32 "tangled.org/core/idresolver" 34 33 "tangled.org/core/ogre" 35 34 "tangled.org/core/orm" ··· 46 45 ) 47 46 48 47 type Repo struct { 49 - repoResolver *reporesolver.RepoResolver 50 - idResolver *idresolver.Resolver 51 - config *config.Config 52 - oauth *oauth.OAuth 53 - pages *pages.Pages 54 - spindlestream *eventconsumer.Consumer 55 - db *db.DB 56 - enforcer *rbac.Enforcer 57 - acl *knotacl.Service 58 - notifier notify.Notifier 59 - logger *slog.Logger 60 - serviceAuth *serviceauth.ServiceAuth 61 - validator *validator.Validator 62 - cfClient *cloudflare.Client 63 - ogreClient *ogre.Client 48 + repoResolver *reporesolver.RepoResolver 49 + idResolver *idresolver.Resolver 50 + config *config.Config 51 + oauth *oauth.OAuth 52 + pages *pages.Pages 53 + db *db.DB 54 + enforcer *rbac.Enforcer 55 + acl *knotacl.Service 56 + notifier notify.Notifier 57 + logger *slog.Logger 58 + serviceAuth *serviceauth.ServiceAuth 59 + validator *validator.Validator 60 + cfClient *cloudflare.Client 61 + ogreClient *ogre.Client 64 62 } 65 63 66 64 func New( 67 65 oauth *oauth.OAuth, 68 66 repoResolver *reporesolver.RepoResolver, 69 67 pages *pages.Pages, 70 - spindlestream *eventconsumer.Consumer, 71 68 idResolver *idresolver.Resolver, 72 69 db *db.DB, 73 70 config *config.Config, ··· 79 76 cfClient *cloudflare.Client, 80 77 ) *Repo { 81 78 return &Repo{ 82 - oauth: oauth, 83 - repoResolver: repoResolver, 84 - pages: pages, 85 - idResolver: idResolver, 86 - config: config, 87 - spindlestream: spindlestream, 88 - db: db, 89 - notifier: notifier, 90 - enforcer: enforcer, 91 - acl: acl, 92 - logger: logger, 93 - validator: validator, 94 - cfClient: cfClient, 95 - ogreClient: ogre.NewClient(config.Ogre.Host), 79 + oauth: oauth, 80 + repoResolver: repoResolver, 81 + pages: pages, 82 + idResolver: idResolver, 83 + config: config, 84 + db: db, 85 + notifier: notifier, 86 + enforcer: enforcer, 87 + acl: acl, 88 + logger: logger, 89 + validator: validator, 90 + cfClient: cfClient, 91 + ogreClient: ogre.NewClient(config.Ogre.Host), 96 92 } 97 93 } 98 94 ··· 171 167 if err != nil { 172 168 fail("Failed to update spindle, unable to save to PDS.", err) 173 169 return 174 - } 175 - 176 - oldSpindle := f.Spindle 177 - if oldSpindle != "" && oldSpindle != newSpindle { 178 - remaining, qErr := db.GetRepos(rp.db, orm.FilterEq("spindle", oldSpindle)) 179 - if qErr != nil { 180 - l.Warn("failed to count repos using old spindle", "err", qErr) 181 - } else if len(remaining) == 0 { 182 - rp.spindlestream.RemoveSource(eventconsumer.NewSpindleSource(oldSpindle)) 183 - } 184 - } 185 - 186 - if !removingSpindle { 187 - rp.spindlestream.AddSource( 188 - context.Background(), 189 - eventconsumer.NewSpindleSource(newSpindle), 190 - ) 191 170 } 192 171 193 172 rp.pages.HxRefresh(w)
+10 -13
appview/repo/repo_util.go
··· 1 1 package repo 2 2 3 3 import ( 4 + "context" 4 5 "maps" 5 6 "slices" 6 7 "sort" 7 8 "strings" 8 9 9 - "tangled.org/core/appview/db" 10 + indigoxrpc "github.com/bluesky-social/indigo/xrpc" 11 + "tangled.org/core/api/tangled" 10 12 "tangled.org/core/appview/models" 11 - "tangled.org/core/orm" 12 13 "tangled.org/core/types" 13 14 ) 14 15 ··· 90 91 // 91 92 // golang is so blessed that it requires 35 lines of imperative code for this 92 93 func getPipelineStatuses( 93 - d *db.DB, 94 + ctx context.Context, 94 95 repo *models.Repo, 95 96 shas []string, 96 - ) (map[string]models.Pipeline, error) { 97 - m := make(map[string]models.Pipeline) 97 + ) (map[string]*tangled.CiDefs_Pipeline, error) { 98 + m := make(map[string]*tangled.CiDefs_Pipeline) 98 99 99 100 if len(shas) == 0 { 100 101 return m, nil 101 102 } 102 103 103 - ps, err := db.GetPipelineStatuses( 104 - d, 105 - len(shas), 106 - orm.FilterEq("p.repo_did", repo.RepoDid), 107 - orm.FilterIn("p.sha", shas), 108 - ) 104 + xrpcc := &indigoxrpc.Client{Host: repo.Spindle} 105 + out, err := tangled.CiQueryPipelines(ctx, xrpcc, shas, "", 0, repo.RepoDid) 109 106 if err != nil { 110 107 return nil, err 111 108 } 112 109 113 - for _, p := range ps { 114 - m[p.Sha] = p 110 + for _, p := range out.Pipelines { 111 + m[p.Commit] = p 115 112 } 116 113 117 114 return m, nil
+2 -2
appview/state/router.go
··· 147 147 r := chi.NewRouter() 148 148 r.Use(mw.InjectBaseParams) 149 149 150 + // TODO: workflow status update requests (30s polling) 151 + 150 152 r.With(mw.ResolveIdent()).Route("/{user}", func(r chi.Router) { 151 153 r.Get("/", s.Profile) 152 154 r.Get("/feed.atom", s.AtomFeedPage) ··· 402 404 s.oauth, 403 405 s.repoResolver, 404 406 s.pages, 405 - s.spindlestream, 406 407 s.idResolver, 407 408 s.db, 408 409 s.config, ··· 421 422 s.oauth, 422 423 s.repoResolver, 423 424 s.pages, 424 - s.spindlestream, 425 425 s.pipelineNotifier, 426 426 s.idResolver, 427 427 s.db,
-182
appview/state/spindlestream.go
··· 1 - package state 2 - 3 - import ( 4 - "context" 5 - "encoding/json" 6 - "fmt" 7 - "strings" 8 - "time" 9 - 10 - "github.com/bluesky-social/indigo/atproto/syntax" 11 - "tangled.org/core/api/tangled" 12 - "tangled.org/core/appview/config" 13 - "tangled.org/core/appview/db" 14 - "tangled.org/core/appview/models" 15 - "tangled.org/core/appview/pipelines" 16 - ec "tangled.org/core/eventconsumer" 17 - "tangled.org/core/eventstream" 18 - "tangled.org/core/log" 19 - "tangled.org/core/orm" 20 - "tangled.org/core/rbac" 21 - spindle "tangled.org/core/spindle/models" 22 - "tangled.org/core/workflow" 23 - ) 24 - 25 - func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, pn *pipelines.StatusNotifier) (*ec.Consumer, error) { 26 - spindles, err := db.GetSpindles(ctx, d, orm.FilterIsNot("verified", "null")) 27 - if err != nil { 28 - return nil, err 29 - } 30 - 31 - hosts := make([]string, len(spindles)) 32 - for i, s := range spindles { 33 - hosts[i] = s.Instance 34 - } 35 - 36 - return bootstrapStream( 37 - ctx, "spindlestream", ec.KindSpindle, hosts, c.Redis.Addr, 38 - c.Spindlestream, 39 - spindleIngester(d, pn), 40 - ), nil 41 - } 42 - 43 - func spindleIngester(d *db.DB, pn *pipelines.StatusNotifier) ec.ProcessFunc { 44 - return func(ctx context.Context, source ec.Source, msg eventstream.Event) error { 45 - switch msg.Nsid { 46 - case tangled.PipelineNSID: 47 - return ingestPipeline(ctx, d, source, msg) 48 - case tangled.PipelineStatusNSID: 49 - return ingestPipelineStatus(ctx, d, pn, source, msg) 50 - } 51 - return nil 52 - } 53 - } 54 - 55 - func ingestPipeline(ctx context.Context, d *db.DB, source ec.Source, msg eventstream.Event) error { 56 - l := log.FromContext(ctx) 57 - 58 - var record tangled.Pipeline 59 - if err := json.Unmarshal(msg.EventJson, &record); err != nil { 60 - return fmt.Errorf("unmarshal pipeline: %w", err) 61 - } 62 - 63 - if record.TriggerMetadata == nil { 64 - return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 65 - } 66 - 67 - if record.TriggerMetadata.Repo == nil { 68 - return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 69 - } 70 - 71 - repoName := "" 72 - if record.TriggerMetadata.Repo.Repo != nil { 73 - repoName = *record.TriggerMetadata.Repo.Repo 74 - } 75 - 76 - repo, lookupErr := resolveRepo(d, record.TriggerMetadata.Repo.RepoDid, record.TriggerMetadata.Repo.Did, repoName) 77 - if lookupErr != nil { 78 - return fmt.Errorf("failed to look up repo: %w", lookupErr) 79 - } 80 - if repo.Spindle == "" { 81 - return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 82 - } 83 - 84 - // trigger info 85 - var trigger models.Trigger 86 - var sha string 87 - trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 88 - switch trigger.Kind { 89 - case workflow.TriggerKindPush: 90 - trigger.PushRef = &record.TriggerMetadata.Push.Ref 91 - trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 92 - trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 93 - sha = *trigger.PushNewSha 94 - case workflow.TriggerKindPullRequest: 95 - trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 96 - trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 97 - trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 98 - trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 99 - sha = *trigger.PRSourceSha 100 - } 101 - 102 - tx, err := d.Begin() 103 - if err != nil { 104 - return fmt.Errorf("failed to start txn: %w", err) 105 - } 106 - 107 - triggerId, err := db.AddTrigger(tx, trigger) 108 - if err != nil { 109 - return fmt.Errorf("failed to add trigger entry: %w", err) 110 - } 111 - 112 - // TODO: we shouldn't even use knot to identify pipelines 113 - knot := record.TriggerMetadata.Repo.Knot 114 - pipeline := models.Pipeline{ 115 - Rkey: msg.Rkey, 116 - Knot: knot, 117 - RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 118 - RepoName: repoName, 119 - RepoDid: repo.RepoDid, 120 - TriggerId: int(triggerId), 121 - Sha: sha, 122 - } 123 - 124 - err = db.AddPipeline(tx, pipeline) 125 - if err != nil { 126 - return fmt.Errorf("failed to add pipeline: %w", err) 127 - } 128 - 129 - err = tx.Commit() 130 - if err != nil { 131 - return fmt.Errorf("failed to commit txn: %w", err) 132 - } 133 - 134 - l.Info("added pipeline", "pipeline", pipeline) 135 - 136 - return nil 137 - } 138 - 139 - func ingestPipelineStatus(ctx context.Context, d *db.DB, pn *pipelines.StatusNotifier, source ec.Source, msg eventstream.Event) error { 140 - var record tangled.PipelineStatus 141 - err := json.Unmarshal(msg.EventJson, &record) 142 - if err != nil { 143 - return err 144 - } 145 - 146 - pipelineUri, err := syntax.ParseATURI(record.Pipeline) 147 - if err != nil { 148 - return err 149 - } 150 - 151 - exitCode := 0 152 - if record.ExitCode != nil { 153 - exitCode = int(*record.ExitCode) 154 - } 155 - 156 - // pick the record creation time if possible, or use time.Now 157 - created := time.Now() 158 - if t, err := time.Parse(time.RFC3339, record.CreatedAt); err == nil && created.After(t) { 159 - created = t 160 - } 161 - 162 - status := models.PipelineStatus{ 163 - Spindle: source.Host, 164 - Rkey: msg.Rkey, 165 - PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"), 166 - PipelineRkey: pipelineUri.RecordKey().String(), 167 - Created: created, 168 - Workflow: record.Workflow, 169 - Status: spindle.StatusKind(record.Status), 170 - Error: record.Error, 171 - ExitCode: exitCode, 172 - } 173 - 174 - err = db.AddPipelineStatus(ctx, d, status) 175 - if err != nil { 176 - return fmt.Errorf("failed to add pipeline status: %w", err) 177 - } 178 - 179 - pn.Publish(pipelineUri) 180 - 181 - return nil 182 - }
-144
appview/state/spindlestream_test.go
··· 1 - package state 2 - 3 - import ( 4 - "context" 5 - "io" 6 - "log/slog" 7 - "net/http" 8 - "net/http/httptest" 9 - "path/filepath" 10 - "strings" 11 - "testing" 12 - "time" 13 - 14 - "tangled.org/core/appview/db" 15 - "tangled.org/core/appview/pipelines" 16 - ec "tangled.org/core/eventconsumer" 17 - "tangled.org/core/eventconsumer/cursor" 18 - "tangled.org/core/eventstream" 19 - "tangled.org/core/notifier" 20 - spindledb "tangled.org/core/spindle/db" 21 - spindlemodels "tangled.org/core/spindle/models" 22 - ) 23 - 24 - func TestColdStart_SpindleEventsRebuildPipelineStatuses(t *testing.T) { 25 - ctx := t.Context() 26 - 27 - spindleDB, err := spindledb.Make(ctx, filepath.Join(t.TempDir(), "spindle.db")) 28 - if err != nil { 29 - t.Fatalf("spindle Make: %v", err) 30 - } 31 - t.Cleanup(func() { spindleDB.Close() }) 32 - 33 - n := notifier.New() 34 - workflowId := spindlemodels.WorkflowId{ 35 - PipelineId: spindlemodels.PipelineId{Knot: "knot.boltless.example", Rkey: "pipeline-rk1"}, 36 - Name: "build", 37 - } 38 - for _, step := range []func() error{ 39 - func() error { return spindleDB.StatusPending(workflowId, &n) }, 40 - func() error { return spindleDB.StatusRunning(workflowId, &n) }, 41 - func() error { return spindleDB.StatusSuccess(workflowId, &n) }, 42 - } { 43 - if err := step(); err != nil { 44 - t.Fatalf("seed spindle event: %v", err) 45 - } 46 - } 47 - 48 - mux := http.NewServeMux() 49 - mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) { 50 - _ = eventstream.Stream(w, r, eventstream.StreamConfig{ 51 - Backend: spindleDB, 52 - Notifier: &n, 53 - Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 54 - }) 55 - }) 56 - srv := httptest.NewServer(mux) 57 - t.Cleanup(srv.Close) 58 - source := ec.Source{Kind: "test", Host: strings.TrimPrefix(srv.URL, "http://"), NoTLS: true} 59 - 60 - appviewDB, err := db.Make(ctx, filepath.Join(t.TempDir(), "appview.db")) 61 - if err != nil { 62 - t.Fatalf("appview Make: %v", err) 63 - } 64 - t.Cleanup(func() { appviewDB.Close() }) 65 - 66 - logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 67 - processFunc := spindleIngester(appviewDB, pipelines.NewStatusNotifier()) 68 - 69 - cfg := ec.ConsumerConfig{ 70 - ProcessFunc: processFunc, 71 - WorkerCount: 1, 72 - QueueSize: 16, 73 - ConnectionTimeout: 2 * time.Second, 74 - CursorStore: &cursor.MemoryStore{}, 75 - Logger: logger, 76 - } 77 - c := ec.NewConsumer(cfg) 78 - 79 - consumerCtx, cancel := context.WithCancel(ctx) 80 - defer cancel() 81 - c.Start(consumerCtx) 82 - c.AddSource(consumerCtx, source) 83 - 84 - deadline := time.Now().Add(3 * time.Second) 85 - for time.Now().Before(deadline) { 86 - var n int 87 - if err := appviewDB.QueryRow(`select count(*) from pipeline_statuses`).Scan(&n); err != nil { 88 - t.Fatalf("count: %v", err) 89 - } 90 - if n >= 3 { 91 - break 92 - } 93 - time.Sleep(20 * time.Millisecond) 94 - } 95 - 96 - rows, err := appviewDB.Query(` 97 - select spindle, pipeline_knot, pipeline_rkey, workflow, status 98 - from pipeline_statuses 99 - order by created asc 100 - `) 101 - if err != nil { 102 - t.Fatalf("query: %v", err) 103 - } 104 - defer rows.Close() 105 - 106 - type rec struct { 107 - spindle, knot, rkey, workflow, status string 108 - } 109 - var got []rec 110 - for rows.Next() { 111 - var r rec 112 - if err := rows.Scan(&r.spindle, &r.knot, &r.rkey, &r.workflow, &r.status); err != nil { 113 - t.Fatalf("scan: %v", err) 114 - } 115 - got = append(got, r) 116 - } 117 - 118 - if len(got) != 3 { 119 - t.Fatalf("pipeline_statuses rows = %d, want 3: %+v", len(got), got) 120 - } 121 - 122 - wantStatuses := []string{"pending", "running", "success"} 123 - gotStatuses := map[string]bool{} 124 - for _, r := range got { 125 - gotStatuses[r.status] = true 126 - if r.spindle != source.Host { 127 - t.Errorf("spindle = %q, want %q", r.spindle, source.Host) 128 - } 129 - if r.knot != workflowId.Knot { 130 - t.Errorf("pipeline_knot = %q, want %q", r.knot, workflowId.Knot) 131 - } 132 - if r.rkey != workflowId.Rkey { 133 - t.Errorf("pipeline_rkey = %q, want %q", r.rkey, workflowId.Rkey) 134 - } 135 - if r.workflow != workflowId.Name { 136 - t.Errorf("workflow = %q, want %q", r.workflow, workflowId.Name) 137 - } 138 - } 139 - for _, want := range wantStatuses { 140 - if !gotStatuses[want] { 141 - t.Errorf("missing status %q in projection", want) 142 - } 143 - } 144 - }
+2 -8
appview/state/state.go
··· 71 71 repoResolver *reporesolver.RepoResolver 72 72 aclService *knotacl.Service 73 73 knotstream *eventconsumer.Consumer 74 - spindlestream *eventconsumer.Consumer 75 74 pipelineNotifier *pipelines.StatusNotifier 76 75 logger *slog.Logger 77 76 validator *validator.Validator ··· 221 220 } 222 221 knotstream.Start(ctx) 223 222 223 + // TODO: pipeline notifier serves as pipeline store with cache to prevent spamming spindles 224 + // maybe, not super sure... 224 225 pipelineNotifier := pipelines.NewStatusNotifier() 225 - 226 - spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier) 227 - if err != nil { 228 - return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 229 - } 230 - spindlestream.Start(ctx) 231 226 232 227 state := &State{ 233 228 db: d, ··· 245 240 repoResolver: repoResolver, 246 241 aclService: aclService, 247 242 knotstream: knotstream, 248 - spindlestream: spindlestream, 249 243 pipelineNotifier: pipelineNotifier, 250 244 logger: logger, 251 245 validator: validator,