Stitch any CI into Tangled
1package main
2
3// providerRouter dispatches each incoming workflow to whichever
4// configured Provider matches the workflow's YAML body. Selection
5// happens per-workflow: we decode the top-level `tack:` map and pick
6// the first child key that names one of the registered providers.
7// This keeps the trigger plumbing oblivious to which backend any
8// given workflow will run on, and lets a single tack instance host
9// multiple providers concurrently — the workflow YAML is the source
10// of truth for routing, not the operator's env.
11//
12// Providers are registered as a (key → Provider) map. When a
13// workflow names more than one provider key under `tack:` (a config
14// mistake in practice) the router walks the YAML's child keys in
15// document order and picks the first one that has a registered
16// provider — Go's map iteration randomness never enters into it,
17// because the YAML's MapSlice preserves order.
18
19import (
20 "context"
21 "errors"
22 "fmt"
23 "log/slog"
24 "strings"
25
26 "go.yaml.in/yaml/v2"
27 "tangled.org/core/api/tangled"
28)
29
30// providerRouter is itself a Provider so the rest of tack (knot
31// consumer, HTTP handlers) keeps talking to a single Provider value
32// regardless of how many real backends are wired in.
33type providerRouter struct {
34 log *slog.Logger
35 providers map[string]Provider
36}
37
38// Compile-time interface conformance check.
39var _ Provider = (*providerRouter)(nil)
40
41// newProviderRouter wires a router from a (key → Provider) map.
42func newProviderRouter(
43 log *slog.Logger,
44 providers map[string]Provider,
45) *providerRouter {
46 return &providerRouter{
47 log: log.With("component", "provider", "kind", "router"),
48 providers: providers,
49 }
50}
51
52// Spawn satisfies Provider. Each workflow is routed independently —
53// a single pipeline can mix workflows that target different
54// providers. Workflows whose YAML doesn't name a known provider are
55// logged loudly and skipped, since silently dropping them would
56// hide a config error from the operator.
57func (r *providerRouter) Spawn(
58 ctx context.Context,
59 knot string,
60 pipelineRkey string,
61 actor string,
62 trigger *tangled.Pipeline_TriggerMetadata,
63 workflows []*tangled.Pipeline_Workflow,
64) {
65 for _, wf := range workflows {
66 // Defensive: the lexicon allows nil entries and doesn't
67 // require a name. We can't route a workflow that has no
68 // body to inspect.
69 if wf == nil || wf.Name == "" {
70 continue
71 }
72 if wf.Engine != "tack" {
73 r.log.Error("workflow has wrong engine",
74 "err", fmt.Sprintf("expected engine %q, got %q", "tack", wf.Engine),
75 "knot", knot,
76 "pipeline_rkey", pipelineRkey,
77 "workflow", wf.Name,
78 )
79 continue
80 }
81 p, err := r.pick(wf.Raw)
82 if err != nil {
83 r.log.Error("route workflow",
84 "err", err,
85 "knot", knot,
86 "pipeline_rkey", pipelineRkey,
87 "workflow", wf.Name,
88 )
89 continue
90 }
91 // Hand the workflow off as a single-element slice so the
92 // downstream provider's existing Spawn loop runs unchanged.
93 // actor passes through verbatim; the router doesn't
94 // authorize, it just dispatches.
95 p.Spawn(ctx, knot, pipelineRkey, actor, trigger,
96 []*tangled.Pipeline_Workflow{wf},
97 )
98 }
99}
100
101// Logs satisfies Provider. The (knot, rkey, workflow) tuple alone
102// doesn't tell us which backend ran the workflow — the YAML body
103// isn't carried on the request — so we ask each provider and
104// surface the first one that has a stream. ErrLogsNotFound from a
105// provider just means "not mine"; we keep walking. Any other error
106// is the answer (a real backend failure should surface to the HTTP
107// caller, not be masked by the next provider).
108//
109// Map iteration order is undefined, but in practice exactly one
110// provider should know about any given (knot, rkey, workflow) so the
111// order is moot.
112func (r *providerRouter) Logs(
113 ctx context.Context,
114 knot string,
115 pipelineRkey string,
116 workflow string,
117) (<-chan LogLine, error) {
118 for _, p := range r.providers {
119 ch, err := p.Logs(ctx, knot, pipelineRkey, workflow)
120 if errors.Is(err, ErrLogsNotFound) {
121 continue
122 }
123 return ch, err
124 }
125 return nil, ErrLogsNotFound
126}
127
128// pick decodes raw and returns the first registered provider whose
129// key appears as a child of the top-level `tack:` map, walking the
130// YAML's children in document order. An empty body or YAML with no
131// `tack:` block — or one whose children name no registered provider
132// — is a structural error; no routing decision is possible.
133func (r *providerRouter) pick(raw string) (Provider, error) {
134 if strings.TrimSpace(raw) == "" {
135 return nil, errors.New("workflow body is empty")
136 }
137 // MapSlice preserves the on-the-wire ordering of the children
138 // of `tack:` so "first match" is deterministic w.r.t. the YAML
139 // document, not Go's randomised map iteration.
140 var doc struct {
141 Tack yaml.MapSlice `yaml:"tack"`
142 }
143 if err := yaml.Unmarshal([]byte(raw), &doc); err != nil {
144 return nil, fmt.Errorf("parse workflow yaml: %w", err)
145 }
146 for _, item := range doc.Tack {
147 // YAML map keys are usually strings, but the lexicon
148 // doesn't enforce that — guard so a stray int/bool key
149 // doesn't panic the type assertion.
150 key, ok := item.Key.(string)
151 if !ok {
152 continue
153 }
154 if p, ok := r.providers[key]; ok {
155 return p, nil
156 }
157 }
158 return nil, fmt.Errorf(
159 "workflow yaml has no `tack:` key matching a registered provider",
160 )
161}