Monorepo for Tangled
0

Configure Feed

Select the types of activity you want to include in your feed.

spindle: legacy secret migration via tap resync nudge

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
committer
Tangled
date (May 12, 2026, 9:03 PM +0300) commit 2b00f542 parent e6aff440 change-id rnkqzzzz
+766 -124
+12 -4
spindle/db/repos.go
··· 36 36 `delete from repos 37 37 where owner = ? 38 38 and repo_did = ? 39 - and created_at is not null 40 - and created_at < ( 41 - select max(created_at) from repos 42 - where owner = ? and repo_did = ? and created_at is not null 39 + and ( 40 + (created_at is null and exists ( 41 + select 1 from repos r2 42 + where r2.owner = repos.owner 43 + and r2.repo_did = repos.repo_did 44 + and r2.created_at is not null 45 + and r2.rkey <> repos.rkey 46 + )) 47 + or (created_at is not null and created_at < ( 48 + select max(created_at) from repos 49 + where owner = ? and repo_did = ? and created_at is not null 50 + )) 43 51 )`, 44 52 owner.String(), repoDid.String(), owner.String(), repoDid.String(), 45 53 )
+143
spindle/db/repos_test.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "path/filepath" 6 + "testing" 7 + 8 + "github.com/bluesky-social/indigo/atproto/syntax" 9 + ) 10 + 11 + func newTestDB(t *testing.T) *DB { 12 + t.Helper() 13 + d, err := Make(context.Background(), filepath.Join(t.TempDir(), "spindle.db")) 14 + if err != nil { 15 + t.Fatalf("Make: %v", err) 16 + } 17 + t.Cleanup(func() { d.Close() }) 18 + return d 19 + } 20 + 21 + func TestCollapseRepoSiblings_DeletesStaleNullCreatedAtWithDifferentRkey(t *testing.T) { 22 + d := newTestDB(t) 23 + owner := syntax.DID("did:plc:akshay") 24 + repoDid := syntax.DID("did:plc:boltless") 25 + 26 + if _, err := d.Exec(`insert into repos (knot, owner, rkey, repo_did, created_at) values 27 + ('k', ?, 'stale-bogus-rkey', ?, null), 28 + ('k', ?, 'fresh-pds-rkey', ?, '2024-06-01T00:00:00Z')`, 29 + owner.String(), repoDid.String(), 30 + owner.String(), repoDid.String()); err != nil { 31 + t.Fatalf("seed: %v", err) 32 + } 33 + 34 + n, err := d.CollapseRepoSiblings(owner, repoDid) 35 + if err != nil { 36 + t.Fatalf("CollapseRepoSiblings: %v", err) 37 + } 38 + if n != 1 { 39 + t.Errorf("expected 1 stale row deleted, got %d", n) 40 + } 41 + 42 + var rkey string 43 + if err := d.QueryRow(`select rkey from repos where owner = ? and repo_did = ?`, 44 + owner.String(), repoDid.String()).Scan(&rkey); err != nil { 45 + t.Fatalf("query: %v", err) 46 + } 47 + if rkey != "fresh-pds-rkey" { 48 + t.Errorf("expected fresh row preserved, got rkey=%q", rkey) 49 + } 50 + } 51 + 52 + func TestCollapseRepoSiblings_KeepsNullCreatedAtWhenAlone(t *testing.T) { 53 + d := newTestDB(t) 54 + owner := syntax.DID("did:plc:akshay") 55 + repoDid := syntax.DID("did:plc:boltless") 56 + 57 + if _, err := d.Exec(`insert into repos (knot, owner, rkey, repo_did, created_at) values 58 + ('k', ?, 'sole-row', ?, null)`, 59 + owner.String(), repoDid.String()); err != nil { 60 + t.Fatalf("seed: %v", err) 61 + } 62 + 63 + n, err := d.CollapseRepoSiblings(owner, repoDid) 64 + if err != nil { 65 + t.Fatalf("CollapseRepoSiblings: %v", err) 66 + } 67 + if n != 0 { 68 + t.Errorf("expected 0 deletions when only NULL row exists, got %d", n) 69 + } 70 + 71 + var count int 72 + if err := d.QueryRow(`select count(*) from repos where owner = ?`, owner.String()).Scan(&count); err != nil { 73 + t.Fatalf("count: %v", err) 74 + } 75 + if count != 1 { 76 + t.Errorf("sole NULL row should survive, got %d remaining", count) 77 + } 78 + } 79 + 80 + func TestCollapseRepoSiblings_OlderTimestampLoses(t *testing.T) { 81 + d := newTestDB(t) 82 + owner := syntax.DID("did:plc:akshay") 83 + repoDid := syntax.DID("did:plc:boltless") 84 + 85 + if _, err := d.Exec(`insert into repos (knot, owner, rkey, repo_did, created_at) values 86 + ('k', ?, 'older-rkey', ?, '2024-01-01T00:00:00Z'), 87 + ('k', ?, 'newer-rkey', ?, '2024-06-01T00:00:00Z')`, 88 + owner.String(), repoDid.String(), 89 + owner.String(), repoDid.String()); err != nil { 90 + t.Fatalf("seed: %v", err) 91 + } 92 + 93 + n, err := d.CollapseRepoSiblings(owner, repoDid) 94 + if err != nil { 95 + t.Fatalf("CollapseRepoSiblings: %v", err) 96 + } 97 + if n != 1 { 98 + t.Errorf("expected older row collapsed, got %d", n) 99 + } 100 + 101 + var rkey string 102 + if err := d.QueryRow(`select rkey from repos where owner = ? and repo_did = ?`, 103 + owner.String(), repoDid.String()).Scan(&rkey); err != nil { 104 + t.Fatalf("query: %v", err) 105 + } 106 + if rkey != "newer-rkey" { 107 + t.Errorf("expected newer row preserved, got rkey=%q", rkey) 108 + } 109 + } 110 + 111 + func TestCollapseRepoSiblings_KeepsNullRowWithMatchingRkey(t *testing.T) { 112 + d := newTestDB(t) 113 + owner := syntax.DID("did:plc:akshay") 114 + repoDid := syntax.DID("did:plc:boltless") 115 + 116 + if _, err := d.Exec(`insert into repos (knot, owner, rkey, repo_did, created_at) values 117 + ('k', ?, 'matched-rkey', ?, null)`, 118 + owner.String(), repoDid.String()); err != nil { 119 + t.Fatalf("seed: %v", err) 120 + } 121 + 122 + if err := d.AddRepo(Repo{ 123 + Knot: "k", 124 + Owner: owner, 125 + Rkey: "matched-rkey", 126 + RepoDid: repoDid, 127 + CreatedAt: "2024-06-01T00:00:00Z", 128 + }); err != nil { 129 + t.Fatalf("AddRepo upsert: %v", err) 130 + } 131 + 132 + if _, err := d.CollapseRepoSiblings(owner, repoDid); err != nil { 133 + t.Fatalf("CollapseRepoSiblings: %v", err) 134 + } 135 + 136 + var count int 137 + if err := d.QueryRow(`select count(*) from repos where owner = ?`, owner.String()).Scan(&count); err != nil { 138 + t.Fatalf("count: %v", err) 139 + } 140 + if count != 1 { 141 + t.Errorf("upserted row should be the single survivor, got %d", count) 142 + } 143 + }
+59
spindle/secret_copy.go
··· 4 4 "context" 5 5 "errors" 6 6 "fmt" 7 + "log/slog" 7 8 9 + "github.com/bluesky-social/indigo/atproto/syntax" 10 + "tangled.org/core/spindle/db" 8 11 "tangled.org/core/spindle/secrets" 9 12 ) 10 13 ··· 37 40 } 38 41 return step(cur, 0) 39 42 } 43 + 44 + func legacyKeyCandidates(owner syntax.DID, name string, rkey syntax.RecordKey) []string { 45 + o := owner.String() 46 + r := rkey.String() 47 + switch { 48 + case name == "" && r == "": 49 + return nil 50 + case name == "": 51 + return []string{o + "/" + r} 52 + case r == "" || name == r: 53 + return []string{o + "/" + name} 54 + default: 55 + return []string{o + "/" + name, o + "/" + r} 56 + } 57 + } 58 + 59 + func migrateLegacyRepoSecrets(ctx context.Context, d *db.DB, vault secrets.Manager, logger *slog.Logger, owner syntax.DID, name string, rkey syntax.RecordKey, repoDid syntax.DID) { 60 + candidates := legacyKeyCandidates(owner, name, rkey) 61 + if len(candidates) == 0 { 62 + return 63 + } 64 + flag := "legacy-secret-copy:" + repoDid.String() + ":" + rkey.String() 65 + var exists bool 66 + if err := d.QueryRowContext(ctx, `select exists (select 1 from migrations where name = ?)`, flag).Scan(&exists); err != nil { 67 + logger.Warn("legacy secret copy: check migration flag", "err", err) 68 + return 69 + } 70 + if exists { 71 + return 72 + } 73 + 74 + newID := secrets.RepoIdentifier(repoDid.String()) 75 + var step func(remaining []string, copied int) (int, error) 76 + step = func(remaining []string, copied int) (int, error) { 77 + if len(remaining) == 0 { 78 + return copied, nil 79 + } 80 + oldID := secrets.RepoIdentifier(remaining[0]) 81 + n, err := copyRepoSecrets(ctx, vault, oldID, newID) 82 + if err != nil { 83 + return copied, fmt.Errorf("copy %s -> %s: %w", oldID, newID, err) 84 + } 85 + return step(remaining[1:], copied+n) 86 + } 87 + total, err := step(candidates, 0) 88 + if err != nil { 89 + logger.Warn("legacy secret copy failed", "err", err) 90 + return 91 + } 92 + 93 + if _, err := d.ExecContext(ctx, `insert or ignore into migrations (name) values (?)`, flag); err != nil { 94 + logger.Warn("legacy secret copy: mark flag failed", "err", err) 95 + return 96 + } 97 + logger.Info("legacy secret migrate done", "owner", owner, "name", name, "rkey", rkey, "repoDid", repoDid, "candidates", candidates, "copied", total) 98 + }
+1 -1
spindle/server.go
··· 100 100 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 101 101 } 102 102 103 - if err := runStartupMigrations(ctx, d, vault, logger); err != nil { 103 + if err := runStartupMigrations(ctx, d, cfg.Server.Tap.Embed, cfg.Server.Tap.DBPath, logger); err != nil { 104 104 return nil, fmt.Errorf("failed to run startup migrations: %w", err) 105 105 } 106 106
+80 -54
spindle/startup_migrations.go
··· 3 3 import ( 4 4 "context" 5 5 "database/sql" 6 + "errors" 6 7 "fmt" 7 8 "log/slog" 9 + "os" 8 10 9 - "tangled.org/core/orm" 11 + _ "github.com/mattn/go-sqlite3" 10 12 "tangled.org/core/spindle/db" 11 - "tangled.org/core/spindle/secrets" 12 13 ) 13 14 14 - func runStartupMigrations(ctx context.Context, d *db.DB, vault secrets.Manager, logger *slog.Logger) error { 15 - conn, err := d.DB.Conn(ctx) 16 - if err != nil { 17 - return fmt.Errorf("acquire spindle conn: %w", err) 18 - } 19 - defer conn.Close() 15 + const forceTapResyncFlag = "force-tap-repo-resync-v1" 20 16 21 - return orm.RunMigration(conn, logger, "copy-owner-rkey-secrets-to-repo-did", func(tx *sql.Tx) error { 22 - return copyOwnerRkeySecretsToRepoDid(ctx, tx, vault, logger) 23 - }) 17 + func runStartupMigrations(ctx context.Context, d *db.DB, tapEmbed bool, tapDBPath string, logger *slog.Logger) error { 18 + if err := cleanupOrphanRepos(ctx, d, logger); err != nil { 19 + return fmt.Errorf("cleanup orphan repos: %w", err) 20 + } 21 + if !tapEmbed { 22 + logger.Warn("tap not embedded: legacy repos won't auto-resync; trigger external tap resync to migrate secrets/casbin") 23 + return nil 24 + } 25 + if err := nudgeTapForResync(ctx, d, tapDBPath, logger); err != nil { 26 + return fmt.Errorf("nudge tap for resync: %w", err) 27 + } 28 + return nil 24 29 } 25 30 26 - type repoSecretPair struct { 27 - oldID, newID secrets.RepoIdentifier 31 + func cleanupOrphanRepos(ctx context.Context, d *db.DB, logger *slog.Logger) error { 32 + res, err := d.ExecContext(ctx, ` 33 + delete from repos 34 + where coalesce(repo_did, '') = '' 35 + and exists ( 36 + select 1 from repos r2 37 + where r2.owner = repos.owner 38 + and coalesce(r2.repo_did, '') <> '' 39 + ) 40 + `) 41 + if err != nil { 42 + return fmt.Errorf("delete orphan repos: %w", err) 43 + } 44 + n, _ := res.RowsAffected() 45 + if n > 0 { 46 + logger.Info("cleaned up orphan repos missing repo_did", "deleted", n) 47 + } 48 + return nil 28 49 } 29 50 30 - func loadRepoSecretPairs(ctx context.Context, tx *sql.Tx) ([]repoSecretPair, error) { 31 - rows, err := tx.QueryContext(ctx, 32 - `select owner, rkey, repo_did from repos 33 - where repo_did is not null and repo_did <> ''`, 34 - ) 35 - if err != nil { 36 - return nil, fmt.Errorf("select repos: %w", err) 51 + func nudgeTapForResync(ctx context.Context, d *db.DB, tapDBPath string, logger *slog.Logger) error { 52 + if tapDBPath == "" { 53 + return fmt.Errorf("tap db path empty in embed mode") 54 + } 55 + var exists bool 56 + if err := d.QueryRowContext(ctx, 57 + `select exists (select 1 from migrations where name = ?)`, 58 + forceTapResyncFlag, 59 + ).Scan(&exists); err != nil { 60 + return fmt.Errorf("check %s flag: %w", forceTapResyncFlag, err) 37 61 } 38 - defer rows.Close() 62 + if exists { 63 + logger.Warn("skipped migration, already applied", "migration", forceTapResyncFlag) 64 + return nil 65 + } 39 66 40 - var collect func(acc []repoSecretPair) ([]repoSecretPair, error) 41 - collect = func(acc []repoSecretPair) ([]repoSecretPair, error) { 42 - if !rows.Next() { 43 - return acc, rows.Err() 67 + markDone := func() error { 68 + if _, err := d.ExecContext(ctx, 69 + `insert or ignore into migrations (name) values (?)`, 70 + forceTapResyncFlag, 71 + ); err != nil { 72 + return fmt.Errorf("mark %s done: %w", forceTapResyncFlag, err) 44 73 } 45 - var owner, rkey, repoDid string 46 - if err := rows.Scan(&owner, &rkey, &repoDid); err != nil { 47 - return acc, fmt.Errorf("scan repos row: %w", err) 48 - } 49 - return collect(append(acc, repoSecretPair{ 50 - oldID: secrets.RepoIdentifier(owner + "/" + rkey), 51 - newID: secrets.RepoIdentifier(repoDid), 52 - })) 74 + return nil 75 + } 76 + 77 + if _, err := os.Stat(tapDBPath); errors.Is(err, os.ErrNotExist) { 78 + logger.Info("tap db not yet created, marking resync nudge done", "migration", forceTapResyncFlag, "path", tapDBPath) 79 + return markDone() 80 + } else if err != nil { 81 + return fmt.Errorf("stat tap db: %w", err) 82 + } 83 + 84 + tdb, err := sql.Open("sqlite3", tapDBPath+"?_busy_timeout=5000") 85 + if err != nil { 86 + return fmt.Errorf("open tap db: %w", err) 53 87 } 54 - return collect(nil) 55 - } 88 + defer tdb.Close() 56 89 57 - func copyOwnerRkeySecretsToRepoDid(ctx context.Context, tx *sql.Tx, vault secrets.Manager, logger *slog.Logger) error { 58 - pairs, err := loadRepoSecretPairs(ctx, tx) 90 + if _, err := tdb.ExecContext(ctx, `delete from repo_records`); err != nil { 91 + return fmt.Errorf("clear tap repo_records: %w", err) 92 + } 93 + res, err := tdb.ExecContext(ctx, 94 + `update repos set state = 'desynchronized', retry_after = 0 where state in ('active','error')`, 95 + ) 59 96 if err != nil { 60 - return err 97 + return fmt.Errorf("desync tap repos: %w", err) 61 98 } 99 + n, _ := res.RowsAffected() 62 100 63 - var step func(remaining []repoSecretPair, totalCopied int) error 64 - step = func(remaining []repoSecretPair, totalCopied int) error { 65 - if len(remaining) == 0 { 66 - logger.Info("secret copy migration complete", "rows", len(pairs), "copied", totalCopied) 67 - return nil 68 - } 69 - p := remaining[0] 70 - n, err := copyRepoSecrets(ctx, vault, p.oldID, p.newID) 71 - if err != nil { 72 - return fmt.Errorf("copy %s -> %s: %w", p.oldID, p.newID, err) 73 - } 74 - if n > 0 { 75 - logger.Info("secrets copied", "old", p.oldID, "new", p.newID, "count", n) 76 - } 77 - return step(remaining[1:], totalCopied+n) 101 + if err := markDone(); err != nil { 102 + return err 78 103 } 79 - return step(pairs, 0) 104 + logger.Info("nudged tap to resync", "migration", forceTapResyncFlag, "repos_desynced", n) 105 + return nil 80 106 }
+459 -65
spindle/startup_migrations_test.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "database/sql" 5 6 "io" 6 7 "log/slog" 7 8 "path/filepath" ··· 14 15 "tangled.org/core/spindle/secrets" 15 16 ) 16 17 18 + func seedTapDB(t *testing.T, path string) { 19 + t.Helper() 20 + tdb, err := sql.Open("sqlite3", path) 21 + if err != nil { 22 + t.Fatalf("open tap db: %v", err) 23 + } 24 + defer tdb.Close() 25 + if _, err := tdb.Exec(` 26 + create table repos ( 27 + did text primary key, 28 + state text not null default 'pending', 29 + status text not null default 'active', 30 + handle text default '', 31 + rev text default '', 32 + prev_data text default '', 33 + error_msg text default '', 34 + retry_count integer not null default 0, 35 + retry_after integer not null default 0 36 + ); 37 + create table repo_records ( 38 + did text not null, 39 + collection text not null, 40 + rkey text not null, 41 + cid text not null, 42 + primary key (did, collection, rkey) 43 + ); 44 + `); err != nil { 45 + t.Fatalf("create tap tables: %v", err) 46 + } 47 + } 48 + 49 + func tapRepoState(t *testing.T, path, did string) string { 50 + t.Helper() 51 + tdb, err := sql.Open("sqlite3", path) 52 + if err != nil { 53 + t.Fatalf("open tap db: %v", err) 54 + } 55 + defer tdb.Close() 56 + var state string 57 + if err := tdb.QueryRow(`select state from repos where did = ?`, did).Scan(&state); err != nil { 58 + t.Fatalf("query state for %s: %v", did, err) 59 + } 60 + return state 61 + } 62 + 63 + func tapRecordCount(t *testing.T, path string) int { 64 + t.Helper() 65 + tdb, err := sql.Open("sqlite3", path) 66 + if err != nil { 67 + t.Fatalf("open tap db: %v", err) 68 + } 69 + defer tdb.Close() 70 + var n int 71 + if err := tdb.QueryRow(`select count(*) from repo_records`).Scan(&n); err != nil { 72 + t.Fatalf("count repo_records: %v", err) 73 + } 74 + return n 75 + } 76 + 17 77 func newTestSpindleDB(t *testing.T) *db.DB { 18 78 t.Helper() 19 79 d, err := db.Make(context.Background(), filepath.Join(t.TempDir(), "spindle.db")) ··· 33 93 return vault 34 94 } 35 95 36 - func mustAddRepo(t *testing.T, d *db.DB, knot, owner, rkey, repoDid string) { 37 - t.Helper() 38 - if err := d.AddRepo(db.Repo{ 39 - Knot: knot, 40 - Owner: syntax.DID(owner), 41 - Rkey: syntax.RecordKey(rkey), 42 - RepoDid: syntax.DID(repoDid), 43 - }); err != nil { 44 - t.Fatalf("AddRepo(%s): %v", rkey, err) 45 - } 46 - } 47 - 48 96 func mustAddSecret(t *testing.T, vault secrets.Manager, repo, key, value string, createdAt time.Time, by string) { 49 97 t.Helper() 50 98 err := vault.AddSecret(context.Background(), secrets.UnlockedSecret{ ··· 59 107 } 60 108 } 61 109 62 - func TestStartupMigrations_CopyOwnerRkeySecretsToRepoDid(t *testing.T) { 110 + func TestMigrateLegacyRepoSecrets_NameCandidate(t *testing.T) { 63 111 ctx := context.Background() 64 112 logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 65 - 66 113 d := newTestSpindleDB(t) 67 114 vault := newTestVault(t) 68 115 69 - owner := "did:plc:akshay" 70 - migratedRepoDid := "did:plc:boltless" 71 - skippedRkey := "3kspindlerkey00b" 72 - migratedRkey := "3kspindlerkey00a" 73 - 74 - mustAddRepo(t, d, "knot.test", owner, migratedRkey, migratedRepoDid) 75 - mustAddRepo(t, d, "knot.test", owner, skippedRkey, "") 116 + owner := syntax.DID("did:plc:akshay") 117 + repoDid := syntax.DID("did:plc:boltless") 118 + displayName := "myrepo" 119 + rkey := syntax.RecordKey("3kspindlerkey00a") 76 120 77 121 created := time.Date(2024, 6, 1, 12, 0, 0, 0, time.UTC) 78 - oldRepoKey := owner + "/" + migratedRkey 79 - skippedKey := owner + "/" + skippedRkey 122 + oldNameKey := owner.String() + "/" + displayName 80 123 81 - mustAddSecret(t, vault, oldRepoKey, "API_KEY", "alpha", created, owner) 82 - mustAddSecret(t, vault, oldRepoKey, "DB_PASSWORD", "bravo", created.Add(1*time.Hour), owner) 83 - mustAddSecret(t, vault, skippedKey, "STRAY", "delta", created, owner) 124 + mustAddSecret(t, vault, oldNameKey, "API_KEY", "alpha", created, owner.String()) 125 + mustAddSecret(t, vault, oldNameKey, "DB_PASSWORD", "bravo", created.Add(1*time.Hour), owner.String()) 84 126 85 - if err := runStartupMigrations(ctx, d, vault, logger); err != nil { 86 - t.Fatalf("first migration run: %v", err) 87 - } 127 + migrateLegacyRepoSecrets(ctx, d, vault, logger, owner, displayName, rkey, repoDid) 88 128 89 - copied, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(migratedRepoDid)) 129 + copied, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(repoDid)) 90 130 if err != nil { 91 131 t.Fatalf("GetSecretsUnlocked(new): %v", err) 92 132 } 93 133 if len(copied) != 2 { 94 - t.Fatalf("expected 2 secrets under new repo_did key, got %d", len(copied)) 134 + t.Fatalf("expected 2 secrets under repo_did key, got %d", len(copied)) 95 135 } 96 136 97 137 want := map[string]struct { ··· 104 144 for _, s := range copied { 105 145 w, ok := want[s.Key] 106 146 if !ok { 107 - t.Errorf("unexpected key %q under %s", s.Key, migratedRepoDid) 147 + t.Errorf("unexpected key %q under %s", s.Key, repoDid) 108 148 continue 109 149 } 110 150 if s.Value != w.value { ··· 113 153 if !s.CreatedAt.Equal(w.createdAt) { 114 154 t.Errorf("%s: CreatedAt got %s, want %s", s.Key, s.CreatedAt, w.createdAt) 115 155 } 116 - if string(s.Repo) != migratedRepoDid { 117 - t.Errorf("%s: Repo got %s, want %s", s.Key, s.Repo, migratedRepoDid) 156 + if string(s.Repo) != repoDid.String() { 157 + t.Errorf("%s: Repo got %s, want %s", s.Key, s.Repo, repoDid) 118 158 } 119 159 } 120 160 121 - orig, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(oldRepoKey)) 161 + orig, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(oldNameKey)) 122 162 if err != nil { 123 163 t.Fatalf("GetSecretsUnlocked(old): %v", err) 124 164 } ··· 126 166 t.Errorf("expected old-key secrets preserved, got %d", len(orig)) 127 167 } 128 168 129 - stray, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(skippedKey)) 130 - if err != nil { 131 - t.Fatalf("GetSecretsUnlocked(skipped): %v", err) 132 - } 133 - if len(stray) != 1 { 134 - t.Errorf("expected skipped repo's old-key secret untouched, got %d", len(stray)) 135 - } 136 - 137 - if err := runStartupMigrations(ctx, d, vault, logger); err != nil { 138 - t.Fatalf("second migration run: %v", err) 139 - } 140 - 141 - again, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(migratedRepoDid)) 169 + migrateLegacyRepoSecrets(ctx, d, vault, logger, owner, displayName, rkey, repoDid) 170 + again, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(repoDid)) 142 171 if err != nil { 143 172 t.Fatalf("GetSecretsUnlocked(new) after re-run: %v", err) 144 173 } ··· 149 178 var marked int 150 179 if err := d.QueryRow( 151 180 `select count(*) from migrations where name = ?`, 152 - "copy-owner-rkey-secrets-to-repo-did", 181 + "legacy-secret-copy:"+repoDid.String()+":"+rkey.String(), 153 182 ).Scan(&marked); err != nil { 154 183 t.Fatalf("query migrations: %v", err) 155 184 } 156 185 if marked != 1 { 157 - t.Errorf("expected migration recorded exactly once, got %d", marked) 186 + t.Errorf("expected per-repo flag recorded exactly once, got %d", marked) 158 187 } 159 188 } 160 189 161 - func TestStartupMigrations_NoRepos(t *testing.T) { 190 + func TestMigrateLegacyRepoSecrets_RkeyCandidate(t *testing.T) { 162 191 ctx := context.Background() 163 192 logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 164 193 d := newTestSpindleDB(t) 165 194 vault := newTestVault(t) 166 195 167 - if err := runStartupMigrations(ctx, d, vault, logger); err != nil { 168 - t.Fatalf("migration on empty db: %v", err) 196 + owner := syntax.DID("did:plc:akshay") 197 + repoDid := syntax.DID("did:plc:boltless") 198 + displayName := "myrepo" 199 + rkey := syntax.RecordKey("3kspindlerkey00a") 200 + 201 + created := time.Date(2024, 6, 1, 12, 0, 0, 0, time.UTC) 202 + oldRkeyKey := owner.String() + "/" + rkey.String() 203 + 204 + mustAddSecret(t, vault, oldRkeyKey, "API_KEY", "alpha", created, owner.String()) 205 + mustAddSecret(t, vault, oldRkeyKey, "DB_PASSWORD", "bravo", created, owner.String()) 206 + 207 + migrateLegacyRepoSecrets(ctx, d, vault, logger, owner, displayName, rkey, repoDid) 208 + 209 + got, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(repoDid)) 210 + if err != nil { 211 + t.Fatalf("GetSecretsUnlocked: %v", err) 212 + } 213 + if len(got) != 2 { 214 + t.Fatalf("expected 2 secrets copied via rkey candidate, got %d", len(got)) 169 215 } 170 216 } 171 217 172 - func TestStartupMigrations_PartialPreExisting(t *testing.T) { 218 + func TestMigrateLegacyRepoSecrets_BothCandidates(t *testing.T) { 173 219 ctx := context.Background() 174 220 logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 175 221 d := newTestSpindleDB(t) 176 222 vault := newTestVault(t) 177 223 178 - owner := "did:plc:akshay" 179 - repoDid := "did:plc:boltless" 180 - rkey := "3kspindlerkey00a" 181 - mustAddRepo(t, d, "knot.test", owner, rkey, repoDid) 224 + owner := syntax.DID("did:plc:akshay") 225 + repoDid := syntax.DID("did:plc:boltless") 226 + displayName := "myrepo" 227 + rkey := syntax.RecordKey("3kspindlerkey00a") 182 228 183 229 created := time.Date(2024, 6, 1, 12, 0, 0, 0, time.UTC) 184 - oldKey := owner + "/" + rkey 185 - mustAddSecret(t, vault, oldKey, "API_KEY", "alpha", created, owner) 186 - mustAddSecret(t, vault, oldKey, "DB_PASSWORD", "bravo", created, owner) 230 + oldNameKey := owner.String() + "/" + displayName 231 + oldRkeyKey := owner.String() + "/" + rkey.String() 232 + 233 + mustAddSecret(t, vault, oldNameKey, "FROM_NAME", "n", created, owner.String()) 234 + mustAddSecret(t, vault, oldRkeyKey, "FROM_RKEY", "r", created, owner.String()) 187 235 188 - mustAddSecret(t, vault, repoDid, "API_KEY", "pre-existing", created.Add(-24*time.Hour), owner) 236 + migrateLegacyRepoSecrets(ctx, d, vault, logger, owner, displayName, rkey, repoDid) 189 237 190 - if err := runStartupMigrations(ctx, d, vault, logger); err != nil { 191 - t.Fatalf("migration: %v", err) 238 + got, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(repoDid)) 239 + if err != nil { 240 + t.Fatalf("GetSecretsUnlocked: %v", err) 241 + } 242 + if len(got) != 2 { 243 + t.Fatalf("expected 2 secrets merged from both candidates, got %d", len(got)) 244 + } 245 + seen := map[string]string{} 246 + for _, s := range got { 247 + seen[s.Key] = s.Value 192 248 } 249 + if seen["FROM_NAME"] != "n" { 250 + t.Errorf("FROM_NAME missing or wrong value: %q", seen["FROM_NAME"]) 251 + } 252 + if seen["FROM_RKEY"] != "r" { 253 + t.Errorf("FROM_RKEY missing or wrong value: %q", seen["FROM_RKEY"]) 254 + } 255 + } 256 + 257 + func TestMigrateLegacyRepoSecrets_PreExistingTakesPriority(t *testing.T) { 258 + ctx := context.Background() 259 + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 260 + d := newTestSpindleDB(t) 261 + vault := newTestVault(t) 262 + 263 + owner := syntax.DID("did:plc:akshay") 264 + repoDid := syntax.DID("did:plc:boltless") 265 + displayName := "myrepo" 266 + rkey := syntax.RecordKey("3kspindlerkey00a") 267 + 268 + created := time.Date(2024, 6, 1, 12, 0, 0, 0, time.UTC) 269 + oldKey := owner.String() + "/" + displayName 270 + 271 + mustAddSecret(t, vault, oldKey, "API_KEY", "alpha", created, owner.String()) 272 + mustAddSecret(t, vault, oldKey, "DB_PASSWORD", "bravo", created, owner.String()) 273 + mustAddSecret(t, vault, repoDid.String(), "API_KEY", "pre-existing", created.Add(-24*time.Hour), owner.String()) 274 + 275 + migrateLegacyRepoSecrets(ctx, d, vault, logger, owner, displayName, rkey, repoDid) 193 276 194 277 got, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(repoDid)) 195 278 if err != nil { 196 279 t.Fatalf("GetSecretsUnlocked: %v", err) 197 280 } 198 281 if len(got) != 2 { 199 - t.Fatalf("expected 2 secrets under new key, got %d", len(got)) 282 + t.Fatalf("expected 2 secrets under repo_did key, got %d", len(got)) 200 283 } 201 284 for _, s := range got { 202 285 if s.Key == "API_KEY" && s.Value != "pre-existing" { ··· 207 290 } 208 291 } 209 292 } 293 + 294 + func TestMigrateLegacyRepoSecrets_EmptyName(t *testing.T) { 295 + ctx := context.Background() 296 + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 297 + d := newTestSpindleDB(t) 298 + vault := newTestVault(t) 299 + 300 + owner := syntax.DID("did:plc:akshay") 301 + repoDid := syntax.DID("did:plc:boltless") 302 + rkey := syntax.RecordKey("3kspindlerkey00a") 303 + 304 + created := time.Date(2024, 6, 1, 12, 0, 0, 0, time.UTC) 305 + oldRkeyKey := owner.String() + "/" + rkey.String() 306 + mustAddSecret(t, vault, oldRkeyKey, "API_KEY", "alpha", created, owner.String()) 307 + 308 + migrateLegacyRepoSecrets(ctx, d, vault, logger, owner, "", rkey, repoDid) 309 + 310 + got, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(repoDid)) 311 + if err != nil { 312 + t.Fatalf("GetSecretsUnlocked: %v", err) 313 + } 314 + if len(got) != 1 { 315 + t.Errorf("expected 1 secret via rkey candidate when name empty, got %d", len(got)) 316 + } 317 + } 318 + 319 + func TestMigrateLegacyRepoSecrets_BothEmpty(t *testing.T) { 320 + ctx := context.Background() 321 + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 322 + d := newTestSpindleDB(t) 323 + vault := newTestVault(t) 324 + 325 + owner := syntax.DID("did:plc:akshay") 326 + repoDid := syntax.DID("did:plc:boltless") 327 + 328 + migrateLegacyRepoSecrets(ctx, d, vault, logger, owner, "", "", repoDid) 329 + 330 + got, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(repoDid)) 331 + if err != nil { 332 + t.Fatalf("GetSecretsUnlocked: %v", err) 333 + } 334 + if len(got) != 0 { 335 + t.Errorf("expected no work when both name and rkey empty, got %d secrets", len(got)) 336 + } 337 + 338 + var marked int 339 + if err := d.QueryRow( 340 + `select count(*) from migrations where name like ?`, 341 + "legacy-secret-copy:"+repoDid.String()+":%", 342 + ).Scan(&marked); err != nil { 343 + t.Fatalf("query migrations: %v", err) 344 + } 345 + if marked != 0 { 346 + t.Errorf("empty inputs should not record flag, got %d", marked) 347 + } 348 + } 349 + 350 + func TestNudgeTapForResync(t *testing.T) { 351 + ctx := context.Background() 352 + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 353 + d := newTestSpindleDB(t) 354 + 355 + tapPath := filepath.Join(t.TempDir(), "tap.db") 356 + seedTapDB(t, tapPath) 357 + 358 + tdb, err := sql.Open("sqlite3", tapPath) 359 + if err != nil { 360 + t.Fatalf("open tap db: %v", err) 361 + } 362 + if _, err := tdb.Exec(`insert into repos (did, state) values 363 + ('did:plc:akshay', 'active'), 364 + ('did:plc:boltless', 'error'), 365 + ('did:plc:limpet', 'pending') 366 + `); err != nil { 367 + t.Fatalf("seed repos: %v", err) 368 + } 369 + if _, err := tdb.Exec(`insert into repo_records (did, collection, rkey, cid) values 370 + ('did:plc:akshay', 'sh.tangled.repo', '3kspindlerkey00a', 'bafyone'), 371 + ('did:plc:boltless', 'sh.tangled.repo', '3kspindlerkey00b', 'bafytwo') 372 + `); err != nil { 373 + t.Fatalf("seed records: %v", err) 374 + } 375 + tdb.Close() 376 + 377 + if err := nudgeTapForResync(ctx, d, tapPath, logger); err != nil { 378 + t.Fatalf("nudgeTapForResync: %v", err) 379 + } 380 + 381 + if got := tapRecordCount(t, tapPath); got != 0 { 382 + t.Errorf("expected repo_records cleared, got %d", got) 383 + } 384 + if got := tapRepoState(t, tapPath, "did:plc:akshay"); got != "desynchronized" { 385 + t.Errorf("active should flip to desynchronized, got %s", got) 386 + } 387 + if got := tapRepoState(t, tapPath, "did:plc:boltless"); got != "desynchronized" { 388 + t.Errorf("error should flip to desynchronized, got %s", got) 389 + } 390 + if got := tapRepoState(t, tapPath, "did:plc:limpet"); got != "pending" { 391 + t.Errorf("pending should not be touched, got %s", got) 392 + } 393 + 394 + tdb2, err := sql.Open("sqlite3", tapPath) 395 + if err != nil { 396 + t.Fatalf("reopen tap db: %v", err) 397 + } 398 + if _, err := tdb2.Exec(`update repos set state = 'active' where did = 'did:plc:akshay'`); err != nil { 399 + t.Fatalf("reseed: %v", err) 400 + } 401 + tdb2.Close() 402 + 403 + if err := nudgeTapForResync(ctx, d, tapPath, logger); err != nil { 404 + t.Fatalf("nudgeTapForResync second run: %v", err) 405 + } 406 + if got := tapRepoState(t, tapPath, "did:plc:akshay"); got != "active" { 407 + t.Errorf("idempotent re-run should not touch state, got %s", got) 408 + } 409 + 410 + var marked int 411 + if err := d.QueryRow( 412 + `select count(*) from migrations where name = ?`, 413 + "force-tap-repo-resync-v1", 414 + ).Scan(&marked); err != nil { 415 + t.Fatalf("query migrations: %v", err) 416 + } 417 + if marked != 1 { 418 + t.Errorf("expected flag recorded exactly once, got %d", marked) 419 + } 420 + } 421 + 422 + func TestNudgeTapForResync_MissingDB(t *testing.T) { 423 + ctx := context.Background() 424 + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 425 + d := newTestSpindleDB(t) 426 + 427 + missing := filepath.Join(t.TempDir(), "absent.db") 428 + 429 + if err := nudgeTapForResync(ctx, d, missing, logger); err != nil { 430 + t.Fatalf("missing tap db should succeed: %v", err) 431 + } 432 + 433 + var marked int 434 + if err := d.QueryRow( 435 + `select count(*) from migrations where name = ?`, 436 + "force-tap-repo-resync-v1", 437 + ).Scan(&marked); err != nil { 438 + t.Fatalf("query migrations: %v", err) 439 + } 440 + if marked != 1 { 441 + t.Errorf("expected flag recorded even when tap db absent, got %d", marked) 442 + } 443 + } 444 + 445 + func TestNudgeTapForResync_EmptyPath(t *testing.T) { 446 + ctx := context.Background() 447 + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 448 + d := newTestSpindleDB(t) 449 + 450 + if err := nudgeTapForResync(ctx, d, "", logger); err == nil { 451 + t.Errorf("expected error for empty tap db path") 452 + } 453 + 454 + var marked int 455 + if err := d.QueryRow( 456 + `select count(*) from migrations where name = ?`, 457 + "force-tap-repo-resync-v1", 458 + ).Scan(&marked); err != nil { 459 + t.Fatalf("query migrations: %v", err) 460 + } 461 + if marked != 0 { 462 + t.Errorf("empty path should not mark flag, got %d", marked) 463 + } 464 + } 465 + 466 + func TestRunStartupMigrations_NonEmbedSkipsTapNudge(t *testing.T) { 467 + ctx := context.Background() 468 + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 469 + d := newTestSpindleDB(t) 470 + 471 + if err := runStartupMigrations(ctx, d, false, "", logger); err != nil { 472 + t.Fatalf("non-embed should not error on empty path: %v", err) 473 + } 474 + 475 + var marked int 476 + if err := d.QueryRow( 477 + `select count(*) from migrations where name = ?`, 478 + "force-tap-repo-resync-v1", 479 + ).Scan(&marked); err != nil { 480 + t.Fatalf("query migrations: %v", err) 481 + } 482 + if marked != 0 { 483 + t.Errorf("non-embed mode should skip tap nudge flag, got %d", marked) 484 + } 485 + } 486 + 487 + func TestCleanupOrphanRepos_DeletesWhenSiblingExists(t *testing.T) { 488 + ctx := context.Background() 489 + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 490 + d := newTestSpindleDB(t) 491 + 492 + owner := "did:plc:akshay" 493 + if _, err := d.Exec(`insert into repos (knot, owner, rkey, repo_did, created_at) values 494 + ('k', ?, 'legacy_name', null, null), 495 + ('k', ?, '3kspindlerkey00a', 'did:plc:boltless', '2024-01-01T00:00:00Z')`, 496 + owner, owner); err != nil { 497 + t.Fatalf("seed: %v", err) 498 + } 499 + 500 + if err := cleanupOrphanRepos(ctx, d, logger); err != nil { 501 + t.Fatalf("cleanupOrphanRepos: %v", err) 502 + } 503 + 504 + var nullCount int 505 + if err := d.QueryRow(`select count(*) from repos where repo_did is null`).Scan(&nullCount); err != nil { 506 + t.Fatalf("null count: %v", err) 507 + } 508 + if nullCount != 0 { 509 + t.Errorf("orphan should be deleted when sibling exists, got %d remaining", nullCount) 510 + } 511 + 512 + var sibCount int 513 + if err := d.QueryRow(`select count(*) from repos where repo_did is not null`).Scan(&sibCount); err != nil { 514 + t.Fatalf("sibling count: %v", err) 515 + } 516 + if sibCount != 1 { 517 + t.Errorf("sibling row should be preserved, got %d", sibCount) 518 + } 519 + } 520 + 521 + func TestCleanupOrphanRepos_KeepsWhenAlone(t *testing.T) { 522 + ctx := context.Background() 523 + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 524 + d := newTestSpindleDB(t) 525 + 526 + owner := "did:plc:akshay" 527 + if _, err := d.Exec(`insert into repos (knot, owner, rkey, repo_did, created_at) values 528 + ('k', ?, 'legacy_name', null, null)`, owner); err != nil { 529 + t.Fatalf("seed: %v", err) 530 + } 531 + 532 + if err := cleanupOrphanRepos(ctx, d, logger); err != nil { 533 + t.Fatalf("cleanupOrphanRepos: %v", err) 534 + } 535 + 536 + var remaining int 537 + if err := d.QueryRow(`select count(*) from repos where owner = ?`, owner).Scan(&remaining); err != nil { 538 + t.Fatalf("count: %v", err) 539 + } 540 + if remaining != 1 { 541 + t.Errorf("orphan with no sibling should be kept (preserves owner registration), got %d", remaining) 542 + } 543 + } 544 + 545 + func TestCleanupOrphanRepos_PerOwnerScope(t *testing.T) { 546 + ctx := context.Background() 547 + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 548 + d := newTestSpindleDB(t) 549 + 550 + ownerA := "did:plc:akshay" 551 + ownerB := "did:plc:limpet" 552 + if _, err := d.Exec(`insert into repos (knot, owner, rkey, repo_did, created_at) values 553 + ('k', ?, 'legacy_a', null, null), 554 + ('k', ?, '3krealkey', 'did:plc:boltless', '2024-01-01T00:00:00Z'), 555 + ('k', ?, 'legacy_b', null, null)`, 556 + ownerA, ownerA, ownerB); err != nil { 557 + t.Fatalf("seed: %v", err) 558 + } 559 + 560 + if err := cleanupOrphanRepos(ctx, d, logger); err != nil { 561 + t.Fatalf("cleanupOrphanRepos: %v", err) 562 + } 563 + 564 + var ownerARows, ownerBRows int 565 + if err := d.QueryRow(`select count(*) from repos where owner = ?`, ownerA).Scan(&ownerARows); err != nil { 566 + t.Fatalf("count A: %v", err) 567 + } 568 + if ownerARows != 1 { 569 + t.Errorf("ownerA: orphan should be deleted (sibling exists), expected 1 row, got %d", ownerARows) 570 + } 571 + if err := d.QueryRow(`select count(*) from repos where owner = ?`, ownerB).Scan(&ownerBRows); err != nil { 572 + t.Fatalf("count B: %v", err) 573 + } 574 + if ownerBRows != 1 { 575 + t.Errorf("ownerB: orphan should be kept (no sibling), expected 1 row, got %d", ownerBRows) 576 + } 577 + } 578 + 579 + func TestCleanupOrphanRepos_EmptyStringRepoDid(t *testing.T) { 580 + ctx := context.Background() 581 + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 582 + d := newTestSpindleDB(t) 583 + 584 + owner := "did:plc:akshay" 585 + if _, err := d.Exec(`insert into repos (knot, owner, rkey, repo_did, created_at) values 586 + ('k', ?, 'legacy_empty', '', null), 587 + ('k', ?, '3krealkey', 'did:plc:boltless', '2024-01-01T00:00:00Z')`, 588 + owner, owner); err != nil { 589 + t.Fatalf("seed: %v", err) 590 + } 591 + 592 + if err := cleanupOrphanRepos(ctx, d, logger); err != nil { 593 + t.Fatalf("cleanupOrphanRepos: %v", err) 594 + } 595 + 596 + var emptyCount int 597 + if err := d.QueryRow(`select count(*) from repos where coalesce(repo_did, '') = ''`).Scan(&emptyCount); err != nil { 598 + t.Fatalf("empty count: %v", err) 599 + } 600 + if emptyCount != 0 { 601 + t.Errorf("empty-string repo_did orphan should be deleted when sibling exists, got %d remaining", emptyCount) 602 + } 603 + }
+12
spindle/tapclient.go
··· 133 133 return fmt.Errorf("add repo: %w", err) 134 134 } 135 135 136 + legacyName := "" 137 + if record.Name != nil { 138 + legacyName = *record.Name 139 + } 140 + migrateLegacyRepoSecrets(ctx, t.spindle.db, t.spindle.vault, l, ownerDid, legacyName, rkey, repoDid) 141 + 136 142 if removed, err := t.spindle.db.CollapseRepoSiblings(ownerDid, repoDid); err != nil { 137 143 l.Warn("collapse rename siblings failed", "err", err) 138 144 } else if removed > 0 { ··· 332 338 cutoff := time.Now().Add(-pendingCollabTTL) 333 339 t.pendingMu.Lock() 334 340 defer t.pendingMu.Unlock() 341 + expired := 0 335 342 for did, list := range t.pendingCollabs { 336 343 kept := list[:0] 337 344 for _, p := range list { 338 345 if !p.at.Before(cutoff) { 339 346 kept = append(kept, p) 347 + } else { 348 + expired++ 340 349 } 341 350 } 342 351 if len(kept) == 0 { ··· 344 353 } else { 345 354 t.pendingCollabs[did] = kept 346 355 } 356 + } 357 + if expired > 0 { 358 + t.logger.Warn("expired buffered collaborator events without matching repo arrival", "count", expired, "ttl", pendingCollabTTL) 347 359 } 348 360 }