-
Notifications
You must be signed in to change notification settings - Fork 68
Expand file tree
/
Copy pathutils.py
More file actions
353 lines (304 loc) · 13.4 KB
/
utils.py
File metadata and controls
353 lines (304 loc) · 13.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
import importlib
import inspect
import logging
import re
from typing import Any
from benedict import benedict
from kubernetes.dynamic import DynamicClient
from kubernetes.dynamic.exceptions import ConflictError, ResourceNotFoundError
from ocp_resources.image_digest_mirror_set import ImageDigestMirrorSet
from ocp_resources.installplan import InstallPlan
from ocp_resources.machine_config_pool import MachineConfigPool
from ocp_resources.network_addons_config import NetworkAddonsConfig
from ocp_resources.node import Node
from ocp_resources.operator_condition import OperatorCondition
from ocp_resources.resource import Resource, ResourceEditor
from ocp_resources.subscription import Subscription
from timeout_sampler import TimeoutExpiredError, TimeoutSampler
from tests.install_upgrade_operators.constants import KEY_PATH_SEPARATOR
from utilities.constants import (
PRODUCTION_CATALOG_SOURCE,
TIMEOUT_1MIN,
TIMEOUT_5SEC,
TIMEOUT_10SEC,
TIMEOUT_30MIN,
TIMEOUT_40MIN,
)
from utilities.operator import wait_for_mcp_update_completion
LOGGER = logging.getLogger(__name__)
KONFLUX_IDMS_NAME = "zz-cnv-icsp-fallback"
KONFLUX_MIRROR_BASE_URL = "quay.io/openshift-virtualization/konflux-builds"
KONFLUX_IDMS_SOURCE = "registry.redhat.io/container-native-virtualization"
def wait_for_operator_condition(client, hco_namespace, name, upgradable):
LOGGER.info(f"Wait for the operator condition. Name:{name} Upgradable:{upgradable}")
samples = TimeoutSampler(
wait_timeout=TIMEOUT_30MIN,
sleep=TIMEOUT_10SEC,
func=OperatorCondition.get,
client=client,
namespace=hco_namespace,
name=name,
)
try:
for sample in samples:
for operator_condition in sample:
operator_spec_condition = operator_condition.instance.spec.conditions
if operator_spec_condition:
upgradeable_condition = next(
(condition for condition in operator_spec_condition if condition.type == "Upgradeable"),
None,
)
if upgradeable_condition is not None and upgradeable_condition.status == str(upgradable):
return operator_condition
else:
LOGGER.warning(f"Waiting for hco operator to update spec.conditions of OperatorCondition: {name}")
except TimeoutExpiredError:
LOGGER.error(f"timeout waiting for operator version: name={name}, upgradable:{upgradable}")
raise
def wait_for_install_plan(
client: DynamicClient,
hco_namespace: str,
hco_target_csv_name: str,
is_production_source: bool,
cnv_subscription: Subscription,
) -> Any:
"""Waits for the upgrade InstallPlan to be created and returns it.
Args:
client: Kubernetes dynamic client.
hco_namespace: HCO namespace name.
hco_target_csv_name: Expected target CSV name.
is_production_source: Whether upgrading from production source.
cnv_subscription: CNV subscription resource.
Returns:
The matching InstallPlan resource.
"""
LOGGER.info(f"Waiting for the upgrade install plan. hco_target_csv_name: {hco_target_csv_name}")
install_plan_sampler = TimeoutSampler(
wait_timeout=TIMEOUT_40MIN,
sleep=TIMEOUT_10SEC,
func=InstallPlan.get,
exceptions_dict={
ConflictError: [],
ResourceNotFoundError: [],
}, # Ignore ConflictError during install plan reconciliation
client=client,
hco_namespace=hco_namespace,
hco_target_version=hco_target_csv_name,
)
install_plan_name_in_subscription = None
try:
for install_plan_samples in install_plan_sampler:
# wait for the install plan to be created and updated in the subscription.
install_plan_name_in_subscription = getattr(cnv_subscription.instance.status.installplan, "name", None)
for ip in install_plan_samples:
# If we find a not-approved install plan that is associated with production catalogsource, we need
# to delete it. Deleting the install plan associated with production catalogsource, would cause
# install plan associated with custom catalog source to generate. Upgrade automation is supposed to
# upgrade cnv using custom catalogsource, to a specified version. Approving install plan associated
# with the production catalogsource would also lead to failure as production catalogsource has been
# disabled at this point.
if ip.exists:
ip_instance = ip.instance
if not is_production_source:
if (
not ip_instance.spec.approved
and ip_instance.status
and ip_instance.status.bundleLookups[0].get("catalogSourceRef").get("name")
== PRODUCTION_CATALOG_SOURCE
):
ip.clean_up()
continue
if (
hco_target_csv_name == ip_instance.spec.clusterServiceVersionNames[0]
and ip.name == install_plan_name_in_subscription
):
return ip
LOGGER.info(
f"Subscription: {cnv_subscription.name}, is associated with install plan:"
f" {install_plan_name_in_subscription}"
)
except TimeoutExpiredError:
LOGGER.error(
f"timeout waiting for target install plan: version={hco_target_csv_name}, "
f"subscription install plan: {install_plan_name_in_subscription}"
)
raise
def get_network_addon_config(admin_client):
"""
Gets NetworkAddonsConfig object
Args:
admin_client (DynamicClient): a DynamicClient object
Returns:
Generator of NetworkAddonsConfig: Generator of NetworkAddonsConfig
"""
for nao in NetworkAddonsConfig.get(client=admin_client, name="cluster"):
return nao
def wait_for_spec_change(expected, get_spec_func, base_path):
"""
Waits for spec values to get propagated
Args:
expected (dict): dictionary of values that would be used to update hco cr
get_spec_func (function): function to fetch current spec dictionary
base_path (list): list of associated keys for a given kind
"""
samplers = TimeoutSampler(
wait_timeout=TIMEOUT_1MIN,
sleep=TIMEOUT_5SEC,
func=lambda: benedict(get_spec_func(), keypath_separator=KEY_PATH_SEPARATOR),
)
current_value = None
try:
for current_spec in samplers:
current_value = current_spec.get(base_path)
if current_value and sorted(expected.items()) == sorted(current_value.items()):
LOGGER.info(
f"{get_function_name(function_name=get_spec_func)}: Found expected spec values: '{expected}'"
)
return True
except TimeoutExpiredError:
LOGGER.error(
f"{get_function_name(function_name=get_spec_func)}: Timed out waiting for CR with expected spec:"
f" '{expected}', current value:'{current_value}'"
)
raise
def get_function_name(function_name):
"""
Return the text of the source code for a function
Args:
function_name (function object): function object
Returns:
str: name of the function
"""
return inspect.getsource(function_name).split("(")[0].split(" ")[-1]
def get_resource_container_env_image_mismatch(container):
return [
env_dict
for env_dict in container.get("env", [])
if "image" in env_dict["name"].lower()
and env_dict.get("value")
and not re.match(
rf"NOT_AVAILABLE|{Resource.ApiGroup.IMAGE_REGISTRY}",
env_dict.get("value"),
)
]
def get_ocp_resource_module_name(related_object_kind, list_submodules):
"""
From a list of ocp_resources submodule, based on kubernetes 'kind' name pick the right module name
Args:
related_object_kind (str): Kubernetes kind name of a resource
list_submodules (list): list of ocp_resources submodule names
Returns:
str: Name of the ocp_resources submodule
Raises:
ModuleNotFoundError: if a module associated with related object kind is not found
"""
for module_name in list_submodules:
expected_module_name = module_name.replace("_", "")
if related_object_kind.lower() == expected_module_name:
return module_name
raise ModuleNotFoundError(f"{related_object_kind} module not found in ocp_resources")
def get_resource(related_obj, admin_client, module_name):
"""
Gets CR based on associated HCO.status.relatedObject entry and ocp_reources module name
Args:
related_obj (dict): Associated HCO.status.relatedObject dict
admin_client (DynamicClient): Dynamic client object
module_name (str): Associated ocp_reources module name to be used
Returns:
Resource: Associated cr object
Raises:
AssertionError: if a related object kind is not in module name
"""
kwargs = {"client": admin_client, "name": related_obj["name"]}
if related_obj["namespace"]:
kwargs["namespace"] = related_obj["namespace"]
module = importlib.import_module(f"ocp_resources.{module_name}")
cls_related_obj = getattr(module, related_obj["kind"], None)
assert cls_related_obj, f"class {related_obj['kind']} is not in {module_name}"
LOGGER.debug(f"reading class {related_obj['kind']} from module {module_name}")
return cls_related_obj(**kwargs)
def get_resource_from_module_name(related_obj, ocp_resources_submodule_list, admin_client):
"""
Gets resource object based on module name
Args:
related_obj (dict): Related object Dictionary
ocp_resources_submodule_list (list): list of submudule names associated with ocp_resources package
admin_client (DynamicClient): Dynamic client object
Returns:
Resource: Associated cr object
"""
module_name = get_ocp_resource_module_name(
related_object_kind=related_obj["kind"],
list_submodules=ocp_resources_submodule_list,
)
return get_resource(
admin_client=admin_client,
related_obj=related_obj,
module_name=module_name,
)
def get_resource_by_name(
resource_kind: Resource, name: str, admin_client: DynamicClient, namespace: str | None = None
) -> Resource:
kwargs = {"name": name}
if namespace:
kwargs["namespace"] = namespace
kwargs["client"] = admin_client
resource = resource_kind(**kwargs)
if resource.exists:
return resource
raise ResourceNotFoundError(f"{resource_kind} {name} not found.")
def get_resource_key_value(resource: Resource, key_name: str) -> Any:
return benedict(
resource.instance.to_dict()["spec"],
keypath_separator=KEY_PATH_SEPARATOR,
).get(key_name)
def apply_konflux_idms(
idms: ImageDigestMirrorSet,
required_mirrors: list[str],
machine_config_pools: list[MachineConfigPool],
mcp_conditions: dict[str, list[dict[str, str]]],
nodes: list[Node],
) -> None:
"""Creates or patches the Konflux IDMS with the required mirror entries.
Args:
idms: The Konflux IDMS resource to create or patch.
required_mirrors: Konflux mirror URLs to set on the IDMS.
machine_config_pools: Active machine config pools to pause/wait.
mcp_conditions: Initial MCP conditions for tracking update progress.
nodes: Cluster nodes to verify readiness after MCP update.
"""
image_digest_mirrors = [{"source": KONFLUX_IDMS_SOURCE, "mirrors": required_mirrors}]
LOGGER.info("Pausing MCP updates while modifying IDMS.")
with ResourceEditor(patches={mcp: {"spec": {"paused": True}} for mcp in machine_config_pools}):
if idms.exists:
LOGGER.info(f"Patching IDMS {KONFLUX_IDMS_NAME} with mirrors: {required_mirrors}")
ResourceEditor(patches={idms: {"spec": {"imageDigestMirrors": image_digest_mirrors}}}).update()
else:
LOGGER.info(f"Creating IDMS {KONFLUX_IDMS_NAME} with mirrors: {required_mirrors}")
ImageDigestMirrorSet(
name=KONFLUX_IDMS_NAME,
client=idms.client,
image_digest_mirrors=image_digest_mirrors,
teardown=False,
).deploy(wait=True)
LOGGER.info("Wait for MCP update after IDMS modification.")
wait_for_mcp_update_completion(
machine_config_pools_list=machine_config_pools,
initial_mcp_conditions=mcp_conditions,
nodes=nodes,
)
def idms_has_all_mirrors(idms: ImageDigestMirrorSet, required_mirrors: list[str]) -> bool:
"""Returns True if the IDMS already contains all required Konflux mirror entries."""
existing_mirrors = idms.instance.spec.imageDigestMirrors
source_entry = next(
(entry for entry in existing_mirrors if entry["source"] == KONFLUX_IDMS_SOURCE),
None,
)
if not source_entry:
LOGGER.info(
f"IDMS {idms.name} has no entry for source {KONFLUX_IDMS_SOURCE}, mirrors need to be added."
f" Current IDMS mirrors: {existing_mirrors}"
)
return False
existing_urls = {str(mirror) for mirror in source_entry["mirrors"]}
return all(mirror in existing_urls for mirror in required_mirrors)