Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1// Tack is a custom Tangled spindle that translates sh.tangled.pipeline 2// trigger records into Buildkite builds, and publishes Buildkite job state 3// back as sh.tangled.pipeline.status events on a WebSocket stream that the 4// Tangled appview can consume. 5package main 6 7import ( 8 "context" 9 "errors" 10 "flag" 11 "fmt" 12 "log/slog" 13 "os" 14 "os/signal" 15 "syscall" 16 17 charmlog "github.com/charmbracelet/log" 18 19 "go.mitchellh.com/tack/internal/buildkite" 20) 21 22// config is the runtime configuration, sourced from environment variables and 23// flags. Env vars match the README so this can be swapped in for the stock 24// spindle without surprises. 25type config struct { 26 Addr string 27 Hostname string 28 OwnerDID string 29 JetstreamURL string 30 DBPath string 31 // Dev flips the knot event-stream scheme from wss:// to ws://. 32 // Useful when running against a local knot during development. 33 Dev bool 34 35 // Buildkite-mode configuration. BuildkiteToken is the switch: 36 // when empty we fall back to the in-process fake provider 37 // (useful for local development against a real Tangled 38 // jetstream); when set, the other Buildkite fields are 39 // required and tack will refuse to start without them. 40 // 41 // BuildkiteOrg is the *default* org used when a workflow YAML 42 // doesn't specify one of its own. The pipeline a workflow runs 43 // against is no longer global — it's pulled from the workflow 44 // body itself (see workflowConfig in provider_buildkite.go). 45 BuildkiteToken string 46 BuildkiteOrg string 47 BuildkiteWebhookSecret string 48 BuildkiteWebhookMode buildkite.WebhookMode 49 50 // Tekton mode is explicit because it only works from inside a 51 // Kubernetes cluster. When enabled, tack creates PipelineRuns in 52 // TektonNamespace using its pod's service account credentials. 53 TektonEnabled bool 54 TektonNamespace string 55} 56 57func loadConfig() (config, error) { 58 cfg := config{ 59 Addr: envOr("TACK_LISTEN_ADDR", ":8080"), 60 Hostname: os.Getenv("TACK_HOSTNAME"), 61 OwnerDID: os.Getenv("TACK_OWNER_DID"), 62 JetstreamURL: envOr("TACK_JETSTREAM_URL", "wss://jetstream1.us-west.bsky.network/subscribe"), 63 DBPath: envOr("TACK_DB_PATH", "tack.db"), 64 Dev: os.Getenv("TACK_DEV") != "", 65 BuildkiteToken: os.Getenv("TACK_BUILDKITE_TOKEN"), 66 BuildkiteOrg: os.Getenv("TACK_BUILDKITE_ORG"), 67 BuildkiteWebhookSecret: os.Getenv("TACK_BUILDKITE_WEBHOOK_SECRET"), 68 BuildkiteWebhookMode: buildkite.WebhookMode( 69 envOr("TACK_BUILDKITE_WEBHOOK_MODE", string(buildkite.WebhookModeToken)), 70 ), 71 TektonEnabled: os.Getenv("TACK_TEKTON_ENABLED") == "1", 72 TektonNamespace: envOr("TACK_TEKTON_NAMESPACE", "default"), 73 } 74 addrFlag := flag.String("addr", cfg.Addr, "HTTP listen address (overrides TACK_LISTEN_ADDR)") 75 flag.Parse() 76 cfg.Addr = *addrFlag 77 78 if cfg.OwnerDID == "" { 79 return cfg, errors.New("TACK_OWNER_DID is required") 80 } 81 82 // Hostname identifies *us* in sh.tangled.repo records (the .spindle 83 // field). Without it we have no way to know which repos point at us 84 // and therefore which knots we should subscribe to for pipeline 85 // triggers — so we refuse to start rather than silently subscribe to 86 // nothing. 87 if cfg.Hostname == "" { 88 return cfg, errors.New("TACK_HOSTNAME is required") 89 } 90 91 // If the operator opted into Buildkite mode (by supplying a 92 // token), every other Buildkite knob has to be present. Half- 93 // configured Buildkite leads to confusing failures deep in the 94 // provider; catch it at startup. 95 if cfg.BuildkiteToken != "" { 96 if cfg.BuildkiteOrg == "" { 97 return cfg, errors.New("TACK_BUILDKITE_ORG is required when TACK_BUILDKITE_TOKEN is set") 98 } 99 if cfg.BuildkiteWebhookSecret == "" { 100 return cfg, errors.New("TACK_BUILDKITE_WEBHOOK_SECRET is required when TACK_BUILDKITE_TOKEN is set") 101 } 102 switch cfg.BuildkiteWebhookMode { 103 case buildkite.WebhookModeToken, buildkite.WebhookModeSignature: 104 default: 105 return cfg, fmt.Errorf("TACK_BUILDKITE_WEBHOOK_MODE must be %q or %q; got %q", 106 buildkite.WebhookModeToken, buildkite.WebhookModeSignature, 107 cfg.BuildkiteWebhookMode, 108 ) 109 } 110 } 111 if cfg.TektonEnabled && cfg.TektonNamespace == "" { 112 return cfg, errors.New("TACK_TEKTON_NAMESPACE is required when TACK_TEKTON_ENABLED=1") 113 } 114 115 return cfg, nil 116} 117 118func envOr(key, def string) string { 119 if v := os.Getenv(key); v != "" { 120 return v 121 } 122 return def 123} 124 125func main() { 126 // Logging setup. charmbracelet/log implements slog.Handler, so we wrap 127 // it in slog.New to share the same backend with libraries that expect 128 // a *slog.Logger (notably the jetstream client). 129 charmHandler := charmlog.NewWithOptions(os.Stderr, charmlog.Options{ 130 Level: charmlog.DebugLevel, 131 ReportTimestamp: true, 132 }) 133 logger := slog.New(charmHandler) 134 slog.SetDefault(logger) 135 136 // Config loading 137 cfg, err := loadConfig() 138 if err != nil { 139 logger.Error("invalid configuration", "err", err) 140 os.Exit(2) 141 } 142 143 // Root context: cancelled on SIGINT/SIGTERM, with the logger attached 144 // so any function we hand it to can pull it back out via loggerFrom. 145 ctx, stop := signal.NotifyContext( 146 context.Background(), 147 os.Interrupt, syscall.SIGTERM, 148 ) 149 defer stop() 150 ctx = loggerInto(ctx, logger) 151 152 // Open (or create) the SQLite store. Holds jetstream cursor + 153 // observed Tangled membership records. Closed last during shutdown 154 // so anything writing to it during teardown still succeeds. 155 st, err := openStore(cfg.DBPath) 156 if err != nil { 157 logger.Error("failed to open store", "err", err, "path", cfg.DBPath) 158 os.Exit(1) 159 } 160 defer func() { 161 if err := st.Close(); err != nil { 162 logger.Error("close store", "err", err) 163 } 164 }() 165 logger.Info("store open", "path", cfg.DBPath) 166 167 // In-process broker for the /events fan-out. Wraps the store so 168 // publishes are durable and reconnecting subscribers can resume by 169 // cursor. Constructed before the consumers in case we ever want 170 // them to publish synthetic status events at startup. 171 br := newBroker(st) 172 173 // Providers turn Tangled pipeline triggers into pipeline.status 174 // events. We always wire the fake provider in so workflows can 175 // opt into it (via `tack: { fake: ... }`) for end-to-end testing 176 // even when Buildkite credentials are present; the Buildkite 177 // provider is added on top when configured. Routing is per- 178 // workflow and driven by the workflow YAML — see providerRouter. 179 // 180 // bkProvider is kept as a typed pointer separately because the 181 // /webhooks/buildkite handler needs the concrete *buildkiteProvider 182 // (for HandleWebhook + signature verification), not the abstract 183 // Provider surface. 184 providers := map[string]Provider{ 185 "fake": newFakeProvider(br, logger), 186 } 187 logger.Info("fake provider enabled (workflow opt-in via `tack.fake:`)") 188 189 var bkProvider *buildkiteProvider 190 if cfg.BuildkiteToken != "" { 191 bkProvider = newBuildkiteProvider( 192 br, st, 193 buildkite.NewClient(cfg.BuildkiteToken), 194 cfg.BuildkiteOrg, 195 cfg.BuildkiteWebhookSecret, 196 cfg.BuildkiteWebhookMode, 197 logger, 198 ) 199 providers["buildkite"] = bkProvider 200 logger.Info("buildkite provider enabled", 201 "default_org", cfg.BuildkiteOrg, 202 "webhook_mode", cfg.BuildkiteWebhookMode, 203 ) 204 } 205 if cfg.TektonEnabled { 206 tkProvider, err := newInClusterTektonProvider( 207 br, st, 208 cfg.TektonNamespace, 209 logger, 210 ) 211 if err != nil { 212 logger.Error("failed to configure tekton provider", "err", err) 213 os.Exit(1) 214 } 215 providers["tekton"] = tkProvider 216 logger.Info("tekton provider enabled", 217 "namespace", cfg.TektonNamespace, 218 ) 219 } 220 provider := newProviderRouter(logger, providers) 221 222 // Start the knot event-stream consumer first so the jetstream 223 // loop has somewhere to register newly-observed knots into. It 224 // gets the provider so each incoming pipeline trigger has 225 // something to dispatch to. 226 knots, err := startKnotConsumer(ctx, cfg, st, provider) 227 if err != nil { 228 logger.Error("failed to start knot consumer", "err", err) 229 os.Exit(1) 230 } 231 defer knots.Stop() 232 233 // Start the JetStream listener in the background. It hands the knot 234 // consumer any new knot referenced by an incoming sh.tangled.repo 235 // record so we don't have to wait for a restart to pick it up. 236 if err := startJetstream(ctx, cfg, st, knots); err != nil { 237 logger.Error("failed to start jetstream consumer", "err", err) 238 os.Exit(1) 239 } 240 241 // Run the HTTP server. This blocks until ctx is cancelled or the 242 // listener errors. 243 if err := runHTTP(ctx, cfg, br, provider, bkProvider); err != nil { 244 logger.Error("http server error", "err", err) 245 os.Exit(1) 246 } 247}