Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

package main

// tektonProvider implements Provider by creating Tekton PipelineRuns
// directly inside the Kubernetes cluster. Tack already receives and
// authorizes Tangled pipeline triggers, so adding Tekton Triggers would
// duplicate the event-to-run translation layer instead of simplifying it.

import (
	"bufio"
	"context"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"sort"
	"strings"
	"time"
	"unicode"

	"tangled.org/core/api/tangled"

	"go.mitchellh.com/tack/internal/k8s"
	"go.yaml.in/yaml/v2"
)

const (
	// apiVersion and kind stamped on every PipelineRun object this
	// provider builds.
	tektonAPIVersion = "tekton.dev/v1"
	tektonRunKind    = "PipelineRun"

	// Label keys set on created PipelineRuns. Label values are passed
	// through labelValue before being written.
	tektonLabelManagedBy    = "tack.mitchellh.com/managed-by"
	tektonLabelPipelineRkey = "tack.mitchellh.com/pipeline-rkey"
	tektonLabelWorkflow     = "tack.mitchellh.com/workflow"

	// Annotation keys set on created PipelineRuns. Annotations carry the
	// raw (unsanitized) values; the pipeline-rkey and workflow keys are
	// the same strings as their label counterparts.
	tektonAnnotationKnot         = "tack.mitchellh.com/knot"
	tektonAnnotationPipelineRkey = "tack.mitchellh.com/pipeline-rkey"
	tektonAnnotationWorkflow     = "tack.mitchellh.com/workflow"
	tektonAnnotationActor        = "tack.mitchellh.com/actor"
	tektonAnnotationCommit       = "tack.mitchellh.com/commit"
	tektonAnnotationBranch       = "tack.mitchellh.com/branch"
)

var (
	// pipelineRunsGVR addresses Tekton PipelineRuns through the k8s
	// client (create, get, and watch calls in this file).
	pipelineRunsGVR = k8s.GVR{
		Group: "tekton.dev", Version: "v1", Resource: "pipelineruns",
	}
	// taskRunsGVR addresses Tekton TaskRuns; it is used when listing the
	// TaskRuns that belong to a PipelineRun.
	taskRunsGVR = k8s.GVR{
		Group: "tekton.dev", Version: "v1", Resource: "taskruns",
	}
)

// tektonWorkflowConfig is the Tekton-specific subset of workflow YAML.
// `pipeline` names an existing in-cluster Tekton Pipeline. Params are
// deliberately string-only in v1: tack is meant to select an existing
// runner and pass a small amount of routing data, not mirror Tekton's
// entire PipelineRun API.
59type tektonWorkflowConfig struct { 60 Pipeline string `yaml:"pipeline"` 61 ServiceAccount string `yaml:"service_account"` 62 Params map[string]string `yaml:"params"` 63 Workspaces []tektonWorkspaceConfig `yaml:"workspaces"` 64} 65 66type tektonWorkspaceConfig struct { 67 Name string `yaml:"name"` 68 AccessModes []string `yaml:"access_modes"` 69 Storage *string `yaml:"storage"` 70 PVC *string `yaml:"pvc"` 71} 72 73type tektonWorkflowDoc struct { 74 Tack struct { 75 Tekton tektonWorkflowConfig `yaml:"tekton"` 76 } `yaml:"tack"` 77} 78 79// parseTektonWorkflowConfig decodes `tack.tekton` from a workflow body. 80func parseTektonWorkflowConfig(raw string) (*tektonWorkflowConfig, error) { 81 if strings.TrimSpace(raw) == "" { 82 return nil, errors.New("workflow body is empty") 83 } 84 var doc tektonWorkflowDoc 85 if err := yaml.Unmarshal([]byte(raw), &doc); err != nil { 86 return nil, fmt.Errorf("parse workflow yaml: %w", err) 87 } 88 cfg := doc.Tack.Tekton 89 if cfg.Pipeline == "" { 90 return nil, errors.New("workflow yaml: `tack.tekton.pipeline` is required") 91 } 92 return &cfg, nil 93} 94 95type tektonProvider struct { 96 br *broker 97 st *store 98 log *slog.Logger 99 client k8s.Client 100 namespace string 101} 102 103var _ Provider = (*tektonProvider)(nil) 104 105func newTektonProvider( 106 br *broker, 107 st *store, 108 client k8s.Client, 109 namespace string, 110 log *slog.Logger, 111) *tektonProvider { 112 return &tektonProvider{ 113 br: br, 114 st: st, 115 log: log.With("component", "provider", "kind", "tekton"), 116 client: client, 117 namespace: namespace, 118 } 119} 120 121func newInClusterTektonProvider( 122 br *broker, 123 st *store, 124 namespace string, 125 log *slog.Logger, 126) (*tektonProvider, error) { 127 client, err := k8s.NewInClusterClient() 128 if err != nil { 129 return nil, fmt.Errorf("configure in-cluster kubernetes client: %w", err) 130 } 131 return newTektonProvider(br, st, client, namespace, log), nil 132} 133 134func (p *tektonProvider) 
Spawn( 135 ctx context.Context, 136 knot string, 137 pipelineRkey string, 138 actor string, 139 trigger *tangled.Pipeline_TriggerMetadata, 140 workflows []*tangled.Pipeline_Workflow, 141) { 142 if len(workflows) == 0 { 143 p.log.Warn("pipeline has no workflows; nothing to spawn", 144 "knot", knot, "rkey", pipelineRkey, 145 ) 146 return 147 } 148 for _, wf := range workflows { 149 if wf == nil || wf.Name == "" { 150 continue 151 } 152 wf := wf 153 go p.spawnWorkflow(ctx, knot, pipelineRkey, actor, trigger, wf) 154 } 155} 156 157func (p *tektonProvider) spawnWorkflow( 158 ctx context.Context, 159 knot string, 160 pipelineRkey string, 161 actor string, 162 trigger *tangled.Pipeline_TriggerMetadata, 163 wf *tangled.Pipeline_Workflow, 164) { 165 logger := p.log.With( 166 "knot", knot, 167 "pipeline_rkey", pipelineRkey, 168 "workflow", wf.Name, 169 "actor", actor, 170 ) 171 172 cfg, err := parseTektonWorkflowConfig(wf.Raw) 173 if err != nil { 174 logger.Error("invalid workflow config; refusing to spawn", "err", err) 175 return 176 } 177 commit, branch := triggerCommitAndBranch(trigger) 178 name := tektonPipelineRunName(knot, pipelineRkey, wf.Name, commit, branch) 179 pr := buildTektonPipelineRun( 180 p.namespace, name, cfg, knot, pipelineRkey, actor, commit, branch, wf, 181 ) 182 183 created, err := p.client.CreateObject(ctx, pipelineRunsGVR, p.namespace, pr) 184 if errors.Is(err, k8s.ErrAlreadyExists) { 185 created, err = p.client.GetObject(ctx, pipelineRunsGVR, p.namespace, name) 186 } 187 if err != nil { 188 logger.Error("create tekton PipelineRun", "err", err, 189 "namespace", p.namespace, "pipeline_run", name, 190 "pipeline", cfg.Pipeline, 191 ) 192 return 193 } 194 195 ref := TektonRunRef{ 196 Knot: knot, 197 PipelineRkey: pipelineRkey, 198 Workflow: wf.Name, 199 Namespace: p.namespace, 200 PipelineRunName: name, 201 PipelineRunUID: created.GetUID(), 202 PipelineName: cfg.Pipeline, 203 PipelineURI: pipelineATURI(knot, pipelineRkey), 204 } 205 if err := 
p.st.InsertTektonRun(ctx, ref); err != nil { 206 logger.Error("persist tekton run mapping", "err", err, 207 "pipeline_run", name, 208 ) 209 return 210 } 211 212 if err := p.publishStatus(ctx, ref.PipelineURI, wf.Name, 213 "pending", name, nil, nil); err != nil { 214 logger.Error("publish initial pending status", "err", err) 215 } 216 217 logger.Info("tekton PipelineRun created", 218 "namespace", p.namespace, 219 "pipeline", cfg.Pipeline, 220 "pipeline_run", name, 221 "uid", ref.PipelineRunUID, 222 ) 223 go p.watchPipelineRun(ctx, ref) 224} 225 226func buildTektonPipelineRun( 227 namespace, name string, 228 cfg *tektonWorkflowConfig, 229 knot, pipelineRkey, actor, commit, branch string, 230 wf *tangled.Pipeline_Workflow, 231) k8s.Object { 232 obj := k8s.Object{ 233 "apiVersion": tektonAPIVersion, 234 "kind": tektonRunKind, 235 "metadata": map[string]any{ 236 "name": name, 237 "namespace": namespace, 238 "labels": map[string]any{ 239 tektonLabelManagedBy: "tack", 240 tektonLabelPipelineRkey: labelValue(pipelineRkey), 241 tektonLabelWorkflow: labelValue(wf.Name), 242 }, 243 "annotations": map[string]any{ 244 tektonAnnotationKnot: knot, 245 tektonAnnotationPipelineRkey: pipelineRkey, 246 tektonAnnotationWorkflow: wf.Name, 247 tektonAnnotationActor: actor, 248 tektonAnnotationCommit: commit, 249 tektonAnnotationBranch: branch, 250 }, 251 }, 252 "spec": map[string]any{ 253 "pipelineRef": map[string]any{ 254 "name": cfg.Pipeline, 255 }, 256 "params": []any{ 257 map[string]any{ 258 "name": "commit", 259 "value": commit, 260 }, 261 map[string]any{ 262 "name": "branch", 263 "value": branch, 264 }, 265 map[string]any{ 266 "name": "actor", 267 "value": actor, 268 }, 269 }, 270 }, 271 } 272 273 spec := obj["spec"].(map[string]any) 274 275 if len(cfg.Workspaces) != 0 { 276 spec["podTemplate"] = map[string]any{ 277 "securityContext": map[string]any{ 278 "fsGroup": 65532, 279 }, 280 } 281 282 workspaces := []any{} 283 284 for _, ws := range cfg.Workspaces { 285 switch { 286 case 
ws.Storage != nil: 287 workspaces = append(workspaces, map[string]any{ 288 "name": ws.Name, 289 "volumeClaimTemplate": map[string]any{ 290 "spec": map[string]any{ 291 "accessModes": ws.AccessModes, 292 "resources": map[string]any{ 293 "requests": map[string]any{ 294 "storage": *ws.Storage, 295 }, 296 }, 297 }, 298 }, 299 }) 300 301 case ws.PVC != nil: 302 workspaces = append(workspaces, map[string]any{ 303 "name": ws.Name, 304 "persistentVolumeClaim": map[string]any{ 305 "claimName": *ws.PVC, 306 }, 307 }) 308 } 309 } 310 311 spec["workspaces"] = workspaces 312 } 313 314 if cfg.ServiceAccount != "" { 315 spec["serviceAccountName"] = cfg.ServiceAccount 316 } 317 if len(cfg.Params) > 0 { 318 keys := make([]string, 0, len(cfg.Params)) 319 for key := range cfg.Params { 320 keys = append(keys, key) 321 } 322 sort.Strings(keys) 323 params := make([]any, 0, len(keys)) 324 for _, key := range keys { 325 params = append(params, map[string]any{ 326 "name": key, 327 "value": cfg.Params[key], 328 }) 329 } 330 spec["params"] = params 331 } 332 return obj 333} 334 335func (p *tektonProvider) watchPipelineRun(ctx context.Context, ref TektonRunRef) { 336 logger := p.log.With( 337 "knot", ref.Knot, 338 "pipeline_rkey", ref.PipelineRkey, 339 "workflow", ref.Workflow, 340 "namespace", ref.Namespace, 341 "pipeline_run", ref.PipelineRunName, 342 ) 343 344 logger.Debug("watchPipelineRun: starting") 345 346 last := "" 347 if obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace, 348 ref.PipelineRunName); err == nil { 349 status, terminal, ok := mapTektonPipelineRunStatus(obj) 350 logger.Debug("watchPipelineRun: initial status read", 351 "status", status, "terminal", terminal, "ok", ok, 352 ) 353 if ok { 354 last = status 355 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 356 status, ref.PipelineRunName, nil, nil); err != nil { 357 logger.Error("publish tekton status", "err", err, "status", status) 358 } 359 if terminal { 360 logger.Debug("watchPipelineRun: 
already terminal on initial read; exiting", "status", status) 361 return 362 } 363 } 364 } else if errors.Is(err, k8s.ErrNotFound) { 365 logger.Warn("PipelineRun disappeared while watching") 366 return 367 } else { 368 logger.Debug("initial PipelineRun status read", "err", err) 369 } 370 371 w, err := p.client.WatchObjects(ctx, pipelineRunsGVR, ref.Namespace, 372 k8s.ListOptions{FieldSelector: "metadata.name=" + ref.PipelineRunName}, 373 ) 374 if err != nil { 375 logger.Debug("watchPipelineRun: watch failed; falling back to polling", "err", err) 376 p.pollPipelineRun(ctx, ref, logger, last) 377 return 378 } 379 defer w.Stop() 380 381 logger.Debug("watchPipelineRun: watch established; entering event loop") 382 for { 383 select { 384 case <-ctx.Done(): 385 logger.Debug("watchPipelineRun: context cancelled") 386 return 387 case ev, ok := <-w.ResultChan(): 388 if !ok { 389 logger.Debug("watchPipelineRun: watch channel closed; falling back to polling") 390 p.pollPipelineRun(ctx, ref, logger, last) 391 return 392 } 393 status, terminal, ok := mapTektonPipelineRunStatus(ev.Object) 394 logger.Debug("watchPipelineRun: watch event", 395 "event_type", ev.Type, 396 "status", status, "terminal", terminal, "ok", ok, "last", last, 397 ) 398 if !ok || status == last { 399 if terminal { 400 logger.Debug("watchPipelineRun: terminal status unchanged; exiting", "status", status) 401 return 402 } 403 continue 404 } 405 last = status 406 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 407 status, ref.PipelineRunName, nil, nil); err != nil { 408 logger.Error("publish tekton status", "err", err, "status", status) 409 continue 410 } 411 logger.Debug("watchPipelineRun: published status", "status", status, "terminal", terminal) 412 if terminal { 413 logger.Debug("watchPipelineRun: terminal status reached; exiting", "status", status) 414 return 415 } 416 } 417 } 418} 419 420func (p *tektonProvider) pollPipelineRun( 421 ctx context.Context, 422 ref TektonRunRef, 423 logger 
*slog.Logger, 424 last string, 425) { 426 logger.Debug("pollPipelineRun: starting poll loop", "interval", "5s") 427 ticker := time.NewTicker(5 * time.Second) 428 defer ticker.Stop() 429 for { 430 select { 431 case <-ctx.Done(): 432 logger.Debug("pollPipelineRun: context cancelled") 433 return 434 case <-ticker.C: 435 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace, 436 ref.PipelineRunName, 437 ) 438 if errors.Is(err, k8s.ErrNotFound) { 439 logger.Warn("PipelineRun disappeared while watching") 440 return 441 } 442 if err != nil { 443 logger.Debug("get PipelineRun status", "err", err) 444 continue 445 } 446 status, terminal, ok := mapTektonPipelineRunStatus(obj) 447 logger.Debug("pollPipelineRun: poll tick", 448 "status", status, "terminal", terminal, "ok", ok, "last", last, 449 ) 450 if !ok || status == last { 451 if terminal { 452 logger.Debug("pollPipelineRun: terminal status unchanged; exiting", "status", status) 453 return 454 } 455 continue 456 } 457 last = status 458 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 459 status, ref.PipelineRunName, nil, nil); err != nil { 460 logger.Error("publish tekton status", "err", err, "status", status) 461 continue 462 } 463 logger.Debug("pollPipelineRun: published status", "status", status, "terminal", terminal) 464 if terminal { 465 logger.Debug("pollPipelineRun: terminal status reached; exiting", "status", status) 466 return 467 } 468 } 469 } 470} 471 472// mapTektonPipelineRunStatus translates Tekton's Succeeded condition 473// into the Tangled status strings consumed by the appview. 
474func mapTektonPipelineRunStatus(obj k8s.Object) (status string, terminal bool, ok bool) { 475 conditions, ok := obj.NestedSlice("status", "conditions") 476 if !ok || len(conditions) == 0 { 477 slog.Debug("mapTektonPipelineRunStatus: no conditions found", 478 "pipeline_run", obj.GetName(), 479 ) 480 return "", false, false 481 } 482 for _, raw := range conditions { 483 cond, _ := raw.(map[string]interface{}) 484 condType, _ := cond["type"].(string) 485 condStatus, _ := cond["status"].(string) 486 reason, _ := cond["reason"].(string) 487 message, _ := cond["message"].(string) 488 slog.Debug("mapTektonPipelineRunStatus: condition", 489 "pipeline_run", obj.GetName(), 490 "type", condType, 491 "status", condStatus, 492 "reason", reason, 493 "message", message, 494 ) 495 if condType != "Succeeded" { 496 continue 497 } 498 switch condStatus { 499 case "True": 500 return "success", true, true 501 case "False": 502 if tektonReasonCancelled(reason) { 503 return "cancelled", true, true 504 } 505 return "failed", true, true 506 case "Unknown": 507 return "running", false, true 508 default: 509 return "", false, false 510 } 511 } 512 return "", false, false 513} 514 515func tektonReasonCancelled(reason string) bool { 516 r := strings.ToLower(reason) 517 return strings.Contains(r, "cancel") || strings.Contains(r, "stop") 518} 519 520func (p *tektonProvider) Logs( 521 ctx context.Context, 522 knot string, 523 pipelineRkey string, 524 workflow string, 525) (<-chan LogLine, error) { 526 ref, err := p.st.LookupTektonRunByTuple(ctx, knot, pipelineRkey, workflow) 527 if err != nil { 528 return nil, fmt.Errorf("lookup tekton run mapping: %w", err) 529 } 530 if ref == nil { 531 // No mapping at all means this provider never spawned a 532 // PipelineRun for the requested tuple, so a 404 is the 533 // honest answer. 534 return nil, ErrLogsNotFound 535 } 536 537 // At this point the workflow *was* spawned — we have a row in the 538 // store mapping the tuple to a PipelineRun. 
The TaskRuns the 539 // PipelineRun fans out to are created asynchronously by Tekton 540 // once the run is admitted, so a freshly-spawned or still-queueing 541 // PipelineRun will momentarily report zero TaskRuns. Returning 542 // ErrLogsNotFound here would mistranslate that into a 404 at the 543 // HTTP layer (see provider.go contract: ErrLogsNotFound means the 544 // workflow never ran), making just-spawned runs look nonexistent 545 // to the appview. Instead, hand back an open channel and poll 546 // inside the goroutine until TaskRuns materialize, ctx is 547 // cancelled, or the PipelineRun reaches a terminal state with 548 // nothing to stream. 549 out := make(chan LogLine, 32) 550 go func() { 551 defer close(out) 552 p.streamPipelineRunLogs(ctx, out, *ref) 553 }() 554 return out, nil 555} 556 557// streamPipelineRunLogs drives the Logs channel for a single PipelineRun. 558// It polls for TaskRuns until the PipelineRun is terminal (or ctx is 559// cancelled), streaming each TaskRun's logs exactly once. 560// 561// The loop is deliberately a poll rather than a one-shot pass for two 562// reasons: 563// 564// 1. Live runs need follow semantics. Streaming a still-running TaskRun 565// uses Follow=true on the pod log stream, so streamTaskRunLogs only 566// returns once the container actually terminates. The outer loop 567// then re-checks for new TaskRuns and the PipelineRun's terminal 568// state, instead of closing the channel mid-run. 569// 570// 2. PipelineRuns can spawn additional TaskRuns over time (sequential 571// `runAfter` tasks, finally blocks, retries). A single snapshot 572// would silently drop any TaskRun that appears after the snapshot, 573// even though the workflow is still running. 574// 575// The loop terminates only when the PipelineRun is terminal AND the 576// most recent listing produced no new TaskRuns, which together mean no 577// further TaskRuns will ever appear. 
578func (p *tektonProvider) streamPipelineRunLogs( 579 ctx context.Context, 580 out chan<- LogLine, 581 ref TektonRunRef, 582) { 583 const pollInterval = 1 * time.Second 584 seen := map[string]bool{} 585 stepID := 0 586 for { 587 if err := ctx.Err(); err != nil { 588 return 589 } 590 591 taskRuns, err := p.taskRunsForPipelineRun(ctx, ref) 592 if err != nil { 593 p.log.Debug("Logs: list TaskRuns failed", 594 "err", err, "pipeline_run", ref.PipelineRunName, 595 ) 596 } 597 // Snapshot the terminal state *after* the listing so we never 598 // observe terminal=true while still missing a TaskRun that 599 // existed at list time. The reverse race (terminal observed 600 // before a TaskRun spawn) is handled by the next iteration: we 601 // only exit when terminal is true AND no new TaskRuns showed up 602 // in this pass. 603 terminal := p.isPipelineRunTerminal(ctx, ref) 604 605 var fresh []k8s.Object 606 for _, tr := range taskRuns { 607 name := tr.GetName() 608 if name == "" || seen[name] { 609 continue 610 } 611 seen[name] = true 612 fresh = append(fresh, tr) 613 } 614 p.log.Debug("Logs: poll iteration", 615 "pipeline_run", ref.PipelineRunName, 616 "task_runs_total", len(taskRuns), 617 "task_runs_new", len(fresh), 618 "terminal", terminal, 619 ) 620 621 for _, tr := range fresh { 622 taskName := tr.GetName() 623 if taskName == "" { 624 taskName = fmt.Sprintf("task %d", stepID) 625 } 626 p.log.Debug("Logs: streaming TaskRun", 627 "task_run", taskName, "step_id", stepID, "terminal", terminal, 628 ) 629 if !sendLine(ctx, out, LogLine{ 630 Kind: LogKindControl, 631 Time: time.Now(), 632 Content: taskName, 633 StepId: stepID, 634 StepStatus: StepStatusStart, 635 }) { 636 return 637 } 638 639 // terminal is the snapshot taken at the top of this 640 // iteration. 
If the pipeline was terminal then, all 641 // TaskRuns we see are guaranteed complete and we can 642 // take the snapshot fast-path; otherwise we follow the 643 // pod logs live so StepStatusEnd is only emitted once 644 // the container actually exits. 645 if terminal { 646 p.fetchCompletedTaskRunLogs(ctx, out, ref, tr, stepID) 647 } else { 648 p.streamTaskRunLogs(ctx, out, ref, tr, stepID) 649 } 650 651 if !sendLine(ctx, out, LogLine{ 652 Kind: LogKindControl, 653 Time: time.Now(), 654 Content: taskName, 655 StepId: stepID, 656 StepStatus: StepStatusEnd, 657 }) { 658 return 659 } 660 p.log.Debug("Logs: finished TaskRun", 661 "task_run", taskName, "step_id", stepID, 662 ) 663 stepID++ 664 } 665 666 // Done condition: the PipelineRun is terminal AND we found no 667 // new TaskRuns this iteration. Both are required because a 668 // terminal PipelineRun can still expose a freshly-listed 669 // TaskRun whose pod we haven't drained yet. 670 if terminal && len(fresh) == 0 { 671 p.log.Debug("Logs: pipeline run terminal, no new TaskRuns", 672 "pipeline_run", ref.PipelineRunName, 673 ) 674 return 675 } 676 677 // When we processed new TaskRuns this round, loop again 678 // immediately to re-list — Follow=true streaming may have 679 // blocked us long enough for additional TaskRuns or terminal 680 // transitions to have happened. 681 if len(fresh) > 0 { 682 continue 683 } 684 685 select { 686 case <-ctx.Done(): 687 return 688 case <-time.After(pollInterval): 689 } 690 } 691} 692 693// isPipelineRunTerminal returns true if the PipelineRun is in a terminal state right now. 
694func (p *tektonProvider) isPipelineRunTerminal(ctx context.Context, ref TektonRunRef) bool { 695 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace, 696 ref.PipelineRunName, 697 ) 698 if err != nil { 699 p.log.Debug("isPipelineRunTerminal: failed to get PipelineRun", "err", err, "pipeline_run", ref.PipelineRunName) 700 return false 701 } 702 _, terminal, ok := mapTektonPipelineRunStatus(obj) 703 p.log.Debug("isPipelineRunTerminal: status check", "pipeline_run", ref.PipelineRunName, "terminal", terminal, "ok", ok) 704 return ok && terminal 705} 706 707// fetchCompletedTaskRunLogs fetches all logs from a TaskRun that has already completed. 708// It reads each step container's logs in one shot using all-containers traversal, 709// and also inlines the Tekton step status from the TaskRun (exit code, reason) as 710// control messages so the caller gets full context without needing to watch for events. 711func (p *tektonProvider) fetchCompletedTaskRunLogs( 712 ctx context.Context, 713 out chan<- LogLine, 714 ref TektonRunRef, 715 tr k8s.Object, 716 stepID int, 717) { 718 trName := tr.GetName() 719 pods, err := p.podsForTaskRun(ctx, ref.Namespace, trName) 720 if err != nil { 721 p.log.Debug("fetchCompletedTaskRunLogs: list pods failed", "err", err, 722 "task_run", trName, "pipeline_run", ref.PipelineRunName, 723 ) 724 return 725 } 726 p.log.Debug("fetchCompletedTaskRunLogs: found pods", 727 "task_run", trName, "pod_count", len(pods), 728 ) 729 730 // Emit a summary line from the TaskRun status (steps[*].terminated) so we 731 // get exit codes and reasons even if the pod logs are sparse. 
732 steps, _ := tr.NestedSlice("status", "steps") 733 for _, rawStep := range steps { 734 step, _ := rawStep.(map[string]any) 735 stepName, _ := step["name"].(string) 736 term, _ := step["terminated"].(map[string]any) 737 if term == nil { 738 continue 739 } 740 exitCode := numberToInt64(term["exitCode"]) 741 reason, _ := term["reason"].(string) 742 msg, _ := term["message"].(string) 743 line := fmt.Sprintf("[step %s] exit=%d reason=%s", stepName, exitCode, reason) 744 if msg != "" { 745 line += " " + msg 746 } 747 p.log.Debug("fetchCompletedTaskRunLogs: step terminated", 748 "task_run", trName, "step", stepName, 749 "exit_code", exitCode, "reason", reason, 750 ) 751 if !sendLine(ctx, out, LogLine{ 752 Kind: LogKindData, 753 Time: time.Now(), 754 Content: line + "\n", 755 StepId: stepID, 756 Stream: "stdout", 757 }) { 758 return 759 } 760 } 761 762 for _, pod := range pods { 763 containers := append([]k8s.Container(nil), pod.InitContainers...) 764 containers = append(containers, pod.Containers...) 765 p.log.Debug("fetchCompletedTaskRunLogs: reading pod containers", 766 "pod", pod.Name, "container_count", len(containers), 767 ) 768 for _, c := range containers { 769 p.log.Debug("fetchCompletedTaskRunLogs: reading container logs", 770 "pod", pod.Name, "container", c.Name, 771 ) 772 // Snapshot read (Follow=false) is correct here: the TaskRun 773 // has already terminated, so the API server has the full log 774 // available and there is nothing more to wait for. 
775 rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name, 776 k8s.LogOptions{Follow: false}, 777 ) 778 if err != nil { 779 p.log.Debug("fetchCompletedTaskRunLogs: stream failed", "err", err, 780 "pod", pod.Name, "container", c.Name, 781 ) 782 continue 783 } 784 p.sendReaderLines(ctx, out, rc, stepID) 785 _ = rc.Close() 786 p.log.Debug("fetchCompletedTaskRunLogs: done reading container", 787 "pod", pod.Name, "container", c.Name, 788 ) 789 } 790 } 791} 792 793func (p *tektonProvider) taskRunsForPipelineRun(ctx context.Context, ref TektonRunRef) ([]k8s.Object, error) { 794 list, err := p.client.ListObjects(ctx, taskRunsGVR, ref.Namespace, k8s.ListOptions{ 795 LabelSelector: "tekton.dev/pipelineRun=" + ref.PipelineRunName, 796 }) 797 if err != nil { 798 return nil, fmt.Errorf("list Tekton TaskRuns: %w", err) 799 } 800 items := append([]k8s.Object(nil), list...) 801 sort.Slice(items, func(i, j int) bool { 802 ti := items[i].GetCreationTimestamp() 803 tj := items[j].GetCreationTimestamp() 804 return ti.Before(tj) 805 }) 806 return items, nil 807} 808 809func (p *tektonProvider) streamTaskRunLogs( 810 ctx context.Context, 811 out chan<- LogLine, 812 ref TektonRunRef, 813 tr k8s.Object, 814 stepID int, 815) { 816 pods, err := p.podsForTaskRun(ctx, ref.Namespace, tr.GetName()) 817 if err != nil { 818 p.log.Debug("streamTaskRunLogs: list pods for TaskRun failed", "err", err, 819 "task_run", tr.GetName(), "pipeline_run", ref.PipelineRunName, 820 ) 821 return 822 } 823 p.log.Debug("streamTaskRunLogs: found pods", 824 "task_run", tr.GetName(), "pod_count", len(pods), 825 ) 826 for _, pod := range pods { 827 containers := append([]k8s.Container(nil), pod.InitContainers...) 828 containers = append(containers, pod.Containers...) 
829 p.log.Debug("streamTaskRunLogs: streaming pod containers", 830 "pod", pod.Name, "container_count", len(containers), 831 ) 832 for _, c := range containers { 833 p.log.Debug("streamTaskRunLogs: streaming container", 834 "pod", pod.Name, "container", c.Name, "step_id", stepID, 835 ) 836 // Live tail with Follow=true: the apiserver holds the 837 // connection open until the container terminates (or ctx is 838 // cancelled), so sendReaderLines only returns once the 839 // container is actually done. Without this, the read EOFs at 840 // the current tail position and the caller emits a spurious 841 // StepStatusEnd while the step is still running. 842 rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name, 843 k8s.LogOptions{Follow: true}, 844 ) 845 if err != nil { 846 p.log.Debug("streamTaskRunLogs: stream pod logs failed", "err", err, 847 "pod", pod.Name, "container", c.Name, 848 ) 849 continue 850 } 851 p.sendReaderLines(ctx, out, rc, stepID) 852 _ = rc.Close() 853 p.log.Debug("streamTaskRunLogs: finished container", 854 "pod", pod.Name, "container", c.Name, 855 ) 856 } 857 } 858} 859 860func (p *tektonProvider) podsForTaskRun(ctx context.Context, namespace, taskRun string) ([]k8s.Pod, error) { 861 list, err := p.client.ListPods(ctx, namespace, 862 "tekton.dev/taskRun="+taskRun, 863 ) 864 if err != nil { 865 return nil, fmt.Errorf("list pods: %w", err) 866 } 867 pods := append([]k8s.Pod(nil), list...) 
868 sort.Slice(pods, func(i, j int) bool { 869 return pods[i].CreationTimestamp.Before(pods[j].CreationTimestamp) 870 }) 871 return pods, nil 872} 873 874func (p *tektonProvider) sendReaderLines( 875 ctx context.Context, 876 out chan<- LogLine, 877 rc io.Reader, 878 stepID int, 879) { 880 scanner := bufio.NewScanner(rc) 881 for scanner.Scan() { 882 if !sendLine(ctx, out, LogLine{ 883 Kind: LogKindData, 884 Time: time.Now(), 885 Content: scanner.Text() + "\n", 886 StepId: stepID, 887 Stream: "stdout", 888 }) { 889 return 890 } 891 } 892 if err := scanner.Err(); err != nil { 893 p.log.Debug("scan pod log", "err", err) 894 } 895} 896 897func numberToInt64(value any) int64 { 898 switch v := value.(type) { 899 case int64: 900 return v 901 case int: 902 return int64(v) 903 case float64: 904 return int64(v) 905 case json.Number: 906 i, _ := v.Int64() 907 return i 908 default: 909 return 0 910 } 911} 912 913func (p *tektonProvider) publishStatus( 914 ctx context.Context, 915 pipelineURI, workflow, status, runName string, 916 errMsg *string, 917 exitCode *int64, 918) error { 919 rec := tangled.PipelineStatus{ 920 LexiconTypeID: tangled.PipelineStatusNSID, 921 Pipeline: pipelineURI, 922 Workflow: workflow, 923 Status: status, 924 CreatedAt: time.Now().UTC().Format(time.RFC3339), 925 Error: errMsg, 926 ExitCode: exitCode, 927 } 928 body, err := json.Marshal(rec) 929 if err != nil { 930 return fmt.Errorf("marshal pipeline.status: %w", err) 931 } 932 rkey := fmt.Sprintf("tk-%s-%s-%d", runName, status, time.Now().UnixNano()) 933 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil { 934 return fmt.Errorf("publish pipeline.status: %w", err) 935 } 936 return nil 937} 938 939func tektonPipelineRunName(knot, pipelineRkey, workflow, commit, branch string) string { 940 h := sha256.Sum256([]byte(strings.Join( 941 []string{knot, pipelineRkey, workflow, commit, branch}, "\x00", 942 ))) 943 suffix := hex.EncodeToString(h[:])[:12] 944 base := dnsLabel("tack-" 
+ workflow) 945 maxBase := 63 - len(suffix) - 1 946 if len(base) > maxBase { 947 base = strings.TrimRight(base[:maxBase], "-") 948 } 949 if base == "" { 950 base = "tack" 951 } 952 return base + "-" + suffix 953} 954 955func dnsLabel(s string) string { 956 var b strings.Builder 957 lastDash := false 958 for _, r := range strings.ToLower(s) { 959 ok := unicode.IsLetter(r) || unicode.IsDigit(r) 960 if ok { 961 b.WriteRune(r) 962 lastDash = false 963 continue 964 } 965 if !lastDash { 966 b.WriteByte('-') 967 lastDash = true 968 } 969 } 970 return strings.Trim(b.String(), "-") 971} 972 973func labelValue(s string) string { 974 v := dnsLabel(s) 975 if len(v) > 63 { 976 v = strings.TrimRight(v[:63], "-") 977 } 978 if v == "" { 979 return "unknown" 980 } 981 return v 982}