Stitch any CI into Tangled
1package main
2
3// SQLite-backed persistence for tack. Holds:
4//
5// - the jetstream cursor, so restarts resume the firehose where the
6// previous run left off
7// - mirrored Tangled state (spindle members, repos, collaborators)
8// and the authorization helpers built on top of it, used both to
9// gate inbound pipeline triggers and to decide which knots we
10// should be subscribed to
11// - the outbound event log fed to /events websocket subscribers
// - the CI-provider run mappings (Buildkite builds, Tekton
//   PipelineRuns, builds.sr.ht jobs) looked up by the Buildkite
//   webhook and /logs handlers
14//
15// Per-method docs go into more detail.
16
17import (
18 "context"
19 "database/sql"
20 "encoding/json"
21 "errors"
22 "fmt"
23 "strconv"
24 "time"
25
26 _ "github.com/mattn/go-sqlite3"
27)
28
// store is tack's persistence layer: a thin wrapper around one SQLite
// handle that exposes only the handful of operations the rest of the
// program calls. Routing every query through this type keeps the
// storage surface small, so it can later be replaced or faked without
// touching callers.
type store struct {
	db *sql.DB // opened and configured by openStore
}
35
36// openStore opens (or creates) the SQLite database at path and applies
37// the schema. It also flips on WAL + NORMAL synchronous, which is the
38// usual "this is a long-running server, not a one-shot script" config:
39// concurrent reads don't block the single writer, and we trade a little
40// crash-safety on power loss for a lot of write throughput.
41func openStore(path string) (*store, error) {
42 // mattn/go-sqlite3 reads pragma-style query parameters (_journal_mode,
43 // _synchronous, _foreign_keys) and applies them on each new connection.
44 // journal_mode=WAL persists in the database file but setting it here
45 // also covers the first-ever open.
46 dsn := fmt.Sprintf("file:%s?_journal_mode=WAL&_synchronous=NORMAL&_foreign_keys=on", path)
47 db, err := sql.Open("sqlite3", dsn)
48 if err != nil {
49 return nil, fmt.Errorf("open sqlite %q: %w", path, err)
50 }
51
52 // SQLite only supports one writer at a time. Capping max open conns
53 // at 1 keeps database/sql from spinning up extra connections that
54 // will only ever serialize behind that writer.
55 db.SetMaxOpenConns(1)
56
57 s := &store{db: db}
58 if err := s.migrate(context.Background()); err != nil {
59 _ = db.Close()
60 return nil, err
61 }
62 return s, nil
63}
64
65// Close releases the underlying database handle.
66func (s *store) Close() error {
67 return s.db.Close()
68}
69
// metaCursorKey names the meta-table row that holds the jetstream
// cursor. LoadCursor and SaveCursor both use this constant so the two
// can never disagree on the key.
const (
	metaCursorKey = "jetstream_cursor"
)
74
75// LoadCursor returns the persisted jetstream cursor, or nil if none has
76// been saved yet (signaling "start from now"). The cursor is a unix
77// microsecond timestamp — see jetstream.Event.TimeUS.
78func (s *store) LoadCursor(ctx context.Context) (*int64, error) {
79 var raw string
80 err := s.db.QueryRowContext(ctx,
81 `SELECT value FROM meta WHERE key = ?`, metaCursorKey,
82 ).Scan(&raw)
83 if errors.Is(err, sql.ErrNoRows) {
84 return nil, nil
85 }
86 if err != nil {
87 return nil, fmt.Errorf("load cursor: %w", err)
88 }
89 v, err := strconv.ParseInt(raw, 10, 64)
90 if err != nil {
91 // A malformed cursor shouldn't wedge startup — log-and-ignore
92 // (return nil) so we just resume from "now". The caller has the
93 // logger; we surface the parse error so it can decide.
94 return nil, fmt.Errorf("parse cursor %q: %w", raw, err)
95 }
96 return &v, nil
97}
98
99// SaveCursor writes the jetstream cursor. It uses an UPSERT so the meta
100// row is created on first call and updated thereafter.
101func (s *store) SaveCursor(ctx context.Context, cursor int64) error {
102 _, err := s.db.ExecContext(ctx,
103 `INSERT INTO meta (key, value) VALUES (?, ?)
104 ON CONFLICT(key) DO UPDATE SET value = excluded.value`,
105 metaCursorKey, strconv.FormatInt(cursor, 10),
106 )
107 if err != nil {
108 return fmt.Errorf("save cursor: %w", err)
109 }
110 return nil
111}
112
113// UpsertSpindleMember records (or refreshes) a sh.tangled.spindle.member
114// observation.
115func (s *store) UpsertSpindleMember(ctx context.Context, did, rkey, instance, subject, createdAt string) error {
116 _, err := s.db.ExecContext(ctx,
117 `INSERT INTO spindle_members (did, rkey, instance, subject, created_at)
118 VALUES (?, ?, ?, ?, ?)
119 ON CONFLICT(did, rkey) DO UPDATE SET
120 instance = excluded.instance,
121 subject = excluded.subject,
122 created_at = excluded.created_at`,
123 did, rkey, instance, subject, createdAt,
124 )
125 if err != nil {
126 return fmt.Errorf("upsert spindle_member: %w", err)
127 }
128 return nil
129}
130
131// DeleteSpindleMember removes a member record by its ATProto identity.
132func (s *store) DeleteSpindleMember(ctx context.Context, did, rkey string) error {
133 _, err := s.db.ExecContext(ctx,
134 `DELETE FROM spindle_members WHERE did = ? AND rkey = ?`,
135 did, rkey,
136 )
137 if err != nil {
138 return fmt.Errorf("delete spindle_member: %w", err)
139 }
140 return nil
141}
142
143// UpsertRepo records (or refreshes) a sh.tangled.repo observation.
144// spindle and repoDid may be empty strings — we store them as such rather
145// than as SQL NULLs to keep the upsert path uniform.
146func (s *store) UpsertRepo(ctx context.Context, did, rkey, knot, name, spindle, repoDid, createdAt string) error {
147 _, err := s.db.ExecContext(ctx,
148 `INSERT INTO repos (did, rkey, knot, name, spindle, repo_did, created_at)
149 VALUES (?, ?, ?, ?, ?, ?, ?)
150 ON CONFLICT(did, rkey) DO UPDATE SET
151 knot = excluded.knot,
152 name = excluded.name,
153 spindle = excluded.spindle,
154 repo_did = excluded.repo_did,
155 created_at = excluded.created_at`,
156 did, rkey, knot, name, spindle, repoDid, createdAt,
157 )
158 if err != nil {
159 return fmt.Errorf("upsert repo: %w", err)
160 }
161 return nil
162}
163
164// DeleteRepo removes a repo record by its ATProto identity.
165func (s *store) DeleteRepo(ctx context.Context, did, rkey string) error {
166 _, err := s.db.ExecContext(ctx,
167 `DELETE FROM repos WHERE did = ? AND rkey = ?`,
168 did, rkey,
169 )
170 if err != nil {
171 return fmt.Errorf("delete repo: %w", err)
172 }
173 return nil
174}
175
176// UpsertRepoCollaborator records (or refreshes) a
177// sh.tangled.repo.collaborator observation.
178func (s *store) UpsertRepoCollaborator(ctx context.Context, did, rkey, repo, repoDid, subject, createdAt string) error {
179 _, err := s.db.ExecContext(ctx,
180 `INSERT INTO repo_collaborators (did, rkey, repo, repo_did, subject, created_at)
181 VALUES (?, ?, ?, ?, ?, ?)
182 ON CONFLICT(did, rkey) DO UPDATE SET
183 repo = excluded.repo,
184 repo_did = excluded.repo_did,
185 subject = excluded.subject,
186 created_at = excluded.created_at`,
187 did, rkey, repo, repoDid, subject, createdAt,
188 )
189 if err != nil {
190 return fmt.Errorf("upsert repo_collaborator: %w", err)
191 }
192 return nil
193}
194
195// GetRepo returns the (knot, spindle) currently stored for a (did, rkey)
196// pair. Both are returned as empty strings when no row exists; callers
197// that need to distinguish "absent" from "stored but empty" should
198// pre-check existence themselves.
199//
200// This exists so applyRepo can read the *previous* spindle/knot of a
201// record before applying a mutation, which is what makes it possible to
202// detect transitions like "this repo used to be ours, now it isn't" and
203// trigger a knot unsubscribe.
204func (s *store) GetRepo(ctx context.Context, did, rkey string) (knot, spindle string, err error) {
205 err = s.db.QueryRowContext(ctx,
206 `SELECT knot, spindle FROM repos WHERE did = ? AND rkey = ?`,
207 did, rkey,
208 ).Scan(&knot, &spindle)
209 if errors.Is(err, sql.ErrNoRows) {
210 return "", "", nil
211 }
212 if err != nil {
213 return "", "", fmt.Errorf("get repo: %w", err)
214 }
215 return knot, spindle, nil
216}
217
218// AuthorizePipelineActor reports whether a pipeline trigger that
219// arrived on knot for repo (repoOwnerDID, repoName) should be allowed
220// to spawn work on this spindle. Both halves of the answer are
221// derived from the persisted Tangled state in spindle_members and
222// repos: the network's own authorization records, mirrored here from
223// the firehose by jetstream.go.
224//
225// The check is two independent gates; both must hold:
226//
227// 1. **Repo claim**: a sh.tangled.repo record exists naming
228// hostname as its CI spindle AND knot as its host knot,
229// published by repoOwnerDID with the matching name. Without this
230// a knot we already happen to be subscribed to (because *some*
231// repo on it points at us) could otherwise smuggle in pipeline
232// triggers for unrelated repos that never opted into us.
233//
234// 2. **Actor membership**: repoOwnerDID is either the spindle owner
235// itself, or has been authorized via a sh.tangled.spindle.member
236// record published by the spindle owner. The publisher (`did`)
237// of that membership grant must equal ownerDID. Anyone can
238// publish a member record naming anyone, so trusting unsigned
239// grants would let any DID grant itself access. We do NOT match
240// against the record's `instance` column because the upstream
241// ecosystem stores it inconsistently (URL vs hostname), and a
242// tack instance only ever speaks for a single hostname anyway.
243//
244// reason is a short, log-friendly description of why authorization
245// failed when ok is false; it is empty when ok is true. Errors
246// reported here are SQL/IO failures, never policy denials; those
247// surface as ok=false with a populated reason.
248func (s *store) AuthorizePipelineActor(
249 ctx context.Context,
250 hostname, knot, ownerDID, repoOwnerDID, repoName string,
251) (ok bool, reason string, err error) {
252 // We can't authorize an unknown actor or an unidentified repo:
253 // every gate below joins on these. Bail before touching SQLite
254 // so a malformed trigger logs cleanly instead of triggering a
255 // "no rows" miss that could be confused with a real denial.
256 if repoOwnerDID == "" {
257 return false, "trigger has no repo did", nil
258 }
259 if repoName == "" {
260 return false, "trigger has no repo name", nil
261 }
262
263 // Gate 1: this specific repo opted into us on this specific knot.
264 var n int
265 if err := s.db.QueryRowContext(ctx,
266 `SELECT COUNT(*) FROM repos
267 WHERE did = ? AND name = ? AND knot = ? AND spindle = ?`,
268 repoOwnerDID, repoName, knot, hostname,
269 ).Scan(&n); err != nil {
270 return false, "", fmt.Errorf("count repo claim: %w", err)
271 }
272 if n == 0 {
273 return false, "no repo record claims this spindle on this knot", nil
274 }
275
276 // Gate 2: actor is the spindle owner, or vouched for by them.
277 if repoOwnerDID == ownerDID {
278 return true, "", nil
279 }
280 if err := s.db.QueryRowContext(ctx,
281 `SELECT COUNT(*) FROM spindle_members
282 WHERE did = ? AND subject = ?`,
283 ownerDID, repoOwnerDID,
284 ).Scan(&n); err != nil {
285 return false, "", fmt.Errorf("count membership: %w", err)
286 }
287 if n == 0 {
288 return false, "actor is not a spindle member", nil
289 }
290 return true, "", nil
291}
292
293// IsKnotWanted reports whether any *authorized* repo currently stored
294// still names the given hostname as its spindle and the given knot as
295// its host. After a repo update or delete this is the question we ask
296// to decide whether to keep watching that knot or unsubscribe from it.
297//
298// "Authorized" means the repo's publisher (the row's `did` column) is
299// either the spindle owner or has been vouched for by the spindle owner
300// via a sh.tangled.spindle.member record. Without this filter, a non-
301// member could pin us to an arbitrary attacker-chosen knot just by
302// publishing a sh.tangled.repo record naming us as its spindle. See
303// the matching gate in IsAuthorizedActor and AuthorizePipelineActor.
304func (s *store) IsKnotWanted(ctx context.Context, hostname, ownerDID, knot string) (bool, error) {
305 var n int
306 err := s.db.QueryRowContext(ctx,
307 `SELECT COUNT(*) FROM repos r
308 WHERE r.spindle = ? AND r.knot = ?
309 AND (
310 r.did = ?
311 OR EXISTS (
312 SELECT 1 FROM spindle_members m
313 WHERE m.did = ? AND m.subject = r.did
314 )
315 )`,
316 hostname, knot, ownerDID, ownerDID,
317 ).Scan(&n)
318 if err != nil {
319 return false, fmt.Errorf("count repos for knot: %w", err)
320 }
321 return n > 0, nil
322}
323
324// IsAuthorizedActor reports whether did is the spindle owner or has
325// been authorized by the spindle owner via a sh.tangled.spindle.member
326// record. The membership record's publisher (its `did` column) must
327// equal ownerDID; anyone can publish a membership record naming
328// anyone, so trusting unsigned grants would let any DID grant itself
329// access. This is the same trust rule AuthorizePipelineActor enforces;
330// it's pulled out here so knot-subscription decisions in jetstream.go
331// gate on the same check as pipeline-spawning decisions in knot.go.
332func (s *store) IsAuthorizedActor(ctx context.Context, ownerDID, did string) (bool, error) {
333 if did == "" {
334 return false, nil
335 }
336 if did == ownerDID {
337 return true, nil
338 }
339 var n int
340 err := s.db.QueryRowContext(ctx,
341 `SELECT COUNT(*) FROM spindle_members
342 WHERE did = ? AND subject = ?`,
343 ownerDID, did,
344 ).Scan(&n)
345 if err != nil {
346 return false, fmt.Errorf("count membership: %w", err)
347 }
348 return n > 0, nil
349}
350
351// GetSpindleMember returns the subject DID currently stored for a
352// (did, rkey) spindle.member row, or "" when no such row exists.
353//
354// Used by the jetstream handler to learn whose authorization a
355// delete/update of a membership record affects, so it can reconcile
356// that subject's knot subscriptions in the same step as the mutation.
357func (s *store) GetSpindleMember(ctx context.Context, did, rkey string) (string, error) {
358 var subject string
359 err := s.db.QueryRowContext(ctx,
360 `SELECT subject FROM spindle_members WHERE did = ? AND rkey = ?`,
361 did, rkey,
362 ).Scan(&subject)
363 if errors.Is(err, sql.ErrNoRows) {
364 return "", nil
365 }
366 if err != nil {
367 return "", fmt.Errorf("get spindle_member: %w", err)
368 }
369 return subject, nil
370}
371
372// KnotsForOwner returns the distinct knot hostnames of repos published
373// by `did` whose spindle field equals hostname. Used after a membership
374// change to find which knots that DID's repos want, so we can subscribe
375// (newly granted) or potentially unsubscribe (revoked) in lockstep with
376// the grant.
377//
378// Returns an empty slice (not nil) on no matches so callers can range
379// over the result without a nil check.
380func (s *store) KnotsForOwner(ctx context.Context, hostname, did string) ([]string, error) {
381 rows, err := s.db.QueryContext(ctx,
382 `SELECT DISTINCT knot FROM repos
383 WHERE did = ? AND spindle = ? AND knot <> ''`,
384 did, hostname,
385 )
386 if err != nil {
387 return nil, fmt.Errorf("query knots for owner: %w", err)
388 }
389 defer rows.Close()
390 out := []string{}
391 for rows.Next() {
392 var k string
393 if err := rows.Scan(&k); err != nil {
394 return nil, fmt.Errorf("scan knot: %w", err)
395 }
396 out = append(out, k)
397 }
398 if err := rows.Err(); err != nil {
399 return nil, fmt.Errorf("iterate knots: %w", err)
400 }
401 return out, nil
402}
403
404// KnotsForSpindle returns the distinct knot hostnames of all repos that
405// (a) have declared the given spindle hostname as their CI spindle, and
406// (b) were published by an *authorized* DID: either the spindle owner
407// itself or a member they vouched for via sh.tangled.spindle.member.
408//
409// The membership filter is critical: without it, any DID that publishes
410// a sh.tangled.repo record naming us as its spindle could force us to
411// dial an attacker-chosen knot at startup. See IsAuthorizedActor for
412// the matching trust check applied at firehose-event time.
413//
414// Returns an empty slice (not nil) when nothing matches, so callers can
415// range over the result without a nil check.
416func (s *store) KnotsForSpindle(ctx context.Context, hostname, ownerDID string) ([]string, error) {
417 rows, err := s.db.QueryContext(ctx,
418 `SELECT DISTINCT r.knot FROM repos r
419 WHERE r.spindle = ? AND r.knot <> ''
420 AND (
421 r.did = ?
422 OR EXISTS (
423 SELECT 1 FROM spindle_members m
424 WHERE m.did = ? AND m.subject = r.did
425 )
426 )`,
427 hostname, ownerDID, ownerDID,
428 )
429 if err != nil {
430 return nil, fmt.Errorf("query knots: %w", err)
431 }
432 defer rows.Close()
433
434 out := []string{}
435 for rows.Next() {
436 var k string
437 if err := rows.Scan(&k); err != nil {
438 return nil, fmt.Errorf("scan knot: %w", err)
439 }
440 out = append(out, k)
441 }
442 if err := rows.Err(); err != nil {
443 return nil, fmt.Errorf("iterate knots: %w", err)
444 }
445 return out, nil
446}
447
448// DeleteRepoCollaborator removes a collaborator record by its ATProto
449// identity.
450func (s *store) DeleteRepoCollaborator(ctx context.Context, did, rkey string) error {
451 _, err := s.db.ExecContext(ctx,
452 `DELETE FROM repo_collaborators WHERE did = ? AND rkey = ?`,
453 did, rkey,
454 )
455 if err != nil {
456 return fmt.Errorf("delete repo_collaborator: %w", err)
457 }
458 return nil
459}
460
// EventRow is a single entry of the outbound events log: a record queued
// for delivery to /events websocket subscribers. It is carried in the
// form the handler consumes, with the record body kept as raw JSON
// rather than a string.
type EventRow struct {
	// Created is the row's monotonically assigned rowid; subscribers
	// use it as their resume cursor.
	Created int64
	// Rkey is the ATProto record key. For sh.tangled.pipeline.status
	// records it is the rkey minted when we publish.
	Rkey string
	// Nsid is the lexicon collection, e.g. sh.tangled.pipeline.status.
	Nsid string
	// EventJSON is the stored record body, untouched. Keeping it as a
	// json.RawMessage lets /events embed it in the wire envelope without
	// decoding and re-encoding it.
	EventJSON json.RawMessage
}
478
479// InsertEvent appends an event row and returns its assigned `created`
480// (rowid) cursor. Storage is the source of truth for fan-out, so we
481// write here even if zero subscribers are connected — a subscriber that
482// connects later (with an old cursor) will pick the row up via
483// EventsAfter.
484//
485// eventJSON must be a valid JSON object; we store it verbatim. Length
486// validation is intentionally absent — the schema accepts arbitrary
487// TEXT and SQLite handles huge blobs fine for our scale.
488func (s *store) InsertEvent(ctx context.Context, rkey, nsid string, eventJSON []byte) (int64, error) {
489 res, err := s.db.ExecContext(ctx,
490 `INSERT INTO events (rkey, nsid, event_json, inserted_at)
491 VALUES (?, ?, ?, ?)`,
492 rkey, nsid, string(eventJSON),
493 time.Now().UTC().Format(time.RFC3339Nano),
494 )
495 if err != nil {
496 return 0, fmt.Errorf("insert event: %w", err)
497 }
498 id, err := res.LastInsertId()
499 if err != nil {
500 return 0, fmt.Errorf("event last insert id: %w", err)
501 }
502 return id, nil
503}
504
// BuildkiteBuildRef ties one Buildkite build to the Tangled pipeline
// tuple that spawned it. The Buildkite provider writes it at Spawn time.
// It is read back in two places: by build UUID from the webhook handler
// when a Buildkite event arrives, and by (knot, rkey, workflow) from the
// /logs handler when an appview client asks for output.
type BuildkiteBuildRef struct {
	// Buildkite-side identity of the build.
	BuildUUID    string
	BuildNumber  int64
	PipelineSlug string

	// Org is the Buildkite organisation slug the build was created in.
	// It is stored at Spawn time so /logs and any later API call target
	// the org the workflow originally chose (see the workflow YAML
	// `tack.buildkite.org` override). An empty value means "use the
	// provider's default org"; rows written before the org column
	// existed scan that way.
	Org string

	// Tangled-side tuple that /logs looks builds up by.
	Knot         string
	PipelineRkey string
	Workflow     string
	PipelineURI  string
}
528
// TektonRunRef links a Tangled workflow tuple (Knot, PipelineRkey,
// Workflow), which is the user-facing identity the appview knows, to the
// PipelineRun tack created for it in the cluster. Namespace,
// PipelineRunName and PipelineRunUID are the Kubernetes identity needed
// for status watching and log lookup.
type TektonRunRef struct {
	Knot         string
	PipelineRkey string
	Workflow     string

	Namespace       string
	PipelineRunName string
	PipelineRunUID  string
	PipelineName    string
	PipelineURI     string
}
543
// SourcehutJobRef links a Tangled workflow tuple (Knot, PipelineRkey,
// Workflow) to the builds.sr.ht job tack submitted for it.
type SourcehutJobRef struct {
	Knot         string
	PipelineRkey string
	Workflow     string

	// JobID is the builds.sr.ht job ID.
	JobID int64
	// Owner is the canonical owner name, leading "~" included, as the
	// upstream API returns it. Log URLs are rooted at this path segment,
	// so logs cannot be fetched without it even though JobID alone is
	// unique.
	Owner string
	// Instance is the base URL of the server the job lives on. It is
	// stored per row so later API calls always reach the same server the
	// workflow originally ran on.
	Instance string

	PipelineURI string
}
560
561// InsertBuildkiteBuild records that a Buildkite build was created on
562// behalf of the given (knot, pipelineRkey, workflow) tuple. Uses
563// INSERT OR REPLACE so that an unlikely build-uuid collision (or a
564// Buildkite-side rebuild that re-fires us) just refreshes the row
565// instead of failing.
566func (s *store) InsertBuildkiteBuild(ctx context.Context, ref BuildkiteBuildRef) error {
567 // Capture wall-clock and monotonic-friendly forms once so the two
568 // columns agree on the same instant. created_at is the
569 // human-readable RFC3339Nano string; created_unix_ns is the
570 // integer the lookup orders on (text comparison of nanosecond
571 // timestamps isn't reliable, so we sort on the int instead).
572 now := time.Now().UTC()
573 _, err := s.db.ExecContext(ctx,
574 `INSERT INTO buildkite_builds (
575 build_uuid, build_number, pipeline_slug, org,
576 knot, pipeline_rkey, workflow,
577 pipeline_uri, created_at, created_unix_ns
578 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
579 ON CONFLICT(build_uuid) DO UPDATE SET
580 build_number = excluded.build_number,
581 pipeline_slug = excluded.pipeline_slug,
582 org = excluded.org,
583 knot = excluded.knot,
584 pipeline_rkey = excluded.pipeline_rkey,
585 workflow = excluded.workflow,
586 pipeline_uri = excluded.pipeline_uri,
587 created_at = excluded.created_at,
588 created_unix_ns = excluded.created_unix_ns`,
589 ref.BuildUUID, ref.BuildNumber, ref.PipelineSlug, ref.Org,
590 ref.Knot, ref.PipelineRkey, ref.Workflow,
591 ref.PipelineURI, now.Format(time.RFC3339Nano), now.UnixNano(),
592 )
593 if err != nil {
594 return fmt.Errorf("insert buildkite_build: %w", err)
595 }
596 return nil
597}
598
599// LookupBuildkiteBuildByUUID returns the saved mapping for the given
600// Buildkite build UUID, or nil when no such build is recorded.
601// Returning a nil pointer rather than a sentinel error keeps the
602// webhook handler's "we don't know about this build" branch a simple
603// nil check.
604func (s *store) LookupBuildkiteBuildByUUID(ctx context.Context, buildUUID string) (*BuildkiteBuildRef, error) {
605 var ref BuildkiteBuildRef
606 err := s.db.QueryRowContext(ctx,
607 `SELECT build_uuid, build_number, pipeline_slug, org,
608 knot, pipeline_rkey, workflow, pipeline_uri
609 FROM buildkite_builds WHERE build_uuid = ?`,
610 buildUUID,
611 ).Scan(
612 &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug, &ref.Org,
613 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI,
614 )
615 if errors.Is(err, sql.ErrNoRows) {
616 return nil, nil
617 }
618 if err != nil {
619 return nil, fmt.Errorf("lookup buildkite_build by uuid: %w", err)
620 }
621 return &ref, nil
622}
623
624// LookupBuildkiteBuildByTuple finds the most recently created build
625// for (knot, pipelineRkey, workflow). Returns nil when no build has
626// been recorded for that tuple — used by /logs to translate the
627// appview's path-based identity back into something Buildkite knows.
628//
629// "Most recent" matters because a workflow may have multiple builds
630// over time (rebuilds, re-triggers). We always serve logs for the
631// latest run; older runs are still queryable by build UUID directly
632// if anyone ever wants that.
633//
634// Ordering is on created_unix_ns (a monotonic int) rather than
635// created_at. Text comparison of RFC3339Nano timestamps is not
636// reliable across nanosecond precision, which used to make this
637// query occasionally pick the wrong run. created_at and build_number
638// are kept as deterministic tiebreakers for legacy rows that pre-date
639// the new column and still scan as 0.
640func (s *store) LookupBuildkiteBuildByTuple(ctx context.Context, knot, pipelineRkey, workflow string) (*BuildkiteBuildRef, error) {
641 var ref BuildkiteBuildRef
642 err := s.db.QueryRowContext(ctx,
643 `SELECT build_uuid, build_number, pipeline_slug, org,
644 knot, pipeline_rkey, workflow, pipeline_uri
645 FROM buildkite_builds
646 WHERE knot = ? AND pipeline_rkey = ? AND workflow = ?
647 ORDER BY created_unix_ns DESC, created_at DESC, build_number DESC
648 LIMIT 1`,
649 knot, pipelineRkey, workflow,
650 ).Scan(
651 &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug, &ref.Org,
652 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI,
653 )
654 if errors.Is(err, sql.ErrNoRows) {
655 return nil, nil
656 }
657 if err != nil {
658 return nil, fmt.Errorf("lookup buildkite_build by tuple: %w", err)
659 }
660 return &ref, nil
661}
662
663// InsertTektonRun records the latest PipelineRun created for a Tangled
664// workflow tuple. Reusing the tuple as the primary key intentionally
665// makes /logs resolve to the newest run for that workflow identity.
666func (s *store) InsertTektonRun(ctx context.Context, ref TektonRunRef) error {
667 now := time.Now().UTC()
668 _, err := s.db.ExecContext(ctx,
669 `INSERT INTO tekton_runs (
670 knot, pipeline_rkey, workflow,
671 namespace, pipeline_run_name, pipeline_run_uid,
672 pipeline_name, pipeline_uri, created_at, created_unix_ns
673 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
674 ON CONFLICT(knot, pipeline_rkey, workflow) DO UPDATE SET
675 namespace = excluded.namespace,
676 pipeline_run_name = excluded.pipeline_run_name,
677 pipeline_run_uid = excluded.pipeline_run_uid,
678 pipeline_name = excluded.pipeline_name,
679 pipeline_uri = excluded.pipeline_uri,
680 created_at = excluded.created_at,
681 created_unix_ns = excluded.created_unix_ns`,
682 ref.Knot, ref.PipelineRkey, ref.Workflow,
683 ref.Namespace, ref.PipelineRunName, ref.PipelineRunUID,
684 ref.PipelineName, ref.PipelineURI, now.Format(time.RFC3339Nano), now.UnixNano(),
685 )
686 if err != nil {
687 return fmt.Errorf("insert tekton_run: %w", err)
688 }
689 return nil
690}
691
692// InsertSourcehutJob records the latest builds.sr.ht job submitted for
693// a Tangled workflow tuple. Reusing the tuple as the primary key
694// intentionally makes /logs resolve to the newest job for that workflow
695// identity — same behaviour as InsertTektonRun.
696func (s *store) InsertSourcehutJob(ctx context.Context, ref SourcehutJobRef) error {
697 now := time.Now().UTC()
698 _, err := s.db.ExecContext(ctx,
699 `INSERT INTO sourcehut_jobs (
700 knot, pipeline_rkey, workflow,
701 job_id, owner, instance,
702 pipeline_uri, created_at, created_unix_ns
703 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
704 ON CONFLICT(knot, pipeline_rkey, workflow) DO UPDATE SET
705 job_id = excluded.job_id,
706 owner = excluded.owner,
707 instance = excluded.instance,
708 pipeline_uri = excluded.pipeline_uri,
709 created_at = excluded.created_at,
710 created_unix_ns = excluded.created_unix_ns`,
711 ref.Knot, ref.PipelineRkey, ref.Workflow,
712 ref.JobID, ref.Owner, ref.Instance,
713 ref.PipelineURI, now.Format(time.RFC3339Nano), now.UnixNano(),
714 )
715 if err != nil {
716 return fmt.Errorf("insert sourcehut_job: %w", err)
717 }
718 return nil
719}
720
721// LookupSourcehutJobByTuple resolves the appview's path-based identity
722// back to the concrete builds.sr.ht job tack submitted for it.
723func (s *store) LookupSourcehutJobByTuple(ctx context.Context, knot, pipelineRkey, workflow string) (*SourcehutJobRef, error) {
724 var ref SourcehutJobRef
725 err := s.db.QueryRowContext(ctx,
726 `SELECT knot, pipeline_rkey, workflow,
727 job_id, owner, instance, pipeline_uri
728 FROM sourcehut_jobs
729 WHERE knot = ? AND pipeline_rkey = ? AND workflow = ?`,
730 knot, pipelineRkey, workflow,
731 ).Scan(
732 &ref.Knot, &ref.PipelineRkey, &ref.Workflow,
733 &ref.JobID, &ref.Owner, &ref.Instance, &ref.PipelineURI,
734 )
735 if errors.Is(err, sql.ErrNoRows) {
736 return nil, nil
737 }
738 if err != nil {
739 return nil, fmt.Errorf("lookup sourcehut_job by tuple: %w", err)
740 }
741 return &ref, nil
742}
743
744// LookupTektonRunByTuple resolves the appview's path-based identity to
745// the concrete PipelineRun tack created in Kubernetes.
746func (s *store) LookupTektonRunByTuple(ctx context.Context, knot, pipelineRkey, workflow string) (*TektonRunRef, error) {
747 var ref TektonRunRef
748 err := s.db.QueryRowContext(ctx,
749 `SELECT knot, pipeline_rkey, workflow,
750 namespace, pipeline_run_name, pipeline_run_uid,
751 pipeline_name, pipeline_uri
752 FROM tekton_runs
753 WHERE knot = ? AND pipeline_rkey = ? AND workflow = ?`,
754 knot, pipelineRkey, workflow,
755 ).Scan(
756 &ref.Knot, &ref.PipelineRkey, &ref.Workflow,
757 &ref.Namespace, &ref.PipelineRunName, &ref.PipelineRunUID,
758 &ref.PipelineName, &ref.PipelineURI,
759 )
760 if errors.Is(err, sql.ErrNoRows) {
761 return nil, nil
762 }
763 if err != nil {
764 return nil, fmt.Errorf("lookup tekton_run by tuple: %w", err)
765 }
766 return &ref, nil
767}
768
769// EventsAfter returns every event row with `created` strictly greater
770// than cursor, in cursor order. Used by /events to backfill a
771// reconnecting subscriber and to drain newly-published rows on each
772// broker notification.
773//
774// Pass cursor=0 to get the full log from the beginning, which is what
775// happens when a subscriber connects without a ?cursor= query param.
776func (s *store) EventsAfter(ctx context.Context, cursor int64) ([]EventRow, error) {
777 rows, err := s.db.QueryContext(ctx,
778 `SELECT created, rkey, nsid, event_json
779 FROM events
780 WHERE created > ?
781 ORDER BY created ASC`,
782 cursor,
783 )
784 if err != nil {
785 return nil, fmt.Errorf("query events: %w", err)
786 }
787 defer rows.Close()
788
789 out := []EventRow{}
790 for rows.Next() {
791 var (
792 ev EventRow
793 raw string
794 )
795 if err := rows.Scan(&ev.Created, &ev.Rkey, &ev.Nsid, &raw); err != nil {
796 return nil, fmt.Errorf("scan event: %w", err)
797 }
798 ev.EventJSON = json.RawMessage(raw)
799 out = append(out, ev)
800 }
801 if err := rows.Err(); err != nil {
802 return nil, fmt.Errorf("iterate events: %w", err)
803 }
804 return out, nil
805}