diff options
| author | Pavel Zhukov <33721692+LeaveMyYard@users.noreply.github.com> | 2024-03-26 11:54:14 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-03-26 11:54:14 +0200 |
| commit | 4e450f3ecf7a7df161956fe1574392e9c406054c (patch) | |
| tree | 7cc088fbd8e03edb21fd041e8ee6e6d9ed7f9db8 /robusta_krr/core | |
| parent | faa69edc547d36bfa7ad4edf5f5ea79906cb90de (diff) | |
Improve limited permissions (cherry-picked from #220) (#238)
* Add --as option to impersonate a specific user
* Update test case
* Don't exit if the user lacks permissions to auto-discover Prometheus
* Add a comment
* Add support for HPA w/o cluster-level permissions
* feat: cli option for --as-group (#224)
* feat: cli option for --as-group
* add: as-group example
* Improve a message in case of API error
* Return the debug log with found items in the cluster
---------
Co-authored-by: Robusta Runner <aantny@gmail.com>
Co-authored-by: Rohan Katkar <rohan.katkar@dnv.com>
Co-authored-by: LeaveMyYard <zhukovpave2001@gmail.com>
Diffstat (limited to 'robusta_krr/core')
| -rw-r--r-- | robusta_krr/core/integrations/kubernetes/__init__.py | 147 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/prometheus/loader.py | 14 | ||||
| -rw-r--r-- | robusta_krr/core/models/config.py | 11 |
3 files changed, 95 insertions, 77 deletions
diff --git a/robusta_krr/core/integrations/kubernetes/__init__.py b/robusta_krr/core/integrations/kubernetes/__init__.py index c7976cf..335b47a 100644 --- a/robusta_krr/core/integrations/kubernetes/__init__.py +++ b/robusta_krr/core/integrations/kubernetes/__init__.py @@ -10,13 +10,11 @@ from kubernetes.client.models import ( V1Container, V1DaemonSet, V1Deployment, - V1HorizontalPodAutoscalerList, V1Job, V1Pod, V1PodList, V1StatefulSet, V2HorizontalPodAutoscaler, - V2HorizontalPodAutoscalerList, ) from robusta_krr.core.models.config import settings @@ -34,15 +32,11 @@ HPAKey = tuple[str, str, str] class ClusterLoader: - def __init__(self, cluster: Optional[str]): + def __init__(self, cluster: Optional[str]=None): self.cluster = cluster # This executor will be running requests to Kubernetes API self.executor = ThreadPoolExecutor(settings.max_workers) - self.api_client = ( - config.new_client_from_config(context=cluster, config_file=settings.kubeconfig) - if cluster is not None - else None - ) + self.api_client = settings.get_kube_client(cluster) self.apps = client.AppsV1Api(api_client=self.api_client) self.custom_objects = client.CustomObjectsApi(api_client=self.api_client) self.batch = client.BatchV1Api(api_client=self.api_client) @@ -162,7 +156,7 @@ class ClusterLoader: return ",".join(label_filters) - def __build_obj( + def __build_scannable_object( self, item: AnyKubernetesAPIObject, container: V1Container, kind: Optional[str] = None ) -> K8sObjectData: name = item.metadata.name @@ -186,7 +180,48 @@ class ClusterLoader: return True return resource in settings.resources - async def _list_workflows( + async def _list_namespaced_or_global_objects( + self, + kind: KindLiteral, + all_namespaces_request: Callable, + namespaced_request: Callable + ) -> AsyncIterable[Any]: + logger.debug(f"Listing {kind}s in {self.cluster}") + loop = asyncio.get_running_loop() + + if settings.namespaces == "*": + tasks = [ + loop.run_in_executor( + self.executor, + lambda: 
all_namespaces_request( + watch=False, + label_selector=settings.selector, + ), + ) + ] + else: + tasks = [ + loop.run_in_executor( + self.executor, + lambda ns=namespace: namespaced_request( + namespace=ns, + watch=False, + label_selector=settings.selector, + ), + ) + for namespace in settings.namespaces + ] + + total_items = 0 + for task in asyncio.as_completed(tasks): + ret_single = await task + total_items += len(ret_single.items) + for item in ret_single.items: + yield item + + logger.debug(f"Found {total_items} {kind} in {self.cluster}") + + async def _list_scannable_objects( self, kind: KindLiteral, all_namespaces_request: Callable, @@ -201,49 +236,17 @@ class ClusterLoader: if not self.__kind_available[kind]: return - logger.debug(f"Listing {kind}s in {self.cluster}") - loop = asyncio.get_running_loop() - try: - if settings.namespaces == "*": - tasks = [ - loop.run_in_executor( - self.executor, - lambda: all_namespaces_request( - watch=False, - label_selector=settings.selector, - ), - ) - ] - else: - tasks = [ - loop.run_in_executor( - self.executor, - lambda ns=namespace: namespaced_request( - namespace=ns, - watch=False, - label_selector=settings.selector, - ), - ) - for namespace in settings.namespaces - ] - - total_items = 0 - for task in asyncio.as_completed(tasks): - ret_single = await task - total_items += len(ret_single.items) - for item in ret_single.items: - if filter_workflows is not None and not filter_workflows(item): - continue - - containers = extract_containers(item) - if asyncio.iscoroutine(containers): - containers = await containers - - for container in containers: - yield self.__build_obj(item, container, kind) - - logger.debug(f"Found {total_items} {kind} in {self.cluster}") + async for item in self._list_namespaced_or_global_objects(kind, all_namespaces_request, namespaced_request): + if filter_workflows is not None and not filter_workflows(item): + continue + + containers = extract_containers(item) + if 
asyncio.iscoroutine(containers): + containers = await containers + + for container in containers: + yield self.__build_scannable_object(item, container, kind) except ApiException as e: if kind in ("Rollout", "DeploymentConfig") and e.status in [400, 401, 403, 404]: if self.__kind_available[kind]: @@ -254,7 +257,7 @@ class ClusterLoader: logger.error("Will skip this object type and continue.") def _list_deployments(self) -> AsyncIterable[K8sObjectData]: - return self._list_workflows( + return self._list_scannable_objects( kind="Deployment", all_namespaces_request=self.apps.list_deployment_for_all_namespaces, namespaced_request=self.apps.list_namespaced_deployment, @@ -287,7 +290,7 @@ class ClusterLoader: # NOTE: Using custom objects API returns dicts, but all other APIs return objects # We need to handle this difference using a small wrapper - return self._list_workflows( + return self._list_scannable_objects( kind="Rollout", all_namespaces_request=lambda **kwargs: ObjectLikeDict( self.custom_objects.list_cluster_custom_object( @@ -311,7 +314,7 @@ class ClusterLoader: def _list_deploymentconfig(self) -> AsyncIterable[K8sObjectData]: # NOTE: Using custom objects API returns dicts, but all other APIs return objects # We need to handle this difference using a small wrapper - return self._list_workflows( + return self._list_scannable_objects( kind="DeploymentConfig", all_namespaces_request=lambda **kwargs: ObjectLikeDict( self.custom_objects.list_cluster_custom_object( @@ -333,7 +336,7 @@ class ClusterLoader: ) def _list_all_statefulsets(self) -> AsyncIterable[K8sObjectData]: - return self._list_workflows( + return self._list_scannable_objects( kind="StatefulSet", all_namespaces_request=self.apps.list_stateful_set_for_all_namespaces, namespaced_request=self.apps.list_namespaced_stateful_set, @@ -341,7 +344,7 @@ class ClusterLoader: ) def _list_all_daemon_set(self) -> AsyncIterable[K8sObjectData]: - return self._list_workflows( + return self._list_scannable_objects( 
kind="DaemonSet", all_namespaces_request=self.apps.list_daemon_set_for_all_namespaces, namespaced_request=self.apps.list_namespaced_daemon_set, @@ -349,7 +352,7 @@ class ClusterLoader: ) def _list_all_jobs(self) -> AsyncIterable[K8sObjectData]: - return self._list_workflows( + return self._list_scannable_objects( kind="Job", all_namespaces_request=self.batch.list_job_for_all_namespaces, namespaced_request=self.batch.list_namespaced_job, @@ -361,7 +364,7 @@ class ClusterLoader: ) def _list_all_cronjobs(self) -> AsyncIterable[K8sObjectData]: - return self._list_workflows( + return self._list_scannable_objects( kind="CronJob", all_namespaces_request=self.batch.list_cron_job_for_all_namespaces, namespaced_request=self.batch.list_namespaced_cron_job, @@ -370,11 +373,14 @@ class ClusterLoader: async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]: loop = asyncio.get_running_loop() - - res: V1HorizontalPodAutoscalerList = await loop.run_in_executor( - self.executor, lambda: self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces(watch=False) + res = await loop.run_in_executor( + self.executor, + lambda: self._list_namespaced_or_global_objects( + kind="HPA-v1", + all_namespaces_request=self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces, + namespaced_request=self.autoscaling_v1.list_namespaced_horizontal_pod_autoscaler, + ), ) - return { ( hpa.metadata.namespace, @@ -388,17 +394,19 @@ class ClusterLoader: target_cpu_utilization_percentage=hpa.spec.target_cpu_utilization_percentage, target_memory_utilization_percentage=None, ) - for hpa in res.items + async for hpa in res } async def __list_hpa_v2(self) -> dict[HPAKey, HPAData]: loop = asyncio.get_running_loop() - - res: V2HorizontalPodAutoscalerList = await loop.run_in_executor( + res = await loop.run_in_executor( self.executor, - lambda: self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces(watch=False), + lambda: self._list_namespaced_or_global_objects( + kind="HPA-v2", + 
all_namespaces_request=self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces, + namespaced_request=self.autoscaling_v2.list_namespaced_horizontal_pod_autoscaler, + ), ) - def __get_metric(hpa: V2HorizontalPodAutoscaler, metric_name: str) -> Optional[float]: return next( ( @@ -408,7 +416,6 @@ class ClusterLoader: ), None, ) - return { ( hpa.metadata.namespace, @@ -422,7 +429,7 @@ class ClusterLoader: target_cpu_utilization_percentage=__get_metric(hpa, "cpu"), target_memory_utilization_percentage=__get_metric(hpa, "memory"), ) - for hpa in res.items + async for hpa in res } # TODO: What should we do in case of other metrics bound to the HPA? diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py index 5593d69..b9b600f 100644 --- a/robusta_krr/core/integrations/prometheus/loader.py +++ b/robusta_krr/core/integrations/prometheus/loader.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Optional from kubernetes import config as k8s_config from kubernetes.client.api_client import ApiClient +from kubernetes.client.exceptions import ApiException from prometrix import MetricsNotFound, PrometheusNotFound from robusta_krr.core.models.config import settings @@ -38,13 +39,7 @@ class PrometheusMetricsLoader: """ self.executor = ThreadPoolExecutor(settings.max_workers) - logger.info(f"Prometheus loader max workers: {settings.max_workers}") - - self.api_client = ( - k8s_config.new_client_from_config(config_file=settings.kubeconfig, context=cluster) - if cluster is not None - else None - ) + self.api_client = settings.get_kube_client(context=cluster) loader = self.get_metrics_service(api_client=self.api_client, cluster=cluster) if loader is None: raise PrometheusNotFound("No Prometheus or metrics service found") @@ -67,6 +62,11 @@ class PrometheusMetricsLoader: return loader except MetricsNotFound as e: logger.info(f"{service_name} not found: {e}") + except ApiException as e: + logger.warning( + 
f"Unable to automatically discover a {service_name} in the cluster ({e}). " + "Try specifying how to connect to Prometheus via cli options" + ) return None diff --git a/robusta_krr/core/models/config.py b/robusta_krr/core/models/config.py index 36787d4..ac3b15c 100644 --- a/robusta_krr/core/models/config.py +++ b/robusta_krr/core/models/config.py @@ -23,6 +23,8 @@ class Config(pd.BaseSettings): clusters: Union[list[str], Literal["*"], None] = None kubeconfig: Optional[str] = None + impersonate_user: Optional[str] = None + impersonate_group: Optional[str] = None namespaces: Union[list[str], Literal["*"]] = pd.Field("*") resources: Union[list[KindLiteral], Literal["*"]] = pd.Field("*") selector: Optional[str] = None @@ -141,6 +143,15 @@ class Config(pd.BaseSettings): else: self.inside_cluster = True + def get_kube_client(self, context: Optional[str] = None): + api_client = config.new_client_from_config(context=context, config_file=self.kubeconfig) + if self.impersonate_user is not None: + # trick copied from https://github.com/kubernetes-client/python/issues/362 + api_client.set_default_header("Impersonate-User", self.impersonate_user) + if self.impersonate_group is not None: + api_client.set_default_header("Impersonate-Group", self.impersonate_group) + return api_client + @staticmethod def set_config(config: Config) -> None: global _config |
