Stitch any CI into Tangled
1package main
2
3// Schema definition and migration logic for the SQLite store. Pulled out
4// of store.go so the big SQL block doesn't sit in the middle of the
5// runtime API surface.
6
7import (
8 "context"
9 "fmt"
10 "strings"
11)
12
13// schema is the full set of CREATE statements applied at startup. It is
14// idempotent and additive only — no `DROP`s — so future changes can be
15// layered on as additional statements without needing a separate
16// migration tool until the project actually outgrows that.
17const schema = `
18CREATE TABLE IF NOT EXISTS meta (
19 key TEXT PRIMARY KEY,
20 value TEXT NOT NULL
21);
22
23-- Records of sh.tangled.spindle.member. The owner of a spindle publishes
24-- one of these per authorized member. (did, rkey) is the natural ATProto
25-- key — did identifies the publisher's PDS, rkey identifies the record
26-- within that PDS's collection.
27CREATE TABLE IF NOT EXISTS spindle_members (
28 did TEXT NOT NULL,
29 rkey TEXT NOT NULL,
30 instance TEXT NOT NULL,
31 subject TEXT NOT NULL,
32 created_at TEXT NOT NULL,
33 PRIMARY KEY (did, rkey)
34);
35
36-- Records of sh.tangled.repo. We keep the full set so that when a
37-- pipeline trigger arrives we can look up which knot/spindle/repo_did
38-- it corresponds to without another round-trip.
39CREATE TABLE IF NOT EXISTS repos (
40 did TEXT NOT NULL,
41 rkey TEXT NOT NULL,
42 knot TEXT NOT NULL,
43 name TEXT NOT NULL,
44 spindle TEXT,
45 repo_did TEXT,
46 created_at TEXT NOT NULL,
47 PRIMARY KEY (did, rkey)
48);
49
50-- Records of sh.tangled.repo.collaborator. Used together with repos to
51-- decide whether a triggering DID is allowed to push builds to us.
52CREATE TABLE IF NOT EXISTS repo_collaborators (
53 did TEXT NOT NULL,
54 rkey TEXT NOT NULL,
55 repo TEXT,
56 repo_did TEXT,
57 subject TEXT NOT NULL,
58 created_at TEXT NOT NULL,
59 PRIMARY KEY (did, rkey)
60);
61
62-- Outbound event log. Each row is one record we want to fan out to
63-- connected /events websocket subscribers (typically the Tangled
64-- appview) — today only sh.tangled.pipeline.status.
65--
66-- We persist instead of pushing through an in-memory channel so that
67-- (a) a reconnecting subscriber can resume from a cursor without
68-- missing events that happened during the gap, and
69-- (b) slow subscribers can't make us drop events for fast ones — they
70-- simply lag behind in the rowid space.
71--
72-- AUTOINCREMENT (vs plain INTEGER PRIMARY KEY) guarantees rowids
73-- strictly increase and never get reused if a row is ever deleted, so
74-- treating the created column as a monotonic cursor is safe forever.
75CREATE TABLE IF NOT EXISTS events (
76 created INTEGER PRIMARY KEY AUTOINCREMENT,
77 rkey TEXT NOT NULL,
78 nsid TEXT NOT NULL,
79 event_json TEXT NOT NULL,
80 inserted_at TEXT NOT NULL
81);
82
83-- Mapping from a Buildkite build back to the Tangled pipeline that
84-- spawned it. The Buildkite webhook receiver only knows the build
85-- UUID; everything we need to publish a pipeline.status record
86-- (knot, pipeline rkey, workflow name, full pipeline ATURI) lives
87-- on this row.
88--
89-- pipeline_uri is denormalized off (knot, pipeline_rkey) so the
90-- webhook handler doesn't have to recompute the at:// string on
91-- every event — it's a constant for the lifetime of the build and
92-- the webhook is the hot path for status fan-out.
93--
94-- The (knot, pipeline_rkey, workflow) index supports the /logs
95-- handler, which only knows that tuple at request time.
96-- org is the Buildkite organisation slug the build was created
97-- against. A workflow's YAML can override the spindle's default
98-- org via tack.buildkite.org, so we persist whatever was used at
99-- Spawn time and read it back when fetching jobs/logs. The empty
100-- string means "use the provider's defaultOrg" — that's both the
101-- usual single-org case and what every row written before this
102-- column existed will scan as.
103CREATE TABLE IF NOT EXISTS buildkite_builds (
104 build_uuid TEXT PRIMARY KEY,
105 build_number INTEGER NOT NULL,
106 pipeline_slug TEXT NOT NULL,
107 org TEXT NOT NULL DEFAULT '',
108 knot TEXT NOT NULL,
109 pipeline_rkey TEXT NOT NULL,
110 workflow TEXT NOT NULL,
111 pipeline_uri TEXT NOT NULL,
112 created_at TEXT NOT NULL
113);
114CREATE INDEX IF NOT EXISTS buildkite_builds_lookup
115 ON buildkite_builds (knot, pipeline_rkey, workflow);
116`
117
118// migrate applies the schema. Safe to call repeatedly.
119//
120// CREATE TABLE IF NOT EXISTS is enough for fresh databases, but it
121// won't widen an already-existing table. Columns added after the
122// initial release therefore need a parallel ALTER TABLE step here;
123// SQLite has no `ADD COLUMN IF NOT EXISTS`, so we run the ALTER and
124// swallow the "duplicate column" error that fires on subsequent
125// startups. Anything else is fatal.
126func (s *store) migrate(ctx context.Context) error {
127 if _, err := s.db.ExecContext(ctx, schema); err != nil {
128 return fmt.Errorf("apply schema: %w", err)
129 }
130 for _, alter := range []string{
131 // Persist the Buildkite org each build was created
132 // against so /logs can target the same org the workflow
133 // chose. Pre-existing rows scan as empty string, which
134 // the provider treats as "use defaultOrg".
135 `ALTER TABLE buildkite_builds ADD COLUMN org TEXT NOT NULL DEFAULT ''`,
136 } {
137 if _, err := s.db.ExecContext(ctx, alter); err != nil {
138 if strings.Contains(err.Error(), "duplicate column name") {
139 continue
140 }
141 return fmt.Errorf("apply alter %q: %w", alter, err)
142 }
143 }
144 return nil
145}