Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

at main 15 kB View raw
1package main 2 3// Provider-level integration tests for the sourcehut implementation: 4// Spawn → SubmitJob + persist + initial pending publish, watch loop → 5// status transitions, and Logs → snapshot of master + per-task logs. 6// builds.sr.ht is stubbed with httptest so the tests don't need 7// network access. 8 9import ( 10 "context" 11 "encoding/json" 12 "fmt" 13 "io" 14 "log/slog" 15 "net/http" 16 "net/http/httptest" 17 "strings" 18 "sync/atomic" 19 "testing" 20 "time" 21 22 "tangled.org/core/api/tangled" 23) 24 25// newSourcehutTestProvider wires a sourcehutProvider against an 26// httptest server impersonating builds.sr.ht. 27func newSourcehutTestProvider( 28 t *testing.T, 29 handler http.HandlerFunc, 30) (*sourcehutProvider, *store, *broker, *httptest.Server) { 31 t.Helper() 32 srv := httptest.NewServer(handler) 33 t.Cleanup(srv.Close) 34 35 st := newTestStore(t) 36 br := newBroker(st) 37 logger := slog.Default() 38 p := newSourcehutProvider(br, st, "tok", srv.URL, logger) 39 return p, st, br, srv 40} 41 42// gqlBody is a minimal GraphQL request body for routing test stubs 43// without pulling in a real GraphQL parser. 44type gqlBody struct { 45 Query string `json:"query"` 46 Variables map[string]any `json:"variables"` 47} 48 49// gqlOp returns the op keyword ("query" / "mutation" / "subscription") 50// at the head of a GraphQL document. Used by test stubs to route a 51// single /query endpoint to the right canned response without 52// substring-matching on field names — which would silently misroute a 53// query that happened to mention "submit" or vice-versa. 54func gqlOp(query string) string { 55 q := strings.TrimSpace(query) 56 for _, kw := range []string{"mutation", "query", "subscription"} { 57 if strings.HasPrefix(q, kw) { 58 return kw 59 } 60 } 61 return "" 62} 63 64// writeGQL is the test-side helper for shaping a successful GraphQL 65// response. It wraps `data` under the canonical {"data": …} envelope 66// the client decodes against. 67func writeGQL(w http.ResponseWriter, data map[string]any) { 68 w.Header().Set("Content-Type", "application/json") 69 _ = json.NewEncoder(w).Encode(map[string]any{"data": data}) 70} 71 72// TestSourcehutSpawn covers the full submit path: trigger → API call 73// → DB row → "pending" status on the broker. 74func TestSourcehutSpawn(t *testing.T) { 75 manifestCh := make(chan string, 1) 76 notesCh := make(chan string, 1) 77 tagsCh := make(chan []string, 1) 78 handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 79 if r.Method != http.MethodPost || r.URL.Path != "/query" { 80 http.NotFound(w, r) 81 return 82 } 83 var body gqlBody 84 _ = json.NewDecoder(r.Body).Decode(&body) 85 switch gqlOp(body.Query) { 86 case "mutation": 87 if m, _ := body.Variables["manifest"].(string); m != "" { 88 select { 89 case manifestCh <- m: 90 default: 91 } 92 } 93 if n, _ := body.Variables["note"].(string); n != "" { 94 select { 95 case notesCh <- n: 96 default: 97 } 98 } 99 if raw, ok := body.Variables["tags"].([]any); ok { 100 ts := make([]string, 0, len(raw)) 101 for _, v := range raw { 102 if s, ok := v.(string); ok { 103 ts = append(ts, s) 104 } 105 } 106 select { 107 case tagsCh <- ts: 108 default: 109 } 110 } 111 writeGQL(w, map[string]any{ 112 "submit": map[string]any{ 113 "id": 42, 114 "status": "PENDING", 115 "owner": map[string]any{"canonicalName": "~tester"}, 116 }, 117 }) 118 case "query": 119 // Watch loop; keep the job pending so the test exits 120 // deterministically once the row and initial publish land. 121 writeGQL(w, map[string]any{ 122 "job": map[string]any{ 123 "id": 42, 124 "status": "PENDING", 125 "owner": map[string]any{"canonicalName": "~tester"}, 126 "tasks": []any{}, 127 }, 128 }) 129 default: 130 http.Error(w, "unknown op", http.StatusBadRequest) 131 } 132 }) 133 p, st, _, _ := newSourcehutTestProvider(t, handler) 134 135 trigger := &tangled.Pipeline_TriggerMetadata{ 136 Push: &tangled.Pipeline_PushTriggerData{ 137 NewSha: "abcdef0123", 138 Ref: "refs/heads/main", 139 }, 140 } 141 raw := strings.Join([]string{ 142 "tack:", 143 " sourcehut:", 144 " manifest: |", 145 " image: alpine/edge", 146 " tasks:", 147 " - hello: |", 148 " echo hi", 149 " tags: [tack, ci]", 150 " note: integration", 151 }, "\n") + "\n" 152 153 ctx, cancel := context.WithCancel(context.Background()) 154 defer cancel() 155 p.Spawn(ctx, "knot.example.com", "rkey-1", "did:plc:actor", trigger, 156 []*tangled.Pipeline_Workflow{{Name: "ci.yml", Raw: raw}}, 157 ) 158 159 select { 160 case manifest := <-manifestCh: 161 // Manifest should round-trip through injectSourcehutEnvironment; 162 // we expect TACK_* env vars to land in a top-level environment 163 // block that didn't originally exist. 164 if !strings.Contains(manifest, "TACK_KNOT") { 165 t.Fatalf("manifest missing TACK_KNOT: %q", manifest) 166 } 167 if !strings.Contains(manifest, "abcdef0123") { 168 t.Fatalf("manifest missing TACK_COMMIT value: %q", manifest) 169 } 170 // User-supplied tasks must be preserved through the round trip. 171 if !strings.Contains(manifest, "echo hi") { 172 t.Fatalf("user manifest body lost: %q", manifest) 173 } 174 select { 175 case note := <-notesCh: 176 if note != "integration" { 177 t.Fatalf("note=%q; want integration", note) 178 } 179 case <-time.After(time.Second): 180 t.Fatal("note variable not sent") 181 } 182 select { 183 case ts := <-tagsCh: 184 if len(ts) != 2 || ts[0] != "tack" || ts[1] != "ci" { 185 t.Fatalf("tags=%v", ts) 186 } 187 case <-time.After(time.Second): 188 t.Fatal("tags variable not sent") 189 } 190 case <-time.After(2 * time.Second): 191 t.Fatal("SubmitJob not called") 192 } 193 194 // Wait for the row to land — that's the load-bearing artifact. 195 deadline := time.Now().Add(2 * time.Second) 196 var ref *SourcehutJobRef 197 for time.Now().Before(deadline) { 198 var err error 199 ref, err = st.LookupSourcehutJobByTuple( 200 context.Background(), 201 "knot.example.com", "rkey-1", "ci.yml", 202 ) 203 if err != nil { 204 t.Fatalf("lookup: %v", err) 205 } 206 if ref != nil { 207 break 208 } 209 time.Sleep(20 * time.Millisecond) 210 } 211 if ref == nil { 212 t.Fatal("sourcehut job row not persisted within deadline") 213 } 214 if ref.JobID != 42 || ref.Owner != "~tester" { 215 t.Fatalf("ref mismatch: %+v", ref) 216 } 217 218 rows, err := st.EventsAfter(context.Background(), 0) 219 if err != nil { 220 t.Fatalf("EventsAfter: %v", err) 221 } 222 if len(rows) == 0 { 223 t.Fatal("no pending status published") 224 } 225 var rec tangled.PipelineStatus 226 if err := json.Unmarshal(rows[0].EventJSON, &rec); err != nil { 227 t.Fatalf("decode status: %v", err) 228 } 229 if rec.Status != "pending" || rec.Workflow != "ci.yml" { 230 t.Fatalf("unexpected status: %+v", rec) 231 } 232 if !strings.Contains(rec.Pipeline, "knot.example.com") || 233 !strings.Contains(rec.Pipeline, "rkey-1") { 234 t.Fatalf("pipeline ATURI wrong: %s", rec.Pipeline) 235 } 236} 237 238// TestSourcehutSpawnInvalidConfig confirms a workflow without a manifest 239// is rejected loudly without firing any HTTP call. 240func TestSourcehutSpawnInvalidConfig(t *testing.T) { 241 var called atomic.Bool 242 handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 243 called.Store(true) 244 }) 245 p, st, _, _ := newSourcehutTestProvider(t, handler) 246 247 p.Spawn(context.Background(), "knot.example.com", "rkey-1", "did:plc:actor", 248 &tangled.Pipeline_TriggerMetadata{Push: &tangled.Pipeline_PushTriggerData{ 249 NewSha: "abc", Ref: "refs/heads/main", 250 }}, 251 []*tangled.Pipeline_Workflow{ 252 // `tack.sourcehut` block but no manifest. 253 {Name: "ci.yml", Raw: "tack:\n sourcehut:\n note: hi\n"}, 254 }, 255 ) 256 257 time.Sleep(50 * time.Millisecond) 258 if called.Load() { 259 t.Fatal("SubmitJob called despite missing manifest") 260 } 261 rows, _ := st.EventsAfter(context.Background(), 0) 262 if len(rows) != 0 { 263 t.Fatalf("got %d events, want 0", len(rows)) 264 } 265} 266 267// TestSourcehutWatchTransitions exercises the watch loop: a job that 268// flips through PENDING → RUNNING → SUCCESS should produce three 269// distinct status records in order, with no duplicates. 270func TestSourcehutWatchTransitions(t *testing.T) { 271 states := []string{"PENDING", "RUNNING", "RUNNING", "SUCCESS"} 272 var idx atomic.Int32 273 idx.Store(-1) 274 275 handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 276 if r.Method != http.MethodPost || r.URL.Path != "/query" { 277 http.NotFound(w, r) 278 return 279 } 280 var body gqlBody 281 _ = json.NewDecoder(r.Body).Decode(&body) 282 if gqlOp(body.Query) != "query" { 283 http.Error(w, "unknown op", http.StatusBadRequest) 284 return 285 } 286 n := int(idx.Add(1)) 287 if n >= len(states) { 288 n = len(states) - 1 289 } 290 writeGQL(w, map[string]any{ 291 "job": map[string]any{ 292 "id": 7, 293 "status": states[n], 294 "owner": map[string]any{"canonicalName": "~tester"}, 295 "tasks": []any{}, 296 }, 297 }) 298 }) 299 p, st, _, _ := newSourcehutTestProvider(t, handler) 300 301 pipelineURI := pipelineATURI("knot.example.com", "rkey-2") 302 ref := SourcehutJobRef{ 303 Knot: "knot.example.com", PipelineRkey: "rkey-2", Workflow: "ci.yml", 304 JobID: 7, Owner: "~tester", 305 Instance: p.defaultInstance, PipelineURI: pipelineURI, 306 } 307 if err := st.InsertSourcehutJob(context.Background(), ref); err != nil { 308 t.Fatalf("insert: %v", err) 309 } 310 311 // Lower the poll interval so the watch loop completes in 312 // milliseconds rather than waiting for the production 5s tick. 313 p.pollInterval = 20 * time.Millisecond 314 315 ctx, cancel := context.WithCancel(context.Background()) 316 defer cancel() 317 done := make(chan struct{}) 318 go func() { 319 defer close(done) 320 p.watchJob(ctx, ref, p.defaultClient) 321 }() 322 323 select { 324 case <-done: 325 case <-time.After(2 * time.Second): 326 t.Fatal("watch loop did not exit on terminal status") 327 } 328 329 rows, err := st.EventsAfter(context.Background(), 0) 330 if err != nil { 331 t.Fatalf("EventsAfter: %v", err) 332 } 333 gotStatuses := []string{} 334 for _, r := range rows { 335 var rec tangled.PipelineStatus 336 if err := json.Unmarshal(r.EventJSON, &rec); err != nil { 337 t.Fatalf("decode: %v", err) 338 } 339 gotStatuses = append(gotStatuses, rec.Status) 340 } 341 want := []string{"pending", "running", "success"} 342 if fmt.Sprint(gotStatuses) != fmt.Sprint(want) { 343 t.Fatalf("statuses = %v; want %v", gotStatuses, want) 344 } 345} 346 347// TestSourcehutLogs covers the snapshot Logs path: master log + each 348// task in the job's tasks list, bracketed by control frames. 349func TestSourcehutLogs(t *testing.T) { 350 handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 351 switch { 352 case r.Method == http.MethodPost && r.URL.Path == "/query": 353 writeGQL(w, map[string]any{ 354 "job": map[string]any{ 355 "id": 99, 356 "status": "SUCCESS", 357 "owner": map[string]any{"canonicalName": "~tester"}, 358 "tasks": []any{ 359 map[string]any{"name": "build", "status": "SUCCESS"}, 360 map[string]any{"name": "test", "status": "SUCCESS"}, 361 }, 362 }, 363 }) 364 case r.Method == http.MethodGet && r.URL.Path == "/query/log/99/log": 365 _, _ = io.WriteString(w, "setup-output\n") 366 case r.Method == http.MethodGet && r.URL.Path == "/query/log/99/build/log": 367 _, _ = io.WriteString(w, "build-line-1\nbuild-line-2\n") 368 case r.Method == http.MethodGet && r.URL.Path == "/query/log/99/test/log": 369 _, _ = io.WriteString(w, "test-line\n") 370 default: 371 http.NotFound(w, r) 372 } 373 }) 374 p, st, _, _ := newSourcehutTestProvider(t, handler) 375 376 if err := st.InsertSourcehutJob(context.Background(), SourcehutJobRef{ 377 Knot: "knot.example.com", PipelineRkey: "rkey-3", Workflow: "ci.yml", 378 JobID: 99, Owner: "~tester", 379 Instance: p.defaultInstance, 380 PipelineURI: pipelineATURI("knot.example.com", "rkey-3"), 381 }); err != nil { 382 t.Fatalf("insert: %v", err) 383 } 384 385 ch, err := p.Logs(context.Background(), 386 "knot.example.com", "rkey-3", "ci.yml") 387 if err != nil { 388 t.Fatalf("Logs: %v", err) 389 } 390 var lines []LogLine 391 for line := range ch { 392 lines = append(lines, line) 393 } 394 395 wantContents := []string{ 396 "setup", "setup-output\n", "setup", 397 "build", "build-line-1\n", "build-line-2\n", "build", 398 "test", "test-line\n", "test", 399 } 400 got := make([]string, 0, len(lines)) 401 for _, l := range lines { 402 got = append(got, l.Content) 403 } 404 if fmt.Sprint(got) != fmt.Sprint(wantContents) { 405 t.Fatalf("log contents:\n got: %v\nwant: %v", got, wantContents) 406 } 407 if lines[0].Kind != LogKindControl || lines[0].StepStatus != StepStatusStart { 408 t.Fatalf("first frame not start control: %+v", lines[0]) 409 } 410 if lines[len(lines)-1].Kind != LogKindControl || 411 lines[len(lines)-1].StepStatus != StepStatusEnd { 412 t.Fatalf("last frame not end control: %+v", lines[len(lines)-1]) 413 } 414} 415 416// TestSourcehutLogsNotFound asserts the Provider contract: looking up 417// logs for a tuple no job was ever spawned for returns ErrLogsNotFound. 418func TestSourcehutLogsNotFound(t *testing.T) { 419 handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 420 t.Fatalf("unexpected request to upstream: %s %s", r.Method, r.URL.Path) 421 }) 422 p, _, _, _ := newSourcehutTestProvider(t, handler) 423 424 _, err := p.Logs(context.Background(), 425 "knot.example.com", "rkey-missing", "ci.yml") 426 if err != ErrLogsNotFound { 427 t.Fatalf("err = %v; want ErrLogsNotFound", err) 428 } 429} 430 431// TestSourcehutMapStatus pins the mapping table independently of the 432// rest of the provider, since it's the contract between us and the 433// appview's status enum. 434func TestSourcehutMapStatus(t *testing.T) { 435 cases := []struct { 436 in string 437 want string 438 terminal bool 439 ok bool 440 }{ 441 // Sourcehut emits uppercase enum values over GraphQL; the 442 // lowercased variants are kept here as a regression guard 443 // against a future surface flip. 444 {"PENDING", "pending", false, true}, 445 {"QUEUED", "pending", false, true}, 446 {"RUNNING", "running", false, true}, 447 {"SUCCESS", "success", true, true}, 448 {"FAILED", "failed", true, true}, 449 {"TIMEOUT", "failed", true, true}, 450 {"CANCELLED", "cancelled", true, true}, 451 {"pending", "pending", false, true}, 452 {"weird", "", false, false}, 453 } 454 for _, c := range cases { 455 got, terminal, ok := mapSourcehutStatus(c.in) 456 if got != c.want || terminal != c.terminal || ok != c.ok { 457 t.Errorf("mapSourcehutStatus(%q) = %q,%v,%v; want %q,%v,%v", 458 c.in, got, terminal, ok, c.want, c.terminal, c.ok) 459 } 460 } 461} 462 463// TestSourcehutInjectEnvironmentPreservesUserOverride confirms a 464// user-set env entry is not clobbered by our injected default — 465// explicit user intent wins. 466func TestSourcehutInjectEnvironmentPreservesUserOverride(t *testing.T) { 467 manifest := strings.Join([]string{ 468 "image: alpine/edge", 469 "environment:", 470 " TACK_KNOT: user-supplied", 471 "tasks:", 472 " - hello: echo hi", 473 }, "\n") + "\n" 474 out, err := injectSourcehutEnvironment(manifest, map[string]string{ 475 "TACK_KNOT": "from-tack", 476 "TACK_COMMIT": "abc", 477 }) 478 if err != nil { 479 t.Fatalf("inject: %v", err) 480 } 481 if !strings.Contains(out, "TACK_KNOT: user-supplied") { 482 t.Fatalf("user override clobbered: %q", out) 483 } 484 if !strings.Contains(out, "TACK_COMMIT: abc") { 485 t.Fatalf("missing injected var: %q", out) 486 } 487}