DataGenerator/build.gradle (1 change: 1 addition & 0 deletions)
@@ -26,6 +26,7 @@ dependencies {
     testImplementation libs.mockito.junit.jupiter
     testImplementation libs.hamcrest
     testImplementation libs.testcontainers
+    testImplementation project(':transformation')
 
     testRuntimeOnly libs.junit.jupiter.engine
 }
@@ -54,7 +54,9 @@ private List<CompletableFuture<?>> generateDocs(String indexName, Workload workl
                 .addArgument(indexName)
                 .addArgument(doc::toString).log();
             var docId = docIdCounter.incrementAndGet();
-            return new BulkDocSection(indexName + "_" + docId, indexName, null, doc.toString(), null);
+            var type = options.getDefaultDocType();
+            var routing = options.getDefaultDocRouting();
+            return new BulkDocSection(indexName + "_" + docId, indexName, type, doc.toString(), routing);
         })
         .collect(Collectors.toList());
 
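The two new arguments thread the workload's default type and routing into each generated document's bulk metadata. A minimal sketch of the effect, with hypothetical values and the serialized line approximated from the ES 6.x bulk-action convention (the actual serialization lives in BulkDocSection, outside this diff):

    // Constructor order per the diff: (id, indexName, type, source, routing)
    var section = new BulkDocSection("index1_1", "index1", "myType", "{\"field\":1}", "shard-a");
    // Bulk action line, roughly:
    // {"index":{"_id":"index1_1","_index":"index1","_type":"myType","routing":"shard-a"}}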
@@ -21,6 +21,10 @@ public class WorkloadOptions {
     @Parameter(names = { "--max-bulk-request-batch-count" }, description = "The maximum batch count for bulk requests")
     private int maxBulkBatchSize = 50;
 
+    private String defaultDocType = null;
+
+    private String defaultDocRouting = null;
+
     private final IndexOptions index = new IndexOptions();
 
     private boolean refreshAfterEachWrite = false;
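Unlike the neighboring fields, the two additions carry no @Parameter annotation, so they are reachable from test code but not from the CLI. A short usage sketch; the accessor names come from calls elsewhere in this PR, and how they are generated (likely Lombok) is an assumption:

    var options = new WorkloadOptions();
    options.setDefaultDocType("myType");  // ES 5.x/6.x still require a mapping type
    options.setDefaultDocRouting(null);   // leave routing unset unless a test needs it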
@@ -1,7 +1,9 @@
 package org.opensearch.migrations;
 
 import java.util.Map;
+import java.util.stream.Stream;
 
+import org.opensearch.migrations.bulkload.SupportedClusters;
 import org.opensearch.migrations.bulkload.common.RestClient;
 import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
 import org.opensearch.migrations.bulkload.http.SearchClusterRequests;
@@ -10,30 +12,34 @@
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasEntry;
 
 /**
- * Tests focused on running end to end test cases for Data Generator
+ * Tests focused on running end-to-end test cases for Data Generator
  */
 @Tag("isolatedTest")
 @Slf4j
-class DataGeneratorEndToEnd {
+class DataGeneratorEndToEndTest {
 
-    @Test
-    void generateData_OS_2_X() throws Exception {
-        try (var targetCluster = new SearchClusterContainer(SearchClusterContainer.OS_LATEST)) {
-            generateData(targetCluster);
-        }
+    private static Stream<Arguments> scenarios() {
+        return SupportedClusters.supportedSourcesOrTargets(true)
+            .stream()
+            // Exclude ES 5 from DataGenerator as not currently supported
+            .filter(version -> !VersionMatchers.isES_5_X.test(version.getVersion()))
+            .map(Arguments::of);
     }
 
-    @Test
-    void generateData_ES_6_8() throws Exception {
-        try (var targetCluster = new SearchClusterContainer(SearchClusterContainer.ES_V6_8_23)) {
-            generateData(targetCluster);
+    @ParameterizedTest(name = "Cluster {0}")
+    @MethodSource(value = "scenarios")
+    void generateData(SearchClusterContainer.ContainerVersion version) {
+        try (var cluster = new SearchClusterContainer(version)) {
+            generateData(cluster);
         }
     }
 
@@ -7,6 +7,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
 
+import org.opensearch.migrations.VersionMatchers;
 import org.opensearch.migrations.bulkload.common.FileSystemRepo;
 import org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator;
 import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
@@ -35,11 +36,8 @@ public class EndToEndTest extends SourceTestBase {
     private File localDirectory;
 
     private static Stream<Arguments> scenarios() {
-        var scenarios = Stream.<Arguments>builder();
-        for (var migrationPair : SupportedClusters.supportedPairs(true)) {
-            scenarios.add(Arguments.of(migrationPair.source(), migrationPair.target()));
-        }
-        return scenarios.build();
+        return SupportedClusters.supportedPairs(true).stream()
+            .map(migrationPair -> Arguments.of(migrationPair.source(), migrationPair.target()));
     }
 
     @ParameterizedTest(name = "Source {0} to Target {1}")
@@ -120,18 +118,25 @@ private void migrationDocumentsWithClusters(
 
         // === ACTION: Migrate the documents ===
         var runCounter = new AtomicInteger();
-        final var clockJitter = new Random(1);
+        var clockJitter = new Random(1);
 
+        var transformationConfig = VersionMatchers.isES_5_X.or(VersionMatchers.isES_6_X)
+                .test(targetCluster.getContainerVersion().getVersion()) ?
+                "[{\"NoopTransformerProvider\":{}}]" // skip transformations including doc type removal
+                : null;
+
         // ExpectedMigrationWorkTerminationException is thrown on completion.
         var expectedTerminationException = waitForRfsCompletion(() -> migrateDocumentsSequentially(
-            sourceRepo,
-            snapshotName,
-            List.of(),
-            targetCluster,
-            runCounter,
-            clockJitter,
-            testDocMigrationContext,
-            sourceCluster.getContainerVersion().getVersion()
+                sourceRepo,
+                snapshotName,
+                List.of(),
+                targetCluster,
+                runCounter,
+                clockJitter,
+                testDocMigrationContext,
+                sourceCluster.getContainerVersion().getVersion(),
+                targetCluster.getContainerVersion().getVersion(),
+                transformationConfig
        ));
 
         Assertions.assertEquals(numberOfShards + 1, expectedTerminationException.numRuns);
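For ES 5.x/6.x targets, the default document transform would strip the legacy doc type that these assertions rely on, so the test swaps in a pass-through pipeline; passing null keeps the production default. Unescaped, the config string is simply:

    [ { "NoopTransformerProvider": {} } ]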
@@ -9,6 +9,7 @@
 import java.util.stream.Stream;
 
 import org.opensearch.migrations.CreateSnapshot;
+import org.opensearch.migrations.VersionMatchers;
 import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
 import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
 import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
@@ -40,17 +41,21 @@ public class LeaseExpirationTest extends SourceTestBase {
     public static final String TARGET_DOCKER_HOSTNAME = "target";
 
     private static Stream<Arguments> testParameters() {
-        List<Boolean> forceMoreSegmentsValues = List.of(false, true);
-        List<SearchClusterContainer.ContainerVersion> sourceClusterVersions = List.of(SearchClusterContainer.ES_V6_8_23, SearchClusterContainer.ES_V7_10_2);
-
-        return forceMoreSegmentsValues.stream()
-            .flatMap(force -> sourceClusterVersions.stream()
-                .map(version -> Arguments.of(force, version)));
+        return Stream.concat(
+            // Test with all pairs with forceMoreSegments=false
+            SupportedClusters.supportedPairs(true).stream()
+                .map(migrationPair ->
+                    Arguments.of(false, migrationPair.source(), migrationPair.target())),
+            // Add test for ES 7 -> OS 2 with forceMoreSegments=true
+            Stream.of(Arguments.of(true, SearchClusterContainer.ES_V7_10_2, SearchClusterContainer.OS_V2_19_1))
+        );
     }
 
-    @ParameterizedTest(name = "forceMoreSegments={0}, sourceClusterVersion={1}")
+    @ParameterizedTest(name = "forceMoreSegments={0}, sourceClusterVersion={1}, targetClusterVersion={2}")
     @MethodSource("testParameters")
-    public void testProcessExitsAsExpected(boolean forceMoreSegments, SearchClusterContainer.ContainerVersion sourceClusterVersion) {
+    public void testProcessExitsAsExpected(boolean forceMoreSegments,
+                                           SearchClusterContainer.ContainerVersion sourceClusterVersion,
+                                           SearchClusterContainer.ContainerVersion targetClusterVersion) {
         // Sending 10 docs per request with 2 requests concurrently with each taking 1 second is 40 docs/sec
         // will process 1640 docs in 21 seconds. With 10s lease duration, expect to be finished in 3 leases.
         // This is ensured with the toxiproxy settings, the migration should not be able to be completed
@@ -63,16 +68,20 @@ public void testProcessExitsAsExpected(boolean forceMoreSegments, SearchClusterC
         int continueExitCode = 2;
         int finalExitCodePerShard = 0;
         runTestProcessWithCheckpoint(continueExitCode, (migrationProcessesPerShard - 1) * shards,
-            finalExitCodePerShard, shards, shards, indexDocCount, forceMoreSegments, sourceClusterVersion,
-            d -> runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, sourceClusterVersion
-            ));
+            finalExitCodePerShard, shards, shards, indexDocCount, forceMoreSegments,
+            sourceClusterVersion,
+            targetClusterVersion,
+            d -> runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer,
+                sourceClusterVersion, targetClusterVersion));
     }
 
     @SneakyThrows
     private void runTestProcessWithCheckpoint(int expectedInitialExitCode, int expectedInitialExitCodeCount,
                                               int expectedEventualExitCode, int expectedEventualExitCodeCount,
                                               int shards, int indexDocCount,
-                                              boolean forceMoreSegments, SearchClusterContainer.ContainerVersion sourceClusterVersion,
+                                              boolean forceMoreSegments,
+                                              SearchClusterContainer.ContainerVersion sourceClusterVersion,
+                                              SearchClusterContainer.ContainerVersion targetClusterVersion,
                                               Function<RunData, Integer> processRunner) {
         final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking();
 
@@ -83,7 +92,7 @@ private void runTestProcessWithCheckpoint(int expectedInitialExitCode, int expec
         var esSourceContainer = new SearchClusterContainer(sourceClusterVersion)
             .withAccessToHost(true);
         var network = Network.newNetwork();
-        var osTargetContainer = new SearchClusterContainer(SearchClusterContainer.OS_LATEST)
+        var osTargetContainer = new SearchClusterContainer(targetClusterVersion)
             .withAccessToHost(true)
             .withNetwork(network)
             .withNetworkAliases(TARGET_DOCKER_HOSTNAME);
@@ -129,6 +138,9 @@ private void runTestProcessWithCheckpoint(int expectedInitialExitCode, int expec
         // Segments will be created on each refresh which tests segment ordering logic
         workloadOptions.setRefreshAfterEachWrite(forceMoreSegments);
         workloadOptions.setMaxBulkBatchSize(forceMoreSegments ? 10 : 1000);
+        if (VersionMatchers.isES_5_X.or(VersionMatchers.isES_6_X).test(sourceClusterVersion.getVersion())) {
+            workloadOptions.setDefaultDocType("myType");
+        }
         generator.generate(workloadOptions);
 
         // Create the snapshot from the source cluster
@@ -149,12 +161,8 @@ private void runTestProcessWithCheckpoint(int expectedInitialExitCode, int expec
         do {
             exitCode = processRunner.apply(new RunData(tempDirSnapshot, tempDirLucene, proxyContainer));
             runs++;
-            if (exitCode == expectedInitialExitCode) {
-                initialExitCodeCount++;
-            }
-            if (exitCode == expectedEventualExitCode) {
-                finalExitCodeCount++;
-            }
+            initialExitCodeCount += exitCode == expectedInitialExitCode ? 1 : 0;
+            finalExitCodeCount += exitCode == expectedEventualExitCode ? 1 : 0;
             log.atInfo().setMessage("Process exited with code: {}").addArgument(exitCode).log();
             // Clean tree for subsequent run
             deleteTree(tempDirLucene);
@@ -192,7 +200,8 @@ private static int runProcessAgainstToxicTarget(
         Path tempDirSnapshot,
         Path tempDirLucene,
         ToxiProxyWrapper proxyContainer,
-        SearchClusterContainer.ContainerVersion sourceClusterVersion
+        SearchClusterContainer.ContainerVersion sourceClusterVersion,
+        SearchClusterContainer.ContainerVersion targetClusterVersion
     ) {
         String targetAddress = proxyContainer.getProxyUriAsString();
         var tp = proxyContainer.getProxy();
@@ -96,7 +96,9 @@ public void testDocumentMigration(
             runCounter,
             clockJitter,
             testDocMigrationContext,
-            sourceVersion.getVersion()
+            sourceVersion.getVersion(),
+            targetVersion.getVersion(),
+            null
         ),
         executorService
     )
@@ -14,6 +14,7 @@
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -185,7 +186,9 @@ public static int migrateDocumentsSequentially(
         AtomicInteger runCounter,
         Random clockJitter,
         DocumentMigrationTestContext testContext,
-        Version sourceVersion
+        Version sourceVersion,
+        Version targetVersion,
+        String transformationConfig
     ) {
         for (int runNumber = 1; ; ++runNumber) {
             try {
@@ -197,7 +200,8 @@
                     clockJitter,
                     testContext,
                     sourceVersion,
-                    target.getContainerVersion().getVersion()
+                    targetVersion,
+                    transformationConfig
                 );
                 if (workResult == DocumentsRunner.CompletionStatus.NOTHING_DONE) {
                     return runNumber;
@@ -230,7 +234,8 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
         Random clockJitter,
         DocumentMigrationTestContext context,
         Version sourceVersion,
-        Version targetVersion
+        Version targetVersion,
+        String transformationConfig
     ) throws RfsMigrateDocuments.NoWorkLeftException {
         var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene");
         var shouldThrow = new AtomicBoolean();
@@ -269,7 +274,11 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
             return reader;
         });
 
-        var defaultDocTransformer = new TransformationLoader().getTransformerFactoryLoader(RfsMigrateDocuments.DEFAULT_DOCUMENT_TRANSFORMATION_CONFIG);
+        var docTransformer = new TransformationLoader().getTransformerFactoryLoader(
+            Optional.ofNullable(transformationConfig).orElse(
+                RfsMigrateDocuments.DEFAULT_DOCUMENT_TRANSFORMATION_CONFIG
+            ));
 
         AtomicReference<WorkItemCursor> progressCursor = new AtomicReference<>();
         var coordinatorFactory = new WorkCoordinatorFactory(targetVersion);
@@ -290,7 +299,7 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
         var clientFactory = new OpenSearchClientFactory(connectionContext);
         return RfsMigrateDocuments.run(
             readerFactory,
-            new DocumentReindexer(clientFactory.determineVersionAndCreate(), 1000, Long.MAX_VALUE, 1, () -> defaultDocTransformer),
+            new DocumentReindexer(clientFactory.determineVersionAndCreate(), 1000, Long.MAX_VALUE, 1, () -> docTransformer),
             progressCursor,
             workCoordinator,
             Duration.ofMinutes(10),
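The Optional.ofNullable fallback keeps existing callers, which pass null, on the default pipeline while letting tests inject their own. A hedged summary of the two paths, using only names that appear in this diff:

    // transformationConfig == null -> RfsMigrateDocuments.DEFAULT_DOCUMENT_TRANSFORMATION_CONFIG
    // transformationConfig == "[{\"NoopTransformerProvider\":{}}]" -> pass-through transform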
@@ -95,7 +95,9 @@ public void migrateFromUpgrade(
             counter,
             clockJitter,
             testDocMigrationContext,
-            sourceCluster.getContainerVersion().getVersion()));
+            sourceVersion.getVersion(),
+            targetVersion.getVersion(),
+            null));
         assertThat("5 shards + 1 nothing to do worker should spin up", result.numRuns, equalTo(6));
 
         var targetOperations = new ClusterOperations(targetCluster);
@@ -32,8 +32,11 @@ class CustomTransformationTest extends BaseMigrationTest {
 
     private static Stream<Arguments> scenarios() {
         // Transformations are differentiated only by source, so lock to a specific target.
-        var target = SupportedClusters.targets().stream().limit(1).findFirst().get();
-        return SupportedClusters.sources().stream()
+        var target = SearchClusterContainer.OS_LATEST;
+        return SupportedClusters.supportedSources(false)
+            .stream()
+            // The TypeMappingSanitizationTransformerProvider is not supported on 2.x source
+            .filter(source -> !VersionMatchers.isOS_2_X.test(source.getVersion()))
             .map(sourceCluster -> Arguments.of(sourceCluster, target));
     }
 
@@ -10,6 +10,7 @@
 import org.opensearch.migrations.VersionMatchers;
 import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
 import org.opensearch.migrations.bulkload.common.http.HttpResponse;
+import org.opensearch.migrations.bulkload.version_es_5_6.OpenSearchClient_ES_5_6;
 import org.opensearch.migrations.bulkload.version_es_6_8.OpenSearchClient_ES_6_8;
 import org.opensearch.migrations.bulkload.version_os_2_11.OpenSearchClient_OS_2_11;
 import org.opensearch.migrations.reindexer.FailedRequestsLogger;
@@ -66,11 +67,12 @@ public OpenSearchClient determineVersionAndCreate(RestClient restClient, FailedR
     private Class<? extends OpenSearchClient> getOpenSearchClientClass(Version version) {
         if (VersionMatchers.isOS_1_X.or(VersionMatchers.isOS_2_X).or(VersionMatchers.isES_7_X).test(version)) {
             return OpenSearchClient_OS_2_11.class;
-        } else if (VersionMatchers.isES_6_X.or(VersionMatchers.isES_5_X).test(version)) {
+        } else if (VersionMatchers.isES_6_X.test(version)) {
             return OpenSearchClient_ES_6_8.class;
-        } else {
-            throw new IllegalArgumentException("Unsupported version: " + version);
+        } else if (VersionMatchers.isES_5_X.test(version)) {
+            return OpenSearchClient_ES_5_6.class;
         }
+        throw new IllegalArgumentException("Unsupported version: " + version);
     }
 
     /** Amazon OpenSearch Serverless cluster don't have a version number, but
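The resulting version dispatch, summarized; the ES 5.x branch and OpenSearchClient_ES_5_6 are the new behavior, the rest is unchanged:

    // ES 7.x, OS 1.x, OS 2.x -> OpenSearchClient_OS_2_11
    // ES 6.x                 -> OpenSearchClient_ES_6_8
    // ES 5.x                 -> OpenSearchClient_ES_5_6  (new)
    // anything else          -> IllegalArgumentException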