forked from
tangled.org/core
Monorepo for Tangled
1package db
2
3import (
4 "context"
5 "database/sql"
6 "log/slog"
7 "strings"
8
9 _ "github.com/mattn/go-sqlite3"
10 "tangled.org/core/log"
11 "tangled.org/core/orm"
12)
13
14type DB struct {
15 *sql.DB
16}
17
18func Make(ctx context.Context, dbPath string) (*DB, error) {
19 // https://github.com/mattn/go-sqlite3#connection-string
20 opts := []string{
21 "_foreign_keys=1",
22 "_journal_mode=WAL",
23 "_synchronous=NORMAL",
24 "_auto_vacuum=incremental",
25 "_busy_timeout=5000",
26 }
27
28 logger := log.FromContext(ctx)
29 logger = log.SubLogger(logger, "db")
30
31 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&"))
32 if err != nil {
33 return nil, err
34 }
35
36 conn, err := db.Conn(ctx)
37 if err != nil {
38 return nil, err
39 }
40 defer conn.Close()
41
42 _, err = conn.ExecContext(ctx, `
43 create table if not exists _jetstream (
44 id integer primary key autoincrement,
45 last_time_us integer not null
46 );
47
48 create table if not exists known_dids (
49 did text primary key
50 );
51
52 create table if not exists repos (
53 id integer primary key autoincrement,
54 knot text not null,
55 owner text not null,
56 rkey text not null,
57 repo_did text,
58 created_at text,
59 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
60
61 unique(owner, rkey)
62 );
63
64 create table if not exists repo_collaborators (
65 id integer primary key autoincrement,
66 owner_did text not null,
67 rkey text not null,
68 subject text not null,
69 repo_did text not null,
70 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
71
72 unique(owner_did, rkey)
73 );
74
75 create table if not exists spindle_members (
76 -- identifiers for the record
77 id integer primary key autoincrement,
78 did text not null,
79 rkey text not null,
80
81 -- data
82 instance text not null,
83 subject text not null,
84 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
85
86 -- constraints
87 unique (did, instance, subject)
88 );
89
90 -- status event for a single workflow
91 create table if not exists events (
92 rkey text not null,
93 nsid text not null,
94 event text not null, -- json
95 created integer not null -- unix nanos
96 );
97
98 create table if not exists migrations (
99 id integer primary key autoincrement,
100 name text unique
101 );
102 `)
103 if err != nil {
104 return nil, err
105 }
106
107 if err := runMigrations(ctx, conn, logger); err != nil {
108 return nil, err
109 }
110
111 return &DB{db}, nil
112}
113
114func runMigrations(_ context.Context, conn *sql.Conn, logger *slog.Logger) error {
115 return orm.RunMigration(conn, logger, "repos-to-repo-did", func(tx *sql.Tx) error {
116 var hasName int
117 if err := tx.QueryRow(
118 `select count(*) from pragma_table_info('repos') where name = 'name'`,
119 ).Scan(&hasName); err != nil {
120 return err
121 }
122
123 if hasName > 0 {
124 var totalRows, copiedRows int
125 if err := tx.QueryRow(`select count(*) from repos`).Scan(&totalRows); err != nil {
126 return err
127 }
128 if err := tx.QueryRow(`select count(*) from repos where coalesce(name, '') <> ''`).Scan(&copiedRows); err != nil {
129 return err
130 }
131 if dropped := totalRows - copiedRows; dropped > 0 {
132 logger.Warn("dropping repo rows with empty name during migration", "dropped", dropped, "kept", copiedRows)
133 }
134
135 if _, err := tx.Exec(`
136 create table if not exists repos_new (
137 id integer primary key autoincrement,
138 knot text not null,
139 owner text not null,
140 rkey text not null,
141 repo_did text,
142 created_at text,
143 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
144
145 unique(owner, rkey)
146 );
147
148 insert into repos_new (id, knot, owner, rkey, addedAt)
149 select id, knot, owner, name, addedAt from repos where coalesce(name, '') <> '';
150
151 drop table repos;
152 alter table repos_new rename to repos;
153 `); err != nil {
154 return err
155 }
156 }
157
158 _, err := tx.Exec(`
159 create index if not exists idx_repos_repo_did on repos(repo_did);
160 create index if not exists idx_repos_owner_repo_did on repos(owner, repo_did);
161 create index if not exists idx_repo_collaborators_repo_did
162 on repo_collaborators(repo_did);
163 `)
164 return err
165 })
166}
167
168func (d *DB) SaveLastTimeUs(lastTimeUs int64) error {
169 _, err := d.Exec(`
170 insert into _jetstream (id, last_time_us)
171 values (1, ?)
172 on conflict(id) do update set last_time_us = excluded.last_time_us
173 `, lastTimeUs)
174 return err
175}
176
177func (d *DB) GetLastTimeUs() (int64, error) {
178 var lastTimeUs int64
179 row := d.QueryRow(`select last_time_us from _jetstream where id = 1;`)
180 err := row.Scan(&lastTimeUs)
181 return lastTimeUs, err
182}