From 7e8f1f42f34f5c4ff2824ced92dc37f9afb2ba10 Mon Sep 17 00:00:00 2001
From: LeaveMyYard
Date: Mon, 29 Apr 2024 23:18:01 +0300
Subject: BaseClusterLoader, class structure change, not finished

---
 .../core/integrations/kubernetes/__init__.py       | 94 +++++-----------------
 .../kubernetes/workload_loader/__init__.py         | 16 +++-
 .../kubernetes/workload_loader/base.py             | 41 +++++++++-
 .../workload_loader/kube_api/__init__.py           | 57 +++++++++++--
 .../workload_loader/prometheus/__init__.py         | 29 ++++++-
 .../workload_loader/prometheus/loaders/base.py     | 10 +--
 .../prometheus/loaders/double_parent.py            |  6 +-
 .../prometheus/loaders/simple_parent.py            |  6 +-
 .../core/integrations/prometheus/connector.py      |  8 ++
 .../metrics_service/base_metric_service.py         | 26 ++----
 .../metrics_service/prometheus_metrics_service.py  | 18 +++--
 robusta_krr/core/models/config.py                  | 19 ++++-
 12 files changed, 206 insertions(+), 124 deletions(-)

diff --git a/robusta_krr/core/integrations/kubernetes/__init__.py b/robusta_krr/core/integrations/kubernetes/__init__.py
index 6af1733..403768c 100644
--- a/robusta_krr/core/integrations/kubernetes/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/__init__.py
@@ -25,7 +25,13 @@ from robusta_krr.utils.object_like_dict import ObjectLikeDict
 from prometrix import PrometheusNotFound
 
 from . import config_patch as _
-from .workload_loader import BaseWorkloadLoader, KubeAPIWorkloadLoader, PrometheusWorkloadLoader
+from .workload_loader import (
+    BaseWorkloadLoader,
+    PrometheusWorkloadLoader,
+    BaseClusterLoader,
+    KubeAPIClusterLoader,
+    PrometheusClusterLoader,
+)
 
 logger = logging.getLogger("krr")
 
@@ -40,81 +46,23 @@ class ClusterConnector:
         self._prometheus_connectors: dict[Optional[str], Union[PrometheusConnector, Exception]] = {}
         self._connector_errors: set[Exception] = set()
 
-    async def list_clusters(self) -> Optional[list[str]]:
-        """List all clusters.
-
-        Returns:
-            A list of clusters.
-        """
-
-        if settings.inside_cluster:
-            logger.debug("Working inside the cluster")
-            return None
-
-        try:
-            contexts, current_context = config.list_kube_config_contexts(settings.kubeconfig)
-        except config.ConfigException:
-            if settings.clusters is not None and settings.clusters != "*":
-                logger.warning("Could not load context from kubeconfig.")
-                logger.warning(f"Falling back to clusters from CLI: {settings.clusters}")
-                return settings.clusters
-            else:
-                logger.error(
-                    "Could not load context from kubeconfig. "
-                    "Please check your kubeconfig file or pass -c flag with the context name."
-                )
-                return None
-
-        logger.debug(f"Found {len(contexts)} clusters: {', '.join([context['name'] for context in contexts])}")
-        logger.debug(f"Current cluster: {current_context['name']}")
-
-        logger.debug(f"Configured clusters: {settings.clusters}")
-
-        # None, empty means current cluster
-        if not settings.clusters:
-            return [current_context["name"]]
-
-        # * means all clusters
-        if settings.clusters == "*":
-            return [context["name"] for context in contexts]
-
-        return [context["name"] for context in contexts if context["name"] in settings.clusters]
-
     def get_prometheus(self, cluster: Optional[str]) -> Optional[PrometheusConnector]:
-        if cluster not in self._prometheus_connectors:
-            try:
-                logger.debug(f"Creating Prometheus connector for cluster {cluster}")
-                self._prometheus_connectors[cluster] = PrometheusConnector(cluster=cluster)
-            except Exception as e:
-                self._prometheus_connectors[cluster] = e
-
-        result = self._prometheus_connectors[cluster]
-        if isinstance(result, self.EXPECTED_EXCEPTIONS):
-            if result not in self._connector_errors:
-                self._connector_errors.add(result)
-                logger.error(str(result))
-            return None
-        elif isinstance(result, Exception):
-            raise result
-        return result
-
-    def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[BaseWorkloadLoader]:
+        if settings.workload_loader == "kubeapi":
+            logger.debug(f"Creating Prometheus connector for cluster {cluster}")
+        elif settings.workload_loader == "prometheus":
+            logger.debug("Creating Prometheus connector")
+            # NOTE: With the Prometheus workload loader only one Prometheus can be provided
+            # in the parameters, so when multiple clusters live in one Prometheus (the
+            # centralized setup) every cluster shares the same PrometheusConnector (keyed by None).
+            cluster = None
+
+        # Assumed completion (this method is left unfinished in this commit): cache and
+        # reuse connectors per cluster, mirroring BaseClusterLoader.connect_prometheus.
+        if cluster not in self._prometheus_connectors:
+            self._prometheus_connectors[cluster] = PrometheusConnector(cluster=cluster)
+        return self._prometheus_connectors[cluster]
+
+    def _create_cluster_loader(self) -> Optional[BaseClusterLoader]:
         try:
-            if settings.workload_loader == "kubeapi":
-                return KubeAPIWorkloadLoader(cluster=cluster)
-            elif settings.workload_loader == "prometheus":
-                cluster_loader = self.get_prometheus(cluster)
-                if cluster_loader is not None:
-                    return PrometheusWorkloadLoader(cluster=cluster, metric_loader=cluster_loader)
-                else:
-                    logger.error(
-                        f"Could not load Prometheus for cluster {cluster} and will skip it."
-                        "Not possible to load workloads through Prometheus without connection to Prometheus."
-                    )
-            else:
-                raise NotImplementedError(f"Workload loader {settings.workload_loader} is not implemented")
+            # Assumed body (left empty in this unfinished commit): delegate to the
+            # loader factory that this commit adds to Config (see config.py below).
+            return settings.cluster_loader
         except Exception as e:
-            logger.error(f"Could not load cluster {cluster} and will skip it: {e}")
+            logger.error(f"Could not connect to cluster loader and will skip it: {e}")
             return None

diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py
index 06dd8c0..2cad0cc 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py
@@ -1,5 +1,13 @@
-from .base import BaseWorkloadLoader, IListPodsFallback
-from .kube_api import KubeAPIWorkloadLoader
-from .prometheus import PrometheusWorkloadLoader
+from .base import BaseWorkloadLoader, IListPodsFallback, BaseClusterLoader
+from .kube_api import KubeAPIWorkloadLoader, KubeAPIClusterLoader
+from .prometheus import PrometheusWorkloadLoader, PrometheusClusterLoader
 
-__all__ = ["BaseWorkloadLoader", "IListPodsFallback", "KubeAPIWorkloadLoader", "PrometheusWorkloadLoader"]
\ No newline at end of file
+__all__ = [
+    "BaseWorkloadLoader",
+    "IListPodsFallback",
+    "KubeAPIWorkloadLoader",
+    "PrometheusWorkloadLoader",
+    "BaseClusterLoader",
+    "KubeAPIClusterLoader",
+    "PrometheusClusterLoader",
+]
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/base.py
index 11a2783..316bcbd 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/base.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/base.py
@@ -1,9 +1,17 @@
 import abc
+import logging
+
+from typing import Optional
 
+from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector
 from robusta_krr.core.models.objects import K8sWorkload, PodData
 
+logger = logging.getLogger("krr")
+
+
 class BaseWorkloadLoader(abc.ABC):
-    """A base class for workload loaders."""
+    """A base class for single-cluster workload loaders."""
 
     @abc.abstractmethod
     async def list_workloads(self) -> list[K8sWorkload]:
@@ -16,3 +24,34 @@ class IListPodsFallback(abc.ABC):
     @abc.abstractmethod
     async def list_pods(self, object: K8sWorkload) -> list[PodData]:
         pass
+
+
+class BaseClusterLoader(abc.ABC):
+    """
+    A class that wraps loading data from multiple clusters:
+    for example, a centralized Prometheus server that can query multiple clusters,
+    or a single kubeconfig that defines connections to multiple clusters.
+    """
+
+    def __init__(self) -> None:
+        self._prometheus_connectors: dict[Optional[str], PrometheusConnector] = {}
+        self._connector_errors: set[Exception] = set()
+
+    @abc.abstractmethod
+    async def list_clusters(self) -> Optional[list[str]]:
+        pass
+
+    @abc.abstractmethod
+    async def connect_cluster(self, cluster: str) -> BaseWorkloadLoader:
+        pass
+
+    def connect_prometheus(self, cluster: Optional[str] = None) -> PrometheusConnector:
+        """
+        Connect to a Prometheus server and return a PrometheusConnector instance.
+        cluster=None means there is only one Prometheus: either centralized or in-cluster.
+        """
+
+        if cluster not in self._prometheus_connectors:
+            self._prometheus_connectors[cluster] = PrometheusConnector(cluster=cluster)
+
+        return self._prometheus_connectors[cluster]
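
The new hierarchy splits responsibilities: a BaseClusterLoader discovers clusters and hands out one BaseWorkloadLoader per cluster, plus cached PrometheusConnector instances. To make the intended control flow easier to review, here is a minimal sketch of how a caller would drive it (the scan_all wrapper is illustrative, not code from this commit, and it assumes settings and the kubeconfig were loaded beforehand):

    import asyncio

    from robusta_krr.core.integrations.kubernetes.workload_loader import KubeAPIClusterLoader


    async def scan_all() -> None:
        cluster_loader = KubeAPIClusterLoader()

        # list_clusters() returns None when running inside a cluster,
        # meaning "only the current cluster is available".
        clusters = await cluster_loader.list_clusters()

        for cluster in clusters or [None]:
            # NOTE: in the in-cluster case None is passed through, even though the
            # abstract signature says str; one of the loose ends of this unfinished change.
            workload_loader = await cluster_loader.connect_cluster(cluster)
            workloads = await workload_loader.list_workloads()
            print(f"{cluster or 'in-cluster'}: {len(workloads)} workloads")


    asyncio.run(scan_all())
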
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py
index a7470d7..19cd108 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py
@@ -1,10 +1,12 @@
+from __future__ import annotations
+
 import asyncio
 import logging
 from collections import defaultdict
 from concurrent.futures import ThreadPoolExecutor
 from typing import Any, Awaitable, Callable, Optional
 
-from kubernetes import client  # type: ignore
+from kubernetes import client, config  # type: ignore
 from kubernetes.client import ApiException  # type: ignore
 from kubernetes.client.models import V1Container, V2HorizontalPodAutoscaler  # type: ignore
 
@@ -12,7 +14,7 @@ from robusta_krr.core.models.config import settings
 from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData
 from robusta_krr.core.models.result import ResourceAllocations
 
-from ..base import BaseWorkloadLoader, IListPodsFallback
+from ..base import BaseWorkloadLoader, IListPodsFallback, BaseClusterLoader
 from .loaders import (
     BaseKindLoader,
     CronJobLoader,
@@ -29,8 +31,53 @@ logger = logging.getLogger("krr")
 
 HPAKey = tuple[str, str, str]
 
 
+class KubeAPIClusterLoader(BaseClusterLoader):
+    # NOTE: KubeAPIClusterLoader has to connect first to read the kubeconfig.
+    # It does not need to connect to Prometheus from here, as all data is queried
+    # from the Kubernetes API. Different clusters might also have different
+    # Prometheus instances.
+
+    def __init__(self) -> None:
+        super().__init__()  # initialize the connector cache from BaseClusterLoader
+        self.api_client = settings.get_kube_client()
+
+    async def list_clusters(self) -> Optional[list[str]]:
+        if settings.inside_cluster:
+            logger.debug("Working inside the cluster")
+            return None
+
+        try:
+            contexts, current_context = config.list_kube_config_contexts(settings.kubeconfig)
+        except config.ConfigException:
+            if settings.clusters is not None and settings.clusters != "*":
+                logger.warning("Could not load context from kubeconfig.")
+                logger.warning(f"Falling back to clusters from CLI: {settings.clusters}")
+                return settings.clusters
+            else:
+                logger.error(
+                    "Could not load context from kubeconfig. "
+                    "Please check your kubeconfig file or pass the -c flag with the context name."
+                )
+                return None
+
+        logger.debug(f"Found {len(contexts)} clusters: {', '.join([context['name'] for context in contexts])}")
+        logger.debug(f"Current cluster: {current_context['name']}")
+        logger.debug(f"Configured clusters: {settings.clusters}")
+
+        # None or an empty value means the current cluster
+        if not settings.clusters:
+            return [current_context["name"]]
+
+        # * means all clusters
+        if settings.clusters == "*":
+            return [context["name"] for context in contexts]
+
+        return [context["name"] for context in contexts if context["name"] in settings.clusters]
+
+    async def connect_cluster(self, cluster: str) -> KubeAPIWorkloadLoader:
+        return KubeAPIWorkloadLoader(cluster)
+
+
 class KubeAPIWorkloadLoader(BaseWorkloadLoader, IListPodsFallback):
-    workload_loaders: list[BaseKindLoader] = [
+    kind_loaders: list[BaseKindLoader] = [
         DeploymentLoader,
         RolloutLoader,
         DeploymentConfigLoader,
@@ -40,7 +87,7 @@ class KubeAPIWorkloadLoader(BaseWorkloadLoader, IListPodsFallback):
         CronJobLoader,
     ]
 
-    def __init__(self, cluster: Optional[str] = None) -> None:
+    def __init__(self, cluster: Optional[str]) -> None:
         self.cluster = cluster
 
         # This executor will be running requests to Kubernetes API
@@ -53,7 +100,7 @@ class KubeAPIWorkloadLoader(BaseWorkloadLoader, IListPodsFallback):
         self._kind_available: defaultdict[KindLiteral, bool] = defaultdict(lambda: True)
         self._hpa_list: dict[HPAKey, HPAData] = {}
         self._workload_loaders: dict[KindLiteral, BaseKindLoader] = {
-            loader.kind: loader(self.api_client, self.executor) for loader in self.workload_loaders
+            loader.kind: loader(self.api_client, self.executor) for loader in self.kind_loaders
         }
 
     async def list_workloads(self) -> list[K8sWorkload]:
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py
index bfe6853..6796d20 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py
@@ -4,24 +4,45 @@ import logging
 
 from collections import Counter
 
+from typing import Optional
+
 from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector
 from robusta_krr.core.models.config import settings
 from robusta_krr.core.models.objects import K8sWorkload
 
-from ..base import BaseWorkloadLoader
+from ..base import BaseWorkloadLoader, BaseClusterLoader
 from .loaders import BaseKindLoader, DoubleParentLoader, SimpleParentLoader
 
 logger = logging.getLogger("krr")
 
 
+class PrometheusClusterLoader(BaseClusterLoader):
+    # NOTE: PrometheusClusterLoader has to connect to Prometheus first, as all data is queried from it.
+
+    def __init__(self) -> None:
+        super().__init__()
+        self.prometheus_connector = super().connect_prometheus()
+
+    async def list_clusters(self) -> list[str]:
+        return []
+
+    async def connect_cluster(self, cluster: str) -> BaseWorkloadLoader:
+        return PrometheusWorkloadLoader(cluster, self.prometheus_connector)
+
+    def connect_prometheus(self, cluster: Optional[str] = None) -> PrometheusConnector:
+        # NOTE: The Prometheus workload loader can only have one Prometheus provided in its
+        # parameters, so when one Prometheus serves multiple clusters (the centralized setup)
+        # every cluster gets the same PrometheusConnector (keyed by None).
+        return self.prometheus_connector
+
+
 class PrometheusWorkloadLoader(BaseWorkloadLoader):
     workloads: list[type[BaseKindLoader]] = [DoubleParentLoader, SimpleParentLoader]
 
-    def __init__(self, cluster: str, metric_loader: PrometheusConnector) -> None:
+    def __init__(self, cluster: str, prometheus_connector: PrometheusConnector) -> None:
         self.cluster = cluster
-        self.metric_service = metric_loader
-        self.loaders = [loader(metric_loader) for loader in self.workloads]
+        self.metric_service = prometheus_connector
+        self.loaders = [loader(prometheus_connector) for loader in self.workloads]
 
     async def list_workloads(self) -> list[K8sWorkload]:
         workloads = list(
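
The NOTE above is the crux of the centralized design: PrometheusClusterLoader builds exactly one connector in __init__ (keyed by None in the base-class cache) and returns it for every cluster name. A short sketch of the resulting behavior (the cluster names are hypothetical, and constructing the loader assumes settings already carry a reachable Prometheus URL):

    import asyncio

    from robusta_krr.core.integrations.kubernetes.workload_loader import PrometheusClusterLoader


    async def main() -> None:
        loader = PrometheusClusterLoader()

        # Any cluster name yields the same shared connector, because the loader
        # keys its single centralized connector by None.
        assert loader.connect_prometheus("prod-eu") is loader.connect_prometheus("prod-us")

        # Each cluster still gets its own workload loader, so results stay labeled
        # per cluster, but all queries go through the one shared connector.
        workload_loader = await loader.connect_cluster("prod-eu")
        print(len(await workload_loader.list_workloads()))


    asyncio.run(main())
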
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py
index 3be8fe1..9534261 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py
@@ -37,8 +37,8 @@ class BaseKindLoader(abc.ABC):
 
     kinds: list[KindLiteral] = []
 
-    def __init__(self, connector: PrometheusConnector) -> None:
-        self.connector = connector
+    def __init__(self, prometheus: PrometheusConnector) -> None:
+        self.prometheus = prometheus
         self.cluster_selector = PrometheusMetric.get_prometheus_cluster_label()
 
     @property
@@ -50,7 +50,7 @@ class BaseKindLoader(abc.ABC):
         pass
 
     async def _parse_allocation(self, namespace: str, pods: list[str], container_name: str) -> ResourceAllocations:
-        limits = await self.connector.loader.query(
+        limits = await self.prometheus.loader.query(
             f"""
                 avg by(resource) (
                     kube_pod_container_resource_limits{{
@@ -62,7 +62,7 @@ class BaseKindLoader(abc.ABC):
                 )
             """
         )
-        requests = await self.connector.loader.query(
+        requests = await self.prometheus.loader.query(
             f"""
                 avg by(resource) (
                     kube_pod_container_resource_requests{{
@@ -90,7 +90,7 @@ class BaseKindLoader(abc.ABC):
         return ResourceAllocations(requests=requests_values, limits=limits_values)
 
     async def _list_containers_in_pods(self, pods: list[str]) -> set[str]:
-        containers = await self.connector.loader.query(
+        containers = await self.prometheus.loader.query(
             f"""
                 count by (container) (
                     kube_pod_container_info{{
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py
index ea2353b..e757af4 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py
@@ -53,7 +53,7 @@ class DoubleParentLoader(BaseKindLoader):
         subowner_label = subowner_kind.lower() if subowner_kind != "Job" else "job_name"
 
         # Replica is for ReplicaSet and/or ReplicationController
-        subowners = await self.connector.loader.query(
+        subowners = await self.prometheus.loader.query(
             f"""
                 count by (namespace, owner_name, {subowner_label}, owner_kind) (
                     {metric_name} {{
@@ -91,7 +91,7 @@ class DoubleParentLoader(BaseKindLoader):
     async def _list_pods_of_subowner(
         self, namespace: str, name: str, kind: str, subowner_kind: str, subowner_names: list[str]
     ) -> list[K8sWorkload]:
-        pods = await self.connector.loader.query(
+        pods = await self.prometheus.loader.query(
             f"""
                 count by (namespace, owner_name, owner_kind, pod) (
                     kube_pod_owner{{
@@ -111,7 +111,7 @@ class DoubleParentLoader(BaseKindLoader):
 
         return [
             K8sWorkload(
-                cluster=self.connector.cluster,
+                cluster=self.prometheus.cluster,
                 namespace=namespace,
                 name=name,
                 kind=kind,
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py
index 05b7d74..233ebb2 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py
@@ -22,7 +22,7 @@ class SimpleParentLoader(BaseKindLoader):
             ('namespace=~"' + "|".join(namespaces) + '"') if namespaces != "*" else 'namespace!="kube-system"'
         )
 
-        results = await self.connector.loader.query(
+        results = await self.prometheus.loader.query(
             f"""
                 count by (namespace, owner_name, owner_kind, pod) (
                     kube_pod_owner{{
@@ -46,7 +46,7 @@ class SimpleParentLoader(BaseKindLoader):
         # NOTE: We do not show jobs that are a part of a cronjob, so we filter them out
         job_workloads = [name for (_, name, kind) in workloads if kind == "Job"]
         if job_workloads != []:
-            cronjobs = await self.connector.loader.query(
+            cronjobs = await self.prometheus.loader.query(
                 f"""
                     count by (namespace, job_name) (
                         kube_job_owner{{
@@ -72,7 +72,7 @@ class SimpleParentLoader(BaseKindLoader):
 
         return [
             K8sWorkload(
-                cluster=self.connector.cluster,
+                cluster=self.prometheus.cluster,
                 namespace=namespace,
                 name=name,
                 kind=kind,
diff --git a/robusta_krr/core/integrations/prometheus/connector.py b/robusta_krr/core/integrations/prometheus/connector.py
index ab89b34..399209a 100644
--- a/robusta_krr/core/integrations/prometheus/connector.py
+++ b/robusta_krr/core/integrations/prometheus/connector.py
@@ -49,6 +49,14 @@ class PrometheusConnector:
 
             logger.info(f"{self.loader.name()} connected successfully for {cluster or 'inner'} cluster")
 
+    @classmethod
+    def discover(cls, api_client: ApiClient) -> PrometheusConnector:
+        return cls()
+
+    @classmethod
+    def connect(cls, cluster: str) -> PrometheusConnector:
+        return cls(cluster=cluster)
+
     def get_metrics_service(
         self,
         api_client: Optional[ApiClient] = None,
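
The two classmethods added to PrometheusConnector are still stubs, but they outline two distinct construction paths that mirror the two cluster loaders. A plausible reading of the intended call sites (this branch is an assumption based on the rest of the patch; api_client and cluster_name are stand-ins, not names from this commit):

    from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector
    from robusta_krr.core.models.config import settings

    api_client = settings.get_kube_client()  # per-cluster client in kubeapi mode
    cluster_name = "my-cluster"              # hypothetical name from the cluster loader

    if settings.workload_loader == "kubeapi":
        # Auto-discover a Prometheus through the Kubernetes API client.
        prometheus = PrometheusConnector.discover(api_client)
    else:
        # Use the single, explicitly configured (possibly centralized) Prometheus.
        prometheus = PrometheusConnector.connect(cluster_name)
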
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py
index ae34a7a..9b0c163 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py
@@ -13,28 +13,13 @@ from ..metrics import PrometheusMetric
 
 
 class MetricsService(abc.ABC):
-    def __init__(
-        self,
-        api_client: Optional[ApiClient] = None,
-        cluster: Optional[str] = None,
-        executor: Optional[ThreadPoolExecutor] = None,
-    ) -> None:
-        self.api_client = api_client
-        self.cluster = cluster or "default"
-        self.executor = executor
-
     @abc.abstractmethod
     def check_connection(self):
-        ...
-
-    @classmethod
-    def name(cls) -> str:
-        classname = cls.__name__
-        return classname.replace("MetricsService", "") if classname != MetricsService.__name__ else classname
+        pass
 
     @abc.abstractmethod
     def get_cluster_names(self) -> Optional[List[str]]:
-        ...
+        pass
 
     @abc.abstractmethod
     async def gather_data(
@@ -44,7 +29,12 @@ class MetricsService(abc.ABC):
         period: datetime.timedelta,
         step: datetime.timedelta = datetime.timedelta(minutes=30),
     ) -> PodsTimeData:
-        ...
+        pass
+
+    @classmethod
+    def name(cls) -> str:
+        classname = cls.__name__
+        return classname.replace("MetricsService", "") if classname != MetricsService.__name__ else classname
 
     def get_prometheus_cluster_label(self) -> str:
         """
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
index 50b86a6..4290e64 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
@@ -56,14 +56,15 @@ class PrometheusMetricsService(MetricsService):
 
     def __init__(
         self,
-        *,
+        url: str,
         cluster: Optional[str] = None,
-        api_client: Optional[ApiClient] = None,
         executor: Optional[ThreadPoolExecutor] = None,
     ) -> None:
-        super().__init__(api_client=api_client, cluster=cluster, executor=executor)
+        # NOTE: url_postfix is appended once, further down in __init__,
+        # so it must not be added here as well.
+        self.url = url
+        self.cluster = cluster
+        self.executor = executor or ThreadPoolExecutor(settings.max_workers)
 
-        logger.info(f"Trying to connect to {self.name()} for {self.cluster} cluster")
+        logger.info(f"Trying to connect to {self.name()}" + self._for_cluster_postfix)
 
         self.auth_header = settings.prometheus_auth_header
         self.ssl_enabled = settings.prometheus_ssl_enabled
@@ -85,12 +86,12 @@ class PrometheusMetricsService(MetricsService):
 
         if not self.url:
             raise PrometheusNotFound(
-                f"{self.name()} instance could not be found while scanning in {self.cluster} cluster."
+                f"{self.name()} instance could not be found while scanning" + self._for_cluster_postfix
             )
 
         self.url += self.url_postfix
 
-        logger.info(f"Using {self.name()} at {self.url} for cluster {cluster or 'default'}")
+        logger.info(f"Using {self.name()} at {self.url}" + self._for_cluster_postfix)
 
         headers = settings.prometheus_other_headers
         headers |= self.additional_headers
@@ -309,3 +310,8 @@ class PrometheusMetricsService(MetricsService):
         del pods_status_result
 
         return list({PodData(name=pod, deleted=pod not in current_pods_set) for pod in related_pods})
+
+    @property
+    def _for_cluster_postfix(self) -> str:
+        """The string postfix to be used in logging messages."""
+        return f" for {self.cluster} cluster" if self.cluster else ""
diff --git a/robusta_krr/core/models/config.py b/robusta_krr/core/models/config.py
index 6e03967..4b337c7 100644
--- a/robusta_krr/core/models/config.py
+++ b/robusta_krr/core/models/config.py
@@ -5,13 +5,14 @@ import sys
-from typing import Any, Literal, Optional, Union
+from typing import TYPE_CHECKING, Any, Literal, Optional, Union
 
 import pydantic as pd
-from kubernetes import config
+from kubernetes import config, client
 from kubernetes.config.config_exception import ConfigException
 from rich.console import Console
 from rich.logging import RichHandler
 
 from robusta_krr.core.abstract import formatters
 from robusta_krr.core.abstract.strategies import AnyStrategy, BaseStrategy
 from robusta_krr.core.models.objects import KindLiteral
 
+if TYPE_CHECKING:
+    # Guarded to avoid a potential circular import (the loaders import settings from here)
+    from robusta_krr.core.integrations.kubernetes.workload_loader.base import BaseClusterLoader
+
 logger = logging.getLogger("krr")
@@ -137,6 +138,20 @@ class Config(pd.BaseSettings):
             self._logging_console = Console(file=sys.stderr if self.log_to_stderr else sys.stdout, width=self.width)
         return self._logging_console
 
+    @property
+    def cluster_loader(self) -> "BaseClusterLoader":
+        # Imported lazily, for the same circular-import reason as above
+        from robusta_krr.core.integrations.kubernetes.workload_loader import (
+            KubeAPIClusterLoader,
+            PrometheusClusterLoader,
+        )
+
+        if self.workload_loader == "kubeapi":
+            return KubeAPIClusterLoader()
+        elif self.workload_loader == "prometheus":
+            return PrometheusClusterLoader()
+        else:
+            raise NotImplementedError(f"Workload loader {self.workload_loader} is not implemented")
+
     def load_kubeconfig(self) -> None:
         try:
             config.load_kube_config(config_file=self.kubeconfig, context=self.context)
@@ -145,7 +160,7 @@ class Config(pd.BaseSettings):
             config.load_incluster_config()
             self.inside_cluster = True
 
-    def get_kube_client(self, context: Optional[str] = None):
+    def get_kube_client(self, context: Optional[str] = None) -> Optional[client.ApiClient]:
         if context is None:
             return None
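
With config.py owning loader selection, BaseClusterLoader becomes the single extension surface for new data sources. To illustrate the contract, here is a minimal hypothetical third implementation (StaticClusterLoader and its fixed cluster list are invented for illustration; wiring it in would also require extending the workload_loader setting):

    from typing import Optional

    from robusta_krr.core.integrations.kubernetes.workload_loader import (
        BaseClusterLoader,
        BaseWorkloadLoader,
        PrometheusWorkloadLoader,
    )


    class StaticClusterLoader(BaseClusterLoader):
        """Hypothetical loader for a fixed, pre-configured list of clusters."""

        def __init__(self, clusters: list[str]) -> None:
            super().__init__()  # initializes the shared connector cache
            self._clusters = clusters

        async def list_clusters(self) -> Optional[list[str]]:
            return self._clusters

        async def connect_cluster(self, cluster: str) -> BaseWorkloadLoader:
            # Reuse the per-cluster connector cache inherited from BaseClusterLoader.
            return PrometheusWorkloadLoader(cluster, self.connect_prometheus(cluster))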