Stitch any CI into Tangled
1package main
2
3// SQLite-backed persistence for tack.
4//
// Several responsibilities live here:
//
// 1. The jetstream cursor — a microsecond unix timestamp the AT Proto
//    firehose uses to resume from a specific point. Without persistence
//    every restart begins at "now" and we'd silently miss any record
//    published while we were down.
//
// 2. Tangled membership state derived from jetstream commits:
//    sh.tangled.spindle.member, sh.tangled.repo, and
//    sh.tangled.repo.collaborator. We need this to later answer
//    "is DID X allowed to trigger a build on this spindle for repo Y?"
//
// 3. The outbound events log that backs the /events websocket, so
//    subscribers can resume from a cursor.
//
// 4. The mapping from each Buildkite build back to the Tangled pipeline
//    that spawned it, read by the webhook and /logs handlers.
16//
17// We use mattn/go-sqlite3, which is the most battle-tested SQLite driver
18// for Go. It requires CGo, which is fine for tack — the project already
19// builds under Nix where a C toolchain is readily available.
20
import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"strconv"
	"strings"
	"time"

	_ "github.com/mattn/go-sqlite3"
)
32
// store is the single handle tack uses for everything it persists. It
// exposes only a handful of narrow methods so callers never touch SQL
// directly, which leaves room to swap or mock the persistence layer later
// without rewriting them.
type store struct {
	db *sql.DB // underlying SQLite database handle
}
39
40// openStore opens (or creates) the SQLite database at path and applies
41// the schema. It also flips on WAL + NORMAL synchronous, which is the
42// usual "this is a long-running server, not a one-shot script" config:
43// concurrent reads don't block the single writer, and we trade a little
44// crash-safety on power loss for a lot of write throughput.
45func openStore(path string) (*store, error) {
46 // mattn/go-sqlite3 reads pragma-style query parameters (_journal_mode,
47 // _synchronous, _foreign_keys) and applies them on each new connection.
48 // journal_mode=WAL persists in the database file but setting it here
49 // also covers the first-ever open.
50 dsn := fmt.Sprintf("file:%s?_journal_mode=WAL&_synchronous=NORMAL&_foreign_keys=on", path)
51 db, err := sql.Open("sqlite3", dsn)
52 if err != nil {
53 return nil, fmt.Errorf("open sqlite %q: %w", path, err)
54 }
55
56 // SQLite only supports one writer at a time. Capping max open conns
57 // at 1 keeps database/sql from spinning up extra connections that
58 // will only ever serialize behind that writer.
59 db.SetMaxOpenConns(1)
60
61 s := &store{db: db}
62 if err := s.migrate(context.Background()); err != nil {
63 _ = db.Close()
64 return nil, err
65 }
66 return s, nil
67}
68
69// Close releases the underlying database handle.
70func (s *store) Close() error {
71 return s.db.Close()
72}
73
// The jetstream cursor lives in the meta table under this key. Keeping it
// in one constant means the load and save paths can never disagree about
// where it is stored.
const (
	metaCursorKey = "jetstream_cursor"
)
78
79// LoadCursor returns the persisted jetstream cursor, or nil if none has
80// been saved yet (signaling "start from now"). The cursor is a unix
81// microsecond timestamp — see jetstream.Event.TimeUS.
82func (s *store) LoadCursor(ctx context.Context) (*int64, error) {
83 var raw string
84 err := s.db.QueryRowContext(ctx,
85 `SELECT value FROM meta WHERE key = ?`, metaCursorKey,
86 ).Scan(&raw)
87 if errors.Is(err, sql.ErrNoRows) {
88 return nil, nil
89 }
90 if err != nil {
91 return nil, fmt.Errorf("load cursor: %w", err)
92 }
93 v, err := strconv.ParseInt(raw, 10, 64)
94 if err != nil {
95 // A malformed cursor shouldn't wedge startup — log-and-ignore
96 // (return nil) so we just resume from "now". The caller has the
97 // logger; we surface the parse error so it can decide.
98 return nil, fmt.Errorf("parse cursor %q: %w", raw, err)
99 }
100 return &v, nil
101}
102
103// SaveCursor writes the jetstream cursor. It uses an UPSERT so the meta
104// row is created on first call and updated thereafter.
105func (s *store) SaveCursor(ctx context.Context, cursor int64) error {
106 _, err := s.db.ExecContext(ctx,
107 `INSERT INTO meta (key, value) VALUES (?, ?)
108 ON CONFLICT(key) DO UPDATE SET value = excluded.value`,
109 metaCursorKey, strconv.FormatInt(cursor, 10),
110 )
111 if err != nil {
112 return fmt.Errorf("save cursor: %w", err)
113 }
114 return nil
115}
116
117// UpsertSpindleMember records (or refreshes) a sh.tangled.spindle.member
118// observation.
119func (s *store) UpsertSpindleMember(ctx context.Context, did, rkey, instance, subject, createdAt string) error {
120 _, err := s.db.ExecContext(ctx,
121 `INSERT INTO spindle_members (did, rkey, instance, subject, created_at)
122 VALUES (?, ?, ?, ?, ?)
123 ON CONFLICT(did, rkey) DO UPDATE SET
124 instance = excluded.instance,
125 subject = excluded.subject,
126 created_at = excluded.created_at`,
127 did, rkey, instance, subject, createdAt,
128 )
129 if err != nil {
130 return fmt.Errorf("upsert spindle_member: %w", err)
131 }
132 return nil
133}
134
135// DeleteSpindleMember removes a member record by its ATProto identity.
136func (s *store) DeleteSpindleMember(ctx context.Context, did, rkey string) error {
137 _, err := s.db.ExecContext(ctx,
138 `DELETE FROM spindle_members WHERE did = ? AND rkey = ?`,
139 did, rkey,
140 )
141 if err != nil {
142 return fmt.Errorf("delete spindle_member: %w", err)
143 }
144 return nil
145}
146
147// UpsertRepo records (or refreshes) a sh.tangled.repo observation.
148// spindle and repoDid may be empty strings — we store them as such rather
149// than as SQL NULLs to keep the upsert path uniform.
150func (s *store) UpsertRepo(ctx context.Context, did, rkey, knot, name, spindle, repoDid, createdAt string) error {
151 _, err := s.db.ExecContext(ctx,
152 `INSERT INTO repos (did, rkey, knot, name, spindle, repo_did, created_at)
153 VALUES (?, ?, ?, ?, ?, ?, ?)
154 ON CONFLICT(did, rkey) DO UPDATE SET
155 knot = excluded.knot,
156 name = excluded.name,
157 spindle = excluded.spindle,
158 repo_did = excluded.repo_did,
159 created_at = excluded.created_at`,
160 did, rkey, knot, name, spindle, repoDid, createdAt,
161 )
162 if err != nil {
163 return fmt.Errorf("upsert repo: %w", err)
164 }
165 return nil
166}
167
168// DeleteRepo removes a repo record by its ATProto identity.
169func (s *store) DeleteRepo(ctx context.Context, did, rkey string) error {
170 _, err := s.db.ExecContext(ctx,
171 `DELETE FROM repos WHERE did = ? AND rkey = ?`,
172 did, rkey,
173 )
174 if err != nil {
175 return fmt.Errorf("delete repo: %w", err)
176 }
177 return nil
178}
179
180// UpsertRepoCollaborator records (or refreshes) a
181// sh.tangled.repo.collaborator observation.
182func (s *store) UpsertRepoCollaborator(ctx context.Context, did, rkey, repo, repoDid, subject, createdAt string) error {
183 _, err := s.db.ExecContext(ctx,
184 `INSERT INTO repo_collaborators (did, rkey, repo, repo_did, subject, created_at)
185 VALUES (?, ?, ?, ?, ?, ?)
186 ON CONFLICT(did, rkey) DO UPDATE SET
187 repo = excluded.repo,
188 repo_did = excluded.repo_did,
189 subject = excluded.subject,
190 created_at = excluded.created_at`,
191 did, rkey, repo, repoDid, subject, createdAt,
192 )
193 if err != nil {
194 return fmt.Errorf("upsert repo_collaborator: %w", err)
195 }
196 return nil
197}
198
199// GetRepo returns the (knot, spindle) currently stored for a (did, rkey)
200// pair. Both are returned as empty strings when no row exists; callers
201// that need to distinguish "absent" from "stored but empty" should
202// pre-check existence themselves.
203//
204// This exists so applyRepo can read the *previous* spindle/knot of a
205// record before applying a mutation, which is what makes it possible to
206// detect transitions like "this repo used to be ours, now it isn't" and
207// trigger a knot unsubscribe.
208func (s *store) GetRepo(ctx context.Context, did, rkey string) (knot, spindle string, err error) {
209 err = s.db.QueryRowContext(ctx,
210 `SELECT knot, spindle FROM repos WHERE did = ? AND rkey = ?`,
211 did, rkey,
212 ).Scan(&knot, &spindle)
213 if errors.Is(err, sql.ErrNoRows) {
214 return "", "", nil
215 }
216 if err != nil {
217 return "", "", fmt.Errorf("get repo: %w", err)
218 }
219 return knot, spindle, nil
220}
221
222// IsKnotWanted reports whether any repo currently stored still names the
223// given hostname as its spindle and the given knot as its host. After a
224// repo update or delete this is the question we ask to decide whether
225// to keep watching that knot or unsubscribe from it.
226func (s *store) IsKnotWanted(ctx context.Context, hostname, knot string) (bool, error) {
227 var n int
228 err := s.db.QueryRowContext(ctx,
229 `SELECT COUNT(*) FROM repos WHERE spindle = ? AND knot = ?`,
230 hostname, knot,
231 ).Scan(&n)
232 if err != nil {
233 return false, fmt.Errorf("count repos for knot: %w", err)
234 }
235 return n > 0, nil
236}
237
238// KnotsForSpindle returns the distinct knot hostnames of all repos that
239// have declared the given spindle hostname as their CI spindle. The knot
240// event-stream subscriber uses this to decide which knots to dial.
241//
242// Returns an empty slice (not nil) when nothing matches, so callers can
243// range over the result without a nil check.
244func (s *store) KnotsForSpindle(ctx context.Context, hostname string) ([]string, error) {
245 rows, err := s.db.QueryContext(ctx,
246 `SELECT DISTINCT knot FROM repos WHERE spindle = ? AND knot <> ''`,
247 hostname,
248 )
249 if err != nil {
250 return nil, fmt.Errorf("query knots: %w", err)
251 }
252 defer rows.Close()
253
254 out := []string{}
255 for rows.Next() {
256 var k string
257 if err := rows.Scan(&k); err != nil {
258 return nil, fmt.Errorf("scan knot: %w", err)
259 }
260 out = append(out, k)
261 }
262 if err := rows.Err(); err != nil {
263 return nil, fmt.Errorf("iterate knots: %w", err)
264 }
265 return out, nil
266}
267
268// DeleteRepoCollaborator removes a collaborator record by its ATProto
269// identity.
270func (s *store) DeleteRepoCollaborator(ctx context.Context, did, rkey string) error {
271 _, err := s.db.ExecContext(ctx,
272 `DELETE FROM repo_collaborators WHERE did = ? AND rkey = ?`,
273 did, rkey,
274 )
275 if err != nil {
276 return fmt.Errorf("delete repo_collaborator: %w", err)
277 }
278 return nil
279}
280
// EventRow mirrors one row of the events table. It is an outbound record
// queued for delivery to /events websocket subscribers. The record body is
// kept as raw JSON rather than a string so it can be passed through
// untouched.
type EventRow struct {
	Created   int64           // rowid assigned on insert; doubles as the cursor subscribers resume from
	Rkey      string          // ATProto record key; for sh.tangled.pipeline.status, the rkey we mint when publishing
	Nsid      string          // lexicon collection, e.g. sh.tangled.pipeline.status
	EventJSON json.RawMessage // record body verbatim, so /events can splice it into the wire envelope without an unmarshal/remarshal round-trip
}
298
299// InsertEvent appends an event row and returns its assigned `created`
300// (rowid) cursor. Storage is the source of truth for fan-out, so we
301// write here even if zero subscribers are connected — a subscriber that
302// connects later (with an old cursor) will pick the row up via
303// EventsAfter.
304//
305// eventJSON must be a valid JSON object; we store it verbatim. Length
306// validation is intentionally absent — the schema accepts arbitrary
307// TEXT and SQLite handles huge blobs fine for our scale.
308func (s *store) InsertEvent(ctx context.Context, rkey, nsid string, eventJSON []byte) (int64, error) {
309 res, err := s.db.ExecContext(ctx,
310 `INSERT INTO events (rkey, nsid, event_json, inserted_at)
311 VALUES (?, ?, ?, ?)`,
312 rkey, nsid, string(eventJSON),
313 time.Now().UTC().Format(time.RFC3339Nano),
314 )
315 if err != nil {
316 return 0, fmt.Errorf("insert event: %w", err)
317 }
318 id, err := res.LastInsertId()
319 if err != nil {
320 return 0, fmt.Errorf("event last insert id: %w", err)
321 }
322 return id, nil
323}
324
// BuildkiteBuildRef links one Buildkite build to the Tangled pipeline tuple
// that caused it. The Buildkite provider writes it at Spawn time. It is read
// back by build UUID from the webhook handler when an event arrives, and by
// (knot, pipeline rkey, workflow) from the /logs handler when an appview
// client asks for output.
type BuildkiteBuildRef struct {
	// Buildkite-side identity of the build.
	BuildUUID    string
	BuildNumber  int64
	PipelineSlug string

	// Tangled-side identity of the pipeline run that spawned it.
	Knot         string
	PipelineRkey string
	Workflow     string
	PipelineURI  string
}
340
341// InsertBuildkiteBuild records that a Buildkite build was created on
342// behalf of the given (knot, pipelineRkey, workflow) tuple. Uses
343// INSERT OR REPLACE so that an unlikely build-uuid collision (or a
344// Buildkite-side rebuild that re-fires us) just refreshes the row
345// instead of failing.
346func (s *store) InsertBuildkiteBuild(ctx context.Context, ref BuildkiteBuildRef) error {
347 _, err := s.db.ExecContext(ctx,
348 `INSERT INTO buildkite_builds (
349 build_uuid, build_number, pipeline_slug,
350 knot, pipeline_rkey, workflow,
351 pipeline_uri, created_at
352 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
353 ON CONFLICT(build_uuid) DO UPDATE SET
354 build_number = excluded.build_number,
355 pipeline_slug = excluded.pipeline_slug,
356 knot = excluded.knot,
357 pipeline_rkey = excluded.pipeline_rkey,
358 workflow = excluded.workflow,
359 pipeline_uri = excluded.pipeline_uri,
360 created_at = excluded.created_at`,
361 ref.BuildUUID, ref.BuildNumber, ref.PipelineSlug,
362 ref.Knot, ref.PipelineRkey, ref.Workflow,
363 ref.PipelineURI, time.Now().UTC().Format(time.RFC3339Nano),
364 )
365 if err != nil {
366 return fmt.Errorf("insert buildkite_build: %w", err)
367 }
368 return nil
369}
370
371// LookupBuildkiteBuildByUUID returns the saved mapping for the given
372// Buildkite build UUID, or nil when no such build is recorded.
373// Returning a nil pointer rather than a sentinel error keeps the
374// webhook handler's "we don't know about this build" branch a simple
375// nil check.
376func (s *store) LookupBuildkiteBuildByUUID(ctx context.Context, buildUUID string) (*BuildkiteBuildRef, error) {
377 var ref BuildkiteBuildRef
378 err := s.db.QueryRowContext(ctx,
379 `SELECT build_uuid, build_number, pipeline_slug,
380 knot, pipeline_rkey, workflow, pipeline_uri
381 FROM buildkite_builds WHERE build_uuid = ?`,
382 buildUUID,
383 ).Scan(
384 &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug,
385 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI,
386 )
387 if errors.Is(err, sql.ErrNoRows) {
388 return nil, nil
389 }
390 if err != nil {
391 return nil, fmt.Errorf("lookup buildkite_build by uuid: %w", err)
392 }
393 return &ref, nil
394}
395
396// LookupBuildkiteBuildByTuple finds the most recently created build
397// for (knot, pipelineRkey, workflow). Returns nil when no build has
398// been recorded for that tuple — used by /logs to translate the
399// appview's path-based identity back into something Buildkite knows.
400//
401// "Most recent" matters because a workflow may have multiple builds
402// over time (rebuilds, re-triggers). We always serve logs for the
403// latest run; older runs are still queryable by build UUID directly
404// if anyone ever wants that.
405func (s *store) LookupBuildkiteBuildByTuple(ctx context.Context, knot, pipelineRkey, workflow string) (*BuildkiteBuildRef, error) {
406 var ref BuildkiteBuildRef
407 err := s.db.QueryRowContext(ctx,
408 `SELECT build_uuid, build_number, pipeline_slug,
409 knot, pipeline_rkey, workflow, pipeline_uri
410 FROM buildkite_builds
411 WHERE knot = ? AND pipeline_rkey = ? AND workflow = ?
412 ORDER BY created_at DESC
413 LIMIT 1`,
414 knot, pipelineRkey, workflow,
415 ).Scan(
416 &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug,
417 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI,
418 )
419 if errors.Is(err, sql.ErrNoRows) {
420 return nil, nil
421 }
422 if err != nil {
423 return nil, fmt.Errorf("lookup buildkite_build by tuple: %w", err)
424 }
425 return &ref, nil
426}
427
428// EventsAfter returns every event row with `created` strictly greater
429// than cursor, in cursor order. Used by /events to backfill a
430// reconnecting subscriber and to drain newly-published rows on each
431// broker notification.
432//
433// Pass cursor=0 to get the full log from the beginning, which is what
434// happens when a subscriber connects without a ?cursor= query param.
435func (s *store) EventsAfter(ctx context.Context, cursor int64) ([]EventRow, error) {
436 rows, err := s.db.QueryContext(ctx,
437 `SELECT created, rkey, nsid, event_json
438 FROM events
439 WHERE created > ?
440 ORDER BY created ASC`,
441 cursor,
442 )
443 if err != nil {
444 return nil, fmt.Errorf("query events: %w", err)
445 }
446 defer rows.Close()
447
448 out := []EventRow{}
449 for rows.Next() {
450 var (
451 ev EventRow
452 raw string
453 )
454 if err := rows.Scan(&ev.Created, &ev.Rkey, &ev.Nsid, &raw); err != nil {
455 return nil, fmt.Errorf("scan event: %w", err)
456 }
457 ev.EventJSON = json.RawMessage(raw)
458 out = append(out, ev)
459 }
460 if err := rows.Err(); err != nil {
461 return nil, fmt.Errorf("iterate events: %w", err)
462 }
463 return out, nil
464}