Monorepo for Tangled
tangled.org
1package db
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "log/slog"
8 "os"
9 "strings"
10 "sync"
11
12 securejoin "github.com/cyphar/filepath-securejoin"
13 _ "github.com/mattn/go-sqlite3"
14 "tangled.org/core/log"
15 "tangled.org/core/orm"
16)
17
// DB bundles the SQL connection pool with the logger used by the
// database layer.
type DB struct {
	// db is the underlying database/sql handle every query goes through.
	db *sql.DB
	// logger is the logger handed to the database layer at setup time.
	logger *slog.Logger

	// uidAssignMu serialises GetOrAssignOwnerUID across goroutines so that
	// concurrent callers don't race on the uid_counter read-modify-write.
	uidAssignMu sync.Mutex
}
26
// DBTX is the minimal query surface shared by *sql.DB and *sql.Tx, so
// helpers that accept it can run either directly against the pool or inside
// a transaction.
type DBTX interface {
	// Exec runs a statement that returns no rows.
	Exec(query string, args ...any) (sql.Result, error)
	// Query runs a statement that may return any number of rows.
	Query(query string, args ...any) (*sql.Rows, error)
	// QueryRow runs a statement expected to return at most one row.
	QueryRow(query string, args ...any) *sql.Row
}
32
33func (d *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) {
34 return d.db.BeginTx(ctx, opts)
35}
36
37func (d *DB) Exec(query string, args ...any) (sql.Result, error) {
38 return d.db.Exec(query, args...)
39}
40
41func (d *DB) QueryRow(query string, args ...any) *sql.Row {
42 return d.db.QueryRow(query, args...)
43}
44
45func (d *DB) Query(query string, args ...any) (*sql.Rows, error) {
46 return d.db.Query(query, args...)
47}
48
49func Setup(ctx context.Context, dbPath string) (*DB, error) {
50 // https://github.com/mattn/go-sqlite3#connection-string
51 opts := []string{
52 "_foreign_keys=1",
53 "_journal_mode=WAL",
54 "_synchronous=NORMAL",
55 "_auto_vacuum=incremental",
56 "_busy_timeout=5000",
57 }
58
59 logger := log.FromContext(ctx)
60 logger = log.SubLogger(logger, "db")
61
62 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&"))
63 if err != nil {
64 return nil, err
65 }
66
67 conn, err := db.Conn(ctx)
68 if err != nil {
69 return nil, err
70 }
71 defer conn.Close()
72
73 _, err = conn.ExecContext(ctx, `
74 create table if not exists known_dids (
75 did text primary key
76 );
77
78 create table if not exists public_keys (
79 id integer primary key autoincrement,
80 did text not null,
81 key text not null,
82 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
83 unique(did, key),
84 foreign key (did) references known_dids(did) on delete cascade
85 );
86
87 create table if not exists _jetstream (
88 id integer primary key autoincrement,
89 last_time_us integer not null
90 );
91
92 create table if not exists events (
93 rkey text not null,
94 nsid text not null,
95 event text not null, -- json
96 created integer not null default (strftime('%s', 'now')),
97 primary key (rkey, nsid)
98 );
99
100 create table if not exists repo_keys (
101 repo_did text primary key,
102 signing_key blob not null,
103 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
104 );
105
106 create table if not exists migrations (
107 id integer primary key autoincrement,
108 name text unique
109 );
110 `)
111 if err != nil {
112 return nil, err
113 }
114
115 if err := orm.RunMigration(conn, logger, "add-owner-did-to-repo-keys", func(tx *sql.Tx) error {
116 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN owner_did TEXT`)
117 return mErr
118 }); err != nil {
119 return nil, err
120 }
121
122 if err := orm.RunMigration(conn, logger, "add-repo-name-to-repo-keys", func(tx *sql.Tx) error {
123 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN repo_name TEXT`)
124 return mErr
125 }); err != nil {
126 return nil, err
127 }
128
129 if err := orm.RunMigration(conn, logger, "add-unique-owner-repo-on-repo-keys", func(tx *sql.Tx) error {
130 _, mErr := tx.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_repo_keys_owner_repo ON repo_keys(owner_did, repo_name)`)
131 return mErr
132 }); err != nil {
133 return nil, err
134 }
135
136 if err := orm.RunMigration(conn, logger, "add-key-type-and-nullable-signing-key", func(tx *sql.Tx) error {
137 _, mErr := tx.ExecContext(ctx, `
138 create table repo_keys_new (
139 repo_did text primary key,
140 signing_key blob,
141 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
142 owner_did text,
143 repo_name text,
144 at_uri text,
145 key_type text not null default 'k256'
146 );
147 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type)
148 select repo_did, signing_key, created_at, owner_did, repo_name, 'k256'
149 from repo_keys;
150 drop table repo_keys;
151 alter table repo_keys_new rename to repo_keys;
152 create unique index if not exists idx_repo_keys_owner_repo
153 on repo_keys(owner_did, repo_name);
154 `)
155 return mErr
156 }); err != nil {
157 return nil, err
158 }
159
160 if err := orm.RunMigration(conn, logger, "add-repo-aliases", func(tx *sql.Tx) error {
161 _, mErr := tx.ExecContext(ctx, `
162 create table if not exists repo_aliases (
163 owner_did text not null,
164 rkey text not null,
165 repo_did text not null,
166 rev text not null,
167 primary key (owner_did, rkey)
168 );
169 create index if not exists idx_repo_aliases_repo_did on repo_aliases(repo_did);
170
171 insert or ignore into repo_aliases (owner_did, rkey, repo_did, rev)
172 select owner_did, repo_name, repo_did, '1_' || created_at
173 from repo_keys
174 where owner_did is not null and repo_name is not null and repo_did is not null;
175 `)
176 return mErr
177 }); err != nil {
178 return nil, err
179 }
180
181 if err := orm.RunMigration(conn, logger, "drop-at-uri-from-repo-keys", func(tx *sql.Tx) error {
182 _, mErr := tx.ExecContext(ctx, `
183 create table repo_keys_new (
184 repo_did text primary key,
185 signing_key blob,
186 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
187 owner_did text,
188 repo_name text,
189 key_type text not null default 'k256'
190 );
191 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type)
192 select repo_did, signing_key, created_at, owner_did, repo_name, key_type
193 from repo_keys;
194 drop table repo_keys;
195 alter table repo_keys_new rename to repo_keys;
196 create unique index if not exists idx_repo_keys_owner_repo
197 on repo_keys(owner_did, repo_name);
198 `)
199 return mErr
200 }); err != nil {
201 return nil, err
202 }
203
204 if err := orm.RunMigration(conn, logger, "create-knot-members", func(tx *sql.Tx) error {
205 _, mErr := tx.ExecContext(ctx, `
206 create table if not exists knot_members (
207 id integer primary key autoincrement,
208 did text not null,
209 rkey text not null,
210 subject text not null,
211 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
212 unique (did, rkey)
213 );
214 `)
215 return mErr
216 }); err != nil {
217 return nil, err
218 }
219
220 if err := orm.RunMigration(conn, logger, "add-isolated-at-to-repo-keys", func(tx *sql.Tx) error {
221 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN isolated_at DATETIME`)
222 return mErr
223 }); err != nil {
224 return nil, err
225 }
226
227 if err := orm.RunMigration(conn, logger, "add-owner-uid-tables", func(tx *sql.Tx) error {
228 _, mErr := tx.ExecContext(ctx, `
229 CREATE TABLE IF NOT EXISTS owner_uid_assignments (
230 owner_did TEXT PRIMARY KEY,
231 uid INTEGER NOT NULL UNIQUE,
232 created_at DATETIME DEFAULT CURRENT_TIMESTAMP
233 );
234
235 CREATE TABLE IF NOT EXISTS uid_counter (
236 next_uid INTEGER NOT NULL DEFAULT 100000
237 );
238 `)
239 if mErr != nil {
240 return mErr
241 }
242 // Seed the counter only if the table is empty.
243 _, mErr = tx.ExecContext(ctx, `
244 INSERT INTO uid_counter (next_uid)
245 SELECT 100000 WHERE NOT EXISTS (SELECT 1 FROM uid_counter)
246 `)
247 return mErr
248 }); err != nil {
249 return nil, err
250 }
251
252 if err := orm.RunMigration(conn, logger, "knot-members-nullable-rkey", func(tx *sql.Tx) error {
253 _, mErr := tx.ExecContext(ctx, `
254 create table knot_members_new (
255 id integer primary key autoincrement,
256 did text not null,
257 rkey text,
258 subject text not null,
259 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
260 unique (did, rkey)
261 );
262 insert into knot_members_new (id, did, rkey, subject, created)
263 select id, did, rkey, subject, created from knot_members;
264 drop table knot_members;
265 alter table knot_members_new rename to knot_members;
266 `)
267 return mErr
268 }); err != nil {
269 return nil, err
270 }
271
272 if err := orm.RunMigration(conn, logger, "create-collaborators", func(tx *sql.Tx) error {
273 _, mErr := tx.ExecContext(ctx, `
274 create table if not exists collaborators (
275 id integer primary key autoincrement,
276 repo_did text not null,
277 subject_did text not null,
278 added_by_did text not null,
279 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
280 unique (repo_did, subject_did)
281 );
282 create index if not exists idx_collaborators_repo_id
283 on collaborators(repo_did, id);
284 `)
285 return mErr
286 }); err != nil {
287 return nil, err
288 }
289
290 if err := orm.RunMigration(conn, logger, "knot-members-direct-subject-unique", func(tx *sql.Tx) error {
291 _, mErr := tx.ExecContext(ctx, `
292 create unique index if not exists idx_knot_members_direct_subject
293 on knot_members(subject) where rkey is null;
294 create index if not exists idx_knot_members_subject
295 on knot_members(subject);
296 `)
297 return mErr
298 }); err != nil {
299 return nil, err
300 }
301
302 if err := orm.RunMigration(conn, logger, "add-events-created-index", func(tx *sql.Tx) error {
303 _, mErr := tx.ExecContext(ctx, `create index if not exists idx_events_created on events(created)`)
304 return mErr
305 }); err != nil {
306 return nil, err
307 }
308
309 if err := orm.RunMigration(conn, logger, "add-rkey-to-public-keys", func(tx *sql.Tx) error {
310 _, mErr := tx.ExecContext(ctx, `ALTER TABLE public_keys ADD COLUMN rkey TEXT`)
311 return mErr
312 }); err != nil {
313 return nil, err
314 }
315
316 if err := orm.RunMigration(conn, logger, "enforce-global-key-uniqueness", func(tx *sql.Tx) error {
317 res, mErr := tx.ExecContext(ctx, `delete from public_keys where id not in (select min(id) from public_keys group by key)`)
318 if mErr != nil {
319 return mErr
320 }
321 if n, rErr := res.RowsAffected(); rErr == nil && n > 0 {
322 logger.Warn("dropped duplicate public keys to enforce global key uniqueness", "deleted", n)
323 }
324 _, mErr = tx.ExecContext(ctx, `create unique index if not exists idx_public_keys_key on public_keys(key)`)
325 return mErr
326 }); err != nil {
327 return nil, err
328 }
329
330 return &DB{
331 db: db,
332 logger: logger,
333 }, nil
334}
335
336func (d *DB) StoreRepoKey(repoDid string, signingKey []byte, ownerDid, repoName string) error {
337 return d.storeRepoKeyRow(repoDid, signingKey, ownerDid, repoName, "k256")
338}
339
340func (d *DB) StoreRepoDidWeb(repoDid, ownerDid, repoName string) error {
341 return d.storeRepoKeyRow(repoDid, nil, ownerDid, repoName, "web")
342}
343
344func (d *DB) storeRepoKeyRow(repoDid string, signingKey []byte, ownerDid, repoName, keyType string) error {
345 tx, err := d.db.Begin()
346 if err != nil {
347 return err
348 }
349 defer tx.Rollback()
350
351 if _, err := tx.Exec(
352 `INSERT INTO repo_keys (repo_did, signing_key, owner_did, repo_name, key_type) VALUES (?, ?, ?, ?, ?)`,
353 repoDid, signingKey, ownerDid, repoName, keyType,
354 ); err != nil {
355 return err
356 }
357
358 if _, err := tx.Exec(
359 `INSERT INTO repo_aliases (owner_did, rkey, repo_did, rev)
360 VALUES (?, ?, ?, '0_' || strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
361 ON CONFLICT(owner_did, rkey) DO NOTHING`,
362 ownerDid, repoName, repoDid,
363 ); err != nil {
364 return err
365 }
366
367 return tx.Commit()
368}
369
370func (d *DB) DeleteRepoKey(repoDid string) error {
371 tx, err := d.db.Begin()
372 if err != nil {
373 return err
374 }
375 defer tx.Rollback()
376
377 if _, err := tx.Exec(`DELETE FROM repo_aliases WHERE repo_did = ?`, repoDid); err != nil {
378 return err
379 }
380
381 if _, err := tx.Exec(`DELETE FROM repo_keys WHERE repo_did = ?`, repoDid); err != nil {
382 return err
383 }
384
385 if _, err := tx.Exec(`DELETE FROM collaborators WHERE repo_did = ?`, repoDid); err != nil {
386 return err
387 }
388
389 return tx.Commit()
390}
391
392func (d *DB) RepoDidExists(repoDid string) (bool, error) {
393 var count int
394 err := d.db.QueryRow(`SELECT count(1) FROM repo_keys WHERE repo_did = ?`, repoDid).Scan(&count)
395 return count > 0, err
396}
397
398func (d *DB) ListRepoDids() ([]string, error) {
399 rows, err := d.db.Query(`SELECT repo_did FROM repo_keys`)
400 if err != nil {
401 return nil, err
402 }
403 defer rows.Close()
404
405 dids := []string{}
406 for rows.Next() {
407 var did string
408 if err := rows.Scan(&did); err != nil {
409 return nil, err
410 }
411 dids = append(dids, did)
412 }
413 return dids, rows.Err()
414}
415
416func (d *DB) GetRepoDid(ownerDid, rkey string) (string, error) {
417 var repoDid string
418 err := d.db.QueryRow(
419 `SELECT repo_did FROM repo_aliases WHERE owner_did = ? AND rkey = ?`,
420 ownerDid, rkey,
421 ).Scan(&repoDid)
422 return repoDid, err
423}
424
425func (d *DB) GetRepoDidByName(ownerDid, repoName string) (string, error) {
426 var repoDid string
427 err := d.db.QueryRow(
428 `SELECT repo_did FROM repo_keys WHERE owner_did = ? AND repo_name = ?`,
429 ownerDid, repoName,
430 ).Scan(&repoDid)
431 return repoDid, err
432}
433
434func (d *DB) GetRepoKeyOwner(repoDid string) (string, string, error) {
435 return GetRepoKeyOwner(d.db, repoDid)
436}
437
438func GetRepoKeyOwner(q DBTX, repoDid string) (ownerDid string, repoName string, err error) {
439 err = q.QueryRow(
440 `SELECT owner_did, rkey FROM repo_aliases
441 WHERE repo_did = ?
442 ORDER BY rev DESC
443 LIMIT 1`,
444 repoDid,
445 ).Scan(&ownerDid, &repoName)
446 if err != nil {
447 return
448 }
449 if ownerDid == "" || repoName == "" {
450 err = fmt.Errorf("repo_aliases row for %s has empty owner_did or rkey", repoDid)
451 return
452 }
453 return
454}
455
456func (d *DB) ResolveRepoDIDOnDisk(scanPath, repoDid string) (repoPath, ownerDid, repoName string, err error) {
457 ownerDid, repoName, err = d.GetRepoKeyOwner(repoDid)
458 if err != nil {
459 return
460 }
461
462 didPath, joinErr := securejoin.SecureJoin(scanPath, repoDid)
463 if joinErr != nil {
464 err = fmt.Errorf("securejoin failed for repo DID path %s: %w", repoDid, joinErr)
465 return
466 }
467
468 if _, statErr := os.Stat(didPath); statErr != nil {
469 err = fmt.Errorf("repo DID directory not found on disk: %s", didPath)
470 return
471 }
472
473 repoPath = didPath
474 return
475}