Skip to content

Commit 7a6f221

Browse files
linuxpi authored and mitrofmep committed
Enhance search preference based routing for WRR (opensearch-project#6834)
Signed-off-by: Varun Bansal <[email protected]> Signed-off-by: Valentin Mitrofanov <[email protected]>
1 parent db21009 commit 7a6f221

File tree

9 files changed

+597
-64
lines changed

9 files changed

+597
-64
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
8787
- [Segment Replication] Add new cluster setting to set replication strategy by default for all indices in cluster. ([#6791](https://github.com/opensearch-project/OpenSearch/pull/6791))
8888
- Enable sort optimization for all NumericTypes ([#6464](https://github.com/opensearch-project/OpenSearch/pull/6464))
8989
- Remove 'cluster_manager' role attachment when using 'node.master' deprecated setting ([#6331](https://github.com/opensearch-project/OpenSearch/pull/6331))
90+
- Add new cluster setting to ignore weighted round-robin routing and fall back to default behaviour. ([#6834](https://github.com/opensearch-project/OpenSearch/pull/6834))
9091

9192
### Dependencies
9293
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))

server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java

+379-16
Large diffs are not rendered by default.

server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java

+17-4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import java.util.List;
2222

23+
import static org.opensearch.cluster.routing.OperationRouting.IGNORE_WEIGHTED_SHARD_ROUTING;
24+
2325
/**
2426
* This class contains the logic to find the next shard copy on which to retry a search request after a failure on another shard copy.
2527
* This decides if retryable shard search requests can be tried on shard copies present in data
@@ -72,9 +74,13 @@ public SearchShardTarget findNext(
7274
Runnable onShardSkipped
7375
) {
7476
SearchShardTarget next = shardIt.nextOrNull();
77+
if (ignoreWeightedRouting(clusterState)) {
78+
return next;
79+
}
80+
7581
while (next != null && WeightedRoutingUtils.isWeighedAway(next.getNodeId(), clusterState)) {
7682
SearchShardTarget nextShard = next;
77-
if (canFailOpen(nextShard.getShardId(), exception, clusterState)) {
83+
if (canFailOpen(nextShard.getShardId(), shardIt.size(), exception, clusterState)) {
7884
logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.getShardId()), exception);
7985
getWeightedRoutingStats().updateFailOpenCount();
8086
break;
@@ -98,10 +104,13 @@ public SearchShardTarget findNext(
98104
*/
99105
public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState clusterState, Exception exception, Runnable onShardSkipped) {
100106
ShardRouting next = shardsIt.nextOrNull();
107+
if (ignoreWeightedRouting(clusterState)) {
108+
return next;
109+
}
101110

102111
while (next != null && WeightedRoutingUtils.isWeighedAway(next.currentNodeId(), clusterState)) {
103112
ShardRouting nextShard = next;
104-
if (canFailOpen(nextShard.shardId(), exception, clusterState)) {
113+
if (canFailOpen(nextShard.shardId(), shardsIt.size(), exception, clusterState)) {
105114
logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.shardId()), exception);
106115
getWeightedRoutingStats().updateFailOpenCount();
107116
break;
@@ -117,8 +126,8 @@ public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState cluster
117126
* @return true if the request can fail open, i.e. be sent to shard copies present on nodes with weighted shard
118127
* routing weight set to zero
119128
*/
120-
private boolean canFailOpen(ShardId shardId, Exception exception, ClusterState clusterState) {
121-
return isInternalFailure(exception) || hasInActiveShardCopies(clusterState, shardId);
129+
private boolean canFailOpen(ShardId shardId, int shardItSize, Exception exception, ClusterState clusterState) {
130+
return shardItSize == 1 || isInternalFailure(exception) || hasInActiveShardCopies(clusterState, shardId);
122131
}
123132

124133
private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardId) {
@@ -131,6 +140,10 @@ private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardI
131140
return false;
132141
}
133142

143+
private boolean ignoreWeightedRouting(ClusterState clusterState) {
144+
return IGNORE_WEIGHTED_SHARD_ROUTING.get(clusterState.getMetadata().settings());
145+
}
146+
134147
public WeightedRoutingStats getWeightedRoutingStats() {
135148
return WeightedRoutingStats.getInstance();
136149
}

server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java

+6-17
Original file line numberDiff line numberDiff line change
@@ -324,9 +324,12 @@ public ShardIterator activeInitializingShardsWeightedIt(
324324
WeightedRouting weightedRouting,
325325
DiscoveryNodes nodes,
326326
double defaultWeight,
327-
boolean isFailOpenEnabled
327+
boolean isFailOpenEnabled,
328+
@Nullable Integer seed
328329
) {
329-
final int seed = shufflerForWeightedRouting.nextSeed();
330+
if (seed == null) {
331+
seed = shufflerForWeightedRouting.nextSeed();
332+
}
330333
List<ShardRouting> ordered = activeInitializingShardsWithWeights(weightedRouting, nodes, defaultWeight, seed);
331334

332335
// append shards for attribute value with weight zero, so that shard search requests can be tried on
@@ -350,6 +353,7 @@ public ShardIterator activeInitializingShardsWeightedIt(
350353
logger.debug("no shard copies found for shard id [{}] for node attribute with weight zero", shardId);
351354
}
352355
}
356+
353357
return new PlainShardIterator(shardId, ordered);
354358
}
355359

@@ -371,21 +375,6 @@ private List<ShardRouting> activeInitializingShardsWithWeights(
371375
return orderedListWithDistinctShards;
372376
}
373377

374-
/**
375-
* Returns an iterator over active and initializing shards, shards are ordered by weighted
376-
* round-robin scheduling policy. Uses the passed seed to shuffle the shards.
377-
*
378-
*/
379-
public ShardIterator activeInitializingShardsSimpleWeightedIt(
380-
WeightedRouting weightedRouting,
381-
DiscoveryNodes nodes,
382-
double defaultWeight,
383-
int seed
384-
) {
385-
List<ShardRouting> ordered = activeInitializingShardsWithWeights(weightedRouting, nodes, defaultWeight, seed);
386-
return new PlainShardIterator(shardId, ordered);
387-
}
388-
389378
/**
390379
* Returns a list containing shard routings ordered using weighted round-robin scheduling.
391380
*/

server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java

+42-9
Original file line numberDiff line numberDiff line change
@@ -93,16 +93,30 @@ public class OperationRouting {
9393

9494
public static final Setting<Boolean> STRICT_WEIGHTED_SHARD_ROUTING_ENABLED = Setting.boolSetting(
9595
"cluster.routing.weighted.strict",
96+
true,
97+
Setting.Property.Dynamic,
98+
Setting.Property.NodeScope
99+
);
100+
101+
public static final Setting<Boolean> IGNORE_WEIGHTED_SHARD_ROUTING = Setting.boolSetting(
102+
"cluster.routing.ignore_weighted_routing",
96103
false,
97104
Setting.Property.Dynamic,
98105
Setting.Property.NodeScope
99106
);
107+
108+
private static final List<Preference> WEIGHTED_ROUTING_RESTRICTED_PREFERENCES = Arrays.asList(
109+
Preference.ONLY_NODES,
110+
Preference.PREFER_NODES
111+
);
112+
100113
private volatile List<String> awarenessAttributes;
101114
private volatile boolean useAdaptiveReplicaSelection;
102115
private volatile boolean ignoreAwarenessAttr;
103116
private volatile double weightedRoutingDefaultWeight;
104117
private volatile boolean isFailOpenEnabled;
105118
private volatile boolean isStrictWeightedShardRouting;
119+
private volatile boolean ignoreWeightedRouting;
106120

107121
public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
108122
// whether to ignore awareness attributes when routing requests
@@ -116,11 +130,13 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
116130
this.weightedRoutingDefaultWeight = WEIGHTED_ROUTING_DEFAULT_WEIGHT.get(settings);
117131
this.isFailOpenEnabled = WEIGHTED_ROUTING_FAILOPEN_ENABLED.get(settings);
118132
this.isStrictWeightedShardRouting = STRICT_WEIGHTED_SHARD_ROUTING_ENABLED.get(settings);
133+
this.ignoreWeightedRouting = IGNORE_WEIGHTED_SHARD_ROUTING.get(settings);
119134
clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection);
120135
clusterSettings.addSettingsUpdateConsumer(IGNORE_AWARENESS_ATTRIBUTES_SETTING, this::setIgnoreAwarenessAttributes);
121136
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_DEFAULT_WEIGHT, this::setWeightedRoutingDefaultWeight);
122137
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled);
123138
clusterSettings.addSettingsUpdateConsumer(STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, this::setStrictWeightedShardRouting);
139+
clusterSettings.addSettingsUpdateConsumer(IGNORE_WEIGHTED_SHARD_ROUTING, this::setIgnoreWeightedRouting);
124140
}
125141

126142
void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
@@ -143,6 +159,10 @@ void setStrictWeightedShardRouting(boolean strictWeightedShardRouting) {
143159
this.isStrictWeightedShardRouting = strictWeightedShardRouting;
144160
}
145161

162+
void setIgnoreWeightedRouting(boolean isWeightedRoundRobinEnabled) {
163+
this.ignoreWeightedRouting = isWeightedRoundRobinEnabled;
164+
}
165+
146166
public boolean isIgnoreAwarenessAttr() {
147167
return ignoreAwarenessAttr;
148168
}
@@ -314,11 +334,7 @@ private ShardIterator preferenceActiveShardIterator(
314334
}
315335
}
316336
preferenceType = Preference.parse(preference);
317-
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet() && isStrictWeightedShardRouting) {
318-
throw new PreferenceBasedSearchNotAllowedException(
319-
"Preference type based routing not allowed with strict weighted shard routing enabled"
320-
);
321-
}
337+
checkPreferenceBasedRoutingAllowed(preferenceType, weightedRoutingMetadata);
322338
switch (preferenceType) {
323339
case PREFER_NODES:
324340
final Set<String> nodesIds = Arrays.stream(preference.substring(Preference.PREFER_NODES.type().length() + 1).split(","))
@@ -344,11 +360,16 @@ private ShardIterator preferenceActiveShardIterator(
344360
// for a different element in the list by also incorporating the
345361
// shard ID into the hash of the user-supplied preference key.
346362
routingHash = 31 * routingHash + indexShard.shardId.hashCode();
347-
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet() && isStrictWeightedShardRouting) {
348-
return indexShard.activeInitializingShardsSimpleWeightedIt(
363+
if (WeightedRoutingUtils.shouldPerformStrictWeightedRouting(
364+
isStrictWeightedShardRouting,
365+
ignoreWeightedRouting,
366+
weightedRoutingMetadata
367+
)) {
368+
return indexShard.activeInitializingShardsWeightedIt(
349369
weightedRoutingMetadata.getWeightedRouting(),
350370
nodes,
351371
getWeightedRoutingDefaultWeight(),
372+
isFailOpenEnabled,
352373
routingHash
353374
);
354375
} else if (ignoreAwarenessAttributes()) {
@@ -365,12 +386,13 @@ private ShardIterator shardRoutings(
365386
@Nullable Map<String, Long> nodeCounts,
366387
@Nullable WeightedRoutingMetadata weightedRoutingMetadata
367388
) {
368-
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet()) {
389+
if (WeightedRoutingUtils.shouldPerformWeightedRouting(ignoreWeightedRouting, weightedRoutingMetadata)) {
369390
return indexShard.activeInitializingShardsWeightedIt(
370391
weightedRoutingMetadata.getWeightedRouting(),
371392
nodes,
372393
getWeightedRoutingDefaultWeight(),
373-
isFailOpenEnabled
394+
isFailOpenEnabled,
395+
null
374396
);
375397
} else if (ignoreAwarenessAttributes()) {
376398
if (useAdaptiveReplicaSelection) {
@@ -438,4 +460,15 @@ private static int calculateScaledShardId(IndexMetadata indexMetadata, String ef
438460
return Math.floorMod(hash, indexMetadata.getRoutingNumShards()) / indexMetadata.getRoutingFactor();
439461
}
440462

463+
private void checkPreferenceBasedRoutingAllowed(Preference preference, @Nullable WeightedRoutingMetadata weightedRoutingMetadata) {
464+
if (WeightedRoutingUtils.shouldPerformStrictWeightedRouting(
465+
isStrictWeightedShardRouting,
466+
ignoreWeightedRouting,
467+
weightedRoutingMetadata
468+
) && WEIGHTED_ROUTING_RESTRICTED_PREFERENCES.contains(preference)) {
469+
throw new PreferenceBasedSearchNotAllowedException(
470+
"Preference type based routing not allowed with strict weighted shard routing enabled"
471+
);
472+
}
473+
}
441474
}

server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingUtils.java

+12
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,16 @@ public static boolean isWeighedAway(String nodeId, ClusterState clusterState) {
5353
}
5454
return false;
5555
}
56+
57+
public static boolean shouldPerformWeightedRouting(boolean ignoreWeightedRouting, WeightedRoutingMetadata weightedRoutingMetadata) {
58+
return !ignoreWeightedRouting && weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet();
59+
}
60+
61+
public static boolean shouldPerformStrictWeightedRouting(
62+
boolean isStrictWeightedShardRouting,
63+
boolean ignoreWeightedRouting,
64+
WeightedRoutingMetadata weightedRoutingMetadata
65+
) {
66+
return isStrictWeightedShardRouting && shouldPerformWeightedRouting(ignoreWeightedRouting, weightedRoutingMetadata);
67+
}
5668
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,7 @@ public void apply(Settings value, Settings current, Settings previous) {
546546
OperationRouting.WEIGHTED_ROUTING_DEFAULT_WEIGHT,
547547
OperationRouting.WEIGHTED_ROUTING_FAILOPEN_ENABLED,
548548
OperationRouting.STRICT_WEIGHTED_SHARD_ROUTING_ENABLED,
549+
OperationRouting.IGNORE_WEIGHTED_SHARD_ROUTING,
549550
IndexGraveyard.SETTING_MAX_TOMBSTONES,
550551
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
551552
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,

0 commit comments

Comments
 (0)