Stitch any CI into Tangled
3

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// This file wires tack into the AT Protocol firehose via Bluesky's 4// "jetstream" — a JSON projection of the firehose served over a websocket 5// (see https://github.com/bluesky-social/jetstream). Tangled rides on top of 6// AT Proto: things like "this user is a spindle member", "this repo wants 7// this spindle", etc. are all atproto records published to users' PDSes, 8// and jetstream is how a service like a spindle observes them in real time. 9// 10// As a spindle, the records we care about are: 11// 12// - sh.tangled.spindle.member — owner authorizes a DID to use us 13// - sh.tangled.repo — a repo declares us as its spindle 14// - sh.tangled.repo.collaborator — collaborators on those repos 15// 16// (Pipeline trigger records, sh.tangled.pipeline, do *not* come over 17// jetstream; they're delivered by the knot servers via a separate event 18// stream. That is plumbed in separately.) 19 20import ( 21 "context" 22 "encoding/json" 23 "fmt" 24 "time" 25 26 "github.com/bluesky-social/jetstream/pkg/client" 27 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 28 jsmodels "github.com/bluesky-social/jetstream/pkg/models" 29 "tangled.org/core/api/tangled" 30) 31 32// jetstream operation strings. The jetstream protocol publishes these as 33// the Commit.Operation field; pulling them out as constants keeps the 34// switch in handleJetstreamEvent honest about typos. 35const ( 36 jsOpCreate = "create" 37 jsOpUpdate = "update" 38 jsOpDelete = "delete" 39) 40 41// startJetstream dials the configured jetstream endpoint and spawns a 42// background goroutine that consumes events for the lifetime of ctx. It 43// returns once the client is constructed; connection errors surface in 44// logs, not return values, because the read loop is expected to reconnect 45// on its own. 46// 47// The store is used for two things: loading the persisted cursor so we 48// resume from the last seen event after a restart, and persisting 49// observed records so the rest of tack can answer membership questions 50// without re-reading the firehose. 51// 52// The logger is pulled from ctx (see log.go); falls back to slog.Default() 53// if none is attached. 54func startJetstream(ctx context.Context, cfg config, st *store, knots KnotConsumer) error { 55 logger := loggerFrom(ctx).With("component", "jetstream") 56 57 // `wantedCollections` is a server-side filter: jetstream will only send 58 // us commit events whose record collection (NSID) is in this list. The 59 // NSIDs come from tangled-core's generated lexicon types so they stay 60 // in sync with whatever the appview/knots are publishing. 61 collections := []string{ 62 tangled.SpindleMemberNSID, 63 tangled.RepoNSID, 64 tangled.RepoCollaboratorNSID, 65 } 66 67 // Configure our JetStream client. 68 clientCfg := client.DefaultClientConfig() 69 clientCfg.WebsocketURL = cfg.JetstreamURL 70 clientCfg.WantedCollections = collections 71 72 // The handler closes over `st`, `knots` and the spindle hostname so 73 // the scheduler signature stays plain `func(ctx, *Event) error` and 74 // applyCommit can hand the knot consumer new sources as soon as 75 // matching repo records arrive. 76 handler := func(ctx context.Context, evt *jsmodels.Event) error { 77 return handleJetstreamEvent(ctx, st, knots, cfg.Hostname, evt) 78 } 79 80 // Re-attach the component-scoped logger so handler — which the 81 // scheduler invokes with the ctx we pass to ConnectAndRead — can 82 // pull it back out via loggerFrom. 83 ctx = loggerInto(ctx, logger) 84 85 // The sequential scheduler processes events one-at-a-time in arrival 86 // order. That's the right default for a spindle: ordering matters 87 // (e.g. a member-added event must apply before any record from that 88 // member is processed), and our event volume is tiny. 89 c, err := client.NewClient( 90 clientCfg, 91 logger, 92 sequential.NewScheduler("tack", logger, handler), 93 ) 94 if err != nil { 95 return fmt.Errorf("new jetstream client: %w", err) 96 } 97 98 go func() { 99 for { 100 // We re-read the cursor from the store at every (re)connect so we 101 // pick up any progress the previous connection persisted before 102 // dying. nil means "start from now", which is the right default on 103 // a brand-new install or after a corrupt cursor read. 104 cur, err := st.LoadCursor(ctx) 105 if err != nil { 106 logger.Warn("ignoring unreadable cursor; resuming from now", "err", err) 107 cur = nil 108 } 109 if cur != nil { 110 logger.Info("connecting to jetstream", "cursor_us", *cur) 111 } else { 112 logger.Info("connecting to jetstream from now (no cursor)") 113 } 114 115 // Reconnect loop. ConnectAndRead blocks on the websocket and returns 116 // either when the connection drops (transient network error, server 117 // restart, etc.) or when ctx is cancelled. On error we sleep briefly 118 // and reconnect; on ctx cancellation we exit cleanly. 119 if err := c.ConnectAndRead(ctx, cur); err != nil { 120 if ctx.Err() != nil { 121 return 122 } 123 logger.Error("jetstream read loop", "err", err) 124 time.Sleep(2 * time.Second) 125 continue 126 } 127 if ctx.Err() != nil { 128 return 129 } 130 } 131 }() 132 133 return nil 134} 135 136// handleJetstreamEvent is the per-event callback for the JetStream. It 137// applies the event to the store and advances the persisted cursor. Any 138// returned error is logged by the scheduler but does not tear down the 139// connection — the next event will retry the cursor write implicitly. 140func handleJetstreamEvent(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error { 141 // We only care about commits, which are the actual record CRUD 142 // operations on a user's PDS. Account/identity events are ignored 143 // for now; if we ever care about handle changes we can add them. 144 if evt.Kind != jsmodels.EventKindCommit || evt.Commit == nil { 145 return nil 146 } 147 logger := loggerFrom(ctx) 148 149 // Dispatch on collection. Unknown collections shouldn't happen given 150 // our wantedCollections filter, but be defensive — jetstream may 151 // send schema changes ahead of us updating the filter. 152 if err := applyCommit(ctx, st, knots, hostname, evt); err != nil { 153 logger.Error("apply commit", 154 "err", err, 155 "did", evt.Did, 156 "collection", evt.Commit.Collection, 157 "op", evt.Commit.Operation, 158 "rkey", evt.Commit.RKey, 159 ) 160 161 // Fall through to cursor save: a single bad record shouldn't 162 // stall the cursor forever and force us to re-process every 163 // subsequent event after a restart. 164 } 165 166 // Advance the cursor. TimeUS is the jetstream-assigned microsecond 167 // timestamp; saving it after-apply means a crash mid-batch will at 168 // worst replay the failing event, never skip past it. 169 if err := st.SaveCursor(ctx, evt.TimeUS); err != nil { 170 // Returning the error logs it; it doesn't kill the scheduler. 171 return fmt.Errorf("save cursor: %w", err) 172 } 173 174 return nil 175} 176 177// applyCommit routes a commit to the right store mutation based on its 178// collection NSID and operation. 179func applyCommit(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error { 180 c := evt.Commit 181 switch c.Collection { 182 case tangled.SpindleMemberNSID: 183 return applySpindleMember(ctx, st, evt.Did, c) 184 case tangled.RepoNSID: 185 return applyRepo(ctx, st, knots, hostname, evt.Did, c) 186 case tangled.RepoCollaboratorNSID: 187 return applyRepoCollaborator(ctx, st, evt.Did, c) 188 default: 189 // Server-side filter should prevent this, but log so we notice 190 // if jetstream ever changes behavior. 191 loggerFrom(ctx).Debug("ignoring unexpected collection", 192 "collection", c.Collection) 193 return nil 194 } 195} 196 197func applySpindleMember(ctx context.Context, st *store, did string, c *jsmodels.Commit) error { 198 switch c.Operation { 199 case jsOpCreate, jsOpUpdate: 200 var rec tangled.SpindleMember 201 if err := json.Unmarshal(c.Record, &rec); err != nil { 202 return fmt.Errorf("decode spindle.member: %w", err) 203 } 204 return st.UpsertSpindleMember(ctx, did, c.RKey, rec.Instance, rec.Subject, rec.CreatedAt) 205 case jsOpDelete: 206 return st.DeleteSpindleMember(ctx, did, c.RKey) 207 } 208 return nil 209} 210 211func applyRepo(ctx context.Context, st *store, knots KnotConsumer, hostname string, did string, c *jsmodels.Commit) error { 212 switch c.Operation { 213 case jsOpCreate, jsOpUpdate: 214 var rec tangled.Repo 215 if err := json.Unmarshal(c.Record, &rec); err != nil { 216 return fmt.Errorf("decode repo: %w", err) 217 } 218 if err := st.UpsertRepo(ctx, did, c.RKey, 219 rec.Knot, rec.Name, 220 deref(rec.Spindle), deref(rec.RepoDid), 221 rec.CreatedAt, 222 ); err != nil { 223 return err 224 } 225 226 // If this repo just declared us as its spindle, start (or 227 // continue) listening to its knot for pipeline triggers. The 228 // knot consumer dedupes on its own so this is safe to call 229 // even on update events that don't change the spindle field. 230 if knots != nil && rec.Spindle != nil && *rec.Spindle == hostname && rec.Knot != "" { 231 knots.AddKnot(ctx, rec.Knot) 232 } 233 234 return nil 235 case jsOpDelete: 236 // We don't unsubscribe from the knot here: other repos may 237 // still want us to watch it. A periodic reconciliation pass 238 // (not yet implemented) is the right place to drop unused 239 // subscriptions. 240 return st.DeleteRepo(ctx, did, c.RKey) 241 } 242 return nil 243} 244 245func applyRepoCollaborator(ctx context.Context, st *store, did string, c *jsmodels.Commit) error { 246 switch c.Operation { 247 case jsOpCreate, jsOpUpdate: 248 var rec tangled.RepoCollaborator 249 if err := json.Unmarshal(c.Record, &rec); err != nil { 250 return fmt.Errorf("decode repo.collaborator: %w", err) 251 } 252 return st.UpsertRepoCollaborator(ctx, did, c.RKey, 253 deref(rec.Repo), deref(rec.RepoDid), 254 rec.Subject, rec.CreatedAt, 255 ) 256 case jsOpDelete: 257 return st.DeleteRepoCollaborator(ctx, did, c.RKey) 258 } 259 return nil 260} 261 262// deref returns the pointed-to string, or "" for nil. The lexicon types 263// model optional fields as *string; the store schema treats absent and 264// empty the same, so collapsing the two here keeps callers tidy. 265func deref(s *string) string { 266 if s == nil { 267 return "" 268 } 269 return *s 270}