Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

package main

// tektonProvider implements Provider by creating Tekton PipelineRuns
// directly inside the Kubernetes cluster. Tack already receives and
// authorizes Tangled pipeline triggers, so adding Tekton Triggers would
// duplicate the event-to-run translation layer instead of simplifying it.

import (
	"bufio"
	"context"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"sort"
	"strings"
	"time"
	"unicode"

	"tangled.org/core/api/tangled"

	"go.mitchellh.com/tack/internal/k8s"
	"go.yaml.in/yaml/v2"
)

const (
	// apiVersion and kind stamped on every object built by
	// buildTektonPipelineRun.
	tektonAPIVersion = "tekton.dev/v1"
	tektonRunKind    = "PipelineRun"

	// Label keys written to metadata.labels of created PipelineRuns.
	// Label values are passed through labelValue first (except the
	// constant "tack" managed-by value).
	tektonLabelManagedBy    = "tack.mitchellh.com/managed-by"
	tektonLabelPipelineRkey = "tack.mitchellh.com/pipeline-rkey"
	tektonLabelWorkflow     = "tack.mitchellh.com/workflow"

	// Annotation keys written to metadata.annotations of created
	// PipelineRuns. Unlike the labels above, annotation values are stored
	// unmodified. Two keys intentionally reuse the label key strings;
	// labels and annotations are separate maps on the object.
	tektonAnnotationKnot         = "tack.mitchellh.com/knot"
	tektonAnnotationPipelineRkey = "tack.mitchellh.com/pipeline-rkey"
	tektonAnnotationWorkflow     = "tack.mitchellh.com/workflow"
	tektonAnnotationActor        = "tack.mitchellh.com/actor"
	tektonAnnotationCommit       = "tack.mitchellh.com/commit"
	tektonAnnotationBranch       = "tack.mitchellh.com/branch"
)

var (
	// pipelineRunsGVR addresses Tekton PipelineRuns; it is used for the
	// create, get and watch calls in this file.
	pipelineRunsGVR = k8s.GVR{
		Group: "tekton.dev", Version: "v1", Resource: "pipelineruns",
	}
	// taskRunsGVR addresses Tekton TaskRuns; it is used when listing the
	// TaskRuns that belong to a PipelineRun.
	taskRunsGVR = k8s.GVR{
		Group: "tekton.dev", Version: "v1", Resource: "taskruns",
	}
)

// tektonWorkflowConfig is the Tekton-specific subset of workflow YAML.
// `pipeline` names an existing in-cluster Tekton Pipeline. Params are
// deliberately string-only in v1: tack is meant to select an existing
// runner and pass a small amount of routing data, not mirror Tekton's
// entire PipelineRun API.
59type tektonWorkflowConfig struct { 60 Pipeline string `yaml:"pipeline"` 61 ServiceAccount string `yaml:"service_account"` 62 Params map[string]string `yaml:"params"` 63 Workspaces []tektonWorkspaceConfig `yaml:"workspaces"` 64} 65 66type tektonWorkspaceConfig struct { 67 Name string `yaml:"name"` 68 AccessModes []string `yaml:"access_modes"` 69 Storage *string `yaml:"storage"` 70 PVC *string `yaml:"pvc"` 71 Secret *string `yaml:"secret"` 72} 73 74type tektonWorkflowDoc struct { 75 Tack struct { 76 Tekton tektonWorkflowConfig `yaml:"tekton"` 77 } `yaml:"tack"` 78} 79 80// parseTektonWorkflowConfig decodes `tack.tekton` from a workflow body. 81func parseTektonWorkflowConfig(raw string) (*tektonWorkflowConfig, error) { 82 if strings.TrimSpace(raw) == "" { 83 return nil, errors.New("workflow body is empty") 84 } 85 var doc tektonWorkflowDoc 86 if err := yaml.Unmarshal([]byte(raw), &doc); err != nil { 87 return nil, fmt.Errorf("parse workflow yaml: %w", err) 88 } 89 cfg := doc.Tack.Tekton 90 if cfg.Pipeline == "" { 91 return nil, errors.New("workflow yaml: `tack.tekton.pipeline` is required") 92 } 93 return &cfg, nil 94} 95 96type tektonProvider struct { 97 br *broker 98 st *store 99 log *slog.Logger 100 client k8s.Client 101 namespace string 102} 103 104var _ Provider = (*tektonProvider)(nil) 105 106func newTektonProvider( 107 br *broker, 108 st *store, 109 client k8s.Client, 110 namespace string, 111 log *slog.Logger, 112) *tektonProvider { 113 return &tektonProvider{ 114 br: br, 115 st: st, 116 log: log.With("component", "provider", "kind", "tekton"), 117 client: client, 118 namespace: namespace, 119 } 120} 121 122func newInClusterTektonProvider( 123 br *broker, 124 st *store, 125 namespace string, 126 log *slog.Logger, 127) (*tektonProvider, error) { 128 client, err := k8s.NewInClusterClient() 129 if err != nil { 130 return nil, fmt.Errorf("configure in-cluster kubernetes client: %w", err) 131 } 132 return newTektonProvider(br, st, client, namespace, log), nil 
133} 134 135func (p *tektonProvider) Spawn( 136 ctx context.Context, 137 knot string, 138 pipelineRkey string, 139 actor string, 140 trigger *tangled.Pipeline_TriggerMetadata, 141 workflows []*tangled.Pipeline_Workflow, 142) { 143 if len(workflows) == 0 { 144 p.log.Warn("pipeline has no workflows; nothing to spawn", 145 "knot", knot, "rkey", pipelineRkey, 146 ) 147 return 148 } 149 for _, wf := range workflows { 150 if wf == nil || wf.Name == "" { 151 continue 152 } 153 wf := wf 154 go p.spawnWorkflow(ctx, knot, pipelineRkey, actor, trigger, wf) 155 } 156} 157 158func (p *tektonProvider) spawnWorkflow( 159 ctx context.Context, 160 knot string, 161 pipelineRkey string, 162 actor string, 163 trigger *tangled.Pipeline_TriggerMetadata, 164 wf *tangled.Pipeline_Workflow, 165) { 166 logger := p.log.With( 167 "knot", knot, 168 "pipeline_rkey", pipelineRkey, 169 "workflow", wf.Name, 170 "actor", actor, 171 ) 172 173 cfg, err := parseTektonWorkflowConfig(wf.Raw) 174 if err != nil { 175 logger.Error("invalid workflow config; refusing to spawn", "err", err) 176 return 177 } 178 commit, branch := triggerCommitAndBranch(trigger) 179 name := tektonPipelineRunName(knot, pipelineRkey, wf.Name, commit, branch) 180 pr := buildTektonPipelineRun( 181 p.namespace, name, cfg, knot, pipelineRkey, actor, commit, branch, wf, 182 ) 183 184 created, err := p.client.CreateObject(ctx, pipelineRunsGVR, p.namespace, pr) 185 if errors.Is(err, k8s.ErrAlreadyExists) { 186 created, err = p.client.GetObject(ctx, pipelineRunsGVR, p.namespace, name) 187 } 188 if err != nil { 189 logger.Error("create tekton PipelineRun", "err", err, 190 "namespace", p.namespace, "pipeline_run", name, 191 "pipeline", cfg.Pipeline, 192 ) 193 return 194 } 195 196 ref := TektonRunRef{ 197 Knot: knot, 198 PipelineRkey: pipelineRkey, 199 Workflow: wf.Name, 200 Namespace: p.namespace, 201 PipelineRunName: name, 202 PipelineRunUID: created.GetUID(), 203 PipelineName: cfg.Pipeline, 204 PipelineURI: pipelineATURI(knot, 
pipelineRkey), 205 } 206 if err := p.st.InsertTektonRun(ctx, ref); err != nil { 207 logger.Error("persist tekton run mapping", "err", err, 208 "pipeline_run", name, 209 ) 210 return 211 } 212 213 if err := p.publishStatus(ctx, ref.PipelineURI, wf.Name, 214 "pending", name, nil, nil); err != nil { 215 logger.Error("publish initial pending status", "err", err) 216 } 217 218 logger.Info("tekton PipelineRun created", 219 "namespace", p.namespace, 220 "pipeline", cfg.Pipeline, 221 "pipeline_run", name, 222 "uid", ref.PipelineRunUID, 223 ) 224 go p.watchPipelineRun(ctx, ref) 225} 226 227func buildTektonPipelineRun( 228 namespace, name string, 229 cfg *tektonWorkflowConfig, 230 knot, pipelineRkey, actor, commit, branch string, 231 wf *tangled.Pipeline_Workflow, 232) k8s.Object { 233 obj := k8s.Object{ 234 "apiVersion": tektonAPIVersion, 235 "kind": tektonRunKind, 236 "metadata": map[string]any{ 237 "name": name, 238 "namespace": namespace, 239 "labels": map[string]any{ 240 tektonLabelManagedBy: "tack", 241 tektonLabelPipelineRkey: labelValue(pipelineRkey), 242 tektonLabelWorkflow: labelValue(wf.Name), 243 }, 244 "annotations": map[string]any{ 245 tektonAnnotationKnot: knot, 246 tektonAnnotationPipelineRkey: pipelineRkey, 247 tektonAnnotationWorkflow: wf.Name, 248 tektonAnnotationActor: actor, 249 tektonAnnotationCommit: commit, 250 tektonAnnotationBranch: branch, 251 }, 252 }, 253 "spec": map[string]any{ 254 "pipelineRef": map[string]any{ 255 "name": cfg.Pipeline, 256 }, 257 "params": []any{ 258 map[string]any{ 259 "name": "commit", 260 "value": commit, 261 }, 262 map[string]any{ 263 "name": "branch", 264 "value": branch, 265 }, 266 map[string]any{ 267 "name": "actor", 268 "value": actor, 269 }, 270 }, 271 }, 272 } 273 274 spec := obj["spec"].(map[string]any) 275 276 if len(cfg.Workspaces) != 0 { 277 spec["podTemplate"] = map[string]any{ 278 "securityContext": map[string]any{ 279 "fsGroup": 65532, 280 }, 281 } 282 283 workspaces := []any{} 284 285 for _, ws := range 
cfg.Workspaces { 286 switch { 287 case ws.Storage != nil: 288 workspaces = append(workspaces, map[string]any{ 289 "name": ws.Name, 290 "volumeClaimTemplate": map[string]any{ 291 "spec": map[string]any{ 292 "accessModes": ws.AccessModes, 293 "resources": map[string]any{ 294 "requests": map[string]any{ 295 "storage": *ws.Storage, 296 }, 297 }, 298 }, 299 }, 300 }) 301 302 case ws.PVC != nil: 303 workspaces = append(workspaces, map[string]any{ 304 "name": ws.Name, 305 "persistentVolumeClaim": map[string]any{ 306 "claimName": *ws.PVC, 307 }, 308 }) 309 310 case ws.Secret != nil: 311 workspaces = append(workspaces, map[string]any{ 312 "name": ws.Name, 313 "secret": map[string]any{ 314 "secretName": *ws.Secret, 315 }, 316 }) 317 } 318 } 319 320 spec["workspaces"] = workspaces 321 } 322 323 if cfg.ServiceAccount != "" { 324 spec["serviceAccountName"] = cfg.ServiceAccount 325 } 326 if len(cfg.Params) > 0 { 327 keys := make([]string, 0, len(cfg.Params)) 328 for key := range cfg.Params { 329 keys = append(keys, key) 330 } 331 sort.Strings(keys) 332 params := make([]any, 0, len(keys)) 333 for _, key := range keys { 334 params = append(params, map[string]any{ 335 "name": key, 336 "value": cfg.Params[key], 337 }) 338 } 339 spec["params"] = params 340 } 341 return obj 342} 343 344func (p *tektonProvider) watchPipelineRun(ctx context.Context, ref TektonRunRef) { 345 logger := p.log.With( 346 "knot", ref.Knot, 347 "pipeline_rkey", ref.PipelineRkey, 348 "workflow", ref.Workflow, 349 "namespace", ref.Namespace, 350 "pipeline_run", ref.PipelineRunName, 351 ) 352 353 logger.Debug("watchPipelineRun: starting") 354 355 last := "" 356 if obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace, 357 ref.PipelineRunName); err == nil { 358 status, terminal, ok := mapTektonPipelineRunStatus(obj) 359 logger.Debug("watchPipelineRun: initial status read", 360 "status", status, "terminal", terminal, "ok", ok, 361 ) 362 if ok { 363 last = status 364 if err := p.publishStatus(ctx, 
ref.PipelineURI, ref.Workflow, 365 status, ref.PipelineRunName, nil, nil); err != nil { 366 logger.Error("publish tekton status", "err", err, "status", status) 367 } 368 if terminal { 369 logger.Debug("watchPipelineRun: already terminal on initial read; exiting", "status", status) 370 return 371 } 372 } 373 } else if errors.Is(err, k8s.ErrNotFound) { 374 logger.Warn("PipelineRun disappeared while watching") 375 return 376 } else { 377 logger.Debug("initial PipelineRun status read", "err", err) 378 } 379 380 w, err := p.client.WatchObjects(ctx, pipelineRunsGVR, ref.Namespace, 381 k8s.ListOptions{FieldSelector: "metadata.name=" + ref.PipelineRunName}, 382 ) 383 if err != nil { 384 logger.Debug("watchPipelineRun: watch failed; falling back to polling", "err", err) 385 p.pollPipelineRun(ctx, ref, logger, last) 386 return 387 } 388 defer w.Stop() 389 390 logger.Debug("watchPipelineRun: watch established; entering event loop") 391 for { 392 select { 393 case <-ctx.Done(): 394 logger.Debug("watchPipelineRun: context cancelled") 395 return 396 case ev, ok := <-w.ResultChan(): 397 if !ok { 398 logger.Debug("watchPipelineRun: watch channel closed; falling back to polling") 399 p.pollPipelineRun(ctx, ref, logger, last) 400 return 401 } 402 status, terminal, ok := mapTektonPipelineRunStatus(ev.Object) 403 logger.Debug("watchPipelineRun: watch event", 404 "event_type", ev.Type, 405 "status", status, "terminal", terminal, "ok", ok, "last", last, 406 ) 407 if !ok || status == last { 408 if terminal { 409 logger.Debug("watchPipelineRun: terminal status unchanged; exiting", "status", status) 410 return 411 } 412 continue 413 } 414 last = status 415 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 416 status, ref.PipelineRunName, nil, nil); err != nil { 417 logger.Error("publish tekton status", "err", err, "status", status) 418 continue 419 } 420 logger.Debug("watchPipelineRun: published status", "status", status, "terminal", terminal) 421 if terminal { 422 
logger.Debug("watchPipelineRun: terminal status reached; exiting", "status", status) 423 return 424 } 425 } 426 } 427} 428 429func (p *tektonProvider) pollPipelineRun( 430 ctx context.Context, 431 ref TektonRunRef, 432 logger *slog.Logger, 433 last string, 434) { 435 logger.Debug("pollPipelineRun: starting poll loop", "interval", "5s") 436 ticker := time.NewTicker(5 * time.Second) 437 defer ticker.Stop() 438 for { 439 select { 440 case <-ctx.Done(): 441 logger.Debug("pollPipelineRun: context cancelled") 442 return 443 case <-ticker.C: 444 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace, 445 ref.PipelineRunName, 446 ) 447 if errors.Is(err, k8s.ErrNotFound) { 448 logger.Warn("PipelineRun disappeared while watching") 449 return 450 } 451 if err != nil { 452 logger.Debug("get PipelineRun status", "err", err) 453 continue 454 } 455 status, terminal, ok := mapTektonPipelineRunStatus(obj) 456 logger.Debug("pollPipelineRun: poll tick", 457 "status", status, "terminal", terminal, "ok", ok, "last", last, 458 ) 459 if !ok || status == last { 460 if terminal { 461 logger.Debug("pollPipelineRun: terminal status unchanged; exiting", "status", status) 462 return 463 } 464 continue 465 } 466 last = status 467 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 468 status, ref.PipelineRunName, nil, nil); err != nil { 469 logger.Error("publish tekton status", "err", err, "status", status) 470 continue 471 } 472 logger.Debug("pollPipelineRun: published status", "status", status, "terminal", terminal) 473 if terminal { 474 logger.Debug("pollPipelineRun: terminal status reached; exiting", "status", status) 475 return 476 } 477 } 478 } 479} 480 481// mapTektonPipelineRunStatus translates Tekton's Succeeded condition 482// into the Tangled status strings consumed by the appview. 
483func mapTektonPipelineRunStatus(obj k8s.Object) (status string, terminal bool, ok bool) { 484 conditions, ok := obj.NestedSlice("status", "conditions") 485 if !ok || len(conditions) == 0 { 486 slog.Debug("mapTektonPipelineRunStatus: no conditions found", 487 "pipeline_run", obj.GetName(), 488 ) 489 return "", false, false 490 } 491 for _, raw := range conditions { 492 cond, _ := raw.(map[string]interface{}) 493 condType, _ := cond["type"].(string) 494 condStatus, _ := cond["status"].(string) 495 reason, _ := cond["reason"].(string) 496 message, _ := cond["message"].(string) 497 slog.Debug("mapTektonPipelineRunStatus: condition", 498 "pipeline_run", obj.GetName(), 499 "type", condType, 500 "status", condStatus, 501 "reason", reason, 502 "message", message, 503 ) 504 if condType != "Succeeded" { 505 continue 506 } 507 switch condStatus { 508 case "True": 509 return "success", true, true 510 case "False": 511 if tektonReasonCancelled(reason) { 512 return "cancelled", true, true 513 } 514 return "failed", true, true 515 case "Unknown": 516 return "running", false, true 517 default: 518 return "", false, false 519 } 520 } 521 return "", false, false 522} 523 524func tektonReasonCancelled(reason string) bool { 525 r := strings.ToLower(reason) 526 return strings.Contains(r, "cancel") || strings.Contains(r, "stop") 527} 528 529func (p *tektonProvider) Logs( 530 ctx context.Context, 531 knot string, 532 pipelineRkey string, 533 workflow string, 534) (<-chan LogLine, error) { 535 ref, err := p.st.LookupTektonRunByTuple(ctx, knot, pipelineRkey, workflow) 536 if err != nil { 537 return nil, fmt.Errorf("lookup tekton run mapping: %w", err) 538 } 539 if ref == nil { 540 // No mapping at all means this provider never spawned a 541 // PipelineRun for the requested tuple, so a 404 is the 542 // honest answer. 543 return nil, ErrLogsNotFound 544 } 545 546 // At this point the workflow *was* spawned — we have a row in the 547 // store mapping the tuple to a PipelineRun. 
The TaskRuns the 548 // PipelineRun fans out to are created asynchronously by Tekton 549 // once the run is admitted, so a freshly-spawned or still-queueing 550 // PipelineRun will momentarily report zero TaskRuns. Returning 551 // ErrLogsNotFound here would mistranslate that into a 404 at the 552 // HTTP layer (see provider.go contract: ErrLogsNotFound means the 553 // workflow never ran), making just-spawned runs look nonexistent 554 // to the appview. Instead, hand back an open channel and poll 555 // inside the goroutine until TaskRuns materialize, ctx is 556 // cancelled, or the PipelineRun reaches a terminal state with 557 // nothing to stream. 558 out := make(chan LogLine, 32) 559 go func() { 560 defer close(out) 561 p.streamPipelineRunLogs(ctx, out, *ref) 562 }() 563 return out, nil 564} 565 566// streamPipelineRunLogs drives the Logs channel for a single PipelineRun. 567// It polls for TaskRuns until the PipelineRun is terminal (or ctx is 568// cancelled), streaming each TaskRun's logs exactly once. 569// 570// The loop is deliberately a poll rather than a one-shot pass for two 571// reasons: 572// 573// 1. Live runs need follow semantics. Streaming a still-running TaskRun 574// uses Follow=true on the pod log stream, so streamTaskRunLogs only 575// returns once the container actually terminates. The outer loop 576// then re-checks for new TaskRuns and the PipelineRun's terminal 577// state, instead of closing the channel mid-run. 578// 579// 2. PipelineRuns can spawn additional TaskRuns over time (sequential 580// `runAfter` tasks, finally blocks, retries). A single snapshot 581// would silently drop any TaskRun that appears after the snapshot, 582// even though the workflow is still running. 583// 584// The loop terminates only when the PipelineRun is terminal AND the 585// most recent listing produced no new TaskRuns, which together mean no 586// further TaskRuns will ever appear. 
587func (p *tektonProvider) streamPipelineRunLogs( 588 ctx context.Context, 589 out chan<- LogLine, 590 ref TektonRunRef, 591) { 592 const pollInterval = 1 * time.Second 593 seen := map[string]bool{} 594 stepID := 0 595 for { 596 if err := ctx.Err(); err != nil { 597 return 598 } 599 600 taskRuns, err := p.taskRunsForPipelineRun(ctx, ref) 601 if err != nil { 602 p.log.Debug("Logs: list TaskRuns failed", 603 "err", err, "pipeline_run", ref.PipelineRunName, 604 ) 605 } 606 // Snapshot the terminal state *after* the listing so we never 607 // observe terminal=true while still missing a TaskRun that 608 // existed at list time. The reverse race (terminal observed 609 // before a TaskRun spawn) is handled by the next iteration: we 610 // only exit when terminal is true AND no new TaskRuns showed up 611 // in this pass. 612 terminal := p.isPipelineRunTerminal(ctx, ref) 613 614 var fresh []k8s.Object 615 for _, tr := range taskRuns { 616 name := tr.GetName() 617 if name == "" || seen[name] { 618 continue 619 } 620 seen[name] = true 621 fresh = append(fresh, tr) 622 } 623 p.log.Debug("Logs: poll iteration", 624 "pipeline_run", ref.PipelineRunName, 625 "task_runs_total", len(taskRuns), 626 "task_runs_new", len(fresh), 627 "terminal", terminal, 628 ) 629 630 for _, tr := range fresh { 631 taskName := tr.GetName() 632 if taskName == "" { 633 taskName = fmt.Sprintf("task %d", stepID) 634 } 635 p.log.Debug("Logs: streaming TaskRun", 636 "task_run", taskName, "step_id", stepID, "terminal", terminal, 637 ) 638 if !sendLine(ctx, out, LogLine{ 639 Kind: LogKindControl, 640 Time: time.Now(), 641 Content: taskName, 642 StepId: stepID, 643 StepStatus: StepStatusStart, 644 }) { 645 return 646 } 647 648 // terminal is the snapshot taken at the top of this 649 // iteration. 
If the pipeline was terminal then, all 650 // TaskRuns we see are guaranteed complete and we can 651 // take the snapshot fast-path; otherwise we follow the 652 // pod logs live so StepStatusEnd is only emitted once 653 // the container actually exits. 654 if terminal { 655 p.fetchCompletedTaskRunLogs(ctx, out, ref, tr, stepID) 656 } else { 657 p.streamTaskRunLogs(ctx, out, ref, tr, stepID) 658 } 659 660 if !sendLine(ctx, out, LogLine{ 661 Kind: LogKindControl, 662 Time: time.Now(), 663 Content: taskName, 664 StepId: stepID, 665 StepStatus: StepStatusEnd, 666 }) { 667 return 668 } 669 p.log.Debug("Logs: finished TaskRun", 670 "task_run", taskName, "step_id", stepID, 671 ) 672 stepID++ 673 } 674 675 // Done condition: the PipelineRun is terminal AND we found no 676 // new TaskRuns this iteration. Both are required because a 677 // terminal PipelineRun can still expose a freshly-listed 678 // TaskRun whose pod we haven't drained yet. 679 if terminal && len(fresh) == 0 { 680 p.log.Debug("Logs: pipeline run terminal, no new TaskRuns", 681 "pipeline_run", ref.PipelineRunName, 682 ) 683 return 684 } 685 686 // When we processed new TaskRuns this round, loop again 687 // immediately to re-list — Follow=true streaming may have 688 // blocked us long enough for additional TaskRuns or terminal 689 // transitions to have happened. 690 if len(fresh) > 0 { 691 continue 692 } 693 694 select { 695 case <-ctx.Done(): 696 return 697 case <-time.After(pollInterval): 698 } 699 } 700} 701 702// isPipelineRunTerminal returns true if the PipelineRun is in a terminal state right now. 
703func (p *tektonProvider) isPipelineRunTerminal(ctx context.Context, ref TektonRunRef) bool { 704 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace, 705 ref.PipelineRunName, 706 ) 707 if err != nil { 708 p.log.Debug("isPipelineRunTerminal: failed to get PipelineRun", "err", err, "pipeline_run", ref.PipelineRunName) 709 return false 710 } 711 _, terminal, ok := mapTektonPipelineRunStatus(obj) 712 p.log.Debug("isPipelineRunTerminal: status check", "pipeline_run", ref.PipelineRunName, "terminal", terminal, "ok", ok) 713 return ok && terminal 714} 715 716// fetchCompletedTaskRunLogs fetches all logs from a TaskRun that has already completed. 717// It reads each step container's logs in one shot using all-containers traversal, 718// and also inlines the Tekton step status from the TaskRun (exit code, reason) as 719// control messages so the caller gets full context without needing to watch for events. 720func (p *tektonProvider) fetchCompletedTaskRunLogs( 721 ctx context.Context, 722 out chan<- LogLine, 723 ref TektonRunRef, 724 tr k8s.Object, 725 stepID int, 726) { 727 trName := tr.GetName() 728 pods, err := p.podsForTaskRun(ctx, ref.Namespace, trName) 729 if err != nil { 730 p.log.Debug("fetchCompletedTaskRunLogs: list pods failed", "err", err, 731 "task_run", trName, "pipeline_run", ref.PipelineRunName, 732 ) 733 return 734 } 735 p.log.Debug("fetchCompletedTaskRunLogs: found pods", 736 "task_run", trName, "pod_count", len(pods), 737 ) 738 739 // Emit a summary line from the TaskRun status (steps[*].terminated) so we 740 // get exit codes and reasons even if the pod logs are sparse. 
741 steps, _ := tr.NestedSlice("status", "steps") 742 for _, rawStep := range steps { 743 step, _ := rawStep.(map[string]any) 744 stepName, _ := step["name"].(string) 745 term, _ := step["terminated"].(map[string]any) 746 if term == nil { 747 continue 748 } 749 exitCode := numberToInt64(term["exitCode"]) 750 reason, _ := term["reason"].(string) 751 msg, _ := term["message"].(string) 752 line := fmt.Sprintf("[step %s] exit=%d reason=%s", stepName, exitCode, reason) 753 if msg != "" { 754 line += " " + msg 755 } 756 p.log.Debug("fetchCompletedTaskRunLogs: step terminated", 757 "task_run", trName, "step", stepName, 758 "exit_code", exitCode, "reason", reason, 759 ) 760 if !sendLine(ctx, out, LogLine{ 761 Kind: LogKindData, 762 Time: time.Now(), 763 Content: line + "\n", 764 StepId: stepID, 765 Stream: "stdout", 766 }) { 767 return 768 } 769 } 770 771 for _, pod := range pods { 772 containers := append([]k8s.Container(nil), pod.InitContainers...) 773 containers = append(containers, pod.Containers...) 774 p.log.Debug("fetchCompletedTaskRunLogs: reading pod containers", 775 "pod", pod.Name, "container_count", len(containers), 776 ) 777 for _, c := range containers { 778 p.log.Debug("fetchCompletedTaskRunLogs: reading container logs", 779 "pod", pod.Name, "container", c.Name, 780 ) 781 // Snapshot read (Follow=false) is correct here: the TaskRun 782 // has already terminated, so the API server has the full log 783 // available and there is nothing more to wait for. 
784 rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name, 785 k8s.LogOptions{Follow: false}, 786 ) 787 if err != nil { 788 p.log.Debug("fetchCompletedTaskRunLogs: stream failed", "err", err, 789 "pod", pod.Name, "container", c.Name, 790 ) 791 continue 792 } 793 p.sendReaderLines(ctx, out, rc, stepID) 794 _ = rc.Close() 795 p.log.Debug("fetchCompletedTaskRunLogs: done reading container", 796 "pod", pod.Name, "container", c.Name, 797 ) 798 } 799 } 800} 801 802func (p *tektonProvider) taskRunsForPipelineRun(ctx context.Context, ref TektonRunRef) ([]k8s.Object, error) { 803 list, err := p.client.ListObjects(ctx, taskRunsGVR, ref.Namespace, k8s.ListOptions{ 804 LabelSelector: "tekton.dev/pipelineRun=" + ref.PipelineRunName, 805 }) 806 if err != nil { 807 return nil, fmt.Errorf("list Tekton TaskRuns: %w", err) 808 } 809 items := append([]k8s.Object(nil), list...) 810 sort.Slice(items, func(i, j int) bool { 811 ti := items[i].GetCreationTimestamp() 812 tj := items[j].GetCreationTimestamp() 813 return ti.Before(tj) 814 }) 815 return items, nil 816} 817 818func (p *tektonProvider) streamTaskRunLogs( 819 ctx context.Context, 820 out chan<- LogLine, 821 ref TektonRunRef, 822 tr k8s.Object, 823 stepID int, 824) { 825 pods, err := p.podsForTaskRun(ctx, ref.Namespace, tr.GetName()) 826 if err != nil { 827 p.log.Debug("streamTaskRunLogs: list pods for TaskRun failed", "err", err, 828 "task_run", tr.GetName(), "pipeline_run", ref.PipelineRunName, 829 ) 830 return 831 } 832 p.log.Debug("streamTaskRunLogs: found pods", 833 "task_run", tr.GetName(), "pod_count", len(pods), 834 ) 835 for _, pod := range pods { 836 containers := append([]k8s.Container(nil), pod.InitContainers...) 837 containers = append(containers, pod.Containers...) 
838 p.log.Debug("streamTaskRunLogs: streaming pod containers", 839 "pod", pod.Name, "container_count", len(containers), 840 ) 841 for _, c := range containers { 842 p.log.Debug("streamTaskRunLogs: streaming container", 843 "pod", pod.Name, "container", c.Name, "step_id", stepID, 844 ) 845 // Live tail with Follow=true: the apiserver holds the 846 // connection open until the container terminates (or ctx is 847 // cancelled), so sendReaderLines only returns once the 848 // container is actually done. Without this, the read EOFs at 849 // the current tail position and the caller emits a spurious 850 // StepStatusEnd while the step is still running. 851 rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name, 852 k8s.LogOptions{Follow: true}, 853 ) 854 if err != nil { 855 p.log.Debug("streamTaskRunLogs: stream pod logs failed", "err", err, 856 "pod", pod.Name, "container", c.Name, 857 ) 858 continue 859 } 860 p.sendReaderLines(ctx, out, rc, stepID) 861 _ = rc.Close() 862 p.log.Debug("streamTaskRunLogs: finished container", 863 "pod", pod.Name, "container", c.Name, 864 ) 865 } 866 } 867} 868 869func (p *tektonProvider) podsForTaskRun(ctx context.Context, namespace, taskRun string) ([]k8s.Pod, error) { 870 list, err := p.client.ListPods(ctx, namespace, 871 "tekton.dev/taskRun="+taskRun, 872 ) 873 if err != nil { 874 return nil, fmt.Errorf("list pods: %w", err) 875 } 876 pods := append([]k8s.Pod(nil), list...) 
877 sort.Slice(pods, func(i, j int) bool { 878 return pods[i].CreationTimestamp.Before(pods[j].CreationTimestamp) 879 }) 880 return pods, nil 881} 882 883func (p *tektonProvider) sendReaderLines( 884 ctx context.Context, 885 out chan<- LogLine, 886 rc io.Reader, 887 stepID int, 888) { 889 scanner := bufio.NewScanner(rc) 890 for scanner.Scan() { 891 if !sendLine(ctx, out, LogLine{ 892 Kind: LogKindData, 893 Time: time.Now(), 894 Content: scanner.Text() + "\n", 895 StepId: stepID, 896 Stream: "stdout", 897 }) { 898 return 899 } 900 } 901 if err := scanner.Err(); err != nil { 902 p.log.Debug("scan pod log", "err", err) 903 } 904} 905 906func numberToInt64(value any) int64 { 907 switch v := value.(type) { 908 case int64: 909 return v 910 case int: 911 return int64(v) 912 case float64: 913 return int64(v) 914 case json.Number: 915 i, _ := v.Int64() 916 return i 917 default: 918 return 0 919 } 920} 921 922func (p *tektonProvider) publishStatus( 923 ctx context.Context, 924 pipelineURI, workflow, status, runName string, 925 errMsg *string, 926 exitCode *int64, 927) error { 928 rec := tangled.PipelineStatus{ 929 LexiconTypeID: tangled.PipelineStatusNSID, 930 Pipeline: pipelineURI, 931 Workflow: workflow, 932 Status: status, 933 CreatedAt: time.Now().UTC().Format(time.RFC3339), 934 Error: errMsg, 935 ExitCode: exitCode, 936 } 937 body, err := json.Marshal(rec) 938 if err != nil { 939 return fmt.Errorf("marshal pipeline.status: %w", err) 940 } 941 rkey := fmt.Sprintf("tk-%s-%s-%d", runName, status, time.Now().UnixNano()) 942 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil { 943 return fmt.Errorf("publish pipeline.status: %w", err) 944 } 945 return nil 946} 947 948func tektonPipelineRunName(knot, pipelineRkey, workflow, commit, branch string) string { 949 h := sha256.Sum256([]byte(strings.Join( 950 []string{knot, pipelineRkey, workflow, commit, branch}, "\x00", 951 ))) 952 suffix := hex.EncodeToString(h[:])[:12] 953 base := dnsLabel("tack-" 
+ workflow) 954 maxBase := 63 - len(suffix) - 1 955 if len(base) > maxBase { 956 base = strings.TrimRight(base[:maxBase], "-") 957 } 958 if base == "" { 959 base = "tack" 960 } 961 return base + "-" + suffix 962} 963 964func dnsLabel(s string) string { 965 var b strings.Builder 966 lastDash := false 967 for _, r := range strings.ToLower(s) { 968 ok := unicode.IsLetter(r) || unicode.IsDigit(r) 969 if ok { 970 b.WriteRune(r) 971 lastDash = false 972 continue 973 } 974 if !lastDash { 975 b.WriteByte('-') 976 lastDash = true 977 } 978 } 979 return strings.Trim(b.String(), "-") 980} 981 982func labelValue(s string) string { 983 v := dnsLabel(s) 984 if len(v) > 63 { 985 v = strings.TrimRight(v[:63], "-") 986 } 987 if v == "" { 988 return "unknown" 989 } 990 return v 991}