Stitch any CI into Tangled
3

Configure Feed

Select the types of activity you want to include in your feed.

at main 10 kB View raw
1package k8s 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "strings" 8 "sync" 9 "time" 10) 11 12// FakeClient is a tiny in-memory implementation used by Tekton tests. It keeps 13// the test surface aligned with the real client package without reintroducing 14// external Kubernetes fakes. 15type FakeClient struct { 16 mu sync.Mutex 17 nextUID int64 18 nextWatchID int 19 objects map[objectKey]Object 20 watchers map[int]*watchRegistration 21 pods map[string]map[string]Pod 22 podLogs map[podLogKey]string 23} 24 25var _ Client = (*FakeClient)(nil) 26 27type objectKey struct { 28 gvr GVR 29 namespace string 30 name string 31} 32 33type podLogKey struct { 34 namespace string 35 podName string 36 container string 37} 38 39type watchRegistration struct { 40 gvr GVR 41 namespace string 42 opts ListOptions 43 watch *fakeWatch 44} 45 46// NewFakeClient returns an empty fake client. 47func NewFakeClient() *FakeClient { 48 return &FakeClient{ 49 objects: map[objectKey]Object{}, 50 watchers: map[int]*watchRegistration{}, 51 pods: map[string]map[string]Pod{}, 52 podLogs: map[podLogKey]string{}, 53 } 54} 55 56// SeedObject preloads an object without emitting watch events. 57func (c *FakeClient) SeedObject(gvr GVR, namespace string, obj Object) { 58 c.mu.Lock() 59 defer c.mu.Unlock() 60 stored := c.prepareObjectLocked(namespace, obj.DeepCopy()) 61 c.objects[objectKey{gvr: gvr, namespace: namespace, name: stored.GetName()}] = stored 62} 63 64// UpdateObject overwrites an existing object and emits a MODIFIED watch event. 65func (c *FakeClient) UpdateObject(gvr GVR, namespace string, obj Object) { 66 stored, watchers := c.storeObject(gvr, namespace, obj.DeepCopy()) 67 c.broadcast(watchers, WatchEvent{Type: "MODIFIED", Object: stored}) 68} 69 70// SeedPod preloads a pod for later ListPods or StreamPodLogs calls. 71func (c *FakeClient) SeedPod(namespace string, pod Pod) { 72 c.mu.Lock() 73 defer c.mu.Unlock() 74 if namespace == "" { 75 namespace = pod.Namespace 76 } 77 if namespace == "" { 78 namespace = "default" 79 } 80 pod.Namespace = namespace 81 if pod.CreationTimestamp.IsZero() { 82 pod.CreationTimestamp = time.Now().UTC() 83 } 84 if pod.Labels != nil { 85 pod.Labels = cloneLabels(pod.Labels) 86 } 87 if _, ok := c.pods[namespace]; !ok { 88 c.pods[namespace] = map[string]Pod{} 89 } 90 c.pods[namespace][pod.Name] = clonePod(pod) 91} 92 93// SetPodLog stores the exact bytes StreamPodLogs should later return. 94func (c *FakeClient) SetPodLog(namespace, podName, container, content string) { 95 c.mu.Lock() 96 defer c.mu.Unlock() 97 c.podLogs[podLogKey{ 98 namespace: namespace, 99 podName: podName, 100 container: container, 101 }] = content 102} 103 104func (c *FakeClient) CreateObject( 105 ctx context.Context, 106 gvr GVR, 107 namespace string, 108 obj Object, 109) (Object, error) { 110 select { 111 case <-ctx.Done(): 112 return nil, ctx.Err() 113 default: 114 } 115 116 stored, watchers, err := c.createObject(gvr, namespace, obj.DeepCopy()) 117 if err != nil { 118 return nil, err 119 } 120 c.broadcast(watchers, WatchEvent{Type: "ADDED", Object: stored}) 121 return stored.DeepCopy(), nil 122} 123 124func (c *FakeClient) GetObject( 125 ctx context.Context, 126 gvr GVR, 127 namespace string, 128 name string, 129) (Object, error) { 130 select { 131 case <-ctx.Done(): 132 return nil, ctx.Err() 133 default: 134 } 135 136 c.mu.Lock() 137 defer c.mu.Unlock() 138 obj, ok := c.objects[objectKey{gvr: gvr, namespace: namespace, name: name}] 139 if !ok { 140 return nil, fmt.Errorf("%w: %s/%s", ErrNotFound, namespace, name) 141 } 142 return obj.DeepCopy(), nil 143} 144 145func (c *FakeClient) ListObjects( 146 ctx context.Context, 147 gvr GVR, 148 namespace string, 149 opts ListOptions, 150) ([]Object, error) { 151 select { 152 case <-ctx.Done(): 153 return nil, ctx.Err() 154 default: 155 } 156 157 c.mu.Lock() 158 defer c.mu.Unlock() 159 out := make([]Object, 0) 160 for key, obj := range c.objects { 161 if key.gvr != gvr || key.namespace != namespace { 162 continue 163 } 164 if !matchesListOptions(obj, opts) { 165 continue 166 } 167 out = append(out, obj.DeepCopy()) 168 } 169 return out, nil 170} 171 172func (c *FakeClient) WatchObjects( 173 ctx context.Context, 174 gvr GVR, 175 namespace string, 176 opts ListOptions, 177) (WatchInterface, error) { 178 c.mu.Lock() 179 id := c.nextWatchID 180 c.nextWatchID++ 181 w := &fakeWatch{ch: make(chan WatchEvent, 64)} 182 w.stop = func() { 183 c.mu.Lock() 184 defer c.mu.Unlock() 185 delete(c.watchers, id) 186 } 187 c.watchers[id] = &watchRegistration{ 188 gvr: gvr, 189 namespace: namespace, 190 opts: opts, 191 watch: w, 192 } 193 c.mu.Unlock() 194 195 go func() { 196 <-ctx.Done() 197 w.Stop() 198 }() 199 return w, nil 200} 201 202func (c *FakeClient) ListPods( 203 ctx context.Context, 204 namespace string, 205 labelSelector string, 206) ([]Pod, error) { 207 select { 208 case <-ctx.Done(): 209 return nil, ctx.Err() 210 default: 211 } 212 213 c.mu.Lock() 214 defer c.mu.Unlock() 215 entries := c.pods[namespace] 216 out := make([]Pod, 0, len(entries)) 217 for _, pod := range entries { 218 if !matchesSelector(pod.Labels, labelSelector) { 219 continue 220 } 221 out = append(out, clonePod(pod)) 222 } 223 return out, nil 224} 225 226func (c *FakeClient) StreamPodLogs( 227 ctx context.Context, 228 namespace string, 229 podName string, 230 container string, 231 opts LogOptions, 232) (io.ReadCloser, error) { 233 select { 234 case <-ctx.Done(): 235 return nil, ctx.Err() 236 default: 237 } 238 239 c.mu.Lock() 240 raw, ok := c.podLogs[podLogKey{ 241 namespace: namespace, 242 podName: podName, 243 container: container, 244 }] 245 c.mu.Unlock() 246 if !ok { 247 return nil, fmt.Errorf( 248 "%w: pod log %s/%s[%s]", 249 ErrNotFound, namespace, podName, container, 250 ) 251 } 252 if !opts.Follow { 253 return io.NopCloser(strings.NewReader(raw)), nil 254 } 255 // In Follow mode, mirror the kube apiserver: yield the seeded bytes, 256 // then keep the stream open until ctx is cancelled. Tests that exercise 257 // the live-streaming path drive termination by cancelling the context; 258 // tests that want a one-shot snapshot should leave Follow=false. 259 return &fakeFollowReader{rest: raw, ctx: ctx}, nil 260} 261 262// fakeFollowReader returns the seeded log bytes, then blocks on ctx.Done() 263// before EOFing. This matches the apiserver behaviour for follow=true: 264// the connection only closes once the container terminates (or the caller 265// hangs up), so callers using bufio.Scanner do not see a premature EOF and 266// treat a still-running container as completed. 267type fakeFollowReader struct { 268 rest string 269 ctx context.Context 270 done bool 271} 272 273var _ io.ReadCloser = (*fakeFollowReader)(nil) 274 275func (r *fakeFollowReader) Read(p []byte) (int, error) { 276 if len(r.rest) > 0 { 277 n := copy(p, r.rest) 278 r.rest = r.rest[n:] 279 return n, nil 280 } 281 if r.done { 282 return 0, io.EOF 283 } 284 r.done = true 285 <-r.ctx.Done() 286 return 0, io.EOF 287} 288 289func (r *fakeFollowReader) Close() error { return nil } 290 291func (c *FakeClient) createObject( 292 gvr GVR, 293 namespace string, 294 obj Object, 295) (Object, []*fakeWatch, error) { 296 c.mu.Lock() 297 defer c.mu.Unlock() 298 stored := c.prepareObjectLocked(namespace, obj) 299 key := objectKey{gvr: gvr, namespace: namespace, name: stored.GetName()} 300 if _, ok := c.objects[key]; ok { 301 return nil, nil, fmt.Errorf("%w: %s/%s", ErrAlreadyExists, namespace, key.name) 302 } 303 c.objects[key] = stored 304 return stored.DeepCopy(), c.matchingWatchersLocked(gvr, namespace, stored), nil 305} 306 307func (c *FakeClient) storeObject( 308 gvr GVR, 309 namespace string, 310 obj Object, 311) (Object, []*fakeWatch) { 312 c.mu.Lock() 313 defer c.mu.Unlock() 314 stored := c.prepareObjectLocked(namespace, obj) 315 key := objectKey{gvr: gvr, namespace: namespace, name: stored.GetName()} 316 c.objects[key] = stored 317 return stored.DeepCopy(), c.matchingWatchersLocked(gvr, namespace, stored) 318} 319 320func (c *FakeClient) prepareObjectLocked(namespace string, obj Object) Object { 321 if obj.GetName() == "" { 322 panic("k8s.FakeClient: object missing metadata.name") 323 } 324 meta := ensureNestedMap(obj, "metadata") 325 if namespace != "" && meta["namespace"] == nil { 326 meta["namespace"] = namespace 327 } 328 if obj.GetUID() == "" { 329 c.nextUID++ 330 obj.SetUID(fmt.Sprintf("fake-%d", c.nextUID)) 331 } 332 if obj.GetCreationTimestamp().IsZero() { 333 obj.SetCreationTimestamp(time.Now().UTC()) 334 } 335 return obj 336} 337 338func (c *FakeClient) matchingWatchersLocked( 339 gvr GVR, 340 namespace string, 341 obj Object, 342) []*fakeWatch { 343 watchers := make([]*fakeWatch, 0) 344 for _, registration := range c.watchers { 345 if registration.gvr != gvr || registration.namespace != namespace { 346 continue 347 } 348 if !matchesListOptions(obj, registration.opts) { 349 continue 350 } 351 watchers = append(watchers, registration.watch) 352 } 353 return watchers 354} 355 356func (c *FakeClient) broadcast(watchers []*fakeWatch, event WatchEvent) { 357 for _, watcher := range watchers { 358 watcher.send(event) 359 } 360} 361 362type fakeWatch struct { 363 ch chan WatchEvent 364 stop func() 365 once sync.Once 366} 367 368var _ WatchInterface = (*fakeWatch)(nil) 369 370func (w *fakeWatch) ResultChan() <-chan WatchEvent { 371 return w.ch 372} 373 374func (w *fakeWatch) Stop() { 375 w.once.Do(func() { 376 if w.stop != nil { 377 w.stop() 378 } 379 close(w.ch) 380 }) 381} 382 383func (w *fakeWatch) send(event WatchEvent) { 384 defer func() { 385 _ = recover() 386 }() 387 w.ch <- WatchEvent{Type: event.Type, Object: event.Object.DeepCopy()} 388} 389 390func matchesListOptions(obj Object, opts ListOptions) bool { 391 if !matchesSelector(obj.GetLabels(), opts.LabelSelector) { 392 return false 393 } 394 if !matchesFieldSelector(obj, opts.FieldSelector) { 395 return false 396 } 397 return true 398} 399 400func matchesFieldSelector(obj Object, selector string) bool { 401 selector = strings.TrimSpace(selector) 402 if selector == "" { 403 return true 404 } 405 parts := strings.Split(selector, ",") 406 for _, part := range parts { 407 part = strings.TrimSpace(part) 408 if part == "" { 409 continue 410 } 411 key, value, ok := strings.Cut(part, "=") 412 if !ok { 413 return false 414 } 415 if strings.TrimSpace(key) != "metadata.name" { 416 return false 417 } 418 if obj.GetName() != strings.TrimSpace(value) { 419 return false 420 } 421 } 422 return true 423} 424 425func matchesSelector(labels map[string]string, selector string) bool { 426 selector = strings.TrimSpace(selector) 427 if selector == "" { 428 return true 429 } 430 parts := strings.Split(selector, ",") 431 for _, part := range parts { 432 part = strings.TrimSpace(part) 433 if part == "" { 434 continue 435 } 436 key, value, ok := strings.Cut(part, "=") 437 if !ok { 438 return false 439 } 440 if labels[strings.TrimSpace(key)] != strings.TrimSpace(value) { 441 return false 442 } 443 } 444 return true 445} 446 447func clonePod(pod Pod) Pod { 448 out := pod 449 out.Labels = cloneLabels(pod.Labels) 450 out.InitContainers = append([]Container(nil), pod.InitContainers...) 451 out.Containers = append([]Container(nil), pod.Containers...) 452 return out 453}