Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package db 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "log/slog" 8 "os" 9 "strings" 10 "sync" 11 12 securejoin "github.com/cyphar/filepath-securejoin" 13 _ "github.com/mattn/go-sqlite3" 14 "tangled.org/core/log" 15 "tangled.org/core/orm" 16) 17 18type DB struct { 19 db *sql.DB 20 logger *slog.Logger 21 22 // uidAssignMu serialises GetOrAssignOwnerUID across goroutines so that 23 // concurrent callers don't race on the uid_counter read-modify-write. 24 uidAssignMu sync.Mutex 25} 26 27type DBTX interface { 28 QueryRow(query string, args ...any) *sql.Row 29 Query(query string, args ...any) (*sql.Rows, error) 30 Exec(query string, args ...any) (sql.Result, error) 31} 32 33func (d *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) { 34 return d.db.BeginTx(ctx, opts) 35} 36 37func (d *DB) Exec(query string, args ...any) (sql.Result, error) { 38 return d.db.Exec(query, args...) 39} 40 41func (d *DB) QueryRow(query string, args ...any) *sql.Row { 42 return d.db.QueryRow(query, args...) 43} 44 45func (d *DB) Query(query string, args ...any) (*sql.Rows, error) { 46 return d.db.Query(query, args...) 47} 48 49func Setup(ctx context.Context, dbPath string) (*DB, error) { 50 // https://github.com/mattn/go-sqlite3#connection-string 51 opts := []string{ 52 "_foreign_keys=1", 53 "_journal_mode=WAL", 54 "_synchronous=NORMAL", 55 "_auto_vacuum=incremental", 56 "_busy_timeout=5000", 57 } 58 59 logger := log.FromContext(ctx) 60 logger = log.SubLogger(logger, "db") 61 62 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 63 if err != nil { 64 return nil, err 65 } 66 67 conn, err := db.Conn(ctx) 68 if err != nil { 69 return nil, err 70 } 71 defer conn.Close() 72 73 _, err = conn.ExecContext(ctx, ` 74 create table if not exists known_dids ( 75 did text primary key 76 ); 77 78 create table if not exists public_keys ( 79 id integer primary key autoincrement, 80 did text not null, 81 key text not null, 82 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 83 unique(did, key), 84 foreign key (did) references known_dids(did) on delete cascade 85 ); 86 87 create table if not exists _jetstream ( 88 id integer primary key autoincrement, 89 last_time_us integer not null 90 ); 91 92 create table if not exists events ( 93 rkey text not null, 94 nsid text not null, 95 event text not null, -- json 96 created integer not null default (strftime('%s', 'now')), 97 primary key (rkey, nsid) 98 ); 99 100 create table if not exists repo_keys ( 101 repo_did text primary key, 102 signing_key blob not null, 103 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) 104 ); 105 106 create table if not exists migrations ( 107 id integer primary key autoincrement, 108 name text unique 109 ); 110 `) 111 if err != nil { 112 return nil, err 113 } 114 115 if err := orm.RunMigration(conn, logger, "add-owner-did-to-repo-keys", func(tx *sql.Tx) error { 116 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN owner_did TEXT`) 117 return mErr 118 }); err != nil { 119 return nil, err 120 } 121 122 if err := orm.RunMigration(conn, logger, "add-repo-name-to-repo-keys", func(tx *sql.Tx) error { 123 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN repo_name TEXT`) 124 return mErr 125 }); err != nil { 126 return nil, err 127 } 128 129 if err := orm.RunMigration(conn, logger, "add-unique-owner-repo-on-repo-keys", func(tx *sql.Tx) error { 130 _, mErr := tx.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_repo_keys_owner_repo ON repo_keys(owner_did, repo_name)`) 131 return mErr 132 }); err != nil { 133 return nil, err 134 } 135 136 if err := orm.RunMigration(conn, logger, "add-key-type-and-nullable-signing-key", func(tx *sql.Tx) error { 137 _, mErr := tx.ExecContext(ctx, ` 138 create table repo_keys_new ( 139 repo_did text primary key, 140 signing_key blob, 141 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 142 owner_did text, 143 repo_name text, 144 at_uri text, 145 key_type text not null default 'k256' 146 ); 147 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type) 148 select repo_did, signing_key, created_at, owner_did, repo_name, 'k256' 149 from repo_keys; 150 drop table repo_keys; 151 alter table repo_keys_new rename to repo_keys; 152 create unique index if not exists idx_repo_keys_owner_repo 153 on repo_keys(owner_did, repo_name); 154 `) 155 return mErr 156 }); err != nil { 157 return nil, err 158 } 159 160 if err := orm.RunMigration(conn, logger, "add-repo-aliases", func(tx *sql.Tx) error { 161 _, mErr := tx.ExecContext(ctx, ` 162 create table if not exists repo_aliases ( 163 owner_did text not null, 164 rkey text not null, 165 repo_did text not null, 166 rev text not null, 167 primary key (owner_did, rkey) 168 ); 169 create index if not exists idx_repo_aliases_repo_did on repo_aliases(repo_did); 170 171 insert or ignore into repo_aliases (owner_did, rkey, repo_did, rev) 172 select owner_did, repo_name, repo_did, '1_' || created_at 173 from repo_keys 174 where owner_did is not null and repo_name is not null and repo_did is not null; 175 `) 176 return mErr 177 }); err != nil { 178 return nil, err 179 } 180 181 if err := orm.RunMigration(conn, logger, "drop-at-uri-from-repo-keys", func(tx *sql.Tx) error { 182 _, mErr := tx.ExecContext(ctx, ` 183 create table repo_keys_new ( 184 repo_did text primary key, 185 signing_key blob, 186 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 187 owner_did text, 188 repo_name text, 189 key_type text not null default 'k256' 190 ); 191 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type) 192 select repo_did, signing_key, created_at, owner_did, repo_name, key_type 193 from repo_keys; 194 drop table repo_keys; 195 alter table repo_keys_new rename to repo_keys; 196 create unique index if not exists idx_repo_keys_owner_repo 197 on repo_keys(owner_did, repo_name); 198 `) 199 return mErr 200 }); err != nil { 201 return nil, err 202 } 203 204 if err := orm.RunMigration(conn, logger, "create-knot-members", func(tx *sql.Tx) error { 205 _, mErr := tx.ExecContext(ctx, ` 206 create table if not exists knot_members ( 207 id integer primary key autoincrement, 208 did text not null, 209 rkey text not null, 210 subject text not null, 211 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 212 unique (did, rkey) 213 ); 214 `) 215 return mErr 216 }); err != nil { 217 return nil, err 218 } 219 220 if err := orm.RunMigration(conn, logger, "add-isolated-at-to-repo-keys", func(tx *sql.Tx) error { 221 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN isolated_at DATETIME`) 222 return mErr 223 }); err != nil { 224 return nil, err 225 } 226 227 if err := orm.RunMigration(conn, logger, "add-owner-uid-tables", func(tx *sql.Tx) error { 228 _, mErr := tx.ExecContext(ctx, ` 229 CREATE TABLE IF NOT EXISTS owner_uid_assignments ( 230 owner_did TEXT PRIMARY KEY, 231 uid INTEGER NOT NULL UNIQUE, 232 created_at DATETIME DEFAULT CURRENT_TIMESTAMP 233 ); 234 235 CREATE TABLE IF NOT EXISTS uid_counter ( 236 next_uid INTEGER NOT NULL DEFAULT 100000 237 ); 238 `) 239 if mErr != nil { 240 return mErr 241 } 242 // Seed the counter only if the table is empty. 243 _, mErr = tx.ExecContext(ctx, ` 244 INSERT INTO uid_counter (next_uid) 245 SELECT 100000 WHERE NOT EXISTS (SELECT 1 FROM uid_counter) 246 `) 247 return mErr 248 }); err != nil { 249 return nil, err 250 } 251 252 if err := orm.RunMigration(conn, logger, "knot-members-nullable-rkey", func(tx *sql.Tx) error { 253 _, mErr := tx.ExecContext(ctx, ` 254 create table knot_members_new ( 255 id integer primary key autoincrement, 256 did text not null, 257 rkey text, 258 subject text not null, 259 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 260 unique (did, rkey) 261 ); 262 insert into knot_members_new (id, did, rkey, subject, created) 263 select id, did, rkey, subject, created from knot_members; 264 drop table knot_members; 265 alter table knot_members_new rename to knot_members; 266 `) 267 return mErr 268 }); err != nil { 269 return nil, err 270 } 271 272 if err := orm.RunMigration(conn, logger, "create-collaborators", func(tx *sql.Tx) error { 273 _, mErr := tx.ExecContext(ctx, ` 274 create table if not exists collaborators ( 275 id integer primary key autoincrement, 276 repo_did text not null, 277 subject_did text not null, 278 added_by_did text not null, 279 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 280 unique (repo_did, subject_did) 281 ); 282 create index if not exists idx_collaborators_repo_id 283 on collaborators(repo_did, id); 284 `) 285 return mErr 286 }); err != nil { 287 return nil, err 288 } 289 290 if err := orm.RunMigration(conn, logger, "knot-members-direct-subject-unique", func(tx *sql.Tx) error { 291 _, mErr := tx.ExecContext(ctx, ` 292 create unique index if not exists idx_knot_members_direct_subject 293 on knot_members(subject) where rkey is null; 294 create index if not exists idx_knot_members_subject 295 on knot_members(subject); 296 `) 297 return mErr 298 }); err != nil { 299 return nil, err 300 } 301 302 if err := orm.RunMigration(conn, logger, "add-events-created-index", func(tx *sql.Tx) error { 303 _, mErr := tx.ExecContext(ctx, `create index if not exists idx_events_created on events(created)`) 304 return mErr 305 }); err != nil { 306 return nil, err 307 } 308 309 return &DB{ 310 db: db, 311 logger: logger, 312 }, nil 313} 314 315func (d *DB) StoreRepoKey(repoDid string, signingKey []byte, ownerDid, repoName string) error { 316 return d.storeRepoKeyRow(repoDid, signingKey, ownerDid, repoName, "k256") 317} 318 319func (d *DB) StoreRepoDidWeb(repoDid, ownerDid, repoName string) error { 320 return d.storeRepoKeyRow(repoDid, nil, ownerDid, repoName, "web") 321} 322 323func (d *DB) storeRepoKeyRow(repoDid string, signingKey []byte, ownerDid, repoName, keyType string) error { 324 tx, err := d.db.Begin() 325 if err != nil { 326 return err 327 } 328 defer tx.Rollback() 329 330 if _, err := tx.Exec( 331 `INSERT INTO repo_keys (repo_did, signing_key, owner_did, repo_name, key_type) VALUES (?, ?, ?, ?, ?)`, 332 repoDid, signingKey, ownerDid, repoName, keyType, 333 ); err != nil { 334 return err 335 } 336 337 if _, err := tx.Exec( 338 `INSERT INTO repo_aliases (owner_did, rkey, repo_did, rev) 339 VALUES (?, ?, ?, '0_' || strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) 340 ON CONFLICT(owner_did, rkey) DO NOTHING`, 341 ownerDid, repoName, repoDid, 342 ); err != nil { 343 return err 344 } 345 346 return tx.Commit() 347} 348 349func (d *DB) DeleteRepoKey(repoDid string) error { 350 tx, err := d.db.Begin() 351 if err != nil { 352 return err 353 } 354 defer tx.Rollback() 355 356 if _, err := tx.Exec(`DELETE FROM repo_aliases WHERE repo_did = ?`, repoDid); err != nil { 357 return err 358 } 359 360 if _, err := tx.Exec(`DELETE FROM repo_keys WHERE repo_did = ?`, repoDid); err != nil { 361 return err 362 } 363 364 if _, err := tx.Exec(`DELETE FROM collaborators WHERE repo_did = ?`, repoDid); err != nil { 365 return err 366 } 367 368 return tx.Commit() 369} 370 371func (d *DB) RepoDidExists(repoDid string) (bool, error) { 372 var count int 373 err := d.db.QueryRow(`SELECT count(1) FROM repo_keys WHERE repo_did = ?`, repoDid).Scan(&count) 374 return count > 0, err 375} 376 377func (d *DB) ListRepoDids() ([]string, error) { 378 rows, err := d.db.Query(`SELECT repo_did FROM repo_keys`) 379 if err != nil { 380 return nil, err 381 } 382 defer rows.Close() 383 384 dids := []string{} 385 for rows.Next() { 386 var did string 387 if err := rows.Scan(&did); err != nil { 388 return nil, err 389 } 390 dids = append(dids, did) 391 } 392 return dids, rows.Err() 393} 394 395func (d *DB) GetRepoDid(ownerDid, rkey string) (string, error) { 396 var repoDid string 397 err := d.db.QueryRow( 398 `SELECT repo_did FROM repo_aliases WHERE owner_did = ? AND rkey = ?`, 399 ownerDid, rkey, 400 ).Scan(&repoDid) 401 return repoDid, err 402} 403 404func (d *DB) GetRepoDidByName(ownerDid, repoName string) (string, error) { 405 var repoDid string 406 err := d.db.QueryRow( 407 `SELECT repo_did FROM repo_keys WHERE owner_did = ? AND repo_name = ?`, 408 ownerDid, repoName, 409 ).Scan(&repoDid) 410 return repoDid, err 411} 412 413func (d *DB) GetRepoKeyOwner(repoDid string) (string, string, error) { 414 return GetRepoKeyOwner(d.db, repoDid) 415} 416 417func GetRepoKeyOwner(q DBTX, repoDid string) (ownerDid string, repoName string, err error) { 418 err = q.QueryRow( 419 `SELECT owner_did, rkey FROM repo_aliases 420 WHERE repo_did = ? 421 ORDER BY rev DESC 422 LIMIT 1`, 423 repoDid, 424 ).Scan(&ownerDid, &repoName) 425 if err != nil { 426 return 427 } 428 if ownerDid == "" || repoName == "" { 429 err = fmt.Errorf("repo_aliases row for %s has empty owner_did or rkey", repoDid) 430 return 431 } 432 return 433} 434 435func (d *DB) ResolveRepoDIDOnDisk(scanPath, repoDid string) (repoPath, ownerDid, repoName string, err error) { 436 ownerDid, repoName, err = d.GetRepoKeyOwner(repoDid) 437 if err != nil { 438 return 439 } 440 441 didPath, joinErr := securejoin.SecureJoin(scanPath, repoDid) 442 if joinErr != nil { 443 err = fmt.Errorf("securejoin failed for repo DID path %s: %w", repoDid, joinErr) 444 return 445 } 446 447 if _, statErr := os.Stat(didPath); statErr != nil { 448 err = fmt.Errorf("repo DID directory not found on disk: %s", didPath) 449 return 450 } 451 452 repoPath = didPath 453 return 454}