diff options
| author | LeaveMyYard <zhukovpavel2001@gmail.com> | 2024-03-14 13:06:14 +0200 |
|---|---|---|
| committer | LeaveMyYard <zhukovpavel2001@gmail.com> | 2024-03-14 13:06:14 +0200 |
| commit | 4c8e727205221f4d38a638b3ed51dea568b26309 (patch) | |
| tree | 7cae5b7cc635d58a7db7489d670fb8ab0286a03c /robusta_krr/core | |
| parent | 50147bd63ea57246d6b2653a849fb16da26f6339 (diff) | |
Remove aiostream
Diffstat (limited to 'robusta_krr/core')
| -rw-r--r-- | robusta_krr/core/integrations/kubernetes/__init__.py | 42 |
1 file changed, 18 insertions, 24 deletions
diff --git a/robusta_krr/core/integrations/kubernetes/__init__.py b/robusta_krr/core/integrations/kubernetes/__init__.py index aad7f95..2c42d22 100644 --- a/robusta_krr/core/integrations/kubernetes/__init__.py +++ b/robusta_krr/core/integrations/kubernetes/__init__.py @@ -1,9 +1,8 @@ import asyncio import logging from concurrent.futures import ThreadPoolExecutor -from typing import AsyncGenerator, AsyncIterator, Callable, Optional, Union +from typing import AsyncGenerator, AsyncIterable, Callable, Optional, Union -import aiostream from kubernetes import client, config # type: ignore from kubernetes.client import ApiException from kubernetes.client.models import ( @@ -23,6 +22,7 @@ from kubernetes.client.models import ( from robusta_krr.core.models.config import settings from robusta_krr.core.models.objects import HPAData, K8sObjectData, KindLiteral, PodData from robusta_krr.core.models.result import ResourceAllocations +from robusta_krr.utils.async_gen_merge import async_gen_merge from . 
import config_patch as _ from .rollout import RolloutAppsV1Api @@ -67,20 +67,17 @@ class ClusterLoader: # https://stackoverflow.com/questions/55299564/join-multiple-async-generators-in-python # This will merge all the streams from all the cluster loaders into a single stream - objects_combined = aiostream.stream.merge( + async for object in async_gen_merge( self._list_deployments(), self._list_rollouts(), self._list_all_statefulsets(), self._list_all_daemon_set(), self._list_all_jobs(), - ) - - async with objects_combined.stream() as streamer: - async for object in streamer: - # NOTE: By default we will filter out kube-system namespace - if settings.namespaces == "*" and object.namespace == "kube-system": - continue - yield object + ): + # NOTE: By default we will filter out kube-system namespace + if settings.namespaces == "*" and object.namespace == "kube-system": + continue + yield object async def list_pods(self, object: K8sObjectData) -> list[PodData]: selector = self._build_selector_query(object._api_resource.spec.selector) @@ -143,7 +140,7 @@ class ClusterLoader: async def _list_workflows( self, kind: KindLiteral, all_namespaces_request: Callable, namespaced_request: Callable - ) -> AsyncIterator[K8sObjectData]: + ) -> AsyncIterable[K8sObjectData]: if not self._should_list_resource(kind): logger.debug(f"Skipping {kind}s in {self.cluster}") return @@ -198,35 +195,35 @@ class ClusterLoader: logger.exception(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}") logger.error("Will skip this object type and continue.") - def _list_deployments(self) -> AsyncIterator[K8sObjectData]: + def _list_deployments(self) -> AsyncIterable[K8sObjectData]: return self._list_workflows( kind="Deployment", all_namespaces_request=self.apps.list_deployment_for_all_namespaces, namespaced_request=self.apps.list_namespaced_deployment, ) - def _list_rollouts(self) -> AsyncIterator[K8sObjectData]: + def _list_rollouts(self) -> AsyncIterable[K8sObjectData]: return 
self._list_workflows( kind="Rollout", all_namespaces_request=self.rollout.list_rollout_for_all_namespaces, namespaced_request=self.rollout.list_namespaced_rollout, ) - def _list_all_statefulsets(self) -> AsyncIterator[K8sObjectData]: + def _list_all_statefulsets(self) -> AsyncIterable[K8sObjectData]: return self._list_workflows( kind="StatefulSet", all_namespaces_request=self.apps.list_stateful_set_for_all_namespaces, namespaced_request=self.apps.list_namespaced_stateful_set, ) - def _list_all_daemon_set(self) -> AsyncIterator[K8sObjectData]: + def _list_all_daemon_set(self) -> AsyncIterable[K8sObjectData]: return self._list_workflows( kind="DaemonSet", all_namespaces_request=self.apps.list_daemon_set_for_all_namespaces, namespaced_request=self.apps.list_namespaced_daemon_set, ) - def _list_all_jobs(self) -> AsyncIterator[K8sObjectData]: + def _list_all_jobs(self) -> AsyncIterable[K8sObjectData]: return self._list_workflows( kind="Job", all_namespaces_request=self.batch.list_job_for_all_namespaces, @@ -372,7 +369,7 @@ class KubernetesLoader: logger.error(f"Could not load cluster {cluster} and will skip it: {e}") return None - async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncIterator[K8sObjectData]: + async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncIterable[K8sObjectData]: """List all scannable objects. Yields: @@ -390,13 +387,10 @@ class KubernetesLoader: # https://stackoverflow.com/questions/55299564/join-multiple-async-generators-in-python # This will merge all the streams from all the cluster loaders into a single stream - objects_combined = aiostream.stream.merge( + async for object in async_gen_merge( *[cluster_loader.list_scannable_objects() for cluster_loader in self.cluster_loaders.values()] - ) - - async with objects_combined.stream() as streamer: - async for object in streamer: - yield object + ): + yield object async def load_pods(self, object: K8sObjectData) -> list[PodData]: try: |
