Stitch any CI into Tangled
1package main
2
3// sourcehutProvider implements Provider against a builds.sr.ht
4// instance. Spawn submits one job per workflow through GraphQL's
5// `submit` mutation; status is observed via an in-process poll loop
6// — same shape as the Tekton provider.
7//
8// The build manifest a workflow targets is carried inline in the
9// workflow's YAML body under `tack.sourcehut.manifest`. Tack injects
10// a small set of TACK_* environment variables into the manifest so
11// the user's tasks can dispatch on the originating Tangled trigger.
12
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"net/url"
	"strings"
	"sync"
	"time"

	"go.yaml.in/yaml/v2"
	"tangled.org/core/api/tangled"

	"go.mitchellh.com/tack/internal/sourcehut"
)
28
// defaultSourcehutPollInterval is the pause between watchJob's
// successive fetches of upstream job state. newSourcehutProvider
// installs it as every new provider's pollInterval.
const defaultSourcehutPollInterval time.Duration = 5 * time.Second
32
// workflowSourcehutDoc is the tack-flavoured schema we expect inside
// each Tangled workflow's Raw YAML body. It uses the same nesting
// convention as the other providers (`tack: { sourcehut: ... }`), so a
// workflow can grow other top-level keys without colliding with our
// namespace. parseSourcehutWorkflowConfig decodes with the non-strict
// yaml.Unmarshal, so keys outside `tack.sourcehut` are simply ignored.
type workflowSourcehutDoc struct {
	// Tack holds everything under the workflow's top-level `tack:` key.
	Tack struct {
		// Sourcehut is the `tack.sourcehut` section this provider reads.
		Sourcehut sourcehutWorkflowConfig `yaml:"sourcehut"`
	} `yaml:"tack"`
}
42
// sourcehutWorkflowConfig is the sourcehut-specific subset of a
// workflow YAML (the `tack.sourcehut` section).
//
// Manifest is the YAML build manifest the user wants builds.sr.ht to
// run, exactly as they'd paste it into the web UI. It is required
// (parseSourcehutWorkflowConfig rejects a blank one). We treat it as an
// opaque string until Spawn time. Just before submitting, it is
// decoded, TACK_* environment variables are merged in, and it is
// re-emitted. This keeps users in authorial control of their manifests.
//
// Instance optionally overrides the provider's default sourcehut host
// (e.g. for users running their own builds.sr.ht). It is meant to be a
// full URL including scheme so we don't have to guess http vs https.
// NOTE(review): the client built for an override carries the
// provider's own token (see clientFor), so whoever controls the
// workflow YAML decides which host receives that token.
//
// Tags and Note are forwarded to the submit API and are useful for the
// user's own filtering on builds.sr.ht's job list view. When either is
// left empty, spawnWorkflow substitutes a default: the single tag
// "tack", or a note naming the workflow and the short commit.
//
// Secrets controls whether the runner mounts the user's configured
// sourcehut secrets. Default is false because secrets injection is a
// blast-radius decision that should be opt-in per workflow.
type sourcehutWorkflowConfig struct {
	Manifest string   `yaml:"manifest"`
	Instance string   `yaml:"instance"`
	Tags     []string `yaml:"tags"`
	Note     string   `yaml:"note"`
	Secrets  bool     `yaml:"secrets"`
}
69
70// parseSourcehutWorkflowConfig decodes `tack.sourcehut` from a workflow
71// body and validates the small set of fields we require. An empty body
72// or a missing manifest is a structural error so spawnWorkflow can
73// short-circuit cleanly instead of submitting a malformed job that
74// builds.sr.ht would reject.
75func parseSourcehutWorkflowConfig(raw string) (*sourcehutWorkflowConfig, error) {
76 if strings.TrimSpace(raw) == "" {
77 return nil, errors.New("workflow body is empty")
78 }
79 var doc workflowSourcehutDoc
80 if err := yaml.Unmarshal([]byte(raw), &doc); err != nil {
81 return nil, fmt.Errorf("parse workflow yaml: %w", err)
82 }
83 cfg := doc.Tack.Sourcehut
84 if strings.TrimSpace(cfg.Manifest) == "" {
85 return nil, errors.New("workflow yaml: `tack.sourcehut.manifest` is required")
86 }
87 return &cfg, nil
88}
89
// sourcehutProvider implements Provider against a builds.sr.ht
// instance.
//
// br is the broker pipeline.status records are published on. st
// persists the (pipeline, workflow) -> job mapping that Logs reads
// back.
//
// defaultInstance is the builds.sr.ht instance Spawn submits to when a
// workflow doesn't override it. The configured token must
// authenticate against that instance, and defaultClient is the
// matching Client. A per-workflow `instance` override is resolved
// through clientFor. clientFor builds a sibling Client carrying the
// same token and caches it in instanceClients; it is not re-created
// per call.
// NOTE(review): one token can in practice address only one sourcehut
// deployment, so overrides only work where this token is valid. They
// also send the token to whatever host the workflow names.
//
// pollInterval is how often watchJob re-fetches upstream job state.
type sourcehutProvider struct {
	br              *broker
	st              *store
	log             *slog.Logger
	token           string
	defaultClient   *sourcehut.Client
	defaultInstance string
	pollInterval    time.Duration

	// instanceClients caches a Client per non-default instance URL so a
	// workflow that targets a custom builds.sr.ht doesn't mint (and
	// leak) a fresh http.Transport + connection pool on every Spawn,
	// watch tick, and Logs call. The default instance always uses
	// defaultClient and never lands here. mu guards this map.
	mu              sync.Mutex
	instanceClients map[string]*sourcehut.Client
}

// Compile-time check that *sourcehutProvider satisfies Provider.
var _ Provider = (*sourcehutProvider)(nil)
117
118// newSourcehutProvider wires a provider to its sourcehut client and
119// to the broker it publishes pipeline.status records on. instance is
120// the default builds.sr.ht base URL; pass empty to use
121// sourcehut.DefaultBaseURL.
122func newSourcehutProvider(
123 br *broker,
124 st *store,
125 token string,
126 instance string,
127 log *slog.Logger,
128) *sourcehutProvider {
129 if instance == "" {
130 instance = sourcehut.DefaultBaseURL
131 }
132 return &sourcehutProvider{
133 br: br,
134 st: st,
135 log: log.With("component", "provider", "kind", "sourcehut"),
136 token: token,
137 defaultClient: sourcehut.NewClient(instance, token),
138 defaultInstance: instance,
139 pollInterval: defaultSourcehutPollInterval,
140 instanceClients: map[string]*sourcehut.Client{},
141 }
142}
143
144// Spawn satisfies Provider. For each workflow it submits a separate
145// builds.sr.ht job so each workflow gets its own status timeline. The
146// actual API call runs on a goroutine — submission is one HTTP round
147// trip, but Spawn is contractually non-blocking.
148func (p *sourcehutProvider) Spawn(
149 ctx context.Context,
150 knot string,
151 pipelineRkey string,
152 actor string,
153 trigger *tangled.Pipeline_TriggerMetadata,
154 workflows []*tangled.Pipeline_Workflow,
155) {
156 if len(workflows) == 0 {
157 p.log.Warn("pipeline has no workflows; nothing to spawn",
158 "knot", knot, "rkey", pipelineRkey,
159 )
160 return
161 }
162 for _, wf := range workflows {
163 if wf == nil || wf.Name == "" {
164 continue
165 }
166 wf := wf
167 go p.spawnWorkflow(ctx, knot, pipelineRkey, actor, trigger, wf)
168 }
169}
170
171func (p *sourcehutProvider) spawnWorkflow(
172 ctx context.Context,
173 knot string,
174 pipelineRkey string,
175 actor string,
176 trigger *tangled.Pipeline_TriggerMetadata,
177 wf *tangled.Pipeline_Workflow,
178) {
179 logger := p.log.With(
180 "knot", knot,
181 "pipeline_rkey", pipelineRkey,
182 "workflow", wf.Name,
183 "actor", actor,
184 )
185
186 cfg, err := parseSourcehutWorkflowConfig(wf.Raw)
187 if err != nil {
188 logger.Error("invalid workflow config; refusing to spawn", "err", err)
189 return
190 }
191
192 commit, branch := triggerCommitAndBranch(trigger)
193 manifest, err := injectSourcehutEnvironment(cfg.Manifest, map[string]string{
194 "TACK_KNOT": knot,
195 "TACK_PIPELINE_RKEY": pipelineRkey,
196 "TACK_WORKFLOW": wf.Name,
197 "TACK_WORKFLOW_RAW": wf.Raw,
198 "TACK_ACTOR": actor,
199 "TACK_COMMIT": commit,
200 "TACK_BRANCH": branch,
201 })
202 if err != nil {
203 logger.Error("inject TACK_* env into manifest", "err", err)
204 return
205 }
206
207 client, instance := p.clientFor(cfg.Instance)
208 logger = logger.With("instance", instance)
209
210 tags := cfg.Tags
211 if len(tags) == 0 {
212 // Auto-tag with "tack" so an operator browsing the builds.sr.ht
213 // job list can filter to the jobs originating from this spindle
214 // without users having to remember to set tags themselves.
215 tags = []string{"tack"}
216 }
217 note := cfg.Note
218 if note == "" {
219 note = fmt.Sprintf("tangled: %s @ %s", wf.Name, shortCommit(commit))
220 }
221
222 job, err := client.SubmitJob(ctx, sourcehut.SubmitRequest{
223 Manifest: manifest,
224 Tags: tags,
225 Note: note,
226 Secrets: cfg.Secrets,
227 Execute: true,
228 })
229 if err != nil {
230 logger.Error("submit sourcehut job", "err", err)
231 return
232 }
233 logger.Info("sourcehut job submitted",
234 "job_id", job.ID,
235 "owner", job.Owner.CanonicalName,
236 "web_url", client.JobWebURL(job.Owner.CanonicalName, job.ID),
237 )
238
239 pipelineURI := pipelineATURI(knot, pipelineRkey)
240 ref := SourcehutJobRef{
241 Knot: knot,
242 PipelineRkey: pipelineRkey,
243 Workflow: wf.Name,
244 JobID: job.ID,
245 Owner: job.Owner.CanonicalName,
246 Instance: instance,
247 PipelineURI: pipelineURI,
248 }
249 if err := p.st.InsertSourcehutJob(ctx, ref); err != nil {
250 // Without the row the watch goroutine and any /logs lookup
251 // can't recover the job. Surface loudly and bail.
252 logger.Error("persist sourcehut job mapping", "err", err,
253 "job_id", job.ID,
254 )
255 return
256 }
257
258 if err := p.publishStatus(ctx, pipelineURI, wf.Name,
259 "pending", job.ID, nil, nil); err != nil {
260 logger.Error("publish initial pending status", "err", err)
261 }
262
263 go p.watchJob(ctx, ref, client)
264}
265
266// watchJob polls the job until it reaches a terminal state, publishing
267// pipeline.status records on every distinct transition. We mirror the
268// Tekton provider's structure: a single goroutine per job, suppressed
269// duplicate publishes via `last`, exit on terminal or ctx cancellation.
270func (p *sourcehutProvider) watchJob(
271 ctx context.Context,
272 ref SourcehutJobRef,
273 client *sourcehut.Client,
274) {
275 logger := p.log.With(
276 "knot", ref.Knot,
277 "pipeline_rkey", ref.PipelineRkey,
278 "workflow", ref.Workflow,
279 "job_id", ref.JobID,
280 "instance", ref.Instance,
281 )
282 logger.Debug("watchJob: starting")
283
284 last := ""
285 ticker := time.NewTicker(p.pollInterval)
286 defer ticker.Stop()
287
288 // Issue one immediate poll so a fast-completing job (e.g. a tiny
289 // manifest the runner finished before the first tick) doesn't sit
290 // at "pending" for the whole interval.
291 for {
292 job, err := client.GetJob(ctx, ref.JobID)
293 if errors.Is(err, sourcehut.ErrNotFound) {
294 logger.Warn("job disappeared while watching")
295 return
296 }
297 if err != nil {
298 logger.Debug("get sourcehut job", "err", err)
299 } else {
300 status, terminal, ok := mapSourcehutStatus(job.Status)
301 logger.Debug("watchJob: poll",
302 "upstream_status", job.Status,
303 "status", status, "terminal", terminal, "ok", ok, "last", last,
304 )
305 if !ok {
306 // Keep polling on an unrecognised status, but warn
307 // so a silent infinite loop is at least visible.
308 logger.Warn("unmapped sourcehut status; continuing to poll",
309 "upstream_status", job.Status,
310 )
311 } else if status != last {
312 last = status
313 if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow,
314 status, ref.JobID, nil, nil); err != nil {
315 logger.Error("publish sourcehut status", "err", err, "status", status)
316 }
317 }
318 if ok && terminal {
319 logger.Debug("watchJob: terminal status reached; exiting", "status", status)
320 return
321 }
322 }
323
324 select {
325 case <-ctx.Done():
326 logger.Debug("watchJob: context cancelled")
327 return
328 case <-ticker.C:
329 }
330 }
331}
332
// mapSourcehutStatus converts a builds.sr.ht job status into the
// Tangled spindle StatusKind string the appview consumes. It also
// reports whether that status is terminal, and whether the input was
// recognised at all.
//
// The comparison is case-insensitive. GraphQL surfaces the upstream
// enum in uppercase, but a casing change shouldn't drop us into the
// unmapped branch. Both "failed" and "timeout" collapse to "failed",
// and "queued" is reported as "pending". Anything unrecognised yields
// ("", false, false).
func mapSourcehutStatus(state string) (status string, terminal bool, ok bool) {
	normalized := strings.ToLower(state)
	switch normalized {
	case "pending", "queued":
		status = "pending"
	case "running":
		status = "running"
	case "success", "cancelled":
		status, terminal = normalized, true
	case "failed", "timeout":
		status, terminal = "failed", true
	default:
		return "", false, false
	}
	return status, terminal, true
}
353
354// Logs satisfies Provider. We resolve the (knot, rkey, workflow) tuple
355// to a sourcehut job, fetch its tasks, and stream the per-task logs as
356// LogLine frames bracketed by per-task control frames.
357//
358// Snapshot read, not a tail — for an in-progress job each task's log
359// is fetched at its current length and the channel closes. The
360// appview's repeated /logs calls during a running build give us
361// "good enough" liveness.
362func (p *sourcehutProvider) Logs(
363 ctx context.Context,
364 knot string,
365 pipelineRkey string,
366 workflow string,
367) (<-chan LogLine, error) {
368 ref, err := p.st.LookupSourcehutJobByTuple(ctx, knot, pipelineRkey, workflow)
369 if err != nil {
370 return nil, fmt.Errorf("lookup sourcehut job: %w", err)
371 }
372 if ref == nil {
373 return nil, ErrLogsNotFound
374 }
375
376 client, _ := p.clientFor(ref.Instance)
377
378 job, err := client.GetJob(ctx, ref.JobID)
379 if err != nil {
380 if errors.Is(err, sourcehut.ErrNotFound) {
381 return nil, ErrLogsNotFound
382 }
383 return nil, fmt.Errorf("get sourcehut job: %w", err)
384 }
385
386 out := make(chan LogLine, 32)
387 go func() {
388 defer close(out)
389 stepID := 0
390 // "setup" is the master log: setup output (sources clone,
391 // package install) before any task runs. Always emit it as
392 // step 0 even if empty so the renderer has a stable timeline.
393 if !p.streamStep(ctx, out, client, ref, "setup", "", stepID) {
394 return
395 }
396 stepID++
397 for _, task := range job.Tasks {
398 name := task.Name
399 if name == "" {
400 name = fmt.Sprintf("task %d", stepID)
401 }
402 if !p.streamStep(ctx, out, client, ref, name, task.Name, stepID) {
403 return
404 }
405 stepID++
406 }
407 }()
408 return out, nil
409}
410
411// streamStep emits one step's worth of frames into out: a start
412// control, one data frame per non-empty log line, and an end control.
413// stepName is the human-visible label; logTask is the name passed to
414// GetTaskLog (empty string fetches the master log). Returns false if
415// the context fired and the caller should bail.
416func (p *sourcehutProvider) streamStep(
417 ctx context.Context,
418 out chan<- LogLine,
419 client *sourcehut.Client,
420 ref *SourcehutJobRef,
421 stepName, logTask string,
422 stepID int,
423) bool {
424 if !sendLine(ctx, out, LogLine{
425 Kind: LogKindControl, Time: time.Now(),
426 Content: stepName, StepId: stepID, StepStatus: StepStatusStart,
427 }) {
428 return false
429 }
430 body, err := client.GetTaskLog(ctx, ref.JobID, logTask)
431 if err != nil && !errors.Is(err, sourcehut.ErrNotFound) {
432 // Don't fail the whole stream on one task; emit the end frame
433 // and move on so the renderer at least sees what other tasks
434 // produced. ErrNotFound (no log yet) is treated as an empty body.
435 p.log.Debug("fetch sourcehut task log",
436 "err", err, "job_id", ref.JobID, "task", logTask,
437 )
438 body = ""
439 }
440 if !emitLogBody(ctx, out, body, stepID) {
441 return false
442 }
443 return sendLine(ctx, out, LogLine{
444 Kind: LogKindControl, Time: time.Now(),
445 Content: stepName, StepId: stepID, StepStatus: StepStatusEnd,
446 })
447}
448
449// emitLogBody pushes one LogLine per non-empty line of body into out.
450// Returns false if the context fired so the caller can stop draining.
451func emitLogBody(ctx context.Context, out chan<- LogLine, body string, stepID int) bool {
452 if body == "" {
453 return true
454 }
455 for _, line := range strings.Split(strings.TrimRight(body, "\n"), "\n") {
456 if line == "" {
457 continue
458 }
459 if !sendLine(ctx, out, LogLine{
460 Kind: LogKindData,
461 Time: time.Now(),
462 Content: line + "\n",
463 StepId: stepID,
464 Stream: "stdout",
465 }) {
466 return false
467 }
468 }
469 return true
470}
471
472// publishStatus assembles a tangled.PipelineStatus record and pushes
473// it through the broker. jobID is mixed into the rkey so multiple
474// status records for the same job don't collide on the events table's
475// (rkey) uniqueness, and so an operator grepping the log can find
476// every record pertaining to a given sourcehut job.
477func (p *sourcehutProvider) publishStatus(
478 ctx context.Context,
479 pipelineURI, workflow, status string,
480 jobID int64,
481 errMsg *string,
482 exitCode *int64,
483) error {
484 rec := tangled.PipelineStatus{
485 LexiconTypeID: tangled.PipelineStatusNSID,
486 Pipeline: pipelineURI,
487 Workflow: workflow,
488 Status: status,
489 CreatedAt: time.Now().UTC().Format(time.RFC3339),
490 Error: errMsg,
491 ExitCode: exitCode,
492 }
493 body, err := json.Marshal(rec)
494 if err != nil {
495 return fmt.Errorf("marshal pipeline.status: %w", err)
496 }
497 rkey := fmt.Sprintf("sh-%d-%s-%d", jobID, status, time.Now().UnixNano())
498 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil {
499 return fmt.Errorf("publish pipeline.status: %w", err)
500 }
501 return nil
502}
503
504// injectSourcehutEnvironment decodes the user-supplied manifest,
505// merges env into its top-level `environment` map, and re-emits it
506// as YAML. We round-trip via map[string]any rather than a typed
507// schema so unknown fields aren't dropped; comments and key ordering
508// are not preserved.
509//
510// Existing user-set entries win over our injected vars — an explicit
511// `TACK_FOO: ...` is a deliberate override.
512//
513// `environment:` must be a map. A sequence form is rejected loudly
514// rather than silently replaced with our map.
515func injectSourcehutEnvironment(manifest string, env map[string]string) (string, error) {
516 var doc map[string]any
517 if err := yaml.Unmarshal([]byte(manifest), &doc); err != nil {
518 return "", fmt.Errorf("parse manifest: %w", err)
519 }
520 if doc == nil {
521 doc = map[string]any{}
522 }
523 var current map[any]any
524 switch existing := doc["environment"].(type) {
525 case nil:
526 current = map[any]any{}
527 case map[any]any:
528 current = existing
529 default:
530 return "", fmt.Errorf(
531 "manifest `environment` must be a map, got %T", existing,
532 )
533 }
534 for k, v := range env {
535 if _, exists := current[k]; exists {
536 continue
537 }
538 current[k] = v
539 }
540 doc["environment"] = current
541
542 out, err := yaml.Marshal(doc)
543 if err != nil {
544 return "", fmt.Errorf("emit manifest: %w", err)
545 }
546 return string(out), nil
547}
548
549// clientFor resolves a workflow's optional `instance` override to the
550// client that should issue API calls for that workflow, plus the
551// concrete instance URL the row was/should-be persisted as. The
552// no-override (or matches-default) case reuses the long-lived default
553// client. Custom instances are memoised so repeated calls (per-poll,
554// per-Logs) reuse one Client — and therefore one connection pool —
555// per distinct upstream.
556func (p *sourcehutProvider) clientFor(instance string) (*sourcehut.Client, string) {
557 if instance == "" || instance == p.defaultInstance {
558 return p.defaultClient, p.defaultInstance
559 }
560 p.mu.Lock()
561 defer p.mu.Unlock()
562 if c, ok := p.instanceClients[instance]; ok {
563 return c, instance
564 }
565 c := sourcehut.NewClient(instance, p.token)
566 p.instanceClients[instance] = c
567 return c, instance
568}
569
// shortCommit abbreviates a commit SHA for human-facing labels. It
// returns "?" for an empty commit, the first 12 characters of a longer
// one, and anything 12 characters or shorter unchanged.
func shortCommit(commit string) string {
	const width = 12
	switch {
	case commit == "":
		return "?"
	case len(commit) <= width:
		return commit
	default:
		return commit[:width]
	}
}