Stitch any CI into Tangled
1package main
2
// buildkiteProvider implements Provider against a real Buildkite
// account. Spawn translates a Tangled pipeline trigger into one
// Buildkite build per workflow; status updates flow back asynchronously
// through the /webhooks/buildkite handler (see http.go), which looks
// the build UUID up in the buildkite_builds table to recover the
// (knot, pipelineRkey, workflow) tuple this provider persisted at
// Spawn time and publishes a sh.tangled.pipeline.status record on
// the in-process broker.
//
// The Buildkite *pipeline slug* a workflow targets is carried inside
// the workflow's YAML body (Pipeline_Workflow.Raw), not configured
// globally on the spindle. That keeps tack a thin translator: the
// repo author decides which Buildkite pipeline runs each Tangled
// workflow without an operator round-trip. See workflowConfig below
// for the supported YAML schema.
18
19import (
20 "context"
21 "encoding/json"
22 "errors"
23 "fmt"
24 "log/slog"
25 "net/http"
26 "strings"
27 "time"
28
29 "go.yaml.in/yaml/v2"
30 "tangled.org/core/api/tangled"
31
32 "go.mitchellh.com/tack/internal/buildkite"
33)
34
// Keys under which the Tangled identity of a build is stored in
// Buildkite meta_data (see buildCreateRequest) and read back from a
// webhook payload (see refFromWebhook). The same identity is also
// mirrored into env vars by envFromTuple so a pipeline script can
// reach it via $TACK_*. The shared "tack:" prefix keeps the keys
// namespaced so they don't collide with meta_data that a coexisting
// Buildkite job sets for its own purposes.
const (
	// bkMetaKnot holds the knot the pipeline originated from.
	bkMetaKnot = "tack:knot"
	// bkMetaPipelineRkey holds the record key of the Tangled pipeline.
	bkMetaPipelineRkey = "tack:pipeline_rkey"
	// bkMetaWorkflow holds the workflow name within that pipeline.
	bkMetaWorkflow = "tack:workflow"
)
45
46// workflowConfig is the tack-flavoured schema we expect inside each
47// Tangled workflow's Raw YAML body. Only the Buildkite `pipeline`
48// slug is required; everything else is optional. Fields nest under
49// `tack: { buildkite: ... }` so the workflow YAML can grow other
50// top-level keys (Tangled's own scheduling fields, future provider
51// blocks) without colliding with our namespace.
52//
53// Fields map onto the Buildkite REST "Create a build" request
54// properties documented at
55// https://buildkite.com/docs/apis/rest-api/builds#create-a-build
56// (see also the comment block on buildkite.CreateBuildRequest). We
57// expose only the small subset users genuinely need to override —
58// trigger metadata supplies commit/branch, and tack supplies the
59// identity env+meta the webhook handler relies on, so there's no
60// reason to let users re-specify those.
61type workflowConfig struct {
62 Tack tackConfig `yaml:"tack"`
63}
64
65// tackConfig is the per-provider block under the top-level `tack:`
66// key. Right now the only nested provider is Buildkite.
67type tackConfig struct {
68 Buildkite buildkiteConfig `yaml:"buildkite"`
69}
70
// buildkiteConfig is the Buildkite-specific part of workflowConfig,
// i.e. the block found under `tack.buildkite`.
type buildkiteConfig struct {
	// Pipeline is the slug of the Buildkite pipeline to build. It is
	// the only required field; parseWorkflowConfig rejects a config
	// where it is empty.
	Pipeline string `yaml:"pipeline"`

	// Org optionally targets a Buildkite organisation other than the
	// provider's default, which is useful when one tack instance
	// fronts multiple orgs. When empty, spawnWorkflow falls back to
	// the provider's defaultOrg. The configured API token must be
	// authorised for the chosen org; if it is not, the CreateBuild
	// error is logged as returned rather than guessed around.
	Org string `yaml:"org"`

	// CleanCheckout is a pointer so "not set in the YAML" (nil) can
	// be told apart from an explicit false. buildCreateRequest
	// currently folds nil into false before building the request.
	// NOTE(review): whether an unset value leaves Buildkite's own
	// default in place depends on how buildkite.CreateBuildRequest
	// serialises a false CleanCheckout; confirm there.
	CleanCheckout *bool `yaml:"clean_checkout"`
}
87
88// parseWorkflowConfig decodes a workflow YAML body into workflowConfig.
89// An empty body is treated as a structural error so spawnWorkflow can
90// short-circuit cleanly: a workflow with no body has nothing for tack
91// to do anyway.
92func parseWorkflowConfig(raw string) (*buildkiteConfig, error) {
93 if strings.TrimSpace(raw) == "" {
94 return nil, errors.New("workflow body is empty")
95 }
96 var cfg workflowConfig
97 if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil {
98 return nil, fmt.Errorf("parse workflow yaml: %w", err)
99 }
100 bk := cfg.Tack.Buildkite
101 if bk.Pipeline == "" {
102 return nil, errors.New("workflow yaml: `tack.buildkite.pipeline` is required")
103 }
104 return &bk, nil
105}
106
107// buildkiteProvider implements Provider.
108//
109// webhookSecret + webhookMode live on the provider rather than on
110// the HTTP server because the provider is the single owner of
111// "everything Buildkite-y": colocating the auth knob with the API
112// client and the state translator keeps configuration drift to one
113// place and makes the http.go side pure transport.
114//
115// defaultOrg is the Buildkite organisation the configured API token
116// belongs to. Workflows may opt into a different org via their YAML
117// `org` field; the API token then needs to be authorised against it.
118type buildkiteProvider struct {
119 br *broker
120 st *store
121 log *slog.Logger
122 client *buildkite.Client
123 defaultOrg string
124 webhookSecret string
125 webhookMode buildkite.WebhookMode
126}
127
128// Compile-time interface conformance check.
129var _ Provider = (*buildkiteProvider)(nil)
130
131// newBuildkiteProvider wires a provider to its Buildkite client and
132// to the broker it publishes pipeline.status records on. defaultOrg
133// is the org the API token authenticates against and the org used
134// when a workflow doesn't specify its own. webhookSecret/webhookMode
135// govern inbound /webhooks/buildkite request authentication.
136func newBuildkiteProvider(
137 br *broker,
138 st *store,
139 client *buildkite.Client,
140 defaultOrg string,
141 webhookSecret string,
142 webhookMode buildkite.WebhookMode,
143 log *slog.Logger,
144) *buildkiteProvider {
145 return &buildkiteProvider{
146 br: br,
147 st: st,
148 log: log.With("component", "provider", "kind", "buildkite"),
149 client: client,
150 defaultOrg: defaultOrg,
151 webhookSecret: webhookSecret,
152 webhookMode: webhookMode,
153 }
154}
155
156// VerifyWebhook authenticates an inbound webhook request using
157// whichever mode the provider was configured with. Returns nil on
158// success; the HTTP handler maps any returned error to 401.
159func (p *buildkiteProvider) VerifyWebhook(headers http.Header, body []byte) error {
160 switch p.webhookMode {
161 case buildkite.WebhookModeSignature:
162 return buildkite.VerifySignature(
163 headers.Get("X-Buildkite-Signature"),
164 p.webhookSecret, body,
165 )
166 default:
167 // Token mode is the Buildkite default and our default, so
168 // any unrecognised value falls through to it rather than
169 // fail-closed at startup.
170 return buildkite.VerifyToken(
171 headers.Get("X-Buildkite-Token"),
172 p.webhookSecret,
173 )
174 }
175}
176
177// Spawn satisfies Provider. For each workflow it fires a separate
178// Buildkite build off the pipeline named in that workflow's YAML so
179// each workflow gets its own status timeline. The actual API call
180// runs on a goroutine — CreateBuild is one HTTP round-trip, but we
181// still want Spawn to be non-blocking per the interface contract.
182//
183// On a successful create we persist the build UUID → (knot, rkey,
184// workflow) mapping and publish a "pending" pipeline.status so the
185// appview sees activity immediately, instead of waiting for the
186// first webhook to land.
187func (p *buildkiteProvider) Spawn(
188 ctx context.Context,
189 knot string,
190 pipelineRkey string,
191 actor string,
192 trigger *tangled.Pipeline_TriggerMetadata,
193 workflows []*tangled.Pipeline_Workflow,
194) {
195 if len(workflows) == 0 {
196 p.log.Warn("pipeline has no workflows; nothing to spawn",
197 "knot", knot, "rkey", pipelineRkey,
198 )
199 return
200 }
201
202 for _, wf := range workflows {
203 if wf == nil || wf.Name == "" {
204 continue
205 }
206 wf := wf
207 go p.spawnWorkflow(ctx, knot, pipelineRkey, actor, trigger, wf)
208 }
209}
210
211// spawnWorkflow does the per-workflow API + persistence work for
212// Spawn. Errors are logged with full context but not returned —
213// nothing in tack consumes the result, and a failed Spawn just
214// surfaces as the absence of any status update for the affected
215// workflow.
216//
217// actor is the DID the knot consumer authorized to spawn this work
218// (the publishing repo owner). It's surfaced in the per-workflow
219// logger so operator-side audits can join Buildkite logs back to the
220// triggering Tangled identity even before the build itself is up.
221func (p *buildkiteProvider) spawnWorkflow(
222 ctx context.Context,
223 knot string,
224 pipelineRkey string,
225 actor string,
226 trigger *tangled.Pipeline_TriggerMetadata,
227 wf *tangled.Pipeline_Workflow,
228) {
229 logger := p.log.With(
230 "knot", knot,
231 "pipeline_rkey", pipelineRkey,
232 "workflow", wf.Name,
233 "actor", actor,
234 )
235
236 cfg, err := parseWorkflowConfig(wf.Raw)
237 if err != nil {
238 // Bad workflow YAML is a user-facing config error: log it
239 // loudly and skip. Firing a build off some default would
240 // be more confusing than doing nothing.
241 logger.Error("invalid workflow config; refusing to spawn", "err", err)
242 return
243 }
244 logger = logger.With("pipeline", cfg.Pipeline)
245
246 req, err := p.buildCreateRequest(cfg, trigger, knot, pipelineRkey, wf)
247 if err != nil {
248 logger.Error("build create request", "err", err)
249 return
250 }
251
252 org := cfg.Org
253 if org == "" {
254 org = p.defaultOrg
255 }
256
257 build, err := p.client.CreateBuild(ctx, org, cfg.Pipeline, req)
258 if err != nil {
259 logger.Error("create buildkite build", "err", err, "org", org)
260 return
261 }
262 logger.Info("buildkite build created",
263 "build_uuid", build.ID,
264 "build_number", build.Number,
265 "web_url", build.WebURL,
266 "org", org,
267 )
268
269 pipelineURI := pipelineATURI(knot, pipelineRkey)
270 // Persist the *resolved* org — the one we actually issued the
271 // CreateBuild against — rather than cfg.Org. If we stored only
272 // cfg.Org, a later change to the provider's defaultOrg would
273 // silently retarget historical lookups (logs, webhook joins) at
274 // the wrong organisation. Legacy rows written before this fix
275 // may still have an empty Org; the read path keeps the
276 // defaultOrg fallback for those (see Logs).
277 if err := p.st.InsertBuildkiteBuild(ctx, BuildkiteBuildRef{
278 BuildUUID: build.ID,
279 BuildNumber: build.Number,
280 PipelineSlug: cfg.Pipeline,
281 Org: org,
282 Knot: knot,
283 PipelineRkey: pipelineRkey,
284 Workflow: wf.Name,
285 PipelineURI: pipelineURI,
286 }); err != nil {
287 // Webhook handlers will fail to translate this build's
288 // events because they can't recover the tuple. Surface
289 // loudly and bail; we don't want a half-tracked build
290 // silently leaking status into the broker.
291 logger.Error("persist buildkite build mapping", "err", err,
292 "build_uuid", build.ID,
293 )
294 return
295 }
296
297 // Initial status publish so the appview shows the build as
298 // queued without waiting for the first webhook. This mirrors
299 // the upstream spindle's "schedule then run" cadence.
300 if err := p.publishStatus(
301 ctx, pipelineURI, wf.Name, "pending", build.ID,
302 nil, nil,
303 ); err != nil {
304 logger.Error("publish initial pending status", "err", err)
305 }
306}
307
308// buildCreateRequest folds the parsed workflow config and the
309// Tangled trigger metadata into a single Buildkite create-build
310// payload. Trigger metadata supplies commit/branch; the workflow
311// YAML supplies the Buildkite routing knobs (pipeline/org) and the
312// small handful of build options we expose.
313//
314// `ignore_pipeline_branch_filters` is hard-coded to true: Tangled
315// refs frequently don't match arbitrary Buildkite pipeline branch
316// filters, and a build silently dropped at create time is a worse
317// failure mode than running one we shouldn't have. Users wanting
318// the filter back are expected to drop the filter on the Buildkite
319// pipeline itself.
320//
321// Returns an error when the trigger lacks a commit — Buildkite's
322// API requires one and we'd rather log+skip than fire a build that
323// resolves to "whatever main happens to be".
324func (p *buildkiteProvider) buildCreateRequest(
325 cfg *buildkiteConfig,
326 trigger *tangled.Pipeline_TriggerMetadata,
327 knot, pipelineRkey string,
328 wf *tangled.Pipeline_Workflow,
329) (buildkite.CreateBuildRequest, error) {
330 commit, branch := triggerCommitAndBranch(trigger)
331 if commit == "" {
332 return buildkite.CreateBuildRequest{}, errors.New(
333 "trigger has no commit",
334 )
335 }
336
337 cleanCheckout := false
338 if cfg.CleanCheckout != nil {
339 cleanCheckout = *cfg.CleanCheckout
340 }
341
342 req := buildkite.CreateBuildRequest{
343 Commit: commit,
344 Branch: branch,
345 Message: fmt.Sprintf("tangled: %s", wf.Name),
346 Env: envFromTuple(knot, pipelineRkey, wf),
347 MetaData: map[string]string{
348 bkMetaKnot: knot,
349 bkMetaPipelineRkey: pipelineRkey,
350 bkMetaWorkflow: wf.Name,
351 },
352 CleanCheckout: cleanCheckout,
353 IgnorePipelineBranchFilters: true,
354 }
355
356 // Auto-populate Buildkite's PR fields from the Tangled PR
357 // trigger when present. Buildkite doesn't get a PR number from
358 // us (Tangled doesn't surface one through the trigger), but
359 // the base branch alone is enough for `pull_request_base_branch`-
360 // gated step filters to work.
361 if trigger != nil && trigger.PullRequest != nil {
362 req.PullRequestBaseBranch = trigger.PullRequest.TargetBranch
363 }
364
365 return req, nil
366}
367
368// Logs satisfies Provider. We resolve the (knot, rkey, workflow)
369// tuple to a Buildkite build via the store, fetch the current jobs
370// list, then drain each job's plain-text log into the channel as one
371// LogLine per output line.
372//
373// Per-job control frames bracket each job so the appview's renderer
374// has start/end markers to lay out timing — same shape as the fake
375// provider and the upstream spindle.
376//
377// This is a snapshot read, not a tail — finished or in-progress, we
378// fetch what's there and close. Live tailing would require Buildkite
379// agent log streaming, which the public REST API doesn't expose; the
380// appview's repeated /logs calls during a running build give us
381// "good enough" liveness without that complexity.
382func (p *buildkiteProvider) Logs(
383 ctx context.Context,
384 knot string,
385 pipelineRkey string,
386 workflow string,
387) (<-chan LogLine, error) {
388 ref, err := p.st.LookupBuildkiteBuildByTuple(ctx, knot, pipelineRkey, workflow)
389 if err != nil {
390 return nil, fmt.Errorf("lookup build mapping: %w", err)
391 }
392 if ref == nil {
393 return nil, ErrLogsNotFound
394 }
395
396 // Resolve the org against which we should pull jobs/logs.
397 // Spawn now persists the *resolved* org used at create time, so
398 // for any row written by current code ref.Org is authoritative
399 // and we use it verbatim. The empty-string fallback to
400 // defaultOrg only exists for legacy rows on disk that predate
401 // persisting the resolved org; new rows should never hit it.
402 org := ref.Org
403 if org == "" {
404 org = p.defaultOrg
405 }
406
407 build, err := p.client.GetBuild(ctx, org, ref.PipelineSlug, ref.BuildNumber)
408 if err != nil {
409 if errors.Is(err, buildkite.ErrNotFound) {
410 return nil, ErrLogsNotFound
411 }
412 return nil, fmt.Errorf("get build: %w", err)
413 }
414
415 out := make(chan LogLine, 32)
416 go func() {
417 defer close(out)
418 stepID := 0
419 for _, job := range build.Jobs {
420 if job.Type != "" && job.Type != "script" {
421 // Skip non-script jobs (waiter, manual,
422 // trigger). They have no log to fetch and
423 // surfacing empty steps just clutters the
424 // appview.
425 continue
426 }
427 name := job.Name
428 if name == "" {
429 name = fmt.Sprintf("job %s", job.ID)
430 }
431
432 if !sendLine(ctx, out, LogLine{
433 Kind: LogKindControl,
434 Time: time.Now(),
435 Content: name,
436 StepId: stepID,
437 StepStatus: StepStatusStart,
438 }) {
439 return
440 }
441
442 body, err := p.client.GetJobLog(ctx, org, ref.PipelineSlug, ref.BuildNumber, job.ID)
443 if err != nil {
444 p.log.Debug("fetch job log",
445 "err", err,
446 "build_uuid", ref.BuildUUID,
447 "job_id", job.ID,
448 )
449 // Don't fail the whole stream on one job;
450 // emit the end frame and move on so the
451 // appview at least sees what other jobs
452 // produced.
453 body = ""
454 }
455
456 // Buildkite injects per-line timestamp metadata as
457 // ANSI APC sequences (ESC "_" "bk;t=<ms>" BEL) and
458 // some renderers downstream don't recognise the APC
459 // envelope, leaking the inner "_bk;t=…" payload into
460 // the displayed text. Strip them here so consumers
461 // only ever see the actual log content.
462 body = stripTerminal(body)
463
464 for _, line := range strings.Split(strings.TrimRight(body, "\n"), "\n") {
465 if line == "" {
466 // Skip the leading empty entry that
467 // Split produces for empty bodies.
468 continue
469 }
470 if !sendLine(ctx, out, LogLine{
471 Kind: LogKindData,
472 Time: time.Now(),
473 Content: line + "\n",
474 StepId: stepID,
475 Stream: "stdout",
476 }) {
477 return
478 }
479 }
480
481 if !sendLine(ctx, out, LogLine{
482 Kind: LogKindControl,
483 Time: time.Now(),
484 Content: name,
485 StepId: stepID,
486 StepStatus: StepStatusEnd,
487 }) {
488 return
489 }
490 stepID++
491 }
492 }()
493 return out, nil
494}
495
496// publishStatus assembles a tangled.PipelineStatus record and pushes
497// it through the broker. buildUUID is mixed into the rkey so multiple
498// status events for the same workflow don't collide on the events
499// table's (rkey) uniqueness — and so an operator grepping the log
500// can find every record that pertains to a given Buildkite build.
501//
502// errMsg/exitCode are optional; pass nil for non-failure transitions.
503func (p *buildkiteProvider) publishStatus(
504 ctx context.Context,
505 pipelineURI, workflow, status, buildUUID string,
506 errMsg *string,
507 exitCode *int64,
508) error {
509 rec := tangled.PipelineStatus{
510 LexiconTypeID: tangled.PipelineStatusNSID,
511 Pipeline: pipelineURI,
512 Workflow: workflow,
513 Status: status,
514 CreatedAt: time.Now().UTC().Format(time.RFC3339),
515 Error: errMsg,
516 ExitCode: exitCode,
517 }
518 body, err := json.Marshal(rec)
519 if err != nil {
520 return fmt.Errorf("marshal pipeline.status: %w", err)
521 }
522 rkey := fmt.Sprintf("bk-%s-%s-%d", buildUUID, status, time.Now().UnixNano())
523 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil {
524 return fmt.Errorf("publish pipeline.status: %w", err)
525 }
526 return nil
527}
528
529// HandleWebhook applies a decoded Buildkite webhook payload: looks
530// the build up in the store, translates the Buildkite state into a
531// Tangled StatusKind, and publishes a pipeline.status record. Used
532// by the HTTP webhook handler so both the ingress logic and the
533// translation logic live next to each other.
534//
535// Returns nil for events we intentionally ignore (job.* events,
536// build.scheduled which we already publish locally on Spawn, builds
537// we don't have a mapping for) so the handler can 200 them — webhook
538// retries from Buildkite on a 4xx/5xx are noisy and not what we want
539// for "we just don't care about this event".
540func (p *buildkiteProvider) HandleWebhook(
541 ctx context.Context,
542 payload buildkite.WebhookPayload,
543) error {
544 // Only build.* events drive pipeline.status today. Everything
545 // else (job.*, agent.*, ping) is acknowledged silently.
546 if !strings.HasPrefix(payload.Event, "build.") {
547 return nil
548 }
549
550 ref, err := p.st.LookupBuildkiteBuildByUUID(ctx, payload.Build.ID)
551 if err != nil {
552 return fmt.Errorf("lookup build by uuid: %w", err)
553 }
554 if ref == nil {
555 // Cache miss. Two plausible causes:
556 //
557 // 1. Genuinely-foreign build: a Buildkite job triggered
558 // outside tack that just happens to share this
559 // webhook URL. Nothing to do.
560 // 2. Race: Spawn's goroutine fired CreateBuild but
561 // hasn't yet written the UUID→tuple row. A fast
562 // build.scheduled webhook can land in that window
563 // and would otherwise be dropped forever.
564 //
565 // We disambiguate using the Buildkite meta_data we set at
566 // CreateBuild time. If the tack:* keys are present the
567 // build is ours; we reconstruct the ref and opportunistically
568 // persist it so subsequent webhooks (and any Logs call)
569 // hit the cache rather than re-doing this work.
570 ref = refFromWebhook(payload)
571 if ref == nil {
572 p.log.Debug("webhook for unknown build; ignoring",
573 "event", payload.Event,
574 "build_uuid", payload.Build.ID,
575 )
576 return nil
577 }
578 // Opportunistic cache fill. Failure here is non-fatal:
579 // Spawn's authoritative insert will land shortly (or has
580 // already, in which case our INSERT … ON CONFLICT just
581 // refreshes the row). We continue with the reconstructed
582 // ref either way so a status publish isn't lost.
583 if err := p.st.InsertBuildkiteBuild(ctx, *ref); err != nil {
584 p.log.Warn("opportunistic persist of buildkite build mapping",
585 "err", err, "build_uuid", ref.BuildUUID,
586 )
587 }
588 p.log.Info("buildkite webhook reconstructed from meta_data",
589 "event", payload.Event,
590 "build_uuid", ref.BuildUUID,
591 "workflow", ref.Workflow,
592 )
593 }
594
595 status, ok := mapBuildkiteState(payload.Build.State)
596 if !ok {
597 // Unknown / transient state ("blocked", "skipped",
598 // "not_run", "waiting"…) — log so we can extend the map
599 // later, but don't error out the webhook.
600 p.log.Debug("unmapped buildkite state; ignoring",
601 "event", payload.Event,
602 "state", payload.Build.State,
603 "build_uuid", payload.Build.ID,
604 )
605 return nil
606 }
607
608 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
609 status, ref.BuildUUID, nil, nil); err != nil {
610 return fmt.Errorf("publish webhook status: %w", err)
611 }
612 p.log.Info("buildkite webhook → pipeline.status",
613 "event", payload.Event,
614 "state", payload.Build.State,
615 "status", status,
616 "build_uuid", payload.Build.ID,
617 "workflow", ref.Workflow,
618 )
619 return nil
620}
621
622// refFromWebhook reconstructs a BuildkiteBuildRef directly from a
623// webhook payload, using the tack:* meta_data we attach at Spawn
624// time as the source of truth for (knot, pipeline_rkey, workflow).
625// Org and pipeline slug come from the payload's organization and
626// embedded pipeline objects, both of which Buildkite populates on
627// every build.* event.
628//
629// Returns nil when the payload doesn't carry our meta_data: that's
630// the signal the build was triggered outside tack and we should
631// keep ignoring it. A partial set (one or two of the three keys)
632// also returns nil; we don't want to half-reconstruct a row.
633//
634// This exists so HandleWebhook can recover the tuple when the
635// CreateBuild→InsertBuildkiteBuild race drops a webhook on the
636// floor. The caller is expected to opportunistically persist the
637// returned ref so subsequent lookups hit the cache.
638func refFromWebhook(payload buildkite.WebhookPayload) *BuildkiteBuildRef {
639 md := payload.Build.MetaData
640 knot := md[bkMetaKnot]
641 rkey := md[bkMetaPipelineRkey]
642 wf := md[bkMetaWorkflow]
643 if knot == "" || rkey == "" || wf == "" {
644 return nil
645 }
646
647 // Pipeline slug lives on the build's embedded pipeline object.
648 // Decoding it as map[string]interface{} keeps the buildkite
649 // package's Build struct from sprouting fields we only ever
650 // touch on this fallback path.
651 pipelineSlug, _ := payload.Build.Pipeline["slug"].(string)
652
653 return &BuildkiteBuildRef{
654 BuildUUID: payload.Build.ID,
655 BuildNumber: payload.Build.Number,
656 PipelineSlug: pipelineSlug,
657 Org: payload.Organization.Slug,
658 Knot: knot,
659 PipelineRkey: rkey,
660 Workflow: wf,
661 PipelineURI: pipelineATURI(knot, rkey),
662 }
663}
664
// mapBuildkiteState translates a Buildkite build state string into
// the matching Tangled status string. The boolean reports whether a
// mapping exists:
//
//	scheduled            → pending
//	running, failing     → running
//	passed               → success
//	failed               → failed
//	canceled, canceling  → cancelled
//
// Any other state (for example blocked, skipped, not_run) yields
// ("", false) so the caller can decide whether to ignore it. The
// match is exact and case-sensitive.
func mapBuildkiteState(state string) (string, bool) {
	var status string
	switch state {
	case "scheduled":
		status = "pending"
	case "running", "failing":
		status = "running"
	case "passed":
		status = "success"
	case "failed":
		status = "failed"
	case "canceled", "canceling":
		status = "cancelled"
	}
	return status, status != ""
}
687
688// envFromTuple builds the env block forwarded into the Buildkite
689// build. These are the only handle a user's Buildkite pipeline has
690// on the originating Tangled trigger: their pipeline.yml typically
691// reads $TACK_WORKFLOW and dispatches based on it (e.g. running a
692// `pipeline upload` against a workflow-specific YAML file).
693//
694// TACK_WORKFLOW_RAW carries the entire YAML body of the workflow as
695// captured in the Tangled record. It can be empty if the workflow
696// definition omitted it; consumers should defend.
697func envFromTuple(knot, pipelineRkey string, wf *tangled.Pipeline_Workflow) map[string]string {
698 return map[string]string{
699 "TACK_KNOT": knot,
700 "TACK_PIPELINE_RKEY": pipelineRkey,
701 "TACK_WORKFLOW": wf.Name,
702 "TACK_WORKFLOW_RAW": wf.Raw,
703 }
704}
705
706// pipelineATURI returns the at-uri the appview joins pipeline.status
707// records back to their originating pipeline on. Format mirrors the
708// upstream spindle; the appview strips the `did:web:` prefix and
709// treats the remainder as the knot identifier.
710func pipelineATURI(knot, pipelineRkey string) string {
711 return fmt.Sprintf("at://did:web:%s/%s/%s",
712 knot, tangled.PipelineNSID, pipelineRkey,
713 )
714}
715
716// triggerCommitAndBranch extracts (commit, branch) from a Tangled
717// pipeline trigger, regardless of whether it was a push, a pull
718// request, or a manual run. Returns empty strings on a fully-empty
719// trigger so the caller can decide whether that's fatal.
720func triggerCommitAndBranch(trigger *tangled.Pipeline_TriggerMetadata) (string, string) {
721 if trigger == nil {
722 return "", ""
723 }
724 switch {
725 case trigger.Push != nil:
726 // For push events, NewSha is the commit being built and
727 // Ref is the full ref (e.g. "refs/heads/main") — strip
728 // the prefix so Buildkite's branch-aware features work.
729 return trigger.Push.NewSha, refToBranch(trigger.Push.Ref)
730 case trigger.PullRequest != nil:
731 // PRs build the source commit on the source branch.
732 return trigger.PullRequest.SourceSha, trigger.PullRequest.SourceBranch
733 default:
734 // Manual triggers and any future kinds: fall back to the
735 // repo default branch with no commit, which the caller
736 // will treat as fatal — manual triggers will need
737 // additional plumbing to pick a commit.
738 if trigger.Repo != nil {
739 return "", trigger.Repo.DefaultBranch
740 }
741 return "", ""
742 }
743}
744
// refToBranch turns a git ref into a branch name by removing a
// leading "refs/heads/". Refs without that prefix (tags,
// refs/pull/N/head, already-bare branch names) are returned
// unchanged so downstream tooling can decide what to do with them.
func refToBranch(ref string) string {
	// TrimPrefix is a no-op when the prefix is absent, so no
	// separate HasPrefix check is needed.
	return strings.TrimPrefix(ref, "refs/heads/")
}
756
// stripTerminal removes ANSI/ECMA-48 escape sequences from a log
// payload and returns only the displayable text. Buildkite's
// plain-text log output still contains the agent's full terminal
// stream: per-line timestamp APC envelopes
// (`ESC _ "bk;t=<unix-ms>" BEL`), CSI colour codes, clear-to-EOL
// (`ESC [ K`), OSC title sets, and so on. Our consumers are not
// terminal emulators and would render those bytes verbatim.
//
// Recognised escape families:
//
//   - CSI: ESC '[' parameters intermediates final
//   - OSC/DCS/APC/SOS/PM: ESC (']'|'P'|'_'|'X'|'^') … (BEL | ESC '\')
//   - everything else: ESC <single byte>
//
// A sequence cut off by the end of the input (a lone trailing ESC, a
// CSI with no final byte, an unterminated string envelope) is
// dropped in full, so a log snapshot taken mid-write doesn't leak
// half an escape sequence into the output.
//
// As a safety belt the bare "_bk;t=<digits>" residue is stripped as
// well. It appears when an upstream processor removed the ESC/BEL
// framing without understanding the APC envelope inside it.
//
// Input without any ESC byte or "_bk;t=" residue is returned as-is.
//
// TODO: a real terminal parser (the original author suggests
// libghostty) would be a more robust replacement for this scanner.
func stripTerminal(s string) string {
	const residue = "_bk;t="

	// Fast path: nothing to strip. Testing for the full residue
	// marker instead of any '_' lets ordinary text with underscores
	// skip the scan.
	if strings.IndexByte(s, 0x1b) < 0 && !strings.Contains(s, residue) {
		return s
	}

	var b strings.Builder
	b.Grow(len(s))
	for i := 0; i < len(s); {
		c := s[i]

		if c == 0x1b {
			if i+1 >= len(s) {
				// Lone ESC as the last byte: a truncated sequence.
				// Drop it rather than emit a raw control byte.
				i++
				continue
			}
			switch s[i+1] {
			case '[':
				// CSI: parameter bytes 0x30-0x3F, then
				// intermediate bytes 0x20-0x2F, then a single
				// final byte 0x40-0x7E.
				j := i + 2
				for j < len(s) && s[j] >= 0x30 && s[j] <= 0x3F {
					j++
				}
				for j < len(s) && s[j] >= 0x20 && s[j] <= 0x2F {
					j++
				}
				switch {
				case j >= len(s):
					// Input ended before the final byte: the
					// sequence is truncated, drop all of it.
					i = j
				case s[j] >= 0x40 && s[j] <= 0x7E:
					// Well-formed CSI: drop through the final byte.
					i = j + 1
				default:
					// Malformed: drop only the introducer
					// (ESC '[') so legitimate text that follows
					// isn't swallowed.
					i += 2
				}
			case ']', 'P', '_', 'X', '^':
				// OSC/DCS/APC/SOS/PM: terminated by BEL or ST
				// (ESC '\'). Drop the entire envelope; if no
				// terminator exists this runs to end of input.
				j := i + 2
				for j < len(s) {
					if s[j] == 0x07 {
						j++
						break
					}
					if s[j] == 0x1b && j+1 < len(s) && s[j+1] == '\\' {
						j += 2
						break
					}
					j++
				}
				i = j
			default:
				// Two-byte escape (RIS, DECSC, charset
				// selection, ...). Drop both bytes.
				i += 2
			}
			continue
		}

		// Bare residue: "_bk;t=<digits>" with no ESC/BEL framing.
		// Without at least one digit it is treated as ordinary text.
		if c == '_' && strings.HasPrefix(s[i:], residue) {
			j := i + len(residue)
			for j < len(s) && s[j] >= '0' && s[j] <= '9' {
				j++
			}
			if j > i+len(residue) {
				i = j
				continue
			}
		}

		b.WriteByte(c)
		i++
	}
	return b.String()
}
845
846// sendLine pushes one LogLine into out, returning false if ctx
847// fired first. Centralised so the per-job loop in Logs stays
848// focused on the wire-shape decisions.
849func sendLine(ctx context.Context, out chan<- LogLine, line LogLine) bool {
850 select {
851 case <-ctx.Done():
852 return false
853 case out <- line:
854 return true
855 }
856}