Skip to content

Commit 6dde57d

Browse files
authored
[kafka] Handle Kafka API_VERSIONS request (#547)
1 parent 8af721f commit 6dde57d

File tree

4 files changed

+390
-3
lines changed

4 files changed

+390
-3
lines changed

fluss-kafka/src/main/java/com/alibaba/fluss/kafka/KafkaRequest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.common.protocol.ApiKeys;
2424
import org.apache.kafka.common.protocol.ApiMessage;
2525
import org.apache.kafka.common.protocol.ByteBufferAccessor;
26+
import org.apache.kafka.common.protocol.Message;
2627
import org.apache.kafka.common.protocol.ObjectSerializationCache;
2728
import org.apache.kafka.common.requests.AbstractRequest;
2829
import org.apache.kafka.common.requests.AbstractResponse;
@@ -132,14 +133,17 @@ public ByteBuf serialize() {
132133
private ByteBuf serialize(AbstractResponse response) {
133134
final ObjectSerializationCache cache = new ObjectSerializationCache();
134135
ResponseHeader responseHeader = header.toResponseHeader();
135-
int headerSize = responseHeader.size();
136+
short headerVersion = responseHeader.headerVersion();
137+
short apiVersion = request.version();
138+
Message headerData = responseHeader.data();
139+
int headerSize = headerData.size(cache, headerVersion);
136140
ApiMessage apiMessage = response.data();
137141
int messageSize = apiMessage.size(cache, apiVersion);
138142
final ByteBuf buffer = ctx.alloc().buffer(headerSize + messageSize);
139143
buffer.writerIndex(headerSize + messageSize);
140144
final ByteBuffer nioBuffer = buffer.nioBuffer();
141145
final ByteBufferAccessor writable = new ByteBufferAccessor(nioBuffer);
142-
responseHeader.data().write(writable, cache, apiVersion);
146+
headerData.write(writable, cache, headerVersion);
143147
apiMessage.write(writable, cache, apiVersion);
144148
return buffer;
145149
}

fluss-kafka/src/main/java/com/alibaba/fluss/kafka/KafkaRequestHandler.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@
1919
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext;
2020

2121
import org.apache.kafka.common.errors.LeaderNotAvailableException;
22+
import org.apache.kafka.common.message.ApiVersionsResponseData;
23+
import org.apache.kafka.common.protocol.ApiKeys;
24+
import org.apache.kafka.common.protocol.Errors;
25+
import org.apache.kafka.common.record.RecordBatch;
26+
import org.apache.kafka.common.requests.ApiVersionsResponse;
2227
import org.slf4j.Logger;
2328
import org.slf4j.LoggerFactory;
2429

@@ -52,7 +57,34 @@ protected void handleInactive(KafkaRequest request) {
5257
}
5358

5459
@Override
55-
protected void handleApiVersionsRequest(KafkaRequest request) {}
60+
protected void handleApiVersionsRequest(KafkaRequest request) {
61+
short apiVersion = request.apiVersion();
62+
if (!ApiKeys.API_VERSIONS.isVersionSupported(apiVersion)) {
63+
request.fail(Errors.UNSUPPORTED_VERSION.exception());
64+
return;
65+
}
66+
ApiVersionsResponseData data = new ApiVersionsResponseData();
67+
for (ApiKeys apiKey : ApiKeys.values()) {
68+
if (apiKey.minRequiredInterBrokerMagic <= RecordBatch.CURRENT_MAGIC_VALUE) {
69+
ApiVersionsResponseData.ApiVersion apiVersionData =
70+
new ApiVersionsResponseData.ApiVersion()
71+
.setApiKey(apiKey.id)
72+
.setMinVersion(apiKey.oldestVersion())
73+
.setMaxVersion(apiKey.latestVersion());
74+
if (apiKey.equals(ApiKeys.METADATA)) {
75+
// Not support TopicId
76+
short v = apiKey.latestVersion() > 11 ? 11 : apiKey.latestVersion();
77+
apiVersionData.setMaxVersion(v);
78+
} else if (apiKey.equals(ApiKeys.FETCH)) {
79+
// Not support TopicId
80+
short v = apiKey.latestVersion() > 12 ? 12 : apiKey.latestVersion();
81+
apiVersionData.setMaxVersion(v);
82+
}
83+
data.apiKeys().add(apiVersionData);
84+
}
85+
}
86+
request.complete(new ApiVersionsResponse(data));
87+
}
5688

5789
@Override
5890
protected void handleProducerRequest(KafkaRequest request) {}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.kafka;
18+
19+
import com.alibaba.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
20+
import com.alibaba.fluss.shaded.netty4.io.netty.buffer.ByteBufAllocator;
21+
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext;
22+
23+
import org.apache.kafka.common.protocol.ApiKeys;
24+
import org.apache.kafka.common.protocol.Errors;
25+
import org.apache.kafka.common.requests.AbstractResponse;
26+
import org.apache.kafka.common.requests.ApiVersionsRequest;
27+
import org.apache.kafka.common.requests.ApiVersionsResponse;
28+
import org.apache.kafka.common.requests.RequestHeader;
29+
import org.junit.jupiter.api.Test;
30+
31+
import java.util.Map;
32+
import java.util.concurrent.CompletableFuture;
33+
34+
import static org.assertj.core.api.Assertions.assertThat;
35+
36+
public class KafkaRequestHandlerTest {
37+
38+
@Test
39+
public void testKafkaApiVersionsNotSupported() {
40+
KafkaRequestHandler handler = new KafkaRequestHandler();
41+
short latestVersion = ApiKeys.API_VERSIONS.latestVersion();
42+
ApiVersionsRequest apiVersionsRequest =
43+
new ApiVersionsRequest.Builder().build(latestVersion);
44+
ChannelHandlerContext ctx = new TestChannelHandlerContext();
45+
KafkaRequest request =
46+
new KafkaRequest(
47+
ApiKeys.API_VERSIONS,
48+
(short) (latestVersion + 1), // unsupported version
49+
new RequestHeader(ApiKeys.API_VERSIONS, latestVersion, "client-id", 0),
50+
apiVersionsRequest,
51+
ByteBufAllocator.DEFAULT.buffer(),
52+
ctx,
53+
new CompletableFuture<>());
54+
handler.handleApiVersionsRequest(request);
55+
56+
ByteBuf responseBuffer = request.serialize();
57+
ApiVersionsResponse response =
58+
(ApiVersionsResponse)
59+
AbstractResponse.parseResponse(
60+
responseBuffer.nioBuffer(), request.header());
61+
Map<Errors, Integer> errorCounts = response.errorCounts();
62+
assertThat(1).isEqualTo(errorCounts.size());
63+
assertThat(1).isEqualTo(errorCounts.get(Errors.UNSUPPORTED_VERSION));
64+
}
65+
66+
@Test
67+
public void testKafkaApiVersionsRequest() {
68+
KafkaRequestHandler handler = new KafkaRequestHandler();
69+
short latestVersion = ApiKeys.API_VERSIONS.latestVersion();
70+
ApiVersionsRequest apiVersionsRequest =
71+
new ApiVersionsRequest.Builder().build(latestVersion);
72+
ChannelHandlerContext ctx = new TestChannelHandlerContext();
73+
KafkaRequest request =
74+
new KafkaRequest(
75+
ApiKeys.API_VERSIONS,
76+
latestVersion,
77+
new RequestHeader(ApiKeys.API_VERSIONS, latestVersion, "client-id", 0),
78+
apiVersionsRequest,
79+
ByteBufAllocator.DEFAULT.buffer(),
80+
ctx,
81+
new CompletableFuture<>());
82+
handler.handleApiVersionsRequest(request);
83+
84+
ByteBuf responseBuffer = request.serialize();
85+
ApiVersionsResponse response =
86+
(ApiVersionsResponse)
87+
AbstractResponse.parseResponse(
88+
responseBuffer.nioBuffer(), request.header());
89+
Map<Errors, Integer> errorCounts = response.errorCounts();
90+
assertThat(1).isEqualTo(errorCounts.size());
91+
assertThat(1).isEqualTo(errorCounts.get(Errors.NONE));
92+
response.data()
93+
.apiKeys()
94+
.forEach(
95+
apiVersion -> {
96+
if (ApiKeys.METADATA.id == apiVersion.apiKey()) {
97+
assertThat((short) 11)
98+
.isGreaterThanOrEqualTo(apiVersion.maxVersion());
99+
} else if (ApiKeys.FETCH.id == apiVersion.apiKey()) {
100+
assertThat((short) 12)
101+
.isGreaterThanOrEqualTo(apiVersion.maxVersion());
102+
} else {
103+
ApiKeys apiKeys = ApiKeys.forId(apiVersion.apiKey());
104+
assertThat(apiVersion.minVersion())
105+
.isEqualTo(apiKeys.oldestVersion());
106+
assertThat(apiVersion.maxVersion())
107+
.isEqualTo(apiKeys.latestVersion());
108+
}
109+
});
110+
}
111+
}

0 commit comments

Comments
 (0)