diff options
| author | Павел Жуков <33721692+LeaveMyYard@users.noreply.github.com> | 2023-03-20 20:21:42 +0200 |
|---|---|---|
| committer | Павел Жуков <33721692+LeaveMyYard@users.noreply.github.com> | 2023-03-20 20:21:42 +0200 |
| commit | 07a1bb1dc504dac31bfe17d68e72981c1c2a045a (patch) | |
| tree | c526198179e538ed8ec66ca5145916c7f3f2bda8 /robusta_krr | |
| parent | f3f0b6c5d515ff0cca06c96f4c7296eaf0343970 (diff) | |
Fix prometheus integration
Diffstat (limited to 'robusta_krr')
| -rw-r--r-- | robusta_krr/core/abstract/strategies.py | 2 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/kubernetes.py | 95 | ||||
| -rw-r--r-- | robusta_krr/core/integrations/prometheus.py | 53 | ||||
| -rw-r--r-- | robusta_krr/core/models/objects.py | 1 | ||||
| -rw-r--r-- | robusta_krr/core/runner.py | 24 | ||||
| -rw-r--r-- | robusta_krr/formatters/__init__.py | 1 | ||||
| -rw-r--r-- | robusta_krr/formatters/pprint.py | 23 | ||||
| -rw-r--r-- | robusta_krr/formatters/table.py | 10 | ||||
| -rw-r--r-- | robusta_krr/strategies/simple.py | 22 |
9 files changed, 172 insertions, 59 deletions
diff --git a/robusta_krr/core/abstract/strategies.py b/robusta_krr/core/abstract/strategies.py index 3cd7682..8d7fef3 100644 --- a/robusta_krr/core/abstract/strategies.py +++ b/robusta_krr/core/abstract/strategies.py @@ -26,7 +26,7 @@ class StrategySettings(pd.BaseModel): _StrategySettings = TypeVar("_StrategySettings", bound=StrategySettings) -HistoryData = dict[ResourceType, list[float]] +HistoryData = dict[ResourceType, dict[str, list[Decimal]]] RunResult = dict[ResourceType, ResourceRecommendation] diff --git a/robusta_krr/core/integrations/kubernetes.py b/robusta_krr/core/integrations/kubernetes.py index d027191..908b085 100644 --- a/robusta_krr/core/integrations/kubernetes.py +++ b/robusta_krr/core/integrations/kubernetes.py @@ -1,5 +1,6 @@ import asyncio import itertools +from typing import Literal from kubernetes import client, config from kubernetes.client.models import ( @@ -9,6 +10,7 @@ from kubernetes.client.models import ( V1Deployment, V1DeploymentList, V1JobList, + V1LabelSelector, V1PodList, V1StatefulSet, V1StatefulSetList, @@ -27,6 +29,7 @@ class ClusterLoader(Configurable): self.api_client = config.new_client_from_config(context=cluster) self.apps = client.AppsV1Api(api_client=self.api_client) self.batch = client.BatchV1Api(api_client=self.api_client) + self.core = client.CoreV1Api(api_client=self.api_client) async def list_scannable_objects(self) -> list[K8sObjectData]: """List all scannable objects. @@ -51,7 +54,40 @@ class ClusterLoader(Configurable): return list(itertools.chain(*objects_tuple)) - def __build_obj(self, item: V1Deployment | V1DaemonSet | V1StatefulSet, container: V1Container) -> K8sObjectData: + @staticmethod + def _get_match_expression_filter(expression) -> str: + if expression.operator.lower() == "exists": + return expression.key + elif expression.operator.lower() == "doesnotexist": + return f"!{expression.key}" + + values = ",".join(expression.values) + return f"{expression.key} {expression.operator} ({values})" + + @staticmethod + def _build_selector_query(selector: V1LabelSelector) -> str | None: + label_filters = [f"{label[0]}={label[1]}" for label in selector.match_labels.items()] + + if selector.match_expressions is not None: + label_filters.extend( + [ClusterLoader._get_match_expression_filter(expression) for expression in selector.match_expressions] + ) + + return ",".join(label_filters) + + async def __list_pods(self, resource: V1Deployment | V1DaemonSet | V1StatefulSet) -> list[str]: + selector = self._build_selector_query(resource.spec.selector) + if selector is None: + return [] + + ret: V1PodList = await asyncio.to_thread( + self.core.list_namespaced_pod, namespace=resource.metadata.namespace, label_selector=selector + ) + return [pod.metadata.name for pod in ret.items] + + async def __build_obj( + self, item: V1Deployment | V1DaemonSet | V1StatefulSet, container: V1Container + ) -> K8sObjectData: return K8sObjectData( cluster=self.cluster, namespace=item.metadata.namespace, @@ -59,42 +95,61 @@ class ClusterLoader(Configurable): kind=item.__class__.__name__[2:], container=container.name, allocations=ResourceAllocations.from_container(container), + pods=await self.__list_pods(item), ) async def _list_deployments(self) -> list[K8sObjectData]: ret: V1DeploymentList = await asyncio.to_thread(self.apps.list_deployment_for_all_namespaces, watch=False) - return [ - self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers - ] + return await asyncio.gather( + *[ + self.__build_obj(item, container) + for item in ret.items + for container in item.spec.template.spec.containers + ] + ) async def _list_all_statefulsets(self) -> list[K8sObjectData]: ret: V1StatefulSetList = await asyncio.to_thread(self.apps.list_stateful_set_for_all_namespaces, watch=False) - return [ - self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers - ] + return await asyncio.gather( + *[ + self.__build_obj(item, container) + for item in ret.items + for container in item.spec.template.spec.containers + ] + ) async def _list_all_daemon_set(self) -> list[K8sObjectData]: ret: V1DaemonSetList = await asyncio.to_thread(self.apps.list_daemon_set_for_all_namespaces, watch=False) - return [ - self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers - ] + return await asyncio.gather( + *[ + self.__build_obj(item, container) + for item in ret.items + for container in item.spec.template.spec.containers + ] + ) async def _list_all_jobs(self) -> list[K8sObjectData]: ret: V1JobList = await asyncio.to_thread(self.batch.list_job_for_all_namespaces, watch=False) - return [ - self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers - ] + return await asyncio.gather( + *[ + self.__build_obj(item, container) + for item in ret.items + for container in item.spec.template.spec.containers + ] + ) async def _list_pods(self) -> list[K8sObjectData]: """For future use, not supported yet.""" ret: V1PodList = await asyncio.to_thread(self.apps.list_pod_for_all_namespaces, watch=False) - return [self.__build_obj(item, container) for item in ret.items for container in item.spec.containers] + return await asyncio.gather( + *[self.__build_obj(item, container) for item in ret.items for container in item.spec.containers] + ) class KubernetesLoader(Configurable): @@ -102,16 +157,22 @@ class KubernetesLoader(Configurable): super().__init__(*args, **kwargs) config.load_kube_config() - async def list_clusters(self) -> list[str]: + async def list_clusters(self, filter: list[str] | Literal["*"] | None = None) -> list[str]: """List all clusters. Returns: A list of clusters. """ - contexts, _ = await asyncio.to_thread(config.list_kube_config_contexts) + contexts, current_context = await asyncio.to_thread(config.list_kube_config_contexts) + + if filter is None: + return [current_context["name"]] + + if filter == "*": + return [context["name"] for context in contexts] - return [context["name"] for context in contexts] + return [context["name"] for context in contexts if context["name"] in filter] async def list_scannable_objects(self, clusters: list[str]) -> list[K8sObjectData]: """List all scannable objects. diff --git a/robusta_krr/core/integrations/prometheus.py b/robusta_krr/core/integrations/prometheus.py index 1549e59..bb76c1f 100644 --- a/robusta_krr/core/integrations/prometheus.py +++ b/robusta_krr/core/integrations/prometheus.py @@ -1,5 +1,6 @@ import asyncio import datetime +from decimal import Decimal from kubernetes import config as k8s_config from kubernetes.client import ApiClient @@ -92,35 +93,39 @@ class PrometheusLoader(Configurable): period: datetime.timedelta, *, timeframe: datetime.timedelta = datetime.timedelta(minutes=30), - ) -> list[int]: - # TODO: Queries do not work properly + ) -> dict[str, list[Decimal]]: self.debug(f"Gathering data for {object} and {resource}") if resource == ResourceType.CPU: - return await asyncio.to_thread( - self.prometheus.custom_query_range, - query='sum(node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{namespace="$namespace", pod=~"$pod", container=~"$container"})', - start_time=datetime.datetime.now() - period, - end_time=datetime.datetime.now(), - step="30m", - params={ - "namespace": object.namespace, - "pod": object.name, - "container": object.container, - }, + result = await asyncio.gather( + *[ + asyncio.to_thread( + self.prometheus.custom_query_range, + query=f'sum(node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{{namespace="{object.namespace}", pod="{pod}", container="{object.container}"}})', + start_time=datetime.datetime.now() - period, + end_time=datetime.datetime.now(), + step=f"{int(timeframe.total_seconds()) // 60}m", + ) + for pod in object.pods + ] ) elif resource == ResourceType.Memory: - return await asyncio.to_thread( - self.prometheus.custom_query_range, - query='sum(container_memory_working_set_bytes{job="kubelet", metrics_path="/metrics/cadvisor", pod=~"$pod", container=~"$container", image!=""})', - start_time=datetime.datetime.now() - period, - end_time=datetime.datetime.now(), - step="30m", - params={ - # "namespace": object.namespace, - "pod": object.name, - "container": object.container, - }, + result = await asyncio.gather( + *[ + asyncio.to_thread( + self.prometheus.custom_query_range, + query=f'sum(container_memory_working_set_bytes{{job="kubelet", metrics_path="/metrics/cadvisor", image!="", namespace="{object.namespace}", pod="{pod}", container="{object.container}"}})', + start_time=datetime.datetime.now() - period, + end_time=datetime.datetime.now(), + step=f"{int(timeframe.total_seconds()) // 60}m", + ) + for pod in object.pods + ] ) else: raise ValueError(f"Unknown resource type: {resource}") + + if result == []: + return {pod: [] for pod in object.pods} + + return {pod: [Decimal(value) for _, value in result[i][0]["values"]] for i, pod in enumerate(object.pods)} diff --git a/robusta_krr/core/models/objects.py b/robusta_krr/core/models/objects.py index 47fa47f..328e9c8 100644 --- a/robusta_krr/core/models/objects.py +++ b/robusta_krr/core/models/objects.py @@ -7,6 +7,7 @@ class K8sObjectData(pd.BaseModel): cluster: str name: str container: str + pods: list[str] namespace: str kind: str | None allocations: ResourceAllocations diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index 484f5d8..b884a05 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -1,6 +1,8 @@ import asyncio +import math +from decimal import Decimal -from robusta_krr.core.abstract.strategies import RunResult +from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult from robusta_krr.core.integrations.kubernetes import KubernetesLoader from robusta_krr.core.integrations.prometheus import PrometheusLoader from robusta_krr.core.models.config import Config @@ -43,6 +45,23 @@ class Runner(Configurable): self.echo("\n", no_prefix=True) self.console.print(formatted) + @staticmethod + def _round_value(value: Decimal | None) -> Decimal | None: + if value is None or value.is_nan(): + return None + + return Decimal(math.ceil(value * 1000)) / 1000 + + @staticmethod + def _format_result(result: RunResult) -> RunResult: + return { + resource: ResourceRecommendation( + request=Runner._round_value(recommendation.request), + limit=Runner._round_value(recommendation.limit), + ) + for resource, recommendation in result.items() + } + async def _calculate_object_recommendations(self, object: K8sObjectData) -> RunResult: prometheus_loader = self._get_prometheus_loader(object.cluster) @@ -60,7 +79,8 @@ class Runner(Configurable): # NOTE: We run this in a threadpool as the strategy calculation might be CPU intensive # But keep in mind that numpy calcluations will not block the GIL - return await asyncio.to_thread(self._strategy.run, data, object) + result = await asyncio.to_thread(self._strategy.run, data, object) + return self._format_result(result) async def _gather_objects_recommendations(self, objects: list[K8sObjectData]) -> list[ResourceAllocations]: recommendations: list[RunResult] = await asyncio.gather( diff --git a/robusta_krr/formatters/__init__.py b/robusta_krr/formatters/__init__.py index cdb12bb..0fc1c80 100644 --- a/robusta_krr/formatters/__init__.py +++ b/robusta_krr/formatters/__init__.py @@ -1,3 +1,4 @@ from .json import JSONFormatter +from .pprint import PPrintFormatter from .table import TableFormatter from .yaml import YAMLFormatter diff --git a/robusta_krr/formatters/pprint.py b/robusta_krr/formatters/pprint.py new file mode 100644 index 0000000..bdfcc4c --- /dev/null +++ b/robusta_krr/formatters/pprint.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +from pprint import pformat + +from robusta_krr.core.abstract.formatters import BaseFormatter +from robusta_krr.core.models.result import Result + + +class PPrintFormatter(BaseFormatter): + """Formatter for object output with python's pprint module.""" + + __display_name__ = "pprint" + + def format(self, result: Result) -> str: + """Format the result using pprint.pformat(...) + + :param result: The results to format. + :type result: :class:`core.result.Result` + :returns: The formatted results. + :rtype: str + """ + + return pformat(result.dict()) diff --git a/robusta_krr/formatters/table.py b/robusta_krr/formatters/table.py index 9eda49e..327d66a 100644 --- a/robusta_krr/formatters/table.py +++ b/robusta_krr/formatters/table.py @@ -12,6 +12,7 @@ from robusta_krr.utils import resource_units NONE_LITERAL = "none" NAN_LITERAL = "?" PRESCISION = 4 +ALLOWED_DIFFERENCE = 0.05 class TableFormatter(BaseFormatter): @@ -28,10 +29,13 @@ class TableFormatter(BaseFormatter): return resource_units.format(value, prescision=prescision) def _format_request_str(self, item: ResourceScan, resource: ResourceType, selector: str) -> str: + allocated = getattr(item.object.allocations, selector)[resource] + recommended = getattr(item.recommended, selector)[resource] + return ( - self._format_united_decimal(getattr(item.object.allocations, selector)[resource]) + self._format_united_decimal(allocated) + " -> " - + self._format_united_decimal(getattr(item.recommended, selector)[resource], prescision=PRESCISION) + + self._format_united_decimal(recommended, prescision=PRESCISION) ) def format(self, result: Result) -> Table: @@ -49,6 +53,7 @@ class TableFormatter(BaseFormatter): table.add_column("Cluster", style="cyan") table.add_column("Namespace", style="cyan") table.add_column("Name", style="cyan") + table.add_column("Pods", style="cyan") table.add_column("Type", style="cyan") table.add_column("Container", style="cyan") for resource in ResourceType: @@ -69,6 +74,7 @@ class TableFormatter(BaseFormatter): item.object.cluster if full_info_row else "", item.object.namespace if full_info_row else "", item.object.name if full_info_row else "", + str(len(item.object.pods)) if full_info_row else "", item.object.kind if full_info_row else "", item.object.container, *[ diff --git a/robusta_krr/strategies/simple.py b/robusta_krr/strategies/simple.py index 63fafef..914db12 100644 --- a/robusta_krr/strategies/simple.py +++ b/robusta_krr/strategies/simple.py @@ -30,19 +30,15 @@ class SimpleStrategy(BaseStrategy[SimpleStrategySettings]): memory_usage = self._calculate_percentile(history_data[ResourceType.Memory], self.settings.request_percentile) return { - ResourceType.CPU: ResourceRecommendation( - request=Decimal(cpu_usage) / 1000 if cpu_usage is not None else None, - limit=None, - ), - ResourceType.Memory: ResourceRecommendation( - request=Decimal(memory_usage), - limit=Decimal(memory_usage), - ), + ResourceType.CPU: ResourceRecommendation(request=cpu_usage, limit=None), + ResourceType.Memory: ResourceRecommendation(request=memory_usage, limit=memory_usage), } - def _calculate_percentile(self, data: list[float], percentile: float) -> float: - if len(data) == 0: - return float("nan") + def _calculate_percentile(self, data: dict[str, list[Decimal]], percentile: float) -> Decimal: + data_ = [value for values in data.values() for value in values] - data = sorted(data) - return data[int(len(data) * percentile)] + if len(data_) == 0: + return Decimal.from_float(float("nan")) + + data_ = sorted(data_) + return data_[int(len(data_) * percentile)] |
