KAFKA-10551: Add topic id support to produce request and response #15968


Open · wants to merge 60 commits into trunk

Changes from 6 commits

Commits (60)
d3acdf0
KAFKA-10551: Add topic id support to produce request and response
OmniaGM May 7, 2024
ddeac1b
KAFKA-10551: fix compatibility with 2.6 IBP
OmniaGM May 9, 2024
a92abe6
KAFKA-10551: refactor
OmniaGM May 14, 2024
afede12
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM May 14, 2024
a8f0c91
fix test
OmniaGM May 14, 2024
8c3602b
fix integration tests
OmniaGM May 16, 2024
cba0a2d
Merge branch 'trunk' into KAFKA-10551-produceRequest
OmniaGM May 22, 2024
27ed97b
address part of the feedback
OmniaGM May 22, 2024
35dba4b
Add testing for while recreate the topic and reassignment
OmniaGM May 28, 2024
916e471
Fetch supported produce version from apiVersions.
OmniaGM Jun 11, 2024
c59b10c
Fetch supported produce version from apiVersions.
OmniaGM Jun 11, 2024
65d4a41
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Jun 12, 2024
646839d
fix
OmniaGM Jun 12, 2024
29adbd3
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Jun 18, 2024
7ed70b1
address feedback
OmniaGM Jun 18, 2024
fef8fd6
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Jun 26, 2024
1c12e48
add comment
OmniaGM Jun 26, 2024
d388d8a
fix import
OmniaGM Jun 28, 2024
a2e858c
apply spotless
OmniaGM Jun 28, 2024
30f0aa0
fix tests
OmniaGM Jul 1, 2024
1e87719
Merge remote-tracking branch 'apache' into KAFKA-10551-produceRequest
OmniaGM Jul 3, 2024
d1b026c
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Sep 17, 2024
c97212f
Address some of the feedback
OmniaGM Sep 30, 2024
dc72f92
fix filtering `entriesWithoutErrorsPerPartition` based on topicIdPart…
OmniaGM Sep 30, 2024
8b69253
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Sep 30, 2024
c3d2c24
remove maxSupportedProduceVersion
OmniaGM Oct 10, 2024
776375f
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Oct 14, 2024
f7f0a04
update test
OmniaGM Oct 15, 2024
654a967
update FetchResponseStats to use TopicIdPartition
OmniaGM Oct 15, 2024
315b6a3
address some of the feedback
OmniaGM Oct 15, 2024
ce85e4f
address feedback
OmniaGM Oct 15, 2024
048f6ff
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Oct 29, 2024
67e7bd2
feedback
OmniaGM Oct 29, 2024
270ab27
fix tests
OmniaGM Nov 6, 2024
4166eb0
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Nov 6, 2024
bed9c8e
add partitionOrException(topicIdPartition)
OmniaGM Nov 6, 2024
f8830b1
cleanup
OmniaGM Nov 6, 2024
d90580e
cleanup
OmniaGM Nov 6, 2024
33ba2b0
handle topicid zero or null when get partition
OmniaGM Nov 6, 2024
92bad70
refactor
OmniaGM Nov 6, 2024
f662cf7
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Nov 7, 2024
c57cc63
fix test
OmniaGM Nov 7, 2024
239e321
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Nov 11, 2024
44da5ba
address some of the feedback
OmniaGM Dec 3, 2024
410fd1f
add doc
OmniaGM Dec 3, 2024
4114067
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Dec 3, 2024
ba30756
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Dec 13, 2024
6e281fd
upgrade version to 13
OmniaGM Dec 13, 2024
900c460
addressing some of @junrao feedback
OmniaGM Dec 13, 2024
b091e28
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Apr 2, 2025
9b6a892
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Apr 2, 2025
0ba7431
fix merge
OmniaGM Apr 2, 2025
628371a
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Apr 3, 2025
6cd5d4e
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Apr 4, 2025
2f37078
address feedback
OmniaGM Apr 9, 2025
aa811f1
address feedback
OmniaGM Apr 16, 2025
f158714
Merge remote-tracking branch 'apache/trunk' into KAFKA-10551-produceR…
OmniaGM Apr 16, 2025
cb66c85
fix test
OmniaGM Apr 16, 2025
6932bd1
address feedback
OmniaGM Apr 16, 2025
54ecdf5
address feedback
OmniaGM Apr 16, 2025
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.FencedLeaderEpochException;
@@ -610,7 +611,9 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
// This will be set by completeBatch.
Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> {
TopicPartition tp = new TopicPartition(r.name(), p.index());
// Version 12 drops the topic name and adds the topic ID. Metadata can be used to map the topic ID back to the topic name.
String topicName = (r.name() == null || r.name().isEmpty()) ? metadata.topicNames().get(r.topicId()) : r.name();
Member
What do we do if the metadata has refreshed and the topic is no longer in it?
For fetch it is a bit different, since we have the session logic and can handle missing topics.

I would recommend working through a few cases where the server and client have/don't have the topic ID, to reason about the upgrade, downgrade, deletion, and reassignment cases.

Contributor Author
@OmniaGM OmniaGM May 28, 2024

  • If the topic has been recreated and the topic ID is out of date, the client will get UNKNOWN_TOPIC_ID, and the topic ID will be updated on retry.
  • If the topic has been reassigned to another broker, the client will get NOT_LEADER_OR_FOLLOWER and can then retry against the right broker.
  • I'm not sure which upgrade/downgrade case you're referring to here. Do you mean the client and broker IBP combination? If so, some of these are covered in ProduceRequestTest and RequestResponseTest.

I added two test cases to cover the first two, and the producer seems to self-recover on retry.
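The two recovery paths described above can be sketched standalone. The error names below are real Kafka error codes, but the handler itself is a hypothetical simplification of the producer's retry logic (the real client handles this in Sender#completeBatch):

```java
import java.util.Set;

public class ProduceErrorRetrySketch {
    // Hypothetical, simplified classification of the two errors discussed above.
    static final Set<String> METADATA_REFRESH_ERRORS = Set.of(
            "UNKNOWN_TOPIC_ID",        // topic recreated: refresh metadata to pick up the new ID
            "NOT_LEADER_OR_FOLLOWER"); // partition moved: refresh metadata to find the new leader

    static boolean shouldRefreshMetadataAndRetry(String error) {
        return METADATA_REFRESH_ERRORS.contains(error);
    }

    public static void main(String[] args) {
        System.out.println(shouldRefreshMetadataAndRetry("UNKNOWN_TOPIC_ID"));  // true
        System.out.println(shouldRefreshMetadataAndRetry("INVALID_REQUEST"));   // false
    }
}
```

In both cases the produce request is retried after a metadata refresh, which is why the producer appears to self-recover in the tests.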

Member
@jolshan jolshan May 31, 2024

Yes. For the fetch request, for example, there is code to make sure that all topics have IDs before we can send the fetch request. This is a bit less of an issue now, but if we have a cluster that is running on MV < 2.8, topics will not have IDs. So when we decide which version of produce we want to send, we need to be aware of this.

Not only that, but even if the broker supports topic IDs on all topics, we may have a case where we need to do a rolling upgrade to get the code that supports handling the latest API version. This may be less complicated for Produce, since it is a client-only API and doesn't rely on MV/IBP, so the apiVersions exchange between the client and the broker may be enough to ensure API compatibility.

We just want to confirm these upgrade paths are compatible, since produce is the hot path and we don't want any (or at least not extended) downtime in the middle of an upgrade.

Contributor Author
@OmniaGM OmniaGM Jun 11, 2024

Hi @jolshan, sorry for taking so long to get back to you on this. I have added ApiVersions::maxSupportedProduceVersion to determine the maximum produce version supported by the cluster. The combinations of old/new producer with old/new broker are all covered by the system tests tests/kafkatest/tests/core/compatibility_test_new_broker_test.py and tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py. I am trying to add another one that tests the same while deleting topics and changing leadership, but I am not sure we really need to cover this, as those cases are covered in the integration tests. WDYT?
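The negotiation idea behind the helper mentioned above can be illustrated standalone: the client can only use a produce version that every broker it talks to advertises, so the usable maximum is the minimum of the per-broker maxima. The code below is a hypothetical sketch mirroring the shape of ApiVersions::maxSupportedProduceVersion, not the actual client implementation (a later commit in this PR removes that helper):

```java
import java.util.Map;

public class ProduceVersionSketch {
    static final short LATEST_PRODUCE_VERSION = 12; // assumed latest version for this sketch

    // Given each broker's advertised max produce version (from its apiVersions
    // response), pick the highest version the client can safely use with all of them.
    static short maxSupportedProduceVersion(Map<Integer, Short> maxVersionByBroker) {
        short max = LATEST_PRODUCE_VERSION;
        for (short brokerMax : maxVersionByBroker.values()) {
            if (brokerMax < max) {
                max = brokerMax;
            }
        }
        return max;
    }

    public static void main(String[] args) {
        // One broker still advertising an older max caps the whole cluster.
        System.out.println(maxSupportedProduceVersion(Map.of(0, (short) 12, 1, (short) 11))); // 11
    }
}
```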

Member

No problem. I've also been super busy, so I'm not sure I would have had time to review.

As long as the tests are between an older version and a newer version (i.e. not upgrade tests, where the issue can just seem transient), then this should be ok.

When I implemented the fetch change, I also did some sanity checks by looking at the logs when upgrading, to see the request version change without errors. I'd recommend doing this at least once as a manual test.

Thanks for the change; I will take a look.

TopicPartition tp = new TopicPartition(topicName, p.index());
ProduceResponse.PartitionResponse partResp = new ProduceResponse.PartitionResponse(
Errors.forCode(p.errorCode()),
p.baseOffset(),
@@ -877,6 +880,9 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
if (batch.magic() < minUsedMagic)
minUsedMagic = batch.magic();
}
Map<String, Uuid> topicIds = getTopicIdsFromBatches(batches);
boolean canUseTopicId = topicIds.values().stream().noneMatch(id -> id == Uuid.ZERO_UUID);

ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
@@ -891,11 +897,15 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
// which is supporting the new magic version to one which doesn't, then we will need to convert.
if (!records.hasMatchingMagic(minUsedMagic))
records = batch.records().downConvert(minUsedMagic, 0, time).records();
ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
if (tpData == null) {
tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
tpd.add(tpData);
Optional<ProduceRequestData.TopicProduceData> topicProduceData = canUseTopicId ?
Optional.ofNullable(tpd.find(tp.topic(), topicIds.get(tp.topic()))) :
tpd.stream().filter(data -> data.name().equals(tp.topic())).findFirst();

// Only create and register a new entry if this topic has not been seen yet.
ProduceRequestData.TopicProduceData tpData = topicProduceData.orElseGet(() -> {
ProduceRequestData.TopicProduceData created = new ProduceRequestData.TopicProduceData().setName(tp.topic());
tpd.add(created);
return created;
});
if (canUseTopicId) {
tpData.setTopicId(topicIds.get(tp.topic()));
}
tpData.partitionData().add(new ProduceRequestData.PartitionProduceData()
.setIndex(tp.partition())
.setRecords(records));
@@ -922,6 +932,15 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

private Map<String, Uuid> getTopicIdsFromBatches(List<ProducerBatch> batches) {
return batches.stream()
.collect(Collectors.toMap(
b -> b.topicPartition.topic(),
b -> metadata.topicIds().getOrDefault(b.topicPartition.topic(), Uuid.ZERO_UUID),
(existing, replacement) -> replacement)
);
}

/**
* Wake up the selector associated with this send thread
*/
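The gating logic added in this hunk — topic IDs are only sent on the wire when every topic in the request has a non-zero ID — can be illustrated standalone. The UUID sentinel below stands in for Kafka's Uuid.ZERO_UUID, and the map mirrors what getTopicIdsFromBatches produces; this is a sketch, not the Sender code itself:

```java
import java.util.Map;
import java.util.UUID;

public class TopicIdGatingSketch {
    // Stand-in for org.apache.kafka.common.Uuid.ZERO_UUID.
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Topic IDs may only be used if every topic in the request has one;
    // a ZERO_UUID entry means the topic has no ID (e.g. pre-2.8 metadata).
    static boolean canUseTopicId(Map<String, UUID> topicIds) {
        return topicIds.values().stream().noneMatch(ZERO_UUID::equals);
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        System.out.println(canUseTopicId(Map.of("topic-a", id)));                        // true
        System.out.println(canUseTopicId(Map.of("topic-a", id, "topic-b", ZERO_UUID))); // false
    }
}
```

A single ID-less topic downgrades the whole request to a name-based version, which keeps mixed-metadata batches working during upgrades.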
@@ -17,7 +17,8 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.ProduceResponseData;
@@ -51,13 +52,20 @@ public static Builder forMagic(byte magic, ProduceRequestData data) {
if (magic < RecordBatch.MAGIC_VALUE_V2) {
minVersion = 2;
maxVersion = 2;
} else if (canNotSupportTopicId(data)) {
minVersion = 3;
maxVersion = 11;
} else {
minVersion = 3;
maxVersion = ApiKeys.PRODUCE.latestVersion();
}
return new Builder(minVersion, maxVersion, data);
}

private static boolean canNotSupportTopicId(ProduceRequestData data) {
return data.topicData().stream().anyMatch(d -> d.topicId() == null || d.topicId() == Uuid.ZERO_UUID);
}

public static Builder forCurrentMagic(ProduceRequestData data) {
return forMagic(RecordBatch.CURRENT_MAGIC_VALUE, data);
}
@@ -117,7 +125,7 @@ public String toString() {
// Care should be taken in methods that use this field.
private volatile ProduceRequestData data;
// the partitionSizes is lazily initialized since it is used by server-side in production.
private volatile Map<TopicPartition, Integer> partitionSizes;
private volatile Map<TopicIdPartition, Integer> partitionSizes;

public ProduceRequest(ProduceRequestData produceRequestData, short version) {
super(ApiKeys.PRODUCE, version);
@@ -128,15 +136,15 @@ public ProduceRequest(ProduceRequestData produceRequestData, short version) {
}

// visible for testing
Map<TopicPartition, Integer> partitionSizes() {
Map<TopicIdPartition, Integer> partitionSizes() {
if (partitionSizes == null) {
// this method may be called by different thread (see the comment on data)
synchronized (this) {
if (partitionSizes == null) {
Map<TopicPartition, Integer> tmpPartitionSizes = new HashMap<>();
Map<TopicIdPartition, Integer> tmpPartitionSizes = new HashMap<>();
data.topicData().forEach(topicData ->
topicData.partitionData().forEach(partitionData ->
tmpPartitionSizes.compute(new TopicPartition(topicData.name(), partitionData.index()),
tmpPartitionSizes.compute(new TopicIdPartition(topicData.topicId(), partitionData.index(), topicData.name()),
Member

Will we ever need both the name and the ID in this data structure? I know fetch had something where we pass in a map to convert IDs to names if needed.

Just want to make sure folks won't use this info expecting the name to be there. If we don't think it is needed, maybe just include a comment about it.

Contributor Author

In the context of the partitionSizes method we don't need the topic name; however:

  1. I didn't want an if-else around this to decide whether to populate the topic name, as this logic handles both old and new versions.
  2. The TopicIdPartition constructor needs either a topic name, to construct the TopicPartition automatically, or an already-constructed TopicPartition, which needs the topic name as well.

Member

Ok -- it is fine if we don't need the name. But it might be good to leave a comment or something that says the name will not be present for older versions.

(ignored, previousValue) ->
partitionData.records().sizeInBytes() + (previousValue == null ? 0 : previousValue))
)
@@ -183,9 +191,9 @@ public ProduceResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ApiError apiError = ApiError.fromThrowable(e);
ProduceResponseData data = new ProduceResponseData().setThrottleTimeMs(throttleTimeMs);
partitionSizes().forEach((tp, ignored) -> {
ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tp.topic());
ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tp.topic(), tp.topicId());
if (tpr == null) {
tpr = new ProduceResponseData.TopicProduceResponse().setName(tp.topic());
tpr = new ProduceResponseData.TopicProduceResponse().setName(tp.topic()).setTopicId(tp.topicId());
data.responses().add(tpr);
}
tpr.partitionResponses().add(new ProduceResponseData.PartitionProduceResponse()
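The version-range rules in forMagic above (magic below v2 pins the request to version 2; any topic without an ID caps the max at 11; otherwise the latest version is allowed) can be summarised in a small standalone helper. The method shape and names here are illustrative, not Kafka's API; only the version boundaries come from the diff:

```java
import java.util.Arrays;

public class ProduceVersionRangeSketch {
    static final short LATEST_VERSION = 12; // ApiKeys.PRODUCE.latestVersion() at this point in the PR

    // Returns {minVersion, maxVersion} for a produce request, per the rules in forMagic.
    static short[] versionRange(byte magic, boolean allTopicsHaveIds) {
        if (magic < 2) {                    // pre-v2 record format: only version 2 works
            return new short[] {2, 2};
        } else if (!allTopicsHaveIds) {     // some topic lacks an ID: stay below version 12
            return new short[] {3, 11};
        } else {                            // all topics have IDs: full range available
            return new short[] {3, LATEST_VERSION};
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(versionRange((byte) 2, false))); // [3, 11]
    }
}
```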
@@ -17,7 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.message.ProduceResponseData.LeaderIdAndEpoch;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -33,6 +33,7 @@
import java.util.Objects;
import java.util.stream.Collectors;


/**
* This wrapper supports both v0 and v8 of ProduceResponse.
*
@@ -72,7 +73,7 @@ public ProduceResponse(ProduceResponseData produceResponseData) {
* @param responses Produced data grouped by topic-partition
*/
@Deprecated
public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
public ProduceResponse(Map<TopicIdPartition, PartitionResponse> responses) {
this(responses, DEFAULT_THROTTLE_TIME, Collections.emptyList());
}

@@ -83,7 +84,7 @@ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
* @param throttleTimeMs Time in milliseconds the response was throttled
*/
@Deprecated
public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) {
public ProduceResponse(Map<TopicIdPartition, PartitionResponse> responses, int throttleTimeMs) {
this(toData(responses, throttleTimeMs, Collections.emptyList()));
}

@@ -96,16 +97,16 @@ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int thr
* @param nodeEndpoints List of node endpoints
*/
@Deprecated
public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs, List<Node> nodeEndpoints) {
public ProduceResponse(Map<TopicIdPartition, PartitionResponse> responses, int throttleTimeMs, List<Node> nodeEndpoints) {
this(toData(responses, throttleTimeMs, nodeEndpoints));
}

private static ProduceResponseData toData(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs, List<Node> nodeEndpoints) {
private static ProduceResponseData toData(Map<TopicIdPartition, PartitionResponse> responses, int throttleTimeMs, List<Node> nodeEndpoints) {
ProduceResponseData data = new ProduceResponseData().setThrottleTimeMs(throttleTimeMs);
responses.forEach((tp, response) -> {
ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tp.topic());
ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tp.topic(), tp.topicId());
if (tpr == null) {
tpr = new ProduceResponseData.TopicProduceResponse().setName(tp.topic());
tpr = new ProduceResponseData.TopicProduceResponse().setName(tp.topic()).setTopicId(tp.topicId());
data.responses().add(tpr);
}
tpr.partitionResponses()
@@ -1654,6 +1654,15 @@ public static void require(boolean requirement) {
throw new IllegalArgumentException("requirement failed");
}

/**
* Convert a map's keys to another type.
*/
public static <K1, K2, V> Map<K2, V> convertKeys(Map<K1, V> originalMap, Function<K1, K2> converter) {
Map<K2, V> newMap = new HashMap<>();
originalMap.forEach((key, value) -> newMap.put(converter.apply(key), value));
return newMap;
}

/**
* A runnable that can throw checked exception.
*/
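A usage sketch of the convertKeys helper added above — here it turns a name-keyed size map into an ID-keyed one, which is the kind of key migration this PR needs. The method body is copied so the example is self-contained (the real one lives in org.apache.kafka.common.utils.Utils); the data values are made up:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ConvertKeysExample {
    // Copy of the convertKeys utility from the diff, for a self-contained example.
    static <K1, K2, V> Map<K2, V> convertKeys(Map<K1, V> originalMap, Function<K1, K2> converter) {
        Map<K2, V> newMap = new HashMap<>();
        originalMap.forEach((key, value) -> newMap.put(converter.apply(key), value));
        return newMap;
    }

    public static void main(String[] args) {
        // Hypothetical per-topic byte counts keyed by name.
        Map<String, Integer> sizesByName = new HashMap<>();
        sizesByName.put("topic-a", 100);
        sizesByName.put("topic-b", 200);

        // Re-key by a derived identifier (upper-cased name stands in for a topic ID lookup).
        Map<String, Integer> rekeyed = convertKeys(sizesByName, String::toUpperCase);
        System.out.println(rekeyed.get("TOPIC-A")); // 100
    }
}
```

Note the helper assumes the converter is injective; if two keys map to the same new key, the later entry silently wins.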
6 changes: 4 additions & 2 deletions clients/src/main/resources/common/message/ProduceRequest.json
@@ -37,7 +37,8 @@
// Version 10 is the same as version 9 (KIP-951).
//
// Version 11 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-11",
// Version 12 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
"validVersions": "0-12",
"deprecatedVersions": "0-6",
"flexibleVersions": "9+",
"fields": [
@@ -49,8 +50,9 @@
"about": "The timeout to await a response in milliseconds." },
{ "name": "TopicData", "type": "[]TopicProduceData", "versions": "0+",
"about": "Each topic to produce to.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
{ "name": "Name", "type": "string", "versions": "0-11", "entityType": "topicName", "mapKey": true, "ignorable": true,
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "12+", "mapKey": true, "ignorable": true, "about": "The unique topic ID." },
{ "name": "PartitionData", "type": "[]PartitionProduceData", "versions": "0+",
"about": "Each partition to produce to.", "fields": [
{ "name": "Index", "type": "int32", "versions": "0+",
@@ -36,13 +36,15 @@
// Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields (KIP-951)
//
// Version 11 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-11",
// Version 12 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
"validVersions": "0-12",
"flexibleVersions": "9+",
"fields": [
{ "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+",
"about": "Each produce response", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
{ "name": "Name", "type": "string", "versions": "0-11", "entityType": "topicName", "mapKey": true, "ignorable": true,
"about": "The topic name" },
{ "name": "TopicId", "type": "uuid", "versions": "12+", "mapKey": true, "ignorable": true, "about": "The unique topic ID." },
{ "name": "PartitionResponses", "type": "[]PartitionProduceResponse", "versions": "0+",
"about": "Each partition that we produced to within the topic.", "fields": [
{ "name": "Index", "type": "int32", "versions": "0+",