Stitch any CI into Tangled
3

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "log/slog" 8 "testing" 9 "time" 10 11 "tangled.org/core/api/tangled" 12 13 "go.mitchellh.com/tack/internal/k8s" 14) 15 16func newTektonTestProvider(t *testing.T) (*tektonProvider, *store, *broker, *k8s.FakeClient) { 17 t.Helper() 18 st := newTestStore(t) 19 br := newBroker(st) 20 client := k8s.NewFakeClient() 21 p := newTektonProvider(br, st, client, "ci", slog.Default()) 22 return p, st, br, client 23} 24 25func TestTektonWorkflowConfig(t *testing.T) { 26 raw := "tack:\n tekton:\n pipeline: repo-ci\n service_account: runner\n params:\n image: example/app\n" 27 cfg, err := parseTektonWorkflowConfig(raw) 28 if err != nil { 29 t.Fatalf("parse: %v", err) 30 } 31 if cfg.Pipeline != "repo-ci" || cfg.ServiceAccount != "runner" { 32 t.Fatalf("cfg mismatch: %+v", cfg) 33 } 34 if got := cfg.Params["image"]; got != "example/app" { 35 t.Fatalf("params[image] = %q", got) 36 } 37 38 if _, err := parseTektonWorkflowConfig("tack:\n tekton: {}\n"); err == nil { 39 t.Fatal("missing pipeline should fail") 40 } 41} 42 43func TestTektonBuildPipelineRun(t *testing.T) { 44 cfg := &tektonWorkflowConfig{ 45 Pipeline: "repo-ci", 46 ServiceAccount: "runner", 47 Params: map[string]string{ 48 "image": "example/app", 49 }, 50 } 51 name := tektonPipelineRunName("knot.example.com", "rkey-1", "ci.yml", "abcdef", "main") 52 if len(name) > 63 || name == "" { 53 t.Fatalf("bad generated name: %q", name) 54 } 55 56 obj := buildTektonPipelineRun("ci", name, cfg, 57 "knot.example.com", "rkey-1", "did:plc:actor", "abcdef", "main", 58 &tangled.Pipeline_Workflow{Name: "ci.yml"}, 59 ) 60 if obj.GetAPIVersion() != tektonAPIVersion || obj.GetKind() != tektonRunKind { 61 t.Fatalf("type meta mismatch: %s %s", obj.GetAPIVersion(), obj.GetKind()) 62 } 63 pipeline, ok := obj.NestedString("spec", "pipelineRef", "name") 64 if !ok || pipeline != "repo-ci" { 65 t.Fatalf("pipelineRef.name = %q", pipeline) 66 } 67 sa, ok := obj.NestedString("spec", "serviceAccountName") 68 if !ok || sa != "runner" { 69 t.Fatalf("serviceAccountName = %q", sa) 70 } 71 params, ok := obj.NestedSlice("spec", "params") 72 if !ok || len(params) != 1 { 73 t.Fatalf("params = %+v", params) 74 } 75 if obj.GetAnnotations()[tektonAnnotationActor] != "did:plc:actor" || 76 obj.GetAnnotations()[tektonAnnotationCommit] != "abcdef" { 77 t.Fatalf("annotations missing identity: %+v", obj.GetAnnotations()) 78 } 79} 80 81func TestTektonBuildPipelineRunWorkspaces(t *testing.T) { 82 storage := "5Gi" 83 pvc := "shared-cache" 84 secret := "git-credentials" 85 cfg := &tektonWorkflowConfig{ 86 Pipeline: "repo-ci", 87 Workspaces: []tektonWorkspaceConfig{ 88 {Name: "scratch", AccessModes: []string{"ReadWriteOnce"}, Storage: &storage}, 89 {Name: "cache", PVC: &pvc}, 90 {Name: "git-auth", Secret: &secret}, 91 }, 92 } 93 94 obj := buildTektonPipelineRun("ci", "run-1", cfg, 95 "knot.example.com", "rkey-1", "did:plc:actor", "abcdef", "main", 96 &tangled.Pipeline_Workflow{Name: "ci.yml"}, 97 ) 98 99 podTemplate, ok := obj.NestedMap("spec", "podTemplate") 100 if !ok { 101 t.Fatal("podTemplate missing for workspace-backed PipelineRun") 102 } 103 fsGroup, ok := k8s.NestedMap(podTemplate, "securityContext") 104 if !ok || fsGroup["fsGroup"] != 65532 { 105 t.Fatalf("podTemplate.securityContext = %+v", podTemplate) 106 } 107 108 workspaces, ok := obj.NestedSlice("spec", "workspaces") 109 if !ok || len(workspaces) != 3 { 110 t.Fatalf("workspaces = %+v", workspaces) 111 } 112 113 scratch, ok := workspaces[0].(map[string]any) 114 if !ok { 115 t.Fatalf("scratch workspace = %#v", workspaces[0]) 116 } 117 if scratch["name"] != "scratch" { 118 t.Fatalf("scratch.name = %#v", scratch["name"]) 119 } 120 storageSpec, ok := k8s.NestedMap(scratch, "volumeClaimTemplate", "spec", "resources", "requests") 121 if !ok || storageSpec["storage"] != "5Gi" { 122 t.Fatalf("scratch volumeClaimTemplate = %+v", scratch) 123 } 124 125 cache, ok := workspaces[1].(map[string]any) 126 if !ok { 127 t.Fatalf("cache workspace = %#v", workspaces[1]) 128 } 129 claim, ok := k8s.NestedMap(cache, "persistentVolumeClaim") 130 if !ok || claim["claimName"] != "shared-cache" { 131 t.Fatalf("cache persistentVolumeClaim = %+v", cache) 132 } 133 134 gitAuth, ok := workspaces[2].(map[string]any) 135 if !ok { 136 t.Fatalf("git-auth workspace = %#v", workspaces[2]) 137 } 138 secretRef, ok := k8s.NestedMap(gitAuth, "secret") 139 if !ok || secretRef["secretName"] != "git-credentials" { 140 t.Fatalf("git-auth secret = %+v", gitAuth) 141 } 142} 143 144func TestTektonStatusMapping(t *testing.T) { 145 tests := []struct { 146 name string 147 cond string 148 reason string 149 status string 150 terminal bool 151 ok bool 152 }{ 153 {name: "unknown", cond: "Unknown", status: "running", ok: true}, 154 {name: "success", cond: "True", status: "success", terminal: true, ok: true}, 155 {name: "failed", cond: "False", reason: "Failed", status: "failed", terminal: true, ok: true}, 156 {name: "cancelled", cond: "False", reason: "PipelineRunCancelled", status: "cancelled", terminal: true, ok: true}, 157 {name: "stopped", cond: "False", reason: "PipelineRunStopped", status: "cancelled", terminal: true, ok: true}, 158 } 159 for _, tt := range tests { 160 t.Run(tt.name, func(t *testing.T) { 161 obj := tektonStatusObject(tt.cond, tt.reason) 162 status, terminal, ok := mapTektonPipelineRunStatus(obj) 163 if status != tt.status || terminal != tt.terminal || ok != tt.ok { 164 t.Fatalf("got %q/%v/%v; want %q/%v/%v", 165 status, terminal, ok, tt.status, tt.terminal, tt.ok) 166 } 167 }) 168 } 169} 170 171func TestTektonSpawnCreatesPipelineRun(t *testing.T) { 172 p, st, _, client := newTektonTestProvider(t) 173 ctx, cancel := context.WithCancel(context.Background()) 174 defer cancel() 175 176 trigger := &tangled.Pipeline_TriggerMetadata{ 177 Push: &tangled.Pipeline_PushTriggerData{ 178 NewSha: "abcdef0123", 179 Ref: "refs/heads/main", 180 }, 181 } 182 p.Spawn(ctx, "knot.example.com", "rkey-1", "did:plc:actor", trigger, 183 []*tangled.Pipeline_Workflow{{Name: "ci.yml", 184 Raw: "tack:\n tekton:\n pipeline: repo-ci\n"}}, 185 ) 186 187 ref := waitTektonRef(t, st, "knot.example.com", "rkey-1", "ci.yml") 188 if ref.Namespace != "ci" || ref.PipelineName != "repo-ci" { 189 t.Fatalf("ref mismatch: %+v", ref) 190 } 191 192 obj, err := client.GetObject(context.Background(), pipelineRunsGVR, "ci", ref.PipelineRunName) 193 if err != nil { 194 t.Fatalf("get PipelineRun: %v", err) 195 } 196 pipeline, ok := obj.NestedString("spec", "pipelineRef", "name") 197 if !ok || pipeline != "repo-ci" { 198 t.Fatalf("pipelineRef.name = %q", pipeline) 199 } 200 201 rows, err := st.EventsAfter(context.Background(), 0) 202 if err != nil { 203 t.Fatalf("EventsAfter: %v", err) 204 } 205 if len(rows) != 1 { 206 t.Fatalf("got %d events, want 1", len(rows)) 207 } 208 var rec tangled.PipelineStatus 209 if err := json.Unmarshal(rows[0].EventJSON, &rec); err != nil { 210 t.Fatalf("decode status: %v", err) 211 } 212 if rec.Status != "pending" || rec.Workflow != "ci.yml" { 213 t.Fatalf("bad pending status: %+v", rec) 214 } 215} 216 217func TestTektonSpawnAlreadyExists(t *testing.T) { 218 p, st, _, client := newTektonTestProvider(t) 219 name := tektonPipelineRunName("knot.example.com", "rkey-1", "ci.yml", "abcdef0123", "main") 220 existing := buildTektonPipelineRun("ci", name, 221 &tektonWorkflowConfig{Pipeline: "repo-ci"}, 222 "knot.example.com", "rkey-1", "did:plc:actor", "abcdef0123", "main", 223 &tangled.Pipeline_Workflow{Name: "ci.yml"}, 224 ) 225 existing.SetUID("uid-1") 226 client.SeedObject(pipelineRunsGVR, "ci", existing) 227 228 ctx, cancel := context.WithCancel(context.Background()) 229 defer cancel() 230 p.Spawn(ctx, "knot.example.com", "rkey-1", "did:plc:actor", 231 &tangled.Pipeline_TriggerMetadata{Push: &tangled.Pipeline_PushTriggerData{ 232 NewSha: "abcdef0123", 233 Ref: "refs/heads/main", 234 }}, 235 []*tangled.Pipeline_Workflow{{Name: "ci.yml", 236 Raw: "tack:\n tekton:\n pipeline: repo-ci\n"}}, 237 ) 238 239 ref := waitTektonRef(t, st, "knot.example.com", "rkey-1", "ci.yml") 240 if ref.PipelineRunName != name || ref.PipelineRunUID != "uid-1" { 241 t.Fatalf("ref mismatch: %+v", ref) 242 } 243} 244 245func TestTektonLogsLookup(t *testing.T) { 246 p, st, _, client := newTektonTestProvider(t) 247 ctx := context.Background() 248 if _, err := p.Logs(ctx, "knot.example.com", "rkey-1", "ci.yml"); !errors.Is(err, ErrLogsNotFound) { 249 t.Fatalf("logs before mapping err = %v; want ErrLogsNotFound", err) 250 } 251 ref := TektonRunRef{ 252 Knot: "knot.example.com", 253 PipelineRkey: "rkey-1", 254 Workflow: "ci.yml", 255 Namespace: "ci", 256 PipelineRunName: "run-1", 257 PipelineRunUID: "uid-1", 258 PipelineName: "repo-ci", 259 PipelineURI: pipelineATURI("knot.example.com", "rkey-1"), 260 } 261 if err := st.InsertTektonRun(ctx, ref); err != nil { 262 t.Fatalf("insert ref: %v", err) 263 } 264 // With the mapping in place but no TaskRuns yet, Logs must NOT 265 // return ErrLogsNotFound: the workflow has been spawned and is 266 // just queueing inside Tekton. Surfacing 404 here mistranslates 267 // "still scheduling" as "doesn't exist" at the HTTP layer (see 268 // http.go's /logs handler). Verify we get an open channel that 269 // stays open until ctx is cancelled. 270 { 271 waitCtx, cancel := context.WithCancel(ctx) 272 ch, err := p.Logs(waitCtx, "knot.example.com", "rkey-1", "ci.yml") 273 if err != nil { 274 cancel() 275 t.Fatalf("logs before TaskRuns err = %v; want nil", err) 276 } 277 if ch == nil { 278 cancel() 279 t.Fatalf("logs before TaskRuns: nil channel") 280 } 281 // Channel must not produce any frames or close before we 282 // cancel. A premature close would mean the goroutine treated 283 // "no TaskRuns yet" as "stream done", which is what we just 284 // fixed. 285 select { 286 case line, ok := <-ch: 287 cancel() 288 if !ok { 289 t.Fatalf("logs channel closed before TaskRuns appeared") 290 } 291 t.Fatalf("unexpected frame before TaskRuns: %+v", line) 292 case <-time.After(50 * time.Millisecond): 293 } 294 cancel() 295 // After cancellation the producer goroutine must close the 296 // channel and not strand any frames. 297 for line := range ch { 298 t.Fatalf("unexpected frame after cancel: %+v", line) 299 } 300 } 301 302 // Seed a terminal PipelineRun so Logs takes the snapshot path 303 // (fetchCompletedTaskRunLogs, Follow=false) and closes the 304 // channel after draining the seeded TaskRun. The non-terminal 305 // path follows pod logs live and would only EOF on ctx cancel, 306 // which is not what this assertion is exercising. 307 client.SeedObject(pipelineRunsGVR, "ci", k8s.Object{ 308 "apiVersion": "tekton.dev/v1", 309 "kind": "PipelineRun", 310 "metadata": map[string]any{ 311 "name": "run-1", 312 "namespace": "ci", 313 }, 314 "status": map[string]any{ 315 "conditions": []any{map[string]any{ 316 "type": "Succeeded", 317 "status": "True", 318 }}, 319 }, 320 }) 321 client.SeedObject(taskRunsGVR, "ci", k8s.Object{ 322 "apiVersion": "tekton.dev/v1", 323 "kind": "TaskRun", 324 "metadata": map[string]any{ 325 "name": "task-1", 326 "namespace": "ci", 327 "labels": map[string]any{ 328 "tekton.dev/pipelineRun": "run-1", 329 }, 330 }, 331 }) 332 client.SeedPod("ci", k8s.Pod{ 333 Name: "pod-1", 334 Namespace: "ci", 335 Labels: map[string]string{ 336 "tekton.dev/taskRun": "task-1", 337 }, 338 Containers: []k8s.Container{{Name: "step-test"}}, 339 }) 340 client.SetPodLog("ci", "pod-1", "step-test", "hello\n") 341 342 ch, err := p.Logs(ctx, "knot.example.com", "rkey-1", "ci.yml") 343 if err != nil { 344 t.Fatalf("Logs after pods: %v", err) 345 } 346 var got []LogLine 347 for line := range ch { 348 got = append(got, line) 349 } 350 if len(got) < 2 || got[0].StepStatus != StepStatusStart || 351 got[len(got)-1].StepStatus != StepStatusEnd { 352 t.Fatalf("log frames = %+v", got) 353 } 354} 355 356func tektonStatusObject(condStatus, reason string) k8s.Object { 357 return k8s.Object{ 358 "status": map[string]any{ 359 "conditions": []any{map[string]any{ 360 "type": "Succeeded", 361 "status": condStatus, 362 "reason": reason, 363 }}, 364 }, 365 } 366} 367 368func waitTektonRef(t *testing.T, st *store, knot, rkey, workflow string) *TektonRunRef { 369 t.Helper() 370 deadline := time.Now().Add(2 * time.Second) 371 for time.Now().Before(deadline) { 372 ref, err := st.LookupTektonRunByTuple(context.Background(), knot, rkey, workflow) 373 if err != nil { 374 t.Fatalf("lookup: %v", err) 375 } 376 if ref != nil { 377 return ref 378 } 379 time.Sleep(20 * time.Millisecond) 380 } 381 t.Fatal("tekton run row not persisted within deadline") 382 return nil 383}