// Stitch any CI into Tangled.
1package main
2
// SQLite-backed persistence for tack.
//
// Two responsibilities live here:
//
//  1. The jetstream cursor — a microsecond unix timestamp the AT Proto
//     firehose uses to resume from a specific point. Without persistence
//     every restart begins at "now" and we'd silently miss any record
//     published while we were down.
//
//  2. Tangled membership state derived from jetstream commits:
//     sh.tangled.spindle.member, sh.tangled.repo, and
//     sh.tangled.repo.collaborator. We need this to later answer
//     "is DID X allowed to trigger a build on this spindle for repo Y?"
//
// We use mattn/go-sqlite3, which is the most battle-tested SQLite driver
// for Go. It requires CGo, which is fine for tack — the project already
// builds under Nix where a C toolchain is readily available.

20
21import (
22 "context"
23 "database/sql"
24 "encoding/json"
25 "errors"
26 "fmt"
27 "strconv"
28 "time"
29
30 _ "github.com/mattn/go-sqlite3"
31)
32
// store wraps the SQLite handle and exposes the small set of operations
// the rest of tack needs. Keeping the surface narrow lets the persistence
// layer be swapped or mocked later without rewriting callers.
type store struct {
	// db is the database/sql handle every store method issues its
	// statements through. openStore populates it; Close closes it.
	db *sql.DB
}
39
40// openStore opens (or creates) the SQLite database at path and applies
41// the schema. It also flips on WAL + NORMAL synchronous, which is the
42// usual "this is a long-running server, not a one-shot script" config:
43// concurrent reads don't block the single writer, and we trade a little
44// crash-safety on power loss for a lot of write throughput.
45func openStore(path string) (*store, error) {
46 // mattn/go-sqlite3 reads pragma-style query parameters (_journal_mode,
47 // _synchronous, _foreign_keys) and applies them on each new connection.
48 // journal_mode=WAL persists in the database file but setting it here
49 // also covers the first-ever open.
50 dsn := fmt.Sprintf("file:%s?_journal_mode=WAL&_synchronous=NORMAL&_foreign_keys=on", path)
51 db, err := sql.Open("sqlite3", dsn)
52 if err != nil {
53 return nil, fmt.Errorf("open sqlite %q: %w", path, err)
54 }
55
56 // SQLite only supports one writer at a time. Capping max open conns
57 // at 1 keeps database/sql from spinning up extra connections that
58 // will only ever serialize behind that writer.
59 db.SetMaxOpenConns(1)
60
61 s := &store{db: db}
62 if err := s.migrate(context.Background()); err != nil {
63 _ = db.Close()
64 return nil, err
65 }
66 return s, nil
67}
68
// Close releases the underlying database handle. The returned error is
// whatever the handle's own Close reports.
func (s *store) Close() error {
	return s.db.Close()
}
73
// metaCursorKey is the meta-table key under which we persist the
// jetstream cursor. Pulled out as a constant so LoadCursor and
// SaveCursor cannot drift apart on the key they read and write.
const metaCursorKey = "jetstream_cursor"
78
79// LoadCursor returns the persisted jetstream cursor, or nil if none has
80// been saved yet (signaling "start from now"). The cursor is a unix
81// microsecond timestamp — see jetstream.Event.TimeUS.
82func (s *store) LoadCursor(ctx context.Context) (*int64, error) {
83 var raw string
84 err := s.db.QueryRowContext(ctx,
85 `SELECT value FROM meta WHERE key = ?`, metaCursorKey,
86 ).Scan(&raw)
87 if errors.Is(err, sql.ErrNoRows) {
88 return nil, nil
89 }
90 if err != nil {
91 return nil, fmt.Errorf("load cursor: %w", err)
92 }
93 v, err := strconv.ParseInt(raw, 10, 64)
94 if err != nil {
95 // A malformed cursor shouldn't wedge startup — log-and-ignore
96 // (return nil) so we just resume from "now". The caller has the
97 // logger; we surface the parse error so it can decide.
98 return nil, fmt.Errorf("parse cursor %q: %w", raw, err)
99 }
100 return &v, nil
101}
102
103// SaveCursor writes the jetstream cursor. It uses an UPSERT so the meta
104// row is created on first call and updated thereafter.
105func (s *store) SaveCursor(ctx context.Context, cursor int64) error {
106 _, err := s.db.ExecContext(ctx,
107 `INSERT INTO meta (key, value) VALUES (?, ?)
108 ON CONFLICT(key) DO UPDATE SET value = excluded.value`,
109 metaCursorKey, strconv.FormatInt(cursor, 10),
110 )
111 if err != nil {
112 return fmt.Errorf("save cursor: %w", err)
113 }
114 return nil
115}
116
117// UpsertSpindleMember records (or refreshes) a sh.tangled.spindle.member
118// observation.
119func (s *store) UpsertSpindleMember(ctx context.Context, did, rkey, instance, subject, createdAt string) error {
120 _, err := s.db.ExecContext(ctx,
121 `INSERT INTO spindle_members (did, rkey, instance, subject, created_at)
122 VALUES (?, ?, ?, ?, ?)
123 ON CONFLICT(did, rkey) DO UPDATE SET
124 instance = excluded.instance,
125 subject = excluded.subject,
126 created_at = excluded.created_at`,
127 did, rkey, instance, subject, createdAt,
128 )
129 if err != nil {
130 return fmt.Errorf("upsert spindle_member: %w", err)
131 }
132 return nil
133}
134
135// DeleteSpindleMember removes a member record by its ATProto identity.
136func (s *store) DeleteSpindleMember(ctx context.Context, did, rkey string) error {
137 _, err := s.db.ExecContext(ctx,
138 `DELETE FROM spindle_members WHERE did = ? AND rkey = ?`,
139 did, rkey,
140 )
141 if err != nil {
142 return fmt.Errorf("delete spindle_member: %w", err)
143 }
144 return nil
145}
146
147// UpsertRepo records (or refreshes) a sh.tangled.repo observation.
148// spindle and repoDid may be empty strings — we store them as such rather
149// than as SQL NULLs to keep the upsert path uniform.
150func (s *store) UpsertRepo(ctx context.Context, did, rkey, knot, name, spindle, repoDid, createdAt string) error {
151 _, err := s.db.ExecContext(ctx,
152 `INSERT INTO repos (did, rkey, knot, name, spindle, repo_did, created_at)
153 VALUES (?, ?, ?, ?, ?, ?, ?)
154 ON CONFLICT(did, rkey) DO UPDATE SET
155 knot = excluded.knot,
156 name = excluded.name,
157 spindle = excluded.spindle,
158 repo_did = excluded.repo_did,
159 created_at = excluded.created_at`,
160 did, rkey, knot, name, spindle, repoDid, createdAt,
161 )
162 if err != nil {
163 return fmt.Errorf("upsert repo: %w", err)
164 }
165 return nil
166}
167
168// DeleteRepo removes a repo record by its ATProto identity.
169func (s *store) DeleteRepo(ctx context.Context, did, rkey string) error {
170 _, err := s.db.ExecContext(ctx,
171 `DELETE FROM repos WHERE did = ? AND rkey = ?`,
172 did, rkey,
173 )
174 if err != nil {
175 return fmt.Errorf("delete repo: %w", err)
176 }
177 return nil
178}
179
180// UpsertRepoCollaborator records (or refreshes) a
181// sh.tangled.repo.collaborator observation.
182func (s *store) UpsertRepoCollaborator(ctx context.Context, did, rkey, repo, repoDid, subject, createdAt string) error {
183 _, err := s.db.ExecContext(ctx,
184 `INSERT INTO repo_collaborators (did, rkey, repo, repo_did, subject, created_at)
185 VALUES (?, ?, ?, ?, ?, ?)
186 ON CONFLICT(did, rkey) DO UPDATE SET
187 repo = excluded.repo,
188 repo_did = excluded.repo_did,
189 subject = excluded.subject,
190 created_at = excluded.created_at`,
191 did, rkey, repo, repoDid, subject, createdAt,
192 )
193 if err != nil {
194 return fmt.Errorf("upsert repo_collaborator: %w", err)
195 }
196 return nil
197}
198
199// GetRepo returns the (knot, spindle) currently stored for a (did, rkey)
200// pair. Both are returned as empty strings when no row exists; callers
201// that need to distinguish "absent" from "stored but empty" should
202// pre-check existence themselves.
203//
204// This exists so applyRepo can read the *previous* spindle/knot of a
205// record before applying a mutation, which is what makes it possible to
206// detect transitions like "this repo used to be ours, now it isn't" and
207// trigger a knot unsubscribe.
208func (s *store) GetRepo(ctx context.Context, did, rkey string) (knot, spindle string, err error) {
209 err = s.db.QueryRowContext(ctx,
210 `SELECT knot, spindle FROM repos WHERE did = ? AND rkey = ?`,
211 did, rkey,
212 ).Scan(&knot, &spindle)
213 if errors.Is(err, sql.ErrNoRows) {
214 return "", "", nil
215 }
216 if err != nil {
217 return "", "", fmt.Errorf("get repo: %w", err)
218 }
219 return knot, spindle, nil
220}
221
222// AuthorizePipelineActor reports whether a pipeline trigger that
223// arrived on knot for repo (repoOwnerDID, repoName) should be allowed
224// to spawn work on this spindle. Both halves of the answer are
225// derived from the persisted Tangled state in spindle_members and
226// repos: the network's own authorization records, mirrored here from
227// the firehose by jetstream.go.
228//
229// The check is two independent gates; both must hold:
230//
231// 1. **Repo claim**: a sh.tangled.repo record exists naming
232// hostname as its CI spindle AND knot as its host knot,
233// published by repoOwnerDID with the matching name. Without this
234// a knot we already happen to be subscribed to (because *some*
235// repo on it points at us) could otherwise smuggle in pipeline
236// triggers for unrelated repos that never opted into us.
237//
238// 2. **Actor membership**: repoOwnerDID is either the spindle owner
239// itself, or has been authorized via a sh.tangled.spindle.member
240// record published by the spindle owner. The publisher (`did`)
241// of that membership grant must equal ownerDID. Anyone can
242// publish a member record naming anyone, so trusting unsigned
243// grants would let any DID grant itself access. We do NOT match
244// against the record's `instance` column because the upstream
245// ecosystem stores it inconsistently (URL vs hostname), and a
246// tack instance only ever speaks for a single hostname anyway.
247//
248// reason is a short, log-friendly description of why authorization
249// failed when ok is false; it is empty when ok is true. Errors
250// reported here are SQL/IO failures, never policy denials; those
251// surface as ok=false with a populated reason.
252func (s *store) AuthorizePipelineActor(
253 ctx context.Context,
254 hostname, knot, ownerDID, repoOwnerDID, repoName string,
255) (ok bool, reason string, err error) {
256 // We can't authorize an unknown actor or an unidentified repo:
257 // every gate below joins on these. Bail before touching SQLite
258 // so a malformed trigger logs cleanly instead of triggering a
259 // "no rows" miss that could be confused with a real denial.
260 if repoOwnerDID == "" {
261 return false, "trigger has no repo did", nil
262 }
263 if repoName == "" {
264 return false, "trigger has no repo name", nil
265 }
266
267 // Gate 1: this specific repo opted into us on this specific knot.
268 var n int
269 if err := s.db.QueryRowContext(ctx,
270 `SELECT COUNT(*) FROM repos
271 WHERE did = ? AND name = ? AND knot = ? AND spindle = ?`,
272 repoOwnerDID, repoName, knot, hostname,
273 ).Scan(&n); err != nil {
274 return false, "", fmt.Errorf("count repo claim: %w", err)
275 }
276 if n == 0 {
277 return false, "no repo record claims this spindle on this knot", nil
278 }
279
280 // Gate 2: actor is the spindle owner, or vouched for by them.
281 if repoOwnerDID == ownerDID {
282 return true, "", nil
283 }
284 if err := s.db.QueryRowContext(ctx,
285 `SELECT COUNT(*) FROM spindle_members
286 WHERE did = ? AND subject = ?`,
287 ownerDID, repoOwnerDID,
288 ).Scan(&n); err != nil {
289 return false, "", fmt.Errorf("count membership: %w", err)
290 }
291 if n == 0 {
292 return false, "actor is not a spindle member", nil
293 }
294 return true, "", nil
295}
296
297// IsKnotWanted reports whether any repo currently stored still names the
298// given hostname as its spindle and the given knot as its host. After a
299// repo update or delete this is the question we ask to decide whether
300// to keep watching that knot or unsubscribe from it.
301func (s *store) IsKnotWanted(ctx context.Context, hostname, knot string) (bool, error) {
302 var n int
303 err := s.db.QueryRowContext(ctx,
304 `SELECT COUNT(*) FROM repos WHERE spindle = ? AND knot = ?`,
305 hostname, knot,
306 ).Scan(&n)
307 if err != nil {
308 return false, fmt.Errorf("count repos for knot: %w", err)
309 }
310 return n > 0, nil
311}
312
313// KnotsForSpindle returns the distinct knot hostnames of all repos that
314// have declared the given spindle hostname as their CI spindle. The knot
315// event-stream subscriber uses this to decide which knots to dial.
316//
317// Returns an empty slice (not nil) when nothing matches, so callers can
318// range over the result without a nil check.
319func (s *store) KnotsForSpindle(ctx context.Context, hostname string) ([]string, error) {
320 rows, err := s.db.QueryContext(ctx,
321 `SELECT DISTINCT knot FROM repos WHERE spindle = ? AND knot <> ''`,
322 hostname,
323 )
324 if err != nil {
325 return nil, fmt.Errorf("query knots: %w", err)
326 }
327 defer rows.Close()
328
329 out := []string{}
330 for rows.Next() {
331 var k string
332 if err := rows.Scan(&k); err != nil {
333 return nil, fmt.Errorf("scan knot: %w", err)
334 }
335 out = append(out, k)
336 }
337 if err := rows.Err(); err != nil {
338 return nil, fmt.Errorf("iterate knots: %w", err)
339 }
340 return out, nil
341}
342
343// DeleteRepoCollaborator removes a collaborator record by its ATProto
344// identity.
345func (s *store) DeleteRepoCollaborator(ctx context.Context, did, rkey string) error {
346 _, err := s.db.ExecContext(ctx,
347 `DELETE FROM repo_collaborators WHERE did = ? AND rkey = ?`,
348 did, rkey,
349 )
350 if err != nil {
351 return fmt.Errorf("delete repo_collaborator: %w", err)
352 }
353 return nil
354}
355
// EventRow is one row of the events table. It represents an outbound
// record we want to deliver to /events websocket subscribers, in the
// shape callers actually need (raw record JSON, not stringly-typed).
// EventsAfter is the method that produces these values.
type EventRow struct {
	// Created is the assigned monotonic rowid; doubles as the cursor
	// value subscribers use to resume (it is what EventsAfter compares
	// against and orders by).
	Created int64
	// Rkey is the ATProto record key. For sh.tangled.pipeline.status
	// records this is the rkey we mint when publishing.
	Rkey string
	// Nsid is the lexicon collection (e.g. sh.tangled.pipeline.status).
	Nsid string
	// EventJSON is the record body verbatim — held as RawMessage so
	// the /events handler can splice it into the wire envelope without
	// an unmarshal/remarshal round-trip.
	EventJSON json.RawMessage
}
373
374// InsertEvent appends an event row and returns its assigned `created`
375// (rowid) cursor. Storage is the source of truth for fan-out, so we
376// write here even if zero subscribers are connected — a subscriber that
377// connects later (with an old cursor) will pick the row up via
378// EventsAfter.
379//
380// eventJSON must be a valid JSON object; we store it verbatim. Length
381// validation is intentionally absent — the schema accepts arbitrary
382// TEXT and SQLite handles huge blobs fine for our scale.
383func (s *store) InsertEvent(ctx context.Context, rkey, nsid string, eventJSON []byte) (int64, error) {
384 res, err := s.db.ExecContext(ctx,
385 `INSERT INTO events (rkey, nsid, event_json, inserted_at)
386 VALUES (?, ?, ?, ?)`,
387 rkey, nsid, string(eventJSON),
388 time.Now().UTC().Format(time.RFC3339Nano),
389 )
390 if err != nil {
391 return 0, fmt.Errorf("insert event: %w", err)
392 }
393 id, err := res.LastInsertId()
394 if err != nil {
395 return 0, fmt.Errorf("event last insert id: %w", err)
396 }
397 return id, nil
398}
399
// BuildkiteBuildRef is the persisted mapping from one Buildkite build
// to the Tangled pipeline tuple that spawned it. It's the row written
// by the Buildkite provider at Spawn time and read back from two
// places: the webhook handler (by build UUID) when an event arrives,
// and the /logs handler (by knot+rkey+workflow) when an appview
// client asks for output.
type BuildkiteBuildRef struct {
	// BuildUUID is the Buildkite build's UUID. It is the conflict key
	// InsertBuildkiteBuild upserts on and the lookup key for
	// LookupBuildkiteBuildByUUID.
	BuildUUID string
	// BuildNumber is stored in the build_number column; the by-tuple
	// lookup uses it as its final ordering tiebreaker.
	BuildNumber int64
	// PipelineSlug is stored in the pipeline_slug column.
	PipelineSlug string
	// Org is the Buildkite organisation slug the build was created
	// against. Persisted at Spawn time so /logs and any other
	// post-creation API call can target the same org the workflow
	// originally chose — see the workflow YAML `tack.buildkite.org`
	// override. Empty means "use the provider's default org", which
	// is what every row written before the org column existed will
	// scan as.
	Org string
	// Knot, PipelineRkey and Workflow together form the tuple
	// LookupBuildkiteBuildByTuple filters on.
	Knot         string
	PipelineRkey string
	Workflow     string
	// PipelineURI is stored in the pipeline_uri column.
	PipelineURI string
}
423
424// InsertBuildkiteBuild records that a Buildkite build was created on
425// behalf of the given (knot, pipelineRkey, workflow) tuple. Uses
426// INSERT OR REPLACE so that an unlikely build-uuid collision (or a
427// Buildkite-side rebuild that re-fires us) just refreshes the row
428// instead of failing.
429func (s *store) InsertBuildkiteBuild(ctx context.Context, ref BuildkiteBuildRef) error {
430 // Capture wall-clock and monotonic-friendly forms once so the two
431 // columns agree on the same instant. created_at is the
432 // human-readable RFC3339Nano string; created_unix_ns is the
433 // integer the lookup orders on (text comparison of nanosecond
434 // timestamps isn't reliable, so we sort on the int instead).
435 now := time.Now().UTC()
436 _, err := s.db.ExecContext(ctx,
437 `INSERT INTO buildkite_builds (
438 build_uuid, build_number, pipeline_slug, org,
439 knot, pipeline_rkey, workflow,
440 pipeline_uri, created_at, created_unix_ns
441 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
442 ON CONFLICT(build_uuid) DO UPDATE SET
443 build_number = excluded.build_number,
444 pipeline_slug = excluded.pipeline_slug,
445 org = excluded.org,
446 knot = excluded.knot,
447 pipeline_rkey = excluded.pipeline_rkey,
448 workflow = excluded.workflow,
449 pipeline_uri = excluded.pipeline_uri,
450 created_at = excluded.created_at,
451 created_unix_ns = excluded.created_unix_ns`,
452 ref.BuildUUID, ref.BuildNumber, ref.PipelineSlug, ref.Org,
453 ref.Knot, ref.PipelineRkey, ref.Workflow,
454 ref.PipelineURI, now.Format(time.RFC3339Nano), now.UnixNano(),
455 )
456 if err != nil {
457 return fmt.Errorf("insert buildkite_build: %w", err)
458 }
459 return nil
460}
461
462// LookupBuildkiteBuildByUUID returns the saved mapping for the given
463// Buildkite build UUID, or nil when no such build is recorded.
464// Returning a nil pointer rather than a sentinel error keeps the
465// webhook handler's "we don't know about this build" branch a simple
466// nil check.
467func (s *store) LookupBuildkiteBuildByUUID(ctx context.Context, buildUUID string) (*BuildkiteBuildRef, error) {
468 var ref BuildkiteBuildRef
469 err := s.db.QueryRowContext(ctx,
470 `SELECT build_uuid, build_number, pipeline_slug, org,
471 knot, pipeline_rkey, workflow, pipeline_uri
472 FROM buildkite_builds WHERE build_uuid = ?`,
473 buildUUID,
474 ).Scan(
475 &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug, &ref.Org,
476 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI,
477 )
478 if errors.Is(err, sql.ErrNoRows) {
479 return nil, nil
480 }
481 if err != nil {
482 return nil, fmt.Errorf("lookup buildkite_build by uuid: %w", err)
483 }
484 return &ref, nil
485}
486
487// LookupBuildkiteBuildByTuple finds the most recently created build
488// for (knot, pipelineRkey, workflow). Returns nil when no build has
489// been recorded for that tuple — used by /logs to translate the
490// appview's path-based identity back into something Buildkite knows.
491//
492// "Most recent" matters because a workflow may have multiple builds
493// over time (rebuilds, re-triggers). We always serve logs for the
494// latest run; older runs are still queryable by build UUID directly
495// if anyone ever wants that.
496//
497// Ordering is on created_unix_ns (a monotonic int) rather than
498// created_at. Text comparison of RFC3339Nano timestamps is not
499// reliable across nanosecond precision, which used to make this
500// query occasionally pick the wrong run. created_at and build_number
501// are kept as deterministic tiebreakers for legacy rows that pre-date
502// the new column and still scan as 0.
503func (s *store) LookupBuildkiteBuildByTuple(ctx context.Context, knot, pipelineRkey, workflow string) (*BuildkiteBuildRef, error) {
504 var ref BuildkiteBuildRef
505 err := s.db.QueryRowContext(ctx,
506 `SELECT build_uuid, build_number, pipeline_slug, org,
507 knot, pipeline_rkey, workflow, pipeline_uri
508 FROM buildkite_builds
509 WHERE knot = ? AND pipeline_rkey = ? AND workflow = ?
510 ORDER BY created_unix_ns DESC, created_at DESC, build_number DESC
511 LIMIT 1`,
512 knot, pipelineRkey, workflow,
513 ).Scan(
514 &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug, &ref.Org,
515 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI,
516 )
517 if errors.Is(err, sql.ErrNoRows) {
518 return nil, nil
519 }
520 if err != nil {
521 return nil, fmt.Errorf("lookup buildkite_build by tuple: %w", err)
522 }
523 return &ref, nil
524}
525
526// EventsAfter returns every event row with `created` strictly greater
527// than cursor, in cursor order. Used by /events to backfill a
528// reconnecting subscriber and to drain newly-published rows on each
529// broker notification.
530//
531// Pass cursor=0 to get the full log from the beginning, which is what
532// happens when a subscriber connects without a ?cursor= query param.
533func (s *store) EventsAfter(ctx context.Context, cursor int64) ([]EventRow, error) {
534 rows, err := s.db.QueryContext(ctx,
535 `SELECT created, rkey, nsid, event_json
536 FROM events
537 WHERE created > ?
538 ORDER BY created ASC`,
539 cursor,
540 )
541 if err != nil {
542 return nil, fmt.Errorf("query events: %w", err)
543 }
544 defer rows.Close()
545
546 out := []EventRow{}
547 for rows.Next() {
548 var (
549 ev EventRow
550 raw string
551 )
552 if err := rows.Scan(&ev.Created, &ev.Rkey, &ev.Nsid, &raw); err != nil {
553 return nil, fmt.Errorf("scan event: %w", err)
554 }
555 ev.EventJSON = json.RawMessage(raw)
556 out = append(out, ev)
557 }
558 if err := rows.Err(); err != nil {
559 return nil, fmt.Errorf("iterate events: %w", err)
560 }
561 return out, nil
562}