Stitch any CI into Tangled
1package main
2
3// tektonProvider implements Provider by creating Tekton PipelineRuns
4// directly inside the Kubernetes cluster. Tack already receives and
5// authorizes Tangled pipeline triggers, so adding Tekton Triggers would
6// duplicate the event-to-run translation layer instead of simplifying it.
7
8import (
9 "bufio"
10 "context"
11 "crypto/sha256"
12 "encoding/hex"
13 "encoding/json"
14 "errors"
15 "fmt"
16 "io"
17 "log/slog"
18 "sort"
19 "strings"
20 "time"
21 "unicode"
22
23 "tangled.org/core/api/tangled"
24
25 "go.mitchellh.com/tack/internal/k8s"
26 "go.yaml.in/yaml/v2"
27)
28
const (
	// API coordinates written into the apiVersion and kind fields of
	// every PipelineRun object built by buildTektonPipelineRun.
	tektonAPIVersion = "tekton.dev/v1"
	tektonRunKind    = "PipelineRun"

	// Label keys set on each PipelineRun. The managed-by label always
	// carries the literal "tack"; the other two carry the pipeline
	// record key and workflow name after labelValue sanitization.
	tektonLabelManagedBy    = "tack.mitchellh.com/managed-by"
	tektonLabelPipelineRkey = "tack.mitchellh.com/pipeline-rkey"
	tektonLabelWorkflow     = "tack.mitchellh.com/workflow"

	// Annotation keys set on each PipelineRun. Annotations hold the
	// values exactly as received (no sanitization). The pipeline-rkey
	// and workflow keys are the same strings as the matching label keys.
	tektonAnnotationKnot         = "tack.mitchellh.com/knot"
	tektonAnnotationPipelineRkey = "tack.mitchellh.com/pipeline-rkey"
	tektonAnnotationWorkflow     = "tack.mitchellh.com/workflow"
	tektonAnnotationActor        = "tack.mitchellh.com/actor"
	tektonAnnotationCommit       = "tack.mitchellh.com/commit"
	tektonAnnotationBranch       = "tack.mitchellh.com/branch"
)
44
var (
	// pipelineRunsGVR identifies the tekton.dev/v1 "pipelineruns"
	// resource in calls to the k8s client (create, get, watch).
	pipelineRunsGVR = k8s.GVR{
		Group: "tekton.dev", Version: "v1", Resource: "pipelineruns",
	}
	// taskRunsGVR identifies the tekton.dev/v1 "taskruns" resource; it
	// is used when listing the TaskRuns that belong to a PipelineRun.
	taskRunsGVR = k8s.GVR{
		Group: "tekton.dev", Version: "v1", Resource: "taskruns",
	}
)
53
// tektonWorkflowConfig is the Tekton-specific subset of workflow YAML.
// `pipeline` names an existing in-cluster Tekton Pipeline. Params are
// deliberately string-only in v1: tack is meant to select an existing
// runner and pass a small amount of routing data, not mirror Tekton's
// entire PipelineRun API.
type tektonWorkflowConfig struct {
	// Pipeline is copied into spec.pipelineRef.name; parsing rejects an
	// empty value.
	Pipeline string `yaml:"pipeline"`
	// ServiceAccount, when non-empty, becomes
	// spec.taskRunTemplate.serviceAccountName.
	ServiceAccount string `yaml:"service_account"`
	// Params are merged over the built-in commit/branch/actor params;
	// a user entry with the same name replaces the built-in value.
	Params map[string]string `yaml:"params"`
	// Workspaces are translated one-to-one into spec.workspaces entries.
	Workspaces []tektonWorkspaceConfig `yaml:"workspaces"`
}
65
// tektonWorkspaceConfig describes one workspace binding. Exactly one of
// Storage, PVC, Secret or ConfigMap must be set; parseTektonWorkflowConfig
// rejects workspaces with zero or several sources. The source fields are
// pointers so that "absent" (nil) can be told apart from "present".
type tektonWorkspaceConfig struct {
	// Name is the workspace name; required.
	Name string `yaml:"name"`
	// AccessModes is only consumed for the Storage source, where it is
	// copied into the volumeClaimTemplate's accessModes.
	AccessModes []string `yaml:"access_modes"`
	// Storage requests a volumeClaimTemplate with this storage request.
	Storage *string `yaml:"storage"`
	// PVC binds an existing PersistentVolumeClaim by claim name.
	PVC *string `yaml:"pvc"`
	// Secret binds a Secret by name.
	Secret *string `yaml:"secret"`
	// ConfigMap binds a ConfigMap by name.
	ConfigMap *string `yaml:"config_map"`
}
74
// tektonWorkflowDoc is the decode target for a whole workflow body. Only
// the `tack.tekton` subtree is modelled; every other key in the document
// has no corresponding field here.
type tektonWorkflowDoc struct {
	Tack struct {
		Tekton tektonWorkflowConfig `yaml:"tekton"`
	} `yaml:"tack"`
}
80
81// parseTektonWorkflowConfig decodes `tack.tekton` from a workflow body.
82func parseTektonWorkflowConfig(raw string) (*tektonWorkflowConfig, error) {
83 if strings.TrimSpace(raw) == "" {
84 return nil, errors.New("workflow body is empty")
85 }
86 var doc tektonWorkflowDoc
87 if err := yaml.Unmarshal([]byte(raw), &doc); err != nil {
88 return nil, fmt.Errorf("parse workflow yaml: %w", err)
89 }
90 cfg := doc.Tack.Tekton
91 if cfg.Pipeline == "" {
92 return nil, errors.New(
93 "workflow yaml: `tack.tekton.pipeline` is required",
94 )
95 }
96
97 // Validate workspaces: each must have exactly one volume source
98 // so the resulting PipelineRun is unambiguous. Without this,
99 // a workspace with e.g. both `storage` and `pvc` set silently
100 // picks whichever the switch statement matches first.
101 for _, ws := range cfg.Workspaces {
102 if ws.Name == "" {
103 return nil, errors.New("workspace: name is required")
104 }
105 sources := 0
106 if ws.Storage != nil {
107 sources++
108 }
109 if ws.PVC != nil {
110 sources++
111 }
112 if ws.Secret != nil {
113 sources++
114 }
115 if ws.ConfigMap != nil {
116 sources++
117 }
118 if sources == 0 {
119 return nil, fmt.Errorf(
120 "workspace %q: no volume source specified", ws.Name,
121 )
122 }
123 if sources > 1 {
124 return nil, fmt.Errorf(
125 "workspace %q: multiple volume sources specified"+
126 " (pick one of storage, pvc, secret, config_map)",
127 ws.Name,
128 )
129 }
130 }
131
132 return &cfg, nil
133}
134
// tektonProvider runs Tangled workflows as Tekton PipelineRuns in a
// single Kubernetes namespace.
type tektonProvider struct {
	// br receives the pipeline.status records emitted by publishStatus.
	br *broker
	// st persists the (knot, rkey, workflow) -> PipelineRun mapping that
	// Logs later looks up.
	st *store
	// log is the provider-scoped logger.
	log *slog.Logger
	// client is the Kubernetes API client used for PipelineRuns,
	// TaskRuns, pods and pod logs.
	client k8s.Client
	// namespace is where PipelineRuns are created.
	namespace string
}

// Compile-time check that *tektonProvider implements Provider.
var _ Provider = (*tektonProvider)(nil)
144
145func newTektonProvider(
146 br *broker,
147 st *store,
148 client k8s.Client,
149 namespace string,
150 log *slog.Logger,
151) *tektonProvider {
152 return &tektonProvider{
153 br: br,
154 st: st,
155 log: log.With("component", "provider", "kind", "tekton"),
156 client: client,
157 namespace: namespace,
158 }
159}
160
161func newInClusterTektonProvider(
162 br *broker,
163 st *store,
164 namespace string,
165 log *slog.Logger,
166) (*tektonProvider, error) {
167 client, err := k8s.NewInClusterClient()
168 if err != nil {
169 return nil, fmt.Errorf("configure in-cluster kubernetes client: %w", err)
170 }
171 return newTektonProvider(br, st, client, namespace, log), nil
172}
173
174func (p *tektonProvider) Spawn(
175 ctx context.Context,
176 knot string,
177 pipelineRkey string,
178 actor string,
179 trigger *tangled.Pipeline_TriggerMetadata,
180 workflows []*tangled.Pipeline_Workflow,
181) {
182 if len(workflows) == 0 {
183 p.log.Warn("pipeline has no workflows; nothing to spawn",
184 "knot", knot, "rkey", pipelineRkey,
185 )
186 return
187 }
188 for _, wf := range workflows {
189 if wf == nil || wf.Name == "" {
190 continue
191 }
192 wf := wf
193 go p.spawnWorkflow(ctx, knot, pipelineRkey, actor, trigger, wf)
194 }
195}
196
197func (p *tektonProvider) spawnWorkflow(
198 ctx context.Context,
199 knot string,
200 pipelineRkey string,
201 actor string,
202 trigger *tangled.Pipeline_TriggerMetadata,
203 wf *tangled.Pipeline_Workflow,
204) {
205 logger := p.log.With(
206 "knot", knot,
207 "pipeline_rkey", pipelineRkey,
208 "workflow", wf.Name,
209 "actor", actor,
210 )
211
212 cfg, err := parseTektonWorkflowConfig(wf.Raw)
213 if err != nil {
214 logger.Error("invalid workflow config; refusing to spawn", "err", err)
215 return
216 }
217 commit, branch := triggerCommitAndBranch(trigger)
218 name := tektonPipelineRunName(knot, pipelineRkey, wf.Name, commit, branch)
219 pr := buildTektonPipelineRun(
220 p.namespace, name, cfg, knot, pipelineRkey, actor, commit, branch, wf,
221 )
222
223 created, err := p.client.CreateObject(ctx, pipelineRunsGVR, p.namespace, pr)
224 if errors.Is(err, k8s.ErrAlreadyExists) {
225 created, err = p.client.GetObject(ctx, pipelineRunsGVR, p.namespace, name)
226 }
227 if err != nil {
228 logger.Error("create tekton PipelineRun", "err", err,
229 "namespace", p.namespace, "pipeline_run", name,
230 "pipeline", cfg.Pipeline,
231 )
232 return
233 }
234
235 ref := TektonRunRef{
236 Knot: knot,
237 PipelineRkey: pipelineRkey,
238 Workflow: wf.Name,
239 Namespace: p.namespace,
240 PipelineRunName: name,
241 PipelineRunUID: created.GetUID(),
242 PipelineName: cfg.Pipeline,
243 PipelineURI: pipelineATURI(knot, pipelineRkey),
244 }
245 if err := p.st.InsertTektonRun(ctx, ref); err != nil {
246 logger.Error("persist tekton run mapping", "err", err,
247 "pipeline_run", name,
248 )
249 return
250 }
251
252 if err := p.publishStatus(ctx, ref.PipelineURI, wf.Name,
253 "pending", name, nil, nil); err != nil {
254 logger.Error("publish initial pending status", "err", err)
255 }
256
257 logger.Info("tekton PipelineRun created",
258 "namespace", p.namespace,
259 "pipeline", cfg.Pipeline,
260 "pipeline_run", name,
261 "uid", ref.PipelineRunUID,
262 )
263 go p.watchPipelineRun(ctx, ref)
264}
265
266func buildTektonPipelineRun(
267 namespace, name string,
268 cfg *tektonWorkflowConfig,
269 knot, pipelineRkey, actor, commit, branch string,
270 wf *tangled.Pipeline_Workflow,
271) k8s.Object {
272 obj := k8s.Object{
273 "apiVersion": tektonAPIVersion,
274 "kind": tektonRunKind,
275 "metadata": map[string]any{
276 "name": name,
277 "namespace": namespace,
278 "labels": map[string]any{
279 tektonLabelManagedBy: "tack",
280 tektonLabelPipelineRkey: labelValue(pipelineRkey),
281 tektonLabelWorkflow: labelValue(wf.Name),
282 },
283 "annotations": map[string]any{
284 tektonAnnotationKnot: knot,
285 tektonAnnotationPipelineRkey: pipelineRkey,
286 tektonAnnotationWorkflow: wf.Name,
287 tektonAnnotationActor: actor,
288 tektonAnnotationCommit: commit,
289 tektonAnnotationBranch: branch,
290 },
291 },
292 "spec": map[string]any{
293 "pipelineRef": map[string]any{
294 "name": cfg.Pipeline,
295 },
296 "params": []any{
297 map[string]any{
298 "name": "commit",
299 "value": commit,
300 },
301 map[string]any{
302 "name": "branch",
303 "value": branch,
304 },
305 map[string]any{
306 "name": "actor",
307 "value": actor,
308 },
309 },
310 },
311 }
312
313 spec := obj["spec"].(map[string]any)
314
315 if len(cfg.Workspaces) != 0 {
316 spec["podTemplate"] = map[string]any{
317 "securityContext": map[string]any{
318 "fsGroup": 65532,
319 },
320 }
321
322 workspaces := []any{}
323
324 for _, ws := range cfg.Workspaces {
325 switch {
326 case ws.Storage != nil:
327 workspaces = append(workspaces, map[string]any{
328 "name": ws.Name,
329 "volumeClaimTemplate": map[string]any{
330 "spec": map[string]any{
331 "accessModes": ws.AccessModes,
332 "resources": map[string]any{
333 "requests": map[string]any{
334 "storage": *ws.Storage,
335 },
336 },
337 },
338 },
339 })
340
341 case ws.PVC != nil:
342 workspaces = append(workspaces, map[string]any{
343 "name": ws.Name,
344 "persistentVolumeClaim": map[string]any{
345 "claimName": *ws.PVC,
346 },
347 })
348
349 case ws.Secret != nil:
350 workspaces = append(workspaces, map[string]any{
351 "name": ws.Name,
352 "secret": map[string]any{
353 "secretName": *ws.Secret,
354 },
355 })
356
357 case ws.ConfigMap != nil:
358 workspaces = append(workspaces, map[string]any{
359 "name": ws.Name,
360 "configMap": map[string]any{
361 "name": *ws.ConfigMap,
362 },
363 })
364 }
365 }
366
367 spec["workspaces"] = workspaces
368 }
369
370 if cfg.ServiceAccount != "" {
371 spec["taskRunTemplate"] = map[string]any{
372 "serviceAccountName": cfg.ServiceAccount,
373 }
374 }
375
376 // Merge user-defined params with the built-in ones (commit,
377 // branch, actor). User params that collide with a built-in name
378 // win, so callers can override the defaults when the upstream
379 // Tekton Pipeline expects a different shape.
380 if len(cfg.Params) > 0 {
381 builtins := map[string]string{
382 "commit": commit,
383 "branch": branch,
384 "actor": actor,
385 }
386
387 // Collect all param names, user params override built-ins.
388 merged := make(map[string]string, len(builtins)+len(cfg.Params))
389 for k, v := range builtins {
390 merged[k] = v
391 }
392 for k, v := range cfg.Params {
393 merged[k] = v
394 }
395
396 keys := make([]string, 0, len(merged))
397 for key := range merged {
398 keys = append(keys, key)
399 }
400 sort.Strings(keys)
401
402 params := make([]any, 0, len(keys))
403 for _, key := range keys {
404 params = append(params, map[string]any{
405 "name": key,
406 "value": merged[key],
407 })
408 }
409 spec["params"] = params
410 }
411 return obj
412}
413
414func (p *tektonProvider) watchPipelineRun(ctx context.Context, ref TektonRunRef) {
415 logger := p.log.With(
416 "knot", ref.Knot,
417 "pipeline_rkey", ref.PipelineRkey,
418 "workflow", ref.Workflow,
419 "namespace", ref.Namespace,
420 "pipeline_run", ref.PipelineRunName,
421 )
422
423 logger.Debug("watchPipelineRun: starting")
424
425 last := ""
426 if obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace,
427 ref.PipelineRunName); err == nil {
428 status, terminal, ok := mapTektonPipelineRunStatus(obj)
429 logger.Debug("watchPipelineRun: initial status read",
430 "status", status, "terminal", terminal, "ok", ok,
431 )
432 if ok {
433 last = status
434 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
435 status, ref.PipelineRunName, nil, nil); err != nil {
436 logger.Error("publish tekton status", "err", err, "status", status)
437 }
438 if terminal {
439 logger.Debug("watchPipelineRun: already terminal on initial read; exiting", "status", status)
440 return
441 }
442 }
443 } else if errors.Is(err, k8s.ErrNotFound) {
444 logger.Warn("PipelineRun disappeared while watching")
445 return
446 } else {
447 logger.Debug("initial PipelineRun status read", "err", err)
448 }
449
450 w, err := p.client.WatchObjects(ctx, pipelineRunsGVR, ref.Namespace,
451 k8s.ListOptions{FieldSelector: "metadata.name=" + ref.PipelineRunName},
452 )
453 if err != nil {
454 logger.Debug("watchPipelineRun: watch failed; falling back to polling", "err", err)
455 p.pollPipelineRun(ctx, ref, logger, last)
456 return
457 }
458 defer w.Stop()
459
460 logger.Debug("watchPipelineRun: watch established; entering event loop")
461 for {
462 select {
463 case <-ctx.Done():
464 logger.Debug("watchPipelineRun: context cancelled")
465 return
466 case ev, ok := <-w.ResultChan():
467 if !ok {
468 logger.Debug("watchPipelineRun: watch channel closed; falling back to polling")
469 p.pollPipelineRun(ctx, ref, logger, last)
470 return
471 }
472 status, terminal, ok := mapTektonPipelineRunStatus(ev.Object)
473 logger.Debug("watchPipelineRun: watch event",
474 "event_type", ev.Type,
475 "status", status, "terminal", terminal, "ok", ok, "last", last,
476 )
477 if !ok || status == last {
478 if terminal {
479 logger.Debug("watchPipelineRun: terminal status unchanged; exiting", "status", status)
480 return
481 }
482 continue
483 }
484 last = status
485 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
486 status, ref.PipelineRunName, nil, nil); err != nil {
487 logger.Error("publish tekton status", "err", err, "status", status)
488 continue
489 }
490 logger.Debug("watchPipelineRun: published status", "status", status, "terminal", terminal)
491 if terminal {
492 logger.Debug("watchPipelineRun: terminal status reached; exiting", "status", status)
493 return
494 }
495 }
496 }
497}
498
499func (p *tektonProvider) pollPipelineRun(
500 ctx context.Context,
501 ref TektonRunRef,
502 logger *slog.Logger,
503 last string,
504) {
505 logger.Debug("pollPipelineRun: starting poll loop", "interval", "5s")
506 ticker := time.NewTicker(5 * time.Second)
507 defer ticker.Stop()
508 for {
509 select {
510 case <-ctx.Done():
511 logger.Debug("pollPipelineRun: context cancelled")
512 return
513 case <-ticker.C:
514 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace,
515 ref.PipelineRunName,
516 )
517 if errors.Is(err, k8s.ErrNotFound) {
518 logger.Warn("PipelineRun disappeared while watching")
519 return
520 }
521 if err != nil {
522 logger.Debug("get PipelineRun status", "err", err)
523 continue
524 }
525 status, terminal, ok := mapTektonPipelineRunStatus(obj)
526 logger.Debug("pollPipelineRun: poll tick",
527 "status", status, "terminal", terminal, "ok", ok, "last", last,
528 )
529 if !ok || status == last {
530 if terminal {
531 logger.Debug("pollPipelineRun: terminal status unchanged; exiting", "status", status)
532 return
533 }
534 continue
535 }
536 last = status
537 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
538 status, ref.PipelineRunName, nil, nil); err != nil {
539 logger.Error("publish tekton status", "err", err, "status", status)
540 continue
541 }
542 logger.Debug("pollPipelineRun: published status", "status", status, "terminal", terminal)
543 if terminal {
544 logger.Debug("pollPipelineRun: terminal status reached; exiting", "status", status)
545 return
546 }
547 }
548 }
549}
550
551// mapTektonPipelineRunStatus translates Tekton's Succeeded condition
552// into the Tangled status strings consumed by the appview.
553func mapTektonPipelineRunStatus(obj k8s.Object) (status string, terminal bool, ok bool) {
554 conditions, ok := obj.NestedSlice("status", "conditions")
555 if !ok || len(conditions) == 0 {
556 slog.Debug("mapTektonPipelineRunStatus: no conditions found",
557 "pipeline_run", obj.GetName(),
558 )
559 return "", false, false
560 }
561 for _, raw := range conditions {
562 cond, _ := raw.(map[string]interface{})
563 condType, _ := cond["type"].(string)
564 condStatus, _ := cond["status"].(string)
565 reason, _ := cond["reason"].(string)
566 message, _ := cond["message"].(string)
567 slog.Debug("mapTektonPipelineRunStatus: condition",
568 "pipeline_run", obj.GetName(),
569 "type", condType,
570 "status", condStatus,
571 "reason", reason,
572 "message", message,
573 )
574 if condType != "Succeeded" {
575 continue
576 }
577 switch condStatus {
578 case "True":
579 return "success", true, true
580 case "False":
581 if tektonReasonCancelled(reason) {
582 return "cancelled", true, true
583 }
584 return "failed", true, true
585 case "Unknown":
586 return "running", false, true
587 default:
588 return "", false, false
589 }
590 }
591 return "", false, false
592}
593
// tektonReasonCancelled reports whether a condition reason denotes a
// cancelled run: the reason contains "cancel" or "stop", compared
// case-insensitively.
func tektonReasonCancelled(reason string) bool {
	lowered := strings.ToLower(reason)
	for _, marker := range [...]string{"cancel", "stop"} {
		if strings.Contains(lowered, marker) {
			return true
		}
	}
	return false
}
598
599func (p *tektonProvider) Logs(
600 ctx context.Context,
601 knot string,
602 pipelineRkey string,
603 workflow string,
604) (<-chan LogLine, error) {
605 ref, err := p.st.LookupTektonRunByTuple(ctx, knot, pipelineRkey, workflow)
606 if err != nil {
607 return nil, fmt.Errorf("lookup tekton run mapping: %w", err)
608 }
609 if ref == nil {
610 // No mapping at all means this provider never spawned a
611 // PipelineRun for the requested tuple, so a 404 is the
612 // honest answer.
613 return nil, ErrLogsNotFound
614 }
615
616 // At this point the workflow *was* spawned — we have a row in the
617 // store mapping the tuple to a PipelineRun. The TaskRuns the
618 // PipelineRun fans out to are created asynchronously by Tekton
619 // once the run is admitted, so a freshly-spawned or still-queueing
620 // PipelineRun will momentarily report zero TaskRuns. Returning
621 // ErrLogsNotFound here would mistranslate that into a 404 at the
622 // HTTP layer (see provider.go contract: ErrLogsNotFound means the
623 // workflow never ran), making just-spawned runs look nonexistent
624 // to the appview. Instead, hand back an open channel and poll
625 // inside the goroutine until TaskRuns materialize, ctx is
626 // cancelled, or the PipelineRun reaches a terminal state with
627 // nothing to stream.
628 out := make(chan LogLine, 32)
629 go func() {
630 defer close(out)
631 p.streamPipelineRunLogs(ctx, out, *ref)
632 }()
633 return out, nil
634}
635
636// streamPipelineRunLogs drives the Logs channel for a single PipelineRun.
637// It polls for TaskRuns until the PipelineRun is terminal (or ctx is
638// cancelled), streaming each TaskRun's logs exactly once.
639//
640// The loop is deliberately a poll rather than a one-shot pass for two
641// reasons:
642//
643// 1. Live runs need follow semantics. Streaming a still-running TaskRun
644// uses Follow=true on the pod log stream, so streamTaskRunLogs only
645// returns once the container actually terminates. The outer loop
646// then re-checks for new TaskRuns and the PipelineRun's terminal
647// state, instead of closing the channel mid-run.
648//
649// 2. PipelineRuns can spawn additional TaskRuns over time (sequential
650// `runAfter` tasks, finally blocks, retries). A single snapshot
651// would silently drop any TaskRun that appears after the snapshot,
652// even though the workflow is still running.
653//
654// The loop terminates only when the PipelineRun is terminal AND the
655// most recent listing produced no new TaskRuns, which together mean no
656// further TaskRuns will ever appear.
657func (p *tektonProvider) streamPipelineRunLogs(
658 ctx context.Context,
659 out chan<- LogLine,
660 ref TektonRunRef,
661) {
662 const pollInterval = 1 * time.Second
663 seen := map[string]bool{}
664 stepID := 0
665 for {
666 if err := ctx.Err(); err != nil {
667 return
668 }
669
670 taskRuns, err := p.taskRunsForPipelineRun(ctx, ref)
671 if err != nil {
672 p.log.Debug("Logs: list TaskRuns failed",
673 "err", err, "pipeline_run", ref.PipelineRunName,
674 )
675 }
676 // Snapshot the terminal state *after* the listing so we never
677 // observe terminal=true while still missing a TaskRun that
678 // existed at list time. The reverse race (terminal observed
679 // before a TaskRun spawn) is handled by the next iteration: we
680 // only exit when terminal is true AND no new TaskRuns showed up
681 // in this pass.
682 terminal := p.isPipelineRunTerminal(ctx, ref)
683
684 var fresh []k8s.Object
685 for _, tr := range taskRuns {
686 name := tr.GetName()
687 if name == "" || seen[name] {
688 continue
689 }
690 seen[name] = true
691 fresh = append(fresh, tr)
692 }
693 p.log.Debug("Logs: poll iteration",
694 "pipeline_run", ref.PipelineRunName,
695 "task_runs_total", len(taskRuns),
696 "task_runs_new", len(fresh),
697 "terminal", terminal,
698 )
699
700 for _, tr := range fresh {
701 taskName := tr.GetName()
702 if taskName == "" {
703 taskName = fmt.Sprintf("task %d", stepID)
704 }
705 p.log.Debug("Logs: streaming TaskRun",
706 "task_run", taskName, "step_id", stepID, "terminal", terminal,
707 )
708 if !sendLine(ctx, out, LogLine{
709 Kind: LogKindControl,
710 Time: time.Now(),
711 Content: taskName,
712 StepId: stepID,
713 StepStatus: StepStatusStart,
714 }) {
715 return
716 }
717
718 // terminal is the snapshot taken at the top of this
719 // iteration. If the pipeline was terminal then, all
720 // TaskRuns we see are guaranteed complete and we can
721 // take the snapshot fast-path; otherwise we follow the
722 // pod logs live so StepStatusEnd is only emitted once
723 // the container actually exits.
724 if terminal {
725 p.fetchCompletedTaskRunLogs(ctx, out, ref, tr, stepID)
726 } else {
727 p.streamTaskRunLogs(ctx, out, ref, tr, stepID)
728 }
729
730 if !sendLine(ctx, out, LogLine{
731 Kind: LogKindControl,
732 Time: time.Now(),
733 Content: taskName,
734 StepId: stepID,
735 StepStatus: StepStatusEnd,
736 }) {
737 return
738 }
739 p.log.Debug("Logs: finished TaskRun",
740 "task_run", taskName, "step_id", stepID,
741 )
742 stepID++
743 }
744
745 // Done condition: the PipelineRun is terminal AND we found no
746 // new TaskRuns this iteration. Both are required because a
747 // terminal PipelineRun can still expose a freshly-listed
748 // TaskRun whose pod we haven't drained yet.
749 if terminal && len(fresh) == 0 {
750 p.log.Debug("Logs: pipeline run terminal, no new TaskRuns",
751 "pipeline_run", ref.PipelineRunName,
752 )
753 return
754 }
755
756 // When we processed new TaskRuns this round, loop again
757 // immediately to re-list — Follow=true streaming may have
758 // blocked us long enough for additional TaskRuns or terminal
759 // transitions to have happened.
760 if len(fresh) > 0 {
761 continue
762 }
763
764 select {
765 case <-ctx.Done():
766 return
767 case <-time.After(pollInterval):
768 }
769 }
770}
771
772// isPipelineRunTerminal returns true if the PipelineRun is in a terminal state right now.
773func (p *tektonProvider) isPipelineRunTerminal(ctx context.Context, ref TektonRunRef) bool {
774 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace,
775 ref.PipelineRunName,
776 )
777 if err != nil {
778 p.log.Debug("isPipelineRunTerminal: failed to get PipelineRun", "err", err, "pipeline_run", ref.PipelineRunName)
779 return false
780 }
781 _, terminal, ok := mapTektonPipelineRunStatus(obj)
782 p.log.Debug("isPipelineRunTerminal: status check", "pipeline_run", ref.PipelineRunName, "terminal", terminal, "ok", ok)
783 return ok && terminal
784}
785
786// fetchCompletedTaskRunLogs fetches all logs from a TaskRun that has already completed.
787// It reads each step container's logs in one shot using all-containers traversal,
788// and also inlines the Tekton step status from the TaskRun (exit code, reason) as
789// control messages so the caller gets full context without needing to watch for events.
790func (p *tektonProvider) fetchCompletedTaskRunLogs(
791 ctx context.Context,
792 out chan<- LogLine,
793 ref TektonRunRef,
794 tr k8s.Object,
795 stepID int,
796) {
797 trName := tr.GetName()
798 pods, err := p.podsForTaskRun(ctx, ref.Namespace, trName)
799 if err != nil {
800 p.log.Debug("fetchCompletedTaskRunLogs: list pods failed", "err", err,
801 "task_run", trName, "pipeline_run", ref.PipelineRunName,
802 )
803 return
804 }
805 p.log.Debug("fetchCompletedTaskRunLogs: found pods",
806 "task_run", trName, "pod_count", len(pods),
807 )
808
809 // Emit a summary line from the TaskRun status (steps[*].terminated) so we
810 // get exit codes and reasons even if the pod logs are sparse.
811 steps, _ := tr.NestedSlice("status", "steps")
812 for _, rawStep := range steps {
813 step, _ := rawStep.(map[string]any)
814 stepName, _ := step["name"].(string)
815 term, _ := step["terminated"].(map[string]any)
816 if term == nil {
817 continue
818 }
819 exitCode := numberToInt64(term["exitCode"])
820 reason, _ := term["reason"].(string)
821 msg, _ := term["message"].(string)
822 line := fmt.Sprintf("[step %s] exit=%d reason=%s", stepName, exitCode, reason)
823 if msg != "" {
824 line += " " + msg
825 }
826 p.log.Debug("fetchCompletedTaskRunLogs: step terminated",
827 "task_run", trName, "step", stepName,
828 "exit_code", exitCode, "reason", reason,
829 )
830 if !sendLine(ctx, out, LogLine{
831 Kind: LogKindData,
832 Time: time.Now(),
833 Content: line + "\n",
834 StepId: stepID,
835 Stream: "stdout",
836 }) {
837 return
838 }
839 }
840
841 for _, pod := range pods {
842 containers := append([]k8s.Container(nil), pod.InitContainers...)
843 containers = append(containers, pod.Containers...)
844 p.log.Debug("fetchCompletedTaskRunLogs: reading pod containers",
845 "pod", pod.Name, "container_count", len(containers),
846 )
847 for _, c := range containers {
848 p.log.Debug("fetchCompletedTaskRunLogs: reading container logs",
849 "pod", pod.Name, "container", c.Name,
850 )
851 // Snapshot read (Follow=false) is correct here: the TaskRun
852 // has already terminated, so the API server has the full log
853 // available and there is nothing more to wait for.
854 rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name,
855 k8s.LogOptions{Follow: false},
856 )
857 if err != nil {
858 p.log.Debug("fetchCompletedTaskRunLogs: stream failed", "err", err,
859 "pod", pod.Name, "container", c.Name,
860 )
861 continue
862 }
863 p.sendReaderLines(ctx, out, rc, stepID)
864 _ = rc.Close()
865 p.log.Debug("fetchCompletedTaskRunLogs: done reading container",
866 "pod", pod.Name, "container", c.Name,
867 )
868 }
869 }
870}
871
872func (p *tektonProvider) taskRunsForPipelineRun(ctx context.Context, ref TektonRunRef) ([]k8s.Object, error) {
873 list, err := p.client.ListObjects(ctx, taskRunsGVR, ref.Namespace, k8s.ListOptions{
874 LabelSelector: "tekton.dev/pipelineRun=" + ref.PipelineRunName,
875 })
876 if err != nil {
877 return nil, fmt.Errorf("list Tekton TaskRuns: %w", err)
878 }
879 items := append([]k8s.Object(nil), list...)
880 sort.Slice(items, func(i, j int) bool {
881 ti := items[i].GetCreationTimestamp()
882 tj := items[j].GetCreationTimestamp()
883 return ti.Before(tj)
884 })
885 return items, nil
886}
887
888func (p *tektonProvider) streamTaskRunLogs(
889 ctx context.Context,
890 out chan<- LogLine,
891 ref TektonRunRef,
892 tr k8s.Object,
893 stepID int,
894) {
895 pods, err := p.podsForTaskRun(ctx, ref.Namespace, tr.GetName())
896 if err != nil {
897 p.log.Debug("streamTaskRunLogs: list pods for TaskRun failed", "err", err,
898 "task_run", tr.GetName(), "pipeline_run", ref.PipelineRunName,
899 )
900 return
901 }
902 p.log.Debug("streamTaskRunLogs: found pods",
903 "task_run", tr.GetName(), "pod_count", len(pods),
904 )
905 for _, pod := range pods {
906 containers := append([]k8s.Container(nil), pod.InitContainers...)
907 containers = append(containers, pod.Containers...)
908 p.log.Debug("streamTaskRunLogs: streaming pod containers",
909 "pod", pod.Name, "container_count", len(containers),
910 )
911 for _, c := range containers {
912 p.log.Debug("streamTaskRunLogs: streaming container",
913 "pod", pod.Name, "container", c.Name, "step_id", stepID,
914 )
915 // Live tail with Follow=true: the apiserver holds the
916 // connection open until the container terminates (or ctx is
917 // cancelled), so sendReaderLines only returns once the
918 // container is actually done. Without this, the read EOFs at
919 // the current tail position and the caller emits a spurious
920 // StepStatusEnd while the step is still running.
921 rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name,
922 k8s.LogOptions{Follow: true},
923 )
924 if err != nil {
925 p.log.Debug("streamTaskRunLogs: stream pod logs failed", "err", err,
926 "pod", pod.Name, "container", c.Name,
927 )
928 continue
929 }
930 p.sendReaderLines(ctx, out, rc, stepID)
931 _ = rc.Close()
932 p.log.Debug("streamTaskRunLogs: finished container",
933 "pod", pod.Name, "container", c.Name,
934 )
935 }
936 }
937}
938
939func (p *tektonProvider) podsForTaskRun(ctx context.Context, namespace, taskRun string) ([]k8s.Pod, error) {
940 list, err := p.client.ListPods(ctx, namespace,
941 "tekton.dev/taskRun="+taskRun,
942 )
943 if err != nil {
944 return nil, fmt.Errorf("list pods: %w", err)
945 }
946 pods := append([]k8s.Pod(nil), list...)
947 sort.Slice(pods, func(i, j int) bool {
948 return pods[i].CreationTimestamp.Before(pods[j].CreationTimestamp)
949 })
950 return pods, nil
951}
952
953func (p *tektonProvider) sendReaderLines(
954 ctx context.Context,
955 out chan<- LogLine,
956 rc io.Reader,
957 stepID int,
958) {
959 scanner := bufio.NewScanner(rc)
960 for scanner.Scan() {
961 if !sendLine(ctx, out, LogLine{
962 Kind: LogKindData,
963 Time: time.Now(),
964 Content: scanner.Text() + "\n",
965 StepId: stepID,
966 Stream: "stdout",
967 }) {
968 return
969 }
970 }
971 if err := scanner.Err(); err != nil {
972 p.log.Debug("scan pod log", "err", err)
973 }
974}
975
// numberToInt64 converts a loosely typed numeric value to int64.
// Supported dynamic types are int64, int, float64 (truncated toward
// zero) and json.Number (its Int64 result, with the conversion error
// ignored). Every other type, including nil, yields 0.
func numberToInt64(value any) int64 {
	if num, isNumber := value.(json.Number); isNumber {
		// Best effort: the error is dropped and whatever Int64
		// returned is passed on.
		parsed, _ := num.Int64()
		return parsed
	}

	switch n := value.(type) {
	case float64:
		return int64(n)
	case int:
		return int64(n)
	case int64:
		return n
	}
	return 0
}
991
992func (p *tektonProvider) publishStatus(
993 ctx context.Context,
994 pipelineURI, workflow, status, runName string,
995 errMsg *string,
996 exitCode *int64,
997) error {
998 rec := tangled.PipelineStatus{
999 LexiconTypeID: tangled.PipelineStatusNSID,
1000 Pipeline: pipelineURI,
1001 Workflow: workflow,
1002 Status: status,
1003 CreatedAt: time.Now().UTC().Format(time.RFC3339),
1004 Error: errMsg,
1005 ExitCode: exitCode,
1006 }
1007 body, err := json.Marshal(rec)
1008 if err != nil {
1009 return fmt.Errorf("marshal pipeline.status: %w", err)
1010 }
1011 rkey := fmt.Sprintf("tk-%s-%s-%d", runName, status, time.Now().UnixNano())
1012 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil {
1013 return fmt.Errorf("publish pipeline.status: %w", err)
1014 }
1015 return nil
1016}
1017
1018func tektonPipelineRunName(knot, pipelineRkey, workflow, commit, branch string) string {
1019 h := sha256.Sum256([]byte(strings.Join(
1020 []string{knot, pipelineRkey, workflow, commit, branch}, "\x00",
1021 )))
1022 suffix := hex.EncodeToString(h[:])[:12]
1023 base := dnsLabel("tack-" + workflow)
1024 maxBase := 63 - len(suffix) - 1
1025 if len(base) > maxBase {
1026 base = strings.TrimRight(base[:maxBase], "-")
1027 }
1028 if base == "" {
1029 base = "tack"
1030 }
1031 return base + "-" + suffix
1032}
1033
// dnsLabel reduces s to the character set of a DNS label: lowercase
// ASCII letters, digits and single dashes. The input is lowercased,
// every run of other characters collapses into one "-", and leading and
// trailing dashes are removed. The result may be empty and is not
// length-limited; callers truncate as needed.
//
// Only ASCII letters and digits are kept. unicode.IsLetter and
// unicode.IsDigit alone would also accept runes such as "é" or "日",
// which Kubernetes rejects in object names and label values. Mapping
// them to "-" also keeps the output single-byte, so callers that
// truncate by byte index cannot cut a multi-byte rune in half.
func dnsLabel(s string) string {
	var b strings.Builder
	lastDash := false
	for _, r := range strings.ToLower(s) {
		if r <= unicode.MaxASCII && (unicode.IsLetter(r) || unicode.IsDigit(r)) {
			b.WriteRune(r)
			lastDash = false
			continue
		}
		// Collapse consecutive separators into a single dash.
		if !lastDash {
			b.WriteByte('-')
			lastDash = true
		}
	}
	return strings.Trim(b.String(), "-")
}
1051
1052func labelValue(s string) string {
1053 v := dnsLabel(s)
1054 if len(v) > 63 {
1055 v = strings.TrimRight(v[:63], "-")
1056 }
1057 if v == "" {
1058 return "unknown"
1059 }
1060 return v
1061}