Skip to content
Open
41 changes: 35 additions & 6 deletions integration-tests/tests/fixtures/integration_test_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

import logging
import pathlib
import subprocess

import pytest

from tests.utils.classes import ExternalAction
from tests.utils.config import (
IntegrationTestLogs,
IntegrationTestPathConfig,
)
from tests.utils.logging_utils import format_action_failure_msg
from tests.utils.utils import (
get_binary_path,
remove_path,
Expand Down Expand Up @@ -100,7 +101,7 @@ def _download_and_extract_gzip_dataset(
:param keep_leading_dir: Whether to preserve the top-level directory during tarball extraction.
Defaults to False to avoid an unnecessary extra directory level.
:return: An IntegrationTestLogs instance providing metadata for the downloaded logs.
:raises subprocess.CalledProcessError: If `curl`, `tar`, or `chmod` fails.
:raise pytest.fail: If `curl`, `tar`, or `chmod` returns a non-zero exit code.
"""
integration_test_logs = IntegrationTestLogs(
name=name,
Expand Down Expand Up @@ -128,7 +129,14 @@ def _download_and_extract_gzip_dataset(
tarball_url,
]
# fmt: on
subprocess.run(curl_cmd, check=True)
curl_action = ExternalAction(cmd=curl_cmd)
if curl_action.completed_proc.returncode != 0:
pytest.fail(
format_action_failure_msg(
f"`curl` failed when downloading `{tarball_url}`.",
curl_action,
)
)

# fmt: off
extract_cmd = [
Expand All @@ -141,13 +149,34 @@ def _download_and_extract_gzip_dataset(
# fmt: on
if not keep_leading_dir:
extract_cmd.extend(["--strip-components", "1"])
subprocess.run(extract_cmd, check=True)
extract_action = ExternalAction(cmd=extract_cmd)
if extract_action.completed_proc.returncode != 0:
pytest.fail(
format_action_failure_msg(
f"`tar` failed when extracting `{tarball_path_str}`.",
extract_action,
)
)

# Allow the downloaded and extracted contents to be deletable or overwritable by adding write
# permissions for both the user and the group.
chmod_bin = get_binary_path("chmod")
subprocess.run([chmod_bin, "gu+w", tarball_path_str], check=True)
subprocess.run([chmod_bin, "-R", "gu+w", extract_path_str], check=True)
chmod_tarball_action = ExternalAction(cmd=[chmod_bin, "gu+w", tarball_path_str])
if chmod_tarball_action.completed_proc.returncode != 0:
pytest.fail(
format_action_failure_msg(
f"`chmod` failed for `{tarball_path_str}`.",
chmod_tarball_action,
)
)
chmod_extract_action = ExternalAction(cmd=[chmod_bin, "-R", "gu+w", extract_path_str])
if chmod_extract_action.completed_proc.returncode != 0:
pytest.fail(
format_action_failure_msg(
f"`chmod` failed for `{extract_path_str}`.",
chmod_extract_action,
)
)

logger.info("Downloaded and extracted uncompressed logs for dataset `%s`.", name)
request.config.cache.set(name, True)
Expand Down
24 changes: 18 additions & 6 deletions integration-tests/tests/test_identity_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@

import pytest

from tests.utils.classes import ExternalAction
from tests.utils.config import (
ClpCorePathConfig,
CompressionTestPathConfig,
IntegrationTestLogs,
IntegrationTestPathConfig,
)
from tests.utils.subprocess_utils import run_and_log_subprocess
from tests.utils.utils import (
from tests.utils.fs_validation import (
is_dir_tree_content_equal,
is_json_file_structurally_equal,
)
from tests.utils.logging_utils import format_action_failure_msg

pytestmark = pytest.mark.core

Expand Down Expand Up @@ -73,10 +74,14 @@ def test_clp_identity_transform(
src_path,
]
# fmt: on
run_and_log_subprocess(compression_cmd)
compression_action = ExternalAction(cmd=compression_cmd)
if compression_action.completed_proc.returncode != 0:
pytest.fail(format_action_failure_msg("`clp` compression failed.", compression_action))

decompression_cmd = [bin_path, "x", compression_path, decompression_path]
run_and_log_subprocess(decompression_cmd)
decompression_action = ExternalAction(cmd=decompression_cmd)
if decompression_action.completed_proc.returncode != 0:
pytest.fail(format_action_failure_msg("`clp` decompression failed.", decompression_action))

input_path = test_paths.logs_source_dir
output_path = test_paths.decompression_dir
Expand Down Expand Up @@ -148,5 +153,12 @@ def _clp_s_compress_and_decompress(
src_path = str(test_paths.logs_source_dir)
compression_path = str(test_paths.compression_dir)
decompression_path = str(test_paths.decompression_dir)
run_and_log_subprocess([bin_path, "c", compression_path, src_path])
run_and_log_subprocess([bin_path, "x", compression_path, decompression_path])
compression_action = ExternalAction(cmd=[bin_path, "c", compression_path, src_path])
if compression_action.completed_proc.returncode != 0:
pytest.fail(format_action_failure_msg("`clp-s` compression failed.", compression_action))

decompression_action = ExternalAction(cmd=[bin_path, "x", compression_path, decompression_path])
if decompression_action.completed_proc.returncode != 0:
pytest.fail(
format_action_failure_msg("`clp-s` decompression failed.", decompression_action)
)
22 changes: 16 additions & 6 deletions integration-tests/tests/test_log_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@

import pytest

from tests.utils.classes import ExternalAction
from tests.utils.config import (
ClpCorePathConfig,
ConversionTestPathConfig,
IntegrationTestLogs,
IntegrationTestPathConfig,
)
from tests.utils.subprocess_utils import run_and_log_subprocess
from tests.utils.logging_utils import format_action_failure_msg

# Matching `LogSerializer::cTimestampKey`.
LOG_CONVERTER_OUTPUT_TIMESTAMP_KEY = "timestamp"
Expand Down Expand Up @@ -69,9 +70,14 @@ def _convert_and_compress(
src_path = str(test_paths.logs_source_dir)
conversion_path = str(test_paths.conversion_dir)
compression_path = str(test_paths.compression_dir)
run_and_log_subprocess([log_converter_bin_path, src_path, "--output-dir", conversion_path])
run_and_log_subprocess(
[
conversion_action = ExternalAction(
cmd=[log_converter_bin_path, src_path, "--output-dir", conversion_path]
)
if conversion_action.completed_proc.returncode != 0:
pytest.fail(format_action_failure_msg("`log-converter` failed.", conversion_action))

compression_action = ExternalAction(
cmd=[
clp_s_bin_path,
"c",
compression_path,
Expand All @@ -80,12 +86,16 @@ def _convert_and_compress(
LOG_CONVERTER_OUTPUT_TIMESTAMP_KEY,
]
)
if compression_action.completed_proc.returncode != 0:
pytest.fail(format_action_failure_msg("`clp-s` compression failed.", compression_action))

if test_paths.num_log_events is None:
return

output = run_and_log_subprocess([clp_s_bin_path, "s", compression_path, "timestamp > 0"])
lines = output.stdout.splitlines() if output.stdout else []
search_action = ExternalAction(cmd=[clp_s_bin_path, "s", compression_path, "timestamp > 0"])
if search_action.completed_proc.returncode != 0:
pytest.fail(format_action_failure_msg("`clp-s` search failed.", search_action))
lines = search_action.completed_proc.stdout.splitlines()
if len(lines) != test_paths.num_log_events:
pytest.fail(
f"Expected {test_paths.num_log_events} log events after conversion, "
Expand Down
14 changes: 11 additions & 3 deletions integration-tests/tests/utils/asserting_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
from clp_py_utils.clp_config import ClpConfig
from pydantic import ValidationError

from tests.utils.classes import ExternalAction
from tests.utils.clp_mode_utils import compare_mode_signatures
from tests.utils.config import PackageInstance, PackageTestConfig
from tests.utils.docker_utils import list_running_services_in_compose_project
from tests.utils.subprocess_utils import run_and_log_subprocess
from tests.utils.fs_validation import is_dir_tree_content_equal
from tests.utils.logging_utils import format_action_failure_msg
from tests.utils.utils import (
clear_directory,
is_dir_tree_content_equal,
load_yaml_to_dict,
)

Expand Down Expand Up @@ -133,7 +134,14 @@ def verify_package_compression(
]

# Run decompression command and assert that it succeeds.
run_and_log_subprocess(decompress_cmd)
decompress_action = ExternalAction(cmd=decompress_cmd)
if decompress_action.completed_proc.returncode != 0:
pytest.fail(
format_action_failure_msg(
f"Decompression script `{decompress_script_path.name}` failed.",
decompress_action,
)
)

# Verify content equality.
output_path = decompression_dir / path_to_original_dataset.relative_to(
Expand Down
23 changes: 15 additions & 8 deletions integration-tests/tests/utils/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,24 @@ def logs_path(self) -> Path:
return self.dataset_root_dir / self.metadata.logs_subdir


class CmdArgs(BaseModel, ABC):
"""Abstract base class for all CLP command argument models."""

@abstractmethod
def to_cmd(self) -> list[str]:
""":return: list of command arguments constructed from this instance's data members."""


@dataclass
class ExternalAction:
"""Metadata for an external action executed during an integration test."""

#: Command to pass to `subprocess.run()`.
cmd: list[str]

#: Optional structured arguments for verification purposes. Not used by `ExternalAction` itself.
args: CmdArgs | None = None

#: The completed process returned from `subprocess.run()`.
completed_proc: subprocess.CompletedProcess[str] = field(init=False)

Expand All @@ -151,6 +162,10 @@ def __post_init__(self) -> None:
self.completed_proc = self._run_subprocess()
self._log_action_summary_to_file()

def get_output(self) -> str:
""":return: The combined stdout and stderr from the completed subprocess."""
return self.completed_proc.stdout + self.completed_proc.stderr

def _run_subprocess(self) -> subprocess.CompletedProcess[str]:
"""
Passes `self.cmd` to `subprocess.run()` with preset parameters:
Expand Down Expand Up @@ -220,11 +235,3 @@ def _log_action_summary_to_file(self) -> None:
f"Subprocess returned. stdout and stderr written to log file: '{self.log_file_path}'"
)
logger.info(log_msg)


class CmdArgs(BaseModel, ABC):
"""Abstract base class for all CLP command argument models."""

@abstractmethod
def to_cmd(self) -> list[str]:
""":return: list of command arguments constructed from this instance's data members."""
16 changes: 13 additions & 3 deletions integration-tests/tests/utils/docker_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Provide utility functions related to the use of Docker during integration tests."""

import subprocess
import pytest

from tests.utils.classes import ExternalAction
from tests.utils.logging_utils import format_action_failure_msg
from tests.utils.utils import get_binary_path


Expand All @@ -11,6 +13,7 @@ def list_running_services_in_compose_project(project_name: str) -> list[str]:

:param project_name:
:return: List of the running services that belong to the specified Docker Compose project.
:raise pytest.fail: if `docker compose ps` returns a non-zero exit code.
"""
docker_bin = get_binary_path("docker")

Expand All @@ -24,10 +27,17 @@ def list_running_services_in_compose_project(project_name: str) -> list[str]:
]
# fmt: on

compose_ps_proc = subprocess.run(compose_ps_cmd, stdout=subprocess.PIPE, text=True, check=True)
compose_ps_action = ExternalAction(cmd=compose_ps_cmd)
if compose_ps_action.completed_proc.returncode != 0:
pytest.fail(
format_action_failure_msg(
f"`docker compose ps` failed for project `{project_name}`.",
compose_ps_action,
)
)

service_names: list[str] = []
for line in (compose_ps_proc.stdout or "").splitlines():
for line in compose_ps_action.completed_proc.stdout.splitlines():
service_name_candidate = line.strip()
if service_name_candidate:
service_names.append(service_name_candidate)
Expand Down
64 changes: 64 additions & 0 deletions integration-tests/tests/utils/fs_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""File structure validators."""

from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import IO

from tests.utils.classes import ExternalAction
from tests.utils.utils import get_binary_path


def is_dir_tree_content_equal(path1: Path, path2: Path) -> bool:
"""
:param path1:
:param path2:
:return: Whether two files/directories hold the exactly same content.
:raise: RuntimeError if the diff command fails due to execution errors.
"""
cmd = [get_binary_path("diff"), "--brief", "--recursive", str(path1), str(path2)]
diff_action = ExternalAction(cmd=cmd)
rc = diff_action.completed_proc.returncode
if rc == 0:
return True
if rc == 1:
return False
err_msg = f"Command failed {' '.join(cmd)}: {diff_action.completed_proc.stderr}"
raise RuntimeError(err_msg)


def is_json_file_structurally_equal(json_fp1: Path, json_fp2: Path) -> bool:
"""
:param json_fp1:
:param json_fp2:
:return: Whether two JSON files are structurally equal after sorting has been applied.
"""
with (
_sort_json_keys_and_rows(json_fp1) as temp_file_1,
_sort_json_keys_and_rows(json_fp2) as temp_file_2,
):
return is_dir_tree_content_equal(Path(temp_file_1.name), Path(temp_file_2.name))


def _sort_json_keys_and_rows(json_fp: Path) -> IO[str]:
"""
Normalize a JSON file to a stable, deterministically ordered form for comparison.

:param json_fp:
:return: A named temporary file (delete on close) that contains the sorted JSON content.
:raise: RuntimeError if jq is missing or fails due to execution errors.
"""
jq_action = ExternalAction(
cmd=[get_binary_path("jq"), "--sort-keys", "--compact-output", ".", str(json_fp)],
)
jq_rc = jq_action.completed_proc.returncode
if jq_rc != 0:
err_msg = f"jq failed with exit code {jq_rc} for {json_fp}"
Comment thread
quinntaylormitchell marked this conversation as resolved.
Outdated
raise RuntimeError(err_msg)

sorted_fp = NamedTemporaryFile(mode="w+", delete=True) # noqa: SIM115
sorted_lines = sorted(jq_action.completed_proc.stdout.splitlines())
for line in sorted_lines:
sorted_fp.write(f"{line}\n")
sorted_fp.flush()
sorted_fp.seek(0)
Comment thread
quinntaylormitchell marked this conversation as resolved.
Outdated
return sorted_fp
Loading
Loading