Skip to content

Commit 4d206bd

Browse files
jayteejtedyu
authored andcommitted
KAFKA-16368: Update remote.log.manager.* default thread pool values for KIP-1030 (apache#18137)
Reviewers: Divij Vaidya <diviv@amazon.com>
1 parent 72e282b commit 4d206bd

3 files changed

Lines changed: 22 additions & 15 deletions

File tree

docs/upgrade.html

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,20 @@ <h5><a id="upgrade_400_notable" href="#upgrade_400_notable">Notable changes in 4
5959
Please use <code>log.message.timestamp.before.max.ms</code> and <code>log.message.timestamp.after.max.ms</code> instead.
6060
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-937%3A+Improve+Message+Timestamp+Validation">KIP-937</a> for details.
6161
</li>
62+
<li>
63+
The <code>remote.log.manager.copier.thread.pool.size</code> configuration default value was changed to 10 from -1.
64+
Values of -1 are no longer valid. A minimum of 1 or higher is valid.
65+
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>
66+
</li>
67+
<li>
68+
The <code>remote.log.manager.expiration.thread.pool.size</code> configuration default value was changed to 10 from -1.
69+
Values of -1 are no longer valid. A minimum of 1 or higher is valid.
70+
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>
71+
</li>
72+
<li>
73+
The <code>remote.log.manager.thread.pool.size</code> configuration default value was changed to 2 from 10.
74+
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>
75+
</li>
6276
</ul>
6377
</li>
6478
<li><b>MirrorMaker</b>

storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import org.apache.kafka.common.config.AbstractConfig;
2020
import org.apache.kafka.common.config.ConfigDef;
21-
import org.apache.kafka.common.config.ConfigException;
2221

2322
import java.util.Collections;
2423
import java.util.Map;
@@ -96,26 +95,20 @@ public final class RemoteLogManagerConfig {
9695
public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP = "remote.log.manager.thread.pool.size";
9796
public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Deprecated. Size of the thread pool used in scheduling tasks to copy " +
9897
"segments, fetch remote log indexes and clean up remote log segments.";
99-
public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 10;
98+
public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 2;
10099

101100
private static final String REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK = "The default value of -1 means that this will be set to the configured value of " +
102101
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP + ", if available; otherwise, it defaults to " + DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE + ".";
103-
private static final ConfigDef.Validator REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR = ConfigDef.LambdaValidator.with(
104-
(name, value) -> {
105-
if ((int) value < -1 || (int) value == 0) throw new ConfigException(name, value, "Value can be -1 or greater than 0");
106-
},
107-
() -> "[-1,1,...]"
108-
);
109102

110103
public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP = "remote.log.manager.copier.thread.pool.size";
111104
public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks " +
112105
"to copy segments. " + REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK;
113-
public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE = -1;
106+
public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE = 10;
114107

115108
public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP = "remote.log.manager.expiration.thread.pool.size";
116109
public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks " +
117110
"to clean up remote log segments. " + REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK;
118-
public static final int DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = -1;
111+
public static final int DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = 10;
119112

120113
public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = "remote.log.manager.task.interval.ms";
121114
public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC = "Interval at which remote log manager runs the scheduled tasks like copy " +
@@ -270,13 +263,13 @@ public static ConfigDef configDef() {
270263
.define(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
271264
INT,
272265
DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE,
273-
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR,
266+
atLeast(1),
274267
MEDIUM,
275268
REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC)
276269
.define(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
277270
INT,
278271
DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE,
279-
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR,
272+
atLeast(1),
280273
MEDIUM,
281274
REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC)
282275
.define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,

storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ public void testThreadPoolDefaults() {
6161
Map<String, Object> emptyProps = new HashMap<>();
6262
RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RLMTestConfig(emptyProps).remoteLogManagerConfig();
6363
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize());
64-
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopierThreadPoolSize());
65-
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerExpirationThreadPoolSize());
64+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopierThreadPoolSize());
65+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerExpirationThreadPoolSize());
6666
}
6767

6868
@Test
@@ -103,7 +103,7 @@ private Map<String, Object> getRLMProps(String rsmPrefix, String rlmmPrefix) {
103103
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP,
104104
0.3);
105105
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP,
106-
10);
106+
2);
107107
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP,
108108
100);
109109
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP,

0 commit comments

Comments
 (0)