Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// Knot event-stream subscriber. 4// 5// Tangled knot servers expose a websocket at ws[s]://<knot>/events that 6// streams JSON-wrapped record events for that knot, including the 7// sh.tangled.pipeline trigger records that drive CI. Pipeline triggers do 8// *not* come over the AT Proto firehose (jetstream); the knot publishes 9// them itself, so as a spindle we have to dial each knot whose repos have 10// pointed at us. 11// 12// We use tangled-core's `eventconsumer` package, which already handles 13// per-source connection management, retries, ordered processing and 14// cursor tracking. We hand it: 15// 16// 1. The initial set of knots, derived from previously-observed 17// sh.tangled.repo records that named us as their .spindle field. 18// 2. A ProcessFunc that, for now, simply logs every received event. 19// Once the build pipeline is wired up this is where pipeline 20// triggers will be translated into Buildkite builds. 21// 22// The jetstream consumer also gets a back-reference (via the 23// KnotConsumer interface) so it can dynamically subscribe a new knot — 24// or, conceptually, unsubscribe one — the moment a matching 25// sh.tangled.repo record arrives, without waiting for a tack restart. 26 27import ( 28 "context" 29 "encoding/json" 30 "fmt" 31 "log/slog" 32 33 "tangled.org/core/api/tangled" 34 "tangled.org/core/eventconsumer" 35) 36 37// KnotConsumer is the surface area the rest of tack uses to interact 38// with the knot event-stream subscriber. It exists so we can test 39// knot interactions from tests. 40// 41// Implementations must be safe for concurrent use: AddKnot is invoked 42// from the jetstream goroutine while the consumer's worker goroutines 43// are independently processing inbound knot events. 44// 45// The fake at knot_fake.go provides a no-network implementation suitable 46// for use from tests. 47type KnotConsumer interface { 48 // AddKnot subscribes to the given knot's /events websocket if not 49 // already subscribed. Calling with the same knot more than once is 50 // a no-op. An empty knot string is ignored. The supplied context 51 // scopes the dial; cancelling it tears the subscription down. 52 AddKnot(ctx context.Context, knot string) 53 54 // RemoveKnot stops processing events from the given knot. It is the 55 // inverse of AddKnot and must tolerate being called for a knot that 56 // was never added (no-op). An empty knot string is ignored. 57 // 58 // The production implementation is currently a no-op: tangled-core's 59 // eventconsumer does not expose a way to drop an individual source's 60 // websocket. Tracked upstream as 61 // https://tangled.org/did:plc:j5hmlfdrwkvtxm7cjmu7j2is/issues/510 62 RemoveKnot(ctx context.Context, knot string) 63} 64 65// knotConsumer is the production KnotConsumer. It wraps 66// eventconsumer.Consumer with the small surface the rest of tack actually 67// wants: AddKnot for dynamic subscription, plus a Stop lifecycle hook 68// owned by main. 69// 70// Wrapping (instead of exposing *eventconsumer.Consumer directly) keeps 71// callers from importing eventconsumer just to construct a KnotSource, 72// and lets us swap or extend the underlying transport later. 73type knotConsumer struct { 74 c *eventconsumer.Consumer 75 log *slog.Logger 76} 77 78// Compile-time interface conformance check. 79var _ KnotConsumer = (*knotConsumer)(nil) 80 81// startKnotConsumer builds a knot event consumer pre-loaded with every 82// knot already known to the store, starts its connection loops in the 83// background, and returns the wrapper. The consumer keeps running until 84// ctx is cancelled. 85// 86// Cursor persistence is intentionally in-memory for now: we only log 87// events, so re-receiving a few seconds of pipeline triggers after a 88// restart is harmless. When we start translating triggers into real 89// Buildkite builds, this should switch to a SQLite-backed cursor store 90// to avoid duplicate builds. 91func startKnotConsumer(ctx context.Context, cfg config, st *store) (*knotConsumer, error) { 92 logger := loggerFrom(ctx).With("component", "knotconsumer") 93 94 knots, err := st.KnotsForSpindle(ctx, cfg.Hostname) 95 if err != nil { 96 return nil, fmt.Errorf("load known knots: %w", err) 97 } 98 99 kc := &knotConsumer{log: logger} 100 101 ccfg := eventconsumer.NewConsumerConfig() 102 ccfg.Logger = logger 103 ccfg.Dev = cfg.Dev 104 ccfg.ProcessFunc = kc.process 105 for _, k := range knots { 106 ccfg.Sources[eventconsumer.NewKnotSource(k)] = struct{}{} 107 logger.Info("seeding knot source", "knot", k) 108 } 109 kc.c = eventconsumer.NewConsumer(*ccfg) 110 111 // Start workers + per-source connection loops. Consumer.Start is 112 // non-blocking; the goroutines it spawns observe ctx for shutdown. 113 kc.c.Start(ctx) 114 logger.Info("knot consumer started", "initial_knots", len(knots)) 115 116 return kc, nil 117} 118 119// AddKnot subscribes to a knot we hadn't been watching before. Safe to 120// call repeatedly: eventconsumer.Consumer.AddSource deduplicates by the 121// source's Key (the knot hostname), so passing the same knot twice is a 122// no-op. 123func (k *knotConsumer) AddKnot(ctx context.Context, knot string) { 124 if knot == "" { 125 return 126 } 127 k.log.Info("adding knot source", "knot", knot) 128 k.c.AddSource(ctx, eventconsumer.NewKnotSource(knot)) 129} 130 131// RemoveKnot is currently a no-op — see the interface comment for the 132// upstream blocker. We still log at info so operators can see when the 133// reconciliation logic *would* have unsubscribed; once eventconsumer 134// gains a RemoveSource primitive, swap the body for a real call. 135func (k *knotConsumer) RemoveKnot(_ context.Context, knot string) { 136 if knot == "" { 137 return 138 } 139 k.log.Info("remove knot source requested (no-op until upstream supports it)", 140 "knot", knot, 141 ) 142} 143 144// Stop tears down all knot websocket connections and waits for the 145// consumer's goroutines to exit. It must be called exactly once. 146func (k *knotConsumer) Stop() { 147 k.c.Stop() 148} 149 150// process is the ProcessFunc handed to eventconsumer. It runs once per 151// inbound message, on a worker goroutine. For now we only care about 152// pipeline records — everything else is logged at debug and dropped. 153// 154// Returning an error only logs it (the consumer keeps reading); the 155// cursor is advanced before the ProcessFunc runs, so a returned error 156// does *not* cause a replay. 157func (k *knotConsumer) process(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 158 switch msg.Nsid { 159 case tangled.PipelineNSID: 160 var p tangled.Pipeline 161 if err := json.Unmarshal(msg.EventJson, &p); err != nil { 162 k.log.Error("decode pipeline", 163 "err", err, 164 "knot", src.Key(), 165 "rkey", msg.Rkey, 166 ) 167 return err 168 } 169 170 // Pull a couple of fields out of the trigger metadata for log 171 // context. They're all optional in the schema, so each one is 172 // guarded — we want a noisy log entry, not a nil-deref. 173 var ( 174 triggerKind string 175 repoDid string 176 repoName string 177 ) 178 if p.TriggerMetadata != nil { 179 triggerKind = p.TriggerMetadata.Kind 180 if p.TriggerMetadata.Repo != nil { 181 repoDid = p.TriggerMetadata.Repo.Did 182 if p.TriggerMetadata.Repo.Repo != nil { 183 repoName = *p.TriggerMetadata.Repo.Repo 184 } 185 } 186 } 187 188 k.log.Info("pipeline event", 189 "knot", src.Key(), 190 "rkey", msg.Rkey, 191 "trigger", triggerKind, 192 "repo_did", repoDid, 193 "repo", repoName, 194 "workflows", len(p.Workflows), 195 ) 196 197 default: 198 // Knots may publish other record types over the same stream; we 199 // don't care about them yet. Debug-only so it's available when 200 // chasing "why isn't my pipeline firing" but doesn't drown out 201 // info-level logs. 202 k.log.Debug("ignored knot event", 203 "knot", src.Key(), 204 "nsid", msg.Nsid, 205 "rkey", msg.Rkey, 206 ) 207 } 208 209 return nil 210}