Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// This file wires tack into the AT Protocol firehose via Bluesky's 4// "jetstream" — a JSON projection of the firehose served over a websocket 5// (see https://github.com/bluesky-social/jetstream). Tangled rides on top of 6// AT Proto: things like "this user is a spindle member", "this repo wants 7// this spindle", etc. are all atproto records published to users' PDSes, 8// and jetstream is how a service like a spindle observes them in real time. 9// 10// As a spindle, the records we care about are: 11// 12// - sh.tangled.spindle.member — owner authorizes a DID to use us 13// - sh.tangled.repo — a repo declares us as its spindle 14// - sh.tangled.repo.collaborator — collaborators on those repos 15// 16// (Pipeline trigger records, sh.tangled.pipeline, do *not* come over 17// jetstream; they're delivered by the knot servers via a separate event 18// stream. That is plumbed in separately.) 19 20import ( 21 "context" 22 "encoding/json" 23 "fmt" 24 "time" 25 26 "github.com/bluesky-social/jetstream/pkg/client" 27 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 28 jsmodels "github.com/bluesky-social/jetstream/pkg/models" 29 "tangled.org/core/api/tangled" 30) 31 32// jetstream operation strings. The jetstream protocol publishes these as 33// the Commit.Operation field; pulling them out as constants keeps the 34// switch in handleJetstreamEvent honest about typos. 35const ( 36 jsOpCreate = "create" 37 jsOpUpdate = "update" 38 jsOpDelete = "delete" 39) 40 41// startJetstream dials the configured jetstream endpoint and spawns a 42// background goroutine that consumes events for the lifetime of ctx. It 43// returns once the client is constructed; connection errors surface in 44// logs, not return values, because the read loop is expected to reconnect 45// on its own. 46// 47// The store is used for two things: loading the persisted cursor so we 48// resume from the last seen event after a restart, and persisting 49// observed records so the rest of tack can answer membership questions 50// without re-reading the firehose. 51// 52// The logger is pulled from ctx (see log.go); falls back to slog.Default() 53// if none is attached. 54func startJetstream(ctx context.Context, cfg config, st *store, knots KnotConsumer) error { 55 logger := loggerFrom(ctx).With("component", "jetstream") 56 57 // `wantedCollections` is a server-side filter: jetstream will only send 58 // us commit events whose record collection (NSID) is in this list. The 59 // NSIDs come from tangled-core's generated lexicon types so they stay 60 // in sync with whatever the appview/knots are publishing. 61 collections := []string{ 62 tangled.SpindleMemberNSID, 63 tangled.RepoNSID, 64 tangled.RepoCollaboratorNSID, 65 } 66 67 // Configure our JetStream client. 68 clientCfg := client.DefaultClientConfig() 69 clientCfg.WebsocketURL = cfg.JetstreamURL 70 clientCfg.WantedCollections = collections 71 72 // The handler closes over `st`, `knots` and the spindle hostname so 73 // the scheduler signature stays plain `func(ctx, *Event) error` and 74 // applyCommit can hand the knot consumer new sources as soon as 75 // matching repo records arrive. 76 handler := func(ctx context.Context, evt *jsmodels.Event) error { 77 return handleJetstreamEvent(ctx, st, knots, cfg.Hostname, evt) 78 } 79 80 // Re-attach the component-scoped logger so handler — which the 81 // scheduler invokes with the ctx we pass to ConnectAndRead — can 82 // pull it back out via loggerFrom. 83 ctx = loggerInto(ctx, logger) 84 85 // The sequential scheduler processes events one-at-a-time in arrival 86 // order. That's the right default for a spindle: ordering matters 87 // (e.g. a member-added event must apply before any record from that 88 // member is processed), and our event volume is tiny. 89 c, err := client.NewClient( 90 clientCfg, 91 logger, 92 sequential.NewScheduler("tack", logger, handler), 93 ) 94 if err != nil { 95 return fmt.Errorf("new jetstream client: %w", err) 96 } 97 98 go func() { 99 for { 100 // We re-read the cursor from the store at every (re)connect so we 101 // pick up any progress the previous connection persisted before 102 // dying. nil means "start from now", which is the right default on 103 // a brand-new install or after a corrupt cursor read. 104 cur, err := st.LoadCursor(ctx) 105 if err != nil { 106 logger.Warn("ignoring unreadable cursor; resuming from now", "err", err) 107 cur = nil 108 } 109 if cur != nil { 110 logger.Info("connecting to jetstream", "cursor_us", *cur) 111 } else { 112 logger.Info("connecting to jetstream from now (no cursor)") 113 } 114 115 // Reconnect loop. ConnectAndRead blocks on the websocket and returns 116 // either when the connection drops (transient network error, server 117 // restart, etc.) or when ctx is cancelled. On error we sleep briefly 118 // and reconnect; on ctx cancellation we exit cleanly. 119 if err := c.ConnectAndRead(ctx, cur); err != nil { 120 if ctx.Err() != nil { 121 return 122 } 123 logger.Error("jetstream read loop", "err", err) 124 time.Sleep(2 * time.Second) 125 continue 126 } 127 if ctx.Err() != nil { 128 return 129 } 130 } 131 }() 132 133 return nil 134} 135 136// handleJetstreamEvent is the per-event callback for the JetStream. It 137// applies the event to the store and advances the persisted cursor. Any 138// returned error is logged by the scheduler but does not tear down the 139// connection — the next event will retry the cursor write implicitly. 140func handleJetstreamEvent(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error { 141 // We only care about commits, which are the actual record CRUD 142 // operations on a user's PDS. Account/identity events are ignored 143 // for now; if we ever care about handle changes we can add them. 144 if evt.Kind != jsmodels.EventKindCommit || evt.Commit == nil { 145 return nil 146 } 147 logger := loggerFrom(ctx) 148 149 // Dispatch on collection. Unknown collections shouldn't happen given 150 // our wantedCollections filter, but be defensive — jetstream may 151 // send schema changes ahead of us updating the filter. 152 if err := applyCommit(ctx, st, knots, hostname, evt); err != nil { 153 logger.Error("apply commit", 154 "err", err, 155 "did", evt.Did, 156 "collection", evt.Commit.Collection, 157 "op", evt.Commit.Operation, 158 "rkey", evt.Commit.RKey, 159 ) 160 161 // Fall through to cursor save: a single bad record shouldn't 162 // stall the cursor forever and force us to re-process every 163 // subsequent event after a restart. 164 } 165 166 // Advance the cursor. TimeUS is the jetstream-assigned microsecond 167 // timestamp; saving it after-apply means a crash mid-batch will at 168 // worst replay the failing event, never skip past it. 169 if err := st.SaveCursor(ctx, evt.TimeUS); err != nil { 170 // Returning the error logs it; it doesn't kill the scheduler. 171 return fmt.Errorf("save cursor: %w", err) 172 } 173 174 return nil 175} 176 177// applyCommit routes a commit to the right store mutation based on its 178// collection NSID and operation. 179func applyCommit(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error { 180 c := evt.Commit 181 switch c.Collection { 182 case tangled.SpindleMemberNSID: 183 return applySpindleMember(ctx, st, evt.Did, c) 184 case tangled.RepoNSID: 185 return applyRepo(ctx, st, knots, hostname, evt.Did, c) 186 case tangled.RepoCollaboratorNSID: 187 return applyRepoCollaborator(ctx, st, evt.Did, c) 188 default: 189 // Server-side filter should prevent this, but log so we notice 190 // if jetstream ever changes behavior. 191 loggerFrom(ctx).Debug("ignoring unexpected collection", 192 "collection", c.Collection) 193 return nil 194 } 195} 196 197func applySpindleMember(ctx context.Context, st *store, did string, c *jsmodels.Commit) error { 198 switch c.Operation { 199 case jsOpCreate, jsOpUpdate: 200 var rec tangled.SpindleMember 201 if err := json.Unmarshal(c.Record, &rec); err != nil { 202 return fmt.Errorf("decode spindle.member: %w", err) 203 } 204 return st.UpsertSpindleMember(ctx, did, c.RKey, rec.Instance, rec.Subject, rec.CreatedAt) 205 case jsOpDelete: 206 return st.DeleteSpindleMember(ctx, did, c.RKey) 207 } 208 return nil 209} 210 211func applyRepo(ctx context.Context, st *store, knots KnotConsumer, hostname string, did string, c *jsmodels.Commit) error { 212 switch c.Operation { 213 case jsOpCreate, jsOpUpdate: 214 var rec tangled.Repo 215 if err := json.Unmarshal(c.Record, &rec); err != nil { 216 return fmt.Errorf("decode repo: %w", err) 217 } 218 219 // Capture the prior (knot, spindle) before the upsert so the 220 // post-mutation reconcile below can detect transitions like 221 // "repo used to point at us, no longer does" — which would 222 // otherwise leave a knot subscription dangling. 223 oldKnot, oldSpindle, err := st.GetRepo(ctx, did, c.RKey) 224 if err != nil { 225 return err 226 } 227 228 if err := st.UpsertRepo(ctx, did, c.RKey, 229 rec.Knot, rec.Name, 230 deref(rec.Spindle), deref(rec.RepoDid), 231 rec.CreatedAt, 232 ); err != nil { 233 return err 234 } 235 236 newSpindle := deref(rec.Spindle) 237 return reconcileKnot(ctx, st, knots, hostname, 238 oldKnot, oldSpindle, 239 rec.Knot, newSpindle, 240 ) 241 242 case jsOpDelete: 243 // Same shape as the update path, just with no "new" side: we 244 // have to read the row out before deleting so we can decide 245 // whether deletion freed the last hold on a knot we'd been 246 // subscribed to. 247 oldKnot, oldSpindle, err := st.GetRepo(ctx, did, c.RKey) 248 if err != nil { 249 return err 250 } 251 if err := st.DeleteRepo(ctx, did, c.RKey); err != nil { 252 return err 253 } 254 return reconcileKnot(ctx, st, knots, hostname, 255 oldKnot, oldSpindle, 256 "", "", 257 ) 258 } 259 return nil 260} 261 262// reconcileKnot brings the knot consumer's subscriptions in line with 263// the latest store state after a single repo mutation. It is called 264// after the mutation has been applied so IsKnotWanted reflects the 265// post-mutation truth. 266// 267// Logic: 268// - If the new record names us as its spindle, ensure we're 269// subscribed to its knot (AddKnot is idempotent, so calling it on 270// an already-watched knot is cheap). 271// - If the old record named us as its spindle, check whether any 272// other repo still references that knot through us; if not, 273// RemoveKnot it. Skip this when the knot didn't change AND the 274// spindle didn't move away from us, because then nothing actually 275// released our hold. 276func reconcileKnot( 277 ctx context.Context, 278 st *store, 279 knots KnotConsumer, 280 hostname string, 281 oldKnot, oldSpindle string, 282 newKnot, newSpindle string, 283) error { 284 // Tests pass nil for the consumer when they only care about the 285 // store mutation half of the handler; tolerate that here so 286 // callers don't have to special-case it. 287 if knots == nil { 288 return nil 289 } 290 291 if newSpindle == hostname && newKnot != "" { 292 knots.AddKnot(ctx, newKnot) 293 } 294 295 // Did we just lose our claim on oldKnot? Two ways that can happen: 296 // the spindle field moved off of us, or the knot field moved to a 297 // different host. Either is a reason to consider unsubscribing 298 // from oldKnot — but only if no *other* repo still has us on it. 299 releasedOld := oldSpindle == hostname && oldKnot != "" && 300 (newSpindle != hostname || newKnot != oldKnot) 301 if releasedOld { 302 stillWanted, err := st.IsKnotWanted(ctx, hostname, oldKnot) 303 if err != nil { 304 return err 305 } 306 if !stillWanted { 307 knots.RemoveKnot(ctx, oldKnot) 308 } 309 } 310 return nil 311} 312 313func applyRepoCollaborator(ctx context.Context, st *store, did string, c *jsmodels.Commit) error { 314 switch c.Operation { 315 case jsOpCreate, jsOpUpdate: 316 var rec tangled.RepoCollaborator 317 if err := json.Unmarshal(c.Record, &rec); err != nil { 318 return fmt.Errorf("decode repo.collaborator: %w", err) 319 } 320 return st.UpsertRepoCollaborator(ctx, did, c.RKey, 321 deref(rec.Repo), deref(rec.RepoDid), 322 rec.Subject, rec.CreatedAt, 323 ) 324 case jsOpDelete: 325 return st.DeleteRepoCollaborator(ctx, did, c.RKey) 326 } 327 return nil 328} 329 330// deref returns the pointed-to string, or "" for nil. The lexicon types 331// model optional fields as *string; the store schema treats absent and 332// empty the same, so collapsing the two here keeps callers tidy. 333func deref(s *string) string { 334 if s == nil { 335 return "" 336 } 337 return *s 338}