 robusta_krr/core/abstract/strategies.py                                                |  5
 robusta_krr/core/integrations/kubernetes/config_patch.py                               |  4
 robusta_krr/core/integrations/prometheus/loader.py                                     |  7
 robusta_krr/core/integrations/prometheus/metrics/base.py                               |  2
 robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py | 46
 robusta_krr/core/models/result.py                                                      |  1
 robusta_krr/core/runner.py                                                             | 59
 robusta_krr/strategies/simple.py                                                       |  6
 8 files changed, 115 insertions(+), 15 deletions(-)
diff --git a/robusta_krr/core/abstract/strategies.py b/robusta_krr/core/abstract/strategies.py
index 204341f..5b6521e 100644
--- a/robusta_krr/core/abstract/strategies.py
+++ b/robusta_krr/core/abstract/strategies.py
@@ -59,6 +59,11 @@ class StrategySettings(pd.BaseModel):
def timeframe_timedelta(self) -> datetime.timedelta:
return datetime.timedelta(minutes=self.timeframe_duration)
+ def history_range_enough(self, history_range: tuple[datetime.datetime, datetime.datetime]) -> bool:
+ """Override this function to check whether the available history range is sufficient for the strategy."""
+
+ return True
+
# A type alias for a numpy array of shape (N, 2).
ArrayNx2 = Annotated[NDArray[np.float64], Literal["N", 2]]
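
Note: the new history_range_enough hook defaults to True; a strategy can override it to refuse running on too little data. A minimal sketch (not part of this commit) of a settings subclass overriding the hook, with a hypothetical min_history_hours field:

import datetime

import pydantic as pd

from robusta_krr.core.abstract.strategies import StrategySettings


class MySettings(StrategySettings):
    # Hypothetical field, for illustration: require at least this many hours of history.
    min_history_hours: int = pd.Field(24)

    def history_range_enough(self, history_range: tuple[datetime.datetime, datetime.datetime]) -> bool:
        start, end = history_range
        # Reject the scan if Prometheus retains less history than we need.
        return end - start >= datetime.timedelta(hours=self.min_history_hours)
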
diff --git a/robusta_krr/core/integrations/kubernetes/config_patch.py b/robusta_krr/core/integrations/kubernetes/config_patch.py
index 5f294bc..81cd108 100644
--- a/robusta_krr/core/integrations/kubernetes/config_patch.py
+++ b/robusta_krr/core/integrations/kubernetes/config_patch.py
@@ -3,6 +3,8 @@
from __future__ import annotations
+from typing import Optional
+
from kubernetes.client import configuration
from kubernetes.config import kube_config
@@ -25,7 +27,7 @@ class KubeConfigLoader(kube_config.KubeConfigLoader):
class Configuration(configuration.Configuration):
def __init__(
self,
- proxy: str | None = None,
+ proxy: Optional[str] = None,
**kwargs,
):
super().__init__(**kwargs)
diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py
index c099536..df5af96 100644
--- a/robusta_krr/core/integrations/prometheus/loader.py
+++ b/robusta_krr/core/integrations/prometheus/loader.py
@@ -65,10 +65,15 @@ class PrometheusMetricsLoader:
loader.validate_cluster_name()
return loader
except MetricsNotFound as e:
- logger.debug(f"{service_name} not found: {e}")
+ logger.info(f"{service_name} not found: {e}")
return None
+ async def get_history_range(
+ self, history_duration: datetime.timedelta
+ ) -> Optional[tuple[datetime.datetime, datetime.datetime]]:
+ return await self.loader.get_history_range(history_duration)
+
async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> list[PodData]:
try:
return await self.loader.load_pods(object, period)
diff --git a/robusta_krr/core/integrations/prometheus/metrics/base.py b/robusta_krr/core/integrations/prometheus/metrics/base.py
index eadec78..f4265e2 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/base.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/base.py
@@ -166,7 +166,7 @@ class PrometheusMetric(BaseMetric):
duration_str = self._step_to_string(period)
query = self.get_query(object, duration_str, step_str)
- end_time = datetime.datetime.now().replace(second=0, microsecond=0).astimezone()
+ end_time = datetime.datetime.utcnow().replace(second=0, microsecond=0)
start_time = end_time - period
# Here if we split the object into multiple sub-objects, we query each sub-object recursively.
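
Note: the query window is now computed in naive UTC and truncated to the minute, so repeated queries within the same minute produce an identical window. A small sketch of the window computation, assuming an illustrative 7-day period:

import datetime

period = datetime.timedelta(days=7)  # illustrative value
# Truncated to the minute: two calls in the same minute share the same end_time.
end_time = datetime.datetime.utcnow().replace(second=0, microsecond=0)
start_time = end_time - period
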
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
index 4b67309..017c0b2 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
@@ -1,7 +1,7 @@
import asyncio
-import datetime
import logging
from concurrent.futures import ThreadPoolExecutor
+from datetime import datetime, timedelta
from typing import List, Optional
from kubernetes.client import ApiClient
@@ -108,7 +108,19 @@ class PrometheusMetricsService(MetricsService):
async def query(self, query: str) -> dict:
loop = asyncio.get_running_loop()
- return await loop.run_in_executor(self.executor, lambda: self.prometheus.safe_custom_query(query=query))
+ return await loop.run_in_executor(
+ self.executor,
+ lambda: self.prometheus.safe_custom_query(query=query)["result"],
+ )
+
+ async def query_range(self, query: str, start: datetime, end: datetime, step: timedelta) -> list[dict]:
+ loop = asyncio.get_running_loop()
+ return await loop.run_in_executor(
+ self.executor,
+ lambda: self.prometheus.safe_custom_query_range(
+ query=query, start_time=start, end_time=end, step=f"{step.seconds}s"
+ )["result"],
+ )
def validate_cluster_name(self):
if not settings.prometheus_cluster_label and not settings.prometheus_label:
@@ -137,12 +149,34 @@ class PrometheusMetricsService(MetricsService):
logger.error("Labels api not present on prometheus client")
return []
+ async def get_history_range(self, history_duration: timedelta) -> tuple[datetime, datetime]:
+ """
+ Get the history range from Prometheus, based on container_memory_working_set_bytes.
+ Returns:
+ float: The first history point.
+ """
+
+ now = datetime.now()
+ result = await self.query_range(
+ "max(container_memory_working_set_bytes)",
+ start=now - history_duration,
+ end=now,
+ step=timedelta(hours=1),
+ )
+ try:
+ values = result[0]["values"]
+ start, end = values[0][0], values[-1][0]
+ return datetime.fromtimestamp(start), datetime.fromtimestamp(end)
+ except (KeyError, IndexError) as e:
+ logger.debug(f"Returned from get_history_range: {result}")
+ raise ValueError("Error while getting history range") from e
+
async def gather_data(
self,
object: K8sObjectData,
LoaderClass: type[PrometheusMetric],
- period: datetime.timedelta,
- step: datetime.timedelta = datetime.timedelta(minutes=30),
+ period: timedelta,
+ step: timedelta = timedelta(minutes=30),
) -> PodsTimeData:
"""
ResourceHistoryData: The gathered resource history data.
@@ -164,12 +198,12 @@ class PrometheusMetricsService(MetricsService):
return data
- async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> list[PodData]:
+ async def load_pods(self, object: K8sObjectData, period: timedelta) -> list[PodData]:
"""
List pods related to the object and add them to the object's pods list.
Args:
object (K8sObjectData): The Kubernetes object.
- period (datetime.timedelta): The time period for which to gather data.
+ period (timedelta): The time period for which to gather data.
"""
logger.debug(f"Adding historic pods for {object}")
diff --git a/robusta_krr/core/models/result.py b/robusta_krr/core/models/result.py
index 4d3fe38..2d5ffbc 100644
--- a/robusta_krr/core/models/result.py
+++ b/robusta_krr/core/models/result.py
@@ -63,6 +63,7 @@ class Result(pd.BaseModel):
resources: list[str] = ["cpu", "memory"]
description: Optional[str] = None
strategy: StrategyData
+ errors: list[dict[str, Any]] = pd.Field(default_factory=list)
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
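
Note: the new errors field carries free-form dicts; with this commit the runner emits the two shapes shown below (timestamp illustrative):

from datetime import datetime

errors = [
    {"name": "HistoryRangeError"},
    {"name": "NotEnoughHistoryAvailable", "retry_after": datetime(2024, 1, 24)},
]
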
diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py
index fd0c9e0..0fcb875 100644
--- a/robusta_krr/core/runner.py
+++ b/robusta_krr/core/runner.py
@@ -41,6 +41,8 @@ class Runner:
self._metrics_service_loaders_error_logged: set[Exception] = set()
self._strategy = settings.create_strategy()
+ self.errors: list[dict] = []
+
# This executor will be running calculations for recommendations
self._executor = ThreadPoolExecutor(settings.max_workers)
@@ -73,6 +75,8 @@ class Runner:
custom_print("")
def _process_result(self, result: Result) -> None:
+ result.errors = self.errors
+
Formatter = settings.Formatter
formatted = result.format(Formatter)
rich = getattr(Formatter, "__rich_console__", False)
@@ -149,13 +153,12 @@ class Runner:
object.pods = await self._k8s_loader.load_pods(object)
# NOTE: Kubernetes API returned pods, but Prometheus did not
+ # This might happen with fast-executing jobs
if object.pods != []:
object.add_warning("NoPrometheusPods")
logger.warning(
- f"Was not able to load any pods for {object} from Prometheus.\n\t"
- "This could mean that Prometheus is missing some required metrics.\n\t"
- "Loaded pods from Kubernetes API instead.\n\t"
- "See more info at https://github.com/robusta-dev/krr#requirements "
+ f"Was not able to load any pods for {object} from Prometheus. "
+ "Loaded pods from Kubernetes API instead."
)
metrics = await prometheus_loader.gather_data(
@@ -173,6 +176,43 @@ class Runner:
logger.info(f"Calculated recommendations for {object} (using {len(metrics)} metrics)")
return self._format_result(result)
+ async def _check_data_availability(self, cluster: Optional[str]) -> None:
+ prometheus_loader = self._get_prometheus_loader(cluster)
+ if prometheus_loader is None:
+ return
+
+ try:
+ history_range = await prometheus_loader.get_history_range(self._strategy.settings.history_timedelta)
+ except ValueError:
+ logger.exception(f"Was not able to get history range for cluster {cluster}")
+ self.errors.append(
+ {
+ "name": "HistoryRangeError",
+ }
+ )
+ return
+
+ logger.debug(f"History range for {cluster}: {history_range}")
+ enough_data = self._strategy.settings.history_range_enough(history_range)
+
+ if not enough_data:
+ logger.error(f"Not enough history available for cluster {cluster}.")
+ try_after = history_range[0] + self._strategy.settings.history_timedelta
+
+ logger.error(
+ "If the cluster is freshly installed, it might take some time for the enough data to be available."
+ )
+ logger.error(
+ f"Enough data is estimated to be available after {try_after}, "
+ "but will try to calculate recommendations anyway."
+ )
+ self.errors.append(
+ {
+ "name": "NotEnoughHistoryAvailable",
+ "retry_after": try_after,
+ }
+ )
+
async def _gather_object_allocations(self, k8s_object: K8sObjectData) -> ResourceScan:
recommendation = await self._calculate_object_recommendations(k8s_object)
@@ -191,13 +231,20 @@ class Runner:
clusters = await self._k8s_loader.list_clusters()
if clusters and len(clusters) > 1 and settings.prometheus_url:
# this can only happen for multi-cluster querying a single centralized prometheus
- # In this scenario we dont yet support determining which metrics belong to which cluster so the reccomendation can be incorrect
+ # In this scenario we don't yet support determining
+ # which metrics belong to which cluster, so the recommendation can be incorrect
raise ClusterNotSpecifiedException(
- f"Cannot scan multiple clusters for this prometheus, Rerun with the flag `-c <cluster>` where <cluster> is one of {clusters}"
+ f"Cannot scan multiple clusters for this prometheus, "
+ f"Rerun with the flag `-c <cluster>` where <cluster> is one of {clusters}"
)
logger.info(f'Using clusters: {clusters if clusters is not None else "inner cluster"}')
+ if clusters is None:
+ await self._check_data_availability(None)
+ else:
+ await asyncio.gather(*[self._check_data_availability(cluster) for cluster in clusters])
+
with ProgressBar(title="Calculating Recommendation") as self.__progressbar:
scans_tasks = [
asyncio.create_task(self._gather_object_allocations(k8s_object))
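
Note: the retry estimate in _check_data_availability is simple arithmetic: the timestamp of the earliest retained sample plus the history the strategy wants. A worked example with illustrative values:

from datetime import datetime, timedelta

history_start = datetime(2024, 1, 10)   # earliest sample Prometheus still retains
required = timedelta(days=14)           # self._strategy.settings.history_timedelta
try_after = history_start + required    # datetime(2024, 1, 24): retry after this point
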
diff --git a/robusta_krr/strategies/simple.py b/robusta_krr/strategies/simple.py
index f7a11df..22c6a20 100644
--- a/robusta_krr/strategies/simple.py
+++ b/robusta_krr/strategies/simple.py
@@ -1,3 +1,5 @@
+from datetime import datetime, timedelta
+
import numpy as np
import pydantic as pd
@@ -47,6 +49,10 @@ class SimpleStrategySettings(StrategySettings):
return np.max(data_)
+ def history_range_enough(self, history_range: tuple[datetime, datetime]) -> bool:
+ start, end = history_range
+ return min(end - start, self.history_timedelta) / self.timeframe_timedelta >= self.points_required
+
class SimpleStrategy(BaseStrategy[SimpleStrategySettings]):
"""