@@ -41,7 +41,7 @@
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Seconds;
import org.joda.time.Period;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -59,6 +59,7 @@
* <p>
* Tests the autoscaler's ability to compute optimal task counts based on partition count and cost metrics (lag and idle time).
*/
@SuppressWarnings("resource")
public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
{
private static final String TOPIC = EmbeddedClusterApis.createTestDatasourceName();
@@ -95,18 +96,11 @@ public void stop()
}
};

// Increase worker capacity to handle more tasks
indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
.addProperty("druid.worker.capacity", "60");

overlord.addProperty("druid.indexer.task.default.context", "{\"useConcurrentLocks\": true}")
.addProperty("druid.manager.segments.useIncrementalCache", "ifSynced")
.addProperty("druid.manager.segments.pollDuration", "PT0.1s");

coordinator.addProperty("druid.manager.segments.useIncrementalCache", "ifSynced");
.addProperty("druid.worker.capacity", "100");

cluster.useLatchableEmitter()
.useDefaultTimeoutForLatchableEmitter(120)
.useDefaultTimeoutForLatchableEmitter(60)
.addServer(coordinator)
.addServer(overlord)
.addServer(indexer)
@@ -162,7 +156,6 @@ public void test_autoScaler_computesOptimalTaskCountAndProduceScaleDown()
}

@Test
@Timeout(125)
public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
{
final String superId = dataSource + "_super_scaleup";
@@ -215,6 +208,63 @@ public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
}

@Test
void test_scaleDownDuringTaskRollover()
{
final String superId = dataSource + "_super";
final int initialTaskCount = 10;

final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig
.builder()
.enableTaskAutoScaler(true)
.taskCountMin(1)
.taskCountMax(10)
.taskCountStart(initialTaskCount)
.scaleActionPeriodMillis(2000)
.minTriggerScaleActionFrequencyMillis(2000)
// High idle weight ensures scale-down when tasks are mostly idle (little data to process)
.lagWeight(0.1)
.idleWeight(0.9)
.build();

final KafkaSupervisorSpec spec = createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, initialTaskCount);

// Submit the supervisor
Assertions.assertEquals(superId, cluster.callApi().postSupervisor(spec));

// Wait for at least one task to be running for the datasource managed by the supervisor.
overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/run/time")
.hasDimension(DruidMetrics.DATASOURCE, dataSource));

// Wait for the autoscaler to emit a metric indicating scale-down; the optimal count should be less than the current task count.
overlord.latchableEmitter().waitForEvent(
event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
.hasValueMatching(Matchers.lessThan((long) initialTaskCount)));

// Wait for tasks to complete (first rollover)
overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/action/success/count"));

// Wait for a task to start running again for the datasource managed by the supervisor.
overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/run/time")
.hasDimension(DruidMetrics.DATASOURCE, dataSource));

// After rollover, verify that the running task count has decreased
// The autoscaler should have recommended fewer tasks due to high idle time
final int postRolloverRunningTasks = cluster.callApi().getTaskCount("running", dataSource);

Assertions.assertTrue(
Comment on lines +253 to +255
Contributor: Thanks for adding this verification. Can we update the existing scale down test to verify that scaling was actually skipped in that case?
Contributor (Author): Here it actually should not be skipped :)

postRolloverRunningTasks < initialTaskCount,
StringUtils.format(
"Expected running task count to decrease after rollover. Initial: %d, After rollover: %d",
initialTaskCount,
postRolloverRunningTasks
)
);

// Suspend the supervisor to clean up
cluster.callApi().postSupervisor(spec.createSuspendedSpec());
}
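
In response to the review comment above about the existing scale-down test, here is a hedged sketch of the kind of verification that could confirm scaling was skipped there. It is illustrative only; it reuses the getTaskCount and StringUtils.format calls already present in this diff and assumes the existing test keeps an initialTaskCount variable in scope:

    // Hypothetical check for the existing scale-down test: before any rollover,
    // the number of running tasks should still equal the initial task count,
    // i.e. the autoscaler recommendation alone must not trigger a mid-run scale-down.
    final int runningTasksBeforeRollover = cluster.callApi().getTaskCount("running", dataSource);
    Assertions.assertEquals(
        initialTaskCount,
        runningTasksBeforeRollover,
        StringUtils.format(
            "Expected scaling to be skipped before rollover. Initial: %d, Running: %d",
            initialTaskCount,
            runningTasksBeforeRollover
        )
    );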

private void produceRecordsToKafka(int recordCount, int iterations)
{
int recordCountPerSlice = recordCount / iterations;
@@ -258,7 +308,7 @@ private KafkaSupervisorSpec createKafkaSupervisorWithAutoScaler(
ioConfig -> ioConfig
.withConsumerProperties(kafkaServer.consumerProperties())
.withTaskCount(taskCount)
.withTaskDuration(Seconds.THREE.toPeriod())
.withTaskDuration(Period.seconds(7))
.withAutoScalerConfig(autoScalerConfig)
)
.withId(supervisorId)
@@ -502,7 +502,7 @@ private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean pe
SupervisorTaskAutoScaler autoscaler;
try {
supervisor = spec.createSupervisor();
autoscaler = spec.createAutoscaler(supervisor);
autoscaler = supervisor.createAutoscaler();

supervisor.start();
if (autoscaler != null) {
@@ -63,6 +63,7 @@
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
@@ -885,7 +886,7 @@ public String getType()

/**
* Tag for identifying this supervisor in thread-names, listeners, etc. tag = (type + supervisorId).
*/
*/
private final String supervisorTag;
private final TaskInfoProvider taskInfoProvider;
private final RowIngestionMetersFactory rowIngestionMetersFactory;
@@ -926,6 +927,12 @@ public String getType()
private volatile boolean lifecycleStarted = false;
private final ServiceEmitter emitter;

/**
* Reference to the autoscaler, used for rollover-based scale-down decisions.
* Wired by {@link SupervisorManager} after supervisor creation.
*/
private volatile SupervisorTaskAutoScaler taskAutoScaler;

// snapshots latest sequences from the stream to be verified in the next run cycle of inactive stream check
private final Map<PartitionIdType, SequenceOffsetType> previousSequencesFromStream = new HashMap<>();
private long lastActiveTimeMillis;
@@ -1306,7 +1313,7 @@ public void tryInit()
if (log.isDebugEnabled()) {
log.debug(
"Handled notice[%s] from notices queue in [%d] ms, "
+ "current notices queue size [%d] for supervisor[%s] for datasource[%s].",
+ "current notices queue size [%d] for supervisor[%s] for datasource[%s].",
noticeType, noticeHandleTime.millisElapsed(), getNoticesQueueSize(), supervisorId, dataSource
);
}
@@ -1677,6 +1684,13 @@ private List<ParseExceptionReport> getCurrentParseErrors()
return limitedParseErrors;
}

@Override
public SupervisorTaskAutoScaler createAutoscaler()
{
this.taskAutoScaler = spec.createAutoscaler(this);
return this.taskAutoScaler;
}
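
The @Override above implies a corresponding createAutoscaler() declaration on the supervisor's base interface, which is not shown in this diff. A minimal sketch of what such a declaration could look like, assuming a nullable default for supervisors that do not support autoscaling (the default body is an assumption, not taken from the PR):

    // Sketch only: hypothetical declaration on the Supervisor interface so that
    // SupervisorManager can call supervisor.createAutoscaler() uniformly.
    // Supervisors without autoscaling support would simply inherit the null default.
    @Nullable
    default SupervisorTaskAutoScaler createAutoscaler()
    {
      return null;
    }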

@VisibleForTesting
public TaskGroup addTaskGroupToActivelyReadingTaskGroup(
int taskGroupId,
@@ -3338,6 +3352,7 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException

final AtomicInteger numStoppedTasks = new AtomicInteger();
// Sort task groups by start time to prioritize early termination of earlier groups, then iterate for processing
// Sort task groups by start time to prioritize early termination of earlier groups, then iterate for processing
Contributor: duplicate comment.

activelyReadingTaskGroups.entrySet().stream().sorted(
Comparator.comparingLong(
Contributor: Please revert all the formatting changes in this file.
Contributor (Author): Done.

taskGroupEntry -> computeEarliestTaskStartTime(taskGroupEntry.getValue()).getMillis()
@@ -3376,7 +3391,6 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
}
}
});

Contributor: Please retain the newline for clean separation of code.

List<Either<Throwable, Map<PartitionIdType, SequenceOffsetType>>> results = coalesceAndAwait(futures);
for (int j = 0; j < results.size(); j++) {
Integer groupId = futureGroupIds.get(j);
@@ -3428,6 +3442,17 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
// remove this task group from the list of current task groups now that it has been handled
activelyReadingTaskGroups.remove(groupId);
}

if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
Contributor: Please move this entire new logic into a new private method and add a short javadoc to it.

int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
if (rolloverTaskCount > 0 && rolloverTaskCount < ioConfig.getTaskCount()) {
Contributor: Should we also allow scale up on task rollover?

log.info("Cost-based autoscaler recommends scaling down to [%d] tasks during rollover", rolloverTaskCount);
Contributor: Suggested change:
log.info("Cost-based autoscaler recommends scaling down to [%d] tasks during rollover", rolloverTaskCount);
log.info("Autoscaler recommends scaling down to [%d] tasks during rollover.", rolloverTaskCount);


changeTaskCountInIOConfig(rolloverTaskCount);
// Force the supervisor state to be reset so that it is re-calculated on the next iteration of runInternal().
// This seems the best way to inject task count recalculation during the rollover.
clearAllocationInfo();
}
}
}
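
One way the refactor requested in the review comment above could look, sketched using only identifiers already visible in this diff; the method name and javadoc wording are illustrative, not part of the PR. checkTaskDuration() would then call this helper once all handled task groups have been removed, and a symmetric branch could cover the scale-up question raised above:

    /**
     * Applies the autoscaler's rollover recommendation: if all actively reading task
     * groups have just rolled over and the recommended task count is lower than the
     * current one, updates the IO config and clears allocation info so the next
     * runInternal() iteration recomputes task assignments. Sketch only; mirrors the
     * scale-down-only condition shown above.
     */
    private void maybeScaleDownOnRollover()
    {
      if (taskAutoScaler == null || !activelyReadingTaskGroups.isEmpty()) {
        return;
      }
      final int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
      if (rolloverTaskCount > 0 && rolloverTaskCount < ioConfig.getTaskCount()) {
        log.info("Autoscaler recommends scaling down to [%d] tasks during rollover.", rolloverTaskCount);
        changeTaskCountInIOConfig(rolloverTaskCount);
        clearAllocationInfo();
      }
    }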

private DateTime computeEarliestTaskStartTime(TaskGroup group)
@@ -4330,6 +4355,15 @@ public SeekableStreamSupervisorIOConfig getIoConfig()
return ioConfig;
}

/**
* Sets the autoscaler reference for rollover-based scale-down decisions.
* Called by {@link SupervisorManager} after supervisor creation.
*/
public void setTaskAutoScaler(@Nullable SupervisorTaskAutoScaler taskAutoScaler)
Contributor: Is this still needed?

{
this.taskAutoScaler = taskAutoScaler;
}

@Override
public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
{
@@ -4688,9 +4722,9 @@ protected void emitLag()

// Try emitting lag even with stale metrics provided that none of the partitions has negative lag
final long staleMillis = sequenceLastUpdated == null
? 0
: DateTimes.nowUtc().getMillis()
- (tuningConfig.getOffsetFetchPeriod().getMillis() + sequenceLastUpdated.getMillis());
? 0
: DateTimes.nowUtc().getMillis()
- (tuningConfig.getOffsetFetchPeriod().getMillis() + sequenceLastUpdated.getMillis());
if (staleMillis > 0 && partitionLags.values().stream().anyMatch(x -> x < 0)) {
// Log at most once every twenty supervisor runs to reduce noise in the logs
if ((staleMillis / getIoConfig().getPeriod().getMillis()) % 20 == 0) {