diff options
| author | LeaveMyYard <zhukovpavel2001@gmail.com> | 2024-04-30 18:14:44 +0300 |
|---|---|---|
| committer | LeaveMyYard <zhukovpavel2001@gmail.com> | 2024-04-30 18:14:44 +0300 |
| commit | 4c1f5c9735a8b35df515920bb337ba9962e7ac5d (patch) | |
| tree | 444dc4de5b4a1fb87f9f031559619c48ba6f844e | |
| parent | 7e8f1f42f34f5c4ff2824ced92dc37f9afb2ba10 (diff) | |
Finished structure changes and workload loaders
| -rw-r--r-- | robusta_krr/core/abstract/cluster_loader.py | 46 | ||||
| -rw-r--r-- | robusta_krr/core/abstract/workload_loader.py | 23 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/kubernetes/__init__.py | 101 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/kubernetes/cluster_loader/__init__.py (renamed from robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py) | 34 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/__init__.py (renamed from robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/__init__.py) | 0 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/base.py (renamed from robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py) | 0 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/cronjobs.py (renamed from robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/cronjobs.py) | 0 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/daemonsets.py (renamed from robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/daemonsets.py) | 0 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deploymentconfigs.py (renamed from robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deploymentconfigs.py) | 0 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deployments.py (renamed from robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deployments.py) | 0 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/jobs.py (renamed from robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/jobs.py) | 0 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/rollouts.py (renamed from robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/rollouts.py) | 0 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/statefulsets.py (renamed from robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/statefulsets.py) | 0 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py | 13 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/kubernetes/workload_loader/base.py | 57 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/prometheus/cluster_loader/__init__.py (renamed from robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py) | 48 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/prometheus/cluster_loader/loaders/__init__.py (renamed from robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py) | 0 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/prometheus/cluster_loader/loaders/base.py (renamed from robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py) | 0 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/prometheus/cluster_loader/loaders/double_parent.py (renamed from robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py) | 0 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/prometheus/cluster_loader/loaders/simple_parent.py (renamed from robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py) | 0 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/prometheus/connector.py | 96 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/prometheus/metrics_service/mimir_metrics_service.py | 1 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py | 43 | ||||
| -rw-r--r-- | robusta_krr/core/models/config.py | 27 | ||||
| -rw-r--r-- | robusta_krr/core/models/exceptions.py | 2 | ||||
| -rw-r--r-- | robusta_krr/core/runner.py | 83 | ||||
| -rw-r--r-- | robusta_krr/main.py | 16 |
27 files changed, 290 insertions, 300 deletions
diff --git a/robusta_krr/core/abstract/cluster_loader.py b/robusta_krr/core/abstract/cluster_loader.py new file mode 100644 index 0000000..86767d1 --- /dev/null +++ b/robusta_krr/core/abstract/cluster_loader.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +import abc +import logging +from typing import Optional, TYPE_CHECKING + +from .workload_loader import BaseWorkloadLoader + +if TYPE_CHECKING: + from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector + + +logger = logging.getLogger("krr") + + +class BaseClusterLoader(abc.ABC): + """ + A class that wraps loading data from multiple clusters. + For example, a centralized prometheus server that can query multiple clusters. + Or one kubeconfig can define connections to multiple clusters. + """ + + @abc.abstractmethod + async def list_clusters(self) -> Optional[list[str]]: + pass + + @abc.abstractmethod + def get_workload_loader(self, cluster: Optional[str]) -> BaseWorkloadLoader: + pass + + def try_get_workload_loader(self, cluster: Optional[str]) -> Optional[BaseWorkloadLoader]: + try: + return self.get_workload_loader(cluster) + except Exception as e: + logger.error(f"Could not connect to cluster {cluster} and will skip it: {e}") + return None + + @abc.abstractmethod + def get_prometheus(self, cluster: Optional[str]) -> PrometheusConnector: + """ + Connect to a Prometheus server and return a PrometheusConnector instance. + Cluster = None means that prometheus is the only one: either centralized or in-cluster. + raise prometrix.PrometheusNotFound if Prometheus is not available. + """ + + pass diff --git a/robusta_krr/core/abstract/workload_loader.py b/robusta_krr/core/abstract/workload_loader.py new file mode 100644 index 0000000..42860ee --- /dev/null +++ b/robusta_krr/core/abstract/workload_loader.py @@ -0,0 +1,23 @@ +import abc +import logging + +from robusta_krr.core.models.objects import K8sWorkload, PodData + + +logger = logging.getLogger("krr") + + +class BaseWorkloadLoader(abc.ABC): + """A base class for single cluster workload loaders.""" + + @abc.abstractmethod + async def list_workloads(self) -> list[K8sWorkload]: + pass + + +class IListPodsFallback(abc.ABC): + """This is an interface that a workload loader can implement to have a fallback method to list pods.""" + + @abc.abstractmethod + async def load_pods(self, object: K8sWorkload) -> list[PodData]: + pass diff --git a/robusta_krr/core/integrations/kubernetes/__init__.py b/robusta_krr/core/integrations/kubernetes/__init__.py deleted file mode 100644 index 403768c..0000000 --- a/robusta_krr/core/integrations/kubernetes/__init__.py +++ /dev/null @@ -1,101 +0,0 @@ -import asyncio -import logging -from collections import defaultdict -from concurrent.futures import ThreadPoolExecutor -from typing import Any, Awaitable, Callable, Iterable, Optional, Union - -from kubernetes import client, config # type: ignore -from kubernetes.client import ApiException -from kubernetes.client.models import ( - V1Container, - V1DaemonSet, - V1Deployment, - V1Job, - V1Pod, - V1PodList, - V1StatefulSet, - V2HorizontalPodAutoscaler, -) - -from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector -from robusta_krr.core.models.config import settings -from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData -from robusta_krr.core.models.result import ResourceAllocations -from robusta_krr.utils.object_like_dict import ObjectLikeDict -from prometrix import PrometheusNotFound - -from . import config_patch as _ -from .workload_loader import ( - BaseWorkloadLoader, - PrometheusWorkloadLoader, - BaseClusterLoader, - KubeAPIClusterLoader, - PrometheusClusterLoader, -) - -logger = logging.getLogger("krr") - -AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job] -HPAKey = tuple[str, str, str] - - -class ClusterConnector: - EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound) - - def __init__(self) -> None: - self._prometheus_connectors: dict[Optional[str], Union[PrometheusConnector, Exception]] = {} - self._connector_errors: set[Exception] = set() - - def get_prometheus(self, cluster: Optional[str]) -> Optional[PrometheusConnector]: - if settings.workload_loader == "kubeapi": - logger.debug(f"Creating Prometheus connector for cluster {cluster}") - elif settings.workload_loader == "prometheus": - logger.debug(f"Creating Prometheus connector") - # NOTE: With prometheus workload loader we can only have one Prometheus provided in parameters - # so in case of multiple clusters in one Prometheus (centralized version) - # for each cluster we will have the same PrometheusConnector (keyed by None) - cluster = None - - - - def _create_cluster_loader(self) -> BaseClusterLoader: - try: - - except Exception as e: - logger.error(f"Could not connect to cluster loader and will skip it: {e}") - - return None - - async def list_workloads(self, clusters: Optional[list[str]]) -> list[K8sWorkload]: - """List all scannable objects. - - Returns: - A list of all loaded objects. - """ - - if clusters is None: - _cluster_loaders = [self._try_create_cluster_loader(None)] - else: - _cluster_loaders = [self._try_create_cluster_loader(cluster) for cluster in clusters] - - self.cluster_loaders: dict[Optional[str], BaseWorkloadLoader] = { - cl.cluster: cl for cl in _cluster_loaders if cl is not None - } - - if self.cluster_loaders == {}: - logger.error("Could not load any cluster.") - return [] - - return [ - object - for cluster_loader in self.cluster_loaders.values() - for object in await cluster_loader.list_workloads() - ] - - async def load_pods(self, object: K8sWorkload) -> list[PodData]: - try: - cluster_loader = self.cluster_loaders[object.cluster] - except KeyError: - raise RuntimeError(f"Cluster loader for cluster {object.cluster} not found") from None - - return await cluster_loader.list_pods(object) diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/__init__.py index 19cd108..ab520bb 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/__init__.py @@ -9,12 +9,18 @@ from typing import Any, Awaitable, Callable, Optional from kubernetes import client, config # type: ignore from kubernetes.client import ApiException # type: ignore from kubernetes.client.models import V1Container, V2HorizontalPodAutoscaler # type: ignore +from functools import cache +from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector +from robusta_krr.core.integrations.prometheus.metrics_service.prometheus_metrics_service import PrometheusMetricsService from robusta_krr.core.models.config import settings +from robusta_krr.core.models.exceptions import CriticalRunnerException from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData from robusta_krr.core.models.result import ResourceAllocations -from ..base import BaseWorkloadLoader, IListPodsFallback, BaseClusterLoader + +from robusta_krr.core.abstract.workload_loader import BaseWorkloadLoader, IListPodsFallback +from robusta_krr.core.abstract.cluster_loader import BaseClusterLoader from .loaders import ( BaseKindLoader, CronJobLoader, @@ -37,7 +43,16 @@ class KubeAPIClusterLoader(BaseClusterLoader): # Also here we might have different Prometeus instances for different clusters def __init__(self) -> None: + try: + settings.load_kubeconfig() + except Exception as e: + logger.error(f"Could not load kubernetes configuration: {e.__class__.__name__}\n{e}") + logger.error("Try to explicitly set --context and/or --kubeconfig flags.") + logger.error("Alternatively, try a prometheus-only mode with `--mode prometheus`") + raise CriticalRunnerException("Could not load kubernetes configuration") from e + self.api_client = settings.get_kube_client() + self._prometheus_connectors: dict[Optional[str], PrometheusConnector] = {} async def list_clusters(self) -> Optional[list[str]]: if settings.inside_cluster: @@ -72,8 +87,19 @@ class KubeAPIClusterLoader(BaseClusterLoader): return [context["name"] for context in contexts if context["name"] in settings.clusters] - async def connect_cluster(self, cluster: str) -> KubeAPIWorkloadLoader: + @cache + def get_workload_loader(self, cluster: Optional[str]) -> KubeAPIWorkloadLoader: return KubeAPIWorkloadLoader(cluster) + + @cache + def get_prometheus(self, cluster: Optional[str]) -> PrometheusConnector: + connector = PrometheusConnector(cluster=cluster) + if settings.prometheus_url is not None: + logger.info(f"Connecting to Prometheus using URL: {settings.prometheus_url}") + connector.connect(settings.prometheus_url) + else: + logger.info(f"Trying to discover PromQL service" + (f" for cluster {cluster}" if cluster else "")) + connector.discover(api_client=self.api_client) class KubeAPIWorkloadLoader(BaseWorkloadLoader, IListPodsFallback): @@ -131,7 +157,7 @@ class KubeAPIWorkloadLoader(BaseWorkloadLoader, IListPodsFallback): if not (settings.namespaces == "*" and object.namespace == "kube-system") ] - async def list_pods(self, object: K8sWorkload) -> list[PodData]: + async def load_pods(self, object: K8sWorkload) -> list[PodData]: return await self._workload_loaders[object.kind].list_pods(object) def _build_scannable_object(self, item: Any, container: V1Container, kind: Optional[str] = None) -> K8sWorkload: @@ -320,4 +346,4 @@ class KubeAPIWorkloadLoader(BaseWorkloadLoader, IListPodsFallback): return {} -__all__ = ["KubeAPIWorkloadLoader"] +__all__ = ["KubeAPIWorkloadLoader", "KubeAPIClusterLoader"] diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/__init__.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/__init__.py index 6ca6efd..6ca6efd 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/__init__.py +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/__init__.py diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/base.py index a6a552e..a6a552e 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/base.py diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/cronjobs.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/cronjobs.py index d999c65..d999c65 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/cronjobs.py +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/cronjobs.py diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/daemonsets.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/daemonsets.py index 1005bed..1005bed 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/daemonsets.py +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/daemonsets.py diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deploymentconfigs.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deploymentconfigs.py index 33b64d9..33b64d9 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deploymentconfigs.py +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deploymentconfigs.py diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deployments.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deployments.py index fc202b5..fc202b5 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deployments.py +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deployments.py diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/jobs.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/jobs.py index 87ce5a7..87ce5a7 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/jobs.py +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/jobs.py diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/rollouts.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/rollouts.py index 0148745..0148745 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/rollouts.py +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/rollouts.py diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/statefulsets.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/statefulsets.py index 069f2c6..069f2c6 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/statefulsets.py +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/statefulsets.py diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py deleted file mode 100644 index 2cad0cc..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -from .base import BaseWorkloadLoader, IListPodsFallback, BaseClusterLoader -from .kube_api import KubeAPIWorkloadLoader, KubeAPIClusterLoader -from .prometheus import PrometheusWorkloadLoader, PrometheusClusterLoader - -__all__ = [ - "BaseWorkloadLoader", - "IListPodsFallback", - "KubeAPIWorkloadLoader", - "PrometheusWorkloadLoader", - "BaseClusterLoader", - "KubeAPIClusterLoader", - "PrometheusClusterLoader", -] diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/base.py deleted file mode 100644 index 316bcbd..0000000 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/base.py +++ /dev/null @@ -1,57 +0,0 @@ -import abc -import logging - -from typing import Optional, Union -from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector -from robusta_krr.core.integrations.prometheus.metrics_service.prometheus_metrics_service import PrometheusMetricsService -from robusta_krr.core.models.objects import K8sWorkload, PodData - - -logger = logging.getLogger("krr") - - -class BaseWorkloadLoader(abc.ABC): - """A base class for single cluster workload loaders.""" - - @abc.abstractmethod - async def list_workloads(self) -> list[K8sWorkload]: - pass - - -class IListPodsFallback(abc.ABC): - """This is an interface that a workload loader can implement to have a fallback method to list pods.""" - - @abc.abstractmethod - async def list_pods(self, object: K8sWorkload) -> list[PodData]: - pass - - -class BaseClusterLoader(abc.ABC): - """ - A class that wraps loading data from multiple clusters. - For example, a centralized prometheus server that can query multiple clusters. - Or one kubeconfig can define connections to multiple clusters. - """ - - def __init__(self) -> None: - self._prometheus_connectors: dict[Optional[str], PrometheusConnector] = {} - self._connector_errors: set[Exception] = set() - - @abc.abstractmethod - async def list_clusters(self) -> Optional[list[str]]: - pass - - @abc.abstractmethod - async def connect_cluster(self, cluster: str) -> BaseWorkloadLoader: - pass - - def connect_prometheus(self, cluster: Optional[str] = None) -> PrometheusMetricsService: - """ - Connect to a Prometheus server and return a PrometheusConnector instance. - Cluster = None means that prometheus is the only one: either centralized or in-cluster. - """ - - if cluster not in self._prometheus_connectors: - self._prometheus_connectors[cluster] = PrometheusConnector(cluster=cluster) - - return self._prometheus_connectors[cluster] diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py b/robusta_krr/core/integrations/prometheus/cluster_loader/__init__.py index 6796d20..c073ab5 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py +++ b/robusta_krr/core/integrations/prometheus/cluster_loader/__init__.py @@ -1,16 +1,20 @@ +from __future__ import annotations + import asyncio import itertools import logging from collections import Counter -from pyparsing import Optional - +from typing import Optional +from functools import cache from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector from robusta_krr.core.models.config import settings from robusta_krr.core.models.objects import K8sWorkload -from ..base import BaseWorkloadLoader, BaseClusterLoader +from robusta_krr.core.abstract.workload_loader import BaseWorkloadLoader +from robusta_krr.core.abstract.cluster_loader import BaseClusterLoader +from robusta_krr.core.models.exceptions import CriticalRunnerException from .loaders import BaseKindLoader, DoubleParentLoader, SimpleParentLoader @@ -21,20 +25,34 @@ class PrometheusClusterLoader(BaseClusterLoader): # NOTE: For PrometheusClusterLoader we have to first connect to Prometheus, as we query all data from it def __init__(self) -> None: - super().__init__() - self.prometheus_connector = super().connect_prometheus() - - async def list_clusters(self) -> list[str]: - return [] - - async def connect_cluster(self, cluster: str) -> BaseWorkloadLoader: - return PrometheusWorkloadLoader(cluster, self.prometheus_connector) - - def connect_prometheus(self, cluster: Optional[str] = None) -> PrometheusConnector: + self._prometheus_connector = PrometheusConnector() + if not settings.prometheus_url: + raise CriticalRunnerException( + "Prometheus URL is not provided. " + "Can not auto-discover Prometheus with `--mode prometheus`. " + "Please provide the URL with `--prometheus-url` flag." + ) + + self._prometheus_connector.connect(settings.prometheus_url) + + async def list_clusters(self) -> Optional[list[str]]: + if settings.prometheus_cluster_label is None: + return None + + # TODO: We can try to auto-discover clusters by querying Prometheus, + # but for that we will need to rework PrometheusMetric.get_prometheus_cluster_label + return [settings.prometheus_cluster_label] + + @cache + def get_workload_loader(self, cluster: str) -> PrometheusWorkloadLoader: + return PrometheusWorkloadLoader(cluster, self._prometheus_connector) + + def get_prometheus(self, cluster: Optional[str]) -> PrometheusConnector: # NOTE: With prometheus workload loader we can only have one Prometheus provided in parameters # so in case of multiple clusters in one Prometheus (centralized version) # for each cluster we will have the same PrometheusConnector (keyed by None) - return self.prometheus_connector + return self._prometheus_connector + class PrometheusWorkloadLoader(BaseWorkloadLoader): workloads: list[type[BaseKindLoader]] = [DoubleParentLoader, SimpleParentLoader] @@ -56,3 +74,5 @@ class PrometheusWorkloadLoader(BaseWorkloadLoader): logger.info(f"Found {count} {kind} in {self.cluster}") return workloads + +__all__ = ["PrometheusClusterLoader", "PrometheusWorkloadLoader"]
\ No newline at end of file diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/__init__.py index 18b9a3d..18b9a3d 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py +++ b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/__init__.py diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/base.py index 9534261..9534261 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py +++ b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/base.py diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/double_parent.py index e757af4..e757af4 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py +++ b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/double_parent.py diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/simple_parent.py index 233ebb2..233ebb2 100644 --- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py +++ b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/simple_parent.py diff --git a/robusta_krr/core/integrations/prometheus/connector.py b/robusta_krr/core/integrations/prometheus/connector.py index 399209a..d75050f 100644 --- a/robusta_krr/core/integrations/prometheus/connector.py +++ b/robusta_krr/core/integrations/prometheus/connector.py @@ -5,7 +5,6 @@ import logging from concurrent.futures import ThreadPoolExecutor from typing import TYPE_CHECKING, Optional -from kubernetes import config as k8s_config from kubernetes.client.api_client import ApiClient from kubernetes.client.exceptions import ApiException from prometrix import MetricsNotFound, PrometheusNotFound @@ -35,58 +34,56 @@ class PrometheusConnector: self.executor = ThreadPoolExecutor(settings.max_workers) self.cluster = cluster - self.api_client = settings.get_kube_client(context=cluster) - loader = self.get_metrics_service(api_client=self.api_client, cluster=cluster) - if loader is None: - raise PrometheusNotFound( - f"Wasn't able to connect to any Prometheus service in {cluster or 'inner'} cluster\n" - "Try using port-forwarding and/or setting the url manually (using the -p flag.).\n" - "For more information, see 'Giving the Explicit Prometheus URL' at " - "https://github.com/robusta-dev/krr?tab=readme-ov-file#usage" - ) - - self.loader = loader - - logger.info(f"{self.loader.name()} connected successfully for {cluster or 'inner'} cluster") - - @classmethod - def discover(cls, api_client: ApiClient) -> PrometheusConnector: - return cls() - @classmethod - def connect(cls, cluster: str) -> PrometheusConnector: - return cls(cluster=cluster) - - def get_metrics_service( - self, - api_client: Optional[ApiClient] = None, - cluster: Optional[str] = None, - ) -> Optional[PrometheusMetricsService]: - if settings.prometheus_url is not None: - logger.info("Prometheus URL is specified, will not auto-detect a metrics service") - metrics_to_check = [PrometheusMetricsService] - else: - logger.info("No Prometheus URL is specified, trying to auto-detect a metrics service") - metrics_to_check = [VictoriaMetricsService, ThanosMetricsService, MimirMetricsService, PrometheusMetricsService] + def discover(self, api_client: ApiClient) -> None: + """Try to automatically discover a Prometheus service.""" + metrics_to_check: list[PrometheusMetricsService] = [ + VictoriaMetricsService, + ThanosMetricsService, + MimirMetricsService, + PrometheusMetricsService, + ] for metric_service_class in metrics_to_check: - service_name = metric_service_class.name() + logger.info(f"Trying to find {metric_service_class.name()}{self._for_cluster_postfix}") try: - loader = metric_service_class(api_client=api_client, cluster=cluster, executor=self.executor) - loader.check_connection() - except MetricsNotFound as e: - logger.info(f"{service_name} not found: {e}") - except ApiException as e: - logger.warning( - f"Unable to automatically discover a {service_name} in the cluster ({e}). " - "Try specifying how to connect to Prometheus via cli options" - ) + loader = metric_service_class.discover(api_client=api_client) + self._connect(loader) + except Exception: + logger.info(f"Wasn't able to find {metric_service_class.name()}{self._for_cluster_postfix}") else: - logger.info(f"{service_name} found") - loader.validate_cluster_name() - return loader + return + + raise PrometheusNotFound - return None + def connect(self, url: Optional[str] = None) -> None: + """Connect to a Prometheus service using a URL.""" + try: + loader = PrometheusMetricsService(url=url) + self._connect(loader) + except Exception as e: + logger.warning(f"Unable to connect to Prometheus using the provided URL ({e})") + raise e + else: + logger.info(f"{loader.name()} connected successfully") + + def _connect(self, loader: PrometheusMetricsService) -> None: + service_name = loader.name() + try: + loader.check_connection() + except MetricsNotFound as e: + logger.info(f"{service_name} not found: {e}") + raise PrometheusNotFound(f"Wasn't able to connect to {service_name}" + self._for_cluster_postfix) + except ApiException as e: + logger.warning( + f"Unable to automatically discover a {service_name}{self._for_cluster_postfix} ({e}). " + "Try specifying how to connect to Prometheus via cli options" + ) + raise e + else: + logger.info(f"{service_name} found") + loader.validate_cluster_name() + self.loader = loader async def get_history_range( self, history_duration: datetime.timedelta @@ -125,3 +122,8 @@ class PrometheusConnector: MetricLoader.__name__: await self.loader.gather_data(object, MetricLoader, period, step) for MetricLoader in strategy.metrics } + + @property + def _for_cluster_postfix(self) -> str: + """The string postfix to be used in logging messages.""" + return f" for {self.cluster} cluster" if self.cluster else "" diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/mimir_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/mimir_metrics_service.py index ea3af57..e11d705 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/mimir_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/mimir_metrics_service.py @@ -7,6 +7,7 @@ from robusta_krr.utils.service_discovery import MetricsServiceDiscovery from .prometheus_metrics_service import PrometheusMetricsService + class MimirMetricsDiscovery(MetricsServiceDiscovery): def find_metrics_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: """ diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py index 4290e64..08fd83f 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py @@ -1,8 +1,11 @@ +from __future__ import annotations + import asyncio import logging from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta from typing import Iterable, List, Optional +from typing_extensions import Self from kubernetes.client import ApiClient from prometheus_api_client import PrometheusApiClientException @@ -59,16 +62,18 @@ class PrometheusMetricsService(MetricsService): url: str, cluster: Optional[str] = None, executor: Optional[ThreadPoolExecutor] = None, + api_client: Optional[ApiClient] = None, ) -> None: self.url = url + self.url_postfix self.cluster = cluster self.executor = executor or ThreadPoolExecutor(settings.max_workers) - - logger.info(f"Trying to connect to {self.name()}" + self._for_cluster_postfix) + self.api_client = api_client self.auth_header = settings.prometheus_auth_header self.ssl_enabled = settings.prometheus_ssl_enabled + logger.info(f"Using {self.name()} at {self.url}" + self._for_cluster_postfix) + if settings.openshift: logging.info("Openshift flag is set, trying to load token from service account.") openshift_token = openshift.load_token() @@ -79,20 +84,6 @@ class PrometheusMetricsService(MetricsService): else: logging.warning("Openshift token is not found, trying to connect without it.") - self.prometheus_discovery = self.service_discovery(api_client=self.api_client) - - self.url = settings.prometheus_url - self.url = self.url or self.prometheus_discovery.find_metrics_url() - - if not self.url: - raise PrometheusNotFound( - f"{self.name()} instance could not be found while scanning" + self._for_cluster_postfix - ) - - self.url += self.url_postfix - - logger.info(f"Using {self.name()} at {self.url}" + self._for_cluster_postfix) - headers = settings.prometheus_other_headers headers |= self.additional_headers @@ -100,9 +91,23 @@ class PrometheusMetricsService(MetricsService): headers |= {"Authorization": self.auth_header} elif not settings.inside_cluster and self.api_client is not None: self.api_client.update_params_for_auth(headers, {}, ["BearerToken"]) + self.prom_config = generate_prometheus_config(url=self.url, headers=headers, metrics_service=self) self.prometheus = get_custom_prometheus_connect(self.prom_config) + @classmethod + def discover( + cls, + api_client: ApiClient, + cluster: Optional[str] = None, + executor: Optional[ThreadPoolExecutor] = None, + ) -> Self: + url = cls.service_discovery(api_client=api_client).find_metrics_url() + if not url: + raise PrometheusNotFound(f"{cls.name()} instance could not be found while scanning") + + return cls(url, cluster, executor, api_client) + def check_connection(self): """ Checks the connection to Prometheus. @@ -249,7 +254,9 @@ class PrometheusMetricsService(MetricsService): }}[{period_literal}] """ ) - pod_owners = {repl_controller["metric"]["replicationcontroller"] for repl_controller in replication_controllers} + pod_owners = { + repl_controller["metric"]["replicationcontroller"] for repl_controller in replication_controllers + } pod_owner_kind = "ReplicationController" del replication_controllers @@ -314,4 +321,4 @@ class PrometheusMetricsService(MetricsService): @property def _for_cluster_postfix(self) -> str: """The string postfix to be used in logging messages.""" - return (f" for {self.cluster} cluster" if self.cluster else "") + return f" for {self.cluster} cluster" if self.cluster else "" diff --git a/robusta_krr/core/models/config.py b/robusta_krr/core/models/config.py index 4b337c7..4f8b506 100644 --- a/robusta_krr/core/models/config.py +++ b/robusta_krr/core/models/config.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +from enum import Enum import sys from typing import Any, Literal, Optional, Union @@ -12,19 +13,24 @@ from rich.logging import RichHandler from robusta_krr.core.abstract import formatters from robusta_krr.core.abstract.strategies import AnyStrategy, BaseStrategy -from robusta_krr.core.integrations.kubernetes.workload_loader.base import BaseClusterLoader +from robusta_krr.core.abstract.cluster_loader import BaseClusterLoader from robusta_krr.core.models.objects import KindLiteral logger = logging.getLogger("krr") +class LoadingMode(str, Enum): + KUBEAPI = "kubeapi" + PROMETHETUS = "prometheus" + + class Config(pd.BaseSettings): quiet: bool = pd.Field(False) verbose: bool = pd.Field(False) clusters: Union[list[str], Literal["*"], None] = None kubeconfig: Optional[str] = None - workload_loader: Literal["kubeapi", "prometheus"] = pd.Field("kubeapi") + mode: LoadingMode = pd.Field(LoadingMode.KUBEAPI) impersonate_user: Optional[str] = None impersonate_group: Optional[str] = None namespaces: Union[list[str], Literal["*"]] = pd.Field("*") @@ -138,19 +144,18 @@ class Config(pd.BaseSettings): self._logging_console = Console(file=sys.stderr if self.log_to_stderr else sys.stdout, width=self.width) return self._logging_console - @property - def cluster_loader(self) -> BaseClusterLoader: - from robusta_krr.core.integrations.kubernetes.workload_loader import ( - KubeAPIClusterLoader, - PrometheusClusterLoader, - ) + def create_cluster_loader(self) -> BaseClusterLoader: + from robusta_krr.core.integrations.prometheus.cluster_loader import PrometheusClusterLoader + from robusta_krr.core.integrations.kubernetes.cluster_loader import KubeAPIClusterLoader - if settings.workload_loader == "kubeapi": + if settings.mode == LoadingMode.KUBEAPI: + logger.info("Connecting using Kubernetes API, will load the kubeconfig.") return KubeAPIClusterLoader() - elif settings.workload_loader == "prometheus": + elif settings.mode == LoadingMode.PROMETHETUS: + logger.info("Connecting using Prometheus, will load the kubeconfig.") return PrometheusClusterLoader() else: - raise NotImplementedError(f"Workload loader {settings.workload_loader} is not implemented") + raise NotImplementedError(f"Workload loader {settings.mode} is not implemented") def load_kubeconfig(self) -> None: try: diff --git a/robusta_krr/core/models/exceptions.py b/robusta_krr/core/models/exceptions.py new file mode 100644 index 0000000..70f3b05 --- /dev/null +++ b/robusta_krr/core/models/exceptions.py @@ -0,0 +1,2 @@ +class CriticalRunnerException(Exception): + """This exception will be raised when a critical error occurs in the runner and the runner cannot continue.""" diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index bb109df..75c2191 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -1,4 +1,5 @@ import asyncio +import itertools import logging import math import os @@ -8,12 +9,13 @@ from typing import Optional, Union from datetime import timedelta from rich.console import Console from slack_sdk import WebClient +from prometrix import PrometheusNotFound from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult -from robusta_krr.core.integrations.kubernetes.workload_loader import IListPodsFallback -from robusta_krr.core.integrations.kubernetes import ClusterConnector +from robusta_krr.core.abstract.workload_loader import IListPodsFallback from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException from robusta_krr.core.models.config import settings +from robusta_krr.core.models.exceptions import CriticalRunnerException from robusta_krr.core.models.objects import K8sWorkload from robusta_krr.core.models.result import ResourceAllocations, ResourceScan, ResourceType, Result, StrategyData from robusta_krr.utils.intro import load_intro_message @@ -33,12 +35,8 @@ def custom_print(*objects, rich: bool = True, force: bool = False) -> None: print_func(*objects) # type: ignore -class CriticalRunnerException(Exception): ... - - class Runner: def __init__(self) -> None: - self.connector = ClusterConnector() self.strategy = settings.create_strategy() self.errors: list[dict] = [] @@ -144,16 +142,18 @@ class Runner: } async def _calculate_object_recommendations(self, object: K8sWorkload) -> Optional[RunResult]: - prometheus_loader = self.connector.get_prometheus(object.cluster) + prometheus = self.connector.get_prometheus(object.cluster) - if prometheus_loader is None: + if prometheus is None: return None - object.pods = await prometheus_loader.load_pods(object, self.strategy.settings.history_timedelta) - if object.pods == [] and isinstance(self.connector, IListPodsFallback): + cluster_loader = self.connector.get_workload_loader(object.cluster) + + object.pods = await prometheus.load_pods(object, self.strategy.settings.history_timedelta) + if object.pods == [] and isinstance(cluster_loader, IListPodsFallback): # Fallback to IListPodsFallback if Prometheus did not return any pods # IListPodsFallback is implemented by the Kubernetes API connector - object.pods = await self.connector.load_pods(object) + object.pods = await cluster_loader.load_pods(object) # NOTE: Kubernetes API returned pods, but Prometheus did not # This might happen with fast executing jobs @@ -164,7 +164,7 @@ class Runner: "Loaded pods from Kubernetes API instead." ) - metrics = await prometheus_loader.gather_data( + metrics = await prometheus.gather_data( object, self.strategy, self.strategy.settings.history_timedelta, @@ -179,10 +179,18 @@ class Runner: logger.info(f"Calculated recommendations for {object} (using {len(metrics)} metrics)") return self._format_result(result) - async def _check_data_availability(self, cluster: Optional[str]) -> None: - prometheus_loader = self.connector.get_prometheus(cluster) - if prometheus_loader is None: - return + async def _check_cluster(self, cluster: Optional[str]) -> bool: + try: + prometheus_loader = self.connector.get_prometheus(cluster) + except PrometheusNotFound: + logger.error( + f"Wasn't able to connect to any Prometheus service" + f' for cluster {cluster}' if cluster is not None else "" + "\nTry using port-forwarding and/or setting the url manually (using the -p flag.).\n" + "For more information, see 'Giving the Explicit Prometheus URL' at " + "https://github.com/robusta-dev/krr?tab=readme-ov-file#usage" + ) + return False try: history_range = await prometheus_loader.get_history_range(timedelta(hours=5)) @@ -195,7 +203,7 @@ class Runner: "name": "HistoryRangeError", } ) - return + return True # We can try to continue without history range logger.debug(f"History range for {cluster}: {history_range}") enough_data = self.strategy.settings.history_range_enough(history_range) @@ -218,6 +226,8 @@ class Runner: } ) + return True + async def _gather_object_allocations(self, k8s_object: K8sWorkload) -> Optional[ResourceScan]: recommendation = await self._calculate_object_recommendations(k8s_object) @@ -248,14 +258,36 @@ class Runner: logger.info(f'Using clusters: {clusters if clusters is not None else "inner cluster"}') + # This is for code clarity. All functions take str | None as cluster parameter if clusters is None: - await self._check_data_availability(None) - else: - await asyncio.gather(*[self._check_data_availability(cluster) for cluster in clusters]) + clusters = [None] + + checks = await asyncio.gather(*[self._check_cluster(cluster) for cluster in clusters]) + clusters = [cluster for cluster, check in zip(clusters, checks) if check] + + if clusters == []: + raise CriticalRunnerException("No clusters available to scan.") + + workload_loaders = {cluster: self.connector.try_get_workload_loader(cluster) for cluster in clusters} + + # NOTE: we filter out None values as they are clusters that we could not connect to + workload_loaders = {cluster: loader for cluster, loader in workload_loaders.items() if loader is not None} + + if workload_loaders == {}: + raise CriticalRunnerException("Could not connect to any cluster.") with ProgressBar(title="Calculating Recommendation") as self.__progressbar: - workloads = await self.connector.list_workloads(clusters) + # We gather all workloads from all clusters in parallel (asyncio.gather) + # Then we chain all workloads together (itertools.chain) + workloads = list( + itertools.chain(*await asyncio.gather(*[loader.list_workloads() for loader in workload_loaders.values()])) + ) + # Then we gather all recommendations for all workloads in parallel (asyncio.gather) scans = await asyncio.gather(*[self._gather_object_allocations(k8s_object) for k8s_object in workloads]) + # NOTE: Previously we were streaming workloads to + # calculate recommendations as soon as they were available (not waiting for all workloads to be loaded), + # but it gave minor performance improvements (most of the time was spent on calculating recommendations) + # So we decided to do those two steps sequentially to simplify the code successful_scans = [scan for scan in scans if scan is not None] @@ -284,13 +316,6 @@ class Runner: await self._greet() try: - settings.load_kubeconfig() - except Exception as e: - logger.error(f"Could not load kubernetes configuration: {e}") - logger.error("Try to explicitly set --context and/or --kubeconfig flags.") - return 1 # Exit with error - - try: # eks has a lower step limit than other types of prometheus, it will throw an error step_count = self.strategy.settings.history_duration * 60 / self.strategy.settings.timeframe_duration if settings.eks_managed_prom and step_count > 11000: @@ -301,6 +326,8 @@ class Runner: ) self.strategy.settings.timeframe_duration = min_step + self.connector = settings.create_cluster_loader() + result = await self._collect_result() logger.info("Result collected, displaying...") self._process_result(result) diff --git a/robusta_krr/main.py b/robusta_krr/main.py index 810ccd1..6fc0f1f 100644 --- a/robusta_krr/main.py +++ b/robusta_krr/main.py @@ -15,7 +15,7 @@ from typer.models import OptionInfo from robusta_krr import formatters as concrete_formatters # noqa: F401 from robusta_krr.core.abstract import formatters from robusta_krr.core.abstract.strategies import BaseStrategy -from robusta_krr.core.models.config import Config +from robusta_krr.core.models.config import Config, LoadingMode from robusta_krr.core.runner import Runner from robusta_krr.utils.version import get_version @@ -55,10 +55,12 @@ def load_commands() -> None: help="Path to kubeconfig file. If not provided, will attempt to find it.", rich_help_panel="Kubernetes Settings", ), - workload_loader: str = typer.Option( - "kubeapi", - "--workload", - help="Workload loader to use (kubeapi, prometheus).", + mode: LoadingMode = typer.Option( + LoadingMode.KUBEAPI, + "--mode", + "-m", + help="Loading mode. KubeAPI mode requires a kubeconfig and supports auto-discovery. Prometheus mode requires to pass a --prometheus-url.", + case_sensitive=False, rich_help_panel="Kubernetes Settings", ), impersonate_user: Optional[str] = typer.Option( @@ -138,7 +140,7 @@ def load_commands() -> None: None, "--prometheus-cluster-label", "-l", - help="The label in prometheus for your cluster.(Only relevant for centralized prometheus)", + help="The label in prometheus for your cluster. (Only relevant for centralized prometheus)", rich_help_panel="Prometheus Settings", ), prometheus_label: str = typer.Option( @@ -256,7 +258,7 @@ def load_commands() -> None: try: config = Config( kubeconfig=kubeconfig, - workload_loader=workload_loader, + mode=mode, impersonate_user=impersonate_user, impersonate_group=impersonate_group, clusters="*" if all_clusters else clusters, |
