Stitch any CI into Tangled
1package main
2
// Knot event-stream subscriber.
//
// Tangled knot servers expose a websocket at ws[s]://<knot>/events that
// streams JSON-wrapped record events for that knot, including the
// sh.tangled.pipeline trigger records that drive CI. Pipeline triggers do
// *not* come over the AT Proto firehose (jetstream); the knot publishes
// them itself, so as a spindle we have to dial each knot whose repos have
// pointed at us.
//
// We use tangled-core's `eventconsumer` package, which already handles
// per-source connection management, retries, ordered processing and
// cursor tracking. We hand it:
//
//  1. The initial set of knots, derived from previously-observed
//     sh.tangled.repo records that named us as their .spindle field.
//  2. A ProcessFunc that logs every received event and hands each
//     pipeline trigger to the configured Provider, which is where
//     triggers are translated into builds (the fake provider today,
//     Buildkite once that backend is wired up).
//
// The jetstream consumer also gets a back-reference (via the
// KnotConsumer interface) so it can dynamically subscribe a new knot —
// or, conceptually, unsubscribe one — the moment a matching
// sh.tangled.repo record arrives, without waiting for a tack restart.
26
27import (
28 "context"
29 "encoding/json"
30 "fmt"
31 "log/slog"
32 "time"
33
34 "tangled.org/core/api/tangled"
35 "tangled.org/core/eventconsumer"
36 "tangled.org/core/eventconsumer/cursor"
37)
38
// KnotConsumer is the narrow seam through which the rest of tack talks
// to the knot event-stream subscriber. Keeping it an interface lets
// tests substitute a fake for the real, network-backed subscriber;
// knot_fake.go provides that no-network implementation.
//
// Every implementation must be safe for concurrent use: the jetstream
// goroutine calls AddKnot while the consumer's worker goroutines are
// independently processing inbound knot events.
type KnotConsumer interface {
	// AddKnot subscribes to knot's /events websocket unless a
	// subscription already exists, so repeated calls with the same
	// knot are no-ops. An empty knot is ignored. ctx scopes the dial;
	// cancelling it tears the subscription down.
	AddKnot(ctx context.Context, knot string)

	// RemoveKnot is the inverse of AddKnot: it stops processing events
	// from knot. It must tolerate a knot that was never added (no-op),
	// and an empty knot is ignored.
	//
	// The production implementation is a no-op for now, because
	// tangled-core's eventconsumer offers no way to drop a single
	// source's websocket. Tracked upstream as
	// https://tangled.org/did:plc:j5hmlfdrwkvtxm7cjmu7j2is/issues/510
	RemoveKnot(ctx context.Context, knot string)
}
66
67// knotConsumer is the production KnotConsumer. It wraps
68// eventconsumer.Consumer with the small surface the rest of tack actually
69// wants: AddKnot for dynamic subscription, plus a Stop lifecycle hook
70// owned by main.
71//
72// Wrapping (instead of exposing *eventconsumer.Consumer directly) keeps
73// callers from importing eventconsumer just to construct a KnotSource,
74// and lets us swap or extend the underlying transport later.
75type knotConsumer struct {
76 c *eventconsumer.Consumer
77 log *slog.Logger
78
79 // cursors is the persistent cursor store the underlying consumer
80 // reads at every (re)connect. We retain a reference here so we
81 // can pre-seed an entry to "now" the first time we ever see a
82 // knot — see seedCursorIfMissing for why.
83 cursors cursor.Store
84
85 // provider dispatches each incoming pipeline trigger to whatever
86 // backend actually runs it (today: the fake provider; tomorrow:
87 // Buildkite). The consumer doesn't care which — it just hands
88 // over the decoded record and lets the provider publish status
89 // records back through its own broker connection.
90 provider Provider
91}
92
93// Compile-time interface conformance check.
94var _ KnotConsumer = (*knotConsumer)(nil)
95
96// startKnotConsumer builds a knot event consumer pre-loaded with every
97// knot already known to the store, starts its connection loops in the
98// background, and returns the wrapper. The consumer keeps running until
99// ctx is cancelled.
100//
101// Cursor persistence is backed by tangled-core's SQLite cursor store
102// pointed at the same database file as the rest of tack. Without
103// this, a restart would replay every recent pipeline event from each
104// knot and fire a duplicate Buildkite build for each one. The
105// cursor.SqliteStore opens its own *sql.DB on the file, but
106// mattn/go-sqlite3 in WAL mode tolerates concurrent connections, and
107// the `cursors` table the upstream package creates doesn't collide
108// with any of our migrations.
109func startKnotConsumer(ctx context.Context, cfg config, st *store, provider Provider) (*knotConsumer, error) {
110 logger := loggerFrom(ctx).With("component", "knotconsumer")
111
112 knots, err := st.KnotsForSpindle(ctx, cfg.Hostname)
113 if err != nil {
114 return nil, fmt.Errorf("load known knots: %w", err)
115 }
116
117 // Persistent per-source cursor store. Keyed by source.Key() (the
118 // knot hostname for KnotSource), so each knot resumes exactly
119 // where it left off and we don't re-fire builds for events that
120 // were already processed before the previous shutdown.
121 cursorStore, err := cursor.NewSQLiteStore(cfg.DBPath)
122 if err != nil {
123 return nil, fmt.Errorf("open knot cursor store: %w", err)
124 }
125
126 kc := &knotConsumer{log: logger, cursors: cursorStore, provider: provider}
127
128 ccfg := eventconsumer.NewConsumerConfig()
129 ccfg.Logger = logger
130 ccfg.Dev = cfg.Dev
131 ccfg.ProcessFunc = kc.process
132 ccfg.CursorStore = cursorStore
133 for _, k := range knots {
134 // Pin a brand-new knot's cursor to "now" before the consumer
135 // ever connects, so a fresh tack install doesn't replay every
136 // historical pipeline event the knot has retained and fire a
137 // duplicate Buildkite build for each one.
138 kc.seedCursorIfMissing(k)
139 ccfg.Sources[eventconsumer.NewKnotSource(k)] = struct{}{}
140 logger.Info("seeding knot source", "knot", k)
141 }
142 kc.c = eventconsumer.NewConsumer(*ccfg)
143
144 // Start workers + per-source connection loops. Consumer.Start is
145 // non-blocking; the goroutines it spawns observe ctx for shutdown.
146 kc.c.Start(ctx)
147 logger.Info("knot consumer started", "initial_knots", len(knots))
148
149 return kc, nil
150}
151
152// AddKnot subscribes to a knot we hadn't been watching before. Safe to
153// call repeatedly: eventconsumer.Consumer.AddSource deduplicates by the
154// source's Key (the knot hostname), so passing the same knot twice is a
155// no-op.
156func (k *knotConsumer) AddKnot(ctx context.Context, knot string) {
157 if knot == "" {
158 return
159 }
160 // Same first-run protection as in startKnotConsumer: a knot we
161 // have never observed before must not retroactively fire
162 // pipelines for triggers older than the moment we learned about
163 // it. AddSource reads the cursor synchronously when it dials.
164 k.seedCursorIfMissing(knot)
165 k.log.Info("adding knot source", "knot", knot)
166 k.c.AddSource(ctx, eventconsumer.NewKnotSource(knot))
167}
168
169// seedCursorIfMissing writes the current time (as nanoseconds since
170// the unix epoch, the same unit eventconsumer.worker uses when it
171// advances cursors after each message) to the cursor store for knot
172// iff no cursor is already persisted for it.
173//
174// Without this, a brand-new install — or the first time we ever
175// dial a previously-unseen knot — connects to /events with no
176// cursor query parameter, which the knot servers interpret as
177// "stream from the beginning of time." For pipeline records that
178// would fire one Buildkite build per historical trigger the knot
179// still has retained. Pinning the cursor up-front limits us to
180// events that arrive *after* tack learned about the knot.
181//
182// Get returning 0 means "no cursor stored" across all the cursor
183// store implementations we use (memory/sqlite/redis), so it's a
184// safe sentinel to gate the write.
185func (k *knotConsumer) seedCursorIfMissing(knot string) {
186 if k.cursors.Get(knot) != 0 {
187 return
188 }
189 now := time.Now().UnixNano()
190 k.cursors.Set(knot, now)
191 k.log.Info("seeded fresh knot cursor to now",
192 "knot", knot,
193 "cursor", now,
194 )
195}
196
197// RemoveKnot is currently a no-op — see the interface comment for the
198// upstream blocker. We still log at info so operators can see when the
199// reconciliation logic *would* have unsubscribed; once eventconsumer
200// gains a RemoveSource primitive, swap the body for a real call.
201func (k *knotConsumer) RemoveKnot(_ context.Context, knot string) {
202 if knot == "" {
203 return
204 }
205 k.log.Info("remove knot source requested (no-op until upstream supports it)",
206 "knot", knot,
207 )
208}
209
210// Stop tears down all knot websocket connections and waits for the
211// consumer's goroutines to exit. It must be called exactly once.
212func (k *knotConsumer) Stop() {
213 k.c.Stop()
214}
215
216// process is the ProcessFunc handed to eventconsumer. It runs once per
217// inbound message, on a worker goroutine. For now we only care about
218// pipeline records — everything else is logged at debug and dropped.
219//
220// Returning an error only logs it (the consumer keeps reading); the
221// cursor is advanced before the ProcessFunc runs, so a returned error
222// does *not* cause a replay.
223func (k *knotConsumer) process(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
224 switch msg.Nsid {
225 case tangled.PipelineNSID:
226 var p tangled.Pipeline
227 if err := json.Unmarshal(msg.EventJson, &p); err != nil {
228 k.log.Error("decode pipeline",
229 "err", err,
230 "knot", src.Key(),
231 "rkey", msg.Rkey,
232 )
233 return err
234 }
235
236 // Pull a couple of fields out of the trigger metadata for log
237 // context. They're all optional in the schema, so each one is
238 // guarded — we want a noisy log entry, not a nil-deref.
239 var (
240 triggerKind string
241 repoDid string
242 repoName string
243 )
244 if p.TriggerMetadata != nil {
245 triggerKind = p.TriggerMetadata.Kind
246 if p.TriggerMetadata.Repo != nil {
247 repoDid = p.TriggerMetadata.Repo.Did
248 if p.TriggerMetadata.Repo.Repo != nil {
249 repoName = *p.TriggerMetadata.Repo.Repo
250 }
251 }
252 }
253
254 k.log.Info("pipeline event",
255 "knot", src.Key(),
256 "rkey", msg.Rkey,
257 "trigger", triggerKind,
258 "repo_did", repoDid,
259 "repo", repoName,
260 "workflows", len(p.Workflows),
261 )
262
263 // Hand the trigger to whichever Provider was configured.
264 // Spawn is non-blocking — it fans out into provider-owned
265 // goroutines so this worker can move on to the next event.
266 // The provider keeps ctx around for shutdown coordination.
267 k.provider.Spawn(ctx, src.Key(), msg.Rkey, p.TriggerMetadata, p.Workflows)
268
269 default:
270 // Knots may publish other record types over the same stream; we
271 // don't care about them yet. Debug-only so it's available when
272 // chasing "why isn't my pipeline firing" but doesn't drown out
273 // info-level logs.
274 k.log.Debug("ignored knot event",
275 "knot", src.Key(),
276 "nsid", msg.Nsid,
277 "rkey", msg.Rkey,
278 )
279 }
280
281 return nil
282}