Stitch any CI into Tangled
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "log/slog"
8 "testing"
9 "time"
10
11 "tangled.org/core/api/tangled"
12
13 "go.mitchellh.com/tack/internal/k8s"
14)
15
16func newTektonTestProvider(t *testing.T) (*tektonProvider, *store, *broker, *k8s.FakeClient) {
17 t.Helper()
18 st := newTestStore(t)
19 br := newBroker(st)
20 client := k8s.NewFakeClient()
21 p := newTektonProvider(br, st, client, "ci", slog.Default())
22 return p, st, br, client
23}
24
25func TestTektonWorkflowConfig(t *testing.T) {
26 raw := "tack:\n tekton:\n pipeline: repo-ci\n service_account: runner\n params:\n image: example/app\n"
27 cfg, err := parseTektonWorkflowConfig(raw)
28 if err != nil {
29 t.Fatalf("parse: %v", err)
30 }
31 if cfg.Pipeline != "repo-ci" || cfg.ServiceAccount != "runner" {
32 t.Fatalf("cfg mismatch: %+v", cfg)
33 }
34 if got := cfg.Params["image"]; got != "example/app" {
35 t.Fatalf("params[image] = %q", got)
36 }
37
38 if _, err := parseTektonWorkflowConfig("tack:\n tekton: {}\n"); err == nil {
39 t.Fatal("missing pipeline should fail")
40 }
41}
42
43func TestTektonBuildPipelineRun(t *testing.T) {
44 cfg := &tektonWorkflowConfig{
45 Pipeline: "repo-ci",
46 ServiceAccount: "runner",
47 Params: map[string]string{
48 "image": "example/app",
49 },
50 }
51 name := tektonPipelineRunName("knot.example.com", "rkey-1", "ci.yml", "abcdef", "main")
52 if len(name) > 63 || name == "" {
53 t.Fatalf("bad generated name: %q", name)
54 }
55
56 obj := buildTektonPipelineRun("ci", name, cfg,
57 "knot.example.com", "rkey-1", "did:plc:actor", "abcdef", "main",
58 &tangled.Pipeline_Workflow{Name: "ci.yml"},
59 )
60 if obj.GetAPIVersion() != tektonAPIVersion || obj.GetKind() != tektonRunKind {
61 t.Fatalf("type meta mismatch: %s %s", obj.GetAPIVersion(), obj.GetKind())
62 }
63 pipeline, ok := obj.NestedString("spec", "pipelineRef", "name")
64 if !ok || pipeline != "repo-ci" {
65 t.Fatalf("pipelineRef.name = %q", pipeline)
66 }
67 sa, ok := obj.NestedString("spec", "serviceAccountName")
68 if !ok || sa != "runner" {
69 t.Fatalf("serviceAccountName = %q", sa)
70 }
71 params, ok := obj.NestedSlice("spec", "params")
72 if !ok || len(params) != 1 {
73 t.Fatalf("params = %+v", params)
74 }
75 if obj.GetAnnotations()[tektonAnnotationActor] != "did:plc:actor" ||
76 obj.GetAnnotations()[tektonAnnotationCommit] != "abcdef" {
77 t.Fatalf("annotations missing identity: %+v", obj.GetAnnotations())
78 }
79}
80
81func TestTektonBuildPipelineRunWorkspaces(t *testing.T) {
82 storage := "5Gi"
83 pvc := "shared-cache"
84 secret := "git-credentials"
85 cfg := &tektonWorkflowConfig{
86 Pipeline: "repo-ci",
87 Workspaces: []tektonWorkspaceConfig{
88 {Name: "scratch", AccessModes: []string{"ReadWriteOnce"}, Storage: &storage},
89 {Name: "cache", PVC: &pvc},
90 {Name: "git-auth", Secret: &secret},
91 },
92 }
93
94 obj := buildTektonPipelineRun("ci", "run-1", cfg,
95 "knot.example.com", "rkey-1", "did:plc:actor", "abcdef", "main",
96 &tangled.Pipeline_Workflow{Name: "ci.yml"},
97 )
98
99 podTemplate, ok := obj.NestedMap("spec", "podTemplate")
100 if !ok {
101 t.Fatal("podTemplate missing for workspace-backed PipelineRun")
102 }
103 fsGroup, ok := k8s.NestedMap(podTemplate, "securityContext")
104 if !ok || fsGroup["fsGroup"] != 65532 {
105 t.Fatalf("podTemplate.securityContext = %+v", podTemplate)
106 }
107
108 workspaces, ok := obj.NestedSlice("spec", "workspaces")
109 if !ok || len(workspaces) != 3 {
110 t.Fatalf("workspaces = %+v", workspaces)
111 }
112
113 scratch, ok := workspaces[0].(map[string]any)
114 if !ok {
115 t.Fatalf("scratch workspace = %#v", workspaces[0])
116 }
117 if scratch["name"] != "scratch" {
118 t.Fatalf("scratch.name = %#v", scratch["name"])
119 }
120 storageSpec, ok := k8s.NestedMap(scratch, "volumeClaimTemplate", "spec", "resources", "requests")
121 if !ok || storageSpec["storage"] != "5Gi" {
122 t.Fatalf("scratch volumeClaimTemplate = %+v", scratch)
123 }
124
125 cache, ok := workspaces[1].(map[string]any)
126 if !ok {
127 t.Fatalf("cache workspace = %#v", workspaces[1])
128 }
129 claim, ok := k8s.NestedMap(cache, "persistentVolumeClaim")
130 if !ok || claim["claimName"] != "shared-cache" {
131 t.Fatalf("cache persistentVolumeClaim = %+v", cache)
132 }
133
134 gitAuth, ok := workspaces[2].(map[string]any)
135 if !ok {
136 t.Fatalf("git-auth workspace = %#v", workspaces[2])
137 }
138 secretRef, ok := k8s.NestedMap(gitAuth, "secret")
139 if !ok || secretRef["secretName"] != "git-credentials" {
140 t.Fatalf("git-auth secret = %+v", gitAuth)
141 }
142}
143
144func TestTektonStatusMapping(t *testing.T) {
145 tests := []struct {
146 name string
147 cond string
148 reason string
149 status string
150 terminal bool
151 ok bool
152 }{
153 {name: "unknown", cond: "Unknown", status: "running", ok: true},
154 {name: "success", cond: "True", status: "success", terminal: true, ok: true},
155 {name: "failed", cond: "False", reason: "Failed", status: "failed", terminal: true, ok: true},
156 {name: "cancelled", cond: "False", reason: "PipelineRunCancelled", status: "cancelled", terminal: true, ok: true},
157 {name: "stopped", cond: "False", reason: "PipelineRunStopped", status: "cancelled", terminal: true, ok: true},
158 }
159 for _, tt := range tests {
160 t.Run(tt.name, func(t *testing.T) {
161 obj := tektonStatusObject(tt.cond, tt.reason)
162 status, terminal, ok := mapTektonPipelineRunStatus(obj)
163 if status != tt.status || terminal != tt.terminal || ok != tt.ok {
164 t.Fatalf("got %q/%v/%v; want %q/%v/%v",
165 status, terminal, ok, tt.status, tt.terminal, tt.ok)
166 }
167 })
168 }
169}
170
171func TestTektonSpawnCreatesPipelineRun(t *testing.T) {
172 p, st, _, client := newTektonTestProvider(t)
173 ctx, cancel := context.WithCancel(context.Background())
174 defer cancel()
175
176 trigger := &tangled.Pipeline_TriggerMetadata{
177 Push: &tangled.Pipeline_PushTriggerData{
178 NewSha: "abcdef0123",
179 Ref: "refs/heads/main",
180 },
181 }
182 p.Spawn(ctx, "knot.example.com", "rkey-1", "did:plc:actor", trigger,
183 []*tangled.Pipeline_Workflow{{Name: "ci.yml",
184 Raw: "tack:\n tekton:\n pipeline: repo-ci\n"}},
185 )
186
187 ref := waitTektonRef(t, st, "knot.example.com", "rkey-1", "ci.yml")
188 if ref.Namespace != "ci" || ref.PipelineName != "repo-ci" {
189 t.Fatalf("ref mismatch: %+v", ref)
190 }
191
192 obj, err := client.GetObject(context.Background(), pipelineRunsGVR, "ci", ref.PipelineRunName)
193 if err != nil {
194 t.Fatalf("get PipelineRun: %v", err)
195 }
196 pipeline, ok := obj.NestedString("spec", "pipelineRef", "name")
197 if !ok || pipeline != "repo-ci" {
198 t.Fatalf("pipelineRef.name = %q", pipeline)
199 }
200
201 rows, err := st.EventsAfter(context.Background(), 0)
202 if err != nil {
203 t.Fatalf("EventsAfter: %v", err)
204 }
205 if len(rows) != 1 {
206 t.Fatalf("got %d events, want 1", len(rows))
207 }
208 var rec tangled.PipelineStatus
209 if err := json.Unmarshal(rows[0].EventJSON, &rec); err != nil {
210 t.Fatalf("decode status: %v", err)
211 }
212 if rec.Status != "pending" || rec.Workflow != "ci.yml" {
213 t.Fatalf("bad pending status: %+v", rec)
214 }
215}
216
217func TestTektonSpawnAlreadyExists(t *testing.T) {
218 p, st, _, client := newTektonTestProvider(t)
219 name := tektonPipelineRunName("knot.example.com", "rkey-1", "ci.yml", "abcdef0123", "main")
220 existing := buildTektonPipelineRun("ci", name,
221 &tektonWorkflowConfig{Pipeline: "repo-ci"},
222 "knot.example.com", "rkey-1", "did:plc:actor", "abcdef0123", "main",
223 &tangled.Pipeline_Workflow{Name: "ci.yml"},
224 )
225 existing.SetUID("uid-1")
226 client.SeedObject(pipelineRunsGVR, "ci", existing)
227
228 ctx, cancel := context.WithCancel(context.Background())
229 defer cancel()
230 p.Spawn(ctx, "knot.example.com", "rkey-1", "did:plc:actor",
231 &tangled.Pipeline_TriggerMetadata{Push: &tangled.Pipeline_PushTriggerData{
232 NewSha: "abcdef0123",
233 Ref: "refs/heads/main",
234 }},
235 []*tangled.Pipeline_Workflow{{Name: "ci.yml",
236 Raw: "tack:\n tekton:\n pipeline: repo-ci\n"}},
237 )
238
239 ref := waitTektonRef(t, st, "knot.example.com", "rkey-1", "ci.yml")
240 if ref.PipelineRunName != name || ref.PipelineRunUID != "uid-1" {
241 t.Fatalf("ref mismatch: %+v", ref)
242 }
243}
244
245func TestTektonLogsLookup(t *testing.T) {
246 p, st, _, client := newTektonTestProvider(t)
247 ctx := context.Background()
248 if _, err := p.Logs(ctx, "knot.example.com", "rkey-1", "ci.yml"); !errors.Is(err, ErrLogsNotFound) {
249 t.Fatalf("logs before mapping err = %v; want ErrLogsNotFound", err)
250 }
251 ref := TektonRunRef{
252 Knot: "knot.example.com",
253 PipelineRkey: "rkey-1",
254 Workflow: "ci.yml",
255 Namespace: "ci",
256 PipelineRunName: "run-1",
257 PipelineRunUID: "uid-1",
258 PipelineName: "repo-ci",
259 PipelineURI: pipelineATURI("knot.example.com", "rkey-1"),
260 }
261 if err := st.InsertTektonRun(ctx, ref); err != nil {
262 t.Fatalf("insert ref: %v", err)
263 }
264 // With the mapping in place but no TaskRuns yet, Logs must NOT
265 // return ErrLogsNotFound: the workflow has been spawned and is
266 // just queueing inside Tekton. Surfacing 404 here mistranslates
267 // "still scheduling" as "doesn't exist" at the HTTP layer (see
268 // http.go's /logs handler). Verify we get an open channel that
269 // stays open until ctx is cancelled.
270 {
271 waitCtx, cancel := context.WithCancel(ctx)
272 ch, err := p.Logs(waitCtx, "knot.example.com", "rkey-1", "ci.yml")
273 if err != nil {
274 cancel()
275 t.Fatalf("logs before TaskRuns err = %v; want nil", err)
276 }
277 if ch == nil {
278 cancel()
279 t.Fatalf("logs before TaskRuns: nil channel")
280 }
281 // Channel must not produce any frames or close before we
282 // cancel. A premature close would mean the goroutine treated
283 // "no TaskRuns yet" as "stream done", which is what we just
284 // fixed.
285 select {
286 case line, ok := <-ch:
287 cancel()
288 if !ok {
289 t.Fatalf("logs channel closed before TaskRuns appeared")
290 }
291 t.Fatalf("unexpected frame before TaskRuns: %+v", line)
292 case <-time.After(50 * time.Millisecond):
293 }
294 cancel()
295 // After cancellation the producer goroutine must close the
296 // channel and not strand any frames.
297 for line := range ch {
298 t.Fatalf("unexpected frame after cancel: %+v", line)
299 }
300 }
301
302 // Seed a terminal PipelineRun so Logs takes the snapshot path
303 // (fetchCompletedTaskRunLogs, Follow=false) and closes the
304 // channel after draining the seeded TaskRun. The non-terminal
305 // path follows pod logs live and would only EOF on ctx cancel,
306 // which is not what this assertion is exercising.
307 client.SeedObject(pipelineRunsGVR, "ci", k8s.Object{
308 "apiVersion": "tekton.dev/v1",
309 "kind": "PipelineRun",
310 "metadata": map[string]any{
311 "name": "run-1",
312 "namespace": "ci",
313 },
314 "status": map[string]any{
315 "conditions": []any{map[string]any{
316 "type": "Succeeded",
317 "status": "True",
318 }},
319 },
320 })
321 client.SeedObject(taskRunsGVR, "ci", k8s.Object{
322 "apiVersion": "tekton.dev/v1",
323 "kind": "TaskRun",
324 "metadata": map[string]any{
325 "name": "task-1",
326 "namespace": "ci",
327 "labels": map[string]any{
328 "tekton.dev/pipelineRun": "run-1",
329 },
330 },
331 })
332 client.SeedPod("ci", k8s.Pod{
333 Name: "pod-1",
334 Namespace: "ci",
335 Labels: map[string]string{
336 "tekton.dev/taskRun": "task-1",
337 },
338 Containers: []k8s.Container{{Name: "step-test"}},
339 })
340 client.SetPodLog("ci", "pod-1", "step-test", "hello\n")
341
342 ch, err := p.Logs(ctx, "knot.example.com", "rkey-1", "ci.yml")
343 if err != nil {
344 t.Fatalf("Logs after pods: %v", err)
345 }
346 var got []LogLine
347 for line := range ch {
348 got = append(got, line)
349 }
350 if len(got) < 2 || got[0].StepStatus != StepStatusStart ||
351 got[len(got)-1].StepStatus != StepStatusEnd {
352 t.Fatalf("log frames = %+v", got)
353 }
354}
355
356func tektonStatusObject(condStatus, reason string) k8s.Object {
357 return k8s.Object{
358 "status": map[string]any{
359 "conditions": []any{map[string]any{
360 "type": "Succeeded",
361 "status": condStatus,
362 "reason": reason,
363 }},
364 },
365 }
366}
367
368func waitTektonRef(t *testing.T, st *store, knot, rkey, workflow string) *TektonRunRef {
369 t.Helper()
370 deadline := time.Now().Add(2 * time.Second)
371 for time.Now().Before(deadline) {
372 ref, err := st.LookupTektonRunByTuple(context.Background(), knot, rkey, workflow)
373 if err != nil {
374 t.Fatalf("lookup: %v", err)
375 }
376 if ref != nil {
377 return ref
378 }
379 time.Sleep(20 * time.Millisecond)
380 }
381 t.Fatal("tekton run row not persisted within deadline")
382 return nil
383}