summary | refs | log | tree | commit | diff
path: root/robusta_krr
diff options
context:
space:
mode:
author: Павел Жуков <33721692+LeaveMyYard@users.noreply.github.com> 2023-03-20 20:21:42 +0200
committer: Павел Жуков <33721692+LeaveMyYard@users.noreply.github.com> 2023-03-20 20:21:42 +0200
commit: 07a1bb1dc504dac31bfe17d68e72981c1c2a045a (patch)
tree: c526198179e538ed8ec66ca5145916c7f3f2bda8 /robusta_krr
parent: f3f0b6c5d515ff0cca06c96f4c7296eaf0343970 (diff)
Fix prometheus integration
Diffstat (limited to 'robusta_krr')
-rw-r--r-- robusta_krr/core/abstract/strategies.py 2
-rw-r--r-- robusta_krr/core/integrations/kubernetes.py 95
-rw-r--r-- robusta_krr/core/integrations/prometheus.py 53
-rw-r--r-- robusta_krr/core/models/objects.py 1
-rw-r--r-- robusta_krr/core/runner.py 24
-rw-r--r-- robusta_krr/formatters/__init__.py 1
-rw-r--r-- robusta_krr/formatters/pprint.py 23
-rw-r--r-- robusta_krr/formatters/table.py 10
-rw-r--r-- robusta_krr/strategies/simple.py 22
9 files changed, 172 insertions, 59 deletions
diff --git a/robusta_krr/core/abstract/strategies.py b/robusta_krr/core/abstract/strategies.py
index 3cd7682..8d7fef3 100644
--- a/robusta_krr/core/abstract/strategies.py
+++ b/robusta_krr/core/abstract/strategies.py
@@ -26,7 +26,7 @@ class StrategySettings(pd.BaseModel):
_StrategySettings = TypeVar("_StrategySettings", bound=StrategySettings)
-HistoryData = dict[ResourceType, list[float]]
+HistoryData = dict[ResourceType, dict[str, list[Decimal]]]
RunResult = dict[ResourceType, ResourceRecommendation]
diff --git a/robusta_krr/core/integrations/kubernetes.py b/robusta_krr/core/integrations/kubernetes.py
index d027191..908b085 100644
--- a/robusta_krr/core/integrations/kubernetes.py
+++ b/robusta_krr/core/integrations/kubernetes.py
@@ -1,5 +1,6 @@
import asyncio
import itertools
+from typing import Literal
from kubernetes import client, config
from kubernetes.client.models import (
@@ -9,6 +10,7 @@ from kubernetes.client.models import (
V1Deployment,
V1DeploymentList,
V1JobList,
+ V1LabelSelector,
V1PodList,
V1StatefulSet,
V1StatefulSetList,
@@ -27,6 +29,7 @@ class ClusterLoader(Configurable):
self.api_client = config.new_client_from_config(context=cluster)
self.apps = client.AppsV1Api(api_client=self.api_client)
self.batch = client.BatchV1Api(api_client=self.api_client)
+ self.core = client.CoreV1Api(api_client=self.api_client)
async def list_scannable_objects(self) -> list[K8sObjectData]:
"""List all scannable objects.
@@ -51,7 +54,40 @@ class ClusterLoader(Configurable):
return list(itertools.chain(*objects_tuple))
- def __build_obj(self, item: V1Deployment | V1DaemonSet | V1StatefulSet, container: V1Container) -> K8sObjectData:
+ @staticmethod
+ def _get_match_expression_filter(expression) -> str:
+ if expression.operator.lower() == "exists":
+ return expression.key
+ elif expression.operator.lower() == "doesnotexist":
+ return f"!{expression.key}"
+
+ values = ",".join(expression.values)
+ return f"{expression.key} {expression.operator} ({values})"
+
+ @staticmethod
+ def _build_selector_query(selector: V1LabelSelector) -> str | None:
+ label_filters = [f"{label[0]}={label[1]}" for label in selector.match_labels.items()]
+
+ if selector.match_expressions is not None:
+ label_filters.extend(
+ [ClusterLoader._get_match_expression_filter(expression) for expression in selector.match_expressions]
+ )
+
+ return ",".join(label_filters)
+
+ async def __list_pods(self, resource: V1Deployment | V1DaemonSet | V1StatefulSet) -> list[str]:
+ selector = self._build_selector_query(resource.spec.selector)
+ if selector is None:
+ return []
+
+ ret: V1PodList = await asyncio.to_thread(
+ self.core.list_namespaced_pod, namespace=resource.metadata.namespace, label_selector=selector
+ )
+ return [pod.metadata.name for pod in ret.items]
+
+ async def __build_obj(
+ self, item: V1Deployment | V1DaemonSet | V1StatefulSet, container: V1Container
+ ) -> K8sObjectData:
return K8sObjectData(
cluster=self.cluster,
namespace=item.metadata.namespace,
@@ -59,42 +95,61 @@ class ClusterLoader(Configurable):
kind=item.__class__.__name__[2:],
container=container.name,
allocations=ResourceAllocations.from_container(container),
+ pods=await self.__list_pods(item),
)
async def _list_deployments(self) -> list[K8sObjectData]:
ret: V1DeploymentList = await asyncio.to_thread(self.apps.list_deployment_for_all_namespaces, watch=False)
- return [
- self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
- ]
+ return await asyncio.gather(
+ *[
+ self.__build_obj(item, container)
+ for item in ret.items
+ for container in item.spec.template.spec.containers
+ ]
+ )
async def _list_all_statefulsets(self) -> list[K8sObjectData]:
ret: V1StatefulSetList = await asyncio.to_thread(self.apps.list_stateful_set_for_all_namespaces, watch=False)
- return [
- self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
- ]
+ return await asyncio.gather(
+ *[
+ self.__build_obj(item, container)
+ for item in ret.items
+ for container in item.spec.template.spec.containers
+ ]
+ )
async def _list_all_daemon_set(self) -> list[K8sObjectData]:
ret: V1DaemonSetList = await asyncio.to_thread(self.apps.list_daemon_set_for_all_namespaces, watch=False)
- return [
- self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
- ]
+ return await asyncio.gather(
+ *[
+ self.__build_obj(item, container)
+ for item in ret.items
+ for container in item.spec.template.spec.containers
+ ]
+ )
async def _list_all_jobs(self) -> list[K8sObjectData]:
ret: V1JobList = await asyncio.to_thread(self.batch.list_job_for_all_namespaces, watch=False)
- return [
- self.__build_obj(item, container) for item in ret.items for container in item.spec.template.spec.containers
- ]
+ return await asyncio.gather(
+ *[
+ self.__build_obj(item, container)
+ for item in ret.items
+ for container in item.spec.template.spec.containers
+ ]
+ )
async def _list_pods(self) -> list[K8sObjectData]:
"""For future use, not supported yet."""
ret: V1PodList = await asyncio.to_thread(self.apps.list_pod_for_all_namespaces, watch=False)
- return [self.__build_obj(item, container) for item in ret.items for container in item.spec.containers]
+ return await asyncio.gather(
+ *[self.__build_obj(item, container) for item in ret.items for container in item.spec.containers]
+ )
class KubernetesLoader(Configurable):
@@ -102,16 +157,22 @@ class KubernetesLoader(Configurable):
super().__init__(*args, **kwargs)
config.load_kube_config()
- async def list_clusters(self) -> list[str]:
+ async def list_clusters(self, filter: list[str] | Literal["*"] | None = None) -> list[str]:
"""List all clusters.
Returns:
A list of clusters.
"""
- contexts, _ = await asyncio.to_thread(config.list_kube_config_contexts)
+ contexts, current_context = await asyncio.to_thread(config.list_kube_config_contexts)
+
+ if filter is None:
+ return [current_context["name"]]
+
+ if filter == "*":
+ return [context["name"] for context in contexts]
- return [context["name"] for context in contexts]
+ return [context["name"] for context in contexts if context["name"] in filter]
async def list_scannable_objects(self, clusters: list[str]) -> list[K8sObjectData]:
"""List all scannable objects.
diff --git a/robusta_krr/core/integrations/prometheus.py b/robusta_krr/core/integrations/prometheus.py
index 1549e59..bb76c1f 100644
--- a/robusta_krr/core/integrations/prometheus.py
+++ b/robusta_krr/core/integrations/prometheus.py
@@ -1,5 +1,6 @@
import asyncio
import datetime
+from decimal import Decimal
from kubernetes import config as k8s_config
from kubernetes.client import ApiClient
@@ -92,35 +93,39 @@ class PrometheusLoader(Configurable):
period: datetime.timedelta,
*,
timeframe: datetime.timedelta = datetime.timedelta(minutes=30),
- ) -> list[int]:
- # TODO: Queries do not work properly
+ ) -> dict[str, list[Decimal]]:
self.debug(f"Gathering data for {object} and {resource}")
if resource == ResourceType.CPU:
- return await asyncio.to_thread(
- self.prometheus.custom_query_range,
- query='sum(node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{namespace="$namespace", pod=~"$pod", container=~"$container"})',
- start_time=datetime.datetime.now() - period,
- end_time=datetime.datetime.now(),
- step="30m",
- params={
- "namespace": object.namespace,
- "pod": object.name,
- "container": object.container,
- },
+ result = await asyncio.gather(
+ *[
+ asyncio.to_thread(
+ self.prometheus.custom_query_range,
+ query=f'sum(node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{{namespace="{object.namespace}", pod="{pod}", container="{object.container}"}})',
+ start_time=datetime.datetime.now() - period,
+ end_time=datetime.datetime.now(),
+ step=f"{int(timeframe.total_seconds()) // 60}m",
+ )
+ for pod in object.pods
+ ]
)
elif resource == ResourceType.Memory:
- return await asyncio.to_thread(
- self.prometheus.custom_query_range,
- query='sum(container_memory_working_set_bytes{job="kubelet", metrics_path="/metrics/cadvisor", pod=~"$pod", container=~"$container", image!=""})',
- start_time=datetime.datetime.now() - period,
- end_time=datetime.datetime.now(),
- step="30m",
- params={
- # "namespace": object.namespace,
- "pod": object.name,
- "container": object.container,
- },
+ result = await asyncio.gather(
+ *[
+ asyncio.to_thread(
+ self.prometheus.custom_query_range,
+ query=f'sum(container_memory_working_set_bytes{{job="kubelet", metrics_path="/metrics/cadvisor", image!="", namespace="{object.namespace}", pod="{pod}", container="{object.container}"}})',
+ start_time=datetime.datetime.now() - period,
+ end_time=datetime.datetime.now(),
+ step=f"{int(timeframe.total_seconds()) // 60}m",
+ )
+ for pod in object.pods
+ ]
)
else:
raise ValueError(f"Unknown resource type: {resource}")
+
+ if result == []:
+ return {pod: [] for pod in object.pods}
+
+ return {pod: [Decimal(value) for _, value in result[i][0]["values"]] for i, pod in enumerate(object.pods)}
diff --git a/robusta_krr/core/models/objects.py b/robusta_krr/core/models/objects.py
index 47fa47f..328e9c8 100644
--- a/robusta_krr/core/models/objects.py
+++ b/robusta_krr/core/models/objects.py
@@ -7,6 +7,7 @@ class K8sObjectData(pd.BaseModel):
cluster: str
name: str
container: str
+ pods: list[str]
namespace: str
kind: str | None
allocations: ResourceAllocations
diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py
index 484f5d8..b884a05 100644
--- a/robusta_krr/core/runner.py
+++ b/robusta_krr/core/runner.py
@@ -1,6 +1,8 @@
import asyncio
+import math
+from decimal import Decimal
-from robusta_krr.core.abstract.strategies import RunResult
+from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult
from robusta_krr.core.integrations.kubernetes import KubernetesLoader
from robusta_krr.core.integrations.prometheus import PrometheusLoader
from robusta_krr.core.models.config import Config
@@ -43,6 +45,23 @@ class Runner(Configurable):
self.echo("\n", no_prefix=True)
self.console.print(formatted)
+ @staticmethod
+ def _round_value(value: Decimal | None) -> Decimal | None:
+ if value is None or value.is_nan():
+ return None
+
+ return Decimal(math.ceil(value * 1000)) / 1000
+
+ @staticmethod
+ def _format_result(result: RunResult) -> RunResult:
+ return {
+ resource: ResourceRecommendation(
+ request=Runner._round_value(recommendation.request),
+ limit=Runner._round_value(recommendation.limit),
+ )
+ for resource, recommendation in result.items()
+ }
+
async def _calculate_object_recommendations(self, object: K8sObjectData) -> RunResult:
prometheus_loader = self._get_prometheus_loader(object.cluster)
@@ -60,7 +79,8 @@ class Runner(Configurable):
# NOTE: We run this in a threadpool as the strategy calculation might be CPU intensive
# But keep in mind that numpy calcluations will not block the GIL
- return await asyncio.to_thread(self._strategy.run, data, object)
+ result = await asyncio.to_thread(self._strategy.run, data, object)
+ return self._format_result(result)
async def _gather_objects_recommendations(self, objects: list[K8sObjectData]) -> list[ResourceAllocations]:
recommendations: list[RunResult] = await asyncio.gather(
diff --git a/robusta_krr/formatters/__init__.py b/robusta_krr/formatters/__init__.py
index cdb12bb..0fc1c80 100644
--- a/robusta_krr/formatters/__init__.py
+++ b/robusta_krr/formatters/__init__.py
@@ -1,3 +1,4 @@
from .json import JSONFormatter
+from .pprint import PPrintFormatter
from .table import TableFormatter
from .yaml import YAMLFormatter
diff --git a/robusta_krr/formatters/pprint.py b/robusta_krr/formatters/pprint.py
new file mode 100644
index 0000000..bdfcc4c
--- /dev/null
+++ b/robusta_krr/formatters/pprint.py
@@ -0,0 +1,23 @@
+from __future__ import annotations
+
+from pprint import pformat
+
+from robusta_krr.core.abstract.formatters import BaseFormatter
+from robusta_krr.core.models.result import Result
+
+
+class PPrintFormatter(BaseFormatter):
+ """Formatter for object output with python's pprint module."""
+
+ __display_name__ = "pprint"
+
+ def format(self, result: Result) -> str:
+ """Format the result using pprint.pformat(...)
+
+ :param result: The results to format.
+ :type result: :class:`core.result.Result`
+ :returns: The formatted results.
+ :rtype: str
+ """
+
+ return pformat(result.dict())
diff --git a/robusta_krr/formatters/table.py b/robusta_krr/formatters/table.py
index 9eda49e..327d66a 100644
--- a/robusta_krr/formatters/table.py
+++ b/robusta_krr/formatters/table.py
@@ -12,6 +12,7 @@ from robusta_krr.utils import resource_units
NONE_LITERAL = "none"
NAN_LITERAL = "?"
PRESCISION = 4
+ALLOWED_DIFFERENCE = 0.05
class TableFormatter(BaseFormatter):
@@ -28,10 +29,13 @@ class TableFormatter(BaseFormatter):
return resource_units.format(value, prescision=prescision)
def _format_request_str(self, item: ResourceScan, resource: ResourceType, selector: str) -> str:
+ allocated = getattr(item.object.allocations, selector)[resource]
+ recommended = getattr(item.recommended, selector)[resource]
+
return (
- self._format_united_decimal(getattr(item.object.allocations, selector)[resource])
+ self._format_united_decimal(allocated)
+ " -> "
- + self._format_united_decimal(getattr(item.recommended, selector)[resource], prescision=PRESCISION)
+ + self._format_united_decimal(recommended, prescision=PRESCISION)
)
def format(self, result: Result) -> Table:
@@ -49,6 +53,7 @@ class TableFormatter(BaseFormatter):
table.add_column("Cluster", style="cyan")
table.add_column("Namespace", style="cyan")
table.add_column("Name", style="cyan")
+ table.add_column("Pods", style="cyan")
table.add_column("Type", style="cyan")
table.add_column("Container", style="cyan")
for resource in ResourceType:
@@ -69,6 +74,7 @@ class TableFormatter(BaseFormatter):
item.object.cluster if full_info_row else "",
item.object.namespace if full_info_row else "",
item.object.name if full_info_row else "",
+ str(len(item.object.pods)) if full_info_row else "",
item.object.kind if full_info_row else "",
item.object.container,
*[
diff --git a/robusta_krr/strategies/simple.py b/robusta_krr/strategies/simple.py
index 63fafef..914db12 100644
--- a/robusta_krr/strategies/simple.py
+++ b/robusta_krr/strategies/simple.py
@@ -30,19 +30,15 @@ class SimpleStrategy(BaseStrategy[SimpleStrategySettings]):
memory_usage = self._calculate_percentile(history_data[ResourceType.Memory], self.settings.request_percentile)
return {
- ResourceType.CPU: ResourceRecommendation(
- request=Decimal(cpu_usage) / 1000 if cpu_usage is not None else None,
- limit=None,
- ),
- ResourceType.Memory: ResourceRecommendation(
- request=Decimal(memory_usage),
- limit=Decimal(memory_usage),
- ),
+ ResourceType.CPU: ResourceRecommendation(request=cpu_usage, limit=None),
+ ResourceType.Memory: ResourceRecommendation(request=memory_usage, limit=memory_usage),
}
- def _calculate_percentile(self, data: list[float], percentile: float) -> float:
- if len(data) == 0:
- return float("nan")
+ def _calculate_percentile(self, data: dict[str, list[Decimal]], percentile: float) -> Decimal:
+ data_ = [value for values in data.values() for value in values]
- data = sorted(data)
- return data[int(len(data) * percentile)]
+ if len(data_) == 0:
+ return Decimal.from_float(float("nan"))
+
+ data_ = sorted(data_)
+ return data_[int(len(data_) * percentile)]