-
Notifications
You must be signed in to change notification settings - Fork 14.3k
KAFKA-10551: Add topic id support to produce request and response #15968
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
d3acdf0
ddeac1b
a92abe6
afede12
a8f0c91
8c3602b
cba0a2d
27ed97b
35dba4b
916e471
c59b10c
65d4a41
646839d
29adbd3
7ed70b1
fef8fd6
1c12e48
d388d8a
a2e858c
30f0aa0
1e87719
d1b026c
c97212f
dc72f92
8b69253
c3d2c24
776375f
f7f0a04
654a967
315b6a3
ce85e4f
048f6ff
67e7bd2
270ab27
4166eb0
bed9c8e
f8830b1
d90580e
33ba2b0
92bad70
f662cf7
c57cc63
239e321
44da5ba
410fd1f
4114067
ba30756
6e281fd
900c460
b091e28
9b6a892
0ba7431
628371a
6cd5d4e
2f37078
aa811f1
f158714
cb66c85
6932bd1
54ecdf5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,7 @@ | |
package org.apache.kafka.common.requests; | ||
|
||
import org.apache.kafka.common.InvalidRecordException; | ||
import org.apache.kafka.common.TopicPartition; | ||
import org.apache.kafka.common.TopicIdPartition; | ||
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; | ||
import org.apache.kafka.common.message.ProduceRequestData; | ||
import org.apache.kafka.common.message.ProduceResponseData; | ||
|
@@ -96,7 +96,7 @@ public String toString() { | |
// Care should be taken in methods that use this field. | ||
private volatile ProduceRequestData data; | ||
// the partitionSizes is lazily initialized since it is used by server-side in production. | ||
private volatile Map<TopicPartition, Integer> partitionSizes; | ||
private volatile Map<TopicIdPartition, Integer> partitionSizes; | ||
|
||
public ProduceRequest(ProduceRequestData produceRequestData, short version) { | ||
super(ApiKeys.PRODUCE, version); | ||
|
@@ -107,15 +107,20 @@ public ProduceRequest(ProduceRequestData produceRequestData, short version) { | |
} | ||
|
||
// visible for testing | ||
Map<TopicPartition, Integer> partitionSizes() { | ||
Map<TopicIdPartition, Integer> partitionSizes() { | ||
if (partitionSizes == null) { | ||
// this method may be called by different thread (see the comment on data) | ||
synchronized (this) { | ||
if (partitionSizes == null) { | ||
Map<TopicPartition, Integer> tmpPartitionSizes = new HashMap<>(); | ||
Map<TopicIdPartition, Integer> tmpPartitionSizes = new HashMap<>(); | ||
data.topicData().forEach(topicData -> | ||
topicData.partitionData().forEach(partitionData -> | ||
tmpPartitionSizes.compute(new TopicPartition(topicData.name(), partitionData.index()), | ||
// While topic id and name might not be populated at the same time in the request all the time; | ||
// for example on server side they will never be populated together while in produce client they will be, | ||
// to simplify initializing `TopicIdPartition` the code will use both topic name and id. | ||
// TopicId will be Uuid.ZERO_UUID in versions < 13 and topic name will be used as main identifier of topic partition. | ||
// TopicName will be empty string in versions >= 13 and topic id will be used as the main identifier. | ||
tmpPartitionSizes.compute(new TopicIdPartition(topicData.topicId(), partitionData.index(), topicData.name()), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will we ever need the name and ID for this data structure? I know fetch had something where we pass in a map to convert IDs to names if needed. Just want to make sure folks won't use this info expecting the name to be there. If we don't think it is needed, maybe just include a comment about it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the context of
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok -- it is fine if we don't need the name. But it might be good to leave a comment or something that says the name will not be present for older versions. |
||
(ignored, previousValue) -> | ||
partitionData.records().sizeInBytes() + (previousValue == null ? 0 : previousValue)) | ||
) | ||
|
@@ -161,14 +166,14 @@ public ProduceResponse getErrorResponse(int throttleTimeMs, Throwable e) { | |
if (acks == 0) return null; | ||
ApiError apiError = ApiError.fromThrowable(e); | ||
ProduceResponseData data = new ProduceResponseData().setThrottleTimeMs(throttleTimeMs); | ||
partitionSizes().forEach((tp, ignored) -> { | ||
ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tp.topic()); | ||
partitionSizes().forEach((tpId, ignored) -> { | ||
ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tpId.topic(), tpId.topicId()); | ||
if (tpr == null) { | ||
tpr = new ProduceResponseData.TopicProduceResponse().setName(tp.topic()); | ||
tpr = new ProduceResponseData.TopicProduceResponse().setName(tpId.topic()).setTopicId(tpId.topicId()); | ||
data.responses().add(tpr); | ||
} | ||
tpr.partitionResponses().add(new ProduceResponseData.PartitionProduceResponse() | ||
.setIndex(tp.partition()) | ||
.setIndex(tpId.partition()) | ||
.setRecordErrors(Collections.emptyList()) | ||
.setBaseOffset(INVALID_OFFSET) | ||
.setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, this doesn't work for responses of V11 or below, right? Perhaps we should check topicId is not zero before using it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can add validation to check before using topic id however I didn't do this here as
metadata.topicNames()
doesn't have what matchtopicId
if it is zero thentopicName
will fall back to default which is topicname
provided in the request.