Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// sourcehutProvider implements Provider against a builds.sr.ht 4// instance. Spawn submits one job per workflow through GraphQL's 5// `submit` mutation; status is observed via an in-process poll loop 6// — same shape as the Tekton provider. 7// 8// The build manifest a workflow targets is carried inline in the 9// workflow's YAML body under `tack.sourcehut.manifest`. Tack injects 10// a small set of TACK_* environment variables into the manifest so 11// the user's tasks can dispatch on the originating Tangled trigger. 12 13import ( 14 "context" 15 "encoding/json" 16 "errors" 17 "fmt" 18 "log/slog" 19 "strings" 20 "sync" 21 "time" 22 23 "go.yaml.in/yaml/v2" 24 "tangled.org/core/api/tangled" 25 26 "go.mitchellh.com/tack/internal/sourcehut" 27) 28 29// defaultSourcehutPollInterval is how often the watch loop re-fetches 30// upstream job state. 31const defaultSourcehutPollInterval = 5 * time.Second 32 33// workflowSourcehutDoc is the tack-flavoured schema we expect inside 34// each Tangled workflow's Raw YAML body. Same nesting convention as 35// the other providers (`tack: { sourcehut: ... }`) so a workflow can 36// grow other top-level keys without colliding with our namespace. 37type workflowSourcehutDoc struct { 38 Tack struct { 39 Sourcehut sourcehutWorkflowConfig `yaml:"sourcehut"` 40 } `yaml:"tack"` 41} 42 43// sourcehutWorkflowConfig is the sourcehut-specific subset of a 44// workflow YAML. 45// 46// Manifest is the YAML build manifest the user wants builds.sr.ht to 47// run, exactly as they'd paste it into the web UI. We treat it as an 48// opaque string until Spawn time, then decode/inject env vars/re-emit 49// just before submitting so users keep authorial control of their 50// manifests. 51// 52// Instance optionally overrides the provider's default sourcehut host 53// (e.g. for users running their own builds.sr.ht). It must be a full 54// URL including scheme so we don't have to guess http vs https. 55// 56// Tags/Note flow straight through to the submit API — they're useful 57// for the user's own filtering on builds.sr.ht's job list view. 58// 59// Secrets controls whether the runner mounts the user's configured 60// sourcehut secrets. Default is false because secrets injection is a 61// blast-radius decision that should be opt-in per workflow. 62type sourcehutWorkflowConfig struct { 63 Manifest string `yaml:"manifest"` 64 Instance string `yaml:"instance"` 65 Tags []string `yaml:"tags"` 66 Note string `yaml:"note"` 67 Secrets bool `yaml:"secrets"` 68} 69 70// parseSourcehutWorkflowConfig decodes `tack.sourcehut` from a workflow 71// body and validates the small set of fields we require. An empty body 72// or a missing manifest is a structural error so spawnWorkflow can 73// short-circuit cleanly instead of submitting a malformed job that 74// builds.sr.ht would reject. 75func parseSourcehutWorkflowConfig(raw string) (*sourcehutWorkflowConfig, error) { 76 if strings.TrimSpace(raw) == "" { 77 return nil, errors.New("workflow body is empty") 78 } 79 var doc workflowSourcehutDoc 80 if err := yaml.Unmarshal([]byte(raw), &doc); err != nil { 81 return nil, fmt.Errorf("parse workflow yaml: %w", err) 82 } 83 cfg := doc.Tack.Sourcehut 84 if strings.TrimSpace(cfg.Manifest) == "" { 85 return nil, errors.New("workflow yaml: `tack.sourcehut.manifest` is required") 86 } 87 return &cfg, nil 88} 89 90// sourcehutProvider implements Provider. 91// 92// defaultInstance is the builds.sr.ht instance Spawn submits to when 93// a workflow doesn't override it; the configured token must 94// authenticate against that instance. defaultClient is the matching 95// Client; per-workflow `instance` overrides cause Spawn to mint a 96// short-lived sibling Client carrying the same token, since one token 97// can address only one sourcehut deployment in practice. 98type sourcehutProvider struct { 99 br *broker 100 st *store 101 log *slog.Logger 102 token string 103 defaultClient *sourcehut.Client 104 defaultInstance string 105 pollInterval time.Duration 106 107 // instanceClients caches a Client per non-default instance URL so a 108 // workflow that targets a custom builds.sr.ht doesn't mint (and 109 // leak) a fresh http.Transport + connection pool on every Spawn, 110 // watch tick, and Logs call. The default instance always uses 111 // defaultClient and never lands here. 112 mu sync.Mutex 113 instanceClients map[string]*sourcehut.Client 114} 115 116var _ Provider = (*sourcehutProvider)(nil) 117 118// newSourcehutProvider wires a provider to its sourcehut client and 119// to the broker it publishes pipeline.status records on. instance is 120// the default builds.sr.ht base URL; pass empty to use 121// sourcehut.DefaultBaseURL. 122func newSourcehutProvider( 123 br *broker, 124 st *store, 125 token string, 126 instance string, 127 log *slog.Logger, 128) *sourcehutProvider { 129 if instance == "" { 130 instance = sourcehut.DefaultBaseURL 131 } 132 return &sourcehutProvider{ 133 br: br, 134 st: st, 135 log: log.With("component", "provider", "kind", "sourcehut"), 136 token: token, 137 defaultClient: sourcehut.NewClient(instance, token), 138 defaultInstance: instance, 139 pollInterval: defaultSourcehutPollInterval, 140 instanceClients: map[string]*sourcehut.Client{}, 141 } 142} 143 144// Spawn satisfies Provider. For each workflow it submits a separate 145// builds.sr.ht job so each workflow gets its own status timeline. The 146// actual API call runs on a goroutine — submission is one HTTP round 147// trip, but Spawn is contractually non-blocking. 148func (p *sourcehutProvider) Spawn( 149 ctx context.Context, 150 knot string, 151 pipelineRkey string, 152 actor string, 153 trigger *tangled.Pipeline_TriggerMetadata, 154 workflows []*tangled.Pipeline_Workflow, 155) { 156 if len(workflows) == 0 { 157 p.log.Warn("pipeline has no workflows; nothing to spawn", 158 "knot", knot, "rkey", pipelineRkey, 159 ) 160 return 161 } 162 for _, wf := range workflows { 163 if wf == nil || wf.Name == "" { 164 continue 165 } 166 wf := wf 167 go p.spawnWorkflow(ctx, knot, pipelineRkey, actor, trigger, wf) 168 } 169} 170 171func (p *sourcehutProvider) spawnWorkflow( 172 ctx context.Context, 173 knot string, 174 pipelineRkey string, 175 actor string, 176 trigger *tangled.Pipeline_TriggerMetadata, 177 wf *tangled.Pipeline_Workflow, 178) { 179 logger := p.log.With( 180 "knot", knot, 181 "pipeline_rkey", pipelineRkey, 182 "workflow", wf.Name, 183 "actor", actor, 184 ) 185 186 cfg, err := parseSourcehutWorkflowConfig(wf.Raw) 187 if err != nil { 188 logger.Error("invalid workflow config; refusing to spawn", "err", err) 189 return 190 } 191 192 commit, branch := triggerCommitAndBranch(trigger) 193 manifest, err := injectSourcehutEnvironment(cfg.Manifest, map[string]string{ 194 "TACK_KNOT": knot, 195 "TACK_PIPELINE_RKEY": pipelineRkey, 196 "TACK_WORKFLOW": wf.Name, 197 "TACK_WORKFLOW_RAW": wf.Raw, 198 "TACK_ACTOR": actor, 199 "TACK_COMMIT": commit, 200 "TACK_BRANCH": branch, 201 }) 202 if err != nil { 203 logger.Error("inject TACK_* env into manifest", "err", err) 204 return 205 } 206 207 client, instance := p.clientFor(cfg.Instance) 208 logger = logger.With("instance", instance) 209 210 tags := cfg.Tags 211 if len(tags) == 0 { 212 // Auto-tag with "tack" so an operator browsing the builds.sr.ht 213 // job list can filter to the jobs originating from this spindle 214 // without users having to remember to set tags themselves. 215 tags = []string{"tack"} 216 } 217 note := cfg.Note 218 if note == "" { 219 note = fmt.Sprintf("tangled: %s @ %s", wf.Name, shortCommit(commit)) 220 } 221 222 job, err := client.SubmitJob(ctx, sourcehut.SubmitRequest{ 223 Manifest: manifest, 224 Tags: tags, 225 Note: note, 226 Secrets: cfg.Secrets, 227 Execute: true, 228 }) 229 if err != nil { 230 logger.Error("submit sourcehut job", "err", err) 231 return 232 } 233 logger.Info("sourcehut job submitted", 234 "job_id", job.ID, 235 "owner", job.Owner.CanonicalName, 236 "web_url", client.JobWebURL(job.Owner.CanonicalName, job.ID), 237 ) 238 239 pipelineURI := pipelineATURI(knot, pipelineRkey) 240 ref := SourcehutJobRef{ 241 Knot: knot, 242 PipelineRkey: pipelineRkey, 243 Workflow: wf.Name, 244 JobID: job.ID, 245 Owner: job.Owner.CanonicalName, 246 Instance: instance, 247 PipelineURI: pipelineURI, 248 } 249 if err := p.st.InsertSourcehutJob(ctx, ref); err != nil { 250 // Without the row the watch goroutine and any /logs lookup 251 // can't recover the job. Surface loudly and bail. 252 logger.Error("persist sourcehut job mapping", "err", err, 253 "job_id", job.ID, 254 ) 255 return 256 } 257 258 if err := p.publishStatus(ctx, pipelineURI, wf.Name, 259 "pending", job.ID, nil, nil); err != nil { 260 logger.Error("publish initial pending status", "err", err) 261 } 262 263 go p.watchJob(ctx, ref, client) 264} 265 266// watchJob polls the job until it reaches a terminal state, publishing 267// pipeline.status records on every distinct transition. We mirror the 268// Tekton provider's structure: a single goroutine per job, suppressed 269// duplicate publishes via `last`, exit on terminal or ctx cancellation. 270func (p *sourcehutProvider) watchJob( 271 ctx context.Context, 272 ref SourcehutJobRef, 273 client *sourcehut.Client, 274) { 275 logger := p.log.With( 276 "knot", ref.Knot, 277 "pipeline_rkey", ref.PipelineRkey, 278 "workflow", ref.Workflow, 279 "job_id", ref.JobID, 280 "instance", ref.Instance, 281 ) 282 logger.Debug("watchJob: starting") 283 284 last := "" 285 ticker := time.NewTicker(p.pollInterval) 286 defer ticker.Stop() 287 288 // Issue one immediate poll so a fast-completing job (e.g. a tiny 289 // manifest the runner finished before the first tick) doesn't sit 290 // at "pending" for the whole interval. 291 for { 292 job, err := client.GetJob(ctx, ref.JobID) 293 if errors.Is(err, sourcehut.ErrNotFound) { 294 logger.Warn("job disappeared while watching") 295 return 296 } 297 if err != nil { 298 logger.Debug("get sourcehut job", "err", err) 299 } else { 300 status, terminal, ok := mapSourcehutStatus(job.Status) 301 logger.Debug("watchJob: poll", 302 "upstream_status", job.Status, 303 "status", status, "terminal", terminal, "ok", ok, "last", last, 304 ) 305 if !ok { 306 // Keep polling on an unrecognised status, but warn 307 // so a silent infinite loop is at least visible. 308 logger.Warn("unmapped sourcehut status; continuing to poll", 309 "upstream_status", job.Status, 310 ) 311 } else if status != last { 312 last = status 313 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 314 status, ref.JobID, nil, nil); err != nil { 315 logger.Error("publish sourcehut status", "err", err, "status", status) 316 } 317 } 318 if ok && terminal { 319 logger.Debug("watchJob: terminal status reached; exiting", "status", status) 320 return 321 } 322 } 323 324 select { 325 case <-ctx.Done(): 326 logger.Debug("watchJob: context cancelled") 327 return 328 case <-ticker.C: 329 } 330 } 331} 332 333// mapSourcehutStatus translates builds.sr.ht's job status into the 334// Tangled spindle StatusKind strings the appview consumes. The 335// upstream enum is uppercase over GraphQL; we lowercase first so a 336// surface change between cases doesn't fall to the not-mapped branch. 337func mapSourcehutStatus(state string) (status string, terminal bool, ok bool) { 338 switch strings.ToLower(state) { 339 case "pending", "queued": 340 return "pending", false, true 341 case "running": 342 return "running", false, true 343 case "success": 344 return "success", true, true 345 case "failed", "timeout": 346 return "failed", true, true 347 case "cancelled": 348 return "cancelled", true, true 349 default: 350 return "", false, false 351 } 352} 353 354// Logs satisfies Provider. We resolve the (knot, rkey, workflow) tuple 355// to a sourcehut job, fetch its tasks, and stream the per-task logs as 356// LogLine frames bracketed by per-task control frames. 357// 358// Snapshot read, not a tail — for an in-progress job each task's log 359// is fetched at its current length and the channel closes. The 360// appview's repeated /logs calls during a running build give us 361// "good enough" liveness. 362func (p *sourcehutProvider) Logs( 363 ctx context.Context, 364 knot string, 365 pipelineRkey string, 366 workflow string, 367) (<-chan LogLine, error) { 368 ref, err := p.st.LookupSourcehutJobByTuple(ctx, knot, pipelineRkey, workflow) 369 if err != nil { 370 return nil, fmt.Errorf("lookup sourcehut job: %w", err) 371 } 372 if ref == nil { 373 return nil, ErrLogsNotFound 374 } 375 376 client, _ := p.clientFor(ref.Instance) 377 378 job, err := client.GetJob(ctx, ref.JobID) 379 if err != nil { 380 if errors.Is(err, sourcehut.ErrNotFound) { 381 return nil, ErrLogsNotFound 382 } 383 return nil, fmt.Errorf("get sourcehut job: %w", err) 384 } 385 386 out := make(chan LogLine, 32) 387 go func() { 388 defer close(out) 389 stepID := 0 390 // "setup" is the master log: setup output (sources clone, 391 // package install) before any task runs. Always emit it as 392 // step 0 even if empty so the renderer has a stable timeline. 393 if !p.streamStep(ctx, out, client, ref, "setup", "", stepID) { 394 return 395 } 396 stepID++ 397 for _, task := range job.Tasks { 398 name := task.Name 399 if name == "" { 400 name = fmt.Sprintf("task %d", stepID) 401 } 402 if !p.streamStep(ctx, out, client, ref, name, task.Name, stepID) { 403 return 404 } 405 stepID++ 406 } 407 }() 408 return out, nil 409} 410 411// streamStep emits one step's worth of frames into out: a start 412// control, one data frame per non-empty log line, and an end control. 413// stepName is the human-visible label; logTask is the name passed to 414// GetTaskLog (empty string fetches the master log). Returns false if 415// the context fired and the caller should bail. 416func (p *sourcehutProvider) streamStep( 417 ctx context.Context, 418 out chan<- LogLine, 419 client *sourcehut.Client, 420 ref *SourcehutJobRef, 421 stepName, logTask string, 422 stepID int, 423) bool { 424 if !sendLine(ctx, out, LogLine{ 425 Kind: LogKindControl, Time: time.Now(), 426 Content: stepName, StepId: stepID, StepStatus: StepStatusStart, 427 }) { 428 return false 429 } 430 body, err := client.GetTaskLog(ctx, ref.JobID, logTask) 431 if err != nil && !errors.Is(err, sourcehut.ErrNotFound) { 432 // Don't fail the whole stream on one task; emit the end frame 433 // and move on so the renderer at least sees what other tasks 434 // produced. ErrNotFound (no log yet) is treated as an empty body. 435 p.log.Debug("fetch sourcehut task log", 436 "err", err, "job_id", ref.JobID, "task", logTask, 437 ) 438 body = "" 439 } 440 if !emitLogBody(ctx, out, body, stepID) { 441 return false 442 } 443 return sendLine(ctx, out, LogLine{ 444 Kind: LogKindControl, Time: time.Now(), 445 Content: stepName, StepId: stepID, StepStatus: StepStatusEnd, 446 }) 447} 448 449// emitLogBody pushes one LogLine per non-empty line of body into out. 450// Returns false if the context fired so the caller can stop draining. 451func emitLogBody(ctx context.Context, out chan<- LogLine, body string, stepID int) bool { 452 if body == "" { 453 return true 454 } 455 for _, line := range strings.Split(strings.TrimRight(body, "\n"), "\n") { 456 if line == "" { 457 continue 458 } 459 if !sendLine(ctx, out, LogLine{ 460 Kind: LogKindData, 461 Time: time.Now(), 462 Content: line + "\n", 463 StepId: stepID, 464 Stream: "stdout", 465 }) { 466 return false 467 } 468 } 469 return true 470} 471 472// publishStatus assembles a tangled.PipelineStatus record and pushes 473// it through the broker. jobID is mixed into the rkey so multiple 474// status records for the same job don't collide on the events table's 475// (rkey) uniqueness, and so an operator grepping the log can find 476// every record pertaining to a given sourcehut job. 477func (p *sourcehutProvider) publishStatus( 478 ctx context.Context, 479 pipelineURI, workflow, status string, 480 jobID int64, 481 errMsg *string, 482 exitCode *int64, 483) error { 484 rec := tangled.PipelineStatus{ 485 LexiconTypeID: tangled.PipelineStatusNSID, 486 Pipeline: pipelineURI, 487 Workflow: workflow, 488 Status: status, 489 CreatedAt: time.Now().UTC().Format(time.RFC3339), 490 Error: errMsg, 491 ExitCode: exitCode, 492 } 493 body, err := json.Marshal(rec) 494 if err != nil { 495 return fmt.Errorf("marshal pipeline.status: %w", err) 496 } 497 rkey := fmt.Sprintf("sh-%d-%s-%d", jobID, status, time.Now().UnixNano()) 498 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil { 499 return fmt.Errorf("publish pipeline.status: %w", err) 500 } 501 return nil 502} 503 504// injectSourcehutEnvironment decodes the user-supplied manifest, 505// merges env into its top-level `environment` map, and re-emits it 506// as YAML. We round-trip via map[string]any rather than a typed 507// schema so unknown fields aren't dropped; comments and key ordering 508// are not preserved. 509// 510// Existing user-set entries win over our injected vars — an explicit 511// `TACK_FOO: ...` is a deliberate override. 512// 513// `environment:` must be a map. A sequence form is rejected loudly 514// rather than silently replaced with our map. 515func injectSourcehutEnvironment(manifest string, env map[string]string) (string, error) { 516 var doc map[string]any 517 if err := yaml.Unmarshal([]byte(manifest), &doc); err != nil { 518 return "", fmt.Errorf("parse manifest: %w", err) 519 } 520 if doc == nil { 521 doc = map[string]any{} 522 } 523 var current map[any]any 524 switch existing := doc["environment"].(type) { 525 case nil: 526 current = map[any]any{} 527 case map[any]any: 528 current = existing 529 default: 530 return "", fmt.Errorf( 531 "manifest `environment` must be a map, got %T", existing, 532 ) 533 } 534 for k, v := range env { 535 if _, exists := current[k]; exists { 536 continue 537 } 538 current[k] = v 539 } 540 doc["environment"] = current 541 542 out, err := yaml.Marshal(doc) 543 if err != nil { 544 return "", fmt.Errorf("emit manifest: %w", err) 545 } 546 return string(out), nil 547} 548 549// clientFor resolves a workflow's optional `instance` override to the 550// client that should issue API calls for that workflow, plus the 551// concrete instance URL the row was/should-be persisted as. The 552// no-override (or matches-default) case reuses the long-lived default 553// client. Custom instances are memoised so repeated calls (per-poll, 554// per-Logs) reuse one Client — and therefore one connection pool — 555// per distinct upstream. 556func (p *sourcehutProvider) clientFor(instance string) (*sourcehut.Client, string) { 557 if instance == "" || instance == p.defaultInstance { 558 return p.defaultClient, p.defaultInstance 559 } 560 p.mu.Lock() 561 defer p.mu.Unlock() 562 if c, ok := p.instanceClients[instance]; ok { 563 return c, instance 564 } 565 c := sourcehut.NewClient(instance, p.token) 566 p.instanceClients[instance] = c 567 return c, instance 568} 569 570// shortCommit returns the first 12 hex chars of a commit SHA, or "?" 571// when commit is empty. 572func shortCommit(commit string) string { 573 if commit == "" { 574 return "?" 575 } 576 if len(commit) > 12 { 577 return commit[:12] 578 } 579 return commit 580}