Skip to content

Commit 752b235

Browse files
committed
Switch to minScale.*Delay configs and deprecate minTriggerScaleActionFrequencyMillis
1 parent cadba38 commit 752b235

14 files changed

Lines changed: 188 additions & 177 deletions

File tree

docs/ingestion/kafka-ingestion.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,8 @@ The following example shows a supervisor spec with idle configuration enabled:
235235
"enableTaskAutoScaler": true,
236236
"taskCountMax": 6,
237237
"taskCountMin": 2,
238-
"minTriggerScaleActionFrequencyMillis": 600000,
238+
"minScaleUpDelay": "PT10M",
239+
"minScaleDownDelay": "PT10M",
239240
"autoScalerStrategy": "lagBased",
240241
"lagCollectionIntervalMillis": 30000,
241242
"lagCollectionRangeMillis": 600000,

docs/ingestion/supervisor.md

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,9 @@ The following table outlines the configuration properties for `autoScalerConfig`
7979
|`taskCountMax`|The maximum number of ingestion tasks. Must be greater than or equal to `taskCountMin`. If `taskCountMax` is greater than the number of Kafka partitions or Kinesis shards, Druid sets the maximum number of reading tasks to the number of Kafka partitions or Kinesis shards and ignores `taskCountMax`.|Yes||
8080
|`taskCountMin`|The minimum number of ingestion tasks. When you enable the autoscaler, Druid computes the initial number of tasks to launch by checking the configs in the following order: `taskCountStart`, then `taskCount` (in `ioConfig`), then `taskCountMin`.|Yes||
8181
|`taskCountStart`|Optional config to specify the number of ingestion tasks to start with. When you enable the autoscaler, Druid computes the initial number of tasks to launch by checking the configs in the following order: `taskCountStart`, then `taskCount` (in `ioConfig`), then `taskCountMin`.|No|`taskCount` or `taskCountMin`|
82-
|`minTriggerScaleActionFrequencyMillis`|The minimum time interval between any two scale actions. Used as the default fallback when `minTriggerScaleUpActionFrequencyMillis` or `minTriggerScaleDownActionFrequencyMillis` are not set.| No|600000|
83-
|`minTriggerScaleUpActionFrequencyMillis`|The minimum time interval between two scale-up actions. Falls back to `minTriggerScaleActionFrequencyMillis` if not set.| No||
84-
|`minTriggerScaleDownActionFrequencyMillis`|The minimum time interval between two scale-down actions. Falls back to `minTriggerScaleActionFrequencyMillis` if not set.| No||
82+
|`minScaleUpDelay`|Minimum cooldown duration between scale-up actions, specified as an ISO-8601 duration string. Falls back to `minTriggerScaleActionFrequencyMillis` if not set.|No||
83+
|`minScaleDownDelay`|Minimum cooldown duration between scale-down actions, specified as an ISO-8601 duration string. Falls back to `minTriggerScaleActionFrequencyMillis` if not set.|No||
84+
|`minTriggerScaleActionFrequencyMillis`|**Deprecated.** Use `minScaleUpDelay` and `minScaleDownDelay` instead. Minimum time interval in milliseconds between scale actions, used as the fallback when the Duration-based fields are not set.|No|600000|
8585
|`autoScalerStrategy`|The algorithm of autoscaler. Druid only supports the `lagBased` strategy. See [Autoscaler strategy](#autoscaler-strategy) for more information.|No|`lagBased`|
8686
|`stopTaskCountRatio`|A variable version of `ioConfig.stopTaskCount` with a valid range of (0.0, 1.0]. Allows the maximum number of stoppable tasks in steady state to be proportional to the number of tasks currently running.|No||
8787

@@ -163,7 +163,8 @@ The following example shows a supervisor spec with `lagBased` autoscaler:
163163
"enableTaskAutoScaler": true,
164164
"taskCountMax": 6,
165165
"taskCountMin": 2,
166-
"minTriggerScaleActionFrequencyMillis": 600000,
166+
"minScaleUpDelay": "PT10M",
167+
"minScaleDownDelay": "PT10M",
167168
"autoScalerStrategy": "lagBased",
168169
"lagCollectionIntervalMillis": 30000,
169170
"lagCollectionRangeMillis": 600000,
@@ -212,10 +213,11 @@ The following table outlines the configuration properties related to the `costBa
212213
|`idleWeight`|The weight of extracted poll idle value in cost function. | No | 0.75 |
213214
|`useTaskCountBoundaries`|Enables the bounded partitions-per-task window when selecting task counts.|No| `false` |
214215
|`highLagThreshold`|Average partition lag threshold that triggers burst scale-up when set to a value greater than `0`. Set to a negative value to disable burst scale-up.|No|-1|
215-
|`minScaleDownDelay`|Minimum duration between successful scale actions, specified as an ISO-8601 duration string.|No|`PT30M`|
216+
|`minScaleUpDelay`|Minimum cooldown duration after a scale-up action before the next scale-up is allowed, specified as an ISO-8601 duration string. Falls back to `minScaleDownDelay` if not set.|No||
217+
|`minScaleDownDelay`|Minimum cooldown duration after a scale-down action before the next scale-down is allowed, specified as an ISO-8601 duration string.|No|`PT30M`|
216218
|`scaleDownDuringTaskRolloverOnly`|Indicates whether task scaling down is limited to periods during task rollovers only.|No|`false`|
217219

218-
The following example shows a supervisor spec with `lagBased` autoscaler:
220+
The following example shows a supervisor spec with `costBased` autoscaler:
219221

220222
<details>
221223
<summary>Click to view the example</summary>
@@ -229,9 +231,10 @@ The following example shows a supervisor spec with `lagBased` autoscaler:
229231
"autoScalerStrategy": "costBased",
230232
"taskCountMin": 1,
231233
"taskCountMax": 10,
232-
"minTriggerScaleActionFrequencyMillis": 600000,
234+
"minScaleUpDelay": "PT10M",
235+
"minScaleDownDelay": "PT30M",
233236
"lagWeight": 0.1,
234-
"idleWeight": 0.9,
237+
"idleWeight": 0.9
235238
}
236239
}
237240
}

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,8 @@ public void testAutoScalingConfigSerde() throws JsonProcessingException
315315
autoScalerConfig.put("scaleInStep", 1);
316316
autoScalerConfig.put("scaleOutStep", 2);
317317
autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
318+
autoScalerConfig.put("minScaleUpDelay", "PT20M");
319+
autoScalerConfig.put("minScaleDownDelay", "PT20M");
318320

319321
final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
320322
consumerProperties.put("bootstrap.servers", "localhost:8082");

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,8 @@ public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
306306
autoScalerConfig.put("taskCountMin", 1);
307307
autoScalerConfig.put("scaleInStep", 1);
308308
autoScalerConfig.put("scaleOutStep", 2);
309-
autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
309+
autoScalerConfig.put("minScaleUpDelay", "PT20M");
310+
autoScalerConfig.put("minScaleDownDelay", "PT20M");
310311

311312
final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
312313
consumerProperties.put("myCustomKey", "myCustomValue");

extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,8 @@ public void testNoInitialStateWithAutoScaleOut() throws Exception
310310
autoScalerConfigMap.put("taskCountMin", 1);
311311
autoScalerConfigMap.put("scaleInStep", 1);
312312
autoScalerConfigMap.put("scaleOutStep", 2);
313-
autoScalerConfigMap.put("minTriggerScaleActionFrequencyMillis", 1200000);
313+
autoScalerConfigMap.put("minScaleUpDelay", "PT20M");
314+
autoScalerConfigMap.put("minScaleDownDelay", "PT20M");
314315

315316
AutoScalerConfig autoScalerConfig = OBJECT_MAPPER.convertValue(autoScalerConfigMap, AutoScalerConfig.class);
316317
supervisor = getTestableSupervisor(
@@ -384,7 +385,8 @@ public void testNoInitialStateWithAutoScaleIn() throws Exception
384385
autoScalerConfigMap.put("taskCountMin", 1);
385386
autoScalerConfigMap.put("scaleInStep", 1);
386387
autoScalerConfigMap.put("scaleOutStep", 2);
387-
autoScalerConfigMap.put("minTriggerScaleActionFrequencyMillis", 1200000);
388+
autoScalerConfigMap.put("minScaleUpDelay", "PT20M");
389+
autoScalerConfigMap.put("minScaleDownDelay", "PT20M");
388390

389391
AutoScalerConfig autoScalerConfig = OBJECT_MAPPER.convertValue(autoScalerConfigMap, AutoScalerConfig.class);
390392
supervisor = getTestableSupervisor(

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -552,31 +552,23 @@ public void handle()
552552
// configure a longer cooldown for scale-down than for scale-up. Both directions are measured against the same
553553
// last-scale timestamp so that a rapid up/down oscillation is still subject to the appropriate cooldown,
554554
// regardless of which direction triggered last.
555-
final long effectiveFreq;
556555
final ScaleDirection scaleDirection;
556+
final long cooldownMillis;
557557

558558
if (desiredTaskCount > currentTaskCount) {
559-
// Scale up: use the scale-up specific frequency, falling back to base.
560-
Long specificFreq = autoScalerConfig.getMinTriggerScaleUpActionFrequencyMillis();
561-
effectiveFreq = specificFreq != null
562-
? specificFreq
563-
: autoScalerConfig.getMinTriggerScaleActionFrequencyMillis();
564559
scaleDirection = ScaleDirection.SCALE_UP;
560+
cooldownMillis = autoScalerConfig.getMinScaleUpDelay().getMillis();
565561
} else {
566-
// Scale down: use the scale-down specific frequency, falling back to base.
567-
Long specificFreq = autoScalerConfig.getMinTriggerScaleDownActionFrequencyMillis();
568-
effectiveFreq = specificFreq != null
569-
? specificFreq
570-
: autoScalerConfig.getMinTriggerScaleActionFrequencyMillis();
571562
scaleDirection = ScaleDirection.SCALE_DOWN;
563+
cooldownMillis = autoScalerConfig.getMinScaleDownDelay().getMillis();
572564
}
573565

574-
if (nowTime - dynamicTriggerLastScaleRunTime < effectiveFreq) {
566+
if (nowTime - dynamicTriggerLastScaleRunTime < cooldownMillis) {
575567
log.info(
576-
"DynamicAllocationTasksNotice submitted again in [%d]ms, effective [%s] throttle is [%d]ms for supervisor[%s] for dataSource[%s], skipping it! desired task count is [%d], current task count is [%d]",
568+
"DynamicAllocationTasksNotice submitted again in [%d]ms, [%s] cooldown is [%d]ms for supervisor[%s] for dataSource[%s], skipping it! desired task count is [%d], current task count is [%d]",
577569
nowTime - dynamicTriggerLastScaleRunTime,
578570
scaleDirection,
579-
effectiveFreq,
571+
cooldownMillis,
580572
supervisorId,
581573
dataSource,
582574
desiredTaskCount,
@@ -585,7 +577,7 @@ public void handle()
585577

586578
emitter.emit(event.setDimension(
587579
AUTOSCALER_SKIP_REASON_DIMENSION,
588-
"minTriggerScaleActionFrequencyMillis not elapsed yet"
580+
"Scale cooldown not elapsed yet"
589581
)
590582
.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
591583
return;
@@ -633,8 +625,8 @@ public String getType()
633625
*
634626
* @param desiredActiveTaskCount desired taskCount computed from AutoScaler
635627
* @return Boolean flag indicating if scale action was executed or not. If true, it will wait at least the configured
636-
* minTriggerScale(Up|Down)ActionFrequencyMillis (falling back to minTriggerScaleActionFrequencyMillis) before the next same-direction 'changeTaskCount'.
637-
* If false, it will do 'changeTaskCount' again after 'scaleActionPeriodMillis' millis.
628+
* minScaleUpDelay or minScaleDownDelay (falling back to minTriggerScaleActionFrequencyMillis) before the next
629+
* same-direction 'changeTaskCount'. If false, it will do 'changeTaskCount' again after 'scaleActionPeriodMillis' millis.
638630
* @throws InterruptedException
639631
* @throws ExecutionException
640632
*/

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@
2727
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
2828
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
2929
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
30-
31-
import javax.annotation.Nullable;
30+
import org.joda.time.Duration;
3231

3332
@UnstableApi
3433
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "autoScalerStrategy", defaultImpl = LagBasedAutoScalerConfig.class)
@@ -39,19 +38,27 @@
3938
public interface AutoScalerConfig
4039
{
4140
boolean getEnableTaskAutoScaler();
41+
42+
/**
43+
* @deprecated Use {@link #getMinScaleUpDelay()} and {@link #getMinScaleDownDelay()} instead.
44+
* This field is retained for backward compatibility and will be removed in a future version.
45+
*/
46+
@Deprecated
4247
long getMinTriggerScaleActionFrequencyMillis();
4348

44-
@Nullable
45-
default Long getMinTriggerScaleUpActionFrequencyMillis()
46-
{
47-
return null;
48-
}
49-
50-
@Nullable
51-
default Long getMinTriggerScaleDownActionFrequencyMillis()
52-
{
53-
return null;
54-
}
49+
/**
50+
* Minimum time that must elapse after any scale action before a scale-up is permitted.
51+
* If not explicitly configured, implementations fall back to
52+
* {@link #getMinTriggerScaleActionFrequencyMillis()} for backward compatibility.
53+
*/
54+
Duration getMinScaleUpDelay();
55+
56+
/**
57+
* Minimum time that must elapse after any scale action before a scale-down is permitted.
58+
* If not explicitly configured, implementations fall back to
59+
* {@link #getMinTriggerScaleActionFrequencyMillis()} for backward compatibility.
60+
*/
61+
Duration getMinScaleDownDelay();
5562

5663
int getTaskCountMax();
5764
int getTaskCountMin();

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -179,12 +179,12 @@ public int computeTaskCountForScaleAction()
179179

180180
// Perform scale-up actions; scale-down actions only if configured.
181181
final int taskCount;
182-
if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
182+
if (isScaleActionAllowed(true) && optimalTaskCount > currentTaskCount) {
183183
taskCount = optimalTaskCount;
184184
lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
185185
log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", supervisorId, currentTaskCount, taskCount);
186186
} else if (!config.isScaleDownOnTaskRolloverOnly()
187-
&& isScaleActionAllowed()
187+
&& isScaleActionAllowed(false)
188188
&& optimalTaskCount < currentTaskCount
189189
&& optimalTaskCount > 0) {
190190
taskCount = optimalTaskCount;
@@ -562,16 +562,18 @@ private Either<String, Boolean> validateMetricsForScaling(CostMetrics metrics)
562562
}
563563

564564
/**
565-
* Determines if a scale action is currently allowed based on the elapsed time
566-
* since the last scale action and the configured minimum scale-down delay.
565+
* Determines if a scale action in the given direction is currently allowed based on the elapsed time
566+
* since the last scale action and the configured delay for that direction.
567567
*/
568-
private boolean isScaleActionAllowed()
568+
private boolean isScaleActionAllowed(boolean isScaleUp)
569569
{
570570
if (lastScaleActionTimeMillis < 0) {
571571
return true;
572572
}
573573

574-
final long barrierMillis = config.getMinScaleDownDelay().getMillis();
574+
final long barrierMillis = isScaleUp
575+
? config.getMinScaleUpDelay().getMillis()
576+
: config.getMinScaleDownDelay().getMillis();
575577
if (barrierMillis <= 0) {
576578
return true;
577579
}

0 commit comments

Comments
 (0)