Stitch any CI into Tangled
1package main
2
3// Schema definition and migration logic for the SQLite store. Pulled out
4// of store.go so the big SQL block doesn't sit in the middle of the
5// runtime API surface.
6
7import (
8 "context"
9 "fmt"
10 "strings"
11 "time"
12)
13
// schema is the full set of CREATE statements applied at startup by
// migrate. Every statement uses IF NOT EXISTS, so re-running it against
// an existing database is a no-op, and it is additive only — no `DROP`s
// — so future changes can be layered on as additional statements
// without needing a separate migration tool until the project actually
// outgrows that.
//
// CREATE TABLE IF NOT EXISTS never alters a table that already exists:
// a column added to one of the tables below also needs a matching
// ALTER TABLE entry in migrate so that older databases pick it up
// (buildkite_builds.org and buildkite_builds.created_unix_ns are
// handled that way).
const schema = `
CREATE TABLE IF NOT EXISTS meta (
 key TEXT PRIMARY KEY,
 value TEXT NOT NULL
);

-- Records of sh.tangled.spindle.member. The owner of a spindle publishes
-- one of these per authorized member. (did, rkey) is the natural ATProto
-- key — did identifies the publisher's PDS, rkey identifies the record
-- within that PDS's collection.
CREATE TABLE IF NOT EXISTS spindle_members (
 did TEXT NOT NULL,
 rkey TEXT NOT NULL,
 instance TEXT NOT NULL,
 subject TEXT NOT NULL,
 created_at TEXT NOT NULL,
 PRIMARY KEY (did, rkey)
);

-- Records of sh.tangled.repo. We keep the full set so that when a
-- pipeline trigger arrives we can look up which knot/spindle/repo_did
-- it corresponds to without another round-trip.
CREATE TABLE IF NOT EXISTS repos (
 did TEXT NOT NULL,
 rkey TEXT NOT NULL,
 knot TEXT NOT NULL,
 name TEXT NOT NULL,
 spindle TEXT,
 repo_did TEXT,
 created_at TEXT NOT NULL,
 PRIMARY KEY (did, rkey)
);

-- Records of sh.tangled.repo.collaborator. Used together with repos to
-- decide whether a triggering DID is allowed to push builds to us.
CREATE TABLE IF NOT EXISTS repo_collaborators (
 did TEXT NOT NULL,
 rkey TEXT NOT NULL,
 repo TEXT,
 repo_did TEXT,
 subject TEXT NOT NULL,
 created_at TEXT NOT NULL,
 PRIMARY KEY (did, rkey)
);

-- Outbound event log. Each row is one record we want to fan out to
-- connected /events websocket subscribers (typically the Tangled
-- appview) — today only sh.tangled.pipeline.status.
--
-- We persist instead of pushing through an in-memory channel so that
-- (a) a reconnecting subscriber can resume from a cursor without
-- missing events that happened during the gap, and
-- (b) slow subscribers can't make us drop events for fast ones — they
-- simply lag behind in the rowid space.
--
-- AUTOINCREMENT (vs plain INTEGER PRIMARY KEY) guarantees rowids
-- strictly increase and never get reused if a row is ever deleted, so
-- treating the created column as a monotonic cursor is safe forever.
CREATE TABLE IF NOT EXISTS events (
 created INTEGER PRIMARY KEY AUTOINCREMENT,
 rkey TEXT NOT NULL,
 nsid TEXT NOT NULL,
 event_json TEXT NOT NULL,
 inserted_at TEXT NOT NULL
);

-- Mapping from a Buildkite build back to the Tangled pipeline that
-- spawned it. The Buildkite webhook receiver only knows the build
-- UUID; everything we need to publish a pipeline.status record
-- (knot, pipeline rkey, workflow name, full pipeline ATURI) lives
-- on this row.
--
-- pipeline_uri is denormalized off (knot, pipeline_rkey) so the
-- webhook handler doesn't have to recompute the at:// string on
-- every event — it's a constant for the lifetime of the build and
-- the webhook is the hot path for status fan-out.
--
-- The (knot, pipeline_rkey, workflow) index supports the /logs
-- handler, which only knows that tuple at request time.
-- org is the Buildkite organisation slug the build was created
-- against. A workflow's YAML can override the spindle's default
-- org via tack.buildkite.org, so we persist whatever was used at
-- Spawn time and read it back when fetching jobs/logs. The empty
-- string means "use the provider's defaultOrg" — that's both the
-- usual single-org case and what every row written before this
-- column existed will scan as.
-- created_unix_ns is the monotonic recency key. created_at is kept
-- (RFC3339Nano text) for human-readable inspection, but it must NOT
-- be used for ordering: text comparison of nanosecond timestamps is
-- not reliable, which used to make /logs occasionally resolve the
-- wrong run. created_unix_ns is the integer the latest-build lookup
-- sorts on instead.
CREATE TABLE IF NOT EXISTS buildkite_builds (
 build_uuid TEXT PRIMARY KEY,
 build_number INTEGER NOT NULL,
 pipeline_slug TEXT NOT NULL,
 org TEXT NOT NULL DEFAULT '',
 knot TEXT NOT NULL,
 pipeline_rkey TEXT NOT NULL,
 workflow TEXT NOT NULL,
 pipeline_uri TEXT NOT NULL,
 created_at TEXT NOT NULL,
 created_unix_ns INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS buildkite_builds_lookup
 ON buildkite_builds (knot, pipeline_rkey, workflow);
`
125
126// migrate applies the schema. Safe to call repeatedly.
127//
128// CREATE TABLE IF NOT EXISTS is enough for fresh databases, but it
129// won't widen an already-existing table. Columns added after the
130// initial release therefore need a parallel ALTER TABLE step here;
131// SQLite has no `ADD COLUMN IF NOT EXISTS`, so we run the ALTER and
132// swallow the "duplicate column" error that fires on subsequent
133// startups. Anything else is fatal.
134func (s *store) migrate(ctx context.Context) error {
135 if _, err := s.db.ExecContext(ctx, schema); err != nil {
136 return fmt.Errorf("apply schema: %w", err)
137 }
138 for _, alter := range []string{
139 // Persist the Buildkite org each build was created
140 // against so /logs can target the same org the workflow
141 // chose. Pre-existing rows scan as empty string, which
142 // the provider treats as "use defaultOrg".
143 `ALTER TABLE buildkite_builds ADD COLUMN org TEXT NOT NULL DEFAULT ''`,
144
145 // Monotonic integer ordering key for buildkite_builds.
146 // Replaces ORDER BY created_at (RFC3339Nano text), whose
147 // lexical order isn't reliable across nanosecond precision
148 // and could make /logs resolve the wrong run. Default 0
149 // covers pre-existing rows; the backfill below promotes
150 // them to their parsed timestamp so ordering stays stable
151 // across the upgrade.
152 `ALTER TABLE buildkite_builds ADD COLUMN created_unix_ns INTEGER NOT NULL DEFAULT 0`,
153 } {
154 if _, err := s.db.ExecContext(ctx, alter); err != nil {
155 if strings.Contains(err.Error(), "duplicate column name") {
156 continue
157 }
158 return fmt.Errorf("apply alter %q: %w", alter, err)
159 }
160 }
161
162 if err := s.backfillBuildkiteCreatedUnixNS(ctx); err != nil {
163 return fmt.Errorf("backfill buildkite created_unix_ns: %w", err)
164 }
165 return nil
166}
167
168// backfillBuildkiteCreatedUnixNS walks every buildkite_builds row whose
169// created_unix_ns is still the post-ALTER default (0) and sets it from
170// the RFC3339Nano text in created_at. SQLite has no native nanosecond
171// parser, so the conversion has to happen in Go.
172//
173// Rows whose created_at can't be parsed are left at 0; that keeps a
174// single corrupt row from blocking startup, and ordering between two
175// 0-keyed rows still falls through to created_at as a deterministic
176// tiebreaker in the lookup query.
177func (s *store) backfillBuildkiteCreatedUnixNS(ctx context.Context) error {
178 rows, err := s.db.QueryContext(ctx,
179 `SELECT build_uuid, created_at FROM buildkite_builds
180 WHERE created_unix_ns = 0`,
181 )
182 if err != nil {
183 return fmt.Errorf("query rows to backfill: %w", err)
184 }
185 type pending struct {
186 uuid string
187 ns int64
188 }
189 var todo []pending
190 for rows.Next() {
191 var uuid, createdAt string
192 if err := rows.Scan(&uuid, &createdAt); err != nil {
193 rows.Close()
194 return fmt.Errorf("scan row: %w", err)
195 }
196 t, perr := time.Parse(time.RFC3339Nano, createdAt)
197 if perr != nil {
198 // Skip unparseable rows rather than failing the whole
199 // migration. The lookup query still has a fallback
200 // ordering for rows that share the default 0 key.
201 continue
202 }
203 todo = append(todo, pending{uuid: uuid, ns: t.UnixNano()})
204 }
205 if err := rows.Err(); err != nil {
206 rows.Close()
207 return fmt.Errorf("iterate rows: %w", err)
208 }
209 rows.Close()
210
211 for _, p := range todo {
212 if _, err := s.db.ExecContext(ctx,
213 `UPDATE buildkite_builds SET created_unix_ns = ?
214 WHERE build_uuid = ?`,
215 p.ns, p.uuid,
216 ); err != nil {
217 return fmt.Errorf("update row %q: %w", p.uuid, err)
218 }
219 }
220 return nil
221}