
Conversation

Contributor

@jtuglu1 jtuglu1 commented Dec 18, 2025

Description

Draft. Clone of #18729, but merged into the current runner per @kfaraz's request.

I've seen the giant lock in HttpRemoteTaskRunner cause severe performance degradation under heavy load (200-500ms per acquisition with thousands of active tasks), which slows down the startPendingTasks loop in TaskQueue. This leads to scheduling delays, which lead to more lag, which auto-scales more tasks, and so on. The runner also has a few (un)documented races scattered through the code. This overhead also slows down query tasks under load (e.g. MSQE and others) that use the scheduler for execution.

I'm attempting a rewrite of this class to optimize for throughput and safety.

Apart from the performance improvements/bug fixes, this will also include some new features:

  • Simpler code. The old task runner had legacy ZK references dangling around as well as a fairly complicated scheduling loop.
  • Task priority-based scheduling onto workers (e.g. prioritizing realtime tasks), following a simple priority-based fair-queueing policy (a sketch of the idea follows below).
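
A minimal sketch of what such a priority-based fair queue could look like (the class and field names below are illustrative assumptions, not the PR's actual code): tasks are ordered by priority level, with a monotonically increasing sequence number as a FIFO tiebreaker so that re-queued items go to the back of their own level.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a priority-based fair queue for pending tasks.
class PendingTaskItem implements Comparable<PendingTaskItem>
{
  private static final AtomicLong SEQUENCE = new AtomicLong();

  final String taskId;
  final int priority;        // higher value = scheduled earlier (e.g. realtime > batch)
  final long sequenceNumber; // FIFO tiebreaker within a priority level

  PendingTaskItem(String taskId, int priority)
  {
    this.taskId = taskId;
    this.priority = priority;
    this.sequenceNumber = SEQUENCE.incrementAndGet();
  }

  PendingTaskItem withFreshSequenceNumber()
  {
    // A re-queued item gets a new sequence number, sending it to the back of its priority level.
    return new PendingTaskItem(taskId, priority);
  }

  @Override
  public int compareTo(PendingTaskItem other)
  {
    int byPriority = Integer.compare(other.priority, this.priority); // higher priority first
    return byPriority != 0 ? byPriority : Long.compare(this.sequenceNumber, other.sequenceNumber);
  }
}

class PendingTaskQueueExample
{
  public static void main(String[] args) throws InterruptedException
  {
    PriorityBlockingQueue<PendingTaskItem> pendingTasks = new PriorityBlockingQueue<>();
    pendingTasks.put(new PendingTaskItem("compact_wiki", 0));
    pendingTasks.put(new PendingTaskItem("index_kafka_metrics", 10)); // realtime task jumps ahead
    System.out.println(pendingTasks.take().taskId); // prints index_kafka_metrics
  }
}
```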

I would ultimately like to make this the default HttpRemoteTaskRunner and have it run in all tests/production clusters, etc. as I think that would help catch more bugs/issues.

Performance Testing

Test results so far show a ~100-300ms speed-up per task runner operation (add(), etc.). Over thousands of tasks, this amounts to minutes of scheduling delay saved (e.g. 2,000 operations × 150ms ≈ 5 minutes).

Release note


Key changed/added classes in this PR
  • MyFoo
  • OurBar
  • TheirBaz

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@jtuglu1 jtuglu1 changed the title from "Http remote task runner revamp v2" to "HttpRemoteTaskRunner enhancements" on Dec 18, 2025
@jtuglu1 jtuglu1 force-pushed the http-remote-task-runner-revamp-v2 branch from d6dc9a2 to 6cc5303 on December 18, 2025 03:28
Contributor

kfaraz commented Dec 18, 2025

Thanks for creating this PR, @jtuglu1 ! The patch seems much simpler now.
I should be able to complete an initial review today.

@jtuglu1 jtuglu1 force-pushed the http-remote-task-runner-revamp-v2 branch from 6cc5303 to 0deca3a on December 18, 2025 03:38
if (taskItem.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) {
taskItem.revertStateFromPendingWorkerAssignToPending();
}
if (!runTaskOnWorker(taskId, workerHost)) {
Contributor Author

A quick note, @kfaraz: I was tempted to place this inside the above .compute() to avoid the race, but given that it could hold the lock for a while (and ConcurrentHashMap locks a whole bin of keys, not necessarily just the one key), I opted to do it outside the lock (similar to what the existing runner does). Additionally, if something like a removeWorker callback occurs, I'd prefer to update the state right away rather than wait for an assignment timeout, etc.
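
As a rough illustration of the pattern described above (the map, enum, and method names are assumptions made for this sketch, not the exact PR code): the cheap state transition happens inside compute(), while the potentially slow worker HTTP call happens after the per-key lock has been released.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of "flip state inside compute(), do slow work outside the lock".
class AssignOutsideLockExample
{
  enum State { PENDING, PENDING_WORKER_ASSIGN, EXECUTING }

  static final ConcurrentHashMap<String, State> tasks = new ConcurrentHashMap<>();

  public static void main(String[] args)
  {
    tasks.put("task-1", State.PENDING_WORKER_ASSIGN);

    // Cheap, in-memory transition inside compute(): the bin lock is held only briefly.
    tasks.compute("task-1", (id, state) ->
        state == State.PENDING_WORKER_ASSIGN ? State.PENDING : state);

    // The worker HTTP call is made with no map lock held, so a long round trip
    // cannot stall other keys that hash to the same bin.
    if (runTaskOnWorker("task-1", "worker-a:8091")) {
      tasks.compute("task-1", (id, state) -> State.EXECUTING);
    }
  }

  // Stand-in for the real assignment call, which would issue an HTTP request to the worker.
  static boolean runTaskOnWorker(String taskId, String workerHost)
  {
    return true;
  }
}
```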

Contributor

Thanks! Could you add a 1-line comment on why it is okay to do this.

throw new IAE("Invalid host and port: [%s]", colonIndex);
}
workerHost = hostAndPort.substring(0, colonIndex);
workerPort = Integer.parseInt(hostAndPort.substring(colonIndex + 1));

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException Note test

Potential uncaught 'java.lang.NumberFormatException'.
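
For illustration, one way the parse could guard against this finding (a sketch only; IAE mirrors the Druid exception used in the excerpt above, and the method name is hypothetical):

```java
// Sketch: catch NumberFormatException from Integer.parseInt and rethrow with context.
static int parseWorkerPort(String hostAndPort, int colonIndex)
{
  final String portString = hostAndPort.substring(colonIndex + 1);
  try {
    return Integer.parseInt(portString);
  }
  catch (NumberFormatException e) {
    throw new IAE("Invalid port[%s] in worker host and port[%s]", portString, hostAndPort);
  }
}
```
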
@jtuglu1 jtuglu1 requested a review from kfaraz December 18, 2025 10:12
@jtuglu1 jtuglu1 marked this pull request as ready for review December 18, 2025 17:12
@jtuglu1 jtuglu1 added this to the 36.0.0 milestone Dec 18, 2025
@jtuglu1 jtuglu1 force-pushed the http-remote-task-runner-revamp-v2 branch from 0deca3a to 006a079 on December 18, 2025 20:56
@jtuglu1 jtuglu1 force-pushed the http-remote-task-runner-revamp-v2 branch from 006a079 to f1b210a on December 21, 2025 19:45
Contributor

@kfaraz kfaraz left a comment

Leaving a partial review, will try to finish going through the rest of the changes today.

* ZK_CLEANUP_TODO : As of 0.11.1, it is required to cleanup task status paths from ZK which are created by the
* workers to support deprecated RemoteTaskRunner. So a method "scheduleCompletedTaskStatusCleanupFromZk()" is added'
* which should be removed in the release that removes RemoteTaskRunner legacy ZK updation WorkerTaskMonitor class.
* HTTP-based distributed task scheduler that manages assignment of tasks to slots on workers (MiddleManagers).
Contributor

Suggested change
* HTTP-based distributed task scheduler that manages assignment of tasks to slots on workers (MiddleManagers).
* HTTP-based distributed task scheduler that manages assignment of tasks to slots on workers (MiddleManagers or Indexers).


log.debug(
"Worker[%s] wrote [%s] status for task [%s] on [%s]",
"Worker[%s] wrote status[%s] for task[%s] on [%s]",
Contributor

Suggested change
"Worker[%s] wrote status[%s] for task[%s] on [%s]",
"Worker[%s] wrote status[%s] for task[%s] on location[%s]",

private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
private final HttpClient httpClient;
private final ObjectMapper smileMapper;
private final ObjectMapper objectMapper;
Contributor

Looking at the factory class, we are still passing @Smile annotated mapper here. Let's retain the original name of this field to avoid ambiguity with the other mapper (typically injected without an annotation or with @Json).

);
}

private boolean runTaskOnWorker(
Contributor

Please add a javadoc to this method, clarifying what the return value/exception means:

  • true: successfully assigned
  • false: failed to assign, retry later
  • exception: failed to assign, mark task as completed

I wonder if this tri-state should be captured in the return value itself rather than throwing varying exceptions like ISE, IllegalState (using Preconditions). You could return a value which has a success boolean, a retry boolean and a String failureReason, which can be used for alerting/task completion message.
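
A rough sketch of what such a return type might look like (hypothetical names, not part of the PR):

```java
// Hypothetical value object capturing the three assignment outcomes described above.
class AssignmentResult
{
  enum Outcome { ASSIGNED, RETRY_LATER, FAILED }

  final Outcome outcome;
  final String failureReason; // null when the task was assigned successfully

  private AssignmentResult(Outcome outcome, String failureReason)
  {
    this.outcome = outcome;
    this.failureReason = failureReason;
  }

  static AssignmentResult assigned()
  {
    return new AssignmentResult(Outcome.ASSIGNED, null);
  }

  static AssignmentResult retryLater(String reason)
  {
    return new AssignmentResult(Outcome.RETRY_LATER, reason);
  }

  static AssignmentResult failed(String reason)
  {
    return new AssignmentResult(Outcome.FAILED, reason);
  }
}
```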

Comment on lines +123 to +132
* Task state machine is as follows:
* 1. PENDING – Task has been submitted to the scheduler.
* 2. PENDING_ASSIGN – Task has been assigned to a worker, but has not started running yet.
* 3. EXECUTING – Task is running on a worker.
* 4. COMPLETE – Task has completed (success/fail).
* Worker state machine is as follows:
* 1. READY – Worker is online and ready to receive new tasks.
* 2. PENDING_ASSIGN – A task has been submitted to this worker, but has not started running yet.
* 3. BLACKLISTED – Worker has too many failed tasks.
* 4. LAZY – Worker has no more tasks running and has been marked as reapable by the worker auto-scaler.
Contributor

This is not needed here and should be moved to the respective enum values.
Just link them here using @see WorkerHolder.State and @see HttpRemoteTaskRunnerWorkItem.State

Comment on lines +400 to +401
"The state of the new task complete event is different from its last known state. "
+ "New state[%s], last known state[%s]",
Contributor

Suggested change
"The state of the new task complete event is different from its last known state. "
+ "New state[%s], last known state[%s]",
"Ignoring update to status[%s] as task[%s] has already completed with status[%s].",

Comment on lines +431 to +436
log.info(
"Worker[%s] completed task[%s] with status[%s]",
workerHolder.getWorker().getHost(),
taskStatus.getId(),
taskStatus.getStatusCode()
);
Contributor

Nit: Can be 1-lined, I feel.

Suggested change
log.info(
"Worker[%s] completed task[%s] with status[%s]",
workerHolder.getWorker().getHost(),
taskStatus.getId(),
taskStatus.getStatusCode()
);
log.info("Worker[%s] completed task[%s] with status[%s].", workerHost, taskId, taskStatus);

Contributor

Also, should we keep this log line outside the compute? It doesn't need to be inside the compute block.

Contributor Author

While I think that would help moderately with performance, since we're using key-based locking it's not likely to help that much. I can do it, but keeping it inside is also a nice way (in the logs) to validate that nothing else was modifying this simultaneously.

workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
blacklistWorkerIfNeeded(taskStatus, workerHolder);
} else {
log.warn("Could not find worker[%s]", workerHost);
Contributor

Is this really a warning? It is possible that the task finished on the worker and then the worker went away.

Contributor Author

I guess it's not unexpected behavior, but it's certainly not normal behavior (these are MMs, not Peons). I can change to info.

Contributor

Thanks! Could you also update the log message to include more info e.g. Could not find worker[%s] while marking task[%s] as complete?

if (workerHolder != null) {
blacklistWorkerIfNeeded(taskStatus, workerHolder);
if (workerHost != null) {
synchronized (workerStateLock) {
Contributor

Doesn't seem like we need to acquire this lock.

Contributor Author

@jtuglu1 jtuglu1 Dec 22, 2025

I use the workerStateLock as a global synchronization point for worker state. This ensures that the scheduling loop doesn't accidentally schedule a task on a worker that is being killed, has just been scheduled on, etc. It also lets us notify sleeping scheduling threads that slots may have become available due to a worker state change. In this case, a worker has completed a task; this frees up a slot, so we should notify interested parties.
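
A stripped-down illustration of that wait/notify shape (field and method names are hypothetical; this only shows the synchronization pattern, not the real scheduling loop):

```java
// Hypothetical sketch: scheduling threads wait on workerStateLock until a worker state
// change (task completion, worker joining, etc.) frees up capacity and notifies them.
class WorkerSlotSignalling
{
  private final Object workerStateLock = new Object();
  private int freeSlots = 0;

  // Called from worker callbacks, e.g. when a completed task frees a slot.
  void onSlotFreed()
  {
    synchronized (workerStateLock) {
      freeSlots++;
      workerStateLock.notifyAll(); // wake up scheduling threads waiting for capacity
    }
  }

  // Called by the scheduling loop before assigning a pending task.
  void awaitFreeSlot() throws InterruptedException
  {
    synchronized (workerStateLock) {
      while (freeSlots == 0) {
        workerStateLock.wait();
      }
      freeSlots--;
    }
  }
}
```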

Comment on lines +505 to +513
workers.forEach((workerHost, workerEntry) -> {
try {
workerEntry.waitForInitialization();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
log.info("Workers have synced state successfully.");
Contributor

I don't think changing from the regular for loop to a forEach makes much of a difference. It is still possible for new items to get added to the workers map after we started the forEach iteration.

Contributor Author

@jtuglu1 jtuglu1 Dec 22, 2025

Actually, with this one, by acquiring a key reference while iterating (instead of going through keySet), we also get a synchronized scope on the value to do what we want. I found this to be a better model than looping through a keySet and then indexing with those (potentially stale) keys.

Contributor

Oh, I see.

Can that ever cause a deadlock?

Say a worker is yet to be initialized and a thread is waiting on waitForInitialization() while holding the lock for the corresponding key in the map. Will any other thread be able to mark this worker as "initialized" without acquiring that lock?

I found this to be a better model than looping through a keySet and then indexing with those (potentially stale) keys.

Hmm, I was hoping we would use .keySet() followed by .compute() and move the .compute() part into a separate method (similar to how TaskQueue does it). We could use that method wherever we add/update the entry for a worker, so that we always perform actions using a compute(), ensuring thread safety.

What do you think?
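
A small sketch of the keySet()-plus-compute() shape being suggested (map, class, and method names are placeholders, not the PR's code): all mutations of a worker entry funnel through one compute()-based method, while waiting happens over a key snapshot without holding any per-key lock.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

// Hypothetical sketch, loosely modeled on how TaskQueue encapsulates its map updates.
class WorkerRegistrySketch
{
  static class WorkerHolderStub
  {
    volatile boolean initialized;
  }

  private final ConcurrentHashMap<String, WorkerHolderStub> workers = new ConcurrentHashMap<>();

  /** Single entry point for adding/updating a worker entry; always atomic per key. */
  private WorkerHolderStub addOrUpdateWorker(String host, UnaryOperator<WorkerHolderStub> updater)
  {
    return workers.compute(host, (key, existing) ->
        updater.apply(existing == null ? new WorkerHolderStub() : existing));
  }

  void markInitialized(String host)
  {
    // Updates go through the same compute()-based method...
    addOrUpdateWorker(host, holder -> {
      holder.initialized = true;
      return holder;
    });
  }

  void waitUntilAllInitialized() throws InterruptedException
  {
    // ...while waiting iterates over a snapshot of keys and holds no per-key lock,
    // so markInitialized() can still make progress and no deadlock is possible.
    for (String host : workers.keySet()) {
      WorkerHolderStub holder = workers.get(host);
      while (holder != null && !holder.initialized) {
        Thread.sleep(100);
      }
    }
  }
}
```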

Contributor

Also, .forEach() doesn't seem to be mutually exclusive with .compute() on the same key.
Could you please double check this?

Contributor

@kfaraz kfaraz left a comment

Finished going through the bulk of the changes.

On the whole, the patch looks good. I have these major suggestions:

  • For the time being, it would be cleaner to use workerStateLock consistently whenever accessing the workers map. We can try to improve this later.
  • Avoid use of .forEach() and use .compute() instead, preferably encasing it in an addOrUpdate method similar to TaskQueue.
  • Do not perform any heavy operation like metadata store access, metric emission, listener notification, etc. inside the .compute() lambda.
  • Avoid throwing exceptions inside the lambda, if they are just to be caught back in the same method/loop. Instead, log an error and continue with the loop.
  • Remove the priority scheduling changes for now.
  • Reduce debug logging.

cancelWorkerCleanup(worker.getHost());

// There cannot be any new tasks assigned to this worker as the entry has not been published yet.
// That being said, there can be callbacks in taskAddedOrUpdated() where some task suddenly begins running
Contributor

Suggested change
// That being said, there can be callbacks in taskAddedOrUpdated() where some task suddenly begins running
// However, there can be callbacks in taskAddedOrUpdated() where some task suddenly begins running

log.info("Adding worker[%s]", worker.getHost());
synchronized (workerStateLock) {
workers.compute(
worker.getHost(), (key, workerEntry) -> {
Contributor

Nit:

Suggested change
worker.getHost(), (key, workerEntry) -> {
worker.getHost(),
(key, workerEntry) -> {

// It might be a worker that existed before, temporarily went away and came back. We might have a set of
// tasks that we think are running on this worker. Provide that information to WorkerHolder that
// manages the task syncing with that worker.
tasks.forEach((taskId, taskEntry) -> {
Contributor

Iterating through all tasks here seems problematic. The original code was doing this too, but now with the introduction of two separate locks and two separate concurrent hash maps, we need to take extra care that we don't end up in a deadlock.

Maybe build the expected announcements before acquiring the workerStateLock, since the announcements deal with only task state and not worker state.
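
Roughly what that reordering could look like (a self-contained sketch with placeholder maps and types, not the PR's actual signatures):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: snapshot the expected task announcements from the tasks map first,
// then acquire workerStateLock only for the (cheap) worker-map update.
class AddWorkerOrderingSketch
{
  private final Object workerStateLock = new Object();
  private final ConcurrentHashMap<String, String> tasks = new ConcurrentHashMap<>();         // taskId -> workerHost
  private final ConcurrentHashMap<String, List<String>> workers = new ConcurrentHashMap<>(); // host -> known taskIds

  void addWorker(String workerHost)
  {
    // Built from task state only, without holding workerStateLock.
    final List<String> knownTasks = new ArrayList<>();
    tasks.forEach((taskId, host) -> {
      if (workerHost.equals(host)) {
        knownTasks.add(taskId);
      }
    });

    // The lock is held only for the worker-map mutation itself.
    synchronized (workerStateLock) {
      workers.compute(workerHost, (key, existing) -> knownTasks);
    }
  }
}
```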


// All discovered workers, "host:port" -> WorkerHolder
private final ConcurrentMap<String, WorkerHolder> workers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, WorkerHolder> workers = new ConcurrentHashMap<>();
Contributor

This should probably be @GuardedBy("workerStateLock") since it is always meant to be accessed within that lock.
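
For reference, a minimal sketch of the suggested annotation (it documents the locking contract and, with Error Prone checks enabled, can be verified at compile time; the exact annotation package should match whatever the codebase already uses):

```java
import com.google.errorprone.annotations.concurrent.GuardedBy;

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the field is documented as only being accessed under workerStateLock.
class GuardedWorkersSketch
{
  private final Object workerStateLock = new Object();

  @GuardedBy("workerStateLock")
  private final Map<String, Object> workers = new HashMap<>();

  void addWorker(String host)
  {
    synchronized (workerStateLock) {
      workers.put(host, new Object());
    }
  }
}
```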

tasksToFail.add(e.getValue());
}
final Set<HttpRemoteTaskRunnerWorkItem> tasksToFail = new HashSet<>();
tasks.forEach((taskId, taskEntry) -> {
Contributor

Note that .forEach() is not mutually exclusive with .compute() and essentially just provides iteration over a snapshot. But I suppose it is okay here?

setStateUnconditionally(state);
}

public void revertStateFromPendingWorkerAssignToPending()
Contributor

I suppose we don't need this anymore since we simply add the task item back to the queue, right?

}
if (!runTaskOnWorker(taskId, workerHost)) {
log.warn("Failed to assign task[%s] to worker[%s]. Sending to back of queue", taskId, workerHost);
pendingTasks.put(taskItem.withFreshSequenceNumber());
Contributor

The task entry in the tasks map would now have state PENDING_WORKER_ASSIGN. Do we need to revert it back to PENDING (same as old code)?

tasks.compute(
taskId,
(key, taskEntry) -> {
if (taskEntry == null) {
Contributor

Can we move this entire lambda into a new method?

(key, taskEntry) -> {
if (taskEntry == null) {
// Try to find information about it in the TaskStorage
Optional<TaskStatus> knownStatusInStorage = taskStorage.getStatus(taskId);
Contributor

Maybe perform this before the .compute()?


final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, taskEntry.getTask());
emitter.emit(metricBuilder.setMetric(
Contributor

Let's do metric emission, listener notification, etc. outside the .compute() block.
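
A compact illustration of that separation (placeholder names; the real code would use the actual emitter and listener APIs): record what happened inside compute(), then emit metrics and notify listeners after the lambda has returned and the lock is released.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: only cheap bookkeeping inside compute(); I/O afterwards.
class EmitOutsideComputeSketch
{
  enum State { EXECUTING, COMPLETE }

  private final ConcurrentHashMap<String, State> tasks = new ConcurrentHashMap<>();

  void markComplete(String taskId)
  {
    final AtomicReference<State> previousState = new AtomicReference<>();
    tasks.compute(taskId, (id, state) -> {
      previousState.set(state);   // remember the prior state while holding the bin lock
      return State.COMPLETE;
    });

    // Metric emission and listener notification can block (I/O, user callbacks),
    // so they run only after compute() has released the lock.
    if (previousState.get() == State.EXECUTING) {
      emitCompletionMetric(taskId);
      notifyListeners(taskId);
    }
  }

  private void emitCompletionMetric(String taskId) { /* stand-in for emitter.emit(...) */ }

  private void notifyListeners(String taskId) { /* stand-in for TaskRunnerListener notification */ }
}
```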
