-rw-r--r--   robusta_krr/core/abstract/strategies.py                                                 |  5
-rw-r--r--   robusta_krr/core/integrations/kubernetes/config_patch.py                                |  4
-rw-r--r--   robusta_krr/core/integrations/prometheus/loader.py                                      |  7
-rw-r--r--   robusta_krr/core/integrations/prometheus/metrics/base.py                                |  2
-rw-r--r--   robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py | 46
-rw-r--r--   robusta_krr/core/models/result.py                                                       |  1
-rw-r--r--   robusta_krr/core/runner.py                                                              | 59
-rw-r--r--   robusta_krr/strategies/simple.py                                                        |  6
8 files changed, 115 insertions(+), 15 deletions(-)
diff --git a/robusta_krr/core/abstract/strategies.py b/robusta_krr/core/abstract/strategies.py
index 204341f..5b6521e 100644
--- a/robusta_krr/core/abstract/strategies.py
+++ b/robusta_krr/core/abstract/strategies.py
@@ -59,6 +59,11 @@ class StrategySettings(pd.BaseModel):
     def timeframe_timedelta(self) -> datetime.timedelta:
         return datetime.timedelta(minutes=self.timeframe_duration)
 
+    def history_range_enough(self, history_range: tuple[datetime.datetime, datetime.datetime]) -> bool:
+        """Override this function to check if the history range is enough for the strategy."""
+
+        return True
+
 
 # A type alias for a numpy array of shape (N, 2).
 ArrayNx2 = Annotated[NDArray[np.float64], Literal["N", 2]]
diff --git a/robusta_krr/core/integrations/kubernetes/config_patch.py b/robusta_krr/core/integrations/kubernetes/config_patch.py
index 5f294bc..81cd108 100644
--- a/robusta_krr/core/integrations/kubernetes/config_patch.py
+++ b/robusta_krr/core/integrations/kubernetes/config_patch.py
@@ -3,6 +3,8 @@
 
 from __future__ import annotations
 
+from typing import Optional
+
 from kubernetes.client import configuration
 from kubernetes.config import kube_config
 
@@ -25,7 +27,7 @@ class KubeConfigLoader(kube_config.KubeConfigLoader):
 class Configuration(configuration.Configuration):
     def __init__(
         self,
-        proxy: str | None = None,
+        proxy: Optional[str] = None,
         **kwargs,
     ):
         super().__init__(**kwargs)
diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py
index c099536..df5af96 100644
--- a/robusta_krr/core/integrations/prometheus/loader.py
+++ b/robusta_krr/core/integrations/prometheus/loader.py
@@ -65,10 +65,15 @@ class PrometheusMetricsLoader:
             loader.validate_cluster_name()
             return loader
         except MetricsNotFound as e:
-            logger.debug(f"{service_name} not found: {e}")
+            logger.info(f"{service_name} not found: {e}")
             return None
 
+    async def get_history_range(
+        self, history_duration: datetime.timedelta
+    ) -> Optional[tuple[datetime.datetime, datetime.datetime]]:
+        return await self.loader.get_history_range(history_duration)
+
     async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> list[PodData]:
         try:
             return await self.loader.load_pods(object, period)
 
diff --git a/robusta_krr/core/integrations/prometheus/metrics/base.py b/robusta_krr/core/integrations/prometheus/metrics/base.py
index eadec78..f4265e2 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/base.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/base.py
@@ -166,7 +166,7 @@ class PrometheusMetric(BaseMetric):
         duration_str = self._step_to_string(period)
 
         query = self.get_query(object, duration_str, step_str)
-        end_time = datetime.datetime.now().replace(second=0, microsecond=0).astimezone()
+        end_time = datetime.datetime.utcnow().replace(second=0, microsecond=0)
         start_time = end_time - period
 
         # Here if we split the object into multiple sub-objects, we query each sub-object recursively.
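Taken together with the runner changes below, the intended flow is: ask the Prometheus loader for the first and last timestamps that actually have data, then let the strategy's history_range_enough hook decide whether that window is long enough. The following is a minimal, runnable sketch of that flow; FakeLoader and FakeSettings are hypothetical stand-ins for PrometheusMetricsLoader and StrategySettings, not part of this patch:

    import asyncio
    from datetime import datetime, timedelta

    class FakeLoader:
        # Hypothetical stand-in for PrometheusMetricsLoader: pretends that
        # Prometheus only holds the last 3 hours of data.
        async def get_history_range(self, history_duration: timedelta) -> tuple[datetime, datetime]:
            now = datetime.now()
            return now - timedelta(hours=3), now

    class FakeSettings:
        # Hypothetical stand-in for StrategySettings with the new hook.
        history_timedelta = timedelta(days=14)

        def history_range_enough(self, history_range: tuple[datetime, datetime]) -> bool:
            start, end = history_range
            return end - start >= self.history_timedelta

    async def main() -> None:
        loader, settings = FakeLoader(), FakeSettings()
        try:
            history_range = await loader.get_history_range(settings.history_timedelta)
        except ValueError:
            print("could not determine the available history range")
            return
        if not settings.history_range_enough(history_range):
            # Same estimate the runner logs: first data point + requested history.
            retry_after = history_range[0] + settings.history_timedelta
            print(f"not enough history, estimated to be available after {retry_after}")

    asyncio.run(main())

With only three hours of pretend data against a 14-day requirement, the sketch prints the retry estimate, mirroring the NotEnoughHistoryAvailable error the runner records.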
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
index 4b67309..017c0b2 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
@@ -1,7 +1,7 @@
 import asyncio
-import datetime
 import logging
 from concurrent.futures import ThreadPoolExecutor
+from datetime import datetime, timedelta
 from typing import List, Optional
 
 from kubernetes.client import ApiClient
@@ -108,7 +108,19 @@ class PrometheusMetricsService(MetricsService):
 
-    async def query(self, query: str) -> dict:
+    async def query(self, query: str) -> list[dict]:
         loop = asyncio.get_running_loop()
-        return await loop.run_in_executor(self.executor, lambda: self.prometheus.safe_custom_query(query=query))
+        return await loop.run_in_executor(
+            self.executor,
+            lambda: self.prometheus.safe_custom_query(query=query)["result"],
+        )
+
+    async def query_range(self, query: str, start: datetime, end: datetime, step: timedelta) -> list[dict]:
+        loop = asyncio.get_running_loop()
+        return await loop.run_in_executor(
+            self.executor,
+            lambda: self.prometheus.safe_custom_query_range(
+                query=query, start_time=start, end_time=end, step=f"{step.seconds}s"
+            )["result"],
+        )
 
     def validate_cluster_name(self):
         if not settings.prometheus_cluster_label and not settings.prometheus_label:
@@ -137,12 +149,34 @@ class PrometheusMetricsService(MetricsService):
             logger.error("Labels api not present on prometheus client")
             return []
 
+    async def get_history_range(self, history_duration: timedelta) -> tuple[datetime, datetime]:
+        """
+        Get the history range from Prometheus, based on container_memory_working_set_bytes.
+        Returns:
+            tuple[datetime, datetime]: The first and the last available history points.
+        """
+
+        now = datetime.now()
+        result = await self.query_range(
+            "max(container_memory_working_set_bytes)",
+            start=now - history_duration,
+            end=now,
+            step=timedelta(hours=1),
+        )
+        try:
+            values = result[0]["values"]
+            start, end = values[0][0], values[-1][0]
+            return datetime.fromtimestamp(start), datetime.fromtimestamp(end)
+        except (KeyError, IndexError) as e:
+            logger.debug(f"Returned from get_history_range: {result}")
+            raise ValueError("Error while getting history range") from e
+
     async def gather_data(
         self,
         object: K8sObjectData,
         LoaderClass: type[PrometheusMetric],
-        period: datetime.timedelta,
-        step: datetime.timedelta = datetime.timedelta(minutes=30),
+        period: timedelta,
+        step: timedelta = timedelta(minutes=30),
     ) -> PodsTimeData:
         """
         ResourceHistoryData: The gathered resource history data.
@@ -164,12 +198,12 @@ class PrometheusMetricsService(MetricsService):
         return data
 
-    async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> list[PodData]:
+    async def load_pods(self, object: K8sObjectData, period: timedelta) -> list[PodData]:
         """
         List pods related to the object and add them to the object's pods list.
 
         Args:
             object (K8sObjectData): The Kubernetes object.
-            period (datetime.timedelta): The time period for which to gather data.
+            period (timedelta): The time period for which to gather data.
         """
 
         logger.debug(f"Adding historic pods for {object}")
diff --git a/robusta_krr/core/models/result.py b/robusta_krr/core/models/result.py
index 4d3fe38..2d5ffbc 100644
--- a/robusta_krr/core/models/result.py
+++ b/robusta_krr/core/models/result.py
@@ -63,6 +63,7 @@ class Result(pd.BaseModel):
     resources: list[str] = ["cpu", "memory"]
     description: Optional[str] = None
     strategy: StrategyData
+    errors: list[dict[str, Any]] = pd.Field(default_factory=list)
 
     def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py
index fd0c9e0..0fcb875 100644
--- a/robusta_krr/core/runner.py
+++ b/robusta_krr/core/runner.py
@@ -41,6 +41,8 @@ class Runner:
         self._metrics_service_loaders_error_logged: set[Exception] = set()
         self._strategy = settings.create_strategy()
 
+        self.errors: list[dict] = []
+
         # This executor will be running calculations for recommendations
         self._executor = ThreadPoolExecutor(settings.max_workers)
 
@@ -73,6 +75,8 @@ class Runner:
             custom_print("")
 
     def _process_result(self, result: Result) -> None:
+        result.errors = self.errors
+
         Formatter = settings.Formatter
         formatted = result.format(Formatter)
         rich = getattr(Formatter, "__rich_console__", False)
@@ -149,13 +153,12 @@ class Runner:
             object.pods = await self._k8s_loader.load_pods(object)
 
             # NOTE: Kubernetes API returned pods, but Prometheus did not
+            # This might happen with fast-executing jobs
             if object.pods != []:
                 object.add_warning("NoPrometheusPods")
                 logger.warning(
-                    f"Was not able to load any pods for {object} from Prometheus.\n\t"
-                    "This could mean that Prometheus is missing some required metrics.\n\t"
-                    "Loaded pods from Kubernetes API instead.\n\t"
-                    "See more info at https://github.com/robusta-dev/krr#requirements "
+                    f"Was not able to load any pods for {object} from Prometheus. "
+                    "Loaded pods from Kubernetes API instead."
                 )
 
         metrics = await prometheus_loader.gather_data(
@@ -173,6 +176,43 @@ class Runner:
         logger.info(f"Calculated recommendations for {object} (using {len(metrics)} metrics)")
         return self._format_result(result)
 
+    async def _check_data_availability(self, cluster: Optional[str]) -> None:
+        prometheus_loader = self._get_prometheus_loader(cluster)
+        if prometheus_loader is None:
+            return
+
+        try:
+            history_range = await prometheus_loader.get_history_range(self._strategy.settings.history_timedelta)
+        except ValueError:
+            logger.exception(f"Was not able to get history range for cluster {cluster}")
+            self.errors.append(
+                {
+                    "name": "HistoryRangeError",
+                }
+            )
+            return
+
+        logger.debug(f"History range for {cluster}: {history_range}")
+        enough_data = self._strategy.settings.history_range_enough(history_range)
+
+        if not enough_data:
+            logger.error(f"Not enough history available for cluster {cluster}.")
+            try_after = history_range[0] + self._strategy.settings.history_timedelta
+
+            logger.error(
+                "If the cluster is freshly installed, it might take some time for enough data to be available."
+            )
+            logger.error(
+                f"Enough data is estimated to be available after {try_after}, "
+                "but will try to calculate recommendations anyway."
+            )
+            self.errors.append(
+                {
+                    "name": "NotEnoughHistoryAvailable",
+                    "retry_after": try_after,
+                }
+            )
+
     async def _gather_object_allocations(self, k8s_object: K8sObjectData) -> ResourceScan:
         recommendation = await self._calculate_object_recommendations(k8s_object)
 
@@ -191,13 +231,20 @@ class Runner:
         clusters = await self._k8s_loader.list_clusters()
         if clusters and len(clusters) > 1 and settings.prometheus_url:
             # this can only happen for multi-cluster querying a single centeralized prometheus
-            # In this scenario we dont yet support determining which metrics belong to which cluster so the reccomendation can be incorrect
+            # In this scenario we don't yet support determining
+            # which metrics belong to which cluster, so the recommendation can be incorrect
             raise ClusterNotSpecifiedException(
-                f"Cannot scan multiple clusters for this prometheus, Rerun with the flag `-c <cluster>` where <cluster> is one of {clusters}"
+                f"Cannot scan multiple clusters for this prometheus. "
+                f"Rerun with the flag `-c <cluster>` where <cluster> is one of {clusters}"
            )
 
         logger.info(f'Using clusters: {clusters if clusters is not None else "inner cluster"}')
 
+        if clusters is None:
+            await self._check_data_availability(None)
+        else:
+            await asyncio.gather(*[self._check_data_availability(cluster) for cluster in clusters])
+
         with ProgressBar(title="Calculating Recommendation") as self.__progressbar:
             scans_tasks = [
                 asyncio.create_task(self._gather_object_allocations(k8s_object))
diff --git a/robusta_krr/strategies/simple.py b/robusta_krr/strategies/simple.py
index f7a11df..22c6a20 100644
--- a/robusta_krr/strategies/simple.py
+++ b/robusta_krr/strategies/simple.py
@@ -1,3 +1,5 @@
+from datetime import datetime
+
 import numpy as np
 import pydantic as pd
 
@@ -47,6 +49,10 @@ class SimpleStrategySettings(StrategySettings):
 
         return np.max(data_)
 
+    def history_range_enough(self, history_range: tuple[datetime, datetime]) -> bool:
+        start, end = history_range
+        return min(end - start, self.history_timedelta) / self.timeframe_timedelta >= self.points_required
+
 
 class SimpleStrategy(BaseStrategy[SimpleStrategySettings]):
     """
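For intuition about the simple strategy's check just above: it converts the available history window into an expected number of data points and compares that against points_required. A worked example with illustrative values (the concrete settings below are assumptions for the sake of arithmetic, not taken from this patch):

    from datetime import datetime, timedelta

    # Assumed illustrative settings:
    history_timedelta = timedelta(days=14)         # history the strategy would like
    timeframe_timedelta = timedelta(minutes=1.25)  # resolution of one data point
    points_required = 100                          # minimum points to trust a result

    # Suppose Prometheus only holds 3 hours of data:
    start = datetime(2024, 1, 1, 9, 0)
    end = datetime(2024, 1, 1, 12, 0)

    # min(...) caps the window at what the strategy asked for, so a cluster
    # holding more history than requested is not over-counted.
    available = min(end - start, history_timedelta)
    points = available / timeframe_timedelta       # 180 min / 1.25 min = 144.0
    print(points >= points_required)               # True: 144 points >= 100

Note that the check can pass even though far less than history_timedelta is available: points_required bounds the number of samples, not the wall-clock length of the history.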
