Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3// fakeProvider is a stand-in Provider implementation: it doesn't talk 4// to any external CI. For each workflow in a triggered pipeline it 5// spawns a goroutine that emits a fixed-cadence stream of 6// sh.tangled.pipeline.status records — "running" every five seconds 7// for thirty seconds, then a final "success" — through the broker. 8// In parallel it appends synthetic LogLine records into an in-memory 9// buffer, so /logs queries against a fake run replay something that 10// matches the upstream spindle's wire format. 11// 12// The point is to exercise the entire trigger → broker → /events → 13// appview path end-to-end before any real CI integration exists. Once 14// the Buildkite provider lands, this one stays around as a reference 15// implementation and as the test double of choice when a test wants 16// "something that publishes plausible status updates" without the 17// timing weight of real builds. 18 19import ( 20 "context" 21 "encoding/json" 22 "fmt" 23 "log/slog" 24 "sync" 25 "time" 26 27 "tangled.org/core/api/tangled" 28) 29 30// Fake-job timing knobs. Pulled out as constants so it's obvious 31// where the numbers come from and they can be tuned independently of 32// the rest of the file. Total publishes per workflow = 33// (fakeJobDuration / fakeJobInterval) heartbeats + 1 final success. 34const ( 35 fakeJobDuration = 30 * time.Second 36 fakeJobInterval = 5 * time.Second 37) 38 39// fakeLogChanBuffer is the size of the buffered channel returned by 40// fakeProvider.Logs. Big enough that a snapshot of a completed fake 41// run (≈ 8 lines today) drops in without blocking the producer 42// goroutine even if the consumer is briefly slow; small enough that 43// a misbehaving consumer can't pin a runaway amount of memory. 44const fakeLogChanBuffer = 64 45 46// fakeProvider implements Provider against the in-process broker. 47// 48// logs holds the captured per-workflow LogLine slices, keyed by the 49// fakeLogKey of (knot, pipelineRkey, workflow). The mutex guards the 50// map *and* concurrent appends to any contained slice — runWorkflow 51// is the only writer for a given key, but Logs() may snapshot the 52// slice concurrently. 53type fakeProvider struct { 54 br *broker 55 log *slog.Logger 56 57 mu sync.Mutex 58 logs map[string][]LogLine 59} 60 61// Compile-time interface check — keeps the fake honest if Provider 62// ever gains additional methods. 63var _ Provider = (*fakeProvider)(nil) 64 65// newFakeProvider constructs a fakeProvider bound to br. The provided 66// logger is annotated with component=provider so its output stands 67// apart from the knot-consumer / jetstream noise. 68func newFakeProvider(br *broker, log *slog.Logger) *fakeProvider { 69 return &fakeProvider{ 70 br: br, 71 log: log.With("component", "provider", "kind", "fake"), 72 logs: make(map[string][]LogLine), 73 } 74} 75 76// Spawn satisfies Provider. It kicks off one runWorkflow goroutine per 77// workflow, returning immediately so the eventconsumer worker that 78// invoked us isn't blocked. Goroutines inherit ctx (app-scoped) and 79// will exit early on cancellation. 80func (p *fakeProvider) Spawn( 81 ctx context.Context, 82 knot string, 83 pipelineRkey string, 84 _ *tangled.Pipeline_TriggerMetadata, 85 workflows []*tangled.Pipeline_Workflow, 86) { 87 if len(workflows) == 0 { 88 // Without a workflow name there's no valid pipeline.status 89 // record to publish. Log loudly enough that an operator 90 // staring at the logs can tell the trigger arrived but 91 // produced no fake activity. 92 p.log.Warn("pipeline has no workflows; skipping fake run", 93 "knot", knot, "rkey", pipelineRkey, 94 ) 95 return 96 } 97 for _, wf := range workflows { 98 // Defensive: the lexicon allows pointer entries and doesn't 99 // enforce non-empty names. We can't publish a status for an 100 // unnamed workflow, so just skip it. 101 if wf == nil || wf.Name == "" { 102 continue 103 } 104 go p.runWorkflow(ctx, knot, pipelineRkey, wf.Name) 105 } 106} 107 108// Logs satisfies Provider. It snapshots the in-memory LogLine slice 109// for (knot, pipelineRkey, workflow) and returns a buffered channel 110// that a goroutine drains the snapshot into. The snapshot is taken 111// under the mutex so subsequent appends by a still-running workflow 112// won't be reflected — clients that want live updates re-poll. We 113// don't follow live writes here; the fake's purpose is to exercise 114// the wire format end-to-end, not to reproduce hpcloud/tail. 115// 116// The producer goroutine honours ctx so that a disconnecting HTTP 117// client tears the channel send down promptly instead of waiting on 118// an unbuffered receiver. 119func (p *fakeProvider) Logs( 120 ctx context.Context, 121 knot string, 122 pipelineRkey string, 123 workflow string, 124) (<-chan LogLine, error) { 125 key := fakeLogKey(knot, pipelineRkey, workflow) 126 127 p.mu.Lock() 128 stored, ok := p.logs[key] 129 if !ok { 130 p.mu.Unlock() 131 return nil, ErrLogsNotFound 132 } 133 // Copy under the lock so subsequent appendLogLine writes can't 134 // race with our drain goroutine reading the slice header. 135 snapshot := make([]LogLine, len(stored)) 136 copy(snapshot, stored) 137 p.mu.Unlock() 138 139 out := make(chan LogLine, fakeLogChanBuffer) 140 go func() { 141 defer close(out) 142 for _, line := range snapshot { 143 select { 144 case <-ctx.Done(): 145 return 146 case out <- line: 147 } 148 } 149 }() 150 return out, nil 151} 152 153// runWorkflow emits a "running" status every fakeJobInterval until 154// fakeJobDuration elapses, then a final "success". Alongside each 155// status it appends a corresponding LogLine into the in-memory log 156// buffer so /logs returns something coherent. 157// 158// On ctx cancellation it returns without issuing the terminal publish 159// or the closing control frame — the broker's underlying store may 160// already be closing during shutdown. 161func (p *fakeProvider) runWorkflow(ctx context.Context, knot, pipelineRkey, workflow string) { 162 // pipelineURI is what the appview parses out of the status record 163 // to associate it back with the originating pipeline. Format 164 // mirrors the upstream spindle's emission: 165 // at://did:web:<knot>/<nsid>/<rkey>. The appview strips the 166 // did:web: prefix and treats the remainder as the knot identifier. 167 pipelineURI := fmt.Sprintf("at://did:web:%s/%s/%s", 168 knot, tangled.PipelineNSID, pipelineRkey, 169 ) 170 171 logger := p.log.With( 172 "knot", knot, 173 "pipeline_rkey", pipelineRkey, 174 "workflow", workflow, 175 ) 176 177 // Start control frame. The appview's log renderer keys timing on 178 // matching start/end control frames per step_id, so we always 179 // emit one even though the fake has only a single synthetic step. 180 p.appendLogLine(knot, pipelineRkey, workflow, LogLine{ 181 Kind: LogKindControl, 182 Time: time.Now(), 183 Content: workflow, 184 StepId: 0, 185 StepStatus: StepStatusStart, 186 }) 187 188 // Heartbeat phase. seq doubles as a per-workflow disambiguator 189 // in the synthesized status rkey so concurrent fakes (across 190 // workflows or pipelines) don't collide. 191 deadline := time.Now().Add(fakeJobDuration) 192 seq := 0 193 for time.Now().Before(deadline) { 194 p.appendLogLine(knot, pipelineRkey, workflow, LogLine{ 195 Kind: LogKindData, 196 Time: time.Now(), 197 Content: fmt.Sprintf("[fake] heartbeat %d at %s\n", 198 seq, time.Now().UTC().Format(time.RFC3339), 199 ), 200 StepId: 0, 201 Stream: "stdout", 202 }) 203 if err := p.publishStatus(ctx, pipelineURI, workflow, "running", seq); err != nil { 204 logger.Error("publish fake running status", "err", err, "seq", seq) 205 return 206 } 207 seq++ 208 select { 209 case <-ctx.Done(): 210 logger.Debug("fake job cancelled mid-run", "seq", seq) 211 return 212 case <-time.After(fakeJobInterval): 213 } 214 } 215 216 // End control + terminal status publish. "success" matches the 217 // upstream StatusKind enum (see tangled.org/core/spindle/models) 218 // — the appview routes status strings through that same enum. 219 p.appendLogLine(knot, pipelineRkey, workflow, LogLine{ 220 Kind: LogKindControl, 221 Time: time.Now(), 222 Content: workflow, 223 StepId: 0, 224 StepStatus: StepStatusEnd, 225 }) 226 if err := p.publishStatus(ctx, pipelineURI, workflow, "success", seq); err != nil { 227 logger.Error("publish fake success status", "err", err, "seq", seq) 228 return 229 } 230 logger.Info("fake job complete") 231} 232 233// publishStatus assembles a tangled.PipelineStatus record, marshals 234// it, and pushes it through the broker. The synthesized rkey just 235// needs to be unique within our event log; the appview keys its rows 236// on (spindle, rkey) so we mix in time + workflow + sequence to avoid 237// collisions across concurrent workflows on the same pipeline. 238func (p *fakeProvider) publishStatus(ctx context.Context, pipelineURI, workflow, status string, seq int) error { 239 rec := tangled.PipelineStatus{ 240 LexiconTypeID: tangled.PipelineStatusNSID, 241 Pipeline: pipelineURI, 242 Workflow: workflow, 243 Status: status, 244 CreatedAt: time.Now().UTC().Format(time.RFC3339), 245 } 246 body, err := json.Marshal(rec) 247 if err != nil { 248 return fmt.Errorf("marshal pipeline.status: %w", err) 249 } 250 rkey := fmt.Sprintf("fake-%d-%s-%d", time.Now().UnixNano(), workflow, seq) 251 if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil { 252 return fmt.Errorf("publish pipeline.status: %w", err) 253 } 254 return nil 255} 256 257// appendLogLine records line as the next captured frame for the 258// workflow, allocating a slice on first use. Holding the mutex across 259// the append ensures a concurrent Logs() snapshot doesn't observe a 260// half-grown slice header. 261func (p *fakeProvider) appendLogLine(knot, pipelineRkey, workflow string, line LogLine) { 262 key := fakeLogKey(knot, pipelineRkey, workflow) 263 p.mu.Lock() 264 defer p.mu.Unlock() 265 p.logs[key] = append(p.logs[key], line) 266} 267 268// fakeLogKey is the canonical map key for the in-memory log buffer. 269// Centralised so reader and writer can't drift on separator choice. 270func fakeLogKey(knot, pipelineRkey, workflow string) string { 271 return knot + "\x00" + pipelineRkey + "\x00" + workflow 272}