Skip to content

Commit ba0dcd1

Browse files
peterxcli and kamalcph committed
KAFKA-17928: Make remote log manager thread-pool configs dynamic
Co-authored-by: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
1 parent 4c5ea05 commit ba0dcd1

4 files changed

Lines changed: 147 additions & 48 deletions

File tree

core/src/main/java/kafka/log/remote/RemoteLogManager.java

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@
4141
import org.apache.kafka.common.utils.BufferSupplier;
4242
import org.apache.kafka.common.utils.ChildFirstClassLoader;
4343
import org.apache.kafka.common.utils.CloseableIterator;
44-
import org.apache.kafka.common.utils.KafkaThread;
4544
import org.apache.kafka.common.utils.LogContext;
45+
import org.apache.kafka.common.utils.ThreadUtils;
4646
import org.apache.kafka.common.utils.Time;
4747
import org.apache.kafka.common.utils.Utils;
4848
import org.apache.kafka.server.common.CheckpointFile;
@@ -128,9 +128,7 @@
128128
import java.util.concurrent.Future;
129129
import java.util.concurrent.ScheduledFuture;
130130
import java.util.concurrent.ScheduledThreadPoolExecutor;
131-
import java.util.concurrent.ThreadFactory;
132131
import java.util.concurrent.TimeUnit;
133-
import java.util.concurrent.atomic.AtomicInteger;
134132
import java.util.concurrent.locks.Condition;
135133
import java.util.concurrent.locks.ReentrantLock;
136134
import java.util.function.BiConsumer;
@@ -162,7 +160,7 @@
162160
public class RemoteLogManager implements Closeable {
163161

164162
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
165-
private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader";
163+
private static final String REMOTE_LOG_READER_THREAD_NAME_PATTERN = "remote-log-reader-%d";
166164
private final RemoteLogManagerConfig rlmConfig;
167165
private final int brokerId;
168166
private final String logDir;
@@ -255,18 +253,18 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
255253
indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
256254
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
257255
rlmCopyThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerCopierThreadPoolSize(),
258-
"RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-");
256+
"RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-%d");
259257
rlmExpirationThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerExpirationThreadPoolSize(),
260-
"RLMExpirationThreadPool", "kafka-rlm-expiration-thread-pool-");
258+
"RLMExpirationThreadPool", "kafka-rlm-expiration-thread-pool-%d");
261259
followerThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
262-
"RLMFollowerScheduledThreadPool", "kafka-rlm-follower-thread-pool-");
260+
"RLMFollowerScheduledThreadPool", "kafka-rlm-follower-thread-pool-%d");
263261

264262
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC, rlmCopyThreadPool::getIdlePercent);
265263
remoteReadTimer = metricsGroup.newTimer(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC,
266264
TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
267265

268266
remoteStorageReaderThreadPool = new RemoteStorageThreadPool(
269-
REMOTE_LOG_READER_THREAD_NAME_PREFIX,
267+
REMOTE_LOG_READER_THREAD_NAME_PATTERN,
270268
rlmConfig.remoteLogReaderThreads(),
271269
rlmConfig.remoteLogReaderMaxPendingTasks()
272270
);
@@ -290,6 +288,24 @@ public void updateFetchQuota(long quota) {
290288
rlmFetchQuotaManager.updateQuota(new Quota(quota, true));
291289
}
292290

291+
public void resizeCopierThreadPool(int newSize) {
292+
int currentSize = rlmCopyThreadPool.getCorePoolSize();
293+
LOGGER.info("Updating remote copy thread pool size from {} to {}", currentSize, newSize);
294+
rlmCopyThreadPool.setCorePoolSize(newSize);
295+
}
296+
297+
public void resizeExpirationThreadPool(int newSize) {
298+
int currentSize = rlmExpirationThreadPool.getCorePoolSize();
299+
LOGGER.info("Updating remote expiration thread pool size from {} to {}", currentSize, newSize);
300+
rlmExpirationThreadPool.setCorePoolSize(newSize);
301+
}
302+
303+
public void resizeReaderThreadPool(int newSize) {
304+
int currentSize = remoteStorageReaderThreadPool.getCorePoolSize();
305+
LOGGER.info("Updating remote reader thread pool size from {} to {}", currentSize, newSize);
306+
remoteStorageReaderThreadPool.setCorePoolSize(newSize);
307+
}
308+
293309
private void removeMetrics() {
294310
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
295311
metricsGroup.removeMetric(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC);
@@ -2152,31 +2168,30 @@ RLMTaskWithFuture followerTask(TopicIdPartition partition) {
21522168
static class RLMScheduledThreadPool {
21532169

21542170
private static final Logger LOGGER = LoggerFactory.getLogger(RLMScheduledThreadPool.class);
2155-
private final int poolSize;
21562171
private final String threadPoolName;
2157-
private final String threadNamePrefix;
2172+
private final String threadNamePattern;
21582173
private final ScheduledThreadPoolExecutor scheduledThreadPool;
21592174

2160-
public RLMScheduledThreadPool(int poolSize, String threadPoolName, String threadNamePrefix) {
2161-
this.poolSize = poolSize;
2175+
public RLMScheduledThreadPool(int poolSize, String threadPoolName, String threadNamePattern) {
21622176
this.threadPoolName = threadPoolName;
2163-
this.threadNamePrefix = threadNamePrefix;
2164-
scheduledThreadPool = createPool();
2177+
this.threadNamePattern = threadNamePattern;
2178+
scheduledThreadPool = createPool(poolSize);
2179+
}
2180+
2181+
public void setCorePoolSize(int newSize) {
2182+
scheduledThreadPool.setCorePoolSize(newSize);
2183+
}
2184+
2185+
public int getCorePoolSize() {
2186+
return scheduledThreadPool.getCorePoolSize();
21652187
}
21662188

2167-
private ScheduledThreadPoolExecutor createPool() {
2189+
private ScheduledThreadPoolExecutor createPool(int poolSize) {
21682190
ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(poolSize);
21692191
threadPool.setRemoveOnCancelPolicy(true);
21702192
threadPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
21712193
threadPool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
2172-
threadPool.setThreadFactory(new ThreadFactory() {
2173-
private final AtomicInteger sequence = new AtomicInteger();
2174-
2175-
public Thread newThread(Runnable r) {
2176-
return KafkaThread.daemon(threadNamePrefix + sequence.incrementAndGet(), r);
2177-
}
2178-
});
2179-
2194+
threadPool.setThreadFactory(ThreadUtils.createThreadFactory(threadNamePattern, true));
21802195
return threadPool;
21812196
}
21822197

core/src/main/scala/kafka/server/DynamicBrokerConfig.scala

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,6 +1167,22 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w
11671167
throw new ConfigException(s"$errorMsg, value should be at least 1")
11681168
}
11691169
}
1170+
1171+
// No validations are done here for the copier and expiration thread pools; they follow the default validation
1172+
// defined in the ConfigDef
1173+
if (RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP.equals(k)) {
1174+
val newValue = v.asInstanceOf[Int]
1175+
val oldValue = server.config.getInt(k)
1176+
if (newValue != oldValue) {
1177+
val errorMsg = s"Dynamic thread count update validation failed for $k=$v"
1178+
if (newValue <= 0)
1179+
throw new ConfigException(s"$errorMsg, value should be at least 1")
1180+
if (newValue < oldValue / 2)
1181+
throw new ConfigException(s"$errorMsg, value should be at least half the current value $oldValue")
1182+
if (newValue > oldValue * 2)
1183+
throw new ConfigException(s"$errorMsg, value should not be greater than double the current value $oldValue")
1184+
}
1185+
}
11701186
}
11711187
}
11721188

@@ -1176,29 +1192,40 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w
11761192

11771193
def isChangedLongValue(k : String): Boolean = oldLongValue(k) != newLongValue(k)
11781194

1179-
val remoteLogManager = server.remoteLogManagerOpt
1180-
if (remoteLogManager.nonEmpty) {
1195+
if (server.remoteLogManagerOpt.nonEmpty) {
1196+
val remoteLogManager = server.remoteLogManagerOpt.get
11811197
if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) {
11821198
val oldValue = oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
11831199
val newValue = newLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
1184-
remoteLogManager.get.resizeCacheSize(newValue)
1200+
remoteLogManager.resizeCacheSize(newValue)
11851201
info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP} updated, " +
11861202
s"old value: $oldValue, new value: $newValue")
11871203
}
11881204
if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)) {
11891205
val oldValue = oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
11901206
val newValue = newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
1191-
remoteLogManager.get.updateCopyQuota(newValue)
1207+
remoteLogManager.updateCopyQuota(newValue)
11921208
info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP} updated, " +
11931209
s"old value: $oldValue, new value: $newValue")
11941210
}
11951211
if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)) {
11961212
val oldValue = oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
11971213
val newValue = newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
1198-
remoteLogManager.get.updateFetchQuota(newValue)
1214+
remoteLogManager.updateFetchQuota(newValue)
11991215
info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP} updated, " +
12001216
s"old value: $oldValue, new value: $newValue")
12011217
}
1218+
1219+
val newRLMConfig = newConfig.remoteLogManagerConfig
1220+
val oldRLMConfig = oldConfig.remoteLogManagerConfig
1221+
if (newRLMConfig.remoteLogManagerCopierThreadPoolSize() != oldRLMConfig.remoteLogManagerCopierThreadPoolSize())
1222+
remoteLogManager.resizeCopierThreadPool(newRLMConfig.remoteLogManagerCopierThreadPoolSize())
1223+
1224+
if (newRLMConfig.remoteLogManagerExpirationThreadPoolSize() != oldRLMConfig.remoteLogManagerExpirationThreadPoolSize())
1225+
remoteLogManager.resizeExpirationThreadPool(newRLMConfig.remoteLogManagerExpirationThreadPoolSize())
1226+
1227+
if (newRLMConfig.remoteLogReaderThreads() != oldRLMConfig.remoteLogReaderThreads())
1228+
remoteLogManager.resizeReaderThreadPool(newRLMConfig.remoteLogReaderThreads())
12021229
}
12031230
}
12041231

@@ -1219,6 +1246,9 @@ object DynamicRemoteLogConfig {
12191246
RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP,
12201247
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
12211248
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
1222-
RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP
1249+
RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP,
1250+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
1251+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
1252+
RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP
12231253
)
12241254
}

core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,76 @@ class DynamicBrokerConfigTest {
202202
)
203203
}
204204

205+
@Test
206+
def testUpdateRemoteLogManagerDynamicThreadPool(): Unit = {
207+
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
208+
val config = KafkaConfig(origProps)
209+
// When copierThreadPool and expirationThreadPool are not configured, they default to the remoteLogManagerThreadPoolSize
210+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, config.remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize())
211+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, config.remoteLogManagerConfig.remoteLogManagerExpirationThreadPoolSize())
212+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_READER_THREADS, config.remoteLogManagerConfig.remoteLogReaderThreads())
213+
214+
val serverMock = mock(classOf[KafkaBroker])
215+
val remoteLogManager = mock(classOf[RemoteLogManager])
216+
when(serverMock.config).thenReturn(config)
217+
when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
218+
219+
config.dynamicConfig.initialize(None, None)
220+
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
221+
222+
// Test dynamic update with valid values
223+
val props = new Properties()
224+
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP, "8")
225+
config.dynamicConfig.validate(props, perBrokerConfig = true)
226+
config.dynamicConfig.updateDefaultConfig(props)
227+
assertEquals(8, config.remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize())
228+
verify(remoteLogManager).resizeCopierThreadPool(8)
229+
230+
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP, "7")
231+
config.dynamicConfig.validate(props, perBrokerConfig = false)
232+
config.dynamicConfig.updateDefaultConfig(props)
233+
assertEquals(7, config.remoteLogManagerConfig.remoteLogManagerExpirationThreadPoolSize())
234+
verify(remoteLogManager).resizeExpirationThreadPool(7)
235+
236+
// When the copier and expiration thread pools are set to -1 dynamically, they default to the remoteLogManagerThreadPoolSize
237+
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP, "-1")
238+
config.dynamicConfig.validate(props, perBrokerConfig = true)
239+
config.dynamicConfig.updateDefaultConfig(props)
240+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, config.remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize())
241+
verify(remoteLogManager).resizeCopierThreadPool(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE)
242+
243+
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP, "-1")
244+
config.dynamicConfig.validate(props, perBrokerConfig = false)
245+
config.dynamicConfig.updateDefaultConfig(props)
246+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, config.remoteLogManagerConfig.remoteLogManagerExpirationThreadPoolSize())
247+
verify(remoteLogManager).resizeExpirationThreadPool(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE)
248+
249+
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "6")
250+
config.dynamicConfig.validate(props, perBrokerConfig = true)
251+
config.dynamicConfig.updateDefaultConfig(props)
252+
assertEquals(6, config.remoteLogManagerConfig.remoteLogReaderThreads())
253+
verify(remoteLogManager).resizeReaderThreadPool(6)
254+
props.clear()
255+
256+
// Test dynamic update with invalid values
257+
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP, "0")
258+
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(props, perBrokerConfig = true))
259+
props.clear()
260+
261+
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP, "-2")
262+
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(props, perBrokerConfig = false))
263+
props.clear()
264+
265+
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "2")
266+
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(props, perBrokerConfig = false))
267+
props.clear()
268+
269+
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "-1")
270+
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(props, perBrokerConfig = true))
271+
props.clear()
272+
verifyNoMoreInteractions(remoteLogManager)
273+
}
274+
205275
@nowarn("cat=deprecation")
206276
@Test
207277
def testConfigUpdateWithSomeInvalidConfigs(): Unit = {

storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,15 @@
1818

1919
import org.apache.kafka.common.internals.FatalExitError;
2020
import org.apache.kafka.common.utils.Exit;
21+
import org.apache.kafka.common.utils.ThreadUtils;
2122
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
2223

2324
import org.slf4j.Logger;
2425
import org.slf4j.LoggerFactory;
2526

2627
import java.util.concurrent.LinkedBlockingQueue;
27-
import java.util.concurrent.ThreadFactory;
2828
import java.util.concurrent.ThreadPoolExecutor;
2929
import java.util.concurrent.TimeUnit;
30-
import java.util.concurrent.atomic.AtomicInteger;
3130

3231
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC;
3332
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC;
@@ -37,11 +36,11 @@ public final class RemoteStorageThreadPool extends ThreadPoolExecutor {
3736
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteStorageThreadPool.class);
3837
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass());
3938

40-
public RemoteStorageThreadPool(String threadNamePrefix,
39+
public RemoteStorageThreadPool(String threadNamePattern,
4140
int numThreads,
4241
int maxPendingTasks) {
4342
super(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(maxPendingTasks),
44-
new RemoteStorageThreadFactory(threadNamePrefix));
43+
ThreadUtils.createThreadFactory(threadNamePattern, false));
4544
metricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
4645
() -> getQueue().size());
4746
metricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
@@ -61,21 +60,6 @@ protected void afterExecute(Runnable runnable, Throwable th) {
6160
}
6261
}
6362

64-
private static class RemoteStorageThreadFactory implements ThreadFactory {
65-
private final String namePrefix;
66-
private final AtomicInteger threadNumber = new AtomicInteger(0);
67-
68-
RemoteStorageThreadFactory(String namePrefix) {
69-
this.namePrefix = namePrefix;
70-
}
71-
72-
@Override
73-
public Thread newThread(Runnable r) {
74-
return new Thread(r, namePrefix + threadNumber.getAndIncrement());
75-
}
76-
77-
}
78-
7963
public void removeMetrics() {
8064
REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(metricsGroup::removeMetric);
8165
}

0 commit comments

Comments
 (0)