Monorepo for Tangled
tangled.org
1package db
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "log/slog"
8 "os"
9 "strings"
10
11 securejoin "github.com/cyphar/filepath-securejoin"
12 _ "github.com/mattn/go-sqlite3"
13 "tangled.org/core/log"
14 "tangled.org/core/orm"
15)
16
// DB bundles the SQLite connection pool with the logger used by the
// database layer. Values are created by Setup.
type DB struct {
	// db is the underlying database/sql handle every method delegates to.
	db *sql.DB
	// logger is the "db" sub-logger derived from the context given to Setup.
	logger *slog.Logger
}
21
// DBTX is the minimal query surface shared by *sql.DB, *sql.Tx and *DB.
// Helpers that accept a DBTX can run either directly against the database
// or inside a transaction.
type DBTX interface {
	// QueryRow runs a query that is expected to return at most one row.
	QueryRow(query string, args ...any) *sql.Row
	// Exec runs a statement that does not return rows.
	Exec(query string, args ...any) (sql.Result, error)
}
26
27func (d *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) {
28 return d.db.BeginTx(ctx, opts)
29}
30
31func (d *DB) Exec(query string, args ...any) (sql.Result, error) {
32 return d.db.Exec(query, args...)
33}
34
35func (d *DB) QueryRow(query string, args ...any) *sql.Row {
36 return d.db.QueryRow(query, args...)
37}
38
39func Setup(ctx context.Context, dbPath string) (*DB, error) {
40 // https://github.com/mattn/go-sqlite3#connection-string
41 opts := []string{
42 "_foreign_keys=1",
43 "_journal_mode=WAL",
44 "_synchronous=NORMAL",
45 "_auto_vacuum=incremental",
46 "_busy_timeout=5000",
47 }
48
49 logger := log.FromContext(ctx)
50 logger = log.SubLogger(logger, "db")
51
52 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&"))
53 if err != nil {
54 return nil, err
55 }
56
57 conn, err := db.Conn(ctx)
58 if err != nil {
59 return nil, err
60 }
61 defer conn.Close()
62
63 _, err = conn.ExecContext(ctx, `
64 create table if not exists known_dids (
65 did text primary key
66 );
67
68 create table if not exists public_keys (
69 id integer primary key autoincrement,
70 did text not null,
71 key text not null,
72 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
73 unique(did, key),
74 foreign key (did) references known_dids(did) on delete cascade
75 );
76
77 create table if not exists _jetstream (
78 id integer primary key autoincrement,
79 last_time_us integer not null
80 );
81
82 create table if not exists events (
83 rkey text not null,
84 nsid text not null,
85 event text not null, -- json
86 created integer not null default (strftime('%s', 'now')),
87 primary key (rkey, nsid)
88 );
89
90 create table if not exists repo_keys (
91 repo_did text primary key,
92 signing_key blob not null,
93 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
94 );
95
96 create table if not exists migrations (
97 id integer primary key autoincrement,
98 name text unique
99 );
100 `)
101 if err != nil {
102 return nil, err
103 }
104
105 if err := orm.RunMigration(conn, logger, "add-owner-did-to-repo-keys", func(tx *sql.Tx) error {
106 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN owner_did TEXT`)
107 return mErr
108 }); err != nil {
109 return nil, err
110 }
111
112 if err := orm.RunMigration(conn, logger, "add-repo-name-to-repo-keys", func(tx *sql.Tx) error {
113 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN repo_name TEXT`)
114 return mErr
115 }); err != nil {
116 return nil, err
117 }
118
119 if err := orm.RunMigration(conn, logger, "add-unique-owner-repo-on-repo-keys", func(tx *sql.Tx) error {
120 _, mErr := tx.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_repo_keys_owner_repo ON repo_keys(owner_did, repo_name)`)
121 return mErr
122 }); err != nil {
123 return nil, err
124 }
125
126 if err := orm.RunMigration(conn, logger, "add-key-type-and-nullable-signing-key", func(tx *sql.Tx) error {
127 _, mErr := tx.ExecContext(ctx, `
128 create table repo_keys_new (
129 repo_did text primary key,
130 signing_key blob,
131 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
132 owner_did text,
133 repo_name text,
134 at_uri text,
135 key_type text not null default 'k256'
136 );
137 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type)
138 select repo_did, signing_key, created_at, owner_did, repo_name, 'k256'
139 from repo_keys;
140 drop table repo_keys;
141 alter table repo_keys_new rename to repo_keys;
142 create unique index if not exists idx_repo_keys_owner_repo
143 on repo_keys(owner_did, repo_name);
144 `)
145 return mErr
146 }); err != nil {
147 return nil, err
148 }
149
150 if err := orm.RunMigration(conn, logger, "add-repo-aliases", func(tx *sql.Tx) error {
151 _, mErr := tx.ExecContext(ctx, `
152 create table if not exists repo_aliases (
153 owner_did text not null,
154 rkey text not null,
155 repo_did text not null,
156 rev text not null,
157 primary key (owner_did, rkey)
158 );
159 create index if not exists idx_repo_aliases_repo_did on repo_aliases(repo_did);
160
161 insert or ignore into repo_aliases (owner_did, rkey, repo_did, rev)
162 select owner_did, repo_name, repo_did, '1_' || created_at
163 from repo_keys
164 where owner_did is not null and repo_name is not null and repo_did is not null;
165 `)
166 return mErr
167 }); err != nil {
168 return nil, err
169 }
170
171 if err := orm.RunMigration(conn, logger, "drop-at-uri-from-repo-keys", func(tx *sql.Tx) error {
172 _, mErr := tx.ExecContext(ctx, `
173 create table repo_keys_new (
174 repo_did text primary key,
175 signing_key blob,
176 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
177 owner_did text,
178 repo_name text,
179 key_type text not null default 'k256'
180 );
181 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type)
182 select repo_did, signing_key, created_at, owner_did, repo_name, key_type
183 from repo_keys;
184 drop table repo_keys;
185 alter table repo_keys_new rename to repo_keys;
186 create unique index if not exists idx_repo_keys_owner_repo
187 on repo_keys(owner_did, repo_name);
188 `)
189 return mErr
190 }); err != nil {
191 return nil, err
192 }
193
194 if err := orm.RunMigration(conn, logger, "create-knot-members", func(tx *sql.Tx) error {
195 _, mErr := tx.ExecContext(ctx, `
196 create table if not exists knot_members (
197 id integer primary key autoincrement,
198 did text not null,
199 rkey text not null,
200 subject text not null,
201 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
202 unique (did, rkey)
203 );
204 `)
205 return mErr
206 }); err != nil {
207 return nil, err
208 }
209
210 return &DB{
211 db: db,
212 logger: logger,
213 }, nil
214}
215
216func (d *DB) StoreRepoKey(repoDid string, signingKey []byte, ownerDid, repoName string) error {
217 return d.storeRepoKeyRow(repoDid, signingKey, ownerDid, repoName, "k256")
218}
219
220func (d *DB) StoreRepoDidWeb(repoDid, ownerDid, repoName string) error {
221 return d.storeRepoKeyRow(repoDid, nil, ownerDid, repoName, "web")
222}
223
224func (d *DB) storeRepoKeyRow(repoDid string, signingKey []byte, ownerDid, repoName, keyType string) error {
225 tx, err := d.db.Begin()
226 if err != nil {
227 return err
228 }
229 defer tx.Rollback()
230
231 if _, err := tx.Exec(
232 `INSERT INTO repo_keys (repo_did, signing_key, owner_did, repo_name, key_type) VALUES (?, ?, ?, ?, ?)`,
233 repoDid, signingKey, ownerDid, repoName, keyType,
234 ); err != nil {
235 return err
236 }
237
238 if _, err := tx.Exec(
239 `INSERT INTO repo_aliases (owner_did, rkey, repo_did, rev)
240 VALUES (?, ?, ?, '0_' || strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
241 ON CONFLICT(owner_did, rkey) DO NOTHING`,
242 ownerDid, repoName, repoDid,
243 ); err != nil {
244 return err
245 }
246
247 return tx.Commit()
248}
249
250func (d *DB) DeleteRepoKey(repoDid string) error {
251 tx, err := d.db.Begin()
252 if err != nil {
253 return err
254 }
255 defer tx.Rollback()
256
257 if _, err := tx.Exec(`DELETE FROM repo_aliases WHERE repo_did = ?`, repoDid); err != nil {
258 return err
259 }
260
261 if _, err := tx.Exec(`DELETE FROM repo_keys WHERE repo_did = ?`, repoDid); err != nil {
262 return err
263 }
264
265 return tx.Commit()
266}
267
268func (d *DB) RepoDidExists(repoDid string) (bool, error) {
269 var count int
270 err := d.db.QueryRow(`SELECT count(1) FROM repo_keys WHERE repo_did = ?`, repoDid).Scan(&count)
271 return count > 0, err
272}
273
274func (d *DB) GetRepoDid(ownerDid, rkey string) (string, error) {
275 var repoDid string
276 err := d.db.QueryRow(
277 `SELECT repo_did FROM repo_aliases WHERE owner_did = ? AND rkey = ?`,
278 ownerDid, rkey,
279 ).Scan(&repoDid)
280 return repoDid, err
281}
282
283func (d *DB) GetRepoDidByName(ownerDid, repoName string) (string, error) {
284 var repoDid string
285 err := d.db.QueryRow(
286 `SELECT repo_did FROM repo_keys WHERE owner_did = ? AND repo_name = ?`,
287 ownerDid, repoName,
288 ).Scan(&repoDid)
289 return repoDid, err
290}
291
292func (d *DB) GetRepoKeyOwner(repoDid string) (string, string, error) {
293 return GetRepoKeyOwner(d.db, repoDid)
294}
295
296func GetRepoKeyOwner(q DBTX, repoDid string) (ownerDid string, repoName string, err error) {
297 err = q.QueryRow(
298 `SELECT owner_did, rkey FROM repo_aliases
299 WHERE repo_did = ?
300 ORDER BY rev DESC
301 LIMIT 1`,
302 repoDid,
303 ).Scan(&ownerDid, &repoName)
304 if err != nil {
305 return
306 }
307 if ownerDid == "" || repoName == "" {
308 err = fmt.Errorf("repo_aliases row for %s has empty owner_did or rkey", repoDid)
309 return
310 }
311 return
312}
313
314func (d *DB) ResolveRepoDIDOnDisk(scanPath, repoDid string) (repoPath, ownerDid, repoName string, err error) {
315 ownerDid, repoName, err = d.GetRepoKeyOwner(repoDid)
316 if err != nil {
317 return
318 }
319
320 didPath, joinErr := securejoin.SecureJoin(scanPath, repoDid)
321 if joinErr != nil {
322 err = fmt.Errorf("securejoin failed for repo DID path %s: %w", repoDid, joinErr)
323 return
324 }
325
326 if _, statErr := os.Stat(didPath); statErr != nil {
327 err = fmt.Errorf("repo DID directory not found on disk: %s", didPath)
328 return
329 }
330
331 repoPath = didPath
332 return
333}