Monorepo for Tangled
tangled.org
1package db
2
3import (
4 "context"
5 "database/sql"
6 "log/slog"
7 "slices"
8 "strings"
9
10 _ "github.com/mattn/go-sqlite3"
11 "tangled.org/core/log"
12 "tangled.org/core/orm"
13)
14
// DB is the package's database handle. It embeds *sql.DB, so every method of
// the standard handle (Exec, QueryRow, Close, ...) is promoted and can be
// called directly on a *DB.
type DB struct {
	*sql.DB // embedded handle; all of its methods are promoted onto DB
}
18
// DBTX is the small query surface needed by code that should work against
// either a plain database handle or a transaction. Both *sql.DB and *sql.Tx
// provide these two methods.
type DBTX interface {
	// QueryRow runs a query that is expected to yield at most one row.
	QueryRow(query string, args ...any) *sql.Row

	// Exec runs a statement that does not return rows.
	Exec(query string, args ...any) (sql.Result, error)
}
23
24func Make(ctx context.Context, dbPath string) (*DB, error) {
25 // https://github.com/mattn/go-sqlite3#connection-string
26 opts := []string{
27 "_foreign_keys=1",
28 "_journal_mode=WAL",
29 "_synchronous=NORMAL",
30 "_auto_vacuum=incremental",
31 "_busy_timeout=5000",
32 }
33
34 logger := log.FromContext(ctx)
35 logger = log.SubLogger(logger, "db")
36
37 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&"))
38 if err != nil {
39 return nil, err
40 }
41
42 conn, err := db.Conn(ctx)
43 if err != nil {
44 return nil, err
45 }
46 defer conn.Close()
47
48 _, err = conn.ExecContext(ctx, `
49 create table if not exists _jetstream (
50 id integer primary key autoincrement,
51 last_time_us integer not null
52 );
53
54 create table if not exists known_dids (
55 did text primary key
56 );
57
58 create table if not exists repos (
59 id integer primary key autoincrement,
60 knot text not null,
61 owner text not null,
62 rkey text not null,
63 repo_did text,
64 created_at text,
65 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
66
67 unique(owner, rkey)
68 );
69
70 create table if not exists repo_collaborators (
71 id integer primary key autoincrement,
72 owner_did text not null,
73 rkey text not null,
74 subject text not null,
75 repo_did text not null,
76 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
77
78 unique(owner_did, rkey)
79 );
80
81 create table if not exists spindle_members (
82 -- identifiers for the record
83 id integer primary key autoincrement,
84 did text not null,
85 rkey text not null,
86
87 -- data
88 instance text not null,
89 subject text not null,
90 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
91
92 -- constraints
93 unique (did, rkey)
94 );
95
96 -- status event for a single workflow
97 create table if not exists events (
98 rkey text not null,
99 nsid text not null,
100 event text not null, -- json
101 created integer not null -- unix nanos
102 );
103
104 create table if not exists migrations (
105 id integer primary key autoincrement,
106 name text unique
107 );
108 `)
109 if err != nil {
110 return nil, err
111 }
112
113 if err := runMigrations(ctx, conn, logger); err != nil {
114 return nil, err
115 }
116
117 return &DB{db}, nil
118}
119
120func runMigrations(_ context.Context, conn *sql.Conn, logger *slog.Logger) error {
121 if err := orm.RunMigration(conn, logger, "repos-to-repo-did", func(tx *sql.Tx) error {
122 var hasName int
123 if err := tx.QueryRow(
124 `select count(*) from pragma_table_info('repos') where name = 'name'`,
125 ).Scan(&hasName); err != nil {
126 return err
127 }
128
129 if hasName > 0 {
130 var totalRows, copiedRows int
131 if err := tx.QueryRow(`select count(*) from repos`).Scan(&totalRows); err != nil {
132 return err
133 }
134 if err := tx.QueryRow(`select count(*) from repos where coalesce(name, '') <> ''`).Scan(&copiedRows); err != nil {
135 return err
136 }
137 if dropped := totalRows - copiedRows; dropped > 0 {
138 logger.Warn("dropping repo rows with empty name during migration", "dropped", dropped, "kept", copiedRows)
139 }
140
141 if _, err := tx.Exec(`
142 create table if not exists repos_new (
143 id integer primary key autoincrement,
144 knot text not null,
145 owner text not null,
146 rkey text not null,
147 repo_did text,
148 created_at text,
149 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
150
151 unique(owner, rkey)
152 );
153
154 insert into repos_new (id, knot, owner, rkey, addedAt)
155 select id, knot, owner, name, addedAt from repos where coalesce(name, '') <> '';
156
157 drop table repos;
158 alter table repos_new rename to repos;
159 `); err != nil {
160 return err
161 }
162 }
163
164 _, err := tx.Exec(`
165 create index if not exists idx_repos_repo_did on repos(repo_did);
166 create index if not exists idx_repos_owner_repo_did on repos(owner, repo_did);
167 create index if not exists idx_repo_collaborators_repo_did
168 on repo_collaborators(repo_did);
169 `)
170 return err
171 }); err != nil {
172 return err
173 }
174
175 return orm.RunMigration(conn, logger, "spindle-members-unique-on-rkey", func(tx *sql.Tx) error {
176 hasTarget, err := hasUniqueIndex(tx, "spindle_members", []string{"did", "rkey"})
177 if err != nil {
178 return err
179 }
180 if hasTarget {
181 return nil
182 }
183
184 var totalRows, distinctRows int
185 if err := tx.QueryRow(`select count(*) from spindle_members`).Scan(&totalRows); err != nil {
186 return err
187 }
188 if err := tx.QueryRow(`select count(*) from (select 1 from spindle_members group by did, rkey)`).Scan(&distinctRows); err != nil {
189 return err
190 }
191 if dropped := totalRows - distinctRows; dropped > 0 {
192 logger.Warn("dropping duplicate (did, rkey) rows during spindle_members rebuild", "dropped", dropped, "kept", distinctRows)
193 }
194
195 _, err = tx.Exec(`
196 create table spindle_members_new (
197 id integer primary key autoincrement,
198 did text not null,
199 rkey text not null,
200 instance text not null,
201 subject text not null,
202 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
203 unique (did, rkey)
204 );
205
206 insert into spindle_members_new (id, did, rkey, instance, subject, created)
207 select id, did, rkey, instance, subject, created
208 from spindle_members sm
209 where id = (
210 select max(id) from spindle_members
211 where did = sm.did and rkey = sm.rkey
212 );
213
214 drop table spindle_members;
215 alter table spindle_members_new rename to spindle_members;
216 `)
217 return err
218 })
219}
220
// hasUniqueIndex reports whether table has a unique index whose set of
// columns is exactly cols. Column order is ignored: both the wanted and the
// actual column lists are sorted before they are compared. cols itself is
// not modified.
//
// Any query, scan or row-iteration error is returned with a false result.
func hasUniqueIndex(tx *sql.Tx, table string, cols []string) (bool, error) {
	rows, err := tx.Query(
		`select name from pragma_index_list(?) where "unique" = 1`,
		table,
	)
	if err != nil {
		return false, err
	}
	defer rows.Close()

	// Collect the names first so the per-index queries below do not run
	// while this result set is still being iterated.
	var indexNames []string
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return false, err
		}
		indexNames = append(indexNames, name)
	}
	if err := rows.Err(); err != nil {
		return false, err
	}

	// Sort a copy so the caller's slice keeps its order.
	wantSorted := slices.Clone(cols)
	slices.Sort(wantSorted)

	for _, name := range indexNames {
		got, err := indexColumnNames(tx, name)
		if err != nil {
			return false, err
		}
		slices.Sort(got)
		if slices.Equal(got, wantSorted) {
			return true, nil
		}
	}
	return false, nil
}

// indexColumnNames returns the column names of the given index in seqno
// order. An error raised while iterating the rows is reported rather than
// being mistaken for the end of the column list.
func indexColumnNames(tx *sql.Tx, index string) ([]string, error) {
	rows, err := tx.Query(
		`select name from pragma_index_info(?) order by seqno`,
		index,
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var names []string
	for rows.Next() {
		var c string
		if err := rows.Scan(&c); err != nil {
			return nil, err
		}
		names = append(names, c)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return names, nil
}
271
272func (d *DB) SaveLastTimeUs(lastTimeUs int64) error {
273 _, err := d.Exec(`
274 insert into _jetstream (id, last_time_us)
275 values (1, ?)
276 on conflict(id) do update set last_time_us = excluded.last_time_us
277 `, lastTimeUs)
278 return err
279}
280
281func (d *DB) GetLastTimeUs() (int64, error) {
282 var lastTimeUs int64
283 row := d.QueryRow(`select last_time_us from _jetstream where id = 1;`)
284 err := row.Scan(&lastTimeUs)
285 return lastTimeUs, err
286}