Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/ingestion/kafka-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ The following example shows a supervisor spec with idle configuration enabled:
"enableTaskAutoScaler": true,
"taskCountMax": 6,
"taskCountMin": 2,
"minTriggerScaleActionFrequencyMillis": 600000,
"minScaleUpDelay": "PT10M",
"minScaleDownDelay": "PT10M",
"autoScalerStrategy": "lagBased",
"lagCollectionIntervalMillis": 30000,
"lagCollectionRangeMillis": 600000,
Expand Down
17 changes: 11 additions & 6 deletions docs/ingestion/supervisor.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ The following table outlines the configuration properties for `autoScalerConfig`
|`taskCountMax`|The maximum number of ingestion tasks. Must be greater than or equal to `taskCountMin`. If `taskCountMax` is greater than the number of Kafka partitions or Kinesis shards, Druid sets the maximum number of reading tasks to the number of Kafka partitions or Kinesis shards and ignores `taskCountMax`.|Yes||
|`taskCountMin`|The minimum number of ingestion tasks. When you enable the autoscaler, Druid computes the initial number of tasks to launch by checking the configs in the following order: `taskCountStart`, then `taskCount` (in `ioConfig`), then `taskCountMin`.|Yes||
|`taskCountStart`|Optional config to specify the number of ingestion tasks to start with. When you enable the autoscaler, Druid computes the initial number of tasks to launch by checking the configs in the following order: `taskCountStart`, then `taskCount` (in `ioConfig`), then `taskCountMin`.|No|`taskCount` or `taskCountMin`|
|`minTriggerScaleActionFrequencyMillis`|The minimum time interval between two scale actions.| No|600000|
|`minScaleUpDelay`|Minimum cooldown duration between scale-up actions, specified as an ISO-8601 duration string (for example, `"PT10M"` for 10 minutes). If not set, falls back to the value of `minTriggerScaleActionFrequencyMillis`.|No||
|`minScaleDownDelay`|Minimum cooldown duration between scale-down actions, specified as an ISO-8601 duration string (for example, `"PT10M"` for 10 minutes). If not set, falls back to the value of `minTriggerScaleActionFrequencyMillis`.|No||
|`minTriggerScaleActionFrequencyMillis`|**Deprecated.** Use `minScaleUpDelay` and `minScaleDownDelay` instead. Minimum time interval in milliseconds between scale actions, used as the fallback when the Duration-based fields are not set.|No|600000|
|`autoScalerStrategy`|The algorithm of autoscaler. Druid only supports the `lagBased` strategy. See [Autoscaler strategy](#autoscaler-strategy) for more information.|No|`lagBased`|
|`stopTaskCountRatio`|A variable version of `ioConfig.stopTaskCount` with a valid range of (0.0, 1.0]. Allows the maximum number of stoppable tasks in steady state to be proportional to the number of tasks currently running.|No||

Expand Down Expand Up @@ -161,7 +163,8 @@ The following example shows a supervisor spec with `lagBased` autoscaler:
"enableTaskAutoScaler": true,
"taskCountMax": 6,
"taskCountMin": 2,
"minTriggerScaleActionFrequencyMillis": 600000,
"minScaleUpDelay": "PT10M",
"minScaleDownDelay": "PT10M",
"autoScalerStrategy": "lagBased",
"lagCollectionIntervalMillis": 30000,
"lagCollectionRangeMillis": 600000,
Expand Down Expand Up @@ -210,10 +213,11 @@ The following table outlines the configuration properties related to the `costBa
|`idleWeight`|The weight of extracted poll idle value in cost function. | No | 0.75 |
|`useTaskCountBoundaries`|Enables the bounded partitions-per-task window when selecting task counts.|No| `false` |
|`highLagThreshold`|Average partition lag threshold that triggers burst scale-up when set to a value greater than `0`. Set to a negative value to disable burst scale-up.|No|-1|
|`minScaleDownDelay`|Minimum duration between successful scale actions, specified as an ISO-8601 duration string.|No|`PT30M`|
|`minScaleUpDelay`|Minimum cooldown duration after a scale-up action before the next scale-up is allowed, specified as an ISO-8601 duration string.|No||
|`minScaleDownDelay`|Minimum cooldown duration after a scale-down action before the next scale-down is allowed, specified as an ISO-8601 duration string.|No|`PT30M`|
|`scaleDownDuringTaskRolloverOnly`|Indicates whether task scaling down is limited to periods during task rollovers only.|No|`false`|

The following example shows a supervisor spec with `lagBased` autoscaler:
The following example shows a supervisor spec with `costBased` autoscaler:

<details>
<summary>Click to view the example</summary>
Expand All @@ -227,9 +231,10 @@ The following example shows a supervisor spec with `lagBased` autoscaler:
"autoScalerStrategy": "costBased",
"taskCountMin": 1,
"taskCountMax": 10,
"minTriggerScaleActionFrequencyMillis": 600000,
"minScaleUpDelay": "PT10M",
"minScaleDownDelay": "PT30M",
"lagWeight": 0.1,
"idleWeight": 0.9,
"idleWeight": 0.9
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ public void testAutoScalingConfigSerde() throws JsonProcessingException
autoScalerConfig.put("scaleInStep", 1);
autoScalerConfig.put("scaleOutStep", 2);
autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
autoScalerConfig.put("minScaleUpDelay", "PT20M");
autoScalerConfig.put("minScaleDownDelay", "PT20M");

final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
consumerProperties.put("bootstrap.servers", "localhost:8082");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.ScaleDirection;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
Expand Down Expand Up @@ -499,11 +500,34 @@ public void handle()
supervisorId,
dataSource
);
final Integer desiredTaskCount = computeDesiredTaskCount.call();
final int desiredTaskCount = computeDesiredTaskCount.call();
final int currentTaskCount = getCurrentTaskCount();

if (desiredTaskCount <= 0) {
return;
}

ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());

// 1) This should already be handled by the auto-scaler implementation, but make sure we catch/record these for auditability
if (desiredTaskCount == currentTaskCount) {
log.warn(
"Skipping scaling for supervisor[%s] for dataSource[%s]: already at desired task count [%d]",
supervisorId,
dataSource,
desiredTaskCount
);
emitter.emit(event.setDimension(AUTOSCALER_SKIP_REASON_DIMENSION, "desired capacity reached")
.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
return;
}

// 2) Make sure we wait for any pending completion tasks to finish.
// At this point there could be 3 generations of tasks: pending completion tasks (old generation), running tasks (current generation), and (after our scale) pending tasks (new generation).
// We want to avoid killing any old generation tasks preemptively, as that might cause the current generation tasks' offsets to become invalid.
for (CopyOnWriteArrayList<TaskGroup> list : pendingCompletionTaskGroups.values()) {
// There are expected to be pending tasks if this scaling is happening on task rollover
if (!list.isEmpty() && !isScalingTasksOnRollover.get()) {
Expand All @@ -513,45 +537,59 @@ public void handle()
dataSource,
list
);
if (desiredTaskCount > 0) {
emitter.emit(event.setDimension(
AUTOSCALER_SKIP_REASON_DIMENSION,
"There are tasks pending completion"
)
.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
}
emitter.emit(event.setDimension(
AUTOSCALER_SKIP_REASON_DIMENSION,
"There are tasks pending completion"
)
.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
return;
}
}
if (nowTime - dynamicTriggerLastRunTime < autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {

// 3) Make sure we are not breaching any scaling cooldown limits.
// Scaling operations are disruptive — scale-down in particular can leave the supervisor
// under-resourced while it recovers from lag induced by the scale event, so callers may
// configure a longer cooldown for scale-down than for scale-up. Both directions are measured against the same
// last-scale timestamp so that a rapid up/down oscillation is still subject to the appropriate cooldown,
// regardless of which direction triggered last.
final ScaleDirection scaleDirection;
final long cooldownMillis;

if (desiredTaskCount > currentTaskCount) {
scaleDirection = ScaleDirection.SCALE_UP;
cooldownMillis = autoScalerConfig.getMinScaleUpDelay().getMillis();
} else { // desiredTaskCount < currentTaskCount
scaleDirection = ScaleDirection.SCALE_DOWN;
cooldownMillis = autoScalerConfig.getMinScaleDownDelay().getMillis();
}

if (nowTime - dynamicTriggerLastScaleRunTime < cooldownMillis) {
log.info(
"DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for supervisor[%s] for dataSource[%s], skipping it! desired task count is [%s], active task count is [%s]",
nowTime - dynamicTriggerLastRunTime,
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(),
"DynamicAllocationTasksNotice submitted again in [%d]ms, [%s] cooldown is [%d]ms for supervisor[%s] for dataSource[%s], skipping it! desired task count is [%d], current task count is [%d]",
nowTime - dynamicTriggerLastScaleRunTime,
scaleDirection,
cooldownMillis,
supervisorId,
dataSource,
desiredTaskCount,
getActiveTaskGroupsCount()
currentTaskCount
);

if (desiredTaskCount > 0) {
emitter.emit(event.setDimension(
AUTOSCALER_SKIP_REASON_DIMENSION,
"minTriggerScaleActionFrequencyMillis not elapsed yet"
)
.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
}
emitter.emit(event.setDimension(
AUTOSCALER_SKIP_REASON_DIMENSION,
"Scale cooldown not elapsed yet"
)
.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
return;
}

if (desiredTaskCount > 0) {
emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
}
// At this point, we can reasonably attempt a scaling action, so emit our required task count
emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));

boolean allocationSuccess = changeTaskCount(desiredTaskCount);
if (allocationSuccess) {
onSuccessfulScale.run();
dynamicTriggerLastRunTime = nowTime;
dynamicTriggerLastScaleRunTime = nowTime;
}
}
catch (Exception ex) {
Expand Down Expand Up @@ -586,8 +624,9 @@ public String getType()
* After the taskCount is changed in SeekableStreamSupervisorIOConfig, next RunNotice will create scaled number of ingest tasks without resubmitting the supervisor.
*
* @param desiredActiveTaskCount desired taskCount computed from AutoScaler
* @return Boolean flag indicating if scale action was executed or not. If true, it will wait at least 'minTriggerScaleActionFrequencyMillis' before next 'changeTaskCount'.
* If false, it will do 'changeTaskCount' again after 'scaleActionPeriodMillis' millis.
* @return Boolean flag indicating if scale action was executed or not. If true, it will wait at least the configured
* 'minScaleUpDelay' or 'minScaleDownDelay' (whichever matches the direction of the next scale) before the
* next 'changeTaskCount'. If false, it will do 'changeTaskCount' again after 'scaleActionPeriodMillis' millis.
* @throws InterruptedException
* @throws ExecutionException
*/
Expand Down Expand Up @@ -958,7 +997,7 @@ public String getType()
private final boolean useExclusiveStartingSequence;
private boolean listenerRegistered = false;
private long lastRunTime;
private long dynamicTriggerLastRunTime;
private long dynamicTriggerLastScaleRunTime;
private int initRetryCounter = 0;
private volatile DateTime firstRunTime;
private volatile DateTime earlyStopTime = null;
Expand Down Expand Up @@ -1424,6 +1463,16 @@ public Runnable buildDynamicAllocationTask(
return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, onSuccessfulScale, emitter));
}

/**
 * Test-only hook that builds a {@link DynamicAllocationTasksNotice} from the given scale action
 * and success callback and runs its {@code handle()} synchronously on the calling thread,
 * bypassing the notice queue used by {@code buildDynamicAllocationTask} (which submits the
 * notice via {@code addNotice} instead).
 *
 * @param scaleAction       computes the desired task count for the scaling decision
 * @param onSuccessfulScale invoked only after {@code changeTaskCount} reports a successful
 *                          scale action
 * @param emitter           receives the autoscaler required-task-count metric and any
 *                          skip-reason events emitted while handling the notice
 */
@VisibleForTesting
void handleDynamicAllocationTasksNotice(
Callable<Integer> scaleAction,
Runnable onSuccessfulScale,
ServiceEmitter emitter
)
{
new DynamicAllocationTasksNotice(scaleAction, onSuccessfulScale, emitter).handle();
}

private Runnable buildRunTask()
{
return () -> addNotice(new RunNotice());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.joda.time.Duration;

@UnstableApi
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "autoScalerStrategy", defaultImpl = LagBasedAutoScalerConfig.class)
Expand All @@ -37,7 +38,28 @@
public interface AutoScalerConfig
{
boolean getEnableTaskAutoScaler();

/**
* @deprecated Use {@link #getMinScaleUpDelay()} and {@link #getMinScaleDownDelay()} instead.
* This field is retained for backward compatibility and will be removed in a future version.
*/
@Deprecated
long getMinTriggerScaleActionFrequencyMillis();

/**
* Minimum time that must elapse after any scale action before a scale-up is permitted.
* If not explicitly configured, implementations fall back to
* {@link #getMinTriggerScaleActionFrequencyMillis()} for backward compatibility.
*/
Duration getMinScaleUpDelay();

/**
* Minimum time that must elapse after any scale action before a scale-down is permitted.
* If not explicitly configured, implementations fall back to
* {@link #getMinTriggerScaleActionFrequencyMillis()} for backward compatibility.
*/
Duration getMinScaleDownDelay();

int getTaskCountMax();
int getTaskCountMin();
Integer getTaskCountStart();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
Expand Down Expand Up @@ -90,8 +89,6 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
private final WeightedCostFunction costFunction;
private volatile CostMetrics lastKnownMetrics;

private volatile long lastScaleActionTimeMillis = -1;

public CostBasedAutoScaler(
SeekableStreamSupervisor supervisor,
CostBasedAutoScalerConfig config,
Expand Down Expand Up @@ -189,21 +186,17 @@ public int computeTaskCountForScaleAction()

// If task count is out of bounds, scale to the configured boundary
// regardless of optimal task count, to get back to a safe state.
if (isScaleActionAllowed() && isTaskCountOutOfBounds) {
if (isTaskCountOutOfBounds) {
taskCount = currentTaskCount;
lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
log.info("Task count for supervisor[%s] was out of bounds [%d,%d], urgently scaling from [%d] to [%d].",
supervisorId, config.getTaskCountMin(), config.getTaskCountMax(), currentTaskCount, currentTaskCount);
} else if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
} else if (optimalTaskCount > currentTaskCount) {
taskCount = optimalTaskCount;
lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", supervisorId, currentTaskCount, taskCount);
} else if (!config.isScaleDownOnTaskRolloverOnly()
&& isScaleActionAllowed()
&& optimalTaskCount < currentTaskCount
&& optimalTaskCount > 0) {
taskCount = optimalTaskCount;
lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale down).", supervisorId, currentTaskCount, taskCount);
} else {
taskCount = -1;
Expand Down Expand Up @@ -594,22 +587,4 @@ private Either<String, Boolean> validateMetricsForScaling(CostMetrics metrics)
}
}

/**
* Determines if a scale action is currently allowed based on the elapsed time
* since the last scale action and the configured minimum scale-down delay.
*/
private boolean isScaleActionAllowed()
{
if (lastScaleActionTimeMillis < 0) {
return true;
}

final long barrierMillis = config.getMinScaleDownDelay().getMillis();
if (barrierMillis <= 0) {
return true;
}

final long elapsedMillis = DateTimes.nowUtc().getMillis() - lastScaleActionTimeMillis;
return elapsedMillis >= barrierMillis;
}
}
Loading
Loading