Monorepo for Tangled
tangled.org
1package db
2
3import (
4 "context"
5 "database/sql"
6 "log/slog"
7 "slices"
8 "strings"
9
10 _ "github.com/mattn/go-sqlite3"
11 "tangled.org/core/log"
12 "tangled.org/core/orm"
13)
14
type (
	// DB is the package's database handle. It embeds *sql.DB, so every
	// method of the standard pool (Exec, QueryRow, Close, ...) is promoted
	// and callable directly on a DB value.
	DB struct {
		*sql.DB
	}

	// DBTX is the minimal query surface shared by handles that can run a
	// single-row query or execute a statement. Both *sql.DB and *sql.Tx
	// provide these two methods.
	DBTX interface {
		QueryRow(query string, args ...any) *sql.Row
		Exec(query string, args ...any) (sql.Result, error)
	}
)
23
24func Make(ctx context.Context, dbPath string) (*DB, error) {
25 // https://github.com/mattn/go-sqlite3#connection-string
26 opts := []string{
27 "_foreign_keys=1",
28 "_journal_mode=WAL",
29 "_synchronous=NORMAL",
30 "_auto_vacuum=incremental",
31 "_busy_timeout=5000",
32 }
33
34 logger := log.FromContext(ctx)
35 logger = log.SubLogger(logger, "db")
36
37 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&"))
38 if err != nil {
39 return nil, err
40 }
41
42 conn, err := db.Conn(ctx)
43 if err != nil {
44 return nil, err
45 }
46 defer conn.Close()
47
48 _, err = conn.ExecContext(ctx, `
49 create table if not exists _jetstream (
50 id integer primary key autoincrement,
51 last_time_us integer not null
52 );
53
54 create table if not exists known_dids (
55 did text primary key
56 );
57
58 create table if not exists repos (
59 id integer primary key autoincrement,
60 knot text not null,
61 owner text not null,
62 rkey text not null,
63 repo_did text,
64 created_at text,
65 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
66
67 unique(owner, rkey)
68 );
69
70 create table if not exists repo_collaborators (
71 id integer primary key autoincrement,
72 owner_did text not null,
73 rkey text not null,
74 subject text not null,
75 repo_did text not null,
76 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
77
78 unique(owner_did, rkey)
79 );
80
81 create table if not exists spindle_members (
82 -- identifiers for the record
83 id integer primary key autoincrement,
84 did text not null,
85 rkey text not null,
86
87 -- data
88 instance text not null,
89 subject text not null,
90 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
91
92 -- constraints
93 unique (did, rkey)
94 );
95
96 -- status event for a single workflow
97 create table if not exists events (
98 rkey text not null,
99 nsid text not null,
100 event text not null, -- json
101 created integer not null -- unix nanos
102 );
103
104 create table if not exists nixos_toplevel_cache (
105 config_key text primary key,
106 toplevel text not null,
107 updated_at text not null
108 );
109
110 create table if not exists migrations (
111 id integer primary key autoincrement,
112 name text unique
113 );
114 `)
115 if err != nil {
116 return nil, err
117 }
118
119 if err := runMigrations(ctx, conn, logger); err != nil {
120 return nil, err
121 }
122
123 return &DB{db}, nil
124}
125
126func runMigrations(_ context.Context, conn *sql.Conn, logger *slog.Logger) error {
127 if err := orm.RunMigration(conn, logger, "repos-to-repo-did", func(tx *sql.Tx) error {
128 var hasName int
129 if err := tx.QueryRow(
130 `select count(*) from pragma_table_info('repos') where name = 'name'`,
131 ).Scan(&hasName); err != nil {
132 return err
133 }
134
135 if hasName > 0 {
136 var totalRows, copiedRows int
137 if err := tx.QueryRow(`select count(*) from repos`).Scan(&totalRows); err != nil {
138 return err
139 }
140 if err := tx.QueryRow(`select count(*) from repos where coalesce(name, '') <> ''`).Scan(&copiedRows); err != nil {
141 return err
142 }
143 if dropped := totalRows - copiedRows; dropped > 0 {
144 logger.Warn("dropping repo rows with empty name during migration", "dropped", dropped, "kept", copiedRows)
145 }
146
147 if _, err := tx.Exec(`
148 create table if not exists repos_new (
149 id integer primary key autoincrement,
150 knot text not null,
151 owner text not null,
152 rkey text not null,
153 repo_did text,
154 created_at text,
155 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
156
157 unique(owner, rkey)
158 );
159
160 insert into repos_new (id, knot, owner, rkey, addedAt)
161 select id, knot, owner, name, addedAt from repos where coalesce(name, '') <> '';
162
163 drop table repos;
164 alter table repos_new rename to repos;
165 `); err != nil {
166 return err
167 }
168 }
169
170 _, err := tx.Exec(`
171 create index if not exists idx_repos_repo_did on repos(repo_did);
172 create index if not exists idx_repos_owner_repo_did on repos(owner, repo_did);
173 create index if not exists idx_repo_collaborators_repo_did
174 on repo_collaborators(repo_did);
175 `)
176 return err
177 }); err != nil {
178 return err
179 }
180
181 return orm.RunMigration(conn, logger, "spindle-members-unique-on-rkey", func(tx *sql.Tx) error {
182 hasTarget, err := hasUniqueIndex(tx, "spindle_members", []string{"did", "rkey"})
183 if err != nil {
184 return err
185 }
186 if hasTarget {
187 return nil
188 }
189
190 var totalRows, distinctRows int
191 if err := tx.QueryRow(`select count(*) from spindle_members`).Scan(&totalRows); err != nil {
192 return err
193 }
194 if err := tx.QueryRow(`select count(*) from (select 1 from spindle_members group by did, rkey)`).Scan(&distinctRows); err != nil {
195 return err
196 }
197 if dropped := totalRows - distinctRows; dropped > 0 {
198 logger.Warn("dropping duplicate (did, rkey) rows during spindle_members rebuild", "dropped", dropped, "kept", distinctRows)
199 }
200
201 _, err = tx.Exec(`
202 create table spindle_members_new (
203 id integer primary key autoincrement,
204 did text not null,
205 rkey text not null,
206 instance text not null,
207 subject text not null,
208 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
209 unique (did, rkey)
210 );
211
212 insert into spindle_members_new (id, did, rkey, instance, subject, created)
213 select id, did, rkey, instance, subject, created
214 from spindle_members sm
215 where id = (
216 select max(id) from spindle_members
217 where did = sm.did and rkey = sm.rkey
218 );
219
220 drop table spindle_members;
221 alter table spindle_members_new rename to spindle_members;
222 `)
223 return err
224 })
225}
226
// hasUniqueIndex reports whether table has a unique index whose columns are
// exactly cols. Column order is ignored: both the wanted and the found column
// lists are sorted before they are compared. cols itself is not modified.
//
// The unique indexes are listed through pragma_index_list and each one's
// columns through pragma_index_info. Any query, scan or row-iteration error
// is returned as-is with a false result.
func hasUniqueIndex(tx *sql.Tx, table string, cols []string) (bool, error) {
	indexNames, err := queryNameColumn(tx,
		`select name from pragma_index_list(?) where "unique" = 1`,
		table,
	)
	if err != nil {
		return false, err
	}

	// Sort a copy so the caller's slice keeps its order.
	wantSorted := slices.Clone(cols)
	slices.Sort(wantSorted)

	for _, name := range indexNames {
		got, err := queryNameColumn(tx,
			`select name from pragma_index_info(?) order by seqno`,
			name,
		)
		if err != nil {
			return false, err
		}
		slices.Sort(got)
		if slices.Equal(got, wantSorted) {
			return true, nil
		}
	}
	return false, nil
}

// queryNameColumn runs a query that takes one string argument and yields one
// text column, and returns every value in result order. It checks rows.Err
// after the loop, so a failure partway through iteration is reported as an
// error instead of being mistaken for a short result set.
func queryNameColumn(tx *sql.Tx, query, arg string) ([]string, error) {
	rows, err := tx.Query(query, arg)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var names []string
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return nil, err
		}
		names = append(names, name)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return names, nil
}
277
278func (d *DB) SaveLastTimeUs(lastTimeUs int64) error {
279 _, err := d.Exec(`
280 insert into _jetstream (id, last_time_us)
281 values (1, ?)
282 on conflict(id) do update set last_time_us = excluded.last_time_us
283 `, lastTimeUs)
284 return err
285}
286
287func (d *DB) GetLastTimeUs() (int64, error) {
288 var lastTimeUs int64
289 row := d.QueryRow(`select last_time_us from _jetstream where id = 1;`)
290 err := row.Scan(&lastTimeUs)
291 return lastTimeUs, err
292}