summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAvi-Robusta <97387909+Avi-Robusta@users.noreply.github.com>2023-07-04 17:32:43 +0300
committerGitHub <noreply@github.com>2023-07-04 17:32:43 +0300
commit682c2e5f45930f94fc0f2485d62fe536c61417c2 (patch)
treef01d83abd395046719a4a6f97b713fbdd10e7b4f
parent26f9f705d56689b59556c39b3bb1683246a3f44c (diff)
parent14c4ae526a7b8767f94a9dcfd64312dc865113a5 (diff)
Merge pull request #92 from robusta-dev/mem_test_v2
Fixed prometheus query memory issue
-rw-r--r--robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py7
-rw-r--r--robusta_krr/core/integrations/prometheus/metrics/base_metric.py101
-rw-r--r--robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py8
-rw-r--r--robusta_krr/core/integrations/prometheus/metrics/memory_metric.py37
-rw-r--r--robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py8
-rw-r--r--robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py9
-rw-r--r--robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py9
-rw-r--r--robusta_krr/core/runner.py2
8 files changed, 147 insertions, 34 deletions
diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py
index f6a535e..c7ba3e7 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py
@@ -1,8 +1,7 @@
from typing import Any, Optional
-
from robusta_krr.core.abstract.strategies import Metric
-from .base_metric import BaseMetricLoader
+from .base_metric import BaseMetricLoader, QueryType
PrometheusSeries = Any
@@ -57,6 +56,6 @@ class BaseFilteredMetricLoader(BaseMetricLoader):
return_list.append(sorted_relevant_series[0])
return return_list
- async def query_prometheus(self, metric: Metric) -> list[PrometheusSeries]:
- result = await super().query_prometheus(metric)
+ async def query_prometheus(self, metric: Metric, query_type: QueryType) -> list[PrometheusSeries]:
+ result = await super().query_prometheus(metric, query_type)
return self.filter_prom_jobs_results(result)
diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py
index b3230e8..8bf263c 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py
@@ -5,9 +5,8 @@ import asyncio
from concurrent.futures import ThreadPoolExecutor
import datetime
from typing import TYPE_CHECKING, Callable, Optional, TypeVar
-
+import enum
import numpy as np
-
from robusta_krr.core.abstract.strategies import Metric, ResourceHistoryData
from robusta_krr.core.models.config import Config
from robusta_krr.core.models.objects import K8sObjectData
@@ -16,8 +15,17 @@ from robusta_krr.utils.configurable import Configurable
if TYPE_CHECKING:
from .. import CustomPrometheusConnect
+ MetricsDictionary = dict[str, type[BaseMetricLoader]]
+
+
+class QueryType(str, enum.Enum):
+ Query = "query"
+ QueryRange = "query_range"
+
+
# A registry of metrics that can be used to fetch a corresponding metric loader.
-REGISTERED_METRICS: dict[str, type[BaseMetricLoader]] = {}
+REGISTERED_METRICS: MetricsDictionary = {}
+STRATEGY_METRICS_OVERRIDES: dict[str, MetricsDictionary] = {}
class BaseMetricLoader(Configurable, abc.ABC):
@@ -50,12 +58,13 @@ class BaseMetricLoader(Configurable, abc.ABC):
return f', {self.config.prometheus_label}="{self.config.prometheus_cluster_label}"'
@abc.abstractmethod
- def get_query(self, object: K8sObjectData) -> str:
+ def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
"""
This method should be implemented by all subclasses to provide a query string to fetch metrics.
Args:
object (K8sObjectData): The object for which metrics need to be fetched.
+ resolution (Optional[str]): a string for configurable resolution to the query.
Returns:
str: The query string.
@@ -63,6 +72,19 @@ class BaseMetricLoader(Configurable, abc.ABC):
pass
+ def get_graph_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
+ """
+ This method should be implemented by all subclasses to provide a query string in the metadata to produce relevant graphs.
+
+ Args:
+ object (K8sObjectData): The object for which metrics need to be fetched.
+ resolution (Optional[str]): a string for configurable resolution to the query.
+
+ Returns:
+ str: The query string.
+ """
+ return self.get_query(object, resolution)
+
def _step_to_string(self, step: datetime.timedelta) -> str:
"""
Converts step in datetime.timedelta format to a string format used by Prometheus.
@@ -73,10 +95,28 @@ class BaseMetricLoader(Configurable, abc.ABC):
Returns:
str: Step size in string format used by Prometheus.
"""
-
+ if step.total_seconds() > 60 * 60 * 24:
+ return f"{int(step.total_seconds()) // (60 * 60 * 24)}d"
return f"{int(step.total_seconds()) // 60}m"
- async def query_prometheus(self, metric: Metric) -> list[dict]:
+ def query_prometheus_thread(self, metric: Metric, query_type: QueryType) -> list[dict]:
+ if query_type == QueryType.QueryRange:
+ value = self.prometheus.custom_query_range(
+ query=metric.query,
+ start_time=metric.start_time,
+ end_time=metric.end_time,
+ step=metric.step,
+ )
+ return value
+
+ # regular query, lighter on preformance
+ results = self.prometheus.custom_query(query=metric.query)
+ # format the results to return the same format as custom_query_range
+ for result in results:
+ result["values"] = [result.pop("value")]
+ return results
+
+ async def query_prometheus(self, metric: Metric, query_type: QueryType) -> list[dict]:
"""
Asynchronous method that queries Prometheus to fetch metrics.
@@ -90,12 +130,7 @@ class BaseMetricLoader(Configurable, abc.ABC):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
self.executor,
- lambda: self.prometheus.custom_query_range(
- query=metric.query,
- start_time=metric.start_time,
- end_time=metric.end_time,
- step=metric.step,
- ),
+ lambda: self.query_prometheus_thread(metric=metric, query_type=query_type),
)
async def load_data(
@@ -112,8 +147,9 @@ class BaseMetricLoader(Configurable, abc.ABC):
Returns:
ResourceHistoryData: An instance of the ResourceHistoryData class representing the loaded metrics.
"""
-
- query = self.get_query(object)
+ resolution = f"{self._step_to_string(period)}:{self._step_to_string(step)}"
+ query = self.get_query(object, resolution)
+ query_type = self.get_query_type()
end_time = datetime.datetime.now().astimezone()
metric = Metric(
query=query,
@@ -121,7 +157,9 @@ class BaseMetricLoader(Configurable, abc.ABC):
end_time=end_time,
step=self._step_to_string(step),
)
- result = await self.query_prometheus(metric)
+ result = await self.query_prometheus(metric=metric, query_type=query_type)
+ # adding the query in the results for a graph
+ metric.query = self.get_graph_query(object, resolution)
if result == []:
self.warning(f"{service_name} returned no {self.__class__.__name__} metrics for {object}")
@@ -135,12 +173,13 @@ class BaseMetricLoader(Configurable, abc.ABC):
)
@staticmethod
- def get_by_resource(resource: str) -> type[BaseMetricLoader]:
+ def get_by_resource(resource: str, strategy: Optional[str]) -> type[BaseMetricLoader]:
"""
Fetches the metric loader corresponding to the specified resource.
Args:
resource (str): The name of the resource.
+ resource (str): The name of the strategy.
Returns:
type[BaseMetricLoader]: The class of the metric loader corresponding to the resource.
@@ -150,6 +189,13 @@ class BaseMetricLoader(Configurable, abc.ABC):
"""
try:
+ lower_strategy = strategy.lower()
+ if (
+ lower_strategy
+ and lower_strategy in STRATEGY_METRICS_OVERRIDES
+ and resource in STRATEGY_METRICS_OVERRIDES[lower_strategy]
+ ):
+ return STRATEGY_METRICS_OVERRIDES[lower_strategy][resource]
return REGISTERED_METRICS[resource]
except KeyError as e:
raise KeyError(f"Resource {resource} was not registered by `@bind_metric(...)`") from e
@@ -174,3 +220,26 @@ def bind_metric(resource: str) -> Callable[[type[Self]], type[Self]]:
return cls
return decorator
+
+
+# This is a temporary solutions, metric loaders will be moved to strategy in the future
+def override_metric(strategy: str, resource: str) -> Callable[[type[Self]], type[Self]]:
+ """
+ A decorator that overrides the bound metric on a specific strategy.
+
+ Args:
+ strategy (str): The name of the strategy for this metric.
+ resource (str): The name of the resource.
+
+ Returns:
+ Callable[[type[Self]], type[Self]]: The decorator that does the binding.
+ """
+
+ def decorator(cls: type[Self]) -> type[Self]:
+ lower_strategy = strategy.lower()
+ if lower_strategy not in STRATEGY_METRICS_OVERRIDES:
+ STRATEGY_METRICS_OVERRIDES[lower_strategy] = {}
+ STRATEGY_METRICS_OVERRIDES[lower_strategy][resource] = cls
+ return cls
+
+ return decorator
diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py
index e2d364b..bf27622 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py
@@ -1,13 +1,14 @@
+from typing import Optional
from robusta_krr.core.models.allocations import ResourceType
from robusta_krr.core.models.objects import K8sObjectData
from .base_filtered_metric import BaseFilteredMetricLoader
-from .base_metric import bind_metric
+from .base_metric import bind_metric, QueryType
@bind_metric(ResourceType.CPU)
class CPUMetricLoader(BaseFilteredMetricLoader):
- def get_query(self, object: K8sObjectData) -> str:
+ def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
return (
@@ -18,3 +19,6 @@ class CPUMetricLoader(BaseFilteredMetricLoader):
f"{cluster_label}"
"}[5m])) by (container, pod, job)"
)
+
+ def get_query_type(self) -> QueryType:
+ return QueryType.QueryRange
diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py
index 5942ec1..a8a4521 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py
@@ -1,13 +1,14 @@
+from typing import Optional
from robusta_krr.core.models.allocations import ResourceType
from robusta_krr.core.models.objects import K8sObjectData
from .base_filtered_metric import BaseFilteredMetricLoader
-from .base_metric import bind_metric
+from .base_metric import bind_metric, QueryType, override_metric
@bind_metric(ResourceType.Memory)
class MemoryMetricLoader(BaseFilteredMetricLoader):
- def get_query(self, object: K8sObjectData) -> str:
+ def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
return (
@@ -16,5 +17,35 @@ class MemoryMetricLoader(BaseFilteredMetricLoader):
f'pod=~"{pods_selector}", '
f'container="{object.container}"'
f"{cluster_label}"
- "}) by (container, pod, job)"
+ "}) by (container, pod, job, id)"
)
+
+ def get_query_type(self) -> QueryType:
+ return QueryType.QueryRange
+
+# This is a temporary solutions, metric loaders will be moved to strategy in the future
+@override_metric("simple", ResourceType.Memory)
+class MemoryMetricLoader(MemoryMetricLoader):
+ """
+ A class that overrides the memory metric on the simple strategy.
+ """
+
+ def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
+ pods_selector = "|".join(pod.name for pod in object.pods)
+ cluster_label = self.get_prometheus_cluster_label()
+ resolution_formatted = f"[{resolution}]" if resolution else ""
+ return (
+ f"max(max_over_time(container_memory_working_set_bytes{{"
+ f'namespace="{object.namespace}", '
+ f'pod=~"{pods_selector}", '
+ f'container="{object.container}"'
+ f"{cluster_label}}}"
+ f"{resolution_formatted}"
+ f")) by (container, pod, job, id)"
+ )
+
+ def get_query_type(self) -> QueryType:
+ return QueryType.Query
+
+ def get_graph_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
+ return super().get_query(object, resolution)
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
index 896387f..31fe0c0 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
@@ -177,9 +177,9 @@ class PrometheusMetricsService(MetricsService):
"""
self.debug(f"Gathering data for {object} and {resource}")
+ MetricLoaderType = BaseMetricLoader.get_by_resource(resource, self.config.strategy)
await self.add_historic_pods(object, period)
-
- MetricLoaderType = BaseMetricLoader.get_by_resource(resource)
+
metric_loader = MetricLoaderType(self.config, self.prometheus, self.executor)
return await metric_loader.load_data(object, period, step, self.name())
@@ -202,7 +202,7 @@ class PrometheusMetricsService(MetricsService):
f'owner_name="{object.name}", '
f'owner_kind="Deployment", '
f'namespace="{object.namespace}"'
- f'{cluster_label}'
+ f"{cluster_label}"
"}"
f"[{period_literal}]"
)
@@ -218,7 +218,7 @@ class PrometheusMetricsService(MetricsService):
f'owner_name=~"{owners_regex}", '
f'owner_kind="{pod_owner_kind}", '
f'namespace="{object.namespace}"'
- f'{cluster_label}'
+ f"{cluster_label}"
"}"
f"[{period_literal}]"
)
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py
index eb11379..42066ae 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py
@@ -2,7 +2,7 @@ from typing import Optional
from kubernetes.client import ApiClient
from requests.exceptions import ConnectionError, HTTPError
-
+from concurrent.futures import ThreadPoolExecutor
from robusta_krr.core.models.config import Config
from robusta_krr.utils.service_discovery import ServiceDiscovery
@@ -48,9 +48,14 @@ class ThanosMetricsService(PrometheusMetricsService):
*,
cluster: Optional[str] = None,
api_client: Optional[ApiClient] = None,
+ executor: Optional[ThreadPoolExecutor] = None,
) -> None:
super().__init__(
- config=config, cluster=cluster, api_client=api_client, service_discovery=ThanosMetricsDiscovery
+ config=config,
+ cluster=cluster,
+ api_client=api_client,
+ service_discovery=ThanosMetricsDiscovery,
+ executor=executor,
)
def check_connection(self):
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py
index b3779a8..9c15cab 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py
@@ -1,5 +1,5 @@
from typing import Optional
-
+from concurrent.futures import ThreadPoolExecutor
from kubernetes.client import ApiClient
from requests.exceptions import ConnectionError, HTTPError
@@ -47,9 +47,14 @@ class VictoriaMetricsService(PrometheusMetricsService):
*,
cluster: Optional[str] = None,
api_client: Optional[ApiClient] = None,
+ executor: Optional[ThreadPoolExecutor] = None,
) -> None:
super().__init__(
- config=config, cluster=cluster, api_client=api_client, service_discovery=VictoriaMetricsDiscovery
+ config=config,
+ cluster=cluster,
+ api_client=api_client,
+ service_discovery=VictoriaMetricsDiscovery,
+ executor=executor,
)
def check_connection(self):
diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py
index 890e19d..3cdf7b9 100644
--- a/robusta_krr/core/runner.py
+++ b/robusta_krr/core/runner.py
@@ -154,7 +154,7 @@ class Runner(Configurable):
async def _collect_result(self) -> Result:
clusters = await self._k8s_loader.list_clusters()
- if len(clusters) > 1 and self.config.prometheus_url:
+ if clusters and len(clusters) > 1 and self.config.prometheus_url:
# this can only happen for multi-cluster querying a single centeralized prometheus
# In this scenario we dont yet support determining which metrics belong to which cluster so the reccomendation can be incorrect
raise ClusterNotSpecifiedException(