Stitch any CI into Tangled
3

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// SQLite-backed persistence for tack. 4// 5// Two responsibilities live here: 6// 7// 1. The jetstream cursor — a microsecond unix timestamp the AT Proto 8// firehose uses to resume from a specific point. Without persistence 9// every restart begins at "now" and we'd silently miss any record 10// published while we were down. 11// 12// 2. Tangled membership state derived from jetstream commits: 13// sh.tangled.spindle.member, sh.tangled.repo, and 14// sh.tangled.repo.collaborator. We need this to later answer 15// "is DID X allowed to trigger a build on this spindle for repo Y?" 16// 17// We use mattn/go-sqlite3, which is the most battle-tested SQLite driver 18// for Go. It requires CGo, which is fine for tack — the project already 19// builds under Nix where a C toolchain is readily available. 20 21import ( 22 "context" 23 "database/sql" 24 "encoding/json" 25 "errors" 26 "fmt" 27 "strconv" 28 "time" 29 30 _ "github.com/mattn/go-sqlite3" 31) 32 33// store wraps the SQLite handle and exposes the small set of operations 34// the rest of tack needs. Keeping the surface narrow lets the persistence 35// layer be swapped or mocked later without rewriting callers. 36type store struct { 37 db *sql.DB 38} 39 40// openStore opens (or creates) the SQLite database at path and applies 41// the schema. It also flips on WAL + NORMAL synchronous, which is the 42// usual "this is a long-running server, not a one-shot script" config: 43// concurrent reads don't block the single writer, and we trade a little 44// crash-safety on power loss for a lot of write throughput. 45func openStore(path string) (*store, error) { 46 // mattn/go-sqlite3 reads pragma-style query parameters (_journal_mode, 47 // _synchronous, _foreign_keys) and applies them on each new connection. 48 // journal_mode=WAL persists in the database file but setting it here 49 // also covers the first-ever open. 50 dsn := fmt.Sprintf("file:%s?_journal_mode=WAL&_synchronous=NORMAL&_foreign_keys=on", path) 51 db, err := sql.Open("sqlite3", dsn) 52 if err != nil { 53 return nil, fmt.Errorf("open sqlite %q: %w", path, err) 54 } 55 56 // SQLite only supports one writer at a time. Capping max open conns 57 // at 1 keeps database/sql from spinning up extra connections that 58 // will only ever serialize behind that writer. 59 db.SetMaxOpenConns(1) 60 61 s := &store{db: db} 62 if err := s.migrate(context.Background()); err != nil { 63 _ = db.Close() 64 return nil, err 65 } 66 return s, nil 67} 68 69// Close releases the underlying database handle. 70func (s *store) Close() error { 71 return s.db.Close() 72} 73 74// metaCursorKey is the meta-table key under which we persist the 75// jetstream cursor. Pulled out as a constant to avoid drift between 76// load/save sites. 77const metaCursorKey = "jetstream_cursor" 78 79// LoadCursor returns the persisted jetstream cursor, or nil if none has 80// been saved yet (signaling "start from now"). The cursor is a unix 81// microsecond timestamp — see jetstream.Event.TimeUS. 82func (s *store) LoadCursor(ctx context.Context) (*int64, error) { 83 var raw string 84 err := s.db.QueryRowContext(ctx, 85 `SELECT value FROM meta WHERE key = ?`, metaCursorKey, 86 ).Scan(&raw) 87 if errors.Is(err, sql.ErrNoRows) { 88 return nil, nil 89 } 90 if err != nil { 91 return nil, fmt.Errorf("load cursor: %w", err) 92 } 93 v, err := strconv.ParseInt(raw, 10, 64) 94 if err != nil { 95 // A malformed cursor shouldn't wedge startup — log-and-ignore 96 // (return nil) so we just resume from "now". The caller has the 97 // logger; we surface the parse error so it can decide. 98 return nil, fmt.Errorf("parse cursor %q: %w", raw, err) 99 } 100 return &v, nil 101} 102 103// SaveCursor writes the jetstream cursor. It uses an UPSERT so the meta 104// row is created on first call and updated thereafter. 105func (s *store) SaveCursor(ctx context.Context, cursor int64) error { 106 _, err := s.db.ExecContext(ctx, 107 `INSERT INTO meta (key, value) VALUES (?, ?) 108 ON CONFLICT(key) DO UPDATE SET value = excluded.value`, 109 metaCursorKey, strconv.FormatInt(cursor, 10), 110 ) 111 if err != nil { 112 return fmt.Errorf("save cursor: %w", err) 113 } 114 return nil 115} 116 117// UpsertSpindleMember records (or refreshes) a sh.tangled.spindle.member 118// observation. 119func (s *store) UpsertSpindleMember(ctx context.Context, did, rkey, instance, subject, createdAt string) error { 120 _, err := s.db.ExecContext(ctx, 121 `INSERT INTO spindle_members (did, rkey, instance, subject, created_at) 122 VALUES (?, ?, ?, ?, ?) 123 ON CONFLICT(did, rkey) DO UPDATE SET 124 instance = excluded.instance, 125 subject = excluded.subject, 126 created_at = excluded.created_at`, 127 did, rkey, instance, subject, createdAt, 128 ) 129 if err != nil { 130 return fmt.Errorf("upsert spindle_member: %w", err) 131 } 132 return nil 133} 134 135// DeleteSpindleMember removes a member record by its ATProto identity. 136func (s *store) DeleteSpindleMember(ctx context.Context, did, rkey string) error { 137 _, err := s.db.ExecContext(ctx, 138 `DELETE FROM spindle_members WHERE did = ? AND rkey = ?`, 139 did, rkey, 140 ) 141 if err != nil { 142 return fmt.Errorf("delete spindle_member: %w", err) 143 } 144 return nil 145} 146 147// UpsertRepo records (or refreshes) a sh.tangled.repo observation. 148// spindle and repoDid may be empty strings — we store them as such rather 149// than as SQL NULLs to keep the upsert path uniform. 150func (s *store) UpsertRepo(ctx context.Context, did, rkey, knot, name, spindle, repoDid, createdAt string) error { 151 _, err := s.db.ExecContext(ctx, 152 `INSERT INTO repos (did, rkey, knot, name, spindle, repo_did, created_at) 153 VALUES (?, ?, ?, ?, ?, ?, ?) 154 ON CONFLICT(did, rkey) DO UPDATE SET 155 knot = excluded.knot, 156 name = excluded.name, 157 spindle = excluded.spindle, 158 repo_did = excluded.repo_did, 159 created_at = excluded.created_at`, 160 did, rkey, knot, name, spindle, repoDid, createdAt, 161 ) 162 if err != nil { 163 return fmt.Errorf("upsert repo: %w", err) 164 } 165 return nil 166} 167 168// DeleteRepo removes a repo record by its ATProto identity. 169func (s *store) DeleteRepo(ctx context.Context, did, rkey string) error { 170 _, err := s.db.ExecContext(ctx, 171 `DELETE FROM repos WHERE did = ? AND rkey = ?`, 172 did, rkey, 173 ) 174 if err != nil { 175 return fmt.Errorf("delete repo: %w", err) 176 } 177 return nil 178} 179 180// UpsertRepoCollaborator records (or refreshes) a 181// sh.tangled.repo.collaborator observation. 182func (s *store) UpsertRepoCollaborator(ctx context.Context, did, rkey, repo, repoDid, subject, createdAt string) error { 183 _, err := s.db.ExecContext(ctx, 184 `INSERT INTO repo_collaborators (did, rkey, repo, repo_did, subject, created_at) 185 VALUES (?, ?, ?, ?, ?, ?) 186 ON CONFLICT(did, rkey) DO UPDATE SET 187 repo = excluded.repo, 188 repo_did = excluded.repo_did, 189 subject = excluded.subject, 190 created_at = excluded.created_at`, 191 did, rkey, repo, repoDid, subject, createdAt, 192 ) 193 if err != nil { 194 return fmt.Errorf("upsert repo_collaborator: %w", err) 195 } 196 return nil 197} 198 199// GetRepo returns the (knot, spindle) currently stored for a (did, rkey) 200// pair. Both are returned as empty strings when no row exists; callers 201// that need to distinguish "absent" from "stored but empty" should 202// pre-check existence themselves. 203// 204// This exists so applyRepo can read the *previous* spindle/knot of a 205// record before applying a mutation, which is what makes it possible to 206// detect transitions like "this repo used to be ours, now it isn't" and 207// trigger a knot unsubscribe. 208func (s *store) GetRepo(ctx context.Context, did, rkey string) (knot, spindle string, err error) { 209 err = s.db.QueryRowContext(ctx, 210 `SELECT knot, spindle FROM repos WHERE did = ? AND rkey = ?`, 211 did, rkey, 212 ).Scan(&knot, &spindle) 213 if errors.Is(err, sql.ErrNoRows) { 214 return "", "", nil 215 } 216 if err != nil { 217 return "", "", fmt.Errorf("get repo: %w", err) 218 } 219 return knot, spindle, nil 220} 221 222// AuthorizePipelineActor reports whether a pipeline trigger that 223// arrived on knot for repo (repoOwnerDID, repoName) should be allowed 224// to spawn work on this spindle. Both halves of the answer are 225// derived from the persisted Tangled state in spindle_members and 226// repos: the network's own authorization records, mirrored here from 227// the firehose by jetstream.go. 228// 229// The check is two independent gates; both must hold: 230// 231// 1. **Repo claim**: a sh.tangled.repo record exists naming 232// hostname as its CI spindle AND knot as its host knot, 233// published by repoOwnerDID with the matching name. Without this 234// a knot we already happen to be subscribed to (because *some* 235// repo on it points at us) could otherwise smuggle in pipeline 236// triggers for unrelated repos that never opted into us. 237// 238// 2. **Actor membership**: repoOwnerDID is either the spindle owner 239// itself, or has been authorized via a sh.tangled.spindle.member 240// record published by the spindle owner. The publisher (`did`) 241// of that membership grant must equal ownerDID. Anyone can 242// publish a member record naming anyone, so trusting unsigned 243// grants would let any DID grant itself access. We do NOT match 244// against the record's `instance` column because the upstream 245// ecosystem stores it inconsistently (URL vs hostname), and a 246// tack instance only ever speaks for a single hostname anyway. 247// 248// reason is a short, log-friendly description of why authorization 249// failed when ok is false; it is empty when ok is true. Errors 250// reported here are SQL/IO failures, never policy denials; those 251// surface as ok=false with a populated reason. 252func (s *store) AuthorizePipelineActor( 253 ctx context.Context, 254 hostname, knot, ownerDID, repoOwnerDID, repoName string, 255) (ok bool, reason string, err error) { 256 // We can't authorize an unknown actor or an unidentified repo: 257 // every gate below joins on these. Bail before touching SQLite 258 // so a malformed trigger logs cleanly instead of triggering a 259 // "no rows" miss that could be confused with a real denial. 260 if repoOwnerDID == "" { 261 return false, "trigger has no repo did", nil 262 } 263 if repoName == "" { 264 return false, "trigger has no repo name", nil 265 } 266 267 // Gate 1: this specific repo opted into us on this specific knot. 268 var n int 269 if err := s.db.QueryRowContext(ctx, 270 `SELECT COUNT(*) FROM repos 271 WHERE did = ? AND name = ? AND knot = ? AND spindle = ?`, 272 repoOwnerDID, repoName, knot, hostname, 273 ).Scan(&n); err != nil { 274 return false, "", fmt.Errorf("count repo claim: %w", err) 275 } 276 if n == 0 { 277 return false, "no repo record claims this spindle on this knot", nil 278 } 279 280 // Gate 2: actor is the spindle owner, or vouched for by them. 281 if repoOwnerDID == ownerDID { 282 return true, "", nil 283 } 284 if err := s.db.QueryRowContext(ctx, 285 `SELECT COUNT(*) FROM spindle_members 286 WHERE did = ? AND subject = ?`, 287 ownerDID, repoOwnerDID, 288 ).Scan(&n); err != nil { 289 return false, "", fmt.Errorf("count membership: %w", err) 290 } 291 if n == 0 { 292 return false, "actor is not a spindle member", nil 293 } 294 return true, "", nil 295} 296 297// IsKnotWanted reports whether any repo currently stored still names the 298// given hostname as its spindle and the given knot as its host. After a 299// repo update or delete this is the question we ask to decide whether 300// to keep watching that knot or unsubscribe from it. 301func (s *store) IsKnotWanted(ctx context.Context, hostname, knot string) (bool, error) { 302 var n int 303 err := s.db.QueryRowContext(ctx, 304 `SELECT COUNT(*) FROM repos WHERE spindle = ? AND knot = ?`, 305 hostname, knot, 306 ).Scan(&n) 307 if err != nil { 308 return false, fmt.Errorf("count repos for knot: %w", err) 309 } 310 return n > 0, nil 311} 312 313// KnotsForSpindle returns the distinct knot hostnames of all repos that 314// have declared the given spindle hostname as their CI spindle. The knot 315// event-stream subscriber uses this to decide which knots to dial. 316// 317// Returns an empty slice (not nil) when nothing matches, so callers can 318// range over the result without a nil check. 319func (s *store) KnotsForSpindle(ctx context.Context, hostname string) ([]string, error) { 320 rows, err := s.db.QueryContext(ctx, 321 `SELECT DISTINCT knot FROM repos WHERE spindle = ? AND knot <> ''`, 322 hostname, 323 ) 324 if err != nil { 325 return nil, fmt.Errorf("query knots: %w", err) 326 } 327 defer rows.Close() 328 329 out := []string{} 330 for rows.Next() { 331 var k string 332 if err := rows.Scan(&k); err != nil { 333 return nil, fmt.Errorf("scan knot: %w", err) 334 } 335 out = append(out, k) 336 } 337 if err := rows.Err(); err != nil { 338 return nil, fmt.Errorf("iterate knots: %w", err) 339 } 340 return out, nil 341} 342 343// DeleteRepoCollaborator removes a collaborator record by its ATProto 344// identity. 345func (s *store) DeleteRepoCollaborator(ctx context.Context, did, rkey string) error { 346 _, err := s.db.ExecContext(ctx, 347 `DELETE FROM repo_collaborators WHERE did = ? AND rkey = ?`, 348 did, rkey, 349 ) 350 if err != nil { 351 return fmt.Errorf("delete repo_collaborator: %w", err) 352 } 353 return nil 354} 355 356// EventRow is one row of the events table. It represents an outbound 357// record we want to deliver to /events websocket subscribers, in the 358// shape callers actually need (raw record JSON, not stringly-typed). 359type EventRow struct { 360 // Created is the assigned monotonic rowid; doubles as the cursor 361 // value subscribers use to resume. 362 Created int64 363 // Rkey is the ATProto record key. For sh.tangled.pipeline.status 364 // records this is the rkey we mint when publishing. 365 Rkey string 366 // Nsid is the lexicon collection (e.g. sh.tangled.pipeline.status). 367 Nsid string 368 // EventJSON is the record body verbatim — held as RawMessage so 369 // the /events handler can splice it into the wire envelope without 370 // an unmarshal/remarshal round-trip. 371 EventJSON json.RawMessage 372} 373 374// InsertEvent appends an event row and returns its assigned `created` 375// (rowid) cursor. Storage is the source of truth for fan-out, so we 376// write here even if zero subscribers are connected — a subscriber that 377// connects later (with an old cursor) will pick the row up via 378// EventsAfter. 379// 380// eventJSON must be a valid JSON object; we store it verbatim. Length 381// validation is intentionally absent — the schema accepts arbitrary 382// TEXT and SQLite handles huge blobs fine for our scale. 383func (s *store) InsertEvent(ctx context.Context, rkey, nsid string, eventJSON []byte) (int64, error) { 384 res, err := s.db.ExecContext(ctx, 385 `INSERT INTO events (rkey, nsid, event_json, inserted_at) 386 VALUES (?, ?, ?, ?)`, 387 rkey, nsid, string(eventJSON), 388 time.Now().UTC().Format(time.RFC3339Nano), 389 ) 390 if err != nil { 391 return 0, fmt.Errorf("insert event: %w", err) 392 } 393 id, err := res.LastInsertId() 394 if err != nil { 395 return 0, fmt.Errorf("event last insert id: %w", err) 396 } 397 return id, nil 398} 399 400// BuildkiteBuildRef is the persisted mapping from one Buildkite build 401// to the Tangled pipeline tuple that spawned it. It's the row written 402// by the Buildkite provider at Spawn time and read back from two 403// places: the webhook handler (by build UUID) when an event arrives, 404// and the /logs handler (by knot+rkey+workflow) when an appview 405// client asks for output. 406type BuildkiteBuildRef struct { 407 BuildUUID string 408 BuildNumber int64 409 PipelineSlug string 410 // Org is the Buildkite organisation slug the build was created 411 // against. Persisted at Spawn time so /logs and any other 412 // post-creation API call can target the same org the workflow 413 // originally chose — see the workflow YAML `tack.buildkite.org` 414 // override. Empty means "use the provider's default org", which 415 // is what every row written before the org column existed will 416 // scan as. 417 Org string 418 Knot string 419 PipelineRkey string 420 Workflow string 421 PipelineURI string 422} 423 424// InsertBuildkiteBuild records that a Buildkite build was created on 425// behalf of the given (knot, pipelineRkey, workflow) tuple. Uses 426// INSERT OR REPLACE so that an unlikely build-uuid collision (or a 427// Buildkite-side rebuild that re-fires us) just refreshes the row 428// instead of failing. 429func (s *store) InsertBuildkiteBuild(ctx context.Context, ref BuildkiteBuildRef) error { 430 // Capture wall-clock and monotonic-friendly forms once so the two 431 // columns agree on the same instant. created_at is the 432 // human-readable RFC3339Nano string; created_unix_ns is the 433 // integer the lookup orders on (text comparison of nanosecond 434 // timestamps isn't reliable, so we sort on the int instead). 435 now := time.Now().UTC() 436 _, err := s.db.ExecContext(ctx, 437 `INSERT INTO buildkite_builds ( 438 build_uuid, build_number, pipeline_slug, org, 439 knot, pipeline_rkey, workflow, 440 pipeline_uri, created_at, created_unix_ns 441 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 442 ON CONFLICT(build_uuid) DO UPDATE SET 443 build_number = excluded.build_number, 444 pipeline_slug = excluded.pipeline_slug, 445 org = excluded.org, 446 knot = excluded.knot, 447 pipeline_rkey = excluded.pipeline_rkey, 448 workflow = excluded.workflow, 449 pipeline_uri = excluded.pipeline_uri, 450 created_at = excluded.created_at, 451 created_unix_ns = excluded.created_unix_ns`, 452 ref.BuildUUID, ref.BuildNumber, ref.PipelineSlug, ref.Org, 453 ref.Knot, ref.PipelineRkey, ref.Workflow, 454 ref.PipelineURI, now.Format(time.RFC3339Nano), now.UnixNano(), 455 ) 456 if err != nil { 457 return fmt.Errorf("insert buildkite_build: %w", err) 458 } 459 return nil 460} 461 462// LookupBuildkiteBuildByUUID returns the saved mapping for the given 463// Buildkite build UUID, or nil when no such build is recorded. 464// Returning a nil pointer rather than a sentinel error keeps the 465// webhook handler's "we don't know about this build" branch a simple 466// nil check. 467func (s *store) LookupBuildkiteBuildByUUID(ctx context.Context, buildUUID string) (*BuildkiteBuildRef, error) { 468 var ref BuildkiteBuildRef 469 err := s.db.QueryRowContext(ctx, 470 `SELECT build_uuid, build_number, pipeline_slug, org, 471 knot, pipeline_rkey, workflow, pipeline_uri 472 FROM buildkite_builds WHERE build_uuid = ?`, 473 buildUUID, 474 ).Scan( 475 &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug, &ref.Org, 476 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI, 477 ) 478 if errors.Is(err, sql.ErrNoRows) { 479 return nil, nil 480 } 481 if err != nil { 482 return nil, fmt.Errorf("lookup buildkite_build by uuid: %w", err) 483 } 484 return &ref, nil 485} 486 487// LookupBuildkiteBuildByTuple finds the most recently created build 488// for (knot, pipelineRkey, workflow). Returns nil when no build has 489// been recorded for that tuple — used by /logs to translate the 490// appview's path-based identity back into something Buildkite knows. 491// 492// "Most recent" matters because a workflow may have multiple builds 493// over time (rebuilds, re-triggers). We always serve logs for the 494// latest run; older runs are still queryable by build UUID directly 495// if anyone ever wants that. 496// 497// Ordering is on created_unix_ns (a monotonic int) rather than 498// created_at. Text comparison of RFC3339Nano timestamps is not 499// reliable across nanosecond precision, which used to make this 500// query occasionally pick the wrong run. created_at and build_number 501// are kept as deterministic tiebreakers for legacy rows that pre-date 502// the new column and still scan as 0. 503func (s *store) LookupBuildkiteBuildByTuple(ctx context.Context, knot, pipelineRkey, workflow string) (*BuildkiteBuildRef, error) { 504 var ref BuildkiteBuildRef 505 err := s.db.QueryRowContext(ctx, 506 `SELECT build_uuid, build_number, pipeline_slug, org, 507 knot, pipeline_rkey, workflow, pipeline_uri 508 FROM buildkite_builds 509 WHERE knot = ? AND pipeline_rkey = ? AND workflow = ? 510 ORDER BY created_unix_ns DESC, created_at DESC, build_number DESC 511 LIMIT 1`, 512 knot, pipelineRkey, workflow, 513 ).Scan( 514 &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug, &ref.Org, 515 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI, 516 ) 517 if errors.Is(err, sql.ErrNoRows) { 518 return nil, nil 519 } 520 if err != nil { 521 return nil, fmt.Errorf("lookup buildkite_build by tuple: %w", err) 522 } 523 return &ref, nil 524} 525 526// EventsAfter returns every event row with `created` strictly greater 527// than cursor, in cursor order. Used by /events to backfill a 528// reconnecting subscriber and to drain newly-published rows on each 529// broker notification. 530// 531// Pass cursor=0 to get the full log from the beginning, which is what 532// happens when a subscriber connects without a ?cursor= query param. 533func (s *store) EventsAfter(ctx context.Context, cursor int64) ([]EventRow, error) { 534 rows, err := s.db.QueryContext(ctx, 535 `SELECT created, rkey, nsid, event_json 536 FROM events 537 WHERE created > ? 538 ORDER BY created ASC`, 539 cursor, 540 ) 541 if err != nil { 542 return nil, fmt.Errorf("query events: %w", err) 543 } 544 defer rows.Close() 545 546 out := []EventRow{} 547 for rows.Next() { 548 var ( 549 ev EventRow 550 raw string 551 ) 552 if err := rows.Scan(&ev.Created, &ev.Rkey, &ev.Nsid, &raw); err != nil { 553 return nil, fmt.Errorf("scan event: %w", err) 554 } 555 ev.EventJSON = json.RawMessage(raw) 556 out = append(out, ev) 557 } 558 if err := rows.Err(); err != nil { 559 return nil, fmt.Errorf("iterate events: %w", err) 560 } 561 return out, nil 562}