| author | Avi-Robusta <97387909+Avi-Robusta@users.noreply.github.com> | 2023-07-04 17:32:43 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-07-04 17:32:43 +0300 |
| commit | 682c2e5f45930f94fc0f2485d62fe536c61417c2 | |
| tree | f01d83abd395046719a4a6f97b713fbdd10e7b4f | |
| parent | 26f9f705d56689b59556c39b3bb1683246a3f44c | |
| parent | 14c4ae526a7b8767f94a9dcfd64312dc865113a5 | |
Merge pull request #92 from robusta-dev/mem_test_v2
Fixed prometheus query memory issue
8 files changed, 147 insertions, 34 deletions
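The heart of the fix is visible in `query_prometheus_thread` and the new `simple`-strategy memory loader in the diff below: instead of always issuing a Prometheus range query (one sample per step for the whole period), memory usage can be fetched with a single instant query that asks Prometheus to compute `max_over_time` server-side, and the instant result is reshaped to the same format a range result has. The following is a rough, self-contained sketch of that idea, not part of the commit; the `prom` connection, the placeholder URL, the `app` container selector and the `7d:30m` window are illustrative assumptions, while `custom_query` and `custom_query_range` are the same `prometheus-api-client` calls the loaders wrap.

```python
# Illustrative sketch only -- not from the commit. It contrasts the heavy range query
# with the lighter instant query that this PR switches the memory metric to.
import datetime

from prometheus_api_client import PrometheusConnect

prom = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)  # placeholder URL
end = datetime.datetime.now()
start = end - datetime.timedelta(days=7)

# Before: a range query returns one sample per 30m step for the whole 7 days.
heavy = prom.custom_query_range(
    query='sum(container_memory_working_set_bytes{container="app"}) by (container, pod, job)',
    start_time=start,
    end_time=end,
    step="30m",
)

# After (simple strategy): a single instant query; Prometheus computes the max over a
# 7d window at 30m resolution, so only one point per series crosses the wire.
light = prom.custom_query(
    query='max(max_over_time(container_memory_working_set_bytes{container="app"}[7d:30m]))'
          " by (container, pod, job, id)"
)

# Instant results carry a single "value"; reshape them to the "values" list that
# custom_query_range returns, which is what query_prometheus_thread does below.
for series in light:
    series["values"] = [series.pop("value")]
```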
diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py
index f6a535e..c7ba3e7 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py
@@ -1,8 +1,7 @@
 from typing import Any, Optional
-
 from robusta_krr.core.abstract.strategies import Metric
-from .base_metric import BaseMetricLoader
+from .base_metric import BaseMetricLoader, QueryType
 
 PrometheusSeries = Any
@@ -57,6 +56,6 @@ class BaseFilteredMetricLoader(BaseMetricLoader):
             return_list.append(sorted_relevant_series[0])
         return return_list
 
-    async def query_prometheus(self, metric: Metric) -> list[PrometheusSeries]:
-        result = await super().query_prometheus(metric)
+    async def query_prometheus(self, metric: Metric, query_type: QueryType) -> list[PrometheusSeries]:
+        result = await super().query_prometheus(metric, query_type)
         return self.filter_prom_jobs_results(result)
diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py
index b3230e8..8bf263c 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py
@@ -5,9 +5,8 @@ import asyncio
 from concurrent.futures import ThreadPoolExecutor
 import datetime
 from typing import TYPE_CHECKING, Callable, Optional, TypeVar
-
+import enum
 import numpy as np
-
 from robusta_krr.core.abstract.strategies import Metric, ResourceHistoryData
 from robusta_krr.core.models.config import Config
 from robusta_krr.core.models.objects import K8sObjectData
@@ -16,8 +15,17 @@ from robusta_krr.utils.configurable import Configurable
 if TYPE_CHECKING:
     from .. import CustomPrometheusConnect
 
+MetricsDictionary = dict[str, type[BaseMetricLoader]]
+
+
+class QueryType(str, enum.Enum):
+    Query = "query"
+    QueryRange = "query_range"
+
+
 # A registry of metrics that can be used to fetch a corresponding metric loader.
-REGISTERED_METRICS: dict[str, type[BaseMetricLoader]] = {}
+REGISTERED_METRICS: MetricsDictionary = {}
+STRATEGY_METRICS_OVERRIDES: dict[str, MetricsDictionary] = {}
 
 
 class BaseMetricLoader(Configurable, abc.ABC):
@@ -50,12 +58,13 @@ class BaseMetricLoader(Configurable, abc.ABC):
         return f', {self.config.prometheus_label}="{self.config.prometheus_cluster_label}"'
 
     @abc.abstractmethod
-    def get_query(self, object: K8sObjectData) -> str:
+    def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
         """
         This method should be implemented by all subclasses to provide a query string to fetch metrics.
 
         Args:
             object (K8sObjectData): The object for which metrics need to be fetched.
+            resolution (Optional[str]): a string for configurable resolution to the query.
 
         Returns:
             str: The query string.
@@ -63,6 +72,19 @@ class BaseMetricLoader(Configurable, abc.ABC):
         """
 
         pass
 
+    def get_graph_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
+        """
+        This method should be implemented by all subclasses to provide a query string in the metadata to produce relevant graphs.
+
+        Args:
+            object (K8sObjectData): The object for which metrics need to be fetched.
+            resolution (Optional[str]): a string for configurable resolution to the query.
+
+        Returns:
+            str: The query string.
+        """
+        return self.get_query(object, resolution)
+
     def _step_to_string(self, step: datetime.timedelta) -> str:
         """
         Converts step in datetime.timedelta format to a string format used by Prometheus.
@@ -73,10 +95,28 @@ class BaseMetricLoader(Configurable, abc.ABC):
         Args:
             step (datetime.timedelta): Step size in datetime.timedelta format.
 
         Returns:
             str: Step size in string format used by Prometheus.
         """
-
+        if step.total_seconds() > 60 * 60 * 24:
+            return f"{int(step.total_seconds()) // (60 * 60 * 24)}d"
         return f"{int(step.total_seconds()) // 60}m"
 
-    async def query_prometheus(self, metric: Metric) -> list[dict]:
+    def query_prometheus_thread(self, metric: Metric, query_type: QueryType) -> list[dict]:
+        if query_type == QueryType.QueryRange:
+            value = self.prometheus.custom_query_range(
+                query=metric.query,
+                start_time=metric.start_time,
+                end_time=metric.end_time,
+                step=metric.step,
+            )
+            return value
+
+        # regular query, lighter on performance
+        results = self.prometheus.custom_query(query=metric.query)
+        # format the results to return the same format as custom_query_range
+        for result in results:
+            result["values"] = [result.pop("value")]
+        return results
+
+    async def query_prometheus(self, metric: Metric, query_type: QueryType) -> list[dict]:
         """
         Asynchronous method that queries Prometheus to fetch metrics.
@@ -90,12 +130,7 @@ class BaseMetricLoader(Configurable, abc.ABC):
         loop = asyncio.get_running_loop()
         return await loop.run_in_executor(
             self.executor,
-            lambda: self.prometheus.custom_query_range(
-                query=metric.query,
-                start_time=metric.start_time,
-                end_time=metric.end_time,
-                step=metric.step,
-            ),
+            lambda: self.query_prometheus_thread(metric=metric, query_type=query_type),
         )
 
     async def load_data(
@@ -112,8 +147,9 @@ class BaseMetricLoader(Configurable, abc.ABC):
         Returns:
             ResourceHistoryData: An instance of the ResourceHistoryData class representing the loaded metrics.
         """
-
-        query = self.get_query(object)
+        resolution = f"{self._step_to_string(period)}:{self._step_to_string(step)}"
+        query = self.get_query(object, resolution)
+        query_type = self.get_query_type()
         end_time = datetime.datetime.now().astimezone()
         metric = Metric(
             query=query,
@@ -121,7 +157,9 @@ class BaseMetricLoader(Configurable, abc.ABC):
             end_time=end_time,
             step=self._step_to_string(step),
         )
-        result = await self.query_prometheus(metric)
+        result = await self.query_prometheus(metric=metric, query_type=query_type)
+        # adding the query in the results for a graph
+        metric.query = self.get_graph_query(object, resolution)
 
         if result == []:
             self.warning(f"{service_name} returned no {self.__class__.__name__} metrics for {object}")
@@ -135,12 +173,13 @@ class BaseMetricLoader(Configurable, abc.ABC):
         )
 
     @staticmethod
-    def get_by_resource(resource: str) -> type[BaseMetricLoader]:
+    def get_by_resource(resource: str, strategy: Optional[str]) -> type[BaseMetricLoader]:
         """
         Fetches the metric loader corresponding to the specified resource.
 
         Args:
             resource (str): The name of the resource.
+            strategy (Optional[str]): The name of the strategy.
 
         Returns:
             type[BaseMetricLoader]: The class of the metric loader corresponding to the resource.
@@ -150,6 +189,13 @@ class BaseMetricLoader(Configurable, abc.ABC):
         """
 
         try:
+            lower_strategy = strategy.lower()
+            if (
+                lower_strategy
+                and lower_strategy in STRATEGY_METRICS_OVERRIDES
+                and resource in STRATEGY_METRICS_OVERRIDES[lower_strategy]
+            ):
+                return STRATEGY_METRICS_OVERRIDES[lower_strategy][resource]
             return REGISTERED_METRICS[resource]
         except KeyError as e:
             raise KeyError(f"Resource {resource} was not registered by `@bind_metric(...)`") from e
@@ -174,3 +220,26 @@ def bind_metric(resource: str) -> Callable[[type[Self]], type[Self]]:
         return cls
 
     return decorator
+
+
+# This is a temporary solution, metric loaders will be moved to strategy in the future
+def override_metric(strategy: str, resource: str) -> Callable[[type[Self]], type[Self]]:
+    """
+    A decorator that overrides the bound metric on a specific strategy.
+
+    Args:
+        strategy (str): The name of the strategy for this metric.
+        resource (str): The name of the resource.
+
+    Returns:
+        Callable[[type[Self]], type[Self]]: The decorator that does the binding.
+    """
+
+    def decorator(cls: type[Self]) -> type[Self]:
+        lower_strategy = strategy.lower()
+        if lower_strategy not in STRATEGY_METRICS_OVERRIDES:
+            STRATEGY_METRICS_OVERRIDES[lower_strategy] = {}
+        STRATEGY_METRICS_OVERRIDES[lower_strategy][resource] = cls
+        return cls
+
+    return decorator
diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py
index e2d364b..bf27622 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py
@@ -1,13 +1,14 @@
+from typing import Optional
 from robusta_krr.core.models.allocations import ResourceType
 from robusta_krr.core.models.objects import K8sObjectData
 
 from .base_filtered_metric import BaseFilteredMetricLoader
-from .base_metric import bind_metric
+from .base_metric import bind_metric, QueryType
 
 
 @bind_metric(ResourceType.CPU)
 class CPUMetricLoader(BaseFilteredMetricLoader):
-    def get_query(self, object: K8sObjectData) -> str:
+    def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
         pods_selector = "|".join(pod.name for pod in object.pods)
         cluster_label = self.get_prometheus_cluster_label()
         return (
@@ -18,3 +19,6 @@ class CPUMetricLoader(BaseFilteredMetricLoader):
             f"{cluster_label}"
             "}[5m])) by (container, pod, job)"
         )
+
+    def get_query_type(self) -> QueryType:
+        return QueryType.QueryRange
diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py
index 5942ec1..a8a4521 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py
@@ -1,13 +1,14 @@
+from typing import Optional
 from robusta_krr.core.models.allocations import ResourceType
 from robusta_krr.core.models.objects import K8sObjectData
 
 from .base_filtered_metric import BaseFilteredMetricLoader
-from .base_metric import bind_metric
+from .base_metric import bind_metric, QueryType, override_metric
 
 
 @bind_metric(ResourceType.Memory)
 class MemoryMetricLoader(BaseFilteredMetricLoader):
-    def get_query(self, object: K8sObjectData) -> str:
+    def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
         pods_selector = "|".join(pod.name for pod in object.pods)
         cluster_label = self.get_prometheus_cluster_label()
         return (
@@ -16,5 +17,35 @@ class MemoryMetricLoader(BaseFilteredMetricLoader):
             f'pod=~"{pods_selector}", '
             f'container="{object.container}"'
             f"{cluster_label}"
-            "}) by (container, pod, job)"
+            "}) by (container, pod, job, id)"
         )
+
+    def get_query_type(self) -> QueryType:
+        return QueryType.QueryRange
+
+
+# This is a temporary solution, metric loaders will be moved to strategy in the future
+@override_metric("simple", ResourceType.Memory)
+class MemoryMetricLoader(MemoryMetricLoader):
+    """
+    A class that overrides the memory metric on the simple strategy.
+    """
+
+    def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
+        pods_selector = "|".join(pod.name for pod in object.pods)
+        cluster_label = self.get_prometheus_cluster_label()
+        resolution_formatted = f"[{resolution}]" if resolution else ""
+        return (
+            f"max(max_over_time(container_memory_working_set_bytes{{"
+            f'namespace="{object.namespace}", '
+            f'pod=~"{pods_selector}", '
+            f'container="{object.container}"'
+            f"{cluster_label}}}"
+            f"{resolution_formatted}"
+            f")) by (container, pod, job, id)"
+        )
+
+    def get_query_type(self) -> QueryType:
+        return QueryType.Query
+
+    def get_graph_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
+        return super().get_query(object, resolution)
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
index 896387f..31fe0c0 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
@@ -177,9 +177,9 @@ class PrometheusMetricsService(MetricsService):
         """
         self.debug(f"Gathering data for {object} and {resource}")
 
+        MetricLoaderType = BaseMetricLoader.get_by_resource(resource, self.config.strategy)
         await self.add_historic_pods(object, period)
-
-        MetricLoaderType = BaseMetricLoader.get_by_resource(resource)
+
         metric_loader = MetricLoaderType(self.config, self.prometheus, self.executor)
         return await metric_loader.load_data(object, period, step, self.name())
@@ -202,7 +202,7 @@ class PrometheusMetricsService(MetricsService):
             f'owner_name="{object.name}", '
             f'owner_kind="Deployment", '
             f'namespace="{object.namespace}"'
-            f'{cluster_label}'
+            f"{cluster_label}"
             "}"
             f"[{period_literal}]"
         )
@@ -218,7 +218,7 @@ class PrometheusMetricsService(MetricsService):
             f'owner_name=~"{owners_regex}", '
             f'owner_kind="{pod_owner_kind}", '
             f'namespace="{object.namespace}"'
-            f'{cluster_label}'
+            f"{cluster_label}"
             "}"
             f"[{period_literal}]"
         )
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py
index eb11379..42066ae 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py
@@ -2,7 +2,7 @@
 from typing import Optional
 
 from kubernetes.client import ApiClient
 from requests.exceptions import ConnectionError, HTTPError
-
+from concurrent.futures import ThreadPoolExecutor
 from robusta_krr.core.models.config import Config
 from robusta_krr.utils.service_discovery import ServiceDiscovery
@@ -48,9 +48,14 @@ class ThanosMetricsService(PrometheusMetricsService):
         *,
         cluster: Optional[str] = None,
         api_client: Optional[ApiClient] = None,
+        executor: Optional[ThreadPoolExecutor] = None,
     ) -> None:
         super().__init__(
-            config=config, cluster=cluster, api_client=api_client, service_discovery=ThanosMetricsDiscovery
+            config=config,
+            cluster=cluster,
+            api_client=api_client,
+            service_discovery=ThanosMetricsDiscovery,
+            executor=executor,
         )
 
     def check_connection(self):
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py
index b3779a8..9c15cab 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py
@@ -1,5 +1,5 @@
 from typing import Optional
-
+from concurrent.futures import ThreadPoolExecutor
 from kubernetes.client import ApiClient
 from requests.exceptions import ConnectionError, HTTPError
@@ -47,9 +47,14 @@ class VictoriaMetricsService(PrometheusMetricsService):
         *,
         cluster: Optional[str] = None,
         api_client: Optional[ApiClient] = None,
+        executor: Optional[ThreadPoolExecutor] = None,
     ) -> None:
         super().__init__(
-            config=config, cluster=cluster, api_client=api_client, service_discovery=VictoriaMetricsDiscovery
+            config=config,
+            cluster=cluster,
+            api_client=api_client,
+            service_discovery=VictoriaMetricsDiscovery,
+            executor=executor,
         )
 
     def check_connection(self):
diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py
index 890e19d..3cdf7b9 100644
--- a/robusta_krr/core/runner.py
+++ b/robusta_krr/core/runner.py
@@ -154,7 +154,7 @@ class Runner(Configurable):
     async def _collect_result(self) -> Result:
         clusters = await self._k8s_loader.list_clusters()
 
-        if len(clusters) > 1 and self.config.prometheus_url:
+        if clusters and len(clusters) > 1 and self.config.prometheus_url:
            # this can only happen for multi-cluster querying a single centralized prometheus
            # In this scenario we don't yet support determining which metrics belong to which cluster, so the recommendation can be incorrect
            raise ClusterNotSpecifiedException(
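For readers following the new strategy-override plumbing above, here is a small self-contained sketch of how `bind_metric`, `override_metric` and `get_by_resource` fit together. It is not the project's code: the real registries live in `base_metric.py` and map `ResourceType` values to metric loader classes, whereas the sketch uses plain strings and empty stand-in classes, and (unlike the committed `get_by_resource`) it guards against a missing strategy before lowercasing it.

```python
# Simplified, illustrative sketch of the override registry -- not the project's implementation.
from typing import Callable, Optional

REGISTERED_METRICS: dict[str, type] = {}
STRATEGY_METRICS_OVERRIDES: dict[str, dict[str, type]] = {}


def bind_metric(resource: str) -> Callable[[type], type]:
    # Default binding: one loader class per resource.
    def decorator(cls: type) -> type:
        REGISTERED_METRICS[resource] = cls
        return cls
    return decorator


def override_metric(strategy: str, resource: str) -> Callable[[type], type]:
    # Strategy-specific binding that takes precedence over the default one.
    def decorator(cls: type) -> type:
        STRATEGY_METRICS_OVERRIDES.setdefault(strategy.lower(), {})[resource] = cls
        return cls
    return decorator


def get_by_resource(resource: str, strategy: Optional[str]) -> type:
    # A strategy-specific override wins; otherwise fall back to the default binding.
    if strategy:
        override = STRATEGY_METRICS_OVERRIDES.get(strategy.lower(), {}).get(resource)
        if override is not None:
            return override
    return REGISTERED_METRICS[resource]


@bind_metric("Memory")
class DefaultMemoryLoader:  # stand-in for MemoryMetricLoader
    pass


@override_metric("simple", "Memory")
class SimpleStrategyMemoryLoader:  # stand-in for the "simple" strategy override
    pass


assert get_by_resource("Memory", "Simple") is SimpleStrategyMemoryLoader
assert get_by_resource("Memory", None) is DefaultMemoryLoader
assert get_by_resource("Memory", "pessimistic") is DefaultMemoryLoader
```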
