Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package db 2 3import ( 4 "context" 5 "database/sql" 6 "log/slog" 7 "slices" 8 "strings" 9 10 _ "github.com/mattn/go-sqlite3" 11 "tangled.org/core/log" 12 "tangled.org/core/orm" 13) 14 15type DB struct { 16 *sql.DB 17} 18 19type DBTX interface { 20 QueryRow(query string, args ...any) *sql.Row 21 Exec(query string, args ...any) (sql.Result, error) 22} 23 24func Make(ctx context.Context, dbPath string) (*DB, error) { 25 // https://github.com/mattn/go-sqlite3#connection-string 26 opts := []string{ 27 "_foreign_keys=1", 28 "_journal_mode=WAL", 29 "_synchronous=NORMAL", 30 "_auto_vacuum=incremental", 31 "_busy_timeout=5000", 32 } 33 34 logger := log.FromContext(ctx) 35 logger = log.SubLogger(logger, "db") 36 37 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 38 if err != nil { 39 return nil, err 40 } 41 42 conn, err := db.Conn(ctx) 43 if err != nil { 44 return nil, err 45 } 46 defer conn.Close() 47 48 _, err = conn.ExecContext(ctx, ` 49 create table if not exists _jetstream ( 50 id integer primary key autoincrement, 51 last_time_us integer not null 52 ); 53 54 create table if not exists known_dids ( 55 did text primary key 56 ); 57 58 create table if not exists repos ( 59 id integer primary key autoincrement, 60 knot text not null, 61 owner text not null, 62 rkey text not null, 63 repo_did text, 64 created_at text, 65 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 66 67 unique(owner, rkey) 68 ); 69 70 create table if not exists repo_collaborators ( 71 id integer primary key autoincrement, 72 owner_did text not null, 73 rkey text not null, 74 subject text not null, 75 repo_did text not null, 76 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 77 78 unique(owner_did, rkey) 79 ); 80 81 create table if not exists spindle_members ( 82 -- identifiers for the record 83 id integer primary key autoincrement, 84 did text not null, 85 rkey text not null, 86 87 -- data 88 instance text not null, 89 subject text not null, 90 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 91 92 -- constraints 93 unique (did, rkey) 94 ); 95 96 -- status event for a single workflow 97 create table if not exists events ( 98 rkey text not null, 99 nsid text not null, 100 event text not null, -- json 101 created integer not null -- unix nanos 102 ); 103 104 create table if not exists migrations ( 105 id integer primary key autoincrement, 106 name text unique 107 ); 108 `) 109 if err != nil { 110 return nil, err 111 } 112 113 if err := runMigrations(ctx, conn, logger); err != nil { 114 return nil, err 115 } 116 117 return &DB{db}, nil 118} 119 120func runMigrations(_ context.Context, conn *sql.Conn, logger *slog.Logger) error { 121 if err := orm.RunMigration(conn, logger, "repos-to-repo-did", func(tx *sql.Tx) error { 122 var hasName int 123 if err := tx.QueryRow( 124 `select count(*) from pragma_table_info('repos') where name = 'name'`, 125 ).Scan(&hasName); err != nil { 126 return err 127 } 128 129 if hasName > 0 { 130 var totalRows, copiedRows int 131 if err := tx.QueryRow(`select count(*) from repos`).Scan(&totalRows); err != nil { 132 return err 133 } 134 if err := tx.QueryRow(`select count(*) from repos where coalesce(name, '') <> ''`).Scan(&copiedRows); err != nil { 135 return err 136 } 137 if dropped := totalRows - copiedRows; dropped > 0 { 138 logger.Warn("dropping repo rows with empty name during migration", "dropped", dropped, "kept", copiedRows) 139 } 140 141 if _, err := tx.Exec(` 142 create table if not exists repos_new ( 143 id integer primary key autoincrement, 144 knot text not null, 145 owner text not null, 146 rkey text not null, 147 repo_did text, 148 created_at text, 149 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 150 151 unique(owner, rkey) 152 ); 153 154 insert into repos_new (id, knot, owner, rkey, addedAt) 155 select id, knot, owner, name, addedAt from repos where coalesce(name, '') <> ''; 156 157 drop table repos; 158 alter table repos_new rename to repos; 159 `); err != nil { 160 return err 161 } 162 } 163 164 _, err := tx.Exec(` 165 create index if not exists idx_repos_repo_did on repos(repo_did); 166 create index if not exists idx_repos_owner_repo_did on repos(owner, repo_did); 167 create index if not exists idx_repo_collaborators_repo_did 168 on repo_collaborators(repo_did); 169 `) 170 return err 171 }); err != nil { 172 return err 173 } 174 175 return orm.RunMigration(conn, logger, "spindle-members-unique-on-rkey", func(tx *sql.Tx) error { 176 hasTarget, err := hasUniqueIndex(tx, "spindle_members", []string{"did", "rkey"}) 177 if err != nil { 178 return err 179 } 180 if hasTarget { 181 return nil 182 } 183 184 var totalRows, distinctRows int 185 if err := tx.QueryRow(`select count(*) from spindle_members`).Scan(&totalRows); err != nil { 186 return err 187 } 188 if err := tx.QueryRow(`select count(*) from (select 1 from spindle_members group by did, rkey)`).Scan(&distinctRows); err != nil { 189 return err 190 } 191 if dropped := totalRows - distinctRows; dropped > 0 { 192 logger.Warn("dropping duplicate (did, rkey) rows during spindle_members rebuild", "dropped", dropped, "kept", distinctRows) 193 } 194 195 _, err = tx.Exec(` 196 create table spindle_members_new ( 197 id integer primary key autoincrement, 198 did text not null, 199 rkey text not null, 200 instance text not null, 201 subject text not null, 202 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 203 unique (did, rkey) 204 ); 205 206 insert into spindle_members_new (id, did, rkey, instance, subject, created) 207 select id, did, rkey, instance, subject, created 208 from spindle_members sm 209 where id = ( 210 select max(id) from spindle_members 211 where did = sm.did and rkey = sm.rkey 212 ); 213 214 drop table spindle_members; 215 alter table spindle_members_new rename to spindle_members; 216 `) 217 return err 218 }) 219} 220 221func hasUniqueIndex(tx *sql.Tx, table string, cols []string) (bool, error) { 222 rows, err := tx.Query( 223 `select name from pragma_index_list(?) where "unique" = 1`, 224 table, 225 ) 226 if err != nil { 227 return false, err 228 } 229 defer rows.Close() 230 231 var indexNames []string 232 for rows.Next() { 233 var name string 234 if err := rows.Scan(&name); err != nil { 235 return false, err 236 } 237 indexNames = append(indexNames, name) 238 } 239 if err := rows.Err(); err != nil { 240 return false, err 241 } 242 243 wantSorted := slices.Clone(cols) 244 slices.Sort(wantSorted) 245 246 for _, name := range indexNames { 247 colRows, err := tx.Query( 248 `select name from pragma_index_info(?) order by seqno`, 249 name, 250 ) 251 if err != nil { 252 return false, err 253 } 254 var got []string 255 for colRows.Next() { 256 var c string 257 if err := colRows.Scan(&c); err != nil { 258 colRows.Close() 259 return false, err 260 } 261 got = append(got, c) 262 } 263 colRows.Close() 264 slices.Sort(got) 265 if slices.Equal(got, wantSorted) { 266 return true, nil 267 } 268 } 269 return false, nil 270} 271 272func (d *DB) SaveLastTimeUs(lastTimeUs int64) error { 273 _, err := d.Exec(` 274 insert into _jetstream (id, last_time_us) 275 values (1, ?) 276 on conflict(id) do update set last_time_us = excluded.last_time_us 277 `, lastTimeUs) 278 return err 279} 280 281func (d *DB) GetLastTimeUs() (int64, error) { 282 var lastTimeUs int64 283 row := d.QueryRow(`select last_time_us from _jetstream where id = 1;`) 284 err := row.Scan(&lastTimeUs) 285 return lastTimeUs, err 286}