summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Павел Жуков <33721692+LeaveMyYard@users.noreply.github.com> 2023-05-23 22:36:50 +0300
committer: Павел Жуков <33721692+LeaveMyYard@users.noreply.github.com> 2023-05-23 22:36:50 +0300
commit: 5f3fe62e81ecaddcdfec2b41167cc4f16501191b (patch)
tree: a0d7bc2d5710015a7be42652a7d974ada5d4e892
parent: c21b87db6bd330906116d31f4e308b2eced591e7 (diff)
parent: bdf27e1dfa394204412228cd61589caf7a0dd9d2 (diff)
Merge branch 'main' into loading_bar
-rw-r--r-- robusta_krr/core/abstract/strategies.py 13
-rw-r--r-- robusta_krr/core/integrations/prometheus/loader.py 3
-rw-r--r-- robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py 1
-rw-r--r-- robusta_krr/core/integrations/prometheus/metrics/base_metric.py 35
-rw-r--r-- robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py 12
-rw-r--r-- robusta_krr/core/integrations/prometheus/metrics/memory_metric.py 12
-rw-r--r-- robusta_krr/core/models/result.py 10
-rw-r--r-- robusta_krr/core/runner.py 28
-rw-r--r-- robusta_krr/strategies/simple.py 4
9 files changed, 74 insertions, 44 deletions
diff --git a/robusta_krr/core/abstract/strategies.py b/robusta_krr/core/abstract/strategies.py
index a46533c..9c4f133 100644
--- a/robusta_krr/core/abstract/strategies.py
+++ b/robusta_krr/core/abstract/strategies.py
@@ -29,7 +29,7 @@ class StrategySettings(pd.BaseModel):
history_duration: float = pd.Field(
24 * 7 * 2, ge=1, description="The duration of the history data to use (in hours)."
)
- timeframe_duration: float = pd.Field(15, ge=1, description="The step for the history data (in minutes).")
+ timeframe_duration: float = pd.Field(2, ge=1, description="The step for the history data (in minutes).")
@property
def history_timedelta(self) -> datetime.timedelta:
@@ -43,7 +43,16 @@ class StrategySettings(pd.BaseModel):
_StrategySettings = TypeVar("_StrategySettings", bound=StrategySettings)
ArrayNx2 = Annotated[NDArray[np.float64], Literal["N", 2]]
-ResourceHistoryData = dict[str, ArrayNx2]
+
+
+class ResourceHistoryData(pd.BaseModel):
+ query: str # The query used to get the data
+ data: dict[str, ArrayNx2] # Mapping: pod -> (time, value)
+
+ class Config:
+ arbitrary_types_allowed = True
+
+
HistoryData = dict[ResourceType, ResourceHistoryData]
RunResult = dict[ResourceType, ResourceRecommendation]
diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py
index 685dd1c..b543bd8 100644
--- a/robusta_krr/core/integrations/prometheus/loader.py
+++ b/robusta_krr/core/integrations/prometheus/loader.py
@@ -18,9 +18,6 @@ from robusta_krr.utils.service_discovery import ServiceDiscovery
from .metrics import BaseMetricLoader
-import numpy as np
-from numpy.typing import NDArray
-
class PrometheusDiscovery(ServiceDiscovery):
def find_prometheus_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]:
diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py
index 80408c5..a409810 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py
@@ -21,6 +21,7 @@ class BaseFilteredMetricLoader(BaseMetricLoader):
return series["metric"][label]
return None
+ # TODO: Rework this, as now our query can return multiple metrics for different pods
@staticmethod
def filter_prom_jobs_results(
series_list_result: list[PrometheusSeries],
diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py
index be180f6..2547d16 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py
@@ -24,7 +24,7 @@ class BaseMetricLoader(Configurable, abc.ABC):
self.prometheus = prometheus
@abc.abstractmethod
- def get_query(self, namespace: str, pod: str, container: str) -> str:
+ def get_query(self, object: K8sObjectData) -> str:
...
async def query_prometheus(
@@ -41,28 +41,25 @@ class BaseMetricLoader(Configurable, abc.ABC):
async def load_data(
self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta
) -> ResourceHistoryData:
- result = await asyncio.gather(
- *[
- self.query_prometheus(
- query=self.get_query(object.namespace, pod.name, object.container),
- start_time=datetime.datetime.now() - period,
- end_time=datetime.datetime.now(),
- step=step,
- )
- for pod in object.pods
- ]
+ query = self.get_query(object)
+ result = await self.query_prometheus(
+ query=query,
+ start_time=datetime.datetime.now() - period,
+ end_time=datetime.datetime.now(),
+ step=step,
)
if result == []:
self.warning(f"Prometheus returned no {self.__class__.__name__} metrics for {object}")
- return {pod.name: np.array([]) for pod in object.pods}
-
- pod_results = {pod: result[i] for i, pod in enumerate(object.pods)}
- return {
- pod.name: np.array([(timestamp, value) for timestamp, value in pod_result[0]["values"]], dtype=np.float64)
- for pod, pod_result in pod_results.items()
- if pod_result != []
- }
+ return ResourceHistoryData(query=query, data={})
+
+ return ResourceHistoryData(
+ query=query,
+ data={
+ pod_result['metric']['pod']: np.array(pod_result["values"], dtype=np.float64)
+ for pod_result in result
+ },
+ )
@staticmethod
def get_by_resource(resource: str) -> type[BaseMetricLoader]:
diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py
index ffbaa91..dda7d14 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py
@@ -2,9 +2,17 @@ from robusta_krr.core.models.allocations import ResourceType
from .base_metric import bind_metric
from .base_filtered_metric import BaseFilteredMetricLoader
+from robusta_krr.core.models.objects import K8sObjectData
@bind_metric(ResourceType.CPU)
class CPUMetricLoader(BaseFilteredMetricLoader):
- def get_query(self, namespace: str, pod: str, container: str) -> str:
- return f'sum(irate(container_cpu_usage_seconds_total{{namespace="{namespace}", pod="{pod}", container="{container}"}}[5m])) by (container, pod, job)'
+ def get_query(self, object: K8sObjectData) -> str:
+ pods_selector = "|".join(pod.name for pod in object.pods)
+ return (
+ 'sum(irate(container_cpu_usage_seconds_total{'
+ f'namespace="{object.namespace}", '
+ f'pod=~"{pods_selector}", '
+ f'container="{object.container}"'
+ '}[5m])) by (container, pod, job)'
+ )
diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py
index 3355ed6..33ed733 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py
@@ -2,9 +2,17 @@ from robusta_krr.core.models.allocations import ResourceType
from .base_metric import bind_metric
from .base_filtered_metric import BaseFilteredMetricLoader
+from robusta_krr.core.models.objects import K8sObjectData
@bind_metric(ResourceType.Memory)
class MemoryMetricLoader(BaseFilteredMetricLoader):
- def get_query(self, namespace: str, pod: str, container: str) -> str:
- return f'sum(container_memory_working_set_bytes{{image!="", namespace="{namespace}", pod="{pod}", container="{container}"}}) by (container, pod, job)'
+ def get_query(self, object: K8sObjectData) -> str:
+ pods_selector = "|".join(pod.name for pod in object.pods)
+ return (
+ 'sum(container_memory_working_set_bytes{'
+ f'namespace="{object.namespace}", '
+ f'pod=~"{pods_selector}", '
+ f'container="{object.container}"'
+ '}) by (container, pod, job)'
+ )
diff --git a/robusta_krr/core/models/result.py b/robusta_krr/core/models/result.py
index 1e944e7..5119de8 100644
--- a/robusta_krr/core/models/result.py
+++ b/robusta_krr/core/models/result.py
@@ -60,13 +60,17 @@ class ResourceRecommendation(pd.BaseModel):
limits: dict[ResourceType, RecommendationValue]
+MetricsData = dict[ResourceType, str]
+
+
class ResourceScan(pd.BaseModel):
object: K8sObjectData
recommended: ResourceRecommendation
severity: Severity
+ metrics: MetricsData
@classmethod
- def calculate(cls, object: K8sObjectData, recommendation: ResourceAllocations) -> ResourceScan:
+ def calculate(cls, object: K8sObjectData, recommendation: ResourceAllocations, metrics: MetricsData) -> ResourceScan:
recommendation_processed = ResourceRecommendation(requests={}, limits={})
for resource_type in ResourceType:
@@ -84,9 +88,9 @@ class ResourceScan(pd.BaseModel):
for selector in ["requests", "limits"]:
for recommendation_request in getattr(recommendation_processed, selector).values():
if recommendation_request.severity == severity:
- return cls(object=object, recommended=recommendation_processed, severity=severity)
+ return cls(object=object, recommended=recommendation_processed, severity=severity, metrics=metrics)
- return cls(object=object, recommended=recommendation_processed, severity=Severity.UNKNOWN)
+ return cls(object=object, recommended=recommendation_processed, severity=Severity.UNKNOWN, metrics=metrics)
class Result(pd.BaseModel):
diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py
index b0779c2..665dd0d 100644
--- a/robusta_krr/core/runner.py
+++ b/robusta_krr/core/runner.py
@@ -7,7 +7,7 @@ from robusta_krr.core.integrations.kubernetes import KubernetesLoader
from robusta_krr.core.integrations.prometheus import PrometheusLoader, PrometheusNotFound
from robusta_krr.core.models.config import Config
from robusta_krr.core.models.objects import K8sObjectData
-from robusta_krr.core.models.result import ResourceAllocations, ResourceScan, ResourceType, Result
+from robusta_krr.core.models.result import ResourceAllocations, ResourceScan, ResourceType, Result, MetricsData
from robusta_krr.utils.configurable import Configurable
from robusta_krr.utils.logo import ASCII_LOGO
from robusta_krr.utils.version import get_version
@@ -91,11 +91,12 @@ class Runner(Configurable):
for resource, recommendation in result.items()
}
- async def _calculate_object_recommendations(self, object: K8sObjectData) -> RunResult:
+ async def _calculate_object_recommendations(self, object: K8sObjectData) -> tuple[RunResult, MetricsData]:
prometheus_loader = self._get_prometheus_loader(object.cluster)
if prometheus_loader is None:
- return {resource: ResourceRecommendation.undefined() for resource in ResourceType}
+ return {resource: ResourceRecommendation.undefined() for resource in ResourceType}, {}
+
data_tuple = await asyncio.gather(
*[
prometheus_loader.gather_data(
@@ -108,23 +109,28 @@ class Runner(Configurable):
]
)
data = dict(zip(ResourceType, data_tuple))
+ queries = {resource: data[resource].query for resource in ResourceType}
+
self.__progressbar.progress()
+
# NOTE: We run this in a threadpool as the strategy calculation might be CPU intensive
# But keep in mind that numpy calcluations will not block the GIL
result = await asyncio.to_thread(self._strategy.run, data, object)
- return self._format_result(result)
+ return self._format_result(result), queries
- async def _gather_objects_recommendations(self, objects: list[K8sObjectData]) -> list[ResourceAllocations]:
- recommendations: list[RunResult] = await asyncio.gather(
+ async def _gather_objects_recommendations(self, objects: list[K8sObjectData]) -> list[tuple[ResourceAllocations, MetricsData]]:
+ recommendations: list[tuple[RunResult, MetricsData]] = await asyncio.gather(
*[self._calculate_object_recommendations(object) for object in objects]
)
return [
- ResourceAllocations(
- requests={resource: recommendation[resource].request for resource in ResourceType},
- limits={resource: recommendation[resource].limit for resource in ResourceType},
+ (
+ ResourceAllocations(
+ requests={resource: recommendation[resource].request for resource in ResourceType},
+ limits={resource: recommendation[resource].limit for resource in ResourceType},
+ ), metric
)
- for recommendation in recommendations
+ for recommendation, metric in recommendations
]
async def _collect_result(self) -> Result:
@@ -145,7 +151,7 @@ class Runner(Configurable):
return Result(
scans=[
- ResourceScan.calculate(obj, recommended) for obj, recommended in zip(objects, resource_recommendations)
+ ResourceScan.calculate(obj, recommended, metrics) for obj, (recommended, metrics) in zip(objects, resource_recommendations)
],
description=self._strategy.description,
)
diff --git a/robusta_krr/strategies/simple.py b/robusta_krr/strategies/simple.py
index c360a69..fa946da 100644
--- a/robusta_krr/strategies/simple.py
+++ b/robusta_krr/strategies/simple.py
@@ -51,8 +51,8 @@ class SimpleStrategy(BaseStrategy[SimpleStrategySettings]):
__rich_console__ = True
def run(self, history_data: HistoryData, object_data: K8sObjectData) -> RunResult:
- cpu_usage = self.settings.calculate_cpu_proposal(history_data[ResourceType.CPU])
- memory_usage = self.settings.calculate_memory_proposal(history_data[ResourceType.Memory])
+ cpu_usage = self.settings.calculate_cpu_proposal(history_data[ResourceType.CPU].data)
+ memory_usage = self.settings.calculate_memory_proposal(history_data[ResourceType.Memory].data)
return {
ResourceType.CPU: ResourceRecommendation(request=cpu_usage, limit=None),