Stitch any CI into Tangled

package main

// In-process event broker for the /events websocket fan-out.
//
// Lifecycle of an outbound event:
//
//	publisher (e.g. Buildkite webhook handler)
//	     │
//	     ▼
//	broker.Publish ─── store.InsertEvent ──▶ events table (rowid = cursor)
//	     │
//	     └── notify() ──▶ each subscriber's signal channel
//	                            │
//	                            ▼
//	                     /events handler wakes,
//	                     calls store.EventsAfter(cursor),
//	                     writes envelope JSON to its websocket.
//
// Subscribers don't receive events through the channel directly, only
// a "wake up" signal. They re-read from the store using the cursor they
// last delivered. This means:
//
//   - slow clients can't make us drop events (they just lag behind in
//     rowid space and catch up at their own pace),
//   - reconnecting clients can resume by passing ?cursor=N, hitting the
//     same EventsAfter path used for live deliveries,
//   - we never have to bound a per-subscriber buffer.
//
// This mirrors the upstream Tangled spindle's notifier+stream design
// (see tangled.org/core/spindle/stream.go), which is the source of
// truth for the wire format on /events.

import (
	"context"
	"encoding/json"
	"sync"
)

// eventsEnvelope is the wire shape we emit on /events frames. It must
// match the upstream Tangled spindle byte-for-byte so the appview's
// eventconsumer treats us as a drop-in source.
//
// Upstream defines this shape in two places that don't quite agree:
//
//   - The producer (tangled.org/core/spindle/stream.go, streamPipelines)
//     marshals an inline map[string]any with lowercase keys.
//   - The consumer (tangled.org/core/eventconsumer.Message) is exported
//     but its Rkey/Nsid fields are missing JSON tags, so reusing it for
//     marshalling here would emit "Rkey"/"Nsid". Those keys are accepted
//     on read only because Go's json package matches field names
//     case-insensitively.
//
// Defining our own struct keeps the wire output identical to the
// upstream producer and lets every site that emits an event (the
// /events handler today, future re-publishers tomorrow) share one
// canonical type.
//
// Event is held as RawMessage so callers can splice a stored record
// body straight in without an unmarshal/remarshal round-trip.
type eventsEnvelope struct {
	Rkey    string          `json:"rkey"`
	Nsid    string          `json:"nsid"`
	Event   json.RawMessage `json:"event"`
	Created int64           `json:"created"`
}

// broker fans out event-table writes to connected /events subscribers.
// Construct with newBroker; safe for concurrent use.
type broker struct {
	st *store

	mu   sync.Mutex
	subs map[chan struct{}]struct{}
}

// newBroker returns a broker bound to st. The store is used both for
// durable writes in Publish and for cursor-based reads in EventsAfter
// from the /events handler.
func newBroker(st *store) *broker {
	return &broker{
		st:   st,
		subs: make(map[chan struct{}]struct{}),
	}
}

// Subscribe registers a new subscriber and returns its signal channel.
// The channel is buffered with a capacity of 1: notify() does a
// non-blocking send, so a pending notification simply coalesces with
// the next one rather than blocking the publisher. Subscribers must
// call Unsubscribe when done to free the slot.
func (b *broker) Subscribe() chan struct{} {
	ch := make(chan struct{}, 1)
	b.mu.Lock()
	b.subs[ch] = struct{}{}
	b.mu.Unlock()
	return ch
}

// Unsubscribe removes ch from the broker. Safe to call with a channel
// that was never subscribed (no-op) or to call more than once.
func (b *broker) Unsubscribe(ch chan struct{}) {
	b.mu.Lock()
	delete(b.subs, ch)
	b.mu.Unlock()
}

// Publish persists an event and wakes every subscriber. The returned
// int64 is the assigned cursor (rowid) for the new row, useful in tests
// and for any caller that wants to log "published as cursor=N".
//
// eventJSON is the record body, typically a marshalled
// tangled.PipelineStatus. The caller is responsible for choosing rkey
// (atproto record key) and nsid (collection NSID).
func (b *broker) Publish(ctx context.Context, rkey, nsid string, eventJSON []byte) (int64, error) {
	cursor, err := b.st.InsertEvent(ctx, rkey, nsid, eventJSON)
	if err != nil {
		return 0, err
	}
	b.notify()
	return cursor, nil
}

// notify sends a non-blocking signal to every subscriber. The lock is
// held for the whole iteration, which is cheap because each send is
// O(1) and never blocks (buffered channel plus default case).
func (b *broker) notify() {
	b.mu.Lock()
	defer b.mu.Unlock()
	for ch := range b.subs {
		select {
		case ch <- struct{}{}:
		default:
			// A previous signal hasn't been drained yet; coalesce.
			// The subscriber will catch up on the next read since it
			// queries by cursor, not by message count.
		}
	}
}
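
The subscriber side of this design lives in the /events handler, which is not part of this file. As a rough illustration of why a capacity-1 signal channel plus cursor reads never loses events, here is a minimal standalone sketch. Everything in it is an assumption made for illustration: `memStore`, `record`, `signal`, and `drain` are hypothetical stand-ins, and the real store's `EventsAfter` signature may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a hypothetical stand-in for a row in the events table.
type record struct {
	Cursor int64
	Body   string
}

// memStore is an in-memory stand-in for the real store (assumption:
// the actual store and its EventsAfter signature are not shown here).
type memStore struct {
	mu   sync.Mutex
	rows []record
}

// insert appends a row and returns its cursor (1-based, like a rowid).
func (s *memStore) insert(body string) int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	cur := int64(len(s.rows) + 1)
	s.rows = append(s.rows, record{Cursor: cur, Body: body})
	return cur
}

// eventsAfter returns a copy of every row whose cursor is greater
// than the given cursor.
func (s *memStore) eventsAfter(cursor int64) []record {
	s.mu.Lock()
	defer s.mu.Unlock()
	if cursor < 0 {
		cursor = 0
	}
	if cursor >= int64(len(s.rows)) {
		return nil
	}
	return append([]record(nil), s.rows[cursor:]...)
}

// signal performs the same kind of non-blocking send as notify: if a
// wake-up is already pending, the new one coalesces with it.
func signal(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}

// drain waits for one wake-up, reads everything after cursor, and
// returns the delivered bodies together with the advanced cursor.
func drain(s *memStore, ch <-chan struct{}, cursor int64) ([]string, int64) {
	<-ch
	var out []string
	for _, r := range s.eventsAfter(cursor) {
		out = append(out, r.Body)
		cursor = r.Cursor
	}
	return out, cursor
}

func main() {
	st := &memStore{}
	ch := make(chan struct{}, 1)

	// Three publishes land before the subscriber wakes, so only one
	// signal is queued; the other two coalesce into it.
	for _, body := range []string{"a", "b", "c"} {
		st.insert(body)
		signal(ch)
	}

	got, cursor := drain(st, ch, 0)
	fmt.Println(got, cursor, len(ch)) // prints: [a b c] 3 0
}
```

A single wake-up is enough to deliver all three events because the subscriber asks the store for "everything after my cursor" rather than counting signals. A reconnecting client would start `drain` from its saved cursor in the same way.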