Stitch any CI into Tangled
3

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// tektonProvider implements Provider by creating Tekton PipelineRuns 4// directly inside the Kubernetes cluster. Tack already receives and 5// authorizes Tangled pipeline triggers, so adding Tekton Triggers would 6// duplicate the event-to-run translation layer instead of simplifying it. 7 8import ( 9 "bufio" 10 "context" 11 "crypto/sha256" 12 "encoding/hex" 13 "encoding/json" 14 "errors" 15 "fmt" 16 "io" 17 "log/slog" 18 "sort" 19 "strings" 20 "time" 21 "unicode" 22 23 "tangled.org/core/api/tangled" 24 25 "go.mitchellh.com/tack/internal/k8s" 26 "go.yaml.in/yaml/v2" 27) 28 29const ( 30 tektonAPIVersion = "tekton.dev/v1" 31 tektonRunKind = "PipelineRun" 32 33 tektonLabelManagedBy = "tack.mitchellh.com/managed-by" 34 tektonLabelPipelineRkey = "tack.mitchellh.com/pipeline-rkey" 35 tektonLabelWorkflow = "tack.mitchellh.com/workflow" 36 37 tektonAnnotationKnot = "tack.mitchellh.com/knot" 38 tektonAnnotationPipelineRkey = "tack.mitchellh.com/pipeline-rkey" 39 tektonAnnotationWorkflow = "tack.mitchellh.com/workflow" 40 tektonAnnotationActor = "tack.mitchellh.com/actor" 41 tektonAnnotationCommit = "tack.mitchellh.com/commit" 42 tektonAnnotationBranch = "tack.mitchellh.com/branch" 43) 44 45var ( 46 pipelineRunsGVR = k8s.GVR{ 47 Group: "tekton.dev", Version: "v1", Resource: "pipelineruns", 48 } 49 taskRunsGVR = k8s.GVR{ 50 Group: "tekton.dev", Version: "v1", Resource: "taskruns", 51 } 52) 53 54// tektonWorkflowConfig is the Tekton-specific subset of workflow YAML. 55// `pipeline` names an existing in-cluster Tekton Pipeline. Params are 56// deliberately string-only in v1: tack is meant to select an existing 57// runner and pass a small amount of routing data, not mirror Tekton's 58// entire PipelineRun API. 59type tektonWorkflowConfig struct { 60 Pipeline string `yaml:"pipeline"` 61 ServiceAccount string `yaml:"service_account"` 62 Params map[string]string `yaml:"params"` 63 Workspaces []tektonWorkspaceConfig `yaml:"workspaces"` 64} 65 66type tektonWorkspaceConfig struct { 67 Name string `yaml:"name"` 68 AccessModes []string `yaml:"access_modes"` 69 Storage *string `yaml:"storage"` 70 PVC *string `yaml:"pvc"` 71 Secret *string `yaml:"secret"` 72 ConfigMap *string `yaml:"config_map"` 73} 74 75type tektonWorkflowDoc struct { 76 Tack struct { 77 Tekton tektonWorkflowConfig `yaml:"tekton"` 78 } `yaml:"tack"` 79} 80 81// parseTektonWorkflowConfig decodes `tack.tekton` from a workflow body. 82func parseTektonWorkflowConfig(raw string) (*tektonWorkflowConfig, error) { 83 if strings.TrimSpace(raw) == "" { 84 return nil, errors.New("workflow body is empty") 85 } 86 var doc tektonWorkflowDoc 87 if err := yaml.Unmarshal([]byte(raw), &doc); err != nil { 88 return nil, fmt.Errorf("parse workflow yaml: %w", err) 89 } 90 cfg := doc.Tack.Tekton 91 if cfg.Pipeline == "" { 92 return nil, errors.New("workflow yaml: `tack.tekton.pipeline` is required") 93 } 94 return &cfg, nil 95} 96 97type tektonProvider struct { 98 br *broker 99 st *store 100 log *slog.Logger 101 client k8s.Client 102 namespace string 103} 104 105var _ Provider = (*tektonProvider)(nil) 106 107func newTektonProvider( 108 br *broker, 109 st *store, 110 client k8s.Client, 111 namespace string, 112 log *slog.Logger, 113) *tektonProvider { 114 return &tektonProvider{ 115 br: br, 116 st: st, 117 log: log.With("component", "provider", "kind", "tekton"), 118 client: client, 119 namespace: namespace, 120 } 121} 122 123func newInClusterTektonProvider( 124 br *broker, 125 st *store, 126 namespace string, 127 log *slog.Logger, 128) (*tektonProvider, error) { 129 client, err := k8s.NewInClusterClient() 130 if err != nil { 131 return nil, fmt.Errorf("configure in-cluster kubernetes client: %w", err) 132 } 133 return newTektonProvider(br, st, client, namespace, log), nil 134} 135 136func (p *tektonProvider) Spawn( 137 ctx context.Context, 138 knot string, 139 pipelineRkey string, 140 actor string, 141 trigger *tangled.Pipeline_TriggerMetadata, 142 workflows []*tangled.Pipeline_Workflow, 143) { 144 if len(workflows) == 0 { 145 p.log.Warn("pipeline has no workflows; nothing to spawn", 146 "knot", knot, "rkey", pipelineRkey, 147 ) 148 return 149 } 150 for _, wf := range workflows { 151 if wf == nil || wf.Name == "" { 152 continue 153 } 154 wf := wf 155 go p.spawnWorkflow(ctx, knot, pipelineRkey, actor, trigger, wf) 156 } 157} 158 159func (p *tektonProvider) spawnWorkflow( 160 ctx context.Context, 161 knot string, 162 pipelineRkey string, 163 actor string, 164 trigger *tangled.Pipeline_TriggerMetadata, 165 wf *tangled.Pipeline_Workflow, 166) { 167 logger := p.log.With( 168 "knot", knot, 169 "pipeline_rkey", pipelineRkey, 170 "workflow", wf.Name, 171 "actor", actor, 172 ) 173 174 cfg, err := parseTektonWorkflowConfig(wf.Raw) 175 if err != nil { 176 logger.Error("invalid workflow config; refusing to spawn", "err", err) 177 return 178 } 179 commit, branch := triggerCommitAndBranch(trigger) 180 name := tektonPipelineRunName(knot, pipelineRkey, wf.Name, commit, branch) 181 pr := buildTektonPipelineRun( 182 p.namespace, name, cfg, knot, pipelineRkey, actor, commit, branch, wf, 183 ) 184 185 created, err := p.client.CreateObject(ctx, pipelineRunsGVR, p.namespace, pr) 186 if errors.Is(err, k8s.ErrAlreadyExists) { 187 created, err = p.client.GetObject(ctx, pipelineRunsGVR, p.namespace, name) 188 } 189 if err != nil { 190 logger.Error("create tekton PipelineRun", "err", err, 191 "namespace", p.namespace, "pipeline_run", name, 192 "pipeline", cfg.Pipeline, 193 ) 194 return 195 } 196 197 ref := TektonRunRef{ 198 Knot: knot, 199 PipelineRkey: pipelineRkey, 200 Workflow: wf.Name, 201 Namespace: p.namespace, 202 PipelineRunName: name, 203 PipelineRunUID: created.GetUID(), 204 PipelineName: cfg.Pipeline, 205 PipelineURI: pipelineATURI(knot, pipelineRkey), 206 } 207 if err := p.st.InsertTektonRun(ctx, ref); err != nil { 208 logger.Error("persist tekton run mapping", "err", err, 209 "pipeline_run", name, 210 ) 211 return 212 } 213 214 if err := p.publishStatus(ctx, ref.PipelineURI, wf.Name, 215 "pending", name, nil, nil); err != nil { 216 logger.Error("publish initial pending status", "err", err) 217 } 218 219 logger.Info("tekton PipelineRun created", 220 "namespace", p.namespace, 221 "pipeline", cfg.Pipeline, 222 "pipeline_run", name, 223 "uid", ref.PipelineRunUID, 224 ) 225 go p.watchPipelineRun(ctx, ref) 226} 227 228func buildTektonPipelineRun( 229 namespace, name string, 230 cfg *tektonWorkflowConfig, 231 knot, pipelineRkey, actor, commit, branch string, 232 wf *tangled.Pipeline_Workflow, 233) k8s.Object { 234 obj := k8s.Object{ 235 "apiVersion": tektonAPIVersion, 236 "kind": tektonRunKind, 237 "metadata": map[string]any{ 238 "name": name, 239 "namespace": namespace, 240 "labels": map[string]any{ 241 tektonLabelManagedBy: "tack", 242 tektonLabelPipelineRkey: labelValue(pipelineRkey), 243 tektonLabelWorkflow: labelValue(wf.Name), 244 }, 245 "annotations": map[string]any{ 246 tektonAnnotationKnot: knot, 247 tektonAnnotationPipelineRkey: pipelineRkey, 248 tektonAnnotationWorkflow: wf.Name, 249 tektonAnnotationActor: actor, 250 tektonAnnotationCommit: commit, 251 tektonAnnotationBranch: branch, 252 }, 253 }, 254 "spec": map[string]any{ 255 "pipelineRef": map[string]any{ 256 "name": cfg.Pipeline, 257 }, 258 "params": []any{ 259 map[string]any{ 260 "name": "commit", 261 "value": commit, 262 }, 263 map[string]any{ 264 "name": "branch", 265 "value": branch, 266 }, 267 map[string]any{ 268 "name": "actor", 269 "value": actor, 270 }, 271 }, 272 }, 273 } 274 275 spec := obj["spec"].(map[string]any) 276 277 if len(cfg.Workspaces) != 0 { 278 spec["podTemplate"] = map[string]any{ 279 "securityContext": map[string]any{ 280 "fsGroup": 65532, 281 }, 282 } 283 284 workspaces := []any{} 285 286 for _, ws := range cfg.Workspaces { 287 switch { 288 case ws.Storage != nil: 289 workspaces = append(workspaces, map[string]any{ 290 "name": ws.Name, 291 "volumeClaimTemplate": map[string]any{ 292 "spec": map[string]any{ 293 "accessModes": ws.AccessModes, 294 "resources": map[string]any{ 295 "requests": map[string]any{ 296 "storage": *ws.Storage, 297 }, 298 }, 299 }, 300 }, 301 }) 302 303 case ws.PVC != nil: 304 workspaces = append(workspaces, map[string]any{ 305 "name": ws.Name, 306 "persistentVolumeClaim": map[string]any{ 307 "claimName": *ws.PVC, 308 }, 309 }) 310 311 case ws.Secret != nil: 312 workspaces = append(workspaces, map[string]any{ 313 "name": ws.Name, 314 "secret": map[string]any{ 315 "secretName": *ws.Secret, 316 }, 317 }) 318 319 case ws.ConfigMap != nil: 320 workspaces = append(workspaces, map[string]any{ 321 "name": ws.Name, 322 "configMap": map[string]any{ 323 "name": *ws.ConfigMap, 324 }, 325 }) 326 } 327 } 328 329 spec["workspaces"] = workspaces 330 } 331 332 if cfg.ServiceAccount != "" { 333 spec["serviceAccountName"] = cfg.ServiceAccount 334 } 335 if len(cfg.Params) > 0 { 336 keys := make([]string, 0, len(cfg.Params)) 337 for key := range cfg.Params { 338 keys = append(keys, key) 339 } 340 sort.Strings(keys) 341 params := make([]any, 0, len(keys)) 342 for _, key := range keys { 343 params = append(params, map[string]any{ 344 "name": key, 345 "value": cfg.Params[key], 346 }) 347 } 348 spec["params"] = params 349 } 350 return obj 351} 352 353func (p *tektonProvider) watchPipelineRun(ctx context.Context, ref TektonRunRef) { 354 logger := p.log.With( 355 "knot", ref.Knot, 356 "pipeline_rkey", ref.PipelineRkey, 357 "workflow", ref.Workflow, 358 "namespace", ref.Namespace, 359 "pipeline_run", ref.PipelineRunName, 360 ) 361 362 logger.Debug("watchPipelineRun: starting") 363 364 last := "" 365 if obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace, 366 ref.PipelineRunName); err == nil { 367 status, terminal, ok := mapTektonPipelineRunStatus(obj) 368 logger.Debug("watchPipelineRun: initial status read", 369 "status", status, "terminal", terminal, "ok", ok, 370 ) 371 if ok { 372 last = status 373 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 374 status, ref.PipelineRunName, nil, nil); err != nil { 375 logger.Error("publish tekton status", "err", err, "status", status) 376 } 377 if terminal { 378 logger.Debug("watchPipelineRun: already terminal on initial read; exiting", "status", status) 379 return 380 } 381 } 382 } else if errors.Is(err, k8s.ErrNotFound) { 383 logger.Warn("PipelineRun disappeared while watching") 384 return 385 } else { 386 logger.Debug("initial PipelineRun status read", "err", err) 387 } 388 389 w, err := p.client.WatchObjects(ctx, pipelineRunsGVR, ref.Namespace, 390 k8s.ListOptions{FieldSelector: "metadata.name=" + ref.PipelineRunName}, 391 ) 392 if err != nil { 393 logger.Debug("watchPipelineRun: watch failed; falling back to polling", "err", err) 394 p.pollPipelineRun(ctx, ref, logger, last) 395 return 396 } 397 defer w.Stop() 398 399 logger.Debug("watchPipelineRun: watch established; entering event loop") 400 for { 401 select { 402 case <-ctx.Done(): 403 logger.Debug("watchPipelineRun: context cancelled") 404 return 405 case ev, ok := <-w.ResultChan(): 406 if !ok { 407 logger.Debug("watchPipelineRun: watch channel closed; falling back to polling") 408 p.pollPipelineRun(ctx, ref, logger, last) 409 return 410 } 411 status, terminal, ok := mapTektonPipelineRunStatus(ev.Object) 412 logger.Debug("watchPipelineRun: watch event", 413 "event_type", ev.Type, 414 "status", status, "terminal", terminal, "ok", ok, "last", last, 415 ) 416 if !ok || status == last { 417 if terminal { 418 logger.Debug("watchPipelineRun: terminal status unchanged; exiting", "status", status) 419 return 420 } 421 continue 422 } 423 last = status 424 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 425 status, ref.PipelineRunName, nil, nil); err != nil { 426 logger.Error("publish tekton status", "err", err, "status", status) 427 continue 428 } 429 logger.Debug("watchPipelineRun: published status", "status", status, "terminal", terminal) 430 if terminal { 431 logger.Debug("watchPipelineRun: terminal status reached; exiting", "status", status) 432 return 433 } 434 } 435 } 436} 437 438func (p *tektonProvider) pollPipelineRun( 439 ctx context.Context, 440 ref TektonRunRef, 441 logger *slog.Logger, 442 last string, 443) { 444 logger.Debug("pollPipelineRun: starting poll loop", "interval", "5s") 445 ticker := time.NewTicker(5 * time.Second) 446 defer ticker.Stop() 447 for { 448 select { 449 case <-ctx.Done(): 450 logger.Debug("pollPipelineRun: context cancelled") 451 return 452 case <-ticker.C: 453 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace, 454 ref.PipelineRunName, 455 ) 456 if errors.Is(err, k8s.ErrNotFound) { 457 logger.Warn("PipelineRun disappeared while watching") 458 return 459 } 460 if err != nil { 461 logger.Debug("get PipelineRun status", "err", err) 462 continue 463 } 464 status, terminal, ok := mapTektonPipelineRunStatus(obj) 465 logger.Debug("pollPipelineRun: poll tick", 466 "status", status, "terminal", terminal, "ok", ok, "last", last, 467 ) 468 if !ok || status == last { 469 if terminal { 470 logger.Debug("pollPipelineRun: terminal status unchanged; exiting", "status", status) 471 return 472 } 473 continue 474 } 475 last = status 476 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 477 status, ref.PipelineRunName, nil, nil); err != nil { 478 logger.Error("publish tekton status", "err", err, "status", status) 479 continue 480 } 481 logger.Debug("pollPipelineRun: published status", "status", status, "terminal", terminal) 482 if terminal { 483 logger.Debug("pollPipelineRun: terminal status reached; exiting", "status", status) 484 return 485 } 486 } 487 } 488} 489 490// mapTektonPipelineRunStatus translates Tekton's Succeeded condition 491// into the Tangled status strings consumed by the appview. 492func mapTektonPipelineRunStatus(obj k8s.Object) (status string, terminal bool, ok bool) { 493 conditions, ok := obj.NestedSlice("status", "conditions") 494 if !ok || len(conditions) == 0 { 495 slog.Debug("mapTektonPipelineRunStatus: no conditions found", 496 "pipeline_run", obj.GetName(), 497 ) 498 return "", false, false 499 } 500 for _, raw := range conditions { 501 cond, _ := raw.(map[string]interface{}) 502 condType, _ := cond["type"].(string) 503 condStatus, _ := cond["status"].(string) 504 reason, _ := cond["reason"].(string) 505 message, _ := cond["message"].(string) 506 slog.Debug("mapTektonPipelineRunStatus: condition", 507 "pipeline_run", obj.GetName(), 508 "type", condType, 509 "status", condStatus, 510 "reason", reason, 511 "message", message, 512 ) 513 if condType != "Succeeded" { 514 continue 515 } 516 switch condStatus { 517 case "True": 518 return "success", true, true 519 case "False": 520 if tektonReasonCancelled(reason) { 521 return "cancelled", true, true 522 } 523 return "failed", true, true 524 case "Unknown": 525 return "running", false, true 526 default: 527 return "", false, false 528 } 529 } 530 return "", false, false 531} 532 533func tektonReasonCancelled(reason string) bool { 534 r := strings.ToLower(reason) 535 return strings.Contains(r, "cancel") || strings.Contains(r, "stop") 536} 537 538func (p *tektonProvider) Logs( 539 ctx context.Context, 540 knot string, 541 pipelineRkey string, 542 workflow string, 543) (<-chan LogLine, error) { 544 ref, err := p.st.LookupTektonRunByTuple(ctx, knot, pipelineRkey, workflow) 545 if err != nil { 546 return nil, fmt.Errorf("lookup tekton run mapping: %w", err) 547 } 548 if ref == nil { 549 // No mapping at all means this provider never spawned a 550 // PipelineRun for the requested tuple, so a 404 is the 551 // honest answer. 552 return nil, ErrLogsNotFound 553 } 554 555 // At this point the workflow *was* spawned — we have a row in the 556 // store mapping the tuple to a PipelineRun. The TaskRuns the 557 // PipelineRun fans out to are created asynchronously by Tekton 558 // once the run is admitted, so a freshly-spawned or still-queueing 559 // PipelineRun will momentarily report zero TaskRuns. Returning 560 // ErrLogsNotFound here would mistranslate that into a 404 at the 561 // HTTP layer (see provider.go contract: ErrLogsNotFound means the 562 // workflow never ran), making just-spawned runs look nonexistent 563 // to the appview. Instead, hand back an open channel and poll 564 // inside the goroutine until TaskRuns materialize, ctx is 565 // cancelled, or the PipelineRun reaches a terminal state with 566 // nothing to stream. 567 out := make(chan LogLine, 32) 568 go func() { 569 defer close(out) 570 p.streamPipelineRunLogs(ctx, out, *ref) 571 }() 572 return out, nil 573} 574 575// streamPipelineRunLogs drives the Logs channel for a single PipelineRun. 576// It polls for TaskRuns until the PipelineRun is terminal (or ctx is 577// cancelled), streaming each TaskRun's logs exactly once. 578// 579// The loop is deliberately a poll rather than a one-shot pass for two 580// reasons: 581// 582// 1. Live runs need follow semantics. Streaming a still-running TaskRun 583// uses Follow=true on the pod log stream, so streamTaskRunLogs only 584// returns once the container actually terminates. The outer loop 585// then re-checks for new TaskRuns and the PipelineRun's terminal 586// state, instead of closing the channel mid-run. 587// 588// 2. PipelineRuns can spawn additional TaskRuns over time (sequential 589// `runAfter` tasks, finally blocks, retries). A single snapshot 590// would silently drop any TaskRun that appears after the snapshot, 591// even though the workflow is still running. 592// 593// The loop terminates only when the PipelineRun is terminal AND the 594// most recent listing produced no new TaskRuns, which together mean no 595// further TaskRuns will ever appear. 596func (p *tektonProvider) streamPipelineRunLogs( 597 ctx context.Context, 598 out chan<- LogLine, 599 ref TektonRunRef, 600) { 601 const pollInterval = 1 * time.Second 602 seen := map[string]bool{} 603 stepID := 0 604 for { 605 if err := ctx.Err(); err != nil { 606 return 607 } 608 609 taskRuns, err := p.taskRunsForPipelineRun(ctx, ref) 610 if err != nil { 611 p.log.Debug("Logs: list TaskRuns failed", 612 "err", err, "pipeline_run", ref.PipelineRunName, 613 ) 614 } 615 // Snapshot the terminal state *after* the listing so we never 616 // observe terminal=true while still missing a TaskRun that 617 // existed at list time. The reverse race (terminal observed 618 // before a TaskRun spawn) is handled by the next iteration: we 619 // only exit when terminal is true AND no new TaskRuns showed up 620 // in this pass. 621 terminal := p.isPipelineRunTerminal(ctx, ref) 622 623 var fresh []k8s.Object 624 for _, tr := range taskRuns { 625 name := tr.GetName() 626 if name == "" || seen[name] { 627 continue 628 } 629 seen[name] = true 630 fresh = append(fresh, tr) 631 } 632 p.log.Debug("Logs: poll iteration", 633 "pipeline_run", ref.PipelineRunName, 634 "task_runs_total", len(taskRuns), 635 "task_runs_new", len(fresh), 636 "terminal", terminal, 637 ) 638 639 for _, tr := range fresh { 640 taskName := tr.GetName() 641 if taskName == "" { 642 taskName = fmt.Sprintf("task %d", stepID) 643 } 644 p.log.Debug("Logs: streaming TaskRun", 645 "task_run", taskName, "step_id", stepID, "terminal", terminal, 646 ) 647 if !sendLine(ctx, out, LogLine{ 648 Kind: LogKindControl, 649 Time: time.Now(), 650 Content: taskName, 651 StepId: stepID, 652 StepStatus: StepStatusStart, 653 }) { 654 return 655 } 656 657 // terminal is the snapshot taken at the top of this 658 // iteration. If the pipeline was terminal then, all 659 // TaskRuns we see are guaranteed complete and we can 660 // take the snapshot fast-path; otherwise we follow the 661 // pod logs live so StepStatusEnd is only emitted once 662 // the container actually exits. 663 if terminal { 664 p.fetchCompletedTaskRunLogs(ctx, out, ref, tr, stepID) 665 } else { 666 p.streamTaskRunLogs(ctx, out, ref, tr, stepID) 667 } 668 669 if !sendLine(ctx, out, LogLine{ 670 Kind: LogKindControl, 671 Time: time.Now(), 672 Content: taskName, 673 StepId: stepID, 674 StepStatus: StepStatusEnd, 675 }) { 676 return 677 } 678 p.log.Debug("Logs: finished TaskRun", 679 "task_run", taskName, "step_id", stepID, 680 ) 681 stepID++ 682 } 683 684 // Done condition: the PipelineRun is terminal AND we found no 685 // new TaskRuns this iteration. Both are required because a 686 // terminal PipelineRun can still expose a freshly-listed 687 // TaskRun whose pod we haven't drained yet. 688 if terminal && len(fresh) == 0 { 689 p.log.Debug("Logs: pipeline run terminal, no new TaskRuns", 690 "pipeline_run", ref.PipelineRunName, 691 ) 692 return 693 } 694 695 // When we processed new TaskRuns this round, loop again 696 // immediately to re-list — Follow=true streaming may have 697 // blocked us long enough for additional TaskRuns or terminal 698 // transitions to have happened. 699 if len(fresh) > 0 { 700 continue 701 } 702 703 select { 704 case <-ctx.Done(): 705 return 706 case <-time.After(pollInterval): 707 } 708 } 709} 710 711// isPipelineRunTerminal returns true if the PipelineRun is in a terminal state right now. 712func (p *tektonProvider) isPipelineRunTerminal(ctx context.Context, ref TektonRunRef) bool { 713 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace, 714 ref.PipelineRunName, 715 ) 716 if err != nil { 717 p.log.Debug("isPipelineRunTerminal: failed to get PipelineRun", "err", err, "pipeline_run", ref.PipelineRunName) 718 return false 719 } 720 _, terminal, ok := mapTektonPipelineRunStatus(obj) 721 p.log.Debug("isPipelineRunTerminal: status check", "pipeline_run", ref.PipelineRunName, "terminal", terminal, "ok", ok) 722 return ok && terminal 723} 724 725// fetchCompletedTaskRunLogs fetches all logs from a TaskRun that has already completed. 726// It reads each step container's logs in one shot using all-containers traversal, 727// and also inlines the Tekton step status from the TaskRun (exit code, reason) as 728// control messages so the caller gets full context without needing to watch for events. 729func (p *tektonProvider) fetchCompletedTaskRunLogs( 730 ctx context.Context, 731 out chan<- LogLine, 732 ref TektonRunRef, 733 tr k8s.Object, 734 stepID int, 735) { 736 trName := tr.GetName() 737 pods, err := p.podsForTaskRun(ctx, ref.Namespace, trName) 738 if err != nil { 739 p.log.Debug("fetchCompletedTaskRunLogs: list pods failed", "err", err, 740 "task_run", trName, "pipeline_run", ref.PipelineRunName, 741 ) 742 return 743 } 744 p.log.Debug("fetchCompletedTaskRunLogs: found pods", 745 "task_run", trName, "pod_count", len(pods), 746 ) 747 748 // Emit a summary line from the TaskRun status (steps[*].terminated) so we 749 // get exit codes and reasons even if the pod logs are sparse. 750 steps, _ := tr.NestedSlice("status", "steps") 751 for _, rawStep := range steps { 752 step, _ := rawStep.(map[string]any) 753 stepName, _ := step["name"].(string) 754 term, _ := step["terminated"].(map[string]any) 755 if term == nil { 756 continue 757 } 758 exitCode := numberToInt64(term["exitCode"]) 759 reason, _ := term["reason"].(string) 760 msg, _ := term["message"].(string) 761 line := fmt.Sprintf("[step %s] exit=%d reason=%s", stepName, exitCode, reason) 762 if msg != "" { 763 line += " " + msg 764 } 765 p.log.Debug("fetchCompletedTaskRunLogs: step terminated", 766 "task_run", trName, "step", stepName, 767 "exit_code", exitCode, "reason", reason, 768 ) 769 if !sendLine(ctx, out, LogLine{ 770 Kind: LogKindData, 771 Time: time.Now(), 772 Content: line + "\n", 773 StepId: stepID, 774 Stream: "stdout", 775 }) { 776 return 777 } 778 } 779 780 for _, pod := range pods { 781 containers := append([]k8s.Container(nil), pod.InitContainers...) 782 containers = append(containers, pod.Containers...) 783 p.log.Debug("fetchCompletedTaskRunLogs: reading pod containers", 784 "pod", pod.Name, "container_count", len(containers), 785 ) 786 for _, c := range containers { 787 p.log.Debug("fetchCompletedTaskRunLogs: reading container logs", 788 "pod", pod.Name, "container", c.Name, 789 ) 790 // Snapshot read (Follow=false) is correct here: the TaskRun 791 // has already terminated, so the API server has the full log 792 // available and there is nothing more to wait for. 793 rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name, 794 k8s.LogOptions{Follow: false}, 795 ) 796 if err != nil { 797 p.log.Debug("fetchCompletedTaskRunLogs: stream failed", "err", err, 798 "pod", pod.Name, "container", c.Name, 799 ) 800 continue 801 } 802 p.sendReaderLines(ctx, out, rc, stepID) 803 _ = rc.Close() 804 p.log.Debug("fetchCompletedTaskRunLogs: done reading container", 805 "pod", pod.Name, "container", c.Name, 806 ) 807 } 808 } 809} 810 811func (p *tektonProvider) taskRunsForPipelineRun(ctx context.Context, ref TektonRunRef) ([]k8s.Object, error) { 812 list, err := p.client.ListObjects(ctx, taskRunsGVR, ref.Namespace, k8s.ListOptions{ 813 LabelSelector: "tekton.dev/pipelineRun=" + ref.PipelineRunName, 814 }) 815 if err != nil { 816 return nil, fmt.Errorf("list Tekton TaskRuns: %w", err) 817 } 818 items := append([]k8s.Object(nil), list...) 819 sort.Slice(items, func(i, j int) bool { 820 ti := items[i].GetCreationTimestamp() 821 tj := items[j].GetCreationTimestamp() 822 return ti.Before(tj) 823 }) 824 return items, nil 825} 826 827func (p *tektonProvider) streamTaskRunLogs( 828 ctx context.Context, 829 out chan<- LogLine, 830 ref TektonRunRef, 831 tr k8s.Object, 832 stepID int, 833) { 834 pods, err := p.podsForTaskRun(ctx, ref.Namespace, tr.GetName()) 835 if err != nil { 836 p.log.Debug("streamTaskRunLogs: list pods for TaskRun failed", "err", err, 837 "task_run", tr.GetName(), "pipeline_run", ref.PipelineRunName, 838 ) 839 return 840 } 841 p.log.Debug("streamTaskRunLogs: found pods", 842 "task_run", tr.GetName(), "pod_count", len(pods), 843 ) 844 for _, pod := range pods { 845 containers := append([]k8s.Container(nil), pod.InitContainers...) 846 containers = append(containers, pod.Containers...) 847 p.log.Debug("streamTaskRunLogs: streaming pod containers", 848 "pod", pod.Name, "container_count", len(containers), 849 ) 850 for _, c := range containers { 851 p.log.Debug("streamTaskRunLogs: streaming container", 852 "pod", pod.Name, "container", c.Name, "step_id", stepID, 853 ) 854 // Live tail with Follow=true: the apiserver holds the 855 // connection open until the container terminates (or ctx is 856 // cancelled), so sendReaderLines only returns once the 857 // container is actually done. Without this, the read EOFs at 858 // the current tail position and the caller emits a spurious 859 // StepStatusEnd while the step is still running. 860 rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name, 861 k8s.LogOptions{Follow: true}, 862 ) 863 if err != nil { 864 p.log.Debug("streamTaskRunLogs: stream pod logs failed", "err", err, 865 "pod", pod.Name, "container", c.Name, 866 ) 867 continue 868 } 869 p.sendReaderLines(ctx, out, rc, stepID) 870 _ = rc.Close() 871 p.log.Debug("streamTaskRunLogs: finished container", 872 "pod", pod.Name, "container", c.Name, 873 ) 874 } 875 } 876} 877 878func (p *tektonProvider) podsForTaskRun(ctx context.Context, namespace, taskRun string) ([]k8s.Pod, error) { 879 list, err := p.client.ListPods(ctx, namespace, 880 "tekton.dev/taskRun="+taskRun, 881 ) 882 if err != nil { 883 return nil, fmt.Errorf("list pods: %w", err) 884 } 885 pods := append([]k8s.Pod(nil), list...) 886 sort.Slice(pods, func(i, j int) bool { 887 return pods[i].CreationTimestamp.Before(pods[j].CreationTimestamp) 888 }) 889 return pods, nil 890} 891 892func (p *tektonProvider) sendReaderLines( 893 ctx context.Context, 894 out chan<- LogLine, 895 rc io.Reader, 896 stepID int, 897) { 898 scanner := bufio.NewScanner(rc) 899 for scanner.Scan() { 900 if !sendLine(ctx, out, LogLine{ 901 Kind: LogKindData, 902 Time: time.Now(), 903 Content: scanner.Text() + "\n", 904 StepId: stepID, 905 Stream: "stdout", 906 }) { 907 return 908 } 909 } 910 if err := scanner.Err(); err != nil { 911 p.log.Debug("scan pod log", "err", err) 912 } 913} 914 915func numberToInt64(value any) int64 { 916 switch v := value.(type) { 917 case int64: 918 return v 919 case int: 920 return int64(v) 921 case float64: 922 return int64(v) 923 case json.Number: 924 i, _ := v.Int64() 925 return i 926 default: 927 return 0 928 } 929} 930 931func (p *tektonProvider) publishStatus( 932 ctx context.Context, 933 pipelineURI, workflow, status, runName string, 934 errMsg *string, 935 exitCode *int64, 936) error { 937 rec := tangled.PipelineStatus{ 938 LexiconTypeID: tangled.PipelineStatusNSID, 939 Pipeline: pipelineURI, 940 Workflow: workflow, 941 Status: status, 942 CreatedAt: time.Now().UTC().Format(time.RFC3339), 943 Error: errMsg, 944 ExitCode: exitCode, 945 } 946 body, err := json.Marshal(rec) 947 if err != nil { 948 return fmt.Errorf("marshal pipeline.status: %w", err) 949 } 950 rkey := fmt.Sprintf("tk-%s-%s-%d", runName, status, time.Now().UnixNano()) 951 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil { 952 return fmt.Errorf("publish pipeline.status: %w", err) 953 } 954 return nil 955} 956 957func tektonPipelineRunName(knot, pipelineRkey, workflow, commit, branch string) string { 958 h := sha256.Sum256([]byte(strings.Join( 959 []string{knot, pipelineRkey, workflow, commit, branch}, "\x00", 960 ))) 961 suffix := hex.EncodeToString(h[:])[:12] 962 base := dnsLabel("tack-" + workflow) 963 maxBase := 63 - len(suffix) - 1 964 if len(base) > maxBase { 965 base = strings.TrimRight(base[:maxBase], "-") 966 } 967 if base == "" { 968 base = "tack" 969 } 970 return base + "-" + suffix 971} 972 973func dnsLabel(s string) string { 974 var b strings.Builder 975 lastDash := false 976 for _, r := range strings.ToLower(s) { 977 ok := unicode.IsLetter(r) || unicode.IsDigit(r) 978 if ok { 979 b.WriteRune(r) 980 lastDash = false 981 continue 982 } 983 if !lastDash { 984 b.WriteByte('-') 985 lastDash = true 986 } 987 } 988 return strings.Trim(b.String(), "-") 989} 990 991func labelValue(s string) string { 992 v := dnsLabel(s) 993 if len(v) > 63 { 994 v = strings.TrimRight(v[:63], "-") 995 } 996 if v == "" { 997 return "unknown" 998 } 999 return v 1000}