From 4c1f5c9735a8b35df515920bb337ba9962e7ac5d Mon Sep 17 00:00:00 2001 From: LeaveMyYard Date: Tue, 30 Apr 2024 18:14:44 +0300 Subject: Finished structure changes and workload loaders --- robusta_krr/core/abstract/cluster_loader.py | 46 +++ robusta_krr/core/abstract/workload_loader.py | 23 ++ .../core/integrations/kubernetes/__init__.py | 101 ------ .../kubernetes/cluster_loader/__init__.py | 349 +++++++++++++++++++++ .../kubernetes/cluster_loader/loaders/__init__.py | 19 ++ .../kubernetes/cluster_loader/loaders/base.py | 116 +++++++ .../kubernetes/cluster_loader/loaders/cronjobs.py | 68 ++++ .../cluster_loader/loaders/daemonsets.py | 13 + .../cluster_loader/loaders/deploymentconfigs.py | 38 +++ .../cluster_loader/loaders/deployments.py | 13 + .../kubernetes/cluster_loader/loaders/jobs.py | 18 ++ .../kubernetes/cluster_loader/loaders/rollouts.py | 61 ++++ .../cluster_loader/loaders/statefulsets.py | 13 + .../kubernetes/workload_loader/__init__.py | 13 - .../kubernetes/workload_loader/base.py | 57 ---- .../workload_loader/kube_api/__init__.py | 323 ------------------- .../workload_loader/kube_api/loaders/__init__.py | 19 -- .../workload_loader/kube_api/loaders/base.py | 116 ------- .../workload_loader/kube_api/loaders/cronjobs.py | 68 ---- .../workload_loader/kube_api/loaders/daemonsets.py | 13 - .../kube_api/loaders/deploymentconfigs.py | 38 --- .../kube_api/loaders/deployments.py | 13 - .../workload_loader/kube_api/loaders/jobs.py | 18 -- .../workload_loader/kube_api/loaders/rollouts.py | 61 ---- .../kube_api/loaders/statefulsets.py | 13 - .../workload_loader/prometheus/__init__.py | 58 ---- .../workload_loader/prometheus/loaders/__init__.py | 9 - .../workload_loader/prometheus/loaders/base.py | 104 ------ .../prometheus/loaders/double_parent.py | 123 -------- .../prometheus/loaders/simple_parent.py | 85 ----- .../prometheus/cluster_loader/__init__.py | 78 +++++ .../prometheus/cluster_loader/loaders/__init__.py | 9 + .../prometheus/cluster_loader/loaders/base.py | 104 ++++++ .../cluster_loader/loaders/double_parent.py | 123 ++++++++ .../cluster_loader/loaders/simple_parent.py | 85 +++++ .../core/integrations/prometheus/connector.py | 96 +++--- .../metrics_service/mimir_metrics_service.py | 1 + .../metrics_service/prometheus_metrics_service.py | 43 +-- robusta_krr/core/models/config.py | 27 +- robusta_krr/core/models/exceptions.py | 2 + robusta_krr/core/runner.py | 83 +++-- robusta_krr/main.py | 16 +- 42 files changed, 1333 insertions(+), 1343 deletions(-) create mode 100644 robusta_krr/core/abstract/cluster_loader.py create mode 100644 robusta_krr/core/abstract/workload_loader.py delete mode 100644 robusta_krr/core/integrations/kubernetes/__init__.py create mode 100644 robusta_krr/core/integrations/kubernetes/cluster_loader/__init__.py create mode 100644 robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/__init__.py create mode 100644 robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/base.py create mode 100644 robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/cronjobs.py create mode 100644 robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/daemonsets.py create mode 100644 robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deploymentconfigs.py create mode 100644 robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deployments.py create mode 100644 robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/jobs.py create mode 100644 robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/rollouts.py create mode 100644 robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/statefulsets.py delete mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py delete mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/base.py delete mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py delete mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/__init__.py delete mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py delete mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/cronjobs.py delete mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/daemonsets.py delete mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deploymentconfigs.py delete mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deployments.py delete mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/jobs.py delete mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/rollouts.py delete mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/statefulsets.py delete mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py delete mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py delete mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py delete mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py delete mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py create mode 100644 robusta_krr/core/integrations/prometheus/cluster_loader/__init__.py create mode 100644 robusta_krr/core/integrations/prometheus/cluster_loader/loaders/__init__.py create mode 100644 robusta_krr/core/integrations/prometheus/cluster_loader/loaders/base.py create mode 100644 robusta_krr/core/integrations/prometheus/cluster_loader/loaders/double_parent.py create mode 100644 robusta_krr/core/integrations/prometheus/cluster_loader/loaders/simple_parent.py create mode 100644 robusta_krr/core/models/exceptions.py diff --git a/robusta_krr/core/abstract/cluster_loader.py b/robusta_krr/core/abstract/cluster_loader.py new file mode 100644 index 0000000..86767d1 --- /dev/null +++ b/robusta_krr/core/abstract/cluster_loader.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +import abc +import logging +from typing import Optional, TYPE_CHECKING + +from .workload_loader import BaseWorkloadLoader + +if TYPE_CHECKING: + from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector + + +logger = logging.getLogger("krr") + + +class BaseClusterLoader(abc.ABC): + """ + A class that wraps loading data from multiple clusters. + For example, a centralized prometheus server that can query multiple clusters. + Or one kubeconfig can define connections to multiple clusters. + """ + + @abc.abstractmethod + async def list_clusters(self) -> Optional[list[str]]: + pass + + @abc.abstractmethod + def get_workload_loader(self, cluster: Optional[str]) -> BaseWorkloadLoader: + pass + + def try_get_workload_loader(self, cluster: Optional[str]) -> Optional[BaseWorkloadLoader]: + try: + return self.get_workload_loader(cluster) + except Exception as e: + logger.error(f"Could not connect to cluster {cluster} and will skip it: {e}") + return None + + @abc.abstractmethod + def get_prometheus(self, cluster: Optional[str]) -> PrometheusConnector: + """ + Connect to a Prometheus server and return a PrometheusConnector instance. + Cluster = None means that prometheus is the only one: either centralized or in-cluster. + raise prometrix.PrometheusNotFound if Prometheus is not available. + """ + + pass diff --git a/robusta_krr/core/abstract/workload_loader.py b/robusta_krr/core/abstract/workload_loader.py new file mode 100644 index 0000000..42860ee --- /dev/null +++ b/robusta_krr/core/abstract/workload_loader.py @@ -0,0 +1,23 @@ +import abc +import logging + +from robusta_krr.core.models.objects import K8sWorkload, PodData + + +logger = logging.getLogger("krr") + + +class BaseWorkloadLoader(abc.ABC): + """A base class for single cluster workload loaders.""" + + @abc.abstractmethod + async def list_workloads(self) -> list[K8sWorkload]: + pass + + +class IListPodsFallback(abc.ABC): + """This is an interface that a workload loader can implement to have a fallback method to list pods.""" + + @abc.abstractmethod + async def load_pods(self, object: K8sWorkload) -> list[PodData]: + pass diff --git a/robusta_krr/core/integrations/kubernetes/__init__.py b/robusta_krr/core/integrations/kubernetes/__init__.py deleted file mode 100644 index 403768c..0000000 --- a/robusta_krr/core/integrations/kubernetes/__init__.py +++ /dev/null @@ -1,101 +0,0 @@ -import asyncio -import logging -from collections import defaultdict -from concurrent.futures import ThreadPoolExecutor -from typing import Any, Awaitable, Callable, Iterable, Optional, Union - -from kubernetes import client, config # type: ignore -from kubernetes.client import ApiException -from kubernetes.client.models import ( - V1Container, - V1DaemonSet, - V1Deployment, - V1Job, - V1Pod, - V1PodList, - V1StatefulSet, - V2HorizontalPodAutoscaler, -) - -from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector -from robusta_krr.core.models.config import settings -from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData -from robusta_krr.core.models.result import ResourceAllocations -from robusta_krr.utils.object_like_dict import ObjectLikeDict -from prometrix import PrometheusNotFound - -from . import config_patch as _ -from .workload_loader import ( - BaseWorkloadLoader, - PrometheusWorkloadLoader, - BaseClusterLoader, - KubeAPIClusterLoader, - PrometheusClusterLoader, -) - -logger = logging.getLogger("krr") - -AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job] -HPAKey = tuple[str, str, str] - - -class ClusterConnector: - EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound) - - def __init__(self) -> None: - self._prometheus_connectors: dict[Optional[str], Union[PrometheusConnector, Exception]] = {} - self._connector_errors: set[Exception] = set() - - def get_prometheus(self, cluster: Optional[str]) -> Optional[PrometheusConnector]: - if settings.workload_loader == "kubeapi": - logger.debug(f"Creating Prometheus connector for cluster {cluster}") - elif settings.workload_loader == "prometheus": - logger.debug(f"Creating Prometheus connector") - # NOTE: With prometheus workload loader we can only have one Prometheus provided in parameters - # so in case of multiple clusters in one Prometheus (centralized version) - # for each cluster we will have the same PrometheusConnector (keyed by None) - cluster = None - - - - def _create_cluster_loader(self) -> BaseClusterLoader: - try: - - except Exception as e: - logger.error(f"Could not connect to cluster loader and will skip it: {e}") - - return None - - async def list_workloads(self, clusters: Optional[list[str]]) -> list[K8sWorkload]: - """List all scannable objects. - - Returns: - A list of all loaded objects. - """ - - if clusters is None: - _cluster_loaders = [self._try_create_cluster_loader(None)] - else: - _cluster_loaders = [self._try_create_cluster_loader(cluster) for cluster in clusters] - - self.cluster_loaders: dict[Optional[str], BaseWorkloadLoader] = { - cl.cluster: cl for cl in _cluster_loaders if cl is not None - } - - if self.cluster_loaders == {}: - logger.error("Could not load any cluster.") - return [] - - return [ - object - for cluster_loader in self.cluster_loaders.values() - for object in await cluster_loader.list_workloads() - ] - - async def load_pods(self, object: K8sWorkload) -> list[PodData]: - try: - cluster_loader = self.cluster_loaders[object.cluster] - except KeyError: - raise RuntimeError(f"Cluster loader for cluster {object.cluster} not found") from None - - return await cluster_loader.list_pods(object) diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/__init__.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/__init__.py new file mode 100644 index 0000000..ab520bb --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/__init__.py @@ -0,0 +1,349 @@ +from __future__ import annotations + +import asyncio +import logging +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Awaitable, Callable, Optional + +from kubernetes import client, config # type: ignore +from kubernetes.client import ApiException # type: ignore +from kubernetes.client.models import V1Container, V2HorizontalPodAutoscaler # type: ignore +from functools import cache + +from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector +from robusta_krr.core.integrations.prometheus.metrics_service.prometheus_metrics_service import PrometheusMetricsService +from robusta_krr.core.models.config import settings +from robusta_krr.core.models.exceptions import CriticalRunnerException +from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData +from robusta_krr.core.models.result import ResourceAllocations + + +from robusta_krr.core.abstract.workload_loader import BaseWorkloadLoader, IListPodsFallback +from robusta_krr.core.abstract.cluster_loader import BaseClusterLoader +from .loaders import ( + BaseKindLoader, + CronJobLoader, + DaemonSetLoader, + DeploymentConfigLoader, + DeploymentLoader, + JobLoader, + RolloutLoader, + StatefulSetLoader, +) + +logger = logging.getLogger("krr") + +HPAKey = tuple[str, str, str] + + +class KubeAPIClusterLoader(BaseClusterLoader): + # NOTE: For KubeAPIClusterLoader we have to first connect to read kubeconfig + # We do not need to connect to Prometheus from here, as we query all data from Kubernetes API + # Also here we might have different Prometeus instances for different clusters + + def __init__(self) -> None: + try: + settings.load_kubeconfig() + except Exception as e: + logger.error(f"Could not load kubernetes configuration: {e.__class__.__name__}\n{e}") + logger.error("Try to explicitly set --context and/or --kubeconfig flags.") + logger.error("Alternatively, try a prometheus-only mode with `--mode prometheus`") + raise CriticalRunnerException("Could not load kubernetes configuration") from e + + self.api_client = settings.get_kube_client() + self._prometheus_connectors: dict[Optional[str], PrometheusConnector] = {} + + async def list_clusters(self) -> Optional[list[str]]: + if settings.inside_cluster: + logger.debug("Working inside the cluster") + return None + + try: + contexts, current_context = config.list_kube_config_contexts(settings.kubeconfig) + except config.ConfigException: + if settings.clusters is not None and settings.clusters != "*": + logger.warning("Could not load context from kubeconfig.") + logger.warning(f"Falling back to clusters from CLI: {settings.clusters}") + return settings.clusters + else: + logger.error( + "Could not load context from kubeconfig. " + "Please check your kubeconfig file or pass -c flag with the context name." + ) + return None + + logger.debug(f"Found {len(contexts)} clusters: {', '.join([context['name'] for context in contexts])}") + logger.debug(f"Current cluster: {current_context['name']}") + logger.debug(f"Configured clusters: {settings.clusters}") + + # None, empty means current cluster + if not settings.clusters: + return [current_context["name"]] + + # * means all clusters + if settings.clusters == "*": + return [context["name"] for context in contexts] + + return [context["name"] for context in contexts if context["name"] in settings.clusters] + + @cache + def get_workload_loader(self, cluster: Optional[str]) -> KubeAPIWorkloadLoader: + return KubeAPIWorkloadLoader(cluster) + + @cache + def get_prometheus(self, cluster: Optional[str]) -> PrometheusConnector: + connector = PrometheusConnector(cluster=cluster) + if settings.prometheus_url is not None: + logger.info(f"Connecting to Prometheus using URL: {settings.prometheus_url}") + connector.connect(settings.prometheus_url) + else: + logger.info(f"Trying to discover PromQL service" + (f" for cluster {cluster}" if cluster else "")) + connector.discover(api_client=self.api_client) + + +class KubeAPIWorkloadLoader(BaseWorkloadLoader, IListPodsFallback): + kind_loaders: list[BaseKindLoader] = [ + DeploymentLoader, + RolloutLoader, + DeploymentConfigLoader, + StatefulSetLoader, + DaemonSetLoader, + JobLoader, + CronJobLoader, + ] + + def __init__(self, cluster: Optional[str]) -> None: + self.cluster = cluster + + # This executor will be running requests to Kubernetes API + self.executor = ThreadPoolExecutor(settings.max_workers) + self.api_client = settings.get_kube_client(cluster) + + self.autoscaling_v1 = client.AutoscalingV1Api(api_client=self.api_client) + self.autoscaling_v2 = client.AutoscalingV2Api(api_client=self.api_client) + + self._kind_available: defaultdict[KindLiteral, bool] = defaultdict(lambda: True) + self._hpa_list: dict[HPAKey, HPAData] = {} + self._workload_loaders: dict[KindLiteral, BaseKindLoader] = { + loader.kind: loader(self.api_client, self.executor) for loader in self.kind_loaders + } + + async def list_workloads(self) -> list[K8sWorkload]: + """List all scannable objects. + + Returns: + A list of scannable objects. + """ + + logger.info(f"Listing scannable objects in {self.cluster}") + logger.debug(f"Namespaces: {settings.namespaces}") + logger.debug(f"Resources: {settings.resources}") + + self._hpa_list = await self._try_list_hpa() + workload_object_lists = await asyncio.gather( + *[ + self._fetch_workload(loader) + for loader in self._workload_loaders.values() + if self._should_list_resource(loader.kind) + ] + ) + + return [ + object + for workload_objects in workload_object_lists + for object in workload_objects + # NOTE: By default we will filter out kube-system namespace + if not (settings.namespaces == "*" and object.namespace == "kube-system") + ] + + async def load_pods(self, object: K8sWorkload) -> list[PodData]: + return await self._workload_loaders[object.kind].list_pods(object) + + def _build_scannable_object(self, item: Any, container: V1Container, kind: Optional[str] = None) -> K8sWorkload: + name = item.metadata.name + namespace = item.metadata.namespace + kind = kind or item.__class__.__name__[2:] + + obj = K8sWorkload( + cluster=self.cluster, + namespace=namespace, + name=name, + kind=kind, + container=container.name, + allocations=ResourceAllocations.from_container(container), + hpa=self._hpa_list.get((namespace, kind, name)), + ) + obj._api_resource = item + return obj + + def _should_list_resource(self, resource: str) -> bool: + if settings.resources == "*": + return True + return resource in settings.resources + + async def _list_namespaced_or_global_objects( + self, + kind: KindLiteral, + all_namespaces_request: Callable[..., Awaitable[Any]], + namespaced_request: Callable[..., Awaitable[Any]], + ) -> list[Any]: + logger.debug(f"Listing {kind}s in {self.cluster}") + + if settings.namespaces == "*": + requests = [ + all_namespaces_request( + label_selector=settings.selector, + ) + ] + else: + requests = [ + namespaced_request( + namespace=namespace, + label_selector=settings.selector, + ) + for namespace in settings.namespaces + ] + + result = [item for request_result in await asyncio.gather(*requests) for item in request_result.items] + + logger.debug(f"Found {len(result)} {kind} in {self.cluster}") + return result + + async def _fetch_workload(self, loader: BaseKindLoader) -> list[K8sWorkload]: + kind = loader.kind + + if not self._should_list_resource(kind): + logger.debug(f"Skipping {kind}s in {self.cluster}") + return + + if not self._kind_available[kind]: + return + + result = [] + try: + for item in await self._list_namespaced_or_global_objects( + kind, loader.all_namespaces_request_async, loader.namespaced_request_async + ): + if not loader.filter(item): + continue + + containers = await loader.extract_containers(item) + if asyncio.iscoroutine(containers): + containers = await containers + + result.extend(self._build_scannable_object(item, container, kind) for container in containers) + except ApiException as e: + if kind in ("Rollout", "DeploymentConfig") and e.status in [400, 401, 403, 404]: + if self._kind_available[kind]: + logger.debug(f"{kind} API not available in {self.cluster}") + self._kind_available[kind] = False + else: + logger.exception(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}") + logger.error("Will skip this object type and continue.") + + return result + + async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]: + loop = asyncio.get_running_loop() + res = await self._list_namespaced_or_global_objects( + kind="HPA-v1", + all_namespaces_request=lambda **kwargs: loop.run_in_executor( + self.executor, + lambda: self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs), + ), + namespaced_request=lambda **kwargs: loop.run_in_executor( + self.executor, + lambda: self.autoscaling_v1.list_namespaced_horizontal_pod_autoscaler(**kwargs), + ), + ) + + return { + ( + hpa.metadata.namespace, + hpa.spec.scale_target_ref.kind, + hpa.spec.scale_target_ref.name, + ): HPAData( + min_replicas=hpa.spec.min_replicas, + max_replicas=hpa.spec.max_replicas, + current_replicas=hpa.status.current_replicas, + desired_replicas=hpa.status.desired_replicas, + target_cpu_utilization_percentage=hpa.spec.target_cpu_utilization_percentage, + target_memory_utilization_percentage=None, + ) + async for hpa in res + } + + async def __list_hpa_v2(self) -> dict[HPAKey, HPAData]: + loop = asyncio.get_running_loop() + + res = await self._list_namespaced_or_global_objects( + kind="HPA-v2", + all_namespaces_request=lambda **kwargs: loop.run_in_executor( + self.executor, + lambda: self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs), + ), + namespaced_request=lambda **kwargs: loop.run_in_executor( + self.executor, + lambda: self.autoscaling_v2.list_namespaced_horizontal_pod_autoscaler(**kwargs), + ), + ) + + def __get_metric(hpa: V2HorizontalPodAutoscaler, metric_name: str) -> Optional[float]: + return next( + ( + metric.resource.target.average_utilization + for metric in hpa.spec.metrics + if metric.type == "Resource" and metric.resource.name == metric_name + ), + None, + ) + + return { + ( + hpa.metadata.namespace, + hpa.spec.scale_target_ref.kind, + hpa.spec.scale_target_ref.name, + ): HPAData( + min_replicas=hpa.spec.min_replicas, + max_replicas=hpa.spec.max_replicas, + current_replicas=hpa.status.current_replicas, + desired_replicas=hpa.status.desired_replicas, + target_cpu_utilization_percentage=__get_metric(hpa, "cpu"), + target_memory_utilization_percentage=__get_metric(hpa, "memory"), + ) + for hpa in res + } + + # TODO: What should we do in case of other metrics bound to the HPA? + async def __list_hpa(self) -> dict[HPAKey, HPAData]: + """List all HPA objects in the cluster. + + Returns: + dict[tuple[str, str], HPAData]: A dictionary of HPA objects, indexed by scaleTargetRef (kind, name). + """ + + try: + # Try to use V2 API first + return await self.__list_hpa_v2() + except ApiException as e: + if e.status != 404: + # If the error is other than not found, then re-raise it. + raise + + # If V2 API does not exist, fall back to V1 + return await self.__list_hpa_v1() + + async def _try_list_hpa(self) -> dict[HPAKey, HPAData]: + try: + return await self.__list_hpa() + except Exception as e: + logger.exception(f"Error trying to list hpa in cluster {self.cluster}: {e}") + logger.error( + "Will assume that there are no HPA. " + "Be careful as this may lead to inaccurate results if object actually has HPA." + ) + return {} + + +__all__ = ["KubeAPIWorkloadLoader", "KubeAPIClusterLoader"] diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/__init__.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/__init__.py new file mode 100644 index 0000000..6ca6efd --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/__init__.py @@ -0,0 +1,19 @@ +from .base import BaseKindLoader +from .cronjobs import CronJobLoader +from .daemonsets import DaemonSetLoader +from .deploymentconfigs import DeploymentConfigLoader +from .deployments import DeploymentLoader +from .jobs import JobLoader +from .rollouts import RolloutLoader +from .statefulsets import StatefulSetLoader + +__all__ = [ + "BaseKindLoader", + "CronJobLoader", + "DeploymentLoader", + "DaemonSetLoader", + "DeploymentConfigLoader", + "JobLoader", + "RolloutLoader", + "StatefulSetLoader", +] \ No newline at end of file diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/base.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/base.py new file mode 100644 index 0000000..a6a552e --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/base.py @@ -0,0 +1,116 @@ +import abc +import asyncio +import logging +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Iterable, Optional, Union + +from kubernetes import client # type: ignore +from kubernetes.client.api_client import ApiClient # type: ignore +from kubernetes.client.models import V1Container, V1PodList # type: ignore + +from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData + +logger = logging.getLogger("krr") + +HPAKey = tuple[str, str, str] + + +class BaseKindLoader(abc.ABC): + """ + This class is used to define how to load a specific kind of Kubernetes object. + It does not load the objects itself, but is used by the `KubeAPIWorkloadLoader` to load objects. + """ + + kind: KindLiteral + + def __init__(self, api_client: Optional[ApiClient], executor: ThreadPoolExecutor) -> None: + self.executor = executor + self.api_client = api_client + self.apps = client.AppsV1Api(api_client=self.api_client) + self.custom_objects = client.CustomObjectsApi(api_client=self.api_client) + self.batch = client.BatchV1Api(api_client=self.api_client) + self.core = client.CoreV1Api(api_client=self.api_client) + + @abc.abstractmethod + def all_namespaces_request(self, label_selector: str) -> Any: + pass + + async def all_namespaces_request_async(self, label_selector: str) -> Any: + """Default async implementation executes the request in a thread pool.""" + + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + self.executor, + lambda: self.all_namespaces_request( + label_selector=label_selector, + ), + ) + + @abc.abstractmethod + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + pass + + async def namespaced_request_async(self, namespace: str, label_selector: str) -> Any: + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + self.executor, + lambda: self.namespaced_request( + namespace=namespace, + label_selector=label_selector, + ), + ) + + async def extract_containers(self, item: Any) -> Iterable[V1Container]: + return item.spec.template.spec.containers + + def filter(self, item: Any) -> bool: + return True + + async def list_pods(self, object: K8sWorkload) -> list[PodData]: + loop = asyncio.get_running_loop() + + if object.selector is None: + return [] + + selector = self._build_selector_query(object.selector) + if selector is None: + return [] + + ret: V1PodList = await loop.run_in_executor( + self.executor, + lambda: self.core.list_namespaced_pod( + namespace=object._api_resource.metadata.namespace, label_selector=selector + ), + ) + + return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items] + + @classmethod + def _get_match_expression_filter(cls, expression: Any) -> str: + if expression.operator.lower() == "exists": + return expression.key + elif expression.operator.lower() == "doesnotexist": + return f"!{expression.key}" + + values = ",".join(expression.values) + return f"{expression.key} {expression.operator} ({values})" + + @classmethod + def _build_selector_query(cls, selector: Any) -> Union[str, None]: + label_filters = [] + + if selector.match_labels is not None: + label_filters += [f"{label[0]}={label[1]}" for label in selector.match_labels.items()] + + if selector.match_expressions is not None: + label_filters += [cls._get_match_expression_filter(expression) for expression in selector.match_expressions] + + if label_filters == []: + # NOTE: This might mean that we have DeploymentConfig, + # which uses ReplicationController and it has a dict like matchLabels + if len(selector) != 0: + label_filters += [f"{label[0]}={label[1]}" for label in selector.items()] + else: + return None + + return ",".join(label_filters) diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/cronjobs.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/cronjobs.py new file mode 100644 index 0000000..d999c65 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/cronjobs.py @@ -0,0 +1,68 @@ +import asyncio +from collections import defaultdict +import logging +from typing import Any, Iterable + +from kubernetes.client.models import V1Container, V1Job, V1PodList # type: ignore + +from robusta_krr.core.models.objects import K8sWorkload, PodData + +from .base import BaseKindLoader + +logger = logging.getLogger("krr") + + +class CronJobLoader(BaseKindLoader): + kind = "CronJob" + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + self._jobs: dict[str, list[V1Job]] = {} + self._jobs_loading_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock) + + def all_namespaces_request(self, label_selector: str) -> Any: + return self.batch.list_cron_job_for_all_namespaces(label_selector=label_selector, watch=False) + + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + return self.batch.list_namespaced_cron_job(namespace=namespace, label_selector=label_selector, watch=False) + + async def extract_containers(self, item: Any) -> Iterable[V1Container]: + return item.spec.job_template.spec.template.spec.containers + + async def list_pods(self, object: K8sWorkload) -> list[PodData]: + loop = asyncio.get_running_loop() + + namespace_jobs = await self._list_jobs(object.namespace) + ownered_jobs_uids = [ + job.metadata.uid + for job in namespace_jobs + if any( + owner.kind == "CronJob" and owner.uid == object._api_resource.metadata.uid + for owner in job.metadata.owner_references or [] + ) + ] + selector = f"batch.kubernetes.io/controller-uid in ({','.join(ownered_jobs_uids)})" + + ret: V1PodList = await loop.run_in_executor( + self.executor, + lambda: self.core.list_namespaced_pod( + namespace=object._api_resource.metadata.namespace, label_selector=selector + ), + ) + + return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items] + + async def _list_jobs(self, namespace: str) -> list[V1Job]: + if namespace not in self._jobs: + loop = asyncio.get_running_loop() + + async with self._jobs_loading_locks[namespace]: + logging.debug(f"Loading jobs for cronjobs in {namespace}") + ret = await loop.run_in_executor( + self.executor, + lambda: self.batch.list_namespaced_job(namespace=namespace), + ) + self._jobs[namespace] = ret.items + + return self._jobs[namespace] diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/daemonsets.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/daemonsets.py new file mode 100644 index 0000000..1005bed --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/daemonsets.py @@ -0,0 +1,13 @@ +from typing import Any + +from .base import BaseKindLoader + + +class DaemonSetLoader(BaseKindLoader): + kind = "DaemonSet" + + def all_namespaces_request(self, label_selector: str) -> Any: + return self.apps.list_daemon_set_for_all_namespaces(label_selector=label_selector, watch=False) + + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + return self.apps.list_namespaced_daemon_set(namespace=namespace, label_selector=label_selector, watch=False) diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deploymentconfigs.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deploymentconfigs.py new file mode 100644 index 0000000..33b64d9 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deploymentconfigs.py @@ -0,0 +1,38 @@ +import logging +from typing import Any + +from robusta_krr.utils.object_like_dict import ObjectLikeDict + +from .base import BaseKindLoader + +logger = logging.getLogger("krr") + + +class DeploymentConfigLoader(BaseKindLoader): + kind = "DeploymentConfig" + + # NOTE: Using custom objects API returns dicts, but all other APIs return objects + # We need to handle this difference using a small wrapper + + def all_namespaces_request(self, label_selector: str) -> Any: + return ObjectLikeDict( + self.custom_objects.list_cluster_custom_object( + group="apps.openshift.io", + version="v1", + plural="deploymentconfigs", + label_selector=label_selector, + watch=False, + ) + ) + + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + return ObjectLikeDict( + self.custom_objects.list_namespaced_custom_object( + group="apps.openshift.io", + version="v1", + plural="deploymentconfigs", + namespace=namespace, + label_selector=label_selector, + watch=False, + ) + ) diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deployments.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deployments.py new file mode 100644 index 0000000..fc202b5 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deployments.py @@ -0,0 +1,13 @@ +from typing import Any + +from .base import BaseKindLoader + + +class DeploymentLoader(BaseKindLoader): + kind = "Deployment" + + def all_namespaces_request(self, label_selector: str) -> Any: + return self.apps.list_deployment_for_all_namespaces(label_selector=label_selector, watch=False) + + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + return self.apps.list_namespaced_deployment(namespace=namespace, label_selector=label_selector, watch=False) diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/jobs.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/jobs.py new file mode 100644 index 0000000..87ce5a7 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/jobs.py @@ -0,0 +1,18 @@ +from typing import Any + +from .base import BaseKindLoader + + +class JobLoader(BaseKindLoader): + kind = "Job" + + def all_namespaces_request(self, label_selector: str) -> Any: + return self.batch.list_job_for_all_namespaces(label_selector=label_selector, watch=False) + + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + return self.batch.list_namespaced_job(namespace=namespace, label_selector=label_selector, watch=False) + + def filter(self, item: Any) -> bool: + # NOTE: If the job has ownerReference and it is a CronJob, + # then we should skip it, as it is a part of the CronJob + return not any(owner.kind == "CronJob" for owner in item.metadata.owner_references or []) diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/rollouts.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/rollouts.py new file mode 100644 index 0000000..0148745 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/rollouts.py @@ -0,0 +1,61 @@ +import asyncio +import logging +from typing import Any, Iterable + +from kubernetes.client.models import V1Container # type: ignore + +from robusta_krr.utils.object_like_dict import ObjectLikeDict + +from .base import BaseKindLoader + +logger = logging.getLogger("krr") + + +class RolloutLoader(BaseKindLoader): + kind = "Rollout" + + # NOTE: Using custom objects API returns dicts, but all other APIs return objects + # We need to handle this difference using a small wrapper + + def all_namespaces_request(self, label_selector: str) -> Any: + return ObjectLikeDict( + self.custom_objects.list_cluster_custom_object( + group="argoproj.io", + version="v1alpha1", + plural="rollouts", + label_selector=label_selector, + watch=False, + ) + ) + + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + return ObjectLikeDict( + self.custom_objects.list_namespaced_custom_object( + group="argoproj.io", + version="v1alpha1", + plural="rollouts", + namespace=namespace, + label_selector=label_selector, + watch=False, + ) + ) + + async def extract_containers(self, item: Any) -> Iterable[V1Container]: + if item.spec.template is not None: + return item.spec.template.spec.containers + + logging.debug( + f"Rollout has workloadRef, fetching template for {item.metadata.name} in {item.metadata.namespace}" + ) + + # Template can be None and object might have workloadRef + workloadRef = item.spec.workloadRef + if workloadRef is not None: + loop = asyncio.get_running_loop() + ret = await loop.run_in_executor( + self.executor, + lambda: self.apps.read_namespaced_deployment(namespace=item.metadata.namespace, name=workloadRef.name), + ) + return ret.spec.template.spec.containers + + return [] diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/statefulsets.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/statefulsets.py new file mode 100644 index 0000000..069f2c6 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/statefulsets.py @@ -0,0 +1,13 @@ +from typing import Any + +from .base import BaseKindLoader + + +class StatefulSetLoader(BaseKindLoader): + kind = "StatefulSet" + + def all_namespaces_request(self, label_selector: str) -> Any: + return self.apps.list_stateful_set_for_all_namespaces(label_selector=label_selector, watch=False) + + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + return self.apps.list_namespaced_stateful_set(namespace=namespace, label_selector=label_selector, watch=False) diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py deleted file mode 100644 index 2cad0cc..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -from .base import BaseWorkloadLoader, IListPodsFallback, BaseClusterLoader -from .kube_api import KubeAPIWorkloadLoader, KubeAPIClusterLoader -from .prometheus import PrometheusWorkloadLoader, PrometheusClusterLoader - -__all__ = [ - "BaseWorkloadLoader", - "IListPodsFallback", - "KubeAPIWorkloadLoader", - "PrometheusWorkloadLoader", - "BaseClusterLoader", - "KubeAPIClusterLoader", - "PrometheusClusterLoader", -] diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/base.py deleted file mode 100644 index 316bcbd..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/base.py +++ /dev/null @@ -1,57 +0,0 @@ -import abc -import logging - -from typing import Optional, Union -from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector -from robusta_krr.core.integrations.prometheus.metrics_service.prometheus_metrics_service import PrometheusMetricsService -from robusta_krr.core.models.objects import K8sWorkload, PodData - - -logger = logging.getLogger("krr") - - -class BaseWorkloadLoader(abc.ABC): - """A base class for single cluster workload loaders.""" - - @abc.abstractmethod - async def list_workloads(self) -> list[K8sWorkload]: - pass - - -class IListPodsFallback(abc.ABC): - """This is an interface that a workload loader can implement to have a fallback method to list pods.""" - - @abc.abstractmethod - async def list_pods(self, object: K8sWorkload) -> list[PodData]: - pass - - -class BaseClusterLoader(abc.ABC): - """ - A class that wraps loading data from multiple clusters. - For example, a centralized prometheus server that can query multiple clusters. - Or one kubeconfig can define connections to multiple clusters. - """ - - def __init__(self) -> None: - self._prometheus_connectors: dict[Optional[str], PrometheusConnector] = {} - self._connector_errors: set[Exception] = set() - - @abc.abstractmethod - async def list_clusters(self) -> Optional[list[str]]: - pass - - @abc.abstractmethod - async def connect_cluster(self, cluster: str) -> BaseWorkloadLoader: - pass - - def connect_prometheus(self, cluster: Optional[str] = None) -> PrometheusMetricsService: - """ - Connect to a Prometheus server and return a PrometheusConnector instance. - Cluster = None means that prometheus is the only one: either centralized or in-cluster. - """ - - if cluster not in self._prometheus_connectors: - self._prometheus_connectors[cluster] = PrometheusConnector(cluster=cluster) - - return self._prometheus_connectors[cluster] diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py deleted file mode 100644 index 19cd108..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py +++ /dev/null @@ -1,323 +0,0 @@ -from __future__ import annotations - -import asyncio -import logging -from collections import defaultdict -from concurrent.futures import ThreadPoolExecutor -from typing import Any, Awaitable, Callable, Optional - -from kubernetes import client, config # type: ignore -from kubernetes.client import ApiException # type: ignore -from kubernetes.client.models import V1Container, V2HorizontalPodAutoscaler # type: ignore - -from robusta_krr.core.models.config import settings -from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData -from robusta_krr.core.models.result import ResourceAllocations - -from ..base import BaseWorkloadLoader, IListPodsFallback, BaseClusterLoader -from .loaders import ( - BaseKindLoader, - CronJobLoader, - DaemonSetLoader, - DeploymentConfigLoader, - DeploymentLoader, - JobLoader, - RolloutLoader, - StatefulSetLoader, -) - -logger = logging.getLogger("krr") - -HPAKey = tuple[str, str, str] - - -class KubeAPIClusterLoader(BaseClusterLoader): - # NOTE: For KubeAPIClusterLoader we have to first connect to read kubeconfig - # We do not need to connect to Prometheus from here, as we query all data from Kubernetes API - # Also here we might have different Prometeus instances for different clusters - - def __init__(self) -> None: - self.api_client = settings.get_kube_client() - - async def list_clusters(self) -> Optional[list[str]]: - if settings.inside_cluster: - logger.debug("Working inside the cluster") - return None - - try: - contexts, current_context = config.list_kube_config_contexts(settings.kubeconfig) - except config.ConfigException: - if settings.clusters is not None and settings.clusters != "*": - logger.warning("Could not load context from kubeconfig.") - logger.warning(f"Falling back to clusters from CLI: {settings.clusters}") - return settings.clusters - else: - logger.error( - "Could not load context from kubeconfig. " - "Please check your kubeconfig file or pass -c flag with the context name." - ) - return None - - logger.debug(f"Found {len(contexts)} clusters: {', '.join([context['name'] for context in contexts])}") - logger.debug(f"Current cluster: {current_context['name']}") - logger.debug(f"Configured clusters: {settings.clusters}") - - # None, empty means current cluster - if not settings.clusters: - return [current_context["name"]] - - # * means all clusters - if settings.clusters == "*": - return [context["name"] for context in contexts] - - return [context["name"] for context in contexts if context["name"] in settings.clusters] - - async def connect_cluster(self, cluster: str) -> KubeAPIWorkloadLoader: - return KubeAPIWorkloadLoader(cluster) - - -class KubeAPIWorkloadLoader(BaseWorkloadLoader, IListPodsFallback): - kind_loaders: list[BaseKindLoader] = [ - DeploymentLoader, - RolloutLoader, - DeploymentConfigLoader, - StatefulSetLoader, - DaemonSetLoader, - JobLoader, - CronJobLoader, - ] - - def __init__(self, cluster: Optional[str]) -> None: - self.cluster = cluster - - # This executor will be running requests to Kubernetes API - self.executor = ThreadPoolExecutor(settings.max_workers) - self.api_client = settings.get_kube_client(cluster) - - self.autoscaling_v1 = client.AutoscalingV1Api(api_client=self.api_client) - self.autoscaling_v2 = client.AutoscalingV2Api(api_client=self.api_client) - - self._kind_available: defaultdict[KindLiteral, bool] = defaultdict(lambda: True) - self._hpa_list: dict[HPAKey, HPAData] = {} - self._workload_loaders: dict[KindLiteral, BaseKindLoader] = { - loader.kind: loader(self.api_client, self.executor) for loader in self.kind_loaders - } - - async def list_workloads(self) -> list[K8sWorkload]: - """List all scannable objects. - - Returns: - A list of scannable objects. - """ - - logger.info(f"Listing scannable objects in {self.cluster}") - logger.debug(f"Namespaces: {settings.namespaces}") - logger.debug(f"Resources: {settings.resources}") - - self._hpa_list = await self._try_list_hpa() - workload_object_lists = await asyncio.gather( - *[ - self._fetch_workload(loader) - for loader in self._workload_loaders.values() - if self._should_list_resource(loader.kind) - ] - ) - - return [ - object - for workload_objects in workload_object_lists - for object in workload_objects - # NOTE: By default we will filter out kube-system namespace - if not (settings.namespaces == "*" and object.namespace == "kube-system") - ] - - async def list_pods(self, object: K8sWorkload) -> list[PodData]: - return await self._workload_loaders[object.kind].list_pods(object) - - def _build_scannable_object(self, item: Any, container: V1Container, kind: Optional[str] = None) -> K8sWorkload: - name = item.metadata.name - namespace = item.metadata.namespace - kind = kind or item.__class__.__name__[2:] - - obj = K8sWorkload( - cluster=self.cluster, - namespace=namespace, - name=name, - kind=kind, - container=container.name, - allocations=ResourceAllocations.from_container(container), - hpa=self._hpa_list.get((namespace, kind, name)), - ) - obj._api_resource = item - return obj - - def _should_list_resource(self, resource: str) -> bool: - if settings.resources == "*": - return True - return resource in settings.resources - - async def _list_namespaced_or_global_objects( - self, - kind: KindLiteral, - all_namespaces_request: Callable[..., Awaitable[Any]], - namespaced_request: Callable[..., Awaitable[Any]], - ) -> list[Any]: - logger.debug(f"Listing {kind}s in {self.cluster}") - - if settings.namespaces == "*": - requests = [ - all_namespaces_request( - label_selector=settings.selector, - ) - ] - else: - requests = [ - namespaced_request( - namespace=namespace, - label_selector=settings.selector, - ) - for namespace in settings.namespaces - ] - - result = [item for request_result in await asyncio.gather(*requests) for item in request_result.items] - - logger.debug(f"Found {len(result)} {kind} in {self.cluster}") - return result - - async def _fetch_workload(self, loader: BaseKindLoader) -> list[K8sWorkload]: - kind = loader.kind - - if not self._should_list_resource(kind): - logger.debug(f"Skipping {kind}s in {self.cluster}") - return - - if not self._kind_available[kind]: - return - - result = [] - try: - for item in await self._list_namespaced_or_global_objects( - kind, loader.all_namespaces_request_async, loader.namespaced_request_async - ): - if not loader.filter(item): - continue - - containers = await loader.extract_containers(item) - if asyncio.iscoroutine(containers): - containers = await containers - - result.extend(self._build_scannable_object(item, container, kind) for container in containers) - except ApiException as e: - if kind in ("Rollout", "DeploymentConfig") and e.status in [400, 401, 403, 404]: - if self._kind_available[kind]: - logger.debug(f"{kind} API not available in {self.cluster}") - self._kind_available[kind] = False - else: - logger.exception(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}") - logger.error("Will skip this object type and continue.") - - return result - - async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]: - loop = asyncio.get_running_loop() - res = await self._list_namespaced_or_global_objects( - kind="HPA-v1", - all_namespaces_request=lambda **kwargs: loop.run_in_executor( - self.executor, - lambda: self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs), - ), - namespaced_request=lambda **kwargs: loop.run_in_executor( - self.executor, - lambda: self.autoscaling_v1.list_namespaced_horizontal_pod_autoscaler(**kwargs), - ), - ) - - return { - ( - hpa.metadata.namespace, - hpa.spec.scale_target_ref.kind, - hpa.spec.scale_target_ref.name, - ): HPAData( - min_replicas=hpa.spec.min_replicas, - max_replicas=hpa.spec.max_replicas, - current_replicas=hpa.status.current_replicas, - desired_replicas=hpa.status.desired_replicas, - target_cpu_utilization_percentage=hpa.spec.target_cpu_utilization_percentage, - target_memory_utilization_percentage=None, - ) - async for hpa in res - } - - async def __list_hpa_v2(self) -> dict[HPAKey, HPAData]: - loop = asyncio.get_running_loop() - - res = await self._list_namespaced_or_global_objects( - kind="HPA-v2", - all_namespaces_request=lambda **kwargs: loop.run_in_executor( - self.executor, - lambda: self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs), - ), - namespaced_request=lambda **kwargs: loop.run_in_executor( - self.executor, - lambda: self.autoscaling_v2.list_namespaced_horizontal_pod_autoscaler(**kwargs), - ), - ) - - def __get_metric(hpa: V2HorizontalPodAutoscaler, metric_name: str) -> Optional[float]: - return next( - ( - metric.resource.target.average_utilization - for metric in hpa.spec.metrics - if metric.type == "Resource" and metric.resource.name == metric_name - ), - None, - ) - - return { - ( - hpa.metadata.namespace, - hpa.spec.scale_target_ref.kind, - hpa.spec.scale_target_ref.name, - ): HPAData( - min_replicas=hpa.spec.min_replicas, - max_replicas=hpa.spec.max_replicas, - current_replicas=hpa.status.current_replicas, - desired_replicas=hpa.status.desired_replicas, - target_cpu_utilization_percentage=__get_metric(hpa, "cpu"), - target_memory_utilization_percentage=__get_metric(hpa, "memory"), - ) - for hpa in res - } - - # TODO: What should we do in case of other metrics bound to the HPA? - async def __list_hpa(self) -> dict[HPAKey, HPAData]: - """List all HPA objects in the cluster. - - Returns: - dict[tuple[str, str], HPAData]: A dictionary of HPA objects, indexed by scaleTargetRef (kind, name). - """ - - try: - # Try to use V2 API first - return await self.__list_hpa_v2() - except ApiException as e: - if e.status != 404: - # If the error is other than not found, then re-raise it. - raise - - # If V2 API does not exist, fall back to V1 - return await self.__list_hpa_v1() - - async def _try_list_hpa(self) -> dict[HPAKey, HPAData]: - try: - return await self.__list_hpa() - except Exception as e: - logger.exception(f"Error trying to list hpa in cluster {self.cluster}: {e}") - logger.error( - "Will assume that there are no HPA. " - "Be careful as this may lead to inaccurate results if object actually has HPA." - ) - return {} - - -__all__ = ["KubeAPIWorkloadLoader"] diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/__init__.py deleted file mode 100644 index 6ca6efd..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -from .base import BaseKindLoader -from .cronjobs import CronJobLoader -from .daemonsets import DaemonSetLoader -from .deploymentconfigs import DeploymentConfigLoader -from .deployments import DeploymentLoader -from .jobs import JobLoader -from .rollouts import RolloutLoader -from .statefulsets import StatefulSetLoader - -__all__ = [ - "BaseKindLoader", - "CronJobLoader", - "DeploymentLoader", - "DaemonSetLoader", - "DeploymentConfigLoader", - "JobLoader", - "RolloutLoader", - "StatefulSetLoader", -] \ No newline at end of file diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py deleted file mode 100644 index a6a552e..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py +++ /dev/null @@ -1,116 +0,0 @@ -import abc -import asyncio -import logging -from concurrent.futures import ThreadPoolExecutor -from typing import Any, Iterable, Optional, Union - -from kubernetes import client # type: ignore -from kubernetes.client.api_client import ApiClient # type: ignore -from kubernetes.client.models import V1Container, V1PodList # type: ignore - -from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData - -logger = logging.getLogger("krr") - -HPAKey = tuple[str, str, str] - - -class BaseKindLoader(abc.ABC): - """ - This class is used to define how to load a specific kind of Kubernetes object. - It does not load the objects itself, but is used by the `KubeAPIWorkloadLoader` to load objects. - """ - - kind: KindLiteral - - def __init__(self, api_client: Optional[ApiClient], executor: ThreadPoolExecutor) -> None: - self.executor = executor - self.api_client = api_client - self.apps = client.AppsV1Api(api_client=self.api_client) - self.custom_objects = client.CustomObjectsApi(api_client=self.api_client) - self.batch = client.BatchV1Api(api_client=self.api_client) - self.core = client.CoreV1Api(api_client=self.api_client) - - @abc.abstractmethod - def all_namespaces_request(self, label_selector: str) -> Any: - pass - - async def all_namespaces_request_async(self, label_selector: str) -> Any: - """Default async implementation executes the request in a thread pool.""" - - loop = asyncio.get_running_loop() - return await loop.run_in_executor( - self.executor, - lambda: self.all_namespaces_request( - label_selector=label_selector, - ), - ) - - @abc.abstractmethod - def namespaced_request(self, namespace: str, label_selector: str) -> Any: - pass - - async def namespaced_request_async(self, namespace: str, label_selector: str) -> Any: - loop = asyncio.get_running_loop() - return await loop.run_in_executor( - self.executor, - lambda: self.namespaced_request( - namespace=namespace, - label_selector=label_selector, - ), - ) - - async def extract_containers(self, item: Any) -> Iterable[V1Container]: - return item.spec.template.spec.containers - - def filter(self, item: Any) -> bool: - return True - - async def list_pods(self, object: K8sWorkload) -> list[PodData]: - loop = asyncio.get_running_loop() - - if object.selector is None: - return [] - - selector = self._build_selector_query(object.selector) - if selector is None: - return [] - - ret: V1PodList = await loop.run_in_executor( - self.executor, - lambda: self.core.list_namespaced_pod( - namespace=object._api_resource.metadata.namespace, label_selector=selector - ), - ) - - return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items] - - @classmethod - def _get_match_expression_filter(cls, expression: Any) -> str: - if expression.operator.lower() == "exists": - return expression.key - elif expression.operator.lower() == "doesnotexist": - return f"!{expression.key}" - - values = ",".join(expression.values) - return f"{expression.key} {expression.operator} ({values})" - - @classmethod - def _build_selector_query(cls, selector: Any) -> Union[str, None]: - label_filters = [] - - if selector.match_labels is not None: - label_filters += [f"{label[0]}={label[1]}" for label in selector.match_labels.items()] - - if selector.match_expressions is not None: - label_filters += [cls._get_match_expression_filter(expression) for expression in selector.match_expressions] - - if label_filters == []: - # NOTE: This might mean that we have DeploymentConfig, - # which uses ReplicationController and it has a dict like matchLabels - if len(selector) != 0: - label_filters += [f"{label[0]}={label[1]}" for label in selector.items()] - else: - return None - - return ",".join(label_filters) diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/cronjobs.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/cronjobs.py deleted file mode 100644 index d999c65..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/cronjobs.py +++ /dev/null @@ -1,68 +0,0 @@ -import asyncio -from collections import defaultdict -import logging -from typing import Any, Iterable - -from kubernetes.client.models import V1Container, V1Job, V1PodList # type: ignore - -from robusta_krr.core.models.objects import K8sWorkload, PodData - -from .base import BaseKindLoader - -logger = logging.getLogger("krr") - - -class CronJobLoader(BaseKindLoader): - kind = "CronJob" - - def __init__(self, *args, **kwargs) -> None: - super().__init__(*args, **kwargs) - - self._jobs: dict[str, list[V1Job]] = {} - self._jobs_loading_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock) - - def all_namespaces_request(self, label_selector: str) -> Any: - return self.batch.list_cron_job_for_all_namespaces(label_selector=label_selector, watch=False) - - def namespaced_request(self, namespace: str, label_selector: str) -> Any: - return self.batch.list_namespaced_cron_job(namespace=namespace, label_selector=label_selector, watch=False) - - async def extract_containers(self, item: Any) -> Iterable[V1Container]: - return item.spec.job_template.spec.template.spec.containers - - async def list_pods(self, object: K8sWorkload) -> list[PodData]: - loop = asyncio.get_running_loop() - - namespace_jobs = await self._list_jobs(object.namespace) - ownered_jobs_uids = [ - job.metadata.uid - for job in namespace_jobs - if any( - owner.kind == "CronJob" and owner.uid == object._api_resource.metadata.uid - for owner in job.metadata.owner_references or [] - ) - ] - selector = f"batch.kubernetes.io/controller-uid in ({','.join(ownered_jobs_uids)})" - - ret: V1PodList = await loop.run_in_executor( - self.executor, - lambda: self.core.list_namespaced_pod( - namespace=object._api_resource.metadata.namespace, label_selector=selector - ), - ) - - return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items] - - async def _list_jobs(self, namespace: str) -> list[V1Job]: - if namespace not in self._jobs: - loop = asyncio.get_running_loop() - - async with self._jobs_loading_locks[namespace]: - logging.debug(f"Loading jobs for cronjobs in {namespace}") - ret = await loop.run_in_executor( - self.executor, - lambda: self.batch.list_namespaced_job(namespace=namespace), - ) - self._jobs[namespace] = ret.items - - return self._jobs[namespace] diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/daemonsets.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/daemonsets.py deleted file mode 100644 index 1005bed..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/daemonsets.py +++ /dev/null @@ -1,13 +0,0 @@ -from typing import Any - -from .base import BaseKindLoader - - -class DaemonSetLoader(BaseKindLoader): - kind = "DaemonSet" - - def all_namespaces_request(self, label_selector: str) -> Any: - return self.apps.list_daemon_set_for_all_namespaces(label_selector=label_selector, watch=False) - - def namespaced_request(self, namespace: str, label_selector: str) -> Any: - return self.apps.list_namespaced_daemon_set(namespace=namespace, label_selector=label_selector, watch=False) diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deploymentconfigs.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deploymentconfigs.py deleted file mode 100644 index 33b64d9..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deploymentconfigs.py +++ /dev/null @@ -1,38 +0,0 @@ -import logging -from typing import Any - -from robusta_krr.utils.object_like_dict import ObjectLikeDict - -from .base import BaseKindLoader - -logger = logging.getLogger("krr") - - -class DeploymentConfigLoader(BaseKindLoader): - kind = "DeploymentConfig" - - # NOTE: Using custom objects API returns dicts, but all other APIs return objects - # We need to handle this difference using a small wrapper - - def all_namespaces_request(self, label_selector: str) -> Any: - return ObjectLikeDict( - self.custom_objects.list_cluster_custom_object( - group="apps.openshift.io", - version="v1", - plural="deploymentconfigs", - label_selector=label_selector, - watch=False, - ) - ) - - def namespaced_request(self, namespace: str, label_selector: str) -> Any: - return ObjectLikeDict( - self.custom_objects.list_namespaced_custom_object( - group="apps.openshift.io", - version="v1", - plural="deploymentconfigs", - namespace=namespace, - label_selector=label_selector, - watch=False, - ) - ) diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deployments.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deployments.py deleted file mode 100644 index fc202b5..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deployments.py +++ /dev/null @@ -1,13 +0,0 @@ -from typing import Any - -from .base import BaseKindLoader - - -class DeploymentLoader(BaseKindLoader): - kind = "Deployment" - - def all_namespaces_request(self, label_selector: str) -> Any: - return self.apps.list_deployment_for_all_namespaces(label_selector=label_selector, watch=False) - - def namespaced_request(self, namespace: str, label_selector: str) -> Any: - return self.apps.list_namespaced_deployment(namespace=namespace, label_selector=label_selector, watch=False) diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/jobs.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/jobs.py deleted file mode 100644 index 87ce5a7..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/jobs.py +++ /dev/null @@ -1,18 +0,0 @@ -from typing import Any - -from .base import BaseKindLoader - - -class JobLoader(BaseKindLoader): - kind = "Job" - - def all_namespaces_request(self, label_selector: str) -> Any: - return self.batch.list_job_for_all_namespaces(label_selector=label_selector, watch=False) - - def namespaced_request(self, namespace: str, label_selector: str) -> Any: - return self.batch.list_namespaced_job(namespace=namespace, label_selector=label_selector, watch=False) - - def filter(self, item: Any) -> bool: - # NOTE: If the job has ownerReference and it is a CronJob, - # then we should skip it, as it is a part of the CronJob - return not any(owner.kind == "CronJob" for owner in item.metadata.owner_references or []) diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/rollouts.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/rollouts.py deleted file mode 100644 index 0148745..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/rollouts.py +++ /dev/null @@ -1,61 +0,0 @@ -import asyncio -import logging -from typing import Any, Iterable - -from kubernetes.client.models import V1Container # type: ignore - -from robusta_krr.utils.object_like_dict import ObjectLikeDict - -from .base import BaseKindLoader - -logger = logging.getLogger("krr") - - -class RolloutLoader(BaseKindLoader): - kind = "Rollout" - - # NOTE: Using custom objects API returns dicts, but all other APIs return objects - # We need to handle this difference using a small wrapper - - def all_namespaces_request(self, label_selector: str) -> Any: - return ObjectLikeDict( - self.custom_objects.list_cluster_custom_object( - group="argoproj.io", - version="v1alpha1", - plural="rollouts", - label_selector=label_selector, - watch=False, - ) - ) - - def namespaced_request(self, namespace: str, label_selector: str) -> Any: - return ObjectLikeDict( - self.custom_objects.list_namespaced_custom_object( - group="argoproj.io", - version="v1alpha1", - plural="rollouts", - namespace=namespace, - label_selector=label_selector, - watch=False, - ) - ) - - async def extract_containers(self, item: Any) -> Iterable[V1Container]: - if item.spec.template is not None: - return item.spec.template.spec.containers - - logging.debug( - f"Rollout has workloadRef, fetching template for {item.metadata.name} in {item.metadata.namespace}" - ) - - # Template can be None and object might have workloadRef - workloadRef = item.spec.workloadRef - if workloadRef is not None: - loop = asyncio.get_running_loop() - ret = await loop.run_in_executor( - self.executor, - lambda: self.apps.read_namespaced_deployment(namespace=item.metadata.namespace, name=workloadRef.name), - ) - return ret.spec.template.spec.containers - - return [] diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/statefulsets.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/statefulsets.py deleted file mode 100644 index 069f2c6..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/statefulsets.py +++ /dev/null @@ -1,13 +0,0 @@ -from typing import Any - -from .base import BaseKindLoader - - -class StatefulSetLoader(BaseKindLoader): - kind = "StatefulSet" - - def all_namespaces_request(self, label_selector: str) -> Any: - return self.apps.list_stateful_set_for_all_namespaces(label_selector=label_selector, watch=False) - - def namespaced_request(self, namespace: str, label_selector: str) -> Any: - return self.apps.list_namespaced_stateful_set(namespace=namespace, label_selector=label_selector, watch=False) diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py deleted file mode 100644 index 6796d20..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py +++ /dev/null @@ -1,58 +0,0 @@ -import asyncio -import itertools -import logging - -from collections import Counter - -from pyparsing import Optional - - -from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector -from robusta_krr.core.models.config import settings -from robusta_krr.core.models.objects import K8sWorkload -from ..base import BaseWorkloadLoader, BaseClusterLoader -from .loaders import BaseKindLoader, DoubleParentLoader, SimpleParentLoader - - -logger = logging.getLogger("krr") - - -class PrometheusClusterLoader(BaseClusterLoader): - # NOTE: For PrometheusClusterLoader we have to first connect to Prometheus, as we query all data from it - - def __init__(self) -> None: - super().__init__() - self.prometheus_connector = super().connect_prometheus() - - async def list_clusters(self) -> list[str]: - return [] - - async def connect_cluster(self, cluster: str) -> BaseWorkloadLoader: - return PrometheusWorkloadLoader(cluster, self.prometheus_connector) - - def connect_prometheus(self, cluster: Optional[str] = None) -> PrometheusConnector: - # NOTE: With prometheus workload loader we can only have one Prometheus provided in parameters - # so in case of multiple clusters in one Prometheus (centralized version) - # for each cluster we will have the same PrometheusConnector (keyed by None) - return self.prometheus_connector - -class PrometheusWorkloadLoader(BaseWorkloadLoader): - workloads: list[type[BaseKindLoader]] = [DoubleParentLoader, SimpleParentLoader] - - def __init__(self, cluster: str, prometheus_connector: PrometheusConnector) -> None: - self.cluster = cluster - self.metric_service = prometheus_connector - self.loaders = [loader(prometheus_connector) for loader in self.workloads] - - async def list_workloads(self) -> list[K8sWorkload]: - workloads = list( - itertools.chain( - *await asyncio.gather(*[loader.list_workloads(settings.namespaces) for loader in self.loaders]) - ) - ) - - kind_counts = Counter([workload.kind for workload in workloads]) - for kind, count in kind_counts.items(): - logger.info(f"Found {count} {kind} in {self.cluster}") - - return workloads diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py deleted file mode 100644 index 18b9a3d..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -from .base import BaseKindLoader -from .double_parent import DoubleParentLoader -from .simple_parent import SimpleParentLoader - -__all__ = [ - "BaseKindLoader", - "DoubleParentLoader", - "SimpleParentLoader", -] \ No newline at end of file diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py deleted file mode 100644 index 9534261..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py +++ /dev/null @@ -1,104 +0,0 @@ -import abc -import asyncio -from collections import defaultdict -import logging -from concurrent.futures import ThreadPoolExecutor -from typing import Any, Iterable, Literal, Optional, Union - -from kubernetes import client # type: ignore -from kubernetes.client.api_client import ApiClient # type: ignore -from kubernetes.client.models import ( # type: ignore - V1Container, - V1DaemonSet, - V1Deployment, - V1Job, - V1Pod, - V1PodList, - V1StatefulSet, -) -from robusta_krr.core.models.config import settings - -from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector -from robusta_krr.core.integrations.prometheus.metrics.base import PrometheusMetric -from robusta_krr.core.models.allocations import RecommendationValue, ResourceAllocations, ResourceType -from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData - -logger = logging.getLogger("krr") - -AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job] -HPAKey = tuple[str, str, str] - - -class BaseKindLoader(abc.ABC): - """ - This class is used to define how to load a specific kind of Kubernetes object. - It does not load the objects itself, but is used by the `KubeAPIWorkloadLoader` to load objects. - """ - - kinds: list[KindLiteral] = [] - - def __init__(self, prometheus: PrometheusConnector) -> None: - self.prometheus = prometheus - self.cluster_selector = PrometheusMetric.get_prometheus_cluster_label() - - @property - def kinds_to_scan(self) -> list[KindLiteral]: - return [kind for kind in self.kinds if kind in settings.resources] if settings.resources != "*" else self.kinds - - @abc.abstractmethod - def list_workloads(self, namespaces: Union[list[str], Literal["*"]]) -> list[K8sWorkload]: - pass - - async def _parse_allocation(self, namespace: str, pods: list[str], container_name: str) -> ResourceAllocations: - limits = await self.prometheus.loader.query( - f""" - avg by(resource) ( - kube_pod_container_resource_limits{{ - namespace="{namespace}", - pod=~"{'|'.join(pods)}", - container="{container_name}" - {self.cluster_selector} - }} - ) - """ - ) - requests = await self.prometheus.loader.query( - f""" - avg by(resource) ( - kube_pod_container_resource_requests{{ - namespace="{namespace}", - pod=~"{'|'.join(pods)}", - container="{container_name}" - {self.cluster_selector} - }} - ) - """ - ) - requests_values: dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None} - limits_values: dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None} - for limit in limits: - if limit["metric"]["resource"] == ResourceType.CPU: - limits_values[ResourceType.CPU] = float(limit["value"][1]) - elif limit["metric"]["resource"] == ResourceType.Memory: - limits_values[ResourceType.Memory] = float(limit["value"][1]) - - for request in requests: - if request["metric"]["resource"] == ResourceType.CPU: - requests_values[ResourceType.CPU] = float(request["value"][1]) - elif request["metric"]["resource"] == ResourceType.Memory: - requests_values[ResourceType.Memory] = float(request["value"][1]) - return ResourceAllocations(requests=requests_values, limits=limits_values) - - async def _list_containers_in_pods(self, pods: list[str]) -> set[str]: - containers = await self.prometheus.loader.query( - f""" - count by (container) ( - kube_pod_container_info{{ - pod=~"{'|'.join(pods)}" - {self.cluster_selector} - }} - ) - """ - ) - - return {container["metric"]["container"] for container in containers} diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py deleted file mode 100644 index e757af4..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py +++ /dev/null @@ -1,123 +0,0 @@ -import logging -from collections import defaultdict -import itertools -import asyncio -from typing import Literal, Union - -from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData - -from .base import BaseKindLoader - -logger = logging.getLogger("krr") - -SubownerLiteral = Literal["ReplicaSet", "ReplicationController", "Job"] - - -class DoubleParentLoader(BaseKindLoader): - kinds = ["Deployment", "Rollout", "DeploymentConfig", "CronJob"] - - kind_subowner_map: dict[KindLiteral, SubownerLiteral] = { - "Deployment": "ReplicaSet", - "Rollout": "ReplicaSet", - "DeploymentConfig": "ReplicationController", - "CronJob": "Job", - } - - async def list_workloads(self, namespaces: Union[list[str], Literal["*"]]) -> list[K8sWorkload]: - return list( - itertools.chain( - *await asyncio.gather( - *[ - self.list_workloads_by_subowner(namespaces, subowner) - for subowner in set(self.kind_subowner_map.values()) - ] - ) - ) - ) - - async def list_workloads_by_subowner( - self, namespaces: Union[list[str], Literal["*"]], subowner_kind: SubownerLiteral - ) -> list[K8sWorkload]: - kinds = [kind for kind in self.kinds_to_scan if self.kind_subowner_map[kind] == subowner_kind] - - if kinds == []: - return [] - - logger.debug(f"Listing {', '.join(kinds)}") - # NOTE: kube-system is excluded if we scan all namespaces - namespace_selector = ( - ('namespace=~"' + "|".join(namespaces) + '"') if namespaces != "*" else 'namespace!="kube-system"' - ) - - metric_name = f"kube_{subowner_kind.lower()}_owner" - subowner_label = subowner_kind.lower() if subowner_kind != "Job" else "job_name" - - # Replica is for ReplicaSet and/or ReplicationController - subowners = await self.prometheus.loader.query( - f""" - count by (namespace, owner_name, {subowner_label}, owner_kind) ( - {metric_name} {{ - {namespace_selector}, - owner_kind=~"{'|'.join(kinds)}" - {self.cluster_selector} - }} - ) - """ - ) - # groupBy: (namespace, owner_name, owner_kind) => [replicaset,...] - replicas_by_owner = defaultdict(list) - for subowner in subowners: - metric = subowner["metric"] - key = metric["namespace"], metric["owner_name"], metric["owner_kind"] - replicas_by_owner[key].append(metric[subowner_label]) - - return list( - itertools.chain( - *await asyncio.gather( - *[ - self._list_pods_of_subowner( - namespace, - name, - kind, - subowner_kind, - subowners, - ) - for (namespace, name, kind), subowners in replicas_by_owner.items() - ] - ) - ) - ) - - async def _list_pods_of_subowner( - self, namespace: str, name: str, kind: str, subowner_kind: str, subowner_names: list[str] - ) -> list[K8sWorkload]: - pods = await self.prometheus.loader.query( - f""" - count by (namespace, owner_name, owner_kind, pod) ( - kube_pod_owner{{ - namespace="{namespace}", - owner_name=~"{'|'.join(subowner_names)}", - owner_kind="{subowner_kind}" - {self.cluster_selector} - }} - ) - """ - ) - if pods is None or len(pods) == 0: - return [] - - pod_names = [pod["metric"]["pod"] for pod in pods] - containers = await self._list_containers_in_pods(pod_names) - - return [ - K8sWorkload( - cluster=self.prometheus.cluster, - namespace=namespace, - name=name, - kind=kind, - container=container_name, - allocations=await self._parse_allocation(namespace, pod_names, container_name), # find - pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names], # list pods - ) - for container_name in containers - ] diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py deleted file mode 100644 index 233ebb2..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py +++ /dev/null @@ -1,85 +0,0 @@ -import asyncio -from collections import defaultdict -import logging -from typing import Literal, Union - -from robusta_krr.core.models.objects import K8sWorkload, PodData - -from .base import BaseKindLoader - -logger = logging.getLogger("krr") - - -class SimpleParentLoader(BaseKindLoader): - kinds = ["DaemonSet", "StatefulSet", "Job"] - - async def list_workloads(self, namespaces: Union[list[str], Literal["*"]]) -> list[K8sWorkload]: - if self.kinds_to_scan == []: - return [] - - logger.debug(f"Listing {', '.join(self.kinds_to_scan)}") - namespace_selector = ( - ('namespace=~"' + "|".join(namespaces) + '"') if namespaces != "*" else 'namespace!="kube-system"' - ) - - results = await self.prometheus.loader.query( - f""" - count by (namespace, owner_name, owner_kind, pod) ( - kube_pod_owner{{ - {namespace_selector}, - owner_kind=~"{'|'.join(self.kinds_to_scan)}" - {self.cluster_selector} - }} - ) - """ - ) - if results is None or len(results) == 0: - return [] - - # groupBy: (namespace, owner_name, owner_kind) => [pod, ... ] - workloads: defaultdict[tuple[str, str, str], list[str]] = defaultdict(list) - for result in results: - metric = result["metric"] - key = metric["namespace"], metric["owner_name"], metric["owner_kind"] - workloads[key].append(metric["pod"]) - - # NOTE: We do not show jobs that are a part of a cronjob, so we filter them out - job_workloads = [name for (_, name, kind) in workloads if kind == "Job"] - if job_workloads != []: - cronjobs = await self.prometheus.loader.query( - f""" - count by (namespace, job_name) ( - kube_job_owner{{ - {namespace_selector}, - owner_kind="CronJob" - {self.cluster_selector} - }} - ) - """ - ) - for cronjob in cronjobs: - metric = cronjob["metric"] - key = (metric["namespace"], metric["job_name"], "Job") - if key in workloads: - del workloads[key] - - workloads_containers = dict( - zip( - workloads.keys(), - await asyncio.gather(*[self._list_containers_in_pods(pods) for pods in workloads.values()]), - ) - ) - - return [ - K8sWorkload( - cluster=self.prometheus.cluster, - namespace=namespace, - name=name, - kind=kind, - container=container, - allocations=await self._parse_allocation(namespace, pod_names, container), # find - pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names], # list pods - ) - for (namespace, name, kind), pod_names in workloads.items() - for container in workloads_containers[namespace, name, kind] - ] diff --git a/robusta_krr/core/integrations/prometheus/cluster_loader/__init__.py b/robusta_krr/core/integrations/prometheus/cluster_loader/__init__.py new file mode 100644 index 0000000..c073ab5 --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/cluster_loader/__init__.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import asyncio +import itertools +import logging + +from collections import Counter + +from typing import Optional +from functools import cache + +from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector +from robusta_krr.core.models.config import settings +from robusta_krr.core.models.objects import K8sWorkload +from robusta_krr.core.abstract.workload_loader import BaseWorkloadLoader +from robusta_krr.core.abstract.cluster_loader import BaseClusterLoader +from robusta_krr.core.models.exceptions import CriticalRunnerException +from .loaders import BaseKindLoader, DoubleParentLoader, SimpleParentLoader + + +logger = logging.getLogger("krr") + + +class PrometheusClusterLoader(BaseClusterLoader): + # NOTE: For PrometheusClusterLoader we have to first connect to Prometheus, as we query all data from it + + def __init__(self) -> None: + self._prometheus_connector = PrometheusConnector() + if not settings.prometheus_url: + raise CriticalRunnerException( + "Prometheus URL is not provided. " + "Can not auto-discover Prometheus with `--mode prometheus`. " + "Please provide the URL with `--prometheus-url` flag." + ) + + self._prometheus_connector.connect(settings.prometheus_url) + + async def list_clusters(self) -> Optional[list[str]]: + if settings.prometheus_cluster_label is None: + return None + + # TODO: We can try to auto-discover clusters by querying Prometheus, + # but for that we will need to rework PrometheusMetric.get_prometheus_cluster_label + return [settings.prometheus_cluster_label] + + @cache + def get_workload_loader(self, cluster: str) -> PrometheusWorkloadLoader: + return PrometheusWorkloadLoader(cluster, self._prometheus_connector) + + def get_prometheus(self, cluster: Optional[str]) -> PrometheusConnector: + # NOTE: With prometheus workload loader we can only have one Prometheus provided in parameters + # so in case of multiple clusters in one Prometheus (centralized version) + # for each cluster we will have the same PrometheusConnector (keyed by None) + return self._prometheus_connector + + +class PrometheusWorkloadLoader(BaseWorkloadLoader): + workloads: list[type[BaseKindLoader]] = [DoubleParentLoader, SimpleParentLoader] + + def __init__(self, cluster: str, prometheus_connector: PrometheusConnector) -> None: + self.cluster = cluster + self.metric_service = prometheus_connector + self.loaders = [loader(prometheus_connector) for loader in self.workloads] + + async def list_workloads(self) -> list[K8sWorkload]: + workloads = list( + itertools.chain( + *await asyncio.gather(*[loader.list_workloads(settings.namespaces) for loader in self.loaders]) + ) + ) + + kind_counts = Counter([workload.kind for workload in workloads]) + for kind, count in kind_counts.items(): + logger.info(f"Found {count} {kind} in {self.cluster}") + + return workloads + +__all__ = ["PrometheusClusterLoader", "PrometheusWorkloadLoader"] \ No newline at end of file diff --git a/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/__init__.py b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/__init__.py new file mode 100644 index 0000000..18b9a3d --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/__init__.py @@ -0,0 +1,9 @@ +from .base import BaseKindLoader +from .double_parent import DoubleParentLoader +from .simple_parent import SimpleParentLoader + +__all__ = [ + "BaseKindLoader", + "DoubleParentLoader", + "SimpleParentLoader", +] \ No newline at end of file diff --git a/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/base.py b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/base.py new file mode 100644 index 0000000..9534261 --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/base.py @@ -0,0 +1,104 @@ +import abc +import asyncio +from collections import defaultdict +import logging +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Iterable, Literal, Optional, Union + +from kubernetes import client # type: ignore +from kubernetes.client.api_client import ApiClient # type: ignore +from kubernetes.client.models import ( # type: ignore + V1Container, + V1DaemonSet, + V1Deployment, + V1Job, + V1Pod, + V1PodList, + V1StatefulSet, +) +from robusta_krr.core.models.config import settings + +from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector +from robusta_krr.core.integrations.prometheus.metrics.base import PrometheusMetric +from robusta_krr.core.models.allocations import RecommendationValue, ResourceAllocations, ResourceType +from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData + +logger = logging.getLogger("krr") + +AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job] +HPAKey = tuple[str, str, str] + + +class BaseKindLoader(abc.ABC): + """ + This class is used to define how to load a specific kind of Kubernetes object. + It does not load the objects itself, but is used by the `KubeAPIWorkloadLoader` to load objects. + """ + + kinds: list[KindLiteral] = [] + + def __init__(self, prometheus: PrometheusConnector) -> None: + self.prometheus = prometheus + self.cluster_selector = PrometheusMetric.get_prometheus_cluster_label() + + @property + def kinds_to_scan(self) -> list[KindLiteral]: + return [kind for kind in self.kinds if kind in settings.resources] if settings.resources != "*" else self.kinds + + @abc.abstractmethod + def list_workloads(self, namespaces: Union[list[str], Literal["*"]]) -> list[K8sWorkload]: + pass + + async def _parse_allocation(self, namespace: str, pods: list[str], container_name: str) -> ResourceAllocations: + limits = await self.prometheus.loader.query( + f""" + avg by(resource) ( + kube_pod_container_resource_limits{{ + namespace="{namespace}", + pod=~"{'|'.join(pods)}", + container="{container_name}" + {self.cluster_selector} + }} + ) + """ + ) + requests = await self.prometheus.loader.query( + f""" + avg by(resource) ( + kube_pod_container_resource_requests{{ + namespace="{namespace}", + pod=~"{'|'.join(pods)}", + container="{container_name}" + {self.cluster_selector} + }} + ) + """ + ) + requests_values: dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None} + limits_values: dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None} + for limit in limits: + if limit["metric"]["resource"] == ResourceType.CPU: + limits_values[ResourceType.CPU] = float(limit["value"][1]) + elif limit["metric"]["resource"] == ResourceType.Memory: + limits_values[ResourceType.Memory] = float(limit["value"][1]) + + for request in requests: + if request["metric"]["resource"] == ResourceType.CPU: + requests_values[ResourceType.CPU] = float(request["value"][1]) + elif request["metric"]["resource"] == ResourceType.Memory: + requests_values[ResourceType.Memory] = float(request["value"][1]) + return ResourceAllocations(requests=requests_values, limits=limits_values) + + async def _list_containers_in_pods(self, pods: list[str]) -> set[str]: + containers = await self.prometheus.loader.query( + f""" + count by (container) ( + kube_pod_container_info{{ + pod=~"{'|'.join(pods)}" + {self.cluster_selector} + }} + ) + """ + ) + + return {container["metric"]["container"] for container in containers} diff --git a/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/double_parent.py b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/double_parent.py new file mode 100644 index 0000000..e757af4 --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/double_parent.py @@ -0,0 +1,123 @@ +import logging +from collections import defaultdict +import itertools +import asyncio +from typing import Literal, Union + +from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData + +from .base import BaseKindLoader + +logger = logging.getLogger("krr") + +SubownerLiteral = Literal["ReplicaSet", "ReplicationController", "Job"] + + +class DoubleParentLoader(BaseKindLoader): + kinds = ["Deployment", "Rollout", "DeploymentConfig", "CronJob"] + + kind_subowner_map: dict[KindLiteral, SubownerLiteral] = { + "Deployment": "ReplicaSet", + "Rollout": "ReplicaSet", + "DeploymentConfig": "ReplicationController", + "CronJob": "Job", + } + + async def list_workloads(self, namespaces: Union[list[str], Literal["*"]]) -> list[K8sWorkload]: + return list( + itertools.chain( + *await asyncio.gather( + *[ + self.list_workloads_by_subowner(namespaces, subowner) + for subowner in set(self.kind_subowner_map.values()) + ] + ) + ) + ) + + async def list_workloads_by_subowner( + self, namespaces: Union[list[str], Literal["*"]], subowner_kind: SubownerLiteral + ) -> list[K8sWorkload]: + kinds = [kind for kind in self.kinds_to_scan if self.kind_subowner_map[kind] == subowner_kind] + + if kinds == []: + return [] + + logger.debug(f"Listing {', '.join(kinds)}") + # NOTE: kube-system is excluded if we scan all namespaces + namespace_selector = ( + ('namespace=~"' + "|".join(namespaces) + '"') if namespaces != "*" else 'namespace!="kube-system"' + ) + + metric_name = f"kube_{subowner_kind.lower()}_owner" + subowner_label = subowner_kind.lower() if subowner_kind != "Job" else "job_name" + + # Replica is for ReplicaSet and/or ReplicationController + subowners = await self.prometheus.loader.query( + f""" + count by (namespace, owner_name, {subowner_label}, owner_kind) ( + {metric_name} {{ + {namespace_selector}, + owner_kind=~"{'|'.join(kinds)}" + {self.cluster_selector} + }} + ) + """ + ) + # groupBy: (namespace, owner_name, owner_kind) => [replicaset,...] + replicas_by_owner = defaultdict(list) + for subowner in subowners: + metric = subowner["metric"] + key = metric["namespace"], metric["owner_name"], metric["owner_kind"] + replicas_by_owner[key].append(metric[subowner_label]) + + return list( + itertools.chain( + *await asyncio.gather( + *[ + self._list_pods_of_subowner( + namespace, + name, + kind, + subowner_kind, + subowners, + ) + for (namespace, name, kind), subowners in replicas_by_owner.items() + ] + ) + ) + ) + + async def _list_pods_of_subowner( + self, namespace: str, name: str, kind: str, subowner_kind: str, subowner_names: list[str] + ) -> list[K8sWorkload]: + pods = await self.prometheus.loader.query( + f""" + count by (namespace, owner_name, owner_kind, pod) ( + kube_pod_owner{{ + namespace="{namespace}", + owner_name=~"{'|'.join(subowner_names)}", + owner_kind="{subowner_kind}" + {self.cluster_selector} + }} + ) + """ + ) + if pods is None or len(pods) == 0: + return [] + + pod_names = [pod["metric"]["pod"] for pod in pods] + containers = await self._list_containers_in_pods(pod_names) + + return [ + K8sWorkload( + cluster=self.prometheus.cluster, + namespace=namespace, + name=name, + kind=kind, + container=container_name, + allocations=await self._parse_allocation(namespace, pod_names, container_name), # find + pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names], # list pods + ) + for container_name in containers + ] diff --git a/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/simple_parent.py b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/simple_parent.py new file mode 100644 index 0000000..233ebb2 --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/simple_parent.py @@ -0,0 +1,85 @@ +import asyncio +from collections import defaultdict +import logging +from typing import Literal, Union + +from robusta_krr.core.models.objects import K8sWorkload, PodData + +from .base import BaseKindLoader + +logger = logging.getLogger("krr") + + +class SimpleParentLoader(BaseKindLoader): + kinds = ["DaemonSet", "StatefulSet", "Job"] + + async def list_workloads(self, namespaces: Union[list[str], Literal["*"]]) -> list[K8sWorkload]: + if self.kinds_to_scan == []: + return [] + + logger.debug(f"Listing {', '.join(self.kinds_to_scan)}") + namespace_selector = ( + ('namespace=~"' + "|".join(namespaces) + '"') if namespaces != "*" else 'namespace!="kube-system"' + ) + + results = await self.prometheus.loader.query( + f""" + count by (namespace, owner_name, owner_kind, pod) ( + kube_pod_owner{{ + {namespace_selector}, + owner_kind=~"{'|'.join(self.kinds_to_scan)}" + {self.cluster_selector} + }} + ) + """ + ) + if results is None or len(results) == 0: + return [] + + # groupBy: (namespace, owner_name, owner_kind) => [pod, ... ] + workloads: defaultdict[tuple[str, str, str], list[str]] = defaultdict(list) + for result in results: + metric = result["metric"] + key = metric["namespace"], metric["owner_name"], metric["owner_kind"] + workloads[key].append(metric["pod"]) + + # NOTE: We do not show jobs that are a part of a cronjob, so we filter them out + job_workloads = [name for (_, name, kind) in workloads if kind == "Job"] + if job_workloads != []: + cronjobs = await self.prometheus.loader.query( + f""" + count by (namespace, job_name) ( + kube_job_owner{{ + {namespace_selector}, + owner_kind="CronJob" + {self.cluster_selector} + }} + ) + """ + ) + for cronjob in cronjobs: + metric = cronjob["metric"] + key = (metric["namespace"], metric["job_name"], "Job") + if key in workloads: + del workloads[key] + + workloads_containers = dict( + zip( + workloads.keys(), + await asyncio.gather(*[self._list_containers_in_pods(pods) for pods in workloads.values()]), + ) + ) + + return [ + K8sWorkload( + cluster=self.prometheus.cluster, + namespace=namespace, + name=name, + kind=kind, + container=container, + allocations=await self._parse_allocation(namespace, pod_names, container), # find + pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names], # list pods + ) + for (namespace, name, kind), pod_names in workloads.items() + for container in workloads_containers[namespace, name, kind] + ] diff --git a/robusta_krr/core/integrations/prometheus/connector.py b/robusta_krr/core/integrations/prometheus/connector.py index 399209a..d75050f 100644 --- a/robusta_krr/core/integrations/prometheus/connector.py +++ b/robusta_krr/core/integrations/prometheus/connector.py @@ -5,7 +5,6 @@ import logging from concurrent.futures import ThreadPoolExecutor from typing import TYPE_CHECKING, Optional -from kubernetes import config as k8s_config from kubernetes.client.api_client import ApiClient from kubernetes.client.exceptions import ApiException from prometrix import MetricsNotFound, PrometheusNotFound @@ -35,58 +34,56 @@ class PrometheusConnector: self.executor = ThreadPoolExecutor(settings.max_workers) self.cluster = cluster - self.api_client = settings.get_kube_client(context=cluster) - loader = self.get_metrics_service(api_client=self.api_client, cluster=cluster) - if loader is None: - raise PrometheusNotFound( - f"Wasn't able to connect to any Prometheus service in {cluster or 'inner'} cluster\n" - "Try using port-forwarding and/or setting the url manually (using the -p flag.).\n" - "For more information, see 'Giving the Explicit Prometheus URL' at " - "https://github.com/robusta-dev/krr?tab=readme-ov-file#usage" - ) - - self.loader = loader - - logger.info(f"{self.loader.name()} connected successfully for {cluster or 'inner'} cluster") - - @classmethod - def discover(cls, api_client: ApiClient) -> PrometheusConnector: - return cls() - @classmethod - def connect(cls, cluster: str) -> PrometheusConnector: - return cls(cluster=cluster) - - def get_metrics_service( - self, - api_client: Optional[ApiClient] = None, - cluster: Optional[str] = None, - ) -> Optional[PrometheusMetricsService]: - if settings.prometheus_url is not None: - logger.info("Prometheus URL is specified, will not auto-detect a metrics service") - metrics_to_check = [PrometheusMetricsService] - else: - logger.info("No Prometheus URL is specified, trying to auto-detect a metrics service") - metrics_to_check = [VictoriaMetricsService, ThanosMetricsService, MimirMetricsService, PrometheusMetricsService] + def discover(self, api_client: ApiClient) -> None: + """Try to automatically discover a Prometheus service.""" + metrics_to_check: list[PrometheusMetricsService] = [ + VictoriaMetricsService, + ThanosMetricsService, + MimirMetricsService, + PrometheusMetricsService, + ] for metric_service_class in metrics_to_check: - service_name = metric_service_class.name() + logger.info(f"Trying to find {metric_service_class.name()}{self._for_cluster_postfix}") try: - loader = metric_service_class(api_client=api_client, cluster=cluster, executor=self.executor) - loader.check_connection() - except MetricsNotFound as e: - logger.info(f"{service_name} not found: {e}") - except ApiException as e: - logger.warning( - f"Unable to automatically discover a {service_name} in the cluster ({e}). " - "Try specifying how to connect to Prometheus via cli options" - ) + loader = metric_service_class.discover(api_client=api_client) + self._connect(loader) + except Exception: + logger.info(f"Wasn't able to find {metric_service_class.name()}{self._for_cluster_postfix}") else: - logger.info(f"{service_name} found") - loader.validate_cluster_name() - return loader + return + + raise PrometheusNotFound - return None + def connect(self, url: Optional[str] = None) -> None: + """Connect to a Prometheus service using a URL.""" + try: + loader = PrometheusMetricsService(url=url) + self._connect(loader) + except Exception as e: + logger.warning(f"Unable to connect to Prometheus using the provided URL ({e})") + raise e + else: + logger.info(f"{loader.name()} connected successfully") + + def _connect(self, loader: PrometheusMetricsService) -> None: + service_name = loader.name() + try: + loader.check_connection() + except MetricsNotFound as e: + logger.info(f"{service_name} not found: {e}") + raise PrometheusNotFound(f"Wasn't able to connect to {service_name}" + self._for_cluster_postfix) + except ApiException as e: + logger.warning( + f"Unable to automatically discover a {service_name}{self._for_cluster_postfix} ({e}). " + "Try specifying how to connect to Prometheus via cli options" + ) + raise e + else: + logger.info(f"{service_name} found") + loader.validate_cluster_name() + self.loader = loader async def get_history_range( self, history_duration: datetime.timedelta @@ -125,3 +122,8 @@ class PrometheusConnector: MetricLoader.__name__: await self.loader.gather_data(object, MetricLoader, period, step) for MetricLoader in strategy.metrics } + + @property + def _for_cluster_postfix(self) -> str: + """The string postfix to be used in logging messages.""" + return f" for {self.cluster} cluster" if self.cluster else "" diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/mimir_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/mimir_metrics_service.py index ea3af57..e11d705 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/mimir_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/mimir_metrics_service.py @@ -7,6 +7,7 @@ from robusta_krr.utils.service_discovery import MetricsServiceDiscovery from .prometheus_metrics_service import PrometheusMetricsService + class MimirMetricsDiscovery(MetricsServiceDiscovery): def find_metrics_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: """ diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py index 4290e64..08fd83f 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py @@ -1,8 +1,11 @@ +from __future__ import annotations + import asyncio import logging from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta from typing import Iterable, List, Optional +from typing_extensions import Self from kubernetes.client import ApiClient from prometheus_api_client import PrometheusApiClientException @@ -59,16 +62,18 @@ class PrometheusMetricsService(MetricsService): url: str, cluster: Optional[str] = None, executor: Optional[ThreadPoolExecutor] = None, + api_client: Optional[ApiClient] = None, ) -> None: self.url = url + self.url_postfix self.cluster = cluster self.executor = executor or ThreadPoolExecutor(settings.max_workers) - - logger.info(f"Trying to connect to {self.name()}" + self._for_cluster_postfix) + self.api_client = api_client self.auth_header = settings.prometheus_auth_header self.ssl_enabled = settings.prometheus_ssl_enabled + logger.info(f"Using {self.name()} at {self.url}" + self._for_cluster_postfix) + if settings.openshift: logging.info("Openshift flag is set, trying to load token from service account.") openshift_token = openshift.load_token() @@ -79,20 +84,6 @@ class PrometheusMetricsService(MetricsService): else: logging.warning("Openshift token is not found, trying to connect without it.") - self.prometheus_discovery = self.service_discovery(api_client=self.api_client) - - self.url = settings.prometheus_url - self.url = self.url or self.prometheus_discovery.find_metrics_url() - - if not self.url: - raise PrometheusNotFound( - f"{self.name()} instance could not be found while scanning" + self._for_cluster_postfix - ) - - self.url += self.url_postfix - - logger.info(f"Using {self.name()} at {self.url}" + self._for_cluster_postfix) - headers = settings.prometheus_other_headers headers |= self.additional_headers @@ -100,9 +91,23 @@ class PrometheusMetricsService(MetricsService): headers |= {"Authorization": self.auth_header} elif not settings.inside_cluster and self.api_client is not None: self.api_client.update_params_for_auth(headers, {}, ["BearerToken"]) + self.prom_config = generate_prometheus_config(url=self.url, headers=headers, metrics_service=self) self.prometheus = get_custom_prometheus_connect(self.prom_config) + @classmethod + def discover( + cls, + api_client: ApiClient, + cluster: Optional[str] = None, + executor: Optional[ThreadPoolExecutor] = None, + ) -> Self: + url = cls.service_discovery(api_client=api_client).find_metrics_url() + if not url: + raise PrometheusNotFound(f"{cls.name()} instance could not be found while scanning") + + return cls(url, cluster, executor, api_client) + def check_connection(self): """ Checks the connection to Prometheus. @@ -249,7 +254,9 @@ class PrometheusMetricsService(MetricsService): }}[{period_literal}] """ ) - pod_owners = {repl_controller["metric"]["replicationcontroller"] for repl_controller in replication_controllers} + pod_owners = { + repl_controller["metric"]["replicationcontroller"] for repl_controller in replication_controllers + } pod_owner_kind = "ReplicationController" del replication_controllers @@ -314,4 +321,4 @@ class PrometheusMetricsService(MetricsService): @property def _for_cluster_postfix(self) -> str: """The string postfix to be used in logging messages.""" - return (f" for {self.cluster} cluster" if self.cluster else "") + return f" for {self.cluster} cluster" if self.cluster else "" diff --git a/robusta_krr/core/models/config.py b/robusta_krr/core/models/config.py index 4b337c7..4f8b506 100644 --- a/robusta_krr/core/models/config.py +++ b/robusta_krr/core/models/config.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +from enum import Enum import sys from typing import Any, Literal, Optional, Union @@ -12,19 +13,24 @@ from rich.logging import RichHandler from robusta_krr.core.abstract import formatters from robusta_krr.core.abstract.strategies import AnyStrategy, BaseStrategy -from robusta_krr.core.integrations.kubernetes.workload_loader.base import BaseClusterLoader +from robusta_krr.core.abstract.cluster_loader import BaseClusterLoader from robusta_krr.core.models.objects import KindLiteral logger = logging.getLogger("krr") +class LoadingMode(str, Enum): + KUBEAPI = "kubeapi" + PROMETHETUS = "prometheus" + + class Config(pd.BaseSettings): quiet: bool = pd.Field(False) verbose: bool = pd.Field(False) clusters: Union[list[str], Literal["*"], None] = None kubeconfig: Optional[str] = None - workload_loader: Literal["kubeapi", "prometheus"] = pd.Field("kubeapi") + mode: LoadingMode = pd.Field(LoadingMode.KUBEAPI) impersonate_user: Optional[str] = None impersonate_group: Optional[str] = None namespaces: Union[list[str], Literal["*"]] = pd.Field("*") @@ -138,19 +144,18 @@ class Config(pd.BaseSettings): self._logging_console = Console(file=sys.stderr if self.log_to_stderr else sys.stdout, width=self.width) return self._logging_console - @property - def cluster_loader(self) -> BaseClusterLoader: - from robusta_krr.core.integrations.kubernetes.workload_loader import ( - KubeAPIClusterLoader, - PrometheusClusterLoader, - ) + def create_cluster_loader(self) -> BaseClusterLoader: + from robusta_krr.core.integrations.prometheus.cluster_loader import PrometheusClusterLoader + from robusta_krr.core.integrations.kubernetes.cluster_loader import KubeAPIClusterLoader - if settings.workload_loader == "kubeapi": + if settings.mode == LoadingMode.KUBEAPI: + logger.info("Connecting using Kubernetes API, will load the kubeconfig.") return KubeAPIClusterLoader() - elif settings.workload_loader == "prometheus": + elif settings.mode == LoadingMode.PROMETHETUS: + logger.info("Connecting using Prometheus, will load the kubeconfig.") return PrometheusClusterLoader() else: - raise NotImplementedError(f"Workload loader {settings.workload_loader} is not implemented") + raise NotImplementedError(f"Workload loader {settings.mode} is not implemented") def load_kubeconfig(self) -> None: try: diff --git a/robusta_krr/core/models/exceptions.py b/robusta_krr/core/models/exceptions.py new file mode 100644 index 0000000..70f3b05 --- /dev/null +++ b/robusta_krr/core/models/exceptions.py @@ -0,0 +1,2 @@ +class CriticalRunnerException(Exception): + """This exception will be raised when a critical error occurs in the runner and the runner cannot continue.""" diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index bb109df..75c2191 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -1,4 +1,5 @@ import asyncio +import itertools import logging import math import os @@ -8,12 +9,13 @@ from typing import Optional, Union from datetime import timedelta from rich.console import Console from slack_sdk import WebClient +from prometrix import PrometheusNotFound from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult -from robusta_krr.core.integrations.kubernetes.workload_loader import IListPodsFallback -from robusta_krr.core.integrations.kubernetes import ClusterConnector +from robusta_krr.core.abstract.workload_loader import IListPodsFallback from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException from robusta_krr.core.models.config import settings +from robusta_krr.core.models.exceptions import CriticalRunnerException from robusta_krr.core.models.objects import K8sWorkload from robusta_krr.core.models.result import ResourceAllocations, ResourceScan, ResourceType, Result, StrategyData from robusta_krr.utils.intro import load_intro_message @@ -33,12 +35,8 @@ def custom_print(*objects, rich: bool = True, force: bool = False) -> None: print_func(*objects) # type: ignore -class CriticalRunnerException(Exception): ... - - class Runner: def __init__(self) -> None: - self.connector = ClusterConnector() self.strategy = settings.create_strategy() self.errors: list[dict] = [] @@ -144,16 +142,18 @@ class Runner: } async def _calculate_object_recommendations(self, object: K8sWorkload) -> Optional[RunResult]: - prometheus_loader = self.connector.get_prometheus(object.cluster) + prometheus = self.connector.get_prometheus(object.cluster) - if prometheus_loader is None: + if prometheus is None: return None - object.pods = await prometheus_loader.load_pods(object, self.strategy.settings.history_timedelta) - if object.pods == [] and isinstance(self.connector, IListPodsFallback): + cluster_loader = self.connector.get_workload_loader(object.cluster) + + object.pods = await prometheus.load_pods(object, self.strategy.settings.history_timedelta) + if object.pods == [] and isinstance(cluster_loader, IListPodsFallback): # Fallback to IListPodsFallback if Prometheus did not return any pods # IListPodsFallback is implemented by the Kubernetes API connector - object.pods = await self.connector.load_pods(object) + object.pods = await cluster_loader.load_pods(object) # NOTE: Kubernetes API returned pods, but Prometheus did not # This might happen with fast executing jobs @@ -164,7 +164,7 @@ class Runner: "Loaded pods from Kubernetes API instead." ) - metrics = await prometheus_loader.gather_data( + metrics = await prometheus.gather_data( object, self.strategy, self.strategy.settings.history_timedelta, @@ -179,10 +179,18 @@ class Runner: logger.info(f"Calculated recommendations for {object} (using {len(metrics)} metrics)") return self._format_result(result) - async def _check_data_availability(self, cluster: Optional[str]) -> None: - prometheus_loader = self.connector.get_prometheus(cluster) - if prometheus_loader is None: - return + async def _check_cluster(self, cluster: Optional[str]) -> bool: + try: + prometheus_loader = self.connector.get_prometheus(cluster) + except PrometheusNotFound: + logger.error( + f"Wasn't able to connect to any Prometheus service" + f' for cluster {cluster}' if cluster is not None else "" + "\nTry using port-forwarding and/or setting the url manually (using the -p flag.).\n" + "For more information, see 'Giving the Explicit Prometheus URL' at " + "https://github.com/robusta-dev/krr?tab=readme-ov-file#usage" + ) + return False try: history_range = await prometheus_loader.get_history_range(timedelta(hours=5)) @@ -195,7 +203,7 @@ class Runner: "name": "HistoryRangeError", } ) - return + return True # We can try to continue without history range logger.debug(f"History range for {cluster}: {history_range}") enough_data = self.strategy.settings.history_range_enough(history_range) @@ -218,6 +226,8 @@ class Runner: } ) + return True + async def _gather_object_allocations(self, k8s_object: K8sWorkload) -> Optional[ResourceScan]: recommendation = await self._calculate_object_recommendations(k8s_object) @@ -248,14 +258,36 @@ class Runner: logger.info(f'Using clusters: {clusters if clusters is not None else "inner cluster"}') + # This is for code clarity. All functions take str | None as cluster parameter if clusters is None: - await self._check_data_availability(None) - else: - await asyncio.gather(*[self._check_data_availability(cluster) for cluster in clusters]) + clusters = [None] + + checks = await asyncio.gather(*[self._check_cluster(cluster) for cluster in clusters]) + clusters = [cluster for cluster, check in zip(clusters, checks) if check] + + if clusters == []: + raise CriticalRunnerException("No clusters available to scan.") + + workload_loaders = {cluster: self.connector.try_get_workload_loader(cluster) for cluster in clusters} + + # NOTE: we filter out None values as they are clusters that we could not connect to + workload_loaders = {cluster: loader for cluster, loader in workload_loaders.items() if loader is not None} + + if workload_loaders == {}: + raise CriticalRunnerException("Could not connect to any cluster.") with ProgressBar(title="Calculating Recommendation") as self.__progressbar: - workloads = await self.connector.list_workloads(clusters) + # We gather all workloads from all clusters in parallel (asyncio.gather) + # Then we chain all workloads together (itertools.chain) + workloads = list( + itertools.chain(*await asyncio.gather(*[loader.list_workloads() for loader in workload_loaders.values()])) + ) + # Then we gather all recommendations for all workloads in parallel (asyncio.gather) scans = await asyncio.gather(*[self._gather_object_allocations(k8s_object) for k8s_object in workloads]) + # NOTE: Previously we were streaming workloads to + # calculate recommendations as soon as they were available (not waiting for all workloads to be loaded), + # but it gave minor performance improvements (most of the time was spent on calculating recommendations) + # So we decided to do those two steps sequentially to simplify the code successful_scans = [scan for scan in scans if scan is not None] @@ -283,13 +315,6 @@ class Runner: """Run the Runner. The return value is the exit code of the program.""" await self._greet() - try: - settings.load_kubeconfig() - except Exception as e: - logger.error(f"Could not load kubernetes configuration: {e}") - logger.error("Try to explicitly set --context and/or --kubeconfig flags.") - return 1 # Exit with error - try: # eks has a lower step limit than other types of prometheus, it will throw an error step_count = self.strategy.settings.history_duration * 60 / self.strategy.settings.timeframe_duration @@ -301,6 +326,8 @@ class Runner: ) self.strategy.settings.timeframe_duration = min_step + self.connector = settings.create_cluster_loader() + result = await self._collect_result() logger.info("Result collected, displaying...") self._process_result(result) diff --git a/robusta_krr/main.py b/robusta_krr/main.py index 810ccd1..6fc0f1f 100644 --- a/robusta_krr/main.py +++ b/robusta_krr/main.py @@ -15,7 +15,7 @@ from typer.models import OptionInfo from robusta_krr import formatters as concrete_formatters # noqa: F401 from robusta_krr.core.abstract import formatters from robusta_krr.core.abstract.strategies import BaseStrategy -from robusta_krr.core.models.config import Config +from robusta_krr.core.models.config import Config, LoadingMode from robusta_krr.core.runner import Runner from robusta_krr.utils.version import get_version @@ -55,10 +55,12 @@ def load_commands() -> None: help="Path to kubeconfig file. If not provided, will attempt to find it.", rich_help_panel="Kubernetes Settings", ), - workload_loader: str = typer.Option( - "kubeapi", - "--workload", - help="Workload loader to use (kubeapi, prometheus).", + mode: LoadingMode = typer.Option( + LoadingMode.KUBEAPI, + "--mode", + "-m", + help="Loading mode. KubeAPI mode requires a kubeconfig and supports auto-discovery. Prometheus mode requires to pass a --prometheus-url.", + case_sensitive=False, rich_help_panel="Kubernetes Settings", ), impersonate_user: Optional[str] = typer.Option( @@ -138,7 +140,7 @@ def load_commands() -> None: None, "--prometheus-cluster-label", "-l", - help="The label in prometheus for your cluster.(Only relevant for centralized prometheus)", + help="The label in prometheus for your cluster. (Only relevant for centralized prometheus)", rich_help_panel="Prometheus Settings", ), prometheus_label: str = typer.Option( @@ -256,7 +258,7 @@ def load_commands() -> None: try: config = Config( kubeconfig=kubeconfig, - workload_loader=workload_loader, + mode=mode, impersonate_user=impersonate_user, impersonate_group=impersonate_group, clusters="*" if all_clusters else clusters, -- cgit v1.2.3