Monorepo for Tangled
tangled.org
1package knotstream
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "math/rand"
10 "net/http"
11 "sync"
12 "time"
13
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/indigo/util/ssrf"
16 "github.com/carlmjohnson/versioninfo"
17 "github.com/gorilla/websocket"
18 "tangled.org/core/knotmirror/config"
19 "tangled.org/core/knotmirror/db"
20 "tangled.org/core/knotmirror/models"
21 "tangled.org/core/log"
22)
23
// KnotSlurper maintains websocket subscriptions to knot event streams, one per
// host, and persists each subscription's cursor to the database.
type KnotSlurper struct {
	logger *slog.Logger
	db     *sql.DB
	cfg    config.SlurperConfig
	// ssrf, when true, forces the public-only (SSRF-protected) dialer even for
	// hosts flagged NoSSL; see subscribeWithRedialer.
	ssrf bool

	// subsLk guards subs.
	subsLk sync.Mutex
	// subs holds the active subscriptions, keyed by host hostname.
	subs map[string]*subscription
}
33
// NewKnotSlurper builds a KnotSlurper from the slurper section of cfg. It does
// not start any subscriptions; call Subscribe per host and Run for periodic
// cursor flushing.
func NewKnotSlurper(l *slog.Logger, db *sql.DB, cfg *config.Config) *KnotSlurper {
	slurper := new(KnotSlurper)
	slurper.logger = log.SubLogger(l, "slurper")
	slurper.db = db
	slurper.cfg = cfg.Slurper
	slurper.ssrf = cfg.KnotSSRF
	slurper.subs = make(map[string]*subscription)
	return slurper
}
43
// Run flushes all subscription cursors to the database once every
// cfg.PersistCursorPeriod until ctx is cancelled. A failed flush is logged and
// the loop keeps going.
func (s *KnotSlurper) Run(ctx context.Context) {
	// One timer re-armed after each flush, so the period is measured from the
	// end of the previous flush.
	timer := time.NewTimer(s.cfg.PersistCursorPeriod)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
		}

		if err := s.persistCursors(ctx); err != nil {
			s.logger.Error("failed to flush cursors", "err", err)
		}
		timer.Reset(s.cfg.PersistCursorPeriod)
	}
}
56
57func (s *KnotSlurper) CheckIfSubscribed(hostname string) bool {
58 s.subsLk.Lock()
59 defer s.subsLk.Unlock()
60
61 _, ok := s.subs[hostname]
62 return ok
63}
64
// Shutdown performs a final flush of every subscription's cursor. A flush
// error is logged and also returned to the caller.
func (s *KnotSlurper) Shutdown(ctx context.Context) error {
	s.logger.Info("starting shutdown host cursor flush")

	flushErr := s.persistCursors(ctx)
	if flushErr != nil {
		s.logger.Error("shutdown error", "err", flushErr)
	}

	s.logger.Info("slurper shutdown complete")
	return flushErr
}
74
// persistCursors snapshots the cursor of every active subscription while
// holding subsLk, then stores the batch with db.StoreCursors after releasing
// the lock, so the database round-trip does not block Subscribe or
// CheckIfSubscribed.
func (s *KnotSlurper) persistCursors(ctx context.Context) error {
	s.subsLk.Lock()
	batch := make([]models.HostCursor, 0, len(s.subs))
	for _, sub := range s.subs {
		batch = append(batch, sub.HostCursor())
	}
	s.subsLk.Unlock()

	return db.StoreCursors(ctx, s.db, batch)
}
92
// Subscribe registers a subscription for host and starts consuming its event
// stream in the background. It returns an error if host.Hostname already has
// an active subscription.
//
// The subscription's scheduler and its redialer goroutine share one context,
// which is cancelled as soon as the redialer exits (e.g. after the host has
// been marked offline and removed from s.subs). This keeps the scheduler from
// outliving the subscription it serves.
// NOTE(review): this assumes ParallelScheduler.Start ties its workers to the
// ctx it is given — confirm.
func (s *KnotSlurper) Subscribe(host models.Host) error {
	s.subsLk.Lock()
	defer s.subsLk.Unlock()

	if _, ok := s.subs[host.Hostname]; ok {
		return fmt.Errorf("already subscribed: %s", host.Hostname)
	}

	// TODO: include `cancel` function to kill subscription by hostname
	sub := &subscription{
		hostname: host.Hostname,
		scheduler: NewParallelScheduler(
			s.cfg.ConcurrencyPerHost,
			host.Hostname,
			s.ProcessEvent,
		),
	}
	s.subs[host.Hostname] = sub

	// TODO: derive from a service-level context instead of context.TODO so
	// that shutdown can cancel subscriptions gracefully.
	ctx, cancel := context.WithCancel(context.TODO())

	sub.scheduler.Start(ctx)
	go func() {
		// Release the scheduler once the redialer gives up on this host.
		defer cancel()
		s.subscribeWithRedialer(ctx, host, sub)
	}()
	return nil
}
121
// subscribeWithRedialer keeps a websocket subscription to host's legacy event
// stream alive until ctx is cancelled or the host is declared offline.
//
// Each iteration dials host.LegacyEventsURL(cursor) and hands the connection
// to handleConnection. A failed dial is retried after sleepForBackoff(backoff).
// Once more than 30 attempts have failed without progress, the host is stored
// with HostStatusOffline and the loop exits. After each connection ends, if the
// subscription's sequence advanced (or no cursor was known yet), the cursor is
// persisted and the backoff is reset. On exit the subscription is removed from
// s.subs.
func (s *KnotSlurper) subscribeWithRedialer(ctx context.Context, host models.Host, sub *subscription) {
	l := s.logger.With("host", host.Hostname)
	defer func() {
		s.subsLk.Lock()
		defer s.subsLk.Unlock()

		l.Info("unsubscribing knot")
		delete(s.subs, host.Hostname)
	}()

	dialer := websocket.Dialer{
		HandshakeTimeout: time.Second * 5,
	}

	// if this isn't a localhost / private connection, then we should enable SSRF protections
	if !host.NoSSL || s.ssrf {
		netDialer := ssrf.PublicOnlyDialer()
		dialer.NetDialContext = netDialer.DialContext
	}

	cursor := host.LastSeq

	connectedInbound.Inc()
	defer connectedInbound.Dec()

	var backoff int
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		u := host.LegacyEventsURL(cursor)
		l.Debug("made url with cursor", "cursor", cursor, "url", u)

		// NOTE: manual backoff retry implementation to explicitly handle fails
		hdr := make(http.Header)
		hdr.Add("User-Agent", userAgent())
		conn, resp, err := dialer.DialContext(ctx, u, hdr)
		if err != nil {
			l.Warn("dialing failed", "err", err, "backoff", backoff)
			// Wait out the backoff but stop early if ctx is cancelled; a plain
			// time.Sleep here would hold up shutdown for up to 30s.
			select {
			case <-ctx.Done():
				return
			case <-time.After(sleepForBackoff(backoff)):
			}
			backoff++
			if backoff > 30 {
				l.Warn("host does not appear to be online, disabling for now")
				host.Status = models.HostStatusOffline
				if err := db.UpsertHost(ctx, s.db, &host); err != nil {
					l.Error("failed to update host status", "err", err)
				}
				return
			}
			continue
		}

		l.Debug("knot event subscription response", "code", resp.StatusCode, "url", u)

		if err := s.handleConnection(ctx, conn, sub); err != nil {
			// TODO: measure the last N connection error times and if they're coming too fast reconnect slower or don't reconnect and wait for requestCrawl
			l.Warn("host connection failed", "err", err, "backoff", backoff)
		}

		updatedCursor := sub.LastSeq()
		didProgress := updatedCursor > cursor
		l.Debug("cursor compare", "cursor", cursor, "updatedCursor", updatedCursor, "didProgress", didProgress)
		if cursor == 0 || didProgress {
			cursor = updatedCursor
			backoff = 0

			batch := []models.HostCursor{sub.HostCursor()}
			if err := db.StoreCursors(ctx, s.db, batch); err != nil {
				l.Error("failed to store cursors", "err", err)
			}
		}
	}
}
197
// handleConnection services one established websocket connection for sub.
//
// Every text message read from conn is scheduled as a Task on sub.scheduler.
// The function returns when a read fails (including because conn was closed)
// or when ctx is found cancelled between reads. conn is closed shortly after
// return by the background ping goroutine. That goroutine also closes conn
// earlier if 4 consecutive pings cannot be written.
//
// Liveness: a ping is written every 30s. The read deadline is armed before
// the first read and pushed one minute forward on every pong and every data
// message. A peer that stays silent for a minute therefore makes the read
// fail instead of blocking forever.
func (s *KnotSlurper) handleConnection(ctx context.Context, conn *websocket.Conn, sub *subscription) error {
	l := s.logger.With("host", sub.hostname)

	// Cancelling on return stops the ping goroutine, which then closes conn.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go func() {
		t := time.NewTicker(30 * time.Second)
		defer t.Stop()
		failcount := 0

		for {
			select {
			case <-t.C:
				if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second*10)); err != nil {
					l.Warn("failed to ping", "err", err)
					failcount++
					if failcount >= 4 {
						l.Error("too many ping fails", "count", failcount)
						_ = conn.Close()
						return
					}
				} else {
					failcount = 0 // ok ping
				}
			case <-ctx.Done():
				_ = conn.Close()
				return
			}
		}
	}()

	// extendReadDeadline gives the peer one more minute to deliver a frame.
	extendReadDeadline := func() {
		if err := conn.SetReadDeadline(time.Now().Add(time.Minute)); err != nil {
			l.Error("failed to set read deadline", "err", err)
		}
	}

	conn.SetPingHandler(func(message string) error {
		err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Minute))
		if err == websocket.ErrCloseSent {
			return nil
		}
		return err
	})
	conn.SetPongHandler(func(_ string) error {
		extendReadDeadline()
		return nil
	})

	// Arm the deadline up front. Otherwise only the first pong would set it,
	// and a peer that never pongs could block ReadMessage indefinitely.
	extendReadDeadline()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		msgType, msg, err := conn.ReadMessage()
		if err != nil {
			return err
		}
		// Incoming events prove the peer is alive just as well as pongs do.
		extendReadDeadline()

		if msgType != websocket.TextMessage {
			continue
		}

		sub.scheduler.AddTask(ctx, &Task{
			Key:     sub.hostname, // TODO: replace to repository AT-URI for better concurrency
			message: msg,
		})
	}
}
265
// legacyGitRefUpdate is the decoded "event" payload of a legacy knot stream
// message. All fields are optional pointers, so a missing key and a key that
// is present decode differently (nil vs non-nil).
type legacyGitRefUpdate struct {
	// OwnerDid is decoded but not read by lookupRepoForRefUpdate.
	OwnerDid *string `json:"ownerDid,omitempty"`
	// Repo is the preferred join key; lookupRepoForRefUpdate treats it as a DID.
	Repo *string `json:"repo,omitempty"`
	// LegacyRepoDid is the fallback join key, used when Repo is nil or empty.
	LegacyRepoDid *string `json:"repoDid,omitempty"`
}
271
// LegacyGitEvent is one JSON text message from a knot's legacy event stream,
// as decoded by ProcessEvent.
//
// NOTE(review): the fields carry no json tags. encoding/json matches untagged
// field names case-insensitively, so wire keys such as "rkey", "nsid" and
// "event" are presumably what the knot sends — confirm against the producer.
type LegacyGitEvent struct {
	// Rkey is compared against the repo's stored GitRev to drop replayed events.
	Rkey  string
	Nsid  string
	Event legacyGitRefUpdate
}
277
278func (s *KnotSlurper) ProcessEvent(ctx context.Context, task *Task) error {
279 var legacyMessage LegacyGitEvent
280 if err := json.Unmarshal(task.message, &legacyMessage); err != nil {
281 return fmt.Errorf("unmarshaling message: %w", err)
282 }
283
284 if err := s.ProcessLegacyGitRefUpdate(ctx, task.Key, &legacyMessage); err != nil {
285 return fmt.Errorf("processing gitRefUpdate: %w", err)
286 }
287 return nil
288}
289
290// lookupRepoForRefUpdate resolves the local repo row for an incoming refUpdate
291// via the stable RepoDid join. Returns (nil, "", nil) when the event has no
292// repoDid (unjoinable) and (nil, key, nil) on a clean miss.
293func (s *KnotSlurper) lookupRepoForRefUpdate(ctx context.Context, evt *LegacyGitEvent) (*models.Repo, string, error) {
294 raw := evt.Event.Repo
295 if raw == nil || *raw == "" {
296 raw = evt.Event.LegacyRepoDid
297 }
298 if raw == nil || *raw == "" {
299 return nil, "", nil
300 }
301 repoDid := syntax.DID(*raw)
302 curr, err := db.GetRepoByRepoDid(ctx, s.db, repoDid)
303 return curr, repoDid.String(), err
304}
305
// ProcessLegacyGitRefUpdate applies one decoded knot event received from
// source to the local repo state.
//
// The event is skipped (and counted in knotstreamEventsSkipped) when any of
// these holds:
//   - it cannot be joined to a repo;
//   - the repo is unknown locally;
//   - the repo is not RepoStateActive;
//   - evt.Rkey is not newer than the repo's recorded GitRev.
//
// Otherwise the repo is marked RepoStateDesynchronized and
// knotstreamEventsProcessed is incremented. Only lookup and state-update
// failures are returned as errors.
func (s *KnotSlurper) ProcessLegacyGitRefUpdate(ctx context.Context, source string, evt *LegacyGitEvent) error {
	knotstreamEventsReceived.Inc()

	l := s.logger.With("src", source)

	curr, lookupKey, err := s.lookupRepoForRefUpdate(ctx, evt)
	if err != nil {
		return fmt.Errorf("failed to get repo %q: %w", lookupKey, err)
	}
	if curr == nil {
		if lookupKey == "" {
			// Both join fields are nil or empty here, and logging the *string
			// fields themselves would print pointer addresses with text
			// handlers. Log the event's identity instead.
			l.Warn("skipping gitRefUpdate: event has no fields to join on",
				"nsid", evt.Nsid, "rkey", evt.Rkey)
		} else {
			// if repo doesn't exist in DB, just ignore the event. That repo is unknown.
			// Hopefully crawler/tap will sync it later.
			l.Warn("skipping event from unknown repo", "key", lookupKey)
		}
		knotstreamEventsSkipped.Inc()
		return nil
	}
	l = l.With("repoAt", curr.AtUri())

	// TODO: should plan resync to resyncBuffer on RepoStateResyncing
	if curr.State != models.RepoStateActive {
		l.Debug("skipping non-active repo")
		knotstreamEventsSkipped.Inc()
		return nil
	}

	// Plain string comparison; assumes Rkey and GitRev sort lexicographically
	// in revision order (e.g. TIDs) — NOTE(review): confirm.
	if curr.GitRev != "" && evt.Rkey <= curr.GitRev.String() {
		l.Debug("skipping replayed event", "event.Rkey", evt.Rkey, "currentRev", curr.GitRev)
		knotstreamEventsSkipped.Inc()
		return nil
	}

	// can't skip anything, update repo state
	if err := db.UpdateRepoState(ctx, s.db, curr.RepoDid, models.RepoStateDesynchronized); err != nil {
		return err
	}

	l.Info("event processed", "eventRev", evt.Rkey)

	knotstreamEventsProcessed.Inc()
	return nil
}
352
353func userAgent() string {
354 return fmt.Sprintf("knotmirror/%s", versioninfo.Short())
355}
356
// sleepForBackoff maps a retry counter to a wait duration.
//
// A zero counter waits not at all. Counters below 10 wait 50ms per step plus
// up to 499ms of random jitter. Counters of 10 or more are capped at 30s.
func sleepForBackoff(b int) time.Duration {
	switch {
	case b == 0:
		return 0
	case b >= 10:
		return 30 * time.Second
	}
	jitterMs := rand.Intn(500)
	return time.Duration(50*b+jitterMs) * time.Millisecond
}