Stitch any CI into Tangled
1package main
2
3// SQLite-backed persistence for tack.
4//
5// Two responsibilities live here:
6//
7// 1. The jetstream cursor — a microsecond unix timestamp the AT Proto
8// firehose uses to resume from a specific point. Without persistence
9// every restart begins at "now" and we'd silently miss any record
10// published while we were down.
11//
12// 2. Tangled membership state derived from jetstream commits:
13// sh.tangled.spindle.member, sh.tangled.repo, and
14// sh.tangled.repo.collaborator. We need this to later answer
15// "is DID X allowed to trigger a build on this spindle for repo Y?"
16//
17// We use mattn/go-sqlite3, which is the most battle-tested SQLite driver
18// for Go. It requires CGo, which is fine for tack — the project already
19// builds under Nix where a C toolchain is readily available.
20
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"net/url"
	"strconv"

	_ "github.com/mattn/go-sqlite3"
)
30
// store is tack's persistence handle: a thin wrapper around the SQLite
// connection pool that exposes only the operations the rest of the
// program needs. The deliberately small surface means the backing
// storage can be replaced or mocked later without touching callers.
type store struct {
	// db is the database/sql handle all store methods run their
	// queries through.
	db *sql.DB
}
37
38// openStore opens (or creates) the SQLite database at path and applies
39// the schema. It also flips on WAL + NORMAL synchronous, which is the
40// usual "this is a long-running server, not a one-shot script" config:
41// concurrent reads don't block the single writer, and we trade a little
42// crash-safety on power loss for a lot of write throughput.
43func openStore(path string) (*store, error) {
44 // mattn/go-sqlite3 reads pragma-style query parameters (_journal_mode,
45 // _synchronous, _foreign_keys) and applies them on each new connection.
46 // journal_mode=WAL persists in the database file but setting it here
47 // also covers the first-ever open.
48 dsn := fmt.Sprintf("file:%s?_journal_mode=WAL&_synchronous=NORMAL&_foreign_keys=on", path)
49 db, err := sql.Open("sqlite3", dsn)
50 if err != nil {
51 return nil, fmt.Errorf("open sqlite %q: %w", path, err)
52 }
53
54 // SQLite only supports one writer at a time. Capping max open conns
55 // at 1 keeps database/sql from spinning up extra connections that
56 // will only ever serialize behind that writer.
57 db.SetMaxOpenConns(1)
58
59 s := &store{db: db}
60 if err := s.migrate(context.Background()); err != nil {
61 _ = db.Close()
62 return nil, err
63 }
64 return s, nil
65}
66
67// Close releases the underlying database handle.
68func (s *store) Close() error {
69 return s.db.Close()
70}
71
// Keys of rows in the meta key/value table.
const (
	// metaCursorKey identifies the meta row that stores the jetstream
	// cursor. Load and save sites share this one constant so the key
	// they use can never drift apart.
	metaCursorKey = "jetstream_cursor"
)
76
77// LoadCursor returns the persisted jetstream cursor, or nil if none has
78// been saved yet (signaling "start from now"). The cursor is a unix
79// microsecond timestamp — see jetstream.Event.TimeUS.
80func (s *store) LoadCursor(ctx context.Context) (*int64, error) {
81 var raw string
82 err := s.db.QueryRowContext(ctx,
83 `SELECT value FROM meta WHERE key = ?`, metaCursorKey,
84 ).Scan(&raw)
85 if errors.Is(err, sql.ErrNoRows) {
86 return nil, nil
87 }
88 if err != nil {
89 return nil, fmt.Errorf("load cursor: %w", err)
90 }
91 v, err := strconv.ParseInt(raw, 10, 64)
92 if err != nil {
93 // A malformed cursor shouldn't wedge startup — log-and-ignore
94 // (return nil) so we just resume from "now". The caller has the
95 // logger; we surface the parse error so it can decide.
96 return nil, fmt.Errorf("parse cursor %q: %w", raw, err)
97 }
98 return &v, nil
99}
100
101// SaveCursor writes the jetstream cursor. It uses an UPSERT so the meta
102// row is created on first call and updated thereafter.
103func (s *store) SaveCursor(ctx context.Context, cursor int64) error {
104 _, err := s.db.ExecContext(ctx,
105 `INSERT INTO meta (key, value) VALUES (?, ?)
106 ON CONFLICT(key) DO UPDATE SET value = excluded.value`,
107 metaCursorKey, strconv.FormatInt(cursor, 10),
108 )
109 if err != nil {
110 return fmt.Errorf("save cursor: %w", err)
111 }
112 return nil
113}
114
115// UpsertSpindleMember records (or refreshes) a sh.tangled.spindle.member
116// observation.
117func (s *store) UpsertSpindleMember(ctx context.Context, did, rkey, instance, subject, createdAt string) error {
118 _, err := s.db.ExecContext(ctx,
119 `INSERT INTO spindle_members (did, rkey, instance, subject, created_at)
120 VALUES (?, ?, ?, ?, ?)
121 ON CONFLICT(did, rkey) DO UPDATE SET
122 instance = excluded.instance,
123 subject = excluded.subject,
124 created_at = excluded.created_at`,
125 did, rkey, instance, subject, createdAt,
126 )
127 if err != nil {
128 return fmt.Errorf("upsert spindle_member: %w", err)
129 }
130 return nil
131}
132
133// DeleteSpindleMember removes a member record by its ATProto identity.
134func (s *store) DeleteSpindleMember(ctx context.Context, did, rkey string) error {
135 _, err := s.db.ExecContext(ctx,
136 `DELETE FROM spindle_members WHERE did = ? AND rkey = ?`,
137 did, rkey,
138 )
139 if err != nil {
140 return fmt.Errorf("delete spindle_member: %w", err)
141 }
142 return nil
143}
144
145// UpsertRepo records (or refreshes) a sh.tangled.repo observation.
146// spindle and repoDid may be empty strings — we store them as such rather
147// than as SQL NULLs to keep the upsert path uniform.
148func (s *store) UpsertRepo(ctx context.Context, did, rkey, knot, name, spindle, repoDid, createdAt string) error {
149 _, err := s.db.ExecContext(ctx,
150 `INSERT INTO repos (did, rkey, knot, name, spindle, repo_did, created_at)
151 VALUES (?, ?, ?, ?, ?, ?, ?)
152 ON CONFLICT(did, rkey) DO UPDATE SET
153 knot = excluded.knot,
154 name = excluded.name,
155 spindle = excluded.spindle,
156 repo_did = excluded.repo_did,
157 created_at = excluded.created_at`,
158 did, rkey, knot, name, spindle, repoDid, createdAt,
159 )
160 if err != nil {
161 return fmt.Errorf("upsert repo: %w", err)
162 }
163 return nil
164}
165
166// DeleteRepo removes a repo record by its ATProto identity.
167func (s *store) DeleteRepo(ctx context.Context, did, rkey string) error {
168 _, err := s.db.ExecContext(ctx,
169 `DELETE FROM repos WHERE did = ? AND rkey = ?`,
170 did, rkey,
171 )
172 if err != nil {
173 return fmt.Errorf("delete repo: %w", err)
174 }
175 return nil
176}
177
178// UpsertRepoCollaborator records (or refreshes) a
179// sh.tangled.repo.collaborator observation.
180func (s *store) UpsertRepoCollaborator(ctx context.Context, did, rkey, repo, repoDid, subject, createdAt string) error {
181 _, err := s.db.ExecContext(ctx,
182 `INSERT INTO repo_collaborators (did, rkey, repo, repo_did, subject, created_at)
183 VALUES (?, ?, ?, ?, ?, ?)
184 ON CONFLICT(did, rkey) DO UPDATE SET
185 repo = excluded.repo,
186 repo_did = excluded.repo_did,
187 subject = excluded.subject,
188 created_at = excluded.created_at`,
189 did, rkey, repo, repoDid, subject, createdAt,
190 )
191 if err != nil {
192 return fmt.Errorf("upsert repo_collaborator: %w", err)
193 }
194 return nil
195}
196
197// KnotsForSpindle returns the distinct knot hostnames of all repos that
198// have declared the given spindle hostname as their CI spindle. The knot
199// event-stream subscriber uses this to decide which knots to dial.
200//
201// Returns an empty slice (not nil) when nothing matches, so callers can
202// range over the result without a nil check.
203func (s *store) KnotsForSpindle(ctx context.Context, hostname string) ([]string, error) {
204 rows, err := s.db.QueryContext(ctx,
205 `SELECT DISTINCT knot FROM repos WHERE spindle = ? AND knot <> ''`,
206 hostname,
207 )
208 if err != nil {
209 return nil, fmt.Errorf("query knots: %w", err)
210 }
211 defer rows.Close()
212
213 out := []string{}
214 for rows.Next() {
215 var k string
216 if err := rows.Scan(&k); err != nil {
217 return nil, fmt.Errorf("scan knot: %w", err)
218 }
219 out = append(out, k)
220 }
221 if err := rows.Err(); err != nil {
222 return nil, fmt.Errorf("iterate knots: %w", err)
223 }
224 return out, nil
225}
226
227// DeleteRepoCollaborator removes a collaborator record by its ATProto
228// identity.
229func (s *store) DeleteRepoCollaborator(ctx context.Context, did, rkey string) error {
230 _, err := s.db.ExecContext(ctx,
231 `DELETE FROM repo_collaborators WHERE did = ? AND rkey = ?`,
232 did, rkey,
233 )
234 if err != nil {
235 return fmt.Errorf("delete repo_collaborator: %w", err)
236 }
237 return nil
238}