Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

spindle: key repos and collabs on repoDID

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
committer
Tangled
date (May 12, 2026, 11:59 AM +0300) commit b66498b9 parent 66b56842 change-id mrsvstwx
+314 -56
+94
spindle/db/collaborators.go
··· 1 + package db 2 + 3 + import ( 4 + "database/sql" 5 + "fmt" 6 + 7 + "github.com/bluesky-social/indigo/atproto/syntax" 8 + ) 9 + 10 + type RepoCollaborator struct { 11 + OwnerDid syntax.DID 12 + Rkey syntax.RecordKey 13 + Subject syntax.DID 14 + RepoDid syntax.DID 15 + } 16 + 17 + func (d *DB) AddRepoCollaborator(c RepoCollaborator) error { 18 + _, err := d.Exec( 19 + `insert into repo_collaborators (owner_did, rkey, subject, repo_did) 20 + values (?, ?, ?, ?) 21 + on conflict(owner_did, rkey) do update set 22 + subject = excluded.subject, 23 + repo_did = excluded.repo_did`, 24 + c.OwnerDid.String(), c.Rkey.String(), c.Subject.String(), c.RepoDid.String(), 25 + ) 26 + return err 27 + } 28 + 29 + func scanCollab(row interface{ Scan(...any) error }) (*RepoCollaborator, error) { 30 + var owner, rkey, subject, repoDid string 31 + if err := row.Scan(&owner, &rkey, &subject, &repoDid); err != nil { 32 + return nil, err 33 + } 34 + return &RepoCollaborator{ 35 + OwnerDid: syntax.DID(owner), 36 + Rkey: syntax.RecordKey(rkey), 37 + Subject: syntax.DID(subject), 38 + RepoDid: syntax.DID(repoDid), 39 + }, nil 40 + } 41 + 42 + func (d *DB) GetRepoCollaborator(ownerDid syntax.DID, rkey syntax.RecordKey) (*RepoCollaborator, error) { 43 + return scanCollab(d.QueryRow( 44 + `select owner_did, rkey, subject, repo_did from repo_collaborators where owner_did = ? and rkey = ?`, 45 + ownerDid.String(), rkey.String(), 46 + )) 47 + } 48 + 49 + func (d *DB) DeleteRepoCollaborator(ownerDid syntax.DID, rkey syntax.RecordKey) error { 50 + res, err := d.Exec(`delete from repo_collaborators where owner_did = ? and rkey = ?`, ownerDid.String(), rkey.String()) 51 + if err != nil { 52 + return err 53 + } 54 + n, err := res.RowsAffected() 55 + if err != nil { 56 + return err 57 + } 58 + if n == 0 { 59 + return sql.ErrNoRows 60 + } 61 + return nil 62 + } 63 + 64 + func (d *DB) DeleteRepoCollaboratorsByRepoDid(repoDid syntax.DID) error { 65 + _, err := d.Exec(`delete from repo_collaborators where repo_did = ?`, repoDid.String()) 66 + if err != nil { 67 + return fmt.Errorf("delete collaborators for %s: %w", repoDid, err) 68 + } 69 + return nil 70 + } 71 + 72 + func (d *DB) ListCollaboratorsByRepoDid(repoDid syntax.DID) ([]RepoCollaborator, error) { 73 + rows, err := d.Query( 74 + `select owner_did, rkey, subject, repo_did from repo_collaborators where repo_did = ?`, 75 + repoDid.String(), 76 + ) 77 + if err != nil { 78 + return nil, fmt.Errorf("list collaborators for %s: %w", repoDid, err) 79 + } 80 + defer rows.Close() 81 + 82 + var out []RepoCollaborator 83 + for rows.Next() { 84 + c, err := scanCollab(rows) 85 + if err != nil { 86 + return nil, err 87 + } 88 + out = append(out, *c) 89 + } 90 + if err := rows.Err(); err != nil { 91 + return nil, err 92 + } 93 + return out, nil 94 + }
+96 -11
spindle/db/db.go
··· 1 1 package db 2 2 3 3 import ( 4 + "context" 4 5 "database/sql" 6 + "log/slog" 5 7 "strings" 6 8 7 9 _ "github.com/mattn/go-sqlite3" 10 + "tangled.org/core/log" 11 + "tangled.org/core/orm" 8 12 ) 9 13 10 14 type DB struct { 11 15 *sql.DB 12 16 } 13 17 14 - func Make(dbPath string) (*DB, error) { 18 + func Make(ctx context.Context, dbPath string) (*DB, error) { 15 19 // https://github.com/mattn/go-sqlite3#connection-string 16 20 opts := []string{ 17 21 "_foreign_keys=1", ··· 21 25 "_busy_timeout=5000", 22 26 } 23 27 28 + logger := log.FromContext(ctx) 29 + logger = log.SubLogger(logger, "db") 30 + 24 31 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 25 32 if err != nil { 26 33 return nil, err 27 34 } 28 35 29 - // NOTE: If any other migration is added here, you MUST 30 - // copy the pattern in appview: use a single sql.Conn 31 - // for every migration. 36 + conn, err := db.Conn(ctx) 37 + if err != nil { 38 + return nil, err 39 + } 40 + defer conn.Close() 32 41 33 - _, err = db.Exec(` 42 + _, err = conn.ExecContext(ctx, ` 34 43 create table if not exists _jetstream ( 35 44 id integer primary key autoincrement, 36 45 last_time_us integer not null ··· 41 50 ); 42 51 43 52 create table if not exists repos ( 44 - id integer primary key autoincrement, 45 - knot text not null, 46 - owner text not null, 47 - name text not null, 48 - addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 53 + id integer primary key autoincrement, 54 + knot text not null, 55 + owner text not null, 56 + rkey text not null, 57 + repo_did text, 58 + created_at text, 59 + addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 49 60 50 - unique(owner, name) 61 + unique(owner, rkey) 62 + ); 63 + 64 + create table if not exists repo_collaborators ( 65 + id integer primary key autoincrement, 66 + owner_did text not null, 67 + rkey text not null, 68 + subject text not null, 69 + repo_did text not null, 70 + addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 71 + 72 + unique(owner_did, rkey) 51 73 ); 52 74 53 75 create table if not exists spindle_members ( ··· 72 94 event text not null, -- json 73 95 created integer not null -- unix nanos 74 96 ); 97 + 98 + create table if not exists migrations ( 99 + id integer primary key autoincrement, 100 + name text unique 101 + ); 75 102 `) 76 103 if err != nil { 77 104 return nil, err 78 105 } 79 106 107 + if err := runMigrations(ctx, conn, logger); err != nil { 108 + return nil, err 109 + } 110 + 80 111 return &DB{db}, nil 112 + } 113 + 114 + func runMigrations(_ context.Context, conn *sql.Conn, logger *slog.Logger) error { 115 + return orm.RunMigration(conn, logger, "repos-to-repo-did", func(tx *sql.Tx) error { 116 + var hasName int 117 + if err := tx.QueryRow( 118 + `select count(*) from pragma_table_info('repos') where name = 'name'`, 119 + ).Scan(&hasName); err != nil { 120 + return err 121 + } 122 + 123 + if hasName > 0 { 124 + var totalRows, copiedRows int 125 + if err := tx.QueryRow(`select count(*) from repos`).Scan(&totalRows); err != nil { 126 + return err 127 + } 128 + if err := tx.QueryRow(`select count(*) from repos where coalesce(name, '') <> ''`).Scan(&copiedRows); err != nil { 129 + return err 130 + } 131 + if dropped := totalRows - copiedRows; dropped > 0 { 132 + logger.Warn("dropping repo rows with empty name during migration", "dropped", dropped, "kept", copiedRows) 133 + } 134 + 135 + if _, err := tx.Exec(` 136 + create table if not exists repos_new ( 137 + id integer primary key autoincrement, 138 + knot text not null, 139 + owner text not null, 140 + rkey text not null, 141 + repo_did text, 142 + created_at text, 143 + addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 144 + 145 + unique(owner, rkey) 146 + ); 147 + 148 + insert into repos_new (id, knot, owner, rkey, addedAt) 149 + select id, knot, owner, name, addedAt from repos where coalesce(name, '') <> ''; 150 + 151 + drop table repos; 152 + alter table repos_new rename to repos; 153 + `); err != nil { 154 + return err 155 + } 156 + } 157 + 158 + _, err := tx.Exec(` 159 + create index if not exists idx_repos_repo_did on repos(repo_did); 160 + create index if not exists idx_repos_owner_repo_did on repos(owner, repo_did); 161 + create index if not exists idx_repo_collaborators_repo_did 162 + on repo_collaborators(repo_did); 163 + `) 164 + return err 165 + }) 81 166 } 82 167 83 168 func (d *DB) SaveLastTimeUs(lastTimeUs int64) error {
+92 -13
spindle/db/repos.go
··· 1 1 package db 2 2 3 + import ( 4 + "database/sql" 5 + 6 + "github.com/bluesky-social/indigo/atproto/syntax" 7 + ) 8 + 3 9 type Repo struct { 4 - Knot string 5 - Owner string 6 - Name string 10 + Knot string 11 + Owner syntax.DID 12 + Rkey syntax.RecordKey 13 + RepoDid syntax.DID 14 + CreatedAt string 7 15 } 8 16 9 - func (d *DB) AddRepo(knot, owner, name string) error { 10 - _, err := d.Exec(`insert or ignore into repos (knot, owner, name) values (?, ?, ?)`, knot, owner, name) 17 + func (d *DB) AddRepo(repo Repo) error { 18 + var createdAt sql.NullString 19 + if repo.CreatedAt != "" { 20 + createdAt = sql.NullString{String: repo.CreatedAt, Valid: true} 21 + } 22 + _, err := d.Exec( 23 + `insert into repos (knot, owner, rkey, repo_did, created_at) 24 + values (?, ?, ?, ?, ?) 25 + on conflict(owner, rkey) do update set 26 + knot = excluded.knot, 27 + repo_did = excluded.repo_did, 28 + created_at = coalesce(excluded.created_at, repos.created_at)`, 29 + repo.Knot, repo.Owner.String(), repo.Rkey.String(), repo.RepoDid.String(), createdAt, 30 + ) 11 31 return err 12 32 } 13 33 34 + func (d *DB) CollapseRepoSiblings(owner, repoDid syntax.DID) (int64, error) { 35 + res, err := d.Exec( 36 + `delete from repos 37 + where owner = ? 38 + and repo_did = ? 39 + and created_at is not null 40 + and created_at < ( 41 + select max(created_at) from repos 42 + where owner = ? and repo_did = ? and created_at is not null 43 + )`, 44 + owner.String(), repoDid.String(), owner.String(), repoDid.String(), 45 + ) 46 + if err != nil { 47 + return 0, err 48 + } 49 + return res.RowsAffected() 50 + } 51 + 14 52 func (d *DB) Knots() ([]string, error) { 15 - rows, err := d.Query(`select knot from repos`) 53 + rows, err := d.Query(`select distinct knot from repos`) 16 54 if err != nil { 17 55 return nil, err 18 56 } ··· 27 65 knots = append(knots, knot) 28 66 } 29 67 30 - if err = rows.Err(); err != nil { 68 + if err := rows.Err(); err != nil { 31 69 return nil, err 32 70 } 33 71 34 72 return knots, nil 35 73 } 36 74 37 - func (d *DB) GetRepo(knot, owner, name string) (*Repo, error) { 38 - var repo Repo 75 + func scanRepo(row interface{ Scan(...any) error }) (*Repo, error) { 76 + var knot, owner, rkey, repoDid string 77 + if err := row.Scan(&knot, &owner, &rkey, &repoDid); err != nil { 78 + return nil, err 79 + } 80 + return &Repo{ 81 + Knot: knot, 82 + Owner: syntax.DID(owner), 83 + Rkey: syntax.RecordKey(rkey), 84 + RepoDid: syntax.DID(repoDid), 85 + }, nil 86 + } 39 87 40 - query := "select knot, owner, name from repos where knot = ? and owner = ? and name = ?" 41 - err := d.DB.QueryRow(query, knot, owner, name). 42 - Scan(&repo.Knot, &repo.Owner, &repo.Name) 88 + func (d *DB) GetRepoByDid(repoDid syntax.DID) (*Repo, error) { 89 + return scanRepo(d.QueryRow( 90 + `select knot, owner, rkey, coalesce(repo_did, '') from repos where repo_did = ?`, 91 + repoDid.String(), 92 + )) 93 + } 43 94 95 + func (d *DB) GetRepoByOwnerRkey(owner syntax.DID, rkey syntax.RecordKey) (*Repo, error) { 96 + return scanRepo(d.QueryRow( 97 + `select knot, owner, rkey, coalesce(repo_did, '') from repos where owner = ? and rkey = ?`, 98 + owner.String(), rkey.String(), 99 + )) 100 + } 101 + 102 + func (d *DB) AllRepos() ([]Repo, error) { 103 + rows, err := d.Query(`select knot, owner, rkey, coalesce(repo_did, '') from repos`) 44 104 if err != nil { 45 105 return nil, err 46 106 } 107 + defer rows.Close() 47 108 48 - return &repo, nil 109 + var repos []Repo 110 + for rows.Next() { 111 + r, err := scanRepo(rows) 112 + if err != nil { 113 + return nil, err 114 + } 115 + repos = append(repos, *r) 116 + } 117 + 118 + if err := rows.Err(); err != nil { 119 + return nil, err 120 + } 121 + 122 + return repos, nil 123 + } 124 + 125 + func (d *DB) DeleteRepoByOwnerRkey(owner syntax.DID, rkey syntax.RecordKey) error { 126 + _, err := d.Exec(`delete from repos where owner = ? and rkey = ?`, owner.String(), rkey.String()) 127 + return err 49 128 }
+2 -3
spindle/engine/engine.go
··· 8 8 "path/filepath" 9 9 "sync" 10 10 11 - securejoin "github.com/cyphar/filepath-securejoin" 12 11 "tangled.org/core/notifier" 13 12 "tangled.org/core/spindle/config" 14 13 "tangled.org/core/spindle/db" ··· 26 25 27 26 // extract secrets 28 27 var allSecrets []secrets.UnlockedSecret 29 - if didSlashRepo, err := securejoin.SecureJoin(pipeline.RepoOwner, pipeline.RepoName); err == nil { 30 - if res, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(didSlashRepo)); err == nil { 28 + if pipeline.RepoDid != "" { 29 + if res, err := vault.GetSecretsUnlocked(ctx, secrets.RepoIdentifier(pipeline.RepoDid.String())); err == nil { 31 30 allSecrets = res 32 31 } 33 32 }
+3 -2
spindle/models/pipeline.go
··· 1 1 package models 2 2 3 + import "github.com/bluesky-social/indigo/atproto/syntax" 4 + 3 5 type Pipeline struct { 4 - RepoOwner string 5 - RepoName string 6 + RepoDid syntax.DID 6 7 Workflows map[Engine][]Workflow 7 8 } 8 9
+7 -7
spindle/xrpc/add_secret.go
··· 9 9 "github.com/bluesky-social/indigo/api/atproto" 10 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 11 "github.com/bluesky-social/indigo/xrpc" 12 - securejoin "github.com/cyphar/filepath-securejoin" 13 12 "tangled.org/core/api/tangled" 14 13 "tangled.org/core/rbac" 15 14 "tangled.org/core/spindle/secrets" ··· 61 60 return 62 61 } 63 62 64 - if _, ok := resp.Value.Val.(*tangled.Repo); !ok { 63 + repoRec, ok := resp.Value.Val.(*tangled.Repo) 64 + if !ok { 65 65 fail(xrpcerr.RepoNotFoundError) 66 66 return 67 67 } 68 - didPath, err := securejoin.SecureJoin(ident.DID.String(), repoAt.RecordKey().String()) 69 - if err != nil { 70 - fail(xrpcerr.GenericError(err)) 68 + if repoRec.RepoDid == nil || *repoRec.RepoDid == "" { 69 + fail(xrpcerr.GenericError(fmt.Errorf("repo record %s has no repoDid", repoAt))) 71 70 return 72 71 } 72 + repoDid := *repoRec.RepoDid 73 73 74 - if ok, err := x.Enforcer.IsSettingsAllowed(actorDid.String(), rbac.ThisServer, didPath); !ok || err != nil { 74 + if ok, err := x.Enforcer.IsSettingsAllowed(actorDid.String(), rbac.ThisServer, repoDid); !ok || err != nil { 75 75 l.Error("insufficient permissions", "did", actorDid.String()) 76 76 writeError(w, xrpcerr.AccessControlError(actorDid.String()), http.StatusUnauthorized) 77 77 return 78 78 } 79 79 80 80 secret := secrets.UnlockedSecret{ 81 - Repo: secrets.RepoIdentifier(didPath), 81 + Repo: secrets.RepoIdentifier(repoDid), 82 82 Key: data.Key, 83 83 Value: data.Value, 84 84 CreatedAt: time.Now(),
+7 -7
spindle/xrpc/list_secrets.go
··· 9 9 "github.com/bluesky-social/indigo/api/atproto" 10 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 11 "github.com/bluesky-social/indigo/xrpc" 12 - securejoin "github.com/cyphar/filepath-securejoin" 13 12 "tangled.org/core/api/tangled" 14 13 "tangled.org/core/rbac" 15 14 "tangled.org/core/spindle/secrets" ··· 56 55 return 57 56 } 58 57 59 - if _, ok := resp.Value.Val.(*tangled.Repo); !ok { 58 + repoRec, ok := resp.Value.Val.(*tangled.Repo) 59 + if !ok { 60 60 fail(xrpcerr.RepoNotFoundError) 61 61 return 62 62 } 63 - didPath, err := securejoin.SecureJoin(ident.DID.String(), repoAt.RecordKey().String()) 64 - if err != nil { 65 - fail(xrpcerr.GenericError(err)) 63 + if repoRec.RepoDid == nil || *repoRec.RepoDid == "" { 64 + fail(xrpcerr.GenericError(fmt.Errorf("repo record %s has no repoDid", repoAt))) 66 65 return 67 66 } 67 + repoDid := *repoRec.RepoDid 68 68 69 - if ok, err := x.Enforcer.IsSettingsAllowed(actorDid.String(), rbac.ThisServer, didPath); !ok || err != nil { 69 + if ok, err := x.Enforcer.IsSettingsAllowed(actorDid.String(), rbac.ThisServer, repoDid); !ok || err != nil { 70 70 l.Error("insufficient permissions", "did", actorDid.String()) 71 71 writeError(w, xrpcerr.AccessControlError(actorDid.String()), http.StatusUnauthorized) 72 72 return 73 73 } 74 74 75 - ls, err := x.Vault.GetSecretsLocked(r.Context(), secrets.RepoIdentifier(didPath)) 75 + ls, err := x.Vault.GetSecretsLocked(r.Context(), secrets.RepoIdentifier(repoDid)) 76 76 if err != nil { 77 77 l.Error("failed to get secret from vault", "did", actorDid.String(), "err", err) 78 78 writeError(w, xrpcerr.GenericError(err), http.StatusInternalServerError)
+6 -6
spindle/xrpc/pipeline_cancel_pipeline.go
··· 9 9 "github.com/bluesky-social/indigo/api/atproto" 10 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 11 "github.com/bluesky-social/indigo/xrpc" 12 - securejoin "github.com/cyphar/filepath-securejoin" 13 12 "tangled.org/core/api/tangled" 14 13 "tangled.org/core/rbac" 15 14 "tangled.org/core/spindle/models" ··· 66 65 return 67 66 } 68 67 69 - if _, ok := resp.Value.Val.(*tangled.Repo); !ok { 68 + repoRec, ok := resp.Value.Val.(*tangled.Repo) 69 + if !ok { 70 70 fail(xrpcerr.RepoNotFoundError) 71 71 return 72 72 } 73 - didSlashRepo, err := securejoin.SecureJoin(ident.DID.String(), repoAt.RecordKey().String()) 74 - if err != nil { 75 - fail(xrpcerr.GenericError(err)) 73 + if repoRec.RepoDid == nil || *repoRec.RepoDid == "" { 74 + fail(xrpcerr.GenericError(fmt.Errorf("repo record %s has no repoDid", repoAt))) 76 75 return 77 76 } 77 + repoDid := *repoRec.RepoDid 78 78 79 79 // TODO: fine-grained role based control 80 - isRepoOwner, err := x.Enforcer.IsRepoOwner(actorDid.String(), rbac.ThisServer, didSlashRepo) 80 + isRepoOwner, err := x.Enforcer.IsRepoOwner(actorDid.String(), rbac.ThisServer, repoDid) 81 81 if err != nil || !isRepoOwner { 82 82 fail(xrpcerr.AccessControlError(actorDid.String())) 83 83 return
+7 -7
spindle/xrpc/remove_secret.go
··· 8 8 "github.com/bluesky-social/indigo/api/atproto" 9 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 10 "github.com/bluesky-social/indigo/xrpc" 11 - securejoin "github.com/cyphar/filepath-securejoin" 12 11 "tangled.org/core/api/tangled" 13 12 "tangled.org/core/rbac" 14 13 "tangled.org/core/spindle/secrets" ··· 55 54 return 56 55 } 57 56 58 - if _, ok := resp.Value.Val.(*tangled.Repo); !ok { 57 + repoRec, ok := resp.Value.Val.(*tangled.Repo) 58 + if !ok { 59 59 fail(xrpcerr.RepoNotFoundError) 60 60 return 61 61 } 62 - didPath, err := securejoin.SecureJoin(ident.DID.String(), repoAt.RecordKey().String()) 63 - if err != nil { 64 - fail(xrpcerr.GenericError(err)) 62 + if repoRec.RepoDid == nil || *repoRec.RepoDid == "" { 63 + fail(xrpcerr.GenericError(fmt.Errorf("repo record %s has no repoDid", repoAt))) 65 64 return 66 65 } 66 + repoDid := *repoRec.RepoDid 67 67 68 - if ok, err := x.Enforcer.IsSettingsAllowed(actorDid.String(), rbac.ThisServer, didPath); !ok || err != nil { 68 + if ok, err := x.Enforcer.IsSettingsAllowed(actorDid.String(), rbac.ThisServer, repoDid); !ok || err != nil { 69 69 l.Error("insufficient permissions", "did", actorDid.String()) 70 70 writeError(w, xrpcerr.AccessControlError(actorDid.String()), http.StatusUnauthorized) 71 71 return 72 72 } 73 73 74 74 secret := secrets.Secret[any]{ 75 - Repo: secrets.RepoIdentifier(didPath), 75 + Repo: secrets.RepoIdentifier(repoDid), 76 76 Key: data.Key, 77 77 } 78 78 err = x.Vault.RemoveSecret(r.Context(), secret)