diff options
| author | LeaveMyYard <33721692+LeaveMyYard@users.noreply.github.com> | 2023-07-25 11:57:30 +0300 |
|---|---|---|
| committer | LeaveMyYard <33721692+LeaveMyYard@users.noreply.github.com> | 2023-07-25 11:57:30 +0300 |
| commit | b21f25242016957fac7f4a1fbd917846c4e324ab (patch) | |
| tree | 33a93c72d9faa141205e1f0ef0661bf81721484d | |
| parent | f87550128ddb48a8c2ff083f182784279fc12118 (diff) | |
Remove k8s API pod loading
3 files changed, 64 insertions, 82 deletions
diff --git a/robusta_krr/core/integrations/kubernetes.py b/robusta_krr/core/integrations/kubernetes.py index 4d1a5a3..d8f0978 100644 --- a/robusta_krr/core/integrations/kubernetes.py +++ b/robusta_krr/core/integrations/kubernetes.py @@ -63,29 +63,31 @@ class ClusterLoader(Configurable): self.info(f"Listing scannable objects in {self.cluster}") self.debug(f"Namespaces: {self.config.namespaces}") - try: - self.__hpa_list = await self.__list_hpa() - tasks = [ - self._list_deployments(), - self._list_rollouts(), - self._list_all_statefulsets(), - self._list_all_daemon_set(), - self._list_all_jobs(), - ] - - for fut in asyncio.as_completed(tasks): + self.__hpa_list = await self._try_list_hpa() + + tasks = [ + self._list_deployments(), + self._list_rollouts(), + self._list_all_statefulsets(), + self._list_all_daemon_set(), + self._list_all_jobs(), + ] + + for fut in asyncio.as_completed(tasks): + try: object_list = await fut - for object in object_list: - if self.config.namespaces == "*" and object.namespace == "kube-system": - continue - elif self.config.namespaces != "*" and object.namespace not in self.config.namespaces: - continue - yield object - - except Exception as e: - self.error(f"Error trying to list pods in cluster {self.cluster}: {e}") - self.debug_exception() - return + except Exception as e: + self.error(f"Error {e.__class__.__name__} listing objects in cluster {self.cluster}: {e}") + self.debug_exception() + self.error("Will skip this object type and continue.") + continue + + for object in object_list: + if self.config.namespaces == "*" and object.namespace == "kube-system": + continue + elif self.config.namespaces != "*" and object.namespace not in self.config.namespaces: + continue + yield object @staticmethod def _get_match_expression_filter(expression) -> str: @@ -108,22 +110,12 @@ class ClusterLoader(Configurable): return ",".join(label_filters) - async def __list_pods(self, resource: Union[V1Deployment, V1DaemonSet, V1StatefulSet]) -> list[PodData]: - selector = self._build_selector_query(resource.spec.selector) - if selector is None: - return [] - - loop = asyncio.get_running_loop() - ret: V1PodList = await loop.run_in_executor( - self.executor, - lambda: self.core.list_namespaced_pod(namespace=resource.metadata.namespace, label_selector=selector), - ) - return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items] - - async def __build_obj(self, item: AnyKubernetesAPIObject, container: V1Container) -> K8sObjectData: + def __build_obj( + self, item: AnyKubernetesAPIObject, container: V1Container, kind: Optional[str] = None + ) -> K8sObjectData: name = item.metadata.name namespace = item.metadata.namespace - kind = item.__class__.__name__[2:] + kind = kind or item.__class__.__name__[2:] return K8sObjectData( cluster=self.cluster, @@ -132,7 +124,6 @@ class ClusterLoader(Configurable): kind=kind, container=container.name, allocations=ResourceAllocations.from_container(container), - pods=await self.__list_pods(item), hpa=self.__hpa_list.get((namespace, kind, name)), ) @@ -148,13 +139,9 @@ class ClusterLoader(Configurable): ) self.debug(f"Found {len(ret.items)} deployments in {self.cluster}") - return await asyncio.gather( - *[ - self.__build_obj(item, container) - for item in ret.items - for container in item.spec.template.spec.containers - ] - ) + return [ + self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers + ] async def _list_rollouts(self) -> list[K8sObjectData]: self.debug(f"Listing ArgoCD rollouts in {self.cluster}") @@ -175,13 +162,9 @@ class ClusterLoader(Configurable): self.debug(f"Found {len(ret.items)} rollouts in {self.cluster}") - return await asyncio.gather( - *[ - self.__build_obj(item, container) - for item in ret.items - for container in item.spec.template.spec.containers - ] - ) + return [ + self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers + ] async def _list_all_statefulsets(self) -> list[K8sObjectData]: self.debug(f"Listing statefulsets in {self.cluster}") @@ -195,13 +178,9 @@ class ClusterLoader(Configurable): ) self.debug(f"Found {len(ret.items)} statefulsets in {self.cluster}") - return await asyncio.gather( - *[ - self.__build_obj(item, container) - for item in ret.items - for container in item.spec.template.spec.containers - ] - ) + return [ + self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers + ] async def _list_all_daemon_set(self) -> list[K8sObjectData]: self.debug(f"Listing daemonsets in {self.cluster}") @@ -215,13 +194,9 @@ class ClusterLoader(Configurable): ) self.debug(f"Found {len(ret.items)} daemonsets in {self.cluster}") - return await asyncio.gather( - *[ - self.__build_obj(item, container) - for item in ret.items - for container in item.spec.template.spec.containers - ] - ) + return [ + self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers + ] async def _list_all_jobs(self) -> list[K8sObjectData]: self.debug(f"Listing jobs in {self.cluster}") @@ -235,13 +210,9 @@ class ClusterLoader(Configurable): ) self.debug(f"Found {len(ret.items)} jobs in {self.cluster}") - return await asyncio.gather( - *[ - self.__build_obj(item, container) - for item in ret.items - for container in item.spec.template.spec.containers - ] - ) + return [ + self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers + ] async def _list_pods(self) -> list[K8sObjectData]: """For future use, not supported yet.""" @@ -257,9 +228,7 @@ class ClusterLoader(Configurable): ) self.debug(f"Found {len(ret.items)} pods in {self.cluster}") - return await asyncio.gather( - *[self.__build_obj(item, container) for item in ret.items for container in item.spec.containers] - ) + return [self.__build_obj(item, container) for item in ret.items for container in item.spec.containers] async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]: loop = asyncio.get_running_loop() @@ -337,6 +306,18 @@ class ClusterLoader(Configurable): # If V2 API does not exist, fall back to V1 return await self.__list_hpa_v1() + async def _try_list_hpa(self) -> dict[HPAKey, HPAData]: + try: + return await self.__list_hpa() + except Exception as e: + self.error(f"Error trying to list hpa in cluster {self.cluster}: {e}") + self.debug_exception() + self.error( + "Will assume that there are no HPA. " + "Be careful as this may lead to inaccurate results if object actually has HPA." + ) + return {} + class KubernetesLoader(Configurable): async def list_clusters(self) -> Optional[list[str]]: diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py index 0daf3a6..703b96f 100644 --- a/robusta_krr/core/integrations/prometheus/loader.py +++ b/robusta_krr/core/integrations/prometheus/loader.py @@ -97,7 +97,7 @@ class PrometheusMetricsLoader(Configurable): ResourceHistoryData: The gathered resource history data. """ - await self.loader.add_historic_pods(object, period) + await self.loader.load_pods(object, period) return { MetricLoader: await self.loader.gather_data(object, MetricLoader, period, step) diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py index 916f48b..bbf92ba 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py @@ -158,9 +158,9 @@ class PrometheusMetricsService(MetricsService): metric_loader = LoaderClass(self.config, self.prometheus, self.name, self.executor) return await metric_loader.load_data(object, period, step) - async def add_historic_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None: + async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None: """ - Finds pods that have been deleted but still have some metrics in Prometheus. + List pods related to the object and add them to the object's pods list. Args: object (K8sObjectData): The Kubernetes object. period (datetime.timedelta): The time period for which to gather data. @@ -200,10 +200,11 @@ class PrometheusMetricsService(MetricsService): f"[{period_literal}]" ) - current_pods = {p.name for p in object.pods} + if related_pods == []: + self.debug(f"No pods found for {object}") + return + last_timestamp: float = max([pod["values"][-1][0] for pod in related_pods]) object.pods += [ - PodData(name=pod["metric"]["pod"], deleted=True) - for pod in related_pods - if pod["metric"]["pod"] not in current_pods + PodData(name=pod["metric"]["pod"], deleted=pod["values"][-1][0] != last_timestamp) for pod in related_pods ] |
