From 4c87d90dd48748f01098d8dc73ffb736e2b62928 Mon Sep 17 00:00:00 2001 From: arnoldyahad Date: Sun, 25 Jun 2023 12:00:58 +0300 Subject: Rollout Integration in KRR --- robusta_krr/core/integrations/kubernetes.py | 17 +++ robusta_krr/core/integrations/rollout.py | 167 ++++++++++++++++++++++++++++ 2 files changed, 184 insertions(+) create mode 100644 robusta_krr/core/integrations/rollout.py diff --git a/robusta_krr/core/integrations/kubernetes.py b/robusta_krr/core/integrations/kubernetes.py index 37ed97f..105e49e 100644 --- a/robusta_krr/core/integrations/kubernetes.py +++ b/robusta_krr/core/integrations/kubernetes.py @@ -20,6 +20,8 @@ from robusta_krr.core.models.objects import K8sObjectData, PodData from robusta_krr.core.models.result import ResourceAllocations from robusta_krr.utils.configurable import Configurable +from .rollout import RolloutAppsV1Api + class ClusterLoader(Configurable): def __init__(self, cluster: Optional[str], *args, **kwargs): @@ -32,6 +34,7 @@ class ClusterLoader(Configurable): else None ) self.apps = client.AppsV1Api(api_client=self.api_client) + self.rollout = RolloutAppsV1Api(api_client=self.api_client) self.batch = client.BatchV1Api(api_client=self.api_client) self.core = client.CoreV1Api(api_client=self.api_client) @@ -48,10 +51,12 @@ class ClusterLoader(Configurable): try: objects_tuple = await asyncio.gather( self._list_deployments(), + self._list_rollouts(), self._list_all_statefulsets(), self._list_all_daemon_set(), self._list_all_jobs(), ) + except Exception as e: self.error(f"Error trying to list pods in cluster {self.cluster}: {e}") self.debug_exception() @@ -126,6 +131,18 @@ class ClusterLoader(Configurable): ] ) + async def _list_rollouts(self) -> list[K8sObjectData]: + ret: V1DeploymentList = await asyncio.to_thread(self.rollout.list_rollout_for_all_namespaces, watch=False) + self.debug(f"Found {len(ret.items)} rollouts in {self.cluster}") + + return await asyncio.gather( + *[ + self.__build_obj(item, container) + for item in ret.items + for container in item.spec.template.spec.containers + ] + ) + async def _list_all_statefulsets(self) -> list[K8sObjectData]: self.debug(f"Listing statefulsets in {self.cluster}") ret: V1StatefulSetList = await asyncio.to_thread(self.apps.list_stateful_set_for_all_namespaces, watch=False) diff --git a/robusta_krr/core/integrations/rollout.py b/robusta_krr/core/integrations/rollout.py new file mode 100644 index 0000000..3ac5513 --- /dev/null +++ b/robusta_krr/core/integrations/rollout.py @@ -0,0 +1,167 @@ +from kubernetes import client +import six + +from kubernetes.client.exceptions import ( # noqa: F401 + ApiTypeError, + +) + + +class RolloutAppsV1Api(client.AppsV1Api): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def list_rollout_for_all_namespaces(self, **kwargs): # noqa: E501 + """list_rollout_for_all_namespaces # noqa: E501 + + list or watch objects of kind Deployment # noqa: E501 + This method makes a synchronous HTTP request by default. To make an + asynchronous HTTP request, please pass async_req=True + >>> thread = api.list_rollout_for_all_namespaces(async_req=True) + >>> result = thread.get() + + :param async_req bool: execute request asynchronously + :param bool allow_watch_bookmarks: allowWatchBookmarks requests watch events with type \"BOOKMARK\". Servers that do not implement bookmarks may ignore this flag and bookmarks are sent at the server's discretion. Clients should not assume bookmarks are returned at any specific interval, nor may they assume the server will send any BOOKMARK event during a session. If this is not a watch, this field is ignored. + :param str _continue: The continue option should be set when retrieving more results from the server. Since this value is server defined, clients may only use the continue value from a previous query result with identical query parameters (except for the value of continue) and the server may reject a continue value it does not recognize. If the specified continue value is no longer valid whether due to expiration (generally five to fifteen minutes) or a configuration change on the server, the server will respond with a 410 ResourceExpired error together with a continue token. If the client needs a consistent list, it must restart their list without the continue field. Otherwise, the client may send another list request with the token received with the 410 error, the server will respond with a list starting from the next key, but from the latest snapshot, which is inconsistent from the previous list results - objects that are created, modified, or deleted after the first list request will be included in the response, as long as their keys are after the \"next key\". This field is not supported when watch is true. Clients may start a watch from the last resourceVersion value returned by the server and not miss any modifications. + :param str field_selector: A selector to restrict the list of returned objects by their fields. Defaults to everything. + :param str label_selector: A selector to restrict the list of returned objects by their labels. Defaults to everything. + :param int limit: limit is a maximum number of responses to return for a list call. If more items exist, the server will set the `continue` field on the list metadata to a value that can be used with the same initial query to retrieve the next set of results. Setting a limit may return fewer than the requested amount of items (up to zero items) in the event all requested objects are filtered out and clients should only use the presence of the continue field to determine whether more results are available. Servers may choose not to support the limit argument and will return all of the available results. If limit is specified and the continue field is empty, clients may assume that no more results are available. This field is not supported if watch is true. The server guarantees that the objects returned when using continue will be identical to issuing a single list call without a limit - that is, no objects created, modified, or deleted after the first request is issued will be included in any subsequent continued requests. This is sometimes referred to as a consistent snapshot, and ensures that a client that is using limit to receive smaller chunks of a very large result can ensure they see all possible objects. If objects are updated during a chunked list the version of the object that was present at the time the first list result was calculated is returned. + :param str pretty: If 'true', then the output is pretty printed. + :param str resource_version: resourceVersion sets a constraint on what resource versions a request may be served from. See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for details. Defaults to unset + :param str resource_version_match: resourceVersionMatch determines how resourceVersion is applied to list calls. It is highly recommended that resourceVersionMatch be set for list calls where resourceVersion is set See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for details. Defaults to unset + :param int timeout_seconds: Timeout for the list/watch call. This limits the duration of the call, regardless of any activity or inactivity. + :param bool watch: Watch for changes to the described resources and return them as a stream of add, update, and remove notifications. Specify resourceVersion. + :param _preload_content: if False, the urllib3.HTTPResponse object will + be returned without reading/decoding response + data. Default is True. + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :return: V1RolloutList + If the method is called asynchronously, + returns the request thread. + """ + kwargs['_return_http_data_only'] = True + return self.list_rollout_for_all_namespaces_with_http_info(**kwargs) # noqa: E501 + + def list_rollout_for_all_namespaces_with_http_info(self, **kwargs): # noqa: E501 + """list_rollout_for_all_namespaces # noqa: E501 + + list or watch objects of kind Deployment # noqa: E501 + This method makes a synchronous HTTP request by default. To make an + asynchronous HTTP request, please pass async_req=True + >>> thread = api.list_deployment_for_all_namespaces_with_http_info(async_req=True) + >>> result = thread.get() + + :param async_req bool: execute request asynchronously + :param bool allow_watch_bookmarks: allowWatchBookmarks requests watch events with type \"BOOKMARK\". Servers that do not implement bookmarks may ignore this flag and bookmarks are sent at the server's discretion. Clients should not assume bookmarks are returned at any specific interval, nor may they assume the server will send any BOOKMARK event during a session. If this is not a watch, this field is ignored. + :param str _continue: The continue option should be set when retrieving more results from the server. Since this value is server defined, clients may only use the continue value from a previous query result with identical query parameters (except for the value of continue) and the server may reject a continue value it does not recognize. If the specified continue value is no longer valid whether due to expiration (generally five to fifteen minutes) or a configuration change on the server, the server will respond with a 410 ResourceExpired error together with a continue token. If the client needs a consistent list, it must restart their list without the continue field. Otherwise, the client may send another list request with the token received with the 410 error, the server will respond with a list starting from the next key, but from the latest snapshot, which is inconsistent from the previous list results - objects that are created, modified, or deleted after the first list request will be included in the response, as long as their keys are after the \"next key\". This field is not supported when watch is true. Clients may start a watch from the last resourceVersion value returned by the server and not miss any modifications. + :param str field_selector: A selector to restrict the list of returned objects by their fields. Defaults to everything. + :param str label_selector: A selector to restrict the list of returned objects by their labels. Defaults to everything. + :param int limit: limit is a maximum number of responses to return for a list call. If more items exist, the server will set the `continue` field on the list metadata to a value that can be used with the same initial query to retrieve the next set of results. Setting a limit may return fewer than the requested amount of items (up to zero items) in the event all requested objects are filtered out and clients should only use the presence of the continue field to determine whether more results are available. Servers may choose not to support the limit argument and will return all of the available results. If limit is specified and the continue field is empty, clients may assume that no more results are available. This field is not supported if watch is true. The server guarantees that the objects returned when using continue will be identical to issuing a single list call without a limit - that is, no objects created, modified, or deleted after the first request is issued will be included in any subsequent continued requests. This is sometimes referred to as a consistent snapshot, and ensures that a client that is using limit to receive smaller chunks of a very large result can ensure they see all possible objects. If objects are updated during a chunked list the version of the object that was present at the time the first list result was calculated is returned. + :param str pretty: If 'true', then the output is pretty printed. + :param str resource_version: resourceVersion sets a constraint on what resource versions a request may be served from. See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for details. Defaults to unset + :param str resource_version_match: resourceVersionMatch determines how resourceVersion is applied to list calls. It is highly recommended that resourceVersionMatch be set for list calls where resourceVersion is set See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for details. Defaults to unset + :param int timeout_seconds: Timeout for the list/watch call. This limits the duration of the call, regardless of any activity or inactivity. + :param bool watch: Watch for changes to the described resources and return them as a stream of add, update, and remove notifications. Specify resourceVersion. + :param _return_http_data_only: response data without head status code + and headers + :param _preload_content: if False, the urllib3.HTTPResponse object will + be returned without reading/decoding response + data. Default is True. + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :return: tuple(V1DeploymentList, status_code(int), headers(HTTPHeaderDict)) + If the method is called asynchronously, + returns the request thread. + """ + + local_var_params = locals() + + all_params = [ + 'allow_watch_bookmarks', + '_continue', + 'field_selector', + 'label_selector', + 'limit', + 'pretty', + 'resource_version', + 'resource_version_match', + 'timeout_seconds', + 'watch' + ] + all_params.extend( + [ + 'async_req', + '_return_http_data_only', + '_preload_content', + '_request_timeout' + ] + ) + + for key, val in six.iteritems(local_var_params['kwargs']): + if key not in all_params: + raise ApiTypeError( + "Got an unexpected keyword argument '%s'" + " to method list_deployment_for_all_namespaces" % key + ) + local_var_params[key] = val + del local_var_params['kwargs'] + + collection_formats = {} + + path_params = {} + + query_params = [] + if 'allow_watch_bookmarks' in local_var_params and local_var_params['allow_watch_bookmarks'] is not None: # noqa: E501 + query_params.append(('allowWatchBookmarks', local_var_params['allow_watch_bookmarks'])) # noqa: E501 + if '_continue' in local_var_params and local_var_params['_continue'] is not None: # noqa: E501 + query_params.append(('continue', local_var_params['_continue'])) # noqa: E501 + if 'field_selector' in local_var_params and local_var_params['field_selector'] is not None: # noqa: E501 + query_params.append(('fieldSelector', local_var_params['field_selector'])) # noqa: E501 + if 'label_selector' in local_var_params and local_var_params['label_selector'] is not None: # noqa: E501 + query_params.append(('labelSelector', local_var_params['label_selector'])) # noqa: E501 + if 'limit' in local_var_params and local_var_params['limit'] is not None: # noqa: E501 + query_params.append(('limit', local_var_params['limit'])) # noqa: E501 + if 'pretty' in local_var_params and local_var_params['pretty'] is not None: # noqa: E501 + query_params.append(('pretty', local_var_params['pretty'])) # noqa: E501 + if 'resource_version' in local_var_params and local_var_params['resource_version'] is not None: # noqa: E501 + query_params.append(('resourceVersion', local_var_params['resource_version'])) # noqa: E501 + if 'resource_version_match' in local_var_params and local_var_params['resource_version_match'] is not None: # noqa: E501 + query_params.append(('resourceVersionMatch', local_var_params['resource_version_match'])) # noqa: E501 + if 'timeout_seconds' in local_var_params and local_var_params['timeout_seconds'] is not None: # noqa: E501 + query_params.append(('timeoutSeconds', local_var_params['timeout_seconds'])) # noqa: E501 + if 'watch' in local_var_params and local_var_params['watch'] is not None: # noqa: E501 + query_params.append(('watch', local_var_params['watch'])) # noqa: E501 + + header_params = {} + + form_params = [] + local_var_files = {} + + body_params = None + # HTTP header `Accept` + header_params['Accept'] = self.api_client.select_header_accept( + ['application/json', 'application/yaml', 'application/vnd.kubernetes.protobuf', 'application/json;stream=watch', 'application/vnd.kubernetes.protobuf;stream=watch']) # noqa: E501 + + # Authentication setting + auth_settings = ['BearerToken'] # noqa: E501 + + return self.api_client.call_api( + '/apis/argoproj.io/v1alpha1/rollouts', 'GET', + path_params, + query_params, + header_params, + body=body_params, + post_params=form_params, + files=local_var_files, + response_type='V1DeploymentList', # noqa: E501 + auth_settings=auth_settings, + async_req=local_var_params.get('async_req'), + _return_http_data_only=local_var_params.get('_return_http_data_only'), # noqa: E501 + _preload_content=local_var_params.get('_preload_content', True), + _request_timeout=local_var_params.get('_request_timeout'), + collection_formats=collection_formats) -- cgit v1.2.3 From 631a1d2c4648b05a01b37019f784a0e7b533a977 Mon Sep 17 00:00:00 2001 From: arnoldyahad Date: Sun, 25 Jun 2023 12:14:47 +0300 Subject: Change import to be as the standard --- robusta_krr/core/integrations/kubernetes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/robusta_krr/core/integrations/kubernetes.py b/robusta_krr/core/integrations/kubernetes.py index 105e49e..b79c4e4 100644 --- a/robusta_krr/core/integrations/kubernetes.py +++ b/robusta_krr/core/integrations/kubernetes.py @@ -20,7 +20,7 @@ from robusta_krr.core.models.objects import K8sObjectData, PodData from robusta_krr.core.models.result import ResourceAllocations from robusta_krr.utils.configurable import Configurable -from .rollout import RolloutAppsV1Api +from robusta_krr.core.integrations.rollout import RolloutAppsV1Api class ClusterLoader(Configurable): -- cgit v1.2.3 From 9f0a2f8a0ce17a7501b812e1880da3a1afffcf78 Mon Sep 17 00:00:00 2001 From: LeaveMyYard <33721692+LeaveMyYard@users.noreply.github.com> Date: Wed, 5 Jul 2023 10:54:06 +0300 Subject: Fix linting errors --- pyproject.toml | 2 +- robusta_krr/core/integrations/prometheus/loader.py | 13 ++++++++----- .../core/integrations/prometheus/metrics/base_metric.py | 10 +++++++--- .../integrations/prometheus/metrics/memory_metric.py | 5 +++-- .../prometheus/metrics_service/base_metric_service.py | 3 +-- .../metrics_service/prometheus_metrics_service.py | 17 +++++++++-------- .../metrics_service/thanos_metrics_service.py | 5 ++--- .../metrics_service/victoria_metrics_service.py | 5 ++--- robusta_krr/core/runner.py | 4 ++-- robusta_krr/formatters/table.py | 2 +- robusta_krr/utils/configurable.py | 4 ++-- robusta_krr/utils/service_discovery.py | 7 +++++++ tests/conftest.py | 2 +- 13 files changed, 46 insertions(+), 33 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index af10fe7..e764643 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ multi_line_output = 3 include_trailing_comma = true [tool.mypy] -plugins = "numpy.typing.mypy_plugin" +plugins = "numpy.typing.mypy_plugin,pydantic.mypy" [tool.poetry.scripts] krr = "robusta_krr.main:run" diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py index 143fb63..4f3b832 100644 --- a/robusta_krr/core/integrations/prometheus/loader.py +++ b/robusta_krr/core/integrations/prometheus/loader.py @@ -1,6 +1,5 @@ -import asyncio import datetime -from typing import Optional, no_type_check +from typing import Optional from concurrent.futures import ThreadPoolExecutor @@ -49,10 +48,12 @@ class MetricsLoader(Configurable): if cluster is not None else None ) - self.loader = self.get_metrics_service(config, api_client=self.api_client, cluster=cluster) - if not self.loader: + loader = self.get_metrics_service(config, api_client=self.api_client, cluster=cluster) + if loader is None: raise PrometheusNotFound("No Prometheus or metrics service found") + self.loader = loader + self.info(f"{self.loader.name()} connected successfully for {cluster or 'default'} cluster") def get_metrics_service( @@ -68,9 +69,11 @@ class MetricsLoader(Configurable): self.echo(f"{service_name} found") loader.validate_cluster_name() return loader - except MetricsNotFound as e: + except MetricsNotFound: self.debug(f"{service_name} not found") + return None + async def gather_data( self, object: K8sObjectData, diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py index 8bf263c..4e872b0 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py @@ -15,7 +15,8 @@ from robusta_krr.utils.configurable import Configurable if TYPE_CHECKING: from .. import CustomPrometheusConnect - MetricsDictionary = dict[str, type[BaseMetricLoader]] + +MetricsDictionary = dict[str, type["BaseMetricLoader"]] class QueryType(str, enum.Enum): @@ -72,6 +73,9 @@ class BaseMetricLoader(Configurable, abc.ABC): pass + def get_query_type(self) -> QueryType: + return QueryType.QueryRange + def get_graph_query(self, object: K8sObjectData, resolution: Optional[str]) -> str: """ This method should be implemented by all subclasses to provide a query string in the metadata to produce relevant graphs. @@ -158,7 +162,7 @@ class BaseMetricLoader(Configurable, abc.ABC): step=self._step_to_string(step), ) result = await self.query_prometheus(metric=metric, query_type=query_type) - # adding the query in the results for a graph + # adding the query in the results for a graph metric.query = self.get_graph_query(object, resolution) if result == []: @@ -173,7 +177,7 @@ class BaseMetricLoader(Configurable, abc.ABC): ) @staticmethod - def get_by_resource(resource: str, strategy: Optional[str]) -> type[BaseMetricLoader]: + def get_by_resource(resource: str, strategy: str) -> type[BaseMetricLoader]: """ Fetches the metric loader corresponding to the specified resource. diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py index a8a4521..b87c5c3 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py @@ -23,9 +23,10 @@ class MemoryMetricLoader(BaseFilteredMetricLoader): def get_query_type(self) -> QueryType: return QueryType.QueryRange + # This is a temporary solutions, metric loaders will be moved to strategy in the future @override_metric("simple", ResourceType.Memory) -class MemoryMetricLoader(MemoryMetricLoader): +class SimpleMemoryMetricLoader(MemoryMetricLoader): """ A class that overrides the memory metric on the simple strategy. """ @@ -47,5 +48,5 @@ class MemoryMetricLoader(MemoryMetricLoader): def get_query_type(self) -> QueryType: return QueryType.Query - def get_graph_query(self, object: K8sObjectData, resolution: Optional[str]) -> str: + def get_graph_query(self, object: K8sObjectData, resolution: Optional[str]) -> str: return super().get_query(object, resolution) diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py index 4337e5e..3178a61 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py @@ -42,7 +42,7 @@ class MetricsService(Configurable, abc.ABC): return classname.replace("MetricsService", "") if classname != MetricsService.__name__ else classname @abc.abstractmethod - async def get_cluster_names(self) -> Optional[List[str]]: + def get_cluster_names(self) -> Optional[List[str]]: ... @abc.abstractmethod @@ -51,7 +51,6 @@ class MetricsService(Configurable, abc.ABC): object: K8sObjectData, resource: ResourceType, period: datetime.timedelta, - *, step: datetime.timedelta = datetime.timedelta(minutes=30), ) -> ResourceHistoryData: ... diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py index 31fe0c0..8703e56 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py @@ -1,7 +1,7 @@ import asyncio from concurrent.futures import ThreadPoolExecutor import datetime -from typing import List, Optional, no_type_check, Type +from typing import Optional, no_type_check, Type import requests from kubernetes.client import ApiClient @@ -13,14 +13,13 @@ from robusta_krr.core.abstract.strategies import ResourceHistoryData from robusta_krr.core.models.config import Config from robusta_krr.core.models.objects import K8sObjectData, PodData from robusta_krr.core.models.result import ResourceType -from robusta_krr.utils.configurable import Configurable -from robusta_krr.utils.service_discovery import ServiceDiscovery +from robusta_krr.utils.service_discovery import MetricsServiceDiscovery from ..metrics import BaseMetricLoader from .base_metric_service import MetricsNotFound, MetricsService -class PrometheusDiscovery(ServiceDiscovery): +class PrometheusDiscovery(MetricsServiceDiscovery): def find_metrics_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: """ Finds the Prometheus URL using selectors. @@ -89,7 +88,7 @@ class PrometheusMetricsService(MetricsService): *, cluster: Optional[str] = None, api_client: Optional[ApiClient] = None, - service_discovery: Type[ServiceDiscovery] = PrometheusDiscovery, + service_discovery: Type[MetricsServiceDiscovery] = PrometheusDiscovery, executor: Optional[ThreadPoolExecutor] = None, ) -> None: super().__init__(config=config, api_client=api_client, cluster=cluster, executor=executor) @@ -116,7 +115,7 @@ class PrometheusMetricsService(MetricsService): if self.auth_header: headers = {"Authorization": self.auth_header} - elif not self.config.inside_cluster: + elif not self.config.inside_cluster and self.api_client is not None: self.api_client.update_params_for_auth(headers, {}, ["BearerToken"]) self.prometheus = CustomPrometheusConnect(url=self.url, disable_ssl=not self.ssl_enabled, headers=headers) @@ -162,7 +161,9 @@ class PrometheusMetricsService(MetricsService): f"Label {cluster_label} does not exist, Rerun krr with the flag `-l ` where is one of {cluster_names}" ) - def get_cluster_names(self) -> Optional[List[str]]: + # Superclass method returns Optional[list[str]], but here we return list[str] + # NOTE that this does not break Liskov Substitution Principle + def get_cluster_names(self) -> list[str]: return self.prometheus.get_label_values(label_name=self.config.prometheus_label) async def gather_data( @@ -179,7 +180,7 @@ class PrometheusMetricsService(MetricsService): MetricLoaderType = BaseMetricLoader.get_by_resource(resource, self.config.strategy) await self.add_historic_pods(object, period) - + metric_loader = MetricLoaderType(self.config, self.prometheus, self.executor) return await metric_loader.load_data(object, period, step, self.name()) diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py index 42066ae..120c471 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py @@ -1,15 +1,14 @@ from typing import Optional from kubernetes.client import ApiClient -from requests.exceptions import ConnectionError, HTTPError from concurrent.futures import ThreadPoolExecutor from robusta_krr.core.models.config import Config -from robusta_krr.utils.service_discovery import ServiceDiscovery +from robusta_krr.utils.service_discovery import MetricsServiceDiscovery from .prometheus_metrics_service import MetricsNotFound, PrometheusMetricsService -class ThanosMetricsDiscovery(ServiceDiscovery): +class ThanosMetricsDiscovery(MetricsServiceDiscovery): def find_metrics_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: """ Finds the Thanos URL using selectors. diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py index 9c15cab..38d376c 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py @@ -1,15 +1,14 @@ from typing import Optional from concurrent.futures import ThreadPoolExecutor from kubernetes.client import ApiClient -from requests.exceptions import ConnectionError, HTTPError from robusta_krr.core.models.config import Config -from robusta_krr.utils.service_discovery import ServiceDiscovery +from robusta_krr.utils.service_discovery import MetricsServiceDiscovery from .prometheus_metrics_service import MetricsNotFound, PrometheusMetricsService -class VictoriaMetricsDiscovery(ServiceDiscovery): +class VictoriaMetricsDiscovery(MetricsServiceDiscovery): def find_metrics_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: """ Finds the Victoria Metrics URL using selectors. diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index 3cdf7b9..1ed4ac5 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -65,7 +65,7 @@ class Runner(Configurable): Formatter = self.config.Formatter formatted = result.format(Formatter) self.echo("\n", no_prefix=True) - self.print_result(formatted, rich=Formatter.__rich_console__) + self.print_result(formatted, rich=getattr(Formatter, "__rich_console__", False)) def __get_resource_minimal(self, resource: ResourceType) -> float: if resource == ResourceType.CPU: @@ -204,5 +204,5 @@ class Runner(Configurable): self._process_result(result) except ClusterNotSpecifiedException as e: self.error(e) - except Exception as e: + except Exception: self.console.print_exception(extra_lines=1, max_frames=10) diff --git a/robusta_krr/formatters/table.py b/robusta_krr/formatters/table.py index 846732a..a1c6e42 100644 --- a/robusta_krr/formatters/table.py +++ b/robusta_krr/formatters/table.py @@ -1,5 +1,5 @@ import itertools -from typing import Any, Optional +from typing import Any from rich.table import Table diff --git a/robusta_krr/utils/configurable.py b/robusta_krr/utils/configurable.py index 8954139..00c3a64 100644 --- a/robusta_krr/utils/configurable.py +++ b/robusta_krr/utils/configurable.py @@ -93,9 +93,9 @@ class Configurable(abc.ABC): self.echo(message, type="WARNING") - def error(self, message: str = "") -> None: + def error(self, message: str | Exception = "") -> None: """ Echoes an error message to the user """ - self.echo(message, type="ERROR") + self.echo(str(message), type="ERROR") diff --git a/robusta_krr/utils/service_discovery.py b/robusta_krr/utils/service_discovery.py index 42890d3..20138ed 100644 --- a/robusta_krr/utils/service_discovery.py +++ b/robusta_krr/utils/service_discovery.py @@ -1,4 +1,5 @@ from typing import Optional +from abc import ABC, abstractmethod from cachetools import TTLCache from kubernetes import client @@ -82,3 +83,9 @@ class ServiceDiscovery(Configurable): return ingress_url return None + + +class MetricsServiceDiscovery(ServiceDiscovery, ABC): + @abstractmethod + def find_metrics_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: + pass diff --git a/tests/conftest.py b/tests/conftest.py index af43124..3b89e88 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,6 @@ import random from datetime import datetime, timedelta -from unittest.mock import AsyncMock, PropertyMock, patch +from unittest.mock import AsyncMock, patch import numpy as np import pytest -- cgit v1.2.3 From 96a626c51e868d65b00d9106a22d7496c9a22ccf Mon Sep 17 00:00:00 2001 From: LeaveMyYard <33721692+LeaveMyYard@users.noreply.github.com> Date: Wed, 5 Jul 2023 10:57:09 +0300 Subject: Run isort on whole codebase --- robusta_krr/core/integrations/kubernetes.py | 9 ++++----- robusta_krr/core/integrations/prometheus/loader.py | 3 +-- .../core/integrations/prometheus/metrics/base_filtered_metric.py | 1 + robusta_krr/core/integrations/prometheus/metrics/base_metric.py | 6 ++++-- robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py | 3 ++- .../core/integrations/prometheus/metrics/memory_metric.py | 3 ++- .../prometheus/metrics_service/base_metric_service.py | 2 +- .../prometheus/metrics_service/prometheus_metrics_service.py | 4 ++-- .../prometheus/metrics_service/thanos_metrics_service.py | 3 ++- .../prometheus/metrics_service/victoria_metrics_service.py | 3 ++- robusta_krr/core/runner.py | 3 +-- robusta_krr/main.py | 1 - robusta_krr/utils/service_discovery.py | 2 +- 13 files changed, 23 insertions(+), 20 deletions(-) diff --git a/robusta_krr/core/integrations/kubernetes.py b/robusta_krr/core/integrations/kubernetes.py index 4940121..ce5166e 100644 --- a/robusta_krr/core/integrations/kubernetes.py +++ b/robusta_krr/core/integrations/kubernetes.py @@ -1,6 +1,6 @@ import asyncio -from concurrent.futures import ThreadPoolExecutor import itertools +from concurrent.futures import ThreadPoolExecutor from typing import Optional, Union from kubernetes import client, config # type: ignore @@ -10,22 +10,21 @@ from kubernetes.client.models import ( V1DaemonSetList, V1Deployment, V1DeploymentList, + V1Job, V1JobList, V1LabelSelector, - V1PodList, V1Pod, - V1Job, + V1PodList, V1StatefulSet, V1StatefulSetList, V2HorizontalPodAutoscaler, V2HorizontalPodAutoscalerList, ) -from robusta_krr.core.models.objects import K8sObjectData, PodData, HPAData +from robusta_krr.core.models.objects import HPAData, K8sObjectData, PodData from robusta_krr.core.models.result import ResourceAllocations from robusta_krr.utils.configurable import Configurable - AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job] diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py index 4f3b832..590bc83 100644 --- a/robusta_krr/core/integrations/prometheus/loader.py +++ b/robusta_krr/core/integrations/prometheus/loader.py @@ -1,7 +1,6 @@ import datetime -from typing import Optional - from concurrent.futures import ThreadPoolExecutor +from typing import Optional from kubernetes import config as k8s_config from kubernetes.client.api_client import ApiClient diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py index c7ba3e7..5927aa4 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/base_filtered_metric.py @@ -1,4 +1,5 @@ from typing import Any, Optional + from robusta_krr.core.abstract.strategies import Metric from .base_metric import BaseMetricLoader, QueryType diff --git a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py index 4e872b0..46150cb 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/base_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/base_metric.py @@ -2,11 +2,13 @@ from __future__ import annotations import abc import asyncio -from concurrent.futures import ThreadPoolExecutor import datetime -from typing import TYPE_CHECKING, Callable, Optional, TypeVar import enum +from concurrent.futures import ThreadPoolExecutor +from typing import TYPE_CHECKING, Callable, Optional, TypeVar + import numpy as np + from robusta_krr.core.abstract.strategies import Metric, ResourceHistoryData from robusta_krr.core.models.config import Config from robusta_krr.core.models.objects import K8sObjectData diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py index bf27622..a604652 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py @@ -1,9 +1,10 @@ from typing import Optional + from robusta_krr.core.models.allocations import ResourceType from robusta_krr.core.models.objects import K8sObjectData from .base_filtered_metric import BaseFilteredMetricLoader -from .base_metric import bind_metric, QueryType +from .base_metric import QueryType, bind_metric @bind_metric(ResourceType.CPU) diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py index b87c5c3..763ab26 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py @@ -1,9 +1,10 @@ from typing import Optional + from robusta_krr.core.models.allocations import ResourceType from robusta_krr.core.models.objects import K8sObjectData from .base_filtered_metric import BaseFilteredMetricLoader -from .base_metric import bind_metric, QueryType, override_metric +from .base_metric import QueryType, bind_metric, override_metric @bind_metric(ResourceType.Memory) diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py index 3178a61..e2654e8 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py @@ -1,6 +1,6 @@ import abc -from concurrent.futures import ThreadPoolExecutor import datetime +from concurrent.futures import ThreadPoolExecutor from typing import List, Optional from kubernetes.client.api_client import ApiClient diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py index 8703e56..78ec413 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py @@ -1,7 +1,7 @@ import asyncio -from concurrent.futures import ThreadPoolExecutor import datetime -from typing import Optional, no_type_check, Type +from concurrent.futures import ThreadPoolExecutor +from typing import Optional, Type, no_type_check import requests from kubernetes.client import ApiClient diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py index 120c471..f1de8b2 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py @@ -1,7 +1,8 @@ +from concurrent.futures import ThreadPoolExecutor from typing import Optional from kubernetes.client import ApiClient -from concurrent.futures import ThreadPoolExecutor + from robusta_krr.core.models.config import Config from robusta_krr.utils.service_discovery import MetricsServiceDiscovery diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py index 38d376c..fd9f8a9 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py @@ -1,5 +1,6 @@ -from typing import Optional from concurrent.futures import ThreadPoolExecutor +from typing import Optional + from kubernetes.client import ApiClient from robusta_krr.core.models.config import Config diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index 1ed4ac5..68ceb18 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -1,8 +1,7 @@ import asyncio import math -from typing import Optional, Union - from concurrent.futures import ThreadPoolExecutor +from typing import Optional, Union from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult from robusta_krr.core.integrations.kubernetes import KubernetesLoader diff --git a/robusta_krr/main.py b/robusta_krr/main.py index 03b4f21..185a819 100644 --- a/robusta_krr/main.py +++ b/robusta_krr/main.py @@ -6,7 +6,6 @@ from datetime import datetime from typing import List, Literal, Optional, Union from uuid import UUID - import typer import urllib3 diff --git a/robusta_krr/utils/service_discovery.py b/robusta_krr/utils/service_discovery.py index 20138ed..c1e2763 100644 --- a/robusta_krr/utils/service_discovery.py +++ b/robusta_krr/utils/service_discovery.py @@ -1,5 +1,5 @@ -from typing import Optional from abc import ABC, abstractmethod +from typing import Optional from cachetools import TTLCache from kubernetes import client -- cgit v1.2.3 From 914bfcb1558cdfd35592ba29d091040288848114 Mon Sep 17 00:00:00 2001 From: LeaveMyYard <33721692+LeaveMyYard@users.noreply.github.com> Date: Wed, 5 Jul 2023 10:59:55 +0300 Subject: Remove 3.10 union syntax --- robusta_krr/utils/configurable.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/robusta_krr/utils/configurable.py b/robusta_krr/utils/configurable.py index 00c3a64..54e71ff 100644 --- a/robusta_krr/utils/configurable.py +++ b/robusta_krr/utils/configurable.py @@ -1,6 +1,6 @@ import abc from inspect import getframeinfo, stack -from typing import Literal +from typing import Literal, Union from rich.console import Console @@ -93,7 +93,7 @@ class Configurable(abc.ABC): self.echo(message, type="WARNING") - def error(self, message: str | Exception = "") -> None: + def error(self, message: Union[str, Exception] = "") -> None: """ Echoes an error message to the user """ -- cgit v1.2.3 From e3ddd9256d787062bbdf252f6fb66a361f64fc61 Mon Sep 17 00:00:00 2001 From: Avi-Robusta <97387909+Avi-Robusta@users.noreply.github.com> Date: Fri, 7 Jul 2023 17:45:11 +0300 Subject: remove redundant max (#98) --- robusta_krr/core/integrations/prometheus/metrics/memory_metric.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py index a8a4521..54c6c2d 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py @@ -35,13 +35,13 @@ class MemoryMetricLoader(MemoryMetricLoader): cluster_label = self.get_prometheus_cluster_label() resolution_formatted = f"[{resolution}]" if resolution else "" return ( - f"max(max_over_time(container_memory_working_set_bytes{{" + f"max_over_time(container_memory_working_set_bytes{{" f'namespace="{object.namespace}", ' f'pod=~"{pods_selector}", ' f'container="{object.container}"' f"{cluster_label}}}" f"{resolution_formatted}" - f")) by (container, pod, job, id)" + f")" ) def get_query_type(self) -> QueryType: -- cgit v1.2.3 From 145ea7f2bd37e84f53660b4eb3b14ffd4d16806c Mon Sep 17 00:00:00 2001 From: arnoldyahad Date: Tue, 11 Jul 2023 12:31:28 +0300 Subject: Docker directory for specific clouds --- docker/README.md | 12 ++++++++++++ docker/aws.Dockerfile | 28 ++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+) create mode 100644 docker/README.md create mode 100644 docker/aws.Dockerfile diff --git a/docker/README.md b/docker/README.md new file mode 100644 index 0000000..a1bf1da --- /dev/null +++ b/docker/README.md @@ -0,0 +1,12 @@ +# Dockerfiles for specific clouds + +This directory will include Dockerfiles for various cloud providers. + +## AWS + +For the usage of `krr` container we need the Dockerfile to have `awscli` installed on it. +The `aws.Dockerfile` is a modified `krr` dockerfile which includes: + - installation of curl & zip + - installation of awscli + + diff --git a/docker/aws.Dockerfile b/docker/aws.Dockerfile new file mode 100644 index 0000000..13d79b8 --- /dev/null +++ b/docker/aws.Dockerfile @@ -0,0 +1,28 @@ +# Use the official Python 3.9 slim image as the base image +FROM python:3.9-slim as builder + +# Set the working directory +WORKDIR /app + +# Install system dependencies required for Poetry +RUN apt-get update && \ + dpkg --add-architecture arm64 + +COPY ./requirements.txt requirements.txt + +# Install the project dependencies +RUN pip install -r requirements.txt + +# Install curl and unzip for awscli +RUN apt-get -y update; apt-get -y install curl; apt-get -y install unzip + +# Download awscli and unzip it +RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \ + unzip awscliv2.zip && \ + ./aws/install + +# Copy the rest of the application code +COPY . . + +# Run the application using 'poetry run krr simple' +ENTRYPOINT ["python", "krr.py", "simple"] -- cgit v1.2.3 From d120a07c71cfdb2d24de6f912f4256273b4bf364 Mon Sep 17 00:00:00 2001 From: LeaveMyYard <33721692+LeaveMyYard@users.noreply.github.com> Date: Tue, 11 Jul 2023 14:58:07 +0300 Subject: Fix rollout not found --- robusta_krr/core/integrations/kubernetes.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/robusta_krr/core/integrations/kubernetes.py b/robusta_krr/core/integrations/kubernetes.py index 87cbe6b..aefc943 100644 --- a/robusta_krr/core/integrations/kubernetes.py +++ b/robusta_krr/core/integrations/kubernetes.py @@ -4,6 +4,7 @@ from concurrent.futures import ThreadPoolExecutor from typing import Optional, Union from kubernetes import client, config # type: ignore +from kubernetes.client import ApiException from kubernetes.client.models import ( V1Container, V1DaemonSet, @@ -152,7 +153,14 @@ class ClusterLoader(Configurable): ) async def _list_rollouts(self) -> list[K8sObjectData]: - ret: V1DeploymentList = await asyncio.to_thread(self.rollout.list_rollout_for_all_namespaces, watch=False) + try: + ret: V1DeploymentList = await asyncio.to_thread(self.rollout.list_rollout_for_all_namespaces, watch=False) + except ApiException as e: + if e.status == 404: + self.debug(f"Rollout API not available in {self.cluster}") + return [] + raise + self.debug(f"Found {len(ret.items)} rollouts in {self.cluster}") return await asyncio.gather( -- cgit v1.2.3