Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

at icy/sntnrt 10 kB View raw
1package db 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "log/slog" 8 "os" 9 "strings" 10 "sync" 11 12 securejoin "github.com/cyphar/filepath-securejoin" 13 _ "github.com/mattn/go-sqlite3" 14 "tangled.org/core/log" 15 "tangled.org/core/orm" 16) 17 18type DB struct { 19 db *sql.DB 20 logger *slog.Logger 21 22 // uidAssignMu serialises GetOrAssignOwnerUID across goroutines so that 23 // concurrent callers don't race on the uid_counter read-modify-write. 24 uidAssignMu sync.Mutex 25} 26 27type DBTX interface { 28 QueryRow(query string, args ...any) *sql.Row 29 Exec(query string, args ...any) (sql.Result, error) 30} 31 32func (d *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) { 33 return d.db.BeginTx(ctx, opts) 34} 35 36func (d *DB) Exec(query string, args ...any) (sql.Result, error) { 37 return d.db.Exec(query, args...) 38} 39 40func (d *DB) QueryRow(query string, args ...any) *sql.Row { 41 return d.db.QueryRow(query, args...) 42} 43 44func Setup(ctx context.Context, dbPath string) (*DB, error) { 45 // https://github.com/mattn/go-sqlite3#connection-string 46 opts := []string{ 47 "_foreign_keys=1", 48 "_journal_mode=WAL", 49 "_synchronous=NORMAL", 50 "_auto_vacuum=incremental", 51 "_busy_timeout=5000", 52 } 53 54 logger := log.FromContext(ctx) 55 logger = log.SubLogger(logger, "db") 56 57 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 58 if err != nil { 59 return nil, err 60 } 61 62 conn, err := db.Conn(ctx) 63 if err != nil { 64 return nil, err 65 } 66 defer conn.Close() 67 68 _, err = conn.ExecContext(ctx, ` 69 create table if not exists known_dids ( 70 did text primary key 71 ); 72 73 create table if not exists public_keys ( 74 id integer primary key autoincrement, 75 did text not null, 76 key text not null, 77 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 78 unique(did, key), 79 foreign key (did) references known_dids(did) on delete cascade 80 ); 81 82 create table if not exists _jetstream ( 83 id integer primary key autoincrement, 84 last_time_us integer not null 85 ); 86 87 create table if not exists events ( 88 rkey text not null, 89 nsid text not null, 90 event text not null, -- json 91 created integer not null default (strftime('%s', 'now')), 92 primary key (rkey, nsid) 93 ); 94 95 create table if not exists repo_keys ( 96 repo_did text primary key, 97 signing_key blob not null, 98 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) 99 ); 100 101 create table if not exists migrations ( 102 id integer primary key autoincrement, 103 name text unique 104 ); 105 `) 106 if err != nil { 107 return nil, err 108 } 109 110 if err := orm.RunMigration(conn, logger, "add-owner-did-to-repo-keys", func(tx *sql.Tx) error { 111 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN owner_did TEXT`) 112 return mErr 113 }); err != nil { 114 return nil, err 115 } 116 117 if err := orm.RunMigration(conn, logger, "add-repo-name-to-repo-keys", func(tx *sql.Tx) error { 118 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN repo_name TEXT`) 119 return mErr 120 }); err != nil { 121 return nil, err 122 } 123 124 if err := orm.RunMigration(conn, logger, "add-unique-owner-repo-on-repo-keys", func(tx *sql.Tx) error { 125 _, mErr := tx.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_repo_keys_owner_repo ON repo_keys(owner_did, repo_name)`) 126 return mErr 127 }); err != nil { 128 return nil, err 129 } 130 131 if err := orm.RunMigration(conn, logger, "add-key-type-and-nullable-signing-key", func(tx *sql.Tx) error { 132 _, mErr := tx.ExecContext(ctx, ` 133 create table repo_keys_new ( 134 repo_did text primary key, 135 signing_key blob, 136 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 137 owner_did text, 138 repo_name text, 139 at_uri text, 140 key_type text not null default 'k256' 141 ); 142 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type) 143 select repo_did, signing_key, created_at, owner_did, repo_name, 'k256' 144 from repo_keys; 145 drop table repo_keys; 146 alter table repo_keys_new rename to repo_keys; 147 create unique index if not exists idx_repo_keys_owner_repo 148 on repo_keys(owner_did, repo_name); 149 `) 150 return mErr 151 }); err != nil { 152 return nil, err 153 } 154 155 if err := orm.RunMigration(conn, logger, "add-repo-aliases", func(tx *sql.Tx) error { 156 _, mErr := tx.ExecContext(ctx, ` 157 create table if not exists repo_aliases ( 158 owner_did text not null, 159 rkey text not null, 160 repo_did text not null, 161 rev text not null, 162 primary key (owner_did, rkey) 163 ); 164 create index if not exists idx_repo_aliases_repo_did on repo_aliases(repo_did); 165 166 insert or ignore into repo_aliases (owner_did, rkey, repo_did, rev) 167 select owner_did, repo_name, repo_did, '1_' || created_at 168 from repo_keys 169 where owner_did is not null and repo_name is not null and repo_did is not null; 170 `) 171 return mErr 172 }); err != nil { 173 return nil, err 174 } 175 176 if err := orm.RunMigration(conn, logger, "drop-at-uri-from-repo-keys", func(tx *sql.Tx) error { 177 _, mErr := tx.ExecContext(ctx, ` 178 create table repo_keys_new ( 179 repo_did text primary key, 180 signing_key blob, 181 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 182 owner_did text, 183 repo_name text, 184 key_type text not null default 'k256' 185 ); 186 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type) 187 select repo_did, signing_key, created_at, owner_did, repo_name, key_type 188 from repo_keys; 189 drop table repo_keys; 190 alter table repo_keys_new rename to repo_keys; 191 create unique index if not exists idx_repo_keys_owner_repo 192 on repo_keys(owner_did, repo_name); 193 `) 194 return mErr 195 }); err != nil { 196 return nil, err 197 } 198 199 if err := orm.RunMigration(conn, logger, "create-knot-members", func(tx *sql.Tx) error { 200 _, mErr := tx.ExecContext(ctx, ` 201 create table if not exists knot_members ( 202 id integer primary key autoincrement, 203 did text not null, 204 rkey text not null, 205 subject text not null, 206 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 207 unique (did, rkey) 208 ); 209 `) 210 return mErr 211 }); err != nil { 212 return nil, err 213 } 214 215 if err := orm.RunMigration(conn, logger, "add-isolated-at-to-repo-keys", func(tx *sql.Tx) error { 216 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN isolated_at DATETIME`) 217 return mErr 218 }); err != nil { 219 return nil, err 220 } 221 222 if err := orm.RunMigration(conn, logger, "add-owner-uid-tables", func(tx *sql.Tx) error { 223 _, mErr := tx.ExecContext(ctx, ` 224 CREATE TABLE IF NOT EXISTS owner_uid_assignments ( 225 owner_did TEXT PRIMARY KEY, 226 uid INTEGER NOT NULL UNIQUE, 227 created_at DATETIME DEFAULT CURRENT_TIMESTAMP 228 ); 229 230 CREATE TABLE IF NOT EXISTS uid_counter ( 231 next_uid INTEGER NOT NULL DEFAULT 100000 232 ); 233 `) 234 if mErr != nil { 235 return mErr 236 } 237 // Seed the counter only if the table is empty. 238 _, mErr = tx.ExecContext(ctx, ` 239 INSERT INTO uid_counter (next_uid) 240 SELECT 100000 WHERE NOT EXISTS (SELECT 1 FROM uid_counter) 241 `) 242 return mErr 243 }); err != nil { 244 return nil, err 245 } 246 247 return &DB{ 248 db: db, 249 logger: logger, 250 }, nil 251} 252 253func (d *DB) StoreRepoKey(repoDid string, signingKey []byte, ownerDid, repoName string) error { 254 return d.storeRepoKeyRow(repoDid, signingKey, ownerDid, repoName, "k256") 255} 256 257func (d *DB) StoreRepoDidWeb(repoDid, ownerDid, repoName string) error { 258 return d.storeRepoKeyRow(repoDid, nil, ownerDid, repoName, "web") 259} 260 261func (d *DB) storeRepoKeyRow(repoDid string, signingKey []byte, ownerDid, repoName, keyType string) error { 262 tx, err := d.db.Begin() 263 if err != nil { 264 return err 265 } 266 defer tx.Rollback() 267 268 if _, err := tx.Exec( 269 `INSERT INTO repo_keys (repo_did, signing_key, owner_did, repo_name, key_type) VALUES (?, ?, ?, ?, ?)`, 270 repoDid, signingKey, ownerDid, repoName, keyType, 271 ); err != nil { 272 return err 273 } 274 275 if _, err := tx.Exec( 276 `INSERT INTO repo_aliases (owner_did, rkey, repo_did, rev) 277 VALUES (?, ?, ?, '0_' || strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) 278 ON CONFLICT(owner_did, rkey) DO NOTHING`, 279 ownerDid, repoName, repoDid, 280 ); err != nil { 281 return err 282 } 283 284 return tx.Commit() 285} 286 287func (d *DB) DeleteRepoKey(repoDid string) error { 288 tx, err := d.db.Begin() 289 if err != nil { 290 return err 291 } 292 defer tx.Rollback() 293 294 if _, err := tx.Exec(`DELETE FROM repo_aliases WHERE repo_did = ?`, repoDid); err != nil { 295 return err 296 } 297 298 if _, err := tx.Exec(`DELETE FROM repo_keys WHERE repo_did = ?`, repoDid); err != nil { 299 return err 300 } 301 302 return tx.Commit() 303} 304 305func (d *DB) RepoDidExists(repoDid string) (bool, error) { 306 var count int 307 err := d.db.QueryRow(`SELECT count(1) FROM repo_keys WHERE repo_did = ?`, repoDid).Scan(&count) 308 return count > 0, err 309} 310 311func (d *DB) GetRepoDid(ownerDid, rkey string) (string, error) { 312 var repoDid string 313 err := d.db.QueryRow( 314 `SELECT repo_did FROM repo_aliases WHERE owner_did = ? AND rkey = ?`, 315 ownerDid, rkey, 316 ).Scan(&repoDid) 317 return repoDid, err 318} 319 320func (d *DB) GetRepoDidByName(ownerDid, repoName string) (string, error) { 321 var repoDid string 322 err := d.db.QueryRow( 323 `SELECT repo_did FROM repo_keys WHERE owner_did = ? AND repo_name = ?`, 324 ownerDid, repoName, 325 ).Scan(&repoDid) 326 return repoDid, err 327} 328 329func (d *DB) GetRepoKeyOwner(repoDid string) (string, string, error) { 330 return GetRepoKeyOwner(d.db, repoDid) 331} 332 333func GetRepoKeyOwner(q DBTX, repoDid string) (ownerDid string, repoName string, err error) { 334 err = q.QueryRow( 335 `SELECT owner_did, rkey FROM repo_aliases 336 WHERE repo_did = ? 337 ORDER BY rev DESC 338 LIMIT 1`, 339 repoDid, 340 ).Scan(&ownerDid, &repoName) 341 if err != nil { 342 return 343 } 344 if ownerDid == "" || repoName == "" { 345 err = fmt.Errorf("repo_aliases row for %s has empty owner_did or rkey", repoDid) 346 return 347 } 348 return 349} 350 351func (d *DB) ResolveRepoDIDOnDisk(scanPath, repoDid string) (repoPath, ownerDid, repoName string, err error) { 352 ownerDid, repoName, err = d.GetRepoKeyOwner(repoDid) 353 if err != nil { 354 return 355 } 356 357 didPath, joinErr := securejoin.SecureJoin(scanPath, repoDid) 358 if joinErr != nil { 359 err = fmt.Errorf("securejoin failed for repo DID path %s: %w", repoDid, joinErr) 360 return 361 } 362 363 if _, statErr := os.Stat(didPath); statErr != nil { 364 err = fmt.Errorf("repo DID directory not found on disk: %s", didPath) 365 return 366 } 367 368 repoPath = didPath 369 return 370}