From 4fedd82caa36d062f7a25b38cc9066e9281a3a21 Mon Sep 17 00:00:00 2001 From: LeaveMyYard Date: Mon, 22 Apr 2024 14:30:11 +0300 Subject: Add multiple workload loaders, refactor kubeapi workload loader --- examples/custom_strategy.py | 4 +- robusta_krr/api/models.py | 4 +- robusta_krr/core/abstract/metrics.py | 4 +- robusta_krr/core/abstract/strategies.py | 6 +- .../core/integrations/kubernetes/__init__.py | 463 +-------------------- .../kubernetes/workload_loader/__init__.py | 5 + .../kubernetes/workload_loader/base.py | 16 + .../workload_loader/kube_api/__init__.py | 273 ++++++++++++ .../workload_loader/kube_api/loaders/__init__.py | 19 + .../workload_loader/kube_api/loaders/base.py | 127 ++++++ .../workload_loader/kube_api/loaders/cronjobs.py | 68 +++ .../workload_loader/kube_api/loaders/daemonsets.py | 13 + .../kube_api/loaders/deploymentconfigs.py | 38 ++ .../kube_api/loaders/deployments.py | 13 + .../workload_loader/kube_api/loaders/jobs.py | 18 + .../workload_loader/kube_api/loaders/rollouts.py | 61 +++ .../kube_api/loaders/statefulsets.py | 13 + .../kubernetes/workload_loader/prometheus.py | 10 + robusta_krr/core/integrations/prometheus/loader.py | 6 +- .../core/integrations/prometheus/metrics/base.py | 6 +- .../core/integrations/prometheus/metrics/cpu.py | 8 +- .../core/integrations/prometheus/metrics/memory.py | 8 +- .../metrics_service/base_metric_service.py | 4 +- .../metrics_service/prometheus_metrics_service.py | 6 +- robusta_krr/core/models/config.py | 1 + robusta_krr/core/models/objects.py | 6 +- robusta_krr/core/models/result.py | 6 +- robusta_krr/core/runner.py | 12 +- robusta_krr/strategies/simple.py | 8 +- tests/conftest.py | 4 +- 30 files changed, 739 insertions(+), 491 deletions(-) create mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py create mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/base.py create mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py 
create mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/__init__.py create mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py create mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/cronjobs.py create mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/daemonsets.py create mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deploymentconfigs.py create mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deployments.py create mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/jobs.py create mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/rollouts.py create mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/statefulsets.py create mode 100644 robusta_krr/core/integrations/kubernetes/workload_loader/prometheus.py diff --git a/examples/custom_strategy.py b/examples/custom_strategy.py index f9e281d..05735e5 100644 --- a/examples/custom_strategy.py +++ b/examples/custom_strategy.py @@ -3,7 +3,7 @@ import pydantic as pd import robusta_krr -from robusta_krr.api.models import K8sObjectData, MetricsPodData, ResourceRecommendation, ResourceType, RunResult +from robusta_krr.api.models import K8sWorkload, MetricsPodData, ResourceRecommendation, ResourceType, RunResult from robusta_krr.api.strategies import BaseStrategy, StrategySettings from robusta_krr.core.integrations.prometheus.metrics import MaxMemoryLoader, PercentileCPULoader @@ -24,7 +24,7 @@ class CustomStrategy(BaseStrategy[CustomStrategySettings]): rich_console = True # Whether to use rich console for the CLI metrics = [PercentileCPULoader(90), MaxMemoryLoader] # The metrics to use for the strategy - def run(self, history_data: MetricsPodData, object_data: K8sObjectData) -> RunResult: + def 
run(self, history_data: MetricsPodData, object_data: K8sWorkload) -> RunResult: return { ResourceType.CPU: ResourceRecommendation(request=self.settings.param_1, limit=None), ResourceType.Memory: ResourceRecommendation(request=self.settings.param_2, limit=self.settings.param_2), diff --git a/robusta_krr/api/models.py b/robusta_krr/api/models.py index 3a453ce..e61f01e 100644 --- a/robusta_krr/api/models.py +++ b/robusta_krr/api/models.py @@ -1,6 +1,6 @@ from robusta_krr.core.abstract.strategies import MetricsPodData, PodsTimeData, ResourceRecommendation, RunResult from robusta_krr.core.models.allocations import RecommendationValue, ResourceAllocations, ResourceType -from robusta_krr.core.models.objects import K8sObjectData, PodData +from robusta_krr.core.models.objects import K8sWorkload, PodData from robusta_krr.core.models.result import ResourceScan, Result from robusta_krr.core.models.severity import Severity, register_severity_calculator @@ -8,7 +8,7 @@ __all__ = [ "ResourceType", "ResourceAllocations", "RecommendationValue", - "K8sObjectData", + "K8sWorkload", "PodData", "Result", "Severity", diff --git a/robusta_krr/core/abstract/metrics.py b/robusta_krr/core/abstract/metrics.py index 3b6f19c..e1151ea 100644 --- a/robusta_krr/core/abstract/metrics.py +++ b/robusta_krr/core/abstract/metrics.py @@ -2,7 +2,7 @@ import datetime from abc import ABC, abstractmethod from robusta_krr.core.abstract.strategies import PodsTimeData -from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.core.models.objects import K8sWorkload class BaseMetric(ABC): @@ -16,6 +16,6 @@ class BaseMetric(ABC): @abstractmethod async def load_data( - self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta + self, object: K8sWorkload, period: datetime.timedelta, step: datetime.timedelta ) -> PodsTimeData: ... 
diff --git a/robusta_krr/core/abstract/strategies.py b/robusta_krr/core/abstract/strategies.py index 5b6521e..ab5ab6c 100644 --- a/robusta_krr/core/abstract/strategies.py +++ b/robusta_krr/core/abstract/strategies.py @@ -9,7 +9,7 @@ import numpy as np import pydantic as pd from numpy.typing import NDArray -from robusta_krr.core.models.result import K8sObjectData, ResourceType +from robusta_krr.core.models.result import K8sWorkload, ResourceType if TYPE_CHECKING: from robusta_krr.core.abstract.metrics import BaseMetric # noqa: F401 @@ -133,7 +133,7 @@ class BaseStrategy(abc.ABC, Generic[_StrategySettings]): # Abstract method that needs to be implemented by subclass. # This method is intended to calculate resource recommendation based on history data and kubernetes object data. @abc.abstractmethod - def run(self, history_data: MetricsPodData, object_data: K8sObjectData) -> RunResult: + def run(self, history_data: MetricsPodData, object_data: K8sWorkload) -> RunResult: pass # This method is intended to return a strategy by its name. @@ -167,6 +167,6 @@ __all__ = [ "StrategySettings", "PodsTimeData", "MetricsPodData", - "K8sObjectData", + "K8sWorkload", "ResourceType", ] diff --git a/robusta_krr/core/integrations/kubernetes/__init__.py b/robusta_krr/core/integrations/kubernetes/__init__.py index a772a5c..051cc23 100644 --- a/robusta_krr/core/integrations/kubernetes/__init__.py +++ b/robusta_krr/core/integrations/kubernetes/__init__.py @@ -18,11 +18,12 @@ from kubernetes.client.models import ( ) from robusta_krr.core.models.config import settings -from robusta_krr.core.models.objects import HPAData, K8sObjectData, KindLiteral, PodData +from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData from robusta_krr.core.models.result import ResourceAllocations from robusta_krr.utils.object_like_dict import ObjectLikeDict from . 
import config_patch as _ +from .workload_loader import BaseWorkloadLoader, KubeAPIWorkloadLoader, PrometheusWorkloadLoader logger = logging.getLogger("krr") @@ -30,441 +31,7 @@ AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, HPAKey = tuple[str, str, str] -class ClusterLoader: - def __init__(self, cluster: Optional[str]=None): - self.cluster = cluster - # This executor will be running requests to Kubernetes API - self.executor = ThreadPoolExecutor(settings.max_workers) - self.api_client = settings.get_kube_client(cluster) - self.apps = client.AppsV1Api(api_client=self.api_client) - self.custom_objects = client.CustomObjectsApi(api_client=self.api_client) - self.batch = client.BatchV1Api(api_client=self.api_client) - self.core = client.CoreV1Api(api_client=self.api_client) - self.autoscaling_v1 = client.AutoscalingV1Api(api_client=self.api_client) - self.autoscaling_v2 = client.AutoscalingV2Api(api_client=self.api_client) - - self.__kind_available: defaultdict[KindLiteral, bool] = defaultdict(lambda: True) - - self.__jobs_for_cronjobs: dict[str, list[V1Job]] = {} - self.__jobs_loading_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock) - - async def list_scannable_objects(self) -> list[K8sObjectData]: - """List all scannable objects. - - Returns: - A list of scannable objects. 
- """ - - logger.info(f"Listing scannable objects in {self.cluster}") - logger.debug(f"Namespaces: {settings.namespaces}") - logger.debug(f"Resources: {settings.resources}") - - self.__hpa_list = await self._try_list_hpa() - workload_object_lists = await asyncio.gather( - self._list_deployments(), - self._list_rollouts(), - self._list_deploymentconfig(), - self._list_all_statefulsets(), - self._list_all_daemon_set(), - self._list_all_jobs(), - self._list_all_cronjobs(), - ) - - return [ - object - for workload_objects in workload_object_lists - for object in workload_objects - # NOTE: By default we will filter out kube-system namespace - if not (settings.namespaces == "*" and object.namespace == "kube-system") - ] - - async def _list_jobs_for_cronjobs(self, namespace: str) -> list[V1Job]: - if namespace not in self.__jobs_for_cronjobs: - loop = asyncio.get_running_loop() - - async with self.__jobs_loading_locks[namespace]: - logging.debug(f"Loading jobs for cronjobs in {namespace}") - ret = await loop.run_in_executor( - self.executor, - lambda: self.batch.list_namespaced_job(namespace=namespace), - ) - self.__jobs_for_cronjobs[namespace] = ret.items - - return self.__jobs_for_cronjobs[namespace] - - async def list_pods(self, object: K8sObjectData) -> list[PodData]: - loop = asyncio.get_running_loop() - - if object.kind == "CronJob": - namespace_jobs = await self._list_jobs_for_cronjobs(object.namespace) - ownered_jobs_uids = [ - job.metadata.uid - for job in namespace_jobs - if any( - owner.kind == "CronJob" and owner.uid == object._api_resource.metadata.uid - for owner in job.metadata.owner_references or [] - ) - ] - selector = f"batch.kubernetes.io/controller-uid in ({','.join(ownered_jobs_uids)})" - - else: - if object.selector is None: - return [] - - selector = self._build_selector_query(object.selector) - if selector is None: - return [] - - ret: V1PodList = await loop.run_in_executor( - self.executor, - lambda: self.core.list_namespaced_pod( - 
namespace=object._api_resource.metadata.namespace, label_selector=selector - ), - ) - - return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items] - - @staticmethod - def _get_match_expression_filter(expression) -> str: - if expression.operator.lower() == "exists": - return expression.key - elif expression.operator.lower() == "doesnotexist": - return f"!{expression.key}" - - values = ",".join(expression.values) - return f"{expression.key} {expression.operator} ({values})" - - @staticmethod - def _build_selector_query(selector: Any) -> Union[str, None]: - label_filters = [] - - if selector.match_labels is not None: - label_filters += [f"{label[0]}={label[1]}" for label in selector.match_labels.items()] - - if selector.match_expressions is not None: - label_filters += [ - ClusterLoader._get_match_expression_filter(expression) for expression in selector.match_expressions - ] - - if label_filters == []: - # NOTE: This might mean that we have DeploymentConfig, - # which uses ReplicationController and it has a dict like matchLabels - if len(selector) != 0: - label_filters += [f"{label[0]}={label[1]}" for label in selector.items()] - else: - return None - - return ",".join(label_filters) - - def __build_scannable_object( - self, item: AnyKubernetesAPIObject, container: V1Container, kind: Optional[str] = None - ) -> K8sObjectData: - name = item.metadata.name - namespace = item.metadata.namespace - kind = kind or item.__class__.__name__[2:] - - obj = K8sObjectData( - cluster=self.cluster, - namespace=namespace, - name=name, - kind=kind, - container=container.name, - allocations=ResourceAllocations.from_container(container), - hpa=self.__hpa_list.get((namespace, kind, name)), - ) - obj._api_resource = item - return obj - - def _should_list_resource(self, resource: str) -> bool: - if settings.resources == "*": - return True - return resource in settings.resources - - async def _list_namespaced_or_global_objects( - self, - kind: KindLiteral, - 
all_namespaces_request: Callable, - namespaced_request: Callable - ) -> list[Any]: - logger.debug(f"Listing {kind}s in {self.cluster}") - loop = asyncio.get_running_loop() - - if settings.namespaces == "*": - requests = [ - loop.run_in_executor( - self.executor, - lambda: all_namespaces_request( - watch=False, - label_selector=settings.selector, - ), - ) - ] - else: - requests = [ - loop.run_in_executor( - self.executor, - lambda ns=namespace: namespaced_request( - namespace=ns, - watch=False, - label_selector=settings.selector, - ), - ) - for namespace in settings.namespaces - ] - - result = [ - item - for request_result in await asyncio.gather(*requests) - for item in request_result.items - ] - - logger.debug(f"Found {len(result)} {kind} in {self.cluster}") - return result - - async def _list_scannable_objects( - self, - kind: KindLiteral, - all_namespaces_request: Callable, - namespaced_request: Callable, - extract_containers: Callable[[Any], Union[Iterable[V1Container], Awaitable[Iterable[V1Container]]]], - filter_workflows: Optional[Callable[[Any], bool]] = None, - ) -> list[K8sObjectData]: - if not self._should_list_resource(kind): - logger.debug(f"Skipping {kind}s in {self.cluster}") - return - - if not self.__kind_available[kind]: - return - - result = [] - try: - for item in await self._list_namespaced_or_global_objects(kind, all_namespaces_request, namespaced_request): - if filter_workflows is not None and not filter_workflows(item): - continue - - containers = extract_containers(item) - if asyncio.iscoroutine(containers): - containers = await containers - - result.extend(self.__build_scannable_object(item, container, kind) for container in containers) - except ApiException as e: - if kind in ("Rollout", "DeploymentConfig") and e.status in [400, 401, 403, 404]: - if self.__kind_available[kind]: - logger.debug(f"{kind} API not available in {self.cluster}") - self.__kind_available[kind] = False - else: - logger.exception(f"Error {e.status} listing {kind} in 
cluster {self.cluster}: {e.reason}") - logger.error("Will skip this object type and continue.") - - return result - - def _list_deployments(self) -> list[K8sObjectData]: - return self._list_scannable_objects( - kind="Deployment", - all_namespaces_request=self.apps.list_deployment_for_all_namespaces, - namespaced_request=self.apps.list_namespaced_deployment, - extract_containers=lambda item: item.spec.template.spec.containers, - ) - - def _list_rollouts(self) -> list[K8sObjectData]: - async def _extract_containers(item: Any) -> list[V1Container]: - if item.spec.template is not None: - return item.spec.template.spec.containers - - loop = asyncio.get_running_loop() - - logging.debug( - f"Rollout has workloadRef, fetching template for {item.metadata.name} in {item.metadata.namespace}" - ) - - # Template can be None and object might have workloadRef - workloadRef = item.spec.workloadRef - if workloadRef is not None: - ret = await loop.run_in_executor( - self.executor, - lambda: self.apps.read_namespaced_deployment( - namespace=item.metadata.namespace, name=workloadRef.name - ), - ) - return ret.spec.template.spec.containers - - return [] - - # NOTE: Using custom objects API returns dicts, but all other APIs return objects - # We need to handle this difference using a small wrapper - return self._list_scannable_objects( - kind="Rollout", - all_namespaces_request=lambda **kwargs: ObjectLikeDict( - self.custom_objects.list_cluster_custom_object( - group="argoproj.io", - version="v1alpha1", - plural="rollouts", - **kwargs, - ) - ), - namespaced_request=lambda **kwargs: ObjectLikeDict( - self.custom_objects.list_namespaced_custom_object( - group="argoproj.io", - version="v1alpha1", - plural="rollouts", - **kwargs, - ) - ), - extract_containers=_extract_containers, - ) - - def _list_deploymentconfig(self) -> list[K8sObjectData]: - # NOTE: Using custom objects API returns dicts, but all other APIs return objects - # We need to handle this difference using a small wrapper - 
return self._list_scannable_objects( - kind="DeploymentConfig", - all_namespaces_request=lambda **kwargs: ObjectLikeDict( - self.custom_objects.list_cluster_custom_object( - group="apps.openshift.io", - version="v1", - plural="deploymentconfigs", - **kwargs, - ) - ), - namespaced_request=lambda **kwargs: ObjectLikeDict( - self.custom_objects.list_namespaced_custom_object( - group="apps.openshift.io", - version="v1", - plural="deploymentconfigs", - **kwargs, - ) - ), - extract_containers=lambda item: item.spec.template.spec.containers, - ) - - def _list_all_statefulsets(self) -> list[K8sObjectData]: - return self._list_scannable_objects( - kind="StatefulSet", - all_namespaces_request=self.apps.list_stateful_set_for_all_namespaces, - namespaced_request=self.apps.list_namespaced_stateful_set, - extract_containers=lambda item: item.spec.template.spec.containers, - ) - - def _list_all_daemon_set(self) -> list[K8sObjectData]: - return self._list_scannable_objects( - kind="DaemonSet", - all_namespaces_request=self.apps.list_daemon_set_for_all_namespaces, - namespaced_request=self.apps.list_namespaced_daemon_set, - extract_containers=lambda item: item.spec.template.spec.containers, - ) - - def _list_all_jobs(self) -> list[K8sObjectData]: - return self._list_scannable_objects( - kind="Job", - all_namespaces_request=self.batch.list_job_for_all_namespaces, - namespaced_request=self.batch.list_namespaced_job, - extract_containers=lambda item: item.spec.template.spec.containers, - # NOTE: If the job has ownerReference and it is a CronJob, then we should skip it - filter_workflows=lambda item: not any( - owner.kind == "CronJob" for owner in item.metadata.owner_references or [] - ), - ) - - def _list_all_cronjobs(self) -> list[K8sObjectData]: - return self._list_scannable_objects( - kind="CronJob", - all_namespaces_request=self.batch.list_cron_job_for_all_namespaces, - namespaced_request=self.batch.list_namespaced_cron_job, - extract_containers=lambda item: 
item.spec.job_template.spec.template.spec.containers, - ) - - async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]: - loop = asyncio.get_running_loop() - res = await loop.run_in_executor( - self.executor, - lambda: self._list_namespaced_or_global_objects( - kind="HPA-v1", - all_namespaces_request=self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces, - namespaced_request=self.autoscaling_v1.list_namespaced_horizontal_pod_autoscaler, - ), - ) - return { - ( - hpa.metadata.namespace, - hpa.spec.scale_target_ref.kind, - hpa.spec.scale_target_ref.name, - ): HPAData( - min_replicas=hpa.spec.min_replicas, - max_replicas=hpa.spec.max_replicas, - current_replicas=hpa.status.current_replicas, - desired_replicas=hpa.status.desired_replicas, - target_cpu_utilization_percentage=hpa.spec.target_cpu_utilization_percentage, - target_memory_utilization_percentage=None, - ) - async for hpa in res - } - - async def __list_hpa_v2(self) -> dict[HPAKey, HPAData]: - res = await self._list_namespaced_or_global_objects( - kind="HPA-v2", - all_namespaces_request=self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces, - namespaced_request=self.autoscaling_v2.list_namespaced_horizontal_pod_autoscaler, - ) - def __get_metric(hpa: V2HorizontalPodAutoscaler, metric_name: str) -> Optional[float]: - return next( - ( - metric.resource.target.average_utilization - for metric in hpa.spec.metrics - if metric.type == "Resource" and metric.resource.name == metric_name - ), - None, - ) - return { - ( - hpa.metadata.namespace, - hpa.spec.scale_target_ref.kind, - hpa.spec.scale_target_ref.name, - ): HPAData( - min_replicas=hpa.spec.min_replicas, - max_replicas=hpa.spec.max_replicas, - current_replicas=hpa.status.current_replicas, - desired_replicas=hpa.status.desired_replicas, - target_cpu_utilization_percentage=__get_metric(hpa, "cpu"), - target_memory_utilization_percentage=__get_metric(hpa, "memory"), - ) - for hpa in res - } - - # TODO: What should we do in case of 
other metrics bound to the HPA? - async def __list_hpa(self) -> dict[HPAKey, HPAData]: - """List all HPA objects in the cluster. - - Returns: - dict[tuple[str, str], HPAData]: A dictionary of HPA objects, indexed by scaleTargetRef (kind, name). - """ - - try: - # Try to use V2 API first - return await self.__list_hpa_v2() - except ApiException as e: - if e.status != 404: - # If the error is other than not found, then re-raise it. - raise - - # If V2 API does not exist, fall back to V1 - return await self.__list_hpa_v1() - - async def _try_list_hpa(self) -> dict[HPAKey, HPAData]: - try: - return await self.__list_hpa() - except Exception as e: - logger.exception(f"Error trying to list hpa in cluster {self.cluster}: {e}") - logger.error( - "Will assume that there are no HPA. " - "Be careful as this may lead to inaccurate results if object actually has HPA." - ) - return {} - - -class KubernetesLoader: - def __init__(self) -> None: - self._cluster_loaders: dict[Optional[str], ClusterLoader] = {} - +class ClusterWorkloadsLoader: async def list_clusters(self) -> Optional[list[str]]: """List all clusters. @@ -505,36 +72,42 @@ class KubernetesLoader: return [context["name"] for context in contexts if context["name"] in settings.clusters] - def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[ClusterLoader]: + def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[BaseWorkloadLoader]: + WorkloadLoader = KubeAPIWorkloadLoader if settings.workload_loader == "kubeapi" else PrometheusWorkloadLoader + try: - return ClusterLoader(cluster=cluster) + return WorkloadLoader(cluster=cluster) except Exception as e: logger.error(f"Could not load cluster {cluster} and will skip it: {e}") return None - async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8sObjectData]: + async def list_workloads(self, clusters: Optional[list[str]]) -> list[K8sWorkload]: """List all scannable objects. 
- Yields: - Each scannable object as it is loaded. + Returns: + A list of all loaded objects. """ + if clusters is None: _cluster_loaders = [self._try_create_cluster_loader(None)] else: _cluster_loaders = [self._try_create_cluster_loader(cluster) for cluster in clusters] - self.cluster_loaders = {cl.cluster: cl for cl in _cluster_loaders if cl is not None} + self.cluster_loaders: dict[Optional[str], BaseWorkloadLoader] = { + cl.cluster: cl for cl in _cluster_loaders if cl is not None + } + if self.cluster_loaders == {}: logger.error("Could not load any cluster.") return - + return [ object for cluster_loader in self.cluster_loaders.values() - for object in await cluster_loader.list_scannable_objects() + for object in await cluster_loader.list_workloads() ] - async def load_pods(self, object: K8sObjectData) -> list[PodData]: + async def load_pods(self, object: K8sWorkload) -> list[PodData]: try: cluster_loader = self.cluster_loaders[object.cluster] except KeyError: diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py new file mode 100644 index 0000000..9543147 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py @@ -0,0 +1,5 @@ +from .base import BaseWorkloadLoader +from .kube_api import KubeAPIWorkloadLoader +from .prometheus import PrometheusWorkloadLoader + +__all__ = ["BaseWorkloadLoader", "KubeAPIWorkloadLoader", "PrometheusWorkloadLoader"] \ No newline at end of file diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/base.py new file mode 100644 index 0000000..a7bba21 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/base.py @@ -0,0 +1,16 @@ +import abc +from typing import Optional +from robusta_krr.core.models.objects import K8sWorkload, PodData + + +class BaseWorkloadLoader(abc.ABC): + def __init__(self, cluster: 
Optional[str] = None) -> None: + self.cluster = cluster + + @abc.abstractmethod + async def list_workloads(self, clusters: Optional[list[str]]) -> list[K8sWorkload]: + pass + + @abc.abstractmethod + async def list_pods(self, object: K8sWorkload) -> list[PodData]: + pass diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py new file mode 100644 index 0000000..8586b77 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py @@ -0,0 +1,273 @@ +import asyncio +import logging +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Awaitable, Callable, Optional + +from kubernetes import client # type: ignore +from kubernetes.client import ApiException # type: ignore +from kubernetes.client.models import V1Container, V2HorizontalPodAutoscaler # type: ignore + +from robusta_krr.core.models.config import settings +from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData +from robusta_krr.core.models.result import ResourceAllocations +from robusta_krr.utils.object_like_dict import ObjectLikeDict + +from ..base import BaseWorkloadLoader +from .loaders import ( + BaseKindLoader, + CronJobLoader, + DaemonSetLoader, + DeploymentConfigLoader, + DeploymentLoader, + JobLoader, + RolloutLoader, + StatefulSetLoader, +) + +logger = logging.getLogger("krr") + +HPAKey = tuple[str, str, str] + + +class KubeAPIWorkloadLoader(BaseWorkloadLoader): + workload_loaders: list[BaseKindLoader] = [ + DeploymentLoader, + RolloutLoader, + DeploymentConfigLoader, + StatefulSetLoader, + DaemonSetLoader, + JobLoader, + CronJobLoader, + ] + + def __init__(self, cluster: Optional[str] = None): + super().__init__(cluster) + + # This executor will be running requests to Kubernetes API + self.executor = ThreadPoolExecutor(settings.max_workers) + self.api_client = 
settings.get_kube_client(cluster) + + self.autoscaling_v1 = client.AutoscalingV1Api(api_client=self.api_client) + self.autoscaling_v2 = client.AutoscalingV2Api(api_client=self.api_client) + + self._kind_available: defaultdict[KindLiteral, bool] = defaultdict(lambda: True) + self._hpa_list: dict[HPAKey, HPAData] = {} + self._workload_loaders: dict[KindLiteral, BaseKindLoader] = { + loader.kind: loader(self.api_client, self.executor) for loader in self.workload_loaders + } + + async def list_workloads(self) -> list[K8sWorkload]: + """List all scannable objects. + + Returns: + A list of scannable objects. + """ + + logger.info(f"Listing scannable objects in {self.cluster}") + logger.debug(f"Namespaces: {settings.namespaces}") + logger.debug(f"Resources: {settings.resources}") + + self._hpa_list = await self._try_list_hpa() + workload_object_lists = await asyncio.gather( + *[ + self._fetch_workload(loader) + for loader in self._workload_loaders.values() + if self._should_list_resource(loader.kind) + ] + ) + + return [ + object + for workload_objects in workload_object_lists + for object in workload_objects + # NOTE: By default we will filter out kube-system namespace + if not (settings.namespaces == "*" and object.namespace == "kube-system") + ] + + async def list_pods(self, object: K8sWorkload) -> list[PodData]: + return await self._workload_loaders[object.kind].list_pods(object) + + def __build_scannable_object(self, item: Any, container: V1Container, kind: Optional[str] = None) -> K8sWorkload: + name = item.metadata.name + namespace = item.metadata.namespace + kind = kind or item.__class__.__name__[2:] + + obj = K8sWorkload( + cluster=self.cluster, + namespace=namespace, + name=name, + kind=kind, + container=container.name, + allocations=ResourceAllocations.from_container(container), + hpa=self._hpa_list.get((namespace, kind, name)), + ) + obj._api_resource = item + return obj + + def _should_list_resource(self, resource: str) -> bool: + if settings.resources == 
"*": + return True + return resource in settings.resources + + async def _list_namespaced_or_global_objects( + self, + kind: KindLiteral, + all_namespaces_request: Callable[..., Awaitable[Any]], + namespaced_request: Callable[..., Awaitable[Any]], + ) -> list[Any]: + logger.debug(f"Listing {kind}s in {self.cluster}") + + if settings.namespaces == "*": + requests = [ + all_namespaces_request( + label_selector=settings.selector, + ) + ] + else: + requests = [ + namespaced_request( + namespace=namespace, + label_selector=settings.selector, + ) + for namespace in settings.namespaces + ] + + result = [item for request_result in await asyncio.gather(*requests) for item in request_result.items] + + logger.debug(f"Found {len(result)} {kind} in {self.cluster}") + return result + + async def _fetch_workload(self, loader: BaseKindLoader) -> list[K8sWorkload]: + kind = loader.kind + + if not self._should_list_resource(kind): + logger.debug(f"Skipping {kind}s in {self.cluster}") + return + + if not self._kind_available[kind]: + return + + result = [] + try: + for item in await self._list_namespaced_or_global_objects( + kind, loader.all_namespaces_request_async, loader.namespaced_request_async + ): + if not loader.filter(item): + continue + + containers = await loader.extract_containers(item) + if asyncio.iscoroutine(containers): + containers = await containers + + result.extend(self.__build_scannable_object(item, container, kind) for container in containers) + except ApiException as e: + if kind in ("Rollout", "DeploymentConfig") and e.status in [400, 401, 403, 404]: + if self._kind_available[kind]: + logger.debug(f"{kind} API not available in {self.cluster}") + self._kind_available[kind] = False + else: + logger.exception(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}") + logger.error("Will skip this object type and continue.") + + return result + + async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]: + loop = asyncio.get_running_loop() + res = 
await self._list_namespaced_or_global_objects( + kind="HPA-v1", + all_namespaces_request=lambda **kwargs: loop.run_in_executor( + self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs), + ), + namespaced_request=lambda **kwargs: loop.run_in_executor( + self.autoscaling_v1.list_namespaced_horizontal_pod_autoscaler(**kwargs), + ), + ) + + return { + ( + hpa.metadata.namespace, + hpa.spec.scale_target_ref.kind, + hpa.spec.scale_target_ref.name, + ): HPAData( + min_replicas=hpa.spec.min_replicas, + max_replicas=hpa.spec.max_replicas, + current_replicas=hpa.status.current_replicas, + desired_replicas=hpa.status.desired_replicas, + target_cpu_utilization_percentage=hpa.spec.target_cpu_utilization_percentage, + target_memory_utilization_percentage=None, + ) + async for hpa in res + } + + async def __list_hpa_v2(self) -> dict[HPAKey, HPAData]: + loop = asyncio.get_running_loop() + + res = await self._list_namespaced_or_global_objects( + kind="HPA-v2", + all_namespaces_request=lambda **kwargs: loop.run_in_executor( + self.executor, self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs) + ), + namespaced_request=lambda **kwargs: loop.run_in_executor( + self.autoscaling_v2.list_namespaced_horizontal_pod_autoscaler(**kwargs), + ), + ) + + def __get_metric(hpa: V2HorizontalPodAutoscaler, metric_name: str) -> Optional[float]: + return next( + ( + metric.resource.target.average_utilization + for metric in hpa.spec.metrics + if metric.type == "Resource" and metric.resource.name == metric_name + ), + None, + ) + + return { + ( + hpa.metadata.namespace, + hpa.spec.scale_target_ref.kind, + hpa.spec.scale_target_ref.name, + ): HPAData( + min_replicas=hpa.spec.min_replicas, + max_replicas=hpa.spec.max_replicas, + current_replicas=hpa.status.current_replicas, + desired_replicas=hpa.status.desired_replicas, + target_cpu_utilization_percentage=__get_metric(hpa, "cpu"), + target_memory_utilization_percentage=__get_metric(hpa, "memory"), + ) 
+ for hpa in res + } + + # TODO: What should we do in case of other metrics bound to the HPA? + async def __list_hpa(self) -> dict[HPAKey, HPAData]: + """List all HPA objects in the cluster. + + Returns: + dict[tuple[str, str], HPAData]: A dictionary of HPA objects, indexed by scaleTargetRef (kind, name). + """ + + try: + # Try to use V2 API first + return await self.__list_hpa_v2() + except ApiException as e: + if e.status != 404: + # If the error is other than not found, then re-raise it. + raise + + # If V2 API does not exist, fall back to V1 + return await self.__list_hpa_v1() + + async def _try_list_hpa(self) -> dict[HPAKey, HPAData]: + try: + return await self.__list_hpa() + except Exception as e: + logger.exception(f"Error trying to list hpa in cluster {self.cluster}: {e}") + logger.error( + "Will assume that there are no HPA. " + "Be careful as this may lead to inaccurate results if object actually has HPA." + ) + return {} + + +__all__ = ["KubeAPIWorkloadLoader"] diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/__init__.py new file mode 100644 index 0000000..6ca6efd --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/__init__.py @@ -0,0 +1,19 @@ +from .base import BaseKindLoader +from .cronjobs import CronJobLoader +from .daemonsets import DaemonSetLoader +from .deploymentconfigs import DeploymentConfigLoader +from .deployments import DeploymentLoader +from .jobs import JobLoader +from .rollouts import RolloutLoader +from .statefulsets import StatefulSetLoader + +__all__ = [ + "BaseKindLoader", + "CronJobLoader", + "DeploymentLoader", + "DaemonSetLoader", + "DeploymentConfigLoader", + "JobLoader", + "RolloutLoader", + "StatefulSetLoader", +] \ No newline at end of file diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py 
b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py new file mode 100644 index 0000000..0c14e89 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py @@ -0,0 +1,127 @@ +import abc +import asyncio +import logging +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Iterable, Optional, Union + +from kubernetes import client # type: ignore +from kubernetes.client.api_client import ApiClient # type: ignore +from kubernetes.client.models import ( # type: ignore + V1Container, + V1DaemonSet, + V1Deployment, + V1Job, + V1Pod, + V1PodList, + V1StatefulSet, +) + +from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData + +logger = logging.getLogger("krr") + +AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job] +HPAKey = tuple[str, str, str] + + +class BaseKindLoader(abc.ABC): + """ + This class is used to define how to load a specific kind of Kubernetes object. + It does not load the objects itself, but is used by the `KubeAPIWorkloadLoader` to load objects. 
+ """ + + kind: KindLiteral + + def __init__(self, api_client: Optional[ApiClient], executor: ThreadPoolExecutor) -> None: + self.executor = executor + self.api_client = api_client + self.apps = client.AppsV1Api(api_client=self.api_client) + self.custom_objects = client.CustomObjectsApi(api_client=self.api_client) + self.batch = client.BatchV1Api(api_client=self.api_client) + self.core = client.CoreV1Api(api_client=self.api_client) + + @abc.abstractmethod + def all_namespaces_request(self, label_selector: str) -> Any: + pass + + async def all_namespaces_request_async(self, label_selector: str) -> Any: + """Default async implementation executes the request in a thread pool.""" + + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + self.executor, + lambda: self.all_namespaces_request( + label_selector=label_selector, + ), + ) + + @abc.abstractmethod + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + pass + + async def namespaced_request_async(self, namespace: str, label_selector: str) -> Any: + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + self.executor, + lambda: self.namespaced_request( + namespace=namespace, + label_selector=label_selector, + ), + ) + + async def extract_containers(self, item: Any) -> Iterable[V1Container]: + return item.spec.template.spec.containers + + def filter(self, item: Any) -> bool: + return True + + async def list_pods(self, object: K8sWorkload) -> list[PodData]: + loop = asyncio.get_running_loop() + + if object.selector is None: + return [] + + selector = self._build_selector_query(object.selector) + if selector is None: + return [] + + ret: V1PodList = await loop.run_in_executor( + self.executor, + lambda: self.core.list_namespaced_pod( + namespace=object._api_resource.metadata.namespace, label_selector=selector + ), + ) + + return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items] + + @classmethod + def _get_match_expression_filter(cls, 
expression: Any) -> str: + if expression.operator.lower() == "exists": + return expression.key + elif expression.operator.lower() == "doesnotexist": + return f"!{expression.key}" + + values = ",".join(expression.values) + return f"{expression.key} {expression.operator} ({values})" + + @classmethod + def _build_selector_query(cls, selector: Any) -> Union[str, None]: + label_filters = [] + + if selector.match_labels is not None: + label_filters += [f"{label[0]}={label[1]}" for label in selector.match_labels.items()] + + if selector.match_expressions is not None: + label_filters += [ + cls._get_match_expression_filter(expression) for expression in selector.match_expressions + ] + + if label_filters == []: + # NOTE: This might mean that we have DeploymentConfig, + # which uses ReplicationController and it has a dict like matchLabels + if len(selector) != 0: + label_filters += [f"{label[0]}={label[1]}" for label in selector.items()] + else: + return None + + return ",".join(label_filters) diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/cronjobs.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/cronjobs.py new file mode 100644 index 0000000..d999c65 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/cronjobs.py @@ -0,0 +1,68 @@ +import asyncio +from collections import defaultdict +import logging +from typing import Any, Iterable + +from kubernetes.client.models import V1Container, V1Job, V1PodList # type: ignore + +from robusta_krr.core.models.objects import K8sWorkload, PodData + +from .base import BaseKindLoader + +logger = logging.getLogger("krr") + + +class CronJobLoader(BaseKindLoader): + kind = "CronJob" + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + self._jobs: dict[str, list[V1Job]] = {} + self._jobs_loading_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock) + + def all_namespaces_request(self, 
label_selector: str) -> Any: + return self.batch.list_cron_job_for_all_namespaces(label_selector=label_selector, watch=False) + + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + return self.batch.list_namespaced_cron_job(namespace=namespace, label_selector=label_selector, watch=False) + + async def extract_containers(self, item: Any) -> Iterable[V1Container]: + return item.spec.job_template.spec.template.spec.containers + + async def list_pods(self, object: K8sWorkload) -> list[PodData]: + loop = asyncio.get_running_loop() + + namespace_jobs = await self._list_jobs(object.namespace) + ownered_jobs_uids = [ + job.metadata.uid + for job in namespace_jobs + if any( + owner.kind == "CronJob" and owner.uid == object._api_resource.metadata.uid + for owner in job.metadata.owner_references or [] + ) + ] + selector = f"batch.kubernetes.io/controller-uid in ({','.join(ownered_jobs_uids)})" + + ret: V1PodList = await loop.run_in_executor( + self.executor, + lambda: self.core.list_namespaced_pod( + namespace=object._api_resource.metadata.namespace, label_selector=selector + ), + ) + + return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items] + + async def _list_jobs(self, namespace: str) -> list[V1Job]: + if namespace not in self._jobs: + loop = asyncio.get_running_loop() + + async with self._jobs_loading_locks[namespace]: + logger.debug(f"Loading jobs for cronjobs in {namespace}") + ret = await loop.run_in_executor( + self.executor, + lambda: self.batch.list_namespaced_job(namespace=namespace), + ) + self._jobs[namespace] = ret.items + + return self._jobs[namespace] diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/daemonsets.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/daemonsets.py new file mode 100644 index 0000000..1005bed --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/daemonsets.py @@ -0,0 +1,13 @@ +from
typing import Any + +from .base import BaseKindLoader + + +class DaemonSetLoader(BaseKindLoader): + kind = "DaemonSet" + + def all_namespaces_request(self, label_selector: str) -> Any: + return self.apps.list_daemon_set_for_all_namespaces(label_selector=label_selector, watch=False) + + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + return self.apps.list_namespaced_daemon_set(namespace=namespace, label_selector=label_selector, watch=False) diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deploymentconfigs.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deploymentconfigs.py new file mode 100644 index 0000000..33b64d9 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deploymentconfigs.py @@ -0,0 +1,38 @@ +import logging +from typing import Any + +from robusta_krr.utils.object_like_dict import ObjectLikeDict + +from .base import BaseKindLoader + +logger = logging.getLogger("krr") + + +class DeploymentConfigLoader(BaseKindLoader): + kind = "DeploymentConfig" + + # NOTE: Using custom objects API returns dicts, but all other APIs return objects + # We need to handle this difference using a small wrapper + + def all_namespaces_request(self, label_selector: str) -> Any: + return ObjectLikeDict( + self.custom_objects.list_cluster_custom_object( + group="apps.openshift.io", + version="v1", + plural="deploymentconfigs", + label_selector=label_selector, + watch=False, + ) + ) + + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + return ObjectLikeDict( + self.custom_objects.list_namespaced_custom_object( + group="apps.openshift.io", + version="v1", + plural="deploymentconfigs", + namespace=namespace, + label_selector=label_selector, + watch=False, + ) + ) diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deployments.py 
b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deployments.py new file mode 100644 index 0000000..fc202b5 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deployments.py @@ -0,0 +1,13 @@ +from typing import Any + +from .base import BaseKindLoader + + +class DeploymentLoader(BaseKindLoader): + kind = "Deployment" + + def all_namespaces_request(self, label_selector: str) -> Any: + return self.apps.list_deployment_for_all_namespaces(label_selector=label_selector, watch=False) + + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + return self.apps.list_namespaced_deployment(namespace=namespace, label_selector=label_selector, watch=False) diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/jobs.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/jobs.py new file mode 100644 index 0000000..87ce5a7 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/jobs.py @@ -0,0 +1,18 @@ +from typing import Any + +from .base import BaseKindLoader + + +class JobLoader(BaseKindLoader): + kind = "Job" + + def all_namespaces_request(self, label_selector: str) -> Any: + return self.batch.list_job_for_all_namespaces(label_selector=label_selector, watch=False) + + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + return self.batch.list_namespaced_job(namespace=namespace, label_selector=label_selector, watch=False) + + def filter(self, item: Any) -> bool: + # NOTE: If the job has ownerReference and it is a CronJob, + # then we should skip it, as it is a part of the CronJob + return not any(owner.kind == "CronJob" for owner in item.metadata.owner_references or []) diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/rollouts.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/rollouts.py new file mode 100644 
index 0000000..0148745 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/rollouts.py @@ -0,0 +1,61 @@ +import asyncio +import logging +from typing import Any, Iterable + +from kubernetes.client.models import V1Container # type: ignore + +from robusta_krr.utils.object_like_dict import ObjectLikeDict + +from .base import BaseKindLoader + +logger = logging.getLogger("krr") + + +class RolloutLoader(BaseKindLoader): + kind = "Rollout" + + # NOTE: Using custom objects API returns dicts, but all other APIs return objects + # We need to handle this difference using a small wrapper + + def all_namespaces_request(self, label_selector: str) -> Any: + return ObjectLikeDict( + self.custom_objects.list_cluster_custom_object( + group="argoproj.io", + version="v1alpha1", + plural="rollouts", + label_selector=label_selector, + watch=False, + ) + ) + + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + return ObjectLikeDict( + self.custom_objects.list_namespaced_custom_object( + group="argoproj.io", + version="v1alpha1", + plural="rollouts", + namespace=namespace, + label_selector=label_selector, + watch=False, + ) + ) + + async def extract_containers(self, item: Any) -> Iterable[V1Container]: + if item.spec.template is not None: + return item.spec.template.spec.containers + + logger.debug( + f"Rollout has workloadRef, fetching template for {item.metadata.name} in {item.metadata.namespace}" + ) + + # Template can be None and object might have workloadRef + workloadRef = item.spec.workloadRef + if workloadRef is not None: + loop = asyncio.get_running_loop() + ret = await loop.run_in_executor( + self.executor, + lambda: self.apps.read_namespaced_deployment(namespace=item.metadata.namespace, name=workloadRef.name), + ) + return ret.spec.template.spec.containers + + return [] diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/statefulsets.py
b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/statefulsets.py new file mode 100644 index 0000000..069f2c6 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/statefulsets.py @@ -0,0 +1,13 @@ +from typing import Any + +from .base import BaseKindLoader + + +class StatefulSetLoader(BaseKindLoader): + kind = "StatefulSet" + + def all_namespaces_request(self, label_selector: str) -> Any: + return self.apps.list_stateful_set_for_all_namespaces(label_selector=label_selector, watch=False) + + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + return self.apps.list_namespaced_stateful_set(namespace=namespace, label_selector=label_selector, watch=False) diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus.py new file mode 100644 index 0000000..d2a9883 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus.py @@ -0,0 +1,10 @@ +import logging +from .base import BaseWorkloadLoader + + +logger = logging.getLogger("krr") + + +class PrometheusWorkloadLoader(BaseWorkloadLoader): + # TODO: Implement PrometheusWorkloadLoader + pass diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py index db49392..c449bca 100644 --- a/robusta_krr/core/integrations/prometheus/loader.py +++ b/robusta_krr/core/integrations/prometheus/loader.py @@ -11,7 +11,7 @@ from kubernetes.client.exceptions import ApiException from prometrix import MetricsNotFound, PrometheusNotFound from robusta_krr.core.models.config import settings -from robusta_krr.core.models.objects import K8sObjectData, PodData +from robusta_krr.core.models.objects import K8sWorkload, PodData from .metrics_service.prometheus_metrics_service import PrometheusMetricsService from .metrics_service.thanos_metrics_service import ThanosMetricsService @@ -82,7 
+82,7 @@ class PrometheusMetricsLoader: ) -> Optional[tuple[datetime.datetime, datetime.datetime]]: return await self.loader.get_history_range(history_duration) - async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> list[PodData]: + async def load_pods(self, object: K8sWorkload, period: datetime.timedelta) -> list[PodData]: try: return await self.loader.load_pods(object, period) except Exception as e: @@ -91,7 +91,7 @@ class PrometheusMetricsLoader: async def gather_data( self, - object: K8sObjectData, + object: K8sWorkload, strategy: BaseStrategy, period: datetime.timedelta, *, diff --git a/robusta_krr/core/integrations/prometheus/metrics/base.py b/robusta_krr/core/integrations/prometheus/metrics/base.py index f4265e2..d55af00 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/base.py +++ b/robusta_krr/core/integrations/prometheus/metrics/base.py @@ -15,7 +15,7 @@ from prometrix import CustomPrometheusConnect from robusta_krr.core.abstract.metrics import BaseMetric from robusta_krr.core.abstract.strategies import PodsTimeData from robusta_krr.core.models.config import settings -from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.core.models.objects import K8sWorkload class PrometheusSeries(TypedDict): @@ -86,7 +86,7 @@ class PrometheusMetric(BaseMetric): return f', {settings.prometheus_label}="{settings.prometheus_cluster_label}"' @abc.abstractmethod - def get_query(self, object: K8sObjectData, duration: str, step: str) -> str: + def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: """ This method should be implemented by all subclasses to provide a query string to fetch metrics. 
@@ -148,7 +148,7 @@ class PrometheusMetric(BaseMetric): return await loop.run_in_executor(self.executor, lambda: self._query_prometheus_sync(data)) async def load_data( - self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta + self, object: K8sWorkload, period: datetime.timedelta, step: datetime.timedelta ) -> PodsTimeData: """ Asynchronous method that loads metric data for a specific object. diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu.py b/robusta_krr/core/integrations/prometheus/metrics/cpu.py index c7a2c73..7257654 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/cpu.py +++ b/robusta_krr/core/integrations/prometheus/metrics/cpu.py @@ -1,4 +1,4 @@ -from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.core.models.objects import K8sWorkload from .base import PrometheusMetric, QueryType @@ -10,7 +10,7 @@ class CPULoader(PrometheusMetric): query_type: QueryType = QueryType.QueryRange - def get_query(self, object: K8sObjectData, duration: str, step: str) -> str: + def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) cluster_label = self.get_prometheus_cluster_label() return f""" @@ -36,7 +36,7 @@ def PercentileCPULoader(percentile: float) -> type[PrometheusMetric]: raise ValueError("percentile must be between 0 and 100") class PercentileCPULoader(PrometheusMetric): - def get_query(self, object: K8sObjectData, duration: str, step: str) -> str: + def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) cluster_label = self.get_prometheus_cluster_label() return f""" @@ -64,7 +64,7 @@ class CPUAmountLoader(PrometheusMetric): A metric loader for loading CPU points count. 
""" - def get_query(self, object: K8sObjectData, duration: str, step: str) -> str: + def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) cluster_label = self.get_prometheus_cluster_label() return f""" diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory.py b/robusta_krr/core/integrations/prometheus/metrics/memory.py index 85031bc..ca324f6 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/memory.py +++ b/robusta_krr/core/integrations/prometheus/metrics/memory.py @@ -1,4 +1,4 @@ -from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.core.models.objects import K8sWorkload from .base import PrometheusMetric, QueryType @@ -10,7 +10,7 @@ class MemoryLoader(PrometheusMetric): query_type: QueryType = QueryType.QueryRange - def get_query(self, object: K8sObjectData, duration: str, step: str) -> str: + def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) cluster_label = self.get_prometheus_cluster_label() return f""" @@ -30,7 +30,7 @@ class MaxMemoryLoader(PrometheusMetric): A metric loader for loading max memory usage metrics. """ - def get_query(self, object: K8sObjectData, duration: str, step: str) -> str: + def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) cluster_label = self.get_prometheus_cluster_label() return f""" @@ -53,7 +53,7 @@ class MemoryAmountLoader(PrometheusMetric): A metric loader for loading memory points count. 
""" - def get_query(self, object: K8sObjectData, duration: str, step: str) -> str: + def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) cluster_label = self.get_prometheus_cluster_label() return f""" diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py index a3b0ee0..ae34a7a 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py @@ -7,7 +7,7 @@ from kubernetes.client.api_client import ApiClient from robusta_krr.core.abstract.strategies import PodsTimeData from robusta_krr.core.models.config import settings -from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.core.models.objects import K8sWorkload from ..metrics import PrometheusMetric @@ -39,7 +39,7 @@ class MetricsService(abc.ABC): @abc.abstractmethod async def gather_data( self, - object: K8sObjectData, + object: K8sWorkload, LoaderClass: type[PrometheusMetric], period: datetime.timedelta, step: datetime.timedelta = datetime.timedelta(minutes=30), diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py index 423a29a..5347422 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py @@ -11,7 +11,7 @@ from prometrix import PrometheusNotFound, get_custom_prometheus_connect from robusta_krr.core.abstract.strategies import PodsTimeData from robusta_krr.core.integrations import openshift from robusta_krr.core.models.config import settings -from robusta_krr.core.models.objects import K8sObjectData, PodData +from 
robusta_krr.core.models.objects import K8sWorkload, PodData from robusta_krr.utils.batched import batched from robusta_krr.utils.service_discovery import MetricsServiceDiscovery @@ -177,7 +177,7 @@ class PrometheusMetricsService(MetricsService): async def gather_data( self, - object: K8sObjectData, + object: K8sWorkload, LoaderClass: type[PrometheusMetric], period: timedelta, step: timedelta = timedelta(minutes=30), @@ -202,7 +202,7 @@ class PrometheusMetricsService(MetricsService): return data - async def load_pods(self, object: K8sObjectData, period: timedelta) -> list[PodData]: + async def load_pods(self, object: K8sWorkload, period: timedelta) -> list[PodData]: """ List pods related to the object and add them to the object's pods list. Args: diff --git a/robusta_krr/core/models/config.py b/robusta_krr/core/models/config.py index ff6142a..6e03967 100644 --- a/robusta_krr/core/models/config.py +++ b/robusta_krr/core/models/config.py @@ -23,6 +23,7 @@ class Config(pd.BaseSettings): clusters: Union[list[str], Literal["*"], None] = None kubeconfig: Optional[str] = None + workload_loader: Literal["kubeapi", "prometheus"] = pd.Field("kubeapi") impersonate_user: Optional[str] = None impersonate_group: Optional[str] = None namespaces: Union[list[str], Literal["*"]] = pd.Field("*") diff --git a/robusta_krr/core/models/objects.py b/robusta_krr/core/models/objects.py index e4b400d..5dac123 100644 --- a/robusta_krr/core/models/objects.py +++ b/robusta_krr/core/models/objects.py @@ -35,7 +35,7 @@ PodWarning = Literal[ ] -class K8sObjectData(pd.BaseModel): +class K8sWorkload(pd.BaseModel): # NOTE: Here None means that we are running inside the cluster cluster: Optional[str] name: str @@ -80,7 +80,7 @@ class K8sObjectData(pd.BaseModel): else: return self._api_resource.spec.selector - def split_into_batches(self, n: int) -> list[K8sObjectData]: + def split_into_batches(self, n: int) -> list[K8sWorkload]: """ Batch this object into n objects, splitting the pods into batches of 
size n. """ @@ -89,7 +89,7 @@ class K8sObjectData(pd.BaseModel): return [self] return [ - K8sObjectData( + K8sWorkload( cluster=self.cluster, name=self.name, container=self.container, diff --git a/robusta_krr/core/models/result.py b/robusta_krr/core/models/result.py index 2d5ffbc..1735125 100644 --- a/robusta_krr/core/models/result.py +++ b/robusta_krr/core/models/result.py @@ -6,7 +6,7 @@ import pydantic as pd from robusta_krr.core.abstract import formatters from robusta_krr.core.models.allocations import RecommendationValue, ResourceAllocations, ResourceType -from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.core.models.objects import K8sWorkload from robusta_krr.core.models.severity import Severity @@ -22,12 +22,12 @@ class ResourceRecommendation(pd.BaseModel): class ResourceScan(pd.BaseModel): - object: K8sObjectData + object: K8sWorkload recommended: ResourceRecommendation severity: Severity @classmethod - def calculate(cls, object: K8sObjectData, recommendation: ResourceAllocations) -> ResourceScan: + def calculate(cls, object: K8sWorkload, recommendation: ResourceAllocations) -> ResourceScan: recommendation_processed = ResourceRecommendation(requests={}, limits={}, info={}) for resource_type in ResourceType: diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index 8e08521..de3b3c7 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -12,10 +12,10 @@ from rich.console import Console from slack_sdk import WebClient from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult -from robusta_krr.core.integrations.kubernetes import KubernetesLoader +from robusta_krr.core.integrations.kubernetes import ClusterWorkloadsLoader from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException, PrometheusMetricsLoader from robusta_krr.core.models.config import settings -from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.core.models.objects 
import K8sWorkload from robusta_krr.core.models.result import ResourceAllocations, ResourceScan, ResourceType, Result, StrategyData from robusta_krr.utils.intro import load_intro_message from robusta_krr.utils.progress_bar import ProgressBar @@ -40,7 +40,7 @@ class Runner: EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound) def __init__(self) -> None: - self._k8s_loader = KubernetesLoader() + self._k8s_loader = ClusterWorkloadsLoader() self._metrics_service_loaders: dict[Optional[str], Union[PrometheusMetricsLoader, Exception]] = {} self._metrics_service_loaders_error_logged: set[Exception] = set() self._strategy = settings.create_strategy() @@ -165,7 +165,7 @@ class Runner: for resource, recommendation in result.items() } - async def _calculate_object_recommendations(self, object: K8sObjectData) -> Optional[RunResult]: + async def _calculate_object_recommendations(self, object: K8sWorkload) -> Optional[RunResult]: prometheus_loader = self._get_prometheus_loader(object.cluster) if prometheus_loader is None: @@ -239,7 +239,7 @@ class Runner: } ) - async def _gather_object_allocations(self, k8s_object: K8sObjectData) -> Optional[ResourceScan]: + async def _gather_object_allocations(self, k8s_object: K8sWorkload) -> Optional[ResourceScan]: recommendation = await self._calculate_object_recommendations(k8s_object) self.__progressbar.progress() @@ -275,7 +275,7 @@ class Runner: await asyncio.gather(*[self._check_data_availability(cluster) for cluster in clusters]) with ProgressBar(title="Calculating Recommendation") as self.__progressbar: - workloads = await self._k8s_loader.list_scannable_objects(clusters) + workloads = await self._k8s_loader.list_workloads(clusters) scans = await asyncio.gather(*[self._gather_object_allocations(k8s_object) for k8s_object in workloads]) successful_scans = [scan for scan in scans if scan is not None] diff --git a/robusta_krr/strategies/simple.py b/robusta_krr/strategies/simple.py index 64e58ac..db15f6c 100644 --- 
a/robusta_krr/strategies/simple.py +++ b/robusta_krr/strategies/simple.py @@ -5,7 +5,7 @@ import pydantic as pd from robusta_krr.core.abstract.strategies import ( BaseStrategy, - K8sObjectData, + K8sWorkload, MetricsPodData, PodsTimeData, ResourceRecommendation, @@ -78,7 +78,7 @@ class SimpleStrategy(BaseStrategy[SimpleStrategySettings]): return [PercentileCPULoader(self.settings.cpu_percentile), MaxMemoryLoader, CPUAmountLoader, MemoryAmountLoader] def __calculate_cpu_proposal( - self, history_data: MetricsPodData, object_data: K8sObjectData + self, history_data: MetricsPodData, object_data: K8sWorkload ) -> ResourceRecommendation: data = history_data["PercentileCPULoader"] @@ -98,7 +98,7 @@ class SimpleStrategy(BaseStrategy[SimpleStrategySettings]): return ResourceRecommendation(request=cpu_usage, limit=None) def __calculate_memory_proposal( - self, history_data: MetricsPodData, object_data: K8sObjectData + self, history_data: MetricsPodData, object_data: K8sWorkload ) -> ResourceRecommendation: data = history_data["MaxMemoryLoader"] @@ -117,7 +117,7 @@ class SimpleStrategy(BaseStrategy[SimpleStrategySettings]): memory_usage = self.settings.calculate_memory_proposal(data) return ResourceRecommendation(request=memory_usage, limit=memory_usage) - def run(self, history_data: MetricsPodData, object_data: K8sObjectData) -> RunResult: + def run(self, history_data: MetricsPodData, object_data: K8sWorkload) -> RunResult: return { ResourceType.CPU: self.__calculate_cpu_proposal(history_data, object_data), ResourceType.Memory: self.__calculate_memory_proposal(history_data, object_data), diff --git a/tests/conftest.py b/tests/conftest.py index b1d8d22..2bc9807 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,10 +5,10 @@ from unittest.mock import AsyncMock, patch import numpy as np import pytest -from robusta_krr.api.models import K8sObjectData, PodData, ResourceAllocations +from robusta_krr.api.models import K8sWorkload, PodData, ResourceAllocations from 
robusta_krr.strategies.simple import SimpleStrategy, SimpleStrategySettings -TEST_OBJECT = K8sObjectData( +TEST_OBJECT = K8sWorkload( cluster="mock-cluster", name="mock-object-1", container="mock-container-1", -- cgit v1.2.3