Stitch any CI into Tangled
1package main
2
3// This file wires tack into the AT Protocol firehose via Bluesky's
4// "jetstream" — a JSON projection of the firehose served over a websocket
5// (see https://github.com/bluesky-social/jetstream). Tangled rides on top of
6// AT Proto: things like "this user is a spindle member", "this repo wants
7// this spindle", etc. are all atproto records published to users' PDSes,
8// and jetstream is how a service like a spindle observes them in real time.
9//
10// As a spindle, the records we care about are:
11//
12// - sh.tangled.spindle.member — owner authorizes a DID to use us
13// - sh.tangled.repo — a repo declares us as its spindle
14// - sh.tangled.repo.collaborator — collaborators on those repos
15//
16// (Pipeline trigger records, sh.tangled.pipeline, do *not* come over
17// jetstream; they're delivered by the knot servers via a separate event
18// stream. That is plumbed in separately.)
19
20import (
21 "context"
22 "encoding/json"
23 "fmt"
24 "time"
25
26 "github.com/bluesky-social/jetstream/pkg/client"
27 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
28 jsmodels "github.com/bluesky-social/jetstream/pkg/models"
29 "tangled.org/core/api/tangled"
30)
31
// Jetstream commit operation names. Jetstream publishes one of these in
// the Commit.Operation field; pulling them out as constants keeps the
// operation switches in applySpindleMember, applyRepo and
// applyRepoCollaborator honest about typos.
const (
	jsOpCreate = "create" // handled as an upsert by the apply* functions
	jsOpUpdate = "update" // handled as an upsert by the apply* functions
	jsOpDelete = "delete" // handled as a row deletion by the apply* functions
)
40
41// startJetstream dials the configured jetstream endpoint and spawns a
42// background goroutine that consumes events for the lifetime of ctx. It
43// returns once the client is constructed; connection errors surface in
44// logs, not return values, because the read loop is expected to reconnect
45// on its own.
46//
47// The store is used for two things: loading the persisted cursor so we
48// resume from the last seen event after a restart, and persisting
49// observed records so the rest of tack can answer membership questions
50// without re-reading the firehose.
51//
52// The logger is pulled from ctx (see log.go); falls back to slog.Default()
53// if none is attached.
54func startJetstream(ctx context.Context, cfg config, st *store, knots KnotConsumer) error {
55 logger := loggerFrom(ctx).With("component", "jetstream")
56
57 // `wantedCollections` is a server-side filter: jetstream will only send
58 // us commit events whose record collection (NSID) is in this list. The
59 // NSIDs come from tangled-core's generated lexicon types so they stay
60 // in sync with whatever the appview/knots are publishing.
61 collections := []string{
62 tangled.SpindleMemberNSID,
63 tangled.RepoNSID,
64 tangled.RepoCollaboratorNSID,
65 }
66
67 // Configure our JetStream client.
68 clientCfg := client.DefaultClientConfig()
69 clientCfg.WebsocketURL = cfg.JetstreamURL
70 clientCfg.WantedCollections = collections
71
72 // The handler closes over `st`, `knots` and the spindle hostname so
73 // the scheduler signature stays plain `func(ctx, *Event) error` and
74 // applyCommit can hand the knot consumer new sources as soon as
75 // matching repo records arrive.
76 handler := func(ctx context.Context, evt *jsmodels.Event) error {
77 return handleJetstreamEvent(ctx, st, knots, cfg.Hostname, evt)
78 }
79
80 // Re-attach the component-scoped logger so handler — which the
81 // scheduler invokes with the ctx we pass to ConnectAndRead — can
82 // pull it back out via loggerFrom.
83 ctx = loggerInto(ctx, logger)
84
85 // The sequential scheduler processes events one-at-a-time in arrival
86 // order. That's the right default for a spindle: ordering matters
87 // (e.g. a member-added event must apply before any record from that
88 // member is processed), and our event volume is tiny.
89 c, err := client.NewClient(
90 clientCfg,
91 logger,
92 sequential.NewScheduler("tack", logger, handler),
93 )
94 if err != nil {
95 return fmt.Errorf("new jetstream client: %w", err)
96 }
97
98 go func() {
99 for {
100 // We re-read the cursor from the store at every (re)connect so we
101 // pick up any progress the previous connection persisted before
102 // dying. nil means "start from now", which is the right default on
103 // a brand-new install or after a corrupt cursor read.
104 cur, err := st.LoadCursor(ctx)
105 if err != nil {
106 logger.Warn("ignoring unreadable cursor; resuming from now", "err", err)
107 cur = nil
108 }
109 if cur != nil {
110 logger.Info("connecting to jetstream", "cursor_us", *cur)
111 } else {
112 logger.Info("connecting to jetstream from now (no cursor)")
113 }
114
115 // Reconnect loop. ConnectAndRead blocks on the websocket and returns
116 // either when the connection drops (transient network error, server
117 // restart, etc.) or when ctx is cancelled. On error we sleep briefly
118 // and reconnect; on ctx cancellation we exit cleanly.
119 if err := c.ConnectAndRead(ctx, cur); err != nil {
120 if ctx.Err() != nil {
121 return
122 }
123 logger.Error("jetstream read loop", "err", err)
124 time.Sleep(2 * time.Second)
125 continue
126 }
127 if ctx.Err() != nil {
128 return
129 }
130 }
131 }()
132
133 return nil
134}
135
136// handleJetstreamEvent is the per-event callback for the JetStream. It
137// applies the event to the store and advances the persisted cursor. Any
138// returned error is logged by the scheduler but does not tear down the
139// connection — the next event will retry the cursor write implicitly.
140func handleJetstreamEvent(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error {
141 // We only care about commits, which are the actual record CRUD
142 // operations on a user's PDS. Account/identity events are ignored
143 // for now; if we ever care about handle changes we can add them.
144 if evt.Kind != jsmodels.EventKindCommit || evt.Commit == nil {
145 return nil
146 }
147 logger := loggerFrom(ctx)
148
149 // Dispatch on collection. Unknown collections shouldn't happen given
150 // our wantedCollections filter, but be defensive — jetstream may
151 // send schema changes ahead of us updating the filter.
152 if err := applyCommit(ctx, st, knots, hostname, evt); err != nil {
153 logger.Error("apply commit",
154 "err", err,
155 "did", evt.Did,
156 "collection", evt.Commit.Collection,
157 "op", evt.Commit.Operation,
158 "rkey", evt.Commit.RKey,
159 )
160
161 // Fall through to cursor save: a single bad record shouldn't
162 // stall the cursor forever and force us to re-process every
163 // subsequent event after a restart.
164 }
165
166 // Advance the cursor. TimeUS is the jetstream-assigned microsecond
167 // timestamp; saving it after-apply means a crash mid-batch will at
168 // worst replay the failing event, never skip past it.
169 if err := st.SaveCursor(ctx, evt.TimeUS); err != nil {
170 // Returning the error logs it; it doesn't kill the scheduler.
171 return fmt.Errorf("save cursor: %w", err)
172 }
173
174 return nil
175}
176
177// applyCommit routes a commit to the right store mutation based on its
178// collection NSID and operation.
179func applyCommit(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error {
180 c := evt.Commit
181 switch c.Collection {
182 case tangled.SpindleMemberNSID:
183 return applySpindleMember(ctx, st, evt.Did, c)
184 case tangled.RepoNSID:
185 return applyRepo(ctx, st, knots, hostname, evt.Did, c)
186 case tangled.RepoCollaboratorNSID:
187 return applyRepoCollaborator(ctx, st, evt.Did, c)
188 default:
189 // Server-side filter should prevent this, but log so we notice
190 // if jetstream ever changes behavior.
191 loggerFrom(ctx).Debug("ignoring unexpected collection",
192 "collection", c.Collection)
193 return nil
194 }
195}
196
197func applySpindleMember(ctx context.Context, st *store, did string, c *jsmodels.Commit) error {
198 switch c.Operation {
199 case jsOpCreate, jsOpUpdate:
200 var rec tangled.SpindleMember
201 if err := json.Unmarshal(c.Record, &rec); err != nil {
202 return fmt.Errorf("decode spindle.member: %w", err)
203 }
204 return st.UpsertSpindleMember(ctx, did, c.RKey, rec.Instance, rec.Subject, rec.CreatedAt)
205 case jsOpDelete:
206 return st.DeleteSpindleMember(ctx, did, c.RKey)
207 }
208 return nil
209}
210
211func applyRepo(ctx context.Context, st *store, knots KnotConsumer, hostname string, did string, c *jsmodels.Commit) error {
212 switch c.Operation {
213 case jsOpCreate, jsOpUpdate:
214 var rec tangled.Repo
215 if err := json.Unmarshal(c.Record, &rec); err != nil {
216 return fmt.Errorf("decode repo: %w", err)
217 }
218
219 // Capture the prior (knot, spindle) before the upsert so the
220 // post-mutation reconcile below can detect transitions like
221 // "repo used to point at us, no longer does" — which would
222 // otherwise leave a knot subscription dangling.
223 oldKnot, oldSpindle, err := st.GetRepo(ctx, did, c.RKey)
224 if err != nil {
225 return err
226 }
227
228 if err := st.UpsertRepo(ctx, did, c.RKey,
229 rec.Knot, rec.Name,
230 deref(rec.Spindle), deref(rec.RepoDid),
231 rec.CreatedAt,
232 ); err != nil {
233 return err
234 }
235
236 newSpindle := deref(rec.Spindle)
237 return reconcileKnot(ctx, st, knots, hostname,
238 oldKnot, oldSpindle,
239 rec.Knot, newSpindle,
240 )
241
242 case jsOpDelete:
243 // Same shape as the update path, just with no "new" side: we
244 // have to read the row out before deleting so we can decide
245 // whether deletion freed the last hold on a knot we'd been
246 // subscribed to.
247 oldKnot, oldSpindle, err := st.GetRepo(ctx, did, c.RKey)
248 if err != nil {
249 return err
250 }
251 if err := st.DeleteRepo(ctx, did, c.RKey); err != nil {
252 return err
253 }
254 return reconcileKnot(ctx, st, knots, hostname,
255 oldKnot, oldSpindle,
256 "", "",
257 )
258 }
259 return nil
260}
261
262// reconcileKnot brings the knot consumer's subscriptions in line with
263// the latest store state after a single repo mutation. It is called
264// after the mutation has been applied so IsKnotWanted reflects the
265// post-mutation truth.
266//
267// Logic:
268// - If the new record names us as its spindle, ensure we're
269// subscribed to its knot (AddKnot is idempotent, so calling it on
270// an already-watched knot is cheap).
271// - If the old record named us as its spindle, check whether any
272// other repo still references that knot through us; if not,
273// RemoveKnot it. Skip this when the knot didn't change AND the
274// spindle didn't move away from us, because then nothing actually
275// released our hold.
276func reconcileKnot(
277 ctx context.Context,
278 st *store,
279 knots KnotConsumer,
280 hostname string,
281 oldKnot, oldSpindle string,
282 newKnot, newSpindle string,
283) error {
284 // Tests pass nil for the consumer when they only care about the
285 // store mutation half of the handler; tolerate that here so
286 // callers don't have to special-case it.
287 if knots == nil {
288 return nil
289 }
290
291 if newSpindle == hostname && newKnot != "" {
292 knots.AddKnot(ctx, newKnot)
293 }
294
295 // Did we just lose our claim on oldKnot? Two ways that can happen:
296 // the spindle field moved off of us, or the knot field moved to a
297 // different host. Either is a reason to consider unsubscribing
298 // from oldKnot — but only if no *other* repo still has us on it.
299 releasedOld := oldSpindle == hostname && oldKnot != "" &&
300 (newSpindle != hostname || newKnot != oldKnot)
301 if releasedOld {
302 stillWanted, err := st.IsKnotWanted(ctx, hostname, oldKnot)
303 if err != nil {
304 return err
305 }
306 if !stillWanted {
307 knots.RemoveKnot(ctx, oldKnot)
308 }
309 }
310 return nil
311}
312
313func applyRepoCollaborator(ctx context.Context, st *store, did string, c *jsmodels.Commit) error {
314 switch c.Operation {
315 case jsOpCreate, jsOpUpdate:
316 var rec tangled.RepoCollaborator
317 if err := json.Unmarshal(c.Record, &rec); err != nil {
318 return fmt.Errorf("decode repo.collaborator: %w", err)
319 }
320 return st.UpsertRepoCollaborator(ctx, did, c.RKey,
321 deref(rec.Repo), deref(rec.RepoDid),
322 rec.Subject, rec.CreatedAt,
323 )
324 case jsOpDelete:
325 return st.DeleteRepoCollaborator(ctx, did, c.RKey)
326 }
327 return nil
328}
329
// deref flattens an optional string: it yields *s when s is set and the
// empty string when s is nil. The lexicon types model optional fields as
// *string while the store schema treats absent and empty alike, so
// callers funnel optional fields through here.
func deref(s *string) string {
	var out string
	if s != nil {
		out = *s
	}
	return out
}