Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

package main

// Schema definition and migration logic for the SQLite store. Pulled out
// of store.go so the big SQL block doesn't sit in the middle of the
// runtime API surface.

import (
	"context"
	"fmt"
	"strings"
	"time"
)

// schema is the full set of CREATE statements applied at startup. It is
// idempotent and additive only — no `DROP`s — so future changes can be
// layered on as additional statements without needing a separate
// migration tool until the project actually outgrows that.
//
// migrate hands this whole string to a single ExecContext call. Every
// statement uses IF NOT EXISTS, so on an existing database the CREATEs
// are no-ops; columns that were added to a table after its first
// release (buildkite_builds.org and buildkite_builds.created_unix_ns)
// are therefore also added by an ALTER TABLE step in migrate.
const schema = `
CREATE TABLE IF NOT EXISTS meta (
	key TEXT PRIMARY KEY,
	value TEXT NOT NULL
);

-- Records of sh.tangled.spindle.member. The owner of a spindle publishes
-- one of these per authorized member. (did, rkey) is the natural ATProto
-- key — did identifies the publisher's PDS, rkey identifies the record
-- within that PDS's collection.
CREATE TABLE IF NOT EXISTS spindle_members (
	did TEXT NOT NULL,
	rkey TEXT NOT NULL,
	instance TEXT NOT NULL,
	subject TEXT NOT NULL,
	created_at TEXT NOT NULL,
	PRIMARY KEY (did, rkey)
);

-- Records of sh.tangled.repo. We keep the full set so that when a
-- pipeline trigger arrives we can look up which knot/spindle/repo_did
-- it corresponds to without another round-trip.
CREATE TABLE IF NOT EXISTS repos (
	did TEXT NOT NULL,
	rkey TEXT NOT NULL,
	knot TEXT NOT NULL,
	name TEXT NOT NULL,
	spindle TEXT,
	repo_did TEXT,
	created_at TEXT NOT NULL,
	PRIMARY KEY (did, rkey)
);

-- Records of sh.tangled.repo.collaborator. Used together with repos to
-- decide whether a triggering DID is allowed to push builds to us.
CREATE TABLE IF NOT EXISTS repo_collaborators (
	did TEXT NOT NULL,
	rkey TEXT NOT NULL,
	repo TEXT,
	repo_did TEXT,
	subject TEXT NOT NULL,
	created_at TEXT NOT NULL,
	PRIMARY KEY (did, rkey)
);

-- Outbound event log. Each row is one record we want to fan out to
-- connected /events websocket subscribers (typically the Tangled
-- appview) — today only sh.tangled.pipeline.status.
--
-- We persist instead of pushing through an in-memory channel so that
-- (a) a reconnecting subscriber can resume from a cursor without
-- missing events that happened during the gap, and
-- (b) slow subscribers can't make us drop events for fast ones — they
-- simply lag behind in the rowid space.
--
-- AUTOINCREMENT (vs plain INTEGER PRIMARY KEY) guarantees rowids
-- strictly increase and never get reused if a row is ever deleted, so
-- treating the created column as a monotonic cursor is safe forever.
CREATE TABLE IF NOT EXISTS events (
	created INTEGER PRIMARY KEY AUTOINCREMENT,
	rkey TEXT NOT NULL,
	nsid TEXT NOT NULL,
	event_json TEXT NOT NULL,
	inserted_at TEXT NOT NULL
);

-- Mapping from a Buildkite build back to the Tangled pipeline that
-- spawned it. The Buildkite webhook receiver only knows the build
-- UUID; everything we need to publish a pipeline.status record
-- (knot, pipeline rkey, workflow name, full pipeline ATURI) lives
-- on this row.
--
-- pipeline_uri is denormalized off (knot, pipeline_rkey) so the
-- webhook handler doesn't have to recompute the at:// string on
-- every event — it's a constant for the lifetime of the build and
-- the webhook is the hot path for status fan-out.
--
-- The (knot, pipeline_rkey, workflow) index supports the /logs
-- handler, which only knows that tuple at request time.
-- org is the Buildkite organisation slug the build was created
-- against. A workflow's YAML can override the spindle's default
-- org via tack.buildkite.org, so we persist whatever was used at
-- Spawn time and read it back when fetching jobs/logs. The empty
-- string means "use the provider's defaultOrg" — that's both the
-- usual single-org case and what every row written before this
-- column existed will scan as.
-- created_unix_ns is the monotonic recency key. created_at is kept
-- (RFC3339Nano text) for human-readable inspection, but it must NOT
-- be used for ordering: text comparison of nanosecond timestamps is
-- not reliable, which used to make /logs occasionally resolve the
-- wrong run. created_unix_ns is the integer the latest-build lookup
-- sorts on instead.
CREATE TABLE IF NOT EXISTS buildkite_builds (
	build_uuid TEXT PRIMARY KEY,
	build_number INTEGER NOT NULL,
	pipeline_slug TEXT NOT NULL,
	org TEXT NOT NULL DEFAULT '',
	knot TEXT NOT NULL,
	pipeline_rkey TEXT NOT NULL,
	workflow TEXT NOT NULL,
	pipeline_uri TEXT NOT NULL,
	created_at TEXT NOT NULL,
	created_unix_ns INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS buildkite_builds_lookup
	ON buildkite_builds (knot, pipeline_rkey, workflow);

-- Mapping from a Tangled workflow tuple to the latest Tekton
-- PipelineRun tack created for it. Unlike Buildkite webhooks, Tekton
-- status observation happens in-process, so the primary read path is
-- /logs resolving (knot, pipeline_rkey, workflow) back to the concrete
-- PipelineRun whose TaskRuns and pods hold output.
CREATE TABLE IF NOT EXISTS tekton_runs (
	knot TEXT NOT NULL,
	pipeline_rkey TEXT NOT NULL,
	workflow TEXT NOT NULL,
	namespace TEXT NOT NULL,
	pipeline_run_name TEXT NOT NULL,
	pipeline_run_uid TEXT NOT NULL,
	pipeline_name TEXT NOT NULL,
	pipeline_uri TEXT NOT NULL,
	created_at TEXT NOT NULL,
	created_unix_ns INTEGER NOT NULL,
	PRIMARY KEY (knot, pipeline_rkey, workflow)
);
CREATE INDEX IF NOT EXISTS tekton_runs_uid
	ON tekton_runs (pipeline_run_uid);

-- Mapping from a Tangled workflow tuple to the latest builds.sr.ht
-- job tack submitted for it. Same shape as tekton_runs: like Tekton,
-- sourcehut is observed by polling rather than webhooks, so the read
-- path is purely tuple → job_id resolution at /logs and watch time.
-- instance is the builds.sr.ht-compatible base URL the job lives on
-- (job IDs are scoped to one instance), preserved per-row so a later
-- change to the provider's default instance can't silently retarget
-- historical lookups at the wrong server.
CREATE TABLE IF NOT EXISTS sourcehut_jobs (
	knot TEXT NOT NULL,
	pipeline_rkey TEXT NOT NULL,
	workflow TEXT NOT NULL,
	job_id INTEGER NOT NULL,
	owner TEXT NOT NULL,
	instance TEXT NOT NULL,
	pipeline_uri TEXT NOT NULL,
	created_at TEXT NOT NULL,
	created_unix_ns INTEGER NOT NULL,
	PRIMARY KEY (knot, pipeline_rkey, workflow)
);
CREATE INDEX IF NOT EXISTS sourcehut_jobs_id
	ON sourcehut_jobs (job_id);
`

// migrate applies the schema. Safe to call repeatedly.
//
// CREATE TABLE IF NOT EXISTS is enough for fresh databases, but it
// won't widen an already-existing table. Columns added after the
// initial release therefore need a parallel ALTER TABLE step here;
// SQLite has no `ADD COLUMN IF NOT EXISTS`, so we run the ALTER and
// swallow the "duplicate column" error that fires on subsequent
// startups. Anything else is fatal.
178func (s *store) migrate(ctx context.Context) error { 179 if _, err := s.db.ExecContext(ctx, schema); err != nil { 180 return fmt.Errorf("apply schema: %w", err) 181 } 182 for _, alter := range []string{ 183 // Persist the Buildkite org each build was created 184 // against so /logs can target the same org the workflow 185 // chose. Pre-existing rows scan as empty string, which 186 // the provider treats as "use defaultOrg". 187 `ALTER TABLE buildkite_builds ADD COLUMN org TEXT NOT NULL DEFAULT ''`, 188 189 // Monotonic integer ordering key for buildkite_builds. 190 // Replaces ORDER BY created_at (RFC3339Nano text), whose 191 // lexical order isn't reliable across nanosecond precision 192 // and could make /logs resolve the wrong run. Default 0 193 // covers pre-existing rows; the backfill below promotes 194 // them to their parsed timestamp so ordering stays stable 195 // across the upgrade. 196 `ALTER TABLE buildkite_builds ADD COLUMN created_unix_ns INTEGER NOT NULL DEFAULT 0`, 197 } { 198 if _, err := s.db.ExecContext(ctx, alter); err != nil { 199 if strings.Contains(err.Error(), "duplicate column name") { 200 continue 201 } 202 return fmt.Errorf("apply alter %q: %w", alter, err) 203 } 204 } 205 206 if err := s.backfillBuildkiteCreatedUnixNS(ctx); err != nil { 207 return fmt.Errorf("backfill buildkite created_unix_ns: %w", err) 208 } 209 return nil 210} 211 212// backfillBuildkiteCreatedUnixNS walks every buildkite_builds row whose 213// created_unix_ns is still the post-ALTER default (0) and sets it from 214// the RFC3339Nano text in created_at. SQLite has no native nanosecond 215// parser, so the conversion has to happen in Go. 216// 217// Rows whose created_at can't be parsed are left at 0; that keeps a 218// single corrupt row from blocking startup, and ordering between two 219// 0-keyed rows still falls through to created_at as a deterministic 220// tiebreaker in the lookup query. 
221func (s *store) backfillBuildkiteCreatedUnixNS(ctx context.Context) error { 222 rows, err := s.db.QueryContext(ctx, 223 `SELECT build_uuid, created_at FROM buildkite_builds 224 WHERE created_unix_ns = 0`, 225 ) 226 if err != nil { 227 return fmt.Errorf("query rows to backfill: %w", err) 228 } 229 type pending struct { 230 uuid string 231 ns int64 232 } 233 var todo []pending 234 for rows.Next() { 235 var uuid, createdAt string 236 if err := rows.Scan(&uuid, &createdAt); err != nil { 237 rows.Close() 238 return fmt.Errorf("scan row: %w", err) 239 } 240 t, perr := time.Parse(time.RFC3339Nano, createdAt) 241 if perr != nil { 242 // Skip unparseable rows rather than failing the whole 243 // migration. The lookup query still has a fallback 244 // ordering for rows that share the default 0 key. 245 continue 246 } 247 todo = append(todo, pending{uuid: uuid, ns: t.UnixNano()}) 248 } 249 if err := rows.Err(); err != nil { 250 rows.Close() 251 return fmt.Errorf("iterate rows: %w", err) 252 } 253 rows.Close() 254 255 for _, p := range todo { 256 if _, err := s.db.ExecContext(ctx, 257 `UPDATE buildkite_builds SET created_unix_ns = ? 258 WHERE build_uuid = ?`, 259 p.ns, p.uuid, 260 ); err != nil { 261 return fmt.Errorf("update row %q: %w", p.uuid, err) 262 } 263 } 264 return nil 265}