Skip to content

Commit f5ec8be

Browse files
authored
perf: Submit tasks to TaskRunner order by task priority (#19203)
Ensure active tasks are (re)-submitted to TaskRunner in order of priority. With the way things are currently structured, TaskQueue and TaskRunner both introduce their own delays to the scheduling of a task as they are both queues in their own ways. This patch attempts to minimize the HOL-blocking delay introduced by the TaskQueue and #18851 will help reduce the slowness on the TaskRunner. This helps enormously for large task volume cases (O(5k) tasks+) where lots of low-priority tasks are hitting the queue (think large compaction or batch volume) while realtime indexing tasks are being submitted. This allows the higher-priority realtime tasks to "jump" the line in submission to the runner (which is still ultimately FIFO relatively speaking), and not pay the O(XX) seconds of time wasted waiting for other lower-priority tasks to be submitted to the queue. Notably, this does do a sort every time startPendingTasksOnRunner is called, however, given the activeTasks should be on the order of O(10k) tasks and the comparator is comparing integers, this should be relatively cheap (empirically, this has never shown up in flamegraph as TaskQueue is bound by other things). When #18851 is merged (and priority based running is introduced) this may have less of an effect. Storing activeTasks in an ordered container might be nice, but given this is a single use-case which requires a sorted order (and the maximum container size is tolerable) I opted to go for this approach.
1 parent b2eed67 commit f5ec8be

2 files changed

Lines changed: 77 additions & 2 deletions

File tree

indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.joda.time.Duration;
7575

7676
import java.util.Collection;
77+
import java.util.Comparator;
7778
import java.util.HashMap;
7879
import java.util.HashSet;
7980
import java.util.List;
@@ -417,8 +418,14 @@ private void startPendingTasksOnRunner()
417418
log.info("Notified task runner to clean up [%,d] tasks with IDs[%s].", unknownTaskIds.size(), unknownTaskIds);
418419

419420
// Attain futures for all active tasks (assuming they are ready to run).
420-
// Copy tasks list, as notifyStatus may modify it.
421-
for (final String queuedTaskId : List.copyOf(activeTasks.keySet())) {
421+
// Sort by priority (highest first) so that higher-priority tasks are submitted
422+
// to the runner before lower-priority ones.
423+
final List<String> queuedTaskIds = activeTasks.values()
424+
.stream()
425+
.sorted(Comparator.comparingInt((TaskEntry entry) -> entry.getTask().getPriority()).reversed())
426+
.map(entry -> entry.getTask().getId())
427+
.toList();
428+
for (final String queuedTaskId : queuedTaskIds) {
422429
updateTaskEntry(
423430
queuedTaskId,
424431
entry -> startPendingTaskOnRunner(entry, runnerTaskFutures.get(queuedTaskId))

indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.druid.indexing.common.config.TaskStorageConfig;
5353
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
5454
import org.apache.druid.indexing.common.task.IngestionTestBase;
55+
import org.apache.druid.indexing.common.task.NoopTask;
5556
import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
5657
import org.apache.druid.indexing.common.task.Task;
5758
import org.apache.druid.indexing.common.task.Tasks;
@@ -98,6 +99,7 @@
9899
import javax.annotation.Nullable;
99100
import java.io.IOException;
100101
import java.net.URI;
102+
import java.util.ArrayList;
101103
import java.util.Arrays;
102104
import java.util.Collections;
103105
import java.util.HashMap;
@@ -751,6 +753,47 @@ public void testTaskWaitingTimeMetricEmittedForMultipleTasks() throws Exception
751753
serviceEmitter.verifyEmitted("task/run/time", 3);
752754
}
753755

756+
@Test
757+
public void testTaskSubmissionToTaskRunnerBasedOnPriority() throws Exception
758+
{
759+
final RecordingTaskRunner recordingRunner = new RecordingTaskRunner(serviceEmitter);
760+
final TaskQueue priorityQueue = new TaskQueue(
761+
new TaskLockConfig(),
762+
new TaskQueueConfig(10, null, null, null, null, null),
763+
new DefaultTaskConfig(),
764+
getTaskStorage(),
765+
recordingRunner,
766+
actionClientFactory,
767+
getLockbox(),
768+
serviceEmitter,
769+
getObjectMapper(),
770+
new NoopTaskContextEnricher()
771+
);
772+
priorityQueue.setActive(true);
773+
774+
final NoopTask lowPriority1 = NoopTask.ofPriority(1);
775+
final NoopTask lowPriority2 = NoopTask.ofPriority(10);
776+
final NoopTask medPriority = NoopTask.ofPriority(50);
777+
final NoopTask highPriority1 = NoopTask.ofPriority(90);
778+
final NoopTask highPriority2 = NoopTask.ofPriority(100);
779+
780+
priorityQueue.add(lowPriority1);
781+
priorityQueue.add(medPriority);
782+
priorityQueue.add(lowPriority2);
783+
priorityQueue.add(highPriority2);
784+
priorityQueue.add(highPriority1);
785+
786+
priorityQueue.manageQueuedTasks();
787+
788+
final List<String> submitted = recordingRunner.getSubmittedTaskIds();
789+
Assert.assertEquals(5, submitted.size());
790+
Assert.assertEquals(highPriority2.getId(), submitted.get(0));
791+
Assert.assertEquals(highPriority1.getId(), submitted.get(1));
792+
Assert.assertEquals(medPriority.getId(), submitted.get(2));
793+
Assert.assertEquals(lowPriority2.getId(), submitted.get(3));
794+
Assert.assertEquals(lowPriority1.getId(), submitted.get(4));
795+
}
796+
754797
private HttpRemoteTaskRunner createHttpRemoteTaskRunner()
755798
{
756799
final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider
@@ -904,4 +947,29 @@ public ListenableFuture<TaskStatus> run(Task task)
904947
}
905948
}
906949
}
950+
951+
/**
952+
* A task runner that records the order in which tasks are submitted via {@link #run(Task)}.
953+
*/
954+
static class RecordingTaskRunner extends SimpleTaskRunner
955+
{
956+
private final List<String> submittedTaskIds = new ArrayList<>();
957+
958+
RecordingTaskRunner(ServiceEmitter emitter)
959+
{
960+
super(emitter);
961+
}
962+
963+
@Override
964+
public ListenableFuture<TaskStatus> run(Task task)
965+
{
966+
submittedTaskIds.add(task.getId());
967+
return Futures.immediateFuture(TaskStatus.success(task.getId()));
968+
}
969+
970+
List<String> getSubmittedTaskIds()
971+
{
972+
return submittedTaskIds;
973+
}
974+
}
907975
}

0 commit comments

Comments
 (0)