Skip to content

Commit 7c52efb

Browse files
committed
KAFKA-18157: Consider UnsupportedVersionException child class to represent the case of unsupported fields
1 parent a562042 commit 7c52efb

17 files changed

Lines changed: 97 additions & 54 deletions

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
2222
import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
2323
import org.apache.kafka.common.Uuid;
24+
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
2425
import org.apache.kafka.common.errors.UnsupportedVersionException;
2526
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
2627
import org.apache.kafka.common.metrics.Metrics;
@@ -100,12 +101,14 @@ public boolean handleSpecificFailure(Throwable exception) {
100101
String errorMessage = exception.getMessage();
101102
if (exception instanceof UnsupportedVersionException) {
102103
String message = CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG;
103-
if (errorMessage.equals(REGEX_RESOLUTION_NOT_SUPPORTED_MSG)) {
104+
if (exception instanceof UnsupportedProtocolFieldException) {
104105
message = REGEX_RESOLUTION_NOT_SUPPORTED_MSG;
105106
logger.error("{} regex resolution not supported: {}", heartbeatRequestName(), message);
106107
} else {
107108
logger.error("{} failed due to unsupported version while sending request: {}", heartbeatRequestName(), errorMessage);
108109
}
110+
// Surface the parent type here: handleFatalFailure propagates this via BackgroundEvent
111+
// to the user-facing API. Propagating the subclass would be a user-visible behavior change.
109112
handleFatalFailure(new UnsupportedVersionException(message, exception));
110113
errorHandled = true;
111114
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.common.errors;
18+
19+
public class UnsupportedProtocolFieldException extends UnsupportedVersionException {
20+
private static final long serialVersionUID = 1L;
21+
22+
public UnsupportedProtocolFieldException(String fieldOrValue, String apiKeyName,
23+
int apiVersion, int lowestSupportedVersion) {
24+
super("The cluster does not support [" + fieldOrValue + "] in " + apiKeyName
25+
+ " API version " + apiVersion + ". Upgrade the cluster to " + apiKeyName
26+
+ " API version >= " + lowestSupportedVersion + " to enable [" + fieldOrValue + "].");
27+
}
28+
29+
public UnsupportedProtocolFieldException(String message) {
30+
super(message);
31+
}
32+
}

clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
package org.apache.kafka.common.requests;
1818

19-
import org.apache.kafka.common.errors.UnsupportedVersionException;
19+
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
2020
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
2121
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
2222
import org.apache.kafka.common.protocol.ApiKeys;
@@ -63,7 +63,7 @@ public Builder(ConsumerGroupHeartbeatRequestData data, boolean enableUnstableLas
6363
@Override
6464
public ConsumerGroupHeartbeatRequest build(short version) {
6565
if (version == 0 && data.subscribedTopicRegex() != null) {
66-
throw new UnsupportedVersionException(REGEX_RESOLUTION_NOT_SUPPORTED_MSG);
66+
throw new UnsupportedProtocolFieldException(REGEX_RESOLUTION_NOT_SUPPORTED_MSG);
6767
}
6868
return new ConsumerGroupHeartbeatRequest(data, version);
6969
}

clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.kafka.common.acl.AclBinding;
2222
import org.apache.kafka.common.acl.AclOperation;
2323
import org.apache.kafka.common.acl.AclPermissionType;
24-
import org.apache.kafka.common.errors.UnsupportedVersionException;
24+
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
2525
import org.apache.kafka.common.message.CreateAclsRequestData;
2626
import org.apache.kafka.common.message.CreateAclsRequestData.AclCreation;
2727
import org.apache.kafka.common.message.CreateAclsResponseData;
@@ -32,8 +32,10 @@
3232
import org.apache.kafka.common.resource.ResourcePattern;
3333
import org.apache.kafka.common.resource.ResourceType;
3434

35+
import java.util.Arrays;
3536
import java.util.Collections;
3637
import java.util.List;
38+
import java.util.stream.Collectors;
3739

3840
public class CreateAclsRequest extends AbstractRequest {
3941

@@ -90,8 +92,13 @@ private void validate(CreateAclsRequestData data) {
9092
if (version() == 0) {
9193
final boolean unsupported = data.creations().stream().anyMatch(creation ->
9294
creation.resourcePatternType() != PatternType.LITERAL.code());
93-
if (unsupported)
94-
throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
95+
if (unsupported) {
96+
String unsupportedType = Arrays.stream(PatternType.values())
97+
.filter(type -> type != PatternType.LITERAL)
98+
.map(PatternType::name)
99+
.collect(Collectors.joining(","));
100+
throw new UnsupportedProtocolFieldException(unsupportedType, apiKey().name(), version(), 1);
101+
}
95102
}
96103

97104
final boolean unknown = data.creations().stream().anyMatch(creation ->

clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
package org.apache.kafka.common.requests;
1818

19-
import org.apache.kafka.common.errors.UnsupportedVersionException;
19+
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
2020
import org.apache.kafka.common.message.CreateTopicsRequestData;
2121
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
2222
import org.apache.kafka.common.message.CreateTopicsResponseData;
@@ -39,8 +39,7 @@ public Builder(CreateTopicsRequestData data) {
3939
@Override
4040
public CreateTopicsRequest build(short version) {
4141
if (data.validateOnly() && version == 0)
42-
throw new UnsupportedVersionException("validateOnly is not supported in version 0 of " +
43-
"CreateTopicsRequest");
42+
throw new UnsupportedProtocolFieldException("validateOnly", apiKey().name(), version, 1);
4443

4544
final List<String> topicsWithDefaults = data.topics()
4645
.stream()
@@ -52,10 +51,8 @@ public CreateTopicsRequest build(short version) {
5251
.collect(Collectors.toList());
5352

5453
if (!topicsWithDefaults.isEmpty() && version < 4) {
55-
throw new UnsupportedVersionException("Creating topics with default "
56-
+ "partitions/replication factor are only supported in CreateTopicRequest "
57-
+ "version 4+. The following topics need values for partitions and replicas: "
58-
+ topicsWithDefaults);
54+
throw new UnsupportedProtocolFieldException(String.join(",", topicsWithDefaults),
55+
apiKey().name(), version, 4);
5956
}
6057

6158
return new CreateTopicsRequest(data, version);

clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.apache.kafka.common.acl.AclBindingFilter;
2121
import org.apache.kafka.common.acl.AclOperation;
2222
import org.apache.kafka.common.acl.AclPermissionType;
23-
import org.apache.kafka.common.errors.UnsupportedVersionException;
23+
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
2424
import org.apache.kafka.common.message.DeleteAclsRequestData;
2525
import org.apache.kafka.common.message.DeleteAclsRequestData.DeleteAclsFilter;
2626
import org.apache.kafka.common.message.DeleteAclsResponseData;
@@ -31,6 +31,7 @@
3131
import org.apache.kafka.common.resource.ResourcePatternFilter;
3232
import org.apache.kafka.common.resource.ResourceType;
3333

34+
import java.util.Arrays;
3435
import java.util.Collections;
3536
import java.util.List;
3637
import java.util.stream.Collectors;
@@ -76,9 +77,13 @@ private void normalizeAndValidate() {
7677
// to LITERAL. Note that the wildcard `*` is considered `LITERAL` for compatibility reasons.
7778
if (patternType == PatternType.ANY)
7879
filter.setPatternTypeFilter(PatternType.LITERAL.code());
79-
else if (patternType != PatternType.LITERAL)
80-
throw new UnsupportedVersionException("Version 0 does not support pattern type " +
81-
patternType + " (only LITERAL and ANY are supported)");
80+
else if (patternType != PatternType.LITERAL) {
81+
String unsupportedTypes = Arrays.stream(PatternType.values())
82+
.filter(type -> type != PatternType.ANY && type != PatternType.LITERAL)
83+
.map(PatternType::name)
84+
.collect(Collectors.joining(","));
85+
throw new UnsupportedProtocolFieldException(unsupportedTypes, apiKey().name(), version(), 1);
86+
}
8287
}
8388
}
8489

clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.apache.kafka.common.acl.AclBindingFilter;
2121
import org.apache.kafka.common.acl.AclOperation;
2222
import org.apache.kafka.common.acl.AclPermissionType;
23-
import org.apache.kafka.common.errors.UnsupportedVersionException;
23+
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
2424
import org.apache.kafka.common.message.DescribeAclsRequestData;
2525
import org.apache.kafka.common.message.DescribeAclsResponseData;
2626
import org.apache.kafka.common.protocol.ApiKeys;
@@ -29,6 +29,9 @@
2929
import org.apache.kafka.common.resource.ResourcePatternFilter;
3030
import org.apache.kafka.common.resource.ResourceType;
3131

32+
import java.util.Arrays;
33+
import java.util.stream.Collectors;
34+
3235
public class DescribeAclsRequest extends AbstractRequest {
3336

3437
public static class Builder extends AbstractRequest.Builder<DescribeAclsRequest> {
@@ -75,8 +78,13 @@ private void normalizeAndValidate(short version) {
7578
// to LITERAL. Note that the wildcard `*` is considered `LITERAL` for compatibility reasons.
7679
if (patternType == PatternType.ANY)
7780
data.setPatternTypeFilter(PatternType.LITERAL.code());
78-
else if (patternType != PatternType.LITERAL)
79-
throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
81+
else if (patternType != PatternType.LITERAL) {
82+
String unsupportedTypes = Arrays.stream(PatternType.values())
83+
.filter(type -> type != PatternType.LITERAL)
84+
.map(PatternType::name)
85+
.collect(Collectors.joining(","));
86+
throw new UnsupportedProtocolFieldException(unsupportedTypes, apiKey().name(), version, 1);
87+
}
8088
}
8189

8290
if (data.patternTypeFilter() == PatternType.UNKNOWN.code()

clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import org.apache.kafka.common.ElectionType;
2121
import org.apache.kafka.common.TopicPartition;
22-
import org.apache.kafka.common.errors.UnsupportedVersionException;
22+
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
2323
import org.apache.kafka.common.message.ElectLeadersRequestData;
2424
import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
2525
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
@@ -63,7 +63,7 @@ public String toString() {
6363

6464
private ElectLeadersRequestData toRequestData(short version) {
6565
if (electionType != ElectionType.PREFERRED && version == 0) {
66-
throw new UnsupportedVersionException("API Version 0 only supports PREFERRED election type");
66+
throw new UnsupportedProtocolFieldException("ElectionType", apiKey().name(), version, 1);
6767
}
6868

6969
ElectLeadersRequestData data = new ElectLeadersRequestData()

clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.common.Node;
2020
import org.apache.kafka.common.errors.InvalidRequestException;
21+
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
2122
import org.apache.kafka.common.errors.UnsupportedVersionException;
2223
import org.apache.kafka.common.message.FindCoordinatorRequestData;
2324
import org.apache.kafka.common.message.FindCoordinatorResponseData;
@@ -43,8 +44,8 @@ public Builder(FindCoordinatorRequestData data) {
4344
@Override
4445
public FindCoordinatorRequest build(short version) {
4546
if (version < 1 && data.keyType() == CoordinatorType.TRANSACTION.id()) {
46-
throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " +
47-
"because we require features supported only in 2 or later.");
47+
throw new UnsupportedProtocolFieldException(CoordinatorType.TRANSACTION.name(),
48+
apiKey().name(), version, 2);
4849
}
4950
int batchedKeys = data.coordinatorKeys().size();
5051
if (version < MIN_BATCHED_VERSION) {

clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
package org.apache.kafka.common.requests;
1818

19-
import org.apache.kafka.common.errors.UnsupportedVersionException;
19+
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
2020
import org.apache.kafka.common.message.HeartbeatRequestData;
2121
import org.apache.kafka.common.message.HeartbeatResponseData;
2222
import org.apache.kafka.common.protocol.ApiKeys;
@@ -36,8 +36,7 @@ public Builder(HeartbeatRequestData data) {
3636
@Override
3737
public HeartbeatRequest build(short version) {
3838
if (data.groupInstanceId() != null && version < 3) {
39-
throw new UnsupportedVersionException("The broker heartbeat protocol version " +
40-
version + " does not support usage of config group.instance.id.");
39+
throw new UnsupportedProtocolFieldException("GroupInstanceId", apiKey().name(), version, 3);
4140
}
4241
return new HeartbeatRequest(data, version);
4342
}

0 commit comments

Comments
 (0)