summary refs log tree commit diff
path: root/robusta_krr/core/models/config.py
blob: 4f8b50664e2b9ce2ddff4a09ae95b74bb33417c6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
from __future__ import annotations

import logging
from enum import Enum
import sys
from typing import Any, Literal, Optional, Union

import pydantic as pd
from kubernetes import config, client
from kubernetes.config.config_exception import ConfigException
from rich.console import Console
from rich.logging import RichHandler

from robusta_krr.core.abstract import formatters
from robusta_krr.core.abstract.strategies import AnyStrategy, BaseStrategy
from robusta_krr.core.abstract.cluster_loader import BaseClusterLoader
from robusta_krr.core.models.objects import KindLiteral

# Logger used throughout this module; registered under the fixed name "krr".
logger = logging.getLogger("krr")


class LoadingMode(str, Enum):
    """Selects which cluster loader is used to discover workloads.

    Members are also plain strings (``str`` mixin), so they compare equal to
    their values, e.g. ``LoadingMode.KUBEAPI == "kubeapi"``.
    """

    KUBEAPI = "kubeapi"
    # NOTE: the misspelled name is kept as the canonical member so that
    # existing references (and `.name`) keep working unchanged.
    PROMETHETUS = "prometheus"
    # Correctly spelled alias. It shares the value above, so Enum makes it an
    # alias of PROMETHETUS rather than a new member.
    PROMETHEUS = "prometheus"


class Config(pd.BaseSettings):
    """Settings model for a krr run.

    Fields are validated by pydantic on construction. An instance becomes the
    module-wide configuration once it is passed to :meth:`set_config`.
    """

    quiet: bool = pd.Field(False)
    verbose: bool = pd.Field(False)

    clusters: Union[list[str], Literal["*"], None] = None
    kubeconfig: Optional[str] = None
    mode: LoadingMode = pd.Field(LoadingMode.KUBEAPI)
    impersonate_user: Optional[str] = None
    impersonate_group: Optional[str] = None
    namespaces: Union[list[str], Literal["*"]] = pd.Field("*")
    resources: Union[list[KindLiteral], Literal["*"]] = pd.Field("*")
    selector: Optional[str] = None

    # Value settings
    cpu_min_value: int = pd.Field(10, ge=0)  # in millicores
    memory_min_value: int = pd.Field(100, ge=0)  # in megabytes

    # Prometheus Settings
    prometheus_url: Optional[str] = pd.Field(None)
    prometheus_auth_header: Optional[str] = pd.Field(None)
    prometheus_other_headers: dict[str, str] = pd.Field(default_factory=dict)
    prometheus_ssl_enabled: bool = pd.Field(False)
    prometheus_cluster_label: Optional[str] = pd.Field(None)
    prometheus_label: Optional[str] = pd.Field(None)
    eks_managed_prom: bool = pd.Field(False)
    eks_managed_prom_profile_name: Optional[str] = pd.Field(None)
    eks_access_key: Optional[str] = pd.Field(None)
    eks_secret_key: Optional[str] = pd.Field(None)
    eks_service_name: Optional[str] = pd.Field(None)
    eks_managed_prom_region: Optional[str] = pd.Field(None)
    coralogix_token: Optional[str] = pd.Field(None)
    openshift: bool = pd.Field(False)

    # Threading settings
    max_workers: int = pd.Field(6, ge=1)

    # Logging Settings
    format: str
    show_cluster_name: bool
    strategy: str
    log_to_stderr: bool
    width: Optional[int] = pd.Field(None, ge=1)

    # Outputs Settings
    file_output: Optional[str] = pd.Field(None)
    slack_output: Optional[str] = pd.Field(None)

    # Extra keyword arguments; forwarded to the strategy settings in create_strategy()
    other_args: dict[str, Any]

    # Internal
    inside_cluster: bool = False
    # Lazily created by the `logging_console` property
    _logging_console: Optional[Console] = pd.PrivateAttr(None)

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)

    @property
    def Formatter(self) -> formatters.FormatterFunc:
        """Formatter looked up by the configured ``format`` name."""
        return formatters.find(self.format)

    @pd.validator("prometheus_url")
    def validate_prometheus_url(cls, v: Optional[str]) -> Optional[str]:
        """Require an http(s) scheme and drop a single trailing slash.

        Raises:
            ValueError: If the URL does not start with ``http://`` or ``https://``.
        """
        if v is None:
            return None

        if not v.startswith(("https://", "http://")):
            # Validators must raise ValueError/TypeError/AssertionError for
            # pydantic to report the failure as a ValidationError.
            raise ValueError("--prometheus-url must start with https:// or http://")

        return v.removesuffix("/")

    @pd.validator("prometheus_other_headers", pre=True)
    def validate_prometheus_other_headers(cls, headers: Union[list[str], dict[str, str]]) -> dict[str, str]:
        """Turn a list of ``"Name: value"`` strings into a header dict.

        A dict is returned unchanged. Header names are stripped and lowercased,
        values are stripped.

        Raises:
            ValueError: If an entry contains no ``:`` separator.
        """
        if isinstance(headers, dict):
            return headers

        parsed: dict[str, str] = {}
        for header in headers:
            # Split on the FIRST colon only: values may legitimately contain
            # colons themselves (URLs, host:port pairs, timestamps, ...).
            name, separator, value = header.partition(":")
            if not separator:
                raise ValueError(f"Invalid header {header!r}: expected the form 'name: value'")
            parsed[name.strip().lower()] = value.strip()
        return parsed

    @pd.validator("namespaces")
    def validate_namespaces(cls, v: Union[list[str], Literal["*"]]) -> Union[list[str], Literal["*"]]:
        """Lowercase the namespace names; an empty list means "all" (``"*"``)."""
        # Keep the wildcard as the literal "*"; iterating over the string
        # would otherwise turn it into the list ["*"].
        if v == "*" or v == []:
            return "*"

        return [val.lower() for val in v]

    @pd.validator("resources", pre=True)
    def validate_resources(cls, v: Union[list[str], Literal["*"]]) -> Union[list[str], Literal["*"]]:
        """Match resource kinds case-insensitively against ``KindLiteral``.

        An empty list means "all" (``"*"``).

        Raises:
            ValueError: If a value matches none of the ``KindLiteral`` options.
        """
        if v == "*" or v == []:
            return "*"

        # NOTE: KindLiteral.__args__ is a tuple of all possible values of KindLiteral
        # So this will preserve the big and small letters of the resource
        resolved = []
        for val in v:
            match = next((kind for kind in KindLiteral.__args__ if kind.lower() == val.lower()), None)
            if match is None:
                # An explicit error instead of a bare StopIteration leaking out.
                raise ValueError(
                    f"Unknown resource kind {val!r}; expected one of: {', '.join(KindLiteral.__args__)}"
                )
            resolved.append(match)
        return resolved

    def create_strategy(self) -> AnyStrategy:
        """Instantiate the configured strategy, building its settings from ``other_args``."""
        StrategyType = AnyStrategy.find(self.strategy)
        StrategySettingsType = StrategyType.get_settings_type()
        return StrategyType(StrategySettingsType(**self.other_args))  # type: ignore

    @pd.validator("strategy")
    def validate_strategy(cls, v: str) -> str:
        """Check that the strategy name can be resolved; the value is returned unchanged."""
        BaseStrategy.find(v)  # NOTE: raises if strategy is not found
        return v

    @pd.validator("format")
    def validate_format(cls, v: str) -> str:
        """Check that the formatter name can be resolved; the value is returned unchanged."""
        formatters.find(v)  # NOTE: raises if formatter is not found
        return v

    @property
    def context(self) -> Optional[str]:
        """First explicitly listed cluster, or None for ``"*"`` / empty / unset clusters."""
        if self.clusters and self.clusters != "*":
            return self.clusters[0]
        return None

    @property
    def logging_console(self) -> Console:
        """Console used for logging output; created on first access.

        Writes to stderr when ``log_to_stderr`` is set, otherwise to stdout.
        """
        if self._logging_console is None:
            stream = sys.stderr if self.log_to_stderr else sys.stdout
            self._logging_console = Console(file=stream, width=self.width)
        return self._logging_console

    def create_cluster_loader(self) -> BaseClusterLoader:
        """Build the cluster loader that corresponds to ``self.mode``.

        Raises:
            NotImplementedError: If the mode has no loader implementation.
        """
        # Imported here rather than at module level (presumably to avoid a
        # circular import - TODO confirm).
        from robusta_krr.core.integrations.prometheus.cluster_loader import PrometheusClusterLoader
        from robusta_krr.core.integrations.kubernetes.cluster_loader import KubeAPIClusterLoader

        # Read the mode from this instance instead of the global `settings`
        # proxy, so the method also works on a Config that has not been
        # installed via set_config().
        mode = self.mode
        if mode == LoadingMode.KUBEAPI:
            logger.info("Connecting using Kubernetes API, will load the kubeconfig.")
            return KubeAPIClusterLoader()
        elif mode == LoadingMode.PROMETHETUS:
            logger.info("Connecting using Prometheus, will load the kubeconfig.")
            return PrometheusClusterLoader()
        else:
            raise NotImplementedError(f"Workload loader {mode} is not implemented")

    def load_kubeconfig(self) -> None:
        """Load the kubeconfig file, falling back to the in-cluster configuration.

        ``inside_cluster`` records which of the two succeeded.
        """
        try:
            config.load_kube_config(config_file=self.kubeconfig, context=self.context)
            self.inside_cluster = False
        except ConfigException:
            config.load_incluster_config()
            self.inside_cluster = True

    def get_kube_client(self, context: Optional[str] = None) -> Optional[client.ApiClient]:
        """Create an API client for ``context``.

        Returns:
            None when no context is given; otherwise a client built from the
            kubeconfig, with impersonation headers added when configured.
        """
        if context is None:
            return None

        api_client = config.new_client_from_config(context=context, config_file=self.kubeconfig)
        if self.impersonate_user is not None:
            # trick copied from https://github.com/kubernetes-client/python/issues/362
            api_client.set_default_header("Impersonate-User", self.impersonate_user)
        if self.impersonate_group is not None:
            api_client.set_default_header("Impersonate-Group", self.impersonate_group)
        return api_client

    @staticmethod
    def set_config(config: Config) -> None:
        """Install ``config`` as the module-wide configuration and set up logging.

        The "krr" logger level is DEBUG when ``verbose`` is set, CRITICAL when
        ``quiet`` is set, and INFO otherwise; the root logger is set to CRITICAL.
        """
        global _config

        _config = config
        logging.basicConfig(
            level="NOTSET",
            format="%(message)s",
            datefmt="[%X]",
            handlers=[RichHandler(console=config.logging_console)],
        )
        logging.getLogger("").setLevel(logging.CRITICAL)
        logger.setLevel(logging.DEBUG if config.verbose else logging.CRITICAL if config.quiet else logging.INFO)


# NOTE: `_Settings` is only a proxy in front of the module-level `_config`.
# Import `settings` from this module and use it as if it were the config object.
class _Settings(Config):  # Subclasses Config so type checkers see the same attributes
    def __init__(self) -> None:
        """Intentionally empty: the parent initializer is not called."""

    def __getattr__(self, name: str):
        """Forward the lookup of ``name`` to the currently installed config.

        Python only calls this hook when normal attribute lookup fails.

        Raises:
            AttributeError: If no config has been installed yet.
        """
        active = _config
        if active is not None:
            return getattr(active, name)

        raise AttributeError("Config is not set")


# Currently installed configuration; stays None until Config.set_config() assigns it.
_config: Optional[Config] = None
# Shared proxy object: attribute lookups it cannot resolve itself are forwarded to _config.
settings = _Settings()