Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

at main 20 kB View raw
1package main 2 3// sourcehutProvider implements Provider against a builds.sr.ht 4// instance. Spawn submits one job per workflow through GraphQL's 5// `submit` mutation; status is observed via an in-process poll loop 6// — same shape as the Tekton provider. 7// 8// The build manifest a workflow targets is carried inline in the 9// workflow's YAML body under `tack.sourcehut.manifest`. Tack injects 10// a small set of TACK_* environment variables into the manifest so 11// the user's tasks can dispatch on the originating Tangled trigger. 12 13import ( 14 "context" 15 "encoding/json" 16 "errors" 17 "fmt" 18 "log/slog" 19 "strings" 20 "sync" 21 "time" 22 23 "go.yaml.in/yaml/v2" 24 "tangled.org/core/api/tangled" 25 26 "go.mitchellh.com/tack/internal/sourcehut" 27) 28 29// defaultSourcehutPollInterval is how often the watch loop re-fetches 30// upstream job state. 31const defaultSourcehutPollInterval = 5 * time.Second 32 33// workflowSourcehutDoc is the tack-flavoured schema we expect inside 34// each Tangled workflow's Raw YAML body. Same nesting convention as 35// the other providers (`tack: { sourcehut: ... }`) so a workflow can 36// grow other top-level keys without colliding with our namespace. 37type workflowSourcehutDoc struct { 38 Tack struct { 39 Sourcehut sourcehutWorkflowConfig `yaml:"sourcehut"` 40 } `yaml:"tack"` 41} 42 43// sourcehutWorkflowConfig is the sourcehut-specific subset of a 44// workflow YAML. 45// 46// Manifest is the YAML build manifest the user wants builds.sr.ht to 47// run, exactly as they'd paste it into the web UI. We treat it as an 48// opaque string until Spawn time, then decode/inject env vars/re-emit 49// just before submitting so users keep authorial control of their 50// manifests. 51// 52// Instance optionally overrides the provider's default sourcehut host 53// (e.g. for users running their own builds.sr.ht). It must be a full 54// URL including scheme so we don't have to guess http vs https. 55// 56// Tags/Note flow straight through to the submit API — they're useful 57// for the user's own filtering on builds.sr.ht's job list view. 58// 59// Secrets controls whether the runner mounts the user's configured 60// sourcehut secrets. Default is false because secrets injection is a 61// blast-radius decision that should be opt-in per workflow. 62type sourcehutWorkflowConfig struct { 63 Manifest string `yaml:"manifest"` 64 Instance string `yaml:"instance"` 65 Tags []string `yaml:"tags"` 66 Note string `yaml:"note"` 67 Secrets bool `yaml:"secrets"` 68} 69 70// parseSourcehutWorkflowConfig decodes `tack.sourcehut` from a workflow 71// body and validates the small set of fields we require. An empty body 72// or a missing manifest is a structural error so spawnWorkflow can 73// short-circuit cleanly instead of submitting a malformed job that 74// builds.sr.ht would reject. 75func parseSourcehutWorkflowConfig(raw string) (*sourcehutWorkflowConfig, error) { 76 if strings.TrimSpace(raw) == "" { 77 return nil, errors.New("workflow body is empty") 78 } 79 var doc workflowSourcehutDoc 80 if err := yaml.Unmarshal([]byte(raw), &doc); err != nil { 81 return nil, fmt.Errorf("parse workflow yaml: %w", err) 82 } 83 cfg := doc.Tack.Sourcehut 84 if strings.TrimSpace(cfg.Manifest) == "" { 85 return nil, errors.New("workflow yaml: `tack.sourcehut.manifest` is required") 86 } 87 return &cfg, nil 88} 89 90// sourcehutProvider implements Provider. 91// 92// defaultInstance is the builds.sr.ht instance Spawn submits to when 93// a workflow doesn't override it; the configured token must 94// authenticate against that instance. defaultClient is the matching 95// Client; per-workflow `instance` overrides cause Spawn to mint a 96// short-lived sibling Client carrying the same token, since one token 97// can address only one sourcehut deployment in practice. 98type sourcehutProvider struct { 99 br *broker 100 st *store 101 log *slog.Logger 102 token string 103 defaultClient *sourcehut.Client 104 defaultInstance string 105 pollInterval time.Duration 106 107 // instanceClients caches a Client per non-default instance URL so a 108 // workflow that targets a custom builds.sr.ht doesn't mint (and 109 // leak) a fresh http.Transport + connection pool on every Spawn, 110 // watch tick, and Logs call. The default instance always uses 111 // defaultClient and never lands here. 112 mu sync.Mutex 113 instanceClients map[string]*sourcehut.Client 114} 115 116var _ Provider = (*sourcehutProvider)(nil) 117 118// newSourcehutProvider wires a provider to its sourcehut client and 119// to the broker it publishes pipeline.status records on. instance is 120// the default builds.sr.ht base URL; pass empty to use 121// sourcehut.DefaultBaseURL. 122func newSourcehutProvider( 123 br *broker, 124 st *store, 125 token string, 126 instance string, 127 log *slog.Logger, 128) *sourcehutProvider { 129 if instance == "" { 130 instance = sourcehut.DefaultBaseURL 131 } 132 return &sourcehutProvider{ 133 br: br, 134 st: st, 135 log: log.With("component", "provider", "kind", "sourcehut"), 136 token: token, 137 defaultClient: sourcehut.NewClient(instance, token), 138 defaultInstance: instance, 139 pollInterval: defaultSourcehutPollInterval, 140 instanceClients: map[string]*sourcehut.Client{}, 141 } 142} 143 144// Spawn satisfies Provider. For each workflow it submits a separate 145// builds.sr.ht job so each workflow gets its own status timeline. The 146// actual API call runs on a goroutine — submission is one HTTP round 147// trip, but Spawn is contractually non-blocking. 148func (p *sourcehutProvider) Spawn( 149 ctx context.Context, 150 knot string, 151 pipelineRkey string, 152 actor string, 153 trigger *tangled.Pipeline_TriggerMetadata, 154 workflows []*tangled.Pipeline_Workflow, 155) { 156 if len(workflows) == 0 { 157 p.log.Warn("pipeline has no workflows; nothing to spawn", 158 "knot", knot, "rkey", pipelineRkey, 159 ) 160 return 161 } 162 for _, wf := range workflows { 163 if wf == nil || wf.Name == "" { 164 continue 165 } 166 wf := wf 167 go p.spawnWorkflow(ctx, knot, pipelineRkey, actor, trigger, wf) 168 } 169} 170 171func (p *sourcehutProvider) spawnWorkflow( 172 ctx context.Context, 173 knot string, 174 pipelineRkey string, 175 actor string, 176 trigger *tangled.Pipeline_TriggerMetadata, 177 wf *tangled.Pipeline_Workflow, 178) { 179 logger := p.log.With( 180 "knot", knot, 181 "pipeline_rkey", pipelineRkey, 182 "workflow", wf.Name, 183 "actor", actor, 184 ) 185 186 cfg, err := parseSourcehutWorkflowConfig(wf.Raw) 187 if err != nil { 188 logger.Error("invalid workflow config; refusing to spawn", "err", err) 189 return 190 } 191 192 commit, branch := triggerCommitAndBranch(trigger) 193 manifest, err := injectSourcehutEnvironment(cfg.Manifest, map[string]string{ 194 "TACK_KNOT": knot, 195 "TACK_PIPELINE_RKEY": pipelineRkey, 196 "TACK_WORKFLOW": wf.Name, 197 "TACK_WORKFLOW_RAW": wf.Raw, 198 "TACK_ACTOR": actor, 199 "TACK_COMMIT": commit, 200 "TACK_BRANCH": branch, 201 }) 202 if err != nil { 203 logger.Error("inject TACK_* env into manifest", "err", err) 204 return 205 } 206 207 client, instance := p.clientFor(cfg.Instance) 208 logger = logger.With("instance", instance) 209 210 tags := cfg.Tags 211 if len(tags) == 0 { 212 // Auto-tag with "tack" so an operator browsing the builds.sr.ht 213 // job list can filter to the jobs originating from this spindle 214 // without users having to remember to set tags themselves. 215 tags = []string{"tack"} 216 } 217 note := cfg.Note 218 if note == "" { 219 note = fmt.Sprintf("tangled: %s @ %s", wf.Name, shortCommit(commit)) 220 } 221 222 job, err := client.SubmitJob(ctx, sourcehut.SubmitRequest{ 223 Manifest: manifest, 224 Tags: tags, 225 Note: note, 226 Secrets: cfg.Secrets, 227 Execute: true, 228 }) 229 if err != nil { 230 logger.Error("submit sourcehut job", "err", err) 231 return 232 } 233 logger.Info("sourcehut job submitted", 234 "job_id", job.ID, 235 "owner", job.Owner.CanonicalName, 236 "web_url", client.JobWebURL(job.Owner.CanonicalName, job.ID), 237 ) 238 239 pipelineURI := pipelineATURI(knot, pipelineRkey) 240 ref := SourcehutJobRef{ 241 Knot: knot, 242 PipelineRkey: pipelineRkey, 243 Workflow: wf.Name, 244 JobID: job.ID, 245 Owner: job.Owner.CanonicalName, 246 Instance: instance, 247 PipelineURI: pipelineURI, 248 } 249 if err := p.st.InsertSourcehutJob(ctx, ref); err != nil { 250 // Without the row the watch goroutine and any /logs lookup 251 // can't recover the job. Surface loudly and bail. 252 logger.Error("persist sourcehut job mapping", "err", err, 253 "job_id", job.ID, 254 ) 255 return 256 } 257 258 if err := p.publishStatus(ctx, pipelineURI, wf.Name, 259 "pending", job.ID, nil, nil); err != nil { 260 logger.Error("publish initial pending status", "err", err) 261 } 262 263 go p.watchJob(ctx, ref, client) 264} 265 266// watchJob polls the job until it reaches a terminal state, publishing 267// pipeline.status records on every distinct transition. We mirror the 268// Tekton provider's structure: a single goroutine per job, suppressed 269// duplicate publishes via `last`, exit on terminal or ctx cancellation. 270func (p *sourcehutProvider) watchJob( 271 ctx context.Context, 272 ref SourcehutJobRef, 273 client *sourcehut.Client, 274) { 275 logger := p.log.With( 276 "knot", ref.Knot, 277 "pipeline_rkey", ref.PipelineRkey, 278 "workflow", ref.Workflow, 279 "job_id", ref.JobID, 280 "instance", ref.Instance, 281 ) 282 logger.Debug("watchJob: starting") 283 284 last := "" 285 ticker := time.NewTicker(p.pollInterval) 286 defer ticker.Stop() 287 288 // Issue one immediate poll so a fast-completing job (e.g. a tiny 289 // manifest the runner finished before the first tick) doesn't sit 290 // at "pending" for the whole interval. 291 for { 292 job, err := client.GetJob(ctx, ref.JobID) 293 if errors.Is(err, sourcehut.ErrNotFound) { 294 logger.Warn("job disappeared while watching") 295 return 296 } 297 if err != nil { 298 logger.Debug("get sourcehut job", "err", err) 299 } else { 300 status, terminal, ok := mapSourcehutStatus(job.Status) 301 logger.Debug("watchJob: poll", 302 "upstream_status", job.Status, 303 "status", status, "terminal", terminal, "ok", ok, "last", last, 304 ) 305 if !ok { 306 // Keep polling on an unrecognised status, but warn 307 // so a silent infinite loop is at least visible. 308 logger.Warn("unmapped sourcehut status; continuing to poll", 309 "upstream_status", job.Status, 310 ) 311 } else if status != last { 312 last = status 313 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 314 status, ref.JobID, nil, nil); err != nil { 315 logger.Error("publish sourcehut status", "err", err, "status", status) 316 } 317 } 318 if ok && terminal { 319 logger.Debug("watchJob: terminal status reached; exiting", "status", status) 320 return 321 } 322 } 323 324 select { 325 case <-ctx.Done(): 326 logger.Debug("watchJob: context cancelled") 327 return 328 case <-ticker.C: 329 } 330 } 331} 332 333// mapSourcehutStatus translates builds.sr.ht's job status into the 334// Tangled spindle StatusKind strings the appview consumes. The 335// upstream enum is uppercase over GraphQL; we lowercase first so a 336// surface change between cases doesn't fall to the not-mapped branch. 337func mapSourcehutStatus(state string) (status string, terminal bool, ok bool) { 338 switch strings.ToLower(state) { 339 case "pending", "queued": 340 return "pending", false, true 341 case "running": 342 return "running", false, true 343 case "success": 344 return "success", true, true 345 case "failed", "timeout": 346 return "failed", true, true 347 case "cancelled": 348 return "cancelled", true, true 349 default: 350 return "", false, false 351 } 352} 353 354// Logs satisfies Provider. We resolve the (knot, rkey, workflow) tuple 355// to a sourcehut job, fetch its tasks, and stream the per-task logs as 356// LogLine frames bracketed by per-task control frames. 357// 358// Snapshot read, not a tail — for an in-progress job each task's log 359// is fetched at its current length and the channel closes. The 360// appview's repeated /logs calls during a running build give us 361// "good enough" liveness. 362func (p *sourcehutProvider) Logs( 363 ctx context.Context, 364 knot string, 365 pipelineRkey string, 366 workflow string, 367) (<-chan LogLine, error) { 368 ref, err := p.st.LookupSourcehutJobByTuple(ctx, knot, pipelineRkey, workflow) 369 if err != nil { 370 return nil, fmt.Errorf("lookup sourcehut job: %w", err) 371 } 372 if ref == nil { 373 return nil, ErrLogsNotFound 374 } 375 376 client, _ := p.clientFor(ref.Instance) 377 378 job, err := client.GetJob(ctx, ref.JobID) 379 if err != nil { 380 if errors.Is(err, sourcehut.ErrNotFound) { 381 return nil, ErrLogsNotFound 382 } 383 return nil, fmt.Errorf("get sourcehut job: %w", err) 384 } 385 386 // Probe the master log up front to detect a systemic 387 // authorization failure (typically a token that lacks 388 // `builds.sr.ht/LOGS:RO`). Without this check, every per-task 389 // fetch in the goroutine below would 403 and `streamStep` would 390 // emit empty steps — leaving the operator with builds whose 391 // status updates correctly but whose logs are silently blank. 392 // ErrNotFound here is fine: the runner just hasn't produced any 393 // setup output yet. 394 if _, err := client.GetTaskLog(ctx, ref.JobID, ""); err != nil && 395 !errors.Is(err, sourcehut.ErrNotFound) { 396 if errors.Is(err, sourcehut.ErrUnauthorized) { 397 return nil, fmt.Errorf( 398 "sourcehut log fetch unauthorized; "+ 399 "token likely missing `builds.sr.ht/LOGS:RO`: %w", err, 400 ) 401 } 402 return nil, fmt.Errorf("probe sourcehut task log: %w", err) 403 } 404 405 out := make(chan LogLine, 32) 406 go func() { 407 defer close(out) 408 stepID := 0 409 // "setup" is the master log: setup output (sources clone, 410 // package install) before any task runs. Always emit it as 411 // step 0 even if empty so the renderer has a stable timeline. 412 if !p.streamStep(ctx, out, client, ref, "setup", "", stepID) { 413 return 414 } 415 stepID++ 416 for _, task := range job.Tasks { 417 name := task.Name 418 if name == "" { 419 name = fmt.Sprintf("task %d", stepID) 420 } 421 if !p.streamStep(ctx, out, client, ref, name, task.Name, stepID) { 422 return 423 } 424 stepID++ 425 } 426 }() 427 return out, nil 428} 429 430// streamStep emits one step's worth of frames into out: a start 431// control, one data frame per non-empty log line, and an end control. 432// stepName is the human-visible label; logTask is the name passed to 433// GetTaskLog (empty string fetches the master log). Returns false if 434// the context fired and the caller should bail. 435func (p *sourcehutProvider) streamStep( 436 ctx context.Context, 437 out chan<- LogLine, 438 client *sourcehut.Client, 439 ref *SourcehutJobRef, 440 stepName, logTask string, 441 stepID int, 442) bool { 443 if !sendLine(ctx, out, LogLine{ 444 Kind: LogKindControl, Time: time.Now(), 445 Content: stepName, StepId: stepID, StepStatus: StepStatusStart, 446 }) { 447 return false 448 } 449 body, err := client.GetTaskLog(ctx, ref.JobID, logTask) 450 if err != nil && !errors.Is(err, sourcehut.ErrNotFound) { 451 if errors.Is(err, sourcehut.ErrUnauthorized) { 452 // Auth errors are systemic — every subsequent task fetch 453 // will fail the same way. Log loudly and abort the stream 454 // so the operator notices instead of getting a build with 455 // every step rendered empty. The HTTP layer can't change 456 // status mid-stream, but at least the server log will 457 // point straight at the misconfigured token. 458 p.log.Error("sourcehut log fetch unauthorized; aborting stream", 459 "err", err, "job_id", ref.JobID, "task", logTask, 460 ) 461 return false 462 } 463 // Don't fail the whole stream on one task; emit the end frame 464 // and move on so the renderer at least sees what other tasks 465 // produced. ErrNotFound (no log yet) is treated as an empty body. 466 p.log.Debug("fetch sourcehut task log", 467 "err", err, "job_id", ref.JobID, "task", logTask, 468 ) 469 body = "" 470 } 471 if !emitLogBody(ctx, out, body, stepID) { 472 return false 473 } 474 return sendLine(ctx, out, LogLine{ 475 Kind: LogKindControl, Time: time.Now(), 476 Content: stepName, StepId: stepID, StepStatus: StepStatusEnd, 477 }) 478} 479 480// emitLogBody pushes one LogLine per non-empty line of body into out. 481// Returns false if the context fired so the caller can stop draining. 482func emitLogBody(ctx context.Context, out chan<- LogLine, body string, stepID int) bool { 483 if body == "" { 484 return true 485 } 486 for _, line := range strings.Split(strings.TrimRight(body, "\n"), "\n") { 487 if line == "" { 488 continue 489 } 490 if !sendLine(ctx, out, LogLine{ 491 Kind: LogKindData, 492 Time: time.Now(), 493 Content: line + "\n", 494 StepId: stepID, 495 Stream: "stdout", 496 }) { 497 return false 498 } 499 } 500 return true 501} 502 503// publishStatus assembles a tangled.PipelineStatus record and pushes 504// it through the broker. jobID is mixed into the rkey so multiple 505// status records for the same job don't collide on the events table's 506// (rkey) uniqueness, and so an operator grepping the log can find 507// every record pertaining to a given sourcehut job. 508func (p *sourcehutProvider) publishStatus( 509 ctx context.Context, 510 pipelineURI, workflow, status string, 511 jobID int64, 512 errMsg *string, 513 exitCode *int64, 514) error { 515 rec := tangled.PipelineStatus{ 516 LexiconTypeID: tangled.PipelineStatusNSID, 517 Pipeline: pipelineURI, 518 Workflow: workflow, 519 Status: status, 520 CreatedAt: time.Now().UTC().Format(time.RFC3339), 521 Error: errMsg, 522 ExitCode: exitCode, 523 } 524 body, err := json.Marshal(rec) 525 if err != nil { 526 return fmt.Errorf("marshal pipeline.status: %w", err) 527 } 528 rkey := fmt.Sprintf("sh-%d-%s-%d", jobID, status, time.Now().UnixNano()) 529 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil { 530 return fmt.Errorf("publish pipeline.status: %w", err) 531 } 532 return nil 533} 534 535// injectSourcehutEnvironment decodes the user-supplied manifest, 536// merges env into its top-level `environment` map, and re-emits it 537// as YAML. We round-trip via map[string]any rather than a typed 538// schema so unknown fields aren't dropped; comments and key ordering 539// are not preserved. 540// 541// Existing user-set entries win over our injected vars — an explicit 542// `TACK_FOO: ...` is a deliberate override. 543// 544// `environment:` must be a map. A sequence form is rejected loudly 545// rather than silently replaced with our map. 546func injectSourcehutEnvironment(manifest string, env map[string]string) (string, error) { 547 var doc map[string]any 548 if err := yaml.Unmarshal([]byte(manifest), &doc); err != nil { 549 return "", fmt.Errorf("parse manifest: %w", err) 550 } 551 if doc == nil { 552 doc = map[string]any{} 553 } 554 var current map[any]any 555 switch existing := doc["environment"].(type) { 556 case nil: 557 current = map[any]any{} 558 case map[any]any: 559 current = existing 560 default: 561 return "", fmt.Errorf( 562 "manifest `environment` must be a map, got %T", existing, 563 ) 564 } 565 for k, v := range env { 566 if _, exists := current[k]; exists { 567 continue 568 } 569 current[k] = v 570 } 571 doc["environment"] = current 572 573 out, err := yaml.Marshal(doc) 574 if err != nil { 575 return "", fmt.Errorf("emit manifest: %w", err) 576 } 577 return string(out), nil 578} 579 580// clientFor resolves a workflow's optional `instance` override to the 581// client that should issue API calls for that workflow, plus the 582// concrete instance URL the row was/should-be persisted as. The 583// no-override (or matches-default) case reuses the long-lived default 584// client. Custom instances are memoised so repeated calls (per-poll, 585// per-Logs) reuse one Client — and therefore one connection pool — 586// per distinct upstream. 587func (p *sourcehutProvider) clientFor(instance string) (*sourcehut.Client, string) { 588 if instance == "" || instance == p.defaultInstance { 589 return p.defaultClient, p.defaultInstance 590 } 591 p.mu.Lock() 592 defer p.mu.Unlock() 593 if c, ok := p.instanceClients[instance]; ok { 594 return c, instance 595 } 596 c := sourcehut.NewClient(instance, p.token) 597 p.instanceClients[instance] = c 598 return c, instance 599} 600 601// shortCommit returns the first 12 hex chars of a commit SHA, or "?" 602// when commit is empty. 603func shortCommit(commit string) string { 604 if commit == "" { 605 return "?" 606 } 607 if len(commit) > 12 { 608 return commit[:12] 609 } 610 return commit 611}