Stitch any CI into Tangled
1package main
2
// Knot event-stream subscriber.
//
// Tangled knot servers expose a websocket at ws[s]://<knot>/events that
// streams JSON-wrapped record events for that knot, including the
// sh.tangled.pipeline trigger records that drive CI. Pipeline triggers do
// *not* come over the AT Proto firehose (jetstream); the knot publishes
// them itself, so as a spindle we have to dial each knot whose repos have
// pointed at us.
//
// We use tangled-core's `eventconsumer` package, which already handles
// per-source connection management, retries, ordered processing and
// cursor tracking. We hand it:
//
//  1. The initial set of knots, derived from previously-observed
//     sh.tangled.repo records that named us in their spindle field.
//  2. A ProcessFunc (knotConsumer.process) that decodes
//     sh.tangled.pipeline records, checks that each trigger is authorized
//     for this spindle, and hands authorized triggers to the configured
//     Provider. Other record types are logged at debug level and dropped.
//
// The jetstream consumer also gets a back-reference (via the
// KnotConsumer interface) so it can dynamically subscribe a new knot —
// or unsubscribe one — the moment a matching sh.tangled.repo record
// arrives, without waiting for a tack restart.
26
27import (
28 "context"
29 "encoding/json"
30 "fmt"
31 "log/slog"
32 "time"
33
34 "tangled.org/core/api/tangled"
35 "tangled.org/core/eventconsumer"
36 "tangled.org/core/eventconsumer/cursor"
37)
38
// KnotConsumer is the interface through which the rest of tack manages
// knot event-stream subscriptions. Keeping it an interface lets tests
// substitute the no-network fake in knot_fake.go for the real
// websocket-backed consumer.
//
// Implementations must tolerate concurrent calls: AddKnot is driven by
// the jetstream goroutine while the consumer's worker goroutines are
// busy processing inbound knot events.
type KnotConsumer interface {
	// AddKnot starts watching the /events websocket of knot unless it is
	// already being watched, so repeat calls for the same knot do
	// nothing. An empty knot is ignored. ctx scopes the dial; cancelling
	// it tears the subscription down.
	AddKnot(ctx context.Context, knot string)

	// RemoveKnot undoes AddKnot: events from knot stop being processed.
	// Calling it for a knot that was never added, or with an empty knot,
	// is a no-op.
	RemoveKnot(ctx context.Context, knot string)
}
61
62// knotConsumer is the production KnotConsumer. It wraps
63// eventconsumer.Consumer with the small surface the rest of tack actually
64// wants: AddKnot for dynamic subscription, plus a Stop lifecycle hook
65// owned by main.
66//
67// Wrapping (instead of exposing *eventconsumer.Consumer directly) keeps
68// callers from importing eventconsumer just to construct a KnotSource,
69// and lets us swap or extend the underlying transport later.
70type knotConsumer struct {
71 c *eventconsumer.Consumer
72 log *slog.Logger
73
74 // cursors is the persistent cursor store the underlying consumer
75 // reads at every (re)connect. We retain a reference here so we
76 // can pre-seed an entry to "now" the first time we ever see a
77 // knot — see seedCursorIfMissing for why.
78 cursors cursor.Store
79
80 // provider dispatches each incoming pipeline trigger to whatever
81 // backend actually runs it (today: the fake provider; tomorrow:
82 // Buildkite). The consumer doesn't care which — it just hands
83 // over the decoded record and lets the provider publish status
84 // records back through its own broker connection.
85 provider Provider
86
87 // store, hostname, and ownerDID are the inputs to the
88 // pre-dispatch authorization check. We hold them on the
89 // consumer (instead of threading them through every call) so
90 // process(), which runs once per inbound knot event on a
91 // worker goroutine, can ask `is this trigger allowed?` against
92 // the persisted Tangled membership state without the rest of
93 // tack having to know it cares.
94 store *store
95 hostname string
96 ownerDID string
97}
98
99// Compile-time interface conformance check.
100var _ KnotConsumer = (*knotConsumer)(nil)
101
102// startKnotConsumer builds a knot event consumer pre-loaded with every
103// knot already known to the store, starts its connection loops in the
104// background, and returns the wrapper. The consumer keeps running until
105// ctx is cancelled.
106//
107// Cursor persistence is backed by tangled-core's SQLite cursor store
108// pointed at the same database file as the rest of tack. Without
109// this, a restart would replay every recent pipeline event from each
110// knot and fire a duplicate Buildkite build for each one. The
111// cursor.SqliteStore opens its own *sql.DB on the file, but
112// mattn/go-sqlite3 in WAL mode tolerates concurrent connections, and
113// the `cursors` table the upstream package creates doesn't collide
114// with any of our migrations.
115func startKnotConsumer(ctx context.Context, cfg config, st *store, provider Provider) (*knotConsumer, error) {
116 logger := loggerFrom(ctx).With("component", "knotconsumer")
117
118 knots, err := st.KnotsForSpindle(ctx, cfg.Hostname, cfg.OwnerDID)
119 if err != nil {
120 return nil, fmt.Errorf("load known knots: %w", err)
121 }
122
123 // Persistent per-source cursor store. Keyed by source.Key() (the
124 // knot hostname for KnotSource), so each knot resumes exactly
125 // where it left off and we don't re-fire builds for events that
126 // were already processed before the previous shutdown.
127 cursorStore, err := cursor.NewSQLiteStore(cfg.DBPath)
128 if err != nil {
129 return nil, fmt.Errorf("open knot cursor store: %w", err)
130 }
131
132 kc := &knotConsumer{
133 log: logger,
134 cursors: cursorStore,
135 provider: provider,
136 store: st,
137 hostname: cfg.Hostname,
138 ownerDID: cfg.OwnerDID,
139 }
140
141 ccfg := eventconsumer.NewConsumerConfig()
142 ccfg.Logger = logger
143 ccfg.Dev = cfg.Dev
144 ccfg.ProcessFunc = kc.process
145 ccfg.CursorStore = cursorStore
146 for _, k := range knots {
147 // Pin a brand-new knot's cursor to "now" before the consumer
148 // ever connects, so a fresh tack install doesn't replay every
149 // historical pipeline event the knot has retained and fire a
150 // duplicate Buildkite build for each one.
151 kc.seedCursorIfMissing(k)
152 ccfg.Sources[eventconsumer.NewKnotSource(k)] = struct{}{}
153 logger.Info("seeding knot source", "knot", k)
154 }
155 kc.c = eventconsumer.NewConsumer(*ccfg)
156
157 // Start workers + per-source connection loops. Consumer.Start is
158 // non-blocking; the goroutines it spawns observe ctx for shutdown.
159 kc.c.Start(ctx)
160 logger.Info("knot consumer started", "initial_knots", len(knots))
161
162 return kc, nil
163}
164
165// AddKnot subscribes to a knot we hadn't been watching before. Safe to
166// call repeatedly: eventconsumer.Consumer.AddSource deduplicates by the
167// source's Key (the knot hostname), so passing the same knot twice is a
168// no-op.
169func (k *knotConsumer) AddKnot(ctx context.Context, knot string) {
170 if knot == "" {
171 return
172 }
173 // Same first-run protection as in startKnotConsumer: a knot we
174 // have never observed before must not retroactively fire
175 // pipelines for triggers older than the moment we learned about
176 // it. AddSource reads the cursor synchronously when it dials.
177 k.seedCursorIfMissing(knot)
178 k.log.Info("adding knot source", "knot", knot)
179 k.c.AddSource(ctx, eventconsumer.NewKnotSource(knot))
180}
181
182// seedCursorIfMissing writes the current time (as nanoseconds since
183// the unix epoch, the same unit eventconsumer.worker uses when it
184// advances cursors after each message) to the cursor store for knot
185// iff no cursor is already persisted for it.
186//
187// Without this, a brand-new install — or the first time we ever
188// dial a previously-unseen knot — connects to /events with no
189// cursor query parameter, which the knot servers interpret as
190// "stream from the beginning of time." For pipeline records that
191// would fire one Buildkite build per historical trigger the knot
192// still has retained. Pinning the cursor up-front limits us to
193// events that arrive *after* tack learned about the knot.
194//
195// Get returning 0 means "no cursor stored" across all the cursor
196// store implementations we use (memory/sqlite/redis), so it's a
197// safe sentinel to gate the write.
198func (k *knotConsumer) seedCursorIfMissing(knot string) {
199 if k.cursors.Get(knot) != 0 {
200 return
201 }
202 now := time.Now().UnixNano()
203 k.cursors.Set(knot, now)
204 k.log.Info("seeded fresh knot cursor to now",
205 "knot", knot,
206 "cursor", now,
207 )
208}
209
210// RemoveKnot unsubscribes from a knot that no longer has any authorized
211// repos pointing at this spindle. Safe to call repeatedly:
212// eventconsumer.Consumer.RemoveSource ignores sources that are not present.
213func (k *knotConsumer) RemoveKnot(_ context.Context, knot string) {
214 if knot == "" {
215 return
216 }
217 k.log.Info("removing knot source", "knot", knot)
218 k.c.RemoveSource(eventconsumer.NewKnotSource(knot))
219}
220
221// Stop tears down all knot websocket connections and waits for the
222// consumer's goroutines to exit. It must be called exactly once.
223func (k *knotConsumer) Stop() {
224 k.c.Stop()
225}
226
227// process is the ProcessFunc handed to eventconsumer. It runs once per
228// inbound message, on a worker goroutine. For now we only care about
229// pipeline records — everything else is logged at debug and dropped.
230//
231// Returning an error only logs it (the consumer keeps reading); the
232// cursor is advanced before the ProcessFunc runs, so a returned error
233// does *not* cause a replay.
234func (k *knotConsumer) process(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
235 switch msg.Nsid {
236 case tangled.PipelineNSID:
237 var p tangled.Pipeline
238 if err := json.Unmarshal(msg.EventJson, &p); err != nil {
239 k.log.Error("decode pipeline",
240 "err", err,
241 "knot", src.Key(),
242 "rkey", msg.Rkey,
243 )
244 return err
245 }
246
247 // Pull a couple of fields out of the trigger metadata for log
248 // context. They're all optional in the schema, so each one is
249 // guarded — we want a noisy log entry, not a nil-deref.
250 var (
251 triggerKind string
252 repoDid string
253 repoName string
254 )
255 if p.TriggerMetadata != nil {
256 triggerKind = p.TriggerMetadata.Kind
257 if p.TriggerMetadata.Repo != nil {
258 repoDid = p.TriggerMetadata.Repo.Did
259 if p.TriggerMetadata.Repo.Repo != nil {
260 repoName = *p.TriggerMetadata.Repo.Repo
261 }
262 }
263 }
264
265 k.log.Info("pipeline event",
266 "knot", src.Key(),
267 "rkey", msg.Rkey,
268 "trigger", triggerKind,
269 "repo_did", repoDid,
270 "repo", repoName,
271 "workflows", len(p.Workflows),
272 )
273
274 // Authorization gate. The knot's /events stream will hand
275 // us every pipeline record it publishes, including ones
276 // for repos that never opted into us. Being subscribed
277 // to a knot says nothing about which repos on it we
278 // actually serve. Without this check, any DID that can
279 // publish a sh.tangled.pipeline record on a knot we
280 // already watch could trigger CI on our spindle.
281 //
282 // We consult the persisted state mirrored from the AT
283 // Proto firehose: the trigger's repo must have declared
284 // us as its spindle on this knot, and its publisher DID
285 // must be the spindle owner or a member they vouched for
286 // via sh.tangled.spindle.member. See
287 // store.AuthorizePipelineActor for the precise rules.
288 ok, reason, err := k.store.AuthorizePipelineActor(
289 ctx, k.hostname, src.Key(), k.ownerDID,
290 repoDid, repoName,
291 )
292 if err != nil {
293 // SQL/IO failure (not a denial). Log and refuse to
294 // spawn rather than fail-open: a transient store
295 // error during shutdown shouldn't be allowed to
296 // punch a hole in the security gate.
297 k.log.Error("authorize pipeline trigger",
298 "err", err,
299 "knot", src.Key(),
300 "rkey", msg.Rkey,
301 "repo_did", repoDid,
302 "repo", repoName,
303 )
304 return err
305 }
306 if !ok {
307 k.log.Warn("rejecting unauthorized pipeline trigger",
308 "knot", src.Key(),
309 "rkey", msg.Rkey,
310 "repo_did", repoDid,
311 "repo", repoName,
312 "reason", reason,
313 )
314 return nil
315 }
316
317 // Hand the trigger to whichever Provider was configured.
318 // Spawn is non-blocking — it fans out into provider-owned
319 // goroutines so this worker can move on to the next event.
320 // The provider keeps ctx around for shutdown coordination.
321 // repoDid is the actor: the publisher of the originating
322 // sh.tangled.repo record, which AuthorizePipelineActor
323 // just confirmed is allowed to spawn work here.
324 k.provider.Spawn(ctx, src.Key(), msg.Rkey, repoDid, p.TriggerMetadata, p.Workflows)
325
326 default:
327 // Knots may publish other record types over the same stream; we
328 // don't care about them yet. Debug-only so it's available when
329 // chasing "why isn't my pipeline firing" but doesn't drown out
330 // info-level logs.
331 k.log.Debug("ignored knot event",
332 "knot", src.Key(),
333 "nsid", msg.Nsid,
334 "rkey", msg.Rkey,
335 )
336 }
337
338 return nil
339}