Stitch any CI into Tangled

package main

// Knot event-stream subscriber.
//
// Tangled knot servers expose a websocket at ws[s]://<knot>/events that
// streams JSON-wrapped record events for that knot, including the
// sh.tangled.pipeline trigger records that drive CI. Pipeline triggers do
// *not* come over the AT Proto firehose (jetstream); the knot publishes
// them itself, so as a spindle we have to dial each knot whose repos have
// pointed at us.
//
// We use tangled-core's `eventconsumer` package, which already handles
// per-source connection management, retries, ordered processing and
// cursor tracking. We hand it:
//
//  1. The initial set of knots, derived from previously-observed
//     sh.tangled.repo records that named us as their .spindle field.
//  2. A ProcessFunc (process) that decodes sh.tangled.pipeline
//     triggers, runs each one through an authorization check against
//     the persisted Tangled membership state, and hands authorized
//     triggers to the configured Provider (today a fake; eventually
//     Buildkite).
//
// The jetstream consumer also gets a back-reference (via the
// KnotConsumer interface) so it can dynamically subscribe a new knot
// (or, conceptually, unsubscribe one) the moment a matching
// sh.tangled.repo record arrives, without waiting for a tack restart.

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"time"

	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
)

// KnotConsumer is the surface area the rest of tack uses to interact
// with the knot event-stream subscriber. It exists so that tests can
// substitute a fake for the real, websocket-backed implementation.
//
// Implementations must be safe for concurrent use: AddKnot is invoked
// from the jetstream goroutine while the consumer's worker goroutines
// are independently processing inbound knot events.
//
// The fake at knot_fake.go provides a no-network implementation suitable
// for use from tests.
type KnotConsumer interface {
	// AddKnot subscribes to the given knot's /events websocket if not
	// already subscribed. Calling with the same knot more than once is
	// a no-op. An empty knot string is ignored. The supplied context
	// scopes the dial; cancelling it tears the subscription down.
	AddKnot(ctx context.Context, knot string)

	// RemoveKnot stops processing events from the given knot. It is the
	// inverse of AddKnot and must tolerate being called for a knot that
	// was never added (no-op). An empty knot string is ignored.
	//
	// The production implementation is currently a no-op: tangled-core's
	// eventconsumer does not expose a way to drop an individual source's
	// websocket. Tracked upstream as
	// https://tangled.org/did:plc:j5hmlfdrwkvtxm7cjmu7j2is/issues/510
	RemoveKnot(ctx context.Context, knot string)
}

// knotConsumer is the production KnotConsumer. It wraps
// eventconsumer.Consumer with the small surface the rest of tack actually
// wants: AddKnot (and a placeholder RemoveKnot) for dynamic subscription,
// plus a Stop lifecycle hook owned by main.
//
// Wrapping (instead of exposing *eventconsumer.Consumer directly) keeps
// callers from importing eventconsumer just to construct a KnotSource,
// and lets us swap or extend the underlying transport later.
type knotConsumer struct {
	c   *eventconsumer.Consumer
	log *slog.Logger

	// cursors is the persistent cursor store the underlying consumer
	// reads at every (re)connect. We retain a reference here so we
	// can pre-seed an entry to "now" the first time we ever see a
	// knot (see seedCursorIfMissing for why).
	cursors cursor.Store

	// provider dispatches each incoming pipeline trigger to whatever
	// backend actually runs it (today: the fake provider; tomorrow:
	// Buildkite).
	// The consumer doesn't care which; it just hands over the
	// decoded record and lets the provider publish status records
	// back through its own broker connection.
	provider Provider

	// store, hostname, and ownerDID are the inputs to the
	// pre-dispatch authorization check. We hold them on the
	// consumer (instead of threading them through every call) so
	// process(), which runs once per inbound knot event on a
	// worker goroutine, can ask `is this trigger allowed?` against
	// the persisted Tangled membership state without the rest of
	// tack having to know it cares.
	store    *store
	hostname string
	ownerDID string
}

// Compile-time interface conformance check.
var _ KnotConsumer = (*knotConsumer)(nil)

// startKnotConsumer builds a knot event consumer pre-loaded with every
// knot already known to the store, starts its connection loops in the
// background, and returns the wrapper. The consumer keeps running until
// ctx is cancelled.
//
// Cursor persistence is backed by tangled-core's SQLite cursor store
// pointed at the same database file as the rest of tack. Without
// this, a restart would replay every recent pipeline event from each
// knot and fire a duplicate Buildkite build for each one. The
// cursor.SqliteStore opens its own *sql.DB on the file, but
// mattn/go-sqlite3 in WAL mode tolerates concurrent connections, and
// the `cursors` table the upstream package creates doesn't collide
// with any of our migrations.
func startKnotConsumer(ctx context.Context, cfg config, st *store, provider Provider) (*knotConsumer, error) {
	logger := loggerFrom(ctx).With("component", "knotconsumer")

	knots, err := st.KnotsForSpindle(ctx, cfg.Hostname)
	if err != nil {
		return nil, fmt.Errorf("load known knots: %w", err)
	}

	// Persistent per-source cursor store.
	// Keyed by source.Key() (the knot hostname for KnotSource), so each
	// knot resumes exactly where it left off and we don't re-fire builds
	// for events that were already processed before the previous
	// shutdown.
	cursorStore, err := cursor.NewSQLiteStore(cfg.DBPath)
	if err != nil {
		return nil, fmt.Errorf("open knot cursor store: %w", err)
	}

	kc := &knotConsumer{
		log:      logger,
		cursors:  cursorStore,
		provider: provider,
		store:    st,
		hostname: cfg.Hostname,
		ownerDID: cfg.OwnerDID,
	}

	ccfg := eventconsumer.NewConsumerConfig()
	ccfg.Logger = logger
	ccfg.Dev = cfg.Dev
	ccfg.ProcessFunc = kc.process
	ccfg.CursorStore = cursorStore
	for _, k := range knots {
		// Pin a brand-new knot's cursor to "now" before the consumer
		// ever connects, so a fresh tack install doesn't replay every
		// historical pipeline event the knot has retained and fire a
		// duplicate Buildkite build for each one.
		kc.seedCursorIfMissing(k)
		ccfg.Sources[eventconsumer.NewKnotSource(k)] = struct{}{}
		logger.Info("seeding knot source", "knot", k)
	}
	kc.c = eventconsumer.NewConsumer(*ccfg)

	// Start workers + per-source connection loops. Consumer.Start is
	// non-blocking; the goroutines it spawns observe ctx for shutdown.
	kc.c.Start(ctx)
	logger.Info("knot consumer started", "initial_knots", len(knots))

	return kc, nil
}

// AddKnot subscribes to a knot we hadn't been watching before. Safe to
// call repeatedly: eventconsumer.Consumer.AddSource deduplicates by the
// source's Key (the knot hostname), so passing the same knot twice is a
// no-op.
func (k *knotConsumer) AddKnot(ctx context.Context, knot string) {
	if knot == "" {
		return
	}
	// Same first-run protection as in startKnotConsumer: a knot we
	// have never observed before must not retroactively fire
	// pipelines for triggers older than the moment we learned about
	// it. AddSource reads the cursor synchronously when it dials.
	k.seedCursorIfMissing(knot)
	k.log.Info("adding knot source", "knot", knot)
	k.c.AddSource(ctx, eventconsumer.NewKnotSource(knot))
}

// seedCursorIfMissing writes the current time (as nanoseconds since
// the Unix epoch, the same unit eventconsumer.worker uses when it
// advances cursors after each message) to the cursor store for knot
// iff no cursor is already persisted for it.
//
// Without this, on a brand-new install, or whenever we first dial a
// previously unseen knot, the consumer connects to /events with no
// cursor query parameter, which knot servers interpret as "stream
// from the beginning of time." For pipeline records that would fire
// one Buildkite build per historical trigger the knot still has
// retained. Pinning the cursor up-front limits us to events that
// arrive *after* tack learned about the knot.
//
// Get returning 0 means "no cursor stored" across all the cursor
// store implementations we use (memory/sqlite/redis), so it's a
// safe sentinel to gate the write.
func (k *knotConsumer) seedCursorIfMissing(knot string) {
	if k.cursors.Get(knot) != 0 {
		return
	}
	now := time.Now().UnixNano()
	k.cursors.Set(knot, now)
	k.log.Info("seeded fresh knot cursor to now",
		"knot", knot,
		"cursor", now,
	)
}

// RemoveKnot is currently a no-op; see the interface comment for the
// upstream blocker.
// We still log at info so operators can see when the
// reconciliation logic *would* have unsubscribed; once eventconsumer
// gains a RemoveSource primitive, swap the body for a real call.
func (k *knotConsumer) RemoveKnot(_ context.Context, knot string) {
	if knot == "" {
		return
	}
	k.log.Info("remove knot source requested (no-op until upstream supports it)",
		"knot", knot,
	)
}

// Stop tears down all knot websocket connections and waits for the
// consumer's goroutines to exit. It must be called exactly once.
func (k *knotConsumer) Stop() {
	k.c.Stop()
}

// process is the ProcessFunc handed to eventconsumer. It runs once per
// inbound message, on a worker goroutine. For now we only care about
// pipeline records; everything else is logged at debug and dropped.
//
// Returning an error only logs it (the consumer keeps reading); the
// cursor is advanced before the ProcessFunc runs, so a returned error
// does *not* cause a replay. A trigger that fails to decode or is
// refused by the authorization gate is therefore dropped, not retried.
func (k *knotConsumer) process(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
	switch msg.Nsid {
	case tangled.PipelineNSID:
		var p tangled.Pipeline
		if err := json.Unmarshal(msg.EventJson, &p); err != nil {
			k.log.Error("decode pipeline",
				"err", err,
				"knot", src.Key(),
				"rkey", msg.Rkey,
			)
			return err
		}

		// Pull the trigger kind, repo DID, and repo name out of the
		// trigger metadata; they feed both the log line and the
		// authorization check below. Every level is optional in the
		// schema, so each dereference is guarded: a missing field
		// should produce an empty log attribute, not a nil-pointer panic.
		var (
			triggerKind string
			repoDid     string
			repoName    string
		)
		if p.TriggerMetadata != nil {
			triggerKind = p.TriggerMetadata.Kind
			if p.TriggerMetadata.Repo != nil {
				repoDid = p.TriggerMetadata.Repo.Did
				if p.TriggerMetadata.Repo.Repo != nil {
					repoName = *p.TriggerMetadata.Repo.Repo
				}
			}
		}

		k.log.Info("pipeline event",
			"knot", src.Key(),
			"rkey", msg.Rkey,
			"trigger", triggerKind,
			"repo_did", repoDid,
			"repo", repoName,
			"workflows", len(p.Workflows),
		)

		// Authorization gate. The knot's /events stream will hand
		// us every pipeline record it publishes, including ones
		// for repos that never opted into us. Being subscribed
		// to a knot says nothing about which repos on it we
		// actually serve. Without this check, any DID that can
		// publish a sh.tangled.pipeline record on a knot we
		// already watch could trigger CI on our spindle.
		//
		// We consult the persisted state mirrored from the AT
		// Proto firehose: the trigger's repo must have declared
		// us as its spindle on this knot, and its publisher DID
		// must be the spindle owner or a member they vouched for
		// via sh.tangled.spindle.member. See
		// store.AuthorizePipelineActor for the precise rules.
		ok, reason, err := k.store.AuthorizePipelineActor(
			ctx, k.hostname, src.Key(), k.ownerDID,
			repoDid, repoName,
		)
		if err != nil {
			// SQL/IO failure (not a denial). Log and refuse to
			// spawn rather than failing open: a transient store
			// error during shutdown shouldn't be allowed to
			// punch a hole in the security gate.
			k.log.Error("authorize pipeline trigger",
				"err", err,
				"knot", src.Key(),
				"rkey", msg.Rkey,
				"repo_did", repoDid,
				"repo", repoName,
			)
			return err
		}
		if !ok {
			k.log.Warn("rejecting unauthorized pipeline trigger",
				"knot", src.Key(),
				"rkey", msg.Rkey,
				"repo_did", repoDid,
				"repo", repoName,
				"reason", reason,
			)
			return nil
		}

		// Hand the trigger to whichever Provider was configured.
		// Spawn is non-blocking: it fans out into provider-owned
		// goroutines so this worker can move on to the next event.
		// The provider keeps ctx around for shutdown coordination.
		// repoDid is the actor: the publisher of the originating
		// sh.tangled.repo record, which AuthorizePipelineActor
		// just confirmed is allowed to spawn work here.
		k.provider.Spawn(ctx, src.Key(), msg.Rkey, repoDid, p.TriggerMetadata, p.Workflows)

	default:
		// Knots may publish other record types over the same stream; we
		// don't care about them yet. Debug-only so it's available when
		// chasing "why isn't my pipeline firing" but doesn't drown out
		// info-level logs.
		k.log.Debug("ignored knot event",
			"knot", src.Key(),
			"nsid", msg.Nsid,
			"rkey", msg.Rkey,
		)
	}

	return nil
}
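
seedCursorIfMissing exists because a dial to ws[s]://<knot>/events without a cursor query parameter replays the knot's retained history. A minimal sketch of how such a URL might be assembled, assuming (neither point is confirmed by this file or by tangled-core) that dev mode switches to plain ws:// and that the resume position travels in a query parameter named `cursor`; eventsURL is a hypothetical helper.

```go
package main

import (
	"fmt"
	"net/url"
)

// eventsURL builds the websocket URL for a knot's /events stream.
//
// Assumptions, not taken from tangled-core: dev mode selects ws://
// (otherwise wss://), and the resume position is a query parameter
// named "cursor". A zero cursor means "none stored", so the parameter
// is omitted and the knot streams from the beginning.
func eventsURL(knot string, dev bool, cursor int64) string {
	scheme := "wss"
	if dev {
		scheme = "ws"
	}
	u := url.URL{Scheme: scheme, Host: knot, Path: "/events"}
	if cursor != 0 {
		q := url.Values{}
		q.Set("cursor", fmt.Sprint(cursor))
		u.RawQuery = q.Encode()
	}
	return u.String()
}

func main() {
	// No stored cursor: the dial carries no resume position.
	fmt.Println(eventsURL("knot.example.com", false, 0))
	// Seeded cursor (Unix nanoseconds): the knot resumes after this point.
	fmt.Println(eventsURL("localhost:5555", true, 1700000000000000000))
}
```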
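
The nested nil checks in process() exist because every level of the trigger metadata is optional. The sketch below reproduces that pattern with hypothetical local types; the field names and JSON tags are assumptions chosen for illustration and are not the generated tangled.Pipeline definitions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical mirrors of the optional trigger-metadata shape; names and
// JSON tags are illustrative, not the generated tangled types.
type triggerRepo struct {
	Did  string  `json:"did"`
	Repo *string `json:"repo,omitempty"`
}

type triggerMetadata struct {
	Kind string       `json:"kind"`
	Repo *triggerRepo `json:"repo,omitempty"`
}

type pipeline struct {
	TriggerMetadata *triggerMetadata  `json:"triggerMetadata,omitempty"`
	Workflows       []json.RawMessage `json:"workflows"`
}

// decode unmarshals a raw event payload, wrapping any JSON error.
func decode(raw string) (pipeline, error) {
	var p pipeline
	if err := json.Unmarshal([]byte(raw), &p); err != nil {
		return pipeline{}, fmt.Errorf("decode pipeline: %w", err)
	}
	return p, nil
}

// logFields returns the trigger kind, repo DID and repo name, leaving a
// value empty when its enclosing field is absent instead of dereferencing nil.
func logFields(p pipeline) (kind, did, name string) {
	if p.TriggerMetadata == nil {
		return
	}
	kind = p.TriggerMetadata.Kind
	if r := p.TriggerMetadata.Repo; r != nil {
		did = r.Did
		if r.Repo != nil {
			name = *r.Repo
		}
	}
	return
}

func main() {
	for _, raw := range []string{
		`{"triggerMetadata":{"kind":"push","repo":{"did":"did:plc:abc","repo":"tack"}},"workflows":[{}]}`,
		`{"triggerMetadata":{"kind":"manual"},"workflows":[]}`,
		`{"workflows":[]}`,
	} {
		p, err := decode(raw)
		if err != nil {
			fmt.Println(err)
			continue
		}
		kind, did, name := logFields(p)
		fmt.Printf("trigger=%q repo_did=%q repo=%q workflows=%d\n", kind, did, name, len(p.Workflows))
	}
}
```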
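
The authorization rules summarized in process() (the repo declared this spindle on this knot, and its publisher is the spindle owner or a vouched member) can be modeled in memory. This is a simplified, hypothetical sketch, not store.AuthorizePipelineActor, which the comment names as the home of the precise rules; authState, repoDecl and authorize are invented names.

```go
package main

import "fmt"

// repoKey identifies a repo by publisher DID and name.
type repoKey struct{ did, name string }

// repoDecl is what a sh.tangled.repo record told us: the knot hosting
// the repo and the spindle hostname it named.
type repoDecl struct{ knot, spindle string }

// authState is a hypothetical in-memory stand-in for the persisted state
// mirrored from the firehose.
type authState struct {
	repos   map[repoKey]repoDecl
	members map[string]bool // DIDs vouched for via sh.tangled.spindle.member
}

// authorize reports whether the trigger may run and, if not, why.
func (s authState) authorize(hostname, knot, ownerDID, repoDID, repoName string) (bool, string) {
	decl, known := s.repos[repoKey{repoDID, repoName}]
	switch {
	case !known:
		return false, "unknown repo"
	case decl.knot != knot:
		return false, fmt.Sprintf("repo lives on %s, not %s", decl.knot, knot)
	case decl.spindle != hostname:
		return false, "repo did not declare this spindle"
	case repoDID != ownerDID && !s.members[repoDID]:
		return false, "publisher is neither the owner nor a vouched member"
	}
	return true, ""
}

func main() {
	const host, knot, owner = "spindle.example.com", "knot.example.com", "did:plc:owner"
	s := authState{
		repos: map[repoKey]repoDecl{
			{"did:plc:owner", "tack"}:      {knot, host},
			{"did:plc:member", "lib"}:      {knot, host},
			{"did:plc:stranger", "toy"}:    {knot, host},
			{"did:plc:owner", "elsewhere"}: {knot, "other-spindle.example.com"},
		},
		members: map[string]bool{"did:plc:member": true},
	}
	for _, r := range []repoKey{
		{"did:plc:owner", "tack"},
		{"did:plc:member", "lib"},
		{"did:plc:stranger", "toy"},
		{"did:plc:owner", "elsewhere"},
		{"", ""}, // trigger with no repo metadata
	} {
		ok, reason := s.authorize(host, knot, owner, r.did, r.name)
		fmt.Printf("%s/%s allowed=%t %s\n", r.did, r.name, ok, reason)
	}
}
```

In this sketch a trigger whose optional repo fields were missing (empty DID and name) falls through to "unknown repo" and is denied, so absent metadata keeps the gate closed.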