Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.testcontainers.lifecycle.Startables;


@Tag("isolatedTest")
public class EndToEndTest extends SourceTestBase {
@TempDir
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ private static Stream<Arguments> scenarios() {
return SupportedClusters.supportedSources(false)
.stream()
// The TypeMappingSanitizationTransformerProvider is not supported on 2.x source
.filter(source -> !VersionMatchers.isOS_2_X.test(source.getVersion()))
.filter(source -> !VersionMatchers.isOS_2_X.test(source.getVersion())
&& !VersionMatchers.isES_8_X.test(source.getVersion()))
.map(sourceCluster -> Arguments.of(sourceCluster, target));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class EndToEndTest extends BaseMigrationTest {

private static Stream<Arguments> scenarios() {
return SupportedClusters.supportedPairs(false).stream()
.filter(pair -> !VersionMatchers.isES_8_X.test(pair.source().getVersion()))
.flatMap(pair -> {
List<TemplateType> templateTypes = Stream.concat(
(VersionMatchers.isOS_2_X.test(pair.source().getVersion())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public ObjectNode getRequestBodyForRegisterRepo() {
// Assemble the request body
ObjectNode settings = mapper.createObjectNode();
settings.put("location", snapshotRepoDirectoryPath);

settings.put("compress", false);
ObjectNode body = mapper.createObjectNode();
body.put("type", "fs");
body.set("settings", settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public OpenSearchClient determineVersionAndCreate(RestClient restClient, FailedR
}

private Class<? extends OpenSearchClient> getOpenSearchClientClass(Version version) {
if (VersionMatchers.isOS_1_X.or(VersionMatchers.isOS_2_X).or(VersionMatchers.isES_7_X).test(version)) {
if (VersionMatchers.isOS_1_X.or(VersionMatchers.isOS_2_X).or(VersionMatchers.isES_7_X).or(VersionMatchers.isES_8_X).test(version)) {
return OpenSearchClient_OS_2_11.class;
} else if (VersionMatchers.isES_6_X.test(version)) {
return OpenSearchClient_ES_6_8.class;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.opensearch.migrations.bulkload.lucene.version_9;

import java.io.IOException;
import java.util.Iterator;

import shadow.lucene9.org.apache.lucene.codecs.FieldsConsumer;
import shadow.lucene9.org.apache.lucene.codecs.FieldsProducer;
import shadow.lucene9.org.apache.lucene.codecs.PostingsFormat;
import shadow.lucene9.org.apache.lucene.index.SegmentReadState;
import shadow.lucene9.org.apache.lucene.index.SegmentWriteState;
import shadow.lucene9.org.apache.lucene.index.Terms;
import shadow.lucene9.org.apache.lucene.store.Directory;

/**
* PostingsFormat fallback for Elasticsearch 8.12+ segment formats.
*
* <p>This class provides a dummy implementation for "ES812Postings" to avoid runtime
* errors when Lucene 9 attempts to load this postings format from snapshot-based
* segment metadata during document migration.</p>
*
* <p>This migration assistant does not support reading real ES 8.x segment data that uses
* proprietary `.psm` files. If any `.psm` file is detected, we fail fast with a clear error
* message. Otherwise, an empty FieldsProducer is returned to simulate a no-op reader.</p>
*
* <p>Registered via Lucene's SPI to allow dynamic loading based on PostingsFormat name
* stored in segment metadata.</p>
*
* <p><b>NOTE:</b> This class is intentionally limited to fallback behavior and not meant
* to parse actual ES 8.x Lucene segments.</p>
*/
public class IgnorePsmPostings extends PostingsFormat {

public IgnorePsmPostings() {
super("ES812Postings");
}

public FieldsConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
throw new UnsupportedOperationException("ES812Postings is read-only fallback");
}

@Override
public FieldsProducer fieldsProducer(SegmentReadState state) throws IOException {
Directory dir = state.directory;
for (String file : dir.listAll()) {
if (file.endsWith(".psm")) {
throw new UnsupportedOperationException(
"Detected .psm file in segment, which is not supported by the migration assistant. " +
"Your index may be using a newer/proprietary ES format. Migration cannot proceed."
);
}
}
return new FieldsProducer() {
@Override public void close() {}
@Override public void checkIntegrity() {}
@Override public Iterator<String> iterator() {
return java.util.Collections.emptyIterator();
}
@Override public Terms terms(String field) {
return null;
}
@Override public int size() {
return 0;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class SnapshotReader_ES_7_10 implements ClusterSnapshotReader {
@Override
public boolean compatibleWith(Version version) {
return VersionMatchers.equalOrGreaterThanES_7_10
.or(VersionMatchers.isES_8_X)
.or(VersionMatchers.isOS_1_X)
.or(VersionMatchers.isOS_2_X)
.test(version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public OpenSearchWorkCoordinator get(
long tolerableClientServerClockDifferenceSeconds,
String workerId
) {
if (VersionMatchers.isOS_1_X.or(VersionMatchers.isOS_2_X).or(VersionMatchers.isES_7_X).test(version)) {
if (VersionMatchers.isOS_1_X.or(VersionMatchers.isOS_2_X).or(VersionMatchers.isES_7_X).or(VersionMatchers.isES_8_X).test(version)) {
return new OpenSearchWorkCoordinator_OS_2_11(httpClient, tolerableClientServerClockDifferenceSeconds, workerId);
} else if (VersionMatchers.isES_6_X.or(VersionMatchers.isES_5_X).test(version)) {
return new OpenSearchWorkCoordinator_ES_6_8(httpClient, tolerableClientServerClockDifferenceSeconds, workerId);
Expand All @@ -38,7 +38,7 @@ public OpenSearchWorkCoordinator get(
Clock clock,
Consumer<WorkItemAndDuration> workItemConsumer
) {
if (VersionMatchers.isOS_1_X.or(VersionMatchers.isOS_2_X).or(VersionMatchers.isES_7_X).test(version)) {
if (VersionMatchers.isOS_1_X.or(VersionMatchers.isOS_2_X).or(VersionMatchers.isES_7_X).or(VersionMatchers.isES_8_X).test(version)) {
return new OpenSearchWorkCoordinator_OS_2_11(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, clock, workItemConsumer);
} else if (VersionMatchers.isES_6_X.or(VersionMatchers.isES_5_X).test(version)) {
return new OpenSearchWorkCoordinator_ES_6_8(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, clock, workItemConsumer);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.opensearch.migrations.bulkload.lucene.version_9.IgnorePsmPostings
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public static List<ContainerVersion> sources() {
SearchClusterContainer.ES_V6_8_23,
SearchClusterContainer.ES_V7_10_2,
SearchClusterContainer.ES_V7_17,
SearchClusterContainer.ES_V8_17,
SearchClusterContainer.OS_V1_3_16
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public class SearchClusterContainer extends GenericContainer<SearchClusterContai
"docker.elastic.co/elasticsearch/elasticsearch:7.17.22",
Version.fromString("ES 7.17.22")
);
public static final ContainerVersion ES_V8_17 = new Elasticsearch8Version(
"docker.elastic.co/elasticsearch/elasticsearch:8.17.5",
Version.fromString("ES 8.17.5")
);
public static final ContainerVersion ES_V7_10_2 = new ElasticsearchOssVersion(
"docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2",
Version.fromString("ES 7.10.2")
Expand Down Expand Up @@ -65,26 +69,40 @@ private enum INITIALIZATION_FLAVOR {
BASE(Map.of("discovery.type", "single-node",
"path.repo", CLUSTER_SNAPSHOT_DIR,
"ES_JAVA_OPTS", "-Xms2g -Xmx2g",
"index.store.type", "mmapfs",
"bootstrap.system_call_filter", "false"
"index.store.type", "mmapfs"
)),
ELASTICSEARCH(
new ImmutableMap.Builder<String, String>().putAll(BASE.getEnvVariables())
.put("xpack.security.enabled", "false")
.put("bootstrap.system_call_filter", "false")
.build()),
ELASTICSEARCH_OSS(
new ImmutableMap.Builder<String, String>().putAll(BASE.getEnvVariables())
.put("bootstrap.system_call_filter", "false")
.build()),
ELASTICSEARCH_8(
new ImmutableMap.Builder<String, String>().putAll(BASE.getEnvVariables())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we put all from ELASTICSEARCH then just add the ones that are specifically needed for 8

Copy link
Collaborator Author

@jugal-chauhan jugal-chauhan May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main concern here is that ES 8 does not accept setting bootstrap.system_call_filter as false, whereas other elasticsearch flavors support it.

.put("xpack.security.enabled", "false")
.put("xpack.security.enrollment.enabled", "false")
.put("xpack.security.http.ssl.enabled", "false")
.put("xpack.security.transport.ssl.enabled", "false")
.put("cluster.name", "docker-test-cluster")
.put("node.name", "test-node")
.put("xpack.ml.enabled", "false")
.put("xpack.watcher.enabled", "false")
.build()),
OPENSEARCH(
new ImmutableMap.Builder<String, String>().putAll(BASE.getEnvVariables())
.put("plugins.security.disabled", "true")
.put("OPENSEARCH_INITIAL_ADMIN_PASSWORD", "SecurityIsDisabled123$%^")
.put("bootstrap.system_call_filter", "false")
.build()),
OPENSEARCH_2_19(
new ImmutableMap.Builder<String, String>().putAll(BASE.getEnvVariables())
.put("plugins.security.disabled", "true")
.put("OPENSEARCH_INITIAL_ADMIN_PASSWORD", "SecurityIsDisabled123$%^")
.put("search.insights.top_queries.exporter.type", "debug")
.put("bootstrap.system_call_filter", "false")
.build()
);

Expand Down Expand Up @@ -240,6 +258,12 @@ public ElasticsearchVersion(String imageName, Version version) {
}
}

public static class Elasticsearch8Version extends ContainerVersion {
public Elasticsearch8Version(String imageName, Version version) {
super(imageName, version, INITIALIZATION_FLAVOR.ELASTICSEARCH_8, "elasticsearch");
}
}

public static class OpenSearchVersion extends ContainerVersion {
public OpenSearchVersion(String imageName, Version version) {
super(imageName, version, VersionMatchers.isOS_2_19.test(version) ? INITIALIZATION_FLAVOR.OPENSEARCH_2_19 : INITIALIZATION_FLAVOR.OPENSEARCH, "opensearch");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,31 @@ public void createIndex(final String index) {
createIndex(index, body);
}

// Bloom Filter is a relatively new default index setting introduced in ES 8x
@SneakyThrows
public void disableBloom(final String index) {
final String body = "{" +
" \"index\": {" +
" \"bloom_filter_for_id_field\": {" +
" \"enabled\": false" +
" }" +
" }" +
"}";

var response = put("/" + index + "/_settings", body);
assertThat(response.getKey(), equalTo(200));
}

@SneakyThrows
public void createIndex(final String index, final String body) {
var response = put("/" + index, body);
assertThat(response.getKey(), anyOf(equalTo(201), equalTo(200)));

// Automatically apply ES 8.x specific index tweaks
if (VersionMatchers.isES_8_X.test(clusterVersion)) {
log.info("Cluster is ES 8.x — applying disableBloom setting on index: {}", index);
disableBloom(index);
}
}

@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class VersionMatchers {
public static final Predicate<Version> isES_6_X = VersionMatchers.matchesMajorVersion(Version.fromString("ES 6.8"));
public static final Predicate<Version> isES_7_X = VersionMatchers.matchesMajorVersion(Version.fromString("ES 7.10"));
public static final Predicate<Version> isES_7_10 = VersionMatchers.matchesMinorVersion(Version.fromString("ES 7.10.2"));
public static final Predicate<Version> isES_8_X = VersionMatchers.matchesMajorVersion(Version.fromString("ES 8.17"));
public static final Predicate<Version> equalOrGreaterThanES_7_10 = VersionMatchers.equalOrGreaterThanMinorVersion(Version.fromString("ES 7.10"));

public static final Predicate<Version> isOS_1_X = VersionMatchers.matchesMajorVersion(Version.fromString("OS 1.0.0"));
Expand Down Expand Up @@ -45,7 +46,6 @@ private static Predicate<Version> matchesMajorVersion(final Version version) {
}
var flavorMatches = compatibleFlavor(other.getFlavor()).test(version.getFlavor());
var majorVersionNumberMatches = version.getMajor() == other.getMajor();

return flavorMatches && majorVersionNumberMatches;
};
}
Expand Down Expand Up @@ -79,4 +79,5 @@ private static Predicate<Version> belowMajorVersion(final Version version) {
var isLowerMajorVersion = other.getMajor() < version.getMajor();
return flavorMatches && isLowerMajorVersion;
};
}}
}
}
Loading