Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package knotserver 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "math/rand/v2" 10 "os" 11 "path/filepath" 12 "strings" 13 "time" 14 15 "tangled.org/core/knotserver/config" 16 "tangled.org/core/knotserver/db" 17 "tangled.org/core/knotserver/repodid" 18 "tangled.org/core/notifier" 19 "tangled.org/core/rbac" 20) 21 22type legacyRepo struct { 23 ownerDid string 24 repoName string 25 oldPath string 26} 27 28func scanLegacyRepos(scanPath string, logger *slog.Logger) []legacyRepo { 29 topEntries, err := os.ReadDir(scanPath) 30 if err != nil { 31 logger.Error("reading scan path", "error", err) 32 return nil 33 } 34 35 var repos []legacyRepo 36 for _, entry := range topEntries { 37 if !entry.IsDir() || !strings.HasPrefix(entry.Name(), "did:") { 38 continue 39 } 40 41 ownerPath := filepath.Join(scanPath, entry.Name()) 42 43 if _, headErr := os.Stat(filepath.Join(ownerPath, "HEAD")); headErr == nil { 44 continue 45 } 46 47 subEntries, readErr := os.ReadDir(ownerPath) 48 if readErr != nil { 49 logger.Error("reading owner dir", "ownerDir", ownerPath, "error", readErr) 50 continue 51 } 52 53 ownerDid := entry.Name() 54 for _, sub := range subEntries { 55 if !sub.IsDir() { 56 continue 57 } 58 subPath := filepath.Join(ownerPath, sub.Name()) 59 if _, statErr := os.Stat(filepath.Join(subPath, "HEAD")); statErr != nil { 60 logger.Warn("skipping non-repo directory", "path", subPath) 61 continue 62 } 63 repos = append(repos, legacyRepo{ 64 ownerDid: ownerDid, 65 repoName: sub.Name(), 66 oldPath: subPath, 67 }) 68 } 69 } 70 return repos 71} 72 73func migrateReposOnStartup(ctx context.Context, c *config.Config, d *db.DB, e *rbac.Enforcer, n *notifier.Notifier, logger *slog.Logger) bool { 74 repos := scanLegacyRepos(c.Repo.ScanPath, logger) 75 if len(repos) == 0 { 76 logger.Info("no legacy repos found, migration complete") 77 return true 78 } 79 80 logger.Info("starting legacy repo migration", "count", len(repos)) 81 start := time.Now() 82 83 knotServiceUrl := "https://" + c.Server.Hostname 84 if c.Server.Dev { 85 knotServiceUrl = "http://" + c.Server.Hostname 86 } 87 88 migrated := 0 89 for _, repo := range repos { 90 select { 91 case <-ctx.Done(): 92 logger.Info("migration interrupted by shutdown", "migrated", migrated, "remaining", len(repos)-migrated) 93 return false 94 default: 95 } 96 97 err := migrateOneRepo(ctx, c, d, e, n, logger, repo, knotServiceUrl) 98 if err != nil { 99 logger.Error("migration failed for repo", "owner", repo.ownerDid, "repo", repo.repoName, "error", err) 100 continue 101 } 102 migrated++ 103 } 104 105 logger.Info("legacy repo migration complete", "migrated", migrated, "total", len(repos), "duration", time.Since(start)) 106 return migrated == len(repos) 107} 108 109func migrateOneRepo( 110 ctx context.Context, 111 c *config.Config, 112 d *db.DB, 113 e *rbac.Enforcer, 114 n *notifier.Notifier, 115 logger *slog.Logger, 116 repo legacyRepo, 117 knotServiceUrl string, 118) error { 119 l := logger.With("owner", repo.ownerDid, "repo", repo.repoName) 120 121 repoDid, err := d.GetRepoDid(repo.ownerDid, repo.repoName) 122 needsMint := errors.Is(err, sql.ErrNoRows) 123 if err != nil && !needsMint { 124 return fmt.Errorf("checking repo_keys: %w", err) 125 } 126 127 if needsMint { 128 repoDid, err = mintAndStoreRepoDID(ctx, c, d, l, repo, knotServiceUrl) 129 if err != nil { 130 return err 131 } 132 } 133 134 if err := rewriteRBACPolicies(e, repo.ownerDid, repo.repoName, repoDid, l); err != nil { 135 return fmt.Errorf("rewriting RBAC policies: %w", err) 136 } 137 138 newPath := filepath.Join(c.Repo.ScanPath, repoDid) 139 140 if err := moveRepoOnDisk(repo.oldPath, newPath, l); err != nil { 141 return err 142 } 143 144 ownerDir := filepath.Dir(repo.oldPath) 145 if err := os.Remove(ownerDir); err != nil && !errors.Is(err, os.ErrNotExist) { 146 l.Warn("could not remove empty owner dir", "path", ownerDir, "error", err) 147 } 148 149 if err := d.EmitDIDAssign(n, repo.ownerDid, repo.repoName, repoDid); err != nil { 150 l.Error("emitting didAssign event failed (non-fatal)", "error", err) 151 } 152 153 l.Info("migrated repo", "repoDid", repoDid) 154 return nil 155} 156 157func mintAndStoreRepoDID( 158 ctx context.Context, 159 c *config.Config, 160 d *db.DB, 161 l *slog.Logger, 162 repo legacyRepo, 163 knotServiceUrl string, 164) (string, error) { 165 prepared, err := repodid.PrepareRepoDID(c.Server.PlcUrl, knotServiceUrl) 166 if err != nil { 167 return "", fmt.Errorf("preparing DID: %w", err) 168 } 169 170 if err := submitWithBackoff(ctx, prepared, l); err != nil { 171 return "", fmt.Errorf("PLC submission: %w", err) 172 } 173 174 if err := d.StoreRepoKey(prepared.RepoDid, prepared.SigningKeyRaw, repo.ownerDid, repo.repoName); err != nil { 175 return "", fmt.Errorf("storing repo key: %w", err) 176 } 177 178 return prepared.RepoDid, nil 179} 180 181func submitWithBackoff(ctx context.Context, prepared *repodid.PreparedDID, l *slog.Logger) error { 182 backoff := 2 * time.Second 183 const maxBackoff = 5 * time.Minute 184 const maxAttempts = 20 185 186 for attempt := range maxAttempts { 187 plcCtx, cancel := context.WithTimeout(ctx, 30*time.Second) 188 err := prepared.Submit(plcCtx) 189 cancel() 190 191 if err == nil { 192 return nil 193 } 194 195 errMsg := err.Error() 196 retryable := strings.Contains(errMsg, "429") || 197 strings.Contains(errMsg, "500") || 198 strings.Contains(errMsg, "502") || 199 strings.Contains(errMsg, "503") || 200 strings.Contains(errMsg, "504") 201 202 if !retryable || attempt == maxAttempts-1 { 203 return err 204 } 205 206 jitter := time.Duration(rand.Int64N(int64(backoff / 4))) 207 delay := backoff + jitter 208 209 l.Info("PLC rate limited, backing off", "delay", delay, "attempt", attempt+1) 210 211 select { 212 case <-ctx.Done(): 213 return ctx.Err() 214 case <-time.After(delay): 215 } 216 217 backoff = min(backoff*2, maxBackoff) 218 } 219 220 return fmt.Errorf("unreachable: exceeded max attempts") 221} 222 223func moveRepoOnDisk(oldPath, newPath string, l *slog.Logger) error { 224 if _, err := os.Stat(newPath); err == nil { 225 l.Info("new path already exists, skipping rename", "newPath", newPath) 226 return nil 227 } 228 229 if _, err := os.Stat(oldPath); errors.Is(err, os.ErrNotExist) { 230 return fmt.Errorf("old path %s does not exist and new path %s not found", oldPath, newPath) 231 } 232 233 if err := os.Rename(oldPath, newPath); err != nil { 234 return fmt.Errorf("renaming %s -> %s: %w", oldPath, newPath, err) 235 } 236 237 return nil 238} 239 240func rewriteRBACPolicies(e *rbac.Enforcer, ownerDid, repoName, repoDid string, l *slog.Logger) error { 241 oldResource := ownerDid + "/" + repoName 242 243 policies, err := e.E.GetFilteredPolicy(1, rbac.ThisServer, oldResource) 244 if err != nil { 245 return fmt.Errorf("getting old policies: %w", err) 246 } 247 248 var addPolicies [][]string 249 var removePolicies [][]string 250 for _, p := range policies { 251 removePolicies = append(removePolicies, p) 252 newPolicy := make([]string, len(p)) 253 copy(newPolicy, p) 254 newPolicy[2] = repoDid 255 addPolicies = append(addPolicies, newPolicy) 256 } 257 258 if len(addPolicies) > 0 { 259 if _, addErr := e.E.AddPolicies(addPolicies); addErr != nil { 260 return fmt.Errorf("adding new policies: %w", addErr) 261 } 262 } 263 264 if len(removePolicies) > 0 { 265 if _, rmErr := e.E.RemovePolicies(removePolicies); rmErr != nil { 266 return fmt.Errorf("removing old policies: %w", rmErr) 267 } 268 } 269 270 l.Info("rewrote RBAC policies", "old", oldResource, "new", repoDid, "count", len(policies)) 271 return nil 272}