Stitch any CI into Tangled
1package main
2
3// tektonProvider implements Provider by creating Tekton PipelineRuns
4// directly inside the Kubernetes cluster. Tack already receives and
5// authorizes Tangled pipeline triggers, so adding Tekton Triggers would
6// duplicate the event-to-run translation layer instead of simplifying it.
7
8import (
9 "bufio"
10 "context"
11 "crypto/sha256"
12 "encoding/hex"
13 "encoding/json"
14 "errors"
15 "fmt"
16 "io"
17 "log/slog"
18 "sort"
19 "strings"
20 "time"
21 "unicode"
22
23 "tangled.org/core/api/tangled"
24
25 "go.mitchellh.com/tack/internal/k8s"
26 "go.yaml.in/yaml/v2"
27)
28
29const (
30 tektonAPIVersion = "tekton.dev/v1"
31 tektonRunKind = "PipelineRun"
32
33 tektonLabelManagedBy = "tack.mitchellh.com/managed-by"
34 tektonLabelPipelineRkey = "tack.mitchellh.com/pipeline-rkey"
35 tektonLabelWorkflow = "tack.mitchellh.com/workflow"
36
37 tektonAnnotationKnot = "tack.mitchellh.com/knot"
38 tektonAnnotationPipelineRkey = "tack.mitchellh.com/pipeline-rkey"
39 tektonAnnotationWorkflow = "tack.mitchellh.com/workflow"
40 tektonAnnotationActor = "tack.mitchellh.com/actor"
41 tektonAnnotationCommit = "tack.mitchellh.com/commit"
42 tektonAnnotationBranch = "tack.mitchellh.com/branch"
43)
44
45var (
46 pipelineRunsGVR = k8s.GVR{
47 Group: "tekton.dev", Version: "v1", Resource: "pipelineruns",
48 }
49 taskRunsGVR = k8s.GVR{
50 Group: "tekton.dev", Version: "v1", Resource: "taskruns",
51 }
52)
53
54// tektonWorkflowConfig is the Tekton-specific subset of workflow YAML.
55// `pipeline` names an existing in-cluster Tekton Pipeline. Params are
56// deliberately string-only in v1: tack is meant to select an existing
57// runner and pass a small amount of routing data, not mirror Tekton's
58// entire PipelineRun API.
59type tektonWorkflowConfig struct {
60 Pipeline string `yaml:"pipeline"`
61 ServiceAccount string `yaml:"service_account"`
62 Params map[string]string `yaml:"params"`
63}
64
65type tektonWorkflowDoc struct {
66 Tack struct {
67 Tekton tektonWorkflowConfig `yaml:"tekton"`
68 } `yaml:"tack"`
69}
70
71// parseTektonWorkflowConfig decodes `tack.tekton` from a workflow body.
72func parseTektonWorkflowConfig(raw string) (*tektonWorkflowConfig, error) {
73 if strings.TrimSpace(raw) == "" {
74 return nil, errors.New("workflow body is empty")
75 }
76 var doc tektonWorkflowDoc
77 if err := yaml.Unmarshal([]byte(raw), &doc); err != nil {
78 return nil, fmt.Errorf("parse workflow yaml: %w", err)
79 }
80 cfg := doc.Tack.Tekton
81 if cfg.Pipeline == "" {
82 return nil, errors.New("workflow yaml: `tack.tekton.pipeline` is required")
83 }
84 return &cfg, nil
85}
86
87type tektonProvider struct {
88 br *broker
89 st *store
90 log *slog.Logger
91 client k8s.Client
92 namespace string
93}
94
95var _ Provider = (*tektonProvider)(nil)
96
97func newTektonProvider(
98 br *broker,
99 st *store,
100 client k8s.Client,
101 namespace string,
102 log *slog.Logger,
103) *tektonProvider {
104 return &tektonProvider{
105 br: br,
106 st: st,
107 log: log.With("component", "provider", "kind", "tekton"),
108 client: client,
109 namespace: namespace,
110 }
111}
112
113func newInClusterTektonProvider(
114 br *broker,
115 st *store,
116 namespace string,
117 log *slog.Logger,
118) (*tektonProvider, error) {
119 client, err := k8s.NewInClusterClient()
120 if err != nil {
121 return nil, fmt.Errorf("configure in-cluster kubernetes client: %w", err)
122 }
123 return newTektonProvider(br, st, client, namespace, log), nil
124}
125
126func (p *tektonProvider) Spawn(
127 ctx context.Context,
128 knot string,
129 pipelineRkey string,
130 actor string,
131 trigger *tangled.Pipeline_TriggerMetadata,
132 workflows []*tangled.Pipeline_Workflow,
133) {
134 if len(workflows) == 0 {
135 p.log.Warn("pipeline has no workflows; nothing to spawn",
136 "knot", knot, "rkey", pipelineRkey,
137 )
138 return
139 }
140 for _, wf := range workflows {
141 if wf == nil || wf.Name == "" {
142 continue
143 }
144 wf := wf
145 go p.spawnWorkflow(ctx, knot, pipelineRkey, actor, trigger, wf)
146 }
147}
148
149func (p *tektonProvider) spawnWorkflow(
150 ctx context.Context,
151 knot string,
152 pipelineRkey string,
153 actor string,
154 trigger *tangled.Pipeline_TriggerMetadata,
155 wf *tangled.Pipeline_Workflow,
156) {
157 logger := p.log.With(
158 "knot", knot,
159 "pipeline_rkey", pipelineRkey,
160 "workflow", wf.Name,
161 "actor", actor,
162 )
163
164 cfg, err := parseTektonWorkflowConfig(wf.Raw)
165 if err != nil {
166 logger.Error("invalid workflow config; refusing to spawn", "err", err)
167 return
168 }
169 commit, branch := triggerCommitAndBranch(trigger)
170 name := tektonPipelineRunName(knot, pipelineRkey, wf.Name, commit, branch)
171 pr := buildTektonPipelineRun(
172 p.namespace, name, cfg, knot, pipelineRkey, actor, commit, branch, wf,
173 )
174
175 created, err := p.client.CreateObject(ctx, pipelineRunsGVR, p.namespace, pr)
176 if errors.Is(err, k8s.ErrAlreadyExists) {
177 created, err = p.client.GetObject(ctx, pipelineRunsGVR, p.namespace, name)
178 }
179 if err != nil {
180 logger.Error("create tekton PipelineRun", "err", err,
181 "namespace", p.namespace, "pipeline_run", name,
182 "pipeline", cfg.Pipeline,
183 )
184 return
185 }
186
187 ref := TektonRunRef{
188 Knot: knot,
189 PipelineRkey: pipelineRkey,
190 Workflow: wf.Name,
191 Namespace: p.namespace,
192 PipelineRunName: name,
193 PipelineRunUID: created.GetUID(),
194 PipelineName: cfg.Pipeline,
195 PipelineURI: pipelineATURI(knot, pipelineRkey),
196 }
197 if err := p.st.InsertTektonRun(ctx, ref); err != nil {
198 logger.Error("persist tekton run mapping", "err", err,
199 "pipeline_run", name,
200 )
201 return
202 }
203
204 if err := p.publishStatus(ctx, ref.PipelineURI, wf.Name,
205 "pending", name, nil, nil); err != nil {
206 logger.Error("publish initial pending status", "err", err)
207 }
208
209 logger.Info("tekton PipelineRun created",
210 "namespace", p.namespace,
211 "pipeline", cfg.Pipeline,
212 "pipeline_run", name,
213 "uid", ref.PipelineRunUID,
214 )
215 go p.watchPipelineRun(ctx, ref)
216}
217
218func buildTektonPipelineRun(
219 namespace, name string,
220 cfg *tektonWorkflowConfig,
221 knot, pipelineRkey, actor, commit, branch string,
222 wf *tangled.Pipeline_Workflow,
223) k8s.Object {
224 obj := k8s.Object{
225 "apiVersion": tektonAPIVersion,
226 "kind": tektonRunKind,
227 "metadata": map[string]any{
228 "name": name,
229 "namespace": namespace,
230 "labels": map[string]any{
231 tektonLabelManagedBy: "tack",
232 tektonLabelPipelineRkey: labelValue(pipelineRkey),
233 tektonLabelWorkflow: labelValue(wf.Name),
234 },
235 "annotations": map[string]any{
236 tektonAnnotationKnot: knot,
237 tektonAnnotationPipelineRkey: pipelineRkey,
238 tektonAnnotationWorkflow: wf.Name,
239 tektonAnnotationActor: actor,
240 tektonAnnotationCommit: commit,
241 tektonAnnotationBranch: branch,
242 },
243 },
244 "spec": map[string]any{
245 "pipelineRef": map[string]any{
246 "name": cfg.Pipeline,
247 },
248 },
249 }
250 spec := obj["spec"].(map[string]any)
251 if cfg.ServiceAccount != "" {
252 spec["serviceAccountName"] = cfg.ServiceAccount
253 }
254 if len(cfg.Params) > 0 {
255 keys := make([]string, 0, len(cfg.Params))
256 for key := range cfg.Params {
257 keys = append(keys, key)
258 }
259 sort.Strings(keys)
260 params := make([]any, 0, len(keys))
261 for _, key := range keys {
262 params = append(params, map[string]any{
263 "name": key,
264 "value": cfg.Params[key],
265 })
266 }
267 spec["params"] = params
268 }
269 return obj
270}
271
272func (p *tektonProvider) watchPipelineRun(ctx context.Context, ref TektonRunRef) {
273 logger := p.log.With(
274 "knot", ref.Knot,
275 "pipeline_rkey", ref.PipelineRkey,
276 "workflow", ref.Workflow,
277 "namespace", ref.Namespace,
278 "pipeline_run", ref.PipelineRunName,
279 )
280
281 logger.Debug("watchPipelineRun: starting")
282
283 last := ""
284 if obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace,
285 ref.PipelineRunName); err == nil {
286 status, terminal, ok := mapTektonPipelineRunStatus(obj)
287 logger.Debug("watchPipelineRun: initial status read",
288 "status", status, "terminal", terminal, "ok", ok,
289 )
290 if ok {
291 last = status
292 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
293 status, ref.PipelineRunName, nil, nil); err != nil {
294 logger.Error("publish tekton status", "err", err, "status", status)
295 }
296 if terminal {
297 logger.Debug("watchPipelineRun: already terminal on initial read; exiting", "status", status)
298 return
299 }
300 }
301 } else if errors.Is(err, k8s.ErrNotFound) {
302 logger.Warn("PipelineRun disappeared while watching")
303 return
304 } else {
305 logger.Debug("initial PipelineRun status read", "err", err)
306 }
307
308 w, err := p.client.WatchObjects(ctx, pipelineRunsGVR, ref.Namespace,
309 k8s.ListOptions{FieldSelector: "metadata.name=" + ref.PipelineRunName},
310 )
311 if err != nil {
312 logger.Debug("watchPipelineRun: watch failed; falling back to polling", "err", err)
313 p.pollPipelineRun(ctx, ref, logger, last)
314 return
315 }
316 defer w.Stop()
317
318 logger.Debug("watchPipelineRun: watch established; entering event loop")
319 for {
320 select {
321 case <-ctx.Done():
322 logger.Debug("watchPipelineRun: context cancelled")
323 return
324 case ev, ok := <-w.ResultChan():
325 if !ok {
326 logger.Debug("watchPipelineRun: watch channel closed; falling back to polling")
327 p.pollPipelineRun(ctx, ref, logger, last)
328 return
329 }
330 status, terminal, ok := mapTektonPipelineRunStatus(ev.Object)
331 logger.Debug("watchPipelineRun: watch event",
332 "event_type", ev.Type,
333 "status", status, "terminal", terminal, "ok", ok, "last", last,
334 )
335 if !ok || status == last {
336 if terminal {
337 logger.Debug("watchPipelineRun: terminal status unchanged; exiting", "status", status)
338 return
339 }
340 continue
341 }
342 last = status
343 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
344 status, ref.PipelineRunName, nil, nil); err != nil {
345 logger.Error("publish tekton status", "err", err, "status", status)
346 continue
347 }
348 logger.Debug("watchPipelineRun: published status", "status", status, "terminal", terminal)
349 if terminal {
350 logger.Debug("watchPipelineRun: terminal status reached; exiting", "status", status)
351 return
352 }
353 }
354 }
355}
356
357func (p *tektonProvider) pollPipelineRun(
358 ctx context.Context,
359 ref TektonRunRef,
360 logger *slog.Logger,
361 last string,
362) {
363 logger.Debug("pollPipelineRun: starting poll loop", "interval", "5s")
364 ticker := time.NewTicker(5 * time.Second)
365 defer ticker.Stop()
366 for {
367 select {
368 case <-ctx.Done():
369 logger.Debug("pollPipelineRun: context cancelled")
370 return
371 case <-ticker.C:
372 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace,
373 ref.PipelineRunName,
374 )
375 if errors.Is(err, k8s.ErrNotFound) {
376 logger.Warn("PipelineRun disappeared while watching")
377 return
378 }
379 if err != nil {
380 logger.Debug("get PipelineRun status", "err", err)
381 continue
382 }
383 status, terminal, ok := mapTektonPipelineRunStatus(obj)
384 logger.Debug("pollPipelineRun: poll tick",
385 "status", status, "terminal", terminal, "ok", ok, "last", last,
386 )
387 if !ok || status == last {
388 if terminal {
389 logger.Debug("pollPipelineRun: terminal status unchanged; exiting", "status", status)
390 return
391 }
392 continue
393 }
394 last = status
395 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
396 status, ref.PipelineRunName, nil, nil); err != nil {
397 logger.Error("publish tekton status", "err", err, "status", status)
398 continue
399 }
400 logger.Debug("pollPipelineRun: published status", "status", status, "terminal", terminal)
401 if terminal {
402 logger.Debug("pollPipelineRun: terminal status reached; exiting", "status", status)
403 return
404 }
405 }
406 }
407}
408
409// mapTektonPipelineRunStatus translates Tekton's Succeeded condition
410// into the Tangled status strings consumed by the appview.
411func mapTektonPipelineRunStatus(obj k8s.Object) (status string, terminal bool, ok bool) {
412 conditions, ok := obj.NestedSlice("status", "conditions")
413 if !ok || len(conditions) == 0 {
414 slog.Debug("mapTektonPipelineRunStatus: no conditions found",
415 "pipeline_run", obj.GetName(),
416 )
417 return "", false, false
418 }
419 for _, raw := range conditions {
420 cond, _ := raw.(map[string]interface{})
421 condType, _ := cond["type"].(string)
422 condStatus, _ := cond["status"].(string)
423 reason, _ := cond["reason"].(string)
424 message, _ := cond["message"].(string)
425 slog.Debug("mapTektonPipelineRunStatus: condition",
426 "pipeline_run", obj.GetName(),
427 "type", condType,
428 "status", condStatus,
429 "reason", reason,
430 "message", message,
431 )
432 if condType != "Succeeded" {
433 continue
434 }
435 switch condStatus {
436 case "True":
437 return "success", true, true
438 case "False":
439 if tektonReasonCancelled(reason) {
440 return "cancelled", true, true
441 }
442 return "failed", true, true
443 case "Unknown":
444 return "running", false, true
445 default:
446 return "", false, false
447 }
448 }
449 return "", false, false
450}
451
452func tektonReasonCancelled(reason string) bool {
453 r := strings.ToLower(reason)
454 return strings.Contains(r, "cancel") || strings.Contains(r, "stop")
455}
456
457func (p *tektonProvider) Logs(
458 ctx context.Context,
459 knot string,
460 pipelineRkey string,
461 workflow string,
462) (<-chan LogLine, error) {
463 ref, err := p.st.LookupTektonRunByTuple(ctx, knot, pipelineRkey, workflow)
464 if err != nil {
465 return nil, fmt.Errorf("lookup tekton run mapping: %w", err)
466 }
467 if ref == nil {
468 return nil, ErrLogsNotFound
469 }
470
471 taskRuns, err := p.taskRunsForPipelineRun(ctx, *ref)
472 if err != nil {
473 return nil, err
474 }
475 p.log.Debug("Logs: found TaskRuns for PipelineRun",
476 "pipeline_run", ref.PipelineRunName, "count", len(taskRuns),
477 )
478 if len(taskRuns) == 0 {
479 return nil, ErrLogsNotFound
480 }
481
482 terminal := p.isPipelineRunTerminal(ctx, *ref)
483 p.log.Debug("Logs: pipeline run terminal state", "pipeline_run", ref.PipelineRunName, "terminal", terminal)
484
485 out := make(chan LogLine, 32)
486 go func() {
487 defer close(out)
488 stepID := 0
489 for _, tr := range taskRuns {
490 taskName := tr.GetName()
491 if taskName == "" {
492 taskName = fmt.Sprintf("task %d", stepID)
493 }
494 p.log.Debug("Logs: streaming TaskRun", "task_run", taskName, "step_id", stepID, "terminal", terminal)
495 if !sendLine(ctx, out, LogLine{
496 Kind: LogKindControl,
497 Time: time.Now(),
498 Content: taskName,
499 StepId: stepID,
500 StepStatus: StepStatusStart,
501 }) {
502 return
503 }
504
505 if terminal {
506 p.fetchCompletedTaskRunLogs(ctx, out, *ref, tr, stepID)
507 } else {
508 p.streamTaskRunLogs(ctx, out, *ref, tr, stepID)
509 }
510
511 if !sendLine(ctx, out, LogLine{
512 Kind: LogKindControl,
513 Time: time.Now(),
514 Content: taskName,
515 StepId: stepID,
516 StepStatus: StepStatusEnd,
517 }) {
518 return
519 }
520 p.log.Debug("Logs: finished TaskRun", "task_run", taskName, "step_id", stepID)
521 stepID++
522 }
523 p.log.Debug("Logs: all TaskRuns streamed", "pipeline_run", ref.PipelineRunName)
524 }()
525 return out, nil
526}
527
528// isPipelineRunTerminal returns true if the PipelineRun is in a terminal state right now.
529func (p *tektonProvider) isPipelineRunTerminal(ctx context.Context, ref TektonRunRef) bool {
530 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace,
531 ref.PipelineRunName,
532 )
533 if err != nil {
534 p.log.Debug("isPipelineRunTerminal: failed to get PipelineRun", "err", err, "pipeline_run", ref.PipelineRunName)
535 return false
536 }
537 _, terminal, ok := mapTektonPipelineRunStatus(obj)
538 p.log.Debug("isPipelineRunTerminal: status check", "pipeline_run", ref.PipelineRunName, "terminal", terminal, "ok", ok)
539 return ok && terminal
540}
541
542// fetchCompletedTaskRunLogs fetches all logs from a TaskRun that has already completed.
543// It reads each step container's logs in one shot using all-containers traversal,
544// and also inlines the Tekton step status from the TaskRun (exit code, reason) as
545// control messages so the caller gets full context without needing to watch for events.
546func (p *tektonProvider) fetchCompletedTaskRunLogs(
547 ctx context.Context,
548 out chan<- LogLine,
549 ref TektonRunRef,
550 tr k8s.Object,
551 stepID int,
552) {
553 trName := tr.GetName()
554 pods, err := p.podsForTaskRun(ctx, ref.Namespace, trName)
555 if err != nil {
556 p.log.Debug("fetchCompletedTaskRunLogs: list pods failed", "err", err,
557 "task_run", trName, "pipeline_run", ref.PipelineRunName,
558 )
559 return
560 }
561 p.log.Debug("fetchCompletedTaskRunLogs: found pods",
562 "task_run", trName, "pod_count", len(pods),
563 )
564
565 // Emit a summary line from the TaskRun status (steps[*].terminated) so we
566 // get exit codes and reasons even if the pod logs are sparse.
567 steps, _ := tr.NestedSlice("status", "steps")
568 for _, rawStep := range steps {
569 step, _ := rawStep.(map[string]any)
570 stepName, _ := step["name"].(string)
571 term, _ := step["terminated"].(map[string]any)
572 if term == nil {
573 continue
574 }
575 exitCode := numberToInt64(term["exitCode"])
576 reason, _ := term["reason"].(string)
577 msg, _ := term["message"].(string)
578 line := fmt.Sprintf("[step %s] exit=%d reason=%s", stepName, exitCode, reason)
579 if msg != "" {
580 line += " " + msg
581 }
582 p.log.Debug("fetchCompletedTaskRunLogs: step terminated",
583 "task_run", trName, "step", stepName,
584 "exit_code", exitCode, "reason", reason,
585 )
586 if !sendLine(ctx, out, LogLine{
587 Kind: LogKindData,
588 Time: time.Now(),
589 Content: line + "\n",
590 StepId: stepID,
591 Stream: "stdout",
592 }) {
593 return
594 }
595 }
596
597 for _, pod := range pods {
598 containers := append([]k8s.Container(nil), pod.InitContainers...)
599 containers = append(containers, pod.Containers...)
600 p.log.Debug("fetchCompletedTaskRunLogs: reading pod containers",
601 "pod", pod.Name, "container_count", len(containers),
602 )
603 for _, c := range containers {
604 p.log.Debug("fetchCompletedTaskRunLogs: reading container logs",
605 "pod", pod.Name, "container", c.Name,
606 )
607 rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name)
608 if err != nil {
609 p.log.Debug("fetchCompletedTaskRunLogs: stream failed", "err", err,
610 "pod", pod.Name, "container", c.Name,
611 )
612 continue
613 }
614 p.sendReaderLines(ctx, out, rc, stepID)
615 _ = rc.Close()
616 p.log.Debug("fetchCompletedTaskRunLogs: done reading container",
617 "pod", pod.Name, "container", c.Name,
618 )
619 }
620 }
621}
622
623func (p *tektonProvider) taskRunsForPipelineRun(ctx context.Context, ref TektonRunRef) ([]k8s.Object, error) {
624 list, err := p.client.ListObjects(ctx, taskRunsGVR, ref.Namespace, k8s.ListOptions{
625 LabelSelector: "tekton.dev/pipelineRun=" + ref.PipelineRunName,
626 })
627 if err != nil {
628 return nil, fmt.Errorf("list Tekton TaskRuns: %w", err)
629 }
630 items := append([]k8s.Object(nil), list...)
631 sort.Slice(items, func(i, j int) bool {
632 ti := items[i].GetCreationTimestamp()
633 tj := items[j].GetCreationTimestamp()
634 return ti.Before(tj)
635 })
636 return items, nil
637}
638
639func (p *tektonProvider) streamTaskRunLogs(
640 ctx context.Context,
641 out chan<- LogLine,
642 ref TektonRunRef,
643 tr k8s.Object,
644 stepID int,
645) {
646 pods, err := p.podsForTaskRun(ctx, ref.Namespace, tr.GetName())
647 if err != nil {
648 p.log.Debug("streamTaskRunLogs: list pods for TaskRun failed", "err", err,
649 "task_run", tr.GetName(), "pipeline_run", ref.PipelineRunName,
650 )
651 return
652 }
653 p.log.Debug("streamTaskRunLogs: found pods",
654 "task_run", tr.GetName(), "pod_count", len(pods),
655 )
656 for _, pod := range pods {
657 containers := append([]k8s.Container(nil), pod.InitContainers...)
658 containers = append(containers, pod.Containers...)
659 p.log.Debug("streamTaskRunLogs: streaming pod containers",
660 "pod", pod.Name, "container_count", len(containers),
661 )
662 for _, c := range containers {
663 p.log.Debug("streamTaskRunLogs: streaming container",
664 "pod", pod.Name, "container", c.Name, "step_id", stepID,
665 )
666 rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name)
667 if err != nil {
668 p.log.Debug("streamTaskRunLogs: stream pod logs failed", "err", err,
669 "pod", pod.Name, "container", c.Name,
670 )
671 continue
672 }
673 p.sendReaderLines(ctx, out, rc, stepID)
674 _ = rc.Close()
675 p.log.Debug("streamTaskRunLogs: finished container",
676 "pod", pod.Name, "container", c.Name,
677 )
678 }
679 }
680}
681
682func (p *tektonProvider) podsForTaskRun(ctx context.Context, namespace, taskRun string) ([]k8s.Pod, error) {
683 list, err := p.client.ListPods(ctx, namespace,
684 "tekton.dev/taskRun="+taskRun,
685 )
686 if err != nil {
687 return nil, fmt.Errorf("list pods: %w", err)
688 }
689 pods := append([]k8s.Pod(nil), list...)
690 sort.Slice(pods, func(i, j int) bool {
691 return pods[i].CreationTimestamp.Before(pods[j].CreationTimestamp)
692 })
693 return pods, nil
694}
695
696func (p *tektonProvider) sendReaderLines(
697 ctx context.Context,
698 out chan<- LogLine,
699 rc io.Reader,
700 stepID int,
701) {
702 scanner := bufio.NewScanner(rc)
703 for scanner.Scan() {
704 if !sendLine(ctx, out, LogLine{
705 Kind: LogKindData,
706 Time: time.Now(),
707 Content: scanner.Text() + "\n",
708 StepId: stepID,
709 Stream: "stdout",
710 }) {
711 return
712 }
713 }
714 if err := scanner.Err(); err != nil {
715 p.log.Debug("scan pod log", "err", err)
716 }
717}
718
719func numberToInt64(value any) int64 {
720 switch v := value.(type) {
721 case int64:
722 return v
723 case int:
724 return int64(v)
725 case float64:
726 return int64(v)
727 case json.Number:
728 i, _ := v.Int64()
729 return i
730 default:
731 return 0
732 }
733}
734
735func (p *tektonProvider) publishStatus(
736 ctx context.Context,
737 pipelineURI, workflow, status, runName string,
738 errMsg *string,
739 exitCode *int64,
740) error {
741 rec := tangled.PipelineStatus{
742 LexiconTypeID: tangled.PipelineStatusNSID,
743 Pipeline: pipelineURI,
744 Workflow: workflow,
745 Status: status,
746 CreatedAt: time.Now().UTC().Format(time.RFC3339),
747 Error: errMsg,
748 ExitCode: exitCode,
749 }
750 body, err := json.Marshal(rec)
751 if err != nil {
752 return fmt.Errorf("marshal pipeline.status: %w", err)
753 }
754 rkey := fmt.Sprintf("tk-%s-%s-%d", runName, status, time.Now().UnixNano())
755 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil {
756 return fmt.Errorf("publish pipeline.status: %w", err)
757 }
758 return nil
759}
760
761func tektonPipelineRunName(knot, pipelineRkey, workflow, commit, branch string) string {
762 h := sha256.Sum256([]byte(strings.Join(
763 []string{knot, pipelineRkey, workflow, commit, branch}, "\x00",
764 )))
765 suffix := hex.EncodeToString(h[:])[:12]
766 base := dnsLabel("tack-" + workflow)
767 maxBase := 63 - len(suffix) - 1
768 if len(base) > maxBase {
769 base = strings.TrimRight(base[:maxBase], "-")
770 }
771 if base == "" {
772 base = "tack"
773 }
774 return base + "-" + suffix
775}
776
777func dnsLabel(s string) string {
778 var b strings.Builder
779 lastDash := false
780 for _, r := range strings.ToLower(s) {
781 ok := unicode.IsLetter(r) || unicode.IsDigit(r)
782 if ok {
783 b.WriteRune(r)
784 lastDash = false
785 continue
786 }
787 if !lastDash {
788 b.WriteByte('-')
789 lastDash = true
790 }
791 }
792 return strings.Trim(b.String(), "-")
793}
794
795func labelValue(s string) string {
796 v := dnsLabel(s)
797 if len(v) > 63 {
798 v = strings.TrimRight(v[:63], "-")
799 }
800 if v == "" {
801 return "unknown"
802 }
803 return v
804}