diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 172d8062a5e8..80ce6b208bc9 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -749,6 +749,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti |`druid.coordinator.startDelay`|The operation of the Coordinator works on the assumption that it has an up-to-date view of the state of the world when it runs, the current ZooKeeper interaction code, however, is written in a way that doesn’t allow the Coordinator to know for a fact that it’s done loading the current state of the world. This delay is a hack to give it enough time to believe that it has all the data.|`PT300S`| |`druid.coordinator.load.timeout`|The timeout duration for when the Coordinator assigns a segment to a Historical service.|`PT15M`| |`druid.coordinator.balancer.strategy`|The [balancing strategy](../design/coordinator.md#balancing-segments-in-a-tier) used by the Coordinator to distribute segments among the Historical servers in a tier. The `cost` strategy distributes segments by minimizing a cost function, `diskNormalized` weights these costs with the disk usage ratios of the servers and `random` distributes segments randomly.|`cost`| +|`druid.coordinator.balancer.diskNormalized.moveCostSavingsThreshold`|Only used when `druid.coordinator.balancer.strategy` is `diskNormalized`. Minimum fractional cost reduction required before a segment is moved off a server that already holds it. A value of `0.05` requires the destination to be at least 5% cheaper than the source, which prevents oscillation between servers with similar disk utilization. 
Must be in `[0.0, 1.0)`; `0.0` disables the anti-oscillation discount.|`0.05`| |`druid.coordinator.loadqueuepeon.http.repeatDelay`|The start and repeat delay (in milliseconds) for the load queue peon, which manages the load/drop queue of segments for any server.|1 minute| |`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop requests to batch in one HTTP request. Note that it must be smaller than or equal to the `druid.segmentCache.numLoadingThreads` config on Historical service. If this value is not configured, the coordinator uses the value of the `numLoadingThreads` for the respective server. | `druid.segmentCache.numLoadingThreads` | |`druid.coordinator.asOverlord.enabled`|Boolean value for whether this Coordinator service should act like an Overlord as well. This configuration allows users to simplify a Druid cluster by not having to deploy any standalone Overlord services. If set to true, then Overlord console is available at `http://coordinator-host:port/console.html` and be sure to set `druid.coordinator.asOverlord.overlordService` also.|false| diff --git a/docs/design/coordinator.md b/docs/design/coordinator.md index bc4c5ebc1cba..b0b68115df8d 100644 --- a/docs/design/coordinator.md +++ b/docs/design/coordinator.md @@ -88,7 +88,7 @@ But in a tier with several Historicals (or a low replication factor), segment re Thus, the Coordinator constantly monitors the set of segments present on each Historical in a tier and employs one of the following strategies to identify segments that may be moved from one Historical to another to retain balance. - `cost` (default): For a given segment in a tier, this strategy picks the server with the minimum "cost" of placing that segment. The cost is a function of the data interval of the segment and the data intervals of all the segments already present on the candidate server. In essence, this strategy tries to avoid placing segments with adjacent or overlapping data intervals on the same server. 
This is based on the premise that adjacent-interval segments are more likely to be used together in a query and placing them on the same server may lead to skewed CPU usages of Historicals. -- `diskNormalized`: A derivative of the `cost` strategy that weights the cost of placing a segment on a server with the disk usage ratio of the server. There are known issues with this strategy and is not recommended for a production cluster. +- `diskNormalized`: A derivative of the `cost` strategy that multiplies the cost of placing a segment on a server by the server's disk usage ratio (`diskUsed / maxSize`). This penalizes fuller servers and drives disk utilization toward equality across the tier, which is useful when Historicals within a tier have different disk capacities or hold segments of widely varying sizes. To prevent oscillation when servers have similar utilization, a segment that is already placed on a server receives a cost discount; a move only fires when the destination saves at least `druid.coordinator.balancer.diskNormalized.moveCostSavingsThreshold` (default `0.05`, i.e. 5%) of the source's cost. - `random`: Distributes segments randomly across servers. This is an experimental strategy and is not recommended for a production cluster. All of the above strategies prioritize moving segments from the Historical with the least available disk space.
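The move decision described above can be illustrated with a small standalone sketch. This is not Druid code; the raw costs and disk numbers are hypothetical, and only the arithmetic mirrors the strategy: `normalizedCost = rawCost * (diskUsed / maxSize)`, with the source keeping a `(1 - threshold)` discount because it already holds the segment.

```java
// Standalone sketch of the diskNormalized move decision (hypothetical numbers).
public class DiskNormalizedSketch
{
  static double normalizedCost(double rawCost, long diskUsed, long maxSize)
  {
    return rawCost * ((double) diskUsed / maxSize);
  }

  public static void main(String[] args)
  {
    final double threshold = 0.05; // default moveCostSavingsThreshold

    // Skewed tier: source at 80% disk, destination at 20%, equal raw cost of 10.
    double source = normalizedCost(10.0, 8_000, 10_000) * (1.0 - threshold); // ~7.6
    double dest = normalizedCost(10.0, 2_000, 10_000);                       // 2.0
    System.out.println(dest < source); // true: the move fires

    // Near-equal utilization: 50% vs 49%. The 5% discount outweighs the
    // 1% difference, so the segment stays put instead of ping-ponging.
    source = normalizedCost(10.0, 5_000, 10_000) * (1.0 - threshold); // ~4.75
    dest = normalizedCost(10.0, 4_900, 10_000);                       // 4.9
    System.out.println(dest < source); // false: the move is blocked
  }
}
```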
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategy.java index 1d16c4785b56..e8b1b902dde6 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategy.java @@ -19,32 +19,58 @@ package org.apache.druid.server.coordinator.balancer; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListeningExecutorService; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.timeline.DataSegment; /** - * A {@link BalancerStrategy} which can be used when historicals in a tier have - * varying disk capacities. This strategy normalizes the cost of placing a segment on - * a server as calculated by {@link CostBalancerStrategy} by doing the following: - * - * i.e. to place a segment on a given server + * A {@link BalancerStrategy} which normalizes the cost of placing a segment on a + * server as calculated by {@link CostBalancerStrategy} by multiplying it by the + * server's disk usage ratio. *
- * cost = as computed by CostBalancerStrategy
- * normalizedCost = (cost / numSegments) * usageRatio
- *                = (cost / numSegments) * (diskUsed / totalDiskSpace)
+ * normalizedCost = cost * usageRatio
+ *     where usageRatio = diskUsed / totalDiskSpace
  * 
+ * This penalizes servers that are more full, driving disk utilization to equalize + * across the tier. When all servers have equal disk usage, the behavior is identical + * to {@link CostBalancerStrategy}. When historicals have different disk capacities, + * this naturally accounts for both fill level and total capacity. + *

+ * To prevent oscillation when servers have similar utilization, any server that + * is already projected to hold the segment (the source on a move, or a currently + * serving node on a drop) receives a cost discount equal to the configured + * move-cost-savings threshold. A move therefore fires only when the destination + * saves at least this fraction of the source's cost. The threshold defaults to + * {@link #DEFAULT_MOVE_COST_SAVINGS_THRESHOLD} and is configurable via + * {@code druid.coordinator.balancer.diskNormalized.moveCostSavingsThreshold}. */ public class DiskNormalizedCostBalancerStrategy extends CostBalancerStrategy { + /** + * Default minimum fractional cost reduction required before a segment will + * be moved off a server that is already projected to hold it. A value of + * {@code 0.05} means the destination must be at least 5% cheaper than the + * source for the move to happen. + */ + static final double DEFAULT_MOVE_COST_SAVINGS_THRESHOLD = 0.05; + + private final double sourceCostMultiplier; + public DiskNormalizedCostBalancerStrategy(ListeningExecutorService exec) + { + this(exec, DEFAULT_MOVE_COST_SAVINGS_THRESHOLD); + } + + public DiskNormalizedCostBalancerStrategy(ListeningExecutorService exec, double moveCostSavingsThreshold) { super(exec); + Preconditions.checkArgument( + moveCostSavingsThreshold >= 0.0 && moveCostSavingsThreshold < 1.0, + "moveCostSavingsThreshold[%s] must be in [0.0, 1.0)", + moveCostSavingsThreshold + ); + this.sourceCostMultiplier = 1.0 - moveCostSavingsThreshold; } @Override @@ -59,15 +85,22 @@ protected double computePlacementCost( return cost; } - int nSegments = 1; - if (server.getServer().getNumSegments() > 0) { - nSegments = server.getServer().getNumSegments(); + // Guard against NaN propagation in the cost comparator if a server + // somehow reports a non-positive maxSize. Such a server cannot hold + // anything and will be rejected by canLoadSegment, so returning the + // raw cost is safe.
+ final long maxSize = server.getMaxSize(); + if (maxSize <= 0) { + return cost; } - double normalizedCost = cost / nSegments; - double usageRatio = (double) server.getSizeUsed() / (double) server.getServer().getMaxSize(); + double usageRatio = (double) server.getSizeUsed() / maxSize; + double normalizedCost = cost * usageRatio; + + if (server.isProjectedSegment(proposalSegment)) { + normalizedCost *= sourceCostMultiplier; + } - return normalizedCost * usageRatio; + return normalizedCost; } } - diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyConfig.java new file mode 100644 index 000000000000..95680e7e7dd6 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyConfig.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordinator.balancer; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.common.config.Configs; + +import javax.annotation.Nullable; + +/** + * Configuration for {@link DiskNormalizedCostBalancerStrategy}. + *

+ * Bound to the prefix + * {@code druid.coordinator.balancer.diskNormalized}. + */ +public class DiskNormalizedCostBalancerStrategyConfig +{ + /** + * Minimum fractional cost reduction required to move a segment off a server + * that is already projected to hold it. For example, a value of {@code 0.05} means the + * destination must be at least 5% cheaper than the source before a move + * fires. + */ + @JsonProperty + private final double moveCostSavingsThreshold; + + public DiskNormalizedCostBalancerStrategyConfig() + { + this(null); + } + + @JsonCreator + public DiskNormalizedCostBalancerStrategyConfig( + @JsonProperty("moveCostSavingsThreshold") @Nullable Double moveCostSavingsThreshold + ) + { + this.moveCostSavingsThreshold = Configs.valueOrDefault(moveCostSavingsThreshold, DiskNormalizedCostBalancerStrategy.DEFAULT_MOVE_COST_SAVINGS_THRESHOLD); + + Preconditions.checkArgument( + this.moveCostSavingsThreshold >= 0.0 && this.moveCostSavingsThreshold < 1.0, + "'druid.coordinator.balancer.diskNormalized.moveCostSavingsThreshold'[%s] must be in [0.0, 1.0)", + this.moveCostSavingsThreshold + ); + } + + public double getMoveCostSavingsThreshold() + { + return moveCostSavingsThreshold; + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyFactory.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyFactory.java index 3389f6732e18..2023f7a27481 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyFactory.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyFactory.java @@ -19,11 +19,32 @@ package org.apache.druid.server.coordinator.balancer; +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; + public class DiskNormalizedCostBalancerStrategyFactory extends 
BalancerStrategyFactory { + private final DiskNormalizedCostBalancerStrategyConfig config; + + public DiskNormalizedCostBalancerStrategyFactory() + { + this(new DiskNormalizedCostBalancerStrategyConfig()); + } + + @JsonCreator + public DiskNormalizedCostBalancerStrategyFactory( + @JacksonInject DiskNormalizedCostBalancerStrategyConfig config + ) + { + this.config = config; + } + @Override public BalancerStrategy createBalancerStrategy(int numBalancerThreads) { - return new DiskNormalizedCostBalancerStrategy(getOrCreateBalancerExecutor(numBalancerThreads)); + return new DiskNormalizedCostBalancerStrategy( + getOrCreateBalancerExecutor(numBalancerThreads), + config.getMoveCostSavingsThreshold() + ); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyTest.java index 1f0e42efb1a8..f57199c1f48c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyTest.java @@ -55,7 +55,7 @@ public static List setupDummyCluster(int serverCount, int maxSegme { List serverHolderList = new ArrayList<>(); // Create 10 servers with current size being 3K & max size being 10K - // Each having having 100 segments + // Each having 100 segments for (int i = 0; i < serverCount; i++) { TestLoadQueuePeon fromPeon = new TestLoadQueuePeon(); @@ -163,4 +163,170 @@ public void testNormalizedCostBalancerSingleThreadStrategy() Assert.assertNotNull("Should be able to find a place for new segment!!", holder); Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName()); } + + /** + * Builds a ServerHolder with {@code segmentCount} same-datasource DAY-interval + * segments indexed {@code [baseIndex, baseIndex + 
segmentCount)}, and + * {@code sizeUsed} bytes used out of {@code maxSize}. + */ + private static ServerHolder buildServer( + String name, + long maxSize, + long sizeUsed, + int baseIndex, + int segmentCount + ) + { + List segments = IntStream.range(baseIndex, baseIndex + segmentCount) + .mapToObj(DiskNormalizedCostBalancerStrategyTest::getSegment) + .collect(Collectors.toList()); + ImmutableDruidDataSource ds = + new ImmutableDruidDataSource("DUMMY", Collections.emptyMap(), segments); + return new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata(name, name, null, maxSize, null, ServerType.HISTORICAL, "hot", 1), + sizeUsed, + ImmutableMap.of("DUMMY", ds), + segments.size() + ), + new TestLoadQueuePeon() + ); + } + + private static BalancerStrategy newCostStrategy() + { + return new CostBalancerStrategy( + MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "DiskNormalizedCostBalancerStrategyTest-%d")) + ); + } + + private static BalancerStrategy newDiskNormalizedStrategy() + { + return new DiskNormalizedCostBalancerStrategy( + MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "DiskNormalizedCostBalancerStrategyTest-%d")) + ); + } + + @Test + public void testDiskWeightingBeatsRawCost() + { + final long maxSize = 10_000_000L; + // A: 90% usage, 5 overlapping segments -> raw cost ~= 5 * 2K. + final ServerHolder fuller = buildServer("A", maxSize, 9_000_000L, 0, 5); + // B: 10% usage, 30 overlapping segments -> raw cost ~= 30 * 2K. + final ServerHolder emptier = buildServer("B", maxSize, 1_000_000L, 100, 30); + + final DataSegment proposal = getSegment(1000); + final List servers = new ArrayList<>(); + servers.add(fuller); + servers.add(emptier); + + // Pure CostBalancerStrategy picks A (it has the cheapest raw cost). 
+ Assert.assertEquals( + "Pure CostBalancerStrategy should pick the fuller server", + "A", + newCostStrategy().findServersToLoadSegment(proposal, servers).next().getServer().getName() + ); + + // DiskNormalized: A = 10 * 0.9 = 9.0, B = 60 * 0.1 = 6.0. + // The emptier server must win. + Assert.assertEquals( + "DiskNormalizedCostBalancerStrategy must prefer the emptier server", + "B", + newDiskNormalizedStrategy().findServersToLoadSegment(proposal, servers).next().getServer().getName() + ); + } + + @Test + public void testDiskNormalizedFixesSkewThatCostCannotCorrect() + { + final long maxSize = 10_000_000L; + // A: 80% full, 20 same-DS DAY segments (indices 0-19). + final ServerHolder heavy = buildServer("A", maxSize, 8_000_000L, 0, 20); + // B: 20% full, 20 same-DS DAY segments (indices 100-119 — same + // interval/datasource, just different segment ids). + final ServerHolder light = buildServer("B", maxSize, 2_000_000L, 100, 20); + + // The move candidate is one of A's segments. + final DataSegment segmentToMove = getSegment(0); + final List servers = new ArrayList<>(); + servers.add(heavy); + servers.add(light); + + // CostBalancerStrategy: + // A (source, 20 segs, self-cost subtracted): 38 * K + // B (dest, 20 segs, no self-cost): 40 * K + // A is cheaper by 2K, so the cluster stays skewed forever. + Assert.assertNull( + "Pure CostBalancerStrategy cannot correct the disk skew: no move from A to B", + newCostStrategy().findDestinationServerToMoveSegment(segmentToMove, heavy, servers) + ); + + // DiskNormalizedCostBalancerStrategy (default 5% threshold): + // A: 38K * 0.80 * 0.95 = 28.88K + // B: 40K * 0.20 = 8.00K + // B wins decisively and the segment moves, reducing the skew. 
+ final ServerHolder diskNormalizedResult = + newDiskNormalizedStrategy().findDestinationServerToMoveSegment(segmentToMove, heavy, servers); + Assert.assertNotNull( + "DiskNormalized must correct the skew by moving the segment off the heavier server", + diskNormalizedResult + ); + Assert.assertEquals("B", diskNormalizedResult.getServer().getName()); + } + + @Test + public void testThresholdBlocksMarginalMove() + { + final long maxSize = 10_000_000L; + final ServerHolder source = buildServer("SOURCE", maxSize, 8_000_000L, 0, 20); + final ServerHolder dest = buildServer("DEST", maxSize, 7_400_000L, 100, 20); + + final DataSegment segmentToMove = getSegment(0); + final List servers = new ArrayList<>(); + servers.add(source); + servers.add(dest); + + // Default threshold (5%): dest is not cheap enough to justify the move. + Assert.assertNull( + "Default threshold must block a marginal move to prevent ping-ponging", + newDiskNormalizedStrategy().findDestinationServerToMoveSegment(segmentToMove, source, servers) + ); + + // A much smaller threshold (0.01) shrinks the discount; the same marginal + // difference now triggers the move, proving the threshold is what blocks it above.
+ final BalancerStrategy noDiscount = new DiskNormalizedCostBalancerStrategy( + MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "DiskNormalizedCostBalancerStrategyTest-%d")), + 0.01 + ); + final ServerHolder movedTo = noDiscount.findDestinationServerToMoveSegment(segmentToMove, source, servers); + Assert.assertNotNull("With threshold=0.01, the marginal move should fire", movedTo); + Assert.assertEquals("DEST", movedTo.getServer().getName()); + } + + @Test + public void testRejectsInvalidThreshold() + { + try { + new DiskNormalizedCostBalancerStrategy( + MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "DiskNormalizedCostBalancerStrategyTest-%d")), + 1.0 + ); + Assert.fail("Expected IllegalArgumentException for threshold=1.0"); + } + catch (IllegalArgumentException expected) { + // expected + } + + try { + new DiskNormalizedCostBalancerStrategy( + MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "DiskNormalizedCostBalancerStrategyTest-%d")), + -0.01 + ); + Assert.fail("Expected IllegalArgumentException for negative threshold"); + } + catch (IllegalArgumentException expected) { + // expected + } + } } diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 848dbd5da303..f8b52fafbdff 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -76,6 +76,7 @@ import org.apache.druid.server.coordinator.CoordinatorConfigManager; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory; +import org.apache.druid.server.coordinator.balancer.DiskNormalizedCostBalancerStrategyConfig; import org.apache.druid.server.coordinator.config.CoordinatorKillConfigs; import org.apache.druid.server.coordinator.config.CoordinatorPeriodConfig; import 
org.apache.druid.server.coordinator.config.CoordinatorRunConfig; @@ -197,6 +198,7 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.coordinator.period", CoordinatorPeriodConfig.class); JsonConfigProvider.bind(binder, "druid.coordinator.loadqueuepeon.http", HttpLoadQueuePeonConfig.class); JsonConfigProvider.bind(binder, "druid.coordinator.balancer", BalancerStrategyFactory.class); + JsonConfigProvider.bind(binder, "druid.coordinator.balancer.diskNormalized", DiskNormalizedCostBalancerStrategyConfig.class); JsonConfigProvider.bind(binder, "druid.coordinator.segment", CoordinatorSegmentWatcherConfig.class); JsonConfigProvider.bind(binder, "druid.coordinator.segmentMetadataCache", SegmentMetadataCacheConfig.class); binder.bind(DruidCoordinatorConfig.class);
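Putting the binding above together with the docs hunk, the new property rides under the existing balancer prefix in `coordinator/runtime.properties`. A fragment might look like this (the `0.10` value is illustrative, not a recommendation):

```properties
# coordinator/runtime.properties (illustrative values)
druid.coordinator.balancer.strategy=diskNormalized
# Destination must be at least 10% cheaper than the source before a segment
# is moved off a server that already holds it. Must be in [0.0, 1.0).
druid.coordinator.balancer.diskNormalized.moveCostSavingsThreshold=0.10
```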