Stitch any CI into Tangled
1package main
2
3// Knot event-stream subscriber.
4//
5// Tangled knot servers expose a websocket at ws[s]://<knot>/events that
6// streams JSON-wrapped record events for that knot, including the
7// sh.tangled.pipeline trigger records that drive CI. Pipeline triggers do
8// *not* come over the AT Proto firehose (jetstream); the knot publishes
9// them itself, so as a spindle we have to dial each knot whose repos have
10// pointed at us.
11//
12// We use tangled-core's `eventconsumer` package, which already handles
13// per-source connection management, retries, ordered processing and
14// cursor tracking. We hand it:
15//
16// 1. The initial set of knots, derived from previously-observed
17// sh.tangled.repo records that named us as their .spindle field.
18// 2. A ProcessFunc that, for now, simply logs every received event.
19// Once the build pipeline is wired up this is where pipeline
20// triggers will be translated into Buildkite builds.
21//
22// The jetstream consumer also gets a back-reference (via the
23// KnotConsumer interface) so it can dynamically subscribe a new knot —
24// or, conceptually, unsubscribe one — the moment a matching
25// sh.tangled.repo record arrives, without waiting for a tack restart.
26
27import (
28 "context"
29 "encoding/json"
30 "fmt"
31 "log/slog"
32
33 "tangled.org/core/api/tangled"
34 "tangled.org/core/eventconsumer"
35)
36
// KnotConsumer is the narrow interface through which the rest of tack
// talks to the knot event-stream subscriber. Depending on an interface
// rather than on the concrete consumer lets tests substitute a fake and
// exercise knot interactions without touching the network.
//
// Every implementation must be safe for concurrent use: the jetstream
// goroutine calls AddKnot while the consumer's worker goroutines are
// independently processing inbound knot events.
//
// knot_fake.go holds a no-network implementation intended for tests.
type KnotConsumer interface {
	// AddKnot subscribes to the /events websocket of the given knot
	// unless a subscription already exists. Repeated calls for the same
	// knot are no-ops, and an empty knot string is ignored. The dial is
	// scoped to ctx, so cancelling ctx tears the subscription down.
	AddKnot(ctx context.Context, knot string)

	// RemoveKnot is the inverse of AddKnot: it stops processing events
	// from the given knot. Calling it for a knot that was never added
	// must be a harmless no-op, and an empty knot string is ignored.
	//
	// The production implementation does nothing yet, because
	// tangled-core's eventconsumer offers no way to drop a single
	// source's websocket. Tracked upstream as
	// https://tangled.org/did:plc:j5hmlfdrwkvtxm7cjmu7j2is/issues/510
	RemoveKnot(ctx context.Context, knot string)
}
64
65// knotConsumer is the production KnotConsumer. It wraps
66// eventconsumer.Consumer with the small surface the rest of tack actually
67// wants: AddKnot for dynamic subscription, plus a Stop lifecycle hook
68// owned by main.
69//
70// Wrapping (instead of exposing *eventconsumer.Consumer directly) keeps
71// callers from importing eventconsumer just to construct a KnotSource,
72// and lets us swap or extend the underlying transport later.
73type knotConsumer struct {
74 c *eventconsumer.Consumer
75 log *slog.Logger
76}
77
78// Compile-time interface conformance check.
79var _ KnotConsumer = (*knotConsumer)(nil)
80
81// startKnotConsumer builds a knot event consumer pre-loaded with every
82// knot already known to the store, starts its connection loops in the
83// background, and returns the wrapper. The consumer keeps running until
84// ctx is cancelled.
85//
86// Cursor persistence is intentionally in-memory for now: we only log
87// events, so re-receiving a few seconds of pipeline triggers after a
88// restart is harmless. When we start translating triggers into real
89// Buildkite builds, this should switch to a SQLite-backed cursor store
90// to avoid duplicate builds.
91func startKnotConsumer(ctx context.Context, cfg config, st *store) (*knotConsumer, error) {
92 logger := loggerFrom(ctx).With("component", "knotconsumer")
93
94 knots, err := st.KnotsForSpindle(ctx, cfg.Hostname)
95 if err != nil {
96 return nil, fmt.Errorf("load known knots: %w", err)
97 }
98
99 kc := &knotConsumer{log: logger}
100
101 ccfg := eventconsumer.NewConsumerConfig()
102 ccfg.Logger = logger
103 ccfg.Dev = cfg.Dev
104 ccfg.ProcessFunc = kc.process
105 for _, k := range knots {
106 ccfg.Sources[eventconsumer.NewKnotSource(k)] = struct{}{}
107 logger.Info("seeding knot source", "knot", k)
108 }
109 kc.c = eventconsumer.NewConsumer(*ccfg)
110
111 // Start workers + per-source connection loops. Consumer.Start is
112 // non-blocking; the goroutines it spawns observe ctx for shutdown.
113 kc.c.Start(ctx)
114 logger.Info("knot consumer started", "initial_knots", len(knots))
115
116 return kc, nil
117}
118
119// AddKnot subscribes to a knot we hadn't been watching before. Safe to
120// call repeatedly: eventconsumer.Consumer.AddSource deduplicates by the
121// source's Key (the knot hostname), so passing the same knot twice is a
122// no-op.
123func (k *knotConsumer) AddKnot(ctx context.Context, knot string) {
124 if knot == "" {
125 return
126 }
127 k.log.Info("adding knot source", "knot", knot)
128 k.c.AddSource(ctx, eventconsumer.NewKnotSource(knot))
129}
130
131// RemoveKnot is currently a no-op — see the interface comment for the
132// upstream blocker. We still log at info so operators can see when the
133// reconciliation logic *would* have unsubscribed; once eventconsumer
134// gains a RemoveSource primitive, swap the body for a real call.
135func (k *knotConsumer) RemoveKnot(_ context.Context, knot string) {
136 if knot == "" {
137 return
138 }
139 k.log.Info("remove knot source requested (no-op until upstream supports it)",
140 "knot", knot,
141 )
142}
143
144// Stop tears down all knot websocket connections and waits for the
145// consumer's goroutines to exit. It must be called exactly once.
146func (k *knotConsumer) Stop() {
147 k.c.Stop()
148}
149
150// process is the ProcessFunc handed to eventconsumer. It runs once per
151// inbound message, on a worker goroutine. For now we only care about
152// pipeline records — everything else is logged at debug and dropped.
153//
154// Returning an error only logs it (the consumer keeps reading); the
155// cursor is advanced before the ProcessFunc runs, so a returned error
156// does *not* cause a replay.
157func (k *knotConsumer) process(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
158 switch msg.Nsid {
159 case tangled.PipelineNSID:
160 var p tangled.Pipeline
161 if err := json.Unmarshal(msg.EventJson, &p); err != nil {
162 k.log.Error("decode pipeline",
163 "err", err,
164 "knot", src.Key(),
165 "rkey", msg.Rkey,
166 )
167 return err
168 }
169
170 // Pull a couple of fields out of the trigger metadata for log
171 // context. They're all optional in the schema, so each one is
172 // guarded — we want a noisy log entry, not a nil-deref.
173 var (
174 triggerKind string
175 repoDid string
176 repoName string
177 )
178 if p.TriggerMetadata != nil {
179 triggerKind = p.TriggerMetadata.Kind
180 if p.TriggerMetadata.Repo != nil {
181 repoDid = p.TriggerMetadata.Repo.Did
182 if p.TriggerMetadata.Repo.Repo != nil {
183 repoName = *p.TriggerMetadata.Repo.Repo
184 }
185 }
186 }
187
188 k.log.Info("pipeline event",
189 "knot", src.Key(),
190 "rkey", msg.Rkey,
191 "trigger", triggerKind,
192 "repo_did", repoDid,
193 "repo", repoName,
194 "workflows", len(p.Workflows),
195 )
196
197 default:
198 // Knots may publish other record types over the same stream; we
199 // don't care about them yet. Debug-only so it's available when
200 // chasing "why isn't my pipeline firing" but doesn't drown out
201 // info-level logs.
202 k.log.Debug("ignored knot event",
203 "knot", src.Key(),
204 "nsid", msg.Nsid,
205 "rkey", msg.Rkey,
206 )
207 }
208
209 return nil
210}