Skip to content

Commit d3b083e

Browse files
authored
[kafka] Support connection max idle time configuration for KafkaProtocolPlugin (#821)
1 parent b6d4f4d commit d3b083e

File tree

11 files changed

+150
-93
lines changed

11 files changed

+150
-93
lines changed

fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -705,7 +705,7 @@ public class ConfigOptions {
705705
.durationType()
706706
.defaultValue(Duration.ofMinutes(10))
707707
.withDescription(
708-
"Close idle connections after the number of milliseconds specified by this config.");
708+
"Close idle connections after the given time specified by this config.");
709709

710710
public static final ConfigOption<Integer> NETTY_CLIENT_NUM_NETWORK_THREADS =
711711
key("netty.client.num-network-threads")
@@ -1536,6 +1536,13 @@ public class ConfigOptions {
15361536
.withDescription(
15371537
"The database for fluss kafka. The default database is 'kafka'.");
15381538

1539+
public static final ConfigOption<Duration> KAFKA_CONNECTION_MAX_IDLE_TIME =
1540+
key("kafka.connection.max-idle-time")
1541+
.durationType()
1542+
.defaultValue(Duration.ofSeconds(60))
1543+
.withDescription(
1544+
"Close kafka idle connections after the given time specified by this config.");
1545+
15391546
/**
15401547
* Compaction style for Fluss's kv, which is same to rocksdb's, but help use avoid including
15411548
* rocksdb dependency when only need include this common module.

fluss-kafka/src/main/java/com/alibaba/fluss/kafka/KafkaChannelInitializer.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,38 +16,34 @@
1616

1717
package com.alibaba.fluss.kafka;
1818

19+
import com.alibaba.fluss.rpc.netty.NettyChannelInitializer;
1920
import com.alibaba.fluss.rpc.netty.server.RequestChannel;
2021
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelInitializer;
2122
import com.alibaba.fluss.shaded.netty4.io.netty.channel.socket.SocketChannel;
22-
import com.alibaba.fluss.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
2323
import com.alibaba.fluss.shaded.netty4.io.netty.handler.codec.LengthFieldPrepender;
2424
import com.alibaba.fluss.shaded.netty4.io.netty.handler.flow.FlowControlHandler;
25-
import com.alibaba.fluss.shaded.netty4.io.netty.handler.timeout.IdleStateHandler;
26-
27-
import java.util.concurrent.TimeUnit;
2825

2926
/**
3027
* A {@link ChannelInitializer} for initializing {@link SocketChannel} instances that will be used
3128
* by the server to handle the Kafka requests for the client.
3229
*/
33-
public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
30+
public class KafkaChannelInitializer extends NettyChannelInitializer {
3431
public static final int MAX_FRAME_LENGTH = 100 * 1024 * 1024; // 100MB
3532

3633
private final RequestChannel[] requestChannels;
3734
private final LengthFieldPrepender prepender = new LengthFieldPrepender(4);
3835

39-
public KafkaChannelInitializer(RequestChannel[] requestChannels) {
36+
public KafkaChannelInitializer(RequestChannel[] requestChannels, long maxIdleTimeSeconds) {
37+
super(maxIdleTimeSeconds);
4038
this.requestChannels = requestChannels;
4139
}
4240

4341
@Override
4442
protected void initChannel(SocketChannel ch) throws Exception {
45-
ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
43+
super.initChannel(ch);
44+
addIdleStateHandler(ch);
4645
ch.pipeline().addLast(prepender);
47-
ch.pipeline()
48-
.addLast(
49-
"frameDecoder",
50-
new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
46+
addFrameDecoder(ch, MAX_FRAME_LENGTH, 4);
5147
ch.pipeline().addLast("flowController", new FlowControlHandler());
5248
ch.pipeline().addLast(new KafkaCommandDecoder(requestChannels));
5349
}

fluss-kafka/src/main/java/com/alibaba/fluss/kafka/KafkaCommandDecoder.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import com.alibaba.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
2121
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext;
2222
import com.alibaba.fluss.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
23+
import com.alibaba.fluss.shaded.netty4.io.netty.handler.timeout.IdleState;
24+
import com.alibaba.fluss.shaded.netty4.io.netty.handler.timeout.IdleStateEvent;
2325
import com.alibaba.fluss.shaded.netty4.io.netty.util.ReferenceCountUtil;
2426
import com.alibaba.fluss.utils.MathUtils;
2527

@@ -113,6 +115,17 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
113115
// TODO Channel metrics
114116
}
115117

118+
@Override
119+
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
120+
if (evt instanceof IdleStateEvent) {
121+
IdleStateEvent event = (IdleStateEvent) evt;
122+
if (event.state().equals(IdleState.ALL_IDLE)) {
123+
LOG.warn("Connection {} is idle, closing...", ctx.channel().remoteAddress());
124+
ctx.close();
125+
}
126+
}
127+
}
128+
116129
private void sendResponse(ChannelHandlerContext ctx) {
117130
KafkaRequest request;
118131
while ((request = inflightResponses.peekFirst()) != null) {

fluss-kafka/src/main/java/com/alibaba/fluss/kafka/KafkaProtocolPlugin.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,29 @@
3030
/** The Kafka protocol plugin. */
3131
public class KafkaProtocolPlugin implements NetworkProtocolPlugin {
3232

33+
private Configuration conf;
34+
3335
@Override
3436
public String name() {
3537
return KAFKA_PROTOCOL_NAME;
3638
}
3739

3840
@Override
39-
public List<String> listenerNames(Configuration conf) {
41+
public void setup(Configuration conf) {
42+
this.conf = conf;
43+
}
44+
45+
@Override
46+
public List<String> listenerNames() {
4047
return conf.get(ConfigOptions.KAFKA_LISTENER_NAMES);
4148
}
4249

4350
@Override
4451
public ChannelHandler createChannelHandler(
4552
RequestChannel[] requestChannels, String listenerName) {
46-
return new KafkaChannelInitializer(requestChannels);
53+
return new KafkaChannelInitializer(
54+
requestChannels,
55+
conf.get(ConfigOptions.KAFKA_CONNECTION_MAX_IDLE_TIME).getSeconds());
4756
}
4857

4958
@Override
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.rpc.netty;
18+
19+
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelInitializer;
20+
import com.alibaba.fluss.shaded.netty4.io.netty.channel.socket.SocketChannel;
21+
import com.alibaba.fluss.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
22+
import com.alibaba.fluss.shaded.netty4.io.netty.handler.timeout.IdleStateHandler;
23+
24+
import static com.alibaba.fluss.utils.Preconditions.checkArgument;
25+
26+
/**
27+
* A basic {@link ChannelInitializer} for initializing {@link SocketChannel} instances to support
28+
* netty logging and add common handlers.
29+
*/
30+
public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
31+
32+
private final int maxIdleTimeSeconds;
33+
34+
private static final NettyLogger nettyLogger = new NettyLogger();
35+
36+
public NettyChannelInitializer(long maxIdleTimeSeconds) {
37+
checkArgument(maxIdleTimeSeconds <= Integer.MAX_VALUE, "maxIdleTimeSeconds too large");
38+
this.maxIdleTimeSeconds = (int) maxIdleTimeSeconds;
39+
}
40+
41+
@Override
42+
protected void initChannel(SocketChannel ch) throws Exception {
43+
if (nettyLogger.getLoggingHandler() != null) {
44+
ch.pipeline().addLast("loggingHandler", nettyLogger.getLoggingHandler());
45+
}
46+
}
47+
48+
public void addFrameDecoder(SocketChannel ch, int maxFrameLength, int initialBytesToStrip) {
49+
ch.pipeline()
50+
.addLast(
51+
"frameDecoder",
52+
new LengthFieldBasedFrameDecoder(
53+
maxFrameLength, 0, 4, 0, initialBytesToStrip));
54+
}
55+
56+
public void addIdleStateHandler(SocketChannel ch) {
57+
ch.pipeline().addLast("idle", new IdleStateHandler(0, 0, maxIdleTimeSeconds));
58+
}
59+
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/ClientChannelInitializer.java

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,40 +16,25 @@
1616

1717
package com.alibaba.fluss.rpc.netty.client;
1818

19-
import com.alibaba.fluss.rpc.netty.NettyLogger;
19+
import com.alibaba.fluss.rpc.netty.NettyChannelInitializer;
2020
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelInitializer;
2121
import com.alibaba.fluss.shaded.netty4.io.netty.channel.socket.SocketChannel;
22-
import com.alibaba.fluss.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
23-
import com.alibaba.fluss.shaded.netty4.io.netty.handler.timeout.IdleStateHandler;
24-
25-
import static com.alibaba.fluss.utils.Preconditions.checkArgument;
2622

2723
/**
2824
* A specialized {@link ChannelInitializer} for initializing {@link SocketChannel} instances that
29-
* will be used by the server to handle the init request for the client.
25+
* will be used by the client to handle the init request for the server.
3026
*/
31-
final class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
32-
33-
private final int maxIdleTimeSeconds;
34-
35-
private static final NettyLogger nettyLogger = new NettyLogger();
27+
final class ClientChannelInitializer extends NettyChannelInitializer {
3628

3729
public ClientChannelInitializer(long maxIdleTimeSeconds) {
38-
checkArgument(maxIdleTimeSeconds <= Integer.MAX_VALUE, "maxIdleTimeSeconds too large");
39-
this.maxIdleTimeSeconds = (int) maxIdleTimeSeconds;
30+
super(maxIdleTimeSeconds);
4031
}
4132

4233
@Override
43-
protected void initChannel(SocketChannel ch) {
34+
protected void initChannel(SocketChannel ch) throws Exception {
4435
// NettyClientHandler will be added dynamically when connection is built
45-
if (nettyLogger.getLoggingHandler() != null) {
46-
ch.pipeline().addLast("loggingHandler", nettyLogger.getLoggingHandler());
47-
}
48-
ch.pipeline()
49-
.addLast(
50-
"frameDecoder",
51-
// initialBytesToStrip=0 to include the frame size field after decoding
52-
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0));
53-
ch.pipeline().addLast("idle", new IdleStateHandler(0, 0, maxIdleTimeSeconds));
36+
super.initChannel(ch);
37+
addFrameDecoder(ch, Integer.MAX_VALUE, 0);
38+
addIdleStateHandler(ch);
5439
}
5540
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/FlussProtocolPlugin.java

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,33 +25,21 @@
2525
import com.alibaba.fluss.rpc.protocol.NetworkProtocolPlugin;
2626
import com.alibaba.fluss.security.auth.AuthenticationFactory;
2727
import com.alibaba.fluss.security.auth.PlainTextAuthenticationPlugin;
28-
import com.alibaba.fluss.security.auth.ServerAuthenticator;
2928
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelHandler;
3029

3130
import java.util.List;
32-
import java.util.Map;
3331
import java.util.Optional;
34-
import java.util.function.Supplier;
3532

3633
/** Build-in protocol plugin for Fluss. */
3734
public class FlussProtocolPlugin implements NetworkProtocolPlugin {
38-
public static final String FLUSS_PROTOCOL_NAME = "FLUSS";
39-
private final Map<String, Supplier<ServerAuthenticator>> authenticatorSuppliers;
4035
private final ApiManager apiManager;
41-
private final long maxIdleTimeSeconds;
4236
private final List<String> listeners;
4337
private final RequestsMetrics requestsMetrics;
44-
private final String internalListenerName;
38+
private Configuration conf;
4539

4640
public FlussProtocolPlugin(
47-
Configuration conf,
48-
ServerType serverType,
49-
List<String> listeners,
50-
RequestsMetrics requestsMetrics) {
51-
this.authenticatorSuppliers = AuthenticationFactory.loadServerAuthenticatorSuppliers(conf);
41+
ServerType serverType, List<String> listeners, RequestsMetrics requestsMetrics) {
5242
this.apiManager = new ApiManager(serverType);
53-
maxIdleTimeSeconds = conf.get(ConfigOptions.NETTY_CONNECTION_MAX_IDLE_TIME).getSeconds();
54-
this.internalListenerName = conf.get(ConfigOptions.INTERNAL_LISTENER_NAME);
5543
this.listeners = listeners;
5644
this.requestsMetrics = requestsMetrics;
5745
}
@@ -62,7 +50,12 @@ public String name() {
6250
}
6351

6452
@Override
65-
public List<String> listenerNames(Configuration conf) {
53+
public void setup(Configuration conf) {
54+
this.conf = conf;
55+
}
56+
57+
@Override
58+
public List<String> listenerNames() {
6659
return listeners;
6760
}
6861

@@ -73,10 +66,12 @@ public ChannelHandler createChannelHandler(
7366
requestChannels,
7467
apiManager,
7568
listenerName,
76-
listenerName.equals(internalListenerName),
69+
listenerName.equals(conf.get(ConfigOptions.INTERNAL_LISTENER_NAME)),
7770
requestsMetrics,
78-
maxIdleTimeSeconds,
79-
Optional.ofNullable(authenticatorSuppliers.get(listenerName))
71+
conf.get(ConfigOptions.NETTY_CONNECTION_MAX_IDLE_TIME).getSeconds(),
72+
Optional.ofNullable(
73+
AuthenticationFactory.loadServerAuthenticatorSuppliers(conf)
74+
.get(listenerName))
8075
.orElse(PlainTextAuthenticationPlugin.PlainTextServerAuthenticator::new));
8176
}
8277

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServer.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ private void startEndpoint(Endpoint endpoint, NetworkProtocolPlugin protocol)
207207
private Map<String, NetworkProtocolPlugin> getProtocolsByListenerName() {
208208
Map<String, NetworkProtocolPlugin> protocolsByListenerName = new HashMap<>();
209209
for (NetworkProtocolPlugin protocol : protocols) {
210-
for (String listenerName : protocol.listenerNames(conf)) {
210+
for (String listenerName : protocol.listenerNames()) {
211211
checkState(
212212
!protocolsByListenerName.containsKey(listenerName),
213213
"Multiple network protocols are bound to the same listener name %s",
@@ -229,13 +229,17 @@ private static List<NetworkProtocolPlugin> loadProtocols(
229229
if (conf.get(ConfigOptions.KAFKA_ENABLED)) {
230230
NetworkProtocolPlugin kafkaPlugin =
231231
loadProtocolPlugin(NetworkProtocolPlugin.KAFKA_PROTOCOL_NAME);
232-
listeners.removeAll(kafkaPlugin.listenerNames(conf));
232+
kafkaPlugin.setup(conf);
233+
listeners.removeAll(kafkaPlugin.listenerNames());
233234
protocolPlugins.add(kafkaPlugin);
234235
}
235236

236237
// Add the Fluss protocol plugin in the end to allow other protocol
237238
// pick their listener names first
238-
protocolPlugins.add(new FlussProtocolPlugin(conf, serverType, listeners, requestsMetrics));
239+
NetworkProtocolPlugin flussPlugin =
240+
new FlussProtocolPlugin(serverType, listeners, requestsMetrics);
241+
flussPlugin.setup(conf);
242+
protocolPlugins.add(flussPlugin);
239243
return protocolPlugins;
240244
}
241245

0 commit comments

Comments
 (0)