Stitch any CI into Tangled
1package main
2
3// tektonProvider implements Provider by creating Tekton PipelineRuns
4// directly inside the Kubernetes cluster. Tack already receives and
5// authorizes Tangled pipeline triggers, so adding Tekton Triggers would
6// duplicate the event-to-run translation layer instead of simplifying it.
7
8import (
9 "bufio"
10 "context"
11 "crypto/sha256"
12 "encoding/hex"
13 "encoding/json"
14 "errors"
15 "fmt"
16 "io"
17 "log/slog"
18 "sort"
19 "strings"
20 "time"
21 "unicode"
22
23 "tangled.org/core/api/tangled"
24
25 "go.mitchellh.com/tack/internal/k8s"
26 "go.yaml.in/yaml/v2"
27)
28
// Identity of the Tekton resource tack creates, plus the label and
// annotation keys stamped onto it by buildTektonPipelineRun.
const (
	tektonAPIVersion = "tekton.dev/v1"
	tektonRunKind    = "PipelineRun"

	// Label keys. Label values are passed through labelValue before
	// being written, so they may differ from the raw identifiers.
	tektonLabelManagedBy    = "tack.mitchellh.com/managed-by"
	tektonLabelPipelineRkey = "tack.mitchellh.com/pipeline-rkey"
	tektonLabelWorkflow     = "tack.mitchellh.com/workflow"

	// Annotation keys. Annotation values are written unmodified. The
	// pipeline-rkey and workflow keys are spelled the same as the
	// corresponding label keys above.
	tektonAnnotationKnot         = "tack.mitchellh.com/knot"
	tektonAnnotationPipelineRkey = "tack.mitchellh.com/pipeline-rkey"
	tektonAnnotationWorkflow     = "tack.mitchellh.com/workflow"
	tektonAnnotationActor        = "tack.mitchellh.com/actor"
	tektonAnnotationCommit       = "tack.mitchellh.com/commit"
	tektonAnnotationBranch       = "tack.mitchellh.com/branch"
)

// Group/version/resource coordinates handed to the k8s client for
// PipelineRun and TaskRun calls.
var (
	pipelineRunsGVR = k8s.GVR{
		Group: "tekton.dev", Version: "v1", Resource: "pipelineruns",
	}
	taskRunsGVR = k8s.GVR{
		Group: "tekton.dev", Version: "v1", Resource: "taskruns",
	}
)
53
// tektonWorkflowConfig is the Tekton-specific subset of workflow YAML.
// `pipeline` names an existing in-cluster Tekton Pipeline. Params are
// deliberately string-only in v1: tack is meant to select an existing
// runner and pass a small amount of routing data, not mirror Tekton's
// entire PipelineRun API.
type tektonWorkflowConfig struct {
	// Pipeline is required; it becomes spec.pipelineRef.name.
	Pipeline string `yaml:"pipeline"`
	// ServiceAccount, when non-empty, becomes spec.serviceAccountName.
	ServiceAccount string `yaml:"service_account"`
	// Params become name/value entries in spec.params.
	Params map[string]string `yaml:"params"`
	// Workspaces become entries in spec.workspaces.
	Workspaces []tektonWorkspaceConfig `yaml:"workspaces"`
}
65
// tektonWorkspaceConfig describes one workspace binding for the
// PipelineRun. Exactly one volume source is used per entry: the first
// non-nil field among Storage, PVC, Secret and ConfigMap, checked in
// that order. An entry with none of them set is left out of the
// PipelineRun by buildTektonPipelineRun.
type tektonWorkspaceConfig struct {
	// Name is the workspace name written into the binding.
	Name string `yaml:"name"`
	// AccessModes is only consulted together with Storage, where it
	// fills the volume claim template's accessModes.
	AccessModes []string `yaml:"access_modes"`
	// Storage is the storage request for a volumeClaimTemplate.
	Storage *string `yaml:"storage"`
	// PVC is the claimName of an existing persistentVolumeClaim.
	PVC *string `yaml:"pvc"`
	// Secret is the secretName to bind.
	Secret *string `yaml:"secret"`
	// ConfigMap is the name of the config map to bind.
	ConfigMap *string `yaml:"config_map"`
}
74
// tektonWorkflowDoc is the decoding envelope for a workflow body: it
// exists only so parseTektonWorkflowConfig can reach the nested
// `tack.tekton` mapping. Keys outside that path are not represented.
type tektonWorkflowDoc struct {
	Tack struct {
		Tekton tektonWorkflowConfig `yaml:"tekton"`
	} `yaml:"tack"`
}
80
81// parseTektonWorkflowConfig decodes `tack.tekton` from a workflow body.
82func parseTektonWorkflowConfig(raw string) (*tektonWorkflowConfig, error) {
83 if strings.TrimSpace(raw) == "" {
84 return nil, errors.New("workflow body is empty")
85 }
86 var doc tektonWorkflowDoc
87 if err := yaml.Unmarshal([]byte(raw), &doc); err != nil {
88 return nil, fmt.Errorf("parse workflow yaml: %w", err)
89 }
90 cfg := doc.Tack.Tekton
91 if cfg.Pipeline == "" {
92 return nil, errors.New("workflow yaml: `tack.tekton.pipeline` is required")
93 }
94 return &cfg, nil
95}
96
// tektonProvider creates Tekton PipelineRuns for Tangled workflows,
// reports their status, and streams their logs.
type tektonProvider struct {
	// br is where publishStatus sends pipeline status records.
	br *broker
	// st persists and looks up the workflow-to-PipelineRun mapping.
	st *store
	// log is the provider-scoped logger.
	log *slog.Logger
	// client performs every Kubernetes API call made by the provider.
	client k8s.Client
	// namespace is where PipelineRuns are created.
	namespace string
}

// Compile-time check that *tektonProvider satisfies Provider.
var _ Provider = (*tektonProvider)(nil)
106
107func newTektonProvider(
108 br *broker,
109 st *store,
110 client k8s.Client,
111 namespace string,
112 log *slog.Logger,
113) *tektonProvider {
114 return &tektonProvider{
115 br: br,
116 st: st,
117 log: log.With("component", "provider", "kind", "tekton"),
118 client: client,
119 namespace: namespace,
120 }
121}
122
123func newInClusterTektonProvider(
124 br *broker,
125 st *store,
126 namespace string,
127 log *slog.Logger,
128) (*tektonProvider, error) {
129 client, err := k8s.NewInClusterClient()
130 if err != nil {
131 return nil, fmt.Errorf("configure in-cluster kubernetes client: %w", err)
132 }
133 return newTektonProvider(br, st, client, namespace, log), nil
134}
135
136func (p *tektonProvider) Spawn(
137 ctx context.Context,
138 knot string,
139 pipelineRkey string,
140 actor string,
141 trigger *tangled.Pipeline_TriggerMetadata,
142 workflows []*tangled.Pipeline_Workflow,
143) {
144 if len(workflows) == 0 {
145 p.log.Warn("pipeline has no workflows; nothing to spawn",
146 "knot", knot, "rkey", pipelineRkey,
147 )
148 return
149 }
150 for _, wf := range workflows {
151 if wf == nil || wf.Name == "" {
152 continue
153 }
154 wf := wf
155 go p.spawnWorkflow(ctx, knot, pipelineRkey, actor, trigger, wf)
156 }
157}
158
159func (p *tektonProvider) spawnWorkflow(
160 ctx context.Context,
161 knot string,
162 pipelineRkey string,
163 actor string,
164 trigger *tangled.Pipeline_TriggerMetadata,
165 wf *tangled.Pipeline_Workflow,
166) {
167 logger := p.log.With(
168 "knot", knot,
169 "pipeline_rkey", pipelineRkey,
170 "workflow", wf.Name,
171 "actor", actor,
172 )
173
174 cfg, err := parseTektonWorkflowConfig(wf.Raw)
175 if err != nil {
176 logger.Error("invalid workflow config; refusing to spawn", "err", err)
177 return
178 }
179 commit, branch := triggerCommitAndBranch(trigger)
180 name := tektonPipelineRunName(knot, pipelineRkey, wf.Name, commit, branch)
181 pr := buildTektonPipelineRun(
182 p.namespace, name, cfg, knot, pipelineRkey, actor, commit, branch, wf,
183 )
184
185 created, err := p.client.CreateObject(ctx, pipelineRunsGVR, p.namespace, pr)
186 if errors.Is(err, k8s.ErrAlreadyExists) {
187 created, err = p.client.GetObject(ctx, pipelineRunsGVR, p.namespace, name)
188 }
189 if err != nil {
190 logger.Error("create tekton PipelineRun", "err", err,
191 "namespace", p.namespace, "pipeline_run", name,
192 "pipeline", cfg.Pipeline,
193 )
194 return
195 }
196
197 ref := TektonRunRef{
198 Knot: knot,
199 PipelineRkey: pipelineRkey,
200 Workflow: wf.Name,
201 Namespace: p.namespace,
202 PipelineRunName: name,
203 PipelineRunUID: created.GetUID(),
204 PipelineName: cfg.Pipeline,
205 PipelineURI: pipelineATURI(knot, pipelineRkey),
206 }
207 if err := p.st.InsertTektonRun(ctx, ref); err != nil {
208 logger.Error("persist tekton run mapping", "err", err,
209 "pipeline_run", name,
210 )
211 return
212 }
213
214 if err := p.publishStatus(ctx, ref.PipelineURI, wf.Name,
215 "pending", name, nil, nil); err != nil {
216 logger.Error("publish initial pending status", "err", err)
217 }
218
219 logger.Info("tekton PipelineRun created",
220 "namespace", p.namespace,
221 "pipeline", cfg.Pipeline,
222 "pipeline_run", name,
223 "uid", ref.PipelineRunUID,
224 )
225 go p.watchPipelineRun(ctx, ref)
226}
227
228func buildTektonPipelineRun(
229 namespace, name string,
230 cfg *tektonWorkflowConfig,
231 knot, pipelineRkey, actor, commit, branch string,
232 wf *tangled.Pipeline_Workflow,
233) k8s.Object {
234 obj := k8s.Object{
235 "apiVersion": tektonAPIVersion,
236 "kind": tektonRunKind,
237 "metadata": map[string]any{
238 "name": name,
239 "namespace": namespace,
240 "labels": map[string]any{
241 tektonLabelManagedBy: "tack",
242 tektonLabelPipelineRkey: labelValue(pipelineRkey),
243 tektonLabelWorkflow: labelValue(wf.Name),
244 },
245 "annotations": map[string]any{
246 tektonAnnotationKnot: knot,
247 tektonAnnotationPipelineRkey: pipelineRkey,
248 tektonAnnotationWorkflow: wf.Name,
249 tektonAnnotationActor: actor,
250 tektonAnnotationCommit: commit,
251 tektonAnnotationBranch: branch,
252 },
253 },
254 "spec": map[string]any{
255 "pipelineRef": map[string]any{
256 "name": cfg.Pipeline,
257 },
258 "params": []any{
259 map[string]any{
260 "name": "commit",
261 "value": commit,
262 },
263 map[string]any{
264 "name": "branch",
265 "value": branch,
266 },
267 map[string]any{
268 "name": "actor",
269 "value": actor,
270 },
271 },
272 },
273 }
274
275 spec := obj["spec"].(map[string]any)
276
277 if len(cfg.Workspaces) != 0 {
278 spec["podTemplate"] = map[string]any{
279 "securityContext": map[string]any{
280 "fsGroup": 65532,
281 },
282 }
283
284 workspaces := []any{}
285
286 for _, ws := range cfg.Workspaces {
287 switch {
288 case ws.Storage != nil:
289 workspaces = append(workspaces, map[string]any{
290 "name": ws.Name,
291 "volumeClaimTemplate": map[string]any{
292 "spec": map[string]any{
293 "accessModes": ws.AccessModes,
294 "resources": map[string]any{
295 "requests": map[string]any{
296 "storage": *ws.Storage,
297 },
298 },
299 },
300 },
301 })
302
303 case ws.PVC != nil:
304 workspaces = append(workspaces, map[string]any{
305 "name": ws.Name,
306 "persistentVolumeClaim": map[string]any{
307 "claimName": *ws.PVC,
308 },
309 })
310
311 case ws.Secret != nil:
312 workspaces = append(workspaces, map[string]any{
313 "name": ws.Name,
314 "secret": map[string]any{
315 "secretName": *ws.Secret,
316 },
317 })
318
319 case ws.ConfigMap != nil:
320 workspaces = append(workspaces, map[string]any{
321 "name": ws.Name,
322 "configMap": map[string]any{
323 "name": *ws.ConfigMap,
324 },
325 })
326 }
327 }
328
329 spec["workspaces"] = workspaces
330 }
331
332 if cfg.ServiceAccount != "" {
333 spec["serviceAccountName"] = cfg.ServiceAccount
334 }
335 if len(cfg.Params) > 0 {
336 keys := make([]string, 0, len(cfg.Params))
337 for key := range cfg.Params {
338 keys = append(keys, key)
339 }
340 sort.Strings(keys)
341 params := make([]any, 0, len(keys))
342 for _, key := range keys {
343 params = append(params, map[string]any{
344 "name": key,
345 "value": cfg.Params[key],
346 })
347 }
348 spec["params"] = params
349 }
350 return obj
351}
352
353func (p *tektonProvider) watchPipelineRun(ctx context.Context, ref TektonRunRef) {
354 logger := p.log.With(
355 "knot", ref.Knot,
356 "pipeline_rkey", ref.PipelineRkey,
357 "workflow", ref.Workflow,
358 "namespace", ref.Namespace,
359 "pipeline_run", ref.PipelineRunName,
360 )
361
362 logger.Debug("watchPipelineRun: starting")
363
364 last := ""
365 if obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace,
366 ref.PipelineRunName); err == nil {
367 status, terminal, ok := mapTektonPipelineRunStatus(obj)
368 logger.Debug("watchPipelineRun: initial status read",
369 "status", status, "terminal", terminal, "ok", ok,
370 )
371 if ok {
372 last = status
373 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
374 status, ref.PipelineRunName, nil, nil); err != nil {
375 logger.Error("publish tekton status", "err", err, "status", status)
376 }
377 if terminal {
378 logger.Debug("watchPipelineRun: already terminal on initial read; exiting", "status", status)
379 return
380 }
381 }
382 } else if errors.Is(err, k8s.ErrNotFound) {
383 logger.Warn("PipelineRun disappeared while watching")
384 return
385 } else {
386 logger.Debug("initial PipelineRun status read", "err", err)
387 }
388
389 w, err := p.client.WatchObjects(ctx, pipelineRunsGVR, ref.Namespace,
390 k8s.ListOptions{FieldSelector: "metadata.name=" + ref.PipelineRunName},
391 )
392 if err != nil {
393 logger.Debug("watchPipelineRun: watch failed; falling back to polling", "err", err)
394 p.pollPipelineRun(ctx, ref, logger, last)
395 return
396 }
397 defer w.Stop()
398
399 logger.Debug("watchPipelineRun: watch established; entering event loop")
400 for {
401 select {
402 case <-ctx.Done():
403 logger.Debug("watchPipelineRun: context cancelled")
404 return
405 case ev, ok := <-w.ResultChan():
406 if !ok {
407 logger.Debug("watchPipelineRun: watch channel closed; falling back to polling")
408 p.pollPipelineRun(ctx, ref, logger, last)
409 return
410 }
411 status, terminal, ok := mapTektonPipelineRunStatus(ev.Object)
412 logger.Debug("watchPipelineRun: watch event",
413 "event_type", ev.Type,
414 "status", status, "terminal", terminal, "ok", ok, "last", last,
415 )
416 if !ok || status == last {
417 if terminal {
418 logger.Debug("watchPipelineRun: terminal status unchanged; exiting", "status", status)
419 return
420 }
421 continue
422 }
423 last = status
424 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
425 status, ref.PipelineRunName, nil, nil); err != nil {
426 logger.Error("publish tekton status", "err", err, "status", status)
427 continue
428 }
429 logger.Debug("watchPipelineRun: published status", "status", status, "terminal", terminal)
430 if terminal {
431 logger.Debug("watchPipelineRun: terminal status reached; exiting", "status", status)
432 return
433 }
434 }
435 }
436}
437
438func (p *tektonProvider) pollPipelineRun(
439 ctx context.Context,
440 ref TektonRunRef,
441 logger *slog.Logger,
442 last string,
443) {
444 logger.Debug("pollPipelineRun: starting poll loop", "interval", "5s")
445 ticker := time.NewTicker(5 * time.Second)
446 defer ticker.Stop()
447 for {
448 select {
449 case <-ctx.Done():
450 logger.Debug("pollPipelineRun: context cancelled")
451 return
452 case <-ticker.C:
453 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace,
454 ref.PipelineRunName,
455 )
456 if errors.Is(err, k8s.ErrNotFound) {
457 logger.Warn("PipelineRun disappeared while watching")
458 return
459 }
460 if err != nil {
461 logger.Debug("get PipelineRun status", "err", err)
462 continue
463 }
464 status, terminal, ok := mapTektonPipelineRunStatus(obj)
465 logger.Debug("pollPipelineRun: poll tick",
466 "status", status, "terminal", terminal, "ok", ok, "last", last,
467 )
468 if !ok || status == last {
469 if terminal {
470 logger.Debug("pollPipelineRun: terminal status unchanged; exiting", "status", status)
471 return
472 }
473 continue
474 }
475 last = status
476 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
477 status, ref.PipelineRunName, nil, nil); err != nil {
478 logger.Error("publish tekton status", "err", err, "status", status)
479 continue
480 }
481 logger.Debug("pollPipelineRun: published status", "status", status, "terminal", terminal)
482 if terminal {
483 logger.Debug("pollPipelineRun: terminal status reached; exiting", "status", status)
484 return
485 }
486 }
487 }
488}
489
490// mapTektonPipelineRunStatus translates Tekton's Succeeded condition
491// into the Tangled status strings consumed by the appview.
492func mapTektonPipelineRunStatus(obj k8s.Object) (status string, terminal bool, ok bool) {
493 conditions, ok := obj.NestedSlice("status", "conditions")
494 if !ok || len(conditions) == 0 {
495 slog.Debug("mapTektonPipelineRunStatus: no conditions found",
496 "pipeline_run", obj.GetName(),
497 )
498 return "", false, false
499 }
500 for _, raw := range conditions {
501 cond, _ := raw.(map[string]interface{})
502 condType, _ := cond["type"].(string)
503 condStatus, _ := cond["status"].(string)
504 reason, _ := cond["reason"].(string)
505 message, _ := cond["message"].(string)
506 slog.Debug("mapTektonPipelineRunStatus: condition",
507 "pipeline_run", obj.GetName(),
508 "type", condType,
509 "status", condStatus,
510 "reason", reason,
511 "message", message,
512 )
513 if condType != "Succeeded" {
514 continue
515 }
516 switch condStatus {
517 case "True":
518 return "success", true, true
519 case "False":
520 if tektonReasonCancelled(reason) {
521 return "cancelled", true, true
522 }
523 return "failed", true, true
524 case "Unknown":
525 return "running", false, true
526 default:
527 return "", false, false
528 }
529 }
530 return "", false, false
531}
532
// tektonReasonCancelled reports whether a condition reason reads as a
// cancellation: it contains "cancel" or "stop", ignoring case.
func tektonReasonCancelled(reason string) bool {
	lowered := strings.ToLower(reason)
	for _, marker := range []string{"cancel", "stop"} {
		if strings.Contains(lowered, marker) {
			return true
		}
	}
	return false
}
537
538func (p *tektonProvider) Logs(
539 ctx context.Context,
540 knot string,
541 pipelineRkey string,
542 workflow string,
543) (<-chan LogLine, error) {
544 ref, err := p.st.LookupTektonRunByTuple(ctx, knot, pipelineRkey, workflow)
545 if err != nil {
546 return nil, fmt.Errorf("lookup tekton run mapping: %w", err)
547 }
548 if ref == nil {
549 // No mapping at all means this provider never spawned a
550 // PipelineRun for the requested tuple, so a 404 is the
551 // honest answer.
552 return nil, ErrLogsNotFound
553 }
554
555 // At this point the workflow *was* spawned — we have a row in the
556 // store mapping the tuple to a PipelineRun. The TaskRuns the
557 // PipelineRun fans out to are created asynchronously by Tekton
558 // once the run is admitted, so a freshly-spawned or still-queueing
559 // PipelineRun will momentarily report zero TaskRuns. Returning
560 // ErrLogsNotFound here would mistranslate that into a 404 at the
561 // HTTP layer (see provider.go contract: ErrLogsNotFound means the
562 // workflow never ran), making just-spawned runs look nonexistent
563 // to the appview. Instead, hand back an open channel and poll
564 // inside the goroutine until TaskRuns materialize, ctx is
565 // cancelled, or the PipelineRun reaches a terminal state with
566 // nothing to stream.
567 out := make(chan LogLine, 32)
568 go func() {
569 defer close(out)
570 p.streamPipelineRunLogs(ctx, out, *ref)
571 }()
572 return out, nil
573}
574
575// streamPipelineRunLogs drives the Logs channel for a single PipelineRun.
576// It polls for TaskRuns until the PipelineRun is terminal (or ctx is
577// cancelled), streaming each TaskRun's logs exactly once.
578//
579// The loop is deliberately a poll rather than a one-shot pass for two
580// reasons:
581//
582// 1. Live runs need follow semantics. Streaming a still-running TaskRun
583// uses Follow=true on the pod log stream, so streamTaskRunLogs only
584// returns once the container actually terminates. The outer loop
585// then re-checks for new TaskRuns and the PipelineRun's terminal
586// state, instead of closing the channel mid-run.
587//
588// 2. PipelineRuns can spawn additional TaskRuns over time (sequential
589// `runAfter` tasks, finally blocks, retries). A single snapshot
590// would silently drop any TaskRun that appears after the snapshot,
591// even though the workflow is still running.
592//
593// The loop terminates only when the PipelineRun is terminal AND the
594// most recent listing produced no new TaskRuns, which together mean no
595// further TaskRuns will ever appear.
596func (p *tektonProvider) streamPipelineRunLogs(
597 ctx context.Context,
598 out chan<- LogLine,
599 ref TektonRunRef,
600) {
601 const pollInterval = 1 * time.Second
602 seen := map[string]bool{}
603 stepID := 0
604 for {
605 if err := ctx.Err(); err != nil {
606 return
607 }
608
609 taskRuns, err := p.taskRunsForPipelineRun(ctx, ref)
610 if err != nil {
611 p.log.Debug("Logs: list TaskRuns failed",
612 "err", err, "pipeline_run", ref.PipelineRunName,
613 )
614 }
615 // Snapshot the terminal state *after* the listing so we never
616 // observe terminal=true while still missing a TaskRun that
617 // existed at list time. The reverse race (terminal observed
618 // before a TaskRun spawn) is handled by the next iteration: we
619 // only exit when terminal is true AND no new TaskRuns showed up
620 // in this pass.
621 terminal := p.isPipelineRunTerminal(ctx, ref)
622
623 var fresh []k8s.Object
624 for _, tr := range taskRuns {
625 name := tr.GetName()
626 if name == "" || seen[name] {
627 continue
628 }
629 seen[name] = true
630 fresh = append(fresh, tr)
631 }
632 p.log.Debug("Logs: poll iteration",
633 "pipeline_run", ref.PipelineRunName,
634 "task_runs_total", len(taskRuns),
635 "task_runs_new", len(fresh),
636 "terminal", terminal,
637 )
638
639 for _, tr := range fresh {
640 taskName := tr.GetName()
641 if taskName == "" {
642 taskName = fmt.Sprintf("task %d", stepID)
643 }
644 p.log.Debug("Logs: streaming TaskRun",
645 "task_run", taskName, "step_id", stepID, "terminal", terminal,
646 )
647 if !sendLine(ctx, out, LogLine{
648 Kind: LogKindControl,
649 Time: time.Now(),
650 Content: taskName,
651 StepId: stepID,
652 StepStatus: StepStatusStart,
653 }) {
654 return
655 }
656
657 // terminal is the snapshot taken at the top of this
658 // iteration. If the pipeline was terminal then, all
659 // TaskRuns we see are guaranteed complete and we can
660 // take the snapshot fast-path; otherwise we follow the
661 // pod logs live so StepStatusEnd is only emitted once
662 // the container actually exits.
663 if terminal {
664 p.fetchCompletedTaskRunLogs(ctx, out, ref, tr, stepID)
665 } else {
666 p.streamTaskRunLogs(ctx, out, ref, tr, stepID)
667 }
668
669 if !sendLine(ctx, out, LogLine{
670 Kind: LogKindControl,
671 Time: time.Now(),
672 Content: taskName,
673 StepId: stepID,
674 StepStatus: StepStatusEnd,
675 }) {
676 return
677 }
678 p.log.Debug("Logs: finished TaskRun",
679 "task_run", taskName, "step_id", stepID,
680 )
681 stepID++
682 }
683
684 // Done condition: the PipelineRun is terminal AND we found no
685 // new TaskRuns this iteration. Both are required because a
686 // terminal PipelineRun can still expose a freshly-listed
687 // TaskRun whose pod we haven't drained yet.
688 if terminal && len(fresh) == 0 {
689 p.log.Debug("Logs: pipeline run terminal, no new TaskRuns",
690 "pipeline_run", ref.PipelineRunName,
691 )
692 return
693 }
694
695 // When we processed new TaskRuns this round, loop again
696 // immediately to re-list — Follow=true streaming may have
697 // blocked us long enough for additional TaskRuns or terminal
698 // transitions to have happened.
699 if len(fresh) > 0 {
700 continue
701 }
702
703 select {
704 case <-ctx.Done():
705 return
706 case <-time.After(pollInterval):
707 }
708 }
709}
710
711// isPipelineRunTerminal returns true if the PipelineRun is in a terminal state right now.
712func (p *tektonProvider) isPipelineRunTerminal(ctx context.Context, ref TektonRunRef) bool {
713 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace,
714 ref.PipelineRunName,
715 )
716 if err != nil {
717 p.log.Debug("isPipelineRunTerminal: failed to get PipelineRun", "err", err, "pipeline_run", ref.PipelineRunName)
718 return false
719 }
720 _, terminal, ok := mapTektonPipelineRunStatus(obj)
721 p.log.Debug("isPipelineRunTerminal: status check", "pipeline_run", ref.PipelineRunName, "terminal", terminal, "ok", ok)
722 return ok && terminal
723}
724
725// fetchCompletedTaskRunLogs fetches all logs from a TaskRun that has already completed.
726// It reads each step container's logs in one shot using all-containers traversal,
727// and also inlines the Tekton step status from the TaskRun (exit code, reason) as
728// control messages so the caller gets full context without needing to watch for events.
729func (p *tektonProvider) fetchCompletedTaskRunLogs(
730 ctx context.Context,
731 out chan<- LogLine,
732 ref TektonRunRef,
733 tr k8s.Object,
734 stepID int,
735) {
736 trName := tr.GetName()
737 pods, err := p.podsForTaskRun(ctx, ref.Namespace, trName)
738 if err != nil {
739 p.log.Debug("fetchCompletedTaskRunLogs: list pods failed", "err", err,
740 "task_run", trName, "pipeline_run", ref.PipelineRunName,
741 )
742 return
743 }
744 p.log.Debug("fetchCompletedTaskRunLogs: found pods",
745 "task_run", trName, "pod_count", len(pods),
746 )
747
748 // Emit a summary line from the TaskRun status (steps[*].terminated) so we
749 // get exit codes and reasons even if the pod logs are sparse.
750 steps, _ := tr.NestedSlice("status", "steps")
751 for _, rawStep := range steps {
752 step, _ := rawStep.(map[string]any)
753 stepName, _ := step["name"].(string)
754 term, _ := step["terminated"].(map[string]any)
755 if term == nil {
756 continue
757 }
758 exitCode := numberToInt64(term["exitCode"])
759 reason, _ := term["reason"].(string)
760 msg, _ := term["message"].(string)
761 line := fmt.Sprintf("[step %s] exit=%d reason=%s", stepName, exitCode, reason)
762 if msg != "" {
763 line += " " + msg
764 }
765 p.log.Debug("fetchCompletedTaskRunLogs: step terminated",
766 "task_run", trName, "step", stepName,
767 "exit_code", exitCode, "reason", reason,
768 )
769 if !sendLine(ctx, out, LogLine{
770 Kind: LogKindData,
771 Time: time.Now(),
772 Content: line + "\n",
773 StepId: stepID,
774 Stream: "stdout",
775 }) {
776 return
777 }
778 }
779
780 for _, pod := range pods {
781 containers := append([]k8s.Container(nil), pod.InitContainers...)
782 containers = append(containers, pod.Containers...)
783 p.log.Debug("fetchCompletedTaskRunLogs: reading pod containers",
784 "pod", pod.Name, "container_count", len(containers),
785 )
786 for _, c := range containers {
787 p.log.Debug("fetchCompletedTaskRunLogs: reading container logs",
788 "pod", pod.Name, "container", c.Name,
789 )
790 // Snapshot read (Follow=false) is correct here: the TaskRun
791 // has already terminated, so the API server has the full log
792 // available and there is nothing more to wait for.
793 rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name,
794 k8s.LogOptions{Follow: false},
795 )
796 if err != nil {
797 p.log.Debug("fetchCompletedTaskRunLogs: stream failed", "err", err,
798 "pod", pod.Name, "container", c.Name,
799 )
800 continue
801 }
802 p.sendReaderLines(ctx, out, rc, stepID)
803 _ = rc.Close()
804 p.log.Debug("fetchCompletedTaskRunLogs: done reading container",
805 "pod", pod.Name, "container", c.Name,
806 )
807 }
808 }
809}
810
811func (p *tektonProvider) taskRunsForPipelineRun(ctx context.Context, ref TektonRunRef) ([]k8s.Object, error) {
812 list, err := p.client.ListObjects(ctx, taskRunsGVR, ref.Namespace, k8s.ListOptions{
813 LabelSelector: "tekton.dev/pipelineRun=" + ref.PipelineRunName,
814 })
815 if err != nil {
816 return nil, fmt.Errorf("list Tekton TaskRuns: %w", err)
817 }
818 items := append([]k8s.Object(nil), list...)
819 sort.Slice(items, func(i, j int) bool {
820 ti := items[i].GetCreationTimestamp()
821 tj := items[j].GetCreationTimestamp()
822 return ti.Before(tj)
823 })
824 return items, nil
825}
826
827func (p *tektonProvider) streamTaskRunLogs(
828 ctx context.Context,
829 out chan<- LogLine,
830 ref TektonRunRef,
831 tr k8s.Object,
832 stepID int,
833) {
834 pods, err := p.podsForTaskRun(ctx, ref.Namespace, tr.GetName())
835 if err != nil {
836 p.log.Debug("streamTaskRunLogs: list pods for TaskRun failed", "err", err,
837 "task_run", tr.GetName(), "pipeline_run", ref.PipelineRunName,
838 )
839 return
840 }
841 p.log.Debug("streamTaskRunLogs: found pods",
842 "task_run", tr.GetName(), "pod_count", len(pods),
843 )
844 for _, pod := range pods {
845 containers := append([]k8s.Container(nil), pod.InitContainers...)
846 containers = append(containers, pod.Containers...)
847 p.log.Debug("streamTaskRunLogs: streaming pod containers",
848 "pod", pod.Name, "container_count", len(containers),
849 )
850 for _, c := range containers {
851 p.log.Debug("streamTaskRunLogs: streaming container",
852 "pod", pod.Name, "container", c.Name, "step_id", stepID,
853 )
854 // Live tail with Follow=true: the apiserver holds the
855 // connection open until the container terminates (or ctx is
856 // cancelled), so sendReaderLines only returns once the
857 // container is actually done. Without this, the read EOFs at
858 // the current tail position and the caller emits a spurious
859 // StepStatusEnd while the step is still running.
860 rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name,
861 k8s.LogOptions{Follow: true},
862 )
863 if err != nil {
864 p.log.Debug("streamTaskRunLogs: stream pod logs failed", "err", err,
865 "pod", pod.Name, "container", c.Name,
866 )
867 continue
868 }
869 p.sendReaderLines(ctx, out, rc, stepID)
870 _ = rc.Close()
871 p.log.Debug("streamTaskRunLogs: finished container",
872 "pod", pod.Name, "container", c.Name,
873 )
874 }
875 }
876}
877
878func (p *tektonProvider) podsForTaskRun(ctx context.Context, namespace, taskRun string) ([]k8s.Pod, error) {
879 list, err := p.client.ListPods(ctx, namespace,
880 "tekton.dev/taskRun="+taskRun,
881 )
882 if err != nil {
883 return nil, fmt.Errorf("list pods: %w", err)
884 }
885 pods := append([]k8s.Pod(nil), list...)
886 sort.Slice(pods, func(i, j int) bool {
887 return pods[i].CreationTimestamp.Before(pods[j].CreationTimestamp)
888 })
889 return pods, nil
890}
891
892func (p *tektonProvider) sendReaderLines(
893 ctx context.Context,
894 out chan<- LogLine,
895 rc io.Reader,
896 stepID int,
897) {
898 scanner := bufio.NewScanner(rc)
899 for scanner.Scan() {
900 if !sendLine(ctx, out, LogLine{
901 Kind: LogKindData,
902 Time: time.Now(),
903 Content: scanner.Text() + "\n",
904 StepId: stepID,
905 Stream: "stdout",
906 }) {
907 return
908 }
909 }
910 if err := scanner.Err(); err != nil {
911 p.log.Debug("scan pod log", "err", err)
912 }
913}
914
// numberToInt64 converts a loosely-typed decoded number to int64.
//
// It accepts int64, int, float64 (truncated toward zero) and
// json.Number; any other type, including nil, yields 0.
//
// A json.Number is parsed as an integer first. If that fails because
// the literal is written in float form ("3.0", "1e2"), it is parsed as
// a float and truncated, provided the value fits in an int64. Before
// this fallback such literals silently became 0. In every other
// failure case the integer parser's result is returned unchanged.
func numberToInt64(value any) int64 {
	// 2^63 is exactly representable as a float64; valid int64 values
	// lie in [-2^63, 2^63).
	const int64Limit = float64(1 << 63)

	switch v := value.(type) {
	case int64:
		return v
	case int:
		return int64(v)
	case float64:
		return int64(v)
	case json.Number:
		i, err := v.Int64()
		if err == nil {
			return i
		}
		if f, ferr := v.Float64(); ferr == nil && f >= -int64Limit && f < int64Limit {
			return int64(f)
		}
		return i
	default:
		return 0
	}
}
930
931func (p *tektonProvider) publishStatus(
932 ctx context.Context,
933 pipelineURI, workflow, status, runName string,
934 errMsg *string,
935 exitCode *int64,
936) error {
937 rec := tangled.PipelineStatus{
938 LexiconTypeID: tangled.PipelineStatusNSID,
939 Pipeline: pipelineURI,
940 Workflow: workflow,
941 Status: status,
942 CreatedAt: time.Now().UTC().Format(time.RFC3339),
943 Error: errMsg,
944 ExitCode: exitCode,
945 }
946 body, err := json.Marshal(rec)
947 if err != nil {
948 return fmt.Errorf("marshal pipeline.status: %w", err)
949 }
950 rkey := fmt.Sprintf("tk-%s-%s-%d", runName, status, time.Now().UnixNano())
951 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil {
952 return fmt.Errorf("publish pipeline.status: %w", err)
953 }
954 return nil
955}
956
957func tektonPipelineRunName(knot, pipelineRkey, workflow, commit, branch string) string {
958 h := sha256.Sum256([]byte(strings.Join(
959 []string{knot, pipelineRkey, workflow, commit, branch}, "\x00",
960 )))
961 suffix := hex.EncodeToString(h[:])[:12]
962 base := dnsLabel("tack-" + workflow)
963 maxBase := 63 - len(suffix) - 1
964 if len(base) > maxBase {
965 base = strings.TrimRight(base[:maxBase], "-")
966 }
967 if base == "" {
968 base = "tack"
969 }
970 return base + "-" + suffix
971}
972
// dnsLabel lower-cases s and reduces it to the characters allowed in a
// DNS label: ASCII letters, ASCII digits and single dashes. Every run
// of other characters collapses into one dash, and leading and
// trailing dashes are removed. The result may be empty.
//
// Only ASCII letters and digits are kept. Kubernetes object names and
// label values do not allow non-ASCII characters, so letting letters
// such as "é" or "日" through would yield identifiers the API server
// rejects. Keeping the output ASCII also means callers that truncate
// by byte index cannot split a multi-byte character.
func dnsLabel(s string) string {
	var b strings.Builder
	lastDash := false
	for _, r := range strings.ToLower(s) {
		ok := r <= unicode.MaxASCII && (unicode.IsLetter(r) || unicode.IsDigit(r))
		if ok {
			b.WriteRune(r)
			lastDash = false
			continue
		}
		// Collapse consecutive separators into a single dash.
		if !lastDash {
			b.WriteByte('-')
			lastDash = true
		}
	}
	return strings.Trim(b.String(), "-")
}
990
991func labelValue(s string) string {
992 v := dnsLabel(s)
993 if len(v) > 63 {
994 v = strings.TrimRight(v[:63], "-")
995 }
996 if v == "" {
997 return "unknown"
998 }
999 return v
1000}