Stitch any CI into Tangled
3

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// This file wires tack into the AT Protocol firehose via Bluesky's 4// "jetstream" — a JSON projection of the firehose served over a websocket 5// (see https://github.com/bluesky-social/jetstream). Tangled rides on top of 6// AT Proto: things like "this user is a spindle member", "this repo wants 7// this spindle", etc. are all atproto records published to users' PDSes, 8// and jetstream is how a service like a spindle observes them in real time. 9// 10// As a spindle, the records we care about are: 11// 12// - sh.tangled.spindle.member — owner authorizes a DID to use us 13// - sh.tangled.repo — a repo declares us as its spindle 14// - sh.tangled.repo.collaborator — collaborators on those repos 15// 16// (Pipeline trigger records, sh.tangled.pipeline, do *not* come over 17// jetstream; they're delivered by the knot servers via a separate event 18// stream. That is plumbed in separately.) 19 20import ( 21 "context" 22 "encoding/json" 23 "fmt" 24 "time" 25 26 "github.com/bluesky-social/jetstream/pkg/client" 27 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 28 jsmodels "github.com/bluesky-social/jetstream/pkg/models" 29 "tangled.org/core/api/tangled" 30) 31 32// jetstream operation strings. The jetstream protocol publishes these as 33// the Commit.Operation field; pulling them out as constants keeps the 34// switch in handleJetstreamEvent honest about typos. 35const ( 36 jsOpCreate = "create" 37 jsOpUpdate = "update" 38 jsOpDelete = "delete" 39) 40 41// jetstreamRewind is how far before the persisted cursor we resume on 42// reconnect. Jetstream's docs recommend rewinding a few seconds because 43// the cursor is a time-based filter and exact-boundary replay is not 44// guaranteed gapless across disconnects. The handler's mutations are 45// idempotent (UPSERTs and DELETEs keyed on (did, rkey)) so the small 46// amount of duplicate replay this introduces is harmless. 47const jetstreamRewind = 5 * time.Second 48 49// startJetstream dials the configured jetstream endpoint and spawns a 50// background goroutine that consumes events for the lifetime of ctx. It 51// returns once the client is constructed; connection errors surface in 52// logs, not return values, because the read loop is expected to reconnect 53// on its own. 54// 55// The store is used for two things: loading the persisted cursor so we 56// resume from the last seen event after a restart, and persisting 57// observed records so the rest of tack can answer membership questions 58// without re-reading the firehose. 59// 60// The logger is pulled from ctx (see log.go); falls back to slog.Default() 61// if none is attached. 62func startJetstream(ctx context.Context, cfg config, st *store, knots KnotConsumer) error { 63 logger := loggerFrom(ctx).With("component", "jetstream") 64 65 // `wantedCollections` is a server-side filter: jetstream will only send 66 // us commit events whose record collection (NSID) is in this list. The 67 // NSIDs come from tangled-core's generated lexicon types so they stay 68 // in sync with whatever the appview/knots are publishing. 69 collections := []string{ 70 tangled.SpindleMemberNSID, 71 tangled.RepoNSID, 72 tangled.RepoCollaboratorNSID, 73 } 74 75 // Configure our JetStream client. 76 clientCfg := client.DefaultClientConfig() 77 clientCfg.WebsocketURL = cfg.JetstreamURL 78 clientCfg.WantedCollections = collections 79 80 // The handler closes over `st`, `knots` and the spindle hostname so 81 // the scheduler signature stays plain `func(ctx, *Event) error` and 82 // applyCommit can hand the knot consumer new sources as soon as 83 // matching repo records arrive. 84 handler := func(ctx context.Context, evt *jsmodels.Event) error { 85 return handleJetstreamEvent(ctx, st, knots, cfg.Hostname, evt) 86 } 87 88 // Re-attach the component-scoped logger so handler — which the 89 // scheduler invokes with the ctx we pass to ConnectAndRead — can 90 // pull it back out via loggerFrom. 91 ctx = loggerInto(ctx, logger) 92 93 // The sequential scheduler processes events one-at-a-time in arrival 94 // order. That's the right default for a spindle: ordering matters 95 // (e.g. a member-added event must apply before any record from that 96 // member is processed), and our event volume is tiny. 97 c, err := client.NewClient( 98 clientCfg, 99 logger, 100 sequential.NewScheduler("tack", logger, handler), 101 ) 102 if err != nil { 103 return fmt.Errorf("new jetstream client: %w", err) 104 } 105 106 go func() { 107 for { 108 // We re-read the cursor from the store at every (re)connect so we 109 // pick up any progress the previous connection persisted before 110 // dying. nil means "start from now", which is the right default on 111 // a brand-new install or after a corrupt cursor read. 112 cur, err := st.LoadCursor(ctx) 113 if err != nil { 114 logger.Warn("ignoring unreadable cursor; resuming from now", "err", err) 115 cur = nil 116 } 117 // Rewind a few seconds before the saved cursor on reconnect. 118 // Jetstream cursors are time-based and the docs explicitly note 119 // that exact-boundary replay is not guaranteed gapless, so a 120 // small negative buffer protects against missing events that 121 // straddle the disconnect. Duplicates the rewind produces are 122 // safe: every applyCommit path is an idempotent upsert/delete 123 // keyed on (did, rkey), and SaveCursor only moves forward in 124 // practice because TimeUS is monotonic. 125 if cur != nil { 126 rewound := *cur - int64(jetstreamRewind/time.Microsecond) 127 if rewound < 0 { 128 rewound = 0 129 } 130 logger.Info("connecting to jetstream", 131 "cursor_us", *cur, 132 "rewound_us", rewound, 133 "rewind", jetstreamRewind, 134 ) 135 cur = &rewound 136 } else { 137 logger.Info("connecting to jetstream from now (no cursor)") 138 } 139 140 // Reconnect loop. ConnectAndRead blocks on the websocket and returns 141 // either when the connection drops (transient network error, server 142 // restart, etc.) or when ctx is cancelled. On error we sleep briefly 143 // and reconnect; on ctx cancellation we exit cleanly. 144 if err := c.ConnectAndRead(ctx, cur); err != nil { 145 if ctx.Err() != nil { 146 return 147 } 148 logger.Error("jetstream read loop", "err", err) 149 time.Sleep(2 * time.Second) 150 continue 151 } 152 if ctx.Err() != nil { 153 return 154 } 155 } 156 }() 157 158 return nil 159} 160 161// handleJetstreamEvent is the per-event callback for the JetStream. It 162// applies the event to the store and advances the persisted cursor. Any 163// returned error is logged by the scheduler but does not tear down the 164// connection — the next event will retry the cursor write implicitly. 165func handleJetstreamEvent(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error { 166 // We only care about commits, which are the actual record CRUD 167 // operations on a user's PDS. Account/identity events are ignored 168 // for now; if we ever care about handle changes we can add them. 169 if evt.Kind != jsmodels.EventKindCommit || evt.Commit == nil { 170 return nil 171 } 172 logger := loggerFrom(ctx) 173 174 // Dispatch on collection. Unknown collections shouldn't happen given 175 // our wantedCollections filter, but be defensive — jetstream may 176 // send schema changes ahead of us updating the filter. 177 if err := applyCommit(ctx, st, knots, hostname, evt); err != nil { 178 logger.Error("apply commit", 179 "err", err, 180 "did", evt.Did, 181 "collection", evt.Commit.Collection, 182 "op", evt.Commit.Operation, 183 "rkey", evt.Commit.RKey, 184 ) 185 186 // Fall through to cursor save: a single bad record shouldn't 187 // stall the cursor forever and force us to re-process every 188 // subsequent event after a restart. 189 } 190 191 // Advance the cursor. TimeUS is the jetstream-assigned microsecond 192 // timestamp; saving it after-apply means a crash mid-batch will at 193 // worst replay the failing event, never skip past it. 194 if err := st.SaveCursor(ctx, evt.TimeUS); err != nil { 195 // Returning the error logs it; it doesn't kill the scheduler. 196 return fmt.Errorf("save cursor: %w", err) 197 } 198 199 return nil 200} 201 202// applyCommit routes a commit to the right store mutation based on its 203// collection NSID and operation. 204func applyCommit(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error { 205 c := evt.Commit 206 switch c.Collection { 207 case tangled.SpindleMemberNSID: 208 return applySpindleMember(ctx, st, evt.Did, c) 209 case tangled.RepoNSID: 210 return applyRepo(ctx, st, knots, hostname, evt.Did, c) 211 case tangled.RepoCollaboratorNSID: 212 return applyRepoCollaborator(ctx, st, evt.Did, c) 213 default: 214 // Server-side filter should prevent this, but log so we notice 215 // if jetstream ever changes behavior. 216 loggerFrom(ctx).Debug("ignoring unexpected collection", 217 "collection", c.Collection) 218 return nil 219 } 220} 221 222func applySpindleMember(ctx context.Context, st *store, did string, c *jsmodels.Commit) error { 223 switch c.Operation { 224 case jsOpCreate, jsOpUpdate: 225 var rec tangled.SpindleMember 226 if err := json.Unmarshal(c.Record, &rec); err != nil { 227 return fmt.Errorf("decode spindle.member: %w", err) 228 } 229 return st.UpsertSpindleMember(ctx, did, c.RKey, rec.Instance, rec.Subject, rec.CreatedAt) 230 case jsOpDelete: 231 return st.DeleteSpindleMember(ctx, did, c.RKey) 232 } 233 return nil 234} 235 236func applyRepo(ctx context.Context, st *store, knots KnotConsumer, hostname string, did string, c *jsmodels.Commit) error { 237 switch c.Operation { 238 case jsOpCreate, jsOpUpdate: 239 var rec tangled.Repo 240 if err := json.Unmarshal(c.Record, &rec); err != nil { 241 return fmt.Errorf("decode repo: %w", err) 242 } 243 244 // Capture the prior (knot, spindle) before the upsert so the 245 // post-mutation reconcile below can detect transitions like 246 // "repo used to point at us, no longer does" — which would 247 // otherwise leave a knot subscription dangling. 248 oldKnot, oldSpindle, err := st.GetRepo(ctx, did, c.RKey) 249 if err != nil { 250 return err 251 } 252 253 if err := st.UpsertRepo(ctx, did, c.RKey, 254 rec.Knot, rec.Name, 255 deref(rec.Spindle), deref(rec.RepoDid), 256 rec.CreatedAt, 257 ); err != nil { 258 return err 259 } 260 261 newSpindle := deref(rec.Spindle) 262 return reconcileKnot(ctx, st, knots, hostname, 263 oldKnot, oldSpindle, 264 rec.Knot, newSpindle, 265 ) 266 267 case jsOpDelete: 268 // Same shape as the update path, just with no "new" side: we 269 // have to read the row out before deleting so we can decide 270 // whether deletion freed the last hold on a knot we'd been 271 // subscribed to. 272 oldKnot, oldSpindle, err := st.GetRepo(ctx, did, c.RKey) 273 if err != nil { 274 return err 275 } 276 if err := st.DeleteRepo(ctx, did, c.RKey); err != nil { 277 return err 278 } 279 return reconcileKnot(ctx, st, knots, hostname, 280 oldKnot, oldSpindle, 281 "", "", 282 ) 283 } 284 return nil 285} 286 287// reconcileKnot brings the knot consumer's subscriptions in line with 288// the latest store state after a single repo mutation. It is called 289// after the mutation has been applied so IsKnotWanted reflects the 290// post-mutation truth. 291// 292// Logic: 293// - If the new record names us as its spindle, ensure we're 294// subscribed to its knot (AddKnot is idempotent, so calling it on 295// an already-watched knot is cheap). 296// - If the old record named us as its spindle, check whether any 297// other repo still references that knot through us; if not, 298// RemoveKnot it. Skip this when the knot didn't change AND the 299// spindle didn't move away from us, because then nothing actually 300// released our hold. 301func reconcileKnot( 302 ctx context.Context, 303 st *store, 304 knots KnotConsumer, 305 hostname string, 306 oldKnot, oldSpindle string, 307 newKnot, newSpindle string, 308) error { 309 // Tests pass nil for the consumer when they only care about the 310 // store mutation half of the handler; tolerate that here so 311 // callers don't have to special-case it. 312 if knots == nil { 313 return nil 314 } 315 316 if newSpindle == hostname && newKnot != "" { 317 knots.AddKnot(ctx, newKnot) 318 } 319 320 // Did we just lose our claim on oldKnot? Two ways that can happen: 321 // the spindle field moved off of us, or the knot field moved to a 322 // different host. Either is a reason to consider unsubscribing 323 // from oldKnot — but only if no *other* repo still has us on it. 324 releasedOld := oldSpindle == hostname && oldKnot != "" && 325 (newSpindle != hostname || newKnot != oldKnot) 326 if releasedOld { 327 stillWanted, err := st.IsKnotWanted(ctx, hostname, oldKnot) 328 if err != nil { 329 return err 330 } 331 if !stillWanted { 332 knots.RemoveKnot(ctx, oldKnot) 333 } 334 } 335 return nil 336} 337 338func applyRepoCollaborator(ctx context.Context, st *store, did string, c *jsmodels.Commit) error { 339 switch c.Operation { 340 case jsOpCreate, jsOpUpdate: 341 var rec tangled.RepoCollaborator 342 if err := json.Unmarshal(c.Record, &rec); err != nil { 343 return fmt.Errorf("decode repo.collaborator: %w", err) 344 } 345 return st.UpsertRepoCollaborator(ctx, did, c.RKey, 346 deref(rec.Repo), deref(rec.RepoDid), 347 rec.Subject, rec.CreatedAt, 348 ) 349 case jsOpDelete: 350 return st.DeleteRepoCollaborator(ctx, did, c.RKey) 351 } 352 return nil 353} 354 355// deref returns the pointed-to string, or "" for nil. The lexicon types 356// model optional fields as *string; the store schema treats absent and 357// empty the same, so collapsing the two here keeps callers tidy. 358func deref(s *string) string { 359 if s == nil { 360 return "" 361 } 362 return *s 363}