From 2655dfaea93ac228ef09ead93f72cc328a33c775 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9F=D0=B0=D0=B2=D0=B5=D0=BB=20=D0=96=D1=83=D0=BA=D0=BE?= =?UTF-8?q?=D0=B2?= <33721692+LeaveMyYard@users.noreply.github.com> Date: Tue, 16 May 2023 23:58:46 +0300 Subject: Combine promql requests for different pods into one, pass query to result data --- robusta_krr/core/abstract/strategies.py | 11 ++++++- robusta_krr/core/integrations/prometheus/loader.py | 3 -- .../prometheus/metrics/base_filtered_metric.py | 1 + .../integrations/prometheus/metrics/base_metric.py | 35 ++++++++++------------ .../integrations/prometheus/metrics/cpu_metric.py | 12 ++++++-- .../prometheus/metrics/memory_metric.py | 12 ++++++-- robusta_krr/core/models/result.py | 10 +++++-- robusta_krr/core/runner.py | 25 +++++++++------- robusta_krr/strategies/simple.py | 4 +-- 9 files changed, 70 insertions(+), 43 deletions(-) diff --git a/robusta_krr/core/abstract/strategies.py b/robusta_krr/core/abstract/strategies.py index a46533c..5882493 100644 --- a/robusta_krr/core/abstract/strategies.py +++ b/robusta_krr/core/abstract/strategies.py @@ -43,7 +43,16 @@ class StrategySettings(pd.BaseModel): _StrategySettings = TypeVar("_StrategySettings", bound=StrategySettings) ArrayNx2 = Annotated[NDArray[np.float64], Literal["N", 2]] -ResourceHistoryData = dict[str, ArrayNx2] + + +class ResourceHistoryData(pd.BaseModel): + query: str # The query used to get the data + data: dict[str, ArrayNx2] # Mapping: pod -> (time, value) + + class Config: + arbitrary_types_allowed = True + + HistoryData = dict[ResourceType, ResourceHistoryData] RunResult = dict[ResourceType, ResourceRecommendation] diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py index 685dd1c..b543bd8 100644 --- a/robusta_krr/core/integrations/prometheus/loader.py +++ b/robusta_krr/core/integrations/prometheus/loader.py @@ -18,9 +18,6 @@ from robusta_krr.utils.service_discovery import ServiceDiscovery from .metrics import BaseMetricLoader -import numpy as np -from numpy.typing import NDArray - class PrometheusDiscovery(ServiceDiscovery): def find_prometheus_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py index 80408c5..a409810 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py @@ -21,6 +21,7 @@ class BaseFilteredMetricLoader(BaseMetricLoader): return series["metric"][label] return None + # TODO: Rework this, as now our query can return multiple metrics for different pods @staticmethod def filter_prom_jobs_results( series_list_result: list[PrometheusSeries], diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py index be180f6..2547d16 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py @@ -24,7 +24,7 @@ class BaseMetricLoader(Configurable, abc.ABC): self.prometheus = prometheus @abc.abstractmethod - def get_query(self, namespace: str, pod: str, container: str) -> str: + def get_query(self, object: K8sObjectData) -> str: ... 
async def query_prometheus( @@ -41,28 +41,25 @@ class BaseMetricLoader(Configurable, abc.ABC): async def load_data( self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta ) -> ResourceHistoryData: - result = await asyncio.gather( - *[ - self.query_prometheus( - query=self.get_query(object.namespace, pod.name, object.container), - start_time=datetime.datetime.now() - period, - end_time=datetime.datetime.now(), - step=step, - ) - for pod in object.pods - ] + query = self.get_query(object) + result = await self.query_prometheus( + query=query, + start_time=datetime.datetime.now() - period, + end_time=datetime.datetime.now(), + step=step, ) if result == []: self.warning(f"Prometheus returned no {self.__class__.__name__} metrics for {object}") - return {pod.name: np.array([]) for pod in object.pods} - - pod_results = {pod: result[i] for i, pod in enumerate(object.pods)} - return { - pod.name: np.array([(timestamp, value) for timestamp, value in pod_result[0]["values"]], dtype=np.float64) - for pod, pod_result in pod_results.items() - if pod_result != [] - } + return ResourceHistoryData(query=query, data={}) + + return ResourceHistoryData( + query=query, + data={ + pod_result['metric']['pod']: np.array(pod_result["values"], dtype=np.float64) + for pod_result in result + }, + ) @staticmethod def get_by_resource(resource: str) -> type[BaseMetricLoader]: diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py index ffbaa91..dda7d14 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py @@ -2,9 +2,17 @@ from robusta_krr.core.models.allocations import ResourceType from .base_metric import bind_metric from .base_filtered_metric import BaseFilteredMetricLoader +from robusta_krr.core.models.objects import K8sObjectData @bind_metric(ResourceType.CPU) class CPUMetricLoader(BaseFilteredMetricLoader): - def get_query(self, namespace: str, pod: str, container: str) -> str: - return f'sum(irate(container_cpu_usage_seconds_total{{namespace="{namespace}", pod="{pod}", container="{container}"}}[5m])) by (container, pod, job)' + def get_query(self, object: K8sObjectData) -> str: + pods_selector = "|".join(pod.name for pod in object.pods) + return ( + 'sum(irate(container_cpu_usage_seconds_total{' + f'namespace="{object.namespace}", ' + f'pod=~"{pods_selector}", ' + f'container="{object.container}"' + '}[5m])) by (container, pod, job)' + ) diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py index 3355ed6..33ed733 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py @@ -2,9 +2,17 @@ from robusta_krr.core.models.allocations import ResourceType from .base_metric import bind_metric from .base_filtered_metric import BaseFilteredMetricLoader +from robusta_krr.core.models.objects import K8sObjectData @bind_metric(ResourceType.Memory) class MemoryMetricLoader(BaseFilteredMetricLoader): - def get_query(self, namespace: str, pod: str, container: str) -> str: - return f'sum(container_memory_working_set_bytes{{image!="", namespace="{namespace}", pod="{pod}", container="{container}"}}) by (container, pod, job)' + def get_query(self, object: K8sObjectData) -> str: + pods_selector = "|".join(pod.name for pod in object.pods) + return ( + 
'sum(container_memory_working_set_bytes{' + f'namespace="{object.namespace}", ' + f'pod=~"{pods_selector}", ' + f'container="{object.container}"' + '}) by (container, pod, job)' + ) diff --git a/robusta_krr/core/models/result.py b/robusta_krr/core/models/result.py index 1e944e7..5119de8 100644 --- a/robusta_krr/core/models/result.py +++ b/robusta_krr/core/models/result.py @@ -60,13 +60,17 @@ class ResourceRecommendation(pd.BaseModel): limits: dict[ResourceType, RecommendationValue] +MetricsData = dict[ResourceType, str] + + class ResourceScan(pd.BaseModel): object: K8sObjectData recommended: ResourceRecommendation severity: Severity + metrics: MetricsData @classmethod - def calculate(cls, object: K8sObjectData, recommendation: ResourceAllocations) -> ResourceScan: + def calculate(cls, object: K8sObjectData, recommendation: ResourceAllocations, metrics: MetricsData) -> ResourceScan: recommendation_processed = ResourceRecommendation(requests={}, limits={}) for resource_type in ResourceType: @@ -84,9 +88,9 @@ class ResourceScan(pd.BaseModel): for selector in ["requests", "limits"]: for recommendation_request in getattr(recommendation_processed, selector).values(): if recommendation_request.severity == severity: - return cls(object=object, recommended=recommendation_processed, severity=severity) + return cls(object=object, recommended=recommendation_processed, severity=severity, metrics=metrics) - return cls(object=object, recommended=recommendation_processed, severity=Severity.UNKNOWN) + return cls(object=object, recommended=recommendation_processed, severity=Severity.UNKNOWN, metrics=metrics) class Result(pd.BaseModel): diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index 5ee8689..51642fe 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -7,7 +7,7 @@ from robusta_krr.core.integrations.kubernetes import KubernetesLoader from robusta_krr.core.integrations.prometheus import PrometheusLoader, PrometheusNotFound from robusta_krr.core.models.config import Config from robusta_krr.core.models.objects import K8sObjectData -from robusta_krr.core.models.result import ResourceAllocations, ResourceScan, ResourceType, Result +from robusta_krr.core.models.result import ResourceAllocations, ResourceScan, ResourceType, Result, MetricsData from robusta_krr.utils.configurable import Configurable from robusta_krr.utils.logo import ASCII_LOGO from robusta_krr.utils.version import get_version @@ -91,11 +91,11 @@ class Runner(Configurable): for resource, recommendation in result.items() } - async def _calculate_object_recommendations(self, object: K8sObjectData) -> RunResult: + async def _calculate_object_recommendations(self, object: K8sObjectData) -> tuple[RunResult, MetricsData]: prometheus_loader = self._get_prometheus_loader(object.cluster) if prometheus_loader is None: - return {resource: ResourceRecommendation.undefined() for resource in ResourceType} + return {resource: ResourceRecommendation.undefined() for resource in ResourceType}, {} data_tuple = await asyncio.gather( *[ @@ -109,23 +109,26 @@ class Runner(Configurable): ] ) data = dict(zip(ResourceType, data_tuple)) + queries = {resource: data[resource].query for resource in ResourceType} # NOTE: We run this in a threadpool as the strategy calculation might be CPU intensive # But keep in mind that numpy calcluations will not block the GIL result = await asyncio.to_thread(self._strategy.run, data, object) - return self._format_result(result) + return self._format_result(result), queries - async def 
_gather_objects_recommendations(self, objects: list[K8sObjectData]) -> list[ResourceAllocations]: - recommendations: list[RunResult] = await asyncio.gather( + async def _gather_objects_recommendations(self, objects: list[K8sObjectData]) -> list[tuple[ResourceAllocations, MetricsData]]: + recommendations: list[tuple[RunResult, MetricsData]] = await asyncio.gather( *[self._calculate_object_recommendations(object) for object in objects] ) return [ - ResourceAllocations( - requests={resource: recommendation[resource].request for resource in ResourceType}, - limits={resource: recommendation[resource].limit for resource in ResourceType}, + ( + ResourceAllocations( + requests={resource: recommendation[resource].request for resource in ResourceType}, + limits={resource: recommendation[resource].limit for resource in ResourceType}, + ), metric ) - for recommendation in recommendations + for recommendation, metric in recommendations ] async def _collect_result(self) -> Result: @@ -144,7 +147,7 @@ class Runner(Configurable): return Result( scans=[ - ResourceScan.calculate(obj, recommended) for obj, recommended in zip(objects, resource_recommendations) + ResourceScan.calculate(obj, recommended, metrics) for obj, (recommended, metrics) in zip(objects, resource_recommendations) ], description=self._strategy.description, ) diff --git a/robusta_krr/strategies/simple.py b/robusta_krr/strategies/simple.py index c360a69..fa946da 100644 --- a/robusta_krr/strategies/simple.py +++ b/robusta_krr/strategies/simple.py @@ -51,8 +51,8 @@ class SimpleStrategy(BaseStrategy[SimpleStrategySettings]): __rich_console__ = True def run(self, history_data: HistoryData, object_data: K8sObjectData) -> RunResult: - cpu_usage = self.settings.calculate_cpu_proposal(history_data[ResourceType.CPU]) - memory_usage = self.settings.calculate_memory_proposal(history_data[ResourceType.Memory]) + cpu_usage = self.settings.calculate_cpu_proposal(history_data[ResourceType.CPU].data) + memory_usage = self.settings.calculate_memory_proposal(history_data[ResourceType.Memory].data) return { ResourceType.CPU: ResourceRecommendation(request=cpu_usage, limit=None), -- cgit v1.2.3 From 98d138db2e0b6ffd5b114ec655f464a05fe2a2bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9F=D0=B0=D0=B2=D0=B5=D0=BB=20=D0=96=D1=83=D0=BA=D0=BE?= =?UTF-8?q?=D0=B2?= <33721692+LeaveMyYard@users.noreply.github.com> Date: Mon, 22 May 2023 18:01:12 +0300 Subject: [MAIN-169] Set default timeframe to 2m --- robusta_krr/core/abstract/strategies.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/robusta_krr/core/abstract/strategies.py b/robusta_krr/core/abstract/strategies.py index 5882493..9c4f133 100644 --- a/robusta_krr/core/abstract/strategies.py +++ b/robusta_krr/core/abstract/strategies.py @@ -29,7 +29,7 @@ class StrategySettings(pd.BaseModel): history_duration: float = pd.Field( 24 * 7 * 2, ge=1, description="The duration of the history data to use (in hours)." ) - timeframe_duration: float = pd.Field(15, ge=1, description="The step for the history data (in minutes).") + timeframe_duration: float = pd.Field(2, ge=1, description="The step for the history data (in minutes).") @property def history_timedelta(self) -> datetime.timedelta: -- cgit v1.2.3
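
Taken together, these commits replace the per-pod Prometheus queries with a single range query that selects every pod of the workload through a pod=~"name1|name2" regex matcher, and then split the combined response back into per-pod series using the "pod" label of each returned time series (the by (container, pod, job) grouping is what keeps one series per pod). The sketch below is a minimal, self-contained illustration of that flow and is not taken from the krr codebase: the helper names (build_combined_query, split_by_pod) and the sample payload are assumptions, with the payload shaped like a standard Prometheus range-query result array (metric labels plus [timestamp, "value"] pairs).

# Illustrative sketch of the "one query for all pods" approach from the patch above.
# build_combined_query / split_by_pod and the sample payload are hypothetical; only the
# query shape and the result format mirror what the patch does.
from __future__ import annotations

import numpy as np


def build_combined_query(namespace: str, pod_names: list[str], container: str) -> str:
    # One PromQL query covering all pods via a regex selector instead of one query per pod.
    pods_selector = "|".join(pod_names)  # e.g. "app-abc123|app-def456"
    return (
        "sum(irate(container_cpu_usage_seconds_total{"
        f'namespace="{namespace}", pod=~"{pods_selector}", container="{container}"'
        "}[5m])) by (container, pod, job)"
    )


def split_by_pod(series_list: list[dict]) -> dict[str, np.ndarray]:
    # Rebuild the per-pod mapping from the combined result: each series carries its own
    # "pod" label, and its [timestamp, "value"] pairs become an Nx2 float64 array.
    return {
        series["metric"]["pod"]: np.array(series["values"], dtype=np.float64)
        for series in series_list
    }


if __name__ == "__main__":
    query = build_combined_query("default", ["app-abc123", "app-def456"], "app")
    print(query)

    # Fake payload standing in for the Prometheus client response (range-query format).
    payload = [
        {"metric": {"pod": "app-abc123"}, "values": [[1684000000, "0.12"], [1684000120, "0.15"]]},
        {"metric": {"pod": "app-def456"}, "values": [[1684000000, "0.08"], [1684000120, "0.09"]]},
    ]
    per_pod = split_by_pod(payload)
    print({pod: arr.shape for pod, arr in per_pod.items()})  # {'app-abc123': (2, 2), 'app-def456': (2, 2)}

Note that Prometheus label regex matchers are fully anchored, so pod=~"a|b" matches exactly those pod names rather than substrings, and the by (container, pod, job) grouping is what makes it possible to key the ResourceHistoryData mapping by pod name on the way back.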