Stitch any CI into Tangled
1package main
2
3// This file wires tack into the AT Protocol firehose via Bluesky's
4// "jetstream" — a JSON projection of the firehose served over a websocket
5// (see https://github.com/bluesky-social/jetstream). Tangled rides on top of
6// AT Proto: things like "this user is a spindle member", "this repo wants
7// this spindle", etc. are all atproto records published to users' PDSes,
8// and jetstream is how a service like a spindle observes them in real time.
9//
10// As a spindle, the records we care about are:
11//
12// - sh.tangled.spindle.member — owner authorizes a DID to use us
13// - sh.tangled.repo — a repo declares us as its spindle
14// - sh.tangled.repo.collaborator — collaborators on those repos
15//
16// (Pipeline trigger records, sh.tangled.pipeline, do *not* come over
17// jetstream; they're delivered by the knot servers via a separate event
18// stream. That is plumbed in separately.)
19
20import (
21 "context"
22 "encoding/json"
23 "errors"
24 "fmt"
25 "time"
26
27 "github.com/bluesky-social/jetstream/pkg/client"
28 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
29 jsmodels "github.com/bluesky-social/jetstream/pkg/models"
30 "tangled.org/core/api/tangled"
31)
32
// Values jetstream publishes in Commit.Operation. Naming them keeps the
// operation switches in applySpindleMember, applyRepo and
// applyRepoCollaborator safe from string-literal typos.
const (
	jsOpCreate = "create"
	jsOpUpdate = "update"
	jsOpDelete = "delete"
)

// jetstreamRewind is how far before the persisted cursor we resume on
// (re)connect. Jetstream cursors are time-based (microsecond timestamps),
// and resuming exactly at the boundary is not guaranteed to be gapless
// across disconnects, so we deliberately replay a few seconds. The apply
// paths upsert or delete rows by (did, rkey), so re-applying the replayed
// events is expected to converge on the same state.
const jetstreamRewind = 5 * time.Second
49
50// startJetstream dials the configured jetstream endpoint and spawns a
51// background goroutine that consumes events for the lifetime of ctx. It
52// returns once the client is constructed; connection errors surface in
53// logs, not return values, because the read loop is expected to reconnect
54// on its own.
55//
56// The store is used for two things: loading the persisted cursor so we
57// resume from the last seen event after a restart, and persisting
58// observed records so the rest of tack can answer membership questions
59// without re-reading the firehose.
60//
61// The logger is pulled from ctx (see log.go); falls back to slog.Default()
62// if none is attached.
63func startJetstream(ctx context.Context, cfg config, st *store, knots KnotConsumer) error {
64 logger := loggerFrom(ctx).With("component", "jetstream")
65
66 // `wantedCollections` is a server-side filter: jetstream will only send
67 // us commit events whose record collection (NSID) is in this list. The
68 // NSIDs come from tangled-core's generated lexicon types so they stay
69 // in sync with whatever the appview/knots are publishing.
70 collections := []string{
71 tangled.SpindleMemberNSID,
72 tangled.RepoNSID,
73 tangled.RepoCollaboratorNSID,
74 }
75
76 // Configure our JetStream client.
77 clientCfg := client.DefaultClientConfig()
78 clientCfg.WebsocketURL = cfg.JetstreamURL
79 clientCfg.WantedCollections = collections
80
81 // The handler closes over `st`, `knots` and the spindle hostname so
82 // the scheduler signature stays plain `func(ctx, *Event) error` and
83 // applyCommit can hand the knot consumer new sources as soon as
84 // matching repo records arrive.
85 handler := func(ctx context.Context, evt *jsmodels.Event) error {
86 return handleJetstreamEvent(ctx, st, knots, cfg.Hostname, evt)
87 }
88
89 // Re-attach the component-scoped logger so handler — which the
90 // scheduler invokes with the ctx we pass to ConnectAndRead — can
91 // pull it back out via loggerFrom.
92 ctx = loggerInto(ctx, logger)
93
94 // The sequential scheduler processes events one-at-a-time in arrival
95 // order. That's the right default for a spindle: ordering matters
96 // (e.g. a member-added event must apply before any record from that
97 // member is processed), and our event volume is tiny.
98 c, err := client.NewClient(
99 clientCfg,
100 logger,
101 sequential.NewScheduler("tack", logger, handler),
102 )
103 if err != nil {
104 return fmt.Errorf("new jetstream client: %w", err)
105 }
106
107 go func() {
108 for {
109 // We re-read the cursor from the store at every (re)connect so we
110 // pick up any progress the previous connection persisted before
111 // dying. nil means "start from now", which is the right default on
112 // a brand-new install or after a corrupt cursor read.
113 cur, err := st.LoadCursor(ctx)
114 if err != nil {
115 logger.Warn("ignoring unreadable cursor; resuming from now", "err", err)
116 cur = nil
117 }
118 // Rewind a few seconds before the saved cursor on reconnect.
119 // Jetstream cursors are time-based and the docs explicitly note
120 // that exact-boundary replay is not guaranteed gapless, so a
121 // small negative buffer protects against missing events that
122 // straddle the disconnect. Duplicates the rewind produces are
123 // safe: every applyCommit path is an idempotent upsert/delete
124 // keyed on (did, rkey), and SaveCursor only moves forward in
125 // practice because TimeUS is monotonic.
126 if cur != nil {
127 rewound := *cur - int64(jetstreamRewind/time.Microsecond)
128 if rewound < 0 {
129 rewound = 0
130 }
131 logger.Info("connecting to jetstream",
132 "cursor_us", *cur,
133 "rewound_us", rewound,
134 "rewind", jetstreamRewind,
135 )
136 cur = &rewound
137 } else {
138 logger.Info("connecting to jetstream from now (no cursor)")
139 }
140
141 // Reconnect loop. ConnectAndRead blocks on the websocket and returns
142 // either when the connection drops (transient network error, server
143 // restart, etc.) or when ctx is cancelled. On error we sleep briefly
144 // and reconnect; on ctx cancellation we exit cleanly.
145 if err := c.ConnectAndRead(ctx, cur); err != nil {
146 if ctx.Err() != nil {
147 return
148 }
149 logger.Error("jetstream read loop", "err", err)
150 time.Sleep(2 * time.Second)
151 continue
152 }
153 if ctx.Err() != nil {
154 return
155 }
156 }
157 }()
158
159 return nil
160}
161
162// handleJetstreamEvent is the per-event callback for the JetStream. It
163// applies the event to the store and, when appropriate, advances the
164// persisted cursor. Any returned error is logged by the scheduler but
165// does not tear down the connection.
166//
167// Cursor-advancement policy:
168//
169// - Apply succeeded: advance the cursor.
170// - Apply failed with a badRecordError (malformed record / unusable
171// input): advance anyway, since replaying a permanently-broken
172// event on every reconnect would stall the firehose forever.
173// - Apply failed with anything else (treated as transient: store
174// hiccup, SQLite busy, disk full, shutdown race, ...): do NOT
175// advance. Saving the cursor here would permanently skip the
176// record, which for membership/repo state means we'd silently
177// lose a row that the rest of tack relies on. Leaving the cursor
178// in place gives the next reconnect (which rewinds by
179// jetstreamRewind) a chance to re-deliver and re-apply it.
180func handleJetstreamEvent(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error {
181 // We only care about commits, which are the actual record CRUD
182 // operations on a user's PDS. Account/identity events are ignored
183 // for now; if we ever care about handle changes we can add them.
184 if evt.Kind != jsmodels.EventKindCommit || evt.Commit == nil {
185 return nil
186 }
187 logger := loggerFrom(ctx)
188
189 // Dispatch on collection. Unknown collections shouldn't happen given
190 // our wantedCollections filter, but be defensive — jetstream may
191 // send schema changes ahead of us updating the filter.
192 applyErr := applyCommit(ctx, st, knots, hostname, evt)
193 if applyErr != nil {
194 logger.Error("apply commit",
195 "err", applyErr,
196 "did", evt.Did,
197 "collection", evt.Commit.Collection,
198 "op", evt.Commit.Operation,
199 "rkey", evt.Commit.RKey,
200 "transient", !isBadRecord(applyErr),
201 )
202
203 // Transient failure: bail without advancing the cursor so the
204 // next delivery can retry this event. Returning the error
205 // surfaces it in the scheduler's logs too.
206 if !isBadRecord(applyErr) {
207 return applyErr
208 }
209 // Otherwise (badRecordError) fall through to cursor save: a
210 // single bad record shouldn't stall the cursor forever and
211 // force us to re-process every subsequent event after a
212 // restart.
213 }
214
215 // Advance the cursor. TimeUS is the jetstream-assigned microsecond
216 // timestamp; saving it after-apply means a crash mid-batch will at
217 // worst replay the failing event, never skip past it.
218 if err := st.SaveCursor(ctx, evt.TimeUS); err != nil {
219 // Returning the error logs it; it doesn't kill the scheduler.
220 return fmt.Errorf("save cursor: %w", err)
221 }
222
223 return nil
224}
225
// badRecordError marks an applyCommit failure as caused by the *record
// itself* being permanently unusable, e.g. a malformed JSON body or a field
// that violates an invariant we can't recover from on retry.
//
// The distinction matters because the jetstream cursor is time-based: once
// it is advanced past an event we will not see that event again.
//
//   - Permanent bad-input failures should still advance the cursor:
//     replaying the same broken record on every restart accomplishes
//     nothing.
//   - Transient infrastructure failures (SQLite busy, disk full, store
//     closed mid-shutdown, ...) must NOT advance the cursor: the record is
//     fine and a later attempt should reapply it.
//
// Anything returned from applyCommit that isn't a badRecordError is
// treated as transient by handleJetstreamEvent.
type badRecordError struct{ err error }

// Error returns the wrapped error's message, or a generic placeholder if
// the value was built without an inner error.
func (e *badRecordError) Error() string {
	if e.err == nil {
		return "bad record"
	}
	return e.err.Error()
}

// Unwrap exposes the inner error to errors.Is / errors.As.
func (e *badRecordError) Unwrap() error { return e.err }

// badRecord wraps err so handleJetstreamEvent treats it as a permanent,
// cursor-advancing failure. Use it for errors caused by the contents of
// the record (decode errors, schema violations), never for store/IO
// errors. A nil err yields nil, so wrapping never turns success into a
// failure.
func badRecord(err error) error {
	if err == nil {
		return nil
	}
	return &badRecordError{err: err}
}

// isBadRecord reports whether err, or anything in its Unwrap chain, is a
// badRecordError. It returns false for nil.
func isBadRecord(err error) bool {
	var b *badRecordError
	return errors.As(err, &b)
}
261
262// applyCommit routes a commit to the right store mutation based on its
263// collection NSID and operation.
264func applyCommit(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error {
265 c := evt.Commit
266 switch c.Collection {
267 case tangled.SpindleMemberNSID:
268 return applySpindleMember(ctx, st, evt.Did, c)
269 case tangled.RepoNSID:
270 return applyRepo(ctx, st, knots, hostname, evt.Did, c)
271 case tangled.RepoCollaboratorNSID:
272 return applyRepoCollaborator(ctx, st, evt.Did, c)
273 default:
274 // Server-side filter should prevent this, but log so we notice
275 // if jetstream ever changes behavior.
276 loggerFrom(ctx).Debug("ignoring unexpected collection",
277 "collection", c.Collection)
278 return nil
279 }
280}
281
282func applySpindleMember(ctx context.Context, st *store, did string, c *jsmodels.Commit) error {
283 switch c.Operation {
284 case jsOpCreate, jsOpUpdate:
285 var rec tangled.SpindleMember
286 if err := json.Unmarshal(c.Record, &rec); err != nil {
287 // Decode failures are a permanent property of the record's
288 // bytes; mark as bad so the cursor can advance past it.
289 return badRecord(fmt.Errorf("decode spindle.member: %w", err))
290 }
291 return st.UpsertSpindleMember(ctx, did, c.RKey, rec.Instance, rec.Subject, rec.CreatedAt)
292 case jsOpDelete:
293 return st.DeleteSpindleMember(ctx, did, c.RKey)
294 }
295 return nil
296}
297
298func applyRepo(ctx context.Context, st *store, knots KnotConsumer, hostname string, did string, c *jsmodels.Commit) error {
299 switch c.Operation {
300 case jsOpCreate, jsOpUpdate:
301 var rec tangled.Repo
302 if err := json.Unmarshal(c.Record, &rec); err != nil {
303 // See applySpindleMember: decode errors are permanent.
304 return badRecord(fmt.Errorf("decode repo: %w", err))
305 }
306
307 // Capture the prior (knot, spindle) before the upsert so the
308 // post-mutation reconcile below can detect transitions like
309 // "repo used to point at us, no longer does" — which would
310 // otherwise leave a knot subscription dangling.
311 oldKnot, oldSpindle, err := st.GetRepo(ctx, did, c.RKey)
312 if err != nil {
313 return err
314 }
315
316 if err := st.UpsertRepo(ctx, did, c.RKey,
317 rec.Knot, rec.Name,
318 deref(rec.Spindle), deref(rec.RepoDid),
319 rec.CreatedAt,
320 ); err != nil {
321 return err
322 }
323
324 newSpindle := deref(rec.Spindle)
325 return reconcileKnot(ctx, st, knots, hostname,
326 oldKnot, oldSpindle,
327 rec.Knot, newSpindle,
328 )
329
330 case jsOpDelete:
331 // Same shape as the update path, just with no "new" side: we
332 // have to read the row out before deleting so we can decide
333 // whether deletion freed the last hold on a knot we'd been
334 // subscribed to.
335 oldKnot, oldSpindle, err := st.GetRepo(ctx, did, c.RKey)
336 if err != nil {
337 return err
338 }
339 if err := st.DeleteRepo(ctx, did, c.RKey); err != nil {
340 return err
341 }
342 return reconcileKnot(ctx, st, knots, hostname,
343 oldKnot, oldSpindle,
344 "", "",
345 )
346 }
347 return nil
348}
349
350// reconcileKnot brings the knot consumer's subscriptions in line with
351// the latest store state after a single repo mutation. It is called
352// after the mutation has been applied so IsKnotWanted reflects the
353// post-mutation truth.
354//
355// Logic:
356// - If the new record names us as its spindle, ensure we're
357// subscribed to its knot (AddKnot is idempotent, so calling it on
358// an already-watched knot is cheap).
359// - If the old record named us as its spindle, check whether any
360// other repo still references that knot through us; if not,
361// RemoveKnot it. Skip this when the knot didn't change AND the
362// spindle didn't move away from us, because then nothing actually
363// released our hold.
364func reconcileKnot(
365 ctx context.Context,
366 st *store,
367 knots KnotConsumer,
368 hostname string,
369 oldKnot, oldSpindle string,
370 newKnot, newSpindle string,
371) error {
372 // Tests pass nil for the consumer when they only care about the
373 // store mutation half of the handler; tolerate that here so
374 // callers don't have to special-case it.
375 if knots == nil {
376 return nil
377 }
378
379 if newSpindle == hostname && newKnot != "" {
380 knots.AddKnot(ctx, newKnot)
381 }
382
383 // Did we just lose our claim on oldKnot? Two ways that can happen:
384 // the spindle field moved off of us, or the knot field moved to a
385 // different host. Either is a reason to consider unsubscribing
386 // from oldKnot — but only if no *other* repo still has us on it.
387 releasedOld := oldSpindle == hostname && oldKnot != "" &&
388 (newSpindle != hostname || newKnot != oldKnot)
389 if releasedOld {
390 stillWanted, err := st.IsKnotWanted(ctx, hostname, oldKnot)
391 if err != nil {
392 return err
393 }
394 if !stillWanted {
395 knots.RemoveKnot(ctx, oldKnot)
396 }
397 }
398 return nil
399}
400
401func applyRepoCollaborator(ctx context.Context, st *store, did string, c *jsmodels.Commit) error {
402 switch c.Operation {
403 case jsOpCreate, jsOpUpdate:
404 var rec tangled.RepoCollaborator
405 if err := json.Unmarshal(c.Record, &rec); err != nil {
406 // See applySpindleMember: decode errors are permanent.
407 return badRecord(fmt.Errorf("decode repo.collaborator: %w", err))
408 }
409 return st.UpsertRepoCollaborator(ctx, did, c.RKey,
410 deref(rec.Repo), deref(rec.RepoDid),
411 rec.Subject, rec.CreatedAt,
412 )
413 case jsOpDelete:
414 return st.DeleteRepoCollaborator(ctx, did, c.RKey)
415 }
416 return nil
417}
418
// deref flattens an optional lexicon string: it yields the value s points
// at, or the empty string when s is nil. The lexicon types model optional
// fields as *string, and the store treats absent and empty alike.
func deref(s *string) string {
	var out string
	if s != nil {
		out = *s
	}
	return out
}