Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

at main 15 kB View raw
1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "log/slog" 8 "strings" 9 "testing" 10 "time" 11 12 "tangled.org/core/api/tangled" 13 14 "go.mitchellh.com/tack/internal/k8s" 15) 16 17func newTektonTestProvider(t *testing.T) (*tektonProvider, *store, *broker, *k8s.FakeClient) { 18 t.Helper() 19 st := newTestStore(t) 20 br := newBroker(st) 21 client := k8s.NewFakeClient() 22 p := newTektonProvider(br, st, client, "ci", slog.Default()) 23 return p, st, br, client 24} 25 26func TestTektonWorkflowConfig(t *testing.T) { 27 raw := "tack:\n tekton:\n pipeline: repo-ci\n service_account: runner\n params:\n image: example/app\n" 28 cfg, err := parseTektonWorkflowConfig(raw) 29 if err != nil { 30 t.Fatalf("parse: %v", err) 31 } 32 if cfg.Pipeline != "repo-ci" || cfg.ServiceAccount != "runner" { 33 t.Fatalf("cfg mismatch: %+v", cfg) 34 } 35 if got := cfg.Params["image"]; got != "example/app" { 36 t.Fatalf("params[image] = %q", got) 37 } 38 39 if _, err := parseTektonWorkflowConfig("tack:\n tekton: {}\n"); err == nil { 40 t.Fatal("missing pipeline should fail") 41 } 42} 43 44func TestTektonWorkspaceValidation(t *testing.T) { 45 tests := []struct { 46 name string 47 yaml string 48 wantErr string 49 }{ 50 { 51 name: "empty name", 52 yaml: "tack:\n tekton:\n pipeline: ci\n workspaces:\n - storage: 1Gi\n", 53 wantErr: "name is required", 54 }, 55 { 56 name: "no source", 57 yaml: "tack:\n tekton:\n pipeline: ci\n workspaces:\n - name: scratch\n", 58 wantErr: "no volume source", 59 }, 60 { 61 name: "multiple sources", 62 yaml: "tack:\n tekton:\n pipeline: ci\n workspaces:\n - name: data\n storage: 5Gi\n pvc: my-pvc\n", 63 wantErr: "multiple volume sources", 64 }, 65 { 66 name: "valid single source", 67 yaml: "tack:\n tekton:\n pipeline: ci\n workspaces:\n - name: scratch\n storage: 1Gi\n", 68 }, 69 } 70 for _, tt := range tests { 71 t.Run(tt.name, func(t *testing.T) { 72 _, err := parseTektonWorkflowConfig(tt.yaml) 73 if tt.wantErr == "" { 74 if err != nil { 75 t.Fatalf("unexpected error: %v", err) 76 } 77 return 78 } 79 if err == nil { 80 t.Fatalf("expected error containing %q", tt.wantErr) 81 } 82 if !strings.Contains(err.Error(), tt.wantErr) { 83 t.Fatalf( 84 "error %q does not contain %q", 85 err.Error(), tt.wantErr, 86 ) 87 } 88 }) 89 } 90} 91 92func TestTektonBuildPipelineRun(t *testing.T) { 93 cfg := &tektonWorkflowConfig{ 94 Pipeline: "repo-ci", 95 ServiceAccount: "runner", 96 Params: map[string]string{ 97 "image": "example/app", 98 }, 99 } 100 name := tektonPipelineRunName("knot.example.com", "rkey-1", "ci.yml", "abcdef", "main") 101 if len(name) > 63 || name == "" { 102 t.Fatalf("bad generated name: %q", name) 103 } 104 105 obj := buildTektonPipelineRun("ci", name, cfg, 106 "knot.example.com", "rkey-1", "did:plc:actor", "abcdef", "main", 107 &tangled.Pipeline_Workflow{Name: "ci.yml"}, 108 ) 109 if obj.GetAPIVersion() != tektonAPIVersion || obj.GetKind() != tektonRunKind { 110 t.Fatalf("type meta mismatch: %s %s", obj.GetAPIVersion(), obj.GetKind()) 111 } 112 pipeline, ok := obj.NestedString("spec", "pipelineRef", "name") 113 if !ok || pipeline != "repo-ci" { 114 t.Fatalf("pipelineRef.name = %q", pipeline) 115 } 116 sa, ok := obj.NestedString("spec", "taskRunTemplate", "serviceAccountName") 117 if !ok || sa != "runner" { 118 t.Fatalf("serviceAccountName = %q", sa) 119 } 120 // With cfg.Params={"image": "example/app"}, the merged params 121 // should contain the 3 built-ins (actor, branch, commit) plus 122 // the user-supplied "image" — 4 total, sorted alphabetically. 123 params, ok := obj.NestedSlice("spec", "params") 124 if !ok || len(params) != 4 { 125 t.Fatalf("params count = %d, want 4: %+v", len(params), params) 126 } 127 if obj.GetAnnotations()[tektonAnnotationActor] != "did:plc:actor" || 128 obj.GetAnnotations()[tektonAnnotationCommit] != "abcdef" { 129 t.Fatalf("annotations missing identity: %+v", obj.GetAnnotations()) 130 } 131} 132 133func TestTektonBuildPipelineRunParamsOverride(t *testing.T) { 134 // When a user param collides with a built-in name, the user's 135 // value should win so callers can customize what the upstream 136 // Tekton Pipeline receives. 137 cfg := &tektonWorkflowConfig{ 138 Pipeline: "repo-ci", 139 Params: map[string]string{ 140 "commit": "user-override-sha", 141 "extra": "bonus", 142 }, 143 } 144 obj := buildTektonPipelineRun("ci", "run-1", cfg, 145 "knot.example.com", "rkey-1", "did:plc:actor", 146 "original-sha", "main", 147 &tangled.Pipeline_Workflow{Name: "ci.yml"}, 148 ) 149 params, ok := obj.NestedSlice("spec", "params") 150 if !ok { 151 t.Fatal("params missing") 152 } 153 // Expect 4: actor, branch, commit (overridden), extra. 154 if len(params) != 4 { 155 t.Fatalf("params count = %d, want 4: %+v", len(params), params) 156 } 157 // Verify the user override took effect. 158 for _, raw := range params { 159 p, _ := raw.(map[string]any) 160 if p["name"] == "commit" && p["value"] != "user-override-sha" { 161 t.Fatalf( 162 "commit param = %q, want user-override-sha", 163 p["value"], 164 ) 165 } 166 } 167} 168 169func TestTektonBuildPipelineRunWorkspaces(t *testing.T) { 170 storage := "5Gi" 171 pvc := "shared-cache" 172 secret := "git-credentials" 173 configMap := "app-config" 174 cfg := &tektonWorkflowConfig{ 175 Pipeline: "repo-ci", 176 Workspaces: []tektonWorkspaceConfig{ 177 {Name: "scratch", AccessModes: []string{"ReadWriteOnce"}, Storage: &storage}, 178 {Name: "cache", PVC: &pvc}, 179 {Name: "git-auth", Secret: &secret}, 180 {Name: "config", ConfigMap: &configMap}, 181 }, 182 } 183 184 obj := buildTektonPipelineRun("ci", "run-1", cfg, 185 "knot.example.com", "rkey-1", "did:plc:actor", "abcdef", "main", 186 &tangled.Pipeline_Workflow{Name: "ci.yml"}, 187 ) 188 189 podTemplate, ok := obj.NestedMap("spec", "podTemplate") 190 if !ok { 191 t.Fatal("podTemplate missing for workspace-backed PipelineRun") 192 } 193 fsGroup, ok := k8s.NestedMap(podTemplate, "securityContext") 194 if !ok || fsGroup["fsGroup"] != 65532 { 195 t.Fatalf("podTemplate.securityContext = %+v", podTemplate) 196 } 197 198 workspaces, ok := obj.NestedSlice("spec", "workspaces") 199 if !ok || len(workspaces) != 4 { 200 t.Fatalf("workspaces = %+v", workspaces) 201 } 202 203 scratch, ok := workspaces[0].(map[string]any) 204 if !ok { 205 t.Fatalf("scratch workspace = %#v", workspaces[0]) 206 } 207 if scratch["name"] != "scratch" { 208 t.Fatalf("scratch.name = %#v", scratch["name"]) 209 } 210 storageSpec, ok := k8s.NestedMap(scratch, "volumeClaimTemplate", "spec", "resources", "requests") 211 if !ok || storageSpec["storage"] != "5Gi" { 212 t.Fatalf("scratch volumeClaimTemplate = %+v", scratch) 213 } 214 215 cache, ok := workspaces[1].(map[string]any) 216 if !ok { 217 t.Fatalf("cache workspace = %#v", workspaces[1]) 218 } 219 claim, ok := k8s.NestedMap(cache, "persistentVolumeClaim") 220 if !ok || claim["claimName"] != "shared-cache" { 221 t.Fatalf("cache persistentVolumeClaim = %+v", cache) 222 } 223 224 gitAuth, ok := workspaces[2].(map[string]any) 225 if !ok { 226 t.Fatalf("git-auth workspace = %#v", workspaces[2]) 227 } 228 secretRef, ok := k8s.NestedMap(gitAuth, "secret") 229 if !ok || secretRef["secretName"] != "git-credentials" { 230 t.Fatalf("git-auth secret = %+v", gitAuth) 231 } 232 233 cfgWs, ok := workspaces[3].(map[string]any) 234 if !ok { 235 t.Fatalf("config workspace = %#v", workspaces[3]) 236 } 237 cmRef, ok := k8s.NestedMap(cfgWs, "configMap") 238 if !ok || cmRef["name"] != "app-config" { 239 t.Fatalf("config configMap = %+v", cfgWs) 240 } 241} 242 243func TestTektonStatusMapping(t *testing.T) { 244 tests := []struct { 245 name string 246 cond string 247 reason string 248 status string 249 terminal bool 250 ok bool 251 }{ 252 {name: "unknown", cond: "Unknown", status: "running", ok: true}, 253 {name: "success", cond: "True", status: "success", terminal: true, ok: true}, 254 {name: "failed", cond: "False", reason: "Failed", status: "failed", terminal: true, ok: true}, 255 {name: "cancelled", cond: "False", reason: "PipelineRunCancelled", status: "cancelled", terminal: true, ok: true}, 256 {name: "stopped", cond: "False", reason: "PipelineRunStopped", status: "cancelled", terminal: true, ok: true}, 257 } 258 for _, tt := range tests { 259 t.Run(tt.name, func(t *testing.T) { 260 obj := tektonStatusObject(tt.cond, tt.reason) 261 status, terminal, ok := mapTektonPipelineRunStatus(obj) 262 if status != tt.status || terminal != tt.terminal || ok != tt.ok { 263 t.Fatalf("got %q/%v/%v; want %q/%v/%v", 264 status, terminal, ok, tt.status, tt.terminal, tt.ok) 265 } 266 }) 267 } 268} 269 270func TestTektonSpawnCreatesPipelineRun(t *testing.T) { 271 p, st, _, client := newTektonTestProvider(t) 272 ctx, cancel := context.WithCancel(context.Background()) 273 defer cancel() 274 275 trigger := &tangled.Pipeline_TriggerMetadata{ 276 Push: &tangled.Pipeline_PushTriggerData{ 277 NewSha: "abcdef0123", 278 Ref: "refs/heads/main", 279 }, 280 } 281 p.Spawn(ctx, "knot.example.com", "rkey-1", "did:plc:actor", trigger, 282 []*tangled.Pipeline_Workflow{{Name: "ci.yml", 283 Raw: "tack:\n tekton:\n pipeline: repo-ci\n"}}, 284 ) 285 286 ref := waitTektonRef(t, st, "knot.example.com", "rkey-1", "ci.yml") 287 if ref.Namespace != "ci" || ref.PipelineName != "repo-ci" { 288 t.Fatalf("ref mismatch: %+v", ref) 289 } 290 291 obj, err := client.GetObject(context.Background(), pipelineRunsGVR, "ci", ref.PipelineRunName) 292 if err != nil { 293 t.Fatalf("get PipelineRun: %v", err) 294 } 295 pipeline, ok := obj.NestedString("spec", "pipelineRef", "name") 296 if !ok || pipeline != "repo-ci" { 297 t.Fatalf("pipelineRef.name = %q", pipeline) 298 } 299 300 rows, err := st.EventsAfter(context.Background(), 0) 301 if err != nil { 302 t.Fatalf("EventsAfter: %v", err) 303 } 304 if len(rows) != 1 { 305 t.Fatalf("got %d events, want 1", len(rows)) 306 } 307 var rec tangled.PipelineStatus 308 if err := json.Unmarshal(rows[0].EventJSON, &rec); err != nil { 309 t.Fatalf("decode status: %v", err) 310 } 311 if rec.Status != "pending" || rec.Workflow != "ci.yml" { 312 t.Fatalf("bad pending status: %+v", rec) 313 } 314} 315 316func TestTektonSpawnAlreadyExists(t *testing.T) { 317 p, st, _, client := newTektonTestProvider(t) 318 name := tektonPipelineRunName("knot.example.com", "rkey-1", "ci.yml", "abcdef0123", "main") 319 existing := buildTektonPipelineRun("ci", name, 320 &tektonWorkflowConfig{Pipeline: "repo-ci"}, 321 "knot.example.com", "rkey-1", "did:plc:actor", "abcdef0123", "main", 322 &tangled.Pipeline_Workflow{Name: "ci.yml"}, 323 ) 324 existing.SetUID("uid-1") 325 client.SeedObject(pipelineRunsGVR, "ci", existing) 326 327 ctx, cancel := context.WithCancel(context.Background()) 328 defer cancel() 329 p.Spawn(ctx, "knot.example.com", "rkey-1", "did:plc:actor", 330 &tangled.Pipeline_TriggerMetadata{Push: &tangled.Pipeline_PushTriggerData{ 331 NewSha: "abcdef0123", 332 Ref: "refs/heads/main", 333 }}, 334 []*tangled.Pipeline_Workflow{{Name: "ci.yml", 335 Raw: "tack:\n tekton:\n pipeline: repo-ci\n"}}, 336 ) 337 338 ref := waitTektonRef(t, st, "knot.example.com", "rkey-1", "ci.yml") 339 if ref.PipelineRunName != name || ref.PipelineRunUID != "uid-1" { 340 t.Fatalf("ref mismatch: %+v", ref) 341 } 342} 343 344func TestTektonLogsLookup(t *testing.T) { 345 p, st, _, client := newTektonTestProvider(t) 346 ctx := context.Background() 347 if _, err := p.Logs(ctx, "knot.example.com", "rkey-1", "ci.yml"); !errors.Is(err, ErrLogsNotFound) { 348 t.Fatalf("logs before mapping err = %v; want ErrLogsNotFound", err) 349 } 350 ref := TektonRunRef{ 351 Knot: "knot.example.com", 352 PipelineRkey: "rkey-1", 353 Workflow: "ci.yml", 354 Namespace: "ci", 355 PipelineRunName: "run-1", 356 PipelineRunUID: "uid-1", 357 PipelineName: "repo-ci", 358 PipelineURI: pipelineATURI("knot.example.com", "rkey-1"), 359 } 360 if err := st.InsertTektonRun(ctx, ref); err != nil { 361 t.Fatalf("insert ref: %v", err) 362 } 363 // With the mapping in place but no TaskRuns yet, Logs must NOT 364 // return ErrLogsNotFound: the workflow has been spawned and is 365 // just queueing inside Tekton. Surfacing 404 here mistranslates 366 // "still scheduling" as "doesn't exist" at the HTTP layer (see 367 // http.go's /logs handler). Verify we get an open channel that 368 // stays open until ctx is cancelled. 369 { 370 waitCtx, cancel := context.WithCancel(ctx) 371 ch, err := p.Logs(waitCtx, "knot.example.com", "rkey-1", "ci.yml") 372 if err != nil { 373 cancel() 374 t.Fatalf("logs before TaskRuns err = %v; want nil", err) 375 } 376 if ch == nil { 377 cancel() 378 t.Fatalf("logs before TaskRuns: nil channel") 379 } 380 // Channel must not produce any frames or close before we 381 // cancel. A premature close would mean the goroutine treated 382 // "no TaskRuns yet" as "stream done", which is what we just 383 // fixed. 384 select { 385 case line, ok := <-ch: 386 cancel() 387 if !ok { 388 t.Fatalf("logs channel closed before TaskRuns appeared") 389 } 390 t.Fatalf("unexpected frame before TaskRuns: %+v", line) 391 case <-time.After(50 * time.Millisecond): 392 } 393 cancel() 394 // After cancellation the producer goroutine must close the 395 // channel and not strand any frames. 396 for line := range ch { 397 t.Fatalf("unexpected frame after cancel: %+v", line) 398 } 399 } 400 401 // Seed a terminal PipelineRun so Logs takes the snapshot path 402 // (fetchCompletedTaskRunLogs, Follow=false) and closes the 403 // channel after draining the seeded TaskRun. The non-terminal 404 // path follows pod logs live and would only EOF on ctx cancel, 405 // which is not what this assertion is exercising. 406 client.SeedObject(pipelineRunsGVR, "ci", k8s.Object{ 407 "apiVersion": "tekton.dev/v1", 408 "kind": "PipelineRun", 409 "metadata": map[string]any{ 410 "name": "run-1", 411 "namespace": "ci", 412 }, 413 "status": map[string]any{ 414 "conditions": []any{map[string]any{ 415 "type": "Succeeded", 416 "status": "True", 417 }}, 418 }, 419 }) 420 client.SeedObject(taskRunsGVR, "ci", k8s.Object{ 421 "apiVersion": "tekton.dev/v1", 422 "kind": "TaskRun", 423 "metadata": map[string]any{ 424 "name": "task-1", 425 "namespace": "ci", 426 "labels": map[string]any{ 427 "tekton.dev/pipelineRun": "run-1", 428 }, 429 }, 430 }) 431 client.SeedPod("ci", k8s.Pod{ 432 Name: "pod-1", 433 Namespace: "ci", 434 Labels: map[string]string{ 435 "tekton.dev/taskRun": "task-1", 436 }, 437 Containers: []k8s.Container{{Name: "step-test"}}, 438 }) 439 client.SetPodLog("ci", "pod-1", "step-test", "hello\n") 440 441 ch, err := p.Logs(ctx, "knot.example.com", "rkey-1", "ci.yml") 442 if err != nil { 443 t.Fatalf("Logs after pods: %v", err) 444 } 445 var got []LogLine 446 for line := range ch { 447 got = append(got, line) 448 } 449 if len(got) < 2 || got[0].StepStatus != StepStatusStart || 450 got[len(got)-1].StepStatus != StepStatusEnd { 451 t.Fatalf("log frames = %+v", got) 452 } 453} 454 455func tektonStatusObject(condStatus, reason string) k8s.Object { 456 return k8s.Object{ 457 "status": map[string]any{ 458 "conditions": []any{map[string]any{ 459 "type": "Succeeded", 460 "status": condStatus, 461 "reason": reason, 462 }}, 463 }, 464 } 465} 466 467func waitTektonRef(t *testing.T, st *store, knot, rkey, workflow string) *TektonRunRef { 468 t.Helper() 469 deadline := time.Now().Add(2 * time.Second) 470 for time.Now().Before(deadline) { 471 ref, err := st.LookupTektonRunByTuple(context.Background(), knot, rkey, workflow) 472 if err != nil { 473 t.Fatalf("lookup: %v", err) 474 } 475 if ref != nil { 476 return ref 477 } 478 time.Sleep(20 * time.Millisecond) 479 } 480 t.Fatal("tekton run row not persisted within deadline") 481 return nil 482}