Skip to content

Commit 7fc0488

Browse files
committed
impl protocol and server side
1 parent 499e167 commit 7fc0488

10 files changed

Lines changed: 610 additions & 40 deletions

File tree

clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.kafka.common.requests;
1818

19+
import org.apache.kafka.common.Uuid;
1920
import org.apache.kafka.common.errors.UnsupportedVersionException;
2021
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
2122
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection;
@@ -70,6 +71,24 @@ public OffsetsForLeaderEpochRequest build(short version) {
7071
if (version < oldestAllowedVersion() || version > latestAllowedVersion())
7172
throw new UnsupportedVersionException("Cannot build " + this + " with version " + version);
7273

74+
if (version <= 4) {
75+
for (OffsetForLeaderEpochRequestData.OffsetForLeaderTopic topic : data.topics()) {
76+
if (topic.topic() == null || topic.topic().isEmpty()) {
77+
throw new UnsupportedVersionException("The broker offsets for leader api version " +
78+
version + " does require usage of topic names.");
79+
}
80+
}
81+
}
82+
83+
if (version >= 5) {
84+
for (OffsetForLeaderEpochRequestData.OffsetForLeaderTopic topic : data.topics()) {
85+
if (topic.topicId() == null || topic.topicId() == Uuid.ZERO_UUID) {
86+
throw new UnsupportedVersionException("The broker offsets for leader api version " +
87+
version + " does require usage of topic ids.");
88+
}
89+
}
90+
}
91+
7392
return new OffsetsForLeaderEpochRequest(data, version);
7493
}
7594

@@ -93,6 +112,10 @@ public int replicaId() {
93112
return data.replicaId();
94113
}
95114

115+
public static boolean useTopicIds(short version) {
116+
return version >= 5;
117+
}
118+
96119
public static OffsetsForLeaderEpochRequest parse(Readable readable, short version) {
97120
return new OffsetsForLeaderEpochRequest(new OffsetForLeaderEpochRequestData(readable, version), version);
98121
}
@@ -104,7 +127,7 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
104127
OffsetForLeaderEpochResponseData responseData = new OffsetForLeaderEpochResponseData();
105128
data.topics().forEach(topic -> {
106129
OffsetForLeaderTopicResult topicData = new OffsetForLeaderTopicResult()
107-
.setTopic(topic.topic());
130+
.setTopic(topic.topic()).setTopicId(topic.topicId());
108131
topic.partitions().forEach(partition ->
109132
topicData.partitions().add(new EpochEndOffset()
110133
.setPartition(partition.partition())

clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ public Map<Errors, Integer> errorCounts() {
6363
return errorCounts;
6464
}
6565

66+
public static boolean useTopicIds(short version) {
67+
return version >= 5;
68+
}
69+
6670
public int throttleTimeMs() {
6771
return data.throttleTimeMs();
6872
}

clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,19 @@
2929
// Followers will use this replicaId when using an older version of the protocol.
3030
//
3131
// Version 4 enables flexible versions.
32-
"validVersions": "2-4",
32+
//
33+
// Version 5 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
34+
"validVersions": "2-5",
3335
"flexibleVersions": "4+",
3436
"fields": [
3537
{ "name": "ReplicaId", "type": "int32", "versions": "3+", "default": -2, "ignorable": true, "entityType": "brokerId",
3638
"about": "The broker ID of the follower, of -1 if this request is from a consumer." },
3739
{ "name": "Topics", "type": "[]OffsetForLeaderTopic", "versions": "0+",
3840
"about": "Each topic to get offsets for.", "fields": [
39-
{ "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName",
40-
"mapKey": true, "about": "The topic name." },
41+
{ "name": "Topic", "type": "string", "versions": "0-4", "entityType": "topicName",
42+
"mapKey": true, "ignorable": true, "about": "The topic name." },
43+
{ "name": "TopicId", "type": "uuid", "versions": "5+", "ignorable": true,
44+
"about": "The unique topic ID."},
4145
{ "name": "Partitions", "type": "[]OffsetForLeaderPartition", "versions": "0+",
4246
"about": "Each partition to get offsets for.", "fields": [
4347
{ "name": "Partition", "type": "int32", "versions": "0+",

clients/src/main/resources/common/message/OffsetForLeaderEpochResponse.json

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,19 @@
2626
// Version 3 is the same as version 2.
2727
//
2828
// Version 4 enables flexible versions.
29-
"validVersions": "2-4",
29+
//
30+
// Version 5 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
31+
"validVersions": "2-5",
3032
"flexibleVersions": "4+",
3133
"fields": [
3234
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
3335
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
3436
{ "name": "Topics", "type": "[]OffsetForLeaderTopicResult", "versions": "0+",
3537
"about": "Each topic we fetched offsets for.", "fields": [
36-
{ "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName",
37-
"mapKey": true, "about": "The topic name." },
38+
{ "name": "Topic", "type": "string", "versions": "0-4", "entityType": "topicName",
39+
"mapKey": true, "ignorable": true, "about": "The topic name." },
40+
{ "name": "TopicId", "type": "uuid", "versions": "5+", "ignorable": true,
41+
"about": "The unique topic ID."},
3842
{ "name": "Partitions", "type": "[]EpochEndOffset", "versions": "0+",
3943
"about": "Each partition in the topic we fetched offsets for.", "fields": [
4044
{ "name": "ErrorCode", "type": "int16", "versions": "0+",

clients/src/test/java/org/apache/kafka/common/message/MessageTest.java

Lines changed: 15 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -378,36 +378,24 @@ public void testThrottleTimeIgnorableInDescribeGroupsResponse() throws Exception
378378
testAllMessageRoundTripsBeforeVersion((short) 1, responseWithGroupInstanceId, expectedResponse);
379379
}
380380

381-
@Test
382-
public void testOffsetForLeaderEpochVersions() throws Exception {
383-
// Version 2 adds optional current leader epoch
384-
OffsetForLeaderEpochRequestData.OffsetForLeaderPartition partitionDataNoCurrentEpoch =
385-
new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition()
386-
.setPartition(0)
387-
.setLeaderEpoch(3);
388-
OffsetForLeaderEpochRequestData.OffsetForLeaderPartition partitionDataWithCurrentEpoch =
389-
new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition()
381+
@ParameterizedTest
382+
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FOR_LEADER_EPOCH)
383+
public void testOffsetForLeaderEpochVersions(short version) throws Exception {
384+
OffsetForLeaderEpochRequestData request = new OffsetForLeaderEpochRequestData()
385+
.setReplicaId(version >= 3 ? 5 : -2);
386+
request.topics().add(
387+
new OffsetForLeaderEpochRequestData.OffsetForLeaderTopic()
388+
.setTopicId(version >= 5 ? Uuid.randomUuid() : Uuid.ZERO_UUID)
389+
.setTopic(version < 5 ? "foo" : "")
390+
.setPartitions(singletonList(
391+
new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition()
390392
.setPartition(0)
391393
.setLeaderEpoch(3)
392-
.setCurrentLeaderEpoch(5);
393-
OffsetForLeaderEpochRequestData data = new OffsetForLeaderEpochRequestData();
394-
data.topics().add(new OffsetForLeaderEpochRequestData.OffsetForLeaderTopic()
395-
.setTopic("foo")
396-
.setPartitions(singletonList(partitionDataNoCurrentEpoch)));
394+
.setCurrentLeaderEpoch(version >= 2 ? 5 : -1)
395+
))
396+
);
397397

398-
testAllMessageRoundTrips(data);
399-
short lowestVersion = ApiKeys.OFFSET_FOR_LEADER_EPOCH.oldestVersion();
400-
testAllMessageRoundTripsBetweenVersions(lowestVersion, (short) 2, partitionDataWithCurrentEpoch, partitionDataNoCurrentEpoch);
401-
testAllMessageRoundTripsFromVersion((short) 2, partitionDataWithCurrentEpoch);
402-
403-
// Version 3 adds the optional replica Id field
404-
testAllMessageRoundTripsFromVersion((short) 3, new OffsetForLeaderEpochRequestData().setReplicaId(5));
405-
testAllMessageRoundTripsBeforeVersion((short) 3,
406-
new OffsetForLeaderEpochRequestData().setReplicaId(5),
407-
new OffsetForLeaderEpochRequestData());
408-
testAllMessageRoundTripsBeforeVersion((short) 3,
409-
new OffsetForLeaderEpochRequestData().setReplicaId(5),
410-
new OffsetForLeaderEpochRequestData().setReplicaId(-2));
398+
testMessageRoundTrip(version, request, request);
411399
}
412400

413401
@ParameterizedTest

clients/src/test/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequestTest.java

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,18 @@
1616
*/
1717
package org.apache.kafka.common.requests;
1818

19+
import org.apache.kafka.common.Uuid;
1920
import org.apache.kafka.common.errors.UnsupportedVersionException;
21+
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
22+
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
23+
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic;
2024
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection;
2125
import org.apache.kafka.common.protocol.ApiKeys;
2226

2327
import org.junit.jupiter.api.Test;
2428

29+
import java.util.Collections;
30+
2531
import static org.junit.jupiter.api.Assertions.assertEquals;
2632
import static org.junit.jupiter.api.Assertions.assertThrows;
2733

@@ -51,4 +57,128 @@ public void testForFollower() {
5157
OffsetsForLeaderEpochRequest parsed = OffsetsForLeaderEpochRequest.parse(request.serialize(), version);
5258
assertEquals(replicaId, parsed.replicaId());
5359
}
60+
61+
@Test
62+
public void testVersion4BelowFailUseTopicId() {
63+
// Version 4 and below require topic names, not topic IDs
64+
OffsetForLeaderTopicCollection topics = new OffsetForLeaderTopicCollection();
65+
OffsetForLeaderTopic topic = new OffsetForLeaderTopic()
66+
.setTopicId(Uuid.randomUuid()) // Only topicId, no topic name
67+
.setPartitions(Collections.singletonList(
68+
new OffsetForLeaderPartition()
69+
.setPartition(0)
70+
.setLeaderEpoch(1)
71+
));
72+
topics.add(topic);
73+
74+
OffsetForLeaderEpochRequestData data = new OffsetForLeaderEpochRequestData()
75+
.setReplicaId(1)
76+
.setTopics(topics);
77+
78+
OffsetsForLeaderEpochRequest.Builder builder = new OffsetsForLeaderEpochRequest.Builder(
79+
(short) 2, (short) 4, data);
80+
81+
// Should throw UnsupportedVersionException when building version 4 without topic names
82+
assertThrows(UnsupportedVersionException.class, () -> builder.build((short) 4));
83+
assertThrows(UnsupportedVersionException.class, () -> builder.build((short) 3));
84+
assertThrows(UnsupportedVersionException.class, () -> builder.build((short) 2));
85+
}
86+
87+
@Test
88+
public void testVersion4SucceedsWithTopicNames() {
89+
// Version 4 should succeed when topic names are provided
90+
OffsetForLeaderTopicCollection topics = new OffsetForLeaderTopicCollection();
91+
OffsetForLeaderTopic topic = new OffsetForLeaderTopic()
92+
.setTopic("test-topic") // Topic name provided
93+
.setPartitions(Collections.singletonList(
94+
new OffsetForLeaderPartition()
95+
.setPartition(0)
96+
.setLeaderEpoch(1)
97+
));
98+
topics.add(topic);
99+
100+
OffsetForLeaderEpochRequestData data = new OffsetForLeaderEpochRequestData()
101+
.setReplicaId(1)
102+
.setTopics(topics);
103+
104+
OffsetsForLeaderEpochRequest.Builder builder = new OffsetsForLeaderEpochRequest.Builder(
105+
(short) 2, (short) 4, data);
106+
107+
// Should succeed for version 4 with topic names
108+
OffsetsForLeaderEpochRequest request = builder.build((short) 4);
109+
assertEquals(1, request.replicaId());
110+
}
111+
112+
@Test
113+
public void testVersion5RequiresTopicIds() {
114+
// Version 5 and above require topic IDs, not topic names
115+
OffsetForLeaderTopicCollection topics = new OffsetForLeaderTopicCollection();
116+
OffsetForLeaderTopic topic = new OffsetForLeaderTopic()
117+
.setTopic("test-topic") // Only topic name, no topicId
118+
.setPartitions(Collections.singletonList(
119+
new OffsetForLeaderPartition()
120+
.setPartition(0)
121+
.setLeaderEpoch(1)
122+
));
123+
topics.add(topic);
124+
125+
OffsetForLeaderEpochRequestData data = new OffsetForLeaderEpochRequestData()
126+
.setReplicaId(-1)
127+
.setTopics(topics);
128+
129+
OffsetsForLeaderEpochRequest.Builder builder = new OffsetsForLeaderEpochRequest.Builder(
130+
(short) 3, (short) 5, data);
131+
132+
// Should throw UnsupportedVersionException when building version 5 without topic IDs
133+
assertThrows(UnsupportedVersionException.class, () -> builder.build((short) 5));
134+
}
135+
136+
@Test
137+
public void testVersion5SucceedsWithTopicIds() {
138+
// Version 5 should succeed when topic IDs are provided
139+
OffsetForLeaderTopicCollection topics = new OffsetForLeaderTopicCollection();
140+
OffsetForLeaderTopic topic = new OffsetForLeaderTopic()
141+
.setTopicId(Uuid.randomUuid()) // Topic ID provided
142+
.setPartitions(Collections.singletonList(
143+
new OffsetForLeaderPartition()
144+
.setPartition(0)
145+
.setLeaderEpoch(1)
146+
));
147+
topics.add(topic);
148+
149+
OffsetForLeaderEpochRequestData data = new OffsetForLeaderEpochRequestData()
150+
.setReplicaId(-1)
151+
.setTopics(topics);
152+
153+
OffsetsForLeaderEpochRequest.Builder builder = new OffsetsForLeaderEpochRequest.Builder(
154+
(short) 3, (short) 5, data);
155+
156+
// Should succeed for version 5 with topic IDs
157+
OffsetsForLeaderEpochRequest request = builder.build((short) 5);
158+
assertEquals(-1, request.replicaId());
159+
}
160+
161+
@Test
162+
public void testVersion5RejectsZeroTopicId() {
163+
// Version 5 should reject ZERO_UUID as topic ID
164+
OffsetForLeaderTopicCollection topics = new OffsetForLeaderTopicCollection();
165+
OffsetForLeaderTopic topic = new OffsetForLeaderTopic()
166+
.setTopicId(Uuid.ZERO_UUID) // Invalid: ZERO_UUID
167+
.setPartitions(Collections.singletonList(
168+
new OffsetForLeaderPartition()
169+
.setPartition(0)
170+
.setLeaderEpoch(1)
171+
));
172+
topics.add(topic);
173+
174+
OffsetForLeaderEpochRequestData data = new OffsetForLeaderEpochRequestData()
175+
.setReplicaId(-1)
176+
.setTopics(topics);
177+
178+
OffsetsForLeaderEpochRequest.Builder builder = new OffsetsForLeaderEpochRequest.Builder(
179+
(short) 3, (short) 5, data);
180+
181+
// Should throw UnsupportedVersionException when using ZERO_UUID
182+
assertThrows(UnsupportedVersionException.class, () -> builder.build((short) 5));
183+
}
54184
}

core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2117,13 +2117,29 @@ class KafkaApis(val requestChannel: RequestChannel,
21172117
val offsetForLeaderEpoch = request.body[OffsetsForLeaderEpochRequest]
21182118
val topics = offsetForLeaderEpoch.data.topics.asScala.toSeq
21192119

2120+
// Separate topics with unknown topic IDs when using version 5+
2121+
val (knownTopics, unknownTopicIdTopics) = if (OffsetsForLeaderEpochRequest.useTopicIds(request.header.apiVersion)) {
2122+
topics.partition { offsetForLeaderTopic =>
2123+
metadataCache.getTopicName(offsetForLeaderTopic.topicId).isPresent
2124+
}
2125+
} else {
2126+
(topics, Seq.empty[OffsetForLeaderTopic])
2127+
}
2128+
21202129
// The OffsetsForLeaderEpoch API was initially only used for inter-broker communication and required
21212130
// cluster permission. With KIP-320, the consumer now also uses this API to check for log truncation
21222131
// following a leader change, so we also allow topic describe permission.
21232132
val (authorizedTopics, unauthorizedTopics) =
21242133
if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME, logIfDenied = false))
2125-
(topics, Seq.empty[OffsetForLeaderTopic])
2126-
else authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, topics)(_.topic)
2134+
(knownTopics, Seq.empty[OffsetForLeaderTopic])
2135+
else authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, knownTopics) { offsetForLeaderTopic =>
2136+
// Resolve topic name from topicId if needed for authorization
2137+
if (OffsetsForLeaderEpochRequest.useTopicIds(request.header.apiVersion)) {
2138+
metadataCache.getTopicName(offsetForLeaderTopic.topicId).get()
2139+
} else {
2140+
offsetForLeaderTopic.topic
2141+
}
2142+
}
21272143

21282144
val endOffsetsForAuthorizedPartitions = replicaManager.lastOffsetForLeaderEpoch(authorizedTopics)
21292145
val endOffsetsForUnauthorizedPartitions = unauthorizedTopics.map { offsetForLeaderTopic =>
@@ -2133,13 +2149,34 @@ class KafkaApis(val requestChannel: RequestChannel,
21332149
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
21342150
}
21352151

2152+
// Resolve topic name from topicId if needed
2153+
val topicName = if (OffsetsForLeaderEpochRequest.useTopicIds(request.header.apiVersion)) {
2154+
metadataCache.getTopicName(offsetForLeaderTopic.topicId).get()
2155+
} else {
2156+
offsetForLeaderTopic.topic
2157+
}
2158+
2159+
new OffsetForLeaderTopicResult()
2160+
.setTopic(topicName)
2161+
.setTopicId(offsetForLeaderTopic.topicId)
2162+
.setPartitions(partitions.toList.asJava)
2163+
}
2164+
2165+
val endOffsetsForUnknownTopicIdTopics = unknownTopicIdTopics.map { offsetForLeaderTopic =>
2166+
val partitions = offsetForLeaderTopic.partitions.asScala.map { offsetForLeaderPartition =>
2167+
new EpochEndOffset()
2168+
.setPartition(offsetForLeaderPartition.partition)
2169+
.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
2170+
}
2171+
21362172
new OffsetForLeaderTopicResult()
2137-
.setTopic(offsetForLeaderTopic.topic)
2173+
.setTopic("")
2174+
.setTopicId(offsetForLeaderTopic.topicId)
21382175
.setPartitions(partitions.toList.asJava)
21392176
}
21402177

21412178
val endOffsetsForAllTopics = new OffsetForLeaderTopicResultCollection(
2142-
(endOffsetsForAuthorizedPartitions ++ endOffsetsForUnauthorizedPartitions).asJava.iterator
2179+
(endOffsetsForAuthorizedPartitions ++ endOffsetsForUnauthorizedPartitions ++ endOffsetsForUnknownTopicIdTopics).asJava.iterator
21432180
)
21442181

21452182
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>

0 commit comments

Comments
 (0)