Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

package main

// Knot event-stream subscriber.
//
// Tangled knot servers expose a websocket at ws[s]://<knot>/events that
// streams JSON-wrapped record events for that knot, including the
// sh.tangled.pipeline trigger records that drive CI. Pipeline triggers do
// *not* come over the AT Proto firehose (jetstream); the knot publishes
// them itself, so as a spindle we have to dial each knot whose repos have
// pointed at us.
//
// We use tangled-core's `eventconsumer` package, which already handles
// per-source connection management, retries, ordered processing and
// cursor tracking. We hand it:
//
//  1. The initial set of knots, derived from previously-observed
//     sh.tangled.repo records that named us as their .spindle field.
//  2. A ProcessFunc that logs every received event and hands pipeline
//     triggers to the configured Provider (today a fake provider;
//     eventually this is where triggers become Buildkite builds).
//
// The jetstream consumer also gets a back-reference (via the
// KnotConsumer interface) so it can dynamically subscribe a new knot —
// or, conceptually, unsubscribe one — the moment a matching
// sh.tangled.repo record arrives, without waiting for a tack restart.

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"

	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
)

// KnotConsumer is the surface area the rest of tack uses to interact
// with the knot event-stream subscriber. It exists so that tests can
// substitute a fake and exercise knot interactions without a network.
//
// Implementations must be safe for concurrent use: AddKnot is invoked
// from the jetstream goroutine while the consumer's worker goroutines
// are independently processing inbound knot events.
//
// The fake at knot_fake.go provides a no-network implementation suitable
// for use from tests.
47type KnotConsumer interface { 48 // AddKnot subscribes to the given knot's /events websocket if not 49 // already subscribed. Calling with the same knot more than once is 50 // a no-op. An empty knot string is ignored. The supplied context 51 // scopes the dial; cancelling it tears the subscription down. 52 AddKnot(ctx context.Context, knot string) 53 54 // RemoveKnot stops processing events from the given knot. It is the 55 // inverse of AddKnot and must tolerate being called for a knot that 56 // was never added (no-op). An empty knot string is ignored. 57 // 58 // The production implementation is currently a no-op: tangled-core's 59 // eventconsumer does not expose a way to drop an individual source's 60 // websocket. Tracked upstream as 61 // https://tangled.org/did:plc:j5hmlfdrwkvtxm7cjmu7j2is/issues/510 62 RemoveKnot(ctx context.Context, knot string) 63} 64 65// knotConsumer is the production KnotConsumer. It wraps 66// eventconsumer.Consumer with the small surface the rest of tack actually 67// wants: AddKnot for dynamic subscription, plus a Stop lifecycle hook 68// owned by main. 69// 70// Wrapping (instead of exposing *eventconsumer.Consumer directly) keeps 71// callers from importing eventconsumer just to construct a KnotSource, 72// and lets us swap or extend the underlying transport later. 73type knotConsumer struct { 74 c *eventconsumer.Consumer 75 log *slog.Logger 76 77 // provider dispatches each incoming pipeline trigger to whatever 78 // backend actually runs it (today: the fake provider; tomorrow: 79 // Buildkite). The consumer doesn't care which — it just hands 80 // over the decoded record and lets the provider publish status 81 // records back through its own broker connection. 82 provider Provider 83} 84 85// Compile-time interface conformance check. 
86var _ KnotConsumer = (*knotConsumer)(nil) 87 88// startKnotConsumer builds a knot event consumer pre-loaded with every 89// knot already known to the store, starts its connection loops in the 90// background, and returns the wrapper. The consumer keeps running until 91// ctx is cancelled. 92// 93// Cursor persistence is intentionally in-memory for now: we only log 94// events, so re-receiving a few seconds of pipeline triggers after a 95// restart is harmless. When we start translating triggers into real 96// Buildkite builds, this should switch to a SQLite-backed cursor store 97// to avoid duplicate builds. 98func startKnotConsumer(ctx context.Context, cfg config, st *store, provider Provider) (*knotConsumer, error) { 99 logger := loggerFrom(ctx).With("component", "knotconsumer") 100 101 knots, err := st.KnotsForSpindle(ctx, cfg.Hostname) 102 if err != nil { 103 return nil, fmt.Errorf("load known knots: %w", err) 104 } 105 106 kc := &knotConsumer{log: logger, provider: provider} 107 108 ccfg := eventconsumer.NewConsumerConfig() 109 ccfg.Logger = logger 110 ccfg.Dev = cfg.Dev 111 ccfg.ProcessFunc = kc.process 112 for _, k := range knots { 113 ccfg.Sources[eventconsumer.NewKnotSource(k)] = struct{}{} 114 logger.Info("seeding knot source", "knot", k) 115 } 116 kc.c = eventconsumer.NewConsumer(*ccfg) 117 118 // Start workers + per-source connection loops. Consumer.Start is 119 // non-blocking; the goroutines it spawns observe ctx for shutdown. 120 kc.c.Start(ctx) 121 logger.Info("knot consumer started", "initial_knots", len(knots)) 122 123 return kc, nil 124} 125 126// AddKnot subscribes to a knot we hadn't been watching before. Safe to 127// call repeatedly: eventconsumer.Consumer.AddSource deduplicates by the 128// source's Key (the knot hostname), so passing the same knot twice is a 129// no-op. 
130func (k *knotConsumer) AddKnot(ctx context.Context, knot string) { 131 if knot == "" { 132 return 133 } 134 k.log.Info("adding knot source", "knot", knot) 135 k.c.AddSource(ctx, eventconsumer.NewKnotSource(knot)) 136} 137 138// RemoveKnot is currently a no-op — see the interface comment for the 139// upstream blocker. We still log at info so operators can see when the 140// reconciliation logic *would* have unsubscribed; once eventconsumer 141// gains a RemoveSource primitive, swap the body for a real call. 142func (k *knotConsumer) RemoveKnot(_ context.Context, knot string) { 143 if knot == "" { 144 return 145 } 146 k.log.Info("remove knot source requested (no-op until upstream supports it)", 147 "knot", knot, 148 ) 149} 150 151// Stop tears down all knot websocket connections and waits for the 152// consumer's goroutines to exit. It must be called exactly once. 153func (k *knotConsumer) Stop() { 154 k.c.Stop() 155} 156 157// process is the ProcessFunc handed to eventconsumer. It runs once per 158// inbound message, on a worker goroutine. For now we only care about 159// pipeline records — everything else is logged at debug and dropped. 160// 161// Returning an error only logs it (the consumer keeps reading); the 162// cursor is advanced before the ProcessFunc runs, so a returned error 163// does *not* cause a replay. 164func (k *knotConsumer) process(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 165 switch msg.Nsid { 166 case tangled.PipelineNSID: 167 var p tangled.Pipeline 168 if err := json.Unmarshal(msg.EventJson, &p); err != nil { 169 k.log.Error("decode pipeline", 170 "err", err, 171 "knot", src.Key(), 172 "rkey", msg.Rkey, 173 ) 174 return err 175 } 176 177 // Pull a couple of fields out of the trigger metadata for log 178 // context. They're all optional in the schema, so each one is 179 // guarded — we want a noisy log entry, not a nil-deref. 
180 var ( 181 triggerKind string 182 repoDid string 183 repoName string 184 ) 185 if p.TriggerMetadata != nil { 186 triggerKind = p.TriggerMetadata.Kind 187 if p.TriggerMetadata.Repo != nil { 188 repoDid = p.TriggerMetadata.Repo.Did 189 if p.TriggerMetadata.Repo.Repo != nil { 190 repoName = *p.TriggerMetadata.Repo.Repo 191 } 192 } 193 } 194 195 k.log.Info("pipeline event", 196 "knot", src.Key(), 197 "rkey", msg.Rkey, 198 "trigger", triggerKind, 199 "repo_did", repoDid, 200 "repo", repoName, 201 "workflows", len(p.Workflows), 202 ) 203 204 // Hand the trigger to whichever Provider was configured. 205 // Spawn is non-blocking — it fans out into provider-owned 206 // goroutines so this worker can move on to the next event. 207 // The provider keeps ctx around for shutdown coordination. 208 k.provider.Spawn(ctx, src.Key(), msg.Rkey, p.Workflows) 209 210 default: 211 // Knots may publish other record types over the same stream; we 212 // don't care about them yet. Debug-only so it's available when 213 // chasing "why isn't my pipeline firing" but doesn't drown out 214 // info-level logs. 215 k.log.Debug("ignored knot event", 216 "knot", src.Key(), 217 "nsid", msg.Nsid, 218 "rkey", msg.Rkey, 219 ) 220 } 221 222 return nil 223}