Stitch any CI into Tangled
1package k8s
2
3import (
4 "bytes"
5 "context"
6 "crypto/tls"
7 "crypto/x509"
8 "encoding/json"
9 "fmt"
10 "io"
11 "net/http"
12 "net/url"
13 "os"
14 "strings"
15 "sync"
16 "time"
17)
18
19const (
20 serviceAccountDir = "/var/run/secrets/kubernetes.io/serviceaccount"
21 serviceTokenPath = serviceAccountDir + "/token"
22 serviceAccountCA = serviceAccountDir + "/ca.crt"
23 podsResource = "pods"
24 podLogsSubresource = "log"
25)
26
27// InClusterClient talks to the Kubernetes API server from inside the cluster
28// using the pod's mounted service-account credentials.
29type InClusterClient struct {
30 baseURL string
31 token string
32 http *http.Client
33}
34
35var _ Client = (*InClusterClient)(nil)
36
37// NewInClusterClient loads the conventional in-cluster service-account files and
38// builds an HTTPS client pinned to the cluster CA.
39func NewInClusterClient() (*InClusterClient, error) {
40 host := os.Getenv("KUBERNETES_SERVICE_HOST")
41 port := os.Getenv("KUBERNETES_SERVICE_PORT_HTTPS")
42 if port == "" {
43 port = os.Getenv("KUBERNETES_SERVICE_PORT")
44 }
45 if host == "" || port == "" {
46 return nil, fmt.Errorf(
47 "kubernetes service env is incomplete: host=%q port=%q",
48 host, port,
49 )
50 }
51
52 token, err := os.ReadFile(serviceTokenPath)
53 if err != nil {
54 return nil, fmt.Errorf("read service-account token: %w", err)
55 }
56 caPEM, err := os.ReadFile(serviceAccountCA)
57 if err != nil {
58 return nil, fmt.Errorf("read service-account CA: %w", err)
59 }
60
61 roots := x509.NewCertPool()
62 if !roots.AppendCertsFromPEM(caPEM) {
63 return nil, fmt.Errorf("parse service-account CA: no certificates found")
64 }
65
66 transport := http.DefaultTransport.(*http.Transport).Clone()
67 transport.TLSClientConfig = &tls.Config{RootCAs: roots}
68
69 return newClient(
70 fmt.Sprintf("https://%s:%s", host, port),
71 strings.TrimSpace(string(token)),
72 &http.Client{Transport: transport},
73 ), nil
74}
75
76func newClient(baseURL, token string, httpClient *http.Client) *InClusterClient {
77 return &InClusterClient{
78 baseURL: strings.TrimRight(baseURL, "/"),
79 token: token,
80 http: httpClient,
81 }
82}
83
84func (c *InClusterClient) CreateObject(
85 ctx context.Context,
86 gvr GVR,
87 namespace string,
88 obj Object,
89) (Object, error) {
90 body, err := json.Marshal(obj)
91 if err != nil {
92 return nil, fmt.Errorf("marshal object: %w", err)
93 }
94 resp, err := c.do(ctx, http.MethodPost, resourcePath(gvr, namespace, ""),
95 nil, bytes.NewReader(body), "application/json")
96 if err != nil {
97 return nil, err
98 }
99 defer resp.Body.Close()
100
101 var out map[string]any
102 if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
103 return nil, fmt.Errorf("decode created object: %w", err)
104 }
105 return Object(out), nil
106}
107
108func (c *InClusterClient) GetObject(
109 ctx context.Context,
110 gvr GVR,
111 namespace string,
112 name string,
113) (Object, error) {
114 resp, err := c.do(ctx, http.MethodGet, resourcePath(gvr, namespace, name),
115 nil, nil, "")
116 if err != nil {
117 return nil, err
118 }
119 defer resp.Body.Close()
120
121 var out map[string]any
122 if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
123 return nil, fmt.Errorf("decode object: %w", err)
124 }
125 return Object(out), nil
126}
127
128func (c *InClusterClient) ListObjects(
129 ctx context.Context,
130 gvr GVR,
131 namespace string,
132 opts ListOptions,
133) ([]Object, error) {
134 resp, err := c.do(ctx, http.MethodGet, resourcePath(gvr, namespace, ""),
135 listQuery(opts), nil, "")
136 if err != nil {
137 return nil, err
138 }
139 defer resp.Body.Close()
140
141 var out struct {
142 Items []map[string]any `json:"items"`
143 }
144 if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
145 return nil, fmt.Errorf("decode object list: %w", err)
146 }
147 items := make([]Object, 0, len(out.Items))
148 for _, item := range out.Items {
149 items = append(items, Object(item))
150 }
151 return items, nil
152}
153
154func (c *InClusterClient) WatchObjects(
155 ctx context.Context,
156 gvr GVR,
157 namespace string,
158 opts ListOptions,
159) (WatchInterface, error) {
160 watchCtx, cancel := context.WithCancel(ctx)
161 query := listQuery(opts)
162 query.Set("watch", "1")
163
164 resp, err := c.do(watchCtx, http.MethodGet,
165 resourcePath(gvr, namespace, ""), query, nil, "")
166 if err != nil {
167 cancel()
168 return nil, err
169 }
170
171 w := &httpWatch{
172 ch: make(chan WatchEvent),
173 cancel: cancel,
174 }
175 go w.run(watchCtx, resp)
176 return w, nil
177}
178
179func (c *InClusterClient) ListPods(
180 ctx context.Context,
181 namespace string,
182 labelSelector string,
183) ([]Pod, error) {
184 query := url.Values{}
185 if labelSelector != "" {
186 query.Set("labelSelector", labelSelector)
187 }
188 resp, err := c.do(ctx, http.MethodGet, coreResourcePath(namespace, podsResource, "", ""),
189 query, nil, "")
190 if err != nil {
191 return nil, err
192 }
193 defer resp.Body.Close()
194
195 var out struct {
196 Items []struct {
197 Metadata struct {
198 Name string `json:"name"`
199 Namespace string `json:"namespace"`
200 UID string `json:"uid"`
201 Labels map[string]string `json:"labels"`
202 CreationTimestamp time.Time `json:"creationTimestamp"`
203 } `json:"metadata"`
204 Spec struct {
205 InitContainers []struct {
206 Name string `json:"name"`
207 } `json:"initContainers"`
208 Containers []struct {
209 Name string `json:"name"`
210 } `json:"containers"`
211 } `json:"spec"`
212 } `json:"items"`
213 }
214 if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
215 return nil, fmt.Errorf("decode pod list: %w", err)
216 }
217
218 pods := make([]Pod, 0, len(out.Items))
219 for _, item := range out.Items {
220 pod := Pod{
221 Name: item.Metadata.Name,
222 Namespace: item.Metadata.Namespace,
223 UID: item.Metadata.UID,
224 Labels: cloneLabels(item.Metadata.Labels),
225 CreationTimestamp: item.Metadata.CreationTimestamp,
226 InitContainers: make([]Container, 0, len(item.Spec.InitContainers)),
227 Containers: make([]Container, 0, len(item.Spec.Containers)),
228 }
229 for _, container := range item.Spec.InitContainers {
230 pod.InitContainers = append(pod.InitContainers, Container{Name: container.Name})
231 }
232 for _, container := range item.Spec.Containers {
233 pod.Containers = append(pod.Containers, Container{Name: container.Name})
234 }
235 pods = append(pods, pod)
236 }
237 return pods, nil
238}
239
240func (c *InClusterClient) StreamPodLogs(
241 ctx context.Context,
242 namespace string,
243 podName string,
244 container string,
245 opts LogOptions,
246) (io.ReadCloser, error) {
247 query := url.Values{}
248 if container != "" {
249 query.Set("container", container)
250 }
251 // follow=true makes the API server hold the connection open and push
252 // new log bytes as the container writes them, only closing when the
253 // container terminates. Without it, the response is a snapshot of
254 // whatever has been written so far, which silently truncates live
255 // runs.
256 if opts.Follow {
257 query.Set("follow", "true")
258 }
259 resp, err := c.do(ctx, http.MethodGet,
260 coreResourcePath(namespace, podsResource, podName, podLogsSubresource),
261 query, nil, "")
262 if err != nil {
263 return nil, err
264 }
265 return resp.Body, nil
266}
267
268func (c *InClusterClient) do(
269 ctx context.Context,
270 method string,
271 path string,
272 query url.Values,
273 body io.Reader,
274 contentType string,
275) (*http.Response, error) {
276 uri := c.baseURL + path
277 if encoded := query.Encode(); encoded != "" {
278 uri += "?" + encoded
279 }
280
281 req, err := http.NewRequestWithContext(ctx, method, uri, body)
282 if err != nil {
283 return nil, fmt.Errorf("build request: %w", err)
284 }
285 req.Header.Set("Authorization", "Bearer "+c.token)
286 if contentType != "" {
287 req.Header.Set("Content-Type", contentType)
288 }
289
290 resp, err := c.http.Do(req)
291 if err != nil {
292 return nil, fmt.Errorf("request %s %s: %w", method, path, err)
293 }
294 if resp.StatusCode >= 200 && resp.StatusCode < 300 {
295 return resp, nil
296 }
297 defer resp.Body.Close()
298 raw, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
299 bodyText := strings.TrimSpace(string(raw))
300 switch resp.StatusCode {
301 case http.StatusNotFound:
302 return nil, fmt.Errorf("%w: %s %s: %s", ErrNotFound, method, path, bodyText)
303 case http.StatusConflict:
304 return nil, fmt.Errorf("%w: %s %s: %s", ErrAlreadyExists, method, path, bodyText)
305 default:
306 return nil, fmt.Errorf("k8s api %s %s: status %d: %s",
307 method, path, resp.StatusCode, bodyText,
308 )
309 }
310}
311
312func listQuery(opts ListOptions) url.Values {
313 query := url.Values{}
314 if opts.LabelSelector != "" {
315 query.Set("labelSelector", opts.LabelSelector)
316 }
317 if opts.FieldSelector != "" {
318 query.Set("fieldSelector", opts.FieldSelector)
319 }
320 return query
321}
322
323func resourcePath(gvr GVR, namespace, name string) string {
324 prefix := "/apis/" + url.PathEscape(gvr.Group) + "/" + url.PathEscape(gvr.Version)
325 if gvr.Group == "" {
326 prefix = "/api/" + url.PathEscape(gvr.Version)
327 }
328 path := prefix
329 if namespace != "" {
330 path += "/namespaces/" + url.PathEscape(namespace)
331 }
332 path += "/" + url.PathEscape(gvr.Resource)
333 if name != "" {
334 path += "/" + url.PathEscape(name)
335 }
336 return path
337}
338
339func coreResourcePath(namespace, resource, name, subresource string) string {
340 path := "/api/v1"
341 if namespace != "" {
342 path += "/namespaces/" + url.PathEscape(namespace)
343 }
344 path += "/" + url.PathEscape(resource)
345 if name != "" {
346 path += "/" + url.PathEscape(name)
347 }
348 if subresource != "" {
349 path += "/" + url.PathEscape(subresource)
350 }
351 return path
352}
353
354func cloneLabels(in map[string]string) map[string]string {
355 if len(in) == 0 {
356 return nil
357 }
358 out := make(map[string]string, len(in))
359 for key, value := range in {
360 out[key] = value
361 }
362 return out
363}
364
365type httpWatch struct {
366 ch chan WatchEvent
367 cancel context.CancelFunc
368 once sync.Once
369}
370
371var _ WatchInterface = (*httpWatch)(nil)
372
373func (w *httpWatch) ResultChan() <-chan WatchEvent {
374 return w.ch
375}
376
377func (w *httpWatch) Stop() {
378 w.close()
379}
380
381func (w *httpWatch) run(ctx context.Context, resp *http.Response) {
382 defer resp.Body.Close()
383 defer w.close()
384
385 dec := json.NewDecoder(resp.Body)
386 for {
387 var event struct {
388 Type string `json:"type"`
389 Object map[string]any `json:"object"`
390 }
391 if err := dec.Decode(&event); err != nil {
392 if err == io.EOF || ctx.Err() != nil {
393 return
394 }
395 return
396 }
397
398 select {
399 case w.ch <- WatchEvent{Type: event.Type, Object: Object(event.Object)}:
400 case <-ctx.Done():
401 return
402 }
403 }
404}
405
406func (w *httpWatch) close() {
407 w.once.Do(func() {
408 w.cancel()
409 close(w.ch)
410 })
411}