
Commit 916e471

Fetch supported produce version from apiVersions.
1 parent 35dba4b commit 916e471

3 files changed: +39 -6

Diff for: clients/src/main/java/org/apache/kafka/clients/ApiVersions.java

+14
@@ -34,10 +34,21 @@ public class ApiVersions {
 
     private final Map<String, NodeApiVersions> nodeApiVersions = new HashMap<>();
     private byte maxUsableProduceMagic = RecordBatch.CURRENT_MAGIC_VALUE;
+    private short maxProduceSupportedVersion = ApiKeys.PRODUCE.latestVersion();
 
     public synchronized void update(String nodeId, NodeApiVersions nodeApiVersions) {
         this.nodeApiVersions.put(nodeId, nodeApiVersions);
         this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
+        this.maxProduceSupportedVersion = computeMaxProduceSupportedVersion();
+    }
+
+    private short computeMaxProduceSupportedVersion() {
+        Optional<Short> knownBrokerNodesMinSupportedVersionForProduce = this.nodeApiVersions.values().stream()
+            .filter(versions -> versions.apiVersion(ApiKeys.PRODUCE) != null) // filter out Raft controller nodes
+            .map(versions -> versions.latestUsableVersion(ApiKeys.PRODUCE))
+            .min(Short::compare);
+        return (short) Math.min(ApiKeys.PRODUCE.latestVersion(),
+            knownBrokerNodesMinSupportedVersionForProduce.orElse(ApiKeys.PRODUCE.latestVersion()));
     }
 
     public synchronized void remove(String nodeId) {
@@ -63,5 +74,8 @@ private byte computeMaxUsableProduceMagic() {
     public synchronized byte maxUsableProduceMagic() {
         return maxUsableProduceMagic;
     }
+    public synchronized short getMaxSupportedProduceVersion() {
+        return maxProduceSupportedVersion;
+    }
 
 }
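
In plain terms, the new computeMaxProduceSupportedVersion() picks the highest PRODUCE version that every known broker can accept: it takes the minimum of the per-broker usable versions and caps the result at the client's own latest version, falling back to the client's latest when no broker versions are recorded yet. Below is a standalone sketch of that reduction, using plain JDK collections in place of NodeApiVersions; the class and method names are illustrative, not part of the commit.

    import java.util.Collection;
    import java.util.List;
    import java.util.Optional;

    class MinSupportedVersionSketch {
        // clientLatest stands in for ApiKeys.PRODUCE.latestVersion();
        // brokerVersions stands in for latestUsableVersion(ApiKeys.PRODUCE) per known node.
        static short maxProduceSupportedVersion(short clientLatest, Collection<Short> brokerVersions) {
            Optional<Short> minAcrossBrokers = brokerVersions.stream().min(Short::compare);
            // No brokers known yet: optimistically assume the client's latest version.
            return (short) Math.min(clientLatest, minAcrossBrokers.orElse(clientLatest));
        }

        public static void main(String[] args) {
            // One broker still on v11 pins the result to 11, even if the others support v12.
            System.out.println(maxProduceSupportedVersion((short) 12, List.of((short) 12, (short) 11, (short) 12)));
        }
    }

Taking the minimum across brokers is the conservative choice: the producer derives a single produce version, and a request may go to any broker, so the oldest known broker sets the ceiling for all of them.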

Diff for: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

+6 -2
@@ -27,6 +27,7 @@
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MetadataSnapshot;
 import org.apache.kafka.clients.NetworkClientUtils;
+import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.KafkaException;
@@ -50,6 +51,7 @@
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.RecordBatch;
@@ -882,7 +884,7 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
                 minUsedMagic = batch.magic();
         }
         Map<String, Uuid> topicIds = getTopicIdsFromBatches(batches);
-        boolean canUseTopicId = !topicIds.entrySet().stream().anyMatch(e -> e.getValue() == Uuid.ZERO_UUID);
+        boolean canUseTopicId = apiVersions.getMaxSupportedProduceVersion() >= 12;
 
         ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
         for (ProducerBatch batch : batches) {
@@ -903,11 +905,13 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
                 tpd.find(new ProduceRequestData.TopicProduceData().setName(tp.topic()));
 
             if (tpData == null) {
-                tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
+                tpData = new ProduceRequestData.TopicProduceData();
                 tpd.add(tpData);
             }
             if (canUseTopicId) {
                 tpData.setTopicId(topicIds.get(tp.topic()));
+            } else {
+                tpData.setName(tp.topic());
             }
 
             tpData.partitionData().add(new ProduceRequestData.PartitionProduceData()
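
The key behavioral change above is the gate on canUseTopicId: instead of checking whether every batch's topic already has a non-zero topic ID in metadata, the Sender now requires that all known brokers support produce version 12 or later, the threshold this diff uses for addressing topics by ID rather than by name. A minimal sketch of the resulting either/or addressing follows; the helper and its name are hypothetical, not part of Sender.

    import java.util.Map;

    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.message.ProduceRequestData;

    class TopicAddressingSketch {
        // Mirrors the branch in sendProduceRequest: at v12+ the entry carries the
        // topic ID and leaves the name unset; below v12 it carries the name only.
        static ProduceRequestData.TopicProduceData topicData(short maxSupportedProduceVersion,
                                                             Map<String, Uuid> topicIds,
                                                             String topic) {
            ProduceRequestData.TopicProduceData tpData = new ProduceRequestData.TopicProduceData();
            if (maxSupportedProduceVersion >= 12) {
                tpData.setTopicId(topicIds.get(topic));
            } else {
                tpData.setName(topic);
            }
            return tpData;
        }
    }

Note that TopicProduceData is no longer constructed with setName() unconditionally; exactly one of the two identifiers is populated, matching the protocol version the request will be serialized with.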

Diff for: clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java

+19 -4
@@ -175,6 +175,7 @@ public class SenderTest {
     @BeforeEach
     public void setup() {
         setupWithTransactionState(null);
+        apiVersions.update("0", NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 12));
     }
 
     @AfterEach
@@ -582,8 +583,11 @@ public void testNodeLatencyStats() throws Exception {
                 new BufferPool(totalSize, batchSize, m, time, "producer-internal-metrics"));
 
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+        ApiVersions apiVersions1 = new ApiVersions();
+        apiVersions1.update("0", NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 12));
+
         Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, 1,
-                senderMetrics, time, REQUEST_TIMEOUT, 1000L, null, new ApiVersions());
+                senderMetrics, time, REQUEST_TIMEOUT, 1000L, null, apiVersions1);
 
         // Produce and send batch.
         long time1 = time.milliseconds();
@@ -2407,6 +2411,7 @@ public void testTransactionalSplitBatchAndSend() throws Exception {
 
         txnManager.beginTransaction();
         txnManager.maybeAddPartition(tp);
+        apiVersions.update("0", NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 12));
         client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp, Errors.NONE)));
         sender.runOnce();
 
@@ -2424,16 +2429,21 @@ private void testSplitBatchAndSend(TransactionManager txnManager,
         String metricGrpName = "producer-metrics";
         // Set a good compression ratio.
         CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
+        ApiVersions apiVersions = new ApiVersions();
         try (Metrics m = new Metrics()) {
             accumulator = new RecordAccumulator(logContext, batchSize, Compression.gzip().build(),
-                0, 0L, 0L, deliveryTimeoutMs, m, metricGrpName, time, new ApiVersions(), txnManager,
+                0, 0L, 0L, deliveryTimeoutMs, m, metricGrpName, time, apiVersions, txnManager,
                 new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics"));
             SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
             Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
-                senderMetrics, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
+                senderMetrics, time, REQUEST_TIMEOUT, 1000L, txnManager, apiVersions);
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
             MetadataResponse metadataUpdate1 = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap(topic, 2));
             client.prepareMetadataUpdate(metadataUpdate1);
+            metadataUpdate1.brokers().stream().forEach(node ->
+                apiVersions.update(node.idString(), NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 12))
+            );
+
             // Send the first message.
             long nowMs = time.milliseconds();
             Cluster cluster = TestUtils.singletonCluster();
@@ -3426,6 +3436,9 @@ public void testWhenProduceResponseReturnsWithALeaderShipChangeErrorAndNewLeader
             }
         }));
         Cluster startingMetadataCluster = metadata.fetch();
+        startingMetadataCluster.nodes().forEach(node ->
+            apiVersions.update(node.idString(), NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 12))
+        );
 
         // Produce to tp0/1/2, where NO_LEADER_OR_FOLLOWER with new leader info is returned for tp0/1, and tp2 is returned without errors.
         Future<RecordMetadata> futureIsProducedTp0 = appendToAccumulator(tp0, 0L, "key", "value");
@@ -3446,7 +3459,9 @@ public void testWhenProduceResponseReturnsWithALeaderShipChangeErrorAndNewLeader
         responses.put(tp0, new OffsetAndError(-1, Errors.NOT_LEADER_OR_FOLLOWER));
         responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_OR_FOLLOWER));
         responses.put(tp2, new OffsetAndError(100, Errors.NONE));
-
+        newNodes.forEach(node ->
+            apiVersions.update(node.idString(), NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 12))
+        );
         Map<TopicPartition, ProduceResponseData.LeaderIdAndEpoch> partitionLeaderInfo = new HashMap<>();
         ProduceResponseData.LeaderIdAndEpoch tp0LeaderInfo = new ProduceResponseData.LeaderIdAndEpoch();
         tp0LeaderInfo.setLeaderEpoch(tp0LeaderEpoch + 1);
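
The test updates all follow one seeding pattern: each node a test can talk to is registered in ApiVersions as supporting PRODUCE versions 0 through 12, which keeps the computed produce version at 12 for those nodes and exercises the new topic-ID path in the Sender. A small sketch of that pattern factored into a helper; the helper and its name are illustrative, not part of SenderTest.

    import org.apache.kafka.clients.ApiVersions;
    import org.apache.kafka.clients.NodeApiVersions;
    import org.apache.kafka.common.protocol.ApiKeys;

    class SeedApiVersionsSketch {
        // Register every node id as advertising PRODUCE versions [0, 12].
        static ApiVersions seededWithProduceV12(Iterable<String> nodeIds) {
            ApiVersions apiVersions = new ApiVersions();
            for (String id : nodeIds) {
                apiVersions.update(id, NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 12));
            }
            return apiVersions;
        }
    }

Tests that build their own ApiVersions instance (testNodeLatencyStats, testSplitBatchAndSend) now pass the same seeded object to the RecordAccumulator and Sender rather than a fresh new ApiVersions(), so the Sender consults the versions the test registered.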
