Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// This file wires tack into the AT Protocol firehose via Bluesky's 4// "jetstream" — a JSON projection of the firehose served over a websocket 5// (see https://github.com/bluesky-social/jetstream). Tangled rides on top of 6// AT Proto: things like "this user is a spindle member", "this repo wants 7// this spindle", etc. are all atproto records published to users' PDSes, 8// and jetstream is how a service like a spindle observes them in real time. 9// 10// As a spindle, the records we care about are: 11// 12// - sh.tangled.spindle.member — owner authorizes a DID to use us 13// - sh.tangled.repo — a repo declares us as its spindle 14// - sh.tangled.repo.collaborator — collaborators on those repos 15// 16// (Pipeline trigger records, sh.tangled.pipeline, do *not* come over 17// jetstream; they're delivered by the knot servers via a separate event 18// stream. That is plumbed in separately.) 19 20import ( 21 "context" 22 "encoding/json" 23 "errors" 24 "fmt" 25 "time" 26 27 "github.com/bluesky-social/jetstream/pkg/client" 28 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 29 jsmodels "github.com/bluesky-social/jetstream/pkg/models" 30 "tangled.org/core/api/tangled" 31) 32 33// jetstream operation strings. The jetstream protocol publishes these as 34// the Commit.Operation field; pulling them out as constants keeps the 35// switch in handleJetstreamEvent honest about typos. 36const ( 37 jsOpCreate = "create" 38 jsOpUpdate = "update" 39 jsOpDelete = "delete" 40) 41 42// jetstreamRewind is how far before the persisted cursor we resume on 43// reconnect. Jetstream's docs recommend rewinding a few seconds because 44// the cursor is a time-based filter and exact-boundary replay is not 45// guaranteed gapless across disconnects. The handler's mutations are 46// idempotent (UPSERTs and DELETEs keyed on (did, rkey)) so the small 47// amount of duplicate replay this introduces is harmless. 
48const jetstreamRewind = 5 * time.Second 49 50// startJetstream dials the configured jetstream endpoint and spawns a 51// background goroutine that consumes events for the lifetime of ctx. It 52// returns once the client is constructed; connection errors surface in 53// logs, not return values, because the read loop is expected to reconnect 54// on its own. 55// 56// The store is used for two things: loading the persisted cursor so we 57// resume from the last seen event after a restart, and persisting 58// observed records so the rest of tack can answer membership questions 59// without re-reading the firehose. 60// 61// The logger is pulled from ctx (see log.go); falls back to slog.Default() 62// if none is attached. 63func startJetstream(ctx context.Context, cfg config, st *store, knots KnotConsumer) error { 64 logger := loggerFrom(ctx).With("component", "jetstream") 65 66 // `wantedCollections` is a server-side filter: jetstream will only send 67 // us commit events whose record collection (NSID) is in this list. The 68 // NSIDs come from tangled-core's generated lexicon types so they stay 69 // in sync with whatever the appview/knots are publishing. 70 collections := []string{ 71 tangled.SpindleMemberNSID, 72 tangled.RepoNSID, 73 tangled.RepoCollaboratorNSID, 74 } 75 76 // Configure our JetStream client. 77 clientCfg := client.DefaultClientConfig() 78 clientCfg.WebsocketURL = cfg.JetstreamURL 79 clientCfg.WantedCollections = collections 80 81 // The handler closes over `st`, `knots` and the spindle hostname so 82 // the scheduler signature stays plain `func(ctx, *Event) error` and 83 // applyCommit can hand the knot consumer new sources as soon as 84 // matching repo records arrive. 
85 handler := func(ctx context.Context, evt *jsmodels.Event) error { 86 return handleJetstreamEvent(ctx, st, knots, cfg.Hostname, evt) 87 } 88 89 // Re-attach the component-scoped logger so handler — which the 90 // scheduler invokes with the ctx we pass to ConnectAndRead — can 91 // pull it back out via loggerFrom. 92 ctx = loggerInto(ctx, logger) 93 94 // The sequential scheduler processes events one-at-a-time in arrival 95 // order. That's the right default for a spindle: ordering matters 96 // (e.g. a member-added event must apply before any record from that 97 // member is processed), and our event volume is tiny. 98 c, err := client.NewClient( 99 clientCfg, 100 logger, 101 sequential.NewScheduler("tack", logger, handler), 102 ) 103 if err != nil { 104 return fmt.Errorf("new jetstream client: %w", err) 105 } 106 107 go func() { 108 for { 109 // We re-read the cursor from the store at every (re)connect so we 110 // pick up any progress the previous connection persisted before 111 // dying. nil means "start from now", which is the right default on 112 // a brand-new install or after a corrupt cursor read. 113 cur, err := st.LoadCursor(ctx) 114 if err != nil { 115 logger.Warn("ignoring unreadable cursor; resuming from now", "err", err) 116 cur = nil 117 } 118 // Rewind a few seconds before the saved cursor on reconnect. 119 // Jetstream cursors are time-based and the docs explicitly note 120 // that exact-boundary replay is not guaranteed gapless, so a 121 // small negative buffer protects against missing events that 122 // straddle the disconnect. Duplicates the rewind produces are 123 // safe: every applyCommit path is an idempotent upsert/delete 124 // keyed on (did, rkey), and SaveCursor only moves forward in 125 // practice because TimeUS is monotonic. 
126 if cur != nil { 127 rewound := *cur - int64(jetstreamRewind/time.Microsecond) 128 if rewound < 0 { 129 rewound = 0 130 } 131 logger.Info("connecting to jetstream", 132 "cursor_us", *cur, 133 "rewound_us", rewound, 134 "rewind", jetstreamRewind, 135 ) 136 cur = &rewound 137 } else { 138 logger.Info("connecting to jetstream from now (no cursor)") 139 } 140 141 // Reconnect loop. ConnectAndRead blocks on the websocket and returns 142 // either when the connection drops (transient network error, server 143 // restart, etc.) or when ctx is cancelled. On error we sleep briefly 144 // and reconnect; on ctx cancellation we exit cleanly. 145 if err := c.ConnectAndRead(ctx, cur); err != nil { 146 if ctx.Err() != nil { 147 return 148 } 149 logger.Error("jetstream read loop", "err", err) 150 time.Sleep(2 * time.Second) 151 continue 152 } 153 if ctx.Err() != nil { 154 return 155 } 156 } 157 }() 158 159 return nil 160} 161 162// handleJetstreamEvent is the per-event callback for the JetStream. It 163// applies the event to the store and, when appropriate, advances the 164// persisted cursor. Any returned error is logged by the scheduler but 165// does not tear down the connection. 166// 167// Cursor-advancement policy: 168// 169// - Apply succeeded: advance the cursor. 170// - Apply failed with a badRecordError (malformed record / unusable 171// input): advance anyway, since replaying a permanently-broken 172// event on every reconnect would stall the firehose forever. 173// - Apply failed with anything else (treated as transient: store 174// hiccup, SQLite busy, disk full, shutdown race, ...): do NOT 175// advance. Saving the cursor here would permanently skip the 176// record, which for membership/repo state means we'd silently 177// lose a row that the rest of tack relies on. Leaving the cursor 178// in place gives the next reconnect (which rewinds by 179// jetstreamRewind) a chance to re-deliver and re-apply it. 
180func handleJetstreamEvent(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error { 181 // We only care about commits, which are the actual record CRUD 182 // operations on a user's PDS. Account/identity events are ignored 183 // for now; if we ever care about handle changes we can add them. 184 if evt.Kind != jsmodels.EventKindCommit || evt.Commit == nil { 185 return nil 186 } 187 logger := loggerFrom(ctx) 188 189 // Dispatch on collection. Unknown collections shouldn't happen given 190 // our wantedCollections filter, but be defensive — jetstream may 191 // send schema changes ahead of us updating the filter. 192 applyErr := applyCommit(ctx, st, knots, hostname, evt) 193 if applyErr != nil { 194 logger.Error("apply commit", 195 "err", applyErr, 196 "did", evt.Did, 197 "collection", evt.Commit.Collection, 198 "op", evt.Commit.Operation, 199 "rkey", evt.Commit.RKey, 200 "transient", !isBadRecord(applyErr), 201 ) 202 203 // Transient failure: bail without advancing the cursor so the 204 // next delivery can retry this event. Returning the error 205 // surfaces it in the scheduler's logs too. 206 if !isBadRecord(applyErr) { 207 return applyErr 208 } 209 // Otherwise (badRecordError) fall through to cursor save: a 210 // single bad record shouldn't stall the cursor forever and 211 // force us to re-process every subsequent event after a 212 // restart. 213 } 214 215 // Advance the cursor. TimeUS is the jetstream-assigned microsecond 216 // timestamp; saving it after-apply means a crash mid-batch will at 217 // worst replay the failing event, never skip past it. 218 if err := st.SaveCursor(ctx, evt.TimeUS); err != nil { 219 // Returning the error logs it; it doesn't kill the scheduler. 220 return fmt.Errorf("save cursor: %w", err) 221 } 222 223 return nil 224} 225 226// badRecordError marks an applyCommit failure as caused by the *record 227// itself* being permanently unusable, e.g. 
a malformed JSON body, or a 228// field that violates an invariant we can't recover from on retry. 229// 230// We need this distinction because the jetstream cursor is time-based 231// and once advanced past an event we will never see it again. So: 232// 233// - Permanent bad-input failures should still advance the cursor: 234// replaying the same broken record on every restart accomplishes 235// nothing and would stall progress on every later event behind it. 236// - Transient infrastructure failures (SQLite busy, disk full, store 237// closed mid-shutdown, etc.) must NOT advance the cursor: the 238// record is fine and the next attempt, on reconnect or after the 239// transient condition clears, should reapply it. Skipping past 240// such a failure can permanently lose membership/repo state. 241// 242// Anything returned from applyCommit that isn't a badRecordError is 243// treated as transient by handleJetstreamEvent. 244type badRecordError struct{ err error } 245 246func (e *badRecordError) Error() string { return e.err.Error() } 247func (e *badRecordError) Unwrap() error { return e.err } 248 249// badRecord wraps err so handleJetstreamEvent recognizes it as a 250// permanent, cursor-advancing failure. Use this for any error caused by 251// the contents of the record (decode errors, schema violations); never 252// for store/IO errors. 253func badRecord(err error) error { return &badRecordError{err: err} } 254 255// isBadRecord reports whether err (or anything it wraps) is a 256// badRecordError. 257func isBadRecord(err error) bool { 258 var b *badRecordError 259 return errors.As(err, &b) 260} 261 262// applyCommit routes a commit to the right store mutation based on its 263// collection NSID and operation. 
264func applyCommit(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error { 265 c := evt.Commit 266 switch c.Collection { 267 case tangled.SpindleMemberNSID: 268 return applySpindleMember(ctx, st, evt.Did, c) 269 case tangled.RepoNSID: 270 return applyRepo(ctx, st, knots, hostname, evt.Did, c) 271 case tangled.RepoCollaboratorNSID: 272 return applyRepoCollaborator(ctx, st, evt.Did, c) 273 default: 274 // Server-side filter should prevent this, but log so we notice 275 // if jetstream ever changes behavior. 276 loggerFrom(ctx).Debug("ignoring unexpected collection", 277 "collection", c.Collection) 278 return nil 279 } 280} 281 282func applySpindleMember(ctx context.Context, st *store, did string, c *jsmodels.Commit) error { 283 switch c.Operation { 284 case jsOpCreate, jsOpUpdate: 285 var rec tangled.SpindleMember 286 if err := json.Unmarshal(c.Record, &rec); err != nil { 287 // Decode failures are a permanent property of the record's 288 // bytes; mark as bad so the cursor can advance past it. 289 return badRecord(fmt.Errorf("decode spindle.member: %w", err)) 290 } 291 return st.UpsertSpindleMember(ctx, did, c.RKey, rec.Instance, rec.Subject, rec.CreatedAt) 292 case jsOpDelete: 293 return st.DeleteSpindleMember(ctx, did, c.RKey) 294 } 295 return nil 296} 297 298func applyRepo(ctx context.Context, st *store, knots KnotConsumer, hostname string, did string, c *jsmodels.Commit) error { 299 switch c.Operation { 300 case jsOpCreate, jsOpUpdate: 301 var rec tangled.Repo 302 if err := json.Unmarshal(c.Record, &rec); err != nil { 303 // See applySpindleMember: decode errors are permanent. 304 return badRecord(fmt.Errorf("decode repo: %w", err)) 305 } 306 307 // Capture the prior (knot, spindle) before the upsert so the 308 // post-mutation reconcile below can detect transitions like 309 // "repo used to point at us, no longer does" — which would 310 // otherwise leave a knot subscription dangling. 
311 oldKnot, oldSpindle, err := st.GetRepo(ctx, did, c.RKey) 312 if err != nil { 313 return err 314 } 315 316 if err := st.UpsertRepo(ctx, did, c.RKey, 317 rec.Knot, rec.Name, 318 deref(rec.Spindle), deref(rec.RepoDid), 319 rec.CreatedAt, 320 ); err != nil { 321 return err 322 } 323 324 newSpindle := deref(rec.Spindle) 325 return reconcileKnot(ctx, st, knots, hostname, 326 oldKnot, oldSpindle, 327 rec.Knot, newSpindle, 328 ) 329 330 case jsOpDelete: 331 // Same shape as the update path, just with no "new" side: we 332 // have to read the row out before deleting so we can decide 333 // whether deletion freed the last hold on a knot we'd been 334 // subscribed to. 335 oldKnot, oldSpindle, err := st.GetRepo(ctx, did, c.RKey) 336 if err != nil { 337 return err 338 } 339 if err := st.DeleteRepo(ctx, did, c.RKey); err != nil { 340 return err 341 } 342 return reconcileKnot(ctx, st, knots, hostname, 343 oldKnot, oldSpindle, 344 "", "", 345 ) 346 } 347 return nil 348} 349 350// reconcileKnot brings the knot consumer's subscriptions in line with 351// the latest store state after a single repo mutation. It is called 352// after the mutation has been applied so IsKnotWanted reflects the 353// post-mutation truth. 354// 355// Logic: 356// - If the new record names us as its spindle, ensure we're 357// subscribed to its knot (AddKnot is idempotent, so calling it on 358// an already-watched knot is cheap). 359// - If the old record named us as its spindle, check whether any 360// other repo still references that knot through us; if not, 361// RemoveKnot it. Skip this when the knot didn't change AND the 362// spindle didn't move away from us, because then nothing actually 363// released our hold. 
364func reconcileKnot( 365 ctx context.Context, 366 st *store, 367 knots KnotConsumer, 368 hostname string, 369 oldKnot, oldSpindle string, 370 newKnot, newSpindle string, 371) error { 372 // Tests pass nil for the consumer when they only care about the 373 // store mutation half of the handler; tolerate that here so 374 // callers don't have to special-case it. 375 if knots == nil { 376 return nil 377 } 378 379 if newSpindle == hostname && newKnot != "" { 380 knots.AddKnot(ctx, newKnot) 381 } 382 383 // Did we just lose our claim on oldKnot? Two ways that can happen: 384 // the spindle field moved off of us, or the knot field moved to a 385 // different host. Either is a reason to consider unsubscribing 386 // from oldKnot — but only if no *other* repo still has us on it. 387 releasedOld := oldSpindle == hostname && oldKnot != "" && 388 (newSpindle != hostname || newKnot != oldKnot) 389 if releasedOld { 390 stillWanted, err := st.IsKnotWanted(ctx, hostname, oldKnot) 391 if err != nil { 392 return err 393 } 394 if !stillWanted { 395 knots.RemoveKnot(ctx, oldKnot) 396 } 397 } 398 return nil 399} 400 401func applyRepoCollaborator(ctx context.Context, st *store, did string, c *jsmodels.Commit) error { 402 switch c.Operation { 403 case jsOpCreate, jsOpUpdate: 404 var rec tangled.RepoCollaborator 405 if err := json.Unmarshal(c.Record, &rec); err != nil { 406 // See applySpindleMember: decode errors are permanent. 407 return badRecord(fmt.Errorf("decode repo.collaborator: %w", err)) 408 } 409 return st.UpsertRepoCollaborator(ctx, did, c.RKey, 410 deref(rec.Repo), deref(rec.RepoDid), 411 rec.Subject, rec.CreatedAt, 412 ) 413 case jsOpDelete: 414 return st.DeleteRepoCollaborator(ctx, did, c.RKey) 415 } 416 return nil 417} 418 419// deref returns the pointed-to string, or "" for nil. The lexicon types 420// model optional fields as *string; the store schema treats absent and 421// empty the same, so collapsing the two here keeps callers tidy. 
func deref(s *string) string {
	// An absent optional field collapses to the empty string.
	if s == nil {
		return ""
	}
	return *s
}