From 7b6be353ff7108c5a23b61ea1b5bd238b51e9d0f Mon Sep 17 00:00:00 2001 From: LeaveMyYard Date: Thu, 2 May 2024 18:33:14 +0300 Subject: Rework ckyster selector for prometheus mode --- robusta_krr/core/abstract/strategies.py | 1 - .../prometheus/cluster_loader/__init__.py | 28 +++++++++--------- .../prometheus/cluster_loader/loaders/base.py | 33 ++++++++++++---------- .../cluster_loader/loaders/double_parent.py | 6 ++-- .../cluster_loader/loaders/simple_parent.py | 6 ++-- .../core/integrations/prometheus/metrics/base.py | 12 -------- .../core/integrations/prometheus/metrics/cpu.py | 13 ++++----- .../core/integrations/prometheus/metrics/memory.py | 14 ++++----- robusta_krr/core/models/objects.py | 12 ++++++++ 9 files changed, 61 insertions(+), 64 deletions(-) diff --git a/robusta_krr/core/abstract/strategies.py b/robusta_krr/core/abstract/strategies.py index 388595f..3f64002 100644 --- a/robusta_krr/core/abstract/strategies.py +++ b/robusta_krr/core/abstract/strategies.py @@ -12,7 +12,6 @@ from numpy.typing import NDArray from robusta_krr.core.models.result import K8sWorkload, ResourceType if TYPE_CHECKING: - from robusta_krr.core.abstract.metrics import BaseMetric # noqa: F401 from robusta_krr.core.integrations.prometheus.metrics import PrometheusMetric SelfRR = TypeVar("SelfRR", bound="ResourceRecommendation") diff --git a/robusta_krr/core/integrations/prometheus/cluster_loader/__init__.py b/robusta_krr/core/integrations/prometheus/cluster_loader/__init__.py index 5ac42eb..5c88dc7 100644 --- a/robusta_krr/core/integrations/prometheus/cluster_loader/__init__.py +++ b/robusta_krr/core/integrations/prometheus/cluster_loader/__init__.py @@ -26,7 +26,7 @@ class PrometheusClusterLoader(BaseClusterLoader): # NOTE: For PrometheusClusterLoader we have to first connect to Prometheus, as we query all data from it def __init__(self) -> None: - self._prometheus_connector = PrometheusConnector() + self.prometheus = PrometheusConnector() if not settings.prometheus_url: raise CriticalRunnerException( "Prometheus URL is not provided. " @@ -34,33 +34,33 @@ class PrometheusClusterLoader(BaseClusterLoader): "Please provide the URL with `--prometheus-url` flag." ) - self._prometheus_connector.connect(settings.prometheus_url) + self.prometheus.connect(settings.prometheus_url) async def list_clusters(self) -> Optional[list[str]]: - if settings.prometheus_label is None: + if settings.prometheus_cluster_label is None: logger.info("Assuming that Prometheus contains only one cluster.") logger.info("If you have multiple clusters in Prometheus, please provide the `-l` flag.") return None clusters = await self.prometheus.loader.query( f""" - avg by({settings.prometheus_label}) ( + avg by({settings.prometheus_cluster_label}) ( kube_pod_container_resource_limits ) """ ) - return [cluster["metric"][settings.prometheus_label] for cluster in clusters["data"]["result"]] + return [cluster["metric"][settings.prometheus_cluster_label] for cluster in clusters] @cache def get_workload_loader(self, cluster: str) -> PrometheusWorkloadLoader: - return PrometheusWorkloadLoader(cluster, self._prometheus_connector) + return PrometheusWorkloadLoader(cluster, self.prometheus) def get_prometheus(self, cluster: Optional[str]) -> PrometheusConnector: # NOTE: With prometheus workload loader we can only have one Prometheus provided in parameters # so in case of multiple clusters in one Prometheus (centralized version) # for each cluster we will have the same PrometheusConnector (keyed by None) - return self._prometheus_connector + return self.prometheus class PrometheusWorkloadLoader(BaseWorkloadLoader): @@ -69,9 +69,7 @@ class PrometheusWorkloadLoader(BaseWorkloadLoader): def __init__(self, cluster: str, prometheus: PrometheusConnector) -> None: self.cluster = cluster self.prometheus = prometheus - self.loaders = [loader(prometheus) for loader in self.workloads] - - self.cluster_selector = PrometheusMetric.get_prometheus_cluster_label() + self.loaders = [loader(cluster, prometheus) for loader in self.workloads] async def list_workloads(self) -> list[K8sWorkload]: workloads = list( @@ -98,11 +96,13 @@ class PrometheusWorkloadLoader(BaseWorkloadLoader): return workloads async def __list_hpa(self) -> dict[HPAKey, HPAData]: + cluster_selector = f"{settings.prometheus_label}={self.cluster}" if settings.prometheus_label else "" + hpa_metrics, max_replicas, min_replicas, target_metrics = await asyncio.gather( - self.prometheus.loader.query("kube_horizontalpodautoscaler_info"), - self.prometheus.loader.query("kube_horizontalpodautoscaler_spec_max_replicas"), - self.prometheus.loader.query("kube_horizontalpodautoscaler_spec_min_replicas"), - self.prometheus.loader.query("kube_horizontalpodautoscaler_spec_target_metric"), + self.prometheus.loader.query("kube_horizontalpodautoscaler_info" + cluster_selector), + self.prometheus.loader.query("kube_horizontalpodautoscaler_spec_max_replicas" + cluster_selector), + self.prometheus.loader.query("kube_horizontalpodautoscaler_spec_min_replicas" + cluster_selector), + self.prometheus.loader.query("kube_horizontalpodautoscaler_spec_target_metric" + cluster_selector), ) max_replicas_dict = { diff --git a/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/base.py b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/base.py index 9534261..cc91a3f 100644 --- a/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/base.py +++ b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/base.py @@ -1,27 +1,20 @@ import abc -import asyncio -from collections import defaultdict import logging -from concurrent.futures import ThreadPoolExecutor -from typing import Any, Iterable, Literal, Optional, Union -from kubernetes import client # type: ignore -from kubernetes.client.api_client import ApiClient # type: ignore +from typing import Literal, Union + from kubernetes.client.models import ( # type: ignore - V1Container, V1DaemonSet, V1Deployment, V1Job, V1Pod, - V1PodList, V1StatefulSet, ) from robusta_krr.core.models.config import settings from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector -from robusta_krr.core.integrations.prometheus.metrics.base import PrometheusMetric from robusta_krr.core.models.allocations import RecommendationValue, ResourceAllocations, ResourceType -from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData +from robusta_krr.core.models.objects import K8sWorkload, KindLiteral logger = logging.getLogger("krr") @@ -37,14 +30,24 @@ class BaseKindLoader(abc.ABC): kinds: list[KindLiteral] = [] - def __init__(self, prometheus: PrometheusConnector) -> None: + def __init__(self, cluster: str, prometheus: PrometheusConnector) -> None: + self.cluster = cluster self.prometheus = prometheus - self.cluster_selector = PrometheusMetric.get_prometheus_cluster_label() @property def kinds_to_scan(self) -> list[KindLiteral]: return [kind for kind in self.kinds if kind in settings.resources] if settings.resources != "*" else self.kinds + @property + def cluster_selector(self) -> str: + if settings.prometheus_label is not None: + return f'{settings.prometheus_cluster_label}="{settings.prometheus_label}",' + + if settings.prometheus_cluster_label is None: + return "" + + return f'{settings.prometheus_cluster_label}="{self.cluster}",' if self.cluster else "" + @abc.abstractmethod def list_workloads(self, namespaces: Union[list[str], Literal["*"]]) -> list[K8sWorkload]: pass @@ -54,10 +57,10 @@ class BaseKindLoader(abc.ABC): f""" avg by(resource) ( kube_pod_container_resource_limits{{ + {self.cluster_selector} namespace="{namespace}", pod=~"{'|'.join(pods)}", container="{container_name}" - {self.cluster_selector} }} ) """ @@ -66,10 +69,10 @@ class BaseKindLoader(abc.ABC): f""" avg by(resource) ( kube_pod_container_resource_requests{{ + {self.cluster_selector} namespace="{namespace}", pod=~"{'|'.join(pods)}", container="{container_name}" - {self.cluster_selector} }} ) """ @@ -94,8 +97,8 @@ class BaseKindLoader(abc.ABC): f""" count by (container) ( kube_pod_container_info{{ - pod=~"{'|'.join(pods)}" {self.cluster_selector} + pod=~"{'|'.join(pods)}" }} ) """ diff --git a/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/double_parent.py b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/double_parent.py index e757af4..942d3fa 100644 --- a/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/double_parent.py +++ b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/double_parent.py @@ -57,9 +57,9 @@ class DoubleParentLoader(BaseKindLoader): f""" count by (namespace, owner_name, {subowner_label}, owner_kind) ( {metric_name} {{ + {self.cluster_selector} {namespace_selector}, owner_kind=~"{'|'.join(kinds)}" - {self.cluster_selector} }} ) """ @@ -95,10 +95,10 @@ class DoubleParentLoader(BaseKindLoader): f""" count by (namespace, owner_name, owner_kind, pod) ( kube_pod_owner{{ + {self.cluster_selector} namespace="{namespace}", owner_name=~"{'|'.join(subowner_names)}", owner_kind="{subowner_kind}" - {self.cluster_selector} }} ) """ @@ -111,7 +111,7 @@ class DoubleParentLoader(BaseKindLoader): return [ K8sWorkload( - cluster=self.prometheus.cluster, + cluster=self.cluster, namespace=namespace, name=name, kind=kind, diff --git a/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/simple_parent.py b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/simple_parent.py index 233ebb2..00d3023 100644 --- a/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/simple_parent.py +++ b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/simple_parent.py @@ -26,9 +26,9 @@ class SimpleParentLoader(BaseKindLoader): f""" count by (namespace, owner_name, owner_kind, pod) ( kube_pod_owner{{ + {self.cluster_selector} {namespace_selector}, owner_kind=~"{'|'.join(self.kinds_to_scan)}" - {self.cluster_selector} }} ) """ @@ -50,9 +50,9 @@ class SimpleParentLoader(BaseKindLoader): f""" count by (namespace, job_name) ( kube_job_owner{{ + {self.cluster_selector} {namespace_selector}, owner_kind="CronJob" - {self.cluster_selector} }} ) """ @@ -72,7 +72,7 @@ class SimpleParentLoader(BaseKindLoader): return [ K8sWorkload( - cluster=self.prometheus.cluster, + cluster=self.cluster, namespace=namespace, name=name, kind=kind, diff --git a/robusta_krr/core/integrations/prometheus/metrics/base.py b/robusta_krr/core/integrations/prometheus/metrics/base.py index 807a11f..67266dd 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/base.py +++ b/robusta_krr/core/integrations/prometheus/metrics/base.py @@ -75,18 +75,6 @@ class PrometheusMetric(BaseMetric): if self.pods_batch_size is not None and self.pods_batch_size <= 0: raise ValueError("pods_batch_size must be positive") - @staticmethod - def get_prometheus_cluster_label() -> str: - """ - Generates the cluster label for querying a centralized Prometheus - - Returns: - str: a promql safe label string for querying the cluster. - """ - if settings.prometheus_cluster_label is None: - return "" - return f', {settings.prometheus_label}="{settings.prometheus_cluster_label}"' - @abc.abstractmethod def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: """ diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu.py b/robusta_krr/core/integrations/prometheus/metrics/cpu.py index 7257654..b30a4d9 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/cpu.py +++ b/robusta_krr/core/integrations/prometheus/metrics/cpu.py @@ -12,15 +12,14 @@ class CPULoader(PrometheusMetric): def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) - cluster_label = self.get_prometheus_cluster_label() return f""" max( rate( container_cpu_usage_seconds_total{{ + {object.cluster_selector} namespace="{object.namespace}", pod=~"{pods_selector}", container="{object.container}" - {cluster_label} }}[{step}] ) ) by (container, pod, job) @@ -38,17 +37,16 @@ def PercentileCPULoader(percentile: float) -> type[PrometheusMetric]: class PercentileCPULoader(PrometheusMetric): def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) - cluster_label = self.get_prometheus_cluster_label() return f""" quantile_over_time( {round(percentile / 100, 2)}, max( rate( container_cpu_usage_seconds_total{{ + {object.cluster_selector} namespace="{object.namespace}", pod=~"{pods_selector}", container="{object.container}" - {cluster_label} }}[{step}] ) ) by (container, pod, job) @@ -66,17 +64,18 @@ class CPUAmountLoader(PrometheusMetric): def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) - cluster_label = self.get_prometheus_cluster_label() - return f""" + res = f""" count_over_time( max( container_cpu_usage_seconds_total{{ + {object.cluster_selector} namespace="{object.namespace}", pod=~"{pods_selector}", container="{object.container}" - {cluster_label} }} ) by (container, pod, job) [{duration}:{step}] ) """ + + raise(Exception(res)) diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory.py b/robusta_krr/core/integrations/prometheus/metrics/memory.py index 97fb2a6..d6d5baa 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/memory.py +++ b/robusta_krr/core/integrations/prometheus/metrics/memory.py @@ -12,14 +12,13 @@ class MemoryLoader(PrometheusMetric): def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) - cluster_label = self.get_prometheus_cluster_label() return f""" max( container_memory_working_set_bytes{{ + {object.cluster_selector} namespace="{object.namespace}", pod=~"{pods_selector}", container="{object.container}" - {cluster_label} }} ) by (container, pod, job) """ @@ -32,15 +31,14 @@ class MaxMemoryLoader(PrometheusMetric): def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) - cluster_label = self.get_prometheus_cluster_label() return f""" max_over_time( max( container_memory_working_set_bytes{{ + {object.cluster_selector} namespace="{object.namespace}", pod=~"{pods_selector}", container="{object.container}" - {cluster_label} }} ) by (container, pod, job) [{duration}:{step}] @@ -55,15 +53,14 @@ class MemoryAmountLoader(PrometheusMetric): def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) - cluster_label = self.get_prometheus_cluster_label() return f""" count_over_time( max( container_memory_working_set_bytes{{ + {object.cluster_selector} namespace="{object.namespace}", pod=~"{pods_selector}", container="{object.container}" - {cluster_label} }} ) by (container, pod, job) [{duration}:{step}] @@ -81,27 +78,26 @@ class MaxOOMKilledMemoryLoader(PrometheusMetric): def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) - cluster_label = self.get_prometheus_cluster_label() return f""" max_over_time( max( max( kube_pod_container_resource_limits{{ + {object.cluster_selector} resource="memory", namespace="{object.namespace}", pod=~"{pods_selector}", container="{object.container}" - {cluster_label} }} ) by (pod, container, job) * on(pod, container, job) group_left(reason) max( kube_pod_container_status_last_terminated_reason{{ + {object.cluster_selector} reason="OOMKilled", namespace="{object.namespace}", pod=~"{pods_selector}", container="{object.container}" - {cluster_label} }} ) by (pod, container, job, reason) ) by (container, pod, job) diff --git a/robusta_krr/core/models/objects.py b/robusta_krr/core/models/objects.py index c5d375e..2128903 100644 --- a/robusta_krr/core/models/objects.py +++ b/robusta_krr/core/models/objects.py @@ -71,6 +71,18 @@ class K8sWorkload(pd.BaseModel): def add_warning(self, warning: PodWarning) -> None: self.warnings.add(warning) + @property + def cluster_selector(self) -> str: + from robusta_krr.core.models.config import settings + + if settings.prometheus_label is not None: + return f'{settings.prometheus_cluster_label}="{settings.prometheus_label}",' + + if settings.prometheus_cluster_label is None: + return "" + + return f'{settings.prometheus_cluster_label}="{self.cluster}",' if self.cluster else "" + @property def current_pods_count(self) -> int: return len([pod for pod in self.pods if not pod.deleted]) -- cgit v1.2.3