Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package db 2 3import ( 4 "context" 5 "database/sql" 6 "log/slog" 7 "slices" 8 "strings" 9 10 _ "github.com/mattn/go-sqlite3" 11 "tangled.org/core/log" 12 "tangled.org/core/orm" 13) 14 15type DB struct { 16 *sql.DB 17} 18 19type DBTX interface { 20 QueryRow(query string, args ...any) *sql.Row 21 Exec(query string, args ...any) (sql.Result, error) 22} 23 24func Make(ctx context.Context, dbPath string) (*DB, error) { 25 // https://github.com/mattn/go-sqlite3#connection-string 26 opts := []string{ 27 "_foreign_keys=1", 28 "_journal_mode=WAL", 29 "_synchronous=NORMAL", 30 "_auto_vacuum=incremental", 31 "_busy_timeout=5000", 32 } 33 34 logger := log.FromContext(ctx) 35 logger = log.SubLogger(logger, "db") 36 37 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 38 if err != nil { 39 return nil, err 40 } 41 42 conn, err := db.Conn(ctx) 43 if err != nil { 44 return nil, err 45 } 46 defer conn.Close() 47 48 _, err = conn.ExecContext(ctx, ` 49 create table if not exists _jetstream ( 50 id integer primary key autoincrement, 51 last_time_us integer not null 52 ); 53 54 create table if not exists known_dids ( 55 did text primary key 56 ); 57 58 create table if not exists repos ( 59 id integer primary key autoincrement, 60 knot text not null, 61 owner text not null, 62 rkey text not null, 63 repo_did text, 64 created_at text, 65 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 66 67 unique(owner, rkey) 68 ); 69 70 create table if not exists repo_collaborators ( 71 id integer primary key autoincrement, 72 owner_did text not null, 73 rkey text not null, 74 subject text not null, 75 repo_did text not null, 76 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 77 78 unique(owner_did, rkey) 79 ); 80 81 create table if not exists spindle_members ( 82 -- identifiers for the record 83 id integer primary key autoincrement, 84 did text not null, 85 rkey text not null, 86 87 -- data 88 instance text not null, 89 subject text not null, 90 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 91 92 -- constraints 93 unique (did, rkey) 94 ); 95 96 -- status event for a single workflow 97 create table if not exists events ( 98 rkey text not null, 99 nsid text not null, 100 event text not null, -- json 101 created integer not null -- unix nanos 102 ); 103 104 create table if not exists nixos_toplevel_cache ( 105 config_key text primary key, 106 toplevel text not null, 107 updated_at text not null 108 ); 109 110 create table if not exists migrations ( 111 id integer primary key autoincrement, 112 name text unique 113 ); 114 `) 115 if err != nil { 116 return nil, err 117 } 118 119 if err := runMigrations(ctx, conn, logger); err != nil { 120 return nil, err 121 } 122 123 return &DB{db}, nil 124} 125 126func runMigrations(_ context.Context, conn *sql.Conn, logger *slog.Logger) error { 127 if err := orm.RunMigration(conn, logger, "repos-to-repo-did", func(tx *sql.Tx) error { 128 var hasName int 129 if err := tx.QueryRow( 130 `select count(*) from pragma_table_info('repos') where name = 'name'`, 131 ).Scan(&hasName); err != nil { 132 return err 133 } 134 135 if hasName > 0 { 136 var totalRows, copiedRows int 137 if err := tx.QueryRow(`select count(*) from repos`).Scan(&totalRows); err != nil { 138 return err 139 } 140 if err := tx.QueryRow(`select count(*) from repos where coalesce(name, '') <> ''`).Scan(&copiedRows); err != nil { 141 return err 142 } 143 if dropped := totalRows - copiedRows; dropped > 0 { 144 logger.Warn("dropping repo rows with empty name during migration", "dropped", dropped, "kept", copiedRows) 145 } 146 147 if _, err := tx.Exec(` 148 create table if not exists repos_new ( 149 id integer primary key autoincrement, 150 knot text not null, 151 owner text not null, 152 rkey text not null, 153 repo_did text, 154 created_at text, 155 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 156 157 unique(owner, rkey) 158 ); 159 160 insert into repos_new (id, knot, owner, rkey, addedAt) 161 select id, knot, owner, name, addedAt from repos where coalesce(name, '') <> ''; 162 163 drop table repos; 164 alter table repos_new rename to repos; 165 `); err != nil { 166 return err 167 } 168 } 169 170 _, err := tx.Exec(` 171 create index if not exists idx_repos_repo_did on repos(repo_did); 172 create index if not exists idx_repos_owner_repo_did on repos(owner, repo_did); 173 create index if not exists idx_repo_collaborators_repo_did 174 on repo_collaborators(repo_did); 175 `) 176 return err 177 }); err != nil { 178 return err 179 } 180 181 return orm.RunMigration(conn, logger, "spindle-members-unique-on-rkey", func(tx *sql.Tx) error { 182 hasTarget, err := hasUniqueIndex(tx, "spindle_members", []string{"did", "rkey"}) 183 if err != nil { 184 return err 185 } 186 if hasTarget { 187 return nil 188 } 189 190 var totalRows, distinctRows int 191 if err := tx.QueryRow(`select count(*) from spindle_members`).Scan(&totalRows); err != nil { 192 return err 193 } 194 if err := tx.QueryRow(`select count(*) from (select 1 from spindle_members group by did, rkey)`).Scan(&distinctRows); err != nil { 195 return err 196 } 197 if dropped := totalRows - distinctRows; dropped > 0 { 198 logger.Warn("dropping duplicate (did, rkey) rows during spindle_members rebuild", "dropped", dropped, "kept", distinctRows) 199 } 200 201 _, err = tx.Exec(` 202 create table spindle_members_new ( 203 id integer primary key autoincrement, 204 did text not null, 205 rkey text not null, 206 instance text not null, 207 subject text not null, 208 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 209 unique (did, rkey) 210 ); 211 212 insert into spindle_members_new (id, did, rkey, instance, subject, created) 213 select id, did, rkey, instance, subject, created 214 from spindle_members sm 215 where id = ( 216 select max(id) from spindle_members 217 where did = sm.did and rkey = sm.rkey 218 ); 219 220 drop table spindle_members; 221 alter table spindle_members_new rename to spindle_members; 222 `) 223 return err 224 }) 225} 226 227func hasUniqueIndex(tx *sql.Tx, table string, cols []string) (bool, error) { 228 rows, err := tx.Query( 229 `select name from pragma_index_list(?) where "unique" = 1`, 230 table, 231 ) 232 if err != nil { 233 return false, err 234 } 235 defer rows.Close() 236 237 var indexNames []string 238 for rows.Next() { 239 var name string 240 if err := rows.Scan(&name); err != nil { 241 return false, err 242 } 243 indexNames = append(indexNames, name) 244 } 245 if err := rows.Err(); err != nil { 246 return false, err 247 } 248 249 wantSorted := slices.Clone(cols) 250 slices.Sort(wantSorted) 251 252 for _, name := range indexNames { 253 colRows, err := tx.Query( 254 `select name from pragma_index_info(?) order by seqno`, 255 name, 256 ) 257 if err != nil { 258 return false, err 259 } 260 var got []string 261 for colRows.Next() { 262 var c string 263 if err := colRows.Scan(&c); err != nil { 264 colRows.Close() 265 return false, err 266 } 267 got = append(got, c) 268 } 269 colRows.Close() 270 slices.Sort(got) 271 if slices.Equal(got, wantSorted) { 272 return true, nil 273 } 274 } 275 return false, nil 276} 277 278func (d *DB) SaveLastTimeUs(lastTimeUs int64) error { 279 _, err := d.Exec(` 280 insert into _jetstream (id, last_time_us) 281 values (1, ?) 282 on conflict(id) do update set last_time_us = excluded.last_time_us 283 `, lastTimeUs) 284 return err 285} 286 287func (d *DB) GetLastTimeUs() (int64, error) { 288 var lastTimeUs int64 289 row := d.QueryRow(`select last_time_us from _jetstream where id = 1;`) 290 err := row.Scan(&lastTimeUs) 291 return lastTimeUs, err 292}