Monorepo for Tangled
0

Configure Feed

Select the types of activity you want to include in your feed.

at master 3.9 kB View raw
1package knotmirror 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/netip" 10 "net/url" 11 "strings" 12 "time" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "tangled.org/core/api/tangled" 16 "tangled.org/core/knotmirror/config" 17 "tangled.org/core/knotmirror/db" 18 "tangled.org/core/knotmirror/knotstream" 19 "tangled.org/core/knotmirror/models" 20 "tangled.org/core/log" 21 "tangled.org/core/tapc" 22) 23 24type Tap struct { 25 logger *slog.Logger 26 cfg *config.Config 27 tap tapc.Client 28 db *sql.DB 29 gitm GitMirrorManager 30 ks *knotstream.KnotStream 31} 32 33func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, gitm GitMirrorManager, ks *knotstream.KnotStream) *Tap { 34 return &Tap{ 35 logger: log.SubLogger(l, "tapclient"), 36 cfg: cfg, 37 tap: tapc.NewClient(cfg.TapUrl, ""), 38 db: db, 39 gitm: gitm, 40 ks: ks, 41 } 42} 43 44func (t *Tap) Start(ctx context.Context) { 45 // TODO: better reconnect logic 46 go func() { 47 for { 48 t.tap.Connect(ctx, &tapc.SimpleIndexer{ 49 EventHandler: t.processEvent, 50 }) 51 time.Sleep(time.Second) 52 } 53 }() 54} 55 56func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error { 57 l := t.logger.With("component", "tapIndexer") 58 59 var err error 60 switch evt.Type { 61 case tapc.EvtRecord: 62 switch evt.Record.Collection.String() { 63 case tangled.RepoNSID: 64 err = t.processRepo(ctx, evt.Record) 65 } 66 } 67 68 if err != nil { 69 l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err) 70 return err 71 } 72 return nil 73} 74 75func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error { 76 switch evt.Action { 77 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 78 record := tangled.Repo{} 79 if err := json.Unmarshal(evt.Record, &record); err != nil { 80 return fmt.Errorf("parsing record: %w", err) 81 } 82 83 knotUrl := record.Knot 84 if !strings.Contains(record.Knot, "://") { 85 if host, _ := db.GetHost(ctx, t.db, record.Knot); host != nil { 86 knotUrl = host.URL() 87 } else { 88 t.logger.Warn("repo is from unknown knot") 89 if t.cfg.KnotUseSSL { 90 knotUrl = "https://" + knotUrl 91 } else { 92 knotUrl = "http://" + knotUrl 93 } 94 } 95 } 96 97 status := models.RepoStatePending 98 errMsg := "" 99 u, err := url.Parse(knotUrl) 100 if err != nil { 101 status = models.RepoStateSuspended 102 errMsg = "failed to parse knot url" 103 } else if t.cfg.KnotSSRF && isPrivate(u.Hostname()) { 104 status = models.RepoStateSuspended 105 errMsg = "suspending non-public knot" 106 } 107 108 if record.RepoDid == nil || *record.RepoDid == "" { 109 t.logger.Warn("dropping repo record without repo_did", "did", evt.Did, "rkey", evt.Rkey) 110 return nil 111 } 112 repo := &models.Repo{ 113 Did: evt.Did, 114 Rkey: evt.Rkey, 115 Cid: evt.CID, 116 Name: evt.Rkey.String(), 117 KnotDomain: knotUrl, 118 RepoDid: syntax.DID(*record.RepoDid), 119 State: status, 120 ErrorMsg: errMsg, 121 RetryAfter: 0, // clear retry info 122 RetryCount: 0, 123 } 124 125 if err := db.UpsertRepo(ctx, t.db, repo); err != nil { 126 return fmt.Errorf("upserting repo to db: %w", err) 127 } 128 129 if !t.ks.CheckIfSubscribed(record.Knot) { 130 if err := t.ks.SubscribeHost(ctx, record.Knot, !t.cfg.KnotUseSSL); err != nil { 131 return fmt.Errorf("subscribing to knot: %w", err) 132 } 133 } 134 135 case tapc.RecordDeleteAction: 136 if err := db.DeleteRepo(ctx, t.db, evt.Did, evt.Rkey); err != nil { 137 return fmt.Errorf("deleting repo from db: %w", err) 138 } 139 } 140 return nil 141} 142 143// isPrivate checks if host is private network. It doesn't perform DNS resolution 144func isPrivate(host string) bool { 145 if host == "localhost" { 146 return true 147 } 148 addr, err := netip.ParseAddr(host) 149 if err != nil { 150 return false 151 } 152 return isPrivateAddr(addr) 153} 154 155func isPrivateAddr(addr netip.Addr) bool { 156 return addr.IsLoopback() || 157 addr.IsPrivate() || 158 addr.IsLinkLocalUnicast() || 159 addr.IsLinkLocalMulticast() 160}