Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// SQLite-backed persistence for tack. 4// 5// Two responsibilities live here: 6// 7// 1. The jetstream cursor — a microsecond unix timestamp the AT Proto 8// firehose uses to resume from a specific point. Without persistence 9// every restart begins at "now" and we'd silently miss any record 10// published while we were down. 11// 12// 2. Tangled membership state derived from jetstream commits: 13// sh.tangled.spindle.member, sh.tangled.repo, and 14// sh.tangled.repo.collaborator. We need this to later answer 15// "is DID X allowed to trigger a build on this spindle for repo Y?" 16// 17// We use mattn/go-sqlite3, which is the most battle-tested SQLite driver 18// for Go. It requires CGo, which is fine for tack — the project already 19// builds under Nix where a C toolchain is readily available. 20 21import ( 22 "context" 23 "database/sql" 24 "encoding/json" 25 "errors" 26 "fmt" 27 "strconv" 28 "time" 29 30 _ "github.com/mattn/go-sqlite3" 31) 32 33// store wraps the SQLite handle and exposes the small set of operations 34// the rest of tack needs. Keeping the surface narrow lets the persistence 35// layer be swapped or mocked later without rewriting callers. 36type store struct { 37 db *sql.DB 38} 39 40// openStore opens (or creates) the SQLite database at path and applies 41// the schema. It also flips on WAL + NORMAL synchronous, which is the 42// usual "this is a long-running server, not a one-shot script" config: 43// concurrent reads don't block the single writer, and we trade a little 44// crash-safety on power loss for a lot of write throughput. 45func openStore(path string) (*store, error) { 46 // mattn/go-sqlite3 reads pragma-style query parameters (_journal_mode, 47 // _synchronous, _foreign_keys) and applies them on each new connection. 48 // journal_mode=WAL persists in the database file but setting it here 49 // also covers the first-ever open. 50 dsn := fmt.Sprintf("file:%s?_journal_mode=WAL&_synchronous=NORMAL&_foreign_keys=on", path) 51 db, err := sql.Open("sqlite3", dsn) 52 if err != nil { 53 return nil, fmt.Errorf("open sqlite %q: %w", path, err) 54 } 55 56 // SQLite only supports one writer at a time. Capping max open conns 57 // at 1 keeps database/sql from spinning up extra connections that 58 // will only ever serialize behind that writer. 59 db.SetMaxOpenConns(1) 60 61 s := &store{db: db} 62 if err := s.migrate(context.Background()); err != nil { 63 _ = db.Close() 64 return nil, err 65 } 66 return s, nil 67} 68 69// Close releases the underlying database handle. 70func (s *store) Close() error { 71 return s.db.Close() 72} 73 74// metaCursorKey is the meta-table key under which we persist the 75// jetstream cursor. Pulled out as a constant to avoid drift between 76// load/save sites. 77const metaCursorKey = "jetstream_cursor" 78 79// LoadCursor returns the persisted jetstream cursor, or nil if none has 80// been saved yet (signaling "start from now"). The cursor is a unix 81// microsecond timestamp — see jetstream.Event.TimeUS. 82func (s *store) LoadCursor(ctx context.Context) (*int64, error) { 83 var raw string 84 err := s.db.QueryRowContext(ctx, 85 `SELECT value FROM meta WHERE key = ?`, metaCursorKey, 86 ).Scan(&raw) 87 if errors.Is(err, sql.ErrNoRows) { 88 return nil, nil 89 } 90 if err != nil { 91 return nil, fmt.Errorf("load cursor: %w", err) 92 } 93 v, err := strconv.ParseInt(raw, 10, 64) 94 if err != nil { 95 // A malformed cursor shouldn't wedge startup — log-and-ignore 96 // (return nil) so we just resume from "now". The caller has the 97 // logger; we surface the parse error so it can decide. 98 return nil, fmt.Errorf("parse cursor %q: %w", raw, err) 99 } 100 return &v, nil 101} 102 103// SaveCursor writes the jetstream cursor. It uses an UPSERT so the meta 104// row is created on first call and updated thereafter. 105func (s *store) SaveCursor(ctx context.Context, cursor int64) error { 106 _, err := s.db.ExecContext(ctx, 107 `INSERT INTO meta (key, value) VALUES (?, ?) 108 ON CONFLICT(key) DO UPDATE SET value = excluded.value`, 109 metaCursorKey, strconv.FormatInt(cursor, 10), 110 ) 111 if err != nil { 112 return fmt.Errorf("save cursor: %w", err) 113 } 114 return nil 115} 116 117// UpsertSpindleMember records (or refreshes) a sh.tangled.spindle.member 118// observation. 119func (s *store) UpsertSpindleMember(ctx context.Context, did, rkey, instance, subject, createdAt string) error { 120 _, err := s.db.ExecContext(ctx, 121 `INSERT INTO spindle_members (did, rkey, instance, subject, created_at) 122 VALUES (?, ?, ?, ?, ?) 123 ON CONFLICT(did, rkey) DO UPDATE SET 124 instance = excluded.instance, 125 subject = excluded.subject, 126 created_at = excluded.created_at`, 127 did, rkey, instance, subject, createdAt, 128 ) 129 if err != nil { 130 return fmt.Errorf("upsert spindle_member: %w", err) 131 } 132 return nil 133} 134 135// DeleteSpindleMember removes a member record by its ATProto identity. 136func (s *store) DeleteSpindleMember(ctx context.Context, did, rkey string) error { 137 _, err := s.db.ExecContext(ctx, 138 `DELETE FROM spindle_members WHERE did = ? AND rkey = ?`, 139 did, rkey, 140 ) 141 if err != nil { 142 return fmt.Errorf("delete spindle_member: %w", err) 143 } 144 return nil 145} 146 147// UpsertRepo records (or refreshes) a sh.tangled.repo observation. 148// spindle and repoDid may be empty strings — we store them as such rather 149// than as SQL NULLs to keep the upsert path uniform. 150func (s *store) UpsertRepo(ctx context.Context, did, rkey, knot, name, spindle, repoDid, createdAt string) error { 151 _, err := s.db.ExecContext(ctx, 152 `INSERT INTO repos (did, rkey, knot, name, spindle, repo_did, created_at) 153 VALUES (?, ?, ?, ?, ?, ?, ?) 154 ON CONFLICT(did, rkey) DO UPDATE SET 155 knot = excluded.knot, 156 name = excluded.name, 157 spindle = excluded.spindle, 158 repo_did = excluded.repo_did, 159 created_at = excluded.created_at`, 160 did, rkey, knot, name, spindle, repoDid, createdAt, 161 ) 162 if err != nil { 163 return fmt.Errorf("upsert repo: %w", err) 164 } 165 return nil 166} 167 168// DeleteRepo removes a repo record by its ATProto identity. 169func (s *store) DeleteRepo(ctx context.Context, did, rkey string) error { 170 _, err := s.db.ExecContext(ctx, 171 `DELETE FROM repos WHERE did = ? AND rkey = ?`, 172 did, rkey, 173 ) 174 if err != nil { 175 return fmt.Errorf("delete repo: %w", err) 176 } 177 return nil 178} 179 180// UpsertRepoCollaborator records (or refreshes) a 181// sh.tangled.repo.collaborator observation. 182func (s *store) UpsertRepoCollaborator(ctx context.Context, did, rkey, repo, repoDid, subject, createdAt string) error { 183 _, err := s.db.ExecContext(ctx, 184 `INSERT INTO repo_collaborators (did, rkey, repo, repo_did, subject, created_at) 185 VALUES (?, ?, ?, ?, ?, ?) 186 ON CONFLICT(did, rkey) DO UPDATE SET 187 repo = excluded.repo, 188 repo_did = excluded.repo_did, 189 subject = excluded.subject, 190 created_at = excluded.created_at`, 191 did, rkey, repo, repoDid, subject, createdAt, 192 ) 193 if err != nil { 194 return fmt.Errorf("upsert repo_collaborator: %w", err) 195 } 196 return nil 197} 198 199// GetRepo returns the (knot, spindle) currently stored for a (did, rkey) 200// pair. Both are returned as empty strings when no row exists; callers 201// that need to distinguish "absent" from "stored but empty" should 202// pre-check existence themselves. 203// 204// This exists so applyRepo can read the *previous* spindle/knot of a 205// record before applying a mutation, which is what makes it possible to 206// detect transitions like "this repo used to be ours, now it isn't" and 207// trigger a knot unsubscribe. 208func (s *store) GetRepo(ctx context.Context, did, rkey string) (knot, spindle string, err error) { 209 err = s.db.QueryRowContext(ctx, 210 `SELECT knot, spindle FROM repos WHERE did = ? AND rkey = ?`, 211 did, rkey, 212 ).Scan(&knot, &spindle) 213 if errors.Is(err, sql.ErrNoRows) { 214 return "", "", nil 215 } 216 if err != nil { 217 return "", "", fmt.Errorf("get repo: %w", err) 218 } 219 return knot, spindle, nil 220} 221 222// IsKnotWanted reports whether any repo currently stored still names the 223// given hostname as its spindle and the given knot as its host. After a 224// repo update or delete this is the question we ask to decide whether 225// to keep watching that knot or unsubscribe from it. 226func (s *store) IsKnotWanted(ctx context.Context, hostname, knot string) (bool, error) { 227 var n int 228 err := s.db.QueryRowContext(ctx, 229 `SELECT COUNT(*) FROM repos WHERE spindle = ? AND knot = ?`, 230 hostname, knot, 231 ).Scan(&n) 232 if err != nil { 233 return false, fmt.Errorf("count repos for knot: %w", err) 234 } 235 return n > 0, nil 236} 237 238// KnotsForSpindle returns the distinct knot hostnames of all repos that 239// have declared the given spindle hostname as their CI spindle. The knot 240// event-stream subscriber uses this to decide which knots to dial. 241// 242// Returns an empty slice (not nil) when nothing matches, so callers can 243// range over the result without a nil check. 244func (s *store) KnotsForSpindle(ctx context.Context, hostname string) ([]string, error) { 245 rows, err := s.db.QueryContext(ctx, 246 `SELECT DISTINCT knot FROM repos WHERE spindle = ? AND knot <> ''`, 247 hostname, 248 ) 249 if err != nil { 250 return nil, fmt.Errorf("query knots: %w", err) 251 } 252 defer rows.Close() 253 254 out := []string{} 255 for rows.Next() { 256 var k string 257 if err := rows.Scan(&k); err != nil { 258 return nil, fmt.Errorf("scan knot: %w", err) 259 } 260 out = append(out, k) 261 } 262 if err := rows.Err(); err != nil { 263 return nil, fmt.Errorf("iterate knots: %w", err) 264 } 265 return out, nil 266} 267 268// DeleteRepoCollaborator removes a collaborator record by its ATProto 269// identity. 270func (s *store) DeleteRepoCollaborator(ctx context.Context, did, rkey string) error { 271 _, err := s.db.ExecContext(ctx, 272 `DELETE FROM repo_collaborators WHERE did = ? AND rkey = ?`, 273 did, rkey, 274 ) 275 if err != nil { 276 return fmt.Errorf("delete repo_collaborator: %w", err) 277 } 278 return nil 279} 280 281// EventRow is one row of the events table. It represents an outbound 282// record we want to deliver to /events websocket subscribers, in the 283// shape callers actually need (raw record JSON, not stringly-typed). 284type EventRow struct { 285 // Created is the assigned monotonic rowid; doubles as the cursor 286 // value subscribers use to resume. 287 Created int64 288 // Rkey is the ATProto record key. For sh.tangled.pipeline.status 289 // records this is the rkey we mint when publishing. 290 Rkey string 291 // Nsid is the lexicon collection (e.g. sh.tangled.pipeline.status). 292 Nsid string 293 // EventJSON is the record body verbatim — held as RawMessage so 294 // the /events handler can splice it into the wire envelope without 295 // an unmarshal/remarshal round-trip. 296 EventJSON json.RawMessage 297} 298 299// InsertEvent appends an event row and returns its assigned `created` 300// (rowid) cursor. Storage is the source of truth for fan-out, so we 301// write here even if zero subscribers are connected — a subscriber that 302// connects later (with an old cursor) will pick the row up via 303// EventsAfter. 304// 305// eventJSON must be a valid JSON object; we store it verbatim. Length 306// validation is intentionally absent — the schema accepts arbitrary 307// TEXT and SQLite handles huge blobs fine for our scale. 308func (s *store) InsertEvent(ctx context.Context, rkey, nsid string, eventJSON []byte) (int64, error) { 309 res, err := s.db.ExecContext(ctx, 310 `INSERT INTO events (rkey, nsid, event_json, inserted_at) 311 VALUES (?, ?, ?, ?)`, 312 rkey, nsid, string(eventJSON), 313 time.Now().UTC().Format(time.RFC3339Nano), 314 ) 315 if err != nil { 316 return 0, fmt.Errorf("insert event: %w", err) 317 } 318 id, err := res.LastInsertId() 319 if err != nil { 320 return 0, fmt.Errorf("event last insert id: %w", err) 321 } 322 return id, nil 323} 324 325// BuildkiteBuildRef is the persisted mapping from one Buildkite build 326// to the Tangled pipeline tuple that spawned it. It's the row written 327// by the Buildkite provider at Spawn time and read back from two 328// places: the webhook handler (by build UUID) when an event arrives, 329// and the /logs handler (by knot+rkey+workflow) when an appview 330// client asks for output. 331type BuildkiteBuildRef struct { 332 BuildUUID string 333 BuildNumber int64 334 PipelineSlug string 335 Knot string 336 PipelineRkey string 337 Workflow string 338 PipelineURI string 339} 340 341// InsertBuildkiteBuild records that a Buildkite build was created on 342// behalf of the given (knot, pipelineRkey, workflow) tuple. Uses 343// INSERT OR REPLACE so that an unlikely build-uuid collision (or a 344// Buildkite-side rebuild that re-fires us) just refreshes the row 345// instead of failing. 346func (s *store) InsertBuildkiteBuild(ctx context.Context, ref BuildkiteBuildRef) error { 347 _, err := s.db.ExecContext(ctx, 348 `INSERT INTO buildkite_builds ( 349 build_uuid, build_number, pipeline_slug, 350 knot, pipeline_rkey, workflow, 351 pipeline_uri, created_at 352 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 353 ON CONFLICT(build_uuid) DO UPDATE SET 354 build_number = excluded.build_number, 355 pipeline_slug = excluded.pipeline_slug, 356 knot = excluded.knot, 357 pipeline_rkey = excluded.pipeline_rkey, 358 workflow = excluded.workflow, 359 pipeline_uri = excluded.pipeline_uri, 360 created_at = excluded.created_at`, 361 ref.BuildUUID, ref.BuildNumber, ref.PipelineSlug, 362 ref.Knot, ref.PipelineRkey, ref.Workflow, 363 ref.PipelineURI, time.Now().UTC().Format(time.RFC3339Nano), 364 ) 365 if err != nil { 366 return fmt.Errorf("insert buildkite_build: %w", err) 367 } 368 return nil 369} 370 371// LookupBuildkiteBuildByUUID returns the saved mapping for the given 372// Buildkite build UUID, or nil when no such build is recorded. 373// Returning a nil pointer rather than a sentinel error keeps the 374// webhook handler's "we don't know about this build" branch a simple 375// nil check. 376func (s *store) LookupBuildkiteBuildByUUID(ctx context.Context, buildUUID string) (*BuildkiteBuildRef, error) { 377 var ref BuildkiteBuildRef 378 err := s.db.QueryRowContext(ctx, 379 `SELECT build_uuid, build_number, pipeline_slug, 380 knot, pipeline_rkey, workflow, pipeline_uri 381 FROM buildkite_builds WHERE build_uuid = ?`, 382 buildUUID, 383 ).Scan( 384 &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug, 385 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI, 386 ) 387 if errors.Is(err, sql.ErrNoRows) { 388 return nil, nil 389 } 390 if err != nil { 391 return nil, fmt.Errorf("lookup buildkite_build by uuid: %w", err) 392 } 393 return &ref, nil 394} 395 396// LookupBuildkiteBuildByTuple finds the most recently created build 397// for (knot, pipelineRkey, workflow). Returns nil when no build has 398// been recorded for that tuple — used by /logs to translate the 399// appview's path-based identity back into something Buildkite knows. 400// 401// "Most recent" matters because a workflow may have multiple builds 402// over time (rebuilds, re-triggers). We always serve logs for the 403// latest run; older runs are still queryable by build UUID directly 404// if anyone ever wants that. 405func (s *store) LookupBuildkiteBuildByTuple(ctx context.Context, knot, pipelineRkey, workflow string) (*BuildkiteBuildRef, error) { 406 var ref BuildkiteBuildRef 407 err := s.db.QueryRowContext(ctx, 408 `SELECT build_uuid, build_number, pipeline_slug, 409 knot, pipeline_rkey, workflow, pipeline_uri 410 FROM buildkite_builds 411 WHERE knot = ? AND pipeline_rkey = ? AND workflow = ? 412 ORDER BY created_at DESC 413 LIMIT 1`, 414 knot, pipelineRkey, workflow, 415 ).Scan( 416 &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug, 417 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI, 418 ) 419 if errors.Is(err, sql.ErrNoRows) { 420 return nil, nil 421 } 422 if err != nil { 423 return nil, fmt.Errorf("lookup buildkite_build by tuple: %w", err) 424 } 425 return &ref, nil 426} 427 428// EventsAfter returns every event row with `created` strictly greater 429// than cursor, in cursor order. Used by /events to backfill a 430// reconnecting subscriber and to drain newly-published rows on each 431// broker notification. 432// 433// Pass cursor=0 to get the full log from the beginning, which is what 434// happens when a subscriber connects without a ?cursor= query param. 435func (s *store) EventsAfter(ctx context.Context, cursor int64) ([]EventRow, error) { 436 rows, err := s.db.QueryContext(ctx, 437 `SELECT created, rkey, nsid, event_json 438 FROM events 439 WHERE created > ? 440 ORDER BY created ASC`, 441 cursor, 442 ) 443 if err != nil { 444 return nil, fmt.Errorf("query events: %w", err) 445 } 446 defer rows.Close() 447 448 out := []EventRow{} 449 for rows.Next() { 450 var ( 451 ev EventRow 452 raw string 453 ) 454 if err := rows.Scan(&ev.Created, &ev.Rkey, &ev.Nsid, &raw); err != nil { 455 return nil, fmt.Errorf("scan event: %w", err) 456 } 457 ev.EventJSON = json.RawMessage(raw) 458 out = append(out, ev) 459 } 460 if err := rows.Err(); err != nil { 461 return nil, fmt.Errorf("iterate events: %w", err) 462 } 463 return out, nil 464}