Skip to content

Commit c677397

Browse files
authored
Add Warm Disk Threshold Allocation Decider for Warm shards (#18082)
Signed-off-by: Gagan Singh Saini <[email protected]>
1 parent 803884f commit c677397

File tree

10 files changed

+816
-149
lines changed

10 files changed

+816
-149
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1010
- Implement parallel shard refresh behind cluster settings ([#17782](https://github.com/opensearch-project/OpenSearch/pull/17782))
1111
- Bump OpenSearch Core main branch to 3.0.0 ([#18039](https://github.com/opensearch-project/OpenSearch/pull/18039))
1212
- Update API of Message in index to add the timestamp for lag calculation in ingestion polling ([#17977](https://github.com/opensearch-project/OpenSearch/pull/17977/))
13+
- Add Warm Disk Threshold Allocation Decider for Warm shards ([#18082](https://github.com/opensearch-project/OpenSearch/pull/18082))
1314
- Add composite directory factory ([#17988](https://github.com/opensearch-project/OpenSearch/pull/17988))
1415
- Add pull-based ingestion error metrics and make internal queue size configurable ([#18088](https://github.com/opensearch-project/OpenSearch/pull/18088))
1516
- Adding support for derive source feature and implementing it for various types of field mappers ([#17759](https://github.com/opensearch-project/OpenSearch/pull/17759))

server/src/main/java/org/opensearch/cluster/ClusterModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.opensearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
8282
import org.opensearch.cluster.routing.allocation.decider.TargetPoolAllocationDecider;
8383
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
84+
import org.opensearch.cluster.routing.allocation.decider.WarmDiskThresholdDecider;
8485
import org.opensearch.cluster.service.ClusterService;
8586
import org.opensearch.common.inject.AbstractModule;
8687
import org.opensearch.common.settings.ClusterSettings;
@@ -393,6 +394,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
393394
addAllocationDecider(deciders, new SearchReplicaAllocationDecider());
394395
addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
395396
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
397+
addAllocationDecider(deciders, new WarmDiskThresholdDecider(settings, clusterSettings));
396398
addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings));
397399
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings));
398400
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));

server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdSettings.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,17 @@ public class DiskThresholdSettings {
6060
Setting.Property.Dynamic,
6161
Setting.Property.NodeScope
6262
);
63+
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_WARM_DISK_THRESHOLD_ENABLED_SETTING = Setting.boolSetting(
64+
"cluster.routing.allocation.disk.warm_threshold_enabled",
65+
true,
66+
Setting.Property.Dynamic,
67+
Setting.Property.NodeScope
68+
);
69+
public static final Setting<Boolean> ENABLE_FOR_SINGLE_DATA_NODE = Setting.boolSetting(
70+
"cluster.routing.allocation.disk.watermark.enable_for_single_data_node",
71+
false,
72+
Setting.Property.NodeScope
73+
);
6374
public static final Setting<String> CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING = new Setting<>(
6475
"cluster.routing.allocation.disk.watermark.low",
6576
"85%",
@@ -113,6 +124,7 @@ public class DiskThresholdSettings {
113124
private volatile boolean includeRelocations;
114125
private volatile boolean createIndexBlockAutoReleaseEnabled;
115126
private volatile boolean enabled;
127+
private volatile boolean warmThresholdEnabled;
116128
private volatile TimeValue rerouteInterval;
117129
private volatile Double freeDiskThresholdFloodStage;
118130
private volatile ByteSizeValue freeBytesThresholdFloodStage;
@@ -139,13 +151,18 @@ public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings)
139151
this.includeRelocations = CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.get(settings);
140152
this.rerouteInterval = CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(settings);
141153
this.enabled = CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
154+
this.warmThresholdEnabled = CLUSTER_ROUTING_ALLOCATION_WARM_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
142155
this.createIndexBlockAutoReleaseEnabled = CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.get(settings);
143156
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, this::setLowWatermark);
144157
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, this::setHighWatermark);
145158
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING, this::setFloodStage);
146159
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, this::setIncludeRelocations);
147160
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, this::setRerouteInterval);
148161
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled);
162+
clusterSettings.addSettingsUpdateConsumer(
163+
CLUSTER_ROUTING_ALLOCATION_WARM_DISK_THRESHOLD_ENABLED_SETTING,
164+
this::setWarmThresholdEnabled
165+
);
149166
clusterSettings.addSettingsUpdateConsumer(CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE, this::setCreateIndexBlockAutoReleaseEnabled);
150167
}
151168

@@ -311,6 +328,10 @@ private void setEnabled(boolean enabled) {
311328
this.enabled = enabled;
312329
}
313330

331+
private void setWarmThresholdEnabled(boolean enabled) {
332+
this.warmThresholdEnabled = enabled;
333+
}
334+
314335
private void setLowWatermark(String lowWatermark) {
315336
// Watermark is expressed in terms of used data, but we need "free" data watermark
316337
this.lowWatermarkRaw = lowWatermark;
@@ -390,6 +411,10 @@ public boolean isEnabled() {
390411
return enabled;
391412
}
392413

414+
public boolean isWarmThresholdEnabled() {
415+
return warmThresholdEnabled;
416+
}
417+
393418
public TimeValue getRerouteInterval() {
394419
return rerouteInterval;
395420
}

server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java

Lines changed: 6 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -48,27 +48,24 @@
4848
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings;
4949
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
5050
import org.opensearch.common.settings.ClusterSettings;
51-
import org.opensearch.common.settings.Setting;
5251
import org.opensearch.common.settings.Settings;
5352
import org.opensearch.core.common.Strings;
5453
import org.opensearch.core.common.unit.ByteSizeValue;
5554
import org.opensearch.core.index.Index;
5655
import org.opensearch.core.index.shard.ShardId;
5756
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
58-
import org.opensearch.index.store.remote.filecache.FileCacheStats;
5957
import org.opensearch.snapshots.SnapshotShardSizeInfo;
6058

6159
import java.util.List;
6260
import java.util.Map;
6361
import java.util.Set;
64-
import java.util.stream.Collectors;
65-
import java.util.stream.StreamSupport;
6662

6763
import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE;
6864
import static org.opensearch.cluster.routing.RoutingPool.getNodePool;
6965
import static org.opensearch.cluster.routing.RoutingPool.getShardPool;
7066
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
7167
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
68+
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.ENABLE_FOR_SINGLE_DATA_NODE;
7269

7370
/**
7471
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
@@ -101,12 +98,6 @@ public class DiskThresholdDecider extends AllocationDecider {
10198

10299
public static final String NAME = "disk_threshold";
103100

104-
public static final Setting<Boolean> ENABLE_FOR_SINGLE_DATA_NODE = Setting.boolSetting(
105-
"cluster.routing.allocation.disk.watermark.enable_for_single_data_node",
106-
false,
107-
Setting.Property.NodeScope
108-
);
109-
110101
private final DiskThresholdSettings diskThresholdSettings;
111102
private final boolean enableForSingleDataNode;
112103
private final FileCacheSettings fileCacheSettings;
@@ -176,44 +167,9 @@ public static long sizeOfRelocatingShards(
176167
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
177168
ClusterInfo clusterInfo = allocation.clusterInfo();
178169

179-
/*
180-
The following block enables allocation for remote shards within safeguard limits of the filecache.
181-
*/
170+
// Remote-capable shard on a remote-capable node: WarmDiskThresholdDecider makes the allocation decision for this case
182171
if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
183-
final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio();
184-
// we don't need to check the ratio
185-
if (dataToFileCacheSizeRatio <= 0.1f) {
186-
return Decision.YES;
187-
}
188-
189-
final List<ShardRouting> remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false)
190-
.filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getShardPool(shard, allocation)))
191-
.collect(Collectors.toList());
192-
final long currentNodeRemoteShardSize = remoteShardsOnNode.stream()
193-
.map(ShardRouting::getExpectedShardSize)
194-
.mapToLong(Long::longValue)
195-
.sum();
196-
197-
final long shardSize = getExpectedShardSize(
198-
shardRouting,
199-
0L,
200-
allocation.clusterInfo(),
201-
allocation.snapshotShardSizeInfo(),
202-
allocation.metadata(),
203-
allocation.routingTable()
204-
);
205-
206-
final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
207-
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
208-
final long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize;
209-
if (dataToFileCacheSizeRatio > 0.0f && totalNodeRemoteShardSize > dataToFileCacheSizeRatio * nodeCacheSize) {
210-
return allocation.decision(
211-
Decision.NO,
212-
NAME,
213-
"file cache limit reached - remote shard size will exceed configured safeguard ratio"
214-
);
215-
}
216-
return Decision.YES;
172+
return Decision.ALWAYS;
217173
} else if (REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
218174
return Decision.NO;
219175
}
@@ -481,12 +437,11 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
481437
throw new IllegalArgumentException("Shard [" + shardRouting + "] is not allocated on node: [" + node.nodeId() + "]");
482438
}
483439

484-
/*
485-
The following block prevents movement for remote shards since they do not use the local storage as
486-
the primary source of data storage.
487-
*/
440+
// Remote-capable shard on a remote-capable node: WarmDiskThresholdDecider makes the can-remain decision for this case
488441
if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
489442
return Decision.ALWAYS;
443+
} else if (REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
444+
return Decision.NO;
490445
}
491446

492447
final ClusterInfo clusterInfo = allocation.clusterInfo();

0 commit comments

Comments
 (0)