Monorepo for Tangled
tangled.org
1package db
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "log/slog"
8 "os"
9 "strings"
10 "sync"
11
12 securejoin "github.com/cyphar/filepath-securejoin"
13 _ "github.com/mattn/go-sqlite3"
14 "tangled.org/core/log"
15 "tangled.org/core/orm"
16)
17
// DB is the package's handle on the SQLite database: it pairs the
// database/sql pool with a logger and the lock used for owner-UID assignment.
type DB struct {
	db     *sql.DB
	logger *slog.Logger

	// uidAssignMu is taken by GetOrAssignOwnerUID so that goroutines calling
	// it concurrently cannot interleave their read-modify-write of the
	// uid_counter table.
	uidAssignMu sync.Mutex
}
26
// DBTX is the minimal query surface the package-level helpers need: running
// a statement and fetching a single row. *DB provides both methods, and
// (*DB).GetRepoKeyOwner passes its *sql.DB as a DBTX.
type DBTX interface {
	Exec(query string, args ...any) (sql.Result, error)
	QueryRow(query string, args ...any) *sql.Row
}
31
32func (d *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) {
33 return d.db.BeginTx(ctx, opts)
34}
35
36func (d *DB) Exec(query string, args ...any) (sql.Result, error) {
37 return d.db.Exec(query, args...)
38}
39
40func (d *DB) QueryRow(query string, args ...any) *sql.Row {
41 return d.db.QueryRow(query, args...)
42}
43
44func Setup(ctx context.Context, dbPath string) (*DB, error) {
45 // https://github.com/mattn/go-sqlite3#connection-string
46 opts := []string{
47 "_foreign_keys=1",
48 "_journal_mode=WAL",
49 "_synchronous=NORMAL",
50 "_auto_vacuum=incremental",
51 "_busy_timeout=5000",
52 }
53
54 logger := log.FromContext(ctx)
55 logger = log.SubLogger(logger, "db")
56
57 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&"))
58 if err != nil {
59 return nil, err
60 }
61
62 conn, err := db.Conn(ctx)
63 if err != nil {
64 return nil, err
65 }
66 defer conn.Close()
67
68 _, err = conn.ExecContext(ctx, `
69 create table if not exists known_dids (
70 did text primary key
71 );
72
73 create table if not exists public_keys (
74 id integer primary key autoincrement,
75 did text not null,
76 key text not null,
77 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
78 unique(did, key),
79 foreign key (did) references known_dids(did) on delete cascade
80 );
81
82 create table if not exists _jetstream (
83 id integer primary key autoincrement,
84 last_time_us integer not null
85 );
86
87 create table if not exists events (
88 rkey text not null,
89 nsid text not null,
90 event text not null, -- json
91 created integer not null default (strftime('%s', 'now')),
92 primary key (rkey, nsid)
93 );
94
95 create table if not exists repo_keys (
96 repo_did text primary key,
97 signing_key blob not null,
98 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
99 );
100
101 create table if not exists migrations (
102 id integer primary key autoincrement,
103 name text unique
104 );
105 `)
106 if err != nil {
107 return nil, err
108 }
109
110 if err := orm.RunMigration(conn, logger, "add-owner-did-to-repo-keys", func(tx *sql.Tx) error {
111 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN owner_did TEXT`)
112 return mErr
113 }); err != nil {
114 return nil, err
115 }
116
117 if err := orm.RunMigration(conn, logger, "add-repo-name-to-repo-keys", func(tx *sql.Tx) error {
118 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN repo_name TEXT`)
119 return mErr
120 }); err != nil {
121 return nil, err
122 }
123
124 if err := orm.RunMigration(conn, logger, "add-unique-owner-repo-on-repo-keys", func(tx *sql.Tx) error {
125 _, mErr := tx.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_repo_keys_owner_repo ON repo_keys(owner_did, repo_name)`)
126 return mErr
127 }); err != nil {
128 return nil, err
129 }
130
131 if err := orm.RunMigration(conn, logger, "add-key-type-and-nullable-signing-key", func(tx *sql.Tx) error {
132 _, mErr := tx.ExecContext(ctx, `
133 create table repo_keys_new (
134 repo_did text primary key,
135 signing_key blob,
136 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
137 owner_did text,
138 repo_name text,
139 at_uri text,
140 key_type text not null default 'k256'
141 );
142 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type)
143 select repo_did, signing_key, created_at, owner_did, repo_name, 'k256'
144 from repo_keys;
145 drop table repo_keys;
146 alter table repo_keys_new rename to repo_keys;
147 create unique index if not exists idx_repo_keys_owner_repo
148 on repo_keys(owner_did, repo_name);
149 `)
150 return mErr
151 }); err != nil {
152 return nil, err
153 }
154
155 if err := orm.RunMigration(conn, logger, "add-repo-aliases", func(tx *sql.Tx) error {
156 _, mErr := tx.ExecContext(ctx, `
157 create table if not exists repo_aliases (
158 owner_did text not null,
159 rkey text not null,
160 repo_did text not null,
161 rev text not null,
162 primary key (owner_did, rkey)
163 );
164 create index if not exists idx_repo_aliases_repo_did on repo_aliases(repo_did);
165
166 insert or ignore into repo_aliases (owner_did, rkey, repo_did, rev)
167 select owner_did, repo_name, repo_did, '1_' || created_at
168 from repo_keys
169 where owner_did is not null and repo_name is not null and repo_did is not null;
170 `)
171 return mErr
172 }); err != nil {
173 return nil, err
174 }
175
176 if err := orm.RunMigration(conn, logger, "drop-at-uri-from-repo-keys", func(tx *sql.Tx) error {
177 _, mErr := tx.ExecContext(ctx, `
178 create table repo_keys_new (
179 repo_did text primary key,
180 signing_key blob,
181 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
182 owner_did text,
183 repo_name text,
184 key_type text not null default 'k256'
185 );
186 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type)
187 select repo_did, signing_key, created_at, owner_did, repo_name, key_type
188 from repo_keys;
189 drop table repo_keys;
190 alter table repo_keys_new rename to repo_keys;
191 create unique index if not exists idx_repo_keys_owner_repo
192 on repo_keys(owner_did, repo_name);
193 `)
194 return mErr
195 }); err != nil {
196 return nil, err
197 }
198
199 if err := orm.RunMigration(conn, logger, "create-knot-members", func(tx *sql.Tx) error {
200 _, mErr := tx.ExecContext(ctx, `
201 create table if not exists knot_members (
202 id integer primary key autoincrement,
203 did text not null,
204 rkey text not null,
205 subject text not null,
206 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
207 unique (did, rkey)
208 );
209 `)
210 return mErr
211 }); err != nil {
212 return nil, err
213 }
214
215 if err := orm.RunMigration(conn, logger, "add-isolated-at-to-repo-keys", func(tx *sql.Tx) error {
216 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN isolated_at DATETIME`)
217 return mErr
218 }); err != nil {
219 return nil, err
220 }
221
222 if err := orm.RunMigration(conn, logger, "add-owner-uid-tables", func(tx *sql.Tx) error {
223 _, mErr := tx.ExecContext(ctx, `
224 CREATE TABLE IF NOT EXISTS owner_uid_assignments (
225 owner_did TEXT PRIMARY KEY,
226 uid INTEGER NOT NULL UNIQUE,
227 created_at DATETIME DEFAULT CURRENT_TIMESTAMP
228 );
229
230 CREATE TABLE IF NOT EXISTS uid_counter (
231 next_uid INTEGER NOT NULL DEFAULT 100000
232 );
233 `)
234 if mErr != nil {
235 return mErr
236 }
237 // Seed the counter only if the table is empty.
238 _, mErr = tx.ExecContext(ctx, `
239 INSERT INTO uid_counter (next_uid)
240 SELECT 100000 WHERE NOT EXISTS (SELECT 1 FROM uid_counter)
241 `)
242 return mErr
243 }); err != nil {
244 return nil, err
245 }
246
247 return &DB{
248 db: db,
249 logger: logger,
250 }, nil
251}
252
253func (d *DB) StoreRepoKey(repoDid string, signingKey []byte, ownerDid, repoName string) error {
254 return d.storeRepoKeyRow(repoDid, signingKey, ownerDid, repoName, "k256")
255}
256
257func (d *DB) StoreRepoDidWeb(repoDid, ownerDid, repoName string) error {
258 return d.storeRepoKeyRow(repoDid, nil, ownerDid, repoName, "web")
259}
260
261func (d *DB) storeRepoKeyRow(repoDid string, signingKey []byte, ownerDid, repoName, keyType string) error {
262 tx, err := d.db.Begin()
263 if err != nil {
264 return err
265 }
266 defer tx.Rollback()
267
268 if _, err := tx.Exec(
269 `INSERT INTO repo_keys (repo_did, signing_key, owner_did, repo_name, key_type) VALUES (?, ?, ?, ?, ?)`,
270 repoDid, signingKey, ownerDid, repoName, keyType,
271 ); err != nil {
272 return err
273 }
274
275 if _, err := tx.Exec(
276 `INSERT INTO repo_aliases (owner_did, rkey, repo_did, rev)
277 VALUES (?, ?, ?, '0_' || strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
278 ON CONFLICT(owner_did, rkey) DO NOTHING`,
279 ownerDid, repoName, repoDid,
280 ); err != nil {
281 return err
282 }
283
284 return tx.Commit()
285}
286
287func (d *DB) DeleteRepoKey(repoDid string) error {
288 tx, err := d.db.Begin()
289 if err != nil {
290 return err
291 }
292 defer tx.Rollback()
293
294 if _, err := tx.Exec(`DELETE FROM repo_aliases WHERE repo_did = ?`, repoDid); err != nil {
295 return err
296 }
297
298 if _, err := tx.Exec(`DELETE FROM repo_keys WHERE repo_did = ?`, repoDid); err != nil {
299 return err
300 }
301
302 return tx.Commit()
303}
304
305func (d *DB) RepoDidExists(repoDid string) (bool, error) {
306 var count int
307 err := d.db.QueryRow(`SELECT count(1) FROM repo_keys WHERE repo_did = ?`, repoDid).Scan(&count)
308 return count > 0, err
309}
310
311func (d *DB) GetRepoDid(ownerDid, rkey string) (string, error) {
312 var repoDid string
313 err := d.db.QueryRow(
314 `SELECT repo_did FROM repo_aliases WHERE owner_did = ? AND rkey = ?`,
315 ownerDid, rkey,
316 ).Scan(&repoDid)
317 return repoDid, err
318}
319
320func (d *DB) GetRepoDidByName(ownerDid, repoName string) (string, error) {
321 var repoDid string
322 err := d.db.QueryRow(
323 `SELECT repo_did FROM repo_keys WHERE owner_did = ? AND repo_name = ?`,
324 ownerDid, repoName,
325 ).Scan(&repoDid)
326 return repoDid, err
327}
328
329func (d *DB) GetRepoKeyOwner(repoDid string) (string, string, error) {
330 return GetRepoKeyOwner(d.db, repoDid)
331}
332
333func GetRepoKeyOwner(q DBTX, repoDid string) (ownerDid string, repoName string, err error) {
334 err = q.QueryRow(
335 `SELECT owner_did, rkey FROM repo_aliases
336 WHERE repo_did = ?
337 ORDER BY rev DESC
338 LIMIT 1`,
339 repoDid,
340 ).Scan(&ownerDid, &repoName)
341 if err != nil {
342 return
343 }
344 if ownerDid == "" || repoName == "" {
345 err = fmt.Errorf("repo_aliases row for %s has empty owner_did or rkey", repoDid)
346 return
347 }
348 return
349}
350
351func (d *DB) ResolveRepoDIDOnDisk(scanPath, repoDid string) (repoPath, ownerDid, repoName string, err error) {
352 ownerDid, repoName, err = d.GetRepoKeyOwner(repoDid)
353 if err != nil {
354 return
355 }
356
357 didPath, joinErr := securejoin.SecureJoin(scanPath, repoDid)
358 if joinErr != nil {
359 err = fmt.Errorf("securejoin failed for repo DID path %s: %w", repoDid, joinErr)
360 return
361 }
362
363 if _, statErr := os.Stat(didPath); statErr != nil {
364 err = fmt.Errorf("repo DID directory not found on disk: %s", didPath)
365 return
366 }
367
368 repoPath = didPath
369 return
370}