Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,40 @@

/**
* A {@link BalancerStrategy} which normalizes the cost of placing a segment on a
* server as calculated by {@link CostBalancerStrategy} by multiplying it by the
* server's disk usage ratio.
* server as calculated by {@link CostBalancerStrategy} by dividing by the
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Public docs still describe the old formula

The implementation and Javadoc now divide by available headroom, but docs/design/coordinator.md and docs/configuration/index.md still say diskNormalized multiplies cost by diskUsed / maxSize. That leaves user-facing behavior documentation incorrect for this config option.

* server's available disk headroom.
* <pre>
* normalizedCost = cost * usageRatio
* normalizedCost = cost / max(EPSILON, 1 - usageRatio)
* where usageRatio = diskUsed / totalDiskSpace
* </pre>
* This penalizes servers that are more full, driving disk utilization to equalize
* across the tier. When all servers have equal disk usage, the behavior is identical
* to {@link CostBalancerStrategy}. When historicals have different disk capacities,
* this naturally accounts for both fill level and total capacity.
* The denominator diverges as a server approaches full, so disk fullness has
* more weight over the placement decision when servers are nearly full,
* regardless of asymmetries in the locality cost. {@link #EPSILON} is a small
* numerical floor on the divisor to guard against division by zero (or by
* negative values during in-flight loads).
* <p>
* To prevent oscillation when servers have similar utilization, any server that
* To prevent oscillation when servers have similar headroom, any server that
* is already projected to hold the segment (the source on a move, or a currently
* serving node on a drop) receives a cost discount equal to
* {@link #DEFAULT_MOVE_COST_SAVINGS_THRESHOLD}. A move therefore fires only when
* {@link DiskNormalizedCostBalancerStrategyConfig.DEFAULT_MOVE_COST_SAVINGS_THRESHOLD}. A move therefore fires only when
* the destination saves at least this fraction of the source's cost. The default
* is configurable via
* {@code druid.coordinator.balancer.diskNormalized.moveCostSavingsThreshold}.
*/
public class DiskNormalizedCostBalancerStrategy extends CostBalancerStrategy
{
/**
* Default minimum fractional cost reduction required before a segment will
* be moved off a server that is already projected to hold it. A value of
* {@code 0.05} means the destination must be at least 5% cheaper than the
* source for the move to happen.
* Numerical floor on the headroom divisor to prevent division by zero or by
* negative values when {@code usageRatio >= 1.0} (possible for over-allocated
* servers or during in-flight loads).
*/
static final double DEFAULT_MOVE_COST_SAVINGS_THRESHOLD = 0.05;
static final double EPSILON = 1e-6;

private final double sourceCostMultiplier;

public DiskNormalizedCostBalancerStrategy(ListeningExecutorService exec)
{
this(exec, DEFAULT_MOVE_COST_SAVINGS_THRESHOLD);
this(exec, DiskNormalizedCostBalancerStrategyConfig.DEFAULT_MOVE_COST_SAVINGS_THRESHOLD);
}

public DiskNormalizedCostBalancerStrategy(ListeningExecutorService exec, double moveCostSavingsThreshold)
Expand All @@ -85,17 +85,16 @@ protected double computePlacementCost(
return cost;
}

// Guard against NaN propagation in the cost comparator if a server
// somehow reports a non-positive maxSize. Such a server cannot hold
// anything and will be rejected by canLoadSegment, so returning the
// raw cost is safe.
// A server with non-positive maxSize cannot hold anything and will be
// rejected by canLoadSegment; return the raw cost to avoid NaN propagation.
final long maxSize = server.getMaxSize();
if (maxSize <= 0) {
return cost;
}

double usageRatio = (double) server.getSizeUsed() / maxSize;
double normalizedCost = cost * usageRatio;
final double usageRatio = (double) server.getSizeUsed() / maxSize;
final double headroom = Math.max(EPSILON, 1.0 - usageRatio);
double normalizedCost = cost / headroom;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Existing threshold test now fails

Changing normalization to cost / headroom makes the existing testThresholdBlocksMarginalMove scenario choose DEST: source is roughly 38K / 0.20 * 0.95 = 180.5K, while dest is 40K / 0.26 = 153.8K. The test still asserts null, so the server test suite should fail unless the threshold scenario or algorithm is adjusted.


if (server.isProjectedSegment(proposalSegment)) {
normalizedCost *= sourceCostMultiplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@
*/
public class DiskNormalizedCostBalancerStrategyConfig
{
/**
* Default minimum fractional cost reduction required before a segment will
* be moved off a server that is already projected to hold it. A value of
* {@code 0.05} means the destination must be at least 5% cheaper than the
* source for the move to happen.
*/
static final double DEFAULT_MOVE_COST_SAVINGS_THRESHOLD = 0.05;

/**
* Minimum fractional cost reduction required to move a segment off a server
* that is already projected to hold it. For example, a value of {@code 0.05} means the
Expand All @@ -53,7 +61,7 @@ public DiskNormalizedCostBalancerStrategyConfig(
@JsonProperty("moveCostSavingsThreshold") @Nullable Double moveCostSavingsThreshold
)
{
this.moveCostSavingsThreshold = Configs.valueOrDefault(moveCostSavingsThreshold, DiskNormalizedCostBalancerStrategy.DEFAULT_MOVE_COST_SAVINGS_THRESHOLD);
this.moveCostSavingsThreshold = Configs.valueOrDefault(moveCostSavingsThreshold, DEFAULT_MOVE_COST_SAVINGS_THRESHOLD);

Preconditions.checkArgument(
this.moveCostSavingsThreshold >= 0.0 && this.moveCostSavingsThreshold < 1.0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,63 @@ public void testThresholdBlocksMarginalMove()
Assert.assertEquals("DEST", movedTo.getServer().getName());
}

@Test
public void testNearFullServerIsNotChosenForNewSegmentLoad()
{
final long maxSize = 10_000_000L;
// A: 95% full, 5 same-DS DAY segments -> raw cost = 10 * K (low, few co-located segs)
final ServerHolder nearFull = buildServer("A", maxSize, 9_500_000L, 0, 5);
// B: 70% full, 20 same-DS DAY segments -> raw cost = 40 * K (higher, more co-located)
final ServerHolder partial = buildServer("B", maxSize, 7_000_000L, 100, 20);

final DataSegment newSegment = getSegment(1000);
final List<ServerHolder> servers = new ArrayList<>();
servers.add(nearFull);
servers.add(partial);

// CostBalancerStrategy picks A because raw cost 10K < 40K.
Assert.assertEquals(
"Pure CostBalancerStrategy must pick the near-full server (lower raw cost)",
"A",
newCostStrategy().findServersToLoadSegment(newSegment, servers).next().getServer().getName()
);

// DiskNormalized: A_norm = 10K / 0.05 = 200K, B_norm = 40K / 0.30 = 133K -> B wins.
Assert.assertEquals(
"DiskNormalized must prefer the emptier server despite its higher raw cost",
"B",
newDiskNormalizedStrategy().findServersToLoadSegment(newSegment, servers).next().getServer().getName()
);
}

@Test
public void testNearFullServerIsNotChosenAsMoveDestination()
{
final long maxSize = 10_000_000L;
// SOURCE: 70% full, 20 same-DS DAY segments; segmentToMove is one of them.
final ServerHolder source = buildServer("SOURCE", maxSize, 7_000_000L, 0, 20);
// DEST: 95% full, 5 same-DS DAY segments -> raw cost 10K < SOURCE's 38K.
final ServerHolder nearFullDest = buildServer("DEST", maxSize, 9_500_000L, 100, 5);

final DataSegment segmentToMove = getSegment(0);
final List<ServerHolder> servers = new ArrayList<>();
servers.add(source);
servers.add(nearFullDest);

// CostBalancerStrategy: DEST raw cost (10K) < SOURCE raw cost (38K) -> recommends the move.
final ServerHolder costResult =
newCostStrategy().findDestinationServerToMoveSegment(segmentToMove, source, servers);
Assert.assertNotNull("CostBalancerStrategy must recommend moving to the near-full DEST", costResult);
Assert.assertEquals("DEST", costResult.getServer().getName());

// DiskNormalized: DEST_norm = 10K / 0.05 = 200K > SOURCE_norm = 38K / 0.30 * 0.95 ≈ 120K.
// Near-full DEST is too expensive after normalization -> no move.
Assert.assertNull(
"DiskNormalized must block the move to the near-full server",
newDiskNormalizedStrategy().findDestinationServerToMoveSegment(segmentToMove, source, servers)
);
}

@Test
public void testRejectsInvalidThreshold()
{
Expand Down
Loading