summaryrefslogtreecommitdiff
path: root/robusta_krr/core
diff options
context:
space:
mode:
authorPavel Zhukov <33721692+LeaveMyYard@users.noreply.github.com>2024-03-26 11:54:14 +0200
committerGitHub <noreply@github.com>2024-03-26 11:54:14 +0200
commit4e450f3ecf7a7df161956fe1574392e9c406054c (patch)
tree7cc088fbd8e03edb21fd041e8ee6e6d9ed7f9db8 /robusta_krr/core
parentfaa69edc547d36bfa7ad4edf5f5ea79906cb90de (diff)
Improve limited permissions (cherry-picked from #220) (#238)
* Add --as option to impersonate a specific user * Update test case * Don't exit if the user lacks permissions to auto-discover prometheus * Add a comment * Add support for HPA w/o cluster-level permissions * feat: cli option for --as-group (#224) * feat: cli option for --as-group * add: as-group example * Improve a message in case of API error * Return the debug log with found items in cluster --------- Co-authored-by: Robusta Runner <aantny@gmail.com> Co-authored-by: Rohan Katkar <rohan.katkar@dnv.com> Co-authored-by: LeaveMyYard <zhukovpave2001@gmail.com>
Diffstat (limited to 'robusta_krr/core')
-rw-r--r--robusta_krr/core/integrations/kubernetes/__init__.py147
-rw-r--r--robusta_krr/core/integrations/prometheus/loader.py14
-rw-r--r--robusta_krr/core/models/config.py11
3 files changed, 95 insertions, 77 deletions
diff --git a/robusta_krr/core/integrations/kubernetes/__init__.py b/robusta_krr/core/integrations/kubernetes/__init__.py
index c7976cf..335b47a 100644
--- a/robusta_krr/core/integrations/kubernetes/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/__init__.py
@@ -10,13 +10,11 @@ from kubernetes.client.models import (
V1Container,
V1DaemonSet,
V1Deployment,
- V1HorizontalPodAutoscalerList,
V1Job,
V1Pod,
V1PodList,
V1StatefulSet,
V2HorizontalPodAutoscaler,
- V2HorizontalPodAutoscalerList,
)
from robusta_krr.core.models.config import settings
@@ -34,15 +32,11 @@ HPAKey = tuple[str, str, str]
class ClusterLoader:
- def __init__(self, cluster: Optional[str]):
+ def __init__(self, cluster: Optional[str]=None):
self.cluster = cluster
# This executor will be running requests to Kubernetes API
self.executor = ThreadPoolExecutor(settings.max_workers)
- self.api_client = (
- config.new_client_from_config(context=cluster, config_file=settings.kubeconfig)
- if cluster is not None
- else None
- )
+ self.api_client = settings.get_kube_client(cluster)
self.apps = client.AppsV1Api(api_client=self.api_client)
self.custom_objects = client.CustomObjectsApi(api_client=self.api_client)
self.batch = client.BatchV1Api(api_client=self.api_client)
@@ -162,7 +156,7 @@ class ClusterLoader:
return ",".join(label_filters)
- def __build_obj(
+ def __build_scannable_object(
self, item: AnyKubernetesAPIObject, container: V1Container, kind: Optional[str] = None
) -> K8sObjectData:
name = item.metadata.name
@@ -186,7 +180,48 @@ class ClusterLoader:
return True
return resource in settings.resources
- async def _list_workflows(
+ async def _list_namespaced_or_global_objects(
+ self,
+ kind: KindLiteral,
+ all_namespaces_request: Callable,
+ namespaced_request: Callable
+ ) -> AsyncIterable[Any]:
+ logger.debug(f"Listing {kind}s in {self.cluster}")
+ loop = asyncio.get_running_loop()
+
+ if settings.namespaces == "*":
+ tasks = [
+ loop.run_in_executor(
+ self.executor,
+ lambda: all_namespaces_request(
+ watch=False,
+ label_selector=settings.selector,
+ ),
+ )
+ ]
+ else:
+ tasks = [
+ loop.run_in_executor(
+ self.executor,
+ lambda ns=namespace: namespaced_request(
+ namespace=ns,
+ watch=False,
+ label_selector=settings.selector,
+ ),
+ )
+ for namespace in settings.namespaces
+ ]
+
+ total_items = 0
+ for task in asyncio.as_completed(tasks):
+ ret_single = await task
+ total_items += len(ret_single.items)
+ for item in ret_single.items:
+ yield item
+
+ logger.debug(f"Found {total_items} {kind} in {self.cluster}")
+
+ async def _list_scannable_objects(
self,
kind: KindLiteral,
all_namespaces_request: Callable,
@@ -201,49 +236,17 @@ class ClusterLoader:
if not self.__kind_available[kind]:
return
- logger.debug(f"Listing {kind}s in {self.cluster}")
- loop = asyncio.get_running_loop()
-
try:
- if settings.namespaces == "*":
- tasks = [
- loop.run_in_executor(
- self.executor,
- lambda: all_namespaces_request(
- watch=False,
- label_selector=settings.selector,
- ),
- )
- ]
- else:
- tasks = [
- loop.run_in_executor(
- self.executor,
- lambda ns=namespace: namespaced_request(
- namespace=ns,
- watch=False,
- label_selector=settings.selector,
- ),
- )
- for namespace in settings.namespaces
- ]
-
- total_items = 0
- for task in asyncio.as_completed(tasks):
- ret_single = await task
- total_items += len(ret_single.items)
- for item in ret_single.items:
- if filter_workflows is not None and not filter_workflows(item):
- continue
-
- containers = extract_containers(item)
- if asyncio.iscoroutine(containers):
- containers = await containers
-
- for container in containers:
- yield self.__build_obj(item, container, kind)
-
- logger.debug(f"Found {total_items} {kind} in {self.cluster}")
+ async for item in self._list_namespaced_or_global_objects(kind, all_namespaces_request, namespaced_request):
+ if filter_workflows is not None and not filter_workflows(item):
+ continue
+
+ containers = extract_containers(item)
+ if asyncio.iscoroutine(containers):
+ containers = await containers
+
+ for container in containers:
+ yield self.__build_scannable_object(item, container, kind)
except ApiException as e:
if kind in ("Rollout", "DeploymentConfig") and e.status in [400, 401, 403, 404]:
if self.__kind_available[kind]:
@@ -254,7 +257,7 @@ class ClusterLoader:
logger.error("Will skip this object type and continue.")
def _list_deployments(self) -> AsyncIterable[K8sObjectData]:
- return self._list_workflows(
+ return self._list_scannable_objects(
kind="Deployment",
all_namespaces_request=self.apps.list_deployment_for_all_namespaces,
namespaced_request=self.apps.list_namespaced_deployment,
@@ -287,7 +290,7 @@ class ClusterLoader:
# NOTE: Using custom objects API returns dicts, but all other APIs return objects
# We need to handle this difference using a small wrapper
- return self._list_workflows(
+ return self._list_scannable_objects(
kind="Rollout",
all_namespaces_request=lambda **kwargs: ObjectLikeDict(
self.custom_objects.list_cluster_custom_object(
@@ -311,7 +314,7 @@ class ClusterLoader:
def _list_deploymentconfig(self) -> AsyncIterable[K8sObjectData]:
# NOTE: Using custom objects API returns dicts, but all other APIs return objects
# We need to handle this difference using a small wrapper
- return self._list_workflows(
+ return self._list_scannable_objects(
kind="DeploymentConfig",
all_namespaces_request=lambda **kwargs: ObjectLikeDict(
self.custom_objects.list_cluster_custom_object(
@@ -333,7 +336,7 @@ class ClusterLoader:
)
def _list_all_statefulsets(self) -> AsyncIterable[K8sObjectData]:
- return self._list_workflows(
+ return self._list_scannable_objects(
kind="StatefulSet",
all_namespaces_request=self.apps.list_stateful_set_for_all_namespaces,
namespaced_request=self.apps.list_namespaced_stateful_set,
@@ -341,7 +344,7 @@ class ClusterLoader:
)
def _list_all_daemon_set(self) -> AsyncIterable[K8sObjectData]:
- return self._list_workflows(
+ return self._list_scannable_objects(
kind="DaemonSet",
all_namespaces_request=self.apps.list_daemon_set_for_all_namespaces,
namespaced_request=self.apps.list_namespaced_daemon_set,
@@ -349,7 +352,7 @@ class ClusterLoader:
)
def _list_all_jobs(self) -> AsyncIterable[K8sObjectData]:
- return self._list_workflows(
+ return self._list_scannable_objects(
kind="Job",
all_namespaces_request=self.batch.list_job_for_all_namespaces,
namespaced_request=self.batch.list_namespaced_job,
@@ -361,7 +364,7 @@ class ClusterLoader:
)
def _list_all_cronjobs(self) -> AsyncIterable[K8sObjectData]:
- return self._list_workflows(
+ return self._list_scannable_objects(
kind="CronJob",
all_namespaces_request=self.batch.list_cron_job_for_all_namespaces,
namespaced_request=self.batch.list_namespaced_cron_job,
@@ -370,11 +373,14 @@ class ClusterLoader:
async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]:
loop = asyncio.get_running_loop()
-
- res: V1HorizontalPodAutoscalerList = await loop.run_in_executor(
- self.executor, lambda: self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces(watch=False)
+ res = await loop.run_in_executor(
+ self.executor,
+ lambda: self._list_namespaced_or_global_objects(
+ kind="HPA-v1",
+ all_namespaces_request=self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces,
+ namespaced_request=self.autoscaling_v1.list_namespaced_horizontal_pod_autoscaler,
+ ),
)
-
return {
(
hpa.metadata.namespace,
@@ -388,17 +394,19 @@ class ClusterLoader:
target_cpu_utilization_percentage=hpa.spec.target_cpu_utilization_percentage,
target_memory_utilization_percentage=None,
)
- for hpa in res.items
+ async for hpa in res
}
async def __list_hpa_v2(self) -> dict[HPAKey, HPAData]:
loop = asyncio.get_running_loop()
-
- res: V2HorizontalPodAutoscalerList = await loop.run_in_executor(
+ res = await loop.run_in_executor(
self.executor,
- lambda: self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces(watch=False),
+ lambda: self._list_namespaced_or_global_objects(
+ kind="HPA-v2",
+ all_namespaces_request=self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces,
+ namespaced_request=self.autoscaling_v2.list_namespaced_horizontal_pod_autoscaler,
+ ),
)
-
def __get_metric(hpa: V2HorizontalPodAutoscaler, metric_name: str) -> Optional[float]:
return next(
(
@@ -408,7 +416,6 @@ class ClusterLoader:
),
None,
)
-
return {
(
hpa.metadata.namespace,
@@ -422,7 +429,7 @@ class ClusterLoader:
target_cpu_utilization_percentage=__get_metric(hpa, "cpu"),
target_memory_utilization_percentage=__get_metric(hpa, "memory"),
)
- for hpa in res.items
+ async for hpa in res
}
# TODO: What should we do in case of other metrics bound to the HPA?
diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py
index 5593d69..b9b600f 100644
--- a/robusta_krr/core/integrations/prometheus/loader.py
+++ b/robusta_krr/core/integrations/prometheus/loader.py
@@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Optional
from kubernetes import config as k8s_config
from kubernetes.client.api_client import ApiClient
+from kubernetes.client.exceptions import ApiException
from prometrix import MetricsNotFound, PrometheusNotFound
from robusta_krr.core.models.config import settings
@@ -38,13 +39,7 @@ class PrometheusMetricsLoader:
"""
self.executor = ThreadPoolExecutor(settings.max_workers)
- logger.info(f"Prometheus loader max workers: {settings.max_workers}")
-
- self.api_client = (
- k8s_config.new_client_from_config(config_file=settings.kubeconfig, context=cluster)
- if cluster is not None
- else None
- )
+ self.api_client = settings.get_kube_client(context=cluster)
loader = self.get_metrics_service(api_client=self.api_client, cluster=cluster)
if loader is None:
raise PrometheusNotFound("No Prometheus or metrics service found")
@@ -67,6 +62,11 @@ class PrometheusMetricsLoader:
return loader
except MetricsNotFound as e:
logger.info(f"{service_name} not found: {e}")
+ except ApiException as e:
+ logger.warning(
+ f"Unable to automatically discover a {service_name} in the cluster ({e}). "
+ "Try specifying how to connect to Prometheus via cli options"
+ )
return None
diff --git a/robusta_krr/core/models/config.py b/robusta_krr/core/models/config.py
index 36787d4..ac3b15c 100644
--- a/robusta_krr/core/models/config.py
+++ b/robusta_krr/core/models/config.py
@@ -23,6 +23,8 @@ class Config(pd.BaseSettings):
clusters: Union[list[str], Literal["*"], None] = None
kubeconfig: Optional[str] = None
+ impersonate_user: Optional[str] = None
+ impersonate_group: Optional[str] = None
namespaces: Union[list[str], Literal["*"]] = pd.Field("*")
resources: Union[list[KindLiteral], Literal["*"]] = pd.Field("*")
selector: Optional[str] = None
@@ -141,6 +143,15 @@ class Config(pd.BaseSettings):
else:
self.inside_cluster = True
+ def get_kube_client(self, context: Optional[str] = None):
+ api_client = config.new_client_from_config(context=context, config_file=self.kubeconfig)
+ if self.impersonate_user is not None:
+ # trick copied from https://github.com/kubernetes-client/python/issues/362
+ api_client.set_default_header("Impersonate-User", self.impersonate_user)
+ if self.impersonate_group is not None:
+ api_client.set_default_header("Impersonate-Group", self.impersonate_group)
+ return api_client
+
@staticmethod
def set_config(config: Config) -> None:
global _config