Stitch any CI into Tangled
1// Tack is a custom Tangled spindle that translates sh.tangled.pipeline
2// trigger records into Buildkite builds, and publishes Buildkite job state
3// back as sh.tangled.pipeline.status events on a WebSocket stream that the
4// Tangled appview can consume.
5package main
6
7import (
8 "context"
9 "errors"
10 "flag"
11 "fmt"
12 "log/slog"
13 "os"
14 "os/signal"
15 "syscall"
16
17 charmlog "github.com/charmbracelet/log"
18
19 "go.mitchellh.com/tack/internal/buildkite"
20)
21
22// config is the runtime configuration, sourced from environment variables and
23// flags. Env vars match the README so this can be swapped in for the stock
24// spindle without surprises.
25type config struct {
26 Addr string
27 Hostname string
28 OwnerDID string
29 JetstreamURL string
30 DBPath string
31 // Dev flips the knot event-stream scheme from wss:// to ws://.
32 // Useful when running against a local knot during development.
33 Dev bool
34
35 // Buildkite-mode configuration. BuildkiteToken is the switch:
36 // when empty we fall back to the in-process fake provider
37 // (useful for local development against a real Tangled
38 // jetstream); when set, the other Buildkite fields are
39 // required and tack will refuse to start without them.
40 //
41 // BuildkiteOrg is the *default* org used when a workflow YAML
42 // doesn't specify one of its own. The pipeline a workflow runs
43 // against is no longer global — it's pulled from the workflow
44 // body itself (see workflowConfig in provider_buildkite.go).
45 BuildkiteToken string
46 BuildkiteOrg string
47 BuildkiteWebhookSecret string
48 BuildkiteWebhookMode buildkite.WebhookMode
49
50 // Tekton mode is explicit because it only works from inside a
51 // Kubernetes cluster. When enabled, tack creates PipelineRuns in
52 // TektonNamespace using its pod's service account credentials.
53 TektonEnabled bool
54 TektonNamespace string
55}
56
57func loadConfig() (config, error) {
58 cfg := config{
59 Addr: envOr("TACK_LISTEN_ADDR", ":8080"),
60 Hostname: os.Getenv("TACK_HOSTNAME"),
61 OwnerDID: os.Getenv("TACK_OWNER_DID"),
62 JetstreamURL: envOr("TACK_JETSTREAM_URL", "wss://jetstream1.us-west.bsky.network/subscribe"),
63 DBPath: envOr("TACK_DB_PATH", "tack.db"),
64 Dev: os.Getenv("TACK_DEV") != "",
65 BuildkiteToken: os.Getenv("TACK_BUILDKITE_TOKEN"),
66 BuildkiteOrg: os.Getenv("TACK_BUILDKITE_ORG"),
67 BuildkiteWebhookSecret: os.Getenv("TACK_BUILDKITE_WEBHOOK_SECRET"),
68 BuildkiteWebhookMode: buildkite.WebhookMode(
69 envOr("TACK_BUILDKITE_WEBHOOK_MODE", string(buildkite.WebhookModeToken)),
70 ),
71 TektonEnabled: os.Getenv("TACK_TEKTON_ENABLED") == "1",
72 TektonNamespace: envOr("TACK_TEKTON_NAMESPACE", "default"),
73 }
74 addrFlag := flag.String("addr", cfg.Addr, "HTTP listen address (overrides TACK_LISTEN_ADDR)")
75 flag.Parse()
76 cfg.Addr = *addrFlag
77
78 if cfg.OwnerDID == "" {
79 return cfg, errors.New("TACK_OWNER_DID is required")
80 }
81
82 // Hostname identifies *us* in sh.tangled.repo records (the .spindle
83 // field). Without it we have no way to know which repos point at us
84 // and therefore which knots we should subscribe to for pipeline
85 // triggers — so we refuse to start rather than silently subscribe to
86 // nothing.
87 if cfg.Hostname == "" {
88 return cfg, errors.New("TACK_HOSTNAME is required")
89 }
90
91 // If the operator opted into Buildkite mode (by supplying a
92 // token), every other Buildkite knob has to be present. Half-
93 // configured Buildkite leads to confusing failures deep in the
94 // provider; catch it at startup.
95 if cfg.BuildkiteToken != "" {
96 if cfg.BuildkiteOrg == "" {
97 return cfg, errors.New("TACK_BUILDKITE_ORG is required when TACK_BUILDKITE_TOKEN is set")
98 }
99 if cfg.BuildkiteWebhookSecret == "" {
100 return cfg, errors.New("TACK_BUILDKITE_WEBHOOK_SECRET is required when TACK_BUILDKITE_TOKEN is set")
101 }
102 switch cfg.BuildkiteWebhookMode {
103 case buildkite.WebhookModeToken, buildkite.WebhookModeSignature:
104 default:
105 return cfg, fmt.Errorf("TACK_BUILDKITE_WEBHOOK_MODE must be %q or %q; got %q",
106 buildkite.WebhookModeToken, buildkite.WebhookModeSignature,
107 cfg.BuildkiteWebhookMode,
108 )
109 }
110 }
111 if cfg.TektonEnabled && cfg.TektonNamespace == "" {
112 return cfg, errors.New("TACK_TEKTON_NAMESPACE is required when TACK_TEKTON_ENABLED=1")
113 }
114
115 return cfg, nil
116}
117
118func envOr(key, def string) string {
119 if v := os.Getenv(key); v != "" {
120 return v
121 }
122 return def
123}
124
125func main() {
126 // Logging setup. charmbracelet/log implements slog.Handler, so we wrap
127 // it in slog.New to share the same backend with libraries that expect
128 // a *slog.Logger (notably the jetstream client).
129 charmHandler := charmlog.NewWithOptions(os.Stderr, charmlog.Options{
130 Level: charmlog.DebugLevel,
131 ReportTimestamp: true,
132 })
133 logger := slog.New(charmHandler)
134 slog.SetDefault(logger)
135
136 // Config loading
137 cfg, err := loadConfig()
138 if err != nil {
139 logger.Error("invalid configuration", "err", err)
140 os.Exit(2)
141 }
142
143 // Root context: cancelled on SIGINT/SIGTERM, with the logger attached
144 // so any function we hand it to can pull it back out via loggerFrom.
145 ctx, stop := signal.NotifyContext(
146 context.Background(),
147 os.Interrupt, syscall.SIGTERM,
148 )
149 defer stop()
150 ctx = loggerInto(ctx, logger)
151
152 // Open (or create) the SQLite store. Holds jetstream cursor +
153 // observed Tangled membership records. Closed last during shutdown
154 // so anything writing to it during teardown still succeeds.
155 st, err := openStore(cfg.DBPath)
156 if err != nil {
157 logger.Error("failed to open store", "err", err, "path", cfg.DBPath)
158 os.Exit(1)
159 }
160 defer func() {
161 if err := st.Close(); err != nil {
162 logger.Error("close store", "err", err)
163 }
164 }()
165 logger.Info("store open", "path", cfg.DBPath)
166
167 // In-process broker for the /events fan-out. Wraps the store so
168 // publishes are durable and reconnecting subscribers can resume by
169 // cursor. Constructed before the consumers in case we ever want
170 // them to publish synthetic status events at startup.
171 br := newBroker(st)
172
173 // Providers turn Tangled pipeline triggers into pipeline.status
174 // events. We always wire the fake provider in so workflows can
175 // opt into it (via `tack: { fake: ... }`) for end-to-end testing
176 // even when Buildkite credentials are present; the Buildkite
177 // provider is added on top when configured. Routing is per-
178 // workflow and driven by the workflow YAML — see providerRouter.
179 //
180 // bkProvider is kept as a typed pointer separately because the
181 // /webhooks/buildkite handler needs the concrete *buildkiteProvider
182 // (for HandleWebhook + signature verification), not the abstract
183 // Provider surface.
184 providers := map[string]Provider{
185 "fake": newFakeProvider(br, logger),
186 }
187 logger.Info("fake provider enabled (workflow opt-in via `tack.fake:`)")
188
189 var bkProvider *buildkiteProvider
190 if cfg.BuildkiteToken != "" {
191 bkProvider = newBuildkiteProvider(
192 br, st,
193 buildkite.NewClient(cfg.BuildkiteToken),
194 cfg.BuildkiteOrg,
195 cfg.BuildkiteWebhookSecret,
196 cfg.BuildkiteWebhookMode,
197 logger,
198 )
199 providers["buildkite"] = bkProvider
200 logger.Info("buildkite provider enabled",
201 "default_org", cfg.BuildkiteOrg,
202 "webhook_mode", cfg.BuildkiteWebhookMode,
203 )
204 }
205 if cfg.TektonEnabled {
206 tkProvider, err := newInClusterTektonProvider(
207 br, st,
208 cfg.TektonNamespace,
209 logger,
210 )
211 if err != nil {
212 logger.Error("failed to configure tekton provider", "err", err)
213 os.Exit(1)
214 }
215 providers["tekton"] = tkProvider
216 logger.Info("tekton provider enabled",
217 "namespace", cfg.TektonNamespace,
218 )
219 }
220 provider := newProviderRouter(logger, providers)
221
222 // Start the knot event-stream consumer first so the jetstream
223 // loop has somewhere to register newly-observed knots into. It
224 // gets the provider so each incoming pipeline trigger has
225 // something to dispatch to.
226 knots, err := startKnotConsumer(ctx, cfg, st, provider)
227 if err != nil {
228 logger.Error("failed to start knot consumer", "err", err)
229 os.Exit(1)
230 }
231 defer knots.Stop()
232
233 // Start the JetStream listener in the background. It hands the knot
234 // consumer any new knot referenced by an incoming sh.tangled.repo
235 // record so we don't have to wait for a restart to pick it up.
236 if err := startJetstream(ctx, cfg, st, knots); err != nil {
237 logger.Error("failed to start jetstream consumer", "err", err)
238 os.Exit(1)
239 }
240
241 // Run the HTTP server. This blocks until ctx is cancelled or the
242 // listener errors.
243 if err := runHTTP(ctx, cfg, br, provider, bkProvider); err != nil {
244 logger.Error("http server error", "err", err)
245 os.Exit(1)
246 }
247}