Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package state 2 3import ( 4 "context" 5 "io" 6 "log/slog" 7 "net/http" 8 "net/http/httptest" 9 "path/filepath" 10 "strings" 11 "testing" 12 "time" 13 14 "tangled.org/core/appview/db" 15 "tangled.org/core/appview/pipelines" 16 ec "tangled.org/core/eventconsumer" 17 "tangled.org/core/eventconsumer/cursor" 18 "tangled.org/core/eventstream" 19 "tangled.org/core/notifier" 20 spindledb "tangled.org/core/spindle/db" 21 spindlemodels "tangled.org/core/spindle/models" 22) 23 24func TestColdStart_SpindleEventsRebuildPipelineStatuses(t *testing.T) { 25 ctx := t.Context() 26 27 spindleDB, err := spindledb.Make(ctx, filepath.Join(t.TempDir(), "spindle.db")) 28 if err != nil { 29 t.Fatalf("spindle Make: %v", err) 30 } 31 t.Cleanup(func() { spindleDB.Close() }) 32 33 n := notifier.New() 34 workflowId := spindlemodels.WorkflowId{ 35 PipelineId: spindlemodels.PipelineId{Knot: "knot.boltless.example", Rkey: "pipeline-rk1"}, 36 Name: "build", 37 } 38 for _, step := range []func() error{ 39 func() error { return spindleDB.StatusPending(workflowId, &n) }, 40 func() error { return spindleDB.StatusRunning(workflowId, &n) }, 41 func() error { return spindleDB.StatusSuccess(workflowId, &n) }, 42 } { 43 if err := step(); err != nil { 44 t.Fatalf("seed spindle event: %v", err) 45 } 46 } 47 48 mux := http.NewServeMux() 49 mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) { 50 _ = eventstream.Stream(w, r, eventstream.StreamConfig{ 51 Backend: spindleDB, 52 Notifier: &n, 53 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 54 }) 55 }) 56 srv := httptest.NewServer(mux) 57 t.Cleanup(srv.Close) 58 source := ec.Source{Kind: "test", Host: strings.TrimPrefix(srv.URL, "http://"), NoTLS: true} 59 60 appviewDB, err := db.Make(ctx, filepath.Join(t.TempDir(), "appview.db")) 61 if err != nil { 62 t.Fatalf("appview Make: %v", err) 63 } 64 t.Cleanup(func() { appviewDB.Close() }) 65 66 logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 67 processFunc := spindleIngester(appviewDB, pipelines.NewStatusNotifier()) 68 69 cfg := ec.ConsumerConfig{ 70 ProcessFunc: processFunc, 71 WorkerCount: 1, 72 QueueSize: 16, 73 ConnectionTimeout: 2 * time.Second, 74 CursorStore: &cursor.MemoryStore{}, 75 Logger: logger, 76 } 77 c := ec.NewConsumer(cfg) 78 79 consumerCtx, cancel := context.WithCancel(ctx) 80 defer cancel() 81 c.Start(consumerCtx) 82 c.AddSource(consumerCtx, source) 83 84 deadline := time.Now().Add(3 * time.Second) 85 for time.Now().Before(deadline) { 86 var n int 87 if err := appviewDB.QueryRow(`select count(*) from pipeline_statuses`).Scan(&n); err != nil { 88 t.Fatalf("count: %v", err) 89 } 90 if n >= 3 { 91 break 92 } 93 time.Sleep(20 * time.Millisecond) 94 } 95 96 rows, err := appviewDB.Query(` 97 select spindle, pipeline_knot, pipeline_rkey, workflow, status 98 from pipeline_statuses 99 order by created asc 100 `) 101 if err != nil { 102 t.Fatalf("query: %v", err) 103 } 104 defer rows.Close() 105 106 type rec struct { 107 spindle, knot, rkey, workflow, status string 108 } 109 var got []rec 110 for rows.Next() { 111 var r rec 112 if err := rows.Scan(&r.spindle, &r.knot, &r.rkey, &r.workflow, &r.status); err != nil { 113 t.Fatalf("scan: %v", err) 114 } 115 got = append(got, r) 116 } 117 118 if len(got) != 3 { 119 t.Fatalf("pipeline_statuses rows = %d, want 3: %+v", len(got), got) 120 } 121 122 wantStatuses := []string{"pending", "running", "success"} 123 gotStatuses := map[string]bool{} 124 for _, r := range got { 125 gotStatuses[r.status] = true 126 if r.spindle != source.Host { 127 t.Errorf("spindle = %q, want %q", r.spindle, source.Host) 128 } 129 if r.knot != workflowId.Knot { 130 t.Errorf("pipeline_knot = %q, want %q", r.knot, workflowId.Knot) 131 } 132 if r.rkey != workflowId.Rkey { 133 t.Errorf("pipeline_rkey = %q, want %q", r.rkey, workflowId.Rkey) 134 } 135 if r.workflow != workflowId.Name { 136 t.Errorf("workflow = %q, want %q", r.workflow, workflowId.Name) 137 } 138 } 139 for _, want := range wantStatuses { 140 if !gotStatuses[want] { 141 t.Errorf("missing status %q in projection", want) 142 } 143 } 144}