Monorepo for Tangled
tangled.org
1package db
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "log/slog"
8 "os"
9 "strings"
10 "sync"
11
12 securejoin "github.com/cyphar/filepath-securejoin"
13 _ "github.com/mattn/go-sqlite3"
14 "tangled.org/core/log"
15 "tangled.org/core/orm"
16)
17
// DB bundles the SQLite connection pool with the logger used by this
// package. Values are created by Setup and must be shared by pointer because
// the struct embeds a mutex.
type DB struct {
	db     *sql.DB
	logger *slog.Logger

	// uidAssignMu guards the read-modify-write cycle on uid_counter that
	// GetOrAssignOwnerUID performs, so that goroutines calling it
	// concurrently cannot interleave and race on the counter.
	uidAssignMu sync.Mutex
}
26
// DBTX is the minimal query surface required by helpers that accept a
// database handle as a parameter, such as the package-level
// GetRepoKeyOwner. The method signatures match the ones of the same name
// on *sql.DB.
type DBTX interface {
	// Exec runs a statement that returns no rows.
	Exec(query string, args ...any) (sql.Result, error)
	// Query runs a statement that may return any number of rows.
	Query(query string, args ...any) (*sql.Rows, error)
	// QueryRow runs a statement that is expected to return at most one row.
	QueryRow(query string, args ...any) *sql.Row
}
32
33func (d *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) {
34 return d.db.BeginTx(ctx, opts)
35}
36
37func (d *DB) Exec(query string, args ...any) (sql.Result, error) {
38 return d.db.Exec(query, args...)
39}
40
41func (d *DB) QueryRow(query string, args ...any) *sql.Row {
42 return d.db.QueryRow(query, args...)
43}
44
45func (d *DB) Query(query string, args ...any) (*sql.Rows, error) {
46 return d.db.Query(query, args...)
47}
48
49func Setup(ctx context.Context, dbPath string) (*DB, error) {
50 // https://github.com/mattn/go-sqlite3#connection-string
51 opts := []string{
52 "_foreign_keys=1",
53 "_journal_mode=WAL",
54 "_synchronous=NORMAL",
55 "_auto_vacuum=incremental",
56 "_busy_timeout=5000",
57 }
58
59 logger := log.FromContext(ctx)
60 logger = log.SubLogger(logger, "db")
61
62 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&"))
63 if err != nil {
64 return nil, err
65 }
66
67 conn, err := db.Conn(ctx)
68 if err != nil {
69 return nil, err
70 }
71 defer conn.Close()
72
73 _, err = conn.ExecContext(ctx, `
74 create table if not exists known_dids (
75 did text primary key
76 );
77
78 create table if not exists public_keys (
79 id integer primary key autoincrement,
80 did text not null,
81 key text not null,
82 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
83 unique(did, key),
84 foreign key (did) references known_dids(did) on delete cascade
85 );
86
87 create table if not exists _jetstream (
88 id integer primary key autoincrement,
89 last_time_us integer not null
90 );
91
92 create table if not exists events (
93 rkey text not null,
94 nsid text not null,
95 event text not null, -- json
96 created integer not null default (strftime('%s', 'now')),
97 primary key (rkey, nsid)
98 );
99
100 create table if not exists repo_keys (
101 repo_did text primary key,
102 signing_key blob not null,
103 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
104 );
105
106 create table if not exists migrations (
107 id integer primary key autoincrement,
108 name text unique
109 );
110 `)
111 if err != nil {
112 return nil, err
113 }
114
115 if err := orm.RunMigration(conn, logger, "add-owner-did-to-repo-keys", func(tx *sql.Tx) error {
116 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN owner_did TEXT`)
117 return mErr
118 }); err != nil {
119 return nil, err
120 }
121
122 if err := orm.RunMigration(conn, logger, "add-repo-name-to-repo-keys", func(tx *sql.Tx) error {
123 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN repo_name TEXT`)
124 return mErr
125 }); err != nil {
126 return nil, err
127 }
128
129 if err := orm.RunMigration(conn, logger, "add-unique-owner-repo-on-repo-keys", func(tx *sql.Tx) error {
130 _, mErr := tx.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_repo_keys_owner_repo ON repo_keys(owner_did, repo_name)`)
131 return mErr
132 }); err != nil {
133 return nil, err
134 }
135
136 if err := orm.RunMigration(conn, logger, "add-key-type-and-nullable-signing-key", func(tx *sql.Tx) error {
137 _, mErr := tx.ExecContext(ctx, `
138 create table repo_keys_new (
139 repo_did text primary key,
140 signing_key blob,
141 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
142 owner_did text,
143 repo_name text,
144 at_uri text,
145 key_type text not null default 'k256'
146 );
147 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type)
148 select repo_did, signing_key, created_at, owner_did, repo_name, 'k256'
149 from repo_keys;
150 drop table repo_keys;
151 alter table repo_keys_new rename to repo_keys;
152 create unique index if not exists idx_repo_keys_owner_repo
153 on repo_keys(owner_did, repo_name);
154 `)
155 return mErr
156 }); err != nil {
157 return nil, err
158 }
159
160 if err := orm.RunMigration(conn, logger, "add-repo-aliases", func(tx *sql.Tx) error {
161 _, mErr := tx.ExecContext(ctx, `
162 create table if not exists repo_aliases (
163 owner_did text not null,
164 rkey text not null,
165 repo_did text not null,
166 rev text not null,
167 primary key (owner_did, rkey)
168 );
169 create index if not exists idx_repo_aliases_repo_did on repo_aliases(repo_did);
170
171 insert or ignore into repo_aliases (owner_did, rkey, repo_did, rev)
172 select owner_did, repo_name, repo_did, '1_' || created_at
173 from repo_keys
174 where owner_did is not null and repo_name is not null and repo_did is not null;
175 `)
176 return mErr
177 }); err != nil {
178 return nil, err
179 }
180
181 if err := orm.RunMigration(conn, logger, "drop-at-uri-from-repo-keys", func(tx *sql.Tx) error {
182 _, mErr := tx.ExecContext(ctx, `
183 create table repo_keys_new (
184 repo_did text primary key,
185 signing_key blob,
186 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
187 owner_did text,
188 repo_name text,
189 key_type text not null default 'k256'
190 );
191 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type)
192 select repo_did, signing_key, created_at, owner_did, repo_name, key_type
193 from repo_keys;
194 drop table repo_keys;
195 alter table repo_keys_new rename to repo_keys;
196 create unique index if not exists idx_repo_keys_owner_repo
197 on repo_keys(owner_did, repo_name);
198 `)
199 return mErr
200 }); err != nil {
201 return nil, err
202 }
203
204 if err := orm.RunMigration(conn, logger, "create-knot-members", func(tx *sql.Tx) error {
205 _, mErr := tx.ExecContext(ctx, `
206 create table if not exists knot_members (
207 id integer primary key autoincrement,
208 did text not null,
209 rkey text not null,
210 subject text not null,
211 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
212 unique (did, rkey)
213 );
214 `)
215 return mErr
216 }); err != nil {
217 return nil, err
218 }
219
220 if err := orm.RunMigration(conn, logger, "add-isolated-at-to-repo-keys", func(tx *sql.Tx) error {
221 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN isolated_at DATETIME`)
222 return mErr
223 }); err != nil {
224 return nil, err
225 }
226
227 if err := orm.RunMigration(conn, logger, "add-owner-uid-tables", func(tx *sql.Tx) error {
228 _, mErr := tx.ExecContext(ctx, `
229 CREATE TABLE IF NOT EXISTS owner_uid_assignments (
230 owner_did TEXT PRIMARY KEY,
231 uid INTEGER NOT NULL UNIQUE,
232 created_at DATETIME DEFAULT CURRENT_TIMESTAMP
233 );
234
235 CREATE TABLE IF NOT EXISTS uid_counter (
236 next_uid INTEGER NOT NULL DEFAULT 100000
237 );
238 `)
239 if mErr != nil {
240 return mErr
241 }
242 // Seed the counter only if the table is empty.
243 _, mErr = tx.ExecContext(ctx, `
244 INSERT INTO uid_counter (next_uid)
245 SELECT 100000 WHERE NOT EXISTS (SELECT 1 FROM uid_counter)
246 `)
247 return mErr
248 }); err != nil {
249 return nil, err
250 }
251
252 if err := orm.RunMigration(conn, logger, "knot-members-nullable-rkey", func(tx *sql.Tx) error {
253 _, mErr := tx.ExecContext(ctx, `
254 create table knot_members_new (
255 id integer primary key autoincrement,
256 did text not null,
257 rkey text,
258 subject text not null,
259 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
260 unique (did, rkey)
261 );
262 insert into knot_members_new (id, did, rkey, subject, created)
263 select id, did, rkey, subject, created from knot_members;
264 drop table knot_members;
265 alter table knot_members_new rename to knot_members;
266 `)
267 return mErr
268 }); err != nil {
269 return nil, err
270 }
271
272 if err := orm.RunMigration(conn, logger, "create-collaborators", func(tx *sql.Tx) error {
273 _, mErr := tx.ExecContext(ctx, `
274 create table if not exists collaborators (
275 id integer primary key autoincrement,
276 repo_did text not null,
277 subject_did text not null,
278 added_by_did text not null,
279 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
280 unique (repo_did, subject_did)
281 );
282 create index if not exists idx_collaborators_repo_id
283 on collaborators(repo_did, id);
284 `)
285 return mErr
286 }); err != nil {
287 return nil, err
288 }
289
290 if err := orm.RunMigration(conn, logger, "knot-members-direct-subject-unique", func(tx *sql.Tx) error {
291 _, mErr := tx.ExecContext(ctx, `
292 create unique index if not exists idx_knot_members_direct_subject
293 on knot_members(subject) where rkey is null;
294 create index if not exists idx_knot_members_subject
295 on knot_members(subject);
296 `)
297 return mErr
298 }); err != nil {
299 return nil, err
300 }
301
302 if err := orm.RunMigration(conn, logger, "add-events-created-index", func(tx *sql.Tx) error {
303 _, mErr := tx.ExecContext(ctx, `create index if not exists idx_events_created on events(created)`)
304 return mErr
305 }); err != nil {
306 return nil, err
307 }
308
309 return &DB{
310 db: db,
311 logger: logger,
312 }, nil
313}
314
315func (d *DB) StoreRepoKey(repoDid string, signingKey []byte, ownerDid, repoName string) error {
316 return d.storeRepoKeyRow(repoDid, signingKey, ownerDid, repoName, "k256")
317}
318
319func (d *DB) StoreRepoDidWeb(repoDid, ownerDid, repoName string) error {
320 return d.storeRepoKeyRow(repoDid, nil, ownerDid, repoName, "web")
321}
322
323func (d *DB) storeRepoKeyRow(repoDid string, signingKey []byte, ownerDid, repoName, keyType string) error {
324 tx, err := d.db.Begin()
325 if err != nil {
326 return err
327 }
328 defer tx.Rollback()
329
330 if _, err := tx.Exec(
331 `INSERT INTO repo_keys (repo_did, signing_key, owner_did, repo_name, key_type) VALUES (?, ?, ?, ?, ?)`,
332 repoDid, signingKey, ownerDid, repoName, keyType,
333 ); err != nil {
334 return err
335 }
336
337 if _, err := tx.Exec(
338 `INSERT INTO repo_aliases (owner_did, rkey, repo_did, rev)
339 VALUES (?, ?, ?, '0_' || strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
340 ON CONFLICT(owner_did, rkey) DO NOTHING`,
341 ownerDid, repoName, repoDid,
342 ); err != nil {
343 return err
344 }
345
346 return tx.Commit()
347}
348
349func (d *DB) DeleteRepoKey(repoDid string) error {
350 tx, err := d.db.Begin()
351 if err != nil {
352 return err
353 }
354 defer tx.Rollback()
355
356 if _, err := tx.Exec(`DELETE FROM repo_aliases WHERE repo_did = ?`, repoDid); err != nil {
357 return err
358 }
359
360 if _, err := tx.Exec(`DELETE FROM repo_keys WHERE repo_did = ?`, repoDid); err != nil {
361 return err
362 }
363
364 if _, err := tx.Exec(`DELETE FROM collaborators WHERE repo_did = ?`, repoDid); err != nil {
365 return err
366 }
367
368 return tx.Commit()
369}
370
371func (d *DB) RepoDidExists(repoDid string) (bool, error) {
372 var count int
373 err := d.db.QueryRow(`SELECT count(1) FROM repo_keys WHERE repo_did = ?`, repoDid).Scan(&count)
374 return count > 0, err
375}
376
377func (d *DB) ListRepoDids() ([]string, error) {
378 rows, err := d.db.Query(`SELECT repo_did FROM repo_keys`)
379 if err != nil {
380 return nil, err
381 }
382 defer rows.Close()
383
384 dids := []string{}
385 for rows.Next() {
386 var did string
387 if err := rows.Scan(&did); err != nil {
388 return nil, err
389 }
390 dids = append(dids, did)
391 }
392 return dids, rows.Err()
393}
394
395func (d *DB) GetRepoDid(ownerDid, rkey string) (string, error) {
396 var repoDid string
397 err := d.db.QueryRow(
398 `SELECT repo_did FROM repo_aliases WHERE owner_did = ? AND rkey = ?`,
399 ownerDid, rkey,
400 ).Scan(&repoDid)
401 return repoDid, err
402}
403
404func (d *DB) GetRepoDidByName(ownerDid, repoName string) (string, error) {
405 var repoDid string
406 err := d.db.QueryRow(
407 `SELECT repo_did FROM repo_keys WHERE owner_did = ? AND repo_name = ?`,
408 ownerDid, repoName,
409 ).Scan(&repoDid)
410 return repoDid, err
411}
412
413func (d *DB) GetRepoKeyOwner(repoDid string) (string, string, error) {
414 return GetRepoKeyOwner(d.db, repoDid)
415}
416
417func GetRepoKeyOwner(q DBTX, repoDid string) (ownerDid string, repoName string, err error) {
418 err = q.QueryRow(
419 `SELECT owner_did, rkey FROM repo_aliases
420 WHERE repo_did = ?
421 ORDER BY rev DESC
422 LIMIT 1`,
423 repoDid,
424 ).Scan(&ownerDid, &repoName)
425 if err != nil {
426 return
427 }
428 if ownerDid == "" || repoName == "" {
429 err = fmt.Errorf("repo_aliases row for %s has empty owner_did or rkey", repoDid)
430 return
431 }
432 return
433}
434
435func (d *DB) ResolveRepoDIDOnDisk(scanPath, repoDid string) (repoPath, ownerDid, repoName string, err error) {
436 ownerDid, repoName, err = d.GetRepoKeyOwner(repoDid)
437 if err != nil {
438 return
439 }
440
441 didPath, joinErr := securejoin.SecureJoin(scanPath, repoDid)
442 if joinErr != nil {
443 err = fmt.Errorf("securejoin failed for repo DID path %s: %w", repoDid, joinErr)
444 return
445 }
446
447 if _, statErr := os.Stat(didPath); statErr != nil {
448 err = fmt.Errorf("repo DID directory not found on disk: %s", didPath)
449 return
450 }
451
452 repoPath = didPath
453 return
454}