Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

at icy/kyxspm 3.9 kB View raw
1package state 2 3import ( 4 "context" 5 "io" 6 "log/slog" 7 "net/http" 8 "net/http/httptest" 9 "path/filepath" 10 "strings" 11 "testing" 12 "time" 13 14 "tangled.org/core/appview/db" 15 "tangled.org/core/appview/pipelines" 16 ec "tangled.org/core/eventconsumer" 17 "tangled.org/core/eventconsumer/cursor" 18 "tangled.org/core/eventstream" 19 "tangled.org/core/notifier" 20 spindledb "tangled.org/core/spindle/db" 21 spindlemodels "tangled.org/core/spindle/models" 22) 23 24func TestColdStart_SpindleEventsRebuildPipelineStatuses(t *testing.T) { 25 ctx := t.Context() 26 27 spindleDB, err := spindledb.Make(ctx, filepath.Join(t.TempDir(), "spindle.db")) 28 if err != nil { 29 t.Fatalf("spindle Make: %v", err) 30 } 31 t.Cleanup(func() { spindleDB.Close() }) 32 33 n := notifier.New() 34 workflowId := spindlemodels.WorkflowId{ 35 PipelineId: spindlemodels.PipelineId{Knot: "knot.boltless.example", Rkey: "pipeline-rk1"}, 36 Name: "build", 37 } 38 for _, step := range []func() error{ 39 func() error { return spindleDB.StatusPending(workflowId, &n) }, 40 func() error { return spindleDB.StatusRunning(workflowId, &n) }, 41 func() error { return spindleDB.StatusSuccess(workflowId, &n) }, 42 } { 43 if err := step(); err != nil { 44 t.Fatalf("seed spindle event: %v", err) 45 } 46 } 47 48 mux := http.NewServeMux() 49 mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) { 50 _ = eventstream.Stream(w, r, eventstream.StreamConfig{ 51 Backend: spindleDB, 52 Notifier: &n, 53 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 54 }) 55 }) 56 srv := httptest.NewServer(mux) 57 t.Cleanup(srv.Close) 58 source := ec.Source{Kind: "test", Host: strings.TrimPrefix(srv.URL, "http://")} 59 60 appviewDB, err := db.Make(ctx, filepath.Join(t.TempDir(), "appview.db")) 61 if err != nil { 62 t.Fatalf("appview Make: %v", err) 63 } 64 t.Cleanup(func() { appviewDB.Close() }) 65 66 logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 67 processFunc := spindleIngester(appviewDB, pipelines.NewStatusNotifier()) 68 69 cfg := ec.ConsumerConfig{ 70 ProcessFunc: processFunc, 71 WorkerCount: 1, 72 QueueSize: 16, 73 ConnectionTimeout: 2 * time.Second, 74 CursorStore: &cursor.MemoryStore{}, 75 URLFunc: ec.DefaultURL(true), 76 Logger: logger, 77 } 78 c := ec.NewConsumer(cfg) 79 80 consumerCtx, cancel := context.WithCancel(ctx) 81 defer cancel() 82 c.Start(consumerCtx) 83 c.AddSource(consumerCtx, source) 84 85 deadline := time.Now().Add(3 * time.Second) 86 for time.Now().Before(deadline) { 87 var n int 88 if err := appviewDB.QueryRow(`select count(*) from pipeline_statuses`).Scan(&n); err != nil { 89 t.Fatalf("count: %v", err) 90 } 91 if n >= 3 { 92 break 93 } 94 time.Sleep(20 * time.Millisecond) 95 } 96 97 rows, err := appviewDB.Query(` 98 select spindle, pipeline_knot, pipeline_rkey, workflow, status 99 from pipeline_statuses 100 order by created asc 101 `) 102 if err != nil { 103 t.Fatalf("query: %v", err) 104 } 105 defer rows.Close() 106 107 type rec struct { 108 spindle, knot, rkey, workflow, status string 109 } 110 var got []rec 111 for rows.Next() { 112 var r rec 113 if err := rows.Scan(&r.spindle, &r.knot, &r.rkey, &r.workflow, &r.status); err != nil { 114 t.Fatalf("scan: %v", err) 115 } 116 got = append(got, r) 117 } 118 119 if len(got) != 3 { 120 t.Fatalf("pipeline_statuses rows = %d, want 3: %+v", len(got), got) 121 } 122 123 wantStatuses := []string{"pending", "running", "success"} 124 gotStatuses := map[string]bool{} 125 for _, r := range got { 126 gotStatuses[r.status] = true 127 if r.spindle != source.Host { 128 t.Errorf("spindle = %q, want %q", r.spindle, source.Host) 129 } 130 if r.knot != workflowId.Knot { 131 t.Errorf("pipeline_knot = %q, want %q", r.knot, workflowId.Knot) 132 } 133 if r.rkey != workflowId.Rkey { 134 t.Errorf("pipeline_rkey = %q, want %q", r.rkey, workflowId.Rkey) 135 } 136 if r.workflow != workflowId.Name { 137 t.Errorf("workflow = %q, want %q", r.workflow, workflowId.Name) 138 } 139 } 140 for _, want := range wantStatuses { 141 if !gotStatuses[want] { 142 t.Errorf("missing status %q in projection", want) 143 } 144 } 145}