Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

at main 5.3 kB View raw
1package main 2 3// providerRouter dispatches each incoming workflow to whichever 4// configured Provider matches the workflow's YAML body. Selection 5// happens per-workflow: we decode the top-level `tack:` map and pick 6// the first child key that names one of the registered providers. 7// This keeps the trigger plumbing oblivious to which backend any 8// given workflow will run on, and lets a single tack instance host 9// multiple providers concurrently — the workflow YAML is the source 10// of truth for routing, not the operator's env. 11// 12// Providers are registered as a (key → Provider) map. When a 13// workflow names more than one provider key under `tack:` (a config 14// mistake in practice) the router walks the YAML's child keys in 15// document order and picks the first one that has a registered 16// provider — Go's map iteration randomness never enters into it, 17// because the YAML's MapSlice preserves order. 18 19import ( 20 "context" 21 "errors" 22 "fmt" 23 "log/slog" 24 "strings" 25 26 "go.yaml.in/yaml/v2" 27 "tangled.org/core/api/tangled" 28) 29 30// providerRouter is itself a Provider so the rest of tack (knot 31// consumer, HTTP handlers) keeps talking to a single Provider value 32// regardless of how many real backends are wired in. 33type providerRouter struct { 34 log *slog.Logger 35 providers map[string]Provider 36} 37 38// Compile-time interface conformance check. 39var _ Provider = (*providerRouter)(nil) 40 41// newProviderRouter wires a router from a (key → Provider) map. 42func newProviderRouter( 43 log *slog.Logger, 44 providers map[string]Provider, 45) *providerRouter { 46 return &providerRouter{ 47 log: log.With("component", "provider", "kind", "router"), 48 providers: providers, 49 } 50} 51 52// Spawn satisfies Provider. Each workflow is routed independently — 53// a single pipeline can mix workflows that target different 54// providers. Workflows whose YAML doesn't name a known provider are 55// logged loudly and skipped, since silently dropping them would 56// hide a config error from the operator. 57func (r *providerRouter) Spawn( 58 ctx context.Context, 59 knot string, 60 pipelineRkey string, 61 actor string, 62 trigger *tangled.Pipeline_TriggerMetadata, 63 workflows []*tangled.Pipeline_Workflow, 64) { 65 for _, wf := range workflows { 66 // Defensive: the lexicon allows nil entries and doesn't 67 // require a name. We can't route a workflow that has no 68 // body to inspect. 69 if wf == nil || wf.Name == "" { 70 continue 71 } 72 if wf.Engine != "tack" { 73 r.log.Error("workflow has wrong engine", 74 "err", fmt.Sprintf("expected engine %q, got %q", "tack", wf.Engine), 75 "knot", knot, 76 "pipeline_rkey", pipelineRkey, 77 "workflow", wf.Name, 78 ) 79 continue 80 } 81 p, err := r.pick(wf.Raw) 82 if err != nil { 83 r.log.Error("route workflow", 84 "err", err, 85 "knot", knot, 86 "pipeline_rkey", pipelineRkey, 87 "workflow", wf.Name, 88 ) 89 continue 90 } 91 // Hand the workflow off as a single-element slice so the 92 // downstream provider's existing Spawn loop runs unchanged. 93 // actor passes through verbatim; the router doesn't 94 // authorize, it just dispatches. 95 p.Spawn(ctx, knot, pipelineRkey, actor, trigger, 96 []*tangled.Pipeline_Workflow{wf}, 97 ) 98 } 99} 100 101// Logs satisfies Provider. The (knot, rkey, workflow) tuple alone 102// doesn't tell us which backend ran the workflow — the YAML body 103// isn't carried on the request — so we ask each provider and 104// surface the first one that has a stream. ErrLogsNotFound from a 105// provider just means "not mine"; we keep walking. Any other error 106// is the answer (a real backend failure should surface to the HTTP 107// caller, not be masked by the next provider). 108// 109// Map iteration order is undefined, but in practice exactly one 110// provider should know about any given (knot, rkey, workflow) so the 111// order is moot. 112func (r *providerRouter) Logs( 113 ctx context.Context, 114 knot string, 115 pipelineRkey string, 116 workflow string, 117) (<-chan LogLine, error) { 118 for _, p := range r.providers { 119 ch, err := p.Logs(ctx, knot, pipelineRkey, workflow) 120 if errors.Is(err, ErrLogsNotFound) { 121 continue 122 } 123 return ch, err 124 } 125 return nil, ErrLogsNotFound 126} 127 128// pick decodes raw and returns the first registered provider whose 129// key appears as a child of the top-level `tack:` map, walking the 130// YAML's children in document order. An empty body or YAML with no 131// `tack:` block — or one whose children name no registered provider 132// — is a structural error; no routing decision is possible. 133func (r *providerRouter) pick(raw string) (Provider, error) { 134 if strings.TrimSpace(raw) == "" { 135 return nil, errors.New("workflow body is empty") 136 } 137 // MapSlice preserves the on-the-wire ordering of the children 138 // of `tack:` so "first match" is deterministic w.r.t. the YAML 139 // document, not Go's randomised map iteration. 140 var doc struct { 141 Tack yaml.MapSlice `yaml:"tack"` 142 } 143 if err := yaml.Unmarshal([]byte(raw), &doc); err != nil { 144 return nil, fmt.Errorf("parse workflow yaml: %w", err) 145 } 146 for _, item := range doc.Tack { 147 // YAML map keys are usually strings, but the lexicon 148 // doesn't enforce that — guard so a stray int/bool key 149 // doesn't panic the type assertion. 150 key, ok := item.Key.(string) 151 if !ok { 152 continue 153 } 154 if p, ok := r.providers[key]; ok { 155 return p, nil 156 } 157 } 158 return nil, fmt.Errorf( 159 "workflow yaml has no `tack:` key matching a registered provider", 160 ) 161}