Skip to content

Commit e739562

Browse files
committed
Disallow configuring -1 for copier and expiration thread pools
To provide clear configuration and reduce the need for manual tuning, the following changes are proposed: - Change the default value of copier and expiration thread pool size from -1 to 10. This will also ensure smooth validation (step increments x/2 to x*2) of thread-count dynamic config change. - Change the default value of remote.log.manager.thread.pool.size from 10 to 2, as the task is light-weight to fetch the highest-uploaded remote offset for the follower partitions. - We can deprecate the logic that refers the value from another thread-pool (transitive config dependency). The thread-pool configured values become independent of each other.
1 parent ba0dcd1 commit e739562

5 files changed

Lines changed: 50 additions & 55 deletions

File tree

core/src/main/scala/kafka/server/DynamicBrokerConfig.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1168,9 +1168,9 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w
11681168
}
11691169
}
11701170

1171-
// No validations are done for copier and expiration thread pools, it follows the default validation
1172-
// defined in the ConfigDef
1173-
if (RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP.equals(k)) {
1171+
if (RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP.equals(k) ||
1172+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP.equals(k) ||
1173+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP.equals(k)) {
11741174
val newValue = v.asInstanceOf[Int]
11751175
val oldValue = server.config.getInt(k)
11761176
if (newValue != oldValue) {

core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -206,9 +206,8 @@ class DynamicBrokerConfigTest {
206206
def testUpdateRemoteLogManagerDynamicThreadPool(): Unit = {
207207
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
208208
val config = KafkaConfig(origProps)
209-
// When copierThreadPool and expirationThreadPool are not configured, then it defaults to the remoteLogManagerThreadPoolSize
210-
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, config.remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize())
211-
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, config.remoteLogManagerConfig.remoteLogManagerExpirationThreadPoolSize())
209+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE, config.remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize())
210+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE, config.remoteLogManagerConfig.remoteLogManagerExpirationThreadPoolSize())
212211
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_READER_THREADS, config.remoteLogManagerConfig.remoteLogReaderThreads())
213212

214213
val serverMock = mock(classOf[KafkaBroker])
@@ -233,42 +232,48 @@ class DynamicBrokerConfigTest {
233232
assertEquals(7, config.remoteLogManagerConfig.remoteLogManagerExpirationThreadPoolSize())
234233
verify(remoteLogManager).resizeExpirationThreadPool(7)
235234

236-
// When copier and expiration thread pools are set to -1 dynamically, then it defaults to the remoteLogManagerThreadPoolSize
237-
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP, "-1")
238-
config.dynamicConfig.validate(props, perBrokerConfig = true)
239-
config.dynamicConfig.updateDefaultConfig(props)
240-
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, config.remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize())
241-
verify(remoteLogManager).resizeCopierThreadPool(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE)
242-
243-
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP, "-1")
244-
config.dynamicConfig.validate(props, perBrokerConfig = false)
245-
config.dynamicConfig.updateDefaultConfig(props)
246-
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, config.remoteLogManagerConfig.remoteLogManagerExpirationThreadPoolSize())
247-
verify(remoteLogManager).resizeExpirationThreadPool(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE)
248-
249235
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "6")
250236
config.dynamicConfig.validate(props, perBrokerConfig = true)
251237
config.dynamicConfig.updateDefaultConfig(props)
252238
assertEquals(6, config.remoteLogManagerConfig.remoteLogReaderThreads())
253239
verify(remoteLogManager).resizeReaderThreadPool(6)
254240
props.clear()
241+
verifyNoMoreInteractions(remoteLogManager)
242+
}
255243

256-
// Test dynamic update with invalid values
257-
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP, "0")
258-
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(props, perBrokerConfig = true))
259-
props.clear()
244+
@Test
245+
def testRemoteLogDynamicThreadPoolWithInvalidValues(): Unit = {
246+
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
247+
val config = KafkaConfig(origProps)
260248

261-
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP, "-2")
262-
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(props, perBrokerConfig = false))
263-
props.clear()
249+
val serverMock = mock(classOf[KafkaBroker])
250+
val remoteLogManager = mock(classOf[RemoteLogManager])
251+
when(serverMock.config).thenReturn(config)
252+
when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
264253

265-
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "2")
266-
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(props, perBrokerConfig = false))
267-
props.clear()
254+
config.dynamicConfig.initialize(None, None)
255+
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
268256

269-
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "-1")
270-
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(props, perBrokerConfig = true))
271-
props.clear()
257+
// Test dynamic update with invalid values
258+
val props = new Properties()
259+
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP, "0")
260+
val err = assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(props, perBrokerConfig = true))
261+
assertTrue(err.getMessage.contains("Value must be at least 1"))
262+
263+
val props1 = new Properties()
264+
props1.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP, "-1")
265+
val err1 = assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(props1, perBrokerConfig = false))
266+
assertTrue(err1.getMessage.contains("Value must be at least 1"))
267+
268+
val props2 = new Properties()
269+
props2.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "2")
270+
val err2 = assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(props2, perBrokerConfig = false))
271+
assertTrue(err2.getMessage.contains("value should be at least half the current value"))
272+
273+
val props3 = new Properties()
274+
props3.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "-1")
275+
val err3 = assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(props, perBrokerConfig = true))
276+
assertTrue(err3.getMessage.contains("Value must be at least 1"))
272277
verifyNoMoreInteractions(remoteLogManager)
273278
}
274279

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,8 +1102,8 @@ class KafkaConfigTest {
11021102
case RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP => // ignore string
11031103
case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
11041104
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
1105-
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2)
1106-
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2)
1105+
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1, -2)
1106+
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1, -2)
11071107
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
11081108
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
11091109
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)

storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import org.apache.kafka.common.config.AbstractConfig;
2020
import org.apache.kafka.common.config.ConfigDef;
21-
import org.apache.kafka.common.config.ConfigException;
2221

2322
import java.util.Collections;
2423
import java.util.Map;
@@ -94,28 +93,19 @@ public final class RemoteLogManagerConfig {
9493
public static final long DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES = 1024 * 1024 * 1024L;
9594

9695
public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP = "remote.log.manager.thread.pool.size";
97-
public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Deprecated. Size of the thread pool used in scheduling tasks to copy " +
98-
"segments, fetch remote log indexes and clean up remote log segments.";
99-
public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 10;
100-
101-
private static final String REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK = "The default value of -1 means that this will be set to the configured value of " +
102-
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP + ", if available; otherwise, it defaults to " + DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE + ".";
103-
private static final ConfigDef.Validator REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR = ConfigDef.LambdaValidator.with(
104-
(name, value) -> {
105-
if ((int) value < -1 || (int) value == 0) throw new ConfigException(name, value, "Value can be -1 or greater than 0");
106-
},
107-
() -> "[-1,1,...]"
108-
);
96+
public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling follower tasks to read " +
97+
"the highest-uploaded remote-offset for follower partitions.";
98+
public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 2;
10999

110100
public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP = "remote.log.manager.copier.thread.pool.size";
111101
public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks " +
112-
"to copy segments. " + REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK;
113-
public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE = -1;
102+
"to copy segments.";
103+
public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE = 10;
114104

115105
public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP = "remote.log.manager.expiration.thread.pool.size";
116106
public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks " +
117-
"to clean up remote log segments. " + REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK;
118-
public static final int DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = -1;
107+
"to clean up the expired remote log segments.";
108+
public static final int DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = 10;
119109

120110
public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = "remote.log.manager.task.interval.ms";
121111
public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC = "Interval at which remote log manager runs the scheduled tasks like copy " +
@@ -270,13 +260,13 @@ public static ConfigDef configDef() {
270260
.define(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
271261
INT,
272262
DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE,
273-
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR,
263+
atLeast(1),
274264
MEDIUM,
275265
REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC)
276266
.define(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
277267
INT,
278268
DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE,
279-
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR,
269+
atLeast(1),
280270
MEDIUM,
281271
REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC)
282272
.define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,

storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ public void testThreadPoolDefaults() {
6161
Map<String, Object> emptyProps = new HashMap<>();
6262
RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RLMTestConfig(emptyProps).remoteLogManagerConfig();
6363
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize());
64-
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopierThreadPoolSize());
65-
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerExpirationThreadPoolSize());
64+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopierThreadPoolSize());
65+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerExpirationThreadPoolSize());
6666
}
6767

6868
@Test

0 commit comments

Comments
 (0)