Stitch any CI into Tangled
1package main
2
3// This file wires tack into the AT Protocol firehose via Bluesky's
4// "jetstream" — a JSON projection of the firehose served over a websocket
5// (see https://github.com/bluesky-social/jetstream). Tangled rides on top of
6// AT Proto: things like "this user is a spindle member", "this repo wants
7// this spindle", etc. are all atproto records published to users' PDSes,
8// and jetstream is how a service like a spindle observes them in real time.
9//
10// As a spindle, the records we care about are:
11//
12// - sh.tangled.spindle.member — owner authorizes a DID to use us
13// - sh.tangled.repo — a repo declares us as its spindle
14// - sh.tangled.repo.collaborator — collaborators on those repos
15//
16// (Pipeline trigger records, sh.tangled.pipeline, do *not* come over
17// jetstream; they're delivered by the knot servers via a separate event
18// stream. That is plumbed in separately.)
19
20import (
21 "context"
22 "encoding/json"
23 "fmt"
24
25 jsmodels "github.com/bluesky-social/jetstream/pkg/models"
26 js "go.mitchellh.com/tack/internal/jetstream"
27 "tangled.org/core/api/tangled"
28)
29
// Values jetstream reports in Commit.Operation. The apply* dispatchers
// compare against these names rather than bare literals, so a misspelt
// operation becomes a compile error instead of a silently skipped branch.
const (
	// jsOpCreate marks a newly published record.
	jsOpCreate = "create"
	// jsOpUpdate marks a record rewritten under an existing rkey.
	jsOpUpdate = "update"
	// jsOpDelete marks a record removed from the publisher's repo.
	jsOpDelete = "delete"
)
38
39var _ js.CursorStore = (*store)(nil)
40
41// jetstreamCollections is the server-side and local filter for the Tangled
42// records tack mirrors out of jetstream.
43var jetstreamCollections = []string{
44 tangled.SpindleMemberNSID,
45 tangled.RepoNSID,
46 tangled.RepoCollaboratorNSID,
47}
48
49// startJetstream dials the configured jetstream endpoint and spawns a
50// background goroutine that consumes events for the lifetime of ctx. It
51// returns once the client is constructed; connection errors surface in
52// logs, not return values, because the read loop is expected to reconnect
53// on its own.
54//
55// The store is used for two things: loading the persisted cursor so we
56// resume from the last seen event after a restart, and persisting
57// observed records so the rest of tack can answer membership questions
58// without re-reading the firehose.
59//
60// The logger is pulled from ctx (see log.go); falls back to slog.Default()
61// if none is attached.
62func startJetstream(ctx context.Context, cfg config, st *store, knots KnotConsumer) error {
63 logger := loggerFrom(ctx).With("component", "jetstream")
64
65 // The handler closes over `st`, `knots`, the spindle hostname and
66 // the owner DID so the scheduler signature stays plain
67 // `func(ctx, *Event) error` and applyCommit can hand the knot
68 // consumer new sources as soon as matching repo records arrive
69 // (gated on the publisher being an authorized actor).
70 handler := js.HandlerFunc(func(ctx context.Context, evt *jsmodels.Event) error {
71 return applyCommit(ctx, st, knots, cfg.Hostname, cfg.OwnerDID, evt)
72 })
73
74 // Re-attach the component-scoped logger so handler — which the
75 // consumer invokes with the ctx we pass to ConnectAndRead — can pull
76 // it back out via loggerFrom.
77 ctx = loggerInto(ctx, logger)
78
79 _, err := js.Start(ctx, js.Config{
80 WebsocketURL: cfg.JetstreamURL,
81 Collections: jetstreamCollections,
82 CursorStore: st,
83 Handler: handler,
84 Logger: logger,
85 SchedulerIdent: "tack",
86 })
87 return err
88}
89
90// applyCommit routes a commit to the right store mutation based on its
91// collection NSID and operation.
92func applyCommit(ctx context.Context, st *store, knots KnotConsumer, hostname, ownerDID string, evt *jsmodels.Event) error {
93 c := evt.Commit
94 switch c.Collection {
95 case tangled.SpindleMemberNSID:
96 return applySpindleMember(ctx, st, knots, hostname, ownerDID, evt.Did, c)
97 case tangled.RepoNSID:
98 return applyRepo(ctx, st, knots, hostname, ownerDID, evt.Did, c)
99 case tangled.RepoCollaboratorNSID:
100 return applyRepoCollaborator(ctx, st, evt.Did, c)
101 default:
102 // Server-side filter should prevent this, but log so we notice
103 // if jetstream ever changes behavior.
104 loggerFrom(ctx).Debug("ignoring unexpected collection",
105 "collection", c.Collection)
106 return nil
107 }
108}
109
110func applySpindleMember(ctx context.Context, st *store, knots KnotConsumer, hostname, ownerDID, did string, c *jsmodels.Commit) error {
111 switch c.Operation {
112 case jsOpCreate, jsOpUpdate:
113 // Capture the previous subject. Necessary because a same-rkey
114 // update can move the grant to a different DID, in which case
115 // the *old* subject's knots may need to be released even as
116 // the new subject's are picked up.
117 oldSubject, err := st.GetSpindleMember(ctx, did, c.RKey)
118 if err != nil {
119 return err
120 }
121
122 var rec tangled.SpindleMember
123 if err := json.Unmarshal(c.Record, &rec); err != nil {
124 // Decode failures are a permanent property of the record's
125 // bytes; mark as bad so the cursor can advance past it.
126 return js.BadRecord(fmt.Errorf("decode spindle.member: %w", err))
127 }
128 if err := st.UpsertSpindleMember(ctx, did, c.RKey, rec.Instance, rec.Subject, rec.CreatedAt); err != nil {
129 return err
130 }
131
132 // Only grants published by the spindle owner actually change
133 // authorization (see IsAuthorizedActor). Forged grants are
134 // stored but don't move the needle, so don't bother
135 // reconciling on them: it would be a no-op at best and
136 // add log noise.
137 if did != ownerDID {
138 return nil
139 }
140 if oldSubject != "" && oldSubject != rec.Subject {
141 if err := reconcileMember(ctx, st, knots, hostname, ownerDID, oldSubject); err != nil {
142 return err
143 }
144 }
145 return reconcileMember(ctx, st, knots, hostname, ownerDID, rec.Subject)
146
147 case jsOpDelete:
148 oldSubject, err := st.GetSpindleMember(ctx, did, c.RKey)
149 if err != nil {
150 return err
151 }
152 if err := st.DeleteSpindleMember(ctx, did, c.RKey); err != nil {
153 return err
154 }
155 // As above: only the owner's grants matter for authorization.
156 if did != ownerDID || oldSubject == "" {
157 return nil
158 }
159 return reconcileMember(ctx, st, knots, hostname, ownerDID, oldSubject)
160 }
161 return nil
162}
163
164func applyRepo(ctx context.Context, st *store, knots KnotConsumer, hostname, ownerDID, did string, c *jsmodels.Commit) error {
165 switch c.Operation {
166 case jsOpCreate, jsOpUpdate:
167 var rec tangled.Repo
168 if err := json.Unmarshal(c.Record, &rec); err != nil {
169 // See applySpindleMember: decode errors are permanent.
170 return js.BadRecord(fmt.Errorf("decode repo: %w", err))
171 }
172
173 // Capture the prior (knot, spindle) before the upsert so the
174 // post-mutation reconcile below can detect transitions like
175 // "repo used to point at us, no longer does" — which would
176 // otherwise leave a knot subscription dangling.
177 oldKnot, oldSpindle, err := st.GetRepo(ctx, did, c.RKey)
178 if err != nil {
179 return err
180 }
181
182 if err := st.UpsertRepo(ctx, did, c.RKey,
183 rec.Knot, rec.Name,
184 deref(rec.Spindle), deref(rec.RepoDid),
185 rec.CreatedAt,
186 ); err != nil {
187 return err
188 }
189
190 newSpindle := deref(rec.Spindle)
191 return reconcileKnot(ctx, st, knots, hostname, ownerDID, did,
192 oldKnot, oldSpindle,
193 rec.Knot, newSpindle,
194 )
195
196 case jsOpDelete:
197 // Same shape as the update path, just with no "new" side: we
198 // have to read the row out before deleting so we can decide
199 // whether deletion freed the last hold on a knot we'd been
200 // subscribed to.
201 oldKnot, oldSpindle, err := st.GetRepo(ctx, did, c.RKey)
202 if err != nil {
203 return err
204 }
205 if err := st.DeleteRepo(ctx, did, c.RKey); err != nil {
206 return err
207 }
208 return reconcileKnot(ctx, st, knots, hostname, ownerDID, did,
209 oldKnot, oldSpindle,
210 "", "",
211 )
212 }
213 return nil
214}
215
216// reconcileKnot brings the knot consumer's subscriptions in line with
217// the latest store state after a single repo mutation. It is called
218// after the mutation has been applied so IsKnotWanted reflects the
219// post-mutation truth.
220//
221// Logic:
222// - If the new record names us as its spindle AND the publisher is
223// an authorized actor (spindle owner or owner-vouched member),
224// ensure we're subscribed to its knot. Without the membership
225// check, any firehose publisher could pin us to an attacker-chosen
226// knot just by publishing a sh.tangled.repo record naming us.
227// - If the old record named us as its spindle, check whether any
228// other authorized repo still references that knot through us;
229// if not, RemoveKnot it. Skip this when the knot didn't change
230// AND the spindle didn't move away from us, because then nothing
231// actually released our hold.
232func reconcileKnot(
233 ctx context.Context,
234 st *store,
235 knots KnotConsumer,
236 hostname, ownerDID, publisherDID string,
237 oldKnot, oldSpindle string,
238 newKnot, newSpindle string,
239) error {
240 // Tests pass nil for the consumer when they only care about the
241 // store mutation half of the handler; tolerate that here so
242 // callers don't have to special-case it.
243 if knots == nil {
244 return nil
245 }
246
247 if newSpindle == hostname && newKnot != "" {
248 ok, err := st.IsAuthorizedActor(ctx, ownerDID, publisherDID)
249 if err != nil {
250 return err
251 }
252 if ok {
253 knots.AddKnot(ctx, newKnot)
254 } else {
255 loggerFrom(ctx).Warn("ignoring repo from unauthorized publisher",
256 "publisher_did", publisherDID,
257 "knot", newKnot,
258 )
259 }
260 }
261
262 // Did we just lose our claim on oldKnot? Two ways that can happen:
263 // the spindle field moved off of us, or the knot field moved to a
264 // different host. Either is a reason to consider unsubscribing
265 // from oldKnot, but only if no *other* authorized repo still has
266 // us on it.
267 releasedOld := oldSpindle == hostname && oldKnot != "" &&
268 (newSpindle != hostname || newKnot != oldKnot)
269 if releasedOld {
270 stillWanted, err := st.IsKnotWanted(ctx, hostname, ownerDID, oldKnot)
271 if err != nil {
272 return err
273 }
274 if !stillWanted {
275 knots.RemoveKnot(ctx, oldKnot)
276 }
277 }
278 return nil
279}
280
281// reconcileMember adjusts knot subscriptions after a membership grant
282// or revocation may have changed `subject`'s authorization status.
283// For each knot named by subject's repos that point at us:
284//
285// - if subject is now authorized, AddKnot (idempotent: already-
286// subscribed knots are no-ops in the consumer);
287// - if subject is now unauthorized, ask IsKnotWanted whether any
288// *other* authorized repo still holds the knot; if not,
289// RemoveKnot.
290//
291// Without this, a member's repos picked up over the firehose before
292// the grant arrived would never get subscribed (the grant doesn't
293// re-deliver the older repo events), and a revocation would leave
294// the now-unauthorized publisher's knot subscribed until restart.
295func reconcileMember(
296 ctx context.Context,
297 st *store,
298 knots KnotConsumer,
299 hostname, ownerDID, subject string,
300) error {
301 if knots == nil || subject == "" {
302 return nil
303 }
304 knotsForSubject, err := st.KnotsForOwner(ctx, hostname, subject)
305 if err != nil {
306 return err
307 }
308 if len(knotsForSubject) == 0 {
309 return nil
310 }
311 authorized, err := st.IsAuthorizedActor(ctx, ownerDID, subject)
312 if err != nil {
313 return err
314 }
315 for _, k := range knotsForSubject {
316 if authorized {
317 knots.AddKnot(ctx, k)
318 continue
319 }
320 stillWanted, err := st.IsKnotWanted(ctx, hostname, ownerDID, k)
321 if err != nil {
322 return err
323 }
324 if !stillWanted {
325 knots.RemoveKnot(ctx, k)
326 }
327 }
328 return nil
329}
330
331func applyRepoCollaborator(ctx context.Context, st *store, did string, c *jsmodels.Commit) error {
332 switch c.Operation {
333 case jsOpCreate, jsOpUpdate:
334 var rec tangled.RepoCollaborator
335 if err := json.Unmarshal(c.Record, &rec); err != nil {
336 // See applySpindleMember: decode errors are permanent.
337 return js.BadRecord(fmt.Errorf("decode repo.collaborator: %w", err))
338 }
339 return st.UpsertRepoCollaborator(ctx, did, c.RKey,
340 deref(rec.Repo), deref(rec.RepoDid),
341 rec.Subject, rec.CreatedAt,
342 )
343 case jsOpDelete:
344 return st.DeleteRepoCollaborator(ctx, did, c.RKey)
345 }
346 return nil
347}
348
// deref flattens an optional lexicon string field (*string) to a plain
// string. A nil pointer yields "", otherwise the pointed-to value is
// returned. The store does not distinguish an absent field from an empty
// one, so callers can pass the result straight through.
func deref(s *string) string {
	var out string
	if s != nil {
		out = *s
	}
	return out
}