Stitch any CI into Tangled
1package main
2
3// fakeProvider is a stand-in Provider implementation: it doesn't talk
4// to any external CI. For each workflow in a triggered pipeline it
5// spawns a goroutine that emits a fixed-cadence stream of
6// sh.tangled.pipeline.status records — "running" every five seconds
7// for thirty seconds, then a final "success" — through the broker.
8// In parallel it appends synthetic LogLine records into an in-memory
9// buffer, so /logs queries against a fake run replay something that
10// matches the upstream spindle's wire format.
11//
12// The point is to exercise the entire trigger → broker → /events →
13// appview path end-to-end before any real CI integration exists. Once
14// the Buildkite provider lands, this one stays around as a reference
15// implementation and as the test double of choice when a test wants
16// "something that publishes plausible status updates" without the
17// timing weight of real builds.
18
19import (
20 "context"
21 "encoding/json"
22 "fmt"
23 "log/slog"
24 "sync"
25 "time"
26
27 "tangled.org/core/api/tangled"
28)
29
// Timing knobs for a fake job, kept together so both numbers can be
// tuned in one place.
//
// fakeJobInterval is the pause between consecutive "running"
// heartbeats; fakeJobDuration is how long the heartbeat phase lasts
// before the terminal "success" is published. With the values below
// a workflow emits fakeJobDuration/fakeJobInterval = 6 heartbeats
// followed by one final status.
const (
	fakeJobInterval time.Duration = 5 * time.Second
	fakeJobDuration time.Duration = 30 * time.Second
)
38
// fakeLogChanBuffer is the capacity of the channel handed out by
// fakeProvider.Logs. A fake run that completes with the default
// timing knobs records 8 frames (start control, 6 heartbeats, end
// control), so a full snapshot fits in the buffer without the
// producer goroutine ever blocking, while the fixed cap bounds how
// much a slow consumer can keep queued.
const fakeLogChanBuffer = 64
45
46// fakeProvider implements Provider against the in-process broker.
47//
48// logs holds the captured per-workflow LogLine slices, keyed by the
49// fakeLogKey of (knot, pipelineRkey, workflow). The mutex guards the
50// map *and* concurrent appends to any contained slice — runWorkflow
51// is the only writer for a given key, but Logs() may snapshot the
52// slice concurrently.
53type fakeProvider struct {
54 br *broker
55 log *slog.Logger
56
57 mu sync.Mutex
58 logs map[string][]LogLine
59}
60
61// Compile-time interface check — keeps the fake honest if Provider
62// ever gains additional methods.
63var _ Provider = (*fakeProvider)(nil)
64
65// newFakeProvider constructs a fakeProvider bound to br. The provided
66// logger is annotated with component=provider so its output stands
67// apart from the knot-consumer / jetstream noise.
68func newFakeProvider(br *broker, log *slog.Logger) *fakeProvider {
69 return &fakeProvider{
70 br: br,
71 log: log.With("component", "provider", "kind", "fake"),
72 logs: make(map[string][]LogLine),
73 }
74}
75
76// Spawn satisfies Provider. It kicks off one runWorkflow goroutine per
77// workflow, returning immediately so the eventconsumer worker that
78// invoked us isn't blocked. Goroutines inherit ctx (app-scoped) and
79// will exit early on cancellation.
80func (p *fakeProvider) Spawn(
81 ctx context.Context,
82 knot string,
83 pipelineRkey string,
84 _ string,
85 _ *tangled.Pipeline_TriggerMetadata,
86 workflows []*tangled.Pipeline_Workflow,
87) {
88 if len(workflows) == 0 {
89 // Without a workflow name there's no valid pipeline.status
90 // record to publish. Log loudly enough that an operator
91 // staring at the logs can tell the trigger arrived but
92 // produced no fake activity.
93 p.log.Warn("pipeline has no workflows; skipping fake run",
94 "knot", knot, "rkey", pipelineRkey,
95 )
96 return
97 }
98 for _, wf := range workflows {
99 // Defensive: the lexicon allows pointer entries and doesn't
100 // enforce non-empty names. We can't publish a status for an
101 // unnamed workflow, so just skip it.
102 if wf == nil || wf.Name == "" {
103 continue
104 }
105 go p.runWorkflow(ctx, knot, pipelineRkey, wf.Name)
106 }
107}
108
109// Logs satisfies Provider. It snapshots the in-memory LogLine slice
110// for (knot, pipelineRkey, workflow) and returns a buffered channel
111// that a goroutine drains the snapshot into. The snapshot is taken
112// under the mutex so subsequent appends by a still-running workflow
113// won't be reflected — clients that want live updates re-poll. We
114// don't follow live writes here; the fake's purpose is to exercise
115// the wire format end-to-end, not to reproduce hpcloud/tail.
116//
117// The producer goroutine honours ctx so that a disconnecting HTTP
118// client tears the channel send down promptly instead of waiting on
119// an unbuffered receiver.
120func (p *fakeProvider) Logs(
121 ctx context.Context,
122 knot string,
123 pipelineRkey string,
124 workflow string,
125) (<-chan LogLine, error) {
126 key := fakeLogKey(knot, pipelineRkey, workflow)
127
128 p.mu.Lock()
129 stored, ok := p.logs[key]
130 if !ok {
131 p.mu.Unlock()
132 return nil, ErrLogsNotFound
133 }
134 // Copy under the lock so subsequent appendLogLine writes can't
135 // race with our drain goroutine reading the slice header.
136 snapshot := make([]LogLine, len(stored))
137 copy(snapshot, stored)
138 p.mu.Unlock()
139
140 out := make(chan LogLine, fakeLogChanBuffer)
141 go func() {
142 defer close(out)
143 for _, line := range snapshot {
144 select {
145 case <-ctx.Done():
146 return
147 case out <- line:
148 }
149 }
150 }()
151 return out, nil
152}
153
154// runWorkflow emits a "running" status every fakeJobInterval until
155// fakeJobDuration elapses, then a final "success". Alongside each
156// status it appends a corresponding LogLine into the in-memory log
157// buffer so /logs returns something coherent.
158//
159// On ctx cancellation it returns without issuing the terminal publish
160// or the closing control frame — the broker's underlying store may
161// already be closing during shutdown.
162func (p *fakeProvider) runWorkflow(ctx context.Context, knot, pipelineRkey, workflow string) {
163 // pipelineURI is what the appview parses out of the status record
164 // to associate it back with the originating pipeline. Format
165 // mirrors the upstream spindle's emission:
166 // at://did:web:<knot>/<nsid>/<rkey>. The appview strips the
167 // did:web: prefix and treats the remainder as the knot identifier.
168 pipelineURI := fmt.Sprintf("at://did:web:%s/%s/%s",
169 knot, tangled.PipelineNSID, pipelineRkey,
170 )
171
172 logger := p.log.With(
173 "knot", knot,
174 "pipeline_rkey", pipelineRkey,
175 "workflow", workflow,
176 )
177
178 // Start control frame. The appview's log renderer keys timing on
179 // matching start/end control frames per step_id, so we always
180 // emit one even though the fake has only a single synthetic step.
181 p.appendLogLine(knot, pipelineRkey, workflow, LogLine{
182 Kind: LogKindControl,
183 Time: time.Now(),
184 Content: workflow,
185 StepId: 0,
186 StepStatus: StepStatusStart,
187 })
188
189 // Heartbeat phase. seq doubles as a per-workflow disambiguator
190 // in the synthesized status rkey so concurrent fakes (across
191 // workflows or pipelines) don't collide.
192 deadline := time.Now().Add(fakeJobDuration)
193 seq := 0
194 for time.Now().Before(deadline) {
195 p.appendLogLine(knot, pipelineRkey, workflow, LogLine{
196 Kind: LogKindData,
197 Time: time.Now(),
198 Content: fmt.Sprintf("[fake] heartbeat %d at %s\n",
199 seq, time.Now().UTC().Format(time.RFC3339),
200 ),
201 StepId: 0,
202 Stream: "stdout",
203 })
204 if err := p.publishStatus(ctx, pipelineURI, workflow, "running", seq); err != nil {
205 logger.Error("publish fake running status", "err", err, "seq", seq)
206 return
207 }
208 seq++
209 select {
210 case <-ctx.Done():
211 logger.Debug("fake job cancelled mid-run", "seq", seq)
212 return
213 case <-time.After(fakeJobInterval):
214 }
215 }
216
217 // End control + terminal status publish. "success" matches the
218 // upstream StatusKind enum (see tangled.org/core/spindle/models)
219 // — the appview routes status strings through that same enum.
220 p.appendLogLine(knot, pipelineRkey, workflow, LogLine{
221 Kind: LogKindControl,
222 Time: time.Now(),
223 Content: workflow,
224 StepId: 0,
225 StepStatus: StepStatusEnd,
226 })
227 if err := p.publishStatus(ctx, pipelineURI, workflow, "success", seq); err != nil {
228 logger.Error("publish fake success status", "err", err, "seq", seq)
229 return
230 }
231 logger.Info("fake job complete")
232}
233
234// publishStatus assembles a tangled.PipelineStatus record, marshals
235// it, and pushes it through the broker. The synthesized rkey just
236// needs to be unique within our event log; the appview keys its rows
237// on (spindle, rkey) so we mix in time + workflow + sequence to avoid
238// collisions across concurrent workflows on the same pipeline.
239func (p *fakeProvider) publishStatus(ctx context.Context, pipelineURI, workflow, status string, seq int) error {
240 rec := tangled.PipelineStatus{
241 LexiconTypeID: tangled.PipelineStatusNSID,
242 Pipeline: pipelineURI,
243 Workflow: workflow,
244 Status: status,
245 CreatedAt: time.Now().UTC().Format(time.RFC3339),
246 }
247 body, err := json.Marshal(rec)
248 if err != nil {
249 return fmt.Errorf("marshal pipeline.status: %w", err)
250 }
251 rkey := fmt.Sprintf("fake-%d-%s-%d", time.Now().UnixNano(), workflow, seq)
252 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil {
253 return fmt.Errorf("publish pipeline.status: %w", err)
254 }
255 return nil
256}
257
258// appendLogLine records line as the next captured frame for the
259// workflow, allocating a slice on first use. Holding the mutex across
260// the append ensures a concurrent Logs() snapshot doesn't observe a
261// half-grown slice header.
262func (p *fakeProvider) appendLogLine(knot, pipelineRkey, workflow string, line LogLine) {
263 key := fakeLogKey(knot, pipelineRkey, workflow)
264 p.mu.Lock()
265 defer p.mu.Unlock()
266 p.logs[key] = append(p.logs[key], line)
267}
268
// fakeLogKey builds the map key under which a workflow's frames are
// stored: the knot, pipeline rkey and workflow name joined by NUL
// bytes. Readers and writers both call this helper, so they cannot
// disagree on the key layout.
func fakeLogKey(knot, pipelineRkey, workflow string) string {
	const sep = "\x00"
	return knot + sep + pipelineRkey + sep + workflow
}