Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1// Tack is a custom Tangled spindle that translates sh.tangled.pipeline 2// trigger records into Buildkite builds, and publishes Buildkite job state 3// back as sh.tangled.pipeline.status events on a WebSocket stream that the 4// Tangled appview can consume. 5package main 6 7import ( 8 "context" 9 "errors" 10 "flag" 11 "fmt" 12 "log/slog" 13 "os" 14 "os/signal" 15 "syscall" 16 17 charmlog "github.com/charmbracelet/log" 18 19 "go.mitchellh.com/tack/internal/buildkite" 20) 21 22// config is the runtime configuration, sourced from environment variables and 23// flags. Env vars match the README so this can be swapped in for the stock 24// spindle without surprises. 25type config struct { 26 Addr string 27 Hostname string 28 OwnerDID string 29 JetstreamURL string 30 DBPath string 31 // Dev flips the knot event-stream scheme from wss:// to ws://. 32 // Useful when running against a local knot during development. 33 Dev bool 34 35 // Buildkite-mode configuration. BuildkiteToken is the switch: 36 // when empty we fall back to the in-process fake provider 37 // (useful for local development against a real Tangled 38 // jetstream); when set, the other Buildkite fields are 39 // required and tack will refuse to start without them. 40 // 41 // BuildkiteOrg is the *default* org used when a workflow YAML 42 // doesn't specify one of its own. The pipeline a workflow runs 43 // against is no longer global — it's pulled from the workflow 44 // body itself (see workflowConfig in provider_buildkite.go). 45 BuildkiteToken string 46 BuildkiteOrg string 47 BuildkiteWebhookSecret string 48 BuildkiteWebhookMode buildkite.WebhookMode 49} 50 51func loadConfig() (config, error) { 52 cfg := config{ 53 Addr: envOr("TACK_LISTEN_ADDR", ":8080"), 54 Hostname: os.Getenv("TACK_HOSTNAME"), 55 OwnerDID: os.Getenv("TACK_OWNER_DID"), 56 JetstreamURL: envOr("TACK_JETSTREAM_URL", "wss://jetstream1.us-west.bsky.network/subscribe"), 57 DBPath: envOr("TACK_DB_PATH", "tack.db"), 58 Dev: os.Getenv("TACK_DEV") != "", 59 BuildkiteToken: os.Getenv("TACK_BUILDKITE_TOKEN"), 60 BuildkiteOrg: os.Getenv("TACK_BUILDKITE_ORG"), 61 BuildkiteWebhookSecret: os.Getenv("TACK_BUILDKITE_WEBHOOK_SECRET"), 62 BuildkiteWebhookMode: buildkite.WebhookMode( 63 envOr("TACK_BUILDKITE_WEBHOOK_MODE", string(buildkite.WebhookModeToken)), 64 ), 65 } 66 addrFlag := flag.String("addr", cfg.Addr, "HTTP listen address (overrides TACK_LISTEN_ADDR)") 67 flag.Parse() 68 cfg.Addr = *addrFlag 69 70 if cfg.OwnerDID == "" { 71 return cfg, errors.New("TACK_OWNER_DID is required") 72 } 73 74 // Hostname identifies *us* in sh.tangled.repo records (the .spindle 75 // field). Without it we have no way to know which repos point at us 76 // and therefore which knots we should subscribe to for pipeline 77 // triggers — so we refuse to start rather than silently subscribe to 78 // nothing. 79 if cfg.Hostname == "" { 80 return cfg, errors.New("TACK_HOSTNAME is required") 81 } 82 83 // If the operator opted into Buildkite mode (by supplying a 84 // token), every other Buildkite knob has to be present. Half- 85 // configured Buildkite leads to confusing failures deep in the 86 // provider; catch it at startup. 87 if cfg.BuildkiteToken != "" { 88 if cfg.BuildkiteOrg == "" { 89 return cfg, errors.New("TACK_BUILDKITE_ORG is required when TACK_BUILDKITE_TOKEN is set") 90 } 91 if cfg.BuildkiteWebhookSecret == "" { 92 return cfg, errors.New("TACK_BUILDKITE_WEBHOOK_SECRET is required when TACK_BUILDKITE_TOKEN is set") 93 } 94 switch cfg.BuildkiteWebhookMode { 95 case buildkite.WebhookModeToken, buildkite.WebhookModeSignature: 96 default: 97 return cfg, fmt.Errorf("TACK_BUILDKITE_WEBHOOK_MODE must be %q or %q; got %q", 98 buildkite.WebhookModeToken, buildkite.WebhookModeSignature, 99 cfg.BuildkiteWebhookMode, 100 ) 101 } 102 } 103 104 return cfg, nil 105} 106 107func envOr(key, def string) string { 108 if v := os.Getenv(key); v != "" { 109 return v 110 } 111 return def 112} 113 114func main() { 115 // Logging setup. charmbracelet/log implements slog.Handler, so we wrap 116 // it in slog.New to share the same backend with libraries that expect 117 // a *slog.Logger (notably the jetstream client). 118 charmHandler := charmlog.NewWithOptions(os.Stderr, charmlog.Options{ 119 Level: charmlog.DebugLevel, 120 ReportTimestamp: true, 121 }) 122 logger := slog.New(charmHandler) 123 slog.SetDefault(logger) 124 125 // Config loading 126 cfg, err := loadConfig() 127 if err != nil { 128 logger.Error("invalid configuration", "err", err) 129 os.Exit(2) 130 } 131 132 // Root context: cancelled on SIGINT/SIGTERM, with the logger attached 133 // so any function we hand it to can pull it back out via loggerFrom. 134 ctx, stop := signal.NotifyContext( 135 context.Background(), 136 os.Interrupt, syscall.SIGTERM, 137 ) 138 defer stop() 139 ctx = loggerInto(ctx, logger) 140 141 // Open (or create) the SQLite store. Holds jetstream cursor + 142 // observed Tangled membership records. Closed last during shutdown 143 // so anything writing to it during teardown still succeeds. 144 st, err := openStore(cfg.DBPath) 145 if err != nil { 146 logger.Error("failed to open store", "err", err, "path", cfg.DBPath) 147 os.Exit(1) 148 } 149 defer func() { 150 if err := st.Close(); err != nil { 151 logger.Error("close store", "err", err) 152 } 153 }() 154 logger.Info("store open", "path", cfg.DBPath) 155 156 // In-process broker for the /events fan-out. Wraps the store so 157 // publishes are durable and reconnecting subscribers can resume by 158 // cursor. Constructed before the consumers in case we ever want 159 // them to publish synthetic status events at startup. 160 br := newBroker(st) 161 162 // Providers turn Tangled pipeline triggers into pipeline.status 163 // events. We always wire the fake provider in so workflows can 164 // opt into it (via `tack: { fake: ... }`) for end-to-end testing 165 // even when Buildkite credentials are present; the Buildkite 166 // provider is added on top when configured. Routing is per- 167 // workflow and driven by the workflow YAML — see providerRouter. 168 // 169 // bkProvider is kept as a typed pointer separately because the 170 // /webhooks/buildkite handler needs the concrete *buildkiteProvider 171 // (for HandleWebhook + signature verification), not the abstract 172 // Provider surface. 173 providers := map[string]Provider{ 174 "fake": newFakeProvider(br, logger), 175 } 176 logger.Info("fake provider enabled (workflow opt-in via `tack.fake:`)") 177 178 var bkProvider *buildkiteProvider 179 if cfg.BuildkiteToken != "" { 180 bkProvider = newBuildkiteProvider( 181 br, st, 182 buildkite.NewClient(cfg.BuildkiteToken), 183 cfg.BuildkiteOrg, 184 cfg.BuildkiteWebhookSecret, 185 cfg.BuildkiteWebhookMode, 186 logger, 187 ) 188 providers["buildkite"] = bkProvider 189 logger.Info("buildkite provider enabled", 190 "default_org", cfg.BuildkiteOrg, 191 "webhook_mode", cfg.BuildkiteWebhookMode, 192 ) 193 } 194 provider := newProviderRouter(logger, providers) 195 196 // Start the knot event-stream consumer first so the jetstream 197 // loop has somewhere to register newly-observed knots into. It 198 // gets the provider so each incoming pipeline trigger has 199 // something to dispatch to. 200 knots, err := startKnotConsumer(ctx, cfg, st, provider) 201 if err != nil { 202 logger.Error("failed to start knot consumer", "err", err) 203 os.Exit(1) 204 } 205 defer knots.Stop() 206 207 // Start the JetStream listener in the background. It hands the knot 208 // consumer any new knot referenced by an incoming sh.tangled.repo 209 // record so we don't have to wait for a restart to pick it up. 210 if err := startJetstream(ctx, cfg, st, knots); err != nil { 211 logger.Error("failed to start jetstream consumer", "err", err) 212 os.Exit(1) 213 } 214 215 // Run the HTTP server. This blocks until ctx is cancelled or the 216 // listener errors. 217 if err := runHTTP(ctx, cfg, br, provider, bkProvider); err != nil { 218 logger.Error("http server error", "err", err) 219 os.Exit(1) 220 } 221}