Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

at main 29 kB View raw
package main

// buildkiteProvider implements Provider against a real Buildkite
// account. Spawn translates a Tangled pipeline trigger into one
// Buildkite build per workflow; status updates flow back asynchronously
// through the /webhooks/buildkite handler (see http.go), which looks
// the build UUID up in the buildkite_builds table to recover the
// (knot, pipelineRkey, workflow) tuple this provider persisted at
// Spawn time and publishes a sh.tangled.pipeline.status record on
// the in-process broker.
//
// The Buildkite *pipeline slug* a workflow targets is carried inside
// the workflow's YAML body (Pipeline_Workflow.Raw), not configured
// globally on the spindle. That keeps tack a thin translator: the
// repo author decides which Buildkite pipeline runs each Tangled
// workflow without an operator round-trip. See workflowConfig below
// for the supported YAML schema.

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"net/http"
	"strings"
	"time"

	"go.yaml.in/yaml/v2"
	"tangled.org/core/api/tangled"

	"go.mitchellh.com/tack/internal/buildkite"
)

// Buildkite-side meta_data keys carrying the Tangled identity of a
// build. Mirrored into env vars (see envFromTuple) so an operator's
// Buildkite pipeline script can also reach them via $TACK_*. They
// stay tightly namespaced so a coexisting Buildkite job that uses
// meta_data for its own purposes won't collide.
//
// refFromWebhook reads these same keys back out of a webhook payload
// to recover the tuple when the store has no row for the build.
const (
	bkMetaKnot         = "tack:knot"
	bkMetaPipelineRkey = "tack:pipeline_rkey"
	bkMetaWorkflow     = "tack:workflow"
)

// workflowConfig is the tack-flavoured schema we expect inside each
// Tangled workflow's Raw YAML body. Only the Buildkite `pipeline`
// slug is required; everything else is optional. Fields nest under
// `tack: { buildkite: ... }` so the workflow YAML can grow other
// top-level keys (Tangled's own scheduling fields, future provider
// blocks) without colliding with our namespace.
//
// Fields map onto the Buildkite REST "Create a build" request
// properties documented at
// https://buildkite.com/docs/apis/rest-api/builds#create-a-build
// (see also the comment block on buildkite.CreateBuildRequest). We
// expose only the small subset users genuinely need to override —
// trigger metadata supplies commit/branch, and tack supplies the
// identity env+meta the webhook handler relies on, so there's no
// reason to let users re-specify those.
type workflowConfig struct {
	// Tack is the top-level `tack:` namespace in the workflow YAML.
	Tack tackConfig `yaml:"tack"`
}

// tackConfig is the per-provider block under the top-level `tack:`
// key. Right now the only nested provider is Buildkite.
type tackConfig struct {
	// Buildkite is the `tack.buildkite` block.
	Buildkite buildkiteConfig `yaml:"buildkite"`
}

// buildkiteConfig is the Buildkite-specific subset of workflowConfig.
//
// `org` lets a workflow target a Buildkite organisation other than
// the spindle's default — useful when one tack instance fronts
// multiple orgs. The configured API token must have access to that
// org or the build creation request will 401/403; we surface that
// error verbatim rather than guessing.
//
// `clean_checkout` is forwarded verbatim to Buildkite. CleanCheckout
// is a *bool so omitting it leaves Buildkite's own default in place
// instead of always shipping `false`.
//
// NOTE(review): buildCreateRequest currently collapses a nil
// CleanCheckout to false before populating the request, so whether an
// omitted key is really left off the wire depends on how
// buildkite.CreateBuildRequest encodes its field — confirm.
type buildkiteConfig struct {
	// Pipeline is the Buildkite pipeline slug to build. Required:
	// parseWorkflowConfig rejects a config where it is empty.
	Pipeline string `yaml:"pipeline"`
	// Org optionally overrides the provider's default organisation;
	// spawnWorkflow falls back to defaultOrg when it is empty.
	Org string `yaml:"org"`
	// CleanCheckout is nil when the key is absent from the YAML.
	CleanCheckout *bool `yaml:"clean_checkout"`
}

// parseWorkflowConfig decodes a workflow YAML body into workflowConfig.
// An empty body is treated as a structural error so spawnWorkflow can
// short-circuit cleanly: a workflow with no body has nothing for tack
// to do anyway.
92func parseWorkflowConfig(raw string) (*buildkiteConfig, error) { 93 if strings.TrimSpace(raw) == "" { 94 return nil, errors.New("workflow body is empty") 95 } 96 var cfg workflowConfig 97 if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil { 98 return nil, fmt.Errorf("parse workflow yaml: %w", err) 99 } 100 bk := cfg.Tack.Buildkite 101 if bk.Pipeline == "" { 102 return nil, errors.New("workflow yaml: `tack.buildkite.pipeline` is required") 103 } 104 return &bk, nil 105} 106 107// buildkiteProvider implements Provider. 108// 109// webhookSecret + webhookMode live on the provider rather than on 110// the HTTP server because the provider is the single owner of 111// "everything Buildkite-y": colocating the auth knob with the API 112// client and the state translator keeps configuration drift to one 113// place and makes the http.go side pure transport. 114// 115// defaultOrg is the Buildkite organisation the configured API token 116// belongs to. Workflows may opt into a different org via their YAML 117// `org` field; the API token then needs to be authorised against it. 118type buildkiteProvider struct { 119 br *broker 120 st *store 121 log *slog.Logger 122 client *buildkite.Client 123 defaultOrg string 124 webhookSecret string 125 webhookMode buildkite.WebhookMode 126} 127 128// Compile-time interface conformance check. 129var _ Provider = (*buildkiteProvider)(nil) 130 131// newBuildkiteProvider wires a provider to its Buildkite client and 132// to the broker it publishes pipeline.status records on. defaultOrg 133// is the org the API token authenticates against and the org used 134// when a workflow doesn't specify its own. webhookSecret/webhookMode 135// govern inbound /webhooks/buildkite request authentication. 
136func newBuildkiteProvider( 137 br *broker, 138 st *store, 139 client *buildkite.Client, 140 defaultOrg string, 141 webhookSecret string, 142 webhookMode buildkite.WebhookMode, 143 log *slog.Logger, 144) *buildkiteProvider { 145 return &buildkiteProvider{ 146 br: br, 147 st: st, 148 log: log.With("component", "provider", "kind", "buildkite"), 149 client: client, 150 defaultOrg: defaultOrg, 151 webhookSecret: webhookSecret, 152 webhookMode: webhookMode, 153 } 154} 155 156// VerifyWebhook authenticates an inbound webhook request using 157// whichever mode the provider was configured with. Returns nil on 158// success; the HTTP handler maps any returned error to 401. 159func (p *buildkiteProvider) VerifyWebhook(headers http.Header, body []byte) error { 160 switch p.webhookMode { 161 case buildkite.WebhookModeSignature: 162 return buildkite.VerifySignature( 163 headers.Get("X-Buildkite-Signature"), 164 p.webhookSecret, body, 165 ) 166 default: 167 // Token mode is the Buildkite default and our default, so 168 // any unrecognised value falls through to it rather than 169 // fail-closed at startup. 170 return buildkite.VerifyToken( 171 headers.Get("X-Buildkite-Token"), 172 p.webhookSecret, 173 ) 174 } 175} 176 177// Spawn satisfies Provider. For each workflow it fires a separate 178// Buildkite build off the pipeline named in that workflow's YAML so 179// each workflow gets its own status timeline. The actual API call 180// runs on a goroutine — CreateBuild is one HTTP round-trip, but we 181// still want Spawn to be non-blocking per the interface contract. 182// 183// On a successful create we persist the build UUID → (knot, rkey, 184// workflow) mapping and publish a "pending" pipeline.status so the 185// appview sees activity immediately, instead of waiting for the 186// first webhook to land. 
187func (p *buildkiteProvider) Spawn( 188 ctx context.Context, 189 knot string, 190 pipelineRkey string, 191 actor string, 192 trigger *tangled.Pipeline_TriggerMetadata, 193 workflows []*tangled.Pipeline_Workflow, 194) { 195 if len(workflows) == 0 { 196 p.log.Warn("pipeline has no workflows; nothing to spawn", 197 "knot", knot, "rkey", pipelineRkey, 198 ) 199 return 200 } 201 202 for _, wf := range workflows { 203 if wf == nil || wf.Name == "" { 204 continue 205 } 206 wf := wf 207 go p.spawnWorkflow(ctx, knot, pipelineRkey, actor, trigger, wf) 208 } 209} 210 211// spawnWorkflow does the per-workflow API + persistence work for 212// Spawn. Errors are logged with full context but not returned — 213// nothing in tack consumes the result, and a failed Spawn just 214// surfaces as the absence of any status update for the affected 215// workflow. 216// 217// actor is the DID the knot consumer authorized to spawn this work 218// (the publishing repo owner). It's surfaced in the per-workflow 219// logger so operator-side audits can join Buildkite logs back to the 220// triggering Tangled identity even before the build itself is up. 221func (p *buildkiteProvider) spawnWorkflow( 222 ctx context.Context, 223 knot string, 224 pipelineRkey string, 225 actor string, 226 trigger *tangled.Pipeline_TriggerMetadata, 227 wf *tangled.Pipeline_Workflow, 228) { 229 logger := p.log.With( 230 "knot", knot, 231 "pipeline_rkey", pipelineRkey, 232 "workflow", wf.Name, 233 "actor", actor, 234 ) 235 236 cfg, err := parseWorkflowConfig(wf.Raw) 237 if err != nil { 238 // Bad workflow YAML is a user-facing config error: log it 239 // loudly and skip. Firing a build off some default would 240 // be more confusing than doing nothing. 
241 logger.Error("invalid workflow config; refusing to spawn", "err", err) 242 return 243 } 244 logger = logger.With("pipeline", cfg.Pipeline) 245 246 req, err := p.buildCreateRequest(cfg, trigger, knot, pipelineRkey, wf) 247 if err != nil { 248 logger.Error("build create request", "err", err) 249 return 250 } 251 252 org := cfg.Org 253 if org == "" { 254 org = p.defaultOrg 255 } 256 257 build, err := p.client.CreateBuild(ctx, org, cfg.Pipeline, req) 258 if err != nil { 259 logger.Error("create buildkite build", "err", err, "org", org) 260 return 261 } 262 logger.Info("buildkite build created", 263 "build_uuid", build.ID, 264 "build_number", build.Number, 265 "web_url", build.WebURL, 266 "org", org, 267 ) 268 269 pipelineURI := pipelineATURI(knot, pipelineRkey) 270 // Persist the *resolved* org — the one we actually issued the 271 // CreateBuild against — rather than cfg.Org. If we stored only 272 // cfg.Org, a later change to the provider's defaultOrg would 273 // silently retarget historical lookups (logs, webhook joins) at 274 // the wrong organisation. Legacy rows written before this fix 275 // may still have an empty Org; the read path keeps the 276 // defaultOrg fallback for those (see Logs). 277 if err := p.st.InsertBuildkiteBuild(ctx, BuildkiteBuildRef{ 278 BuildUUID: build.ID, 279 BuildNumber: build.Number, 280 PipelineSlug: cfg.Pipeline, 281 Org: org, 282 Knot: knot, 283 PipelineRkey: pipelineRkey, 284 Workflow: wf.Name, 285 PipelineURI: pipelineURI, 286 }); err != nil { 287 // Webhook handlers will fail to translate this build's 288 // events because they can't recover the tuple. Surface 289 // loudly and bail; we don't want a half-tracked build 290 // silently leaking status into the broker. 291 logger.Error("persist buildkite build mapping", "err", err, 292 "build_uuid", build.ID, 293 ) 294 return 295 } 296 297 // Initial status publish so the appview shows the build as 298 // queued without waiting for the first webhook. 
This mirrors 299 // the upstream spindle's "schedule then run" cadence. 300 if err := p.publishStatus( 301 ctx, pipelineURI, wf.Name, "pending", build.ID, 302 nil, nil, 303 ); err != nil { 304 logger.Error("publish initial pending status", "err", err) 305 } 306} 307 308// buildCreateRequest folds the parsed workflow config and the 309// Tangled trigger metadata into a single Buildkite create-build 310// payload. Trigger metadata supplies commit/branch; the workflow 311// YAML supplies the Buildkite routing knobs (pipeline/org) and the 312// small handful of build options we expose. 313// 314// `ignore_pipeline_branch_filters` is hard-coded to true: Tangled 315// refs frequently don't match arbitrary Buildkite pipeline branch 316// filters, and a build silently dropped at create time is a worse 317// failure mode than running one we shouldn't have. Users wanting 318// the filter back are expected to drop the filter on the Buildkite 319// pipeline itself. 320// 321// Returns an error when the trigger lacks a commit — Buildkite's 322// API requires one and we'd rather log+skip than fire a build that 323// resolves to "whatever main happens to be". 
324func (p *buildkiteProvider) buildCreateRequest( 325 cfg *buildkiteConfig, 326 trigger *tangled.Pipeline_TriggerMetadata, 327 knot, pipelineRkey string, 328 wf *tangled.Pipeline_Workflow, 329) (buildkite.CreateBuildRequest, error) { 330 commit, branch := triggerCommitAndBranch(trigger) 331 if commit == "" { 332 return buildkite.CreateBuildRequest{}, errors.New( 333 "trigger has no commit", 334 ) 335 } 336 337 cleanCheckout := false 338 if cfg.CleanCheckout != nil { 339 cleanCheckout = *cfg.CleanCheckout 340 } 341 342 req := buildkite.CreateBuildRequest{ 343 Commit: commit, 344 Branch: branch, 345 Message: fmt.Sprintf("tangled: %s", wf.Name), 346 Env: envFromTuple(knot, pipelineRkey, wf), 347 MetaData: map[string]string{ 348 bkMetaKnot: knot, 349 bkMetaPipelineRkey: pipelineRkey, 350 bkMetaWorkflow: wf.Name, 351 }, 352 CleanCheckout: cleanCheckout, 353 IgnorePipelineBranchFilters: true, 354 } 355 356 // Auto-populate Buildkite's PR fields from the Tangled PR 357 // trigger when present. Buildkite doesn't get a PR number from 358 // us (Tangled doesn't surface one through the trigger), but 359 // the base branch alone is enough for `pull_request_base_branch`- 360 // gated step filters to work. 361 if trigger != nil && trigger.PullRequest != nil { 362 req.PullRequestBaseBranch = trigger.PullRequest.TargetBranch 363 } 364 365 return req, nil 366} 367 368// Logs satisfies Provider. We resolve the (knot, rkey, workflow) 369// tuple to a Buildkite build via the store, fetch the current jobs 370// list, then drain each job's plain-text log into the channel as one 371// LogLine per output line. 372// 373// Per-job control frames bracket each job so the appview's renderer 374// has start/end markers to lay out timing — same shape as the fake 375// provider and the upstream spindle. 376// 377// This is a snapshot read, not a tail — finished or in-progress, we 378// fetch what's there and close. 
Live tailing would require Buildkite 379// agent log streaming, which the public REST API doesn't expose; the 380// appview's repeated /logs calls during a running build give us 381// "good enough" liveness without that complexity. 382func (p *buildkiteProvider) Logs( 383 ctx context.Context, 384 knot string, 385 pipelineRkey string, 386 workflow string, 387) (<-chan LogLine, error) { 388 ref, err := p.st.LookupBuildkiteBuildByTuple(ctx, knot, pipelineRkey, workflow) 389 if err != nil { 390 return nil, fmt.Errorf("lookup build mapping: %w", err) 391 } 392 if ref == nil { 393 return nil, ErrLogsNotFound 394 } 395 396 // Resolve the org against which we should pull jobs/logs. 397 // Spawn now persists the *resolved* org used at create time, so 398 // for any row written by current code ref.Org is authoritative 399 // and we use it verbatim. The empty-string fallback to 400 // defaultOrg only exists for legacy rows on disk that predate 401 // persisting the resolved org; new rows should never hit it. 402 org := ref.Org 403 if org == "" { 404 org = p.defaultOrg 405 } 406 407 build, err := p.client.GetBuild(ctx, org, ref.PipelineSlug, ref.BuildNumber) 408 if err != nil { 409 if errors.Is(err, buildkite.ErrNotFound) { 410 return nil, ErrLogsNotFound 411 } 412 return nil, fmt.Errorf("get build: %w", err) 413 } 414 415 out := make(chan LogLine, 32) 416 go func() { 417 defer close(out) 418 stepID := 0 419 for _, job := range build.Jobs { 420 if job.Type != "" && job.Type != "script" { 421 // Skip non-script jobs (waiter, manual, 422 // trigger). They have no log to fetch and 423 // surfacing empty steps just clutters the 424 // appview. 
425 continue 426 } 427 name := job.Name 428 if name == "" { 429 name = fmt.Sprintf("job %s", job.ID) 430 } 431 432 if !sendLine(ctx, out, LogLine{ 433 Kind: LogKindControl, 434 Time: time.Now(), 435 Content: name, 436 StepId: stepID, 437 StepStatus: StepStatusStart, 438 }) { 439 return 440 } 441 442 body, err := p.client.GetJobLog(ctx, org, ref.PipelineSlug, ref.BuildNumber, job.ID) 443 if err != nil { 444 p.log.Debug("fetch job log", 445 "err", err, 446 "build_uuid", ref.BuildUUID, 447 "job_id", job.ID, 448 ) 449 // Don't fail the whole stream on one job; 450 // emit the end frame and move on so the 451 // appview at least sees what other jobs 452 // produced. 453 body = "" 454 } 455 456 // Buildkite injects per-line timestamp metadata as 457 // ANSI APC sequences (ESC "_" "bk;t=<ms>" BEL) and 458 // some renderers downstream don't recognise the APC 459 // envelope, leaking the inner "_bk;t=…" payload into 460 // the displayed text. Strip them here so consumers 461 // only ever see the actual log content. 462 body = stripTerminal(body) 463 464 for _, line := range strings.Split(strings.TrimRight(body, "\n"), "\n") { 465 if line == "" { 466 // Skip the leading empty entry that 467 // Split produces for empty bodies. 468 continue 469 } 470 if !sendLine(ctx, out, LogLine{ 471 Kind: LogKindData, 472 Time: time.Now(), 473 Content: line + "\n", 474 StepId: stepID, 475 Stream: "stdout", 476 }) { 477 return 478 } 479 } 480 481 if !sendLine(ctx, out, LogLine{ 482 Kind: LogKindControl, 483 Time: time.Now(), 484 Content: name, 485 StepId: stepID, 486 StepStatus: StepStatusEnd, 487 }) { 488 return 489 } 490 stepID++ 491 } 492 }() 493 return out, nil 494} 495 496// publishStatus assembles a tangled.PipelineStatus record and pushes 497// it through the broker. 
buildUUID is mixed into the rkey so multiple 498// status events for the same workflow don't collide on the events 499// table's (rkey) uniqueness — and so an operator grepping the log 500// can find every record that pertains to a given Buildkite build. 501// 502// errMsg/exitCode are optional; pass nil for non-failure transitions. 503func (p *buildkiteProvider) publishStatus( 504 ctx context.Context, 505 pipelineURI, workflow, status, buildUUID string, 506 errMsg *string, 507 exitCode *int64, 508) error { 509 rec := tangled.PipelineStatus{ 510 LexiconTypeID: tangled.PipelineStatusNSID, 511 Pipeline: pipelineURI, 512 Workflow: workflow, 513 Status: status, 514 CreatedAt: time.Now().UTC().Format(time.RFC3339), 515 Error: errMsg, 516 ExitCode: exitCode, 517 } 518 body, err := json.Marshal(rec) 519 if err != nil { 520 return fmt.Errorf("marshal pipeline.status: %w", err) 521 } 522 rkey := fmt.Sprintf("bk-%s-%s-%d", buildUUID, status, time.Now().UnixNano()) 523 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil { 524 return fmt.Errorf("publish pipeline.status: %w", err) 525 } 526 return nil 527} 528 529// HandleWebhook applies a decoded Buildkite webhook payload: looks 530// the build up in the store, translates the Buildkite state into a 531// Tangled StatusKind, and publishes a pipeline.status record. Used 532// by the HTTP webhook handler so both the ingress logic and the 533// translation logic live next to each other. 534// 535// Returns nil for events we intentionally ignore (job.* events, 536// build.scheduled which we already publish locally on Spawn, builds 537// we don't have a mapping for) so the handler can 200 them — webhook 538// retries from Buildkite on a 4xx/5xx are noisy and not what we want 539// for "we just don't care about this event". 540func (p *buildkiteProvider) HandleWebhook( 541 ctx context.Context, 542 payload buildkite.WebhookPayload, 543) error { 544 // Only build.* events drive pipeline.status today. 
Everything 545 // else (job.*, agent.*, ping) is acknowledged silently. 546 if !strings.HasPrefix(payload.Event, "build.") { 547 return nil 548 } 549 550 ref, err := p.st.LookupBuildkiteBuildByUUID(ctx, payload.Build.ID) 551 if err != nil { 552 return fmt.Errorf("lookup build by uuid: %w", err) 553 } 554 if ref == nil { 555 // Cache miss. Two plausible causes: 556 // 557 // 1. Genuinely-foreign build: a Buildkite job triggered 558 // outside tack that just happens to share this 559 // webhook URL. Nothing to do. 560 // 2. Race: Spawn's goroutine fired CreateBuild but 561 // hasn't yet written the UUID→tuple row. A fast 562 // build.scheduled webhook can land in that window 563 // and would otherwise be dropped forever. 564 // 565 // We disambiguate using the Buildkite meta_data we set at 566 // CreateBuild time. If the tack:* keys are present the 567 // build is ours; we reconstruct the ref and opportunistically 568 // persist it so subsequent webhooks (and any Logs call) 569 // hit the cache rather than re-doing this work. 570 ref = refFromWebhook(payload) 571 if ref == nil { 572 p.log.Debug("webhook for unknown build; ignoring", 573 "event", payload.Event, 574 "build_uuid", payload.Build.ID, 575 ) 576 return nil 577 } 578 // Opportunistic cache fill. Failure here is non-fatal: 579 // Spawn's authoritative insert will land shortly (or has 580 // already, in which case our INSERT … ON CONFLICT just 581 // refreshes the row). We continue with the reconstructed 582 // ref either way so a status publish isn't lost. 
583 if err := p.st.InsertBuildkiteBuild(ctx, *ref); err != nil { 584 p.log.Warn("opportunistic persist of buildkite build mapping", 585 "err", err, "build_uuid", ref.BuildUUID, 586 ) 587 } 588 p.log.Info("buildkite webhook reconstructed from meta_data", 589 "event", payload.Event, 590 "build_uuid", ref.BuildUUID, 591 "workflow", ref.Workflow, 592 ) 593 } 594 595 status, ok := mapBuildkiteState(payload.Build.State) 596 if !ok { 597 // Unknown / transient state ("blocked", "skipped", 598 // "not_run", "waiting"…) — log so we can extend the map 599 // later, but don't error out the webhook. 600 p.log.Debug("unmapped buildkite state; ignoring", 601 "event", payload.Event, 602 "state", payload.Build.State, 603 "build_uuid", payload.Build.ID, 604 ) 605 return nil 606 } 607 608 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 609 status, ref.BuildUUID, nil, nil); err != nil { 610 return fmt.Errorf("publish webhook status: %w", err) 611 } 612 p.log.Info("buildkite webhook → pipeline.status", 613 "event", payload.Event, 614 "state", payload.Build.State, 615 "status", status, 616 "build_uuid", payload.Build.ID, 617 "workflow", ref.Workflow, 618 ) 619 return nil 620} 621 622// refFromWebhook reconstructs a BuildkiteBuildRef directly from a 623// webhook payload, using the tack:* meta_data we attach at Spawn 624// time as the source of truth for (knot, pipeline_rkey, workflow). 625// Org and pipeline slug come from the payload's organization and 626// embedded pipeline objects, both of which Buildkite populates on 627// every build.* event. 628// 629// Returns nil when the payload doesn't carry our meta_data: that's 630// the signal the build was triggered outside tack and we should 631// keep ignoring it. A partial set (one or two of the three keys) 632// also returns nil; we don't want to half-reconstruct a row. 633// 634// This exists so HandleWebhook can recover the tuple when the 635// CreateBuild→InsertBuildkiteBuild race drops a webhook on the 636// floor. 
The caller is expected to opportunistically persist the 637// returned ref so subsequent lookups hit the cache. 638func refFromWebhook(payload buildkite.WebhookPayload) *BuildkiteBuildRef { 639 md := payload.Build.MetaData 640 knot := md[bkMetaKnot] 641 rkey := md[bkMetaPipelineRkey] 642 wf := md[bkMetaWorkflow] 643 if knot == "" || rkey == "" || wf == "" { 644 return nil 645 } 646 647 // Pipeline slug lives on the build's embedded pipeline object. 648 // Decoding it as map[string]interface{} keeps the buildkite 649 // package's Build struct from sprouting fields we only ever 650 // touch on this fallback path. 651 pipelineSlug, _ := payload.Build.Pipeline["slug"].(string) 652 653 return &BuildkiteBuildRef{ 654 BuildUUID: payload.Build.ID, 655 BuildNumber: payload.Build.Number, 656 PipelineSlug: pipelineSlug, 657 Org: payload.Organization.Slug, 658 Knot: knot, 659 PipelineRkey: rkey, 660 Workflow: wf, 661 PipelineURI: pipelineATURI(knot, rkey), 662 } 663} 664 665// mapBuildkiteState translates Buildkite's build state strings into 666// the Tangled spindle StatusKind enum. The mapping aligns with the 667// upstream constants (StatusKindRunning/Failed/Cancelled/Success); 668// states that don't have a direct analogue (blocked, skipped, 669// not_run) are reported as not-mapped so the caller can decide 670// whether to ignore them. 671func mapBuildkiteState(state string) (string, bool) { 672 switch state { 673 case "scheduled": 674 return "pending", true 675 case "running", "failing": 676 return "running", true 677 case "passed": 678 return "success", true 679 case "failed": 680 return "failed", true 681 case "canceled", "canceling": 682 return "cancelled", true 683 default: 684 return "", false 685 } 686} 687 688// envFromTuple builds the env block forwarded into the Buildkite 689// build. 
These are the only handle a user's Buildkite pipeline has 690// on the originating Tangled trigger: their pipeline.yml typically 691// reads $TACK_WORKFLOW and dispatches based on it (e.g. running a 692// `pipeline upload` against a workflow-specific YAML file). 693// 694// TACK_WORKFLOW_RAW carries the entire YAML body of the workflow as 695// captured in the Tangled record. It can be empty if the workflow 696// definition omitted it; consumers should defend. 697func envFromTuple(knot, pipelineRkey string, wf *tangled.Pipeline_Workflow) map[string]string { 698 return map[string]string{ 699 "TACK_KNOT": knot, 700 "TACK_PIPELINE_RKEY": pipelineRkey, 701 "TACK_WORKFLOW": wf.Name, 702 "TACK_WORKFLOW_RAW": wf.Raw, 703 } 704} 705 706// pipelineATURI returns the at-uri the appview joins pipeline.status 707// records back to their originating pipeline on. Format mirrors the 708// upstream spindle; the appview strips the `did:web:` prefix and 709// treats the remainder as the knot identifier. 710func pipelineATURI(knot, pipelineRkey string) string { 711 return fmt.Sprintf("at://did:web:%s/%s/%s", 712 knot, tangled.PipelineNSID, pipelineRkey, 713 ) 714} 715 716// triggerCommitAndBranch extracts (commit, branch) from a Tangled 717// pipeline trigger, regardless of whether it was a push, a pull 718// request, or a manual run. Returns empty strings on a fully-empty 719// trigger so the caller can decide whether that's fatal. 720func triggerCommitAndBranch(trigger *tangled.Pipeline_TriggerMetadata) (string, string) { 721 if trigger == nil { 722 return "", "" 723 } 724 switch { 725 case trigger.Push != nil: 726 // For push events, NewSha is the commit being built and 727 // Ref is the full ref (e.g. "refs/heads/main") — strip 728 // the prefix so Buildkite's branch-aware features work. 729 return trigger.Push.NewSha, refToBranch(trigger.Push.Ref) 730 case trigger.PullRequest != nil: 731 // PRs build the source commit on the source branch. 
732 return trigger.PullRequest.SourceSha, trigger.PullRequest.SourceBranch 733 default: 734 // Manual triggers and any future kinds: fall back to the 735 // repo default branch with no commit, which the caller 736 // will treat as fatal — manual triggers will need 737 // additional plumbing to pick a commit. 738 if trigger.Repo != nil { 739 return "", trigger.Repo.DefaultBranch 740 } 741 return "", "" 742 } 743} 744 745// refToBranch strips the conventional refs/heads/ prefix from a git 746// ref. Refs that don't match the prefix (tags, refs/pull/N/head) are 747// returned as-is so downstream tooling can decide what to do with 748// them — Buildkite happily accepts either form in `branch`. 749func refToBranch(ref string) string { 750 const prefix = "refs/heads/" 751 if strings.HasPrefix(ref, prefix) { 752 return strings.TrimPrefix(ref, prefix) 753 } 754 return ref 755} 756 757// stripTerminal removes ANSI/ECMA-48 escape sequences from a log 758// payload, leaving only the displayable text. We need this because 759// Buildkite ships its plain-text log API with the agent's full 760// terminal output — per-line timestamp APC envelopes 761// (`ESC _ "bk;t=<unix-ms>" BEL`), CSI colour codes, clear-to-EOL 762// (`ESC [ K`), OSC title sets, etc. — and our consumers are not 763// terminal emulators; they render the bytes verbatim. 764// 765// We recognise the standard escape families described by ECMA-48: 766// 767// - CSI: ESC '[' parameters intermediates final 768// - OSC/DCS/APC/SOS/PM: ESC (']'|'P'|'_'|'X'|'^') … (BEL | ESC '\') 769// - everything else: ESC <single byte> 770// 771// As a safety belt we also strip the bare "_bk;t=<digits>" residue 772// that appears when an upstream processor has stripped the ESC/BEL 773// framing without understanding the APC envelope inside it. 774// 775// We should use libghostty for obvious reasons. 
776func stripTerminal(s string) string { 777 if !strings.ContainsAny(s, "\x1b_") { 778 return s 779 } 780 var b strings.Builder 781 b.Grow(len(s)) 782 for i := 0; i < len(s); { 783 if s[i] == 0x1b && i+1 < len(s) { 784 switch s[i+1] { 785 case '[': 786 // CSI: parameter bytes 0x30-0x3F, then 787 // intermediate bytes 0x20-0x2F, then a 788 // single final byte 0x40-0x7E. Anything 789 // that doesn't conform we drop minimally 790 // (just the ESC) so we don't swallow 791 // legitimate text. 792 j := i + 2 793 for j < len(s) && s[j] >= 0x30 && s[j] <= 0x3F { 794 j++ 795 } 796 for j < len(s) && s[j] >= 0x20 && s[j] <= 0x2F { 797 j++ 798 } 799 if j < len(s) && s[j] >= 0x40 && s[j] <= 0x7E { 800 i = j + 1 801 continue 802 } 803 i += 2 804 continue 805 case ']', 'P', '_', 'X', '^': 806 // OSC/DCS/APC/SOS/PM: terminated by BEL or 807 // ST (ESC '\'). Drop the entire envelope. 808 j := i + 2 809 for j < len(s) { 810 if s[j] == 0x07 { 811 j++ 812 break 813 } 814 if s[j] == 0x1b && j+1 < len(s) && s[j+1] == '\\' { 815 j += 2 816 break 817 } 818 j++ 819 } 820 i = j 821 continue 822 default: 823 // Two-byte escape (RIS, DECSC, charset 824 // selection, …). Drop both bytes. 825 i += 2 826 continue 827 } 828 } 829 // Bare residue: "_bk;t=<digits>" with no ESC/BEL framing. 830 if s[i] == '_' && strings.HasPrefix(s[i:], "_bk;t=") { 831 j := i + len("_bk;t=") 832 for j < len(s) && s[j] >= '0' && s[j] <= '9' { 833 j++ 834 } 835 if j > i+len("_bk;t=") { 836 i = j 837 continue 838 } 839 } 840 b.WriteByte(s[i]) 841 i++ 842 } 843 return b.String() 844} 845 846// sendLine pushes one LogLine into out, returning false if ctx 847// fired first. Centralised so the per-job loop in Logs stays 848// focused on the wire-shape decisions. 849func sendLine(ctx context.Context, out chan<- LogLine, line LogLine) bool { 850 select { 851 case <-ctx.Done(): 852 return false 853 case out <- line: 854 return true 855 } 856}