Monorepo for Tangled (tangled.org)
2

Configure Feed

Select the types of activity you want to include in your feed.

1package knotstream 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "math/rand" 10 "net/http" 11 "sync" 12 "time" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/indigo/util/ssrf" 16 "github.com/carlmjohnson/versioninfo" 17 "github.com/gorilla/websocket" 18 "tangled.org/core/knotmirror/config" 19 "tangled.org/core/knotmirror/db" 20 "tangled.org/core/knotmirror/models" 21 "tangled.org/core/log" 22) 23 24type KnotSlurper struct { 25 logger *slog.Logger 26 db *sql.DB 27 cfg config.SlurperConfig 28 ssrf bool 29 30 subsLk sync.Mutex 31 subs map[string]*subscription 32} 33 34func NewKnotSlurper(l *slog.Logger, db *sql.DB, cfg *config.Config) *KnotSlurper { 35 return &KnotSlurper{ 36 logger: log.SubLogger(l, "slurper"), 37 db: db, 38 cfg: cfg.Slurper, 39 ssrf: cfg.KnotSSRF, 40 subs: make(map[string]*subscription), 41 } 42} 43 44func (s *KnotSlurper) Run(ctx context.Context) { 45 for { 46 select { 47 case <-ctx.Done(): 48 return 49 case <-time.After(s.cfg.PersistCursorPeriod): 50 if err := s.persistCursors(ctx); err != nil { 51 s.logger.Error("failed to flush cursors", "err", err) 52 } 53 } 54 } 55} 56 57func (s *KnotSlurper) CheckIfSubscribed(hostname string) bool { 58 s.subsLk.Lock() 59 defer s.subsLk.Unlock() 60 61 _, ok := s.subs[hostname] 62 return ok 63} 64 65func (s *KnotSlurper) Shutdown(ctx context.Context) error { 66 s.logger.Info("starting shutdown host cursor flush") 67 err := s.persistCursors(ctx) 68 if err != nil { 69 s.logger.Error("shutdown error", "err", err) 70 } 71 s.logger.Info("slurper shutdown complete") 72 return err 73} 74 75func (s *KnotSlurper) persistCursors(ctx context.Context) error { 76 // // gather cursor list from subscriptions and store them to DB 77 // start := time.Now() 78 79 s.subsLk.Lock() 80 cursors := make([]models.HostCursor, len(s.subs)) 81 i := 0 82 for _, sub := range s.subs { 83 cursors[i] = sub.HostCursor() 84 i++ 85 } 86 s.subsLk.Unlock() 87 88 err := 
db.StoreCursors(ctx, s.db, cursors) 89 // s.logger.Info("finished persisting cursors", "count", len(cursors), "duration", time.Since(start).String(), "err", err) 90 return err 91} 92 93func (s *KnotSlurper) Subscribe(host models.Host) error { 94 s.subsLk.Lock() 95 defer s.subsLk.Unlock() 96 97 _, ok := s.subs[host.Hostname] 98 if ok { 99 return fmt.Errorf("already subscribed: %s", host.Hostname) 100 } 101 102 // TODO: include `cancel` function to kill subscription by hostname 103 sub := &subscription{ 104 hostname: host.Hostname, 105 scheduler: NewParallelScheduler( 106 s.cfg.ConcurrencyPerHost, 107 host.Hostname, 108 s.ProcessEvent, 109 ), 110 } 111 s.subs[host.Hostname] = sub 112 113 // TODO: use service level context, not the top-most one. 114 // Using top-most context should be avoided to do graceful shutdown. 115 ctx := context.TODO() 116 117 sub.scheduler.Start(ctx) 118 go s.subscribeWithRedialer(ctx, host, sub) 119 return nil 120} 121 122func (s *KnotSlurper) subscribeWithRedialer(ctx context.Context, host models.Host, sub *subscription) { 123 l := s.logger.With("host", host.Hostname) 124 defer func() { 125 s.subsLk.Lock() 126 defer s.subsLk.Unlock() 127 128 l.Info("unsubscribing knot") 129 delete(s.subs, host.Hostname) 130 }() 131 132 dialer := websocket.Dialer{ 133 HandshakeTimeout: time.Second * 5, 134 } 135 136 // if this isn't a localhost / private connection, then we should enable SSRF protections 137 if !host.NoSSL || s.ssrf { 138 netDialer := ssrf.PublicOnlyDialer() 139 dialer.NetDialContext = netDialer.DialContext 140 } 141 142 cursor := host.LastSeq 143 144 connectedInbound.Inc() 145 defer connectedInbound.Dec() 146 147 var backoff int 148 for { 149 select { 150 case <-ctx.Done(): 151 return 152 default: 153 } 154 u := host.LegacyEventsURL(cursor) 155 l.Debug("made url with cursor", "cursor", cursor, "url", u) 156 157 // NOTE: manual backoff retry implementation to explicitly handle fails 158 hdr := make(http.Header) 159 hdr.Add("User-Agent", 
userAgent()) 160 conn, resp, err := dialer.DialContext(ctx, u, hdr) 161 if err != nil { 162 l.Warn("dialing failed", "err", err, "backoff", backoff) 163 time.Sleep(sleepForBackoff(backoff)) 164 backoff++ 165 if backoff > 30 { 166 l.Warn("host does not appear to be online, disabling for now") 167 host.Status = models.HostStatusOffline 168 if err := db.UpsertHost(ctx, s.db, &host); err != nil { 169 l.Error("failed to update host status", "err", err) 170 } 171 return 172 } 173 continue 174 } 175 176 l.Debug("knot event subscription response", "code", resp.StatusCode, "url", u) 177 178 if err := s.handleConnection(ctx, conn, sub); err != nil { 179 // TODO: measure the last N connection error times and if they're coming too fast reconnect slower or don't reconnect and wait for requestCrawl 180 l.Warn("host connection failed", "err", err, "backoff", backoff) 181 } 182 183 updatedCursor := sub.LastSeq() 184 didProgress := updatedCursor > cursor 185 l.Debug("cursor compare", "cursor", cursor, "updatedCursor", updatedCursor, "didProgress", didProgress) 186 if cursor == 0 || didProgress { 187 cursor = updatedCursor 188 backoff = 0 189 190 batch := []models.HostCursor{sub.HostCursor()} 191 if err := db.StoreCursors(ctx, s.db, batch); err != nil { 192 l.Error("failed to store cursors", "err", err) 193 } 194 } 195 } 196} 197 198// handleConnection handles websocket connection. 
199// Schedules task from received event and return when connection is closed 200func (s *KnotSlurper) handleConnection(ctx context.Context, conn *websocket.Conn, sub *subscription) error { 201 // ping on every 30s 202 ctx, cancel := context.WithCancel(ctx) 203 defer cancel() // close the background ping job on connection close 204 go func() { 205 t := time.NewTicker(30 * time.Second) 206 defer t.Stop() 207 failcount := 0 208 209 for { 210 select { 211 case <-t.C: 212 if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second*10)); err != nil { 213 s.logger.Warn("failed to ping", "err", err) 214 failcount++ 215 if failcount >= 4 { 216 s.logger.Error("too many ping fails", "count", failcount) 217 _ = conn.Close() 218 return 219 } 220 } else { 221 failcount = 0 // ok ping 222 } 223 case <-ctx.Done(): 224 _ = conn.Close() 225 return 226 } 227 } 228 }() 229 230 conn.SetPingHandler(func(message string) error { 231 err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Minute)) 232 if err == websocket.ErrCloseSent { 233 return nil 234 } 235 return err 236 }) 237 conn.SetPongHandler(func(_ string) error { 238 if err := conn.SetReadDeadline(time.Now().Add(time.Minute)); err != nil { 239 s.logger.Error("failed to set read deadline", "err", err) 240 } 241 return nil 242 }) 243 244 for { 245 select { 246 case <-ctx.Done(): 247 return ctx.Err() 248 default: 249 } 250 msgType, msg, err := conn.ReadMessage() 251 if err != nil { 252 return err 253 } 254 255 if msgType != websocket.TextMessage { 256 continue 257 } 258 259 sub.scheduler.AddTask(ctx, &Task{ 260 Key: sub.hostname, // TODO: replace to repository AT-URI for better concurrency 261 message: msg, 262 }) 263 } 264} 265 266type legacyGitRefUpdate struct { 267 OwnerDid *string `json:"ownerDid,omitempty"` 268 Repo *string `json:"repo,omitempty"` 269 LegacyRepoDid *string `json:"repoDid,omitempty"` 270} 271 272type LegacyGitEvent struct { 273 Rkey string 274 Nsid 
string 275 Event legacyGitRefUpdate 276} 277 278func (s *KnotSlurper) ProcessEvent(ctx context.Context, task *Task) error { 279 var legacyMessage LegacyGitEvent 280 if err := json.Unmarshal(task.message, &legacyMessage); err != nil { 281 return fmt.Errorf("unmarshaling message: %w", err) 282 } 283 284 if err := s.ProcessLegacyGitRefUpdate(ctx, task.Key, &legacyMessage); err != nil { 285 return fmt.Errorf("processing gitRefUpdate: %w", err) 286 } 287 return nil 288} 289 290// lookupRepoForRefUpdate resolves the local repo row for an incoming refUpdate 291// via the stable RepoDid join. Returns (nil, "", nil) when the event has no 292// repoDid (unjoinable) and (nil, key, nil) on a clean miss. 293func (s *KnotSlurper) lookupRepoForRefUpdate(ctx context.Context, evt *LegacyGitEvent) (*models.Repo, string, error) { 294 raw := evt.Event.Repo 295 if raw == nil || *raw == "" { 296 raw = evt.Event.LegacyRepoDid 297 } 298 if raw == nil || *raw == "" { 299 return nil, "", nil 300 } 301 repoDid := syntax.DID(*raw) 302 curr, err := db.GetRepoByRepoDid(ctx, s.db, repoDid) 303 return curr, repoDid.String(), err 304} 305 306func (s *KnotSlurper) ProcessLegacyGitRefUpdate(ctx context.Context, source string, evt *LegacyGitEvent) error { 307 knotstreamEventsReceived.Inc() 308 309 l := s.logger.With("src", source) 310 311 curr, lookupKey, err := s.lookupRepoForRefUpdate(ctx, evt) 312 if err != nil { 313 return fmt.Errorf("failed to get repo '%s': %w", lookupKey, err) 314 } 315 if curr == nil { 316 if lookupKey == "" { 317 l.Warn("skipping gitRefUpdate: event has no fields to join on", 318 "repo", evt.Event.Repo, "legacy_repo_did", evt.Event.LegacyRepoDid) 319 } else { 320 // if repo doesn't exist in DB, just ignore the event. That repo is unknown. 321 // Hopefully crawler/tap will sync it later. 
322 l.Warn("skipping event from unknown repo", "key", lookupKey) 323 } 324 knotstreamEventsSkipped.Inc() 325 return nil 326 } 327 l = l.With("repoAt", curr.AtUri()) 328 329 // TODO: should plan resync to resyncBuffer on RepoStateResyncing 330 if curr.State != models.RepoStateActive { 331 l.Debug("skipping non-active repo") 332 knotstreamEventsSkipped.Inc() 333 return nil 334 } 335 336 if curr.GitRev != "" && evt.Rkey <= curr.GitRev.String() { 337 l.Debug("skipping replayed event", "event.Rkey", evt.Rkey, "currentRev", curr.GitRev) 338 knotstreamEventsSkipped.Inc() 339 return nil 340 } 341 342 // can't skip anything, update repo state 343 if err := db.UpdateRepoState(ctx, s.db, curr.RepoDid, models.RepoStateDesynchronized); err != nil { 344 return err 345 } 346 347 l.Info("event processed", "eventRev", evt.Rkey) 348 349 knotstreamEventsProcessed.Inc() 350 return nil 351} 352 353func userAgent() string { 354 return fmt.Sprintf("knotmirror/%s", versioninfo.Short()) 355} 356 357func sleepForBackoff(b int) time.Duration { 358 if b == 0 { 359 return 0 360 } 361 if b < 10 { 362 return time.Millisecond * time.Duration((50*b)+rand.Intn(500)) 363 } 364 return time.Second * 30 365}