Stitch any CI into Tangled
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "log/slog"
8 "strings"
9 "testing"
10 "time"
11
12 "tangled.org/core/api/tangled"
13
14 "go.mitchellh.com/tack/internal/k8s"
15)
16
17func newTektonTestProvider(t *testing.T) (*tektonProvider, *store, *broker, *k8s.FakeClient) {
18 t.Helper()
19 st := newTestStore(t)
20 br := newBroker(st)
21 client := k8s.NewFakeClient()
22 p := newTektonProvider(br, st, client, "ci", slog.Default())
23 return p, st, br, client
24}
25
26func TestTektonWorkflowConfig(t *testing.T) {
27 raw := "tack:\n tekton:\n pipeline: repo-ci\n service_account: runner\n params:\n image: example/app\n"
28 cfg, err := parseTektonWorkflowConfig(raw)
29 if err != nil {
30 t.Fatalf("parse: %v", err)
31 }
32 if cfg.Pipeline != "repo-ci" || cfg.ServiceAccount != "runner" {
33 t.Fatalf("cfg mismatch: %+v", cfg)
34 }
35 if got := cfg.Params["image"]; got != "example/app" {
36 t.Fatalf("params[image] = %q", got)
37 }
38
39 if _, err := parseTektonWorkflowConfig("tack:\n tekton: {}\n"); err == nil {
40 t.Fatal("missing pipeline should fail")
41 }
42}
43
44func TestTektonWorkspaceValidation(t *testing.T) {
45 tests := []struct {
46 name string
47 yaml string
48 wantErr string
49 }{
50 {
51 name: "empty name",
52 yaml: "tack:\n tekton:\n pipeline: ci\n workspaces:\n - storage: 1Gi\n",
53 wantErr: "name is required",
54 },
55 {
56 name: "no source",
57 yaml: "tack:\n tekton:\n pipeline: ci\n workspaces:\n - name: scratch\n",
58 wantErr: "no volume source",
59 },
60 {
61 name: "multiple sources",
62 yaml: "tack:\n tekton:\n pipeline: ci\n workspaces:\n - name: data\n storage: 5Gi\n pvc: my-pvc\n",
63 wantErr: "multiple volume sources",
64 },
65 {
66 name: "valid single source",
67 yaml: "tack:\n tekton:\n pipeline: ci\n workspaces:\n - name: scratch\n storage: 1Gi\n",
68 },
69 }
70 for _, tt := range tests {
71 t.Run(tt.name, func(t *testing.T) {
72 _, err := parseTektonWorkflowConfig(tt.yaml)
73 if tt.wantErr == "" {
74 if err != nil {
75 t.Fatalf("unexpected error: %v", err)
76 }
77 return
78 }
79 if err == nil {
80 t.Fatalf("expected error containing %q", tt.wantErr)
81 }
82 if !strings.Contains(err.Error(), tt.wantErr) {
83 t.Fatalf(
84 "error %q does not contain %q",
85 err.Error(), tt.wantErr,
86 )
87 }
88 })
89 }
90}
91
92func TestTektonBuildPipelineRun(t *testing.T) {
93 cfg := &tektonWorkflowConfig{
94 Pipeline: "repo-ci",
95 ServiceAccount: "runner",
96 Params: map[string]string{
97 "image": "example/app",
98 },
99 }
100 name := tektonPipelineRunName("knot.example.com", "rkey-1", "ci.yml", "abcdef", "main")
101 if len(name) > 63 || name == "" {
102 t.Fatalf("bad generated name: %q", name)
103 }
104
105 obj := buildTektonPipelineRun("ci", name, cfg,
106 "knot.example.com", "rkey-1", "did:plc:actor", "abcdef", "main",
107 &tangled.Pipeline_Workflow{Name: "ci.yml"},
108 )
109 if obj.GetAPIVersion() != tektonAPIVersion || obj.GetKind() != tektonRunKind {
110 t.Fatalf("type meta mismatch: %s %s", obj.GetAPIVersion(), obj.GetKind())
111 }
112 pipeline, ok := obj.NestedString("spec", "pipelineRef", "name")
113 if !ok || pipeline != "repo-ci" {
114 t.Fatalf("pipelineRef.name = %q", pipeline)
115 }
116 sa, ok := obj.NestedString("spec", "taskRunTemplate", "serviceAccountName")
117 if !ok || sa != "runner" {
118 t.Fatalf("serviceAccountName = %q", sa)
119 }
120 // With cfg.Params={"image": "example/app"}, the merged params
121 // should contain the 3 built-ins (actor, branch, commit) plus
122 // the user-supplied "image" — 4 total, sorted alphabetically.
123 params, ok := obj.NestedSlice("spec", "params")
124 if !ok || len(params) != 4 {
125 t.Fatalf("params count = %d, want 4: %+v", len(params), params)
126 }
127 if obj.GetAnnotations()[tektonAnnotationActor] != "did:plc:actor" ||
128 obj.GetAnnotations()[tektonAnnotationCommit] != "abcdef" {
129 t.Fatalf("annotations missing identity: %+v", obj.GetAnnotations())
130 }
131}
132
133func TestTektonBuildPipelineRunParamsOverride(t *testing.T) {
134 // When a user param collides with a built-in name, the user's
135 // value should win so callers can customize what the upstream
136 // Tekton Pipeline receives.
137 cfg := &tektonWorkflowConfig{
138 Pipeline: "repo-ci",
139 Params: map[string]string{
140 "commit": "user-override-sha",
141 "extra": "bonus",
142 },
143 }
144 obj := buildTektonPipelineRun("ci", "run-1", cfg,
145 "knot.example.com", "rkey-1", "did:plc:actor",
146 "original-sha", "main",
147 &tangled.Pipeline_Workflow{Name: "ci.yml"},
148 )
149 params, ok := obj.NestedSlice("spec", "params")
150 if !ok {
151 t.Fatal("params missing")
152 }
153 // Expect 4: actor, branch, commit (overridden), extra.
154 if len(params) != 4 {
155 t.Fatalf("params count = %d, want 4: %+v", len(params), params)
156 }
157 // Verify the user override took effect.
158 for _, raw := range params {
159 p, _ := raw.(map[string]any)
160 if p["name"] == "commit" && p["value"] != "user-override-sha" {
161 t.Fatalf(
162 "commit param = %q, want user-override-sha",
163 p["value"],
164 )
165 }
166 }
167}
168
169func TestTektonBuildPipelineRunWorkspaces(t *testing.T) {
170 storage := "5Gi"
171 pvc := "shared-cache"
172 secret := "git-credentials"
173 configMap := "app-config"
174 cfg := &tektonWorkflowConfig{
175 Pipeline: "repo-ci",
176 Workspaces: []tektonWorkspaceConfig{
177 {Name: "scratch", AccessModes: []string{"ReadWriteOnce"}, Storage: &storage},
178 {Name: "cache", PVC: &pvc},
179 {Name: "git-auth", Secret: &secret},
180 {Name: "config", ConfigMap: &configMap},
181 },
182 }
183
184 obj := buildTektonPipelineRun("ci", "run-1", cfg,
185 "knot.example.com", "rkey-1", "did:plc:actor", "abcdef", "main",
186 &tangled.Pipeline_Workflow{Name: "ci.yml"},
187 )
188
189 podTemplate, ok := obj.NestedMap("spec", "podTemplate")
190 if !ok {
191 t.Fatal("podTemplate missing for workspace-backed PipelineRun")
192 }
193 fsGroup, ok := k8s.NestedMap(podTemplate, "securityContext")
194 if !ok || fsGroup["fsGroup"] != 65532 {
195 t.Fatalf("podTemplate.securityContext = %+v", podTemplate)
196 }
197
198 workspaces, ok := obj.NestedSlice("spec", "workspaces")
199 if !ok || len(workspaces) != 4 {
200 t.Fatalf("workspaces = %+v", workspaces)
201 }
202
203 scratch, ok := workspaces[0].(map[string]any)
204 if !ok {
205 t.Fatalf("scratch workspace = %#v", workspaces[0])
206 }
207 if scratch["name"] != "scratch" {
208 t.Fatalf("scratch.name = %#v", scratch["name"])
209 }
210 storageSpec, ok := k8s.NestedMap(scratch, "volumeClaimTemplate", "spec", "resources", "requests")
211 if !ok || storageSpec["storage"] != "5Gi" {
212 t.Fatalf("scratch volumeClaimTemplate = %+v", scratch)
213 }
214
215 cache, ok := workspaces[1].(map[string]any)
216 if !ok {
217 t.Fatalf("cache workspace = %#v", workspaces[1])
218 }
219 claim, ok := k8s.NestedMap(cache, "persistentVolumeClaim")
220 if !ok || claim["claimName"] != "shared-cache" {
221 t.Fatalf("cache persistentVolumeClaim = %+v", cache)
222 }
223
224 gitAuth, ok := workspaces[2].(map[string]any)
225 if !ok {
226 t.Fatalf("git-auth workspace = %#v", workspaces[2])
227 }
228 secretRef, ok := k8s.NestedMap(gitAuth, "secret")
229 if !ok || secretRef["secretName"] != "git-credentials" {
230 t.Fatalf("git-auth secret = %+v", gitAuth)
231 }
232
233 cfgWs, ok := workspaces[3].(map[string]any)
234 if !ok {
235 t.Fatalf("config workspace = %#v", workspaces[3])
236 }
237 cmRef, ok := k8s.NestedMap(cfgWs, "configMap")
238 if !ok || cmRef["name"] != "app-config" {
239 t.Fatalf("config configMap = %+v", cfgWs)
240 }
241}
242
243func TestTektonStatusMapping(t *testing.T) {
244 tests := []struct {
245 name string
246 cond string
247 reason string
248 status string
249 terminal bool
250 ok bool
251 }{
252 {name: "unknown", cond: "Unknown", status: "running", ok: true},
253 {name: "success", cond: "True", status: "success", terminal: true, ok: true},
254 {name: "failed", cond: "False", reason: "Failed", status: "failed", terminal: true, ok: true},
255 {name: "cancelled", cond: "False", reason: "PipelineRunCancelled", status: "cancelled", terminal: true, ok: true},
256 {name: "stopped", cond: "False", reason: "PipelineRunStopped", status: "cancelled", terminal: true, ok: true},
257 }
258 for _, tt := range tests {
259 t.Run(tt.name, func(t *testing.T) {
260 obj := tektonStatusObject(tt.cond, tt.reason)
261 status, terminal, ok := mapTektonPipelineRunStatus(obj)
262 if status != tt.status || terminal != tt.terminal || ok != tt.ok {
263 t.Fatalf("got %q/%v/%v; want %q/%v/%v",
264 status, terminal, ok, tt.status, tt.terminal, tt.ok)
265 }
266 })
267 }
268}
269
270func TestTektonSpawnCreatesPipelineRun(t *testing.T) {
271 p, st, _, client := newTektonTestProvider(t)
272 ctx, cancel := context.WithCancel(context.Background())
273 defer cancel()
274
275 trigger := &tangled.Pipeline_TriggerMetadata{
276 Push: &tangled.Pipeline_PushTriggerData{
277 NewSha: "abcdef0123",
278 Ref: "refs/heads/main",
279 },
280 }
281 p.Spawn(ctx, "knot.example.com", "rkey-1", "did:plc:actor", trigger,
282 []*tangled.Pipeline_Workflow{{Name: "ci.yml",
283 Raw: "tack:\n tekton:\n pipeline: repo-ci\n"}},
284 )
285
286 ref := waitTektonRef(t, st, "knot.example.com", "rkey-1", "ci.yml")
287 if ref.Namespace != "ci" || ref.PipelineName != "repo-ci" {
288 t.Fatalf("ref mismatch: %+v", ref)
289 }
290
291 obj, err := client.GetObject(context.Background(), pipelineRunsGVR, "ci", ref.PipelineRunName)
292 if err != nil {
293 t.Fatalf("get PipelineRun: %v", err)
294 }
295 pipeline, ok := obj.NestedString("spec", "pipelineRef", "name")
296 if !ok || pipeline != "repo-ci" {
297 t.Fatalf("pipelineRef.name = %q", pipeline)
298 }
299
300 rows, err := st.EventsAfter(context.Background(), 0)
301 if err != nil {
302 t.Fatalf("EventsAfter: %v", err)
303 }
304 if len(rows) != 1 {
305 t.Fatalf("got %d events, want 1", len(rows))
306 }
307 var rec tangled.PipelineStatus
308 if err := json.Unmarshal(rows[0].EventJSON, &rec); err != nil {
309 t.Fatalf("decode status: %v", err)
310 }
311 if rec.Status != "pending" || rec.Workflow != "ci.yml" {
312 t.Fatalf("bad pending status: %+v", rec)
313 }
314}
315
316func TestTektonSpawnAlreadyExists(t *testing.T) {
317 p, st, _, client := newTektonTestProvider(t)
318 name := tektonPipelineRunName("knot.example.com", "rkey-1", "ci.yml", "abcdef0123", "main")
319 existing := buildTektonPipelineRun("ci", name,
320 &tektonWorkflowConfig{Pipeline: "repo-ci"},
321 "knot.example.com", "rkey-1", "did:plc:actor", "abcdef0123", "main",
322 &tangled.Pipeline_Workflow{Name: "ci.yml"},
323 )
324 existing.SetUID("uid-1")
325 client.SeedObject(pipelineRunsGVR, "ci", existing)
326
327 ctx, cancel := context.WithCancel(context.Background())
328 defer cancel()
329 p.Spawn(ctx, "knot.example.com", "rkey-1", "did:plc:actor",
330 &tangled.Pipeline_TriggerMetadata{Push: &tangled.Pipeline_PushTriggerData{
331 NewSha: "abcdef0123",
332 Ref: "refs/heads/main",
333 }},
334 []*tangled.Pipeline_Workflow{{Name: "ci.yml",
335 Raw: "tack:\n tekton:\n pipeline: repo-ci\n"}},
336 )
337
338 ref := waitTektonRef(t, st, "knot.example.com", "rkey-1", "ci.yml")
339 if ref.PipelineRunName != name || ref.PipelineRunUID != "uid-1" {
340 t.Fatalf("ref mismatch: %+v", ref)
341 }
342}
343
344func TestTektonLogsLookup(t *testing.T) {
345 p, st, _, client := newTektonTestProvider(t)
346 ctx := context.Background()
347 if _, err := p.Logs(ctx, "knot.example.com", "rkey-1", "ci.yml"); !errors.Is(err, ErrLogsNotFound) {
348 t.Fatalf("logs before mapping err = %v; want ErrLogsNotFound", err)
349 }
350 ref := TektonRunRef{
351 Knot: "knot.example.com",
352 PipelineRkey: "rkey-1",
353 Workflow: "ci.yml",
354 Namespace: "ci",
355 PipelineRunName: "run-1",
356 PipelineRunUID: "uid-1",
357 PipelineName: "repo-ci",
358 PipelineURI: pipelineATURI("knot.example.com", "rkey-1"),
359 }
360 if err := st.InsertTektonRun(ctx, ref); err != nil {
361 t.Fatalf("insert ref: %v", err)
362 }
363 // With the mapping in place but no TaskRuns yet, Logs must NOT
364 // return ErrLogsNotFound: the workflow has been spawned and is
365 // just queueing inside Tekton. Surfacing 404 here mistranslates
366 // "still scheduling" as "doesn't exist" at the HTTP layer (see
367 // http.go's /logs handler). Verify we get an open channel that
368 // stays open until ctx is cancelled.
369 {
370 waitCtx, cancel := context.WithCancel(ctx)
371 ch, err := p.Logs(waitCtx, "knot.example.com", "rkey-1", "ci.yml")
372 if err != nil {
373 cancel()
374 t.Fatalf("logs before TaskRuns err = %v; want nil", err)
375 }
376 if ch == nil {
377 cancel()
378 t.Fatalf("logs before TaskRuns: nil channel")
379 }
380 // Channel must not produce any frames or close before we
381 // cancel. A premature close would mean the goroutine treated
382 // "no TaskRuns yet" as "stream done", which is what we just
383 // fixed.
384 select {
385 case line, ok := <-ch:
386 cancel()
387 if !ok {
388 t.Fatalf("logs channel closed before TaskRuns appeared")
389 }
390 t.Fatalf("unexpected frame before TaskRuns: %+v", line)
391 case <-time.After(50 * time.Millisecond):
392 }
393 cancel()
394 // After cancellation the producer goroutine must close the
395 // channel and not strand any frames.
396 for line := range ch {
397 t.Fatalf("unexpected frame after cancel: %+v", line)
398 }
399 }
400
401 // Seed a terminal PipelineRun so Logs takes the snapshot path
402 // (fetchCompletedTaskRunLogs, Follow=false) and closes the
403 // channel after draining the seeded TaskRun. The non-terminal
404 // path follows pod logs live and would only EOF on ctx cancel,
405 // which is not what this assertion is exercising.
406 client.SeedObject(pipelineRunsGVR, "ci", k8s.Object{
407 "apiVersion": "tekton.dev/v1",
408 "kind": "PipelineRun",
409 "metadata": map[string]any{
410 "name": "run-1",
411 "namespace": "ci",
412 },
413 "status": map[string]any{
414 "conditions": []any{map[string]any{
415 "type": "Succeeded",
416 "status": "True",
417 }},
418 },
419 })
420 client.SeedObject(taskRunsGVR, "ci", k8s.Object{
421 "apiVersion": "tekton.dev/v1",
422 "kind": "TaskRun",
423 "metadata": map[string]any{
424 "name": "task-1",
425 "namespace": "ci",
426 "labels": map[string]any{
427 "tekton.dev/pipelineRun": "run-1",
428 },
429 },
430 })
431 client.SeedPod("ci", k8s.Pod{
432 Name: "pod-1",
433 Namespace: "ci",
434 Labels: map[string]string{
435 "tekton.dev/taskRun": "task-1",
436 },
437 Containers: []k8s.Container{{Name: "step-test"}},
438 })
439 client.SetPodLog("ci", "pod-1", "step-test", "hello\n")
440
441 ch, err := p.Logs(ctx, "knot.example.com", "rkey-1", "ci.yml")
442 if err != nil {
443 t.Fatalf("Logs after pods: %v", err)
444 }
445 var got []LogLine
446 for line := range ch {
447 got = append(got, line)
448 }
449 if len(got) < 2 || got[0].StepStatus != StepStatusStart ||
450 got[len(got)-1].StepStatus != StepStatusEnd {
451 t.Fatalf("log frames = %+v", got)
452 }
453}
454
455func tektonStatusObject(condStatus, reason string) k8s.Object {
456 return k8s.Object{
457 "status": map[string]any{
458 "conditions": []any{map[string]any{
459 "type": "Succeeded",
460 "status": condStatus,
461 "reason": reason,
462 }},
463 },
464 }
465}
466
467func waitTektonRef(t *testing.T, st *store, knot, rkey, workflow string) *TektonRunRef {
468 t.Helper()
469 deadline := time.Now().Add(2 * time.Second)
470 for time.Now().Before(deadline) {
471 ref, err := st.LookupTektonRunByTuple(context.Background(), knot, rkey, workflow)
472 if err != nil {
473 t.Fatalf("lookup: %v", err)
474 }
475 if ref != nil {
476 return ref
477 }
478 time.Sleep(20 * time.Millisecond)
479 }
480 t.Fatal("tekton run row not persisted within deadline")
481 return nil
482}