Skip to content

Commit c59b10c

Browse files
committed
Fetch supported produce version from apiVersions.
1 parent 916e471 commit c59b10c

File tree

2 files changed

+7
-8
lines changed

2 files changed

+7
-8
lines changed

Diff for: clients/src/main/java/org/apache/kafka/clients/ApiVersions.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@ public class ApiVersions {
3434

3535
private final Map<String, NodeApiVersions> nodeApiVersions = new HashMap<>();
3636
private byte maxUsableProduceMagic = RecordBatch.CURRENT_MAGIC_VALUE;
37-
private short maxProduceSupportedVersion = ApiKeys.PRODUCE.latestVersion();
37+
private short maxSupportedProduceVersion = ApiKeys.PRODUCE.latestVersion();
3838

3939
public synchronized void update(String nodeId, NodeApiVersions nodeApiVersions) {
4040
this.nodeApiVersions.put(nodeId, nodeApiVersions);
4141
this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
42-
this.maxProduceSupportedVersion = computeMaxProduceSupportedVersion();
42+
this.maxSupportedProduceVersion = computeMaxSupportedProduceVersion();
4343
}
4444

45-
private short computeMaxProduceSupportedVersion() {
45+
private short computeMaxSupportedProduceVersion() {
4646
Optional<Short> knownBrokerNodesMinSupportedVersionForProduce = this.nodeApiVersions.values().stream()
4747
.filter(versions -> versions.apiVersion(ApiKeys.PRODUCE) != null) // filter out Raft controller nodes
4848
.map(versions -> versions.latestUsableVersion(ApiKeys.PRODUCE))
@@ -54,6 +54,7 @@ private short computeMaxProduceSupportedVersion() {
5454
public synchronized void remove(String nodeId) {
5555
this.nodeApiVersions.remove(nodeId);
5656
this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
57+
this.maxSupportedProduceVersion = computeMaxSupportedProduceVersion();
5758
}
5859

5960
public synchronized NodeApiVersions get(String nodeId) {
@@ -74,8 +75,8 @@ private byte computeMaxUsableProduceMagic() {
7475
public synchronized byte maxUsableProduceMagic() {
7576
return maxUsableProduceMagic;
7677
}
77-
public synchronized short getMaxSupportedProduceVersion() {
78-
return maxProduceSupportedVersion;
78+
public synchronized short maxSupportedProduceVersion() {
79+
return maxSupportedProduceVersion;
7980
}
8081

8182
}

Diff for: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.kafka.clients.Metadata;
2828
import org.apache.kafka.clients.MetadataSnapshot;
2929
import org.apache.kafka.clients.NetworkClientUtils;
30-
import org.apache.kafka.clients.NodeApiVersions;
3130
import org.apache.kafka.clients.RequestCompletionHandler;
3231
import org.apache.kafka.common.InvalidRecordException;
3332
import org.apache.kafka.common.KafkaException;
@@ -51,7 +50,6 @@
5150
import org.apache.kafka.common.metrics.stats.Avg;
5251
import org.apache.kafka.common.metrics.stats.Max;
5352
import org.apache.kafka.common.metrics.stats.Meter;
54-
import org.apache.kafka.common.protocol.ApiKeys;
5553
import org.apache.kafka.common.protocol.Errors;
5654
import org.apache.kafka.common.record.MemoryRecords;
5755
import org.apache.kafka.common.record.RecordBatch;
@@ -884,7 +882,7 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
884882
minUsedMagic = batch.magic();
885883
}
886884
Map<String, Uuid> topicIds = getTopicIdsFromBatches(batches);
887-
boolean canUseTopicId = apiVersions.getMaxSupportedProduceVersion() >= 12;
885+
boolean canUseTopicId = apiVersions.maxSupportedProduceVersion() >= 12;
888886

889887
ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
890888
for (ProducerBatch batch : batches) {

0 commit comments

Comments
 (0)