Skip to content

Commit 921e0d5

Browse files
kamalcphtedyu
authored andcommitted
KAFKA-17928: Make remote log manager thread-pool configs dynamic (apache#17859)
- Disallow configuring -1 for copier and expiration thread pools dynamically Co-authored-by: Peter Lee <peterxcli@gmail.com> Reviewers: Peter Lee <peterxcli@gmail.com>, Satish Duggana <satishd@apache.org>
1 parent 89cd553 commit 921e0d5

7 files changed

Lines changed: 199 additions & 82 deletions

File tree

clients/src/main/java/org/apache/kafka/common/utils/ThreadUtils.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,15 @@
2525
import java.util.concurrent.TimeUnit;
2626
import java.util.concurrent.atomic.AtomicLong;
2727

28+
import static java.lang.Thread.UncaughtExceptionHandler;
29+
2830
/**
2931
* Utilities for working with threads.
3032
*/
3133
public class ThreadUtils {
3234

3335
private static final Logger log = LoggerFactory.getLogger(ThreadUtils.class);
36+
3437
/**
3538
* Create a new ThreadFactory.
3639
*
@@ -42,6 +45,22 @@ public class ThreadUtils {
4245
*/
4346
public static ThreadFactory createThreadFactory(final String pattern,
4447
final boolean daemon) {
48+
return createThreadFactory(pattern, daemon, null);
49+
}
50+
51+
/**
52+
* Create a new ThreadFactory.
53+
*
54+
* @param pattern The pattern to use. If this contains %d, it will be
55+
* replaced with a thread number. It should not contain more
56+
* than one %d.
57+
* @param daemon True if we want daemon threads.
58+
* @param ueh thread's uncaught exception handler.
59+
* @return The new ThreadFactory.
60+
*/
61+
public static ThreadFactory createThreadFactory(final String pattern,
62+
final boolean daemon,
63+
final UncaughtExceptionHandler ueh) {
4564
return new ThreadFactory() {
4665
private final AtomicLong threadEpoch = new AtomicLong(0);
4766

@@ -55,6 +74,9 @@ public Thread newThread(Runnable r) {
5574
}
5675
Thread thread = new Thread(r, threadName);
5776
thread.setDaemon(daemon);
77+
if (ueh != null) {
78+
thread.setUncaughtExceptionHandler(ueh);
79+
}
5880
return thread;
5981
}
6082
};
@@ -64,12 +86,15 @@ public Thread newThread(Runnable r) {
6486
* Shuts down an executor service in two phases, first by calling shutdown to reject incoming tasks,
6587
* and then calling shutdownNow, if necessary, to cancel any lingering tasks.
6688
* After the timeout/on interrupt, the service is forcefully closed.
89+
* This pattern of shutting down thread pool is adopted from here:
90+
* <a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ExecutorService.html">ExecutorService</a>
6791
* @param executorService The service to shut down.
68-
* @param timeout The timeout of the shutdown.
69-
* @param timeUnit The time unit of the shutdown timeout.
92+
* @param timeout The timeout of the shutdown.
93+
* @param timeUnit The time unit of the shutdown timeout.
7094
*/
7195
public static void shutdownExecutorServiceQuietly(ExecutorService executorService,
72-
long timeout, TimeUnit timeUnit) {
96+
long timeout,
97+
TimeUnit timeUnit) {
7398
if (executorService == null) {
7499
return;
75100
}

core/src/main/java/kafka/log/remote/RemoteLogManager.java

Lines changed: 44 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@
4141
import org.apache.kafka.common.utils.BufferSupplier;
4242
import org.apache.kafka.common.utils.ChildFirstClassLoader;
4343
import org.apache.kafka.common.utils.CloseableIterator;
44-
import org.apache.kafka.common.utils.KafkaThread;
4544
import org.apache.kafka.common.utils.LogContext;
45+
import org.apache.kafka.common.utils.ThreadUtils;
4646
import org.apache.kafka.common.utils.Time;
4747
import org.apache.kafka.common.utils.Utils;
4848
import org.apache.kafka.server.common.CheckpointFile;
@@ -130,7 +130,6 @@
130130
import java.util.concurrent.ScheduledThreadPoolExecutor;
131131
import java.util.concurrent.ThreadFactory;
132132
import java.util.concurrent.TimeUnit;
133-
import java.util.concurrent.atomic.AtomicInteger;
134133
import java.util.concurrent.locks.Condition;
135134
import java.util.concurrent.locks.ReentrantLock;
136135
import java.util.function.BiConsumer;
@@ -162,7 +161,7 @@
162161
public class RemoteLogManager implements Closeable {
163162

164163
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
165-
private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader";
164+
private static final String REMOTE_LOG_READER_THREAD_NAME_PATTERN = "remote-log-reader-%d";
166165
private final RemoteLogManagerConfig rlmConfig;
167166
private final int brokerId;
168167
private final String logDir;
@@ -255,18 +254,18 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
255254
indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
256255
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
257256
rlmCopyThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerCopierThreadPoolSize(),
258-
"RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-");
257+
"RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-%d");
259258
rlmExpirationThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerExpirationThreadPoolSize(),
260-
"RLMExpirationThreadPool", "kafka-rlm-expiration-thread-pool-");
259+
"RLMExpirationThreadPool", "kafka-rlm-expiration-thread-pool-%d");
261260
followerThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
262-
"RLMFollowerScheduledThreadPool", "kafka-rlm-follower-thread-pool-");
261+
"RLMFollowerScheduledThreadPool", "kafka-rlm-follower-thread-pool-%d");
263262

264263
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC, rlmCopyThreadPool::getIdlePercent);
265264
remoteReadTimer = metricsGroup.newTimer(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC,
266265
TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
267266

268267
remoteStorageReaderThreadPool = new RemoteStorageThreadPool(
269-
REMOTE_LOG_READER_THREAD_NAME_PREFIX,
268+
REMOTE_LOG_READER_THREAD_NAME_PATTERN,
270269
rlmConfig.remoteLogReaderThreads(),
271270
rlmConfig.remoteLogReaderMaxPendingTasks()
272271
);
@@ -290,6 +289,24 @@ public void updateFetchQuota(long quota) {
290289
rlmFetchQuotaManager.updateQuota(new Quota(quota, true));
291290
}
292291

292+
public void resizeCopierThreadPool(int newSize) {
293+
int currentSize = rlmCopyThreadPool.getCorePoolSize();
294+
LOGGER.info("Updating remote copy thread pool size from {} to {}", currentSize, newSize);
295+
rlmCopyThreadPool.setCorePoolSize(newSize);
296+
}
297+
298+
public void resizeExpirationThreadPool(int newSize) {
299+
int currentSize = rlmExpirationThreadPool.getCorePoolSize();
300+
LOGGER.info("Updating remote expiration thread pool size from {} to {}", currentSize, newSize);
301+
rlmExpirationThreadPool.setCorePoolSize(newSize);
302+
}
303+
304+
public void resizeReaderThreadPool(int newSize) {
305+
int currentSize = remoteStorageReaderThreadPool.getCorePoolSize();
306+
LOGGER.info("Updating remote reader thread pool size from {} to {}", currentSize, newSize);
307+
remoteStorageReaderThreadPool.setCorePoolSize(newSize);
308+
}
309+
293310
private void removeMetrics() {
294311
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
295312
metricsGroup.removeMetric(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC);
@@ -2077,28 +2094,10 @@ public void close() {
20772094
}
20782095
}
20792096

2080-
private static void shutdownAndAwaitTermination(ExecutorService pool, String poolName, long timeout, TimeUnit timeUnit) {
2081-
// This pattern of shutting down thread pool is adopted from here: https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ExecutorService.html
2082-
LOGGER.info("Shutting down of thread pool {} is started", poolName);
2083-
pool.shutdown(); // Disable new tasks from being submitted
2084-
try {
2085-
// Wait a while for existing tasks to terminate
2086-
if (!pool.awaitTermination(timeout, timeUnit)) {
2087-
LOGGER.info("Shutting down of thread pool {} could not be completed. It will retry cancelling the tasks using shutdownNow.", poolName);
2088-
pool.shutdownNow(); // Cancel currently executing tasks
2089-
// Wait a while for tasks to respond to being cancelled
2090-
if (!pool.awaitTermination(timeout, timeUnit))
2091-
LOGGER.warn("Shutting down of thread pool {} could not be completed even after retrying cancellation of the tasks using shutdownNow.", poolName);
2092-
}
2093-
} catch (InterruptedException ex) {
2094-
// (Re-)Cancel if current thread also interrupted
2095-
LOGGER.warn("Encountered InterruptedException while shutting down thread pool {}. It will retry cancelling the tasks using shutdownNow.", poolName);
2096-
pool.shutdownNow();
2097-
// Preserve interrupt status
2098-
Thread.currentThread().interrupt();
2099-
}
2100-
2101-
LOGGER.info("Shutting down of thread pool {} is completed", poolName);
2097+
private static void shutdownAndAwaitTermination(ExecutorService executor, String poolName, long timeout, TimeUnit timeUnit) {
2098+
LOGGER.info("Shutting down {} executor", poolName);
2099+
ThreadUtils.shutdownExecutorServiceQuietly(executor, timeout, timeUnit);
2100+
LOGGER.info("{} executor shutdown completed", poolName);
21022101
}
21032102

21042103
//Visible for testing
@@ -2152,31 +2151,32 @@ RLMTaskWithFuture followerTask(TopicIdPartition partition) {
21522151
static class RLMScheduledThreadPool {
21532152

21542153
private static final Logger LOGGER = LoggerFactory.getLogger(RLMScheduledThreadPool.class);
2155-
private final int poolSize;
21562154
private final String threadPoolName;
2157-
private final String threadNamePrefix;
2155+
private final String threadNamePattern;
21582156
private final ScheduledThreadPoolExecutor scheduledThreadPool;
21592157

2160-
public RLMScheduledThreadPool(int poolSize, String threadPoolName, String threadNamePrefix) {
2161-
this.poolSize = poolSize;
2158+
public RLMScheduledThreadPool(int poolSize, String threadPoolName, String threadNamePattern) {
21622159
this.threadPoolName = threadPoolName;
2163-
this.threadNamePrefix = threadNamePrefix;
2164-
scheduledThreadPool = createPool();
2160+
this.threadNamePattern = threadNamePattern;
2161+
scheduledThreadPool = createPool(poolSize);
2162+
}
2163+
2164+
public void setCorePoolSize(int newSize) {
2165+
scheduledThreadPool.setCorePoolSize(newSize);
2166+
}
2167+
2168+
public int getCorePoolSize() {
2169+
return scheduledThreadPool.getCorePoolSize();
21652170
}
21662171

2167-
private ScheduledThreadPoolExecutor createPool() {
2172+
private ScheduledThreadPoolExecutor createPool(int poolSize) {
2173+
ThreadFactory threadFactory = ThreadUtils.createThreadFactory(threadNamePattern, true,
2174+
(t, e) -> LOGGER.error("Uncaught exception in thread '{}':", t.getName(), e));
21682175
ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(poolSize);
21692176
threadPool.setRemoveOnCancelPolicy(true);
21702177
threadPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
21712178
threadPool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
2172-
threadPool.setThreadFactory(new ThreadFactory() {
2173-
private final AtomicInteger sequence = new AtomicInteger();
2174-
2175-
public Thread newThread(Runnable r) {
2176-
return KafkaThread.daemon(threadNamePrefix + sequence.incrementAndGet(), r);
2177-
}
2178-
});
2179-
2179+
threadPool.setThreadFactory(threadFactory);
21802180
return threadPool;
21812181
}
21822182

core/src/main/scala/kafka/server/DynamicBrokerConfig.scala

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,6 +1167,22 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w
11671167
throw new ConfigException(s"$errorMsg, value should be at least 1")
11681168
}
11691169
}
1170+
1171+
if (RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP.equals(k) ||
1172+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP.equals(k) ||
1173+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP.equals(k)) {
1174+
val newValue = v.asInstanceOf[Int]
1175+
val oldValue = server.config.getInt(k)
1176+
if (newValue != oldValue) {
1177+
val errorMsg = s"Dynamic thread count update validation failed for $k=$v"
1178+
if (newValue <= 0)
1179+
throw new ConfigException(s"$errorMsg, value should be at least 1")
1180+
if (newValue < oldValue / 2)
1181+
throw new ConfigException(s"$errorMsg, value should be at least half the current value $oldValue")
1182+
if (newValue > oldValue * 2)
1183+
throw new ConfigException(s"$errorMsg, value should not be greater than double the current value $oldValue")
1184+
}
1185+
}
11701186
}
11711187
}
11721188

@@ -1176,29 +1192,40 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w
11761192

11771193
def isChangedLongValue(k : String): Boolean = oldLongValue(k) != newLongValue(k)
11781194

1179-
val remoteLogManager = server.remoteLogManagerOpt
1180-
if (remoteLogManager.nonEmpty) {
1195+
if (server.remoteLogManagerOpt.nonEmpty) {
1196+
val remoteLogManager = server.remoteLogManagerOpt.get
11811197
if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) {
11821198
val oldValue = oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
11831199
val newValue = newLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
1184-
remoteLogManager.get.resizeCacheSize(newValue)
1200+
remoteLogManager.resizeCacheSize(newValue)
11851201
info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP} updated, " +
11861202
s"old value: $oldValue, new value: $newValue")
11871203
}
11881204
if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)) {
11891205
val oldValue = oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
11901206
val newValue = newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
1191-
remoteLogManager.get.updateCopyQuota(newValue)
1207+
remoteLogManager.updateCopyQuota(newValue)
11921208
info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP} updated, " +
11931209
s"old value: $oldValue, new value: $newValue")
11941210
}
11951211
if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)) {
11961212
val oldValue = oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
11971213
val newValue = newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
1198-
remoteLogManager.get.updateFetchQuota(newValue)
1214+
remoteLogManager.updateFetchQuota(newValue)
11991215
info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP} updated, " +
12001216
s"old value: $oldValue, new value: $newValue")
12011217
}
1218+
1219+
val newRLMConfig = newConfig.remoteLogManagerConfig
1220+
val oldRLMConfig = oldConfig.remoteLogManagerConfig
1221+
if (newRLMConfig.remoteLogManagerCopierThreadPoolSize() != oldRLMConfig.remoteLogManagerCopierThreadPoolSize())
1222+
remoteLogManager.resizeCopierThreadPool(newRLMConfig.remoteLogManagerCopierThreadPoolSize())
1223+
1224+
if (newRLMConfig.remoteLogManagerExpirationThreadPoolSize() != oldRLMConfig.remoteLogManagerExpirationThreadPoolSize())
1225+
remoteLogManager.resizeExpirationThreadPool(newRLMConfig.remoteLogManagerExpirationThreadPoolSize())
1226+
1227+
if (newRLMConfig.remoteLogReaderThreads() != oldRLMConfig.remoteLogReaderThreads())
1228+
remoteLogManager.resizeReaderThreadPool(newRLMConfig.remoteLogReaderThreads())
12021229
}
12031230
}
12041231

@@ -1219,6 +1246,9 @@ object DynamicRemoteLogConfig {
12191246
RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP,
12201247
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
12211248
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
1222-
RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP
1249+
RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP,
1250+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
1251+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
1252+
RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP
12231253
)
12241254
}

0 commit comments

Comments
 (0)