Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// tektonProvider implements Provider by creating Tekton PipelineRuns 4// directly inside the Kubernetes cluster. Tack already receives and 5// authorizes Tangled pipeline triggers, so adding Tekton Triggers would 6// duplicate the event-to-run translation layer instead of simplifying it. 7 8import ( 9 "bufio" 10 "context" 11 "crypto/sha256" 12 "encoding/hex" 13 "encoding/json" 14 "errors" 15 "fmt" 16 "io" 17 "log/slog" 18 "sort" 19 "strings" 20 "time" 21 "unicode" 22 23 corev1 "k8s.io/api/core/v1" 24 apierrors "k8s.io/apimachinery/pkg/api/errors" 25 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 26 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" 27 "k8s.io/apimachinery/pkg/fields" 28 "k8s.io/apimachinery/pkg/labels" 29 runtimeschema "k8s.io/apimachinery/pkg/runtime/schema" 30 "k8s.io/client-go/dynamic" 31 "k8s.io/client-go/kubernetes" 32 "k8s.io/client-go/rest" 33 "tangled.org/core/api/tangled" 34 35 "go.yaml.in/yaml/v2" 36) 37 38const ( 39 tektonAPIVersion = "tekton.dev/v1" 40 tektonRunKind = "PipelineRun" 41 42 tektonLabelManagedBy = "tack.mitchellh.com/managed-by" 43 tektonLabelPipelineRkey = "tack.mitchellh.com/pipeline-rkey" 44 tektonLabelWorkflow = "tack.mitchellh.com/workflow" 45 46 tektonAnnotationKnot = "tack.mitchellh.com/knot" 47 tektonAnnotationPipelineRkey = "tack.mitchellh.com/pipeline-rkey" 48 tektonAnnotationWorkflow = "tack.mitchellh.com/workflow" 49 tektonAnnotationActor = "tack.mitchellh.com/actor" 50 tektonAnnotationCommit = "tack.mitchellh.com/commit" 51 tektonAnnotationBranch = "tack.mitchellh.com/branch" 52) 53 54var ( 55 pipelineRunsGVR = runtimeschema.GroupVersionResource{ 56 Group: "tekton.dev", Version: "v1", Resource: "pipelineruns", 57 } 58 taskRunsGVR = runtimeschema.GroupVersionResource{ 59 Group: "tekton.dev", Version: "v1", Resource: "taskruns", 60 } 61) 62 63// tektonWorkflowConfig is the Tekton-specific subset of workflow YAML. 64// `pipeline` names an existing in-cluster Tekton Pipeline. Params are 65// deliberately string-only in v1: tack is meant to select an existing 66// runner and pass a small amount of routing data, not mirror Tekton's 67// entire PipelineRun API. 68type tektonWorkflowConfig struct { 69 Pipeline string `yaml:"pipeline"` 70 ServiceAccount string `yaml:"service_account"` 71 Params map[string]string `yaml:"params"` 72} 73 74type tektonWorkflowDoc struct { 75 Tack struct { 76 Tekton tektonWorkflowConfig `yaml:"tekton"` 77 } `yaml:"tack"` 78} 79 80// parseTektonWorkflowConfig decodes `tack.tekton` from a workflow body. 81func parseTektonWorkflowConfig(raw string) (*tektonWorkflowConfig, error) { 82 if strings.TrimSpace(raw) == "" { 83 return nil, errors.New("workflow body is empty") 84 } 85 var doc tektonWorkflowDoc 86 if err := yaml.Unmarshal([]byte(raw), &doc); err != nil { 87 return nil, fmt.Errorf("parse workflow yaml: %w", err) 88 } 89 cfg := doc.Tack.Tekton 90 if cfg.Pipeline == "" { 91 return nil, errors.New("workflow yaml: `tack.tekton.pipeline` is required") 92 } 93 return &cfg, nil 94} 95 96type tektonProvider struct { 97 br *broker 98 st *store 99 log *slog.Logger 100 dyn dynamic.Interface 101 kube kubernetes.Interface 102 namespace string 103} 104 105var _ Provider = (*tektonProvider)(nil) 106 107func newTektonProvider( 108 br *broker, 109 st *store, 110 dyn dynamic.Interface, 111 kube kubernetes.Interface, 112 namespace string, 113 log *slog.Logger, 114) *tektonProvider { 115 return &tektonProvider{ 116 br: br, 117 st: st, 118 log: log.With("component", "provider", "kind", "tekton"), 119 dyn: dyn, 120 kube: kube, 121 namespace: namespace, 122 } 123} 124 125func newInClusterTektonProvider( 126 br *broker, 127 st *store, 128 namespace string, 129 log *slog.Logger, 130) (*tektonProvider, error) { 131 cfg, err := rest.InClusterConfig() 132 if err != nil { 133 return nil, fmt.Errorf("load in-cluster kubernetes config: %w", err) 134 } 135 dyn, err := dynamic.NewForConfig(cfg) 136 if err != nil { 137 return nil, fmt.Errorf("create dynamic kubernetes client: %w", err) 138 } 139 kube, err := kubernetes.NewForConfig(cfg) 140 if err != nil { 141 return nil, fmt.Errorf("create kubernetes client: %w", err) 142 } 143 return newTektonProvider(br, st, dyn, kube, namespace, log), nil 144} 145 146func (p *tektonProvider) Spawn( 147 ctx context.Context, 148 knot string, 149 pipelineRkey string, 150 actor string, 151 trigger *tangled.Pipeline_TriggerMetadata, 152 workflows []*tangled.Pipeline_Workflow, 153) { 154 if len(workflows) == 0 { 155 p.log.Warn("pipeline has no workflows; nothing to spawn", 156 "knot", knot, "rkey", pipelineRkey, 157 ) 158 return 159 } 160 for _, wf := range workflows { 161 if wf == nil || wf.Name == "" { 162 continue 163 } 164 wf := wf 165 go p.spawnWorkflow(ctx, knot, pipelineRkey, actor, trigger, wf) 166 } 167} 168 169func (p *tektonProvider) spawnWorkflow( 170 ctx context.Context, 171 knot string, 172 pipelineRkey string, 173 actor string, 174 trigger *tangled.Pipeline_TriggerMetadata, 175 wf *tangled.Pipeline_Workflow, 176) { 177 logger := p.log.With( 178 "knot", knot, 179 "pipeline_rkey", pipelineRkey, 180 "workflow", wf.Name, 181 "actor", actor, 182 ) 183 184 cfg, err := parseTektonWorkflowConfig(wf.Raw) 185 if err != nil { 186 logger.Error("invalid workflow config; refusing to spawn", "err", err) 187 return 188 } 189 commit, branch := triggerCommitAndBranch(trigger) 190 name := tektonPipelineRunName(knot, pipelineRkey, wf.Name, commit, branch) 191 pr := buildTektonPipelineRun( 192 p.namespace, name, cfg, knot, pipelineRkey, actor, commit, branch, wf, 193 ) 194 195 runs := p.dyn.Resource(pipelineRunsGVR).Namespace(p.namespace) 196 created, err := runs.Create(ctx, pr, metav1.CreateOptions{}) 197 if apierrors.IsAlreadyExists(err) { 198 created, err = runs.Get(ctx, name, metav1.GetOptions{}) 199 } 200 if err != nil { 201 logger.Error("create tekton PipelineRun", "err", err, 202 "namespace", p.namespace, "pipeline_run", name, 203 "pipeline", cfg.Pipeline, 204 ) 205 return 206 } 207 208 ref := TektonRunRef{ 209 Knot: knot, 210 PipelineRkey: pipelineRkey, 211 Workflow: wf.Name, 212 Namespace: p.namespace, 213 PipelineRunName: name, 214 PipelineRunUID: string(created.GetUID()), 215 PipelineName: cfg.Pipeline, 216 PipelineURI: pipelineATURI(knot, pipelineRkey), 217 } 218 if err := p.st.InsertTektonRun(ctx, ref); err != nil { 219 logger.Error("persist tekton run mapping", "err", err, 220 "pipeline_run", name, 221 ) 222 return 223 } 224 225 if err := p.publishStatus(ctx, ref.PipelineURI, wf.Name, 226 "pending", name, nil, nil); err != nil { 227 logger.Error("publish initial pending status", "err", err) 228 } 229 230 logger.Info("tekton PipelineRun created", 231 "namespace", p.namespace, 232 "pipeline", cfg.Pipeline, 233 "pipeline_run", name, 234 "uid", ref.PipelineRunUID, 235 ) 236 go p.watchPipelineRun(ctx, ref) 237} 238 239func buildTektonPipelineRun( 240 namespace, name string, 241 cfg *tektonWorkflowConfig, 242 knot, pipelineRkey, actor, commit, branch string, 243 wf *tangled.Pipeline_Workflow, 244) *unstructured.Unstructured { 245 obj := &unstructured.Unstructured{ 246 Object: map[string]interface{}{ 247 "apiVersion": tektonAPIVersion, 248 "kind": tektonRunKind, 249 "metadata": map[string]interface{}{ 250 "name": name, 251 "namespace": namespace, 252 "labels": map[string]interface{}{ 253 tektonLabelManagedBy: "tack", 254 tektonLabelPipelineRkey: labelValue(pipelineRkey), 255 tektonLabelWorkflow: labelValue(wf.Name), 256 }, 257 "annotations": map[string]interface{}{ 258 tektonAnnotationKnot: knot, 259 tektonAnnotationPipelineRkey: pipelineRkey, 260 tektonAnnotationWorkflow: wf.Name, 261 tektonAnnotationActor: actor, 262 tektonAnnotationCommit: commit, 263 tektonAnnotationBranch: branch, 264 }, 265 }, 266 "spec": map[string]interface{}{ 267 "pipelineRef": map[string]interface{}{ 268 "name": cfg.Pipeline, 269 }, 270 }, 271 }, 272 } 273 spec := obj.Object["spec"].(map[string]interface{}) 274 if cfg.ServiceAccount != "" { 275 spec["serviceAccountName"] = cfg.ServiceAccount 276 } 277 if len(cfg.Params) > 0 { 278 keys := make([]string, 0, len(cfg.Params)) 279 for key := range cfg.Params { 280 keys = append(keys, key) 281 } 282 sort.Strings(keys) 283 params := make([]interface{}, 0, len(keys)) 284 for _, key := range keys { 285 params = append(params, map[string]interface{}{ 286 "name": key, 287 "value": cfg.Params[key], 288 }) 289 } 290 spec["params"] = params 291 } 292 return obj 293} 294 295func (p *tektonProvider) watchPipelineRun(ctx context.Context, ref TektonRunRef) { 296 logger := p.log.With( 297 "knot", ref.Knot, 298 "pipeline_rkey", ref.PipelineRkey, 299 "workflow", ref.Workflow, 300 "namespace", ref.Namespace, 301 "pipeline_run", ref.PipelineRunName, 302 ) 303 304 last := "" 305 if obj, err := p.dyn.Resource(pipelineRunsGVR).Namespace(ref.Namespace). 306 Get(ctx, ref.PipelineRunName, metav1.GetOptions{}); err == nil { 307 status, terminal, ok := mapTektonPipelineRunStatus(obj) 308 if ok { 309 last = status 310 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 311 status, ref.PipelineRunName, nil, nil); err != nil { 312 logger.Error("publish tekton status", "err", err, "status", status) 313 } 314 if terminal { 315 return 316 } 317 } 318 } else if apierrors.IsNotFound(err) { 319 logger.Warn("PipelineRun disappeared while watching") 320 return 321 } else { 322 logger.Debug("initial PipelineRun status read", "err", err) 323 } 324 325 w, err := p.dyn.Resource(pipelineRunsGVR).Namespace(ref.Namespace). 326 Watch(ctx, metav1.ListOptions{ 327 FieldSelector: fields.OneTermEqualSelector( 328 "metadata.name", ref.PipelineRunName, 329 ).String(), 330 }) 331 if err != nil { 332 logger.Debug("watch PipelineRun status; falling back to polling", "err", err) 333 p.pollPipelineRun(ctx, ref, logger, last) 334 return 335 } 336 defer w.Stop() 337 338 for { 339 select { 340 case <-ctx.Done(): 341 return 342 case ev, ok := <-w.ResultChan(): 343 if !ok { 344 p.pollPipelineRun(ctx, ref, logger, last) 345 return 346 } 347 obj, ok := ev.Object.(*unstructured.Unstructured) 348 if !ok { 349 continue 350 } 351 status, terminal, ok := mapTektonPipelineRunStatus(obj) 352 if !ok || status == last { 353 if terminal { 354 return 355 } 356 continue 357 } 358 last = status 359 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 360 status, ref.PipelineRunName, nil, nil); err != nil { 361 logger.Error("publish tekton status", "err", err, "status", status) 362 continue 363 } 364 if terminal { 365 return 366 } 367 } 368 } 369} 370 371func (p *tektonProvider) pollPipelineRun( 372 ctx context.Context, 373 ref TektonRunRef, 374 logger *slog.Logger, 375 last string, 376) { 377 ticker := time.NewTicker(5 * time.Second) 378 defer ticker.Stop() 379 for { 380 select { 381 case <-ctx.Done(): 382 return 383 case <-ticker.C: 384 obj, err := p.dyn.Resource(pipelineRunsGVR).Namespace(ref.Namespace). 385 Get(ctx, ref.PipelineRunName, metav1.GetOptions{}) 386 if apierrors.IsNotFound(err) { 387 logger.Warn("PipelineRun disappeared while watching") 388 return 389 } 390 if err != nil { 391 logger.Debug("get PipelineRun status", "err", err) 392 continue 393 } 394 status, terminal, ok := mapTektonPipelineRunStatus(obj) 395 if !ok || status == last { 396 if terminal { 397 return 398 } 399 continue 400 } 401 last = status 402 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 403 status, ref.PipelineRunName, nil, nil); err != nil { 404 logger.Error("publish tekton status", "err", err, "status", status) 405 continue 406 } 407 if terminal { 408 return 409 } 410 } 411 } 412} 413 414// mapTektonPipelineRunStatus translates Tekton's Succeeded condition 415// into the Tangled status strings consumed by the appview. 416func mapTektonPipelineRunStatus(obj *unstructured.Unstructured) (status string, terminal bool, ok bool) { 417 conditions, ok, _ := unstructured.NestedSlice(obj.Object, "status", "conditions") 418 if !ok || len(conditions) == 0 { 419 return "", false, false 420 } 421 for _, raw := range conditions { 422 cond, _ := raw.(map[string]interface{}) 423 if cond["type"] != "Succeeded" { 424 continue 425 } 426 condStatus, _ := cond["status"].(string) 427 reason, _ := cond["reason"].(string) 428 switch condStatus { 429 case "True": 430 return "success", true, true 431 case "False": 432 if tektonReasonCancelled(reason) { 433 return "cancelled", true, true 434 } 435 return "failed", true, true 436 case "Unknown": 437 return "running", false, true 438 default: 439 return "", false, false 440 } 441 } 442 return "", false, false 443} 444 445func tektonReasonCancelled(reason string) bool { 446 r := strings.ToLower(reason) 447 return strings.Contains(r, "cancel") || strings.Contains(r, "stop") 448} 449 450func (p *tektonProvider) Logs( 451 ctx context.Context, 452 knot string, 453 pipelineRkey string, 454 workflow string, 455) (<-chan LogLine, error) { 456 ref, err := p.st.LookupTektonRunByTuple(ctx, knot, pipelineRkey, workflow) 457 if err != nil { 458 return nil, fmt.Errorf("lookup tekton run mapping: %w", err) 459 } 460 if ref == nil { 461 return nil, ErrLogsNotFound 462 } 463 464 taskRuns, err := p.taskRunsForPipelineRun(ctx, *ref) 465 if err != nil { 466 return nil, err 467 } 468 if len(taskRuns) == 0 { 469 return nil, ErrLogsNotFound 470 } 471 472 out := make(chan LogLine, 32) 473 go func() { 474 defer close(out) 475 stepID := 0 476 for _, tr := range taskRuns { 477 taskName := tr.GetName() 478 if taskName == "" { 479 taskName = fmt.Sprintf("task %d", stepID) 480 } 481 if !sendLine(ctx, out, LogLine{ 482 Kind: LogKindControl, 483 Time: time.Now(), 484 Content: taskName, 485 StepId: stepID, 486 StepStatus: StepStatusStart, 487 }) { 488 return 489 } 490 491 p.streamTaskRunLogs(ctx, out, *ref, tr, stepID) 492 493 if !sendLine(ctx, out, LogLine{ 494 Kind: LogKindControl, 495 Time: time.Now(), 496 Content: taskName, 497 StepId: stepID, 498 StepStatus: StepStatusEnd, 499 }) { 500 return 501 } 502 stepID++ 503 } 504 }() 505 return out, nil 506} 507 508func (p *tektonProvider) taskRunsForPipelineRun(ctx context.Context, ref TektonRunRef) ([]unstructured.Unstructured, error) { 509 sel := labels.Set{"tekton.dev/pipelineRun": ref.PipelineRunName}.String() 510 list, err := p.dyn.Resource(taskRunsGVR).Namespace(ref.Namespace). 511 List(ctx, metav1.ListOptions{LabelSelector: sel}) 512 if err != nil { 513 return nil, fmt.Errorf("list Tekton TaskRuns: %w", err) 514 } 515 items := append([]unstructured.Unstructured(nil), list.Items...) 516 sort.Slice(items, func(i, j int) bool { 517 ti := items[i].GetCreationTimestamp() 518 tj := items[j].GetCreationTimestamp() 519 return ti.Before(&tj) 520 }) 521 return items, nil 522} 523 524func (p *tektonProvider) streamTaskRunLogs( 525 ctx context.Context, 526 out chan<- LogLine, 527 ref TektonRunRef, 528 tr unstructured.Unstructured, 529 stepID int, 530) { 531 pods, err := p.podsForTaskRun(ctx, ref.Namespace, tr.GetName()) 532 if err != nil { 533 p.log.Debug("list pods for TaskRun", "err", err, 534 "task_run", tr.GetName(), "pipeline_run", ref.PipelineRunName, 535 ) 536 return 537 } 538 for _, pod := range pods { 539 for _, c := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { 540 req := p.kube.CoreV1().Pods(ref.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{ 541 Container: c.Name, 542 }) 543 rc, err := req.Stream(ctx) 544 if err != nil { 545 p.log.Debug("stream pod logs", "err", err, 546 "pod", pod.Name, "container", c.Name, 547 ) 548 continue 549 } 550 p.sendReaderLines(ctx, out, rc, stepID) 551 _ = rc.Close() 552 } 553 } 554} 555 556func (p *tektonProvider) podsForTaskRun(ctx context.Context, namespace, taskRun string) ([]corev1.Pod, error) { 557 sel := labels.Set{"tekton.dev/taskRun": taskRun}.String() 558 list, err := p.kube.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ 559 LabelSelector: sel, 560 }) 561 if err != nil { 562 return nil, fmt.Errorf("list pods: %w", err) 563 } 564 pods := append([]corev1.Pod(nil), list.Items...) 565 sort.Slice(pods, func(i, j int) bool { 566 return pods[i].CreationTimestamp.Before(&pods[j].CreationTimestamp) 567 }) 568 return pods, nil 569} 570 571func (p *tektonProvider) sendReaderLines( 572 ctx context.Context, 573 out chan<- LogLine, 574 rc io.Reader, 575 stepID int, 576) { 577 scanner := bufio.NewScanner(rc) 578 for scanner.Scan() { 579 if !sendLine(ctx, out, LogLine{ 580 Kind: LogKindData, 581 Time: time.Now(), 582 Content: scanner.Text() + "\n", 583 StepId: stepID, 584 Stream: "stdout", 585 }) { 586 return 587 } 588 } 589 if err := scanner.Err(); err != nil { 590 p.log.Debug("scan pod log", "err", err) 591 } 592} 593 594func (p *tektonProvider) publishStatus( 595 ctx context.Context, 596 pipelineURI, workflow, status, runName string, 597 errMsg *string, 598 exitCode *int64, 599) error { 600 rec := tangled.PipelineStatus{ 601 LexiconTypeID: tangled.PipelineStatusNSID, 602 Pipeline: pipelineURI, 603 Workflow: workflow, 604 Status: status, 605 CreatedAt: time.Now().UTC().Format(time.RFC3339), 606 Error: errMsg, 607 ExitCode: exitCode, 608 } 609 body, err := json.Marshal(rec) 610 if err != nil { 611 return fmt.Errorf("marshal pipeline.status: %w", err) 612 } 613 rkey := fmt.Sprintf("tk-%s-%s-%d", runName, status, time.Now().UnixNano()) 614 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil { 615 return fmt.Errorf("publish pipeline.status: %w", err) 616 } 617 return nil 618} 619 620func tektonPipelineRunName(knot, pipelineRkey, workflow, commit, branch string) string { 621 h := sha256.Sum256([]byte(strings.Join( 622 []string{knot, pipelineRkey, workflow, commit, branch}, "\x00", 623 ))) 624 suffix := hex.EncodeToString(h[:])[:12] 625 base := dnsLabel("tack-" + workflow) 626 maxBase := 63 - len(suffix) - 1 627 if len(base) > maxBase { 628 base = strings.TrimRight(base[:maxBase], "-") 629 } 630 if base == "" { 631 base = "tack" 632 } 633 return base + "-" + suffix 634} 635 636func dnsLabel(s string) string { 637 var b strings.Builder 638 lastDash := false 639 for _, r := range strings.ToLower(s) { 640 ok := unicode.IsLetter(r) || unicode.IsDigit(r) 641 if ok { 642 b.WriteRune(r) 643 lastDash = false 644 continue 645 } 646 if !lastDash { 647 b.WriteByte('-') 648 lastDash = true 649 } 650 } 651 return strings.Trim(b.String(), "-") 652} 653 654func labelValue(s string) string { 655 v := dnsLabel(s) 656 if len(v) > 63 { 657 v = strings.TrimRight(v[:63], "-") 658 } 659 if v == "" { 660 return "unknown" 661 } 662 return v 663}