Monorepo for Tangled
tangled.org
1package db
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "log/slog"
8 "os"
9 "strings"
10
11 securejoin "github.com/cyphar/filepath-securejoin"
12 _ "github.com/mattn/go-sqlite3"
13 "tangled.org/core/log"
14 "tangled.org/core/orm"
15)
16
// DB bundles the SQLite connection pool with the logger used by this
// package. Values are created by Setup.
type DB struct {
	// db is the connection pool returned by sql.Open in Setup.
	db *sql.DB
	// logger is the sub-logger (scoped "db") derived from the context
	// passed to Setup.
	logger *slog.Logger
}
21
// Querier is the minimal query surface needed by the package-level helpers
// in this file (see GetRepoKeyOwner). The method set mirrors the
// identically named methods on *sql.DB, which is what DB passes in.
type Querier interface {
	// QueryRow runs a query that is expected to yield at most one row.
	QueryRow(query string, args ...any) *sql.Row
	// Exec runs a statement that does not return rows.
	Exec(query string, args ...any) (sql.Result, error)
}
26
27func Setup(ctx context.Context, dbPath string) (*DB, error) {
28 // https://github.com/mattn/go-sqlite3#connection-string
29 opts := []string{
30 "_foreign_keys=1",
31 "_journal_mode=WAL",
32 "_synchronous=NORMAL",
33 "_auto_vacuum=incremental",
34 "_busy_timeout=5000",
35 }
36
37 logger := log.FromContext(ctx)
38 logger = log.SubLogger(logger, "db")
39
40 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&"))
41 if err != nil {
42 return nil, err
43 }
44
45 conn, err := db.Conn(ctx)
46 if err != nil {
47 return nil, err
48 }
49 defer conn.Close()
50
51 _, err = conn.ExecContext(ctx, `
52 create table if not exists known_dids (
53 did text primary key
54 );
55
56 create table if not exists public_keys (
57 id integer primary key autoincrement,
58 did text not null,
59 key text not null,
60 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
61 unique(did, key),
62 foreign key (did) references known_dids(did) on delete cascade
63 );
64
65 create table if not exists _jetstream (
66 id integer primary key autoincrement,
67 last_time_us integer not null
68 );
69
70 create table if not exists events (
71 rkey text not null,
72 nsid text not null,
73 event text not null, -- json
74 created integer not null default (strftime('%s', 'now')),
75 primary key (rkey, nsid)
76 );
77
78 create table if not exists repo_keys (
79 repo_did text primary key,
80 signing_key blob not null,
81 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
82 );
83
84 create table if not exists migrations (
85 id integer primary key autoincrement,
86 name text unique
87 );
88 `)
89 if err != nil {
90 return nil, err
91 }
92
93 if err := orm.RunMigration(conn, logger, "add-owner-did-to-repo-keys", func(tx *sql.Tx) error {
94 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN owner_did TEXT`)
95 return mErr
96 }); err != nil {
97 return nil, err
98 }
99
100 if err := orm.RunMigration(conn, logger, "add-repo-name-to-repo-keys", func(tx *sql.Tx) error {
101 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN repo_name TEXT`)
102 return mErr
103 }); err != nil {
104 return nil, err
105 }
106
107 if err := orm.RunMigration(conn, logger, "add-unique-owner-repo-on-repo-keys", func(tx *sql.Tx) error {
108 _, mErr := tx.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_repo_keys_owner_repo ON repo_keys(owner_did, repo_name)`)
109 return mErr
110 }); err != nil {
111 return nil, err
112 }
113
114 if err := orm.RunMigration(conn, logger, "add-key-type-and-nullable-signing-key", func(tx *sql.Tx) error {
115 _, mErr := tx.ExecContext(ctx, `
116 create table repo_keys_new (
117 repo_did text primary key,
118 signing_key blob,
119 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
120 owner_did text,
121 repo_name text,
122 at_uri text,
123 key_type text not null default 'k256'
124 );
125 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type)
126 select repo_did, signing_key, created_at, owner_did, repo_name, 'k256'
127 from repo_keys;
128 drop table repo_keys;
129 alter table repo_keys_new rename to repo_keys;
130 create unique index if not exists idx_repo_keys_owner_repo
131 on repo_keys(owner_did, repo_name);
132 `)
133 return mErr
134 }); err != nil {
135 return nil, err
136 }
137
138 if err := orm.RunMigration(conn, logger, "add-repo-aliases", func(tx *sql.Tx) error {
139 _, mErr := tx.ExecContext(ctx, `
140 create table if not exists repo_aliases (
141 owner_did text not null,
142 rkey text not null,
143 repo_did text not null,
144 rev text not null,
145 primary key (owner_did, rkey)
146 );
147 create index if not exists idx_repo_aliases_repo_did on repo_aliases(repo_did);
148
149 insert or ignore into repo_aliases (owner_did, rkey, repo_did, rev)
150 select owner_did, repo_name, repo_did, '1_' || created_at
151 from repo_keys
152 where owner_did is not null and repo_name is not null and repo_did is not null;
153 `)
154 return mErr
155 }); err != nil {
156 return nil, err
157 }
158
159 if err := orm.RunMigration(conn, logger, "drop-at-uri-from-repo-keys", func(tx *sql.Tx) error {
160 _, mErr := tx.ExecContext(ctx, `
161 create table repo_keys_new (
162 repo_did text primary key,
163 signing_key blob,
164 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
165 owner_did text,
166 repo_name text,
167 key_type text not null default 'k256'
168 );
169 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type)
170 select repo_did, signing_key, created_at, owner_did, repo_name, key_type
171 from repo_keys;
172 drop table repo_keys;
173 alter table repo_keys_new rename to repo_keys;
174 create unique index if not exists idx_repo_keys_owner_repo
175 on repo_keys(owner_did, repo_name);
176 `)
177 return mErr
178 }); err != nil {
179 return nil, err
180 }
181
182 return &DB{
183 db: db,
184 logger: logger,
185 }, nil
186}
187
188func (d *DB) StoreRepoKey(repoDid string, signingKey []byte, ownerDid, repoName string) error {
189 return d.storeRepoKeyRow(repoDid, signingKey, ownerDid, repoName, "k256")
190}
191
192func (d *DB) StoreRepoDidWeb(repoDid, ownerDid, repoName string) error {
193 return d.storeRepoKeyRow(repoDid, nil, ownerDid, repoName, "web")
194}
195
196func (d *DB) storeRepoKeyRow(repoDid string, signingKey []byte, ownerDid, repoName, keyType string) (err error) {
197 tx, err := d.db.Begin()
198 if err != nil {
199 return err
200 }
201 defer func() {
202 if err != nil {
203 tx.Rollback()
204 return
205 }
206 err = tx.Commit()
207 }()
208
209 if _, err = tx.Exec(
210 `INSERT INTO repo_keys (repo_did, signing_key, owner_did, repo_name, key_type) VALUES (?, ?, ?, ?, ?)`,
211 repoDid, signingKey, ownerDid, repoName, keyType,
212 ); err != nil {
213 return err
214 }
215
216 _, err = tx.Exec(
217 `INSERT INTO repo_aliases (owner_did, rkey, repo_did, rev)
218 VALUES (?, ?, ?, '0_' || strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
219 ON CONFLICT(owner_did, rkey) DO NOTHING`,
220 ownerDid, repoName, repoDid,
221 )
222 return err
223}
224
225func (d *DB) DeleteRepoKey(repoDid string) error {
226 _, err := d.db.Exec(`DELETE FROM repo_keys WHERE repo_did = ?`, repoDid)
227 return err
228}
229
230func (d *DB) RepoDidExists(repoDid string) (bool, error) {
231 var count int
232 err := d.db.QueryRow(`SELECT count(1) FROM repo_keys WHERE repo_did = ?`, repoDid).Scan(&count)
233 return count > 0, err
234}
235
236func (d *DB) GetRepoDid(ownerDid, rkey string) (string, error) {
237 var repoDid string
238 err := d.db.QueryRow(
239 `SELECT repo_did FROM repo_aliases WHERE owner_did = ? AND rkey = ?`,
240 ownerDid, rkey,
241 ).Scan(&repoDid)
242 return repoDid, err
243}
244
245func (d *DB) GetRepoKeyOwner(repoDid string) (string, string, error) {
246 return GetRepoKeyOwner(d.db, repoDid)
247}
248
249func GetRepoKeyOwner(q Querier, repoDid string) (ownerDid string, repoName string, err error) {
250 err = q.QueryRow(
251 `SELECT owner_did, rkey FROM repo_aliases
252 WHERE repo_did = ?
253 ORDER BY rev DESC
254 LIMIT 1`,
255 repoDid,
256 ).Scan(&ownerDid, &repoName)
257 if err != nil {
258 return
259 }
260 if ownerDid == "" || repoName == "" {
261 err = fmt.Errorf("repo_aliases row for %s has empty owner_did or rkey", repoDid)
262 return
263 }
264 return
265}
266
267func (d *DB) ResolveRepoDIDOnDisk(scanPath, repoDid string) (repoPath, ownerDid, repoName string, err error) {
268 ownerDid, repoName, err = d.GetRepoKeyOwner(repoDid)
269 if err != nil {
270 return
271 }
272
273 didPath, joinErr := securejoin.SecureJoin(scanPath, repoDid)
274 if joinErr != nil {
275 err = fmt.Errorf("securejoin failed for repo DID path %s: %w", repoDid, joinErr)
276 return
277 }
278
279 if _, statErr := os.Stat(didPath); statErr != nil {
280 err = fmt.Errorf("repo DID directory not found on disk: %s", didPath)
281 return
282 }
283
284 repoPath = didPath
285 return
286}