summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLeaveMyYard <zhukovpavel2001@gmail.com>2024-04-29 23:18:01 +0300
committerLeaveMyYard <zhukovpavel2001@gmail.com>2024-04-29 23:18:01 +0300
commit7e8f1f42f34f5c4ff2824ced92dc37f9afb2ba10 (patch)
tree89c90cc5a55c00e9334e85fed75b0d3682bd464b
parentb9a62a0d1b52c597540d18b4f33119a36cb5968f (diff)
BaseClusterLoader, class structure change, not finished
-rw-r--r--robusta_krr/core/integrations/kubernetes/__init__.py94
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py16
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/base.py41
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py57
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py29
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py10
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py6
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py6
-rw-r--r--robusta_krr/core/integrations/prometheus/connector.py8
-rw-r--r--robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py26
-rw-r--r--robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py18
-rw-r--r--robusta_krr/core/models/config.py19
12 files changed, 206 insertions, 124 deletions
diff --git a/robusta_krr/core/integrations/kubernetes/__init__.py b/robusta_krr/core/integrations/kubernetes/__init__.py
index 6af1733..403768c 100644
--- a/robusta_krr/core/integrations/kubernetes/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/__init__.py
@@ -25,7 +25,13 @@ from robusta_krr.utils.object_like_dict import ObjectLikeDict
from prometrix import PrometheusNotFound
from . import config_patch as _
-from .workload_loader import BaseWorkloadLoader, KubeAPIWorkloadLoader, PrometheusWorkloadLoader
+from .workload_loader import (
+ BaseWorkloadLoader,
+ PrometheusWorkloadLoader,
+ BaseClusterLoader,
+ KubeAPIClusterLoader,
+ PrometheusClusterLoader,
+)
logger = logging.getLogger("krr")
@@ -40,81 +46,23 @@ class ClusterConnector:
self._prometheus_connectors: dict[Optional[str], Union[PrometheusConnector, Exception]] = {}
self._connector_errors: set[Exception] = set()
- async def list_clusters(self) -> Optional[list[str]]:
- """List all clusters.
-
- Returns:
- A list of clusters.
- """
-
- if settings.inside_cluster:
- logger.debug("Working inside the cluster")
- return None
-
- try:
- contexts, current_context = config.list_kube_config_contexts(settings.kubeconfig)
- except config.ConfigException:
- if settings.clusters is not None and settings.clusters != "*":
- logger.warning("Could not load context from kubeconfig.")
- logger.warning(f"Falling back to clusters from CLI: {settings.clusters}")
- return settings.clusters
- else:
- logger.error(
- "Could not load context from kubeconfig. "
- "Please check your kubeconfig file or pass -c flag with the context name."
- )
- return None
-
- logger.debug(f"Found {len(contexts)} clusters: {', '.join([context['name'] for context in contexts])}")
- logger.debug(f"Current cluster: {current_context['name']}")
-
- logger.debug(f"Configured clusters: {settings.clusters}")
-
- # None, empty means current cluster
- if not settings.clusters:
- return [current_context["name"]]
-
- # * means all clusters
- if settings.clusters == "*":
- return [context["name"] for context in contexts]
-
- return [context["name"] for context in contexts if context["name"] in settings.clusters]
-
def get_prometheus(self, cluster: Optional[str]) -> Optional[PrometheusConnector]:
- if cluster not in self._prometheus_connectors:
- try:
- logger.debug(f"Creating Prometheus connector for cluster {cluster}")
- self._prometheus_connectors[cluster] = PrometheusConnector(cluster=cluster)
- except Exception as e:
- self._prometheus_connectors[cluster] = e
-
- result = self._prometheus_connectors[cluster]
- if isinstance(result, self.EXPECTED_EXCEPTIONS):
- if result not in self._connector_errors:
- self._connector_errors.add(result)
- logger.error(str(result))
- return None
- elif isinstance(result, Exception):
- raise result
- return result
-
- def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[BaseWorkloadLoader]:
+ if settings.workload_loader == "kubeapi":
+ logger.debug(f"Creating Prometheus connector for cluster {cluster}")
+ elif settings.workload_loader == "prometheus":
+ logger.debug(f"Creating Prometheus connector")
+ # NOTE: With prometheus workload loader we can only have one Prometheus provided in parameters
+ # so in case of multiple clusters in one Prometheus (centralized version)
+ # for each cluster we will have the same PrometheusConnector (keyed by None)
+ cluster = None
+
+
+
+ def _create_cluster_loader(self) -> BaseClusterLoader:
try:
- if settings.workload_loader == "kubeapi":
- return KubeAPIWorkloadLoader(cluster=cluster)
- elif settings.workload_loader == "prometheus":
- cluster_loader = self.get_prometheus(cluster)
- if cluster_loader is not None:
- return PrometheusWorkloadLoader(cluster=cluster, metric_loader=cluster_loader)
- else:
- logger.error(
- f"Could not load Prometheus for cluster {cluster} and will skip it."
- "Not possible to load workloads through Prometheus without connection to Prometheus."
- )
- else:
- raise NotImplementedError(f"Workload loader {settings.workload_loader} is not implemented")
+
except Exception as e:
- logger.error(f"Could not load cluster {cluster} and will skip it: {e}")
+ logger.error(f"Could not connect to cluster loader and will skip it: {e}")
return None
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py
index 06dd8c0..2cad0cc 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py
@@ -1,5 +1,13 @@
-from .base import BaseWorkloadLoader, IListPodsFallback
-from .kube_api import KubeAPIWorkloadLoader
-from .prometheus import PrometheusWorkloadLoader
+from .base import BaseWorkloadLoader, IListPodsFallback, BaseClusterLoader
+from .kube_api import KubeAPIWorkloadLoader, KubeAPIClusterLoader
+from .prometheus import PrometheusWorkloadLoader, PrometheusClusterLoader
-__all__ = ["BaseWorkloadLoader", "IListPodsFallback", "KubeAPIWorkloadLoader", "PrometheusWorkloadLoader"] \ No newline at end of file
+__all__ = [
+ "BaseWorkloadLoader",
+ "IListPodsFallback",
+ "KubeAPIWorkloadLoader",
+ "PrometheusWorkloadLoader",
+ "BaseClusterLoader",
+ "KubeAPIClusterLoader",
+ "PrometheusClusterLoader",
+]
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/base.py
index 11a2783..316bcbd 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/base.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/base.py
@@ -1,9 +1,17 @@
import abc
+import logging
+
+from typing import Optional, Union
+from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector
+from robusta_krr.core.integrations.prometheus.metrics_service.prometheus_metrics_service import PrometheusMetricsService
from robusta_krr.core.models.objects import K8sWorkload, PodData
+logger = logging.getLogger("krr")
+
+
class BaseWorkloadLoader(abc.ABC):
- """A base class for workload loaders."""
+ """A base class for single cluster workload loaders."""
@abc.abstractmethod
async def list_workloads(self) -> list[K8sWorkload]:
@@ -16,3 +24,34 @@ class IListPodsFallback(abc.ABC):
@abc.abstractmethod
async def list_pods(self, object: K8sWorkload) -> list[PodData]:
pass
+
+
+class BaseClusterLoader(abc.ABC):
+ """
+ A class that wraps loading data from multiple clusters.
+ For example, a centralized prometheus server that can query multiple clusters.
+ Or one kubeconfig can define connections to multiple clusters.
+ """
+
+ def __init__(self) -> None:
+ self._prometheus_connectors: dict[Optional[str], PrometheusConnector] = {}
+ self._connector_errors: set[Exception] = set()
+
+ @abc.abstractmethod
+ async def list_clusters(self) -> Optional[list[str]]:
+ pass
+
+ @abc.abstractmethod
+ async def connect_cluster(self, cluster: str) -> BaseWorkloadLoader:
+ pass
+
+ def connect_prometheus(self, cluster: Optional[str] = None) -> PrometheusMetricsService:
+ """
+ Connect to a Prometheus server and return a PrometheusConnector instance.
+ Cluster = None means that prometheus is the only one: either centralized or in-cluster.
+ """
+
+ if cluster not in self._prometheus_connectors:
+ self._prometheus_connectors[cluster] = PrometheusConnector(cluster=cluster)
+
+ return self._prometheus_connectors[cluster]
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py
index a7470d7..19cd108 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py
@@ -1,10 +1,12 @@
+from __future__ import annotations
+
import asyncio
import logging
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Awaitable, Callable, Optional
-from kubernetes import client # type: ignore
+from kubernetes import client, config # type: ignore
from kubernetes.client import ApiException # type: ignore
from kubernetes.client.models import V1Container, V2HorizontalPodAutoscaler # type: ignore
@@ -12,7 +14,7 @@ from robusta_krr.core.models.config import settings
from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData
from robusta_krr.core.models.result import ResourceAllocations
-from ..base import BaseWorkloadLoader, IListPodsFallback
+from ..base import BaseWorkloadLoader, IListPodsFallback, BaseClusterLoader
from .loaders import (
BaseKindLoader,
CronJobLoader,
@@ -29,8 +31,53 @@ logger = logging.getLogger("krr")
HPAKey = tuple[str, str, str]
+class KubeAPIClusterLoader(BaseClusterLoader):
+ # NOTE: For KubeAPIClusterLoader we have to first connect to read kubeconfig
+ # We do not need to connect to Prometheus from here, as we query all data from Kubernetes API
+ # Also here we might have different Prometeus instances for different clusters
+
+ def __init__(self) -> None:
+ self.api_client = settings.get_kube_client()
+
+ async def list_clusters(self) -> Optional[list[str]]:
+ if settings.inside_cluster:
+ logger.debug("Working inside the cluster")
+ return None
+
+ try:
+ contexts, current_context = config.list_kube_config_contexts(settings.kubeconfig)
+ except config.ConfigException:
+ if settings.clusters is not None and settings.clusters != "*":
+ logger.warning("Could not load context from kubeconfig.")
+ logger.warning(f"Falling back to clusters from CLI: {settings.clusters}")
+ return settings.clusters
+ else:
+ logger.error(
+ "Could not load context from kubeconfig. "
+ "Please check your kubeconfig file or pass -c flag with the context name."
+ )
+ return None
+
+ logger.debug(f"Found {len(contexts)} clusters: {', '.join([context['name'] for context in contexts])}")
+ logger.debug(f"Current cluster: {current_context['name']}")
+ logger.debug(f"Configured clusters: {settings.clusters}")
+
+ # None, empty means current cluster
+ if not settings.clusters:
+ return [current_context["name"]]
+
+ # * means all clusters
+ if settings.clusters == "*":
+ return [context["name"] for context in contexts]
+
+ return [context["name"] for context in contexts if context["name"] in settings.clusters]
+
+ async def connect_cluster(self, cluster: str) -> KubeAPIWorkloadLoader:
+ return KubeAPIWorkloadLoader(cluster)
+
+
class KubeAPIWorkloadLoader(BaseWorkloadLoader, IListPodsFallback):
- workload_loaders: list[BaseKindLoader] = [
+ kind_loaders: list[BaseKindLoader] = [
DeploymentLoader,
RolloutLoader,
DeploymentConfigLoader,
@@ -40,7 +87,7 @@ class KubeAPIWorkloadLoader(BaseWorkloadLoader, IListPodsFallback):
CronJobLoader,
]
- def __init__(self, cluster: Optional[str] = None) -> None:
+ def __init__(self, cluster: Optional[str]) -> None:
self.cluster = cluster
# This executor will be running requests to Kubernetes API
@@ -53,7 +100,7 @@ class KubeAPIWorkloadLoader(BaseWorkloadLoader, IListPodsFallback):
self._kind_available: defaultdict[KindLiteral, bool] = defaultdict(lambda: True)
self._hpa_list: dict[HPAKey, HPAData] = {}
self._workload_loaders: dict[KindLiteral, BaseKindLoader] = {
- loader.kind: loader(self.api_client, self.executor) for loader in self.workload_loaders
+ loader.kind: loader(self.api_client, self.executor) for loader in self.kind_loaders
}
async def list_workloads(self) -> list[K8sWorkload]:
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py
index bfe6853..6796d20 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py
@@ -4,24 +4,45 @@ import logging
from collections import Counter
+from pyparsing import Optional
+
from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector
from robusta_krr.core.models.config import settings
from robusta_krr.core.models.objects import K8sWorkload
-from ..base import BaseWorkloadLoader
+from ..base import BaseWorkloadLoader, BaseClusterLoader
from .loaders import BaseKindLoader, DoubleParentLoader, SimpleParentLoader
logger = logging.getLogger("krr")
+class PrometheusClusterLoader(BaseClusterLoader):
+ # NOTE: For PrometheusClusterLoader we have to first connect to Prometheus, as we query all data from it
+
+ def __init__(self) -> None:
+ super().__init__()
+ self.prometheus_connector = super().connect_prometheus()
+
+ async def list_clusters(self) -> list[str]:
+ return []
+
+ async def connect_cluster(self, cluster: str) -> BaseWorkloadLoader:
+ return PrometheusWorkloadLoader(cluster, self.prometheus_connector)
+
+ def connect_prometheus(self, cluster: Optional[str] = None) -> PrometheusConnector:
+ # NOTE: With prometheus workload loader we can only have one Prometheus provided in parameters
+ # so in case of multiple clusters in one Prometheus (centralized version)
+ # for each cluster we will have the same PrometheusConnector (keyed by None)
+ return self.prometheus_connector
+
class PrometheusWorkloadLoader(BaseWorkloadLoader):
workloads: list[type[BaseKindLoader]] = [DoubleParentLoader, SimpleParentLoader]
- def __init__(self, cluster: str, metric_loader: PrometheusConnector) -> None:
+ def __init__(self, cluster: str, prometheus_connector: PrometheusConnector) -> None:
self.cluster = cluster
- self.metric_service = metric_loader
- self.loaders = [loader(metric_loader) for loader in self.workloads]
+ self.metric_service = prometheus_connector
+ self.loaders = [loader(prometheus_connector) for loader in self.workloads]
async def list_workloads(self) -> list[K8sWorkload]:
workloads = list(
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py
index 3be8fe1..9534261 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py
@@ -37,8 +37,8 @@ class BaseKindLoader(abc.ABC):
kinds: list[KindLiteral] = []
- def __init__(self, connector: PrometheusConnector) -> None:
- self.connector = connector
+ def __init__(self, prometheus: PrometheusConnector) -> None:
+ self.prometheus = prometheus
self.cluster_selector = PrometheusMetric.get_prometheus_cluster_label()
@property
@@ -50,7 +50,7 @@ class BaseKindLoader(abc.ABC):
pass
async def _parse_allocation(self, namespace: str, pods: list[str], container_name: str) -> ResourceAllocations:
- limits = await self.connector.loader.query(
+ limits = await self.prometheus.loader.query(
f"""
avg by(resource) (
kube_pod_container_resource_limits{{
@@ -62,7 +62,7 @@ class BaseKindLoader(abc.ABC):
)
"""
)
- requests = await self.connector.loader.query(
+ requests = await self.prometheus.loader.query(
f"""
avg by(resource) (
kube_pod_container_resource_requests{{
@@ -90,7 +90,7 @@ class BaseKindLoader(abc.ABC):
return ResourceAllocations(requests=requests_values, limits=limits_values)
async def _list_containers_in_pods(self, pods: list[str]) -> set[str]:
- containers = await self.connector.loader.query(
+ containers = await self.prometheus.loader.query(
f"""
count by (container) (
kube_pod_container_info{{
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py
index ea2353b..e757af4 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py
@@ -53,7 +53,7 @@ class DoubleParentLoader(BaseKindLoader):
subowner_label = subowner_kind.lower() if subowner_kind != "Job" else "job_name"
# Replica is for ReplicaSet and/or ReplicationController
- subowners = await self.connector.loader.query(
+ subowners = await self.prometheus.loader.query(
f"""
count by (namespace, owner_name, {subowner_label}, owner_kind) (
{metric_name} {{
@@ -91,7 +91,7 @@ class DoubleParentLoader(BaseKindLoader):
async def _list_pods_of_subowner(
self, namespace: str, name: str, kind: str, subowner_kind: str, subowner_names: list[str]
) -> list[K8sWorkload]:
- pods = await self.connector.loader.query(
+ pods = await self.prometheus.loader.query(
f"""
count by (namespace, owner_name, owner_kind, pod) (
kube_pod_owner{{
@@ -111,7 +111,7 @@ class DoubleParentLoader(BaseKindLoader):
return [
K8sWorkload(
- cluster=self.connector.cluster,
+ cluster=self.prometheus.cluster,
namespace=namespace,
name=name,
kind=kind,
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py
index 05b7d74..233ebb2 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py
@@ -22,7 +22,7 @@ class SimpleParentLoader(BaseKindLoader):
('namespace=~"' + "|".join(namespaces) + '"') if namespaces != "*" else 'namespace!="kube-system"'
)
- results = await self.connector.loader.query(
+ results = await self.prometheus.loader.query(
f"""
count by (namespace, owner_name, owner_kind, pod) (
kube_pod_owner{{
@@ -46,7 +46,7 @@ class SimpleParentLoader(BaseKindLoader):
# NOTE: We do not show jobs that are a part of a cronjob, so we filter them out
job_workloads = [name for (_, name, kind) in workloads if kind == "Job"]
if job_workloads != []:
- cronjobs = await self.connector.loader.query(
+ cronjobs = await self.prometheus.loader.query(
f"""
count by (namespace, job_name) (
kube_job_owner{{
@@ -72,7 +72,7 @@ class SimpleParentLoader(BaseKindLoader):
return [
K8sWorkload(
- cluster=self.connector.cluster,
+ cluster=self.prometheus.cluster,
namespace=namespace,
name=name,
kind=kind,
diff --git a/robusta_krr/core/integrations/prometheus/connector.py b/robusta_krr/core/integrations/prometheus/connector.py
index ab89b34..399209a 100644
--- a/robusta_krr/core/integrations/prometheus/connector.py
+++ b/robusta_krr/core/integrations/prometheus/connector.py
@@ -49,6 +49,14 @@ class PrometheusConnector:
logger.info(f"{self.loader.name()} connected successfully for {cluster or 'inner'} cluster")
+ @classmethod
+ def discover(cls, api_client: ApiClient) -> PrometheusConnector:
+ return cls()
+
+ @classmethod
+ def connect(cls, cluster: str) -> PrometheusConnector:
+ return cls(cluster=cluster)
+
def get_metrics_service(
self,
api_client: Optional[ApiClient] = None,
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py
index ae34a7a..9b0c163 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py
@@ -13,28 +13,13 @@ from ..metrics import PrometheusMetric
class MetricsService(abc.ABC):
- def __init__(
- self,
- api_client: Optional[ApiClient] = None,
- cluster: Optional[str] = None,
- executor: Optional[ThreadPoolExecutor] = None,
- ) -> None:
- self.api_client = api_client
- self.cluster = cluster or "default"
- self.executor = executor
-
@abc.abstractmethod
def check_connection(self):
- ...
-
- @classmethod
- def name(cls) -> str:
- classname = cls.__name__
- return classname.replace("MetricsService", "") if classname != MetricsService.__name__ else classname
+ pass
@abc.abstractmethod
def get_cluster_names(self) -> Optional[List[str]]:
- ...
+ pass
@abc.abstractmethod
async def gather_data(
@@ -44,7 +29,12 @@ class MetricsService(abc.ABC):
period: datetime.timedelta,
step: datetime.timedelta = datetime.timedelta(minutes=30),
) -> PodsTimeData:
- ...
+ pass
+
+ @classmethod
+ def name(cls) -> str:
+ classname = cls.__name__
+ return classname.replace("MetricsService", "") if classname != MetricsService.__name__ else classname
def get_prometheus_cluster_label(self) -> str:
"""
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
index 50b86a6..4290e64 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
@@ -56,14 +56,15 @@ class PrometheusMetricsService(MetricsService):
def __init__(
self,
- *,
+ url: str,
cluster: Optional[str] = None,
- api_client: Optional[ApiClient] = None,
executor: Optional[ThreadPoolExecutor] = None,
) -> None:
- super().__init__(api_client=api_client, cluster=cluster, executor=executor)
+ self.url = url + self.url_postfix
+ self.cluster = cluster
+ self.executor = executor or ThreadPoolExecutor(settings.max_workers)
- logger.info(f"Trying to connect to {self.name()} for {self.cluster} cluster")
+ logger.info(f"Trying to connect to {self.name()}" + self._for_cluster_postfix)
self.auth_header = settings.prometheus_auth_header
self.ssl_enabled = settings.prometheus_ssl_enabled
@@ -85,12 +86,12 @@ class PrometheusMetricsService(MetricsService):
if not self.url:
raise PrometheusNotFound(
- f"{self.name()} instance could not be found while scanning in {self.cluster} cluster."
+ f"{self.name()} instance could not be found while scanning" + self._for_cluster_postfix
)
self.url += self.url_postfix
- logger.info(f"Using {self.name()} at {self.url} for cluster {cluster or 'default'}")
+ logger.info(f"Using {self.name()} at {self.url}" + self._for_cluster_postfix)
headers = settings.prometheus_other_headers
headers |= self.additional_headers
@@ -309,3 +310,8 @@ class PrometheusMetricsService(MetricsService):
del pods_status_result
return list({PodData(name=pod, deleted=pod not in current_pods_set) for pod in related_pods})
+
+ @property
+ def _for_cluster_postfix(self) -> str:
+ """The string postfix to be used in logging messages."""
+ return (f" for {self.cluster} cluster" if self.cluster else "")
diff --git a/robusta_krr/core/models/config.py b/robusta_krr/core/models/config.py
index 6e03967..4b337c7 100644
--- a/robusta_krr/core/models/config.py
+++ b/robusta_krr/core/models/config.py
@@ -5,13 +5,14 @@ import sys
from typing import Any, Literal, Optional, Union
import pydantic as pd
-from kubernetes import config
+from kubernetes import config, client
from kubernetes.config.config_exception import ConfigException
from rich.console import Console
from rich.logging import RichHandler
from robusta_krr.core.abstract import formatters
from robusta_krr.core.abstract.strategies import AnyStrategy, BaseStrategy
+from robusta_krr.core.integrations.kubernetes.workload_loader.base import BaseClusterLoader
from robusta_krr.core.models.objects import KindLiteral
logger = logging.getLogger("krr")
@@ -137,6 +138,20 @@ class Config(pd.BaseSettings):
self._logging_console = Console(file=sys.stderr if self.log_to_stderr else sys.stdout, width=self.width)
return self._logging_console
+ @property
+ def cluster_loader(self) -> BaseClusterLoader:
+ from robusta_krr.core.integrations.kubernetes.workload_loader import (
+ KubeAPIClusterLoader,
+ PrometheusClusterLoader,
+ )
+
+ if settings.workload_loader == "kubeapi":
+ return KubeAPIClusterLoader()
+ elif settings.workload_loader == "prometheus":
+ return PrometheusClusterLoader()
+ else:
+ raise NotImplementedError(f"Workload loader {settings.workload_loader} is not implemented")
+
def load_kubeconfig(self) -> None:
try:
config.load_kube_config(config_file=self.kubeconfig, context=self.context)
@@ -145,7 +160,7 @@ class Config(pd.BaseSettings):
config.load_incluster_config()
self.inside_cluster = True
- def get_kube_client(self, context: Optional[str] = None):
+ def get_kube_client(self, context: Optional[str] = None) -> client.ApiClient:
if context is None:
return None