Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// fakeProvider is a stand-in Provider implementation: it doesn't talk 4// to any external CI. For each workflow in a triggered pipeline it 5// spawns a goroutine that emits a fixed-cadence stream of 6// sh.tangled.pipeline.status records — "running" every five seconds 7// for thirty seconds, then a final "success" — through the broker. 8// In parallel it appends synthetic LogLine records into an in-memory 9// buffer, so /logs queries against a fake run replay something that 10// matches the upstream spindle's wire format. 11// 12// The point is to exercise the entire trigger → broker → /events → 13// appview path end-to-end before any real CI integration exists. Once 14// the Buildkite provider lands, this one stays around as a reference 15// implementation and as the test double of choice when a test wants 16// "something that publishes plausible status updates" without the 17// timing weight of real builds. 18 19import ( 20 "context" 21 "encoding/json" 22 "fmt" 23 "log/slog" 24 "sync" 25 "time" 26 27 "tangled.org/core/api/tangled" 28) 29 30// Fake-job timing knobs. Pulled out as constants so it's obvious 31// where the numbers come from and they can be tuned independently of 32// the rest of the file. Total publishes per workflow = 33// (fakeJobDuration / fakeJobInterval) heartbeats + 1 final success. 34const ( 35 fakeJobDuration = 30 * time.Second 36 fakeJobInterval = 5 * time.Second 37) 38 39// fakeLogChanBuffer is the size of the buffered channel returned by 40// fakeProvider.Logs. Big enough that a snapshot of a completed fake 41// run (≈ 8 lines today) drops in without blocking the producer 42// goroutine even if the consumer is briefly slow; small enough that 43// a misbehaving consumer can't pin a runaway amount of memory. 44const fakeLogChanBuffer = 64 45 46// fakeProvider implements Provider against the in-process broker. 47// 48// logs holds the captured per-workflow LogLine slices, keyed by the 49// fakeLogKey of (knot, pipelineRkey, workflow). The mutex guards the 50// map *and* concurrent appends to any contained slice — runWorkflow 51// is the only writer for a given key, but Logs() may snapshot the 52// slice concurrently. 53type fakeProvider struct { 54 br *broker 55 log *slog.Logger 56 57 mu sync.Mutex 58 logs map[string][]LogLine 59} 60 61// Compile-time interface check — keeps the fake honest if Provider 62// ever gains additional methods. 63var _ Provider = (*fakeProvider)(nil) 64 65// newFakeProvider constructs a fakeProvider bound to br. The provided 66// logger is annotated with component=provider so its output stands 67// apart from the knot-consumer / jetstream noise. 68func newFakeProvider(br *broker, log *slog.Logger) *fakeProvider { 69 return &fakeProvider{ 70 br: br, 71 log: log.With("component", "provider", "kind", "fake"), 72 logs: make(map[string][]LogLine), 73 } 74} 75 76// Spawn satisfies Provider. It kicks off one runWorkflow goroutine per 77// workflow, returning immediately so the eventconsumer worker that 78// invoked us isn't blocked. Goroutines inherit ctx (app-scoped) and 79// will exit early on cancellation. 80func (p *fakeProvider) Spawn( 81 ctx context.Context, 82 knot string, 83 pipelineRkey string, 84 _ string, 85 _ *tangled.Pipeline_TriggerMetadata, 86 workflows []*tangled.Pipeline_Workflow, 87) { 88 if len(workflows) == 0 { 89 // Without a workflow name there's no valid pipeline.status 90 // record to publish. Log loudly enough that an operator 91 // staring at the logs can tell the trigger arrived but 92 // produced no fake activity. 93 p.log.Warn("pipeline has no workflows; skipping fake run", 94 "knot", knot, "rkey", pipelineRkey, 95 ) 96 return 97 } 98 for _, wf := range workflows { 99 // Defensive: the lexicon allows pointer entries and doesn't 100 // enforce non-empty names. We can't publish a status for an 101 // unnamed workflow, so just skip it. 102 if wf == nil || wf.Name == "" { 103 continue 104 } 105 go p.runWorkflow(ctx, knot, pipelineRkey, wf.Name) 106 } 107} 108 109// Logs satisfies Provider. It snapshots the in-memory LogLine slice 110// for (knot, pipelineRkey, workflow) and returns a buffered channel 111// that a goroutine drains the snapshot into. The snapshot is taken 112// under the mutex so subsequent appends by a still-running workflow 113// won't be reflected — clients that want live updates re-poll. We 114// don't follow live writes here; the fake's purpose is to exercise 115// the wire format end-to-end, not to reproduce hpcloud/tail. 116// 117// The producer goroutine honours ctx so that a disconnecting HTTP 118// client tears the channel send down promptly instead of waiting on 119// an unbuffered receiver. 120func (p *fakeProvider) Logs( 121 ctx context.Context, 122 knot string, 123 pipelineRkey string, 124 workflow string, 125) (<-chan LogLine, error) { 126 key := fakeLogKey(knot, pipelineRkey, workflow) 127 128 p.mu.Lock() 129 stored, ok := p.logs[key] 130 if !ok { 131 p.mu.Unlock() 132 return nil, ErrLogsNotFound 133 } 134 // Copy under the lock so subsequent appendLogLine writes can't 135 // race with our drain goroutine reading the slice header. 136 snapshot := make([]LogLine, len(stored)) 137 copy(snapshot, stored) 138 p.mu.Unlock() 139 140 out := make(chan LogLine, fakeLogChanBuffer) 141 go func() { 142 defer close(out) 143 for _, line := range snapshot { 144 select { 145 case <-ctx.Done(): 146 return 147 case out <- line: 148 } 149 } 150 }() 151 return out, nil 152} 153 154// runWorkflow emits a "running" status every fakeJobInterval until 155// fakeJobDuration elapses, then a final "success". Alongside each 156// status it appends a corresponding LogLine into the in-memory log 157// buffer so /logs returns something coherent. 158// 159// On ctx cancellation it returns without issuing the terminal publish 160// or the closing control frame — the broker's underlying store may 161// already be closing during shutdown. 162func (p *fakeProvider) runWorkflow(ctx context.Context, knot, pipelineRkey, workflow string) { 163 // pipelineURI is what the appview parses out of the status record 164 // to associate it back with the originating pipeline. Format 165 // mirrors the upstream spindle's emission: 166 // at://did:web:<knot>/<nsid>/<rkey>. The appview strips the 167 // did:web: prefix and treats the remainder as the knot identifier. 168 pipelineURI := fmt.Sprintf("at://did:web:%s/%s/%s", 169 knot, tangled.PipelineNSID, pipelineRkey, 170 ) 171 172 logger := p.log.With( 173 "knot", knot, 174 "pipeline_rkey", pipelineRkey, 175 "workflow", workflow, 176 ) 177 178 // Start control frame. The appview's log renderer keys timing on 179 // matching start/end control frames per step_id, so we always 180 // emit one even though the fake has only a single synthetic step. 181 p.appendLogLine(knot, pipelineRkey, workflow, LogLine{ 182 Kind: LogKindControl, 183 Time: time.Now(), 184 Content: workflow, 185 StepId: 0, 186 StepStatus: StepStatusStart, 187 }) 188 189 // Heartbeat phase. seq doubles as a per-workflow disambiguator 190 // in the synthesized status rkey so concurrent fakes (across 191 // workflows or pipelines) don't collide. 192 deadline := time.Now().Add(fakeJobDuration) 193 seq := 0 194 for time.Now().Before(deadline) { 195 p.appendLogLine(knot, pipelineRkey, workflow, LogLine{ 196 Kind: LogKindData, 197 Time: time.Now(), 198 Content: fmt.Sprintf("[fake] heartbeat %d at %s\n", 199 seq, time.Now().UTC().Format(time.RFC3339), 200 ), 201 StepId: 0, 202 Stream: "stdout", 203 }) 204 if err := p.publishStatus(ctx, pipelineURI, workflow, "running", seq); err != nil { 205 logger.Error("publish fake running status", "err", err, "seq", seq) 206 return 207 } 208 seq++ 209 select { 210 case <-ctx.Done(): 211 logger.Debug("fake job cancelled mid-run", "seq", seq) 212 return 213 case <-time.After(fakeJobInterval): 214 } 215 } 216 217 // End control + terminal status publish. "success" matches the 218 // upstream StatusKind enum (see tangled.org/core/spindle/models) 219 // — the appview routes status strings through that same enum. 220 p.appendLogLine(knot, pipelineRkey, workflow, LogLine{ 221 Kind: LogKindControl, 222 Time: time.Now(), 223 Content: workflow, 224 StepId: 0, 225 StepStatus: StepStatusEnd, 226 }) 227 if err := p.publishStatus(ctx, pipelineURI, workflow, "success", seq); err != nil { 228 logger.Error("publish fake success status", "err", err, "seq", seq) 229 return 230 } 231 logger.Info("fake job complete") 232} 233 234// publishStatus assembles a tangled.PipelineStatus record, marshals 235// it, and pushes it through the broker. The synthesized rkey just 236// needs to be unique within our event log; the appview keys its rows 237// on (spindle, rkey) so we mix in time + workflow + sequence to avoid 238// collisions across concurrent workflows on the same pipeline. 239func (p *fakeProvider) publishStatus(ctx context.Context, pipelineURI, workflow, status string, seq int) error { 240 rec := tangled.PipelineStatus{ 241 LexiconTypeID: tangled.PipelineStatusNSID, 242 Pipeline: pipelineURI, 243 Workflow: workflow, 244 Status: status, 245 CreatedAt: time.Now().UTC().Format(time.RFC3339), 246 } 247 body, err := json.Marshal(rec) 248 if err != nil { 249 return fmt.Errorf("marshal pipeline.status: %w", err) 250 } 251 rkey := fmt.Sprintf("fake-%d-%s-%d", time.Now().UnixNano(), workflow, seq) 252 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil { 253 return fmt.Errorf("publish pipeline.status: %w", err) 254 } 255 return nil 256} 257 258// appendLogLine records line as the next captured frame for the 259// workflow, allocating a slice on first use. Holding the mutex across 260// the append ensures a concurrent Logs() snapshot doesn't observe a 261// half-grown slice header. 262func (p *fakeProvider) appendLogLine(knot, pipelineRkey, workflow string, line LogLine) { 263 key := fakeLogKey(knot, pipelineRkey, workflow) 264 p.mu.Lock() 265 defer p.mu.Unlock() 266 p.logs[key] = append(p.logs[key], line) 267} 268 269// fakeLogKey is the canonical map key for the in-memory log buffer. 270// Centralised so reader and writer can't drift on separator choice. 271func fakeLogKey(knot, pipelineRkey, workflow string) string { 272 return knot + "\x00" + pipelineRkey + "\x00" + workflow 273}