-
Notifications
You must be signed in to change notification settings - Fork 91
Expand file tree
/
Copy pathtest_identity_transformation.py
More file actions
164 lines (142 loc) · 5.9 KB
/
Copy pathtest_identity_transformation.py
File metadata and controls
164 lines (142 loc) · 5.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""
Integration tests verifying that CLP core compression binaries perform lossless round-trip
compression and decompression.
"""
import pytest
from tests.utils.classes import ExternalAction
from tests.utils.config import (
ClpCorePathConfig,
CompressionTestPathConfig,
IntegrationTestLogs,
IntegrationTestPathConfig,
)
from tests.utils.fs_validation import (
is_dir_tree_content_equal,
is_json_file_structurally_equal,
)
from tests.utils.logging_utils import format_action_failure_msg
# Apply the `core` mark to every test in this module.
pytestmark = pytest.mark.core

# Each entry is the name of a fixture; the tests resolve it at run time through
# `request.getfixturevalue(test_logs_fixture)`.

# Log fixtures used by the `clp` test.
text_datasets = pytest.mark.parametrize("test_logs_fixture", ["hive_24hr"])

# Log fixtures used by the `clp-s` test.
json_datasets = pytest.mark.parametrize("test_logs_fixture", ["postgresql"])
@pytest.mark.clp
@text_datasets
def test_clp_identity_transform(
    request: pytest.FixtureRequest,
    clp_core_path_config: ClpCorePathConfig,
    integration_test_path_config: IntegrationTestPathConfig,
    test_logs_fixture: str,
) -> None:
    """
    Check that a compress-then-decompress round trip through the core binary `clp` exits
    successfully at each step and that the decompressed directory tree matches the input logs.

    :param request: Used to resolve the fixture named by `test_logs_fixture`.
    :param clp_core_path_config: Supplies the path of the `clp` binary.
    :param integration_test_path_config: Forwarded to `CompressionTestPathConfig`.
    :param test_logs_fixture: Name of the fixture providing the logs to round-trip.
    """
    integration_test_logs: IntegrationTestLogs = request.getfixturevalue(test_logs_fixture)
    round_trip_paths = CompressionTestPathConfig(
        test_name=f"clp-{integration_test_logs.name}",
        logs_source_dir=integration_test_logs.extraction_dir,
        integration_test_path_config=integration_test_path_config,
    )

    # Clear the test outputs up front as well as at the end: the trailing cleanup is not reached
    # when an earlier step fails.
    round_trip_paths.clear_test_outputs()

    clp_bin = str(clp_core_path_config.clp_binary_path)
    logs_dir = str(round_trip_paths.logs_source_dir)
    archives_dir = str(round_trip_paths.compression_dir)
    extracted_dir = str(round_trip_paths.decompression_dir)

    # Build the command piecewise so that each flag stays next to its value.
    compress_cmd = [clp_bin, "c", "--progress"]
    compress_cmd += ["--remove-path-prefix", logs_dir]
    compress_cmd += [archives_dir, logs_dir]
    compress_run = ExternalAction(cmd=compress_cmd)
    if compress_run.completed_proc.returncode != 0:
        pytest.fail(format_action_failure_msg("`clp` compression failed.", compress_run))

    decompress_run = ExternalAction(cmd=[clp_bin, "x", archives_dir, extracted_dir])
    if decompress_run.completed_proc.returncode != 0:
        pytest.fail(format_action_failure_msg("`clp` decompression failed.", decompress_run))

    input_path = round_trip_paths.logs_source_dir
    output_path = round_trip_paths.decompression_dir
    assert is_dir_tree_content_equal(input_path, output_path), (
        f"Mismatch between clp input {input_path} and output {output_path}."
    )

    round_trip_paths.clear_test_outputs()
@pytest.mark.clp_s
@json_datasets
def test_clp_s_identity_transform(
    request: pytest.FixtureRequest,
    clp_core_path_config: ClpCorePathConfig,
    integration_test_path_config: IntegrationTestPathConfig,
    test_logs_fixture: str,
) -> None:
    """
    Check that compression and decompression by the core binary `clp-s` exit successfully and
    that re-running the round trip on the decompressed output yields a structurally equal JSON
    file.

    :param request: Used to resolve the fixture named by `test_logs_fixture`.
    :param clp_core_path_config: Supplies the path of the `clp-s` binary.
    :param integration_test_path_config: Forwarded to each `CompressionTestPathConfig`.
    :param test_logs_fixture: Name of the fixture providing the logs to round-trip.
    """
    integration_test_logs: IntegrationTestLogs = request.getfixturevalue(test_logs_fixture)
    test_logs_name = integration_test_logs.name

    # First pass: round-trip the original logs.
    first_pass_paths = CompressionTestPathConfig(
        test_name=f"clp-s-{test_logs_name}",
        logs_source_dir=integration_test_logs.extraction_dir,
        integration_test_path_config=integration_test_path_config,
    )
    _clp_s_compress_and_decompress(clp_core_path_config, first_pass_paths)

    # Second pass: the first pass's decompressed output is consolidated into a single json file,
    # so feed that output back in as the compression input and round-trip it again to verify
    # consistency.
    # TODO: Remove this check once we can directly compare decompressed logs (which would preserve
    # the directory structure and row/key order) with the original downloaded logs.
    # See also: https://docs.yscope.com/clp/main/user-guide/core-clp-s.html#current-limitations
    second_pass_paths = CompressionTestPathConfig(
        test_name=f"clp-s-{test_logs_name}-consolidated-json",
        logs_source_dir=first_pass_paths.decompression_dir,
        integration_test_path_config=integration_test_path_config,
    )
    _clp_s_compress_and_decompress(clp_core_path_config, second_pass_paths)

    # Compare the consolidated file that went into the second pass with the one that came out.
    consolidated_file_name = "original"
    input_path = second_pass_paths.logs_source_dir / consolidated_file_name
    output_path = second_pass_paths.decompression_dir / consolidated_file_name
    assert is_json_file_structurally_equal(
        input_path,
        output_path,
    ), f"Mismatch between clp-s input {input_path} and output {output_path}."

    for finished_paths in (first_pass_paths, second_pass_paths):
        finished_paths.clear_test_outputs()
def _clp_s_compress_and_decompress(
    clp_core_path_config: ClpCorePathConfig,
    test_paths: CompressionTestPathConfig,
) -> None:
    """
    Run `clp-s` compression on `test_paths.logs_source_dir` and then decompression of the result,
    failing the current test as soon as either command exits with a non-zero return code.

    :param clp_core_path_config: Supplies the path of the `clp-s` binary.
    :param test_paths: Supplies the source, compression, and decompression directories. Its test
        outputs are cleared before the commands run.
    """
    test_paths.clear_test_outputs()

    clp_s_bin = str(clp_core_path_config.clp_s_binary_path)
    logs_dir = str(test_paths.logs_source_dir)
    archives_dir = str(test_paths.compression_dir)
    extracted_dir = str(test_paths.decompression_dir)

    # Both steps share the same run-and-check shape, so describe them as data and execute them
    # in order: compression first, then decompression of what was just compressed.
    steps = (
        ("c", archives_dir, logs_dir, "`clp-s` compression failed."),
        ("x", archives_dir, extracted_dir, "`clp-s` decompression failed."),
    )
    for subcommand, first_arg, second_arg, failure_summary in steps:
        action = ExternalAction(cmd=[clp_s_bin, subcommand, first_arg, second_arg])
        if action.completed_proc.returncode != 0:
            pytest.fail(format_action_failure_msg(failure_summary, action))