-
Notifications
You must be signed in to change notification settings - Fork 68
Expand file tree
/
Copy pathinfra.py
More file actions
1191 lines (983 loc) · 42.9 KB
/
infra.py
File metadata and controls
1191 lines (983 loc) · 42.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import base64
import io
import json
import logging
import os
import platform
import re
import shlex
import stat
import subprocess
import tarfile
import tempfile
import time
import zipfile
from contextlib import contextmanager
from functools import cache
from subprocess import PIPE, CalledProcessError, Popen
from typing import Any, Generator
import netaddr
import requests
import urllib3
import yaml
from kubernetes.client import ApiException
from kubernetes.dynamic import DynamicClient
from kubernetes.dynamic.exceptions import NotFoundError, ResourceNotFoundError
from ocp_resources.cluster_service_version import ClusterServiceVersion
from ocp_resources.cluster_version import ClusterVersion
from ocp_resources.console_cli_download import ConsoleCLIDownload
from ocp_resources.daemonset import DaemonSet
from ocp_resources.deployment import Deployment
from ocp_resources.exceptions import ResourceTeardownError
from ocp_resources.hyperconverged import HyperConverged
from ocp_resources.infrastructure import Infrastructure
from ocp_resources.namespace import Namespace
from ocp_resources.package_manifest import PackageManifest
from ocp_resources.pod import Pod
from ocp_resources.project_request import ProjectRequest
from ocp_resources.resource import Resource, ResourceEditor, get_client
from ocp_resources.secret import Secret
from ocp_resources.subscription import Subscription
from packaging.version import Version
from pyhelper_utils.shell import run_command, run_ssh_commands
from pytest_testconfig import config as py_config
from requests import HTTPError, Timeout, TooManyRedirects
from timeout_sampler import TimeoutExpiredError, TimeoutSampler, retry
import utilities.virt
from utilities.constants import (
AMD_64,
AUDIT_LOGS_PATH,
CLUSTER,
HCO_CATALOG_SOURCE,
KUBECONFIG,
NET_UTIL_CONTAINER_IMAGE,
OC_ADM_LOGS_COMMAND,
PROMETHEUS_K8S,
TIMEOUT_1MIN,
TIMEOUT_2MIN,
TIMEOUT_3MIN,
TIMEOUT_4MIN,
TIMEOUT_5MIN,
TIMEOUT_5SEC,
TIMEOUT_6MIN,
TIMEOUT_10MIN,
TIMEOUT_10SEC,
TIMEOUT_30SEC,
VIRTCTL,
X86_64,
NamespacesNames,
)
from utilities.exceptions import (
UrlNotFoundError,
UtilityPodNotFoundError,
)
from utilities.ssp import guest_agent_version_parser
# Reserved 'test' TLD (RFC 6761) guarantees this URL can never resolve — used for negative tests.
NON_EXIST_URL = "https://noneexist.test" # Use 'test' domain rfc6761
# URLs that URL validation should skip entirely (empty string or the deliberately-broken URL).
EXCLUDED_FROM_URL_VALIDATION = ("", NON_EXIST_URL)
# In-cluster HTTP server service address; only reachable from inside the cluster.
INTERNAL_HTTP_SERVER_ADDRESS = "internal-http.cnv-tests-utilities.svc.cluster.local"
# Node label prefix for host-model CPU features published under the kubevirt.io API group.
HOST_MODEL_CPU_LABEL = f"host-model-cpu.node.{Resource.ApiGroup.KUBEVIRT_IO}"
LOGGER = logging.getLogger(__name__)
def label_project(name, label, admin_client):
    """Apply *label* to the existing namespace *name* once it is Active.

    Args:
        name (str): namespace name; must already exist.
        label (dict): labels to merge into the namespace metadata.
        admin_client (DynamicClient): privileged client used for the update.
    """
    project = Namespace(client=admin_client, name=name, ensure_exists=True)
    project.wait_for_status(status=Namespace.Status.ACTIVE, timeout=TIMEOUT_2MIN)
    ResourceEditor({project: {"metadata": {"labels": label}}}).update()
def create_ns(
    name: str,
    admin_client: DynamicClient,
    unprivileged_client: DynamicClient | None = None,
    labels: dict[str, str] | None = None,
    teardown: bool = True,
    delete_timeout: int = TIMEOUT_6MIN,
) -> Generator[Namespace, None, None]:
    """
    Create a namespace (admin path) or a project (unprivileged path) and yield it.

    For kubemacpool labeling opt-modes, provide kmp_vm_label and admin_client as admin_client

    Args:
        name: namespace/project name.
        admin_client: privileged client; also used for labeling and teardown of projects.
        unprivileged_client: when set, a ProjectRequest is created with this client instead.
        labels: labels to apply to the new namespace.
        delete_timeout: seconds to wait for deletion during teardown.

    Yields:
        Namespace: the created (Active) namespace.

    Raises:
        ResourceTeardownError: when cleanup of the unprivileged project fails.
    """
    if not unprivileged_client:
        with Namespace(
            client=admin_client,
            name=name,
            label=labels,
            teardown=teardown,
            delete_timeout=delete_timeout,
        ) as ns:
            ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=TIMEOUT_2MIN)
            yield ns
    else:
        ProjectRequest(name=name, client=unprivileged_client, teardown=teardown).deploy()
        # Labeling requires admin privileges, so it is done with the admin client.
        label_project(name=name, label=labels, admin_client=admin_client)
        ns = Namespace(client=unprivileged_client, name=name, ensure_exists=True)
        yield ns
        # Teardown is performed as admin — the unprivileged client presumably
        # lacks delete permissions on the project (NOTE(review): confirm).
        ns.client = admin_client
        if teardown and not ns.clean_up():
            raise ResourceTeardownError(resource=ns)
class ClusterHosts:
    """Namespace for string constants describing what the cluster nodes run on."""

    class Type:
        # Bare-metal nodes vs. nodes backed by virtual machines.
        PHYSICAL = "physical"
        VIRTUAL = "virtual"
def url_excluded_from_validation(url):
    """Return True for URLs that URL validation should skip.

    Skipped URLs are the negative-test entries (empty / intentionally broken)
    and anything pointing at the in-cluster internal HTTP server.
    """
    if url in EXCLUDED_FROM_URL_VALIDATION:
        return True
    return INTERNAL_HTTP_SERVER_ADDRESS in url
def camelcase_to_mixedcase(camelcase_str):
    """Convert a CamelCase string to mixedCase (lowercase first letter).

    Example: Service type may be NodePort but in VM attributes.spec.ports it is nodePort.

    Args:
        camelcase_str (str): string to convert.

    Returns:
        str: the converted string; an empty input returns "" instead of raising
        IndexError (robustness fix).
    """
    if not camelcase_str:
        return ""
    return camelcase_str[0].lower() + camelcase_str[1:]
def get_pod_by_name_prefix(client, pod_prefix, namespace, get_all=False):
    """Find pods in *namespace* whose name matches the *pod_prefix* pattern.

    Args:
        client (DynamicClient): OCP client to use.
        pod_prefix (str): str or regex pattern matched against the start of the pod name.
        namespace (str): namespace name.
        get_all (bool): return the full list of matches instead of only the first.

    Returns:
        list or Pod: all matching pods when get_all, otherwise the first match.

    Raises:
        ResourceNotFoundError: when get_all is False and no pod matches.
    """
    matching_pods = []
    for pod in Pod.get(client=client, namespace=namespace):
        if re.match(pod_prefix, pod.name):
            matching_pods.append(pod)
    if get_all:
        # Negative test cases rely on receiving an empty list here.
        return matching_pods
    if matching_pods:
        return matching_pods[0]
    raise ResourceNotFoundError(f"A pod with the {pod_prefix} prefix does not exist")
def generate_namespace_name(file_path):
    """Derive a namespace name from a test file path.

    Bug fix: the original used ``file_path.strip(".py")``, which strips any
    leading/trailing '.', 'p' and 'y' *characters* rather than the ".py"
    suffix (e.g. "test_happy.py" became "test_ha"); ``removesuffix`` removes
    exactly the extension.

    Args:
        file_path (str): test file path, e.g. "tests/network/test_happy.py".

    Returns:
        str: a dash-separated name, capped at 63 chars (k8s name limit) and
        with the leading path segment dropped.
    """
    sanitized = file_path.removesuffix(".py").replace("/", "-").replace("_", "-")
    return sanitized[-63:].split("-", 1)[-1]
def get_pods(client: DynamicClient, namespace: Namespace, label: str = "") -> list[Pod]:
    """Return every pod in *namespace*, optionally filtered by a label selector."""
    pod_generator = Pod.get(client=client, namespace=namespace.name, label_selector=label)
    return list(pod_generator)
def wait_for_pods_deletion(pods):
    """Block until every pod in *pods* has been deleted from the cluster."""
    for deleted_pod in pods:
        deleted_pod.wait_deleted()
def get_pod_container_error_status(pod: Pod) -> str | None:
    """Return the waiting-state reason of the first waiting container in *pod*.

    Returns the ``reason`` string when present, the whole waiting-state object
    when it has no reason, or None when no container is waiting.

    Raises:
        NotFoundError: when the pod no longer exists (logged before re-raising).
    """
    try:
        container_statuses = pod.instance.status.get("containerStatuses", [])
    except NotFoundError:
        LOGGER.error(f"Pod {pod.name} was not found")
        raise
    for container_status in container_statuses:
        waiting_state = container_status.get("state", {}).get("waiting")
        if waiting_state:
            # Fall back to the raw waiting object when no reason is reported.
            return waiting_state.get("reason") or waiting_state
    return None
def get_not_running_pods(pods: list[Pod], filter_pods_by_name: str = "") -> list[dict[str | None, str]]:
    """Return a {pod name: state} entry for every pod not in a healthy final state.

    Bug fix: ``except ResourceNotFoundError, NotFoundError:`` is Python 2
    syntax and a SyntaxError in Python 3; the tuple form is required.

    Args:
        pods: pods to inspect.
        filter_pods_by_name: substring; pods whose name contains it are skipped.

    Returns:
        list of single-entry dicts mapping pod name to its unhealthy status.
    """
    pods_not_running = []
    for pod in pods:
        if filter_pods_by_name and filter_pods_by_name in pod.name:  # type: ignore[operator]
            LOGGER.warning(f"Ignoring pod: {pod.name} for pod state validations.")
            continue
        try:
            pod_instance = pod.instance
            # Waits for all pods in a given namespace to be in final healthy state(running/completed).
            # We also need to keep track of pods marked for deletion as not running. This would ensure any
            # pod that was spinned up in place of pod marked for deletion, reaches healthy state before end
            # of this check
            if pod_instance.metadata.get("deletionTimestamp") or pod_instance.status.phase not in (
                pod.Status.RUNNING,
                pod.Status.SUCCEEDED,
            ):
                pods_not_running.append({pod.name: pod.status})
            elif container_status_error := get_pod_container_error_status(pod=pod):
                pods_not_running.append({pod.name: container_status_error})
        except (ResourceNotFoundError, NotFoundError):
            LOGGER.warning(f"Ignoring pod {pod.name} that disappeared during cluster sanity check")
            pods_not_running.append({pod.name: "Deleted"})
    return pods_not_running
def wait_for_pods_running(
    admin_client: DynamicClient,
    namespace: Namespace,
    number_of_consecutive_checks: int = 1,
    filter_pods_by_name: str = "",
) -> None:
    """
    Waits for all pods in a given namespace to reach Running/Completed state. To avoid catching all pods in running
    state too soon, use number_of_consecutive_checks with appropriate values.

    Args:
        admin_client(DynamicClient): Dynamic client
        namespace(Namespace): A namespace object
        number_of_consecutive_checks(int): Number of consecutive healthy polls required before returning
        filter_pods_by_name(str): substring; pods whose name contains it are excluded from the check

    Raises:
        TimeoutExpiredError: Raises TimeoutExpiredError if any of the pods in the given namespace are not in Running
        state within TIMEOUT_5MIN
    """
    samples = TimeoutSampler(
        wait_timeout=TIMEOUT_5MIN,
        sleep=TIMEOUT_5SEC,
        func=get_pods,
        client=admin_client,
        namespace=namespace,
        exceptions_dict={NotFoundError: []},
    )
    not_running_pods = []
    try:
        current_check = 0
        for sample in samples:
            if sample:
                if not_running_pods := get_not_running_pods(pods=sample, filter_pods_by_name=filter_pods_by_name):
                    LOGGER.warning(f"Not running pods: {not_running_pods}")
                    # Any unhealthy pod resets the consecutive-success counter.
                    current_check = 0
                else:
                    current_check += 1
                    if current_check >= number_of_consecutive_checks:
                        return
    except TimeoutExpiredError:
        if not_running_pods:
            LOGGER.error(
                f"timeout waiting for all pods in namespace {namespace.name} to reach "
                f"running state, following pods are in not running state: {not_running_pods}"
            )
        raise
def get_daemonset_by_name(admin_client, daemonset_name, namespace_name):
    """Fetch an existing DaemonSet by name.

    Args:
        admin_client (DynamicClient): a DynamicClient object.
        daemonset_name (str): name of the daemonset.
        namespace_name (str): name of the associated namespace.

    Returns:
        DaemonSet: the daemonset object.

    Raises:
        ResourceNotFoundError: when the daemonset does not exist.
    """
    daemonset = DaemonSet(
        client=admin_client,
        name=daemonset_name,
        namespace=namespace_name,
    )
    if not daemonset.exists:
        raise ResourceNotFoundError(f"Daemonset: {daemonset_name} not found in namespace: {namespace_name}")
    return daemonset
def wait_for_consistent_resource_conditions(
    dynamic_client,
    expected_conditions,
    resource_kind,
    stop_conditions=None,
    condition_key1="type",
    condition_key2="status",
    namespace=None,
    total_timeout=TIMEOUT_10MIN,
    polling_interval=5,
    consecutive_checks_count=10,
    exceptions_dict=None,
    resource_name=None,
):
    """This function awaits certain conditions of a given resource_kind (HCO, CSV, etc.).

    Using TimeoutSampler loop and poll the CR (of the resource_kind type) and attempt to match the expected conditions
    against the actual conditions found in the CR.
    Since the conditions statuses might change, we use consecutive checks in order to have consistent results (stable),
    thereby ascertaining that the expected conditions are met over time.

    Args:
        dynamic_client (DynamicClient): admin client
        namespace (str, default: None): resource namespace. Not needed for cluster-scoped resources.
        expected_conditions (dict): a dict comprises expected conditions to meet, for example:
            {<condition key's value>: <condition key's value>,
            Resource.Condition.AVAILABLE: Resource.Condition.Status.TRUE,}
        stop_conditions (dict, optional): A dict comprising conditions that should not be met.
            The keys represent the value of the `type` field in a condition, and the values represent the value of the
            `reason` field, for example:
            {<condition type key's value>: <condition reason key's value>}
            If any of the stop condition are met, the function will not wait for the expected_conditions and will
            raise TimeoutExpiredError immediately.
        resource_kind (Resource): (e.g. HyperConverged, ClusterServiceVersion)
        condition_key1 (str): the key of the first condition in the actual resource_kind (e.g. type, reason, status)
        condition_key2 (str): the key of the second condition in the actual resource_kind (e.g. type, reason, status)
        total_timeout (int): total timeout to wait for (seconds)
        polling_interval (int): the time to sleep after each iteration (seconds)
        consecutive_checks_count (int): the number of repetitions for the status check to make sure the transition is
            done.
            The default value for this argument is not absolute, and there are situations in which it should be higher
            in order to ascertain the consistency of the Ready status.
            Possible situations:
            1. the resource is in a Ready status, because the process (that should cause
            the change in its state) has not started yet.
            2. some components are in Ready status, but others have not started the process yet.
        exceptions_dict: TimeoutSampler exceptions_dict
        resource_name (str, default: None): name of the specific CR to poll.

    Raises:
        TimeoutExpiredError: raised when expected conditions are not met within the timeframe,
            or when any stop condition is matched.
    """
    samples = TimeoutSampler(
        wait_timeout=total_timeout,
        sleep=polling_interval,
        func=lambda: list(
            resource_kind.get(
                client=dynamic_client,
                namespace=namespace,
                name=resource_name,
            )
        ),
        exceptions_dict=exceptions_dict,
    )
    current_check = 0
    actual_conditions = {}
    LOGGER.info(
        f"Waiting for resource to stabilize: resource_kind={resource_kind.__name__} conditions={expected_conditions} "
        f"sleep={total_timeout} consecutive_checks_count={consecutive_checks_count}"
    )
    try:
        for sample in samples:
            status_conditions = sample[0].instance.get("status", {}).get("conditions")
            if status_conditions:
                # Keep only the condition types the caller cares about.
                actual_conditions = {
                    condition[condition_key1]: condition[condition_key2]
                    for condition in status_conditions
                    if condition[condition_key1] in expected_conditions
                }
                if actual_conditions == expected_conditions:
                    current_check += 1
                    if current_check >= consecutive_checks_count:
                        return
                    continue
                else:
                    # Any mismatch resets the consecutive-success counter.
                    current_check = 0
                if stop_conditions:
                    # Stop conditions match on type -> reason (not status).
                    actual_conditions = {condition["type"]: condition["reason"] for condition in status_conditions}
                    matched_stop_conditions = {
                        type: reason
                        for type, reason in stop_conditions.items()
                        if type in actual_conditions and actual_conditions[type] == reason
                    }
                    if matched_stop_conditions:
                        LOGGER.error(
                            f"Execution halted due to matched stop conditions: {matched_stop_conditions}. "
                            f"Current status conditions: {status_conditions}."
                        )
                        # Raised inside the try so the except below also logs it before re-raising.
                        raise TimeoutExpiredError(
                            f"Stop condition met for {resource_kind.__name__}/{resource_name}."
                        )
    except TimeoutExpiredError:
        LOGGER.error(
            f"Timeout expired meeting conditions for resource: resource={resource_kind.kind} "
            f"expected_conditions={expected_conditions} status_conditions={actual_conditions}"
        )
        raise
def get_node_pod(utility_pods, node):
    """Return the utility pod scheduled on *node*, or None when there is none.

    Args:
        utility_pods (list): list of utility pods.
        node (Node or str): node resource or plain node name.
    """
    target_node_name = getattr(node, "name", node)
    matches = (pod for pod in utility_pods if pod.node.name == target_node_name)
    return next(matches, None)
class ExecCommandOnPod:
    """Run shell commands on a node through its utility pod (chroot /host)."""

    def __init__(self, utility_pods, node):
        """
        Args:
            utility_pods (list): list of utility pods resources.
            node (Node): node the commands should run on.

        Raises:
            UtilityPodNotFoundError: when *node* has no utility pod.
        """
        self.pod = get_node_pod(utility_pods=utility_pods, node=node)
        if not self.pod:
            raise UtilityPodNotFoundError(node=node.name)

    def exec(self, command, chroot_host=True, ignore_rc=False, timeout=TIMEOUT_1MIN):
        """Execute *command* in the pod (chrooted to the host by default).

        Returns:
            str: stripped command output.
        """
        prefix = "chroot /host" if chroot_host else ""
        full_command = shlex.split(f"{prefix} bash -c {shlex.quote(command)}")
        output = self.pod.execute(command=full_command, ignore_rc=ignore_rc, timeout=timeout)
        return output.strip()

    def get_interface_ip(self, interface):
        """Return the first IPv4 address of *interface*, or None when not found/invalid."""
        addr_output = self.exec(command=f"ip addr show {interface}")
        dotted_quad = re.search(r"[0-9]+(?:\.[0-9]+){3}", addr_output)
        if dotted_quad and netaddr.valid_ipv4(dotted_quad.group()):
            return dotted_quad.group()

    @property
    def reboot(self):
        """Trigger an immediate node reboot via sysrq; True when the API connection dropped."""
        try:
            self.exec(command="sudo echo b > /proc/sysrq-trigger")
        except ApiException:
            # The connection dying is the expected sign the node went down.
            return True
        return False

    @property
    def is_connective(self):
        """Probe connectivity by running a trivial command on the node."""
        return self.exec(command="ls")

    def interface_status(self, interface):
        """Return the operational state (e.g. "up"/"down") of *interface*."""
        return self.exec(command=f"cat /sys/class/net/{interface}/operstate")

    @property
    def release_info(self):
        """Parse the node's /etc/os-release into a dict."""
        os_release = {}
        for line in self.exec(command="cat /etc/os-release").strip().splitlines():
            key, separator, value = line.partition("=")
            if not separator:
                # Line has no '=' — not a key/value entry.
                continue
            os_release[key.strip()] = value.strip(" \"'")
        return os_release
def get_kubevirt_package_manifest(admin_client):
    """Return the HCO PackageManifest from the HCO catalog source (or None)."""
    hco_package_name = py_config["hco_cr_name"]
    return get_raw_package_manifest(
        admin_client=admin_client,
        name=hco_package_name,
        catalog_source=HCO_CATALOG_SOURCE,
    )
def get_raw_package_manifest(admin_client, name, catalog_source):
    """
    Gets PackageManifest ResourceField associated with catalog source.

    Multiple PackageManifest resources exist with the same name but different
    labels, hence raw=True and filtering on both name and catalog label.

    Fix: corrected the "packagemanefest" typo in the info log message.

    Args:
        admin_client (DynamicClient): dynamic client object
        name (str): Name of PackageManifest
        catalog_source (str): Catalog source

    Returns:
        ResourceField or None: PackageManifest ResourceField or None if no matching resource found
    """
    for resource_field in PackageManifest.get(
        client=admin_client,
        namespace=py_config["marketplace_namespace"],
        field_selector=f"metadata.name={name}",
        label_selector=f"catalog={catalog_source}",
        raw=True,  # multiple packagemanifest exists with the same name but different labels
    ):
        LOGGER.info(
            f"Found expected packagemanifest: {resource_field.metadata.name}: "
            f"in catalog: {resource_field.metadata.labels.catalog}"
        )
        return resource_field
    LOGGER.warning(f"Not able to find any packagemanifest {name} in {catalog_source} source.")
def get_subscription(admin_client, namespace, subscription_name):
    """Fetch an existing Subscription by name.

    Args:
        admin_client (DynamicClient): dynamic client object.
        namespace (str): name of the namespace.
        subscription_name (str): name of the subscription.

    Returns:
        Subscription: the subscription resource.

    Raises:
        ResourceNotFoundError: when the subscription does not exist in the namespace.
    """
    subscription = Subscription(
        client=admin_client,
        namespace=namespace,
        name=subscription_name,
    )
    if not subscription.exists:
        raise ResourceNotFoundError(f"Subscription {subscription_name} not found in namespace: {namespace}")
    return subscription
def get_csv_by_name(csv_name, admin_client, namespace):
    """Fetch an existing ClusterServiceVersion by name.

    Args:
        csv_name (str): name of the CSV.
        admin_client (DynamicClient): dynamic client object.
        namespace (str): namespace name.

    Returns:
        ClusterServiceVersion: the CSV resource.

    Raises:
        ResourceNotFoundError: when the CSV does not exist in the namespace.
    """
    csv = ClusterServiceVersion(client=admin_client, namespace=namespace, name=csv_name)
    if not csv.exists:
        raise ResourceNotFoundError(f"Csv {csv_name} not found in namespace: {namespace}")
    return csv
def get_clusterversion(client):
    """Return the cluster's ClusterVersion resource (first one found, or None)."""
    return next(iter(ClusterVersion.get(client=client)), None)
def get_deployments(admin_client, namespace):
    """Return all Deployment resources in *namespace* as a list."""
    deployment_generator = Deployment.get(client=admin_client, namespace=namespace)
    return list(deployment_generator)
def get_related_images_name_and_version(csv):
    """Map related-image short names to their image references for *csv*.

    Bug fix: ``re.search(...)`` could return None for an unparsable name,
    raising AttributeError on ``.group``; such entries are now skipped with a
    warning instead.

    Args:
        csv (ClusterServiceVersion): CSV whose spec.relatedImages is scanned.

    Returns:
        dict: {image short name: full image reference}.
    """
    related_images = {}
    for item in csv.instance.spec.relatedImages:
        # Example: 'registry.redhat.io/container-native-virtualization/node-maintenance-operator:v2.6.3-1'
        name_match = re.search(r".*/(?P<name>.*?):(.*)", item["name"])
        # Preserve original behavior: entries with an empty short name are skipped.
        if name_match and (image_name := name_match.group("name")):
            related_images[image_name] = item["image"]
        else:
            LOGGER.warning(f"Could not parse related image name: {item['name']}")
    LOGGER.info(f"From {csv.name} the related image information gathered: {related_images}")
    return related_images
def run_virtctl_command(command, virtctl_binary=VIRTCTL, namespace=None, check=False, verify_stderr=True):
    """Run a virtctl command, forwarding namespace and kubeconfig when set.

    Args:
        command (list): command arguments to pass to virtctl.
        virtctl_binary (str): virtctl binary including full path to binary.
        namespace (str, default: None): namespace passed via -n.
        check (bool, default: False): raise CalledProcessError on non-zero exit.
        verify_stderr (bool): forwarded to run_command.

    Returns:
        tuple: True, out if command succeeded, False, err otherwise.
    """
    full_command = [virtctl_binary]
    if namespace:
        full_command += ["-n", namespace]
    if kubeconfig_path := os.getenv(KUBECONFIG):
        full_command += ["--kubeconfig", kubeconfig_path]
    full_command += command
    return run_command(command=full_command, check=check, verify_stderr=verify_stderr)
def get_hco_mismatch_statuses(hco_status_conditions, expected_hco_status):
    """Compare reported HCO status conditions against expected statuses.

    Robustness fix: a condition type missing from the reported conditions is
    now reported as a mismatch (via ``.get``) instead of raising KeyError.

    Args:
        hco_status_conditions (list): condition dicts with "type" and "status" keys.
        expected_hco_status (dict): {condition type: expected status}.

    Returns:
        list: a human-readable message per mismatching condition; empty when all match.
    """
    current_status = {condition["type"]: condition["status"] for condition in hco_status_conditions}
    mismatch_statuses = []
    for condition_type, condition_status in expected_hco_status.items():
        if current_status.get(condition_type) != condition_status:
            mismatch_statuses.append(
                f"Current condition type {condition_type} does not match expected status {condition_status}"
            )
    return mismatch_statuses
def get_hyperconverged_resource(client, hco_ns_name):
    """Return the HyperConverged CR from *hco_ns_name*.

    Raises:
        ResourceNotFoundError: when the CR does not exist.
    """
    hco_name = py_config["hco_cr_name"]
    hyperconverged = HyperConverged(client=client, namespace=hco_ns_name, name=hco_name)
    # Pin the api_version used for API calls on this resource.
    hyperconverged.api_version = f"{hyperconverged.ApiGroup.HCO_KUBEVIRT_IO}/{hyperconverged.ApiVersion.V1BETA1}"
    if not hyperconverged.exists:
        raise ResourceNotFoundError(f"Hyperconverged: {hco_name} not found in {hco_ns_name}")
    return hyperconverged
def get_utility_pods_from_nodes(nodes, admin_client, label_selector):
    """Return the utility pods running on *nodes*, asserting full coverage.

    Perf fix: node/pod name lookups now use sets built once instead of
    rebuilding a list comprehension per element (was O(n*m)).

    Args:
        nodes (list): node resources that must each have a utility pod.
        admin_client (DynamicClient): client used to list pods.
        label_selector (str): label selector identifying the utility pods.

    Returns:
        list: utility pods scheduled on the given nodes.
    """
    pods = list(Pod.get(client=admin_client, label_selector=label_selector))
    pod_node_names = {pod.node.name for pod in pods}
    nodes_without_utility_pods = [node.name for node in nodes if node.name not in pod_node_names]
    assert not nodes_without_utility_pods, (
        f"Missing pods with label {label_selector} for: {' '.join(nodes_without_utility_pods)}"
    )
    node_names = {node.name for node in nodes}
    return [pod for pod in pods if pod.node.name in node_names]
def label_nodes(nodes, labels):
    """Generator fixture body: apply *labels* to *nodes*, restore originals on teardown."""
    editors = [ResourceEditor({node: {"metadata": {"labels": labels}}}) for node in nodes]
    for editor in editors:
        # backup_resources lets restore() put the original labels back.
        editor.update(backup_resources=True)
    yield nodes
    for editor in editors:
        editor.restore()
def get_daemonsets(admin_client, namespace):
    """Return all DaemonSet resources in *namespace* as a list."""
    daemonset_generator = DaemonSet.get(client=admin_client, namespace=namespace)
    return list(daemonset_generator)
@contextmanager
def scale_deployment_replicas(deployment_name, namespace, replica_count):
    """
    Scale a deployment to *replica_count* for the duration of the context and
    restore the original replica count afterwards.

    Robustness fix: restoration now runs in a ``finally`` block, so the
    original replica count is restored even when the ``with`` body raises.

    Args:
        deployment_name (str): name of the deployment.
        namespace (str): namespace of the deployment.
        replica_count (int): replica count to scale to inside the context.
    """
    deployment = Deployment(name=deployment_name, namespace=namespace)
    initial_replicas = deployment.instance.spec.replicas
    deployment.scale_replicas(replica_count=replica_count)
    deployment.wait_for_replicas(deployed=replica_count > 0)
    try:
        yield
    finally:
        deployment.scale_replicas(replica_count=initial_replicas)
        deployment.wait_for_replicas(deployed=initial_replicas > 0)
def get_console_spec_links(admin_client, name):
    """Return spec.links of the named ConsoleCLIDownload.

    Raises:
        ResourceNotFoundError: when the ConsoleCLIDownload does not exist.
    """
    console_cli_download = ConsoleCLIDownload(name=name, client=admin_client)
    if not console_cli_download.exists:
        raise ResourceNotFoundError(f"{name} ConsoleCLIDownload not found")
    return console_cli_download.instance.spec.links
def get_all_console_links(console_cli_downloads_spec_links):
    """Extract every download URL ("href") from ConsoleCLIDownload spec links."""
    download_urls = [link["href"] for link in console_cli_downloads_spec_links]
    assert download_urls, (
        "No URL entries found in the resource: "
        f"console_cli_download_resource_content={console_cli_downloads_spec_links}"
    )
    return download_urls
def download_and_extract_file_from_cluster(tmpdir, url):
    """
    Download and extract archive file from the cluster.

    Resource-leak fix: the archive handle is now closed via a context manager
    (it was previously opened and never closed).

    Args:
        tmpdir (py.path.local): temporary folder to download the files.
        url (str): URL to download from.

    Returns:
        list: list of extracted filenames (absolute paths under tmpdir).
    """
    zip_file_extension = ".zip"
    LOGGER.info(f"Downloading archive using: url={url}")
    urllib3.disable_warnings()  # TODO: remove this when we fix the SSL warning
    local_file_name = os.path.join(tmpdir, url.split("/")[-1])
    with requests.get(url, verify=False, stream=True) as created_request:
        created_request.raise_for_status()
        with open(local_file_name, "wb") as file_downloaded:
            for chunk in created_request.iter_content(chunk_size=8192):
                file_downloaded.write(chunk)
    LOGGER.info("Extract the downloaded archive.")
    if url.endswith(zip_file_extension):
        with zipfile.ZipFile(file=local_file_name) as archive_file_object:
            extracted_filenames = archive_file_object.namelist()
            archive_file_object.extractall(path=tmpdir)
    else:
        # NOTE(review): tarfile.extractall on an archive fetched over the
        # network is path-traversal prone; consider filter="data" (Py3.12+).
        with tarfile.open(name=local_file_name, mode="r") as archive_file_object:
            extracted_filenames = archive_file_object.getnames()
            archive_file_object.extractall(path=tmpdir)
    LOGGER.info(f"Downloaded file: {extracted_filenames}")
    if os.path.isfile(local_file_name):
        os.remove(local_file_name)
    return [os.path.join(tmpdir.strpath, namelist) for namelist in extracted_filenames]
def get_and_extract_file_from_cluster(urls, system_os, dest_dir, machine_type=None):
    """Download and extract the archive matching *system_os* and *machine_type*.

    Returns:
        str: path of the single extracted file.

    Raises:
        UrlNotFoundError: when no URL matches the OS/architecture.
    """
    machine_type = machine_type or get_machine_platform()
    for url in urls:
        if system_os not in url or machine_type not in url:
            continue
        extracted_files = download_and_extract_file_from_cluster(tmpdir=dest_dir, url=url)
        assert len(extracted_files) == 1, (
            f"Only a single file expected in archive: extracted_files={extracted_files}"
        )
        return extracted_files[0]
    raise UrlNotFoundError(f"Url not found for system_os={system_os}")
def download_file_from_cluster(
    get_console_spec_links_name: str, dest_dir: os.PathLike[str], admin_client: DynamicClient
) -> str:
    """Download the CLI binary advertised by a ConsoleCLIDownload and make it executable.

    Returns:
        str: path to the downloaded binary.
    """
    spec_links = get_console_spec_links(
        admin_client=admin_client,
        name=get_console_spec_links_name,
    )
    download_urls = get_all_console_links(console_cli_downloads_spec_links=spec_links)
    os_system = platform.system().lower()
    # Download URLs use "mac" rather than "darwin" for macOS builds.
    if os_system == "darwin" and platform.mac_ver()[0]:
        os_system = "mac"
    binary_file = get_and_extract_file_from_cluster(
        urls=download_urls,
        system_os=os_system,
        dest_dir=dest_dir,
        machine_type=get_machine_platform(),
    )
    # Owner read+execute is all that is needed to run the binary.
    os.chmod(binary_file, stat.S_IRUSR | stat.S_IXUSR)
    return binary_file
def get_machine_platform():
    """Return the local machine architecture, normalizing x86_64 to amd64."""
    machine = platform.machine()
    if machine == X86_64:
        return AMD_64
    return machine
def get_nodes_with_label(nodes, label):
    """Return the subset of *nodes* that carry *label* as a label key."""
    labeled_nodes = []
    for node in nodes:
        if label in node.labels.keys():
            labeled_nodes.append(node)
    return labeled_nodes
def get_daemonset_yaml_file_with_image_hash(generated_pulled_secret=None, service_account=None):
    """Load the utility-daemonset manifest, pin its image by digest, return it as a YAML stream.

    Args:
        generated_pulled_secret (str, optional): pull-secret file passed to the image lookup.
        service_account (ServiceAccount, optional): service account to set on the pod spec.

    Returns:
        io.StringIO: the rendered manifest, ready to be passed as a yaml_file.
    """
    manifest_path = os.path.abspath("utilities/manifests/utility-daemonset.yaml")
    image_info = utilities.virt.get_oc_image_info(
        image=NET_UTIL_CONTAINER_IMAGE,
        pull_secret=generated_pulled_secret,
    )
    with open(manifest_path) as manifest_file:
        manifest = yaml.safe_load(manifest_file.read())
    pod_spec = manifest["spec"]["template"]["spec"]
    utility_container = pod_spec["containers"][0]
    # Pin the container image to the exact digest resolved above.
    utility_container["image"] = f"{utility_container['image']}@{image_info.get('listDigest')}"
    if service_account:
        pod_spec["serviceAccount"] = service_account.name
        pod_spec["serviceAccountName"] = service_account.name
    return io.StringIO(yaml.dump(manifest))
def unique_name(name, service_type=None):
    """Build a unique, DNS-safe name by appending a timestamp (dots become dashes)."""
    type_segment = f"{service_type}-" if service_type else ""
    stamped = f"{name}-{type_segment}{time.time()}"
    return stamped.replace(".", "-")
def get_openshift_pull_secret(client: DynamicClient = None) -> Secret:
    """Return the cluster pull-secret from the openshift-config namespace.

    When *client* is None, a default client is created via get_client().
    """
    pull_secret_name = "pull-secret"
    pull_secret = Secret(
        client=client or get_client(),
        name=pull_secret_name,
        namespace=NamespacesNames.OPENSHIFT_CONFIG,
    )
    assert pull_secret.exists, (
        f"Pull-secret {pull_secret_name} not found in namespace {NamespacesNames.OPENSHIFT_CONFIG}"
    )
    return pull_secret
@cache
def generate_openshift_pull_secret_file(client: DynamicClient = None) -> str:
    """Write the cluster pull-secret to a temporary JSON file and return its path.

    Cached: repeated calls with the same client return the same file path.
    """
    # TODO: refactor this code; only needed by `utilities.virt.get_oc_image_info`
    # Should be called by `utilities.virt.get_oc_image_info` and not require the user to pass it
    pull_secret = get_openshift_pull_secret(client=client)
    secrets_dir = tempfile.mkdtemp(suffix="-cnv-tests-pull-secret")
    json_file = os.path.join(secrets_dir, "pull-secrets.json")
    dockerconfig = base64.b64decode(pull_secret.instance.data[".dockerconfigjson"]).decode(encoding="utf-8")
    with open(file=json_file, mode="w") as outfile:
        outfile.write(dockerconfig)
    return json_file
@retry(
    wait_timeout=TIMEOUT_4MIN,
    sleep=TIMEOUT_10SEC,
    exceptions_dict={RuntimeError: []},
)
def get_node_audit_log_entries(log: str, node: str, log_entry: str) -> tuple[bool, list[str]]:
    """
    Retrieve audit log entries from a node matching a specific log entry pattern.

    Fix: the retry-triggering ``RuntimeError`` now carries a message with the
    node/log context instead of being raised bare.

    Args:
        log: Name of the audit log file to read
        node: Node name to retrieve logs from
        log_entry: Pattern to search for in the audit logs

    Returns:
        Tuple of (success: bool, lines: list[str]) where success indicates if operation completed
        and lines contains matching log entries

    Raises:
        RuntimeError: on transient oc failures; caught by the @retry decorator.
    """
    # Patterns to match errors that should trigger a retry
    error_patterns_list = [
        r"^\s*error:",
        r"Unhandled Error.*couldn't get current server API group list.*i/o timeout",
        r".*read tcp.*connection reset by peer",
    ]
    error_patterns = re.compile("|".join(f"({pattern})" for pattern in error_patterns_list))
    result = subprocess.run(
        f"{OC_ADM_LOGS_COMMAND} {node} {AUDIT_LOGS_PATH}/{log} | grep {shlex.quote(log_entry)}",
        shell=True,
        capture_output=True,
        text=True,
        timeout=TIMEOUT_3MIN,
    )
    lines = result.stdout.splitlines()
    has_errors = any(error_patterns.search(line) for line in lines)
    if has_errors:
        if any(line.strip().startswith("404 page not found") for line in lines):
            # A rotated log is not an error condition — report success, no lines.
            LOGGER.warning(f"Skipping {log} check as it was rotated:\n{lines}")
            return True, []
        LOGGER.warning(f"oc command failed for node {node}, log {log}:\n{lines}")
        raise RuntimeError(f"oc adm node-logs failed for node {node}, log {log}")
    return True, lines
def get_node_audit_log_line_dict(logs: list[str], node: str, log_entry: str) -> Generator[dict[str, Any], None, None]:
    """
    Parse audit log entries into dictionaries.

    Args:
        logs: List of audit log file names
        node: Node name to retrieve logs from
        log_entry: Pattern to search for in the audit logs

    Yields:
        Parsed JSON dictionaries from matching audit log lines

    Raises:
        json.decoder.JSONDecodeError: when a matching line is not valid JSON (logged first).
    """
    for log in logs:
        _, matching_lines = get_node_audit_log_entries(log=log, node=node, log_entry=log_entry)
        for line in matching_lines:
            try:
                yield json.loads(line)
            except json.decoder.JSONDecodeError:
                LOGGER.error(f"Unable to parse line: {line!r}")
                raise
def wait_for_node_status(node, status=True, wait_timeout=TIMEOUT_1MIN):
    """Wait for node status Ready (status=True) or NotReady (status=False)."""
    sampler = TimeoutSampler(wait_timeout=wait_timeout, sleep=1, func=lambda: node.kubelet_ready)
    for kubelet_ready in sampler:
        # Done once the node's readiness truthiness matches the requested state.
        if bool(kubelet_ready) == bool(status):
            return
def utility_daemonset_for_custom_tests(
    generated_pulled_secret,
    cnv_tests_utilities_service_account,
    label,
    node_selector_label=None,
    delete_pod_resources_limit=False,
):
    """
    Deploy a modified utility daemonset into the kube-system namespace and yield it.

    Args:
        generated_pulled_secret (str): fixture that contains the generated pulled secret.
        cnv_tests_utilities_service_account (ServiceAccount): fixture that contains the service account
            for CNV tests utilities.
        label (str): used both as the daemonset name and as its "cnv-test" label value.
        node_selector_label (dict, optional): node selector for the daemonset; when omitted,
            no node selector is set.
        delete_pod_resources_limit (bool): when True, remove the pod resources limit from
            the manifest.

    Yields:
        DaemonSet: the deployed DaemonSet object.
    """
    base_manifest = get_daemonset_yaml_file_with_image_hash(
        generated_pulled_secret=generated_pulled_secret,
        service_account=cnv_tests_utilities_service_account,
    )
    manifest = yaml.safe_load(base_manifest.read())
    spec = manifest["spec"]
    template_spec = spec["template"]["spec"]
    if node_selector_label:
        template_spec["nodeSelector"] = node_selector_label
    if delete_pod_resources_limit:
        del template_spec["containers"][0]["resources"]["limits"]
    # Rename and relabel every relevant field so multiple test daemonsets can coexist.
    manifest["metadata"]["labels"]["cnv-test"] = label
    manifest["metadata"]["name"] = label
    spec["selector"]["matchLabels"]["cnv-test"] = label
    spec["template"]["metadata"]["labels"]["cnv-test"] = label
    template_spec["containers"][0]["name"] = label
    with DaemonSet(yaml_file=io.StringIO(yaml.dump(manifest))) as daemonset:
        daemonset.wait_until_deployed()
        yield daemonset
def login_with_token(api_address, token):
    """
    Log in to an OpenShift cluster using a token.

    Args:
        api_address (str): The API address of the OpenShift cluster.
        token (str): The authentication token.

    Returns:
        bool: True if login is successful, False otherwise.
    """
    return login_to_account(login_command=f"oc login {api_address} --token {token}")
def login_with_user_password(api_address, user, password=None):
    """
    Log in to an OpenShift cluster using a username and optional password.

    Args:
        api_address (str): The API address of the OpenShift cluster.
        user (str): Cluster's username.
        password (str, optional): Cluster's password.

    Returns:
        bool: True if login is successful otherwise False.
    """
    credentials_segment = f" -p {password}" if password else ""
    login_command = f"oc login {api_address} -u {user}{credentials_segment}"
    return login_to_account(login_command=login_command)
def login_to_account(login_command):
"""
Log in to an OpenShift cluster using a given login command.
Args:
login_command (str): The full login command.
Returns:
bool: True if login is successful, False otherwise.
"""
stop_errors = [
"connect: no route to host",
"x509: certificate signed by unknown authority",
]
samples = TimeoutSampler(
wait_timeout=60,
sleep=3,
exceptions_dict={CalledProcessError: []},
func=Popen,
args=login_command,
shell=True,
stdout=PIPE,
stderr=PIPE,
)
login_result = None
try:
LOGGER.info("Trying to login to account")
for sample in samples:
login_result = sample.communicate()
login_decoded_result = login_result[1].decode("utf-8")
if sample.returncode == 0:
LOGGER.info("Login - success")