Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package db 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "log/slog" 8 "os" 9 "strings" 10 11 securejoin "github.com/cyphar/filepath-securejoin" 12 _ "github.com/mattn/go-sqlite3" 13 "tangled.org/core/log" 14 "tangled.org/core/orm" 15) 16 17type DB struct { 18 db *sql.DB 19 logger *slog.Logger 20} 21 22type Querier interface { 23 QueryRow(query string, args ...any) *sql.Row 24 Exec(query string, args ...any) (sql.Result, error) 25} 26 27func Setup(ctx context.Context, dbPath string) (*DB, error) { 28 // https://github.com/mattn/go-sqlite3#connection-string 29 opts := []string{ 30 "_foreign_keys=1", 31 "_journal_mode=WAL", 32 "_synchronous=NORMAL", 33 "_auto_vacuum=incremental", 34 "_busy_timeout=5000", 35 } 36 37 logger := log.FromContext(ctx) 38 logger = log.SubLogger(logger, "db") 39 40 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 41 if err != nil { 42 return nil, err 43 } 44 45 conn, err := db.Conn(ctx) 46 if err != nil { 47 return nil, err 48 } 49 defer conn.Close() 50 51 _, err = conn.ExecContext(ctx, ` 52 create table if not exists known_dids ( 53 did text primary key 54 ); 55 56 create table if not exists public_keys ( 57 id integer primary key autoincrement, 58 did text not null, 59 key text not null, 60 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 61 unique(did, key), 62 foreign key (did) references known_dids(did) on delete cascade 63 ); 64 65 create table if not exists _jetstream ( 66 id integer primary key autoincrement, 67 last_time_us integer not null 68 ); 69 70 create table if not exists events ( 71 rkey text not null, 72 nsid text not null, 73 event text not null, -- json 74 created integer not null default (strftime('%s', 'now')), 75 primary key (rkey, nsid) 76 ); 77 78 create table if not exists repo_keys ( 79 repo_did text primary key, 80 signing_key blob not null, 81 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) 82 ); 83 84 create table if not exists migrations ( 85 id integer primary key autoincrement, 86 name text unique 87 ); 88 `) 89 if err != nil { 90 return nil, err 91 } 92 93 if err := orm.RunMigration(conn, logger, "add-owner-did-to-repo-keys", func(tx *sql.Tx) error { 94 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN owner_did TEXT`) 95 return mErr 96 }); err != nil { 97 return nil, err 98 } 99 100 if err := orm.RunMigration(conn, logger, "add-repo-name-to-repo-keys", func(tx *sql.Tx) error { 101 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN repo_name TEXT`) 102 return mErr 103 }); err != nil { 104 return nil, err 105 } 106 107 if err := orm.RunMigration(conn, logger, "add-unique-owner-repo-on-repo-keys", func(tx *sql.Tx) error { 108 _, mErr := tx.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_repo_keys_owner_repo ON repo_keys(owner_did, repo_name)`) 109 return mErr 110 }); err != nil { 111 return nil, err 112 } 113 114 if err := orm.RunMigration(conn, logger, "add-key-type-and-nullable-signing-key", func(tx *sql.Tx) error { 115 _, mErr := tx.ExecContext(ctx, ` 116 create table repo_keys_new ( 117 repo_did text primary key, 118 signing_key blob, 119 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 120 owner_did text, 121 repo_name text, 122 at_uri text, 123 key_type text not null default 'k256' 124 ); 125 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type) 126 select repo_did, signing_key, created_at, owner_did, repo_name, 'k256' 127 from repo_keys; 128 drop table repo_keys; 129 alter table repo_keys_new rename to repo_keys; 130 create unique index if not exists idx_repo_keys_owner_repo 131 on repo_keys(owner_did, repo_name); 132 `) 133 return mErr 134 }); err != nil { 135 return nil, err 136 } 137 138 if err := orm.RunMigration(conn, logger, "add-repo-aliases", func(tx *sql.Tx) error { 139 _, mErr := tx.ExecContext(ctx, ` 140 create table if not exists repo_aliases ( 141 owner_did text not null, 142 rkey text not null, 143 repo_did text not null, 144 rev text not null, 145 primary key (owner_did, rkey) 146 ); 147 create index if not exists idx_repo_aliases_repo_did on repo_aliases(repo_did); 148 149 insert or ignore into repo_aliases (owner_did, rkey, repo_did, rev) 150 select owner_did, repo_name, repo_did, '1_' || created_at 151 from repo_keys 152 where owner_did is not null and repo_name is not null and repo_did is not null; 153 `) 154 return mErr 155 }); err != nil { 156 return nil, err 157 } 158 159 if err := orm.RunMigration(conn, logger, "drop-at-uri-from-repo-keys", func(tx *sql.Tx) error { 160 _, mErr := tx.ExecContext(ctx, ` 161 create table repo_keys_new ( 162 repo_did text primary key, 163 signing_key blob, 164 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 165 owner_did text, 166 repo_name text, 167 key_type text not null default 'k256' 168 ); 169 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type) 170 select repo_did, signing_key, created_at, owner_did, repo_name, key_type 171 from repo_keys; 172 drop table repo_keys; 173 alter table repo_keys_new rename to repo_keys; 174 create unique index if not exists idx_repo_keys_owner_repo 175 on repo_keys(owner_did, repo_name); 176 `) 177 return mErr 178 }); err != nil { 179 return nil, err 180 } 181 182 return &DB{ 183 db: db, 184 logger: logger, 185 }, nil 186} 187 188func (d *DB) StoreRepoKey(repoDid string, signingKey []byte, ownerDid, repoName string) error { 189 return d.storeRepoKeyRow(repoDid, signingKey, ownerDid, repoName, "k256") 190} 191 192func (d *DB) StoreRepoDidWeb(repoDid, ownerDid, repoName string) error { 193 return d.storeRepoKeyRow(repoDid, nil, ownerDid, repoName, "web") 194} 195 196func (d *DB) storeRepoKeyRow(repoDid string, signingKey []byte, ownerDid, repoName, keyType string) (err error) { 197 tx, err := d.db.Begin() 198 if err != nil { 199 return err 200 } 201 defer func() { 202 if err != nil { 203 tx.Rollback() 204 return 205 } 206 err = tx.Commit() 207 }() 208 209 if _, err = tx.Exec( 210 `INSERT INTO repo_keys (repo_did, signing_key, owner_did, repo_name, key_type) VALUES (?, ?, ?, ?, ?)`, 211 repoDid, signingKey, ownerDid, repoName, keyType, 212 ); err != nil { 213 return err 214 } 215 216 _, err = tx.Exec( 217 `INSERT INTO repo_aliases (owner_did, rkey, repo_did, rev) 218 VALUES (?, ?, ?, '0_' || strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) 219 ON CONFLICT(owner_did, rkey) DO NOTHING`, 220 ownerDid, repoName, repoDid, 221 ) 222 return err 223} 224 225func (d *DB) DeleteRepoKey(repoDid string) error { 226 _, err := d.db.Exec(`DELETE FROM repo_keys WHERE repo_did = ?`, repoDid) 227 return err 228} 229 230func (d *DB) RepoDidExists(repoDid string) (bool, error) { 231 var count int 232 err := d.db.QueryRow(`SELECT count(1) FROM repo_keys WHERE repo_did = ?`, repoDid).Scan(&count) 233 return count > 0, err 234} 235 236func (d *DB) GetRepoDid(ownerDid, rkey string) (string, error) { 237 var repoDid string 238 err := d.db.QueryRow( 239 `SELECT repo_did FROM repo_aliases WHERE owner_did = ? AND rkey = ?`, 240 ownerDid, rkey, 241 ).Scan(&repoDid) 242 return repoDid, err 243} 244 245func (d *DB) GetRepoKeyOwner(repoDid string) (string, string, error) { 246 return GetRepoKeyOwner(d.db, repoDid) 247} 248 249func GetRepoKeyOwner(q Querier, repoDid string) (ownerDid string, repoName string, err error) { 250 err = q.QueryRow( 251 `SELECT owner_did, rkey FROM repo_aliases 252 WHERE repo_did = ? 253 ORDER BY rev DESC 254 LIMIT 1`, 255 repoDid, 256 ).Scan(&ownerDid, &repoName) 257 if err != nil { 258 return 259 } 260 if ownerDid == "" || repoName == "" { 261 err = fmt.Errorf("repo_aliases row for %s has empty owner_did or rkey", repoDid) 262 return 263 } 264 return 265} 266 267func (d *DB) ResolveRepoDIDOnDisk(scanPath, repoDid string) (repoPath, ownerDid, repoName string, err error) { 268 ownerDid, repoName, err = d.GetRepoKeyOwner(repoDid) 269 if err != nil { 270 return 271 } 272 273 didPath, joinErr := securejoin.SecureJoin(scanPath, repoDid) 274 if joinErr != nil { 275 err = fmt.Errorf("securejoin failed for repo DID path %s: %w", repoDid, joinErr) 276 return 277 } 278 279 if _, statErr := os.Stat(didPath); statErr != nil { 280 err = fmt.Errorf("repo DID directory not found on disk: %s", didPath) 281 return 282 } 283 284 repoPath = didPath 285 return 286}