Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

package main

// Knot event-stream subscriber.
//
// Tangled knot servers expose a websocket at ws[s]://<knot>/events that
// streams JSON-wrapped record events for that knot, including the
// sh.tangled.pipeline trigger records that drive CI. Pipeline triggers do
// *not* come over the AT Proto firehose (jetstream); the knot publishes
// them itself, so as a spindle we have to dial each knot whose repos have
// pointed at us.
//
// We use tangled-core's `eventconsumer` package, which already handles
// per-source connection management, retries, ordered processing and
// cursor tracking. We hand it:
//
//  1. The initial set of knots, derived from previously-observed
//     sh.tangled.repo records that named us as their .spindle field.
//  2. A ProcessFunc that, for now, simply logs every received event.
//     Once the build pipeline is wired up this is where pipeline
//     triggers will be translated into Buildkite builds.
//
// The jetstream consumer also gets a back-reference (via the
// KnotConsumer interface) so it can dynamically subscribe a new knot —
// or, conceptually, unsubscribe one — the moment a matching
// sh.tangled.repo record arrives, without waiting for a tack restart.

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"time"

	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
)

// KnotConsumer is the surface area the rest of tack uses to interact
// with the knot event-stream subscriber. It exists so that tests can
// substitute a fake for real knot interactions.
//
// Implementations must be safe for concurrent use: AddKnot is invoked
// from the jetstream goroutine while the consumer's worker goroutines
// are independently processing inbound knot events.
//
// The fake at knot_fake.go provides a no-network implementation suitable
// for use from tests.
49type KnotConsumer interface { 50 // AddKnot subscribes to the given knot's /events websocket if not 51 // already subscribed. Calling with the same knot more than once is 52 // a no-op. An empty knot string is ignored. The supplied context 53 // scopes the dial; cancelling it tears the subscription down. 54 AddKnot(ctx context.Context, knot string) 55 56 // RemoveKnot stops processing events from the given knot. It is the 57 // inverse of AddKnot and must tolerate being called for a knot that 58 // was never added (no-op). An empty knot string is ignored. 59 // 60 // The production implementation is currently a no-op: tangled-core's 61 // eventconsumer does not expose a way to drop an individual source's 62 // websocket. Tracked upstream as 63 // https://tangled.org/did:plc:j5hmlfdrwkvtxm7cjmu7j2is/issues/510 64 RemoveKnot(ctx context.Context, knot string) 65} 66 67// knotConsumer is the production KnotConsumer. It wraps 68// eventconsumer.Consumer with the small surface the rest of tack actually 69// wants: AddKnot for dynamic subscription, plus a Stop lifecycle hook 70// owned by main. 71// 72// Wrapping (instead of exposing *eventconsumer.Consumer directly) keeps 73// callers from importing eventconsumer just to construct a KnotSource, 74// and lets us swap or extend the underlying transport later. 75type knotConsumer struct { 76 c *eventconsumer.Consumer 77 log *slog.Logger 78 79 // cursors is the persistent cursor store the underlying consumer 80 // reads at every (re)connect. We retain a reference here so we 81 // can pre-seed an entry to "now" the first time we ever see a 82 // knot — see seedCursorIfMissing for why. 83 cursors cursor.Store 84 85 // provider dispatches each incoming pipeline trigger to whatever 86 // backend actually runs it (today: the fake provider; tomorrow: 87 // Buildkite). 
The consumer doesn't care which — it just hands 88 // over the decoded record and lets the provider publish status 89 // records back through its own broker connection. 90 provider Provider 91} 92 93// Compile-time interface conformance check. 94var _ KnotConsumer = (*knotConsumer)(nil) 95 96// startKnotConsumer builds a knot event consumer pre-loaded with every 97// knot already known to the store, starts its connection loops in the 98// background, and returns the wrapper. The consumer keeps running until 99// ctx is cancelled. 100// 101// Cursor persistence is backed by tangled-core's SQLite cursor store 102// pointed at the same database file as the rest of tack. Without 103// this, a restart would replay every recent pipeline event from each 104// knot and fire a duplicate Buildkite build for each one. The 105// cursor.SqliteStore opens its own *sql.DB on the file, but 106// mattn/go-sqlite3 in WAL mode tolerates concurrent connections, and 107// the `cursors` table the upstream package creates doesn't collide 108// with any of our migrations. 109func startKnotConsumer(ctx context.Context, cfg config, st *store, provider Provider) (*knotConsumer, error) { 110 logger := loggerFrom(ctx).With("component", "knotconsumer") 111 112 knots, err := st.KnotsForSpindle(ctx, cfg.Hostname) 113 if err != nil { 114 return nil, fmt.Errorf("load known knots: %w", err) 115 } 116 117 // Persistent per-source cursor store. Keyed by source.Key() (the 118 // knot hostname for KnotSource), so each knot resumes exactly 119 // where it left off and we don't re-fire builds for events that 120 // were already processed before the previous shutdown. 
121 cursorStore, err := cursor.NewSQLiteStore(cfg.DBPath) 122 if err != nil { 123 return nil, fmt.Errorf("open knot cursor store: %w", err) 124 } 125 126 kc := &knotConsumer{log: logger, cursors: cursorStore, provider: provider} 127 128 ccfg := eventconsumer.NewConsumerConfig() 129 ccfg.Logger = logger 130 ccfg.Dev = cfg.Dev 131 ccfg.ProcessFunc = kc.process 132 ccfg.CursorStore = cursorStore 133 for _, k := range knots { 134 // Pin a brand-new knot's cursor to "now" before the consumer 135 // ever connects, so a fresh tack install doesn't replay every 136 // historical pipeline event the knot has retained and fire a 137 // duplicate Buildkite build for each one. 138 kc.seedCursorIfMissing(k) 139 ccfg.Sources[eventconsumer.NewKnotSource(k)] = struct{}{} 140 logger.Info("seeding knot source", "knot", k) 141 } 142 kc.c = eventconsumer.NewConsumer(*ccfg) 143 144 // Start workers + per-source connection loops. Consumer.Start is 145 // non-blocking; the goroutines it spawns observe ctx for shutdown. 146 kc.c.Start(ctx) 147 logger.Info("knot consumer started", "initial_knots", len(knots)) 148 149 return kc, nil 150} 151 152// AddKnot subscribes to a knot we hadn't been watching before. Safe to 153// call repeatedly: eventconsumer.Consumer.AddSource deduplicates by the 154// source's Key (the knot hostname), so passing the same knot twice is a 155// no-op. 156func (k *knotConsumer) AddKnot(ctx context.Context, knot string) { 157 if knot == "" { 158 return 159 } 160 // Same first-run protection as in startKnotConsumer: a knot we 161 // have never observed before must not retroactively fire 162 // pipelines for triggers older than the moment we learned about 163 // it. AddSource reads the cursor synchronously when it dials. 
164 k.seedCursorIfMissing(knot) 165 k.log.Info("adding knot source", "knot", knot) 166 k.c.AddSource(ctx, eventconsumer.NewKnotSource(knot)) 167} 168 169// seedCursorIfMissing writes the current time (as nanoseconds since 170// the unix epoch, the same unit eventconsumer.worker uses when it 171// advances cursors after each message) to the cursor store for knot 172// iff no cursor is already persisted for it. 173// 174// Without this, a brand-new install — or the first time we ever 175// dial a previously-unseen knot — connects to /events with no 176// cursor query parameter, which the knot servers interpret as 177// "stream from the beginning of time." For pipeline records that 178// would fire one Buildkite build per historical trigger the knot 179// still has retained. Pinning the cursor up-front limits us to 180// events that arrive *after* tack learned about the knot. 181// 182// Get returning 0 means "no cursor stored" across all the cursor 183// store implementations we use (memory/sqlite/redis), so it's a 184// safe sentinel to gate the write. 185func (k *knotConsumer) seedCursorIfMissing(knot string) { 186 if k.cursors.Get(knot) != 0 { 187 return 188 } 189 now := time.Now().UnixNano() 190 k.cursors.Set(knot, now) 191 k.log.Info("seeded fresh knot cursor to now", 192 "knot", knot, 193 "cursor", now, 194 ) 195} 196 197// RemoveKnot is currently a no-op — see the interface comment for the 198// upstream blocker. We still log at info so operators can see when the 199// reconciliation logic *would* have unsubscribed; once eventconsumer 200// gains a RemoveSource primitive, swap the body for a real call. 201func (k *knotConsumer) RemoveKnot(_ context.Context, knot string) { 202 if knot == "" { 203 return 204 } 205 k.log.Info("remove knot source requested (no-op until upstream supports it)", 206 "knot", knot, 207 ) 208} 209 210// Stop tears down all knot websocket connections and waits for the 211// consumer's goroutines to exit. It must be called exactly once. 
212func (k *knotConsumer) Stop() { 213 k.c.Stop() 214} 215 216// process is the ProcessFunc handed to eventconsumer. It runs once per 217// inbound message, on a worker goroutine. For now we only care about 218// pipeline records — everything else is logged at debug and dropped. 219// 220// Returning an error only logs it (the consumer keeps reading); the 221// cursor is advanced before the ProcessFunc runs, so a returned error 222// does *not* cause a replay. 223func (k *knotConsumer) process(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 224 switch msg.Nsid { 225 case tangled.PipelineNSID: 226 var p tangled.Pipeline 227 if err := json.Unmarshal(msg.EventJson, &p); err != nil { 228 k.log.Error("decode pipeline", 229 "err", err, 230 "knot", src.Key(), 231 "rkey", msg.Rkey, 232 ) 233 return err 234 } 235 236 // Pull a couple of fields out of the trigger metadata for log 237 // context. They're all optional in the schema, so each one is 238 // guarded — we want a noisy log entry, not a nil-deref. 239 var ( 240 triggerKind string 241 repoDid string 242 repoName string 243 ) 244 if p.TriggerMetadata != nil { 245 triggerKind = p.TriggerMetadata.Kind 246 if p.TriggerMetadata.Repo != nil { 247 repoDid = p.TriggerMetadata.Repo.Did 248 if p.TriggerMetadata.Repo.Repo != nil { 249 repoName = *p.TriggerMetadata.Repo.Repo 250 } 251 } 252 } 253 254 k.log.Info("pipeline event", 255 "knot", src.Key(), 256 "rkey", msg.Rkey, 257 "trigger", triggerKind, 258 "repo_did", repoDid, 259 "repo", repoName, 260 "workflows", len(p.Workflows), 261 ) 262 263 // Hand the trigger to whichever Provider was configured. 264 // Spawn is non-blocking — it fans out into provider-owned 265 // goroutines so this worker can move on to the next event. 266 // The provider keeps ctx around for shutdown coordination. 
267 k.provider.Spawn(ctx, src.Key(), msg.Rkey, p.TriggerMetadata, p.Workflows) 268 269 default: 270 // Knots may publish other record types over the same stream; we 271 // don't care about them yet. Debug-only so it's available when 272 // chasing "why isn't my pipeline firing" but doesn't drown out 273 // info-level logs. 274 k.log.Debug("ignored knot event", 275 "knot", src.Key(), 276 "nsid", msg.Nsid, 277 "rkey", msg.Rkey, 278 ) 279 } 280 281 return nil 282}