// Stitch any CI into Tangled.
1package main
2
// Knot event-stream subscriber.
//
// Tangled knot servers expose a websocket at ws[s]://<knot>/events that
// streams JSON-wrapped record events for that knot, including the
// sh.tangled.pipeline trigger records that drive CI. Pipeline triggers do
// *not* come over the AT Proto firehose (jetstream); the knot publishes
// them itself, so as a spindle we have to dial each knot whose repos have
// pointed at us.
//
// We use tangled-core's `eventconsumer` package, which already handles
// per-source connection management, retries, ordered processing and
// cursor tracking. We hand it:
//
//  1. The initial set of knots, derived from previously observed
//     sh.tangled.repo records that named us in their .spindle field.
//  2. A persistent SQLite cursor store, so each knot resumes where it
//     left off after a restart instead of replaying old triggers.
//  3. A ProcessFunc that decodes sh.tangled.pipeline triggers, checks
//     each one against the persisted spindle membership state, and hands
//     authorized triggers to the configured Provider (today a fake
//     provider; eventually Buildkite). Other record types are logged at
//     debug level and dropped.
//
// The jetstream consumer also gets a back-reference (via the
// KnotConsumer interface) so it can dynamically subscribe a new knot —
// or, conceptually, unsubscribe one — the moment a matching
// sh.tangled.repo record arrives, without waiting for a tack restart.
26
27import (
28 "context"
29 "encoding/json"
30 "fmt"
31 "log/slog"
32 "time"
33
34 "tangled.org/core/api/tangled"
35 "tangled.org/core/eventconsumer"
36 "tangled.org/core/eventconsumer/cursor"
37)
38
// KnotConsumer is the narrow interface through which the rest of tack
// drives the knot event-stream subscriber. Keeping it an interface lets
// tests substitute the fake in knot_fake.go, which never touches the
// network.
//
// Implementations must tolerate concurrent use: the jetstream goroutine
// calls AddKnot while the consumer's own worker goroutines are busy
// processing inbound knot events.
type KnotConsumer interface {
	// AddKnot starts following knot's /events websocket unless it is
	// already being followed. Repeated calls for the same knot do
	// nothing, and an empty knot is ignored. ctx scopes the dial, and
	// cancelling it tears the subscription down.
	AddKnot(ctx context.Context, knot string)

	// RemoveKnot is the inverse of AddKnot: it stops processing events
	// from knot. Calling it for a knot that was never added is a no-op,
	// as is passing an empty knot.
	//
	// The production implementation does nothing yet, because
	// tangled-core's eventconsumer offers no way to close a single
	// source's websocket. Upstream tracking issue:
	// https://tangled.org/did:plc:j5hmlfdrwkvtxm7cjmu7j2is/issues/510
	RemoveKnot(ctx context.Context, knot string)
}
66
67// knotConsumer is the production KnotConsumer. It wraps
68// eventconsumer.Consumer with the small surface the rest of tack actually
69// wants: AddKnot for dynamic subscription, plus a Stop lifecycle hook
70// owned by main.
71//
72// Wrapping (instead of exposing *eventconsumer.Consumer directly) keeps
73// callers from importing eventconsumer just to construct a KnotSource,
74// and lets us swap or extend the underlying transport later.
75type knotConsumer struct {
76 c *eventconsumer.Consumer
77 log *slog.Logger
78
79 // cursors is the persistent cursor store the underlying consumer
80 // reads at every (re)connect. We retain a reference here so we
81 // can pre-seed an entry to "now" the first time we ever see a
82 // knot — see seedCursorIfMissing for why.
83 cursors cursor.Store
84
85 // provider dispatches each incoming pipeline trigger to whatever
86 // backend actually runs it (today: the fake provider; tomorrow:
87 // Buildkite). The consumer doesn't care which — it just hands
88 // over the decoded record and lets the provider publish status
89 // records back through its own broker connection.
90 provider Provider
91
92 // store, hostname, and ownerDID are the inputs to the
93 // pre-dispatch authorization check. We hold them on the
94 // consumer (instead of threading them through every call) so
95 // process(), which runs once per inbound knot event on a
96 // worker goroutine, can ask `is this trigger allowed?` against
97 // the persisted Tangled membership state without the rest of
98 // tack having to know it cares.
99 store *store
100 hostname string
101 ownerDID string
102}
103
104// Compile-time interface conformance check.
105var _ KnotConsumer = (*knotConsumer)(nil)
106
107// startKnotConsumer builds a knot event consumer pre-loaded with every
108// knot already known to the store, starts its connection loops in the
109// background, and returns the wrapper. The consumer keeps running until
110// ctx is cancelled.
111//
112// Cursor persistence is backed by tangled-core's SQLite cursor store
113// pointed at the same database file as the rest of tack. Without
114// this, a restart would replay every recent pipeline event from each
115// knot and fire a duplicate Buildkite build for each one. The
116// cursor.SqliteStore opens its own *sql.DB on the file, but
117// mattn/go-sqlite3 in WAL mode tolerates concurrent connections, and
118// the `cursors` table the upstream package creates doesn't collide
119// with any of our migrations.
120func startKnotConsumer(ctx context.Context, cfg config, st *store, provider Provider) (*knotConsumer, error) {
121 logger := loggerFrom(ctx).With("component", "knotconsumer")
122
123 knots, err := st.KnotsForSpindle(ctx, cfg.Hostname)
124 if err != nil {
125 return nil, fmt.Errorf("load known knots: %w", err)
126 }
127
128 // Persistent per-source cursor store. Keyed by source.Key() (the
129 // knot hostname for KnotSource), so each knot resumes exactly
130 // where it left off and we don't re-fire builds for events that
131 // were already processed before the previous shutdown.
132 cursorStore, err := cursor.NewSQLiteStore(cfg.DBPath)
133 if err != nil {
134 return nil, fmt.Errorf("open knot cursor store: %w", err)
135 }
136
137 kc := &knotConsumer{
138 log: logger,
139 cursors: cursorStore,
140 provider: provider,
141 store: st,
142 hostname: cfg.Hostname,
143 ownerDID: cfg.OwnerDID,
144 }
145
146 ccfg := eventconsumer.NewConsumerConfig()
147 ccfg.Logger = logger
148 ccfg.Dev = cfg.Dev
149 ccfg.ProcessFunc = kc.process
150 ccfg.CursorStore = cursorStore
151 for _, k := range knots {
152 // Pin a brand-new knot's cursor to "now" before the consumer
153 // ever connects, so a fresh tack install doesn't replay every
154 // historical pipeline event the knot has retained and fire a
155 // duplicate Buildkite build for each one.
156 kc.seedCursorIfMissing(k)
157 ccfg.Sources[eventconsumer.NewKnotSource(k)] = struct{}{}
158 logger.Info("seeding knot source", "knot", k)
159 }
160 kc.c = eventconsumer.NewConsumer(*ccfg)
161
162 // Start workers + per-source connection loops. Consumer.Start is
163 // non-blocking; the goroutines it spawns observe ctx for shutdown.
164 kc.c.Start(ctx)
165 logger.Info("knot consumer started", "initial_knots", len(knots))
166
167 return kc, nil
168}
169
170// AddKnot subscribes to a knot we hadn't been watching before. Safe to
171// call repeatedly: eventconsumer.Consumer.AddSource deduplicates by the
172// source's Key (the knot hostname), so passing the same knot twice is a
173// no-op.
174func (k *knotConsumer) AddKnot(ctx context.Context, knot string) {
175 if knot == "" {
176 return
177 }
178 // Same first-run protection as in startKnotConsumer: a knot we
179 // have never observed before must not retroactively fire
180 // pipelines for triggers older than the moment we learned about
181 // it. AddSource reads the cursor synchronously when it dials.
182 k.seedCursorIfMissing(knot)
183 k.log.Info("adding knot source", "knot", knot)
184 k.c.AddSource(ctx, eventconsumer.NewKnotSource(knot))
185}
186
187// seedCursorIfMissing writes the current time (as nanoseconds since
188// the unix epoch, the same unit eventconsumer.worker uses when it
189// advances cursors after each message) to the cursor store for knot
190// iff no cursor is already persisted for it.
191//
192// Without this, a brand-new install — or the first time we ever
193// dial a previously-unseen knot — connects to /events with no
194// cursor query parameter, which the knot servers interpret as
195// "stream from the beginning of time." For pipeline records that
196// would fire one Buildkite build per historical trigger the knot
197// still has retained. Pinning the cursor up-front limits us to
198// events that arrive *after* tack learned about the knot.
199//
200// Get returning 0 means "no cursor stored" across all the cursor
201// store implementations we use (memory/sqlite/redis), so it's a
202// safe sentinel to gate the write.
203func (k *knotConsumer) seedCursorIfMissing(knot string) {
204 if k.cursors.Get(knot) != 0 {
205 return
206 }
207 now := time.Now().UnixNano()
208 k.cursors.Set(knot, now)
209 k.log.Info("seeded fresh knot cursor to now",
210 "knot", knot,
211 "cursor", now,
212 )
213}
214
215// RemoveKnot is currently a no-op — see the interface comment for the
216// upstream blocker. We still log at info so operators can see when the
217// reconciliation logic *would* have unsubscribed; once eventconsumer
218// gains a RemoveSource primitive, swap the body for a real call.
219func (k *knotConsumer) RemoveKnot(_ context.Context, knot string) {
220 if knot == "" {
221 return
222 }
223 k.log.Info("remove knot source requested (no-op until upstream supports it)",
224 "knot", knot,
225 )
226}
227
228// Stop tears down all knot websocket connections and waits for the
229// consumer's goroutines to exit. It must be called exactly once.
230func (k *knotConsumer) Stop() {
231 k.c.Stop()
232}
233
234// process is the ProcessFunc handed to eventconsumer. It runs once per
235// inbound message, on a worker goroutine. For now we only care about
236// pipeline records — everything else is logged at debug and dropped.
237//
238// Returning an error only logs it (the consumer keeps reading); the
239// cursor is advanced before the ProcessFunc runs, so a returned error
240// does *not* cause a replay.
241func (k *knotConsumer) process(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
242 switch msg.Nsid {
243 case tangled.PipelineNSID:
244 var p tangled.Pipeline
245 if err := json.Unmarshal(msg.EventJson, &p); err != nil {
246 k.log.Error("decode pipeline",
247 "err", err,
248 "knot", src.Key(),
249 "rkey", msg.Rkey,
250 )
251 return err
252 }
253
254 // Pull a couple of fields out of the trigger metadata for log
255 // context. They're all optional in the schema, so each one is
256 // guarded — we want a noisy log entry, not a nil-deref.
257 var (
258 triggerKind string
259 repoDid string
260 repoName string
261 )
262 if p.TriggerMetadata != nil {
263 triggerKind = p.TriggerMetadata.Kind
264 if p.TriggerMetadata.Repo != nil {
265 repoDid = p.TriggerMetadata.Repo.Did
266 if p.TriggerMetadata.Repo.Repo != nil {
267 repoName = *p.TriggerMetadata.Repo.Repo
268 }
269 }
270 }
271
272 k.log.Info("pipeline event",
273 "knot", src.Key(),
274 "rkey", msg.Rkey,
275 "trigger", triggerKind,
276 "repo_did", repoDid,
277 "repo", repoName,
278 "workflows", len(p.Workflows),
279 )
280
281 // Authorization gate. The knot's /events stream will hand
282 // us every pipeline record it publishes, including ones
283 // for repos that never opted into us. Being subscribed
284 // to a knot says nothing about which repos on it we
285 // actually serve. Without this check, any DID that can
286 // publish a sh.tangled.pipeline record on a knot we
287 // already watch could trigger CI on our spindle.
288 //
289 // We consult the persisted state mirrored from the AT
290 // Proto firehose: the trigger's repo must have declared
291 // us as its spindle on this knot, and its publisher DID
292 // must be the spindle owner or a member they vouched for
293 // via sh.tangled.spindle.member. See
294 // store.AuthorizePipelineActor for the precise rules.
295 ok, reason, err := k.store.AuthorizePipelineActor(
296 ctx, k.hostname, src.Key(), k.ownerDID,
297 repoDid, repoName,
298 )
299 if err != nil {
300 // SQL/IO failure (not a denial). Log and refuse to
301 // spawn rather than fail-open: a transient store
302 // error during shutdown shouldn't be allowed to
303 // punch a hole in the security gate.
304 k.log.Error("authorize pipeline trigger",
305 "err", err,
306 "knot", src.Key(),
307 "rkey", msg.Rkey,
308 "repo_did", repoDid,
309 "repo", repoName,
310 )
311 return err
312 }
313 if !ok {
314 k.log.Warn("rejecting unauthorized pipeline trigger",
315 "knot", src.Key(),
316 "rkey", msg.Rkey,
317 "repo_did", repoDid,
318 "repo", repoName,
319 "reason", reason,
320 )
321 return nil
322 }
323
324 // Hand the trigger to whichever Provider was configured.
325 // Spawn is non-blocking — it fans out into provider-owned
326 // goroutines so this worker can move on to the next event.
327 // The provider keeps ctx around for shutdown coordination.
328 // repoDid is the actor: the publisher of the originating
329 // sh.tangled.repo record, which AuthorizePipelineActor
330 // just confirmed is allowed to spawn work here.
331 k.provider.Spawn(ctx, src.Key(), msg.Rkey, repoDid, p.TriggerMetadata, p.Workflows)
332
333 default:
334 // Knots may publish other record types over the same stream; we
335 // don't care about them yet. Debug-only so it's available when
336 // chasing "why isn't my pipeline firing" but doesn't drown out
337 // info-level logs.
338 k.log.Debug("ignored knot event",
339 "knot", src.Key(),
340 "nsid", msg.Nsid,
341 "rkey", msg.Rkey,
342 )
343 }
344
345 return nil
346}