From 0f13099317cab3d5f76db093c82d80b35a6946a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9F=D0=B0=D0=B2=D0=B5=D0=BB=20=D0=96=D1=83=D0=BA=D0=BE?= =?UTF-8?q?=D0=B2?= <33721692+LeaveMyYard@users.noreply.github.com> Date: Fri, 21 Apr 2023 12:43:45 +0300 Subject: Rework CPU gathering metric --- robusta_krr/core/integrations/prometheus.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/robusta_krr/core/integrations/prometheus.py b/robusta_krr/core/integrations/prometheus.py index 39c7f00..ced865f 100644 --- a/robusta_krr/core/integrations/prometheus.py +++ b/robusta_krr/core/integrations/prometheus.py @@ -115,15 +115,16 @@ class PrometheusLoader(Configurable): ) -> ResourceHistoryData: self.debug(f"Gathering data for {object} and {resource}") + step = f"{int(timeframe.total_seconds()) // 60}m" if resource == ResourceType.CPU: result = await asyncio.gather( *[ asyncio.to_thread( self.prometheus.custom_query_range, - query=f'sum(node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{{namespace="{object.namespace}", pod="{pod}", container="{object.container}"}})', + query=f'sum(irate(container_cpu_usage_seconds_total{{namespace="{object.namespace}", pod="{pod}", container="{object.container}"}}[{step}]))', start_time=datetime.datetime.now() - period, end_time=datetime.datetime.now(), - step=f"{int(timeframe.total_seconds()) // 60}m", + step=step, ) for pod in object.pods ] @@ -136,7 +137,7 @@ class PrometheusLoader(Configurable): query=f'sum(container_memory_working_set_bytes{{job="kubelet", metrics_path="/metrics/cadvisor", image!="", namespace="{object.namespace}", pod="{pod}", container="{object.container}"}})', start_time=datetime.datetime.now() - period, end_time=datetime.datetime.now(), - step=f"{int(timeframe.total_seconds()) // 60}m", + step=step, ) for pod in object.pods ] -- cgit v1.2.3 From 2542bb1b5ef059935ca152cdd5998ebb3041f155 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9F=D0=B0=D0=B2=D0=B5=D0=BB=20=D0=96=D1=83=D0=BA=D0=BE?= =?UTF-8?q?=D0=B2?= <33721692+LeaveMyYard@users.noreply.github.com> Date: Fri, 21 Apr 2023 14:49:57 +0300 Subject: Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3c2c35e..9dd6168 100644 --- a/README.md +++ b/README.md @@ -85,7 +85,7 @@ Robusta KRR uses the following Prometheus queries to gather usage data: - CPU Usage: ``` - sum(node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{namespace="{object.namespace}", pod="{pod}", container="{object.container}"}) + sum(irate(container_cpu_usage_seconds_total{{namespace="{object.namespace}", pod="{pod}", container="{object.container}"}}[{step}])) ``` - Memory Usage: -- cgit v1.2.3 From 9c85cb67ea10c5cd807665ef25ad05b19dcc9a43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9F=D0=B0=D0=B2=D0=B5=D0=BB=20=D0=96=D1=83=D0=BA=D0=BE?= =?UTF-8?q?=D0=B2?= <33721692+LeaveMyYard@users.noreply.github.com> Date: Wed, 26 Apr 2023 10:36:46 +0300 Subject: Refactor the way metrics are collected, fix memory metric query --- robusta_krr/core/integrations/prometheus.py | 156 --------------------- .../core/integrations/prometheus/__init__.py | 1 + robusta_krr/core/integrations/prometheus/loader.py | 121 ++++++++++++++++ .../integrations/prometheus/metrics/__init__.py | 3 + .../integrations/prometheus/metrics/base_metric.py | 66 +++++++++ .../integrations/prometheus/metrics/cpu_metric.py | 9 ++ .../prometheus/metrics/memory_metric.py | 9 ++ robusta_krr/core/runner.py | 2 +- 8 files changed, 210 
insertions(+), 157 deletions(-) delete mode 100644 robusta_krr/core/integrations/prometheus.py create mode 100644 robusta_krr/core/integrations/prometheus/__init__.py create mode 100644 robusta_krr/core/integrations/prometheus/loader.py create mode 100644 robusta_krr/core/integrations/prometheus/metrics/__init__.py create mode 100644 robusta_krr/core/integrations/prometheus/metrics/base_metric.py create mode 100644 robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py create mode 100644 robusta_krr/core/integrations/prometheus/metrics/memory_metric.py diff --git a/robusta_krr/core/integrations/prometheus.py b/robusta_krr/core/integrations/prometheus.py deleted file mode 100644 index ced865f..0000000 --- a/robusta_krr/core/integrations/prometheus.py +++ /dev/null @@ -1,156 +0,0 @@ -import asyncio -import datetime -from decimal import Decimal -from typing import Optional, no_type_check - -import requests -from kubernetes import config as k8s_config -from kubernetes.client import ApiClient -from prometheus_api_client import PrometheusConnect, Retry -from requests.adapters import HTTPAdapter -from requests.exceptions import ConnectionError, HTTPError - -from robusta_krr.core.abstract.strategies import ResourceHistoryData -from robusta_krr.core.models.config import Config -from robusta_krr.core.models.objects import K8sObjectData -from robusta_krr.core.models.result import ResourceType -from robusta_krr.utils.configurable import Configurable -from robusta_krr.utils.service_discovery import ServiceDiscovery - - -class PrometheusDiscovery(ServiceDiscovery): - def find_prometheus_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: - return super().find_url( - selectors=[ - "app=kube-prometheus-stack-prometheus", - "app=prometheus,component=server", - "app=prometheus-server", - "app=prometheus-operator-prometheus", - "app=prometheus-msteams", - "app=rancher-monitoring-prometheus", - "app=prometheus-prometheus", - ], - api_client=api_client, - ) - - -class PrometheusNotFound(Exception): - pass - - -class CustomPrometheusConnect(PrometheusConnect): - @no_type_check - def __init__( - self, - url: str = "http://127.0.0.1:9090", - headers: dict = None, - disable_ssl: bool = False, - retry: Retry = None, - auth: tuple = None, - ): - super().__init__(url, headers, disable_ssl, retry, auth) - self._session = requests.Session() - self._session.mount(self.url, HTTPAdapter(max_retries=retry, pool_maxsize=10, pool_block=True)) - - -class PrometheusLoader(Configurable): - def __init__( - self, - config: Config, - *, - cluster: Optional[str] = None, - ) -> None: - super().__init__(config=config) - - self.debug(f"Initializing PrometheusLoader for {cluster or 'default'} cluster") - - self.auth_header = self.config.prometheus_auth_header - self.ssl_enabled = self.config.prometheus_ssl_enabled - - self.api_client = k8s_config.new_client_from_config(context=cluster) if cluster is not None else None - self.prometheus_discovery = PrometheusDiscovery(config=self.config) - - self.url = self.config.prometheus_url - self.url = self.url or self.prometheus_discovery.find_prometheus_url(api_client=self.api_client) - - if not self.url: - raise PrometheusNotFound( - f"Prometheus url could not be found while scanning in {cluster or 'default'} cluster" - ) - - headers = {} - - if self.auth_header: - headers = {"Authorization": self.auth_header} - elif not self.config.inside_cluster: - self.api_client.update_params_for_auth(headers, {}, ["BearerToken"]) - - self.prometheus = 
CustomPrometheusConnect(url=self.url, disable_ssl=not self.ssl_enabled, headers=headers) - self._check_prometheus_connection() - - self.debug(f"PrometheusLoader initialized for {cluster or 'default'} cluster") - - def _check_prometheus_connection(self): - try: - response = self.prometheus._session.get( - f"{self.prometheus.url}/api/v1/query", - verify=self.prometheus.ssl_verification, - headers=self.prometheus.headers, - # This query should return empty results, but is correct - params={"query": "example"}, - ) - response.raise_for_status() - except (ConnectionError, HTTPError) as e: - raise PrometheusNotFound( - f"Couldn't connect to Prometheus found under {self.prometheus.url}\nCaused by {e.__class__.__name__}: {e})" - ) from e - - async def gather_data( - self, - object: K8sObjectData, - resource: ResourceType, - period: datetime.timedelta, - *, - timeframe: datetime.timedelta = datetime.timedelta(minutes=30), - ) -> ResourceHistoryData: - self.debug(f"Gathering data for {object} and {resource}") - - step = f"{int(timeframe.total_seconds()) // 60}m" - if resource == ResourceType.CPU: - result = await asyncio.gather( - *[ - asyncio.to_thread( - self.prometheus.custom_query_range, - query=f'sum(irate(container_cpu_usage_seconds_total{{namespace="{object.namespace}", pod="{pod}", container="{object.container}"}}[{step}]))', - start_time=datetime.datetime.now() - period, - end_time=datetime.datetime.now(), - step=step, - ) - for pod in object.pods - ] - ) - elif resource == ResourceType.Memory: - result = await asyncio.gather( - *[ - asyncio.to_thread( - self.prometheus.custom_query_range, - query=f'sum(container_memory_working_set_bytes{{job="kubelet", metrics_path="/metrics/cadvisor", image!="", namespace="{object.namespace}", pod="{pod}", container="{object.container}"}})', - start_time=datetime.datetime.now() - period, - end_time=datetime.datetime.now(), - step=step, - ) - for pod in object.pods - ] - ) - else: - raise ValueError(f"Unknown resource type: {resource}") - - if result == []: - return {pod: [] for pod in object.pods} - - pod_results = {pod: result[i] for i, pod in enumerate(object.pods)} - return { - pod: [Decimal(value) for _, value in pod_result[0]["values"]] - for pod, pod_result in pod_results.items() - if pod_result != [] - } diff --git a/robusta_krr/core/integrations/prometheus/__init__.py b/robusta_krr/core/integrations/prometheus/__init__.py new file mode 100644 index 0000000..e43e8aa --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/__init__.py @@ -0,0 +1 @@ +from .loader import CustomPrometheusConnect, PrometheusDiscovery, PrometheusLoader, PrometheusNotFound diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py new file mode 100644 index 0000000..4a79005 --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/loader.py @@ -0,0 +1,121 @@ +import datetime +from typing import Optional, no_type_check + +import requests +from kubernetes import config as k8s_config +from kubernetes.client import ApiClient +from prometheus_api_client import PrometheusConnect, Retry +from requests.adapters import HTTPAdapter +from requests.exceptions import ConnectionError, HTTPError + +from robusta_krr.core.abstract.strategies import ResourceHistoryData +from robusta_krr.core.models.config import Config +from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.core.models.result import ResourceType +from robusta_krr.utils.configurable import Configurable +from 
robusta_krr.utils.service_discovery import ServiceDiscovery + +from .metrics import BaseMetricLoader + + +class PrometheusDiscovery(ServiceDiscovery): + def find_prometheus_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: + return super().find_url( + selectors=[ + "app=kube-prometheus-stack-prometheus", + "app=prometheus,component=server", + "app=prometheus-server", + "app=prometheus-operator-prometheus", + "app=prometheus-msteams", + "app=rancher-monitoring-prometheus", + "app=prometheus-prometheus", + ], + api_client=api_client, + ) + + +class PrometheusNotFound(Exception): + pass + + +class CustomPrometheusConnect(PrometheusConnect): + @no_type_check + def __init__( + self, + url: str = "http://127.0.0.1:9090", + headers: dict = None, + disable_ssl: bool = False, + retry: Retry = None, + auth: tuple = None, + ): + super().__init__(url, headers, disable_ssl, retry, auth) + self._session = requests.Session() + self._session.mount(self.url, HTTPAdapter(max_retries=retry, pool_maxsize=10, pool_block=True)) + + +class PrometheusLoader(Configurable): + def __init__( + self, + config: Config, + *, + cluster: Optional[str] = None, + ) -> None: + super().__init__(config=config) + + self.debug(f"Initializing PrometheusLoader for {cluster or 'default'} cluster") + + self.auth_header = self.config.prometheus_auth_header + self.ssl_enabled = self.config.prometheus_ssl_enabled + + self.api_client = k8s_config.new_client_from_config(context=cluster) if cluster is not None else None + self.prometheus_discovery = PrometheusDiscovery(config=self.config) + + self.url = self.config.prometheus_url + self.url = self.url or self.prometheus_discovery.find_prometheus_url(api_client=self.api_client) + + if not self.url: + raise PrometheusNotFound( + f"Prometheus url could not be found while scanning in {cluster or 'default'} cluster" + ) + + headers = {} + + if self.auth_header: + headers = {"Authorization": self.auth_header} + elif not self.config.inside_cluster: + self.api_client.update_params_for_auth(headers, {}, ["BearerToken"]) + + self.prometheus = CustomPrometheusConnect(url=self.url, disable_ssl=not self.ssl_enabled, headers=headers) + self._check_prometheus_connection() + + self.debug(f"PrometheusLoader initialized for {cluster or 'default'} cluster") + + def _check_prometheus_connection(self): + try: + response = self.prometheus._session.get( + f"{self.prometheus.url}/api/v1/query", + verify=self.prometheus.ssl_verification, + headers=self.prometheus.headers, + # This query should return empty results, but is correct + params={"query": "example"}, + ) + response.raise_for_status() + except (ConnectionError, HTTPError) as e: + raise PrometheusNotFound( + f"Couldn't connect to Prometheus found under {self.prometheus.url}\nCaused by {e.__class__.__name__}: {e})" + ) from e + + async def gather_data( + self, + object: K8sObjectData, + resource: ResourceType, + period: datetime.timedelta, + *, + timeframe: datetime.timedelta = datetime.timedelta(minutes=30), + ) -> ResourceHistoryData: + self.debug(f"Gathering data for {object} and {resource}") + + step = f"{int(timeframe.total_seconds()) // 60}m" + MetricLoaderType = BaseMetricLoader.get_by_resource(resource) + metric_loader = MetricLoaderType(self.prometheus) + return await metric_loader.load_data(object, period, step) diff --git a/robusta_krr/core/integrations/prometheus/metrics/__init__.py b/robusta_krr/core/integrations/prometheus/metrics/__init__.py new file mode 100644 index 0000000..0852b67 --- /dev/null +++ 
b/robusta_krr/core/integrations/prometheus/metrics/__init__.py @@ -0,0 +1,3 @@ +from .base_metric import BaseMetricLoader, bind_metric +from .cpu_metric import CPUMetricLoader +from .memory_metric import MemoryMetricLoader diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py new file mode 100644 index 0000000..5b91db2 --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +import abc +import asyncio +import datetime +from decimal import Decimal +from typing import TYPE_CHECKING, Callable, TypeVar + +if TYPE_CHECKING: + from robusta_krr.core.abstract.strategies import ResourceHistoryData + from robusta_krr.core.models.objects import K8sObjectData + + from ..loader import CustomPrometheusConnect + +REGISTERED_METRICS: dict[str, type[BaseMetricLoader]] = {} + + +class BaseMetricLoader(abc.ABC): + def __init__(self, prometheus: CustomPrometheusConnect) -> None: + self.prometheus = prometheus + + @abc.abstractmethod + def get_query(self, namespace: str, pod: str, container: str) -> str: + ... + + async def load_data(self, object: K8sObjectData, period: datetime.timedelta, step: str) -> ResourceHistoryData: + result = await asyncio.gather( + *[ + asyncio.to_thread( + self.prometheus.custom_query_range, + query=self.get_query(object.namespace, pod, object.container), + start_time=datetime.datetime.now() - period, + end_time=datetime.datetime.now(), + step=step, + ) + for pod in object.pods + ] + ) + + if result == []: + return {pod: [] for pod in object.pods} + + pod_results = {pod: result[i] for i, pod in enumerate(object.pods)} + return { + pod: [Decimal(value) for _, value in pod_result[0]["values"]] + for pod, pod_result in pod_results.items() + if pod_result != [] + } + + @staticmethod + def get_by_resource(resource: str) -> type[BaseMetricLoader]: + try: + return REGISTERED_METRICS[resource] + except KeyError as e: + raise KeyError(f"Resource {resource} was not registered by `@bind_metric(...)`") from e + + +Self = TypeVar("Self", bound=BaseMetricLoader) + + +def bind_metric(resource: str) -> Callable[[type[Self]], type[Self]]: + def decorator(cls: type[Self]) -> type[Self]: + REGISTERED_METRICS[resource] = cls + return cls + + return decorator diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py new file mode 100644 index 0000000..a8c2b8e --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py @@ -0,0 +1,9 @@ +from robusta_krr.core.models.allocations import ResourceType + +from .base_metric import BaseMetricLoader, bind_metric + + +@bind_metric(ResourceType.CPU) +class CPUMetricLoader(BaseMetricLoader): + def get_query(self, namespace: str, pod: str, container: str) -> str: + return f'sum(irate(container_cpu_usage_seconds_total{{namespace="{namespace}", pod="{pod}", container="{container}"}}[1m]))' diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py new file mode 100644 index 0000000..4e018ae --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py @@ -0,0 +1,9 @@ +from robusta_krr.core.models.allocations import ResourceType + +from .base_metric import BaseMetricLoader, bind_metric + + +@bind_metric(ResourceType.Memory) +class MemoryMetricLoader(BaseMetricLoader): + def 
get_query(self, namespace: str, pod: str, container: str) -> str: + return f'sum(container_memory_working_set_bytes{{image!="", namespace="{namespace}", pod="{pod}", container="{container}"}})' diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index 196894d..0737cf0 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -5,7 +5,7 @@ from typing import Optional, Union from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult from robusta_krr.core.integrations.kubernetes import KubernetesLoader -from robusta_krr.core.integrations.prometheus import PrometheusLoader +from robusta_krr.core.integrations.prometheus.loader import PrometheusLoader from robusta_krr.core.models.config import Config from robusta_krr.core.models.objects import K8sObjectData from robusta_krr.core.models.result import ResourceAllocations, ResourceScan, ResourceType, Result -- cgit v1.2.3 From 47ece52acd2d9d810cb72101139759427eb379d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9F=D0=B0=D0=B2=D0=B5=D0=BB=20=D0=96=D1=83=D0=BA=D0=BE?= =?UTF-8?q?=D0=B2?= <33721692+LeaveMyYard@users.noreply.github.com> Date: Fri, 21 Apr 2023 12:43:45 +0300 Subject: Rework CPU gathering metric --- robusta_krr/core/integrations/prometheus.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/robusta_krr/core/integrations/prometheus.py b/robusta_krr/core/integrations/prometheus.py index 39c7f00..ced865f 100644 --- a/robusta_krr/core/integrations/prometheus.py +++ b/robusta_krr/core/integrations/prometheus.py @@ -115,15 +115,16 @@ class PrometheusLoader(Configurable): ) -> ResourceHistoryData: self.debug(f"Gathering data for {object} and {resource}") + step = f"{int(timeframe.total_seconds()) // 60}m" if resource == ResourceType.CPU: result = await asyncio.gather( *[ asyncio.to_thread( self.prometheus.custom_query_range, - query=f'sum(node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{{namespace="{object.namespace}", pod="{pod}", container="{object.container}"}})', + query=f'sum(irate(container_cpu_usage_seconds_total{{namespace="{object.namespace}", pod="{pod}", container="{object.container}"}}[{step}]))', start_time=datetime.datetime.now() - period, end_time=datetime.datetime.now(), - step=f"{int(timeframe.total_seconds()) // 60}m", + step=step, ) for pod in object.pods ] @@ -136,7 +137,7 @@ class PrometheusLoader(Configurable): query=f'sum(container_memory_working_set_bytes{{job="kubelet", metrics_path="/metrics/cadvisor", image!="", namespace="{object.namespace}", pod="{pod}", container="{object.container}"}})', start_time=datetime.datetime.now() - period, end_time=datetime.datetime.now(), - step=f"{int(timeframe.total_seconds()) // 60}m", + step=step, ) for pod in object.pods ] -- cgit v1.2.3 From c3d465223ef60baa75d902fa891f848058fc64aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9F=D0=B0=D0=B2=D0=B5=D0=BB=20=D0=96=D1=83=D0=BA=D0=BE?= =?UTF-8?q?=D0=B2?= <33721692+LeaveMyYard@users.noreply.github.com> Date: Fri, 21 Apr 2023 14:49:57 +0300 Subject: Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 7545e94..701ccd5 100644 --- a/README.md +++ b/README.md @@ -85,7 +85,7 @@ Robusta KRR uses the following Prometheus queries to gather usage data: - CPU Usage: ``` - sum(node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{namespace="{object.namespace}", pod="{pod}", container="{object.container}"}) + 
sum(irate(container_cpu_usage_seconds_total{{namespace="{object.namespace}", pod="{pod}", container="{object.container}"}}[{step}])) ``` - Memory Usage: -- cgit v1.2.3 From 32fd0b042e6b07e0a66241588e81d791afa08786 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9F=D0=B0=D0=B2=D0=B5=D0=BB=20=D0=96=D1=83=D0=BA=D0=BE?= =?UTF-8?q?=D0=B2?= <33721692+LeaveMyYard@users.noreply.github.com> Date: Wed, 26 Apr 2023 10:36:46 +0300 Subject: Refactor the way metrics are collected, fix memory metric query --- robusta_krr/core/integrations/prometheus.py | 156 --------------------- .../core/integrations/prometheus/__init__.py | 1 + robusta_krr/core/integrations/prometheus/loader.py | 121 ++++++++++++++++ .../integrations/prometheus/metrics/__init__.py | 3 + .../integrations/prometheus/metrics/base_metric.py | 66 +++++++++ .../integrations/prometheus/metrics/cpu_metric.py | 9 ++ .../prometheus/metrics/memory_metric.py | 9 ++ robusta_krr/core/runner.py | 2 +- 8 files changed, 210 insertions(+), 157 deletions(-) delete mode 100644 robusta_krr/core/integrations/prometheus.py create mode 100644 robusta_krr/core/integrations/prometheus/__init__.py create mode 100644 robusta_krr/core/integrations/prometheus/loader.py create mode 100644 robusta_krr/core/integrations/prometheus/metrics/__init__.py create mode 100644 robusta_krr/core/integrations/prometheus/metrics/base_metric.py create mode 100644 robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py create mode 100644 robusta_krr/core/integrations/prometheus/metrics/memory_metric.py diff --git a/robusta_krr/core/integrations/prometheus.py b/robusta_krr/core/integrations/prometheus.py deleted file mode 100644 index ced865f..0000000 --- a/robusta_krr/core/integrations/prometheus.py +++ /dev/null @@ -1,156 +0,0 @@ -import asyncio -import datetime -from decimal import Decimal -from typing import Optional, no_type_check - -import requests -from kubernetes import config as k8s_config -from kubernetes.client import ApiClient -from prometheus_api_client import PrometheusConnect, Retry -from requests.adapters import HTTPAdapter -from requests.exceptions import ConnectionError, HTTPError - -from robusta_krr.core.abstract.strategies import ResourceHistoryData -from robusta_krr.core.models.config import Config -from robusta_krr.core.models.objects import K8sObjectData -from robusta_krr.core.models.result import ResourceType -from robusta_krr.utils.configurable import Configurable -from robusta_krr.utils.service_discovery import ServiceDiscovery - - -class PrometheusDiscovery(ServiceDiscovery): - def find_prometheus_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: - return super().find_url( - selectors=[ - "app=kube-prometheus-stack-prometheus", - "app=prometheus,component=server", - "app=prometheus-server", - "app=prometheus-operator-prometheus", - "app=prometheus-msteams", - "app=rancher-monitoring-prometheus", - "app=prometheus-prometheus", - ], - api_client=api_client, - ) - - -class PrometheusNotFound(Exception): - pass - - -class CustomPrometheusConnect(PrometheusConnect): - @no_type_check - def __init__( - self, - url: str = "http://127.0.0.1:9090", - headers: dict = None, - disable_ssl: bool = False, - retry: Retry = None, - auth: tuple = None, - ): - super().__init__(url, headers, disable_ssl, retry, auth) - self._session = requests.Session() - self._session.mount(self.url, HTTPAdapter(max_retries=retry, pool_maxsize=10, pool_block=True)) - - -class PrometheusLoader(Configurable): - def __init__( - self, - config: Config, - *, - cluster: 
Optional[str] = None, - ) -> None: - super().__init__(config=config) - - self.debug(f"Initializing PrometheusLoader for {cluster or 'default'} cluster") - - self.auth_header = self.config.prometheus_auth_header - self.ssl_enabled = self.config.prometheus_ssl_enabled - - self.api_client = k8s_config.new_client_from_config(context=cluster) if cluster is not None else None - self.prometheus_discovery = PrometheusDiscovery(config=self.config) - - self.url = self.config.prometheus_url - self.url = self.url or self.prometheus_discovery.find_prometheus_url(api_client=self.api_client) - - if not self.url: - raise PrometheusNotFound( - f"Prometheus url could not be found while scanning in {cluster or 'default'} cluster" - ) - - headers = {} - - if self.auth_header: - headers = {"Authorization": self.auth_header} - elif not self.config.inside_cluster: - self.api_client.update_params_for_auth(headers, {}, ["BearerToken"]) - - self.prometheus = CustomPrometheusConnect(url=self.url, disable_ssl=not self.ssl_enabled, headers=headers) - self._check_prometheus_connection() - - self.debug(f"PrometheusLoader initialized for {cluster or 'default'} cluster") - - def _check_prometheus_connection(self): - try: - response = self.prometheus._session.get( - f"{self.prometheus.url}/api/v1/query", - verify=self.prometheus.ssl_verification, - headers=self.prometheus.headers, - # This query should return empty results, but is correct - params={"query": "example"}, - ) - response.raise_for_status() - except (ConnectionError, HTTPError) as e: - raise PrometheusNotFound( - f"Couldn't connect to Prometheus found under {self.prometheus.url}\nCaused by {e.__class__.__name__}: {e})" - ) from e - - async def gather_data( - self, - object: K8sObjectData, - resource: ResourceType, - period: datetime.timedelta, - *, - timeframe: datetime.timedelta = datetime.timedelta(minutes=30), - ) -> ResourceHistoryData: - self.debug(f"Gathering data for {object} and {resource}") - - step = f"{int(timeframe.total_seconds()) // 60}m" - if resource == ResourceType.CPU: - result = await asyncio.gather( - *[ - asyncio.to_thread( - self.prometheus.custom_query_range, - query=f'sum(irate(container_cpu_usage_seconds_total{{namespace="{object.namespace}", pod="{pod}", container="{object.container}"}}[{step}]))', - start_time=datetime.datetime.now() - period, - end_time=datetime.datetime.now(), - step=step, - ) - for pod in object.pods - ] - ) - elif resource == ResourceType.Memory: - result = await asyncio.gather( - *[ - asyncio.to_thread( - self.prometheus.custom_query_range, - query=f'sum(container_memory_working_set_bytes{{job="kubelet", metrics_path="/metrics/cadvisor", image!="", namespace="{object.namespace}", pod="{pod}", container="{object.container}"}})', - start_time=datetime.datetime.now() - period, - end_time=datetime.datetime.now(), - step=step, - ) - for pod in object.pods - ] - ) - else: - raise ValueError(f"Unknown resource type: {resource}") - - if result == []: - return {pod: [] for pod in object.pods} - - pod_results = {pod: result[i] for i, pod in enumerate(object.pods)} - return { - pod: [Decimal(value) for _, value in pod_result[0]["values"]] - for pod, pod_result in pod_results.items() - if pod_result != [] - } diff --git a/robusta_krr/core/integrations/prometheus/__init__.py b/robusta_krr/core/integrations/prometheus/__init__.py new file mode 100644 index 0000000..e43e8aa --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/__init__.py @@ -0,0 +1 @@ +from .loader import CustomPrometheusConnect, PrometheusDiscovery, 
PrometheusLoader, PrometheusNotFound diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py new file mode 100644 index 0000000..4a79005 --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/loader.py @@ -0,0 +1,121 @@ +import datetime +from typing import Optional, no_type_check + +import requests +from kubernetes import config as k8s_config +from kubernetes.client import ApiClient +from prometheus_api_client import PrometheusConnect, Retry +from requests.adapters import HTTPAdapter +from requests.exceptions import ConnectionError, HTTPError + +from robusta_krr.core.abstract.strategies import ResourceHistoryData +from robusta_krr.core.models.config import Config +from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.core.models.result import ResourceType +from robusta_krr.utils.configurable import Configurable +from robusta_krr.utils.service_discovery import ServiceDiscovery + +from .metrics import BaseMetricLoader + + +class PrometheusDiscovery(ServiceDiscovery): + def find_prometheus_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: + return super().find_url( + selectors=[ + "app=kube-prometheus-stack-prometheus", + "app=prometheus,component=server", + "app=prometheus-server", + "app=prometheus-operator-prometheus", + "app=prometheus-msteams", + "app=rancher-monitoring-prometheus", + "app=prometheus-prometheus", + ], + api_client=api_client, + ) + + +class PrometheusNotFound(Exception): + pass + + +class CustomPrometheusConnect(PrometheusConnect): + @no_type_check + def __init__( + self, + url: str = "http://127.0.0.1:9090", + headers: dict = None, + disable_ssl: bool = False, + retry: Retry = None, + auth: tuple = None, + ): + super().__init__(url, headers, disable_ssl, retry, auth) + self._session = requests.Session() + self._session.mount(self.url, HTTPAdapter(max_retries=retry, pool_maxsize=10, pool_block=True)) + + +class PrometheusLoader(Configurable): + def __init__( + self, + config: Config, + *, + cluster: Optional[str] = None, + ) -> None: + super().__init__(config=config) + + self.debug(f"Initializing PrometheusLoader for {cluster or 'default'} cluster") + + self.auth_header = self.config.prometheus_auth_header + self.ssl_enabled = self.config.prometheus_ssl_enabled + + self.api_client = k8s_config.new_client_from_config(context=cluster) if cluster is not None else None + self.prometheus_discovery = PrometheusDiscovery(config=self.config) + + self.url = self.config.prometheus_url + self.url = self.url or self.prometheus_discovery.find_prometheus_url(api_client=self.api_client) + + if not self.url: + raise PrometheusNotFound( + f"Prometheus url could not be found while scanning in {cluster or 'default'} cluster" + ) + + headers = {} + + if self.auth_header: + headers = {"Authorization": self.auth_header} + elif not self.config.inside_cluster: + self.api_client.update_params_for_auth(headers, {}, ["BearerToken"]) + + self.prometheus = CustomPrometheusConnect(url=self.url, disable_ssl=not self.ssl_enabled, headers=headers) + self._check_prometheus_connection() + + self.debug(f"PrometheusLoader initialized for {cluster or 'default'} cluster") + + def _check_prometheus_connection(self): + try: + response = self.prometheus._session.get( + f"{self.prometheus.url}/api/v1/query", + verify=self.prometheus.ssl_verification, + headers=self.prometheus.headers, + # This query should return empty results, but is correct + params={"query": "example"}, + ) + 
response.raise_for_status() + except (ConnectionError, HTTPError) as e: + raise PrometheusNotFound( + f"Couldn't connect to Prometheus found under {self.prometheus.url}\nCaused by {e.__class__.__name__}: {e})" + ) from e + + async def gather_data( + self, + object: K8sObjectData, + resource: ResourceType, + period: datetime.timedelta, + *, + timeframe: datetime.timedelta = datetime.timedelta(minutes=30), + ) -> ResourceHistoryData: + self.debug(f"Gathering data for {object} and {resource}") + + step = f"{int(timeframe.total_seconds()) // 60}m" + MetricLoaderType = BaseMetricLoader.get_by_resource(resource) + metric_loader = MetricLoaderType(self.prometheus) + return await metric_loader.load_data(object, period, step) diff --git a/robusta_krr/core/integrations/prometheus/metrics/__init__.py b/robusta_krr/core/integrations/prometheus/metrics/__init__.py new file mode 100644 index 0000000..0852b67 --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/metrics/__init__.py @@ -0,0 +1,3 @@ +from .base_metric import BaseMetricLoader, bind_metric +from .cpu_metric import CPUMetricLoader +from .memory_metric import MemoryMetricLoader diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py new file mode 100644 index 0000000..5b91db2 --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +import abc +import asyncio +import datetime +from decimal import Decimal +from typing import TYPE_CHECKING, Callable, TypeVar + +if TYPE_CHECKING: + from robusta_krr.core.abstract.strategies import ResourceHistoryData + from robusta_krr.core.models.objects import K8sObjectData + + from ..loader import CustomPrometheusConnect + +REGISTERED_METRICS: dict[str, type[BaseMetricLoader]] = {} + + +class BaseMetricLoader(abc.ABC): + def __init__(self, prometheus: CustomPrometheusConnect) -> None: + self.prometheus = prometheus + + @abc.abstractmethod + def get_query(self, namespace: str, pod: str, container: str) -> str: + ... 
+ + async def load_data(self, object: K8sObjectData, period: datetime.timedelta, step: str) -> ResourceHistoryData: + result = await asyncio.gather( + *[ + asyncio.to_thread( + self.prometheus.custom_query_range, + query=self.get_query(object.namespace, pod, object.container), + start_time=datetime.datetime.now() - period, + end_time=datetime.datetime.now(), + step=step, + ) + for pod in object.pods + ] + ) + + if result == []: + return {pod: [] for pod in object.pods} + + pod_results = {pod: result[i] for i, pod in enumerate(object.pods)} + return { + pod: [Decimal(value) for _, value in pod_result[0]["values"]] + for pod, pod_result in pod_results.items() + if pod_result != [] + } + + @staticmethod + def get_by_resource(resource: str) -> type[BaseMetricLoader]: + try: + return REGISTERED_METRICS[resource] + except KeyError as e: + raise KeyError(f"Resource {resource} was not registered by `@bind_metric(...)`") from e + + +Self = TypeVar("Self", bound=BaseMetricLoader) + + +def bind_metric(resource: str) -> Callable[[type[Self]], type[Self]]: + def decorator(cls: type[Self]) -> type[Self]: + REGISTERED_METRICS[resource] = cls + return cls + + return decorator diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py new file mode 100644 index 0000000..a8c2b8e --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py @@ -0,0 +1,9 @@ +from robusta_krr.core.models.allocations import ResourceType + +from .base_metric import BaseMetricLoader, bind_metric + + +@bind_metric(ResourceType.CPU) +class CPUMetricLoader(BaseMetricLoader): + def get_query(self, namespace: str, pod: str, container: str) -> str: + return f'sum(irate(container_cpu_usage_seconds_total{{namespace="{namespace}", pod="{pod}", container="{container}"}}[1m]))' diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py new file mode 100644 index 0000000..4e018ae --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py @@ -0,0 +1,9 @@ +from robusta_krr.core.models.allocations import ResourceType + +from .base_metric import BaseMetricLoader, bind_metric + + +@bind_metric(ResourceType.Memory) +class MemoryMetricLoader(BaseMetricLoader): + def get_query(self, namespace: str, pod: str, container: str) -> str: + return f'sum(container_memory_working_set_bytes{{image!="", namespace="{namespace}", pod="{pod}", container="{container}"}})' diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index 00dcbb9..1ef0e14 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -5,7 +5,7 @@ from typing import Optional, Union from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult from robusta_krr.core.integrations.kubernetes import KubernetesLoader -from robusta_krr.core.integrations.prometheus import PrometheusLoader +from robusta_krr.core.integrations.prometheus.loader import PrometheusLoader from robusta_krr.core.models.config import Config from robusta_krr.core.models.objects import K8sObjectData from robusta_krr.core.models.result import ResourceAllocations, ResourceScan, ResourceType, Result -- cgit v1.2.3 From afbb08efb31031ffee80b919256c0a1015365520 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9F=D0=B0=D0=B2=D0=B5=D0=BB=20=D0=96=D1=83=D0=BA=D0=BE?= =?UTF-8?q?=D0=B2?= <33721692+LeaveMyYard@users.noreply.github.com> Date: Wed, 26 Apr 2023 13:00:33 +0300 
Subject: Add warning if prometheus returned no data --- robusta_krr/core/integrations/prometheus/loader.py | 2 +- .../core/integrations/prometheus/metrics/base_metric.py | 14 +++++++++----- robusta_krr/core/runner.py | 2 +- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py index 4a79005..8938496 100644 --- a/robusta_krr/core/integrations/prometheus/loader.py +++ b/robusta_krr/core/integrations/prometheus/loader.py @@ -117,5 +117,5 @@ class PrometheusLoader(Configurable): step = f"{int(timeframe.total_seconds()) // 60}m" MetricLoaderType = BaseMetricLoader.get_by_resource(resource) - metric_loader = MetricLoaderType(self.prometheus) + metric_loader = MetricLoaderType(self.config, self.prometheus) return await metric_loader.load_data(object, period, step) diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py index 5b91db2..9d366c8 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py @@ -6,17 +6,20 @@ import datetime from decimal import Decimal from typing import TYPE_CHECKING, Callable, TypeVar -if TYPE_CHECKING: - from robusta_krr.core.abstract.strategies import ResourceHistoryData - from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.core.abstract.strategies import ResourceHistoryData +from robusta_krr.core.models.config import Config +from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.utils.configurable import Configurable +if TYPE_CHECKING: from ..loader import CustomPrometheusConnect REGISTERED_METRICS: dict[str, type[BaseMetricLoader]] = {} -class BaseMetricLoader(abc.ABC): - def __init__(self, prometheus: CustomPrometheusConnect) -> None: +class BaseMetricLoader(Configurable, abc.ABC): + def __init__(self, config: Config, prometheus: CustomPrometheusConnect) -> None: + super().__init__(config) self.prometheus = prometheus @abc.abstractmethod @@ -38,6 +41,7 @@ class BaseMetricLoader(abc.ABC): ) if result == []: + self.warning(f"Prometheus returned no {self.__class__.__name__} metrics for {object}") return {pod: [] for pod in object.pods} pod_results = {pod: result[i] for i, pod in enumerate(object.pods)} diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index 1ef0e14..00dcbb9 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -5,7 +5,7 @@ from typing import Optional, Union from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult from robusta_krr.core.integrations.kubernetes import KubernetesLoader -from robusta_krr.core.integrations.prometheus.loader import PrometheusLoader +from robusta_krr.core.integrations.prometheus import PrometheusLoader from robusta_krr.core.models.config import Config from robusta_krr.core.models.objects import K8sObjectData from robusta_krr.core.models.result import ResourceAllocations, ResourceScan, ResourceType, Result -- cgit v1.2.3 From 3729eec24a8b3cec8fee2c89b5718ccd63fd4c67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9F=D0=B0=D0=B2=D0=B5=D0=BB=20=D0=96=D1=83=D0=BA=D0=BE?= =?UTF-8?q?=D0=B2?= <33721692+LeaveMyYard@users.noreply.github.com> Date: Thu, 27 Apr 2023 18:13:02 +0300 Subject: Add metric filtering to fix bug with double value --- robusta_krr/core/integrations/prometheus/loader.py | 3 +- 
.../integrations/prometheus/metrics/base_metric.py | 18 ++++++-- .../integrations/prometheus/metrics/cpu_metric.py | 2 +- .../prometheus/metrics/memory_metric.py | 51 ++++++++++++++++++++++ robusta_krr/core/runner.py | 2 +- 5 files changed, 69 insertions(+), 7 deletions(-) diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py index 8938496..c0c1961 100644 --- a/robusta_krr/core/integrations/prometheus/loader.py +++ b/robusta_krr/core/integrations/prometheus/loader.py @@ -111,11 +111,10 @@ class PrometheusLoader(Configurable): resource: ResourceType, period: datetime.timedelta, *, - timeframe: datetime.timedelta = datetime.timedelta(minutes=30), + step: datetime.timedelta = datetime.timedelta(minutes=30), ) -> ResourceHistoryData: self.debug(f"Gathering data for {object} and {resource}") - step = f"{int(timeframe.total_seconds()) // 60}m" MetricLoaderType = BaseMetricLoader.get_by_resource(resource) metric_loader = MetricLoaderType(self.config, self.prometheus) return await metric_loader.load_data(object, period, step) diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py index 9d366c8..407d3e3 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py @@ -26,11 +26,23 @@ class BaseMetricLoader(Configurable, abc.ABC): def get_query(self, namespace: str, pod: str, container: str) -> str: ... - async def load_data(self, object: K8sObjectData, period: datetime.timedelta, step: str) -> ResourceHistoryData: + async def query_prometheus( + self, query: str, start_time: datetime.datetime, end_time: datetime.datetime, step: datetime.timedelta + ) -> list[dict]: + return await asyncio.to_thread( + self.prometheus.custom_query_range, + query=query, + start_time=start_time, + end_time=end_time, + step=f"{int(step.total_seconds()) // 60}m", + ) + + async def load_data( + self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta + ) -> ResourceHistoryData: result = await asyncio.gather( *[ - asyncio.to_thread( - self.prometheus.custom_query_range, + self.query_prometheus( query=self.get_query(object.namespace, pod, object.container), start_time=datetime.datetime.now() - period, end_time=datetime.datetime.now(), diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py index a8c2b8e..d2ad841 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py @@ -6,4 +6,4 @@ from .base_metric import BaseMetricLoader, bind_metric @bind_metric(ResourceType.CPU) class CPUMetricLoader(BaseMetricLoader): def get_query(self, namespace: str, pod: str, container: str) -> str: - return f'sum(irate(container_cpu_usage_seconds_total{{namespace="{namespace}", pod="{pod}", container="{container}"}}[1m]))' + return f'sum(irate(container_cpu_usage_seconds_total{{namespace="{namespace}", pod="{pod}", container="{container}"}}[5m])) by (container, pod, job)' diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py index 4e018ae..6d5bc0c 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py @@ -1,9 +1,60 @@ +import datetime 
+from typing import Any, Optional + from robusta_krr.core.models.allocations import ResourceType from .base_metric import BaseMetricLoader, bind_metric +PrometheusSeries = Any + @bind_metric(ResourceType.Memory) class MemoryMetricLoader(BaseMetricLoader): + @staticmethod + def get_target_name(series: PrometheusSeries) -> Optional[str]: + for label in ["container", "pod", "node"]: + if label in series.metric: + return series.metric[label] + return None + + @staticmethod + def filter_prom_jobs_results( + series_list_result: list[PrometheusSeries], + ) -> list[PrometheusSeries]: + """ + Because there might be multiple metrics with the same name, we need to filter them out. + + :param series_list_result: list of PrometheusSeries + """ + + if len(series_list_result) == 1: + return series_list_result + + target_names = { + MemoryMetricLoader.get_target_name(series) + for series in series_list_result + if MemoryMetricLoader.get_target_name(series) + } + return_list: list[PrometheusSeries] = [] + + # takes kubelet job if exists, return first job alphabetically if it doesn't + for target_name in target_names: + relevant_series = [ + series for series in series_list_result if MemoryMetricLoader.get_target_name(series) == target_name + ] + relevant_kubelet_metric = [series for series in relevant_series if series.metric.get("job") == "kubelet"] + if len(relevant_kubelet_metric) == 1: + return_list.append(relevant_kubelet_metric[0]) + continue + sorted_relevant_series = sorted(relevant_series, key=lambda series: series.metric.get("job"), reverse=False) + return_list.append(sorted_relevant_series[0]) + return return_list + + async def query_prometheus( + self, query: str, start_time: datetime.datetime, end_time: datetime.datetime, step: datetime.timedelta + ) -> list[PrometheusSeries]: + result = await super().query_prometheus(query, start_time, end_time, step) + return self.filter_prom_jobs_results(result) + def get_query(self, namespace: str, pod: str, container: str) -> str: return f'sum(container_memory_working_set_bytes{{image!="", namespace="{namespace}", pod="{pod}", container="{container}"}})' diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index 00dcbb9..f4d4d94 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -94,7 +94,7 @@ class Runner(Configurable): object, resource, self._strategy.settings.history_timedelta, - timeframe=self._strategy.settings.timeframe_timedelta, + step=self._strategy.settings.timeframe_timedelta, ) for resource in ResourceType ] -- cgit v1.2.3 From de328dfb7a904be91aab51355eadd7c118181dbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9F=D0=B0=D0=B2=D0=B5=D0=BB=20=D0=96=D1=83=D0=BA=D0=BE?= =?UTF-8?q?=D0=B2?= <33721692+LeaveMyYard@users.noreply.github.com> Date: Fri, 5 May 2023 11:46:26 +0300 Subject: Group memory metric, fix filtering --- .../core/integrations/prometheus/metrics/memory_metric.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py index 6d5bc0c..d471ccd 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py @@ -13,8 +13,8 @@ class MemoryMetricLoader(BaseMetricLoader): @staticmethod def get_target_name(series: PrometheusSeries) -> Optional[str]: for label in ["container", "pod", "node"]: - if label in series.metric: - return series.metric[label] + if label in 
series["metric"]: + return series["metric"][label] return None @staticmethod @@ -42,11 +42,11 @@ class MemoryMetricLoader(BaseMetricLoader): relevant_series = [ series for series in series_list_result if MemoryMetricLoader.get_target_name(series) == target_name ] - relevant_kubelet_metric = [series for series in relevant_series if series.metric.get("job") == "kubelet"] + relevant_kubelet_metric = [series for series in relevant_series if series["metric"].get("job") == "kubelet"] if len(relevant_kubelet_metric) == 1: return_list.append(relevant_kubelet_metric[0]) continue - sorted_relevant_series = sorted(relevant_series, key=lambda series: series.metric.get("job"), reverse=False) + sorted_relevant_series = sorted(relevant_series, key=lambda s: s["metric"].get("job"), reverse=False) return_list.append(sorted_relevant_series[0]) return return_list @@ -57,4 +57,4 @@ class MemoryMetricLoader(BaseMetricLoader): return self.filter_prom_jobs_results(result) def get_query(self, namespace: str, pod: str, container: str) -> str: - return f'sum(container_memory_working_set_bytes{{image!="", namespace="{namespace}", pod="{pod}", container="{container}"}})' + return f'sum(container_memory_working_set_bytes{{image!="", namespace="{namespace}", pod="{pod}", container="{container}"}}) by (container, pod, job)' -- cgit v1.2.3 From 823bc27454d7658bcc73afd47ee8562d171d9336 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9F=D0=B0=D0=B2=D0=B5=D0=BB=20=D0=96=D1=83=D0=BA=D0=BE?= =?UTF-8?q?=D0=B2?= <33721692+LeaveMyYard@users.noreply.github.com> Date: Tue, 9 May 2023 12:35:32 +0300 Subject: Move BaseFilteredMetric to abstraction, use at CPU --- .../prometheus/metrics/base_filtered_metric.py | 61 ++++++++++++++++++++++ .../integrations/prometheus/metrics/cpu_metric.py | 5 +- .../prometheus/metrics/memory_metric.py | 56 ++------------------ 3 files changed, 67 insertions(+), 55 deletions(-) create mode 100644 robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py new file mode 100644 index 0000000..80408c5 --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py @@ -0,0 +1,61 @@ +import datetime +from typing import Any, Optional + +from .base_metric import BaseMetricLoader + +PrometheusSeries = Any + + +class BaseFilteredMetricLoader(BaseMetricLoader): + """ + This is the version of the BasicMetricLoader, that filters out data, + if multiple metrics with the same name were found. + + Searches for the kubelet metric. If not found - returns first one in alphabetical order. + """ + + @staticmethod + def get_target_name(series: PrometheusSeries) -> Optional[str]: + for label in ["container", "pod", "node"]: + if label in series["metric"]: + return series["metric"][label] + return None + + @staticmethod + def filter_prom_jobs_results( + series_list_result: list[PrometheusSeries], + ) -> list[PrometheusSeries]: + """ + Because there might be multiple metrics with the same name, we need to filter them out. 
+ + :param series_list_result: list of PrometheusSeries + """ + + if len(series_list_result) == 1: + return series_list_result + + target_names = { + BaseFilteredMetricLoader.get_target_name(series) + for series in series_list_result + if BaseFilteredMetricLoader.get_target_name(series) + } + return_list: list[PrometheusSeries] = [] + + # takes kubelet job if exists, return first job alphabetically if it doesn't + for target_name in target_names: + relevant_series = [ + series for series in series_list_result if BaseFilteredMetricLoader.get_target_name(series) == target_name + ] + relevant_kubelet_metric = [series for series in relevant_series if series["metric"].get("job") == "kubelet"] + if len(relevant_kubelet_metric) == 1: + return_list.append(relevant_kubelet_metric[0]) + continue + sorted_relevant_series = sorted(relevant_series, key=lambda s: s["metric"].get("job"), reverse=False) + return_list.append(sorted_relevant_series[0]) + return return_list + + async def query_prometheus( + self, query: str, start_time: datetime.datetime, end_time: datetime.datetime, step: datetime.timedelta + ) -> list[PrometheusSeries]: + result = await super().query_prometheus(query, start_time, end_time, step) + return self.filter_prom_jobs_results(result) diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py index d2ad841..ffbaa91 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py @@ -1,9 +1,10 @@ from robusta_krr.core.models.allocations import ResourceType -from .base_metric import BaseMetricLoader, bind_metric +from .base_metric import bind_metric +from .base_filtered_metric import BaseFilteredMetricLoader @bind_metric(ResourceType.CPU) -class CPUMetricLoader(BaseMetricLoader): +class CPUMetricLoader(BaseFilteredMetricLoader): def get_query(self, namespace: str, pod: str, container: str) -> str: return f'sum(irate(container_cpu_usage_seconds_total{{namespace="{namespace}", pod="{pod}", container="{container}"}}[5m])) by (container, pod, job)' diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py index d471ccd..3355ed6 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py @@ -1,60 +1,10 @@ -import datetime -from typing import Any, Optional - from robusta_krr.core.models.allocations import ResourceType -from .base_metric import BaseMetricLoader, bind_metric - -PrometheusSeries = Any +from .base_metric import bind_metric +from .base_filtered_metric import BaseFilteredMetricLoader @bind_metric(ResourceType.Memory) -class MemoryMetricLoader(BaseMetricLoader): - @staticmethod - def get_target_name(series: PrometheusSeries) -> Optional[str]: - for label in ["container", "pod", "node"]: - if label in series["metric"]: - return series["metric"][label] - return None - - @staticmethod - def filter_prom_jobs_results( - series_list_result: list[PrometheusSeries], - ) -> list[PrometheusSeries]: - """ - Because there might be multiple metrics with the same name, we need to filter them out. 
- - :param series_list_result: list of PrometheusSeries - """ - - if len(series_list_result) == 1: - return series_list_result - - target_names = { - MemoryMetricLoader.get_target_name(series) - for series in series_list_result - if MemoryMetricLoader.get_target_name(series) - } - return_list: list[PrometheusSeries] = [] - - # takes kubelet job if exists, return first job alphabetically if it doesn't - for target_name in target_names: - relevant_series = [ - series for series in series_list_result if MemoryMetricLoader.get_target_name(series) == target_name - ] - relevant_kubelet_metric = [series for series in relevant_series if series["metric"].get("job") == "kubelet"] - if len(relevant_kubelet_metric) == 1: - return_list.append(relevant_kubelet_metric[0]) - continue - sorted_relevant_series = sorted(relevant_series, key=lambda s: s["metric"].get("job"), reverse=False) - return_list.append(sorted_relevant_series[0]) - return return_list - - async def query_prometheus( - self, query: str, start_time: datetime.datetime, end_time: datetime.datetime, step: datetime.timedelta - ) -> list[PrometheusSeries]: - result = await super().query_prometheus(query, start_time, end_time, step) - return self.filter_prom_jobs_results(result) - +class MemoryMetricLoader(BaseFilteredMetricLoader): def get_query(self, namespace: str, pod: str, container: str) -> str: return f'sum(container_memory_working_set_bytes{{image!="", namespace="{namespace}", pod="{pod}", container="{container}"}}) by (container, pod, job)' -- cgit v1.2.3