Monorepo for Tangled
tangled.org
1package knotmirror
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/netip"
10 "net/url"
11 "strings"
12 "time"
13
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "tangled.org/core/api/tangled"
16 "tangled.org/core/knotmirror/config"
17 "tangled.org/core/knotmirror/db"
18 "tangled.org/core/knotmirror/knotstream"
19 "tangled.org/core/knotmirror/models"
20 "tangled.org/core/log"
21 "tangled.org/core/tapc"
22)
23
24type Tap struct {
25 logger *slog.Logger
26 cfg *config.Config
27 tap tapc.Client
28 db *sql.DB
29 gitm GitMirrorManager
30 ks *knotstream.KnotStream
31}
32
33func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, gitm GitMirrorManager, ks *knotstream.KnotStream) *Tap {
34 return &Tap{
35 logger: log.SubLogger(l, "tapclient"),
36 cfg: cfg,
37 tap: tapc.NewClient(cfg.TapUrl, ""),
38 db: db,
39 gitm: gitm,
40 ks: ks,
41 }
42}
43
44func (t *Tap) Start(ctx context.Context) {
45 // TODO: better reconnect logic
46 go func() {
47 for {
48 t.tap.Connect(ctx, &tapc.SimpleIndexer{
49 EventHandler: t.processEvent,
50 })
51 time.Sleep(time.Second)
52 }
53 }()
54}
55
56func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error {
57 l := t.logger.With("component", "tapIndexer")
58
59 var err error
60 switch evt.Type {
61 case tapc.EvtRecord:
62 switch evt.Record.Collection.String() {
63 case tangled.RepoNSID:
64 err = t.processRepo(ctx, evt.Record)
65 }
66 }
67
68 if err != nil {
69 l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err)
70 return err
71 }
72 return nil
73}
74
75func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error {
76 switch evt.Action {
77 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
78 record := tangled.Repo{}
79 if err := json.Unmarshal(evt.Record, &record); err != nil {
80 return fmt.Errorf("parsing record: %w", err)
81 }
82
83 knotUrl := record.Knot
84 if !strings.Contains(record.Knot, "://") {
85 if host, _ := db.GetHost(ctx, t.db, record.Knot); host != nil {
86 knotUrl = host.URL()
87 } else {
88 t.logger.Warn("repo is from unknown knot")
89 if t.cfg.KnotUseSSL {
90 knotUrl = "https://" + knotUrl
91 } else {
92 knotUrl = "http://" + knotUrl
93 }
94 }
95 }
96
97 status := models.RepoStatePending
98 errMsg := ""
99 u, err := url.Parse(knotUrl)
100 if err != nil {
101 status = models.RepoStateSuspended
102 errMsg = "failed to parse knot url"
103 } else if t.cfg.KnotSSRF && isPrivate(u.Hostname()) {
104 status = models.RepoStateSuspended
105 errMsg = "suspending non-public knot"
106 }
107
108 if record.RepoDid == nil || *record.RepoDid == "" {
109 t.logger.Warn("dropping repo record without repo_did", "did", evt.Did, "rkey", evt.Rkey)
110 return nil
111 }
112 repo := &models.Repo{
113 Did: evt.Did,
114 Rkey: evt.Rkey,
115 Cid: evt.CID,
116 Name: evt.Rkey.String(),
117 KnotDomain: knotUrl,
118 RepoDid: syntax.DID(*record.RepoDid),
119 State: status,
120 ErrorMsg: errMsg,
121 RetryAfter: 0, // clear retry info
122 RetryCount: 0,
123 }
124
125 if err := db.UpsertRepo(ctx, t.db, repo); err != nil {
126 return fmt.Errorf("upserting repo to db: %w", err)
127 }
128
129 if !t.ks.CheckIfSubscribed(record.Knot) {
130 if err := t.ks.SubscribeHost(ctx, record.Knot, !t.cfg.KnotUseSSL); err != nil {
131 return fmt.Errorf("subscribing to knot: %w", err)
132 }
133 }
134
135 case tapc.RecordDeleteAction:
136 // no-op. deletion of sh.tangled.repo record doesn't mean repository deletion
137 }
138 return nil
139}
140
// isPrivate reports whether host refers to the local machine or to a private
// network. host is a bare hostname or IP literal without a port.
//
// "localhost" and any name under ".localhost" are matched case-insensitively,
// with or without a trailing dot. IP literals are classified by
// isPrivateAddr. Any other hostname is reported as not private: no DNS
// resolution is performed, so a public name that resolves to a private
// address is not detected here.
func isPrivate(host string) bool {
	// A trailing dot only marks the name as fully qualified.
	name := strings.TrimSuffix(host, ".")
	if strings.EqualFold(name, "localhost") ||
		strings.HasSuffix(strings.ToLower(name), ".localhost") {
		return true
	}

	addr, err := netip.ParseAddr(name)
	if err != nil {
		// Not an IP literal; without DNS resolution we can't tell more.
		return false
	}
	return isPrivateAddr(addr)
}

// isPrivateAddr reports whether addr is a loopback, private, link-local
// (unicast or multicast) or unspecified address. IPv4-mapped IPv6 addresses
// are classified by the IPv4 address they embed.
func isPrivateAddr(addr netip.Addr) bool {
	// Unmap explicitly so ::ffff:a.b.c.d is judged as a.b.c.d.
	addr = addr.Unmap()
	return addr.IsLoopback() ||
		addr.IsPrivate() ||
		addr.IsLinkLocalUnicast() ||
		addr.IsLinkLocalMulticast() ||
		// 0.0.0.0 and :: are commonly routed to the local host.
		addr.IsUnspecified()
}