Stitch any CI into Tangled
1package main
2
3// tektonProvider implements Provider by creating Tekton PipelineRuns
4// directly inside the Kubernetes cluster. Tack already receives and
5// authorizes Tangled pipeline triggers, so adding Tekton Triggers would
6// duplicate the event-to-run translation layer instead of simplifying it.
7
8import (
9 "bufio"
10 "context"
11 "crypto/sha256"
12 "encoding/hex"
13 "encoding/json"
14 "errors"
15 "fmt"
16 "io"
17 "log/slog"
18 "sort"
19 "strings"
20 "time"
21 "unicode"
22
23 "tangled.org/core/api/tangled"
24
25 "go.mitchellh.com/tack/internal/k8s"
26 "go.yaml.in/yaml/v2"
27)
28
const (
	// apiVersion and kind written onto every PipelineRun object that
	// buildTektonPipelineRun produces.
	tektonAPIVersion = "tekton.dev/v1"
	tektonRunKind    = "PipelineRun"

	// Label keys set on created PipelineRuns. The managed-by label is
	// always "tack"; the rkey and workflow labels carry values that
	// have been sanitized with labelValue.
	tektonLabelManagedBy    = "tack.mitchellh.com/managed-by"
	tektonLabelPipelineRkey = "tack.mitchellh.com/pipeline-rkey"
	tektonLabelWorkflow     = "tack.mitchellh.com/workflow"

	// Annotation keys set on created PipelineRuns. Unlike the labels,
	// annotations carry the raw, unsanitized values. The pipeline-rkey
	// and workflow keys use the same strings as the corresponding label
	// keys; labels and annotations live in separate metadata maps.
	tektonAnnotationKnot         = "tack.mitchellh.com/knot"
	tektonAnnotationPipelineRkey = "tack.mitchellh.com/pipeline-rkey"
	tektonAnnotationWorkflow     = "tack.mitchellh.com/workflow"
	tektonAnnotationActor        = "tack.mitchellh.com/actor"
	tektonAnnotationCommit       = "tack.mitchellh.com/commit"
	tektonAnnotationBranch       = "tack.mitchellh.com/branch"
)
44
var (
	// pipelineRunsGVR addresses Tekton PipelineRun resources; it is used
	// to create, get and watch the runs this provider spawns.
	pipelineRunsGVR = k8s.GVR{
		Group: "tekton.dev", Version: "v1", Resource: "pipelineruns",
	}
	// taskRunsGVR addresses Tekton TaskRun resources; it is used to list
	// the TaskRuns belonging to a PipelineRun when streaming logs.
	taskRunsGVR = k8s.GVR{
		Group: "tekton.dev", Version: "v1", Resource: "taskruns",
	}
)
53
// tektonWorkflowConfig is the Tekton-specific subset of workflow YAML.
// `pipeline` names an existing in-cluster Tekton Pipeline. Params are
// deliberately string-only in v1: tack is meant to select an existing
// runner and pass a small amount of routing data, not mirror Tekton's
// entire PipelineRun API.
type tektonWorkflowConfig struct {
	// Pipeline is required; it becomes spec.pipelineRef.name.
	Pipeline string `yaml:"pipeline"`
	// ServiceAccount, when non-empty, becomes spec.serviceAccountName.
	ServiceAccount string `yaml:"service_account"`
	// Params are string name/value pairs written into spec.params.
	Params map[string]string `yaml:"params"`
	// Workspaces are translated into spec.workspaces entries.
	Workspaces []tektonWorkspaceConfig `yaml:"workspaces"`
}
65
// tektonWorkspaceConfig describes one workspace binding for a
// PipelineRun. The backing source is chosen from the first non-nil field
// in the order Storage, PVC, Secret; an entry with none of them set is
// skipped when the PipelineRun is built.
type tektonWorkspaceConfig struct {
	// Name is the workspace name written to the binding.
	Name string `yaml:"name"`
	// AccessModes is copied into the volumeClaimTemplate spec; it is
	// only consulted when Storage is set.
	AccessModes []string `yaml:"access_modes"`
	// Storage is the requested storage size for a volumeClaimTemplate.
	Storage *string `yaml:"storage"`
	// PVC is the claimName of an existing persistentVolumeClaim.
	PVC *string `yaml:"pvc"`
	// Secret is the secretName of a secret-backed workspace.
	Secret *string `yaml:"secret"`
}
73
// tektonWorkflowDoc mirrors the nesting of the workflow YAML so that the
// `tack.tekton` section can be decoded directly; every other key in the
// document is ignored by this type.
type tektonWorkflowDoc struct {
	Tack struct {
		Tekton tektonWorkflowConfig `yaml:"tekton"`
	} `yaml:"tack"`
}
79
80// parseTektonWorkflowConfig decodes `tack.tekton` from a workflow body.
81func parseTektonWorkflowConfig(raw string) (*tektonWorkflowConfig, error) {
82 if strings.TrimSpace(raw) == "" {
83 return nil, errors.New("workflow body is empty")
84 }
85 var doc tektonWorkflowDoc
86 if err := yaml.Unmarshal([]byte(raw), &doc); err != nil {
87 return nil, fmt.Errorf("parse workflow yaml: %w", err)
88 }
89 cfg := doc.Tack.Tekton
90 if cfg.Pipeline == "" {
91 return nil, errors.New("workflow yaml: `tack.tekton.pipeline` is required")
92 }
93 return &cfg, nil
94}
95
// tektonProvider runs Tangled workflows as Tekton PipelineRuns.
type tektonProvider struct {
	// br publishes pipeline status records (see publishStatus).
	br *broker
	// st persists and looks up the workflow-to-PipelineRun mapping.
	st *store
	// log is the provider-scoped structured logger.
	log *slog.Logger
	// client talks to the Kubernetes API.
	client k8s.Client
	// namespace is where PipelineRuns are created.
	namespace string
}

// Compile-time check that *tektonProvider satisfies Provider.
var _ Provider = (*tektonProvider)(nil)
105
106func newTektonProvider(
107 br *broker,
108 st *store,
109 client k8s.Client,
110 namespace string,
111 log *slog.Logger,
112) *tektonProvider {
113 return &tektonProvider{
114 br: br,
115 st: st,
116 log: log.With("component", "provider", "kind", "tekton"),
117 client: client,
118 namespace: namespace,
119 }
120}
121
122func newInClusterTektonProvider(
123 br *broker,
124 st *store,
125 namespace string,
126 log *slog.Logger,
127) (*tektonProvider, error) {
128 client, err := k8s.NewInClusterClient()
129 if err != nil {
130 return nil, fmt.Errorf("configure in-cluster kubernetes client: %w", err)
131 }
132 return newTektonProvider(br, st, client, namespace, log), nil
133}
134
135func (p *tektonProvider) Spawn(
136 ctx context.Context,
137 knot string,
138 pipelineRkey string,
139 actor string,
140 trigger *tangled.Pipeline_TriggerMetadata,
141 workflows []*tangled.Pipeline_Workflow,
142) {
143 if len(workflows) == 0 {
144 p.log.Warn("pipeline has no workflows; nothing to spawn",
145 "knot", knot, "rkey", pipelineRkey,
146 )
147 return
148 }
149 for _, wf := range workflows {
150 if wf == nil || wf.Name == "" {
151 continue
152 }
153 wf := wf
154 go p.spawnWorkflow(ctx, knot, pipelineRkey, actor, trigger, wf)
155 }
156}
157
// spawnWorkflow creates (or adopts) the PipelineRun for a single
// workflow, records the mapping in the store, publishes an initial
// "pending" status and then starts a watcher goroutine for the run.
//
// Failures are logged rather than returned because the method runs in
// its own goroutine (see Spawn). An invalid workflow config, a failed
// create/get, or a failed store insert aborts the spawn; a failed
// initial status publish is logged and does not abort it.
func (p *tektonProvider) spawnWorkflow(
	ctx context.Context,
	knot string,
	pipelineRkey string,
	actor string,
	trigger *tangled.Pipeline_TriggerMetadata,
	wf *tangled.Pipeline_Workflow,
) {
	logger := p.log.With(
		"knot", knot,
		"pipeline_rkey", pipelineRkey,
		"workflow", wf.Name,
		"actor", actor,
	)

	cfg, err := parseTektonWorkflowConfig(wf.Raw)
	if err != nil {
		logger.Error("invalid workflow config; refusing to spawn", "err", err)
		return
	}
	commit, branch := triggerCommitAndBranch(trigger)
	// The name is derived from a hash of the identifying inputs, so
	// spawning the same workflow for the same trigger yields the same
	// PipelineRun name.
	name := tektonPipelineRunName(knot, pipelineRkey, wf.Name, commit, branch)
	pr := buildTektonPipelineRun(
		p.namespace, name, cfg, knot, pipelineRkey, actor, commit, branch, wf,
	)

	created, err := p.client.CreateObject(ctx, pipelineRunsGVR, p.namespace, pr)
	if errors.Is(err, k8s.ErrAlreadyExists) {
		// A run with this name already exists: adopt it instead of
		// failing, so its UID can still be recorded below.
		created, err = p.client.GetObject(ctx, pipelineRunsGVR, p.namespace, name)
	}
	if err != nil {
		logger.Error("create tekton PipelineRun", "err", err,
			"namespace", p.namespace, "pipeline_run", name,
			"pipeline", cfg.Pipeline,
		)
		return
	}

	ref := TektonRunRef{
		Knot:            knot,
		PipelineRkey:    pipelineRkey,
		Workflow:        wf.Name,
		Namespace:       p.namespace,
		PipelineRunName: name,
		PipelineRunUID:  created.GetUID(),
		PipelineName:    cfg.Pipeline,
		PipelineURI:     pipelineATURI(knot, pipelineRkey),
	}
	// Persist the mapping before publishing anything; Logs looks the
	// run up through this mapping.
	if err := p.st.InsertTektonRun(ctx, ref); err != nil {
		logger.Error("persist tekton run mapping", "err", err,
			"pipeline_run", name,
		)
		return
	}

	if err := p.publishStatus(ctx, ref.PipelineURI, wf.Name,
		"pending", name, nil, nil); err != nil {
		logger.Error("publish initial pending status", "err", err)
	}

	logger.Info("tekton PipelineRun created",
		"namespace", p.namespace,
		"pipeline", cfg.Pipeline,
		"pipeline_run", name,
		"uid", ref.PipelineRunUID,
	)
	// The watcher shares ctx with this call, so it stops when ctx is
	// cancelled.
	go p.watchPipelineRun(ctx, ref)
}
226
227func buildTektonPipelineRun(
228 namespace, name string,
229 cfg *tektonWorkflowConfig,
230 knot, pipelineRkey, actor, commit, branch string,
231 wf *tangled.Pipeline_Workflow,
232) k8s.Object {
233 obj := k8s.Object{
234 "apiVersion": tektonAPIVersion,
235 "kind": tektonRunKind,
236 "metadata": map[string]any{
237 "name": name,
238 "namespace": namespace,
239 "labels": map[string]any{
240 tektonLabelManagedBy: "tack",
241 tektonLabelPipelineRkey: labelValue(pipelineRkey),
242 tektonLabelWorkflow: labelValue(wf.Name),
243 },
244 "annotations": map[string]any{
245 tektonAnnotationKnot: knot,
246 tektonAnnotationPipelineRkey: pipelineRkey,
247 tektonAnnotationWorkflow: wf.Name,
248 tektonAnnotationActor: actor,
249 tektonAnnotationCommit: commit,
250 tektonAnnotationBranch: branch,
251 },
252 },
253 "spec": map[string]any{
254 "pipelineRef": map[string]any{
255 "name": cfg.Pipeline,
256 },
257 "params": []any{
258 map[string]any{
259 "name": "commit",
260 "value": commit,
261 },
262 map[string]any{
263 "name": "branch",
264 "value": branch,
265 },
266 map[string]any{
267 "name": "actor",
268 "value": actor,
269 },
270 },
271 },
272 }
273
274 spec := obj["spec"].(map[string]any)
275
276 if len(cfg.Workspaces) != 0 {
277 spec["podTemplate"] = map[string]any{
278 "securityContext": map[string]any{
279 "fsGroup": 65532,
280 },
281 }
282
283 workspaces := []any{}
284
285 for _, ws := range cfg.Workspaces {
286 switch {
287 case ws.Storage != nil:
288 workspaces = append(workspaces, map[string]any{
289 "name": ws.Name,
290 "volumeClaimTemplate": map[string]any{
291 "spec": map[string]any{
292 "accessModes": ws.AccessModes,
293 "resources": map[string]any{
294 "requests": map[string]any{
295 "storage": *ws.Storage,
296 },
297 },
298 },
299 },
300 })
301
302 case ws.PVC != nil:
303 workspaces = append(workspaces, map[string]any{
304 "name": ws.Name,
305 "persistentVolumeClaim": map[string]any{
306 "claimName": *ws.PVC,
307 },
308 })
309
310 case ws.Secret != nil:
311 workspaces = append(workspaces, map[string]any{
312 "name": ws.Name,
313 "secret": map[string]any{
314 "secretName": *ws.Secret,
315 },
316 })
317 }
318 }
319
320 spec["workspaces"] = workspaces
321 }
322
323 if cfg.ServiceAccount != "" {
324 spec["serviceAccountName"] = cfg.ServiceAccount
325 }
326 if len(cfg.Params) > 0 {
327 keys := make([]string, 0, len(cfg.Params))
328 for key := range cfg.Params {
329 keys = append(keys, key)
330 }
331 sort.Strings(keys)
332 params := make([]any, 0, len(keys))
333 for _, key := range keys {
334 params = append(params, map[string]any{
335 "name": key,
336 "value": cfg.Params[key],
337 })
338 }
339 spec["params"] = params
340 }
341 return obj
342}
343
344func (p *tektonProvider) watchPipelineRun(ctx context.Context, ref TektonRunRef) {
345 logger := p.log.With(
346 "knot", ref.Knot,
347 "pipeline_rkey", ref.PipelineRkey,
348 "workflow", ref.Workflow,
349 "namespace", ref.Namespace,
350 "pipeline_run", ref.PipelineRunName,
351 )
352
353 logger.Debug("watchPipelineRun: starting")
354
355 last := ""
356 if obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace,
357 ref.PipelineRunName); err == nil {
358 status, terminal, ok := mapTektonPipelineRunStatus(obj)
359 logger.Debug("watchPipelineRun: initial status read",
360 "status", status, "terminal", terminal, "ok", ok,
361 )
362 if ok {
363 last = status
364 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
365 status, ref.PipelineRunName, nil, nil); err != nil {
366 logger.Error("publish tekton status", "err", err, "status", status)
367 }
368 if terminal {
369 logger.Debug("watchPipelineRun: already terminal on initial read; exiting", "status", status)
370 return
371 }
372 }
373 } else if errors.Is(err, k8s.ErrNotFound) {
374 logger.Warn("PipelineRun disappeared while watching")
375 return
376 } else {
377 logger.Debug("initial PipelineRun status read", "err", err)
378 }
379
380 w, err := p.client.WatchObjects(ctx, pipelineRunsGVR, ref.Namespace,
381 k8s.ListOptions{FieldSelector: "metadata.name=" + ref.PipelineRunName},
382 )
383 if err != nil {
384 logger.Debug("watchPipelineRun: watch failed; falling back to polling", "err", err)
385 p.pollPipelineRun(ctx, ref, logger, last)
386 return
387 }
388 defer w.Stop()
389
390 logger.Debug("watchPipelineRun: watch established; entering event loop")
391 for {
392 select {
393 case <-ctx.Done():
394 logger.Debug("watchPipelineRun: context cancelled")
395 return
396 case ev, ok := <-w.ResultChan():
397 if !ok {
398 logger.Debug("watchPipelineRun: watch channel closed; falling back to polling")
399 p.pollPipelineRun(ctx, ref, logger, last)
400 return
401 }
402 status, terminal, ok := mapTektonPipelineRunStatus(ev.Object)
403 logger.Debug("watchPipelineRun: watch event",
404 "event_type", ev.Type,
405 "status", status, "terminal", terminal, "ok", ok, "last", last,
406 )
407 if !ok || status == last {
408 if terminal {
409 logger.Debug("watchPipelineRun: terminal status unchanged; exiting", "status", status)
410 return
411 }
412 continue
413 }
414 last = status
415 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
416 status, ref.PipelineRunName, nil, nil); err != nil {
417 logger.Error("publish tekton status", "err", err, "status", status)
418 continue
419 }
420 logger.Debug("watchPipelineRun: published status", "status", status, "terminal", terminal)
421 if terminal {
422 logger.Debug("watchPipelineRun: terminal status reached; exiting", "status", status)
423 return
424 }
425 }
426 }
427}
428
429func (p *tektonProvider) pollPipelineRun(
430 ctx context.Context,
431 ref TektonRunRef,
432 logger *slog.Logger,
433 last string,
434) {
435 logger.Debug("pollPipelineRun: starting poll loop", "interval", "5s")
436 ticker := time.NewTicker(5 * time.Second)
437 defer ticker.Stop()
438 for {
439 select {
440 case <-ctx.Done():
441 logger.Debug("pollPipelineRun: context cancelled")
442 return
443 case <-ticker.C:
444 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace,
445 ref.PipelineRunName,
446 )
447 if errors.Is(err, k8s.ErrNotFound) {
448 logger.Warn("PipelineRun disappeared while watching")
449 return
450 }
451 if err != nil {
452 logger.Debug("get PipelineRun status", "err", err)
453 continue
454 }
455 status, terminal, ok := mapTektonPipelineRunStatus(obj)
456 logger.Debug("pollPipelineRun: poll tick",
457 "status", status, "terminal", terminal, "ok", ok, "last", last,
458 )
459 if !ok || status == last {
460 if terminal {
461 logger.Debug("pollPipelineRun: terminal status unchanged; exiting", "status", status)
462 return
463 }
464 continue
465 }
466 last = status
467 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
468 status, ref.PipelineRunName, nil, nil); err != nil {
469 logger.Error("publish tekton status", "err", err, "status", status)
470 continue
471 }
472 logger.Debug("pollPipelineRun: published status", "status", status, "terminal", terminal)
473 if terminal {
474 logger.Debug("pollPipelineRun: terminal status reached; exiting", "status", status)
475 return
476 }
477 }
478 }
479}
480
481// mapTektonPipelineRunStatus translates Tekton's Succeeded condition
482// into the Tangled status strings consumed by the appview.
483func mapTektonPipelineRunStatus(obj k8s.Object) (status string, terminal bool, ok bool) {
484 conditions, ok := obj.NestedSlice("status", "conditions")
485 if !ok || len(conditions) == 0 {
486 slog.Debug("mapTektonPipelineRunStatus: no conditions found",
487 "pipeline_run", obj.GetName(),
488 )
489 return "", false, false
490 }
491 for _, raw := range conditions {
492 cond, _ := raw.(map[string]interface{})
493 condType, _ := cond["type"].(string)
494 condStatus, _ := cond["status"].(string)
495 reason, _ := cond["reason"].(string)
496 message, _ := cond["message"].(string)
497 slog.Debug("mapTektonPipelineRunStatus: condition",
498 "pipeline_run", obj.GetName(),
499 "type", condType,
500 "status", condStatus,
501 "reason", reason,
502 "message", message,
503 )
504 if condType != "Succeeded" {
505 continue
506 }
507 switch condStatus {
508 case "True":
509 return "success", true, true
510 case "False":
511 if tektonReasonCancelled(reason) {
512 return "cancelled", true, true
513 }
514 return "failed", true, true
515 case "Unknown":
516 return "running", false, true
517 default:
518 return "", false, false
519 }
520 }
521 return "", false, false
522}
523
// tektonReasonCancelled reports whether a condition reason denotes a
// cancelled or stopped run. The match is a case-insensitive substring
// test for "cancel" or "stop".
func tektonReasonCancelled(reason string) bool {
	lowered := strings.ToLower(reason)
	for _, marker := range [...]string{"cancel", "stop"} {
		if strings.Contains(lowered, marker) {
			return true
		}
	}
	return false
}
528
529func (p *tektonProvider) Logs(
530 ctx context.Context,
531 knot string,
532 pipelineRkey string,
533 workflow string,
534) (<-chan LogLine, error) {
535 ref, err := p.st.LookupTektonRunByTuple(ctx, knot, pipelineRkey, workflow)
536 if err != nil {
537 return nil, fmt.Errorf("lookup tekton run mapping: %w", err)
538 }
539 if ref == nil {
540 // No mapping at all means this provider never spawned a
541 // PipelineRun for the requested tuple, so a 404 is the
542 // honest answer.
543 return nil, ErrLogsNotFound
544 }
545
546 // At this point the workflow *was* spawned — we have a row in the
547 // store mapping the tuple to a PipelineRun. The TaskRuns the
548 // PipelineRun fans out to are created asynchronously by Tekton
549 // once the run is admitted, so a freshly-spawned or still-queueing
550 // PipelineRun will momentarily report zero TaskRuns. Returning
551 // ErrLogsNotFound here would mistranslate that into a 404 at the
552 // HTTP layer (see provider.go contract: ErrLogsNotFound means the
553 // workflow never ran), making just-spawned runs look nonexistent
554 // to the appview. Instead, hand back an open channel and poll
555 // inside the goroutine until TaskRuns materialize, ctx is
556 // cancelled, or the PipelineRun reaches a terminal state with
557 // nothing to stream.
558 out := make(chan LogLine, 32)
559 go func() {
560 defer close(out)
561 p.streamPipelineRunLogs(ctx, out, *ref)
562 }()
563 return out, nil
564}
565
// streamPipelineRunLogs drives the Logs channel for a single PipelineRun.
// It polls for TaskRuns until the PipelineRun is terminal (or ctx is
// cancelled), streaming each TaskRun's logs exactly once.
//
// The loop is deliberately a poll rather than a one-shot pass for two
// reasons:
//
//  1. Live runs need follow semantics. Streaming a still-running TaskRun
//     uses Follow=true on the pod log stream, so streamTaskRunLogs only
//     returns once the container actually terminates. The outer loop
//     then re-checks for new TaskRuns and the PipelineRun's terminal
//     state, instead of closing the channel mid-run.
//
//  2. PipelineRuns can spawn additional TaskRuns over time (sequential
//     `runAfter` tasks, finally blocks, retries). A single snapshot
//     would silently drop any TaskRun that appears after the snapshot,
//     even though the workflow is still running.
//
// The loop finishes normally only when the PipelineRun is terminal AND
// the most recent listing produced no new TaskRuns, which together mean
// no further TaskRuns will ever appear. It also returns early when ctx
// is cancelled or a send on the output channel is refused.
//
// Each TaskRun is bracketed by a StepStatusStart and a StepStatusEnd
// control line sharing one StepId; StepIds are assigned in the order the
// TaskRuns are processed, starting at 0.
//
// NOTE(review): a TaskRun is marked as seen as soon as it is listed. If
// its pod does not exist yet, or its containers cannot be streamed yet,
// streamTaskRunLogs returns without output and the TaskRun is not
// revisited — confirm whether that is acceptable for live tailing.
func (p *tektonProvider) streamPipelineRunLogs(
	ctx context.Context,
	out chan<- LogLine,
	ref TektonRunRef,
) {
	const pollInterval = 1 * time.Second
	// seen records TaskRun names that have already been streamed.
	seen := map[string]bool{}
	stepID := 0
	for {
		if err := ctx.Err(); err != nil {
			return
		}

		// A listing error is only logged; taskRuns is then nil and this
		// pass behaves like an empty listing.
		taskRuns, err := p.taskRunsForPipelineRun(ctx, ref)
		if err != nil {
			p.log.Debug("Logs: list TaskRuns failed",
				"err", err, "pipeline_run", ref.PipelineRunName,
			)
		}
		// Snapshot the terminal state *after* the listing so we never
		// observe terminal=true while still missing a TaskRun that
		// existed at list time. The reverse race (terminal observed
		// before a TaskRun spawn) is handled by the next iteration: we
		// only exit when terminal is true AND no new TaskRuns showed up
		// in this pass.
		terminal := p.isPipelineRunTerminal(ctx, ref)

		// fresh holds the TaskRuns not streamed in an earlier pass.
		var fresh []k8s.Object
		for _, tr := range taskRuns {
			name := tr.GetName()
			if name == "" || seen[name] {
				continue
			}
			seen[name] = true
			fresh = append(fresh, tr)
		}
		p.log.Debug("Logs: poll iteration",
			"pipeline_run", ref.PipelineRunName,
			"task_runs_total", len(taskRuns),
			"task_runs_new", len(fresh),
			"terminal", terminal,
		)

		for _, tr := range fresh {
			taskName := tr.GetName()
			// Defensive fallback: unnamed TaskRuns were already
			// filtered out when fresh was built.
			if taskName == "" {
				taskName = fmt.Sprintf("task %d", stepID)
			}
			p.log.Debug("Logs: streaming TaskRun",
				"task_run", taskName, "step_id", stepID, "terminal", terminal,
			)
			if !sendLine(ctx, out, LogLine{
				Kind:       LogKindControl,
				Time:       time.Now(),
				Content:    taskName,
				StepId:     stepID,
				StepStatus: StepStatusStart,
			}) {
				return
			}

			// terminal is the snapshot taken at the top of this
			// iteration. If the pipeline was terminal then, all
			// TaskRuns we see are guaranteed complete and we can
			// take the snapshot fast-path; otherwise we follow the
			// pod logs live so StepStatusEnd is only emitted once
			// the container actually exits.
			if terminal {
				p.fetchCompletedTaskRunLogs(ctx, out, ref, tr, stepID)
			} else {
				p.streamTaskRunLogs(ctx, out, ref, tr, stepID)
			}

			if !sendLine(ctx, out, LogLine{
				Kind:       LogKindControl,
				Time:       time.Now(),
				Content:    taskName,
				StepId:     stepID,
				StepStatus: StepStatusEnd,
			}) {
				return
			}
			p.log.Debug("Logs: finished TaskRun",
				"task_run", taskName, "step_id", stepID,
			)
			stepID++
		}

		// Done condition: the PipelineRun is terminal AND we found no
		// new TaskRuns this iteration. Both are required because a
		// terminal PipelineRun can still expose a freshly-listed
		// TaskRun whose pod we haven't drained yet.
		if terminal && len(fresh) == 0 {
			p.log.Debug("Logs: pipeline run terminal, no new TaskRuns",
				"pipeline_run", ref.PipelineRunName,
			)
			return
		}

		// When we processed new TaskRuns this round, loop again
		// immediately to re-list — Follow=true streaming may have
		// blocked us long enough for additional TaskRuns or terminal
		// transitions to have happened.
		if len(fresh) > 0 {
			continue
		}

		// Nothing new and not terminal yet: wait one poll interval (or
		// stop early on cancellation) before listing again.
		select {
		case <-ctx.Done():
			return
		case <-time.After(pollInterval):
		}
	}
}
701
702// isPipelineRunTerminal returns true if the PipelineRun is in a terminal state right now.
703func (p *tektonProvider) isPipelineRunTerminal(ctx context.Context, ref TektonRunRef) bool {
704 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace,
705 ref.PipelineRunName,
706 )
707 if err != nil {
708 p.log.Debug("isPipelineRunTerminal: failed to get PipelineRun", "err", err, "pipeline_run", ref.PipelineRunName)
709 return false
710 }
711 _, terminal, ok := mapTektonPipelineRunStatus(obj)
712 p.log.Debug("isPipelineRunTerminal: status check", "pipeline_run", ref.PipelineRunName, "terminal", terminal, "ok", ok)
713 return ok && terminal
714}
715
// fetchCompletedTaskRunLogs emits everything available for a TaskRun
// that has already completed.
//
// It first writes one summary data line per terminated step, taken from
// the TaskRun's status.steps[*].terminated (exit code, reason and the
// optional message), so exit information is present even when the pod
// logs are sparse. It then reads the logs of every init container and
// regular container of each pod belonging to the TaskRun as a one-shot
// snapshot (Follow=false). All lines are sent as LogKindData on the
// "stdout" stream with the given stepID.
//
// Failures are best-effort: a pod listing error ends the call, a
// per-container stream error skips that container, and a refused send
// of a summary line ends the call.
func (p *tektonProvider) fetchCompletedTaskRunLogs(
	ctx context.Context,
	out chan<- LogLine,
	ref TektonRunRef,
	tr k8s.Object,
	stepID int,
) {
	trName := tr.GetName()
	pods, err := p.podsForTaskRun(ctx, ref.Namespace, trName)
	if err != nil {
		p.log.Debug("fetchCompletedTaskRunLogs: list pods failed", "err", err,
			"task_run", trName, "pipeline_run", ref.PipelineRunName,
		)
		return
	}
	p.log.Debug("fetchCompletedTaskRunLogs: found pods",
		"task_run", trName, "pod_count", len(pods),
	)

	// Emit a summary line from the TaskRun status (steps[*].terminated) so we
	// get exit codes and reasons even if the pod logs are sparse.
	steps, _ := tr.NestedSlice("status", "steps")
	for _, rawStep := range steps {
		step, _ := rawStep.(map[string]any)
		stepName, _ := step["name"].(string)
		term, _ := step["terminated"].(map[string]any)
		// Steps without a terminated state have nothing to summarize.
		if term == nil {
			continue
		}
		exitCode := numberToInt64(term["exitCode"])
		reason, _ := term["reason"].(string)
		msg, _ := term["message"].(string)
		line := fmt.Sprintf("[step %s] exit=%d reason=%s", stepName, exitCode, reason)
		if msg != "" {
			line += " " + msg
		}
		p.log.Debug("fetchCompletedTaskRunLogs: step terminated",
			"task_run", trName, "step", stepName,
			"exit_code", exitCode, "reason", reason,
		)
		if !sendLine(ctx, out, LogLine{
			Kind:    LogKindData,
			Time:    time.Now(),
			Content: line + "\n",
			StepId:  stepID,
			Stream:  "stdout",
		}) {
			return
		}
	}

	for _, pod := range pods {
		// Init containers first, then regular containers; the copy
		// avoids appending into the pod's own slice.
		containers := append([]k8s.Container(nil), pod.InitContainers...)
		containers = append(containers, pod.Containers...)
		p.log.Debug("fetchCompletedTaskRunLogs: reading pod containers",
			"pod", pod.Name, "container_count", len(containers),
		)
		for _, c := range containers {
			p.log.Debug("fetchCompletedTaskRunLogs: reading container logs",
				"pod", pod.Name, "container", c.Name,
			)
			// Snapshot read (Follow=false) is correct here: the TaskRun
			// has already terminated, so the API server has the full log
			// available and there is nothing more to wait for.
			rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name,
				k8s.LogOptions{Follow: false},
			)
			if err != nil {
				p.log.Debug("fetchCompletedTaskRunLogs: stream failed", "err", err,
					"pod", pod.Name, "container", c.Name,
				)
				continue
			}
			p.sendReaderLines(ctx, out, rc, stepID)
			// Closed per container rather than deferred, so streams are
			// not held open for the rest of the loop.
			_ = rc.Close()
			p.log.Debug("fetchCompletedTaskRunLogs: done reading container",
				"pod", pod.Name, "container", c.Name,
			)
		}
	}
}
801
802func (p *tektonProvider) taskRunsForPipelineRun(ctx context.Context, ref TektonRunRef) ([]k8s.Object, error) {
803 list, err := p.client.ListObjects(ctx, taskRunsGVR, ref.Namespace, k8s.ListOptions{
804 LabelSelector: "tekton.dev/pipelineRun=" + ref.PipelineRunName,
805 })
806 if err != nil {
807 return nil, fmt.Errorf("list Tekton TaskRuns: %w", err)
808 }
809 items := append([]k8s.Object(nil), list...)
810 sort.Slice(items, func(i, j int) bool {
811 ti := items[i].GetCreationTimestamp()
812 tj := items[j].GetCreationTimestamp()
813 return ti.Before(tj)
814 })
815 return items, nil
816}
817
818func (p *tektonProvider) streamTaskRunLogs(
819 ctx context.Context,
820 out chan<- LogLine,
821 ref TektonRunRef,
822 tr k8s.Object,
823 stepID int,
824) {
825 pods, err := p.podsForTaskRun(ctx, ref.Namespace, tr.GetName())
826 if err != nil {
827 p.log.Debug("streamTaskRunLogs: list pods for TaskRun failed", "err", err,
828 "task_run", tr.GetName(), "pipeline_run", ref.PipelineRunName,
829 )
830 return
831 }
832 p.log.Debug("streamTaskRunLogs: found pods",
833 "task_run", tr.GetName(), "pod_count", len(pods),
834 )
835 for _, pod := range pods {
836 containers := append([]k8s.Container(nil), pod.InitContainers...)
837 containers = append(containers, pod.Containers...)
838 p.log.Debug("streamTaskRunLogs: streaming pod containers",
839 "pod", pod.Name, "container_count", len(containers),
840 )
841 for _, c := range containers {
842 p.log.Debug("streamTaskRunLogs: streaming container",
843 "pod", pod.Name, "container", c.Name, "step_id", stepID,
844 )
845 // Live tail with Follow=true: the apiserver holds the
846 // connection open until the container terminates (or ctx is
847 // cancelled), so sendReaderLines only returns once the
848 // container is actually done. Without this, the read EOFs at
849 // the current tail position and the caller emits a spurious
850 // StepStatusEnd while the step is still running.
851 rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name,
852 k8s.LogOptions{Follow: true},
853 )
854 if err != nil {
855 p.log.Debug("streamTaskRunLogs: stream pod logs failed", "err", err,
856 "pod", pod.Name, "container", c.Name,
857 )
858 continue
859 }
860 p.sendReaderLines(ctx, out, rc, stepID)
861 _ = rc.Close()
862 p.log.Debug("streamTaskRunLogs: finished container",
863 "pod", pod.Name, "container", c.Name,
864 )
865 }
866 }
867}
868
869func (p *tektonProvider) podsForTaskRun(ctx context.Context, namespace, taskRun string) ([]k8s.Pod, error) {
870 list, err := p.client.ListPods(ctx, namespace,
871 "tekton.dev/taskRun="+taskRun,
872 )
873 if err != nil {
874 return nil, fmt.Errorf("list pods: %w", err)
875 }
876 pods := append([]k8s.Pod(nil), list...)
877 sort.Slice(pods, func(i, j int) bool {
878 return pods[i].CreationTimestamp.Before(pods[j].CreationTimestamp)
879 })
880 return pods, nil
881}
882
883func (p *tektonProvider) sendReaderLines(
884 ctx context.Context,
885 out chan<- LogLine,
886 rc io.Reader,
887 stepID int,
888) {
889 scanner := bufio.NewScanner(rc)
890 for scanner.Scan() {
891 if !sendLine(ctx, out, LogLine{
892 Kind: LogKindData,
893 Time: time.Now(),
894 Content: scanner.Text() + "\n",
895 StepId: stepID,
896 Stream: "stdout",
897 }) {
898 return
899 }
900 }
901 if err := scanner.Err(); err != nil {
902 p.log.Debug("scan pod log", "err", err)
903 }
904}
905
// numberToInt64 converts a loosely-typed numeric value, as found in a
// decoded JSON/YAML object, to int64. Floating-point values are
// truncated toward zero. Unsupported types (including nil and strings)
// yield 0.
//
// In addition to int, int64, float64 and json.Number it accepts the
// narrower fixed-width integer types and float32, and a json.Number
// written with a fractional part (e.g. "137.0") is converted through
// its float value instead of collapsing to 0.
func numberToInt64(value any) int64 {
	switch v := value.(type) {
	case int64:
		return v
	case int:
		return int64(v)
	case int32:
		return int64(v)
	case int16:
		return int64(v)
	case int8:
		return int64(v)
	case uint32:
		return int64(v)
	case uint16:
		return int64(v)
	case uint8:
		return int64(v)
	case float64:
		return int64(v)
	case float32:
		return int64(v)
	case json.Number:
		i, err := v.Int64()
		if err == nil {
			return i
		}
		// Not a plain integer literal: fall back to the float form
		// when it fits in int64.
		if f, ferr := v.Float64(); ferr == nil && f >= -(1<<63) && f < (1<<63) {
			return int64(f)
		}
		// Otherwise keep whatever Int64 produced alongside its error.
		return i
	default:
		return 0
	}
}
921
922func (p *tektonProvider) publishStatus(
923 ctx context.Context,
924 pipelineURI, workflow, status, runName string,
925 errMsg *string,
926 exitCode *int64,
927) error {
928 rec := tangled.PipelineStatus{
929 LexiconTypeID: tangled.PipelineStatusNSID,
930 Pipeline: pipelineURI,
931 Workflow: workflow,
932 Status: status,
933 CreatedAt: time.Now().UTC().Format(time.RFC3339),
934 Error: errMsg,
935 ExitCode: exitCode,
936 }
937 body, err := json.Marshal(rec)
938 if err != nil {
939 return fmt.Errorf("marshal pipeline.status: %w", err)
940 }
941 rkey := fmt.Sprintf("tk-%s-%s-%d", runName, status, time.Now().UnixNano())
942 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil {
943 return fmt.Errorf("publish pipeline.status: %w", err)
944 }
945 return nil
946}
947
948func tektonPipelineRunName(knot, pipelineRkey, workflow, commit, branch string) string {
949 h := sha256.Sum256([]byte(strings.Join(
950 []string{knot, pipelineRkey, workflow, commit, branch}, "\x00",
951 )))
952 suffix := hex.EncodeToString(h[:])[:12]
953 base := dnsLabel("tack-" + workflow)
954 maxBase := 63 - len(suffix) - 1
955 if len(base) > maxBase {
956 base = strings.TrimRight(base[:maxBase], "-")
957 }
958 if base == "" {
959 base = "tack"
960 }
961 return base + "-" + suffix
962}
963
// dnsLabel reduces s to characters that are valid in a DNS label: it
// lowercases the input, keeps ASCII letters and digits, collapses every
// run of other characters into a single '-', and trims leading and
// trailing dashes. The result may be empty and is not length-limited.
//
// Only ASCII alphanumerics are kept. unicode.IsLetter and
// unicode.IsDigit alone also accept runes such as 'é' or '٣', which are
// not permitted in Kubernetes object names or label values and would
// make the API server reject the object.
func dnsLabel(s string) string {
	var b strings.Builder
	lastDash := false
	for _, r := range strings.ToLower(s) {
		ok := r <= unicode.MaxASCII && (unicode.IsLetter(r) || unicode.IsDigit(r))
		if ok {
			b.WriteRune(r)
			lastDash = false
			continue
		}
		// Emit at most one dash per run of rejected characters.
		if !lastDash {
			b.WriteByte('-')
			lastDash = true
		}
	}
	return strings.Trim(b.String(), "-")
}
981
982func labelValue(s string) string {
983 v := dnsLabel(s)
984 if len(v) > 63 {
985 v = strings.TrimRight(v[:63], "-")
986 }
987 if v == "" {
988 return "unknown"
989 }
990 return v
991}