1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 3.x]
### Added
- Add separate shard limit validation for local and remote indices ([#19532](https://github.com/opensearch-project/OpenSearch/pull/19532))
- Use Lucene `pack` method for `half_float` and `unsigned_long` when using `ApproximatePointRangeQuery`.

### Changed
ShardsLimitAllocationDeciderIT.java
@@ -8,16 +8,29 @@

package org.opensearch.cluster.routing.allocation.decider;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchIntegTestCase;

import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.IndexModule;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.node.Node;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.junit.After;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -28,28 +41,97 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING;
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING;
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING;
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING;
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING;
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING;
import static org.opensearch.common.util.FeatureFlags.WRITABLE_WARM_INDEX_SETTING;

@ParameterizedStaticSettingsOpenSearchIntegTestCase.ClusterScope(scope = ParameterizedStaticSettingsOpenSearchIntegTestCase.Scope.TEST, numDataNodes = 3)
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public class ShardsLimitAllocationDeciderIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {

private static final long TOTAL_SPACE_BYTES = new ByteSizeValue(100, ByteSizeUnit.KB).getBytes();

public ShardsLimitAllocationDeciderIT(Settings nodeSettings) {
super(nodeSettings);
}

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 3)
public class ShardsLimitAllocationDeciderIT extends OpenSearchIntegTestCase {
@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(WRITABLE_WARM_INDEX_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(WRITABLE_WARM_INDEX_SETTING.getKey(), true).build() }
);
}

protected static final String REPOSITORY_NAME = "test-remote-store-repo";
protected Path absolutePath;

@After
public void teardown() throws Exception {
String testName = getTestName();
if (testName.contains("WithoutRemoteStore") == false) {
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
}
}

@Override
public Settings indexSettings() {
Boolean isWarmEnabled = WRITABLE_WARM_INDEX_SETTING.get(settings);
if (isWarmEnabled) {
return Settings.builder().put(super.indexSettings()).put(IndexModule.IS_WARM_INDEX_SETTING.getKey(), true).build();
} else {
return super.indexSettings();
}
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
if (absolutePath == null) {
absolutePath = randomRepoPath().toAbsolutePath();
}
String testName = getTestName();
Settings.Builder builder = Settings.builder();
if (testName.contains("WithoutRemoteStore") == false) {
builder.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath));
}
return builder.put(super.nodeSettings(nodeOrdinal)).build();
}

@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
featureSettings.put(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG, true);
return featureSettings.build();
}

@Override
protected boolean addMockIndexStorePlugin() {
return WRITABLE_WARM_INDEX_SETTING.get(settings) == false;
}

public void testClusterWideShardsLimit() {
// Set the cluster-wide shard limit to 2
updateClusterSetting(CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 4);
startTestNodes(3);
updateClusterSetting(getShardsPerNodeKey(false), 4);

// Create the first two indices with 3 shards and 1 replica each
createIndex("test1", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
createIndex("test2", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
createIndex(
"test1",
Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build()
);
createIndex(
"test2",
Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build()
);

// Create the third index with 2 shards and 1 replica
createIndex("test3", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
createIndex(
"test3",
Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 1).build()
);

// Wait for the shard limit to be applied
try {
@@ -83,11 +165,13 @@ public void testClusterWideShardsLimit() {
}

public void testIndexSpecificShardLimit() {
startTestNodes(3);
// Set the index-specific shard limit to 2 for the first index only
Settings indexSettingsWithLimit = Settings.builder()
.put(indexSettings())
.put(SETTING_NUMBER_OF_SHARDS, 4)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 2)
.put(getIndexLevelShardsPerNodeKey(false), 2)
.build();

Settings indexSettingsWithoutLimit = Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 4).put(SETTING_NUMBER_OF_REPLICAS, 1).build();
@@ -99,7 +183,10 @@ public void testIndexSpecificShardLimit() {
createIndex("test2", indexSettingsWithoutLimit);

// Create the third index with 3 shards and 1 replica, without the index-specific limit
createIndex("test3", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
createIndex(
"test3",
Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build()
);

try {
// Wait for the shard limit to be applied
@@ -146,22 +233,30 @@ public void testIndexSpecificShardLimit() {
}

public void testCombinedClusterAndIndexSpecificShardLimits() {
startTestNodes(3);
// Set the cluster-wide shard limit to 6
updateClusterSetting(CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 6);
updateClusterSetting(getShardsPerNodeKey(false), 6);

// Create the first index with 3 shards, 1 replica, and index-specific limit of 1
Settings indexSettingsWithLimit = Settings.builder()
.put(indexSettings())
.put(SETTING_NUMBER_OF_SHARDS, 3)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 1)
.put(getIndexLevelShardsPerNodeKey(false), 1)
.build();
createIndex("test1", indexSettingsWithLimit);

// Create the second index with 4 shards and 1 replica
createIndex("test2", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 4).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
createIndex(
"test2",
Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 4).put(SETTING_NUMBER_OF_REPLICAS, 1).build()
);

// Create the third index with 3 shards and 1 replica
createIndex("test3", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
createIndex(
"test3",
Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build()
);

try {
assertBusy(() -> {
@@ -218,9 +313,6 @@ public void testCombinedClusterAndIndexSpecificShardLimits() {
assertEquals("One node should have 5 shards", 5, shardCounts.get(2).intValue());

// Check that all nodes have only one shard of the first index
for (Set<String> indexesOnNode : indexShardsPerNode.values()) {
assertTrue("Each node should have a shard from test1", indexesOnNode.contains("test1"));
}
});
} catch (Exception e) {
throw new RuntimeException(e);
@@ -242,6 +334,7 @@ public void testCombinedClusterAndIndexSpecificShardLimits() {
public void testIndexTotalPrimaryShardsPerNodeSettingWithoutRemoteStore() {
// Attempt to create an index with INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING
Settings indexSettings = Settings.builder()
.put(indexSettings())
.put(SETTING_NUMBER_OF_SHARDS, 3)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 1)
@@ -276,24 +369,30 @@ public void testIndexTotalPrimaryShardsPerNodeSettingWithoutRemoteStore() {
* indicating that this setting is only applicable for remote store enabled clusters.
*/
public void testClusterTotalPrimaryShardsPerNodeSettingWithoutRemoteStore() {
assumeTrue(
"Test should only run in the default (non-parameterized) test suite",
WRITABLE_WARM_INDEX_SETTING.get(settings) == false
);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> {
updateClusterSetting(CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 1);
updateClusterSetting(getShardsPerNodeKey(true), 1);
});

// Verify the exception message
assertTrue(
"Exception should mention that the setting requires remote store",
exception.getMessage()
.contains(
"Setting [cluster.routing.allocation.total_primary_shards_per_node] can only be used with remote store enabled clusters"
"Setting [cluster.routing.allocation.total_primary_shards_per_node] "
+ "can only be used with remote store enabled clusters"
)
);

// Attempt to create an index with INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING
Settings indexSettings = Settings.builder()
.put(indexSettings())
.put(SETTING_NUMBER_OF_SHARDS, 3)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 1)
.put(getIndexLevelShardsPerNodeKey(false), 1)
.build();

createIndex("test_index", indexSettings);
@@ -302,4 +401,40 @@ public void testClusterTotalPrimaryShardsPerNodeSettingWithoutRemoteStore() {
private void updateClusterSetting(String setting, int value) {
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(setting, value)).get();
}

private Settings warmNodeSettings(ByteSizeValue cacheSize) {
return Settings.builder()
.put(super.nodeSettings(0))
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString())
.build();
}

/**
* Helper method to start nodes that support both data and warm roles
*/
private void startTestNodes(int nodeCount) {
boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings);
if (isWarmIndex) {
Settings nodeSettings = Settings.builder().put(warmNodeSettings(new ByteSizeValue(TOTAL_SPACE_BYTES))).build();
internalCluster().startWarmOnlyNodes(nodeCount, nodeSettings);
}
}

private String getShardsPerNodeKey(boolean primary) {
boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings);
if (isWarmIndex) {
return CLUSTER_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING.getKey();
} else {
return primary ? CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey() : CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey();
}
}

private String getIndexLevelShardsPerNodeKey(boolean primary) {
boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings);
if (isWarmIndex) {
return INDEX_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING.getKey();
} else {
return primary ? INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey() : INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey();
}
}
}
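
Note (not part of the diff): the changelog entry above describes separate shard limit validation for local and remote indices. The sketch below, assuming the same integration-test context and static imports as ShardsLimitAllocationDeciderIT, illustrates how the two per-node limits are applied independently; the numeric limits and the index name are illustrative.

// Minimal sketch: local and remote-capable (warm) shards carry independent per-node limits.
// Assumes this method lives in a test like the one above, with the same static imports in scope.
public void testSeparateLocalAndRemoteShardLimitsSketch() {
    // Limit ordinary (local) shards to 4 per node.
    client().admin()
        .cluster()
        .prepareUpdateSettings()
        .setTransientSettings(Settings.builder().put(CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 4).build())
        .get();

    // Limit remote-capable (warm) shards to 2 per node; this limit is validated separately.
    client().admin()
        .cluster()
        .prepareUpdateSettings()
        .setTransientSettings(Settings.builder().put(CLUSTER_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING.getKey(), 2).build())
        .get();

    // The same split exists at index level: a warm index is checked against the
    // remote-capable per-node limit instead of the local one.
    Settings warmIndexSettings = Settings.builder()
        .put(indexSettings())
        .put(SETTING_NUMBER_OF_SHARDS, 3)
        .put(SETTING_NUMBER_OF_REPLICAS, 1)
        .put(INDEX_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING.getKey(), 1)
        .build();
    createIndex("warm_index", warmIndexSettings);
}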