Skip to content

Commit 008d2f6

Browse files
committed
feat: support scaling direction-aware cooldown for task auto-scalers
1 parent 8f3df74 commit 008d2f6

20 files changed

Lines changed: 701 additions & 131 deletions

File tree

docs/ingestion/kafka-ingestion.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,8 @@ The following example shows a supervisor spec with idle configuration enabled:
235235
"enableTaskAutoScaler": true,
236236
"taskCountMax": 6,
237237
"taskCountMin": 2,
238-
"minTriggerScaleActionFrequencyMillis": 600000,
238+
"minScaleUpDelay": "PT10M",
239+
"minScaleDownDelay": "PT10M",
239240
"autoScalerStrategy": "lagBased",
240241
"lagCollectionIntervalMillis": 30000,
241242
"lagCollectionRangeMillis": 600000,

docs/ingestion/supervisor.md

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,9 @@ The following table outlines the configuration properties for `autoScalerConfig`
7979
|`taskCountMax`|The maximum number of ingestion tasks. Must be greater than or equal to `taskCountMin`. If `taskCountMax` is greater than the number of Kafka partitions or Kinesis shards, Druid sets the maximum number of reading tasks to the number of Kafka partitions or Kinesis shards and ignores `taskCountMax`.|Yes||
8080
|`taskCountMin`|The minimum number of ingestion tasks. When you enable the autoscaler, Druid computes the initial number of tasks to launch by checking the configs in the following order: `taskCountStart`, then `taskCount` (in `ioConfig`), then `taskCountMin`.|Yes||
8181
|`taskCountStart`|Optional config to specify the number of ingestion tasks to start with. When you enable the autoscaler, Druid computes the initial number of tasks to launch by checking the configs in the following order: `taskCountStart`, then `taskCount` (in `ioConfig`), then `taskCountMin`.|No|`taskCount` or `taskCountMin`|
82-
|`minTriggerScaleActionFrequencyMillis`|The minimum time interval between two scale actions.| No|600000|
82+
|`minScaleUpDelay`|Minimum cooldown duration between scale-up actions, specified as an ISO-8601 duration string. Falls back to `minTriggerScaleActionFrequencyMillis` if not set.|No||
83+
|`minScaleDownDelay`|Minimum cooldown duration between scale-down actions, specified as an ISO-8601 duration string. Falls back to `minTriggerScaleActionFrequencyMillis` if not set.|No||
84+
|`minTriggerScaleActionFrequencyMillis`|**Deprecated.** Use `minScaleUpDelay` and `minScaleDownDelay` instead. Minimum time interval in milliseconds between scale actions, used as the fallback when the Duration-based fields are not set.|No|600000|
8385
|`autoScalerStrategy`|The algorithm of autoscaler. Druid only supports the `lagBased` strategy. See [Autoscaler strategy](#autoscaler-strategy) for more information.|No|`lagBased`|
8486
|`stopTaskCountRatio`|A variable version of `ioConfig.stopTaskCount` with a valid range of (0.0, 1.0]. Allows the maximum number of stoppable tasks in steady state to be proportional to the number of tasks currently running.|No||
8587

@@ -161,7 +163,8 @@ The following example shows a supervisor spec with `lagBased` autoscaler:
161163
"enableTaskAutoScaler": true,
162164
"taskCountMax": 6,
163165
"taskCountMin": 2,
164-
"minTriggerScaleActionFrequencyMillis": 600000,
166+
"minScaleUpDelay": "PT10M",
167+
"minScaleDownDelay": "PT10M",
165168
"autoScalerStrategy": "lagBased",
166169
"lagCollectionIntervalMillis": 30000,
167170
"lagCollectionRangeMillis": 600000,
@@ -210,10 +213,11 @@ The following table outlines the configuration properties related to the `costBa
210213
|`idleWeight`|The weight of extracted poll idle value in cost function. | No | 0.75 |
211214
|`useTaskCountBoundaries`|Enables the bounded partitions-per-task window when selecting task counts.|No| `false` |
212215
|`highLagThreshold`|Average partition lag threshold that triggers burst scale-up when set to a value greater than `0`. Set to a negative value to disable burst scale-up.|No|-1|
213-
|`minScaleDownDelay`|Minimum duration between successful scale actions, specified as an ISO-8601 duration string.|No|`PT30M`|
216+
|`minScaleUpDelay`|Minimum cooldown duration after a scale-up action before the next scale-up is allowed, specified as an ISO-8601 duration string. Falls back to `minScaleDownDelay` if not set.|No||
217+
|`minScaleDownDelay`|Minimum cooldown duration after a scale-down action before the next scale-down is allowed, specified as an ISO-8601 duration string.|No|`PT30M`|
214218
|`scaleDownDuringTaskRolloverOnly`|Indicates whether task scaling down is limited to periods during task rollovers only.|No|`false`|
215219

216-
The following example shows a supervisor spec with `lagBased` autoscaler:
220+
The following example shows a supervisor spec with `costBased` autoscaler:
217221

218222
<details>
219223
<summary>Click to view the example</summary>
@@ -227,9 +231,10 @@ The following example shows a supervisor spec with `lagBased` autoscaler:
227231
"autoScalerStrategy": "costBased",
228232
"taskCountMin": 1,
229233
"taskCountMax": 10,
230-
"minTriggerScaleActionFrequencyMillis": 600000,
234+
"minScaleUpDelay": "PT10M",
235+
"minScaleDownDelay": "PT30M",
231236
"lagWeight": 0.1,
232-
"idleWeight": 0.9,
237+
"idleWeight": 0.9
233238
}
234239
}
235240
}

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,8 @@ public void testAutoScalingConfigSerde() throws JsonProcessingException
315315
autoScalerConfig.put("scaleInStep", 1);
316316
autoScalerConfig.put("scaleOutStep", 2);
317317
autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
318+
autoScalerConfig.put("minScaleUpDelay", "PT20M");
319+
autoScalerConfig.put("minScaleDownDelay", "PT20M");
318320

319321
final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
320322
consumerProperties.put("bootstrap.servers", "localhost:8082");

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,8 @@ public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
306306
autoScalerConfig.put("taskCountMin", 1);
307307
autoScalerConfig.put("scaleInStep", 1);
308308
autoScalerConfig.put("scaleOutStep", 2);
309-
autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
309+
autoScalerConfig.put("minScaleUpDelay", "PT20M");
310+
autoScalerConfig.put("minScaleDownDelay", "PT20M");
310311

311312
final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
312313
consumerProperties.put("myCustomKey", "myCustomValue");

extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,8 @@ public void testNoInitialStateWithAutoScaleOut() throws Exception
310310
autoScalerConfigMap.put("taskCountMin", 1);
311311
autoScalerConfigMap.put("scaleInStep", 1);
312312
autoScalerConfigMap.put("scaleOutStep", 2);
313-
autoScalerConfigMap.put("minTriggerScaleActionFrequencyMillis", 1200000);
313+
autoScalerConfigMap.put("minScaleUpDelay", "PT20M");
314+
autoScalerConfigMap.put("minScaleDownDelay", "PT20M");
314315

315316
AutoScalerConfig autoScalerConfig = OBJECT_MAPPER.convertValue(autoScalerConfigMap, AutoScalerConfig.class);
316317
supervisor = getTestableSupervisor(
@@ -384,7 +385,8 @@ public void testNoInitialStateWithAutoScaleIn() throws Exception
384385
autoScalerConfigMap.put("taskCountMin", 1);
385386
autoScalerConfigMap.put("scaleInStep", 1);
386387
autoScalerConfigMap.put("scaleOutStep", 2);
387-
autoScalerConfigMap.put("minTriggerScaleActionFrequencyMillis", 1200000);
388+
autoScalerConfigMap.put("minScaleUpDelay", "PT20M");
389+
autoScalerConfigMap.put("minScaleDownDelay", "PT20M");
388390

389391
AutoScalerConfig autoScalerConfig = OBJECT_MAPPER.convertValue(autoScalerConfigMap, AutoScalerConfig.class);
390392
supervisor = getTestableSupervisor(

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java

Lines changed: 70 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@
6464
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
6565
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
6666
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
67+
import org.apache.druid.indexing.overlord.supervisor.autoscaler.ScaleActionTaskCountSupplier;
68+
import org.apache.druid.indexing.overlord.supervisor.autoscaler.ScaleDirection;
6769
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
6870
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
6971
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
@@ -122,7 +124,6 @@
122124
import java.util.SortedMap;
123125
import java.util.TreeMap;
124126
import java.util.TreeSet;
125-
import java.util.concurrent.Callable;
126127
import java.util.concurrent.ConcurrentHashMap;
127128
import java.util.concurrent.CopyOnWriteArrayList;
128129
import java.util.concurrent.ExecutionException;
@@ -454,13 +455,13 @@ public boolean equals(Object obj)
454455
// change taskCount without resubmitting.
455456
private class DynamicAllocationTasksNotice implements Notice
456457
{
457-
Callable<Integer> computeDesiredTaskCount;
458+
ScaleActionTaskCountSupplier computeDesiredTaskCount;
458459
ServiceEmitter emitter;
459460
Runnable onSuccessfulScale;
460461
private static final String TYPE = "dynamic_allocation_tasks_notice";
461462

462463
DynamicAllocationTasksNotice(
463-
Callable<Integer> computeDesiredTaskCount,
464+
ScaleActionTaskCountSupplier computeDesiredTaskCount,
464465
Runnable onSuccessfulScale,
465466
ServiceEmitter emitter
466467
)
@@ -499,11 +500,34 @@ public void handle()
499500
supervisorId,
500501
dataSource
501502
);
502-
final Integer desiredTaskCount = computeDesiredTaskCount.call();
503+
final int desiredTaskCount = computeDesiredTaskCount.computeTaskCount();
504+
final int currentTaskCount = getCurrentTaskCount();
505+
506+
if (desiredTaskCount <= 0) {
507+
return;
508+
}
509+
503510
ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
504511
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
505512
.setDimension(DruidMetrics.DATASOURCE, dataSource)
506513
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
514+
515+
// 1) This should already be handled by the auto-scaler implementation, but make sure we catch/record these for auditability
516+
if (desiredTaskCount == currentTaskCount) {
517+
log.warn(
518+
"Skipping scaling for supervisor[%s] for dataSource[%s]: already at desired task count [%d]",
519+
supervisorId,
520+
dataSource,
521+
desiredTaskCount
522+
);
523+
emitter.emit(event.setDimension(AUTOSCALER_SKIP_REASON_DIMENSION, "desired capacity reached")
524+
.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
525+
return;
526+
}
527+
528+
// 2) Make sure we wait for any pending completion tasks to finish.
529+
// At this point there could be 2 generations of tasks: pending completion tasks (old generation), running tasks (current generation), and (after our scale) pending tasks (new generation).
530+
// We want to avoid killing any old generation tasks preemptively, as that might cause the current generation tasks' offsets to become invalid.
507531
for (CopyOnWriteArrayList<TaskGroup> list : pendingCompletionTaskGroups.values()) {
508532
// There are expected to be pending tasks if this scaling is happening on task rollover
509533
if (!list.isEmpty() && !isScalingTasksOnRollover.get()) {
@@ -513,45 +537,59 @@ public void handle()
513537
dataSource,
514538
list
515539
);
516-
if (desiredTaskCount > 0) {
517-
emitter.emit(event.setDimension(
518-
AUTOSCALER_SKIP_REASON_DIMENSION,
519-
"There are tasks pending completion"
520-
)
521-
.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
522-
}
540+
emitter.emit(event.setDimension(
541+
AUTOSCALER_SKIP_REASON_DIMENSION,
542+
"There are tasks pending completion"
543+
)
544+
.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
523545
return;
524546
}
525547
}
526-
if (nowTime - dynamicTriggerLastRunTime < autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
548+
549+
// 3) Make sure we are not breaching any scaling cooldown limits.
550+
// Scaling operations are disruptive — scale-down in particular can leave the supervisor
551+
// under-resourced while it recovers from lag induced by the scale event, so callers may
552+
// configure a longer cooldown for scale-down than for scale-up. Both directions are measured against the same
553+
// last-scale timestamp so that a rapid up/down oscillation is still subject to the appropriate cooldown,
554+
// regardless of which direction triggered last.
555+
final ScaleDirection scaleDirection;
556+
final long cooldownMillis;
557+
558+
if (desiredTaskCount > currentTaskCount) {
559+
scaleDirection = ScaleDirection.SCALE_UP;
560+
cooldownMillis = autoScalerConfig.getMinScaleUpDelay().getMillis();
561+
} else {
562+
scaleDirection = ScaleDirection.SCALE_DOWN;
563+
cooldownMillis = autoScalerConfig.getMinScaleDownDelay().getMillis();
564+
}
565+
566+
if (nowTime - dynamicTriggerLastScaleRunTime < cooldownMillis) {
527567
log.info(
528-
"DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for supervisor[%s] for dataSource[%s], skipping it! desired task count is [%s], active task count is [%s]",
529-
nowTime - dynamicTriggerLastRunTime,
530-
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(),
568+
"DynamicAllocationTasksNotice submitted again in [%d]ms, [%s] cooldown is [%d]ms for supervisor[%s] for dataSource[%s], skipping it! desired task count is [%d], current task count is [%d]",
569+
nowTime - dynamicTriggerLastScaleRunTime,
570+
scaleDirection,
571+
cooldownMillis,
531572
supervisorId,
532573
dataSource,
533574
desiredTaskCount,
534-
getActiveTaskGroupsCount()
575+
currentTaskCount
535576
);
536577

537-
if (desiredTaskCount > 0) {
538-
emitter.emit(event.setDimension(
539-
AUTOSCALER_SKIP_REASON_DIMENSION,
540-
"minTriggerScaleActionFrequencyMillis not elapsed yet"
541-
)
542-
.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
543-
}
578+
emitter.emit(event.setDimension(
579+
AUTOSCALER_SKIP_REASON_DIMENSION,
580+
"Scale cooldown not elapsed yet"
581+
)
582+
.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
544583
return;
545584
}
546585

547-
if (desiredTaskCount > 0) {
548-
emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
549-
}
586+
// At this point, we can reasonably attempt a scaling action, so emit our required task count
587+
emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
550588

551589
boolean allocationSuccess = changeTaskCount(desiredTaskCount);
552590
if (allocationSuccess) {
553591
onSuccessfulScale.run();
554-
dynamicTriggerLastRunTime = nowTime;
592+
dynamicTriggerLastScaleRunTime = nowTime;
555593
}
556594
}
557595
catch (Exception ex) {
@@ -586,8 +624,9 @@ public String getType()
586624
* After the taskCount is changed in SeekableStreamSupervisorIOConfig, next RunNotice will create scaled number of ingest tasks without resubmitting the supervisor.
587625
*
588626
* @param desiredActiveTaskCount desired taskCount computed from AutoScaler
589-
* @return Boolean flag indicating if scale action was executed or not. If true, it will wait at least 'minTriggerScaleActionFrequencyMillis' before next 'changeTaskCount'.
590-
* If false, it will do 'changeTaskCount' again after 'scaleActionPeriodMillis' millis.
627+
* @return Boolean flag indicating if scale action was executed or not. If true, it will wait at least the configured
628+
* minScaleUpDelay or minScaleDownDelay (falling back to minTriggerScaleActionFrequencyMillis) before the next
629+
* same-direction 'changeTaskCount'. If false, it will do 'changeTaskCount' again after 'scaleActionPeriodMillis' millis.
591630
* @throws InterruptedException
592631
* @throws ExecutionException
593632
*/
@@ -958,7 +997,7 @@ public String getType()
958997
private final boolean useExclusiveStartingSequence;
959998
private boolean listenerRegistered = false;
960999
private long lastRunTime;
961-
private long dynamicTriggerLastRunTime;
1000+
private long dynamicTriggerLastScaleRunTime;
9621001
private int initRetryCounter = 0;
9631002
private volatile DateTime firstRunTime;
9641003
private volatile DateTime earlyStopTime = null;
@@ -1416,7 +1455,7 @@ public void tryInit()
14161455
}
14171456

14181457
public Runnable buildDynamicAllocationTask(
1419-
Callable<Integer> scaleAction,
1458+
ScaleActionTaskCountSupplier scaleAction,
14201459
Runnable onSuccessfulScale,
14211460
ServiceEmitter emitter
14221461
)

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
2828
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
2929
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
30+
import org.joda.time.Duration;
3031

3132
@UnstableApi
3233
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "autoScalerStrategy", defaultImpl = LagBasedAutoScalerConfig.class)
@@ -37,7 +38,28 @@
3738
public interface AutoScalerConfig
3839
{
3940
boolean getEnableTaskAutoScaler();
41+
42+
/**
43+
* @deprecated Use {@link #getMinScaleUpDelay()} and {@link #getMinScaleDownDelay()} instead.
44+
* This field is retained for backward compatibility and will be removed in a future version.
45+
*/
46+
@Deprecated
4047
long getMinTriggerScaleActionFrequencyMillis();
48+
49+
/**
50+
* Minimum time that must elapse after any scale action before a scale-up is permitted.
51+
* If not explicitly configured, implementations fall back to
52+
* {@link #getMinTriggerScaleActionFrequencyMillis()} for backward compatibility.
53+
*/
54+
Duration getMinScaleUpDelay();
55+
56+
/**
57+
* Minimum time that must elapse after any scale action before a scale-down is permitted.
58+
* If not explicitly configured, implementations fall back to
59+
* {@link #getMinTriggerScaleActionFrequencyMillis()} for backward compatibility.
60+
*/
61+
Duration getMinScaleDownDelay();
62+
4163
int getTaskCountMax();
4264
int getTaskCountMin();
4365
Integer getTaskCountStart();

0 commit comments

Comments
 (0)