summaryrefslogtreecommitdiff
path: root/robusta_krr
diff options
context:
space:
mode:
authorLeaveMyYard <zhukovpavel2001@gmail.com>2024-04-24 16:05:03 +0300
committerLeaveMyYard <zhukovpavel2001@gmail.com>2024-04-24 16:05:03 +0300
commitbf909780f67a58c8647c689a72e2044e53a84e6c (patch)
treee543a8667bca4cb713fc8119e6e99e39bc774ba4 /robusta_krr
parentc7ad1cde09e79d4821383ceb5dd32fd0471d427d (diff)
Implement remaining kinds in prometheus workload loader
Diffstat (limited to 'robusta_krr')
-rw-r--r--robusta_krr/core/integrations/kubernetes/__init__.py24
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py4
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/base.py7
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py16
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py28
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py18
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py104
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/deployments.py50
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py123
-rw-r--r--robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py65
-rw-r--r--robusta_krr/core/integrations/prometheus/__init__.py2
-rw-r--r--robusta_krr/core/integrations/prometheus/connector.py (renamed from robusta_krr/core/integrations/prometheus/loader.py)5
-rw-r--r--robusta_krr/core/integrations/prometheus/metrics/base.py3
-rw-r--r--robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py12
-rw-r--r--robusta_krr/core/models/objects.py3
-rw-r--r--robusta_krr/core/runner.py15
16 files changed, 305 insertions, 174 deletions
diff --git a/robusta_krr/core/integrations/kubernetes/__init__.py b/robusta_krr/core/integrations/kubernetes/__init__.py
index 2ff4458..99b4e3c 100644
--- a/robusta_krr/core/integrations/kubernetes/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/__init__.py
@@ -17,7 +17,7 @@ from kubernetes.client.models import (
V2HorizontalPodAutoscaler,
)
-from robusta_krr.core.integrations.prometheus.loader import PrometheusMetricsLoader
+from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector
from robusta_krr.core.models.config import settings
from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData
from robusta_krr.core.models.result import ResourceAllocations
@@ -37,8 +37,8 @@ class ClusterConnector:
EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound)
def __init__(self) -> None:
- self._metrics_service_loaders: dict[Optional[str], Union[PrometheusMetricsLoader, Exception]] = {}
- self._metrics_service_loaders_error_logged: set[Exception] = set()
+ self._prometheus_connectors: dict[Optional[str], Union[PrometheusConnector, Exception]] = {}
+ self._connector_errors: set[Exception] = set()
async def list_clusters(self) -> Optional[list[str]]:
"""List all clusters.
@@ -80,22 +80,22 @@ class ClusterConnector:
return [context["name"] for context in contexts if context["name"] in settings.clusters]
- def get_prometheus_loader(self, cluster: Optional[str]) -> Optional[PrometheusMetricsLoader]:
- if cluster not in self._metrics_service_loaders:
+ def get_prometheus(self, cluster: Optional[str]) -> Optional[PrometheusConnector]:
+ if cluster not in self._prometheus_connectors:
try:
- self._metrics_service_loaders[cluster] = PrometheusMetricsLoader(cluster=cluster)
+ logger.debug(f"Creating Prometheus connector for cluster {cluster}")
+ self._prometheus_connectors[cluster] = PrometheusConnector(cluster=cluster)
except Exception as e:
- self._metrics_service_loaders[cluster] = e
+ self._prometheus_connectors[cluster] = e
- result = self._metrics_service_loaders[cluster]
+ result = self._prometheus_connectors[cluster]
if isinstance(result, self.EXPECTED_EXCEPTIONS):
- if result not in self._metrics_service_loaders_error_logged:
- self._metrics_service_loaders_error_logged.add(result)
+ if result not in self._connector_errors:
+ self._connector_errors.add(result)
logger.error(str(result))
return None
elif isinstance(result, Exception):
raise result
-
return result
def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[BaseWorkloadLoader]:
@@ -103,7 +103,7 @@ class ClusterConnector:
if settings.workload_loader == "kubeapi":
return KubeAPIWorkloadLoader(cluster=cluster)
elif settings.workload_loader == "prometheus":
- cluster_loader = self.get_prometheus_loader(cluster)
+ cluster_loader = self.get_prometheus(cluster)
if cluster_loader is not None:
return PrometheusWorkloadLoader(cluster=cluster, metric_loader=cluster_loader)
else:
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py
index 9543147..06dd8c0 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/__init__.py
@@ -1,5 +1,5 @@
-from .base import BaseWorkloadLoader
+from .base import BaseWorkloadLoader, IListPodsFallback
from .kube_api import KubeAPIWorkloadLoader
from .prometheus import PrometheusWorkloadLoader
-__all__ = ["BaseWorkloadLoader", "KubeAPIWorkloadLoader", "PrometheusWorkloadLoader"] \ No newline at end of file
+__all__ = ["BaseWorkloadLoader", "IListPodsFallback", "KubeAPIWorkloadLoader", "PrometheusWorkloadLoader"] \ No newline at end of file
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/base.py
index 8581c1a..11a2783 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/base.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/base.py
@@ -1,13 +1,18 @@
import abc
-from typing import Optional
from robusta_krr.core.models.objects import K8sWorkload, PodData
class BaseWorkloadLoader(abc.ABC):
+ """A base class for workload loaders."""
+
@abc.abstractmethod
async def list_workloads(self) -> list[K8sWorkload]:
pass
+
+class IListPodsFallback(abc.ABC):
+ """This is an interface that a workload loader can implement to have a fallback method to list pods."""
+
@abc.abstractmethod
async def list_pods(self, object: K8sWorkload) -> list[PodData]:
pass
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py
index d23f7a4..a7470d7 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/kube_api/__init__.py
@@ -12,7 +12,7 @@ from robusta_krr.core.models.config import settings
from robusta_krr.core.models.objects import HPAData, K8sWorkload, KindLiteral, PodData
from robusta_krr.core.models.result import ResourceAllocations
-from ..base import BaseWorkloadLoader
+from ..base import BaseWorkloadLoader, IListPodsFallback
from .loaders import (
BaseKindLoader,
CronJobLoader,
@@ -29,7 +29,7 @@ logger = logging.getLogger("krr")
HPAKey = tuple[str, str, str]
-class KubeAPIWorkloadLoader(BaseWorkloadLoader):
+class KubeAPIWorkloadLoader(BaseWorkloadLoader, IListPodsFallback):
workload_loaders: list[BaseKindLoader] = [
DeploymentLoader,
RolloutLoader,
@@ -176,10 +176,12 @@ class KubeAPIWorkloadLoader(BaseWorkloadLoader):
res = await self._list_namespaced_or_global_objects(
kind="HPA-v1",
all_namespaces_request=lambda **kwargs: loop.run_in_executor(
- self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs),
+ self.executor,
+ lambda: self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs),
),
namespaced_request=lambda **kwargs: loop.run_in_executor(
- self.autoscaling_v1.list_namespaced_horizontal_pod_autoscaler(**kwargs),
+ self.executor,
+ lambda: self.autoscaling_v1.list_namespaced_horizontal_pod_autoscaler(**kwargs),
),
)
@@ -205,10 +207,12 @@ class KubeAPIWorkloadLoader(BaseWorkloadLoader):
res = await self._list_namespaced_or_global_objects(
kind="HPA-v2",
all_namespaces_request=lambda **kwargs: loop.run_in_executor(
- self.executor, self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs)
+ self.executor,
+ lambda: self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs),
),
namespaced_request=lambda **kwargs: loop.run_in_executor(
- self.autoscaling_v2.list_namespaced_horizontal_pod_autoscaler(**kwargs),
+ self.executor,
+ lambda: self.autoscaling_v2.list_namespaced_horizontal_pod_autoscaler(**kwargs),
),
)
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py
index b277bc2..bfe6853 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/__init__.py
@@ -2,30 +2,36 @@ import asyncio
import itertools
import logging
+from collections import Counter
-from robusta_krr.core.integrations.prometheus.loader import PrometheusMetricsLoader
+
+from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector
from robusta_krr.core.models.config import settings
-from robusta_krr.core.integrations.prometheus.metrics_service.prometheus_metrics_service import PrometheusMetricsService
-from robusta_krr.core.models.objects import K8sWorkload, PodData
+from robusta_krr.core.models.objects import K8sWorkload
from ..base import BaseWorkloadLoader
-from .loaders import BaseKindLoader, DeploymentLoader
+from .loaders import BaseKindLoader, DoubleParentLoader, SimpleParentLoader
logger = logging.getLogger("krr")
class PrometheusWorkloadLoader(BaseWorkloadLoader):
- workloads: list[type[BaseKindLoader]] = [DeploymentLoader]
+ workloads: list[type[BaseKindLoader]] = [DoubleParentLoader, SimpleParentLoader]
- def __init__(self, cluster: str, metric_loader: PrometheusMetricsLoader) -> None:
+ def __init__(self, cluster: str, metric_loader: PrometheusConnector) -> None:
self.cluster = cluster
self.metric_service = metric_loader
self.loaders = [loader(metric_loader) for loader in self.workloads]
async def list_workloads(self) -> list[K8sWorkload]:
- return itertools.chain(await asyncio.gather(*[loader.list_workloads(settings.namespaces, "") for loader in self.loaders]))
+ workloads = list(
+ itertools.chain(
+ *await asyncio.gather(*[loader.list_workloads(settings.namespaces) for loader in self.loaders])
+ )
+ )
+
+ kind_counts = Counter([workload.kind for workload in workloads])
+ for kind, count in kind_counts.items():
+ logger.info(f"Found {count} {kind} in {self.cluster}")
- async def list_pods(self, object: K8sWorkload) -> list[PodData]:
- # This should not be implemented, as implementation will repeat PrometheusMetricsLoader.load_pods
- # As this method is ment to be a fallback, repeating the same logic will not be beneficial
- raise NotImplementedError
+ return workloads
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py
index 6ca6efd..18b9a3d 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/__init__.py
@@ -1,19 +1,9 @@
from .base import BaseKindLoader
-from .cronjobs import CronJobLoader
-from .daemonsets import DaemonSetLoader
-from .deploymentconfigs import DeploymentConfigLoader
-from .deployments import DeploymentLoader
-from .jobs import JobLoader
-from .rollouts import RolloutLoader
-from .statefulsets import StatefulSetLoader
+from .double_parent import DoubleParentLoader
+from .simple_parent import SimpleParentLoader
__all__ = [
"BaseKindLoader",
- "CronJobLoader",
- "DeploymentLoader",
- "DaemonSetLoader",
- "DeploymentConfigLoader",
- "JobLoader",
- "RolloutLoader",
- "StatefulSetLoader",
+ "DoubleParentLoader",
+ "SimpleParentLoader",
] \ No newline at end of file
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py
index a767eb2..4367d80 100644
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/base.py
@@ -1,5 +1,6 @@
import abc
import asyncio
+from collections import defaultdict
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Iterable, Literal, Optional, Union
@@ -15,8 +16,10 @@ from kubernetes.client.models import ( # type: ignore
V1PodList,
V1StatefulSet,
)
+from robusta_krr.core.models.config import settings
-from robusta_krr.core.integrations.prometheus.loader import PrometheusMetricsLoader
+from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector
+from robusta_krr.core.integrations.prometheus.metrics.base import PrometheusMetric
from robusta_krr.core.models.allocations import RecommendationValue, ResourceAllocations, ResourceType
from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData
@@ -32,29 +35,44 @@ class BaseKindLoader(abc.ABC):
It does not load the objects itself, but is used by the `KubeAPIWorkloadLoader` to load objects.
"""
- kind: KindLiteral
+ kinds: list[KindLiteral] = []
- def __init__(self, metrics_loader: PrometheusMetricsLoader) -> None:
- self.metrics_loader = metrics_loader
+ def __init__(self, connector: PrometheusConnector) -> None:
+ self.connector = connector
+ self.cluster_selector = PrometheusMetric.get_prometheus_cluster_label()
+
+ @property
+ def kinds_to_scan(self) -> list[KindLiteral]:
+ return [kind for kind in self.kinds if kind in settings.resources] if settings.resources != "*" else self.kinds
@abc.abstractmethod
- def list_workloads(self, namespaces: Union[list[str], Literal["*"]], label_selector: str) -> list[K8sWorkload]:
+ def list_workloads(self, namespaces: Union[list[str], Literal["*"]]) -> list[K8sWorkload]:
pass
- async def __parse_allocation(self, namespace: str, pod_selector: str, container_name: str) -> ResourceAllocations:
- limits = await self.metrics_loader.loader.query(
- "avg by(resource) (kube_pod_container_resource_limits{"
- f'namespace="{namespace}", '
- f'pod=~"{pod_selector}", '
- f'container="{container_name}"'
- "})"
+ async def _parse_allocation(self, namespace: str, pods: list[str], container_name: str) -> ResourceAllocations:
+ limits = await self.connector.loader.query(
+ f"""
+ avg by(resource) (
+ kube_pod_container_resource_limits{{
+ {self.cluster_selector}
+ namespace="{namespace}",
+ pod=~"{'|'.join(pods)}",
+ container="{container_name}"
+ }}
+ )
+ """
)
- requests = await self.metrics_loader.loader.query(
- "avg by(resource) (kube_pod_container_resource_requests{"
- f'namespace="{namespace}", '
- f'pod=~"{pod_selector}", '
- f'container="{container_name}"'
- "})"
+ requests = await self.connector.loader.query(
+ f"""
+ avg by(resource) (
+ kube_pod_container_resource_requests{{
+ {self.cluster_selector}
+ namespace="{namespace}",
+ pod=~"{'|'.join(pods)}",
+ container="{container_name}"
+ }}
+ )
+ """
)
requests_values: dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None}
limits_values: dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None}
@@ -71,56 +89,16 @@ class BaseKindLoader(abc.ABC):
requests_values[ResourceType.Memory] = float(request["value"][1])
return ResourceAllocations(requests=requests_values, limits=limits_values)
- async def __build_from_owner(
- self, namespace: str, app_name: str, containers: list[str], pod_names: list[str]
- ) -> list[K8sWorkload]:
- return [
- K8sWorkload(
- cluster=None,
- namespace=namespace,
- name=app_name,
- kind="Deployment",
- container=container_name,
- allocations=await self.__parse_allocation(namespace, "|".join(pod_names), container_name), # find
- pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names], # list pods
- )
- for container_name in containers
- ]
-
- async def _list_containers(self, namespace: str, pod_selector: str) -> list[str]:
- containers = await self.metrics_loader.loader.query(
+ async def _list_containers_in_pods(self, pods: list[str]) -> set[str]:
+ containers = await self.connector.loader.query(
f"""
count by (container) (
kube_pod_container_info{{
- namespace="{namespace}",
- pod=~"{pod_selector}"
+ {self.cluster_selector}
+ pod=~"{'|'.join(pods)}"
}}
)
"""
)
- return [container["metric"]["container"] for container in containers]
- async def _list_containers_in_pods(
- self, app_name: str, pod_owner_kind: str, namespace: str, owner_name: str
- ) -> list[K8sWorkload]:
- if pod_owner_kind == "ReplicaSet":
- # owner_name is ReplicaSet names
- pods = await self.metrics_loader.loader.query(
- f"""
- count by (owner_name, replicaset, pod) (
- kube_pod_owner{{
- namespace="{namespace}",
- owner_name=~"{owner_name}", '
- owner_kind="ReplicaSet"
- }}
- )
- """
- )
- if pods is None or len(pods) == 0:
- return [] # no container
- # [{'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-jqt4x'}, 'value': [1685529217, '1']},
- # {'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-lj9qg'}, 'value': [1685529217, '1']}]
- pod_names = [pod["metric"]["pod"] for pod in pods]
- container_names = await self._list_containers(namespace, "|".join(pod_names))
- return await self.__build_from_owner(namespace, app_name, container_names, pod_names)
- return []
+ return {container["metric"]["container"] for container in containers}
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/deployments.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/deployments.py
deleted file mode 100644
index c895d21..0000000
--- a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/deployments.py
+++ /dev/null
@@ -1,50 +0,0 @@
-import logging
-from collections import defaultdict
-import itertools
-import asyncio
-from typing import Literal, Optional, Union
-
-from robusta_krr.core.models.objects import K8sWorkload
-
-from .base import BaseKindLoader
-
-logger = logging.getLogger("krr")
-
-
-class DeploymentLoader(BaseKindLoader):
- kind = "Deployment"
-
- async def list_workloads(self, namespaces: Union[list[str], Literal["*"]], label_selector: str) -> list[K8sWorkload]:
- logger.debug(
- f"Listing deployments in namespace({namespaces})"
- )
- ns = "|".join(namespaces)
- replicasets = await self.metrics_loader.loader.query(
- f"""
- count by (namespace, owner_name, replicaset) (
- kube_replicaset_owner{{
- namespace=~"{ns}",
- owner_kind="Deployment",
- }}
- )
- """
- )
- # groupBy: 'ns/owner_name' => [{metadata}...]
- pod_owner_kind = "ReplicaSet"
- replicaset_dict = defaultdict(list)
- for replicaset in replicasets:
- replicaset_dict[replicaset["metric"]["namespace"] + "/" + replicaset["metric"]["owner_name"]].append(
- replicaset["metric"]
- )
- objects = await asyncio.gather(
- *[
- self._list_containers_in_pods(
- replicas[0]["owner_name"],
- pod_owner_kind,
- replicas[0]["namespace"],
- "|".join(list(map(lambda metric: metric["replicaset"], replicas))),
- )
- for replicas in replicaset_dict.values()
- ]
- )
- return list(itertools.chain(*objects))
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py
new file mode 100644
index 0000000..b4ce13e
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/double_parent.py
@@ -0,0 +1,123 @@
+import logging
+from collections import defaultdict
+import itertools
+import asyncio
+from typing import Literal, Union
+
+from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData
+
+from .base import BaseKindLoader
+
+logger = logging.getLogger("krr")
+
+SubownerLiteral = Literal["ReplicaSet", "ReplicationController", "Job"]
+
+
+class DoubleParentLoader(BaseKindLoader):
+ kinds = ["Deployment", "Rollout", "DeploymentConfig", "CronJob"]
+
+ kind_subowner_map: dict[KindLiteral, SubownerLiteral] = {
+ "Deployment": "ReplicaSet",
+ "Rollout": "ReplicaSet",
+ "DeploymentConfig": "ReplicationController",
+ "CronJob": "Job",
+ }
+
+ async def list_workloads(self, namespaces: Union[list[str], Literal["*"]]) -> list[K8sWorkload]:
+ return list(
+ itertools.chain(
+ *await asyncio.gather(
+ *[
+ self.list_workloads_by_subowner(namespaces, subowner)
+ for subowner in set(self.kind_subowner_map.values())
+ ]
+ )
+ )
+ )
+
+ async def list_workloads_by_subowner(
+ self, namespaces: Union[list[str], Literal["*"]], subowner_kind: SubownerLiteral
+ ) -> list[K8sWorkload]:
+ kinds = [kind for kind in self.kinds_to_scan if self.kind_subowner_map[kind] == subowner_kind]
+
+ if kinds == []:
+ return []
+
+ logger.debug(f"Listing {', '.join(kinds)}")
+ # NOTE: kube-system is excluded if we scan all namespaces
+ namespace_selector = (
+ ('namespace=~"' + "|".join(namespaces) + '"') if namespaces != "*" else 'namespace!="kube-system"'
+ )
+
+ metric_name = f"kube_{subowner_kind.lower()}_owner"
+ subowner_label = subowner_kind.lower() if subowner_kind != "Job" else "job_name"
+
+ # The subowner kind (ReplicaSet, ReplicationController, or Job) links each top-level workload to its pods
+ subowners = await self.connector.loader.query(
+ f"""
+ count by (namespace, owner_name, {subowner_label}, owner_kind) (
+ {metric_name} {{
+ {self.cluster_selector}
+ {namespace_selector},
+ owner_kind=~"{'|'.join(kinds)}"
+ }}
+ )
+ """
+ )
+ # groupBy: (namespace, owner_name, owner_kind) => [replicaset,...]
+ replicas_by_owner = defaultdict(list)
+ for subowner in subowners:
+ metric = subowner["metric"]
+ key = metric["namespace"], metric["owner_name"], metric["owner_kind"]
+ replicas_by_owner[key].append(metric[subowner_label])
+
+ return list(
+ itertools.chain(
+ *await asyncio.gather(
+ *[
+ self._list_pods_of_subowner(
+ namespace,
+ name,
+ kind,
+ subowner_kind,
+ subowners,
+ )
+ for (namespace, name, kind), subowners in replicas_by_owner.items()
+ ]
+ )
+ )
+ )
+
+ async def _list_pods_of_subowner(
+ self, namespace: str, name: str, kind: str, subowner_kind: str, subowner_names: list[str]
+ ) -> list[K8sWorkload]:
+ pods = await self.connector.loader.query(
+ f"""
+ count by (namespace, owner_name, owner_kind, pod) (
+ kube_pod_owner{{
+ {self.cluster_selector}
+ namespace="{namespace}",
+ owner_name=~"{'|'.join(subowner_names)}",
+ owner_kind="{subowner_kind}"
+ }}
+ )
+ """
+ )
+ if pods is None or len(pods) == 0:
+ return []
+
+ pod_names = [pod["metric"]["pod"] for pod in pods]
+ containers = await self._list_containers_in_pods(pod_names)
+
+ return [
+ K8sWorkload(
+ cluster=self.connector.cluster,
+ namespace=namespace,
+ name=name,
+ kind=kind,
+ container=container_name,
+ allocations=await self._parse_allocation(namespace, pod_names, container_name), # find
+ pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names], # list pods
+ )
+ for container_name in containers
+ ]
diff --git a/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py
new file mode 100644
index 0000000..33a8c5a
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/workload_loader/prometheus/loaders/simple_parent.py
@@ -0,0 +1,65 @@
+import asyncio
+from collections import defaultdict
+import logging
+from typing import Literal, Union
+
+from robusta_krr.core.models.objects import K8sWorkload, PodData
+
+from .base import BaseKindLoader
+
+logger = logging.getLogger("krr")
+
+
+class SimpleParentLoader(BaseKindLoader):
+ kinds = ["DaemonSet", "StatefulSet", "Job"]
+
+ async def list_workloads(self, namespaces: Union[list[str], Literal["*"]]) -> list[K8sWorkload]:
+ if self.kinds_to_scan == []:
+ return []
+
+ logger.debug(f"Listing {', '.join(self.kinds_to_scan)}")
+ namespace_selector = (
+ ('namespace=~"' + "|".join(namespaces) + '"') if namespaces != "*" else 'namespace!="kube-system"'
+ )
+
+ results = await self.connector.loader.query(
+ f"""
+ count by (namespace, owner_name, owner_kind, pod) (
+ kube_pod_owner{{
+ {self.cluster_selector}
+ {namespace_selector},
+ owner_kind=~"{'|'.join(self.kinds_to_scan)}"
+ }}
+ )
+ """
+ )
+ if results is None or len(results) == 0:
+ return []
+
+ # groupBy: (namespace, owner_name, owner_kind) => [pod, ... ]
+ workloads: defaultdict[tuple[str, str, str], list[str]] = defaultdict(list)
+ for result in results:
+ metric = result["metric"]
+ key = metric["namespace"], metric["owner_name"], metric["owner_kind"]
+ workloads[key].append(metric["pod"])
+
+ workloads_containers = dict(
+ zip(
+ workloads.keys(),
+ await asyncio.gather(*[self._list_containers_in_pods(pods) for pods in workloads.values()]),
+ )
+ )
+
+ return [
+ K8sWorkload(
+ cluster=self.connector.cluster,
+ namespace=namespace,
+ name=name,
+ kind=kind,
+ container=container,
+ allocations=await self._parse_allocation(namespace, pod_names, container), # find
+ pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names], # list pods
+ )
+ for (namespace, name, kind), pod_names in workloads.items()
+ for container in workloads_containers[namespace, name, kind]
+ ]
diff --git a/robusta_krr/core/integrations/prometheus/__init__.py b/robusta_krr/core/integrations/prometheus/__init__.py
index cedf1c0..1ce9d2a 100644
--- a/robusta_krr/core/integrations/prometheus/__init__.py
+++ b/robusta_krr/core/integrations/prometheus/__init__.py
@@ -1,3 +1,3 @@
-from .loader import PrometheusMetricsLoader
+from .connector import PrometheusConnector
from .metrics_service.prometheus_metrics_service import PrometheusDiscovery, PrometheusNotFound
from .prometheus_utils import ClusterNotSpecifiedException
diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/connector.py
index eb734ef..ab89b34 100644
--- a/robusta_krr/core/integrations/prometheus/loader.py
+++ b/robusta_krr/core/integrations/prometheus/connector.py
@@ -24,7 +24,7 @@ if TYPE_CHECKING:
logger = logging.getLogger("krr")
-class PrometheusMetricsLoader:
+class PrometheusConnector:
def __init__(self, *, cluster: Optional[str] = None) -> None:
"""
Initializes the Prometheus Loader.
@@ -34,6 +34,7 @@ class PrometheusMetricsLoader:
"""
self.executor = ThreadPoolExecutor(settings.max_workers)
+ self.cluster = cluster
self.api_client = settings.get_kube_client(context=cluster)
loader = self.get_metrics_service(api_client=self.api_client, cluster=cluster)
if loader is None:
@@ -46,7 +47,7 @@ class PrometheusMetricsLoader:
self.loader = loader
- logger.info(f"{self.loader.name()} connected successfully for {cluster or 'default'} cluster")
+ logger.info(f"{self.loader.name()} connected successfully for {cluster or 'inner'} cluster")
def get_metrics_service(
self,
diff --git a/robusta_krr/core/integrations/prometheus/metrics/base.py b/robusta_krr/core/integrations/prometheus/metrics/base.py
index d55af00..34957ef 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/base.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/base.py
@@ -74,7 +74,8 @@ class PrometheusMetric(BaseMetric):
if self.pods_batch_size is not None and self.pods_batch_size <= 0:
raise ValueError("pods_batch_size must be positive")
- def get_prometheus_cluster_label(self) -> str:
+ @staticmethod
+ def get_prometheus_cluster_label() -> str:
"""
Generates the cluster label for querying a centralized Prometheus
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
index 5347422..50b86a6 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
@@ -112,10 +112,14 @@ class PrometheusMetricsService(MetricsService):
async def query(self, query: str) -> dict:
loop = asyncio.get_running_loop()
- return await loop.run_in_executor(
- self.executor,
- lambda: self.prometheus.safe_custom_query(query=query)["result"],
- )
+ try:
+ return await loop.run_in_executor(
+ self.executor,
+ lambda: self.prometheus.safe_custom_query(query=query)["result"],
+ )
+ except PrometheusApiClientException as e:
+ logger.error(f"Error while querying Prometheus: {query}")
+ raise e
async def query_range(self, query: str, start: datetime, end: datetime, step: timedelta) -> dict:
loop = asyncio.get_running_loop()
diff --git a/robusta_krr/core/models/objects.py b/robusta_krr/core/models/objects.py
index 5dac123..27f1c29 100644
--- a/robusta_krr/core/models/objects.py
+++ b/robusta_krr/core/models/objects.py
@@ -52,6 +52,9 @@ class K8sWorkload(pd.BaseModel):
def __str__(self) -> str:
return f"{self.kind} {self.namespace}/{self.name}/{self.container}"
+ def __repr__(self) -> str:
+ return f"<K8sWorkload {self}>"
+
def __hash__(self) -> int:
return hash(str(self))
diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py
index 528fd89..bb109df 100644
--- a/robusta_krr/core/runner.py
+++ b/robusta_krr/core/runner.py
@@ -6,13 +6,13 @@ import warnings
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Union
from datetime import timedelta
-from prometrix import PrometheusNotFound
from rich.console import Console
from slack_sdk import WebClient
from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult
-from robusta_krr.core.integrations.kubernetes import ClusterConnector, KubeAPIWorkloadLoader
-from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException, PrometheusMetricsLoader
+from robusta_krr.core.integrations.kubernetes.workload_loader import IListPodsFallback
+from robusta_krr.core.integrations.kubernetes import ClusterConnector
+from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException
from robusta_krr.core.models.config import settings
from robusta_krr.core.models.objects import K8sWorkload
from robusta_krr.core.models.result import ResourceAllocations, ResourceScan, ResourceType, Result, StrategyData
@@ -144,14 +144,15 @@ class Runner:
}
async def _calculate_object_recommendations(self, object: K8sWorkload) -> Optional[RunResult]:
- prometheus_loader = self._get_prometheus_loader(object.cluster)
+ prometheus_loader = self.connector.get_prometheus(object.cluster)
if prometheus_loader is None:
return None
object.pods = await prometheus_loader.load_pods(object, self.strategy.settings.history_timedelta)
- if object.pods == [] and isinstance(self.connector, KubeAPIWorkloadLoader):
- # Fallback to Kubernetes API
+ if object.pods == [] and isinstance(self.connector, IListPodsFallback):
+ # Fallback to IListPodsFallback if Prometheus did not return any pods
+ # IListPodsFallback is implemented by KubeAPIWorkloadLoader (the Kubernetes API workload loader)
object.pods = await self.connector.load_pods(object)
# NOTE: Kubernetes API returned pods, but Prometheus did not
@@ -179,7 +180,7 @@ class Runner:
return self._format_result(result)
async def _check_data_availability(self, cluster: Optional[str]) -> None:
- prometheus_loader = self.connector.get_prometheus_loader(cluster)
+ prometheus_loader = self.connector.get_prometheus(cluster)
if prometheus_loader is None:
return