Monorepo for Tangled
tangled.org
1package db
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "log/slog"
8 "os"
9 "strings"
10
11 securejoin "github.com/cyphar/filepath-securejoin"
12 _ "github.com/mattn/go-sqlite3"
13 "tangled.org/core/log"
14 "tangled.org/core/orm"
15)
16
17type DB struct {
18 db *sql.DB
19 logger *slog.Logger
20}
21
22type Querier interface {
23 QueryRow(query string, args ...any) *sql.Row
24 Exec(query string, args ...any) (sql.Result, error)
25}
26
27func Setup(ctx context.Context, dbPath string) (*DB, error) {
28 // https://github.com/mattn/go-sqlite3#connection-string
29 opts := []string{
30 "_foreign_keys=1",
31 "_journal_mode=WAL",
32 "_synchronous=NORMAL",
33 "_auto_vacuum=incremental",
34 "_busy_timeout=5000",
35 }
36
37 logger := log.FromContext(ctx)
38 logger = log.SubLogger(logger, "db")
39
40 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&"))
41 if err != nil {
42 return nil, err
43 }
44
45 conn, err := db.Conn(ctx)
46 if err != nil {
47 return nil, err
48 }
49 defer conn.Close()
50
51 _, err = conn.ExecContext(ctx, `
52 create table if not exists known_dids (
53 did text primary key
54 );
55
56 create table if not exists public_keys (
57 id integer primary key autoincrement,
58 did text not null,
59 key text not null,
60 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
61 unique(did, key),
62 foreign key (did) references known_dids(did) on delete cascade
63 );
64
65 create table if not exists _jetstream (
66 id integer primary key autoincrement,
67 last_time_us integer not null
68 );
69
70 create table if not exists events (
71 rkey text not null,
72 nsid text not null,
73 event text not null, -- json
74 created integer not null default (strftime('%s', 'now')),
75 primary key (rkey, nsid)
76 );
77
78 create table if not exists repo_keys (
79 repo_did text primary key,
80 signing_key blob not null,
81 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
82 );
83
84 create table if not exists migrations (
85 id integer primary key autoincrement,
86 name text unique
87 );
88 `)
89 if err != nil {
90 return nil, err
91 }
92
93 if err := orm.RunMigration(conn, logger, "add-owner-did-to-repo-keys", func(tx *sql.Tx) error {
94 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN owner_did TEXT`)
95 return mErr
96 }); err != nil {
97 return nil, err
98 }
99
100 if err := orm.RunMigration(conn, logger, "add-repo-name-to-repo-keys", func(tx *sql.Tx) error {
101 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN repo_name TEXT`)
102 return mErr
103 }); err != nil {
104 return nil, err
105 }
106
107 if err := orm.RunMigration(conn, logger, "add-unique-owner-repo-on-repo-keys", func(tx *sql.Tx) error {
108 _, mErr := tx.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_repo_keys_owner_repo ON repo_keys(owner_did, repo_name)`)
109 return mErr
110 }); err != nil {
111 return nil, err
112 }
113
114 if err := orm.RunMigration(conn, logger, "add-key-type-and-nullable-signing-key", func(tx *sql.Tx) error {
115 _, mErr := tx.ExecContext(ctx, `
116 create table repo_keys_new (
117 repo_did text primary key,
118 signing_key blob,
119 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
120 owner_did text,
121 repo_name text,
122 at_uri text,
123 key_type text not null default 'k256'
124 );
125 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type)
126 select repo_did, signing_key, created_at, owner_did, repo_name, 'k256'
127 from repo_keys;
128 drop table repo_keys;
129 alter table repo_keys_new rename to repo_keys;
130 create unique index if not exists idx_repo_keys_owner_repo
131 on repo_keys(owner_did, repo_name);
132 `)
133 return mErr
134 }); err != nil {
135 return nil, err
136 }
137
138 if err := orm.RunMigration(conn, logger, "add-repo-aliases", func(tx *sql.Tx) error {
139 _, mErr := tx.ExecContext(ctx, `
140 create table if not exists repo_aliases (
141 owner_did text not null,
142 rkey text not null,
143 repo_did text not null,
144 rev text not null,
145 primary key (owner_did, rkey)
146 );
147 create index if not exists idx_repo_aliases_repo_did on repo_aliases(repo_did);
148
149 insert or ignore into repo_aliases (owner_did, rkey, repo_did, rev)
150 select owner_did, repo_name, repo_did, '1_' || created_at
151 from repo_keys
152 where owner_did is not null and repo_name is not null and repo_did is not null;
153 `)
154 return mErr
155 }); err != nil {
156 return nil, err
157 }
158
159 if err := orm.RunMigration(conn, logger, "drop-at-uri-from-repo-keys", func(tx *sql.Tx) error {
160 _, mErr := tx.ExecContext(ctx, `
161 create table repo_keys_new (
162 repo_did text primary key,
163 signing_key blob,
164 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
165 owner_did text,
166 repo_name text,
167 key_type text not null default 'k256'
168 );
169 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type)
170 select repo_did, signing_key, created_at, owner_did, repo_name, key_type
171 from repo_keys;
172 drop table repo_keys;
173 alter table repo_keys_new rename to repo_keys;
174 create unique index if not exists idx_repo_keys_owner_repo
175 on repo_keys(owner_did, repo_name);
176 `)
177 return mErr
178 }); err != nil {
179 return nil, err
180 }
181
182 return &DB{
183 db: db,
184 logger: logger,
185 }, nil
186}
187
188func (d *DB) StoreRepoKey(repoDid string, signingKey []byte, ownerDid, repoName string) error {
189 return d.storeRepoKeyRow(repoDid, signingKey, ownerDid, repoName, "k256")
190}
191
192func (d *DB) StoreRepoDidWeb(repoDid, ownerDid, repoName string) error {
193 return d.storeRepoKeyRow(repoDid, nil, ownerDid, repoName, "web")
194}
195
196func (d *DB) storeRepoKeyRow(repoDid string, signingKey []byte, ownerDid, repoName, keyType string) error {
197 tx, err := d.db.Begin()
198 if err != nil {
199 return err
200 }
201 defer tx.Rollback()
202
203 if _, err := tx.Exec(
204 `INSERT INTO repo_keys (repo_did, signing_key, owner_did, repo_name, key_type) VALUES (?, ?, ?, ?, ?)`,
205 repoDid, signingKey, ownerDid, repoName, keyType,
206 ); err != nil {
207 return err
208 }
209
210 if _, err := tx.Exec(
211 `INSERT INTO repo_aliases (owner_did, rkey, repo_did, rev)
212 VALUES (?, ?, ?, '0_' || strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
213 ON CONFLICT(owner_did, rkey) DO NOTHING`,
214 ownerDid, repoName, repoDid,
215 ); err != nil {
216 return err
217 }
218
219 return tx.Commit()
220}
221
222func (d *DB) DeleteRepoKey(repoDid string) error {
223 tx, err := d.db.Begin()
224 if err != nil {
225 return err
226 }
227 defer tx.Rollback()
228
229 if _, err := tx.Exec(`DELETE FROM repo_aliases WHERE repo_did = ?`, repoDid); err != nil {
230 return err
231 }
232
233 if _, err := tx.Exec(`DELETE FROM repo_keys WHERE repo_did = ?`, repoDid); err != nil {
234 return err
235 }
236
237 return tx.Commit()
238}
239
240func (d *DB) RepoDidExists(repoDid string) (bool, error) {
241 var count int
242 err := d.db.QueryRow(`SELECT count(1) FROM repo_keys WHERE repo_did = ?`, repoDid).Scan(&count)
243 return count > 0, err
244}
245
246func (d *DB) GetRepoDid(ownerDid, rkey string) (string, error) {
247 var repoDid string
248 err := d.db.QueryRow(
249 `SELECT repo_did FROM repo_aliases WHERE owner_did = ? AND rkey = ?`,
250 ownerDid, rkey,
251 ).Scan(&repoDid)
252 return repoDid, err
253}
254
255func (d *DB) GetRepoDidByName(ownerDid, repoName string) (string, error) {
256 var repoDid string
257 err := d.db.QueryRow(
258 `SELECT repo_did FROM repo_keys WHERE owner_did = ? AND repo_name = ?`,
259 ownerDid, repoName,
260 ).Scan(&repoDid)
261 return repoDid, err
262}
263
264func (d *DB) GetRepoKeyOwner(repoDid string) (string, string, error) {
265 return GetRepoKeyOwner(d.db, repoDid)
266}
267
268func GetRepoKeyOwner(q Querier, repoDid string) (ownerDid string, repoName string, err error) {
269 err = q.QueryRow(
270 `SELECT owner_did, rkey FROM repo_aliases
271 WHERE repo_did = ?
272 ORDER BY rev DESC
273 LIMIT 1`,
274 repoDid,
275 ).Scan(&ownerDid, &repoName)
276 if err != nil {
277 return
278 }
279 if ownerDid == "" || repoName == "" {
280 err = fmt.Errorf("repo_aliases row for %s has empty owner_did or rkey", repoDid)
281 return
282 }
283 return
284}
285
286func (d *DB) ResolveRepoDIDOnDisk(scanPath, repoDid string) (repoPath, ownerDid, repoName string, err error) {
287 ownerDid, repoName, err = d.GetRepoKeyOwner(repoDid)
288 if err != nil {
289 return
290 }
291
292 didPath, joinErr := securejoin.SecureJoin(scanPath, repoDid)
293 if joinErr != nil {
294 err = fmt.Errorf("securejoin failed for repo DID path %s: %w", repoDid, joinErr)
295 return
296 }
297
298 if _, statErr := os.Stat(didPath); statErr != nil {
299 err = fmt.Errorf("repo DID directory not found on disk: %s", didPath)
300 return
301 }
302
303 repoPath = didPath
304 return
305}