diff --git a/docs/ingestion/kafka-ingestion.md b/docs/ingestion/kafka-ingestion.md index 2326b59d99dd..5216a84b4e42 100644 --- a/docs/ingestion/kafka-ingestion.md +++ b/docs/ingestion/kafka-ingestion.md @@ -235,7 +235,8 @@ The following example shows a supervisor spec with idle configuration enabled: "enableTaskAutoScaler": true, "taskCountMax": 6, "taskCountMin": 2, - "minTriggerScaleActionFrequencyMillis": 600000, + "minScaleUpDelay": "PT10M", + "minScaleDownDelay": "PT10M", "autoScalerStrategy": "lagBased", "lagCollectionIntervalMillis": 30000, "lagCollectionRangeMillis": 600000, diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md index 592f0328cbe9..ba92ed66b536 100644 --- a/docs/ingestion/supervisor.md +++ b/docs/ingestion/supervisor.md @@ -79,7 +79,9 @@ The following table outlines the configuration properties for `autoScalerConfig` |`taskCountMax`|The maximum number of ingestion tasks. Must be greater than or equal to `taskCountMin`. If `taskCountMax` is greater than the number of Kafka partitions or Kinesis shards, Druid sets the maximum number of reading tasks to the number of Kafka partitions or Kinesis shards and ignores `taskCountMax`.|Yes|| |`taskCountMin`|The minimum number of ingestion tasks. When you enable the autoscaler, Druid computes the initial number of tasks to launch by checking the configs in the following order: `taskCountStart`, then `taskCount` (in `ioConfig`), then `taskCountMin`.|Yes|| |`taskCountStart`|Optional config to specify the number of ingestion tasks to start with. When you enable the autoscaler, Druid computes the initial number of tasks to launch by checking the configs in the following order: `taskCountStart`, then `taskCount` (in `ioConfig`), then `taskCountMin`.|No|`taskCount` or `taskCountMin`| -|`minTriggerScaleActionFrequencyMillis`|The minimum time interval between two scale actions.| No|600000| +|`minScaleUpDelay`|Minimum cooldown duration between scale-up actions, specified as an ISO-8601 duration string. Falls back to `minTriggerScaleActionFrequencyMillis` if not set.|No|| +|`minScaleDownDelay`|Minimum cooldown duration between scale-down actions, specified as an ISO-8601 duration string. Falls back to `minTriggerScaleActionFrequencyMillis` if not set.|No|| +|`minTriggerScaleActionFrequencyMillis`|**Deprecated.** Use `minScaleUpDelay` and `minScaleDownDelay` instead. Minimum time interval in milliseconds between scale actions, used as the fallback when the Duration-based fields are not set.|No|600000| |`autoScalerStrategy`|The algorithm of autoscaler. Druid only supports the `lagBased` strategy. See [Autoscaler strategy](#autoscaler-strategy) for more information.|No|`lagBased`| |`stopTaskCountRatio`|A variable version of `ioConfig.stopTaskCount` with a valid range of (0.0, 1.0]. Allows the maximum number of stoppable tasks in steady state to be proportional to the number of tasks currently running.|No|| @@ -161,7 +163,8 @@ The following example shows a supervisor spec with `lagBased` autoscaler: "enableTaskAutoScaler": true, "taskCountMax": 6, "taskCountMin": 2, - "minTriggerScaleActionFrequencyMillis": 600000, + "minScaleUpDelay": "PT10M", + "minScaleDownDelay": "PT10M", "autoScalerStrategy": "lagBased", "lagCollectionIntervalMillis": 30000, "lagCollectionRangeMillis": 600000, @@ -210,10 +213,11 @@ The following table outlines the configuration properties related to the `costBa |`idleWeight`|The weight of extracted poll idle value in cost function. | No | 0.75 | |`useTaskCountBoundaries`|Enables the bounded partitions-per-task window when selecting task counts.|No| `false` | |`highLagThreshold`|Average partition lag threshold that triggers burst scale-up when set to a value greater than `0`. Set to a negative value to disable burst scale-up.|No|-1| -|`minScaleDownDelay`|Minimum duration between successful scale actions, specified as an ISO-8601 duration string.|No|`PT30M`| +|`minScaleUpDelay`|Minimum cooldown duration after a scale-up action before the next scale-up is allowed, specified as an ISO-8601 duration string.|No|| +|`minScaleDownDelay`|Minimum cooldown duration after a scale-down action before the next scale-down is allowed, specified as an ISO-8601 duration string.|No|`PT30M`| |`scaleDownDuringTaskRolloverOnly`|Indicates whether task scaling down is limited to periods during task rollovers only.|No|`false`| -The following example shows a supervisor spec with `lagBased` autoscaler: +The following example shows a supervisor spec with `costBased` autoscaler:
Click to view the example @@ -227,9 +231,10 @@ The following example shows a supervisor spec with `lagBased` autoscaler: "autoScalerStrategy": "costBased", "taskCountMin": 1, "taskCountMax": 10, - "minTriggerScaleActionFrequencyMillis": 600000, + "minScaleUpDelay": "PT10M", + "minScaleDownDelay": "PT30M", "lagWeight": 0.1, - "idleWeight": 0.9, + "idleWeight": 0.9 } } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java index 6295d41937e8..289d3c989f3c 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java @@ -315,6 +315,8 @@ public void testAutoScalingConfigSerde() throws JsonProcessingException autoScalerConfig.put("scaleInStep", 1); autoScalerConfig.put("scaleOutStep", 2); autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000); + autoScalerConfig.put("minScaleUpDelay", "PT20M"); + autoScalerConfig.put("minScaleDownDelay", "PT20M"); final Map consumerProperties = KafkaConsumerConfigs.getConsumerProperties(); consumerProperties.put("bootstrap.servers", "localhost:8082"); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 58d366d2306e..f857cfaa436b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -64,6 +64,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.ScaleDirection; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; @@ -499,11 +500,34 @@ public void handle() supervisorId, dataSource ); - final Integer desiredTaskCount = computeDesiredTaskCount.call(); + final int desiredTaskCount = computeDesiredTaskCount.call(); + final int currentTaskCount = getCurrentTaskCount(); + + if (desiredTaskCount <= 0) { + return; + } + ServiceMetricEvent.Builder event = ServiceMetricEvent.builder() .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId) .setDimension(DruidMetrics.DATASOURCE, dataSource) .setDimension(DruidMetrics.STREAM, getIoConfig().getStream()); + + // 1) This should already be handled by the auto-scaler implementation, but make sure we catch/record these for auditability + if (desiredTaskCount == currentTaskCount) { + log.warn( + "Skipping scaling for supervisor[%s] for dataSource[%s]: already at desired task count [%d]", + supervisorId, + dataSource, + desiredTaskCount + ); + emitter.emit(event.setDimension(AUTOSCALER_SKIP_REASON_DIMENSION, "desired capacity reached") + .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount)); + return; + } + + // 2) Make sure we wait for any pending completion tasks to finish. + // At this point there could be 3 generations of tasks: pending completion tasks (old generation), running tasks (current generation), and (after our scale) pending tasks (new generation). + // We want to avoid killing any old generation tasks preemptively, as that might cause the current generation tasks' offsets to become invalid. for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values()) { // There are expected to be pending tasks if this scaling is happening on task rollover if (!list.isEmpty() && !isScalingTasksOnRollover.get()) { @@ -513,45 +537,59 @@ public void handle() dataSource, list ); - if (desiredTaskCount > 0) { - emitter.emit(event.setDimension( - AUTOSCALER_SKIP_REASON_DIMENSION, - "There are tasks pending completion" - ) - .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount)); - } + emitter.emit(event.setDimension( + AUTOSCALER_SKIP_REASON_DIMENSION, + "There are tasks pending completion" + ) + .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount)); return; } } - if (nowTime - dynamicTriggerLastRunTime < autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) { + + // 3) Make sure we are not breaching any scaling cooldown limits. + // Scaling operations are disruptive — scale-down in particular can leave the supervisor + // under-resourced while it recovers from lag induced by the scale event, so callers may + // configure a longer cooldown for scale-down than for scale-up. Both directions are measured against the same + // last-scale timestamp so that a rapid up/down oscillation is still subject to the appropriate cooldown, + // regardless of which direction triggered last. + final ScaleDirection scaleDirection; + final long cooldownMillis; + + if (desiredTaskCount > currentTaskCount) { + scaleDirection = ScaleDirection.SCALE_UP; + cooldownMillis = autoScalerConfig.getMinScaleUpDelay().getMillis(); + } else { // desiredTaskCount < currentTaskCount + scaleDirection = ScaleDirection.SCALE_DOWN; + cooldownMillis = autoScalerConfig.getMinScaleDownDelay().getMillis(); + } + + if (nowTime - dynamicTriggerLastScaleRunTime < cooldownMillis) { log.info( - "DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for supervisor[%s] for dataSource[%s], skipping it! desired task count is [%s], active task count is [%s]", - nowTime - dynamicTriggerLastRunTime, - autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), + "DynamicAllocationTasksNotice submitted again in [%d]ms, [%s] cooldown is [%d]ms for supervisor[%s] for dataSource[%s], skipping it! desired task count is [%d], current task count is [%d]", + nowTime - dynamicTriggerLastScaleRunTime, + scaleDirection, + cooldownMillis, supervisorId, dataSource, desiredTaskCount, - getActiveTaskGroupsCount() + currentTaskCount ); - if (desiredTaskCount > 0) { - emitter.emit(event.setDimension( - AUTOSCALER_SKIP_REASON_DIMENSION, - "minTriggerScaleActionFrequencyMillis not elapsed yet" - ) - .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount)); - } + emitter.emit(event.setDimension( + AUTOSCALER_SKIP_REASON_DIMENSION, + "Scale cooldown not elapsed yet" + ) + .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount)); return; } - if (desiredTaskCount > 0) { - emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount)); - } + // At this point, we can reasonably attempt a scaling action, so emit our required task count + emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount)); boolean allocationSuccess = changeTaskCount(desiredTaskCount); if (allocationSuccess) { onSuccessfulScale.run(); - dynamicTriggerLastRunTime = nowTime; + dynamicTriggerLastScaleRunTime = nowTime; } } catch (Exception ex) { @@ -586,8 +624,9 @@ public String getType() * After the taskCount is changed in SeekableStreamSupervisorIOConfig, next RunNotice will create scaled number of ingest tasks without resubmitting the supervisor. * * @param desiredActiveTaskCount desired taskCount computed from AutoScaler - * @return Boolean flag indicating if scale action was executed or not. If true, it will wait at least 'minTriggerScaleActionFrequencyMillis' before next 'changeTaskCount'. - * If false, it will do 'changeTaskCount' again after 'scaleActionPeriodMillis' millis. + * @return Boolean flag indicating if scale action was executed or not. If true, it will wait at least the configured + * 'minScaleUpDelay' or 'minScaleDownDelay' (whichever matches the direction of the next scale) before the + * next 'changeTaskCount'. If false, it will do 'changeTaskCount' again after 'scaleActionPeriodMillis' millis. * @throws InterruptedException * @throws ExecutionException */ @@ -958,7 +997,7 @@ public String getType() private final boolean useExclusiveStartingSequence; private boolean listenerRegistered = false; private long lastRunTime; - private long dynamicTriggerLastRunTime; + private long dynamicTriggerLastScaleRunTime; private int initRetryCounter = 0; private volatile DateTime firstRunTime; private volatile DateTime earlyStopTime = null; @@ -1424,6 +1463,16 @@ public Runnable buildDynamicAllocationTask( return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, onSuccessfulScale, emitter)); } + @VisibleForTesting + void handleDynamicAllocationTasksNotice( + Callable scaleAction, + Runnable onSuccessfulScale, + ServiceEmitter emitter + ) + { + new DynamicAllocationTasksNotice(scaleAction, onSuccessfulScale, emitter).handle(); + } + private Runnable buildRunTask() { return () -> addNotice(new RunNotice()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java index 6d3b2a0daa02..f6384d2a64e5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java @@ -27,6 +27,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.joda.time.Duration; @UnstableApi @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "autoScalerStrategy", defaultImpl = LagBasedAutoScalerConfig.class) @@ -37,7 +38,28 @@ public interface AutoScalerConfig { boolean getEnableTaskAutoScaler(); + + /** + * @deprecated Use {@link #getMinScaleUpDelay()} and {@link #getMinScaleDownDelay()} instead. + * This field is retained for backward compatibility and will be removed in a future version. + */ + @Deprecated long getMinTriggerScaleActionFrequencyMillis(); + + /** + * Minimum time that must elapse after any scale action before a scale-up is permitted. + * If not explicitly configured, implementations fall back to + * {@link #getMinTriggerScaleActionFrequencyMillis()} for backward compatibility. + */ + Duration getMinScaleUpDelay(); + + /** + * Minimum time that must elapse after any scale action before a scale-down is permitted. + * If not explicitly configured, implementations fall back to + * {@link #getMinTriggerScaleActionFrequencyMillis()} for backward compatibility. + */ + Duration getMinScaleDownDelay(); + int getTaskCountMax(); int getTaskCountMin(); Integer getTaskCountStart(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java index 21614c90c8c3..fa7db4f6928c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java @@ -28,7 +28,6 @@ import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; -import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Either; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; @@ -90,8 +89,6 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler private final WeightedCostFunction costFunction; private volatile CostMetrics lastKnownMetrics; - private volatile long lastScaleActionTimeMillis = -1; - public CostBasedAutoScaler( SeekableStreamSupervisor supervisor, CostBasedAutoScalerConfig config, @@ -189,21 +186,17 @@ public int computeTaskCountForScaleAction() // If task count is out of bounds, scale to the configured boundary // regardless of optimal task count, to get back to a safe state. - if (isScaleActionAllowed() && isTaskCountOutOfBounds) { + if (isTaskCountOutOfBounds) { taskCount = currentTaskCount; - lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis(); log.info("Task count for supervisor[%s] was out of bounds [%d,%d], urgently scaling from [%d] to [%d].", supervisorId, config.getTaskCountMin(), config.getTaskCountMax(), currentTaskCount, currentTaskCount); - } else if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) { + } else if (optimalTaskCount > currentTaskCount) { taskCount = optimalTaskCount; - lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis(); log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", supervisorId, currentTaskCount, taskCount); } else if (!config.isScaleDownOnTaskRolloverOnly() - && isScaleActionAllowed() && optimalTaskCount < currentTaskCount && optimalTaskCount > 0) { taskCount = optimalTaskCount; - lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis(); log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale down).", supervisorId, currentTaskCount, taskCount); } else { taskCount = -1; @@ -594,22 +587,4 @@ private Either validateMetricsForScaling(CostMetrics metrics) } } - /** - * Determines if a scale action is currently allowed based on the elapsed time - * since the last scale action and the configured minimum scale-down delay. - */ - private boolean isScaleActionAllowed() - { - if (lastScaleActionTimeMillis < 0) { - return true; - } - - final long barrierMillis = config.getMinScaleDownDelay().getMillis(); - if (barrierMillis <= 0) { - return true; - } - - final long elapsedMillis = DateTimes.nowUtc().getMillis() - lastScaleActionTimeMillis; - return elapsedMillis >= barrierMillis; - } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java index 025da7878519..b19a3e2cbbe8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java @@ -45,7 +45,6 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig { static final long DEFAULT_SCALE_ACTION_PERIOD_MILLIS = 10 * 60 * 1000; // 10 minutes - static final long DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS = 5 * 60 * 1000; // 5 minutes static final double DEFAULT_LAG_WEIGHT = 0.25; static final double DEFAULT_IDLE_WEIGHT = 0.75; static final Duration DEFAULT_MIN_SCALE_DELAY = Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS * 3); @@ -62,6 +61,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig private final double idleWeight; private final boolean useTaskCountBoundaries; private final int highLagThreshold; + private final Duration minScaleUpDelay; private final Duration minScaleDownDelay; private final boolean scaleDownDuringTaskRolloverOnly; @@ -78,6 +78,7 @@ public CostBasedAutoScalerConfig( @Nullable @JsonProperty("idleWeight") Double idleWeight, @Nullable @JsonProperty("useTaskCountBoundaries") Boolean useTaskCountBoundaries, @Nullable @JsonProperty("highLagThreshold") Integer highLagThreshold, + @Nullable @JsonProperty("minScaleUpDelay") Duration minScaleUpDelay, @Nullable @JsonProperty("minScaleDownDelay") Duration minScaleDownDelay, @Nullable @JsonProperty("scaleDownDuringTaskRolloverOnly") Boolean scaleDownDuringTaskRolloverOnly ) @@ -90,7 +91,7 @@ public CostBasedAutoScalerConfig( : DEFAULT_SCALE_ACTION_PERIOD_MILLIS; this.minTriggerScaleActionFrequencyMillis = Configs.valueOrDefault( minTriggerScaleActionFrequencyMillis, - DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS + DEFAULT_SCALE_ACTION_PERIOD_MILLIS ); // Cost function weights with defaults @@ -98,6 +99,7 @@ public CostBasedAutoScalerConfig( this.idleWeight = Configs.valueOrDefault(idleWeight, DEFAULT_IDLE_WEIGHT); this.useTaskCountBoundaries = Configs.valueOrDefault(useTaskCountBoundaries, false); this.highLagThreshold = Configs.valueOrDefault(highLagThreshold, -1); + this.minScaleUpDelay = Configs.valueOrDefault(minScaleUpDelay, Duration.millis(this.minTriggerScaleActionFrequencyMillis)); this.minScaleDownDelay = Configs.valueOrDefault(minScaleDownDelay, DEFAULT_MIN_SCALE_DELAY); this.scaleDownDuringTaskRolloverOnly = Configs.valueOrDefault(scaleDownDuringTaskRolloverOnly, false); @@ -125,7 +127,8 @@ public CostBasedAutoScalerConfig( Preconditions.checkArgument(this.lagWeight >= 0, "lagWeight must be >= 0"); Preconditions.checkArgument(this.idleWeight >= 0, "idleWeight must be >= 0"); - Preconditions.checkArgument(this.minScaleDownDelay.getMillis() >= 0, "minScaleDownDelay must be >= 0"); + Preconditions.checkArgument(this.minScaleUpDelay.getMillis() >= 0, "minScaleUpDelay must be a duration >= 0 millis"); + Preconditions.checkArgument(this.minScaleDownDelay.getMillis() >= 0, "minScaleDownDelay must be a duration >= 0 millis"); } /** @@ -165,6 +168,7 @@ public Integer getTaskCountStart() return taskCountStart; } + @Deprecated @Override @JsonProperty public long getMinTriggerScaleActionFrequencyMillis() @@ -217,10 +221,19 @@ public int getHighLagThreshold() } /** - * Represents the minimum duration between successful scale actions. - * A higher value implies a more conservative scaling behavior, ensuring that tasks - * are not scaled too frequently during workload fluctuations. + * Returns the minimum delay before a scale-up action is allowed after any previous scale action. */ + @Override + @JsonProperty + public Duration getMinScaleUpDelay() + { + return minScaleUpDelay; + } + + /** + * Returns the minimum delay before a scale-down action is allowed after any previous scale action. + */ + @Override @JsonProperty public Duration getMinScaleDownDelay() { @@ -263,6 +276,7 @@ public boolean equals(Object o) && Double.compare(that.lagWeight, lagWeight) == 0 && Double.compare(that.idleWeight, idleWeight) == 0 && useTaskCountBoundaries == that.useTaskCountBoundaries + && Objects.equals(minScaleUpDelay, that.minScaleUpDelay) && Objects.equals(minScaleDownDelay, that.minScaleDownDelay) && scaleDownDuringTaskRolloverOnly == that.scaleDownDuringTaskRolloverOnly && Objects.equals(taskCountStart, that.taskCountStart) @@ -285,6 +299,7 @@ public int hashCode() idleWeight, useTaskCountBoundaries, highLagThreshold, + minScaleUpDelay, minScaleDownDelay, scaleDownDuringTaskRolloverOnly ); @@ -305,6 +320,7 @@ public String toString() ", idleWeight=" + idleWeight + ", useTaskCountBoundaries=" + useTaskCountBoundaries + ", highLagThreshold=" + highLagThreshold + + ", minScaleUpDelay=" + minScaleUpDelay + ", minScaleDownDelay=" + minScaleDownDelay + ", scaleDownDuringTaskRolloverOnly=" + scaleDownDuringTaskRolloverOnly + '}'; @@ -327,6 +343,7 @@ public static class Builder private Double idleWeight; private Boolean useTaskCountBoundaries; private Integer highLagThreshold; + private Duration minScaleUpDelay; private Duration minScaleDownDelay; private Boolean scaleDownDuringTaskRolloverOnly; @@ -388,6 +405,12 @@ public Builder idleWeight(double idleWeight) return this; } + public Builder minScaleUpDelay(Duration minScaleUpDelay) + { + this.minScaleUpDelay = minScaleUpDelay; + return this; + } + public Builder minScaleDownDelay(Duration minScaleDownDelay) { this.minScaleDownDelay = minScaleDownDelay; @@ -426,6 +449,7 @@ public CostBasedAutoScalerConfig build() idleWeight, useTaskCountBoundaries, highLagThreshold, + minScaleUpDelay, minScaleDownDelay, scaleDownDuringTaskRolloverOnly ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java index c53ac0e379c0..1ec08384a40d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java @@ -23,12 +23,14 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import org.apache.druid.common.config.Configs; import org.apache.druid.indexing.overlord.supervisor.Supervisor; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.joda.time.Duration; import javax.annotation.Nullable; import java.util.Objects; @@ -51,6 +53,8 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig private final int scaleOutStep; private final boolean enableTaskAutoScaler; private final long minTriggerScaleActionFrequencyMillis; + private final Duration minScaleUpDelay; + private final Duration minScaleDownDelay; private final AggregateFunction lagAggregate; private final Double stopTaskCountRatio; @@ -71,6 +75,8 @@ public LagBasedAutoScalerConfig( @Nullable @JsonProperty("scaleOutStep") Integer scaleOutStep, @Nullable @JsonProperty("enableTaskAutoScaler") Boolean enableTaskAutoScaler, @Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long minTriggerScaleActionFrequencyMillis, + @Nullable @JsonProperty("minScaleUpDelay") Duration minScaleUpDelay, + @Nullable @JsonProperty("minScaleDownDelay") Duration minScaleDownDelay, @Nullable @JsonProperty("lagAggregate") AggregateFunction lagAggregate, @Nullable @JsonProperty("stopTaskCountRatio") Double stopTaskCountRatio ) @@ -105,11 +111,21 @@ public LagBasedAutoScalerConfig( this.scaleOutStep = scaleOutStep != null ? scaleOutStep : 2; this.minTriggerScaleActionFrequencyMillis = minTriggerScaleActionFrequencyMillis != null ? minTriggerScaleActionFrequencyMillis : 600000; + this.minScaleUpDelay = Configs.valueOrDefault(minScaleUpDelay, Duration.millis(this.minTriggerScaleActionFrequencyMillis)); + this.minScaleDownDelay = Configs.valueOrDefault(minScaleDownDelay, Duration.millis(this.minTriggerScaleActionFrequencyMillis)); Preconditions.checkArgument( stopTaskCountRatio == null || (stopTaskCountRatio > 0.0 && stopTaskCountRatio <= 1.0), "0.0 < stopTaskCountRatio <= 1.0" ); + Preconditions.checkArgument( + this.minScaleUpDelay.getMillis() >= 0, + "minScaleUpDelay must be a duration >= 0 millis" + ); + Preconditions.checkArgument( + this.minScaleDownDelay.getMillis() >= 0, + "minScaleDownDelay must be a duration >= 0 millis" + ); this.stopTaskCountRatio = stopTaskCountRatio; } @@ -213,6 +229,7 @@ public boolean getEnableTaskAutoScaler() return enableTaskAutoScaler; } + @Deprecated @Override @JsonProperty public long getMinTriggerScaleActionFrequencyMillis() @@ -220,6 +237,20 @@ public long getMinTriggerScaleActionFrequencyMillis() return minTriggerScaleActionFrequencyMillis; } + @Override + @JsonProperty + public Duration getMinScaleUpDelay() + { + return minScaleUpDelay; + } + + @Override + @JsonProperty + public Duration getMinScaleDownDelay() + { + return minScaleDownDelay; + } + @JsonProperty @Nullable public AggregateFunction getLagAggregate() @@ -244,8 +275,10 @@ public String toString() ", taskCountMin=" + taskCountMin + ", taskCountStart=" + taskCountStart + ", minTriggerScaleActionFrequencyMillis=" + minTriggerScaleActionFrequencyMillis + + ", minScaleUpDelay=" + minScaleUpDelay + + ", minScaleDownDelay=" + minScaleDownDelay + ", lagCollectionIntervalMillis=" + lagCollectionIntervalMillis + - ", lagCollectionIntervalMillis=" + lagCollectionIntervalMillis + + ", lagCollectionRangeMillis=" + lagCollectionRangeMillis + ", scaleOutThreshold=" + scaleOutThreshold + ", triggerScaleOutFractionThreshold=" + triggerScaleOutFractionThreshold + ", scaleInThreshold=" + scaleInThreshold + @@ -286,6 +319,8 @@ public boolean equals(Object o) scaleOutStep == that.scaleOutStep && enableTaskAutoScaler == that.enableTaskAutoScaler && minTriggerScaleActionFrequencyMillis == that.minTriggerScaleActionFrequencyMillis && + Objects.equals(minScaleUpDelay, that.minScaleUpDelay) && + Objects.equals(minScaleDownDelay, that.minScaleDownDelay) && Objects.equals(taskCountStart, that.taskCountStart) && lagAggregate == that.lagAggregate && Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio); @@ -310,6 +345,8 @@ public int hashCode() scaleOutStep, enableTaskAutoScaler, minTriggerScaleActionFrequencyMillis, + minScaleUpDelay, + minScaleDownDelay, lagAggregate, stopTaskCountRatio ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagBasedAutoScalerConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagBasedAutoScalerConfigTest.java index 93653f28fe98..db9bdb3d0c87 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagBasedAutoScalerConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagBasedAutoScalerConfigTest.java @@ -21,12 +21,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.joda.time.Duration; import org.junit.Assert; import org.junit.Test; public class LagBasedAutoScalerConfigTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); @Test public void testDefaults() @@ -48,6 +50,8 @@ public void testDefaults() null, null, null, + null, + null, null ); @@ -63,6 +67,9 @@ public void testDefaults() Assert.assertEquals(1, config.getScaleInStep()); Assert.assertEquals(2, config.getScaleOutStep()); Assert.assertEquals(600000, config.getMinTriggerScaleActionFrequencyMillis()); + // When minScaleUpDelay/minScaleDownDelay are not set, they fall back to minTriggerScaleActionFrequencyMillis + Assert.assertEquals(Duration.millis(600000), config.getMinScaleUpDelay()); + Assert.assertEquals(Duration.millis(600000), config.getMinScaleDownDelay()); Assert.assertNull(config.getLagAggregate()); Assert.assertNull(config.getStopTaskCountRatio()); Assert.assertEquals(0, config.getTaskCountMax()); @@ -88,7 +95,9 @@ public void testSerde() throws Exception 1, 5, true, - 5000L, + null, + Duration.millis(3000), + Duration.millis(7000), AggregateFunction.SUM, 0.1 ); @@ -125,6 +134,8 @@ public void testEnabledTaskCountChecks() true, null, null, + null, + null, null ) ); @@ -151,6 +162,8 @@ public void testEnabledTaskCountChecks() true, null, null, + null, + null, null ) ); @@ -177,6 +190,8 @@ public void testEnabledTaskCountChecks() true, null, null, + null, + null, null ) ); @@ -206,6 +221,8 @@ public void testStopTaskCountRatioBounds() true, null, null, + null, + null, 0.0 ) ); @@ -230,6 +247,8 @@ public void testStopTaskCountRatioBounds() true, null, null, + null, + null, -0.1 ) ); @@ -255,6 +274,8 @@ public void testStopTaskCountRatioBounds() true, null, null, + null, + null, 1.1 ) ); @@ -278,11 +299,256 @@ public void testStopTaskCountRatioBounds() true, null, null, + null, + null, 0.5 ); Assert.assertEquals(Double.valueOf(0.5), config.getStopTaskCountRatio()); } + @Test + public void testScaleDelayFallback() throws Exception + { + // Neither minScaleUpDelay nor minScaleDownDelay set: both fall back to minTriggerScaleActionFrequencyMillis + LagBasedAutoScalerConfig baseOnly = new LagBasedAutoScalerConfig( + null, + null, + null, + null, + null, + null, + null, + null, + 10, + null, + 1, + null, + null, + false, + 60000L, + null, + null, + null, + null + ); + Assert.assertEquals(60000L, baseOnly.getMinTriggerScaleActionFrequencyMillis()); + Assert.assertEquals(Duration.millis(60000), baseOnly.getMinScaleUpDelay()); + Assert.assertEquals(Duration.millis(60000), baseOnly.getMinScaleDownDelay()); + + // Only minScaleUpDelay set + LagBasedAutoScalerConfig upOnly = new LagBasedAutoScalerConfig( + null, + null, + null, + null, + null, + null, + null, + null, + 10, + null, + 1, + null, + null, + false, + 60000L, + Duration.millis(15000), + null, + null, + null + ); + Assert.assertEquals(Duration.millis(15000), upOnly.getMinScaleUpDelay()); + Assert.assertEquals(Duration.millis(60000), upOnly.getMinScaleDownDelay()); + + // Only minScaleDownDelay set + LagBasedAutoScalerConfig downOnly = new LagBasedAutoScalerConfig( + null, + null, + null, + null, + null, + null, + null, + null, + 10, + null, + 1, + null, + null, + false, + 60000L, + null, + Duration.millis(30000), + null, + null + ); + Assert.assertEquals(Duration.millis(60000), downOnly.getMinScaleUpDelay()); + Assert.assertEquals(Duration.millis(30000), downOnly.getMinScaleDownDelay()); + + // Both set: serde roundtrip preserves values + LagBasedAutoScalerConfig bothSet = new LagBasedAutoScalerConfig( + null, + null, + null, + null, + null, + null, + null, + null, + 10, + null, + 1, + null, + null, + false, + null, + Duration.millis(15000), + Duration.millis(30000), + null, + null + ); + Assert.assertEquals(Duration.millis(15000), bothSet.getMinScaleUpDelay()); + Assert.assertEquals(Duration.millis(30000), bothSet.getMinScaleDownDelay()); + LagBasedAutoScalerConfig roundTripped = OBJECT_MAPPER.readValue( + OBJECT_MAPPER.writeValueAsString(bothSet), + LagBasedAutoScalerConfig.class + ); + Assert.assertEquals(bothSet, roundTripped); + } + + @Test + @SuppressWarnings("deprecation") + public void testScaleDelayFallbackViaSerde() throws Exception + { + // JSON with only minTriggerScaleActionFrequencyMillis (no Duration fields): + // both getMinScaleUpDelay() and getMinScaleDownDelay() should fall back to it. + String json = "{\"taskCountMax\":10,\"taskCountMin\":1,\"minTriggerScaleActionFrequencyMillis\":45000}"; + LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue(json, LagBasedAutoScalerConfig.class); + Assert.assertEquals(45000L, config.getMinTriggerScaleActionFrequencyMillis()); + Assert.assertEquals(Duration.millis(45000), config.getMinScaleUpDelay()); + Assert.assertEquals(Duration.millis(45000), config.getMinScaleDownDelay()); + + // JSON with minTriggerScaleActionFrequencyMillis and only minScaleUpDelay: + // getMinScaleUpDelay() should return the explicit value; getMinScaleDownDelay() falls back. + String jsonUpOnly = "{\"taskCountMax\":10,\"taskCountMin\":1," + + "\"minTriggerScaleActionFrequencyMillis\":45000," + + "\"minScaleUpDelay\":\"PT10S\"}"; + LagBasedAutoScalerConfig configUpOnly = OBJECT_MAPPER.readValue(jsonUpOnly, LagBasedAutoScalerConfig.class); + Assert.assertEquals(Duration.standardSeconds(10), configUpOnly.getMinScaleUpDelay()); + Assert.assertEquals(Duration.millis(45000), configUpOnly.getMinScaleDownDelay()); + } + + @Test + @SuppressWarnings("deprecation") + public void testMinTriggerScaleActionFrequencyMillisSerdeCompat() throws Exception + { + final long defaultMinTriggerMillis = 600_000L; + + // Backwards-compat: nothing set -> deprecated field gets its default and both direction + // delays fall back to it. + { + LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue( + "{\"taskCountMax\":10,\"taskCountMin\":1}", + LagBasedAutoScalerConfig.class + ); + Assert.assertEquals(defaultMinTriggerMillis, config.getMinTriggerScaleActionFrequencyMillis()); + Assert.assertEquals(Duration.millis(defaultMinTriggerMillis), config.getMinScaleUpDelay()); + Assert.assertEquals(Duration.millis(defaultMinTriggerMillis), config.getMinScaleDownDelay()); + assertRoundTrips(config); + } + + // Backwards-compat: legacy spec sets only the deprecated field. Both direction delays fall + // back to it. + { + LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue( + "{\"taskCountMax\":10,\"taskCountMin\":1,\"minTriggerScaleActionFrequencyMillis\":900000}", + LagBasedAutoScalerConfig.class + ); + Assert.assertEquals(900_000L, config.getMinTriggerScaleActionFrequencyMillis()); + Assert.assertEquals(Duration.millis(900_000), config.getMinScaleUpDelay()); + Assert.assertEquals(Duration.millis(900_000), config.getMinScaleDownDelay()); + assertRoundTrips(config); + } + + // Forwards-compat: direction delays set, deprecated field omitted. Deprecated field defaults. + { + LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue( + "{\"taskCountMax\":10,\"taskCountMin\":1," + + "\"minScaleUpDelay\":\"PT2M\",\"minScaleDownDelay\":\"PT15M\"}", + LagBasedAutoScalerConfig.class + ); + Assert.assertEquals(defaultMinTriggerMillis, config.getMinTriggerScaleActionFrequencyMillis()); + Assert.assertEquals(Duration.standardMinutes(2), config.getMinScaleUpDelay()); + Assert.assertEquals(Duration.standardMinutes(15), config.getMinScaleDownDelay()); + assertRoundTrips(config); + } + + // Forwards-compat: deprecated field AND direction delays set (overlapping migration window). + // Direction delays win for their own direction; the deprecated field is preserved. + { + LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue( + "{\"taskCountMax\":10,\"taskCountMin\":1," + + "\"minTriggerScaleActionFrequencyMillis\":900000," + + "\"minScaleUpDelay\":\"PT2M\",\"minScaleDownDelay\":\"PT15M\"}", + LagBasedAutoScalerConfig.class + ); + Assert.assertEquals(900_000L, config.getMinTriggerScaleActionFrequencyMillis()); + Assert.assertEquals(Duration.standardMinutes(2), config.getMinScaleUpDelay()); + Assert.assertEquals(Duration.standardMinutes(15), config.getMinScaleDownDelay()); + assertRoundTrips(config); + } + + // Mixed: only minScaleUpDelay + deprecated field set. Down falls back to the deprecated field. + { + LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue( + "{\"taskCountMax\":10,\"taskCountMin\":1," + + "\"minTriggerScaleActionFrequencyMillis\":900000," + + "\"minScaleUpDelay\":\"PT2M\"}", + LagBasedAutoScalerConfig.class + ); + Assert.assertEquals(900_000L, config.getMinTriggerScaleActionFrequencyMillis()); + Assert.assertEquals(Duration.standardMinutes(2), config.getMinScaleUpDelay()); + Assert.assertEquals(Duration.millis(900_000), config.getMinScaleDownDelay()); + assertRoundTrips(config); + } + + // Mixed: only minScaleDownDelay + deprecated field set. Up falls back to the deprecated field. + { + LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue( + "{\"taskCountMax\":10,\"taskCountMin\":1," + + "\"minTriggerScaleActionFrequencyMillis\":900000," + + "\"minScaleDownDelay\":\"PT15M\"}", + LagBasedAutoScalerConfig.class + ); + Assert.assertEquals(900_000L, config.getMinTriggerScaleActionFrequencyMillis()); + Assert.assertEquals(Duration.millis(900_000), config.getMinScaleUpDelay()); + Assert.assertEquals(Duration.standardMinutes(15), config.getMinScaleDownDelay()); + assertRoundTrips(config); + } + + // Mixed: deprecated field omitted, only minScaleUpDelay set. Down falls back to the + // deprecated field's default. + { + LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue( + "{\"taskCountMax\":10,\"taskCountMin\":1,\"minScaleUpDelay\":\"PT2M\"}", + LagBasedAutoScalerConfig.class + ); + Assert.assertEquals(defaultMinTriggerMillis, config.getMinTriggerScaleActionFrequencyMillis()); + Assert.assertEquals(Duration.standardMinutes(2), config.getMinScaleUpDelay()); + Assert.assertEquals(Duration.millis(defaultMinTriggerMillis), config.getMinScaleDownDelay()); + assertRoundTrips(config); + } + } + + private void assertRoundTrips(LagBasedAutoScalerConfig config) throws Exception + { + LagBasedAutoScalerConfig roundTripped = OBJECT_MAPPER.readValue( + OBJECT_MAPPER.writeValueAsString(config), + LagBasedAutoScalerConfig.class + ); + Assert.assertEquals(config, roundTripped); + } + @Test public void testEqualsAndHashCode() { @@ -302,6 +568,8 @@ public void testEqualsAndHashCode() 3, true, 7000L, + null, + null, AggregateFunction.SUM, 0.5 ); @@ -321,6 +589,8 @@ public void testEqualsAndHashCode() 1, true, 7000L, + null, + null, AggregateFunction.AVERAGE, 0.5 ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java index 7c20855b033d..bf3d5f9d71e4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java @@ -441,7 +441,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc .stream() .map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)) .filter(Objects::nonNull) - .anyMatch("minTriggerScaleActionFrequencyMillis not elapsed yet"::equals)); + .anyMatch("Scale cooldown not elapsed yet"::equals)); emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 1); autoScaler.reset(); autoScaler.stop(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index d59e2711ce88..5f6986551a76 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -49,10 +49,13 @@ import org.apache.druid.indexing.overlord.TaskRunnerListener; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.indexing.overlord.supervisor.Supervisor; +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; @@ -81,6 +84,8 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.metadata.PendingSegmentRecord; @@ -2683,6 +2688,8 @@ public void test_calculateWorkerThreads_shouldUseAutoScalerConfig() true, null, null, + null, + null, null ); SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(1, autoScalerConfig, null); @@ -2760,6 +2767,8 @@ public void testMaxAllowedStopsWithStopTaskCountRatio() true, null, null, + null, + null, 0.4 ); SeekableStreamSupervisorIOConfig config = new SeekableStreamSupervisorIOConfig( @@ -3303,6 +3312,22 @@ public void setStreamOffsets(Map streamOffsets) } } + private class StateOverrideTestSeekableStreamSupervisor extends TestSeekableStreamSupervisor + { + private final SupervisorStateManager.State state; + + private StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.State state) + { + this.state = state; + } + + @Override + public SupervisorStateManager.State getState() + { + return state; + } + } + private class TestEmittingTestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor { private final CountDownLatch latch; @@ -3572,6 +3597,134 @@ public void testDiscoverExistingTasks_withServerPriorities() } + @Test + public void testDynamicAllocationScaleUpAllowedWhenCooldownElapsed() + { + final long zeroCooldown = 0L; + final long unusedCooldown = Duration.standardHours(1).getMillis(); + final StubServiceEmitter scalingEmitter = setupSupervisorForAutoScalingTest(zeroCooldown, unusedCooldown, 2); + final TestSeekableStreamSupervisor supervisor = + new StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING); + + // minScaleUpDelay = 0 means any scale-up is immediately allowed. + supervisor.handleDynamicAllocationTasksNotice(() -> 5, () -> {}, scalingEmitter); + + Assert.assertEquals(5, supervisor.getIoConfig().getTaskCount().intValue()); + + final List events = + scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC); + Assert.assertEquals("Exactly one required-tasks emission expected", 1, events.size()); + assertScaledToTaskCount(events.get(0), 5); + } + + @Test + public void testDynamicAllocationScaleUpBlockedWhenCooldownNotElapsed() + { + final long scaleUpCooldown = Duration.standardHours(1).getMillis(); + final long unusedCooldown = 0L; + final StubServiceEmitter scalingEmitter = setupSupervisorForAutoScalingTest(scaleUpCooldown, unusedCooldown, 2); + final TestSeekableStreamSupervisor supervisor = + new StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING); + + // First scale-up succeeds and stamps the last-scale timestamp. + supervisor.handleDynamicAllocationTasksNotice(() -> 5, () -> {}, scalingEmitter); + Assert.assertEquals(5, supervisor.getIoConfig().getTaskCount().intValue()); + + // Second scale-up is within the 1h minScaleUpDelay window and must be blocked. + supervisor.handleDynamicAllocationTasksNotice(() -> 7, () -> {}, scalingEmitter); + Assert.assertEquals("Second scale-up must not take effect", 5, supervisor.getIoConfig().getTaskCount().intValue()); + + final List events = + scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC); + Assert.assertEquals("Two required-tasks emissions expected (one applied, one skipped)", 2, events.size()); + // First emission: the successful scale carries no skip-reason dim and reports the applied count. + assertScaledToTaskCount(events.get(0), 5); + // Second emission: the gated scale carries the cooldown skip-reason dim and the proposed (not applied) count. + assertScaleSkipped(events.get(1), 7, "Scale cooldown not elapsed yet"); + } + + @Test + public void testDynamicAllocationScaleDownAllowedWhenCooldownElapsed() + { + final long unusedCooldown = Duration.standardHours(1).getMillis(); + final long zeroCooldown = 0L; + final StubServiceEmitter scalingEmitter = setupSupervisorForAutoScalingTest(unusedCooldown, zeroCooldown, 5); + final TestSeekableStreamSupervisor supervisor = + new StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING); + + // minScaleDownDelay = 0 means any scale-down is immediately allowed. + supervisor.handleDynamicAllocationTasksNotice(() -> 2, () -> {}, scalingEmitter); + + Assert.assertEquals(2, supervisor.getIoConfig().getTaskCount().intValue()); + + final List events = + scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC); + Assert.assertEquals("Exactly one required-tasks emission expected", 1, events.size()); + assertScaledToTaskCount(events.get(0), 2); + } + + @Test + public void testDynamicAllocationScaleDownBlockedWhenCooldownNotElapsed() + { + final long unusedCooldown = 0L; + final long scaleDownCooldown = Duration.standardHours(1).getMillis(); + final StubServiceEmitter scalingEmitter = setupSupervisorForAutoScalingTest(unusedCooldown, scaleDownCooldown, 5); + final TestSeekableStreamSupervisor supervisor = + new StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING); + + // First scale-down succeeds and stamps the last-scale timestamp. + supervisor.handleDynamicAllocationTasksNotice(() -> 3, () -> {}, scalingEmitter); + Assert.assertEquals(3, supervisor.getIoConfig().getTaskCount().intValue()); + + // Second scale-down is within the 1h minScaleDownDelay window and must be blocked. + supervisor.handleDynamicAllocationTasksNotice(() -> 1, () -> {}, scalingEmitter); + Assert.assertEquals("Second scale-down must not take effect", 3, supervisor.getIoConfig().getTaskCount().intValue()); + + final List events = + scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC); + Assert.assertEquals("Two required-tasks emissions expected (one applied, one skipped)", 2, events.size()); + assertScaledToTaskCount(events.get(0), 3); + assertScaleSkipped(events.get(1), 1, "Scale cooldown not elapsed yet"); + } + + /** + * Asserts that a required-tasks emission represents an scale event: it carries the standard + * supervisor/datasource/stream dims, no scalingSkipReason dim, and the metric value matches the + * new task count. + */ + private static void assertScaledToTaskCount(ServiceMetricEvent event, int expectedRequiredCount) + { + assertStandardDimensions(event); + Assert.assertNull( + "Attempted scale must not carry a scalingSkipReason dim", + event.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION) + ); + Assert.assertEquals(expectedRequiredCount, event.getValue().intValue()); + } + + /** + * Asserts that a required-tasks emission represents a skipped scale: it carries the standard + * supervisor/datasource/stream dims, a scalingSkipReason dim equal to {@code expectedReason}, + * and the metric value matches the proposed (not applied) task count. + */ + private static void assertScaleSkipped(ServiceMetricEvent event, int expectedRequiredCount, String expectedReason) + { + assertStandardDimensions(event); + Assert.assertEquals( + expectedReason, + event.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION) + ); + Assert.assertEquals(expectedRequiredCount, event.getValue().intValue()); + } + + private static void assertStandardDimensions(ServiceMetricEvent event) + { + final Map dims = event.getUserDims(); + Assert.assertEquals(SUPERVISOR_ID, dims.get(DruidMetrics.SUPERVISOR_ID)); + Assert.assertEquals(DATASOURCE, dims.get(DruidMetrics.DATASOURCE)); + Assert.assertEquals(STREAM, dims.get(DruidMetrics.STREAM)); + } + private static TestSeekableStreamIndexTask createTestTask(String taskId, String groupId, @Nullable Integer serverPriority, SeekableStreamIndexTaskIOConfig taskIoConfig, RecordSupplier recordSupplier) { return new TestSeekableStreamIndexTask( @@ -3588,4 +3741,118 @@ private static TestSeekableStreamIndexTask createTestTask(String taskId, String serverPriority ); } + + /** + * Resets the {@link #spec} and {@link #taskMaster} mocks so the supervisor sees an ioConfig with + * the given direction-specific cooldowns and so {@code changeTaskCountInIOConfig} can run + * without hitting unmocked calls. Returns a dedicated emitter for the caller to pass into the + * notice handler so dynamic-allocation events can be asserted in isolation. + */ + private StubServiceEmitter setupSupervisorForAutoScalingTest( + long minScaleUpDelayMillis, + long minScaleDownDelayMillis, + int initialTaskCount + ) + { + final AutoScalerConfig autoScalerConfig = testAutoScalerConfig( + minScaleUpDelayMillis, + minScaleDownDelayMillis + ); + final SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig( + initialTaskCount, + autoScalerConfig, + null + ); + return resetSpecAndTaskMasterForScaling(ioConfig); + } + + /** + * Returns a minimal test-only {@link AutoScalerConfig} + */ + private static AutoScalerConfig testAutoScalerConfig(long minScaleUpDelayMillis, long minScaleDownDelayMillis) + { + return new AutoScalerConfig() + { + @Override + public boolean getEnableTaskAutoScaler() + { + return true; + } + + @Override + public long getMinTriggerScaleActionFrequencyMillis() + { + return 0L; + } + + @Override + public Duration getMinScaleUpDelay() + { + return Duration.millis(minScaleUpDelayMillis); + } + + @Override + public Duration getMinScaleDownDelay() + { + return Duration.millis(minScaleDownDelayMillis); + } + + @Override + public int getTaskCountMax() + { + return 100; + } + + @Override + public int getTaskCountMin() + { + return 1; + } + + @Override + public Integer getTaskCountStart() + { + return null; + } + + @Override + public Double getStopTaskCountRatio() + { + return null; + } + + @Override + public SupervisorTaskAutoScaler createAutoScaler( + Supervisor supervisor, + SupervisorSpec spec, + ServiceEmitter emitter + ) + { + throw new UnsupportedOperationException("test autoscaler config: createAutoScaler not used"); + } + }; + } + + private StubServiceEmitter resetSpecAndTaskMasterForScaling(SeekableStreamSupervisorIOConfig ioConfig) + { + final StubServiceEmitter scalingEmitter = new StubServiceEmitter("scaling", "localhost"); + + EasyMock.reset(spec, taskMaster); + EasyMock.expect(spec.getId()).andReturn(SUPERVISOR_ID).anyTimes(); + EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); + EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); + EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); + EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); + EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes(); + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + // changeTaskCountInIOConfig calls this; absent path just logs and moves on. + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes(); + + replayAll(); + return scalingEmitter; + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java index fa50d87b56d0..05724d1e8375 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java @@ -28,7 +28,6 @@ import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_IDLE_WEIGHT; import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_LAG_WEIGHT; import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_MIN_SCALE_DELAY; -import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS; import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_SCALE_ACTION_PERIOD_MILLIS; @SuppressWarnings("TextBlockMigration") @@ -51,6 +50,7 @@ public void testSerdeWithAllProperties() throws Exception + " \"lagWeight\": 0.6,\n" + " \"idleWeight\": 0.4,\n" + " \"highLagThreshold\": 30000,\n" + + " \"minScaleUpDelay\": \"PT5M\",\n" + " \"minScaleDownDelay\": \"PT10M\",\n" + " \"scaleDownDuringTaskRolloverOnly\": true\n" + "}"; @@ -61,11 +61,11 @@ public void testSerdeWithAllProperties() throws Exception Assert.assertEquals(100, config.getTaskCountMax()); Assert.assertEquals(5, config.getTaskCountMin()); Assert.assertEquals(Integer.valueOf(10), config.getTaskCountStart()); - Assert.assertEquals(600000L, config.getMinTriggerScaleActionFrequencyMillis()); Assert.assertEquals(Double.valueOf(0.8), config.getStopTaskCountRatio()); Assert.assertEquals(60000L, config.getScaleActionPeriodMillis()); Assert.assertEquals(0.6, config.getLagWeight(), 0.001); Assert.assertEquals(0.4, config.getIdleWeight(), 0.001); + Assert.assertEquals(Duration.standardMinutes(5), config.getMinScaleUpDelay()); Assert.assertEquals(Duration.standardMinutes(10), config.getMinScaleDownDelay()); Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly()); Assert.assertEquals(30000, config.getHighLagThreshold()); @@ -95,12 +95,10 @@ public void testSerdeWithDefaults() throws Exception // Check defaults Assert.assertEquals(DEFAULT_SCALE_ACTION_PERIOD_MILLIS, config.getScaleActionPeriodMillis()); - Assert.assertEquals( - DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS, - config.getMinTriggerScaleActionFrequencyMillis() - ); Assert.assertEquals(DEFAULT_LAG_WEIGHT, config.getLagWeight(), 0.001); Assert.assertEquals(DEFAULT_IDLE_WEIGHT, config.getIdleWeight(), 0.001); + // minScaleUpDelay and minScaleDownDelay each have their own independent default + Assert.assertEquals(Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS), config.getMinScaleUpDelay()); Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY, config.getMinScaleDownDelay()); Assert.assertFalse(config.isScaleDownOnTaskRolloverOnly()); Assert.assertNull(config.getTaskCountStart()); @@ -183,11 +181,11 @@ public void testBuilder() .taskCountMin(5) .taskCountStart(10) .enableTaskAutoScaler(true) - .minTriggerScaleActionFrequencyMillis(600000L) .stopTaskCountRatio(0.8) .scaleActionPeriodMillis(60000L) .lagWeight(0.6) .idleWeight(0.4) + .minScaleUpDelay(Duration.standardMinutes(5)) .minScaleDownDelay(Duration.standardMinutes(10)) .scaleDownDuringTaskRolloverOnly(true) .highLagThreshold(30000) @@ -197,13 +195,156 @@ public void testBuilder() Assert.assertEquals(100, config.getTaskCountMax()); Assert.assertEquals(5, config.getTaskCountMin()); Assert.assertEquals(Integer.valueOf(10), config.getTaskCountStart()); - Assert.assertEquals(600000L, config.getMinTriggerScaleActionFrequencyMillis()); Assert.assertEquals(Double.valueOf(0.8), config.getStopTaskCountRatio()); Assert.assertEquals(60000L, config.getScaleActionPeriodMillis()); Assert.assertEquals(0.6, config.getLagWeight(), 0.001); Assert.assertEquals(0.4, config.getIdleWeight(), 0.001); + Assert.assertEquals(Duration.standardMinutes(5), config.getMinScaleUpDelay()); Assert.assertEquals(Duration.standardMinutes(10), config.getMinScaleDownDelay()); Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly()); Assert.assertEquals(30000, config.getHighLagThreshold()); } + + @Test + public void testScaleDelayDefaults() throws Exception + { + // Neither set: each direction gets its own independent default + CostBasedAutoScalerConfig defaults = CostBasedAutoScalerConfig.builder() + .taskCountMax(10) + .taskCountMin(1) + .build(); + Assert.assertEquals(Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS), defaults.getMinScaleUpDelay()); + Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY, defaults.getMinScaleDownDelay()); + + // Only minScaleUpDelay set: up uses explicit value, down uses its default + CostBasedAutoScalerConfig upOnly = CostBasedAutoScalerConfig.builder() + .taskCountMax(10) + .taskCountMin(1) + .minScaleUpDelay(Duration.standardMinutes(5)) + .build(); + Assert.assertEquals(Duration.standardMinutes(5), upOnly.getMinScaleUpDelay()); + Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY, upOnly.getMinScaleDownDelay()); + + // Only minScaleDownDelay set: down uses explicit value, up uses its own default (does not fall back to down) + CostBasedAutoScalerConfig downOnly = CostBasedAutoScalerConfig.builder() + .taskCountMax(10) + .taskCountMin(1) + .minScaleDownDelay(Duration.standardMinutes(20)) + .build(); + Assert.assertEquals(Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS), downOnly.getMinScaleUpDelay()); + Assert.assertEquals(Duration.standardMinutes(20), downOnly.getMinScaleDownDelay()); + + // Both set: serde roundtrip preserves values + CostBasedAutoScalerConfig bothSet = CostBasedAutoScalerConfig.builder() + .taskCountMax(10) + .taskCountMin(1) + .minScaleUpDelay(Duration.standardMinutes(5)) + .minScaleDownDelay(Duration.standardMinutes(20)) + .build(); + Assert.assertEquals(Duration.standardMinutes(5), bothSet.getMinScaleUpDelay()); + Assert.assertEquals(Duration.standardMinutes(20), bothSet.getMinScaleDownDelay()); + CostBasedAutoScalerConfig roundTripped = mapper.readValue(mapper.writeValueAsString(bothSet), CostBasedAutoScalerConfig.class); + Assert.assertEquals(bothSet, roundTripped); + } + + @Test + @SuppressWarnings("deprecation") + public void testMinTriggerScaleActionFrequencyMillisSerdeCompat() throws Exception + { + final long defaultMinTriggerMillis = DEFAULT_SCALE_ACTION_PERIOD_MILLIS; + final Duration defaultUp = Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS); + final Duration defaultDown = DEFAULT_MIN_SCALE_DELAY; + + // Backwards-compat: nothing set -> everything uses its own default. + { + CostBasedAutoScalerConfig config = mapper.readValue( + "{\"autoScalerStrategy\":\"costBased\",\"enableTaskAutoScaler\":true,\"taskCountMax\":10,\"taskCountMin\":1}", + CostBasedAutoScalerConfig.class + ); + Assert.assertEquals(defaultMinTriggerMillis, config.getMinTriggerScaleActionFrequencyMillis()); + Assert.assertEquals(defaultUp, config.getMinScaleUpDelay()); + Assert.assertEquals(defaultDown, config.getMinScaleDownDelay()); + assertRoundTrips(config); + } + + // Backwards-compat: legacy spec sets only the deprecated field. Direction delays still use + // their own defaults (no cross-field fallback in CostBased). + { + CostBasedAutoScalerConfig config = mapper.readValue( + "{\"autoScalerStrategy\":\"costBased\",\"enableTaskAutoScaler\":true,\"taskCountMax\":10,\"taskCountMin\":1," + + "\"minTriggerScaleActionFrequencyMillis\":900000}", + CostBasedAutoScalerConfig.class + ); + Assert.assertEquals(900_000L, config.getMinTriggerScaleActionFrequencyMillis()); + Assert.assertEquals(Duration.millis(900_000L), config.getMinScaleUpDelay()); + Assert.assertEquals(defaultDown, config.getMinScaleDownDelay()); + assertRoundTrips(config); + } + + // Forwards-compat: direction delays set, deprecated field omitted. Deprecated field defaults. + { + CostBasedAutoScalerConfig config = mapper.readValue( + "{\"autoScalerStrategy\":\"costBased\",\"enableTaskAutoScaler\":true,\"taskCountMax\":10,\"taskCountMin\":1," + + "\"minScaleUpDelay\":\"PT2M\",\"minScaleDownDelay\":\"PT15M\"}", + CostBasedAutoScalerConfig.class + ); + Assert.assertEquals(defaultMinTriggerMillis, config.getMinTriggerScaleActionFrequencyMillis()); + Assert.assertEquals(Duration.standardMinutes(2), config.getMinScaleUpDelay()); + Assert.assertEquals(Duration.standardMinutes(15), config.getMinScaleDownDelay()); + assertRoundTrips(config); + } + + // Forwards-compat: deprecated field AND direction delays set (overlapping migration window). + // All three are honored independently. + { + CostBasedAutoScalerConfig config = mapper.readValue( + "{\"autoScalerStrategy\":\"costBased\",\"enableTaskAutoScaler\":true,\"taskCountMax\":10,\"taskCountMin\":1," + + "\"minTriggerScaleActionFrequencyMillis\":900000," + + "\"minScaleUpDelay\":\"PT2M\",\"minScaleDownDelay\":\"PT15M\"}", + CostBasedAutoScalerConfig.class + ); + Assert.assertEquals(900_000L, config.getMinTriggerScaleActionFrequencyMillis()); + Assert.assertEquals(Duration.standardMinutes(2), config.getMinScaleUpDelay()); + Assert.assertEquals(Duration.standardMinutes(15), config.getMinScaleDownDelay()); + assertRoundTrips(config); + } + + // Only minScaleUpDelay set alongside the deprecated field: down uses its own default, + // not the deprecated field's value. + { + CostBasedAutoScalerConfig config = mapper.readValue( + "{\"autoScalerStrategy\":\"costBased\",\"enableTaskAutoScaler\":true,\"taskCountMax\":10,\"taskCountMin\":1," + + "\"minTriggerScaleActionFrequencyMillis\":900000," + + "\"minScaleUpDelay\":\"PT2M\"}", + CostBasedAutoScalerConfig.class + ); + Assert.assertEquals(900_000L, config.getMinTriggerScaleActionFrequencyMillis()); + Assert.assertEquals(Duration.standardMinutes(2), config.getMinScaleUpDelay()); + Assert.assertEquals(defaultDown, config.getMinScaleDownDelay()); + assertRoundTrips(config); + } + + // Only minScaleDownDelay set alongside the deprecated field: up uses its own default. + { + CostBasedAutoScalerConfig config = mapper.readValue( + "{\"autoScalerStrategy\":\"costBased\",\"enableTaskAutoScaler\":true,\"taskCountMax\":10,\"taskCountMin\":1," + + "\"minTriggerScaleActionFrequencyMillis\":900000," + + "\"minScaleDownDelay\":\"PT15M\"}", + CostBasedAutoScalerConfig.class + ); + Assert.assertEquals(900_000L, config.getMinTriggerScaleActionFrequencyMillis()); + Assert.assertEquals(Duration.millis(900_000L), config.getMinScaleUpDelay()); + Assert.assertEquals(Duration.standardMinutes(15), config.getMinScaleDownDelay()); + assertRoundTrips(config); + } + } + + private void assertRoundTrips(CostBasedAutoScalerConfig config) throws Exception + { + CostBasedAutoScalerConfig roundTripped = mapper.readValue( + mapper.writeValueAsString(config), + CostBasedAutoScalerConfig.class + ); + Assert.assertEquals(config, roundTripped); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java index 26c4c20f8594..6466b0966e8b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java @@ -82,24 +82,10 @@ public void setUp() @Test public void testScaleUpWhenOptimalGreaterThanCurrent() { - // Use config with a long barrier to test cooldown behavior - CostBasedAutoScalerConfig barrierConfig = CostBasedAutoScalerConfig.builder() - .taskCountMax(100) - .taskCountMin(1) - .enableTaskAutoScaler(true) - .minScaleDownDelay(Duration.standardHours(1)) - .build(); - - CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler( - mockSupervisor, - barrierConfig, - mockSpec, - mockEmitter - )); + CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter)); int currentTaskCount = 10; int scaleUpOptimal = 17; - // Trigger scale-up, which should set the cooldown timer doReturn(scaleUpOptimal).when(autoScaler).computeOptimalTaskCount(any()); setupMocksForMetricsCollection(autoScaler, currentTaskCount, 5000.0, 0.1); @@ -108,15 +94,6 @@ public void testScaleUpWhenOptimalGreaterThanCurrent() scaleUpOptimal, autoScaler.computeTaskCountForScaleAction() ); - - // Verify cooldown blocks immediate subsequent scaling - doReturn(scaleUpOptimal).when(autoScaler).computeOptimalTaskCount(any()); - setupMocksForMetricsCollection(autoScaler, currentTaskCount, 10.0, 0.9); - Assert.assertEquals( - "Scale action should be blocked during the cooldown window", - -1, - autoScaler.computeTaskCountForScaleAction() - ); } @Test @@ -135,45 +112,6 @@ public void testNoOpWhenOptimalEqualsCurrent() Assert.assertEquals("Should return -1 when it equals current (no change needed)", -1, result); } - @Test - public void testScaleDownBlockedReturnsMinusOne() - { - // Use config with a long barrier to test cooldown behavior - CostBasedAutoScalerConfig barrierConfig = CostBasedAutoScalerConfig.builder() - .taskCountMax(100) - .taskCountMin(1) - .enableTaskAutoScaler(true) - .minScaleDownDelay(Duration.standardHours(1)) - .build(); - - CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler( - mockSupervisor, - barrierConfig, - mockSpec, - mockEmitter - )); - - int currentTaskCount = 50; - int optimalCount = 30; // Lower than current (scale-down scenario) - - doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any()); - setupMocksForMetricsCollection(autoScaler, currentTaskCount, 10.0, 0.9); - - // First attempt: allowed (no prior scale action) - Assert.assertEquals( - "Scale-down should succeed when no prior scale action exists", - optimalCount, - autoScaler.computeTaskCountForScaleAction() - ); - - // Second attempt: blocked by cooldown - Assert.assertEquals( - "Scale-down should be blocked during the cooldown window", - -1, - autoScaler.computeTaskCountForScaleAction() - ); - } - @Test public void testReturnsMinusOneWhenMetricsCollectionFails() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java index 7f5a0f5a9dfe..ffbfee77b723 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java @@ -74,6 +74,8 @@ public void setUp() 4, true, // enableTaskAutoScaler 6_000_000L, // minTriggerScaleActionFrequencyMillis + null, // minScaleUpDelay + null, // minScaleDownDelay null, // lagAggregate null // stopTaskCountRatio ); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/ScaleDirection.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/ScaleDirection.java new file mode 100644 index 000000000000..63523f6cfb04 --- /dev/null +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/ScaleDirection.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.supervisor.autoscaler; + +public enum ScaleDirection +{ + SCALE_UP("scale-up"), SCALE_DOWN("scale-down"); + + private final String label; + + ScaleDirection(String label) + { + this.label = label; + } + + @Override + public String toString() + { + return label; + } +} diff --git a/website/.spelling b/website/.spelling index 99e767c48910..28701817f362 100644 --- a/website/.spelling +++ b/website/.spelling @@ -895,6 +895,8 @@ c3.2xlarge defaultManualBrokerService maxPriority minPriority +minScaleUpDelay +minScaleDownDelay NUMBER_FEATURES NUMBER_OF_CONTRIBUTORS PreparedStatement @@ -2080,6 +2082,7 @@ autoscalers batch_index_task cgroup classloader +cooldown com.metamx common.runtime.properties cpuacct @@ -2168,6 +2171,8 @@ s3n slf4j sql sqlQuery +scale-up +scale-down successfulSending [S]igar taskBlackListCleanupPeriod