Stitch any CI into Tangled
1package main
2
3// buildkiteProvider implements Provider against a real Buildkite
4// account. Spawn translates a Tangled pipeline trigger into one
5// Buildkite build per workflow; status updates flow back asynchronously
6// through the /webhooks/buildkite handler (see http.go), which looks
7// the build UUID up in the buildkite_builds table to recover the
8// (knot, pipelineRkey, workflow) tuple this provider persisted at
9// Spawn time and publishes a sh.tangled.pipeline.status record on
10// the in-process broker.
11//
12// The Buildkite *pipeline slug* a workflow targets is carried inside
13// the workflow's YAML body (Pipeline_Workflow.Raw), not configured
14// globally on the spindle. That keeps tack a thin translator: the
15// repo author decides which Buildkite pipeline runs each Tangled
16// workflow without an operator round-trip. See workflowConfig below
17// for the supported YAML schema.
18
19import (
20 "context"
21 "encoding/json"
22 "errors"
23 "fmt"
24 "log/slog"
25 "net/http"
26 "strings"
27 "time"
28
29 "go.yaml.in/yaml/v2"
30 "tangled.org/core/api/tangled"
31
32 "go.mitchellh.com/tack/internal/buildkite"
33)
34
// Buildkite-side meta_data keys carrying the Tangled identity of a
// build. The same three values are also exported as env vars (see
// envFromTuple) so an operator's Buildkite pipeline script can reach
// them via $TACK_*. Every key carries the "tack:" prefix so a
// coexisting Buildkite job that uses meta_data for its own purposes
// won't collide.
const (
	// bkMetaKnot is the meta_data key holding the originating knot.
	bkMetaKnot = "tack:knot"

	// bkMetaPipelineRkey is the meta_data key holding the record key
	// of the Tangled pipeline.
	bkMetaPipelineRkey = "tack:pipeline_rkey"

	// bkMetaWorkflow is the meta_data key holding the workflow name.
	bkMetaWorkflow = "tack:workflow"
)
45
// workflowConfig is the tack-flavoured schema we expect inside each
// Tangled workflow's Raw YAML body. Only the Buildkite `pipeline`
// slug is required; everything else is optional. Fields nest under
// `tack: { buildkite: ... }` so the workflow YAML can grow other
// top-level keys (Tangled's own scheduling fields, future provider
// blocks) without colliding with our namespace.
//
// Fields map onto the Buildkite REST "Create a build" request
// properties documented at
// https://buildkite.com/docs/apis/rest-api/builds#create-a-build
// (see also the comment block on buildkite.CreateBuildRequest). We
// expose only the small subset users genuinely need to override:
// trigger metadata supplies commit/branch, and tack supplies the
// identity env+meta the webhook handler relies on, so there's no
// reason to let users re-specify those.
type workflowConfig struct {
	// Tack is everything under the top-level `tack:` key; all other
	// top-level keys in the workflow body are not decoded into this
	// struct.
	Tack tackConfig `yaml:"tack"`
}
64
// tackConfig is the per-provider block under the top-level `tack:`
// key. Right now the only nested provider is Buildkite.
type tackConfig struct {
	// Buildkite is the `tack.buildkite` block; parseWorkflowConfig
	// returns a copy of it to callers.
	Buildkite buildkiteConfig `yaml:"buildkite"`
}
70
// buildkiteConfig is the Buildkite-specific subset of workflowConfig.
//
// `org` lets a workflow target a Buildkite organisation other than
// the spindle's default, which is useful when one tack instance
// fronts multiple orgs. The configured API token must have access to
// that org or the build creation request will fail; spawnWorkflow
// logs that error as returned rather than guessing.
//
// `clean_checkout` is a *bool so that "not specified" can be told
// apart from an explicit `false`.
//
// NOTE(review): buildCreateRequest currently collapses an omitted
// value to false before filling the request, so whether omission
// really leaves Buildkite's own default in place depends on how
// buildkite.CreateBuildRequest serialises a false CleanCheckout;
// confirm against that type.
type buildkiteConfig struct {
	// Pipeline is the Buildkite pipeline slug to build. Required:
	// parseWorkflowConfig rejects a workflow that leaves it empty.
	Pipeline string `yaml:"pipeline"`

	// Org optionally overrides the provider's default organisation
	// for build creation.
	Org string `yaml:"org"`

	// CleanCheckout is nil when the workflow did not set
	// `clean_checkout`.
	CleanCheckout *bool `yaml:"clean_checkout"`
}
87
88// parseWorkflowConfig decodes a workflow YAML body into workflowConfig.
89// An empty body is treated as a structural error so spawnWorkflow can
90// short-circuit cleanly: a workflow with no body has nothing for tack
91// to do anyway.
92func parseWorkflowConfig(raw string) (*buildkiteConfig, error) {
93 if strings.TrimSpace(raw) == "" {
94 return nil, errors.New("workflow body is empty")
95 }
96 var cfg workflowConfig
97 if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil {
98 return nil, fmt.Errorf("parse workflow yaml: %w", err)
99 }
100 bk := cfg.Tack.Buildkite
101 if bk.Pipeline == "" {
102 return nil, errors.New("workflow yaml: `tack.buildkite.pipeline` is required")
103 }
104 return &bk, nil
105}
106
// buildkiteProvider implements Provider.
//
// webhookSecret + webhookMode live on the provider rather than on
// the HTTP server because the provider is the single owner of
// "everything Buildkite-y": colocating the auth knob with the API
// client and the state translator keeps configuration drift to one
// place and makes the http.go side pure transport.
//
// defaultOrg is the Buildkite organisation the configured API token
// belongs to. Workflows may opt into a different org via their YAML
// `org` field; the API token then needs to be authorised against it.
type buildkiteProvider struct {
	// br is the broker pipeline.status records are published on.
	br *broker

	// st persists and resolves the build UUID <-> (knot, rkey,
	// workflow) mapping.
	st *store

	// log is the provider-scoped logger built in newBuildkiteProvider.
	log *slog.Logger

	// client talks to the Buildkite REST API.
	client *buildkite.Client

	// defaultOrg is used whenever a workflow does not name its own
	// org, and always when fetching logs.
	defaultOrg string

	// webhookSecret is the shared secret VerifyWebhook checks inbound
	// requests against.
	webhookSecret string

	// webhookMode selects signature or token verification.
	webhookMode buildkite.WebhookMode
}
127
// Compile-time interface conformance check: the build breaks here if
// *buildkiteProvider ever stops satisfying Provider.
var _ Provider = (*buildkiteProvider)(nil)
130
131// newBuildkiteProvider wires a provider to its Buildkite client and
132// to the broker it publishes pipeline.status records on. defaultOrg
133// is the org the API token authenticates against and the org used
134// when a workflow doesn't specify its own. webhookSecret/webhookMode
135// govern inbound /webhooks/buildkite request authentication.
136func newBuildkiteProvider(
137 br *broker,
138 st *store,
139 client *buildkite.Client,
140 defaultOrg string,
141 webhookSecret string,
142 webhookMode buildkite.WebhookMode,
143 log *slog.Logger,
144) *buildkiteProvider {
145 return &buildkiteProvider{
146 br: br,
147 st: st,
148 log: log.With("component", "provider", "kind", "buildkite"),
149 client: client,
150 defaultOrg: defaultOrg,
151 webhookSecret: webhookSecret,
152 webhookMode: webhookMode,
153 }
154}
155
156// VerifyWebhook authenticates an inbound webhook request using
157// whichever mode the provider was configured with. Returns nil on
158// success; the HTTP handler maps any returned error to 401.
159func (p *buildkiteProvider) VerifyWebhook(headers http.Header, body []byte) error {
160 switch p.webhookMode {
161 case buildkite.WebhookModeSignature:
162 return buildkite.VerifySignature(
163 headers.Get("X-Buildkite-Signature"),
164 p.webhookSecret, body,
165 )
166 default:
167 // Token mode is the Buildkite default and our default, so
168 // any unrecognised value falls through to it rather than
169 // fail-closed at startup.
170 return buildkite.VerifyToken(
171 headers.Get("X-Buildkite-Token"),
172 p.webhookSecret,
173 )
174 }
175}
176
177// Spawn satisfies Provider. For each workflow it fires a separate
178// Buildkite build off the pipeline named in that workflow's YAML so
179// each workflow gets its own status timeline. The actual API call
180// runs on a goroutine — CreateBuild is one HTTP round-trip, but we
181// still want Spawn to be non-blocking per the interface contract.
182//
183// On a successful create we persist the build UUID → (knot, rkey,
184// workflow) mapping and publish a "pending" pipeline.status so the
185// appview sees activity immediately, instead of waiting for the
186// first webhook to land.
187func (p *buildkiteProvider) Spawn(
188 ctx context.Context,
189 knot string,
190 pipelineRkey string,
191 trigger *tangled.Pipeline_TriggerMetadata,
192 workflows []*tangled.Pipeline_Workflow,
193) {
194 if len(workflows) == 0 {
195 p.log.Warn("pipeline has no workflows; nothing to spawn",
196 "knot", knot, "rkey", pipelineRkey,
197 )
198 return
199 }
200
201 for _, wf := range workflows {
202 if wf == nil || wf.Name == "" {
203 continue
204 }
205 wf := wf
206 go p.spawnWorkflow(ctx, knot, pipelineRkey, trigger, wf)
207 }
208}
209
210// spawnWorkflow does the per-workflow API + persistence work for
211// Spawn. Errors are logged with full context but not returned —
212// nothing in tack consumes the result, and a failed Spawn just
213// surfaces as the absence of any status update for the affected
214// workflow.
215func (p *buildkiteProvider) spawnWorkflow(
216 ctx context.Context,
217 knot string,
218 pipelineRkey string,
219 trigger *tangled.Pipeline_TriggerMetadata,
220 wf *tangled.Pipeline_Workflow,
221) {
222 logger := p.log.With(
223 "knot", knot,
224 "pipeline_rkey", pipelineRkey,
225 "workflow", wf.Name,
226 )
227
228 cfg, err := parseWorkflowConfig(wf.Raw)
229 if err != nil {
230 // Bad workflow YAML is a user-facing config error: log it
231 // loudly and skip. Firing a build off some default would
232 // be more confusing than doing nothing.
233 logger.Error("invalid workflow config; refusing to spawn", "err", err)
234 return
235 }
236 logger = logger.With("pipeline", cfg.Pipeline)
237
238 req, err := p.buildCreateRequest(cfg, trigger, knot, pipelineRkey, wf)
239 if err != nil {
240 logger.Error("build create request", "err", err)
241 return
242 }
243
244 org := cfg.Org
245 if org == "" {
246 org = p.defaultOrg
247 }
248
249 build, err := p.client.CreateBuild(ctx, org, cfg.Pipeline, req)
250 if err != nil {
251 logger.Error("create buildkite build", "err", err, "org", org)
252 return
253 }
254 logger.Info("buildkite build created",
255 "build_uuid", build.ID,
256 "build_number", build.Number,
257 "web_url", build.WebURL,
258 "org", org,
259 )
260
261 pipelineURI := pipelineATURI(knot, pipelineRkey)
262 if err := p.st.InsertBuildkiteBuild(ctx, BuildkiteBuildRef{
263 BuildUUID: build.ID,
264 BuildNumber: build.Number,
265 PipelineSlug: cfg.Pipeline,
266 Knot: knot,
267 PipelineRkey: pipelineRkey,
268 Workflow: wf.Name,
269 PipelineURI: pipelineURI,
270 }); err != nil {
271 // Webhook handlers will fail to translate this build's
272 // events because they can't recover the tuple. Surface
273 // loudly and bail; we don't want a half-tracked build
274 // silently leaking status into the broker.
275 logger.Error("persist buildkite build mapping", "err", err,
276 "build_uuid", build.ID,
277 )
278 return
279 }
280
281 // Initial status publish so the appview shows the build as
282 // queued without waiting for the first webhook. This mirrors
283 // the upstream spindle's "schedule then run" cadence.
284 if err := p.publishStatus(
285 ctx, pipelineURI, wf.Name, "pending", build.ID,
286 nil, nil,
287 ); err != nil {
288 logger.Error("publish initial pending status", "err", err)
289 }
290}
291
292// buildCreateRequest folds the parsed workflow config and the
293// Tangled trigger metadata into a single Buildkite create-build
294// payload. Trigger metadata supplies commit/branch; the workflow
295// YAML supplies the Buildkite routing knobs (pipeline/org) and the
296// small handful of build options we expose.
297//
298// `ignore_pipeline_branch_filters` is hard-coded to true: Tangled
299// refs frequently don't match arbitrary Buildkite pipeline branch
300// filters, and a build silently dropped at create time is a worse
301// failure mode than running one we shouldn't have. Users wanting
302// the filter back are expected to drop the filter on the Buildkite
303// pipeline itself.
304//
305// Returns an error when the trigger lacks a commit — Buildkite's
306// API requires one and we'd rather log+skip than fire a build that
307// resolves to "whatever main happens to be".
308func (p *buildkiteProvider) buildCreateRequest(
309 cfg *buildkiteConfig,
310 trigger *tangled.Pipeline_TriggerMetadata,
311 knot, pipelineRkey string,
312 wf *tangled.Pipeline_Workflow,
313) (buildkite.CreateBuildRequest, error) {
314 commit, branch := triggerCommitAndBranch(trigger)
315 if commit == "" {
316 return buildkite.CreateBuildRequest{}, errors.New(
317 "trigger has no commit",
318 )
319 }
320
321 cleanCheckout := false
322 if cfg.CleanCheckout != nil {
323 cleanCheckout = *cfg.CleanCheckout
324 }
325
326 req := buildkite.CreateBuildRequest{
327 Commit: commit,
328 Branch: branch,
329 Message: fmt.Sprintf("tangled: %s", wf.Name),
330 Env: envFromTuple(knot, pipelineRkey, wf),
331 MetaData: map[string]string{
332 bkMetaKnot: knot,
333 bkMetaPipelineRkey: pipelineRkey,
334 bkMetaWorkflow: wf.Name,
335 },
336 CleanCheckout: cleanCheckout,
337 IgnorePipelineBranchFilters: true,
338 }
339
340 // Auto-populate Buildkite's PR fields from the Tangled PR
341 // trigger when present. Buildkite doesn't get a PR number from
342 // us (Tangled doesn't surface one through the trigger), but
343 // the base branch alone is enough for `pull_request_base_branch`-
344 // gated step filters to work.
345 if trigger != nil && trigger.PullRequest != nil {
346 req.PullRequestBaseBranch = trigger.PullRequest.TargetBranch
347 }
348
349 return req, nil
350}
351
352// Logs satisfies Provider. We resolve the (knot, rkey, workflow)
353// tuple to a Buildkite build via the store, fetch the current jobs
354// list, then drain each job's plain-text log into the channel as one
355// LogLine per output line.
356//
357// Per-job control frames bracket each job so the appview's renderer
358// has start/end markers to lay out timing — same shape as the fake
359// provider and the upstream spindle.
360//
361// This is a snapshot read, not a tail — finished or in-progress, we
362// fetch what's there and close. Live tailing would require Buildkite
363// agent log streaming, which the public REST API doesn't expose; the
364// appview's repeated /logs calls during a running build give us
365// "good enough" liveness without that complexity.
366func (p *buildkiteProvider) Logs(
367 ctx context.Context,
368 knot string,
369 pipelineRkey string,
370 workflow string,
371) (<-chan LogLine, error) {
372 ref, err := p.st.LookupBuildkiteBuildByTuple(ctx, knot, pipelineRkey, workflow)
373 if err != nil {
374 return nil, fmt.Errorf("lookup build mapping: %w", err)
375 }
376 if ref == nil {
377 return nil, ErrLogsNotFound
378 }
379
380 // Resolve the org against which we should pull jobs/logs. We
381 // don't persist it on BuildkiteBuildRef today (the slug + token
382 // have always been enough); fall back to the provider default,
383 // which is correct for the common single-org install. A
384 // future migration can add a column when multi-org installs
385 // need it.
386 org := p.defaultOrg
387
388 build, err := p.client.GetBuild(ctx, org, ref.PipelineSlug, ref.BuildNumber)
389 if err != nil {
390 if errors.Is(err, buildkite.ErrNotFound) {
391 return nil, ErrLogsNotFound
392 }
393 return nil, fmt.Errorf("get build: %w", err)
394 }
395
396 out := make(chan LogLine, 32)
397 go func() {
398 defer close(out)
399 stepID := 0
400 for _, job := range build.Jobs {
401 if job.Type != "" && job.Type != "script" {
402 // Skip non-script jobs (waiter, manual,
403 // trigger). They have no log to fetch and
404 // surfacing empty steps just clutters the
405 // appview.
406 continue
407 }
408 name := job.Name
409 if name == "" {
410 name = fmt.Sprintf("job %s", job.ID)
411 }
412
413 if !sendLine(ctx, out, LogLine{
414 Kind: LogKindControl,
415 Time: time.Now(),
416 Content: name,
417 StepId: stepID,
418 StepStatus: StepStatusStart,
419 }) {
420 return
421 }
422
423 body, err := p.client.GetJobLog(ctx, org, ref.PipelineSlug, ref.BuildNumber, job.ID)
424 if err != nil {
425 p.log.Debug("fetch job log",
426 "err", err,
427 "build_uuid", ref.BuildUUID,
428 "job_id", job.ID,
429 )
430 // Don't fail the whole stream on one job;
431 // emit the end frame and move on so the
432 // appview at least sees what other jobs
433 // produced.
434 body = ""
435 }
436
437 // Buildkite injects per-line timestamp metadata as
438 // ANSI APC sequences (ESC "_" "bk;t=<ms>" BEL) and
439 // some renderers downstream don't recognise the APC
440 // envelope, leaking the inner "_bk;t=…" payload into
441 // the displayed text. Strip them here so consumers
442 // only ever see the actual log content.
443 body = stripTerminal(body)
444
445 for _, line := range strings.Split(strings.TrimRight(body, "\n"), "\n") {
446 if line == "" {
447 // Skip the leading empty entry that
448 // Split produces for empty bodies.
449 continue
450 }
451 if !sendLine(ctx, out, LogLine{
452 Kind: LogKindData,
453 Time: time.Now(),
454 Content: line + "\n",
455 StepId: stepID,
456 Stream: "stdout",
457 }) {
458 return
459 }
460 }
461
462 if !sendLine(ctx, out, LogLine{
463 Kind: LogKindControl,
464 Time: time.Now(),
465 Content: name,
466 StepId: stepID,
467 StepStatus: StepStatusEnd,
468 }) {
469 return
470 }
471 stepID++
472 }
473 }()
474 return out, nil
475}
476
477// publishStatus assembles a tangled.PipelineStatus record and pushes
478// it through the broker. buildUUID is mixed into the rkey so multiple
479// status events for the same workflow don't collide on the events
480// table's (rkey) uniqueness — and so an operator grepping the log
481// can find every record that pertains to a given Buildkite build.
482//
483// errMsg/exitCode are optional; pass nil for non-failure transitions.
484func (p *buildkiteProvider) publishStatus(
485 ctx context.Context,
486 pipelineURI, workflow, status, buildUUID string,
487 errMsg *string,
488 exitCode *int64,
489) error {
490 rec := tangled.PipelineStatus{
491 LexiconTypeID: tangled.PipelineStatusNSID,
492 Pipeline: pipelineURI,
493 Workflow: workflow,
494 Status: status,
495 CreatedAt: time.Now().UTC().Format(time.RFC3339),
496 Error: errMsg,
497 ExitCode: exitCode,
498 }
499 body, err := json.Marshal(rec)
500 if err != nil {
501 return fmt.Errorf("marshal pipeline.status: %w", err)
502 }
503 rkey := fmt.Sprintf("bk-%s-%s-%d", buildUUID, status, time.Now().UnixNano())
504 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil {
505 return fmt.Errorf("publish pipeline.status: %w", err)
506 }
507 return nil
508}
509
510// HandleWebhook applies a decoded Buildkite webhook payload: looks
511// the build up in the store, translates the Buildkite state into a
512// Tangled StatusKind, and publishes a pipeline.status record. Used
513// by the HTTP webhook handler so both the ingress logic and the
514// translation logic live next to each other.
515//
516// Returns nil for events we intentionally ignore (job.* events,
517// build.scheduled which we already publish locally on Spawn, builds
518// we don't have a mapping for) so the handler can 200 them — webhook
519// retries from Buildkite on a 4xx/5xx are noisy and not what we want
520// for "we just don't care about this event".
521func (p *buildkiteProvider) HandleWebhook(
522 ctx context.Context,
523 payload buildkite.WebhookPayload,
524) error {
525 // Only build.* events drive pipeline.status today. Everything
526 // else (job.*, agent.*, ping) is acknowledged silently.
527 if !strings.HasPrefix(payload.Event, "build.") {
528 return nil
529 }
530
531 ref, err := p.st.LookupBuildkiteBuildByUUID(ctx, payload.Build.ID)
532 if err != nil {
533 return fmt.Errorf("lookup build by uuid: %w", err)
534 }
535 if ref == nil {
536 // Most likely: this build was triggered outside tack and
537 // just happens to share our webhook URL. Nothing to do.
538 p.log.Debug("webhook for unknown build; ignoring",
539 "event", payload.Event,
540 "build_uuid", payload.Build.ID,
541 )
542 return nil
543 }
544
545 status, ok := mapBuildkiteState(payload.Build.State)
546 if !ok {
547 // Unknown / transient state ("blocked", "skipped",
548 // "not_run", "waiting"…) — log so we can extend the map
549 // later, but don't error out the webhook.
550 p.log.Debug("unmapped buildkite state; ignoring",
551 "event", payload.Event,
552 "state", payload.Build.State,
553 "build_uuid", payload.Build.ID,
554 )
555 return nil
556 }
557
558 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
559 status, ref.BuildUUID, nil, nil); err != nil {
560 return fmt.Errorf("publish webhook status: %w", err)
561 }
562 p.log.Info("buildkite webhook → pipeline.status",
563 "event", payload.Event,
564 "state", payload.Build.State,
565 "status", status,
566 "build_uuid", payload.Build.ID,
567 "workflow", ref.Workflow,
568 )
569 return nil
570}
571
// mapBuildkiteState converts a Buildkite build state string into the
// Tangled status string published in pipeline.status records.
//
//	scheduled           -> pending
//	running, failing    -> running
//	passed              -> success
//	failed              -> failed
//	canceled, canceling -> cancelled
//
// Any other state (blocked, skipped, not_run, ...) has no direct
// analogue; for those the function returns ("", false) and leaves
// the decision to the caller.
func mapBuildkiteState(state string) (string, bool) {
	var status string
	switch state {
	case "scheduled":
		status = "pending"
	case "running":
		status = "running"
	case "failing":
		// Still executing, even though a step has already failed.
		status = "running"
	case "passed":
		status = "success"
	case "failed":
		status = "failed"
	case "canceling":
		status = "cancelled"
	case "canceled":
		status = "cancelled"
	}
	return status, status != ""
}
594
595// envFromTuple builds the env block forwarded into the Buildkite
596// build. These are the only handle a user's Buildkite pipeline has
597// on the originating Tangled trigger: their pipeline.yml typically
598// reads $TACK_WORKFLOW and dispatches based on it (e.g. running a
599// `pipeline upload` against a workflow-specific YAML file).
600//
601// TACK_WORKFLOW_RAW carries the entire YAML body of the workflow as
602// captured in the Tangled record. It can be empty if the workflow
603// definition omitted it; consumers should defend.
604func envFromTuple(knot, pipelineRkey string, wf *tangled.Pipeline_Workflow) map[string]string {
605 return map[string]string{
606 "TACK_KNOT": knot,
607 "TACK_PIPELINE_RKEY": pipelineRkey,
608 "TACK_WORKFLOW": wf.Name,
609 "TACK_WORKFLOW_RAW": wf.Raw,
610 }
611}
612
613// pipelineATURI returns the at-uri the appview joins pipeline.status
614// records back to their originating pipeline on. Format mirrors the
615// upstream spindle; the appview strips the `did:web:` prefix and
616// treats the remainder as the knot identifier.
617func pipelineATURI(knot, pipelineRkey string) string {
618 return fmt.Sprintf("at://did:web:%s/%s/%s",
619 knot, tangled.PipelineNSID, pipelineRkey,
620 )
621}
622
623// triggerCommitAndBranch extracts (commit, branch) from a Tangled
624// pipeline trigger, regardless of whether it was a push, a pull
625// request, or a manual run. Returns empty strings on a fully-empty
626// trigger so the caller can decide whether that's fatal.
627func triggerCommitAndBranch(trigger *tangled.Pipeline_TriggerMetadata) (string, string) {
628 if trigger == nil {
629 return "", ""
630 }
631 switch {
632 case trigger.Push != nil:
633 // For push events, NewSha is the commit being built and
634 // Ref is the full ref (e.g. "refs/heads/main") — strip
635 // the prefix so Buildkite's branch-aware features work.
636 return trigger.Push.NewSha, refToBranch(trigger.Push.Ref)
637 case trigger.PullRequest != nil:
638 // PRs build the source commit on the source branch.
639 return trigger.PullRequest.SourceSha, trigger.PullRequest.SourceBranch
640 default:
641 // Manual triggers and any future kinds: fall back to the
642 // repo default branch with no commit, which the caller
643 // will treat as fatal — manual triggers will need
644 // additional plumbing to pick a commit.
645 if trigger.Repo != nil {
646 return "", trigger.Repo.DefaultBranch
647 }
648 return "", ""
649 }
650}
651
// refToBranch turns a git ref into a branch name by removing a
// leading "refs/heads/". Refs without that prefix (tags,
// refs/pull/N/head, already-bare branch names) come back unchanged
// so downstream tooling can decide what to do with them.
func refToBranch(ref string) string {
	// TrimPrefix is already a no-op when the prefix is absent.
	return strings.TrimPrefix(ref, "refs/heads/")
}
663
// stripTerminal removes ANSI/ECMA-48 escape sequences from a log
// payload, leaving only the displayable text. We need this because
// Buildkite's plain-text log output carries the agent's full terminal
// stream: per-line timestamp APC envelopes
// (`ESC _ "bk;t=<unix-ms>" BEL`), CSI colour codes, clear-to-EOL
// (`ESC [ K`), OSC title sets, and so on. Our consumers are not
// terminal emulators; they render the bytes verbatim.
//
// Recognised escape families:
//
//   - CSI: ESC '[' parameters intermediates final
//   - OSC/DCS/APC/SOS/PM: ESC (']'|'P'|'_'|'X'|'^') … (BEL | ESC '\')
//   - everything else: ESC <single byte>
//
// As a safety belt the bare residue "_bk;t=<digits>" is stripped too;
// it appears when an upstream processor removed the ESC/BEL framing
// without understanding the APC envelope inside it.
//
// Sequences cut off by the end of the input are dropped rather than
// leaked, because Logs reads snapshots of builds that may still be
// running and a snapshot can end mid-sequence. That covers a lone
// trailing ESC, a CSI that ends before its final byte, and a string
// sequence that never sees its terminator.
//
// Input without any ESC or '_' byte is returned unchanged.
//
// TODO: consider delegating to a real terminal parser (the original
// note suggests libghostty) instead of maintaining this by hand.
func stripTerminal(s string) string {
	if !strings.ContainsAny(s, "\x1b_") {
		return s
	}

	const (
		esc     = 0x1b
		bel     = 0x07
		residue = "_bk;t="
	)

	var b strings.Builder
	b.Grow(len(s))
	for i := 0; i < len(s); {
		if s[i] == esc {
			if i+1 >= len(s) {
				// Lone ESC as the very last byte: the start of a
				// sequence the snapshot cut off. Drop it.
				break
			}
			switch s[i+1] {
			case '[':
				// CSI: parameter bytes 0x30-0x3F, then intermediate
				// bytes 0x20-0x2F, then one final byte 0x40-0x7E.
				j := i + 2
				for j < len(s) && s[j] >= 0x30 && s[j] <= 0x3F {
					j++
				}
				for j < len(s) && s[j] >= 0x20 && s[j] <= 0x2F {
					j++
				}
				switch {
				case j == len(s):
					// Input ended before the final byte: a truncated
					// sequence, not text. Drop all of it.
					i = j
				case s[j] >= 0x40 && s[j] <= 0x7E:
					i = j + 1
				default:
					// Malformed: drop only the two-byte introducer
					// so the bytes after it survive as text.
					i += 2
				}
			case ']', 'P', '_', 'X', '^':
				// OSC/DCS/APC/SOS/PM: runs until BEL or ST (ESC '\').
				// The whole envelope is dropped; without a
				// terminator that means the rest of the input.
				j := i + 2
				for j < len(s) {
					if s[j] == bel {
						j++
						break
					}
					if s[j] == esc && j+1 < len(s) && s[j+1] == '\\' {
						j += 2
						break
					}
					j++
				}
				i = j
			default:
				// Two-byte escape (RIS, DECSC, charset selection,
				// ...). Drop both bytes.
				i += 2
			}
			continue
		}

		// Bare residue: "_bk;t=<digits>" with no ESC/BEL framing.
		// At least one digit is required so ordinary text that merely
		// contains "_bk;t=" is left alone.
		if s[i] == '_' && strings.HasPrefix(s[i:], residue) {
			j := i + len(residue)
			for j < len(s) && s[j] >= '0' && s[j] <= '9' {
				j++
			}
			if j > i+len(residue) {
				i = j
				continue
			}
		}

		b.WriteByte(s[i])
		i++
	}
	return b.String()
}
752
753// sendLine pushes one LogLine into out, returning false if ctx
754// fired first. Centralised so the per-job loop in Logs stays
755// focused on the wire-shape decisions.
756func sendLine(ctx context.Context, out chan<- LogLine, line LogLine) bool {
757 select {
758 case <-ctx.Done():
759 return false
760 case out <- line:
761 return true
762 }
763}