Stitch any CI into Tangled
1package main
2
3// tektonProvider implements Provider by creating Tekton PipelineRuns
4// directly inside the Kubernetes cluster. Tack already receives and
5// authorizes Tangled pipeline triggers, so adding Tekton Triggers would
6// duplicate the event-to-run translation layer instead of simplifying it.
7
8import (
9 "bufio"
10 "context"
11 "crypto/sha256"
12 "encoding/hex"
13 "encoding/json"
14 "errors"
15 "fmt"
16 "io"
17 "log/slog"
18 "sort"
19 "strings"
20 "time"
21 "unicode"
22
23 corev1 "k8s.io/api/core/v1"
24 apierrors "k8s.io/apimachinery/pkg/api/errors"
25 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
27 "k8s.io/apimachinery/pkg/fields"
28 "k8s.io/apimachinery/pkg/labels"
29 runtimeschema "k8s.io/apimachinery/pkg/runtime/schema"
30 "k8s.io/client-go/dynamic"
31 "k8s.io/client-go/kubernetes"
32 "k8s.io/client-go/rest"
33 "tangled.org/core/api/tangled"
34
35 "go.yaml.in/yaml/v2"
36)
37
const (
	// API version and kind stamped on every object produced by
	// buildTektonPipelineRun.
	tektonAPIVersion = "tekton.dev/v1"
	tektonRunKind    = "PipelineRun"

	// Label keys attached to created PipelineRuns. The rkey and workflow
	// values are passed through labelValue before being used as labels.
	tektonLabelManagedBy    = "tack.mitchellh.com/managed-by"
	tektonLabelPipelineRkey = "tack.mitchellh.com/pipeline-rkey"
	tektonLabelWorkflow     = "tack.mitchellh.com/workflow"

	// Annotation keys attached to created PipelineRuns. Annotations carry
	// the trigger metadata verbatim. The pipeline-rkey and workflow keys
	// are the same strings as the corresponding label keys.
	tektonAnnotationKnot         = "tack.mitchellh.com/knot"
	tektonAnnotationPipelineRkey = "tack.mitchellh.com/pipeline-rkey"
	tektonAnnotationWorkflow     = "tack.mitchellh.com/workflow"
	tektonAnnotationActor        = "tack.mitchellh.com/actor"
	tektonAnnotationCommit       = "tack.mitchellh.com/commit"
	tektonAnnotationBranch       = "tack.mitchellh.com/branch"
)
53
// Resource identifiers handed to the dynamic client when addressing
// Tekton objects.
var (
	// pipelineRunsGVR addresses tekton.dev/v1 PipelineRuns, which this
	// provider creates, reads and watches.
	pipelineRunsGVR = runtimeschema.GroupVersionResource{
		Group: "tekton.dev", Version: "v1", Resource: "pipelineruns",
	}
	// taskRunsGVR addresses tekton.dev/v1 TaskRuns, which are listed when
	// collecting logs for a PipelineRun.
	taskRunsGVR = runtimeschema.GroupVersionResource{
		Group: "tekton.dev", Version: "v1", Resource: "taskruns",
	}
)
62
// tektonWorkflowConfig is the Tekton-specific subset of workflow YAML.
// `pipeline` names an existing in-cluster Tekton Pipeline. Params are
// deliberately string-only in v1: tack is meant to select an existing
// runner and pass a small amount of routing data, not mirror Tekton's
// entire PipelineRun API.
type tektonWorkflowConfig struct {
	// Pipeline is required; it becomes spec.pipelineRef.name of the
	// generated PipelineRun.
	Pipeline string `yaml:"pipeline"`
	// ServiceAccount is optional; when non-empty it becomes
	// spec.serviceAccountName.
	ServiceAccount string `yaml:"service_account"`
	// Params is optional; entries are emitted as spec.params, sorted by
	// parameter name.
	Params map[string]string `yaml:"params"`
}
73
// tektonWorkflowDoc mirrors just enough of the workflow YAML document to
// reach the `tack.tekton` mapping; it is the decode target used by
// parseTektonWorkflowConfig.
type tektonWorkflowDoc struct {
	Tack struct {
		Tekton tektonWorkflowConfig `yaml:"tekton"`
	} `yaml:"tack"`
}
79
80// parseTektonWorkflowConfig decodes `tack.tekton` from a workflow body.
81func parseTektonWorkflowConfig(raw string) (*tektonWorkflowConfig, error) {
82 if strings.TrimSpace(raw) == "" {
83 return nil, errors.New("workflow body is empty")
84 }
85 var doc tektonWorkflowDoc
86 if err := yaml.Unmarshal([]byte(raw), &doc); err != nil {
87 return nil, fmt.Errorf("parse workflow yaml: %w", err)
88 }
89 cfg := doc.Tack.Tekton
90 if cfg.Pipeline == "" {
91 return nil, errors.New("workflow yaml: `tack.tekton.pipeline` is required")
92 }
93 return &cfg, nil
94}
95
// tektonProvider runs Tangled workflows as Tekton PipelineRuns and
// reports their progress back as pipeline status records.
type tektonProvider struct {
	// br is the broker that status records are published through.
	br *broker
	// st persists and looks up the workflow-to-PipelineRun mapping.
	st *store
	// log is the provider-scoped structured logger.
	log *slog.Logger
	// dyn is the dynamic client used for Tekton PipelineRuns and TaskRuns.
	dyn dynamic.Interface
	// kube is the typed client used to list pods and read their logs.
	kube kubernetes.Interface
	// namespace is where PipelineRuns are created.
	namespace string
}

// Compile-time check that *tektonProvider satisfies Provider.
var _ Provider = (*tektonProvider)(nil)
106
107func newTektonProvider(
108 br *broker,
109 st *store,
110 dyn dynamic.Interface,
111 kube kubernetes.Interface,
112 namespace string,
113 log *slog.Logger,
114) *tektonProvider {
115 return &tektonProvider{
116 br: br,
117 st: st,
118 log: log.With("component", "provider", "kind", "tekton"),
119 dyn: dyn,
120 kube: kube,
121 namespace: namespace,
122 }
123}
124
125func newInClusterTektonProvider(
126 br *broker,
127 st *store,
128 namespace string,
129 log *slog.Logger,
130) (*tektonProvider, error) {
131 cfg, err := rest.InClusterConfig()
132 if err != nil {
133 return nil, fmt.Errorf("load in-cluster kubernetes config: %w", err)
134 }
135 dyn, err := dynamic.NewForConfig(cfg)
136 if err != nil {
137 return nil, fmt.Errorf("create dynamic kubernetes client: %w", err)
138 }
139 kube, err := kubernetes.NewForConfig(cfg)
140 if err != nil {
141 return nil, fmt.Errorf("create kubernetes client: %w", err)
142 }
143 return newTektonProvider(br, st, dyn, kube, namespace, log), nil
144}
145
146func (p *tektonProvider) Spawn(
147 ctx context.Context,
148 knot string,
149 pipelineRkey string,
150 actor string,
151 trigger *tangled.Pipeline_TriggerMetadata,
152 workflows []*tangled.Pipeline_Workflow,
153) {
154 if len(workflows) == 0 {
155 p.log.Warn("pipeline has no workflows; nothing to spawn",
156 "knot", knot, "rkey", pipelineRkey,
157 )
158 return
159 }
160 for _, wf := range workflows {
161 if wf == nil || wf.Name == "" {
162 continue
163 }
164 wf := wf
165 go p.spawnWorkflow(ctx, knot, pipelineRkey, actor, trigger, wf)
166 }
167}
168
169func (p *tektonProvider) spawnWorkflow(
170 ctx context.Context,
171 knot string,
172 pipelineRkey string,
173 actor string,
174 trigger *tangled.Pipeline_TriggerMetadata,
175 wf *tangled.Pipeline_Workflow,
176) {
177 logger := p.log.With(
178 "knot", knot,
179 "pipeline_rkey", pipelineRkey,
180 "workflow", wf.Name,
181 "actor", actor,
182 )
183
184 cfg, err := parseTektonWorkflowConfig(wf.Raw)
185 if err != nil {
186 logger.Error("invalid workflow config; refusing to spawn", "err", err)
187 return
188 }
189 commit, branch := triggerCommitAndBranch(trigger)
190 name := tektonPipelineRunName(knot, pipelineRkey, wf.Name, commit, branch)
191 pr := buildTektonPipelineRun(
192 p.namespace, name, cfg, knot, pipelineRkey, actor, commit, branch, wf,
193 )
194
195 runs := p.dyn.Resource(pipelineRunsGVR).Namespace(p.namespace)
196 created, err := runs.Create(ctx, pr, metav1.CreateOptions{})
197 if apierrors.IsAlreadyExists(err) {
198 created, err = runs.Get(ctx, name, metav1.GetOptions{})
199 }
200 if err != nil {
201 logger.Error("create tekton PipelineRun", "err", err,
202 "namespace", p.namespace, "pipeline_run", name,
203 "pipeline", cfg.Pipeline,
204 )
205 return
206 }
207
208 ref := TektonRunRef{
209 Knot: knot,
210 PipelineRkey: pipelineRkey,
211 Workflow: wf.Name,
212 Namespace: p.namespace,
213 PipelineRunName: name,
214 PipelineRunUID: string(created.GetUID()),
215 PipelineName: cfg.Pipeline,
216 PipelineURI: pipelineATURI(knot, pipelineRkey),
217 }
218 if err := p.st.InsertTektonRun(ctx, ref); err != nil {
219 logger.Error("persist tekton run mapping", "err", err,
220 "pipeline_run", name,
221 )
222 return
223 }
224
225 if err := p.publishStatus(ctx, ref.PipelineURI, wf.Name,
226 "pending", name, nil, nil); err != nil {
227 logger.Error("publish initial pending status", "err", err)
228 }
229
230 logger.Info("tekton PipelineRun created",
231 "namespace", p.namespace,
232 "pipeline", cfg.Pipeline,
233 "pipeline_run", name,
234 "uid", ref.PipelineRunUID,
235 )
236 go p.watchPipelineRun(ctx, ref)
237}
238
239func buildTektonPipelineRun(
240 namespace, name string,
241 cfg *tektonWorkflowConfig,
242 knot, pipelineRkey, actor, commit, branch string,
243 wf *tangled.Pipeline_Workflow,
244) *unstructured.Unstructured {
245 obj := &unstructured.Unstructured{
246 Object: map[string]interface{}{
247 "apiVersion": tektonAPIVersion,
248 "kind": tektonRunKind,
249 "metadata": map[string]interface{}{
250 "name": name,
251 "namespace": namespace,
252 "labels": map[string]interface{}{
253 tektonLabelManagedBy: "tack",
254 tektonLabelPipelineRkey: labelValue(pipelineRkey),
255 tektonLabelWorkflow: labelValue(wf.Name),
256 },
257 "annotations": map[string]interface{}{
258 tektonAnnotationKnot: knot,
259 tektonAnnotationPipelineRkey: pipelineRkey,
260 tektonAnnotationWorkflow: wf.Name,
261 tektonAnnotationActor: actor,
262 tektonAnnotationCommit: commit,
263 tektonAnnotationBranch: branch,
264 },
265 },
266 "spec": map[string]interface{}{
267 "pipelineRef": map[string]interface{}{
268 "name": cfg.Pipeline,
269 },
270 },
271 },
272 }
273 spec := obj.Object["spec"].(map[string]interface{})
274 if cfg.ServiceAccount != "" {
275 spec["serviceAccountName"] = cfg.ServiceAccount
276 }
277 if len(cfg.Params) > 0 {
278 keys := make([]string, 0, len(cfg.Params))
279 for key := range cfg.Params {
280 keys = append(keys, key)
281 }
282 sort.Strings(keys)
283 params := make([]interface{}, 0, len(keys))
284 for _, key := range keys {
285 params = append(params, map[string]interface{}{
286 "name": key,
287 "value": cfg.Params[key],
288 })
289 }
290 spec["params"] = params
291 }
292 return obj
293}
294
295func (p *tektonProvider) watchPipelineRun(ctx context.Context, ref TektonRunRef) {
296 logger := p.log.With(
297 "knot", ref.Knot,
298 "pipeline_rkey", ref.PipelineRkey,
299 "workflow", ref.Workflow,
300 "namespace", ref.Namespace,
301 "pipeline_run", ref.PipelineRunName,
302 )
303
304 last := ""
305 if obj, err := p.dyn.Resource(pipelineRunsGVR).Namespace(ref.Namespace).
306 Get(ctx, ref.PipelineRunName, metav1.GetOptions{}); err == nil {
307 status, terminal, ok := mapTektonPipelineRunStatus(obj)
308 if ok {
309 last = status
310 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
311 status, ref.PipelineRunName, nil, nil); err != nil {
312 logger.Error("publish tekton status", "err", err, "status", status)
313 }
314 if terminal {
315 return
316 }
317 }
318 } else if apierrors.IsNotFound(err) {
319 logger.Warn("PipelineRun disappeared while watching")
320 return
321 } else {
322 logger.Debug("initial PipelineRun status read", "err", err)
323 }
324
325 w, err := p.dyn.Resource(pipelineRunsGVR).Namespace(ref.Namespace).
326 Watch(ctx, metav1.ListOptions{
327 FieldSelector: fields.OneTermEqualSelector(
328 "metadata.name", ref.PipelineRunName,
329 ).String(),
330 })
331 if err != nil {
332 logger.Debug("watch PipelineRun status; falling back to polling", "err", err)
333 p.pollPipelineRun(ctx, ref, logger, last)
334 return
335 }
336 defer w.Stop()
337
338 for {
339 select {
340 case <-ctx.Done():
341 return
342 case ev, ok := <-w.ResultChan():
343 if !ok {
344 p.pollPipelineRun(ctx, ref, logger, last)
345 return
346 }
347 obj, ok := ev.Object.(*unstructured.Unstructured)
348 if !ok {
349 continue
350 }
351 status, terminal, ok := mapTektonPipelineRunStatus(obj)
352 if !ok || status == last {
353 if terminal {
354 return
355 }
356 continue
357 }
358 last = status
359 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
360 status, ref.PipelineRunName, nil, nil); err != nil {
361 logger.Error("publish tekton status", "err", err, "status", status)
362 continue
363 }
364 if terminal {
365 return
366 }
367 }
368 }
369}
370
371func (p *tektonProvider) pollPipelineRun(
372 ctx context.Context,
373 ref TektonRunRef,
374 logger *slog.Logger,
375 last string,
376) {
377 ticker := time.NewTicker(5 * time.Second)
378 defer ticker.Stop()
379 for {
380 select {
381 case <-ctx.Done():
382 return
383 case <-ticker.C:
384 obj, err := p.dyn.Resource(pipelineRunsGVR).Namespace(ref.Namespace).
385 Get(ctx, ref.PipelineRunName, metav1.GetOptions{})
386 if apierrors.IsNotFound(err) {
387 logger.Warn("PipelineRun disappeared while watching")
388 return
389 }
390 if err != nil {
391 logger.Debug("get PipelineRun status", "err", err)
392 continue
393 }
394 status, terminal, ok := mapTektonPipelineRunStatus(obj)
395 if !ok || status == last {
396 if terminal {
397 return
398 }
399 continue
400 }
401 last = status
402 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
403 status, ref.PipelineRunName, nil, nil); err != nil {
404 logger.Error("publish tekton status", "err", err, "status", status)
405 continue
406 }
407 if terminal {
408 return
409 }
410 }
411 }
412}
413
414// mapTektonPipelineRunStatus translates Tekton's Succeeded condition
415// into the Tangled status strings consumed by the appview.
416func mapTektonPipelineRunStatus(obj *unstructured.Unstructured) (status string, terminal bool, ok bool) {
417 conditions, ok, _ := unstructured.NestedSlice(obj.Object, "status", "conditions")
418 if !ok || len(conditions) == 0 {
419 return "", false, false
420 }
421 for _, raw := range conditions {
422 cond, _ := raw.(map[string]interface{})
423 if cond["type"] != "Succeeded" {
424 continue
425 }
426 condStatus, _ := cond["status"].(string)
427 reason, _ := cond["reason"].(string)
428 switch condStatus {
429 case "True":
430 return "success", true, true
431 case "False":
432 if tektonReasonCancelled(reason) {
433 return "cancelled", true, true
434 }
435 return "failed", true, true
436 case "Unknown":
437 return "running", false, true
438 default:
439 return "", false, false
440 }
441 }
442 return "", false, false
443}
444
// tektonReasonCancelled reports whether a condition reason denotes a
// cancellation: it matches, case-insensitively, any reason containing
// "cancel" or "stop".
func tektonReasonCancelled(reason string) bool {
	lowered := strings.ToLower(reason)
	for _, marker := range [...]string{"cancel", "stop"} {
		if strings.Contains(lowered, marker) {
			return true
		}
	}
	return false
}
449
450func (p *tektonProvider) Logs(
451 ctx context.Context,
452 knot string,
453 pipelineRkey string,
454 workflow string,
455) (<-chan LogLine, error) {
456 ref, err := p.st.LookupTektonRunByTuple(ctx, knot, pipelineRkey, workflow)
457 if err != nil {
458 return nil, fmt.Errorf("lookup tekton run mapping: %w", err)
459 }
460 if ref == nil {
461 return nil, ErrLogsNotFound
462 }
463
464 taskRuns, err := p.taskRunsForPipelineRun(ctx, *ref)
465 if err != nil {
466 return nil, err
467 }
468 if len(taskRuns) == 0 {
469 return nil, ErrLogsNotFound
470 }
471
472 out := make(chan LogLine, 32)
473 go func() {
474 defer close(out)
475 stepID := 0
476 for _, tr := range taskRuns {
477 taskName := tr.GetName()
478 if taskName == "" {
479 taskName = fmt.Sprintf("task %d", stepID)
480 }
481 if !sendLine(ctx, out, LogLine{
482 Kind: LogKindControl,
483 Time: time.Now(),
484 Content: taskName,
485 StepId: stepID,
486 StepStatus: StepStatusStart,
487 }) {
488 return
489 }
490
491 p.streamTaskRunLogs(ctx, out, *ref, tr, stepID)
492
493 if !sendLine(ctx, out, LogLine{
494 Kind: LogKindControl,
495 Time: time.Now(),
496 Content: taskName,
497 StepId: stepID,
498 StepStatus: StepStatusEnd,
499 }) {
500 return
501 }
502 stepID++
503 }
504 }()
505 return out, nil
506}
507
508func (p *tektonProvider) taskRunsForPipelineRun(ctx context.Context, ref TektonRunRef) ([]unstructured.Unstructured, error) {
509 sel := labels.Set{"tekton.dev/pipelineRun": ref.PipelineRunName}.String()
510 list, err := p.dyn.Resource(taskRunsGVR).Namespace(ref.Namespace).
511 List(ctx, metav1.ListOptions{LabelSelector: sel})
512 if err != nil {
513 return nil, fmt.Errorf("list Tekton TaskRuns: %w", err)
514 }
515 items := append([]unstructured.Unstructured(nil), list.Items...)
516 sort.Slice(items, func(i, j int) bool {
517 ti := items[i].GetCreationTimestamp()
518 tj := items[j].GetCreationTimestamp()
519 return ti.Before(&tj)
520 })
521 return items, nil
522}
523
524func (p *tektonProvider) streamTaskRunLogs(
525 ctx context.Context,
526 out chan<- LogLine,
527 ref TektonRunRef,
528 tr unstructured.Unstructured,
529 stepID int,
530) {
531 pods, err := p.podsForTaskRun(ctx, ref.Namespace, tr.GetName())
532 if err != nil {
533 p.log.Debug("list pods for TaskRun", "err", err,
534 "task_run", tr.GetName(), "pipeline_run", ref.PipelineRunName,
535 )
536 return
537 }
538 for _, pod := range pods {
539 for _, c := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
540 req := p.kube.CoreV1().Pods(ref.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{
541 Container: c.Name,
542 })
543 rc, err := req.Stream(ctx)
544 if err != nil {
545 p.log.Debug("stream pod logs", "err", err,
546 "pod", pod.Name, "container", c.Name,
547 )
548 continue
549 }
550 p.sendReaderLines(ctx, out, rc, stepID)
551 _ = rc.Close()
552 }
553 }
554}
555
556func (p *tektonProvider) podsForTaskRun(ctx context.Context, namespace, taskRun string) ([]corev1.Pod, error) {
557 sel := labels.Set{"tekton.dev/taskRun": taskRun}.String()
558 list, err := p.kube.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
559 LabelSelector: sel,
560 })
561 if err != nil {
562 return nil, fmt.Errorf("list pods: %w", err)
563 }
564 pods := append([]corev1.Pod(nil), list.Items...)
565 sort.Slice(pods, func(i, j int) bool {
566 return pods[i].CreationTimestamp.Before(&pods[j].CreationTimestamp)
567 })
568 return pods, nil
569}
570
571func (p *tektonProvider) sendReaderLines(
572 ctx context.Context,
573 out chan<- LogLine,
574 rc io.Reader,
575 stepID int,
576) {
577 scanner := bufio.NewScanner(rc)
578 for scanner.Scan() {
579 if !sendLine(ctx, out, LogLine{
580 Kind: LogKindData,
581 Time: time.Now(),
582 Content: scanner.Text() + "\n",
583 StepId: stepID,
584 Stream: "stdout",
585 }) {
586 return
587 }
588 }
589 if err := scanner.Err(); err != nil {
590 p.log.Debug("scan pod log", "err", err)
591 }
592}
593
594func (p *tektonProvider) publishStatus(
595 ctx context.Context,
596 pipelineURI, workflow, status, runName string,
597 errMsg *string,
598 exitCode *int64,
599) error {
600 rec := tangled.PipelineStatus{
601 LexiconTypeID: tangled.PipelineStatusNSID,
602 Pipeline: pipelineURI,
603 Workflow: workflow,
604 Status: status,
605 CreatedAt: time.Now().UTC().Format(time.RFC3339),
606 Error: errMsg,
607 ExitCode: exitCode,
608 }
609 body, err := json.Marshal(rec)
610 if err != nil {
611 return fmt.Errorf("marshal pipeline.status: %w", err)
612 }
613 rkey := fmt.Sprintf("tk-%s-%s-%d", runName, status, time.Now().UnixNano())
614 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil {
615 return fmt.Errorf("publish pipeline.status: %w", err)
616 }
617 return nil
618}
619
620func tektonPipelineRunName(knot, pipelineRkey, workflow, commit, branch string) string {
621 h := sha256.Sum256([]byte(strings.Join(
622 []string{knot, pipelineRkey, workflow, commit, branch}, "\x00",
623 )))
624 suffix := hex.EncodeToString(h[:])[:12]
625 base := dnsLabel("tack-" + workflow)
626 maxBase := 63 - len(suffix) - 1
627 if len(base) > maxBase {
628 base = strings.TrimRight(base[:maxBase], "-")
629 }
630 if base == "" {
631 base = "tack"
632 }
633 return base + "-" + suffix
634}
635
// dnsLabel reduces s to the characters allowed in a DNS label: lowercase
// ASCII letters, ASCII digits and single interior dashes.
//
// The input is lowercased; every run of other characters — including
// non-ASCII letters and digits, which Kubernetes rejects in object names
// and label values — collapses into one dash, and leading or trailing
// dashes are dropped. The result may be empty and is not length-limited;
// because it is pure ASCII, callers can safely truncate it by byte index.
func dnsLabel(s string) string {
	var b strings.Builder
	b.Grow(len(s))

	// gap records that separator characters were seen since the last
	// accepted one; the dash is written lazily so it never leads or
	// trails the result.
	gap := false
	for _, r := range strings.ToLower(s) {
		if r > unicode.MaxASCII || !(unicode.IsLetter(r) || unicode.IsDigit(r)) {
			gap = true
			continue
		}
		if gap && b.Len() > 0 {
			b.WriteByte('-')
		}
		gap = false
		b.WriteByte(byte(r))
	}
	return b.String()
}
653
654func labelValue(s string) string {
655 v := dnsLabel(s)
656 if len(v) > 63 {
657 v = strings.TrimRight(v[:63], "-")
658 }
659 if v == "" {
660 return "unknown"
661 }
662 return v
663}