Stitch any CI into Tangled
1package main
2
// Knot event-stream subscriber.
//
// Tangled knot servers expose a websocket at ws[s]://<knot>/events that
// streams JSON-wrapped record events for that knot, including the
// sh.tangled.pipeline trigger records that drive CI. Pipeline triggers do
// *not* come over the AT Proto firehose (jetstream); the knot publishes
// them itself, so as a spindle we have to dial each knot whose repos have
// pointed at us.
//
// We use tangled-core's `eventconsumer` package, which already handles
// per-source connection management, retries, ordered processing and
// cursor tracking. We hand it:
//
//  1. The initial set of knots, derived from previously-observed
//     sh.tangled.repo records that named us in their .spindle field.
//  2. A ProcessFunc that decodes sh.tangled.pipeline trigger records and
//     passes them to the configured Provider (today a fake provider;
//     eventually one that translates them into Buildkite builds). Other
//     record types are logged at debug level and dropped.
//
// The jetstream consumer also gets a back-reference (via the
// KnotConsumer interface) so it can dynamically subscribe a new knot the
// moment a matching sh.tangled.repo record arrives, without waiting for a
// tack restart. Unsubscribing a knot (RemoveKnot) is part of the same
// interface but is currently a no-op pending upstream support.
26
27import (
28 "context"
29 "encoding/json"
30 "fmt"
31 "log/slog"
32
33 "tangled.org/core/api/tangled"
34 "tangled.org/core/eventconsumer"
35 "tangled.org/core/eventconsumer/cursor"
36)
37
// KnotConsumer is the knot event-stream surface the rest of tack relies
// on. Callers depend on this interface rather than on the concrete
// subscriber so that tests can swap in the no-network fake from
// knot_fake.go.
//
// Implementations must be safe for concurrent use: the jetstream
// goroutine calls AddKnot while the consumer's worker goroutines are
// independently processing inbound knot events.
type KnotConsumer interface {
	// AddKnot subscribes to knot's /events websocket unless a
	// subscription already exists. Repeated calls for the same knot
	// are no-ops, and an empty knot is ignored. ctx scopes the dial;
	// cancelling it tears the subscription down.
	AddKnot(ctx context.Context, knot string)

	// RemoveKnot undoes AddKnot: it stops processing events from knot.
	// Calling it for a knot that was never added is a no-op, as is
	// passing an empty knot.
	//
	// The production implementation does nothing yet, because
	// tangled-core's eventconsumer offers no way to drop a single
	// source's websocket. Upstream tracking issue:
	// https://tangled.org/did:plc:j5hmlfdrwkvtxm7cjmu7j2is/issues/510
	RemoveKnot(ctx context.Context, knot string)
}
65
66// knotConsumer is the production KnotConsumer. It wraps
67// eventconsumer.Consumer with the small surface the rest of tack actually
68// wants: AddKnot for dynamic subscription, plus a Stop lifecycle hook
69// owned by main.
70//
71// Wrapping (instead of exposing *eventconsumer.Consumer directly) keeps
72// callers from importing eventconsumer just to construct a KnotSource,
73// and lets us swap or extend the underlying transport later.
74type knotConsumer struct {
75 c *eventconsumer.Consumer
76 log *slog.Logger
77
78 // provider dispatches each incoming pipeline trigger to whatever
79 // backend actually runs it (today: the fake provider; tomorrow:
80 // Buildkite). The consumer doesn't care which — it just hands
81 // over the decoded record and lets the provider publish status
82 // records back through its own broker connection.
83 provider Provider
84}
85
86// Compile-time interface conformance check.
87var _ KnotConsumer = (*knotConsumer)(nil)
88
89// startKnotConsumer builds a knot event consumer pre-loaded with every
90// knot already known to the store, starts its connection loops in the
91// background, and returns the wrapper. The consumer keeps running until
92// ctx is cancelled.
93//
94// Cursor persistence is backed by tangled-core's SQLite cursor store
95// pointed at the same database file as the rest of tack. Without
96// this, a restart would replay every recent pipeline event from each
97// knot and fire a duplicate Buildkite build for each one. The
98// cursor.SqliteStore opens its own *sql.DB on the file, but
99// mattn/go-sqlite3 in WAL mode tolerates concurrent connections, and
100// the `cursors` table the upstream package creates doesn't collide
101// with any of our migrations.
102func startKnotConsumer(ctx context.Context, cfg config, st *store, provider Provider) (*knotConsumer, error) {
103 logger := loggerFrom(ctx).With("component", "knotconsumer")
104
105 knots, err := st.KnotsForSpindle(ctx, cfg.Hostname)
106 if err != nil {
107 return nil, fmt.Errorf("load known knots: %w", err)
108 }
109
110 // Persistent per-source cursor store. Keyed by source.Key() (the
111 // knot hostname for KnotSource), so each knot resumes exactly
112 // where it left off and we don't re-fire builds for events that
113 // were already processed before the previous shutdown.
114 cursorStore, err := cursor.NewSQLiteStore(cfg.DBPath)
115 if err != nil {
116 return nil, fmt.Errorf("open knot cursor store: %w", err)
117 }
118
119 kc := &knotConsumer{log: logger, provider: provider}
120
121 ccfg := eventconsumer.NewConsumerConfig()
122 ccfg.Logger = logger
123 ccfg.Dev = cfg.Dev
124 ccfg.ProcessFunc = kc.process
125 ccfg.CursorStore = cursorStore
126 for _, k := range knots {
127 ccfg.Sources[eventconsumer.NewKnotSource(k)] = struct{}{}
128 logger.Info("seeding knot source", "knot", k)
129 }
130 kc.c = eventconsumer.NewConsumer(*ccfg)
131
132 // Start workers + per-source connection loops. Consumer.Start is
133 // non-blocking; the goroutines it spawns observe ctx for shutdown.
134 kc.c.Start(ctx)
135 logger.Info("knot consumer started", "initial_knots", len(knots))
136
137 return kc, nil
138}
139
140// AddKnot subscribes to a knot we hadn't been watching before. Safe to
141// call repeatedly: eventconsumer.Consumer.AddSource deduplicates by the
142// source's Key (the knot hostname), so passing the same knot twice is a
143// no-op.
144func (k *knotConsumer) AddKnot(ctx context.Context, knot string) {
145 if knot == "" {
146 return
147 }
148 k.log.Info("adding knot source", "knot", knot)
149 k.c.AddSource(ctx, eventconsumer.NewKnotSource(knot))
150}
151
152// RemoveKnot is currently a no-op — see the interface comment for the
153// upstream blocker. We still log at info so operators can see when the
154// reconciliation logic *would* have unsubscribed; once eventconsumer
155// gains a RemoveSource primitive, swap the body for a real call.
156func (k *knotConsumer) RemoveKnot(_ context.Context, knot string) {
157 if knot == "" {
158 return
159 }
160 k.log.Info("remove knot source requested (no-op until upstream supports it)",
161 "knot", knot,
162 )
163}
164
165// Stop tears down all knot websocket connections and waits for the
166// consumer's goroutines to exit. It must be called exactly once.
167func (k *knotConsumer) Stop() {
168 k.c.Stop()
169}
170
171// process is the ProcessFunc handed to eventconsumer. It runs once per
172// inbound message, on a worker goroutine. For now we only care about
173// pipeline records — everything else is logged at debug and dropped.
174//
175// Returning an error only logs it (the consumer keeps reading); the
176// cursor is advanced before the ProcessFunc runs, so a returned error
177// does *not* cause a replay.
178func (k *knotConsumer) process(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
179 switch msg.Nsid {
180 case tangled.PipelineNSID:
181 var p tangled.Pipeline
182 if err := json.Unmarshal(msg.EventJson, &p); err != nil {
183 k.log.Error("decode pipeline",
184 "err", err,
185 "knot", src.Key(),
186 "rkey", msg.Rkey,
187 )
188 return err
189 }
190
191 // Pull a couple of fields out of the trigger metadata for log
192 // context. They're all optional in the schema, so each one is
193 // guarded — we want a noisy log entry, not a nil-deref.
194 var (
195 triggerKind string
196 repoDid string
197 repoName string
198 )
199 if p.TriggerMetadata != nil {
200 triggerKind = p.TriggerMetadata.Kind
201 if p.TriggerMetadata.Repo != nil {
202 repoDid = p.TriggerMetadata.Repo.Did
203 if p.TriggerMetadata.Repo.Repo != nil {
204 repoName = *p.TriggerMetadata.Repo.Repo
205 }
206 }
207 }
208
209 k.log.Info("pipeline event",
210 "knot", src.Key(),
211 "rkey", msg.Rkey,
212 "trigger", triggerKind,
213 "repo_did", repoDid,
214 "repo", repoName,
215 "workflows", len(p.Workflows),
216 )
217
218 // Hand the trigger to whichever Provider was configured.
219 // Spawn is non-blocking — it fans out into provider-owned
220 // goroutines so this worker can move on to the next event.
221 // The provider keeps ctx around for shutdown coordination.
222 k.provider.Spawn(ctx, src.Key(), msg.Rkey, p.TriggerMetadata, p.Workflows)
223
224 default:
225 // Knots may publish other record types over the same stream; we
226 // don't care about them yet. Debug-only so it's available when
227 // chasing "why isn't my pipeline firing" but doesn't drown out
228 // info-level logs.
229 k.log.Debug("ignored knot event",
230 "knot", src.Key(),
231 "nsid", msg.Nsid,
232 "rkey", msg.Rkey,
233 )
234 }
235
236 return nil
237}