diff --git a/.flake8 b/.flake8 index 29cb32392c..fe164e734e 100644 --- a/.flake8 +++ b/.flake8 @@ -12,3 +12,5 @@ exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,.venv,*/cd # F401 - Unused imports -- this is the only way to have a file-wide rule exception per-file-ignores = + # We utilize * imports on test files here to dynamically collect test cases + conftest.py: F401,F403 diff --git a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java index 5457fb4702..d941901ac2 100644 --- a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java +++ b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java @@ -192,11 +192,12 @@ public static class DocParams implements TransformerParams { public String getTransformerConfigParameterArgPrefix() { return DOC_CONFIG_PARAMETER_ARG_PREFIX; } - private static final String DOC_CONFIG_PARAMETER_ARG_PREFIX = "doc-"; + private static final String DOC_CONFIG_PARAMETER_ARG_PREFIX = "doc"; @Parameter( required = false, - names = "--" + DOC_CONFIG_PARAMETER_ARG_PREFIX + "transformer-config-base64", + names = { "--" + DOC_CONFIG_PARAMETER_ARG_PREFIX + "-transformer-config-base64", + "--" + DOC_CONFIG_PARAMETER_ARG_PREFIX + "TransformerConfigBase64" }, arity = 1, description = "Configuration of doc transformers. The same contents as --doc-transformer-config but " + "Base64 encoded so that the configuration is easier to pass as a command line parameter.") @@ -204,7 +205,8 @@ public String getTransformerConfigParameterArgPrefix() { @Parameter( required = false, - names = "--" + DOC_CONFIG_PARAMETER_ARG_PREFIX + "transformer-config", + names = { "--" + DOC_CONFIG_PARAMETER_ARG_PREFIX + "-transformer-config", + "--" + DOC_CONFIG_PARAMETER_ARG_PREFIX + "TransformerConfig" }, arity = 1, description = "Configuration of doc transformers. 
Either as a string that identifies the " + "transformer that should be run (with default settings) or as json to specify options " @@ -215,7 +217,8 @@ public String getTransformerConfigParameterArgPrefix() { @Parameter( required = false, - names = "--" + DOC_CONFIG_PARAMETER_ARG_PREFIX + "transformer-config-file", + names = { "--" + DOC_CONFIG_PARAMETER_ARG_PREFIX + "-transformer-config-file", + "--" + DOC_CONFIG_PARAMETER_ARG_PREFIX + "TransformerConfigFile" }, arity = 1, description = "Path to the JSON configuration file of doc transformers.") private String transformerConfigFile; diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/clusters.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/clusters.py index 1ef98323a1..0569b6c9b7 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/clusters.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/clusters.py @@ -59,8 +59,11 @@ def run_test_benchmarks(cluster: Cluster): # As a default we exclude system indices and searchguard indices def clear_indices(cluster: Cluster): clear_indices_path = "/*,-.*,-searchguard*,-sg7*,.migrations_working_state" - r = cluster.call_api(clear_indices_path, method=HttpMethod.DELETE, params={"ignore_unavailable": "true"}) - return r.content + try: + r = cluster.call_api(clear_indices_path, method=HttpMethod.DELETE, params={"ignore_unavailable": "true"}) + return r.content + except Exception as e: + return f"Error encountered when clearing indices: {e}" def clear_cluster(cluster: Cluster): diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py index 3cb34d0b4c..e49264c075 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py @@ -89,6 +89,7 @@ def __init__(self, config: Dict, client_options: Optional[ClientOptions] = None) raise ValueError("Invalid config file for cluster", v.errors) self.endpoint = config["endpoint"] + self.version = config.get("version", None) self.allow_insecure = config.get("allow_insecure", False) if self.endpoint.startswith( "https") else config.get("allow_insecure", True) if 'no_auth' in config: diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/README.md b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/README.md index ab7b9482d2..f18e74b7fd 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/README.md +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/README.md @@ -1,64 +1,47 @@ -## E2E Integration Testing -Developers can run a test script which will verify the end-to-end Docker Solution. - -### Compatibility -* Python >= 3.7 - -### Pre-requisites - -* Have all containers from Docker solution running. 
- -To run the test script, users must navigate to this directory, -install the required packages and then run the script: +## Migration Assistant E2E Integration Testing +This library contains E2E integration tests to execute against a Migration Assistant deployment. +### Installing dependencies +To install the required dependencies: ``` -pip install -r requirements.txt -pytest tests.py +pipenv install ``` -### Running in Docker setup +### Creating an E2E test case +Test cases created within the `test_cases` directory are executed by the `ma_workflow_test.py` test structure. The link between the two is made in the pytest +configuration file `conftest.py`. Any test class created in an existing file within the `test_cases` directory will automatically be added to the list of test cases +to attempt when `ma_workflow_test.py` is executed with pytest. The `conftest.py` file achieves this by first collecting all test cases and then filtering +out any test cases that don't apply to the given source and target cluster versions, as well as any excluded by user-provided filters such as `--test_ids`. Once the final +list is determined, `conftest.py` dynamically parametrizes the `ma_workflow_test.py` test, resulting in multiple executions of that test +based on the final list of test cases to be executed. If a new test file is created within the `test_cases` directory, it should be imported into the `conftest.py` file +like the other test files. -From the root of this repository bring up the Docker environment -```shell -./gradlew -p TrafficCapture dockerSolution:ComposeUp -x test -x spotlessCheck --info --stacktrace -``` -The Docker compose file being used can be found [here](../../../docker-compose.yml) -* The integ_test `lib` directory can be directly mounted as a volume on the migration console container to spe +### Running tests in K8s setup + +Follow the quickstart guide [here](../../../../../../../../deployment/k8s/quickstart.md) to set up a Migration Assistant environment with source and +target test clusters. -To run one of the integration test suites a command like below can be used: +Access the migration console: ```shell -docker exec $(docker ps --filter "name=migration-console" -q) pipenv run pytest /root/lib/integ_test/integ_test/full_tests.py --unique_id="testindex" -s +kubectl exec --stdin --tty $(kubectl get pods -l app=ma-migration-console --sort-by=.metadata.creationTimestamp -o jsonpath="{.items[-1].metadata.name}") -- /bin/bash ``` -To teardown, execute the following command at the root of this repository +Run pytest: ```shell -./gradlew -p TrafficCapture dockerSolution:ComposeDown +pytest ~/lib/integ_test/integ_test/ma_workflow_test.py ``` -#### Notes - -##### Ports Setup -The test script, by default, uses the ports assigned to the containers in this -[docker-compose file](../../../docker-compose.yml), so if the Docker solution in -its current setup started with no issues, then the test script will run as is. If for any reason -the user changed the ports in that file, they must also either, provide the following parameters variables: -`proxy_endpoint`, `source_endpoint`, and `target_endpoint` respectively, or update the default value - for them in [conftest.py](integ_test/conftest.py). +To tear down resources, follow the cleanup steps at the end of the quickstart guide [here](../../../../../../../../deployment/k8s/quickstart.md#cleanup) -#### Script Parameters +### Pytest parameters -This script accepts various parameters to customize its behavior. 
Below is a list of available parameters along with their default values and acceptable choices: +Pytest has been configured to accept various parameters to customize its behavior. Below is a list of available parameters along with their default values and acceptable choices: - `--unique_id`: The unique identifier to apply to created indices/documents. - Default: Generated uuid - `--config_file_path`: The services yaml config file path for the console library. - Default: `/config/migration_services.yaml` - - -#### Clean Up -The test script is implemented with a setup and teardown functions that are ran after -each and every test where additions made to the endpoints are deleted, *mostly* cleaning up after themselves, however, -as we log all operations going through the proxy (which is capturing the traffic), those are only being -deleted after the Docker solution is shut down. +- `--test_ids`: Specify test IDs like `'0001,0003'` to filter tests to execute. + - Default: Attempt to execute all tests diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/__init__.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/backfill_tests.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/backfill_tests.py index b4f3f4920c..6a7b843d44 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/backfill_tests.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/backfill_tests.py @@ -8,10 +8,11 @@ from console_link.models.command_result import CommandResult from console_link.models.metadata import Metadata from console_link.cli import Context -from common_operations import (get_document, create_document, create_index, check_doc_counts_match, - EXPECTED_BENCHMARK_DOCS) +from .common_utils import EXPECTED_BENCHMARK_DOCS +from .default_operations import DefaultOperationsLibrary logger = logging.getLogger(__name__) +ops = DefaultOperationsLibrary() def preload_data(source_cluster: Cluster, target_cluster: Cluster): @@ -29,9 +30,9 @@ def preload_data(source_cluster: Cluster, target_cluster: Cluster): # test_backfill_0001 index_name = f"test_backfill_0001_{pytest.unique_id}" doc_id = "backfill_0001_doc" - create_index(cluster=source_cluster, index_name=index_name) - create_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id, - expected_status_code=HTTPStatus.CREATED) + ops.create_index(cluster=source_cluster, index_name=index_name) + ops.create_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id, + expected_status_code=HTTPStatus.CREATED) # test_backfill_0002 run_test_benchmarks(source_cluster) @@ -85,23 +86,23 @@ def test_backfill_0001_single_document(self): target_cluster: Cluster = pytest.console_env.target_cluster # Assert preloaded document exists - get_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id, test_case=self) + ops.get_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id, test_case=self) # TODO Determine when backfill is completed - get_document(cluster=target_cluster, index_name=index_name, doc_id=doc_id, max_attempts=30, delay=30.0, - test_case=self) + ops.get_document(cluster=target_cluster, index_name=index_name, doc_id=doc_id, max_attempts=30, 
delay=30.0, + test_case=self) def test_backfill_0002_sample_benchmarks(self): source_cluster: Cluster = pytest.console_env.source_cluster target_cluster: Cluster = pytest.console_env.target_cluster # Confirm documents on source - check_doc_counts_match(cluster=source_cluster, expected_index_details=EXPECTED_BENCHMARK_DOCS, - test_case=self) + ops.check_doc_counts_match(cluster=source_cluster, expected_index_details=EXPECTED_BENCHMARK_DOCS, + test_case=self) # TODO Determine when backfill is completed # Confirm documents on target after backfill - check_doc_counts_match(cluster=target_cluster, expected_index_details=EXPECTED_BENCHMARK_DOCS, - max_attempts=30, delay=30.0, test_case=self) + ops.check_doc_counts_match(cluster=target_cluster, expected_index_details=EXPECTED_BENCHMARK_DOCS, + max_attempts=30, delay=30.0, test_case=self) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/cluster_version.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/cluster_version.py new file mode 100644 index 0000000000..7870391ded --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/cluster_version.py @@ -0,0 +1,40 @@ +import re + + +class ClusterVersion: + pattern = re.compile(r"^(ES|OS)_([0-9]+)\.([0-9]+|x|X)$") + + def __init__(self, version_str: str): + match = self.pattern.match(version_str) + if not match: + raise ValueError(f"Invalid version format: {version_str}. Cluster versions must be in format ES_x.y or " + f"OS_x.y, where y is a number or 'x' for any minor version.") + + self.cluster_type = match.group(1) + self.major_version = int(match.group(2)) + + minor_version = match.group(3) + if minor_version.lower() == 'x': + self.minor_version = 'x' + else: + self.minor_version = int(minor_version) + + def __str__(self): + return f"{self.cluster_type}_{self.major_version}.{self.minor_version}" + + +ElasticsearchV5_X = ClusterVersion("ES_5.x") +ElasticsearchV6_X = ClusterVersion("ES_6.x") +ElasticsearchV7_X = ClusterVersion("ES_7.x") +OpensearchV1_X = ClusterVersion("OS_1.x") +OpensearchV2_X = ClusterVersion("OS_2.x") + + +def is_incoming_version_supported(limiting_version: ClusterVersion, incoming_version: ClusterVersion): + if (limiting_version.cluster_type == incoming_version.cluster_type and + limiting_version.major_version == incoming_version.major_version): + if isinstance(limiting_version.minor_version, str): + return True + else: + return limiting_version.minor_version == incoming_version.minor_version + return False diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/common_operations.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/common_operations.py deleted file mode 100644 index b897c05347..0000000000 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/common_operations.py +++ /dev/null @@ -1,248 +0,0 @@ -import datetime -import random -import string -import json -import time -import logging -from requests.exceptions import ConnectionError, SSLError -from typing import Dict, List -from unittest import TestCase -from console_link.middleware.clusters import call_api -from console_link.models.cluster import HttpMethod, Cluster -from console_link.models.replayer_base import Replayer, ReplayStatus - -logger = logging.getLogger(__name__) - -DEFAULT_INDEX_IGNORE_LIST = ["test_", ".", "searchguard", "sg7", 
"security-auditlog", "reindexed-logs"] - -EXPECTED_BENCHMARK_DOCS = { - "geonames": {"count": 1000}, - "logs-221998": {"count": 1000}, - "logs-211998": {"count": 1000}, - "logs-231998": {"count": 1000}, - "logs-241998": {"count": 1000}, - "logs-181998": {"count": 1000}, - "logs-201998": {"count": 1000}, - "logs-191998": {"count": 1000}, - "sonested": {"count": 1000}, - "nyc_taxis": {"count": 1000} -} - - -class ClusterAPIRequestError(Exception): - pass - - -class ReplayerNotActiveError(Exception): - pass - - -def execute_api_call(cluster: Cluster, path: str, method=HttpMethod.GET, data=None, headers=None, timeout=None, - session=None, expected_status_code: int = 200, max_attempts: int = 10, delay: float = 2.5, - test_case=None): - api_exception = None - last_received_status = None - last_response = None - for _ in range(1, max_attempts + 1): - try: - response = call_api(cluster=cluster, path=path, method=method, data=data, headers=headers, timeout=timeout, - session=session, raise_error=False) - last_response = response - if response.status_code == expected_status_code: - break - else: - # Ensure that our final captured exception is accurate - api_exception = None - last_received_status = response.status_code - logger.debug(f"Status code returned: {response.status_code} did not" - f" match the expected status code: {expected_status_code}." - f" Trying again in {delay} seconds.") - except (ConnectionError, SSLError) as e: - last_response = None - api_exception = e - logger.debug(f"Received exception: {e}. Unable to connect to server. Please check all containers are up" - f" and ports are setup properly. Trying again in {delay} seconds.") - time.sleep(delay) - - if api_exception: - error_message = f"Unable to connect to server. Underlying exception: {api_exception}" - raise ClusterAPIRequestError(error_message) - else: - error_message = (f"Failed to receive desired status code of {expected_status_code} and instead " - f"received {last_received_status} for request: {method.name} {path}") - if test_case is not None: - test_case.assertEqual(expected_status_code, last_response.status_code, error_message) - elif expected_status_code != last_response.status_code: - raise ClusterAPIRequestError(error_message) - return last_response - - -def create_index(index_name: str, cluster: Cluster, **kwargs): - headers = {'Content-Type': 'application/json'} - return execute_api_call(cluster=cluster, method=HttpMethod.PUT, path=f"/{index_name}", - headers=headers, **kwargs) - - -def get_index(index_name: str, cluster: Cluster, **kwargs): - return execute_api_call(cluster=cluster, method=HttpMethod.GET, path=f"/{index_name}", - **kwargs) - - -def delete_index(index_name: str, cluster: Cluster, **kwargs): - return execute_api_call(cluster=cluster, method=HttpMethod.DELETE, path=f"/{index_name}", - **kwargs) - - -def create_document(index_name: str, doc_id: str, cluster: Cluster, data: dict = None, **kwargs): - if data is None: - data = { - 'title': 'Test Document', - 'content': 'This is a sample document for testing OpenSearch.' 
- } - headers = {'Content-Type': 'application/json'} - return execute_api_call(cluster=cluster, method=HttpMethod.PUT, path=f"/{index_name}/_doc/{doc_id}", - data=json.dumps(data), headers=headers, **kwargs) - - -def get_document(index_name: str, doc_id: str, cluster: Cluster, **kwargs): - return execute_api_call(cluster=cluster, method=HttpMethod.GET, path=f"/{index_name}/_doc/{doc_id}", - **kwargs) - - -def delete_document(index_name: str, doc_id: str, cluster: Cluster, **kwargs): - return execute_api_call(cluster=cluster, method=HttpMethod.DELETE, path=f"/{index_name}/_doc/{doc_id}", - **kwargs) - - -def index_matches_ignored_index(index_name: str, index_prefix_ignore_list: List[str]): - for prefix in index_prefix_ignore_list: - if index_name.startswith(prefix): - return True - return False - - -def get_all_index_details(cluster: Cluster, index_prefix_ignore_list=None, **kwargs) -> Dict[str, Dict[str, str]]: - all_index_details = execute_api_call(cluster=cluster, path="/_cat/indices?format=json", **kwargs).json() - index_dict = {} - for index_details in all_index_details: - # While cat/indices returns a doc count metric, the underlying implementation bleeds through details, only - # capture the index name and make a separate api call for the doc count - index_name = index_details['index'] - valid_index = not index_matches_ignored_index(index_name, - index_prefix_ignore_list=index_prefix_ignore_list) - if index_prefix_ignore_list is None or valid_index: - # "To get an accurate count of Elasticsearch documents, use the cat count or count APIs." - # See https://www.elastic.co/guide/en/elasticsearch/reference/7.10/cat-indices.html - - count_response = execute_api_call(cluster=cluster, path=f"/{index_name}/_count?format=json", **kwargs) - index_dict[index_name] = count_response.json() - index_dict[index_name]['index'] = index_name - return index_dict - - -def check_doc_counts_match(cluster: Cluster, - expected_index_details: Dict[str, Dict[str, str]], - test_case: TestCase, - index_prefix_ignore_list=None, - max_attempts: int = 5, - delay: float = 2.5): - if index_prefix_ignore_list is None: - index_prefix_ignore_list = DEFAULT_INDEX_IGNORE_LIST - - error_message = "" - for attempt in range(1, max_attempts + 1): - # Refresh documents - execute_api_call(cluster=cluster, path="/_refresh") - actual_index_details = get_all_index_details(cluster=cluster, index_prefix_ignore_list=index_prefix_ignore_list) - logger.debug(f"Received actual indices: {actual_index_details}") - if actual_index_details.keys() != expected_index_details.keys(): - error_message = (f"Indices are different: \n Expected: {expected_index_details.keys()} \n " - f"Actual: {actual_index_details.keys()}") - logger.debug(f"Error on attempt {attempt}: {error_message}") - else: - for index_details in actual_index_details.values(): - index_name = index_details['index'] - actual_doc_count = index_details['count'] - expected_doc_count = expected_index_details[index_name]['count'] - if actual_doc_count != expected_doc_count: - error_message = (f"Index {index_name} has {actual_doc_count} documents but {expected_doc_count} " - f"were expected") - logger.debug(f"Error on attempt {attempt}: {error_message}") - break - if not error_message: - return True - if attempt != max_attempts: - error_message = "" - time.sleep(delay) - test_case.fail(error_message) - - -def check_doc_match(test_case: TestCase, index_name: str, doc_id: str, source_cluster: Cluster, - target_cluster: Cluster): - source_response = get_document(index_name=index_name, 
doc_id=doc_id, cluster=source_cluster) - target_response = get_document(index_name=index_name, doc_id=doc_id, cluster=target_cluster) - - source_document = source_response.json() - source_content = source_document['_source'] - target_document = target_response.json() - target_content = target_document['_source'] - test_case.assertEqual(source_content, target_content) - - -def generate_large_doc(size_mib): - # Calculate number of characters needed (1 char = 1 byte) - num_chars = size_mib * 1024 * 1024 - - # Generate random string of the desired length - large_string = ''.join(random.choices(string.ascii_letters + string.digits, k=num_chars)) - - return { - "timestamp": datetime.datetime.now().isoformat(), - "large_field": large_string - } - - -def wait_for_running_replayer(replayer: Replayer, - test_case: TestCase = None, - max_attempts: int = 25, - delay: float = 3.0): - error_message = "" - for attempt in range(1, max_attempts + 1): - cmd_result = replayer.get_status() - status = cmd_result.value[0] - logger.debug(f"Received status {status} for Replayer on attempt {attempt}") - if status == ReplayStatus.RUNNING: - return - error_message = (f"Received replayer status of {status} but expecting to receive: {ReplayStatus.RUNNING} " - f"after {max_attempts} attempts") - if attempt != max_attempts: - error_message = "" - time.sleep(delay) - if test_case: - test_case.fail(error_message) - else: - raise ReplayerNotActiveError(error_message) - - -def convert_transformations_to_str(transform_list: List[Dict]) -> str: - return json.dumps(transform_list) - - -def get_index_name_transformation(existing_index_name: str, target_index_name: str, - source_major_version: int, source_minor_version: int) -> Dict: - return { - "TypeMappingSanitizationTransformerProvider": { - "staticMappings": { - f"{existing_index_name}": { - "_doc": f"{target_index_name}" - } - }, - "sourceProperties": { - "version": { - "major": source_major_version, - "minor": source_minor_version - } - } - - } - } diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/common_utils.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/common_utils.py new file mode 100644 index 0000000000..0d59015bef --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/common_utils.py @@ -0,0 +1,94 @@ +import time +import logging +from requests.exceptions import ConnectionError, SSLError +from unittest import TestCase +from console_link.middleware.clusters import call_api +from console_link.models.cluster import HttpMethod, Cluster +from console_link.models.replayer_base import Replayer, ReplayStatus + +logger = logging.getLogger(__name__) + +DEFAULT_INDEX_IGNORE_LIST = ["test_", ".", "searchguard", "sg7", "security-auditlog", "reindexed-logs"] + +EXPECTED_BENCHMARK_DOCS = { + "geonames": {"count": 1000}, + "logs-221998": {"count": 1000}, + "logs-211998": {"count": 1000}, + "logs-231998": {"count": 1000}, + "logs-241998": {"count": 1000}, + "logs-181998": {"count": 1000}, + "logs-201998": {"count": 1000}, + "logs-191998": {"count": 1000}, + "sonested": {"count": 1000}, + "nyc_taxis": {"count": 1000} +} + + +class ClusterAPIRequestError(Exception): + pass + + +class ReplayerNotActiveError(Exception): + pass + + +def execute_api_call(cluster: Cluster, path: str, method=HttpMethod.GET, data=None, headers=None, timeout=None, + session=None, expected_status_code: int = 200, max_attempts: int = 10, delay: float = 2.5, + 
test_case=None): + api_exception = None + last_received_status = None + last_response = None + for _ in range(1, max_attempts + 1): + try: + response = call_api(cluster=cluster, path=path, method=method, data=data, headers=headers, timeout=timeout, + session=session, raise_error=False) + last_response = response + if response.status_code == expected_status_code: + break + else: + # Ensure that our final captured exception is accurate + api_exception = None + last_received_status = response.status_code + logger.debug(f"Status code returned: {response.status_code} did not" + f" match the expected status code: {expected_status_code}." + f" Trying again in {delay} seconds.") + except (ConnectionError, SSLError) as e: + last_response = None + api_exception = e + logger.debug(f"Received exception: {e}. Unable to connect to server. Please check all containers are up" + f" and ports are setup properly. Trying again in {delay} seconds.") + time.sleep(delay) + + if api_exception: + error_message = f"Unable to connect to server. Underlying exception: {api_exception}" + raise ClusterAPIRequestError(error_message) + else: + error_message = (f"Failed to receive desired status code of {expected_status_code} and instead " + f"received {last_received_status} for request: {method.name} {path}") + if test_case is not None: + test_case.assertEqual(expected_status_code, last_response.status_code, error_message) + elif expected_status_code != last_response.status_code: + raise ClusterAPIRequestError(error_message) + return last_response + + +def wait_for_running_replayer(replayer: Replayer, + test_case: TestCase = None, + max_attempts: int = 25, + delay: float = 3.0): + error_message = "" + for attempt in range(1, max_attempts + 1): + cmd_result = replayer.get_status() + status = cmd_result.value[0] + logger.debug(f"Received status {status} for Replayer on attempt {attempt}") + if status == ReplayStatus.RUNNING: + return + error_message = (f"Received replayer status of {status} but expecting to receive: {ReplayStatus.RUNNING} " + f"after {max_attempts} attempts") + if attempt != max_attempts: + error_message = "" + time.sleep(delay) + if test_case: + test_case.fail(error_message) + else: + raise ReplayerNotActiveError(error_message) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/conftest.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/conftest.py index 2dd429de62..7433c5648f 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/conftest.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/conftest.py @@ -2,18 +2,32 @@ import pytest import uuid import logging +from typing import List +from .test_cases.ma_test_base import ClusterVersionCombinationUnsupported +from .test_cases.basic_tests import * +from .test_cases.multi_type_tests import * +from console_link.cli import Context +from console_link.environment import Environment -def pytest_configure(config): - # Configure logging, avoid sensitive data at lower logs levels - logging.basicConfig(level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s', - datefmt='%Y-%m-%d %H:%M:%S') +logger = logging.getLogger(__name__) + +# Dynamically collect all test case classes that have been imported above +ALL_TEST_CASES = [cls for cls in globals().values() if isinstance(cls, type) and + cls.__module__.__contains__("integ_test.test_cases") and + 
cls.__name__.startswith("Test")] + + +def _split_test_ids(option_str: str): + # Split the string by comma and strip whitespace + return [tid.strip() for tid in option_str.split(",")] def pytest_addoption(parser): parser.addoption("--unique_id", action="store", default=uuid.uuid4().hex) parser.addoption("--stage", action="store", default="dev") + parser.addoption("--test_ids", action="store", default=[], type=_split_test_ids, + help="Specify test IDs like '0001,0003' to filter tests to execute") parser.addoption("--config_file_path", action="store", default="/config/migration_services.yaml", help="Path to config file for console library") parser.addoption("--source_proxy_alb_endpoint", action="store", default=None, @@ -22,6 +36,52 @@ def pytest_addoption(parser): help="Specify the Migration ALB endpoint for the target proxy") +def pytest_generate_tests(metafunc): + if metafunc.function.__name__ == "test_migration_assistant_workflow": + console_config_path = metafunc.config.getoption("config_file_path") + console_link_env: Environment = Context(console_config_path).env + unique_id = metafunc.config.getoption("unique_id") + test_ids_list = metafunc.config.getoption("test_ids") + test_cases_param = _generate_test_cases(console_config_path, console_link_env, unique_id, test_ids_list) + metafunc.parametrize("test_cases", test_cases_param) + + +def _filter_test_cases(test_ids_list: List[str]) -> List: + if not test_ids_list: + return ALL_TEST_CASES + filtered_cases = [] + for case in ALL_TEST_CASES: + if test_ids_list and any(tid in str(case) for tid in test_ids_list): + filtered_cases.append(case) + return filtered_cases + + +def _generate_test_cases(console_config_path: str, console_link_env: Environment, unique_id: str, + test_ids_list: List[str]): + aggregated_test_cases_to_run = [] + isolated_test_cases_to_run = [] + unsupported_test_cases = [] + cases = _filter_test_cases(test_ids_list) + for test_case in cases: + try: + valid_case = test_case(console_config_path=console_config_path, console_link_env=console_link_env, + unique_id=unique_id) + if valid_case.run_isolated: + isolated_test_cases_to_run.append([valid_case]) + else: + aggregated_test_cases_to_run.append(valid_case) + except ClusterVersionCombinationUnsupported: + unsupported_test_cases.append(test_case) + logger.info(f"Aggregated test cases to run ({len(aggregated_test_cases_to_run)}) - {aggregated_test_cases_to_run}") + logger.info(f"Isolated test cases to run ({len(isolated_test_cases_to_run)}) - {isolated_test_cases_to_run}") + if aggregated_test_cases_to_run: + isolated_test_cases_to_run.append(aggregated_test_cases_to_run) + if unsupported_test_cases: + logger.info(f"The following tests are incompatible with the cluster version specified and will be " + f"skipped: {unsupported_test_cases}") + return isolated_test_cases_to_run + + @pytest.fixture def unique_id(pytestconfig): return pytestconfig.getoption("unique_id") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/default_operations.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/default_operations.py new file mode 100644 index 0000000000..be6f0e86f0 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/default_operations.py @@ -0,0 +1,195 @@ +import datetime +import logging +import os +import random +import string +import json +import time +from typing import Dict, List +from unittest import TestCase +from 
console_link.models.cluster import HttpMethod, Cluster +from .common_utils import execute_api_call, DEFAULT_INDEX_IGNORE_LIST + +logger = logging.getLogger(__name__) + + +class DefaultOperationsLibrary: + """ + Provides a library of high-level common operations to perform on an elasticsearch or opensearch cluster as well as + operations for interacting with a Migration Assistant deployment. + **Note**: This library was implemented as a default to work with OpenSearch 2.x clusters and should be extended to + work with older clusters, such as Elasticsearch 5.x clusters, where pattern differences such as multi-type indices + exist. + """ + + def create_index(self, index_name: str, cluster: Cluster, **kwargs): + headers = {'Content-Type': 'application/json'} + return execute_api_call(cluster=cluster, method=HttpMethod.PUT, path=f"/{index_name}", + headers=headers, **kwargs) + + def get_index(self, index_name: str, cluster: Cluster, **kwargs): + return execute_api_call(cluster=cluster, method=HttpMethod.GET, path=f"/{index_name}", + **kwargs) + + def delete_index(self, index_name: str, cluster: Cluster, **kwargs): + return execute_api_call(cluster=cluster, method=HttpMethod.DELETE, path=f"/{index_name}", + **kwargs) + + def create_document(self, index_name: str, doc_id: str, cluster: Cluster, data: dict = None, doc_type="_doc", + **kwargs): + if data is None: + data = { + 'title': 'Test Document', + 'content': 'This is a sample document for testing OpenSearch.' + } + headers = {'Content-Type': 'application/json'} + return execute_api_call(cluster=cluster, method=HttpMethod.PUT, path=f"/{index_name}/{doc_type}/{doc_id}", + data=json.dumps(data), headers=headers, **kwargs) + + def create_and_retrieve_document(self, index_name: str, doc_id: str, cluster: Cluster, data: dict = None, + doc_type="_doc", **kwargs): + self.create_document(index_name=index_name, doc_id=doc_id, cluster=cluster, data=data, doc_type=doc_type, + **kwargs) + headers = {'Content-Type': 'application/json'} + self.get_document(index_name=index_name, doc_id=doc_id, cluster=cluster, data=data, doc_type=doc_type, + headers=headers, **kwargs) + + def get_document(self, index_name: str, doc_id: str, cluster: Cluster, doc_type="_doc", **kwargs): + return execute_api_call(cluster=cluster, method=HttpMethod.GET, path=f"/{index_name}/{doc_type}/{doc_id}", + **kwargs) + + def delete_document(self, index_name: str, doc_id: str, cluster: Cluster, doc_type="_doc", **kwargs): + return execute_api_call(cluster=cluster, method=HttpMethod.DELETE, path=f"/{index_name}/{doc_type}/{doc_id}", + **kwargs) + + def clear_index_templates(self, cluster: Cluster, **kwargs): + logger.warning(f"Clearing index templates has not been implemented for cluster version: {cluster.version}") + return + + def get_all_composable_index_template_names(self, cluster: Cluster, **kwargs): + response = execute_api_call(cluster=cluster, method=HttpMethod.GET, path="/_index_template", **kwargs) + data = response.json() + templates = data.get("index_templates", []) + return [tpl["name"] for tpl in templates] + + def verify_index_mapping_properties(self, index_name: str, cluster: Cluster, expected_props: set, **kwargs): + response = execute_api_call(cluster=cluster, method=HttpMethod.GET, path=f"/{index_name}", **kwargs) + data = response.json() + mappings = data[index_name]["mappings"]["properties"] + if not all(prop in mappings for prop in expected_props): + raise AssertionError(f"Expected properties: {expected_props} not found in index " + f"mappings 
{list(mappings.keys())}") + + def index_matches_ignored_index(self, index_name: str, index_prefix_ignore_list: List[str]): + for prefix in index_prefix_ignore_list: + if index_name.startswith(prefix): + return True + return False + + def get_all_index_details(self, cluster: Cluster, index_prefix_ignore_list=None, + **kwargs) -> Dict[str, Dict[str, str]]: + all_index_details = execute_api_call(cluster=cluster, path="/_cat/indices?format=json", **kwargs).json() + index_dict = {} + for index_details in all_index_details: + # While cat/indices returns a doc count metric, the underlying implementation bleeds through details, only + # capture the index name and make a separate api call for the doc count + index_name = index_details['index'] + valid_index = not self.index_matches_ignored_index(index_name, + index_prefix_ignore_list=index_prefix_ignore_list) + if index_prefix_ignore_list is None or valid_index: + # "To get an accurate count of Elasticsearch documents, use the cat count or count APIs." + # See https://www.elastic.co/guide/en/elasticsearch/reference/7.10/cat-indices.html + + count_response = execute_api_call(cluster=cluster, path=f"/{index_name}/_count?format=json", **kwargs) + index_dict[index_name] = count_response.json() + index_dict[index_name]['index'] = index_name + return index_dict + + def check_doc_counts_match(self, cluster: Cluster, + expected_index_details: Dict[str, Dict[str, str]], + test_case: TestCase, + index_prefix_ignore_list=None, + max_attempts: int = 5, + delay: float = 2.5): + if index_prefix_ignore_list is None: + index_prefix_ignore_list = DEFAULT_INDEX_IGNORE_LIST + + error_message = "" + for attempt in range(1, max_attempts + 1): + # Refresh documents + execute_api_call(cluster=cluster, path="/_refresh") + actual_index_details = self.get_all_index_details(cluster=cluster, + index_prefix_ignore_list=index_prefix_ignore_list) + logger.debug(f"Received actual indices: {actual_index_details}") + if actual_index_details.keys() != expected_index_details.keys(): + error_message = (f"Indices are different: \n Expected: {expected_index_details.keys()} \n " + f"Actual: {actual_index_details.keys()}") + logger.debug(f"Error on attempt {attempt}: {error_message}") + else: + for index_details in actual_index_details.values(): + index_name = index_details['index'] + actual_doc_count = index_details['count'] + expected_doc_count = expected_index_details[index_name]['count'] + if actual_doc_count != expected_doc_count: + error_message = (f"Index {index_name} has {actual_doc_count} documents " + f"but {expected_doc_count} were expected") + logger.debug(f"Error on attempt {attempt}: {error_message}") + break + if not error_message: + return True + if attempt != max_attempts: + error_message = "" + time.sleep(delay) + test_case.fail(error_message) + + def check_doc_match(self, test_case: TestCase, index_name: str, doc_id: str, source_cluster: Cluster, + target_cluster: Cluster): + source_response = self.get_document(index_name=index_name, doc_id=doc_id, cluster=source_cluster) + target_response = self.get_document(index_name=index_name, doc_id=doc_id, cluster=target_cluster) + + source_document = source_response.json() + source_content = source_document['_source'] + target_document = target_response.json() + target_content = target_document['_source'] + test_case.assertEqual(source_content, target_content) + + def generate_large_doc(self, size_mib): + # Calculate number of characters needed (1 char = 1 byte) + num_chars = size_mib * 1024 * 1024 + + # Generate random string 
of the desired length + large_string = ''.join(random.choices(string.ascii_letters + string.digits, k=num_chars)) + + return { + "timestamp": datetime.datetime.now().isoformat(), + "large_field": large_string + } + + def create_transformation_json_file(self, transform_config_data, file_path_to_create: str): + directory = os.path.dirname(file_path_to_create) + if directory: + os.makedirs(directory, exist_ok=True) + with open(file_path_to_create, "w") as file: + json.dump(transform_config_data, file, indent=4) + + def convert_transformations_to_str(self, transform_list: List[Dict]) -> str: + return json.dumps(transform_list) + + def get_index_name_transformation(self, existing_index_name: str, target_index_name: str, + source_major_version: int, source_minor_version: int) -> Dict: + return { + "TypeMappingSanitizationTransformerProvider": { + "staticMappings": { + f"{existing_index_name}": { + "_doc": f"{target_index_name}" + } + }, + "sourceProperties": { + "version": { + "major": source_major_version, + "minor": source_minor_version + } + } + + } + } diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/elasticsearch_operations.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/elasticsearch_operations.py new file mode 100644 index 0000000000..b9b6f2f1a6 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/elasticsearch_operations.py @@ -0,0 +1,73 @@ +from typing import Dict + +from .cluster_version import ClusterVersion +from .default_operations import DefaultOperationsLibrary + + +def get_type_mapping_split_transformation(multi_type_index_name: str, doc_type_1: str, doc_type_2: str, + split_index_name_1: str, split_index_name_2: str, + cluster_version: ClusterVersion) -> Dict: + return { + "TypeMappingSanitizationTransformerProvider": { + "staticMappings": { + multi_type_index_name: { + doc_type_1: split_index_name_1, + doc_type_2: split_index_name_2 + } + }, + "sourceProperties": { + "version": { + "major": cluster_version.major_version, + "minor": cluster_version.minor_version + } + } + } + } + + +def get_type_mapping_union_transformation(multi_type_index_name: str, doc_type_1: str, doc_type_2: str, + cluster_version: ClusterVersion) -> Dict: + return { + "TypeMappingSanitizationTransformerProvider": { + "staticMappings": { + multi_type_index_name: { + doc_type_1: multi_type_index_name, + doc_type_2: multi_type_index_name + } + }, + "sourceProperties": { + "version": { + "major": cluster_version.major_version, + "minor": cluster_version.minor_version + } + } + } + } + + +class ElasticsearchV5_XOperationsLibrary(DefaultOperationsLibrary): + + def get_type_mapping_union_transformation(self, multi_type_index_name: str, doc_type_1: str, doc_type_2: str, + cluster_version: ClusterVersion): + return get_type_mapping_union_transformation(multi_type_index_name=multi_type_index_name, + doc_type_1=doc_type_1, + doc_type_2=doc_type_2, + cluster_version=cluster_version) + + def get_type_mapping_split_transformation(self, multi_type_index_name: str, doc_type_1: str, doc_type_2: str, + split_index_name_1: str, split_index_name_2: str, + cluster_version: ClusterVersion): + return get_type_mapping_split_transformation(multi_type_index_name=multi_type_index_name, + doc_type_1=doc_type_1, + doc_type_2=doc_type_2, + split_index_name_1=split_index_name_1, + split_index_name_2=split_index_name_2, + cluster_version=cluster_version) + + +class 
ElasticsearchV6_XOperationsLibrary(DefaultOperationsLibrary): + pass + + +class ElasticsearchV7_XOperationsLibrary(DefaultOperationsLibrary): + pass diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py index 97e97d360c..7d2392d0e8 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py @@ -15,9 +15,11 @@ from console_link.middleware.kafka import delete_topic from console_link.models.metadata import Metadata from console_link.cli import Context -from common_operations import (create_index, create_document, check_doc_counts_match, wait_for_running_replayer, - get_index_name_transformation, convert_transformations_to_str) +from .common_utils import wait_for_running_replayer +from .default_operations import DefaultOperationsLibrary + logger = logging.getLogger(__name__) +ops = DefaultOperationsLibrary() @pytest.fixture(scope="class") @@ -105,9 +107,9 @@ def test_e2e_0001_default(self): } } } - create_index(cluster=source_cluster, index_name=index_name, data=json.dumps(index_body), test_case=self) - create_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id_base + "_1", - expected_status_code=HTTPStatus.CREATED, test_case=self) + ops.create_index(cluster=source_cluster, index_name=index_name, data=json.dumps(index_body), test_case=self) + ops.create_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id_base + "_1", + expected_status_code=HTTPStatus.CREATED, test_case=self) backfill.create() snapshot: Snapshot = pytest.console_env.snapshot @@ -115,11 +117,11 @@ def test_e2e_0001_default(self): assert snapshot_result.success # Perform metadata migration with a transform to index name - index_name_transform = get_index_name_transformation(existing_index_name=index_name, - target_index_name=transformed_index_name, - source_major_version=source_major_version, - source_minor_version=source_minor_version) - transform_arg = convert_transformations_to_str(transform_list=[index_name_transform]) + index_name_transform = ops.get_index_name_transformation(existing_index_name=index_name, + target_index_name=transformed_index_name, + source_major_version=source_major_version, + source_minor_version=source_minor_version) + transform_arg = ops.convert_transformations_to_str(transform_list=[index_name_transform]) metadata_result: CommandResult = metadata.migrate(extra_args=["--transformer-config", transform_arg]) assert metadata_result.success @@ -129,25 +131,25 @@ def test_e2e_0001_default(self): backfill_scale_result: CommandResult = backfill.scale(units=2) assert backfill_scale_result.success # This document was created after snapshot and should not be included in Backfill but expected in Replay - create_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id_base + "_2", - expected_status_code=HTTPStatus.CREATED, test_case=self) + ops.create_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id_base + "_2", + expected_status_code=HTTPStatus.CREATED, test_case=self) ignore_list = [".", "searchguard", "sg7", "security-auditlog", "reindexed-logs"] expected_source_docs = {} expected_target_docs = {} # Source should have both documents expected_source_docs[index_name] = {"count": 2} - 
check_doc_counts_match(cluster=source_cluster, expected_index_details=expected_source_docs, - index_prefix_ignore_list=ignore_list, test_case=self) + ops.check_doc_counts_match(cluster=source_cluster, expected_index_details=expected_source_docs, + index_prefix_ignore_list=ignore_list, test_case=self) # Target should have one document from snapshot expected_target_docs[transformed_index_name] = {"count": 1} - check_doc_counts_match(cluster=target_cluster, expected_index_details=expected_target_docs, - index_prefix_ignore_list=ignore_list, max_attempts=20, delay=30.0, test_case=self) + ops.check_doc_counts_match(cluster=target_cluster, expected_index_details=expected_target_docs, + index_prefix_ignore_list=ignore_list, max_attempts=20, delay=30.0, test_case=self) backfill.stop() - create_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id_base + "_3", - expected_status_code=HTTPStatus.CREATED, test_case=self) + ops.create_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id_base + "_3", + expected_status_code=HTTPStatus.CREATED, test_case=self) replayer.start() wait_for_running_replayer(replayer=replayer) @@ -155,7 +157,7 @@ def test_e2e_0001_default(self): expected_source_docs[index_name] = {"count": 3} # TODO Replayer transformation needed to only have docs in the transformed index expected_target_docs[index_name] = {"count": 3} - check_doc_counts_match(cluster=source_cluster, expected_index_details=expected_source_docs, - index_prefix_ignore_list=ignore_list, test_case=self) - check_doc_counts_match(cluster=target_cluster, expected_index_details=expected_target_docs, - index_prefix_ignore_list=ignore_list, max_attempts=30, delay=10.0, test_case=self) + ops.check_doc_counts_match(cluster=source_cluster, expected_index_details=expected_source_docs, + index_prefix_ignore_list=ignore_list, test_case=self) + ops.check_doc_counts_match(cluster=target_cluster, expected_index_details=expected_target_docs, + index_prefix_ignore_list=ignore_list, max_attempts=30, delay=10.0, test_case=self) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/ma_workflow_test.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/ma_workflow_test.py new file mode 100644 index 0000000000..417f68bb7f --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/ma_workflow_test.py @@ -0,0 +1,92 @@ +import logging +import pytest +import shutil +from typing import Callable, List + +from .test_cases.ma_test_base import MATestBase +from console_link.middleware.clusters import connection_check, clear_cluster, clear_indices, ConnectionResult +from console_link.models.backfill_base import Backfill +from console_link.models.replayer_base import Replayer +from console_link.models.kafka import Kafka +from console_link.middleware.kafka import delete_topic + +logger = logging.getLogger(__name__) + + +@pytest.fixture(autouse=True) +def setup_and_teardown(request, test_cases: List[MATestBase]): + #-----Setup----- + logger.info("Performing setup...") + test_case = test_cases[0] + source_cluster = test_case.source_cluster + target_cluster = test_case.target_cluster + source_con_result: ConnectionResult = connection_check(source_cluster) + assert source_con_result.connection_established is True + target_con_result: ConnectionResult = connection_check(target_cluster) + assert target_con_result.connection_established is True + kafka: Kafka = 
test_case.console_link_env.kafka + + # Cleanup generated transformation files + try: + shutil.rmtree("/shared-logs-output/test-transformations") + logger.info("Removed existing /shared-logs-output/test-transformations directory") + except FileNotFoundError: + logger.info("No transformation files detected to cleanup") + + # Clear cluster data + clear_cluster(source_cluster) + clear_indices(target_cluster) + test_case.target_operations.clear_index_templates(cluster=target_cluster) + + # Delete existing Kafka topic to clear records + delete_topic(kafka=kafka, topic_name="logging-traffic-topic") + + #-----Execute test----- + yield + + #-----Teardown----- + logger.info("\nTearing down resources...") + try: + backfill: Backfill = test_case.console_link_env.backfill + backfill.stop() + except Exception as e: + logger.error(f"Error encountered when stopping backfill, resources may not have been cleaned up: {e}") + try: + replayer: Replayer = test_case.console_link_env.replay + replayer.stop() + except Exception as e: + logger.error(f"Error encountered when stopping replayer, resources may not have been cleaned up: {e}") + + +def run_on_all_cases(test_cases: List, operation: Callable) -> None: + for tc in test_cases: + operation(tc) + + +# The test_cases parameter here is dynamically provided by the pytest_generate_tests() function in conftest.py. This +# function will add a parametrize tag on this test to provide the 'test_cases' it has collected +def test_migration_assistant_workflow(test_cases: List[MATestBase]): + logger.info(f"Performing the following test cases: {test_cases}") + control_test_case = test_cases[0] + + # Enable for stepping through workflows with Python debugger + #breakpoint() + + run_on_all_cases(test_cases=test_cases, operation=lambda case: case.test_before()) + run_on_all_cases(test_cases=test_cases, operation=lambda case: case.snapshot_before()) + control_test_case.snapshot_create() + run_on_all_cases(test_cases=test_cases, operation=lambda case: case.snapshot_after()) + run_on_all_cases(test_cases=test_cases, operation=lambda case: case.metadata_before()) + control_test_case.metadata_migrate() + run_on_all_cases(test_cases=test_cases, operation=lambda case: case.metadata_after()) + run_on_all_cases(test_cases=test_cases, operation=lambda case: case.backfill_before()) + control_test_case.backfill_start() + run_on_all_cases(test_cases=test_cases, operation=lambda case: case.backfill_during()) + control_test_case.backfill_wait_for_stop() + run_on_all_cases(test_cases=test_cases, operation=lambda case: case.backfill_after()) + run_on_all_cases(test_cases=test_cases, operation=lambda case: case.replay_before()) + control_test_case.replay_start() + run_on_all_cases(test_cases=test_cases, operation=lambda case: case.replay_during()) + control_test_case.replay_wait_for_stop() + run_on_all_cases(test_cases=test_cases, operation=lambda case: case.replay_after()) + run_on_all_cases(test_cases=test_cases, operation=lambda case: case.test_after()) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/opensearch_operations.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/opensearch_operations.py new file mode 100644 index 0000000000..acca827709 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/opensearch_operations.py @@ -0,0 +1,19 @@ +from console_link.models.cluster import Cluster, HttpMethod + +from .common_utils import 
execute_api_call +from .default_operations import DefaultOperationsLibrary + + +class OpensearchV1_XOperationsLibrary(DefaultOperationsLibrary): + pass + + +class OpensearchV2_XOperationsLibrary(DefaultOperationsLibrary): + + def clear_index_templates(self, cluster: Cluster, **kwargs): + # Remove legacy templates + execute_api_call(cluster=cluster, method=HttpMethod.DELETE, path="/_template/*", **kwargs) + # Remove composable index templates + composable_template_names = self.get_all_composable_index_template_names(cluster=cluster, **kwargs) + for name in composable_template_names: + execute_api_call(cluster=cluster, method=HttpMethod.DELETE, path=f"/_index_template/{name}", **kwargs) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/operations_library_factory.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/operations_library_factory.py new file mode 100644 index 0000000000..e58e545494 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/operations_library_factory.py @@ -0,0 +1,20 @@ +from .cluster_version import (ClusterVersion, ElasticsearchV5_X, ElasticsearchV6_X, ElasticsearchV7_X, OpensearchV1_X, + OpensearchV2_X, is_incoming_version_supported) +from .elasticsearch_operations import (ElasticsearchV5_XOperationsLibrary, ElasticsearchV6_XOperationsLibrary, + ElasticsearchV7_XOperationsLibrary) +from .opensearch_operations import OpensearchV1_XOperationsLibrary, OpensearchV2_XOperationsLibrary + + +def get_operations_library_by_version(version: ClusterVersion): + if is_incoming_version_supported(limiting_version=ElasticsearchV5_X, incoming_version=version): + return ElasticsearchV5_XOperationsLibrary() + elif is_incoming_version_supported(limiting_version=ElasticsearchV6_X, incoming_version=version): + return ElasticsearchV6_XOperationsLibrary() + elif is_incoming_version_supported(limiting_version=ElasticsearchV7_X, incoming_version=version): + return ElasticsearchV7_XOperationsLibrary() + elif is_incoming_version_supported(limiting_version=OpensearchV1_X, incoming_version=version): + return OpensearchV1_XOperationsLibrary() + elif is_incoming_version_supported(limiting_version=OpensearchV2_X, incoming_version=version): + return OpensearchV2_XOperationsLibrary() + else: + raise Exception(f"Unsupported cluster version: {version}") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/replayer_tests.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/replayer_tests.py index d5830cba98..5afa32c111 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/replayer_tests.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/replayer_tests.py @@ -15,12 +15,12 @@ from console_link.models.cluster import Cluster, AuthMethod from console_link.cli import Context -from common_operations import (get_index, create_index, delete_index, get_document, create_document, delete_document, - check_doc_match, check_doc_counts_match, generate_large_doc, execute_api_call, - wait_for_running_replayer, EXPECTED_BENCHMARK_DOCS) -from metric_operations import assert_metrics_present +from .common_utils import execute_api_call, wait_for_running_replayer, EXPECTED_BENCHMARK_DOCS +from .default_operations import DefaultOperationsLibrary +from .metric_operations import assert_metrics_present 
logger = logging.getLogger(__name__) +ops = DefaultOperationsLibrary() @pytest.fixture(scope="class") @@ -85,14 +85,14 @@ def test_replayer_0001_empty_index(self): target_cluster: Cluster = pytest.console_env.target_cluster index_name = f"test_replayer_0001_{pytest.unique_id}" - create_index(cluster=source_cluster, index_name=index_name, test_case=self) - get_index(cluster=source_cluster, index_name=index_name, test_case=self) - get_index(cluster=target_cluster, index_name=index_name, test_case=self) - delete_index(cluster=source_cluster, index_name=index_name, test_case=self) - get_index(cluster=source_cluster, index_name=index_name, expected_status_code=HTTPStatus.NOT_FOUND, - test_case=self) - get_index(cluster=target_cluster, index_name=index_name, expected_status_code=HTTPStatus.NOT_FOUND, - test_case=self) + ops.create_index(cluster=source_cluster, index_name=index_name, test_case=self) + ops.get_index(cluster=source_cluster, index_name=index_name, test_case=self) + ops.get_index(cluster=target_cluster, index_name=index_name, test_case=self) + ops.delete_index(cluster=source_cluster, index_name=index_name, test_case=self) + ops.get_index(cluster=source_cluster, index_name=index_name, expected_status_code=HTTPStatus.NOT_FOUND, + test_case=self) + ops.get_index(cluster=target_cluster, index_name=index_name, expected_status_code=HTTPStatus.NOT_FOUND, + test_case=self) @unittest.skip("Flaky test - https://opensearch.atlassian.net/browse/MIGRATIONS-1925") def test_replayer_0002_single_document(self): @@ -106,23 +106,23 @@ def test_replayer_0002_single_document(self): index_name = f"test_replayer_0002_{pytest.unique_id}" doc_id = "replayer_0002_doc" - create_index(cluster=source_cluster, index_name=index_name, test_case=self) - get_index(cluster=source_cluster, index_name=index_name, test_case=self) - get_index(cluster=target_cluster, index_name=index_name, test_case=self) - create_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id, - expected_status_code=HTTPStatus.CREATED, test_case=self) - check_doc_match(source_cluster=source_cluster, target_cluster=target_cluster, - index_name=index_name, doc_id=doc_id, test_case=self) - delete_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id, test_case=self) - get_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id, - expected_status_code=HTTPStatus.NOT_FOUND, test_case=self) - get_document(cluster=target_cluster, index_name=index_name, doc_id=doc_id, - expected_status_code=HTTPStatus.NOT_FOUND, test_case=self) - delete_index(cluster=source_cluster, index_name=index_name) - get_index(cluster=source_cluster, index_name=index_name, expected_status_code=HTTPStatus.NOT_FOUND, - test_case=self) - get_index(cluster=target_cluster, index_name=index_name, expected_status_code=HTTPStatus.NOT_FOUND, - test_case=self) + ops.create_index(cluster=source_cluster, index_name=index_name, test_case=self) + ops.get_index(cluster=source_cluster, index_name=index_name, test_case=self) + ops.get_index(cluster=target_cluster, index_name=index_name, test_case=self) + ops.create_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id, + expected_status_code=HTTPStatus.CREATED, test_case=self) + ops.check_doc_match(source_cluster=source_cluster, target_cluster=target_cluster, + index_name=index_name, doc_id=doc_id, test_case=self) + ops.delete_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id, test_case=self) + ops.get_document(cluster=source_cluster, index_name=index_name, 
doc_id=doc_id, + expected_status_code=HTTPStatus.NOT_FOUND, test_case=self) + ops.get_document(cluster=target_cluster, index_name=index_name, doc_id=doc_id, + expected_status_code=HTTPStatus.NOT_FOUND, test_case=self) + ops.delete_index(cluster=source_cluster, index_name=index_name) + ops.get_index(cluster=source_cluster, index_name=index_name, expected_status_code=HTTPStatus.NOT_FOUND, + test_case=self) + ops.get_index(cluster=target_cluster, index_name=index_name, expected_status_code=HTTPStatus.NOT_FOUND, + test_case=self) def test_replayer_0003_negativeAuth_invalidCreds(self): # This test sends negative credentials to the clusters to validate that unauthorized access is prevented. @@ -178,11 +178,11 @@ def test_replayer_0006_OSB(self): run_test_benchmarks(cluster=source_cluster) # Confirm documents on source - check_doc_counts_match(cluster=source_cluster, expected_index_details=EXPECTED_BENCHMARK_DOCS, - test_case=self) + ops.check_doc_counts_match(cluster=source_cluster, expected_index_details=EXPECTED_BENCHMARK_DOCS, + test_case=self) # Confirm documents on target after replay - check_doc_counts_match(cluster=target_cluster, expected_index_details=EXPECTED_BENCHMARK_DOCS, - test_case=self) + ops.check_doc_counts_match(cluster=target_cluster, expected_index_details=EXPECTED_BENCHMARK_DOCS, + test_case=self) def test_replayer_0007_timeBetweenRequestsOnSameConnection(self): # This test will verify that the replayer functions correctly when @@ -201,17 +201,17 @@ def test_replayer_0007_timeBetweenRequestsOnSameConnection(self): for doc_id_int in range(number_of_docs): doc_id = str(doc_id_int) - create_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id, - expected_status_code=HTTPStatus.CREATED, session=proxy_single_connection_session, - test_case=self) + ops.create_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id, + expected_status_code=HTTPStatus.CREATED, session=proxy_single_connection_session, + test_case=self) if doc_id_int + 1 < number_of_docs: time.sleep(seconds_between_requests) try: for doc_id_int in range(number_of_docs): doc_id = str(doc_id_int) - check_doc_match(source_cluster=source_cluster, target_cluster=target_cluster, - index_name=index_name, doc_id=doc_id, test_case=self) + ops.check_doc_match(source_cluster=source_cluster, target_cluster=target_cluster, + index_name=index_name, doc_id=doc_id, test_case=self) finally: proxy_single_connection_session.close() @@ -224,13 +224,13 @@ def test_replayer_0008_largeRequest(self): # Create large document, 99MiB # Default max 100MiB in ES/OS settings (http.max_content_length) - large_doc = generate_large_doc(size_mib=99) + large_doc = ops.generate_large_doc(size_mib=99) # Measure the time taken by the create_document call # Send large request to proxy and verify response start_time = time.time() - create_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id, data=large_doc, - expected_status_code=HTTPStatus.CREATED, test_case=self) + ops.create_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id, data=large_doc, + expected_status_code=HTTPStatus.CREATED, test_case=self) end_time = time.time() duration = end_time - start_time @@ -244,5 +244,5 @@ def test_replayer_0008_largeRequest(self): time.sleep(wait_time_seconds) # Verify document created on source and target - check_doc_match(source_cluster=source_cluster, target_cluster=target_cluster, index_name=index_name, - doc_id=doc_id, test_case=self) + ops.check_doc_match(source_cluster=source_cluster, 
target_cluster=target_cluster, index_name=index_name, + doc_id=doc_id, test_case=self) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/test_cases/__init__.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/test_cases/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/test_cases/basic_tests.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/test_cases/basic_tests.py new file mode 100644 index 0000000000..b9c72de110 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/test_cases/basic_tests.py @@ -0,0 +1,64 @@ +import logging +from ..cluster_version import ElasticsearchV6_X, ElasticsearchV7_X, OpensearchV1_X, OpensearchV2_X +from .ma_test_base import MATestBase, MigrationType +from console_link.environment import Environment + +logger = logging.getLogger(__name__) + + +class Test0001SingleDocumentBackfill(MATestBase): + def __init__(self, console_config_path: str, console_link_env: Environment, unique_id: str): + allow_combinations = [ + (ElasticsearchV6_X, OpensearchV1_X), + (ElasticsearchV6_X, OpensearchV2_X), + (ElasticsearchV7_X, OpensearchV1_X), + (ElasticsearchV7_X, OpensearchV2_X), + ] + migrations_required = [MigrationType.BACKFILL] + super().__init__(console_config_path=console_config_path, + console_link_env=console_link_env, + unique_id=unique_id, + migrations_required=migrations_required, + allow_source_target_combinations=allow_combinations) + self.index_name = f"test_0001_{self.unique_id}" + self.doc_id = "test_0001_doc" + + def test_before(self): + # Create single document + self.source_operations.create_document(cluster=self.source_cluster, index_name=self.index_name, + doc_id=self.doc_id) + self.source_operations.get_document(cluster=self.source_cluster, index_name=self.index_name, doc_id=self.doc_id) + + def backfill_during(self): + # Validate single document exists on target + self.target_operations.get_document(cluster=self.target_cluster, index_name=self.index_name, + doc_id=self.doc_id, max_attempts=10, delay=3.0) + + +class Test0002IndexWithNoDocumentsMetadataMigration(MATestBase): + def __init__(self, console_config_path: str, console_link_env: Environment, unique_id: str): + allow_combinations = [ + (ElasticsearchV6_X, OpensearchV1_X), + (ElasticsearchV6_X, OpensearchV2_X), + (ElasticsearchV7_X, OpensearchV1_X), + (ElasticsearchV7_X, OpensearchV2_X), + ] + run_isolated = True + migrations_required = [MigrationType.METADATA] + super().__init__(console_config_path=console_config_path, + console_link_env=console_link_env, + unique_id=unique_id, + migrations_required=migrations_required, + allow_source_target_combinations=allow_combinations, + run_isolated=run_isolated) + self.index_name = f"test_0002_{self.unique_id}" + + def test_before(self): + # Create empty index + self.source_operations.create_index(cluster=self.source_cluster, index_name=self.index_name) + self.source_operations.get_index(cluster=self.source_cluster, index_name=self.index_name) + + def metadata_after(self): + # Validate index exists on target + self.target_operations.get_index(cluster=self.target_cluster, index_name=self.index_name, max_attempts=5, + delay=2.0) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/test_cases/ma_test_base.py 
b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/test_cases/ma_test_base.py new file mode 100644 index 0000000000..b6b60898f9 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/test_cases/ma_test_base.py @@ -0,0 +1,131 @@ +from enum import Enum + +from ..common_utils import wait_for_running_replayer +from ..cluster_version import ClusterVersion, is_incoming_version_supported +from ..operations_library_factory import get_operations_library_by_version + +from console_link.models.backfill_base import Backfill +from console_link.environment import Environment +from console_link.models.replayer_base import Replayer +from console_link.models.command_result import CommandResult +from console_link.models.snapshot import Snapshot +from console_link.models.metadata import Metadata + +MigrationType = Enum("MigrationType", ["METADATA", "BACKFILL", "CAPTURE_AND_REPLAY"]) + + +class ClusterVersionCombinationUnsupported(Exception): + def __init__(self, source_version, target_version, message="Cluster version combination is unsupported"): + self.source_version = source_version + self.target_version = target_version + self.message = f"{message}: Source version '{source_version}' and Target version '{target_version}'" + super().__init__(self.message) + + +class MATestBase: + def __init__(self, console_config_path: str, console_link_env: Environment, unique_id: str, + migrations_required=[MigrationType.METADATA, MigrationType.BACKFILL, MigrationType.CAPTURE_AND_REPLAY], + allow_source_target_combinations=None, run_isolated=False, short_description="MA base test case"): + self.allow_source_target_combinations = allow_source_target_combinations or [] + self.run_isolated = run_isolated + self.short_description = short_description + self.console_link_env = console_link_env + self.migrations_required = migrations_required + if ((not console_link_env.source_cluster or not console_link_env.target_cluster) or + (not console_link_env.source_cluster.version or not console_link_env.target_cluster.version)): + raise RuntimeError("Both a source cluster and target cluster must be defined for the console library and " + "include the version field") + self.source_cluster = console_link_env.source_cluster + self.target_cluster = console_link_env.target_cluster + self.source_version = ClusterVersion(version_str=self.source_cluster.version) + self.target_version = ClusterVersion(version_str=self.target_cluster.version) + + supported_combo = False + for (allowed_source, allowed_target) in allow_source_target_combinations: + if (is_incoming_version_supported(allowed_source, self.source_version) and + is_incoming_version_supported(allowed_target, self.target_version)): + supported_combo = True + break + if not supported_combo: + raise ClusterVersionCombinationUnsupported(self.source_version, self.target_version) + + self.source_operations = get_operations_library_by_version(self.source_version) + self.target_operations = get_operations_library_by_version(self.target_version) + self.console_config_path = console_config_path + self.unique_id = unique_id + self.snapshot: Snapshot = console_link_env.snapshot + self.backfill: Backfill = console_link_env.backfill + self.metadata: Metadata = console_link_env.metadata + self.replayer: Replayer = console_link_env.replay + + def __repr__(self): + return f"<{self.__class__.__name__}(source={self.source_version},target={self.target_version})>" + + def test_before(self): + pass + + def 
snapshot_before(self): + pass + + def snapshot_create(self): + if any(migration in self.migrations_required for migration in (MigrationType.METADATA, MigrationType.BACKFILL)): + snapshot_result: CommandResult = self.snapshot.create(wait=True) + assert snapshot_result.success + + def snapshot_after(self): + pass + + def metadata_before(self): + pass + + def metadata_migrate(self): + if MigrationType.METADATA in self.migrations_required: + metadata_result: CommandResult = self.metadata.migrate() + assert metadata_result.success + + def metadata_after(self): + pass + + def backfill_before(self): + pass + + def backfill_start(self): + if MigrationType.BACKFILL in self.migrations_required: + backfill_start_result: CommandResult = self.backfill.start() + assert backfill_start_result.success + backfill_scale_result: CommandResult = self.backfill.scale(units=1) + assert backfill_scale_result.success + + def backfill_during(self): + pass + + def backfill_wait_for_stop(self): + if MigrationType.BACKFILL in self.migrations_required: + backfill_stop_result: CommandResult = self.backfill.stop() + assert backfill_stop_result.success + + def backfill_after(self): + pass + + def replay_before(self): + pass + + def replay_start(self): + if MigrationType.CAPTURE_AND_REPLAY in self.migrations_required: + replayer_start_result = self.replayer.start() + assert replayer_start_result.success + wait_for_running_replayer(replayer=self.replayer) + + def replay_during(self): + pass + + def replay_wait_for_stop(self): + if MigrationType.CAPTURE_AND_REPLAY in self.migrations_required: + replayer_stop_result = self.replayer.stop() + assert replayer_stop_result.success + + def replay_after(self): + pass + + def test_after(self): + pass diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/test_cases/multi_type_tests.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/test_cases/multi_type_tests.py new file mode 100644 index 0000000000..cebc4e09e4 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/test_cases/multi_type_tests.py @@ -0,0 +1,183 @@ +import logging +from ..cluster_version import ElasticsearchV5_X, OpensearchV1_X, OpensearchV2_X +from .ma_test_base import MATestBase +from console_link.environment import Environment +from console_link.models.command_result import CommandResult + +logger = logging.getLogger(__name__) + + +class Test0004MultiTypeUnionMigration(MATestBase): + def __init__(self, console_config_path: str, console_link_env: Environment, unique_id: str): + allow_combinations = [ + (ElasticsearchV5_X, OpensearchV1_X), + (ElasticsearchV5_X, OpensearchV2_X), + ] + run_isolated = True + super().__init__(console_config_path=console_config_path, + console_link_env=console_link_env, + unique_id=unique_id, + allow_source_target_combinations=allow_combinations, + run_isolated=run_isolated) + self.index_name = f"test_0004_{self.unique_id}" + self.doc_id1 = "test_0004_1" + self.doc_id2 = "test_0004_2" + self.doc_id3 = "test_0004_3" + self.doc_id4 = "test_0004_4" + self.doc_type1 = "sample_type1" + self.doc_type2 = "sample_type2" + self.sample_data1 = { + 'author': 'Alice Quantum', + 'published_date': '2025-03-11T12:00:00Z', + 'tags': ['quantum computing', 'technology', 'innovation', 'research'], + } + self.sample_data2 = { + 'title': 'Exploring Quantum Computing', + 'content': 'Quantum computing is an emerging field that leverages quantum phenomena to perform ' + 
'computations at unprecedented speeds. This document explores the basic principles, ' + 'potential applications, and future challenges of this revolutionary technology.', + 'published_date': '2025-03-11T14:00:00Z' + } + self.transform_config_file = "/shared-logs-output/test-transformations/transformation.json" + + def test_before(self): + union_transform = self.source_operations.get_type_mapping_union_transformation( + multi_type_index_name=self.index_name, + doc_type_1=self.doc_type1, + doc_type_2=self.doc_type2, + cluster_version=self.source_version + ) + self.source_operations.create_transformation_json_file(transform_config_data=[union_transform], + file_path_to_create=self.transform_config_file) + self.source_operations.create_and_retrieve_document(cluster=self.source_cluster, index_name=self.index_name, + doc_id=self.doc_id1, doc_type=self.doc_type1, + data=self.sample_data1) + self.source_operations.create_and_retrieve_document(cluster=self.source_cluster, index_name=self.index_name, + doc_id=self.doc_id2, doc_type=self.doc_type2, + data=self.sample_data2) + + def metadata_migrate(self): + metadata_result: CommandResult = self.metadata.migrate(extra_args=["--transformer-config-file", + self.transform_config_file]) + assert metadata_result.success + + def metadata_after(self): + self.target_operations.get_index(cluster=self.target_cluster, index_name=self.index_name, max_attempts=3, + delay=2.0) + # Get all keys from sample data + expected_keys = set(self.sample_data1.keys()).union(set(self.sample_data2.keys())) + self.target_operations.verify_index_mapping_properties(cluster=self.target_cluster, index_name=self.index_name, + expected_props=expected_keys) + + def backfill_wait_for_stop(self): + self.target_operations.get_document(cluster=self.target_cluster, index_name=self.index_name, + doc_id=self.doc_id1, max_attempts=10, delay=3.0) + self.target_operations.get_document(cluster=self.target_cluster, index_name=self.index_name, + doc_id=self.doc_id2, max_attempts=10, delay=3.0) + backfill_stop_result: CommandResult = self.backfill.stop() + assert backfill_stop_result.success + + def replay_before(self): + self.source_operations.create_and_retrieve_document(cluster=self.source_cluster, index_name=self.index_name, + doc_id=self.doc_id3, doc_type=self.doc_type1, + data=self.sample_data1) + self.source_operations.create_and_retrieve_document(cluster=self.source_cluster, index_name=self.index_name, + doc_id=self.doc_id4, doc_type=self.doc_type2, + data=self.sample_data2) + + def replay_wait_for_stop(self): + self.target_operations.get_document(cluster=self.target_cluster, index_name=self.index_name, + doc_id=self.doc_id3, max_attempts=20, delay=3.0) + self.target_operations.get_document(cluster=self.target_cluster, index_name=self.index_name, + doc_id=self.doc_id4, max_attempts=20, delay=3.0) + replayer_stop_result = self.replayer.stop() + assert replayer_stop_result.success + + +class Test0005MultiTypeSplitMigration(MATestBase): + def __init__(self, console_config_path: str, console_link_env: Environment, unique_id: str): + allow_combinations = [ + (ElasticsearchV5_X, OpensearchV1_X), + (ElasticsearchV5_X, OpensearchV2_X), + ] + run_isolated = True + super().__init__(console_config_path=console_config_path, + console_link_env=console_link_env, + unique_id=unique_id, + allow_source_target_combinations=allow_combinations, + run_isolated=run_isolated) + self.index_name = f"test_0005_{self.unique_id}" + self.split_index_name1 = f"test_0005_split_1_{self.unique_id}" + self.split_index_name2 
= f"test_0005_split_2_{self.unique_id}" + self.doc_id1 = "test_0005_1" + self.doc_id2 = "test_0005_2" + self.doc_id3 = "test_0005_3" + self.doc_id4 = "test_0005_4" + self.doc_type1 = "sample_type1" + self.doc_type2 = "sample_type2" + self.sample_data1 = { + 'author': 'Alice Quantum', + 'published_date': '2025-03-11T12:00:00Z', + 'tags': ['quantum computing', 'technology', 'innovation', 'research'], + } + self.sample_data2 = { + 'title': 'Exploring Quantum Computing', + 'content': 'Quantum computing is an emerging field that leverages quantum phenomena to perform ' + 'computations at unprecedented speeds. This document explores the basic principles, ' + 'potential applications, and future challenges of this revolutionary technology.', + 'published_date': '2025-03-11T14:00:00Z' + } + self.transform_config_file = "/shared-logs-output/test-transformations/transformation.json" + + def test_before(self): + split_transform = self.source_operations.get_type_mapping_split_transformation( + multi_type_index_name=self.index_name, + doc_type_1=self.doc_type1, + doc_type_2=self.doc_type2, + split_index_name_1=self.split_index_name1, + split_index_name_2=self.split_index_name2, + cluster_version=self.source_version + ) + self.source_operations.create_transformation_json_file(transform_config_data=[split_transform], + file_path_to_create=self.transform_config_file) + self.source_operations.create_and_retrieve_document(cluster=self.source_cluster, index_name=self.index_name, + doc_id=self.doc_id1, doc_type=self.doc_type1, + data=self.sample_data1) + self.source_operations.create_and_retrieve_document(cluster=self.source_cluster, index_name=self.index_name, + doc_id=self.doc_id2, doc_type=self.doc_type2, + data=self.sample_data2) + + def metadata_migrate(self): + metadata_result: CommandResult = self.metadata.migrate(extra_args=["--transformer-config-file", + self.transform_config_file]) + assert metadata_result.success + + def metadata_after(self): + self.target_operations.verify_index_mapping_properties(cluster=self.target_cluster, + index_name=self.split_index_name1, + expected_props=set(self.sample_data1.keys())) + self.target_operations.verify_index_mapping_properties(cluster=self.target_cluster, + index_name=self.split_index_name2, + expected_props=set(self.sample_data2.keys())) + + def backfill_wait_for_stop(self): + self.target_operations.get_document(cluster=self.target_cluster, index_name=self.split_index_name1, + doc_id=self.doc_id1, max_attempts=10, delay=3.0) + self.target_operations.get_document(cluster=self.target_cluster, index_name=self.split_index_name2, + doc_id=self.doc_id2, max_attempts=10, delay=3.0) + backfill_stop_result: CommandResult = self.backfill.stop() + assert backfill_stop_result.success + + def replay_before(self): + self.source_operations.create_and_retrieve_document(cluster=self.source_cluster, index_name=self.index_name, + doc_id=self.doc_id3, doc_type=self.doc_type1, + data=self.sample_data1) + self.source_operations.create_and_retrieve_document(cluster=self.source_cluster, index_name=self.index_name, + doc_id=self.doc_id4, doc_type=self.doc_type2, + data=self.sample_data2) + + def replay_wait_for_stop(self): + self.target_operations.get_document(cluster=self.target_cluster, index_name=self.split_index_name1, + doc_id=self.doc_id3, max_attempts=20, delay=3.0) + self.target_operations.get_document(cluster=self.target_cluster, index_name=self.split_index_name2, + doc_id=self.doc_id4, max_attempts=20, delay=3.0) diff --git 
a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/pytest.ini b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/pytest.ini new file mode 100644 index 0000000000..2c946ff134 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/pytest.ini @@ -0,0 +1,5 @@ +[pytest] +log_cli = true +log_cli_level = INFO +log_cli_format = %(asctime)s - %(levelname)s - %(message)s +log_cli_date_format = %Y-%m-%d %H:%M:%S \ No newline at end of file diff --git a/deployment/k8s/charts/aggregates/migrationAssistant/Chart.yaml b/deployment/k8s/charts/aggregates/migrationAssistant/Chart.yaml index e124546361..b16f608667 100644 --- a/deployment/k8s/charts/aggregates/migrationAssistant/Chart.yaml +++ b/deployment/k8s/charts/aggregates/migrationAssistant/Chart.yaml @@ -13,6 +13,10 @@ dependencies: version: "0.1.0" repository: "file://../../sharedResources/snapshotVolume" condition: conditionalPackageInstalls.snapshotVolume + - name: logs-volume + version: "0.1.0" + repository: "file://../../sharedResources/logsVolume" + condition: conditionalPackageInstalls.logsVolume - name: migration-console condition: conditionalPackageInstalls.migrationConsole diff --git a/deployment/k8s/charts/aggregates/migrationAssistant/values.yaml b/deployment/k8s/charts/aggregates/migrationAssistant/values.yaml index e79ad8c444..00ee0c550e 100644 --- a/deployment/k8s/charts/aggregates/migrationAssistant/values.yaml +++ b/deployment/k8s/charts/aggregates/migrationAssistant/values.yaml @@ -9,6 +9,7 @@ conditionalPackageInstalls: replayer: true sharedConfigs: true snapshotVolume: true + logsVolume: true shared-configs: @@ -16,10 +17,11 @@ shared-configs: sourceCluster: allowRuntimeOverride: true object: - endpoint: "http://elasticsearch-master:9200" + endpoint: "http://ma-capture-proxy:9200" + directEndpoint: "http://elasticsearch-master:9200" allowInsecure: true authType: "no_auth" - version: "7.10.2" + version: "ES_7.10" targetCluster: allowRuntimeOverride: true object: @@ -30,7 +32,7 @@ shared-configs: basicAuthPassword: "myStrongPassword123!" 
# Should be in sync with above username/password basicAuthHeader: "Basic YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE=" - version: "2.16.0" + version: "OS_2.16" kafkaBrokers: allowRuntimeOverride: false object: @@ -63,7 +65,7 @@ capture-proxy: destinationUri: source: otherConfig configMapName: "source-cluster" - configMapKey: "endpoint" + configMapKey: "directEndpoint" insecureDestination: source: otherConfig configMapName: "source-cluster" @@ -77,10 +79,14 @@ capture-proxy: migration-console: snapshotVolumeEnabled: true snapshotVolumePvc: "snapshot-volume-pvc" + sharedLogsVolumeEnabled: true + sharedLogsPvc: "shared-logs-pvc" bulk-document-loader: snapshotVolumeEnabled: true snapshotVolumePvc: "snapshot-volume-pvc" + sharedLogsVolumeEnabled: true + sharedLogsPvc: "shared-logs-pvc" parameters: snapshotName: source: otherConfig @@ -103,8 +109,18 @@ bulk-document-loader: source: otherConfig configMapName: "target-cluster" configMapKey: "basicAuthPassword" + sourceVersion: + source: parameterConfig + value: "ES_7.10" + allowRuntimeOverride: true + docTransformerConfigFile: + source: parameterConfig + value: "/shared-logs-output/test-transformations/transformation.json" + allowRuntimeOverride: true replayer: + sharedLogsVolumeEnabled: true + sharedLogsPvc: "shared-logs-pvc" parameters: kafkaTrafficBrokers: source: otherConfig # eventually this will become a list from a shared config @@ -123,6 +139,14 @@ replayer: source: otherConfig configMapName: "target-cluster" configMapKey: "basicAuthHeader" + speedupFactor: + source: parameterConfig + value: 10 + allowRuntimeOverride: true + transformerConfigFile: + source: parameterConfig + value: "/shared-logs-output/test-transformations/transformation.json" + allowRuntimeOverride: true captured-traffic-kafka-cluster: environment: test diff --git a/deployment/k8s/charts/components/bulkLoad/Chart.lock b/deployment/k8s/charts/components/bulkLoad/Chart.lock index 7290cb91e0..500960244b 100644 --- a/deployment/k8s/charts/components/bulkLoad/Chart.lock +++ b/deployment/k8s/charts/components/bulkLoad/Chart.lock @@ -2,8 +2,5 @@ dependencies: - name: helm-common repository: file://../../sharedResources/helmCommon version: 0.1.0 -- name: logs-volume - repository: file://../../sharedResources/logsVolume - version: 0.1.0 -digest: sha256:5e118d521e5fe18deb458491cbeedae908bd27ded08b0d16f867968c0c591c28 -generated: "2025-01-29T17:20:16.376823-06:00" +digest: sha256:bc6f4a7b13fb0743870f216e559f3cc46bbf7cf573bdddcad9899a736696cd72 +generated: "2025-03-10T17:50:13.424809-05:00" diff --git a/deployment/k8s/charts/components/bulkLoad/Chart.yaml b/deployment/k8s/charts/components/bulkLoad/Chart.yaml index 9408663683..5a9d914079 100644 --- a/deployment/k8s/charts/components/bulkLoad/Chart.yaml +++ b/deployment/k8s/charts/components/bulkLoad/Chart.yaml @@ -7,6 +7,3 @@ dependencies: - name: helm-common repository: file://../../sharedResources/helmCommon version: 0.1.0 - - name: logs-volume - version: "0.1.0" - repository: "file://../../sharedResources/logsVolume" diff --git a/deployment/k8s/charts/components/bulkLoad/templates/deployment.yaml b/deployment/k8s/charts/components/bulkLoad/templates/deployment.yaml index d58c7970df..773924285d 100644 --- a/deployment/k8s/charts/components/bulkLoad/templates/deployment.yaml +++ b/deployment/k8s/charts/components/bulkLoad/templates/deployment.yaml @@ -1,5 +1,6 @@ {{ $envMountName := "env-vars" }} {{ $snapshotVolumeEnabled := .Values.snapshotVolumeEnabled }} +{{ $sharedLogsVolumeEnabled := .Values.sharedLogsVolumeEnabled }} apiVersion: 
apps/v1 kind: Deployment metadata: @@ -38,6 +39,10 @@ spec: - name: snapshot-volume mountPath: /snapshot {{- end }} + {{- if $sharedLogsVolumeEnabled }} + - name: shared-logs + mountPath: /shared-logs-output + {{- end }} volumes: - name: {{ $envMountName }} emptyDir: {} @@ -46,3 +51,8 @@ spec: persistentVolumeClaim: claimName: {{ .Values.snapshotVolumePvc }} {{- end }} + {{- if $sharedLogsVolumeEnabled }} + - name: shared-logs + persistentVolumeClaim: + claimName: {{ .Values.sharedLogsPvc }} + {{- end }} diff --git a/deployment/k8s/charts/components/bulkLoad/values.yaml b/deployment/k8s/charts/components/bulkLoad/values.yaml index 9d75cfe18f..d17cd80a12 100644 --- a/deployment/k8s/charts/components/bulkLoad/values.yaml +++ b/deployment/k8s/charts/components/bulkLoad/values.yaml @@ -1,3 +1,5 @@ +sharedLogsVolumeEnabled: false +sharedLogsPvc: "" snapshotVolumeEnabled: false snapshotVolumePvc: "" parameters: diff --git a/deployment/k8s/charts/components/elasticsearchCluster/environments/es-5-6-multi-node-cluster.yaml b/deployment/k8s/charts/components/elasticsearchCluster/environments/es-5-6-multi-node-cluster.yaml index 253620853b..39274a6192 100644 --- a/deployment/k8s/charts/components/elasticsearchCluster/environments/es-5-6-multi-node-cluster.yaml +++ b/deployment/k8s/charts/components/elasticsearchCluster/environments/es-5-6-multi-node-cluster.yaml @@ -6,6 +6,10 @@ elasticsearch: protocol: http replicas: 3 # Set replicas to the number of nodes you want in the cluster createCert: false + clusterHealthCheckParams: "wait_for_status=yellow&timeout=3s" + readinessProbe: + failureThreshold: 5 + successThreshold: 2 esConfig: elasticsearch.yml: | bootstrap.system_call_filter: false diff --git a/deployment/k8s/charts/components/elasticsearchCluster/environments/es-5-6-single-node-cluster.yaml b/deployment/k8s/charts/components/elasticsearchCluster/environments/es-5-6-single-node-cluster.yaml index 85020d7f7a..5112fb3e83 100644 --- a/deployment/k8s/charts/components/elasticsearchCluster/environments/es-5-6-single-node-cluster.yaml +++ b/deployment/k8s/charts/components/elasticsearchCluster/environments/es-5-6-single-node-cluster.yaml @@ -6,6 +6,10 @@ elasticsearch: protocol: http replicas: 1 # Set replicas to the number of nodes you want in the cluster createCert: false + clusterHealthCheckParams: "wait_for_status=yellow&timeout=3s" + readinessProbe: + failureThreshold: 5 + successThreshold: 2 esConfig: elasticsearch.yml: | bootstrap.system_call_filter: false diff --git a/deployment/k8s/charts/components/migrationConsole/README.md b/deployment/k8s/charts/components/migrationConsole/README.md new file mode 100644 index 0000000000..02b876610e --- /dev/null +++ b/deployment/k8s/charts/components/migrationConsole/README.md @@ -0,0 +1,8 @@ +# Migration Console Helm Chart +The component Helm chart for the Migration Console + +### Developing the Migration Console +To speed up development of libraries used on the Migration Console, such as `console_link` and `integ_test`, a `developerModeEnabled` setting was +added to the default `values.yaml` in this directory. When this setting is enabled, the Migration Console container +mounts the local repository so that changes to these libraries are picked up automatically on the container, +giving a quicker feedback loop when testing them in a deployed environment.
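For illustration only, not part of this change: with the component default left at `developerModeEnabled: false` (added to the migrationConsole values.yaml below), developer mode could be switched on through the aggregate chart's values. The override path below assumes the `migration-console` dependency name used by the migrationAssistant aggregate chart:

```yaml
# Hypothetical values override for the migrationAssistant aggregate chart.
# Requires the repository to be mounted into minikube (see the
# `minikube mount .:/opensearch-migrations` addition to minikubeLocal.sh later in this diff)
# so the hostPath volume has content to serve.
migration-console:
  developerModeEnabled: true
```

When enabled, the deployment template below mounts the repository's `migrationConsole/lib` directory at `/root/lib` through a hostPath volume and re-installs `console_link` in editable mode at container startup, so local library changes show up in the running Migration Console without rebuilding the image.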
\ No newline at end of file diff --git a/deployment/k8s/charts/components/migrationConsole/templates/deployment.yaml b/deployment/k8s/charts/components/migrationConsole/templates/deployment.yaml index a0ef164d81..84213d088d 100644 --- a/deployment/k8s/charts/components/migrationConsole/templates/deployment.yaml +++ b/deployment/k8s/charts/components/migrationConsole/templates/deployment.yaml @@ -1,7 +1,8 @@ {{ $mountName := "all-configs" }} {{ $envVarMountName := "env-vars" }} -{{ $sharedLogsVolumeEnabled := false }} +{{ $sharedLogsVolumeEnabled := .Values.sharedLogsVolumeEnabled }} {{ $snapshotVolumeEnabled := .Values.snapshotVolumeEnabled }} +{{ $developerModeEnabled := .Values.developerModeEnabled }} apiVersion: apps/v1 kind: Deployment metadata: @@ -34,6 +35,9 @@ spec: - "-c" - | source /shared/vars.sh + {{- if .Values.developerModeEnabled }} + source /.venv/bin/activate && pipenv install -e ~/lib/console_link + {{- end }} export START_API_COMMAND="pipenv run python /root/console_api/manage.py runserver_plus 0.0.0.0:8000 --cert-file cert.crt" export WAIT_AND_DO_NOTHING_COMMAND="tail -f /dev/null" migrationEnabled=$(echo "$@" | grep -o -- "--migrationApiEnabled" | wc -l); if [ "$migrationEnabled" -gt 0 ]; then $START_API_COMMAND; else $WAIT_AND_DO_NOTHING_COMMAND; fi > run.sh @@ -47,9 +51,10 @@ spec: mountPath: /config - name: {{ $envVarMountName }} mountPath: /shared -{{/* Enable if developing the console library */}} -{{/* - name: local-console-link*/}} -{{/* mountPath: /root/lib/console_link*/}} + {{- if $developerModeEnabled }} + - name: local-console-libs + mountPath: /root/lib + {{- end }} {{- if $snapshotVolumeEnabled }} - name: snapshot-volume mountPath: /snapshot @@ -59,10 +64,12 @@ spec: emptyDir: { } - name: merged-config emptyDir: { } -{{/* - name: local-console-link*/}} -{{/* hostPath:*/}} -{{/* path: /opensearch-migrations/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link*/}} -{{/* type: Directory*/}} + {{- if $developerModeEnabled }} + - name: local-console-libs + hostPath: + path: /opensearch-migrations/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib + type: Directory + {{- end }} {{- if $sharedLogsVolumeEnabled }} - name: shared-logs persistentVolumeClaim: diff --git a/deployment/k8s/charts/components/migrationConsole/values.yaml b/deployment/k8s/charts/components/migrationConsole/values.yaml index eec662b0bd..032d04d788 100644 --- a/deployment/k8s/charts/components/migrationConsole/values.yaml +++ b/deployment/k8s/charts/components/migrationConsole/values.yaml @@ -1,3 +1,4 @@ +developerModeEnabled: false sharedLogsVolumeEnabled: false sharedLogsPvc: "" snapshotVolumeEnabled: false diff --git a/deployment/k8s/charts/components/replayer/templates/deployment.yaml b/deployment/k8s/charts/components/replayer/templates/deployment.yaml index b7454dc47b..91c7f51b16 100644 --- a/deployment/k8s/charts/components/replayer/templates/deployment.yaml +++ b/deployment/k8s/charts/components/replayer/templates/deployment.yaml @@ -1,4 +1,5 @@ {{ $envMountName := "env-vars" }} +{{ $sharedLogsVolumeEnabled := .Values.sharedLogsVolumeEnabled }} apiVersion: apps/v1 kind: Deployment metadata: @@ -42,6 +43,15 @@ spec: volumeMounts: - name: {{ $envMountName }} mountPath: /shared + {{- if $sharedLogsVolumeEnabled }} + - name: shared-logs + mountPath: /shared-logs-output + {{- end }} volumes: - name: {{ $envMountName }} emptyDir: {} + {{- if $sharedLogsVolumeEnabled }} + - name: shared-logs + persistentVolumeClaim: + claimName: {{ 
.Values.sharedLogsPvc }} + {{- end }} \ No newline at end of file diff --git a/deployment/k8s/charts/components/replayer/values.yaml b/deployment/k8s/charts/components/replayer/values.yaml index 77f699af95..58f1f70292 100644 --- a/deployment/k8s/charts/components/replayer/values.yaml +++ b/deployment/k8s/charts/components/replayer/values.yaml @@ -1,3 +1,5 @@ +sharedLogsVolumeEnabled: false +sharedLogsPvc: "" parameters: targetUri: postionalArgumentIndex: 0 diff --git a/deployment/k8s/minikubeLocal.sh b/deployment/k8s/minikubeLocal.sh index c2793a89d5..b31397b5ca 100755 --- a/deployment/k8s/minikubeLocal.sh +++ b/deployment/k8s/minikubeLocal.sh @@ -7,18 +7,28 @@ usage() { exit 1 } +kill_minikube_processes() { + mount_process_id=$(pgrep -f "minikube mount") + if [ -n "$mount_process_id" ]; then + kill "$mount_process_id" + fi +} + start() { helm repo add opensearch-operator https://opensearch-project.github.io/opensearch-k8s-operator/ helm repo add strimzi https://strimzi.io/charts/ minikube start + minikube mount .:/opensearch-migrations > /dev/null 2>&1 & } pause() { + kill_minikube_processes minikube pause } delete() { + kill_minikube_processes minikube delete } diff --git a/deployment/k8s/update_deps.sh b/deployment/k8s/update_deps.sh index b7e3636606..21f2babf95 100755 --- a/deployment/k8s/update_deps.sh +++ b/deployment/k8s/update_deps.sh @@ -67,6 +67,11 @@ update_directories () { START_TIME=$(date +%s) +# Allow executing this script from any dir +script_abs_path=$(readlink -f "$0") +script_dir_abs_path=$(dirname "$script_abs_path") +cd "$script_dir_abs_path" || exit + # Create hash file if it doesn't exist, create or clear temp file touch "$HASH_FILE" echo -n > "$TEMP_FILE" diff --git a/transformation/transformationPlugins/jsonMessageTransformers/jsonMessageTransformerLoaders/src/main/java/org/opensearch/migrations/transform/TransformerConfigUtils.java b/transformation/transformationPlugins/jsonMessageTransformers/jsonMessageTransformerLoaders/src/main/java/org/opensearch/migrations/transform/TransformerConfigUtils.java index 56f12ca9b9..9c7394754e 100644 --- a/transformation/transformationPlugins/jsonMessageTransformers/jsonMessageTransformerLoaders/src/main/java/org/opensearch/migrations/transform/TransformerConfigUtils.java +++ b/transformation/transformationPlugins/jsonMessageTransformers/jsonMessageTransformerLoaders/src/main/java/org/opensearch/migrations/transform/TransformerConfigUtils.java @@ -19,9 +19,10 @@ public static String getTransformerConfig(TransformerParams params) { isConfigured(params.getTransformerConfig()); if (configuredCount > 1) { System.err.println("Specify only one of " + - "--" + params.getTransformerConfigParameterArgPrefix() + "transformer-config-base64" + ", " + - "--" + params.getTransformerConfigParameterArgPrefix() + "transformer-config" + ", or " + - "--" + params.getTransformerConfigParameterArgPrefix() + "transformer-config-file" + "."); + "--" + params.getTransformerConfigParameterArgPrefix() + "-transformer-config-base64" + ", " + + "--" + params.getTransformerConfigParameterArgPrefix() + "-transformer-config" + ", or " + + "--" + params.getTransformerConfigParameterArgPrefix() + "-transformer-config-file" + + ". Both Kebab case and lower Camel case are supported."); System.exit(4); }