Skip to content

Commit 514ff19

Browse files
authored
Reduce default pool size of worker threads in SeekableStreamSupervisor (#18163)
The core pool size of worker threads is calculated as follows: - If `tuningConfig.workerThreads` is set, use that. - If auto scaler is enabled, use `Math.max(2, taskCountMax/4)` - Otherwise use `Math.max(2, taskCount/2)`
1 parent 2b1f7df commit 514ff19

3 files changed

Lines changed: 170 additions & 20 deletions

File tree

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@
126126
import java.util.concurrent.ExecutionException;
127127
import java.util.concurrent.ExecutorService;
128128
import java.util.concurrent.ScheduledExecutorService;
129+
import java.util.concurrent.ScheduledThreadPoolExecutor;
129130
import java.util.concurrent.TimeUnit;
130131
import java.util.concurrent.atomic.AtomicInteger;
131132
import java.util.concurrent.locks.ReentrantLock;
@@ -162,6 +163,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
162163

163164
private static final long MAX_RUN_FREQUENCY_MILLIS = 1000;
164165
private static final int MAX_INITIALIZATION_RETRIES = 20;
166+
private static final int MIN_WORKER_CORE_THREADS = 2;
167+
private static final int DEFAULT_TASKS_PER_WORKER_THREAD = 4;
168+
private static final int WORKER_THREAD_KEEPALIVE_TIME_MILLIS = 2000;
165169

166170
private static final EmittingLogger log = new EmittingLogger(SeekableStreamSupervisor.class);
167171

@@ -940,17 +944,8 @@ public SeekableStreamSupervisor(
940944
spec.isSuspended()
941945
);
942946

943-
int workerThreads;
944947
if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler()) {
945948
log.info("Running Task autoscaler for supervisor[%s] for datasource[%s]", supervisorId, dataSource);
946-
947-
workerThreads = (this.tuningConfig.getWorkerThreads() != null
948-
? this.tuningConfig.getWorkerThreads()
949-
: Math.min(10, autoScalerConfig.getTaskCountMax()));
950-
} else {
951-
workerThreads = (this.tuningConfig.getWorkerThreads() != null
952-
? this.tuningConfig.getWorkerThreads()
953-
: Math.min(10, this.ioConfig.getTaskCount()));
954949
}
955950

956951
IdleConfig specIdleConfig = spec.getIoConfig().getIdleConfig();
@@ -970,13 +965,18 @@ public SeekableStreamSupervisor(
970965
);
971966
}
972967

973-
this.workerExec = MoreExecutors.listeningDecorator(
974-
ScheduledExecutors.fixed(
975-
workerThreads,
976-
StringUtils.encodeForFormat(supervisorTag) + "-Worker-%d"
977-
)
968+
final int workerThreads = calculateWorkerThreads(tuningConfig, ioConfig);
969+
ScheduledThreadPoolExecutor executor = ScheduledExecutors.fixedWithKeepAliveTime(
970+
workerThreads,
971+
StringUtils.encodeForFormat(supervisorTag) + "-Worker-%d",
972+
WORKER_THREAD_KEEPALIVE_TIME_MILLIS
973+
);
974+
975+
this.workerExec = MoreExecutors.listeningDecorator(executor);
976+
log.info(
977+
"Created worker pool with [%d] threads for supervisor[%s], dataSource[%s]",
978+
workerThreads, this.supervisorId, this.dataSource
978979
);
979-
log.info("Created worker pool with [%d] threads for supervisor[%s] for dataSource[%s]", workerThreads, this.supervisorId, this.dataSource);
980980

981981
this.taskInfoProvider = new TaskInfoProvider()
982982
{
@@ -1009,6 +1009,31 @@ public Optional<TaskStatus> getTaskStatus(String id)
10091009
this.taskClient = taskClientFactory.build(dataSource, taskInfoProvider, this.tuningConfig, workerExec);
10101010
}
10111011

1012+
/**
1013+
* Calculates the number of worker threads to use in a {@link SeekableStreamSupervisor}.
1014+
* These threads are used to interact with tasks in {@link SeekableStreamIndexTaskClient}
1015+
* and handle task interactions (discovery, updates etc.) in {@link SeekableStreamSupervisor}
1016+
* <p>
1017+
* If the tuning config explicitly specifies the field {@code workerThreads}, that value is used.
1018+
* Otherwise, the value is derived from either the auto-scaler config (if enabled) or the ioConfig task count,
1019+
* divided by the {@link DEFAULT_TASKS_PER_WORKER_THREAD}, with a minimum of {@link MIN_WORKER_CORE_THREADS}.
1020+
*/
1021+
public static int calculateWorkerThreads(
1022+
SeekableStreamSupervisorTuningConfig tuningConfig,
1023+
SeekableStreamSupervisorIOConfig ioConfig
1024+
)
1025+
{
1026+
if (tuningConfig.getWorkerThreads() != null) {
1027+
return tuningConfig.getWorkerThreads();
1028+
}
1029+
final AutoScalerConfig autoScalerConfig = ioConfig.getAutoScalerConfig();
1030+
if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler()) {
1031+
return Math.max(MIN_WORKER_CORE_THREADS, autoScalerConfig.getTaskCountMax() / DEFAULT_TASKS_PER_WORKER_THREAD);
1032+
} else {
1033+
return Math.max(MIN_WORKER_CORE_THREADS, ioConfig.getTaskCount() / DEFAULT_TASKS_PER_WORKER_THREAD);
1034+
}
1035+
}
1036+
10121037
@Override
10131038
public int getActiveTaskGroupsCount()
10141039
{

indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java

Lines changed: 103 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent;
7373
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamState;
7474
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
75+
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
7576
import org.apache.druid.java.util.common.DateTimes;
7677
import org.apache.druid.java.util.common.ISE;
7778
import org.apache.druid.java.util.common.Intervals;
@@ -135,6 +136,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
135136
private static final StreamPartition<String> SHARD0_PARTITION = StreamPartition.of(STREAM, SHARD_ID);
136137
private static final String EXCEPTION_MSG = "I had an exception";
137138
private static final Map<String, Object> METRIC_TAGS = ImmutableMap.of("k1", "v1", "k2", 20);
139+
private static final int DEFAULT_WORKER_THREADS = 2;
140+
private static final int DEFAULT_TASKS_PER_WORKER_THREAD = 4;
138141

139142
private TaskStorage taskStorage;
140143
private TaskMaster taskMaster;
@@ -177,7 +180,7 @@ public void setupTest()
177180
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
178181

179182
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
180-
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig()).anyTimes();
183+
EasyMock.expect(spec.getIoConfig()).andReturn(createSupervisorIOConfig()).anyTimes();
181184
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
182185
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
183186
EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();
@@ -2563,6 +2566,83 @@ public LagStats computeLagStats()
25632566
EasyMock.verify(executorService, spec);
25642567
}
25652568

2569+
@Test
2570+
public void test_calculateWorkerThreads_shouldHonourWorkerConfig()
2571+
{
2572+
final int numWorkerThreads = 5;
2573+
final int taskCount = 1;
2574+
SeekableStreamSupervisorTuningConfig tuningConfig = createSupervisorTuningConfigWithWorkerThreads(numWorkerThreads);
2575+
SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(taskCount, null);
2576+
Assert.assertEquals(numWorkerThreads, SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig));
2577+
}
2578+
2579+
@Test
2580+
public void test_calculateWorkerThreads_shouldUseDefaultWorkerThreads()
2581+
{
2582+
final int taskCount = 1;
2583+
SeekableStreamSupervisorTuningConfig tuningConfig = createSupervisorTuningConfig();
2584+
SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(taskCount, null);
2585+
Assert.assertEquals(
2586+
DEFAULT_WORKER_THREADS,
2587+
SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
2588+
);
2589+
}
2590+
2591+
@Test
2592+
public void test_calculateWorkerThreads_shouldUseMinimumWorkerThreadstWithTasks()
2593+
{
2594+
final int taskCount = 7;
2595+
SeekableStreamSupervisorTuningConfig tuningConfig = createSupervisorTuningConfig();
2596+
SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(taskCount, null);
2597+
Assert.assertEquals(
2598+
DEFAULT_WORKER_THREADS,
2599+
SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
2600+
);
2601+
}
2602+
2603+
@Test
2604+
public void test_calculateWorkerThreads_shouldUseFactorOfTaskCount()
2605+
{
2606+
final int taskCount = 18;
2607+
SeekableStreamSupervisorTuningConfig tuningConfig = createSupervisorTuningConfig();
2608+
SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(taskCount, null);
2609+
Assert.assertEquals(
2610+
taskCount / DEFAULT_TASKS_PER_WORKER_THREAD,
2611+
SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
2612+
);
2613+
}
2614+
2615+
@Test
2616+
public void test_calculateWorkerThreads_shouldUseAutoScalerConfig()
2617+
{
2618+
final int taskCountMax = 21;
2619+
final int taskCountMin = 5;
2620+
SeekableStreamSupervisorTuningConfig tuningConfig = createSupervisorTuningConfig();
2621+
AutoScalerConfig autoScalerConfig = new LagBasedAutoScalerConfig(
2622+
null,
2623+
null,
2624+
null,
2625+
null,
2626+
null,
2627+
null,
2628+
null,
2629+
null,
2630+
taskCountMax,
2631+
null,
2632+
taskCountMin,
2633+
null,
2634+
null,
2635+
true,
2636+
null,
2637+
null
2638+
);
2639+
SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(1, autoScalerConfig);
2640+
Assert.assertEquals(
2641+
taskCountMax / DEFAULT_TASKS_PER_WORKER_THREAD,
2642+
SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
2643+
);
2644+
}
2645+
25662646
private void expectEmitterSupervisor(boolean suspended)
25672647
{
25682648
spec = createMock(SeekableStreamSupervisorSpec.class);
@@ -2630,21 +2710,29 @@ private static DataSchema getDataSchema()
26302710
.build();
26312711
}
26322712

2633-
private static SeekableStreamSupervisorIOConfig getIOConfig()
2713+
private static SeekableStreamSupervisorIOConfig createSupervisorIOConfig()
2714+
{
2715+
return createSupervisorIOConfig(1, OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class));
2716+
}
2717+
2718+
private static SeekableStreamSupervisorIOConfig createSupervisorIOConfig(
2719+
int taskCount,
2720+
@Nullable AutoScalerConfig autoScalerConfig
2721+
)
26342722
{
26352723
return new SeekableStreamSupervisorIOConfig(
26362724
"stream",
26372725
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
26382726
1,
2639-
1,
2727+
taskCount,
26402728
new Period("PT1H"),
26412729
new Period("P1D"),
26422730
new Period("PT30S"),
26432731
false,
26442732
new Period("PT30M"),
26452733
null,
26462734
null,
2647-
OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class),
2735+
autoScalerConfig,
26482736
LagAggregator.DEFAULT,
26492737
null,
26502738
null,
@@ -2675,13 +2763,23 @@ private static Map<String, Object> getProperties()
26752763
}
26762764

26772765
private static SeekableStreamSupervisorTuningConfig getTuningConfig()
2766+
{
2767+
return createSupervisorTuningConfigWithWorkerThreads(1);
2768+
}
2769+
2770+
private static SeekableStreamSupervisorTuningConfig createSupervisorTuningConfig()
2771+
{
2772+
return createSupervisorTuningConfigWithWorkerThreads(null);
2773+
}
2774+
2775+
private static SeekableStreamSupervisorTuningConfig createSupervisorTuningConfigWithWorkerThreads(@Nullable Integer workerThreads)
26782776
{
26792777
return new SeekableStreamSupervisorTuningConfig()
26802778
{
26812779
@Override
26822780
public Integer getWorkerThreads()
26832781
{
2684-
return 1;
2782+
return workerThreads;
26852783
}
26862784

26872785
@Override

processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.Callable;
2727
import java.util.concurrent.Executors;
2828
import java.util.concurrent.ScheduledExecutorService;
29+
import java.util.concurrent.ScheduledThreadPoolExecutor;
2930
import java.util.concurrent.TimeUnit;
3031

3132
public class ScheduledExecutors
@@ -172,8 +173,34 @@ public static ScheduledExecutorFactory createFactory(final Lifecycle lifecycle)
172173
return (corePoolSize, nameFormat) -> ExecutorServices.manageLifecycle(lifecycle, fixed(corePoolSize, nameFormat));
173174
}
174175

176+
/**
177+
* Creates a new {@link ScheduledExecutorService} with a minimum number of threads.
178+
*
179+
* @param corePoolSize the minimum number of threads in the pool
180+
* @param nameFormat the naming format for threads created by the pool
181+
* @return a new {@link ScheduledExecutorService} with the specified configuration
182+
*/
175183
public static ScheduledExecutorService fixed(int corePoolSize, String nameFormat)
176184
{
177185
return Executors.newScheduledThreadPool(corePoolSize, Execs.makeThreadFactory(nameFormat));
178186
}
187+
188+
/**
189+
* Creates a new {@link ScheduledExecutorService} with a minimum number of threads along with a
190+
* keep-alive time for idle non-core threads.
191+
* <p>
192+
*/
193+
public static ScheduledThreadPoolExecutor fixedWithKeepAliveTime(
194+
int corePoolSize,
195+
String nameFormat,
196+
long keepAliveTimeInMillis
197+
)
198+
{
199+
ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(
200+
corePoolSize,
201+
Execs.makeThreadFactory(nameFormat)
202+
);
203+
scheduledExecutor.setKeepAliveTime(keepAliveTimeInMillis, TimeUnit.MILLISECONDS);
204+
return scheduledExecutor;
205+
}
179206
}

0 commit comments

Comments
 (0)