summaryrefslogtreecommitdiff
path: root/robusta_krr/main.py
blob: e61db7ff195560ae1b9032de775f3854401079e1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
from __future__ import annotations

import asyncio
import inspect
import logging
from datetime import datetime
from typing import List, Optional
from uuid import UUID

import typer
import urllib3
from pydantic import ValidationError  # noqa: F401
from typer.models import OptionInfo

from robusta_krr import formatters as concrete_formatters  # noqa: F401
from robusta_krr.core.abstract import formatters
from robusta_krr.core.abstract.strategies import BaseStrategy
from robusta_krr.core.models.config import Config
from robusta_krr.core.runner import Runner
from robusta_krr.utils.version import get_version

app = typer.Typer(pretty_exceptions_show_locals=False, pretty_exceptions_short=True, no_args_is_help=True)

# NOTE: Disable insecure request warnings, as it might be expected to use self-signed certificates inside the cluster
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

logger = logging.getLogger("krr")


@app.command(rich_help_panel="Utils")
def version() -> None:
    typer.echo(get_version())


def __process_type(_T: type) -> type:
    """Process type to a python literal"""
    if _T in (int, float, str, bool, datetime, UUID):
        return _T
    elif _T is Optional:
        return Optional[{__process_type(_T.__args__[0])}]  # type: ignore
    else:
        return str  # If the type is unknown, just use str and let pydantic handle it


def load_commands() -> None:
    for strategy_name, strategy_type in BaseStrategy.get_all().items():  # type: ignore
        # NOTE: This wrapper here is needed to avoid the strategy_name being overwritten in the loop
        def strategy_wrapper(_strategy_name: str = strategy_name):
            def run_strategy(
                ctx: typer.Context,
                kubeconfig: Optional[str] = typer.Option(
                    None,
                    "--kubeconfig",
                    "-k",
                    help="Path to kubeconfig file. If not provided, will attempt to find it.",
                    rich_help_panel="Kubernetes Settings",
                ),
                clusters: List[str] = typer.Option(
                    None,
                    "--context",
                    "--cluster",
                    "-c",
                    help="List of clusters to run on. By default, will run on the current cluster. Use --all-clusters to run on all clusters.",
                    rich_help_panel="Kubernetes Settings",
                ),
                all_clusters: bool = typer.Option(
                    False,
                    "--all-clusters",
                    help="Run on all clusters. Overrides --context.",
                    rich_help_panel="Kubernetes Settings",
                ),
                namespaces: List[str] = typer.Option(
                    None,
                    "--namespace",
                    "-n",
                    help="List of namespaces to run on. By default, will run on all namespaces except 'kube-system'.",
                    rich_help_panel="Kubernetes Settings",
                ),
                resources: List[str] = typer.Option(
                    None,
                    "--resource",
                    "-r",
                    help="List of resources to run on (Deployment, StatefulSet, DaemonSet, Job, Rollout). By default, will run on all resources. Case insensitive.",
                    rich_help_panel="Kubernetes Settings",
                ),
                selector: Optional[str] = typer.Option(
                    None,
                    "--selector",
                    "-s",
                    help="Selector (label query) to filter on, supports '=', '==', and '!='.(e.g. -s key1=value1,key2=value2). Matching objects must satisfy all of the specified label constraints.",
                    rich_help_panel="Kubernetes Settings",
                ),
                prometheus_url: Optional[str] = typer.Option(
                    None,
                    "--prometheus-url",
                    "-p",
                    help="Prometheus URL. If not provided, will attempt to find it in kubernetes cluster",
                    rich_help_panel="Prometheus Settings",
                ),
                prometheus_auth_header: Optional[str] = typer.Option(
                    None,
                    "--prometheus-auth-header",
                    help="Prometheus authentication header.",
                    rich_help_panel="Prometheus Settings",
                ),
                prometheus_other_headers: Optional[List[str]] = typer.Option(
                    None,
                    "--prometheus-headers",
                    "-H",
                    help="Additional headers to add to Prometheus requests. Format as 'key: value', for example 'X-MyHeader: 123'. Trailing whitespaces will be stripped.",
                    rich_help_panel="Prometheus Settings",
                ),
                prometheus_ssl_enabled: bool = typer.Option(
                    False,
                    "--prometheus-ssl-enabled",
                    help="Enable SSL for Prometheus requests.",
                    rich_help_panel="Prometheus Settings",
                ),
                prometheus_cluster_label: Optional[str] = typer.Option(
                    None,
                    "--prometheus-cluster-label",
                    "-l",
                    help="The label in prometheus for your cluster.(Only relevant for centralized prometheus)",
                    rich_help_panel="Prometheus Settings",
                ),
                prometheus_label: str = typer.Option(
                    None,
                    "--prometheus-label",
                    help="The label in prometheus used to differentiate clusters. (Only relevant for centralized prometheus)",
                    rich_help_panel="Prometheus Settings",
                ),
                eks_managed_prom: bool = typer.Option(
                    False,
                    "--eks-managed-prom",
                    help="Adds additional signitures for eks prometheus connection.",
                    rich_help_panel="Prometheus EKS Settings",
                ),
                eks_managed_prom_profile_name: Optional[str] = typer.Option(
                    None,
                    "--eks-profile-name",
                    help="Sets the profile name for eks prometheus connection.",
                    rich_help_panel="Prometheus EKS Settings",
                ),
                eks_access_key: Optional[str] = typer.Option(
                    None,
                    "--eks-access-key",
                    help="Sets the access key for eks prometheus connection.",
                    rich_help_panel="Prometheus EKS Settings",
                ),
                eks_secret_key: Optional[str] = typer.Option(
                    None,
                    "--eks-secret-key",
                    help="Sets the secret key for eks prometheus connection.",
                    rich_help_panel="Prometheus EKS Settings",
                ),
                eks_service_name: Optional[str] = typer.Option(
                    "aps",
                    "--eks-service-name",
                    help="Sets the service name for eks prometheus connection.",
                    rich_help_panel="Prometheus EKS Settings",
                ),
                eks_managed_prom_region: Optional[str] = typer.Option(
                    None,
                    "--eks-managed-prom-region",
                    help="Sets the region for eks prometheus connection.",
                    rich_help_panel="Prometheus EKS Settings",
                ),
                coralogix_token: Optional[str] = typer.Option(
                    None,
                    "--coralogix-token",
                    help="Adds the token needed to query Coralogix managed prometheus.",
                    rich_help_panel="Prometheus Coralogix Settings",
                ),
                openshift: bool = typer.Option(
                    False,
                    "--openshift",
                    help="Used when running by Robusta inside an OpenShift cluster.",
                    rich_help_panel="Prometheus Openshift Settings",
                    hidden=True,
                ),
                cpu_min_value: int = typer.Option(
                    10,
                    "--cpu-min",
                    help="Sets the minimum recommended cpu value in millicores.",
                    rich_help_panel="Recommendation Settings",
                ),
                memory_min_value: int = typer.Option(
                    100,
                    "--mem-min",
                    help="Sets the minimum recommended memory value in MB.",
                    rich_help_panel="Recommendation Settings",
                ),
                max_workers: int = typer.Option(
                    10,
                    "--max-workers",
                    "-w",
                    help="Max workers to use for async requests.",
                    rich_help_panel="Threading Settings",
                ),
                format: str = typer.Option(
                    "table",
                    "--formatter",
                    "-f",
                    help=f"Output formatter ({', '.join(formatters.list_available())})",
                    rich_help_panel="Logging Settings",
                ),
                show_cluster_name: bool = typer.Option(
                    False, "--show-cluster-name", help="In table output, always show the cluster name even for a single cluster", rich_help_panel="Output Settings"
                ),
                verbose: bool = typer.Option(
                    False, "--verbose", "-v", help="Enable verbose mode", rich_help_panel="Logging Settings"
                ),
                quiet: bool = typer.Option(
                    False, "--quiet", "-q", help="Enable quiet mode", rich_help_panel="Logging Settings"
                ),
                log_to_stderr: bool = typer.Option(
                    False, "--logtostderr", help="Pass logs to stderr", rich_help_panel="Logging Settings"
                ),
                width: Optional[int] = typer.Option(
                    None,
                    "--width",
                    help="Width of the output. Will use console width by default.",
                    rich_help_panel="Logging Settings",
                ),
                file_output: Optional[str] = typer.Option(
                    None, "--fileoutput", help="Print the output to a file", rich_help_panel="Output Settings"
                ),
                slack_output: Optional[str] = typer.Option(
                    None,
                    "--slackoutput",
                    help="Send to output to a slack channel, must have SLACK_BOT_TOKEN",
                    rich_help_panel="Output Settings",
                ),
                **strategy_args,
            ) -> None:
                f"""Run KRR using the `{_strategy_name}` strategy"""

                try:
                    config = Config(
                        kubeconfig=kubeconfig,
                        clusters="*" if all_clusters else clusters,
                        namespaces="*" if "*" in namespaces else namespaces,
                        resources="*" if "*" in resources else resources,
                        selector=selector,
                        prometheus_url=prometheus_url,
                        prometheus_auth_header=prometheus_auth_header,
                        prometheus_other_headers=prometheus_other_headers,
                        prometheus_ssl_enabled=prometheus_ssl_enabled,
                        prometheus_cluster_label=prometheus_cluster_label,
                        prometheus_label=prometheus_label,
                        eks_managed_prom=eks_managed_prom,
                        eks_managed_prom_region=eks_managed_prom_region,
                        eks_managed_prom_profile_name=eks_managed_prom_profile_name,
                        eks_access_key=eks_access_key,
                        eks_secret_key=eks_secret_key,
                        eks_service_name=eks_service_name,
                        coralogix_token=coralogix_token,
                        openshift=openshift,
                        max_workers=max_workers,
                        format=format,
                        show_cluster_name=show_cluster_name,
                        verbose=verbose,
                        cpu_min_value=cpu_min_value,
                        memory_min_value=memory_min_value,
                        quiet=quiet,
                        log_to_stderr=log_to_stderr,
                        width=width,
                        file_output=file_output,
                        slack_output=slack_output,
                        strategy=_strategy_name,
                        other_args=strategy_args,
                    )
                    Config.set_config(config)
                except ValidationError:
                    logger.exception("Error occured while parsing arguments")
                else:
                    runner = Runner()
                    asyncio.run(runner.run())

            run_strategy.__name__ = strategy_name
            signature = inspect.signature(run_strategy)
            run_strategy.__signature__ = signature.replace(  # type: ignore
                parameters=list(signature.parameters.values())[:-1]
                + [
                    inspect.Parameter(
                        name=field_name,
                        kind=inspect.Parameter.KEYWORD_ONLY,
                        default=OptionInfo(
                            default=field_meta.default,
                            param_decls=list(set([f"--{field_name}", f"--{field_name.replace('_', '-')}"])),
                            help=f"{field_meta.field_info.description}",
                            rich_help_panel="Strategy Settings",
                        ),
                        annotation=__process_type(field_meta.type_),
                    )
                    for field_name, field_meta in strategy_type.get_settings_type().__fields__.items()
                ]
            )

            app.command(rich_help_panel="Strategies")(run_strategy)

        strategy_wrapper()


def run() -> None:
    load_commands()
    app()


if __name__ == "__main__":
    run()