Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// SQLite-backed persistence for tack. Holds: 4// 5// - the jetstream cursor, so restarts resume the firehose where the 6// previous run left off 7// - mirrored Tangled state (spindle members, repos, collaborators) 8// and the authorization helpers built on top of it, used both to 9// gate inbound pipeline triggers and to decide which knots we 10// should be subscribed to 11// - the outbound event log fed to /events websocket subscribers 12// - the Buildkite build → pipeline mapping the webhook and /logs 13// handlers look up 14// 15// Per-method docs go into more detail. 16 17import ( 18 "context" 19 "database/sql" 20 "encoding/json" 21 "errors" 22 "fmt" 23 "strconv" 24 "time" 25 26 _ "github.com/mattn/go-sqlite3" 27) 28 29// store wraps the SQLite handle and exposes the small set of operations 30// the rest of tack needs. Keeping the surface narrow lets the persistence 31// layer be swapped or mocked later without rewriting callers. 32type store struct { 33 db *sql.DB 34} 35 36// openStore opens (or creates) the SQLite database at path and applies 37// the schema. It also flips on WAL + NORMAL synchronous, which is the 38// usual "this is a long-running server, not a one-shot script" config: 39// concurrent reads don't block the single writer, and we trade a little 40// crash-safety on power loss for a lot of write throughput. 41func openStore(path string) (*store, error) { 42 // mattn/go-sqlite3 reads pragma-style query parameters (_journal_mode, 43 // _synchronous, _foreign_keys) and applies them on each new connection. 44 // journal_mode=WAL persists in the database file but setting it here 45 // also covers the first-ever open. 46 dsn := fmt.Sprintf("file:%s?_journal_mode=WAL&_synchronous=NORMAL&_foreign_keys=on", path) 47 db, err := sql.Open("sqlite3", dsn) 48 if err != nil { 49 return nil, fmt.Errorf("open sqlite %q: %w", path, err) 50 } 51 52 // SQLite only supports one writer at a time. Capping max open conns 53 // at 1 keeps database/sql from spinning up extra connections that 54 // will only ever serialize behind that writer. 55 db.SetMaxOpenConns(1) 56 57 s := &store{db: db} 58 if err := s.migrate(context.Background()); err != nil { 59 _ = db.Close() 60 return nil, err 61 } 62 return s, nil 63} 64 65// Close releases the underlying database handle. 66func (s *store) Close() error { 67 return s.db.Close() 68} 69 70// metaCursorKey is the meta-table key under which we persist the 71// jetstream cursor. Pulled out as a constant to avoid drift between 72// load/save sites. 73const metaCursorKey = "jetstream_cursor" 74 75// LoadCursor returns the persisted jetstream cursor, or nil if none has 76// been saved yet (signaling "start from now"). The cursor is a unix 77// microsecond timestamp — see jetstream.Event.TimeUS. 78func (s *store) LoadCursor(ctx context.Context) (*int64, error) { 79 var raw string 80 err := s.db.QueryRowContext(ctx, 81 `SELECT value FROM meta WHERE key = ?`, metaCursorKey, 82 ).Scan(&raw) 83 if errors.Is(err, sql.ErrNoRows) { 84 return nil, nil 85 } 86 if err != nil { 87 return nil, fmt.Errorf("load cursor: %w", err) 88 } 89 v, err := strconv.ParseInt(raw, 10, 64) 90 if err != nil { 91 // A malformed cursor shouldn't wedge startup — log-and-ignore 92 // (return nil) so we just resume from "now". The caller has the 93 // logger; we surface the parse error so it can decide. 94 return nil, fmt.Errorf("parse cursor %q: %w", raw, err) 95 } 96 return &v, nil 97} 98 99// SaveCursor writes the jetstream cursor. It uses an UPSERT so the meta 100// row is created on first call and updated thereafter. 101func (s *store) SaveCursor(ctx context.Context, cursor int64) error { 102 _, err := s.db.ExecContext(ctx, 103 `INSERT INTO meta (key, value) VALUES (?, ?) 104 ON CONFLICT(key) DO UPDATE SET value = excluded.value`, 105 metaCursorKey, strconv.FormatInt(cursor, 10), 106 ) 107 if err != nil { 108 return fmt.Errorf("save cursor: %w", err) 109 } 110 return nil 111} 112 113// UpsertSpindleMember records (or refreshes) a sh.tangled.spindle.member 114// observation. 115func (s *store) UpsertSpindleMember(ctx context.Context, did, rkey, instance, subject, createdAt string) error { 116 _, err := s.db.ExecContext(ctx, 117 `INSERT INTO spindle_members (did, rkey, instance, subject, created_at) 118 VALUES (?, ?, ?, ?, ?) 119 ON CONFLICT(did, rkey) DO UPDATE SET 120 instance = excluded.instance, 121 subject = excluded.subject, 122 created_at = excluded.created_at`, 123 did, rkey, instance, subject, createdAt, 124 ) 125 if err != nil { 126 return fmt.Errorf("upsert spindle_member: %w", err) 127 } 128 return nil 129} 130 131// DeleteSpindleMember removes a member record by its ATProto identity. 132func (s *store) DeleteSpindleMember(ctx context.Context, did, rkey string) error { 133 _, err := s.db.ExecContext(ctx, 134 `DELETE FROM spindle_members WHERE did = ? AND rkey = ?`, 135 did, rkey, 136 ) 137 if err != nil { 138 return fmt.Errorf("delete spindle_member: %w", err) 139 } 140 return nil 141} 142 143// UpsertRepo records (or refreshes) a sh.tangled.repo observation. 144// spindle and repoDid may be empty strings — we store them as such rather 145// than as SQL NULLs to keep the upsert path uniform. 146func (s *store) UpsertRepo(ctx context.Context, did, rkey, knot, name, spindle, repoDid, createdAt string) error { 147 _, err := s.db.ExecContext(ctx, 148 `INSERT INTO repos (did, rkey, knot, name, spindle, repo_did, created_at) 149 VALUES (?, ?, ?, ?, ?, ?, ?) 150 ON CONFLICT(did, rkey) DO UPDATE SET 151 knot = excluded.knot, 152 name = excluded.name, 153 spindle = excluded.spindle, 154 repo_did = excluded.repo_did, 155 created_at = excluded.created_at`, 156 did, rkey, knot, name, spindle, repoDid, createdAt, 157 ) 158 if err != nil { 159 return fmt.Errorf("upsert repo: %w", err) 160 } 161 return nil 162} 163 164// DeleteRepo removes a repo record by its ATProto identity. 165func (s *store) DeleteRepo(ctx context.Context, did, rkey string) error { 166 _, err := s.db.ExecContext(ctx, 167 `DELETE FROM repos WHERE did = ? AND rkey = ?`, 168 did, rkey, 169 ) 170 if err != nil { 171 return fmt.Errorf("delete repo: %w", err) 172 } 173 return nil 174} 175 176// UpsertRepoCollaborator records (or refreshes) a 177// sh.tangled.repo.collaborator observation. 178func (s *store) UpsertRepoCollaborator(ctx context.Context, did, rkey, repo, repoDid, subject, createdAt string) error { 179 _, err := s.db.ExecContext(ctx, 180 `INSERT INTO repo_collaborators (did, rkey, repo, repo_did, subject, created_at) 181 VALUES (?, ?, ?, ?, ?, ?) 182 ON CONFLICT(did, rkey) DO UPDATE SET 183 repo = excluded.repo, 184 repo_did = excluded.repo_did, 185 subject = excluded.subject, 186 created_at = excluded.created_at`, 187 did, rkey, repo, repoDid, subject, createdAt, 188 ) 189 if err != nil { 190 return fmt.Errorf("upsert repo_collaborator: %w", err) 191 } 192 return nil 193} 194 195// GetRepo returns the (knot, spindle) currently stored for a (did, rkey) 196// pair. Both are returned as empty strings when no row exists; callers 197// that need to distinguish "absent" from "stored but empty" should 198// pre-check existence themselves. 199// 200// This exists so applyRepo can read the *previous* spindle/knot of a 201// record before applying a mutation, which is what makes it possible to 202// detect transitions like "this repo used to be ours, now it isn't" and 203// trigger a knot unsubscribe. 204func (s *store) GetRepo(ctx context.Context, did, rkey string) (knot, spindle string, err error) { 205 err = s.db.QueryRowContext(ctx, 206 `SELECT knot, spindle FROM repos WHERE did = ? AND rkey = ?`, 207 did, rkey, 208 ).Scan(&knot, &spindle) 209 if errors.Is(err, sql.ErrNoRows) { 210 return "", "", nil 211 } 212 if err != nil { 213 return "", "", fmt.Errorf("get repo: %w", err) 214 } 215 return knot, spindle, nil 216} 217 218// AuthorizePipelineActor reports whether a pipeline trigger that 219// arrived on knot for repo (repoOwnerDID, repoName) should be allowed 220// to spawn work on this spindle. Both halves of the answer are 221// derived from the persisted Tangled state in spindle_members and 222// repos: the network's own authorization records, mirrored here from 223// the firehose by jetstream.go. 224// 225// The check is two independent gates; both must hold: 226// 227// 1. **Repo claim**: a sh.tangled.repo record exists naming 228// hostname as its CI spindle AND knot as its host knot, 229// published by repoOwnerDID with the matching name. Without this 230// a knot we already happen to be subscribed to (because *some* 231// repo on it points at us) could otherwise smuggle in pipeline 232// triggers for unrelated repos that never opted into us. 233// 234// 2. **Actor membership**: repoOwnerDID is either the spindle owner 235// itself, or has been authorized via a sh.tangled.spindle.member 236// record published by the spindle owner. The publisher (`did`) 237// of that membership grant must equal ownerDID. Anyone can 238// publish a member record naming anyone, so trusting unsigned 239// grants would let any DID grant itself access. We do NOT match 240// against the record's `instance` column because the upstream 241// ecosystem stores it inconsistently (URL vs hostname), and a 242// tack instance only ever speaks for a single hostname anyway. 243// 244// reason is a short, log-friendly description of why authorization 245// failed when ok is false; it is empty when ok is true. Errors 246// reported here are SQL/IO failures, never policy denials; those 247// surface as ok=false with a populated reason. 248func (s *store) AuthorizePipelineActor( 249 ctx context.Context, 250 hostname, knot, ownerDID, repoOwnerDID, repoName string, 251) (ok bool, reason string, err error) { 252 // We can't authorize an unknown actor or an unidentified repo: 253 // every gate below joins on these. Bail before touching SQLite 254 // so a malformed trigger logs cleanly instead of triggering a 255 // "no rows" miss that could be confused with a real denial. 256 if repoOwnerDID == "" { 257 return false, "trigger has no repo did", nil 258 } 259 if repoName == "" { 260 return false, "trigger has no repo name", nil 261 } 262 263 // Gate 1: this specific repo opted into us on this specific knot. 264 var n int 265 if err := s.db.QueryRowContext(ctx, 266 `SELECT COUNT(*) FROM repos 267 WHERE did = ? AND name = ? AND knot = ? AND spindle = ?`, 268 repoOwnerDID, repoName, knot, hostname, 269 ).Scan(&n); err != nil { 270 return false, "", fmt.Errorf("count repo claim: %w", err) 271 } 272 if n == 0 { 273 return false, "no repo record claims this spindle on this knot", nil 274 } 275 276 // Gate 2: actor is the spindle owner, or vouched for by them. 277 if repoOwnerDID == ownerDID { 278 return true, "", nil 279 } 280 if err := s.db.QueryRowContext(ctx, 281 `SELECT COUNT(*) FROM spindle_members 282 WHERE did = ? AND subject = ?`, 283 ownerDID, repoOwnerDID, 284 ).Scan(&n); err != nil { 285 return false, "", fmt.Errorf("count membership: %w", err) 286 } 287 if n == 0 { 288 return false, "actor is not a spindle member", nil 289 } 290 return true, "", nil 291} 292 293// IsKnotWanted reports whether any *authorized* repo currently stored 294// still names the given hostname as its spindle and the given knot as 295// its host. After a repo update or delete this is the question we ask 296// to decide whether to keep watching that knot or unsubscribe from it. 297// 298// "Authorized" means the repo's publisher (the row's `did` column) is 299// either the spindle owner or has been vouched for by the spindle owner 300// via a sh.tangled.spindle.member record. Without this filter, a non- 301// member could pin us to an arbitrary attacker-chosen knot just by 302// publishing a sh.tangled.repo record naming us as its spindle. See 303// the matching gate in IsAuthorizedActor and AuthorizePipelineActor. 304func (s *store) IsKnotWanted(ctx context.Context, hostname, ownerDID, knot string) (bool, error) { 305 var n int 306 err := s.db.QueryRowContext(ctx, 307 `SELECT COUNT(*) FROM repos r 308 WHERE r.spindle = ? AND r.knot = ? 309 AND ( 310 r.did = ? 311 OR EXISTS ( 312 SELECT 1 FROM spindle_members m 313 WHERE m.did = ? AND m.subject = r.did 314 ) 315 )`, 316 hostname, knot, ownerDID, ownerDID, 317 ).Scan(&n) 318 if err != nil { 319 return false, fmt.Errorf("count repos for knot: %w", err) 320 } 321 return n > 0, nil 322} 323 324// IsAuthorizedActor reports whether did is the spindle owner or has 325// been authorized by the spindle owner via a sh.tangled.spindle.member 326// record. The membership record's publisher (its `did` column) must 327// equal ownerDID; anyone can publish a membership record naming 328// anyone, so trusting unsigned grants would let any DID grant itself 329// access. This is the same trust rule AuthorizePipelineActor enforces; 330// it's pulled out here so knot-subscription decisions in jetstream.go 331// gate on the same check as pipeline-spawning decisions in knot.go. 332func (s *store) IsAuthorizedActor(ctx context.Context, ownerDID, did string) (bool, error) { 333 if did == "" { 334 return false, nil 335 } 336 if did == ownerDID { 337 return true, nil 338 } 339 var n int 340 err := s.db.QueryRowContext(ctx, 341 `SELECT COUNT(*) FROM spindle_members 342 WHERE did = ? AND subject = ?`, 343 ownerDID, did, 344 ).Scan(&n) 345 if err != nil { 346 return false, fmt.Errorf("count membership: %w", err) 347 } 348 return n > 0, nil 349} 350 351// GetSpindleMember returns the subject DID currently stored for a 352// (did, rkey) spindle.member row, or "" when no such row exists. 353// 354// Used by the jetstream handler to learn whose authorization a 355// delete/update of a membership record affects, so it can reconcile 356// that subject's knot subscriptions in the same step as the mutation. 357func (s *store) GetSpindleMember(ctx context.Context, did, rkey string) (string, error) { 358 var subject string 359 err := s.db.QueryRowContext(ctx, 360 `SELECT subject FROM spindle_members WHERE did = ? AND rkey = ?`, 361 did, rkey, 362 ).Scan(&subject) 363 if errors.Is(err, sql.ErrNoRows) { 364 return "", nil 365 } 366 if err != nil { 367 return "", fmt.Errorf("get spindle_member: %w", err) 368 } 369 return subject, nil 370} 371 372// KnotsForOwner returns the distinct knot hostnames of repos published 373// by `did` whose spindle field equals hostname. Used after a membership 374// change to find which knots that DID's repos want, so we can subscribe 375// (newly granted) or potentially unsubscribe (revoked) in lockstep with 376// the grant. 377// 378// Returns an empty slice (not nil) on no matches so callers can range 379// over the result without a nil check. 380func (s *store) KnotsForOwner(ctx context.Context, hostname, did string) ([]string, error) { 381 rows, err := s.db.QueryContext(ctx, 382 `SELECT DISTINCT knot FROM repos 383 WHERE did = ? AND spindle = ? AND knot <> ''`, 384 did, hostname, 385 ) 386 if err != nil { 387 return nil, fmt.Errorf("query knots for owner: %w", err) 388 } 389 defer rows.Close() 390 out := []string{} 391 for rows.Next() { 392 var k string 393 if err := rows.Scan(&k); err != nil { 394 return nil, fmt.Errorf("scan knot: %w", err) 395 } 396 out = append(out, k) 397 } 398 if err := rows.Err(); err != nil { 399 return nil, fmt.Errorf("iterate knots: %w", err) 400 } 401 return out, nil 402} 403 404// KnotsForSpindle returns the distinct knot hostnames of all repos that 405// (a) have declared the given spindle hostname as their CI spindle, and 406// (b) were published by an *authorized* DID: either the spindle owner 407// itself or a member they vouched for via sh.tangled.spindle.member. 408// 409// The membership filter is critical: without it, any DID that publishes 410// a sh.tangled.repo record naming us as its spindle could force us to 411// dial an attacker-chosen knot at startup. See IsAuthorizedActor for 412// the matching trust check applied at firehose-event time. 413// 414// Returns an empty slice (not nil) when nothing matches, so callers can 415// range over the result without a nil check. 416func (s *store) KnotsForSpindle(ctx context.Context, hostname, ownerDID string) ([]string, error) { 417 rows, err := s.db.QueryContext(ctx, 418 `SELECT DISTINCT r.knot FROM repos r 419 WHERE r.spindle = ? AND r.knot <> '' 420 AND ( 421 r.did = ? 422 OR EXISTS ( 423 SELECT 1 FROM spindle_members m 424 WHERE m.did = ? AND m.subject = r.did 425 ) 426 )`, 427 hostname, ownerDID, ownerDID, 428 ) 429 if err != nil { 430 return nil, fmt.Errorf("query knots: %w", err) 431 } 432 defer rows.Close() 433 434 out := []string{} 435 for rows.Next() { 436 var k string 437 if err := rows.Scan(&k); err != nil { 438 return nil, fmt.Errorf("scan knot: %w", err) 439 } 440 out = append(out, k) 441 } 442 if err := rows.Err(); err != nil { 443 return nil, fmt.Errorf("iterate knots: %w", err) 444 } 445 return out, nil 446} 447 448// DeleteRepoCollaborator removes a collaborator record by its ATProto 449// identity. 450func (s *store) DeleteRepoCollaborator(ctx context.Context, did, rkey string) error { 451 _, err := s.db.ExecContext(ctx, 452 `DELETE FROM repo_collaborators WHERE did = ? AND rkey = ?`, 453 did, rkey, 454 ) 455 if err != nil { 456 return fmt.Errorf("delete repo_collaborator: %w", err) 457 } 458 return nil 459} 460 461// EventRow is one row of the events table. It represents an outbound 462// record we want to deliver to /events websocket subscribers, in the 463// shape callers actually need (raw record JSON, not stringly-typed). 464type EventRow struct { 465 // Created is the assigned monotonic rowid; doubles as the cursor 466 // value subscribers use to resume. 467 Created int64 468 // Rkey is the ATProto record key. For sh.tangled.pipeline.status 469 // records this is the rkey we mint when publishing. 470 Rkey string 471 // Nsid is the lexicon collection (e.g. sh.tangled.pipeline.status). 472 Nsid string 473 // EventJSON is the record body verbatim — held as RawMessage so 474 // the /events handler can splice it into the wire envelope without 475 // an unmarshal/remarshal round-trip. 476 EventJSON json.RawMessage 477} 478 479// InsertEvent appends an event row and returns its assigned `created` 480// (rowid) cursor. Storage is the source of truth for fan-out, so we 481// write here even if zero subscribers are connected — a subscriber that 482// connects later (with an old cursor) will pick the row up via 483// EventsAfter. 484// 485// eventJSON must be a valid JSON object; we store it verbatim. Length 486// validation is intentionally absent — the schema accepts arbitrary 487// TEXT and SQLite handles huge blobs fine for our scale. 488func (s *store) InsertEvent(ctx context.Context, rkey, nsid string, eventJSON []byte) (int64, error) { 489 res, err := s.db.ExecContext(ctx, 490 `INSERT INTO events (rkey, nsid, event_json, inserted_at) 491 VALUES (?, ?, ?, ?)`, 492 rkey, nsid, string(eventJSON), 493 time.Now().UTC().Format(time.RFC3339Nano), 494 ) 495 if err != nil { 496 return 0, fmt.Errorf("insert event: %w", err) 497 } 498 id, err := res.LastInsertId() 499 if err != nil { 500 return 0, fmt.Errorf("event last insert id: %w", err) 501 } 502 return id, nil 503} 504 505// BuildkiteBuildRef is the persisted mapping from one Buildkite build 506// to the Tangled pipeline tuple that spawned it. It's the row written 507// by the Buildkite provider at Spawn time and read back from two 508// places: the webhook handler (by build UUID) when an event arrives, 509// and the /logs handler (by knot+rkey+workflow) when an appview 510// client asks for output. 511type BuildkiteBuildRef struct { 512 BuildUUID string 513 BuildNumber int64 514 PipelineSlug string 515 // Org is the Buildkite organisation slug the build was created 516 // against. Persisted at Spawn time so /logs and any other 517 // post-creation API call can target the same org the workflow 518 // originally chose — see the workflow YAML `tack.buildkite.org` 519 // override. Empty means "use the provider's default org", which 520 // is what every row written before the org column existed will 521 // scan as. 522 Org string 523 Knot string 524 PipelineRkey string 525 Workflow string 526 PipelineURI string 527} 528 529// TektonRunRef is the persisted link from a Tangled workflow tuple 530// to the in-cluster PipelineRun tack created for it. The tuple is the 531// user-facing identity the appview knows; namespace/name/uid are the 532// Kubernetes identity needed for status watching and log lookup. 533type TektonRunRef struct { 534 Knot string 535 PipelineRkey string 536 Workflow string 537 Namespace string 538 PipelineRunName string 539 PipelineRunUID string 540 PipelineName string 541 PipelineURI string 542} 543 544// InsertBuildkiteBuild records that a Buildkite build was created on 545// behalf of the given (knot, pipelineRkey, workflow) tuple. Uses 546// INSERT OR REPLACE so that an unlikely build-uuid collision (or a 547// Buildkite-side rebuild that re-fires us) just refreshes the row 548// instead of failing. 549func (s *store) InsertBuildkiteBuild(ctx context.Context, ref BuildkiteBuildRef) error { 550 // Capture wall-clock and monotonic-friendly forms once so the two 551 // columns agree on the same instant. created_at is the 552 // human-readable RFC3339Nano string; created_unix_ns is the 553 // integer the lookup orders on (text comparison of nanosecond 554 // timestamps isn't reliable, so we sort on the int instead). 555 now := time.Now().UTC() 556 _, err := s.db.ExecContext(ctx, 557 `INSERT INTO buildkite_builds ( 558 build_uuid, build_number, pipeline_slug, org, 559 knot, pipeline_rkey, workflow, 560 pipeline_uri, created_at, created_unix_ns 561 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 562 ON CONFLICT(build_uuid) DO UPDATE SET 563 build_number = excluded.build_number, 564 pipeline_slug = excluded.pipeline_slug, 565 org = excluded.org, 566 knot = excluded.knot, 567 pipeline_rkey = excluded.pipeline_rkey, 568 workflow = excluded.workflow, 569 pipeline_uri = excluded.pipeline_uri, 570 created_at = excluded.created_at, 571 created_unix_ns = excluded.created_unix_ns`, 572 ref.BuildUUID, ref.BuildNumber, ref.PipelineSlug, ref.Org, 573 ref.Knot, ref.PipelineRkey, ref.Workflow, 574 ref.PipelineURI, now.Format(time.RFC3339Nano), now.UnixNano(), 575 ) 576 if err != nil { 577 return fmt.Errorf("insert buildkite_build: %w", err) 578 } 579 return nil 580} 581 582// LookupBuildkiteBuildByUUID returns the saved mapping for the given 583// Buildkite build UUID, or nil when no such build is recorded. 584// Returning a nil pointer rather than a sentinel error keeps the 585// webhook handler's "we don't know about this build" branch a simple 586// nil check. 587func (s *store) LookupBuildkiteBuildByUUID(ctx context.Context, buildUUID string) (*BuildkiteBuildRef, error) { 588 var ref BuildkiteBuildRef 589 err := s.db.QueryRowContext(ctx, 590 `SELECT build_uuid, build_number, pipeline_slug, org, 591 knot, pipeline_rkey, workflow, pipeline_uri 592 FROM buildkite_builds WHERE build_uuid = ?`, 593 buildUUID, 594 ).Scan( 595 &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug, &ref.Org, 596 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI, 597 ) 598 if errors.Is(err, sql.ErrNoRows) { 599 return nil, nil 600 } 601 if err != nil { 602 return nil, fmt.Errorf("lookup buildkite_build by uuid: %w", err) 603 } 604 return &ref, nil 605} 606 607// LookupBuildkiteBuildByTuple finds the most recently created build 608// for (knot, pipelineRkey, workflow). Returns nil when no build has 609// been recorded for that tuple — used by /logs to translate the 610// appview's path-based identity back into something Buildkite knows. 611// 612// "Most recent" matters because a workflow may have multiple builds 613// over time (rebuilds, re-triggers). We always serve logs for the 614// latest run; older runs are still queryable by build UUID directly 615// if anyone ever wants that. 616// 617// Ordering is on created_unix_ns (a monotonic int) rather than 618// created_at. Text comparison of RFC3339Nano timestamps is not 619// reliable across nanosecond precision, which used to make this 620// query occasionally pick the wrong run. created_at and build_number 621// are kept as deterministic tiebreakers for legacy rows that pre-date 622// the new column and still scan as 0. 623func (s *store) LookupBuildkiteBuildByTuple(ctx context.Context, knot, pipelineRkey, workflow string) (*BuildkiteBuildRef, error) { 624 var ref BuildkiteBuildRef 625 err := s.db.QueryRowContext(ctx, 626 `SELECT build_uuid, build_number, pipeline_slug, org, 627 knot, pipeline_rkey, workflow, pipeline_uri 628 FROM buildkite_builds 629 WHERE knot = ? AND pipeline_rkey = ? AND workflow = ? 630 ORDER BY created_unix_ns DESC, created_at DESC, build_number DESC 631 LIMIT 1`, 632 knot, pipelineRkey, workflow, 633 ).Scan( 634 &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug, &ref.Org, 635 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI, 636 ) 637 if errors.Is(err, sql.ErrNoRows) { 638 return nil, nil 639 } 640 if err != nil { 641 return nil, fmt.Errorf("lookup buildkite_build by tuple: %w", err) 642 } 643 return &ref, nil 644} 645 646// InsertTektonRun records the latest PipelineRun created for a Tangled 647// workflow tuple. Reusing the tuple as the primary key intentionally 648// makes /logs resolve to the newest run for that workflow identity. 649func (s *store) InsertTektonRun(ctx context.Context, ref TektonRunRef) error { 650 now := time.Now().UTC() 651 _, err := s.db.ExecContext(ctx, 652 `INSERT INTO tekton_runs ( 653 knot, pipeline_rkey, workflow, 654 namespace, pipeline_run_name, pipeline_run_uid, 655 pipeline_name, pipeline_uri, created_at, created_unix_ns 656 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 657 ON CONFLICT(knot, pipeline_rkey, workflow) DO UPDATE SET 658 namespace = excluded.namespace, 659 pipeline_run_name = excluded.pipeline_run_name, 660 pipeline_run_uid = excluded.pipeline_run_uid, 661 pipeline_name = excluded.pipeline_name, 662 pipeline_uri = excluded.pipeline_uri, 663 created_at = excluded.created_at, 664 created_unix_ns = excluded.created_unix_ns`, 665 ref.Knot, ref.PipelineRkey, ref.Workflow, 666 ref.Namespace, ref.PipelineRunName, ref.PipelineRunUID, 667 ref.PipelineName, ref.PipelineURI, now.Format(time.RFC3339Nano), now.UnixNano(), 668 ) 669 if err != nil { 670 return fmt.Errorf("insert tekton_run: %w", err) 671 } 672 return nil 673} 674 675// LookupTektonRunByTuple resolves the appview's path-based identity to 676// the concrete PipelineRun tack created in Kubernetes. 677func (s *store) LookupTektonRunByTuple(ctx context.Context, knot, pipelineRkey, workflow string) (*TektonRunRef, error) { 678 var ref TektonRunRef 679 err := s.db.QueryRowContext(ctx, 680 `SELECT knot, pipeline_rkey, workflow, 681 namespace, pipeline_run_name, pipeline_run_uid, 682 pipeline_name, pipeline_uri 683 FROM tekton_runs 684 WHERE knot = ? AND pipeline_rkey = ? AND workflow = ?`, 685 knot, pipelineRkey, workflow, 686 ).Scan( 687 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, 688 &ref.Namespace, &ref.PipelineRunName, &ref.PipelineRunUID, 689 &ref.PipelineName, &ref.PipelineURI, 690 ) 691 if errors.Is(err, sql.ErrNoRows) { 692 return nil, nil 693 } 694 if err != nil { 695 return nil, fmt.Errorf("lookup tekton_run by tuple: %w", err) 696 } 697 return &ref, nil 698} 699 700// EventsAfter returns every event row with `created` strictly greater 701// than cursor, in cursor order. Used by /events to backfill a 702// reconnecting subscriber and to drain newly-published rows on each 703// broker notification. 704// 705// Pass cursor=0 to get the full log from the beginning, which is what 706// happens when a subscriber connects without a ?cursor= query param. 707func (s *store) EventsAfter(ctx context.Context, cursor int64) ([]EventRow, error) { 708 rows, err := s.db.QueryContext(ctx, 709 `SELECT created, rkey, nsid, event_json 710 FROM events 711 WHERE created > ? 712 ORDER BY created ASC`, 713 cursor, 714 ) 715 if err != nil { 716 return nil, fmt.Errorf("query events: %w", err) 717 } 718 defer rows.Close() 719 720 out := []EventRow{} 721 for rows.Next() { 722 var ( 723 ev EventRow 724 raw string 725 ) 726 if err := rows.Scan(&ev.Created, &ev.Rkey, &ev.Nsid, &raw); err != nil { 727 return nil, fmt.Errorf("scan event: %w", err) 728 } 729 ev.EventJSON = json.RawMessage(raw) 730 out = append(out, ev) 731 } 732 if err := rows.Err(); err != nil { 733 return nil, fmt.Errorf("iterate events: %w", err) 734 } 735 return out, nil 736}