Stitch any CI into Tangled
1package main
2
// buildkiteProvider implements Provider against a real Buildkite
// account. Spawn translates a Tangled pipeline trigger into one
// Buildkite build per workflow; status updates flow back asynchronously
// through the /webhooks/buildkite handler (see http.go), which looks
// the build UUID up in the buildkite_builds table to recover the
// (knot, pipelineRkey, workflow) tuple this provider persisted at
// Spawn time and publishes a sh.tangled.pipeline.status record on
// the in-process broker.
//
// The Buildkite *pipeline slug* a workflow targets is carried inside
// the workflow's YAML body (Pipeline_Workflow.Raw), not configured
// globally on the spindle. That keeps tack a thin translator: the
// repo author decides which Buildkite pipeline runs each Tangled
// workflow without an operator round-trip. See workflowConfig below
// for the supported YAML schema.
18
19import (
20 "context"
21 "encoding/json"
22 "errors"
23 "fmt"
24 "log/slog"
25 "net/http"
26 "strings"
27 "time"
28
29 "go.yaml.in/yaml/v2"
30 "tangled.org/core/api/tangled"
31
32 "go.mitchellh.com/tack/internal/buildkite"
33)
34
// Keys under which a build's Tangled identity is stored in Buildkite
// meta_data. buildCreateRequest writes them and refFromWebhook reads
// them back. The same three values are also exported as $TACK_* env
// vars (see envFromTuple). The "tack:" prefix namespaces the keys so
// they are unlikely to clash with meta_data a pipeline sets for its
// own purposes.
const (
	bkMetaKnot         = "tack:knot"
	bkMetaPipelineRkey = "tack:pipeline_rkey"
	bkMetaWorkflow     = "tack:workflow"
)
45
46// workflowConfig is the tack-flavoured schema we expect inside each
47// Tangled workflow's Raw YAML body. Only the Buildkite `pipeline`
48// slug is required; everything else is optional. Fields nest under
49// `tack: { buildkite: ... }` so the workflow YAML can grow other
50// top-level keys (Tangled's own scheduling fields, future provider
51// blocks) without colliding with our namespace.
52//
53// Fields map onto the Buildkite REST "Create a build" request
54// properties documented at
55// https://buildkite.com/docs/apis/rest-api/builds#create-a-build
56// (see also the comment block on buildkite.CreateBuildRequest). We
57// expose only the small subset users genuinely need to override —
58// trigger metadata supplies commit/branch, and tack supplies the
59// identity env+meta the webhook handler relies on, so there's no
60// reason to let users re-specify those.
61type workflowConfig struct {
62 Tack tackConfig `yaml:"tack"`
63}
64
65// tackConfig is the per-provider block under the top-level `tack:`
66// key. Right now the only nested provider is Buildkite.
67type tackConfig struct {
68 Buildkite buildkiteConfig `yaml:"buildkite"`
69}
70
// buildkiteConfig holds the Buildkite settings a workflow may set
// under `tack.buildkite`.
type buildkiteConfig struct {
	// Pipeline is the Buildkite pipeline slug to build. It is the
	// only required key; parseWorkflowConfig rejects a config
	// without it.
	Pipeline string `yaml:"pipeline"`

	// Org optionally selects a Buildkite organisation other than the
	// provider's defaultOrg. spawnWorkflow falls back to defaultOrg
	// when it is empty. The configured API token has to be valid for
	// the chosen org; a failing CreateBuild is logged as-is rather
	// than retried against another org.
	Org string `yaml:"org"`

	// CleanCheckout maps to `clean_checkout`. It is a pointer so an
	// omitted key (nil) can be told apart from an explicit false.
	// NOTE(review): buildCreateRequest currently collapses nil to
	// false before building the request, so the distinction is not
	// carried through to Buildkite by this file — confirm whether
	// CreateBuildRequest omits a false value when encoding.
	CleanCheckout *bool `yaml:"clean_checkout"`
}
87
88// parseWorkflowConfig decodes a workflow YAML body into workflowConfig.
89// An empty body is treated as a structural error so spawnWorkflow can
90// short-circuit cleanly: a workflow with no body has nothing for tack
91// to do anyway.
92func parseWorkflowConfig(raw string) (*buildkiteConfig, error) {
93 if strings.TrimSpace(raw) == "" {
94 return nil, errors.New("workflow body is empty")
95 }
96 var cfg workflowConfig
97 if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil {
98 return nil, fmt.Errorf("parse workflow yaml: %w", err)
99 }
100 bk := cfg.Tack.Buildkite
101 if bk.Pipeline == "" {
102 return nil, errors.New("workflow yaml: `tack.buildkite.pipeline` is required")
103 }
104 return &bk, nil
105}
106
107// buildkiteProvider implements Provider.
108//
109// webhookSecret + webhookMode live on the provider rather than on
110// the HTTP server because the provider is the single owner of
111// "everything Buildkite-y": colocating the auth knob with the API
112// client and the state translator keeps configuration drift to one
113// place and makes the http.go side pure transport.
114//
115// defaultOrg is the Buildkite organisation the configured API token
116// belongs to. Workflows may opt into a different org via their YAML
117// `org` field; the API token then needs to be authorised against it.
118type buildkiteProvider struct {
119 br *broker
120 st *store
121 log *slog.Logger
122 client *buildkite.Client
123 defaultOrg string
124 webhookSecret string
125 webhookMode buildkite.WebhookMode
126}
127
128// Compile-time interface conformance check.
129var _ Provider = (*buildkiteProvider)(nil)
130
131// newBuildkiteProvider wires a provider to its Buildkite client and
132// to the broker it publishes pipeline.status records on. defaultOrg
133// is the org the API token authenticates against and the org used
134// when a workflow doesn't specify its own. webhookSecret/webhookMode
135// govern inbound /webhooks/buildkite request authentication.
136func newBuildkiteProvider(
137 br *broker,
138 st *store,
139 client *buildkite.Client,
140 defaultOrg string,
141 webhookSecret string,
142 webhookMode buildkite.WebhookMode,
143 log *slog.Logger,
144) *buildkiteProvider {
145 return &buildkiteProvider{
146 br: br,
147 st: st,
148 log: log.With("component", "provider", "kind", "buildkite"),
149 client: client,
150 defaultOrg: defaultOrg,
151 webhookSecret: webhookSecret,
152 webhookMode: webhookMode,
153 }
154}
155
156// VerifyWebhook authenticates an inbound webhook request using
157// whichever mode the provider was configured with. Returns nil on
158// success; the HTTP handler maps any returned error to 401.
159func (p *buildkiteProvider) VerifyWebhook(headers http.Header, body []byte) error {
160 switch p.webhookMode {
161 case buildkite.WebhookModeSignature:
162 return buildkite.VerifySignature(
163 headers.Get("X-Buildkite-Signature"),
164 p.webhookSecret, body,
165 )
166 default:
167 // Token mode is the Buildkite default and our default, so
168 // any unrecognised value falls through to it rather than
169 // fail-closed at startup.
170 return buildkite.VerifyToken(
171 headers.Get("X-Buildkite-Token"),
172 p.webhookSecret,
173 )
174 }
175}
176
177// Spawn satisfies Provider. For each workflow it fires a separate
178// Buildkite build off the pipeline named in that workflow's YAML so
179// each workflow gets its own status timeline. The actual API call
180// runs on a goroutine — CreateBuild is one HTTP round-trip, but we
181// still want Spawn to be non-blocking per the interface contract.
182//
183// On a successful create we persist the build UUID → (knot, rkey,
184// workflow) mapping and publish a "pending" pipeline.status so the
185// appview sees activity immediately, instead of waiting for the
186// first webhook to land.
187func (p *buildkiteProvider) Spawn(
188 ctx context.Context,
189 knot string,
190 pipelineRkey string,
191 trigger *tangled.Pipeline_TriggerMetadata,
192 workflows []*tangled.Pipeline_Workflow,
193) {
194 if len(workflows) == 0 {
195 p.log.Warn("pipeline has no workflows; nothing to spawn",
196 "knot", knot, "rkey", pipelineRkey,
197 )
198 return
199 }
200
201 for _, wf := range workflows {
202 if wf == nil || wf.Name == "" {
203 continue
204 }
205 wf := wf
206 go p.spawnWorkflow(ctx, knot, pipelineRkey, trigger, wf)
207 }
208}
209
210// spawnWorkflow does the per-workflow API + persistence work for
211// Spawn. Errors are logged with full context but not returned —
212// nothing in tack consumes the result, and a failed Spawn just
213// surfaces as the absence of any status update for the affected
214// workflow.
215func (p *buildkiteProvider) spawnWorkflow(
216 ctx context.Context,
217 knot string,
218 pipelineRkey string,
219 trigger *tangled.Pipeline_TriggerMetadata,
220 wf *tangled.Pipeline_Workflow,
221) {
222 logger := p.log.With(
223 "knot", knot,
224 "pipeline_rkey", pipelineRkey,
225 "workflow", wf.Name,
226 )
227
228 cfg, err := parseWorkflowConfig(wf.Raw)
229 if err != nil {
230 // Bad workflow YAML is a user-facing config error: log it
231 // loudly and skip. Firing a build off some default would
232 // be more confusing than doing nothing.
233 logger.Error("invalid workflow config; refusing to spawn", "err", err)
234 return
235 }
236 logger = logger.With("pipeline", cfg.Pipeline)
237
238 req, err := p.buildCreateRequest(cfg, trigger, knot, pipelineRkey, wf)
239 if err != nil {
240 logger.Error("build create request", "err", err)
241 return
242 }
243
244 org := cfg.Org
245 if org == "" {
246 org = p.defaultOrg
247 }
248
249 build, err := p.client.CreateBuild(ctx, org, cfg.Pipeline, req)
250 if err != nil {
251 logger.Error("create buildkite build", "err", err, "org", org)
252 return
253 }
254 logger.Info("buildkite build created",
255 "build_uuid", build.ID,
256 "build_number", build.Number,
257 "web_url", build.WebURL,
258 "org", org,
259 )
260
261 pipelineURI := pipelineATURI(knot, pipelineRkey)
262 // Persist the *resolved* org — the one we actually issued the
263 // CreateBuild against — rather than cfg.Org. If we stored only
264 // cfg.Org, a later change to the provider's defaultOrg would
265 // silently retarget historical lookups (logs, webhook joins) at
266 // the wrong organisation. Legacy rows written before this fix
267 // may still have an empty Org; the read path keeps the
268 // defaultOrg fallback for those (see Logs).
269 if err := p.st.InsertBuildkiteBuild(ctx, BuildkiteBuildRef{
270 BuildUUID: build.ID,
271 BuildNumber: build.Number,
272 PipelineSlug: cfg.Pipeline,
273 Org: org,
274 Knot: knot,
275 PipelineRkey: pipelineRkey,
276 Workflow: wf.Name,
277 PipelineURI: pipelineURI,
278 }); err != nil {
279 // Webhook handlers will fail to translate this build's
280 // events because they can't recover the tuple. Surface
281 // loudly and bail; we don't want a half-tracked build
282 // silently leaking status into the broker.
283 logger.Error("persist buildkite build mapping", "err", err,
284 "build_uuid", build.ID,
285 )
286 return
287 }
288
289 // Initial status publish so the appview shows the build as
290 // queued without waiting for the first webhook. This mirrors
291 // the upstream spindle's "schedule then run" cadence.
292 if err := p.publishStatus(
293 ctx, pipelineURI, wf.Name, "pending", build.ID,
294 nil, nil,
295 ); err != nil {
296 logger.Error("publish initial pending status", "err", err)
297 }
298}
299
300// buildCreateRequest folds the parsed workflow config and the
301// Tangled trigger metadata into a single Buildkite create-build
302// payload. Trigger metadata supplies commit/branch; the workflow
303// YAML supplies the Buildkite routing knobs (pipeline/org) and the
304// small handful of build options we expose.
305//
306// `ignore_pipeline_branch_filters` is hard-coded to true: Tangled
307// refs frequently don't match arbitrary Buildkite pipeline branch
308// filters, and a build silently dropped at create time is a worse
309// failure mode than running one we shouldn't have. Users wanting
310// the filter back are expected to drop the filter on the Buildkite
311// pipeline itself.
312//
313// Returns an error when the trigger lacks a commit — Buildkite's
314// API requires one and we'd rather log+skip than fire a build that
315// resolves to "whatever main happens to be".
316func (p *buildkiteProvider) buildCreateRequest(
317 cfg *buildkiteConfig,
318 trigger *tangled.Pipeline_TriggerMetadata,
319 knot, pipelineRkey string,
320 wf *tangled.Pipeline_Workflow,
321) (buildkite.CreateBuildRequest, error) {
322 commit, branch := triggerCommitAndBranch(trigger)
323 if commit == "" {
324 return buildkite.CreateBuildRequest{}, errors.New(
325 "trigger has no commit",
326 )
327 }
328
329 cleanCheckout := false
330 if cfg.CleanCheckout != nil {
331 cleanCheckout = *cfg.CleanCheckout
332 }
333
334 req := buildkite.CreateBuildRequest{
335 Commit: commit,
336 Branch: branch,
337 Message: fmt.Sprintf("tangled: %s", wf.Name),
338 Env: envFromTuple(knot, pipelineRkey, wf),
339 MetaData: map[string]string{
340 bkMetaKnot: knot,
341 bkMetaPipelineRkey: pipelineRkey,
342 bkMetaWorkflow: wf.Name,
343 },
344 CleanCheckout: cleanCheckout,
345 IgnorePipelineBranchFilters: true,
346 }
347
348 // Auto-populate Buildkite's PR fields from the Tangled PR
349 // trigger when present. Buildkite doesn't get a PR number from
350 // us (Tangled doesn't surface one through the trigger), but
351 // the base branch alone is enough for `pull_request_base_branch`-
352 // gated step filters to work.
353 if trigger != nil && trigger.PullRequest != nil {
354 req.PullRequestBaseBranch = trigger.PullRequest.TargetBranch
355 }
356
357 return req, nil
358}
359
360// Logs satisfies Provider. We resolve the (knot, rkey, workflow)
361// tuple to a Buildkite build via the store, fetch the current jobs
362// list, then drain each job's plain-text log into the channel as one
363// LogLine per output line.
364//
365// Per-job control frames bracket each job so the appview's renderer
366// has start/end markers to lay out timing — same shape as the fake
367// provider and the upstream spindle.
368//
369// This is a snapshot read, not a tail — finished or in-progress, we
370// fetch what's there and close. Live tailing would require Buildkite
371// agent log streaming, which the public REST API doesn't expose; the
372// appview's repeated /logs calls during a running build give us
373// "good enough" liveness without that complexity.
374func (p *buildkiteProvider) Logs(
375 ctx context.Context,
376 knot string,
377 pipelineRkey string,
378 workflow string,
379) (<-chan LogLine, error) {
380 ref, err := p.st.LookupBuildkiteBuildByTuple(ctx, knot, pipelineRkey, workflow)
381 if err != nil {
382 return nil, fmt.Errorf("lookup build mapping: %w", err)
383 }
384 if ref == nil {
385 return nil, ErrLogsNotFound
386 }
387
388 // Resolve the org against which we should pull jobs/logs.
389 // Spawn now persists the *resolved* org used at create time, so
390 // for any row written by current code ref.Org is authoritative
391 // and we use it verbatim. The empty-string fallback to
392 // defaultOrg only exists for legacy rows on disk that predate
393 // persisting the resolved org; new rows should never hit it.
394 org := ref.Org
395 if org == "" {
396 org = p.defaultOrg
397 }
398
399 build, err := p.client.GetBuild(ctx, org, ref.PipelineSlug, ref.BuildNumber)
400 if err != nil {
401 if errors.Is(err, buildkite.ErrNotFound) {
402 return nil, ErrLogsNotFound
403 }
404 return nil, fmt.Errorf("get build: %w", err)
405 }
406
407 out := make(chan LogLine, 32)
408 go func() {
409 defer close(out)
410 stepID := 0
411 for _, job := range build.Jobs {
412 if job.Type != "" && job.Type != "script" {
413 // Skip non-script jobs (waiter, manual,
414 // trigger). They have no log to fetch and
415 // surfacing empty steps just clutters the
416 // appview.
417 continue
418 }
419 name := job.Name
420 if name == "" {
421 name = fmt.Sprintf("job %s", job.ID)
422 }
423
424 if !sendLine(ctx, out, LogLine{
425 Kind: LogKindControl,
426 Time: time.Now(),
427 Content: name,
428 StepId: stepID,
429 StepStatus: StepStatusStart,
430 }) {
431 return
432 }
433
434 body, err := p.client.GetJobLog(ctx, org, ref.PipelineSlug, ref.BuildNumber, job.ID)
435 if err != nil {
436 p.log.Debug("fetch job log",
437 "err", err,
438 "build_uuid", ref.BuildUUID,
439 "job_id", job.ID,
440 )
441 // Don't fail the whole stream on one job;
442 // emit the end frame and move on so the
443 // appview at least sees what other jobs
444 // produced.
445 body = ""
446 }
447
448 // Buildkite injects per-line timestamp metadata as
449 // ANSI APC sequences (ESC "_" "bk;t=<ms>" BEL) and
450 // some renderers downstream don't recognise the APC
451 // envelope, leaking the inner "_bk;t=…" payload into
452 // the displayed text. Strip them here so consumers
453 // only ever see the actual log content.
454 body = stripTerminal(body)
455
456 for _, line := range strings.Split(strings.TrimRight(body, "\n"), "\n") {
457 if line == "" {
458 // Skip the leading empty entry that
459 // Split produces for empty bodies.
460 continue
461 }
462 if !sendLine(ctx, out, LogLine{
463 Kind: LogKindData,
464 Time: time.Now(),
465 Content: line + "\n",
466 StepId: stepID,
467 Stream: "stdout",
468 }) {
469 return
470 }
471 }
472
473 if !sendLine(ctx, out, LogLine{
474 Kind: LogKindControl,
475 Time: time.Now(),
476 Content: name,
477 StepId: stepID,
478 StepStatus: StepStatusEnd,
479 }) {
480 return
481 }
482 stepID++
483 }
484 }()
485 return out, nil
486}
487
488// publishStatus assembles a tangled.PipelineStatus record and pushes
489// it through the broker. buildUUID is mixed into the rkey so multiple
490// status events for the same workflow don't collide on the events
491// table's (rkey) uniqueness — and so an operator grepping the log
492// can find every record that pertains to a given Buildkite build.
493//
494// errMsg/exitCode are optional; pass nil for non-failure transitions.
495func (p *buildkiteProvider) publishStatus(
496 ctx context.Context,
497 pipelineURI, workflow, status, buildUUID string,
498 errMsg *string,
499 exitCode *int64,
500) error {
501 rec := tangled.PipelineStatus{
502 LexiconTypeID: tangled.PipelineStatusNSID,
503 Pipeline: pipelineURI,
504 Workflow: workflow,
505 Status: status,
506 CreatedAt: time.Now().UTC().Format(time.RFC3339),
507 Error: errMsg,
508 ExitCode: exitCode,
509 }
510 body, err := json.Marshal(rec)
511 if err != nil {
512 return fmt.Errorf("marshal pipeline.status: %w", err)
513 }
514 rkey := fmt.Sprintf("bk-%s-%s-%d", buildUUID, status, time.Now().UnixNano())
515 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil {
516 return fmt.Errorf("publish pipeline.status: %w", err)
517 }
518 return nil
519}
520
521// HandleWebhook applies a decoded Buildkite webhook payload: looks
522// the build up in the store, translates the Buildkite state into a
523// Tangled StatusKind, and publishes a pipeline.status record. Used
524// by the HTTP webhook handler so both the ingress logic and the
525// translation logic live next to each other.
526//
527// Returns nil for events we intentionally ignore (job.* events,
528// build.scheduled which we already publish locally on Spawn, builds
529// we don't have a mapping for) so the handler can 200 them — webhook
530// retries from Buildkite on a 4xx/5xx are noisy and not what we want
531// for "we just don't care about this event".
532func (p *buildkiteProvider) HandleWebhook(
533 ctx context.Context,
534 payload buildkite.WebhookPayload,
535) error {
536 // Only build.* events drive pipeline.status today. Everything
537 // else (job.*, agent.*, ping) is acknowledged silently.
538 if !strings.HasPrefix(payload.Event, "build.") {
539 return nil
540 }
541
542 ref, err := p.st.LookupBuildkiteBuildByUUID(ctx, payload.Build.ID)
543 if err != nil {
544 return fmt.Errorf("lookup build by uuid: %w", err)
545 }
546 if ref == nil {
547 // Cache miss. Two plausible causes:
548 //
549 // 1. Genuinely-foreign build: a Buildkite job triggered
550 // outside tack that just happens to share this
551 // webhook URL. Nothing to do.
552 // 2. Race: Spawn's goroutine fired CreateBuild but
553 // hasn't yet written the UUID→tuple row. A fast
554 // build.scheduled webhook can land in that window
555 // and would otherwise be dropped forever.
556 //
557 // We disambiguate using the Buildkite meta_data we set at
558 // CreateBuild time. If the tack:* keys are present the
559 // build is ours; we reconstruct the ref and opportunistically
560 // persist it so subsequent webhooks (and any Logs call)
561 // hit the cache rather than re-doing this work.
562 ref = refFromWebhook(payload)
563 if ref == nil {
564 p.log.Debug("webhook for unknown build; ignoring",
565 "event", payload.Event,
566 "build_uuid", payload.Build.ID,
567 )
568 return nil
569 }
570 // Opportunistic cache fill. Failure here is non-fatal:
571 // Spawn's authoritative insert will land shortly (or has
572 // already, in which case our INSERT … ON CONFLICT just
573 // refreshes the row). We continue with the reconstructed
574 // ref either way so a status publish isn't lost.
575 if err := p.st.InsertBuildkiteBuild(ctx, *ref); err != nil {
576 p.log.Warn("opportunistic persist of buildkite build mapping",
577 "err", err, "build_uuid", ref.BuildUUID,
578 )
579 }
580 p.log.Info("buildkite webhook reconstructed from meta_data",
581 "event", payload.Event,
582 "build_uuid", ref.BuildUUID,
583 "workflow", ref.Workflow,
584 )
585 }
586
587 status, ok := mapBuildkiteState(payload.Build.State)
588 if !ok {
589 // Unknown / transient state ("blocked", "skipped",
590 // "not_run", "waiting"…) — log so we can extend the map
591 // later, but don't error out the webhook.
592 p.log.Debug("unmapped buildkite state; ignoring",
593 "event", payload.Event,
594 "state", payload.Build.State,
595 "build_uuid", payload.Build.ID,
596 )
597 return nil
598 }
599
600 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
601 status, ref.BuildUUID, nil, nil); err != nil {
602 return fmt.Errorf("publish webhook status: %w", err)
603 }
604 p.log.Info("buildkite webhook → pipeline.status",
605 "event", payload.Event,
606 "state", payload.Build.State,
607 "status", status,
608 "build_uuid", payload.Build.ID,
609 "workflow", ref.Workflow,
610 )
611 return nil
612}
613
614// refFromWebhook reconstructs a BuildkiteBuildRef directly from a
615// webhook payload, using the tack:* meta_data we attach at Spawn
616// time as the source of truth for (knot, pipeline_rkey, workflow).
617// Org and pipeline slug come from the payload's organization and
618// embedded pipeline objects, both of which Buildkite populates on
619// every build.* event.
620//
621// Returns nil when the payload doesn't carry our meta_data: that's
622// the signal the build was triggered outside tack and we should
623// keep ignoring it. A partial set (one or two of the three keys)
624// also returns nil; we don't want to half-reconstruct a row.
625//
626// This exists so HandleWebhook can recover the tuple when the
627// CreateBuild→InsertBuildkiteBuild race drops a webhook on the
628// floor. The caller is expected to opportunistically persist the
629// returned ref so subsequent lookups hit the cache.
630func refFromWebhook(payload buildkite.WebhookPayload) *BuildkiteBuildRef {
631 md := payload.Build.MetaData
632 knot := md[bkMetaKnot]
633 rkey := md[bkMetaPipelineRkey]
634 wf := md[bkMetaWorkflow]
635 if knot == "" || rkey == "" || wf == "" {
636 return nil
637 }
638
639 // Pipeline slug lives on the build's embedded pipeline object.
640 // Decoding it as map[string]interface{} keeps the buildkite
641 // package's Build struct from sprouting fields we only ever
642 // touch on this fallback path.
643 pipelineSlug, _ := payload.Build.Pipeline["slug"].(string)
644
645 return &BuildkiteBuildRef{
646 BuildUUID: payload.Build.ID,
647 BuildNumber: payload.Build.Number,
648 PipelineSlug: pipelineSlug,
649 Org: payload.Organization.Slug,
650 Knot: knot,
651 PipelineRkey: rkey,
652 Workflow: wf,
653 PipelineURI: pipelineATURI(knot, rkey),
654 }
655}
656
// mapBuildkiteState converts a Buildkite build state string into the
// Tangled status string published in pipeline.status records.
//
//	scheduled            -> pending
//	running, failing     -> running
//	passed               -> success
//	failed               -> failed
//	canceled, canceling  -> cancelled
//
// The second result is false, and the first empty, for every other
// state (for example blocked, skipped or not_run); the caller decides
// what to do with those. Matching is case-sensitive.
func mapBuildkiteState(state string) (string, bool) {
	var status string
	switch state {
	case "scheduled":
		status = "pending"
	case "running", "failing":
		status = "running"
	case "passed":
		status = "success"
	case "failed":
		status = "failed"
	case "canceled", "canceling":
		status = "cancelled"
	}
	return status, status != ""
}
679
680// envFromTuple builds the env block forwarded into the Buildkite
681// build. These are the only handle a user's Buildkite pipeline has
682// on the originating Tangled trigger: their pipeline.yml typically
683// reads $TACK_WORKFLOW and dispatches based on it (e.g. running a
684// `pipeline upload` against a workflow-specific YAML file).
685//
686// TACK_WORKFLOW_RAW carries the entire YAML body of the workflow as
687// captured in the Tangled record. It can be empty if the workflow
688// definition omitted it; consumers should defend.
689func envFromTuple(knot, pipelineRkey string, wf *tangled.Pipeline_Workflow) map[string]string {
690 return map[string]string{
691 "TACK_KNOT": knot,
692 "TACK_PIPELINE_RKEY": pipelineRkey,
693 "TACK_WORKFLOW": wf.Name,
694 "TACK_WORKFLOW_RAW": wf.Raw,
695 }
696}
697
698// pipelineATURI returns the at-uri the appview joins pipeline.status
699// records back to their originating pipeline on. Format mirrors the
700// upstream spindle; the appview strips the `did:web:` prefix and
701// treats the remainder as the knot identifier.
702func pipelineATURI(knot, pipelineRkey string) string {
703 return fmt.Sprintf("at://did:web:%s/%s/%s",
704 knot, tangled.PipelineNSID, pipelineRkey,
705 )
706}
707
708// triggerCommitAndBranch extracts (commit, branch) from a Tangled
709// pipeline trigger, regardless of whether it was a push, a pull
710// request, or a manual run. Returns empty strings on a fully-empty
711// trigger so the caller can decide whether that's fatal.
712func triggerCommitAndBranch(trigger *tangled.Pipeline_TriggerMetadata) (string, string) {
713 if trigger == nil {
714 return "", ""
715 }
716 switch {
717 case trigger.Push != nil:
718 // For push events, NewSha is the commit being built and
719 // Ref is the full ref (e.g. "refs/heads/main") — strip
720 // the prefix so Buildkite's branch-aware features work.
721 return trigger.Push.NewSha, refToBranch(trigger.Push.Ref)
722 case trigger.PullRequest != nil:
723 // PRs build the source commit on the source branch.
724 return trigger.PullRequest.SourceSha, trigger.PullRequest.SourceBranch
725 default:
726 // Manual triggers and any future kinds: fall back to the
727 // repo default branch with no commit, which the caller
728 // will treat as fatal — manual triggers will need
729 // additional plumbing to pick a commit.
730 if trigger.Repo != nil {
731 return "", trigger.Repo.DefaultBranch
732 }
733 return "", ""
734 }
735}
736
// refToBranch turns a full git ref into a short branch name by
// removing a leading "refs/heads/". Any ref without that prefix —
// tags, refs/pull/N/head, or an already-short name — is returned
// unchanged.
func refToBranch(ref string) string {
	return strings.TrimPrefix(ref, "refs/heads/")
}
748
// stripTerminal removes terminal escape sequences from a log payload
// and returns the remaining text. The job log fetched from Buildkite
// carries the agent's raw terminal output — per-line timestamp
// envelopes (`ESC _ "bk;t=<unix-ms>" BEL`), CSI colour codes, OSC
// title sets and so on — and the consumers of Logs render bytes
// verbatim rather than interpreting them.
//
// Recognised forms:
//
//   - CSI: ESC '[' parameters intermediates final
//   - OSC/DCS/APC/SOS/PM: ESC (']'|'P'|'_'|'X'|'^') … (BEL | ESC '\')
//   - other escapes: ESC, any intermediates 0x20-0x2F, one final byte
//     0x30-0x7E (e.g. `ESC c`, `ESC 7`, `ESC ( B`)
//
// In addition, the bare residue "_bk;t=<digits>" is removed; it shows
// up when something upstream has already stripped the ESC/BEL framing
// from a timestamp envelope.
//
// Malformed input is handled conservatively: an ESC that is not
// followed by a valid sequence is dropped on its own, so the byte
// after it (a newline, another ESC, part of a UTF-8 rune) is kept. An
// incomplete CSI loses only its "ESC [" introducer. An OSC/DCS/APC/
// SOS/PM sequence with no terminator is dropped through to the end of
// the input.
//
// This is a hand-rolled scanner; the original author notes a real
// terminal parser (libghostty) would be preferable.
func stripTerminal(s string) string {
	// Fast path: without an ESC or an underscore neither an escape
	// sequence nor the bare residue can occur.
	if !strings.ContainsAny(s, "\x1b_") {
		return s
	}

	const residue = "_bk;t="

	var b strings.Builder
	b.Grow(len(s))
	for i := 0; i < len(s); {
		switch c := s[i]; {
		case c == 0x1b:
			i = escapeSeqEnd(s, i)
			continue
		case c == '_' && strings.HasPrefix(s[i:], residue):
			// Only treat it as residue when at least one digit
			// follows; otherwise it is ordinary text.
			j := i + len(residue)
			for j < len(s) && s[j] >= '0' && s[j] <= '9' {
				j++
			}
			if j > i+len(residue) {
				i = j
				continue
			}
		}
		b.WriteByte(s[i])
		i++
	}
	return b.String()
}

// escapeSeqEnd returns the index of the first byte after the escape
// sequence that begins at s[i], where s[i] must be ESC. The result is
// always greater than i, so callers make progress even on malformed
// input.
func escapeSeqEnd(s string, i int) int {
	if i+1 >= len(s) {
		// Lone ESC at the very end: drop just that byte.
		return i + 1
	}

	switch s[i+1] {
	case '[':
		// CSI: parameter bytes 0x30-0x3F, intermediate bytes
		// 0x20-0x2F, then one final byte 0x40-0x7E.
		j := i + 2
		for j < len(s) && s[j] >= 0x30 && s[j] <= 0x3F {
			j++
		}
		for j < len(s) && s[j] >= 0x20 && s[j] <= 0x2F {
			j++
		}
		if j < len(s) && s[j] >= 0x40 && s[j] <= 0x7E {
			return j + 1
		}
		// Not a well-formed CSI: drop only the introducer and
		// keep whatever followed it as text.
		return i + 2

	case ']', 'P', '_', 'X', '^':
		// OSC/DCS/APC/SOS/PM: runs until BEL or ST (ESC '\').
		for j := i + 2; j < len(s); j++ {
			if s[j] == 0x07 {
				return j + 1
			}
			if s[j] == 0x1b && j+1 < len(s) && s[j+1] == '\\' {
				return j + 2
			}
		}
		// Unterminated: everything up to the end belongs to it.
		return len(s)

	default:
		// Plain escape: optional intermediates 0x20-0x2F and one
		// final byte 0x30-0x7E. Charset selection (`ESC ( B`) is
		// three bytes, so a fixed two-byte skip would leak the
		// final byte into the output. If no valid final byte
		// follows, only the ESC (and any intermediates) is
		// dropped and the next byte is kept.
		j := i + 1
		for j < len(s) && s[j] >= 0x20 && s[j] <= 0x2F {
			j++
		}
		if j < len(s) && s[j] >= 0x30 && s[j] <= 0x7E {
			j++
		}
		return j
	}
}
837
838// sendLine pushes one LogLine into out, returning false if ctx
839// fired first. Centralised so the per-job loop in Logs stays
840// focused on the wire-shape decisions.
841func sendLine(ctx context.Context, out chan<- LogLine, line LogLine) bool {
842 select {
843 case <-ctx.Done():
844 return false
845 case out <- line:
846 return true
847 }
848}