Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// SQLite-backed persistence for tack. 4// 5// Two responsibilities live here: 6// 7// 1. The jetstream cursor — a microsecond unix timestamp the AT Proto 8// firehose uses to resume from a specific point. Without persistence 9// every restart begins at "now" and we'd silently miss any record 10// published while we were down. 11// 12// 2. Tangled membership state derived from jetstream commits: 13// sh.tangled.spindle.member, sh.tangled.repo, and 14// sh.tangled.repo.collaborator. We need this to later answer 15// "is DID X allowed to trigger a build on this spindle for repo Y?" 16// 17// We use mattn/go-sqlite3, which is the most battle-tested SQLite driver 18// for Go. It requires CGo, which is fine for tack — the project already 19// builds under Nix where a C toolchain is readily available. 20 21import ( 22 "context" 23 "database/sql" 24 "errors" 25 "fmt" 26 "strconv" 27 28 _ "github.com/mattn/go-sqlite3" 29) 30 31// store wraps the SQLite handle and exposes the small set of operations 32// the rest of tack needs. Keeping the surface narrow lets the persistence 33// layer be swapped or mocked later without rewriting callers. 34type store struct { 35 db *sql.DB 36} 37 38// openStore opens (or creates) the SQLite database at path and applies 39// the schema. It also flips on WAL + NORMAL synchronous, which is the 40// usual "this is a long-running server, not a one-shot script" config: 41// concurrent reads don't block the single writer, and we trade a little 42// crash-safety on power loss for a lot of write throughput. 43func openStore(path string) (*store, error) { 44 // mattn/go-sqlite3 reads pragma-style query parameters (_journal_mode, 45 // _synchronous, _foreign_keys) and applies them on each new connection. 46 // journal_mode=WAL persists in the database file but setting it here 47 // also covers the first-ever open. 48 dsn := fmt.Sprintf("file:%s?_journal_mode=WAL&_synchronous=NORMAL&_foreign_keys=on", path) 49 db, err := sql.Open("sqlite3", dsn) 50 if err != nil { 51 return nil, fmt.Errorf("open sqlite %q: %w", path, err) 52 } 53 54 // SQLite only supports one writer at a time. Capping max open conns 55 // at 1 keeps database/sql from spinning up extra connections that 56 // will only ever serialize behind that writer. 57 db.SetMaxOpenConns(1) 58 59 s := &store{db: db} 60 if err := s.migrate(context.Background()); err != nil { 61 _ = db.Close() 62 return nil, err 63 } 64 return s, nil 65} 66 67// Close releases the underlying database handle. 68func (s *store) Close() error { 69 return s.db.Close() 70} 71 72// metaCursorKey is the meta-table key under which we persist the 73// jetstream cursor. Pulled out as a constant to avoid drift between 74// load/save sites. 75const metaCursorKey = "jetstream_cursor" 76 77// LoadCursor returns the persisted jetstream cursor, or nil if none has 78// been saved yet (signaling "start from now"). The cursor is a unix 79// microsecond timestamp — see jetstream.Event.TimeUS. 80func (s *store) LoadCursor(ctx context.Context) (*int64, error) { 81 var raw string 82 err := s.db.QueryRowContext(ctx, 83 `SELECT value FROM meta WHERE key = ?`, metaCursorKey, 84 ).Scan(&raw) 85 if errors.Is(err, sql.ErrNoRows) { 86 return nil, nil 87 } 88 if err != nil { 89 return nil, fmt.Errorf("load cursor: %w", err) 90 } 91 v, err := strconv.ParseInt(raw, 10, 64) 92 if err != nil { 93 // A malformed cursor shouldn't wedge startup — log-and-ignore 94 // (return nil) so we just resume from "now". The caller has the 95 // logger; we surface the parse error so it can decide. 96 return nil, fmt.Errorf("parse cursor %q: %w", raw, err) 97 } 98 return &v, nil 99} 100 101// SaveCursor writes the jetstream cursor. It uses an UPSERT so the meta 102// row is created on first call and updated thereafter. 103func (s *store) SaveCursor(ctx context.Context, cursor int64) error { 104 _, err := s.db.ExecContext(ctx, 105 `INSERT INTO meta (key, value) VALUES (?, ?) 106 ON CONFLICT(key) DO UPDATE SET value = excluded.value`, 107 metaCursorKey, strconv.FormatInt(cursor, 10), 108 ) 109 if err != nil { 110 return fmt.Errorf("save cursor: %w", err) 111 } 112 return nil 113} 114 115// UpsertSpindleMember records (or refreshes) a sh.tangled.spindle.member 116// observation. 117func (s *store) UpsertSpindleMember(ctx context.Context, did, rkey, instance, subject, createdAt string) error { 118 _, err := s.db.ExecContext(ctx, 119 `INSERT INTO spindle_members (did, rkey, instance, subject, created_at) 120 VALUES (?, ?, ?, ?, ?) 121 ON CONFLICT(did, rkey) DO UPDATE SET 122 instance = excluded.instance, 123 subject = excluded.subject, 124 created_at = excluded.created_at`, 125 did, rkey, instance, subject, createdAt, 126 ) 127 if err != nil { 128 return fmt.Errorf("upsert spindle_member: %w", err) 129 } 130 return nil 131} 132 133// DeleteSpindleMember removes a member record by its ATProto identity. 134func (s *store) DeleteSpindleMember(ctx context.Context, did, rkey string) error { 135 _, err := s.db.ExecContext(ctx, 136 `DELETE FROM spindle_members WHERE did = ? AND rkey = ?`, 137 did, rkey, 138 ) 139 if err != nil { 140 return fmt.Errorf("delete spindle_member: %w", err) 141 } 142 return nil 143} 144 145// UpsertRepo records (or refreshes) a sh.tangled.repo observation. 146// spindle and repoDid may be empty strings — we store them as such rather 147// than as SQL NULLs to keep the upsert path uniform. 148func (s *store) UpsertRepo(ctx context.Context, did, rkey, knot, name, spindle, repoDid, createdAt string) error { 149 _, err := s.db.ExecContext(ctx, 150 `INSERT INTO repos (did, rkey, knot, name, spindle, repo_did, created_at) 151 VALUES (?, ?, ?, ?, ?, ?, ?) 152 ON CONFLICT(did, rkey) DO UPDATE SET 153 knot = excluded.knot, 154 name = excluded.name, 155 spindle = excluded.spindle, 156 repo_did = excluded.repo_did, 157 created_at = excluded.created_at`, 158 did, rkey, knot, name, spindle, repoDid, createdAt, 159 ) 160 if err != nil { 161 return fmt.Errorf("upsert repo: %w", err) 162 } 163 return nil 164} 165 166// DeleteRepo removes a repo record by its ATProto identity. 167func (s *store) DeleteRepo(ctx context.Context, did, rkey string) error { 168 _, err := s.db.ExecContext(ctx, 169 `DELETE FROM repos WHERE did = ? AND rkey = ?`, 170 did, rkey, 171 ) 172 if err != nil { 173 return fmt.Errorf("delete repo: %w", err) 174 } 175 return nil 176} 177 178// UpsertRepoCollaborator records (or refreshes) a 179// sh.tangled.repo.collaborator observation. 180func (s *store) UpsertRepoCollaborator(ctx context.Context, did, rkey, repo, repoDid, subject, createdAt string) error { 181 _, err := s.db.ExecContext(ctx, 182 `INSERT INTO repo_collaborators (did, rkey, repo, repo_did, subject, created_at) 183 VALUES (?, ?, ?, ?, ?, ?) 184 ON CONFLICT(did, rkey) DO UPDATE SET 185 repo = excluded.repo, 186 repo_did = excluded.repo_did, 187 subject = excluded.subject, 188 created_at = excluded.created_at`, 189 did, rkey, repo, repoDid, subject, createdAt, 190 ) 191 if err != nil { 192 return fmt.Errorf("upsert repo_collaborator: %w", err) 193 } 194 return nil 195} 196 197// KnotsForSpindle returns the distinct knot hostnames of all repos that 198// have declared the given spindle hostname as their CI spindle. The knot 199// event-stream subscriber uses this to decide which knots to dial. 200// 201// Returns an empty slice (not nil) when nothing matches, so callers can 202// range over the result without a nil check. 203func (s *store) KnotsForSpindle(ctx context.Context, hostname string) ([]string, error) { 204 rows, err := s.db.QueryContext(ctx, 205 `SELECT DISTINCT knot FROM repos WHERE spindle = ? AND knot <> ''`, 206 hostname, 207 ) 208 if err != nil { 209 return nil, fmt.Errorf("query knots: %w", err) 210 } 211 defer rows.Close() 212 213 out := []string{} 214 for rows.Next() { 215 var k string 216 if err := rows.Scan(&k); err != nil { 217 return nil, fmt.Errorf("scan knot: %w", err) 218 } 219 out = append(out, k) 220 } 221 if err := rows.Err(); err != nil { 222 return nil, fmt.Errorf("iterate knots: %w", err) 223 } 224 return out, nil 225} 226 227// DeleteRepoCollaborator removes a collaborator record by its ATProto 228// identity. 229func (s *store) DeleteRepoCollaborator(ctx context.Context, did, rkey string) error { 230 _, err := s.db.ExecContext(ctx, 231 `DELETE FROM repo_collaborators WHERE did = ? AND rkey = ?`, 232 did, rkey, 233 ) 234 if err != nil { 235 return fmt.Errorf("delete repo_collaborator: %w", err) 236 } 237 return nil 238}