From 4f7d312ae4ea470423fde890401facc62080ae2b Mon Sep 17 00:00:00 2001 From: LeaveMyYard Date: Fri, 4 Aug 2023 12:05:05 +0300 Subject: Fix rollouts missing behaviour --- robusta_krr/core/integrations/kubernetes.py | 48 ++++++++++++++--------------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/robusta_krr/core/integrations/kubernetes.py b/robusta_krr/core/integrations/kubernetes.py index d4d8ef2..26ac233 100644 --- a/robusta_krr/core/integrations/kubernetes.py +++ b/robusta_krr/core/integrations/kubernetes.py @@ -1,7 +1,6 @@ import asyncio from concurrent.futures import ThreadPoolExecutor -from typing import AsyncGenerator, Optional, Union, Callable, AsyncIterator -from functools import wraps +from typing import AsyncGenerator, Optional, Union, Callable, AsyncIterator, Literal import aiostream from kubernetes import client, config # type: ignore @@ -9,16 +8,11 @@ from kubernetes.client import ApiException from kubernetes.client.models import ( V1Container, V1DaemonSet, - V1DaemonSetList, V1Deployment, - V1DeploymentList, V1Job, - V1JobList, V1LabelSelector, V1Pod, - V1PodList, V1StatefulSet, - V1StatefulSetList, V1HorizontalPodAutoscalerList, V2HorizontalPodAutoscaler, V2HorizontalPodAutoscalerList, @@ -34,6 +28,8 @@ from .rollout import RolloutAppsV1Api AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job] HPAKey = tuple[str, str, str] +KindLiteral = Literal["deployment", "daemonset", "statefulset", "job", "rollout"] + class ClusterLoader(Configurable): def __init__(self, cluster: Optional[str], *args, **kwargs): @@ -54,6 +50,8 @@ class ClusterLoader(Configurable): self.autoscaling_v1 = client.AutoscalingV1Api(api_client=self.api_client) self.autoscaling_v2 = client.AutoscalingV2Api(api_client=self.api_client) + self.__rollouts_available = True + async def list_scannable_objects(self) -> AsyncGenerator[K8sObjectData, None]: """List all scannable objects. @@ -128,12 +126,15 @@ class ClusterLoader(Configurable): return resource.lower() in self.config.resources async def _list_workflows( - self, kind: str, all_namespaces_request: Callable, namespaced_request: Callable + self, kind: KindLiteral, all_namespaces_request: Callable, namespaced_request: Callable ) -> AsyncIterator[K8sObjectData]: if not self._should_list_resource(kind): self.debug(f"Skipping {kind}s in {self.cluster}") return + if kind == "rollout" and not self.__rollouts_available: + return + self.debug(f"Listing {kind}s in {self.cluster}") loop = asyncio.get_running_loop() @@ -173,9 +174,14 @@ class ClusterLoader(Configurable): self.debug(f"Found {total_items} {kind} in {self.cluster}") except ApiException as e: - self.error(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}") - self.debug_exception() - self.error("Will skip this object type and continue.") + if kind == "rollout" and e.status in [400, 401, 403, 404]: + if self.__rollouts_available: + self.debug(f"Rollout API not available in {self.cluster}") + self.__rollouts_available = False + else: + self.error(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}") + self.debug_exception() + self.error("Will skip this object type and continue.") def _list_deployments(self) -> AsyncIterator[K8sObjectData]: return self._list_workflows( @@ -184,20 +190,12 @@ class ClusterLoader(Configurable): namespaced_request=self.apps.list_namespaced_deployment, ) - async def _list_rollouts(self) -> AsyncIterator[K8sObjectData]: - # TODO: Mutlitple errors will throw here, we should catch them all - try: - async for rollout in self._list_workflows( - kind="rollout", - all_namespaces_request=self.rollout.list_rollout_for_all_namespaces, - namespaced_request=self.rollout.list_namespaced_rollout, - ): - yield rollout - except ApiException as e: - if e.status in [400, 401, 403, 404]: - self.debug(f"Rollout API not available in {self.cluster}") - else: - raise + def _list_rollouts(self) -> AsyncIterator[K8sObjectData]: + return self._list_workflows( + kind="rollout", + all_namespaces_request=self.rollout.list_rollout_for_all_namespaces, + namespaced_request=self.rollout.list_namespaced_rollout, + ) def _list_all_statefulsets(self) -> AsyncIterator[K8sObjectData]: return self._list_workflows( -- cgit v1.2.3