diff --git a/CreateSnapshot/src/main/java/org/opensearch/migrations/CreateSnapshot.java b/CreateSnapshot/src/main/java/org/opensearch/migrations/CreateSnapshot.java
index cbf4614621..9d6077782a 100644
--- a/CreateSnapshot/src/main/java/org/opensearch/migrations/CreateSnapshot.java
+++ b/CreateSnapshot/src/main/java/org/opensearch/migrations/CreateSnapshot.java
@@ -84,6 +84,12 @@ public static class Args {
         description = "The role ARN the cluster will assume to write a snapshot to S3")
     public String s3RoleArn;
 
+    @Parameter(
+        names = {"--s3-endpoint"},
+        required = false,
+        description = "The S3 endpoint URL to use for the S3 bucket, like: s3.us-west-2.amazonaws.com")
+    public String s3Endpoint;
+
     @Parameter(
         names = {"--index-allowlist"},
         required = false,
@@ -163,6 +169,7 @@ public void run() {
             arguments.indexAllowlist,
             arguments.maxSnapshotRateMBPerNode,
             arguments.s3RoleArn,
+            arguments.s3Endpoint,
             context
         );
     }
diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3SnapshotCreator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3SnapshotCreator.java
index 30db17314f..d6446983e8 100644
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3SnapshotCreator.java
+++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3SnapshotCreator.java
@@ -14,6 +14,7 @@ public class S3SnapshotCreator extends SnapshotCreator {
     private final String s3Region;
     private final Integer maxSnapshotRateMBPerNode;
     private final String snapshotRoleArn;
+    private final String s3Endpoint;
 
     public S3SnapshotCreator(
         String snapshotName,
@@ -24,7 +25,7 @@ public S3SnapshotCreator(
         List<String> indexAllowlist,
         IRfsContexts.ICreateSnapshotContext context
     ) {
-        this(snapshotName, snapshotRepoName, client, s3Uri, s3Region, indexAllowlist, null, null, context);
+        this(snapshotName, snapshotRepoName, client, s3Uri, s3Region, indexAllowlist, null, null, null, context);
     }
 
     public S3SnapshotCreator(
@@ -36,6 +37,7 @@ public S3SnapshotCreator(
         List<String> indexAllowlist,
         Integer maxSnapshotRateMBPerNode,
         String snapshotRoleArn,
+        String s3Endpoint,
         IRfsContexts.ICreateSnapshotContext context
     ) {
         super(snapshotName, snapshotRepoName, indexAllowlist, client, context);
@@ -43,6 +45,7 @@ public S3SnapshotCreator(
         this.s3Region = s3Region;
         this.maxSnapshotRateMBPerNode = maxSnapshotRateMBPerNode;
         this.snapshotRoleArn = snapshotRoleArn;
+        this.s3Endpoint = s3Endpoint;
     }
 
     @Override
@@ -60,6 +63,10 @@ public ObjectNode getRequestBodyForRegisterRepo() {
         if (maxSnapshotRateMBPerNode != null) {
            settings.put("max_snapshot_bytes_per_sec", maxSnapshotRateMBPerNode + "mb");
         }
+
+        if (s3Endpoint != null) {
+            settings.put("endpoint", s3Endpoint);
+        }
 
         ObjectNode body = mapper.createObjectNode();
         body.put("type", "s3");
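For context, a sketch of the repository-registration body this creator PUTs to `/_snapshot/<repo>` once `--s3-endpoint` is supplied. Only `endpoint` and `max_snapshot_bytes_per_sec` are confirmed by this diff; the remaining field names follow the existing creator and are assumptions, with illustrative values.

```python
# Hypothetical register-repo body assembled by getRequestBodyForRegisterRepo().
register_repo_body = {
    "type": "s3",
    "settings": {
        "bucket": "test-large-snapshot-bucket",   # assumed: derived from --s3-repo-uri
        "base_path": "es56-snapshot",             # assumed: derived from --s3-repo-uri
        "region": "us-west-2",                    # assumed: from --s3-region
        "max_snapshot_bytes_per_sec": "2000mb",   # from --max-snapshot-rate-mb-per-node, if set
        "endpoint": "s3.us-west-2.amazonaws.com", # from --s3-endpoint, if set (new in this PR)
    },
}
```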
{self.snapshot}") else: logger.info("No snapshot provided") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/clusters.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/clusters.py index 6912399dd9..e142e64dac 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/clusters.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/clusters.py @@ -34,7 +34,7 @@ def connection_check(cluster: Cluster) -> ConnectionResult: caught_exception = None r = None try: - r = cluster.call_api(cluster_details_path, timeout=3) + r = cluster.call_api(cluster_details_path, timeout=20) except Exception as e: caught_exception = e logging.debug(f"Unable to access cluster: {cluster} with exception: {e}") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py index 1fd1256b45..5720252811 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py @@ -143,7 +143,7 @@ def _generate_auth_object(self) -> requests.auth.AuthBase | None: raise NotImplementedError(f"Auth type {self.auth_type} not implemented") def call_api(self, path, method: HttpMethod = HttpMethod.GET, data=None, headers=None, - timeout=None, session=None, raise_error=True, **kwargs) -> requests.Response: + timeout=300, session=None, raise_error=True, **kwargs) -> requests.Response: """ Calls an API on the cluster. 
""" diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/command_runner.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/command_runner.py index 2ad34d4d2f..f9fbceeb13 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/command_runner.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/command_runner.py @@ -16,7 +16,12 @@ def __init__(self, command_root: str, command_args: Dict[str, Any], sensitive_fi run_as_detatched: bool = False, log_file: Optional[str] = None): self.command_args = command_args self.command = [command_root] + if "__positional__" in command_args: + self.command.extend(command_args["__positional__"]) + for key, value in command_args.items(): + if key == "__positional__": + continue self.command.append(key) if value is not FlagOnlyArgument: if type(value) is not str: diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/factories.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/factories.py index 7e61305e0e..a677418430 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/factories.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/factories.py @@ -46,11 +46,17 @@ def __init__(self, supplied_backfill: str): super().__init__("Unsupported backfill type", supplied_backfill) -def get_snapshot(config: Dict, source_cluster: Cluster): +def get_snapshot(config: Dict, source_cluster: Cluster = None, target_cluster: Cluster = None): + # Use target_cluster as a fallback when source_cluster is not available + cluster_to_use = source_cluster if source_cluster is not None else target_cluster + + if cluster_to_use is None: + raise ValueError("Either source_cluster or target_cluster must be provided for snapshot operations") + if 'fs' in config: - return FileSystemSnapshot(config, source_cluster) + return FileSystemSnapshot(config, cluster_to_use) elif 's3' in config: - return S3Snapshot(config, source_cluster) + return S3Snapshot(config, cluster_to_use) logger.error(f"An unsupported snapshot type was provided: {config.keys()}") if len(config.keys()) > 1: raise UnsupportedSnapshotError(', '.join(config.keys())) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py index 8a79e89f66..6511111412 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py @@ -2,7 +2,7 @@ import logging from abc import ABC, abstractmethod from requests.exceptions import HTTPError -from typing import Dict +from typing import Dict, Optional from cerberus import Validator from console_link.models.cluster import AuthMethod, Cluster, HttpMethod @@ -27,7 +27,8 @@ 'schema': { 'repo_uri': {'type': 'string', 'required': True}, 'aws_region': {'type': 'string', 'required': True}, - 'role': {'type': 'string', 'required': False} + 'role': {'type': 'string', 'required': False}, + 'endpoint': {'type': 
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py
index 8a79e89f66..6511111412 100644
--- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py
@@ -2,7 +2,7 @@
 import logging
 from abc import ABC, abstractmethod
 from requests.exceptions import HTTPError
-from typing import Dict
+from typing import Dict, Optional
 from cerberus import Validator
 
 from console_link.models.cluster import AuthMethod, Cluster, HttpMethod
@@ -27,7 +27,8 @@
         'schema': {
             'repo_uri': {'type': 'string', 'required': True},
             'aws_region': {'type': 'string', 'required': True},
-            'role': {'type': 'string', 'required': False}
+            'role': {'type': 'string', 'required': False},
+            'endpoint': {'type': 'string', 'required': False}
         }
     },
     'fs': {
@@ -46,9 +47,9 @@ class Snapshot(ABC):
     """
     Interface for creating and managing snapshots.
     """
-    def __init__(self, config: Dict, source_cluster: Cluster) -> None:
+    def __init__(self, config: Dict, cluster: Cluster) -> None:
         self.config = config
-        self.source_cluster = source_cluster
+        self.cluster = cluster
         v = Validator(SNAPSHOT_SCHEMA)
         if not v.validate({'snapshot': config}):
             raise ValueError("Invalid config file for snapshot", v.errors)
@@ -85,27 +86,27 @@ def _collect_universal_command_args(self) -> Dict:
         command_args = {
             "--snapshot-name": self.snapshot_name,
             "--snapshot-repo-name": self.snapshot_repo_name,
-            "--source-host": self.source_cluster.endpoint
+            "--source-host": self.cluster.endpoint
         }
 
-        if self.source_cluster.auth_type == AuthMethod.BASIC_AUTH:
+        if self.cluster.auth_type == AuthMethod.BASIC_AUTH:
             try:
                 command_args.update({
-                    "--source-username": self.source_cluster.auth_details.get("username"),
-                    "--source-password": self.source_cluster.get_basic_auth_password()
+                    "--source-username": self.cluster.auth_details.get("username"),
+                    "--source-password": self.cluster.get_basic_auth_password()
                 })
                 logger.info("Using basic auth for source cluster")
             except KeyError as e:
                 raise ValueError(f"Missing required auth details for source cluster: {e}")
-        elif self.source_cluster.auth_type == AuthMethod.SIGV4:
-            signing_name, region = self.source_cluster._get_sigv4_details(force_region=True)
+        elif self.cluster.auth_type == AuthMethod.SIGV4:
+            signing_name, region = self.cluster._get_sigv4_details(force_region=True)
             logger.info(f"Using sigv4 auth for source cluster with signing_name {signing_name} and region {region}")
             command_args.update({
                 "--source-aws-service-signing-name": signing_name,
                 "--source-aws-region": region
             })
 
-        if self.source_cluster.allow_insecure:
+        if self.cluster.allow_insecure:
             command_args["--source-insecure"] = None
 
         if self.otel_endpoint:
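A config dict that validates against the updated schema; `endpoint` is the only new key, and the values are illustrative:

```python
snapshot_config = {
    "snapshot_name": "large-snapshot",
    "s3": {
        "repo_uri": "s3://test-large-snapshot-bucket/es56-snapshot/",
        "aws_region": "us-east-1",
        "role": "arn:aws:iam::123456789012:role/LargeSnapshotAccessRole",  # optional
        "endpoint": "s3.us-east-1.amazonaws.com",  # optional, new in this PR
    },
}
```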
@@ -115,14 +116,17 @@
 
 
 class S3Snapshot(Snapshot):
-    def __init__(self, config: Dict, source_cluster: Cluster) -> None:
-        super().__init__(config, source_cluster)
+    def __init__(self, config: Dict, cluster: Cluster) -> None:
+        super().__init__(config, cluster)
+        self.snapshot_name = config['snapshot_name']
+        self.otel_endpoint = config.get("otel_endpoint", None)
         self.s3_repo_uri = config['s3']['repo_uri']
         self.s3_role_arn = config['s3'].get('role')
         self.s3_region = config['s3']['aws_region']
+        self.s3_endpoint = config['s3'].get('endpoint')
 
     def create(self, *args, **kwargs) -> CommandResult:
-        assert isinstance(self.source_cluster, Cluster)
+        assert isinstance(self.cluster, Cluster)
         base_command = "/root/createSnapshot/bin/CreateSnapshot"
 
         s3_command_args = {
@@ -130,6 +134,9 @@
             "--s3-region": self.s3_region,
         }
 
+        if self.s3_endpoint:
+            s3_command_args["--s3-endpoint"] = self.s3_endpoint
+
         command_args = self._collect_universal_command_args()
         command_args.update(s3_command_args)
 
@@ -159,26 +166,35 @@
     def status(self, *args, deep_check=False, **kwargs) -> CommandResult:
         if deep_check:
-            return get_snapshot_status_full(self.source_cluster, self.snapshot_name, self.snapshot_repo_name)
-        return get_snapshot_status(self.source_cluster, self.snapshot_name, self.snapshot_repo_name)
+            return get_snapshot_status_full(self.cluster, self.snapshot_name)
+        return get_snapshot_status(self.cluster, self.snapshot_name)
 
     def delete(self, *args, **kwargs) -> CommandResult:
-        return delete_snapshot(self.source_cluster, self.snapshot_name, self.snapshot_repo_name)
-
-    def delete_all_snapshots(self, *args, **kwargs) -> CommandResult:
-        return delete_all_snapshots(self.source_cluster, self.snapshot_repo_name)
+        timeout = kwargs.get('timeout', 1200)
+        return delete_snapshot(self.cluster, self.snapshot_name, timeout=timeout)
 
     def delete_snapshot_repo(self, *args, **kwargs) -> CommandResult:
-        return delete_snapshot_repo(self.source_cluster, self.snapshot_repo_name)
+        timeout = kwargs.get('timeout', 1200)
+        return delete_snapshot_repo(self.cluster, self.snapshot_repo_name, timeout=timeout)
+
+    def delete_all_snapshots(self, *args, **kwargs) -> CommandResult:
+        try:
+            timeout = kwargs.get('timeout', 1200)
+            delete_all_snapshots(self.cluster, self.snapshot_repo_name, timeout)
+            return CommandResult(success=True, value="All snapshots deleted successfully.")
+        except Exception as e:
+            return CommandResult(success=False, value=f"Failed to delete all snapshots: {str(e)}")
 
 
 class FileSystemSnapshot(Snapshot):
-    def __init__(self, config: Dict, source_cluster: Cluster) -> None:
-        super().__init__(config, source_cluster)
+    def __init__(self, config: Dict, cluster: Cluster) -> None:
+        super().__init__(config, cluster)
+        self.snapshot_name = config['snapshot_name']
+        self.otel_endpoint = config.get("otel_endpoint", None)
         self.repo_path = config['fs']['repo_path']
 
     def create(self, *args, **kwargs) -> CommandResult:
-        assert isinstance(self.source_cluster, Cluster)
+        assert isinstance(self.cluster, Cluster)
         base_command = "/root/createSnapshot/bin/CreateSnapshot"
 
         command_args = self._collect_universal_command_args()
@@ -205,23 +221,31 @@
     def status(self, *args, deep_check=False, **kwargs) -> CommandResult:
         if deep_check:
-            return get_snapshot_status_full(self.source_cluster, self.snapshot_name, self.snapshot_repo_name)
-        return get_snapshot_status(self.source_cluster, self.snapshot_name, self.snapshot_repo_name)
+            return get_snapshot_status_full(self.cluster, self.snapshot_name)
+        return get_snapshot_status(self.cluster, self.snapshot_name)
 
     def delete(self, *args, **kwargs) -> CommandResult:
-        return delete_snapshot(self.source_cluster, self.snapshot_name, self.snapshot_repo_name)
-
-    def delete_all_snapshots(self, *args, **kwargs) -> CommandResult:
-        return delete_all_snapshots(self.source_cluster, self.snapshot_repo_name)
+        timeout = kwargs.get('timeout', 1200)
+        return delete_snapshot(self.cluster, self.snapshot_name, timeout=timeout)
 
     def delete_snapshot_repo(self, *args, **kwargs) -> CommandResult:
-        return delete_snapshot_repo(self.source_cluster, self.snapshot_repo_name)
+        timeout = kwargs.get('timeout', 1200)
+        return delete_snapshot_repo(self.cluster, self.snapshot_repo_name, timeout=timeout)
+
+    def delete_all_snapshots(self, *args, **kwargs) -> CommandResult:
+        try:
+            timeout = kwargs.get('timeout', 1200)
+            delete_all_snapshots(self.cluster, self.snapshot_repo_name, timeout)
+            return CommandResult(success=True, value="All snapshots deleted successfully.")
+        except Exception as e:
+            return CommandResult(success=False, value=f"Failed to delete all snapshots: {str(e)}")
 
 
-def get_snapshot_status(cluster: Cluster, snapshot: str, repository: str) -> CommandResult:
+def get_snapshot_status(cluster: Cluster, snapshot: str,
+                        repository: str = 'migration_assistant_repo', timeout: int = 300) -> CommandResult:
     path = f"/_snapshot/{repository}/{snapshot}"
     try:
-        response = cluster.call_api(path, HttpMethod.GET)
+        response = cluster.call_api(path, HttpMethod.GET, timeout=timeout)
         logging.debug(f"Raw get snapshot status response: {response.text}")
         response.raise_for_status()
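A sketch of the reworked delete paths, with a hypothetical caller (`cluster` may now be the target cluster):

```python
snapshot = S3Snapshot(snapshot_config, cluster)
result = snapshot.delete(timeout=600)     # forwarded as call_api(..., timeout=600)
result = snapshot.delete_all_snapshots()  # failures now return CommandResult(success=False, ...)
                                          # instead of raising
```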
@@ -235,6 +259,21 @@
         return CommandResult(success=False, value=f"Failed to get snapshot status: {str(e)}")
 
 
+def get_repository_for_snapshot(cluster: Cluster, snapshot: str, timeout: int = 300) -> Optional[str]:
+    url = f"/_snapshot/*/{snapshot}"
+    response = cluster.call_api(url, HttpMethod.GET, timeout=timeout)
+    logging.debug(f"Raw response: {response.text}")
+    response.raise_for_status()
+
+    snapshot_data = response.json()
+    snapshots = snapshot_data.get('snapshots', [])
+    if not snapshots:
+        logging.debug(f"Snapshot {snapshot} not found in any repository")
+        return None
+
+    return snapshots[0].get("repository")
+
+
 def format_date(millis: int) -> str:
     if millis == 0:
         return "N/A"
@@ -291,10 +330,14 @@ def get_snapshot_status_message(snapshot_info: Dict) -> str:
     )
 
 
-def get_snapshot_status_full(cluster: Cluster, snapshot: str, repository: str) -> CommandResult:
+def get_snapshot_status_full(cluster: Cluster, snapshot: str,
+                             repository: str = 'migration_assistant_repo', timeout: int = 300) -> CommandResult:
     try:
+        if repository == '*':
+            repository = get_repository_for_snapshot(cluster, snapshot, timeout=timeout)
+
         path = f"/_snapshot/{repository}/{snapshot}"
-        response = cluster.call_api(path, HttpMethod.GET)
+        response = cluster.call_api(path, HttpMethod.GET, timeout=timeout)
         logging.debug(f"Raw get snapshot status response: {response.text}")
         response.raise_for_status()
@@ -307,7 +350,7 @@
         state = snapshot_info.get("state")
 
         path = f"/_snapshot/{repository}/{snapshot}/_status"
-        response = cluster.call_api(path, HttpMethod.GET)
+        response = cluster.call_api(path, HttpMethod.GET, timeout=timeout)
         logging.debug(f"Raw get snapshot status full response: {response.text}")
         response.raise_for_status()
@@ -322,14 +365,20 @@
         return CommandResult(success=False, value=f"Failed to get full snapshot status: {str(e)}")
 
 
-def delete_snapshot(cluster: Cluster, snapshot_name: str, repository: str):
+def delete_snapshot(
+    cluster: Cluster,
+    snapshot_name: str,
+    repository: str = 'migration_assistant_repo',
+    timeout: int = 1200
+):
+    repository = repository if repository != '*' else get_repository_for_snapshot(cluster, snapshot_name)
     path = f"/_snapshot/{repository}/{snapshot_name}"
-    response = cluster.call_api(path, HttpMethod.DELETE)
+    response = cluster.call_api(path, HttpMethod.DELETE, timeout=timeout)
     logging.debug(f"Raw delete snapshot status response: {response.text}")
     logger.info(f"Deleted snapshot: {snapshot_name} from repository '{repository}'.")
 
 
-def delete_all_snapshots(cluster: Cluster, repository: str) -> None:
+def delete_all_snapshots(cluster: Cluster, repository: str, timeout: int = 1200) -> None:
     logger.info(f"Clearing snapshots from repository '{repository}'")
     """
     Clears all snapshots from the specified repository.
@@ -341,7 +390,7 @@ def delete_all_snapshots(cluster: Cluster, repository: str) -> None:
     try:
         # List all snapshots in the repository
        snapshots_path = f"/_snapshot/{repository}/_all"
-        response = cluster.call_api(snapshots_path, raise_error=True)
+        response = cluster.call_api(snapshots_path, raise_error=True, timeout=timeout)
         logger.debug(f"Raw response: {response.json()}")
         snapshots = response.json().get("snapshots", [])
         logger.info(f"Found {len(snapshots)} snapshots in repository '{repository}'.")
@@ -367,7 +416,7 @@ def delete_all_snapshots(cluster: Cluster, repository: str) -> None:
         raise e
 
 
-def delete_snapshot_repo(cluster: Cluster, repository: str) -> None:
+def delete_snapshot_repo(cluster: Cluster, repository: str, timeout: int = 1200) -> None:
     logger.info(f"Deleting repository '{repository}'")
     """
     Delete repository. Should be empty before execution.
@@ -378,7 +427,7 @@
     """
     try:
         delete_path = f"/_snapshot/{repository}"
-        response = cluster.call_api(delete_path, method=HttpMethod.DELETE, raise_error=True)
+        response = cluster.call_api(delete_path, method=HttpMethod.DELETE, raise_error=True, timeout=timeout)
         logging.debug(f"Raw delete snapshot repository status response: {response.text}")
         logger.info(f"Deleted repository: {repository}.")
     except Exception as e:
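How the wildcard lookup behaves, sketched from the code above (the response shape follows the `snapshots[0].get("repository")` access, so treat it as an assumption about the cluster API):

```python
repo = get_repository_for_snapshot(cluster, "large-snapshot")
# Issues GET /_snapshot/*/large-snapshot and expects e.g.
#   {"snapshots": [{"snapshot": "large-snapshot", "repository": "migration_assistant_repo", ...}]}
# Returns "migration_assistant_repo", or None if no repository holds the snapshot.
# delete_snapshot() and get_snapshot_status_full() invoke it when repository == '*'.
```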
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py
index de79ca1ab1..07d88515c2 100644
--- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py
@@ -45,7 +45,7 @@ def fs_snapshot(mock_cluster):
 @pytest.mark.parametrize("snapshot_fixture", ['s3_snapshot', 'fs_snapshot'])
 def test_snapshot_status(request, snapshot_fixture):
     snapshot = request.getfixturevalue(snapshot_fixture)
-    source_cluster = snapshot.source_cluster
+    source_cluster = snapshot.cluster
     mock_response = mock.Mock()
     mock_response.json.return_value = {
         "snapshots": [
@@ -64,14 +64,15 @@ def test_snapshot_status(request, snapshot_fixture):
     assert result.value == "SUCCESS"
     source_cluster.call_api.assert_called_once_with(
         f"/_snapshot/{snapshot.snapshot_repo_name}/{snapshot.snapshot_name}",
-        HttpMethod.GET
+        HttpMethod.GET,
+        timeout=300
     )
 
 
 @pytest.mark.parametrize("snapshot_fixture", ['s3_snapshot', 'fs_snapshot'])
 def test_snapshot_status_full(request, snapshot_fixture):
     snapshot = request.getfixturevalue(snapshot_fixture)
-    source_cluster = snapshot.source_cluster
+    source_cluster = snapshot.cluster
     mock_response = mock.Mock()
     mock_response.json.return_value = {
         "snapshots": [
@@ -124,7 +125,8 @@ def test_snapshot_status_full(request, snapshot_fixture):
 
     source_cluster.call_api.assert_called_with(
         f"/_snapshot/{snapshot.snapshot_repo_name}/{snapshot.snapshot_name}/_status",
-        HttpMethod.GET
+        HttpMethod.GET,
+        timeout=300
     )
 
 
@@ -363,9 +365,9 @@ def test_s3_snapshot_create_fails_for_clusters_with_auth(mocker):
     mock.assert_called_once_with(["/root/createSnapshot/bin/CreateSnapshot",
                                   "--snapshot-name", config["snapshot"]["snapshot_name"],
                                   '--snapshot-repo-name', snapshot.snapshot_repo_name,
-                                  "--source-host", snapshot.source_cluster.endpoint,
-                                  "--source-username", snapshot.source_cluster.auth_details.get("username"),
-                                  "--source-password", snapshot.source_cluster.get_basic_auth_password(),
+                                  "--source-host", snapshot.cluster.endpoint,
+                                  "--source-username", snapshot.cluster.auth_details.get("username"),
+                                  "--source-password", snapshot.cluster.get_basic_auth_password(),
                                   "--source-insecure",
                                   "--s3-repo-uri", config["snapshot"]["s3"]["repo_uri"],
                                   "--s3-region", config["snapshot"]["s3"]["aws_region"],
@@ -391,9 +393,9 @@ def test_fs_snapshot_create_works_for_clusters_with_basic_auth(mocker):
     mock.assert_called_once_with(["/root/createSnapshot/bin/CreateSnapshot",
                                   "--snapshot-name", config["snapshot"]["snapshot_name"],
                                   '--snapshot-repo-name', snapshot.snapshot_repo_name,
-                                  "--source-host", snapshot.source_cluster.endpoint,
-                                  "--source-username", snapshot.source_cluster.auth_details.get("username"),
-                                  "--source-password", snapshot.source_cluster.get_basic_auth_password(),
+                                  "--source-host", snapshot.cluster.endpoint,
+                                  "--source-username", snapshot.cluster.auth_details.get("username"),
+                                  "--source-password", snapshot.cluster.get_basic_auth_password(),
                                   "--source-insecure",
                                   "--file-system-repo-path", config["snapshot"]["fs"]["repo_path"],
                                   "--max-snapshot-rate-mb-per-node", str(max_snapshot_rate),
@@ -422,7 +424,7 @@ def test_fs_snapshot_create_works_for_clusters_with_sigv4(mocker):
     mock.assert_called_once_with(["/root/createSnapshot/bin/CreateSnapshot",
                                   "--snapshot-name", config["snapshot"]["snapshot_name"],
                                   '--snapshot-repo-name', snapshot.snapshot_repo_name,
-                                  "--source-host", snapshot.source_cluster.endpoint,
+                                  "--source-host", snapshot.cluster.endpoint,
                                   "--source-aws-service-signing-name", service_name,
                                   "--source-aws-region", signing_region,
                                   "--source-insecure",
@@ -433,36 +435,37 @@
 @pytest.mark.parametrize("snapshot_fixture", ['s3_snapshot', 'fs_snapshot'])
 def test_snapshot_delete(request, snapshot_fixture):
     snapshot = request.getfixturevalue(snapshot_fixture)
-    source_cluster = snapshot.source_cluster
+    source_cluster = snapshot.cluster
 
     snapshot.delete()
     source_cluster.call_api.assert_called_once()
     source_cluster.call_api.assert_called_with(f"/_snapshot/{snapshot.snapshot_repo_name}/{snapshot.snapshot_name}",
-                                               HttpMethod.DELETE)
+                                               HttpMethod.DELETE, timeout=1200)
 
 
 @pytest.mark.parametrize("snapshot_fixture", ['s3_snapshot', 'fs_snapshot'])
 def test_snapshot_delete_all_snapshots(request, snapshot_fixture):
     snapshot = request.getfixturevalue(snapshot_fixture)
-    source_cluster = snapshot.source_cluster
+    source_cluster = snapshot.cluster
     source_cluster.call_api.return_value.json = lambda: {"snapshots": [{"snapshot": "test_snapshot"}]}
     source_cluster.call_api.return_value.text = str({"snapshots": [{"snapshot": "test_snapshot"}]})
 
     snapshot.delete_all_snapshots()
     source_cluster.call_api.assert_called()
     source_cluster.call_api.assert_has_calls([
-        mock.call(f'/_snapshot/{snapshot.snapshot_repo_name}/_all', raise_error=True),
-        mock.call(f'/_snapshot/{snapshot.snapshot_repo_name}/test_snapshot', HttpMethod.DELETE),
+        mock.call(f'/_snapshot/{snapshot.snapshot_repo_name}/_all', raise_error=True, timeout=1200),
+        mock.call(f'/_snapshot/{snapshot.snapshot_repo_name}/test_snapshot', HttpMethod.DELETE, timeout=1200),
     ])
 
 
 @pytest.mark.parametrize("snapshot_fixture", ['s3_snapshot', 'fs_snapshot'])
 def test_snapshot_delete_repo(request, snapshot_fixture):
     snapshot = request.getfixturevalue(snapshot_fixture)
-    source_cluster = snapshot.source_cluster
+    source_cluster = snapshot.cluster
 
     snapshot.delete_snapshot_repo()
     source_cluster.call_api.assert_called_once()
source_cluster.call_api.assert_called_with(f"/_snapshot/{snapshot.snapshot_repo_name}", method=HttpMethod.DELETE, - raise_error=True) + raise_error=True, + timeout=1200) @pytest.mark.parametrize("snapshot_fixture", ['s3_snapshot', 'fs_snapshot']) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot_cleanup.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot_cleanup.py index 740d9a16eb..6766aad4f9 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot_cleanup.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot_cleanup.py @@ -69,7 +69,7 @@ def test_delete_all_snapshots_repository_missing(mock_cluster_with_missing_repo, }, } } - snapshot = S3Snapshot(config=config["snapshot"], source_cluster=mock_cluster_with_missing_repo) + snapshot = S3Snapshot(config=config["snapshot"], cluster=mock_cluster_with_missing_repo) with caplog.at_level(logging.INFO, logger='console_link.models.snapshot'): snapshot.delete_all_snapshots(cluster=mock_cluster_with_missing_repo, repository=snapshot.snapshot_repo_name) assert "Repository 'test-repo' is missing. Skipping snapshot clearing." in caplog.text @@ -86,7 +86,7 @@ def test_delete_all_snapshots_success(mock_cluster_with_snapshots, caplog): }, } } - snapshot = S3Snapshot(config=config["snapshot"], source_cluster=mock_cluster_with_snapshots) + snapshot = S3Snapshot(config=config["snapshot"], cluster=mock_cluster_with_snapshots) with caplog.at_level(logging.INFO, logger='console_link.models.snapshot'): snapshot.delete_all_snapshots(cluster=mock_cluster_with_snapshots, repository=snapshot.snapshot_repo_name) assert f"Deleted snapshot: snapshot_1 from repository '{snapshot.snapshot_repo_name}'." 
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/conftest.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/conftest.py
index 5a39995814..76f42e65bf 100644
--- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/conftest.py
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/conftest.py
@@ -40,6 +40,31 @@ def pytest_addoption(parser):
                      help="Specify the Migration ALB endpoint for the source capture proxy")
     parser.addoption("--target_proxy_alb_endpoint", action="store", default=None,
                      help="Specify the Migration ALB endpoint for the target proxy")
+    parser.addoption("--num_shards", action="store", type=int, default=10,
+                     help="Index setting for number of shards")
+    parser.addoption("--multiplication_factor", action="store", type=int, default=1000,
+                     help="Transformer multiplication factor")
+    parser.addoption("--batch_count", action="store", type=int, default=3,
+                     help="Number of bulk ingestion batches")
+    parser.addoption("--docs_per_batch", action="store", type=int, default=100,
+                     help="Number of documents per batch for Bulk Ingest")
+    parser.addoption("--backfill_timeout_hours", action="store", type=int, default=45,
+                     help="Timeout for backfill completion in hours")
+    parser.addoption("--transformation_directory", action="store", default="/shared-logs-output/test-transformations",
+                     help="Directory for transformation files")
+    parser.addoption("--large_snapshot_s3_uri", action="store",
+                     default="s3://test-large-snapshot-bucket/es56-snapshot/",
+                     help="S3 URI for large snapshot")
+    parser.addoption("--deploy_region", action="store", default="us-east-1",
+                     help="AWS region for deployment")
+    parser.addoption("--snapshot_region", action="store", default="us-east-1",
+                     help="AWS region for taking the final snapshot")
+    parser.addoption("--large_snapshot_rate_mb_per_node", action="store", type=int, default=2000,
+                     help="Rate for large snapshot creation")
+    parser.addoption("--rfs_workers", action="store", type=int, default=8,
+                     help="Number of RFS workers to scale to")
+    parser.addoption("--cluster_version", action="store", default="es5x",
+                     help="Cluster version: es5x, es6x, es7x, os1x, os2x")
 
 
 def pytest_configure(config):
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/default_operations.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/default_operations.py
index 238ffa5724..4105305bf8 100644
--- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/default_operations.py
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/default_operations.py
@@ -1,10 +1,10 @@
 import datetime
 import logging
-import os
 import random
 import string
 import json
 import time
+from pathlib import Path
 from typing import Dict, List, Optional
 from unittest import TestCase
 from console_link.middleware.clusters import run_test_benchmarks
@@ -28,6 +28,13 @@ def create_index(self, index_name: str, cluster: Cluster, **kwargs):
         return execute_api_call(cluster=cluster, method=HttpMethod.PUT, path=f"/{index_name}", headers=headers,
                                 **kwargs)
 
+    def create_custom_index(self, index_name: str, cluster: Cluster, body: dict = None, **kwargs):
+        headers = {'Content-Type': 'application/json'}
+        if body:
+            kwargs['data'] = json.dumps(body)
+        return execute_api_call(cluster=cluster, method=HttpMethod.PUT, path=f"/{index_name}",
+                                headers=headers, **kwargs)
+
     def get_index(self, index_name: str, cluster: Cluster, **kwargs):
         return execute_api_call(cluster=cluster, method=HttpMethod.GET, path=f"/{index_name}", **kwargs)
 
@@ -169,10 +176,9 @@ def generate_large_doc(self, size_mib):
         }
 
     def create_transformation_json_file(self, transform_config_data, file_path_to_create: str):
-        directory = os.path.dirname(file_path_to_create)
-        if directory:
-            os.makedirs(directory, exist_ok=True)
-        with open(file_path_to_create, "w") as file:
+        file_path = Path(file_path_to_create)
+        file_path.parent.mkdir(parents=True, exist_ok=True)
+        with file_path.open("w") as file:
             json.dump(transform_config_data, file, indent=4)
 
     def convert_transformations_to_str(self, transform_list: List[Dict]) -> str:
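A hypothetical run of the new test wiring together with the index-body helper; the names come from this diff, but the command line and the `target_cluster` object are illustrative:

```python
# Command line (illustrative):
#   pytest integ_test/document_multiplier.py --num_shards 5 --multiplication_factor 100 \
#       --batch_count 2 --docs_per_batch 50 --cluster_version os2x
ops = DefaultOperationsLibrary()
ops.create_custom_index(
    index_name="pilot_index",
    cluster=target_cluster,
    body={"settings": {"number_of_shards": "5", "number_of_replicas": "0"}},
)
```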
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/document_multiplier.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/document_multiplier.py
new file mode 100644
index 0000000000..4124139185
--- /dev/null
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/document_multiplier.py
@@ -0,0 +1,981 @@
+import logging
+import pytest
+import unittest
+import json
+from console_link.middleware.clusters import clear_indices
+from console_link.models.cluster import Cluster, HttpMethod
+from console_link.models.backfill_base import Backfill
+from console_link.models.command_result import CommandResult
+from console_link.models.snapshot import Snapshot, delete_snapshot_repo, delete_all_snapshots
+from console_link.cli import Context
+from console_link.models.snapshot import S3Snapshot
+from console_link.models.backfill_rfs import RfsWorkersInProgress
+from console_link.models.command_runner import CommandRunner, CommandRunnerError
+from .default_operations import DefaultOperationsLibrary
+from .common_utils import execute_api_call
+from datetime import datetime
+import time
+import shutil
+import os
+
+
+# Constants
+PILOT_INDEX = "pilot_index"  # Name of the index used for testing
+
+logger = logging.getLogger(__name__)
+ops = DefaultOperationsLibrary()
+
+
+# Test configuration from pytest options
+@pytest.fixture(scope="session")
+def test_config(request):
+    """Fixture to provide test configuration at class level"""
+    return {
+        'NUM_SHARDS': request.config.getoption("--num_shards"),
+        'MULTIPLICATION_FACTOR': request.config.getoption("--multiplication_factor"),
+        'BATCH_COUNT': request.config.getoption("--batch_count"),
+        'DOCS_PER_BATCH': request.config.getoption("--docs_per_batch"),
+        'BACKFILL_TIMEOUT_HOURS': request.config.getoption("--backfill_timeout_hours"),
+        'TRANSFORMATION_DIRECTORY': request.config.getoption("--transformation_directory"),
+        'LARGE_SNAPSHOT_S3_URI': request.config.getoption("--large_snapshot_s3_uri"),
+        # 'DEPLOY_REGION': request.config.getoption("--deploy_region"),
+        'SNAPSHOT_REGION': request.config.getoption("--snapshot_region"),
+        'LARGE_SNAPSHOT_RATE_MB_PER_NODE': request.config.getoption("--large_snapshot_rate_mb_per_node"),
+        'RFS_WORKERS': request.config.getoption("--rfs_workers"),
+        # 'STAGE': request.config.getoption("--stage"),
+        'CLUSTER_VERSION': request.config.getoption("--cluster_version")
+    }
"number_of_shards": str(config['NUM_SHARDS']), + "number_of_replicas": "0" + }, + "mappings": { + "doc": { + "properties": { + "timestamp": {"type": "date"}, + "value": {"type": "keyword"}, + "doc_number": {"type": "integer"}, + "description": {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}}, + "metadata": { + "properties": { + "tags": {"type": "keyword"}, + "category": {"type": "keyword"}, + "subcategories": {"type": "keyword"}, + "attributes": {"type": "keyword"}, + "status": {"type": "keyword"}, + "version": {"type": "keyword"}, + "region": {"type": "keyword"}, + "details": {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}} + } + }, + "content": {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}}, + "additional_info": {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}} + } + } + } + } + + logger.info("Creating index %s with settings: %s", PILOT_INDEX, index_settings_es56) + ops.create_custom_index(cluster=target_cluster, index_name=PILOT_INDEX, data=json.dumps(index_settings_es56)) + + # Create documents with timestamp in bulk + for j in range(config['BATCH_COUNT']): + bulk_data = [] + for i in range(config['DOCS_PER_BATCH']): + doc_id = f"doc_{j}_{i}" + bulk_data.extend([ + {"index": {"_index": PILOT_INDEX, "_type": "doc", "_id": doc_id}}, + { + "timestamp": datetime.now().isoformat(), + "value": f"test_value_{i}", + "doc_number": i, + "description": ( + f"This is a detailed description for document {doc_id} " + "containing information about the test data and its purpose " + "in the large snapshot creation process." + ), + "metadata": { + "tags": [f"tag1_{i}", f"tag2_{i}", f"tag3_{i}"], + "category": f"category_{i % 10}", + "subcategories": [f"subcat1_{i % 5}", f"subcat2_{i % 5}"], + "attributes": [f"attr1_{i % 8}", f"attr2_{i % 8}"], + "status": f"status_{i % 6}", + "version": f"1.{i % 10}.{i % 5}", + "region": f"region_{i % 12}", + "details": f"Detailed metadata information for document {doc_id} including test parameters." + }, + "content": ( + f"Main content for document {doc_id}. This section contains the primary information " + "and data relevant to the testing process. The content is designed to create minimal " + "document for migration and multiplication." + ), + "additional_info": ( + f"Supplementary information for document {doc_id} " + "providing extra context and details about the test data." 
+ ) + } + ]) + + # Bulk index documents + execute_api_call( + cluster=target_cluster, + method=HttpMethod.POST, + path="/_bulk", + data="\n".join(json.dumps(d) for d in bulk_data) + "\n", + headers={"Content-Type": "application/x-ndjson"} + ) + + +def preload_data_cluster_es710(target_cluster: Cluster, test_config): + config = test_config + # Create source index with settings for ES 7.10 + index_settings_es710 = { + "settings": { + "number_of_shards": str(config['NUM_SHARDS']), + "number_of_replicas": "0" + }, + "mappings": { + "properties": { + "timestamp": {"type": "date"}, + "value": {"type": "keyword"}, + "doc_number": {"type": "integer"}, + "description": {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}}, + "metadata": { + "properties": { + "tags": {"type": "keyword"}, + "category": {"type": "keyword"}, + "subcategories": {"type": "keyword"}, + "attributes": {"type": "keyword"}, + "status": {"type": "keyword"}, + "version": {"type": "keyword"}, + "region": {"type": "keyword"}, + "details": {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}} + } + }, + "content": {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}}, + "additional_info": {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}} + } + } + } + + logger.info("Creating index %s with settings: %s", PILOT_INDEX, index_settings_es710) + ops.create_custom_index(cluster=target_cluster, index_name=PILOT_INDEX, body=index_settings_es710) + + # Create documents with timestamp in bulk for ES 7.10 + for j in range(config['BATCH_COUNT']): + bulk_data = [] + for i in range(config['DOCS_PER_BATCH']): + doc_id = f"doc_{j}_{i}" + bulk_data.extend([ + {"index": {"_index": PILOT_INDEX, "_id": doc_id}}, + { + "timestamp": datetime.now().isoformat(), + "value": f"test_value_{i}", + "doc_number": i, + "description": ( + f"This is a detailed description for document {doc_id} " + "containing information about the test data and its purpose " + "in the large snapshot creation process." + ), + "metadata": { + "tags": [f"tag1_{i}", f"tag2_{i}", f"tag3_{i}"], + "category": f"category_{i % 10}", + "subcategories": [f"subcat1_{i % 5}", f"subcat2_{i % 5}"], + "attributes": [f"attr1_{i % 8}", f"attr2_{i % 8}"], + "status": f"status_{i % 6}", + "version": f"1.{i % 10}.{i % 5}", + "region": f"region_{i % 12}", + "details": f"Detailed metadata information for document {doc_id} including test parameters." + }, + "content": ( + f"Main content for document {doc_id}. This section contains the primary information " + "and data relevant to the testing process. The content is designed to create minimal " + "document for migration and multiplication." + ), + "additional_info": ( + f"Supplementary information for document {doc_id} " + "providing extra context and details about the test data." 
+ ) + } + ]) + + # Bulk index documents + execute_api_call( + cluster=target_cluster, + method=HttpMethod.POST, + path="/_bulk", + data="\n".join(json.dumps(d) for d in bulk_data) + "\n", + headers={"Content-Type": "application/x-ndjson"} + ) + + +def preload_data_cluster_os217(target_cluster: Cluster, test_config): + config = test_config + # Index settings for OpenSearch 2.17 + index_settings_os217 = { + "settings": { + "number_of_shards": str(config['NUM_SHARDS']), + "number_of_replicas": "0" + }, + "mappings": { + "properties": { + "timestamp": {"type": "date"}, + "value": {"type": "keyword"}, + "doc_number": {"type": "integer"}, + "description": { + "type": "text", + "fields": {"keyword": {"type": "keyword", "ignore_above": 256}} + }, + "metadata": { + "properties": { + "tags": {"type": "keyword"}, + "category": {"type": "keyword"}, + "subcategories": {"type": "keyword"}, + "attributes": {"type": "keyword"}, + "status": {"type": "keyword"}, + "version": {"type": "keyword"}, + "region": {"type": "keyword"}, + "details": { + "type": "text", + "fields": {"keyword": {"type": "keyword", "ignore_above": 256}} + } + } + }, + "content": { + "type": "text", + "fields": {"keyword": {"type": "keyword", "ignore_above": 256}} + }, + "additional_info": { + "type": "text", + "fields": {"keyword": {"type": "keyword", "ignore_above": 256}} + } + } + } + } + + logger.info("Creating index %s with settings: %s", PILOT_INDEX, index_settings_os217) + ops.create_custom_index(cluster=target_cluster, index_name=PILOT_INDEX, body=index_settings_os217) + + # Create documents with timestamp in bulk for OS 2.17 + for j in range(config['BATCH_COUNT']): + bulk_data = [] + for i in range(config['DOCS_PER_BATCH']): + doc_id = f"doc_{j}_{i}" + bulk_data.extend([ + {"index": {"_index": PILOT_INDEX, "_id": doc_id}}, + { + "timestamp": datetime.now().isoformat(), + "value": f"test_value_{i}", + "doc_number": i, + "description": ( + f"This is a detailed description for document {doc_id} " + "containing information about the test data and its purpose " + "in the large snapshot creation process." + ), + "metadata": { + "tags": [f"tag1_{i}", f"tag2_{i}", f"tag3_{i}"], + "category": f"category_{i % 10}", + "subcategories": [f"subcat1_{i % 5}", f"subcat2_{i % 5}"], + "attributes": [f"attr1_{i % 8}", f"attr2_{i % 8}"], + "status": f"status_{i % 6}", + "version": f"1.{i % 10}.{i % 5}", + "region": f"region_{i % 12}", + "details": f"Detailed metadata information for document {doc_id} including test parameters." + }, + "content": ( + f"Main content for document {doc_id}. This section contains the primary information " + "and data relevant to the testing process. The content is designed to create minimal " + "document for migration and multiplication." + ), + "additional_info": ( + f"Supplementary information for document {doc_id} " + "providing extra context and details about the test data." 
+ ) + } + ]) + + # Bulk index documents + execute_api_call( + cluster=target_cluster, + method=HttpMethod.POST, + path="/_bulk", + data="\n".join(json.dumps(d) for d in bulk_data) + "\n", + headers={"Content-Type": "application/x-ndjson"} + ) + + +def setup_test_environment(target_cluster: Cluster, test_config): + """Setup test data""" + # If target_cluster is None, we'll need to get the target cluster from environment + if target_cluster is None: + logger.info("Target cluster is None, using target cluster from environment instead") + config_path = "/config/migration_services.yaml" + env = Context(config_path).env + target_cluster = env.target_cluster + + if target_cluster is None: + raise Exception("Target cluster is not available") + + logger.info(f"Using cluster endpoint: {target_cluster.endpoint}") + logger.info(f"Target Cluster Auth Type: {pytest.console_env.target_cluster.auth_type}") + logger.info(f"Target Cluster Auth Details: {pytest.console_env.target_cluster.auth_details}") + + # Clear indices + logger.info("Clearing indices in the target cluster...") + try: + response = clear_indices(target_cluster) + logger.info(f"Indices cleared successfully. Response: {response}") + except Exception as e: + logger.warning(f"Failed to clear indices: {str(e)}. Continuing with test...") + + # Clear snapshots from the repository + repository = "migration_assistant_repo" + logger.info(f"Clearing all snapshots from repository '{repository}'...") + try: + delete_all_snapshots(target_cluster, repository) + logger.info(f"Successfully cleared all snapshots from '{repository}'.") + except Exception as e: + logger.warning(f"Failed to clear snapshots from '{repository}': {str(e)}") + + # Delete the repository itself + logger.info(f"Deleting snapshot repository '{repository}'...") + try: + delete_snapshot_repo(target_cluster, repository) + logger.info(f"Successfully deleted repository '{repository}'.") + except Exception as e: + logger.warning(f"Failed to delete repository '{repository}': {str(e)}") + + # Cleanup generated transformation files + try: + shutil.rmtree(test_config['TRANSFORMATION_DIRECTORY']) + logger.info("Removed existing " + test_config['TRANSFORMATION_DIRECTORY'] + " directory") + except FileNotFoundError: + logger.info("No transformation files detected to cleanup") + + # Transformer structure + config = test_config + multiplication_factor = str(config['MULTIPLICATION_FACTOR']) + initialization_script = ( + f"const MULTIPLICATION_FACTOR = {multiplication_factor}; " + "function transform(document) { " + "if (!document) { throw new Error(\"No source_document was defined - nothing to transform!\"); } " + "const indexCommandMap = document.get(\"index\"); " + "const originalSource = document.get(\"source\"); " + "const docsToCreate = []; " + "for (let i = 0; i < MULTIPLICATION_FACTOR; i++) { " + "const newIndexMap = new Map(indexCommandMap); " + "const newId = newIndexMap.get(\"_id\") + ((i !== 0) ? 
+
+    # Select appropriate preload function based on cluster version
+    cluster_version = config['CLUSTER_VERSION']
+    logger.info(f"Using cluster version: {cluster_version}")
+
+    if cluster_version == "es5x" or cluster_version == "es6x":
+        logger.info("Using ES5.x/ES6.x preload function")
+        preload_data_cluster_es56(target_cluster, test_config)
+    elif cluster_version == "es7x":
+        logger.info("Using ES7.x preload function")
+        preload_data_cluster_es710(target_cluster, test_config)
+    elif cluster_version == "os1x":
+        logger.info("Using OpenSearch 1.x preload function")
+        # OpenSearch 1.x is compatible with ES7.x API
+        preload_data_cluster_es710(target_cluster, test_config)
+    elif cluster_version == "os2x":
+        logger.info("Using OpenSearch 2.x preload function")
+        preload_data_cluster_os217(target_cluster, test_config)
+    else:
+        logger.warning(f"Unknown cluster version '{cluster_version}', defaulting to ES5.x/ES6.x")
+        preload_data_cluster_es56(target_cluster, test_config)
+
+    # Refresh indices before creating initial snapshot
+    execute_api_call(
+        cluster=target_cluster,
+        method=HttpMethod.POST,
+        path="/_refresh"
+    )
+    logger.info(
+        f"Created {config['BATCH_COUNT'] * config['DOCS_PER_BATCH']} documents "
+        f"in bulk in index %s",
+        PILOT_INDEX
+    )
+
+
+@pytest.fixture(scope="class")
+def setup_backfill(test_config, request):
+    """Test setup with backfill lifecycle management"""
+    config_path = request.config.getoption("--config_file_path")
+    unique_id = request.config.getoption("--unique_id")
+    # Log config file path
+    logger.info(f"Using config file: {config_path}")
+
+    # Try to read the config file directly
+    try:
+        with open(config_path, 'r') as f:
+            config_content = f.read()
+        logger.info(f"Config file content:\n{config_content}")
+    except Exception as e:
+        logger.error(f"Failed to read config file: {str(e)}")
+
+    # Load environment
+    env = Context(config_path).env
+    pytest.console_env = env
+    pytest.unique_id = unique_id
+
+    # Log target cluster details
+    if env.target_cluster:
+        logger.info("Setting Target cluster auth type from default NO_AUTH to be SIGV4_AUTH for Multiplication test")
+        logger.info(f"Target cluster endpoint: {env.target_cluster.endpoint}")
+        logger.info(f"Target cluster auth type: {env.target_cluster.auth_type}")
+        if hasattr(env.target_cluster, 'auth_details'):
+            logger.info(f"Target cluster auth details: {env.target_cluster.auth_details}")
+
+        # Try to get version info
+        try:
+            version_info = execute_api_call(
+                cluster=env.target_cluster,
+                method=HttpMethod.GET,
+                path="/"
+            ).json()
+            logger.info(f"Target cluster version: {version_info.get('version', {}).get('number', 'unknown')}")
+        except Exception as e:
+            logger.error(f"Failed to get cluster version: {str(e)}")
+
+        # Try to get indices
+        try:
+            indices = execute_api_call(
+                cluster=env.target_cluster,
+                method=HttpMethod.GET,
+                path="/_cat/indices?format=json"
+            ).json()
+            logger.info(f"Target cluster indices: {indices}")
+        except Exception as e:
+            logger.error(f"Failed to get indices: {str(e)}")
+    else:
+        logger.warning("Target cluster is not configured!")
+
+    # Preload data on pilot index
+    setup_test_environment(target_cluster=pytest.console_env.target_cluster, test_config=test_config)
+
+    # Get components
+    backfill: Backfill = pytest.console_env.backfill
+    logger.info(f"Backfill object: {backfill}")
+    logger.info(f"Backfill type: {type(backfill)}")
+    assert backfill is not None
+
+    snapshot: Snapshot = pytest.console_env.snapshot
+    logger.info(f"Snapshot object: {snapshot}")
+    logger.info(f"Snapshot type: {type(snapshot)}")
+    assert snapshot is not None
+
+    # Initialize backfill first (creates .migrations_working_state)
+    try:
+        backfill_create_result: CommandResult = backfill.create()
+        logger.info(f"Backfill create result: {backfill_create_result}")
+        assert backfill_create_result.success
+        logger.info("Backfill initialized successfully. Created working state at %s", backfill_create_result.value)
+    except Exception as e:
+        logger.error(f"Failed to create backfill: {str(e)}")
+        raise
+
+    # Create initial RFS snapshot and wait for completion
+    try:
+        # Log snapshot settings
+        logger.info(f"Snapshot settings: {snapshot.__dict__}")
+
+        # Try to create snapshot
+        logger.info("Creating snapshot...")
+        snapshot_result: CommandResult = snapshot.create(wait=True)
+        logger.info(f"Snapshot create result: {snapshot_result}")
+
+        # Check result
+        if not snapshot_result.success:
+            logger.error(f"Snapshot creation failed with error: {snapshot_result.error}")
+            logger.error(f"Snapshot creation output: {snapshot_result.output}")
+
+        assert snapshot_result.success
+        logger.info("Snapshot creation completed successfully")
+    except Exception as e:
+        logger.error(f"Failed to create snapshot: {str(e)}")
+        import traceback
+        logger.error(f"Snapshot creation error traceback: {traceback.format_exc()}")
+        raise
+
+    yield
+
+    # Cleanup - stop backfill
+    logger.info("Cleaning up test environment...")
+    try:
+        backfill.stop()
+        logger.info("Backfill stopped and snapshots cleaned up.")
+    except Exception as e:
+        logger.error(f"Error during cleanup: {str(e)}")
+
+
+@pytest.fixture(scope="session", autouse=True)
+def setup_environment(request):
+    """Initialize test environment"""
+    config_path = request.config.getoption("--config_file_path")
+    unique_id = request.config.getoption("--unique_id")
+    pytest.console_env = Context(config_path).env
+    pytest.unique_id = unique_id
+    logger.info(f"Target Cluster Auth Type: {pytest.console_env.target_cluster.auth_type}")
+    logger.info(f"Target Cluster Auth Details: {pytest.console_env.target_cluster.auth_details}")
+    logger.info("Starting tests...")
+    yield
+    # Note: Individual tests handle their own cleanup
+    logger.info("Test environment teardown complete")
+
+
+@pytest.mark.usefixtures("setup_backfill", "test_config")
+class BackfillTest(unittest.TestCase):
+    """Test backfill functionality"""
+
+    @pytest.fixture(autouse=True)
+    def setup_test(self, test_config, request):
+        """Setup test configuration before each test method"""
+        self.config = test_config
+        self.request = request
+
+    def wait_for_backfill_completion(self, cluster: Cluster, pilot_index: str, timeout_hours: int = None):
+        """Wait until document count stabilizes or bulk-loader pods terminate"""
+
+        def _calculate_expected_doc_count():
+            return int(
+                self.config['BATCH_COUNT'] *
+                self.config['DOCS_PER_BATCH'] *
+                self.config['MULTIPLICATION_FACTOR']
+            )
+
+        expected_doc_count = _calculate_expected_doc_count()
+        previous_count = 0
+        stable_count = 0
+        required_stable_checks = 3  # Need 3 consecutive stable counts at EXPECTED_TOTAL_TARGET_DOCS
+        start_time = time.time()
+        timeout_seconds = timeout_hours * 3600 if timeout_hours else self.config['BACKFILL_TIMEOUT_HOURS'] * 3600
+        stuck_count = 0
+
+        while True:
+            if time.time() - start_time > timeout_seconds:
+                raise TimeoutError(
+                    f"Backfill monitoring timed out after "
+                    f"{timeout_hours if timeout_hours else self.config['BACKFILL_TIMEOUT_HOURS']} "
+                    f"hours. Last count: {previous_count:,}"
+                )
+
+            cluster_response = execute_api_call(
+                cluster=cluster,
+                method=HttpMethod.GET,
+                path=f"/{pilot_index}/_count?format=json"
+            )
+            current_count = cluster_response.json()['count']
+
+            # Get bulk loader pod status
+            try:
+                bulk_loader_pods = execute_api_call(
+                    cluster=cluster,
+                    method=HttpMethod.GET,
+                    path="/_cat/tasks?detailed",
+                    headers={"Accept": "application/json"}
+                ).json()
+                bulk_loader_active = any(
+                    task.get('action', '').startswith('indices:data/write/bulk')
+                    for task in bulk_loader_pods
+                )
+            except Exception as e:
+                logger.warning(f"Failed to check bulk loader status: {e}")
+                bulk_loader_active = True  # Assume active if we can't check
+
+            elapsed_hours = (time.time() - start_time) / 3600
+            progress = ((current_count / expected_doc_count) * 100)
+            logger.info(f"Backfill Progress - {elapsed_hours:.2f} hours elapsed:")
+            logger.info(f"- Current doc count: {current_count:,}")
+            logger.info(f"- Expected doc count: {expected_doc_count:,}")
+            logger.info(f"- Progress: {progress:.2f}%")
+            logger.info(f"- Bulk loader active: {bulk_loader_active}")
+
+            # Don't consider it stable if count is 0 and bulk loader is still active
+            if current_count == 0 and bulk_loader_active:
+                logger.info("Waiting for documents to start appearing...")
+                stable_count = 0
+                stuck_count = 0
+            # Only consider it stable if count matches previous and is non-zero
+            elif current_count == expected_doc_count:
+                stable_count += 1
+                logger.info(
+                    f"Count stable at target {expected_doc_count:,} "
+                    f"for {stable_count}/{required_stable_checks} checks"
+                )
+                if stable_count >= required_stable_checks:
+                    logger.info(
+                        f"Document count reached value {current_count:,} and "
+                        f"stabilized for {required_stable_checks} consecutive checks"
+                    )
+                    return
+            # If count is less than expected and not zero, check for stuck condition
+            elif 0 < current_count < expected_doc_count:
+                if current_count == previous_count:
+                    stuck_count += 1
+                    logger.warning(f"Count has been stuck at {current_count:,} for {stuck_count}/60 checks")
+                    if stuck_count >= 60:
+                        raise SystemExit(
+                            f"Document count has been stuck at {current_count:,} for too long. "
+                            "Possible issue with backfill."
+                        )
+                else:
+                    stuck_count = 0
+
+            if current_count != previous_count:
+                logger.info(f"Count changed from {previous_count:,} to {current_count:,}")
+                stable_count = 0
+
+            previous_count = current_count
+            time.sleep(30)
+
+    def wait_for_working_state_archive(self, backfill, max_retries=30, retry_interval=10):
+        """Wait for the working state to be properly archived before proceeding."""
+        logger.info("Archiving the working state of the backfill operation...")
+        retries = 0
+        archive_success = False
+        index_deleted = False
+        last_check_time = 0
+
+        while retries < max_retries and (not archive_success or not index_deleted):
+            current_time = time.time()
+
+            # Check for archive status
+            if not archive_success:
+                archive_result = backfill.archive()
+
+                # First wait for RFS workers to complete
+                if isinstance(archive_result.value, RfsWorkersInProgress):
+                    logger.info("RFS Workers are still running, waiting for them to complete...")
+                    time.sleep(5)  # Keep original 5 second wait for RFS workers
+                    continue
+
+                # Then check for successful archive
+                if isinstance(archive_result.value, str) and "working_state_backup" in archive_result.value:
+                    logger.info(f"Working state archived to: {archive_result.value}")
+                    archive_success = True
+                    index_deleted = True  # Archive success means index was deleted
+                    break
+
+            # Only check index if archive wasn't successful
+            # If checking, check every 5 seconds
+            if not archive_success and not index_deleted and (current_time - last_check_time) >= 5:
+                try:
+                    response = execute_api_call(
+                        cluster=pytest.console_env.target_cluster,
+                        method=HttpMethod.GET,
+                        path="/_cat/indices/.migrations_working_state?format=json"
+                    )
+
+                    if response.status_code == 404:
+                        logger.info("Migrations working state index has been deleted (404 response)")
+                        index_deleted = True
+                        break
+                    elif response.status_code == 200 and len(response.json()) == 0:
+                        logger.info("Migrations working state index has been deleted (empty response)")
+                        index_deleted = True
+                        break
+                    else:
+                        logger.info("Waiting for migrations working state index to be deleted...")
+                except Exception as e:
+                    # Check if the error is a 404 response
+                    if '"status":404' in str(e) or 'index_not_found_exception' in str(e):
+                        logger.info("Migrations working state index has been deleted (404 exception)")
+                        index_deleted = True
+                        break
+                    else:
+                        logger.warning(f"Error checking index status: {e}")
+
+                last_check_time = current_time
+
+            if not archive_success or not index_deleted:
+                time.sleep(retry_interval)
+                retries += 1
+
+        if retries >= max_retries:
+            logger.warning(
+                f"Timeout after {max_retries * retry_interval} seconds. "
+                f"Archive success result: {archive_success}, "
+                f"Index deletion result: {index_deleted}"
+            )
+        return archive_result if archive_success else None
" + f"Archive success result: {archive_success}, " + f"Index deletion result: {index_deleted}" + ) + return archive_result if archive_success else None + + def test_data_multiplication(self): + """Monitor backfill progress and report final stats""" + source = pytest.console_env.target_cluster + index_name = PILOT_INDEX + backfill = pytest.console_env.backfill + + logger.info("\n" + "=" * 50) + logger.info("Starting Document Multiplication Test") + logger.info("=" * 50) + + # Initial index stats + initial_doc_count, initial_index_size = self.get_cluster_stats(source, index_name) + logger.info("\n=== Initial Source Cluster Stats ===") + logger.info(f"Source Index: {index_name}") + logger.info(f"Documents: {initial_doc_count:,}") + logger.info(f"Index Size: {initial_index_size:.2f} MB") + + # Start backfill + logger.info("\n=== Starting Backfill Process ===") + logger.info(f"Expected Document Multiplication Factor: {self.config['MULTIPLICATION_FACTOR']}") + expected_final_doc_count = ( + self.config['BATCH_COUNT'] * + self.config['DOCS_PER_BATCH'] * + self.config['MULTIPLICATION_FACTOR'] + ) + logger.info(f"Expected Final Document Count: {expected_final_doc_count:,}") + logger.info("Starting backfill...") + backfill_start_result: CommandResult = backfill.start() + assert backfill_start_result.success, f"Failed to start backfill: {backfill_start_result.error}" + + # Scale backfill workers + logger.info("Scaling backfill...") + rfs_workers = self.config['RFS_WORKERS'] + logger.info(f"Scaling to {rfs_workers} RFS workers") + backfill_scale_result: CommandResult = backfill.scale(units=rfs_workers) + assert backfill_scale_result.success, f"Failed to scale backfill: {backfill_scale_result.error}" + + # Wait for backfill to complete + logger.info("\n=== Monitoring Backfill Progress ===") + self.wait_for_backfill_completion(source, index_name) + + # Get final stats + logger.info("\n=== Final Cluster Stats ===") + final_doc_count, final_index_size = self.get_cluster_stats(source, index_name) + + logger.info("\nInitial Cluster Stats:") + logger.info(f"- Index: {index_name}") + logger.info(f"- Total Documents: {initial_doc_count:,}") + logger.info(f"- Total Size: {initial_index_size:.2f} MB") + + logger.info("\nFinal Cluster Stats:") + logger.info(f"- Index: {index_name}") + logger.info(f"- Total Documents: {final_doc_count:,}") + logger.info(f"- Total Size: {final_index_size:.2f} MB") + logger.info(f"- Multiplication Factor Achieved: {final_doc_count/initial_doc_count:.2f}x") + + # Assert that documents were actually migrated + assert final_doc_count > 0, "No documents were migrated to target index" + calculate_final_doc_count = ( + self.config['BATCH_COUNT'] * + self.config['DOCS_PER_BATCH'] * + self.config['MULTIPLICATION_FACTOR'] + ) + assert final_doc_count == calculate_final_doc_count, ( + f"Document count mismatch: source={initial_doc_count}, target={final_doc_count}" + ) + + # Stop backfill + logger.info("\n=== Stopping Backfill ===") + stop_result = backfill.stop() + assert stop_result.success, f"Failed to stop backfill: {stop_result.error}" + logger.info("Backfill stopped successfully") + + # Archive working state + archive_result = self.wait_for_working_state_archive(backfill) + if archive_result and archive_result.success: + logger.info("Backfill archive completed successfully") + else: + logger.warning( + "Could not fully verify backfill archive completion. " + "Proceeding with incomplete backfill stop." 
+ ) + + # Setup S3 Bucket + snapshot: Snapshot = pytest.console_env.snapshot + assert snapshot is not None + migrationAssistant_deployTimeRole = snapshot.config['s3']['role'] + # Extract account ID from the role ARN + account_number = migrationAssistant_deployTimeRole.split(':')[4] + snapshot_region = self.config['SNAPSHOT_REGION'] + updated_s3_uri = self.setup_s3_bucket(account_number, snapshot_region, self.config) + logger.info(f"Updated S3 URI: {updated_s3_uri}") + + # Delete the existing snapshot and snapshot repo from the cluster + max_retries = 5 + retry_interval = 10 + + for attempt in range(max_retries): + try: + snapshot.delete() + snapshot.delete_snapshot_repo() + logger.info("Successfully deleted existing snapshot and repository") + break + except Exception as e: + if attempt < max_retries - 1: + logger.warning(f"Attempt {attempt + 1}/{max_retries} to delete snapshot failed: {str(e)}") + # Run aws configure to refresh credentials + aws_cmd = CommandRunner( + command_root="aws", + command_args={ + "__positional__": ["configure", "list"], + "--profile": "default" + } + ) + aws_cmd.run() + time.sleep(retry_interval) + else: + logger.error(f"Failed to delete snapshot after {max_retries} attempts: {str(e)}") + raise + + # Create final snapshot + logger.info("\n=== Creating Final Snapshot ===") + large_snapshot_role = f"arn:aws:iam::{account_number}:role/LargeSnapshotAccessRole" + snapshot_name = 'large-snapshot' + snapshot_repo = 'migration_assistant_repo' + endpoint = f"s3.{snapshot_region}.amazonaws.com" + + # Print all parameters for better logging and debugging + logger.info("Snapshot Parameters:") + logger.info(f" - Snapshot Name: {snapshot_name}") + logger.info(f" - Repository Name: {snapshot_repo}") + logger.info(f" - S3 Bucket: {updated_s3_uri.split('/')[2]}") + logger.info(f" - S3 URI: {updated_s3_uri}") + logger.info(f" - S3 Region: {snapshot_region}") + logger.info(f" - S3 Endpoint: {endpoint}") + logger.info(f" - IAM Role: {large_snapshot_role}") + logger.info(f" - Max Snapshot Rate (MB/Node): {self.config['LARGE_SNAPSHOT_RATE_MB_PER_NODE']}") + + final_snapshot_config = { + 'snapshot_name': snapshot_name, + 's3': { + 'repo_uri': updated_s3_uri, + 'role': large_snapshot_role, + 'aws_region': snapshot_region, + 'endpoint': endpoint + } + } + final_snapshot = S3Snapshot(final_snapshot_config, pytest.console_env.target_cluster) + final_snapshot_result: CommandResult = final_snapshot.create( + wait=True, + max_snapshot_rate_mb_per_node=self.config['LARGE_SNAPSHOT_RATE_MB_PER_NODE'] + ) + assert final_snapshot_result.success, f"Failed to create final snapshot: {final_snapshot_result.error}" + logger.info("Final Snapshot after migration and multiplication was created successfully") + + # Add detailed success information for easier snapshot retrieval + logger.info("\n=== Final Snapshot Details ===") + logger.info("Snapshot successfully stored at:") + logger.info(f" - S3 Bucket: {updated_s3_uri.split('/')[2]}") + logger.info(f" - S3 Path: {updated_s3_uri}") + logger.info(f" - Snapshot Name: {snapshot_name}") + logger.info(f" - Repository Name: {snapshot_repo}") + logger.info(f" - Region: {snapshot_region}") + logger.info(f"To retrieve this snapshot, use the S3 URI: {updated_s3_uri}") + + logger.info("\n=== Test Completed Successfully ===") + logger.info("Document multiplication verified with correct count") + + def get_cluster_stats(self, cluster: Cluster, pilot_index: str = None): + """Get document count and size stats for a cluster (primary shards only)""" + try: + if 
pilot_index: + path = f"/{pilot_index}/_stats" + else: + path = "/_stats" + + stats = execute_api_call(cluster=cluster, method=HttpMethod.GET, path=path).json() + total_docs = stats['_all']['primaries']['docs']['count'] + total_size_bytes = stats['_all']['primaries']['store']['size_in_bytes'] + total_size_mb = total_size_bytes / (1024 * 1024) + + return total_docs, total_size_mb + except Exception as e: + logger.error(f"Error getting cluster stats: {str(e)}") + return 0, 0 + + def setup_s3_bucket(self, account_number: str, snapshot_region: str, test_config): + """Check and create S3 bucket to store large snapshot""" + cluster_version = test_config['CLUSTER_VERSION'] + bucket_name = f"migration-jenkins-snapshot-{account_number}-{snapshot_region}" + snapshot_folder = f"large-snapshot-{cluster_version}" + + # Check if bucket exists + logger.info(f"Checking if S3 bucket {bucket_name} exists in region {snapshot_region}...") + check_bucket_cmd = CommandRunner( + command_root="aws", + command_args={ + "__positional__": ["s3api", "head-bucket"], + "--bucket": bucket_name, + "--region": snapshot_region + } + ) + + try: + check_result = check_bucket_cmd.run() + bucket_exists = check_result.success + except CommandRunnerError: + bucket_exists = False + + if bucket_exists: + logger.info(f"S3 bucket {bucket_name} already exists.") + logger.info("\n=== Cleaning up S3 bucket contents ===") + s3_cleanup_cmd = CommandRunner( + command_root="aws", + command_args={ + "__positional__": ["s3", "rm", f"s3://{bucket_name}/{snapshot_folder}/"], + "--recursive": None, + "--region": snapshot_region + } + ) + cleanup_result = s3_cleanup_cmd.run() + assert cleanup_result.success, f"Failed to clean up S3 bucket: {cleanup_result.display()}" + logger.info("Successfully cleaned up S3 bucket contents") + else: + logger.info(f"S3 bucket {bucket_name} does not exist. 
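For context on the `endpoint` and `role` values assembled above: registering an S3 snapshot repository on an Elasticsearch/OpenSearch cluster boils down to a single API call. A minimal sketch of that request, using the bucket, folder, role, and endpoint this test computes (the account ID is a placeholder, and setting names such as `role_arn` vary by distribution and repository plugin version):

```
PUT /_snapshot/migration_assistant_repo
{
  "type": "s3",
  "settings": {
    "bucket": "migration-jenkins-snapshot-123456789012-us-east-1",
    "base_path": "large-snapshot-es7x",
    "region": "us-east-1",
    "endpoint": "s3.us-east-1.amazonaws.com",
    "role_arn": "arn:aws:iam::123456789012:role/LargeSnapshotAccessRole"
  }
}
```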
diff --git a/deployment/cdk/opensearch-service-migration/lib/common-utilities.ts b/deployment/cdk/opensearch-service-migration/lib/common-utilities.ts
index 58cc35bde4..7b2df43261 100644
--- a/deployment/cdk/opensearch-service-migration/lib/common-utilities.ts
+++ b/deployment/cdk/opensearch-service-migration/lib/common-utilities.ts
@@ -202,13 +202,13 @@ export function createSnapshotOnAOSRole(scope: Construct, artifactS3Arn: string,
     snapshotRole.addToPolicy(new PolicyStatement({
         effect: Effect.ALLOW,
         actions: ['s3:ListBucket'],
-        resources: [artifactS3Arn],
+        resources: ["*"],
     }));
 
     snapshotRole.addToPolicy(new PolicyStatement({
         effect: Effect.ALLOW,
         actions: ['s3:GetObject', 's3:PutObject', 's3:DeleteObject'],
-        resources: [`${artifactS3Arn}/*`],
+        resources: [`*`],
     }));
 
     // The Migration Console Role needs to be able to pass the snapshot role as well as any other role
diff --git a/deployment/cdk/opensearch-service-migration/lib/network-stack.ts b/deployment/cdk/opensearch-service-migration/lib/network-stack.ts
index 6a92b9ff26..baf54c7f5c 100644
--- a/deployment/cdk/opensearch-service-migration/lib/network-stack.ts
+++ b/deployment/cdk/opensearch-service-migration/lib/network-stack.ts
@@ -233,7 +233,7 @@ export class NetworkStack extends Stack {
                     cidrMask: 24,
                 },
             ],
-            natGateways: 0,
+            natGateways: 1,
         });
         // Only create interface endpoints if VPC not imported
         this.createVpcEndpoints(vpc);
diff --git a/deployment/cdk/opensearch-service-migration/lib/opensearch-domain-stack.ts b/deployment/cdk/opensearch-service-migration/lib/opensearch-domain-stack.ts
index bcf156f76e..b3acb07eb7 100644
--- a/deployment/cdk/opensearch-service-migration/lib/opensearch-domain-stack.ts
+++ b/deployment/cdk/opensearch-service-migration/lib/opensearch-domain-stack.ts
@@ -17,7 +17,7 @@ import { ClusterYaml } from "./migration-services-yaml";
 import {
     ClusterAuth,
     ClusterBasicAuth,
-    ClusterNoAuth,
+    ClusterSigV4Auth,
     MigrationSSMParameter,
     createMigrationStringParameter,
     getMigrationStringParameterValue
@@ -123,7 +123,7 @@ export class OpenSearchDomainStack extends Stack {
         if (adminUserName) {
             clusterAuth.basicAuth = new ClusterBasicAuth({ username: adminUserName, password_from_secret_arn: adminUserSecret?.secretArn })
         } else {
-            clusterAuth.noAuth = new ClusterNoAuth();
+            clusterAuth.sigv4 = new ClusterSigV4Auth({region: Stack.of(this).region, serviceSigningName: "es"});
         }
 
         this.targetClusterYaml = new ClusterYaml({endpoint: `https://${domain.domainEndpoint}:443`, auth: clusterAuth, version: version.toString()})
diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts
index 560347d0cf..604f7a60a1 100644
--- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts
+++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts
@@ -34,6 +34,8 @@ export interface MigrationConsoleProps extends StackPropsExt {
     readonly servicesYaml: ServicesYaml,
     readonly otelCollectorEnabled?: boolean,
     readonly sourceCluster?: ClusterYaml,
+    readonly sourceClusterDisabled?: boolean,
+    readonly targetCluster?: ClusterYaml,
     readonly managedServiceSourceSnapshotEnabled?: boolean
 }
 
@@ -272,11 +274,13 @@ export class MigrationConsoleStack extends MigrationServiceCore {
             servicePolicies = servicePolicies.concat(mskAdminPolicies)
         }
 
-        if (props.managedServiceSourceSnapshotEnabled && servicesYaml.snapshot?.s3) {
-            servicesYaml.snapshot.s3.role =
-                createSnapshotOnAOSRole(this, artifactS3Arn, serviceTaskRole.roleArn,
-                    this.region, props.stage, props.defaultDeployId)
-                    .roleArn;
+        // Always create the snapshot role regardless of any conditions
+        const snapshotRole = createSnapshotOnAOSRole(this, artifactS3Arn, serviceTaskRole.roleArn,
+            this.region, props.stage, props.defaultDeployId);
+
+        // If snapshot s3 config exists, add the role ARN to it
+        if (servicesYaml.snapshot?.s3) {
+            servicesYaml.snapshot.s3.role = snapshotRole.roleArn;
         }
 
         const parameter = createMigrationStringParameter(this, servicesYaml.stringify(), {
diff --git a/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts b/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts
index 3779e0b917..a4df3ff8de 100644
--- a/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts
+++ b/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts
@@ -243,14 +243,6 @@
         }
         const sourceClusterEndpoint = sourceCluster?.endpoint
 
-        if (managedServiceSourceSnapshotEnabled && !sourceCluster?.auth.sigv4) {
-            throw new Error("A managed service source snapshot is only compatible with sigv4 authentication. If you would like to proceed" +
-                " please disable `managedServiceSourceSnapshotEnabled` and provide your own snapshot of the source cluster.")
-        } else if (sourceCluster?.auth.sigv4 && managedServiceSourceSnapshotEnabled == null) {
-            managedServiceSourceSnapshotEnabled = true;
-            CdkLogger.info("`managedServiceSourceSnapshotEnabled` is not set with source cluster set with sigv4 auth, defaulting to true.")
-        }
-
         const targetClusterEndpointField = this.getContextForType('targetClusterEndpoint', 'string', defaultValues, contextJSON)
         let targetClusterDefinition = this.getContextForType('targetCluster', 'object', defaultValues, contextJSON)
         const usePreexistingTargetCluster = !!(targetClusterEndpointField ?? targetClusterDefinition)
@@ -272,6 +264,18 @@
         }
         const targetCluster = usePreexistingTargetCluster ? parseClusterDefinition(targetClusterDefinition) : undefined
 
+        if (managedServiceSourceSnapshotEnabled && !sourceCluster?.auth.sigv4) {
+            throw new Error("A managed service source snapshot is only compatible with sigv4 authentication. If you would like to proceed" +
+                " please disable `managedServiceSourceSnapshotEnabled` and provide your own snapshot of the source cluster.")
+        } else if (sourceCluster?.auth.sigv4 && managedServiceSourceSnapshotEnabled == null) {
+            managedServiceSourceSnapshotEnabled = true;
+            CdkLogger.info("`managedServiceSourceSnapshotEnabled` is not set with source cluster set with sigv4 auth, defaulting to true.")
+        } else if (!sourceCluster && targetCluster?.auth.sigv4 && managedServiceSourceSnapshotEnabled == null) {
+            // If no source cluster is defined but the target cluster uses SigV4 auth, enable snapshot role creation
+            managedServiceSourceSnapshotEnabled = true;
+            CdkLogger.info("`managedServiceSourceSnapshotEnabled` is not set with no source cluster but target cluster set with sigv4 auth, defaulting to true.")
+        }
+
         // Ensure that target cluster username and password are not defined in multiple places
         if (targetCluster && (fineGrainedManagerUserName || fineGrainedManagerUserSecretManagerKeyARN)) {
             throw new Error("The `fineGrainedManagerUserName` and `fineGrainedManagerUserSecretManagerKeyARN` can only be used when a domain is being " +
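Moving this block below target-cluster parsing means the defaulting decision can now see the parsed target cluster. As a concrete example, a hypothetical context like the following (no source cluster, SigV4 target; auth field names follow the cluster-object conventions documented in options.md below) now defaults `managedServiceSourceSnapshotEnabled` to true and therefore gets the snapshot role wired up:

```json
{
  "sourceCluster": {"disabled": true},
  "targetCluster": {
    "endpoint": "https://my-target-domain.us-east-1.es.amazonaws.com:443",
    "auth": {"type": "sigv4", "region": "us-east-1", "serviceSigningName": "es"}
  }
}
```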
diff --git a/deployment/cdk/opensearch-service-migration/options.md b/deployment/cdk/opensearch-service-migration/options.md
index ac6f767971..a6f76f340e 100644
--- a/deployment/cdk/opensearch-service-migration/options.md
+++ b/deployment/cdk/opensearch-service-migration/options.md
@@ -37,7 +37,7 @@ These tables list all CDK context configuration values a user can specify for th
 
 #### Structure of the cluster objects
 
-If no source cluster is being configured, the source cluster object should be `{"disabled": true} and no other fields are necessary.
+If no source cluster is being configured, the source cluster object should be `{"disabled": true}` and no other fields are necessary.
 
 In all other cases, the required components of each cluster object are:
 
@@ -49,6 +49,12 @@ In all other cases, the required components of each cluster object are:
 3. Basic auth with plaintext password (only supported for the source cluster and not recommended): `{"type": "basic", "username": "admin", "password": "admin123"}`
 4. Basic auth with password in secrets manager (recommended): `{"type": "basic", "username": "admin", "passwordFromSecretArn": "arn:aws:secretsmanager:us-east-1:12345678912:secret:master-user-os-pass-123abc"}`
 
+**Note on Source Cluster Version:**
+If you are configuring a source cluster through the Migration Console CDK context, you can specify the source version in one of three ways (using 5.6 as an example):
+1. In the source cluster configuration: `"version": "ES_5.6"`
+2. In the Migration CDK Context: `"sourceClusterVersion": "ES_5.6"`
+3. By passing RFS Extra Args with the `--source-version` flag: `"reindexFromSnapshotExtraArgs": "--source-version ES_5.6"`
+
 ### Snapshot Definition Options
 
 | Name | Type | Example | Description |
@@ -145,7 +151,6 @@ A number of options are currently available but deprecated. While they function
 | sourceClusterDisabled | boolean | true | Disable configuring a source cluster in any way. This is incompatible with specifying any type of capture proxy or a source cluster endpoint. It's suitable for backfill migrations using ReindexFromSnapshot from an already-existing snapshot. |
 | sourceClusterEndpoint | string | `"https://my-source-cluster.com:443"` | The URI of the source cluster that the migration will reference. **Note**: if this is not provided and elasticsearchService or captureProxyESService is enabled, the migration will reference a uri for that service. |
-
 
 ### Other Options
 
 | Name | Type | Example | Description |
@@ -160,4 +165,3 @@ A number of options are currently available but deprecated. While they function
 [^1]: Extra arguments can be added, overridden, or removed as follows:
 - To add/override an argument: Include the argument with the value, e.g., `"--new-arg value"`
 - Include quotes/escaping as appropriate for bash processing `"--new-arg \"my value\""`
-
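As a concrete instance of option 1 from the note above, a source cluster object carrying its own version might look like the following, reusing the auth example already given in the options table:

```json
{
  "sourceCluster": {
    "endpoint": "https://my-source-cluster.com:443",
    "auth": {"type": "basic", "username": "admin", "passwordFromSecretArn": "arn:aws:secretsmanager:us-east-1:12345678912:secret:master-user-os-pass-123abc"},
    "version": "ES_5.6"
  }
}
```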
diff --git a/jenkins/migrationIntegPipelines/largeSnapshotGeneratorTestCover.groovy b/jenkins/migrationIntegPipelines/largeSnapshotGeneratorTestCover.groovy
new file mode 100644
index 0000000000..dc3c9ef21e
--- /dev/null
+++ b/jenkins/migrationIntegPipelines/largeSnapshotGeneratorTestCover.groovy
@@ -0,0 +1,20 @@
+def gitBranch = params.GIT_BRANCH ?: 'test-k8s-large-snapshot'
+def gitUrl = params.GIT_REPO_URL ?: 'https://github.com/jugal-chauhan/opensearch-migrations.git'
+
+library identifier: "migrations-lib@${gitBranch}", retriever: modernSCM(
+    [$class: 'GitSCMSource',
+     remote: "${gitUrl}"])
+
+// Shared library function (location from root: vars/largeSnapshotGeneratorTest.groovy)
+largeSnapshotGeneratorTest(
+    NUM_SHARDS: params.NUM_SHARDS,
+    MULTIPLICATION_FACTOR: params.MULTIPLICATION_FACTOR,
+    BATCH_COUNT: params.BATCH_COUNT,
+    DOCS_PER_BATCH: params.DOCS_PER_BATCH,
+    BACKFILL_TIMEOUT_HOURS: params.BACKFILL_TIMEOUT_HOURS,
+    LARGE_SNAPSHOT_RATE_MB_PER_NODE: params.LARGE_SNAPSHOT_RATE_MB_PER_NODE,
+    RFS_WORKERS: params.RFS_WORKERS,
+    CLUSTER_VERSION: params.CLUSTER_VERSION,
+    DEPLOY_REGION: params.DEPLOY_REGION,
+    SNAPSHOT_REGION: params.SNAPSHOT_REGION,
+)
diff --git a/vars/largeSnapshotGeneratorTest.groovy b/vars/largeSnapshotGeneratorTest.groovy
new file mode 100644
index 0000000000..d1e0f9c2bd
--- /dev/null
+++ b/vars/largeSnapshotGeneratorTest.groovy
@@ -0,0 +1,142 @@
+import groovy.json.JsonOutput
+
+def call(Map config = [:]) {
+    def migrationContextId = 'document-multiplier-rfs'
+    def time = new Date().getTime()
+    def testUniqueId = "document_multiplier_${time}_${currentBuild.number}"
+
+    def docTransformerPath = "/shared-logs-output/test-transformations/transformation.json"
+
+    // Map the cluster version parameter to the actual engine version
+    String engineVersion
+    String distVersion
+    String dataNodeType
+    String dedicatedManagerNodeType
+    boolean ebsEnabled = false
+    Integer ebsVolumeSize = null
+    Integer dataNodeCount = null
+
+    switch (params.CLUSTER_VERSION) {
+        case 'es5x':
+            engineVersion = "ES_5.6"
+            distVersion = "5.6"
+            dataNodeType = "r5.4xlarge.search"
+            dedicatedManagerNodeType = "m4.xlarge.search"
+            ebsEnabled = true
+            ebsVolumeSize = 300
+            dataNodeCount = 60
+            break
+        case 'es6x':
+            engineVersion = "ES_6.8"
+            distVersion = "6.8"
+            dataNodeType = "r5.4xlarge.search"
+            dedicatedManagerNodeType = "m5.xlarge.search"
+            ebsEnabled = true
+            ebsVolumeSize = 300
+            dataNodeCount = 60
+            break
+        case 'es7x':
+            engineVersion = "ES_7.10"
+            distVersion = "7.10"
+            dataNodeType = "r6gd.4xlarge.search"
+            dedicatedManagerNodeType = "m6g.xlarge.search"
+            dataNodeCount = 60
+            break
+        case 'os1x':
+            engineVersion = "OS_1.3"
+            distVersion = "1.3"
+            dataNodeType = "r6g.4xlarge.search"
+            dedicatedManagerNodeType = "m6g.xlarge.search"
+            ebsEnabled = true
+            ebsVolumeSize = 1024
+            dataNodeCount = 16
+            break
+        case 'os2x':
+            engineVersion = "OS_2.17"
+            distVersion = "2.17"
+            dataNodeType = "r6g.4xlarge.search"
+            dedicatedManagerNodeType = "m6g.xlarge.search"
+            ebsEnabled = true
+            ebsVolumeSize = 1024
+            dataNodeCount = 16
+            break
+        default:
+            throw new RuntimeException("Unsupported CLUSTER_VERSION: ${params.CLUSTER_VERSION}")
+    }
+
+    // Determine if node-to-node encryption should be enabled based on ES version
+    def nodeToNodeEncryption = params.CLUSTER_VERSION != 'es5x'
+
+    def migration_cdk_context = """
+    {
+      "document-multiplier-rfs": {
+        "stage": "${params.STAGE}",
+        "region": "${params.DEPLOY_REGION}",
+        "artifactBucketRemovalPolicy": "DESTROY",
+        "captureProxyServiceEnabled": false,
+        "targetClusterProxyServiceEnabled": false,
+        "trafficReplayerServiceEnabled": false,
+        "reindexFromSnapshotServiceEnabled": true,
+        "reindexFromSnapshotExtraArgs": "--doc-transformer-config-file ${docTransformerPath} --source-version ${engineVersion}",
+        "sourceClusterDeploymentEnabled": false,
+        "sourceClusterDisabled": true,
+        "vpcEnabled": true,
+        "vpcAZCount": 2,
+        "migrationAssistanceEnabled": true,
+        "replayerOutputEFSRemovalPolicy": "DESTROY",
+        "migrationConsoleServiceEnabled": true,
+        "otelCollectorEnabled": true,
+        "engineVersion": "${engineVersion}",
+        "distVersion": "${distVersion}",
+        "domainName": "${params.CLUSTER_VERSION}-jenkins-test",
+        "dataNodeCount": ${dataNodeCount},
+        "dataNodeType": "${dataNodeType}",
+        "masterEnabled": true,
+        "dedicatedManagerNodeCount": 3,
+        "dedicatedManagerNodeType": "${dedicatedManagerNodeType}",
+        "ebsEnabled": ${ebsEnabled},
+        ${ebsVolumeSize ? '"ebsVolumeSize": ' + ebsVolumeSize + ',' : ''}
+        "openAccessPolicyEnabled": false,
+        "domainRemovalPolicy": "DESTROY",
+        "tlsSecurityPolicy": "TLS_1_2",
+        "enforceHTTPS": true,
+        "nodeToNodeEncryptionEnabled": ${nodeToNodeEncryption},
+        "encryptionAtRestEnabled": true
+      }
+    }
+    """
+
+    def source_cdk_context = """
+    {
+      "source-single-node-ec2": {
+        "suffix": "ec2-source-",
+        "networkStackSuffix": "ec2-source-",
+        "region": "${params.DEPLOY_REGION}"
+      }
+    }
+    """
+
+    largeSnapshotPipeline(
+        sourceContext: source_cdk_context,
+        migrationContext: migration_cdk_context,
+        migrationContextId: migrationContextId,
+        defaultStageId: 'dev',
+        skipCaptureProxyOnNodeSetup: true,
+        skipSourceDeploy: true,
+        jobName: 'k8s-large-snapshot-test',
+        testUniqueId: testUniqueId,
+        integTestCommand: '/root/lib/integ_test/integ_test/document_multiplier.py --config-file=/config/migration-services.yaml --log-cli-level=info',
+        parameterDefaults: [
+            NUM_SHARDS: params.NUM_SHARDS,
+            MULTIPLICATION_FACTOR: params.MULTIPLICATION_FACTOR,
+            BATCH_COUNT: params.BATCH_COUNT,
+            DOCS_PER_BATCH: params.DOCS_PER_BATCH,
+            BACKFILL_TIMEOUT_HOURS: params.BACKFILL_TIMEOUT_HOURS,
+            LARGE_SNAPSHOT_RATE_MB_PER_NODE: params.LARGE_SNAPSHOT_RATE_MB_PER_NODE,
+            RFS_WORKERS: params.RFS_WORKERS,
+            CLUSTER_VERSION: params.CLUSTER_VERSION,
+            DEPLOY_REGION: params.DEPLOY_REGION,
+            SNAPSHOT_REGION: params.SNAPSHOT_REGION,
+        ]
+    )
+}
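One subtlety in the context template above: the `ebsVolumeSize` entry is a Groovy ternary inside the GString, so the key is emitted only when the chosen version sets a volume size. For `os1x` the relevant fragment of the rendered JSON includes the key, while for `es7x` (instance-storage `r6gd` nodes, EBS disabled) it is omitted entirely:

```json
"ebsEnabled": true,
"ebsVolumeSize": 1024,
```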
diff --git a/vars/largeSnapshotPipeline.groovy b/vars/largeSnapshotPipeline.groovy
new file mode 100644
index 0000000000..5f9188abd6
--- /dev/null
+++ b/vars/largeSnapshotPipeline.groovy
@@ -0,0 +1,269 @@
+def call(Map config = [:]) {
+    def sourceContext = config.sourceContext
+    def migrationContext = config.migrationContext
+    def defaultStageId = config.defaultStageId
+    def jobName = config.jobName
+
+    if(sourceContext == null || sourceContext.isEmpty()){
+        throw new RuntimeException("The sourceContext argument must be provided");
+    }
+    if(migrationContext == null || migrationContext.isEmpty()){
+        throw new RuntimeException("The migrationContext argument must be provided");
+    }
+    if(defaultStageId == null || defaultStageId.isEmpty()){
+        throw new RuntimeException("The defaultStageId argument must be provided");
+    }
+    if(jobName == null || jobName.isEmpty()){
+        throw new RuntimeException("The jobName argument must be provided");
+    }
+    def source_context_id = config.sourceContextId ?: 'source-single-node-ec2'
+    def migration_context_id = config.migrationContextId ?: 'migration-default'
+    def source_context_file_name = 'sourceJenkinsContext.json'
+    def migration_context_file_name = 'migrationJenkinsContext.json'
+    def skipCaptureProxyOnNodeSetup = config.skipCaptureProxyOnNodeSetup ?: false
+    def time = new Date().getTime()
+    def skipSourceDeploy = config.skipSourceDeploy ?: false
+    def testUniqueId = config.testUniqueId ?: "integ_full_${time}_${currentBuild.number}"
+    def testDir = "/root/lib/integ_test/integ_test"
+    def integTestCommand = config.integTestCommand ?: "${testDir}/replayer_tests.py"
+    pipeline {
+        agent { label config.workerAgent ?: 'Jenkins-Default-Agent-X64-C5xlarge-Single-Host' }
+
+        parameters {
+            string(name: 'GIT_REPO_URL', defaultValue: 'https://github.com/jugal-chauhan/opensearch-migrations.git', description: 'Git repository url')
+            string(name: 'GIT_BRANCH', defaultValue: 'test-k8s-large-snapshot', description: 'Git branch to use for repository')
+            string(name: 'STAGE', defaultValue: "${defaultStageId}", description: 'Stage name for deployment environment')
+            choice(name: 'DEPLOY_REGION', choices: ['us-west-1', 'us-west-2', 'us-east-2'], description: 'AWS Region to deploy resources')
+            choice(name: 'SNAPSHOT_REGION', choices: ['us-east-1', 'us-west-2'], description: 'AWS Region for taking the final snapshot')
+            string(name: 'NUM_SHARDS', defaultValue: '10', description: 'Number of index shards')
+            string(name: 'MULTIPLICATION_FACTOR', defaultValue: '1000', description: 'Document multiplication factor')
+            string(name: 'BATCH_COUNT', defaultValue: '3', description: 'Number of batches')
+            string(name: 'DOCS_PER_BATCH', defaultValue: '100', description: 'Documents per batch')
+            string(name: 'BACKFILL_TIMEOUT_HOURS', defaultValue: '45', description: 'Backfill timeout in hours')
+            string(name: 'LARGE_SNAPSHOT_RATE_MB_PER_NODE', defaultValue: '2000', description: 'Rate for large snapshot creation in MB per node')
+            string(name: 'RFS_WORKERS', defaultValue: '8', description: 'Number of RFS workers to scale to')
+            choice(name: 'CLUSTER_VERSION', choices: ['es5x', 'es6x', 'es7x', 'os1x', 'os2x'], description: 'Target cluster version for data format')
+        }
+
+        environment {
+            NUM_SHARDS = "${params.NUM_SHARDS}"
+            MULTIPLICATION_FACTOR = "${params.MULTIPLICATION_FACTOR}"
+            BATCH_COUNT = "${params.BATCH_COUNT}"
+            DOCS_PER_BATCH = "${params.DOCS_PER_BATCH}"
+            BACKFILL_TIMEOUT_HOURS = "${params.BACKFILL_TIMEOUT_HOURS}"
+            LARGE_SNAPSHOT_RATE_MB_PER_NODE = "${params.LARGE_SNAPSHOT_RATE_MB_PER_NODE}"
+            RFS_WORKERS = "${params.RFS_WORKERS}"
+            CLUSTER_VERSION = "${params.CLUSTER_VERSION}"
+            DEPLOY_REGION = "${params.DEPLOY_REGION}"
+            SNAPSHOT_REGION = "${params.SNAPSHOT_REGION}"
+        }
+
+        options {
+            // Acquire a lock on the stage name so only one run per stage executes at a time
+            lock(resource: "${params.STAGE}", variable: 'lockId')
+            timeout(time: 30, unit: 'HOURS')
+            buildDiscarder(logRotator(daysToKeepStr: '30'))
+        }
+
+        triggers {
+            GenericTrigger(
+                genericVariables: [
+                    [key: 'GIT_REPO_URL', value: '$.GIT_REPO_URL'],
+                    [key: 'GIT_BRANCH', value: '$.GIT_BRANCH'],
+                    [key: 'job_name', value: '$.job_name']
+                ],
+                tokenCredentialId: 'jenkins-migrations-generic-webhook-token',
+                causeString: 'Triggered by PR on opensearch-migrations repository',
+                regexpFilterExpression: "^$jobName\$",
+                regexpFilterText: "\$job_name",
+            )
+        }
+
+        stages {
+            stage('Checkout') {
+                steps {
+                    script {
+                        // Allow overwriting this step
+                        if (config.checkoutStep) {
+                            config.checkoutStep()
+                        } else {
+                            git branch: "${params.GIT_BRANCH}", url: "${params.GIT_REPO_URL}"
+                        }
+                    }
+                }
+            }
+
+            stage('Test Caller Identity') {
+                steps {
+                    script {
+                        // Allow overwriting this step
+                        if (config.awsIdentityCheckStep) {
+                            config.awsIdentityCheckStep()
+                        } else {
+                            sh 'aws sts get-caller-identity'
+                        }
+                    }
+                }
+            }
+
+            stage('Setup E2E CDK Context') {
+                steps {
+                    script {
+                        // Allow overwriting this step
+                        if (config.cdkContextStep) {
+                            config.cdkContextStep()
+                        } else {
+                            writeFile (file: "test/$source_context_file_name", text: sourceContext)
+                            sh "echo 'Using source context file options: ' && cat test/$source_context_file_name"
+                            writeFile (file: "test/$migration_context_file_name", text: migrationContext)
+                            sh "echo 'Using migration context file options: ' && cat test/$migration_context_file_name"
+                        }
+                    }
+                }
+            }
+
+            stage('Build') {
+                steps {
+                    timeout(time: 1, unit: 'HOURS') {
+                        script {
+                            // Allow overwriting this step
+                            if (config.buildStep) {
+                                config.buildStep()
+                            } else {
+                                sh 'sudo --preserve-env ./gradlew clean build --no-daemon'
+                            }
+                        }
+                    }
+                }
+            }
+
+            stage('Deploy') {
+                steps {
+                    timeout(time: 90, unit: 'MINUTES') {
+                        dir('test') {
+                            script {
+                                // Allow overwriting this step
+                                if (config.deployStep) {
+                                    config.deployStep()
+                                } else {
+                                    echo "Acquired deployment stage: ${params.STAGE}"
+                                    sh 'sudo usermod -aG docker $USER'
+                                    sh 'sudo newgrp docker'
+                                    def baseCommand = "sudo --preserve-env AWS_REGION=${params.DEPLOY_REGION} AWS_DEFAULT_REGION=${params.DEPLOY_REGION} ./awsE2ESolutionSetup.sh --source-context-file './$source_context_file_name' " +
+                                            "--migration-context-file './$migration_context_file_name' " +
+                                            "--source-context-id $source_context_id " +
+                                            "--migration-context-id $migration_context_id " +
+                                            "--stage ${params.STAGE} " +
+                                            "--migrations-git-url ${params.GIT_REPO_URL} " +
+                                            "--migrations-git-branch ${params.GIT_BRANCH}"
+                                    if (skipCaptureProxyOnNodeSetup) {
+                                        baseCommand += " --skip-capture-proxy"
+                                    }
+                                    if (skipSourceDeploy) {
+                                        baseCommand += " --skip-source-deploy"
+                                    }
+                                    withCredentials([string(credentialsId: 'migrations-test-account-id', variable: 'MIGRATIONS_TEST_ACCOUNT_ID')]) {
+                                        withAWS(role: 'JenkinsDeploymentRole', roleAccount: "${MIGRATIONS_TEST_ACCOUNT_ID}", region: "${params.DEPLOY_REGION}", duration: 5400, roleSessionName: 'jenkins-session') {
+                                            sh """
+                                                ${baseCommand}
+                                            """
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+
+            stage('Integ Tests') {
+                steps {
+                    timeout(time: 24, unit: 'HOURS') {
+                        dir('test') {
+                            script {
+                                // Allow overwriting this step
+                                if (config.integTestStep) {
+                                    config.integTestStep()
+                                } else {
+                                    echo "Running with NUM_SHARDS=${env.NUM_SHARDS}, MULTIPLICATION_FACTOR=${env.MULTIPLICATION_FACTOR}, BATCH_COUNT=${env.BATCH_COUNT}, DOCS_PER_BATCH=${env.DOCS_PER_BATCH}, BACKFILL_TIMEOUT_HOURS=${env.BACKFILL_TIMEOUT_HOURS}, LARGE_SNAPSHOT_RATE_MB_PER_NODE=${env.LARGE_SNAPSHOT_RATE_MB_PER_NODE}, RFS_WORKERS=${env.RFS_WORKERS}, CLUSTER_VERSION=${env.CLUSTER_VERSION}, DEPLOY_REGION=${env.DEPLOY_REGION}, SNAPSHOT_REGION=${env.SNAPSHOT_REGION}"
+                                    echo "Running integration tests with command: ${integTestCommand}"
+                                    def test_result_file = "${testDir}/reports/${testUniqueId}/report.xml"
+                                    def populatedIntegTestCommand = integTestCommand.replaceAll("<STAGE>", params.STAGE)
+                                    def command = "pipenv run pytest --log-file=${testDir}/reports/${testUniqueId}/pytest.log " +
+                                            "--num_shards=${env.NUM_SHARDS} " +
+                                            "--multiplication_factor=${env.MULTIPLICATION_FACTOR} " +
+                                            "--batch_count=${env.BATCH_COUNT} " +
+                                            "--docs_per_batch=${env.DOCS_PER_BATCH} " +
+                                            "--backfill_timeout_hours=${env.BACKFILL_TIMEOUT_HOURS} " +
+                                            "--large_snapshot_rate_mb_per_node=${env.LARGE_SNAPSHOT_RATE_MB_PER_NODE} " +
+                                            "--junitxml=${test_result_file} ${populatedIntegTestCommand} " +
+                                            "--unique_id ${testUniqueId} " +
+                                            "--stage ${env.STAGE} " +
+                                            "--rfs_workers ${env.RFS_WORKERS} " +
+                                            "--cluster_version ${env.CLUSTER_VERSION} " +
+                                            "--deploy_region ${env.DEPLOY_REGION} " +
+                                            "--snapshot_region ${env.SNAPSHOT_REGION} " +
+                                            "-s"
+                                    withCredentials([string(credentialsId: 'migrations-test-account-id', variable: 'MIGRATIONS_TEST_ACCOUNT_ID')]) {
+                                        withAWS(role: 'JenkinsDeploymentRole', roleAccount: "${MIGRATIONS_TEST_ACCOUNT_ID}", region: "${env.DEPLOY_REGION}", duration: 43200, roleSessionName: 'jenkins-session') {
+                                            sh "sudo --preserve-env ./awsRunIntegTests.sh --command '${command}' " +
+                                                    "--test-result-file ${test_result_file} " +
+                                                    "--stage ${env.STAGE}"
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+
+            stage('Cleanup Deployment') {
+                steps {
+                    timeout(time: 1, unit: 'HOURS') {
+                        dir('test') {
+                            script {
+                                // Allow overwriting this step
+                                if (config.cleanupStep) {
+                                    config.cleanupStep()
+                                } else {
+                                    echo "Cleaning up all deployed stacks on stage: ${params.STAGE}"
+                                    dir('cleanupDeployment') {
+                                        sh "sudo --preserve-env pipenv install --deploy --ignore-pipfile"
+                                        def command = "pipenv run python3 cleanup_deployment.py --stage ${params.STAGE}"
+                                        withCredentials([string(credentialsId: 'migrations-test-account-id', variable: 'MIGRATIONS_TEST_ACCOUNT_ID')]) {
+                                            withAWS(role: 'JenkinsDeploymentRole', roleAccount: "${MIGRATIONS_TEST_ACCOUNT_ID}", region: "${params.DEPLOY_REGION}", duration: 3600, roleSessionName: 'jenkins-session') {
+                                                sh """
+                                                    export AWS_REGION=${params.DEPLOY_REGION}
+                                                    export AWS_DEFAULT_REGION=${params.DEPLOY_REGION}
+                                                    sudo --preserve-env ${command}
+                                                """
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        post {
+            always {
+                timeout(time: 10, unit: 'MINUTES') {
+                    dir('test') {
+                        script {
+                            echo "Pipeline execution complete"
+                            if (config.finishStep) {
+                                config.finishStep()
+                            } else {
+                                sh "echo 'Default post step performs no actions'"
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
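Since the pipeline is wired to the generic-webhook-trigger plugin, a run can also be kicked off remotely by posting a payload whose `job_name` matches the pipeline's `jobName` (for this job, `k8s-large-snapshot-test`). A sketch, with a placeholder Jenkins host and a token corresponding to the `jenkins-migrations-generic-webhook-token` credential:

```
curl -X POST "https://<jenkins-host>/generic-webhook-trigger/invoke?token=<webhook-token>" \
  -H "Content-Type: application/json" \
  -d '{
        "GIT_REPO_URL": "https://github.com/jugal-chauhan/opensearch-migrations.git",
        "GIT_BRANCH": "test-k8s-large-snapshot",
        "job_name": "k8s-large-snapshot-test"
      }'
```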