Monorepo for Tangled
tangled.org
1package knotmirror
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/netip"
10 "net/url"
11 "strings"
12 "time"
13
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "tangled.org/core/api/tangled"
16 "tangled.org/core/knotmirror/config"
17 "tangled.org/core/knotmirror/db"
18 "tangled.org/core/knotmirror/knotstream"
19 "tangled.org/core/knotmirror/models"
20 "tangled.org/core/log"
21 "tangled.org/core/tapc"
22)
23
24type Tap struct {
25 logger *slog.Logger
26 cfg *config.Config
27 tap tapc.Client
28 db *sql.DB
29 gitm GitMirrorManager
30 ks *knotstream.KnotStream
31}
32
33func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, gitm GitMirrorManager, ks *knotstream.KnotStream) *Tap {
34 return &Tap{
35 logger: log.SubLogger(l, "tapclient"),
36 cfg: cfg,
37 tap: tapc.NewClient(cfg.TapUrl, ""),
38 db: db,
39 gitm: gitm,
40 ks: ks,
41 }
42}
43
44func (t *Tap) Start(ctx context.Context) {
45 // TODO: better reconnect logic
46 go func() {
47 for {
48 t.tap.Connect(ctx, &tapc.SimpleIndexer{
49 EventHandler: t.processEvent,
50 })
51 time.Sleep(time.Second)
52 }
53 }()
54}
55
56func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error {
57 l := t.logger.With("component", "tapIndexer")
58
59 var err error
60 switch evt.Type {
61 case tapc.EvtRecord:
62 switch evt.Record.Collection.String() {
63 case tangled.RepoNSID:
64 err = t.processRepo(ctx, evt.Record)
65 }
66 }
67
68 if err != nil {
69 l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err)
70 return err
71 }
72 return nil
73}
74
75func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error {
76 switch evt.Action {
77 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
78 record := tangled.Repo{}
79 if err := json.Unmarshal(evt.Record, &record); err != nil {
80 return fmt.Errorf("parsing record: %w", err)
81 }
82
83 knotUrl := record.Knot
84 if !strings.Contains(record.Knot, "://") {
85 if host, _ := db.GetHost(ctx, t.db, record.Knot); host != nil {
86 knotUrl = host.URL()
87 } else {
88 t.logger.Warn("repo is from unknown knot")
89 if t.cfg.KnotUseSSL {
90 knotUrl = "https://" + knotUrl
91 } else {
92 knotUrl = "http://" + knotUrl
93 }
94 }
95 }
96
97 status := models.RepoStatePending
98 errMsg := ""
99 u, err := url.Parse(knotUrl)
100 if err != nil {
101 status = models.RepoStateSuspended
102 errMsg = "failed to parse knot url"
103 } else if t.cfg.KnotSSRF && isPrivate(u.Hostname()) {
104 status = models.RepoStateSuspended
105 errMsg = "suspending non-public knot"
106 }
107
108 if record.RepoDid == nil || *record.RepoDid == "" {
109 t.logger.Warn("dropping repo record without repo_did", "did", evt.Did, "rkey", evt.Rkey)
110 return nil
111 }
112 repoDid, err := syntax.ParseDID(*record.RepoDid)
113 if err != nil {
114 t.logger.Warn("dropping repo record with invalid DID", "did", evt.Did, "rkey", evt.Rkey, "repo", repoDid)
115 return nil
116 }
117 repo := &models.Repo{
118 Did: evt.Did,
119 Rkey: evt.Rkey,
120 Cid: evt.CID,
121 Name: evt.Rkey.String(),
122 KnotDomain: knotUrl,
123 RepoDid: repoDid,
124 State: status,
125 ErrorMsg: errMsg,
126 RetryAfter: 0, // clear retry info
127 RetryCount: 0,
128 }
129
130 if err := db.UpsertRepo(ctx, t.db, repo); err != nil {
131 return fmt.Errorf("upserting repo to db: %w", err)
132 }
133
134 if !t.ks.CheckIfSubscribed(record.Knot) {
135 if err := t.ks.SubscribeHost(ctx, record.Knot, !t.cfg.KnotUseSSL); err != nil {
136 return fmt.Errorf("subscribing to knot: %w", err)
137 }
138 }
139
140 case tapc.RecordDeleteAction:
141 // no-op. deletion of sh.tangled.repo record doesn't mean repository deletion
142 }
143 return nil
144}
145
146// isPrivate checks if host is private network. It doesn't perform DNS resolution
147func isPrivate(host string) bool {
148 if host == "localhost" {
149 return true
150 }
151 addr, err := netip.ParseAddr(host)
152 if err != nil {
153 return false
154 }
155 return isPrivateAddr(addr)
156}
157
158func isPrivateAddr(addr netip.Addr) bool {
159 return addr.IsLoopback() ||
160 addr.IsPrivate() ||
161 addr.IsLinkLocalUnicast() ||
162 addr.IsLinkLocalMulticast()
163}