Monorepo for Tangled tangled.org
9

Configure Feed

Select the types of activity you want to include in your feed.

1package db 2 3import ( 4 "context" 5 "database/sql" 6 "log/slog" 7 "strings" 8 9 _ "github.com/mattn/go-sqlite3" 10 "tangled.org/core/log" 11 "tangled.org/core/orm" 12) 13 14type DB struct { 15 *sql.DB 16} 17 18func Make(ctx context.Context, dbPath string) (*DB, error) { 19 // https://github.com/mattn/go-sqlite3#connection-string 20 opts := []string{ 21 "_foreign_keys=1", 22 "_journal_mode=WAL", 23 "_synchronous=NORMAL", 24 "_auto_vacuum=incremental", 25 "_busy_timeout=5000", 26 } 27 28 logger := log.FromContext(ctx) 29 logger = log.SubLogger(logger, "db") 30 31 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 32 if err != nil { 33 return nil, err 34 } 35 36 conn, err := db.Conn(ctx) 37 if err != nil { 38 return nil, err 39 } 40 defer conn.Close() 41 42 _, err = conn.ExecContext(ctx, ` 43 create table if not exists _jetstream ( 44 id integer primary key autoincrement, 45 last_time_us integer not null 46 ); 47 48 create table if not exists known_dids ( 49 did text primary key 50 ); 51 52 create table if not exists repos ( 53 id integer primary key autoincrement, 54 knot text not null, 55 owner text not null, 56 rkey text not null, 57 repo_did text, 58 created_at text, 59 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 60 61 unique(owner, rkey) 62 ); 63 64 create table if not exists repo_collaborators ( 65 id integer primary key autoincrement, 66 owner_did text not null, 67 rkey text not null, 68 subject text not null, 69 repo_did text not null, 70 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 71 72 unique(owner_did, rkey) 73 ); 74 75 create table if not exists spindle_members ( 76 -- identifiers for the record 77 id integer primary key autoincrement, 78 did text not null, 79 rkey text not null, 80 81 -- data 82 instance text not null, 83 subject text not null, 84 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 85 86 -- constraints 87 unique (did, instance, subject) 88 ); 89 90 -- status event for a single workflow 91 create table if not exists events ( 92 rkey text not null, 93 nsid text not null, 94 event text not null, -- json 95 created integer not null -- unix nanos 96 ); 97 98 create table if not exists migrations ( 99 id integer primary key autoincrement, 100 name text unique 101 ); 102 `) 103 if err != nil { 104 return nil, err 105 } 106 107 if err := runMigrations(ctx, conn, logger); err != nil { 108 return nil, err 109 } 110 111 return &DB{db}, nil 112} 113 114func runMigrations(_ context.Context, conn *sql.Conn, logger *slog.Logger) error { 115 return orm.RunMigration(conn, logger, "repos-to-repo-did", func(tx *sql.Tx) error { 116 var hasName int 117 if err := tx.QueryRow( 118 `select count(*) from pragma_table_info('repos') where name = 'name'`, 119 ).Scan(&hasName); err != nil { 120 return err 121 } 122 123 if hasName > 0 { 124 var totalRows, copiedRows int 125 if err := tx.QueryRow(`select count(*) from repos`).Scan(&totalRows); err != nil { 126 return err 127 } 128 if err := tx.QueryRow(`select count(*) from repos where coalesce(name, '') <> ''`).Scan(&copiedRows); err != nil { 129 return err 130 } 131 if dropped := totalRows - copiedRows; dropped > 0 { 132 logger.Warn("dropping repo rows with empty name during migration", "dropped", dropped, "kept", copiedRows) 133 } 134 135 if _, err := tx.Exec(` 136 create table if not exists repos_new ( 137 id integer primary key autoincrement, 138 knot text not null, 139 owner text not null, 140 rkey text not null, 141 repo_did text, 142 created_at text, 143 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 144 145 unique(owner, rkey) 146 ); 147 148 insert into repos_new (id, knot, owner, rkey, addedAt) 149 select id, knot, owner, name, addedAt from repos where coalesce(name, '') <> ''; 150 151 drop table repos; 152 alter table repos_new rename to repos; 153 `); err != nil { 154 return err 155 } 156 } 157 158 _, err := tx.Exec(` 159 create index if not exists idx_repos_repo_did on repos(repo_did); 160 create index if not exists idx_repos_owner_repo_did on repos(owner, repo_did); 161 create index if not exists idx_repo_collaborators_repo_did 162 on repo_collaborators(repo_did); 163 `) 164 return err 165 }) 166} 167 168func (d *DB) SaveLastTimeUs(lastTimeUs int64) error { 169 _, err := d.Exec(` 170 insert into _jetstream (id, last_time_us) 171 values (1, ?) 172 on conflict(id) do update set last_time_us = excluded.last_time_us 173 `, lastTimeUs) 174 return err 175} 176 177func (d *DB) GetLastTimeUs() (int64, error) { 178 var lastTimeUs int64 179 row := d.QueryRow(`select last_time_us from _jetstream where id = 1;`) 180 err := row.Scan(&lastTimeUs) 181 return lastTimeUs, err 182}