Stitch any CI into Tangled
3

Configure Feed

Select the types of activity you want to include in your feed.

at main 10 kB View raw
1package k8s 2 3import ( 4 "bytes" 5 "context" 6 "crypto/tls" 7 "crypto/x509" 8 "encoding/json" 9 "fmt" 10 "io" 11 "net/http" 12 "net/url" 13 "os" 14 "strings" 15 "sync" 16 "time" 17) 18 19const ( 20 serviceAccountDir = "/var/run/secrets/kubernetes.io/serviceaccount" 21 serviceTokenPath = serviceAccountDir + "/token" 22 serviceAccountCA = serviceAccountDir + "/ca.crt" 23 podsResource = "pods" 24 podLogsSubresource = "log" 25) 26 27// InClusterClient talks to the Kubernetes API server from inside the cluster 28// using the pod's mounted service-account credentials. 29type InClusterClient struct { 30 baseURL string 31 token string 32 http *http.Client 33} 34 35var _ Client = (*InClusterClient)(nil) 36 37// NewInClusterClient loads the conventional in-cluster service-account files and 38// builds an HTTPS client pinned to the cluster CA. 39func NewInClusterClient() (*InClusterClient, error) { 40 host := os.Getenv("KUBERNETES_SERVICE_HOST") 41 port := os.Getenv("KUBERNETES_SERVICE_PORT_HTTPS") 42 if port == "" { 43 port = os.Getenv("KUBERNETES_SERVICE_PORT") 44 } 45 if host == "" || port == "" { 46 return nil, fmt.Errorf( 47 "kubernetes service env is incomplete: host=%q port=%q", 48 host, port, 49 ) 50 } 51 52 token, err := os.ReadFile(serviceTokenPath) 53 if err != nil { 54 return nil, fmt.Errorf("read service-account token: %w", err) 55 } 56 caPEM, err := os.ReadFile(serviceAccountCA) 57 if err != nil { 58 return nil, fmt.Errorf("read service-account CA: %w", err) 59 } 60 61 roots := x509.NewCertPool() 62 if !roots.AppendCertsFromPEM(caPEM) { 63 return nil, fmt.Errorf("parse service-account CA: no certificates found") 64 } 65 66 transport := http.DefaultTransport.(*http.Transport).Clone() 67 transport.TLSClientConfig = &tls.Config{RootCAs: roots} 68 69 return newClient( 70 fmt.Sprintf("https://%s:%s", host, port), 71 strings.TrimSpace(string(token)), 72 &http.Client{Transport: transport}, 73 ), nil 74} 75 76func newClient(baseURL, token string, httpClient *http.Client) *InClusterClient { 77 return &InClusterClient{ 78 baseURL: strings.TrimRight(baseURL, "/"), 79 token: token, 80 http: httpClient, 81 } 82} 83 84func (c *InClusterClient) CreateObject( 85 ctx context.Context, 86 gvr GVR, 87 namespace string, 88 obj Object, 89) (Object, error) { 90 body, err := json.Marshal(obj) 91 if err != nil { 92 return nil, fmt.Errorf("marshal object: %w", err) 93 } 94 resp, err := c.do(ctx, http.MethodPost, resourcePath(gvr, namespace, ""), 95 nil, bytes.NewReader(body), "application/json") 96 if err != nil { 97 return nil, err 98 } 99 defer resp.Body.Close() 100 101 var out map[string]any 102 if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { 103 return nil, fmt.Errorf("decode created object: %w", err) 104 } 105 return Object(out), nil 106} 107 108func (c *InClusterClient) GetObject( 109 ctx context.Context, 110 gvr GVR, 111 namespace string, 112 name string, 113) (Object, error) { 114 resp, err := c.do(ctx, http.MethodGet, resourcePath(gvr, namespace, name), 115 nil, nil, "") 116 if err != nil { 117 return nil, err 118 } 119 defer resp.Body.Close() 120 121 var out map[string]any 122 if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { 123 return nil, fmt.Errorf("decode object: %w", err) 124 } 125 return Object(out), nil 126} 127 128func (c *InClusterClient) ListObjects( 129 ctx context.Context, 130 gvr GVR, 131 namespace string, 132 opts ListOptions, 133) ([]Object, error) { 134 resp, err := c.do(ctx, http.MethodGet, resourcePath(gvr, namespace, ""), 135 listQuery(opts), nil, "") 136 if err != nil { 137 return nil, err 138 } 139 defer resp.Body.Close() 140 141 var out struct { 142 Items []map[string]any `json:"items"` 143 } 144 if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { 145 return nil, fmt.Errorf("decode object list: %w", err) 146 } 147 items := make([]Object, 0, len(out.Items)) 148 for _, item := range out.Items { 149 items = append(items, Object(item)) 150 } 151 return items, nil 152} 153 154func (c *InClusterClient) WatchObjects( 155 ctx context.Context, 156 gvr GVR, 157 namespace string, 158 opts ListOptions, 159) (WatchInterface, error) { 160 watchCtx, cancel := context.WithCancel(ctx) 161 query := listQuery(opts) 162 query.Set("watch", "1") 163 164 resp, err := c.do(watchCtx, http.MethodGet, 165 resourcePath(gvr, namespace, ""), query, nil, "") 166 if err != nil { 167 cancel() 168 return nil, err 169 } 170 171 w := &httpWatch{ 172 ch: make(chan WatchEvent), 173 cancel: cancel, 174 } 175 go w.run(watchCtx, resp) 176 return w, nil 177} 178 179func (c *InClusterClient) ListPods( 180 ctx context.Context, 181 namespace string, 182 labelSelector string, 183) ([]Pod, error) { 184 query := url.Values{} 185 if labelSelector != "" { 186 query.Set("labelSelector", labelSelector) 187 } 188 resp, err := c.do(ctx, http.MethodGet, coreResourcePath(namespace, podsResource, "", ""), 189 query, nil, "") 190 if err != nil { 191 return nil, err 192 } 193 defer resp.Body.Close() 194 195 var out struct { 196 Items []struct { 197 Metadata struct { 198 Name string `json:"name"` 199 Namespace string `json:"namespace"` 200 UID string `json:"uid"` 201 Labels map[string]string `json:"labels"` 202 CreationTimestamp time.Time `json:"creationTimestamp"` 203 } `json:"metadata"` 204 Spec struct { 205 InitContainers []struct { 206 Name string `json:"name"` 207 } `json:"initContainers"` 208 Containers []struct { 209 Name string `json:"name"` 210 } `json:"containers"` 211 } `json:"spec"` 212 } `json:"items"` 213 } 214 if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { 215 return nil, fmt.Errorf("decode pod list: %w", err) 216 } 217 218 pods := make([]Pod, 0, len(out.Items)) 219 for _, item := range out.Items { 220 pod := Pod{ 221 Name: item.Metadata.Name, 222 Namespace: item.Metadata.Namespace, 223 UID: item.Metadata.UID, 224 Labels: cloneLabels(item.Metadata.Labels), 225 CreationTimestamp: item.Metadata.CreationTimestamp, 226 InitContainers: make([]Container, 0, len(item.Spec.InitContainers)), 227 Containers: make([]Container, 0, len(item.Spec.Containers)), 228 } 229 for _, container := range item.Spec.InitContainers { 230 pod.InitContainers = append(pod.InitContainers, Container{Name: container.Name}) 231 } 232 for _, container := range item.Spec.Containers { 233 pod.Containers = append(pod.Containers, Container{Name: container.Name}) 234 } 235 pods = append(pods, pod) 236 } 237 return pods, nil 238} 239 240func (c *InClusterClient) StreamPodLogs( 241 ctx context.Context, 242 namespace string, 243 podName string, 244 container string, 245 opts LogOptions, 246) (io.ReadCloser, error) { 247 query := url.Values{} 248 if container != "" { 249 query.Set("container", container) 250 } 251 // follow=true makes the API server hold the connection open and push 252 // new log bytes as the container writes them, only closing when the 253 // container terminates. Without it, the response is a snapshot of 254 // whatever has been written so far, which silently truncates live 255 // runs. 256 if opts.Follow { 257 query.Set("follow", "true") 258 } 259 resp, err := c.do(ctx, http.MethodGet, 260 coreResourcePath(namespace, podsResource, podName, podLogsSubresource), 261 query, nil, "") 262 if err != nil { 263 return nil, err 264 } 265 return resp.Body, nil 266} 267 268func (c *InClusterClient) do( 269 ctx context.Context, 270 method string, 271 path string, 272 query url.Values, 273 body io.Reader, 274 contentType string, 275) (*http.Response, error) { 276 uri := c.baseURL + path 277 if encoded := query.Encode(); encoded != "" { 278 uri += "?" + encoded 279 } 280 281 req, err := http.NewRequestWithContext(ctx, method, uri, body) 282 if err != nil { 283 return nil, fmt.Errorf("build request: %w", err) 284 } 285 req.Header.Set("Authorization", "Bearer "+c.token) 286 if contentType != "" { 287 req.Header.Set("Content-Type", contentType) 288 } 289 290 resp, err := c.http.Do(req) 291 if err != nil { 292 return nil, fmt.Errorf("request %s %s: %w", method, path, err) 293 } 294 if resp.StatusCode >= 200 && resp.StatusCode < 300 { 295 return resp, nil 296 } 297 defer resp.Body.Close() 298 raw, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) 299 bodyText := strings.TrimSpace(string(raw)) 300 switch resp.StatusCode { 301 case http.StatusNotFound: 302 return nil, fmt.Errorf("%w: %s %s: %s", ErrNotFound, method, path, bodyText) 303 case http.StatusConflict: 304 return nil, fmt.Errorf("%w: %s %s: %s", ErrAlreadyExists, method, path, bodyText) 305 default: 306 return nil, fmt.Errorf("k8s api %s %s: status %d: %s", 307 method, path, resp.StatusCode, bodyText, 308 ) 309 } 310} 311 312func listQuery(opts ListOptions) url.Values { 313 query := url.Values{} 314 if opts.LabelSelector != "" { 315 query.Set("labelSelector", opts.LabelSelector) 316 } 317 if opts.FieldSelector != "" { 318 query.Set("fieldSelector", opts.FieldSelector) 319 } 320 return query 321} 322 323func resourcePath(gvr GVR, namespace, name string) string { 324 prefix := "/apis/" + url.PathEscape(gvr.Group) + "/" + url.PathEscape(gvr.Version) 325 if gvr.Group == "" { 326 prefix = "/api/" + url.PathEscape(gvr.Version) 327 } 328 path := prefix 329 if namespace != "" { 330 path += "/namespaces/" + url.PathEscape(namespace) 331 } 332 path += "/" + url.PathEscape(gvr.Resource) 333 if name != "" { 334 path += "/" + url.PathEscape(name) 335 } 336 return path 337} 338 339func coreResourcePath(namespace, resource, name, subresource string) string { 340 path := "/api/v1" 341 if namespace != "" { 342 path += "/namespaces/" + url.PathEscape(namespace) 343 } 344 path += "/" + url.PathEscape(resource) 345 if name != "" { 346 path += "/" + url.PathEscape(name) 347 } 348 if subresource != "" { 349 path += "/" + url.PathEscape(subresource) 350 } 351 return path 352} 353 354func cloneLabels(in map[string]string) map[string]string { 355 if len(in) == 0 { 356 return nil 357 } 358 out := make(map[string]string, len(in)) 359 for key, value := range in { 360 out[key] = value 361 } 362 return out 363} 364 365type httpWatch struct { 366 ch chan WatchEvent 367 cancel context.CancelFunc 368 once sync.Once 369} 370 371var _ WatchInterface = (*httpWatch)(nil) 372 373func (w *httpWatch) ResultChan() <-chan WatchEvent { 374 return w.ch 375} 376 377func (w *httpWatch) Stop() { 378 w.close() 379} 380 381func (w *httpWatch) run(ctx context.Context, resp *http.Response) { 382 defer resp.Body.Close() 383 defer w.close() 384 385 dec := json.NewDecoder(resp.Body) 386 for { 387 var event struct { 388 Type string `json:"type"` 389 Object map[string]any `json:"object"` 390 } 391 if err := dec.Decode(&event); err != nil { 392 if err == io.EOF || ctx.Err() != nil { 393 return 394 } 395 return 396 } 397 398 select { 399 case w.ch <- WatchEvent{Type: event.Type, Object: Object(event.Object)}: 400 case <-ctx.Done(): 401 return 402 } 403 } 404} 405 406func (w *httpWatch) close() { 407 w.once.Do(func() { 408 w.cancel() 409 close(w.ch) 410 }) 411}