summaryrefslogtreecommitdiff
path: root/robusta_krr/core
diff options
context:
space:
mode:
authorLeaveMyYard <zhukovpavel2001@gmail.com>2024-03-14 13:06:14 +0200
committerLeaveMyYard <zhukovpavel2001@gmail.com>2024-03-14 13:06:14 +0200
commit4c8e727205221f4d38a638b3ed51dea568b26309 (patch)
tree7cae5b7cc635d58a7db7489d670fb8ab0286a03c /robusta_krr/core
parent50147bd63ea57246d6b2653a849fb16da26f6339 (diff)
Remove aiostream
Diffstat (limited to 'robusta_krr/core')
-rw-r--r--robusta_krr/core/integrations/kubernetes/__init__.py42
1 files changed, 18 insertions, 24 deletions
diff --git a/robusta_krr/core/integrations/kubernetes/__init__.py b/robusta_krr/core/integrations/kubernetes/__init__.py
index aad7f95..2c42d22 100644
--- a/robusta_krr/core/integrations/kubernetes/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/__init__.py
@@ -1,9 +1,8 @@
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
-from typing import AsyncGenerator, AsyncIterator, Callable, Optional, Union
+from typing import AsyncGenerator, AsyncIterable, Callable, Optional, Union
-import aiostream
from kubernetes import client, config # type: ignore
from kubernetes.client import ApiException
from kubernetes.client.models import (
@@ -23,6 +22,7 @@ from kubernetes.client.models import (
from robusta_krr.core.models.config import settings
from robusta_krr.core.models.objects import HPAData, K8sObjectData, KindLiteral, PodData
from robusta_krr.core.models.result import ResourceAllocations
+from robusta_krr.utils.async_gen_merge import async_gen_merge
from . import config_patch as _
from .rollout import RolloutAppsV1Api
@@ -67,20 +67,17 @@ class ClusterLoader:
# https://stackoverflow.com/questions/55299564/join-multiple-async-generators-in-python
# This will merge all the streams from all the cluster loaders into a single stream
- objects_combined = aiostream.stream.merge(
+ async for object in async_gen_merge(
self._list_deployments(),
self._list_rollouts(),
self._list_all_statefulsets(),
self._list_all_daemon_set(),
self._list_all_jobs(),
- )
-
- async with objects_combined.stream() as streamer:
- async for object in streamer:
- # NOTE: By default we will filter out kube-system namespace
- if settings.namespaces == "*" and object.namespace == "kube-system":
- continue
- yield object
+ ):
+ # NOTE: By default we will filter out kube-system namespace
+ if settings.namespaces == "*" and object.namespace == "kube-system":
+ continue
+ yield object
async def list_pods(self, object: K8sObjectData) -> list[PodData]:
selector = self._build_selector_query(object._api_resource.spec.selector)
@@ -143,7 +140,7 @@ class ClusterLoader:
async def _list_workflows(
self, kind: KindLiteral, all_namespaces_request: Callable, namespaced_request: Callable
- ) -> AsyncIterator[K8sObjectData]:
+ ) -> AsyncIterable[K8sObjectData]:
if not self._should_list_resource(kind):
logger.debug(f"Skipping {kind}s in {self.cluster}")
return
@@ -198,35 +195,35 @@ class ClusterLoader:
logger.exception(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}")
logger.error("Will skip this object type and continue.")
- def _list_deployments(self) -> AsyncIterator[K8sObjectData]:
+ def _list_deployments(self) -> AsyncIterable[K8sObjectData]:
return self._list_workflows(
kind="Deployment",
all_namespaces_request=self.apps.list_deployment_for_all_namespaces,
namespaced_request=self.apps.list_namespaced_deployment,
)
- def _list_rollouts(self) -> AsyncIterator[K8sObjectData]:
+ def _list_rollouts(self) -> AsyncIterable[K8sObjectData]:
return self._list_workflows(
kind="Rollout",
all_namespaces_request=self.rollout.list_rollout_for_all_namespaces,
namespaced_request=self.rollout.list_namespaced_rollout,
)
- def _list_all_statefulsets(self) -> AsyncIterator[K8sObjectData]:
+ def _list_all_statefulsets(self) -> AsyncIterable[K8sObjectData]:
return self._list_workflows(
kind="StatefulSet",
all_namespaces_request=self.apps.list_stateful_set_for_all_namespaces,
namespaced_request=self.apps.list_namespaced_stateful_set,
)
- def _list_all_daemon_set(self) -> AsyncIterator[K8sObjectData]:
+ def _list_all_daemon_set(self) -> AsyncIterable[K8sObjectData]:
return self._list_workflows(
kind="DaemonSet",
all_namespaces_request=self.apps.list_daemon_set_for_all_namespaces,
namespaced_request=self.apps.list_namespaced_daemon_set,
)
- def _list_all_jobs(self) -> AsyncIterator[K8sObjectData]:
+ def _list_all_jobs(self) -> AsyncIterable[K8sObjectData]:
return self._list_workflows(
kind="Job",
all_namespaces_request=self.batch.list_job_for_all_namespaces,
@@ -372,7 +369,7 @@ class KubernetesLoader:
logger.error(f"Could not load cluster {cluster} and will skip it: {e}")
return None
- async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncIterator[K8sObjectData]:
+ async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncIterable[K8sObjectData]:
"""List all scannable objects.
Yields:
@@ -390,13 +387,10 @@ class KubernetesLoader:
# https://stackoverflow.com/questions/55299564/join-multiple-async-generators-in-python
# This will merge all the streams from all the cluster loaders into a single stream
- objects_combined = aiostream.stream.merge(
+ async for object in async_gen_merge(
*[cluster_loader.list_scannable_objects() for cluster_loader in self.cluster_loaders.values()]
- )
-
- async with objects_combined.stream() as streamer:
- async for object in streamer:
- yield object
+ ):
+ yield object
async def load_pods(self, object: K8sObjectData) -> list[PodData]:
try: