Diffstat (limited to 'robusta_krr')
-rw-r--r--  robusta_krr/api/models.py | 4
-rw-r--r--  robusta_krr/core/abstract/metrics.py | 4
-rw-r--r--  robusta_krr/core/abstract/strategies.py | 6
-rw-r--r--  robusta_krr/core/integrations/kubernetes/__init__.py | 463
-rw-r--r--  robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py | 5
-rw-r--r--  robusta_krr/core/integrations/kubernetes/workload_loader/base.py | 16
-rw-r--r--  robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py | 273
-rw-r--r--  robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/__init__.py | 19
-rw-r--r--  robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py | 127
-rw-r--r--  robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/cronjobs.py | 68
-rw-r--r--  robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/daemonsets.py | 13
-rw-r--r--  robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deploymentconfigs.py | 38
-rw-r--r--  robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deployments.py | 13
-rw-r--r--  robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/jobs.py | 18
-rw-r--r--  robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/rollouts.py | 61
-rw-r--r--  robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/statefulsets.py | 13
-rw-r--r--  robusta_krr/core/integrations/kubernetes/workload_loader/prometheus.py | 10
-rw-r--r--  robusta_krr/core/integrations/prometheus/loader.py | 6
-rw-r--r--  robusta_krr/core/integrations/prometheus/metrics/base.py | 6
-rw-r--r--  robusta_krr/core/integrations/prometheus/metrics/cpu.py | 8
-rw-r--r--  robusta_krr/core/integrations/prometheus/metrics/memory.py | 8
-rw-r--r--  robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py | 4
-rw-r--r--  robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py | 6
-rw-r--r--  robusta_krr/core/models/config.py | 1
-rw-r--r--  robusta_krr/core/models/objects.py | 6
-rw-r--r--  robusta_krr/core/models/result.py | 6
-rw-r--r--  robusta_krr/core/runner.py | 12
-rw-r--r--  robusta_krr/strategies/simple.py | 8
28 files changed, 735 insertions, 487 deletions
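
Taken as a whole, this change renames K8sObjectData to K8sWorkload across the codebase and splits the monolithic ClusterLoader into a pluggable workload_loader package, selected via the new workload_loader setting. A minimal sketch of what the rename means for code importing the public API (the describe() helper below is purely illustrative, not part of the diff):

from robusta_krr.api.models import K8sWorkload  # previously exported as K8sObjectData


def describe(workload: K8sWorkload) -> str:
    # The model's fields (cluster, namespace, name, kind, container, ...) are unchanged;
    # only the class name differs.
    return f"{workload.kind} {workload.namespace}/{workload.name} ({workload.container})"
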
diff --git a/robusta_krr/api/models.py b/robusta_krr/api/models.py
index 3a453ce..e61f01e 100644
--- a/robusta_krr/api/models.py
+++ b/robusta_krr/api/models.py
@@ -1,6 +1,6 @@
from robusta_krr.core.abstract.strategies import MetricsPodData, PodsTimeData, ResourceRecommendation, RunResult
from robusta_krr.core.models.allocations import RecommendationValue, ResourceAllocations, ResourceType
-from robusta_krr.core.models.objects import K8sObjectData, PodData
+from robusta_krr.core.models.objects import K8sWorkload, PodData
from robusta_krr.core.models.result import ResourceScan, Result
from robusta_krr.core.models.severity import Severity, register_severity_calculator
@@ -8,7 +8,7 @@ __all__ = [
"ResourceType",
"ResourceAllocations",
"RecommendationValue",
- "K8sObjectData",
+ "K8sWorkload",
"PodData",
"Result",
"Severity",
diff --git a/robusta_krr/core/abstract/metrics.py b/robusta_krr/core/abstract/metrics.py
index 3b6f19c..e1151ea 100644
--- a/robusta_krr/core/abstract/metrics.py
+++ b/robusta_krr/core/abstract/metrics.py
@@ -2,7 +2,7 @@ import datetime
from abc import ABC, abstractmethod
from robusta_krr.core.abstract.strategies import PodsTimeData
-from robusta_krr.core.models.objects import K8sObjectData
+from robusta_krr.core.models.objects import K8sWorkload
class BaseMetric(ABC):
@@ -16,6 +16,6 @@ class BaseMetric(ABC):
@abstractmethod
async def load_data(
- self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta
+ self, object: K8sWorkload, period: datetime.timedelta, step: datetime.timedelta
) -> PodsTimeData:
...
diff --git a/robusta_krr/core/abstract/strategies.py b/robusta_krr/core/abstract/strategies.py
index 5b6521e..ab5ab6c 100644
--- a/robusta_krr/core/abstract/strategies.py
+++ b/robusta_krr/core/abstract/strategies.py
@@ -9,7 +9,7 @@ import numpy as np
import pydantic as pd
from numpy.typing import NDArray
-from robusta_krr.core.models.result import K8sObjectData, ResourceType
+from robusta_krr.core.models.result import K8sWorkload, ResourceType
if TYPE_CHECKING:
from robusta_krr.core.abstract.metrics import BaseMetric # noqa: F401
@@ -133,7 +133,7 @@ class BaseStrategy(abc.ABC, Generic[_StrategySettings]):
# Abstract method that needs to be implemented by subclass.
# This method is intended to calculate resource recommendation based on history data and kubernetes object data.
@abc.abstractmethod
- def run(self, history_data: MetricsPodData, object_data: K8sObjectData) -> RunResult:
+ def run(self, history_data: MetricsPodData, object_data: K8sWorkload) -> RunResult:
pass
# This method is intended to return a strategy by its name.
@@ -167,6 +167,6 @@ __all__ = [
"StrategySettings",
"PodsTimeData",
"MetricsPodData",
- "K8sObjectData",
+ "K8sWorkload",
"ResourceType",
]
diff --git a/robusta_krr/core/integrations/kubernetes/__init__.py b/robusta_krr/core/integrations/kubernetes/__init__.py
index a772a5c..051cc23 100644
--- a/robusta_krr/core/integrations/kubernetes/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/__init__.py
@@ -18,11 +18,12 @@ from kubernetes.client.models import (
)
from robusta_krr.core.models.config import settings
-from robusta_krr.core.models.objects import HPAData, K8sObjectData, KindLiteral, PodData
+from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData
from robusta_krr.core.models.result import ResourceAllocations
from robusta_krr.utils.object_like_dict import ObjectLikeDict
from . import config_patch as _
+from .workload_loader import BaseWorkloadLoader, KubeAPIWorkloadLoader, PrometheusWorkloadLoader
logger = logging.getLogger("krr")
@@ -30,441 +31,7 @@ AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod,
HPAKey = tuple[str, str, str]
-class ClusterLoader:
- def __init__(self, cluster: Optional[str]=None):
- self.cluster = cluster
- # This executor will be running requests to Kubernetes API
- self.executor = ThreadPoolExecutor(settings.max_workers)
- self.api_client = settings.get_kube_client(cluster)
- self.apps = client.AppsV1Api(api_client=self.api_client)
- self.custom_objects = client.CustomObjectsApi(api_client=self.api_client)
- self.batch = client.BatchV1Api(api_client=self.api_client)
- self.core = client.CoreV1Api(api_client=self.api_client)
- self.autoscaling_v1 = client.AutoscalingV1Api(api_client=self.api_client)
- self.autoscaling_v2 = client.AutoscalingV2Api(api_client=self.api_client)
-
- self.__kind_available: defaultdict[KindLiteral, bool] = defaultdict(lambda: True)
-
- self.__jobs_for_cronjobs: dict[str, list[V1Job]] = {}
- self.__jobs_loading_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
-
- async def list_scannable_objects(self) -> list[K8sObjectData]:
- """List all scannable objects.
-
- Returns:
- A list of scannable objects.
- """
-
- logger.info(f"Listing scannable objects in {self.cluster}")
- logger.debug(f"Namespaces: {settings.namespaces}")
- logger.debug(f"Resources: {settings.resources}")
-
- self.__hpa_list = await self._try_list_hpa()
- workload_object_lists = await asyncio.gather(
- self._list_deployments(),
- self._list_rollouts(),
- self._list_deploymentconfig(),
- self._list_all_statefulsets(),
- self._list_all_daemon_set(),
- self._list_all_jobs(),
- self._list_all_cronjobs(),
- )
-
- return [
- object
- for workload_objects in workload_object_lists
- for object in workload_objects
- # NOTE: By default we will filter out kube-system namespace
- if not (settings.namespaces == "*" and object.namespace == "kube-system")
- ]
-
- async def _list_jobs_for_cronjobs(self, namespace: str) -> list[V1Job]:
- if namespace not in self.__jobs_for_cronjobs:
- loop = asyncio.get_running_loop()
-
- async with self.__jobs_loading_locks[namespace]:
- logging.debug(f"Loading jobs for cronjobs in {namespace}")
- ret = await loop.run_in_executor(
- self.executor,
- lambda: self.batch.list_namespaced_job(namespace=namespace),
- )
- self.__jobs_for_cronjobs[namespace] = ret.items
-
- return self.__jobs_for_cronjobs[namespace]
-
- async def list_pods(self, object: K8sObjectData) -> list[PodData]:
- loop = asyncio.get_running_loop()
-
- if object.kind == "CronJob":
- namespace_jobs = await self._list_jobs_for_cronjobs(object.namespace)
- ownered_jobs_uids = [
- job.metadata.uid
- for job in namespace_jobs
- if any(
- owner.kind == "CronJob" and owner.uid == object._api_resource.metadata.uid
- for owner in job.metadata.owner_references or []
- )
- ]
- selector = f"batch.kubernetes.io/controller-uid in ({','.join(ownered_jobs_uids)})"
-
- else:
- if object.selector is None:
- return []
-
- selector = self._build_selector_query(object.selector)
- if selector is None:
- return []
-
- ret: V1PodList = await loop.run_in_executor(
- self.executor,
- lambda: self.core.list_namespaced_pod(
- namespace=object._api_resource.metadata.namespace, label_selector=selector
- ),
- )
-
- return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items]
-
- @staticmethod
- def _get_match_expression_filter(expression) -> str:
- if expression.operator.lower() == "exists":
- return expression.key
- elif expression.operator.lower() == "doesnotexist":
- return f"!{expression.key}"
-
- values = ",".join(expression.values)
- return f"{expression.key} {expression.operator} ({values})"
-
- @staticmethod
- def _build_selector_query(selector: Any) -> Union[str, None]:
- label_filters = []
-
- if selector.match_labels is not None:
- label_filters += [f"{label[0]}={label[1]}" for label in selector.match_labels.items()]
-
- if selector.match_expressions is not None:
- label_filters += [
- ClusterLoader._get_match_expression_filter(expression) for expression in selector.match_expressions
- ]
-
- if label_filters == []:
- # NOTE: This might mean that we have DeploymentConfig,
- # which uses ReplicationController and it has a dict like matchLabels
- if len(selector) != 0:
- label_filters += [f"{label[0]}={label[1]}" for label in selector.items()]
- else:
- return None
-
- return ",".join(label_filters)
-
- def __build_scannable_object(
- self, item: AnyKubernetesAPIObject, container: V1Container, kind: Optional[str] = None
- ) -> K8sObjectData:
- name = item.metadata.name
- namespace = item.metadata.namespace
- kind = kind or item.__class__.__name__[2:]
-
- obj = K8sObjectData(
- cluster=self.cluster,
- namespace=namespace,
- name=name,
- kind=kind,
- container=container.name,
- allocations=ResourceAllocations.from_container(container),
- hpa=self.__hpa_list.get((namespace, kind, name)),
- )
- obj._api_resource = item
- return obj
-
- def _should_list_resource(self, resource: str) -> bool:
- if settings.resources == "*":
- return True
- return resource in settings.resources
-
- async def _list_namespaced_or_global_objects(
- self,
- kind: KindLiteral,
- all_namespaces_request: Callable,
- namespaced_request: Callable
- ) -> list[Any]:
- logger.debug(f"Listing {kind}s in {self.cluster}")
- loop = asyncio.get_running_loop()
-
- if settings.namespaces == "*":
- requests = [
- loop.run_in_executor(
- self.executor,
- lambda: all_namespaces_request(
- watch=False,
- label_selector=settings.selector,
- ),
- )
- ]
- else:
- requests = [
- loop.run_in_executor(
- self.executor,
- lambda ns=namespace: namespaced_request(
- namespace=ns,
- watch=False,
- label_selector=settings.selector,
- ),
- )
- for namespace in settings.namespaces
- ]
-
- result = [
- item
- for request_result in await asyncio.gather(*requests)
- for item in request_result.items
- ]
-
- logger.debug(f"Found {len(result)} {kind} in {self.cluster}")
- return result
-
- async def _list_scannable_objects(
- self,
- kind: KindLiteral,
- all_namespaces_request: Callable,
- namespaced_request: Callable,
- extract_containers: Callable[[Any], Union[Iterable[V1Container], Awaitable[Iterable[V1Container]]]],
- filter_workflows: Optional[Callable[[Any], bool]] = None,
- ) -> list[K8sObjectData]:
- if not self._should_list_resource(kind):
- logger.debug(f"Skipping {kind}s in {self.cluster}")
- return
-
- if not self.__kind_available[kind]:
- return
-
- result = []
- try:
- for item in await self._list_namespaced_or_global_objects(kind, all_namespaces_request, namespaced_request):
- if filter_workflows is not None and not filter_workflows(item):
- continue
-
- containers = extract_containers(item)
- if asyncio.iscoroutine(containers):
- containers = await containers
-
- result.extend(self.__build_scannable_object(item, container, kind) for container in containers)
- except ApiException as e:
- if kind in ("Rollout", "DeploymentConfig") and e.status in [400, 401, 403, 404]:
- if self.__kind_available[kind]:
- logger.debug(f"{kind} API not available in {self.cluster}")
- self.__kind_available[kind] = False
- else:
- logger.exception(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}")
- logger.error("Will skip this object type and continue.")
-
- return result
-
- def _list_deployments(self) -> list[K8sObjectData]:
- return self._list_scannable_objects(
- kind="Deployment",
- all_namespaces_request=self.apps.list_deployment_for_all_namespaces,
- namespaced_request=self.apps.list_namespaced_deployment,
- extract_containers=lambda item: item.spec.template.spec.containers,
- )
-
- def _list_rollouts(self) -> list[K8sObjectData]:
- async def _extract_containers(item: Any) -> list[V1Container]:
- if item.spec.template is not None:
- return item.spec.template.spec.containers
-
- loop = asyncio.get_running_loop()
-
- logging.debug(
- f"Rollout has workloadRef, fetching template for {item.metadata.name} in {item.metadata.namespace}"
- )
-
- # Template can be None and object might have workloadRef
- workloadRef = item.spec.workloadRef
- if workloadRef is not None:
- ret = await loop.run_in_executor(
- self.executor,
- lambda: self.apps.read_namespaced_deployment(
- namespace=item.metadata.namespace, name=workloadRef.name
- ),
- )
- return ret.spec.template.spec.containers
-
- return []
-
- # NOTE: Using custom objects API returns dicts, but all other APIs return objects
- # We need to handle this difference using a small wrapper
- return self._list_scannable_objects(
- kind="Rollout",
- all_namespaces_request=lambda **kwargs: ObjectLikeDict(
- self.custom_objects.list_cluster_custom_object(
- group="argoproj.io",
- version="v1alpha1",
- plural="rollouts",
- **kwargs,
- )
- ),
- namespaced_request=lambda **kwargs: ObjectLikeDict(
- self.custom_objects.list_namespaced_custom_object(
- group="argoproj.io",
- version="v1alpha1",
- plural="rollouts",
- **kwargs,
- )
- ),
- extract_containers=_extract_containers,
- )
-
- def _list_deploymentconfig(self) -> list[K8sObjectData]:
- # NOTE: Using custom objects API returns dicts, but all other APIs return objects
- # We need to handle this difference using a small wrapper
- return self._list_scannable_objects(
- kind="DeploymentConfig",
- all_namespaces_request=lambda **kwargs: ObjectLikeDict(
- self.custom_objects.list_cluster_custom_object(
- group="apps.openshift.io",
- version="v1",
- plural="deploymentconfigs",
- **kwargs,
- )
- ),
- namespaced_request=lambda **kwargs: ObjectLikeDict(
- self.custom_objects.list_namespaced_custom_object(
- group="apps.openshift.io",
- version="v1",
- plural="deploymentconfigs",
- **kwargs,
- )
- ),
- extract_containers=lambda item: item.spec.template.spec.containers,
- )
-
- def _list_all_statefulsets(self) -> list[K8sObjectData]:
- return self._list_scannable_objects(
- kind="StatefulSet",
- all_namespaces_request=self.apps.list_stateful_set_for_all_namespaces,
- namespaced_request=self.apps.list_namespaced_stateful_set,
- extract_containers=lambda item: item.spec.template.spec.containers,
- )
-
- def _list_all_daemon_set(self) -> list[K8sObjectData]:
- return self._list_scannable_objects(
- kind="DaemonSet",
- all_namespaces_request=self.apps.list_daemon_set_for_all_namespaces,
- namespaced_request=self.apps.list_namespaced_daemon_set,
- extract_containers=lambda item: item.spec.template.spec.containers,
- )
-
- def _list_all_jobs(self) -> list[K8sObjectData]:
- return self._list_scannable_objects(
- kind="Job",
- all_namespaces_request=self.batch.list_job_for_all_namespaces,
- namespaced_request=self.batch.list_namespaced_job,
- extract_containers=lambda item: item.spec.template.spec.containers,
- # NOTE: If the job has ownerReference and it is a CronJob, then we should skip it
- filter_workflows=lambda item: not any(
- owner.kind == "CronJob" for owner in item.metadata.owner_references or []
- ),
- )
-
- def _list_all_cronjobs(self) -> list[K8sObjectData]:
- return self._list_scannable_objects(
- kind="CronJob",
- all_namespaces_request=self.batch.list_cron_job_for_all_namespaces,
- namespaced_request=self.batch.list_namespaced_cron_job,
- extract_containers=lambda item: item.spec.job_template.spec.template.spec.containers,
- )
-
- async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]:
- loop = asyncio.get_running_loop()
- res = await loop.run_in_executor(
- self.executor,
- lambda: self._list_namespaced_or_global_objects(
- kind="HPA-v1",
- all_namespaces_request=self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces,
- namespaced_request=self.autoscaling_v1.list_namespaced_horizontal_pod_autoscaler,
- ),
- )
- return {
- (
- hpa.metadata.namespace,
- hpa.spec.scale_target_ref.kind,
- hpa.spec.scale_target_ref.name,
- ): HPAData(
- min_replicas=hpa.spec.min_replicas,
- max_replicas=hpa.spec.max_replicas,
- current_replicas=hpa.status.current_replicas,
- desired_replicas=hpa.status.desired_replicas,
- target_cpu_utilization_percentage=hpa.spec.target_cpu_utilization_percentage,
- target_memory_utilization_percentage=None,
- )
- async for hpa in res
- }
-
- async def __list_hpa_v2(self) -> dict[HPAKey, HPAData]:
- res = await self._list_namespaced_or_global_objects(
- kind="HPA-v2",
- all_namespaces_request=self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces,
- namespaced_request=self.autoscaling_v2.list_namespaced_horizontal_pod_autoscaler,
- )
- def __get_metric(hpa: V2HorizontalPodAutoscaler, metric_name: str) -> Optional[float]:
- return next(
- (
- metric.resource.target.average_utilization
- for metric in hpa.spec.metrics
- if metric.type == "Resource" and metric.resource.name == metric_name
- ),
- None,
- )
- return {
- (
- hpa.metadata.namespace,
- hpa.spec.scale_target_ref.kind,
- hpa.spec.scale_target_ref.name,
- ): HPAData(
- min_replicas=hpa.spec.min_replicas,
- max_replicas=hpa.spec.max_replicas,
- current_replicas=hpa.status.current_replicas,
- desired_replicas=hpa.status.desired_replicas,
- target_cpu_utilization_percentage=__get_metric(hpa, "cpu"),
- target_memory_utilization_percentage=__get_metric(hpa, "memory"),
- )
- for hpa in res
- }
-
- # TODO: What should we do in case of other metrics bound to the HPA?
- async def __list_hpa(self) -> dict[HPAKey, HPAData]:
- """List all HPA objects in the cluster.
-
- Returns:
- dict[tuple[str, str], HPAData]: A dictionary of HPA objects, indexed by scaleTargetRef (kind, name).
- """
-
- try:
- # Try to use V2 API first
- return await self.__list_hpa_v2()
- except ApiException as e:
- if e.status != 404:
- # If the error is other than not found, then re-raise it.
- raise
-
- # If V2 API does not exist, fall back to V1
- return await self.__list_hpa_v1()
-
- async def _try_list_hpa(self) -> dict[HPAKey, HPAData]:
- try:
- return await self.__list_hpa()
- except Exception as e:
- logger.exception(f"Error trying to list hpa in cluster {self.cluster}: {e}")
- logger.error(
- "Will assume that there are no HPA. "
- "Be careful as this may lead to inaccurate results if object actually has HPA."
- )
- return {}
-
-
-class KubernetesLoader:
- def __init__(self) -> None:
- self._cluster_loaders: dict[Optional[str], ClusterLoader] = {}
-
+class ClusterWorkloadsLoader:
async def list_clusters(self) -> Optional[list[str]]:
"""List all clusters.
@@ -505,36 +72,42 @@ class KubernetesLoader:
return [context["name"] for context in contexts if context["name"] in settings.clusters]
- def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[ClusterLoader]:
+ def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[BaseWorkloadLoader]:
+ WorkloadLoader = KubeAPIWorkloadLoader if settings.workload_loader == "kubeapi" else PrometheusWorkloadLoader
+
try:
- return ClusterLoader(cluster=cluster)
+ return WorkloadLoader(cluster=cluster)
except Exception as e:
logger.error(f"Could not load cluster {cluster} and will skip it: {e}")
return None
- async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8sObjectData]:
+ async def list_workloads(self, clusters: Optional[list[str]]) -> list[K8sWorkload]:
"""List all scannable objects.
- Yields:
- Each scannable object as it is loaded.
+ Returns:
+ A list of all loaded objects.
"""
+
if clusters is None:
_cluster_loaders = [self._try_create_cluster_loader(None)]
else:
_cluster_loaders = [self._try_create_cluster_loader(cluster) for cluster in clusters]
- self.cluster_loaders = {cl.cluster: cl for cl in _cluster_loaders if cl is not None}
+ self.cluster_loaders: dict[Optional[str], BaseWorkloadLoader] = {
+ cl.cluster: cl for cl in _cluster_loaders if cl is not None
+ }
+
if self.cluster_loaders == {}:
logger.error("Could not load any cluster.")
return
-
+
return [
object
for cluster_loader in self.cluster_loaders.values()
- for object in await cluster_loader.list_scannable_objects()
+ for object in await cluster_loader.list_workloads()
]
- async def load_pods(self, object: K8sObjectData) -> list[PodData]:
+ async def load_pods(self, object: K8sWorkload) -> list[PodData]:
try:
cluster_loader = self.cluster_loaders[object.cluster]
except KeyError:
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py
new file mode 100644
index 0000000..9543147
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py
@@ -0,0 +1,5 @@
+from .base import BaseWorkloadLoader
+from .kube_api import KubeAPIWorkloadLoader
+from .prometheus import PrometheusWorkloadLoader
+
+__all__ = ["BaseWorkloadLoader", "KubeAPIWorkloadLoader", "PrometheusWorkloadLoader"]
\ No newline at end of file
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/base.py
new file mode 100644
index 0000000..a7bba21
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/base.py
@@ -0,0 +1,16 @@
+import abc
+from typing import Optional
+from robusta_krr.core.models.objects import K8sWorkload, PodData
+
+
+class BaseWorkloadLoader(abc.ABC):
+ def __init__(self, cluster: Optional[str] = None) -> None:
+ self.cluster = cluster
+
+ @abc.abstractmethod
+ async def list_workloads(self, clusters: Optional[list[str]]) -> list[K8sWorkload]:
+ pass
+
+ @abc.abstractmethod
+ async def list_pods(self, object: K8sWorkload) -> list[PodData]:
+ pass
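
The abstract base above defines the contract every workload loader has to satisfy: an async list_workloads and an async list_pods. A minimal sketch of a custom loader built on it (StaticWorkloadLoader and its fixed data are hypothetical, e.g. for tests; they are not part of this change):

from typing import Optional

from robusta_krr.core.integrations.kubernetes.workload_loader import BaseWorkloadLoader
from robusta_krr.core.models.objects import K8sWorkload, PodData


class StaticWorkloadLoader(BaseWorkloadLoader):
    # Hypothetical loader returning a fixed set of workloads instead of querying a cluster.

    def __init__(self, cluster: Optional[str] = None, workloads: Optional[list[K8sWorkload]] = None) -> None:
        super().__init__(cluster)
        self._workloads = workloads or []

    async def list_workloads(self, clusters: Optional[list[str]] = None) -> list[K8sWorkload]:
        return self._workloads

    async def list_pods(self, object: K8sWorkload) -> list[PodData]:
        # No live cluster to query, so report no pods for any workload.
        return []
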
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py
new file mode 100644
index 0000000..8586b77
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py
@@ -0,0 +1,273 @@
+import asyncio
+import logging
+from collections import defaultdict
+from concurrent.futures import ThreadPoolExecutor
+from typing import Any, Awaitable, Callable, Optional
+
+from kubernetes import client # type: ignore
+from kubernetes.client import ApiException # type: ignore
+from kubernetes.client.models import V1Container, V2HorizontalPodAutoscaler # type: ignore
+
+from robusta_krr.core.models.config import settings
+from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData
+from robusta_krr.core.models.result import ResourceAllocations
+from robusta_krr.utils.object_like_dict import ObjectLikeDict
+
+from ..base import BaseWorkloadLoader
+from .loaders import (
+ BaseKindLoader,
+ CronJobLoader,
+ DaemonSetLoader,
+ DeploymentConfigLoader,
+ DeploymentLoader,
+ JobLoader,
+ RolloutLoader,
+ StatefulSetLoader,
+)
+
+logger = logging.getLogger("krr")
+
+HPAKey = tuple[str, str, str]
+
+
+class KubeAPIWorkloadLoader(BaseWorkloadLoader):
+ workload_loaders: list[BaseKindLoader] = [
+ DeploymentLoader,
+ RolloutLoader,
+ DeploymentConfigLoader,
+ StatefulSetLoader,
+ DaemonSetLoader,
+ JobLoader,
+ CronJobLoader,
+ ]
+
+ def __init__(self, cluster: Optional[str] = None):
+ super().__init__(cluster)
+
+ # This executor will be running requests to Kubernetes API
+ self.executor = ThreadPoolExecutor(settings.max_workers)
+ self.api_client = settings.get_kube_client(cluster)
+
+ self.autoscaling_v1 = client.AutoscalingV1Api(api_client=self.api_client)
+ self.autoscaling_v2 = client.AutoscalingV2Api(api_client=self.api_client)
+
+ self._kind_available: defaultdict[KindLiteral, bool] = defaultdict(lambda: True)
+ self._hpa_list: dict[HPAKey, HPAData] = {}
+ self._workload_loaders: dict[KindLiteral, BaseKindLoader] = {
+ loader.kind: loader(self.api_client, self.executor) for loader in self.workload_loaders
+ }
+
+ async def list_workloads(self) -> list[K8sWorkload]:
+ """List all scannable objects.
+
+ Returns:
+ A list of scannable objects.
+ """
+
+ logger.info(f"Listing scannable objects in {self.cluster}")
+ logger.debug(f"Namespaces: {settings.namespaces}")
+ logger.debug(f"Resources: {settings.resources}")
+
+ self._hpa_list = await self._try_list_hpa()
+ workload_object_lists = await asyncio.gather(
+ *[
+ self._fetch_workload(loader)
+ for loader in self._workload_loaders.values()
+ if self._should_list_resource(loader.kind)
+ ]
+ )
+
+ return [
+ object
+ for workload_objects in workload_object_lists
+ for object in workload_objects
+ # NOTE: By default we will filter out kube-system namespace
+ if not (settings.namespaces == "*" and object.namespace == "kube-system")
+ ]
+
+ async def list_pods(self, object: K8sWorkload) -> list[PodData]:
+ return await self._workload_loaders[object.kind].list_pods(object)
+
+ def __build_scannable_object(self, item: Any, container: V1Container, kind: Optional[str] = None) -> K8sWorkload:
+ name = item.metadata.name
+ namespace = item.metadata.namespace
+ kind = kind or item.__class__.__name__[2:]
+
+ obj = K8sWorkload(
+ cluster=self.cluster,
+ namespace=namespace,
+ name=name,
+ kind=kind,
+ container=container.name,
+ allocations=ResourceAllocations.from_container(container),
+ hpa=self._hpa_list.get((namespace, kind, name)),
+ )
+ obj._api_resource = item
+ return obj
+
+ def _should_list_resource(self, resource: str) -> bool:
+ if settings.resources == "*":
+ return True
+ return resource in settings.resources
+
+ async def _list_namespaced_or_global_objects(
+ self,
+ kind: KindLiteral,
+ all_namespaces_request: Callable[..., Awaitable[Any]],
+ namespaced_request: Callable[..., Awaitable[Any]],
+ ) -> list[Any]:
+ logger.debug(f"Listing {kind}s in {self.cluster}")
+
+ if settings.namespaces == "*":
+ requests = [
+ all_namespaces_request(
+ label_selector=settings.selector,
+ )
+ ]
+ else:
+ requests = [
+ namespaced_request(
+ namespace=namespace,
+ label_selector=settings.selector,
+ )
+ for namespace in settings.namespaces
+ ]
+
+ result = [item for request_result in await asyncio.gather(*requests) for item in request_result.items]
+
+ logger.debug(f"Found {len(result)} {kind} in {self.cluster}")
+ return result
+
+ async def _fetch_workload(self, loader: BaseKindLoader) -> list[K8sWorkload]:
+ kind = loader.kind
+
+ if not self._should_list_resource(kind):
+ logger.debug(f"Skipping {kind}s in {self.cluster}")
+ return
+
+ if not self._kind_available[kind]:
+ return
+
+ result = []
+ try:
+ for item in await self._list_namespaced_or_global_objects(
+ kind, loader.all_namespaces_request_async, loader.namespaced_request_async
+ ):
+ if not loader.filter(item):
+ continue
+
+ containers = await loader.extract_containers(item)
+ if asyncio.iscoroutine(containers):
+ containers = await containers
+
+ result.extend(self.__build_scannable_object(item, container, kind) for container in containers)
+ except ApiException as e:
+ if kind in ("Rollout", "DeploymentConfig") and e.status in [400, 401, 403, 404]:
+ if self._kind_available[kind]:
+ logger.debug(f"{kind} API not available in {self.cluster}")
+ self._kind_available[kind] = False
+ else:
+ logger.exception(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}")
+ logger.error("Will skip this object type and continue.")
+
+ return result
+
+ async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]:
+ loop = asyncio.get_running_loop()
+ res = await self._list_namespaced_or_global_objects(
+ kind="HPA-v1",
+ all_namespaces_request=lambda **kwargs: loop.run_in_executor(
+ self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs),
+ ),
+ namespaced_request=lambda **kwargs: loop.run_in_executor(
+ self.autoscaling_v1.list_namespaced_horizontal_pod_autoscaler(**kwargs),
+ ),
+ )
+
+ return {
+ (
+ hpa.metadata.namespace,
+ hpa.spec.scale_target_ref.kind,
+ hpa.spec.scale_target_ref.name,
+ ): HPAData(
+ min_replicas=hpa.spec.min_replicas,
+ max_replicas=hpa.spec.max_replicas,
+ current_replicas=hpa.status.current_replicas,
+ desired_replicas=hpa.status.desired_replicas,
+ target_cpu_utilization_percentage=hpa.spec.target_cpu_utilization_percentage,
+ target_memory_utilization_percentage=None,
+ )
+ async for hpa in res
+ }
+
+ async def __list_hpa_v2(self) -> dict[HPAKey, HPAData]:
+ loop = asyncio.get_running_loop()
+
+ res = await self._list_namespaced_or_global_objects(
+ kind="HPA-v2",
+ all_namespaces_request=lambda **kwargs: loop.run_in_executor(
+ self.executor, self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs)
+ ),
+ namespaced_request=lambda **kwargs: loop.run_in_executor(
+ self.autoscaling_v2.list_namespaced_horizontal_pod_autoscaler(**kwargs),
+ ),
+ )
+
+ def __get_metric(hpa: V2HorizontalPodAutoscaler, metric_name: str) -> Optional[float]:
+ return next(
+ (
+ metric.resource.target.average_utilization
+ for metric in hpa.spec.metrics
+ if metric.type == "Resource" and metric.resource.name == metric_name
+ ),
+ None,
+ )
+
+ return {
+ (
+ hpa.metadata.namespace,
+ hpa.spec.scale_target_ref.kind,
+ hpa.spec.scale_target_ref.name,
+ ): HPAData(
+ min_replicas=hpa.spec.min_replicas,
+ max_replicas=hpa.spec.max_replicas,
+ current_replicas=hpa.status.current_replicas,
+ desired_replicas=hpa.status.desired_replicas,
+ target_cpu_utilization_percentage=__get_metric(hpa, "cpu"),
+ target_memory_utilization_percentage=__get_metric(hpa, "memory"),
+ )
+ for hpa in res
+ }
+
+ # TODO: What should we do in case of other metrics bound to the HPA?
+ async def __list_hpa(self) -> dict[HPAKey, HPAData]:
+ """List all HPA objects in the cluster.
+
+ Returns:
+ dict[tuple[str, str], HPAData]: A dictionary of HPA objects, indexed by scaleTargetRef (kind, name).
+ """
+
+ try:
+ # Try to use V2 API first
+ return await self.__list_hpa_v2()
+ except ApiException as e:
+ if e.status != 404:
+ # If the error is other than not found, then re-raise it.
+ raise
+
+ # If V2 API does not exist, fall back to V1
+ return await self.__list_hpa_v1()
+
+ async def _try_list_hpa(self) -> dict[HPAKey, HPAData]:
+ try:
+ return await self.__list_hpa()
+ except Exception as e:
+ logger.exception(f"Error trying to list hpa in cluster {self.cluster}: {e}")
+ logger.error(
+ "Will assume that there are no HPA. "
+ "Be careful as this may lead to inaccurate results if object actually has HPA."
+ )
+ return {}
+
+
+__all__ = ["KubeAPIWorkloadLoader"]
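
As a usage sketch, KubeAPIWorkloadLoader is fully asynchronous; something like the following would list workloads and their pods from the current kube context. The main() wrapper is illustrative and assumes settings (namespaces, resources, selector, kubeconfig) have already been populated the way the krr CLI normally does before loaders are created:

import asyncio

from robusta_krr.core.integrations.kubernetes.workload_loader import KubeAPIWorkloadLoader


async def main() -> None:
    loader = KubeAPIWorkloadLoader(cluster=None)  # None here means the current / in-cluster context
    workloads = await loader.list_workloads()
    for workload in workloads:
        pods = await loader.list_pods(workload)
        print(workload.kind, workload.namespace, workload.name, len(pods))


if __name__ == "__main__":
    asyncio.run(main())
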
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/__init__.py
new file mode 100644
index 0000000..6ca6efd
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/__init__.py
@@ -0,0 +1,19 @@
+from .base import BaseKindLoader
+from .cronjobs import CronJobLoader
+from .daemonsets import DaemonSetLoader
+from .deploymentconfigs import DeploymentConfigLoader
+from .deployments import DeploymentLoader
+from .jobs import JobLoader
+from .rollouts import RolloutLoader
+from .statefulsets import StatefulSetLoader
+
+__all__ = [
+ "BaseKindLoader",
+ "CronJobLoader",
+ "DeploymentLoader",
+ "DaemonSetLoader",
+ "DeploymentConfigLoader",
+ "JobLoader",
+ "RolloutLoader",
+ "StatefulSetLoader",
+]
\ No newline at end of file
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py
new file mode 100644
index 0000000..0c14e89
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/base.py
@@ -0,0 +1,127 @@
+import abc
+import asyncio
+import logging
+from concurrent.futures import ThreadPoolExecutor
+from typing import Any, Iterable, Optional, Union
+
+from kubernetes import client # type: ignore
+from kubernetes.client.api_client import ApiClient # type: ignore
+from kubernetes.client.models import ( # type: ignore
+ V1Container,
+ V1DaemonSet,
+ V1Deployment,
+ V1Job,
+ V1Pod,
+ V1PodList,
+ V1StatefulSet,
+)
+
+from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData
+
+logger = logging.getLogger("krr")
+
+AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job]
+HPAKey = tuple[str, str, str]
+
+
+class BaseKindLoader(abc.ABC):
+ """
+ This class is used to define how to load a specific kind of Kubernetes object.
+ It does not load the objects itself, but is used by the `KubeAPIWorkloadLoader` to load objects.
+ """
+
+ kind: KindLiteral
+
+ def __init__(self, api_client: Optional[ApiClient], executor: ThreadPoolExecutor) -> None:
+ self.executor = executor
+ self.api_client = api_client
+ self.apps = client.AppsV1Api(api_client=self.api_client)
+ self.custom_objects = client.CustomObjectsApi(api_client=self.api_client)
+ self.batch = client.BatchV1Api(api_client=self.api_client)
+ self.core = client.CoreV1Api(api_client=self.api_client)
+
+ @abc.abstractmethod
+ def all_namespaces_request(self, label_selector: str) -> Any:
+ pass
+
+ async def all_namespaces_request_async(self, label_selector: str) -> Any:
+ """Default async implementation executes the request in a thread pool."""
+
+ loop = asyncio.get_running_loop()
+ return await loop.run_in_executor(
+ self.executor,
+ lambda: self.all_namespaces_request(
+ label_selector=label_selector,
+ ),
+ )
+
+ @abc.abstractmethod
+ def namespaced_request(self, namespace: str, label_selector: str) -> Any:
+ pass
+
+ async def namespaced_request_async(self, namespace: str, label_selector: str) -> Any:
+ loop = asyncio.get_running_loop()
+ return await loop.run_in_executor(
+ self.executor,
+ lambda: self.namespaced_request(
+ namespace=namespace,
+ label_selector=label_selector,
+ ),
+ )
+
+ async def extract_containers(self, item: Any) -> Iterable[V1Container]:
+ return item.spec.template.spec.containers
+
+ def filter(self, item: Any) -> bool:
+ return True
+
+ async def list_pods(self, object: K8sWorkload) -> list[PodData]:
+ loop = asyncio.get_running_loop()
+
+ if object.selector is None:
+ return []
+
+ selector = self._build_selector_query(object.selector)
+ if selector is None:
+ return []
+
+ ret: V1PodList = await loop.run_in_executor(
+ self.executor,
+ lambda: self.core.list_namespaced_pod(
+ namespace=object._api_resource.metadata.namespace, label_selector=selector
+ ),
+ )
+
+ return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items]
+
+ @classmethod
+ def _get_match_expression_filter(cls, expression: Any) -> str:
+ if expression.operator.lower() == "exists":
+ return expression.key
+ elif expression.operator.lower() == "doesnotexist":
+ return f"!{expression.key}"
+
+ values = ",".join(expression.values)
+ return f"{expression.key} {expression.operator} ({values})"
+
+ @classmethod
+ def _build_selector_query(cls, selector: Any) -> Union[str, None]:
+ label_filters = []
+
+ if selector.match_labels is not None:
+ label_filters += [f"{label[0]}={label[1]}" for label in selector.match_labels.items()]
+
+ if selector.match_expressions is not None:
+ label_filters += [
+ cls._get_match_expression_filter(expression) for expression in selector.match_expressions
+ ]
+
+ if label_filters == []:
+ # NOTE: This might mean that we have DeploymentConfig,
+ # which uses ReplicationController and it has a dict like matchLabels
+ if len(selector) != 0:
+ label_filters += [f"{label[0]}={label[1]}" for label in selector.items()]
+ else:
+ return None
+
+ return ",".join(label_filters)
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/cronjobs.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/cronjobs.py
new file mode 100644
index 0000000..d999c65
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/cronjobs.py
@@ -0,0 +1,68 @@
+import asyncio
+from collections import defaultdict
+import logging
+from typing import Any, Iterable
+
+from kubernetes.client.models import V1Container, V1Job, V1PodList # type: ignore
+
+from robusta_krr.core.models.objects import K8sWorkload, PodData
+
+from .base import BaseKindLoader
+
+logger = logging.getLogger("krr")
+
+
+class CronJobLoader(BaseKindLoader):
+ kind = "CronJob"
+
+ def __init__(self, *args, **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+
+ self._jobs: dict[str, list[V1Job]] = {}
+ self._jobs_loading_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
+
+ def all_namespaces_request(self, label_selector: str) -> Any:
+ return self.batch.list_cron_job_for_all_namespaces(label_selector=label_selector, watch=False)
+
+ def namespaced_request(self, namespace: str, label_selector: str) -> Any:
+ return self.batch.list_namespaced_cron_job(namespace=namespace, label_selector=label_selector, watch=False)
+
+ async def extract_containers(self, item: Any) -> Iterable[V1Container]:
+ return item.spec.job_template.spec.template.spec.containers
+
+ async def list_pods(self, object: K8sWorkload) -> list[PodData]:
+ loop = asyncio.get_running_loop()
+
+ namespace_jobs = await self._list_jobs(object.namespace)
+ ownered_jobs_uids = [
+ job.metadata.uid
+ for job in namespace_jobs
+ if any(
+ owner.kind == "CronJob" and owner.uid == object._api_resource.metadata.uid
+ for owner in job.metadata.owner_references or []
+ )
+ ]
+ selector = f"batch.kubernetes.io/controller-uid in ({','.join(ownered_jobs_uids)})"
+
+ ret: V1PodList = await loop.run_in_executor(
+ self.executor,
+ lambda: self.core.list_namespaced_pod(
+ namespace=object._api_resource.metadata.namespace, label_selector=selector
+ ),
+ )
+
+ return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items]
+
+ async def _list_jobs(self, namespace: str) -> list[V1Job]:
+ if namespace not in self._jobs:
+ loop = asyncio.get_running_loop()
+
+ async with self._jobs_loading_locks[namespace]:
+ logging.debug(f"Loading jobs for cronjobs in {namespace}")
+ ret = await loop.run_in_executor(
+ self.executor,
+ lambda: self.batch.list_namespaced_job(namespace=namespace),
+ )
+ self._jobs[namespace] = ret.items
+
+ return self._jobs[namespace]
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/daemonsets.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/daemonsets.py
new file mode 100644
index 0000000..1005bed
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/daemonsets.py
@@ -0,0 +1,13 @@
+from typing import Any
+
+from .base import BaseKindLoader
+
+
+class DaemonSetLoader(BaseKindLoader):
+ kind = "DaemonSet"
+
+ def all_namespaces_request(self, label_selector: str) -> Any:
+ return self.apps.list_daemon_set_for_all_namespaces(label_selector=label_selector, watch=False)
+
+ def namespaced_request(self, namespace: str, label_selector: str) -> Any:
+ return self.apps.list_namespaced_daemon_set(namespace=namespace, label_selector=label_selector, watch=False)
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deploymentconfigs.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deploymentconfigs.py
new file mode 100644
index 0000000..33b64d9
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deploymentconfigs.py
@@ -0,0 +1,38 @@
+import logging
+from typing import Any
+
+from robusta_krr.utils.object_like_dict import ObjectLikeDict
+
+from .base import BaseKindLoader
+
+logger = logging.getLogger("krr")
+
+
+class DeploymentConfigLoader(BaseKindLoader):
+ kind = "DeploymentConfig"
+
+ # NOTE: Using custom objects API returns dicts, but all other APIs return objects
+ # We need to handle this difference using a small wrapper
+
+ def all_namespaces_request(self, label_selector: str) -> Any:
+ return ObjectLikeDict(
+ self.custom_objects.list_cluster_custom_object(
+ group="apps.openshift.io",
+ version="v1",
+ plural="deploymentconfigs",
+ label_selector=label_selector,
+ watch=False,
+ )
+ )
+
+ def namespaced_request(self, namespace: str, label_selector: str) -> Any:
+ return ObjectLikeDict(
+ self.custom_objects.list_namespaced_custom_object(
+ group="apps.openshift.io",
+ version="v1",
+ plural="deploymentconfigs",
+ namespace=namespace,
+ label_selector=label_selector,
+ watch=False,
+ )
+ )
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deployments.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deployments.py
new file mode 100644
index 0000000..fc202b5
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/deployments.py
@@ -0,0 +1,13 @@
+from typing import Any
+
+from .base import BaseKindLoader
+
+
+class DeploymentLoader(BaseKindLoader):
+ kind = "Deployment"
+
+ def all_namespaces_request(self, label_selector: str) -> Any:
+ return self.apps.list_deployment_for_all_namespaces(label_selector=label_selector, watch=False)
+
+ def namespaced_request(self, namespace: str, label_selector: str) -> Any:
+ return self.apps.list_namespaced_deployment(namespace=namespace, label_selector=label_selector, watch=False)
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/jobs.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/jobs.py
new file mode 100644
index 0000000..87ce5a7
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/jobs.py
@@ -0,0 +1,18 @@
+from typing import Any
+
+from .base import BaseKindLoader
+
+
+class JobLoader(BaseKindLoader):
+ kind = "Job"
+
+ def all_namespaces_request(self, label_selector: str) -> Any:
+ return self.batch.list_job_for_all_namespaces(label_selector=label_selector, watch=False)
+
+ def namespaced_request(self, namespace: str, label_selector: str) -> Any:
+ return self.batch.list_namespaced_job(namespace=namespace, label_selector=label_selector, watch=False)
+
+ def filter(self, item: Any) -> bool:
+ # NOTE: If the job has ownerReference and it is a CronJob,
+ # then we should skip it, as it is a part of the CronJob
+ return not any(owner.kind == "CronJob" for owner in item.metadata.owner_references or [])
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/rollouts.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/rollouts.py
new file mode 100644
index 0000000..0148745
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/rollouts.py
@@ -0,0 +1,61 @@
+import asyncio
+import logging
+from typing import Any, Iterable
+
+from kubernetes.client.models import V1Container # type: ignore
+
+from robusta_krr.utils.object_like_dict import ObjectLikeDict
+
+from .base import BaseKindLoader
+
+logger = logging.getLogger("krr")
+
+
+class RolloutLoader(BaseKindLoader):
+ kind = "Rollout"
+
+ # NOTE: Using custom objects API returns dicts, but all other APIs return objects
+ # We need to handle this difference using a small wrapper
+
+ def all_namespaces_request(self, label_selector: str) -> Any:
+ return ObjectLikeDict(
+ self.custom_objects.list_cluster_custom_object(
+ group="argoproj.io",
+ version="v1alpha1",
+ plural="rollouts",
+ label_selector=label_selector,
+ watch=False,
+ )
+ )
+
+ def namespaced_request(self, namespace: str, label_selector: str) -> Any:
+ return ObjectLikeDict(
+ self.custom_objects.list_namespaced_custom_object(
+ group="argoproj.io",
+ version="v1alpha1",
+ plural="rollouts",
+ namespace=namespace,
+ label_selector=label_selector,
+ watch=False,
+ )
+ )
+
+ async def extract_containers(self, item: Any) -> Iterable[V1Container]:
+ if item.spec.template is not None:
+ return item.spec.template.spec.containers
+
+ logging.debug(
+ f"Rollout has workloadRef, fetching template for {item.metadata.name} in {item.metadata.namespace}"
+ )
+
+ # Template can be None and object might have workloadRef
+ workloadRef = item.spec.workloadRef
+ if workloadRef is not None:
+ loop = asyncio.get_running_loop()
+ ret = await loop.run_in_executor(
+ self.executor,
+ lambda: self.apps.read_namespaced_deployment(namespace=item.metadata.namespace, name=workloadRef.name),
+ )
+ return ret.spec.template.spec.containers
+
+ return []
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/statefulsets.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/statefulsets.py
new file mode 100644
index 0000000..069f2c6
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/loaders/statefulsets.py
@@ -0,0 +1,13 @@
+from typing import Any
+
+from .base import BaseKindLoader
+
+
+class StatefulSetLoader(BaseKindLoader):
+ kind = "StatefulSet"
+
+ def all_namespaces_request(self, label_selector: str) -> Any:
+ return self.apps.list_stateful_set_for_all_namespaces(label_selector=label_selector, watch=False)
+
+ def namespaced_request(self, namespace: str, label_selector: str) -> Any:
+ return self.apps.list_namespaced_stateful_set(namespace=namespace, label_selector=label_selector, watch=False)
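
All of the per-kind loaders above follow the same pattern: declare kind and implement the two list calls, overriding extract_containers, filter, or list_pods only when the defaults in BaseKindLoader are not enough. As a sketch of how another kind could be plugged in (a ReplicaSet loader is hypothetical and not part of this change), the shape would be:

from typing import Any

from .base import BaseKindLoader


class ReplicaSetLoader(BaseKindLoader):
    # Hypothetical example; it would also need to be added to
    # KubeAPIWorkloadLoader.workload_loaders and to the KindLiteral type to take effect.

    kind = "ReplicaSet"

    def all_namespaces_request(self, label_selector: str) -> Any:
        return self.apps.list_replica_set_for_all_namespaces(label_selector=label_selector, watch=False)

    def namespaced_request(self, namespace: str, label_selector: str) -> Any:
        return self.apps.list_namespaced_replica_set(namespace=namespace, label_selector=label_selector, watch=False)
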
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus.py
new file mode 100644
index 0000000..d2a9883
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus.py
@@ -0,0 +1,10 @@
+import logging
+from .base import BaseWorkloadLoader
+
+
+logger = logging.getLogger("krr")
+
+
+class PrometheusWorkloadLoader(BaseWorkloadLoader):
+ # TODO: Implement PrometheusWorkloadLoader
+ pass
diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py
index db49392..c449bca 100644
--- a/robusta_krr/core/integrations/prometheus/loader.py
+++ b/robusta_krr/core/integrations/prometheus/loader.py
@@ -11,7 +11,7 @@ from kubernetes.client.exceptions import ApiException
from prometrix import MetricsNotFound, PrometheusNotFound
from robusta_krr.core.models.config import settings
-from robusta_krr.core.models.objects import K8sObjectData, PodData
+from robusta_krr.core.models.objects import K8sWorkload, PodData
from .metrics_service.prometheus_metrics_service import PrometheusMetricsService
from .metrics_service.thanos_metrics_service import ThanosMetricsService
@@ -82,7 +82,7 @@ class PrometheusMetricsLoader:
) -> Optional[tuple[datetime.datetime, datetime.datetime]]:
return await self.loader.get_history_range(history_duration)
- async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> list[PodData]:
+ async def load_pods(self, object: K8sWorkload, period: datetime.timedelta) -> list[PodData]:
try:
return await self.loader.load_pods(object, period)
except Exception as e:
@@ -91,7 +91,7 @@ class PrometheusMetricsLoader:
async def gather_data(
self,
- object: K8sObjectData,
+ object: K8sWorkload,
strategy: BaseStrategy,
period: datetime.timedelta,
*,
diff --git a/robusta_krr/core/integrations/prometheus/metrics/base.py b/robusta_krr/core/integrations/prometheus/metrics/base.py
index f4265e2..d55af00 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/base.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/base.py
@@ -15,7 +15,7 @@ from prometrix import CustomPrometheusConnect
from robusta_krr.core.abstract.metrics import BaseMetric
from robusta_krr.core.abstract.strategies import PodsTimeData
from robusta_krr.core.models.config import settings
-from robusta_krr.core.models.objects import K8sObjectData
+from robusta_krr.core.models.objects import K8sWorkload
class PrometheusSeries(TypedDict):
@@ -86,7 +86,7 @@ class PrometheusMetric(BaseMetric):
return f', {settings.prometheus_label}="{settings.prometheus_cluster_label}"'
@abc.abstractmethod
- def get_query(self, object: K8sObjectData, duration: str, step: str) -> str:
+ def get_query(self, object: K8sWorkload, duration: str, step: str) -> str:
"""
This method should be implemented by all subclasses to provide a query string to fetch metrics.
@@ -148,7 +148,7 @@ class PrometheusMetric(BaseMetric):
return await loop.run_in_executor(self.executor, lambda: self._query_prometheus_sync(data))
async def load_data(
- self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta
+ self, object: K8sWorkload, period: datetime.timedelta, step: datetime.timedelta
) -> PodsTimeData:
"""
Asynchronous method that loads metric data for a specific object.
diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu.py b/robusta_krr/core/integrations/prometheus/metrics/cpu.py
index c7a2c73..7257654 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/cpu.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/cpu.py
@@ -1,4 +1,4 @@
-from robusta_krr.core.models.objects import K8sObjectData
+from robusta_krr.core.models.objects import K8sWorkload
from .base import PrometheusMetric, QueryType
@@ -10,7 +10,7 @@ class CPULoader(PrometheusMetric):
query_type: QueryType = QueryType.QueryRange
- def get_query(self, object: K8sObjectData, duration: str, step: str) -> str:
+ def get_query(self, object: K8sWorkload, duration: str, step: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
return f"""
@@ -36,7 +36,7 @@ def PercentileCPULoader(percentile: float) -> type[PrometheusMetric]:
raise ValueError("percentile must be between 0 and 100")
class PercentileCPULoader(PrometheusMetric):
- def get_query(self, object: K8sObjectData, duration: str, step: str) -> str:
+ def get_query(self, object: K8sWorkload, duration: str, step: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
return f"""
@@ -64,7 +64,7 @@ class CPUAmountLoader(PrometheusMetric):
A metric loader for loading CPU points count.
"""
- def get_query(self, object: K8sObjectData, duration: str, step: str) -> str:
+ def get_query(self, object: K8sWorkload, duration: str, step: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
return f"""
diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory.py b/robusta_krr/core/integrations/prometheus/metrics/memory.py
index 85031bc..ca324f6 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/memory.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/memory.py
@@ -1,4 +1,4 @@
-from robusta_krr.core.models.objects import K8sObjectData
+from robusta_krr.core.models.objects import K8sWorkload
from .base import PrometheusMetric, QueryType
@@ -10,7 +10,7 @@ class MemoryLoader(PrometheusMetric):
query_type: QueryType = QueryType.QueryRange
- def get_query(self, object: K8sObjectData, duration: str, step: str) -> str:
+ def get_query(self, object: K8sWorkload, duration: str, step: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
return f"""
@@ -30,7 +30,7 @@ class MaxMemoryLoader(PrometheusMetric):
A metric loader for loading max memory usage metrics.
"""
- def get_query(self, object: K8sObjectData, duration: str, step: str) -> str:
+ def get_query(self, object: K8sWorkload, duration: str, step: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
return f"""
@@ -53,7 +53,7 @@ class MemoryAmountLoader(PrometheusMetric):
A metric loader for loading memory points count.
"""
- def get_query(self, object: K8sObjectData, duration: str, step: str) -> str:
+ def get_query(self, object: K8sWorkload, duration: str, step: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
return f"""
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py
index a3b0ee0..ae34a7a 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py
@@ -7,7 +7,7 @@ from kubernetes.client.api_client import ApiClient
from robusta_krr.core.abstract.strategies import PodsTimeData
from robusta_krr.core.models.config import settings
-from robusta_krr.core.models.objects import K8sObjectData
+from robusta_krr.core.models.objects import K8sWorkload
from ..metrics import PrometheusMetric
@@ -39,7 +39,7 @@ class MetricsService(abc.ABC):
@abc.abstractmethod
async def gather_data(
self,
- object: K8sObjectData,
+ object: K8sWorkload,
LoaderClass: type[PrometheusMetric],
period: datetime.timedelta,
step: datetime.timedelta = datetime.timedelta(minutes=30),
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
index 423a29a..5347422 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
@@ -11,7 +11,7 @@ from prometrix import PrometheusNotFound, get_custom_prometheus_connect
from robusta_krr.core.abstract.strategies import PodsTimeData
from robusta_krr.core.integrations import openshift
from robusta_krr.core.models.config import settings
-from robusta_krr.core.models.objects import K8sObjectData, PodData
+from robusta_krr.core.models.objects import K8sWorkload, PodData
from robusta_krr.utils.batched import batched
from robusta_krr.utils.service_discovery import MetricsServiceDiscovery
@@ -177,7 +177,7 @@ class PrometheusMetricsService(MetricsService):
async def gather_data(
self,
- object: K8sObjectData,
+ object: K8sWorkload,
LoaderClass: type[PrometheusMetric],
period: timedelta,
step: timedelta = timedelta(minutes=30),
@@ -202,7 +202,7 @@ class PrometheusMetricsService(MetricsService):
return data
- async def load_pods(self, object: K8sObjectData, period: timedelta) -> list[PodData]:
+ async def load_pods(self, object: K8sWorkload, period: timedelta) -> list[PodData]:
"""
List pods related to the object and add them to the object's pods list.
Args:
diff --git a/robusta_krr/core/models/config.py b/robusta_krr/core/models/config.py
index ff6142a..6e03967 100644
--- a/robusta_krr/core/models/config.py
+++ b/robusta_krr/core/models/config.py
@@ -23,6 +23,7 @@ class Config(pd.BaseSettings):
clusters: Union[list[str], Literal["*"], None] = None
kubeconfig: Optional[str] = None
+ workload_loader: Literal["kubeapi", "prometheus"] = pd.Field("kubeapi")
impersonate_user: Optional[str] = None
impersonate_group: Optional[str] = None
namespaces: Union[list[str], Literal["*"]] = pd.Field("*")
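
This new workload_loader setting is what ClusterWorkloadsLoader._try_create_cluster_loader consults when picking a loader class. A hedged sketch of that selection, pulled out into a stand-alone function for illustration (PrometheusWorkloadLoader is still a stub in this commit, so selecting "prometheus" will not work until it is implemented):

from typing import Optional

from robusta_krr.core.models.config import settings
from robusta_krr.core.integrations.kubernetes.workload_loader import (
    BaseWorkloadLoader,
    KubeAPIWorkloadLoader,
    PrometheusWorkloadLoader,
)


def create_workload_loader(cluster: Optional[str]) -> BaseWorkloadLoader:
    # Mirrors _try_create_cluster_loader: "kubeapi" (the default) talks to the
    # Kubernetes API; anything else falls through to the Prometheus-based loader.
    if settings.workload_loader == "kubeapi":
        return KubeAPIWorkloadLoader(cluster=cluster)
    return PrometheusWorkloadLoader(cluster=cluster)
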
diff --git a/robusta_krr/core/models/objects.py b/robusta_krr/core/models/objects.py
index e4b400d..5dac123 100644
--- a/robusta_krr/core/models/objects.py
+++ b/robusta_krr/core/models/objects.py
@@ -35,7 +35,7 @@ PodWarning = Literal[
]
-class K8sObjectData(pd.BaseModel):
+class K8sWorkload(pd.BaseModel):
# NOTE: Here None means that we are running inside the cluster
cluster: Optional[str]
name: str
@@ -80,7 +80,7 @@ class K8sObjectData(pd.BaseModel):
else:
return self._api_resource.spec.selector
- def split_into_batches(self, n: int) -> list[K8sObjectData]:
+ def split_into_batches(self, n: int) -> list[K8sWorkload]:
"""
Batch this object into n objects, splitting the pods into batches of size n.
"""
@@ -89,7 +89,7 @@ class K8sObjectData(pd.BaseModel):
return [self]
return [
- K8sObjectData(
+ K8sWorkload(
cluster=self.cluster,
name=self.name,
container=self.container,
diff --git a/robusta_krr/core/models/result.py b/robusta_krr/core/models/result.py
index 2d5ffbc..1735125 100644
--- a/robusta_krr/core/models/result.py
+++ b/robusta_krr/core/models/result.py
@@ -6,7 +6,7 @@ import pydantic as pd
from robusta_krr.core.abstract import formatters
from robusta_krr.core.models.allocations import RecommendationValue, ResourceAllocations, ResourceType
-from robusta_krr.core.models.objects import K8sObjectData
+from robusta_krr.core.models.objects import K8sWorkload
from robusta_krr.core.models.severity import Severity
@@ -22,12 +22,12 @@ class ResourceRecommendation(pd.BaseModel):
class ResourceScan(pd.BaseModel):
- object: K8sObjectData
+ object: K8sWorkload
recommended: ResourceRecommendation
severity: Severity
@classmethod
- def calculate(cls, object: K8sObjectData, recommendation: ResourceAllocations) -> ResourceScan:
+ def calculate(cls, object: K8sWorkload, recommendation: ResourceAllocations) -> ResourceScan:
recommendation_processed = ResourceRecommendation(requests={}, limits={}, info={})
for resource_type in ResourceType:
diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py
index 8e08521..de3b3c7 100644
--- a/robusta_krr/core/runner.py
+++ b/robusta_krr/core/runner.py
@@ -12,10 +12,10 @@ from rich.console import Console
from slack_sdk import WebClient
from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult
-from robusta_krr.core.integrations.kubernetes import KubernetesLoader
+from robusta_krr.core.integrations.kubernetes import ClusterWorkloadsLoader
from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException, PrometheusMetricsLoader
from robusta_krr.core.models.config import settings
-from robusta_krr.core.models.objects import K8sObjectData
+from robusta_krr.core.models.objects import K8sWorkload
from robusta_krr.core.models.result import ResourceAllocations, ResourceScan, ResourceType, Result, StrategyData
from robusta_krr.utils.intro import load_intro_message
from robusta_krr.utils.progress_bar import ProgressBar
@@ -40,7 +40,7 @@ class Runner:
EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound)
def __init__(self) -> None:
- self._k8s_loader = KubernetesLoader()
+ self._k8s_loader = ClusterWorkloadsLoader()
self._metrics_service_loaders: dict[Optional[str], Union[PrometheusMetricsLoader, Exception]] = {}
self._metrics_service_loaders_error_logged: set[Exception] = set()
self._strategy = settings.create_strategy()
@@ -165,7 +165,7 @@ class Runner:
for resource, recommendation in result.items()
}
- async def _calculate_object_recommendations(self, object: K8sObjectData) -> Optional[RunResult]:
+ async def _calculate_object_recommendations(self, object: K8sWorkload) -> Optional[RunResult]:
prometheus_loader = self._get_prometheus_loader(object.cluster)
if prometheus_loader is None:
@@ -239,7 +239,7 @@ class Runner:
}
)
- async def _gather_object_allocations(self, k8s_object: K8sObjectData) -> Optional[ResourceScan]:
+ async def _gather_object_allocations(self, k8s_object: K8sWorkload) -> Optional[ResourceScan]:
recommendation = await self._calculate_object_recommendations(k8s_object)
self.__progressbar.progress()
@@ -275,7 +275,7 @@ class Runner:
await asyncio.gather(*[self._check_data_availability(cluster) for cluster in clusters])
with ProgressBar(title="Calculating Recommendation") as self.__progressbar:
- workloads = await self._k8s_loader.list_scannable_objects(clusters)
+ workloads = await self._k8s_loader.list_workloads(clusters)
scans = await asyncio.gather(*[self._gather_object_allocations(k8s_object) for k8s_object in workloads])
successful_scans = [scan for scan in scans if scan is not None]
diff --git a/robusta_krr/strategies/simple.py b/robusta_krr/strategies/simple.py
index 64e58ac..db15f6c 100644
--- a/robusta_krr/strategies/simple.py
+++ b/robusta_krr/strategies/simple.py
@@ -5,7 +5,7 @@ import pydantic as pd
from robusta_krr.core.abstract.strategies import (
BaseStrategy,
- K8sObjectData,
+ K8sWorkload,
MetricsPodData,
PodsTimeData,
ResourceRecommendation,
@@ -78,7 +78,7 @@ class SimpleStrategy(BaseStrategy[SimpleStrategySettings]):
return [PercentileCPULoader(self.settings.cpu_percentile), MaxMemoryLoader, CPUAmountLoader, MemoryAmountLoader]
def __calculate_cpu_proposal(
- self, history_data: MetricsPodData, object_data: K8sObjectData
+ self, history_data: MetricsPodData, object_data: K8sWorkload
) -> ResourceRecommendation:
data = history_data["PercentileCPULoader"]
@@ -98,7 +98,7 @@ class SimpleStrategy(BaseStrategy[SimpleStrategySettings]):
return ResourceRecommendation(request=cpu_usage, limit=None)
def __calculate_memory_proposal(
- self, history_data: MetricsPodData, object_data: K8sObjectData
+ self, history_data: MetricsPodData, object_data: K8sWorkload
) -> ResourceRecommendation:
data = history_data["MaxMemoryLoader"]
@@ -117,7 +117,7 @@ class SimpleStrategy(BaseStrategy[SimpleStrategySettings]):
memory_usage = self.settings.calculate_memory_proposal(data)
return ResourceRecommendation(request=memory_usage, limit=memory_usage)
- def run(self, history_data: MetricsPodData, object_data: K8sObjectData) -> RunResult:
+ def run(self, history_data: MetricsPodData, object_data: K8sWorkload) -> RunResult:
return {
ResourceType.CPU: self.__calculate_cpu_proposal(history_data, object_data),
ResourceType.Memory: self.__calculate_memory_proposal(history_data, object_data),