Stitch any CI into Tangled
3

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// buildkiteProvider implements Provider against a real Buildkite 4// account. Spawn translates a Tangled pipeline trigger into one 5// Buildkite build per workflow; status updates flow back asynchronously 6// through the /webhooks/buildkite handler (see http.go), which looks 7// the build UUID up in the buildkite_builds table to recover the 8// (knot, pipelineRkey, workflow) tuple this provider persisted at 9// Spawn time and publishes a sh.tangled.pipeline.status record on 10// the in-process broker. 11// 12// The Buildkite *pipeline slug* a workflow targets is carried inside 13// the workflow's YAML body (Pipeline_Workflow.Raw), not configured 14// globally on the spindle. That keeps tack a thin translator: the 15// repo author decides which Buildkite pipeline runs each Tangled 16// workflow without an operator round-trip. See workflowConfig below 17// for the supported YAML schema. 18 19import ( 20 "context" 21 "encoding/json" 22 "errors" 23 "fmt" 24 "log/slog" 25 "net/http" 26 "strings" 27 "time" 28 29 "go.yaml.in/yaml/v2" 30 "tangled.org/core/api/tangled" 31 32 "go.mitchellh.com/tack/internal/buildkite" 33) 34 35// Buildkite-side meta_data keys carrying the Tangled identity of a 36// build. Mirrored into env vars (see envFromTuple) so an operator's 37// Buildkite pipeline script can also reach them via $TACK_*. They 38// stay tightly namespaced so a coexisting Buildkite job that uses 39// meta_data for its own purposes won't collide. 40const ( 41 bkMetaKnot = "tack:knot" 42 bkMetaPipelineRkey = "tack:pipeline_rkey" 43 bkMetaWorkflow = "tack:workflow" 44) 45 46// workflowConfig is the tack-flavoured schema we expect inside each 47// Tangled workflow's Raw YAML body. Only the Buildkite `pipeline` 48// slug is required; everything else is optional. Fields nest under 49// `tack: { buildkite: ... }` so the workflow YAML can grow other 50// top-level keys (Tangled's own scheduling fields, future provider 51// blocks) without colliding with our namespace. 52// 53// Fields map onto the Buildkite REST "Create a build" request 54// properties documented at 55// https://buildkite.com/docs/apis/rest-api/builds#create-a-build 56// (see also the comment block on buildkite.CreateBuildRequest). We 57// expose only the small subset users genuinely need to override — 58// trigger metadata supplies commit/branch, and tack supplies the 59// identity env+meta the webhook handler relies on, so there's no 60// reason to let users re-specify those. 61type workflowConfig struct { 62 Tack tackConfig `yaml:"tack"` 63} 64 65// tackConfig is the per-provider block under the top-level `tack:` 66// key. Right now the only nested provider is Buildkite. 67type tackConfig struct { 68 Buildkite buildkiteConfig `yaml:"buildkite"` 69} 70 71// buildkiteConfig is the Buildkite-specific subset of workflowConfig. 72// 73// `org` lets a workflow target a Buildkite organisation other than 74// the spindle's default — useful when one tack instance fronts 75// multiple orgs. The configured API token must have access to that 76// org or the build creation request will 401/403; we surface that 77// error verbatim rather than guessing. 78// 79// `clean_checkout` is forwarded verbatim to Buildkite. CleanCheckout 80// is a *bool so omitting it leaves Buildkite's own default in place 81// instead of always shipping `false`. 82type buildkiteConfig struct { 83 Pipeline string `yaml:"pipeline"` 84 Org string `yaml:"org"` 85 CleanCheckout *bool `yaml:"clean_checkout"` 86} 87 88// parseWorkflowConfig decodes a workflow YAML body into workflowConfig. 89// An empty body is treated as a structural error so spawnWorkflow can 90// short-circuit cleanly: a workflow with no body has nothing for tack 91// to do anyway. 92func parseWorkflowConfig(raw string) (*buildkiteConfig, error) { 93 if strings.TrimSpace(raw) == "" { 94 return nil, errors.New("workflow body is empty") 95 } 96 var cfg workflowConfig 97 if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil { 98 return nil, fmt.Errorf("parse workflow yaml: %w", err) 99 } 100 bk := cfg.Tack.Buildkite 101 if bk.Pipeline == "" { 102 return nil, errors.New("workflow yaml: `tack.buildkite.pipeline` is required") 103 } 104 return &bk, nil 105} 106 107// buildkiteProvider implements Provider. 108// 109// webhookSecret + webhookMode live on the provider rather than on 110// the HTTP server because the provider is the single owner of 111// "everything Buildkite-y": colocating the auth knob with the API 112// client and the state translator keeps configuration drift to one 113// place and makes the http.go side pure transport. 114// 115// defaultOrg is the Buildkite organisation the configured API token 116// belongs to. Workflows may opt into a different org via their YAML 117// `org` field; the API token then needs to be authorised against it. 118type buildkiteProvider struct { 119 br *broker 120 st *store 121 log *slog.Logger 122 client *buildkite.Client 123 defaultOrg string 124 webhookSecret string 125 webhookMode buildkite.WebhookMode 126} 127 128// Compile-time interface conformance check. 129var _ Provider = (*buildkiteProvider)(nil) 130 131// newBuildkiteProvider wires a provider to its Buildkite client and 132// to the broker it publishes pipeline.status records on. defaultOrg 133// is the org the API token authenticates against and the org used 134// when a workflow doesn't specify its own. webhookSecret/webhookMode 135// govern inbound /webhooks/buildkite request authentication. 136func newBuildkiteProvider( 137 br *broker, 138 st *store, 139 client *buildkite.Client, 140 defaultOrg string, 141 webhookSecret string, 142 webhookMode buildkite.WebhookMode, 143 log *slog.Logger, 144) *buildkiteProvider { 145 return &buildkiteProvider{ 146 br: br, 147 st: st, 148 log: log.With("component", "provider", "kind", "buildkite"), 149 client: client, 150 defaultOrg: defaultOrg, 151 webhookSecret: webhookSecret, 152 webhookMode: webhookMode, 153 } 154} 155 156// VerifyWebhook authenticates an inbound webhook request using 157// whichever mode the provider was configured with. Returns nil on 158// success; the HTTP handler maps any returned error to 401. 159func (p *buildkiteProvider) VerifyWebhook(headers http.Header, body []byte) error { 160 switch p.webhookMode { 161 case buildkite.WebhookModeSignature: 162 return buildkite.VerifySignature( 163 headers.Get("X-Buildkite-Signature"), 164 p.webhookSecret, body, 165 ) 166 default: 167 // Token mode is the Buildkite default and our default, so 168 // any unrecognised value falls through to it rather than 169 // fail-closed at startup. 170 return buildkite.VerifyToken( 171 headers.Get("X-Buildkite-Token"), 172 p.webhookSecret, 173 ) 174 } 175} 176 177// Spawn satisfies Provider. For each workflow it fires a separate 178// Buildkite build off the pipeline named in that workflow's YAML so 179// each workflow gets its own status timeline. The actual API call 180// runs on a goroutine — CreateBuild is one HTTP round-trip, but we 181// still want Spawn to be non-blocking per the interface contract. 182// 183// On a successful create we persist the build UUID → (knot, rkey, 184// workflow) mapping and publish a "pending" pipeline.status so the 185// appview sees activity immediately, instead of waiting for the 186// first webhook to land. 187func (p *buildkiteProvider) Spawn( 188 ctx context.Context, 189 knot string, 190 pipelineRkey string, 191 trigger *tangled.Pipeline_TriggerMetadata, 192 workflows []*tangled.Pipeline_Workflow, 193) { 194 if len(workflows) == 0 { 195 p.log.Warn("pipeline has no workflows; nothing to spawn", 196 "knot", knot, "rkey", pipelineRkey, 197 ) 198 return 199 } 200 201 for _, wf := range workflows { 202 if wf == nil || wf.Name == "" { 203 continue 204 } 205 wf := wf 206 go p.spawnWorkflow(ctx, knot, pipelineRkey, trigger, wf) 207 } 208} 209 210// spawnWorkflow does the per-workflow API + persistence work for 211// Spawn. Errors are logged with full context but not returned — 212// nothing in tack consumes the result, and a failed Spawn just 213// surfaces as the absence of any status update for the affected 214// workflow. 215func (p *buildkiteProvider) spawnWorkflow( 216 ctx context.Context, 217 knot string, 218 pipelineRkey string, 219 trigger *tangled.Pipeline_TriggerMetadata, 220 wf *tangled.Pipeline_Workflow, 221) { 222 logger := p.log.With( 223 "knot", knot, 224 "pipeline_rkey", pipelineRkey, 225 "workflow", wf.Name, 226 ) 227 228 cfg, err := parseWorkflowConfig(wf.Raw) 229 if err != nil { 230 // Bad workflow YAML is a user-facing config error: log it 231 // loudly and skip. Firing a build off some default would 232 // be more confusing than doing nothing. 233 logger.Error("invalid workflow config; refusing to spawn", "err", err) 234 return 235 } 236 logger = logger.With("pipeline", cfg.Pipeline) 237 238 req, err := p.buildCreateRequest(cfg, trigger, knot, pipelineRkey, wf) 239 if err != nil { 240 logger.Error("build create request", "err", err) 241 return 242 } 243 244 org := cfg.Org 245 if org == "" { 246 org = p.defaultOrg 247 } 248 249 build, err := p.client.CreateBuild(ctx, org, cfg.Pipeline, req) 250 if err != nil { 251 logger.Error("create buildkite build", "err", err, "org", org) 252 return 253 } 254 logger.Info("buildkite build created", 255 "build_uuid", build.ID, 256 "build_number", build.Number, 257 "web_url", build.WebURL, 258 "org", org, 259 ) 260 261 pipelineURI := pipelineATURI(knot, pipelineRkey) 262 // Persist the org we actually used. cfg.Org (not the resolved 263 // `org`) is intentional: empty means "fall back to defaultOrg 264 // at read time", which keeps the row meaningful even if the 265 // provider's defaultOrg changes later — and matches what older 266 // rows scan as before this column existed. 267 if err := p.st.InsertBuildkiteBuild(ctx, BuildkiteBuildRef{ 268 BuildUUID: build.ID, 269 BuildNumber: build.Number, 270 PipelineSlug: cfg.Pipeline, 271 Org: cfg.Org, 272 Knot: knot, 273 PipelineRkey: pipelineRkey, 274 Workflow: wf.Name, 275 PipelineURI: pipelineURI, 276 }); err != nil { 277 // Webhook handlers will fail to translate this build's 278 // events because they can't recover the tuple. Surface 279 // loudly and bail; we don't want a half-tracked build 280 // silently leaking status into the broker. 281 logger.Error("persist buildkite build mapping", "err", err, 282 "build_uuid", build.ID, 283 ) 284 return 285 } 286 287 // Initial status publish so the appview shows the build as 288 // queued without waiting for the first webhook. This mirrors 289 // the upstream spindle's "schedule then run" cadence. 290 if err := p.publishStatus( 291 ctx, pipelineURI, wf.Name, "pending", build.ID, 292 nil, nil, 293 ); err != nil { 294 logger.Error("publish initial pending status", "err", err) 295 } 296} 297 298// buildCreateRequest folds the parsed workflow config and the 299// Tangled trigger metadata into a single Buildkite create-build 300// payload. Trigger metadata supplies commit/branch; the workflow 301// YAML supplies the Buildkite routing knobs (pipeline/org) and the 302// small handful of build options we expose. 303// 304// `ignore_pipeline_branch_filters` is hard-coded to true: Tangled 305// refs frequently don't match arbitrary Buildkite pipeline branch 306// filters, and a build silently dropped at create time is a worse 307// failure mode than running one we shouldn't have. Users wanting 308// the filter back are expected to drop the filter on the Buildkite 309// pipeline itself. 310// 311// Returns an error when the trigger lacks a commit — Buildkite's 312// API requires one and we'd rather log+skip than fire a build that 313// resolves to "whatever main happens to be". 314func (p *buildkiteProvider) buildCreateRequest( 315 cfg *buildkiteConfig, 316 trigger *tangled.Pipeline_TriggerMetadata, 317 knot, pipelineRkey string, 318 wf *tangled.Pipeline_Workflow, 319) (buildkite.CreateBuildRequest, error) { 320 commit, branch := triggerCommitAndBranch(trigger) 321 if commit == "" { 322 return buildkite.CreateBuildRequest{}, errors.New( 323 "trigger has no commit", 324 ) 325 } 326 327 cleanCheckout := false 328 if cfg.CleanCheckout != nil { 329 cleanCheckout = *cfg.CleanCheckout 330 } 331 332 req := buildkite.CreateBuildRequest{ 333 Commit: commit, 334 Branch: branch, 335 Message: fmt.Sprintf("tangled: %s", wf.Name), 336 Env: envFromTuple(knot, pipelineRkey, wf), 337 MetaData: map[string]string{ 338 bkMetaKnot: knot, 339 bkMetaPipelineRkey: pipelineRkey, 340 bkMetaWorkflow: wf.Name, 341 }, 342 CleanCheckout: cleanCheckout, 343 IgnorePipelineBranchFilters: true, 344 } 345 346 // Auto-populate Buildkite's PR fields from the Tangled PR 347 // trigger when present. Buildkite doesn't get a PR number from 348 // us (Tangled doesn't surface one through the trigger), but 349 // the base branch alone is enough for `pull_request_base_branch`- 350 // gated step filters to work. 351 if trigger != nil && trigger.PullRequest != nil { 352 req.PullRequestBaseBranch = trigger.PullRequest.TargetBranch 353 } 354 355 return req, nil 356} 357 358// Logs satisfies Provider. We resolve the (knot, rkey, workflow) 359// tuple to a Buildkite build via the store, fetch the current jobs 360// list, then drain each job's plain-text log into the channel as one 361// LogLine per output line. 362// 363// Per-job control frames bracket each job so the appview's renderer 364// has start/end markers to lay out timing — same shape as the fake 365// provider and the upstream spindle. 366// 367// This is a snapshot read, not a tail — finished or in-progress, we 368// fetch what's there and close. Live tailing would require Buildkite 369// agent log streaming, which the public REST API doesn't expose; the 370// appview's repeated /logs calls during a running build give us 371// "good enough" liveness without that complexity. 372func (p *buildkiteProvider) Logs( 373 ctx context.Context, 374 knot string, 375 pipelineRkey string, 376 workflow string, 377) (<-chan LogLine, error) { 378 ref, err := p.st.LookupBuildkiteBuildByTuple(ctx, knot, pipelineRkey, workflow) 379 if err != nil { 380 return nil, fmt.Errorf("lookup build mapping: %w", err) 381 } 382 if ref == nil { 383 return nil, ErrLogsNotFound 384 } 385 386 // Resolve the org against which we should pull jobs/logs. The 387 // workflow YAML can override the spindle's default at Spawn 388 // time (tack.buildkite.org), so we honour whatever the row 389 // recorded; an empty value means the workflow didn't override, 390 // which is the same as "use defaultOrg" — including for rows 391 // written before the org column was added. 392 org := ref.Org 393 if org == "" { 394 org = p.defaultOrg 395 } 396 397 build, err := p.client.GetBuild(ctx, org, ref.PipelineSlug, ref.BuildNumber) 398 if err != nil { 399 if errors.Is(err, buildkite.ErrNotFound) { 400 return nil, ErrLogsNotFound 401 } 402 return nil, fmt.Errorf("get build: %w", err) 403 } 404 405 out := make(chan LogLine, 32) 406 go func() { 407 defer close(out) 408 stepID := 0 409 for _, job := range build.Jobs { 410 if job.Type != "" && job.Type != "script" { 411 // Skip non-script jobs (waiter, manual, 412 // trigger). They have no log to fetch and 413 // surfacing empty steps just clutters the 414 // appview. 415 continue 416 } 417 name := job.Name 418 if name == "" { 419 name = fmt.Sprintf("job %s", job.ID) 420 } 421 422 if !sendLine(ctx, out, LogLine{ 423 Kind: LogKindControl, 424 Time: time.Now(), 425 Content: name, 426 StepId: stepID, 427 StepStatus: StepStatusStart, 428 }) { 429 return 430 } 431 432 body, err := p.client.GetJobLog(ctx, org, ref.PipelineSlug, ref.BuildNumber, job.ID) 433 if err != nil { 434 p.log.Debug("fetch job log", 435 "err", err, 436 "build_uuid", ref.BuildUUID, 437 "job_id", job.ID, 438 ) 439 // Don't fail the whole stream on one job; 440 // emit the end frame and move on so the 441 // appview at least sees what other jobs 442 // produced. 443 body = "" 444 } 445 446 // Buildkite injects per-line timestamp metadata as 447 // ANSI APC sequences (ESC "_" "bk;t=<ms>" BEL) and 448 // some renderers downstream don't recognise the APC 449 // envelope, leaking the inner "_bk;t=…" payload into 450 // the displayed text. Strip them here so consumers 451 // only ever see the actual log content. 452 body = stripTerminal(body) 453 454 for _, line := range strings.Split(strings.TrimRight(body, "\n"), "\n") { 455 if line == "" { 456 // Skip the leading empty entry that 457 // Split produces for empty bodies. 458 continue 459 } 460 if !sendLine(ctx, out, LogLine{ 461 Kind: LogKindData, 462 Time: time.Now(), 463 Content: line + "\n", 464 StepId: stepID, 465 Stream: "stdout", 466 }) { 467 return 468 } 469 } 470 471 if !sendLine(ctx, out, LogLine{ 472 Kind: LogKindControl, 473 Time: time.Now(), 474 Content: name, 475 StepId: stepID, 476 StepStatus: StepStatusEnd, 477 }) { 478 return 479 } 480 stepID++ 481 } 482 }() 483 return out, nil 484} 485 486// publishStatus assembles a tangled.PipelineStatus record and pushes 487// it through the broker. buildUUID is mixed into the rkey so multiple 488// status events for the same workflow don't collide on the events 489// table's (rkey) uniqueness — and so an operator grepping the log 490// can find every record that pertains to a given Buildkite build. 491// 492// errMsg/exitCode are optional; pass nil for non-failure transitions. 493func (p *buildkiteProvider) publishStatus( 494 ctx context.Context, 495 pipelineURI, workflow, status, buildUUID string, 496 errMsg *string, 497 exitCode *int64, 498) error { 499 rec := tangled.PipelineStatus{ 500 LexiconTypeID: tangled.PipelineStatusNSID, 501 Pipeline: pipelineURI, 502 Workflow: workflow, 503 Status: status, 504 CreatedAt: time.Now().UTC().Format(time.RFC3339), 505 Error: errMsg, 506 ExitCode: exitCode, 507 } 508 body, err := json.Marshal(rec) 509 if err != nil { 510 return fmt.Errorf("marshal pipeline.status: %w", err) 511 } 512 rkey := fmt.Sprintf("bk-%s-%s-%d", buildUUID, status, time.Now().UnixNano()) 513 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil { 514 return fmt.Errorf("publish pipeline.status: %w", err) 515 } 516 return nil 517} 518 519// HandleWebhook applies a decoded Buildkite webhook payload: looks 520// the build up in the store, translates the Buildkite state into a 521// Tangled StatusKind, and publishes a pipeline.status record. Used 522// by the HTTP webhook handler so both the ingress logic and the 523// translation logic live next to each other. 524// 525// Returns nil for events we intentionally ignore (job.* events, 526// build.scheduled which we already publish locally on Spawn, builds 527// we don't have a mapping for) so the handler can 200 them — webhook 528// retries from Buildkite on a 4xx/5xx are noisy and not what we want 529// for "we just don't care about this event". 530func (p *buildkiteProvider) HandleWebhook( 531 ctx context.Context, 532 payload buildkite.WebhookPayload, 533) error { 534 // Only build.* events drive pipeline.status today. Everything 535 // else (job.*, agent.*, ping) is acknowledged silently. 536 if !strings.HasPrefix(payload.Event, "build.") { 537 return nil 538 } 539 540 ref, err := p.st.LookupBuildkiteBuildByUUID(ctx, payload.Build.ID) 541 if err != nil { 542 return fmt.Errorf("lookup build by uuid: %w", err) 543 } 544 if ref == nil { 545 // Most likely: this build was triggered outside tack and 546 // just happens to share our webhook URL. Nothing to do. 547 p.log.Debug("webhook for unknown build; ignoring", 548 "event", payload.Event, 549 "build_uuid", payload.Build.ID, 550 ) 551 return nil 552 } 553 554 status, ok := mapBuildkiteState(payload.Build.State) 555 if !ok { 556 // Unknown / transient state ("blocked", "skipped", 557 // "not_run", "waiting"…) — log so we can extend the map 558 // later, but don't error out the webhook. 559 p.log.Debug("unmapped buildkite state; ignoring", 560 "event", payload.Event, 561 "state", payload.Build.State, 562 "build_uuid", payload.Build.ID, 563 ) 564 return nil 565 } 566 567 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 568 status, ref.BuildUUID, nil, nil); err != nil { 569 return fmt.Errorf("publish webhook status: %w", err) 570 } 571 p.log.Info("buildkite webhook → pipeline.status", 572 "event", payload.Event, 573 "state", payload.Build.State, 574 "status", status, 575 "build_uuid", payload.Build.ID, 576 "workflow", ref.Workflow, 577 ) 578 return nil 579} 580 581// mapBuildkiteState translates Buildkite's build state strings into 582// the Tangled spindle StatusKind enum. The mapping aligns with the 583// upstream constants (StatusKindRunning/Failed/Cancelled/Success); 584// states that don't have a direct analogue (blocked, skipped, 585// not_run) are reported as not-mapped so the caller can decide 586// whether to ignore them. 587func mapBuildkiteState(state string) (string, bool) { 588 switch state { 589 case "scheduled": 590 return "pending", true 591 case "running", "failing": 592 return "running", true 593 case "passed": 594 return "success", true 595 case "failed": 596 return "failed", true 597 case "canceled", "canceling": 598 return "cancelled", true 599 default: 600 return "", false 601 } 602} 603 604// envFromTuple builds the env block forwarded into the Buildkite 605// build. These are the only handle a user's Buildkite pipeline has 606// on the originating Tangled trigger: their pipeline.yml typically 607// reads $TACK_WORKFLOW and dispatches based on it (e.g. running a 608// `pipeline upload` against a workflow-specific YAML file). 609// 610// TACK_WORKFLOW_RAW carries the entire YAML body of the workflow as 611// captured in the Tangled record. It can be empty if the workflow 612// definition omitted it; consumers should defend. 613func envFromTuple(knot, pipelineRkey string, wf *tangled.Pipeline_Workflow) map[string]string { 614 return map[string]string{ 615 "TACK_KNOT": knot, 616 "TACK_PIPELINE_RKEY": pipelineRkey, 617 "TACK_WORKFLOW": wf.Name, 618 "TACK_WORKFLOW_RAW": wf.Raw, 619 } 620} 621 622// pipelineATURI returns the at-uri the appview joins pipeline.status 623// records back to their originating pipeline on. Format mirrors the 624// upstream spindle; the appview strips the `did:web:` prefix and 625// treats the remainder as the knot identifier. 626func pipelineATURI(knot, pipelineRkey string) string { 627 return fmt.Sprintf("at://did:web:%s/%s/%s", 628 knot, tangled.PipelineNSID, pipelineRkey, 629 ) 630} 631 632// triggerCommitAndBranch extracts (commit, branch) from a Tangled 633// pipeline trigger, regardless of whether it was a push, a pull 634// request, or a manual run. Returns empty strings on a fully-empty 635// trigger so the caller can decide whether that's fatal. 636func triggerCommitAndBranch(trigger *tangled.Pipeline_TriggerMetadata) (string, string) { 637 if trigger == nil { 638 return "", "" 639 } 640 switch { 641 case trigger.Push != nil: 642 // For push events, NewSha is the commit being built and 643 // Ref is the full ref (e.g. "refs/heads/main") — strip 644 // the prefix so Buildkite's branch-aware features work. 645 return trigger.Push.NewSha, refToBranch(trigger.Push.Ref) 646 case trigger.PullRequest != nil: 647 // PRs build the source commit on the source branch. 648 return trigger.PullRequest.SourceSha, trigger.PullRequest.SourceBranch 649 default: 650 // Manual triggers and any future kinds: fall back to the 651 // repo default branch with no commit, which the caller 652 // will treat as fatal — manual triggers will need 653 // additional plumbing to pick a commit. 654 if trigger.Repo != nil { 655 return "", trigger.Repo.DefaultBranch 656 } 657 return "", "" 658 } 659} 660 661// refToBranch strips the conventional refs/heads/ prefix from a git 662// ref. Refs that don't match the prefix (tags, refs/pull/N/head) are 663// returned as-is so downstream tooling can decide what to do with 664// them — Buildkite happily accepts either form in `branch`. 665func refToBranch(ref string) string { 666 const prefix = "refs/heads/" 667 if strings.HasPrefix(ref, prefix) { 668 return strings.TrimPrefix(ref, prefix) 669 } 670 return ref 671} 672 673// stripTerminal removes ANSI/ECMA-48 escape sequences from a log 674// payload, leaving only the displayable text. We need this because 675// Buildkite ships its plain-text log API with the agent's full 676// terminal output — per-line timestamp APC envelopes 677// (`ESC _ "bk;t=<unix-ms>" BEL`), CSI colour codes, clear-to-EOL 678// (`ESC [ K`), OSC title sets, etc. — and our consumers are not 679// terminal emulators; they render the bytes verbatim. 680// 681// We recognise the standard escape families described by ECMA-48: 682// 683// - CSI: ESC '[' parameters intermediates final 684// - OSC/DCS/APC/SOS/PM: ESC (']'|'P'|'_'|'X'|'^') … (BEL | ESC '\') 685// - everything else: ESC <single byte> 686// 687// As a safety belt we also strip the bare "_bk;t=<digits>" residue 688// that appears when an upstream processor has stripped the ESC/BEL 689// framing without understanding the APC envelope inside it. 690// 691// We should use libghostty for obvious reasons. 692func stripTerminal(s string) string { 693 if !strings.ContainsAny(s, "\x1b_") { 694 return s 695 } 696 var b strings.Builder 697 b.Grow(len(s)) 698 for i := 0; i < len(s); { 699 if s[i] == 0x1b && i+1 < len(s) { 700 switch s[i+1] { 701 case '[': 702 // CSI: parameter bytes 0x30-0x3F, then 703 // intermediate bytes 0x20-0x2F, then a 704 // single final byte 0x40-0x7E. Anything 705 // that doesn't conform we drop minimally 706 // (just the ESC) so we don't swallow 707 // legitimate text. 708 j := i + 2 709 for j < len(s) && s[j] >= 0x30 && s[j] <= 0x3F { 710 j++ 711 } 712 for j < len(s) && s[j] >= 0x20 && s[j] <= 0x2F { 713 j++ 714 } 715 if j < len(s) && s[j] >= 0x40 && s[j] <= 0x7E { 716 i = j + 1 717 continue 718 } 719 i += 2 720 continue 721 case ']', 'P', '_', 'X', '^': 722 // OSC/DCS/APC/SOS/PM: terminated by BEL or 723 // ST (ESC '\'). Drop the entire envelope. 724 j := i + 2 725 for j < len(s) { 726 if s[j] == 0x07 { 727 j++ 728 break 729 } 730 if s[j] == 0x1b && j+1 < len(s) && s[j+1] == '\\' { 731 j += 2 732 break 733 } 734 j++ 735 } 736 i = j 737 continue 738 default: 739 // Two-byte escape (RIS, DECSC, charset 740 // selection, …). Drop both bytes. 741 i += 2 742 continue 743 } 744 } 745 // Bare residue: "_bk;t=<digits>" with no ESC/BEL framing. 746 if s[i] == '_' && strings.HasPrefix(s[i:], "_bk;t=") { 747 j := i + len("_bk;t=") 748 for j < len(s) && s[j] >= '0' && s[j] <= '9' { 749 j++ 750 } 751 if j > i+len("_bk;t=") { 752 i = j 753 continue 754 } 755 } 756 b.WriteByte(s[i]) 757 i++ 758 } 759 return b.String() 760} 761 762// sendLine pushes one LogLine into out, returning false if ctx 763// fired first. Centralised so the per-job loop in Logs stays 764// focused on the wire-shape decisions. 765func sendLine(ctx context.Context, out chan<- LogLine, line LogLine) bool { 766 select { 767 case <-ctx.Done(): 768 return false 769 case out <- line: 770 return true 771 } 772}