author     LeaveMyYard <33721692+LeaveMyYard@users.noreply.github.com>    2023-07-24 20:47:51 +0300
committer  LeaveMyYard <33721692+LeaveMyYard@users.noreply.github.com>    2023-07-24 20:47:51 +0300
commit     428d628c7f09c16733e6f65c41a3f03ceeac4db8 (patch)
tree       ff6a131df5811da8dab6f6bf34819fd2e7c75959 /robusta_krr
parent     2d53179a8f81df7e6ff7bee1ab0b19b12e82934a (diff)
Refactor strategy dependency on metrics
Diffstat (limited to 'robusta_krr')
-rw-r--r--  robusta_krr/api/models.py  11
-rw-r--r--  robusta_krr/core/abstract/metrics.py  21
-rw-r--r--  robusta_krr/core/abstract/strategies.py  48
-rw-r--r--  robusta_krr/core/integrations/prometheus/__init__.py  2
-rw-r--r--  robusta_krr/core/integrations/prometheus/loader.py  30
-rw-r--r--  robusta_krr/core/integrations/prometheus/metrics/__init__.py  6
-rw-r--r--  robusta_krr/core/integrations/prometheus/metrics/base.py  230
-rw-r--r--  robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py  62
-rw-r--r--  robusta_krr/core/integrations/prometheus/metrics/base_metric.py  251
-rw-r--r--  robusta_krr/core/integrations/prometheus/metrics/cpu.py  33
-rw-r--r--  robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py  25
-rw-r--r--  robusta_krr/core/integrations/prometheus/metrics/memory.py (renamed from robusta_krr/core/integrations/prometheus/metrics/memory_metric.py)  24
-rw-r--r--  robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py  10
-rw-r--r--  robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py  31
-rw-r--r--  robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py  18
-rw-r--r--  robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py  18
-rw-r--r--  robusta_krr/core/integrations/prometheus/prometheus_client.py  3
-rw-r--r--  robusta_krr/core/models/result.py  22
-rw-r--r--  robusta_krr/core/runner.py  54
-rw-r--r--  robusta_krr/strategies/simple.py  37
-rw-r--r--  robusta_krr/utils/display_name.py  31
-rw-r--r--  robusta_krr/utils/progress_bar.py  2
22 files changed, 416 insertions, 553 deletions
diff --git a/robusta_krr/api/models.py b/robusta_krr/api/models.py
index 537168a..7c4f587 100644
--- a/robusta_krr/api/models.py
+++ b/robusta_krr/api/models.py
@@ -1,4 +1,9 @@
-from robusta_krr.core.abstract.strategies import HistoryData, ResourceHistoryData, ResourceRecommendation, RunResult
+from robusta_krr.core.abstract.strategies import (
+ MetricPodData,
+ MetricsPodData,
+ ResourceRecommendation,
+ RunResult,
+)
from robusta_krr.core.models.allocations import RecommendationValue, ResourceAllocations, ResourceType
from robusta_krr.core.models.objects import K8sObjectData, PodData
from robusta_krr.core.models.result import ResourceScan, Result
@@ -15,7 +20,7 @@ __all__ = [
"register_severity_calculator",
"ResourceScan",
"ResourceRecommendation",
- "HistoryData",
- "ResourceHistoryData",
+ "MetricPodData",
+ "MetricsPodData",
"RunResult",
]
diff --git a/robusta_krr/core/abstract/metrics.py b/robusta_krr/core/abstract/metrics.py
new file mode 100644
index 0000000..28de381
--- /dev/null
+++ b/robusta_krr/core/abstract/metrics.py
@@ -0,0 +1,21 @@
+from abc import ABC, abstractmethod
+import datetime
+
+from robusta_krr.core.abstract.strategies import MetricPodData
+from robusta_krr.core.models.objects import K8sObjectData
+
+
+class BaseMetric(ABC):
+ """
+ This abstraction exists for future use.
+ Currently we only scrape metrics from Prometheus,
+ but in the future we may want to support other metric sources like Datadog, etc.
+
+ TODO: When we add support for other metric sources, we may need to rethink this interface.
+ """
+
+ @abstractmethod
+ async def load_data(
+ self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta
+ ) -> MetricPodData:
+ ...
diff --git a/robusta_krr/core/abstract/strategies.py b/robusta_krr/core/abstract/strategies.py
index 367df50..cc8432b 100644
--- a/robusta_krr/core/abstract/strategies.py
+++ b/robusta_krr/core/abstract/strategies.py
@@ -3,14 +3,17 @@ from __future__ import annotations
import abc
import datetime
from textwrap import dedent
-from typing import Annotated, Generic, Literal, Optional, TypeVar, get_args
+from typing import Annotated, Generic, Literal, Optional, TypeVar, get_args, TYPE_CHECKING
import numpy as np
import pydantic as pd
from numpy.typing import NDArray
-from robusta_krr.core.models.result import K8sObjectData, Metric, ResourceType
-from robusta_krr.utils.display_name import display_name_property
+from robusta_krr.core.models.result import K8sObjectData, ResourceType
+
+if TYPE_CHECKING:
+ from robusta_krr.core.integrations.prometheus.metrics import PrometheusMetric
+ from robusta_krr.core.abstract.metrics import BaseMetric # noqa: F401
SelfRR = TypeVar("SelfRR", bound="ResourceRecommendation")
@@ -61,21 +64,9 @@ class StrategySettings(pd.BaseModel):
ArrayNx2 = Annotated[NDArray[np.float64], Literal["N", 2]]
-class ResourceHistoryData(pd.BaseModel):
- """A class to represent resource history data.
-
- metric is the metric information used to gather the history data.
- data is a mapping from pod to a numpy array of time and value.
- """
-
- metric: Metric
- data: dict[str, ArrayNx2] # Mapping: pod -> [(time, value)]
-
- class Config:
- arbitrary_types_allowed = True
+MetricPodData = dict[str, ArrayNx2] # Mapping: pod -> [(time, value)]
+MetricsPodData = dict[type["BaseMetric"], MetricPodData]
-
-HistoryData = dict[ResourceType, ResourceHistoryData]
RunResult = dict[ResourceType, ResourceRecommendation]
SelfBS = TypeVar("SelfBS", bound="BaseStrategy")
@@ -85,7 +76,6 @@ _StrategySettings = TypeVar("_StrategySettings", bound=StrategySettings)
# An abstract base class for strategy implementation.
# This class requires implementation of a 'run' method for calculating recommendation.
# Make a subclass if you want to create a concrete strategy.
-@display_name_property(suffix="Strategy")
class BaseStrategy(abc.ABC, Generic[_StrategySettings]):
"""An abstract base class for strategy implementation.
@@ -98,20 +88,25 @@ class BaseStrategy(abc.ABC, Generic[_StrategySettings]):
Description property uses the docstring of the strategy class and the settings of the strategy.
The name of the strategy is the name of the class in lowercase, without the 'Strategy' suffix, if exists.
- If you want to change the name of the strategy, you can change the __display_name__ attribute.
+ If you want to change the name of the strategy, you can change the display_name class attribute.
The strategy will automatically be registered in the strategy registry using __subclasses__ mechanism.
"""
- __display_name__: str
-
- settings: _StrategySettings
+ display_name: str
+ rich_console: bool = False
+ # TODO: this should be BaseMetricLoader, but currently we only support Prometheus
+ metrics: list[type[PrometheusMetric]] = []
def __init__(self, settings: _StrategySettings):
self.settings = settings
def __str__(self) -> str:
- return self.__display_name__.title()
+ return self._display_name.title()
+
+ @property
+ def _display_name(self) -> str:
+ return getattr(self, "display_name", self.__class__.__name__.lower().removesuffix("strategy"))
@property
def description(self) -> Optional[str]:
@@ -129,7 +124,7 @@ class BaseStrategy(abc.ABC, Generic[_StrategySettings]):
# Abstract method that needs to be implemented by subclass.
# This method is intended to calculate resource recommendation based on history data and kubernetes object data.
@abc.abstractmethod
- def run(self, history_data: HistoryData, object_data: K8sObjectData) -> RunResult:
+ def run(self, history_data: MetricsPodData, object_data: K8sObjectData) -> RunResult:
pass
# This method is intended to return a strategy by its name.
@@ -146,7 +141,7 @@ class BaseStrategy(abc.ABC, Generic[_StrategySettings]):
def get_all(cls: type[SelfBS]) -> dict[str, type[SelfBS]]:
from robusta_krr import strategies as _ # noqa: F401
- return {sub_cls.__display_name__.lower(): sub_cls for sub_cls in cls.__subclasses__()}
+ return {sub_cls.display_name.lower(): sub_cls for sub_cls in cls.__subclasses__()}
# This method is intended to return the type of settings used in strategy.
@classmethod
@@ -161,7 +156,8 @@ __all__ = [
"AnyStrategy",
"BaseStrategy",
"StrategySettings",
- "HistoryData",
+ "MetricPodData",
+ "MetricsPodData",
"K8sObjectData",
"ResourceType",
]
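The hunks above replace the ResourceType-keyed HistoryData with MetricsPodData, a dict keyed by the metric loader class, and move metric selection onto the strategy itself via the metrics class attribute. A minimal sketch of a strategy written against this refactored interface (not part of the commit; the PercentileStrategy name, its settings and the percentile math are invented for illustration, while the base classes and imports come from the modules touched by this diff):

# Hypothetical example of a strategy under the refactored interface; names and numbers are invented.
import numpy as np
import pydantic as pd

from robusta_krr.core.abstract.strategies import (
    BaseStrategy,
    K8sObjectData,
    MetricsPodData,
    ResourceRecommendation,
    ResourceType,
    RunResult,
    StrategySettings,
)
from robusta_krr.core.integrations.prometheus.metrics import CPULoader, MaxMemoryLoader


class PercentileSettings(StrategySettings):
    cpu_percentile: float = pd.Field(95, gt=0, le=100, description="CPU usage percentile.")


class PercentileStrategy(BaseStrategy[PercentileSettings]):
    """Recommend the N-th percentile of observed CPU usage and the peak memory usage."""

    display_name = "percentile"              # used as the registry key by BaseStrategy.get_all()
    metrics = [CPULoader, MaxMemoryLoader]   # declares which metrics the loader should gather

    def run(self, history_data: MetricsPodData, object_data: K8sObjectData) -> RunResult:
        # history_data is keyed by the metric class itself, no longer by ResourceType
        cpu = history_data[CPULoader]         # dict: pod -> np.ndarray of (time, value) rows
        memory = history_data[MaxMemoryLoader]

        cpu_value = (
            np.percentile(np.concatenate([v[:, 1] for v in cpu.values()]), self.settings.cpu_percentile)
            if cpu else float("NaN")
        )
        memory_value = max((np.max(v[:, 1]) for v in memory.values()), default=float("NaN"))

        return {
            ResourceType.CPU: ResourceRecommendation(request=cpu_value, limit=None),
            ResourceType.Memory: ResourceRecommendation(request=memory_value, limit=memory_value),
        }

Because BaseStrategy.get_all() walks __subclasses__(), importing the module that defines such a class is enough to make it selectable by its display_name.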
diff --git a/robusta_krr/core/integrations/prometheus/__init__.py b/robusta_krr/core/integrations/prometheus/__init__.py
index 8f9dbb7..e7e545b 100644
--- a/robusta_krr/core/integrations/prometheus/__init__.py
+++ b/robusta_krr/core/integrations/prometheus/__init__.py
@@ -1,3 +1,3 @@
-from .loader import MetricsLoader
+from .loader import PrometheusMetricsLoader
from .metrics_service.prometheus_metrics_service import PrometheusDiscovery, PrometheusNotFound
from .prometheus_client import CustomPrometheusConnect, ClusterNotSpecifiedException
diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py
index 590bc83..0daf3a6 100644
--- a/robusta_krr/core/integrations/prometheus/loader.py
+++ b/robusta_krr/core/integrations/prometheus/loader.py
@@ -1,21 +1,24 @@
+from __future__ import annotations
+
import datetime
from concurrent.futures import ThreadPoolExecutor
-from typing import Optional
+from typing import Optional, TYPE_CHECKING
from kubernetes import config as k8s_config
from kubernetes.client.api_client import ApiClient
-from robusta_krr.core.abstract.strategies import ResourceHistoryData
-from robusta_krr.core.models.config import Config
from robusta_krr.core.models.objects import K8sObjectData
-from robusta_krr.core.models.result import ResourceType
from robusta_krr.utils.configurable import Configurable
-from .metrics_service.base_metric_service import MetricsNotFound, MetricsService
+from .metrics_service.base_metric_service import MetricsNotFound
from .metrics_service.prometheus_metrics_service import PrometheusMetricsService, PrometheusNotFound
from .metrics_service.thanos_metrics_service import ThanosMetricsService
from .metrics_service.victoria_metrics_service import VictoriaMetricsService
+if TYPE_CHECKING:
+ from robusta_krr.core.abstract.strategies import MetricsPodData, BaseStrategy
+ from robusta_krr.core.models.config import Config
+
METRICS_SERVICES = {
"Prometheus": PrometheusMetricsService,
"Victoria Metrics": VictoriaMetricsService,
@@ -23,7 +26,7 @@ METRICS_SERVICES = {
}
-class MetricsLoader(Configurable):
+class PrometheusMetricsLoader(Configurable):
def __init__(
self,
config: Config,
@@ -53,14 +56,14 @@ class MetricsLoader(Configurable):
self.loader = loader
- self.info(f"{self.loader.name()} connected successfully for {cluster or 'default'} cluster")
+ self.info(f"{self.loader.name} connected successfully for {cluster or 'default'} cluster")
def get_metrics_service(
self,
config: Config,
api_client: Optional[ApiClient] = None,
cluster: Optional[str] = None,
- ) -> Optional[MetricsService]:
+ ) -> Optional[PrometheusMetricsService]:
for service_name, metric_service_class in METRICS_SERVICES.items():
try:
loader = metric_service_class(config, api_client=api_client, cluster=cluster, executor=self.executor)
@@ -76,11 +79,11 @@ class MetricsLoader(Configurable):
async def gather_data(
self,
object: K8sObjectData,
- resource: ResourceType,
+ strategy: BaseStrategy,
period: datetime.timedelta,
*,
step: datetime.timedelta = datetime.timedelta(minutes=30),
- ) -> ResourceHistoryData:
+ ) -> MetricsPodData:
"""
Gathers data from Prometheus for a specified object and resource.
@@ -94,4 +97,9 @@ class MetricsLoader(Configurable):
ResourceHistoryData: The gathered resource history data.
"""
- return await self.loader.gather_data(object, resource, period, step)
+ await self.loader.add_historic_pods(object, period)
+
+ return {
+ MetricLoader: await self.loader.gather_data(object, MetricLoader, period, step)
+ for MetricLoader in strategy.metrics
+ }
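For orientation, a hedged sketch (not in the commit) of how the renamed PrometheusMetricsLoader is now driven by the strategy rather than by a ResourceType; the Config instance and the K8sObjectData value are placeholders, since in krr itself the Runner wires these together:

# Illustrative only: calling the refactored loader directly. `config` and `workload`
# are assumed to be prepared elsewhere (the Runner normally does this).
from robusta_krr.core.integrations.prometheus import PrometheusMetricsLoader
from robusta_krr.core.models.config import Config
from robusta_krr.core.models.objects import K8sObjectData


async def gather_for(config: Config, workload: K8sObjectData) -> None:
    loader = PrometheusMetricsLoader(config, cluster=None)
    strategy = config.create_strategy()  # e.g. the "simple" strategy, as in Runner.__init__

    metrics = await loader.gather_data(
        workload,
        strategy,  # which metrics get loaded is now decided by strategy.metrics
        strategy.settings.history_timedelta,
        step=strategy.settings.timeframe_timedelta,
    )

    # The result is keyed by the metric loader class itself, e.g.
    # {MaxCPULoader: {"pod-a": array([[ts, value], ...]), ...}, MaxMemoryLoader: {...}}
    for metric_cls, per_pod in metrics.items():
        print(metric_cls.__name__, sorted(per_pod))
    # asyncio.run(gather_for(config, workload)) would drive this in a script.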
diff --git a/robusta_krr/core/integrations/prometheus/metrics/__init__.py b/robusta_krr/core/integrations/prometheus/metrics/__init__.py
index 0852b67..5b9a45b 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/__init__.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/__init__.py
@@ -1,3 +1,3 @@
-from .base_metric import BaseMetricLoader, bind_metric
-from .cpu_metric import CPUMetricLoader
-from .memory_metric import MemoryMetricLoader
+from .cpu import CPULoader, MaxCPULoader
+from .memory import MemoryLoader, MaxMemoryLoader
+from .base import PrometheusMetric
diff --git a/robusta_krr/core/integrations/prometheus/metrics/base.py b/robusta_krr/core/integrations/prometheus/metrics/base.py
new file mode 100644
index 0000000..434ef0a
--- /dev/null
+++ b/robusta_krr/core/integrations/prometheus/metrics/base.py
@@ -0,0 +1,230 @@
+from __future__ import annotations
+
+import abc
+import asyncio
+import datetime
+import enum
+from concurrent.futures import ThreadPoolExecutor
+from typing import Any, TYPE_CHECKING, Optional
+
+import numpy as np
+import pydantic as pd
+
+from robusta_krr.core.abstract.metrics import BaseMetric
+from robusta_krr.core.abstract.strategies import MetricPodData
+from robusta_krr.core.models.config import Config
+from robusta_krr.core.models.objects import K8sObjectData
+from robusta_krr.utils.configurable import Configurable
+
+if TYPE_CHECKING:
+ from .. import CustomPrometheusConnect
+
+
+class QueryType(str, enum.Enum):
+ Query = "query"
+ QueryRange = "query_range"
+
+
+class PrometheusMetricData(pd.BaseModel):
+ query: str
+ start_time: datetime.datetime
+ end_time: datetime.datetime
+ step: str
+ type: QueryType
+
+
+class PrometheusMetric(BaseMetric, Configurable):
+ """
+ Base class for all metric loaders.
+
+ Metric loaders are used to load metrics from a specified source (like Prometheus in this case).
+ """
+
+ query_type: QueryType = QueryType.QueryRange
+
+ def __init__(
+ self,
+ config: Config,
+ prometheus: CustomPrometheusConnect,
+ service_name: str,
+ executor: Optional[ThreadPoolExecutor] = None,
+ ) -> None:
+ super().__init__(config)
+ self.prometheus = prometheus
+ self.service_name = service_name
+
+ self.executor = executor
+
+ def get_prometheus_cluster_label(self) -> str:
+ """
+ Generates the cluster label for querying a centralized Prometheus
+
+ Returns:
+ str: a promql safe label string for querying the cluster.
+ """
+ if self.config.prometheus_cluster_label is None:
+ return ""
+ return f', {self.config.prometheus_label}="{self.config.prometheus_cluster_label}"'
+
+ @abc.abstractmethod
+ def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
+ """
+ This method should be implemented by all subclasses to provide a query string to fetch metrics.
+
+ Args:
+ object (K8sObjectData): The object for which metrics need to be fetched.
+ resolution (Optional[str]): an optional resolution string used to parameterize the query.
+
+ Returns:
+ str: The query string.
+ """
+
+ pass
+
+ def _step_to_string(self, step: datetime.timedelta) -> str:
+ """
+ Converts step in datetime.timedelta format to a string format used by Prometheus.
+
+ Args:
+ step (datetime.timedelta): Step size in datetime.timedelta format.
+
+ Returns:
+ str: Step size in string format used by Prometheus.
+ """
+ if step.total_seconds() > 60 * 60 * 24:
+ return f"{int(step.total_seconds()) // (60 * 60 * 24)}d"
+ return f"{int(step.total_seconds()) // 60}m"
+
+ def _query_prometheus_sync(self, data: PrometheusMetricData) -> list[dict]:
+ if data.type == QueryType.QueryRange:
+ value = self.prometheus.custom_query_range(
+ query=data.query,
+ start_time=data.start_time,
+ end_time=data.end_time,
+ step=data.step,
+ )
+ return value
+ else:
+ # regular query, lighter on performance
+ results = self.prometheus.custom_query(query=data.query)
+ # format the results to return the same format as custom_query_range
+ for result in results:
+ result["values"] = [result.pop("value")]
+ return results
+
+ async def query_prometheus(self, data: PrometheusMetricData) -> list[dict]:
+ """
+ Asynchronous method that queries Prometheus to fetch metrics.
+
+ Args:
+ data (PrometheusMetricData): What query to run, over which time range, and with which query type.
+
+ Returns:
+ list[dict]: A list of dictionary where each dictionary represents metrics for a pod.
+ """
+
+ loop = asyncio.get_running_loop()
+ return await loop.run_in_executor(self.executor, lambda: self._query_prometheus_sync(data))
+
+ async def load_data(
+ self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta
+ ) -> MetricPodData:
+ """
+ Asynchronous method that loads metric data for a specific object.
+
+ Args:
+ object (K8sObjectData): The object for which metrics need to be loaded.
+ period (datetime.timedelta): The time period for which metrics need to be loaded.
+ step (datetime.timedelta): The time interval between successive metric values.
+
+ Returns:
+ MetricPodData: A mapping from pod name to a numpy array of (time, value) pairs.
+ """
+ resolution = f"{self._step_to_string(period)}:{self._step_to_string(step)}"
+ query = self.get_query(object, resolution)
+ end_time = datetime.datetime.now().astimezone()
+ start_time = end_time - period
+
+ result = await self.query_prometheus(
+ PrometheusMetricData(
+ query=query,
+ start_time=start_time,
+ end_time=end_time,
+ step=self._step_to_string(step),
+ type=self.query_type,
+ )
+ )
+
+ if result == []:
+ self.warning(f"{self.service_name} returned no {self.__class__.__name__} metrics for {object}")
+ return {}
+
+ return {pod_result["metric"]["pod"]: np.array(pod_result["values"], dtype=np.float64) for pod_result in result}
+
+
+class QueryRangeMetric(PrometheusMetric):
+ """This type of PrometheusMetric is used to query metrics for a specific time range."""
+
+ query_type = QueryType.QueryRange
+
+
+class QueryMetric(PrometheusMetric):
+ """This type of PrometheusMetric is used to query metrics for a specific time."""
+
+ query_type = QueryType.Query
+
+
+PrometheusSeries = Any
+
+
+class FilterMetric(PrometheusMetric):
+ """
+ A version of PrometheusMetric that deduplicates the results
+ if multiple series with the same target name are found.
+
+ Prefers the series from the kubelet job; if not found, returns the first one by alphabetical job order.
+ """
+
+ @staticmethod
+ def get_target_name(series: PrometheusSeries) -> Optional[str]:
+ for label in ["pod", "container", "node"]:
+ if label in series["metric"]:
+ return series["metric"][label]
+ return None
+
+ @staticmethod
+ def filter_prom_jobs_results(
+ series_list_result: list[PrometheusSeries],
+ ) -> list[PrometheusSeries]:
+ """
+ Because there might be multiple series with the same target name, we need to deduplicate them.
+
+ :param series_list_result: list of PrometheusSeries
+ """
+
+ if len(series_list_result) == 1:
+ return series_list_result
+
+ target_names = {
+ FilterMetric.get_target_name(series)
+ for series in series_list_result
+ if FilterMetric.get_target_name(series)
+ }
+ return_list: list[PrometheusSeries] = []
+
+ # take the kubelet job if it exists, otherwise return the first job alphabetically
+ for target_name in target_names:
+ relevant_series = [
+ series for series in series_list_result if FilterMetric.get_target_name(series) == target_name
+ ]
+ relevant_kubelet_metric = [series for series in relevant_series if series["metric"].get("job") == "kubelet"]
+ if len(relevant_kubelet_metric) == 1:
+ return_list.append(relevant_kubelet_metric[0])
+ continue
+ sorted_relevant_series = sorted(relevant_series, key=lambda s: s["metric"].get("job"), reverse=False)
+ return_list.append(sorted_relevant_series[0])
+ return return_list
+
+ async def query_prometheus(self, data: PrometheusMetricData) -> list[PrometheusSeries]:
+ result = await super().query_prometheus(data)
+ return self.filter_prom_jobs_results(result)
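To show what the new base classes in this file provide, here is a hypothetical metric loader (not part of the commit); the PromQL metric name and class name are invented, while the structure mirrors CPULoader and MemoryLoader from this change:

# Hypothetical example built on QueryRangeMetric + FilterMetric from this file.
# "container_fs_usage_bytes" is used purely as an illustrative metric name.
from typing import Optional

from robusta_krr.core.integrations.prometheus.metrics.base import FilterMetric, QueryRangeMetric
from robusta_krr.core.models.objects import K8sObjectData


class FilesystemUsageLoader(QueryRangeMetric, FilterMetric):
    def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
        # `resolution` arrives as "<period>:<step>", e.g. "7d:30m", built by load_data()
        # via _step_to_string(); a range query like this one can simply ignore it.
        pods_selector = "|".join(pod.name for pod in object.pods)
        cluster_label = self.get_prometheus_cluster_label()
        return (
            "sum(container_fs_usage_bytes{"
            f'namespace="{object.namespace}", '
            f'pod=~"{pods_selector}", '
            f'container="{object.container}"'
            f"{cluster_label}"
            "}) by (container, pod, job)"
        )

Adding such a class to a strategy's metrics list is all that is needed for it to be fetched, since PrometheusMetricsLoader.gather_data() iterates strategy.metrics.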
diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py
deleted file mode 100644
index 5927aa4..0000000
--- a/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py
+++ /dev/null
@@ -1,62 +0,0 @@
-from typing import Any, Optional
-
-from robusta_krr.core.abstract.strategies import Metric
-
-from .base_metric import BaseMetricLoader, QueryType
-
-PrometheusSeries = Any
-
-
-class BaseFilteredMetricLoader(BaseMetricLoader):
- """
- This is the version of the BasicMetricLoader, that filters out data,
- if multiple metrics with the same name were found.
-
- Searches for the kubelet metric. If not found - returns first one in alphabetical order.
- """
-
- @staticmethod
- def get_target_name(series: PrometheusSeries) -> Optional[str]:
- for label in ["pod", "container", "node"]:
- if label in series["metric"]:
- return series["metric"][label]
- return None
-
- @staticmethod
- def filter_prom_jobs_results(
- series_list_result: list[PrometheusSeries],
- ) -> list[PrometheusSeries]:
- """
- Because there might be multiple metrics with the same name, we need to filter them out.
-
- :param series_list_result: list of PrometheusSeries
- """
-
- if len(series_list_result) == 1:
- return series_list_result
-
- target_names = {
- BaseFilteredMetricLoader.get_target_name(series)
- for series in series_list_result
- if BaseFilteredMetricLoader.get_target_name(series)
- }
- return_list: list[PrometheusSeries] = []
-
- # takes kubelet job if exists, return first job alphabetically if it doesn't
- for target_name in target_names:
- relevant_series = [
- series
- for series in series_list_result
- if BaseFilteredMetricLoader.get_target_name(series) == target_name
- ]
- relevant_kubelet_metric = [series for series in relevant_series if series["metric"].get("job") == "kubelet"]
- if len(relevant_kubelet_metric) == 1:
- return_list.append(relevant_kubelet_metric[0])
- continue
- sorted_relevant_series = sorted(relevant_series, key=lambda s: s["metric"].get("job"), reverse=False)
- return_list.append(sorted_relevant_series[0])
- return return_list
-
- async def query_prometheus(self, metric: Metric, query_type: QueryType) -> list[PrometheusSeries]:
- result = await super().query_prometheus(metric, query_type)
- return self.filter_prom_jobs_results(result)
diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py
deleted file mode 100644
index 46150cb..0000000
--- a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py
+++ /dev/null
@@ -1,251 +0,0 @@
-from __future__ import annotations
-
-import abc
-import asyncio
-import datetime
-import enum
-from concurrent.futures import ThreadPoolExecutor
-from typing import TYPE_CHECKING, Callable, Optional, TypeVar
-
-import numpy as np
-
-from robusta_krr.core.abstract.strategies import Metric, ResourceHistoryData
-from robusta_krr.core.models.config import Config
-from robusta_krr.core.models.objects import K8sObjectData
-from robusta_krr.utils.configurable import Configurable
-
-if TYPE_CHECKING:
- from .. import CustomPrometheusConnect
-
-
-MetricsDictionary = dict[str, type["BaseMetricLoader"]]
-
-
-class QueryType(str, enum.Enum):
- Query = "query"
- QueryRange = "query_range"
-
-
-# A registry of metrics that can be used to fetch a corresponding metric loader.
-REGISTERED_METRICS: MetricsDictionary = {}
-STRATEGY_METRICS_OVERRIDES: dict[str, MetricsDictionary] = {}
-
-
-class BaseMetricLoader(Configurable, abc.ABC):
- """
- Base class for all metric loaders.
-
- Metric loaders are used to load metrics from a specified source (like Prometheus in this case).
- """
-
- def __init__(
- self,
- config: Config,
- prometheus: CustomPrometheusConnect,
- executor: Optional[ThreadPoolExecutor] = None,
- ) -> None:
- super().__init__(config)
- self.prometheus = prometheus
-
- self.executor = executor
-
- def get_prometheus_cluster_label(self) -> str:
- """
- Generates the cluster label for querying a centralized Prometheus
-
- Returns:
- str: a promql safe label string for querying the cluster.
- """
- if self.config.prometheus_cluster_label is None:
- return ""
- return f', {self.config.prometheus_label}="{self.config.prometheus_cluster_label}"'
-
- @abc.abstractmethod
- def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
- """
- This method should be implemented by all subclasses to provide a query string to fetch metrics.
-
- Args:
- object (K8sObjectData): The object for which metrics need to be fetched.
- resolution (Optional[str]): a string for configurable resolution to the query.
-
- Returns:
- str: The query string.
- """
-
- pass
-
- def get_query_type(self) -> QueryType:
- return QueryType.QueryRange
-
- def get_graph_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
- """
- This method should be implemented by all subclasses to provide a query string in the metadata to produce relevant graphs.
-
- Args:
- object (K8sObjectData): The object for which metrics need to be fetched.
- resolution (Optional[str]): a string for configurable resolution to the query.
-
- Returns:
- str: The query string.
- """
- return self.get_query(object, resolution)
-
- def _step_to_string(self, step: datetime.timedelta) -> str:
- """
- Converts step in datetime.timedelta format to a string format used by Prometheus.
-
- Args:
- step (datetime.timedelta): Step size in datetime.timedelta format.
-
- Returns:
- str: Step size in string format used by Prometheus.
- """
- if step.total_seconds() > 60 * 60 * 24:
- return f"{int(step.total_seconds()) // (60 * 60 * 24)}d"
- return f"{int(step.total_seconds()) // 60}m"
-
- def query_prometheus_thread(self, metric: Metric, query_type: QueryType) -> list[dict]:
- if query_type == QueryType.QueryRange:
- value = self.prometheus.custom_query_range(
- query=metric.query,
- start_time=metric.start_time,
- end_time=metric.end_time,
- step=metric.step,
- )
- return value
-
- # regular query, lighter on preformance
- results = self.prometheus.custom_query(query=metric.query)
- # format the results to return the same format as custom_query_range
- for result in results:
- result["values"] = [result.pop("value")]
- return results
-
- async def query_prometheus(self, metric: Metric, query_type: QueryType) -> list[dict]:
- """
- Asynchronous method that queries Prometheus to fetch metrics.
-
- Args:
- metric (Metric): An instance of the Metric class specifying what metrics to fetch.
-
- Returns:
- list[dict]: A list of dictionary where each dictionary represents metrics for a pod.
- """
-
- loop = asyncio.get_running_loop()
- return await loop.run_in_executor(
- self.executor,
- lambda: self.query_prometheus_thread(metric=metric, query_type=query_type),
- )
-
- async def load_data(
- self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta, service_name: str
- ) -> ResourceHistoryData:
- """
- Asynchronous method that loads metric data for a specific object.
-
- Args:
- object (K8sObjectData): The object for which metrics need to be loaded.
- period (datetime.timedelta): The time period for which metrics need to be loaded.
- step (datetime.timedelta): The time interval between successive metric values.
-
- Returns:
- ResourceHistoryData: An instance of the ResourceHistoryData class representing the loaded metrics.
- """
- resolution = f"{self._step_to_string(period)}:{self._step_to_string(step)}"
- query = self.get_query(object, resolution)
- query_type = self.get_query_type()
- end_time = datetime.datetime.now().astimezone()
- metric = Metric(
- query=query,
- start_time=end_time - period,
- end_time=end_time,
- step=self._step_to_string(step),
- )
- result = await self.query_prometheus(metric=metric, query_type=query_type)
- # adding the query in the results for a graph
- metric.query = self.get_graph_query(object, resolution)
-
- if result == []:
- self.warning(f"{service_name} returned no {self.__class__.__name__} metrics for {object}")
- return ResourceHistoryData(metric=metric, data={})
-
- return ResourceHistoryData(
- metric=metric,
- data={
- pod_result["metric"]["pod"]: np.array(pod_result["values"], dtype=np.float64) for pod_result in result
- },
- )
-
- @staticmethod
- def get_by_resource(resource: str, strategy: str) -> type[BaseMetricLoader]:
- """
- Fetches the metric loader corresponding to the specified resource.
-
- Args:
- resource (str): The name of the resource.
- resource (str): The name of the strategy.
-
- Returns:
- type[BaseMetricLoader]: The class of the metric loader corresponding to the resource.
-
- Raises:
- KeyError: If the specified resource is not registered.
- """
-
- try:
- lower_strategy = strategy.lower()
- if (
- lower_strategy
- and lower_strategy in STRATEGY_METRICS_OVERRIDES
- and resource in STRATEGY_METRICS_OVERRIDES[lower_strategy]
- ):
- return STRATEGY_METRICS_OVERRIDES[lower_strategy][resource]
- return REGISTERED_METRICS[resource]
- except KeyError as e:
- raise KeyError(f"Resource {resource} was not registered by `@bind_metric(...)`") from e
-
-
-Self = TypeVar("Self", bound=BaseMetricLoader)
-
-
-def bind_metric(resource: str) -> Callable[[type[Self]], type[Self]]:
- """
- A decorator that binds a metric loader to a resource.
-
- Args:
- resource (str): The name of the resource.
-
- Returns:
- Callable[[type[Self]], type[Self]]: The decorator that does the binding.
- """
-
- def decorator(cls: type[Self]) -> type[Self]:
- REGISTERED_METRICS[resource] = cls
- return cls
-
- return decorator
-
-
-# This is a temporary solutions, metric loaders will be moved to strategy in the future
-def override_metric(strategy: str, resource: str) -> Callable[[type[Self]], type[Self]]:
- """
- A decorator that overrides the bound metric on a specific strategy.
-
- Args:
- strategy (str): The name of the strategy for this metric.
- resource (str): The name of the resource.
-
- Returns:
- Callable[[type[Self]], type[Self]]: The decorator that does the binding.
- """
-
- def decorator(cls: type[Self]) -> type[Self]:
- lower_strategy = strategy.lower()
- if lower_strategy not in STRATEGY_METRICS_OVERRIDES:
- STRATEGY_METRICS_OVERRIDES[lower_strategy] = {}
- STRATEGY_METRICS_OVERRIDES[lower_strategy][resource] = cls
- return cls
-
- return decorator
diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu.py b/robusta_krr/core/integrations/prometheus/metrics/cpu.py
new file mode 100644
index 0000000..28b5a80
--- /dev/null
+++ b/robusta_krr/core/integrations/prometheus/metrics/cpu.py
@@ -0,0 +1,33 @@
+from typing import Optional
+
+from robusta_krr.core.models.objects import K8sObjectData
+
+from .base import QueryMetric, QueryRangeMetric, FilterMetric
+
+
+class CPULoader(QueryRangeMetric, FilterMetric):
+ def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
+ pods_selector = "|".join(pod.name for pod in object.pods)
+ cluster_label = self.get_prometheus_cluster_label()
+ return (
+ "sum(irate(container_cpu_usage_seconds_total{"
+ f'namespace="{object.namespace}", '
+ f'pod=~"{pods_selector}", '
+ f'container="{object.container}"'
+ f"{cluster_label}"
+ "}[5m])) by (container, pod, job)"
+ )
+
+
+class MaxCPULoader(QueryMetric, FilterMetric):
+ def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
+ pods_selector = "|".join(pod.name for pod in object.pods)
+ cluster_label = self.get_prometheus_cluster_label()
+ return (
+ "sum(irate(container_cpu_usage_seconds_total{"
+ f'namespace="{object.namespace}", '
+ f'pod=~"{pods_selector}", '
+ f'container="{object.container}"'
+ f"{cluster_label}"
+ "}[5m])) by (container, pod, job)"
+ )
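As a worked example (not in the commit), this reproduces the query string CPULoader.get_query() renders for a hypothetical workload: namespace "default", pods "app-0" and "app-1", container "app", and no centralized-Prometheus cluster label.

# Rebuilds the f-string from CPULoader.get_query() with invented inputs; it only
# constructs the PromQL text, nothing is queried.
namespace, pods_selector, container, cluster_label = "default", "app-0|app-1", "app", ""

query = (
    "sum(irate(container_cpu_usage_seconds_total{"
    f'namespace="{namespace}", '
    f'pod=~"{pods_selector}", '
    f'container="{container}"'
    f"{cluster_label}"
    "}[5m])) by (container, pod, job)"
)

print(query)
# sum(irate(container_cpu_usage_seconds_total{namespace="default", pod=~"app-0|app-1", container="app"}[5m])) by (container, pod, job)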
diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py
deleted file mode 100644
index a604652..0000000
--- a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py
+++ /dev/null
@@ -1,25 +0,0 @@
-from typing import Optional
-
-from robusta_krr.core.models.allocations import ResourceType
-from robusta_krr.core.models.objects import K8sObjectData
-
-from .base_filtered_metric import BaseFilteredMetricLoader
-from .base_metric import QueryType, bind_metric
-
-
-@bind_metric(ResourceType.CPU)
-class CPUMetricLoader(BaseFilteredMetricLoader):
- def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
- pods_selector = "|".join(pod.name for pod in object.pods)
- cluster_label = self.get_prometheus_cluster_label()
- return (
- "sum(irate(container_cpu_usage_seconds_total{"
- f'namespace="{object.namespace}", '
- f'pod=~"{pods_selector}", '
- f'container="{object.container}"'
- f"{cluster_label}"
- "}[5m])) by (container, pod, job)"
- )
-
- def get_query_type(self) -> QueryType:
- return QueryType.QueryRange
diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py b/robusta_krr/core/integrations/prometheus/metrics/memory.py
index fa4fd87..00c19f8 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/memory.py
@@ -1,14 +1,11 @@
from typing import Optional
-from robusta_krr.core.models.allocations import ResourceType
from robusta_krr.core.models.objects import K8sObjectData
-from .base_filtered_metric import BaseFilteredMetricLoader
-from .base_metric import QueryType, bind_metric, override_metric
+from .base import QueryMetric, QueryRangeMetric, FilterMetric
-@bind_metric(ResourceType.Memory)
-class MemoryMetricLoader(BaseFilteredMetricLoader):
+class MemoryLoader(QueryRangeMetric, FilterMetric):
def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
@@ -21,17 +18,8 @@ class MemoryMetricLoader(BaseFilteredMetricLoader):
"}) by (container, pod, job, id)"
)
- def get_query_type(self) -> QueryType:
- return QueryType.QueryRange
-
-
-# This is a temporary solutions, metric loaders will be moved to strategy in the future
-@override_metric("simple", ResourceType.Memory)
-class SimpleMemoryMetricLoader(MemoryMetricLoader):
- """
- A class that overrides the memory metric on the simple strategy.
- """
+class MaxMemoryLoader(QueryMetric, FilterMetric):
def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
@@ -45,9 +33,3 @@ class SimpleMemoryMetricLoader(MemoryMetricLoader):
f"{resolution_formatted}"
f")"
)
-
- def get_query_type(self) -> QueryType:
- return QueryType.Query
-
- def get_graph_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
- return super().get_query(object, resolution)
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py
index e2654e8..6501f7c 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py
@@ -5,12 +5,13 @@ from typing import List, Optional
from kubernetes.client.api_client import ApiClient
-from robusta_krr.core.abstract.strategies import ResourceHistoryData
+from robusta_krr.core.abstract.strategies import MetricPodData
from robusta_krr.core.models.config import Config
from robusta_krr.core.models.objects import K8sObjectData
-from robusta_krr.core.models.result import ResourceType
from robusta_krr.utils.configurable import Configurable
+from ..metrics import PrometheusMetric
+
class MetricsNotFound(Exception):
"""
@@ -37,6 +38,7 @@ class MetricsService(Configurable, abc.ABC):
def check_connection(self):
...
+ @property
def name(self) -> str:
classname = self.__class__.__name__
return classname.replace("MetricsService", "") if classname != MetricsService.__name__ else classname
@@ -49,10 +51,10 @@ class MetricsService(Configurable, abc.ABC):
async def gather_data(
self,
object: K8sObjectData,
- resource: ResourceType,
+ LoaderClass: type[PrometheusMetric],
period: datetime.timedelta,
step: datetime.timedelta = datetime.timedelta(minutes=30),
- ) -> ResourceHistoryData:
+ ) -> MetricPodData:
...
def get_prometheus_cluster_label(self) -> str:
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
index 5b22472..b380abc 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
@@ -1,19 +1,18 @@
import asyncio
import datetime
-from typing import List, Optional, Type
+from typing import List, Optional
from concurrent.futures import ThreadPoolExecutor
from kubernetes.client import ApiClient
from prometheus_api_client import PrometheusApiClientException
from requests.exceptions import ConnectionError, HTTPError
-from robusta_krr.core.abstract.strategies import ResourceHistoryData
+from robusta_krr.core.abstract.strategies import MetricPodData
from robusta_krr.core.models.config import Config
from robusta_krr.core.models.objects import K8sObjectData, PodData
-from robusta_krr.core.models.result import ResourceType
from robusta_krr.utils.service_discovery import MetricsServiceDiscovery
-from ..metrics import BaseMetricLoader
+from ..metrics import PrometheusMetric
from ..prometheus_client import ClusterNotSpecifiedException, CustomPrometheusConnect
from .base_metric_service import MetricsNotFound, MetricsService
@@ -54,34 +53,35 @@ class PrometheusMetricsService(MetricsService):
A class for fetching metrics from Prometheus.
"""
+ service_discovery: type[MetricsServiceDiscovery] = PrometheusDiscovery
+
def __init__(
self,
config: Config,
*,
cluster: Optional[str] = None,
api_client: Optional[ApiClient] = None,
- service_discovery: Type[MetricsServiceDiscovery] = PrometheusDiscovery,
executor: Optional[ThreadPoolExecutor] = None,
) -> None:
super().__init__(config=config, api_client=api_client, cluster=cluster, executor=executor)
- self.info(f"Connecting to {self.name()} for {self.cluster} cluster")
+ self.info(f"Connecting to {self.name} for {self.cluster} cluster")
self.auth_header = self.config.prometheus_auth_header
self.ssl_enabled = self.config.prometheus_ssl_enabled
- self.prometheus_discovery = service_discovery(config=self.config, api_client=self.api_client)
+ self.prometheus_discovery = self.service_discovery(config=self.config, api_client=self.api_client)
self.url = self.config.prometheus_url
self.url = self.url or self.prometheus_discovery.find_metrics_url()
if not self.url:
raise PrometheusNotFound(
- f"{self.name()} instance could not be found while scanning in {self.cluster} cluster.\n"
+ f"{self.name} instance could not be found while scanning in {self.cluster} cluster.\n"
"\tTry using port-forwarding and/or setting the url manually (using the -p flag.)."
)
- self.info(f"Using {self.name()} at {self.url} for cluster {cluster or 'default'}")
+ self.info(f"Using {self.name} at {self.url} for cluster {cluster or 'default'}")
headers = self.config.prometheus_other_headers
@@ -146,20 +146,17 @@ class PrometheusMetricsService(MetricsService):
async def gather_data(
self,
object: K8sObjectData,
- resource: ResourceType,
+ LoaderClass: type[PrometheusMetric],
period: datetime.timedelta,
step: datetime.timedelta = datetime.timedelta(minutes=30),
- ) -> ResourceHistoryData:
+ ) -> MetricPodData:
"""
ResourceHistoryData: The gathered resource history data.
"""
- self.debug(f"Gathering data for {object} and {resource}")
-
- MetricLoaderType = BaseMetricLoader.get_by_resource(resource, self.config.strategy)
- await self.add_historic_pods(object, period)
+ self.debug(f"Gathering data for {object} and {LoaderClass}")
- metric_loader = MetricLoaderType(self.config, self.prometheus, self.executor)
- return await metric_loader.load_data(object, period, step, self.name())
+ metric_loader = LoaderClass(self.config, self.prometheus, self.name, self.executor)
+ return await metric_loader.load_data(object, period, step)
async def add_historic_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None:
"""
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py
index f1de8b2..3b7ef85 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py
@@ -1,9 +1,7 @@
-from concurrent.futures import ThreadPoolExecutor
from typing import Optional
from kubernetes.client import ApiClient
-from robusta_krr.core.models.config import Config
from robusta_krr.utils.service_discovery import MetricsServiceDiscovery
from .prometheus_metrics_service import MetricsNotFound, PrometheusMetricsService
@@ -42,21 +40,7 @@ class ThanosMetricsService(PrometheusMetricsService):
A class for fetching metrics from Thanos.
"""
- def __init__(
- self,
- config: Config,
- *,
- cluster: Optional[str] = None,
- api_client: Optional[ApiClient] = None,
- executor: Optional[ThreadPoolExecutor] = None,
- ) -> None:
- super().__init__(
- config=config,
- cluster=cluster,
- api_client=api_client,
- service_discovery=ThanosMetricsDiscovery,
- executor=executor,
- )
+ service_discovery = ThanosMetricsDiscovery
def check_connection(self):
"""
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py
index fd9f8a9..925136a 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py
@@ -1,9 +1,7 @@
-from concurrent.futures import ThreadPoolExecutor
from typing import Optional
from kubernetes.client import ApiClient
-from robusta_krr.core.models.config import Config
from robusta_krr.utils.service_discovery import MetricsServiceDiscovery
from .prometheus_metrics_service import MetricsNotFound, PrometheusMetricsService
@@ -41,21 +39,7 @@ class VictoriaMetricsService(PrometheusMetricsService):
A class for fetching metrics from Victoria Metrics.
"""
- def __init__(
- self,
- config: Config,
- *,
- cluster: Optional[str] = None,
- api_client: Optional[ApiClient] = None,
- executor: Optional[ThreadPoolExecutor] = None,
- ) -> None:
- super().__init__(
- config=config,
- cluster=cluster,
- api_client=api_client,
- service_discovery=VictoriaMetricsDiscovery,
- executor=executor,
- )
+ service_discovery = VictoriaMetricsDiscovery
def check_connection(self):
"""
diff --git a/robusta_krr/core/integrations/prometheus/prometheus_client.py b/robusta_krr/core/integrations/prometheus/prometheus_client.py
index 824cd92..ac93608 100644
--- a/robusta_krr/core/integrations/prometheus/prometheus_client.py
+++ b/robusta_krr/core/integrations/prometheus/prometheus_client.py
@@ -19,6 +19,7 @@ class CustomPrometheusConnect(PrometheusConnect):
Custom PrometheusConnect class to handle retries.
"""
+ @no_type_check
def __init__(
self,
url: str = "http://127.0.0.1:9090",
@@ -31,6 +32,7 @@ class CustomPrometheusConnect(PrometheusConnect):
self._session = requests.Session()
self._session.mount(self.url, HTTPAdapter(max_retries=retry, pool_maxsize=10, pool_block=True))
+ @no_type_check
def custom_query(self, query: str, params: dict = None):
params = params or {}
data = None
@@ -52,6 +54,7 @@ class CustomPrometheusConnect(PrometheusConnect):
return data
+ @no_type_check
def custom_query_range(
self,
query: str,
diff --git a/robusta_krr/core/models/result.py b/robusta_krr/core/models/result.py
index 68f36cc..4d3fe38 100644
--- a/robusta_krr/core/models/result.py
+++ b/robusta_krr/core/models/result.py
@@ -1,6 +1,5 @@
from __future__ import annotations
-from datetime import datetime
from typing import Any, Optional, Union
import pydantic as pd
@@ -22,26 +21,13 @@ class ResourceRecommendation(pd.BaseModel):
info: dict[ResourceType, Optional[str]]
-class Metric(pd.BaseModel):
- query: str
- start_time: datetime
- end_time: datetime
- step: str
-
-
-MetricsData = dict[ResourceType, Metric]
-
-
class ResourceScan(pd.BaseModel):
object: K8sObjectData
recommended: ResourceRecommendation
severity: Severity
- metrics: MetricsData
@classmethod
- def calculate(
- cls, object: K8sObjectData, recommendation: ResourceAllocations, metrics: MetricsData
- ) -> ResourceScan:
+ def calculate(cls, object: K8sObjectData, recommendation: ResourceAllocations) -> ResourceScan:
recommendation_processed = ResourceRecommendation(requests={}, limits={}, info={})
for resource_type in ResourceType:
@@ -61,11 +47,9 @@ class ResourceScan(pd.BaseModel):
for selector in ["requests", "limits"]:
for recommendation_request in getattr(recommendation_processed, selector).values():
if recommendation_request.severity == severity:
- return cls(
- object=object, recommended=recommendation_processed, severity=severity, metrics=metrics
- )
+ return cls(object=object, recommended=recommendation_processed, severity=severity)
- return cls(object=object, recommended=recommendation_processed, severity=Severity.UNKNOWN, metrics=metrics)
+ return cls(object=object, recommended=recommendation_processed, severity=Severity.UNKNOWN)
class StrategyData(pd.BaseModel):
diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py
index 68ceb18..d3d1f9b 100644
--- a/robusta_krr/core/runner.py
+++ b/robusta_krr/core/runner.py
@@ -5,11 +5,14 @@ from typing import Optional, Union
from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult
from robusta_krr.core.integrations.kubernetes import KubernetesLoader
-from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException, MetricsLoader, PrometheusNotFound
+from robusta_krr.core.integrations.prometheus import (
+ ClusterNotSpecifiedException,
+ PrometheusMetricsLoader,
+ PrometheusNotFound,
+)
from robusta_krr.core.models.config import Config
from robusta_krr.core.models.objects import K8sObjectData
from robusta_krr.core.models.result import (
- MetricsData,
ResourceAllocations,
ResourceScan,
ResourceType,
@@ -28,17 +31,17 @@ class Runner(Configurable):
def __init__(self, config: Config) -> None:
super().__init__(config)
self._k8s_loader = KubernetesLoader(self.config)
- self._metrics_service_loaders: dict[Optional[str], Union[MetricsLoader, Exception]] = {}
+ self._metrics_service_loaders: dict[Optional[str], Union[PrometheusMetricsLoader, Exception]] = {}
self._metrics_service_loaders_error_logged: set[Exception] = set()
self._strategy = self.config.create_strategy()
# This executor will be running calculations for recommendations
self._executor = ThreadPoolExecutor(self.config.max_workers)
- def _get_prometheus_loader(self, cluster: Optional[str]) -> Optional[MetricsLoader]:
+ def _get_prometheus_loader(self, cluster: Optional[str]) -> Optional[PrometheusMetricsLoader]:
if cluster not in self._metrics_service_loaders:
try:
- self._metrics_service_loaders[cluster] = MetricsLoader(self.config, cluster=cluster)
+ self._metrics_service_loaders[cluster] = PrometheusMetricsLoader(self.config, cluster=cluster)
except Exception as e:
self._metrics_service_loaders[cluster] = e
@@ -104,38 +107,29 @@ class Runner(Configurable):
for resource, recommendation in result.items()
}
- async def _calculate_object_recommendations(self, object: K8sObjectData) -> tuple[RunResult, MetricsData]:
+ async def _calculate_object_recommendations(self, object: K8sObjectData) -> RunResult:
prometheus_loader = self._get_prometheus_loader(object.cluster)
if prometheus_loader is None:
- return {resource: ResourceRecommendation.undefined() for resource in ResourceType}, {}
-
- data_tuple = await asyncio.gather(
- *[
- prometheus_loader.gather_data(
- object,
- resource,
- self._strategy.settings.history_timedelta,
- step=self._strategy.settings.timeframe_timedelta,
- )
- for resource in ResourceType
- ]
+ return {resource: ResourceRecommendation.undefined() for resource in ResourceType}
+
+ metrics = await prometheus_loader.gather_data(
+ object,
+ self._strategy,
+ self._strategy.settings.history_timedelta,
+ step=self._strategy.settings.timeframe_timedelta,
)
- data = dict(zip(ResourceType, data_tuple))
- metrics = {resource: data[resource].metric for resource in ResourceType}
self.__progressbar.progress()
# NOTE: We run this in a threadpool as the strategy calculation might be CPU intensive
# But keep in mind that numpy calculations will not block the GIL
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(self._executor, self._strategy.run, data, object)
- return self._format_result(result), metrics
+ result = await loop.run_in_executor(self._executor, self._strategy.run, metrics, object)
+ return self._format_result(result)
- async def _gather_objects_recommendations(
- self, objects: list[K8sObjectData]
- ) -> list[tuple[ResourceAllocations, MetricsData]]:
- recommendations: list[tuple[RunResult, MetricsData]] = await asyncio.gather(
+ async def _gather_objects_recommendations(self, objects: list[K8sObjectData]) -> list[ResourceAllocations]:
+ recommendations: list[RunResult] = await asyncio.gather(
*[self._calculate_object_recommendations(object) for object in objects]
)
@@ -145,10 +139,9 @@ class Runner(Configurable):
requests={resource: recommendation[resource].request for resource in ResourceType},
limits={resource: recommendation[resource].limit for resource in ResourceType},
info={resource: recommendation[resource].info for resource in ResourceType},
- ),
- metric,
+ )
)
- for recommendation, metric in recommendations
+ for recommendation in recommendations
]
async def _collect_result(self) -> Result:
@@ -178,8 +171,7 @@ class Runner(Configurable):
return Result(
scans=[
- ResourceScan.calculate(obj, recommended, metrics)
- for obj, (recommended, metrics) in zip(objects, resource_recommendations)
+ ResourceScan.calculate(obj, recommended) for obj, recommended in zip(objects, resource_recommendations)
],
description=self._strategy.description,
strategy=StrategyData(
diff --git a/robusta_krr/strategies/simple.py b/robusta_krr/strategies/simple.py
index b83de50..c476d08 100644
--- a/robusta_krr/strategies/simple.py
+++ b/robusta_krr/strategies/simple.py
@@ -1,16 +1,17 @@
import numpy as np
import pydantic as pd
-from numpy.typing import NDArray
from robusta_krr.core.abstract.strategies import (
BaseStrategy,
- HistoryData,
+ MetricPodData,
+ MetricsPodData,
K8sObjectData,
ResourceRecommendation,
ResourceType,
RunResult,
StrategySettings,
)
+from robusta_krr.core.integrations.prometheus.metrics import MaxCPULoader, MaxMemoryLoader
class SimpleStrategySettings(StrategySettings):
@@ -19,14 +20,14 @@ class SimpleStrategySettings(StrategySettings):
5, gt=0, description="The percentage of added buffer to the peak memory usage for memory recommendation."
)
- def calculate_memory_proposal(self, data: dict[str, NDArray[np.float64]]) -> float:
+ def calculate_memory_proposal(self, data: MetricPodData) -> float:
data_ = [np.max(values[:, 1]) for values in data.values()]
if len(data_) == 0:
return float("NaN")
return max(data_) * (1 + self.memory_buffer_percentage / 100)
- def calculate_cpu_proposal(self, data: dict[str, NDArray[np.float64]]) -> float:
+ def calculate_cpu_proposal(self, data: MetricPodData) -> float:
if len(data) == 0:
return float("NaN")
@@ -49,32 +50,42 @@ class SimpleStrategy(BaseStrategy[SimpleStrategySettings]):
Learn more: [underline]https://github.com/robusta-dev/krr#algorithm[/underline]
"""
- __display_name__ = "simple"
- __rich_console__ = True
+ display_name = "simple"
+ rich_console = True
+ metrics = [MaxCPULoader, MaxMemoryLoader]
- def __calculate_cpu_proposal(self, history_data: HistoryData, object_data: K8sObjectData) -> ResourceRecommendation:
- if len(history_data[ResourceType.CPU].data) == 0:
+ def __calculate_cpu_proposal(
+ self, history_data: MetricsPodData, object_data: K8sObjectData
+ ) -> ResourceRecommendation:
+ data = history_data[MaxCPULoader]
+
+ if len(data) == 0:
return ResourceRecommendation.undefined(info="No data")
if object_data.hpa is not None and object_data.hpa.target_cpu_utilization_percentage is not None:
return ResourceRecommendation.undefined(info="HPA detected")
- cpu_usage = self.settings.calculate_cpu_proposal(history_data[ResourceType.CPU].data)
+ cpu_usage = self.settings.calculate_cpu_proposal(data)
return ResourceRecommendation(request=cpu_usage, limit=None)
def __calculate_memory_proposal(
- self, history_data: HistoryData, object_data: K8sObjectData
+ self, history_data: MetricsPodData, object_data: K8sObjectData
) -> ResourceRecommendation:
- if len(history_data[ResourceType.Memory].data) == 0:
+ data = history_data[MaxMemoryLoader]
+
+ if len(data) == 0:
return ResourceRecommendation.undefined(info="No data")
if object_data.hpa is not None and object_data.hpa.target_memory_utilization_percentage is not None:
return ResourceRecommendation.undefined(info="HPA detected")
- memory_usage = self.settings.calculate_memory_proposal(history_data[ResourceType.Memory].data)
+ memory_usage = self.settings.calculate_memory_proposal(data)
return ResourceRecommendation(request=memory_usage, limit=memory_usage)
- def run(self, history_data: HistoryData, object_data: K8sObjectData) -> RunResult:
+ def run(self, history_data: MetricsPodData, object_data: K8sObjectData) -> RunResult:
+ from pprint import pprint
+
+ pprint(history_data)
return {
ResourceType.CPU: self.__calculate_cpu_proposal(history_data, object_data),
ResourceType.Memory: self.__calculate_memory_proposal(history_data, object_data),
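A small worked example (not part of the commit) of the memory proposal math used by the simple strategy above; the pod names and byte values are invented, and the arrays have the (time, value) MetricPodData shape produced by PrometheusMetric.load_data.

# Worked example of SimpleStrategySettings.calculate_memory_proposal with made-up data.
import numpy as np

from robusta_krr.strategies.simple import SimpleStrategySettings

settings = SimpleStrategySettings(memory_buffer_percentage=15)

data = {
    "app-0": np.array([[1690000000.0, 400e6], [1690001800.0, 450e6]]),  # peaks at ~450 MB
    "app-1": np.array([[1690000000.0, 600e6], [1690001800.0, 520e6]]),  # peaks at ~600 MB
}

print(settings.calculate_memory_proposal(data))
# max(450e6, 600e6) * (1 + 15 / 100) = 690000000.0 -> used as both request and limit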
diff --git a/robusta_krr/utils/display_name.py b/robusta_krr/utils/display_name.py
deleted file mode 100644
index ef9526a..0000000
--- a/robusta_krr/utils/display_name.py
+++ /dev/null
@@ -1,31 +0,0 @@
-from typing import Any, Callable, TypeVar
-
-_T = TypeVar("_T")
-
-
-def display_name_property(*, suffix: str) -> Callable[[type[_T]], type[_T]]:
- """Add a decorator factory to add __display_name__ property to the class.
-
- It is a utility function for BaseStrategy.
- It makes a __display_name__ property for the class, that uses the name of the class.
- By default, it will remove the suffix from the name of the class.
- For example, if the name of the class is 'MyStrategy', the __display_name__ property will be 'My'.
- If the name of the class is 'Foo', the __display_name__ property will be 'Foo', because it does not end with 'Strategy'.
-
- If you then override the __display_name__ property, it will be used instead of the default one.
- """
-
- def decorator(cls: type[_T]) -> type[_T]:
- class DisplayNameProperty:
- # This is a descriptor that returns the name of the class.
- # It is used to generate the __display_name__ property.
- def __get__(self, instance: Any, owner: type[_T]) -> str:
- if owner.__name__.lower().endswith(suffix.lower()):
- return owner.__name__[: -len(suffix)]
-
- return owner.__name__
-
- cls.__display_name__ = DisplayNameProperty() # type: ignore
- return cls
-
- return decorator
diff --git a/robusta_krr/utils/progress_bar.py b/robusta_krr/utils/progress_bar.py
index 32bf211..621dc18 100644
--- a/robusta_krr/utils/progress_bar.py
+++ b/robusta_krr/utils/progress_bar.py
@@ -14,7 +14,7 @@ class ProgressBar(Configurable):
def __init__(self, config: Config, **kwargs) -> None:
super().__init__(config)
- self.show_bar = self.echo_active
+ self.show_bar = self.echo_active and False
if self.show_bar:
self.alive_bar = alive_bar(**kwargs)