Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

package main

// buildkiteProvider implements Provider against a real Buildkite
// account. Spawn translates a Tangled pipeline trigger into one
// Buildkite build per workflow; status updates flow back asynchronously
// through the /webhooks/buildkite handler (see http.go), which looks
// the build UUID up in the buildkite_builds table to recover the
// (knot, pipelineRkey, workflow) tuple this provider persisted at
// Spawn time and publishes a sh.tangled.pipeline.status record on
// the in-process broker.
//
// The Buildkite *pipeline slug* a workflow targets is carried inside
// the workflow's YAML body (Pipeline_Workflow.Raw), not configured
// globally on the spindle. That keeps tack a thin translator: the
// repo author decides which Buildkite pipeline runs each Tangled
// workflow without an operator round-trip. See workflowConfig below
// for the supported YAML schema.

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"net/http"
	"strings"
	"time"

	"go.yaml.in/yaml/v2"
	"tangled.org/core/api/tangled"

	"go.mitchellh.com/tack/internal/buildkite"
)

// Buildkite-side meta_data keys carrying the Tangled identity of a
// build. Mirrored into env vars (see envFromTuple) so an operator's
// Buildkite pipeline script can also reach them via $TACK_*. They
// stay tightly namespaced so a coexisting Buildkite job that uses
// meta_data for its own purposes won't collide.
const (
	// bkMetaKnot keys the knot the pipeline was triggered on.
	bkMetaKnot = "tack:knot"
	// bkMetaPipelineRkey keys the record key of the Tangled pipeline.
	bkMetaPipelineRkey = "tack:pipeline_rkey"
	// bkMetaWorkflow keys the name of the Tangled workflow the build
	// was created for.
	bkMetaWorkflow = "tack:workflow"
)

// workflowConfig is the tack-flavoured schema we expect inside each
// Tangled workflow's Raw YAML body. Only the Buildkite `pipeline`
// slug is required; everything else is optional. Fields nest under
// `tack: { buildkite: ...
// }` so the workflow YAML can grow other
// top-level keys (Tangled's own scheduling fields, future provider
// blocks) without colliding with our namespace.
//
// The nested fields correspond to properties of the Buildkite REST
// "Create a build" request, documented at
// https://buildkite.com/docs/apis/rest-api/builds#create-a-build
// (see also the comment block on buildkite.CreateBuildRequest). Only
// the handful of knobs a user genuinely needs to override is exposed:
// commit and branch come from the trigger metadata, and the identity
// env + meta_data the webhook handler depends on are supplied by tack
// itself, so users have no reason to re-specify either.
type workflowConfig struct {
	Tack tackConfig `yaml:"tack"`
}

// buildkiteConfig holds the Buildkite-specific settings of a workflow,
// i.e. the contents of the `tack.buildkite` mapping.
//
//   - `pipeline`: slug of the Buildkite pipeline to build. This is the
//     only field parseWorkflowConfig insists on.
//   - `org`: optional Buildkite organisation to use in place of the
//     spindle's default, for installs where one tack instance fronts
//     several orgs. The configured API token has to be authorised for
//     that org, otherwise build creation fails with a 401/403 that is
//     surfaced verbatim.
//   - `clean_checkout`: optional. The pointer distinguishes "key
//     absent" (nil) from an explicit value; buildCreateRequest
//     resolves nil to false when it assembles the request.
type buildkiteConfig struct {
	Pipeline      string `yaml:"pipeline"`
	Org           string `yaml:"org"`
	CleanCheckout *bool  `yaml:"clean_checkout"`
}

// tackConfig is what lives under the top-level `tack:` key: one
// nested block per provider. Buildkite is the only provider today.
type tackConfig struct {
	Buildkite buildkiteConfig `yaml:"buildkite"`
}

// parseWorkflowConfig decodes a workflow YAML body into workflowConfig.
// An empty body is treated as a structural error so spawnWorkflow can
// short-circuit cleanly: a workflow with no body has nothing for tack
// to do anyway.
92func parseWorkflowConfig(raw string) (*buildkiteConfig, error) { 93 if strings.TrimSpace(raw) == "" { 94 return nil, errors.New("workflow body is empty") 95 } 96 var cfg workflowConfig 97 if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil { 98 return nil, fmt.Errorf("parse workflow yaml: %w", err) 99 } 100 bk := cfg.Tack.Buildkite 101 if bk.Pipeline == "" { 102 return nil, errors.New("workflow yaml: `tack.buildkite.pipeline` is required") 103 } 104 return &bk, nil 105} 106 107// buildkiteProvider implements Provider. 108// 109// webhookSecret + webhookMode live on the provider rather than on 110// the HTTP server because the provider is the single owner of 111// "everything Buildkite-y": colocating the auth knob with the API 112// client and the state translator keeps configuration drift to one 113// place and makes the http.go side pure transport. 114// 115// defaultOrg is the Buildkite organisation the configured API token 116// belongs to. Workflows may opt into a different org via their YAML 117// `org` field; the API token then needs to be authorised against it. 118type buildkiteProvider struct { 119 br *broker 120 st *store 121 log *slog.Logger 122 client *buildkite.Client 123 defaultOrg string 124 webhookSecret string 125 webhookMode buildkite.WebhookMode 126} 127 128// Compile-time interface conformance check. 129var _ Provider = (*buildkiteProvider)(nil) 130 131// newBuildkiteProvider wires a provider to its Buildkite client and 132// to the broker it publishes pipeline.status records on. defaultOrg 133// is the org the API token authenticates against and the org used 134// when a workflow doesn't specify its own. webhookSecret/webhookMode 135// govern inbound /webhooks/buildkite request authentication. 
136func newBuildkiteProvider( 137 br *broker, 138 st *store, 139 client *buildkite.Client, 140 defaultOrg string, 141 webhookSecret string, 142 webhookMode buildkite.WebhookMode, 143 log *slog.Logger, 144) *buildkiteProvider { 145 return &buildkiteProvider{ 146 br: br, 147 st: st, 148 log: log.With("component", "provider", "kind", "buildkite"), 149 client: client, 150 defaultOrg: defaultOrg, 151 webhookSecret: webhookSecret, 152 webhookMode: webhookMode, 153 } 154} 155 156// VerifyWebhook authenticates an inbound webhook request using 157// whichever mode the provider was configured with. Returns nil on 158// success; the HTTP handler maps any returned error to 401. 159func (p *buildkiteProvider) VerifyWebhook(headers http.Header, body []byte) error { 160 switch p.webhookMode { 161 case buildkite.WebhookModeSignature: 162 return buildkite.VerifySignature( 163 headers.Get("X-Buildkite-Signature"), 164 p.webhookSecret, body, 165 ) 166 default: 167 // Token mode is the Buildkite default and our default, so 168 // any unrecognised value falls through to it rather than 169 // fail-closed at startup. 170 return buildkite.VerifyToken( 171 headers.Get("X-Buildkite-Token"), 172 p.webhookSecret, 173 ) 174 } 175} 176 177// Spawn satisfies Provider. For each workflow it fires a separate 178// Buildkite build off the pipeline named in that workflow's YAML so 179// each workflow gets its own status timeline. The actual API call 180// runs on a goroutine — CreateBuild is one HTTP round-trip, but we 181// still want Spawn to be non-blocking per the interface contract. 182// 183// On a successful create we persist the build UUID → (knot, rkey, 184// workflow) mapping and publish a "pending" pipeline.status so the 185// appview sees activity immediately, instead of waiting for the 186// first webhook to land. 
187func (p *buildkiteProvider) Spawn( 188 ctx context.Context, 189 knot string, 190 pipelineRkey string, 191 trigger *tangled.Pipeline_TriggerMetadata, 192 workflows []*tangled.Pipeline_Workflow, 193) { 194 if len(workflows) == 0 { 195 p.log.Warn("pipeline has no workflows; nothing to spawn", 196 "knot", knot, "rkey", pipelineRkey, 197 ) 198 return 199 } 200 201 for _, wf := range workflows { 202 if wf == nil || wf.Name == "" { 203 continue 204 } 205 wf := wf 206 go p.spawnWorkflow(ctx, knot, pipelineRkey, trigger, wf) 207 } 208} 209 210// spawnWorkflow does the per-workflow API + persistence work for 211// Spawn. Errors are logged with full context but not returned — 212// nothing in tack consumes the result, and a failed Spawn just 213// surfaces as the absence of any status update for the affected 214// workflow. 215func (p *buildkiteProvider) spawnWorkflow( 216 ctx context.Context, 217 knot string, 218 pipelineRkey string, 219 trigger *tangled.Pipeline_TriggerMetadata, 220 wf *tangled.Pipeline_Workflow, 221) { 222 logger := p.log.With( 223 "knot", knot, 224 "pipeline_rkey", pipelineRkey, 225 "workflow", wf.Name, 226 ) 227 228 cfg, err := parseWorkflowConfig(wf.Raw) 229 if err != nil { 230 // Bad workflow YAML is a user-facing config error: log it 231 // loudly and skip. Firing a build off some default would 232 // be more confusing than doing nothing. 
233 logger.Error("invalid workflow config; refusing to spawn", "err", err) 234 return 235 } 236 logger = logger.With("pipeline", cfg.Pipeline) 237 238 req, err := p.buildCreateRequest(cfg, trigger, knot, pipelineRkey, wf) 239 if err != nil { 240 logger.Error("build create request", "err", err) 241 return 242 } 243 244 org := cfg.Org 245 if org == "" { 246 org = p.defaultOrg 247 } 248 249 build, err := p.client.CreateBuild(ctx, org, cfg.Pipeline, req) 250 if err != nil { 251 logger.Error("create buildkite build", "err", err, "org", org) 252 return 253 } 254 logger.Info("buildkite build created", 255 "build_uuid", build.ID, 256 "build_number", build.Number, 257 "web_url", build.WebURL, 258 "org", org, 259 ) 260 261 pipelineURI := pipelineATURI(knot, pipelineRkey) 262 if err := p.st.InsertBuildkiteBuild(ctx, BuildkiteBuildRef{ 263 BuildUUID: build.ID, 264 BuildNumber: build.Number, 265 PipelineSlug: cfg.Pipeline, 266 Knot: knot, 267 PipelineRkey: pipelineRkey, 268 Workflow: wf.Name, 269 PipelineURI: pipelineURI, 270 }); err != nil { 271 // Webhook handlers will fail to translate this build's 272 // events because they can't recover the tuple. Surface 273 // loudly and bail; we don't want a half-tracked build 274 // silently leaking status into the broker. 275 logger.Error("persist buildkite build mapping", "err", err, 276 "build_uuid", build.ID, 277 ) 278 return 279 } 280 281 // Initial status publish so the appview shows the build as 282 // queued without waiting for the first webhook. This mirrors 283 // the upstream spindle's "schedule then run" cadence. 284 if err := p.publishStatus( 285 ctx, pipelineURI, wf.Name, "pending", build.ID, 286 nil, nil, 287 ); err != nil { 288 logger.Error("publish initial pending status", "err", err) 289 } 290} 291 292// buildCreateRequest folds the parsed workflow config and the 293// Tangled trigger metadata into a single Buildkite create-build 294// payload. 
Trigger metadata supplies commit/branch; the workflow 295// YAML supplies the Buildkite routing knobs (pipeline/org) and the 296// small handful of build options we expose. 297// 298// `ignore_pipeline_branch_filters` is hard-coded to true: Tangled 299// refs frequently don't match arbitrary Buildkite pipeline branch 300// filters, and a build silently dropped at create time is a worse 301// failure mode than running one we shouldn't have. Users wanting 302// the filter back are expected to drop the filter on the Buildkite 303// pipeline itself. 304// 305// Returns an error when the trigger lacks a commit — Buildkite's 306// API requires one and we'd rather log+skip than fire a build that 307// resolves to "whatever main happens to be". 308func (p *buildkiteProvider) buildCreateRequest( 309 cfg *buildkiteConfig, 310 trigger *tangled.Pipeline_TriggerMetadata, 311 knot, pipelineRkey string, 312 wf *tangled.Pipeline_Workflow, 313) (buildkite.CreateBuildRequest, error) { 314 commit, branch := triggerCommitAndBranch(trigger) 315 if commit == "" { 316 return buildkite.CreateBuildRequest{}, errors.New( 317 "trigger has no commit", 318 ) 319 } 320 321 cleanCheckout := false 322 if cfg.CleanCheckout != nil { 323 cleanCheckout = *cfg.CleanCheckout 324 } 325 326 req := buildkite.CreateBuildRequest{ 327 Commit: commit, 328 Branch: branch, 329 Message: fmt.Sprintf("tangled: %s", wf.Name), 330 Env: envFromTuple(knot, pipelineRkey, wf), 331 MetaData: map[string]string{ 332 bkMetaKnot: knot, 333 bkMetaPipelineRkey: pipelineRkey, 334 bkMetaWorkflow: wf.Name, 335 }, 336 CleanCheckout: cleanCheckout, 337 IgnorePipelineBranchFilters: true, 338 } 339 340 // Auto-populate Buildkite's PR fields from the Tangled PR 341 // trigger when present. Buildkite doesn't get a PR number from 342 // us (Tangled doesn't surface one through the trigger), but 343 // the base branch alone is enough for `pull_request_base_branch`- 344 // gated step filters to work. 
345 if trigger != nil && trigger.PullRequest != nil { 346 req.PullRequestBaseBranch = trigger.PullRequest.TargetBranch 347 } 348 349 return req, nil 350} 351 352// Logs satisfies Provider. We resolve the (knot, rkey, workflow) 353// tuple to a Buildkite build via the store, fetch the current jobs 354// list, then drain each job's plain-text log into the channel as one 355// LogLine per output line. 356// 357// Per-job control frames bracket each job so the appview's renderer 358// has start/end markers to lay out timing — same shape as the fake 359// provider and the upstream spindle. 360// 361// This is a snapshot read, not a tail — finished or in-progress, we 362// fetch what's there and close. Live tailing would require Buildkite 363// agent log streaming, which the public REST API doesn't expose; the 364// appview's repeated /logs calls during a running build give us 365// "good enough" liveness without that complexity. 366func (p *buildkiteProvider) Logs( 367 ctx context.Context, 368 knot string, 369 pipelineRkey string, 370 workflow string, 371) (<-chan LogLine, error) { 372 ref, err := p.st.LookupBuildkiteBuildByTuple(ctx, knot, pipelineRkey, workflow) 373 if err != nil { 374 return nil, fmt.Errorf("lookup build mapping: %w", err) 375 } 376 if ref == nil { 377 return nil, ErrLogsNotFound 378 } 379 380 // Resolve the org against which we should pull jobs/logs. We 381 // don't persist it on BuildkiteBuildRef today (the slug + token 382 // have always been enough); fall back to the provider default, 383 // which is correct for the common single-org install. A 384 // future migration can add a column when multi-org installs 385 // need it. 
386 org := p.defaultOrg 387 388 build, err := p.client.GetBuild(ctx, org, ref.PipelineSlug, ref.BuildNumber) 389 if err != nil { 390 if errors.Is(err, buildkite.ErrNotFound) { 391 return nil, ErrLogsNotFound 392 } 393 return nil, fmt.Errorf("get build: %w", err) 394 } 395 396 out := make(chan LogLine, 32) 397 go func() { 398 defer close(out) 399 stepID := 0 400 for _, job := range build.Jobs { 401 if job.Type != "" && job.Type != "script" { 402 // Skip non-script jobs (waiter, manual, 403 // trigger). They have no log to fetch and 404 // surfacing empty steps just clutters the 405 // appview. 406 continue 407 } 408 name := job.Name 409 if name == "" { 410 name = fmt.Sprintf("job %s", job.ID) 411 } 412 413 if !sendLine(ctx, out, LogLine{ 414 Kind: LogKindControl, 415 Time: time.Now(), 416 Content: name, 417 StepId: stepID, 418 StepStatus: StepStatusStart, 419 }) { 420 return 421 } 422 423 body, err := p.client.GetJobLog(ctx, org, ref.PipelineSlug, ref.BuildNumber, job.ID) 424 if err != nil { 425 p.log.Debug("fetch job log", 426 "err", err, 427 "build_uuid", ref.BuildUUID, 428 "job_id", job.ID, 429 ) 430 // Don't fail the whole stream on one job; 431 // emit the end frame and move on so the 432 // appview at least sees what other jobs 433 // produced. 434 body = "" 435 } 436 437 // Buildkite injects per-line timestamp metadata as 438 // ANSI APC sequences (ESC "_" "bk;t=<ms>" BEL) and 439 // some renderers downstream don't recognise the APC 440 // envelope, leaking the inner "_bk;t=…" payload into 441 // the displayed text. Strip them here so consumers 442 // only ever see the actual log content. 443 body = stripTerminal(body) 444 445 for _, line := range strings.Split(strings.TrimRight(body, "\n"), "\n") { 446 if line == "" { 447 // Skip the leading empty entry that 448 // Split produces for empty bodies. 
449 continue 450 } 451 if !sendLine(ctx, out, LogLine{ 452 Kind: LogKindData, 453 Time: time.Now(), 454 Content: line + "\n", 455 StepId: stepID, 456 Stream: "stdout", 457 }) { 458 return 459 } 460 } 461 462 if !sendLine(ctx, out, LogLine{ 463 Kind: LogKindControl, 464 Time: time.Now(), 465 Content: name, 466 StepId: stepID, 467 StepStatus: StepStatusEnd, 468 }) { 469 return 470 } 471 stepID++ 472 } 473 }() 474 return out, nil 475} 476 477// publishStatus assembles a tangled.PipelineStatus record and pushes 478// it through the broker. buildUUID is mixed into the rkey so multiple 479// status events for the same workflow don't collide on the events 480// table's (rkey) uniqueness — and so an operator grepping the log 481// can find every record that pertains to a given Buildkite build. 482// 483// errMsg/exitCode are optional; pass nil for non-failure transitions. 484func (p *buildkiteProvider) publishStatus( 485 ctx context.Context, 486 pipelineURI, workflow, status, buildUUID string, 487 errMsg *string, 488 exitCode *int64, 489) error { 490 rec := tangled.PipelineStatus{ 491 LexiconTypeID: tangled.PipelineStatusNSID, 492 Pipeline: pipelineURI, 493 Workflow: workflow, 494 Status: status, 495 CreatedAt: time.Now().UTC().Format(time.RFC3339), 496 Error: errMsg, 497 ExitCode: exitCode, 498 } 499 body, err := json.Marshal(rec) 500 if err != nil { 501 return fmt.Errorf("marshal pipeline.status: %w", err) 502 } 503 rkey := fmt.Sprintf("bk-%s-%s-%d", buildUUID, status, time.Now().UnixNano()) 504 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil { 505 return fmt.Errorf("publish pipeline.status: %w", err) 506 } 507 return nil 508} 509 510// HandleWebhook applies a decoded Buildkite webhook payload: looks 511// the build up in the store, translates the Buildkite state into a 512// Tangled StatusKind, and publishes a pipeline.status record. 
Used 513// by the HTTP webhook handler so both the ingress logic and the 514// translation logic live next to each other. 515// 516// Returns nil for events we intentionally ignore (job.* events, 517// build.scheduled which we already publish locally on Spawn, builds 518// we don't have a mapping for) so the handler can 200 them — webhook 519// retries from Buildkite on a 4xx/5xx are noisy and not what we want 520// for "we just don't care about this event". 521func (p *buildkiteProvider) HandleWebhook( 522 ctx context.Context, 523 payload buildkite.WebhookPayload, 524) error { 525 // Only build.* events drive pipeline.status today. Everything 526 // else (job.*, agent.*, ping) is acknowledged silently. 527 if !strings.HasPrefix(payload.Event, "build.") { 528 return nil 529 } 530 531 ref, err := p.st.LookupBuildkiteBuildByUUID(ctx, payload.Build.ID) 532 if err != nil { 533 return fmt.Errorf("lookup build by uuid: %w", err) 534 } 535 if ref == nil { 536 // Most likely: this build was triggered outside tack and 537 // just happens to share our webhook URL. Nothing to do. 538 p.log.Debug("webhook for unknown build; ignoring", 539 "event", payload.Event, 540 "build_uuid", payload.Build.ID, 541 ) 542 return nil 543 } 544 545 status, ok := mapBuildkiteState(payload.Build.State) 546 if !ok { 547 // Unknown / transient state ("blocked", "skipped", 548 // "not_run", "waiting"…) — log so we can extend the map 549 // later, but don't error out the webhook. 
550 p.log.Debug("unmapped buildkite state; ignoring", 551 "event", payload.Event, 552 "state", payload.Build.State, 553 "build_uuid", payload.Build.ID, 554 ) 555 return nil 556 } 557 558 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 559 status, ref.BuildUUID, nil, nil); err != nil { 560 return fmt.Errorf("publish webhook status: %w", err) 561 } 562 p.log.Info("buildkite webhook → pipeline.status", 563 "event", payload.Event, 564 "state", payload.Build.State, 565 "status", status, 566 "build_uuid", payload.Build.ID, 567 "workflow", ref.Workflow, 568 ) 569 return nil 570} 571 572// mapBuildkiteState translates Buildkite's build state strings into 573// the Tangled spindle StatusKind enum. The mapping aligns with the 574// upstream constants (StatusKindRunning/Failed/Cancelled/Success); 575// states that don't have a direct analogue (blocked, skipped, 576// not_run) are reported as not-mapped so the caller can decide 577// whether to ignore them. 578func mapBuildkiteState(state string) (string, bool) { 579 switch state { 580 case "scheduled": 581 return "pending", true 582 case "running", "failing": 583 return "running", true 584 case "passed": 585 return "success", true 586 case "failed": 587 return "failed", true 588 case "canceled", "canceling": 589 return "cancelled", true 590 default: 591 return "", false 592 } 593} 594 595// envFromTuple builds the env block forwarded into the Buildkite 596// build. These are the only handle a user's Buildkite pipeline has 597// on the originating Tangled trigger: their pipeline.yml typically 598// reads $TACK_WORKFLOW and dispatches based on it (e.g. running a 599// `pipeline upload` against a workflow-specific YAML file). 600// 601// TACK_WORKFLOW_RAW carries the entire YAML body of the workflow as 602// captured in the Tangled record. It can be empty if the workflow 603// definition omitted it; consumers should defend. 
604func envFromTuple(knot, pipelineRkey string, wf *tangled.Pipeline_Workflow) map[string]string { 605 return map[string]string{ 606 "TACK_KNOT": knot, 607 "TACK_PIPELINE_RKEY": pipelineRkey, 608 "TACK_WORKFLOW": wf.Name, 609 "TACK_WORKFLOW_RAW": wf.Raw, 610 } 611} 612 613// pipelineATURI returns the at-uri the appview joins pipeline.status 614// records back to their originating pipeline on. Format mirrors the 615// upstream spindle; the appview strips the `did:web:` prefix and 616// treats the remainder as the knot identifier. 617func pipelineATURI(knot, pipelineRkey string) string { 618 return fmt.Sprintf("at://did:web:%s/%s/%s", 619 knot, tangled.PipelineNSID, pipelineRkey, 620 ) 621} 622 623// triggerCommitAndBranch extracts (commit, branch) from a Tangled 624// pipeline trigger, regardless of whether it was a push, a pull 625// request, or a manual run. Returns empty strings on a fully-empty 626// trigger so the caller can decide whether that's fatal. 627func triggerCommitAndBranch(trigger *tangled.Pipeline_TriggerMetadata) (string, string) { 628 if trigger == nil { 629 return "", "" 630 } 631 switch { 632 case trigger.Push != nil: 633 // For push events, NewSha is the commit being built and 634 // Ref is the full ref (e.g. "refs/heads/main") — strip 635 // the prefix so Buildkite's branch-aware features work. 636 return trigger.Push.NewSha, refToBranch(trigger.Push.Ref) 637 case trigger.PullRequest != nil: 638 // PRs build the source commit on the source branch. 639 return trigger.PullRequest.SourceSha, trigger.PullRequest.SourceBranch 640 default: 641 // Manual triggers and any future kinds: fall back to the 642 // repo default branch with no commit, which the caller 643 // will treat as fatal — manual triggers will need 644 // additional plumbing to pick a commit. 645 if trigger.Repo != nil { 646 return "", trigger.Repo.DefaultBranch 647 } 648 return "", "" 649 } 650} 651 652// refToBranch strips the conventional refs/heads/ prefix from a git 653// ref. 
Refs that don't match the prefix (tags, refs/pull/N/head) are 654// returned as-is so downstream tooling can decide what to do with 655// them — Buildkite happily accepts either form in `branch`. 656func refToBranch(ref string) string { 657 const prefix = "refs/heads/" 658 if strings.HasPrefix(ref, prefix) { 659 return strings.TrimPrefix(ref, prefix) 660 } 661 return ref 662} 663 664// stripTerminal removes ANSI/ECMA-48 escape sequences from a log 665// payload, leaving only the displayable text. We need this because 666// Buildkite ships its plain-text log API with the agent's full 667// terminal output — per-line timestamp APC envelopes 668// (`ESC _ "bk;t=<unix-ms>" BEL`), CSI colour codes, clear-to-EOL 669// (`ESC [ K`), OSC title sets, etc. — and our consumers are not 670// terminal emulators; they render the bytes verbatim. 671// 672// We recognise the standard escape families described by ECMA-48: 673// 674// - CSI: ESC '[' parameters intermediates final 675// - OSC/DCS/APC/SOS/PM: ESC (']'|'P'|'_'|'X'|'^') … (BEL | ESC '\') 676// - everything else: ESC <single byte> 677// 678// As a safety belt we also strip the bare "_bk;t=<digits>" residue 679// that appears when an upstream processor has stripped the ESC/BEL 680// framing without understanding the APC envelope inside it. 681// 682// We should use libghostty for obvious reasons. 683func stripTerminal(s string) string { 684 if !strings.ContainsAny(s, "\x1b_") { 685 return s 686 } 687 var b strings.Builder 688 b.Grow(len(s)) 689 for i := 0; i < len(s); { 690 if s[i] == 0x1b && i+1 < len(s) { 691 switch s[i+1] { 692 case '[': 693 // CSI: parameter bytes 0x30-0x3F, then 694 // intermediate bytes 0x20-0x2F, then a 695 // single final byte 0x40-0x7E. Anything 696 // that doesn't conform we drop minimally 697 // (just the ESC) so we don't swallow 698 // legitimate text. 
699 j := i + 2 700 for j < len(s) && s[j] >= 0x30 && s[j] <= 0x3F { 701 j++ 702 } 703 for j < len(s) && s[j] >= 0x20 && s[j] <= 0x2F { 704 j++ 705 } 706 if j < len(s) && s[j] >= 0x40 && s[j] <= 0x7E { 707 i = j + 1 708 continue 709 } 710 i += 2 711 continue 712 case ']', 'P', '_', 'X', '^': 713 // OSC/DCS/APC/SOS/PM: terminated by BEL or 714 // ST (ESC '\'). Drop the entire envelope. 715 j := i + 2 716 for j < len(s) { 717 if s[j] == 0x07 { 718 j++ 719 break 720 } 721 if s[j] == 0x1b && j+1 < len(s) && s[j+1] == '\\' { 722 j += 2 723 break 724 } 725 j++ 726 } 727 i = j 728 continue 729 default: 730 // Two-byte escape (RIS, DECSC, charset 731 // selection, …). Drop both bytes. 732 i += 2 733 continue 734 } 735 } 736 // Bare residue: "_bk;t=<digits>" with no ESC/BEL framing. 737 if s[i] == '_' && strings.HasPrefix(s[i:], "_bk;t=") { 738 j := i + len("_bk;t=") 739 for j < len(s) && s[j] >= '0' && s[j] <= '9' { 740 j++ 741 } 742 if j > i+len("_bk;t=") { 743 i = j 744 continue 745 } 746 } 747 b.WriteByte(s[i]) 748 i++ 749 } 750 return b.String() 751} 752 753// sendLine pushes one LogLine into out, returning false if ctx 754// fired first. Centralised so the per-job loop in Logs stays 755// focused on the wire-shape decisions. 756func sendLine(ctx context.Context, out chan<- LogLine, line LogLine) bool { 757 select { 758 case <-ctx.Done(): 759 return false 760 case out <- line: 761 return true 762 } 763}