Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "io"
6 "log/slog"
7 "net/http"
8 "net/http/httptest"
9 "path/filepath"
10 "strings"
11 "testing"
12 "time"
13
14 "tangled.org/core/appview/db"
15 "tangled.org/core/appview/pipelines"
16 ec "tangled.org/core/eventconsumer"
17 "tangled.org/core/eventconsumer/cursor"
18 "tangled.org/core/eventstream"
19 "tangled.org/core/notifier"
20 spindledb "tangled.org/core/spindle/db"
21 spindlemodels "tangled.org/core/spindle/models"
22)
23
24func TestColdStart_SpindleEventsRebuildPipelineStatuses(t *testing.T) {
25 ctx := t.Context()
26
27 spindleDB, err := spindledb.Make(ctx, filepath.Join(t.TempDir(), "spindle.db"))
28 if err != nil {
29 t.Fatalf("spindle Make: %v", err)
30 }
31 t.Cleanup(func() { spindleDB.Close() })
32
33 n := notifier.New()
34 workflowId := spindlemodels.WorkflowId{
35 PipelineId: spindlemodels.PipelineId{Knot: "knot.boltless.example", Rkey: "pipeline-rk1"},
36 Name: "build",
37 }
38 for _, step := range []func() error{
39 func() error { return spindleDB.StatusPending(workflowId, &n) },
40 func() error { return spindleDB.StatusRunning(workflowId, &n) },
41 func() error { return spindleDB.StatusSuccess(workflowId, &n) },
42 } {
43 if err := step(); err != nil {
44 t.Fatalf("seed spindle event: %v", err)
45 }
46 }
47
48 mux := http.NewServeMux()
49 mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
50 _ = eventstream.Stream(w, r, eventstream.StreamConfig{
51 Backend: spindleDB,
52 Notifier: &n,
53 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
54 })
55 })
56 srv := httptest.NewServer(mux)
57 t.Cleanup(srv.Close)
58 source := ec.Source{Kind: "test", Host: strings.TrimPrefix(srv.URL, "http://"), NoTLS: true}
59
60 appviewDB, err := db.Make(ctx, filepath.Join(t.TempDir(), "appview.db"))
61 if err != nil {
62 t.Fatalf("appview Make: %v", err)
63 }
64 t.Cleanup(func() { appviewDB.Close() })
65
66 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
67 processFunc := spindleIngester(appviewDB, pipelines.NewStatusNotifier())
68
69 cfg := ec.ConsumerConfig{
70 ProcessFunc: processFunc,
71 WorkerCount: 1,
72 QueueSize: 16,
73 ConnectionTimeout: 2 * time.Second,
74 CursorStore: &cursor.MemoryStore{},
75 Logger: logger,
76 }
77 c := ec.NewConsumer(cfg)
78
79 consumerCtx, cancel := context.WithCancel(ctx)
80 defer cancel()
81 c.Start(consumerCtx)
82 c.AddSource(consumerCtx, source)
83
84 deadline := time.Now().Add(3 * time.Second)
85 for time.Now().Before(deadline) {
86 var n int
87 if err := appviewDB.QueryRow(`select count(*) from pipeline_statuses`).Scan(&n); err != nil {
88 t.Fatalf("count: %v", err)
89 }
90 if n >= 3 {
91 break
92 }
93 time.Sleep(20 * time.Millisecond)
94 }
95
96 rows, err := appviewDB.Query(`
97 select spindle, pipeline_knot, pipeline_rkey, workflow, status
98 from pipeline_statuses
99 order by created asc
100 `)
101 if err != nil {
102 t.Fatalf("query: %v", err)
103 }
104 defer rows.Close()
105
106 type rec struct {
107 spindle, knot, rkey, workflow, status string
108 }
109 var got []rec
110 for rows.Next() {
111 var r rec
112 if err := rows.Scan(&r.spindle, &r.knot, &r.rkey, &r.workflow, &r.status); err != nil {
113 t.Fatalf("scan: %v", err)
114 }
115 got = append(got, r)
116 }
117
118 if len(got) != 3 {
119 t.Fatalf("pipeline_statuses rows = %d, want 3: %+v", len(got), got)
120 }
121
122 wantStatuses := []string{"pending", "running", "success"}
123 gotStatuses := map[string]bool{}
124 for _, r := range got {
125 gotStatuses[r.status] = true
126 if r.spindle != source.Host {
127 t.Errorf("spindle = %q, want %q", r.spindle, source.Host)
128 }
129 if r.knot != workflowId.Knot {
130 t.Errorf("pipeline_knot = %q, want %q", r.knot, workflowId.Knot)
131 }
132 if r.rkey != workflowId.Rkey {
133 t.Errorf("pipeline_rkey = %q, want %q", r.rkey, workflowId.Rkey)
134 }
135 if r.workflow != workflowId.Name {
136 t.Errorf("workflow = %q, want %q", r.workflow, workflowId.Name)
137 }
138 }
139 for _, want := range wantStatuses {
140 if !gotStatuses[want] {
141 t.Errorf("missing status %q in projection", want)
142 }
143 }
144}