Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

at main 13 kB View raw
package main

// Knot event-stream subscriber.
//
// Tangled knot servers expose a websocket at ws[s]://<knot>/events that
// streams JSON-wrapped record events for that knot, including the
// sh.tangled.pipeline trigger records that drive CI. Pipeline triggers do
// *not* come over the AT Proto firehose (jetstream); the knot publishes
// them itself, so as a spindle we have to dial each knot whose repos have
// pointed at us.
//
// We use tangled-core's `eventconsumer` package, which already handles
// per-source connection management, retries, ordered processing and
// cursor tracking. We hand it:
//
//  1. The initial set of knots, derived from previously-observed
//     sh.tangled.repo records that named us as their .spindle field.
//  2. A ProcessFunc that decodes each sh.tangled.pipeline record,
//     checks that the trigger is authorized for this spindle, and
//     hands it to the configured Provider. Every other record type
//     is logged at debug level and dropped.
//
// The jetstream consumer also gets a back-reference (via the
// KnotConsumer interface) so it can dynamically subscribe a new knot —
// or unsubscribe one — the moment a matching sh.tangled.repo record
// arrives, without waiting for a tack restart.

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"time"

	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
)

// KnotConsumer is the surface area the rest of tack uses to interact
// with the knot event-stream subscriber. It exists so that code which
// depends on knot subscriptions can be exercised in tests without a
// real knot connection.
//
// Implementations must be safe for concurrent use: AddKnot is invoked
// from the jetstream goroutine while the consumer's worker goroutines
// are independently processing inbound knot events.
//
// The fake at knot_fake.go provides a no-network implementation suitable
// for use from tests.
49type KnotConsumer interface { 50 // AddKnot subscribes to the given knot's /events websocket if not 51 // already subscribed. Calling with the same knot more than once is 52 // a no-op. An empty knot string is ignored. The supplied context 53 // scopes the dial; cancelling it tears the subscription down. 54 AddKnot(ctx context.Context, knot string) 55 56 // RemoveKnot stops processing events from the given knot. It is the 57 // inverse of AddKnot and must tolerate being called for a knot that 58 // was never added (no-op). An empty knot string is ignored. 59 RemoveKnot(ctx context.Context, knot string) 60} 61 62// knotConsumer is the production KnotConsumer. It wraps 63// eventconsumer.Consumer with the small surface the rest of tack actually 64// wants: AddKnot for dynamic subscription, plus a Stop lifecycle hook 65// owned by main. 66// 67// Wrapping (instead of exposing *eventconsumer.Consumer directly) keeps 68// callers from importing eventconsumer just to construct a KnotSource, 69// and lets us swap or extend the underlying transport later. 70type knotConsumer struct { 71 c *eventconsumer.Consumer 72 log *slog.Logger 73 74 // cursors is the persistent cursor store the underlying consumer 75 // reads at every (re)connect. We retain a reference here so we 76 // can pre-seed an entry to "now" the first time we ever see a 77 // knot — see seedCursorIfMissing for why. 78 cursors cursor.Store 79 80 // provider dispatches each incoming pipeline trigger to whatever 81 // backend actually runs it (today: the fake provider; tomorrow: 82 // Buildkite). The consumer doesn't care which — it just hands 83 // over the decoded record and lets the provider publish status 84 // records back through its own broker connection. 85 provider Provider 86 87 // store, hostname, and ownerDID are the inputs to the 88 // pre-dispatch authorization check. 
We hold them on the 89 // consumer (instead of threading them through every call) so 90 // process(), which runs once per inbound knot event on a 91 // worker goroutine, can ask `is this trigger allowed?` against 92 // the persisted Tangled membership state without the rest of 93 // tack having to know it cares. 94 store *store 95 hostname string 96 ownerDID string 97} 98 99// Compile-time interface conformance check. 100var _ KnotConsumer = (*knotConsumer)(nil) 101 102// startKnotConsumer builds a knot event consumer pre-loaded with every 103// knot already known to the store, starts its connection loops in the 104// background, and returns the wrapper. The consumer keeps running until 105// ctx is cancelled. 106// 107// Cursor persistence is backed by tangled-core's SQLite cursor store 108// pointed at the same database file as the rest of tack. Without 109// this, a restart would replay every recent pipeline event from each 110// knot and fire a duplicate Buildkite build for each one. The 111// cursor.SqliteStore opens its own *sql.DB on the file, but 112// mattn/go-sqlite3 in WAL mode tolerates concurrent connections, and 113// the `cursors` table the upstream package creates doesn't collide 114// with any of our migrations. 115func startKnotConsumer(ctx context.Context, cfg config, st *store, provider Provider) (*knotConsumer, error) { 116 logger := loggerFrom(ctx).With("component", "knotconsumer") 117 118 knots, err := st.KnotsForSpindle(ctx, cfg.Hostname, cfg.OwnerDID) 119 if err != nil { 120 return nil, fmt.Errorf("load known knots: %w", err) 121 } 122 123 // Persistent per-source cursor store. Keyed by source.Key() (the 124 // knot hostname for KnotSource), so each knot resumes exactly 125 // where it left off and we don't re-fire builds for events that 126 // were already processed before the previous shutdown. 
127 cursorStore, err := cursor.NewSQLiteStore(cfg.DBPath) 128 if err != nil { 129 return nil, fmt.Errorf("open knot cursor store: %w", err) 130 } 131 132 kc := &knotConsumer{ 133 log: logger, 134 cursors: cursorStore, 135 provider: provider, 136 store: st, 137 hostname: cfg.Hostname, 138 ownerDID: cfg.OwnerDID, 139 } 140 141 ccfg := eventconsumer.NewConsumerConfig() 142 ccfg.Logger = logger 143 ccfg.Dev = cfg.Dev 144 ccfg.ProcessFunc = kc.process 145 ccfg.CursorStore = cursorStore 146 for _, k := range knots { 147 // Pin a brand-new knot's cursor to "now" before the consumer 148 // ever connects, so a fresh tack install doesn't replay every 149 // historical pipeline event the knot has retained and fire a 150 // duplicate Buildkite build for each one. 151 kc.seedCursorIfMissing(k) 152 ccfg.Sources[eventconsumer.NewKnotSource(k)] = struct{}{} 153 logger.Info("seeding knot source", "knot", k) 154 } 155 kc.c = eventconsumer.NewConsumer(*ccfg) 156 157 // Start workers + per-source connection loops. Consumer.Start is 158 // non-blocking; the goroutines it spawns observe ctx for shutdown. 159 kc.c.Start(ctx) 160 logger.Info("knot consumer started", "initial_knots", len(knots)) 161 162 return kc, nil 163} 164 165// AddKnot subscribes to a knot we hadn't been watching before. Safe to 166// call repeatedly: eventconsumer.Consumer.AddSource deduplicates by the 167// source's Key (the knot hostname), so passing the same knot twice is a 168// no-op. 169func (k *knotConsumer) AddKnot(ctx context.Context, knot string) { 170 if knot == "" { 171 return 172 } 173 // Same first-run protection as in startKnotConsumer: a knot we 174 // have never observed before must not retroactively fire 175 // pipelines for triggers older than the moment we learned about 176 // it. AddSource reads the cursor synchronously when it dials. 
177 k.seedCursorIfMissing(knot) 178 k.log.Info("adding knot source", "knot", knot) 179 k.c.AddSource(ctx, eventconsumer.NewKnotSource(knot)) 180} 181 182// seedCursorIfMissing writes the current time (as nanoseconds since 183// the unix epoch, the same unit eventconsumer.worker uses when it 184// advances cursors after each message) to the cursor store for knot 185// iff no cursor is already persisted for it. 186// 187// Without this, a brand-new install — or the first time we ever 188// dial a previously-unseen knot — connects to /events with no 189// cursor query parameter, which the knot servers interpret as 190// "stream from the beginning of time." For pipeline records that 191// would fire one Buildkite build per historical trigger the knot 192// still has retained. Pinning the cursor up-front limits us to 193// events that arrive *after* tack learned about the knot. 194// 195// Get returning 0 means "no cursor stored" across all the cursor 196// store implementations we use (memory/sqlite/redis), so it's a 197// safe sentinel to gate the write. 198func (k *knotConsumer) seedCursorIfMissing(knot string) { 199 if k.cursors.Get(knot) != 0 { 200 return 201 } 202 now := time.Now().UnixNano() 203 k.cursors.Set(knot, now) 204 k.log.Info("seeded fresh knot cursor to now", 205 "knot", knot, 206 "cursor", now, 207 ) 208} 209 210// RemoveKnot unsubscribes from a knot that no longer has any authorized 211// repos pointing at this spindle. Safe to call repeatedly: 212// eventconsumer.Consumer.RemoveSource ignores sources that are not present. 213func (k *knotConsumer) RemoveKnot(_ context.Context, knot string) { 214 if knot == "" { 215 return 216 } 217 k.log.Info("removing knot source", "knot", knot) 218 k.c.RemoveSource(eventconsumer.NewKnotSource(knot)) 219} 220 221// Stop tears down all knot websocket connections and waits for the 222// consumer's goroutines to exit. It must be called exactly once. 
223func (k *knotConsumer) Stop() { 224 k.c.Stop() 225} 226 227// process is the ProcessFunc handed to eventconsumer. It runs once per 228// inbound message, on a worker goroutine. For now we only care about 229// pipeline records — everything else is logged at debug and dropped. 230// 231// Returning an error only logs it (the consumer keeps reading); the 232// cursor is advanced before the ProcessFunc runs, so a returned error 233// does *not* cause a replay. 234func (k *knotConsumer) process(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 235 switch msg.Nsid { 236 case tangled.PipelineNSID: 237 var p tangled.Pipeline 238 if err := json.Unmarshal(msg.EventJson, &p); err != nil { 239 k.log.Error("decode pipeline", 240 "err", err, 241 "knot", src.Key(), 242 "rkey", msg.Rkey, 243 ) 244 return err 245 } 246 247 // Pull a couple of fields out of the trigger metadata for log 248 // context. They're all optional in the schema, so each one is 249 // guarded — we want a noisy log entry, not a nil-deref. 250 var ( 251 triggerKind string 252 repoDid string 253 repoName string 254 ) 255 if p.TriggerMetadata != nil { 256 triggerKind = p.TriggerMetadata.Kind 257 if p.TriggerMetadata.Repo != nil { 258 repoDid = p.TriggerMetadata.Repo.Did 259 if p.TriggerMetadata.Repo.Repo != nil { 260 repoName = *p.TriggerMetadata.Repo.Repo 261 } 262 } 263 } 264 265 k.log.Info("pipeline event", 266 "knot", src.Key(), 267 "rkey", msg.Rkey, 268 "trigger", triggerKind, 269 "repo_did", repoDid, 270 "repo", repoName, 271 "workflows", len(p.Workflows), 272 ) 273 274 // Authorization gate. The knot's /events stream will hand 275 // us every pipeline record it publishes, including ones 276 // for repos that never opted into us. Being subscribed 277 // to a knot says nothing about which repos on it we 278 // actually serve. Without this check, any DID that can 279 // publish a sh.tangled.pipeline record on a knot we 280 // already watch could trigger CI on our spindle. 
281 // 282 // We consult the persisted state mirrored from the AT 283 // Proto firehose: the trigger's repo must have declared 284 // us as its spindle on this knot, and its publisher DID 285 // must be the spindle owner or a member they vouched for 286 // via sh.tangled.spindle.member. See 287 // store.AuthorizePipelineActor for the precise rules. 288 ok, reason, err := k.store.AuthorizePipelineActor( 289 ctx, k.hostname, src.Key(), k.ownerDID, 290 repoDid, repoName, 291 ) 292 if err != nil { 293 // SQL/IO failure (not a denial). Log and refuse to 294 // spawn rather than fail-open: a transient store 295 // error during shutdown shouldn't be allowed to 296 // punch a hole in the security gate. 297 k.log.Error("authorize pipeline trigger", 298 "err", err, 299 "knot", src.Key(), 300 "rkey", msg.Rkey, 301 "repo_did", repoDid, 302 "repo", repoName, 303 ) 304 return err 305 } 306 if !ok { 307 k.log.Warn("rejecting unauthorized pipeline trigger", 308 "knot", src.Key(), 309 "rkey", msg.Rkey, 310 "repo_did", repoDid, 311 "repo", repoName, 312 "reason", reason, 313 ) 314 return nil 315 } 316 317 // Hand the trigger to whichever Provider was configured. 318 // Spawn is non-blocking — it fans out into provider-owned 319 // goroutines so this worker can move on to the next event. 320 // The provider keeps ctx around for shutdown coordination. 321 // repoDid is the actor: the publisher of the originating 322 // sh.tangled.repo record, which AuthorizePipelineActor 323 // just confirmed is allowed to spawn work here. 324 k.provider.Spawn(ctx, src.Key(), msg.Rkey, repoDid, p.TriggerMetadata, p.Workflows) 325 326 default: 327 // Knots may publish other record types over the same stream; we 328 // don't care about them yet. Debug-only so it's available when 329 // chasing "why isn't my pipeline firing" but doesn't drown out 330 // info-level logs. 
331 k.log.Debug("ignored knot event", 332 "knot", src.Key(), 333 "nsid", msg.Nsid, 334 "rkey", msg.Rkey, 335 ) 336 } 337 338 return nil 339}