summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLeaveMyYard <33721692+LeaveMyYard@users.noreply.github.com>2023-07-25 11:57:30 +0300
committerLeaveMyYard <33721692+LeaveMyYard@users.noreply.github.com>2023-07-25 11:57:30 +0300
commitb21f25242016957fac7f4a1fbd917846c4e324ab (patch)
tree33a93c72d9faa141205e1f0ef0661bf81721484d
parentf87550128ddb48a8c2ff083f182784279fc12118 (diff)
Remove k8s API pod loading
-rw-r--r--robusta_krr/core/integrations/kubernetes.py131
-rw-r--r--robusta_krr/core/integrations/prometheus/loader.py2
-rw-r--r--robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py13
3 files changed, 64 insertions, 82 deletions
diff --git a/robusta_krr/core/integrations/kubernetes.py b/robusta_krr/core/integrations/kubernetes.py
index 4d1a5a3..d8f0978 100644
--- a/robusta_krr/core/integrations/kubernetes.py
+++ b/robusta_krr/core/integrations/kubernetes.py
@@ -63,29 +63,31 @@ class ClusterLoader(Configurable):
self.info(f"Listing scannable objects in {self.cluster}")
self.debug(f"Namespaces: {self.config.namespaces}")
- try:
- self.__hpa_list = await self.__list_hpa()
- tasks = [
- self._list_deployments(),
- self._list_rollouts(),
- self._list_all_statefulsets(),
- self._list_all_daemon_set(),
- self._list_all_jobs(),
- ]
-
- for fut in asyncio.as_completed(tasks):
+ self.__hpa_list = await self._try_list_hpa()
+
+ tasks = [
+ self._list_deployments(),
+ self._list_rollouts(),
+ self._list_all_statefulsets(),
+ self._list_all_daemon_set(),
+ self._list_all_jobs(),
+ ]
+
+ for fut in asyncio.as_completed(tasks):
+ try:
object_list = await fut
- for object in object_list:
- if self.config.namespaces == "*" and object.namespace == "kube-system":
- continue
- elif self.config.namespaces != "*" and object.namespace not in self.config.namespaces:
- continue
- yield object
-
- except Exception as e:
- self.error(f"Error trying to list pods in cluster {self.cluster}: {e}")
- self.debug_exception()
- return
+ except Exception as e:
+ self.error(f"Error {e.__class__.__name__} listing objects in cluster {self.cluster}: {e}")
+ self.debug_exception()
+ self.error("Will skip this object type and continue.")
+ continue
+
+ for object in object_list:
+ if self.config.namespaces == "*" and object.namespace == "kube-system":
+ continue
+ elif self.config.namespaces != "*" and object.namespace not in self.config.namespaces:
+ continue
+ yield object
@staticmethod
def _get_match_expression_filter(expression) -> str:
@@ -108,22 +110,12 @@ class ClusterLoader(Configurable):
return ",".join(label_filters)
- async def __list_pods(self, resource: Union[V1Deployment, V1DaemonSet, V1StatefulSet]) -> list[PodData]:
- selector = self._build_selector_query(resource.spec.selector)
- if selector is None:
- return []
-
- loop = asyncio.get_running_loop()
- ret: V1PodList = await loop.run_in_executor(
- self.executor,
- lambda: self.core.list_namespaced_pod(namespace=resource.metadata.namespace, label_selector=selector),
- )
- return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items]
-
- async def __build_obj(self, item: AnyKubernetesAPIObject, container: V1Container) -> K8sObjectData:
+ def __build_obj(
+ self, item: AnyKubernetesAPIObject, container: V1Container, kind: Optional[str] = None
+ ) -> K8sObjectData:
name = item.metadata.name
namespace = item.metadata.namespace
- kind = item.__class__.__name__[2:]
+ kind = kind or item.__class__.__name__[2:]
return K8sObjectData(
cluster=self.cluster,
@@ -132,7 +124,6 @@ class ClusterLoader(Configurable):
kind=kind,
container=container.name,
allocations=ResourceAllocations.from_container(container),
- pods=await self.__list_pods(item),
hpa=self.__hpa_list.get((namespace, kind, name)),
)
@@ -148,13 +139,9 @@ class ClusterLoader(Configurable):
)
self.debug(f"Found {len(ret.items)} deployments in {self.cluster}")
- return await asyncio.gather(
- *[
- self.__build_obj(item, container)
- for item in ret.items
- for container in item.spec.template.spec.containers
- ]
- )
+ return [
+ self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
+ ]
async def _list_rollouts(self) -> list[K8sObjectData]:
self.debug(f"Listing ArgoCD rollouts in {self.cluster}")
@@ -175,13 +162,9 @@ class ClusterLoader(Configurable):
self.debug(f"Found {len(ret.items)} rollouts in {self.cluster}")
- return await asyncio.gather(
- *[
- self.__build_obj(item, container)
- for item in ret.items
- for container in item.spec.template.spec.containers
- ]
- )
+ return [
+ self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
+ ]
async def _list_all_statefulsets(self) -> list[K8sObjectData]:
self.debug(f"Listing statefulsets in {self.cluster}")
@@ -195,13 +178,9 @@ class ClusterLoader(Configurable):
)
self.debug(f"Found {len(ret.items)} statefulsets in {self.cluster}")
- return await asyncio.gather(
- *[
- self.__build_obj(item, container)
- for item in ret.items
- for container in item.spec.template.spec.containers
- ]
- )
+ return [
+ self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
+ ]
async def _list_all_daemon_set(self) -> list[K8sObjectData]:
self.debug(f"Listing daemonsets in {self.cluster}")
@@ -215,13 +194,9 @@ class ClusterLoader(Configurable):
)
self.debug(f"Found {len(ret.items)} daemonsets in {self.cluster}")
- return await asyncio.gather(
- *[
- self.__build_obj(item, container)
- for item in ret.items
- for container in item.spec.template.spec.containers
- ]
- )
+ return [
+ self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
+ ]
async def _list_all_jobs(self) -> list[K8sObjectData]:
self.debug(f"Listing jobs in {self.cluster}")
@@ -235,13 +210,9 @@ class ClusterLoader(Configurable):
)
self.debug(f"Found {len(ret.items)} jobs in {self.cluster}")
- return await asyncio.gather(
- *[
- self.__build_obj(item, container)
- for item in ret.items
- for container in item.spec.template.spec.containers
- ]
- )
+ return [
+ self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
+ ]
async def _list_pods(self) -> list[K8sObjectData]:
"""For future use, not supported yet."""
@@ -257,9 +228,7 @@ class ClusterLoader(Configurable):
)
self.debug(f"Found {len(ret.items)} pods in {self.cluster}")
- return await asyncio.gather(
- *[self.__build_obj(item, container) for item in ret.items for container in item.spec.containers]
- )
+ return [self.__build_obj(item, container) for item in ret.items for container in item.spec.containers]
async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]:
loop = asyncio.get_running_loop()
@@ -337,6 +306,18 @@ class ClusterLoader(Configurable):
# If V2 API does not exist, fall back to V1
return await self.__list_hpa_v1()
+ async def _try_list_hpa(self) -> dict[HPAKey, HPAData]:
+ try:
+ return await self.__list_hpa()
+ except Exception as e:
+ self.error(f"Error trying to list hpa in cluster {self.cluster}: {e}")
+ self.debug_exception()
+ self.error(
+ "Will assume that there are no HPA. "
+ "Be careful as this may lead to inaccurate results if object actually has HPA."
+ )
+ return {}
+
class KubernetesLoader(Configurable):
async def list_clusters(self) -> Optional[list[str]]:
diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py
index 0daf3a6..703b96f 100644
--- a/robusta_krr/core/integrations/prometheus/loader.py
+++ b/robusta_krr/core/integrations/prometheus/loader.py
@@ -97,7 +97,7 @@ class PrometheusMetricsLoader(Configurable):
ResourceHistoryData: The gathered resource history data.
"""
- await self.loader.add_historic_pods(object, period)
+ await self.loader.load_pods(object, period)
return {
MetricLoader: await self.loader.gather_data(object, MetricLoader, period, step)
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
index 916f48b..bbf92ba 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
@@ -158,9 +158,9 @@ class PrometheusMetricsService(MetricsService):
metric_loader = LoaderClass(self.config, self.prometheus, self.name, self.executor)
return await metric_loader.load_data(object, period, step)
- async def add_historic_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None:
+ async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None:
"""
- Finds pods that have been deleted but still have some metrics in Prometheus.
+ List pods related to the object and add them to the object's pods list.
Args:
object (K8sObjectData): The Kubernetes object.
period (datetime.timedelta): The time period for which to gather data.
@@ -200,10 +200,11 @@ class PrometheusMetricsService(MetricsService):
f"[{period_literal}]"
)
- current_pods = {p.name for p in object.pods}
+ if related_pods == []:
+ self.debug(f"No pods found for {object}")
+ return
+ last_timestamp: float = max([pod["values"][-1][0] for pod in related_pods])
object.pods += [
- PodData(name=pod["metric"]["pod"], deleted=True)
- for pod in related_pods
- if pod["metric"]["pod"] not in current_pods
+ PodData(name=pod["metric"]["pod"], deleted=pod["values"][-1][0] != last_timestamp) for pod in related_pods
]