From f87550128ddb48a8c2ff083f182784279fc12118 Mon Sep 17 00:00:00 2001 From: LeaveMyYard <33721692+LeaveMyYard@users.noreply.github.com> Date: Tue, 25 Jul 2023 11:10:30 +0300 Subject: Change the flow of asyncronous behaviour --- robusta_krr/core/integrations/kubernetes.py | 53 ++++++++++++---------- .../metrics_service/prometheus_metrics_service.py | 6 ++- robusta_krr/core/models/objects.py | 2 +- robusta_krr/core/runner.py | 46 +++++++++---------- robusta_krr/utils/configurable.py | 2 +- 5 files changed, 57 insertions(+), 52 deletions(-) diff --git a/robusta_krr/core/integrations/kubernetes.py b/robusta_krr/core/integrations/kubernetes.py index 7452e66..4d1a5a3 100644 --- a/robusta_krr/core/integrations/kubernetes.py +++ b/robusta_krr/core/integrations/kubernetes.py @@ -1,7 +1,7 @@ import asyncio -import itertools from concurrent.futures import ThreadPoolExecutor -from typing import Optional, Union +from typing import AsyncGenerator, Optional, Union +import aiostream from kubernetes import client, config # type: ignore from kubernetes.client import ApiException @@ -53,7 +53,7 @@ class ClusterLoader(Configurable): self.autoscaling_v1 = client.AutoscalingV1Api(api_client=self.api_client) self.autoscaling_v2 = client.AutoscalingV2Api(api_client=self.api_client) - async def list_scannable_objects(self) -> list[K8sObjectData]: + async def list_scannable_objects(self) -> AsyncGenerator[K8sObjectData, None]: """List all scannable objects. Returns: @@ -65,30 +65,27 @@ class ClusterLoader(Configurable): try: self.__hpa_list = await self.__list_hpa() - objects_tuple = await asyncio.gather( + tasks = [ self._list_deployments(), self._list_rollouts(), self._list_all_statefulsets(), self._list_all_daemon_set(), self._list_all_jobs(), - ) + ] + + for fut in asyncio.as_completed(tasks): + object_list = await fut + for object in object_list: + if self.config.namespaces == "*" and object.namespace == "kube-system": + continue + elif self.config.namespaces != "*" and object.namespace not in self.config.namespaces: + continue + yield object except Exception as e: self.error(f"Error trying to list pods in cluster {self.cluster}: {e}") self.debug_exception() - return [] - - objects = itertools.chain(*objects_tuple) - if self.config.namespaces == "*": - # NOTE: We are not scanning kube-system namespace by default - result = [obj for obj in objects if obj.namespace != "kube-system"] - else: - result = [obj for obj in objects if obj.namespace in self.config.namespaces] - - namespaces = {obj.namespace for obj in result} - self.info(f"Found {len(result)} objects across {len(namespaces)} namespaces in {self.cluster}") - - return result + return @staticmethod def _get_match_expression_filter(expression) -> str: @@ -389,13 +386,12 @@ class KubernetesLoader(Configurable): self.error(f"Could not load cluster {cluster} and will skip it: {e}") return None - async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8sObjectData]: + async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncGenerator[K8sObjectData, None]: """List all scannable objects. - Returns: - A list of scannable objects. + Yields: + Each scannable object as it is loaded. """ - if clusters is None: _cluster_loaders = [self._try_create_cluster_loader(None)] else: @@ -404,7 +400,14 @@ class KubernetesLoader(Configurable): cluster_loaders = [cl for cl in _cluster_loaders if cl is not None] if cluster_loaders == []: self.error("Could not load any cluster.") - return [] + return + + # https://stackoverflow.com/questions/55299564/join-multiple-async-generators-in-python + # This will merge all the streams from all the cluster loaders into a single stream + objects_combined = aiostream.stream.merge( + *[cluster_loader.list_scannable_objects() for cluster_loader in cluster_loaders] + ) - objects = await asyncio.gather(*[cluster_loader.list_scannable_objects() for cluster_loader in cluster_loaders]) - return list(itertools.chain(*objects)) + async with objects_combined.stream() as streamer: + async for object in streamer: + yield object diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py index b380abc..916f48b 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py @@ -123,7 +123,7 @@ class PrometheusMetricsService(MetricsService): cluster_label = self.config.prometheus_cluster_label cluster_names = self.get_cluster_names() - if len(cluster_names) <= 1: + if cluster_names is None or len(cluster_names) <= 1: # there is only one cluster of metrics in this prometheus return @@ -153,7 +153,7 @@ class PrometheusMetricsService(MetricsService): """ ResourceHistoryData: The gathered resource history data. """ - self.debug(f"Gathering data for {object} and {LoaderClass}") + self.debug(f"Gathering {LoaderClass.__name__} metric for {object}") metric_loader = LoaderClass(self.config, self.prometheus, self.name, self.executor) return await metric_loader.load_data(object, period, step) @@ -166,6 +166,8 @@ class PrometheusMetricsService(MetricsService): period (datetime.timedelta): The time period for which to gather data. """ + self.debug(f"Adding historic pods for {object}") + days_literal = min(int(period.total_seconds()) // 60 // 24, 32) period_literal = f"{days_literal}d" pod_owners: list[str] diff --git a/robusta_krr/core/models/objects.py b/robusta_krr/core/models/objects.py index 52c4fbd..ede4be8 100644 --- a/robusta_krr/core/models/objects.py +++ b/robusta_krr/core/models/objects.py @@ -27,7 +27,7 @@ class K8sObjectData(pd.BaseModel): cluster: Optional[str] name: str container: str - pods: list[PodData] + pods: list[PodData] = [] hpa: Optional[HPAData] namespace: str kind: str diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index d3d1f9b..53b7aa5 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -120,7 +120,9 @@ class Runner(Configurable): step=self._strategy.settings.timeframe_timedelta, ) - self.__progressbar.progress() + # self.__progressbar.progress() + + self.debug(f"Calculating recommendations for {object} with {len(metrics)} metrics") # NOTE: We run this in a threadpool as the strategy calculation might be CPU intensive # But keep in mind that numpy calcluations will not block the GIL @@ -128,21 +130,17 @@ class Runner(Configurable): result = await loop.run_in_executor(self._executor, self._strategy.run, metrics, object) return self._format_result(result) - async def _gather_objects_recommendations(self, objects: list[K8sObjectData]) -> list[ResourceAllocations]: - recommendations: list[RunResult] = await asyncio.gather( - *[self._calculate_object_recommendations(object) for object in objects] - ) + async def _gather_object_allocations(self, k8s_object: K8sObjectData) -> ResourceScan: + recommendation = await self._calculate_object_recommendations(k8s_object) - return [ - ( - ResourceAllocations( - requests={resource: recommendation[resource].request for resource in ResourceType}, - limits={resource: recommendation[resource].limit for resource in ResourceType}, - info={resource: recommendation[resource].info for resource in ResourceType}, - ) - ) - for recommendation in recommendations - ] + return ResourceScan.calculate( + k8s_object, + ResourceAllocations( + requests={resource: recommendation[resource].request for resource in ResourceType}, + limits={resource: recommendation[resource].limit for resource in ResourceType}, + info={resource: recommendation[resource].info for resource in ResourceType}, + ), + ) async def _collect_result(self) -> Result: clusters = await self._k8s_loader.list_clusters() @@ -154,9 +152,16 @@ class Runner(Configurable): ) self.info(f'Using clusters: {clusters if clusters is not None else "inner cluster"}') - objects = await self._k8s_loader.list_scannable_objects(clusters) - if len(objects) == 0: + # with ProgressBar(self.config, total=len(objects), title="Calculating Recommendation") as self.__progressbar: + scans_tasks = [ + asyncio.create_task(self._gather_object_allocations(k8s_object)) + async for k8s_object in self._k8s_loader.list_scannable_objects(clusters) + ] + + scans = await asyncio.gather(*scans_tasks) + + if len(scans) == 0: self.warning("Current filters resulted in no objects available to scan.") self.warning("Try to change the filters or check if there is anything available.") if self.config.namespaces == "*": @@ -166,13 +171,8 @@ class Runner(Configurable): strategy=StrategyData(name=str(self._strategy).lower(), settings=self._strategy.settings.dict()), ) - with ProgressBar(self.config, total=len(objects), title="Calculating Recommendation") as self.__progressbar: - resource_recommendations = await self._gather_objects_recommendations(objects) - return Result( - scans=[ - ResourceScan.calculate(obj, recommended) for obj, recommended in zip(objects, resource_recommendations) - ], + scans=scans, description=self._strategy.description, strategy=StrategyData( name=str(self._strategy).lower(), diff --git a/robusta_krr/utils/configurable.py b/robusta_krr/utils/configurable.py index 54e71ff..4fbd804 100644 --- a/robusta_krr/utils/configurable.py +++ b/robusta_krr/utils/configurable.py @@ -65,7 +65,7 @@ class Configurable(abc.ABC): caller = getframeinfo(stack()[1][0]) self.console.print( self.__add_prefix( - message + f"\t\t({caller.filename}:{caller.lineno})", + message, "[bold green][DEBUG][/bold green]", no_prefix=False, ) -- cgit v1.2.3 From b21f25242016957fac7f4a1fbd917846c4e324ab Mon Sep 17 00:00:00 2001 From: LeaveMyYard <33721692+LeaveMyYard@users.noreply.github.com> Date: Tue, 25 Jul 2023 11:57:30 +0300 Subject: Remove k8s API pod loading --- robusta_krr/core/integrations/kubernetes.py | 131 +++++++++------------ robusta_krr/core/integrations/prometheus/loader.py | 2 +- .../metrics_service/prometheus_metrics_service.py | 13 +- 3 files changed, 64 insertions(+), 82 deletions(-) diff --git a/robusta_krr/core/integrations/kubernetes.py b/robusta_krr/core/integrations/kubernetes.py index 4d1a5a3..d8f0978 100644 --- a/robusta_krr/core/integrations/kubernetes.py +++ b/robusta_krr/core/integrations/kubernetes.py @@ -63,29 +63,31 @@ class ClusterLoader(Configurable): self.info(f"Listing scannable objects in {self.cluster}") self.debug(f"Namespaces: {self.config.namespaces}") - try: - self.__hpa_list = await self.__list_hpa() - tasks = [ - self._list_deployments(), - self._list_rollouts(), - self._list_all_statefulsets(), - self._list_all_daemon_set(), - self._list_all_jobs(), - ] - - for fut in asyncio.as_completed(tasks): + self.__hpa_list = await self._try_list_hpa() + + tasks = [ + self._list_deployments(), + self._list_rollouts(), + self._list_all_statefulsets(), + self._list_all_daemon_set(), + self._list_all_jobs(), + ] + + for fut in asyncio.as_completed(tasks): + try: object_list = await fut - for object in object_list: - if self.config.namespaces == "*" and object.namespace == "kube-system": - continue - elif self.config.namespaces != "*" and object.namespace not in self.config.namespaces: - continue - yield object - - except Exception as e: - self.error(f"Error trying to list pods in cluster {self.cluster}: {e}") - self.debug_exception() - return + except Exception as e: + self.error(f"Error {e.__class__.__name__} listing objects in cluster {self.cluster}: {e}") + self.debug_exception() + self.error("Will skip this object type and continue.") + continue + + for object in object_list: + if self.config.namespaces == "*" and object.namespace == "kube-system": + continue + elif self.config.namespaces != "*" and object.namespace not in self.config.namespaces: + continue + yield object @staticmethod def _get_match_expression_filter(expression) -> str: @@ -108,22 +110,12 @@ class ClusterLoader(Configurable): return ",".join(label_filters) - async def __list_pods(self, resource: Union[V1Deployment, V1DaemonSet, V1StatefulSet]) -> list[PodData]: - selector = self._build_selector_query(resource.spec.selector) - if selector is None: - return [] - - loop = asyncio.get_running_loop() - ret: V1PodList = await loop.run_in_executor( - self.executor, - lambda: self.core.list_namespaced_pod(namespace=resource.metadata.namespace, label_selector=selector), - ) - return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items] - - async def __build_obj(self, item: AnyKubernetesAPIObject, container: V1Container) -> K8sObjectData: + def __build_obj( + self, item: AnyKubernetesAPIObject, container: V1Container, kind: Optional[str] = None + ) -> K8sObjectData: name = item.metadata.name namespace = item.metadata.namespace - kind = item.__class__.__name__[2:] + kind = kind or item.__class__.__name__[2:] return K8sObjectData( cluster=self.cluster, @@ -132,7 +124,6 @@ class ClusterLoader(Configurable): kind=kind, container=container.name, allocations=ResourceAllocations.from_container(container), - pods=await self.__list_pods(item), hpa=self.__hpa_list.get((namespace, kind, name)), ) @@ -148,13 +139,9 @@ class ClusterLoader(Configurable): ) self.debug(f"Found {len(ret.items)} deployments in {self.cluster}") - return await asyncio.gather( - *[ - self.__build_obj(item, container) - for item in ret.items - for container in item.spec.template.spec.containers - ] - ) + return [ + self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers + ] async def _list_rollouts(self) -> list[K8sObjectData]: self.debug(f"Listing ArgoCD rollouts in {self.cluster}") @@ -175,13 +162,9 @@ class ClusterLoader(Configurable): self.debug(f"Found {len(ret.items)} rollouts in {self.cluster}") - return await asyncio.gather( - *[ - self.__build_obj(item, container) - for item in ret.items - for container in item.spec.template.spec.containers - ] - ) + return [ + self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers + ] async def _list_all_statefulsets(self) -> list[K8sObjectData]: self.debug(f"Listing statefulsets in {self.cluster}") @@ -195,13 +178,9 @@ class ClusterLoader(Configurable): ) self.debug(f"Found {len(ret.items)} statefulsets in {self.cluster}") - return await asyncio.gather( - *[ - self.__build_obj(item, container) - for item in ret.items - for container in item.spec.template.spec.containers - ] - ) + return [ + self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers + ] async def _list_all_daemon_set(self) -> list[K8sObjectData]: self.debug(f"Listing daemonsets in {self.cluster}") @@ -215,13 +194,9 @@ class ClusterLoader(Configurable): ) self.debug(f"Found {len(ret.items)} daemonsets in {self.cluster}") - return await asyncio.gather( - *[ - self.__build_obj(item, container) - for item in ret.items - for container in item.spec.template.spec.containers - ] - ) + return [ + self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers + ] async def _list_all_jobs(self) -> list[K8sObjectData]: self.debug(f"Listing jobs in {self.cluster}") @@ -235,13 +210,9 @@ class ClusterLoader(Configurable): ) self.debug(f"Found {len(ret.items)} jobs in {self.cluster}") - return await asyncio.gather( - *[ - self.__build_obj(item, container) - for item in ret.items - for container in item.spec.template.spec.containers - ] - ) + return [ + self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers + ] async def _list_pods(self) -> list[K8sObjectData]: """For future use, not supported yet.""" @@ -257,9 +228,7 @@ class ClusterLoader(Configurable): ) self.debug(f"Found {len(ret.items)} pods in {self.cluster}") - return await asyncio.gather( - *[self.__build_obj(item, container) for item in ret.items for container in item.spec.containers] - ) + return [self.__build_obj(item, container) for item in ret.items for container in item.spec.containers] async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]: loop = asyncio.get_running_loop() @@ -337,6 +306,18 @@ class ClusterLoader(Configurable): # If V2 API does not exist, fall back to V1 return await self.__list_hpa_v1() + async def _try_list_hpa(self) -> dict[HPAKey, HPAData]: + try: + return await self.__list_hpa() + except Exception as e: + self.error(f"Error trying to list hpa in cluster {self.cluster}: {e}") + self.debug_exception() + self.error( + "Will assume that there are no HPA. " + "Be careful as this may lead to inaccurate results if object actually has HPA." + ) + return {} + class KubernetesLoader(Configurable): async def list_clusters(self) -> Optional[list[str]]: diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py index 0daf3a6..703b96f 100644 --- a/robusta_krr/core/integrations/prometheus/loader.py +++ b/robusta_krr/core/integrations/prometheus/loader.py @@ -97,7 +97,7 @@ class PrometheusMetricsLoader(Configurable): ResourceHistoryData: The gathered resource history data. """ - await self.loader.add_historic_pods(object, period) + await self.loader.load_pods(object, period) return { MetricLoader: await self.loader.gather_data(object, MetricLoader, period, step) diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py index 916f48b..bbf92ba 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py @@ -158,9 +158,9 @@ class PrometheusMetricsService(MetricsService): metric_loader = LoaderClass(self.config, self.prometheus, self.name, self.executor) return await metric_loader.load_data(object, period, step) - async def add_historic_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None: + async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None: """ - Finds pods that have been deleted but still have some metrics in Prometheus. + List pods related to the object and add them to the object's pods list. Args: object (K8sObjectData): The Kubernetes object. period (datetime.timedelta): The time period for which to gather data. @@ -200,10 +200,11 @@ class PrometheusMetricsService(MetricsService): f"[{period_literal}]" ) - current_pods = {p.name for p in object.pods} + if related_pods == []: + self.debug(f"No pods found for {object}") + return + last_timestamp: float = max([pod["values"][-1][0] for pod in related_pods]) object.pods += [ - PodData(name=pod["metric"]["pod"], deleted=True) - for pod in related_pods - if pod["metric"]["pod"] not in current_pods + PodData(name=pod["metric"]["pod"], deleted=pod["values"][-1][0] != last_timestamp) for pod in related_pods ] -- cgit v1.2.3 From f4bb8885a2f9724f667712f581e20d66e7e2e307 Mon Sep 17 00:00:00 2001 From: LeaveMyYard <33721692+LeaveMyYard@users.noreply.github.com> Date: Tue, 25 Jul 2023 12:14:23 +0300 Subject: Fix progressbar --- robusta_krr/core/integrations/kubernetes.py | 2 +- robusta_krr/core/runner.py | 16 ++++++++-------- robusta_krr/utils/progress_bar.py | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/robusta_krr/core/integrations/kubernetes.py b/robusta_krr/core/integrations/kubernetes.py index d8f0978..74a3856 100644 --- a/robusta_krr/core/integrations/kubernetes.py +++ b/robusta_krr/core/integrations/kubernetes.py @@ -23,7 +23,7 @@ from kubernetes.client.models import ( V2HorizontalPodAutoscalerList, ) -from robusta_krr.core.models.objects import HPAData, K8sObjectData, PodData +from robusta_krr.core.models.objects import HPAData, K8sObjectData from robusta_krr.core.models.result import ResourceAllocations from robusta_krr.utils.configurable import Configurable diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index 53b7aa5..3d8907f 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -120,8 +120,6 @@ class Runner(Configurable): step=self._strategy.settings.timeframe_timedelta, ) - # self.__progressbar.progress() - self.debug(f"Calculating recommendations for {object} with {len(metrics)} metrics") # NOTE: We run this in a threadpool as the strategy calculation might be CPU intensive @@ -133,6 +131,8 @@ class Runner(Configurable): async def _gather_object_allocations(self, k8s_object: K8sObjectData) -> ResourceScan: recommendation = await self._calculate_object_recommendations(k8s_object) + self.__progressbar.progress() + return ResourceScan.calculate( k8s_object, ResourceAllocations( @@ -153,13 +153,13 @@ class Runner(Configurable): self.info(f'Using clusters: {clusters if clusters is not None else "inner cluster"}') - # with ProgressBar(self.config, total=len(objects), title="Calculating Recommendation") as self.__progressbar: - scans_tasks = [ - asyncio.create_task(self._gather_object_allocations(k8s_object)) - async for k8s_object in self._k8s_loader.list_scannable_objects(clusters) - ] + with ProgressBar(self.config, title="Calculating Recommendation") as self.__progressbar: + scans_tasks = [ + asyncio.create_task(self._gather_object_allocations(k8s_object)) + async for k8s_object in self._k8s_loader.list_scannable_objects(clusters) + ] - scans = await asyncio.gather(*scans_tasks) + scans = await asyncio.gather(*scans_tasks) if len(scans) == 0: self.warning("Current filters resulted in no objects available to scan.") diff --git a/robusta_krr/utils/progress_bar.py b/robusta_krr/utils/progress_bar.py index 621dc18..32bf211 100644 --- a/robusta_krr/utils/progress_bar.py +++ b/robusta_krr/utils/progress_bar.py @@ -14,7 +14,7 @@ class ProgressBar(Configurable): def __init__(self, config: Config, **kwargs) -> None: super().__init__(config) - self.show_bar = self.echo_active and False + self.show_bar = self.echo_active if self.show_bar: self.alive_bar = alive_bar(**kwargs) -- cgit v1.2.3 From 3c946f34995b2c98390f9c54f982edab7954646a Mon Sep 17 00:00:00 2001 From: LeaveMyYard <33721692+LeaveMyYard@users.noreply.github.com> Date: Wed, 26 Jul 2023 09:54:01 +0300 Subject: Fix pytest --- requirements.txt | 1 + tests/conftest.py | 24 ++++++++++++++++++------ 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/requirements.txt b/requirements.txt index b16a2ff..e05f215 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ about-time==4.2.1 ; python_version >= "3.9" and python_version < "3.12" +aiostream==0.4.5 ; python_version >= "3.9" and python_version < "3.12" alive-progress==3.1.2 ; python_version >= "3.9" and python_version < "3.12" cachetools==5.3.0 ; python_version >= "3.9" and python_version < "3.12" certifi==2022.12.7 ; python_version >= "3.9" and python_version < "3.12" diff --git a/tests/conftest.py b/tests/conftest.py index f39a30b..b6ac6f5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,11 +1,12 @@ import random from datetime import datetime, timedelta -from unittest.mock import AsyncMock, patch +from unittest.mock import AsyncMock, patch, MagicMock import numpy as np import pytest -from robusta_krr.api.models import K8sObjectData, PodData, ResourceAllocations, MetricPodData +from robusta_krr.api.models import K8sObjectData, PodData, ResourceAllocations +from robusta_krr.strategies import SimpleStrategy TEST_OBJECT = K8sObjectData( cluster="mock-cluster", @@ -25,6 +26,15 @@ TEST_OBJECT = K8sObjectData( ) +class AsyncIter: + def __init__(self, items): + self.items = items + + async def __aiter__(self): + for item in self.items: + yield item + + @pytest.fixture(autouse=True, scope="session") def mock_list_clusters(): with patch( @@ -38,7 +48,7 @@ def mock_list_clusters(): def mock_list_scannable_objects(): with patch( "robusta_krr.core.integrations.kubernetes.KubernetesLoader.list_scannable_objects", - new=AsyncMock(return_value=[TEST_OBJECT]), + new=MagicMock(return_value=AsyncIter([TEST_OBJECT])), ): yield @@ -57,9 +67,11 @@ def mock_prometheus_loader(): metric_points_data = np.array([(t, random.randrange(0, 100)) for t in np.linspace(start_ts, now_ts, 3600)]) with patch( - "robusta_krr.core.integrations.prometheus.loader.MetricsLoader.gather_data", + "robusta_krr.core.integrations.prometheus.loader.PrometheusMetricsLoader.gather_data", new=AsyncMock( - return_value={pod.name: metric_points_data for pod in TEST_OBJECT.pods}, + return_value={ + metric: {pod.name: metric_points_data for pod in TEST_OBJECT.pods} for metric in SimpleStrategy.metrics + }, ), ) as mock_prometheus_loader: mock_prometheus_loader @@ -68,5 +80,5 @@ def mock_prometheus_loader(): @pytest.fixture(autouse=True, scope="session") def mock_prometheus_init(): - with patch("robusta_krr.core.integrations.prometheus.loader.MetricsLoader.__init__", return_value=None): + with patch("robusta_krr.core.integrations.prometheus.loader.PrometheusMetricsLoader.__init__", return_value=None): yield -- cgit v1.2.3 From c5b610c47e6a5694c9b58847ca043815bd0fc1b4 Mon Sep 17 00:00:00 2001 From: LeaveMyYard <33721692+LeaveMyYard@users.noreply.github.com> Date: Thu, 27 Jul 2023 11:50:51 +0300 Subject: Optimize getting pods from prometheus --- .../metrics_service/prometheus_metrics_service.py | 40 +++++++++++++++++----- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py index bbf92ba..18e9b2a 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py @@ -1,5 +1,6 @@ import asyncio import datetime +import time from typing import List, Optional from concurrent.futures import ThreadPoolExecutor @@ -185,26 +186,47 @@ class PrometheusMetricsService(MetricsService): ) pod_owners = [replicaset["metric"]["replicaset"] for replicaset in replicasets] pod_owner_kind = "ReplicaSet" + + del replicasets else: pod_owners = [object.name] pod_owner_kind = object.kind owners_regex = "|".join(pod_owners) related_pods = await self.query( - "kube_pod_owner{" - f'owner_name=~"{owners_regex}", ' - f'owner_kind="{pod_owner_kind}", ' - f'namespace="{object.namespace}"' - f"{cluster_label}" - "}" - f"[{period_literal}]" + f""" + last_over_time( + kube_pod_owner{{ + owner_name=~"{owners_regex}", + owner_kind="{pod_owner_kind}", + namespace="{object.namespace}" + {cluster_label} + }}[{period_literal}] + ) + """ ) if related_pods == []: self.debug(f"No pods found for {object}") return - last_timestamp: float = max([pod["values"][-1][0] for pod in related_pods]) + current_pods = await self.query( + f""" + present_over_time( + kube_pod_owner{{ + owner_name=~"{owners_regex}", + owner_kind="{pod_owner_kind}", + namespace="{object.namespace}" + {cluster_label} + }}[1m] + ) + """ + ) + + current_pods_set = {pod["metric"]["pod"] for pod in current_pods} + del current_pods + object.pods += [ - PodData(name=pod["metric"]["pod"], deleted=pod["values"][-1][0] != last_timestamp) for pod in related_pods + PodData(name=pod["metric"]["pod"], deleted=pod["metric"]["pod"] not in current_pods_set) + for pod in related_pods ] -- cgit v1.2.3