Stitch any CI into Tangled
1// Tack is a custom Tangled spindle that translates sh.tangled.pipeline
2// trigger records into Buildkite builds, and publishes Buildkite job state
3// back as sh.tangled.pipeline.status events on a WebSocket stream that the
4// Tangled appview can consume.
5package main
6
7import (
8 "context"
9 "errors"
10 "flag"
11 "log/slog"
12 "os"
13 "os/signal"
14 "syscall"
15
16 charmlog "github.com/charmbracelet/log"
17)
18
19// config is the runtime configuration, sourced from environment variables and
20// flags. Env vars match the README so this can be swapped in for the stock
21// spindle without surprises.
22type config struct {
23 Addr string
24 Hostname string
25 OwnerDID string
26 JetstreamURL string
27 DBPath string
28 // Dev flips the knot event-stream scheme from wss:// to ws://.
29 // Useful when running against a local knot during development.
30 Dev bool
31}
32
33func loadConfig() (config, error) {
34 cfg := config{
35 Addr: envOr("TACK_LISTEN_ADDR", ":8080"),
36 Hostname: os.Getenv("TACK_HOSTNAME"),
37 OwnerDID: os.Getenv("TACK_OWNER_DID"),
38 JetstreamURL: envOr("TACK_JETSTREAM_URL", "wss://jetstream1.us-west.bsky.network/subscribe"),
39 DBPath: envOr("TACK_DB_PATH", "tack.db"),
40 Dev: os.Getenv("TACK_DEV") != "",
41 }
42 addrFlag := flag.String("addr", cfg.Addr, "HTTP listen address (overrides TACK_LISTEN_ADDR)")
43 flag.Parse()
44 cfg.Addr = *addrFlag
45
46 if cfg.OwnerDID == "" {
47 return cfg, errors.New("TACK_OWNER_DID is required")
48 }
49
50 // Hostname identifies *us* in sh.tangled.repo records (the .spindle
51 // field). Without it we have no way to know which repos point at us
52 // and therefore which knots we should subscribe to for pipeline
53 // triggers — so we refuse to start rather than silently subscribe to
54 // nothing.
55 if cfg.Hostname == "" {
56 return cfg, errors.New("TACK_HOSTNAME is required")
57 }
58
59 return cfg, nil
60}
61
62func envOr(key, def string) string {
63 if v := os.Getenv(key); v != "" {
64 return v
65 }
66 return def
67}
68
69func main() {
70 // Logging setup. charmbracelet/log implements slog.Handler, so we wrap
71 // it in slog.New to share the same backend with libraries that expect
72 // a *slog.Logger (notably the jetstream client).
73 charmHandler := charmlog.NewWithOptions(os.Stderr, charmlog.Options{
74 Level: charmlog.DebugLevel,
75 ReportTimestamp: true,
76 })
77 logger := slog.New(charmHandler)
78 slog.SetDefault(logger)
79
80 // Config loading
81 cfg, err := loadConfig()
82 if err != nil {
83 logger.Error("invalid configuration", "err", err)
84 os.Exit(2)
85 }
86
87 // Root context: cancelled on SIGINT/SIGTERM, with the logger attached
88 // so any function we hand it to can pull it back out via loggerFrom.
89 ctx, stop := signal.NotifyContext(
90 context.Background(),
91 os.Interrupt, syscall.SIGTERM,
92 )
93 defer stop()
94 ctx = loggerInto(ctx, logger)
95
96 // Open (or create) the SQLite store. Holds jetstream cursor +
97 // observed Tangled membership records. Closed last during shutdown
98 // so anything writing to it during teardown still succeeds.
99 st, err := openStore(cfg.DBPath)
100 if err != nil {
101 logger.Error("failed to open store", "err", err, "path", cfg.DBPath)
102 os.Exit(1)
103 }
104 defer func() {
105 if err := st.Close(); err != nil {
106 logger.Error("close store", "err", err)
107 }
108 }()
109 logger.Info("store open", "path", cfg.DBPath)
110
111 // In-process broker for the /events fan-out. Wraps the store so
112 // publishes are durable and reconnecting subscribers can resume by
113 // cursor. Constructed before the consumers in case we ever want
114 // them to publish synthetic status events at startup.
115 br := newBroker(st)
116
117 // Provider that turns Tangled pipeline triggers into
118 // pipeline.status events. The fake provider stands in for a real
119 // CI integration: it emits synthetic running/success heartbeats
120 // over the broker so the entire jetstream → knot → /events flow
121 // is exercisable end-to-end. Swap this for a Buildkite-backed
122 // implementation once that lands.
123 provider := newFakeProvider(br, logger)
124
125 // Start the knot event-stream consumer first so the jetstream
126 // loop has somewhere to register newly-observed knots into. It
127 // gets the provider so each incoming pipeline trigger has
128 // something to dispatch to.
129 knots, err := startKnotConsumer(ctx, cfg, st, provider)
130 if err != nil {
131 logger.Error("failed to start knot consumer", "err", err)
132 os.Exit(1)
133 }
134 defer knots.Stop()
135
136 // Start the JetStream listener in the background. It hands the knot
137 // consumer any new knot referenced by an incoming sh.tangled.repo
138 // record so we don't have to wait for a restart to pick it up.
139 if err := startJetstream(ctx, cfg, st, knots); err != nil {
140 logger.Error("failed to start jetstream consumer", "err", err)
141 os.Exit(1)
142 }
143
144 // Run the HTTP server. This blocks until ctx is cancelled or the
145 // listener errors.
146 if err := runHTTP(ctx, cfg, br, provider); err != nil {
147 logger.Error("http server error", "err", err)
148 os.Exit(1)
149 }
150}