Stitch any CI into Tangled
1package main
2
3// Knot event-stream subscriber.
4//
5// Tangled knot servers expose a websocket at ws[s]://<knot>/events that
6// streams JSON-wrapped record events for that knot, including the
7// sh.tangled.pipeline trigger records that drive CI. Pipeline triggers do
8// *not* come over the AT Proto firehose (jetstream); the knot publishes
9// them itself, so as a spindle we have to dial each knot whose repos have
10// pointed at us.
11//
12// We use tangled-core's `eventconsumer` package, which already handles
13// per-source connection management, retries, ordered processing and
14// cursor tracking. We hand it:
15//
16// 1. The initial set of knots, derived from previously-observed
17// sh.tangled.repo records that named us as their .spindle field.
// 2. A ProcessFunc that logs every received event and hands each
//    sh.tangled.pipeline trigger to the configured Provider, the
//    backend that actually runs it (a fake today, Buildkite later).
21//
22// The jetstream consumer also gets a back-reference (via the
23// KnotConsumer interface) so it can dynamically subscribe a new knot —
24// or, conceptually, unsubscribe one — the moment a matching
25// sh.tangled.repo record arrives, without waiting for a tack restart.
26
27import (
28 "context"
29 "encoding/json"
30 "fmt"
31 "log/slog"
32
33 "tangled.org/core/api/tangled"
34 "tangled.org/core/eventconsumer"
35)
36
// KnotConsumer is the narrow seam through which the rest of tack talks
// to the knot event-stream subscriber. Keeping it an interface lets
// tests substitute a fake for the real, network-backed consumer.
//
// Every implementation must tolerate concurrent callers: AddKnot is
// driven from the jetstream goroutine while the consumer's worker
// goroutines are busy handling inbound knot events.
//
// knot_fake.go ships a no-network implementation for use in tests.
type KnotConsumer interface {
	// AddKnot starts a subscription to the named knot's /events
	// websocket unless one already exists; repeat calls for the same
	// knot do nothing, and an empty knot is ignored. ctx scopes the
	// dial: cancelling it tears the subscription down.
	AddKnot(ctx context.Context, knot string)

	// RemoveKnot is AddKnot's inverse: it stops handling events from
	// the named knot. Calling it for a knot that was never added, or
	// with an empty knot, is a no-op.
	//
	// The production implementation does nothing yet, because
	// tangled-core's eventconsumer offers no way to drop a single
	// source's websocket. Tracked upstream as
	// https://tangled.org/did:plc:j5hmlfdrwkvtxm7cjmu7j2is/issues/510
	RemoveKnot(ctx context.Context, knot string)
}
64
65// knotConsumer is the production KnotConsumer. It wraps
66// eventconsumer.Consumer with the small surface the rest of tack actually
67// wants: AddKnot for dynamic subscription, plus a Stop lifecycle hook
68// owned by main.
69//
70// Wrapping (instead of exposing *eventconsumer.Consumer directly) keeps
71// callers from importing eventconsumer just to construct a KnotSource,
72// and lets us swap or extend the underlying transport later.
73type knotConsumer struct {
74 c *eventconsumer.Consumer
75 log *slog.Logger
76
77 // provider dispatches each incoming pipeline trigger to whatever
78 // backend actually runs it (today: the fake provider; tomorrow:
79 // Buildkite). The consumer doesn't care which — it just hands
80 // over the decoded record and lets the provider publish status
81 // records back through its own broker connection.
82 provider Provider
83}
84
85// Compile-time interface conformance check.
86var _ KnotConsumer = (*knotConsumer)(nil)
87
88// startKnotConsumer builds a knot event consumer pre-loaded with every
89// knot already known to the store, starts its connection loops in the
90// background, and returns the wrapper. The consumer keeps running until
91// ctx is cancelled.
92//
93// Cursor persistence is intentionally in-memory for now: we only log
94// events, so re-receiving a few seconds of pipeline triggers after a
95// restart is harmless. When we start translating triggers into real
96// Buildkite builds, this should switch to a SQLite-backed cursor store
97// to avoid duplicate builds.
98func startKnotConsumer(ctx context.Context, cfg config, st *store, provider Provider) (*knotConsumer, error) {
99 logger := loggerFrom(ctx).With("component", "knotconsumer")
100
101 knots, err := st.KnotsForSpindle(ctx, cfg.Hostname)
102 if err != nil {
103 return nil, fmt.Errorf("load known knots: %w", err)
104 }
105
106 kc := &knotConsumer{log: logger, provider: provider}
107
108 ccfg := eventconsumer.NewConsumerConfig()
109 ccfg.Logger = logger
110 ccfg.Dev = cfg.Dev
111 ccfg.ProcessFunc = kc.process
112 for _, k := range knots {
113 ccfg.Sources[eventconsumer.NewKnotSource(k)] = struct{}{}
114 logger.Info("seeding knot source", "knot", k)
115 }
116 kc.c = eventconsumer.NewConsumer(*ccfg)
117
118 // Start workers + per-source connection loops. Consumer.Start is
119 // non-blocking; the goroutines it spawns observe ctx for shutdown.
120 kc.c.Start(ctx)
121 logger.Info("knot consumer started", "initial_knots", len(knots))
122
123 return kc, nil
124}
125
126// AddKnot subscribes to a knot we hadn't been watching before. Safe to
127// call repeatedly: eventconsumer.Consumer.AddSource deduplicates by the
128// source's Key (the knot hostname), so passing the same knot twice is a
129// no-op.
130func (k *knotConsumer) AddKnot(ctx context.Context, knot string) {
131 if knot == "" {
132 return
133 }
134 k.log.Info("adding knot source", "knot", knot)
135 k.c.AddSource(ctx, eventconsumer.NewKnotSource(knot))
136}
137
138// RemoveKnot is currently a no-op — see the interface comment for the
139// upstream blocker. We still log at info so operators can see when the
140// reconciliation logic *would* have unsubscribed; once eventconsumer
141// gains a RemoveSource primitive, swap the body for a real call.
142func (k *knotConsumer) RemoveKnot(_ context.Context, knot string) {
143 if knot == "" {
144 return
145 }
146 k.log.Info("remove knot source requested (no-op until upstream supports it)",
147 "knot", knot,
148 )
149}
150
151// Stop tears down all knot websocket connections and waits for the
152// consumer's goroutines to exit. It must be called exactly once.
153func (k *knotConsumer) Stop() {
154 k.c.Stop()
155}
156
157// process is the ProcessFunc handed to eventconsumer. It runs once per
158// inbound message, on a worker goroutine. For now we only care about
159// pipeline records — everything else is logged at debug and dropped.
160//
161// Returning an error only logs it (the consumer keeps reading); the
162// cursor is advanced before the ProcessFunc runs, so a returned error
163// does *not* cause a replay.
164func (k *knotConsumer) process(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
165 switch msg.Nsid {
166 case tangled.PipelineNSID:
167 var p tangled.Pipeline
168 if err := json.Unmarshal(msg.EventJson, &p); err != nil {
169 k.log.Error("decode pipeline",
170 "err", err,
171 "knot", src.Key(),
172 "rkey", msg.Rkey,
173 )
174 return err
175 }
176
177 // Pull a couple of fields out of the trigger metadata for log
178 // context. They're all optional in the schema, so each one is
179 // guarded — we want a noisy log entry, not a nil-deref.
180 var (
181 triggerKind string
182 repoDid string
183 repoName string
184 )
185 if p.TriggerMetadata != nil {
186 triggerKind = p.TriggerMetadata.Kind
187 if p.TriggerMetadata.Repo != nil {
188 repoDid = p.TriggerMetadata.Repo.Did
189 if p.TriggerMetadata.Repo.Repo != nil {
190 repoName = *p.TriggerMetadata.Repo.Repo
191 }
192 }
193 }
194
195 k.log.Info("pipeline event",
196 "knot", src.Key(),
197 "rkey", msg.Rkey,
198 "trigger", triggerKind,
199 "repo_did", repoDid,
200 "repo", repoName,
201 "workflows", len(p.Workflows),
202 )
203
204 // Hand the trigger to whichever Provider was configured.
205 // Spawn is non-blocking — it fans out into provider-owned
206 // goroutines so this worker can move on to the next event.
207 // The provider keeps ctx around for shutdown coordination.
208 k.provider.Spawn(ctx, src.Key(), msg.Rkey, p.TriggerMetadata, p.Workflows)
209
210 default:
211 // Knots may publish other record types over the same stream; we
212 // don't care about them yet. Debug-only so it's available when
213 // chasing "why isn't my pipeline firing" but doesn't drown out
214 // info-level logs.
215 k.log.Debug("ignored knot event",
216 "knot", src.Key(),
217 "nsid", msg.Nsid,
218 "rkey", msg.Rkey,
219 )
220 }
221
222 return nil
223}