Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

at main 30 kB View raw
1package main 2 3// SQLite-backed persistence for tack. Holds: 4// 5// - the jetstream cursor, so restarts resume the firehose where the 6// previous run left off 7// - mirrored Tangled state (spindle members, repos, collaborators) 8// and the authorization helpers built on top of it, used both to 9// gate inbound pipeline triggers and to decide which knots we 10// should be subscribed to 11// - the outbound event log fed to /events websocket subscribers 12// - the Buildkite build → pipeline mapping the webhook and /logs 13// handlers look up 14// 15// Per-method docs go into more detail. 16 17import ( 18 "context" 19 "database/sql" 20 "encoding/json" 21 "errors" 22 "fmt" 23 "strconv" 24 "time" 25 26 _ "github.com/mattn/go-sqlite3" 27) 28 29// store wraps the SQLite handle and exposes the small set of operations 30// the rest of tack needs. Keeping the surface narrow lets the persistence 31// layer be swapped or mocked later without rewriting callers. 32type store struct { 33 db *sql.DB 34} 35 36// openStore opens (or creates) the SQLite database at path and applies 37// the schema. It also flips on WAL + NORMAL synchronous, which is the 38// usual "this is a long-running server, not a one-shot script" config: 39// concurrent reads don't block the single writer, and we trade a little 40// crash-safety on power loss for a lot of write throughput. 41func openStore(path string) (*store, error) { 42 // mattn/go-sqlite3 reads pragma-style query parameters (_journal_mode, 43 // _synchronous, _foreign_keys) and applies them on each new connection. 44 // journal_mode=WAL persists in the database file but setting it here 45 // also covers the first-ever open. 46 dsn := fmt.Sprintf("file:%s?_journal_mode=WAL&_synchronous=NORMAL&_foreign_keys=on", path) 47 db, err := sql.Open("sqlite3", dsn) 48 if err != nil { 49 return nil, fmt.Errorf("open sqlite %q: %w", path, err) 50 } 51 52 // SQLite only supports one writer at a time. Capping max open conns 53 // at 1 keeps database/sql from spinning up extra connections that 54 // will only ever serialize behind that writer. 55 db.SetMaxOpenConns(1) 56 57 s := &store{db: db} 58 if err := s.migrate(context.Background()); err != nil { 59 _ = db.Close() 60 return nil, err 61 } 62 return s, nil 63} 64 65// Close releases the underlying database handle. 66func (s *store) Close() error { 67 return s.db.Close() 68} 69 70// metaCursorKey is the meta-table key under which we persist the 71// jetstream cursor. Pulled out as a constant to avoid drift between 72// load/save sites. 73const metaCursorKey = "jetstream_cursor" 74 75// LoadCursor returns the persisted jetstream cursor, or nil if none has 76// been saved yet (signaling "start from now"). The cursor is a unix 77// microsecond timestamp — see jetstream.Event.TimeUS. 78func (s *store) LoadCursor(ctx context.Context) (*int64, error) { 79 var raw string 80 err := s.db.QueryRowContext(ctx, 81 `SELECT value FROM meta WHERE key = ?`, metaCursorKey, 82 ).Scan(&raw) 83 if errors.Is(err, sql.ErrNoRows) { 84 return nil, nil 85 } 86 if err != nil { 87 return nil, fmt.Errorf("load cursor: %w", err) 88 } 89 v, err := strconv.ParseInt(raw, 10, 64) 90 if err != nil { 91 // A malformed cursor shouldn't wedge startup — log-and-ignore 92 // (return nil) so we just resume from "now". The caller has the 93 // logger; we surface the parse error so it can decide. 94 return nil, fmt.Errorf("parse cursor %q: %w", raw, err) 95 } 96 return &v, nil 97} 98 99// SaveCursor writes the jetstream cursor. It uses an UPSERT so the meta 100// row is created on first call and updated thereafter. 101func (s *store) SaveCursor(ctx context.Context, cursor int64) error { 102 _, err := s.db.ExecContext(ctx, 103 `INSERT INTO meta (key, value) VALUES (?, ?) 104 ON CONFLICT(key) DO UPDATE SET value = excluded.value`, 105 metaCursorKey, strconv.FormatInt(cursor, 10), 106 ) 107 if err != nil { 108 return fmt.Errorf("save cursor: %w", err) 109 } 110 return nil 111} 112 113// UpsertSpindleMember records (or refreshes) a sh.tangled.spindle.member 114// observation. 115func (s *store) UpsertSpindleMember(ctx context.Context, did, rkey, instance, subject, createdAt string) error { 116 _, err := s.db.ExecContext(ctx, 117 `INSERT INTO spindle_members (did, rkey, instance, subject, created_at) 118 VALUES (?, ?, ?, ?, ?) 119 ON CONFLICT(did, rkey) DO UPDATE SET 120 instance = excluded.instance, 121 subject = excluded.subject, 122 created_at = excluded.created_at`, 123 did, rkey, instance, subject, createdAt, 124 ) 125 if err != nil { 126 return fmt.Errorf("upsert spindle_member: %w", err) 127 } 128 return nil 129} 130 131// DeleteSpindleMember removes a member record by its ATProto identity. 132func (s *store) DeleteSpindleMember(ctx context.Context, did, rkey string) error { 133 _, err := s.db.ExecContext(ctx, 134 `DELETE FROM spindle_members WHERE did = ? AND rkey = ?`, 135 did, rkey, 136 ) 137 if err != nil { 138 return fmt.Errorf("delete spindle_member: %w", err) 139 } 140 return nil 141} 142 143// UpsertRepo records (or refreshes) a sh.tangled.repo observation. 144// spindle and repoDid may be empty strings — we store them as such rather 145// than as SQL NULLs to keep the upsert path uniform. 146func (s *store) UpsertRepo(ctx context.Context, did, rkey, knot, name, spindle, repoDid, createdAt string) error { 147 _, err := s.db.ExecContext(ctx, 148 `INSERT INTO repos (did, rkey, knot, name, spindle, repo_did, created_at) 149 VALUES (?, ?, ?, ?, ?, ?, ?) 150 ON CONFLICT(did, rkey) DO UPDATE SET 151 knot = excluded.knot, 152 name = excluded.name, 153 spindle = excluded.spindle, 154 repo_did = excluded.repo_did, 155 created_at = excluded.created_at`, 156 did, rkey, knot, name, spindle, repoDid, createdAt, 157 ) 158 if err != nil { 159 return fmt.Errorf("upsert repo: %w", err) 160 } 161 return nil 162} 163 164// DeleteRepo removes a repo record by its ATProto identity. 165func (s *store) DeleteRepo(ctx context.Context, did, rkey string) error { 166 _, err := s.db.ExecContext(ctx, 167 `DELETE FROM repos WHERE did = ? AND rkey = ?`, 168 did, rkey, 169 ) 170 if err != nil { 171 return fmt.Errorf("delete repo: %w", err) 172 } 173 return nil 174} 175 176// UpsertRepoCollaborator records (or refreshes) a 177// sh.tangled.repo.collaborator observation. 178func (s *store) UpsertRepoCollaborator(ctx context.Context, did, rkey, repo, repoDid, subject, createdAt string) error { 179 _, err := s.db.ExecContext(ctx, 180 `INSERT INTO repo_collaborators (did, rkey, repo, repo_did, subject, created_at) 181 VALUES (?, ?, ?, ?, ?, ?) 182 ON CONFLICT(did, rkey) DO UPDATE SET 183 repo = excluded.repo, 184 repo_did = excluded.repo_did, 185 subject = excluded.subject, 186 created_at = excluded.created_at`, 187 did, rkey, repo, repoDid, subject, createdAt, 188 ) 189 if err != nil { 190 return fmt.Errorf("upsert repo_collaborator: %w", err) 191 } 192 return nil 193} 194 195// GetRepo returns the (knot, spindle) currently stored for a (did, rkey) 196// pair. Both are returned as empty strings when no row exists; callers 197// that need to distinguish "absent" from "stored but empty" should 198// pre-check existence themselves. 199// 200// This exists so applyRepo can read the *previous* spindle/knot of a 201// record before applying a mutation, which is what makes it possible to 202// detect transitions like "this repo used to be ours, now it isn't" and 203// trigger a knot unsubscribe. 204func (s *store) GetRepo(ctx context.Context, did, rkey string) (knot, spindle string, err error) { 205 err = s.db.QueryRowContext(ctx, 206 `SELECT knot, spindle FROM repos WHERE did = ? AND rkey = ?`, 207 did, rkey, 208 ).Scan(&knot, &spindle) 209 if errors.Is(err, sql.ErrNoRows) { 210 return "", "", nil 211 } 212 if err != nil { 213 return "", "", fmt.Errorf("get repo: %w", err) 214 } 215 return knot, spindle, nil 216} 217 218// AuthorizePipelineActor reports whether a pipeline trigger that 219// arrived on knot for repo (repoOwnerDID, repoName) should be allowed 220// to spawn work on this spindle. Both halves of the answer are 221// derived from the persisted Tangled state in spindle_members and 222// repos: the network's own authorization records, mirrored here from 223// the firehose by jetstream.go. 224// 225// The check is two independent gates; both must hold: 226// 227// 1. **Repo claim**: a sh.tangled.repo record exists naming 228// hostname as its CI spindle AND knot as its host knot, 229// published by repoOwnerDID with the matching name. Without this 230// a knot we already happen to be subscribed to (because *some* 231// repo on it points at us) could otherwise smuggle in pipeline 232// triggers for unrelated repos that never opted into us. 233// 234// 2. **Actor membership**: repoOwnerDID is either the spindle owner 235// itself, or has been authorized via a sh.tangled.spindle.member 236// record published by the spindle owner. The publisher (`did`) 237// of that membership grant must equal ownerDID. Anyone can 238// publish a member record naming anyone, so trusting unsigned 239// grants would let any DID grant itself access. We do NOT match 240// against the record's `instance` column because the upstream 241// ecosystem stores it inconsistently (URL vs hostname), and a 242// tack instance only ever speaks for a single hostname anyway. 243// 244// reason is a short, log-friendly description of why authorization 245// failed when ok is false; it is empty when ok is true. Errors 246// reported here are SQL/IO failures, never policy denials; those 247// surface as ok=false with a populated reason. 248func (s *store) AuthorizePipelineActor( 249 ctx context.Context, 250 hostname, knot, ownerDID, repoOwnerDID, repoName string, 251) (ok bool, reason string, err error) { 252 // We can't authorize an unknown actor or an unidentified repo: 253 // every gate below joins on these. Bail before touching SQLite 254 // so a malformed trigger logs cleanly instead of triggering a 255 // "no rows" miss that could be confused with a real denial. 256 if repoOwnerDID == "" { 257 return false, "trigger has no repo did", nil 258 } 259 if repoName == "" { 260 return false, "trigger has no repo name", nil 261 } 262 263 // Gate 1: this specific repo opted into us on this specific knot. 264 var n int 265 if err := s.db.QueryRowContext(ctx, 266 `SELECT COUNT(*) FROM repos 267 WHERE did = ? AND name = ? AND knot = ? AND spindle = ?`, 268 repoOwnerDID, repoName, knot, hostname, 269 ).Scan(&n); err != nil { 270 return false, "", fmt.Errorf("count repo claim: %w", err) 271 } 272 if n == 0 { 273 return false, "no repo record claims this spindle on this knot", nil 274 } 275 276 // Gate 2: actor is the spindle owner, or vouched for by them. 277 if repoOwnerDID == ownerDID { 278 return true, "", nil 279 } 280 if err := s.db.QueryRowContext(ctx, 281 `SELECT COUNT(*) FROM spindle_members 282 WHERE did = ? AND subject = ?`, 283 ownerDID, repoOwnerDID, 284 ).Scan(&n); err != nil { 285 return false, "", fmt.Errorf("count membership: %w", err) 286 } 287 if n == 0 { 288 return false, "actor is not a spindle member", nil 289 } 290 return true, "", nil 291} 292 293// IsKnotWanted reports whether any *authorized* repo currently stored 294// still names the given hostname as its spindle and the given knot as 295// its host. After a repo update or delete this is the question we ask 296// to decide whether to keep watching that knot or unsubscribe from it. 297// 298// "Authorized" means the repo's publisher (the row's `did` column) is 299// either the spindle owner or has been vouched for by the spindle owner 300// via a sh.tangled.spindle.member record. Without this filter, a non- 301// member could pin us to an arbitrary attacker-chosen knot just by 302// publishing a sh.tangled.repo record naming us as its spindle. See 303// the matching gate in IsAuthorizedActor and AuthorizePipelineActor. 304func (s *store) IsKnotWanted(ctx context.Context, hostname, ownerDID, knot string) (bool, error) { 305 var n int 306 err := s.db.QueryRowContext(ctx, 307 `SELECT COUNT(*) FROM repos r 308 WHERE r.spindle = ? AND r.knot = ? 309 AND ( 310 r.did = ? 311 OR EXISTS ( 312 SELECT 1 FROM spindle_members m 313 WHERE m.did = ? AND m.subject = r.did 314 ) 315 )`, 316 hostname, knot, ownerDID, ownerDID, 317 ).Scan(&n) 318 if err != nil { 319 return false, fmt.Errorf("count repos for knot: %w", err) 320 } 321 return n > 0, nil 322} 323 324// IsAuthorizedActor reports whether did is the spindle owner or has 325// been authorized by the spindle owner via a sh.tangled.spindle.member 326// record. The membership record's publisher (its `did` column) must 327// equal ownerDID; anyone can publish a membership record naming 328// anyone, so trusting unsigned grants would let any DID grant itself 329// access. This is the same trust rule AuthorizePipelineActor enforces; 330// it's pulled out here so knot-subscription decisions in jetstream.go 331// gate on the same check as pipeline-spawning decisions in knot.go. 332func (s *store) IsAuthorizedActor(ctx context.Context, ownerDID, did string) (bool, error) { 333 if did == "" { 334 return false, nil 335 } 336 if did == ownerDID { 337 return true, nil 338 } 339 var n int 340 err := s.db.QueryRowContext(ctx, 341 `SELECT COUNT(*) FROM spindle_members 342 WHERE did = ? AND subject = ?`, 343 ownerDID, did, 344 ).Scan(&n) 345 if err != nil { 346 return false, fmt.Errorf("count membership: %w", err) 347 } 348 return n > 0, nil 349} 350 351// GetSpindleMember returns the subject DID currently stored for a 352// (did, rkey) spindle.member row, or "" when no such row exists. 353// 354// Used by the jetstream handler to learn whose authorization a 355// delete/update of a membership record affects, so it can reconcile 356// that subject's knot subscriptions in the same step as the mutation. 357func (s *store) GetSpindleMember(ctx context.Context, did, rkey string) (string, error) { 358 var subject string 359 err := s.db.QueryRowContext(ctx, 360 `SELECT subject FROM spindle_members WHERE did = ? AND rkey = ?`, 361 did, rkey, 362 ).Scan(&subject) 363 if errors.Is(err, sql.ErrNoRows) { 364 return "", nil 365 } 366 if err != nil { 367 return "", fmt.Errorf("get spindle_member: %w", err) 368 } 369 return subject, nil 370} 371 372// KnotsForOwner returns the distinct knot hostnames of repos published 373// by `did` whose spindle field equals hostname. Used after a membership 374// change to find which knots that DID's repos want, so we can subscribe 375// (newly granted) or potentially unsubscribe (revoked) in lockstep with 376// the grant. 377// 378// Returns an empty slice (not nil) on no matches so callers can range 379// over the result without a nil check. 380func (s *store) KnotsForOwner(ctx context.Context, hostname, did string) ([]string, error) { 381 rows, err := s.db.QueryContext(ctx, 382 `SELECT DISTINCT knot FROM repos 383 WHERE did = ? AND spindle = ? AND knot <> ''`, 384 did, hostname, 385 ) 386 if err != nil { 387 return nil, fmt.Errorf("query knots for owner: %w", err) 388 } 389 defer rows.Close() 390 out := []string{} 391 for rows.Next() { 392 var k string 393 if err := rows.Scan(&k); err != nil { 394 return nil, fmt.Errorf("scan knot: %w", err) 395 } 396 out = append(out, k) 397 } 398 if err := rows.Err(); err != nil { 399 return nil, fmt.Errorf("iterate knots: %w", err) 400 } 401 return out, nil 402} 403 404// KnotsForSpindle returns the distinct knot hostnames of all repos that 405// (a) have declared the given spindle hostname as their CI spindle, and 406// (b) were published by an *authorized* DID: either the spindle owner 407// itself or a member they vouched for via sh.tangled.spindle.member. 408// 409// The membership filter is critical: without it, any DID that publishes 410// a sh.tangled.repo record naming us as its spindle could force us to 411// dial an attacker-chosen knot at startup. See IsAuthorizedActor for 412// the matching trust check applied at firehose-event time. 413// 414// Returns an empty slice (not nil) when nothing matches, so callers can 415// range over the result without a nil check. 416func (s *store) KnotsForSpindle(ctx context.Context, hostname, ownerDID string) ([]string, error) { 417 rows, err := s.db.QueryContext(ctx, 418 `SELECT DISTINCT r.knot FROM repos r 419 WHERE r.spindle = ? AND r.knot <> '' 420 AND ( 421 r.did = ? 422 OR EXISTS ( 423 SELECT 1 FROM spindle_members m 424 WHERE m.did = ? AND m.subject = r.did 425 ) 426 )`, 427 hostname, ownerDID, ownerDID, 428 ) 429 if err != nil { 430 return nil, fmt.Errorf("query knots: %w", err) 431 } 432 defer rows.Close() 433 434 out := []string{} 435 for rows.Next() { 436 var k string 437 if err := rows.Scan(&k); err != nil { 438 return nil, fmt.Errorf("scan knot: %w", err) 439 } 440 out = append(out, k) 441 } 442 if err := rows.Err(); err != nil { 443 return nil, fmt.Errorf("iterate knots: %w", err) 444 } 445 return out, nil 446} 447 448// DeleteRepoCollaborator removes a collaborator record by its ATProto 449// identity. 450func (s *store) DeleteRepoCollaborator(ctx context.Context, did, rkey string) error { 451 _, err := s.db.ExecContext(ctx, 452 `DELETE FROM repo_collaborators WHERE did = ? AND rkey = ?`, 453 did, rkey, 454 ) 455 if err != nil { 456 return fmt.Errorf("delete repo_collaborator: %w", err) 457 } 458 return nil 459} 460 461// EventRow is one row of the events table. It represents an outbound 462// record we want to deliver to /events websocket subscribers, in the 463// shape callers actually need (raw record JSON, not stringly-typed). 464type EventRow struct { 465 // Created is the assigned monotonic rowid; doubles as the cursor 466 // value subscribers use to resume. 467 Created int64 468 // Rkey is the ATProto record key. For sh.tangled.pipeline.status 469 // records this is the rkey we mint when publishing. 470 Rkey string 471 // Nsid is the lexicon collection (e.g. sh.tangled.pipeline.status). 472 Nsid string 473 // EventJSON is the record body verbatim — held as RawMessage so 474 // the /events handler can splice it into the wire envelope without 475 // an unmarshal/remarshal round-trip. 476 EventJSON json.RawMessage 477} 478 479// InsertEvent appends an event row and returns its assigned `created` 480// (rowid) cursor. Storage is the source of truth for fan-out, so we 481// write here even if zero subscribers are connected — a subscriber that 482// connects later (with an old cursor) will pick the row up via 483// EventsAfter. 484// 485// eventJSON must be a valid JSON object; we store it verbatim. Length 486// validation is intentionally absent — the schema accepts arbitrary 487// TEXT and SQLite handles huge blobs fine for our scale. 488func (s *store) InsertEvent(ctx context.Context, rkey, nsid string, eventJSON []byte) (int64, error) { 489 res, err := s.db.ExecContext(ctx, 490 `INSERT INTO events (rkey, nsid, event_json, inserted_at) 491 VALUES (?, ?, ?, ?)`, 492 rkey, nsid, string(eventJSON), 493 time.Now().UTC().Format(time.RFC3339Nano), 494 ) 495 if err != nil { 496 return 0, fmt.Errorf("insert event: %w", err) 497 } 498 id, err := res.LastInsertId() 499 if err != nil { 500 return 0, fmt.Errorf("event last insert id: %w", err) 501 } 502 return id, nil 503} 504 505// BuildkiteBuildRef is the persisted mapping from one Buildkite build 506// to the Tangled pipeline tuple that spawned it. It's the row written 507// by the Buildkite provider at Spawn time and read back from two 508// places: the webhook handler (by build UUID) when an event arrives, 509// and the /logs handler (by knot+rkey+workflow) when an appview 510// client asks for output. 511type BuildkiteBuildRef struct { 512 BuildUUID string 513 BuildNumber int64 514 PipelineSlug string 515 // Org is the Buildkite organisation slug the build was created 516 // against. Persisted at Spawn time so /logs and any other 517 // post-creation API call can target the same org the workflow 518 // originally chose — see the workflow YAML `tack.buildkite.org` 519 // override. Empty means "use the provider's default org", which 520 // is what every row written before the org column existed will 521 // scan as. 522 Org string 523 Knot string 524 PipelineRkey string 525 Workflow string 526 PipelineURI string 527} 528 529// TektonRunRef is the persisted link from a Tangled workflow tuple 530// to the in-cluster PipelineRun tack created for it. The tuple is the 531// user-facing identity the appview knows; namespace/name/uid are the 532// Kubernetes identity needed for status watching and log lookup. 533type TektonRunRef struct { 534 Knot string 535 PipelineRkey string 536 Workflow string 537 Namespace string 538 PipelineRunName string 539 PipelineRunUID string 540 PipelineName string 541 PipelineURI string 542} 543 544// SourcehutJobRef is the persisted link from a Tangled workflow tuple 545// to the builds.sr.ht job tack submitted for it. Owner is the 546// canonical name (with leading "~") returned by the upstream API and 547// is the path component log URLs are rooted at — without it we can't 548// fetch logs even though the job ID alone is unique. Instance is the 549// base URL the job lives on; persisted per-row so post-creation API 550// calls always target the same server the workflow originally ran on. 551type SourcehutJobRef struct { 552 Knot string 553 PipelineRkey string 554 Workflow string 555 JobID int64 556 Owner string 557 Instance string 558 PipelineURI string 559} 560 561// InsertBuildkiteBuild records that a Buildkite build was created on 562// behalf of the given (knot, pipelineRkey, workflow) tuple. Uses 563// INSERT OR REPLACE so that an unlikely build-uuid collision (or a 564// Buildkite-side rebuild that re-fires us) just refreshes the row 565// instead of failing. 566func (s *store) InsertBuildkiteBuild(ctx context.Context, ref BuildkiteBuildRef) error { 567 // Capture wall-clock and monotonic-friendly forms once so the two 568 // columns agree on the same instant. created_at is the 569 // human-readable RFC3339Nano string; created_unix_ns is the 570 // integer the lookup orders on (text comparison of nanosecond 571 // timestamps isn't reliable, so we sort on the int instead). 572 now := time.Now().UTC() 573 _, err := s.db.ExecContext(ctx, 574 `INSERT INTO buildkite_builds ( 575 build_uuid, build_number, pipeline_slug, org, 576 knot, pipeline_rkey, workflow, 577 pipeline_uri, created_at, created_unix_ns 578 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 579 ON CONFLICT(build_uuid) DO UPDATE SET 580 build_number = excluded.build_number, 581 pipeline_slug = excluded.pipeline_slug, 582 org = excluded.org, 583 knot = excluded.knot, 584 pipeline_rkey = excluded.pipeline_rkey, 585 workflow = excluded.workflow, 586 pipeline_uri = excluded.pipeline_uri, 587 created_at = excluded.created_at, 588 created_unix_ns = excluded.created_unix_ns`, 589 ref.BuildUUID, ref.BuildNumber, ref.PipelineSlug, ref.Org, 590 ref.Knot, ref.PipelineRkey, ref.Workflow, 591 ref.PipelineURI, now.Format(time.RFC3339Nano), now.UnixNano(), 592 ) 593 if err != nil { 594 return fmt.Errorf("insert buildkite_build: %w", err) 595 } 596 return nil 597} 598 599// LookupBuildkiteBuildByUUID returns the saved mapping for the given 600// Buildkite build UUID, or nil when no such build is recorded. 601// Returning a nil pointer rather than a sentinel error keeps the 602// webhook handler's "we don't know about this build" branch a simple 603// nil check. 604func (s *store) LookupBuildkiteBuildByUUID(ctx context.Context, buildUUID string) (*BuildkiteBuildRef, error) { 605 var ref BuildkiteBuildRef 606 err := s.db.QueryRowContext(ctx, 607 `SELECT build_uuid, build_number, pipeline_slug, org, 608 knot, pipeline_rkey, workflow, pipeline_uri 609 FROM buildkite_builds WHERE build_uuid = ?`, 610 buildUUID, 611 ).Scan( 612 &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug, &ref.Org, 613 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI, 614 ) 615 if errors.Is(err, sql.ErrNoRows) { 616 return nil, nil 617 } 618 if err != nil { 619 return nil, fmt.Errorf("lookup buildkite_build by uuid: %w", err) 620 } 621 return &ref, nil 622} 623 624// LookupBuildkiteBuildByTuple finds the most recently created build 625// for (knot, pipelineRkey, workflow). Returns nil when no build has 626// been recorded for that tuple — used by /logs to translate the 627// appview's path-based identity back into something Buildkite knows. 628// 629// "Most recent" matters because a workflow may have multiple builds 630// over time (rebuilds, re-triggers). We always serve logs for the 631// latest run; older runs are still queryable by build UUID directly 632// if anyone ever wants that. 633// 634// Ordering is on created_unix_ns (a monotonic int) rather than 635// created_at. Text comparison of RFC3339Nano timestamps is not 636// reliable across nanosecond precision, which used to make this 637// query occasionally pick the wrong run. created_at and build_number 638// are kept as deterministic tiebreakers for legacy rows that pre-date 639// the new column and still scan as 0. 640func (s *store) LookupBuildkiteBuildByTuple(ctx context.Context, knot, pipelineRkey, workflow string) (*BuildkiteBuildRef, error) { 641 var ref BuildkiteBuildRef 642 err := s.db.QueryRowContext(ctx, 643 `SELECT build_uuid, build_number, pipeline_slug, org, 644 knot, pipeline_rkey, workflow, pipeline_uri 645 FROM buildkite_builds 646 WHERE knot = ? AND pipeline_rkey = ? AND workflow = ? 647 ORDER BY created_unix_ns DESC, created_at DESC, build_number DESC 648 LIMIT 1`, 649 knot, pipelineRkey, workflow, 650 ).Scan( 651 &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug, &ref.Org, 652 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI, 653 ) 654 if errors.Is(err, sql.ErrNoRows) { 655 return nil, nil 656 } 657 if err != nil { 658 return nil, fmt.Errorf("lookup buildkite_build by tuple: %w", err) 659 } 660 return &ref, nil 661} 662 663// InsertTektonRun records the latest PipelineRun created for a Tangled 664// workflow tuple. Reusing the tuple as the primary key intentionally 665// makes /logs resolve to the newest run for that workflow identity. 666func (s *store) InsertTektonRun(ctx context.Context, ref TektonRunRef) error { 667 now := time.Now().UTC() 668 _, err := s.db.ExecContext(ctx, 669 `INSERT INTO tekton_runs ( 670 knot, pipeline_rkey, workflow, 671 namespace, pipeline_run_name, pipeline_run_uid, 672 pipeline_name, pipeline_uri, created_at, created_unix_ns 673 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 674 ON CONFLICT(knot, pipeline_rkey, workflow) DO UPDATE SET 675 namespace = excluded.namespace, 676 pipeline_run_name = excluded.pipeline_run_name, 677 pipeline_run_uid = excluded.pipeline_run_uid, 678 pipeline_name = excluded.pipeline_name, 679 pipeline_uri = excluded.pipeline_uri, 680 created_at = excluded.created_at, 681 created_unix_ns = excluded.created_unix_ns`, 682 ref.Knot, ref.PipelineRkey, ref.Workflow, 683 ref.Namespace, ref.PipelineRunName, ref.PipelineRunUID, 684 ref.PipelineName, ref.PipelineURI, now.Format(time.RFC3339Nano), now.UnixNano(), 685 ) 686 if err != nil { 687 return fmt.Errorf("insert tekton_run: %w", err) 688 } 689 return nil 690} 691 692// InsertSourcehutJob records the latest builds.sr.ht job submitted for 693// a Tangled workflow tuple. Reusing the tuple as the primary key 694// intentionally makes /logs resolve to the newest job for that workflow 695// identity — same behaviour as InsertTektonRun. 696func (s *store) InsertSourcehutJob(ctx context.Context, ref SourcehutJobRef) error { 697 now := time.Now().UTC() 698 _, err := s.db.ExecContext(ctx, 699 `INSERT INTO sourcehut_jobs ( 700 knot, pipeline_rkey, workflow, 701 job_id, owner, instance, 702 pipeline_uri, created_at, created_unix_ns 703 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 704 ON CONFLICT(knot, pipeline_rkey, workflow) DO UPDATE SET 705 job_id = excluded.job_id, 706 owner = excluded.owner, 707 instance = excluded.instance, 708 pipeline_uri = excluded.pipeline_uri, 709 created_at = excluded.created_at, 710 created_unix_ns = excluded.created_unix_ns`, 711 ref.Knot, ref.PipelineRkey, ref.Workflow, 712 ref.JobID, ref.Owner, ref.Instance, 713 ref.PipelineURI, now.Format(time.RFC3339Nano), now.UnixNano(), 714 ) 715 if err != nil { 716 return fmt.Errorf("insert sourcehut_job: %w", err) 717 } 718 return nil 719} 720 721// LookupSourcehutJobByTuple resolves the appview's path-based identity 722// back to the concrete builds.sr.ht job tack submitted for it. 723func (s *store) LookupSourcehutJobByTuple(ctx context.Context, knot, pipelineRkey, workflow string) (*SourcehutJobRef, error) { 724 var ref SourcehutJobRef 725 err := s.db.QueryRowContext(ctx, 726 `SELECT knot, pipeline_rkey, workflow, 727 job_id, owner, instance, pipeline_uri 728 FROM sourcehut_jobs 729 WHERE knot = ? AND pipeline_rkey = ? AND workflow = ?`, 730 knot, pipelineRkey, workflow, 731 ).Scan( 732 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, 733 &ref.JobID, &ref.Owner, &ref.Instance, &ref.PipelineURI, 734 ) 735 if errors.Is(err, sql.ErrNoRows) { 736 return nil, nil 737 } 738 if err != nil { 739 return nil, fmt.Errorf("lookup sourcehut_job by tuple: %w", err) 740 } 741 return &ref, nil 742} 743 744// LookupTektonRunByTuple resolves the appview's path-based identity to 745// the concrete PipelineRun tack created in Kubernetes. 746func (s *store) LookupTektonRunByTuple(ctx context.Context, knot, pipelineRkey, workflow string) (*TektonRunRef, error) { 747 var ref TektonRunRef 748 err := s.db.QueryRowContext(ctx, 749 `SELECT knot, pipeline_rkey, workflow, 750 namespace, pipeline_run_name, pipeline_run_uid, 751 pipeline_name, pipeline_uri 752 FROM tekton_runs 753 WHERE knot = ? AND pipeline_rkey = ? AND workflow = ?`, 754 knot, pipelineRkey, workflow, 755 ).Scan( 756 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, 757 &ref.Namespace, &ref.PipelineRunName, &ref.PipelineRunUID, 758 &ref.PipelineName, &ref.PipelineURI, 759 ) 760 if errors.Is(err, sql.ErrNoRows) { 761 return nil, nil 762 } 763 if err != nil { 764 return nil, fmt.Errorf("lookup tekton_run by tuple: %w", err) 765 } 766 return &ref, nil 767} 768 769// EventsAfter returns every event row with `created` strictly greater 770// than cursor, in cursor order. Used by /events to backfill a 771// reconnecting subscriber and to drain newly-published rows on each 772// broker notification. 773// 774// Pass cursor=0 to get the full log from the beginning, which is what 775// happens when a subscriber connects without a ?cursor= query param. 776func (s *store) EventsAfter(ctx context.Context, cursor int64) ([]EventRow, error) { 777 rows, err := s.db.QueryContext(ctx, 778 `SELECT created, rkey, nsid, event_json 779 FROM events 780 WHERE created > ? 781 ORDER BY created ASC`, 782 cursor, 783 ) 784 if err != nil { 785 return nil, fmt.Errorf("query events: %w", err) 786 } 787 defer rows.Close() 788 789 out := []EventRow{} 790 for rows.Next() { 791 var ( 792 ev EventRow 793 raw string 794 ) 795 if err := rows.Scan(&ev.Created, &ev.Rkey, &ev.Nsid, &raw); err != nil { 796 return nil, fmt.Errorf("scan event: %w", err) 797 } 798 ev.EventJSON = json.RawMessage(raw) 799 out = append(out, ev) 800 } 801 if err := rows.Err(); err != nil { 802 return nil, fmt.Errorf("iterate events: %w", err) 803 } 804 return out, nil 805}