diff options
| author | LeaveMyYard <zhukovpavel2001@gmail.com> | 2024-04-24 16:05:03 +0300 |
|---|---|---|
| committer | LeaveMyYard <zhukovpavel2001@gmail.com> | 2024-04-24 16:05:03 +0300 |
| commit | bf909780f67a58c8647c689a72e2044e53a84e6c (patch) | |
| tree | e543a8667bca4cb713fc8119e6e99e39bc774ba4 /robusta_krr | |
| parent | c7ad1cde09e79d4821383ceb5dd32fd0471d427d (diff) | |
Implement remaining kinds in prometheus workload loader
Diffstat (limited to 'robusta_krr')
16 files changed, 305 insertions, 174 deletions
diff --git a/robusta_krr/core/integrations/kubernetes/__init__.py b/robusta_krr/core/integrations/kubernetes/__init__.py index 2ff4458..99b4e3c 100644 --- a/robusta_krr/core/integrations/kubernetes/__init__.py +++ b/robusta_krr/core/integrations/kubernetes/__init__.py @@ -17,7 +17,7 @@ from kubernetes.client.models import ( V2HorizontalPodAutoscaler, ) -from robusta_krr.core.integrations.prometheus.loader import PrometheusMetricsLoader +from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector from robusta_krr.core.models.config import settings from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData from robusta_krr.core.models.result import ResourceAllocations @@ -37,8 +37,8 @@ class ClusterConnector: EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound) def __init__(self) -> None: - self._metrics_service_loaders: dict[Optional[str], Union[PrometheusMetricsLoader, Exception]] = {} - self._metrics_service_loaders_error_logged: set[Exception] = set() + self._prometheus_connectors: dict[Optional[str], Union[PrometheusConnector, Exception]] = {} + self._connector_errors: set[Exception] = set() async def list_clusters(self) -> Optional[list[str]]: """List all clusters. @@ -80,22 +80,22 @@ class ClusterConnector: return [context["name"] for context in contexts if context["name"] in settings.clusters] - def get_prometheus_loader(self, cluster: Optional[str]) -> Optional[PrometheusMetricsLoader]: - if cluster not in self._metrics_service_loaders: + def get_prometheus(self, cluster: Optional[str]) -> Optional[PrometheusConnector]: + if cluster not in self._prometheus_connectors: try: - self._metrics_service_loaders[cluster] = PrometheusMetricsLoader(cluster=cluster) + logger.debug(f"Creating Prometheus connector for cluster {cluster}") + self._prometheus_connectors[cluster] = PrometheusConnector(cluster=cluster) except Exception as e: - self._metrics_service_loaders[cluster] = e + self._prometheus_connectors[cluster] = e - result = self._metrics_service_loaders[cluster] + result = self._prometheus_connectors[cluster] if isinstance(result, self.EXPECTED_EXCEPTIONS): - if result not in self._metrics_service_loaders_error_logged: - self._metrics_service_loaders_error_logged.add(result) + if result not in self._connector_errors: + self._connector_errors.add(result) logger.error(str(result)) return None elif isinstance(result, Exception): raise result - return result def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[BaseWorkloadLoader]: @@ -103,7 +103,7 @@ class ClusterConnector: if settings.workload_loader == "kubeapi": return KubeAPIWorkloadLoader(cluster=cluster) elif settings.workload_loader == "prometheus": - cluster_loader = self.get_prometheus_loader(cluster) + cluster_loader = self.get_prometheus(cluster) if cluster_loader is not None: return PrometheusWorkloadLoader(cluster=cluster, metric_loader=cluster_loader) else: diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py index 9543147..06dd8c0 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py @@ -1,5 +1,5 @@ -from .base import BaseWorkloadLoader +from .base import BaseWorkloadLoader, IListPodsFallback from .kube_api import KubeAPIWorkloadLoader from .prometheus import PrometheusWorkloadLoader -__all__ = ["BaseWorkloadLoader", "KubeAPIWorkloadLoader", "PrometheusWorkloadLoader"]
\ No newline at end of file +__all__ = ["BaseWorkloadLoader", "IListPodsFallback", "KubeAPIWorkloadLoader", "PrometheusWorkloadLoader"]
\ No newline at end of file diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/base.py index 8581c1a..11a2783 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/base.py +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/base.py @@ -1,13 +1,18 @@ import abc -from typing import Optional from robusta_krr.core.models.objects import K8sWorkload, PodData class BaseWorkloadLoader(abc.ABC): + """A base class for workload loaders.""" + @abc.abstractmethod async def list_workloads(self) -> list[K8sWorkload]: pass + +class IListPodsFallback(abc.ABC): + """This is an interface that a workload loader can implement to have a fallback method to list pods.""" + @abc.abstractmethod async def list_pods(self, object: K8sWorkload) -> list[PodData]: pass diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py index d23f7a4..a7470d7 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py @@ -12,7 +12,7 @@ from robusta_krr.core.models.config import settings from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData from robusta_krr.core.models.result import ResourceAllocations -from ..base import BaseWorkloadLoader +from ..base import BaseWorkloadLoader, IListPodsFallback from .loaders import ( BaseKindLoader, CronJobLoader, @@ -29,7 +29,7 @@ logger = logging.getLogger("krr") HPAKey = tuple[str, str, str] -class KubeAPIWorkloadLoader(BaseWorkloadLoader): +class KubeAPIWorkloadLoader(BaseWorkloadLoader, IListPodsFallback): workload_loaders: list[BaseKindLoader] = [ DeploymentLoader, RolloutLoader, @@ -176,10 +176,12 @@ class KubeAPIWorkloadLoader(BaseWorkloadLoader): res = await self._list_namespaced_or_global_objects( kind="HPA-v1", all_namespaces_request=lambda **kwargs: loop.run_in_executor( - self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs), + self.executor, + lambda: self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs), ), namespaced_request=lambda **kwargs: loop.run_in_executor( - self.autoscaling_v1.list_namespaced_horizontal_pod_autoscaler(**kwargs), + self.executor, + lambda: self.autoscaling_v1.list_namespaced_horizontal_pod_autoscaler(**kwargs), ), ) @@ -205,10 +207,12 @@ class KubeAPIWorkloadLoader(BaseWorkloadLoader): res = await self._list_namespaced_or_global_objects( kind="HPA-v2", all_namespaces_request=lambda **kwargs: loop.run_in_executor( - self.executor, self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs) + self.executor, + lambda: self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs), ), namespaced_request=lambda **kwargs: loop.run_in_executor( - self.autoscaling_v2.list_namespaced_horizontal_pod_autoscaler(**kwargs), + self.executor, + lambda: self.autoscaling_v2.list_namespaced_horizontal_pod_autoscaler(**kwargs), ), ) diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py index b277bc2..bfe6853 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py @@ -2,30 +2,36 @@ import asyncio import itertools import logging +from collections import Counter -from robusta_krr.core.integrations.prometheus.loader import PrometheusMetricsLoader + +from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector from robusta_krr.core.models.config import settings -from robusta_krr.core.integrations.prometheus.metrics_service.prometheus_metrics_service import PrometheusMetricsService -from robusta_krr.core.models.objects import K8sWorkload, PodData +from robusta_krr.core.models.objects import K8sWorkload from ..base import BaseWorkloadLoader -from .loaders import BaseKindLoader, DeploymentLoader +from .loaders import BaseKindLoader, DoubleParentLoader, SimpleParentLoader logger = logging.getLogger("krr") class PrometheusWorkloadLoader(BaseWorkloadLoader): - workloads: list[type[BaseKindLoader]] = [DeploymentLoader] + workloads: list[type[BaseKindLoader]] = [DoubleParentLoader, SimpleParentLoader] - def __init__(self, cluster: str, metric_loader: PrometheusMetricsLoader) -> None: + def __init__(self, cluster: str, metric_loader: PrometheusConnector) -> None: self.cluster = cluster self.metric_service = metric_loader self.loaders = [loader(metric_loader) for loader in self.workloads] async def list_workloads(self) -> list[K8sWorkload]: - return itertools.chain(await asyncio.gather(*[loader.list_workloads(settings.namespaces, "") for loader in self.loaders])) + workloads = list( + itertools.chain( + *await asyncio.gather(*[loader.list_workloads(settings.namespaces) for loader in self.loaders]) + ) + ) + + kind_counts = Counter([workload.kind for workload in workloads]) + for kind, count in kind_counts.items(): + logger.info(f"Found {count} {kind} in {self.cluster}") - async def list_pods(self, object: K8sWorkload) -> list[PodData]: - # This should not be implemented, as implementation will repeat PrometheusMetricsLoader.load_pods - # As this method is ment to be a fallback, repeating the same logic will not be beneficial - raise NotImplementedError + return workloads diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py index 6ca6efd..18b9a3d 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py @@ -1,19 +1,9 @@ from .base import BaseKindLoader -from .cronjobs import CronJobLoader -from .daemonsets import DaemonSetLoader -from .deploymentconfigs import DeploymentConfigLoader -from .deployments import DeploymentLoader -from .jobs import JobLoader -from .rollouts import RolloutLoader -from .statefulsets import StatefulSetLoader +from .double_parent import DoubleParentLoader +from .simple_parent import SimpleParentLoader __all__ = [ "BaseKindLoader", - "CronJobLoader", - "DeploymentLoader", - "DaemonSetLoader", - "DeploymentConfigLoader", - "JobLoader", - "RolloutLoader", - "StatefulSetLoader", + "DoubleParentLoader", + "SimpleParentLoader", ]
\ No newline at end of file diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py index a767eb2..4367d80 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py @@ -1,5 +1,6 @@ import abc import asyncio +from collections import defaultdict import logging from concurrent.futures import ThreadPoolExecutor from typing import Any, Iterable, Literal, Optional, Union @@ -15,8 +16,10 @@ from kubernetes.client.models import ( # type: ignore V1PodList, V1StatefulSet, ) +from robusta_krr.core.models.config import settings -from robusta_krr.core.integrations.prometheus.loader import PrometheusMetricsLoader +from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector +from robusta_krr.core.integrations.prometheus.metrics.base import PrometheusMetric from robusta_krr.core.models.allocations import RecommendationValue, ResourceAllocations, ResourceType from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData @@ -32,29 +35,44 @@ class BaseKindLoader(abc.ABC): It does not load the objects itself, but is used by the `KubeAPIWorkloadLoader` to load objects. """ - kind: KindLiteral + kinds: list[KindLiteral] = [] - def __init__(self, metrics_loader: PrometheusMetricsLoader) -> None: - self.metrics_loader = metrics_loader + def __init__(self, connector: PrometheusConnector) -> None: + self.connector = connector + self.cluster_selector = PrometheusMetric.get_prometheus_cluster_label() + + @property + def kinds_to_scan(self) -> list[KindLiteral]: + return [kind for kind in self.kinds if kind in settings.resources] if settings.resources != "*" else self.kinds @abc.abstractmethod - def list_workloads(self, namespaces: Union[list[str], Literal["*"]], label_selector: str) -> list[K8sWorkload]: + def list_workloads(self, namespaces: Union[list[str], Literal["*"]]) -> list[K8sWorkload]: pass - async def __parse_allocation(self, namespace: str, pod_selector: str, container_name: str) -> ResourceAllocations: - limits = await self.metrics_loader.loader.query( - "avg by(resource) (kube_pod_container_resource_limits{" - f'namespace="{namespace}", ' - f'pod=~"{pod_selector}", ' - f'container="{container_name}"' - "})" + async def _parse_allocation(self, namespace: str, pods: list[str], container_name: str) -> ResourceAllocations: + limits = await self.connector.loader.query( + f""" + avg by(resource) ( + kube_pod_container_resource_limits{{ + {self.cluster_selector} + namespace="{namespace}", + pod=~"{'|'.join(pods)}", + container="{container_name}" + }} + ) + """ ) - requests = await self.metrics_loader.loader.query( - "avg by(resource) (kube_pod_container_resource_requests{" - f'namespace="{namespace}", ' - f'pod=~"{pod_selector}", ' - f'container="{container_name}"' - "})" + requests = await self.connector.loader.query( + f""" + avg by(resource) ( + kube_pod_container_resource_requests{{ + {self.cluster_selector} + namespace="{namespace}", + pod=~"{'|'.join(pods)}", + container="{container_name}" + }} + ) + """ ) requests_values: dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None} limits_values: dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None} @@ -71,56 +89,16 @@ class BaseKindLoader(abc.ABC): requests_values[ResourceType.Memory] = float(request["value"][1]) return ResourceAllocations(requests=requests_values, limits=limits_values) - async def __build_from_owner( - self, namespace: str, app_name: str, containers: list[str], pod_names: list[str] - ) -> list[K8sWorkload]: - return [ - K8sWorkload( - cluster=None, - namespace=namespace, - name=app_name, - kind="Deployment", - container=container_name, - allocations=await self.__parse_allocation(namespace, "|".join(pod_names), container_name), # find - pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names], # list pods - ) - for container_name in containers - ] - - async def _list_containers(self, namespace: str, pod_selector: str) -> list[str]: - containers = await self.metrics_loader.loader.query( + async def _list_containers_in_pods(self, pods: list[str]) -> set[str]: + containers = await self.connector.loader.query( f""" count by (container) ( kube_pod_container_info{{ - namespace="{namespace}", - pod=~"{pod_selector}" + {self.cluster_selector} + pod=~"{'|'.join(pods)}" }} ) """ ) - return [container["metric"]["container"] for container in containers] - async def _list_containers_in_pods( - self, app_name: str, pod_owner_kind: str, namespace: str, owner_name: str - ) -> list[K8sWorkload]: - if pod_owner_kind == "ReplicaSet": - # owner_name is ReplicaSet names - pods = await self.metrics_loader.loader.query( - f""" - count by (owner_name, replicaset, pod) ( - kube_pod_owner{{ - namespace="{namespace}", - owner_name=~"{owner_name}", ' - owner_kind="ReplicaSet" - }} - ) - """ - ) - if pods is None or len(pods) == 0: - return [] # no container - # [{'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-jqt4x'}, 'value': [1685529217, '1']}, - # {'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-lj9qg'}, 'value': [1685529217, '1']}] - pod_names = [pod["metric"]["pod"] for pod in pods] - container_names = await self._list_containers(namespace, "|".join(pod_names)) - return await self.__build_from_owner(namespace, app_name, container_names, pod_names) - return [] + return {container["metric"]["container"] for container in containers} diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/deployments.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/deployments.py deleted file mode 100644 index c895d21..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/deployments.py +++ /dev/null @@ -1,50 +0,0 @@ -import logging -from collections import defaultdict -import itertools -import asyncio -from typing import Literal, Optional, Union - -from robusta_krr.core.models.objects import K8sWorkload - -from .base import BaseKindLoader - -logger = logging.getLogger("krr") - - -class DeploymentLoader(BaseKindLoader): - kind = "Deployment" - - async def list_workloads(self, namespaces: Union[list[str], Literal["*"]], label_selector: str) -> list[K8sWorkload]: - logger.debug( - f"Listing deployments in namespace({namespaces})" - ) - ns = "|".join(namespaces) - replicasets = await self.metrics_loader.loader.query( - f""" - count by (namespace, owner_name, replicaset) ( - kube_replicaset_owner{{ - namespace=~"{ns}", - owner_kind="Deployment", - }} - ) - """ - ) - # groupBy: 'ns/owner_name' => [{metadata}...] - pod_owner_kind = "ReplicaSet" - replicaset_dict = defaultdict(list) - for replicaset in replicasets: - replicaset_dict[replicaset["metric"]["namespace"] + "/" + replicaset["metric"]["owner_name"]].append( - replicaset["metric"] - ) - objects = await asyncio.gather( - *[ - self._list_containers_in_pods( - replicas[0]["owner_name"], - pod_owner_kind, - replicas[0]["namespace"], - "|".join(list(map(lambda metric: metric["replicaset"], replicas))), - ) - for replicas in replicaset_dict.values() - ] - ) - return list(itertools.chain(*objects)) diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py new file mode 100644 index 0000000..b4ce13e --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py @@ -0,0 +1,123 @@ +import logging +from collections import defaultdict +import itertools +import asyncio +from typing import Literal, Union + +from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData + +from .base import BaseKindLoader + +logger = logging.getLogger("krr") + +SubownerLiteral = Literal["ReplicaSet", "ReplicationController", "Job"] + + +class DoubleParentLoader(BaseKindLoader): + kinds = ["Deployment", "Rollout", "DeploymentConfig", "CronJob"] + + kind_subowner_map: dict[KindLiteral, SubownerLiteral] = { + "Deployment": "ReplicaSet", + "Rollout": "ReplicaSet", + "DeploymentConfig": "ReplicationController", + "CronJob": "Job", + } + + async def list_workloads(self, namespaces: Union[list[str], Literal["*"]]) -> list[K8sWorkload]: + return list( + itertools.chain( + *await asyncio.gather( + *[ + self.list_workloads_by_subowner(namespaces, subowner) + for subowner in set(self.kind_subowner_map.values()) + ] + ) + ) + ) + + async def list_workloads_by_subowner( + self, namespaces: Union[list[str], Literal["*"]], subowner_kind: SubownerLiteral + ) -> list[K8sWorkload]: + kinds = [kind for kind in self.kinds_to_scan if self.kind_subowner_map[kind] == subowner_kind] + + if kinds == []: + return [] + + logger.debug(f"Listing {', '.join(kinds)}") + # NOTE: kube-system is excluded if we scan all namespaces + namespace_selector = ( + ('namespace=~"' + "|".join(namespaces) + '"') if namespaces != "*" else 'namespace!="kube-system"' + ) + + metric_name = f"kube_{subowner_kind.lower()}_owner" + subowner_label = subowner_kind.lower() if subowner_kind != "Job" else "job_name" + + # Replica is for ReplicaSet and/or ReplicationController + subowners = await self.connector.loader.query( + f""" + count by (namespace, owner_name, {subowner_label}, owner_kind) ( + {metric_name} {{ + {self.cluster_selector} + {namespace_selector}, + owner_kind=~"{'|'.join(kinds)}" + }} + ) + """ + ) + # groupBy: (namespace, owner_name, owner_kind) => [replicaset,...] + replicas_by_owner = defaultdict(list) + for subowner in subowners: + metric = subowner["metric"] + key = metric["namespace"], metric["owner_name"], metric["owner_kind"] + replicas_by_owner[key].append(metric[subowner_label]) + + return list( + itertools.chain( + *await asyncio.gather( + *[ + self._list_pods_of_subowner( + namespace, + name, + kind, + subowner_kind, + subowners, + ) + for (namespace, name, kind), subowners in replicas_by_owner.items() + ] + ) + ) + ) + + async def _list_pods_of_subowner( + self, namespace: str, name: str, kind: str, subowner_kind: str, subowner_names: list[str] + ) -> list[K8sWorkload]: + pods = await self.connector.loader.query( + f""" + count by (namespace, owner_name, owner_kind, pod) ( + kube_pod_owner{{ + {self.cluster_selector} + namespace="{namespace}", + owner_name=~"{'|'.join(subowner_names)}", + owner_kind="{subowner_kind}" + }} + ) + """ + ) + if pods is None or len(pods) == 0: + return [] + + pod_names = [pod["metric"]["pod"] for pod in pods] + containers = await self._list_containers_in_pods(pod_names) + + return [ + K8sWorkload( + cluster=self.connector.cluster, + namespace=namespace, + name=name, + kind=kind, + container=container_name, + allocations=await self._parse_allocation(namespace, pod_names, container_name), # find + pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names], # list pods + ) + for container_name in containers + ] diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py new file mode 100644 index 0000000..33a8c5a --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py @@ -0,0 +1,65 @@ +import asyncio +from collections import defaultdict +import logging +from typing import Literal, Union + +from robusta_krr.core.models.objects import K8sWorkload, PodData + +from .base import BaseKindLoader + +logger = logging.getLogger("krr") + + +class SimpleParentLoader(BaseKindLoader): + kinds = ["DaemonSet", "StatefulSet", "Job"] + + async def list_workloads(self, namespaces: Union[list[str], Literal["*"]]) -> list[K8sWorkload]: + if self.kinds_to_scan == []: + return [] + + logger.debug(f"Listing {', '.join(self.kinds_to_scan)}") + namespace_selector = ( + ('namespace=~"' + "|".join(namespaces) + '"') if namespaces != "*" else 'namespace!="kube-system"' + ) + + results = await self.connector.loader.query( + f""" + count by (namespace, owner_name, owner_kind, pod) ( + kube_pod_owner{{ + {self.cluster_selector} + {namespace_selector}, + owner_kind=~"{'|'.join(self.kinds_to_scan)}" + }} + ) + """ + ) + if results is None or len(results) == 0: + return [] + + # groupBy: (namespace, owner_name, owner_kind) => [pod, ... ] + workloads: defaultdict[tuple[str, str, str], list[str]] = defaultdict(list) + for result in results: + metric = result["metric"] + key = metric["namespace"], metric["owner_name"], metric["owner_kind"] + workloads[key].append(metric["pod"]) + + workloads_containers = dict( + zip( + workloads.keys(), + await asyncio.gather(*[self._list_containers_in_pods(pods) for pods in workloads.values()]), + ) + ) + + return [ + K8sWorkload( + cluster=self.connector.cluster, + namespace=namespace, + name=name, + kind=kind, + container=container, + allocations=await self._parse_allocation(namespace, pod_names, container), # find + pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names], # list pods + ) + for (namespace, name, kind), pod_names in workloads.items() + for container in workloads_containers[namespace, name, kind] + ] diff --git a/robusta_krr/core/integrations/prometheus/__init__.py b/robusta_krr/core/integrations/prometheus/__init__.py index cedf1c0..1ce9d2a 100644 --- a/robusta_krr/core/integrations/prometheus/__init__.py +++ b/robusta_krr/core/integrations/prometheus/__init__.py @@ -1,3 +1,3 @@ -from .loader import PrometheusMetricsLoader +from .connector import PrometheusConnector from .metrics_service.prometheus_metrics_service import PrometheusDiscovery, PrometheusNotFound from .prometheus_utils import ClusterNotSpecifiedException diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/connector.py index eb734ef..ab89b34 100644 --- a/robusta_krr/core/integrations/prometheus/loader.py +++ b/robusta_krr/core/integrations/prometheus/connector.py @@ -24,7 +24,7 @@ if TYPE_CHECKING: logger = logging.getLogger("krr") -class PrometheusMetricsLoader: +class PrometheusConnector: def __init__(self, *, cluster: Optional[str] = None) -> None: """ Initializes the Prometheus Loader. @@ -34,6 +34,7 @@ class PrometheusMetricsLoader: """ self.executor = ThreadPoolExecutor(settings.max_workers) + self.cluster = cluster self.api_client = settings.get_kube_client(context=cluster) loader = self.get_metrics_service(api_client=self.api_client, cluster=cluster) if loader is None: @@ -46,7 +47,7 @@ class PrometheusMetricsLoader: self.loader = loader - logger.info(f"{self.loader.name()} connected successfully for {cluster or 'default'} cluster") + logger.info(f"{self.loader.name()} connected successfully for {cluster or 'inner'} cluster") def get_metrics_service( self, diff --git a/robusta_krr/core/integrations/prometheus/metrics/base.py b/robusta_krr/core/integrations/prometheus/metrics/base.py index d55af00..34957ef 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/base.py +++ b/robusta_krr/core/integrations/prometheus/metrics/base.py @@ -74,7 +74,8 @@ class PrometheusMetric(BaseMetric): if self.pods_batch_size is not None and self.pods_batch_size <= 0: raise ValueError("pods_batch_size must be positive") - def get_prometheus_cluster_label(self) -> str: + @staticmethod + def get_prometheus_cluster_label() -> str: """ Generates the cluster label for querying a centralized Prometheus diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py index 5347422..50b86a6 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py @@ -112,10 +112,14 @@ class PrometheusMetricsService(MetricsService): async def query(self, query: str) -> dict: loop = asyncio.get_running_loop() - return await loop.run_in_executor( - self.executor, - lambda: self.prometheus.safe_custom_query(query=query)["result"], - ) + try: + return await loop.run_in_executor( + self.executor, + lambda: self.prometheus.safe_custom_query(query=query)["result"], + ) + except PrometheusApiClientException as e: + logger.error(f"Error while querying Prometheus: {query}") + raise e async def query_range(self, query: str, start: datetime, end: datetime, step: timedelta) -> dict: loop = asyncio.get_running_loop() diff --git a/robusta_krr/core/models/objects.py b/robusta_krr/core/models/objects.py index 5dac123..27f1c29 100644 --- a/robusta_krr/core/models/objects.py +++ b/robusta_krr/core/models/objects.py @@ -52,6 +52,9 @@ class K8sWorkload(pd.BaseModel): def __str__(self) -> str: return f"{self.kind} {self.namespace}/{self.name}/{self.container}" + def __repr__(self) -> str: + return f"<K8sWorkload {self}>" + def __hash__(self) -> int: return hash(str(self)) diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index 528fd89..bb109df 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -6,13 +6,13 @@ import warnings from concurrent.futures import ThreadPoolExecutor from typing import Optional, Union from datetime import timedelta -from prometrix import PrometheusNotFound from rich.console import Console from slack_sdk import WebClient from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult -from robusta_krr.core.integrations.kubernetes import ClusterConnector, KubeAPIWorkloadLoader -from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException, PrometheusMetricsLoader +from robusta_krr.core.integrations.kubernetes.workload_loader import IListPodsFallback +from robusta_krr.core.integrations.kubernetes import ClusterConnector +from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException from robusta_krr.core.models.config import settings from robusta_krr.core.models.objects import K8sWorkload from robusta_krr.core.models.result import ResourceAllocations, ResourceScan, ResourceType, Result, StrategyData @@ -144,14 +144,15 @@ class Runner: } async def _calculate_object_recommendations(self, object: K8sWorkload) -> Optional[RunResult]: - prometheus_loader = self._get_prometheus_loader(object.cluster) + prometheus_loader = self.connector.get_prometheus(object.cluster) if prometheus_loader is None: return None object.pods = await prometheus_loader.load_pods(object, self.strategy.settings.history_timedelta) - if object.pods == [] and isinstance(self.connector, KubeAPIWorkloadLoader): - # Fallback to Kubernetes API + if object.pods == [] and isinstance(self.connector, IListPodsFallback): + # Fallback to IListPodsFallback if Prometheus did not return any pods + # IListPodsFallback is implemented by the Kubernetes API connector object.pods = await self.connector.load_pods(object) # NOTE: Kubernetes API returned pods, but Prometheus did not @@ -179,7 +180,7 @@ class Runner: return self._format_result(result) async def _check_data_availability(self, cluster: Optional[str]) -> None: - prometheus_loader = self.connector.get_prometheus_loader(cluster) + prometheus_loader = self.connector.get_prometheus(cluster) if prometheus_loader is None: return |
