Monorepo for Tangled
tangled.org
1package knotserver
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "math/rand/v2"
10 "os"
11 "path/filepath"
12 "strings"
13 "time"
14
15 "tangled.org/core/knotserver/config"
16 "tangled.org/core/knotserver/db"
17 "tangled.org/core/knotserver/repodid"
18 "tangled.org/core/notifier"
19 "tangled.org/core/rbac"
20)
21
// legacyRepo identifies a repository that is still stored in the legacy
// <scanPath>/<ownerDid>/<repoName> layout.
type legacyRepo struct {
	ownerDid string // name of the top-level "did:" directory that holds the repo
	repoName string // name of the repository directory under the owner
	oldPath  string // full on-disk path of the repository in the legacy layout
}

// scanLegacyRepos lists repositories under scanPath that still use the legacy
// owner-DID/repo-name layout.
//
// Only top-level directories whose names start with "did:" are considered.
// Such a directory is skipped when it contains a HEAD file itself, because it
// is then a repository rather than an owner directory. Within each remaining
// owner directory, every subdirectory that contains a HEAD file is returned.
// Subdirectories without HEAD are logged as non-repos and skipped.
//
// Errors are logged, not returned. If scanPath cannot be read, the result is
// nil. An owner directory that cannot be read is skipped.
func scanLegacyRepos(scanPath string, logger *slog.Logger) []legacyRepo {
	topLevel, err := os.ReadDir(scanPath)
	if err != nil {
		logger.Error("reading scan path", "error", err)
		return nil
	}

	// hasHead reports whether dir directly contains a HEAD entry.
	hasHead := func(dir string) bool {
		_, statErr := os.Stat(filepath.Join(dir, "HEAD"))
		return statErr == nil
	}

	var found []legacyRepo
	for _, owner := range topLevel {
		ownerDid := owner.Name()
		if !owner.IsDir() || !strings.HasPrefix(ownerDid, "did:") {
			continue
		}

		ownerDir := filepath.Join(scanPath, ownerDid)
		// A DID-named directory that is itself a repository is not an owner dir.
		if hasHead(ownerDir) {
			continue
		}

		children, readErr := os.ReadDir(ownerDir)
		if readErr != nil {
			logger.Error("reading owner dir", "ownerDir", ownerDir, "error", readErr)
			continue
		}

		for _, child := range children {
			if !child.IsDir() {
				continue
			}
			childDir := filepath.Join(ownerDir, child.Name())
			if !hasHead(childDir) {
				logger.Warn("skipping non-repo directory", "path", childDir)
				continue
			}
			found = append(found, legacyRepo{
				ownerDid: ownerDid,
				repoName: child.Name(),
				oldPath:  childDir,
			})
		}
	}
	return found
}
72
73func migrateReposOnStartup(ctx context.Context, c *config.Config, d *db.DB, e *rbac.Enforcer, n *notifier.Notifier, logger *slog.Logger) bool {
74 repos := scanLegacyRepos(c.Repo.ScanPath, logger)
75 if len(repos) == 0 {
76 logger.Info("no legacy repos found, migration complete")
77 return true
78 }
79
80 logger.Info("starting legacy repo migration", "count", len(repos))
81 start := time.Now()
82
83 knotServiceUrl := "https://" + c.Server.Hostname
84 if c.Server.Dev {
85 knotServiceUrl = "http://" + c.Server.Hostname
86 }
87
88 migrated := 0
89 for _, repo := range repos {
90 select {
91 case <-ctx.Done():
92 logger.Info("migration interrupted by shutdown", "migrated", migrated, "remaining", len(repos)-migrated)
93 return false
94 default:
95 }
96
97 err := migrateOneRepo(ctx, c, d, e, n, logger, repo, knotServiceUrl)
98 if err != nil {
99 logger.Error("migration failed for repo", "owner", repo.ownerDid, "repo", repo.repoName, "error", err)
100 continue
101 }
102 migrated++
103 }
104
105 logger.Info("legacy repo migration complete", "migrated", migrated, "total", len(repos), "duration", time.Since(start))
106 return migrated == len(repos)
107}
108
109func migrateOneRepo(
110 ctx context.Context,
111 c *config.Config,
112 d *db.DB,
113 e *rbac.Enforcer,
114 n *notifier.Notifier,
115 logger *slog.Logger,
116 repo legacyRepo,
117 knotServiceUrl string,
118) error {
119 l := logger.With("owner", repo.ownerDid, "repo", repo.repoName)
120
121 repoDid, err := d.GetRepoDid(repo.ownerDid, repo.repoName)
122 needsMint := errors.Is(err, sql.ErrNoRows)
123 if err != nil && !needsMint {
124 return fmt.Errorf("checking repo_keys: %w", err)
125 }
126
127 if needsMint {
128 repoDid, err = mintAndStoreRepoDID(ctx, c, d, l, repo, knotServiceUrl)
129 if err != nil {
130 return err
131 }
132 }
133
134 if err := rewriteRBACPolicies(e, repo.ownerDid, repo.repoName, repoDid, l); err != nil {
135 return fmt.Errorf("rewriting RBAC policies: %w", err)
136 }
137
138 newPath := filepath.Join(c.Repo.ScanPath, repoDid)
139
140 if err := moveRepoOnDisk(repo.oldPath, newPath, l); err != nil {
141 return err
142 }
143
144 ownerDir := filepath.Dir(repo.oldPath)
145 if err := os.Remove(ownerDir); err != nil && !errors.Is(err, os.ErrNotExist) {
146 l.Warn("could not remove empty owner dir", "path", ownerDir, "error", err)
147 }
148
149 if err := d.EmitDIDAssign(n, repo.ownerDid, repo.repoName, repoDid); err != nil {
150 l.Error("emitting didAssign event failed (non-fatal)", "error", err)
151 }
152
153 l.Info("migrated repo", "repoDid", repoDid)
154 return nil
155}
156
157func mintAndStoreRepoDID(
158 ctx context.Context,
159 c *config.Config,
160 d *db.DB,
161 l *slog.Logger,
162 repo legacyRepo,
163 knotServiceUrl string,
164) (string, error) {
165 prepared, err := repodid.PrepareRepoDID(c.Server.PlcUrl, knotServiceUrl)
166 if err != nil {
167 return "", fmt.Errorf("preparing DID: %w", err)
168 }
169
170 if err := submitWithBackoff(ctx, prepared, l); err != nil {
171 return "", fmt.Errorf("PLC submission: %w", err)
172 }
173
174 if err := d.StoreRepoKey(prepared.RepoDid, prepared.SigningKeyRaw, repo.ownerDid, repo.repoName); err != nil {
175 return "", fmt.Errorf("storing repo key: %w", err)
176 }
177
178 return prepared.RepoDid, nil
179}
180
181func submitWithBackoff(ctx context.Context, prepared *repodid.PreparedDID, l *slog.Logger) error {
182 backoff := 2 * time.Second
183 const maxBackoff = 5 * time.Minute
184 const maxAttempts = 20
185
186 for attempt := range maxAttempts {
187 plcCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
188 err := prepared.Submit(plcCtx)
189 cancel()
190
191 if err == nil {
192 return nil
193 }
194
195 errMsg := err.Error()
196 retryable := strings.Contains(errMsg, "429") ||
197 strings.Contains(errMsg, "500") ||
198 strings.Contains(errMsg, "502") ||
199 strings.Contains(errMsg, "503") ||
200 strings.Contains(errMsg, "504")
201
202 if !retryable || attempt == maxAttempts-1 {
203 return err
204 }
205
206 jitter := time.Duration(rand.Int64N(int64(backoff / 4)))
207 delay := backoff + jitter
208
209 l.Info("PLC rate limited, backing off", "delay", delay, "attempt", attempt+1)
210
211 select {
212 case <-ctx.Done():
213 return ctx.Err()
214 case <-time.After(delay):
215 }
216
217 backoff = min(backoff*2, maxBackoff)
218 }
219
220 return fmt.Errorf("unreachable: exceeded max attempts")
221}
222
// moveRepoOnDisk renames the repository directory at oldPath to newPath.
//
// If newPath already exists, the rename is treated as already done and nil is
// returned, which keeps the migration re-runnable. If neither path exists, an
// error is returned. Any failure from os.Rename is wrapped with both paths.
func moveRepoOnDisk(oldPath, newPath string, l *slog.Logger) error {
	_, destErr := os.Stat(newPath)
	if destErr == nil {
		l.Info("new path already exists, skipping rename", "newPath", newPath)
		return nil
	}

	_, srcErr := os.Stat(oldPath)
	if errors.Is(srcErr, os.ErrNotExist) {
		return fmt.Errorf("old path %s does not exist and new path %s not found", oldPath, newPath)
	}

	renameErr := os.Rename(oldPath, newPath)
	if renameErr == nil {
		return nil
	}
	return fmt.Errorf("renaming %s -> %s: %w", oldPath, newPath, renameErr)
}
239
240func rewriteRBACPolicies(e *rbac.Enforcer, ownerDid, repoName, repoDid string, l *slog.Logger) error {
241 oldResource := ownerDid + "/" + repoName
242
243 policies, err := e.E.GetFilteredPolicy(1, rbac.ThisServer, oldResource)
244 if err != nil {
245 return fmt.Errorf("getting old policies: %w", err)
246 }
247
248 var addPolicies [][]string
249 var removePolicies [][]string
250 for _, p := range policies {
251 removePolicies = append(removePolicies, p)
252 newPolicy := make([]string, len(p))
253 copy(newPolicy, p)
254 newPolicy[2] = repoDid
255 addPolicies = append(addPolicies, newPolicy)
256 }
257
258 if len(addPolicies) > 0 {
259 if _, addErr := e.E.AddPolicies(addPolicies); addErr != nil {
260 return fmt.Errorf("adding new policies: %w", addErr)
261 }
262 }
263
264 if len(removePolicies) > 0 {
265 if _, rmErr := e.E.RemovePolicies(removePolicies); rmErr != nil {
266 return fmt.Errorf("removing old policies: %w", rmErr)
267 }
268 }
269
270 l.Info("rewrote RBAC policies", "old", oldResource, "new", repoDid, "count", len(policies))
271 return nil
272}