Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package db 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "log/slog" 8 "os" 9 "strings" 10 11 securejoin "github.com/cyphar/filepath-securejoin" 12 _ "github.com/mattn/go-sqlite3" 13 "tangled.org/core/log" 14 "tangled.org/core/orm" 15) 16 17type DB struct { 18 db *sql.DB 19 logger *slog.Logger 20} 21 22type DBTX interface { 23 QueryRow(query string, args ...any) *sql.Row 24 Exec(query string, args ...any) (sql.Result, error) 25} 26 27func (d *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) { 28 return d.db.BeginTx(ctx, opts) 29} 30 31func (d *DB) Exec(query string, args ...any) (sql.Result, error) { 32 return d.db.Exec(query, args...) 33} 34 35func (d *DB) QueryRow(query string, args ...any) *sql.Row { 36 return d.db.QueryRow(query, args...) 37} 38 39func Setup(ctx context.Context, dbPath string) (*DB, error) { 40 // https://github.com/mattn/go-sqlite3#connection-string 41 opts := []string{ 42 "_foreign_keys=1", 43 "_journal_mode=WAL", 44 "_synchronous=NORMAL", 45 "_auto_vacuum=incremental", 46 "_busy_timeout=5000", 47 } 48 49 logger := log.FromContext(ctx) 50 logger = log.SubLogger(logger, "db") 51 52 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 53 if err != nil { 54 return nil, err 55 } 56 57 conn, err := db.Conn(ctx) 58 if err != nil { 59 return nil, err 60 } 61 defer conn.Close() 62 63 _, err = conn.ExecContext(ctx, ` 64 create table if not exists known_dids ( 65 did text primary key 66 ); 67 68 create table if not exists public_keys ( 69 id integer primary key autoincrement, 70 did text not null, 71 key text not null, 72 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 73 unique(did, key), 74 foreign key (did) references known_dids(did) on delete cascade 75 ); 76 77 create table if not exists _jetstream ( 78 id integer primary key autoincrement, 79 last_time_us integer not null 80 ); 81 82 create table if not exists events ( 83 rkey text not null, 84 nsid text not null, 85 event text not null, -- json 86 created integer not null default (strftime('%s', 'now')), 87 primary key (rkey, nsid) 88 ); 89 90 create table if not exists repo_keys ( 91 repo_did text primary key, 92 signing_key blob not null, 93 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) 94 ); 95 96 create table if not exists migrations ( 97 id integer primary key autoincrement, 98 name text unique 99 ); 100 `) 101 if err != nil { 102 return nil, err 103 } 104 105 if err := orm.RunMigration(conn, logger, "add-owner-did-to-repo-keys", func(tx *sql.Tx) error { 106 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN owner_did TEXT`) 107 return mErr 108 }); err != nil { 109 return nil, err 110 } 111 112 if err := orm.RunMigration(conn, logger, "add-repo-name-to-repo-keys", func(tx *sql.Tx) error { 113 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN repo_name TEXT`) 114 return mErr 115 }); err != nil { 116 return nil, err 117 } 118 119 if err := orm.RunMigration(conn, logger, "add-unique-owner-repo-on-repo-keys", func(tx *sql.Tx) error { 120 _, mErr := tx.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_repo_keys_owner_repo ON repo_keys(owner_did, repo_name)`) 121 return mErr 122 }); err != nil { 123 return nil, err 124 } 125 126 if err := orm.RunMigration(conn, logger, "add-key-type-and-nullable-signing-key", func(tx *sql.Tx) error { 127 _, mErr := tx.ExecContext(ctx, ` 128 create table repo_keys_new ( 129 repo_did text primary key, 130 signing_key blob, 131 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 132 owner_did text, 133 repo_name text, 134 at_uri text, 135 key_type text not null default 'k256' 136 ); 137 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type) 138 select repo_did, signing_key, created_at, owner_did, repo_name, 'k256' 139 from repo_keys; 140 drop table repo_keys; 141 alter table repo_keys_new rename to repo_keys; 142 create unique index if not exists idx_repo_keys_owner_repo 143 on repo_keys(owner_did, repo_name); 144 `) 145 return mErr 146 }); err != nil { 147 return nil, err 148 } 149 150 if err := orm.RunMigration(conn, logger, "add-repo-aliases", func(tx *sql.Tx) error { 151 _, mErr := tx.ExecContext(ctx, ` 152 create table if not exists repo_aliases ( 153 owner_did text not null, 154 rkey text not null, 155 repo_did text not null, 156 rev text not null, 157 primary key (owner_did, rkey) 158 ); 159 create index if not exists idx_repo_aliases_repo_did on repo_aliases(repo_did); 160 161 insert or ignore into repo_aliases (owner_did, rkey, repo_did, rev) 162 select owner_did, repo_name, repo_did, '1_' || created_at 163 from repo_keys 164 where owner_did is not null and repo_name is not null and repo_did is not null; 165 `) 166 return mErr 167 }); err != nil { 168 return nil, err 169 } 170 171 if err := orm.RunMigration(conn, logger, "drop-at-uri-from-repo-keys", func(tx *sql.Tx) error { 172 _, mErr := tx.ExecContext(ctx, ` 173 create table repo_keys_new ( 174 repo_did text primary key, 175 signing_key blob, 176 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 177 owner_did text, 178 repo_name text, 179 key_type text not null default 'k256' 180 ); 181 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type) 182 select repo_did, signing_key, created_at, owner_did, repo_name, key_type 183 from repo_keys; 184 drop table repo_keys; 185 alter table repo_keys_new rename to repo_keys; 186 create unique index if not exists idx_repo_keys_owner_repo 187 on repo_keys(owner_did, repo_name); 188 `) 189 return mErr 190 }); err != nil { 191 return nil, err 192 } 193 194 if err := orm.RunMigration(conn, logger, "create-knot-members", func(tx *sql.Tx) error { 195 _, mErr := tx.ExecContext(ctx, ` 196 create table if not exists knot_members ( 197 id integer primary key autoincrement, 198 did text not null, 199 rkey text not null, 200 subject text not null, 201 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 202 unique (did, rkey) 203 ); 204 `) 205 return mErr 206 }); err != nil { 207 return nil, err 208 } 209 210 return &DB{ 211 db: db, 212 logger: logger, 213 }, nil 214} 215 216func (d *DB) StoreRepoKey(repoDid string, signingKey []byte, ownerDid, repoName string) error { 217 return d.storeRepoKeyRow(repoDid, signingKey, ownerDid, repoName, "k256") 218} 219 220func (d *DB) StoreRepoDidWeb(repoDid, ownerDid, repoName string) error { 221 return d.storeRepoKeyRow(repoDid, nil, ownerDid, repoName, "web") 222} 223 224func (d *DB) storeRepoKeyRow(repoDid string, signingKey []byte, ownerDid, repoName, keyType string) error { 225 tx, err := d.db.Begin() 226 if err != nil { 227 return err 228 } 229 defer tx.Rollback() 230 231 if _, err := tx.Exec( 232 `INSERT INTO repo_keys (repo_did, signing_key, owner_did, repo_name, key_type) VALUES (?, ?, ?, ?, ?)`, 233 repoDid, signingKey, ownerDid, repoName, keyType, 234 ); err != nil { 235 return err 236 } 237 238 if _, err := tx.Exec( 239 `INSERT INTO repo_aliases (owner_did, rkey, repo_did, rev) 240 VALUES (?, ?, ?, '0_' || strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) 241 ON CONFLICT(owner_did, rkey) DO NOTHING`, 242 ownerDid, repoName, repoDid, 243 ); err != nil { 244 return err 245 } 246 247 return tx.Commit() 248} 249 250func (d *DB) DeleteRepoKey(repoDid string) error { 251 tx, err := d.db.Begin() 252 if err != nil { 253 return err 254 } 255 defer tx.Rollback() 256 257 if _, err := tx.Exec(`DELETE FROM repo_aliases WHERE repo_did = ?`, repoDid); err != nil { 258 return err 259 } 260 261 if _, err := tx.Exec(`DELETE FROM repo_keys WHERE repo_did = ?`, repoDid); err != nil { 262 return err 263 } 264 265 return tx.Commit() 266} 267 268func (d *DB) RepoDidExists(repoDid string) (bool, error) { 269 var count int 270 err := d.db.QueryRow(`SELECT count(1) FROM repo_keys WHERE repo_did = ?`, repoDid).Scan(&count) 271 return count > 0, err 272} 273 274func (d *DB) GetRepoDid(ownerDid, rkey string) (string, error) { 275 var repoDid string 276 err := d.db.QueryRow( 277 `SELECT repo_did FROM repo_aliases WHERE owner_did = ? AND rkey = ?`, 278 ownerDid, rkey, 279 ).Scan(&repoDid) 280 return repoDid, err 281} 282 283func (d *DB) GetRepoDidByName(ownerDid, repoName string) (string, error) { 284 var repoDid string 285 err := d.db.QueryRow( 286 `SELECT repo_did FROM repo_keys WHERE owner_did = ? AND repo_name = ?`, 287 ownerDid, repoName, 288 ).Scan(&repoDid) 289 return repoDid, err 290} 291 292func (d *DB) GetRepoKeyOwner(repoDid string) (string, string, error) { 293 return GetRepoKeyOwner(d.db, repoDid) 294} 295 296func GetRepoKeyOwner(q DBTX, repoDid string) (ownerDid string, repoName string, err error) { 297 err = q.QueryRow( 298 `SELECT owner_did, rkey FROM repo_aliases 299 WHERE repo_did = ? 300 ORDER BY rev DESC 301 LIMIT 1`, 302 repoDid, 303 ).Scan(&ownerDid, &repoName) 304 if err != nil { 305 return 306 } 307 if ownerDid == "" || repoName == "" { 308 err = fmt.Errorf("repo_aliases row for %s has empty owner_did or rkey", repoDid) 309 return 310 } 311 return 312} 313 314func (d *DB) ResolveRepoDIDOnDisk(scanPath, repoDid string) (repoPath, ownerDid, repoName string, err error) { 315 ownerDid, repoName, err = d.GetRepoKeyOwner(repoDid) 316 if err != nil { 317 return 318 } 319 320 didPath, joinErr := securejoin.SecureJoin(scanPath, repoDid) 321 if joinErr != nil { 322 err = fmt.Errorf("securejoin failed for repo DID path %s: %w", repoDid, joinErr) 323 return 324 } 325 326 if _, statErr := os.Stat(didPath); statErr != nil { 327 err = fmt.Errorf("repo DID directory not found on disk: %s", didPath) 328 return 329 } 330 331 repoPath = didPath 332 return 333}