Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// tektonProvider implements Provider by creating Tekton PipelineRuns 4// directly inside the Kubernetes cluster. Tack already receives and 5// authorizes Tangled pipeline triggers, so adding Tekton Triggers would 6// duplicate the event-to-run translation layer instead of simplifying it. 7 8import ( 9 "bufio" 10 "context" 11 "crypto/sha256" 12 "encoding/hex" 13 "encoding/json" 14 "errors" 15 "fmt" 16 "io" 17 "log/slog" 18 "sort" 19 "strings" 20 "time" 21 "unicode" 22 23 "tangled.org/core/api/tangled" 24 25 "go.mitchellh.com/tack/internal/k8s" 26 "go.yaml.in/yaml/v2" 27) 28 29const ( 30 tektonAPIVersion = "tekton.dev/v1" 31 tektonRunKind = "PipelineRun" 32 33 tektonLabelManagedBy = "tack.mitchellh.com/managed-by" 34 tektonLabelPipelineRkey = "tack.mitchellh.com/pipeline-rkey" 35 tektonLabelWorkflow = "tack.mitchellh.com/workflow" 36 37 tektonAnnotationKnot = "tack.mitchellh.com/knot" 38 tektonAnnotationPipelineRkey = "tack.mitchellh.com/pipeline-rkey" 39 tektonAnnotationWorkflow = "tack.mitchellh.com/workflow" 40 tektonAnnotationActor = "tack.mitchellh.com/actor" 41 tektonAnnotationCommit = "tack.mitchellh.com/commit" 42 tektonAnnotationBranch = "tack.mitchellh.com/branch" 43) 44 45var ( 46 pipelineRunsGVR = k8s.GVR{ 47 Group: "tekton.dev", Version: "v1", Resource: "pipelineruns", 48 } 49 taskRunsGVR = k8s.GVR{ 50 Group: "tekton.dev", Version: "v1", Resource: "taskruns", 51 } 52) 53 54// tektonWorkflowConfig is the Tekton-specific subset of workflow YAML. 55// `pipeline` names an existing in-cluster Tekton Pipeline. Params are 56// deliberately string-only in v1: tack is meant to select an existing 57// runner and pass a small amount of routing data, not mirror Tekton's 58// entire PipelineRun API. 59type tektonWorkflowConfig struct { 60 Pipeline string `yaml:"pipeline"` 61 ServiceAccount string `yaml:"service_account"` 62 Params map[string]string `yaml:"params"` 63} 64 65type tektonWorkflowDoc struct { 66 Tack struct { 67 Tekton tektonWorkflowConfig `yaml:"tekton"` 68 } `yaml:"tack"` 69} 70 71// parseTektonWorkflowConfig decodes `tack.tekton` from a workflow body. 72func parseTektonWorkflowConfig(raw string) (*tektonWorkflowConfig, error) { 73 if strings.TrimSpace(raw) == "" { 74 return nil, errors.New("workflow body is empty") 75 } 76 var doc tektonWorkflowDoc 77 if err := yaml.Unmarshal([]byte(raw), &doc); err != nil { 78 return nil, fmt.Errorf("parse workflow yaml: %w", err) 79 } 80 cfg := doc.Tack.Tekton 81 if cfg.Pipeline == "" { 82 return nil, errors.New("workflow yaml: `tack.tekton.pipeline` is required") 83 } 84 return &cfg, nil 85} 86 87type tektonProvider struct { 88 br *broker 89 st *store 90 log *slog.Logger 91 client k8s.Client 92 namespace string 93} 94 95var _ Provider = (*tektonProvider)(nil) 96 97func newTektonProvider( 98 br *broker, 99 st *store, 100 client k8s.Client, 101 namespace string, 102 log *slog.Logger, 103) *tektonProvider { 104 return &tektonProvider{ 105 br: br, 106 st: st, 107 log: log.With("component", "provider", "kind", "tekton"), 108 client: client, 109 namespace: namespace, 110 } 111} 112 113func newInClusterTektonProvider( 114 br *broker, 115 st *store, 116 namespace string, 117 log *slog.Logger, 118) (*tektonProvider, error) { 119 client, err := k8s.NewInClusterClient() 120 if err != nil { 121 return nil, fmt.Errorf("configure in-cluster kubernetes client: %w", err) 122 } 123 return newTektonProvider(br, st, client, namespace, log), nil 124} 125 126func (p *tektonProvider) Spawn( 127 ctx context.Context, 128 knot string, 129 pipelineRkey string, 130 actor string, 131 trigger *tangled.Pipeline_TriggerMetadata, 132 workflows []*tangled.Pipeline_Workflow, 133) { 134 if len(workflows) == 0 { 135 p.log.Warn("pipeline has no workflows; nothing to spawn", 136 "knot", knot, "rkey", pipelineRkey, 137 ) 138 return 139 } 140 for _, wf := range workflows { 141 if wf == nil || wf.Name == "" { 142 continue 143 } 144 wf := wf 145 go p.spawnWorkflow(ctx, knot, pipelineRkey, actor, trigger, wf) 146 } 147} 148 149func (p *tektonProvider) spawnWorkflow( 150 ctx context.Context, 151 knot string, 152 pipelineRkey string, 153 actor string, 154 trigger *tangled.Pipeline_TriggerMetadata, 155 wf *tangled.Pipeline_Workflow, 156) { 157 logger := p.log.With( 158 "knot", knot, 159 "pipeline_rkey", pipelineRkey, 160 "workflow", wf.Name, 161 "actor", actor, 162 ) 163 164 cfg, err := parseTektonWorkflowConfig(wf.Raw) 165 if err != nil { 166 logger.Error("invalid workflow config; refusing to spawn", "err", err) 167 return 168 } 169 commit, branch := triggerCommitAndBranch(trigger) 170 name := tektonPipelineRunName(knot, pipelineRkey, wf.Name, commit, branch) 171 pr := buildTektonPipelineRun( 172 p.namespace, name, cfg, knot, pipelineRkey, actor, commit, branch, wf, 173 ) 174 175 created, err := p.client.CreateObject(ctx, pipelineRunsGVR, p.namespace, pr) 176 if errors.Is(err, k8s.ErrAlreadyExists) { 177 created, err = p.client.GetObject(ctx, pipelineRunsGVR, p.namespace, name) 178 } 179 if err != nil { 180 logger.Error("create tekton PipelineRun", "err", err, 181 "namespace", p.namespace, "pipeline_run", name, 182 "pipeline", cfg.Pipeline, 183 ) 184 return 185 } 186 187 ref := TektonRunRef{ 188 Knot: knot, 189 PipelineRkey: pipelineRkey, 190 Workflow: wf.Name, 191 Namespace: p.namespace, 192 PipelineRunName: name, 193 PipelineRunUID: created.GetUID(), 194 PipelineName: cfg.Pipeline, 195 PipelineURI: pipelineATURI(knot, pipelineRkey), 196 } 197 if err := p.st.InsertTektonRun(ctx, ref); err != nil { 198 logger.Error("persist tekton run mapping", "err", err, 199 "pipeline_run", name, 200 ) 201 return 202 } 203 204 if err := p.publishStatus(ctx, ref.PipelineURI, wf.Name, 205 "pending", name, nil, nil); err != nil { 206 logger.Error("publish initial pending status", "err", err) 207 } 208 209 logger.Info("tekton PipelineRun created", 210 "namespace", p.namespace, 211 "pipeline", cfg.Pipeline, 212 "pipeline_run", name, 213 "uid", ref.PipelineRunUID, 214 ) 215 go p.watchPipelineRun(ctx, ref) 216} 217 218func buildTektonPipelineRun( 219 namespace, name string, 220 cfg *tektonWorkflowConfig, 221 knot, pipelineRkey, actor, commit, branch string, 222 wf *tangled.Pipeline_Workflow, 223) k8s.Object { 224 obj := k8s.Object{ 225 "apiVersion": tektonAPIVersion, 226 "kind": tektonRunKind, 227 "metadata": map[string]any{ 228 "name": name, 229 "namespace": namespace, 230 "labels": map[string]any{ 231 tektonLabelManagedBy: "tack", 232 tektonLabelPipelineRkey: labelValue(pipelineRkey), 233 tektonLabelWorkflow: labelValue(wf.Name), 234 }, 235 "annotations": map[string]any{ 236 tektonAnnotationKnot: knot, 237 tektonAnnotationPipelineRkey: pipelineRkey, 238 tektonAnnotationWorkflow: wf.Name, 239 tektonAnnotationActor: actor, 240 tektonAnnotationCommit: commit, 241 tektonAnnotationBranch: branch, 242 }, 243 }, 244 "spec": map[string]any{ 245 "pipelineRef": map[string]any{ 246 "name": cfg.Pipeline, 247 }, 248 }, 249 } 250 spec := obj["spec"].(map[string]any) 251 if cfg.ServiceAccount != "" { 252 spec["serviceAccountName"] = cfg.ServiceAccount 253 } 254 if len(cfg.Params) > 0 { 255 keys := make([]string, 0, len(cfg.Params)) 256 for key := range cfg.Params { 257 keys = append(keys, key) 258 } 259 sort.Strings(keys) 260 params := make([]any, 0, len(keys)) 261 for _, key := range keys { 262 params = append(params, map[string]any{ 263 "name": key, 264 "value": cfg.Params[key], 265 }) 266 } 267 spec["params"] = params 268 } 269 return obj 270} 271 272func (p *tektonProvider) watchPipelineRun(ctx context.Context, ref TektonRunRef) { 273 logger := p.log.With( 274 "knot", ref.Knot, 275 "pipeline_rkey", ref.PipelineRkey, 276 "workflow", ref.Workflow, 277 "namespace", ref.Namespace, 278 "pipeline_run", ref.PipelineRunName, 279 ) 280 281 logger.Debug("watchPipelineRun: starting") 282 283 last := "" 284 if obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace, 285 ref.PipelineRunName); err == nil { 286 status, terminal, ok := mapTektonPipelineRunStatus(obj) 287 logger.Debug("watchPipelineRun: initial status read", 288 "status", status, "terminal", terminal, "ok", ok, 289 ) 290 if ok { 291 last = status 292 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 293 status, ref.PipelineRunName, nil, nil); err != nil { 294 logger.Error("publish tekton status", "err", err, "status", status) 295 } 296 if terminal { 297 logger.Debug("watchPipelineRun: already terminal on initial read; exiting", "status", status) 298 return 299 } 300 } 301 } else if errors.Is(err, k8s.ErrNotFound) { 302 logger.Warn("PipelineRun disappeared while watching") 303 return 304 } else { 305 logger.Debug("initial PipelineRun status read", "err", err) 306 } 307 308 w, err := p.client.WatchObjects(ctx, pipelineRunsGVR, ref.Namespace, 309 k8s.ListOptions{FieldSelector: "metadata.name=" + ref.PipelineRunName}, 310 ) 311 if err != nil { 312 logger.Debug("watchPipelineRun: watch failed; falling back to polling", "err", err) 313 p.pollPipelineRun(ctx, ref, logger, last) 314 return 315 } 316 defer w.Stop() 317 318 logger.Debug("watchPipelineRun: watch established; entering event loop") 319 for { 320 select { 321 case <-ctx.Done(): 322 logger.Debug("watchPipelineRun: context cancelled") 323 return 324 case ev, ok := <-w.ResultChan(): 325 if !ok { 326 logger.Debug("watchPipelineRun: watch channel closed; falling back to polling") 327 p.pollPipelineRun(ctx, ref, logger, last) 328 return 329 } 330 status, terminal, ok := mapTektonPipelineRunStatus(ev.Object) 331 logger.Debug("watchPipelineRun: watch event", 332 "event_type", ev.Type, 333 "status", status, "terminal", terminal, "ok", ok, "last", last, 334 ) 335 if !ok || status == last { 336 if terminal { 337 logger.Debug("watchPipelineRun: terminal status unchanged; exiting", "status", status) 338 return 339 } 340 continue 341 } 342 last = status 343 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 344 status, ref.PipelineRunName, nil, nil); err != nil { 345 logger.Error("publish tekton status", "err", err, "status", status) 346 continue 347 } 348 logger.Debug("watchPipelineRun: published status", "status", status, "terminal", terminal) 349 if terminal { 350 logger.Debug("watchPipelineRun: terminal status reached; exiting", "status", status) 351 return 352 } 353 } 354 } 355} 356 357func (p *tektonProvider) pollPipelineRun( 358 ctx context.Context, 359 ref TektonRunRef, 360 logger *slog.Logger, 361 last string, 362) { 363 logger.Debug("pollPipelineRun: starting poll loop", "interval", "5s") 364 ticker := time.NewTicker(5 * time.Second) 365 defer ticker.Stop() 366 for { 367 select { 368 case <-ctx.Done(): 369 logger.Debug("pollPipelineRun: context cancelled") 370 return 371 case <-ticker.C: 372 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace, 373 ref.PipelineRunName, 374 ) 375 if errors.Is(err, k8s.ErrNotFound) { 376 logger.Warn("PipelineRun disappeared while watching") 377 return 378 } 379 if err != nil { 380 logger.Debug("get PipelineRun status", "err", err) 381 continue 382 } 383 status, terminal, ok := mapTektonPipelineRunStatus(obj) 384 logger.Debug("pollPipelineRun: poll tick", 385 "status", status, "terminal", terminal, "ok", ok, "last", last, 386 ) 387 if !ok || status == last { 388 if terminal { 389 logger.Debug("pollPipelineRun: terminal status unchanged; exiting", "status", status) 390 return 391 } 392 continue 393 } 394 last = status 395 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 396 status, ref.PipelineRunName, nil, nil); err != nil { 397 logger.Error("publish tekton status", "err", err, "status", status) 398 continue 399 } 400 logger.Debug("pollPipelineRun: published status", "status", status, "terminal", terminal) 401 if terminal { 402 logger.Debug("pollPipelineRun: terminal status reached; exiting", "status", status) 403 return 404 } 405 } 406 } 407} 408 409// mapTektonPipelineRunStatus translates Tekton's Succeeded condition 410// into the Tangled status strings consumed by the appview. 411func mapTektonPipelineRunStatus(obj k8s.Object) (status string, terminal bool, ok bool) { 412 conditions, ok := obj.NestedSlice("status", "conditions") 413 if !ok || len(conditions) == 0 { 414 slog.Debug("mapTektonPipelineRunStatus: no conditions found", 415 "pipeline_run", obj.GetName(), 416 ) 417 return "", false, false 418 } 419 for _, raw := range conditions { 420 cond, _ := raw.(map[string]interface{}) 421 condType, _ := cond["type"].(string) 422 condStatus, _ := cond["status"].(string) 423 reason, _ := cond["reason"].(string) 424 message, _ := cond["message"].(string) 425 slog.Debug("mapTektonPipelineRunStatus: condition", 426 "pipeline_run", obj.GetName(), 427 "type", condType, 428 "status", condStatus, 429 "reason", reason, 430 "message", message, 431 ) 432 if condType != "Succeeded" { 433 continue 434 } 435 switch condStatus { 436 case "True": 437 return "success", true, true 438 case "False": 439 if tektonReasonCancelled(reason) { 440 return "cancelled", true, true 441 } 442 return "failed", true, true 443 case "Unknown": 444 return "running", false, true 445 default: 446 return "", false, false 447 } 448 } 449 return "", false, false 450} 451 452func tektonReasonCancelled(reason string) bool { 453 r := strings.ToLower(reason) 454 return strings.Contains(r, "cancel") || strings.Contains(r, "stop") 455} 456 457func (p *tektonProvider) Logs( 458 ctx context.Context, 459 knot string, 460 pipelineRkey string, 461 workflow string, 462) (<-chan LogLine, error) { 463 ref, err := p.st.LookupTektonRunByTuple(ctx, knot, pipelineRkey, workflow) 464 if err != nil { 465 return nil, fmt.Errorf("lookup tekton run mapping: %w", err) 466 } 467 if ref == nil { 468 return nil, ErrLogsNotFound 469 } 470 471 taskRuns, err := p.taskRunsForPipelineRun(ctx, *ref) 472 if err != nil { 473 return nil, err 474 } 475 p.log.Debug("Logs: found TaskRuns for PipelineRun", 476 "pipeline_run", ref.PipelineRunName, "count", len(taskRuns), 477 ) 478 if len(taskRuns) == 0 { 479 return nil, ErrLogsNotFound 480 } 481 482 terminal := p.isPipelineRunTerminal(ctx, *ref) 483 p.log.Debug("Logs: pipeline run terminal state", "pipeline_run", ref.PipelineRunName, "terminal", terminal) 484 485 out := make(chan LogLine, 32) 486 go func() { 487 defer close(out) 488 stepID := 0 489 for _, tr := range taskRuns { 490 taskName := tr.GetName() 491 if taskName == "" { 492 taskName = fmt.Sprintf("task %d", stepID) 493 } 494 p.log.Debug("Logs: streaming TaskRun", "task_run", taskName, "step_id", stepID, "terminal", terminal) 495 if !sendLine(ctx, out, LogLine{ 496 Kind: LogKindControl, 497 Time: time.Now(), 498 Content: taskName, 499 StepId: stepID, 500 StepStatus: StepStatusStart, 501 }) { 502 return 503 } 504 505 if terminal { 506 p.fetchCompletedTaskRunLogs(ctx, out, *ref, tr, stepID) 507 } else { 508 p.streamTaskRunLogs(ctx, out, *ref, tr, stepID) 509 } 510 511 if !sendLine(ctx, out, LogLine{ 512 Kind: LogKindControl, 513 Time: time.Now(), 514 Content: taskName, 515 StepId: stepID, 516 StepStatus: StepStatusEnd, 517 }) { 518 return 519 } 520 p.log.Debug("Logs: finished TaskRun", "task_run", taskName, "step_id", stepID) 521 stepID++ 522 } 523 p.log.Debug("Logs: all TaskRuns streamed", "pipeline_run", ref.PipelineRunName) 524 }() 525 return out, nil 526} 527 528// isPipelineRunTerminal returns true if the PipelineRun is in a terminal state right now. 529func (p *tektonProvider) isPipelineRunTerminal(ctx context.Context, ref TektonRunRef) bool { 530 obj, err := p.client.GetObject(ctx, pipelineRunsGVR, ref.Namespace, 531 ref.PipelineRunName, 532 ) 533 if err != nil { 534 p.log.Debug("isPipelineRunTerminal: failed to get PipelineRun", "err", err, "pipeline_run", ref.PipelineRunName) 535 return false 536 } 537 _, terminal, ok := mapTektonPipelineRunStatus(obj) 538 p.log.Debug("isPipelineRunTerminal: status check", "pipeline_run", ref.PipelineRunName, "terminal", terminal, "ok", ok) 539 return ok && terminal 540} 541 542// fetchCompletedTaskRunLogs fetches all logs from a TaskRun that has already completed. 543// It reads each step container's logs in one shot using all-containers traversal, 544// and also inlines the Tekton step status from the TaskRun (exit code, reason) as 545// control messages so the caller gets full context without needing to watch for events. 546func (p *tektonProvider) fetchCompletedTaskRunLogs( 547 ctx context.Context, 548 out chan<- LogLine, 549 ref TektonRunRef, 550 tr k8s.Object, 551 stepID int, 552) { 553 trName := tr.GetName() 554 pods, err := p.podsForTaskRun(ctx, ref.Namespace, trName) 555 if err != nil { 556 p.log.Debug("fetchCompletedTaskRunLogs: list pods failed", "err", err, 557 "task_run", trName, "pipeline_run", ref.PipelineRunName, 558 ) 559 return 560 } 561 p.log.Debug("fetchCompletedTaskRunLogs: found pods", 562 "task_run", trName, "pod_count", len(pods), 563 ) 564 565 // Emit a summary line from the TaskRun status (steps[*].terminated) so we 566 // get exit codes and reasons even if the pod logs are sparse. 567 steps, _ := tr.NestedSlice("status", "steps") 568 for _, rawStep := range steps { 569 step, _ := rawStep.(map[string]any) 570 stepName, _ := step["name"].(string) 571 term, _ := step["terminated"].(map[string]any) 572 if term == nil { 573 continue 574 } 575 exitCode := numberToInt64(term["exitCode"]) 576 reason, _ := term["reason"].(string) 577 msg, _ := term["message"].(string) 578 line := fmt.Sprintf("[step %s] exit=%d reason=%s", stepName, exitCode, reason) 579 if msg != "" { 580 line += " " + msg 581 } 582 p.log.Debug("fetchCompletedTaskRunLogs: step terminated", 583 "task_run", trName, "step", stepName, 584 "exit_code", exitCode, "reason", reason, 585 ) 586 if !sendLine(ctx, out, LogLine{ 587 Kind: LogKindData, 588 Time: time.Now(), 589 Content: line + "\n", 590 StepId: stepID, 591 Stream: "stdout", 592 }) { 593 return 594 } 595 } 596 597 for _, pod := range pods { 598 containers := append([]k8s.Container(nil), pod.InitContainers...) 599 containers = append(containers, pod.Containers...) 600 p.log.Debug("fetchCompletedTaskRunLogs: reading pod containers", 601 "pod", pod.Name, "container_count", len(containers), 602 ) 603 for _, c := range containers { 604 p.log.Debug("fetchCompletedTaskRunLogs: reading container logs", 605 "pod", pod.Name, "container", c.Name, 606 ) 607 rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name) 608 if err != nil { 609 p.log.Debug("fetchCompletedTaskRunLogs: stream failed", "err", err, 610 "pod", pod.Name, "container", c.Name, 611 ) 612 continue 613 } 614 p.sendReaderLines(ctx, out, rc, stepID) 615 _ = rc.Close() 616 p.log.Debug("fetchCompletedTaskRunLogs: done reading container", 617 "pod", pod.Name, "container", c.Name, 618 ) 619 } 620 } 621} 622 623func (p *tektonProvider) taskRunsForPipelineRun(ctx context.Context, ref TektonRunRef) ([]k8s.Object, error) { 624 list, err := p.client.ListObjects(ctx, taskRunsGVR, ref.Namespace, k8s.ListOptions{ 625 LabelSelector: "tekton.dev/pipelineRun=" + ref.PipelineRunName, 626 }) 627 if err != nil { 628 return nil, fmt.Errorf("list Tekton TaskRuns: %w", err) 629 } 630 items := append([]k8s.Object(nil), list...) 631 sort.Slice(items, func(i, j int) bool { 632 ti := items[i].GetCreationTimestamp() 633 tj := items[j].GetCreationTimestamp() 634 return ti.Before(tj) 635 }) 636 return items, nil 637} 638 639func (p *tektonProvider) streamTaskRunLogs( 640 ctx context.Context, 641 out chan<- LogLine, 642 ref TektonRunRef, 643 tr k8s.Object, 644 stepID int, 645) { 646 pods, err := p.podsForTaskRun(ctx, ref.Namespace, tr.GetName()) 647 if err != nil { 648 p.log.Debug("streamTaskRunLogs: list pods for TaskRun failed", "err", err, 649 "task_run", tr.GetName(), "pipeline_run", ref.PipelineRunName, 650 ) 651 return 652 } 653 p.log.Debug("streamTaskRunLogs: found pods", 654 "task_run", tr.GetName(), "pod_count", len(pods), 655 ) 656 for _, pod := range pods { 657 containers := append([]k8s.Container(nil), pod.InitContainers...) 658 containers = append(containers, pod.Containers...) 659 p.log.Debug("streamTaskRunLogs: streaming pod containers", 660 "pod", pod.Name, "container_count", len(containers), 661 ) 662 for _, c := range containers { 663 p.log.Debug("streamTaskRunLogs: streaming container", 664 "pod", pod.Name, "container", c.Name, "step_id", stepID, 665 ) 666 rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name) 667 if err != nil { 668 p.log.Debug("streamTaskRunLogs: stream pod logs failed", "err", err, 669 "pod", pod.Name, "container", c.Name, 670 ) 671 continue 672 } 673 p.sendReaderLines(ctx, out, rc, stepID) 674 _ = rc.Close() 675 p.log.Debug("streamTaskRunLogs: finished container", 676 "pod", pod.Name, "container", c.Name, 677 ) 678 } 679 } 680} 681 682func (p *tektonProvider) podsForTaskRun(ctx context.Context, namespace, taskRun string) ([]k8s.Pod, error) { 683 list, err := p.client.ListPods(ctx, namespace, 684 "tekton.dev/taskRun="+taskRun, 685 ) 686 if err != nil { 687 return nil, fmt.Errorf("list pods: %w", err) 688 } 689 pods := append([]k8s.Pod(nil), list...) 690 sort.Slice(pods, func(i, j int) bool { 691 return pods[i].CreationTimestamp.Before(pods[j].CreationTimestamp) 692 }) 693 return pods, nil 694} 695 696func (p *tektonProvider) sendReaderLines( 697 ctx context.Context, 698 out chan<- LogLine, 699 rc io.Reader, 700 stepID int, 701) { 702 scanner := bufio.NewScanner(rc) 703 for scanner.Scan() { 704 if !sendLine(ctx, out, LogLine{ 705 Kind: LogKindData, 706 Time: time.Now(), 707 Content: scanner.Text() + "\n", 708 StepId: stepID, 709 Stream: "stdout", 710 }) { 711 return 712 } 713 } 714 if err := scanner.Err(); err != nil { 715 p.log.Debug("scan pod log", "err", err) 716 } 717} 718 719func numberToInt64(value any) int64 { 720 switch v := value.(type) { 721 case int64: 722 return v 723 case int: 724 return int64(v) 725 case float64: 726 return int64(v) 727 case json.Number: 728 i, _ := v.Int64() 729 return i 730 default: 731 return 0 732 } 733} 734 735func (p *tektonProvider) publishStatus( 736 ctx context.Context, 737 pipelineURI, workflow, status, runName string, 738 errMsg *string, 739 exitCode *int64, 740) error { 741 rec := tangled.PipelineStatus{ 742 LexiconTypeID: tangled.PipelineStatusNSID, 743 Pipeline: pipelineURI, 744 Workflow: workflow, 745 Status: status, 746 CreatedAt: time.Now().UTC().Format(time.RFC3339), 747 Error: errMsg, 748 ExitCode: exitCode, 749 } 750 body, err := json.Marshal(rec) 751 if err != nil { 752 return fmt.Errorf("marshal pipeline.status: %w", err) 753 } 754 rkey := fmt.Sprintf("tk-%s-%s-%d", runName, status, time.Now().UnixNano()) 755 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil { 756 return fmt.Errorf("publish pipeline.status: %w", err) 757 } 758 return nil 759} 760 761func tektonPipelineRunName(knot, pipelineRkey, workflow, commit, branch string) string { 762 h := sha256.Sum256([]byte(strings.Join( 763 []string{knot, pipelineRkey, workflow, commit, branch}, "\x00", 764 ))) 765 suffix := hex.EncodeToString(h[:])[:12] 766 base := dnsLabel("tack-" + workflow) 767 maxBase := 63 - len(suffix) - 1 768 if len(base) > maxBase { 769 base = strings.TrimRight(base[:maxBase], "-") 770 } 771 if base == "" { 772 base = "tack" 773 } 774 return base + "-" + suffix 775} 776 777func dnsLabel(s string) string { 778 var b strings.Builder 779 lastDash := false 780 for _, r := range strings.ToLower(s) { 781 ok := unicode.IsLetter(r) || unicode.IsDigit(r) 782 if ok { 783 b.WriteRune(r) 784 lastDash = false 785 continue 786 } 787 if !lastDash { 788 b.WriteByte('-') 789 lastDash = true 790 } 791 } 792 return strings.Trim(b.String(), "-") 793} 794 795func labelValue(s string) string { 796 v := dnsLabel(s) 797 if len(v) > 63 { 798 v = strings.TrimRight(v[:63], "-") 799 } 800 if v == "" { 801 return "unknown" 802 } 803 return v 804}