#!@PYTHON@
"""Regression tests for Pacemaker's command line tools."""

# pylint doesn't like the module name "cts-cli" which is an invalid complaint for this file
# but probably something we want to continue warning about elsewhere
# pylint: disable=invalid-name
# pacemaker imports need to come after we modify sys.path, which pylint will complain about.
# pylint: disable=wrong-import-position
# We know this is a very long file.
# pylint: disable=too-many-lines

__copyright__ = "Copyright 2024 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"

import argparse
from contextlib import contextmanager
from datetime import datetime, timedelta
import fileinput
from functools import partial
from gettext import ngettext
from multiprocessing import Pool, cpu_count
import os
import pathlib
import re
from shutil import copyfile
import signal
from string import Formatter
import subprocess
import sys
from tempfile import NamedTemporaryFile, TemporaryDirectory, mkstemp
import types

# These imports allow running from a source checkout after running `make`.
if os.path.exists("@abs_top_srcdir@/python"):
    sys.path.insert(0, "@abs_top_srcdir@/python")

# pylint: disable=comparison-of-constants,comparison-with-itself,condition-evals-to-constant
if os.path.exists("@abs_top_builddir@/python") and "@abs_top_builddir@" != "@abs_top_srcdir@":
    sys.path.insert(0, "@abs_top_builddir@/python")

from pacemaker._cts.errors import XmlValidationError
from pacemaker._cts.validate import validate
from pacemaker.buildoptions import BuildOptions
from pacemaker.exitstatus import ExitStatus

# Individual tool tests are split out, but can also be accessed as a group with "tools"
tools_tests = ["cibadmin", "crm_attribute", "crm_standby", "crm_resource",
               "crm_ticket", "crmadmin", "crm_shadow", "crm_verify"]

# The default list of tests to run, in the order they should be run
default_tests = ["access_render", "daemons", "dates", "error_codes"] + tools_tests + \
                ["crm_mon", "acls", "validity", "upgrade", "rules", "feature_set"]
other_tests = ["agents"]

# The directory containing this program
test_home = os.path.dirname(os.path.realpath(__file__))

# The name of the shadow CIB
SHADOW_NAME = "cts-cli"

# Arguments to pass to valgrind
VALGRIND_ARGS = ["-q", "--gen-suppressions=all", "--show-reachable=no", "--leak-check=full",
                 "--trace-children=no", "--time-stamp=yes", "--num-callers=20",
                 "--suppressions=%s/valgrind-pcmk.suppressions" % test_home]


class PluralFormatter(Formatter):
    """
    Special string formatting class for selecting singular vs. plural forms.

    Use like so:

        fmt = PluralFormatter()
        print(fmt.format("{0} {0:plural,test,tests} succeeded", n_tests))
    """

    def format_field(self, value, format_spec):
        """Convert a value to a formatted representation."""
        if format_spec.startswith("plural,"):
            eles = format_spec.split(',')

            if len(eles) == 2:
                singular = eles[1]
                plural = singular + "s"
            else:
                singular = eles[1]
                plural = eles[2]

            return ngettext(singular, plural, value)

        return super().format_field(value, format_spec)


def apply_substitutions(s, extra=None):
    """Apply text substitutions to an input string and return it."""
    substitutions = {
        "cts_cli_data": "%s/cli" % test_home,
        "shadow": SHADOW_NAME,
        "test_home": test_home,
    }

    if extra is not None:
        substitutions.update(extra)

    return s.format(**substitutions)
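
# For example (the file name is purely illustrative),
#     apply_substitutions("--xml-file {cts_cli_data}/crm_mon.xml")
# expands "{cts_cli_data}" to "<test_home>/cli".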


def cleanup_shadow_dir():
    """Remove any previously created shadow CIB directory."""
    subprocess.run(["crm_shadow", "--force", "--delete", SHADOW_NAME],
                   stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                   check=True)


def copy_existing_cib(existing):
    """
    Generate a CIB by copying an existing one to a temporary location.

    This is suitable for use with the cib_gen= parameter to the TestGroup class.
    """
    (fp, new) = mkstemp(prefix="cts-cli.cib.xml.")
    os.close(fp)
    copyfile(apply_substitutions(existing), new)
    return new


def current_cib():
    """Return the complete current CIB."""
    with environ({"CIB_user": "root"}):
        return subprocess.check_output(["cibadmin", "-Q"], encoding="utf-8")


def make_test_group(desc, cmd, classes, **kwargs):
    """
    Create a TestGroup that replicates the same test for multiple classes.

    The given description, cmd, and kwargs will be passed as arguments to each
    Test subclass in the classes parameter. The resulting objects will then be
    added to a TestGroup and returned.

    The main purpose of this function is to be able to run the same test for
    both text and XML formats without having to duplicate everything. Thus, the
    cmd string may contain "{fmt}", which will have any --output-as= class
    variable substituted in.
    """
    tests = []

    for c in classes:
        obj = c(desc, apply_substitutions(cmd, extra={"fmt": c.format_args}),
                **kwargs)
        tests.append(obj)

    return TestGroup(tests)
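
# For example (illustrative arguments only):
#     make_test_group("Query the CIB", "cibadmin -Q {fmt}",
#                     [Test, ValidatingTest])
# yields a TestGroup whose first member runs the command with "{fmt}" replaced
# by the empty string, and whose second member runs it with "--output-as=xml".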


def create_shadow_cib(shadow_dir, create_empty=True, validate_with=None,
                      valgrind=False):
    """
    Create a shadow CIB file.

    Keyword arguments:
    create_empty  -- If True, the shadow CIB will be empty. Otherwise, the
                     shadow CIB will be a copy of the currently active
                     cluster configuration.
    validate_with -- If not None, the schema version to validate the CIB
                     against
    valgrind      -- If True, run the create operation under valgrind
    """
    args = ["crm_shadow", "--batch", "--force"]

    if create_empty:
        args += ["--create-empty", SHADOW_NAME]
    else:
        args += ["--create", SHADOW_NAME]

    if validate_with is not None:
        args += ["--validate-with", validate_with]

    if valgrind:
        args = ["valgrind"] + VALGRIND_ARGS + args

    os.environ["CIB_shadow_dir"] = shadow_dir
    os.environ["CIB_shadow"] = SHADOW_NAME

    subprocess.run(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                   check=True)

    delete_shadow_resource_defaults()


def delete_shadow_resource_defaults():
    """Clear out the rsc_defaults section from a shadow CIB file."""
    # A newly created empty CIB might or might not have a rsc_defaults section
    # depending on whether the --with-resource-stickiness-default configure
    # option was used. To ensure regression tests behave the same either way,
    # delete any rsc_defaults after creating or erasing a CIB.
    subprocess.run(["cibadmin", "--delete", "--xml-text", "<rsc_defaults/>"],
                   stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                   check=True)

    # The above command might or might not bump the CIB version, so reset it
    # to ensure future changes result in the same version for comparison.
    reset_shadow_cib_version()


def reset_shadow_cib_version():
    """Reset the version numbers in a shadow CIB file to known fixed values."""
    with fileinput.input(files=[shadow_path()], inplace=True) as f:
        for line in f:
            line = re.sub('epoch="[0-9]*"', 'epoch="1"', line)
            line = re.sub('num_updates="[0-9]*"', 'num_updates="0"', line)
            line = re.sub('admin_epoch="[0-9]*"', 'admin_epoch="0"', line)
            print(line, end='')
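
# For example, a shadow CIB element like
#     <cib epoch="42" num_updates="7" admin_epoch="3" ...>
# is rewritten to
#     <cib epoch="1" num_updates="0" admin_epoch="0" ...>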


def run_cmd_list(cmds):
    """
    Run one or more shell commands.

    cmds can be:
    * A string
    * A Python function
    * A list of the above

    Raises subprocess.CalledProcessError on error.
    """
    if cmds is None:
        return

    if isinstance(cmds, (str, types.FunctionType)):
        cmds = [cmds]

    for c in cmds:
        if isinstance(c, types.FunctionType):
            c()
        else:
            subprocess.run(apply_substitutions(c), stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                           shell=True, universal_newlines=True, check=True)
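
# For example (illustrative), run_cmd_list(["crm_mon -1", cleanup_shadow_dir])
# runs the string through the shell (after substitutions) and then calls the
# function directly, in order.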


def sanitize_output(s):
    """
    Replace content in the output expected to change between test runs.

    This is stuff like version numbers, timestamps, source line numbers,
    build options, system names and messages, etc.
    """
    # A list of tuples of regular expressions and their replacements.
    replacements = [
        (r'Created new pacemaker-.* configuration', r'Created new pacemaker configuration'),
        (r'Device not configured', r'No such device or address'),
        (r'^Entity: line [0-9]+: ', r''),
        (r'Last change: .*', r'Last change:'),
        (r'Last updated: .*', r'Last updated:'),
        (r'^Migration will take effect until: .*', r'Migration will take effect until:'),
        (r'(\* Possible values.*: .*)\(default: [^)]*\)', r'\1(default: )'),
        (r"""-X '.*'""", r"""-X '...'"""),
        (r' api-version="[^"]*"', r' api-version="X"'),
        (r'\(apply_upgrade@.*\.c:[0-9]+\)', r'apply_upgrade'),
        (r'\(invert_action@.*\.c:[0-9]+\)', r'invert_action'),
        (r'\(pcmk__update_schema@.*\.c:[0-9]+\)', r'pcmk__update_schema'),
        (r'(<change-attr name="crm_feature_set" .* value=")[0-9.]*"', r'\1"'),
        (r'(<change-attr name="validate-with" .* value="pacemaker-)[0-9.]+"', r'\1X"'),
        (r'(<cib.*) cib-last-written="[^"]*"', r'\1'),
        (r'crm_feature_set="[^"]*" ', r''),
        (r'@crm_feature_set=[0-9.]+, ', r''),
        (r'\(crm_time_parse_duration@.*\.c:[0-9]+\)', r'crm_time_parse_duration'),
        (r'\(crm_time_parse_period@.*\.c:[0-9]+\)', r'crm_time_parse_period'),
        (r'\(crm_time_parse_sec@.*\.c:[0-9]+\)', r'crm_time_parse_sec'),
        (r' default="[^"]*"', r' default=""'),
        (r' end="[0-9][-+: 0-9]*Z*"', r' end=""'),
        (r'last_change time=".*"', r'last_change time=""'),
        (r'last_update time=".*"', r'last_update time=""'),
        (r' last-rc-change=[\'"][-+A-Za-z0-9: ]*[\'"],?', r''),
        (r'\(parse_date@.*\.c:[0-9]+\)', r'parse_date'),
        (r'\((pcmk__.*)@.*\.c:[0-9]+\)', r'\1'),
        (r'.*Relax-NG validity error : ', r''),
        (r'request=".*(crm_?[a-zA-Z0-9]+) ', r'request="\1 '),
        (r'request=".*iso8601', r'request="iso8601'),
        (r' start="[0-9][-+: 0-9]*Z*"', r' start=""'),
        (r'/tmp/cts-cli\.[^/]*/shadow.cts-cli', r'/tmp/cts-cli.shadow/shadow.cts-cli'),
        (r'^/tmp/cts-cli\.xmllint\.[^:]*:', r'/tmp/cts-cli.xmllint:'),
        (r'^/tmp/cts-cli\.xmllint\.[^ ]* fails to validate', r'/tmp/cts-cli.xmllint fails to validate'),
        (r'.*\((unpack_.*)@.*\.c:[0-9]+\)', r'\1'),
        (r' validate-with="[^"]*"', r''),
        (r'(@validate-with=pacemaker-)[0-9.]+,', r'\1X,'),
        (r' version="[^"]*"', r' version=""'),
        (r'\(version .*\)', r'(version)'),
        (r'--xml-file .*cli/(.*)\.xml', r'--xml-file \1.xml'),
    ]

    new_output = []

    for line in s:
        # @TODO Add a way to suppress this message within cibadmin, and then drop
        # the handling here.
        if line.startswith("The supplied command can provide skewed result"):
            continue

        for (pattern, repl) in replacements:
            line = re.sub(pattern, repl, line)

        new_output.append(line)

    return new_output
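
# For example, a line containing ' api-version="2.34"' becomes
# ' api-version="X"', so expected and actual output can be compared across
# Pacemaker releases.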


def shadow_path():
    """Return the current shadow CIB path."""
    p = subprocess.check_output(["crm_shadow", "--file"], encoding="utf-8")
    return p.strip()


def write_cib(s):
    """
    Generate a CIB by writing a string to a temporary location.

    This is suitable for use with the cib_gen= parameter to the TestGroup class.
    """
    (fp, new) = mkstemp(prefix="cts-cli.cib.xml.")
    os.write(fp, s.encode())
    os.close(fp)
    return new


@contextmanager
def environ(env):
    """
    Run code in an environment modified with the provided dict.

    This context manager augments the current process environment with the provided
    dict, allowing code to be constructed like so:

        e = {"CIB_user": "xyx"}
        with environ(e):
            ...

    When the context manager exits, the previous environment will be restored.

    It is possible to remove an environment key (whether it was in the environment by
    default, or given with a nested call to this context) by passing None for the
    value. Additionally, this context manager accepts None for the env parameter,
    in which case nothing will be done.

    Finally, note that values in env will be passed to apply_substitutions before
    being set in the environment.
    """
    if env is None:
        env = {}
        original_env = {}
    else:
        original_env = os.environ.copy()

    for k, v in env.items():
        if v is None:
            os.environ.pop(k)
        else:
            os.environ[k] = apply_substitutions(v)

    try:
        yield
    finally:
        for k, v in original_env.items():
            if v is None:
                os.environ.pop(k)
            else:
                os.environ[k] = v
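
# For example (mirroring how AclTest uses this below):
#     with environ({"CIB_user": "root", "CIB_shadow": None}):
#         ...  # runs with CIB_user set to "root" and CIB_shadow unset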


class StdinCmd:
    """
    A class for defining a command that should be run later.

    subprocess.Popen (and its various helper functions) start running the command
    immediately, which doesn't work if we want to provide the command when a Test
    is created, but delay its execution until the environment is defined when the
    Test is run. This class allows us to do that.
    """

    def __init__(self, cmd):
        """
        Create a new StdinCmd instance.

        Arguments:
        cmd -- The command string to run later. This string will be passed
               to apply_substitutions before being executed.
        """
        self._cmd = cmd

    def run(self):
        """Run this command, returning a subprocess.Popen object."""
        return subprocess.Popen(apply_substitutions(self._cmd), shell=True,
                                encoding="utf-8", stdout=subprocess.PIPE)
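
# For example (illustrative), passing stdin=StdinCmd("echo '<cib/>'") to a
# Test defers running the echo command until the test itself executes, at
# which point its stdout is piped to the test command's stdin.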


class Test:
    """A base class for defining a single command line regression test."""

    format_args = ""

    def __init__(self, desc, cmd, expected_rc=ExitStatus.OK, update_cib=False,
                 setup=None, teardown=None, stdin=None, env=None):
        """
        Create a new Test instance.

        Arguments:
        desc -- A short human-readable description of this test
        cmd  -- The command to run for this test, as a string. This string
                will be passed to apply_substitutions before being executed.

        Keyword arguments:
        expected_rc -- The expected return value of cmd
        update_cib  -- If True, the resulting CIB will be printed after
                       performing the test
        setup       -- A shell command to be run in the same environment as
                       cmd, immediately before the test. Valid types are:
                       a string, a Python function, or a list of the above
        teardown    -- Like setup, but runs immediately after the test
        stdin       -- If not None, the text to feed to cmd as its stdin
        env         -- If not None, a dict of values to be added to the test
                       environment. This will be added when the test is run
                       and will override anything given to the TestGroup.
        """
        self.desc = desc
        self.cmd = cmd
        self.expected_rc = expected_rc
        self.update_cib = update_cib

        self._setup = setup
        self._teardown = teardown
        self._stdin = stdin

        if env is None:
            self._env = {}
        else:
            self._env = env

        self._output = None

    @property
    def output(self):
        """Return the test's detailed output."""
        return self._output

    def _log_end_test(self, rc):
        """Log a message when a test ends."""
        if isinstance(rc, ExitStatus):
            rc_str = str(rc)
        else:
            if rc < 0:
                rc = abs(rc)
                rc_str = signal.strsignal(rc)
            else:
                rc = ExitStatus(rc)
                rc_str = str(rc)

        self._output.append("=#=#=#= End test: %s - %s (%d) =#=#=#=" % (self.desc, rc_str, rc))

    def _log_start_test(self):
        """Log a message when a test starts."""
        self._output.append("=#=#=#= Begin test: %s =#=#=#=" % self.desc)

    def _log_test_failed(self, app, rc):
        """Log a message when a test fails."""
        self._output.append("* Failed (rc=%.3d): %-23s - %s" % (rc, app, self.desc))

    def _log_test_passed(self, app):
        """Log a message when a test passes."""
        self._output.append("* Passed: %-21s - %s" % (app, self.desc))

    # pylint: disable=unused-argument
    def _validate_hook(self, rc, _stdout, _stderr, valgrind=False):
        """Validate test output."""
        self._log_end_test(rc)
        return rc

    def _run_setup_teardown(self, cmd, app):
        """
        Run any setup or teardown command required by this test.

        On success (or if no command is present), return True. On failure,
        return False and log the stdout/stderr of the command for debugging.

        Arguments:
        cmd -- The setup/teardown command(s) to run
        app -- The base name of the test command, for logging purposes
        """
        try:
            run_cmd_list(cmd)
            return True
        except subprocess.CalledProcessError as exn:
            rc = exn.returncode

            self._output.extend(exn.stderr.splitlines())
            self._output.extend(exn.stdout.splitlines())
            self._log_test_failed(app, rc)
            return False
    def run(self, group, env=None, valgrind=False):
        """
        Run this test.

        Basic output is printed to stdout, while detailed output is available
        in the self.output property after this function has been run. Return
        True if the return code matches self.expected_rc, and False otherwise.

        Arguments:
        group -- The name of the group this test is a part of, for logging purposes

        Keyword arguments:
        env -- If not None, a dict of values to be added to the test environment
        """
        self._output = []

        cmd = apply_substitutions(self.cmd)
        app = cmd.split(" ")[0]
        test_id = "%s(%s)" % (app, group)

        print("* Running: %-31s - %s" % (test_id, self.desc))

        self._log_start_test()

        # Add any environment variables specified in Test.__init__
        if env is None:
            env = self._env
        else:
            env.update(self._env)

        with environ(env):
            # Run the setup hook, if any
            if not self._run_setup_teardown(self._setup, app):
                return False

            # Define basic arguments for all forms of running this test.
            kwargs = {"stdout": subprocess.PIPE, "stderr": subprocess.PIPE,
                      "shell": True, "universal_newlines": True, "check": False}

            stdin_p = None

            # Handle the stdin= parameter.
            if isinstance(self._stdin, StdinCmd):
                stdin_p = self._stdin.run()
                kwargs["stdin"] = stdin_p.stdout
            elif isinstance(self._stdin, pathlib.Path):
                kwargs["input"] = self._stdin.read_text()
            else:
                kwargs["input"] = self._stdin

            if valgrind:
                cmd = "valgrind %s %s" % (" ".join(VALGRIND_ARGS), cmd)

            # Run the test command
            # We handle the "check" argument above in the kwargs dict.
            # pylint: disable-msg=subprocess-run-check
            cmd_p = subprocess.run(cmd, **kwargs)
            rc = cmd_p.returncode

            if stdin_p is not None:
                stdin_p.stdout.close()

            self._output.extend(cmd_p.stderr.splitlines())
            self._output.extend(cmd_p.stdout.splitlines())

            # Run the teardown hook, if any
            if not self._run_setup_teardown(self._teardown, app):
                return False

        if self.update_cib:
            self._output.append("=#=#=#= Current cib after: %s =#=#=#=" % self.desc)
            self._output.extend(current_cib().splitlines())

        self._validate_hook(rc, cmd_p.stdout, cmd_p.stderr, valgrind=valgrind)

        if rc == self.expected_rc:
            self._log_test_passed(app)
            return True

        self._log_test_failed(app, rc)
        return False


class AclTest(Test):
    """
    A Test subclass specialized for running certain ACL tests.

    Differences from the Test class:

    * Does not use the stdin= parameter.
    * Runs the setup and teardown hooks in an environment that also sets
      CIB_user=root, CIB_shadow="", and CIB_file to a temporary file.
    * The setup hooks should construct a new CIB and write it to that
      temporary file.
    * Prints the input CIB before running the test.
    """

    def run(self, group, env=None, valgrind=False):
        """
        Run this test.

        Basic output is printed to stdout, while detailed output is available
        in the self.output property after this function has been run. Return
        True if the return code matches self.expected_rc, and False otherwise.

        Arguments:
        group -- The name of the group this test is a part of, for logging purposes

        Keyword arguments:
        env -- If not None, a dict of values to be added to the test environment
        """
        self._output = []

        cmd = apply_substitutions(self.cmd)
        app = cmd.split(" ")[0]
        test_id = "%s(%s)" % (app, group)

        print("* Running: %-31s - %s" % (test_id, self.desc))

        # Add any environment variables specified in Test.__init__
        if env is None:
            env = self._env
        else:
            env.update(self._env)

        with environ(env):
            with NamedTemporaryFile(prefix="cts-cli.cib.") as fp:
                fp.write(current_cib().encode())
                fp.flush()

                # Run the setup hook, if any. Typically, this is something that
                # modifies the existing CIB. We need to do these modifications
                # in a different environment from what the test will run in, since
                # the test may not have the permissions necessary to do the
                # modifications.
                with environ({"CIB_user": "root", "CIB_file": fp.name, "CIB_shadow": None}):
                    if not self._run_setup_teardown(self._setup, app):
                        return False

                    # At the least, print the CIB that will be the test input.
                    self._output.extend(current_cib().splitlines())

                # Note: This is positioned differently from where it is in Test.run.
                self._log_start_test()

                # Define basic arguments for running this test.
                kwargs = {"stdout": subprocess.PIPE, "stderr": subprocess.PIPE,
                          "shell": True, "universal_newlines": True, "check": False}

                # Read in the potentially modified CIB as the stdin for the test.
                fp.seek(0)
                kwargs["input"] = fp.read().decode(encoding="utf-8")

                if valgrind:
                    cmd = "valgrind %s %s" % (" ".join(VALGRIND_ARGS), cmd)

                # Run the test command
                # We handle the "check" argument above in the kwargs dict.
                # pylint: disable-msg=subprocess-run-check
                cmd_p = subprocess.run(cmd, **kwargs)
                rc = cmd_p.returncode

                self._output.extend(cmd_p.stderr.splitlines())
                self._output.extend(cmd_p.stdout.splitlines())

                # Run the teardown hook, if any
                with environ({"CIB_user": "root", "CIB_file": fp.name, "CIB_shadow": None}):
                    if not self._run_setup_teardown(self._teardown, app):
                        return False

        if self.update_cib:
            self._output.append("=#=#=#= Current cib after: %s =#=#=#=" % self.desc)
            self._output.extend(current_cib().splitlines())

        self._validate_hook(rc, cmd_p.stdout, cmd_p.stderr, valgrind=valgrind)

        if rc == self.expected_rc:
            self._log_test_passed(app)
            return True

        self._log_test_failed(app, rc)
        return False


class ValidatingTest(Test):
    """A Test subclass that additionally runs test results through xmllint."""

    format_args = "--output-as=xml"

    def __init__(self, desc, cmd, **kwargs):
        """Create a new ValidatingTest instance."""
        Test.__init__(self, desc + " (XML)", cmd, **kwargs)

    def _validate_hook(self, rc, stdout, stderr, valgrind=False):
        """Validate test output with xmllint."""
        # Do not validate if running under valgrind, even if told to do so. Valgrind
        # will output a lot more stuff that is not XML, so it wouldn't validate
        # anyway.
        if valgrind:
            return Test._validate_hook(self, rc, stdout, stderr, valgrind=valgrind)

        try:
            validate(stdout)

            # We only care about the return code from validation if there was an error,
            # which will be dealt with below. Here, we want to log the original return
            # code from the test itself.
            self._log_end_test(rc)
            return 0
        except XmlValidationError as e:
            self._output.append("=#=#=#= End test: %s - Failed to validate (%d) =#=#=#=" % (self.desc, e.exit_code))
            self._output.extend(e.output.splitlines())
            return e.exit_code
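
# Note: When used with make_test_group, this class's format_args value
# ("--output-as=xml") is what gets substituted for "{fmt}" in the command, so
# the same test runs once as plain text and once with validated XML output.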


class TestGroup:
    """A base class for a group of related tests."""

    def __init__(self, tests, cib_gen=None, env=None, setup=None, teardown=None):
        """
        Create a new TestGroup instance.

        Arguments:
        tests -- A list of Test instances

        Keyword arguments:
        cib_gen  -- If not None, a function that generates a CIB file and returns the
                    name of that CIB. This will be added to the test environment as
                    CIB_file and used for all tests in this group. The file will then
                    be deleted after all tests have been run.
        env      -- If not None, a dict of values to be added to the test environment
        setup    -- A command string, python function, or list of the previous
                    types to run immediately before the tests. This will be run in
                    the same environment as the tests.
        teardown -- Like setup, but runs immediately after the tests
        """
        self.tests = tests

        self._cib_gen = cib_gen
        self._env = env
        self._setup = setup
        self._teardown = teardown

        self._successes = None
        self._failures = None
        self._output = None

    @property
    def failures(self):
        """Return the number of member tests that failed."""
        return self._failures

    @property
    def output(self):
        """Return the test's detailed output."""
        return self._output

    @property
    def successes(self):
        """Return the number of member tests that succeeded."""
        return self._successes

    def _run_setup_teardown(self, cmd):
        """
        Run any setup or teardown command required by this test group.

        On success (or if no command is present), return True. On failure,
        return False and log the stdout/stderr of the command for debugging.

        Arguments:
        cmd -- The setup/teardown command(s) to run
        """
        try:
            run_cmd_list(cmd)
            return True
        except subprocess.CalledProcessError as exn:
            self._output.extend(exn.stderr.splitlines())
            self._output.extend(exn.stdout.splitlines())
            return False

    def run(self, group, valgrind=False):
        """
        Run all Test instances that are a part of this regression test.

        Additionally, record their stdout and stderr in the self.output property
        and the total number of tests that passed and failed.

        Arguments:
        group -- The name of the group this test is a part of, for logging purposes
        """
        self._failures = 0
        self._successes = 0
        self._output = []

        cib_file = None

        with environ(self._env):
            # If we were given a way to generate a CIB, do that now and add it to the
            # environment.
            if self._cib_gen is not None:
                cib_file = self._cib_gen()
                os.environ.update({"CIB_file": cib_file})

            # Run the setup hook, if any
            if not self._run_setup_teardown(self._setup):
                return False

            # Run the tests
            for t in self.tests:
                rc = t.run(group, valgrind=valgrind)

                if isinstance(t, TestGroup):
                    self._successes += t.successes
                    self._failures += t.failures
                else:
                    if rc:
                        self._successes += 1
                    else:
                        self._failures += 1

                self._output.extend(t.output)

            if cib_file is not None:
                os.environ.pop("CIB_file")
                os.unlink(cib_file)

            # Run the teardown hook, if any
            if not self._run_setup_teardown(self._teardown):
                return False

        return True


class ShadowTestGroup(TestGroup):
    """A group of related tests that require a shadow CIB."""

    def __init__(self, tests, **kwargs):
        """
        Create a new ShadowTestGroup instance.

        Arguments:
        tests -- A list of Test instances

        Keyword arguments:
        create        -- If True, create a shadow CIB file (see create_empty).
                         Otherwise, just create a temp directory and set environment
                         variables.
        create_empty  -- If True, the shadow CIB will be empty. Otherwise, the
                         shadow CIB will be a copy of the currently active
                         cluster configuration.
        validate_with -- If not None, the schema version to validate the CIB
                         against
        """
        self._create = kwargs.pop("create", True)
        self._create_empty = kwargs.pop("create_empty", True)
        self._validate_with = kwargs.pop("validate_with", None)

        TestGroup.__init__(self, tests, **kwargs)

    def run(self, group, valgrind=False):
        """
        Run all Test instances that are a part of this regression test.

        Additionally, record their stdout and stderr in the self.output property
        and the total number of tests that passed and failed.

        Arguments:
        group -- The name of the group this test is a part of, for logging purposes
        """
        with TemporaryDirectory(prefix="cts-cli.shadow.") as shadow_dir:
            if self._create:
                create_shadow_cib(shadow_dir, create_empty=self._create_empty,
                                  validate_with=self._validate_with, valgrind=valgrind)
            else:
                os.environ["CIB_shadow_dir"] = shadow_dir
                os.environ["CIB_shadow"] = SHADOW_NAME

            rc = TestGroup.run(self, group, valgrind=valgrind)

            if self._create:
                cleanup_shadow_dir()

            os.environ.pop("CIB_shadow_dir")
            os.environ.pop("CIB_shadow")
            return rc


class RegressionTest:
    """A base class for testing a single command line tool."""

    def __init__(self):
        """Create a new RegressionTest instance."""
        self._identical = None
        self._successes = None
        self._failures = None
        self._tempfile = None
        self._output = None

    @property
    def failures(self):
        """Return the number of member tests that failed."""
        return self._failures

    @property
    def identical(self):
        """Return whether the expected output matches the actual output."""
        return self._identical

    @property
    def name(self):
        """
        Return the name of this regression test.

        This should be a unique, very short, single word name without any special
        characters. It must match the name of some word in the default_tests
        list because it may be given with the -r option on the command line
        to select only certain tests to run.

        All subclasses must define this property.
        """
        raise NotImplementedError

    @property
    def results_file(self):
        """Return the location where the regression test results are stored."""
        return self._tempfile

    @property
    def successes(self):
        """Return the number of member tests that succeeded."""
        return self._successes

    @property
    def summary(self):
        """Return a list of all Passed/Failed lines for tests in this regression test."""
        retval = []

        for line in self._output:
            if line.startswith("* Failed") or line.startswith("* Passed"):
                retval.append(line)

        return retval

    @property
    def tests(self):
        """A list of Test and TestGroup instances to be run as part of this regression test."""
        return []

    def cleanup(self):
        """Remove the temp file where test output is stored."""
        os.remove(self._tempfile)
        self._tempfile = None

    def diff(self, verbose=False):
        """
        Compare the results of this regression test to the expected results.

        Arguments:
        verbose -- If True, the diff will be written to stdout
        """
        args = ["diff", "-wu", "%s/cli/regression.%s.exp" % (test_home, self.name), self.results_file]

        try:
            if verbose:
                subprocess.run(args, check=True)
            else:
                subprocess.run(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                               check=True)

            self._identical = True
        except subprocess.CalledProcessError:
            self._identical = False

    def process_results(self, verbose):
        """If actual output differs from expected output, print the actual output."""
        if self.identical:
            self.cleanup()
            return

        print(" %s" % self.results_file)

        if verbose:
            print("======================================================")

            with open(self.results_file, encoding="utf-8") as f:
                print(f.read())

            print("======================================================")

    def run(self, valgrind=False):
        """
        Run all Test and TestGroup instances that are a part of this regression test.

        Additionally, record their stdout and stderr in the self.output property
        and the total number of tests that passed and failed.
        """
        self._failures = 0
        self._successes = 0
        self._output = []

        for t in self.tests:
            rc = t.run(self.name, valgrind=valgrind)

            if isinstance(t, TestGroup):
                self._successes += t.successes
                self._failures += t.failures
            else:
                if rc:
                    self._successes += 1
                else:
                    self._failures += 1