Stitch any CI into Tangled
1package main
2
3// Provider-level integration tests for the sourcehut implementation:
4// Spawn → SubmitJob + persist + initial pending publish, watch loop →
5// status transitions, and Logs → snapshot of master + per-task logs.
6// builds.sr.ht is stubbed with httptest so the tests don't need
7// network access.
8
9import (
10 "context"
11 "encoding/json"
12 "fmt"
13 "io"
14 "log/slog"
15 "net/http"
16 "net/http/httptest"
17 "strings"
18 "sync/atomic"
19 "testing"
20 "time"
21
22 "tangled.org/core/api/tangled"
23)
24
25// newSourcehutTestProvider wires a sourcehutProvider against an
26// httptest server impersonating builds.sr.ht.
27func newSourcehutTestProvider(
28 t *testing.T,
29 handler http.HandlerFunc,
30) (*sourcehutProvider, *store, *broker, *httptest.Server) {
31 t.Helper()
32 srv := httptest.NewServer(handler)
33 t.Cleanup(srv.Close)
34
35 st := newTestStore(t)
36 br := newBroker(st)
37 logger := slog.Default()
38 p := newSourcehutProvider(br, st, "tok", srv.URL, logger)
39 return p, st, br, srv
40}
41
// gqlBody is a minimal GraphQL request body for routing test stubs
// without pulling in a real GraphQL parser.
type gqlBody struct {
	// Query is the GraphQL document text, decoded from the "query" key.
	Query string `json:"query"`
	// Variables holds the values under the "variables" key, left untyped
	// so stubs can pick individual entries out by type assertion.
	Variables map[string]any `json:"variables"`
}
48
// gqlOp returns the operation keyword ("query" / "mutation" /
// "subscription") at the head of a GraphQL document, or "" when the
// document does not open with an operation. Used by test stubs to
// route a single /query endpoint to the right canned response without
// substring-matching on field names — which would silently misroute a
// query that happened to mention "submit" or vice-versa.
//
// The keyword has to be a complete token: a document that opens with an
// identifier merely beginning with a keyword (e.g. "queryJobs") is not
// an operation and yields "". The anonymous shorthand form — a document
// that opens directly with a selection set "{" — is reported as "query".
func gqlOp(query string) string {
	// isNameByte reports whether c could continue a GraphQL name, i.e.
	// whether the keyword we matched is only the prefix of a longer
	// identifier.
	isNameByte := func(c byte) bool {
		return c == '_' ||
			(c >= '0' && c <= '9') ||
			(c >= 'a' && c <= 'z') ||
			(c >= 'A' && c <= 'Z')
	}

	q := strings.TrimSpace(query)
	if strings.HasPrefix(q, "{") {
		// Query shorthand: `{ job { id } }` is an anonymous query.
		return "query"
	}
	for _, kw := range []string{"mutation", "query", "subscription"} {
		if !strings.HasPrefix(q, kw) {
			continue
		}
		rest := q[len(kw):]
		if rest == "" || !isNameByte(rest[0]) {
			return kw
		}
	}
	return ""
}
63
// writeGQL replies with a successful GraphQL response: data is nested
// under the canonical {"data": …} envelope the client decodes against,
// and the reply is labelled as JSON. It is a test-side helper, so an
// encode failure is deliberately ignored.
func writeGQL(w http.ResponseWriter, data map[string]any) {
	envelope := map[string]any{"data": data}
	w.Header().Set("Content-Type", "application/json")
	enc := json.NewEncoder(w)
	_ = enc.Encode(envelope)
}
71
72// TestSourcehutSpawn covers the full submit path: trigger → API call
73// → DB row → "pending" status on the broker.
74func TestSourcehutSpawn(t *testing.T) {
75 manifestCh := make(chan string, 1)
76 notesCh := make(chan string, 1)
77 tagsCh := make(chan []string, 1)
78 handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
79 if r.Method != http.MethodPost || r.URL.Path != "/query" {
80 http.NotFound(w, r)
81 return
82 }
83 var body gqlBody
84 _ = json.NewDecoder(r.Body).Decode(&body)
85 switch gqlOp(body.Query) {
86 case "mutation":
87 if m, _ := body.Variables["manifest"].(string); m != "" {
88 select {
89 case manifestCh <- m:
90 default:
91 }
92 }
93 if n, _ := body.Variables["note"].(string); n != "" {
94 select {
95 case notesCh <- n:
96 default:
97 }
98 }
99 if raw, ok := body.Variables["tags"].([]any); ok {
100 ts := make([]string, 0, len(raw))
101 for _, v := range raw {
102 if s, ok := v.(string); ok {
103 ts = append(ts, s)
104 }
105 }
106 select {
107 case tagsCh <- ts:
108 default:
109 }
110 }
111 writeGQL(w, map[string]any{
112 "submit": map[string]any{
113 "id": 42,
114 "status": "PENDING",
115 "owner": map[string]any{"canonicalName": "~tester"},
116 },
117 })
118 case "query":
119 // Watch loop; keep the job pending so the test exits
120 // deterministically once the row and initial publish land.
121 writeGQL(w, map[string]any{
122 "job": map[string]any{
123 "id": 42,
124 "status": "PENDING",
125 "owner": map[string]any{"canonicalName": "~tester"},
126 "tasks": []any{},
127 },
128 })
129 default:
130 http.Error(w, "unknown op", http.StatusBadRequest)
131 }
132 })
133 p, st, _, _ := newSourcehutTestProvider(t, handler)
134
135 trigger := &tangled.Pipeline_TriggerMetadata{
136 Push: &tangled.Pipeline_PushTriggerData{
137 NewSha: "abcdef0123",
138 Ref: "refs/heads/main",
139 },
140 }
141 raw := strings.Join([]string{
142 "tack:",
143 " sourcehut:",
144 " manifest: |",
145 " image: alpine/edge",
146 " tasks:",
147 " - hello: |",
148 " echo hi",
149 " tags: [tack, ci]",
150 " note: integration",
151 }, "\n") + "\n"
152
153 ctx, cancel := context.WithCancel(context.Background())
154 defer cancel()
155 p.Spawn(ctx, "knot.example.com", "rkey-1", "did:plc:actor", trigger,
156 []*tangled.Pipeline_Workflow{{Name: "ci.yml", Raw: raw}},
157 )
158
159 select {
160 case manifest := <-manifestCh:
161 // Manifest should round-trip through injectSourcehutEnvironment;
162 // we expect TACK_* env vars to land in a top-level environment
163 // block that didn't originally exist.
164 if !strings.Contains(manifest, "TACK_KNOT") {
165 t.Fatalf("manifest missing TACK_KNOT: %q", manifest)
166 }
167 if !strings.Contains(manifest, "abcdef0123") {
168 t.Fatalf("manifest missing TACK_COMMIT value: %q", manifest)
169 }
170 // User-supplied tasks must be preserved through the round trip.
171 if !strings.Contains(manifest, "echo hi") {
172 t.Fatalf("user manifest body lost: %q", manifest)
173 }
174 select {
175 case note := <-notesCh:
176 if note != "integration" {
177 t.Fatalf("note=%q; want integration", note)
178 }
179 case <-time.After(time.Second):
180 t.Fatal("note variable not sent")
181 }
182 select {
183 case ts := <-tagsCh:
184 if len(ts) != 2 || ts[0] != "tack" || ts[1] != "ci" {
185 t.Fatalf("tags=%v", ts)
186 }
187 case <-time.After(time.Second):
188 t.Fatal("tags variable not sent")
189 }
190 case <-time.After(2 * time.Second):
191 t.Fatal("SubmitJob not called")
192 }
193
194 // Wait for the row to land — that's the load-bearing artifact.
195 deadline := time.Now().Add(2 * time.Second)
196 var ref *SourcehutJobRef
197 for time.Now().Before(deadline) {
198 var err error
199 ref, err = st.LookupSourcehutJobByTuple(
200 context.Background(),
201 "knot.example.com", "rkey-1", "ci.yml",
202 )
203 if err != nil {
204 t.Fatalf("lookup: %v", err)
205 }
206 if ref != nil {
207 break
208 }
209 time.Sleep(20 * time.Millisecond)
210 }
211 if ref == nil {
212 t.Fatal("sourcehut job row not persisted within deadline")
213 }
214 if ref.JobID != 42 || ref.Owner != "~tester" {
215 t.Fatalf("ref mismatch: %+v", ref)
216 }
217
218 rows, err := st.EventsAfter(context.Background(), 0)
219 if err != nil {
220 t.Fatalf("EventsAfter: %v", err)
221 }
222 if len(rows) == 0 {
223 t.Fatal("no pending status published")
224 }
225 var rec tangled.PipelineStatus
226 if err := json.Unmarshal(rows[0].EventJSON, &rec); err != nil {
227 t.Fatalf("decode status: %v", err)
228 }
229 if rec.Status != "pending" || rec.Workflow != "ci.yml" {
230 t.Fatalf("unexpected status: %+v", rec)
231 }
232 if !strings.Contains(rec.Pipeline, "knot.example.com") ||
233 !strings.Contains(rec.Pipeline, "rkey-1") {
234 t.Fatalf("pipeline ATURI wrong: %s", rec.Pipeline)
235 }
236}
237
238// TestSourcehutSpawnInvalidConfig confirms a workflow without a manifest
239// is rejected loudly without firing any HTTP call.
240func TestSourcehutSpawnInvalidConfig(t *testing.T) {
241 var called atomic.Bool
242 handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
243 called.Store(true)
244 })
245 p, st, _, _ := newSourcehutTestProvider(t, handler)
246
247 p.Spawn(context.Background(), "knot.example.com", "rkey-1", "did:plc:actor",
248 &tangled.Pipeline_TriggerMetadata{Push: &tangled.Pipeline_PushTriggerData{
249 NewSha: "abc", Ref: "refs/heads/main",
250 }},
251 []*tangled.Pipeline_Workflow{
252 // `tack.sourcehut` block but no manifest.
253 {Name: "ci.yml", Raw: "tack:\n sourcehut:\n note: hi\n"},
254 },
255 )
256
257 time.Sleep(50 * time.Millisecond)
258 if called.Load() {
259 t.Fatal("SubmitJob called despite missing manifest")
260 }
261 rows, _ := st.EventsAfter(context.Background(), 0)
262 if len(rows) != 0 {
263 t.Fatalf("got %d events, want 0", len(rows))
264 }
265}
266
267// TestSourcehutWatchTransitions exercises the watch loop: a job that
268// flips through PENDING → RUNNING → SUCCESS should produce three
269// distinct status records in order, with no duplicates.
270func TestSourcehutWatchTransitions(t *testing.T) {
271 states := []string{"PENDING", "RUNNING", "RUNNING", "SUCCESS"}
272 var idx atomic.Int32
273 idx.Store(-1)
274
275 handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
276 if r.Method != http.MethodPost || r.URL.Path != "/query" {
277 http.NotFound(w, r)
278 return
279 }
280 var body gqlBody
281 _ = json.NewDecoder(r.Body).Decode(&body)
282 if gqlOp(body.Query) != "query" {
283 http.Error(w, "unknown op", http.StatusBadRequest)
284 return
285 }
286 n := int(idx.Add(1))
287 if n >= len(states) {
288 n = len(states) - 1
289 }
290 writeGQL(w, map[string]any{
291 "job": map[string]any{
292 "id": 7,
293 "status": states[n],
294 "owner": map[string]any{"canonicalName": "~tester"},
295 "tasks": []any{},
296 },
297 })
298 })
299 p, st, _, _ := newSourcehutTestProvider(t, handler)
300
301 pipelineURI := pipelineATURI("knot.example.com", "rkey-2")
302 ref := SourcehutJobRef{
303 Knot: "knot.example.com", PipelineRkey: "rkey-2", Workflow: "ci.yml",
304 JobID: 7, Owner: "~tester",
305 Instance: p.defaultInstance, PipelineURI: pipelineURI,
306 }
307 if err := st.InsertSourcehutJob(context.Background(), ref); err != nil {
308 t.Fatalf("insert: %v", err)
309 }
310
311 // Lower the poll interval so the watch loop completes in
312 // milliseconds rather than waiting for the production 5s tick.
313 p.pollInterval = 20 * time.Millisecond
314
315 ctx, cancel := context.WithCancel(context.Background())
316 defer cancel()
317 done := make(chan struct{})
318 go func() {
319 defer close(done)
320 p.watchJob(ctx, ref, p.defaultClient)
321 }()
322
323 select {
324 case <-done:
325 case <-time.After(2 * time.Second):
326 t.Fatal("watch loop did not exit on terminal status")
327 }
328
329 rows, err := st.EventsAfter(context.Background(), 0)
330 if err != nil {
331 t.Fatalf("EventsAfter: %v", err)
332 }
333 gotStatuses := []string{}
334 for _, r := range rows {
335 var rec tangled.PipelineStatus
336 if err := json.Unmarshal(r.EventJSON, &rec); err != nil {
337 t.Fatalf("decode: %v", err)
338 }
339 gotStatuses = append(gotStatuses, rec.Status)
340 }
341 want := []string{"pending", "running", "success"}
342 if fmt.Sprint(gotStatuses) != fmt.Sprint(want) {
343 t.Fatalf("statuses = %v; want %v", gotStatuses, want)
344 }
345}
346
347// TestSourcehutLogs covers the snapshot Logs path: master log + each
348// task in the job's tasks list, bracketed by control frames.
349func TestSourcehutLogs(t *testing.T) {
350 handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
351 switch {
352 case r.Method == http.MethodPost && r.URL.Path == "/query":
353 writeGQL(w, map[string]any{
354 "job": map[string]any{
355 "id": 99,
356 "status": "SUCCESS",
357 "owner": map[string]any{"canonicalName": "~tester"},
358 "tasks": []any{
359 map[string]any{"name": "build", "status": "SUCCESS"},
360 map[string]any{"name": "test", "status": "SUCCESS"},
361 },
362 },
363 })
364 case r.Method == http.MethodGet && r.URL.Path == "/query/log/99/log":
365 _, _ = io.WriteString(w, "setup-output\n")
366 case r.Method == http.MethodGet && r.URL.Path == "/query/log/99/build/log":
367 _, _ = io.WriteString(w, "build-line-1\nbuild-line-2\n")
368 case r.Method == http.MethodGet && r.URL.Path == "/query/log/99/test/log":
369 _, _ = io.WriteString(w, "test-line\n")
370 default:
371 http.NotFound(w, r)
372 }
373 })
374 p, st, _, _ := newSourcehutTestProvider(t, handler)
375
376 if err := st.InsertSourcehutJob(context.Background(), SourcehutJobRef{
377 Knot: "knot.example.com", PipelineRkey: "rkey-3", Workflow: "ci.yml",
378 JobID: 99, Owner: "~tester",
379 Instance: p.defaultInstance,
380 PipelineURI: pipelineATURI("knot.example.com", "rkey-3"),
381 }); err != nil {
382 t.Fatalf("insert: %v", err)
383 }
384
385 ch, err := p.Logs(context.Background(),
386 "knot.example.com", "rkey-3", "ci.yml")
387 if err != nil {
388 t.Fatalf("Logs: %v", err)
389 }
390 var lines []LogLine
391 for line := range ch {
392 lines = append(lines, line)
393 }
394
395 wantContents := []string{
396 "setup", "setup-output\n", "setup",
397 "build", "build-line-1\n", "build-line-2\n", "build",
398 "test", "test-line\n", "test",
399 }
400 got := make([]string, 0, len(lines))
401 for _, l := range lines {
402 got = append(got, l.Content)
403 }
404 if fmt.Sprint(got) != fmt.Sprint(wantContents) {
405 t.Fatalf("log contents:\n got: %v\nwant: %v", got, wantContents)
406 }
407 if lines[0].Kind != LogKindControl || lines[0].StepStatus != StepStatusStart {
408 t.Fatalf("first frame not start control: %+v", lines[0])
409 }
410 if lines[len(lines)-1].Kind != LogKindControl ||
411 lines[len(lines)-1].StepStatus != StepStatusEnd {
412 t.Fatalf("last frame not end control: %+v", lines[len(lines)-1])
413 }
414}
415
416// TestSourcehutLogsNotFound asserts the Provider contract: looking up
417// logs for a tuple no job was ever spawned for returns ErrLogsNotFound.
418func TestSourcehutLogsNotFound(t *testing.T) {
419 handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
420 t.Fatalf("unexpected request to upstream: %s %s", r.Method, r.URL.Path)
421 })
422 p, _, _, _ := newSourcehutTestProvider(t, handler)
423
424 _, err := p.Logs(context.Background(),
425 "knot.example.com", "rkey-missing", "ci.yml")
426 if err != ErrLogsNotFound {
427 t.Fatalf("err = %v; want ErrLogsNotFound", err)
428 }
429}
430
431// TestSourcehutMapStatus pins the mapping table independently of the
432// rest of the provider, since it's the contract between us and the
433// appview's status enum.
434func TestSourcehutMapStatus(t *testing.T) {
435 cases := []struct {
436 in string
437 want string
438 terminal bool
439 ok bool
440 }{
441 // Sourcehut emits uppercase enum values over GraphQL; the
442 // lowercased variants are kept here as a regression guard
443 // against a future surface flip.
444 {"PENDING", "pending", false, true},
445 {"QUEUED", "pending", false, true},
446 {"RUNNING", "running", false, true},
447 {"SUCCESS", "success", true, true},
448 {"FAILED", "failed", true, true},
449 {"TIMEOUT", "failed", true, true},
450 {"CANCELLED", "cancelled", true, true},
451 {"pending", "pending", false, true},
452 {"weird", "", false, false},
453 }
454 for _, c := range cases {
455 got, terminal, ok := mapSourcehutStatus(c.in)
456 if got != c.want || terminal != c.terminal || ok != c.ok {
457 t.Errorf("mapSourcehutStatus(%q) = %q,%v,%v; want %q,%v,%v",
458 c.in, got, terminal, ok, c.want, c.terminal, c.ok)
459 }
460 }
461}
462
463// TestSourcehutInjectEnvironmentPreservesUserOverride confirms a
464// user-set env entry is not clobbered by our injected default —
465// explicit user intent wins.
466func TestSourcehutInjectEnvironmentPreservesUserOverride(t *testing.T) {
467 manifest := strings.Join([]string{
468 "image: alpine/edge",
469 "environment:",
470 " TACK_KNOT: user-supplied",
471 "tasks:",
472 " - hello: echo hi",
473 }, "\n") + "\n"
474 out, err := injectSourcehutEnvironment(manifest, map[string]string{
475 "TACK_KNOT": "from-tack",
476 "TACK_COMMIT": "abc",
477 })
478 if err != nil {
479 t.Fatalf("inject: %v", err)
480 }
481 if !strings.Contains(out, "TACK_KNOT: user-supplied") {
482 t.Fatalf("user override clobbered: %q", out)
483 }
484 if !strings.Contains(out, "TACK_COMMIT: abc") {
485 t.Fatalf("missing injected var: %q", out)
486 }
487}