Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,12 @@ public class CommonClientConfigs {
"metadata for this interval, client repeats the bootstrap process using <code>bootstrap.servers</code> configuration.";
public static final long DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS = 300 * 1000;

public static final String METADATA_CLUSTER_CHECK_ENABLE_CONFIG = "metadata.cluster.check.enable";
public static final String METADATA_CLUSTER_CHECK_ENABLE_DOC = "Whether the client should send cluster and node information " +
"when connecting to a broker to enable it to check for a misrouted connection. This configuration is ignored if " +
"rebootstrapping is disabled by setting the configuration <code>metadata.recovery.strategy=none</code>. If the client " +
"is connecting to a broker older than Apache Kafka 4.4, no checking is performed and this configuration has no effect.";

/**
* Postprocess the configuration so that exponential backoff is disabled when reconnect backoff
* is explicitly configured but the maximum reconnect backoff is not explicitly configured.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ public class AdminClientConfig extends AbstractConfig {
public static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC = CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC;
public static final long DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS = CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS;

public static final String METADATA_CLUSTER_CHECK_ENABLE_CONFIG = CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_CONFIG;
public static final String METADATA_CLUSTER_CHECK_ENABLE_DOC = CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_DOC;

/**
* <code>security.providers</code>
*/
Expand Down Expand Up @@ -285,6 +288,11 @@ public class AdminClientConfig extends AbstractConfig {
atLeast(0),
Importance.LOW,
METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
.define(METADATA_CLUSTER_CHECK_ENABLE_CONFIG,
Type.BOOLEAN,
true,
Importance.LOW,
METADATA_CLUSTER_CHECK_ENABLE_DOC)
.define(CONFIG_PROVIDERS_CONFIG,
ConfigDef.Type.LIST,
List.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,11 @@ public class ConsumerConfig extends AbstractConfig {
atLeast(0),
Importance.LOW,
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
.define(CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_CONFIG,
Type.BOOLEAN,
true,
Importance.LOW,
CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_DOC)
.define(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
Type.STRING,
ShareAcknowledgementMode.IMPLICIT.name(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,11 @@ public class ProducerConfig extends AbstractConfig {
atLeast(0),
Importance.LOW,
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
.define(CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_CONFIG,
Type.BOOLEAN,
true,
Importance.LOW,
CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_DOC)
.define(CONFIG_PROVIDERS_CONFIG,
ConfigDef.Type.LIST,
List.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@
// Version 3 is the first flexible version and adds ClientSoftwareName and ClientSoftwareVersion.
//
// Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion in the response from being 0.
"validVersions": "0-4",
//
// Version 5 introduces ClusterId and NodeId cbecking and REBOOTSTRAP_REQUIRED error (KIP-1242).
"validVersions": "0-5",
"flexibleVersions": "3+",
"latestVersionUnstable": true,
"fields": [
{ "name": "ClientSoftwareName", "type": "string", "versions": "3+",
"ignorable": true, "about": "The name of the client." },
{ "name": "ClientSoftwareVersion", "type": "string", "versions": "3+",
"ignorable": true, "about": "The version of the client." }
"ignorable": true, "about": "The version of the client." },
{ "name": "ClusterId", "type": "string", "versions": "5+", "nullableVersions": "5+", "default": "null",
"ignorable": true, "about": "The cluster ID the client intends to connect to, if known." },
{ "name": "NodeId", "type": "int32", "versions": "5+", "default": -1,
"ignorable": true, "about": "The node ID the client intends to connect to, if known." }
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
// versions of the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned.
//
// Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion from being 0.
"validVersions": "0-4",
//
// Version 5 introduces ClusterId and NodeId checking and REBOOTSTRAP_REQUIRED error (KIP-1242).
"validVersions": "0-5",
"flexibleVersions": "3+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ public void testUnsupportedApiVersionsRequestWithVersionProvidedByTheBroker() {
ByteBuffer buffer = selector.completedSendBuffers().get(0).buffer();
RequestHeader header = parseHeader(buffer);
assertEquals(ApiKeys.API_VERSIONS, header.apiKey());
assertEquals(4, header.apiVersion());
assertEquals(5, header.apiVersion());

// prepare response
ApiVersionCollection apiKeys = new ApiVersionCollection();
Expand Down Expand Up @@ -535,7 +535,7 @@ public void testUnsupportedApiVersionsRequestWithoutVersionProvidedByTheBroker()
ByteBuffer buffer = selector.completedSendBuffers().get(0).buffer();
RequestHeader header = parseHeader(buffer);
assertEquals(ApiKeys.API_VERSIONS, header.apiKey());
assertEquals(4, header.apiVersion());
assertEquals(5, header.apiVersion());

// prepare response
delayedApiVersionsResponse(0, (short) 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ public final class DistributedConfig extends WorkerConfig {
private static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC = CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC;
public static final long DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS = CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS;

public static final String METADATA_CLUSTER_CHECK_ENABLE_CONFIG = CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_CONFIG;
private static final String METADATA_CLUSTER_CHECK_ENABLE_DOC = CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_DOC;

/**
* <code>worker.sync.timeout.ms</code>
*/
Expand Down Expand Up @@ -535,7 +538,12 @@ private static ConfigDef config(Crypto crypto) {
DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS,
atLeast(0),
ConfigDef.Importance.LOW,
METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC);
METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
.define(METADATA_CLUSTER_CHECK_ENABLE_CONFIG,
ConfigDef.Type.BOOLEAN,
true,
ConfigDef.Importance.LOW,
METADATA_CLUSTER_CHECK_ENABLE_DOC);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,15 @@ class DefaultApiVersionManagerTest {
)

ApiKeys.apisForListener(apiScope).forEach { apiKey =>
if (apiKey.messageType.latestVersionUnstable()) {
if (apiKey.id == ApiKeys.API_VERSIONS.id) {
// ApiVersions API is a particular case. The client always send the highest version
// that it supports and the server fails back to version 0 if it does not know it.
// See ApiKeys.isVersionEnabled for more information (KIP-511).
// Because API_VERSIONS has an unstable version while KIP-1242 is under development,
// we need a special case in this test. This assertion will start failing when the
// API is no longer unstable and the special case can be removed.
assertTrue(apiKey.messageType.latestVersionUnstable());
} else if (apiKey.messageType.latestVersionUnstable()) {
assertFalse(versionManager.isApiEnabled(apiKey, apiKey.latestVersion),
s"$apiKey version ${apiKey.latestVersion} should be disabled.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1256,6 +1256,11 @@ public class StreamsConfig extends AbstractConfig {
atLeast(0),
Importance.LOW,
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
.define(CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_CONFIG,
Type.BOOLEAN,
true,
Importance.LOW,
CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_DOC)
.define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
Type.CLASS,
null,
Expand Down
Loading