KAFKA-10551: Add topic id support to produce request and response #15968
base: trunk
Conversation
Force-pushed from 1805c4f to a8f0c91.
A few of the failed tests are related to this change, and I am working on fixing them.
Force-pushed from 63a6032 to 8c3602b.
I believe the failed tests are now unrelated.
My understanding is that all requests going forward will use the ID and not the name, similar to the fetch request. I believe that is what is in the PR, but the comment suggests otherwise.
@@ -610,7 +611,9 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
     // This will be set by completeBatch.
     Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
     produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> {
-        TopicPartition tp = new TopicPartition(r.name(), p.index());
+        // Version 13 drops the topic name and adds topic id support. However, metadata can be used to map the topic id back to the topic name.
+        String topicName = (r.name() == null || r.name().isEmpty()) ? metadata.topicNames().get(r.topicId()) : r.name();
What do we do if the metadata has refreshed and the topic is no longer in the metadata?
For fetch it is a bit different since we have the session logic, and can handle missing topics.
I would recommend writing through a few cases where the server and client have/don't have the topic ID to reason about the upgrade case/downgrade case/deletions/reassignments.
- If the topic has been recreated and the topic id is out of date, the client will get UNKNOWN_TOPIC_ID, and on the retry the topic id will be updated.
- If the topic has been reassigned to another broker, the client will get NOT_LEADER_OR_FOLLOWER and can then retry against the right broker.
- I am not sure what upgrade case/downgrade you refer to here. Do you mean the client and broker IBP combination? If yes, then some of these are covered in ProduceRequestTest and RequestResponseTest.

I added two test cases to cover the first two, and the producer seems to self-recover on retry.
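A minimal sketch of the retry decision described in the first two bullets (a hypothetical helper for illustration, not the PR's actual Sender code):

```java
import org.apache.kafka.common.protocol.Errors;

class ProduceRetrySketch {
    // Decide whether a failed batch should trigger a metadata refresh followed
    // by a retry; both errors below become retriable once fresh metadata
    // supplies the new topic id or the new leader.
    static boolean refreshMetadataAndRetry(Errors error) {
        switch (error) {
            case UNKNOWN_TOPIC_ID:       // topic recreated: cached topic id is stale
            case NOT_LEADER_OR_FOLLOWER: // leadership moved: wrong broker was contacted
                return true;
            default:
                return false;
        }
    }
}
```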
Yes. For the fetch request, for example, there is code to make sure that all topics have IDs before we can send the fetch request. This is a bit less of an issue now, but if we have a cluster that is running on an MV < 2.8, topics will not have IDs. So when we decide which version of produce we want to send, we want to be aware of this.
Not only that, but even if the broker supports topic IDs on all topics, we may also have a case where we need to do a rolling upgrade to get the code that supports handling the latest API version. This may be less complicated for Produce since it is a client-only API and doesn't rely on MV/IBP, so the apiVersions exchange between the client and the broker may be enough to ensure API compatibility.
We just want to confirm these upgrade paths are compatible since produce is the hot path and we don't want any (or at least not extended) downtime in the middle of an upgrade.
Hi @jolshan, sorry for taking too long to get back to you on this. I have added ApiVersions::maxSupportedProduceVersion to determine the maximum produce version supported across the cluster. Regarding the tests: the combinations of old/new producer with old/new broker are all covered by the system tests tests/kafkatest/tests/core/compatibility_test_new_broker_test.py and tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py. I am trying to add another one to test the same while deleting topics and changing leadership, but I am not sure if we really need to cover this, as these cases are covered in the integration tests. WDYT?
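A rough sketch of how such a cluster-wide produce version might be derived (only the method name comes from the comment above; the shape, the v13 boundary, and the topic-id fallback are assumptions drawn from the discussion):

```java
import java.util.Collection;

class ProduceVersionSketch {
    static final short FIRST_TOPIC_ID_VERSION = 13; // assumed boundary from this thread

    // The usable version cluster-wide is bounded by the minimum version every
    // known broker supports; if some topics still lack ids (e.g. created on
    // pre-2.8 metadata), stay on the last name-keyed version.
    static short chooseProduceVersion(Collection<Short> perBrokerMaxVersions,
                                      short clientMaxVersion,
                                      boolean allTopicsHaveIds) {
        short version = clientMaxVersion;
        for (short brokerMax : perBrokerMaxVersions) {
            version = (short) Math.min(version, brokerMax);
        }
        if (version >= FIRST_TOPIC_ID_VERSION && !allTopicsHaveIds) {
            version = (short) (FIRST_TOPIC_ID_VERSION - 1);
        }
        return version;
    }
}
```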
No problem. I've also been super busy so not sure if I would have had time to review.
As long as the tests are between an older version and a newer version (i.e. not upgrade tests, where the issue can just seem transient), this should be ok.
When I implemented the fetch change, I also did some sanity checks by looking at the logs when upgrading to confirm the change in request version without errors. I'd recommend doing this at least once as a manual test.
Thanks for the change and I will take a look.
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (resolved review thread)
@@ -1361,10 +1366,10 @@ class ReplicaManager(val config: KafkaConfig,
    */
   private def appendToLocalLog(internalTopicsAllowed: Boolean,
                                origin: AppendOrigin,
-                               entriesPerPartition: Map[TopicPartition, MemoryRecords],
+                               entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
Is there a reason to pass this data structure here if we are not using the ID to check the append at the log level?
Two reasons here:
- I didn't want to keep converting between TopicIdPartition and TopicPartition.
- KAFKA-16212 will eventually use TopicIdPartition in getPartitionOrException.
Ok -- once we start using these across the log layer it makes sense.
Did we end up doing the getPartitionOrException in this PR?
We introduced getPartitionOrException(topicIdPartition: TopicIdPartition), which verifies the topicId if provided.
I would recommend taking a look at where we are passing the topic ID through and the checks we do. If we think it is useful to ensure we are writing to the right topic, we should do it, but if it is just adding complexity, we may want to consider changing it.
I meant that in the JSON files both will be marked ignorable.
Force-pushed from 8daeb62 to 1abc2ac.
Force-pushed from 1abc2ac to 35dba4b.
Force-pushed from 79814ff to 916e471.
Force-pushed from 86c1f71 to c59b10c.
Looks like this needs a rebase or merge from master.
I will have a look into this next week.
@OmniaGM any updates here? Thanks for your patience and work so far!
Hi, sorry for the very late reply. I have been busy with too many things lately and haven't had time to look into resolving the conflicts. I'll try to work on this as soon as possible. Thanks for your patience.
@OmniaGM Would you like someone to help get this over the finish line?
I am nearly done with fixing the conflicts and should publish this week. Sorry for the late reply; I have been away on holiday for the last 2 weeks.
That's great, thanks!
@OmniaGM : Thanks for the updated PR. Left a few more comments.
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java (resolved review thread)
val topicPartition = new TopicPartition(topicName, partition.index())
if (topicName.isEmpty)
  nonExistingTopicResponses += new TopicIdPartition(topicId, topicPartition) -> new PartitionResponse(Errors.UNKNOWN_TOPIC_ID)
else if (!metadataCache.contains(topicPartition))
It seems that we don't need this since it's handled in ReplicaManager too.
The condition was there before my changes; I just moved it.
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala (resolved review thread)
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala (resolved review thread)
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala (resolved review thread)
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala (resolved review thread)
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java (resolved review thread)
@OmniaGM : Thanks for the updated PR. A few more comments. Also, are the test failures related to this PR?
private final TopicPartition tp2 = new TopicPartition("test", 2);
private static final String TOPIC_NAME = "test";
private static final Uuid TOPIC_ID = Uuid.fromString("MKXx1fIkQy2J9jXHhK8m1w");
private static final Map<String, Uuid> TOPIC_IDS = new HashMap<>(Map.of(TOPIC_NAME, TOPIC_ID));
new HashMap<>(Map.of(TOPIC_NAME, TOPIC_ID)) => Map.of(TOPIC_NAME, TOPIC_ID)?
Map.of creates an immutable map, which can't be edited later to add other topics needed for other test cases.
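For reference, a minimal illustration of the difference (placeholder values, not the test's real topic ids):

```java
import java.util.HashMap;
import java.util.Map;

public class MapOfSketch {
    public static void main(String[] args) {
        // Map.of returns an immutable map: put() throws UnsupportedOperationException.
        Map<String, String> immutable = Map.of("test", "id-1");

        // Wrapping it in a HashMap gives a mutable copy that later tests can extend.
        Map<String, String> mutable = new HashMap<>(immutable);
        mutable.put("other", "id-2"); // fine on the HashMap copy
        System.out.println(mutable);
    }
}
```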
}

@Test
public void testTransactionalSplitBatchAndSend() throws Exception {
    Uuid topicId = Uuid.fromString("2J9hK8m1wHMKjXfIkQyXx1");
    TOPIC_IDS.put("testSplitBatchAndSend", topicId);
Hmm, topicId in this test is the same as the one in testIdempotentSplitBatchAndSend. Will this cause confusion during testing since TOPIC_IDS accumulates across tests?
They are the same topic id and name. I am not sure if the topic name and id should move to the setup of the test suite, as multiple test cases need this topic. WDYT?
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (resolved review thread)
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (resolved review thread)
clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java (resolved review thread)
val topicId = topicNames.find { case (_, topicName) => topicName == topic }
  .map { case (topicId, _) => topicId }
  .getOrElse(Uuid.ZERO_UUID)
Hmm, should we only set topicId if the version is >= 13? The produce response is only keyed on topic name when the version is <= 12.
I am not sure I have the version at this point in the test to decide. Also, this is not to set the topic id but rather to call find(topicName, topicId), which can't be called without a topicId; for old versions this would be Uuid.ZERO_UUID, wouldn't it?
It seems that this test always sends produce requests with the latest version. Perhaps we could add a comment and remove if (version >= 13)?
@@ -33,8 +33,8 @@ import scala.jdk.CollectionConverters._

 class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
   val producerCount: Int = 1
-  val brokerCount: Int = 2
+  val defaultLingerMs: Int = 5;
+  val brokerCount: Int = 3
Why does the test need 3 brokers?
I think due to the reassignment test? There may be a way to do it with just 2 though.
As @jolshan mentioned, this is to make testing reassignment easier.
@@ -886,7 +888,6 @@ public void testProduceResponseVersions() throws Exception {
     testAllMessageRoundTrips(new ProduceResponseData()
         .setResponses(new ProduceResponseData.TopicProduceResponseCollection(singletonList(
             new ProduceResponseData.TopicProduceResponse()
-                .setName(topicName)
Did we remove this for version compatibility? I would think name or ID would be returned in the response?
Yes, topic name or id is added below based on the version.
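Roughly what that version-dependent population might look like (a sketch only; the v13 boundary and the setTopicId accessor on the generated data class are assumptions drawn from this thread):

```java
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ProduceResponseData;

class TopicResponseSketch {
    // Key the per-topic response on name for v <= 12 and on topic id for v >= 13.
    static ProduceResponseData.TopicProduceResponse topicResponse(
            short version, String topicName, Uuid topicId) {
        ProduceResponseData.TopicProduceResponse r =
            new ProduceResponseData.TopicProduceResponse();
        if (version >= 13) {
            r.setTopicId(topicId);   // id-keyed from v13 on (assumed)
        } else {
            r.setName(topicName);    // name-keyed up to v12
        }
        return r;
    }
}
```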
}

val reassignment = Map(
  partition0 -> Optional.of(new NewPartitionReassignment(util.Arrays.asList(1, 2))),
Hmm, the partition started with 1 replica, so we should reassign it to a single replica, right? We will need to adjust the comment below too.
- ProduceRequest/ProduceResponse: topic name and topic id will both become ignorable, following the footsteps of FetchRequest/FetchResponse.
- HostedPartition uses TopicPartition and doesn't check the topic id. It is an [OPEN QUESTION] whether we should address this in this PR or wait for KAFKA-16212, as that will update ReplicaManager::getPartition to use TopicIdPartition once we update the cache. The other option is to compare the provided topicId with the Partition's topic id and return UNKNOWN_TOPIC_ID or UNKNOWN_TOPIC_OR_PARTITION if we can't find a partition with a matching topic id.
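As a sketch of that second option (illustrative Java; the helper name and error mapping are assumptions, not the PR's actual Scala code):

```java
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.Errors;

class TopicIdValidationSketch {
    // Compare the id carried by the request against the id of the hosted
    // partition; ZERO_UUID means an older client sent no id at all.
    static Errors validate(Uuid requestedTopicId, Uuid hostedTopicId) {
        if (requestedTopicId == null || Uuid.ZERO_UUID.equals(requestedTopicId)) {
            return Errors.NONE; // no id supplied; resolve by topic name instead
        }
        return requestedTopicId.equals(hostedTopicId)
            ? Errors.NONE
            : Errors.UNKNOWN_TOPIC_ID;
    }
}
```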