Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// Knot event-stream subscriber. 4// 5// Tangled knot servers expose a websocket at ws[s]://<knot>/events that 6// streams JSON-wrapped record events for that knot, including the 7// sh.tangled.pipeline trigger records that drive CI. Pipeline triggers do 8// *not* come over the AT Proto firehose (jetstream); the knot publishes 9// them itself, so as a spindle we have to dial each knot whose repos have 10// pointed at us. 11// 12// We use tangled-core's `eventconsumer` package, which already handles 13// per-source connection management, retries, ordered processing and 14// cursor tracking. We hand it: 15// 16// 1. The initial set of knots, derived from previously-observed 17// sh.tangled.repo records that named us as their .spindle field. 18// 2. A ProcessFunc that, for now, simply logs every received event. 19// Once the build pipeline is wired up this is where pipeline 20// triggers will be translated into Buildkite builds. 21// 22// The jetstream consumer also gets a back-reference (via the 23// KnotConsumer interface) so it can dynamically subscribe a new knot — 24// or, conceptually, unsubscribe one — the moment a matching 25// sh.tangled.repo record arrives, without waiting for a tack restart. 26 27import ( 28 "context" 29 "encoding/json" 30 "fmt" 31 "log/slog" 32 33 "tangled.org/core/api/tangled" 34 "tangled.org/core/eventconsumer" 35 "tangled.org/core/eventconsumer/cursor" 36) 37 38// KnotConsumer is the surface area the rest of tack uses to interact 39// with the knot event-stream subscriber. It exists so we can test 40// knot interactions from tests. 41// 42// Implementations must be safe for concurrent use: AddKnot is invoked 43// from the jetstream goroutine while the consumer's worker goroutines 44// are independently processing inbound knot events. 45// 46// The fake at knot_fake.go provides a no-network implementation suitable 47// for use from tests. 48type KnotConsumer interface { 49 // AddKnot subscribes to the given knot's /events websocket if not 50 // already subscribed. Calling with the same knot more than once is 51 // a no-op. An empty knot string is ignored. The supplied context 52 // scopes the dial; cancelling it tears the subscription down. 53 AddKnot(ctx context.Context, knot string) 54 55 // RemoveKnot stops processing events from the given knot. It is the 56 // inverse of AddKnot and must tolerate being called for a knot that 57 // was never added (no-op). An empty knot string is ignored. 58 // 59 // The production implementation is currently a no-op: tangled-core's 60 // eventconsumer does not expose a way to drop an individual source's 61 // websocket. Tracked upstream as 62 // https://tangled.org/did:plc:j5hmlfdrwkvtxm7cjmu7j2is/issues/510 63 RemoveKnot(ctx context.Context, knot string) 64} 65 66// knotConsumer is the production KnotConsumer. It wraps 67// eventconsumer.Consumer with the small surface the rest of tack actually 68// wants: AddKnot for dynamic subscription, plus a Stop lifecycle hook 69// owned by main. 70// 71// Wrapping (instead of exposing *eventconsumer.Consumer directly) keeps 72// callers from importing eventconsumer just to construct a KnotSource, 73// and lets us swap or extend the underlying transport later. 74type knotConsumer struct { 75 c *eventconsumer.Consumer 76 log *slog.Logger 77 78 // provider dispatches each incoming pipeline trigger to whatever 79 // backend actually runs it (today: the fake provider; tomorrow: 80 // Buildkite). The consumer doesn't care which — it just hands 81 // over the decoded record and lets the provider publish status 82 // records back through its own broker connection. 83 provider Provider 84} 85 86// Compile-time interface conformance check. 87var _ KnotConsumer = (*knotConsumer)(nil) 88 89// startKnotConsumer builds a knot event consumer pre-loaded with every 90// knot already known to the store, starts its connection loops in the 91// background, and returns the wrapper. The consumer keeps running until 92// ctx is cancelled. 93// 94// Cursor persistence is backed by tangled-core's SQLite cursor store 95// pointed at the same database file as the rest of tack. Without 96// this, a restart would replay every recent pipeline event from each 97// knot and fire a duplicate Buildkite build for each one. The 98// cursor.SqliteStore opens its own *sql.DB on the file, but 99// mattn/go-sqlite3 in WAL mode tolerates concurrent connections, and 100// the `cursors` table the upstream package creates doesn't collide 101// with any of our migrations. 102func startKnotConsumer(ctx context.Context, cfg config, st *store, provider Provider) (*knotConsumer, error) { 103 logger := loggerFrom(ctx).With("component", "knotconsumer") 104 105 knots, err := st.KnotsForSpindle(ctx, cfg.Hostname) 106 if err != nil { 107 return nil, fmt.Errorf("load known knots: %w", err) 108 } 109 110 // Persistent per-source cursor store. Keyed by source.Key() (the 111 // knot hostname for KnotSource), so each knot resumes exactly 112 // where it left off and we don't re-fire builds for events that 113 // were already processed before the previous shutdown. 114 cursorStore, err := cursor.NewSQLiteStore(cfg.DBPath) 115 if err != nil { 116 return nil, fmt.Errorf("open knot cursor store: %w", err) 117 } 118 119 kc := &knotConsumer{log: logger, provider: provider} 120 121 ccfg := eventconsumer.NewConsumerConfig() 122 ccfg.Logger = logger 123 ccfg.Dev = cfg.Dev 124 ccfg.ProcessFunc = kc.process 125 ccfg.CursorStore = cursorStore 126 for _, k := range knots { 127 ccfg.Sources[eventconsumer.NewKnotSource(k)] = struct{}{} 128 logger.Info("seeding knot source", "knot", k) 129 } 130 kc.c = eventconsumer.NewConsumer(*ccfg) 131 132 // Start workers + per-source connection loops. Consumer.Start is 133 // non-blocking; the goroutines it spawns observe ctx for shutdown. 134 kc.c.Start(ctx) 135 logger.Info("knot consumer started", "initial_knots", len(knots)) 136 137 return kc, nil 138} 139 140// AddKnot subscribes to a knot we hadn't been watching before. Safe to 141// call repeatedly: eventconsumer.Consumer.AddSource deduplicates by the 142// source's Key (the knot hostname), so passing the same knot twice is a 143// no-op. 144func (k *knotConsumer) AddKnot(ctx context.Context, knot string) { 145 if knot == "" { 146 return 147 } 148 k.log.Info("adding knot source", "knot", knot) 149 k.c.AddSource(ctx, eventconsumer.NewKnotSource(knot)) 150} 151 152// RemoveKnot is currently a no-op — see the interface comment for the 153// upstream blocker. We still log at info so operators can see when the 154// reconciliation logic *would* have unsubscribed; once eventconsumer 155// gains a RemoveSource primitive, swap the body for a real call. 156func (k *knotConsumer) RemoveKnot(_ context.Context, knot string) { 157 if knot == "" { 158 return 159 } 160 k.log.Info("remove knot source requested (no-op until upstream supports it)", 161 "knot", knot, 162 ) 163} 164 165// Stop tears down all knot websocket connections and waits for the 166// consumer's goroutines to exit. It must be called exactly once. 167func (k *knotConsumer) Stop() { 168 k.c.Stop() 169} 170 171// process is the ProcessFunc handed to eventconsumer. It runs once per 172// inbound message, on a worker goroutine. For now we only care about 173// pipeline records — everything else is logged at debug and dropped. 174// 175// Returning an error only logs it (the consumer keeps reading); the 176// cursor is advanced before the ProcessFunc runs, so a returned error 177// does *not* cause a replay. 178func (k *knotConsumer) process(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 179 switch msg.Nsid { 180 case tangled.PipelineNSID: 181 var p tangled.Pipeline 182 if err := json.Unmarshal(msg.EventJson, &p); err != nil { 183 k.log.Error("decode pipeline", 184 "err", err, 185 "knot", src.Key(), 186 "rkey", msg.Rkey, 187 ) 188 return err 189 } 190 191 // Pull a couple of fields out of the trigger metadata for log 192 // context. They're all optional in the schema, so each one is 193 // guarded — we want a noisy log entry, not a nil-deref. 194 var ( 195 triggerKind string 196 repoDid string 197 repoName string 198 ) 199 if p.TriggerMetadata != nil { 200 triggerKind = p.TriggerMetadata.Kind 201 if p.TriggerMetadata.Repo != nil { 202 repoDid = p.TriggerMetadata.Repo.Did 203 if p.TriggerMetadata.Repo.Repo != nil { 204 repoName = *p.TriggerMetadata.Repo.Repo 205 } 206 } 207 } 208 209 k.log.Info("pipeline event", 210 "knot", src.Key(), 211 "rkey", msg.Rkey, 212 "trigger", triggerKind, 213 "repo_did", repoDid, 214 "repo", repoName, 215 "workflows", len(p.Workflows), 216 ) 217 218 // Hand the trigger to whichever Provider was configured. 219 // Spawn is non-blocking — it fans out into provider-owned 220 // goroutines so this worker can move on to the next event. 221 // The provider keeps ctx around for shutdown coordination. 222 k.provider.Spawn(ctx, src.Key(), msg.Rkey, p.TriggerMetadata, p.Workflows) 223 224 default: 225 // Knots may publish other record types over the same stream; we 226 // don't care about them yet. Debug-only so it's available when 227 // chasing "why isn't my pipeline firing" but doesn't drown out 228 // info-level logs. 229 k.log.Debug("ignored knot event", 230 "knot", src.Key(), 231 "nsid", msg.Nsid, 232 "rkey", msg.Rkey, 233 ) 234 } 235 236 return nil 237}