Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

package main

// buildkiteProvider implements Provider against a real Buildkite
// account. Spawn translates a Tangled pipeline trigger into one
// Buildkite build per workflow; status updates flow back asynchronously
// through the /webhooks/buildkite handler (see http.go), which looks
// the build UUID up in the buildkite_builds table to recover the
// (knot, pipelineRkey, workflow) tuple this provider persisted at
// Spawn time and publishes a sh.tangled.pipeline.status record on
// the in-process broker.
//
// The Buildkite *pipeline slug* a workflow targets is carried inside
// the workflow's YAML body (Pipeline_Workflow.Raw), not configured
// globally on the spindle. That keeps tack a thin translator: the
// repo author decides which Buildkite pipeline runs each Tangled
// workflow without an operator round-trip. See workflowConfig below
// for the supported YAML schema.

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"net/http"
	"strings"
	"time"

	"go.yaml.in/yaml/v2"
	"tangled.org/core/api/tangled"

	"go.mitchellh.com/tack/internal/buildkite"
)

// Buildkite-side meta_data keys carrying the Tangled identity of a
// build. The same three values are mirrored into env vars (see
// envFromTuple) so an operator's Buildkite pipeline script can also
// reach them via $TACK_*. The keys stay tightly namespaced under
// `tack:` so a coexisting Buildkite job that uses meta_data for its
// own purposes won't collide.
const (
	bkMetaKnot         = "tack:knot"
	bkMetaPipelineRkey = "tack:pipeline_rkey"
	bkMetaWorkflow     = "tack:workflow"
)

// workflowConfig is the tack-flavoured schema we expect inside each
// Tangled workflow's Raw YAML body. Only the Buildkite `pipeline`
// slug is required; everything else is optional. Fields nest under
// `tack: { buildkite: ... }` so the workflow YAML can grow other
// top-level keys (Tangled's own scheduling fields, future provider
// blocks) without colliding with our namespace.
//
// Fields map onto the Buildkite REST "Create a build" request
// properties documented at
// https://buildkite.com/docs/apis/rest-api/builds#create-a-build
// (see also the comment block on buildkite.CreateBuildRequest). We
// expose only the small subset users genuinely need to override:
// trigger metadata supplies commit/branch, and tack supplies the
// identity env+meta itself, so there's no reason to let users
// re-specify those.
type workflowConfig struct {
	Tack tackConfig `yaml:"tack"`
}

// tackConfig is the per-provider block under the top-level `tack:`
// key. Right now the only nested provider is Buildkite.
type tackConfig struct {
	Buildkite buildkiteConfig `yaml:"buildkite"`
}

// buildkiteConfig is the Buildkite-specific subset of workflowConfig.
//
// `pipeline` is the Buildkite pipeline slug to build; it is the only
// required key (parseWorkflowConfig rejects a body without it).
//
// `org` lets a workflow target a Buildkite organisation other than
// the spindle's default, which is useful when one tack instance
// fronts multiple orgs. When empty, the provider's defaultOrg is
// used. The configured API token must have access to the chosen org
// or the build creation request will 401/403; we surface that error
// verbatim rather than guessing.
//
// `clean_checkout` is forwarded to Buildkite. CleanCheckout is a
// *bool so an omitted key can be told apart from an explicit `false`.
// NOTE(review): buildCreateRequest currently collapses nil to false
// before building the request, so "omitted leaves Buildkite's own
// default in place" only holds if buildkite.CreateBuildRequest drops
// a false value on the wire. Confirm against that type.
type buildkiteConfig struct {
	Pipeline      string `yaml:"pipeline"`
	Org           string `yaml:"org"`
	CleanCheckout *bool  `yaml:"clean_checkout"`
}
92func parseWorkflowConfig(raw string) (*buildkiteConfig, error) { 93 if strings.TrimSpace(raw) == "" { 94 return nil, errors.New("workflow body is empty") 95 } 96 var cfg workflowConfig 97 if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil { 98 return nil, fmt.Errorf("parse workflow yaml: %w", err) 99 } 100 bk := cfg.Tack.Buildkite 101 if bk.Pipeline == "" { 102 return nil, errors.New("workflow yaml: `tack.buildkite.pipeline` is required") 103 } 104 return &bk, nil 105} 106 107// buildkiteProvider implements Provider. 108// 109// webhookSecret + webhookMode live on the provider rather than on 110// the HTTP server because the provider is the single owner of 111// "everything Buildkite-y": colocating the auth knob with the API 112// client and the state translator keeps configuration drift to one 113// place and makes the http.go side pure transport. 114// 115// defaultOrg is the Buildkite organisation the configured API token 116// belongs to. Workflows may opt into a different org via their YAML 117// `org` field; the API token then needs to be authorised against it. 118type buildkiteProvider struct { 119 br *broker 120 st *store 121 log *slog.Logger 122 client *buildkite.Client 123 defaultOrg string 124 webhookSecret string 125 webhookMode buildkite.WebhookMode 126} 127 128// Compile-time interface conformance check. 129var _ Provider = (*buildkiteProvider)(nil) 130 131// newBuildkiteProvider wires a provider to its Buildkite client and 132// to the broker it publishes pipeline.status records on. defaultOrg 133// is the org the API token authenticates against and the org used 134// when a workflow doesn't specify its own. webhookSecret/webhookMode 135// govern inbound /webhooks/buildkite request authentication. 
136func newBuildkiteProvider( 137 br *broker, 138 st *store, 139 client *buildkite.Client, 140 defaultOrg string, 141 webhookSecret string, 142 webhookMode buildkite.WebhookMode, 143 log *slog.Logger, 144) *buildkiteProvider { 145 return &buildkiteProvider{ 146 br: br, 147 st: st, 148 log: log.With("component", "provider", "kind", "buildkite"), 149 client: client, 150 defaultOrg: defaultOrg, 151 webhookSecret: webhookSecret, 152 webhookMode: webhookMode, 153 } 154} 155 156// VerifyWebhook authenticates an inbound webhook request using 157// whichever mode the provider was configured with. Returns nil on 158// success; the HTTP handler maps any returned error to 401. 159func (p *buildkiteProvider) VerifyWebhook(headers http.Header, body []byte) error { 160 switch p.webhookMode { 161 case buildkite.WebhookModeSignature: 162 return buildkite.VerifySignature( 163 headers.Get("X-Buildkite-Signature"), 164 p.webhookSecret, body, 165 ) 166 default: 167 // Token mode is the Buildkite default and our default, so 168 // any unrecognised value falls through to it rather than 169 // fail-closed at startup. 170 return buildkite.VerifyToken( 171 headers.Get("X-Buildkite-Token"), 172 p.webhookSecret, 173 ) 174 } 175} 176 177// Spawn satisfies Provider. For each workflow it fires a separate 178// Buildkite build off the pipeline named in that workflow's YAML so 179// each workflow gets its own status timeline. The actual API call 180// runs on a goroutine — CreateBuild is one HTTP round-trip, but we 181// still want Spawn to be non-blocking per the interface contract. 182// 183// On a successful create we persist the build UUID → (knot, rkey, 184// workflow) mapping and publish a "pending" pipeline.status so the 185// appview sees activity immediately, instead of waiting for the 186// first webhook to land. 
187func (p *buildkiteProvider) Spawn( 188 ctx context.Context, 189 knot string, 190 pipelineRkey string, 191 trigger *tangled.Pipeline_TriggerMetadata, 192 workflows []*tangled.Pipeline_Workflow, 193) { 194 if len(workflows) == 0 { 195 p.log.Warn("pipeline has no workflows; nothing to spawn", 196 "knot", knot, "rkey", pipelineRkey, 197 ) 198 return 199 } 200 201 for _, wf := range workflows { 202 if wf == nil || wf.Name == "" { 203 continue 204 } 205 wf := wf 206 go p.spawnWorkflow(ctx, knot, pipelineRkey, trigger, wf) 207 } 208} 209 210// spawnWorkflow does the per-workflow API + persistence work for 211// Spawn. Errors are logged with full context but not returned — 212// nothing in tack consumes the result, and a failed Spawn just 213// surfaces as the absence of any status update for the affected 214// workflow. 215func (p *buildkiteProvider) spawnWorkflow( 216 ctx context.Context, 217 knot string, 218 pipelineRkey string, 219 trigger *tangled.Pipeline_TriggerMetadata, 220 wf *tangled.Pipeline_Workflow, 221) { 222 logger := p.log.With( 223 "knot", knot, 224 "pipeline_rkey", pipelineRkey, 225 "workflow", wf.Name, 226 ) 227 228 cfg, err := parseWorkflowConfig(wf.Raw) 229 if err != nil { 230 // Bad workflow YAML is a user-facing config error: log it 231 // loudly and skip. Firing a build off some default would 232 // be more confusing than doing nothing. 
233 logger.Error("invalid workflow config; refusing to spawn", "err", err) 234 return 235 } 236 logger = logger.With("pipeline", cfg.Pipeline) 237 238 req, err := p.buildCreateRequest(cfg, trigger, knot, pipelineRkey, wf) 239 if err != nil { 240 logger.Error("build create request", "err", err) 241 return 242 } 243 244 org := cfg.Org 245 if org == "" { 246 org = p.defaultOrg 247 } 248 249 build, err := p.client.CreateBuild(ctx, org, cfg.Pipeline, req) 250 if err != nil { 251 logger.Error("create buildkite build", "err", err, "org", org) 252 return 253 } 254 logger.Info("buildkite build created", 255 "build_uuid", build.ID, 256 "build_number", build.Number, 257 "web_url", build.WebURL, 258 "org", org, 259 ) 260 261 pipelineURI := pipelineATURI(knot, pipelineRkey) 262 if err := p.st.InsertBuildkiteBuild(ctx, BuildkiteBuildRef{ 263 BuildUUID: build.ID, 264 BuildNumber: build.Number, 265 PipelineSlug: cfg.Pipeline, 266 Knot: knot, 267 PipelineRkey: pipelineRkey, 268 Workflow: wf.Name, 269 PipelineURI: pipelineURI, 270 }); err != nil { 271 // Webhook handlers will fail to translate this build's 272 // events because they can't recover the tuple. Surface 273 // loudly and bail; we don't want a half-tracked build 274 // silently leaking status into the broker. 275 logger.Error("persist buildkite build mapping", "err", err, 276 "build_uuid", build.ID, 277 ) 278 return 279 } 280 281 // Initial status publish so the appview shows the build as 282 // queued without waiting for the first webhook. This mirrors 283 // the upstream spindle's "schedule then run" cadence. 284 if err := p.publishStatus( 285 ctx, pipelineURI, wf.Name, "pending", build.ID, 286 nil, nil, 287 ); err != nil { 288 logger.Error("publish initial pending status", "err", err) 289 } 290} 291 292// buildCreateRequest folds the parsed workflow config and the 293// Tangled trigger metadata into a single Buildkite create-build 294// payload. 
Trigger metadata supplies commit/branch; the workflow 295// YAML supplies the Buildkite routing knobs (pipeline/org) and the 296// small handful of build options we expose. 297// 298// `ignore_pipeline_branch_filters` is hard-coded to true: Tangled 299// refs frequently don't match arbitrary Buildkite pipeline branch 300// filters, and a build silently dropped at create time is a worse 301// failure mode than running one we shouldn't have. Users wanting 302// the filter back are expected to drop the filter on the Buildkite 303// pipeline itself. 304// 305// Returns an error when the trigger lacks a commit — Buildkite's 306// API requires one and we'd rather log+skip than fire a build that 307// resolves to "whatever main happens to be". 308func (p *buildkiteProvider) buildCreateRequest( 309 cfg *buildkiteConfig, 310 trigger *tangled.Pipeline_TriggerMetadata, 311 knot, pipelineRkey string, 312 wf *tangled.Pipeline_Workflow, 313) (buildkite.CreateBuildRequest, error) { 314 commit, branch := triggerCommitAndBranch(trigger) 315 if commit == "" { 316 return buildkite.CreateBuildRequest{}, errors.New( 317 "trigger has no commit", 318 ) 319 } 320 321 cleanCheckout := false 322 if cfg.CleanCheckout != nil { 323 cleanCheckout = *cfg.CleanCheckout 324 } 325 326 req := buildkite.CreateBuildRequest{ 327 Commit: commit, 328 Branch: branch, 329 Message: fmt.Sprintf("tangled: %s", wf.Name), 330 Env: envFromTuple(knot, pipelineRkey, wf), 331 MetaData: map[string]string{ 332 bkMetaKnot: knot, 333 bkMetaPipelineRkey: pipelineRkey, 334 bkMetaWorkflow: wf.Name, 335 }, 336 CleanCheckout: cleanCheckout, 337 IgnorePipelineBranchFilters: true, 338 } 339 340 // Auto-populate Buildkite's PR fields from the Tangled PR 341 // trigger when present. Buildkite doesn't get a PR number from 342 // us (Tangled doesn't surface one through the trigger), but 343 // the base branch alone is enough for `pull_request_base_branch`- 344 // gated step filters to work. 
345 if trigger != nil && trigger.PullRequest != nil { 346 req.PullRequestBaseBranch = trigger.PullRequest.TargetBranch 347 } 348 349 return req, nil 350} 351 352// Logs satisfies Provider. We resolve the (knot, rkey, workflow) 353// tuple to a Buildkite build via the store, fetch the current jobs 354// list, then drain each job's plain-text log into the channel as one 355// LogLine per output line. 356// 357// Per-job control frames bracket each job so the appview's renderer 358// has start/end markers to lay out timing — same shape as the fake 359// provider and the upstream spindle. 360// 361// This is a snapshot read, not a tail — finished or in-progress, we 362// fetch what's there and close. Live tailing would require Buildkite 363// agent log streaming, which the public REST API doesn't expose; the 364// appview's repeated /logs calls during a running build give us 365// "good enough" liveness without that complexity. 366func (p *buildkiteProvider) Logs( 367 ctx context.Context, 368 knot string, 369 pipelineRkey string, 370 workflow string, 371) (<-chan LogLine, error) { 372 ref, err := p.st.LookupBuildkiteBuildByTuple(ctx, knot, pipelineRkey, workflow) 373 if err != nil { 374 return nil, fmt.Errorf("lookup build mapping: %w", err) 375 } 376 if ref == nil { 377 return nil, ErrLogsNotFound 378 } 379 380 // Resolve the org against which we should pull jobs/logs. We 381 // don't persist it on BuildkiteBuildRef today (the slug + token 382 // have always been enough); fall back to the provider default, 383 // which is correct for the common single-org install. A 384 // future migration can add a column when multi-org installs 385 // need it. 
386 org := p.defaultOrg 387 388 build, err := p.client.GetBuild(ctx, org, ref.PipelineSlug, ref.BuildNumber) 389 if err != nil { 390 if errors.Is(err, buildkite.ErrNotFound) { 391 return nil, ErrLogsNotFound 392 } 393 return nil, fmt.Errorf("get build: %w", err) 394 } 395 396 out := make(chan LogLine, 32) 397 go func() { 398 defer close(out) 399 stepID := 0 400 for _, job := range build.Jobs { 401 if job.Type != "" && job.Type != "script" { 402 // Skip non-script jobs (waiter, manual, 403 // trigger). They have no log to fetch and 404 // surfacing empty steps just clutters the 405 // appview. 406 continue 407 } 408 name := job.Name 409 if name == "" { 410 name = fmt.Sprintf("job %s", job.ID) 411 } 412 413 if !sendLine(ctx, out, LogLine{ 414 Kind: LogKindControl, 415 Time: time.Now(), 416 Content: name, 417 StepId: stepID, 418 StepStatus: StepStatusStart, 419 }) { 420 return 421 } 422 423 body, err := p.client.GetJobLog(ctx, org, ref.PipelineSlug, ref.BuildNumber, job.ID) 424 if err != nil { 425 p.log.Debug("fetch job log", 426 "err", err, 427 "build_uuid", ref.BuildUUID, 428 "job_id", job.ID, 429 ) 430 // Don't fail the whole stream on one job; 431 // emit the end frame and move on so the 432 // appview at least sees what other jobs 433 // produced. 434 body = "" 435 } 436 437 for _, line := range strings.Split(strings.TrimRight(body, "\n"), "\n") { 438 if line == "" { 439 // Skip the leading empty entry that 440 // Split produces for empty bodies. 
441 continue 442 } 443 if !sendLine(ctx, out, LogLine{ 444 Kind: LogKindData, 445 Time: time.Now(), 446 Content: line + "\n", 447 StepId: stepID, 448 Stream: "stdout", 449 }) { 450 return 451 } 452 } 453 454 if !sendLine(ctx, out, LogLine{ 455 Kind: LogKindControl, 456 Time: time.Now(), 457 Content: name, 458 StepId: stepID, 459 StepStatus: StepStatusEnd, 460 }) { 461 return 462 } 463 stepID++ 464 } 465 }() 466 return out, nil 467} 468 469// publishStatus assembles a tangled.PipelineStatus record and pushes 470// it through the broker. buildUUID is mixed into the rkey so multiple 471// status events for the same workflow don't collide on the events 472// table's (rkey) uniqueness — and so an operator grepping the log 473// can find every record that pertains to a given Buildkite build. 474// 475// errMsg/exitCode are optional; pass nil for non-failure transitions. 476func (p *buildkiteProvider) publishStatus( 477 ctx context.Context, 478 pipelineURI, workflow, status, buildUUID string, 479 errMsg *string, 480 exitCode *int64, 481) error { 482 rec := tangled.PipelineStatus{ 483 LexiconTypeID: tangled.PipelineStatusNSID, 484 Pipeline: pipelineURI, 485 Workflow: workflow, 486 Status: status, 487 CreatedAt: time.Now().UTC().Format(time.RFC3339), 488 Error: errMsg, 489 ExitCode: exitCode, 490 } 491 body, err := json.Marshal(rec) 492 if err != nil { 493 return fmt.Errorf("marshal pipeline.status: %w", err) 494 } 495 rkey := fmt.Sprintf("bk-%s-%s-%d", buildUUID, status, time.Now().UnixNano()) 496 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil { 497 return fmt.Errorf("publish pipeline.status: %w", err) 498 } 499 return nil 500} 501 502// HandleWebhook applies a decoded Buildkite webhook payload: looks 503// the build up in the store, translates the Buildkite state into a 504// Tangled StatusKind, and publishes a pipeline.status record. 
Used 505// by the HTTP webhook handler so both the ingress logic and the 506// translation logic live next to each other. 507// 508// Returns nil for events we intentionally ignore (job.* events, 509// build.scheduled which we already publish locally on Spawn, builds 510// we don't have a mapping for) so the handler can 200 them — webhook 511// retries from Buildkite on a 4xx/5xx are noisy and not what we want 512// for "we just don't care about this event". 513func (p *buildkiteProvider) HandleWebhook( 514 ctx context.Context, 515 payload buildkite.WebhookPayload, 516) error { 517 // Only build.* events drive pipeline.status today. Everything 518 // else (job.*, agent.*, ping) is acknowledged silently. 519 if !strings.HasPrefix(payload.Event, "build.") { 520 return nil 521 } 522 523 ref, err := p.st.LookupBuildkiteBuildByUUID(ctx, payload.Build.ID) 524 if err != nil { 525 return fmt.Errorf("lookup build by uuid: %w", err) 526 } 527 if ref == nil { 528 // Most likely: this build was triggered outside tack and 529 // just happens to share our webhook URL. Nothing to do. 530 p.log.Debug("webhook for unknown build; ignoring", 531 "event", payload.Event, 532 "build_uuid", payload.Build.ID, 533 ) 534 return nil 535 } 536 537 status, ok := mapBuildkiteState(payload.Build.State) 538 if !ok { 539 // Unknown / transient state ("blocked", "skipped", 540 // "not_run", "waiting"…) — log so we can extend the map 541 // later, but don't error out the webhook. 
542 p.log.Debug("unmapped buildkite state; ignoring", 543 "event", payload.Event, 544 "state", payload.Build.State, 545 "build_uuid", payload.Build.ID, 546 ) 547 return nil 548 } 549 550 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 551 status, ref.BuildUUID, nil, nil); err != nil { 552 return fmt.Errorf("publish webhook status: %w", err) 553 } 554 p.log.Info("buildkite webhook → pipeline.status", 555 "event", payload.Event, 556 "state", payload.Build.State, 557 "status", status, 558 "build_uuid", payload.Build.ID, 559 "workflow", ref.Workflow, 560 ) 561 return nil 562} 563 564// mapBuildkiteState translates Buildkite's build state strings into 565// the Tangled spindle StatusKind enum. The mapping aligns with the 566// upstream constants (StatusKindRunning/Failed/Cancelled/Success); 567// states that don't have a direct analogue (blocked, skipped, 568// not_run) are reported as not-mapped so the caller can decide 569// whether to ignore them. 570func mapBuildkiteState(state string) (string, bool) { 571 switch state { 572 case "scheduled": 573 return "pending", true 574 case "running", "failing": 575 return "running", true 576 case "passed": 577 return "success", true 578 case "failed": 579 return "failed", true 580 case "canceled", "canceling": 581 return "cancelled", true 582 default: 583 return "", false 584 } 585} 586 587// envFromTuple builds the env block forwarded into the Buildkite 588// build. These are the only handle a user's Buildkite pipeline has 589// on the originating Tangled trigger: their pipeline.yml typically 590// reads $TACK_WORKFLOW and dispatches based on it (e.g. running a 591// `pipeline upload` against a workflow-specific YAML file). 592// 593// TACK_WORKFLOW_RAW carries the entire YAML body of the workflow as 594// captured in the Tangled record. It can be empty if the workflow 595// definition omitted it; consumers should defend. 
596func envFromTuple(knot, pipelineRkey string, wf *tangled.Pipeline_Workflow) map[string]string { 597 return map[string]string{ 598 "TACK_KNOT": knot, 599 "TACK_PIPELINE_RKEY": pipelineRkey, 600 "TACK_WORKFLOW": wf.Name, 601 "TACK_WORKFLOW_RAW": wf.Raw, 602 } 603} 604 605// pipelineATURI returns the at-uri the appview joins pipeline.status 606// records back to their originating pipeline on. Format mirrors the 607// upstream spindle; the appview strips the `did:web:` prefix and 608// treats the remainder as the knot identifier. 609func pipelineATURI(knot, pipelineRkey string) string { 610 return fmt.Sprintf("at://did:web:%s/%s/%s", 611 knot, tangled.PipelineNSID, pipelineRkey, 612 ) 613} 614 615// triggerCommitAndBranch extracts (commit, branch) from a Tangled 616// pipeline trigger, regardless of whether it was a push, a pull 617// request, or a manual run. Returns empty strings on a fully-empty 618// trigger so the caller can decide whether that's fatal. 619func triggerCommitAndBranch(trigger *tangled.Pipeline_TriggerMetadata) (string, string) { 620 if trigger == nil { 621 return "", "" 622 } 623 switch { 624 case trigger.Push != nil: 625 // For push events, NewSha is the commit being built and 626 // Ref is the full ref (e.g. "refs/heads/main") — strip 627 // the prefix so Buildkite's branch-aware features work. 628 return trigger.Push.NewSha, refToBranch(trigger.Push.Ref) 629 case trigger.PullRequest != nil: 630 // PRs build the source commit on the source branch. 631 return trigger.PullRequest.SourceSha, trigger.PullRequest.SourceBranch 632 default: 633 // Manual triggers and any future kinds: fall back to the 634 // repo default branch with no commit, which the caller 635 // will treat as fatal — manual triggers will need 636 // additional plumbing to pick a commit. 637 if trigger.Repo != nil { 638 return "", trigger.Repo.DefaultBranch 639 } 640 return "", "" 641 } 642} 643 644// refToBranch strips the conventional refs/heads/ prefix from a git 645// ref. 
Refs that don't match the prefix (tags, refs/pull/N/head) are 646// returned as-is so downstream tooling can decide what to do with 647// them — Buildkite happily accepts either form in `branch`. 648func refToBranch(ref string) string { 649 const prefix = "refs/heads/" 650 if strings.HasPrefix(ref, prefix) { 651 return strings.TrimPrefix(ref, prefix) 652 } 653 return ref 654} 655 656// sendLine pushes one LogLine into out, returning false if ctx 657// fired first. Centralised so the per-job loop in Logs stays 658// focused on the wire-shape decisions. 659func sendLine(ctx context.Context, out chan<- LogLine, line LogLine) bool { 660 select { 661 case <-ctx.Done(): 662 return false 663 case out <- line: 664 return true 665 } 666}