Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// buildkiteProvider implements Provider against a real Buildkite 4// account. Spawn translates a Tangled pipeline trigger into one 5// Buildkite build per workflow; status updates flow back asynchronously 6// through the /webhooks/buildkite handler (see http.go), which looks 7// the build UUID up in the buildkite_builds table to recover the 8// (knot, pipelineRkey, workflow) tuple this provider persisted at 9// Spawn time and publishes a sh.tangled.pipeline.status record on 10// the in-process broker. 11// 12// The Buildkite *pipeline slug* a workflow targets is carried inside 13// the workflow's YAML body (Pipeline_Workflow.Raw), not configured 14// globally on the spindle. That keeps tack a thin translator: the 15// repo author decides which Buildkite pipeline runs each Tangled 16// workflow without an operator round-trip. See workflowConfig below 17// for the supported YAML schema. 18 19import ( 20 "context" 21 "encoding/json" 22 "errors" 23 "fmt" 24 "log/slog" 25 "net/http" 26 "strings" 27 "time" 28 29 "go.yaml.in/yaml/v2" 30 "tangled.org/core/api/tangled" 31 32 "go.mitchellh.com/tack/internal/buildkite" 33) 34 35// Buildkite-side meta_data keys carrying the Tangled identity of a 36// build. Mirrored into env vars (see envFromTuple) so an operator's 37// Buildkite pipeline script can also reach them via $TACK_*. They 38// stay tightly namespaced so a coexisting Buildkite job that uses 39// meta_data for its own purposes won't collide. 40const ( 41 bkMetaKnot = "tack:knot" 42 bkMetaPipelineRkey = "tack:pipeline_rkey" 43 bkMetaWorkflow = "tack:workflow" 44) 45 46// workflowConfig is the tack-flavoured schema we expect inside each 47// Tangled workflow's Raw YAML body. Only the Buildkite `pipeline` 48// slug is required; everything else is optional. Fields nest under 49// `tack: { buildkite: ... }` so the workflow YAML can grow other 50// top-level keys (Tangled's own scheduling fields, future provider 51// blocks) without colliding with our namespace. 52// 53// Fields map onto the Buildkite REST "Create a build" request 54// properties documented at 55// https://buildkite.com/docs/apis/rest-api/builds#create-a-build 56// (see also the comment block on buildkite.CreateBuildRequest). We 57// expose only the small subset users genuinely need to override — 58// trigger metadata supplies commit/branch, and tack supplies the 59// identity env+meta the webhook handler relies on, so there's no 60// reason to let users re-specify those. 61type workflowConfig struct { 62 Tack tackConfig `yaml:"tack"` 63} 64 65// tackConfig is the per-provider block under the top-level `tack:` 66// key. Right now the only nested provider is Buildkite. 67type tackConfig struct { 68 Buildkite buildkiteConfig `yaml:"buildkite"` 69} 70 71// buildkiteConfig is the Buildkite-specific subset of workflowConfig. 72// 73// `org` lets a workflow target a Buildkite organisation other than 74// the spindle's default — useful when one tack instance fronts 75// multiple orgs. The configured API token must have access to that 76// org or the build creation request will 401/403; we surface that 77// error verbatim rather than guessing. 78// 79// `clean_checkout` is forwarded verbatim to Buildkite. CleanCheckout 80// is a *bool so omitting it leaves Buildkite's own default in place 81// instead of always shipping `false`. 82type buildkiteConfig struct { 83 Pipeline string `yaml:"pipeline"` 84 Org string `yaml:"org"` 85 CleanCheckout *bool `yaml:"clean_checkout"` 86} 87 88// parseWorkflowConfig decodes a workflow YAML body into workflowConfig. 89// An empty body is treated as a structural error so spawnWorkflow can 90// short-circuit cleanly: a workflow with no body has nothing for tack 91// to do anyway. 92func parseWorkflowConfig(raw string) (*buildkiteConfig, error) { 93 if strings.TrimSpace(raw) == "" { 94 return nil, errors.New("workflow body is empty") 95 } 96 var cfg workflowConfig 97 if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil { 98 return nil, fmt.Errorf("parse workflow yaml: %w", err) 99 } 100 bk := cfg.Tack.Buildkite 101 if bk.Pipeline == "" { 102 return nil, errors.New("workflow yaml: `tack.buildkite.pipeline` is required") 103 } 104 return &bk, nil 105} 106 107// buildkiteProvider implements Provider. 108// 109// webhookSecret + webhookMode live on the provider rather than on 110// the HTTP server because the provider is the single owner of 111// "everything Buildkite-y": colocating the auth knob with the API 112// client and the state translator keeps configuration drift to one 113// place and makes the http.go side pure transport. 114// 115// defaultOrg is the Buildkite organisation the configured API token 116// belongs to. Workflows may opt into a different org via their YAML 117// `org` field; the API token then needs to be authorised against it. 118type buildkiteProvider struct { 119 br *broker 120 st *store 121 log *slog.Logger 122 client *buildkite.Client 123 defaultOrg string 124 webhookSecret string 125 webhookMode buildkite.WebhookMode 126} 127 128// Compile-time interface conformance check. 129var _ Provider = (*buildkiteProvider)(nil) 130 131// newBuildkiteProvider wires a provider to its Buildkite client and 132// to the broker it publishes pipeline.status records on. defaultOrg 133// is the org the API token authenticates against and the org used 134// when a workflow doesn't specify its own. webhookSecret/webhookMode 135// govern inbound /webhooks/buildkite request authentication. 136func newBuildkiteProvider( 137 br *broker, 138 st *store, 139 client *buildkite.Client, 140 defaultOrg string, 141 webhookSecret string, 142 webhookMode buildkite.WebhookMode, 143 log *slog.Logger, 144) *buildkiteProvider { 145 return &buildkiteProvider{ 146 br: br, 147 st: st, 148 log: log.With("component", "provider", "kind", "buildkite"), 149 client: client, 150 defaultOrg: defaultOrg, 151 webhookSecret: webhookSecret, 152 webhookMode: webhookMode, 153 } 154} 155 156// VerifyWebhook authenticates an inbound webhook request using 157// whichever mode the provider was configured with. Returns nil on 158// success; the HTTP handler maps any returned error to 401. 159func (p *buildkiteProvider) VerifyWebhook(headers http.Header, body []byte) error { 160 switch p.webhookMode { 161 case buildkite.WebhookModeSignature: 162 return buildkite.VerifySignature( 163 headers.Get("X-Buildkite-Signature"), 164 p.webhookSecret, body, 165 ) 166 default: 167 // Token mode is the Buildkite default and our default, so 168 // any unrecognised value falls through to it rather than 169 // fail-closed at startup. 170 return buildkite.VerifyToken( 171 headers.Get("X-Buildkite-Token"), 172 p.webhookSecret, 173 ) 174 } 175} 176 177// Spawn satisfies Provider. For each workflow it fires a separate 178// Buildkite build off the pipeline named in that workflow's YAML so 179// each workflow gets its own status timeline. The actual API call 180// runs on a goroutine — CreateBuild is one HTTP round-trip, but we 181// still want Spawn to be non-blocking per the interface contract. 182// 183// On a successful create we persist the build UUID → (knot, rkey, 184// workflow) mapping and publish a "pending" pipeline.status so the 185// appview sees activity immediately, instead of waiting for the 186// first webhook to land. 187func (p *buildkiteProvider) Spawn( 188 ctx context.Context, 189 knot string, 190 pipelineRkey string, 191 trigger *tangled.Pipeline_TriggerMetadata, 192 workflows []*tangled.Pipeline_Workflow, 193) { 194 if len(workflows) == 0 { 195 p.log.Warn("pipeline has no workflows; nothing to spawn", 196 "knot", knot, "rkey", pipelineRkey, 197 ) 198 return 199 } 200 201 for _, wf := range workflows { 202 if wf == nil || wf.Name == "" { 203 continue 204 } 205 wf := wf 206 go p.spawnWorkflow(ctx, knot, pipelineRkey, trigger, wf) 207 } 208} 209 210// spawnWorkflow does the per-workflow API + persistence work for 211// Spawn. Errors are logged with full context but not returned — 212// nothing in tack consumes the result, and a failed Spawn just 213// surfaces as the absence of any status update for the affected 214// workflow. 215func (p *buildkiteProvider) spawnWorkflow( 216 ctx context.Context, 217 knot string, 218 pipelineRkey string, 219 trigger *tangled.Pipeline_TriggerMetadata, 220 wf *tangled.Pipeline_Workflow, 221) { 222 logger := p.log.With( 223 "knot", knot, 224 "pipeline_rkey", pipelineRkey, 225 "workflow", wf.Name, 226 ) 227 228 cfg, err := parseWorkflowConfig(wf.Raw) 229 if err != nil { 230 // Bad workflow YAML is a user-facing config error: log it 231 // loudly and skip. Firing a build off some default would 232 // be more confusing than doing nothing. 233 logger.Error("invalid workflow config; refusing to spawn", "err", err) 234 return 235 } 236 logger = logger.With("pipeline", cfg.Pipeline) 237 238 req, err := p.buildCreateRequest(cfg, trigger, knot, pipelineRkey, wf) 239 if err != nil { 240 logger.Error("build create request", "err", err) 241 return 242 } 243 244 org := cfg.Org 245 if org == "" { 246 org = p.defaultOrg 247 } 248 249 build, err := p.client.CreateBuild(ctx, org, cfg.Pipeline, req) 250 if err != nil { 251 logger.Error("create buildkite build", "err", err, "org", org) 252 return 253 } 254 logger.Info("buildkite build created", 255 "build_uuid", build.ID, 256 "build_number", build.Number, 257 "web_url", build.WebURL, 258 "org", org, 259 ) 260 261 pipelineURI := pipelineATURI(knot, pipelineRkey) 262 // Persist the *resolved* org — the one we actually issued the 263 // CreateBuild against — rather than cfg.Org. If we stored only 264 // cfg.Org, a later change to the provider's defaultOrg would 265 // silently retarget historical lookups (logs, webhook joins) at 266 // the wrong organisation. Legacy rows written before this fix 267 // may still have an empty Org; the read path keeps the 268 // defaultOrg fallback for those (see Logs). 269 if err := p.st.InsertBuildkiteBuild(ctx, BuildkiteBuildRef{ 270 BuildUUID: build.ID, 271 BuildNumber: build.Number, 272 PipelineSlug: cfg.Pipeline, 273 Org: org, 274 Knot: knot, 275 PipelineRkey: pipelineRkey, 276 Workflow: wf.Name, 277 PipelineURI: pipelineURI, 278 }); err != nil { 279 // Webhook handlers will fail to translate this build's 280 // events because they can't recover the tuple. Surface 281 // loudly and bail; we don't want a half-tracked build 282 // silently leaking status into the broker. 283 logger.Error("persist buildkite build mapping", "err", err, 284 "build_uuid", build.ID, 285 ) 286 return 287 } 288 289 // Initial status publish so the appview shows the build as 290 // queued without waiting for the first webhook. This mirrors 291 // the upstream spindle's "schedule then run" cadence. 292 if err := p.publishStatus( 293 ctx, pipelineURI, wf.Name, "pending", build.ID, 294 nil, nil, 295 ); err != nil { 296 logger.Error("publish initial pending status", "err", err) 297 } 298} 299 300// buildCreateRequest folds the parsed workflow config and the 301// Tangled trigger metadata into a single Buildkite create-build 302// payload. Trigger metadata supplies commit/branch; the workflow 303// YAML supplies the Buildkite routing knobs (pipeline/org) and the 304// small handful of build options we expose. 305// 306// `ignore_pipeline_branch_filters` is hard-coded to true: Tangled 307// refs frequently don't match arbitrary Buildkite pipeline branch 308// filters, and a build silently dropped at create time is a worse 309// failure mode than running one we shouldn't have. Users wanting 310// the filter back are expected to drop the filter on the Buildkite 311// pipeline itself. 312// 313// Returns an error when the trigger lacks a commit — Buildkite's 314// API requires one and we'd rather log+skip than fire a build that 315// resolves to "whatever main happens to be". 316func (p *buildkiteProvider) buildCreateRequest( 317 cfg *buildkiteConfig, 318 trigger *tangled.Pipeline_TriggerMetadata, 319 knot, pipelineRkey string, 320 wf *tangled.Pipeline_Workflow, 321) (buildkite.CreateBuildRequest, error) { 322 commit, branch := triggerCommitAndBranch(trigger) 323 if commit == "" { 324 return buildkite.CreateBuildRequest{}, errors.New( 325 "trigger has no commit", 326 ) 327 } 328 329 cleanCheckout := false 330 if cfg.CleanCheckout != nil { 331 cleanCheckout = *cfg.CleanCheckout 332 } 333 334 req := buildkite.CreateBuildRequest{ 335 Commit: commit, 336 Branch: branch, 337 Message: fmt.Sprintf("tangled: %s", wf.Name), 338 Env: envFromTuple(knot, pipelineRkey, wf), 339 MetaData: map[string]string{ 340 bkMetaKnot: knot, 341 bkMetaPipelineRkey: pipelineRkey, 342 bkMetaWorkflow: wf.Name, 343 }, 344 CleanCheckout: cleanCheckout, 345 IgnorePipelineBranchFilters: true, 346 } 347 348 // Auto-populate Buildkite's PR fields from the Tangled PR 349 // trigger when present. Buildkite doesn't get a PR number from 350 // us (Tangled doesn't surface one through the trigger), but 351 // the base branch alone is enough for `pull_request_base_branch`- 352 // gated step filters to work. 353 if trigger != nil && trigger.PullRequest != nil { 354 req.PullRequestBaseBranch = trigger.PullRequest.TargetBranch 355 } 356 357 return req, nil 358} 359 360// Logs satisfies Provider. We resolve the (knot, rkey, workflow) 361// tuple to a Buildkite build via the store, fetch the current jobs 362// list, then drain each job's plain-text log into the channel as one 363// LogLine per output line. 364// 365// Per-job control frames bracket each job so the appview's renderer 366// has start/end markers to lay out timing — same shape as the fake 367// provider and the upstream spindle. 368// 369// This is a snapshot read, not a tail — finished or in-progress, we 370// fetch what's there and close. Live tailing would require Buildkite 371// agent log streaming, which the public REST API doesn't expose; the 372// appview's repeated /logs calls during a running build give us 373// "good enough" liveness without that complexity. 374func (p *buildkiteProvider) Logs( 375 ctx context.Context, 376 knot string, 377 pipelineRkey string, 378 workflow string, 379) (<-chan LogLine, error) { 380 ref, err := p.st.LookupBuildkiteBuildByTuple(ctx, knot, pipelineRkey, workflow) 381 if err != nil { 382 return nil, fmt.Errorf("lookup build mapping: %w", err) 383 } 384 if ref == nil { 385 return nil, ErrLogsNotFound 386 } 387 388 // Resolve the org against which we should pull jobs/logs. 389 // Spawn now persists the *resolved* org used at create time, so 390 // for any row written by current code ref.Org is authoritative 391 // and we use it verbatim. The empty-string fallback to 392 // defaultOrg only exists for legacy rows on disk that predate 393 // persisting the resolved org; new rows should never hit it. 394 org := ref.Org 395 if org == "" { 396 org = p.defaultOrg 397 } 398 399 build, err := p.client.GetBuild(ctx, org, ref.PipelineSlug, ref.BuildNumber) 400 if err != nil { 401 if errors.Is(err, buildkite.ErrNotFound) { 402 return nil, ErrLogsNotFound 403 } 404 return nil, fmt.Errorf("get build: %w", err) 405 } 406 407 out := make(chan LogLine, 32) 408 go func() { 409 defer close(out) 410 stepID := 0 411 for _, job := range build.Jobs { 412 if job.Type != "" && job.Type != "script" { 413 // Skip non-script jobs (waiter, manual, 414 // trigger). They have no log to fetch and 415 // surfacing empty steps just clutters the 416 // appview. 417 continue 418 } 419 name := job.Name 420 if name == "" { 421 name = fmt.Sprintf("job %s", job.ID) 422 } 423 424 if !sendLine(ctx, out, LogLine{ 425 Kind: LogKindControl, 426 Time: time.Now(), 427 Content: name, 428 StepId: stepID, 429 StepStatus: StepStatusStart, 430 }) { 431 return 432 } 433 434 body, err := p.client.GetJobLog(ctx, org, ref.PipelineSlug, ref.BuildNumber, job.ID) 435 if err != nil { 436 p.log.Debug("fetch job log", 437 "err", err, 438 "build_uuid", ref.BuildUUID, 439 "job_id", job.ID, 440 ) 441 // Don't fail the whole stream on one job; 442 // emit the end frame and move on so the 443 // appview at least sees what other jobs 444 // produced. 445 body = "" 446 } 447 448 // Buildkite injects per-line timestamp metadata as 449 // ANSI APC sequences (ESC "_" "bk;t=<ms>" BEL) and 450 // some renderers downstream don't recognise the APC 451 // envelope, leaking the inner "_bk;t=…" payload into 452 // the displayed text. Strip them here so consumers 453 // only ever see the actual log content. 454 body = stripTerminal(body) 455 456 for _, line := range strings.Split(strings.TrimRight(body, "\n"), "\n") { 457 if line == "" { 458 // Skip the leading empty entry that 459 // Split produces for empty bodies. 460 continue 461 } 462 if !sendLine(ctx, out, LogLine{ 463 Kind: LogKindData, 464 Time: time.Now(), 465 Content: line + "\n", 466 StepId: stepID, 467 Stream: "stdout", 468 }) { 469 return 470 } 471 } 472 473 if !sendLine(ctx, out, LogLine{ 474 Kind: LogKindControl, 475 Time: time.Now(), 476 Content: name, 477 StepId: stepID, 478 StepStatus: StepStatusEnd, 479 }) { 480 return 481 } 482 stepID++ 483 } 484 }() 485 return out, nil 486} 487 488// publishStatus assembles a tangled.PipelineStatus record and pushes 489// it through the broker. buildUUID is mixed into the rkey so multiple 490// status events for the same workflow don't collide on the events 491// table's (rkey) uniqueness — and so an operator grepping the log 492// can find every record that pertains to a given Buildkite build. 493// 494// errMsg/exitCode are optional; pass nil for non-failure transitions. 495func (p *buildkiteProvider) publishStatus( 496 ctx context.Context, 497 pipelineURI, workflow, status, buildUUID string, 498 errMsg *string, 499 exitCode *int64, 500) error { 501 rec := tangled.PipelineStatus{ 502 LexiconTypeID: tangled.PipelineStatusNSID, 503 Pipeline: pipelineURI, 504 Workflow: workflow, 505 Status: status, 506 CreatedAt: time.Now().UTC().Format(time.RFC3339), 507 Error: errMsg, 508 ExitCode: exitCode, 509 } 510 body, err := json.Marshal(rec) 511 if err != nil { 512 return fmt.Errorf("marshal pipeline.status: %w", err) 513 } 514 rkey := fmt.Sprintf("bk-%s-%s-%d", buildUUID, status, time.Now().UnixNano()) 515 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil { 516 return fmt.Errorf("publish pipeline.status: %w", err) 517 } 518 return nil 519} 520 521// HandleWebhook applies a decoded Buildkite webhook payload: looks 522// the build up in the store, translates the Buildkite state into a 523// Tangled StatusKind, and publishes a pipeline.status record. Used 524// by the HTTP webhook handler so both the ingress logic and the 525// translation logic live next to each other. 526// 527// Returns nil for events we intentionally ignore (job.* events, 528// build.scheduled which we already publish locally on Spawn, builds 529// we don't have a mapping for) so the handler can 200 them — webhook 530// retries from Buildkite on a 4xx/5xx are noisy and not what we want 531// for "we just don't care about this event". 532func (p *buildkiteProvider) HandleWebhook( 533 ctx context.Context, 534 payload buildkite.WebhookPayload, 535) error { 536 // Only build.* events drive pipeline.status today. Everything 537 // else (job.*, agent.*, ping) is acknowledged silently. 538 if !strings.HasPrefix(payload.Event, "build.") { 539 return nil 540 } 541 542 ref, err := p.st.LookupBuildkiteBuildByUUID(ctx, payload.Build.ID) 543 if err != nil { 544 return fmt.Errorf("lookup build by uuid: %w", err) 545 } 546 if ref == nil { 547 // Cache miss. Two plausible causes: 548 // 549 // 1. Genuinely-foreign build: a Buildkite job triggered 550 // outside tack that just happens to share this 551 // webhook URL. Nothing to do. 552 // 2. Race: Spawn's goroutine fired CreateBuild but 553 // hasn't yet written the UUID→tuple row. A fast 554 // build.scheduled webhook can land in that window 555 // and would otherwise be dropped forever. 556 // 557 // We disambiguate using the Buildkite meta_data we set at 558 // CreateBuild time. If the tack:* keys are present the 559 // build is ours; we reconstruct the ref and opportunistically 560 // persist it so subsequent webhooks (and any Logs call) 561 // hit the cache rather than re-doing this work. 562 ref = refFromWebhook(payload) 563 if ref == nil { 564 p.log.Debug("webhook for unknown build; ignoring", 565 "event", payload.Event, 566 "build_uuid", payload.Build.ID, 567 ) 568 return nil 569 } 570 // Opportunistic cache fill. Failure here is non-fatal: 571 // Spawn's authoritative insert will land shortly (or has 572 // already, in which case our INSERT … ON CONFLICT just 573 // refreshes the row). We continue with the reconstructed 574 // ref either way so a status publish isn't lost. 575 if err := p.st.InsertBuildkiteBuild(ctx, *ref); err != nil { 576 p.log.Warn("opportunistic persist of buildkite build mapping", 577 "err", err, "build_uuid", ref.BuildUUID, 578 ) 579 } 580 p.log.Info("buildkite webhook reconstructed from meta_data", 581 "event", payload.Event, 582 "build_uuid", ref.BuildUUID, 583 "workflow", ref.Workflow, 584 ) 585 } 586 587 status, ok := mapBuildkiteState(payload.Build.State) 588 if !ok { 589 // Unknown / transient state ("blocked", "skipped", 590 // "not_run", "waiting"…) — log so we can extend the map 591 // later, but don't error out the webhook. 592 p.log.Debug("unmapped buildkite state; ignoring", 593 "event", payload.Event, 594 "state", payload.Build.State, 595 "build_uuid", payload.Build.ID, 596 ) 597 return nil 598 } 599 600 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 601 status, ref.BuildUUID, nil, nil); err != nil { 602 return fmt.Errorf("publish webhook status: %w", err) 603 } 604 p.log.Info("buildkite webhook → pipeline.status", 605 "event", payload.Event, 606 "state", payload.Build.State, 607 "status", status, 608 "build_uuid", payload.Build.ID, 609 "workflow", ref.Workflow, 610 ) 611 return nil 612} 613 614// refFromWebhook reconstructs a BuildkiteBuildRef directly from a 615// webhook payload, using the tack:* meta_data we attach at Spawn 616// time as the source of truth for (knot, pipeline_rkey, workflow). 617// Org and pipeline slug come from the payload's organization and 618// embedded pipeline objects, both of which Buildkite populates on 619// every build.* event. 620// 621// Returns nil when the payload doesn't carry our meta_data: that's 622// the signal the build was triggered outside tack and we should 623// keep ignoring it. A partial set (one or two of the three keys) 624// also returns nil; we don't want to half-reconstruct a row. 625// 626// This exists so HandleWebhook can recover the tuple when the 627// CreateBuild→InsertBuildkiteBuild race drops a webhook on the 628// floor. The caller is expected to opportunistically persist the 629// returned ref so subsequent lookups hit the cache. 630func refFromWebhook(payload buildkite.WebhookPayload) *BuildkiteBuildRef { 631 md := payload.Build.MetaData 632 knot := md[bkMetaKnot] 633 rkey := md[bkMetaPipelineRkey] 634 wf := md[bkMetaWorkflow] 635 if knot == "" || rkey == "" || wf == "" { 636 return nil 637 } 638 639 // Pipeline slug lives on the build's embedded pipeline object. 640 // Decoding it as map[string]interface{} keeps the buildkite 641 // package's Build struct from sprouting fields we only ever 642 // touch on this fallback path. 643 pipelineSlug, _ := payload.Build.Pipeline["slug"].(string) 644 645 return &BuildkiteBuildRef{ 646 BuildUUID: payload.Build.ID, 647 BuildNumber: payload.Build.Number, 648 PipelineSlug: pipelineSlug, 649 Org: payload.Organization.Slug, 650 Knot: knot, 651 PipelineRkey: rkey, 652 Workflow: wf, 653 PipelineURI: pipelineATURI(knot, rkey), 654 } 655} 656 657// mapBuildkiteState translates Buildkite's build state strings into 658// the Tangled spindle StatusKind enum. The mapping aligns with the 659// upstream constants (StatusKindRunning/Failed/Cancelled/Success); 660// states that don't have a direct analogue (blocked, skipped, 661// not_run) are reported as not-mapped so the caller can decide 662// whether to ignore them. 663func mapBuildkiteState(state string) (string, bool) { 664 switch state { 665 case "scheduled": 666 return "pending", true 667 case "running", "failing": 668 return "running", true 669 case "passed": 670 return "success", true 671 case "failed": 672 return "failed", true 673 case "canceled", "canceling": 674 return "cancelled", true 675 default: 676 return "", false 677 } 678} 679 680// envFromTuple builds the env block forwarded into the Buildkite 681// build. These are the only handle a user's Buildkite pipeline has 682// on the originating Tangled trigger: their pipeline.yml typically 683// reads $TACK_WORKFLOW and dispatches based on it (e.g. running a 684// `pipeline upload` against a workflow-specific YAML file). 685// 686// TACK_WORKFLOW_RAW carries the entire YAML body of the workflow as 687// captured in the Tangled record. It can be empty if the workflow 688// definition omitted it; consumers should defend. 689func envFromTuple(knot, pipelineRkey string, wf *tangled.Pipeline_Workflow) map[string]string { 690 return map[string]string{ 691 "TACK_KNOT": knot, 692 "TACK_PIPELINE_RKEY": pipelineRkey, 693 "TACK_WORKFLOW": wf.Name, 694 "TACK_WORKFLOW_RAW": wf.Raw, 695 } 696} 697 698// pipelineATURI returns the at-uri the appview joins pipeline.status 699// records back to their originating pipeline on. Format mirrors the 700// upstream spindle; the appview strips the `did:web:` prefix and 701// treats the remainder as the knot identifier. 702func pipelineATURI(knot, pipelineRkey string) string { 703 return fmt.Sprintf("at://did:web:%s/%s/%s", 704 knot, tangled.PipelineNSID, pipelineRkey, 705 ) 706} 707 708// triggerCommitAndBranch extracts (commit, branch) from a Tangled 709// pipeline trigger, regardless of whether it was a push, a pull 710// request, or a manual run. Returns empty strings on a fully-empty 711// trigger so the caller can decide whether that's fatal. 712func triggerCommitAndBranch(trigger *tangled.Pipeline_TriggerMetadata) (string, string) { 713 if trigger == nil { 714 return "", "" 715 } 716 switch { 717 case trigger.Push != nil: 718 // For push events, NewSha is the commit being built and 719 // Ref is the full ref (e.g. "refs/heads/main") — strip 720 // the prefix so Buildkite's branch-aware features work. 721 return trigger.Push.NewSha, refToBranch(trigger.Push.Ref) 722 case trigger.PullRequest != nil: 723 // PRs build the source commit on the source branch. 724 return trigger.PullRequest.SourceSha, trigger.PullRequest.SourceBranch 725 default: 726 // Manual triggers and any future kinds: fall back to the 727 // repo default branch with no commit, which the caller 728 // will treat as fatal — manual triggers will need 729 // additional plumbing to pick a commit. 730 if trigger.Repo != nil { 731 return "", trigger.Repo.DefaultBranch 732 } 733 return "", "" 734 } 735} 736 737// refToBranch strips the conventional refs/heads/ prefix from a git 738// ref. Refs that don't match the prefix (tags, refs/pull/N/head) are 739// returned as-is so downstream tooling can decide what to do with 740// them — Buildkite happily accepts either form in `branch`. 741func refToBranch(ref string) string { 742 const prefix = "refs/heads/" 743 if strings.HasPrefix(ref, prefix) { 744 return strings.TrimPrefix(ref, prefix) 745 } 746 return ref 747} 748 749// stripTerminal removes ANSI/ECMA-48 escape sequences from a log 750// payload, leaving only the displayable text. We need this because 751// Buildkite ships its plain-text log API with the agent's full 752// terminal output — per-line timestamp APC envelopes 753// (`ESC _ "bk;t=<unix-ms>" BEL`), CSI colour codes, clear-to-EOL 754// (`ESC [ K`), OSC title sets, etc. — and our consumers are not 755// terminal emulators; they render the bytes verbatim. 756// 757// We recognise the standard escape families described by ECMA-48: 758// 759// - CSI: ESC '[' parameters intermediates final 760// - OSC/DCS/APC/SOS/PM: ESC (']'|'P'|'_'|'X'|'^') … (BEL | ESC '\') 761// - everything else: ESC <single byte> 762// 763// As a safety belt we also strip the bare "_bk;t=<digits>" residue 764// that appears when an upstream processor has stripped the ESC/BEL 765// framing without understanding the APC envelope inside it. 766// 767// We should use libghostty for obvious reasons. 768func stripTerminal(s string) string { 769 if !strings.ContainsAny(s, "\x1b_") { 770 return s 771 } 772 var b strings.Builder 773 b.Grow(len(s)) 774 for i := 0; i < len(s); { 775 if s[i] == 0x1b && i+1 < len(s) { 776 switch s[i+1] { 777 case '[': 778 // CSI: parameter bytes 0x30-0x3F, then 779 // intermediate bytes 0x20-0x2F, then a 780 // single final byte 0x40-0x7E. Anything 781 // that doesn't conform we drop minimally 782 // (just the ESC) so we don't swallow 783 // legitimate text. 784 j := i + 2 785 for j < len(s) && s[j] >= 0x30 && s[j] <= 0x3F { 786 j++ 787 } 788 for j < len(s) && s[j] >= 0x20 && s[j] <= 0x2F { 789 j++ 790 } 791 if j < len(s) && s[j] >= 0x40 && s[j] <= 0x7E { 792 i = j + 1 793 continue 794 } 795 i += 2 796 continue 797 case ']', 'P', '_', 'X', '^': 798 // OSC/DCS/APC/SOS/PM: terminated by BEL or 799 // ST (ESC '\'). Drop the entire envelope. 800 j := i + 2 801 for j < len(s) { 802 if s[j] == 0x07 { 803 j++ 804 break 805 } 806 if s[j] == 0x1b && j+1 < len(s) && s[j+1] == '\\' { 807 j += 2 808 break 809 } 810 j++ 811 } 812 i = j 813 continue 814 default: 815 // Two-byte escape (RIS, DECSC, charset 816 // selection, …). Drop both bytes. 817 i += 2 818 continue 819 } 820 } 821 // Bare residue: "_bk;t=<digits>" with no ESC/BEL framing. 822 if s[i] == '_' && strings.HasPrefix(s[i:], "_bk;t=") { 823 j := i + len("_bk;t=") 824 for j < len(s) && s[j] >= '0' && s[j] <= '9' { 825 j++ 826 } 827 if j > i+len("_bk;t=") { 828 i = j 829 continue 830 } 831 } 832 b.WriteByte(s[i]) 833 i++ 834 } 835 return b.String() 836} 837 838// sendLine pushes one LogLine into out, returning false if ctx 839// fired first. Centralised so the per-job loop in Logs stays 840// focused on the wire-shape decisions. 841func sendLine(ctx context.Context, out chan<- LogLine, line LogLine) bool { 842 select { 843 case <-ctx.Done(): 844 return false 845 case out <- line: 846 return true 847 } 848}