Skip to content

Commit 50e1c9b

Browse files
committed
more changes
1 parent 213d73d commit 50e1c9b

7 files changed

Lines changed: 68 additions & 140 deletions

File tree

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ public void handle()
558558
if (desiredTaskCount > currentTaskCount) {
559559
scaleDirection = ScaleDirection.SCALE_UP;
560560
cooldownMillis = autoScalerConfig.getMinScaleUpDelay().getMillis();
561-
} else {
561+
} else { // desiredTaskCount < currentTaskCount
562562
scaleDirection = ScaleDirection.SCALE_DOWN;
563563
cooldownMillis = autoScalerConfig.getMinScaleDownDelay().getMillis();
564564
}

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java

Lines changed: 16 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,9 @@
2525
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters;
2626
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
2727
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
28-
import org.apache.druid.indexing.overlord.supervisor.autoscaler.ScaleDirection;
2928
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
3029
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
3130
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
32-
import org.apache.druid.java.util.common.DateTimes;
3331
import org.apache.druid.java.util.common.Either;
3432
import org.apache.druid.java.util.common.StringUtils;
3533
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -91,8 +89,6 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
9189
private final WeightedCostFunction costFunction;
9290
private volatile CostMetrics lastKnownMetrics;
9391

94-
private volatile long lastScaleActionTimeMillis = -1;
95-
9692
public CostBasedAutoScaler(
9793
SeekableStreamSupervisor supervisor,
9894
CostBasedAutoScalerConfig config,
@@ -177,48 +173,38 @@ public int computeTaskCountForScaleAction()
177173

178174
int currentTaskCount = supervisor.getIoConfig().getTaskCount();
179175

180-
// 2) If already outsized, scale to the configured boundary to get back to a safe state,
181-
// regardless of optimal task count. Clamp either way so subsequent logic uses a valid
182-
// reference even when scaling is on cooldown.
183176
final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin()
184177
|| currentTaskCount > config.getTaskCountMax();
185178
if (isTaskCountOutOfBounds) {
186179
final int clampedTaskCount = Math.min(config.getTaskCountMax(), Math.max(config.getTaskCountMin(), currentTaskCount));
187-
final ScaleDirection direction = currentTaskCount < config.getTaskCountMin()
188-
? ScaleDirection.SCALE_UP : ScaleDirection.SCALE_DOWN;
189-
if (isScaleActionAllowed(direction)) {
190-
lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
191-
log.info(
192-
"Task count for supervisor[%s] was out of bounds [%d,%d], urgently scaling from [%d] to [%d].",
193-
supervisorId, config.getTaskCountMin(), config.getTaskCountMax(), currentTaskCount, clampedTaskCount
194-
);
195-
return clampedTaskCount;
196-
}
197-
currentTaskCount = clampedTaskCount;
180+
log.info(
181+
"Task count for supervisor[%s] was out of bounds [%d,%d], recommending scale from [%d] to [%d].",
182+
supervisorId, config.getTaskCountMin(), config.getTaskCountMax(), currentTaskCount, clampedTaskCount
183+
);
184+
return clampedTaskCount;
198185
}
199186

200187
final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
188+
if (optimalTaskCount == -1) {
189+
return -1;
190+
}
201191

202-
if (optimalTaskCount > currentTaskCount && isScaleActionAllowed(ScaleDirection.SCALE_UP)) {
203-
lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
204-
log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", supervisorId, currentTaskCount, optimalTaskCount);
192+
if (optimalTaskCount > currentTaskCount) {
193+
log.info("Recommending taskCount scale-up for supervisor[%s] from [%d] to [%d].", supervisorId, currentTaskCount, optimalTaskCount);
205194
return optimalTaskCount;
206195
} else if (!config.isScaleDownOnTaskRolloverOnly()
207-
&& optimalTaskCount > 0 // guards against the -1 sentinel
208-
&& optimalTaskCount < currentTaskCount
209-
&& isScaleActionAllowed(ScaleDirection.SCALE_DOWN)) {
210-
lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
211-
log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale down).", supervisorId, currentTaskCount, optimalTaskCount);
196+
&& optimalTaskCount < currentTaskCount) {
197+
log.info("Recommending taskCount scale-down for supervisor[%s] from [%d] to [%d].", supervisorId, currentTaskCount, optimalTaskCount);
212198
return optimalTaskCount;
213199
}
214200

215-
// No scaling (optimalTaskCount == -1, cooldown, or rollover-only mode).
216-
log.debug("No scaling required for supervisor[%s]", supervisorId);
217-
if (optimalTaskCount >= config.getTaskCountMax() || currentTaskCount == config.getTaskCountMax()) {
201+
// No scaling (optimalTaskCount == currentTaskCount, optimalTaskCount <= min, optimalTaskCount >= max or rollover-only mode).
202+
log.debug("No scaling allowed/required for supervisor[%s]", supervisorId);
203+
if (optimalTaskCount >= config.getTaskCountMax()) {
218204
emitter.emit(getMetricBuilder()
219205
.setDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION, "Already at max task count")
220206
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, currentTaskCount));
221-
} else if (optimalTaskCount == config.getTaskCountMin() || currentTaskCount == config.getTaskCountMin()) {
207+
} else if (optimalTaskCount <= config.getTaskCountMin()) {
222208
emitter.emit(getMetricBuilder()
223209
.setDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION, "Already at min task count")
224210
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, currentTaskCount));
@@ -590,24 +576,4 @@ private Either<String, Boolean> validateMetricsForScaling(CostMetrics metrics)
590576
}
591577
}
592578

593-
/**
594-
* Determines if a scale action in the given direction is currently allowed based on the elapsed time
595-
* since the last scale action and the configured delay for that direction.
596-
*/
597-
private boolean isScaleActionAllowed(ScaleDirection direction)
598-
{
599-
if (lastScaleActionTimeMillis < 0) {
600-
return true;
601-
}
602-
603-
final long barrierMillis = direction == ScaleDirection.SCALE_UP
604-
? config.getMinScaleUpDelay().getMillis()
605-
: config.getMinScaleDownDelay().getMillis();
606-
if (barrierMillis <= 0) {
607-
return true;
608-
}
609-
610-
final long elapsedMillis = DateTimes.nowUtc().getMillis() - lastScaleActionTimeMillis;
611-
return elapsedMillis >= barrierMillis;
612-
}
613579
}

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
6161
private final double idleWeight;
6262
private final boolean useTaskCountBoundaries;
6363
private final int highLagThreshold;
64-
@Nullable private final Duration minScaleUpDelay;
64+
private final Duration minScaleUpDelay;
6565
private final Duration minScaleDownDelay;
6666
private final boolean scaleDownDuringTaskRolloverOnly;
6767

@@ -99,7 +99,7 @@ public CostBasedAutoScalerConfig(
9999
this.idleWeight = Configs.valueOrDefault(idleWeight, DEFAULT_IDLE_WEIGHT);
100100
this.useTaskCountBoundaries = Configs.valueOrDefault(useTaskCountBoundaries, false);
101101
this.highLagThreshold = Configs.valueOrDefault(highLagThreshold, -1);
102-
this.minScaleUpDelay = minScaleUpDelay;
102+
this.minScaleUpDelay = Configs.valueOrDefault(minScaleUpDelay, Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS));
103103
this.minScaleDownDelay = Configs.valueOrDefault(minScaleDownDelay, DEFAULT_MIN_SCALE_DELAY);
104104
this.scaleDownDuringTaskRolloverOnly = Configs.valueOrDefault(scaleDownDuringTaskRolloverOnly, false);
105105

@@ -127,11 +127,8 @@ public CostBasedAutoScalerConfig(
127127

128128
Preconditions.checkArgument(this.lagWeight >= 0, "lagWeight must be >= 0");
129129
Preconditions.checkArgument(this.idleWeight >= 0, "idleWeight must be >= 0");
130-
Preconditions.checkArgument(
131-
this.minScaleUpDelay == null || this.minScaleUpDelay.getMillis() >= 0,
132-
"minScaleUpDelay must be >= 0"
133-
);
134-
Preconditions.checkArgument(this.minScaleDownDelay.getMillis() >= 0, "minScaleDownDelay must be >= 0");
130+
Preconditions.checkArgument(this.minScaleUpDelay.getMillis() >= 0, "minScaleUpDelay must be a duration >= 0 millis");
131+
Preconditions.checkArgument(this.minScaleDownDelay.getMillis() >= 0, "minScaleDownDelay must be a duration >= 0 millis");
135132
}
136133

137134
/**
@@ -225,14 +222,12 @@ public int getHighLagThreshold()
225222

226223
/**
227224
* Returns the minimum delay before a scale-up action is allowed after any previous scale action.
228-
* Falls back to {@link #getMinScaleDownDelay()} if not explicitly configured.
229225
*/
230226
@Override
231227
@JsonProperty
232-
@Nullable
233228
public Duration getMinScaleUpDelay()
234229
{
235-
return minScaleUpDelay != null ? minScaleUpDelay : minScaleDownDelay;
230+
return minScaleUpDelay;
236231
}
237232

238233
/**

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.fasterxml.jackson.annotation.JsonInclude;
2424
import com.fasterxml.jackson.annotation.JsonProperty;
2525
import com.google.common.base.Preconditions;
26+
import org.apache.druid.common.config.Configs;
2627
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
2728
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
2829
import org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction;
@@ -52,8 +53,8 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig
5253
private final int scaleOutStep;
5354
private final boolean enableTaskAutoScaler;
5455
private final long minTriggerScaleActionFrequencyMillis;
55-
@Nullable private final Duration minScaleUpDelay;
56-
@Nullable private final Duration minScaleDownDelay;
56+
private final Duration minScaleUpDelay;
57+
private final Duration minScaleDownDelay;
5758
private final AggregateFunction lagAggregate;
5859
private final Double stopTaskCountRatio;
5960

@@ -110,13 +111,21 @@ public LagBasedAutoScalerConfig(
110111
this.scaleOutStep = scaleOutStep != null ? scaleOutStep : 2;
111112
this.minTriggerScaleActionFrequencyMillis =
112113
minTriggerScaleActionFrequencyMillis != null ? minTriggerScaleActionFrequencyMillis : 600000;
113-
this.minScaleUpDelay = minScaleUpDelay;
114-
this.minScaleDownDelay = minScaleDownDelay;
114+
this.minScaleUpDelay = Configs.valueOrDefault(minScaleUpDelay, Duration.millis(this.minTriggerScaleActionFrequencyMillis));
115+
this.minScaleDownDelay = Configs.valueOrDefault(minScaleDownDelay, Duration.millis(this.minTriggerScaleActionFrequencyMillis));
115116

116117
Preconditions.checkArgument(
117118
stopTaskCountRatio == null || (stopTaskCountRatio > 0.0 && stopTaskCountRatio <= 1.0),
118119
"0.0 < stopTaskCountRatio <= 1.0"
119120
);
121+
Preconditions.checkArgument(
122+
this.minScaleUpDelay.getMillis() >= 0,
123+
"minScaleUpDelay must be a duration >= 0 millis"
124+
);
125+
Preconditions.checkArgument(
126+
this.minScaleDownDelay.getMillis() >= 0,
127+
"minScaleDownDelay must be a duration >= 0 millis"
128+
);
120129
this.stopTaskCountRatio = stopTaskCountRatio;
121130
}
122131

@@ -230,18 +239,16 @@ public long getMinTriggerScaleActionFrequencyMillis()
230239

231240
@Override
232241
@JsonProperty
233-
@Nullable
234242
public Duration getMinScaleUpDelay()
235243
{
236-
return minScaleUpDelay != null ? minScaleUpDelay : Duration.millis(minTriggerScaleActionFrequencyMillis);
244+
return minScaleUpDelay;
237245
}
238246

239247
@Override
240248
@JsonProperty
241-
@Nullable
242249
public Duration getMinScaleDownDelay()
243250
{
244-
return minScaleDownDelay != null ? minScaleDownDelay : Duration.millis(minTriggerScaleActionFrequencyMillis);
251+
return minScaleDownDelay;
245252
}
246253

247254
@JsonProperty

indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagBasedAutoScalerConfigTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public void testSerde() throws Exception
9595
1,
9696
5,
9797
true,
98-
5000L,
98+
null,
9999
Duration.millis(3000),
100100
Duration.millis(7000),
101101
AggregateFunction.SUM,
@@ -401,7 +401,7 @@ public void testScaleDelayFallback() throws Exception
401401
null,
402402
null,
403403
false,
404-
60000L,
404+
null,
405405
Duration.millis(15000),
406406
Duration.millis(30000),
407407
null,

indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ public void testSerdeWithDefaults() throws Exception
9696
Assert.assertEquals(DEFAULT_SCALE_ACTION_PERIOD_MILLIS, config.getScaleActionPeriodMillis());
9797
Assert.assertEquals(DEFAULT_LAG_WEIGHT, config.getLagWeight(), 0.001);
9898
Assert.assertEquals(DEFAULT_IDLE_WEIGHT, config.getIdleWeight(), 0.001);
99-
// minScaleUpDelay falls back to minScaleDownDelay when not explicitly set
100-
Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY, config.getMinScaleUpDelay());
99+
// minScaleUpDelay and minScaleDownDelay each have their own independent default
100+
Assert.assertEquals(Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS), config.getMinScaleUpDelay());
101101
Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY, config.getMinScaleDownDelay());
102102
Assert.assertFalse(config.isScaleDownOnTaskRolloverOnly());
103103
Assert.assertNull(config.getTaskCountStart());
@@ -205,17 +205,17 @@ public void testBuilder()
205205
}
206206

207207
@Test
208-
public void testScaleDelayFallback() throws Exception
208+
public void testScaleDelayDefaults() throws Exception
209209
{
210-
// Neither minScaleUpDelay nor minScaleDownDelay set: both fall back to DEFAULT_MIN_SCALE_DELAY
210+
// Neither set: each direction gets its own independent default
211211
CostBasedAutoScalerConfig defaults = CostBasedAutoScalerConfig.builder()
212212
.taskCountMax(10)
213213
.taskCountMin(1)
214214
.build();
215-
Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY, defaults.getMinScaleUpDelay());
215+
Assert.assertEquals(Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS), defaults.getMinScaleUpDelay());
216216
Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY, defaults.getMinScaleDownDelay());
217217

218-
// Only minScaleUpDelay set: getMinScaleUpDelay returns the explicit value; minScaleDownDelay uses its default
218+
// Only minScaleUpDelay set: up uses explicit value, down uses its default
219219
CostBasedAutoScalerConfig upOnly = CostBasedAutoScalerConfig.builder()
220220
.taskCountMax(10)
221221
.taskCountMin(1)
@@ -224,13 +224,13 @@ public void testScaleDelayFallback() throws Exception
224224
Assert.assertEquals(Duration.standardMinutes(5), upOnly.getMinScaleUpDelay());
225225
Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY, upOnly.getMinScaleDownDelay());
226226

227-
// Only minScaleDownDelay set: getMinScaleUpDelay falls back to minScaleDownDelay
227+
// Only minScaleDownDelay set: down uses explicit value, up uses its own default (does not fall back to down)
228228
CostBasedAutoScalerConfig downOnly = CostBasedAutoScalerConfig.builder()
229229
.taskCountMax(10)
230230
.taskCountMin(1)
231231
.minScaleDownDelay(Duration.standardMinutes(20))
232232
.build();
233-
Assert.assertEquals(Duration.standardMinutes(20), downOnly.getMinScaleUpDelay());
233+
Assert.assertEquals(Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS), downOnly.getMinScaleUpDelay());
234234
Assert.assertEquals(Duration.standardMinutes(20), downOnly.getMinScaleDownDelay());
235235

236236
// Both set: serde roundtrip preserves values

0 commit comments

Comments
 (0)