Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// Schema definition and migration logic for the SQLite store. Pulled out 4// of store.go so the big SQL block doesn't sit in the middle of the 5// runtime API surface. 6 7import ( 8 "context" 9 "fmt" 10 "strings" 11 "time" 12) 13 14// schema is the full set of CREATE statements applied at startup. It is 15// idempotent and additive only — no `DROP`s — so future changes can be 16// layered on as additional statements without needing a separate 17// migration tool until the project actually outgrows that. 18const schema = ` 19CREATE TABLE IF NOT EXISTS meta ( 20 key TEXT PRIMARY KEY, 21 value TEXT NOT NULL 22); 23 24-- Records of sh.tangled.spindle.member. The owner of a spindle publishes 25-- one of these per authorized member. (did, rkey) is the natural ATProto 26-- key — did identifies the publisher's PDS, rkey identifies the record 27-- within that PDS's collection. 28CREATE TABLE IF NOT EXISTS spindle_members ( 29 did TEXT NOT NULL, 30 rkey TEXT NOT NULL, 31 instance TEXT NOT NULL, 32 subject TEXT NOT NULL, 33 created_at TEXT NOT NULL, 34 PRIMARY KEY (did, rkey) 35); 36 37-- Records of sh.tangled.repo. We keep the full set so that when a 38-- pipeline trigger arrives we can look up which knot/spindle/repo_did 39-- it corresponds to without another round-trip. 40CREATE TABLE IF NOT EXISTS repos ( 41 did TEXT NOT NULL, 42 rkey TEXT NOT NULL, 43 knot TEXT NOT NULL, 44 name TEXT NOT NULL, 45 spindle TEXT, 46 repo_did TEXT, 47 created_at TEXT NOT NULL, 48 PRIMARY KEY (did, rkey) 49); 50 51-- Records of sh.tangled.repo.collaborator. Used together with repos to 52-- decide whether a triggering DID is allowed to push builds to us. 53CREATE TABLE IF NOT EXISTS repo_collaborators ( 54 did TEXT NOT NULL, 55 rkey TEXT NOT NULL, 56 repo TEXT, 57 repo_did TEXT, 58 subject TEXT NOT NULL, 59 created_at TEXT NOT NULL, 60 PRIMARY KEY (did, rkey) 61); 62 63-- Outbound event log. Each row is one record we want to fan out to 64-- connected /events websocket subscribers (typically the Tangled 65-- appview) — today only sh.tangled.pipeline.status. 66-- 67-- We persist instead of pushing through an in-memory channel so that 68-- (a) a reconnecting subscriber can resume from a cursor without 69-- missing events that happened during the gap, and 70-- (b) slow subscribers can't make us drop events for fast ones — they 71-- simply lag behind in the rowid space. 72-- 73-- AUTOINCREMENT (vs plain INTEGER PRIMARY KEY) guarantees rowids 74-- strictly increase and never get reused if a row is ever deleted, so 75-- treating the created column as a monotonic cursor is safe forever. 76CREATE TABLE IF NOT EXISTS events ( 77 created INTEGER PRIMARY KEY AUTOINCREMENT, 78 rkey TEXT NOT NULL, 79 nsid TEXT NOT NULL, 80 event_json TEXT NOT NULL, 81 inserted_at TEXT NOT NULL 82); 83 84-- Mapping from a Buildkite build back to the Tangled pipeline that 85-- spawned it. The Buildkite webhook receiver only knows the build 86-- UUID; everything we need to publish a pipeline.status record 87-- (knot, pipeline rkey, workflow name, full pipeline ATURI) lives 88-- on this row. 89-- 90-- pipeline_uri is denormalized off (knot, pipeline_rkey) so the 91-- webhook handler doesn't have to recompute the at:// string on 92-- every event — it's a constant for the lifetime of the build and 93-- the webhook is the hot path for status fan-out. 94-- 95-- The (knot, pipeline_rkey, workflow) index supports the /logs 96-- handler, which only knows that tuple at request time. 97-- org is the Buildkite organisation slug the build was created 98-- against. A workflow's YAML can override the spindle's default 99-- org via tack.buildkite.org, so we persist whatever was used at 100-- Spawn time and read it back when fetching jobs/logs. The empty 101-- string means "use the provider's defaultOrg" — that's both the 102-- usual single-org case and what every row written before this 103-- column existed will scan as. 104-- created_unix_ns is the monotonic recency key. created_at is kept 105-- (RFC3339Nano text) for human-readable inspection, but it must NOT 106-- be used for ordering: text comparison of nanosecond timestamps is 107-- not reliable, which used to make /logs occasionally resolve the 108-- wrong run. created_unix_ns is the integer the latest-build lookup 109-- sorts on instead. 110CREATE TABLE IF NOT EXISTS buildkite_builds ( 111 build_uuid TEXT PRIMARY KEY, 112 build_number INTEGER NOT NULL, 113 pipeline_slug TEXT NOT NULL, 114 org TEXT NOT NULL DEFAULT '', 115 knot TEXT NOT NULL, 116 pipeline_rkey TEXT NOT NULL, 117 workflow TEXT NOT NULL, 118 pipeline_uri TEXT NOT NULL, 119 created_at TEXT NOT NULL, 120 created_unix_ns INTEGER NOT NULL DEFAULT 0 121); 122CREATE INDEX IF NOT EXISTS buildkite_builds_lookup 123 ON buildkite_builds (knot, pipeline_rkey, workflow); 124 125-- Mapping from a Tangled workflow tuple to the latest Tekton 126-- PipelineRun tack created for it. Unlike Buildkite webhooks, Tekton 127-- status observation happens in-process, so the primary read path is 128-- /logs resolving (knot, pipeline_rkey, workflow) back to the concrete 129-- PipelineRun whose TaskRuns and pods hold output. 130CREATE TABLE IF NOT EXISTS tekton_runs ( 131 knot TEXT NOT NULL, 132 pipeline_rkey TEXT NOT NULL, 133 workflow TEXT NOT NULL, 134 namespace TEXT NOT NULL, 135 pipeline_run_name TEXT NOT NULL, 136 pipeline_run_uid TEXT NOT NULL, 137 pipeline_name TEXT NOT NULL, 138 pipeline_uri TEXT NOT NULL, 139 created_at TEXT NOT NULL, 140 created_unix_ns INTEGER NOT NULL, 141 PRIMARY KEY (knot, pipeline_rkey, workflow) 142); 143CREATE INDEX IF NOT EXISTS tekton_runs_uid 144 ON tekton_runs (pipeline_run_uid); 145` 146 147// migrate applies the schema. Safe to call repeatedly. 148// 149// CREATE TABLE IF NOT EXISTS is enough for fresh databases, but it 150// won't widen an already-existing table. Columns added after the 151// initial release therefore need a parallel ALTER TABLE step here; 152// SQLite has no `ADD COLUMN IF NOT EXISTS`, so we run the ALTER and 153// swallow the "duplicate column" error that fires on subsequent 154// startups. Anything else is fatal. 155func (s *store) migrate(ctx context.Context) error { 156 if _, err := s.db.ExecContext(ctx, schema); err != nil { 157 return fmt.Errorf("apply schema: %w", err) 158 } 159 for _, alter := range []string{ 160 // Persist the Buildkite org each build was created 161 // against so /logs can target the same org the workflow 162 // chose. Pre-existing rows scan as empty string, which 163 // the provider treats as "use defaultOrg". 164 `ALTER TABLE buildkite_builds ADD COLUMN org TEXT NOT NULL DEFAULT ''`, 165 166 // Monotonic integer ordering key for buildkite_builds. 167 // Replaces ORDER BY created_at (RFC3339Nano text), whose 168 // lexical order isn't reliable across nanosecond precision 169 // and could make /logs resolve the wrong run. Default 0 170 // covers pre-existing rows; the backfill below promotes 171 // them to their parsed timestamp so ordering stays stable 172 // across the upgrade. 173 `ALTER TABLE buildkite_builds ADD COLUMN created_unix_ns INTEGER NOT NULL DEFAULT 0`, 174 } { 175 if _, err := s.db.ExecContext(ctx, alter); err != nil { 176 if strings.Contains(err.Error(), "duplicate column name") { 177 continue 178 } 179 return fmt.Errorf("apply alter %q: %w", alter, err) 180 } 181 } 182 183 if err := s.backfillBuildkiteCreatedUnixNS(ctx); err != nil { 184 return fmt.Errorf("backfill buildkite created_unix_ns: %w", err) 185 } 186 return nil 187} 188 189// backfillBuildkiteCreatedUnixNS walks every buildkite_builds row whose 190// created_unix_ns is still the post-ALTER default (0) and sets it from 191// the RFC3339Nano text in created_at. SQLite has no native nanosecond 192// parser, so the conversion has to happen in Go. 193// 194// Rows whose created_at can't be parsed are left at 0; that keeps a 195// single corrupt row from blocking startup, and ordering between two 196// 0-keyed rows still falls through to created_at as a deterministic 197// tiebreaker in the lookup query. 198func (s *store) backfillBuildkiteCreatedUnixNS(ctx context.Context) error { 199 rows, err := s.db.QueryContext(ctx, 200 `SELECT build_uuid, created_at FROM buildkite_builds 201 WHERE created_unix_ns = 0`, 202 ) 203 if err != nil { 204 return fmt.Errorf("query rows to backfill: %w", err) 205 } 206 type pending struct { 207 uuid string 208 ns int64 209 } 210 var todo []pending 211 for rows.Next() { 212 var uuid, createdAt string 213 if err := rows.Scan(&uuid, &createdAt); err != nil { 214 rows.Close() 215 return fmt.Errorf("scan row: %w", err) 216 } 217 t, perr := time.Parse(time.RFC3339Nano, createdAt) 218 if perr != nil { 219 // Skip unparseable rows rather than failing the whole 220 // migration. The lookup query still has a fallback 221 // ordering for rows that share the default 0 key. 222 continue 223 } 224 todo = append(todo, pending{uuid: uuid, ns: t.UnixNano()}) 225 } 226 if err := rows.Err(); err != nil { 227 rows.Close() 228 return fmt.Errorf("iterate rows: %w", err) 229 } 230 rows.Close() 231 232 for _, p := range todo { 233 if _, err := s.db.ExecContext(ctx, 234 `UPDATE buildkite_builds SET created_unix_ns = ? 235 WHERE build_uuid = ?`, 236 p.ns, p.uuid, 237 ); err != nil { 238 return fmt.Errorf("update row %q: %w", p.uuid, err) 239 } 240 } 241 return nil 242}