Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val replicaSelectorClassName = Option(getString(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG))

/** ********* Replication configuration ***********/
val controllerSocketTimeoutMs: Int = getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG)
val defaultReplicationFactor: Int = getInt(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG)
val replicaLagTimeMaxMs = getLong(ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG)
val replicaSocketTimeoutMs = getInt(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
import org.apache.kafka.server.common.NodeToControllerChannelManager;
import org.apache.kafka.server.config.AbstractKafkaConfig;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerConfigs;

import org.slf4j.Logger;
Expand Down Expand Up @@ -82,7 +81,7 @@ public NodeToControllerChannelManagerImpl(Supplier<ControllerInformation> contro
buildNetworkClient(controllerInformation),
manualMetadataUpdater,
controllerNodeProvider,
config,
config.controllerSocketTimeoutMs(),
time,
threadName,
retryTimeoutMs
Expand Down Expand Up @@ -122,7 +121,7 @@ private KafkaClient buildNetworkClient(ControllerInformation controllerInfo) {
50,
Selectable.USE_DEFAULT_BUFFER_SIZE,
Selectable.USE_DEFAULT_BUFFER_SIZE,
Math.min(Integer.MAX_VALUE, (int) Math.min(config.getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG), retryTimeoutMs)), // request timeout should not exceed the provided retry timeout
Math.min(Integer.MAX_VALUE, (int) Math.min(config.controllerSocketTimeoutMs(), retryTimeoutMs)), // request timeout should not exceed the provided retry timeout
config.getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
config.getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
time,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.util.InterBrokerSendThread;
import org.apache.kafka.server.util.RequestAndCompletionHandler;

Expand Down Expand Up @@ -67,11 +65,11 @@ public void setStarted(boolean started) {
public NodeToControllerRequestThread(KafkaClient initialNetworkClient,
ManualMetadataUpdater metadataUpdater,
Supplier<ControllerInformation> controllerNodeProvider,
AbstractConfig config,
int controllerSocketTimeoutMs,
Time time,
String threadName,
Long retryTimeoutMs) {
super(threadName, initialNetworkClient, Math.min(Integer.MAX_VALUE, (int) Math.min(config.getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG), retryTimeoutMs)), time, false);
super(threadName, initialNetworkClient, Math.min(Integer.MAX_VALUE, (int) Math.min(controllerSocketTimeoutMs, retryTimeoutMs)), time, false);
this.time = time;
this.controllerNodeProvider = controllerNodeProvider;
this.metadataUpdater = metadataUpdater;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ public int numReplicaFetchers() {
return getInt(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG);
}

public int controllerSocketTimeoutMs() {
return getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG);
}

public int numRecoveryThreadsPerDataDir() {
return getInt(ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.message.EnvelopeResponseData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.network.ListenerName;
Expand Down Expand Up @@ -60,10 +59,6 @@

class NodeToControllerRequestThreadTest {

private static AbstractConfig createConfig() {
return new AbstractConfig(ReplicationConfigs.CONFIG_DEF, Map.of());
}

private static ControllerInformation controllerInfo(Optional<Node> node) {
return new ControllerInformation(node, new ListenerName(""), SecurityProtocol.PLAINTEXT, "");
}
Expand All @@ -89,7 +84,6 @@ private static Supplier<ControllerInformation> sequentialProvider(
@Test
void testRetryTimeoutWhileControllerNotAvailable() {
MockTime time = new MockTime();
AbstractConfig config = createConfig();
Metadata metadata = mock(Metadata.class);
MockClient mockClient = new MockClient(time, metadata);

Expand All @@ -98,7 +92,7 @@ void testRetryTimeoutWhileControllerNotAvailable() {
long retryTimeoutMs = 30000;
NodeToControllerRequestThread testRequestThread = new NodeToControllerRequestThread(
Comment thread
yunchipang marked this conversation as resolved.
Outdated
mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", retryTimeoutMs);
controllerNodeProvider, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, time, "", retryTimeoutMs);
testRequestThread.setStarted(true);

TestControllerRequestCompletionHandler completionHandler =
Expand All @@ -123,7 +117,6 @@ mockClient, new ManualMetadataUpdater(),
void testRequestsSent() {
// just a simple test that tests whether the request from 1 -> 2 is sent and the response callback is called
MockTime time = new MockTime();
AbstractConfig config = createConfig();
int controllerId = 2;

Metadata metadata = mock(Metadata.class);
Expand All @@ -136,7 +129,7 @@ void testRequestsSent() {
MetadataResponse expectedResponse = RequestTestUtils.metadataUpdateWith(2, Map.of("a", 2));
NodeToControllerRequestThread testRequestThread = new NodeToControllerRequestThread(
mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", Long.MAX_VALUE);
controllerNodeProvider, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, time, "", Long.MAX_VALUE);
testRequestThread.setStarted(true);
mockClient.prepareResponse(expectedResponse);

Expand Down Expand Up @@ -164,7 +157,6 @@ mockClient, new ManualMetadataUpdater(),
void testControllerChanged() {
// in this test the controller changes from node 1 -> node 2
MockTime time = new MockTime();
AbstractConfig config = createConfig();
int oldControllerId = 1;
int newControllerId = 2;

Expand All @@ -180,7 +172,7 @@ void testControllerChanged() {
MetadataResponse expectedResponse = RequestTestUtils.metadataUpdateWith(3, Map.of("a", 2));
NodeToControllerRequestThread testRequestThread = new NodeToControllerRequestThread(
mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", Long.MAX_VALUE);
controllerNodeProvider, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, time, "", Long.MAX_VALUE);
testRequestThread.setStarted(true);

TestControllerRequestCompletionHandler completionHandler =
Expand Down Expand Up @@ -212,7 +204,6 @@ mockClient, new ManualMetadataUpdater(),
@Test
void testNotController() {
MockTime time = new MockTime();
AbstractConfig config = createConfig();
int oldControllerId = 1;
int newControllerId = 2;

Expand All @@ -232,7 +223,7 @@ void testNotController() {
MetadataResponse expectedResponse = RequestTestUtils.metadataUpdateWith(3, Map.of("a", 2));
NodeToControllerRequestThread testRequestThread = new NodeToControllerRequestThread(
mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", Long.MAX_VALUE);
controllerNodeProvider, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, time, "", Long.MAX_VALUE);
testRequestThread.setStarted(true);

TestControllerRequestCompletionHandler completionHandler =
Expand Down Expand Up @@ -271,7 +262,6 @@ mockClient, new ManualMetadataUpdater(),
@Test
void testEnvelopeResponseWithNotControllerError() {
MockTime time = new MockTime();
AbstractConfig config = createConfig();
int oldControllerId = 1;
int newControllerId = 2;

Expand All @@ -296,7 +286,7 @@ void testEnvelopeResponseWithNotControllerError() {

NodeToControllerRequestThread testRequestThread = new NodeToControllerRequestThread(
mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", Long.MAX_VALUE);
controllerNodeProvider, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, time, "", Long.MAX_VALUE);
testRequestThread.setStarted(true);

TestControllerRequestCompletionHandler completionHandler =
Expand Down Expand Up @@ -343,7 +333,6 @@ mockClient, new ManualMetadataUpdater(),
@Test
void testRetryTimeout() {
MockTime time = new MockTime();
AbstractConfig config = createConfig();
int controllerId = 1;

Metadata metadata = mock(Metadata.class);
Expand All @@ -359,7 +348,7 @@ void testRetryTimeout() {
Map.of("a", 2));
NodeToControllerRequestThread testRequestThread = new NodeToControllerRequestThread(
mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", retryTimeoutMs);
controllerNodeProvider, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, time, "", retryTimeoutMs);
testRequestThread.setStarted(true);

TestControllerRequestCompletionHandler completionHandler =
Expand Down Expand Up @@ -391,7 +380,6 @@ mockClient, new ManualMetadataUpdater(),
@Test
void testUnsupportedVersionHandling() {
MockTime time = new MockTime();
AbstractConfig config = createConfig();
int controllerId = 2;

Metadata metadata = mock(Metadata.class);
Expand Down Expand Up @@ -424,7 +412,7 @@ public void onComplete(ClientResponse response) {

NodeToControllerRequestThread testRequestThread = new NodeToControllerRequestThread(
mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", Long.MAX_VALUE);
controllerNodeProvider, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, time, "", Long.MAX_VALUE);
testRequestThread.setStarted(true);

testRequestThread.enqueue(queueItem);
Expand All @@ -435,7 +423,6 @@ mockClient, new ManualMetadataUpdater(),
@Test
void testAuthenticationExceptionHandling() {
MockTime time = new MockTime();
AbstractConfig config = createConfig();
int controllerId = 2;

Metadata metadata = mock(Metadata.class);
Expand Down Expand Up @@ -468,7 +455,7 @@ public void onComplete(ClientResponse response) {

NodeToControllerRequestThread testRequestThread = new NodeToControllerRequestThread(
mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", Long.MAX_VALUE);
controllerNodeProvider, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, time, "", Long.MAX_VALUE);
testRequestThread.setStarted(true);

testRequestThread.enqueue(queueItem);
Expand All @@ -481,7 +468,6 @@ mockClient, new ManualMetadataUpdater(),
void testThreadNotStarted() {
// Make sure we throw if we enqueue anything while the thread is not running
MockTime time = new MockTime();
AbstractConfig config = createConfig();

Metadata metadata = mock(Metadata.class);
MockClient mockClient = new MockClient(time, metadata);
Expand All @@ -490,7 +476,7 @@ void testThreadNotStarted() {

NodeToControllerRequestThread testRequestThread = new NodeToControllerRequestThread(
mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", Long.MAX_VALUE);
controllerNodeProvider, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, time, "", Long.MAX_VALUE);

TestControllerRequestCompletionHandler completionHandler =
new TestControllerRequestCompletionHandler(null);
Expand Down
Loading