summaryrefslogtreecommitdiff
path: root/robusta_krr
diff options
context:
space:
mode:
authorLeaveMyYard <zhukovpavel2001@gmail.com>2024-04-22 18:30:02 +0300
committerLeaveMyYard <zhukovpavel2001@gmail.com>2024-04-22 18:30:02 +0300
commitc7ad1cde09e79d4821383ceb5dd32fd0471d427d (patch)
tree123d617b423d89b6c0dc4d585f5cd36100c0c368 /robusta_krr
parent4fedd82caa36d062f7a25b38cc9066e9281a3a21 (diff)
Moved the logic from #93 for a new refined structure
Co-authored-by: Megrez Lu <lujiajing1126@gmail.com>
Diffstat (limited to 'robusta_krr')
-rw-r--r--robusta_krr/core/integrations/kubernetes/__init__.py47
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/base.py5
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py9
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py15
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/prometheus.py10
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py31
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py19
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py126
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/deployments.py50
-rw-r--r--robusta_krr/core/integrations/prometheus/loader.py4
-rw-r--r--robusta_krr/core/runner.py70
-rw-r--r--robusta_krr/main.py7
12 files changed, 309 insertions, 84 deletions
diff --git a/robusta_krr/core/integrations/kubernetes/__init__.py b/robusta_krr/core/integrations/kubernetes/__init__.py
index 051cc23..2ff4458 100644
--- a/robusta_krr/core/integrations/kubernetes/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/__init__.py
@@ -17,10 +17,12 @@ from kubernetes.client.models import (
V2HorizontalPodAutoscaler,
)
+from robusta_krr.core.integrations.prometheus.loader import PrometheusMetricsLoader
from robusta_krr.core.models.config import settings
from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData
from robusta_krr.core.models.result import ResourceAllocations
from robusta_krr.utils.object_like_dict import ObjectLikeDict
+from prometrix import PrometheusNotFound
from . import config_patch as _
from .workload_loader import BaseWorkloadLoader, KubeAPIWorkloadLoader, PrometheusWorkloadLoader
@@ -31,7 +33,13 @@ AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod,
HPAKey = tuple[str, str, str]
-class ClusterWorkloadsLoader:
+class ClusterConnector:
+ EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound)
+
+ def __init__(self) -> None:
+ self._metrics_service_loaders: dict[Optional[str], Union[PrometheusMetricsLoader, Exception]] = {}
+ self._metrics_service_loaders_error_logged: set[Exception] = set()
+
async def list_clusters(self) -> Optional[list[str]]:
"""List all clusters.
@@ -71,15 +79,44 @@ class ClusterWorkloadsLoader:
return [context["name"] for context in contexts]
return [context["name"] for context in contexts if context["name"] in settings.clusters]
+
+ def get_prometheus_loader(self, cluster: Optional[str]) -> Optional[PrometheusMetricsLoader]:
+ if cluster not in self._metrics_service_loaders:
+ try:
+ self._metrics_service_loaders[cluster] = PrometheusMetricsLoader(cluster=cluster)
+ except Exception as e:
+ self._metrics_service_loaders[cluster] = e
+
+ result = self._metrics_service_loaders[cluster]
+ if isinstance(result, self.EXPECTED_EXCEPTIONS):
+ if result not in self._metrics_service_loaders_error_logged:
+ self._metrics_service_loaders_error_logged.add(result)
+ logger.error(str(result))
+ return None
+ elif isinstance(result, Exception):
+ raise result
- def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[BaseWorkloadLoader]:
- WorkloadLoader = KubeAPIWorkloadLoader if settings.workload_loader == "kubeapi" else PrometheusWorkloadLoader
+ return result
+ def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[BaseWorkloadLoader]:
try:
- return WorkloadLoader(cluster=cluster)
+ if settings.workload_loader == "kubeapi":
+ return KubeAPIWorkloadLoader(cluster=cluster)
+ elif settings.workload_loader == "prometheus":
+ cluster_loader = self.get_prometheus_loader(cluster)
+ if cluster_loader is not None:
+ return PrometheusWorkloadLoader(cluster=cluster, metric_loader=cluster_loader)
+ else:
+ logger.error(
+                    f"Could not load Prometheus for cluster {cluster} and will skip it. "
+ "Not possible to load workloads through Prometheus without connection to Prometheus."
+ )
+ else:
+ raise NotImplementedError(f"Workload loader {settings.workload_loader} is not implemented")
except Exception as e:
logger.error(f"Could not load cluster {cluster} and will skip it: {e}")
- return None
+
+ return None
async def list_workloads(self, clusters: Optional[list[str]]) -> list[K8sWorkload]:
"""List all scannable objects.
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/base.py
index a7bba21..8581c1a 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/base.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/base.py
@@ -4,11 +4,8 @@ from robusta_krr.core.models.objects import K8sWorkload, PodData
class BaseWorkloadLoader(abc.ABC):
- def __init__(self, cluster: Optional[str] = None) -> None:
- self.cluster = cluster
-
@abc.abstractmethod
- async def list_workloads(self, clusters: Optional[list[str]]) -> list[K8sWorkload]:
+ async def list_workloads(self) -> list[K8sWorkload]:
pass
@abc.abstractmethod
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py
index 8586b77..d23f7a4 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py
@@ -11,7 +11,6 @@ from kubernetes.client.models import V1Container, V2HorizontalPodAutoscaler # t
from robusta_krr.core.models.config import settings
from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData
from robusta_krr.core.models.result import ResourceAllocations
-from robusta_krr.utils.object_like_dict import ObjectLikeDict
from ..base import BaseWorkloadLoader
from .loaders import (
@@ -41,8 +40,8 @@ class KubeAPIWorkloadLoader(BaseWorkloadLoader):
CronJobLoader,
]
- def __init__(self, cluster: Optional[str] = None):
- super().__init__(cluster)
+ def __init__(self, cluster: Optional[str] = None) -> None:
+ self.cluster = cluster
# This executor will be running requests to Kubernetes API
self.executor = ThreadPoolExecutor(settings.max_workers)
@@ -88,7 +87,7 @@ class KubeAPIWorkloadLoader(BaseWorkloadLoader):
async def list_pods(self, object: K8sWorkload) -> list[PodData]:
return await self._workload_loaders[object.kind].list_pods(object)
- def __build_scannable_object(self, item: Any, container: V1Container, kind: Optional[str] = None) -> K8sWorkload:
+ def _build_scannable_object(self, item: Any, container: V1Container, kind: Optional[str] = None) -> K8sWorkload:
name = item.metadata.name
namespace = item.metadata.namespace
kind = kind or item.__class__.__name__[2:]
@@ -160,7 +159,7 @@ class KubeAPIWorkloadLoader(BaseWorkloadLoader):
if asyncio.iscoroutine(containers):
containers = await containers
- result.extend(self.__build_scannable_object(item, container, kind) for container in containers)
+ result.extend(self._build_scannable_object(item, container, kind) for container in containers)
except ApiException as e:
if kind in ("Rollout", "DeploymentConfig") and e.status in [400, 401, 403, 404]:
if self._kind_available[kind]:
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py
index 0c14e89..a6a552e 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py
@@ -6,21 +6,12 @@ from typing import Any, Iterable, Optional, Union
from kubernetes import client # type: ignore
from kubernetes.client.api_client import ApiClient # type: ignore
-from kubernetes.client.models import ( # type: ignore
- V1Container,
- V1DaemonSet,
- V1Deployment,
- V1Job,
- V1Pod,
- V1PodList,
- V1StatefulSet,
-)
+from kubernetes.client.models import V1Container, V1PodList # type: ignore
from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData
logger = logging.getLogger("krr")
-AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job]
HPAKey = tuple[str, str, str]
@@ -112,9 +103,7 @@ class BaseKindLoader(abc.ABC):
label_filters += [f"{label[0]}={label[1]}" for label in selector.match_labels.items()]
if selector.match_expressions is not None:
- label_filters += [
- cls._get_match_expression_filter(expression) for expression in selector.match_expressions
- ]
+ label_filters += [cls._get_match_expression_filter(expression) for expression in selector.match_expressions]
if label_filters == []:
# NOTE: This might mean that we have DeploymentConfig,
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus.py
deleted file mode 100644
index d2a9883..0000000
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus.py
+++ /dev/null
@@ -1,10 +0,0 @@
-import logging
-from .base import BaseWorkloadLoader
-
-
-logger = logging.getLogger("krr")
-
-
-class PrometheusWorkloadLoader(BaseWorkloadLoader):
- # TODO: Implement PrometheusWorkloadLoader
- pass
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py
new file mode 100644
index 0000000..b277bc2
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py
@@ -0,0 +1,31 @@
+import asyncio
+import itertools
+import logging
+
+
+from robusta_krr.core.integrations.prometheus.loader import PrometheusMetricsLoader
+from robusta_krr.core.models.config import settings
+from robusta_krr.core.integrations.prometheus.metrics_service.prometheus_metrics_service import PrometheusMetricsService
+from robusta_krr.core.models.objects import K8sWorkload, PodData
+from ..base import BaseWorkloadLoader
+from .loaders import BaseKindLoader, DeploymentLoader
+
+
+logger = logging.getLogger("krr")
+
+
+class PrometheusWorkloadLoader(BaseWorkloadLoader):
+ workloads: list[type[BaseKindLoader]] = [DeploymentLoader]
+
+ def __init__(self, cluster: str, metric_loader: PrometheusMetricsLoader) -> None:
+ self.cluster = cluster
+ self.metric_service = metric_loader
+ self.loaders = [loader(metric_loader) for loader in self.workloads]
+
+ async def list_workloads(self) -> list[K8sWorkload]:
+ return itertools.chain(await asyncio.gather(*[loader.list_workloads(settings.namespaces, "") for loader in self.loaders]))
+
+ async def list_pods(self, object: K8sWorkload) -> list[PodData]:
+ # This should not be implemented, as implementation will repeat PrometheusMetricsLoader.load_pods
+    # As this method is meant to be a fallback, repeating the same logic will not be beneficial
+ raise NotImplementedError
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py
new file mode 100644
index 0000000..6ca6efd
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py
@@ -0,0 +1,19 @@
+from .base import BaseKindLoader
+from .cronjobs import CronJobLoader
+from .daemonsets import DaemonSetLoader
+from .deploymentconfigs import DeploymentConfigLoader
+from .deployments import DeploymentLoader
+from .jobs import JobLoader
+from .rollouts import RolloutLoader
+from .statefulsets import StatefulSetLoader
+
+__all__ = [
+ "BaseKindLoader",
+ "CronJobLoader",
+ "DeploymentLoader",
+ "DaemonSetLoader",
+ "DeploymentConfigLoader",
+ "JobLoader",
+ "RolloutLoader",
+ "StatefulSetLoader",
+] \ No newline at end of file
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py
new file mode 100644
index 0000000..a767eb2
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py
@@ -0,0 +1,126 @@
+import abc
+import asyncio
+import logging
+from concurrent.futures import ThreadPoolExecutor
+from typing import Any, Iterable, Literal, Optional, Union
+
+from kubernetes import client # type: ignore
+from kubernetes.client.api_client import ApiClient # type: ignore
+from kubernetes.client.models import ( # type: ignore
+ V1Container,
+ V1DaemonSet,
+ V1Deployment,
+ V1Job,
+ V1Pod,
+ V1PodList,
+ V1StatefulSet,
+)
+
+from robusta_krr.core.integrations.prometheus.loader import PrometheusMetricsLoader
+from robusta_krr.core.models.allocations import RecommendationValue, ResourceAllocations, ResourceType
+from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData
+
+logger = logging.getLogger("krr")
+
+AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job]
+HPAKey = tuple[str, str, str]
+
+
+class BaseKindLoader(abc.ABC):
+ """
+ This class is used to define how to load a specific kind of Kubernetes object.
+ It does not load the objects itself, but is used by the `KubeAPIWorkloadLoader` to load objects.
+ """
+
+ kind: KindLiteral
+
+ def __init__(self, metrics_loader: PrometheusMetricsLoader) -> None:
+ self.metrics_loader = metrics_loader
+
+ @abc.abstractmethod
+ def list_workloads(self, namespaces: Union[list[str], Literal["*"]], label_selector: str) -> list[K8sWorkload]:
+ pass
+
+ async def __parse_allocation(self, namespace: str, pod_selector: str, container_name: str) -> ResourceAllocations:
+ limits = await self.metrics_loader.loader.query(
+ "avg by(resource) (kube_pod_container_resource_limits{"
+ f'namespace="{namespace}", '
+ f'pod=~"{pod_selector}", '
+ f'container="{container_name}"'
+ "})"
+ )
+ requests = await self.metrics_loader.loader.query(
+ "avg by(resource) (kube_pod_container_resource_requests{"
+ f'namespace="{namespace}", '
+ f'pod=~"{pod_selector}", '
+ f'container="{container_name}"'
+ "})"
+ )
+ requests_values: dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None}
+ limits_values: dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None}
+ for limit in limits:
+ if limit["metric"]["resource"] == ResourceType.CPU:
+ limits_values[ResourceType.CPU] = float(limit["value"][1])
+ elif limit["metric"]["resource"] == ResourceType.Memory:
+ limits_values[ResourceType.Memory] = float(limit["value"][1])
+
+ for request in requests:
+ if request["metric"]["resource"] == ResourceType.CPU:
+ requests_values[ResourceType.CPU] = float(request["value"][1])
+ elif request["metric"]["resource"] == ResourceType.Memory:
+ requests_values[ResourceType.Memory] = float(request["value"][1])
+ return ResourceAllocations(requests=requests_values, limits=limits_values)
+
+ async def __build_from_owner(
+ self, namespace: str, app_name: str, containers: list[str], pod_names: list[str]
+ ) -> list[K8sWorkload]:
+ return [
+ K8sWorkload(
+ cluster=None,
+ namespace=namespace,
+ name=app_name,
+ kind="Deployment",
+ container=container_name,
+ allocations=await self.__parse_allocation(namespace, "|".join(pod_names), container_name), # find
+ pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names], # list pods
+ )
+ for container_name in containers
+ ]
+
+ async def _list_containers(self, namespace: str, pod_selector: str) -> list[str]:
+ containers = await self.metrics_loader.loader.query(
+ f"""
+ count by (container) (
+ kube_pod_container_info{{
+ namespace="{namespace}",
+ pod=~"{pod_selector}"
+ }}
+ )
+ """
+ )
+ return [container["metric"]["container"] for container in containers]
+
+ async def _list_containers_in_pods(
+ self, app_name: str, pod_owner_kind: str, namespace: str, owner_name: str
+ ) -> list[K8sWorkload]:
+ if pod_owner_kind == "ReplicaSet":
+ # owner_name is ReplicaSet names
+ pods = await self.metrics_loader.loader.query(
+ f"""
+ count by (owner_name, replicaset, pod) (
+ kube_pod_owner{{
+ namespace="{namespace}",
+                    owner_name=~"{owner_name}",
+ owner_kind="ReplicaSet"
+ }}
+ )
+ """
+ )
+ if pods is None or len(pods) == 0:
+ return [] # no container
+ # [{'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-jqt4x'}, 'value': [1685529217, '1']},
+ # {'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-lj9qg'}, 'value': [1685529217, '1']}]
+ pod_names = [pod["metric"]["pod"] for pod in pods]
+ container_names = await self._list_containers(namespace, "|".join(pod_names))
+ return await self.__build_from_owner(namespace, app_name, container_names, pod_names)
+ return []
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/deployments.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/deployments.py
new file mode 100644
index 0000000..c895d21
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/deployments.py
@@ -0,0 +1,50 @@
+import logging
+from collections import defaultdict
+import itertools
+import asyncio
+from typing import Literal, Optional, Union
+
+from robusta_krr.core.models.objects import K8sWorkload
+
+from .base import BaseKindLoader
+
+logger = logging.getLogger("krr")
+
+
+class DeploymentLoader(BaseKindLoader):
+ kind = "Deployment"
+
+ async def list_workloads(self, namespaces: Union[list[str], Literal["*"]], label_selector: str) -> list[K8sWorkload]:
+ logger.debug(
+ f"Listing deployments in namespace({namespaces})"
+ )
+ ns = "|".join(namespaces)
+ replicasets = await self.metrics_loader.loader.query(
+ f"""
+ count by (namespace, owner_name, replicaset) (
+ kube_replicaset_owner{{
+ namespace=~"{ns}",
+ owner_kind="Deployment",
+ }}
+ )
+ """
+ )
+ # groupBy: 'ns/owner_name' => [{metadata}...]
+ pod_owner_kind = "ReplicaSet"
+ replicaset_dict = defaultdict(list)
+ for replicaset in replicasets:
+ replicaset_dict[replicaset["metric"]["namespace"] + "/" + replicaset["metric"]["owner_name"]].append(
+ replicaset["metric"]
+ )
+ objects = await asyncio.gather(
+ *[
+ self._list_containers_in_pods(
+ replicas[0]["owner_name"],
+ pod_owner_kind,
+ replicas[0]["namespace"],
+ "|".join(list(map(lambda metric: metric["replicaset"], replicas))),
+ )
+ for replicas in replicaset_dict.values()
+ ]
+ )
+ return list(itertools.chain(*objects))
diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py
index c449bca..eb734ef 100644
--- a/robusta_krr/core/integrations/prometheus/loader.py
+++ b/robusta_krr/core/integrations/prometheus/loader.py
@@ -23,6 +23,7 @@ if TYPE_CHECKING:
logger = logging.getLogger("krr")
+
class PrometheusMetricsLoader:
def __init__(self, *, cluster: Optional[str] = None) -> None:
"""
@@ -39,7 +40,8 @@ class PrometheusMetricsLoader:
raise PrometheusNotFound(
f"Wasn't able to connect to any Prometheus service in {cluster or 'inner'} cluster\n"
"Try using port-forwarding and/or setting the url manually (using the -p flag.).\n"
- "For more information, see 'Giving the Explicit Prometheus URL' at https://github.com/robusta-dev/krr?tab=readme-ov-file#usage"
+ "For more information, see 'Giving the Explicit Prometheus URL' at "
+ "https://github.com/robusta-dev/krr?tab=readme-ov-file#usage"
)
self.loader = loader
diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py
index de3b3c7..528fd89 100644
--- a/robusta_krr/core/runner.py
+++ b/robusta_krr/core/runner.py
@@ -2,7 +2,6 @@ import asyncio
import logging
import math
import os
-import sys
import warnings
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Union
@@ -12,7 +11,7 @@ from rich.console import Console
from slack_sdk import WebClient
from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult
-from robusta_krr.core.integrations.kubernetes import ClusterWorkloadsLoader
+from robusta_krr.core.integrations.kubernetes import ClusterConnector, KubeAPIWorkloadLoader
from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException, PrometheusMetricsLoader
from robusta_krr.core.models.config import settings
from robusta_krr.core.models.objects import K8sWorkload
@@ -21,6 +20,7 @@ from robusta_krr.utils.intro import load_intro_message
from robusta_krr.utils.progress_bar import ProgressBar
from robusta_krr.utils.version import get_version, load_latest_version
+
logger = logging.getLogger("krr")
@@ -37,36 +37,14 @@ class CriticalRunnerException(Exception): ...
class Runner:
- EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound)
-
def __init__(self) -> None:
- self._k8s_loader = ClusterWorkloadsLoader()
- self._metrics_service_loaders: dict[Optional[str], Union[PrometheusMetricsLoader, Exception]] = {}
- self._metrics_service_loaders_error_logged: set[Exception] = set()
- self._strategy = settings.create_strategy()
+ self.connector = ClusterConnector()
+ self.strategy = settings.create_strategy()
self.errors: list[dict] = []
# This executor will be running calculations for recommendations
- self._executor = ThreadPoolExecutor(settings.max_workers)
-
- def _get_prometheus_loader(self, cluster: Optional[str]) -> Optional[PrometheusMetricsLoader]:
- if cluster not in self._metrics_service_loaders:
- try:
- self._metrics_service_loaders[cluster] = PrometheusMetricsLoader(cluster=cluster)
- except Exception as e:
- self._metrics_service_loaders[cluster] = e
-
- result = self._metrics_service_loaders[cluster]
- if isinstance(result, self.EXPECTED_EXCEPTIONS):
- if result not in self._metrics_service_loaders_error_logged:
- self._metrics_service_loaders_error_logged.add(result)
- logger.error(str(result))
- return None
- elif isinstance(result, Exception):
- raise result
-
- return result
+ self.executor = ThreadPoolExecutor(settings.max_workers)
@staticmethod
def __parse_version_string(version: str) -> tuple[int, ...]:
@@ -93,7 +71,7 @@ class Runner:
custom_print(intro_message)
custom_print(f"\nRunning Robusta's KRR (Kubernetes Resource Recommender) {current_version}")
- custom_print(f"Using strategy: {self._strategy}")
+ custom_print(f"Using strategy: {self.strategy}")
custom_print(f"Using formatter: {settings.format}")
if latest_version is not None and self.__check_newer_version_available(current_version, latest_version):
custom_print(f"[yellow bold]A newer version of KRR is available: {latest_version}[/yellow bold]")
@@ -171,10 +149,10 @@ class Runner:
if prometheus_loader is None:
return None
- object.pods = await prometheus_loader.load_pods(object, self._strategy.settings.history_timedelta)
- if object.pods == []:
+ object.pods = await prometheus_loader.load_pods(object, self.strategy.settings.history_timedelta)
+ if object.pods == [] and isinstance(self.connector, KubeAPIWorkloadLoader):
# Fallback to Kubernetes API
- object.pods = await self._k8s_loader.load_pods(object)
+ object.pods = await self.connector.load_pods(object)
# NOTE: Kubernetes API returned pods, but Prometheus did not
# This might happen with fast executing jobs
@@ -187,21 +165,21 @@ class Runner:
metrics = await prometheus_loader.gather_data(
object,
- self._strategy,
- self._strategy.settings.history_timedelta,
- step=self._strategy.settings.timeframe_timedelta,
+ self.strategy,
+ self.strategy.settings.history_timedelta,
+ step=self.strategy.settings.timeframe_timedelta,
)
# NOTE: We run this in a threadpool as the strategy calculation might be CPU intensive
    # But keep in mind that numpy calculations will not block the GIL
loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(self._executor, self._strategy.run, metrics, object)
+ result = await loop.run_in_executor(self.executor, self.strategy.run, metrics, object)
logger.info(f"Calculated recommendations for {object} (using {len(metrics)} metrics)")
return self._format_result(result)
async def _check_data_availability(self, cluster: Optional[str]) -> None:
- prometheus_loader = self._get_prometheus_loader(cluster)
+ prometheus_loader = self.connector.get_prometheus_loader(cluster)
if prometheus_loader is None:
return
@@ -219,11 +197,11 @@ class Runner:
return
logger.debug(f"History range for {cluster}: {history_range}")
- enough_data = self._strategy.settings.history_range_enough(history_range)
+ enough_data = self.strategy.settings.history_range_enough(history_range)
if not enough_data:
logger.warning(f"Not enough history available for cluster {cluster}.")
- try_after = history_range[0] + self._strategy.settings.history_timedelta
+ try_after = history_range[0] + self.strategy.settings.history_timedelta
logger.warning(
"If the cluster is freshly installed, it might take some time for the enough data to be available."
@@ -257,7 +235,7 @@ class Runner:
)
async def _collect_result(self) -> Result:
- clusters = await self._k8s_loader.list_clusters()
+ clusters = await self.connector.list_clusters()
if clusters and len(clusters) > 1 and settings.prometheus_url:
# this can only happen for multi-cluster querying a single centeralized prometheus
# In this scenario we dont yet support determining
@@ -275,7 +253,7 @@ class Runner:
await asyncio.gather(*[self._check_data_availability(cluster) for cluster in clusters])
with ProgressBar(title="Calculating Recommendation") as self.__progressbar:
- workloads = await self._k8s_loader.list_workloads(clusters)
+ workloads = await self.connector.list_workloads(clusters)
scans = await asyncio.gather(*[self._gather_object_allocations(k8s_object) for k8s_object in workloads])
successful_scans = [scan for scan in scans if scan is not None]
@@ -293,10 +271,10 @@ class Runner:
return Result(
scans=scans,
- description=self._strategy.description,
+ description=self.strategy.description,
strategy=StrategyData(
- name=str(self._strategy).lower(),
- settings=self._strategy.settings.dict(),
+ name=str(self.strategy).lower(),
+ settings=self.strategy.settings.dict(),
),
)
@@ -313,14 +291,14 @@ class Runner:
try:
# eks has a lower step limit than other types of prometheus, it will throw an error
- step_count = self._strategy.settings.history_duration * 60 / self._strategy.settings.timeframe_duration
+ step_count = self.strategy.settings.history_duration * 60 / self.strategy.settings.timeframe_duration
if settings.eks_managed_prom and step_count > 11000:
- min_step = self._strategy.settings.history_duration * 60 / 10000
+ min_step = self.strategy.settings.history_duration * 60 / 10000
logger.warning(
f"The timeframe duration provided is insufficient and will be overridden with {min_step}. "
f"Kindly adjust --timeframe_duration to a value equal to or greater than {min_step}."
)
- self._strategy.settings.timeframe_duration = min_step
+ self.strategy.settings.timeframe_duration = min_step
result = await self._collect_result()
logger.info("Result collected, displaying...")
diff --git a/robusta_krr/main.py b/robusta_krr/main.py
index 5c2d01a..810ccd1 100644
--- a/robusta_krr/main.py
+++ b/robusta_krr/main.py
@@ -55,6 +55,12 @@ def load_commands() -> None:
help="Path to kubeconfig file. If not provided, will attempt to find it.",
rich_help_panel="Kubernetes Settings",
),
+ workload_loader: str = typer.Option(
+ "kubeapi",
+ "--workload",
+ help="Workload loader to use (kubeapi, prometheus).",
+ rich_help_panel="Kubernetes Settings",
+ ),
impersonate_user: Optional[str] = typer.Option(
None,
"--as",
@@ -250,6 +256,7 @@ def load_commands() -> None:
try:
config = Config(
kubeconfig=kubeconfig,
+ workload_loader=workload_loader,
impersonate_user=impersonate_user,
impersonate_group=impersonate_group,
clusters="*" if all_clusters else clusters,