Skip to content

Commit 85a8d50

Browse files
authored
[rpc] Support Netty Logging at the network layer (alibaba#379)
1 parent 16255f5 commit 85a8d50

File tree

3 files changed

+96
-0
lines changed

3 files changed

+96
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright (c) 2024 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.rpc.netty;
18+
19+
import com.alibaba.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
20+
import com.alibaba.fluss.shaded.netty4.io.netty.buffer.ByteBufHolder;
21+
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext;
22+
import com.alibaba.fluss.shaded.netty4.io.netty.handler.logging.LogLevel;
23+
import com.alibaba.fluss.shaded.netty4.io.netty.handler.logging.LoggingHandler;
24+
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import java.io.IOException;
29+
import java.io.InputStream;
30+
31+
/* This file is based on source code of Apache Spark Project (https://spark.apache.org/), licensed by the Apache
32+
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
33+
* additional information regarding copyright ownership. */
34+
35+
/** A Netty logger that constructs a log handler depending on the log level. */
36+
public class NettyLogger {
37+
private static final Logger logger = LoggerFactory.getLogger(NettyLogger.class);
38+
39+
/** A Netty LoggingHandler which does not dump the message contents. */
40+
private static class NoContentLoggingHandler extends LoggingHandler {
41+
42+
NoContentLoggingHandler(Class<?> clazz, LogLevel level) {
43+
super(clazz, level);
44+
}
45+
46+
@Override
47+
protected String format(ChannelHandlerContext ctx, String eventName, Object arg) {
48+
if (arg instanceof ByteBuf) {
49+
return format(ctx, eventName) + " " + ((ByteBuf) arg).readableBytes() + "B";
50+
} else if (arg instanceof ByteBufHolder) {
51+
return format(ctx, eventName)
52+
+ " "
53+
+ ((ByteBufHolder) arg).content().readableBytes()
54+
+ "B";
55+
} else if (arg instanceof InputStream) {
56+
int available = -1;
57+
try {
58+
available = ((InputStream) arg).available();
59+
} catch (IOException ex) {
60+
// Swallow, but return -1 to indicate an error happened
61+
}
62+
return format(ctx, eventName, arg) + " " + available + "B";
63+
} else {
64+
return super.format(ctx, eventName, arg);
65+
}
66+
}
67+
}
68+
69+
private final LoggingHandler loggingHandler;
70+
71+
public NettyLogger() {
72+
if (logger.isTraceEnabled()) {
73+
loggingHandler = new LoggingHandler(NettyLogger.class, LogLevel.TRACE);
74+
} else if (logger.isDebugEnabled()) {
75+
loggingHandler = new NoContentLoggingHandler(NettyLogger.class, LogLevel.DEBUG);
76+
} else {
77+
loggingHandler = null;
78+
}
79+
}
80+
81+
public LoggingHandler getLoggingHandler() {
82+
return loggingHandler;
83+
}
84+
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/ClientChannelInitializer.java

+6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.alibaba.fluss.rpc.netty.client;
1818

19+
import com.alibaba.fluss.rpc.netty.NettyLogger;
1920
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelInitializer;
2021
import com.alibaba.fluss.shaded.netty4.io.netty.channel.socket.SocketChannel;
2122
import com.alibaba.fluss.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@@ -31,6 +32,8 @@ final class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
3132

3233
private final int maxIdleTimeSeconds;
3334

35+
private static final NettyLogger nettyLogger = new NettyLogger();
36+
3437
public ClientChannelInitializer(long maxIdleTimeSeconds) {
3538
checkArgument(maxIdleTimeSeconds <= Integer.MAX_VALUE, "maxIdleTimeSeconds too large");
3639
this.maxIdleTimeSeconds = (int) maxIdleTimeSeconds;
@@ -39,6 +42,9 @@ public ClientChannelInitializer(long maxIdleTimeSeconds) {
3942
@Override
4043
protected void initChannel(SocketChannel ch) {
4144
// NettyClientHandler will be added dynamically when connection is built
45+
if (nettyLogger.getLoggingHandler() != null) {
46+
ch.pipeline().addLast("loggingHandler", nettyLogger.getLoggingHandler());
47+
}
4248
ch.pipeline()
4349
.addLast(
4450
"frameDecoder",

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/ServerChannelInitializer.java

+6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.alibaba.fluss.rpc.netty.server;
1818

19+
import com.alibaba.fluss.rpc.netty.NettyLogger;
1920
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelInitializer;
2021
import com.alibaba.fluss.shaded.netty4.io.netty.channel.socket.SocketChannel;
2122
import com.alibaba.fluss.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@@ -32,6 +33,8 @@ final class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
3233
private final NettyServerHandler sharedServerHandler;
3334
private final int maxIdleTimeSeconds;
3435

36+
private static final NettyLogger nettyLogger = new NettyLogger();
37+
3538
public ServerChannelInitializer(
3639
NettyServerHandler sharedServerHandler, long maxIdleTimeSeconds) {
3740
checkArgument(maxIdleTimeSeconds <= Integer.MAX_VALUE, "maxIdleTimeSeconds too large");
@@ -41,6 +44,9 @@ public ServerChannelInitializer(
4144

4245
@Override
4346
protected void initChannel(SocketChannel ch) {
47+
if (nettyLogger.getLoggingHandler() != null) {
48+
ch.pipeline().addLast("loggingHandler", nettyLogger.getLoggingHandler());
49+
}
4450
ch.pipeline()
4551
.addLast(
4652
"frameDecoder",

0 commit comments

Comments
 (0)