Stitch any CI into Tangled
1package k8s
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "strings"
8 "sync"
9 "time"
10)
11
12// FakeClient is a tiny in-memory implementation used by Tekton tests. It keeps
13// the test surface aligned with the real client package without reintroducing
14// external Kubernetes fakes.
15type FakeClient struct {
16 mu sync.Mutex
17 nextUID int64
18 nextWatchID int
19 objects map[objectKey]Object
20 watchers map[int]*watchRegistration
21 pods map[string]map[string]Pod
22 podLogs map[podLogKey]string
23}
24
25var _ Client = (*FakeClient)(nil)
26
27type objectKey struct {
28 gvr GVR
29 namespace string
30 name string
31}
32
33type podLogKey struct {
34 namespace string
35 podName string
36 container string
37}
38
39type watchRegistration struct {
40 gvr GVR
41 namespace string
42 opts ListOptions
43 watch *fakeWatch
44}
45
46// NewFakeClient returns an empty fake client.
47func NewFakeClient() *FakeClient {
48 return &FakeClient{
49 objects: map[objectKey]Object{},
50 watchers: map[int]*watchRegistration{},
51 pods: map[string]map[string]Pod{},
52 podLogs: map[podLogKey]string{},
53 }
54}
55
56// SeedObject preloads an object without emitting watch events.
57func (c *FakeClient) SeedObject(gvr GVR, namespace string, obj Object) {
58 c.mu.Lock()
59 defer c.mu.Unlock()
60 stored := c.prepareObjectLocked(namespace, obj.DeepCopy())
61 c.objects[objectKey{gvr: gvr, namespace: namespace, name: stored.GetName()}] = stored
62}
63
64// UpdateObject overwrites an existing object and emits a MODIFIED watch event.
65func (c *FakeClient) UpdateObject(gvr GVR, namespace string, obj Object) {
66 stored, watchers := c.storeObject(gvr, namespace, obj.DeepCopy())
67 c.broadcast(watchers, WatchEvent{Type: "MODIFIED", Object: stored})
68}
69
70// SeedPod preloads a pod for later ListPods or StreamPodLogs calls.
71func (c *FakeClient) SeedPod(namespace string, pod Pod) {
72 c.mu.Lock()
73 defer c.mu.Unlock()
74 if namespace == "" {
75 namespace = pod.Namespace
76 }
77 if namespace == "" {
78 namespace = "default"
79 }
80 pod.Namespace = namespace
81 if pod.CreationTimestamp.IsZero() {
82 pod.CreationTimestamp = time.Now().UTC()
83 }
84 if pod.Labels != nil {
85 pod.Labels = cloneLabels(pod.Labels)
86 }
87 if _, ok := c.pods[namespace]; !ok {
88 c.pods[namespace] = map[string]Pod{}
89 }
90 c.pods[namespace][pod.Name] = clonePod(pod)
91}
92
93// SetPodLog stores the exact bytes StreamPodLogs should later return.
94func (c *FakeClient) SetPodLog(namespace, podName, container, content string) {
95 c.mu.Lock()
96 defer c.mu.Unlock()
97 c.podLogs[podLogKey{
98 namespace: namespace,
99 podName: podName,
100 container: container,
101 }] = content
102}
103
104func (c *FakeClient) CreateObject(
105 ctx context.Context,
106 gvr GVR,
107 namespace string,
108 obj Object,
109) (Object, error) {
110 select {
111 case <-ctx.Done():
112 return nil, ctx.Err()
113 default:
114 }
115
116 stored, watchers, err := c.createObject(gvr, namespace, obj.DeepCopy())
117 if err != nil {
118 return nil, err
119 }
120 c.broadcast(watchers, WatchEvent{Type: "ADDED", Object: stored})
121 return stored.DeepCopy(), nil
122}
123
124func (c *FakeClient) GetObject(
125 ctx context.Context,
126 gvr GVR,
127 namespace string,
128 name string,
129) (Object, error) {
130 select {
131 case <-ctx.Done():
132 return nil, ctx.Err()
133 default:
134 }
135
136 c.mu.Lock()
137 defer c.mu.Unlock()
138 obj, ok := c.objects[objectKey{gvr: gvr, namespace: namespace, name: name}]
139 if !ok {
140 return nil, fmt.Errorf("%w: %s/%s", ErrNotFound, namespace, name)
141 }
142 return obj.DeepCopy(), nil
143}
144
145func (c *FakeClient) ListObjects(
146 ctx context.Context,
147 gvr GVR,
148 namespace string,
149 opts ListOptions,
150) ([]Object, error) {
151 select {
152 case <-ctx.Done():
153 return nil, ctx.Err()
154 default:
155 }
156
157 c.mu.Lock()
158 defer c.mu.Unlock()
159 out := make([]Object, 0)
160 for key, obj := range c.objects {
161 if key.gvr != gvr || key.namespace != namespace {
162 continue
163 }
164 if !matchesListOptions(obj, opts) {
165 continue
166 }
167 out = append(out, obj.DeepCopy())
168 }
169 return out, nil
170}
171
172func (c *FakeClient) WatchObjects(
173 ctx context.Context,
174 gvr GVR,
175 namespace string,
176 opts ListOptions,
177) (WatchInterface, error) {
178 c.mu.Lock()
179 id := c.nextWatchID
180 c.nextWatchID++
181 w := &fakeWatch{ch: make(chan WatchEvent, 64)}
182 w.stop = func() {
183 c.mu.Lock()
184 defer c.mu.Unlock()
185 delete(c.watchers, id)
186 }
187 c.watchers[id] = &watchRegistration{
188 gvr: gvr,
189 namespace: namespace,
190 opts: opts,
191 watch: w,
192 }
193 c.mu.Unlock()
194
195 go func() {
196 <-ctx.Done()
197 w.Stop()
198 }()
199 return w, nil
200}
201
202func (c *FakeClient) ListPods(
203 ctx context.Context,
204 namespace string,
205 labelSelector string,
206) ([]Pod, error) {
207 select {
208 case <-ctx.Done():
209 return nil, ctx.Err()
210 default:
211 }
212
213 c.mu.Lock()
214 defer c.mu.Unlock()
215 entries := c.pods[namespace]
216 out := make([]Pod, 0, len(entries))
217 for _, pod := range entries {
218 if !matchesSelector(pod.Labels, labelSelector) {
219 continue
220 }
221 out = append(out, clonePod(pod))
222 }
223 return out, nil
224}
225
226func (c *FakeClient) StreamPodLogs(
227 ctx context.Context,
228 namespace string,
229 podName string,
230 container string,
231 opts LogOptions,
232) (io.ReadCloser, error) {
233 select {
234 case <-ctx.Done():
235 return nil, ctx.Err()
236 default:
237 }
238
239 c.mu.Lock()
240 raw, ok := c.podLogs[podLogKey{
241 namespace: namespace,
242 podName: podName,
243 container: container,
244 }]
245 c.mu.Unlock()
246 if !ok {
247 return nil, fmt.Errorf(
248 "%w: pod log %s/%s[%s]",
249 ErrNotFound, namespace, podName, container,
250 )
251 }
252 if !opts.Follow {
253 return io.NopCloser(strings.NewReader(raw)), nil
254 }
255 // In Follow mode, mirror the kube apiserver: yield the seeded bytes,
256 // then keep the stream open until ctx is cancelled. Tests that exercise
257 // the live-streaming path drive termination by cancelling the context;
258 // tests that want a one-shot snapshot should leave Follow=false.
259 return &fakeFollowReader{rest: raw, ctx: ctx}, nil
260}
261
262// fakeFollowReader returns the seeded log bytes, then blocks on ctx.Done()
263// before EOFing. This matches the apiserver behaviour for follow=true:
264// the connection only closes once the container terminates (or the caller
265// hangs up), so callers using bufio.Scanner do not see a premature EOF and
266// treat a still-running container as completed.
267type fakeFollowReader struct {
268 rest string
269 ctx context.Context
270 done bool
271}
272
273var _ io.ReadCloser = (*fakeFollowReader)(nil)
274
275func (r *fakeFollowReader) Read(p []byte) (int, error) {
276 if len(r.rest) > 0 {
277 n := copy(p, r.rest)
278 r.rest = r.rest[n:]
279 return n, nil
280 }
281 if r.done {
282 return 0, io.EOF
283 }
284 r.done = true
285 <-r.ctx.Done()
286 return 0, io.EOF
287}
288
289func (r *fakeFollowReader) Close() error { return nil }
290
291func (c *FakeClient) createObject(
292 gvr GVR,
293 namespace string,
294 obj Object,
295) (Object, []*fakeWatch, error) {
296 c.mu.Lock()
297 defer c.mu.Unlock()
298 stored := c.prepareObjectLocked(namespace, obj)
299 key := objectKey{gvr: gvr, namespace: namespace, name: stored.GetName()}
300 if _, ok := c.objects[key]; ok {
301 return nil, nil, fmt.Errorf("%w: %s/%s", ErrAlreadyExists, namespace, key.name)
302 }
303 c.objects[key] = stored
304 return stored.DeepCopy(), c.matchingWatchersLocked(gvr, namespace, stored), nil
305}
306
307func (c *FakeClient) storeObject(
308 gvr GVR,
309 namespace string,
310 obj Object,
311) (Object, []*fakeWatch) {
312 c.mu.Lock()
313 defer c.mu.Unlock()
314 stored := c.prepareObjectLocked(namespace, obj)
315 key := objectKey{gvr: gvr, namespace: namespace, name: stored.GetName()}
316 c.objects[key] = stored
317 return stored.DeepCopy(), c.matchingWatchersLocked(gvr, namespace, stored)
318}
319
320func (c *FakeClient) prepareObjectLocked(namespace string, obj Object) Object {
321 if obj.GetName() == "" {
322 panic("k8s.FakeClient: object missing metadata.name")
323 }
324 meta := ensureNestedMap(obj, "metadata")
325 if namespace != "" && meta["namespace"] == nil {
326 meta["namespace"] = namespace
327 }
328 if obj.GetUID() == "" {
329 c.nextUID++
330 obj.SetUID(fmt.Sprintf("fake-%d", c.nextUID))
331 }
332 if obj.GetCreationTimestamp().IsZero() {
333 obj.SetCreationTimestamp(time.Now().UTC())
334 }
335 return obj
336}
337
338func (c *FakeClient) matchingWatchersLocked(
339 gvr GVR,
340 namespace string,
341 obj Object,
342) []*fakeWatch {
343 watchers := make([]*fakeWatch, 0)
344 for _, registration := range c.watchers {
345 if registration.gvr != gvr || registration.namespace != namespace {
346 continue
347 }
348 if !matchesListOptions(obj, registration.opts) {
349 continue
350 }
351 watchers = append(watchers, registration.watch)
352 }
353 return watchers
354}
355
356func (c *FakeClient) broadcast(watchers []*fakeWatch, event WatchEvent) {
357 for _, watcher := range watchers {
358 watcher.send(event)
359 }
360}
361
362type fakeWatch struct {
363 ch chan WatchEvent
364 stop func()
365 once sync.Once
366}
367
368var _ WatchInterface = (*fakeWatch)(nil)
369
370func (w *fakeWatch) ResultChan() <-chan WatchEvent {
371 return w.ch
372}
373
374func (w *fakeWatch) Stop() {
375 w.once.Do(func() {
376 if w.stop != nil {
377 w.stop()
378 }
379 close(w.ch)
380 })
381}
382
383func (w *fakeWatch) send(event WatchEvent) {
384 defer func() {
385 _ = recover()
386 }()
387 w.ch <- WatchEvent{Type: event.Type, Object: event.Object.DeepCopy()}
388}
389
390func matchesListOptions(obj Object, opts ListOptions) bool {
391 if !matchesSelector(obj.GetLabels(), opts.LabelSelector) {
392 return false
393 }
394 if !matchesFieldSelector(obj, opts.FieldSelector) {
395 return false
396 }
397 return true
398}
399
400func matchesFieldSelector(obj Object, selector string) bool {
401 selector = strings.TrimSpace(selector)
402 if selector == "" {
403 return true
404 }
405 parts := strings.Split(selector, ",")
406 for _, part := range parts {
407 part = strings.TrimSpace(part)
408 if part == "" {
409 continue
410 }
411 key, value, ok := strings.Cut(part, "=")
412 if !ok {
413 return false
414 }
415 if strings.TrimSpace(key) != "metadata.name" {
416 return false
417 }
418 if obj.GetName() != strings.TrimSpace(value) {
419 return false
420 }
421 }
422 return true
423}
424
425func matchesSelector(labels map[string]string, selector string) bool {
426 selector = strings.TrimSpace(selector)
427 if selector == "" {
428 return true
429 }
430 parts := strings.Split(selector, ",")
431 for _, part := range parts {
432 part = strings.TrimSpace(part)
433 if part == "" {
434 continue
435 }
436 key, value, ok := strings.Cut(part, "=")
437 if !ok {
438 return false
439 }
440 if labels[strings.TrimSpace(key)] != strings.TrimSpace(value) {
441 return false
442 }
443 }
444 return true
445}
446
447func clonePod(pod Pod) Pod {
448 out := pod
449 out.Labels = cloneLabels(pod.Labels)
450 out.InitContainers = append([]Container(nil), pod.InitContainers...)
451 out.Containers = append([]Container(nil), pod.Containers...)
452 return out
453}