Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "io"
6 "log/slog"
7 "net/http"
8 "net/http/httptest"
9 "path/filepath"
10 "strings"
11 "testing"
12 "time"
13
14 "tangled.org/core/appview/db"
15 "tangled.org/core/appview/pipelines"
16 ec "tangled.org/core/eventconsumer"
17 "tangled.org/core/eventconsumer/cursor"
18 "tangled.org/core/eventstream"
19 "tangled.org/core/notifier"
20 spindledb "tangled.org/core/spindle/db"
21 spindlemodels "tangled.org/core/spindle/models"
22)
23
24func TestColdStart_SpindleEventsRebuildPipelineStatuses(t *testing.T) {
25 ctx := t.Context()
26
27 spindleDB, err := spindledb.Make(ctx, filepath.Join(t.TempDir(), "spindle.db"))
28 if err != nil {
29 t.Fatalf("spindle Make: %v", err)
30 }
31 t.Cleanup(func() { spindleDB.Close() })
32
33 n := notifier.New()
34 workflowId := spindlemodels.WorkflowId{
35 PipelineId: spindlemodels.PipelineId{Knot: "knot.boltless.example", Rkey: "pipeline-rk1"},
36 Name: "build",
37 }
38 for _, step := range []func() error{
39 func() error { return spindleDB.StatusPending(workflowId, &n) },
40 func() error { return spindleDB.StatusRunning(workflowId, &n) },
41 func() error { return spindleDB.StatusSuccess(workflowId, &n) },
42 } {
43 if err := step(); err != nil {
44 t.Fatalf("seed spindle event: %v", err)
45 }
46 }
47
48 mux := http.NewServeMux()
49 mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
50 _ = eventstream.Stream(w, r, eventstream.StreamConfig{
51 Backend: spindleDB,
52 Notifier: &n,
53 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
54 })
55 })
56 srv := httptest.NewServer(mux)
57 t.Cleanup(srv.Close)
58 source := ec.Source{Kind: "test", Host: strings.TrimPrefix(srv.URL, "http://")}
59
60 appviewDB, err := db.Make(ctx, filepath.Join(t.TempDir(), "appview.db"))
61 if err != nil {
62 t.Fatalf("appview Make: %v", err)
63 }
64 t.Cleanup(func() { appviewDB.Close() })
65
66 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
67 processFunc := spindleIngester(appviewDB, pipelines.NewStatusNotifier())
68
69 cfg := ec.ConsumerConfig{
70 ProcessFunc: processFunc,
71 WorkerCount: 1,
72 QueueSize: 16,
73 ConnectionTimeout: 2 * time.Second,
74 CursorStore: &cursor.MemoryStore{},
75 URLFunc: ec.DefaultURL(true),
76 Logger: logger,
77 }
78 c := ec.NewConsumer(cfg)
79
80 consumerCtx, cancel := context.WithCancel(ctx)
81 defer cancel()
82 c.Start(consumerCtx)
83 c.AddSource(consumerCtx, source)
84
85 deadline := time.Now().Add(3 * time.Second)
86 for time.Now().Before(deadline) {
87 var n int
88 if err := appviewDB.QueryRow(`select count(*) from pipeline_statuses`).Scan(&n); err != nil {
89 t.Fatalf("count: %v", err)
90 }
91 if n >= 3 {
92 break
93 }
94 time.Sleep(20 * time.Millisecond)
95 }
96
97 rows, err := appviewDB.Query(`
98 select spindle, pipeline_knot, pipeline_rkey, workflow, status
99 from pipeline_statuses
100 order by created asc
101 `)
102 if err != nil {
103 t.Fatalf("query: %v", err)
104 }
105 defer rows.Close()
106
107 type rec struct {
108 spindle, knot, rkey, workflow, status string
109 }
110 var got []rec
111 for rows.Next() {
112 var r rec
113 if err := rows.Scan(&r.spindle, &r.knot, &r.rkey, &r.workflow, &r.status); err != nil {
114 t.Fatalf("scan: %v", err)
115 }
116 got = append(got, r)
117 }
118
119 if len(got) != 3 {
120 t.Fatalf("pipeline_statuses rows = %d, want 3: %+v", len(got), got)
121 }
122
123 wantStatuses := []string{"pending", "running", "success"}
124 gotStatuses := map[string]bool{}
125 for _, r := range got {
126 gotStatuses[r.status] = true
127 if r.spindle != source.Host {
128 t.Errorf("spindle = %q, want %q", r.spindle, source.Host)
129 }
130 if r.knot != workflowId.Knot {
131 t.Errorf("pipeline_knot = %q, want %q", r.knot, workflowId.Knot)
132 }
133 if r.rkey != workflowId.Rkey {
134 t.Errorf("pipeline_rkey = %q, want %q", r.rkey, workflowId.Rkey)
135 }
136 if r.workflow != workflowId.Name {
137 t.Errorf("workflow = %q, want %q", r.workflow, workflowId.Name)
138 }
139 }
140 for _, want := range wantStatuses {
141 if !gotStatuses[want] {
142 t.Errorf("missing status %q in projection", want)
143 }
144 }
145}