
Conversation

Contributor

@Fly-Style Fly-Style commented Dec 19, 2025

As a follow-up to PR #18819, this patch fixes the temporal behaviour so that scale-down happens in the same manner as scale-up, by injecting the new task count calculation logic at task rollover time.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

Contributor

@kfaraz kfaraz left a comment

Thanks for the follow up, @Fly-Style !
Left some initial drive-by comments. Will do a more thorough review later today/tomorrow.

@SuppressWarnings("resource")
@Test
@Timeout(300)
void test_scaleDownDuringTaskRollover()
Contributor

Can this test not be moved to the CostBasedAutoScalerIntegrationTest itself?

Contributor Author

I decided to make it separate because we're testing abstract autoscaler actions during the task rollover, not specifically the cost-based one. We might create, say, a CPU-based autoscaler later with the requirement to scale down during the rollover.

Contributor

because we're testing abstract autoscaler actions during the task rollover, not specifically the cost-based one.

I am not sure if this is entirely true, since the cost-based auto-scaler is the only one that supports task count changes on rollover right now.

The new test should be in the existing CostBasedAutoScalerIntegrationTest since it uses the same cluster setup (AFAICT) and verifies an important aspect of the cost-based auto-scaler.

In the future, when we add more auto-scalers, we can add separate tests for them.


int optimalTaskCount = -1;
double optimalCost = Double.POSITIVE_INFINITY;
Tuple3<Double, Double, Double> optimalCost = new Tuple3<>(Double.POSITIVE_INFINITY,
Contributor

Better use a dedicated class than a Tuple or Pair.


* @return optimal task count for scale-up, or -1 if no scaling action needed
*/
public int computeOptimalTaskCount(CostMetrics metrics)
public int computeOptimalTaskCount(CostMetrics metrics, CostComputeMode costComputeMode)
Contributor

Instead of keeping a cost compute mode, please keep two separate methods that may have some internal common implementation. Methods may be named something like:
computeOptimalTaskCount and computeOptimalTaskCountOnRollover.


Comment on lines 604 to 609
log.info(
"Changed taskCount to [%s] for supervisor[%s] for dataSource[%s].",
desiredActiveTaskCount,
supervisorId,
dataSource
);
Contributor

Nit: @Fly-Style , could we please revert all the formatting changes from this PR, to keep the focus on the actual changes?

I think we can leave out the formatting changes for now.



@Fly-Style Fly-Style requested a review from kfaraz December 22, 2025 10:32
@Fly-Style Fly-Style marked this pull request as ready for review December 22, 2025 10:46
* Holds the result of a cost computation from {@link WeightedCostFunction#computeCost}.
* All costs are measured in seconds.
*/
public class CostResult
Contributor

Thanks for adding this.

autoscaler = spec.createAutoscaler(supervisor);

// Wire autoscaler back to supervisor for rollover-based scale-down
if (supervisor instanceof SeekableStreamSupervisor && autoscaler != null) {
Contributor

It feels weird to first create the auto-scaler and then inject it back into the supervisor.

How about we add a createTaskAutoScaler() method on Supervisor interface itself.
Internally, this method will simply call spec.createAutoscaler(this) and will initialize its own auto-scaler field if needed.
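A minimal sketch of this suggestion, using simplified stand-in types (`Spec`, `TaskAutoScaler`, and the `Supervisor` class here are hypothetical, not the real Druid interfaces):

```java
public class AutoScalerWiringSketch
{
    interface TaskAutoScaler
    {
        int computeTaskCountForRollover();
    }

    interface Spec
    {
        TaskAutoScaler createAutoscaler(Supervisor supervisor);
    }

    static class Supervisor
    {
        private TaskAutoScaler taskAutoScaler;

        // Suggested shape: the supervisor initializes its own auto-scaler
        // field on demand, so no external setter is required.
        TaskAutoScaler createTaskAutoScaler(Spec spec)
        {
            if (taskAutoScaler == null) {
                taskAutoScaler = spec.createAutoscaler(this);
            }
            return taskAutoScaler;
        }
    }

    public static void main(String[] args)
    {
        Supervisor supervisor = new Supervisor();
        // Toy spec whose auto-scaler always recommends 3 tasks.
        Spec spec = s -> () -> 3;
        TaskAutoScaler scaler = supervisor.createTaskAutoScaler(spec);
        System.out.println("rollover task count = " + scaler.computeTaskCountForRollover());
        System.out.println("same instance on second call = " + (scaler == supervisor.createTaskAutoScaler(spec)));
    }
}
```

The point of the design is that wiring stays internal to the supervisor: the manager never needs to hold a reference just to inject it back.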

final AtomicInteger numStoppedTasks = new AtomicInteger();
// Sort task groups by start time to prioritize early termination of earlier groups, then iterate for processing
activelyReadingTaskGroups.entrySet().stream().sorted(
Comparator.comparingLong(
Contributor

Please revert all the formatting changes in this file.

Contributor Author

Done.

Comment on lines 158 to 171
/**
* Tests that scale down happen during task rollover via checkTaskDuration().
*
* <p>Test flow:</p>
* <ol>
* <li>Start supervisor with 10 tasks and 50 partitions, minimal data (500 records)</li>
* <li>Wait for initial tasks to start running</li>
* <li>Wait for the first task rollover to complete (task duration is 8 seconds)</li>
* <li>Verify that after rollover, fewer tasks are running due to cost-based autoscaler (no ingestion at all)</li>
* </ol>
*
* <p>Scale down during rollover is triggered in {@code SeekableStreamSupervisor.checkTaskDuration()}
* when all task groups have rolled over and the autoscaler recommends a lower task count.</p>
*/
Contributor

I feel we should omit the javadoc. The test itself should be readable enough to follow through the details.

.withConsumerProperties(kafkaServer.consumerProperties())
.withTaskCount(taskCount)
.withTaskDuration(Seconds.THREE.toPeriod())
.withTaskDuration(Seconds.parseSeconds("PT7S").toPeriod())
Contributor

Suggested change
.withTaskDuration(Seconds.parseSeconds("PT7S").toPeriod())
.withTaskDuration(Period.seconds(7))

Comment on lines +269 to +271
final int postRolloverRunningTasks = cluster.callApi().getTaskCount("running", dataSource);

Assertions.assertTrue(
Contributor

Thanks for adding this verification.

Can we update the existing scale down test to verify that scaling was actually skipped in that case?

Contributor Author

Here it actually should not be skipped :)

// No-op.
}

private CostMetrics collectMetrics()
Contributor

Nit: Can we retain the original order of methods? It might help clean up the patch.

Contributor Author

To be honest, I prefer to keep it changed, because it follows the correct order.

* @return optimal task count for scale-up, or -1 if no scaling action needed
*/
public int computeOptimalTaskCount(CostMetrics metrics)
int computeOptimalTaskCount(CostMetrics metrics, CostComputeMode costComputeMode)
Contributor

Nit: Rather than using a ComputeMode or even a boolean flag, it would be cleaner to just add two new methods, both of which call the computeOptimalTaskCount method.

public int computeTaskCountOnRollover()
{
    // Perform both scale downs and scale ups
    return computeOptimalTaskCount(collectMetrics());
}

public int computeTaskCountForScaleAction()
{
    final CostMetrics metrics = collectMetrics();
    final int currentTaskCount = metrics.currentTaskCount();
    final int optimalTaskCount = computeOptimalTaskCount(metrics);

    // Perform only scale up actions
    return optimalTaskCount >= currentTaskCount ? optimalTaskCount : -1;
}

This would significantly simplify the diff and clarify the intent better.

@Fly-Style Fly-Style requested a review from kfaraz December 23, 2025 12:57
@Fly-Style Fly-Style force-pushed the cost-autoscaler-task-rollout branch from 01f5fee to 2719a17 Compare December 23, 2025 13:07
@Fly-Style Fly-Style force-pushed the cost-autoscaler-task-rollout branch from 2719a17 to 5936fd0 Compare December 23, 2025 13:08
This reverts commit 5936fd0.
This reverts commit 7788e38.
Signed-off-by: Sasha Syrotenko <[email protected]>
@Fly-Style Fly-Style force-pushed the cost-autoscaler-task-rollout branch from 6f3ce38 to 971f978 Compare December 23, 2025 13:17
@Fly-Style Fly-Style force-pushed the cost-autoscaler-task-rollout branch from 971f978 to fd654e2 Compare December 23, 2025 13:18
@Fly-Style Fly-Style force-pushed the cost-autoscaler-task-rollout branch from 065c5fa to 833cb88 Compare December 23, 2025 20:57

final AtomicInteger numStoppedTasks = new AtomicInteger();
// Sort task groups by start time to prioritize early termination of earlier groups, then iterate for processing
// Sort task groups by start time to prioritize early termination of earlier groups, then iterate for processing
Contributor

Duplicate comment.

}
}
});

Contributor

Please retain the newline for clean separation of code.

if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
if (rolloverTaskCount > 0 && rolloverTaskCount < ioConfig.getTaskCount()) {
log.info("Cost-based autoscaler recommends scaling down to [%d] tasks during rollover", rolloverTaskCount);
Contributor

Suggested change
log.info("Cost-based autoscaler recommends scaling down to [%d] tasks during rollover", rolloverTaskCount);
log.info("Autoscaler recommends scaling down to [%d] tasks during rollover.", rolloverTaskCount);


if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
if (rolloverTaskCount > 0 && rolloverTaskCount < ioConfig.getTaskCount()) {
Contributor

Should we also allow scale up on task rollover?

* Sets the autoscaler reference for rollover-based scale-down decisions.
* Called by {@link SupervisorManager} after supervisor creation.
*/
public void setTaskAutoScaler(@Nullable SupervisorTaskAutoScaler taskAutoScaler)
Contributor

Is this still needed?

metrics.getPollIdleRatio()
);


Contributor

Nit: extra newline?

Comment on lines +90 to +93
default SupervisorTaskAutoScaler createAutoscaler()
{
return null;
}
Contributor

It seems a little untidy but the default impl should do the same thing that the existing impl does, so that we do not break extensions that use auto-scalers.

Suggested change
default SupervisorTaskAutoScaler createAutoscaler()
{
return null;
}
default SupervisorTaskAutoScaler createAutoscaler(SupervisorSpec spec)
{
return spec.createAutoscaler(this);
}

final Map<String, Map<String, Object>> taskStats = supervisor.getStats();
final double movingAvgRate = extractMovingAverage(taskStats, DropwizardRowIngestionMeters.ONE_MINUTE_NAME);
final double pollIdleRatio = extractPollIdleRatio(taskStats);
return computeOptimalTaskCount(collectMetrics());
Contributor

I wonder if for this method we shouldn't just reuse the metrics collected in the last cycle.
Metrics collection may be slow since the supervisor might need to contact all the running tasks.
This would slow down the task rollover process, causing ingestion lag.
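One way to sketch that idea, reusing recently collected metrics instead of re-polling every task at rollover. `CostMetrics`, the `metricsForRollover` method, and the 60-second freshness window are all illustrative assumptions, not the actual Druid implementation:

```java
public class MetricsCacheSketch
{
    record CostMetrics(double movingAvgRate, double pollIdleRatio) { }

    private static final long MAX_AGE_MILLIS = 60_000;  // assumed freshness window

    private CostMetrics lastMetrics;
    private long lastCollectedAtMillis;

    // Stand-in for the slow path that contacts all running tasks.
    private CostMetrics collectMetrics()
    {
        return new CostMetrics(1000.0, 0.2);
    }

    // Reuse metrics from the last scaling cycle if they are still fresh,
    // so task rollover is not delayed by a full metrics collection.
    CostMetrics metricsForRollover(long nowMillis)
    {
        if (lastMetrics == null || nowMillis - lastCollectedAtMillis > MAX_AGE_MILLIS) {
            lastMetrics = collectMetrics();
            lastCollectedAtMillis = nowMillis;
        }
        return lastMetrics;
    }

    public static void main(String[] args)
    {
        MetricsCacheSketch sketch = new MetricsCacheSketch();
        CostMetrics first = sketch.metricsForRollover(0);
        System.out.println("reused within window = " + (first == sketch.metricsForRollover(30_000)));
        System.out.println("refreshed after window = " + (first != sketch.metricsForRollover(120_000)));
    }
}
```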

activelyReadingTaskGroups.remove(groupId);
}

if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
Contributor

Please move this entire new logic into a new private method and add a short javadoc to it.
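For illustration, a minimal sketch of what the extracted method could look like. Names follow the snippet above, but the surrounding class and the `maybeScaleDownOnRollover` name are hypothetical:

```java
public class RolloverScaleDownSketch
{
    interface TaskAutoScaler
    {
        int computeTaskCountForRollover();
    }

    private final TaskAutoScaler taskAutoScaler;
    int configuredTaskCount;  // stand-in for ioConfig.getTaskCount()

    RolloverScaleDownSketch(TaskAutoScaler taskAutoScaler, int configuredTaskCount)
    {
        this.taskAutoScaler = taskAutoScaler;
        this.configuredTaskCount = configuredTaskCount;
    }

    /**
     * Consults the auto-scaler once all actively reading task groups have
     * rolled over and applies a lower task count if one is recommended.
     * (Would be private in the real supervisor; package-private here so the
     * sketch is easy to exercise.)
     */
    void maybeScaleDownOnRollover(boolean activelyReadingGroupsEmpty)
    {
        if (taskAutoScaler != null && activelyReadingGroupsEmpty) {
            final int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
            if (rolloverTaskCount > 0 && rolloverTaskCount < configuredTaskCount) {
                System.out.println("Autoscaler recommends scaling down to [" + rolloverTaskCount + "] tasks during rollover.");
                configuredTaskCount = rolloverTaskCount;
            }
        }
    }

    public static void main(String[] args)
    {
        RolloverScaleDownSketch supervisor = new RolloverScaleDownSketch(() -> 4, 10);
        supervisor.maybeScaleDownOnRollover(true);
        System.out.println("taskCount = " + supervisor.configuredTaskCount);
    }
}
```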
