summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorПавел Жуков <33721692+LeaveMyYard@users.noreply.github.com>2023-05-29 13:40:57 +0300
committerПавел Жуков <33721692+LeaveMyYard@users.noreply.github.com>2023-05-29 13:40:57 +0300
commit29de96b3ccea0f10ed86d7f9015d1bc3b7b90d5a (patch)
treef82def135c74924729ab5ae14e9716df12b3684c
parent110da668bee9f87e0f98a0e41ef32dee4984dddf (diff)
Fix multi-cluster issues
-rw-r--r--robusta_krr/core/integrations/kubernetes.py36
-rw-r--r--robusta_krr/core/integrations/prometheus/loader.py16
-rw-r--r--robusta_krr/core/models/config.py40
-rw-r--r--robusta_krr/core/runner.py12
-rw-r--r--robusta_krr/main.py19
-rw-r--r--robusta_krr/utils/configurable.py2
-rw-r--r--robusta_krr/utils/service_discovery.py34
7 files changed, 109 insertions, 50 deletions
diff --git a/robusta_krr/core/integrations/kubernetes.py b/robusta_krr/core/integrations/kubernetes.py
index cbe95f3..37ed97f 100644
--- a/robusta_krr/core/integrations/kubernetes.py
+++ b/robusta_krr/core/integrations/kubernetes.py
@@ -26,7 +26,11 @@ class ClusterLoader(Configurable):
super().__init__(*args, **kwargs)
self.cluster = cluster
- self.api_client = config.new_client_from_config(context=cluster) if cluster is not None else None
+ self.api_client = (
+ config.new_client_from_config(context=cluster, config_file=self.config.kubeconfig)
+ if cluster is not None
+ else None
+ )
self.apps = client.AppsV1Api(api_client=self.api_client)
self.batch = client.BatchV1Api(api_client=self.api_client)
self.core = client.CoreV1Api(api_client=self.api_client)
@@ -185,7 +189,19 @@ class KubernetesLoader(Configurable):
self.debug("Working inside the cluster")
return None
- contexts, current_context = await asyncio.to_thread(config.list_kube_config_contexts)
+ try:
+ contexts, current_context = await asyncio.to_thread(config.list_kube_config_contexts)
+ except config.ConfigException:
+ if self.config.clusters is not None and self.config.clusters != "*":
+ self.warning("Could not load context from kubeconfig.")
+ self.warning(f"Falling back to clusters from CLI: {self.config.clusters}")
+ return self.config.clusters
+ else:
+ self.error(
+ "Could not load context from kubeconfig. "
+ "Please check your kubeconfig file or pass -c flag with the context name."
+ )
+ return None
self.debug(f"Found {len(contexts)} clusters: {', '.join([context['name'] for context in contexts])}")
self.debug(f"Current cluster: {current_context['name']}")
@@ -202,6 +218,13 @@ class KubernetesLoader(Configurable):
return [context["name"] for context in contexts if context["name"] in self.config.clusters]
+ def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[ClusterLoader]:
+ try:
+ return ClusterLoader(cluster=cluster, config=self.config)
+ except Exception as e:
+ self.error(f"Could not load cluster {cluster} and will skip it: {e}")
+ return None
+
async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8sObjectData]:
"""List all scannable objects.
@@ -210,9 +233,14 @@ class KubernetesLoader(Configurable):
"""
if clusters is None:
- cluster_loaders = [ClusterLoader(cluster=None, config=self.config)]
+ _cluster_loaders = [self._try_create_cluster_loader(None)]
else:
- cluster_loaders = [ClusterLoader(cluster=cluster, config=self.config) for cluster in clusters]
+ _cluster_loaders = [self._try_create_cluster_loader(cluster) for cluster in clusters]
+
+ cluster_loaders = [cl for cl in _cluster_loaders if cl is not None]
+ if cluster_loaders == []:
+ self.error("Could not load any cluster.")
+ return []
objects = await asyncio.gather(*[cluster_loader.list_scannable_objects() for cluster_loader in cluster_loaders])
return list(itertools.chain(*objects))
diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py
index 4e50626..3b3fb15 100644
--- a/robusta_krr/core/integrations/prometheus/loader.py
+++ b/robusta_krr/core/integrations/prometheus/loader.py
@@ -4,7 +4,6 @@ from typing import Optional, no_type_check
import requests
from kubernetes import config as k8s_config
-from kubernetes.client import ApiClient
from prometheus_api_client import PrometheusConnect, Retry
from requests.adapters import HTTPAdapter
from requests.exceptions import ConnectionError, HTTPError
@@ -20,7 +19,7 @@ from .metrics import BaseMetricLoader
class PrometheusDiscovery(ServiceDiscovery):
- def find_prometheus_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]:
+ def find_prometheus_url(self) -> Optional[str]:
return super().find_url(
selectors=[
"app=kube-prometheus-stack-prometheus",
@@ -32,7 +31,6 @@ class PrometheusDiscovery(ServiceDiscovery):
"app=prometheus-prometheus",
"app.kubernetes.io/name=vmsingle",
],
- api_client=api_client,
)
@@ -69,11 +67,15 @@ class PrometheusLoader(Configurable):
self.auth_header = self.config.prometheus_auth_header
self.ssl_enabled = self.config.prometheus_ssl_enabled
- self.api_client = k8s_config.new_client_from_config(context=cluster) if cluster is not None else None
- self.prometheus_discovery = PrometheusDiscovery(config=self.config)
+ self.api_client = (
+ k8s_config.new_client_from_config(config_file=self.config.kubeconfig, context=cluster)
+ if cluster is not None
+ else None
+ )
+ self.prometheus_discovery = PrometheusDiscovery(config=self.config, api_client=self.api_client)
self.url = self.config.prometheus_url
- self.url = self.url or self.prometheus_discovery.find_prometheus_url(api_client=self.api_client)
+ self.url = self.url or self.prometheus_discovery.find_prometheus_url()
if not self.url:
raise PrometheusNotFound(
@@ -81,6 +83,8 @@ class PrometheusLoader(Configurable):
"\tTry using port-forwarding and/or setting the url manually (using the -p flag.)."
)
+ self.info(f"Using prometheus at {self.url} for cluster {cluster or 'default'}")
+
headers = {}
if self.auth_header:
diff --git a/robusta_krr/core/models/config.py b/robusta_krr/core/models/config.py
index ff147af..c82252c 100644
--- a/robusta_krr/core/models/config.py
+++ b/robusta_krr/core/models/config.py
@@ -1,30 +1,21 @@
from typing import Any, Literal, Optional, Union
import pydantic as pd
+
+from rich.console import Console
from kubernetes import config
from kubernetes.config.config_exception import ConfigException
from robusta_krr.core.abstract.formatters import BaseFormatter
from robusta_krr.core.abstract.strategies import AnyStrategy, BaseStrategy
-try:
- config.load_incluster_config()
-except ConfigException:
- try:
- config.load_kube_config()
- except ConfigException:
- IN_CLUSTER = None
- else:
- IN_CLUSTER = False
-else:
- IN_CLUSTER = True
-
class Config(pd.BaseSettings):
quiet: bool = pd.Field(False)
verbose: bool = pd.Field(False)
clusters: Union[list[str], Literal["*"], None] = None
+ kubeconfig: Optional[str] = None
namespaces: Union[list[str], Literal["*"]] = pd.Field("*")
# Value settings
@@ -43,6 +34,14 @@ class Config(pd.BaseSettings):
other_args: dict[str, Any]
+ # Internal
+ inside_cluster: bool = False
+ console: Optional[Console] = None
+
+ def __init__(self, **kwargs: Any) -> None:
+ super().__init__(**kwargs)
+ self.console = Console(stderr=self.log_to_stderr)
+
@property
def Formatter(self) -> type[BaseFormatter]:
return BaseFormatter.find(self.format)
@@ -70,9 +69,14 @@ class Config(pd.BaseSettings):
return v
@property
- def config_loaded(self) -> bool:
- return IN_CLUSTER is not None
-
- @property
- def inside_cluster(self) -> bool:
- return bool(IN_CLUSTER)
+ def context(self) -> Optional[str]:
+ return self.clusters[0] if self.clusters != "*" and self.clusters else None
+
+ def load_kubeconfig(self) -> None:
+ try:
+ config.load_incluster_config()
+ except ConfigException:
+ config.load_kube_config(config_file=self.kubeconfig, context=self.context)
+ self.inside_cluster = False
+ else:
+ self.inside_cluster = True
diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py
index 52632cf..0b31c7c 100644
--- a/robusta_krr/core/runner.py
+++ b/robusta_krr/core/runner.py
@@ -35,7 +35,7 @@ class Runner(Configurable):
if isinstance(result, self.EXPECTED_EXCEPTIONS):
if result not in self._prometheus_loaders_error_logged:
self._prometheus_loaders_error_logged.add(result)
- self.error(result)
+ self.error(str(result))
return None
elif isinstance(result, Exception):
raise result
@@ -161,11 +161,15 @@ class Runner(Configurable):
)
async def run(self) -> None:
- if not self.config.config_loaded:
- self.console.print("[CRITICAL] Could not load kubernetes configuration. Do you have kubeconfig set up?")
+ self._greet()
+
+ try:
+ self.config.load_kubeconfig()
+ except Exception as e:
+ self.error(f"Could not load kubernetes configuration: {e}")
+ self.error("Try to explicitly set --context and/or --kubeconfig flags.")
return
- self._greet()
try:
result = await self._collect_result()
self._process_result(result)
diff --git a/robusta_krr/main.py b/robusta_krr/main.py
index fdd654d..68b6855 100644
--- a/robusta_krr/main.py
+++ b/robusta_krr/main.py
@@ -43,11 +43,25 @@ def load_commands() -> None:
@app.command(rich_help_panel="Strategies")
def {func_name}(
ctx: typer.Context,
+ kubeconfig: Optional[str] = typer.Option(
+ None,
+ "--kubeconfig",
+ "-k",
+ help="Path to kubeconfig file. If not provided, will attempt to find it.",
+ rich_help_panel="Kubernetes Settings"
+ ),
clusters: List[str] = typer.Option(
None,
+ "--context",
"--cluster",
"-c",
- help="List of clusters to run on. By default, will run on the current cluster. Use '*' to run on all clusters.",
+ help="List of clusters to run on. By default, will run on the current cluster. Use --all-clusters to run on all clusters.",
+ rich_help_panel="Kubernetes Settings"
+ ),
+ all_clusters: bool = typer.Option(
+ False,
+ "--all-clusters",
+ help="Run on all clusters. Overrides --context.",
rich_help_panel="Kubernetes Settings"
),
namespaces: List[str] = typer.Option(
@@ -85,7 +99,8 @@ def load_commands() -> None:
'''Run KRR using the `{func_name}` strategy'''
config = Config(
- clusters="*" if "*" in clusters else clusters,
+ kubeconfig=kubeconfig,
+ clusters="*" if all_clusters else clusters,
namespaces="*" if "*" in namespaces else namespaces,
prometheus_url=prometheus_url,
prometheus_auth_header=prometheus_auth_header,
diff --git a/robusta_krr/utils/configurable.py b/robusta_krr/utils/configurable.py
index 4777a9c..a6cb92f 100644
--- a/robusta_krr/utils/configurable.py
+++ b/robusta_krr/utils/configurable.py
@@ -15,7 +15,7 @@ class Configurable(abc.ABC):
def __init__(self, config: Config) -> None:
self.config = config
- self.console = Console(stderr=self.config.log_to_stderr)
+ self.console: Console = self.config.console # type: ignore
@property
def debug_active(self) -> bool:
diff --git a/robusta_krr/utils/service_discovery.py b/robusta_krr/utils/service_discovery.py
index b718700..8ba58ca 100644
--- a/robusta_krr/utils/service_discovery.py
+++ b/robusta_krr/utils/service_discovery.py
@@ -1,4 +1,3 @@
-import logging
from typing import Optional
from cachetools import TTLCache
@@ -7,6 +6,7 @@ from kubernetes.client import V1IngressList, V1ServiceList
from kubernetes.client.api_client import ApiClient
from kubernetes.client.models.v1_ingress import V1Ingress
from kubernetes.client.models.v1_service import V1Service
+from robusta_krr.core.models.config import Config
from robusta_krr.utils.configurable import Configurable
@@ -15,12 +15,16 @@ class ServiceDiscovery(Configurable):
SERVICE_CACHE_TTL_SEC = 900
cache: TTLCache = TTLCache(maxsize=1, ttl=SERVICE_CACHE_TTL_SEC)
- def find_service_url(self, label_selector: str, *, api_client: Optional[ApiClient] = None) -> Optional[str]:
+ def __init__(self, config: Config, api_client: Optional[ApiClient] = None) -> None:
+ super().__init__(config)
+ self.api_client = api_client
+
+ def find_service_url(self, label_selector: str) -> Optional[str]:
"""
Get the url of an in-cluster service with a specific label
"""
# we do it this way because there is a weird issue with hikaru's ServiceList.listServiceForAllNamespaces()
- v1 = client.CoreV1Api(api_client=api_client)
+ v1 = client.CoreV1Api(api_client=self.api_client)
svc_list: V1ServiceList = v1.list_service_for_all_namespaces(label_selector=label_selector)
if not svc_list.items:
return None
@@ -33,19 +37,19 @@ class ServiceDiscovery(Configurable):
if self.config.inside_cluster:
return f"http://{name}.{namespace}.svc.cluster.local:{port}"
- elif api_client is not None:
- return f"{api_client.configuration.host}/api/v1/namespaces/{namespace}/services/{name}:{port}/proxy"
+ elif self.api_client is not None:
+ return f"{self.api_client.configuration.host}/api/v1/namespaces/{namespace}/services/{name}:{port}/proxy"
return None
- def find_ingress_host(self, label_selector: str, *, api_client: Optional[ApiClient] = None) -> Optional[str]:
+ def find_ingress_host(self, label_selector: str) -> Optional[str]:
"""
Discover the ingress host of the Prometheus if krr is not running in cluster
"""
if self.config.inside_cluster:
return None
- v1 = client.NetworkingV1Api(api_client=api_client)
+ v1 = client.NetworkingV1Api(api_client=self.api_client)
ingress_list: V1IngressList = v1.list_ingress_for_all_namespaces(label_selector=label_selector)
if not ingress_list.items:
return None
@@ -54,26 +58,26 @@ class ServiceDiscovery(Configurable):
prometheus_host = ingress.spec.rules[0].host
return f"http://{prometheus_host}"
- def find_url(self, selectors: list[str], *, api_client: Optional[ApiClient] = None) -> Optional[str]:
+ def find_url(self, selectors: list[str]) -> Optional[str]:
"""
Try to autodiscover the url of an in-cluster service
"""
- cache_key = ",".join(selectors)
+ cache_key = ",".join(selectors + [self.api_client.configuration.host if self.api_client else ""])
cached_value = self.cache.get(cache_key)
if cached_value:
return cached_value
for label_selector in selectors:
- logging.debug(f"Trying to find service with label selector {label_selector}")
- service_url = self.find_service_url(label_selector, api_client=api_client)
+ self.debug(f"Trying to find service with label selector {label_selector}")
+ service_url = self.find_service_url(label_selector)
if service_url:
- logging.debug(f"Found service with label selector {label_selector}")
+ self.debug(f"Found service with label selector {label_selector}")
self.cache[cache_key] = service_url
return service_url
- logging.debug(f"Trying to find ingress with label selector {label_selector}")
- self.find_ingress_host(label_selector, api_client=api_client)
- ingress_url = self.find_ingress_host(label_selector, api_client=api_client)
+ self.debug(f"Trying to find ingress with label selector {label_selector}")
+ self.find_ingress_host(label_selector)
+ ingress_url = self.find_ingress_host(label_selector)
if ingress_url:
return ingress_url