Stitch any CI into Tangled
1// Tack is a custom Tangled spindle that translates sh.tangled.pipeline
2// trigger records into Buildkite builds, and publishes Buildkite job state
3// back as sh.tangled.pipeline.status events on a WebSocket stream that the
4// Tangled appview can consume.
5package main
6
7import (
8 "context"
9 "errors"
10 "flag"
11 "fmt"
12 "log/slog"
13 "os"
14 "os/signal"
15 "syscall"
16
17 charmlog "github.com/charmbracelet/log"
18
19 "go.mitchellh.com/tack/internal/buildkite"
20)
21
22// config is the runtime configuration, sourced from environment variables and
23// flags. Env vars match the README so this can be swapped in for the stock
24// spindle without surprises.
25type config struct {
26 Addr string
27 Hostname string
28 OwnerDID string
29 JetstreamURL string
30 DBPath string
31 // Dev flips the knot event-stream scheme from wss:// to ws://.
32 // Useful when running against a local knot during development.
33 Dev bool
34
35 // Buildkite-mode configuration. BuildkiteToken is the switch:
36 // when empty we fall back to the in-process fake provider
37 // (useful for local development against a real Tangled
38 // jetstream); when set, the other Buildkite fields are
39 // required and tack will refuse to start without them.
40 //
41 // BuildkiteOrg is the *default* org used when a workflow YAML
42 // doesn't specify one of its own. The pipeline a workflow runs
43 // against is no longer global — it's pulled from the workflow
44 // body itself (see workflowConfig in provider_buildkite.go).
45 BuildkiteToken string
46 BuildkiteOrg string
47 BuildkiteWebhookSecret string
48 BuildkiteWebhookMode buildkite.WebhookMode
49
50 // Tekton mode is explicit because it only works from inside a
51 // Kubernetes cluster. When enabled, tack creates PipelineRuns in
52 // TektonNamespace using its pod's service account credentials.
53 TektonEnabled bool
54 TektonNamespace string
55
56 // Sourcehut mode is gated on a token: when empty the provider is
57 // not registered and workflows naming `tack.sourcehut` route
58 // nowhere. SourcehutInstance overrides the public builds.sr.ht
59 // endpoint for users running their own deployment; the token must
60 // authenticate against whichever instance is targeted.
61 SourcehutToken string
62 SourcehutInstance string
63}
64
65func loadConfig() (config, error) {
66 cfg := config{
67 Addr: envOr("TACK_LISTEN_ADDR", ":8080"),
68 Hostname: os.Getenv("TACK_HOSTNAME"),
69 OwnerDID: os.Getenv("TACK_OWNER_DID"),
70 JetstreamURL: envOr("TACK_JETSTREAM_URL", "wss://jetstream1.us-west.bsky.network/subscribe"),
71 DBPath: envOr("TACK_DB_PATH", "tack.db"),
72 Dev: os.Getenv("TACK_DEV") != "",
73 BuildkiteToken: os.Getenv("TACK_BUILDKITE_TOKEN"),
74 BuildkiteOrg: os.Getenv("TACK_BUILDKITE_ORG"),
75 BuildkiteWebhookSecret: os.Getenv("TACK_BUILDKITE_WEBHOOK_SECRET"),
76 BuildkiteWebhookMode: buildkite.WebhookMode(
77 envOr("TACK_BUILDKITE_WEBHOOK_MODE", string(buildkite.WebhookModeToken)),
78 ),
79 TektonEnabled: os.Getenv("TACK_TEKTON_ENABLED") == "1",
80 TektonNamespace: envOr("TACK_TEKTON_NAMESPACE", "default"),
81 SourcehutToken: os.Getenv("TACK_SOURCEHUT_TOKEN"),
82 SourcehutInstance: os.Getenv("TACK_SOURCEHUT_INSTANCE"),
83 }
84 addrFlag := flag.String("addr", cfg.Addr, "HTTP listen address (overrides TACK_LISTEN_ADDR)")
85 flag.Parse()
86 cfg.Addr = *addrFlag
87
88 if cfg.OwnerDID == "" {
89 return cfg, errors.New("TACK_OWNER_DID is required")
90 }
91
92 // Hostname identifies *us* in sh.tangled.repo records (the .spindle
93 // field). Without it we have no way to know which repos point at us
94 // and therefore which knots we should subscribe to for pipeline
95 // triggers — so we refuse to start rather than silently subscribe to
96 // nothing.
97 if cfg.Hostname == "" {
98 return cfg, errors.New("TACK_HOSTNAME is required")
99 }
100
101 // If the operator opted into Buildkite mode (by supplying a
102 // token), every other Buildkite knob has to be present. Half-
103 // configured Buildkite leads to confusing failures deep in the
104 // provider; catch it at startup.
105 if cfg.BuildkiteToken != "" {
106 if cfg.BuildkiteOrg == "" {
107 return cfg, errors.New("TACK_BUILDKITE_ORG is required when TACK_BUILDKITE_TOKEN is set")
108 }
109 if cfg.BuildkiteWebhookSecret == "" {
110 return cfg, errors.New("TACK_BUILDKITE_WEBHOOK_SECRET is required when TACK_BUILDKITE_TOKEN is set")
111 }
112 switch cfg.BuildkiteWebhookMode {
113 case buildkite.WebhookModeToken, buildkite.WebhookModeSignature:
114 default:
115 return cfg, fmt.Errorf("TACK_BUILDKITE_WEBHOOK_MODE must be %q or %q; got %q",
116 buildkite.WebhookModeToken, buildkite.WebhookModeSignature,
117 cfg.BuildkiteWebhookMode,
118 )
119 }
120 }
121 if cfg.TektonEnabled && cfg.TektonNamespace == "" {
122 return cfg, errors.New("TACK_TEKTON_NAMESPACE is required when TACK_TEKTON_ENABLED=1")
123 }
124
125 return cfg, nil
126}
127
// envOr returns the value of the environment variable key, or def when the
// variable is unset or set to the empty string.
func envOr(key, def string) string {
	val := os.Getenv(key)
	if val == "" {
		return def
	}
	return val
}
134
135func main() {
136 // Logging setup. charmbracelet/log implements slog.Handler, so we wrap
137 // it in slog.New to share the same backend with libraries that expect
138 // a *slog.Logger (notably the jetstream client).
139 charmHandler := charmlog.NewWithOptions(os.Stderr, charmlog.Options{
140 Level: charmlog.DebugLevel,
141 ReportTimestamp: true,
142 })
143 logger := slog.New(charmHandler)
144 slog.SetDefault(logger)
145
146 // Config loading
147 cfg, err := loadConfig()
148 if err != nil {
149 logger.Error("invalid configuration", "err", err)
150 os.Exit(2)
151 }
152
153 // Root context: cancelled on SIGINT/SIGTERM, with the logger attached
154 // so any function we hand it to can pull it back out via loggerFrom.
155 ctx, stop := signal.NotifyContext(
156 context.Background(),
157 os.Interrupt, syscall.SIGTERM,
158 )
159 defer stop()
160 ctx = loggerInto(ctx, logger)
161
162 // Open (or create) the SQLite store. Holds jetstream cursor +
163 // observed Tangled membership records. Closed last during shutdown
164 // so anything writing to it during teardown still succeeds.
165 st, err := openStore(cfg.DBPath)
166 if err != nil {
167 logger.Error("failed to open store", "err", err, "path", cfg.DBPath)
168 os.Exit(1)
169 }
170 defer func() {
171 if err := st.Close(); err != nil {
172 logger.Error("close store", "err", err)
173 }
174 }()
175 logger.Info("store open", "path", cfg.DBPath)
176
177 // In-process broker for the /events fan-out. Wraps the store so
178 // publishes are durable and reconnecting subscribers can resume by
179 // cursor. Constructed before the consumers in case we ever want
180 // them to publish synthetic status events at startup.
181 br := newBroker(st)
182
183 // Providers turn Tangled pipeline triggers into pipeline.status
184 // events. We always wire the fake provider in so workflows can
185 // opt into it (via `tack: { fake: ... }`) for end-to-end testing
186 // even when Buildkite credentials are present; the Buildkite
187 // provider is added on top when configured. Routing is per-
188 // workflow and driven by the workflow YAML — see providerRouter.
189 //
190 // bkProvider is kept as a typed pointer separately because the
191 // /webhooks/buildkite handler needs the concrete *buildkiteProvider
192 // (for HandleWebhook + signature verification), not the abstract
193 // Provider surface.
194 providers := map[string]Provider{
195 "fake": newFakeProvider(br, logger),
196 }
197 logger.Info("fake provider enabled (workflow opt-in via `tack.fake:`)")
198
199 var bkProvider *buildkiteProvider
200 if cfg.BuildkiteToken != "" {
201 bkProvider = newBuildkiteProvider(
202 br, st,
203 buildkite.NewClient(cfg.BuildkiteToken),
204 cfg.BuildkiteOrg,
205 cfg.BuildkiteWebhookSecret,
206 cfg.BuildkiteWebhookMode,
207 logger,
208 )
209 providers["buildkite"] = bkProvider
210 logger.Info("buildkite provider enabled",
211 "default_org", cfg.BuildkiteOrg,
212 "webhook_mode", cfg.BuildkiteWebhookMode,
213 )
214 }
215 if cfg.TektonEnabled {
216 tkProvider, err := newInClusterTektonProvider(
217 br, st,
218 cfg.TektonNamespace,
219 logger,
220 )
221 if err != nil {
222 logger.Error("failed to configure tekton provider", "err", err)
223 os.Exit(1)
224 }
225 providers["tekton"] = tkProvider
226 logger.Info("tekton provider enabled",
227 "namespace", cfg.TektonNamespace,
228 )
229 }
230 if cfg.SourcehutToken != "" {
231 shProvider := newSourcehutProvider(
232 br, st,
233 cfg.SourcehutToken,
234 cfg.SourcehutInstance,
235 logger,
236 )
237 providers["sourcehut"] = shProvider
238 logger.Info("sourcehut provider enabled",
239 "instance", shProvider.defaultInstance,
240 )
241 }
242 provider := newProviderRouter(logger, providers)
243
244 // Start the knot event-stream consumer first so the jetstream
245 // loop has somewhere to register newly-observed knots into. It
246 // gets the provider so each incoming pipeline trigger has
247 // something to dispatch to.
248 knots, err := startKnotConsumer(ctx, cfg, st, provider)
249 if err != nil {
250 logger.Error("failed to start knot consumer", "err", err)
251 os.Exit(1)
252 }
253 defer knots.Stop()
254
255 // Start the JetStream listener in the background. It hands the knot
256 // consumer any new knot referenced by an incoming sh.tangled.repo
257 // record so we don't have to wait for a restart to pick it up.
258 if err := startJetstream(ctx, cfg, st, knots); err != nil {
259 logger.Error("failed to start jetstream consumer", "err", err)
260 os.Exit(1)
261 }
262
263 // Run the HTTP server. This blocks until ctx is cancelled or the
264 // listener errors.
265 if err := runHTTP(ctx, cfg, br, provider, bkProvider); err != nil {
266 logger.Error("http server error", "err", err)
267 os.Exit(1)
268 }
269}