Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti
|`druid.coordinator.startDelay`|The operation of the Coordinator works on the assumption that it has an up-to-date view of the state of the world when it runs, the current ZooKeeper interaction code, however, is written in a way that doesn’t allow the Coordinator to know for a fact that it’s done loading the current state of the world. This delay is a hack to give it enough time to believe that it has all the data.|`PT300S`|
|`druid.coordinator.load.timeout`|The timeout duration for when the Coordinator assigns a segment to a Historical service.|`PT15M`|
|`druid.coordinator.balancer.strategy`|The [balancing strategy](../design/coordinator.md#balancing-segments-in-a-tier) used by the Coordinator to distribute segments among the Historical servers in a tier. The `cost` strategy distributes segments by minimizing a cost function, `diskNormalized` weights these costs with the disk usage ratios of the servers and `random` distributes segments randomly.|`cost`|
|`druid.coordinator.balancer.diskNormalized.moveCostSavingsThreshold`|Only used when `druid.coordinator.balancer.strategy` is `diskNormalized`. Minimum fractional cost reduction required before a segment is moved off a server that already holds it. A value of `0.05` requires the destination to be at least 5% cheaper than the source, which prevents oscillation between servers with similar disk utilization. Must be in `[0.0, 1.0)`; `0.0` disables the anti-oscillation discount.|`0.05`|
|`druid.coordinator.loadqueuepeon.http.repeatDelay`|The start and repeat delay (in milliseconds) for the load queue peon, which manages the load/drop queue of segments for any server.|1 minute|
|`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop requests to batch in one HTTP request. Note that it must be smaller than or equal to the `druid.segmentCache.numLoadingThreads` config on Historical service. If this value is not configured, the coordinator uses the value of the `numLoadingThreads` for the respective server. | `druid.segmentCache.numLoadingThreads` |
|`druid.coordinator.asOverlord.enabled`|Boolean value for whether this Coordinator service should act like an Overlord as well. This configuration allows users to simplify a Druid cluster by not having to deploy any standalone Overlord services. If set to true, then Overlord console is available at `http://coordinator-host:port/console.html` and be sure to set `druid.coordinator.asOverlord.overlordService` also.|false|
Expand Down
2 changes: 1 addition & 1 deletion docs/design/coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ But in a tier with several Historicals (or a low replication factor), segment re
Thus, the Coordinator constantly monitors the set of segments present on each Historical in a tier and employs one of the following strategies to identify segments that may be moved from one Historical to another to retain balance.

- `cost` (default): For a given segment in a tier, this strategy picks the server with the minimum "cost" of placing that segment. The cost is a function of the data interval of the segment and the data intervals of all the segments already present on the candidate server. In essence, this strategy tries to avoid placing segments with adjacent or overlapping data intervals on the same server. This is based on the premise that adjacent-interval segments are more likely to be used together in a query and placing them on the same server may lead to skewed CPU usages of Historicals.
- `diskNormalized`: A derivative of the `cost` strategy that weights the cost of placing a segment on a server with the disk usage ratio of the server. There are known issues with this strategy and is not recommended for a production cluster.
- `diskNormalized`: A derivative of the `cost` strategy that multiplies the cost of placing a segment on a server by the server's disk usage ratio (`diskUsed / maxSize`). This penalizes fuller servers and drives disk utilization to equalize across the tier, which is useful when historicals within a tier hold segments of widely varying sizes. To prevent oscillation when servers have similar utilization, a segment that is already placed on a server receives a cost discount; a move only fires when the destination saves at least `druid.coordinator.balancer.diskNormalized.moveCostSavingsThreshold` (default `0.05`, i.e. 5%) of the source's cost.
- `random`: Distributes segments randomly across servers. This is an experimental strategy and is not recommended for a production cluster.

All of the above strategies prioritize moving segments from the Historical with the least available disk space.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,58 @@

package org.apache.druid.server.coordinator.balancer;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.timeline.DataSegment;

/**
* A {@link BalancerStrategy} which can be used when historicals in a tier have
* varying disk capacities. This strategy normalizes the cost of placing a segment on
* a server as calculated by {@link CostBalancerStrategy} by doing the following:
* <ul>
* <li>Divide the cost by the number of segments on the server. This ensures that
* cost does not increase just because the number of segments on a server is higher.</li>
* <li>Multiply the resulting value by disk usage ratio. This ensures that all
* hosts have equivalent levels of percentage disk utilization.</li>
* </ul>
* i.e. to place a segment on a given server
* A {@link BalancerStrategy} which normalizes the cost of placing a segment on a
* server as calculated by {@link CostBalancerStrategy} by multiplying it by the
* server's disk usage ratio.
* <pre>
* cost = as computed by CostBalancerStrategy
* normalizedCost = (cost / numSegments) * usageRatio
* = (cost / numSegments) * (diskUsed / totalDiskSpace)
* normalizedCost = cost * usageRatio
* where usageRatio = diskUsed / totalDiskSpace
* </pre>
* This penalizes servers that are more full, driving disk utilization to equalize
* across the tier. When all servers have equal disk usage, the behavior is identical
* to {@link CostBalancerStrategy}. When historicals have different disk capacities,
* this naturally accounts for both fill level and total capacity.
* <p>
* To prevent oscillation when servers have similar utilization, any server that
* is already projected to hold the segment (the source on a move, or a currently
* serving node on a drop) receives a cost discount equal to
* {@link #DEFAULT_MOVE_COST_SAVINGS_THRESHOLD}. A move therefore fires only when
* the destination saves at least this fraction of the source's cost. The default
* is configurable via
* {@code druid.coordinator.balancer.diskNormalized.moveCostSavingsThreshold}.
*/
public class DiskNormalizedCostBalancerStrategy extends CostBalancerStrategy
{
/**
* Default minimum fractional cost reduction required before a segment will
* be moved off a server that is already projected to hold it. A value of
* {@code 0.05} means the destination must be at least 5% cheaper than the
* source for the move to happen.
*/
static final double DEFAULT_MOVE_COST_SAVINGS_THRESHOLD = 0.05;

private final double sourceCostMultiplier;

public DiskNormalizedCostBalancerStrategy(ListeningExecutorService exec)
{
this(exec, DEFAULT_MOVE_COST_SAVINGS_THRESHOLD);
}

public DiskNormalizedCostBalancerStrategy(ListeningExecutorService exec, double moveCostSavingsThreshold)
{
super(exec);
Preconditions.checkArgument(
moveCostSavingsThreshold >= 0.0 && moveCostSavingsThreshold < 1.0,
"moveCostSavingsThreshold[%s] must be in [0.0, 1.0)",
moveCostSavingsThreshold
);
this.sourceCostMultiplier = 1.0 - moveCostSavingsThreshold;
}

@Override
Expand All @@ -59,15 +85,22 @@ protected double computePlacementCost(
return cost;
}

int nSegments = 1;
if (server.getServer().getNumSegments() > 0) {
nSegments = server.getServer().getNumSegments();
// Guard against NaN propagation in the cost comparator if a server
// somehow reports a non-positive maxSize. Such a server cannot hold
// anything and will be rejected by canLoadSegment, so returning the
// raw cost is safe.
final long maxSize = server.getMaxSize();
if (maxSize <= 0) {
return cost;
}

double normalizedCost = cost / nSegments;
double usageRatio = (double) server.getSizeUsed() / (double) server.getServer().getMaxSize();
double usageRatio = (double) server.getSizeUsed() / maxSize;
double normalizedCost = cost * usageRatio;

if (server.isProjectedSegment(proposalSegment)) {
normalizedCost *= sourceCostMultiplier;
}

return normalizedCost * usageRatio;
return normalizedCost;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator.balancer;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.common.config.Configs;

import javax.annotation.Nullable;

/**
* Configuration for {@link DiskNormalizedCostBalancerStrategy}.
* <p>
* Bound to the prefix
* {@code druid.coordinator.balancer.diskNormalized}.
*/
public class DiskNormalizedCostBalancerStrategyConfig
{
/**
* Minimum fractional cost reduction required to move a segment off a server
* that is already projected to hold it. For example, a value of {@code 0.05} means the
* destination must be at least 5% cheaper than the source before a move
* fires.
*/
@JsonProperty
private final double moveCostSavingsThreshold;

public DiskNormalizedCostBalancerStrategyConfig()
{
this(null);
}

@JsonCreator
public DiskNormalizedCostBalancerStrategyConfig(
@JsonProperty("moveCostSavingsThreshold") @Nullable Double moveCostSavingsThreshold
)
{
this.moveCostSavingsThreshold = Configs.valueOrDefault(moveCostSavingsThreshold, DiskNormalizedCostBalancerStrategy.DEFAULT_MOVE_COST_SAVINGS_THRESHOLD);

Preconditions.checkArgument(
this.moveCostSavingsThreshold >= 0.0 && this.moveCostSavingsThreshold < 1.0,
"'druid.coordinator.balancer.diskNormalized.moveCostSavingsThreshold'[%s] must be in [0.0, 1.0)",
this.moveCostSavingsThreshold
);
}

public double getMoveCostSavingsThreshold()
{
return moveCostSavingsThreshold;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,32 @@

package org.apache.druid.server.coordinator.balancer;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;

public class DiskNormalizedCostBalancerStrategyFactory extends BalancerStrategyFactory
{
private final DiskNormalizedCostBalancerStrategyConfig config;

public DiskNormalizedCostBalancerStrategyFactory()
{
this(new DiskNormalizedCostBalancerStrategyConfig());
}

@JsonCreator
public DiskNormalizedCostBalancerStrategyFactory(
@JacksonInject DiskNormalizedCostBalancerStrategyConfig config
)
{
this.config = config;
}

@Override
public BalancerStrategy createBalancerStrategy(int numBalancerThreads)
{
return new DiskNormalizedCostBalancerStrategy(getOrCreateBalancerExecutor(numBalancerThreads));
return new DiskNormalizedCostBalancerStrategy(
getOrCreateBalancerExecutor(numBalancerThreads),
config.getMoveCostSavingsThreshold()
);
}
}
Loading
Loading