Member: Whitespace only, let's revert?

@@ -31,9 +31,9 @@ public MigrateResult execute(RootMetadataMigrationContext context) {
} catch (ParameterException pe) {
log.atError().setCause(pe).setMessage("Invalid parameter").log();
migrateResult
.exitCode(INVALID_PARAMETER_CODE)
.errorMessage("Invalid parameter: " + pe.getMessage())
.build();
.exitCode(INVALID_PARAMETER_CODE)
Collaborator Author: Ugh, this is my IDE being out of sync with the usual settings. I'll go through and clean up the indents.

Member (@peternied, Mar 28, 2025): I had a PR once upon a time for an IntelliJ formatter [1]; there is a VS Code one in the project too. Maybe that could help. 🤷

.errorMessage("Invalid parameter: " + pe.getMessage())
.build();
} catch (Throwable e) {
log.atError().setCause(e).setMessage("Unexpected failure").log();
migrateResult
@@ -36,6 +36,8 @@ public abstract class MigratorEvaluatorBase {
static final int INVALID_PARAMETER_CODE = 999;
static final int UNEXPECTED_FAILURE_CODE = 888;

static final int MAX_REPLICA_ADJUSTMENT_LOOPS = 4;

protected final MigrateOrEvaluateArgs arguments;
protected final ClusterReaderExtractor clusterReaderCliExtractor;

@@ -68,19 +70,23 @@ protected Transformer getCustomTransformer() {
return new TransformerToIJsonTransformerAdapter(transformer);
}

protected Transformer selectTransformer(Clusters clusters) {
protected Transformer selectTransformer(Clusters clusters, int presumedClusterDimensionality) {
var versionTransformer = TransformFunctions.getTransformer(
clusters.getSource().getVersion(),
clusters.getTarget().getVersion(),
arguments.minNumberOfReplicas,
arguments.metadataTransformationParams
clusters.getSource().getVersion(),
clusters.getTarget().getVersion(),
presumedClusterDimensionality,
arguments.metadataTransformationParams
);
var customTransformer = getCustomTransformer();
var compositeTransformer = new FanOutCompositeTransformer(customTransformer, versionTransformer);
log.atInfo().setMessage("Selected transformer: {}").addArgument(compositeTransformer).log();
return compositeTransformer;
}

protected Transformer selectTransformer(Clusters clusters) {
return selectTransformer(clusters, arguments.minNumberOfReplicas);
}

protected Items migrateAllItems(MigrationMode migrationMode, Clusters clusters, Transformer transformer, RootMetadataMigrationContext context) {
var items = Items.builder();
items.dryRun(migrationMode.equals(MigrationMode.SIMULATE));
@@ -118,16 +124,33 @@ private GlobalMetadataCreatorResults migrateGlobalMetadata(MigrationMode mode, C
return metadataResults;
}

private IndexMetadataResults migrateIndices(MigrationMode mode, Clusters clusters, Transformer transformer, RootMetadataMigrationContext context) {
var indexRunner = new IndexRunner(
arguments.snapshotName,
clusters.getSource().getIndexMetadata(),
clusters.getTarget().getIndexCreator(),
transformer,
arguments.dataFilterArgs.indexAllowlist
);
var indexResults = indexRunner.migrateIndices(mode, context.createIndexContext());
log.info("Index copy complete.");
return indexResults;
private IndexMetadataResults migrateIndices(MigrationMode mode, Clusters clusters, Transformer initialTransformer, RootMetadataMigrationContext context) {
int presumedClusterDimensionality = arguments.minNumberOfReplicas;
var transformer = initialTransformer;
while (true) {
var indexRunner = new IndexRunner(
arguments.snapshotName,
clusters.getSource().getIndexMetadata(),
clusters.getTarget().getIndexCreator(),
transformer,
arguments.dataFilterArgs.indexAllowlist
);
var indexResults = indexRunner.migrateIndices(mode, context.createIndexContext());
// Check whether any indices failed with an incompatibleReplicaCount
boolean incompatibleReplicaCountSeen = indexResults.getIndexes().stream().anyMatch(result -> result.wasFatal() && result.getFailureType().equals(CreationResult.CreationFailureType.INCOMPATIBLE_REPLICA_COUNT_FAILURE));
if (incompatibleReplicaCountSeen) {
if (presumedClusterDimensionality >= arguments.minNumberOfReplicas + MAX_REPLICA_ADJUSTMENT_LOOPS) {
Member: Given cluster.routing.allocation.awareness.force.zone.values can be arbitrarily high, can we parse out the response and skip straight to the next-higher valid value?

Collaborator Author: Yes, we definitely could. I went with the approach that we defined in the ticket refinement, but this would also be possible.
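To make that alternative concrete, here is a rough sketch (not part of this PR; the helper name and the idea of reusing the bracketed attribute count from the validation message are assumptions):

```java
// Hypothetical helper, not in this diff: given the awareness attribute count parsed
// from the validation message (the bracketed number at the end), jump straight to the
// smallest replica count whose total copy count (primary + replicas) is a multiple of it.
static int nextValidReplicaCount(int currentReplicas, int awarenessAttributeCount) {
    int copies = currentReplicas + 1;                      // primary plus replicas
    int roundedUp = ((copies + awarenessAttributeCount - 1) / awarenessAttributeCount)
            * awarenessAttributeCount;                     // round copies up to the next multiple
    return roundedUp - 1;                                  // convert back to a replica count
}
```

For example, 2 replicas (3 copies) against 4 awareness attribute values would jump directly to 3 replicas (4 copies) rather than incrementing once per retry loop.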

log.atWarn().setMessage("Incompatible replica count seen after max adjustment attempts ({}). Max replica count attempted: {}")
.addArgument(MAX_REPLICA_ADJUSTMENT_LOOPS).addArgument(presumedClusterDimensionality - 1).log();
return indexResults;
}
presumedClusterDimensionality++;
log.warn("Incompatible replica count seen for the cluster dimensionality. Retrying with an assumed cluster dimensionality of {}", presumedClusterDimensionality);
transformer = selectTransformer(clusters, presumedClusterDimensionality);
Member: This is kind of spooky: we theoretically 'switch' which transformer is being used mid-flight based on behavior. Can we replumb how this value gets passed around to avoid this high-level swap?

continue;
}
log.info("Index copy complete.");
return indexResults;
}
}
}
@@ -0,0 +1,78 @@
package org.opensearch.migrations;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.bulkload.framework.SearchClusterWithZoneAwarenessContainer;
import org.opensearch.migrations.commands.MigrationItemResult;

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

@Tag("isolatedTest")
public class ReplicaCountWithAZsTest extends BaseMigrationTest {

private static final SearchClusterContainer.ContainerVersion SOURCE_VERSION = SearchClusterContainer.ES_V7_10_2;
private static final SearchClusterContainer.ContainerVersion TARGET_VERSION = SearchClusterContainer.OS_LATEST;
Member: It's strange that this is unused; maybe the zoned search cluster is a static class for only this test case.


private static Stream<Arguments> scenarios() {
return Stream.of(
Arguments.of(1),
Arguments.of(2),
Arguments.of(3)
);
}
Member (on lines +26 to +32): I don't think there is a test coverage benefit to running this more than once; seems like one test that detects this scenario would be good enough?

Collaborator Author: I think the reason to do it is primarily that these match the scenarios customers may have, and the behavior, at least for 1 AZ vs. more than one, is different. I'm open to removing the 2 AZ case.

Member: 1 AZ doesn't seem like an interesting case, because any number will match, no?


@ParameterizedTest(name = "Replica count test with {0} AZs")
@MethodSource(value = "scenarios")
void testReplicaCounts(int availabilityZoneCount) {
try (
final var sourceCluster = new SearchClusterContainer(SOURCE_VERSION);
final var targetCluster = new SearchClusterWithZoneAwarenessContainer(availabilityZoneCount)
) {
this.sourceCluster = sourceCluster;
this.targetCluster = targetCluster;
startClusters();

// Create indices on source cluster with specified shard and replica counts
// Shard count is 1 or 5, replica count is 0 through 3
AtomicInteger createdIndexCount = new AtomicInteger();
var indexNames = new ArrayList<String>();
List.of(1, 5).forEach(
shardCount -> {
List.of(0, 1, 2, 3).forEach(replicaCount -> {
var name = "index_" + shardCount + "_" + replicaCount;
var body = "{\"settings\": {\"index\": {\"number_of_replicas\": "+ replicaCount +", \"number_of_shards\": "+ shardCount + "}}}";
sourceOperations.createIndex(name, body);
indexNames.add(name);
createdIndexCount.getAndIncrement();
});
}
);

MigrateOrEvaluateArgs arguments = new MigrateOrEvaluateArgs();
arguments.sourceArgs.host = sourceCluster.getUrl();
arguments.targetArgs.host = targetCluster.getUrl();

MigrationItemResult result = executeMigration(arguments, MetadataCommands.MIGRATE);
assertThat(result.getExitCode(), equalTo(0));
// Ensure that the same number of indices were created (successfully) on the target cluster
verifyIndexesExistOnTargetCluster(indexNames);
}
}

void verifyIndexesExistOnTargetCluster(List<String> indexNames) {
Member: Doesn't look like the new INCOMPATIBLE_REPLICA_COUNT_FAILURE is encountered during this test; is that scenario possible?

Collaborator Author: Not for the number of zones + max number of loops we're testing here. I could set up a test with 5+ zones that would cause it to be emitted, but I didn't because (1) that's going to be a very slow test, and (2) while it's a possible situation for a self-managed cluster, I don't think it's possible in AOS (and it's unlikely no matter what).
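For a rough sense of the arithmetic (illustrative numbers, not taken from the PR): the target rejects an index unless its total copy count (primary plus replicas) is a multiple of the awareness attribute count, so in a 6-zone cluster a single-primary index needs at least 5 replicas before creation succeeds, while with 2 or 3 zones a compatible value is only one or two adjustments away.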

for (String indexName : indexNames) {
var res = targetOperations.get("/" + indexName);
assertThat(res.getValue(), res.getKey(), equalTo(200));
}
}
}
@@ -0,0 +1,7 @@
package org.opensearch.migrations.bulkload.common;

public class IncompatibleReplicaCountException extends Exception {
public IncompatibleReplicaCountException(String message, Throwable cause) {
super(message, cause);
}
}
@@ -17,6 +17,7 @@
@Slf4j
public class InvalidResponse extends RfsException {
private static final Pattern UNKNOWN_SETTING = Pattern.compile("unknown setting \\[([a-zA-Z0-9_.-]+)\\].+");
private static final Pattern AWARENESS_ATTRIBUTE_EXCEPTION = Pattern.compile("expected total copies needs to be a multiple of total awareness attributes");
private static final ObjectMapper objectMapper = new ObjectMapper();
private final transient HttpResponse response;

@@ -85,4 +86,36 @@ private static Map.Entry<String, String> getUnknownSetting(JsonNode json) {
return Map.entry(entry.getKey().asText(), matcher.group(1));
}).orElse(null);
}

/** Awareness attribute exceptions (when the replica count doesn't match the number of zones) present slightly differently
in different versions. The message is the same (`"Validation Failed: 1: expected total copies needs to be a multiple of total awareness attributes [3];"`,
for instance), but the type is different (either `validation_exception` or `illegal_argument_exception`). For this reason,
we're matching based on a regex against the message instead of also checking the type.
**/
public Optional<String> containsAwarenessAttributeException() {
Member: Does the message need to be parsed and returned by this function? That would aid the user if we displayed it to them. If not, this function could be reduced to returning a boolean value.

Collaborator Author: I should revisit exactly what we're doing with it down the line, but the reason I kept the whole thing is that it looks like `Validation Failed: 1: expected total copies needs to be a multiple of total awareness attributes [3];` and that number at the end shows the number of awareness attributes, which could be very useful for a user trying to troubleshoot/understand this behavior.

try {
var interimResults = new ArrayList<String>();
var bodyNode = objectMapper.readTree(response.body);

if (bodyNode != null && bodyNode.has("error")) {
JsonNode errorNode = bodyNode.get("error");
JsonNode rootCauses = errorNode.get("root_cause");

if (rootCauses != null && rootCauses.isArray()) {
for (JsonNode cause : rootCauses) {
JsonNode reasonNode = cause.get("reason");
if (reasonNode != null && !reasonNode.isNull()) {
interimResults.add(reasonNode.textValue());
}
}
}
}
interimResults = interimResults.stream().filter(AWARENESS_ATTRIBUTE_EXCEPTION.asPredicate()).collect(Collectors.toCollection(ArrayList::new));

return interimResults.stream().findAny();

} catch (Exception e) {
log.warn("Error parsing error message to attempt recovery: " + response.body, e);
return Optional.empty();
}
}
}
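As a quick illustration of the matching described above, here is a minimal, self-contained sketch (the sample reason string is assumed for demonstration; only the pattern text comes from this diff):

```java
import java.util.regex.Pattern;

class AwarenessMessageMatchDemo {
    // Same pattern text as the AWARENESS_ATTRIBUTE_EXCEPTION field added in this file.
    private static final Pattern AWARENESS_ATTRIBUTE_EXCEPTION =
            Pattern.compile("expected total copies needs to be a multiple of total awareness attributes");

    public static void main(String[] args) {
        // Hypothetical root_cause reason, shaped like the example in the comment above.
        String reason = "Validation Failed: 1: expected total copies needs to be a multiple of total awareness attributes [3];";
        // asPredicate() uses find(), so the pattern only needs to occur somewhere in the reason,
        // regardless of whether the error type was validation_exception or illegal_argument_exception.
        System.out.println(AWARENESS_ATTRIBUTE_EXCEPTION.asPredicate().test(reason)); // prints true
    }
}
```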
@@ -17,24 +17,29 @@ public class TransformFunctions {

private TransformFunctions() {}

/**
* presumedDimensionality is effectively another way of saying "our guess at the number of availability zones the
* target cluster is deployed across". We don't know for sure how many there are, so we start at 1 and then increment
* it until we find a number that works.
*/
public static Transformer getTransformer(
Version sourceVersion,
Version targetVersion,
int dimensionality,
int presumedDimensionality,
MetadataTransformerParams metadataTransformerParams
) {
if (VersionMatchers.isOS_2_X.or(VersionMatchers.isOS_1_X).test(targetVersion)) {
if (VersionMatchers.isES_5_X.test(sourceVersion)) {
return new Transformer_ES_5_6_to_OS_2_11(dimensionality, metadataTransformerParams);
return new Transformer_ES_5_6_to_OS_2_11(presumedDimensionality, metadataTransformerParams);
}
if (VersionMatchers.isES_6_X.test(sourceVersion)) {
return new Transformer_ES_6_8_to_OS_2_11(dimensionality, metadataTransformerParams);
return new Transformer_ES_6_8_to_OS_2_11(presumedDimensionality, metadataTransformerParams);
}
if (VersionMatchers.equalOrGreaterThanES_7_10.test(sourceVersion)) {
return new Transformer_ES_7_10_OS_2_11(dimensionality);
return new Transformer_ES_7_10_OS_2_11(presumedDimensionality);
}
if (VersionMatchers.isOS_1_X.or(VersionMatchers.isOS_2_X).test(sourceVersion)) {
return new Transformer_ES_7_10_OS_2_11(dimensionality);
return new Transformer_ES_7_10_OS_2_11(presumedDimensionality);
}
}
throw new IllegalArgumentException("Unsupported transformation requested for " + sourceVersion + " to " + targetVersion);
Member: You mentioned a long time for the test; that seems strange to me. Is it possible that the exception thrown during creation is causing the retry strategy to be hit? See OpenSearchClient.CREATE_ITEM_EXISTS_RETRY_STRATEGY; it specifically excludes InvalidResponse from being retried.

Collaborator Author: It's actually pretty fast during the failed attempts, but the cluster seems to create indices very slowly when zone awareness is turned on. The cluster logs are showing ~30 seconds between successful index create commands.

Member: That seems like a bug in OpenSearch; is there an issue open for it?

@@ -1,6 +1,7 @@
package org.opensearch.migrations.bulkload.version_os_2_11;

import org.opensearch.migrations.MigrationMode;
import org.opensearch.migrations.bulkload.common.IncompatibleReplicaCountException;
import org.opensearch.migrations.bulkload.common.InvalidResponse;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.models.IndexMetadata;
@@ -46,6 +47,9 @@ public CreationResult create(

try {
createInner(index, mode, context, result, settings, body);
} catch (IncompatibleReplicaCountException e) {
result.failureType(CreationFailureType.INCOMPATIBLE_REPLICA_COUNT_FAILURE);
result.exception(e);
} catch (Exception e) {
result.failureType(CreationFailureType.TARGET_CLUSTER_FAILURE);
result.exception(e);
@@ -58,7 +62,7 @@ private void createInner(IndexMetadata index,
ICreateIndexContext context,
CreationResultBuilder result,
ObjectNode settings,
ObjectNode body) {
ObjectNode body) throws IncompatibleReplicaCountException {
// Create the index; it's fine if it already exists
try {
var alreadyExists = false;
@@ -72,6 +76,13 @@ private void createInner(IndexMetadata index,
result.failureType(CreationFailureType.ALREADY_EXISTS);
}
} catch (InvalidResponse invalidResponse) {
var potentialAwarenessAttributeException = invalidResponse.containsAwarenessAttributeException();
Member: Naming is a little messy; not sure if this was caught in a bulk rename.

Suggested change
var potentialAwarenessAttributeException = invalidResponse.containsAwarenessAttributeException();
var hasInvalidReplicaCountForZoneSize = invalidResponse.containsAwarenessAttributeException();

if (potentialAwarenessAttributeException.isPresent()) {
log.warn("Index creation failed due to awareness attribute exception: " + potentialAwarenessAttributeException.get());
throw new IncompatibleReplicaCountException(potentialAwarenessAttributeException.get(), invalidResponse);

}

var illegalArguments = invalidResponse.getIllegalArguments();

if (illegalArguments.isEmpty()) {
@@ -5,6 +5,7 @@

import org.opensearch.migrations.MigrationMode;
import org.opensearch.migrations.bulkload.common.FilterScheme;
import org.opensearch.migrations.bulkload.common.SnapshotRepo;
import org.opensearch.migrations.bulkload.models.IndexMetadata;
import org.opensearch.migrations.bulkload.transformers.IndexTransformationException;
import org.opensearch.migrations.bulkload.transformers.Transformer;
@@ -31,34 +32,32 @@ public IndexMetadataResults migrateIndices(MigrationMode mode, ICreateIndexConte
var results = IndexMetadataResults.builder();
var skipCreation = FilterScheme.filterByAllowList(indexAllowlist).negate();

repoDataProvider.getIndicesInSnapshot(snapshotName)
.stream()
.forEach(index -> {
List<CreationResult> creationResults;
if (skipCreation.test(index.getName())) {
log.atInfo()
for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(snapshotName)) {
Member: This looks like it shifted from a stream to a for loop; is this needed?

List<CreationResult> creationResults;
if (skipCreation.test(index.getName())) {
log.atInfo()
.setMessage("Index {} was not part of the allowlist and will not be migrated.")
.addArgument(index.getName())
.log();
creationResults = List.of(CreationResult.builder()
creationResults = List.of(CreationResult.builder()
.name(index.getName())
.failureType(CreationFailureType.SKIPPED_DUE_TO_FILTER)
.build());
} else {
creationResults = createIndex(index.getName(), mode, context);
}
} else {
creationResults = createIndex(index.getName(), mode, context);
}

creationResults.forEach(results::index);
creationResults.forEach(results::index);

var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName());
indexMetadata.getAliases().fieldNames().forEachRemaining(alias -> {
var aliasResult = CreationResult.builder().name(alias);
if (!creationResults.isEmpty()) {
aliasResult.failureType(creationResults.get(0).getFailureType());
}
results.alias(aliasResult.build());
});
var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName());
indexMetadata.getAliases().fieldNames().forEachRemaining(alias -> {
var aliasResult = CreationResult.builder().name(alias);
if (!creationResults.isEmpty()) {
aliasResult.failureType(creationResults.get(0).getFailureType());
}
results.alias(aliasResult.build());
});
}
return results.build();
}

@@ -30,6 +30,7 @@ public enum CreationFailureType {
ALREADY_EXISTS(false, "already exists"),
UNABLE_TO_TRANSFORM_FAILURE(true, "failed to transform to the target version"),
TARGET_CLUSTER_FAILURE(true, "failed on target cluster"),
INCOMPATIBLE_REPLICA_COUNT_FAILURE(true, "failed due to incompatible replica count for awareness attribute count"),
Member: Can we include the CLI value to add to resolve this for our users?

Member: Revisiting after finishing the review; maybe this doesn't get sent to users?

Collaborator Author: It will only get sent to users if we can't find a working replica count after 4 increments. By the CLI value to add, what do you mean? The right replica count to set?

Member: The test case doesn't seem to catch this case, but this also seems highly unlikely since the AWS AZ count is going to be a number between 2 and 6.

SKIPPED_DUE_TO_FILTER(false, "skipped due to filter");

private final boolean fatal;
@@ -115,6 +115,20 @@ public SearchClusterContainer(final ContainerVersion version) {
this.containerVersion = version;
}

public SearchClusterContainer(final ContainerVersion version, Map<String, String> supplementaryEnvVariables) {
Member: This is replacing the other constructor implementation, causing code duplication; let's instead have the previous version call the new constructor with an empty map.
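A minimal sketch of that suggestion (not part of this diff; assumes java.util.Map is imported in the class):

```java
// Sketch of the reviewer's suggestion: the original constructor delegates to the
// new one with no supplementary environment variables, avoiding the duplication.
public SearchClusterContainer(final ContainerVersion version) {
    this(version, Map.of());
}
```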

super(DockerImageName.parse(version.imageName));
var builder = this.withExposedPorts(9200, 9300);

var combinedEnvVariables = new ImmutableMap.Builder<String, String>().putAll(
version.getInitializationType().getEnvVariables()).putAll(
supplementaryEnvVariables
).build();
builder.withEnv(combinedEnvVariables)
.waitingFor(Wait.forHttp("/").forPort(9200).forStatusCode(200).withStartupTimeout(Duration.ofMinutes(1)));
this.containerVersion = version;
}


public void copySnapshotData(final String directory) {
try {
// Execute command to list all files in the directory