Stitch any CI into Tangled
1package main
2
3// fakeProvider is a stand-in Provider implementation: it doesn't talk
4// to any external CI. For each workflow in a triggered pipeline it
5// spawns a goroutine that emits a fixed-cadence stream of
6// sh.tangled.pipeline.status records — "running" every five seconds
7// for thirty seconds, then a final "success" — through the broker.
8// In parallel it appends synthetic LogLine records into an in-memory
9// buffer, so /logs queries against a fake run replay something that
10// matches the upstream spindle's wire format.
11//
12// The point is to exercise the entire trigger → broker → /events →
13// appview path end-to-end before any real CI integration exists. Once
14// the Buildkite provider lands, this one stays around as a reference
15// implementation and as the test double of choice when a test wants
16// "something that publishes plausible status updates" without the
17// timing weight of real builds.
18
19import (
20 "context"
21 "encoding/json"
22 "fmt"
23 "log/slog"
24 "sync"
25 "time"
26
27 "tangled.org/core/api/tangled"
28)
29
30// Fake-job timing knobs. Pulled out as constants so it's obvious
31// where the numbers come from and they can be tuned independently of
32// the rest of the file. Total publishes per workflow =
33// (fakeJobDuration / fakeJobInterval) heartbeats + 1 final success.
34const (
35 fakeJobDuration = 30 * time.Second
36 fakeJobInterval = 5 * time.Second
37)
38
39// fakeLogChanBuffer is the size of the buffered channel returned by
40// fakeProvider.Logs. Big enough that a snapshot of a completed fake
41// run (≈ 8 lines today) drops in without blocking the producer
42// goroutine even if the consumer is briefly slow; small enough that
43// a misbehaving consumer can't pin a runaway amount of memory.
44const fakeLogChanBuffer = 64
45
46// fakeProvider implements Provider against the in-process broker.
47//
48// logs holds the captured per-workflow LogLine slices, keyed by the
49// fakeLogKey of (knot, pipelineRkey, workflow). The mutex guards the
50// map *and* concurrent appends to any contained slice — runWorkflow
51// is the only writer for a given key, but Logs() may snapshot the
52// slice concurrently.
53type fakeProvider struct {
54 br *broker
55 log *slog.Logger
56
57 mu sync.Mutex
58 logs map[string][]LogLine
59}
60
61// Compile-time interface check — keeps the fake honest if Provider
62// ever gains additional methods.
63var _ Provider = (*fakeProvider)(nil)
64
65// newFakeProvider constructs a fakeProvider bound to br. The provided
66// logger is annotated with component=provider so its output stands
67// apart from the knot-consumer / jetstream noise.
68func newFakeProvider(br *broker, log *slog.Logger) *fakeProvider {
69 return &fakeProvider{
70 br: br,
71 log: log.With("component", "provider", "kind", "fake"),
72 logs: make(map[string][]LogLine),
73 }
74}
75
76// Spawn satisfies Provider. It kicks off one runWorkflow goroutine per
77// workflow, returning immediately so the eventconsumer worker that
78// invoked us isn't blocked. Goroutines inherit ctx (app-scoped) and
79// will exit early on cancellation.
80func (p *fakeProvider) Spawn(
81 ctx context.Context,
82 knot string,
83 pipelineRkey string,
84 _ *tangled.Pipeline_TriggerMetadata,
85 workflows []*tangled.Pipeline_Workflow,
86) {
87 if len(workflows) == 0 {
88 // Without a workflow name there's no valid pipeline.status
89 // record to publish. Log loudly enough that an operator
90 // staring at the logs can tell the trigger arrived but
91 // produced no fake activity.
92 p.log.Warn("pipeline has no workflows; skipping fake run",
93 "knot", knot, "rkey", pipelineRkey,
94 )
95 return
96 }
97 for _, wf := range workflows {
98 // Defensive: the lexicon allows pointer entries and doesn't
99 // enforce non-empty names. We can't publish a status for an
100 // unnamed workflow, so just skip it.
101 if wf == nil || wf.Name == "" {
102 continue
103 }
104 go p.runWorkflow(ctx, knot, pipelineRkey, wf.Name)
105 }
106}
107
108// Logs satisfies Provider. It snapshots the in-memory LogLine slice
109// for (knot, pipelineRkey, workflow) and returns a buffered channel
110// that a goroutine drains the snapshot into. The snapshot is taken
111// under the mutex so subsequent appends by a still-running workflow
112// won't be reflected — clients that want live updates re-poll. We
113// don't follow live writes here; the fake's purpose is to exercise
114// the wire format end-to-end, not to reproduce hpcloud/tail.
115//
116// The producer goroutine honours ctx so that a disconnecting HTTP
117// client tears the channel send down promptly instead of waiting on
118// an unbuffered receiver.
119func (p *fakeProvider) Logs(
120 ctx context.Context,
121 knot string,
122 pipelineRkey string,
123 workflow string,
124) (<-chan LogLine, error) {
125 key := fakeLogKey(knot, pipelineRkey, workflow)
126
127 p.mu.Lock()
128 stored, ok := p.logs[key]
129 if !ok {
130 p.mu.Unlock()
131 return nil, ErrLogsNotFound
132 }
133 // Copy under the lock so subsequent appendLogLine writes can't
134 // race with our drain goroutine reading the slice header.
135 snapshot := make([]LogLine, len(stored))
136 copy(snapshot, stored)
137 p.mu.Unlock()
138
139 out := make(chan LogLine, fakeLogChanBuffer)
140 go func() {
141 defer close(out)
142 for _, line := range snapshot {
143 select {
144 case <-ctx.Done():
145 return
146 case out <- line:
147 }
148 }
149 }()
150 return out, nil
151}
152
153// runWorkflow emits a "running" status every fakeJobInterval until
154// fakeJobDuration elapses, then a final "success". Alongside each
155// status it appends a corresponding LogLine into the in-memory log
156// buffer so /logs returns something coherent.
157//
158// On ctx cancellation it returns without issuing the terminal publish
159// or the closing control frame — the broker's underlying store may
160// already be closing during shutdown.
161func (p *fakeProvider) runWorkflow(ctx context.Context, knot, pipelineRkey, workflow string) {
162 // pipelineURI is what the appview parses out of the status record
163 // to associate it back with the originating pipeline. Format
164 // mirrors the upstream spindle's emission:
165 // at://did:web:<knot>/<nsid>/<rkey>. The appview strips the
166 // did:web: prefix and treats the remainder as the knot identifier.
167 pipelineURI := fmt.Sprintf("at://did:web:%s/%s/%s",
168 knot, tangled.PipelineNSID, pipelineRkey,
169 )
170
171 logger := p.log.With(
172 "knot", knot,
173 "pipeline_rkey", pipelineRkey,
174 "workflow", workflow,
175 )
176
177 // Start control frame. The appview's log renderer keys timing on
178 // matching start/end control frames per step_id, so we always
179 // emit one even though the fake has only a single synthetic step.
180 p.appendLogLine(knot, pipelineRkey, workflow, LogLine{
181 Kind: LogKindControl,
182 Time: time.Now(),
183 Content: workflow,
184 StepId: 0,
185 StepStatus: StepStatusStart,
186 })
187
188 // Heartbeat phase. seq doubles as a per-workflow disambiguator
189 // in the synthesized status rkey so concurrent fakes (across
190 // workflows or pipelines) don't collide.
191 deadline := time.Now().Add(fakeJobDuration)
192 seq := 0
193 for time.Now().Before(deadline) {
194 p.appendLogLine(knot, pipelineRkey, workflow, LogLine{
195 Kind: LogKindData,
196 Time: time.Now(),
197 Content: fmt.Sprintf("[fake] heartbeat %d at %s\n",
198 seq, time.Now().UTC().Format(time.RFC3339),
199 ),
200 StepId: 0,
201 Stream: "stdout",
202 })
203 if err := p.publishStatus(ctx, pipelineURI, workflow, "running", seq); err != nil {
204 logger.Error("publish fake running status", "err", err, "seq", seq)
205 return
206 }
207 seq++
208 select {
209 case <-ctx.Done():
210 logger.Debug("fake job cancelled mid-run", "seq", seq)
211 return
212 case <-time.After(fakeJobInterval):
213 }
214 }
215
216 // End control + terminal status publish. "success" matches the
217 // upstream StatusKind enum (see tangled.org/core/spindle/models)
218 // — the appview routes status strings through that same enum.
219 p.appendLogLine(knot, pipelineRkey, workflow, LogLine{
220 Kind: LogKindControl,
221 Time: time.Now(),
222 Content: workflow,
223 StepId: 0,
224 StepStatus: StepStatusEnd,
225 })
226 if err := p.publishStatus(ctx, pipelineURI, workflow, "success", seq); err != nil {
227 logger.Error("publish fake success status", "err", err, "seq", seq)
228 return
229 }
230 logger.Info("fake job complete")
231}
232
233// publishStatus assembles a tangled.PipelineStatus record, marshals
234// it, and pushes it through the broker. The synthesized rkey just
235// needs to be unique within our event log; the appview keys its rows
236// on (spindle, rkey) so we mix in time + workflow + sequence to avoid
237// collisions across concurrent workflows on the same pipeline.
238func (p *fakeProvider) publishStatus(ctx context.Context, pipelineURI, workflow, status string, seq int) error {
239 rec := tangled.PipelineStatus{
240 LexiconTypeID: tangled.PipelineStatusNSID,
241 Pipeline: pipelineURI,
242 Workflow: workflow,
243 Status: status,
244 CreatedAt: time.Now().UTC().Format(time.RFC3339),
245 }
246 body, err := json.Marshal(rec)
247 if err != nil {
248 return fmt.Errorf("marshal pipeline.status: %w", err)
249 }
250 rkey := fmt.Sprintf("fake-%d-%s-%d", time.Now().UnixNano(), workflow, seq)
251 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil {
252 return fmt.Errorf("publish pipeline.status: %w", err)
253 }
254 return nil
255}
256
257// appendLogLine records line as the next captured frame for the
258// workflow, allocating a slice on first use. Holding the mutex across
259// the append ensures a concurrent Logs() snapshot doesn't observe a
260// half-grown slice header.
261func (p *fakeProvider) appendLogLine(knot, pipelineRkey, workflow string, line LogLine) {
262 key := fakeLogKey(knot, pipelineRkey, workflow)
263 p.mu.Lock()
264 defer p.mu.Unlock()
265 p.logs[key] = append(p.logs[key], line)
266}
267
268// fakeLogKey is the canonical map key for the in-memory log buffer.
269// Centralised so reader and writer can't drift on separator choice.
270func fakeLogKey(knot, pipelineRkey, workflow string) string {
271 return knot + "\x00" + pipelineRkey + "\x00" + workflow
272}