Stitch any CI into Tangled
1package main
2
3// buildkiteProvider implements Provider against a real Buildkite
4// account. Spawn translates a Tangled pipeline trigger into one
5// Buildkite build per workflow; status updates flow back asynchronously
6// through the /webhooks/buildkite handler (see http.go), which looks
7// the build UUID up in the buildkite_builds table to recover the
8// (knot, pipelineRkey, workflow) tuple this provider persisted at
9// Spawn time and publishes a sh.tangled.pipeline.status record on
10// the in-process broker.
11//
12// The Buildkite *pipeline slug* a workflow targets is carried inside
13// the workflow's YAML body (Pipeline_Workflow.Raw), not configured
14// globally on the spindle. That keeps tack a thin translator: the
15// repo author decides which Buildkite pipeline runs each Tangled
16// workflow without an operator round-trip. See workflowConfig below
17// for the supported YAML schema.
18
19import (
20 "context"
21 "encoding/json"
22 "errors"
23 "fmt"
24 "log/slog"
25 "net/http"
26 "strings"
27 "time"
28
29 "go.yaml.in/yaml/v2"
30 "tangled.org/core/api/tangled"
31
32 "go.mitchellh.com/tack/internal/buildkite"
33)
34
// Buildkite meta_data keys under which buildCreateRequest records the
// Tangled identity of a build. The same three values are also exported
// to the build as TACK_* environment variables (see envFromTuple), so
// a Buildkite pipeline script can read them either way. The "tack:"
// prefix namespaces the keys so they don't collide with meta_data a
// pipeline sets for its own purposes.
const (
	// bkMetaKnot keys the knot the pipeline originated from.
	bkMetaKnot = "tack:knot"
	// bkMetaPipelineRkey keys the rkey of the Tangled pipeline record.
	bkMetaPipelineRkey = "tack:pipeline_rkey"
	// bkMetaWorkflow keys the name of the Tangled workflow.
	bkMetaWorkflow = "tack:workflow"
)
45
// workflowConfig is the tack-flavoured schema decoded from each
// Tangled workflow's Raw YAML body (see parseWorkflowConfig). Only
// the Buildkite `pipeline` slug is required; everything else is
// optional. Fields nest under `tack: { buildkite: ... }` so the
// workflow YAML can carry other top-level keys (Tangled's own
// scheduling fields, future provider blocks) without colliding with
// this namespace.
//
// The Buildkite fields map onto the REST "Create a build" request
// properties documented at
// https://buildkite.com/docs/apis/rest-api/builds#create-a-build
// (see also the comment block on buildkite.CreateBuildRequest). Only
// the small subset users genuinely need to override is exposed:
// commit/branch come from the trigger metadata, and the identity
// env + meta_data are filled in by buildCreateRequest, so there is no
// reason to let users re-specify those.
type workflowConfig struct {
	// Tack is the contents of the top-level `tack:` key.
	Tack tackConfig `yaml:"tack"`
}
64
// tackConfig is the block under the top-level `tack:` key, holding one
// sub-block per provider. Right now the only nested provider is
// Buildkite.
type tackConfig struct {
	// Buildkite is the contents of `tack.buildkite`.
	Buildkite buildkiteConfig `yaml:"buildkite"`
}
70
// buildkiteConfig is the Buildkite-specific subset of workflowConfig.
//
// `pipeline` is the slug of the Buildkite pipeline to build; it is the
// only required field (parseWorkflowConfig rejects an empty value).
//
// `org` lets a workflow target a Buildkite organisation other than
// the spindle's default — useful when one tack instance fronts
// multiple orgs. The configured API token must have access to that
// org or the build creation request will fail; that error is logged
// as returned by the client rather than guessed at.
//
// `clean_checkout` is forwarded to Buildkite. CleanCheckout is a
// *bool so "not specified" can be told apart from an explicit false.
// NOTE(review): buildCreateRequest currently resolves a nil value to
// false before filling the request, so whether an omitted key really
// leaves Buildkite's own default in place depends on how
// buildkite.CreateBuildRequest serialises the field — confirm.
type buildkiteConfig struct {
	// Pipeline is the Buildkite pipeline slug (required).
	Pipeline string `yaml:"pipeline"`
	// Org optionally overrides the provider's default organisation.
	Org string `yaml:"org"`
	// CleanCheckout is nil when the key is absent from the YAML.
	CleanCheckout *bool `yaml:"clean_checkout"`
}
87
88// parseWorkflowConfig decodes a workflow YAML body into workflowConfig.
89// An empty body is treated as a structural error so spawnWorkflow can
90// short-circuit cleanly: a workflow with no body has nothing for tack
91// to do anyway.
92func parseWorkflowConfig(raw string) (*buildkiteConfig, error) {
93 if strings.TrimSpace(raw) == "" {
94 return nil, errors.New("workflow body is empty")
95 }
96 var cfg workflowConfig
97 if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil {
98 return nil, fmt.Errorf("parse workflow yaml: %w", err)
99 }
100 bk := cfg.Tack.Buildkite
101 if bk.Pipeline == "" {
102 return nil, errors.New("workflow yaml: `tack.buildkite.pipeline` is required")
103 }
104 return &bk, nil
105}
106
// buildkiteProvider implements Provider on top of a Buildkite API
// client.
//
// webhookSecret + webhookMode live on the provider rather than on
// the HTTP server because the provider is the single owner of
// "everything Buildkite-y": colocating the auth knob with the API
// client and the state translator keeps configuration in one place
// and leaves the HTTP side as pure transport.
//
// defaultOrg is the Buildkite organisation used when a workflow (or a
// stored build row) carries no org of its own. Workflows may opt into
// a different org via their YAML `org` field; the API token then
// needs to be authorised against it.
type buildkiteProvider struct {
	// br is the broker pipeline.status records are published on.
	br *broker
	// st persists and looks up the build → (knot, rkey, workflow) mapping.
	st *store
	// log is the provider-scoped logger (tagged in newBuildkiteProvider).
	log *slog.Logger
	// client talks to the Buildkite API (create build, get build, job logs).
	client *buildkite.Client
	// defaultOrg is the fallback Buildkite organisation slug.
	defaultOrg string
	// webhookSecret is the shared secret checked by VerifyWebhook.
	webhookSecret string
	// webhookMode selects signature vs token verification in VerifyWebhook.
	webhookMode buildkite.WebhookMode
}

// Compile-time check that *buildkiteProvider satisfies Provider.
var _ Provider = (*buildkiteProvider)(nil)
130
131// newBuildkiteProvider wires a provider to its Buildkite client and
132// to the broker it publishes pipeline.status records on. defaultOrg
133// is the org the API token authenticates against and the org used
134// when a workflow doesn't specify its own. webhookSecret/webhookMode
135// govern inbound /webhooks/buildkite request authentication.
136func newBuildkiteProvider(
137 br *broker,
138 st *store,
139 client *buildkite.Client,
140 defaultOrg string,
141 webhookSecret string,
142 webhookMode buildkite.WebhookMode,
143 log *slog.Logger,
144) *buildkiteProvider {
145 return &buildkiteProvider{
146 br: br,
147 st: st,
148 log: log.With("component", "provider", "kind", "buildkite"),
149 client: client,
150 defaultOrg: defaultOrg,
151 webhookSecret: webhookSecret,
152 webhookMode: webhookMode,
153 }
154}
155
156// VerifyWebhook authenticates an inbound webhook request using
157// whichever mode the provider was configured with. Returns nil on
158// success; the HTTP handler maps any returned error to 401.
159func (p *buildkiteProvider) VerifyWebhook(headers http.Header, body []byte) error {
160 switch p.webhookMode {
161 case buildkite.WebhookModeSignature:
162 return buildkite.VerifySignature(
163 headers.Get("X-Buildkite-Signature"),
164 p.webhookSecret, body,
165 )
166 default:
167 // Token mode is the Buildkite default and our default, so
168 // any unrecognised value falls through to it rather than
169 // fail-closed at startup.
170 return buildkite.VerifyToken(
171 headers.Get("X-Buildkite-Token"),
172 p.webhookSecret,
173 )
174 }
175}
176
177// Spawn satisfies Provider. For each workflow it fires a separate
178// Buildkite build off the pipeline named in that workflow's YAML so
179// each workflow gets its own status timeline. The actual API call
180// runs on a goroutine — CreateBuild is one HTTP round-trip, but we
181// still want Spawn to be non-blocking per the interface contract.
182//
183// On a successful create we persist the build UUID → (knot, rkey,
184// workflow) mapping and publish a "pending" pipeline.status so the
185// appview sees activity immediately, instead of waiting for the
186// first webhook to land.
187func (p *buildkiteProvider) Spawn(
188 ctx context.Context,
189 knot string,
190 pipelineRkey string,
191 trigger *tangled.Pipeline_TriggerMetadata,
192 workflows []*tangled.Pipeline_Workflow,
193) {
194 if len(workflows) == 0 {
195 p.log.Warn("pipeline has no workflows; nothing to spawn",
196 "knot", knot, "rkey", pipelineRkey,
197 )
198 return
199 }
200
201 for _, wf := range workflows {
202 if wf == nil || wf.Name == "" {
203 continue
204 }
205 wf := wf
206 go p.spawnWorkflow(ctx, knot, pipelineRkey, trigger, wf)
207 }
208}
209
210// spawnWorkflow does the per-workflow API + persistence work for
211// Spawn. Errors are logged with full context but not returned —
212// nothing in tack consumes the result, and a failed Spawn just
213// surfaces as the absence of any status update for the affected
214// workflow.
215func (p *buildkiteProvider) spawnWorkflow(
216 ctx context.Context,
217 knot string,
218 pipelineRkey string,
219 trigger *tangled.Pipeline_TriggerMetadata,
220 wf *tangled.Pipeline_Workflow,
221) {
222 logger := p.log.With(
223 "knot", knot,
224 "pipeline_rkey", pipelineRkey,
225 "workflow", wf.Name,
226 )
227
228 cfg, err := parseWorkflowConfig(wf.Raw)
229 if err != nil {
230 // Bad workflow YAML is a user-facing config error: log it
231 // loudly and skip. Firing a build off some default would
232 // be more confusing than doing nothing.
233 logger.Error("invalid workflow config; refusing to spawn", "err", err)
234 return
235 }
236 logger = logger.With("pipeline", cfg.Pipeline)
237
238 req, err := p.buildCreateRequest(cfg, trigger, knot, pipelineRkey, wf)
239 if err != nil {
240 logger.Error("build create request", "err", err)
241 return
242 }
243
244 org := cfg.Org
245 if org == "" {
246 org = p.defaultOrg
247 }
248
249 build, err := p.client.CreateBuild(ctx, org, cfg.Pipeline, req)
250 if err != nil {
251 logger.Error("create buildkite build", "err", err, "org", org)
252 return
253 }
254 logger.Info("buildkite build created",
255 "build_uuid", build.ID,
256 "build_number", build.Number,
257 "web_url", build.WebURL,
258 "org", org,
259 )
260
261 pipelineURI := pipelineATURI(knot, pipelineRkey)
262 // Persist the org we actually used. cfg.Org (not the resolved
263 // `org`) is intentional: empty means "fall back to defaultOrg
264 // at read time", which keeps the row meaningful even if the
265 // provider's defaultOrg changes later — and matches what older
266 // rows scan as before this column existed.
267 if err := p.st.InsertBuildkiteBuild(ctx, BuildkiteBuildRef{
268 BuildUUID: build.ID,
269 BuildNumber: build.Number,
270 PipelineSlug: cfg.Pipeline,
271 Org: cfg.Org,
272 Knot: knot,
273 PipelineRkey: pipelineRkey,
274 Workflow: wf.Name,
275 PipelineURI: pipelineURI,
276 }); err != nil {
277 // Webhook handlers will fail to translate this build's
278 // events because they can't recover the tuple. Surface
279 // loudly and bail; we don't want a half-tracked build
280 // silently leaking status into the broker.
281 logger.Error("persist buildkite build mapping", "err", err,
282 "build_uuid", build.ID,
283 )
284 return
285 }
286
287 // Initial status publish so the appview shows the build as
288 // queued without waiting for the first webhook. This mirrors
289 // the upstream spindle's "schedule then run" cadence.
290 if err := p.publishStatus(
291 ctx, pipelineURI, wf.Name, "pending", build.ID,
292 nil, nil,
293 ); err != nil {
294 logger.Error("publish initial pending status", "err", err)
295 }
296}
297
298// buildCreateRequest folds the parsed workflow config and the
299// Tangled trigger metadata into a single Buildkite create-build
300// payload. Trigger metadata supplies commit/branch; the workflow
301// YAML supplies the Buildkite routing knobs (pipeline/org) and the
302// small handful of build options we expose.
303//
304// `ignore_pipeline_branch_filters` is hard-coded to true: Tangled
305// refs frequently don't match arbitrary Buildkite pipeline branch
306// filters, and a build silently dropped at create time is a worse
307// failure mode than running one we shouldn't have. Users wanting
308// the filter back are expected to drop the filter on the Buildkite
309// pipeline itself.
310//
311// Returns an error when the trigger lacks a commit — Buildkite's
312// API requires one and we'd rather log+skip than fire a build that
313// resolves to "whatever main happens to be".
314func (p *buildkiteProvider) buildCreateRequest(
315 cfg *buildkiteConfig,
316 trigger *tangled.Pipeline_TriggerMetadata,
317 knot, pipelineRkey string,
318 wf *tangled.Pipeline_Workflow,
319) (buildkite.CreateBuildRequest, error) {
320 commit, branch := triggerCommitAndBranch(trigger)
321 if commit == "" {
322 return buildkite.CreateBuildRequest{}, errors.New(
323 "trigger has no commit",
324 )
325 }
326
327 cleanCheckout := false
328 if cfg.CleanCheckout != nil {
329 cleanCheckout = *cfg.CleanCheckout
330 }
331
332 req := buildkite.CreateBuildRequest{
333 Commit: commit,
334 Branch: branch,
335 Message: fmt.Sprintf("tangled: %s", wf.Name),
336 Env: envFromTuple(knot, pipelineRkey, wf),
337 MetaData: map[string]string{
338 bkMetaKnot: knot,
339 bkMetaPipelineRkey: pipelineRkey,
340 bkMetaWorkflow: wf.Name,
341 },
342 CleanCheckout: cleanCheckout,
343 IgnorePipelineBranchFilters: true,
344 }
345
346 // Auto-populate Buildkite's PR fields from the Tangled PR
347 // trigger when present. Buildkite doesn't get a PR number from
348 // us (Tangled doesn't surface one through the trigger), but
349 // the base branch alone is enough for `pull_request_base_branch`-
350 // gated step filters to work.
351 if trigger != nil && trigger.PullRequest != nil {
352 req.PullRequestBaseBranch = trigger.PullRequest.TargetBranch
353 }
354
355 return req, nil
356}
357
358// Logs satisfies Provider. We resolve the (knot, rkey, workflow)
359// tuple to a Buildkite build via the store, fetch the current jobs
360// list, then drain each job's plain-text log into the channel as one
361// LogLine per output line.
362//
363// Per-job control frames bracket each job so the appview's renderer
364// has start/end markers to lay out timing — same shape as the fake
365// provider and the upstream spindle.
366//
367// This is a snapshot read, not a tail — finished or in-progress, we
368// fetch what's there and close. Live tailing would require Buildkite
369// agent log streaming, which the public REST API doesn't expose; the
370// appview's repeated /logs calls during a running build give us
371// "good enough" liveness without that complexity.
372func (p *buildkiteProvider) Logs(
373 ctx context.Context,
374 knot string,
375 pipelineRkey string,
376 workflow string,
377) (<-chan LogLine, error) {
378 ref, err := p.st.LookupBuildkiteBuildByTuple(ctx, knot, pipelineRkey, workflow)
379 if err != nil {
380 return nil, fmt.Errorf("lookup build mapping: %w", err)
381 }
382 if ref == nil {
383 return nil, ErrLogsNotFound
384 }
385
386 // Resolve the org against which we should pull jobs/logs. The
387 // workflow YAML can override the spindle's default at Spawn
388 // time (tack.buildkite.org), so we honour whatever the row
389 // recorded; an empty value means the workflow didn't override,
390 // which is the same as "use defaultOrg" — including for rows
391 // written before the org column was added.
392 org := ref.Org
393 if org == "" {
394 org = p.defaultOrg
395 }
396
397 build, err := p.client.GetBuild(ctx, org, ref.PipelineSlug, ref.BuildNumber)
398 if err != nil {
399 if errors.Is(err, buildkite.ErrNotFound) {
400 return nil, ErrLogsNotFound
401 }
402 return nil, fmt.Errorf("get build: %w", err)
403 }
404
405 out := make(chan LogLine, 32)
406 go func() {
407 defer close(out)
408 stepID := 0
409 for _, job := range build.Jobs {
410 if job.Type != "" && job.Type != "script" {
411 // Skip non-script jobs (waiter, manual,
412 // trigger). They have no log to fetch and
413 // surfacing empty steps just clutters the
414 // appview.
415 continue
416 }
417 name := job.Name
418 if name == "" {
419 name = fmt.Sprintf("job %s", job.ID)
420 }
421
422 if !sendLine(ctx, out, LogLine{
423 Kind: LogKindControl,
424 Time: time.Now(),
425 Content: name,
426 StepId: stepID,
427 StepStatus: StepStatusStart,
428 }) {
429 return
430 }
431
432 body, err := p.client.GetJobLog(ctx, org, ref.PipelineSlug, ref.BuildNumber, job.ID)
433 if err != nil {
434 p.log.Debug("fetch job log",
435 "err", err,
436 "build_uuid", ref.BuildUUID,
437 "job_id", job.ID,
438 )
439 // Don't fail the whole stream on one job;
440 // emit the end frame and move on so the
441 // appview at least sees what other jobs
442 // produced.
443 body = ""
444 }
445
446 // Buildkite injects per-line timestamp metadata as
447 // ANSI APC sequences (ESC "_" "bk;t=<ms>" BEL) and
448 // some renderers downstream don't recognise the APC
449 // envelope, leaking the inner "_bk;t=…" payload into
450 // the displayed text. Strip them here so consumers
451 // only ever see the actual log content.
452 body = stripTerminal(body)
453
454 for _, line := range strings.Split(strings.TrimRight(body, "\n"), "\n") {
455 if line == "" {
456 // Skip the leading empty entry that
457 // Split produces for empty bodies.
458 continue
459 }
460 if !sendLine(ctx, out, LogLine{
461 Kind: LogKindData,
462 Time: time.Now(),
463 Content: line + "\n",
464 StepId: stepID,
465 Stream: "stdout",
466 }) {
467 return
468 }
469 }
470
471 if !sendLine(ctx, out, LogLine{
472 Kind: LogKindControl,
473 Time: time.Now(),
474 Content: name,
475 StepId: stepID,
476 StepStatus: StepStatusEnd,
477 }) {
478 return
479 }
480 stepID++
481 }
482 }()
483 return out, nil
484}
485
486// publishStatus assembles a tangled.PipelineStatus record and pushes
487// it through the broker. buildUUID is mixed into the rkey so multiple
488// status events for the same workflow don't collide on the events
489// table's (rkey) uniqueness — and so an operator grepping the log
490// can find every record that pertains to a given Buildkite build.
491//
492// errMsg/exitCode are optional; pass nil for non-failure transitions.
493func (p *buildkiteProvider) publishStatus(
494 ctx context.Context,
495 pipelineURI, workflow, status, buildUUID string,
496 errMsg *string,
497 exitCode *int64,
498) error {
499 rec := tangled.PipelineStatus{
500 LexiconTypeID: tangled.PipelineStatusNSID,
501 Pipeline: pipelineURI,
502 Workflow: workflow,
503 Status: status,
504 CreatedAt: time.Now().UTC().Format(time.RFC3339),
505 Error: errMsg,
506 ExitCode: exitCode,
507 }
508 body, err := json.Marshal(rec)
509 if err != nil {
510 return fmt.Errorf("marshal pipeline.status: %w", err)
511 }
512 rkey := fmt.Sprintf("bk-%s-%s-%d", buildUUID, status, time.Now().UnixNano())
513 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil {
514 return fmt.Errorf("publish pipeline.status: %w", err)
515 }
516 return nil
517}
518
519// HandleWebhook applies a decoded Buildkite webhook payload: looks
520// the build up in the store, translates the Buildkite state into a
521// Tangled StatusKind, and publishes a pipeline.status record. Used
522// by the HTTP webhook handler so both the ingress logic and the
523// translation logic live next to each other.
524//
525// Returns nil for events we intentionally ignore (job.* events,
526// build.scheduled which we already publish locally on Spawn, builds
527// we don't have a mapping for) so the handler can 200 them — webhook
528// retries from Buildkite on a 4xx/5xx are noisy and not what we want
529// for "we just don't care about this event".
530func (p *buildkiteProvider) HandleWebhook(
531 ctx context.Context,
532 payload buildkite.WebhookPayload,
533) error {
534 // Only build.* events drive pipeline.status today. Everything
535 // else (job.*, agent.*, ping) is acknowledged silently.
536 if !strings.HasPrefix(payload.Event, "build.") {
537 return nil
538 }
539
540 ref, err := p.st.LookupBuildkiteBuildByUUID(ctx, payload.Build.ID)
541 if err != nil {
542 return fmt.Errorf("lookup build by uuid: %w", err)
543 }
544 if ref == nil {
545 // Most likely: this build was triggered outside tack and
546 // just happens to share our webhook URL. Nothing to do.
547 p.log.Debug("webhook for unknown build; ignoring",
548 "event", payload.Event,
549 "build_uuid", payload.Build.ID,
550 )
551 return nil
552 }
553
554 status, ok := mapBuildkiteState(payload.Build.State)
555 if !ok {
556 // Unknown / transient state ("blocked", "skipped",
557 // "not_run", "waiting"…) — log so we can extend the map
558 // later, but don't error out the webhook.
559 p.log.Debug("unmapped buildkite state; ignoring",
560 "event", payload.Event,
561 "state", payload.Build.State,
562 "build_uuid", payload.Build.ID,
563 )
564 return nil
565 }
566
567 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
568 status, ref.BuildUUID, nil, nil); err != nil {
569 return fmt.Errorf("publish webhook status: %w", err)
570 }
571 p.log.Info("buildkite webhook → pipeline.status",
572 "event", payload.Event,
573 "state", payload.Build.State,
574 "status", status,
575 "build_uuid", payload.Build.ID,
576 "workflow", ref.Workflow,
577 )
578 return nil
579}
580
// mapBuildkiteState translates a Buildkite build state string into
// the status string published in pipeline.status records, and reports
// whether a mapping exists.
//
//	scheduled            -> pending
//	running, failing     -> running
//	passed               -> success
//	failed               -> failed
//	canceled, canceling  -> cancelled
//
// Any other state (for example blocked, skipped, not_run) yields
// ("", false) so the caller can decide whether to ignore it.
func mapBuildkiteState(state string) (string, bool) {
	var status string
	switch state {
	case "scheduled":
		status = "pending"
	case "running", "failing":
		status = "running"
	case "passed":
		status = "success"
	case "failed":
		status = "failed"
	case "canceled", "canceling":
		status = "cancelled"
	}
	// Every mapped status is non-empty, so emptiness doubles as the
	// "no mapping" signal.
	return status, status != ""
}
603
604// envFromTuple builds the env block forwarded into the Buildkite
605// build. These are the only handle a user's Buildkite pipeline has
606// on the originating Tangled trigger: their pipeline.yml typically
607// reads $TACK_WORKFLOW and dispatches based on it (e.g. running a
608// `pipeline upload` against a workflow-specific YAML file).
609//
610// TACK_WORKFLOW_RAW carries the entire YAML body of the workflow as
611// captured in the Tangled record. It can be empty if the workflow
612// definition omitted it; consumers should defend.
613func envFromTuple(knot, pipelineRkey string, wf *tangled.Pipeline_Workflow) map[string]string {
614 return map[string]string{
615 "TACK_KNOT": knot,
616 "TACK_PIPELINE_RKEY": pipelineRkey,
617 "TACK_WORKFLOW": wf.Name,
618 "TACK_WORKFLOW_RAW": wf.Raw,
619 }
620}
621
622// pipelineATURI returns the at-uri the appview joins pipeline.status
623// records back to their originating pipeline on. Format mirrors the
624// upstream spindle; the appview strips the `did:web:` prefix and
625// treats the remainder as the knot identifier.
626func pipelineATURI(knot, pipelineRkey string) string {
627 return fmt.Sprintf("at://did:web:%s/%s/%s",
628 knot, tangled.PipelineNSID, pipelineRkey,
629 )
630}
631
632// triggerCommitAndBranch extracts (commit, branch) from a Tangled
633// pipeline trigger, regardless of whether it was a push, a pull
634// request, or a manual run. Returns empty strings on a fully-empty
635// trigger so the caller can decide whether that's fatal.
636func triggerCommitAndBranch(trigger *tangled.Pipeline_TriggerMetadata) (string, string) {
637 if trigger == nil {
638 return "", ""
639 }
640 switch {
641 case trigger.Push != nil:
642 // For push events, NewSha is the commit being built and
643 // Ref is the full ref (e.g. "refs/heads/main") — strip
644 // the prefix so Buildkite's branch-aware features work.
645 return trigger.Push.NewSha, refToBranch(trigger.Push.Ref)
646 case trigger.PullRequest != nil:
647 // PRs build the source commit on the source branch.
648 return trigger.PullRequest.SourceSha, trigger.PullRequest.SourceBranch
649 default:
650 // Manual triggers and any future kinds: fall back to the
651 // repo default branch with no commit, which the caller
652 // will treat as fatal — manual triggers will need
653 // additional plumbing to pick a commit.
654 if trigger.Repo != nil {
655 return "", trigger.Repo.DefaultBranch
656 }
657 return "", ""
658 }
659}
660
// refToBranch turns a full git ref into a branch name by removing a
// leading "refs/heads/". Refs without that prefix (tags,
// refs/pull/N/head, already-short names) are returned unchanged so
// downstream tooling can decide what to do with them.
func refToBranch(ref string) string {
	// TrimPrefix is a no-op when the prefix is absent, so no
	// separate HasPrefix check is needed.
	return strings.TrimPrefix(ref, "refs/heads/")
}
672
// stripTerminal removes ANSI/ECMA-48 escape sequences from a log
// payload, leaving only the displayable text. Buildkite's plain-text
// job log carries the agent's full terminal output — per-line
// timestamp APC envelopes (`ESC _ "bk;t=<unix-ms>" BEL`), CSI colour
// codes, clear-to-EOL (`ESC [ K`), OSC title sets, charset selection
// (`ESC ( B`), etc. — and the consumers of these logs are not
// terminal emulators; they render the bytes verbatim.
//
// Recognised escape families (see skipEscapeSequence):
//
//   - CSI:                ESC '[' parameters intermediates final
//   - OSC/DCS/APC/SOS/PM: ESC (']'|'P'|'_'|'X'|'^') … (BEL | ESC '\')
//   - nF:                 ESC intermediates(0x20-0x2F)+ final(0x30-0x7E)
//   - two-byte escapes:   ESC <byte in 0x30-0x7E>
//
// An ESC that starts none of these (followed by a control or
// non-ASCII byte, or sitting at the very end of the input) is dropped
// on its own so the byte after it — a newline, say — survives.
//
// As a safety belt the bare "_bk;t=<digits>" residue is stripped too;
// it appears when an upstream processor removed the ESC/BEL framing
// without understanding the APC envelope inside it.
//
// A full terminal parser (the original note suggests libghostty)
// would be the more robust long-term answer.
func stripTerminal(s string) string {
	// Fast path: without ESC or '_' there is nothing to strip, and
	// returning s avoids a copy.
	if !strings.ContainsAny(s, "\x1b_") {
		return s
	}

	const residue = "_bk;t="

	var b strings.Builder
	b.Grow(len(s))
	for i := 0; i < len(s); {
		switch {
		case s[i] == 0x1b:
			i = skipEscapeSequence(s, i)
			continue
		case s[i] == '_' && strings.HasPrefix(s[i:], residue):
			// Only treat it as residue when at least one digit
			// follows; otherwise it is ordinary text.
			j := i + len(residue)
			for j < len(s) && s[j] >= '0' && s[j] <= '9' {
				j++
			}
			if j > i+len(residue) {
				i = j
				continue
			}
		}
		b.WriteByte(s[i])
		i++
	}
	return b.String()
}

// skipEscapeSequence returns the index of the first byte after the
// escape sequence that starts at s[i], where s[i] must be ESC (0x1b).
// The result is always greater than i, so callers make progress even
// on malformed input.
func skipEscapeSequence(s string, i int) int {
	if i+1 >= len(s) {
		// Lone ESC at end of input: drop it.
		return i + 1
	}

	switch c := s[i+1]; {
	case c == '[':
		// CSI: parameter bytes 0x30-0x3F, then intermediate bytes
		// 0x20-0x2F, then a single final byte 0x40-0x7E.
		j := i + 2
		for j < len(s) && s[j] >= 0x30 && s[j] <= 0x3F {
			j++
		}
		for j < len(s) && s[j] >= 0x20 && s[j] <= 0x2F {
			j++
		}
		if j < len(s) && s[j] >= 0x40 && s[j] <= 0x7E {
			return j + 1
		}
		// Malformed: drop only the "ESC [" introducer so the
		// text that follows is kept.
		return i + 2

	case c == ']', c == 'P', c == '_', c == 'X', c == '^':
		// OSC/DCS/APC/SOS/PM: drop the whole envelope up to and
		// including its terminator, BEL or ST (ESC '\').
		for j := i + 2; j < len(s); j++ {
			if s[j] == 0x07 {
				return j + 1
			}
			if s[j] == 0x1b && j+1 < len(s) && s[j+1] == '\\' {
				return j + 2
			}
		}
		// Unterminated envelope: everything to the end of the
		// input is treated as its payload.
		return len(s)

	case c >= 0x20 && c <= 0x2F:
		// nF escape (e.g. charset designation "ESC ( B"): one or
		// more intermediates followed by a final byte 0x30-0x7E.
		j := i + 2
		for j < len(s) && s[j] >= 0x20 && s[j] <= 0x2F {
			j++
		}
		if j < len(s) && s[j] >= 0x30 && s[j] <= 0x7E {
			return j + 1
		}
		// Malformed: drop ESC and the first intermediate only.
		return i + 2

	case c >= 0x30 && c <= 0x7E:
		// Two-byte escape (RIS, DECSC, index, …).
		return i + 2

	default:
		// ESC followed by a control or non-ASCII byte is not an
		// escape sequence; drop the ESC and keep the next byte.
		return i + 1
	}
}
761
762// sendLine pushes one LogLine into out, returning false if ctx
763// fired first. Centralised so the per-job loop in Logs stays
764// focused on the wire-shape decisions.
765func sendLine(ctx context.Context, out chan<- LogLine, line LogLine) bool {
766 select {
767 case <-ctx.Done():
768 return false
769 case out <- line:
770 return true
771 }
772}