Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

appview/db: use narrower query to get pipeline statuses

we still ingest any and all pipeline statuses events.

Signed-off-by: oppiliappan <me@oppi.li>

author
oppiliappan
committer
Tangled
date (May 20, 2026, 11:53 AM +0300) commit 3d29a2ec parent 18fcaf49 change-id rtmmpruw
+135 -7
+19 -7
appview/db/pipeline.go
··· 282 282 // get all statuses 283 283 // the where clause here is of the form: 284 284 // 285 - // where (pipeline_knot = k1 and pipeline_rkey = r1) 286 - // or (pipeline_knot = k2 and pipeline_rkey = r2) 285 + // and ( 286 + // (ps.pipeline_knot = k1 and ps.pipeline_rkey = r1) 287 + // or (ps.pipeline_knot = k2 and ps.pipeline_rkey = r2) 288 + // ) 289 + // 290 + // the join on pipelines and repos enforces that the status was emitted 291 + // by the spindle that is actually registered for the pipeline's repo. 287 292 conditions = nil 288 293 args = nil 289 294 for _, p := range pipelines { 290 - knotFilter := orm.FilterEq("pipeline_knot", p.Knot) 291 - rkeyFilter := orm.FilterEq("pipeline_rkey", p.Rkey) 295 + knotFilter := orm.FilterEq("ps.pipeline_knot", p.Knot) 296 + rkeyFilter := orm.FilterEq("ps.pipeline_rkey", p.Rkey) 292 297 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition())) 293 298 args = append(args, p.Knot) 294 299 args = append(args, p.Rkey) 295 300 } 296 301 whereClause = "" 297 302 if conditions != nil { 298 - whereClause = "where " + strings.Join(conditions, " or ") 303 + whereClause = "and (" + strings.Join(conditions, " or ") + ")" 299 304 } 300 305 query = fmt.Sprintf(` 301 306 select 302 - id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code 307 + ps.id, ps.spindle, ps.rkey, ps.pipeline_knot, ps.pipeline_rkey, 308 + ps.created, ps.workflow, ps.status, ps.error, ps.exit_code 303 309 from 304 - pipeline_statuses 310 + pipeline_statuses ps 311 + join 312 + pipelines p on p.knot = ps.pipeline_knot and p.rkey = ps.pipeline_rkey 313 + join 314 + repos r on r.repo_did = p.repo_did 315 + where 316 + ps.spindle = r.spindle 305 317 %s 306 318 `, whereClause) 307 319
+116
appview/db/pipeline_test.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "strings" 6 + "testing" 7 + "time" 8 + 9 + "github.com/bluesky-social/indigo/atproto/syntax" 10 + "tangled.org/core/appview/models" 11 + "tangled.org/core/orm" 12 + spindle "tangled.org/core/spindle/models" 13 + "tangled.org/core/workflow" 14 + ) 15 + 16 + // seedPipeline inserts a trigger + pipeline row and returns the pipeline. 17 + func seedPipeline(t *testing.T, d *DB, knot, rkey, repoDid string) models.Pipeline { 18 + t.Helper() 19 + sha := strings.Repeat("a", 40) 20 + ref := "refs/heads/main" 21 + newSha := sha 22 + oldSha := strings.Repeat("0", 40) 23 + trigger := models.Trigger{ 24 + Kind: workflow.TriggerKindPush, 25 + PushRef: &ref, 26 + PushNewSha: &newSha, 27 + PushOldSha: &oldSha, 28 + } 29 + tx, err := d.Begin() 30 + if err != nil { 31 + t.Fatalf("Begin: %v", err) 32 + } 33 + triggerID, err := AddTrigger(tx, trigger) 34 + if err != nil { 35 + tx.Rollback() 36 + t.Fatalf("AddTrigger: %v", err) 37 + } 38 + pipeline := models.Pipeline{ 39 + Knot: knot, 40 + Rkey: rkey, 41 + RepoOwner: syntax.DID("did:plc:owner"), 42 + RepoName: "repo", 43 + RepoDid: repoDid, 44 + TriggerId: int(triggerID), 45 + Sha: sha, 46 + } 47 + if err := AddPipeline(tx, pipeline); err != nil { 48 + tx.Rollback() 49 + t.Fatalf("AddPipeline: %v", err) 50 + } 51 + if err := tx.Commit(); err != nil { 52 + t.Fatalf("Commit: %v", err) 53 + } 54 + return pipeline 55 + } 56 + 57 + // seedStatus inserts a pipeline_status row directly. 58 + func seedStatus(t *testing.T, d *DB, spindleInstance, rkey, pipelineKnot, pipelineRkey, workflow string) { 59 + t.Helper() 60 + status := models.PipelineStatus{ 61 + Spindle: spindleInstance, 62 + Rkey: rkey, 63 + PipelineKnot: pipelineKnot, 64 + PipelineRkey: pipelineRkey, 65 + Workflow: workflow, 66 + Status: spindle.StatusKindSuccess, 67 + Created: time.Now(), 68 + } 69 + if err := AddPipelineStatus(context.Background(), d, status); err != nil { 70 + t.Fatalf("AddPipelineStatus: %v", err) 71 + } 72 + } 73 + 74 + // TestGetPipelineStatuses_SpindleValidation verifies that GetPipelineStatuses 75 + // only returns statuses emitted by the spindle registered for the pipeline's 76 + // repo, and silently drops statuses from a rogue spindle. 77 + func TestGetPipelineStatuses_SpindleValidation(t *testing.T) { 78 + d := newTestDB(t) 79 + 80 + const ( 81 + knot = "knot.example.com" 82 + correctSpindle = "spindle.example.com" 83 + rogueSpindle = "evil.example.com" 84 + repoDid = "did:plc:testrepo" 85 + pipelineRkey = "pipeline1" 86 + ) 87 + 88 + // seed repo with the correct spindle 89 + repo := seedRepo(t, d, "did:plc:owner", knot, "repo", "repo", repoDid) 90 + if err := UpdateSpindle(d, repo.RepoDid, &[]string{correctSpindle}[0]); err != nil { 91 + t.Fatalf("UpdateSpindle: %v", err) 92 + } 93 + 94 + // seed the pipeline for this repo 95 + seedPipeline(t, d, knot, pipelineRkey, repoDid) 96 + 97 + // insert one status from the correct spindle, one from a rogue spindle 98 + seedStatus(t, d, correctSpindle, "status-valid", knot, pipelineRkey, "build") 99 + seedStatus(t, d, rogueSpindle, "status-rogue", knot, pipelineRkey, "build") 100 + 101 + pipelines, err := GetPipelineStatuses(d, 10, orm.FilterEq("p.repo_did", repoDid)) 102 + if err != nil { 103 + t.Fatalf("GetPipelineStatuses: %v", err) 104 + } 105 + if len(pipelines) != 1 { 106 + t.Fatalf("expected 1 pipeline, got %d", len(pipelines)) 107 + } 108 + 109 + statuses := pipelines[0].Statuses["build"].Data 110 + if len(statuses) != 1 { 111 + t.Fatalf("expected 1 status (from correct spindle), got %d", len(statuses)) 112 + } 113 + if statuses[0].Spindle != correctSpindle { 114 + t.Errorf("expected spindle %q, got %q", correctSpindle, statuses[0].Spindle) 115 + } 116 + }