diff options
| author | Павел Жуков <33721692+LeaveMyYard@users.noreply.github.com> | 2023-05-29 13:40:57 +0300 |
|---|---|---|
| committer | Павел Жуков <33721692+LeaveMyYard@users.noreply.github.com> | 2023-05-29 13:40:57 +0300 |
| commit | 29de96b3ccea0f10ed86d7f9015d1bc3b7b90d5a (patch) | |
| tree | f82def135c74924729ab5ae14e9716df12b3684c | |
| parent | 110da668bee9f87e0f98a0e41ef32dee4984dddf (diff) | |
Fix multi-cluster issues
| -rw-r--r-- | robusta_krr/core/integrations/kubernetes.py | 36 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/prometheus/loader.py | 16 | ||||
| -rw-r--r-- | robusta_krr/core/models/config.py | 40 | ||||
| -rw-r--r-- | robusta_krr/core/runner.py | 12 | ||||
| -rw-r--r-- | robusta_krr/main.py | 19 | ||||
| -rw-r--r-- | robusta_krr/utils/configurable.py | 2 | ||||
| -rw-r--r-- | robusta_krr/utils/service_discovery.py | 34 |
7 files changed, 109 insertions, 50 deletions
diff --git a/robusta_krr/core/integrations/kubernetes.py b/robusta_krr/core/integrations/kubernetes.py index cbe95f3..37ed97f 100644 --- a/robusta_krr/core/integrations/kubernetes.py +++ b/robusta_krr/core/integrations/kubernetes.py @@ -26,7 +26,11 @@ class ClusterLoader(Configurable): super().__init__(*args, **kwargs) self.cluster = cluster - self.api_client = config.new_client_from_config(context=cluster) if cluster is not None else None + self.api_client = ( + config.new_client_from_config(context=cluster, config_file=self.config.kubeconfig) + if cluster is not None + else None + ) self.apps = client.AppsV1Api(api_client=self.api_client) self.batch = client.BatchV1Api(api_client=self.api_client) self.core = client.CoreV1Api(api_client=self.api_client) @@ -185,7 +189,19 @@ class KubernetesLoader(Configurable): self.debug("Working inside the cluster") return None - contexts, current_context = await asyncio.to_thread(config.list_kube_config_contexts) + try: + contexts, current_context = await asyncio.to_thread(config.list_kube_config_contexts) + except config.ConfigException: + if self.config.clusters is not None and self.config.clusters != "*": + self.warning("Could not load context from kubeconfig.") + self.warning(f"Falling back to clusters from CLI: {self.config.clusters}") + return self.config.clusters + else: + self.error( + "Could not load context from kubeconfig. " + "Please check your kubeconfig file or pass -c flag with the context name." + ) + return None self.debug(f"Found {len(contexts)} clusters: {', '.join([context['name'] for context in contexts])}") self.debug(f"Current cluster: {current_context['name']}") @@ -202,6 +218,13 @@ class KubernetesLoader(Configurable): return [context["name"] for context in contexts if context["name"] in self.config.clusters] + def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[ClusterLoader]: + try: + return ClusterLoader(cluster=cluster, config=self.config) + except Exception as e: + self.error(f"Could not load cluster {cluster} and will skip it: {e}") + return None + async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8sObjectData]: """List all scannable objects. @@ -210,9 +233,14 @@ class KubernetesLoader(Configurable): """ if clusters is None: - cluster_loaders = [ClusterLoader(cluster=None, config=self.config)] + _cluster_loaders = [self._try_create_cluster_loader(None)] else: - cluster_loaders = [ClusterLoader(cluster=cluster, config=self.config) for cluster in clusters] + _cluster_loaders = [self._try_create_cluster_loader(cluster) for cluster in clusters] + + cluster_loaders = [cl for cl in _cluster_loaders if cl is not None] + if cluster_loaders == []: + self.error("Could not load any cluster.") + return [] objects = await asyncio.gather(*[cluster_loader.list_scannable_objects() for cluster_loader in cluster_loaders]) return list(itertools.chain(*objects)) diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py index 4e50626..3b3fb15 100644 --- a/robusta_krr/core/integrations/prometheus/loader.py +++ b/robusta_krr/core/integrations/prometheus/loader.py @@ -4,7 +4,6 @@ from typing import Optional, no_type_check import requests from kubernetes import config as k8s_config -from kubernetes.client import ApiClient from prometheus_api_client import PrometheusConnect, Retry from requests.adapters import HTTPAdapter from requests.exceptions import ConnectionError, HTTPError @@ -20,7 +19,7 @@ from .metrics import BaseMetricLoader class PrometheusDiscovery(ServiceDiscovery): - def find_prometheus_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: + def find_prometheus_url(self) -> Optional[str]: return super().find_url( selectors=[ "app=kube-prometheus-stack-prometheus", @@ -32,7 +31,6 @@ class PrometheusDiscovery(ServiceDiscovery): "app=prometheus-prometheus", "app.kubernetes.io/name=vmsingle", ], - api_client=api_client, ) @@ -69,11 +67,15 @@ class PrometheusLoader(Configurable): self.auth_header = self.config.prometheus_auth_header self.ssl_enabled = self.config.prometheus_ssl_enabled - self.api_client = k8s_config.new_client_from_config(context=cluster) if cluster is not None else None - self.prometheus_discovery = PrometheusDiscovery(config=self.config) + self.api_client = ( + k8s_config.new_client_from_config(config_file=self.config.kubeconfig, context=cluster) + if cluster is not None + else None + ) + self.prometheus_discovery = PrometheusDiscovery(config=self.config, api_client=self.api_client) self.url = self.config.prometheus_url - self.url = self.url or self.prometheus_discovery.find_prometheus_url(api_client=self.api_client) + self.url = self.url or self.prometheus_discovery.find_prometheus_url() if not self.url: raise PrometheusNotFound( @@ -81,6 +83,8 @@ class PrometheusLoader(Configurable): "\tTry using port-forwarding and/or setting the url manually (using the -p flag.)." ) + self.info(f"Using prometheus at {self.url} for cluster {cluster or 'default'}") + headers = {} if self.auth_header: diff --git a/robusta_krr/core/models/config.py b/robusta_krr/core/models/config.py index ff147af..c82252c 100644 --- a/robusta_krr/core/models/config.py +++ b/robusta_krr/core/models/config.py @@ -1,30 +1,21 @@ from typing import Any, Literal, Optional, Union import pydantic as pd + +from rich.console import Console from kubernetes import config from kubernetes.config.config_exception import ConfigException from robusta_krr.core.abstract.formatters import BaseFormatter from robusta_krr.core.abstract.strategies import AnyStrategy, BaseStrategy -try: - config.load_incluster_config() -except ConfigException: - try: - config.load_kube_config() - except ConfigException: - IN_CLUSTER = None - else: - IN_CLUSTER = False -else: - IN_CLUSTER = True - class Config(pd.BaseSettings): quiet: bool = pd.Field(False) verbose: bool = pd.Field(False) clusters: Union[list[str], Literal["*"], None] = None + kubeconfig: Optional[str] = None namespaces: Union[list[str], Literal["*"]] = pd.Field("*") # Value settings @@ -43,6 +34,14 @@ class Config(pd.BaseSettings): other_args: dict[str, Any] + # Internal + inside_cluster: bool = False + console: Optional[Console] = None + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + self.console = Console(stderr=self.log_to_stderr) + @property def Formatter(self) -> type[BaseFormatter]: return BaseFormatter.find(self.format) @@ -70,9 +69,14 @@ class Config(pd.BaseSettings): return v @property - def config_loaded(self) -> bool: - return IN_CLUSTER is not None - - @property - def inside_cluster(self) -> bool: - return bool(IN_CLUSTER) + def context(self) -> Optional[str]: + return self.clusters[0] if self.clusters != "*" and self.clusters else None + + def load_kubeconfig(self) -> None: + try: + config.load_incluster_config() + except ConfigException: + config.load_kube_config(config_file=self.kubeconfig, context=self.context) + self.inside_cluster = False + else: + self.inside_cluster = True diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index 52632cf..0b31c7c 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -35,7 +35,7 @@ class Runner(Configurable): if isinstance(result, self.EXPECTED_EXCEPTIONS): if result not in self._prometheus_loaders_error_logged: self._prometheus_loaders_error_logged.add(result) - self.error(result) + self.error(str(result)) return None elif isinstance(result, Exception): raise result @@ -161,11 +161,15 @@ class Runner(Configurable): ) async def run(self) -> None: - if not self.config.config_loaded: - self.console.print("[CRITICAL] Could not load kubernetes configuration. Do you have kubeconfig set up?") + self._greet() + + try: + self.config.load_kubeconfig() + except Exception as e: + self.error(f"Could not load kubernetes configuration: {e}") + self.error("Try to explicitly set --context and/or --kubeconfig flags.") return - self._greet() try: result = await self._collect_result() self._process_result(result) diff --git a/robusta_krr/main.py b/robusta_krr/main.py index fdd654d..68b6855 100644 --- a/robusta_krr/main.py +++ b/robusta_krr/main.py @@ -43,11 +43,25 @@ def load_commands() -> None: @app.command(rich_help_panel="Strategies") def {func_name}( ctx: typer.Context, + kubeconfig: Optional[str] = typer.Option( + None, + "--kubeconfig", + "-k", + help="Path to kubeconfig file. If not provided, will attempt to find it.", + rich_help_panel="Kubernetes Settings" + ), clusters: List[str] = typer.Option( None, + "--context", "--cluster", "-c", - help="List of clusters to run on. By default, will run on the current cluster. Use '*' to run on all clusters.", + help="List of clusters to run on. By default, will run on the current cluster. Use --all-clusters to run on all clusters.", + rich_help_panel="Kubernetes Settings" + ), + all_clusters: bool = typer.Option( + False, + "--all-clusters", + help="Run on all clusters. Overrides --context.", rich_help_panel="Kubernetes Settings" ), namespaces: List[str] = typer.Option( @@ -85,7 +99,8 @@ def load_commands() -> None: '''Run KRR using the `{func_name}` strategy''' config = Config( - clusters="*" if "*" in clusters else clusters, + kubeconfig=kubeconfig, + clusters="*" if all_clusters else clusters, namespaces="*" if "*" in namespaces else namespaces, prometheus_url=prometheus_url, prometheus_auth_header=prometheus_auth_header, diff --git a/robusta_krr/utils/configurable.py b/robusta_krr/utils/configurable.py index 4777a9c..a6cb92f 100644 --- a/robusta_krr/utils/configurable.py +++ b/robusta_krr/utils/configurable.py @@ -15,7 +15,7 @@ class Configurable(abc.ABC): def __init__(self, config: Config) -> None: self.config = config - self.console = Console(stderr=self.config.log_to_stderr) + self.console: Console = self.config.console # type: ignore @property def debug_active(self) -> bool: diff --git a/robusta_krr/utils/service_discovery.py b/robusta_krr/utils/service_discovery.py index b718700..8ba58ca 100644 --- a/robusta_krr/utils/service_discovery.py +++ b/robusta_krr/utils/service_discovery.py @@ -1,4 +1,3 @@ -import logging from typing import Optional from cachetools import TTLCache @@ -7,6 +6,7 @@ from kubernetes.client import V1IngressList, V1ServiceList from kubernetes.client.api_client import ApiClient from kubernetes.client.models.v1_ingress import V1Ingress from kubernetes.client.models.v1_service import V1Service +from robusta_krr.core.models.config import Config from robusta_krr.utils.configurable import Configurable @@ -15,12 +15,16 @@ class ServiceDiscovery(Configurable): SERVICE_CACHE_TTL_SEC = 900 cache: TTLCache = TTLCache(maxsize=1, ttl=SERVICE_CACHE_TTL_SEC) - def find_service_url(self, label_selector: str, *, api_client: Optional[ApiClient] = None) -> Optional[str]: + def __init__(self, config: Config, api_client: Optional[ApiClient] = None) -> None: + super().__init__(config) + self.api_client = api_client + + def find_service_url(self, label_selector: str) -> Optional[str]: """ Get the url of an in-cluster service with a specific label """ # we do it this way because there is a weird issue with hikaru's ServiceList.listServiceForAllNamespaces() - v1 = client.CoreV1Api(api_client=api_client) + v1 = client.CoreV1Api(api_client=self.api_client) svc_list: V1ServiceList = v1.list_service_for_all_namespaces(label_selector=label_selector) if not svc_list.items: return None @@ -33,19 +37,19 @@ class ServiceDiscovery(Configurable): if self.config.inside_cluster: return f"http://{name}.{namespace}.svc.cluster.local:{port}" - elif api_client is not None: - return f"{api_client.configuration.host}/api/v1/namespaces/{namespace}/services/{name}:{port}/proxy" + elif self.api_client is not None: + return f"{self.api_client.configuration.host}/api/v1/namespaces/{namespace}/services/{name}:{port}/proxy" return None - def find_ingress_host(self, label_selector: str, *, api_client: Optional[ApiClient] = None) -> Optional[str]: + def find_ingress_host(self, label_selector: str) -> Optional[str]: """ Discover the ingress host of the Prometheus if krr is not running in cluster """ if self.config.inside_cluster: return None - v1 = client.NetworkingV1Api(api_client=api_client) + v1 = client.NetworkingV1Api(api_client=self.api_client) ingress_list: V1IngressList = v1.list_ingress_for_all_namespaces(label_selector=label_selector) if not ingress_list.items: return None @@ -54,26 +58,26 @@ class ServiceDiscovery(Configurable): prometheus_host = ingress.spec.rules[0].host return f"http://{prometheus_host}" - def find_url(self, selectors: list[str], *, api_client: Optional[ApiClient] = None) -> Optional[str]: + def find_url(self, selectors: list[str]) -> Optional[str]: """ Try to autodiscover the url of an in-cluster service """ - cache_key = ",".join(selectors) + cache_key = ",".join(selectors + [self.api_client.configuration.host if self.api_client else ""]) cached_value = self.cache.get(cache_key) if cached_value: return cached_value for label_selector in selectors: - logging.debug(f"Trying to find service with label selector {label_selector}") - service_url = self.find_service_url(label_selector, api_client=api_client) + self.debug(f"Trying to find service with label selector {label_selector}") + service_url = self.find_service_url(label_selector) if service_url: - logging.debug(f"Found service with label selector {label_selector}") + self.debug(f"Found service with label selector {label_selector}") self.cache[cache_key] = service_url return service_url - logging.debug(f"Trying to find ingress with label selector {label_selector}") - self.find_ingress_host(label_selector, api_client=api_client) - ingress_url = self.find_ingress_host(label_selector, api_client=api_client) + self.debug(f"Trying to find ingress with label selector {label_selector}") + self.find_ingress_host(label_selector) + ingress_url = self.find_ingress_host(label_selector) if ingress_url: return ingress_url |
