KAFKA-10551: Add topic id support to produce request and response #15968


Open · wants to merge 60 commits into base: trunk
Changes from 55 commits

Commits (60)
d3acdf0
KAFKA-10551: Add topic id support to produce request and response
OmniaGM May 7, 2024
ddeac1b
KAFKA-10551: fix compatibility with 2.6 IBP
OmniaGM May 9, 2024
a92abe6
KAFKA-10551: refactor
OmniaGM May 14, 2024
afede12
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM May 14, 2024
a8f0c91
fix test
OmniaGM May 14, 2024
8c3602b
fix integration tests
OmniaGM May 16, 2024
cba0a2d
Merge branch 'trunk' into KAFKA-10551-produceRequest
OmniaGM May 22, 2024
27ed97b
address part of the feedback
OmniaGM May 22, 2024
35dba4b
Add testing for while recreate the topic and reassignment
OmniaGM May 28, 2024
916e471
Fetch supported produce version from apiVersions.
OmniaGM Jun 11, 2024
c59b10c
Fetch supported produce version from apiVersions.
OmniaGM Jun 11, 2024
65d4a41
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Jun 12, 2024
646839d
fix
OmniaGM Jun 12, 2024
29adbd3
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Jun 18, 2024
7ed70b1
address feedback
OmniaGM Jun 18, 2024
fef8fd6
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Jun 26, 2024
1c12e48
add comment
OmniaGM Jun 26, 2024
d388d8a
fix import
OmniaGM Jun 28, 2024
a2e858c
apply spotless
OmniaGM Jun 28, 2024
30f0aa0
fix tests
OmniaGM Jul 1, 2024
1e87719
Merge remote-tracking branch 'apache' into KAFKA-10551-produceRequest
OmniaGM Jul 3, 2024
d1b026c
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Sep 17, 2024
c97212f
Address some of the feedback
OmniaGM Sep 30, 2024
dc72f92
fix filtering `entriesWithoutErrorsPerPartition` based on topicIdPart…
OmniaGM Sep 30, 2024
8b69253
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Sep 30, 2024
c3d2c24
remove maxSupportedProduceVersion
OmniaGM Oct 10, 2024
776375f
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Oct 14, 2024
f7f0a04
update test
OmniaGM Oct 15, 2024
654a967
update FetchResponseStats to use TopicIdPartition
OmniaGM Oct 15, 2024
315b6a3
address some of the feedback
OmniaGM Oct 15, 2024
ce85e4f
address feedback
OmniaGM Oct 15, 2024
048f6ff
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Oct 29, 2024
67e7bd2
feedback
OmniaGM Oct 29, 2024
270ab27
fix tests
OmniaGM Nov 6, 2024
4166eb0
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Nov 6, 2024
bed9c8e
add partitionOrException(topicIdPartition)
OmniaGM Nov 6, 2024
f8830b1
cleanup
OmniaGM Nov 6, 2024
d90580e
cleanup
OmniaGM Nov 6, 2024
33ba2b0
handle topicid zero or null when get partition
OmniaGM Nov 6, 2024
92bad70
refactor
OmniaGM Nov 6, 2024
f662cf7
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Nov 7, 2024
c57cc63
fix test
OmniaGM Nov 7, 2024
239e321
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Nov 11, 2024
44da5ba
address some of the feedback
OmniaGM Dec 3, 2024
410fd1f
add doc
OmniaGM Dec 3, 2024
4114067
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Dec 3, 2024
ba30756
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Dec 13, 2024
6e281fd
upgrade version to 13
OmniaGM Dec 13, 2024
900c460
addressing some of @junrao feedback
OmniaGM Dec 13, 2024
b091e28
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Apr 2, 2025
9b6a892
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Apr 2, 2025
0ba7431
fix merge
OmniaGM Apr 2, 2025
628371a
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Apr 3, 2025
6cd5d4e
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Apr 4, 2025
2f37078
address feedback
OmniaGM Apr 9, 2025
aa811f1
address feedback
OmniaGM Apr 16, 2025
f158714
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Apr 16, 2025
cb66c85
fix test
OmniaGM Apr 16, 2025
6932bd1
address feedback
OmniaGM Apr 16, 2025
54ecdf5
address feedback
OmniaGM Apr 16, 2025
@@ -29,6 +29,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.FencedLeaderEpochException;
@@ -604,7 +605,9 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
// This will be set by completeBatch.
Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> {
TopicPartition tp = new TopicPartition(r.name(), p.index());
// Version 13 drops the topic name and adds topic id support; metadata can still be used to map the topic id back to a topic name.
String topicName = metadata.topicNames().getOrDefault(r.topicId(), r.name());
Contributor:
Hmm, this doesn't work for responses of v11 or below, right? Perhaps we should check that topicId is not zero before using it.

Contributor Author:
I can add validation before using the topic id, but I didn't do it here because:

  • metadata has had topicIds for a long time now
  • if metadata.topicNames() has no match for topicId (for example when it is zero), topicName falls back to the default, which is the topic name provided in the response.

TopicPartition tp = new TopicPartition(topicName, p.index());
ProduceResponse.PartitionResponse partResp = new ProduceResponse.PartitionResponse(
Errors.forCode(p.errorCode()),
p.baseOffset(),
@@ -864,15 +867,21 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
return;

final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
Map<String, Uuid> topicIds = topicIdsForBatches(batches);

ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
MemoryRecords records = batch.records();
ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
Uuid topicId = topicIds.get(tp.topic());
ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic(), topicId);

if (tpData == null) {
tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
tpData = new ProduceRequestData.TopicProduceData()
.setTopicId(topicId).setName(tp.topic());
tpd.add(tpData);
}

tpData.partitionData().add(new ProduceRequestData.PartitionProduceData()
.setIndex(tp.partition())
.setRecords(records));
@@ -903,6 +912,15 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

private Map<String, Uuid> topicIdsForBatches(List<ProducerBatch> batches) {
return batches.stream()
.collect(Collectors.toMap(
b -> b.topicPartition.topic(),
b -> metadata.topicIds().getOrDefault(b.topicPartition.topic(), Uuid.ZERO_UUID),
(existing, replacement) -> replacement)
);
}

/**
* Wake up the selector associated with this send thread
*/
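The two lookups the sender-side diff relies on can be illustrated with plain maps. This is a hypothetical stand-alone sketch, not the Sender code itself: `TopicIdLookups`, `resolveTopicName`, and `topicIdsFor` are illustrative names, and `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

public class TopicIdLookups {
    public static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Mirrors metadata.topicNames().getOrDefault(r.topicId(), r.name()):
    // a v13 response carries only the topic id, which metadata resolves to a
    // name; a pre-v13 response carries a zero id, which misses the map, so
    // the name carried in the response is used unchanged.
    public static String resolveTopicName(Map<UUID, String> topicNames, UUID topicId, String nameInResponse) {
        return topicNames.getOrDefault(topicId, nameInResponse);
    }

    // Mirrors topicIdsForBatches: one id per topic, falling back to the zero
    // id when metadata does not know the topic's id yet; duplicate topics
    // across batches collapse to a single entry via the merge function.
    public static Map<String, UUID> topicIdsFor(List<String> batchTopics, Map<String, UUID> metadataIds) {
        return batchTopics.stream().collect(Collectors.toMap(
                t -> t,
                t -> metadataIds.getOrDefault(t, ZERO_UUID),
                (existing, replacement) -> replacement));
    }
}
```

The merge function matters only because several batches can target partitions of the same topic; every duplicate maps to the same id, so keeping the replacement is safe.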
@@ -17,7 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.ProduceResponseData;
@@ -96,7 +96,7 @@ public String toString() {
// Care should be taken in methods that use this field.
private volatile ProduceRequestData data;
// the partitionSizes is lazily initialized since it is used by server-side in production.
private volatile Map<TopicPartition, Integer> partitionSizes;
private volatile Map<TopicIdPartition, Integer> partitionSizes;

public ProduceRequest(ProduceRequestData produceRequestData, short version) {
super(ApiKeys.PRODUCE, version);
@@ -107,15 +107,20 @@ public ProduceRequest(ProduceRequestData produceRequestData, short version) {
}

// visible for testing
Map<TopicPartition, Integer> partitionSizes() {
Map<TopicIdPartition, Integer> partitionSizes() {
if (partitionSizes == null) {
// this method may be called by different thread (see the comment on data)
synchronized (this) {
if (partitionSizes == null) {
Map<TopicPartition, Integer> tmpPartitionSizes = new HashMap<>();
Map<TopicIdPartition, Integer> tmpPartitionSizes = new HashMap<>();
data.topicData().forEach(topicData ->
topicData.partitionData().forEach(partitionData ->
tmpPartitionSizes.compute(new TopicPartition(topicData.name(), partitionData.index()),
// Topic id and name are not always populated together in the request: on the
// server side they are never both populated, while in the produce client they are.
// To simplify initializing `TopicIdPartition`, the code uses both topic name and id.
// TopicId will be Uuid.ZERO_UUID in versions < 13, where the topic name is the main identifier.
// TopicName will be the empty string in versions >= 13, where the topic id is the main identifier.
tmpPartitionSizes.compute(new TopicIdPartition(topicData.topicId(), partitionData.index(), topicData.name()),
Member:
Will we ever need both the name and ID for this data structure? I know fetch had something where we pass in a map to convert IDs to names if needed.

Just want to make sure folks won't use this info expecting the name to be there. If we don't think it is needed, maybe just include a comment about it.

Contributor Author:
In the context of the partitionSizes method we don't need the topic name; however,

  1. I didn't want an if-else around this to decide whether to populate the topic name, since this logic handles both old and new versions.
  2. The TopicIdPartition constructor needs either a topic name to construct a TopicPartition automatically, or an already-constructed TopicPartition, which needs a topic name as well.

Member:
Ok -- it is fine if we don't need the name. But it might be good to leave a comment or something that says the name will not be present for older versions.

(ignored, previousValue) ->
partitionData.records().sizeInBytes() + (previousValue == null ? 0 : previousValue))
)
@@ -161,14 +166,14 @@ public ProduceResponse getErrorResponse(int throttleTimeMs, Throwable e) {
if (acks == 0) return null;
ApiError apiError = ApiError.fromThrowable(e);
ProduceResponseData data = new ProduceResponseData().setThrottleTimeMs(throttleTimeMs);
partitionSizes().forEach((tp, ignored) -> {
ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tp.topic());
partitionSizes().forEach((tpId, ignored) -> {
ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tpId.topic(), tpId.topicId());
if (tpr == null) {
tpr = new ProduceResponseData.TopicProduceResponse().setName(tp.topic());
tpr = new ProduceResponseData.TopicProduceResponse().setName(tpId.topic()).setTopicId(tpId.topicId());
data.responses().add(tpr);
}
tpr.partitionResponses().add(new ProduceResponseData.PartitionProduceResponse()
.setIndex(tp.partition())
.setIndex(tpId.partition())
.setRecordErrors(Collections.emptyList())
.setBaseOffset(INVALID_OFFSET)
.setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
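The accumulation in `partitionSizes()` — keyed on both id and name so one code path serves all versions — can be sketched with a small record standing in for `org.apache.kafka.common.TopicIdPartition`. Names here are hypothetical, not the Kafka classes.

```java
import java.util.Map;
import java.util.UUID;

public class PartitionSizesSketch {
    // Stand-in for TopicIdPartition: carries the id (zero UUID for
    // versions < 13) and the name (empty string for versions >= 13).
    public record TopicIdPartitionKey(UUID topicId, int partition, String topic) { }

    // Mirrors the compute() call in partitionSizes(): sum record sizes per
    // partition, treating a missing entry as zero.
    public static void addSize(Map<TopicIdPartitionKey, Integer> sizes, TopicIdPartitionKey key, int bytes) {
        sizes.compute(key, (ignored, prev) -> bytes + (prev == null ? 0 : prev));
    }
}
```

Because the record's equality covers id, partition, and name together, two batches for the same partition collapse into one accumulated size regardless of which identifier the request version populated.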
@@ -17,7 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.message.ProduceResponseData.LeaderIdAndEpoch;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -56,6 +56,7 @@
* {@link Errors#INVALID_TXN_STATE}
* {@link Errors#INVALID_PRODUCER_ID_MAPPING}
* {@link Errors#CONCURRENT_TRANSACTIONS}
* {@link Errors#UNKNOWN_TOPIC_ID}
*/
public class ProduceResponse extends AbstractResponse {
public static final long INVALID_OFFSET = -1L;
@@ -73,7 +74,7 @@ public ProduceResponse(ProduceResponseData produceResponseData) {
* @param responses Produced data grouped by topic-partition
*/
@Deprecated
public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
public ProduceResponse(Map<TopicIdPartition, PartitionResponse> responses) {
this(responses, DEFAULT_THROTTLE_TIME, Collections.emptyList());
}

@@ -84,7 +85,7 @@ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
* @param throttleTimeMs Time in milliseconds the response was throttled
*/
@Deprecated
public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) {
public ProduceResponse(Map<TopicIdPartition, PartitionResponse> responses, int throttleTimeMs) {
this(toData(responses, throttleTimeMs, Collections.emptyList()));
}

@@ -97,16 +98,16 @@ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int thr
* @param nodeEndpoints List of node endpoints
*/
@Deprecated
public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs, List<Node> nodeEndpoints) {
public ProduceResponse(Map<TopicIdPartition, PartitionResponse> responses, int throttleTimeMs, List<Node> nodeEndpoints) {
this(toData(responses, throttleTimeMs, nodeEndpoints));
}

private static ProduceResponseData toData(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs, List<Node> nodeEndpoints) {
private static ProduceResponseData toData(Map<TopicIdPartition, PartitionResponse> responses, int throttleTimeMs, List<Node> nodeEndpoints) {
ProduceResponseData data = new ProduceResponseData().setThrottleTimeMs(throttleTimeMs);
responses.forEach((tp, response) -> {
ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tp.topic());
ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tp.topic(), tp.topicId());
if (tpr == null) {
tpr = new ProduceResponseData.TopicProduceResponse().setName(tp.topic());
tpr = new ProduceResponseData.TopicProduceResponse().setName(tp.topic()).setTopicId(tp.topicId());
data.responses().add(tpr);
}
tpr.partitionResponses()
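The find-or-create pattern in `toData` (and in `getErrorResponse` earlier) is essentially grouping by a composite topic key. A plain-Java sketch under hypothetical names — `TopicKey` stands in for the generated collection's (name, id) map key, and grouping plain partition indexes replaces the real `PartitionProduceResponse` entries:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class GroupByTopicSketch {
    // Composite key: both name and id participate, matching find(name, id).
    public record TopicKey(String name, UUID topicId) { }

    // Mirrors data.responses().find(...) / add(...): group partition indexes
    // under one entry per topic key, creating the entry on first use.
    public static Map<TopicKey, List<Integer>> group(List<Map.Entry<TopicKey, Integer>> partitions) {
        Map<TopicKey, List<Integer>> byTopic = new LinkedHashMap<>();
        for (Map.Entry<TopicKey, Integer> e : partitions)
            byTopic.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        return byTopic;
    }
}
```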
@@ -1690,6 +1690,7 @@ public static ConfigDef mergeConfigs(List<ConfigDef> configDefs) {
configDefs.forEach(configDef -> configDef.configKeys().values().forEach(all::define));
return all;
}

/**
* A runnable that can throw checked exception.
*/
6 changes: 4 additions & 2 deletions clients/src/main/resources/common/message/ProduceRequest.json
@@ -46,7 +46,8 @@
// transaction V2 (KIP_890 part 2) is enabled, the produce request will also include the function for a
// AddPartitionsToTxn call. If V2 is disabled, the client can't use produce request version higher than 11 within
// a transaction.
"validVersions": "3-12",
// Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
"validVersions": "3-13",
"flexibleVersions": "9+",
"fields": [
{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "entityType": "transactionalId",
Expand All @@ -57,8 +58,9 @@
"about": "The timeout to await a response in milliseconds." },
{ "name": "TopicData", "type": "[]TopicProduceData", "versions": "0+",
"about": "Each topic to produce to.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
{ "name": "Name", "type": "string", "versions": "0-12", "entityType": "topicName", "mapKey": true, "ignorable": true,
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "13+", "mapKey": true, "ignorable": true, "about": "The unique topic ID." },
{ "name": "PartitionData", "type": "[]PartitionProduceData", "versions": "0+",
"about": "Each partition to produce to.", "fields": [
{ "name": "Index", "type": "int32", "versions": "0+",
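Because `Name` is now bounded at version 12 and `TopicId` starts at 13 (both marked `ignorable`), a writer picks the topic identifier by the negotiated version and silently drops the other field. A hypothetical sketch of that gating — not the generated message code:

```java
public class TopicIdentifierGate {
    // Per the schema above: versions 0-12 serialize the topic name field,
    // versions 13+ serialize the topic id field instead.
    public static String identifierFieldFor(short version) {
        return version >= 13 ? "TopicId" : "Name";
    }
}
```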
@@ -40,13 +40,15 @@
// Version 11 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
//
// Version 12 is the same as version 10 (KIP-890).
"validVersions": "3-12",
// Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
"validVersions": "3-13",
"flexibleVersions": "9+",
"fields": [
{ "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+",
"about": "Each produce response.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
{ "name": "Name", "type": "string", "versions": "0-12", "entityType": "topicName", "mapKey": true, "ignorable": true,
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "13+", "mapKey": true, "ignorable": true, "about": "The unique topic ID." },
{ "name": "PartitionResponses", "type": "[]PartitionProduceResponse", "versions": "0+",
"about": "Each partition that we produced to within the topic.", "fields": [
{ "name": "Index", "type": "int32", "versions": "0+",
@@ -38,6 +38,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigException;
@@ -1424,7 +1425,7 @@ public void testTransactionV2ProduceWithConcurrentTransactionError() throws Exce
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);

String topic = "foo";
TopicPartition topicPartition = new TopicPartition(topic, 0);
TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, topic);
Cluster cluster = TestUtils.singletonCluster(topic, 1);

when(ctx.sender.isRunning()).thenReturn(true);
@@ -1463,8 +1464,8 @@

client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
client.prepareResponse(produceResponse(topicPartition, 1L, Errors.CONCURRENT_TRANSACTIONS, 0, 1));
client.prepareResponse(produceResponse(topicPartition, 1L, Errors.NONE, 0, 1));
client.prepareResponse(produceResponse(topicIdPartition, 1L, Errors.CONCURRENT_TRANSACTIONS, 0, 1));
client.prepareResponse(produceResponse(topicIdPartition, 1L, Errors.NONE, 0, 1));
client.prepareResponse(endTxnResponse(Errors.NONE));

try (KafkaProducer<String, String> producer = new KafkaProducer<>(
@@ -1750,6 +1751,7 @@ public void testTransactionV2Produce() throws Exception {
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);

String topic = "foo";
Uuid topicId = Uuid.fromString("klZ9sa2rSvig6QpgGXzALT");
TopicPartition topicPartition = new TopicPartition(topic, 0);
Cluster cluster = TestUtils.singletonCluster(topic, 1);

@@ -1789,7 +1791,7 @@

client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
client.prepareResponse(produceResponse(topicPartition, 1L, Errors.NONE, 0, 1));
client.prepareResponse(produceResponse(new TopicIdPartition(topicId, topicPartition), 1L, Errors.NONE, 0, 1));
client.prepareResponse(endTxnResponse(Errors.NONE));

try (KafkaProducer<String, String> producer = new KafkaProducer<>(
@@ -2649,9 +2651,9 @@ void testDeliveryTimeoutAndLingerMsConfig() {
}

@SuppressWarnings("deprecation")
private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, int logStartOffset) {
private ProduceResponse produceResponse(TopicIdPartition topicIdPartition, long offset, Errors error, int throttleTimeMs, int logStartOffset) {
ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset);
Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = singletonMap(tp, resp);
Map<TopicIdPartition, ProduceResponse.PartitionResponse> partResp = singletonMap(topicIdPartition, resp);
return new ProduceResponse(partResp, throttleTimeMs);
}
