Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ def pytest_configure(config):

def pytest_addoption(parser):
    """Register the command-line options used by the migration integration tests.

    Options without help text (unique_id, stage) are internal plumbing filled in
    by the Jenkins pipeline; the remaining options are documented for CLI users.
    """
    # (flag, default, help) triples; every option uses action="store".
    option_table = [
        ("--unique_id", uuid.uuid4().hex, None),  # fresh run id unless supplied
        ("--stage", "dev", None),
        ("--config_file_path", "/etc/migration_services.yaml",
         "Path to config file for console library"),
        ("--source_proxy_alb_endpoint", None,
         "Specify the Migration ALB endpoint for the source capture proxy"),
        ("--target_proxy_alb_endpoint", None,
         "Specify the Migration ALB endpoint for the target proxy"),
    ]
    for flag, default_value, help_text in option_table:
        if help_text is None:
            parser.addoption(flag, action="store", default=default_value)
        else:
            parser.addoption(flag, action="store", default=default_value,
                             help=help_text)

@pytest.fixture
def unique_id(pytestconfig):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import logging
import os
import pytest
import unittest
from http import HTTPStatus
from console_link.middleware.clusters import run_test_benchmarks, connection_check, clear_indices, ConnectionResult
from console_link.middleware.clusters import connection_check, clear_indices, ConnectionResult
from console_link.models.cluster import Cluster
from console_link.models.backfill_base import Backfill
from console_link.models.replayer_base import Replayer
Expand All @@ -12,8 +13,7 @@
from console_link.middleware.kafka import delete_topic
from console_link.models.metadata import Metadata
from console_link.cli import Context
from common_operations import (create_index, create_document, check_doc_counts_match, wait_for_running_replayer,
EXPECTED_BENCHMARK_DOCS)
from common_operations import (create_index, create_document, check_doc_counts_match, wait_for_running_replayer)
logger = logging.getLogger(__name__)


Expand All @@ -25,6 +25,19 @@ def initialize(request):
pytest.unique_id = unique_id
source_cluster = pytest.console_env.source_cluster
target_cluster = pytest.console_env.target_cluster
# If in AWS, modify source and target objects here to route requests through the created ALB to verify its operation
if 'AWS_EXECUTION_ENV' in os.environ:
logger.info("Detected an AWS environment")
source_proxy_alb_endpoint = request.config.getoption("--source_proxy_alb_endpoint")
target_proxy_alb_endpoint = request.config.getoption("--target_proxy_alb_endpoint")
logger.info("Checking original source and target endpoints can be reached, before using ALB endpoints for test")
direct_source_con_result: ConnectionResult = connection_check(source_cluster)
assert direct_source_con_result.connection_established is True
direct_target_con_result: ConnectionResult = connection_check(target_cluster)
assert direct_target_con_result.connection_established is True
source_cluster.endpoint = source_proxy_alb_endpoint
target_cluster.endpoint = target_proxy_alb_endpoint
target_cluster.allow_insecure = True
backfill: Backfill = pytest.console_env.backfill
assert backfill is not None
metadata: Metadata = pytest.console_env.metadata
Expand Down Expand Up @@ -74,7 +87,6 @@ def test_e2e_0001_default(self):
replayer: Replayer = pytest.console_env.replay

# Load initial data
run_test_benchmarks(source_cluster)
index_name = f"test_e2e_0001_{pytest.unique_id}"
doc_id_base = "e2e_0001_doc"
create_index(cluster=source_cluster, index_name=index_name, test_case=self)
Expand All @@ -90,21 +102,21 @@ def test_e2e_0001_default(self):
snapshot.create(wait=True)
metadata.migrate()
backfill.start()
backfill.scale(units=10)
backfill.scale(units=2)
# This document was created after snapshot and should not be included in Backfill but expected in Replay
create_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id_base + "_2",
expected_status_code=HTTPStatus.CREATED, test_case=self)

ignore_list = [".", "searchguard", "sg7", "security-auditlog", "reindexed-logs"]
expected_docs = dict(EXPECTED_BENCHMARK_DOCS)
expected_docs = {}
# Source should have both documents
expected_docs[index_name] = {"docs.count": "2"}
expected_docs[index_name] = {"count": 2}
check_doc_counts_match(cluster=source_cluster, expected_index_details=expected_docs,
index_prefix_ignore_list=ignore_list, test_case=self)
# Target should have one document from snapshot
expected_docs[index_name] = {"docs.count": "1"}
expected_docs[index_name] = {"count": 1}
check_doc_counts_match(cluster=target_cluster, expected_index_details=expected_docs,
index_prefix_ignore_list=ignore_list, max_attempts=40, delay=30.0, test_case=self)
index_prefix_ignore_list=ignore_list, max_attempts=20, delay=30.0, test_case=self)

backfill.stop()

Expand All @@ -114,7 +126,7 @@ def test_e2e_0001_default(self):
replayer.start()
wait_for_running_replayer(replayer=replayer)

expected_docs[index_name] = {"docs.count": "3"}
expected_docs[index_name] = {"count": 3}
check_doc_counts_match(cluster=source_cluster, expected_index_details=expected_docs,
index_prefix_ignore_list=ignore_list, test_case=self)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Branch of the shared pipeline library to load; overridable via the GIT_BRANCH job parameter.
def gitBranch = params.GIT_BRANCH ?: 'main'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why make a new Pipeline for this instead of use our existing ES 6.8 -> OS 2.X Pipeline?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is testing a combination of RFS + C&R, having this separation I think will be useful in the future to do more targeted testing with say a pre-made snapshot in the RFS specific pipeline. It might also be nice to see if both pipelines or a single pipeline is failing when a bug is introduced.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few thoughts:

  • Why wouldn't we want to use a pre-made snapshot for all of our integ tests?
  • Why don't we want to test both RFS and C&R for each of our integ tests?
  • see if both pipelines or a single pipeline is failing -> Can you elaborate a bit more on this one?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's helpful for testing our workflow where the user performs the snapshot creation themselves from the migration console

In a similar vein, I like testing RFS and C&R together and separately, as it affirms that these are separate, independent pieces that can operate on their own, and that we haven't introduced some logic that, say, requires a snapshot to exist when we are only doing a C&R scenario

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per discussion, my takeaways:

  • Combine w/ the existing ES 6.8 test
  • Update GitHub actions to use this instead of the existing 7.10 RFS only test

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have updated this PR to include both these items. See recent Jenkins build here: https://migrations.ci.opensearch.org/job/full-es68source-e2e-test/4/

// Repository that hosts the shared pipeline library; overridable via the GIT_REPO_URL job parameter.
def gitUrl = params.GIT_REPO_URL ?: 'https://github.com/opensearch-project/opensearch-migrations.git'

// Load the "migrations-lib" Jenkins shared library dynamically from the chosen branch/repo.
library identifier: "migrations-lib@${gitBranch}", retriever: modernSCM(
[$class: 'GitSCMSource',
remote: "${gitUrl}"])

// Shared library function (location from root: vars/fullDefaultE2ETest.groovy)
fullDefaultE2ETest()
4 changes: 3 additions & 1 deletion vars/defaultIntegPipeline.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,11 @@ def call(Map config = [:]) {
def time = new Date().getTime()
def uniqueId = "integ_min_${time}_${currentBuild.number}"
def test_result_file = "${testDir}/reports/${uniqueId}/report.xml"
def populatedIntegTestCommand = integTestCommand.replaceAll("<STAGE>", stage)
def command = "pipenv run pytest --log-file=${testDir}/reports/${uniqueId}/pytest.log " +
"--junitxml=${test_result_file} ${integTestCommand} " +
"--junitxml=${test_result_file} ${populatedIntegTestCommand} " +
"--unique_id ${uniqueId} " +
"--stage ${stage} " +
"-s"
withCredentials([string(credentialsId: 'migrations-test-account-id', variable: 'MIGRATIONS_TEST_ACCOUNT_ID')]) {
withAWS(role: 'JenkinsDeploymentRole', roleAccount: "${MIGRATIONS_TEST_ACCOUNT_ID}", duration: 3600, roleSessionName: 'jenkins-session') {
Expand Down
73 changes: 73 additions & 0 deletions vars/fullDefaultE2ETest.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@

// Jenkins shared-library entry point (vars/fullDefaultE2ETest.groovy).
// Defines the CDK deployment contexts for a full ES 6.8 -> OS 2.11 end-to-end
// migration test (RFS backfill + capture/replay) and hands them to the shared
// defaultIntegPipeline. <STAGE>/<VPC_ID>/<SOURCE_CLUSTER_ENDPOINT> placeholders
// are presumably substituted later by the pipeline — confirm in defaultIntegPipeline.
def call(Map config = [:]) {
// CDK context ids selecting which context entry each deployment reads.
def sourceContextId = 'source-single-node-ec2'
def migrationContextId = 'full-migration'
// CDK context for the source cluster: a single-node, security-disabled
// Elasticsearch 6.8.23 on EC2, internal-only but with an open access CIDR.
def source_cdk_context = """
{
"source-single-node-ec2": {
"suffix": "ec2-source-<STAGE>",
"networkStackSuffix": "ec2-source-<STAGE>",
"distVersion": "6.8.23",
"distributionUrl": "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-6.8.23.tar.gz",
"captureProxyEnabled": false,
"securityDisabled": true,
"minDistribution": false,
"cpuArch": "x64",
"isInternal": true,
"singleNodeCluster": true,
"networkAvailabilityZones": 2,
"dataNodeCount": 1,
"managerNodeCount": 0,
"serverAccessType": "ipv4",
"restrictServerAccessTo": "0.0.0.0/0"
}
}
"""
// CDK context for the migration stack: OS 2.11 target domain plus the full
// migration tooling (capture proxy, target proxy, replayer at 10x speedup,
// reindex-from-snapshot, migration console, otel collector).
def migration_cdk_context = """
{
"full-migration": {
"stage": "<STAGE>",
"vpcId": "<VPC_ID>",
"engineVersion": "OS_2.11",
"domainName": "os-cluster-<STAGE>",
"dataNodeCount": 2,
"openAccessPolicyEnabled": true,
"domainRemovalPolicy": "DESTROY",
"artifactBucketRemovalPolicy": "DESTROY",
"captureProxyServiceEnabled": true,
"targetClusterProxyServiceEnabled": true,
"trafficReplayerServiceEnabled": true,
"trafficReplayerExtraArgs": "--speedup-factor 10.0",
"reindexFromSnapshotServiceEnabled": true,
"sourceCluster": {
"endpoint": "<SOURCE_CLUSTER_ENDPOINT>",
"auth": {"type": "none"},
"version": "ES_6.8.23"
},
"tlsSecurityPolicy": "TLS_1_2",
"enforceHTTPS": true,
"nodeToNodeEncryptionEnabled": true,
"encryptionAtRestEnabled": true,
"vpcEnabled": true,
"vpcAZCount": 2,
"domainAZCount": 2,
"mskAZCount": 2,
"migrationAssistanceEnabled": true,
"replayerOutputEFSRemovalPolicy": "DESTROY",
"migrationConsoleServiceEnabled": true,
"otelCollectorEnabled": true
}
}
"""

// Delegate to the shared pipeline; the integTestCommand routes test traffic
// through the ALB endpoints (9201 source proxy, 9202 target proxy) so the
// test exercises the ALB path end to end.
defaultIntegPipeline(
sourceContext: source_cdk_context,
migrationContext: migration_cdk_context,
sourceContextId: sourceContextId,
migrationContextId: migrationContextId,
defaultStageId: 'full-integ',
skipCaptureProxyOnNodeSetup: true,
jobName: 'full-default-e2e-test',
integTestCommand: '/root/lib/integ_test/integ_test/full_tests.py --source_proxy_alb_endpoint https://alb.migration.<STAGE>.local:9201 --target_proxy_alb_endpoint https://alb.migration.<STAGE>.local:9202'
)
}
Loading