1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
from __future__ import annotations
from typing import Literal, Optional
import pydantic as pd
from robusta_krr.core.models.allocations import ResourceAllocations
from robusta_krr.utils.batched import batched
from kubernetes.client.models import V1LabelSelector
# Workload kinds accepted by the `kind` field of K8sWorkload.
KindLiteral = Literal[
    "Deployment",
    "DaemonSet",
    "StatefulSet",
    "Job",
    "CronJob",
    "Rollout",
    "DeploymentConfig",
]
class PodData(pd.BaseModel):
    """A pod referenced by name, together with a flag telling whether it was deleted."""

    name: str
    deleted: bool

    def __hash__(self) -> int:
        """Hash on the pod name only; the `deleted` flag does not take part."""
        pod_name = self.name
        return hash(pod_name)
class HPAKey(pd.BaseModel):
    """Hashable (namespace, kind, name) triple; its Config sets `allow_mutation = False`."""

    namespace: str
    kind: str
    name: str

    class Config:
        allow_mutation = False

    def __hash__(self) -> int:
        """Hash on the (namespace, kind, name) tuple."""
        identity = (self.namespace, self.kind, self.name)
        return hash(identity)
class HPAData(pd.BaseModel):
    """Replica bounds and utilization targets attached to a workload via its `hpa` field.

    NOTE(review): "HPA" presumably stands for Horizontal Pod Autoscaler — confirm.
    """

    # Lower replica bound; may be None.
    min_replicas: Optional[int]
    # Upper replica bound.
    max_replicas: int
    # Utilization targets in percent; each may be None.
    target_cpu_utilization_percentage: Optional[float]
    target_memory_utilization_percentage: Optional[float]
# Warning codes that can be stored in K8sWorkload.warnings (see K8sWorkload.add_warning).
PodWarning = Literal[
    "NoPrometheusPods",
    "NoPrometheusCPUMetrics",
    "NoPrometheusMemoryMetrics",
]
class K8sWorkload(pd.BaseModel):
    """One container of a Kubernetes workload, with its pods, allocations and warnings.

    A workload is identified (for ``str()`` and hashing) by its kind, namespace,
    name and container. The raw API object backing the workload can be stored in
    the private ``_api_resource`` attribute; it is required by ``selector``.
    """

    # NOTE: Here None means that we are running inside the cluster
    cluster: Optional[str]
    name: str
    container: str
    # NOTE(review): mutable literals as defaults rely on pydantic copying field
    # defaults per instance — confirm for the pydantic version in use.
    pods: list[PodData] = []
    hpa: Optional[HPAData]
    namespace: str
    kind: KindLiteral
    allocations: ResourceAllocations
    warnings: set[PodWarning] = set()

    # Raw API object for this workload; not a model field, None until assigned.
    _api_resource = pd.PrivateAttr(None)

    def __str__(self) -> str:
        """Return ``"<kind> <namespace>/<name>/<container>"``."""
        return f"{self.kind} {self.namespace}/{self.name}/{self.container}"

    def __repr__(self) -> str:
        """Return the ``str()`` form wrapped as ``<K8sWorkload ...>``."""
        return f"<K8sWorkload {self}>"

    def __hash__(self) -> int:
        """Hash on the ``str()`` form, i.e. kind, namespace, name and container."""
        return hash(str(self))

    def add_warning(self, warning: PodWarning) -> None:
        """Record ``warning`` in this workload's warning set."""
        self.warnings.add(warning)

    @property
    def cluster_selector(self) -> str:
        """Return a ``label="value",`` fragment selecting this workload's cluster.

        The fragment ends with a trailing comma, or is the empty string when no
        cluster label applies. An explicit ``settings.prometheus_label`` takes
        precedence over ``self.cluster``.
        """
        # Imported locally; presumably to avoid a circular import — TODO confirm.
        from robusta_krr.core.models.config import settings

        # NOTE(review): if prometheus_label is set while prometheus_cluster_label
        # is None, this produces a `None="..."` fragment — confirm that settings
        # validation rules this combination out.
        if settings.prometheus_label is not None:
            return f'{settings.prometheus_cluster_label}="{settings.prometheus_label}",'

        if settings.prometheus_cluster_label is None:
            return ""

        return f'{settings.prometheus_cluster_label}="{self.cluster}",' if self.cluster else ""

    @property
    def current_pods_count(self) -> int:
        """Number of pods that are not flagged as deleted."""
        return sum(1 for pod in self.pods if not pod.deleted)

    @property
    def deleted_pods_count(self) -> int:
        """Number of pods that are flagged as deleted."""
        return sum(1 for pod in self.pods if pod.deleted)

    @property
    def pods_count(self) -> int:
        """Total number of pods, deleted or not."""
        return len(self.pods)

    @property
    def selector(self) -> V1LabelSelector:
        """Return the label selector stored on the underlying API resource.

        Raises:
            ValueError: if ``_api_resource`` has not been set.
        """
        if self._api_resource is None:
            raise ValueError("api_resource is not set")

        # For a CronJob the selector is read from the nested job template spec.
        if self.kind == "CronJob":
            return self._api_resource.spec.job_template.spec.selector
        return self._api_resource.spec.selector

    def split_into_batches(self, n: int) -> list[K8sWorkload]:
        """
        Split this workload into workloads whose pod lists come from batches of size ``n``.

        If the workload has at most ``n`` pods, ``[self]`` is returned unchanged.
        Otherwise one new workload is created per batch produced by
        ``batched(self.pods, n)``. Each new workload shares this workload's
        identity, HPA data, allocations and API resource, and starts with a copy
        of the current warnings.
        """
        if self.pods_count <= n:
            return [self]

        workloads: list[K8sWorkload] = []
        for batch in batched(self.pods, n):
            workload = K8sWorkload(
                cluster=self.cluster,
                name=self.name,
                container=self.container,
                pods=batch,
                hpa=self.hpa,
                namespace=self.namespace,
                kind=self.kind,
                allocations=self.allocations,
                # Copy so that each batch owns its own warning set.
                warnings=set(self.warnings),
            )
            # Private attributes are not constructor arguments; carry the API
            # resource over explicitly so `selector` keeps working on batches,
            # just as it does on the `[self]` path above.
            workload._api_resource = self._api_resource
            workloads.append(workload)
        return workloads
|