Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "log/slog" 8 "testing" 9 "time" 10 11 "tangled.org/core/api/tangled" 12 13 "go.mitchellh.com/tack/internal/k8s" 14) 15 16func newTektonTestProvider(t *testing.T) (*tektonProvider, *store, *broker, *k8s.FakeClient) { 17 t.Helper() 18 st := newTestStore(t) 19 br := newBroker(st) 20 client := k8s.NewFakeClient() 21 p := newTektonProvider(br, st, client, "ci", slog.Default()) 22 return p, st, br, client 23} 24 25func TestTektonWorkflowConfig(t *testing.T) { 26 raw := "tack:\n tekton:\n pipeline: repo-ci\n service_account: runner\n params:\n image: example/app\n" 27 cfg, err := parseTektonWorkflowConfig(raw) 28 if err != nil { 29 t.Fatalf("parse: %v", err) 30 } 31 if cfg.Pipeline != "repo-ci" || cfg.ServiceAccount != "runner" { 32 t.Fatalf("cfg mismatch: %+v", cfg) 33 } 34 if got := cfg.Params["image"]; got != "example/app" { 35 t.Fatalf("params[image] = %q", got) 36 } 37 38 if _, err := parseTektonWorkflowConfig("tack:\n tekton: {}\n"); err == nil { 39 t.Fatal("missing pipeline should fail") 40 } 41} 42 43func TestTektonBuildPipelineRun(t *testing.T) { 44 cfg := &tektonWorkflowConfig{ 45 Pipeline: "repo-ci", 46 ServiceAccount: "runner", 47 Params: map[string]string{ 48 "image": "example/app", 49 }, 50 } 51 name := tektonPipelineRunName("knot.example.com", "rkey-1", "ci.yml", "abcdef", "main") 52 if len(name) > 63 || name == "" { 53 t.Fatalf("bad generated name: %q", name) 54 } 55 56 obj := buildTektonPipelineRun("ci", name, cfg, 57 "knot.example.com", "rkey-1", "did:plc:actor", "abcdef", "main", 58 &tangled.Pipeline_Workflow{Name: "ci.yml"}, 59 ) 60 if obj.GetAPIVersion() != tektonAPIVersion || obj.GetKind() != tektonRunKind { 61 t.Fatalf("type meta mismatch: %s %s", obj.GetAPIVersion(), obj.GetKind()) 62 } 63 pipeline, ok := obj.NestedString("spec", "pipelineRef", "name") 64 if !ok || pipeline != "repo-ci" { 65 t.Fatalf("pipelineRef.name = %q", pipeline) 66 } 67 sa, ok := obj.NestedString("spec", "serviceAccountName") 68 if !ok || sa != "runner" { 69 t.Fatalf("serviceAccountName = %q", sa) 70 } 71 params, ok := obj.NestedSlice("spec", "params") 72 if !ok || len(params) != 1 { 73 t.Fatalf("params = %+v", params) 74 } 75 if obj.GetAnnotations()[tektonAnnotationActor] != "did:plc:actor" || 76 obj.GetAnnotations()[tektonAnnotationCommit] != "abcdef" { 77 t.Fatalf("annotations missing identity: %+v", obj.GetAnnotations()) 78 } 79} 80 81func TestTektonStatusMapping(t *testing.T) { 82 tests := []struct { 83 name string 84 cond string 85 reason string 86 status string 87 terminal bool 88 ok bool 89 }{ 90 {name: "unknown", cond: "Unknown", status: "running", ok: true}, 91 {name: "success", cond: "True", status: "success", terminal: true, ok: true}, 92 {name: "failed", cond: "False", reason: "Failed", status: "failed", terminal: true, ok: true}, 93 {name: "cancelled", cond: "False", reason: "PipelineRunCancelled", status: "cancelled", terminal: true, ok: true}, 94 {name: "stopped", cond: "False", reason: "PipelineRunStopped", status: "cancelled", terminal: true, ok: true}, 95 } 96 for _, tt := range tests { 97 t.Run(tt.name, func(t *testing.T) { 98 obj := tektonStatusObject(tt.cond, tt.reason) 99 status, terminal, ok := mapTektonPipelineRunStatus(obj) 100 if status != tt.status || terminal != tt.terminal || ok != tt.ok { 101 t.Fatalf("got %q/%v/%v; want %q/%v/%v", 102 status, terminal, ok, tt.status, tt.terminal, tt.ok) 103 } 104 }) 105 } 106} 107 108func TestTektonSpawnCreatesPipelineRun(t *testing.T) { 109 p, st, _, client := newTektonTestProvider(t) 110 ctx, cancel := context.WithCancel(context.Background()) 111 defer cancel() 112 113 trigger := &tangled.Pipeline_TriggerMetadata{ 114 Push: &tangled.Pipeline_PushTriggerData{ 115 NewSha: "abcdef0123", 116 Ref: "refs/heads/main", 117 }, 118 } 119 p.Spawn(ctx, "knot.example.com", "rkey-1", "did:plc:actor", trigger, 120 []*tangled.Pipeline_Workflow{{Name: "ci.yml", 121 Raw: "tack:\n tekton:\n pipeline: repo-ci\n"}}, 122 ) 123 124 ref := waitTektonRef(t, st, "knot.example.com", "rkey-1", "ci.yml") 125 if ref.Namespace != "ci" || ref.PipelineName != "repo-ci" { 126 t.Fatalf("ref mismatch: %+v", ref) 127 } 128 129 obj, err := client.GetObject(context.Background(), pipelineRunsGVR, "ci", ref.PipelineRunName) 130 if err != nil { 131 t.Fatalf("get PipelineRun: %v", err) 132 } 133 pipeline, ok := obj.NestedString("spec", "pipelineRef", "name") 134 if !ok || pipeline != "repo-ci" { 135 t.Fatalf("pipelineRef.name = %q", pipeline) 136 } 137 138 rows, err := st.EventsAfter(context.Background(), 0) 139 if err != nil { 140 t.Fatalf("EventsAfter: %v", err) 141 } 142 if len(rows) != 1 { 143 t.Fatalf("got %d events, want 1", len(rows)) 144 } 145 var rec tangled.PipelineStatus 146 if err := json.Unmarshal(rows[0].EventJSON, &rec); err != nil { 147 t.Fatalf("decode status: %v", err) 148 } 149 if rec.Status != "pending" || rec.Workflow != "ci.yml" { 150 t.Fatalf("bad pending status: %+v", rec) 151 } 152} 153 154func TestTektonSpawnAlreadyExists(t *testing.T) { 155 p, st, _, client := newTektonTestProvider(t) 156 name := tektonPipelineRunName("knot.example.com", "rkey-1", "ci.yml", "abcdef0123", "main") 157 existing := buildTektonPipelineRun("ci", name, 158 &tektonWorkflowConfig{Pipeline: "repo-ci"}, 159 "knot.example.com", "rkey-1", "did:plc:actor", "abcdef0123", "main", 160 &tangled.Pipeline_Workflow{Name: "ci.yml"}, 161 ) 162 existing.SetUID("uid-1") 163 client.SeedObject(pipelineRunsGVR, "ci", existing) 164 165 ctx, cancel := context.WithCancel(context.Background()) 166 defer cancel() 167 p.Spawn(ctx, "knot.example.com", "rkey-1", "did:plc:actor", 168 &tangled.Pipeline_TriggerMetadata{Push: &tangled.Pipeline_PushTriggerData{ 169 NewSha: "abcdef0123", 170 Ref: "refs/heads/main", 171 }}, 172 []*tangled.Pipeline_Workflow{{Name: "ci.yml", 173 Raw: "tack:\n tekton:\n pipeline: repo-ci\n"}}, 174 ) 175 176 ref := waitTektonRef(t, st, "knot.example.com", "rkey-1", "ci.yml") 177 if ref.PipelineRunName != name || ref.PipelineRunUID != "uid-1" { 178 t.Fatalf("ref mismatch: %+v", ref) 179 } 180} 181 182func TestTektonLogsLookup(t *testing.T) { 183 p, st, _, client := newTektonTestProvider(t) 184 ctx := context.Background() 185 if _, err := p.Logs(ctx, "knot.example.com", "rkey-1", "ci.yml"); !errors.Is(err, ErrLogsNotFound) { 186 t.Fatalf("logs before mapping err = %v; want ErrLogsNotFound", err) 187 } 188 ref := TektonRunRef{ 189 Knot: "knot.example.com", 190 PipelineRkey: "rkey-1", 191 Workflow: "ci.yml", 192 Namespace: "ci", 193 PipelineRunName: "run-1", 194 PipelineRunUID: "uid-1", 195 PipelineName: "repo-ci", 196 PipelineURI: pipelineATURI("knot.example.com", "rkey-1"), 197 } 198 if err := st.InsertTektonRun(ctx, ref); err != nil { 199 t.Fatalf("insert ref: %v", err) 200 } 201 // With the mapping in place but no TaskRuns yet, Logs must NOT 202 // return ErrLogsNotFound: the workflow has been spawned and is 203 // just queueing inside Tekton. Surfacing 404 here mistranslates 204 // "still scheduling" as "doesn't exist" at the HTTP layer (see 205 // http.go's /logs handler). Verify we get an open channel that 206 // stays open until ctx is cancelled. 207 { 208 waitCtx, cancel := context.WithCancel(ctx) 209 ch, err := p.Logs(waitCtx, "knot.example.com", "rkey-1", "ci.yml") 210 if err != nil { 211 cancel() 212 t.Fatalf("logs before TaskRuns err = %v; want nil", err) 213 } 214 if ch == nil { 215 cancel() 216 t.Fatalf("logs before TaskRuns: nil channel") 217 } 218 // Channel must not produce any frames or close before we 219 // cancel. A premature close would mean the goroutine treated 220 // "no TaskRuns yet" as "stream done", which is what we just 221 // fixed. 222 select { 223 case line, ok := <-ch: 224 cancel() 225 if !ok { 226 t.Fatalf("logs channel closed before TaskRuns appeared") 227 } 228 t.Fatalf("unexpected frame before TaskRuns: %+v", line) 229 case <-time.After(50 * time.Millisecond): 230 } 231 cancel() 232 // After cancellation the producer goroutine must close the 233 // channel and not strand any frames. 234 for line := range ch { 235 t.Fatalf("unexpected frame after cancel: %+v", line) 236 } 237 } 238 239 // Seed a terminal PipelineRun so Logs takes the snapshot path 240 // (fetchCompletedTaskRunLogs, Follow=false) and closes the 241 // channel after draining the seeded TaskRun. The non-terminal 242 // path follows pod logs live and would only EOF on ctx cancel, 243 // which is not what this assertion is exercising. 244 client.SeedObject(pipelineRunsGVR, "ci", k8s.Object{ 245 "apiVersion": "tekton.dev/v1", 246 "kind": "PipelineRun", 247 "metadata": map[string]any{ 248 "name": "run-1", 249 "namespace": "ci", 250 }, 251 "status": map[string]any{ 252 "conditions": []any{map[string]any{ 253 "type": "Succeeded", 254 "status": "True", 255 }}, 256 }, 257 }) 258 client.SeedObject(taskRunsGVR, "ci", k8s.Object{ 259 "apiVersion": "tekton.dev/v1", 260 "kind": "TaskRun", 261 "metadata": map[string]any{ 262 "name": "task-1", 263 "namespace": "ci", 264 "labels": map[string]any{ 265 "tekton.dev/pipelineRun": "run-1", 266 }, 267 }, 268 }) 269 client.SeedPod("ci", k8s.Pod{ 270 Name: "pod-1", 271 Namespace: "ci", 272 Labels: map[string]string{ 273 "tekton.dev/taskRun": "task-1", 274 }, 275 Containers: []k8s.Container{{Name: "step-test"}}, 276 }) 277 client.SetPodLog("ci", "pod-1", "step-test", "hello\n") 278 279 ch, err := p.Logs(ctx, "knot.example.com", "rkey-1", "ci.yml") 280 if err != nil { 281 t.Fatalf("Logs after pods: %v", err) 282 } 283 var got []LogLine 284 for line := range ch { 285 got = append(got, line) 286 } 287 if len(got) < 2 || got[0].StepStatus != StepStatusStart || 288 got[len(got)-1].StepStatus != StepStatusEnd { 289 t.Fatalf("log frames = %+v", got) 290 } 291} 292 293func tektonStatusObject(condStatus, reason string) k8s.Object { 294 return k8s.Object{ 295 "status": map[string]any{ 296 "conditions": []any{map[string]any{ 297 "type": "Succeeded", 298 "status": condStatus, 299 "reason": reason, 300 }}, 301 }, 302 } 303} 304 305func waitTektonRef(t *testing.T, st *store, knot, rkey, workflow string) *TektonRunRef { 306 t.Helper() 307 deadline := time.Now().Add(2 * time.Second) 308 for time.Now().Before(deadline) { 309 ref, err := st.LookupTektonRunByTuple(context.Background(), knot, rkey, workflow) 310 if err != nil { 311 t.Fatalf("lookup: %v", err) 312 } 313 if ref != nil { 314 return ref 315 } 316 time.Sleep(20 * time.Millisecond) 317 } 318 t.Fatal("tekton run row not persisted within deadline") 319 return nil 320}