Stitch any CI into Tangled
1package main
2
3// sourcehutProvider implements Provider against a builds.sr.ht
4// instance. Spawn submits one job per workflow through GraphQL's
5// `submit` mutation; status is observed via an in-process poll loop
6// — same shape as the Tekton provider.
7//
8// The build manifest a workflow targets is carried inline in the
9// workflow's YAML body under `tack.sourcehut.manifest`. Tack injects
10// a small set of TACK_* environment variables into the manifest so
11// the user's tasks can dispatch on the originating Tangled trigger.
12
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"net/url"
	"strings"
	"sync"
	"time"

	"go.yaml.in/yaml/v2"
	"tangled.org/core/api/tangled"

	"go.mitchellh.com/tack/internal/sourcehut"
)
28
// defaultSourcehutPollInterval is the pause between consecutive job
// status fetches in watchJob; newSourcehutProvider copies it into each
// provider's pollInterval.
const defaultSourcehutPollInterval = time.Second * 5
32
// workflowSourcehutDoc is the tack-flavoured schema we expect inside
// each Tangled workflow's Raw YAML body. Same nesting convention as
// the other providers (`tack: { sourcehut: ... }`) so a workflow can
// grow other top-level keys without colliding with our namespace.
//
// Only the `tack.sourcehut` subtree has a destination here; yaml.Unmarshal
// (non-strict) ignores any other keys in the workflow body.
type workflowSourcehutDoc struct {
	// Tack mirrors the top-level `tack:` mapping.
	Tack struct {
		// Sourcehut holds the `tack.sourcehut` provider settings.
		Sourcehut sourcehutWorkflowConfig `yaml:"sourcehut"`
	} `yaml:"tack"`
}
42
// sourcehutWorkflowConfig is the sourcehut-specific subset of a
// workflow YAML: the contents of the `tack.sourcehut` mapping.
//
// Manifest is the YAML build manifest the user wants builds.sr.ht to
// run, exactly as they'd paste it into the web UI. We treat it as an
// opaque string until Spawn time, then decode/inject env vars/re-emit
// just before submitting so users keep authorial control of their
// manifests.
//
// Instance optionally overrides the provider's default sourcehut host
// (e.g. for users running their own builds.sr.ht). It must be a full
// URL including scheme so we don't have to guess http vs https.
//
// Tags/Note flow straight through to the submit API — they're useful
// for the user's own filtering on builds.sr.ht's job list view.
//
// Secrets controls whether the runner mounts the user's configured
// sourcehut secrets. Default is false because secrets injection is a
// blast-radius decision that should be opt-in per workflow.
type sourcehutWorkflowConfig struct {
	// Manifest is required: parseSourcehutWorkflowConfig rejects a
	// blank value.
	Manifest string `yaml:"manifest"`

	// Instance, when empty, selects the provider's default instance
	// (clientFor).
	Instance string `yaml:"instance"`

	// Tags, when empty, is replaced by []string{"tack"} in spawnWorkflow.
	Tags []string `yaml:"tags"`

	// Note, when empty, is replaced in spawnWorkflow by
	// "tangled: <workflow> @ <short commit>".
	Note string `yaml:"note"`

	// Secrets is forwarded unchanged as SubmitRequest.Secrets.
	Secrets bool `yaml:"secrets"`
}
69
70// parseSourcehutWorkflowConfig decodes `tack.sourcehut` from a workflow
71// body and validates the small set of fields we require. An empty body
72// or a missing manifest is a structural error so spawnWorkflow can
73// short-circuit cleanly instead of submitting a malformed job that
74// builds.sr.ht would reject.
75func parseSourcehutWorkflowConfig(raw string) (*sourcehutWorkflowConfig, error) {
76 if strings.TrimSpace(raw) == "" {
77 return nil, errors.New("workflow body is empty")
78 }
79 var doc workflowSourcehutDoc
80 if err := yaml.Unmarshal([]byte(raw), &doc); err != nil {
81 return nil, fmt.Errorf("parse workflow yaml: %w", err)
82 }
83 cfg := doc.Tack.Sourcehut
84 if strings.TrimSpace(cfg.Manifest) == "" {
85 return nil, errors.New("workflow yaml: `tack.sourcehut.manifest` is required")
86 }
87 return &cfg, nil
88}
89
// sourcehutProvider implements Provider.
//
// defaultInstance is the builds.sr.ht instance Spawn submits to when
// a workflow doesn't override it; the configured token must
// authenticate against that instance. defaultClient is the matching
// Client. A per-workflow `instance` override is served by a sibling
// Client carrying the same token, created on first use and then
// cached in instanceClients for the provider's lifetime (clientFor).
// NOTE(review): a token issued by one sourcehut deployment presumably
// won't authenticate against another, so an override likely only
// works when the configured token is also valid there — confirm.
type sourcehutProvider struct {
	br              *broker           // publishStatus sends pipeline.status records here
	st              *store            // persists and looks up SourcehutJobRef rows
	log             *slog.Logger      // tagged component/kind by newSourcehutProvider
	token           string            // reused when minting clients for overridden instances
	defaultClient   *sourcehut.Client // client bound to defaultInstance
	defaultInstance string            // base URL used when a workflow sets no instance
	pollInterval    time.Duration     // watchJob's ticker period

	// instanceClients caches a Client per non-default instance URL so a
	// workflow that targets a custom builds.sr.ht doesn't mint (and
	// leak) a fresh http.Transport + connection pool on every Spawn,
	// watch tick, and Logs call. The default instance always uses
	// defaultClient and never lands here. mu guards instanceClients.
	mu              sync.Mutex
	instanceClients map[string]*sourcehut.Client
}

// Compile-time check that *sourcehutProvider satisfies Provider.
var _ Provider = (*sourcehutProvider)(nil)
117
118// newSourcehutProvider wires a provider to its sourcehut client and
119// to the broker it publishes pipeline.status records on. instance is
120// the default builds.sr.ht base URL; pass empty to use
121// sourcehut.DefaultBaseURL.
122func newSourcehutProvider(
123 br *broker,
124 st *store,
125 token string,
126 instance string,
127 log *slog.Logger,
128) *sourcehutProvider {
129 if instance == "" {
130 instance = sourcehut.DefaultBaseURL
131 }
132 return &sourcehutProvider{
133 br: br,
134 st: st,
135 log: log.With("component", "provider", "kind", "sourcehut"),
136 token: token,
137 defaultClient: sourcehut.NewClient(instance, token),
138 defaultInstance: instance,
139 pollInterval: defaultSourcehutPollInterval,
140 instanceClients: map[string]*sourcehut.Client{},
141 }
142}
143
144// Spawn satisfies Provider. For each workflow it submits a separate
145// builds.sr.ht job so each workflow gets its own status timeline. The
146// actual API call runs on a goroutine — submission is one HTTP round
147// trip, but Spawn is contractually non-blocking.
148func (p *sourcehutProvider) Spawn(
149 ctx context.Context,
150 knot string,
151 pipelineRkey string,
152 actor string,
153 trigger *tangled.Pipeline_TriggerMetadata,
154 workflows []*tangled.Pipeline_Workflow,
155) {
156 if len(workflows) == 0 {
157 p.log.Warn("pipeline has no workflows; nothing to spawn",
158 "knot", knot, "rkey", pipelineRkey,
159 )
160 return
161 }
162 for _, wf := range workflows {
163 if wf == nil || wf.Name == "" {
164 continue
165 }
166 wf := wf
167 go p.spawnWorkflow(ctx, knot, pipelineRkey, actor, trigger, wf)
168 }
169}
170
171func (p *sourcehutProvider) spawnWorkflow(
172 ctx context.Context,
173 knot string,
174 pipelineRkey string,
175 actor string,
176 trigger *tangled.Pipeline_TriggerMetadata,
177 wf *tangled.Pipeline_Workflow,
178) {
179 logger := p.log.With(
180 "knot", knot,
181 "pipeline_rkey", pipelineRkey,
182 "workflow", wf.Name,
183 "actor", actor,
184 )
185
186 cfg, err := parseSourcehutWorkflowConfig(wf.Raw)
187 if err != nil {
188 logger.Error("invalid workflow config; refusing to spawn", "err", err)
189 return
190 }
191
192 commit, branch := triggerCommitAndBranch(trigger)
193 manifest, err := injectSourcehutEnvironment(cfg.Manifest, map[string]string{
194 "TACK_KNOT": knot,
195 "TACK_PIPELINE_RKEY": pipelineRkey,
196 "TACK_WORKFLOW": wf.Name,
197 "TACK_WORKFLOW_RAW": wf.Raw,
198 "TACK_ACTOR": actor,
199 "TACK_COMMIT": commit,
200 "TACK_BRANCH": branch,
201 })
202 if err != nil {
203 logger.Error("inject TACK_* env into manifest", "err", err)
204 return
205 }
206
207 client, instance := p.clientFor(cfg.Instance)
208 logger = logger.With("instance", instance)
209
210 tags := cfg.Tags
211 if len(tags) == 0 {
212 // Auto-tag with "tack" so an operator browsing the builds.sr.ht
213 // job list can filter to the jobs originating from this spindle
214 // without users having to remember to set tags themselves.
215 tags = []string{"tack"}
216 }
217 note := cfg.Note
218 if note == "" {
219 note = fmt.Sprintf("tangled: %s @ %s", wf.Name, shortCommit(commit))
220 }
221
222 job, err := client.SubmitJob(ctx, sourcehut.SubmitRequest{
223 Manifest: manifest,
224 Tags: tags,
225 Note: note,
226 Secrets: cfg.Secrets,
227 Execute: true,
228 })
229 if err != nil {
230 logger.Error("submit sourcehut job", "err", err)
231 return
232 }
233 logger.Info("sourcehut job submitted",
234 "job_id", job.ID,
235 "owner", job.Owner.CanonicalName,
236 "web_url", client.JobWebURL(job.Owner.CanonicalName, job.ID),
237 )
238
239 pipelineURI := pipelineATURI(knot, pipelineRkey)
240 ref := SourcehutJobRef{
241 Knot: knot,
242 PipelineRkey: pipelineRkey,
243 Workflow: wf.Name,
244 JobID: job.ID,
245 Owner: job.Owner.CanonicalName,
246 Instance: instance,
247 PipelineURI: pipelineURI,
248 }
249 if err := p.st.InsertSourcehutJob(ctx, ref); err != nil {
250 // Without the row the watch goroutine and any /logs lookup
251 // can't recover the job. Surface loudly and bail.
252 logger.Error("persist sourcehut job mapping", "err", err,
253 "job_id", job.ID,
254 )
255 return
256 }
257
258 if err := p.publishStatus(ctx, pipelineURI, wf.Name,
259 "pending", job.ID, nil, nil); err != nil {
260 logger.Error("publish initial pending status", "err", err)
261 }
262
263 go p.watchJob(ctx, ref, client)
264}
265
266// watchJob polls the job until it reaches a terminal state, publishing
267// pipeline.status records on every distinct transition. We mirror the
268// Tekton provider's structure: a single goroutine per job, suppressed
269// duplicate publishes via `last`, exit on terminal or ctx cancellation.
270func (p *sourcehutProvider) watchJob(
271 ctx context.Context,
272 ref SourcehutJobRef,
273 client *sourcehut.Client,
274) {
275 logger := p.log.With(
276 "knot", ref.Knot,
277 "pipeline_rkey", ref.PipelineRkey,
278 "workflow", ref.Workflow,
279 "job_id", ref.JobID,
280 "instance", ref.Instance,
281 )
282 logger.Debug("watchJob: starting")
283
284 last := ""
285 ticker := time.NewTicker(p.pollInterval)
286 defer ticker.Stop()
287
288 // Issue one immediate poll so a fast-completing job (e.g. a tiny
289 // manifest the runner finished before the first tick) doesn't sit
290 // at "pending" for the whole interval.
291 for {
292 job, err := client.GetJob(ctx, ref.JobID)
293 if errors.Is(err, sourcehut.ErrNotFound) {
294 logger.Warn("job disappeared while watching")
295 return
296 }
297 if err != nil {
298 logger.Debug("get sourcehut job", "err", err)
299 } else {
300 status, terminal, ok := mapSourcehutStatus(job.Status)
301 logger.Debug("watchJob: poll",
302 "upstream_status", job.Status,
303 "status", status, "terminal", terminal, "ok", ok, "last", last,
304 )
305 if !ok {
306 // Keep polling on an unrecognised status, but warn
307 // so a silent infinite loop is at least visible.
308 logger.Warn("unmapped sourcehut status; continuing to poll",
309 "upstream_status", job.Status,
310 )
311 } else if status != last {
312 last = status
313 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
314 status, ref.JobID, nil, nil); err != nil {
315 logger.Error("publish sourcehut status", "err", err, "status", status)
316 }
317 }
318 if ok && terminal {
319 logger.Debug("watchJob: terminal status reached; exiting", "status", status)
320 return
321 }
322 }
323
324 select {
325 case <-ctx.Done():
326 logger.Debug("watchJob: context cancelled")
327 return
328 case <-ticker.C:
329 }
330 }
331}
332
// mapSourcehutStatus converts a builds.sr.ht job status into the
// Tangled spindle StatusKind string the appview consumes. The input is
// compared case-insensitively, since GraphQL reports it uppercase.
// terminal reports whether the job can no longer change state. ok is
// false (with empty status) for any value we don't recognise.
// builds.sr.ht's "queued" folds into "pending", and "timeout" folds
// into "failed".
func mapSourcehutStatus(state string) (status string, terminal bool, ok bool) {
	normalized := strings.ToLower(state)
	switch normalized {
	case "pending", "queued":
		status = "pending"
	case "running":
		status = "running"
	case "success":
		status = "success"
	case "failed", "timeout":
		status = "failed"
	case "cancelled":
		status = "cancelled"
	default:
		return "", false, false
	}
	// Only the two in-flight states can still transition.
	terminal = status != "pending" && status != "running"
	return status, terminal, true
}
353
354// Logs satisfies Provider. We resolve the (knot, rkey, workflow) tuple
355// to a sourcehut job, fetch its tasks, and stream the per-task logs as
356// LogLine frames bracketed by per-task control frames.
357//
358// Snapshot read, not a tail — for an in-progress job each task's log
359// is fetched at its current length and the channel closes. The
360// appview's repeated /logs calls during a running build give us
361// "good enough" liveness.
362func (p *sourcehutProvider) Logs(
363 ctx context.Context,
364 knot string,
365 pipelineRkey string,
366 workflow string,
367) (<-chan LogLine, error) {
368 ref, err := p.st.LookupSourcehutJobByTuple(ctx, knot, pipelineRkey, workflow)
369 if err != nil {
370 return nil, fmt.Errorf("lookup sourcehut job: %w", err)
371 }
372 if ref == nil {
373 return nil, ErrLogsNotFound
374 }
375
376 client, _ := p.clientFor(ref.Instance)
377
378 job, err := client.GetJob(ctx, ref.JobID)
379 if err != nil {
380 if errors.Is(err, sourcehut.ErrNotFound) {
381 return nil, ErrLogsNotFound
382 }
383 return nil, fmt.Errorf("get sourcehut job: %w", err)
384 }
385
386 // Probe the master log up front to detect a systemic
387 // authorization failure (typically a token that lacks
388 // `builds.sr.ht/LOGS:RO`). Without this check, every per-task
389 // fetch in the goroutine below would 403 and `streamStep` would
390 // emit empty steps — leaving the operator with builds whose
391 // status updates correctly but whose logs are silently blank.
392 // ErrNotFound here is fine: the runner just hasn't produced any
393 // setup output yet.
394 if _, err := client.GetTaskLog(ctx, ref.JobID, ""); err != nil &&
395 !errors.Is(err, sourcehut.ErrNotFound) {
396 if errors.Is(err, sourcehut.ErrUnauthorized) {
397 return nil, fmt.Errorf(
398 "sourcehut log fetch unauthorized; "+
399 "token likely missing `builds.sr.ht/LOGS:RO`: %w", err,
400 )
401 }
402 return nil, fmt.Errorf("probe sourcehut task log: %w", err)
403 }
404
405 out := make(chan LogLine, 32)
406 go func() {
407 defer close(out)
408 stepID := 0
409 // "setup" is the master log: setup output (sources clone,
410 // package install) before any task runs. Always emit it as
411 // step 0 even if empty so the renderer has a stable timeline.
412 if !p.streamStep(ctx, out, client, ref, "setup", "", stepID) {
413 return
414 }
415 stepID++
416 for _, task := range job.Tasks {
417 name := task.Name
418 if name == "" {
419 name = fmt.Sprintf("task %d", stepID)
420 }
421 if !p.streamStep(ctx, out, client, ref, name, task.Name, stepID) {
422 return
423 }
424 stepID++
425 }
426 }()
427 return out, nil
428}
429
430// streamStep emits one step's worth of frames into out: a start
431// control, one data frame per non-empty log line, and an end control.
432// stepName is the human-visible label; logTask is the name passed to
433// GetTaskLog (empty string fetches the master log). Returns false if
434// the context fired and the caller should bail.
435func (p *sourcehutProvider) streamStep(
436 ctx context.Context,
437 out chan<- LogLine,
438 client *sourcehut.Client,
439 ref *SourcehutJobRef,
440 stepName, logTask string,
441 stepID int,
442) bool {
443 if !sendLine(ctx, out, LogLine{
444 Kind: LogKindControl, Time: time.Now(),
445 Content: stepName, StepId: stepID, StepStatus: StepStatusStart,
446 }) {
447 return false
448 }
449 body, err := client.GetTaskLog(ctx, ref.JobID, logTask)
450 if err != nil && !errors.Is(err, sourcehut.ErrNotFound) {
451 if errors.Is(err, sourcehut.ErrUnauthorized) {
452 // Auth errors are systemic — every subsequent task fetch
453 // will fail the same way. Log loudly and abort the stream
454 // so the operator notices instead of getting a build with
455 // every step rendered empty. The HTTP layer can't change
456 // status mid-stream, but at least the server log will
457 // point straight at the misconfigured token.
458 p.log.Error("sourcehut log fetch unauthorized; aborting stream",
459 "err", err, "job_id", ref.JobID, "task", logTask,
460 )
461 return false
462 }
463 // Don't fail the whole stream on one task; emit the end frame
464 // and move on so the renderer at least sees what other tasks
465 // produced. ErrNotFound (no log yet) is treated as an empty body.
466 p.log.Debug("fetch sourcehut task log",
467 "err", err, "job_id", ref.JobID, "task", logTask,
468 )
469 body = ""
470 }
471 if !emitLogBody(ctx, out, body, stepID) {
472 return false
473 }
474 return sendLine(ctx, out, LogLine{
475 Kind: LogKindControl, Time: time.Now(),
476 Content: stepName, StepId: stepID, StepStatus: StepStatusEnd,
477 })
478}
479
480// emitLogBody pushes one LogLine per non-empty line of body into out.
481// Returns false if the context fired so the caller can stop draining.
482func emitLogBody(ctx context.Context, out chan<- LogLine, body string, stepID int) bool {
483 if body == "" {
484 return true
485 }
486 for _, line := range strings.Split(strings.TrimRight(body, "\n"), "\n") {
487 if line == "" {
488 continue
489 }
490 if !sendLine(ctx, out, LogLine{
491 Kind: LogKindData,
492 Time: time.Now(),
493 Content: line + "\n",
494 StepId: stepID,
495 Stream: "stdout",
496 }) {
497 return false
498 }
499 }
500 return true
501}
502
503// publishStatus assembles a tangled.PipelineStatus record and pushes
504// it through the broker. jobID is mixed into the rkey so multiple
505// status records for the same job don't collide on the events table's
506// (rkey) uniqueness, and so an operator grepping the log can find
507// every record pertaining to a given sourcehut job.
508func (p *sourcehutProvider) publishStatus(
509 ctx context.Context,
510 pipelineURI, workflow, status string,
511 jobID int64,
512 errMsg *string,
513 exitCode *int64,
514) error {
515 rec := tangled.PipelineStatus{
516 LexiconTypeID: tangled.PipelineStatusNSID,
517 Pipeline: pipelineURI,
518 Workflow: workflow,
519 Status: status,
520 CreatedAt: time.Now().UTC().Format(time.RFC3339),
521 Error: errMsg,
522 ExitCode: exitCode,
523 }
524 body, err := json.Marshal(rec)
525 if err != nil {
526 return fmt.Errorf("marshal pipeline.status: %w", err)
527 }
528 rkey := fmt.Sprintf("sh-%d-%s-%d", jobID, status, time.Now().UnixNano())
529 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil {
530 return fmt.Errorf("publish pipeline.status: %w", err)
531 }
532 return nil
533}
534
535// injectSourcehutEnvironment decodes the user-supplied manifest,
536// merges env into its top-level `environment` map, and re-emits it
537// as YAML. We round-trip via map[string]any rather than a typed
538// schema so unknown fields aren't dropped; comments and key ordering
539// are not preserved.
540//
541// Existing user-set entries win over our injected vars — an explicit
542// `TACK_FOO: ...` is a deliberate override.
543//
544// `environment:` must be a map. A sequence form is rejected loudly
545// rather than silently replaced with our map.
546func injectSourcehutEnvironment(manifest string, env map[string]string) (string, error) {
547 var doc map[string]any
548 if err := yaml.Unmarshal([]byte(manifest), &doc); err != nil {
549 return "", fmt.Errorf("parse manifest: %w", err)
550 }
551 if doc == nil {
552 doc = map[string]any{}
553 }
554 var current map[any]any
555 switch existing := doc["environment"].(type) {
556 case nil:
557 current = map[any]any{}
558 case map[any]any:
559 current = existing
560 default:
561 return "", fmt.Errorf(
562 "manifest `environment` must be a map, got %T", existing,
563 )
564 }
565 for k, v := range env {
566 if _, exists := current[k]; exists {
567 continue
568 }
569 current[k] = v
570 }
571 doc["environment"] = current
572
573 out, err := yaml.Marshal(doc)
574 if err != nil {
575 return "", fmt.Errorf("emit manifest: %w", err)
576 }
577 return string(out), nil
578}
579
580// clientFor resolves a workflow's optional `instance` override to the
581// client that should issue API calls for that workflow, plus the
582// concrete instance URL the row was/should-be persisted as. The
583// no-override (or matches-default) case reuses the long-lived default
584// client. Custom instances are memoised so repeated calls (per-poll,
585// per-Logs) reuse one Client — and therefore one connection pool —
586// per distinct upstream.
587func (p *sourcehutProvider) clientFor(instance string) (*sourcehut.Client, string) {
588 if instance == "" || instance == p.defaultInstance {
589 return p.defaultClient, p.defaultInstance
590 }
591 p.mu.Lock()
592 defer p.mu.Unlock()
593 if c, ok := p.instanceClients[instance]; ok {
594 return c, instance
595 }
596 c := sourcehut.NewClient(instance, p.token)
597 p.instanceClients[instance] = c
598 return c, instance
599}
600
// shortCommit abbreviates a commit SHA for display. It returns the
// first 12 bytes when the SHA is longer than that, the SHA unchanged
// otherwise, and "?" for an empty string.
func shortCommit(commit string) string {
	const width = 12
	switch {
	case commit == "":
		return "?"
	case len(commit) <= width:
		return commit
	default:
		return commit[:width]
	}
}