Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package spindle 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "net/url" 8 "sync/atomic" 9 "time" 10 11 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/events" 13 "github.com/bluesky-social/indigo/events/schedulers/sequential" 14 "github.com/gorilla/websocket" 15 _ "github.com/mattn/go-sqlite3" 16) 17 18const ( 19 tapDrainPollInterval = 3 * time.Second 20 tapDrainStableChecks = 2 21 tapEmptyGraceChecks = 10 22) 23 24func (s *Spindle) watchTapDrain(ctx context.Context, stop context.CancelFunc) { 25 headSeq, err := relayHeadSeq(ctx, s.cfg.Server.Tap.RelayUrl) 26 if err != nil { 27 s.l.Warn("tap drain watcher: relay head checking failed, falling back to resync-drain only", "err", err) 28 headSeq = 0 29 } else { 30 s.l.Info("tap drain watcher: relay head seq at startup", "head", headSeq, "relay", s.cfg.Server.Tap.RelayUrl) 31 } 32 33 conn, err := sql.Open("sqlite3", s.cfg.Server.Tap.DBPath) 34 if err != nil { 35 s.l.Warn("tap drain watcher: opening tap db failed", "err", err) 36 return 37 } 38 defer conn.Close() 39 40 ticker := time.NewTicker(tapDrainPollInterval) 41 defer ticker.Stop() 42 43 sawWork := false 44 readyStreak := 0 45 emptyStreak := 0 46 queryFailed := false 47 48 for { 49 select { 50 case <-ctx.Done(): 51 return 52 case <-ticker.C: 53 var total, busy int 54 if err := conn.QueryRowContext(ctx, ` 55 select count(*), 56 coalesce(sum(case when state in ('pending','resyncing','desynchronized') then 1 else 0 end), 0) 57 from repos`).Scan(&total, &busy); err != nil { 58 if !queryFailed { 59 s.l.Warn("tap drain watcher: repos query failed", "err", err) 60 queryFailed = true 61 } 62 continue 63 } 64 queryFailed = false 65 66 var cursor int64 67 if headSeq > 0 { 68 if err := conn.QueryRowContext(ctx, 69 `select cursor from firehose_cursors where url = ?`, 70 s.cfg.Server.Tap.RelayUrl, 71 ).Scan(&cursor); err != nil { 72 cursor = 0 73 } 74 } 75 76 if total > 0 { 77 sawWork = true 78 emptyStreak = 0 79 } else { 80 emptyStreak++ 81 } 82 83 caughtUp := headSeq <= 0 || cursor >= headSeq 84 drained := sawWork && busy == 0 85 86 if caughtUp && drained { 87 readyStreak++ 88 } else { 89 readyStreak = 0 90 } 91 92 if readyStreak >= tapDrainStableChecks { 93 s.l.Info("tap caught up and backfill drained, shutting down embedded tap!", "tracked", total, "cursor", cursor, "head", headSeq) 94 stop() 95 s.embedTap.Shutdown() 96 return 97 } 98 if !sawWork && emptyStreak >= tapEmptyGraceChecks { 99 s.l.Info("tap has nothing to backfill, shutting down embedded tap!") 100 stop() 101 s.embedTap.Shutdown() 102 return 103 } 104 } 105 } 106} 107 108func relayHeadSeq(ctx context.Context, relayURL string) (int64, error) { 109 u, err := url.Parse(relayURL) 110 if err != nil { 111 return 0, err 112 } 113 switch u.Scheme { 114 case "http": 115 u.Scheme = "ws" 116 case "https": 117 u.Scheme = "wss" 118 } 119 u.Path = "xrpc/com.atproto.sync.subscribeRepos" 120 121 dialCtx, cancelDial := context.WithTimeout(ctx, 15*time.Second) 122 defer cancelDial() 123 124 conn, _, err := websocket.DefaultDialer.DialContext(dialCtx, u.String(), nil) 125 if err != nil { 126 return 0, fmt.Errorf("dial relay: %w", err) 127 } 128 defer conn.Close() 129 130 streamCtx, cancelStream := context.WithCancel(dialCtx) 131 defer cancelStream() 132 133 var seq atomic.Int64 134 capture := func(v int64) error { 135 seq.Store(v) 136 cancelStream() 137 return nil 138 } 139 rsc := &events.RepoStreamCallbacks{ 140 RepoCommit: func(e *comatproto.SyncSubscribeRepos_Commit) error { return capture(e.Seq) }, 141 RepoSync: func(e *comatproto.SyncSubscribeRepos_Sync) error { return capture(e.Seq) }, 142 RepoIdentity: func(e *comatproto.SyncSubscribeRepos_Identity) error { return capture(e.Seq) }, 143 RepoAccount: func(e *comatproto.SyncSubscribeRepos_Account) error { return capture(e.Seq) }, 144 } 145 sched := sequential.NewScheduler("spindle-head-probe", rsc.EventHandler) 146 _ = events.HandleRepoStream(streamCtx, conn, sched, nil) 147 148 if h := seq.Load(); h > 0 { 149 return h, nil 150 } 151 return 0, fmt.Errorf("no head seq received from relay") 152}