diff options
| author | Pavel Zhukov <33721692+LeaveMyYard@users.noreply.github.com> | 2023-05-09 12:40:17 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-05-09 12:40:17 +0300 |
| commit | 060a955bd523f508b4c7b3a1caa1b0b6bd5e9652 (patch) | |
| tree | 57cf6b4985c9590061d85b77fa48ece2fba926bc | |
| parent | 3acdf8a4e77abdb265c5dbf21d8a09ee1d26a659 (diff) | |
| parent | 823bc27454d7658bcc73afd47ee8562d171d9336 (diff) | |
Merge pull request #12 from robusta-dev/fix-CPU-metric
Improve metrics to be available for more people
| -rw-r--r-- | README.md | 2 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/prometheus/__init__.py | 1 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/prometheus/loader.py (renamed from robusta_krr/core/integrations/prometheus.py) | 47 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/prometheus/metrics/__init__.py | 3 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py | 61 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/prometheus/metrics/base_metric.py | 82 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py | 10 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/prometheus/metrics/memory_metric.py | 10 | ||||
| -rw-r--r-- | robusta_krr/core/runner.py | 2 |
9 files changed, 175 insertions, 43 deletions
@@ -85,7 +85,7 @@ Robusta KRR uses the following Prometheus queries to gather usage data: - CPU Usage: ``` - sum(node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{namespace="{object.namespace}", pod="{pod}", container="{object.container}"}) + sum(irate(container_cpu_usage_seconds_total{{namespace="{object.namespace}", pod="{pod}", container="{object.container}"}}[{step}])) ``` - Memory Usage: diff --git a/robusta_krr/core/integrations/prometheus/__init__.py b/robusta_krr/core/integrations/prometheus/__init__.py new file mode 100644 index 0000000..e43e8aa --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/__init__.py @@ -0,0 +1 @@ +from .loader import CustomPrometheusConnect, PrometheusDiscovery, PrometheusLoader, PrometheusNotFound diff --git a/robusta_krr/core/integrations/prometheus.py b/robusta_krr/core/integrations/prometheus/loader.py index 39c7f00..c0c1961 100644 --- a/robusta_krr/core/integrations/prometheus.py +++ b/robusta_krr/core/integrations/prometheus/loader.py @@ -1,6 +1,4 @@ -import asyncio import datetime -from decimal import Decimal from typing import Optional, no_type_check import requests @@ -17,6 +15,8 @@ from robusta_krr.core.models.result import ResourceType from robusta_krr.utils.configurable import Configurable from robusta_krr.utils.service_discovery import ServiceDiscovery +from .metrics import BaseMetricLoader + class PrometheusDiscovery(ServiceDiscovery): def find_prometheus_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: @@ -111,45 +111,10 @@ class PrometheusLoader(Configurable): resource: ResourceType, period: datetime.timedelta, *, - timeframe: datetime.timedelta = datetime.timedelta(minutes=30), + step: datetime.timedelta = datetime.timedelta(minutes=30), ) -> ResourceHistoryData: self.debug(f"Gathering data for {object} and {resource}") - if resource == ResourceType.CPU: - result = await asyncio.gather( - *[ - asyncio.to_thread( - self.prometheus.custom_query_range, - query=f'sum(node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{{namespace="{object.namespace}", pod="{pod}", container="{object.container}"}})', - start_time=datetime.datetime.now() - period, - end_time=datetime.datetime.now(), - step=f"{int(timeframe.total_seconds()) // 60}m", - ) - for pod in object.pods - ] - ) - elif resource == ResourceType.Memory: - result = await asyncio.gather( - *[ - asyncio.to_thread( - self.prometheus.custom_query_range, - query=f'sum(container_memory_working_set_bytes{{job="kubelet", metrics_path="/metrics/cadvisor", image!="", namespace="{object.namespace}", pod="{pod}", container="{object.container}"}})', - start_time=datetime.datetime.now() - period, - end_time=datetime.datetime.now(), - step=f"{int(timeframe.total_seconds()) // 60}m", - ) - for pod in object.pods - ] - ) - else: - raise ValueError(f"Unknown resource type: {resource}") - - if result == []: - return {pod: [] for pod in object.pods} - - pod_results = {pod: result[i] for i, pod in enumerate(object.pods)} - return { - pod: [Decimal(value) for _, value in pod_result[0]["values"]] - for pod, pod_result in pod_results.items() - if pod_result != [] - } + MetricLoaderType = BaseMetricLoader.get_by_resource(resource) + metric_loader = MetricLoaderType(self.config, self.prometheus) + return await metric_loader.load_data(object, period, step) diff --git a/robusta_krr/core/integrations/prometheus/metrics/__init__.py b/robusta_krr/core/integrations/prometheus/metrics/__init__.py new file mode 100644 index 0000000..0852b67 --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/metrics/__init__.py @@ -0,0 +1,3 @@ +from .base_metric import BaseMetricLoader, bind_metric +from .cpu_metric import CPUMetricLoader +from .memory_metric import MemoryMetricLoader diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py new file mode 100644 index 0000000..80408c5 --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py @@ -0,0 +1,61 @@ +import datetime +from typing import Any, Optional + +from .base_metric import BaseMetricLoader + +PrometheusSeries = Any + + +class BaseFilteredMetricLoader(BaseMetricLoader): + """ + This is the version of the BasicMetricLoader, that filters out data, + if multiple metrics with the same name were found. + + Searches for the kubelet metric. If not found - returns first one in alphabetical order. + """ + + @staticmethod + def get_target_name(series: PrometheusSeries) -> Optional[str]: + for label in ["container", "pod", "node"]: + if label in series["metric"]: + return series["metric"][label] + return None + + @staticmethod + def filter_prom_jobs_results( + series_list_result: list[PrometheusSeries], + ) -> list[PrometheusSeries]: + """ + Because there might be multiple metrics with the same name, we need to filter them out. + + :param series_list_result: list of PrometheusSeries + """ + + if len(series_list_result) == 1: + return series_list_result + + target_names = { + BaseFilteredMetricLoader.get_target_name(series) + for series in series_list_result + if BaseFilteredMetricLoader.get_target_name(series) + } + return_list: list[PrometheusSeries] = [] + + # takes kubelet job if exists, return first job alphabetically if it doesn't + for target_name in target_names: + relevant_series = [ + series for series in series_list_result if BaseFilteredMetricLoader.get_target_name(series) == target_name + ] + relevant_kubelet_metric = [series for series in relevant_series if series["metric"].get("job") == "kubelet"] + if len(relevant_kubelet_metric) == 1: + return_list.append(relevant_kubelet_metric[0]) + continue + sorted_relevant_series = sorted(relevant_series, key=lambda s: s["metric"].get("job"), reverse=False) + return_list.append(sorted_relevant_series[0]) + return return_list + + async def query_prometheus( + self, query: str, start_time: datetime.datetime, end_time: datetime.datetime, step: datetime.timedelta + ) -> list[PrometheusSeries]: + result = await super().query_prometheus(query, start_time, end_time, step) + return self.filter_prom_jobs_results(result) diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py new file mode 100644 index 0000000..407d3e3 --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +import abc +import asyncio +import datetime +from decimal import Decimal +from typing import TYPE_CHECKING, Callable, TypeVar + +from robusta_krr.core.abstract.strategies import ResourceHistoryData +from robusta_krr.core.models.config import Config +from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.utils.configurable import Configurable + +if TYPE_CHECKING: + from ..loader import CustomPrometheusConnect + +REGISTERED_METRICS: dict[str, type[BaseMetricLoader]] = {} + + +class BaseMetricLoader(Configurable, abc.ABC): + def __init__(self, config: Config, prometheus: CustomPrometheusConnect) -> None: + super().__init__(config) + self.prometheus = prometheus + + @abc.abstractmethod + def get_query(self, namespace: str, pod: str, container: str) -> str: + ... + + async def query_prometheus( + self, query: str, start_time: datetime.datetime, end_time: datetime.datetime, step: datetime.timedelta + ) -> list[dict]: + return await asyncio.to_thread( + self.prometheus.custom_query_range, + query=query, + start_time=start_time, + end_time=end_time, + step=f"{int(step.total_seconds()) // 60}m", + ) + + async def load_data( + self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta + ) -> ResourceHistoryData: + result = await asyncio.gather( + *[ + self.query_prometheus( + query=self.get_query(object.namespace, pod, object.container), + start_time=datetime.datetime.now() - period, + end_time=datetime.datetime.now(), + step=step, + ) + for pod in object.pods + ] + ) + + if result == []: + self.warning(f"Prometheus returned no {self.__class__.__name__} metrics for {object}") + return {pod: [] for pod in object.pods} + + pod_results = {pod: result[i] for i, pod in enumerate(object.pods)} + return { + pod: [Decimal(value) for _, value in pod_result[0]["values"]] + for pod, pod_result in pod_results.items() + if pod_result != [] + } + + @staticmethod + def get_by_resource(resource: str) -> type[BaseMetricLoader]: + try: + return REGISTERED_METRICS[resource] + except KeyError as e: + raise KeyError(f"Resource {resource} was not registered by `@bind_metric(...)`") from e + + +Self = TypeVar("Self", bound=BaseMetricLoader) + + +def bind_metric(resource: str) -> Callable[[type[Self]], type[Self]]: + def decorator(cls: type[Self]) -> type[Self]: + REGISTERED_METRICS[resource] = cls + return cls + + return decorator diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py new file mode 100644 index 0000000..ffbaa91 --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py @@ -0,0 +1,10 @@ +from robusta_krr.core.models.allocations import ResourceType + +from .base_metric import bind_metric +from .base_filtered_metric import BaseFilteredMetricLoader + + +@bind_metric(ResourceType.CPU) +class CPUMetricLoader(BaseFilteredMetricLoader): + def get_query(self, namespace: str, pod: str, container: str) -> str: + return f'sum(irate(container_cpu_usage_seconds_total{{namespace="{namespace}", pod="{pod}", container="{container}"}}[5m])) by (container, pod, job)' diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py new file mode 100644 index 0000000..3355ed6 --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py @@ -0,0 +1,10 @@ +from robusta_krr.core.models.allocations import ResourceType + +from .base_metric import bind_metric +from .base_filtered_metric import BaseFilteredMetricLoader + + +@bind_metric(ResourceType.Memory) +class MemoryMetricLoader(BaseFilteredMetricLoader): + def get_query(self, namespace: str, pod: str, container: str) -> str: + return f'sum(container_memory_working_set_bytes{{image!="", namespace="{namespace}", pod="{pod}", container="{container}"}}) by (container, pod, job)' diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index 00dcbb9..f4d4d94 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -94,7 +94,7 @@ class Runner(Configurable): object, resource, self._strategy.settings.history_timedelta, - timeframe=self._strategy.settings.timeframe_timedelta, + step=self._strategy.settings.timeframe_timedelta, ) for resource in ResourceType ] |
