Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

at main 9.3 kB View raw
1// Tack is a custom Tangled spindle that translates sh.tangled.pipeline 2// trigger records into Buildkite builds, and publishes Buildkite job state 3// back as sh.tangled.pipeline.status events on a WebSocket stream that the 4// Tangled appview can consume. 5package main 6 7import ( 8 "context" 9 "errors" 10 "flag" 11 "fmt" 12 "log/slog" 13 "os" 14 "os/signal" 15 "syscall" 16 17 charmlog "github.com/charmbracelet/log" 18 19 "go.mitchellh.com/tack/internal/buildkite" 20) 21 22// config is the runtime configuration, sourced from environment variables and 23// flags. Env vars match the README so this can be swapped in for the stock 24// spindle without surprises. 25type config struct { 26 Addr string 27 Hostname string 28 OwnerDID string 29 JetstreamURL string 30 DBPath string 31 // Dev flips the knot event-stream scheme from wss:// to ws://. 32 // Useful when running against a local knot during development. 33 Dev bool 34 35 // Buildkite-mode configuration. BuildkiteToken is the switch: 36 // when empty we fall back to the in-process fake provider 37 // (useful for local development against a real Tangled 38 // jetstream); when set, the other Buildkite fields are 39 // required and tack will refuse to start without them. 40 // 41 // BuildkiteOrg is the *default* org used when a workflow YAML 42 // doesn't specify one of its own. The pipeline a workflow runs 43 // against is no longer global — it's pulled from the workflow 44 // body itself (see workflowConfig in provider_buildkite.go). 45 BuildkiteToken string 46 BuildkiteOrg string 47 BuildkiteWebhookSecret string 48 BuildkiteWebhookMode buildkite.WebhookMode 49 50 // Tekton mode is explicit because it only works from inside a 51 // Kubernetes cluster. When enabled, tack creates PipelineRuns in 52 // TektonNamespace using its pod's service account credentials. 53 TektonEnabled bool 54 TektonNamespace string 55 56 // Sourcehut mode is gated on a token: when empty the provider is 57 // not registered and workflows naming `tack.sourcehut` route 58 // nowhere. SourcehutInstance overrides the public builds.sr.ht 59 // endpoint for users running their own deployment; the token must 60 // authenticate against whichever instance is targeted. 61 SourcehutToken string 62 SourcehutInstance string 63} 64 65func loadConfig() (config, error) { 66 cfg := config{ 67 Addr: envOr("TACK_LISTEN_ADDR", ":8080"), 68 Hostname: os.Getenv("TACK_HOSTNAME"), 69 OwnerDID: os.Getenv("TACK_OWNER_DID"), 70 JetstreamURL: envOr("TACK_JETSTREAM_URL", "wss://jetstream1.us-west.bsky.network/subscribe"), 71 DBPath: envOr("TACK_DB_PATH", "tack.db"), 72 Dev: os.Getenv("TACK_DEV") != "", 73 BuildkiteToken: os.Getenv("TACK_BUILDKITE_TOKEN"), 74 BuildkiteOrg: os.Getenv("TACK_BUILDKITE_ORG"), 75 BuildkiteWebhookSecret: os.Getenv("TACK_BUILDKITE_WEBHOOK_SECRET"), 76 BuildkiteWebhookMode: buildkite.WebhookMode( 77 envOr("TACK_BUILDKITE_WEBHOOK_MODE", string(buildkite.WebhookModeToken)), 78 ), 79 TektonEnabled: os.Getenv("TACK_TEKTON_ENABLED") == "1", 80 TektonNamespace: envOr("TACK_TEKTON_NAMESPACE", "default"), 81 SourcehutToken: os.Getenv("TACK_SOURCEHUT_TOKEN"), 82 SourcehutInstance: os.Getenv("TACK_SOURCEHUT_INSTANCE"), 83 } 84 addrFlag := flag.String("addr", cfg.Addr, "HTTP listen address (overrides TACK_LISTEN_ADDR)") 85 flag.Parse() 86 cfg.Addr = *addrFlag 87 88 if cfg.OwnerDID == "" { 89 return cfg, errors.New("TACK_OWNER_DID is required") 90 } 91 92 // Hostname identifies *us* in sh.tangled.repo records (the .spindle 93 // field). Without it we have no way to know which repos point at us 94 // and therefore which knots we should subscribe to for pipeline 95 // triggers — so we refuse to start rather than silently subscribe to 96 // nothing. 97 if cfg.Hostname == "" { 98 return cfg, errors.New("TACK_HOSTNAME is required") 99 } 100 101 // If the operator opted into Buildkite mode (by supplying a 102 // token), every other Buildkite knob has to be present. Half- 103 // configured Buildkite leads to confusing failures deep in the 104 // provider; catch it at startup. 105 if cfg.BuildkiteToken != "" { 106 if cfg.BuildkiteOrg == "" { 107 return cfg, errors.New("TACK_BUILDKITE_ORG is required when TACK_BUILDKITE_TOKEN is set") 108 } 109 if cfg.BuildkiteWebhookSecret == "" { 110 return cfg, errors.New("TACK_BUILDKITE_WEBHOOK_SECRET is required when TACK_BUILDKITE_TOKEN is set") 111 } 112 switch cfg.BuildkiteWebhookMode { 113 case buildkite.WebhookModeToken, buildkite.WebhookModeSignature: 114 default: 115 return cfg, fmt.Errorf("TACK_BUILDKITE_WEBHOOK_MODE must be %q or %q; got %q", 116 buildkite.WebhookModeToken, buildkite.WebhookModeSignature, 117 cfg.BuildkiteWebhookMode, 118 ) 119 } 120 } 121 if cfg.TektonEnabled && cfg.TektonNamespace == "" { 122 return cfg, errors.New("TACK_TEKTON_NAMESPACE is required when TACK_TEKTON_ENABLED=1") 123 } 124 125 return cfg, nil 126} 127 128func envOr(key, def string) string { 129 if v := os.Getenv(key); v != "" { 130 return v 131 } 132 return def 133} 134 135func main() { 136 // Logging setup. charmbracelet/log implements slog.Handler, so we wrap 137 // it in slog.New to share the same backend with libraries that expect 138 // a *slog.Logger (notably the jetstream client). 139 charmHandler := charmlog.NewWithOptions(os.Stderr, charmlog.Options{ 140 Level: charmlog.DebugLevel, 141 ReportTimestamp: true, 142 }) 143 logger := slog.New(charmHandler) 144 slog.SetDefault(logger) 145 146 // Config loading 147 cfg, err := loadConfig() 148 if err != nil { 149 logger.Error("invalid configuration", "err", err) 150 os.Exit(2) 151 } 152 153 // Root context: cancelled on SIGINT/SIGTERM, with the logger attached 154 // so any function we hand it to can pull it back out via loggerFrom. 155 ctx, stop := signal.NotifyContext( 156 context.Background(), 157 os.Interrupt, syscall.SIGTERM, 158 ) 159 defer stop() 160 ctx = loggerInto(ctx, logger) 161 162 // Open (or create) the SQLite store. Holds jetstream cursor + 163 // observed Tangled membership records. Closed last during shutdown 164 // so anything writing to it during teardown still succeeds. 165 st, err := openStore(cfg.DBPath) 166 if err != nil { 167 logger.Error("failed to open store", "err", err, "path", cfg.DBPath) 168 os.Exit(1) 169 } 170 defer func() { 171 if err := st.Close(); err != nil { 172 logger.Error("close store", "err", err) 173 } 174 }() 175 logger.Info("store open", "path", cfg.DBPath) 176 177 // In-process broker for the /events fan-out. Wraps the store so 178 // publishes are durable and reconnecting subscribers can resume by 179 // cursor. Constructed before the consumers in case we ever want 180 // them to publish synthetic status events at startup. 181 br := newBroker(st) 182 183 // Providers turn Tangled pipeline triggers into pipeline.status 184 // events. We always wire the fake provider in so workflows can 185 // opt into it (via `tack: { fake: ... }`) for end-to-end testing 186 // even when Buildkite credentials are present; the Buildkite 187 // provider is added on top when configured. Routing is per- 188 // workflow and driven by the workflow YAML — see providerRouter. 189 // 190 // bkProvider is kept as a typed pointer separately because the 191 // /webhooks/buildkite handler needs the concrete *buildkiteProvider 192 // (for HandleWebhook + signature verification), not the abstract 193 // Provider surface. 194 providers := map[string]Provider{ 195 "fake": newFakeProvider(br, logger), 196 } 197 logger.Info("fake provider enabled (workflow opt-in via `tack.fake:`)") 198 199 var bkProvider *buildkiteProvider 200 if cfg.BuildkiteToken != "" { 201 bkProvider = newBuildkiteProvider( 202 br, st, 203 buildkite.NewClient(cfg.BuildkiteToken), 204 cfg.BuildkiteOrg, 205 cfg.BuildkiteWebhookSecret, 206 cfg.BuildkiteWebhookMode, 207 logger, 208 ) 209 providers["buildkite"] = bkProvider 210 logger.Info("buildkite provider enabled", 211 "default_org", cfg.BuildkiteOrg, 212 "webhook_mode", cfg.BuildkiteWebhookMode, 213 ) 214 } 215 if cfg.TektonEnabled { 216 tkProvider, err := newInClusterTektonProvider( 217 br, st, 218 cfg.TektonNamespace, 219 logger, 220 ) 221 if err != nil { 222 logger.Error("failed to configure tekton provider", "err", err) 223 os.Exit(1) 224 } 225 providers["tekton"] = tkProvider 226 logger.Info("tekton provider enabled", 227 "namespace", cfg.TektonNamespace, 228 ) 229 } 230 if cfg.SourcehutToken != "" { 231 shProvider := newSourcehutProvider( 232 br, st, 233 cfg.SourcehutToken, 234 cfg.SourcehutInstance, 235 logger, 236 ) 237 providers["sourcehut"] = shProvider 238 logger.Info("sourcehut provider enabled", 239 "instance", shProvider.defaultInstance, 240 ) 241 } 242 provider := newProviderRouter(logger, providers) 243 244 // Start the knot event-stream consumer first so the jetstream 245 // loop has somewhere to register newly-observed knots into. It 246 // gets the provider so each incoming pipeline trigger has 247 // something to dispatch to. 248 knots, err := startKnotConsumer(ctx, cfg, st, provider) 249 if err != nil { 250 logger.Error("failed to start knot consumer", "err", err) 251 os.Exit(1) 252 } 253 defer knots.Stop() 254 255 // Start the JetStream listener in the background. It hands the knot 256 // consumer any new knot referenced by an incoming sh.tangled.repo 257 // record so we don't have to wait for a restart to pick it up. 258 if err := startJetstream(ctx, cfg, st, knots); err != nil { 259 logger.Error("failed to start jetstream consumer", "err", err) 260 os.Exit(1) 261 } 262 263 // Run the HTTP server. This blocks until ctx is cancelled or the 264 // listener errors. 265 if err := runHTTP(ctx, cfg, br, provider, bkProvider); err != nil { 266 logger.Error("http server error", "err", err) 267 os.Exit(1) 268 } 269}