|
1 | 1 | import time
|
2 |
| -from typing import Dict, Optional, List, Callable, Any |
| 2 | +from typing import Dict, Optional, List, Callable, Mapping, Any |
3 | 3 | from kubernetes import client, config
|
4 | 4 | from kubernetes.client import V1Secret, CoreV1Api, CustomObjectsApi
|
5 | 5 | from kubernetes.client.rest import ApiException
|
6 | 6 | from time import sleep
|
7 | 7 |
|
8 | 8 |
|
| 9 | +def is_subset(_set: Optional[Mapping[str, str]], _subset: Optional[Mapping[str, str]]) -> bool: |
| 10 | + if _set is None: |
| 11 | + return _subset is None |
| 12 | + |
| 13 | + for key, item in _subset.items(): |
| 14 | + if _set.get(key, None) != item: |
| 15 | + return False |
| 16 | + return True |
| 17 | + |
| 18 | + |
9 | 19 | def wait_for_pod_ready_with_events(pod_selector: dict, namespace: str, timeout_seconds: int = 300):
|
10 | 20 | """
|
11 | 21 | Wait for a pod to be ready in the specified namespace and print all events.
|
@@ -52,6 +62,7 @@ class ClusterSecretManager:
|
52 | 62 | def __init__(self, custom_objects_api: CustomObjectsApi, api_instance: CoreV1Api):
|
53 | 63 | self.custom_objects_api: CustomObjectsApi = custom_objects_api
|
54 | 64 | self.api_instance: CoreV1Api = api_instance
|
| 65 | + # immutable after |
55 | 66 | self.retry_attempts = 3
|
56 | 67 | self.retry_delay = 5
|
57 | 68 |
|
@@ -167,48 +178,79 @@ def get_kubernetes_secret(self, name: str, namespace: str) -> Optional[V1Secret]
|
167 | 178 | raise e
|
168 | 179 |
|
169 | 180 | def validate_namespace_secrets(
|
170 |
| - self, name: str, |
| 181 | + self, |
| 182 | + name: str, |
171 | 183 | data: Dict[str, str],
|
172 |
| - namespaces: Optional[List[str]] = None |
| 184 | + namespaces: Optional[List[str]] = None, |
| 185 | + labels: Optional[Dict[str, str]] = None, |
| 186 | + annotations: Optional[Dict[str, str]] = None, |
173 | 187 | ) -> bool:
|
174 | 188 | """
|
175 | 189 |
|
176 | 190 | Parameters
|
177 | 191 | ----------
|
178 |
| - name |
179 |
| - data |
| 192 | + name: str |
| 193 | + data: Dict[str, str] |
180 | 194 | namespaces: Optional[List[str]]
|
181 | 195 | If None, it means the secret should be present in ALL namespaces
|
| 196 | + annotations: Optional[Dict[str, str]] |
| 197 | + labels: Optional[Dict[str, str]] |
182 | 198 |
|
183 | 199 | Returns
|
184 | 200 | -------
|
185 | 201 |
|
186 | 202 | """
|
187 | 203 | all_namespaces = [item.metadata.name for item in self.api_instance.list_namespace().items]
|
188 | 204 |
|
189 |
| - def validate(): |
| 205 | + def validate() -> Optional[str]: |
190 | 206 | for namespace in all_namespaces:
|
191 | 207 |
|
192 | 208 | secret = self.get_kubernetes_secret(name=name, namespace=namespace)
|
193 | 209 |
|
194 | 210 | if namespaces is not None and namespace not in namespaces:
|
195 | 211 | if secret is None:
|
196 | 212 | continue
|
197 |
| - return False |
| 213 | + return f'' |
| 214 | + |
| 215 | + if secret is None: |
| 216 | + return f'secret {name} is none in namespace {namespace}.' |
| 217 | + |
| 218 | + if secret.data != data: |
| 219 | + return f'secret {name} data mismatch in namespace {namespace}.' |
| 220 | + |
| 221 | + if annotations is not None and not is_subset(secret.metadata.annotations, annotations): |
| 222 | + return f'secret {name} annotations mismatch in namespace {namespace}.' |
198 | 223 |
|
199 |
| - if secret is None or secret.data != data: |
200 |
| - return False |
| 224 | + if labels is not None and not is_subset(secret.metadata.labels, labels): |
| 225 | + return f'secret {name} labels mismatch in namespace {namespace}.' |
201 | 226 |
|
202 |
| - return True |
| 227 | + return None |
203 | 228 |
|
204 | 229 | return self.retry(validate)
|
205 | 230 |
|
206 |
| - def retry(self, f: Callable[[], bool]) -> bool: |
207 |
| - while self.retry_attempts > 0: |
208 |
| - if f(): |
| 231 | + def retry(self, f: Callable[[], Optional[str]]) -> bool: |
| 232 | + """ |
| 233 | + Utility function |
| 234 | + Parameters |
| 235 | + ---------- |
| 236 | + f |
| 237 | +
|
| 238 | + Returns |
| 239 | + ------- |
| 240 | +
|
| 241 | + """ |
| 242 | + retry: int = self.retry_attempts |
| 243 | + err: Optional[str] = None |
| 244 | + |
| 245 | + while retry > 0: |
| 246 | + err = f() |
| 247 | + if err is None: |
209 | 248 | return True
|
210 | 249 | sleep(self.retry_delay)
|
211 |
| - self.retry_attempts -= 1 |
| 250 | + retry -= 1 |
| 251 | + |
| 252 | + if err is not None: |
| 253 | + print(f"Retry attempts exhausted. Last error: {err}") |
212 | 254 | return False
|
213 | 255 |
|
214 | 256 | def cleanup(self):
|
|
0 commit comments