Stitch any CI into Tangled
1// Tack is a custom Tangled spindle that translates sh.tangled.pipeline
2// trigger records into Buildkite builds, and publishes Buildkite job state
3// back as sh.tangled.pipeline.status events on a WebSocket stream that the
4// Tangled appview can consume.
5package main
6
7import (
8 "context"
9 "errors"
10 "flag"
11 "fmt"
12 "log/slog"
13 "os"
14 "os/signal"
15 "syscall"
16
17 charmlog "github.com/charmbracelet/log"
18
19 "go.mitchellh.com/tack/internal/buildkite"
20)
21
22// config is the runtime configuration, sourced from environment variables and
23// flags. Env vars match the README so this can be swapped in for the stock
24// spindle without surprises.
25type config struct {
26 Addr string
27 Hostname string
28 OwnerDID string
29 JetstreamURL string
30 DBPath string
31 // Dev flips the knot event-stream scheme from wss:// to ws://.
32 // Useful when running against a local knot during development.
33 Dev bool
34
35 // Buildkite-mode configuration. BuildkiteToken is the switch:
36 // when empty we fall back to the in-process fake provider
37 // (useful for local development against a real Tangled
38 // jetstream); when set, the other Buildkite fields are
39 // required and tack will refuse to start without them.
40 //
41 // BuildkiteOrg is the *default* org used when a workflow YAML
42 // doesn't specify one of its own. The pipeline a workflow runs
43 // against is no longer global — it's pulled from the workflow
44 // body itself (see workflowConfig in provider_buildkite.go).
45 BuildkiteToken string
46 BuildkiteOrg string
47 BuildkiteWebhookSecret string
48 BuildkiteWebhookMode buildkite.WebhookMode
49}
50
51func loadConfig() (config, error) {
52 cfg := config{
53 Addr: envOr("TACK_LISTEN_ADDR", ":8080"),
54 Hostname: os.Getenv("TACK_HOSTNAME"),
55 OwnerDID: os.Getenv("TACK_OWNER_DID"),
56 JetstreamURL: envOr("TACK_JETSTREAM_URL", "wss://jetstream1.us-west.bsky.network/subscribe"),
57 DBPath: envOr("TACK_DB_PATH", "tack.db"),
58 Dev: os.Getenv("TACK_DEV") != "",
59 BuildkiteToken: os.Getenv("TACK_BUILDKITE_TOKEN"),
60 BuildkiteOrg: os.Getenv("TACK_BUILDKITE_ORG"),
61 BuildkiteWebhookSecret: os.Getenv("TACK_BUILDKITE_WEBHOOK_SECRET"),
62 BuildkiteWebhookMode: buildkite.WebhookMode(
63 envOr("TACK_BUILDKITE_WEBHOOK_MODE", string(buildkite.WebhookModeToken)),
64 ),
65 }
66 addrFlag := flag.String("addr", cfg.Addr, "HTTP listen address (overrides TACK_LISTEN_ADDR)")
67 flag.Parse()
68 cfg.Addr = *addrFlag
69
70 if cfg.OwnerDID == "" {
71 return cfg, errors.New("TACK_OWNER_DID is required")
72 }
73
74 // Hostname identifies *us* in sh.tangled.repo records (the .spindle
75 // field). Without it we have no way to know which repos point at us
76 // and therefore which knots we should subscribe to for pipeline
77 // triggers — so we refuse to start rather than silently subscribe to
78 // nothing.
79 if cfg.Hostname == "" {
80 return cfg, errors.New("TACK_HOSTNAME is required")
81 }
82
83 // If the operator opted into Buildkite mode (by supplying a
84 // token), every other Buildkite knob has to be present. Half-
85 // configured Buildkite leads to confusing failures deep in the
86 // provider; catch it at startup.
87 if cfg.BuildkiteToken != "" {
88 if cfg.BuildkiteOrg == "" {
89 return cfg, errors.New("TACK_BUILDKITE_ORG is required when TACK_BUILDKITE_TOKEN is set")
90 }
91 if cfg.BuildkiteWebhookSecret == "" {
92 return cfg, errors.New("TACK_BUILDKITE_WEBHOOK_SECRET is required when TACK_BUILDKITE_TOKEN is set")
93 }
94 switch cfg.BuildkiteWebhookMode {
95 case buildkite.WebhookModeToken, buildkite.WebhookModeSignature:
96 default:
97 return cfg, fmt.Errorf("TACK_BUILDKITE_WEBHOOK_MODE must be %q or %q; got %q",
98 buildkite.WebhookModeToken, buildkite.WebhookModeSignature,
99 cfg.BuildkiteWebhookMode,
100 )
101 }
102 }
103
104 return cfg, nil
105}
106
107func envOr(key, def string) string {
108 if v := os.Getenv(key); v != "" {
109 return v
110 }
111 return def
112}
113
114func main() {
115 // Logging setup. charmbracelet/log implements slog.Handler, so we wrap
116 // it in slog.New to share the same backend with libraries that expect
117 // a *slog.Logger (notably the jetstream client).
118 charmHandler := charmlog.NewWithOptions(os.Stderr, charmlog.Options{
119 Level: charmlog.DebugLevel,
120 ReportTimestamp: true,
121 })
122 logger := slog.New(charmHandler)
123 slog.SetDefault(logger)
124
125 // Config loading
126 cfg, err := loadConfig()
127 if err != nil {
128 logger.Error("invalid configuration", "err", err)
129 os.Exit(2)
130 }
131
132 // Root context: cancelled on SIGINT/SIGTERM, with the logger attached
133 // so any function we hand it to can pull it back out via loggerFrom.
134 ctx, stop := signal.NotifyContext(
135 context.Background(),
136 os.Interrupt, syscall.SIGTERM,
137 )
138 defer stop()
139 ctx = loggerInto(ctx, logger)
140
141 // Open (or create) the SQLite store. Holds jetstream cursor +
142 // observed Tangled membership records. Closed last during shutdown
143 // so anything writing to it during teardown still succeeds.
144 st, err := openStore(cfg.DBPath)
145 if err != nil {
146 logger.Error("failed to open store", "err", err, "path", cfg.DBPath)
147 os.Exit(1)
148 }
149 defer func() {
150 if err := st.Close(); err != nil {
151 logger.Error("close store", "err", err)
152 }
153 }()
154 logger.Info("store open", "path", cfg.DBPath)
155
156 // In-process broker for the /events fan-out. Wraps the store so
157 // publishes are durable and reconnecting subscribers can resume by
158 // cursor. Constructed before the consumers in case we ever want
159 // them to publish synthetic status events at startup.
160 br := newBroker(st)
161
162 // Providers turn Tangled pipeline triggers into pipeline.status
163 // events. We always wire the fake provider in so workflows can
164 // opt into it (via `tack: { fake: ... }`) for end-to-end testing
165 // even when Buildkite credentials are present; the Buildkite
166 // provider is added on top when configured. Routing is per-
167 // workflow and driven by the workflow YAML — see providerRouter.
168 //
169 // bkProvider is kept as a typed pointer separately because the
170 // /webhooks/buildkite handler needs the concrete *buildkiteProvider
171 // (for HandleWebhook + signature verification), not the abstract
172 // Provider surface.
173 providers := map[string]Provider{
174 "fake": newFakeProvider(br, logger),
175 }
176 logger.Info("fake provider enabled (workflow opt-in via `tack.fake:`)")
177
178 var bkProvider *buildkiteProvider
179 if cfg.BuildkiteToken != "" {
180 bkProvider = newBuildkiteProvider(
181 br, st,
182 buildkite.NewClient(cfg.BuildkiteToken),
183 cfg.BuildkiteOrg,
184 cfg.BuildkiteWebhookSecret,
185 cfg.BuildkiteWebhookMode,
186 logger,
187 )
188 providers["buildkite"] = bkProvider
189 logger.Info("buildkite provider enabled",
190 "default_org", cfg.BuildkiteOrg,
191 "webhook_mode", cfg.BuildkiteWebhookMode,
192 )
193 }
194 provider := newProviderRouter(logger, providers)
195
196 // Start the knot event-stream consumer first so the jetstream
197 // loop has somewhere to register newly-observed knots into. It
198 // gets the provider so each incoming pipeline trigger has
199 // something to dispatch to.
200 knots, err := startKnotConsumer(ctx, cfg, st, provider)
201 if err != nil {
202 logger.Error("failed to start knot consumer", "err", err)
203 os.Exit(1)
204 }
205 defer knots.Stop()
206
207 // Start the JetStream listener in the background. It hands the knot
208 // consumer any new knot referenced by an incoming sh.tangled.repo
209 // record so we don't have to wait for a restart to pick it up.
210 if err := startJetstream(ctx, cfg, st, knots); err != nil {
211 logger.Error("failed to start jetstream consumer", "err", err)
212 os.Exit(1)
213 }
214
215 // Run the HTTP server. This blocks until ctx is cancelled or the
216 // listener errors.
217 if err := runHTTP(ctx, cfg, br, provider, bkProvider); err != nil {
218 logger.Error("http server error", "err", err)
219 os.Exit(1)
220 }
221}