forked from zakkg3/ClusterSecret
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkubernetes_utils.py
354 lines (299 loc) · 11.4 KB
/
kubernetes_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
import logging
from datetime import datetime
from typing import Optional, Dict, Any, List, Mapping, Tuple, Iterator
import re
import kopf
from kubernetes.client import CoreV1Api, CustomObjectsApi, exceptions, V1ObjectMeta, rest, V1Secret
from os_utils import get_blocked_labels, get_replace_existing, get_version
from consts import CREATE_BY_ANNOTATION, LAST_SYNC_ANNOTATION, VERSION_ANNOTATION, BLOCKED_ANNOTATIONS, \
CREATE_BY_AUTHOR, CLUSTER_SECRET_LABEL
def patch_clustersecret_status(
logger: logging.Logger,
name: str,
new_status,
custom_objects_api: CustomObjectsApi,
):
"""Patch the status of a given clustersecret object
"""
group = 'clustersecret.io'
version = 'v1'
plural = 'clustersecrets'
# Retrieve the clustersecret object
clustersecret = custom_objects_api.get_cluster_custom_object(
group=group,
version=version,
plural=plural,
name=name,
)
# Update the status field
clustersecret['status'] = new_status
logger.debug(f'Updated clustersecret manifest: {clustersecret}')
# Perform a patch operation to update the custom resource
return custom_objects_api.patch_cluster_custom_object(
group=group,
version=version,
plural=plural,
name=name,
body=clustersecret,
)
def get_ns_list(
logger: logging.Logger,
body: Dict[str, Any],
v1: CoreV1Api,
) -> List[str]:
"""Returns a list of namespaces where the secret should be matched
"""
# Get matchNamespace or default to all
match_namespace = body.get('matchNamespace', ['.*'])
# Get avoidNamespaces or default to None
avoid_namespaces = body.get('avoidNamespaces', None)
# Collect all namespaces names
nss = [ns.metadata.name for ns in v1.list_namespace().items]
matched_ns = []
avoided_ns = []
# Iterate over all matchNamespace
for match_ns in match_namespace:
matched_ns.extend([ns for ns in nss if re.match(match_ns, ns)])
logger.debug(f'Matched namespaces: {", ".join(matched_ns)} match pattern: {match_ns}')
# If avoidNamespaces is None simply return our matched list
if not avoid_namespaces:
return matched_ns
# Iterate over all avoidNamespaces
for avoid_ns in avoid_namespaces:
avoided_ns.extend([ns for ns in nss if re.match(avoid_ns, ns)])
logger.debug(f'Skipping namespaces: {", ".join(avoided_ns)} avoid pattern: {avoid_ns}')
return list(set(matched_ns) - set(avoided_ns))
def read_data_secret(
logger: logging.Logger,
name: str,
namespace: str,
v1: CoreV1Api,
) -> Dict[str, str]:
"""Gets the data from the 'name' secret in namespace
"""
data = {}
logger.debug(f'Reading {name} from ns {namespace}')
try:
secret = v1.read_namespaced_secret(name, namespace)
logger.debug(f'Obtained secret {secret}')
data = secret.data
except exceptions.ApiException as e:
logger.error('Error reading secret')
logger.debug(f'error: {e}')
if e == '404':
logger.error(f'Secret {name} in ns {namespace} not found.')
raise kopf.TemporaryError('Error reading secret')
return data
def delete_secret(
logger: logging.Logger,
namespace: str,
name: str,
v1: CoreV1Api,
):
"""Deletes a given secret from a given namespace
"""
logger.info(f'deleting secret {name} from namespace {namespace}')
try:
v1.delete_namespaced_secret(name, namespace)
except rest.ApiException as e:
if e.status == 404:
logger.warning(f'The namespace {namespace} may not exist anymore: Not found')
else:
logger.warning('Something weird deleting the secret')
logger.debug(f'details: {e}')
def secret_exists(
logger: logging.Logger,
name: str,
namespace: str,
v1: CoreV1Api,
):
return secret_metadata(
logger=logger,
name=name,
namespace=namespace,
v1=v1,
) is not None
def secret_metadata(
logger: logging.Logger,
name: str,
namespace: str,
v1: CoreV1Api,
) -> Optional[V1ObjectMeta]:
try:
secret = v1.read_namespaced_secret(name, namespace)
return secret.metadata
except exceptions.ApiException as e:
if e.status == 404:
return None
logger.warning(f'Cannot read the secret {e}.')
raise kopf.TemporaryError(f'Error reading secret {e}')
def sync_secret(
logger: logging.Logger,
namespace: str,
body: Dict[str, Any],
v1: CoreV1Api,
):
"""Creates a given secret on a given namespace
"""
if 'metadata' not in body:
raise kopf.TemporaryError('Metadata is required.')
if 'name' not in body['metadata']:
raise kopf.TemporaryError('Property name is missing in metadata.')
cs_metadata: Dict[str, Any] = body.get('metadata')
sec_name = cs_metadata.get('name')
annotations = cs_metadata.get('annotations', None)
labels = cs_metadata.get('labels', None)
if 'data' not in body:
raise kopf.TemporaryError('Property data is missing.')
data: Dict[str, Any] = body['data']
if 'valueFrom' in data:
if len(data.keys()) > 1:
logger.error('Data keys with ValueFrom error, enable debug for more details')
logger.debug(f'keys: {data.keys()} len {len(data.keys())}')
raise kopf.TemporaryError('ValueFrom can not coexist with other keys in the data')
secret_key_ref: Dict[str, Any] = data.get('valueFrom', {}).get('secretKeyRef', {})
ns_from: str = secret_key_ref.get('namespace', None)
name_from: str = secret_key_ref.get('name', None)
keys: Optional[List[str]] = secret_key_ref.get('keys', None)
if ns_from is None or name_from is None:
logger.error('ERROR reading data from remote secret, enable debug for more details')
logger.debug(f'Deta details: {data}')
raise kopf.TemporaryError('Can not get Values from external secret')
# Filter the keys in data based on the keys list provided
raw_data = read_data_secret(logger, name_from, ns_from, v1)
if keys is not None:
data = {key: value for key, value in raw_data.items() if key in keys}
else:
data = raw_data
logger.debug(f'Going to create with data: {data}')
secret_type = body.get('type', 'Opaque')
body = V1Secret()
body.metadata = create_secret_metadata(
name=sec_name,
namespace=namespace,
annotations=annotations,
labels=labels,
)
body.type = secret_type
body.data = data
logger.info(f'cloning secret in namespace {namespace}')
logger.debug(f'V1Secret= {body}')
try:
# Get metadata from secrets (if exist)
metadata = secret_metadata(logger, name=sec_name, namespace=namespace, v1=v1)
# If nothing returned, the secret does not exist, creating it then
if metadata is None:
logger.info('Using create_namespaced_secret')
logger.debug(f'response is {v1.create_namespaced_secret(namespace, body)}')
return
if metadata.annotations is None:
logger.info(
f'secret `{sec_name}` exist but it does not have annotations, so is not managed by ClusterSecret',
)
# If we should not overwrite existing secrets
if not get_replace_existing():
logger.info(
f'secret `{sec_name}` will not be replaced. '
'You can enforce this by setting env REPLACE_EXISTING to true.',
)
return
elif metadata.annotations.get(CREATE_BY_ANNOTATION) is None:
logger.error(
f"secret `{sec_name}` already exist in namespace '{namespace}' and is not managed by ClusterSecret",
)
if not get_replace_existing():
logger.info(
f'secret `{sec_name}` will not be replaced. '
'You can enforce this by setting env REPLACE_EXISTING to true.',
)
return
logger.info(f'Replacing secret {sec_name}')
v1.replace_namespaced_secret(
name=sec_name,
namespace=namespace,
body=body,
)
except rest.ApiException as e:
logger.error('Can not create a secret, it is base64 encoded? enable debug for details')
logger.debug(f'data: {data}')
logger.debug(f'Kube exception {e}')
def create_secret_metadata(
name: str,
namespace: str,
annotations: Optional[Mapping[str, str]] = None,
labels: Optional[Mapping[str, str]] = None,
) -> V1ObjectMeta:
"""Create Kubernetes metadata objects.
Parameters
----------
name: str
The name of the Kubernetes secret.
namespace: str
The namespace where the secret will be place.
labels: Optional[Dict[str, str]]
The secret labels.
annotations: Optional[Dict[str, str]]
The secrets annotations.
Returns
-------
V1ObjectMeta
Kubernetes metadata object with ClusterSecret annotations.
"""
def filter_dict(
prefixes: List[str],
base: Dict[str, str],
source: Optional[Mapping[str, str]] = None
) -> Iterator[Tuple[str, str]]:
""" Remove potential useless / dangerous annotations and labels"""
for item in base.items():
yield item
if source is not None:
for item in source.items():
key, _ = item
if not any(key.startswith(prefix) for prefix in prefixes):
yield item
base_labels = {
CLUSTER_SECRET_LABEL: 'true'
}
base_annotations = {
CREATE_BY_ANNOTATION: CREATE_BY_AUTHOR,
VERSION_ANNOTATION: get_version(),
LAST_SYNC_ANNOTATION: datetime.now().isoformat(),
}
_annotations = filter_dict(BLOCKED_ANNOTATIONS, base_annotations, annotations)
_labels = filter_dict(get_blocked_labels(), base_labels, labels)
return V1ObjectMeta(
name=name,
namespace=namespace,
annotations=dict(_annotations),
labels=dict(_labels),
)
def get_custom_objects_by_kind(
group: str,
version: str,
plural: str,
custom_objects_api: CustomObjectsApi,
) -> List[dict]:
"""
Retrieve all CustomObjectsApi objects across all namespaces based on the provided group, version, and kind.
Args:
group (str): The API group of the custom object.
version (str): The API version of the custom object.
plural (str): The plural of the custom object.
custom_objects_api (CustomObjectsApi): The Kubernetes CustomObjectsApi.
Returns:
List[dict]: A list of custom objects (in dict format) matching the provided group, version, and plural.
Raises:
ApiException: If there is an issue communicating with the Kubernetes API server.
"""
try:
# Retrieve all custom objects matching the group, version, and kind
custom_objects = custom_objects_api.list_cluster_custom_object(
group=group,
version=version,
plural=plural,
)
return custom_objects['items']
except rest.ApiException as e:
# Properly handle API exceptions
raise rest.ApiException(f'Error while retrieving custom objects: {e}')