Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

spindle: turn tap off when done backfilling

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
committer
Tangled
date (May 26, 2026, 9:09 AM +0300) commit 602b70f6 parent c3f92214 change-id snlulyqv
+226 -8
+5
spindle/embedtap.go
··· 10 10 "net" 11 11 "net/http" 12 12 "strings" 13 + "sync/atomic" 13 14 "time" 14 15 15 16 "github.com/bluesky-social/indigo/service/tap" ··· 46 47 type embeddedTap struct { 47 48 tap *tap.Tap 48 49 logger *slog.Logger 50 + closed atomic.Bool 49 51 } 50 52 51 53 func startEmbeddedTap(ctx context.Context, cfg *config.Config, logger *slog.Logger) (*embeddedTap, error) { ··· 114 116 115 117 func (e *embeddedTap) Shutdown() { 116 118 if e == nil || e.tap == nil { 119 + return 120 + } 121 + if e.closed.Swap(true) { 117 122 return 118 123 } 119 124 shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+36
spindle/ingester.go
··· 8 8 9 9 "tangled.org/core/api/tangled" 10 10 "tangled.org/core/spindle/db" 11 + "tangled.org/core/tapc" 11 12 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 13 14 "github.com/bluesky-social/jetstream/pkg/models" ··· 25 26 switch e.Commit.Collection { 26 27 case tangled.SpindleMemberNSID: 27 28 err = s.ingestMember(ctx, e) 29 + case tangled.RepoNSID, tangled.RepoCollaboratorNSID: 30 + if evt, ok := jetstreamToTapEvent(e); ok { 31 + err = s.tap.processEvent(ctx, evt) 32 + } 28 33 } 29 34 30 35 if err != nil { ··· 38 43 39 44 return nil 40 45 } 46 + } 47 + 48 + func jetstreamToTapEvent(e *models.Event) (tapc.Event, bool) { 49 + if e.Commit == nil { 50 + return tapc.Event{}, false 51 + } 52 + did, err := syntax.ParseDID(e.Did) 53 + if err != nil { 54 + return tapc.Event{}, false 55 + } 56 + var action tapc.RecordAction 57 + switch e.Commit.Operation { 58 + case models.CommitOperationCreate: 59 + action = tapc.RecordCreateAction 60 + case models.CommitOperationUpdate: 61 + action = tapc.RecordUpdateAction 62 + case models.CommitOperationDelete: 63 + action = tapc.RecordDeleteAction 64 + default: 65 + return tapc.Event{}, false 66 + } 67 + return tapc.Event{ 68 + Type: tapc.EvtRecord, 69 + Record: &tapc.RecordEventData{ 70 + Did: did, 71 + Rkey: syntax.RecordKey(e.Commit.RKey), 72 + Collection: syntax.NSID(e.Commit.Collection), 73 + Action: action, 74 + Record: e.Commit.Record, 75 + }, 76 + }, true 41 77 } 42 78 43 79 func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error {
+25 -3
spindle/server.go
··· 112 112 113 113 collections := []string{ 114 114 tangled.SpindleMemberNSID, 115 + tangled.RepoNSID, 116 + tangled.RepoCollaboratorNSID, 115 117 } 116 118 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 117 119 if err != nil { ··· 126 128 } 127 129 for _, d := range dids { 128 130 jc.AddDid(d) 131 + } 132 + 133 + knownRepos, err := d.AllRepos() 134 + if err != nil { 135 + return nil, fmt.Errorf("failed to get known repos: %w", err) 136 + } 137 + for _, r := range knownRepos { 138 + if r.Owner != "" { 139 + jc.AddDid(r.Owner.String()) 140 + } 129 141 } 130 142 131 143 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) ··· 252 264 defer stopper.Stop() 253 265 } 254 266 267 + tapCtx, tapCancel := context.WithCancel(ctx) 268 + 255 269 if s.cfg.Server.Tap.Embed { 256 - emb, err := startEmbeddedTap(ctx, s.cfg, log.SubLogger(s.l, "embedtap")) 270 + emb, err := startEmbeddedTap(tapCtx, s.cfg, log.SubLogger(s.l, "embedtap")) 257 271 if err != nil { 272 + tapCancel() 258 273 return fmt.Errorf("starting embedded tap: %w", err) 259 274 } 260 275 s.embedTap = emb 261 - defer s.embedTap.Shutdown() 276 + defer func() { 277 + tapCancel() 278 + s.embedTap.Shutdown() 279 + }() 280 + 281 + go s.watchTapDrain(tapCtx, tapCancel) 282 + } else { 283 + defer tapCancel() 262 284 } 263 285 264 286 go func() { ··· 267 289 }() 268 290 269 291 s.l.Info("starting tap client", "url", s.cfg.Server.Tap.Url) 270 - s.tap.Start(ctx) 292 + s.tap.Start(tapCtx) 271 293 272 294 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 273 295 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
+152
spindle/tap_drain.go
··· 1 + package spindle 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "fmt" 7 + "net/url" 8 + "sync/atomic" 9 + "time" 10 + 11 + comatproto "github.com/bluesky-social/indigo/api/atproto" 12 + "github.com/bluesky-social/indigo/events" 13 + "github.com/bluesky-social/indigo/events/schedulers/sequential" 14 + "github.com/gorilla/websocket" 15 + _ "github.com/mattn/go-sqlite3" 16 + ) 17 + 18 + const ( 19 + tapDrainPollInterval = 3 * time.Second 20 + tapDrainStableChecks = 2 21 + tapEmptyGraceChecks = 10 22 + ) 23 + 24 + func (s *Spindle) watchTapDrain(ctx context.Context, stop context.CancelFunc) { 25 + headSeq, err := relayHeadSeq(ctx, s.cfg.Server.Tap.RelayUrl) 26 + if err != nil { 27 + s.l.Warn("tap drain watcher: relay head checking failed, falling back to resync-drain only", "err", err) 28 + headSeq = 0 29 + } else { 30 + s.l.Info("tap drain watcher: relay head seq at startup", "head", headSeq, "relay", s.cfg.Server.Tap.RelayUrl) 31 + } 32 + 33 + conn, err := sql.Open("sqlite3", s.cfg.Server.Tap.DBPath) 34 + if err != nil { 35 + s.l.Warn("tap drain watcher: opening tap db failed", "err", err) 36 + return 37 + } 38 + defer conn.Close() 39 + 40 + ticker := time.NewTicker(tapDrainPollInterval) 41 + defer ticker.Stop() 42 + 43 + sawWork := false 44 + readyStreak := 0 45 + emptyStreak := 0 46 + queryFailed := false 47 + 48 + for { 49 + select { 50 + case <-ctx.Done(): 51 + return 52 + case <-ticker.C: 53 + var total, busy int 54 + if err := conn.QueryRowContext(ctx, ` 55 + select count(*), 56 + coalesce(sum(case when state in ('pending','resyncing','desynchronized') then 1 else 0 end), 0) 57 + from repos`).Scan(&total, &busy); err != nil { 58 + if !queryFailed { 59 + s.l.Warn("tap drain watcher: repos query failed", "err", err) 60 + queryFailed = true 61 + } 62 + continue 63 + } 64 + queryFailed = false 65 + 66 + var cursor int64 67 + if headSeq > 0 { 68 + if err := conn.QueryRowContext(ctx, 69 + `select cursor from firehose_cursors where url = ?`, 70 + s.cfg.Server.Tap.RelayUrl, 71 + ).Scan(&cursor); err != nil { 72 + cursor = 0 73 + } 74 + } 75 + 76 + if total > 0 { 77 + sawWork = true 78 + emptyStreak = 0 79 + } else { 80 + emptyStreak++ 81 + } 82 + 83 + caughtUp := headSeq <= 0 || cursor >= headSeq 84 + drained := sawWork && busy == 0 85 + 86 + if caughtUp && drained { 87 + readyStreak++ 88 + } else { 89 + readyStreak = 0 90 + } 91 + 92 + if readyStreak >= tapDrainStableChecks { 93 + s.l.Info("tap caught up and backfill drained, shutting down embedded tap!", "tracked", total, "cursor", cursor, "head", headSeq) 94 + stop() 95 + s.embedTap.Shutdown() 96 + return 97 + } 98 + if !sawWork && emptyStreak >= tapEmptyGraceChecks { 99 + s.l.Info("tap has nothing to backfill, shutting down embedded tap!") 100 + stop() 101 + s.embedTap.Shutdown() 102 + return 103 + } 104 + } 105 + } 106 + } 107 + 108 + func relayHeadSeq(ctx context.Context, relayURL string) (int64, error) { 109 + u, err := url.Parse(relayURL) 110 + if err != nil { 111 + return 0, err 112 + } 113 + switch u.Scheme { 114 + case "http": 115 + u.Scheme = "ws" 116 + case "https": 117 + u.Scheme = "wss" 118 + } 119 + u.Path = "xrpc/com.atproto.sync.subscribeRepos" 120 + 121 + dialCtx, cancelDial := context.WithTimeout(ctx, 15*time.Second) 122 + defer cancelDial() 123 + 124 + conn, _, err := websocket.DefaultDialer.DialContext(dialCtx, u.String(), nil) 125 + if err != nil { 126 + return 0, fmt.Errorf("dial relay: %w", err) 127 + } 128 + defer conn.Close() 129 + 130 + streamCtx, cancelStream := context.WithCancel(dialCtx) 131 + defer cancelStream() 132 + 133 + var seq atomic.Int64 134 + capture := func(v int64) error { 135 + seq.Store(v) 136 + cancelStream() 137 + return nil 138 + } 139 + rsc := &events.RepoStreamCallbacks{ 140 + RepoCommit: func(e *comatproto.SyncSubscribeRepos_Commit) error { return capture(e.Seq) }, 141 + RepoSync: func(e *comatproto.SyncSubscribeRepos_Sync) error { return capture(e.Seq) }, 142 + RepoIdentity: func(e *comatproto.SyncSubscribeRepos_Identity) error { return capture(e.Seq) }, 143 + RepoAccount: func(e *comatproto.SyncSubscribeRepos_Account) error { return capture(e.Seq) }, 144 + } 145 + sched := sequential.NewScheduler("spindle-head-probe", rsc.EventHandler) 146 + _ = events.HandleRepoStream(streamCtx, conn, sched, nil) 147 + 148 + if h := seq.Load(); h > 0 { 149 + return h, nil 150 + } 151 + return 0, fmt.Errorf("no head seq received from relay") 152 + }
+8 -5
spindle/tapclient.go
··· 53 53 return t.tap.AddRepos(ctx, dids) 54 54 } 55 55 56 - func (t *Tap) Start(ctx context.Context) { 57 - go t.tap.Connect(ctx, &tapc.SimpleIndexer{ 56 + func (t *Tap) Start(connCtx context.Context) { 57 + go t.tap.Connect(connCtx, &tapc.SimpleIndexer{ 58 58 EventHandler: t.processEvent, 59 59 ConnectHandler: t.onConnect, 60 60 }) 61 - go t.purgePendingCollabsLoop(ctx) 61 + go t.purgePendingCollabsLoop(t.spindle.rootCtx) 62 62 } 63 63 64 64 func (t *Tap) onConnect(ctx context.Context) { ··· 146 146 l.Info("collapsed rename leftovers", "owner", ownerDid, "repo_did", repoDid, "removed", removed) 147 147 } 148 148 149 - if err := t.tap.AddRepos(ctx, []syntax.DID{ownerDid}); err != nil { 150 - l.Warn("tap AddRepos rejected", "did", ownerDid, "err", err) 149 + if e := t.spindle.embedTap; e == nil || !e.closed.Load() { 150 + if err := t.tap.AddRepos(ctx, []syntax.DID{ownerDid}); err != nil { 151 + l.Warn("tap AddRepos rejected", "did", ownerDid, "err", err) 152 + } 151 153 } 154 + t.spindle.jc.AddDid(ownerDid.String()) 152 155 153 156 t.drainPendingCollabs(ctx, repoDid) 154 157