summaryrefslogtreecommitdiff
path: root/robusta_krr/core/models/objects.py
blob: 21289036315e8fcaee6abce5ca36aa1d6d3c0b8b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from __future__ import annotations

from typing import Literal, Optional

import pydantic as pd

from robusta_krr.core.models.allocations import ResourceAllocations
from robusta_krr.utils.batched import batched
from kubernetes.client.models import V1LabelSelector

# Workload kinds KRR can scan: the built-in Kubernetes controllers plus
# Argo "Rollout" and OpenShift "DeploymentConfig".
KindLiteral = Literal["Deployment", "DaemonSet", "StatefulSet", "Job", "CronJob", "Rollout", "DeploymentConfig"]


class PodData(pd.BaseModel):
    """A single pod observed for a workload."""

    name: str  # pod name; sole input to __hash__
    deleted: bool  # True when the pod no longer exists in the cluster

    def __hash__(self) -> int:
        # Hash by name only. Pydantic's generated __eq__ compares all fields,
        # and equal pods share a name, so equal objects still hash equally.
        return hash(self.name)


class HPAKey(pd.BaseModel):
    """Immutable identity of the workload an HPA targets."""

    namespace: str
    kind: str
    name: str

    class Config:
        # Pydantic v1 way of freezing the model, so instances are safe
        # to use as dict keys / set members together with __hash__ below.
        allow_mutation = False

    def __hash__(self) -> int:
        return hash((self.namespace, self.kind, self.name))


class HPAData(pd.BaseModel):
    """Scaling settings of a HorizontalPodAutoscaler attached to a workload."""

    min_replicas: Optional[int]  # None when the HPA does not set a lower bound
    max_replicas: int
    target_cpu_utilization_percentage: Optional[float]
    target_memory_utilization_percentage: Optional[float]


# Warnings attached to a workload when Prometheus data is missing for its pods.
PodWarning = Literal[
    "NoPrometheusPods",
    "NoPrometheusCPUMetrics",
    "NoPrometheusMemoryMetrics",
]


class K8sWorkload(pd.BaseModel):
    """One container of a Kubernetes workload, with its pods and allocations.

    Identity is (kind, namespace, name, container) — see ``__str__``/``__hash__``.
    """

    # NOTE: Here None means that we are running inside the cluster
    cluster: Optional[str]
    name: str
    container: str
    pods: list[PodData] = []
    hpa: Optional[HPAData]
    namespace: str
    kind: KindLiteral
    allocations: ResourceAllocations
    warnings: set[PodWarning] = set()

    # Raw Kubernetes API object for this workload (e.g. a V1Deployment);
    # assigned externally after construction and required by `selector`.
    _api_resource = pd.PrivateAttr(None)

    def __str__(self) -> str:
        return f"{self.kind} {self.namespace}/{self.name}/{self.container}"

    def __repr__(self) -> str:
        return f"<K8sWorkload {self}>"

    def __hash__(self) -> int:
        # Hash on the human-readable identity, so batched copies produced by
        # `split_into_batches` hash identically to the original object.
        return hash(str(self))

    def add_warning(self, warning: PodWarning) -> None:
        """Record a warning on this workload (stored in a set, so idempotent)."""
        self.warnings.add(warning)

    @property
    def cluster_selector(self) -> str:
        """Prometheus selector fragment for this workload's cluster.

        Returns ``'<cluster_label>="<value>",'`` (note the trailing comma) or
        an empty string when no cluster label applies.
        """
        # Imported lazily to avoid a circular import with the config module.
        from robusta_krr.core.models.config import settings

        if settings.prometheus_label is not None:
            return f'{settings.prometheus_cluster_label}="{settings.prometheus_label}",'

        if settings.prometheus_cluster_label is None:
            return ""

        return f'{settings.prometheus_cluster_label}="{self.cluster}",' if self.cluster else ""

    @property
    def current_pods_count(self) -> int:
        """Number of pods that still exist (not marked deleted)."""
        return sum(1 for pod in self.pods if not pod.deleted)

    @property
    def deleted_pods_count(self) -> int:
        """Number of pods that have been deleted."""
        return sum(1 for pod in self.pods if pod.deleted)

    @property
    def pods_count(self) -> int:
        """Total number of pods, deleted or not."""
        return len(self.pods)

    @property
    def selector(self) -> V1LabelSelector:
        """Label selector of the underlying API resource.

        Raises:
            ValueError: if the private API resource has not been set yet.
        """
        if self._api_resource is None:
            raise ValueError("api_resource is not set")

        # CronJobs keep the selector one level deeper, on the job template.
        if self.kind == 'CronJob':
            return self._api_resource.spec.job_template.spec.selector
        else:
            return self._api_resource.spec.selector

    def split_into_batches(self, n: int) -> list[K8sWorkload]:
        """Split this object into copies whose pod lists hold at most ``n`` pods each.

        Returns ``[self]`` unchanged when the pod list already fits in one batch.

        Bug fix: copies now carry over ``warnings`` and the private API
        resource; previously both were dropped, so ``selector`` raised on any
        batched copy and accumulated warnings were lost.
        """
        if self.pods_count <= n:
            return [self]

        batches: list[K8sWorkload] = []
        for batch in batched(self.pods, n):
            clone = K8sWorkload(
                cluster=self.cluster,
                name=self.name,
                container=self.container,
                pods=list(batch),
                hpa=self.hpa,
                namespace=self.namespace,
                kind=self.kind,
                allocations=self.allocations,
                # Copy so that later add_warning() calls on one batch do not
                # mutate a set shared with the original or sibling batches.
                warnings=set(self.warnings),
            )
            # Preserve the raw API object so `selector` keeps working on copies.
            clone._api_resource = self._api_resource
            batches.append(clone)
        return batches