Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// Schema definition and migration logic for the SQLite store. Pulled out 4// of store.go so the big SQL block doesn't sit in the middle of the 5// runtime API surface. 6 7import ( 8 "context" 9 "fmt" 10 "strings" 11 "time" 12) 13 14// schema is the full set of CREATE statements applied at startup. It is 15// idempotent and additive only — no `DROP`s — so future changes can be 16// layered on as additional statements without needing a separate 17// migration tool until the project actually outgrows that. 18const schema = ` 19CREATE TABLE IF NOT EXISTS meta ( 20 key TEXT PRIMARY KEY, 21 value TEXT NOT NULL 22); 23 24-- Records of sh.tangled.spindle.member. The owner of a spindle publishes 25-- one of these per authorized member. (did, rkey) is the natural ATProto 26-- key — did identifies the publisher's PDS, rkey identifies the record 27-- within that PDS's collection. 28CREATE TABLE IF NOT EXISTS spindle_members ( 29 did TEXT NOT NULL, 30 rkey TEXT NOT NULL, 31 instance TEXT NOT NULL, 32 subject TEXT NOT NULL, 33 created_at TEXT NOT NULL, 34 PRIMARY KEY (did, rkey) 35); 36 37-- Records of sh.tangled.repo. We keep the full set so that when a 38-- pipeline trigger arrives we can look up which knot/spindle/repo_did 39-- it corresponds to without another round-trip. 40CREATE TABLE IF NOT EXISTS repos ( 41 did TEXT NOT NULL, 42 rkey TEXT NOT NULL, 43 knot TEXT NOT NULL, 44 name TEXT NOT NULL, 45 spindle TEXT, 46 repo_did TEXT, 47 created_at TEXT NOT NULL, 48 PRIMARY KEY (did, rkey) 49); 50 51-- Records of sh.tangled.repo.collaborator. Used together with repos to 52-- decide whether a triggering DID is allowed to push builds to us. 53CREATE TABLE IF NOT EXISTS repo_collaborators ( 54 did TEXT NOT NULL, 55 rkey TEXT NOT NULL, 56 repo TEXT, 57 repo_did TEXT, 58 subject TEXT NOT NULL, 59 created_at TEXT NOT NULL, 60 PRIMARY KEY (did, rkey) 61); 62 63-- Outbound event log. Each row is one record we want to fan out to 64-- connected /events websocket subscribers (typically the Tangled 65-- appview) — today only sh.tangled.pipeline.status. 66-- 67-- We persist instead of pushing through an in-memory channel so that 68-- (a) a reconnecting subscriber can resume from a cursor without 69-- missing events that happened during the gap, and 70-- (b) slow subscribers can't make us drop events for fast ones — they 71-- simply lag behind in the rowid space. 72-- 73-- AUTOINCREMENT (vs plain INTEGER PRIMARY KEY) guarantees rowids 74-- strictly increase and never get reused if a row is ever deleted, so 75-- treating the created column as a monotonic cursor is safe forever. 76CREATE TABLE IF NOT EXISTS events ( 77 created INTEGER PRIMARY KEY AUTOINCREMENT, 78 rkey TEXT NOT NULL, 79 nsid TEXT NOT NULL, 80 event_json TEXT NOT NULL, 81 inserted_at TEXT NOT NULL 82); 83 84-- Mapping from a Buildkite build back to the Tangled pipeline that 85-- spawned it. The Buildkite webhook receiver only knows the build 86-- UUID; everything we need to publish a pipeline.status record 87-- (knot, pipeline rkey, workflow name, full pipeline ATURI) lives 88-- on this row. 89-- 90-- pipeline_uri is denormalized off (knot, pipeline_rkey) so the 91-- webhook handler doesn't have to recompute the at:// string on 92-- every event — it's a constant for the lifetime of the build and 93-- the webhook is the hot path for status fan-out. 94-- 95-- The (knot, pipeline_rkey, workflow) index supports the /logs 96-- handler, which only knows that tuple at request time. 97-- org is the Buildkite organisation slug the build was created 98-- against. A workflow's YAML can override the spindle's default 99-- org via tack.buildkite.org, so we persist whatever was used at 100-- Spawn time and read it back when fetching jobs/logs. The empty 101-- string means "use the provider's defaultOrg" — that's both the 102-- usual single-org case and what every row written before this 103-- column existed will scan as. 104-- created_unix_ns is the monotonic recency key. created_at is kept 105-- (RFC3339Nano text) for human-readable inspection, but it must NOT 106-- be used for ordering: text comparison of nanosecond timestamps is 107-- not reliable, which used to make /logs occasionally resolve the 108-- wrong run. created_unix_ns is the integer the latest-build lookup 109-- sorts on instead. 110CREATE TABLE IF NOT EXISTS buildkite_builds ( 111 build_uuid TEXT PRIMARY KEY, 112 build_number INTEGER NOT NULL, 113 pipeline_slug TEXT NOT NULL, 114 org TEXT NOT NULL DEFAULT '', 115 knot TEXT NOT NULL, 116 pipeline_rkey TEXT NOT NULL, 117 workflow TEXT NOT NULL, 118 pipeline_uri TEXT NOT NULL, 119 created_at TEXT NOT NULL, 120 created_unix_ns INTEGER NOT NULL DEFAULT 0 121); 122CREATE INDEX IF NOT EXISTS buildkite_builds_lookup 123 ON buildkite_builds (knot, pipeline_rkey, workflow); 124` 125 126// migrate applies the schema. Safe to call repeatedly. 127// 128// CREATE TABLE IF NOT EXISTS is enough for fresh databases, but it 129// won't widen an already-existing table. Columns added after the 130// initial release therefore need a parallel ALTER TABLE step here; 131// SQLite has no `ADD COLUMN IF NOT EXISTS`, so we run the ALTER and 132// swallow the "duplicate column" error that fires on subsequent 133// startups. Anything else is fatal. 134func (s *store) migrate(ctx context.Context) error { 135 if _, err := s.db.ExecContext(ctx, schema); err != nil { 136 return fmt.Errorf("apply schema: %w", err) 137 } 138 for _, alter := range []string{ 139 // Persist the Buildkite org each build was created 140 // against so /logs can target the same org the workflow 141 // chose. Pre-existing rows scan as empty string, which 142 // the provider treats as "use defaultOrg". 143 `ALTER TABLE buildkite_builds ADD COLUMN org TEXT NOT NULL DEFAULT ''`, 144 145 // Monotonic integer ordering key for buildkite_builds. 146 // Replaces ORDER BY created_at (RFC3339Nano text), whose 147 // lexical order isn't reliable across nanosecond precision 148 // and could make /logs resolve the wrong run. Default 0 149 // covers pre-existing rows; the backfill below promotes 150 // them to their parsed timestamp so ordering stays stable 151 // across the upgrade. 152 `ALTER TABLE buildkite_builds ADD COLUMN created_unix_ns INTEGER NOT NULL DEFAULT 0`, 153 } { 154 if _, err := s.db.ExecContext(ctx, alter); err != nil { 155 if strings.Contains(err.Error(), "duplicate column name") { 156 continue 157 } 158 return fmt.Errorf("apply alter %q: %w", alter, err) 159 } 160 } 161 162 if err := s.backfillBuildkiteCreatedUnixNS(ctx); err != nil { 163 return fmt.Errorf("backfill buildkite created_unix_ns: %w", err) 164 } 165 return nil 166} 167 168// backfillBuildkiteCreatedUnixNS walks every buildkite_builds row whose 169// created_unix_ns is still the post-ALTER default (0) and sets it from 170// the RFC3339Nano text in created_at. SQLite has no native nanosecond 171// parser, so the conversion has to happen in Go. 172// 173// Rows whose created_at can't be parsed are left at 0; that keeps a 174// single corrupt row from blocking startup, and ordering between two 175// 0-keyed rows still falls through to created_at as a deterministic 176// tiebreaker in the lookup query. 177func (s *store) backfillBuildkiteCreatedUnixNS(ctx context.Context) error { 178 rows, err := s.db.QueryContext(ctx, 179 `SELECT build_uuid, created_at FROM buildkite_builds 180 WHERE created_unix_ns = 0`, 181 ) 182 if err != nil { 183 return fmt.Errorf("query rows to backfill: %w", err) 184 } 185 type pending struct { 186 uuid string 187 ns int64 188 } 189 var todo []pending 190 for rows.Next() { 191 var uuid, createdAt string 192 if err := rows.Scan(&uuid, &createdAt); err != nil { 193 rows.Close() 194 return fmt.Errorf("scan row: %w", err) 195 } 196 t, perr := time.Parse(time.RFC3339Nano, createdAt) 197 if perr != nil { 198 // Skip unparseable rows rather than failing the whole 199 // migration. The lookup query still has a fallback 200 // ordering for rows that share the default 0 key. 201 continue 202 } 203 todo = append(todo, pending{uuid: uuid, ns: t.UnixNano()}) 204 } 205 if err := rows.Err(); err != nil { 206 rows.Close() 207 return fmt.Errorf("iterate rows: %w", err) 208 } 209 rows.Close() 210 211 for _, p := range todo { 212 if _, err := s.db.ExecContext(ctx, 213 `UPDATE buildkite_builds SET created_unix_ns = ? 214 WHERE build_uuid = ?`, 215 p.ns, p.uuid, 216 ); err != nil { 217 return fmt.Errorf("update row %q: %w", p.uuid, err) 218 } 219 } 220 return nil 221}