Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

at main 30 kB View raw
1package main 2 3// tektonProvider implements Provider by creating Tekton PipelineRuns 4// directly inside the Kubernetes cluster. Tack already receives and 5// authorizes Tangled pipeline triggers, so adding Tekton Triggers would 6// duplicate the event-to-run translation layer instead of simplifying it. 7 8import ( 9 "bufio" 10 "context" 11 "crypto/sha256" 12 "encoding/hex" 13 "encoding/json" 14 "errors" 15 "fmt" 16 "io" 17 "log/slog" 18 "sort" 19 "strings" 20 "time" 21 "unicode" 22 23 "tangled.org/core/api/tangled" 24 25 "go.mitchellh.com/tack/internal/k8s" 26 "go.yaml.in/yaml/v2" 27) 28 29const ( 30 tektonAPIVersion = "tekton.dev/v1" 31 tektonRunKind = "PipelineRun" 32 33 tektonLabelManagedBy = "tack.mitchellh.com/managed-by" 34 tektonLabelPipelineRkey = "tack.mitchellh.com/pipeline-rkey" 35 tektonLabelWorkflow = "tack.mitchellh.com/workflow" 36 37 tektonAnnotationKnot = "tack.mitchellh.com/knot" 38 tektonAnnotationPipelineRkey = "tack.mitchellh.com/pipeline-rkey" 39 tektonAnnotationWorkflow = "tack.mitchellh.com/workflow" 40 tektonAnnotationActor = "tack.mitchellh.com/actor" 41 tektonAnnotationCommit = "tack.mitchellh.com/commit" 42 tektonAnnotationBranch = "tack.mitchellh.com/branch" 43) 44 45var ( 46 pipelineRunsGVR = k8s.GVR{ 47 Group: "tekton.dev", Version: "v1", Resource: "pipelineruns", 48 } 49 taskRunsGVR = k8s.GVR{ 50 Group: "tekton.dev", Version: "v1", Resource: "taskruns", 51 } 52) 53 54// tektonWorkflowConfig is the Tekton-specific subset of workflow YAML. 55// `pipeline` names an existing in-cluster Tekton Pipeline. Params are 56// deliberately string-only in v1: tack is meant to select an existing 57// runner and pass a small amount of routing data, not mirror Tekton's 58// entire PipelineRun API. 59type tektonWorkflowConfig struct { 60 Pipeline string `yaml:"pipeline"` 61 ServiceAccount string `yaml:"service_account"` 62 Params map[string]string `yaml:"params"` 63 Workspaces []tektonWorkspaceConfig `yaml:"workspaces"` 64} 65 66type tektonWorkspaceConfig struct { 67 Name string `yaml:"name"` 68 AccessModes []string `yaml:"access_modes"` 69 Storage *string `yaml:"storage"` 70 PVC *string `yaml:"pvc"` 71 Secret *string `yaml:"secret"` 72 ConfigMap *string `yaml:"config_map"` 73} 74 75type tektonWorkflowDoc struct { 76 Tack struct { 77 Tekton tektonWorkflowConfig `yaml:"tekton"` 78 } `yaml:"tack"` 79} 80 81// parseTektonWorkflowConfig decodes `tack.tekton` from a workflow body. 82func parseTektonWorkflowConfig(raw string) (*tektonWorkflowConfig, error) { 83 if strings.TrimSpace(raw) == "" { 84 return nil, errors.New("workflow body is empty") 85 } 86 var doc tektonWorkflowDoc 87 if err := yaml.Unmarshal([]byte(raw), &doc); err != nil { 88 return nil, fmt.Errorf("parse workflow yaml: %w", err) 89 } 90 cfg := doc.Tack.Tekton 91 if cfg.Pipeline == "" { 92 return nil, errors.New( 93 "workflow yaml: `tack.tekton.pipeline` is required", 94 ) 95 } 96 97 // Validate workspaces: each must have exactly one volume source 98 // so the resulting PipelineRun is unambiguous. Without this, 99 // a workspace with e.g. both `storage` and `pvc` set silently 100 // picks whichever the switch statement matches first. 101 for _, ws := range cfg.Workspaces { 102 if ws.Name == "" { 103 return nil, errors.New("workspace: name is required") 104 } 105 sources := 0 106 if ws.Storage != nil { 107 sources++ 108 } 109 if ws.PVC != nil { 110 sources++ 111 } 112 if ws.Secret != nil { 113 sources++ 114 } 115 if ws.ConfigMap != nil { 116 sources++ 117 } 118 if sources == 0 { 119 return nil, fmt.Errorf( 120 "workspace %q: no volume source specified", ws.Name, 121 ) 122 } 123 if sources > 1 { 124 return nil, fmt.Errorf( 125 "workspace %q: multiple volume sources specified"+ 126 " (pick one of storage, pvc, secret, config_map)", 127 ws.Name, 128 ) 129 } 130 } 131 132 return &cfg, nil 133} 134 135type tektonProvider struct { 136 br *broker 137 st *store 138 log *slog.Logger 139 client k8s.Client 140 namespace string 141} 142 143var _ Provider = (*tektonProvider)(nil) 144 145func newTektonProvider( 146 br *broker, 147 st *store, 148 client k8s.Client, 149 namespace string, 150 log *slog.Logger, 151) *tektonProvider { 152 return &tektonProvider{ 153 br: br, 154 st: st, 155 log: log.With("component", "provider", "kind", "tekton"), 156 client: client, 157 namespace: namespace, 158 } 159} 160 161func newInClusterTektonProvider( 162 br *broker, 163 st *store, 164 namespace string, 165 log *slog.Logger, 166) (*tektonProvider, error) { 167 client, err := k8s.NewInClusterClient() 168 if err != nil { 169 return nil, fmt.Errorf("configure in-cluster kubernetes client: %w", err) 170 } 171 return newTektonProvider(br, st, client, namespace, log), nil 172} 173 174func (p *tektonProvider) Spawn( 175 ctx context.Context, 176 knot string, 177 pipelineRkey string, 178 actor string, 179 trigger *tangled.Pipeline_TriggerMetadata, 180 workflows []*tangled.Pipeline_Workflow, 181) { 182 if len(workflows) == 0 { 183 p.log.Warn("pipeline has no workflows; nothing to spawn", 184 "knot", knot, "rkey", pipelineRkey, 185 ) 186 return 187 } 188 for _, wf := range workflows { 189 if wf == nil || wf.Name == "" { 190 continue 191 } 192 wf := wf 193 go p.spawnWorkflow(ctx, knot, pipelineRkey, actor, trigger, wf) 194 } 195} 196 197func (p *tektonProvider) spawnWorkflow( 198 ctx context.Context, 199 knot string, 200 pipelineRkey string, 201 actor string, 202 trigger *tangled.Pipeline_TriggerMetadata, 203 wf *tangled.Pipeline_Workflow, 204) { 205 logger := p.log.With( 206 "knot", knot, 207 "pipeline_rkey", pipelineRkey, 208 "workflow", wf.Name, 209 "actor", actor, 210 ) 211 212 cfg, err := parseTektonWorkflowConfig(wf.Raw) 213 if err != nil { 214 logger.Error("invalid workflow config; refusing to spawn", "err", err) 215 return 216 } 217 commit, branch := triggerCommitAndBranch(trigger) 218 name := tektonPipelineRunName(knot, pipelineRkey, wf.Name, commit, branch) 219 pr := buildTektonPipelineRun( 220 p.namespace, name, cfg, knot, pipelineRkey, actor, commit, branch, wf, 221 ) 222 223 created, err := p.client.CreateObject(ctx, pipelineRunsGVR, p.namespace, pr) 224 if errors.Is(err, k8s.ErrAlreadyExists) { 225 created, err = p.client.GetObject(ctx, pipelineRunsGVR, p.namespace, name) 226 } 227 if err != nil { 228 logger.Error("create tekton PipelineRun", "err", err, 229 "namespace", p.namespace, "pipeline_run", name, 230 "pipeline", cfg.Pipeline, 231 ) 232 return 233 } 234 235 ref := TektonRunRef{ 236 Knot: knot, 237 PipelineRkey: pipelineRkey, 238 Workflow: wf.Name, 239 Namespace: p.namespace, 240 PipelineRunName: name, 241 PipelineRunUID: created.GetUID(), 242 PipelineName: cfg.Pipeline, 243 PipelineURI: pipelineATURI(knot, pipelineRkey), 244 } 245 if err := p.st.InsertTektonRun(ctx, ref); err != nil { 246 logger.Error("persist tekton run mapping", "err", err, 247 "pipeline_run", name, 248 ) 249 return 250 } 251 252 if err := p.publishStatus(ctx, ref.PipelineURI, wf.Name, 253 "pending", name, nil, nil); err != nil { 254 logger.Error("publish initial pending status", "err", err) 255 } 256 257 logger.Info("tekton PipelineRun created", 258 "namespace", p.namespace, 259 "pipeline", cfg.Pipeline, 260 "pipeline_run", name, 261 "uid", ref.PipelineRunUID, 262 ) 263 go p.watchPipelineRun(ctx, ref) 264} 265 266func buildTektonPipelineRun( 267 namespace, name string, 268 cfg *tektonWorkflowConfig, 269 knot, pipelineRkey, actor, commit, branch string, 270 wf *tangled.Pipeline_Workflow, 271) k8s.Object { 272 obj := k8s.Object{ 273 "apiVersion": tektonAPIVersion, 274 "kind": tektonRunKind, 275 "metadata": map[string]any{ 276 "name": name, 277 "namespace": namespace, 278 "labels": map[string]any{ 279 tektonLabelManagedBy: "tack", 280 tektonLabelPipelineRkey: labelValue(pipelineRkey), 281 tektonLabelWorkflow: labelValue(wf.Name), 282 }, 283 "annotations": map[string]any{ 284 tektonAnnotationKnot: knot, 285 tektonAnnotationPipelineRkey: pipelineRkey, 286 tektonAnnotationWorkflow: wf.Name, 287 tektonAnnotationActor: actor, 288 tektonAnnotationCommit: commit, 289 tektonAnnotationBranch: branch, 290 }, 291 }, 292 "spec": map[string]any{ 293 "pipelineRef": map[string]any{ 294 "name": cfg.Pipeline, 295 }, 296 "params": []any{ 297 map[string]any{ 298 "name": "commit", 299 "value": commit, 300 }, 301 map[string]any{ 302 "name": "branch", 303 "value": branch, 304 }, 305 map[string]any{ 306 "name": "actor", 307 "value": actor, 308 }, 309 }, 310 }, 311 } 312 313 spec := obj["spec"].(map[string]any) 314 315 if len(cfg.Workspaces) != 0 { 316 spec["podTemplate"] = map[string]any{ 317 "securityContext": map[string]any{ 318 "fsGroup": 65532, 319 }, 320 } 321 322 workspaces := []any{} 323 324 for _, ws := range cfg.Workspaces { 325 switch { 326 case ws.Storage != nil: 327 workspaces = append(workspaces, map[string]any{ 328 "name": ws.Name, 329 "volumeClaimTemplate": map[string]any{ 330 "spec": map[string]any{ 331 "accessModes": ws.AccessModes, 332 "resources": map[string]any{ 333 "requests": map[string]any{ 334 "storage": *ws.Storage, 335 }, 336 }, 337 }, 338 }, 339 }) 340 341 case ws.PVC != nil: 342 workspaces = append(workspaces, map[string]any{ 343 "name": ws.Name, 344 "persistentVolumeClaim": map[string]any{ 345 "claimName": *ws.PVC, 346 }, 347 }) 348 349 case ws.Secret != nil: 350 workspaces = append(workspaces, map[string]any{ 351 "name": ws.Name, 352 "secret": map[string]any{ 353 "secretName": *ws.Secret, 354 }, 355 }) 356 357 case ws.ConfigMap != nil: 358 workspaces = append(workspaces, map[string]any{ 359 "name": ws.Name, 360 "configMap": map[string]any{ 361 "name": *ws.ConfigMap, 362 }, 363 }) 364 } 365 } 366 367 spec["workspaces"] = workspaces 368 } 369 370 if cfg.ServiceAccount != "" { 371 spec["taskRunTemplate"] = map[string]any{ 372 "serviceAccountName": cfg.ServiceAccount, 373 } 374 } 375 376 // Merge user-defined params with the built-in ones (commit, 377 // branch, actor). User params that collide with a built-in name 378 // win, so callers can override the defaults when the upstream 379 // Tekton Pipeline expects a different shape. 380 if len(cfg.Params) > 0 { 381 builtins := map[string]string{ 382 "commit": commit, 383 "branch": branch, 384 "actor": actor, 385 } 386 387 // Collect all param names, user params override built-ins. 388 merged := make(map[string]string, len(builtins)+len(cfg.Params)) 389 for k, v := range builtins { 390 merged[k] = v 391 } 392 for k, v := range cfg.Params { 393 merged[k] = v 394 } 395 396 keys := make([]string, 0, len(merged)) 397 for key := range merged { 398 keys = append(keys, key) 399 } 400 sort.Strings(keys) 401 402 params := make([]any, 0, len(keys)) 403 for _, key := range keys { 404 params = append(params, map[string]any{ 405 "name": key, 406 "value": merged[key], 407 }) 408 } 409 spec["params"] = params 410 } 411 return obj 412} 413 414func (p *tektonProvider) watchPipelineRun(ctx context.Context, ref TektonRunRef) { 415 logger := p.log.With( 416 "knot", ref.Knot, 417 "pipeline_rkey", ref.PipelineRkey, 418 "workflow", ref.Workflow, 419 "namespace", ref.Namespace, 420 "pipeline_run", ref.PipelineRunName, 421 ) 422 423 logger.Debug("watchPipelineRun: starting") 424 425 last := "" 426 if obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace, 427 ref.PipelineRunName); err == nil { 428 status, terminal, ok := mapTektonPipelineRunStatus(obj) 429 logger.Debug("watchPipelineRun: initial status read", 430 "status", status, "terminal", terminal, "ok", ok, 431 ) 432 if ok { 433 last = status 434 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 435 status, ref.PipelineRunName, nil, nil); err != nil { 436 logger.Error("publish tekton status", "err", err, "status", status) 437 } 438 if terminal { 439 logger.Debug("watchPipelineRun: already terminal on initial read; exiting", "status", status) 440 return 441 } 442 } 443 } else if errors.Is(err, k8s.ErrNotFound) { 444 logger.Warn("PipelineRun disappeared while watching") 445 return 446 } else { 447 logger.Debug("initial PipelineRun status read", "err", err) 448 } 449 450 w, err := p.client.WatchObjects(ctx, pipelineRunsGVR, ref.Namespace, 451 k8s.ListOptions{FieldSelector: "metadata.name=" + ref.PipelineRunName}, 452 ) 453 if err != nil { 454 logger.Debug("watchPipelineRun: watch failed; falling back to polling", "err", err) 455 p.pollPipelineRun(ctx, ref, logger, last) 456 return 457 } 458 defer w.Stop() 459 460 logger.Debug("watchPipelineRun: watch established; entering event loop") 461 for { 462 select { 463 case <-ctx.Done(): 464 logger.Debug("watchPipelineRun: context cancelled") 465 return 466 case ev, ok := <-w.ResultChan(): 467 if !ok { 468 logger.Debug("watchPipelineRun: watch channel closed; falling back to polling") 469 p.pollPipelineRun(ctx, ref, logger, last) 470 return 471 } 472 status, terminal, ok := mapTektonPipelineRunStatus(ev.Object) 473 logger.Debug("watchPipelineRun: watch event", 474 "event_type", ev.Type, 475 "status", status, "terminal", terminal, "ok", ok, "last", last, 476 ) 477 if !ok || status == last { 478 if terminal { 479 logger.Debug("watchPipelineRun: terminal status unchanged; exiting", "status", status) 480 return 481 } 482 continue 483 } 484 last = status 485 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 486 status, ref.PipelineRunName, nil, nil); err != nil { 487 logger.Error("publish tekton status", "err", err, "status", status) 488 continue 489 } 490 logger.Debug("watchPipelineRun: published status", "status", status, "terminal", terminal) 491 if terminal { 492 logger.Debug("watchPipelineRun: terminal status reached; exiting", "status", status) 493 return 494 } 495 } 496 } 497} 498 499func (p *tektonProvider) pollPipelineRun( 500 ctx context.Context, 501 ref TektonRunRef, 502 logger *slog.Logger, 503 last string, 504) { 505 logger.Debug("pollPipelineRun: starting poll loop", "interval", "5s") 506 ticker := time.NewTicker(5 * time.Second) 507 defer ticker.Stop() 508 for { 509 select { 510 case <-ctx.Done(): 511 logger.Debug("pollPipelineRun: context cancelled") 512 return 513 case <-ticker.C: 514 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace, 515 ref.PipelineRunName, 516 ) 517 if errors.Is(err, k8s.ErrNotFound) { 518 logger.Warn("PipelineRun disappeared while watching") 519 return 520 } 521 if err != nil { 522 logger.Debug("get PipelineRun status", "err", err) 523 continue 524 } 525 status, terminal, ok := mapTektonPipelineRunStatus(obj) 526 logger.Debug("pollPipelineRun: poll tick", 527 "status", status, "terminal", terminal, "ok", ok, "last", last, 528 ) 529 if !ok || status == last { 530 if terminal { 531 logger.Debug("pollPipelineRun: terminal status unchanged; exiting", "status", status) 532 return 533 } 534 continue 535 } 536 last = status 537 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 538 status, ref.PipelineRunName, nil, nil); err != nil { 539 logger.Error("publish tekton status", "err", err, "status", status) 540 continue 541 } 542 logger.Debug("pollPipelineRun: published status", "status", status, "terminal", terminal) 543 if terminal { 544 logger.Debug("pollPipelineRun: terminal status reached; exiting", "status", status) 545 return 546 } 547 } 548 } 549} 550 551// mapTektonPipelineRunStatus translates Tekton's Succeeded condition 552// into the Tangled status strings consumed by the appview. 553func mapTektonPipelineRunStatus(obj k8s.Object) (status string, terminal bool, ok bool) { 554 conditions, ok := obj.NestedSlice("status", "conditions") 555 if !ok || len(conditions) == 0 { 556 slog.Debug("mapTektonPipelineRunStatus: no conditions found", 557 "pipeline_run", obj.GetName(), 558 ) 559 return "", false, false 560 } 561 for _, raw := range conditions { 562 cond, _ := raw.(map[string]interface{}) 563 condType, _ := cond["type"].(string) 564 condStatus, _ := cond["status"].(string) 565 reason, _ := cond["reason"].(string) 566 message, _ := cond["message"].(string) 567 slog.Debug("mapTektonPipelineRunStatus: condition", 568 "pipeline_run", obj.GetName(), 569 "type", condType, 570 "status", condStatus, 571 "reason", reason, 572 "message", message, 573 ) 574 if condType != "Succeeded" { 575 continue 576 } 577 switch condStatus { 578 case "True": 579 return "success", true, true 580 case "False": 581 if tektonReasonCancelled(reason) { 582 return "cancelled", true, true 583 } 584 return "failed", true, true 585 case "Unknown": 586 return "running", false, true 587 default: 588 return "", false, false 589 } 590 } 591 return "", false, false 592} 593 594func tektonReasonCancelled(reason string) bool { 595 r := strings.ToLower(reason) 596 return strings.Contains(r, "cancel") || strings.Contains(r, "stop") 597} 598 599func (p *tektonProvider) Logs( 600 ctx context.Context, 601 knot string, 602 pipelineRkey string, 603 workflow string, 604) (<-chan LogLine, error) { 605 ref, err := p.st.LookupTektonRunByTuple(ctx, knot, pipelineRkey, workflow) 606 if err != nil { 607 return nil, fmt.Errorf("lookup tekton run mapping: %w", err) 608 } 609 if ref == nil { 610 // No mapping at all means this provider never spawned a 611 // PipelineRun for the requested tuple, so a 404 is the 612 // honest answer. 613 return nil, ErrLogsNotFound 614 } 615 616 // At this point the workflow *was* spawned — we have a row in the 617 // store mapping the tuple to a PipelineRun. The TaskRuns the 618 // PipelineRun fans out to are created asynchronously by Tekton 619 // once the run is admitted, so a freshly-spawned or still-queueing 620 // PipelineRun will momentarily report zero TaskRuns. Returning 621 // ErrLogsNotFound here would mistranslate that into a 404 at the 622 // HTTP layer (see provider.go contract: ErrLogsNotFound means the 623 // workflow never ran), making just-spawned runs look nonexistent 624 // to the appview. Instead, hand back an open channel and poll 625 // inside the goroutine until TaskRuns materialize, ctx is 626 // cancelled, or the PipelineRun reaches a terminal state with 627 // nothing to stream. 628 out := make(chan LogLine, 32) 629 go func() { 630 defer close(out) 631 p.streamPipelineRunLogs(ctx, out, *ref) 632 }() 633 return out, nil 634} 635 636// streamPipelineRunLogs drives the Logs channel for a single PipelineRun. 637// It polls for TaskRuns until the PipelineRun is terminal (or ctx is 638// cancelled), streaming each TaskRun's logs exactly once. 639// 640// The loop is deliberately a poll rather than a one-shot pass for two 641// reasons: 642// 643// 1. Live runs need follow semantics. Streaming a still-running TaskRun 644// uses Follow=true on the pod log stream, so streamTaskRunLogs only 645// returns once the container actually terminates. The outer loop 646// then re-checks for new TaskRuns and the PipelineRun's terminal 647// state, instead of closing the channel mid-run. 648// 649// 2. PipelineRuns can spawn additional TaskRuns over time (sequential 650// `runAfter` tasks, finally blocks, retries). A single snapshot 651// would silently drop any TaskRun that appears after the snapshot, 652// even though the workflow is still running. 653// 654// The loop terminates only when the PipelineRun is terminal AND the 655// most recent listing produced no new TaskRuns, which together mean no 656// further TaskRuns will ever appear. 657func (p *tektonProvider) streamPipelineRunLogs( 658 ctx context.Context, 659 out chan<- LogLine, 660 ref TektonRunRef, 661) { 662 const pollInterval = 1 * time.Second 663 seen := map[string]bool{} 664 stepID := 0 665 for { 666 if err := ctx.Err(); err != nil { 667 return 668 } 669 670 taskRuns, err := p.taskRunsForPipelineRun(ctx, ref) 671 if err != nil { 672 p.log.Debug("Logs: list TaskRuns failed", 673 "err", err, "pipeline_run", ref.PipelineRunName, 674 ) 675 } 676 // Snapshot the terminal state *after* the listing so we never 677 // observe terminal=true while still missing a TaskRun that 678 // existed at list time. The reverse race (terminal observed 679 // before a TaskRun spawn) is handled by the next iteration: we 680 // only exit when terminal is true AND no new TaskRuns showed up 681 // in this pass. 682 terminal := p.isPipelineRunTerminal(ctx, ref) 683 684 var fresh []k8s.Object 685 for _, tr := range taskRuns { 686 name := tr.GetName() 687 if name == "" || seen[name] { 688 continue 689 } 690 seen[name] = true 691 fresh = append(fresh, tr) 692 } 693 p.log.Debug("Logs: poll iteration", 694 "pipeline_run", ref.PipelineRunName, 695 "task_runs_total", len(taskRuns), 696 "task_runs_new", len(fresh), 697 "terminal", terminal, 698 ) 699 700 for _, tr := range fresh { 701 taskName := tr.GetName() 702 if taskName == "" { 703 taskName = fmt.Sprintf("task %d", stepID) 704 } 705 p.log.Debug("Logs: streaming TaskRun", 706 "task_run", taskName, "step_id", stepID, "terminal", terminal, 707 ) 708 if !sendLine(ctx, out, LogLine{ 709 Kind: LogKindControl, 710 Time: time.Now(), 711 Content: taskName, 712 StepId: stepID, 713 StepStatus: StepStatusStart, 714 }) { 715 return 716 } 717 718 // terminal is the snapshot taken at the top of this 719 // iteration. If the pipeline was terminal then, all 720 // TaskRuns we see are guaranteed complete and we can 721 // take the snapshot fast-path; otherwise we follow the 722 // pod logs live so StepStatusEnd is only emitted once 723 // the container actually exits. 724 if terminal { 725 p.fetchCompletedTaskRunLogs(ctx, out, ref, tr, stepID) 726 } else { 727 p.streamTaskRunLogs(ctx, out, ref, tr, stepID) 728 } 729 730 if !sendLine(ctx, out, LogLine{ 731 Kind: LogKindControl, 732 Time: time.Now(), 733 Content: taskName, 734 StepId: stepID, 735 StepStatus: StepStatusEnd, 736 }) { 737 return 738 } 739 p.log.Debug("Logs: finished TaskRun", 740 "task_run", taskName, "step_id", stepID, 741 ) 742 stepID++ 743 } 744 745 // Done condition: the PipelineRun is terminal AND we found no 746 // new TaskRuns this iteration. Both are required because a 747 // terminal PipelineRun can still expose a freshly-listed 748 // TaskRun whose pod we haven't drained yet. 749 if terminal && len(fresh) == 0 { 750 p.log.Debug("Logs: pipeline run terminal, no new TaskRuns", 751 "pipeline_run", ref.PipelineRunName, 752 ) 753 return 754 } 755 756 // When we processed new TaskRuns this round, loop again 757 // immediately to re-list — Follow=true streaming may have 758 // blocked us long enough for additional TaskRuns or terminal 759 // transitions to have happened. 760 if len(fresh) > 0 { 761 continue 762 } 763 764 select { 765 case <-ctx.Done(): 766 return 767 case <-time.After(pollInterval): 768 } 769 } 770} 771 772// isPipelineRunTerminal returns true if the PipelineRun is in a terminal state right now. 773func (p *tektonProvider) isPipelineRunTerminal(ctx context.Context, ref TektonRunRef) bool { 774 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace, 775 ref.PipelineRunName, 776 ) 777 if err != nil { 778 p.log.Debug("isPipelineRunTerminal: failed to get PipelineRun", "err", err, "pipeline_run", ref.PipelineRunName) 779 return false 780 } 781 _, terminal, ok := mapTektonPipelineRunStatus(obj) 782 p.log.Debug("isPipelineRunTerminal: status check", "pipeline_run", ref.PipelineRunName, "terminal", terminal, "ok", ok) 783 return ok && terminal 784} 785 786// fetchCompletedTaskRunLogs fetches all logs from a TaskRun that has already completed. 787// It reads each step container's logs in one shot using all-containers traversal, 788// and also inlines the Tekton step status from the TaskRun (exit code, reason) as 789// control messages so the caller gets full context without needing to watch for events. 790func (p *tektonProvider) fetchCompletedTaskRunLogs( 791 ctx context.Context, 792 out chan<- LogLine, 793 ref TektonRunRef, 794 tr k8s.Object, 795 stepID int, 796) { 797 trName := tr.GetName() 798 pods, err := p.podsForTaskRun(ctx, ref.Namespace, trName) 799 if err != nil { 800 p.log.Debug("fetchCompletedTaskRunLogs: list pods failed", "err", err, 801 "task_run", trName, "pipeline_run", ref.PipelineRunName, 802 ) 803 return 804 } 805 p.log.Debug("fetchCompletedTaskRunLogs: found pods", 806 "task_run", trName, "pod_count", len(pods), 807 ) 808 809 // Emit a summary line from the TaskRun status (steps[*].terminated) so we 810 // get exit codes and reasons even if the pod logs are sparse. 811 steps, _ := tr.NestedSlice("status", "steps") 812 for _, rawStep := range steps { 813 step, _ := rawStep.(map[string]any) 814 stepName, _ := step["name"].(string) 815 term, _ := step["terminated"].(map[string]any) 816 if term == nil { 817 continue 818 } 819 exitCode := numberToInt64(term["exitCode"]) 820 reason, _ := term["reason"].(string) 821 msg, _ := term["message"].(string) 822 line := fmt.Sprintf("[step %s] exit=%d reason=%s", stepName, exitCode, reason) 823 if msg != "" { 824 line += " " + msg 825 } 826 p.log.Debug("fetchCompletedTaskRunLogs: step terminated", 827 "task_run", trName, "step", stepName, 828 "exit_code", exitCode, "reason", reason, 829 ) 830 if !sendLine(ctx, out, LogLine{ 831 Kind: LogKindData, 832 Time: time.Now(), 833 Content: line + "\n", 834 StepId: stepID, 835 Stream: "stdout", 836 }) { 837 return 838 } 839 } 840 841 for _, pod := range pods { 842 containers := append([]k8s.Container(nil), pod.InitContainers...) 843 containers = append(containers, pod.Containers...) 844 p.log.Debug("fetchCompletedTaskRunLogs: reading pod containers", 845 "pod", pod.Name, "container_count", len(containers), 846 ) 847 for _, c := range containers { 848 p.log.Debug("fetchCompletedTaskRunLogs: reading container logs", 849 "pod", pod.Name, "container", c.Name, 850 ) 851 // Snapshot read (Follow=false) is correct here: the TaskRun 852 // has already terminated, so the API server has the full log 853 // available and there is nothing more to wait for. 854 rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name, 855 k8s.LogOptions{Follow: false}, 856 ) 857 if err != nil { 858 p.log.Debug("fetchCompletedTaskRunLogs: stream failed", "err", err, 859 "pod", pod.Name, "container", c.Name, 860 ) 861 continue 862 } 863 p.sendReaderLines(ctx, out, rc, stepID) 864 _ = rc.Close() 865 p.log.Debug("fetchCompletedTaskRunLogs: done reading container", 866 "pod", pod.Name, "container", c.Name, 867 ) 868 } 869 } 870} 871 872func (p *tektonProvider) taskRunsForPipelineRun(ctx context.Context, ref TektonRunRef) ([]k8s.Object, error) { 873 list, err := p.client.ListObjects(ctx, taskRunsGVR, ref.Namespace, k8s.ListOptions{ 874 LabelSelector: "tekton.dev/pipelineRun=" + ref.PipelineRunName, 875 }) 876 if err != nil { 877 return nil, fmt.Errorf("list Tekton TaskRuns: %w", err) 878 } 879 items := append([]k8s.Object(nil), list...) 880 sort.Slice(items, func(i, j int) bool { 881 ti := items[i].GetCreationTimestamp() 882 tj := items[j].GetCreationTimestamp() 883 return ti.Before(tj) 884 }) 885 return items, nil 886} 887 888func (p *tektonProvider) streamTaskRunLogs( 889 ctx context.Context, 890 out chan<- LogLine, 891 ref TektonRunRef, 892 tr k8s.Object, 893 stepID int, 894) { 895 pods, err := p.podsForTaskRun(ctx, ref.Namespace, tr.GetName()) 896 if err != nil { 897 p.log.Debug("streamTaskRunLogs: list pods for TaskRun failed", "err", err, 898 "task_run", tr.GetName(), "pipeline_run", ref.PipelineRunName, 899 ) 900 return 901 } 902 p.log.Debug("streamTaskRunLogs: found pods", 903 "task_run", tr.GetName(), "pod_count", len(pods), 904 ) 905 for _, pod := range pods { 906 containers := append([]k8s.Container(nil), pod.InitContainers...) 907 containers = append(containers, pod.Containers...) 908 p.log.Debug("streamTaskRunLogs: streaming pod containers", 909 "pod", pod.Name, "container_count", len(containers), 910 ) 911 for _, c := range containers { 912 p.log.Debug("streamTaskRunLogs: streaming container", 913 "pod", pod.Name, "container", c.Name, "step_id", stepID, 914 ) 915 // Live tail with Follow=true: the apiserver holds the 916 // connection open until the container terminates (or ctx is 917 // cancelled), so sendReaderLines only returns once the 918 // container is actually done. Without this, the read EOFs at 919 // the current tail position and the caller emits a spurious 920 // StepStatusEnd while the step is still running. 921 rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name, 922 k8s.LogOptions{Follow: true}, 923 ) 924 if err != nil { 925 p.log.Debug("streamTaskRunLogs: stream pod logs failed", "err", err, 926 "pod", pod.Name, "container", c.Name, 927 ) 928 continue 929 } 930 p.sendReaderLines(ctx, out, rc, stepID) 931 _ = rc.Close() 932 p.log.Debug("streamTaskRunLogs: finished container", 933 "pod", pod.Name, "container", c.Name, 934 ) 935 } 936 } 937} 938 939func (p *tektonProvider) podsForTaskRun(ctx context.Context, namespace, taskRun string) ([]k8s.Pod, error) { 940 list, err := p.client.ListPods(ctx, namespace, 941 "tekton.dev/taskRun="+taskRun, 942 ) 943 if err != nil { 944 return nil, fmt.Errorf("list pods: %w", err) 945 } 946 pods := append([]k8s.Pod(nil), list...) 947 sort.Slice(pods, func(i, j int) bool { 948 return pods[i].CreationTimestamp.Before(pods[j].CreationTimestamp) 949 }) 950 return pods, nil 951} 952 953func (p *tektonProvider) sendReaderLines( 954 ctx context.Context, 955 out chan<- LogLine, 956 rc io.Reader, 957 stepID int, 958) { 959 scanner := bufio.NewScanner(rc) 960 for scanner.Scan() { 961 if !sendLine(ctx, out, LogLine{ 962 Kind: LogKindData, 963 Time: time.Now(), 964 Content: scanner.Text() + "\n", 965 StepId: stepID, 966 Stream: "stdout", 967 }) { 968 return 969 } 970 } 971 if err := scanner.Err(); err != nil { 972 p.log.Debug("scan pod log", "err", err) 973 } 974} 975 976func numberToInt64(value any) int64 { 977 switch v := value.(type) { 978 case int64: 979 return v 980 case int: 981 return int64(v) 982 case float64: 983 return int64(v) 984 case json.Number: 985 i, _ := v.Int64() 986 return i 987 default: 988 return 0 989 } 990} 991 992func (p *tektonProvider) publishStatus( 993 ctx context.Context, 994 pipelineURI, workflow, status, runName string, 995 errMsg *string, 996 exitCode *int64, 997) error { 998 rec := tangled.PipelineStatus{ 999 LexiconTypeID: tangled.PipelineStatusNSID, 1000 Pipeline: pipelineURI, 1001 Workflow: workflow, 1002 Status: status, 1003 CreatedAt: time.Now().UTC().Format(time.RFC3339), 1004 Error: errMsg, 1005 ExitCode: exitCode, 1006 } 1007 body, err := json.Marshal(rec) 1008 if err != nil { 1009 return fmt.Errorf("marshal pipeline.status: %w", err) 1010 } 1011 rkey := fmt.Sprintf("tk-%s-%s-%d", runName, status, time.Now().UnixNano()) 1012 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil { 1013 return fmt.Errorf("publish pipeline.status: %w", err) 1014 } 1015 return nil 1016} 1017 1018func tektonPipelineRunName(knot, pipelineRkey, workflow, commit, branch string) string { 1019 h := sha256.Sum256([]byte(strings.Join( 1020 []string{knot, pipelineRkey, workflow, commit, branch}, "\x00", 1021 ))) 1022 suffix := hex.EncodeToString(h[:])[:12] 1023 base := dnsLabel("tack-" + workflow) 1024 maxBase := 63 - len(suffix) - 1 1025 if len(base) > maxBase { 1026 base = strings.TrimRight(base[:maxBase], "-") 1027 } 1028 if base == "" { 1029 base = "tack" 1030 } 1031 return base + "-" + suffix 1032} 1033 1034func dnsLabel(s string) string { 1035 var b strings.Builder 1036 lastDash := false 1037 for _, r := range strings.ToLower(s) { 1038 ok := unicode.IsLetter(r) || unicode.IsDigit(r) 1039 if ok { 1040 b.WriteRune(r) 1041 lastDash = false 1042 continue 1043 } 1044 if !lastDash { 1045 b.WriteByte('-') 1046 lastDash = true 1047 } 1048 } 1049 return strings.Trim(b.String(), "-") 1050} 1051 1052func labelValue(s string) string { 1053 v := dnsLabel(s) 1054 if len(v) > 63 { 1055 v = strings.TrimRight(v[:63], "-") 1056 } 1057 if v == "" { 1058 return "unknown" 1059 } 1060 return v 1061}