Stitch any CI into Tangled
1package main
2
// This file wires tack into the AT Protocol firehose via Bluesky's
// "jetstream" — a JSON projection of the firehose served over a websocket
// (see https://github.com/bluesky-social/jetstream). Tangled rides on top of
// AT Proto: things like "this user is a spindle member", "this repo wants
// this spindle", etc. are all atproto records published to users' PDSes,
// and jetstream is how a service like a spindle observes them in real time.
//
// As a spindle, the records we care about are:
//
//   - sh.tangled.spindle.member — owner authorizes a DID to use us
//   - sh.tangled.repo — a repo declares us as its spindle
//   - sh.tangled.repo.collaborator — collaborators on those repos
//
// (Pipeline trigger records, sh.tangled.pipeline, do *not* come over
// jetstream; the knot servers deliver them over their own event stream,
// which is consumed elsewhere.)
19
20import (
21 "context"
22 "encoding/json"
23 "fmt"
24 "time"
25
26 "github.com/bluesky-social/jetstream/pkg/client"
27 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
28 jsmodels "github.com/bluesky-social/jetstream/pkg/models"
29 "tangled.org/core/api/tangled"
30)
31
// Commit.Operation values published by jetstream. Naming them once lets the
// apply* handlers compare against identifiers instead of repeating raw
// strings, so a misspelling becomes a compile error rather than a branch
// that silently never matches.
const (
	jsOpCreate = "create" // a record was written for the first time
	jsOpUpdate = "update" // an existing record was rewritten
	jsOpDelete = "delete" // a record was removed
)
40
// jetstreamRewind is subtracted from the persisted cursor before each
// (re)connect. Jetstream cursors are time-based, and its documentation
// recommends rewinding a few seconds because replay exactly at the cursor
// boundary is not guaranteed to be gapless across disconnects. Seeing a few
// seconds of events twice is harmless: the handler's store mutations are
// upserts and deletes keyed on (did, rkey).
const jetstreamRewind time.Duration = 5000 * time.Millisecond
48
49// startJetstream dials the configured jetstream endpoint and spawns a
50// background goroutine that consumes events for the lifetime of ctx. It
51// returns once the client is constructed; connection errors surface in
52// logs, not return values, because the read loop is expected to reconnect
53// on its own.
54//
55// The store is used for two things: loading the persisted cursor so we
56// resume from the last seen event after a restart, and persisting
57// observed records so the rest of tack can answer membership questions
58// without re-reading the firehose.
59//
60// The logger is pulled from ctx (see log.go); falls back to slog.Default()
61// if none is attached.
62func startJetstream(ctx context.Context, cfg config, st *store, knots KnotConsumer) error {
63 logger := loggerFrom(ctx).With("component", "jetstream")
64
65 // `wantedCollections` is a server-side filter: jetstream will only send
66 // us commit events whose record collection (NSID) is in this list. The
67 // NSIDs come from tangled-core's generated lexicon types so they stay
68 // in sync with whatever the appview/knots are publishing.
69 collections := []string{
70 tangled.SpindleMemberNSID,
71 tangled.RepoNSID,
72 tangled.RepoCollaboratorNSID,
73 }
74
75 // Configure our JetStream client.
76 clientCfg := client.DefaultClientConfig()
77 clientCfg.WebsocketURL = cfg.JetstreamURL
78 clientCfg.WantedCollections = collections
79
80 // The handler closes over `st`, `knots` and the spindle hostname so
81 // the scheduler signature stays plain `func(ctx, *Event) error` and
82 // applyCommit can hand the knot consumer new sources as soon as
83 // matching repo records arrive.
84 handler := func(ctx context.Context, evt *jsmodels.Event) error {
85 return handleJetstreamEvent(ctx, st, knots, cfg.Hostname, evt)
86 }
87
88 // Re-attach the component-scoped logger so handler — which the
89 // scheduler invokes with the ctx we pass to ConnectAndRead — can
90 // pull it back out via loggerFrom.
91 ctx = loggerInto(ctx, logger)
92
93 // The sequential scheduler processes events one-at-a-time in arrival
94 // order. That's the right default for a spindle: ordering matters
95 // (e.g. a member-added event must apply before any record from that
96 // member is processed), and our event volume is tiny.
97 c, err := client.NewClient(
98 clientCfg,
99 logger,
100 sequential.NewScheduler("tack", logger, handler),
101 )
102 if err != nil {
103 return fmt.Errorf("new jetstream client: %w", err)
104 }
105
106 go func() {
107 for {
108 // We re-read the cursor from the store at every (re)connect so we
109 // pick up any progress the previous connection persisted before
110 // dying. nil means "start from now", which is the right default on
111 // a brand-new install or after a corrupt cursor read.
112 cur, err := st.LoadCursor(ctx)
113 if err != nil {
114 logger.Warn("ignoring unreadable cursor; resuming from now", "err", err)
115 cur = nil
116 }
117 // Rewind a few seconds before the saved cursor on reconnect.
118 // Jetstream cursors are time-based and the docs explicitly note
119 // that exact-boundary replay is not guaranteed gapless, so a
120 // small negative buffer protects against missing events that
121 // straddle the disconnect. Duplicates the rewind produces are
122 // safe: every applyCommit path is an idempotent upsert/delete
123 // keyed on (did, rkey), and SaveCursor only moves forward in
124 // practice because TimeUS is monotonic.
125 if cur != nil {
126 rewound := *cur - int64(jetstreamRewind/time.Microsecond)
127 if rewound < 0 {
128 rewound = 0
129 }
130 logger.Info("connecting to jetstream",
131 "cursor_us", *cur,
132 "rewound_us", rewound,
133 "rewind", jetstreamRewind,
134 )
135 cur = &rewound
136 } else {
137 logger.Info("connecting to jetstream from now (no cursor)")
138 }
139
140 // Reconnect loop. ConnectAndRead blocks on the websocket and returns
141 // either when the connection drops (transient network error, server
142 // restart, etc.) or when ctx is cancelled. On error we sleep briefly
143 // and reconnect; on ctx cancellation we exit cleanly.
144 if err := c.ConnectAndRead(ctx, cur); err != nil {
145 if ctx.Err() != nil {
146 return
147 }
148 logger.Error("jetstream read loop", "err", err)
149 time.Sleep(2 * time.Second)
150 continue
151 }
152 if ctx.Err() != nil {
153 return
154 }
155 }
156 }()
157
158 return nil
159}
160
161// handleJetstreamEvent is the per-event callback for the JetStream. It
162// applies the event to the store and advances the persisted cursor. Any
163// returned error is logged by the scheduler but does not tear down the
164// connection — the next event will retry the cursor write implicitly.
165func handleJetstreamEvent(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error {
166 // We only care about commits, which are the actual record CRUD
167 // operations on a user's PDS. Account/identity events are ignored
168 // for now; if we ever care about handle changes we can add them.
169 if evt.Kind != jsmodels.EventKindCommit || evt.Commit == nil {
170 return nil
171 }
172 logger := loggerFrom(ctx)
173
174 // Dispatch on collection. Unknown collections shouldn't happen given
175 // our wantedCollections filter, but be defensive — jetstream may
176 // send schema changes ahead of us updating the filter.
177 if err := applyCommit(ctx, st, knots, hostname, evt); err != nil {
178 logger.Error("apply commit",
179 "err", err,
180 "did", evt.Did,
181 "collection", evt.Commit.Collection,
182 "op", evt.Commit.Operation,
183 "rkey", evt.Commit.RKey,
184 )
185
186 // Fall through to cursor save: a single bad record shouldn't
187 // stall the cursor forever and force us to re-process every
188 // subsequent event after a restart.
189 }
190
191 // Advance the cursor. TimeUS is the jetstream-assigned microsecond
192 // timestamp; saving it after-apply means a crash mid-batch will at
193 // worst replay the failing event, never skip past it.
194 if err := st.SaveCursor(ctx, evt.TimeUS); err != nil {
195 // Returning the error logs it; it doesn't kill the scheduler.
196 return fmt.Errorf("save cursor: %w", err)
197 }
198
199 return nil
200}
201
202// applyCommit routes a commit to the right store mutation based on its
203// collection NSID and operation.
204func applyCommit(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error {
205 c := evt.Commit
206 switch c.Collection {
207 case tangled.SpindleMemberNSID:
208 return applySpindleMember(ctx, st, evt.Did, c)
209 case tangled.RepoNSID:
210 return applyRepo(ctx, st, knots, hostname, evt.Did, c)
211 case tangled.RepoCollaboratorNSID:
212 return applyRepoCollaborator(ctx, st, evt.Did, c)
213 default:
214 // Server-side filter should prevent this, but log so we notice
215 // if jetstream ever changes behavior.
216 loggerFrom(ctx).Debug("ignoring unexpected collection",
217 "collection", c.Collection)
218 return nil
219 }
220}
221
222func applySpindleMember(ctx context.Context, st *store, did string, c *jsmodels.Commit) error {
223 switch c.Operation {
224 case jsOpCreate, jsOpUpdate:
225 var rec tangled.SpindleMember
226 if err := json.Unmarshal(c.Record, &rec); err != nil {
227 return fmt.Errorf("decode spindle.member: %w", err)
228 }
229 return st.UpsertSpindleMember(ctx, did, c.RKey, rec.Instance, rec.Subject, rec.CreatedAt)
230 case jsOpDelete:
231 return st.DeleteSpindleMember(ctx, did, c.RKey)
232 }
233 return nil
234}
235
236func applyRepo(ctx context.Context, st *store, knots KnotConsumer, hostname string, did string, c *jsmodels.Commit) error {
237 switch c.Operation {
238 case jsOpCreate, jsOpUpdate:
239 var rec tangled.Repo
240 if err := json.Unmarshal(c.Record, &rec); err != nil {
241 return fmt.Errorf("decode repo: %w", err)
242 }
243
244 // Capture the prior (knot, spindle) before the upsert so the
245 // post-mutation reconcile below can detect transitions like
246 // "repo used to point at us, no longer does" — which would
247 // otherwise leave a knot subscription dangling.
248 oldKnot, oldSpindle, err := st.GetRepo(ctx, did, c.RKey)
249 if err != nil {
250 return err
251 }
252
253 if err := st.UpsertRepo(ctx, did, c.RKey,
254 rec.Knot, rec.Name,
255 deref(rec.Spindle), deref(rec.RepoDid),
256 rec.CreatedAt,
257 ); err != nil {
258 return err
259 }
260
261 newSpindle := deref(rec.Spindle)
262 return reconcileKnot(ctx, st, knots, hostname,
263 oldKnot, oldSpindle,
264 rec.Knot, newSpindle,
265 )
266
267 case jsOpDelete:
268 // Same shape as the update path, just with no "new" side: we
269 // have to read the row out before deleting so we can decide
270 // whether deletion freed the last hold on a knot we'd been
271 // subscribed to.
272 oldKnot, oldSpindle, err := st.GetRepo(ctx, did, c.RKey)
273 if err != nil {
274 return err
275 }
276 if err := st.DeleteRepo(ctx, did, c.RKey); err != nil {
277 return err
278 }
279 return reconcileKnot(ctx, st, knots, hostname,
280 oldKnot, oldSpindle,
281 "", "",
282 )
283 }
284 return nil
285}
286
287// reconcileKnot brings the knot consumer's subscriptions in line with
288// the latest store state after a single repo mutation. It is called
289// after the mutation has been applied so IsKnotWanted reflects the
290// post-mutation truth.
291//
292// Logic:
293// - If the new record names us as its spindle, ensure we're
294// subscribed to its knot (AddKnot is idempotent, so calling it on
295// an already-watched knot is cheap).
296// - If the old record named us as its spindle, check whether any
297// other repo still references that knot through us; if not,
298// RemoveKnot it. Skip this when the knot didn't change AND the
299// spindle didn't move away from us, because then nothing actually
300// released our hold.
301func reconcileKnot(
302 ctx context.Context,
303 st *store,
304 knots KnotConsumer,
305 hostname string,
306 oldKnot, oldSpindle string,
307 newKnot, newSpindle string,
308) error {
309 // Tests pass nil for the consumer when they only care about the
310 // store mutation half of the handler; tolerate that here so
311 // callers don't have to special-case it.
312 if knots == nil {
313 return nil
314 }
315
316 if newSpindle == hostname && newKnot != "" {
317 knots.AddKnot(ctx, newKnot)
318 }
319
320 // Did we just lose our claim on oldKnot? Two ways that can happen:
321 // the spindle field moved off of us, or the knot field moved to a
322 // different host. Either is a reason to consider unsubscribing
323 // from oldKnot — but only if no *other* repo still has us on it.
324 releasedOld := oldSpindle == hostname && oldKnot != "" &&
325 (newSpindle != hostname || newKnot != oldKnot)
326 if releasedOld {
327 stillWanted, err := st.IsKnotWanted(ctx, hostname, oldKnot)
328 if err != nil {
329 return err
330 }
331 if !stillWanted {
332 knots.RemoveKnot(ctx, oldKnot)
333 }
334 }
335 return nil
336}
337
338func applyRepoCollaborator(ctx context.Context, st *store, did string, c *jsmodels.Commit) error {
339 switch c.Operation {
340 case jsOpCreate, jsOpUpdate:
341 var rec tangled.RepoCollaborator
342 if err := json.Unmarshal(c.Record, &rec); err != nil {
343 return fmt.Errorf("decode repo.collaborator: %w", err)
344 }
345 return st.UpsertRepoCollaborator(ctx, did, c.RKey,
346 deref(rec.Repo), deref(rec.RepoDid),
347 rec.Subject, rec.CreatedAt,
348 )
349 case jsOpDelete:
350 return st.DeleteRepoCollaborator(ctx, did, c.RKey)
351 }
352 return nil
353}
354
// deref returns the string s points to, or "" when s is nil. The lexicon
// types model optional fields as *string. The store schema treats absent and
// empty the same, so folding nil into "" here keeps callers free of nil
// checks.
func deref(s *string) string {
	var out string
	if s != nil {
		out = *s
	}
	return out
}