import base64
import io
import json
import logging
import os
import platform
import re
import shlex
import ssl
import stat
import subprocess
import tarfile
import tempfile
import time
import zipfile
from contextlib import contextmanager
from functools import cache
from subprocess import PIPE, CalledProcessError, Popen
from typing import Any
import netaddr
import paramiko
import pytest
import requests
import urllib3
import yaml
from jira import JIRA
from kubernetes.client import ApiException
from kubernetes.dynamic import DynamicClient
from kubernetes.dynamic.exceptions import NotFoundError, ResourceNotFoundError
from ocp_resources.cluster_service_version import ClusterServiceVersion
from ocp_resources.cluster_version import ClusterVersion
from ocp_resources.config_map import ConfigMap
from ocp_resources.console_cli_download import ConsoleCLIDownload
from ocp_resources.daemonset import DaemonSet
from ocp_resources.deployment import Deployment
from ocp_resources.endpoints import Endpoints
from ocp_resources.exceptions import ResourceTeardownError
from ocp_resources.hyperconverged import HyperConverged
from ocp_resources.infrastructure import Infrastructure
from ocp_resources.mutating_webhook_config import MutatingWebhookConfiguration
from ocp_resources.namespace import Namespace
from ocp_resources.node import Node
from ocp_resources.package_manifest import PackageManifest
from ocp_resources.pod import Pod
from ocp_resources.project_request import ProjectRequest
from ocp_resources.resource import Resource, ResourceEditor, get_client
from ocp_resources.secret import Secret
from ocp_resources.subscription import Subscription
from ocp_resources.validating_webhook_config import ValidatingWebhookConfiguration
from ocp_resources.virtual_machine import VirtualMachine
from ocp_utilities.exceptions import NodeNotReadyError, NodeUnschedulableError
from ocp_utilities.infra import (
assert_nodes_in_healthy_condition,
assert_nodes_schedulable,
)
from packaging.version import Version
from pyhelper_utils.shell import run_command
from pytest_testconfig import config as py_config
from requests import HTTPError, Timeout, TooManyRedirects
from timeout_sampler import TimeoutExpiredError, TimeoutSampler, retry
import utilities.virt
from utilities.constants import (
AMD_64,
ARTIFACTORY_SECRET_NAME,
AUDIT_LOGS_PATH,
CLUSTER,
CPU_MODEL_LABEL_PREFIX,
EXCLUDED_CPU_MODELS,
EXCLUDED_OLD_CPU_MODELS,
HCO_CATALOG_SOURCE,
IMAGE_CRON_STR,
KUBECONFIG,
KUBELET_READY_CONDITION,
KUBERNETES_ARCH_LABEL,
NET_UTIL_CONTAINER_IMAGE,
OC_ADM_LOGS_COMMAND,
PROMETHEUS_K8S,
SANITY_TESTS_FAILURE,
TIMEOUT_1MIN,
TIMEOUT_2MIN,
TIMEOUT_5MIN,
TIMEOUT_5SEC,
TIMEOUT_6MIN,
TIMEOUT_10MIN,
TIMEOUT_10SEC,
TIMEOUT_30SEC,
VIRTCTL,
X86_64,
NamespacesNames,
)
from utilities.data_collector import (
collect_default_cnv_must_gather_with_vm_gather,
get_data_collector_dir,
write_to_file,
)
from utilities.exceptions import (
ClusterSanityError,
MissingEnvironmentVariableError,
OsDictNotFoundError,
StorageSanityError,
UrlNotFoundError,
UtilityPodNotFoundError,
)
from utilities.hco import wait_for_hco_conditions
from utilities.ssp import guest_agent_version_parser
from utilities.storage import get_test_artifact_server_url
JIRA_STATUS_CLOSED = ("on_qa", "verified", "release pending", "closed")
NON_EXIST_URL = "https://noneexist.test"  # Non-existent URL; 'test' is a reserved TLD (RFC 6761)
EXCLUDED_FROM_URL_VALIDATION = ("", NON_EXIST_URL)
INTERNAL_HTTP_SERVER_ADDRESS = "internal-http.cnv-tests-utilities"
HOST_MODEL_CPU_LABEL = f"host-model-cpu.node.{Resource.ApiGroup.KUBEVIRT_IO}"
LOGGER = logging.getLogger(__name__)
def label_project(name, label, admin_client):
ns = Namespace(client=admin_client, name=name, ensure_exists=True)
ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=TIMEOUT_2MIN)
ResourceEditor({ns: {"metadata": {"labels": label}}}).update()
def create_ns(
name: str,
admin_client: DynamicClient,
unprivileged_client: DynamicClient | None = None,
labels: dict[str, str] | None = None,
teardown: bool = True,
delete_timeout: int = TIMEOUT_6MIN,
):
"""
For kubemacpool labeling opt-modes, pass the kubemacpool VM label via `labels` and an admin client via `admin_client`
"""
if not unprivileged_client:
with Namespace(
client=admin_client,
name=name,
label=labels,
teardown=teardown,
delete_timeout=delete_timeout,
) as ns:
ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=TIMEOUT_2MIN)
yield ns
else:
ProjectRequest(name=name, client=unprivileged_client, teardown=teardown).deploy()
label_project(name=name, label=labels, admin_client=admin_client)
ns = Namespace(client=unprivileged_client, name=name, ensure_exists=True)
yield ns
ns.client = admin_client
if teardown and not ns.clean_up():
raise ResourceTeardownError(resource=ns)
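# Usage sketch (illustrative only; typically consumed as a pytest fixture - the fixture
# name and namespace name below are assumptions, not part of this module):
#     @pytest.fixture()
#     def my_test_namespace(admin_client):
#         yield from create_ns(name="my-test-ns", admin_client=admin_client)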
class ClusterHosts:
class Type:
VIRTUAL = "virtual"
PHYSICAL = "physical"
def url_excluded_from_validation(url):
# Negative URL test cases or internal http server
return url in EXCLUDED_FROM_URL_VALIDATION or INTERNAL_HTTP_SERVER_ADDRESS in url
def camelcase_to_mixedcase(camelcase_str):
# Utility to convert CamelCase to mixedCase
# Example: Service type may be NodePort but in VM attributes.spec.ports it is nodePort
return camelcase_str[0].lower() + camelcase_str[1:]
def get_pod_by_name_prefix(dyn_client, pod_prefix, namespace, get_all=False):
"""
Args:
dyn_client (DynamicClient): OCP Client to use.
pod_prefix (str): str or regex pattern.
namespace (str): Namespace name.
get_all (bool): Return all pods if True else only the first one.
Returns:
list or Pod: A list of all matching pods if get_all else only the first pod.
Raises:
ResourceNotFoundError: if no pods are found.
"""
pods = [pod for pod in Pod.get(dyn_client=dyn_client, namespace=namespace) if re.match(pod_prefix, pod.name)]
if get_all:
return pods  # Some negative cases check that no pods exist.
elif pods:
return pods[0]
raise ResourceNotFoundError(f"A pod with the {pod_prefix} prefix does not exist")
def generate_namespace_name(file_path):
return (file_path.strip(".py").replace("/", "-").replace("_", "-"))[-63:].split("-", 1)[-1]
def generate_latest_os_dict(os_list):
"""
Get latest os dict.
Args:
os_list (list): [<os-name>]_os_matrix - a list of dicts.
Returns:
dict: {Latest OS name: latest supported OS dict} else raises an exception.
Raises:
OsDictNotFoundError: If no os matched.
"""
for os_dict in os_list:
for os_version, os_values in os_dict.items():
if os_values.get("latest_released"):
return {os_version: os_values}
raise OsDictNotFoundError(f"No OS is marked as 'latest_released': {os_list}")
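# Input/output sketch (illustrative only; the version keys and values are assumptions):
#     os_list = [
#         {"rhel-8-10": {"image_name": "...", "latest_released": False}},
#         {"rhel-9-5": {"image_name": "...", "latest_released": True}},
#     ]
#     generate_latest_os_dict(os_list=os_list)
#     # -> {"rhel-9-5": {"image_name": "...", "latest_released": True}}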
def get_latest_os_dict_list(os_list):
"""
Get the latest OS dicts generated by 'generate_latest_os_dict()'.
This extracts the dict from `generate_latest_os_dict()` without the name key.
Args:
os_list (list): [rhel|windows|fedora]_os_matrix - a list of dicts
Returns:
list: List of OS dicts [{latest supported OS dict}]
"""
res = []
for _os in os_list:
res.append(list(generate_latest_os_dict(os_list=_os).values())[0])
return res
def base64_encode_str(text):
return base64.b64encode(text.encode()).decode()
def private_to_public_key(key):
return paramiko.RSAKey.from_private_key_file(key).get_base64()
def name_prefix(name):
return name.split(".")[0]
def authorized_key(private_key_path):
return f"ssh-rsa {private_to_public_key(key=private_key_path)} root@exec1.rdocloud"
def get_jira_status(jira):
url = os.getenv("PYTEST_JIRA_URL")
token = os.getenv("PYTEST_JIRA_TOKEN")
email = os.getenv("PYTEST_JIRA_USERNAME")
if not (token and url and email):
# For conformance tests without JIRA credentials, assume the JIRA is open
if py_config.get("conformance_tests"):
LOGGER.info(f"Conformance tests without JIRA credentials: assuming {jira} is open")
return "open"
raise MissingEnvironmentVariableError(
"Please set PYTEST_JIRA_TOKEN, PYTEST_JIRA_URL and PYTEST_JIRA_USERNAME environment variables"
)
jira_connection = JIRA(
server=url,
basic_auth=(email, token),
)
status = jira_connection.issue(id=jira).fields.status.name.lower()
LOGGER.info(f"Jira {jira}: status is {status}")
return status
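# Usage sketch (illustrative only; requires the PYTEST_JIRA_URL, PYTEST_JIRA_TOKEN and
# PYTEST_JIRA_USERNAME environment variables; the issue key is an assumption):
#     if get_jira_status(jira="CNV-12345") not in JIRA_STATUS_CLOSED:
#         pytest.skip("Blocked by an open Jira issue")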
def get_pods(dyn_client: DynamicClient, namespace: Namespace, label: str = "") -> list[Pod]:
return list(
Pod.get(
dyn_client=dyn_client,
namespace=namespace.name,
label_selector=label,
)
)
def wait_for_pods_deletion(pods):
for pod in pods:
pod.wait_deleted()
def get_pod_container_error_status(pod: Pod) -> str | None:
try:
pod_instance_status = pod.instance.status
# Check the containerStatuses and if any container is in waiting state, return that information:
for container_status in pod_instance_status.get("containerStatuses", []):
if waiting_container := container_status.get("state", {}).get("waiting"):
return waiting_container["reason"] if waiting_container.get("reason") else waiting_container
return None
except NotFoundError:
LOGGER.error(f"Pod {pod.name} was not found")
raise
def get_not_running_pods(pods: list[Pod], filter_pods_by_name: str = "") -> list[dict[str, str]]:
pods_not_running = []
for pod in pods:
if filter_pods_by_name and filter_pods_by_name in pod.name:
LOGGER.warning(f"Ignoring pod: {pod.name} for pod state validations.")
continue
try:
pod_instance = pod.instance
# Waits for all pods in a given namespace to be in a final healthy state (running/completed).
# We also need to treat pods marked for deletion as not running. This ensures that any
# pod spun up in place of a pod marked for deletion reaches a healthy state before the end
# of this check.
if pod_instance.metadata.get("deletionTimestamp") or pod_instance.status.phase not in (
pod.Status.RUNNING,
pod.Status.SUCCEEDED,
):
pods_not_running.append({pod.name: pod.status})
elif container_status_error := get_pod_container_error_status(pod=pod):
pods_not_running.append({pod.name: container_status_error})
except (ResourceNotFoundError, NotFoundError):
LOGGER.warning(f"Ignoring pod {pod.name} that disappeared during cluster sanity check")
pods_not_running.append({pod.name: "Deleted"})
return pods_not_running
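# Return-value sketch (illustrative only; pod names and states are assumptions):
#     get_not_running_pods(pods=get_pods(dyn_client=admin_client, namespace=hco_namespace))
#     # -> [{"virt-launcher-vm-a-abcde": "Pending"}, {"hco-operator-xyz": "CrashLoopBackOff"}]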
def wait_for_pods_running(
admin_client: DynamicClient,
namespace: Namespace,
number_of_consecutive_checks: int = 1,
filter_pods_by_name: str = "",
) -> None:
"""
Waits for all pods in a given namespace to reach Running/Completed state. To avoid catching all pods in running
state too soon, use number_of_consecutive_checks with appropriate values.
Args:
admin_client(DynamicClient): Dynamic client
namespace(Namespace): A namespace object
number_of_consecutive_checks(int): Number of times to check for all pods in running state
filter_pods_by_name(str): string to filter pod names by
Raises:
TimeoutExpiredError: Raises TimeoutExpiredError if any of the pods in the given namespace are not in Running
state
"""
samples = TimeoutSampler(
wait_timeout=TIMEOUT_2MIN,
sleep=TIMEOUT_5SEC,
func=get_pods,
dyn_client=admin_client,
namespace=namespace,
exceptions_dict={NotFoundError: []},
)
not_running_pods = []
try:
current_check = 0
for sample in samples:
if sample:
if not_running_pods := get_not_running_pods(pods=sample, filter_pods_by_name=filter_pods_by_name):
LOGGER.warning(f"Not running pods: {not_running_pods}")
current_check = 0
else:
current_check += 1
if current_check >= number_of_consecutive_checks:
return
except TimeoutExpiredError:
if not_running_pods:
LOGGER.error(
f"timeout waiting for all pods in namespace {namespace.name} to reach "
f"running state, following pods are in not running state: {not_running_pods}"
)
raise
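# Usage sketch (illustrative only; mirrors the call made from cluster_sanity below,
# the consecutive-checks value is an assumption):
#     wait_for_pods_running(
#         admin_client=admin_client,
#         namespace=hco_namespace,
#         number_of_consecutive_checks=3,
#         filter_pods_by_name=IMAGE_CRON_STR,
#     )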
def get_daemonset_by_name(admin_client, daemonset_name, namespace_name):
"""
Gets a daemonset object by name
Args:
admin_client (DynamicClient): a DynamicClient object
daemonset_name (str): Name of the daemonset
namespace_name (str): Name of the associated namespace
Returns:
Daemonset: Daemonset object
"""
daemon_set = DaemonSet(
client=admin_client,
namespace=namespace_name,
name=daemonset_name,
)
if daemon_set.exists:
return daemon_set
raise ResourceNotFoundError(f"Daemonset: {daemonset_name} not found in namespace: {namespace_name}")
def wait_for_consistent_resource_conditions(
dynamic_client,
expected_conditions,
resource_kind,
stop_conditions=None,
condition_key1="type",
condition_key2="status",
namespace=None,
total_timeout=TIMEOUT_10MIN,
polling_interval=5,
consecutive_checks_count=10,
exceptions_dict=None,
resource_name=None,
):
"""This function awaits certain conditions of a given resource_kind (HCO, CSV, etc.).
Using TimeoutSampler loop and poll the CR (of the resource_kind type) and attempt to match the expected conditions
against the actual conditions found in the CR.
Since the conditions statuses might change, we use consecutive checks in order to have consistent results (stable),
thereby ascertaining that the expected conditions are met over time.
Args:
dynamic_client (DynamicClient): admin client
namespace (str, default: None): resource namespace. Not needed for cluster-scoped resources.
expected_conditions (dict): a dict comprising the expected conditions to meet, for example:
{<condition_key1 value>: <condition_key2 value>,
Resource.Condition.AVAILABLE: Resource.Condition.Status.TRUE}
stop_conditions (dict, optional): A dict comprising conditions that should not be met.
The keys represent the value of the `type` field in a condition, and the values represent the value of the
`reason` field, for example:
{<condition type value>: <condition reason value>}
If any of the stop conditions is met, the function stops waiting for the expected_conditions.
resource_kind (Resource): (e.g. HyperConverged, ClusterServiceVersion)
condition_key1 (str): the key of the first condition in the actual resource_kind (e.g. type, reason, status)
condition_key2 (str): the key of the second condition in the actual resource_kind (e.g. type, reason, status)
total_timeout (int): total timeout to wait for (seconds)
polling_interval (int): the time to sleep after each iteration (seconds)
consecutive_checks_count (int): the number of repetitions for the status check to make sure the transition is
done.
The default value for this argument is not absolute, and there are situations in which it should be higher
in order to ascertain that the Ready status is consistent.
Possible situations:
1. the resource is still in a Ready status because the process (that should cause
the change in its state) has not started yet.
2. some components are in a Ready status, but others have not started the process yet.
exceptions_dict: TimeoutSampler exceptions_dict
Raises:
TimeoutExpiredError: raised when expected conditions are not met within the timeframe
"""
samples = TimeoutSampler(
wait_timeout=total_timeout,
sleep=polling_interval,
func=lambda: list(
resource_kind.get(
dyn_client=dynamic_client,
namespace=namespace,
name=resource_name,
)
),
exceptions_dict=exceptions_dict,
)
current_check = 0
actual_conditions = {}
LOGGER.info(
f"Waiting for resource to stabilize: resource_kind={resource_kind.__name__} conditions={expected_conditions} "
f"sleep={total_timeout} consecutive_checks_count={consecutive_checks_count}"
)
try:
for sample in samples:
status_conditions = sample[0].instance.get("status", {}).get("conditions")
if status_conditions:
actual_conditions = {
condition[condition_key1]: condition[condition_key2]
for condition in status_conditions
if condition[condition_key1] in expected_conditions
}
if actual_conditions == expected_conditions:
current_check += 1
if current_check >= consecutive_checks_count:
return
continue
else:
current_check = 0
if stop_conditions:
actual_conditions = {condition["type"]: condition["reason"] for condition in status_conditions}
matched_stop_conditions = {
type: reason
for type, reason in stop_conditions.items()
if type in actual_conditions and actual_conditions[type] == reason
}
if matched_stop_conditions:
LOGGER.error(
f"Execution halted due to matched stop conditions: {matched_stop_conditions}. "
f"Current status conditions: {status_conditions}."
)
raise TimeoutExpiredError(
f"Stop condition met for {resource_kind.__name__}/{resource_name}."
)
except TimeoutExpiredError:
LOGGER.error(
f"Timeout expired meeting conditions for resource: resource={resource_kind.kind} "
f"expected_conditions={expected_conditions} status_conditions={actual_conditions}"
)
raise
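# Usage sketch (illustrative only; the condition set shown is an assumption about a
# typical HyperConverged readiness wait):
#     wait_for_consistent_resource_conditions(
#         dynamic_client=admin_client,
#         namespace=hco_namespace.name,
#         expected_conditions={Resource.Condition.AVAILABLE: Resource.Condition.Status.TRUE},
#         resource_kind=HyperConverged,
#         resource_name=py_config["hco_cr_name"],
#     )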
def raise_multiple_exceptions(exceptions):
"""Raising multiple exceptions
To be used when multiple exceptions need to be raised, for example when using TimeoutSampler,
and additional information should be added (so it is viewable in junit report).
Example:
except TimeoutExpiredError as exp:
raise_multiple_exceptions(
exceptions=[
ValueError(f"Error message: {output}"),
exp,
]
)
Args:
exceptions (list): List of exceptions to be raised. The 1st exception will appear in pytest error message;
all exceptions will appear in the stacktrace.
"""
# After all exceptions were raised
if not exceptions:
return
try:
raise exceptions.pop()
finally:
raise_multiple_exceptions(exceptions=exceptions)
def get_node_pod(utility_pods, node):
"""
This function will return a pod based on the node specified as an argument.
Args:
utility_pods (list): List of utility pods.
node (Node or str): Node to get the pod for.
"""
_node_name = node.name if hasattr(node, "name") else node
for pod in utility_pods:
if pod.node.name == _node_name:
return pod
class ExecCommandOnPod:
def __init__(self, utility_pods, node):
"""
Run commands on the given node's utility pod with chroot /host.
Commands executed via exec() return the command output (str).
Args:
utility_pods (list): List of utility pod resources.
node (Node): Node resource.
"""
self.pod = get_node_pod(utility_pods=utility_pods, node=node)
if not self.pod:
raise UtilityPodNotFoundError(node=node.name)
def exec(self, command, chroot_host=True, ignore_rc=False, timeout=TIMEOUT_1MIN):
chroot_command = "chroot /host" if chroot_host else ""
_command = shlex.split(f"{chroot_command} bash -c {shlex.quote(command)}")
return self.pod.execute(command=_command, ignore_rc=ignore_rc, timeout=timeout).strip()
def get_interface_ip(self, interface):
out = self.exec(command=f"ip addr show {interface}")
match_ip = re.search(r"[0-9]+(?:\.[0-9]+){3}", out)
if match_ip:
interface_ip = match_ip.group()
if netaddr.valid_ipv4(interface_ip):
return interface_ip
@property
def reboot(self):
try:
self.exec(command="sudo echo b > /proc/sysrq-trigger")
except ApiException:
return True
return False
@property
def is_connective(self):
return self.exec(command="ls")
def interface_status(self, interface):
return self.exec(command=f"cat /sys/class/net/{interface}/operstate")
@property
def release_info(self):
out = self.exec(command="cat /etc/os-release")
release_info = {}
for line in out.strip().splitlines():
values = line.split("=", 1)
if len(values) != 2:
continue
release_info[values[0].strip()] = values[1].strip(" \"'")
return release_info
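# Usage sketch (illustrative only; the utility_pods list and node come from elsewhere in
# the test framework, and the interface name is an assumption):
#     pod_exec = ExecCommandOnPod(utility_pods=utility_pods, node=node)
#     os_pretty_name = pod_exec.release_info["PRETTY_NAME"]
#     node_ip = pod_exec.get_interface_ip(interface="br-ex")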
def storage_sanity_check(cluster_storage_classes_names):
config_sc = [[*csc][0] for csc in py_config["storage_class_matrix"]]
exists_sc = [scn for scn in config_sc if scn in cluster_storage_classes_names]
if sorted(config_sc) != sorted(exists_sc):
LOGGER.error(f"Expected {config_sc}, On cluster {exists_sc}")
return False
return True
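# Config-shape sketch (illustrative only; the storage class names are assumptions):
# py_config["storage_class_matrix"] is a list of single-key dicts, e.g.
#     [{"ocs-storagecluster-ceph-rbd": {...}}, {"hostpath-csi-basic": {...}}]
# and the check passes only when every configured name also exists on the cluster.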
def cluster_sanity(
request,
admin_client,
cluster_storage_classes_names,
nodes,
hco_namespace,
hco_status_conditions,
expected_hco_status,
junitxml_property=None,
):
if "cluster_health_check" in request.config.getoption("-m"):
LOGGER.warning("Skipping cluster sanity test, got -m cluster_health_check")
return
skip_cluster_sanity_check = "--cluster-sanity-skip-check"
skip_storage_classes_check = "--cluster-sanity-skip-storage-check"
skip_nodes_check = "--cluster-sanity-skip-nodes-check"
skip_webhook_check = "--cluster-sanity-skip-webhook-check"
exceptions_filename = "cluster_sanity_failure.txt"
try:
if request.session.config.getoption(skip_cluster_sanity_check):
LOGGER.warning(f"Skipping cluster sanity check, got {skip_cluster_sanity_check}")
return
LOGGER.info(
f"Running cluster sanity. (To skip cluster sanity check pass {skip_cluster_sanity_check} to pytest)"
)
# Check storage class only if --cluster-sanity-skip-storage-check not passed to pytest.
if request.session.config.getoption(skip_storage_classes_check):
LOGGER.warning(f"Skipping storage classes check, got {skip_storage_classes_check}")
else:
LOGGER.info(
f"Check storage classes sanity. (To skip storage class sanity check pass {skip_storage_classes_check} "
f"to pytest)"
)
if not storage_sanity_check(cluster_storage_classes_names=cluster_storage_classes_names):
raise StorageSanityError(
err_str=f"Cluster is missing storage class.\n"
f"either run with '--storage-class-matrix' or with '{skip_storage_classes_check}'"
)
# Check nodes only if --cluster-sanity-skip-nodes-check not passed to pytest.
if request.session.config.getoption(skip_nodes_check):
LOGGER.warning(f"Skipping nodes check, got {skip_nodes_check}")
else:
# validate that all the nodes are ready and schedulable and CNV pods are running
LOGGER.info(f"Check nodes sanity. (To skip nodes sanity check pass {skip_nodes_check} to pytest)")
assert_nodes_in_healthy_condition(nodes=nodes, healthy_node_condition_type=KUBELET_READY_CONDITION)
assert_nodes_schedulable(nodes=nodes)
try:
wait_for_pods_running(
admin_client=admin_client,
namespace=hco_namespace,
filter_pods_by_name=IMAGE_CRON_STR,
)
except TimeoutExpiredError as timeout_error:
LOGGER.error(timeout_error)
raise ClusterSanityError(
err_str=f"Timed out waiting for all pods in namespace {hco_namespace.name} to get to running state."
)
# Check webhook endpoints only if --cluster-sanity-skip-webhook-check not passed to pytest.
if request.session.config.getoption(skip_webhook_check):
LOGGER.warning(f"Skipping webhook health check, got {skip_webhook_check}")
else:
LOGGER.info(f"Check webhook endpoints health. (To skip webhook check pass {skip_webhook_check} to pytest)")
check_webhook_endpoints_health(admin_client=admin_client, namespace=hco_namespace)
check_vm_creation_capability(admin_client=admin_client, namespace="default")
# Wait for hco to be healthy
wait_for_hco_conditions(
admin_client=admin_client,
hco_namespace=hco_namespace,
)
except (ClusterSanityError, NodeUnschedulableError, NodeNotReadyError, StorageSanityError) as ex:
exit_pytest_execution(
filename=exceptions_filename,
message=str(ex),
junitxml_property=junitxml_property,
)
def _discover_webhook_services(admin_client: DynamicClient, namespace: Namespace) -> set[str]:
"""
Discover all webhook services in the HCO namespace.
Scans all MutatingWebhookConfiguration and ValidatingWebhookConfiguration resources
and extracts service names that point to the namespace.
Args:
admin_client: Kubernetes dynamic client with admin privileges for cluster operations.
namespace: Namespace resource.
Returns:
Set of service names that are referenced by webhook configurations in the namespace.
"""
webhook_services: set[str] = set()
for webhook_kind in [MutatingWebhookConfiguration, ValidatingWebhookConfiguration]:
LOGGER.info(f"Scanning {webhook_kind.kind} resources for webhook services")
for webhook in webhook_kind.get(client=admin_client):
webhook_items = webhook.instance.webhooks or []
if not webhook_items:
LOGGER.warning(f"Webhook configuration {webhook.name} has no webhooks")
continue
for webhook_item in webhook_items:
service_config = webhook_item.get("clientConfig", {}).get("service")
if not service_config:
continue
if service_config["namespace"] == namespace.name:
webhook_services.add(service_config["name"])
return webhook_services
def check_webhook_endpoints_health(admin_client: DynamicClient, namespace: Namespace) -> None:
"""
Check that all webhook services in the HCO namespace have available endpoints.
Args:
admin_client: Kubernetes dynamic client with admin privileges for cluster operations.
namespace: Namespace resource.
Raises:
ClusterSanityError: When any webhook service has no ready endpoint addresses.
"""
LOGGER.info(f"Checking webhook endpoints health for services in namespace: {namespace.name}")
webhook_services = _discover_webhook_services(admin_client=admin_client, namespace=namespace)
if not webhook_services:
LOGGER.warning(f"No webhook services discovered in namespace {namespace.name}")
return
services_without_endpoints = []
for service_name in sorted(webhook_services):
LOGGER.info(f"Checking endpoints for service: {service_name}")
try:
endpoint = Endpoints(
name=service_name,
namespace=namespace.name,
client=admin_client,
ensure_exists=True,
)
subsets = endpoint.instance.subsets
if not subsets:
LOGGER.error(f"No subsets found in endpoints for service: {service_name}")
services_without_endpoints.append(service_name)
continue
for subset in subsets:
if addresses := getattr(subset, "addresses", None):
LOGGER.info(f"Service {service_name} has {len(addresses)} ready endpoint address(es)")
break
else:
LOGGER.error(f"No ready addresses found in endpoints for service: {service_name}")
services_without_endpoints.append(service_name)
except ResourceNotFoundError:
LOGGER.error(f"Endpoints resource not found for service: {service_name}")
services_without_endpoints.append(service_name)
except ApiException as ex:
LOGGER.error(f"API error checking endpoints for service {service_name}: {ex}")
services_without_endpoints.append(service_name)
if services_without_endpoints:
raise ClusterSanityError(
err_str=f"Webhook services have no available endpoints: {', '.join(services_without_endpoints)}. "
"Check that the corresponding pods are running."
)
LOGGER.info("All discovered webhook services have available endpoints")
def check_vm_creation_capability(admin_client: DynamicClient, namespace: str) -> None:
"""
Verify VM creation capability by performing a dry-run VM creation.
Args:
admin_client: Kubernetes dynamic client with admin privileges for cluster operations.
namespace: Namespace name to use for the dry-run VM creation.
Raises:
ClusterSanityError: When dry-run VM creation fails.
"""
LOGGER.info(f"Checking VM creation capability via dry-run in namespace: {namespace}")
try:
vm = VirtualMachine(
name="sanity-check-dry-run-vm",
namespace=namespace,
client=admin_client,
body={
"spec": {
"running": False,
"template": {
"spec": {
"domain": {
"devices": {},
"resources": {
"requests": {
"memory": "64Mi",
},
},
},
},
},
},
},
dry_run=True,
)
vm.create()
LOGGER.info("Dry-run VM creation succeeded")
except ApiException as ex:
raise ClusterSanityError(
err_str=f"Dry-run VM creation failed: {ex}. This may indicate webhook or API server issues."
) from ex
except (ConnectionError, TimeoutError) as ex:
raise ClusterSanityError(
err_str=f"Connection error during dry-run VM creation: {ex}. Check cluster connectivity and webhook health."
) from ex
except Exception as ex:
raise ClusterSanityError(
err_str=f"Unexpected error during dry-run VM creation: {ex}. Check cluster state and webhook configuration."
) from ex
class ResourceMismatch(Exception):
pass
def exit_pytest_execution(message, return_code=SANITY_TESTS_FAILURE, filename=None, junitxml_property=None):
"""Exit pytest execution
Exit pytest execution; invokes pytest_sessionfinish.
Optionally, log an error message to tests-collected-info/utilities/pytest_exit_errors/<filename>
Args:
message (str): Message to display upon exit and to log in errors file
return_code (int. Default: 99): Exit return code
filename (str, optional. Default: None): filename where the given message will be saved
junitxml_property (pytest plugin): record_testsuite_property
"""
target_location = os.path.join(get_data_collector_dir(), "pytest_exit_errors")
# collect must-gather for past 5 minutes:
if return_code == SANITY_TESTS_FAILURE:
try:
collect_default_cnv_must_gather_with_vm_gather(
since_time=TIMEOUT_5MIN,
target_dir=target_location,
)
except Exception as current_exception:
LOGGER.warning(f"Failed to collect logs cnv must-gather after cluster_sanity failure: {current_exception}")
if filename:
write_to_file(
file_name=filename,
content=message,
base_directory=target_location,
)
if junitxml_property:
junitxml_property(name="exit_code", value=return_code)
pytest.exit(reason=message, returncode=return_code)
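# Usage sketch (illustrative only; the message is an assumption, the filename mirrors the
# one used by cluster_sanity above):
#     exit_pytest_execution(
#         message="Cluster sanity failed: nodes are not schedulable",
#         filename="cluster_sanity_failure.txt",
#         junitxml_property=junitxml_property,
#     )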
def get_kubevirt_package_manifest(admin_client):
return get_raw_package_manifest(
admin_client=admin_client,
name=py_config["hco_cr_name"],
catalog_source=HCO_CATALOG_SOURCE,
)
def get_raw_package_manifest(admin_client, name, catalog_source):
"""
Gets PackageManifest ResourceField associated with catalog source.
Multiple PackageManifest Resources exist with the same name but different labels.
Requires raw=True
Args:
admin_client (DynamicClient): dynamic client object
name (str): Name of PackageManifest
catalog_source (str): Catalog source
Returns:
ResourceField or None: PackageManifest ResourceField or None if no matching resource found
"""
for resource_field in PackageManifest.get(
dyn_client=admin_client,
namespace=py_config["marketplace_namespace"],
field_selector=f"metadata.name={name}",
label_selector=f"catalog={catalog_source}",
raw=True, # multiple packagemanifest exists with the same name but different labels
):
LOGGER.info(
f"Found expected packagemanefest: {resource_field.metadata.name}: "
f"in catalog: {resource_field.metadata.labels.catalog}"
)
return resource_field
LOGGER.warning(f"Not able to find any packagemanifest {name} in {catalog_source} source.")
def get_subscription(admin_client, namespace, subscription_name):
"""
Gets subscription by name
Args:
admin_client (DynamicClient): Dynamic client object
namespace (str): Name of the namespace
subscription_name (str): Name of the subscription
Returns:
Resource: subscription resource
Raises:
ResourceNotFoundError: when a given subscription is not found in a given namespace
"""
subscription = Subscription(
client=admin_client,
name=subscription_name,
namespace=namespace,
)
if subscription.exists:
return subscription
raise ResourceNotFoundError(f"Subscription {subscription_name} not found in namespace: {namespace}")
def get_csv_by_name(csv_name, admin_client, namespace):
"""
Gets csv from a given namespace by name
Args:
csv_name (str): Name of the csv
admin_client (DynamicClient): dynamic client object
namespace (str): namespace name
Returns:
Resource: csv resource
Raises:
ResourceNotFoundError: when a given csv is not found in a given namespace
"""
csv = ClusterServiceVersion(client=admin_client, namespace=namespace, name=csv_name)
if csv.exists:
return csv
raise ResourceNotFoundError(f"Csv {csv_name} not found in namespace: {namespace}")
def get_clusterversion(dyn_client):
for cvo in ClusterVersion.get(dyn_client=dyn_client):
return cvo
def get_deployments(admin_client, namespace):
return list(Deployment.get(dyn_client=admin_client, namespace=namespace))
def get_related_images_name_and_version(csv):
related_images = {}
for item in csv.instance.spec.relatedImages:
# Example: 'registry.redhat.io/container-native-virtualization/node-maintenance-operator:v2.6.3-1'
image_name = re.search(r".*/(?P<name>.*?):(.*)", item["name"]).group(1)
if image_name:
related_images[image_name] = item["image"]
LOGGER.info(f"From {csv.name} the related image information gathered: {related_images}")
return related_images
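# Return-value sketch (illustrative only; keys are short image names parsed from relatedImages
# entries, values are the corresponding item["image"] references):
#     # {"node-maintenance-operator": "<full image reference from item['image']>"}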
def run_virtctl_command(command, virtctl_binary=VIRTCTL, namespace=None, check=False, verify_stderr=True):
"""
Run virtctl command
Args:
virtctl_binary (str): virtctl binary including full path to binary