diff options
| author | Natan Yellin <aantn@users.noreply.github.com> | 2024-03-27 05:58:42 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-03-27 05:58:42 +0200 |
| commit | 5546aef84aee5c2e290735f9cc658b8461eef4a1 (patch) | |
| tree | 5d00fb022c6d54985648d82918efceac02978ef8 /robusta_krr | |
| parent | 9fc5752297567790563ce1cd6e8f0212756a9373 (diff) | |
| parent | ba140253ac140acba61a0caf55791bed179ef297 (diff) | |
Merge branch 'main' into show_cluster_name_flag
Diffstat (limited to 'robusta_krr')
8 files changed, 162 insertions, 107 deletions
diff --git a/robusta_krr/core/integrations/kubernetes/__init__.py b/robusta_krr/core/integrations/kubernetes/__init__.py index c7976cf..335b47a 100644 --- a/robusta_krr/core/integrations/kubernetes/__init__.py +++ b/robusta_krr/core/integrations/kubernetes/__init__.py @@ -10,13 +10,11 @@ from kubernetes.client.models import ( V1Container, V1DaemonSet, V1Deployment, - V1HorizontalPodAutoscalerList, V1Job, V1Pod, V1PodList, V1StatefulSet, V2HorizontalPodAutoscaler, - V2HorizontalPodAutoscalerList, ) from robusta_krr.core.models.config import settings @@ -34,15 +32,11 @@ HPAKey = tuple[str, str, str] class ClusterLoader: - def __init__(self, cluster: Optional[str]): + def __init__(self, cluster: Optional[str]=None): self.cluster = cluster # This executor will be running requests to Kubernetes API self.executor = ThreadPoolExecutor(settings.max_workers) - self.api_client = ( - config.new_client_from_config(context=cluster, config_file=settings.kubeconfig) - if cluster is not None - else None - ) + self.api_client = settings.get_kube_client(cluster) self.apps = client.AppsV1Api(api_client=self.api_client) self.custom_objects = client.CustomObjectsApi(api_client=self.api_client) self.batch = client.BatchV1Api(api_client=self.api_client) @@ -162,7 +156,7 @@ class ClusterLoader: return ",".join(label_filters) - def __build_obj( + def __build_scannable_object( self, item: AnyKubernetesAPIObject, container: V1Container, kind: Optional[str] = None ) -> K8sObjectData: name = item.metadata.name @@ -186,7 +180,48 @@ class ClusterLoader: return True return resource in settings.resources - async def _list_workflows( + async def _list_namespaced_or_global_objects( + self, + kind: KindLiteral, + all_namespaces_request: Callable, + namespaced_request: Callable + ) -> AsyncIterable[Any]: + logger.debug(f"Listing {kind}s in {self.cluster}") + loop = asyncio.get_running_loop() + + if settings.namespaces == "*": + tasks = [ + loop.run_in_executor( + self.executor, + lambda: all_namespaces_request( + watch=False, + label_selector=settings.selector, + ), + ) + ] + else: + tasks = [ + loop.run_in_executor( + self.executor, + lambda ns=namespace: namespaced_request( + namespace=ns, + watch=False, + label_selector=settings.selector, + ), + ) + for namespace in settings.namespaces + ] + + total_items = 0 + for task in asyncio.as_completed(tasks): + ret_single = await task + total_items += len(ret_single.items) + for item in ret_single.items: + yield item + + logger.debug(f"Found {total_items} {kind} in {self.cluster}") + + async def _list_scannable_objects( self, kind: KindLiteral, all_namespaces_request: Callable, @@ -201,49 +236,17 @@ class ClusterLoader: if not self.__kind_available[kind]: return - logger.debug(f"Listing {kind}s in {self.cluster}") - loop = asyncio.get_running_loop() - try: - if settings.namespaces == "*": - tasks = [ - loop.run_in_executor( - self.executor, - lambda: all_namespaces_request( - watch=False, - label_selector=settings.selector, - ), - ) - ] - else: - tasks = [ - loop.run_in_executor( - self.executor, - lambda ns=namespace: namespaced_request( - namespace=ns, - watch=False, - label_selector=settings.selector, - ), - ) - for namespace in settings.namespaces - ] - - total_items = 0 - for task in asyncio.as_completed(tasks): - ret_single = await task - total_items += len(ret_single.items) - for item in ret_single.items: - if filter_workflows is not None and not filter_workflows(item): - continue - - containers = extract_containers(item) - if asyncio.iscoroutine(containers): - containers = await containers - - for container in containers: - yield self.__build_obj(item, container, kind) - - logger.debug(f"Found {total_items} {kind} in {self.cluster}") + async for item in self._list_namespaced_or_global_objects(kind, all_namespaces_request, namespaced_request): + if filter_workflows is not None and not filter_workflows(item): + continue + + containers = extract_containers(item) + if asyncio.iscoroutine(containers): + containers = await containers + + for container in containers: + yield self.__build_scannable_object(item, container, kind) except ApiException as e: if kind in ("Rollout", "DeploymentConfig") and e.status in [400, 401, 403, 404]: if self.__kind_available[kind]: @@ -254,7 +257,7 @@ class ClusterLoader: logger.error("Will skip this object type and continue.") def _list_deployments(self) -> AsyncIterable[K8sObjectData]: - return self._list_workflows( + return self._list_scannable_objects( kind="Deployment", all_namespaces_request=self.apps.list_deployment_for_all_namespaces, namespaced_request=self.apps.list_namespaced_deployment, @@ -287,7 +290,7 @@ class ClusterLoader: # NOTE: Using custom objects API returns dicts, but all other APIs return objects # We need to handle this difference using a small wrapper - return self._list_workflows( + return self._list_scannable_objects( kind="Rollout", all_namespaces_request=lambda **kwargs: ObjectLikeDict( self.custom_objects.list_cluster_custom_object( @@ -311,7 +314,7 @@ class ClusterLoader: def _list_deploymentconfig(self) -> AsyncIterable[K8sObjectData]: # NOTE: Using custom objects API returns dicts, but all other APIs return objects # We need to handle this difference using a small wrapper - return self._list_workflows( + return self._list_scannable_objects( kind="DeploymentConfig", all_namespaces_request=lambda **kwargs: ObjectLikeDict( self.custom_objects.list_cluster_custom_object( @@ -333,7 +336,7 @@ class ClusterLoader: ) def _list_all_statefulsets(self) -> AsyncIterable[K8sObjectData]: - return self._list_workflows( + return self._list_scannable_objects( kind="StatefulSet", all_namespaces_request=self.apps.list_stateful_set_for_all_namespaces, namespaced_request=self.apps.list_namespaced_stateful_set, @@ -341,7 +344,7 @@ class ClusterLoader: ) def _list_all_daemon_set(self) -> AsyncIterable[K8sObjectData]: - return self._list_workflows( + return self._list_scannable_objects( kind="DaemonSet", all_namespaces_request=self.apps.list_daemon_set_for_all_namespaces, namespaced_request=self.apps.list_namespaced_daemon_set, @@ -349,7 +352,7 @@ class ClusterLoader: ) def _list_all_jobs(self) -> AsyncIterable[K8sObjectData]: - return self._list_workflows( + return self._list_scannable_objects( kind="Job", all_namespaces_request=self.batch.list_job_for_all_namespaces, namespaced_request=self.batch.list_namespaced_job, @@ -361,7 +364,7 @@ class ClusterLoader: ) def _list_all_cronjobs(self) -> AsyncIterable[K8sObjectData]: - return self._list_workflows( + return self._list_scannable_objects( kind="CronJob", all_namespaces_request=self.batch.list_cron_job_for_all_namespaces, namespaced_request=self.batch.list_namespaced_cron_job, @@ -370,11 +373,14 @@ class ClusterLoader: async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]: loop = asyncio.get_running_loop() - - res: V1HorizontalPodAutoscalerList = await loop.run_in_executor( - self.executor, lambda: self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces(watch=False) + res = await loop.run_in_executor( + self.executor, + lambda: self._list_namespaced_or_global_objects( + kind="HPA-v1", + all_namespaces_request=self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces, + namespaced_request=self.autoscaling_v1.list_namespaced_horizontal_pod_autoscaler, + ), ) - return { ( hpa.metadata.namespace, @@ -388,17 +394,19 @@ class ClusterLoader: target_cpu_utilization_percentage=hpa.spec.target_cpu_utilization_percentage, target_memory_utilization_percentage=None, ) - for hpa in res.items + async for hpa in res } async def __list_hpa_v2(self) -> dict[HPAKey, HPAData]: loop = asyncio.get_running_loop() - - res: V2HorizontalPodAutoscalerList = await loop.run_in_executor( + res = await loop.run_in_executor( self.executor, - lambda: self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces(watch=False), + lambda: self._list_namespaced_or_global_objects( + kind="HPA-v2", + all_namespaces_request=self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces, + namespaced_request=self.autoscaling_v2.list_namespaced_horizontal_pod_autoscaler, + ), ) - def __get_metric(hpa: V2HorizontalPodAutoscaler, metric_name: str) -> Optional[float]: return next( ( @@ -408,7 +416,6 @@ class ClusterLoader: ), None, ) - return { ( hpa.metadata.namespace, @@ -422,7 +429,7 @@ class ClusterLoader: target_cpu_utilization_percentage=__get_metric(hpa, "cpu"), target_memory_utilization_percentage=__get_metric(hpa, "memory"), ) - for hpa in res.items + async for hpa in res } # TODO: What should we do in case of other metrics bound to the HPA? diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py index df5af96..82c7d74 100644 --- a/robusta_krr/core/integrations/prometheus/loader.py +++ b/robusta_krr/core/integrations/prometheus/loader.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Optional from kubernetes import config as k8s_config from kubernetes.client.api_client import ApiClient +from kubernetes.client.exceptions import ApiException from prometrix import MetricsNotFound, PrometheusNotFound from robusta_krr.core.models.config import settings @@ -21,12 +22,6 @@ if TYPE_CHECKING: logger = logging.getLogger("krr") -METRICS_SERVICES = { - "Prometheus": PrometheusMetricsService, - "Victoria Metrics": VictoriaMetricsService, - "Thanos": ThanosMetricsService, -} - class PrometheusMetricsLoader: def __init__(self, *, cluster: Optional[str] = None) -> None: @@ -38,34 +33,47 @@ class PrometheusMetricsLoader: """ self.executor = ThreadPoolExecutor(settings.max_workers) - - self.api_client = ( - k8s_config.new_client_from_config(config_file=settings.kubeconfig, context=cluster) - if cluster is not None - else None - ) + self.api_client = settings.get_kube_client(context=cluster) loader = self.get_metrics_service(api_client=self.api_client, cluster=cluster) if loader is None: - raise PrometheusNotFound("No Prometheus or metrics service found") + raise PrometheusNotFound( + f"Wasn't able to connect to any Prometheus service in {cluster or 'inner'} cluster\n" + "Try using port-forwarding and/or setting the url manually (using the -p flag.).\n" + "For more information, see 'Giving the Explicit Prometheus URL' at https://github.com/robusta-dev/krr?tab=readme-ov-file#usage" + ) self.loader = loader - logger.info(f"{self.loader.name} connected successfully for {cluster or 'default'} cluster") + logger.info(f"{self.loader.name()} connected successfully for {cluster or 'default'} cluster") def get_metrics_service( self, api_client: Optional[ApiClient] = None, cluster: Optional[str] = None, ) -> Optional[PrometheusMetricsService]: - for service_name, metric_service_class in METRICS_SERVICES.items(): + if settings.prometheus_url is not None: + logger.info("Prometheus URL is specified, will not auto-detect a metrics service") + metrics_to_check = [PrometheusMetricsService] + else: + logger.info("No Prometheus URL is specified, trying to auto-detect a metrics service") + metrics_to_check = [VictoriaMetricsService, ThanosMetricsService, PrometheusMetricsService] + + for metric_service_class in metrics_to_check: + service_name = metric_service_class.name() try: loader = metric_service_class(api_client=api_client, cluster=cluster, executor=self.executor) loader.check_connection() + except MetricsNotFound as e: + logger.info(f"{service_name} not found: {e}") + except ApiException as e: + logger.warning( + f"Unable to automatically discover a {service_name} in the cluster ({e}). " + "Try specifying how to connect to Prometheus via cli options" + ) + else: logger.info(f"{service_name} found") loader.validate_cluster_name() return loader - except MetricsNotFound as e: - logger.info(f"{service_name} not found: {e}") return None diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py index 9adb5b5..a3b0ee0 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py @@ -27,9 +27,9 @@ class MetricsService(abc.ABC): def check_connection(self): ... - @property - def name(self) -> str: - classname = self.__class__.__name__ + @classmethod + def name(cls) -> str: + classname = cls.__name__ return classname.replace("MetricsService", "") if classname != MetricsService.__name__ else classname @abc.abstractmethod diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py index 0db59c2..e61ef4c 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py @@ -38,7 +38,6 @@ class PrometheusDiscovery(MetricsServiceDiscovery): "app=prometheus,component=server", "app=prometheus-server", "app=prometheus-operator-prometheus", - "app=prometheus-msteams", "app=rancher-monitoring-prometheus", "app=prometheus-prometheus", ] @@ -61,7 +60,7 @@ class PrometheusMetricsService(MetricsService): ) -> None: super().__init__(api_client=api_client, cluster=cluster, executor=executor) - logger.info(f"Connecting to {self.name} for {self.cluster} cluster") + logger.info(f"Trying to connect to {self.name()} for {self.cluster} cluster") self.auth_header = settings.prometheus_auth_header self.ssl_enabled = settings.prometheus_ssl_enabled @@ -83,11 +82,10 @@ class PrometheusMetricsService(MetricsService): if not self.url: raise PrometheusNotFound( - f"{self.name} instance could not be found while scanning in {self.cluster} cluster.\n" - "\tTry using port-forwarding and/or setting the url manually (using the -p flag.)." + f"{self.name()} instance could not be found while scanning in {self.cluster} cluster." ) - logger.info(f"Using {self.name} at {self.url} for cluster {cluster or 'default'}") + logger.info(f"Using {self.name()} at {self.url} for cluster {cluster or 'default'}") headers = settings.prometheus_other_headers @@ -183,7 +181,7 @@ class PrometheusMetricsService(MetricsService): """ logger.debug(f"Gathering {LoaderClass.__name__} metric for {object}") - metric_loader = LoaderClass(self.prometheus, self.name, self.executor) + metric_loader = LoaderClass(self.prometheus, self.name(), self.executor) data = await metric_loader.load_data(object, period, step) if len(data) == 0: diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py index 202055f..e8fbcd0 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py @@ -42,6 +42,10 @@ class VictoriaMetricsService(PrometheusMetricsService): service_discovery = VictoriaMetricsDiscovery + @classmethod + def name(cls) -> str: + return "Victoria Metrics" + def check_connection(self): """ Checks the connection to Prometheus. diff --git a/robusta_krr/core/models/config.py b/robusta_krr/core/models/config.py index 36787d4..ac3b15c 100644 --- a/robusta_krr/core/models/config.py +++ b/robusta_krr/core/models/config.py @@ -23,6 +23,8 @@ class Config(pd.BaseSettings): clusters: Union[list[str], Literal["*"], None] = None kubeconfig: Optional[str] = None + impersonate_user: Optional[str] = None + impersonate_group: Optional[str] = None namespaces: Union[list[str], Literal["*"]] = pd.Field("*") resources: Union[list[KindLiteral], Literal["*"]] = pd.Field("*") selector: Optional[str] = None @@ -141,6 +143,15 @@ class Config(pd.BaseSettings): else: self.inside_cluster = True + def get_kube_client(self, context: Optional[str] = None): + api_client = config.new_client_from_config(context=context, config_file=self.kubeconfig) + if self.impersonate_user is not None: + # trick copied from https://github.com/kubernetes-client/python/issues/362 + api_client.set_default_header("Impersonate-User", self.impersonate_user) + if self.impersonate_group is not None: + api_client.set_default_header("Impersonate-Group", self.impersonate_group) + return api_client + @staticmethod def set_config(config: Config) -> None: global _config diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index f651aa8..25a85e8 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -33,6 +33,9 @@ def custom_print(*objects, rich: bool = True, force: bool = False) -> None: print_func(*objects) # type: ignore +class CriticalRunnerException(Exception): ... + + class Runner: EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound) @@ -141,11 +144,11 @@ class Runner: for resource, recommendation in result.items() } - async def _calculate_object_recommendations(self, object: K8sObjectData) -> RunResult: + async def _calculate_object_recommendations(self, object: K8sObjectData) -> Optional[RunResult]: prometheus_loader = self._get_prometheus_loader(object.cluster) if prometheus_loader is None: - return {resource: ResourceRecommendation.undefined("Prometheus not found") for resource in ResourceType} + return None object.pods = await prometheus_loader.load_pods(object, self._strategy.settings.history_timedelta) if object.pods == []: @@ -213,11 +216,14 @@ class Runner: } ) - async def _gather_object_allocations(self, k8s_object: K8sObjectData) -> ResourceScan: + async def _gather_object_allocations(self, k8s_object: K8sObjectData) -> Optional[ResourceScan]: recommendation = await self._calculate_object_recommendations(k8s_object) self.__progressbar.progress() + if recommendation is None: + return None + return ResourceScan.calculate( k8s_object, ResourceAllocations( @@ -253,6 +259,8 @@ class Runner: scans = await asyncio.gather(*scans_tasks) + successful_scans = [scan for scan in scans if scan is not None] + if len(scans) == 0: logger.warning("Current filters resulted in no objects available to scan.") logger.warning("Try to change the filters or check if there is anything available.") @@ -260,10 +268,9 @@ class Runner: logger.warning( "Note that you are using the '*' namespace filter, which by default excludes kube-system." ) - return Result( - scans=[], - strategy=StrategyData(name=str(self._strategy).lower(), settings=self._strategy.settings.dict()), - ) + raise CriticalRunnerException("No objects available to scan.") + elif len(successful_scans) == 0: + raise CriticalRunnerException("No successful scans were made. Check the logs for more information.") return Result( scans=scans, @@ -274,7 +281,8 @@ class Runner: ), ) - async def run(self) -> None: + async def run(self) -> int: + """Run the Runner. The return value is the exit code of the program.""" self._greet() try: @@ -298,7 +306,11 @@ class Runner: result = await self._collect_result() logger.info("Result collected, displaying...") self._process_result(result) - except ClusterNotSpecifiedException as e: - logger.error(e) + except (ClusterNotSpecifiedException, CriticalRunnerException) as e: + logger.critical(e) + return 1 # Exit with error except Exception: logger.exception("An unexpected error occurred") + return 1 # Exit with error + else: + return 0 # Exit with success diff --git a/robusta_krr/main.py b/robusta_krr/main.py index e61db7f..5c2d01a 100644 --- a/robusta_krr/main.py +++ b/robusta_krr/main.py @@ -55,6 +55,18 @@ def load_commands() -> None: help="Path to kubeconfig file. If not provided, will attempt to find it.", rich_help_panel="Kubernetes Settings", ), + impersonate_user: Optional[str] = typer.Option( + None, + "--as", + help="Impersonate a user, just like `kubectl --as`. For example, system:serviceaccount:default:krr-account.", + rich_help_panel="Kubernetes Settings", + ), + impersonate_group: Optional[str] = typer.Option( + None, + "--as-group", + help="Impersonate a user inside of a group, just like `kubectl --as-group`. For example, system:authenticated.", + rich_help_panel="Kubernetes Settings", + ), clusters: List[str] = typer.Option( None, "--context", @@ -238,6 +250,8 @@ def load_commands() -> None: try: config = Config( kubeconfig=kubeconfig, + impersonate_user=impersonate_user, + impersonate_group=impersonate_group, clusters="*" if all_clusters else clusters, namespaces="*" if "*" in namespaces else namespaces, resources="*" if "*" in resources else resources, @@ -275,7 +289,8 @@ def load_commands() -> None: logger.exception("Error occured while parsing arguments") else: runner = Runner() - asyncio.run(runner.run()) + exit_code = asyncio.run(runner.run()) + raise typer.Exit(code=exit_code) run_strategy.__name__ = strategy_name signature = inspect.signature(run_strategy) |
