feat: CBA idle re-modeling and separate scale-up / scale-down task-count boundaries #19378
Changes from 5 commits: e38fbb6, 0266cb3, df864db, e367f87, 9efa91d, 5b61b6d, 3f7dcf1
```diff
@@ -143,7 +143,7 @@ public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
       }
     });

-    // These values were carefully handpicked to allow that test to pass in a stable manner.
+    // These values were carefully handpicked to allow that test to pass stably.
     final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig
         .builder()
         .enableTaskAutoScaler(true)
@@ -152,8 +152,8 @@ public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
         .taskCountStart(lowInitialTaskCount)
         .scaleActionPeriodMillis(500)
         .minTriggerScaleActionFrequencyMillis(1000)
-        .lagWeight(0.2)
-        .idleWeight(0.8)
+        .lagWeight(0.8)
```
> **Contributor:** What will be the effect of the change to lag and idle weights?
>
> **Contributor (Author):** It was passing without any problems in normal circumstances. The main idea of the change is to reduce the potential of not scaling over the timeout due to CI CPU pressure.
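For readers skimming the thread: the effect of swapping the weights follows from the weighted cost combination described in the class javadoc. A minimal sketch, assuming a simple linear combination (the patch's actual CostFunction class is not shown in this diff, so the `WeightedCostSketch` name and `totalCost` formula below are illustrative assumptions, not the real implementation):

```java
// Illustrative sketch only: assumes the auto-scaler combines its two cost
// terms as a simple weighted sum. The real CostFunction may differ.
public class WeightedCostSketch
{
  // lagCost and idleCost are the two per-candidate cost terms from the javadoc.
  public static double totalCost(double lagCost, double idleCost, double lagWeight, double idleWeight)
  {
    return lagWeight * lagCost + idleWeight * idleCost;
  }

  public static void main(String[] args)
  {
    // A heavily lagging candidate (lagCost 100) vs. a mildly idle one (idleCost 10):
    double before = totalCost(100.0, 10.0, 0.2, 0.8); // 0.2*100 + 0.8*10 = 28.0
    double after = totalCost(100.0, 10.0, 0.8, 0.2);  // 0.8*100 + 0.2*10 = 82.0
    System.out.println(before + " -> " + after);
  }
}
```

Under this assumption, raising lagWeight to 0.8 makes lag dominate the total cost, so the optimizer favors a scale-up candidate sooner, which matches the author's stated goal of scaling before the test timeout under CI CPU pressure.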
```diff
+        .idleWeight(0.2)
         .build();

     final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisorWithAutoScaler(
```
```diff
@@ -39,21 +39,23 @@
 import org.apache.druid.utils.CollectionUtils;

 import javax.annotation.Nullable;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;

 /**
  * Cost-based auto-scaler for seekable stream supervisors.
- * Uses a weighted cost function combining lag recovery time (seconds) and idleness cost (seconds)
+ * Uses a weighted cost function combining lag recovery time and idle-ratio cost
  * to determine optimal task counts.
  * <p>
- * Candidate task counts are derived by scanning a bounded window of partitions-per-task (PPT) values
- * around the current PPT, then converting those to task counts. This allows non-divisor task counts
- * while keeping changes gradual (no large jumps).
+ * Candidate task counts are derived from possible partitions-per-task ratios, then converted
+ * to task counts. When configured, scale-up and scale-down can independently limit the evaluated
+ * candidates to a small window around the current task count to avoid large jumps.
  * <p>
- * Scale-up and scale-down are both evaluated proactively.
- * Future versions may perform scale-down on task rollover only.
+ * Scale-up is applied during regular scale-action checks. Scale-down is applied during regular
+ * checks unless {@link CostBasedAutoScalerConfig#isScaleDownOnTaskRolloverOnly()} defers it
+ * to task rollover.
  */
 public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
 {
```
```diff
@@ -64,21 +66,19 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
   public static final String OPTIMAL_TASK_COUNT_METRIC = "task/autoScaler/costBased/optimalTaskCount";
   public static final String INVALID_METRICS_COUNT = "task/autoScaler/costBased/invalidMetrics";

-  static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
-  static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
-
   /**
-   * If average partition lag crosses this value and the processing rate is
-   * still zero, scaling actions are skipped and an alert is raised.
+   * Maximum number of candidate task counts to evaluate above or below the current task count
+   * when scale-up or scale-down boundaries are enabled.
+   * <p>
+   * The misspelling is preserved to avoid unnecessary churn in this package-private constant.
```
> **Contributor:** I don't understand this comment. The constant is new in this patch. Please fix the spelling.
```diff
    */
-  static final int MAX_IDLENESS_PARTITION_LAG = 10_000;
+  static final int BOUNARY_LIMIT_IN_PARTITIONS_PER_TASK = 2;

   /**
-   * Divisor for partition count in the K formula: K = (partitionCount / K_PARTITION_DIVISOR) / sqrt(currentTaskCount).
-   * This controls how aggressive the scaling is relative to partition count.
-   * That value was chosen by carefully analyzing the math model behind the implementation.
+   * If the average partition lag crosses this value and the processing rate is
+   * still zero, scaling actions are skipped and an alert is raised.
    */
-  static final double K_PARTITION_DIVISOR = 6.4;
+  static final int MAX_IDLENESS_PARTITION_LAG = 10_000;

   private final String supervisorId;
   private final SeekableStreamSupervisor supervisor;
```
```diff
@@ -176,7 +176,7 @@ public int computeTaskCountForScaleAction()
     // Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
     // There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
     final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin()
-        || currentTaskCount > config.getTaskCountMax();
+                                           || currentTaskCount > config.getTaskCountMax();
     if (isTaskCountOutOfBounds) {
       currentTaskCount = Math.min(config.getTaskCountMax(), Math.max(config.getTaskCountMin(), currentTaskCount));
     }
```
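The clamp in this hunk is the standard nested min/max idiom. A self-contained sketch (class and method names are hypothetical, for illustration only):

```java
public class ClampSketch
{
  // Clamp a hand-configured taskCount into [taskCountMin, taskCountMax],
  // mirroring Math.min(max, Math.max(min, current)) from the patch.
  public static int clamp(int current, int min, int max)
  {
    return Math.min(max, Math.max(min, current));
  }

  public static void main(String[] args)
  {
    System.out.println(clamp(12, 1, 10)); // prints 10 (above the max bound)
    System.out.println(clamp(0, 1, 10));  // prints 1 (below the min bound)
    System.out.println(clamp(5, 1, 10));  // prints 5 (already in bounds)
  }
}
```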
```diff
@@ -188,16 +188,28 @@ public int computeTaskCountForScaleAction()
     // regardless of optimal task count, to get back to a safe state.
     if (isTaskCountOutOfBounds) {
       taskCount = currentTaskCount;
-      log.info("Task count for supervisor[%s] was out of bounds [%d,%d], urgently scaling from [%d] to [%d].",
-          supervisorId, config.getTaskCountMin(), config.getTaskCountMax(), currentTaskCount, currentTaskCount);
+      log.info(
+          "Task count for supervisor[%s] was out of bounds [%d,%d], urgently scaling from [%d] to [%d].",
+          supervisorId, config.getTaskCountMin(), config.getTaskCountMax(), currentTaskCount, currentTaskCount
+      );
     } else if (optimalTaskCount > currentTaskCount) {
       taskCount = optimalTaskCount;
-      log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", supervisorId, currentTaskCount, taskCount);
+      log.info(
+          "Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).",
+          supervisorId,
+          currentTaskCount,
+          taskCount
+      );
     } else if (!config.isScaleDownOnTaskRolloverOnly()
                && optimalTaskCount < currentTaskCount
                && optimalTaskCount > 0) {
       taskCount = optimalTaskCount;
-      log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale down).", supervisorId, currentTaskCount, taskCount);
+      log.info(
+          "Updating taskCount for supervisor[%s] from [%d] to [%d] (scale down).",
+          supervisorId,
+          currentTaskCount,
+          taskCount
+      );
     } else {
       taskCount = -1;
       log.debug("No scaling required for supervisor[%s]", supervisorId);
```
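The branch order in this hunk encodes a priority: out-of-bounds recovery first, then scale-up, then scale-down (unless deferred to task rollover), otherwise no action. A condensed sketch of the same ladder, with hypothetical class and parameter names standing in for the supervisor state and config lookups:

```java
public class ScaleDecisionSketch
{
  // Returns the new task count, or -1 when no scaling action is needed,
  // following the same branch order as computeTaskCountForScaleAction().
  public static int decide(
      int currentTaskCount,          // assumed already clamped into bounds by the caller
      int optimalTaskCount,
      boolean outOfBounds,           // was the configured taskCount outside [min, max]?
      boolean scaleDownOnRolloverOnly
  )
  {
    if (outOfBounds) {
      return currentTaskCount;       // urgent: move back inside the bounds
    } else if (optimalTaskCount > currentTaskCount) {
      return optimalTaskCount;       // scale up eagerly
    } else if (!scaleDownOnRolloverOnly && optimalTaskCount < currentTaskCount && optimalTaskCount > 0) {
      return optimalTaskCount;       // scale down, unless deferred to rollover
    }
    return -1;                       // no scaling action
  }

  public static void main(String[] args)
  {
    System.out.println(decide(5, 8, false, false)); // prints 8 (scale up)
    System.out.println(decide(5, 3, false, true));  // prints -1 (scale-down deferred)
  }
}
```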
```diff
@@ -237,7 +249,7 @@ public CostBasedAutoScalerConfig getConfig()
    *   <li>Current task count already optimal</li>
    * </ul>
    *
-   * @return optimal task count for scale-up, or -1 if no scaling action needed
+   * @return optimal task count, or -1 if no scaling action is needed
    */
   int computeOptimalTaskCount(CostMetrics metrics)
   {
```
```diff
@@ -261,11 +273,8 @@ int computeOptimalTaskCount(CostMetrics metrics)
     final int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(
         partitionCount,
         currentTaskCount,
-        (long) metrics.getAggregateLag(),
         config.getTaskCountMin(),
-        config.getTaskCountMax(),
-        config.shouldUseTaskCountBoundaries(),
-        config.getHighLagThreshold()
+        config.getTaskCountMax()
     );

     if (validTaskCounts.length == 0) {
```
```diff
@@ -290,9 +299,28 @@ int computeOptimalTaskCount(CostMetrics metrics)
         config.getIdleWeight()
     );

-    // Find the task count which reduces cost
+    // Find the evaluated task count with the lowest cost.
     final CostResult[] costResults = new CostResult[validTaskCounts.length];
-    for (int i = 0; i < validTaskCounts.length; ++i) {
+    Arrays.fill(costResults, CostResult.INFINITE_COST);
+
+    int startIndex = 0;
+    int endIndex = validTaskCounts.length - 1;
+
+    if (config.shouldUseTaskCountBoundariesOnScaleUp()) {
+      int currentTaskCountIndex = Arrays.binarySearch(validTaskCounts, currentTaskCount);
+      endIndex = currentTaskCountIndex >= 0
+                 ? Math.min(currentTaskCountIndex + BOUNARY_LIMIT_IN_PARTITIONS_PER_TASK, endIndex)
+                 : endIndex;
+    }
+
+    if (config.shouldUseTaskCountBoundariesOnScaleDown()) {
+      int currentTaskCountIndex = Arrays.binarySearch(validTaskCounts, currentTaskCount);
+      startIndex = currentTaskCountIndex >= 0
+                   ? Math.max(currentTaskCountIndex - BOUNARY_LIMIT_IN_PARTITIONS_PER_TASK, startIndex)
+                   : startIndex;
+    }
+
+    for (int i = startIndex; i <= endIndex; ++i) {
       final int taskCount = validTaskCounts[i];
       CostResult costResult = costFunction.computeCost(metrics, taskCount, config);
       double cost = costResult.totalCost();
```
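The new windowing logic can be exercised in isolation. This sketch (class name hypothetical; the patch's constant renamed to `BOUNDARY_LIMIT` for readability) reproduces the same `Arrays.binarySearch` index arithmetic on a sorted candidate array:

```java
import java.util.Arrays;

public class CandidateWindowSketch
{
  static final int BOUNDARY_LIMIT = 2; // mirrors BOUNARY_LIMIT_IN_PARTITIONS_PER_TASK

  // Returns {startIndex, endIndex} of the candidate window in a sorted
  // validTaskCounts array, clamped around the current task count.
  public static int[] window(int[] validTaskCounts, int currentTaskCount,
                             boolean boundScaleUp, boolean boundScaleDown)
  {
    int startIndex = 0;
    int endIndex = validTaskCounts.length - 1;
    // Negative result means the current count is not a valid candidate;
    // the patch leaves the window unbounded in that case.
    int idx = Arrays.binarySearch(validTaskCounts, currentTaskCount);
    if (boundScaleUp && idx >= 0) {
      endIndex = Math.min(idx + BOUNDARY_LIMIT, endIndex);
    }
    if (boundScaleDown && idx >= 0) {
      startIndex = Math.max(idx - BOUNDARY_LIMIT, startIndex);
    }
    return new int[]{startIndex, endIndex};
  }

  public static void main(String[] args)
  {
    int[] w = window(new int[]{2, 3, 4, 5, 6, 8, 10}, 5, true, true);
    System.out.println(w[0] + ".." + w[1]); // prints 1..5
  }
}
```

For validTaskCounts = {2, 3, 4, 5, 6, 8, 10} and currentTaskCount = 5 (index 3), enabling both bounds narrows evaluation to indices 1 through 5, i.e. candidates 3, 4, 5, 6, and 8.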
```diff
@@ -317,57 +345,33 @@ int computeOptimalTaskCount(CostMetrics metrics)
       );
     }

-    // Scale up is performed eagerly.
+    // Scale-up is applied eagerly; scale-down may be deferred by computeTaskCountForScaleAction().
     return optimalTaskCount;
   }

   /**
-   * Generates valid task counts based on partitions-per-task ratios.
+   * Generates valid task counts by converting every possible partitions-per-task ratio
+   * into a task count and filtering by configured min/max task count bounds.
    *
-   * @return sorted list of valid task counts within bounds
+   * @return list of valid task counts within bounds
    */
-  @SuppressWarnings({"ReassignedVariable"})
   static int[] computeValidTaskCounts(
       int partitionCount,
       int currentTaskCount,
-      double aggregateLag,
       int taskCountMin,
-      int taskCountMax,
-      boolean isTaskCountBoundariesEnabled,
-      int highLagThreshold
+      int taskCountMax
   )
   {
     if (partitionCount <= 0 || currentTaskCount <= 0) {
       return new int[]{};
     }

     IntSet result = new IntArraySet();
-    final int currentPartitionsPerTask = partitionCount / currentTaskCount;

     // Minimum partitions per task correspond to the maximum number of tasks (scale up) and vice versa.
     int minPartitionsPerTask = Math.min(1, partitionCount / taskCountMax);
     int maxPartitionsPerTask = Math.max(partitionCount, partitionCount / taskCountMin);

-    if (isTaskCountBoundariesEnabled) {
-      maxPartitionsPerTask = Math.min(
-          partitionCount,
-          currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK
-      );
-
-      int extraIncrease = 0;
-      if (highLagThreshold > 0) {
-        extraIncrease = computeExtraPPTIncrease(
-            highLagThreshold,
-            aggregateLag,
-            partitionCount,
-            currentTaskCount,
-            taskCountMax
-        );
-      }
-      int effectiveMaxIncrease = MAX_INCREASE_IN_PARTITIONS_PER_TASK + extraIncrease;
-      minPartitionsPerTask = Math.max(minPartitionsPerTask, currentPartitionsPerTask - effectiveMaxIncrease);
-    }
-
     for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= minPartitionsPerTask
         && partitionsPerTask != 0; partitionsPerTask--) {
       final int taskCount = (partitionCount + partitionsPerTask - 1) / partitionsPerTask;
```
```diff
@@ -378,50 +382,6 @@ static int[] computeValidTaskCounts(
     return result.toIntArray();
   }

-  /**
-   * Computes extra allowed increase in partitions-per-task in scenarios when the average per-partition lag
-   * is above the configured threshold.
-   * <p>
-   * This uses a logarithmic formula for consistent absolute growth:
-   * {@code deltaTasks = K * ln(lagSeverity)}
-   * where {@code K = (partitionCount / 6.4) / sqrt(currentTaskCount)}
-   * <p>
-   * This ensures that small taskCount's get a massive relative boost,
-   * while large taskCount's receive more measured, stable increases.
-   */
-  static int computeExtraPPTIncrease(
-      double lagThreshold,
-      double aggregateLag,
-      int partitionCount,
-      int currentTaskCount,
-      int taskCountMax
-  )
-  {
-    if (partitionCount <= 0 || taskCountMax <= 0 || currentTaskCount <= 0) {
-      return 0;
-    }
-
-    final double lagPerPartition = aggregateLag / partitionCount;
-    if (lagPerPartition < lagThreshold) {
-      return 0;
-    }
-
-    final double lagSeverity = lagPerPartition / lagThreshold;
-
-    // Logarithmic growth: ln(lagSeverity) is positive when lagSeverity > 1
-    // First multoplier decreases with sqrt(currentTaskCount): aggressive when small, conservative when large
-    final double deltaTasks = (partitionCount / K_PARTITION_DIVISOR) / Math.sqrt(currentTaskCount) * Math.log(
-        lagSeverity);
-
-    final double targetTaskCount = Math.min(taskCountMax, (double) currentTaskCount + deltaTasks);
-
-    // Compute precise PPT reduction to avoid early integer truncation artifacts
-    final double currentPPT = (double) partitionCount / currentTaskCount;
-    final double targetPPT = (double) partitionCount / targetTaskCount;
-
-    return Math.max(0, (int) Math.floor(currentPPT - targetPPT));
-  }
-
   /**
    * Extracts the average poll-idle-ratio metric from task stats.
    * This metric indicates how much time the consumer spends idle waiting for data.
```
> **Contributor:** Is it really in progress?
>
> **Contributor (Author):** I need to verify whether anybody has a Kinesis workload with CBA working. If you want, we can remove that part.
>
> **Contributor:** I'd rather remove it. My feeling is the documentation should reference things that exist currently rather than speculating about future development.