// Stitch any CI into Tangled.
1package main
2
3// In-process event broker for the /events websocket fan-out.
4//
5// Lifecycle of an outbound event:
6//
//	publisher (e.g. Buildkite webhook handler)
//	     │
//	     ▼
//	broker.Publish ─── store.InsertEvent ──▶ events table (rowid = cursor)
//	     │
//	     └── notify() ──▶ each subscriber's signal channel
//	                            │
//	                            ▼
//	                   /events handler wakes,
//	                   calls store.EventsAfter(cursor),
//	                   writes envelope JSON to its websocket.
18//
19// Subscribers don't receive events through the channel directly — only
20// a "wake up" signal. They re-read from the store using the cursor they
21// last delivered. This means:
22//
//   - slow clients can't make us drop events (they just lag behind in
//     rowid space and catch up at their own pace),
25// - reconnecting clients can resume by passing ?cursor=N, hitting the
26// same EventsAfter path used for live deliveries,
27// - we never have to bound a per-subscriber buffer.
28//
29// This mirrors the upstream Tangled spindle's notifier+stream design
30// (see tangled.org/core/spindle/stream.go), which is the source of
31// truth for the wire format on /events.
32
33import (
34 "context"
35 "encoding/json"
36 "sync"
37)
38
// eventsEnvelope is the JSON frame written to /events subscribers. Its
// encoding has to stay byte-for-byte compatible with the upstream
// Tangled spindle so that the appview's eventconsumer can treat this
// service as a drop-in event source.
//
// Upstream describes the shape twice, and the two descriptions differ:
//
//   - The producer (streamPipelines in tangled.org/core/spindle/stream.go)
//     marshals an inline map[string]any whose keys are lowercase.
//   - The consumer type (tangled.org/core/eventconsumer.Message) is
//     exported, but its Rkey and Nsid fields carry no JSON tags.
//     Marshalling that type would produce "Rkey"/"Nsid"; it only works
//     for decoding because encoding/json matches field names
//     case-insensitively.
//
// A dedicated struct keeps our output identical to the upstream
// producer and gives every emitter (the /events handler today, any
// future re-publisher later) one canonical type to share.
//
// Field order is significant: encoding/json writes struct fields in
// declaration order, so reordering them changes the bytes on the wire.
type eventsEnvelope struct {
	// Rkey is written under the lowercase "rkey" key. Publish describes
	// it as the atproto record key.
	Rkey string `json:"rkey"`

	// Nsid is written under the lowercase "nsid" key. Publish describes
	// it as the collection NSID.
	Nsid string `json:"nsid"`

	// Event is the record body. It is a json.RawMessage so that a stored
	// body can be spliced into the frame verbatim, with no
	// unmarshal/remarshal round-trip.
	Event json.RawMessage `json:"event"`

	// Created is written as a JSON number under "created".
	// NOTE(review): the unit (seconds vs. nanoseconds, etc.) is chosen by
	// whoever populates the envelope — confirm against the upstream
	// producer.
	Created int64 `json:"created"`
}
65
66// broker fans out event-table writes to connected /events subscribers.
67// Construct with newBroker; safe for concurrent use.
68type broker struct {
69 st *store
70
71 mu sync.Mutex
72 subs map[chan struct{}]struct{}
73}
74
75// newBroker returns a broker bound to st. The store is used both for
76// durable writes in Publish and for cursor-based reads in EventsAfter
77// from the /events handler.
78func newBroker(st *store) *broker {
79 return &broker{
80 st: st,
81 subs: make(map[chan struct{}]struct{}),
82 }
83}
84
85// Subscribe registers a new subscriber and returns its signal channel.
86// The channel is buffered with a capacity of 1: notify() does a
87// non-blocking send, so a pending notification simply coalesces with
88// the next one rather than blocking the publisher. Subscribers must
89// call Unsubscribe when done to free the slot.
90func (b *broker) Subscribe() chan struct{} {
91 ch := make(chan struct{}, 1)
92 b.mu.Lock()
93 b.subs[ch] = struct{}{}
94 b.mu.Unlock()
95 return ch
96}
97
98// Unsubscribe removes ch from the broker. Safe to call with a channel
99// that was never subscribed (no-op) or to call more than once.
100func (b *broker) Unsubscribe(ch chan struct{}) {
101 b.mu.Lock()
102 delete(b.subs, ch)
103 b.mu.Unlock()
104}
105
106// Publish persists an event and wakes every subscriber. The returned
107// int64 is the assigned cursor (rowid) for the new row, useful in tests
108// and for any caller that wants to log "published as cursor=N".
109//
110// eventJSON is the record body — typically a marshalled
111// tangled.PipelineStatus. The caller is responsible for choosing rkey
112// (atproto record key) and nsid (collection NSID).
113func (b *broker) Publish(ctx context.Context, rkey, nsid string, eventJSON []byte) (int64, error) {
114 created, err := b.st.InsertEvent(ctx, rkey, nsid, eventJSON)
115 if err != nil {
116 return 0, err
117 }
118 b.notify()
119 return created, nil
120}
121
122// notify sends a non-blocking signal to every subscriber. Held lock
123// covers iteration only — the send itself is O(1) and never blocks
124// because of the buffered channel + default case.
125func (b *broker) notify() {
126 b.mu.Lock()
127 defer b.mu.Unlock()
128 for ch := range b.subs {
129 select {
130 case ch <- struct{}{}:
131 default:
132 // A previous signal hasn't been drained yet; coalesce —
133 // the subscriber will catch up on the next read since it
134 // queries by cursor, not by message count.
135 }
136 }
137}