Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1// Tack is a custom Tangled spindle that translates sh.tangled.pipeline 2// trigger records into Buildkite builds, and publishes Buildkite job state 3// back as sh.tangled.pipeline.status events on a WebSocket stream that the 4// Tangled appview can consume. 5package main 6 7import ( 8 "context" 9 "errors" 10 "flag" 11 "log/slog" 12 "os" 13 "os/signal" 14 "syscall" 15 16 charmlog "github.com/charmbracelet/log" 17) 18 19// config is the runtime configuration, sourced from environment variables and 20// flags. Env vars match the README so this can be swapped in for the stock 21// spindle without surprises. 22type config struct { 23 Addr string 24 Hostname string 25 OwnerDID string 26 JetstreamURL string 27 DBPath string 28 // Dev flips the knot event-stream scheme from wss:// to ws://. 29 // Useful when running against a local knot during development. 30 Dev bool 31} 32 33func loadConfig() (config, error) { 34 cfg := config{ 35 Addr: envOr("TACK_LISTEN_ADDR", ":8080"), 36 Hostname: os.Getenv("TACK_HOSTNAME"), 37 OwnerDID: os.Getenv("TACK_OWNER_DID"), 38 JetstreamURL: envOr("TACK_JETSTREAM_URL", "wss://jetstream1.us-west.bsky.network/subscribe"), 39 DBPath: envOr("TACK_DB_PATH", "tack.db"), 40 Dev: os.Getenv("TACK_DEV") != "", 41 } 42 addrFlag := flag.String("addr", cfg.Addr, "HTTP listen address (overrides TACK_LISTEN_ADDR)") 43 flag.Parse() 44 cfg.Addr = *addrFlag 45 46 if cfg.OwnerDID == "" { 47 return cfg, errors.New("TACK_OWNER_DID is required") 48 } 49 50 // Hostname identifies *us* in sh.tangled.repo records (the .spindle 51 // field). Without it we have no way to know which repos point at us 52 // and therefore which knots we should subscribe to for pipeline 53 // triggers — so we refuse to start rather than silently subscribe to 54 // nothing. 55 if cfg.Hostname == "" { 56 return cfg, errors.New("TACK_HOSTNAME is required") 57 } 58 59 return cfg, nil 60} 61 62func envOr(key, def string) string { 63 if v := os.Getenv(key); v != "" { 64 return v 65 } 66 return def 67} 68 69func main() { 70 // Logging setup. charmbracelet/log implements slog.Handler, so we wrap 71 // it in slog.New to share the same backend with libraries that expect 72 // a *slog.Logger (notably the jetstream client). 73 charmHandler := charmlog.NewWithOptions(os.Stderr, charmlog.Options{ 74 Level: charmlog.DebugLevel, 75 ReportTimestamp: true, 76 }) 77 logger := slog.New(charmHandler) 78 slog.SetDefault(logger) 79 80 // Config loading 81 cfg, err := loadConfig() 82 if err != nil { 83 logger.Error("invalid configuration", "err", err) 84 os.Exit(2) 85 } 86 87 // Root context: cancelled on SIGINT/SIGTERM, with the logger attached 88 // so any function we hand it to can pull it back out via loggerFrom. 89 ctx, stop := signal.NotifyContext( 90 context.Background(), 91 os.Interrupt, syscall.SIGTERM, 92 ) 93 defer stop() 94 ctx = loggerInto(ctx, logger) 95 96 // Open (or create) the SQLite store. Holds jetstream cursor + 97 // observed Tangled membership records. Closed last during shutdown 98 // so anything writing to it during teardown still succeeds. 99 st, err := openStore(cfg.DBPath) 100 if err != nil { 101 logger.Error("failed to open store", "err", err, "path", cfg.DBPath) 102 os.Exit(1) 103 } 104 defer func() { 105 if err := st.Close(); err != nil { 106 logger.Error("close store", "err", err) 107 } 108 }() 109 logger.Info("store open", "path", cfg.DBPath) 110 111 // In-process broker for the /events fan-out. Wraps the store so 112 // publishes are durable and reconnecting subscribers can resume by 113 // cursor. Constructed before the consumers in case we ever want 114 // them to publish synthetic status events at startup. 115 br := newBroker(st) 116 117 // Provider that turns Tangled pipeline triggers into 118 // pipeline.status events. The fake provider stands in for a real 119 // CI integration: it emits synthetic running/success heartbeats 120 // over the broker so the entire jetstream → knot → /events flow 121 // is exercisable end-to-end. Swap this for a Buildkite-backed 122 // implementation once that lands. 123 provider := newFakeProvider(br, logger) 124 125 // Start the knot event-stream consumer first so the jetstream 126 // loop has somewhere to register newly-observed knots into. It 127 // gets the provider so each incoming pipeline trigger has 128 // something to dispatch to. 129 knots, err := startKnotConsumer(ctx, cfg, st, provider) 130 if err != nil { 131 logger.Error("failed to start knot consumer", "err", err) 132 os.Exit(1) 133 } 134 defer knots.Stop() 135 136 // Start the JetStream listener in the background. It hands the knot 137 // consumer any new knot referenced by an incoming sh.tangled.repo 138 // record so we don't have to wait for a restart to pick it up. 139 if err := startJetstream(ctx, cfg, st, knots); err != nil { 140 logger.Error("failed to start jetstream consumer", "err", err) 141 os.Exit(1) 142 } 143 144 // Run the HTTP server. This blocks until ctx is cancelled or the 145 // listener errors. 146 if err := runHTTP(ctx, cfg, br, provider); err != nil { 147 logger.Error("http server error", "err", err) 148 os.Exit(1) 149 } 150}