summaryrefslogtreecommitdiff
path: root/robusta_krr
diff options
context:
space:
mode:
authorPavel Zhukov <33721692+LeaveMyYard@users.noreply.github.com>2023-05-09 12:40:17 +0300
committerGitHub <noreply@github.com>2023-05-09 12:40:17 +0300
commit060a955bd523f508b4c7b3a1caa1b0b6bd5e9652 (patch)
tree57cf6b4985c9590061d85b77fa48ece2fba926bc /robusta_krr
parent3acdf8a4e77abdb265c5dbf21d8a09ee1d26a659 (diff)
parent823bc27454d7658bcc73afd47ee8562d171d9336 (diff)
Merge pull request #12 from robusta-dev/fix-CPU-metric
Improve metrics to be available for more people
Diffstat (limited to 'robusta_krr')
-rw-r--r--robusta_krr/core/integrations/prometheus/__init__.py1
-rw-r--r--robusta_krr/core/integrations/prometheus/loader.py (renamed from robusta_krr/core/integrations/prometheus.py)47
-rw-r--r--robusta_krr/core/integrations/prometheus/metrics/__init__.py3
-rw-r--r--robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py61
-rw-r--r--robusta_krr/core/integrations/prometheus/metrics/base_metric.py82
-rw-r--r--robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py10
-rw-r--r--robusta_krr/core/integrations/prometheus/metrics/memory_metric.py10
-rw-r--r--robusta_krr/core/runner.py2
8 files changed, 174 insertions, 42 deletions
diff --git a/robusta_krr/core/integrations/prometheus/__init__.py b/robusta_krr/core/integrations/prometheus/__init__.py
new file mode 100644
index 0000000..e43e8aa
--- /dev/null
+++ b/robusta_krr/core/integrations/prometheus/__init__.py
@@ -0,0 +1 @@
+from .loader import CustomPrometheusConnect, PrometheusDiscovery, PrometheusLoader, PrometheusNotFound
diff --git a/robusta_krr/core/integrations/prometheus.py b/robusta_krr/core/integrations/prometheus/loader.py
index 39c7f00..c0c1961 100644
--- a/robusta_krr/core/integrations/prometheus.py
+++ b/robusta_krr/core/integrations/prometheus/loader.py
@@ -1,6 +1,4 @@
-import asyncio
import datetime
-from decimal import Decimal
from typing import Optional, no_type_check
import requests
@@ -17,6 +15,8 @@ from robusta_krr.core.models.result import ResourceType
from robusta_krr.utils.configurable import Configurable
from robusta_krr.utils.service_discovery import ServiceDiscovery
+from .metrics import BaseMetricLoader
+
class PrometheusDiscovery(ServiceDiscovery):
def find_prometheus_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]:
@@ -111,45 +111,10 @@ class PrometheusLoader(Configurable):
resource: ResourceType,
period: datetime.timedelta,
*,
- timeframe: datetime.timedelta = datetime.timedelta(minutes=30),
+ step: datetime.timedelta = datetime.timedelta(minutes=30),
) -> ResourceHistoryData:
self.debug(f"Gathering data for {object} and {resource}")
- if resource == ResourceType.CPU:
- result = await asyncio.gather(
- *[
- asyncio.to_thread(
- self.prometheus.custom_query_range,
- query=f'sum(node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{{namespace="{object.namespace}", pod="{pod}", container="{object.container}"}})',
- start_time=datetime.datetime.now() - period,
- end_time=datetime.datetime.now(),
- step=f"{int(timeframe.total_seconds()) // 60}m",
- )
- for pod in object.pods
- ]
- )
- elif resource == ResourceType.Memory:
- result = await asyncio.gather(
- *[
- asyncio.to_thread(
- self.prometheus.custom_query_range,
- query=f'sum(container_memory_working_set_bytes{{job="kubelet", metrics_path="/metrics/cadvisor", image!="", namespace="{object.namespace}", pod="{pod}", container="{object.container}"}})',
- start_time=datetime.datetime.now() - period,
- end_time=datetime.datetime.now(),
- step=f"{int(timeframe.total_seconds()) // 60}m",
- )
- for pod in object.pods
- ]
- )
- else:
- raise ValueError(f"Unknown resource type: {resource}")
-
- if result == []:
- return {pod: [] for pod in object.pods}
-
- pod_results = {pod: result[i] for i, pod in enumerate(object.pods)}
- return {
- pod: [Decimal(value) for _, value in pod_result[0]["values"]]
- for pod, pod_result in pod_results.items()
- if pod_result != []
- }
+ MetricLoaderType = BaseMetricLoader.get_by_resource(resource)
+ metric_loader = MetricLoaderType(self.config, self.prometheus)
+ return await metric_loader.load_data(object, period, step)
diff --git a/robusta_krr/core/integrations/prometheus/metrics/__init__.py b/robusta_krr/core/integrations/prometheus/metrics/__init__.py
new file mode 100644
index 0000000..0852b67
--- /dev/null
+++ b/robusta_krr/core/integrations/prometheus/metrics/__init__.py
@@ -0,0 +1,3 @@
+from .base_metric import BaseMetricLoader, bind_metric
+from .cpu_metric import CPUMetricLoader
+from .memory_metric import MemoryMetricLoader
diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py
new file mode 100644
index 0000000..80408c5
--- /dev/null
+++ b/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py
@@ -0,0 +1,61 @@
+import datetime
+from typing import Any, Optional
+
+from .base_metric import BaseMetricLoader
+
+PrometheusSeries = Any
+
+
+class BaseFilteredMetricLoader(BaseMetricLoader):
+ """
+ This is the version of the BasicMetricLoader, that filters out data,
+ if multiple metrics with the same name were found.
+
+ Searches for the kubelet metric. If not found - returns first one in alphabetical order.
+ """
+
+ @staticmethod
+ def get_target_name(series: PrometheusSeries) -> Optional[str]:
+ for label in ["container", "pod", "node"]:
+ if label in series["metric"]:
+ return series["metric"][label]
+ return None
+
+ @staticmethod
+ def filter_prom_jobs_results(
+ series_list_result: list[PrometheusSeries],
+ ) -> list[PrometheusSeries]:
+ """
+ Because there might be multiple metrics with the same name, we need to filter them out.
+
+ :param series_list_result: list of PrometheusSeries
+ """
+
+ if len(series_list_result) == 1:
+ return series_list_result
+
+ target_names = {
+ BaseFilteredMetricLoader.get_target_name(series)
+ for series in series_list_result
+ if BaseFilteredMetricLoader.get_target_name(series)
+ }
+ return_list: list[PrometheusSeries] = []
+
+ # takes kubelet job if exists, return first job alphabetically if it doesn't
+ for target_name in target_names:
+ relevant_series = [
+ series for series in series_list_result if BaseFilteredMetricLoader.get_target_name(series) == target_name
+ ]
+ relevant_kubelet_metric = [series for series in relevant_series if series["metric"].get("job") == "kubelet"]
+ if len(relevant_kubelet_metric) == 1:
+ return_list.append(relevant_kubelet_metric[0])
+ continue
+ sorted_relevant_series = sorted(relevant_series, key=lambda s: s["metric"].get("job"), reverse=False)
+ return_list.append(sorted_relevant_series[0])
+ return return_list
+
+ async def query_prometheus(
+ self, query: str, start_time: datetime.datetime, end_time: datetime.datetime, step: datetime.timedelta
+ ) -> list[PrometheusSeries]:
+ result = await super().query_prometheus(query, start_time, end_time, step)
+ return self.filter_prom_jobs_results(result)
diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py
new file mode 100644
index 0000000..407d3e3
--- /dev/null
+++ b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py
@@ -0,0 +1,82 @@
+from __future__ import annotations
+
+import abc
+import asyncio
+import datetime
+from decimal import Decimal
+from typing import TYPE_CHECKING, Callable, TypeVar
+
+from robusta_krr.core.abstract.strategies import ResourceHistoryData
+from robusta_krr.core.models.config import Config
+from robusta_krr.core.models.objects import K8sObjectData
+from robusta_krr.utils.configurable import Configurable
+
+if TYPE_CHECKING:
+ from ..loader import CustomPrometheusConnect
+
+REGISTERED_METRICS: dict[str, type[BaseMetricLoader]] = {}
+
+
+class BaseMetricLoader(Configurable, abc.ABC):
+ def __init__(self, config: Config, prometheus: CustomPrometheusConnect) -> None:
+ super().__init__(config)
+ self.prometheus = prometheus
+
+ @abc.abstractmethod
+ def get_query(self, namespace: str, pod: str, container: str) -> str:
+ ...
+
+ async def query_prometheus(
+ self, query: str, start_time: datetime.datetime, end_time: datetime.datetime, step: datetime.timedelta
+ ) -> list[dict]:
+ return await asyncio.to_thread(
+ self.prometheus.custom_query_range,
+ query=query,
+ start_time=start_time,
+ end_time=end_time,
+ step=f"{int(step.total_seconds()) // 60}m",
+ )
+
+ async def load_data(
+ self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta
+ ) -> ResourceHistoryData:
+ result = await asyncio.gather(
+ *[
+ self.query_prometheus(
+ query=self.get_query(object.namespace, pod, object.container),
+ start_time=datetime.datetime.now() - period,
+ end_time=datetime.datetime.now(),
+ step=step,
+ )
+ for pod in object.pods
+ ]
+ )
+
+ if result == []:
+ self.warning(f"Prometheus returned no {self.__class__.__name__} metrics for {object}")
+ return {pod: [] for pod in object.pods}
+
+ pod_results = {pod: result[i] for i, pod in enumerate(object.pods)}
+ return {
+ pod: [Decimal(value) for _, value in pod_result[0]["values"]]
+ for pod, pod_result in pod_results.items()
+ if pod_result != []
+ }
+
+ @staticmethod
+ def get_by_resource(resource: str) -> type[BaseMetricLoader]:
+ try:
+ return REGISTERED_METRICS[resource]
+ except KeyError as e:
+ raise KeyError(f"Resource {resource} was not registered by `@bind_metric(...)`") from e
+
+
+Self = TypeVar("Self", bound=BaseMetricLoader)
+
+
+def bind_metric(resource: str) -> Callable[[type[Self]], type[Self]]:
+ def decorator(cls: type[Self]) -> type[Self]:
+ REGISTERED_METRICS[resource] = cls
+ return cls
+
+ return decorator
diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py
new file mode 100644
index 0000000..ffbaa91
--- /dev/null
+++ b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py
@@ -0,0 +1,10 @@
+from robusta_krr.core.models.allocations import ResourceType
+
+from .base_metric import bind_metric
+from .base_filtered_metric import BaseFilteredMetricLoader
+
+
+@bind_metric(ResourceType.CPU)
+class CPUMetricLoader(BaseFilteredMetricLoader):
+ def get_query(self, namespace: str, pod: str, container: str) -> str:
+ return f'sum(irate(container_cpu_usage_seconds_total{{namespace="{namespace}", pod="{pod}", container="{container}"}}[5m])) by (container, pod, job)'
diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py
new file mode 100644
index 0000000..3355ed6
--- /dev/null
+++ b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py
@@ -0,0 +1,10 @@
+from robusta_krr.core.models.allocations import ResourceType
+
+from .base_metric import bind_metric
+from .base_filtered_metric import BaseFilteredMetricLoader
+
+
+@bind_metric(ResourceType.Memory)
+class MemoryMetricLoader(BaseFilteredMetricLoader):
+ def get_query(self, namespace: str, pod: str, container: str) -> str:
+ return f'sum(container_memory_working_set_bytes{{image!="", namespace="{namespace}", pod="{pod}", container="{container}"}}) by (container, pod, job)'
diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py
index 00dcbb9..f4d4d94 100644
--- a/robusta_krr/core/runner.py
+++ b/robusta_krr/core/runner.py
@@ -94,7 +94,7 @@ class Runner(Configurable):
object,
resource,
self._strategy.settings.history_timedelta,
- timeframe=self._strategy.settings.timeframe_timedelta,
+ step=self._strategy.settings.timeframe_timedelta,
)
for resource in ResourceType
]