Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "net/url"
8 "sync/atomic"
9 "time"
10
11 comatproto "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/events"
13 "github.com/bluesky-social/indigo/events/schedulers/sequential"
14 "github.com/gorilla/websocket"
15 _ "github.com/mattn/go-sqlite3"
16)
17
// Tuning knobs for the tap drain watcher (watchTapDrain).
const (
	// tapDrainPollInterval is how often the watcher polls the tap database.
	tapDrainPollInterval = 3 * time.Second
	// tapDrainStableChecks is the number of consecutive polls that must find
	// the tap caught up and drained before the embedded tap is shut down.
	tapDrainStableChecks = 2
	// tapEmptyGraceChecks is the number of consecutive polls that must find no
	// tracked repos, without any ever having been seen, before the embedded tap
	// is shut down for having nothing to backfill.
	tapEmptyGraceChecks = 10
)
23
24func (s *Spindle) watchTapDrain(ctx context.Context, stop context.CancelFunc) {
25 headSeq, err := relayHeadSeq(ctx, s.cfg.Server.Tap.RelayUrl)
26 if err != nil {
27 s.l.Warn("tap drain watcher: relay head checking failed, falling back to resync-drain only", "err", err)
28 headSeq = 0
29 } else {
30 s.l.Info("tap drain watcher: relay head seq at startup", "head", headSeq, "relay", s.cfg.Server.Tap.RelayUrl)
31 }
32
33 conn, err := sql.Open("sqlite3", s.cfg.Server.Tap.DBPath)
34 if err != nil {
35 s.l.Warn("tap drain watcher: opening tap db failed", "err", err)
36 return
37 }
38 defer conn.Close()
39
40 ticker := time.NewTicker(tapDrainPollInterval)
41 defer ticker.Stop()
42
43 sawWork := false
44 readyStreak := 0
45 emptyStreak := 0
46 queryFailed := false
47
48 for {
49 select {
50 case <-ctx.Done():
51 return
52 case <-ticker.C:
53 var total, busy int
54 if err := conn.QueryRowContext(ctx, `
55 select count(*),
56 coalesce(sum(case when state in ('pending','resyncing','desynchronized') then 1 else 0 end), 0)
57 from repos`).Scan(&total, &busy); err != nil {
58 if !queryFailed {
59 s.l.Warn("tap drain watcher: repos query failed", "err", err)
60 queryFailed = true
61 }
62 continue
63 }
64 queryFailed = false
65
66 var cursor int64
67 if headSeq > 0 {
68 if err := conn.QueryRowContext(ctx,
69 `select cursor from firehose_cursors where url = ?`,
70 s.cfg.Server.Tap.RelayUrl,
71 ).Scan(&cursor); err != nil {
72 cursor = 0
73 }
74 }
75
76 if total > 0 {
77 sawWork = true
78 emptyStreak = 0
79 } else {
80 emptyStreak++
81 }
82
83 caughtUp := headSeq <= 0 || cursor >= headSeq
84 drained := sawWork && busy == 0
85
86 if caughtUp && drained {
87 readyStreak++
88 } else {
89 readyStreak = 0
90 }
91
92 if readyStreak >= tapDrainStableChecks {
93 s.l.Info("tap caught up and backfill drained, shutting down embedded tap!", "tracked", total, "cursor", cursor, "head", headSeq)
94 stop()
95 s.embedTap.Shutdown()
96 return
97 }
98 if !sawWork && emptyStreak >= tapEmptyGraceChecks {
99 s.l.Info("tap has nothing to backfill, shutting down embedded tap!")
100 stop()
101 s.embedTap.Shutdown()
102 return
103 }
104 }
105 }
106}
107
108func relayHeadSeq(ctx context.Context, relayURL string) (int64, error) {
109 u, err := url.Parse(relayURL)
110 if err != nil {
111 return 0, err
112 }
113 switch u.Scheme {
114 case "http":
115 u.Scheme = "ws"
116 case "https":
117 u.Scheme = "wss"
118 }
119 u.Path = "xrpc/com.atproto.sync.subscribeRepos"
120
121 dialCtx, cancelDial := context.WithTimeout(ctx, 15*time.Second)
122 defer cancelDial()
123
124 conn, _, err := websocket.DefaultDialer.DialContext(dialCtx, u.String(), nil)
125 if err != nil {
126 return 0, fmt.Errorf("dial relay: %w", err)
127 }
128 defer conn.Close()
129
130 streamCtx, cancelStream := context.WithCancel(dialCtx)
131 defer cancelStream()
132
133 var seq atomic.Int64
134 capture := func(v int64) error {
135 seq.Store(v)
136 cancelStream()
137 return nil
138 }
139 rsc := &events.RepoStreamCallbacks{
140 RepoCommit: func(e *comatproto.SyncSubscribeRepos_Commit) error { return capture(e.Seq) },
141 RepoSync: func(e *comatproto.SyncSubscribeRepos_Sync) error { return capture(e.Seq) },
142 RepoIdentity: func(e *comatproto.SyncSubscribeRepos_Identity) error { return capture(e.Seq) },
143 RepoAccount: func(e *comatproto.SyncSubscribeRepos_Account) error { return capture(e.Seq) },
144 }
145 sched := sequential.NewScheduler("spindle-head-probe", rsc.EventHandler)
146 _ = events.HandleRepoStream(streamCtx, conn, sched, nil)
147
148 if h := seq.Load(); h > 0 {
149 return h, nil
150 }
151 return 0, fmt.Errorf("no head seq received from relay")
152}