Skip to content

Commit 390c766

Browse files
committed
fix codestyle
1 parent 8c5fc32 commit 390c766

File tree

4 files changed

+95
-119
lines changed

4 files changed

+95
-119
lines changed

fluss-kafka/pom.xml

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,19 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
Copyright (c) 2024 Alibaba Group Holding Ltd.
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
-->
217
<project xmlns="http://maven.apache.org/POM/4.0.0"
318
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
419
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -14,7 +29,6 @@
1429

1530
<properties>
1631
<kafka.version>3.9.0</kafka.version>
17-
<lombok.version>1.18.36</lombok.version>
1832
</properties>
1933

2034
<dependencies>
@@ -29,9 +43,8 @@
2943
<version>${netty.version}-${fluss.shaded.version}</version>
3044
</dependency>
3145
<dependency>
32-
<groupId>org.projectlombok</groupId>
33-
<artifactId>lombok</artifactId>
34-
<version>${lombok.version}</version>
46+
<groupId>org.slf4j</groupId>
47+
<artifactId>slf4j-api</artifactId>
3548
</dependency>
3649
</dependencies>
3750
</project>

fluss-kafka/src/main/java/com/alibaba/fluss/kafka/KafkaCommandDecoder.java

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,16 @@
1616

1717
package com.alibaba.fluss.kafka;
1818

19+
import static org.apache.kafka.common.protocol.ApiKeys.API_VERSIONS;
20+
import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE;
21+
1922
import com.alibaba.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
2023
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext;
2124
import com.alibaba.fluss.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
25+
import com.alibaba.fluss.shaded.netty4.io.netty.util.ReferenceCountUtil;
26+
2227
import lombok.extern.slf4j.Slf4j;
28+
2329
import org.apache.kafka.common.protocol.ApiKeys;
2430
import org.apache.kafka.common.requests.AbstractRequest;
2531
import org.apache.kafka.common.requests.AbstractResponse;
@@ -34,15 +40,14 @@
3440
import java.util.concurrent.ConcurrentLinkedDeque;
3541
import java.util.concurrent.atomic.AtomicBoolean;
3642

37-
import static org.apache.kafka.common.protocol.ApiKeys.API_VERSIONS;
38-
import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE;
39-
4043
@Slf4j
4144
public abstract class KafkaCommandDecoder extends SimpleChannelInboundHandler<ByteBuf> {
4245

43-
// Need to use a Queue to store the inflight responses, because Kafka clients require the responses to be sent in order.
46+
// Need to use a Queue to store the inflight responses, because Kafka clients require the
47+
// responses to be sent in order.
4448
// See: org.apache.kafka.clients.InFlightRequests#completeNext
45-
private final ConcurrentLinkedDeque<KafkaRequest> inflightResponses = new ConcurrentLinkedDeque<>();
49+
private final ConcurrentLinkedDeque<KafkaRequest> inflightResponses =
50+
new ConcurrentLinkedDeque<>();
4651
protected final AtomicBoolean isActive = new AtomicBoolean(true);
4752
protected volatile ChannelHandlerContext ctx;
4853
protected SocketAddress remoteAddress;
@@ -56,7 +61,11 @@ public void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Excep
5661
future.whenCompleteAsync((r, t) -> sendResponse(ctx), ctx.executor());
5762

5863
if (!isActive.get()) {
59-
handleInactive(request);
64+
try {
65+
handleInactive(request);
66+
} finally {
67+
ReferenceCountUtil.release(buffer);
68+
}
6069
return;
6170
}
6271
switch (request.apiKey()) {
@@ -160,7 +169,7 @@ public void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Excep
160169
log.error("Error handling request", t);
161170
future.completeExceptionally(t);
162171
} finally {
163-
buffer.release();
172+
ReferenceCountUtil.release(buffer);
164173
}
165174
}
166175

@@ -211,7 +220,10 @@ private void sendResponse(ChannelHandlerContext ctx) {
211220
protected void close() {
212221
isActive.set(false);
213222
ctx.close();
214-
log.warn("Close channel {} with {} pending requests.", remoteAddress, inflightResponses.size());
223+
log.warn(
224+
"Close channel {} with {} pending requests.",
225+
remoteAddress,
226+
inflightResponses.size());
215227
for (KafkaRequest request : inflightResponses) {
216228
request.cancel();
217229
}
@@ -226,7 +238,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
226238
protected void handleUnsupportedRequest(KafkaRequest request) {
227239
String message = String.format("Unsupported request with api key %s", request.apiKey());
228240
AbstractRequest abstractRequest = request.request();
229-
AbstractResponse response = abstractRequest.getErrorResponse(new UnsupportedOperationException(message));
241+
AbstractResponse response =
242+
abstractRequest.getErrorResponse(new UnsupportedOperationException(message));
230243
request.complete(response);
231244
}
232245

@@ -294,20 +307,24 @@ protected void handleUnsupportedRequest(KafkaRequest request) {
294307

295308
protected abstract void handleDescribeClusterRequest(KafkaRequest request);
296309

297-
298-
private static KafkaRequest parseRequest(ChannelHandlerContext ctx, CompletableFuture<AbstractResponse> future,
299-
ByteBuf buffer) {
310+
private static KafkaRequest parseRequest(
311+
ChannelHandlerContext ctx, CompletableFuture<AbstractResponse> future, ByteBuf buffer) {
300312
ByteBuffer nioBuffer = buffer.nioBuffer();
301313
RequestHeader header = RequestHeader.parse(nioBuffer);
302314
if (isUnsupportedApiVersionRequest(header)) {
303-
ApiVersionsRequest request = new ApiVersionsRequest.Builder(header.apiVersion()).build();
304-
return new KafkaRequest(API_VERSIONS, header.apiVersion(), header, request, buffer, ctx, future);
315+
ApiVersionsRequest request =
316+
new ApiVersionsRequest.Builder(header.apiVersion()).build();
317+
return new KafkaRequest(
318+
API_VERSIONS, header.apiVersion(), header, request, buffer, ctx, future);
305319
}
306-
RequestAndSize request = AbstractRequest.parseRequest(header.apiKey(), header.apiVersion(), nioBuffer);
307-
return new KafkaRequest(header.apiKey(), header.apiVersion(), header, request.request, buffer, ctx, future);
320+
RequestAndSize request =
321+
AbstractRequest.parseRequest(header.apiKey(), header.apiVersion(), nioBuffer);
322+
return new KafkaRequest(
323+
header.apiKey(), header.apiVersion(), header, request.request, buffer, ctx, future);
308324
}
309325

310326
private static boolean isUnsupportedApiVersionRequest(RequestHeader header) {
311-
return header.apiKey() == API_VERSIONS && !API_VERSIONS.isVersionSupported(header.apiVersion());
327+
return header.apiKey() == API_VERSIONS
328+
&& !API_VERSIONS.isVersionSupported(header.apiVersion());
312329
}
313330
}

fluss-kafka/src/main/java/com/alibaba/fluss/kafka/KafkaRequest.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.alibaba.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
2020
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext;
2121
import com.alibaba.fluss.shaded.netty4.io.netty.util.ReferenceCountUtil;
22+
2223
import org.apache.kafka.common.protocol.ApiKeys;
2324
import org.apache.kafka.common.protocol.ApiMessage;
2425
import org.apache.kafka.common.protocol.ByteBufferAccessor;
@@ -46,8 +47,14 @@ public class KafkaRequest {
4647
private final CompletableFuture<AbstractResponse> future;
4748
private volatile boolean cancelled = false;
4849

49-
protected KafkaRequest(ApiKeys apiKey, short apiVersion, RequestHeader header, AbstractRequest request,
50-
ByteBuf buffer, ChannelHandlerContext ctx, CompletableFuture<AbstractResponse> future) {
50+
protected KafkaRequest(
51+
ApiKeys apiKey,
52+
short apiVersion,
53+
RequestHeader header,
54+
AbstractRequest request,
55+
ByteBuf buffer,
56+
ChannelHandlerContext ctx,
57+
CompletableFuture<AbstractResponse> future) {
5158
this.apiKey = apiKey;
5259
this.apiVersion = apiVersion;
5360
this.header = header;
@@ -78,7 +85,6 @@ public <T> T request() {
7885
return (T) request;
7986
}
8087

81-
8288
public void releaseBuffer() {
8389
ReferenceCountUtil.safeRelease(buffer);
8490
}

fluss-kafka/src/main/java/com/alibaba/fluss/kafka/KafkaRequestHandler.java

Lines changed: 35 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616
package com.alibaba.fluss.kafka;
1717

1818
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext;
19-
import lombok.extern.slf4j.Slf4j;
19+
2020
import org.apache.kafka.common.errors.LeaderNotAvailableException;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
2123

22-
@Slf4j
2324
public final class KafkaRequestHandler extends KafkaCommandDecoder {
25+
private static final Logger log = LoggerFactory.getLogger(KafkaRequestHandler.class);
2426

2527
@Override
2628
public void channelActive(ChannelHandlerContext ctx) throws Exception {
@@ -49,157 +51,95 @@ protected void handleInactive(KafkaRequest request) {
4951
}
5052

5153
@Override
52-
protected void handleApiVersionsRequest(KafkaRequest request) {
53-
54-
}
54+
protected void handleApiVersionsRequest(KafkaRequest request) {}
5555

5656
@Override
57-
protected void handleProducerRequest(KafkaRequest request) {
58-
59-
}
57+
protected void handleProducerRequest(KafkaRequest request) {}
6058

6159
@Override
62-
protected void handleMetadataRequest(KafkaRequest request) {
63-
64-
}
60+
protected void handleMetadataRequest(KafkaRequest request) {}
6561

6662
@Override
67-
protected void handleFindCoordinatorRequest(KafkaRequest request) {
68-
69-
}
63+
protected void handleFindCoordinatorRequest(KafkaRequest request) {}
7064

7165
@Override
72-
protected void handleListOffsetRequest(KafkaRequest request) {
73-
74-
}
66+
protected void handleListOffsetRequest(KafkaRequest request) {}
7567

7668
@Override
77-
protected void handleOffsetFetchRequest(KafkaRequest request) {
78-
79-
}
69+
protected void handleOffsetFetchRequest(KafkaRequest request) {}
8070

8171
@Override
82-
protected void handleOffsetCommitRequest(KafkaRequest request) {
83-
84-
}
72+
protected void handleOffsetCommitRequest(KafkaRequest request) {}
8573

8674
@Override
87-
protected void handleFetchRequest(KafkaRequest request) {
88-
89-
}
75+
protected void handleFetchRequest(KafkaRequest request) {}
9076

9177
@Override
92-
protected void handleJoinGroupRequest(KafkaRequest request) {
93-
94-
}
78+
protected void handleJoinGroupRequest(KafkaRequest request) {}
9579

9680
@Override
97-
protected void handleSyncGroupRequest(KafkaRequest request) {
98-
99-
}
81+
protected void handleSyncGroupRequest(KafkaRequest request) {}
10082

10183
@Override
102-
protected void handleHeartbeatRequest(KafkaRequest request) {
103-
104-
}
84+
protected void handleHeartbeatRequest(KafkaRequest request) {}
10585

10686
@Override
107-
protected void handleLeaveGroupRequest(KafkaRequest request) {
108-
109-
}
87+
protected void handleLeaveGroupRequest(KafkaRequest request) {}
11088

11189
@Override
112-
protected void handleDescribeGroupsRequest(KafkaRequest request) {
113-
114-
}
90+
protected void handleDescribeGroupsRequest(KafkaRequest request) {}
11591

11692
@Override
117-
protected void handleListGroupsRequest(KafkaRequest request) {
118-
119-
}
93+
protected void handleListGroupsRequest(KafkaRequest request) {}
12094

12195
@Override
122-
protected void handleDeleteGroupsRequest(KafkaRequest request) {
123-
124-
}
96+
protected void handleDeleteGroupsRequest(KafkaRequest request) {}
12597

12698
@Override
127-
protected void handleSaslHandshakeRequest(KafkaRequest request) {
128-
129-
}
99+
protected void handleSaslHandshakeRequest(KafkaRequest request) {}
130100

131101
@Override
132-
protected void handleSaslAuthenticateRequest(KafkaRequest request) {
133-
134-
}
102+
protected void handleSaslAuthenticateRequest(KafkaRequest request) {}
135103

136104
@Override
137-
protected void handleCreateTopicsRequest(KafkaRequest request) {
138-
139-
}
105+
protected void handleCreateTopicsRequest(KafkaRequest request) {}
140106

141107
@Override
142-
protected void handleInitProducerIdRequest(KafkaRequest request) {
143-
144-
}
108+
protected void handleInitProducerIdRequest(KafkaRequest request) {}
145109

146110
@Override
147-
protected void handleAddPartitionsToTxnRequest(KafkaRequest request) {
148-
149-
}
111+
protected void handleAddPartitionsToTxnRequest(KafkaRequest request) {}
150112

151113
@Override
152-
protected void handleAddOffsetsToTxnRequest(KafkaRequest request) {
153-
154-
}
114+
protected void handleAddOffsetsToTxnRequest(KafkaRequest request) {}
155115

156116
@Override
157-
protected void handleTxnOffsetCommitRequest(KafkaRequest request) {
158-
159-
}
117+
protected void handleTxnOffsetCommitRequest(KafkaRequest request) {}
160118

161119
@Override
162-
protected void handleEndTxnRequest(KafkaRequest request) {
163-
164-
}
120+
protected void handleEndTxnRequest(KafkaRequest request) {}
165121

166122
@Override
167-
protected void handleWriteTxnMarkersRequest(KafkaRequest request) {
168-
169-
}
123+
protected void handleWriteTxnMarkersRequest(KafkaRequest request) {}
170124

171125
@Override
172-
protected void handleDescribeConfigsRequest(KafkaRequest request) {
173-
174-
}
126+
protected void handleDescribeConfigsRequest(KafkaRequest request) {}
175127

176128
@Override
177-
protected void handleAlterConfigsRequest(KafkaRequest request) {
178-
179-
}
129+
protected void handleAlterConfigsRequest(KafkaRequest request) {}
180130

181131
@Override
182-
protected void handleDeleteTopicsRequest(KafkaRequest request) {
183-
184-
}
132+
protected void handleDeleteTopicsRequest(KafkaRequest request) {}
185133

186134
@Override
187-
protected void handleDeleteRecordsRequest(KafkaRequest request) {
188-
189-
}
135+
protected void handleDeleteRecordsRequest(KafkaRequest request) {}
190136

191137
@Override
192-
protected void handleOffsetDeleteRequest(KafkaRequest request) {
193-
194-
}
138+
protected void handleOffsetDeleteRequest(KafkaRequest request) {}
195139

196140
@Override
197-
protected void handleCreatePartitionsRequest(KafkaRequest request) {
198-
199-
}
141+
protected void handleCreatePartitionsRequest(KafkaRequest request) {}
200142

201143
@Override
202-
protected void handleDescribeClusterRequest(KafkaRequest request) {
203-
204-
}
144+
protected void handleDescribeClusterRequest(KafkaRequest request) {}
205145
}

0 commit comments

Comments
 (0)