summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavel Zhukov <33721692+LeaveMyYard@users.noreply.github.com>2023-08-02 14:49:58 +0300
committerGitHub <noreply@github.com>2023-08-02 14:49:58 +0300
commita46b32fbf9d9b3cf4755b2881f0bf886962dd78a (patch)
treebd007e415649c3cecde6b77b1e60c712e84483e3
parentd202105fdb60b20b37c34289566967a214c0e0ca (diff)
parent5debaa6dccd731c4df49c63062d1e371be51f1ad (diff)
Merge pull request #118 from robusta-dev/refactoring-and-optimization
Optimization on memory usage
-rw-r--r--requirements.txt1
-rw-r--r--robusta_krr/core/integrations/kubernetes.py164
-rw-r--r--robusta_krr/core/integrations/prometheus/loader.py2
-rw-r--r--robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py53
-rw-r--r--robusta_krr/core/models/objects.py2
-rw-r--r--robusta_krr/core/runner.py46
-rw-r--r--robusta_krr/utils/configurable.py2
-rw-r--r--tests/conftest.py13
8 files changed, 151 insertions, 132 deletions
diff --git a/requirements.txt b/requirements.txt
index 9c074a3..907cfed 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,5 @@
about-time==4.2.1 ; python_version >= "3.9" and python_version < "3.12"
+aiostream==0.4.5 ; python_version >= "3.9" and python_version < "3.12"
alive-progress==3.1.2 ; python_version >= "3.9" and python_version < "3.12"
cachetools==5.3.0 ; python_version >= "3.9" and python_version < "3.12"
certifi==2022.12.7 ; python_version >= "3.9" and python_version < "3.12"
diff --git a/robusta_krr/core/integrations/kubernetes.py b/robusta_krr/core/integrations/kubernetes.py
index 7452e66..74a3856 100644
--- a/robusta_krr/core/integrations/kubernetes.py
+++ b/robusta_krr/core/integrations/kubernetes.py
@@ -1,7 +1,7 @@
import asyncio
-import itertools
from concurrent.futures import ThreadPoolExecutor
-from typing import Optional, Union
+from typing import AsyncGenerator, Optional, Union
+import aiostream
from kubernetes import client, config # type: ignore
from kubernetes.client import ApiException
@@ -23,7 +23,7 @@ from kubernetes.client.models import (
V2HorizontalPodAutoscalerList,
)
-from robusta_krr.core.models.objects import HPAData, K8sObjectData, PodData
+from robusta_krr.core.models.objects import HPAData, K8sObjectData
from robusta_krr.core.models.result import ResourceAllocations
from robusta_krr.utils.configurable import Configurable
@@ -53,7 +53,7 @@ class ClusterLoader(Configurable):
self.autoscaling_v1 = client.AutoscalingV1Api(api_client=self.api_client)
self.autoscaling_v2 = client.AutoscalingV2Api(api_client=self.api_client)
- async def list_scannable_objects(self) -> list[K8sObjectData]:
+ async def list_scannable_objects(self) -> AsyncGenerator[K8sObjectData, None]:
"""List all scannable objects.
Returns:
@@ -63,32 +63,31 @@ class ClusterLoader(Configurable):
self.info(f"Listing scannable objects in {self.cluster}")
self.debug(f"Namespaces: {self.config.namespaces}")
- try:
- self.__hpa_list = await self.__list_hpa()
- objects_tuple = await asyncio.gather(
- self._list_deployments(),
- self._list_rollouts(),
- self._list_all_statefulsets(),
- self._list_all_daemon_set(),
- self._list_all_jobs(),
- )
-
- except Exception as e:
- self.error(f"Error trying to list pods in cluster {self.cluster}: {e}")
- self.debug_exception()
- return []
-
- objects = itertools.chain(*objects_tuple)
- if self.config.namespaces == "*":
- # NOTE: We are not scanning kube-system namespace by default
- result = [obj for obj in objects if obj.namespace != "kube-system"]
- else:
- result = [obj for obj in objects if obj.namespace in self.config.namespaces]
-
- namespaces = {obj.namespace for obj in result}
- self.info(f"Found {len(result)} objects across {len(namespaces)} namespaces in {self.cluster}")
-
- return result
+ self.__hpa_list = await self._try_list_hpa()
+
+ tasks = [
+ self._list_deployments(),
+ self._list_rollouts(),
+ self._list_all_statefulsets(),
+ self._list_all_daemon_set(),
+ self._list_all_jobs(),
+ ]
+
+ for fut in asyncio.as_completed(tasks):
+ try:
+ object_list = await fut
+ except Exception as e:
+ self.error(f"Error {e.__class__.__name__} listing objects in cluster {self.cluster}: {e}")
+ self.debug_exception()
+ self.error("Will skip this object type and continue.")
+ continue
+
+ for object in object_list:
+ if self.config.namespaces == "*" and object.namespace == "kube-system":
+ continue
+ elif self.config.namespaces != "*" and object.namespace not in self.config.namespaces:
+ continue
+ yield object
@staticmethod
def _get_match_expression_filter(expression) -> str:
@@ -111,22 +110,12 @@ class ClusterLoader(Configurable):
return ",".join(label_filters)
- async def __list_pods(self, resource: Union[V1Deployment, V1DaemonSet, V1StatefulSet]) -> list[PodData]:
- selector = self._build_selector_query(resource.spec.selector)
- if selector is None:
- return []
-
- loop = asyncio.get_running_loop()
- ret: V1PodList = await loop.run_in_executor(
- self.executor,
- lambda: self.core.list_namespaced_pod(namespace=resource.metadata.namespace, label_selector=selector),
- )
- return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items]
-
- async def __build_obj(self, item: AnyKubernetesAPIObject, container: V1Container) -> K8sObjectData:
+ def __build_obj(
+ self, item: AnyKubernetesAPIObject, container: V1Container, kind: Optional[str] = None
+ ) -> K8sObjectData:
name = item.metadata.name
namespace = item.metadata.namespace
- kind = item.__class__.__name__[2:]
+ kind = kind or item.__class__.__name__[2:]
return K8sObjectData(
cluster=self.cluster,
@@ -135,7 +124,6 @@ class ClusterLoader(Configurable):
kind=kind,
container=container.name,
allocations=ResourceAllocations.from_container(container),
- pods=await self.__list_pods(item),
hpa=self.__hpa_list.get((namespace, kind, name)),
)
@@ -151,13 +139,9 @@ class ClusterLoader(Configurable):
)
self.debug(f"Found {len(ret.items)} deployments in {self.cluster}")
- return await asyncio.gather(
- *[
- self.__build_obj(item, container)
- for item in ret.items
- for container in item.spec.template.spec.containers
- ]
- )
+ return [
+ self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
+ ]
async def _list_rollouts(self) -> list[K8sObjectData]:
self.debug(f"Listing ArgoCD rollouts in {self.cluster}")
@@ -178,13 +162,9 @@ class ClusterLoader(Configurable):
self.debug(f"Found {len(ret.items)} rollouts in {self.cluster}")
- return await asyncio.gather(
- *[
- self.__build_obj(item, container)
- for item in ret.items
- for container in item.spec.template.spec.containers
- ]
- )
+ return [
+ self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
+ ]
async def _list_all_statefulsets(self) -> list[K8sObjectData]:
self.debug(f"Listing statefulsets in {self.cluster}")
@@ -198,13 +178,9 @@ class ClusterLoader(Configurable):
)
self.debug(f"Found {len(ret.items)} statefulsets in {self.cluster}")
- return await asyncio.gather(
- *[
- self.__build_obj(item, container)
- for item in ret.items
- for container in item.spec.template.spec.containers
- ]
- )
+ return [
+ self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
+ ]
async def _list_all_daemon_set(self) -> list[K8sObjectData]:
self.debug(f"Listing daemonsets in {self.cluster}")
@@ -218,13 +194,9 @@ class ClusterLoader(Configurable):
)
self.debug(f"Found {len(ret.items)} daemonsets in {self.cluster}")
- return await asyncio.gather(
- *[
- self.__build_obj(item, container)
- for item in ret.items
- for container in item.spec.template.spec.containers
- ]
- )
+ return [
+ self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
+ ]
async def _list_all_jobs(self) -> list[K8sObjectData]:
self.debug(f"Listing jobs in {self.cluster}")
@@ -238,13 +210,9 @@ class ClusterLoader(Configurable):
)
self.debug(f"Found {len(ret.items)} jobs in {self.cluster}")
- return await asyncio.gather(
- *[
- self.__build_obj(item, container)
- for item in ret.items
- for container in item.spec.template.spec.containers
- ]
- )
+ return [
+ self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
+ ]
async def _list_pods(self) -> list[K8sObjectData]:
"""For future use, not supported yet."""
@@ -260,9 +228,7 @@ class ClusterLoader(Configurable):
)
self.debug(f"Found {len(ret.items)} pods in {self.cluster}")
- return await asyncio.gather(
- *[self.__build_obj(item, container) for item in ret.items for container in item.spec.containers]
- )
+ return [self.__build_obj(item, container) for item in ret.items for container in item.spec.containers]
async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]:
loop = asyncio.get_running_loop()
@@ -340,6 +306,18 @@ class ClusterLoader(Configurable):
# If V2 API does not exist, fall back to V1
return await self.__list_hpa_v1()
+ async def _try_list_hpa(self) -> dict[HPAKey, HPAData]:
+ try:
+ return await self.__list_hpa()
+ except Exception as e:
+ self.error(f"Error trying to list hpa in cluster {self.cluster}: {e}")
+ self.debug_exception()
+ self.error(
+ "Will assume that there are no HPA. "
+ "Be careful as this may lead to inaccurate results if object actually has HPA."
+ )
+ return {}
+
class KubernetesLoader(Configurable):
async def list_clusters(self) -> Optional[list[str]]:
@@ -389,13 +367,12 @@ class KubernetesLoader(Configurable):
self.error(f"Could not load cluster {cluster} and will skip it: {e}")
return None
- async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8sObjectData]:
+ async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncGenerator[K8sObjectData, None]:
"""List all scannable objects.
- Returns:
- A list of scannable objects.
+ Yields:
+ Each scannable object as it is loaded.
"""
-
if clusters is None:
_cluster_loaders = [self._try_create_cluster_loader(None)]
else:
@@ -404,7 +381,14 @@ class KubernetesLoader(Configurable):
cluster_loaders = [cl for cl in _cluster_loaders if cl is not None]
if cluster_loaders == []:
self.error("Could not load any cluster.")
- return []
+ return
+
+ # https://stackoverflow.com/questions/55299564/join-multiple-async-generators-in-python
+ # This will merge all the streams from all the cluster loaders into a single stream
+ objects_combined = aiostream.stream.merge(
+ *[cluster_loader.list_scannable_objects() for cluster_loader in cluster_loaders]
+ )
- objects = await asyncio.gather(*[cluster_loader.list_scannable_objects() for cluster_loader in cluster_loaders])
- return list(itertools.chain(*objects))
+ async with objects_combined.stream() as streamer:
+ async for object in streamer:
+ yield object
diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py
index a49c01c..ca0d6f1 100644
--- a/robusta_krr/core/integrations/prometheus/loader.py
+++ b/robusta_krr/core/integrations/prometheus/loader.py
@@ -97,7 +97,7 @@ class PrometheusMetricsLoader(Configurable):
ResourceHistoryData: The gathered resource history data.
"""
- await self.loader.add_historic_pods(object, period)
+ await self.loader.load_pods(object, period)
return {
MetricLoader.__name__: await self.loader.gather_data(object, MetricLoader, period, step)
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
index 02d3d6d..b2cc135 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
@@ -1,5 +1,6 @@
import asyncio
import datetime
+import time
from typing import List, Optional
from concurrent.futures import ThreadPoolExecutor
@@ -123,7 +124,7 @@ class PrometheusMetricsService(MetricsService):
cluster_label = self.config.prometheus_cluster_label
cluster_names = self.get_cluster_names()
- if len(cluster_names) <= 1:
+ if cluster_names is None or len(cluster_names) <= 1:
# there is only one cluster of metrics in this prometheus
return
@@ -153,19 +154,21 @@ class PrometheusMetricsService(MetricsService):
"""
ResourceHistoryData: The gathered resource history data.
"""
- self.debug(f"Gathering data for {object} and {LoaderClass}")
+ self.debug(f"Gathering {LoaderClass.__name__} metric for {object}")
metric_loader = LoaderClass(self.config, self.prometheus, self.name, self.executor)
return await metric_loader.load_data(object, period, step)
- async def add_historic_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None:
+ async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None:
"""
- Finds pods that have been deleted but still have some metrics in Prometheus.
+ List pods related to the object and add them to the object's pods list.
Args:
object (K8sObjectData): The Kubernetes object.
period (datetime.timedelta): The time period for which to gather data.
"""
+ self.debug(f"Adding historic pods for {object}")
+
days_literal = min(int(period.total_seconds()) // 60 // 24, 32)
period_literal = f"{days_literal}d"
pod_owners: list[str]
@@ -183,25 +186,47 @@ class PrometheusMetricsService(MetricsService):
)
pod_owners = [replicaset["metric"]["replicaset"] for replicaset in replicasets]
pod_owner_kind = "ReplicaSet"
+
+ del replicasets
else:
pod_owners = [object.name]
pod_owner_kind = object.kind
owners_regex = "|".join(pod_owners)
related_pods = await self.query(
- "kube_pod_owner{"
- f'owner_name=~"{owners_regex}", '
- f'owner_kind="{pod_owner_kind}", '
- f'namespace="{object.namespace}"'
- f"{cluster_label}"
- "}"
- f"[{period_literal}]"
+ f"""
+ last_over_time(
+ kube_pod_owner{{
+ owner_name=~"{owners_regex}",
+ owner_kind="{pod_owner_kind}",
+ namespace="{object.namespace}"
+ {cluster_label}
+ }}[{period_literal}]
+ )
+ """
+ )
+
+ if related_pods == []:
+ self.debug(f"No pods found for {object}")
+ return
+
+ current_pods = await self.query(
+ f"""
+ present_over_time(
+ kube_pod_owner{{
+ owner_name=~"{owners_regex}",
+ owner_kind="{pod_owner_kind}",
+ namespace="{object.namespace}"
+ {cluster_label}
+ }}[1m]
+ )
+ """
)
- current_pods = {p.name for p in object.pods}
+ current_pods_set = {pod["metric"]["pod"] for pod in current_pods}
+ del current_pods
object.pods += [
- PodData(name=pod["metric"]["pod"], deleted=True)
+ PodData(name=pod["metric"]["pod"], deleted=pod["metric"]["pod"] not in current_pods_set)
for pod in related_pods
- if pod["metric"]["pod"] not in current_pods
]
diff --git a/robusta_krr/core/models/objects.py b/robusta_krr/core/models/objects.py
index 52c4fbd..ede4be8 100644
--- a/robusta_krr/core/models/objects.py
+++ b/robusta_krr/core/models/objects.py
@@ -27,7 +27,7 @@ class K8sObjectData(pd.BaseModel):
cluster: Optional[str]
name: str
container: str
- pods: list[PodData]
+ pods: list[PodData] = []
hpa: Optional[HPAData]
namespace: str
kind: str
diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py
index 78bc636..26ac3b1 100644
--- a/robusta_krr/core/runner.py
+++ b/robusta_krr/core/runner.py
@@ -143,7 +143,7 @@ class Runner(Configurable):
step=self._strategy.settings.timeframe_timedelta,
)
- self.__progressbar.progress()
+ self.debug(f"Calculating recommendations for {object} with {len(metrics)} metrics")
# NOTE: We run this in a threadpool as the strategy calculation might be CPU intensive
# But keep in mind that numpy calcluations will not block the GIL
@@ -151,21 +151,19 @@ class Runner(Configurable):
result = await loop.run_in_executor(self._executor, self._strategy.run, metrics, object)
return self._format_result(result)
- async def _gather_objects_recommendations(self, objects: list[K8sObjectData]) -> list[ResourceAllocations]:
- recommendations: list[RunResult] = await asyncio.gather(
- *[self._calculate_object_recommendations(object) for object in objects]
- )
+ async def _gather_object_allocations(self, k8s_object: K8sObjectData) -> ResourceScan:
+ recommendation = await self._calculate_object_recommendations(k8s_object)
- return [
- (
- ResourceAllocations(
- requests={resource: recommendation[resource].request for resource in ResourceType},
- limits={resource: recommendation[resource].limit for resource in ResourceType},
- info={resource: recommendation[resource].info for resource in ResourceType},
- )
- )
- for recommendation in recommendations
- ]
+ self.__progressbar.progress()
+
+ return ResourceScan.calculate(
+ k8s_object,
+ ResourceAllocations(
+ requests={resource: recommendation[resource].request for resource in ResourceType},
+ limits={resource: recommendation[resource].limit for resource in ResourceType},
+ info={resource: recommendation[resource].info for resource in ResourceType},
+ ),
+ )
async def _collect_result(self) -> Result:
clusters = await self._k8s_loader.list_clusters()
@@ -177,9 +175,16 @@ class Runner(Configurable):
)
self.info(f'Using clusters: {clusters if clusters is not None else "inner cluster"}')
- objects = await self._k8s_loader.list_scannable_objects(clusters)
- if len(objects) == 0:
+ with ProgressBar(self.config, title="Calculating Recommendation") as self.__progressbar:
+ scans_tasks = [
+ asyncio.create_task(self._gather_object_allocations(k8s_object))
+ async for k8s_object in self._k8s_loader.list_scannable_objects(clusters)
+ ]
+
+ scans = await asyncio.gather(*scans_tasks)
+
+ if len(scans) == 0:
self.warning("Current filters resulted in no objects available to scan.")
self.warning("Try to change the filters or check if there is anything available.")
if self.config.namespaces == "*":
@@ -189,13 +194,8 @@ class Runner(Configurable):
strategy=StrategyData(name=str(self._strategy).lower(), settings=self._strategy.settings.dict()),
)
- with ProgressBar(self.config, total=len(objects), title="Calculating Recommendation") as self.__progressbar:
- resource_recommendations = await self._gather_objects_recommendations(objects)
-
return Result(
- scans=[
- ResourceScan.calculate(obj, recommended) for obj, recommended in zip(objects, resource_recommendations)
- ],
+ scans=scans,
description=self._strategy.description,
strategy=StrategyData(
name=str(self._strategy).lower(),
diff --git a/robusta_krr/utils/configurable.py b/robusta_krr/utils/configurable.py
index 54e71ff..4fbd804 100644
--- a/robusta_krr/utils/configurable.py
+++ b/robusta_krr/utils/configurable.py
@@ -65,7 +65,7 @@ class Configurable(abc.ABC):
caller = getframeinfo(stack()[1][0])
self.console.print(
self.__add_prefix(
- message + f"\t\t({caller.filename}:{caller.lineno})",
+ message,
"[bold green][DEBUG][/bold green]",
no_prefix=False,
)
diff --git a/tests/conftest.py b/tests/conftest.py
index 6bb1b1c..7b570a7 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,6 +1,6 @@
import random
from datetime import datetime, timedelta
-from unittest.mock import AsyncMock, patch
+from unittest.mock import AsyncMock, patch, MagicMock
import numpy as np
import pytest
@@ -26,6 +26,15 @@ TEST_OBJECT = K8sObjectData(
)
+class AsyncIter:
+ def __init__(self, items):
+ self.items = items
+
+ async def __aiter__(self):
+ for item in self.items:
+ yield item
+
+
@pytest.fixture(autouse=True, scope="session")
def mock_list_clusters():
with patch(
@@ -39,7 +48,7 @@ def mock_list_clusters():
def mock_list_scannable_objects():
with patch(
"robusta_krr.core.integrations.kubernetes.KubernetesLoader.list_scannable_objects",
- new=AsyncMock(return_value=[TEST_OBJECT]),
+ new=MagicMock(return_value=AsyncIter([TEST_OBJECT])),
):
yield