Stitch any CI into Tangled
1package main
2
3// tektonProvider implements Provider by creating Tekton PipelineRuns
4// directly inside the Kubernetes cluster. Tack already receives and
5// authorizes Tangled pipeline triggers, so adding Tekton Triggers would
6// duplicate the event-to-run translation layer instead of simplifying it.
7
8import (
9 "bufio"
10 "context"
11 "crypto/sha256"
12 "encoding/hex"
13 "encoding/json"
14 "errors"
15 "fmt"
16 "io"
17 "log/slog"
18 "sort"
19 "strings"
20 "time"
21 "unicode"
22
23 "tangled.org/core/api/tangled"
24
25 "go.mitchellh.com/tack/internal/k8s"
26 "go.yaml.in/yaml/v2"
27)
28
// Identifiers stamped onto every PipelineRun that tack creates.
const (
	// apiVersion and kind written into each generated PipelineRun manifest.
	tektonAPIVersion = "tekton.dev/v1"
	tektonRunKind    = "PipelineRun"

	// Label keys. The pipeline-rkey and workflow label values are passed
	// through labelValue before being written; managed-by is always "tack".
	tektonLabelManagedBy    = "tack.mitchellh.com/managed-by"
	tektonLabelPipelineRkey = "tack.mitchellh.com/pipeline-rkey"
	tektonLabelWorkflow     = "tack.mitchellh.com/workflow"

	// Annotation keys. These carry the raw, unsanitized values. The
	// pipeline-rkey and workflow annotations use the same key strings as
	// the labels above.
	tektonAnnotationKnot         = "tack.mitchellh.com/knot"
	tektonAnnotationPipelineRkey = "tack.mitchellh.com/pipeline-rkey"
	tektonAnnotationWorkflow     = "tack.mitchellh.com/workflow"
	tektonAnnotationActor        = "tack.mitchellh.com/actor"
	tektonAnnotationCommit       = "tack.mitchellh.com/commit"
	tektonAnnotationBranch       = "tack.mitchellh.com/branch"
)
44
// Group/version/resource coordinates handed to the k8s client when
// creating, reading, listing, and watching Tekton objects.
var (
	// pipelineRunsGVR addresses tekton.dev/v1 PipelineRuns.
	pipelineRunsGVR = k8s.GVR{
		Group: "tekton.dev", Version: "v1", Resource: "pipelineruns",
	}
	// taskRunsGVR addresses tekton.dev/v1 TaskRuns.
	taskRunsGVR = k8s.GVR{
		Group: "tekton.dev", Version: "v1", Resource: "taskruns",
	}
)
53
// tektonWorkflowConfig is the Tekton-specific subset of workflow YAML.
// `pipeline` names an existing in-cluster Tekton Pipeline. Params are
// deliberately string-only in v1: tack is meant to select an existing
// runner and pass a small amount of routing data, not mirror Tekton's
// entire PipelineRun API.
type tektonWorkflowConfig struct {
	// Pipeline is required; it becomes spec.pipelineRef.name.
	Pipeline string `yaml:"pipeline"`
	// ServiceAccount is optional; when non-empty it becomes
	// spec.serviceAccountName.
	ServiceAccount string `yaml:"service_account"`
	// Params are emitted as spec.params entries.
	Params map[string]string `yaml:"params"`
	// Workspaces lists the workspace bindings to attach to the run.
	Workspaces []tektonWorkspaceConfig `yaml:"workspaces"`
}
65
// tektonWorkspaceConfig describes one workspace binding. Storage and
// PVC select how the workspace is backed; buildTektonPipelineRun checks
// Storage first and skips an entry that sets neither.
type tektonWorkspaceConfig struct {
	// Name is the workspace name written into the binding.
	Name string `yaml:"name"`
	// AccessModes is copied into the volumeClaimTemplate spec; it is only
	// read for Storage-backed workspaces.
	AccessModes []string `yaml:"access_modes"`
	// Storage, when set, requests a volumeClaimTemplate with this value
	// as resources.requests.storage.
	Storage *string `yaml:"storage"`
	// PVC, when set (and Storage is not), binds the workspace to the
	// persistentVolumeClaim with this claimName.
	PVC *string `yaml:"pvc"`
}
72
// tektonWorkflowDoc is the YAML envelope used to reach the nested
// `tack.tekton` mapping of a workflow body; all other keys in the
// document are not represented here.
type tektonWorkflowDoc struct {
	Tack struct {
		Tekton tektonWorkflowConfig `yaml:"tekton"`
	} `yaml:"tack"`
}
78
79// parseTektonWorkflowConfig decodes `tack.tekton` from a workflow body.
80func parseTektonWorkflowConfig(raw string) (*tektonWorkflowConfig, error) {
81 if strings.TrimSpace(raw) == "" {
82 return nil, errors.New("workflow body is empty")
83 }
84 var doc tektonWorkflowDoc
85 if err := yaml.Unmarshal([]byte(raw), &doc); err != nil {
86 return nil, fmt.Errorf("parse workflow yaml: %w", err)
87 }
88 cfg := doc.Tack.Tekton
89 if cfg.Pipeline == "" {
90 return nil, errors.New("workflow yaml: `tack.tekton.pipeline` is required")
91 }
92 return &cfg, nil
93}
94
// tektonProvider runs workflows as Tekton PipelineRuns in a single
// Kubernetes namespace.
type tektonProvider struct {
	// br is where pipeline status records are published.
	br *broker
	// st persists and looks up the workflow -> PipelineRun mapping.
	st *store
	// log is the provider-scoped logger.
	log *slog.Logger
	// client performs all Kubernetes API calls.
	client k8s.Client
	// namespace is where PipelineRuns are created.
	namespace string
}

// Compile-time check that tektonProvider satisfies Provider.
var _ Provider = (*tektonProvider)(nil)
104
105func newTektonProvider(
106 br *broker,
107 st *store,
108 client k8s.Client,
109 namespace string,
110 log *slog.Logger,
111) *tektonProvider {
112 return &tektonProvider{
113 br: br,
114 st: st,
115 log: log.With("component", "provider", "kind", "tekton"),
116 client: client,
117 namespace: namespace,
118 }
119}
120
121func newInClusterTektonProvider(
122 br *broker,
123 st *store,
124 namespace string,
125 log *slog.Logger,
126) (*tektonProvider, error) {
127 client, err := k8s.NewInClusterClient()
128 if err != nil {
129 return nil, fmt.Errorf("configure in-cluster kubernetes client: %w", err)
130 }
131 return newTektonProvider(br, st, client, namespace, log), nil
132}
133
134func (p *tektonProvider) Spawn(
135 ctx context.Context,
136 knot string,
137 pipelineRkey string,
138 actor string,
139 trigger *tangled.Pipeline_TriggerMetadata,
140 workflows []*tangled.Pipeline_Workflow,
141) {
142 if len(workflows) == 0 {
143 p.log.Warn("pipeline has no workflows; nothing to spawn",
144 "knot", knot, "rkey", pipelineRkey,
145 )
146 return
147 }
148 for _, wf := range workflows {
149 if wf == nil || wf.Name == "" {
150 continue
151 }
152 wf := wf
153 go p.spawnWorkflow(ctx, knot, pipelineRkey, actor, trigger, wf)
154 }
155}
156
// spawnWorkflow creates (or adopts) the Tekton PipelineRun for a single
// workflow and starts tracking it. Spawn runs it in its own goroutine,
// so every failure is logged rather than returned. The steps are:
//
//  1. Parse `tack.tekton` from the workflow body; invalid config aborts.
//  2. Derive the PipelineRun name from the trigger inputs and create the
//     object. If an object with that name already exists it is fetched
//     and reused instead of treated as a failure.
//  3. Persist the workflow -> PipelineRun mapping in the store.
//  4. Publish an initial "pending" status.
//  5. Start watchPipelineRun in a new goroutine.
func (p *tektonProvider) spawnWorkflow(
	ctx context.Context,
	knot string,
	pipelineRkey string,
	actor string,
	trigger *tangled.Pipeline_TriggerMetadata,
	wf *tangled.Pipeline_Workflow,
) {
	logger := p.log.With(
		"knot", knot,
		"pipeline_rkey", pipelineRkey,
		"workflow", wf.Name,
		"actor", actor,
	)

	cfg, err := parseTektonWorkflowConfig(wf.Raw)
	if err != nil {
		logger.Error("invalid workflow config; refusing to spawn", "err", err)
		return
	}
	commit, branch := triggerCommitAndBranch(trigger)
	// The name is a hash of the inputs, so the same inputs always map to
	// the same PipelineRun name.
	name := tektonPipelineRunName(knot, pipelineRkey, wf.Name, commit, branch)
	pr := buildTektonPipelineRun(
		p.namespace, name, cfg, knot, pipelineRkey, actor, commit, branch, wf,
	)

	created, err := p.client.CreateObject(ctx, pipelineRunsGVR, p.namespace, pr)
	if errors.Is(err, k8s.ErrAlreadyExists) {
		// A run with this name is already present: adopt it so the UID
		// recorded below refers to the existing object.
		created, err = p.client.GetObject(ctx, pipelineRunsGVR, p.namespace, name)
	}
	if err != nil {
		logger.Error("create tekton PipelineRun", "err", err,
			"namespace", p.namespace, "pipeline_run", name,
			"pipeline", cfg.Pipeline,
		)
		return
	}

	ref := TektonRunRef{
		Knot:            knot,
		PipelineRkey:    pipelineRkey,
		Workflow:        wf.Name,
		Namespace:       p.namespace,
		PipelineRunName: name,
		PipelineRunUID:  created.GetUID(),
		PipelineName:    cfg.Pipeline,
		PipelineURI:     pipelineATURI(knot, pipelineRkey),
	}
	// Without the stored mapping Logs cannot find the run, so a failed
	// insert stops here: no status is published and no watcher starts.
	if err := p.st.InsertTektonRun(ctx, ref); err != nil {
		logger.Error("persist tekton run mapping", "err", err,
			"pipeline_run", name,
		)
		return
	}

	// Best effort: a failed initial publish is logged and the watcher is
	// still started.
	if err := p.publishStatus(ctx, ref.PipelineURI, wf.Name,
		"pending", name, nil, nil); err != nil {
		logger.Error("publish initial pending status", "err", err)
	}

	logger.Info("tekton PipelineRun created",
		"namespace", p.namespace,
		"pipeline", cfg.Pipeline,
		"pipeline_run", name,
		"uid", ref.PipelineRunUID,
	)
	go p.watchPipelineRun(ctx, ref)
}
225
226func buildTektonPipelineRun(
227 namespace, name string,
228 cfg *tektonWorkflowConfig,
229 knot, pipelineRkey, actor, commit, branch string,
230 wf *tangled.Pipeline_Workflow,
231) k8s.Object {
232 obj := k8s.Object{
233 "apiVersion": tektonAPIVersion,
234 "kind": tektonRunKind,
235 "metadata": map[string]any{
236 "name": name,
237 "namespace": namespace,
238 "labels": map[string]any{
239 tektonLabelManagedBy: "tack",
240 tektonLabelPipelineRkey: labelValue(pipelineRkey),
241 tektonLabelWorkflow: labelValue(wf.Name),
242 },
243 "annotations": map[string]any{
244 tektonAnnotationKnot: knot,
245 tektonAnnotationPipelineRkey: pipelineRkey,
246 tektonAnnotationWorkflow: wf.Name,
247 tektonAnnotationActor: actor,
248 tektonAnnotationCommit: commit,
249 tektonAnnotationBranch: branch,
250 },
251 },
252 "spec": map[string]any{
253 "pipelineRef": map[string]any{
254 "name": cfg.Pipeline,
255 },
256 "params": []any{
257 map[string]any{
258 "name": "commit",
259 "value": commit,
260 },
261 map[string]any{
262 "name": "branch",
263 "value": branch,
264 },
265 map[string]any{
266 "name": "actor",
267 "value": actor,
268 },
269 },
270 },
271 }
272
273 spec := obj["spec"].(map[string]any)
274
275 if len(cfg.Workspaces) != 0 {
276 spec["podTemplate"] = map[string]any{
277 "securityContext": map[string]any{
278 "fsGroup": 65532,
279 },
280 }
281
282 workspaces := []any{}
283
284 for _, ws := range cfg.Workspaces {
285 switch {
286 case ws.Storage != nil:
287 workspaces = append(workspaces, map[string]any{
288 "name": ws.Name,
289 "volumeClaimTemplate": map[string]any{
290 "spec": map[string]any{
291 "accessModes": ws.AccessModes,
292 "resources": map[string]any{
293 "requests": map[string]any{
294 "storage": *ws.Storage,
295 },
296 },
297 },
298 },
299 })
300
301 case ws.PVC != nil:
302 workspaces = append(workspaces, map[string]any{
303 "name": ws.Name,
304 "persistentVolumeClaim": map[string]any{
305 "claimName": *ws.PVC,
306 },
307 })
308 }
309 }
310
311 spec["workspaces"] = workspaces
312 }
313
314 if cfg.ServiceAccount != "" {
315 spec["serviceAccountName"] = cfg.ServiceAccount
316 }
317 if len(cfg.Params) > 0 {
318 keys := make([]string, 0, len(cfg.Params))
319 for key := range cfg.Params {
320 keys = append(keys, key)
321 }
322 sort.Strings(keys)
323 params := make([]any, 0, len(keys))
324 for _, key := range keys {
325 params = append(params, map[string]any{
326 "name": key,
327 "value": cfg.Params[key],
328 })
329 }
330 spec["params"] = params
331 }
332 return obj
333}
334
335func (p *tektonProvider) watchPipelineRun(ctx context.Context, ref TektonRunRef) {
336 logger := p.log.With(
337 "knot", ref.Knot,
338 "pipeline_rkey", ref.PipelineRkey,
339 "workflow", ref.Workflow,
340 "namespace", ref.Namespace,
341 "pipeline_run", ref.PipelineRunName,
342 )
343
344 logger.Debug("watchPipelineRun: starting")
345
346 last := ""
347 if obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace,
348 ref.PipelineRunName); err == nil {
349 status, terminal, ok := mapTektonPipelineRunStatus(obj)
350 logger.Debug("watchPipelineRun: initial status read",
351 "status", status, "terminal", terminal, "ok", ok,
352 )
353 if ok {
354 last = status
355 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
356 status, ref.PipelineRunName, nil, nil); err != nil {
357 logger.Error("publish tekton status", "err", err, "status", status)
358 }
359 if terminal {
360 logger.Debug("watchPipelineRun: already terminal on initial read; exiting", "status", status)
361 return
362 }
363 }
364 } else if errors.Is(err, k8s.ErrNotFound) {
365 logger.Warn("PipelineRun disappeared while watching")
366 return
367 } else {
368 logger.Debug("initial PipelineRun status read", "err", err)
369 }
370
371 w, err := p.client.WatchObjects(ctx, pipelineRunsGVR, ref.Namespace,
372 k8s.ListOptions{FieldSelector: "metadata.name=" + ref.PipelineRunName},
373 )
374 if err != nil {
375 logger.Debug("watchPipelineRun: watch failed; falling back to polling", "err", err)
376 p.pollPipelineRun(ctx, ref, logger, last)
377 return
378 }
379 defer w.Stop()
380
381 logger.Debug("watchPipelineRun: watch established; entering event loop")
382 for {
383 select {
384 case <-ctx.Done():
385 logger.Debug("watchPipelineRun: context cancelled")
386 return
387 case ev, ok := <-w.ResultChan():
388 if !ok {
389 logger.Debug("watchPipelineRun: watch channel closed; falling back to polling")
390 p.pollPipelineRun(ctx, ref, logger, last)
391 return
392 }
393 status, terminal, ok := mapTektonPipelineRunStatus(ev.Object)
394 logger.Debug("watchPipelineRun: watch event",
395 "event_type", ev.Type,
396 "status", status, "terminal", terminal, "ok", ok, "last", last,
397 )
398 if !ok || status == last {
399 if terminal {
400 logger.Debug("watchPipelineRun: terminal status unchanged; exiting", "status", status)
401 return
402 }
403 continue
404 }
405 last = status
406 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
407 status, ref.PipelineRunName, nil, nil); err != nil {
408 logger.Error("publish tekton status", "err", err, "status", status)
409 continue
410 }
411 logger.Debug("watchPipelineRun: published status", "status", status, "terminal", terminal)
412 if terminal {
413 logger.Debug("watchPipelineRun: terminal status reached; exiting", "status", status)
414 return
415 }
416 }
417 }
418}
419
420func (p *tektonProvider) pollPipelineRun(
421 ctx context.Context,
422 ref TektonRunRef,
423 logger *slog.Logger,
424 last string,
425) {
426 logger.Debug("pollPipelineRun: starting poll loop", "interval", "5s")
427 ticker := time.NewTicker(5 * time.Second)
428 defer ticker.Stop()
429 for {
430 select {
431 case <-ctx.Done():
432 logger.Debug("pollPipelineRun: context cancelled")
433 return
434 case <-ticker.C:
435 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace,
436 ref.PipelineRunName,
437 )
438 if errors.Is(err, k8s.ErrNotFound) {
439 logger.Warn("PipelineRun disappeared while watching")
440 return
441 }
442 if err != nil {
443 logger.Debug("get PipelineRun status", "err", err)
444 continue
445 }
446 status, terminal, ok := mapTektonPipelineRunStatus(obj)
447 logger.Debug("pollPipelineRun: poll tick",
448 "status", status, "terminal", terminal, "ok", ok, "last", last,
449 )
450 if !ok || status == last {
451 if terminal {
452 logger.Debug("pollPipelineRun: terminal status unchanged; exiting", "status", status)
453 return
454 }
455 continue
456 }
457 last = status
458 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
459 status, ref.PipelineRunName, nil, nil); err != nil {
460 logger.Error("publish tekton status", "err", err, "status", status)
461 continue
462 }
463 logger.Debug("pollPipelineRun: published status", "status", status, "terminal", terminal)
464 if terminal {
465 logger.Debug("pollPipelineRun: terminal status reached; exiting", "status", status)
466 return
467 }
468 }
469 }
470}
471
472// mapTektonPipelineRunStatus translates Tekton's Succeeded condition
473// into the Tangled status strings consumed by the appview.
474func mapTektonPipelineRunStatus(obj k8s.Object) (status string, terminal bool, ok bool) {
475 conditions, ok := obj.NestedSlice("status", "conditions")
476 if !ok || len(conditions) == 0 {
477 slog.Debug("mapTektonPipelineRunStatus: no conditions found",
478 "pipeline_run", obj.GetName(),
479 )
480 return "", false, false
481 }
482 for _, raw := range conditions {
483 cond, _ := raw.(map[string]interface{})
484 condType, _ := cond["type"].(string)
485 condStatus, _ := cond["status"].(string)
486 reason, _ := cond["reason"].(string)
487 message, _ := cond["message"].(string)
488 slog.Debug("mapTektonPipelineRunStatus: condition",
489 "pipeline_run", obj.GetName(),
490 "type", condType,
491 "status", condStatus,
492 "reason", reason,
493 "message", message,
494 )
495 if condType != "Succeeded" {
496 continue
497 }
498 switch condStatus {
499 case "True":
500 return "success", true, true
501 case "False":
502 if tektonReasonCancelled(reason) {
503 return "cancelled", true, true
504 }
505 return "failed", true, true
506 case "Unknown":
507 return "running", false, true
508 default:
509 return "", false, false
510 }
511 }
512 return "", false, false
513}
514
// tektonReasonCancelled reports whether a condition reason indicates
// that the run was cancelled or stopped rather than failed. The match
// is a case-insensitive substring search.
func tektonReasonCancelled(reason string) bool {
	lowered := strings.ToLower(reason)
	for _, marker := range [...]string{"cancel", "stop"} {
		if strings.Contains(lowered, marker) {
			return true
		}
	}
	return false
}
519
520func (p *tektonProvider) Logs(
521 ctx context.Context,
522 knot string,
523 pipelineRkey string,
524 workflow string,
525) (<-chan LogLine, error) {
526 ref, err := p.st.LookupTektonRunByTuple(ctx, knot, pipelineRkey, workflow)
527 if err != nil {
528 return nil, fmt.Errorf("lookup tekton run mapping: %w", err)
529 }
530 if ref == nil {
531 // No mapping at all means this provider never spawned a
532 // PipelineRun for the requested tuple, so a 404 is the
533 // honest answer.
534 return nil, ErrLogsNotFound
535 }
536
537 // At this point the workflow *was* spawned — we have a row in the
538 // store mapping the tuple to a PipelineRun. The TaskRuns the
539 // PipelineRun fans out to are created asynchronously by Tekton
540 // once the run is admitted, so a freshly-spawned or still-queueing
541 // PipelineRun will momentarily report zero TaskRuns. Returning
542 // ErrLogsNotFound here would mistranslate that into a 404 at the
543 // HTTP layer (see provider.go contract: ErrLogsNotFound means the
544 // workflow never ran), making just-spawned runs look nonexistent
545 // to the appview. Instead, hand back an open channel and poll
546 // inside the goroutine until TaskRuns materialize, ctx is
547 // cancelled, or the PipelineRun reaches a terminal state with
548 // nothing to stream.
549 out := make(chan LogLine, 32)
550 go func() {
551 defer close(out)
552 p.streamPipelineRunLogs(ctx, out, *ref)
553 }()
554 return out, nil
555}
556
// streamPipelineRunLogs drives the Logs channel for a single PipelineRun.
// It polls for TaskRuns until the PipelineRun is terminal (or ctx is
// cancelled), streaming each TaskRun's logs exactly once.
//
// The loop is deliberately a poll rather than a one-shot pass for two
// reasons:
//
//  1. Live runs need follow semantics. Streaming a still-running TaskRun
//     uses Follow=true on the pod log stream, so streamTaskRunLogs only
//     returns once the container actually terminates. The outer loop
//     then re-checks for new TaskRuns and the PipelineRun's terminal
//     state, instead of closing the channel mid-run.
//
//  2. PipelineRuns can spawn additional TaskRuns over time (sequential
//     `runAfter` tasks, finally blocks, retries). A single snapshot
//     would silently drop any TaskRun that appears after the snapshot,
//     even though the workflow is still running.
//
// The loop terminates only when the PipelineRun is terminal AND the
// most recent listing produced no new TaskRuns.
//
// Each TaskRun is bracketed by a StepStatusStart and a StepStatusEnd
// control line carrying the TaskRun name; StepId increases by one per
// TaskRun in the order they are processed. The function never closes
// out; the caller (Logs) does.
func (p *tektonProvider) streamPipelineRunLogs(
	ctx context.Context,
	out chan<- LogLine,
	ref TektonRunRef,
) {
	const pollInterval = 1 * time.Second
	// seen records TaskRun names already streamed so each is emitted once.
	seen := map[string]bool{}
	stepID := 0
	for {
		if err := ctx.Err(); err != nil {
			return
		}

		taskRuns, err := p.taskRunsForPipelineRun(ctx, ref)
		if err != nil {
			// NOTE(review): the error is only logged. taskRuns is nil in
			// that case, so if the run is already terminal the done
			// condition below fires and the stream ends without any
			// logs. Confirm whether a retry is wanted here.
			p.log.Debug("Logs: list TaskRuns failed",
				"err", err, "pipeline_run", ref.PipelineRunName,
			)
		}
		// The terminal state is read *after* the listing. A TaskRun
		// that shows up later is picked up by a following iteration: we
		// only exit when terminal is true AND this pass found no new
		// TaskRuns.
		terminal := p.isPipelineRunTerminal(ctx, ref)

		// fresh collects the TaskRuns not streamed by an earlier pass.
		var fresh []k8s.Object
		for _, tr := range taskRuns {
			name := tr.GetName()
			if name == "" || seen[name] {
				continue
			}
			seen[name] = true
			fresh = append(fresh, tr)
		}
		p.log.Debug("Logs: poll iteration",
			"pipeline_run", ref.PipelineRunName,
			"task_runs_total", len(taskRuns),
			"task_runs_new", len(fresh),
			"terminal", terminal,
		)

		for _, tr := range fresh {
			taskName := tr.GetName()
			// NOTE(review): unnamed TaskRuns are filtered out above, so
			// this fallback looks unreachable.
			if taskName == "" {
				taskName = fmt.Sprintf("task %d", stepID)
			}
			p.log.Debug("Logs: streaming TaskRun",
				"task_run", taskName, "step_id", stepID, "terminal", terminal,
			)
			if !sendLine(ctx, out, LogLine{
				Kind:       LogKindControl,
				Time:       time.Now(),
				Content:    taskName,
				StepId:     stepID,
				StepStatus: StepStatusStart,
			}) {
				return
			}

			// terminal is the snapshot taken at the top of this
			// iteration. If the pipeline was terminal then, the
			// TaskRun is read with a one-shot snapshot; otherwise the
			// pod logs are followed live so StepStatusEnd is only
			// emitted once the container actually exits.
			if terminal {
				p.fetchCompletedTaskRunLogs(ctx, out, ref, tr, stepID)
			} else {
				p.streamTaskRunLogs(ctx, out, ref, tr, stepID)
			}

			if !sendLine(ctx, out, LogLine{
				Kind:       LogKindControl,
				Time:       time.Now(),
				Content:    taskName,
				StepId:     stepID,
				StepStatus: StepStatusEnd,
			}) {
				return
			}
			p.log.Debug("Logs: finished TaskRun",
				"task_run", taskName, "step_id", stepID,
			)
			stepID++
		}

		// Done condition: the PipelineRun is terminal AND we found no
		// new TaskRuns this iteration. Both are required because a
		// terminal PipelineRun can still expose a freshly-listed
		// TaskRun whose pod we haven't drained yet.
		if terminal && len(fresh) == 0 {
			p.log.Debug("Logs: pipeline run terminal, no new TaskRuns",
				"pipeline_run", ref.PipelineRunName,
			)
			return
		}

		// When we processed new TaskRuns this round, loop again
		// immediately to re-list — Follow=true streaming may have
		// blocked us long enough for additional TaskRuns or terminal
		// transitions to have happened.
		if len(fresh) > 0 {
			continue
		}

		// Nothing new and not terminal yet: wait one interval (or until
		// ctx is cancelled) before listing again.
		select {
		case <-ctx.Done():
			return
		case <-time.After(pollInterval):
		}
	}
}
692
693// isPipelineRunTerminal returns true if the PipelineRun is in a terminal state right now.
694func (p *tektonProvider) isPipelineRunTerminal(ctx context.Context, ref TektonRunRef) bool {
695 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace,
696 ref.PipelineRunName,
697 )
698 if err != nil {
699 p.log.Debug("isPipelineRunTerminal: failed to get PipelineRun", "err", err, "pipeline_run", ref.PipelineRunName)
700 return false
701 }
702 _, terminal, ok := mapTektonPipelineRunStatus(obj)
703 p.log.Debug("isPipelineRunTerminal: status check", "pipeline_run", ref.PipelineRunName, "terminal", terminal, "ok", ok)
704 return ok && terminal
705}
706
707// fetchCompletedTaskRunLogs fetches all logs from a TaskRun that has already completed.
708// It reads each step container's logs in one shot using all-containers traversal,
709// and also inlines the Tekton step status from the TaskRun (exit code, reason) as
710// control messages so the caller gets full context without needing to watch for events.
711func (p *tektonProvider) fetchCompletedTaskRunLogs(
712 ctx context.Context,
713 out chan<- LogLine,
714 ref TektonRunRef,
715 tr k8s.Object,
716 stepID int,
717) {
718 trName := tr.GetName()
719 pods, err := p.podsForTaskRun(ctx, ref.Namespace, trName)
720 if err != nil {
721 p.log.Debug("fetchCompletedTaskRunLogs: list pods failed", "err", err,
722 "task_run", trName, "pipeline_run", ref.PipelineRunName,
723 )
724 return
725 }
726 p.log.Debug("fetchCompletedTaskRunLogs: found pods",
727 "task_run", trName, "pod_count", len(pods),
728 )
729
730 // Emit a summary line from the TaskRun status (steps[*].terminated) so we
731 // get exit codes and reasons even if the pod logs are sparse.
732 steps, _ := tr.NestedSlice("status", "steps")
733 for _, rawStep := range steps {
734 step, _ := rawStep.(map[string]any)
735 stepName, _ := step["name"].(string)
736 term, _ := step["terminated"].(map[string]any)
737 if term == nil {
738 continue
739 }
740 exitCode := numberToInt64(term["exitCode"])
741 reason, _ := term["reason"].(string)
742 msg, _ := term["message"].(string)
743 line := fmt.Sprintf("[step %s] exit=%d reason=%s", stepName, exitCode, reason)
744 if msg != "" {
745 line += " " + msg
746 }
747 p.log.Debug("fetchCompletedTaskRunLogs: step terminated",
748 "task_run", trName, "step", stepName,
749 "exit_code", exitCode, "reason", reason,
750 )
751 if !sendLine(ctx, out, LogLine{
752 Kind: LogKindData,
753 Time: time.Now(),
754 Content: line + "\n",
755 StepId: stepID,
756 Stream: "stdout",
757 }) {
758 return
759 }
760 }
761
762 for _, pod := range pods {
763 containers := append([]k8s.Container(nil), pod.InitContainers...)
764 containers = append(containers, pod.Containers...)
765 p.log.Debug("fetchCompletedTaskRunLogs: reading pod containers",
766 "pod", pod.Name, "container_count", len(containers),
767 )
768 for _, c := range containers {
769 p.log.Debug("fetchCompletedTaskRunLogs: reading container logs",
770 "pod", pod.Name, "container", c.Name,
771 )
772 // Snapshot read (Follow=false) is correct here: the TaskRun
773 // has already terminated, so the API server has the full log
774 // available and there is nothing more to wait for.
775 rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name,
776 k8s.LogOptions{Follow: false},
777 )
778 if err != nil {
779 p.log.Debug("fetchCompletedTaskRunLogs: stream failed", "err", err,
780 "pod", pod.Name, "container", c.Name,
781 )
782 continue
783 }
784 p.sendReaderLines(ctx, out, rc, stepID)
785 _ = rc.Close()
786 p.log.Debug("fetchCompletedTaskRunLogs: done reading container",
787 "pod", pod.Name, "container", c.Name,
788 )
789 }
790 }
791}
792
793func (p *tektonProvider) taskRunsForPipelineRun(ctx context.Context, ref TektonRunRef) ([]k8s.Object, error) {
794 list, err := p.client.ListObjects(ctx, taskRunsGVR, ref.Namespace, k8s.ListOptions{
795 LabelSelector: "tekton.dev/pipelineRun=" + ref.PipelineRunName,
796 })
797 if err != nil {
798 return nil, fmt.Errorf("list Tekton TaskRuns: %w", err)
799 }
800 items := append([]k8s.Object(nil), list...)
801 sort.Slice(items, func(i, j int) bool {
802 ti := items[i].GetCreationTimestamp()
803 tj := items[j].GetCreationTimestamp()
804 return ti.Before(tj)
805 })
806 return items, nil
807}
808
809func (p *tektonProvider) streamTaskRunLogs(
810 ctx context.Context,
811 out chan<- LogLine,
812 ref TektonRunRef,
813 tr k8s.Object,
814 stepID int,
815) {
816 pods, err := p.podsForTaskRun(ctx, ref.Namespace, tr.GetName())
817 if err != nil {
818 p.log.Debug("streamTaskRunLogs: list pods for TaskRun failed", "err", err,
819 "task_run", tr.GetName(), "pipeline_run", ref.PipelineRunName,
820 )
821 return
822 }
823 p.log.Debug("streamTaskRunLogs: found pods",
824 "task_run", tr.GetName(), "pod_count", len(pods),
825 )
826 for _, pod := range pods {
827 containers := append([]k8s.Container(nil), pod.InitContainers...)
828 containers = append(containers, pod.Containers...)
829 p.log.Debug("streamTaskRunLogs: streaming pod containers",
830 "pod", pod.Name, "container_count", len(containers),
831 )
832 for _, c := range containers {
833 p.log.Debug("streamTaskRunLogs: streaming container",
834 "pod", pod.Name, "container", c.Name, "step_id", stepID,
835 )
836 // Live tail with Follow=true: the apiserver holds the
837 // connection open until the container terminates (or ctx is
838 // cancelled), so sendReaderLines only returns once the
839 // container is actually done. Without this, the read EOFs at
840 // the current tail position and the caller emits a spurious
841 // StepStatusEnd while the step is still running.
842 rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name,
843 k8s.LogOptions{Follow: true},
844 )
845 if err != nil {
846 p.log.Debug("streamTaskRunLogs: stream pod logs failed", "err", err,
847 "pod", pod.Name, "container", c.Name,
848 )
849 continue
850 }
851 p.sendReaderLines(ctx, out, rc, stepID)
852 _ = rc.Close()
853 p.log.Debug("streamTaskRunLogs: finished container",
854 "pod", pod.Name, "container", c.Name,
855 )
856 }
857 }
858}
859
860func (p *tektonProvider) podsForTaskRun(ctx context.Context, namespace, taskRun string) ([]k8s.Pod, error) {
861 list, err := p.client.ListPods(ctx, namespace,
862 "tekton.dev/taskRun="+taskRun,
863 )
864 if err != nil {
865 return nil, fmt.Errorf("list pods: %w", err)
866 }
867 pods := append([]k8s.Pod(nil), list...)
868 sort.Slice(pods, func(i, j int) bool {
869 return pods[i].CreationTimestamp.Before(pods[j].CreationTimestamp)
870 })
871 return pods, nil
872}
873
874func (p *tektonProvider) sendReaderLines(
875 ctx context.Context,
876 out chan<- LogLine,
877 rc io.Reader,
878 stepID int,
879) {
880 scanner := bufio.NewScanner(rc)
881 for scanner.Scan() {
882 if !sendLine(ctx, out, LogLine{
883 Kind: LogKindData,
884 Time: time.Now(),
885 Content: scanner.Text() + "\n",
886 StepId: stepID,
887 Stream: "stdout",
888 }) {
889 return
890 }
891 }
892 if err := scanner.Err(); err != nil {
893 p.log.Debug("scan pod log", "err", err)
894 }
895}
896
// numberToInt64 coerces a loosely-typed numeric value into an int64.
// It understands int64, int, float64 (truncated toward zero), and
// json.Number; any other type, including nil, yields 0. A json.Number
// that does not parse as an integer yields whatever Int64 returns
// alongside its error.
func numberToInt64(value any) int64 {
	if n, ok := value.(int64); ok {
		return n
	}
	if n, ok := value.(int); ok {
		return int64(n)
	}
	if f, ok := value.(float64); ok {
		return int64(f)
	}
	if num, ok := value.(json.Number); ok {
		parsed, _ := num.Int64()
		return parsed
	}
	return 0
}
912
913func (p *tektonProvider) publishStatus(
914 ctx context.Context,
915 pipelineURI, workflow, status, runName string,
916 errMsg *string,
917 exitCode *int64,
918) error {
919 rec := tangled.PipelineStatus{
920 LexiconTypeID: tangled.PipelineStatusNSID,
921 Pipeline: pipelineURI,
922 Workflow: workflow,
923 Status: status,
924 CreatedAt: time.Now().UTC().Format(time.RFC3339),
925 Error: errMsg,
926 ExitCode: exitCode,
927 }
928 body, err := json.Marshal(rec)
929 if err != nil {
930 return fmt.Errorf("marshal pipeline.status: %w", err)
931 }
932 rkey := fmt.Sprintf("tk-%s-%s-%d", runName, status, time.Now().UnixNano())
933 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil {
934 return fmt.Errorf("publish pipeline.status: %w", err)
935 }
936 return nil
937}
938
939func tektonPipelineRunName(knot, pipelineRkey, workflow, commit, branch string) string {
940 h := sha256.Sum256([]byte(strings.Join(
941 []string{knot, pipelineRkey, workflow, commit, branch}, "\x00",
942 )))
943 suffix := hex.EncodeToString(h[:])[:12]
944 base := dnsLabel("tack-" + workflow)
945 maxBase := 63 - len(suffix) - 1
946 if len(base) > maxBase {
947 base = strings.TrimRight(base[:maxBase], "-")
948 }
949 if base == "" {
950 base = "tack"
951 }
952 return base + "-" + suffix
953}
954
// dnsLabel reduces s to the character set of a DNS label: lowercase
// ASCII letters and digits, with every run of other characters
// collapsed into a single '-' and no leading or trailing dash. The
// result may be empty and is not length-limited; callers truncate.
//
// Only ASCII letters and digits are kept. unicode.IsLetter/IsDigit on
// their own also accept characters such as 'é' or '日', which are not
// valid in Kubernetes names or label values and, being multi-byte,
// could be cut in half by the byte-based truncation callers apply.
func dnsLabel(s string) string {
	var b strings.Builder
	b.Grow(len(s))
	lastDash := false
	for _, r := range strings.ToLower(s) {
		if r <= unicode.MaxASCII && (unicode.IsLetter(r) || unicode.IsDigit(r)) {
			b.WriteRune(r)
			lastDash = false
			continue
		}
		// Emit at most one dash per run of rejected characters.
		if !lastDash {
			b.WriteByte('-')
			lastDash = true
		}
	}
	return strings.Trim(b.String(), "-")
}
972
973func labelValue(s string) string {
974 v := dnsLabel(s)
975 if len(v) > 63 {
976 v = strings.TrimRight(v[:63], "-")
977 }
978 if v == "" {
979 return "unknown"
980 }
981 return v
982}