Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// This file wires tack into the AT Protocol firehose via Bluesky's 4// "jetstream" — a JSON projection of the firehose served over a websocket 5// (see https://github.com/bluesky-social/jetstream). Tangled rides on top of 6// AT Proto: things like "this user is a spindle member", "this repo wants 7// this spindle", etc. are all atproto records published to users' PDSes, 8// and jetstream is how a service like a spindle observes them in real time. 9// 10// As a spindle, the records we care about are: 11// 12// - sh.tangled.spindle.member — owner authorizes a DID to use us 13// - sh.tangled.repo — a repo declares us as its spindle 14// - sh.tangled.repo.collaborator — collaborators on those repos 15// 16// (Pipeline trigger records, sh.tangled.pipeline, do *not* come over 17// jetstream; they're delivered by the knot servers via a separate event 18// stream. That is plumbed in separately.) 19 20import ( 21 "context" 22 "encoding/json" 23 "fmt" 24 25 jsmodels "github.com/bluesky-social/jetstream/pkg/models" 26 js "go.mitchellh.com/tack/internal/jetstream" 27 "tangled.org/core/api/tangled" 28) 29 30// jetstream operation strings. The jetstream protocol publishes these as 31// the Commit.Operation field; pulling them out as constants keeps the 32// switch in applyCommit honest about typos. 33const ( 34 jsOpCreate = "create" 35 jsOpUpdate = "update" 36 jsOpDelete = "delete" 37) 38 39var _ js.CursorStore = (*store)(nil) 40 41// jetstreamCollections is the server-side and local filter for the Tangled 42// records tack mirrors out of jetstream. 43var jetstreamCollections = []string{ 44 tangled.SpindleMemberNSID, 45 tangled.RepoNSID, 46 tangled.RepoCollaboratorNSID, 47} 48 49// startJetstream dials the configured jetstream endpoint and spawns a 50// background goroutine that consumes events for the lifetime of ctx. 
It 51// returns once the client is constructed; connection errors surface in 52// logs, not return values, because the read loop is expected to reconnect 53// on its own. 54// 55// The store is used for two things: loading the persisted cursor so we 56// resume from the last seen event after a restart, and persisting 57// observed records so the rest of tack can answer membership questions 58// without re-reading the firehose. 59// 60// The logger is pulled from ctx (see log.go); falls back to slog.Default() 61// if none is attached. 62func startJetstream(ctx context.Context, cfg config, st *store, knots KnotConsumer) error { 63 logger := loggerFrom(ctx).With("component", "jetstream") 64 65 // The handler closes over `st`, `knots`, the spindle hostname and 66 // the owner DID so the scheduler signature stays plain 67 // `func(ctx, *Event) error` and applyCommit can hand the knot 68 // consumer new sources as soon as matching repo records arrive 69 // (gated on the publisher being an authorized actor). 70 handler := js.HandlerFunc(func(ctx context.Context, evt *jsmodels.Event) error { 71 return applyCommit(ctx, st, knots, cfg.Hostname, cfg.OwnerDID, evt) 72 }) 73 74 // Re-attach the component-scoped logger so handler — which the 75 // consumer invokes with the ctx we pass to ConnectAndRead — can pull 76 // it back out via loggerFrom. 77 ctx = loggerInto(ctx, logger) 78 79 _, err := js.Start(ctx, js.Config{ 80 WebsocketURL: cfg.JetstreamURL, 81 Collections: jetstreamCollections, 82 CursorStore: st, 83 Handler: handler, 84 Logger: logger, 85 SchedulerIdent: "tack", 86 }) 87 return err 88} 89 90// applyCommit routes a commit to the right store mutation based on its 91// collection NSID and operation. 
92func applyCommit(ctx context.Context, st *store, knots KnotConsumer, hostname, ownerDID string, evt *jsmodels.Event) error { 93 c := evt.Commit 94 switch c.Collection { 95 case tangled.SpindleMemberNSID: 96 return applySpindleMember(ctx, st, knots, hostname, ownerDID, evt.Did, c) 97 case tangled.RepoNSID: 98 return applyRepo(ctx, st, knots, hostname, ownerDID, evt.Did, c) 99 case tangled.RepoCollaboratorNSID: 100 return applyRepoCollaborator(ctx, st, evt.Did, c) 101 default: 102 // Server-side filter should prevent this, but log so we notice 103 // if jetstream ever changes behavior. 104 loggerFrom(ctx).Debug("ignoring unexpected collection", 105 "collection", c.Collection) 106 return nil 107 } 108} 109 110func applySpindleMember(ctx context.Context, st *store, knots KnotConsumer, hostname, ownerDID, did string, c *jsmodels.Commit) error { 111 switch c.Operation { 112 case jsOpCreate, jsOpUpdate: 113 // Capture the previous subject. Necessary because a same-rkey 114 // update can move the grant to a different DID, in which case 115 // the *old* subject's knots may need to be released even as 116 // the new subject's are picked up. 117 oldSubject, err := st.GetSpindleMember(ctx, did, c.RKey) 118 if err != nil { 119 return err 120 } 121 122 var rec tangled.SpindleMember 123 if err := json.Unmarshal(c.Record, &rec); err != nil { 124 // Decode failures are a permanent property of the record's 125 // bytes; mark as bad so the cursor can advance past it. 126 return js.BadRecord(fmt.Errorf("decode spindle.member: %w", err)) 127 } 128 if err := st.UpsertSpindleMember(ctx, did, c.RKey, rec.Instance, rec.Subject, rec.CreatedAt); err != nil { 129 return err 130 } 131 132 // Only grants published by the spindle owner actually change 133 // authorization (see IsAuthorizedActor). Forged grants are 134 // stored but don't move the needle, so don't bother 135 // reconciling on them: it would be a no-op at best and 136 // add log noise. 
137 if did != ownerDID { 138 return nil 139 } 140 if oldSubject != "" && oldSubject != rec.Subject { 141 if err := reconcileMember(ctx, st, knots, hostname, ownerDID, oldSubject); err != nil { 142 return err 143 } 144 } 145 return reconcileMember(ctx, st, knots, hostname, ownerDID, rec.Subject) 146 147 case jsOpDelete: 148 oldSubject, err := st.GetSpindleMember(ctx, did, c.RKey) 149 if err != nil { 150 return err 151 } 152 if err := st.DeleteSpindleMember(ctx, did, c.RKey); err != nil { 153 return err 154 } 155 // As above: only the owner's grants matter for authorization. 156 if did != ownerDID || oldSubject == "" { 157 return nil 158 } 159 return reconcileMember(ctx, st, knots, hostname, ownerDID, oldSubject) 160 } 161 return nil 162} 163 164func applyRepo(ctx context.Context, st *store, knots KnotConsumer, hostname, ownerDID, did string, c *jsmodels.Commit) error { 165 switch c.Operation { 166 case jsOpCreate, jsOpUpdate: 167 var rec tangled.Repo 168 if err := json.Unmarshal(c.Record, &rec); err != nil { 169 // See applySpindleMember: decode errors are permanent. 170 return js.BadRecord(fmt.Errorf("decode repo: %w", err)) 171 } 172 173 // Capture the prior (knot, spindle) before the upsert so the 174 // post-mutation reconcile below can detect transitions like 175 // "repo used to point at us, no longer does" — which would 176 // otherwise leave a knot subscription dangling. 
177 oldKnot, oldSpindle, err := st.GetRepo(ctx, did, c.RKey) 178 if err != nil { 179 return err 180 } 181 182 if err := st.UpsertRepo(ctx, did, c.RKey, 183 rec.Knot, rec.Name, 184 deref(rec.Spindle), deref(rec.RepoDid), 185 rec.CreatedAt, 186 ); err != nil { 187 return err 188 } 189 190 newSpindle := deref(rec.Spindle) 191 return reconcileKnot(ctx, st, knots, hostname, ownerDID, did, 192 oldKnot, oldSpindle, 193 rec.Knot, newSpindle, 194 ) 195 196 case jsOpDelete: 197 // Same shape as the update path, just with no "new" side: we 198 // have to read the row out before deleting so we can decide 199 // whether deletion freed the last hold on a knot we'd been 200 // subscribed to. 201 oldKnot, oldSpindle, err := st.GetRepo(ctx, did, c.RKey) 202 if err != nil { 203 return err 204 } 205 if err := st.DeleteRepo(ctx, did, c.RKey); err != nil { 206 return err 207 } 208 return reconcileKnot(ctx, st, knots, hostname, ownerDID, did, 209 oldKnot, oldSpindle, 210 "", "", 211 ) 212 } 213 return nil 214} 215 216// reconcileKnot brings the knot consumer's subscriptions in line with 217// the latest store state after a single repo mutation. It is called 218// after the mutation has been applied so IsKnotWanted reflects the 219// post-mutation truth. 220// 221// Logic: 222// - If the new record names us as its spindle AND the publisher is 223// an authorized actor (spindle owner or owner-vouched member), 224// ensure we're subscribed to its knot. Without the membership 225// check, any firehose publisher could pin us to an attacker-chosen 226// knot just by publishing a sh.tangled.repo record naming us. 227// - If the old record named us as its spindle, check whether any 228// other authorized repo still references that knot through us; 229// if not, RemoveKnot it. Skip this when the knot didn't change 230// AND the spindle didn't move away from us, because then nothing 231// actually released our hold. 
232func reconcileKnot( 233 ctx context.Context, 234 st *store, 235 knots KnotConsumer, 236 hostname, ownerDID, publisherDID string, 237 oldKnot, oldSpindle string, 238 newKnot, newSpindle string, 239) error { 240 // Tests pass nil for the consumer when they only care about the 241 // store mutation half of the handler; tolerate that here so 242 // callers don't have to special-case it. 243 if knots == nil { 244 return nil 245 } 246 247 if newSpindle == hostname && newKnot != "" { 248 ok, err := st.IsAuthorizedActor(ctx, ownerDID, publisherDID) 249 if err != nil { 250 return err 251 } 252 if ok { 253 knots.AddKnot(ctx, newKnot) 254 } else { 255 loggerFrom(ctx).Warn("ignoring repo from unauthorized publisher", 256 "publisher_did", publisherDID, 257 "knot", newKnot, 258 ) 259 } 260 } 261 262 // Did we just lose our claim on oldKnot? Two ways that can happen: 263 // the spindle field moved off of us, or the knot field moved to a 264 // different host. Either is a reason to consider unsubscribing 265 // from oldKnot, but only if no *other* authorized repo still has 266 // us on it. 267 releasedOld := oldSpindle == hostname && oldKnot != "" && 268 (newSpindle != hostname || newKnot != oldKnot) 269 if releasedOld { 270 stillWanted, err := st.IsKnotWanted(ctx, hostname, ownerDID, oldKnot) 271 if err != nil { 272 return err 273 } 274 if !stillWanted { 275 knots.RemoveKnot(ctx, oldKnot) 276 } 277 } 278 return nil 279} 280 281// reconcileMember adjusts knot subscriptions after a membership grant 282// or revocation may have changed `subject`'s authorization status. 283// For each knot named by subject's repos that point at us: 284// 285// - if subject is now authorized, AddKnot (idempotent: already- 286// subscribed knots are no-ops in the consumer); 287// - if subject is now unauthorized, ask IsKnotWanted whether any 288// *other* authorized repo still holds the knot; if not, 289// RemoveKnot. 
290// 291// Without this, a member's repos picked up over the firehose before 292// the grant arrived would never get subscribed (the grant doesn't 293// re-deliver the older repo events), and a revocation would leave 294// the now-unauthorized publisher's knot subscribed until restart. 295func reconcileMember( 296 ctx context.Context, 297 st *store, 298 knots KnotConsumer, 299 hostname, ownerDID, subject string, 300) error { 301 if knots == nil || subject == "" { 302 return nil 303 } 304 knotsForSubject, err := st.KnotsForOwner(ctx, hostname, subject) 305 if err != nil { 306 return err 307 } 308 if len(knotsForSubject) == 0 { 309 return nil 310 } 311 authorized, err := st.IsAuthorizedActor(ctx, ownerDID, subject) 312 if err != nil { 313 return err 314 } 315 for _, k := range knotsForSubject { 316 if authorized { 317 knots.AddKnot(ctx, k) 318 continue 319 } 320 stillWanted, err := st.IsKnotWanted(ctx, hostname, ownerDID, k) 321 if err != nil { 322 return err 323 } 324 if !stillWanted { 325 knots.RemoveKnot(ctx, k) 326 } 327 } 328 return nil 329} 330 331func applyRepoCollaborator(ctx context.Context, st *store, did string, c *jsmodels.Commit) error { 332 switch c.Operation { 333 case jsOpCreate, jsOpUpdate: 334 var rec tangled.RepoCollaborator 335 if err := json.Unmarshal(c.Record, &rec); err != nil { 336 // See applySpindleMember: decode errors are permanent. 337 return js.BadRecord(fmt.Errorf("decode repo.collaborator: %w", err)) 338 } 339 return st.UpsertRepoCollaborator(ctx, did, c.RKey, 340 deref(rec.Repo), deref(rec.RepoDid), 341 rec.Subject, rec.CreatedAt, 342 ) 343 case jsOpDelete: 344 return st.DeleteRepoCollaborator(ctx, did, c.RKey) 345 } 346 return nil 347} 348 349// deref returns the pointed-to string, or "" for nil. The lexicon types 350// model optional fields as *string; the store schema treats absent and 351// empty the same, so collapsing the two here keeps callers tidy. 
func deref(s *string) string {
	// A present pointer wins; absence collapses to the empty string.
	if s != nil {
		return *s
	}
	return ""
}