Skip to content

Commit 99fd922

Browse files
authored
[rpc] Support netty metrics (#407)
1 parent 50ccf77 commit 99fd922

File tree

11 files changed

+233
-5
lines changed

11 files changed

+233
-5
lines changed

fluss-common/src/main/java/com/alibaba/fluss/metrics/MetricNames.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,4 +152,11 @@ public class MetricNames {
152152
public static final String SCANNER_REMOTE_FETCH_BYTES_RATE = "remoteFetchBytesPerSecond";
153153
public static final String SCANNER_REMOTE_FETCH_RATE = "remoteFetchRequestsPerSecond";
154154
public static final String SCANNER_REMOTE_FETCH_ERROR_RATE = "remoteFetchErrorPerSecond";
155+
156+
// for netty
157+
public static final String NETTY_USED_DIRECT_MEMORY = "usedDirectMemory";
158+
public static final String NETTY_NUM_DIRECT_ARENAS = "numDirectArenas";
159+
public static final String NETTY_NUM_ALLOCATIONS_PER_SECONDS = "numAllocationsPerSecond";
160+
public static final String NETTY_NUM_HUGE_ALLOCATIONS_PER_SECONDS =
161+
"numHugeAllocationsPerSecond";
155162
}

fluss-common/src/main/java/com/alibaba/fluss/metrics/groups/AbstractMetricGroup.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.alibaba.fluss.metrics.groups;
1818

1919
import com.alibaba.fluss.annotation.Internal;
20+
import com.alibaba.fluss.annotation.VisibleForTesting;
2021
import com.alibaba.fluss.metrics.CharacterFilter;
2122
import com.alibaba.fluss.metrics.Counter;
2223
import com.alibaba.fluss.metrics.Gauge;
@@ -361,6 +362,11 @@ protected GenericMetricGroup createChildGroup(String name, ChildType childType)
361362
}
362363
}
363364

365+
@VisibleForTesting
366+
public Map<String, Metric> getMetrics() {
367+
return metrics;
368+
}
369+
364370
/**
365371
* Enum for indicating which child group should be created. `KEY` is used to create {@link
366372
* GenericKeyMetricGroup}. `VALUE` is used to create {@link GenericValueMetricGroup}. `GENERIC`

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/RpcServer.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.alibaba.fluss.rpc;
1818

1919
import com.alibaba.fluss.config.Configuration;
20+
import com.alibaba.fluss.metrics.groups.MetricGroup;
2021
import com.alibaba.fluss.rpc.netty.server.NettyServer;
2122
import com.alibaba.fluss.rpc.netty.server.RequestsMetrics;
2223
import com.alibaba.fluss.utils.AutoCloseableAsync;
@@ -36,6 +37,7 @@ public interface RpcServer extends AutoCloseableAsync {
3637
* @param externalAddress The external address to bind to.
3738
* @param externalPortRange The external port range to bind to.
3839
* @param service The service to handle incoming requests.
40+
* @param serverMetricGroup The metric group of server to report.
3941
* @param requestsMetrics the requests metrics to report.
4042
* @return The new RPC server.
4143
*/
@@ -44,9 +46,16 @@ static RpcServer create(
4446
String externalAddress,
4547
String externalPortRange,
4648
RpcGatewayService service,
49+
MetricGroup serverMetricGroup,
4750
RequestsMetrics requestsMetrics)
4851
throws IOException {
49-
return new NettyServer(conf, externalAddress, externalPortRange, service, requestsMetrics);
52+
return new NettyServer(
53+
conf,
54+
externalAddress,
55+
externalPortRange,
56+
service,
57+
serverMetricGroup,
58+
requestsMetrics);
5059
}
5160

5261
/** Starts the RPC server by binding to the configured bind address and port (blocking). */
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright (c) 2024 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.rpc.netty;
18+
19+
import com.alibaba.fluss.metrics.Gauge;
20+
import com.alibaba.fluss.metrics.MeterView;
21+
import com.alibaba.fluss.metrics.MetricNames;
22+
import com.alibaba.fluss.metrics.groups.MetricGroup;
23+
import com.alibaba.fluss.shaded.netty4.io.netty.buffer.PoolArenaMetric;
24+
import com.alibaba.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocator;
25+
import com.alibaba.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorMetric;
26+
27+
/** A netty metrics class to register metrics from netty. */
28+
public class NettyMetrics {
29+
30+
public static final String NETTY_METRIC_GROUP = "netty";
31+
32+
public static void registerNettyMetrics(
33+
MetricGroup metricGroup, PooledByteBufAllocator pooledAllocator) {
34+
MetricGroup nettyMetricGroup = metricGroup.addGroup(NETTY_METRIC_GROUP);
35+
PooledByteBufAllocatorMetric pooledAllocatorMetric = pooledAllocator.metric();
36+
nettyMetricGroup.<Long, Gauge<Long>>gauge(
37+
MetricNames.NETTY_USED_DIRECT_MEMORY, pooledAllocatorMetric::usedDirectMemory);
38+
nettyMetricGroup.<Integer, Gauge<Integer>>gauge(
39+
MetricNames.NETTY_NUM_DIRECT_ARENAS, pooledAllocatorMetric::numDirectArenas);
40+
nettyMetricGroup.meter(
41+
MetricNames.NETTY_NUM_ALLOCATIONS_PER_SECONDS,
42+
new MeterView(
43+
() ->
44+
pooledAllocatorMetric.directArenas().stream()
45+
.mapToLong(PoolArenaMetric::numAllocations)
46+
.sum()));
47+
nettyMetricGroup.meter(
48+
MetricNames.NETTY_NUM_HUGE_ALLOCATIONS_PER_SECONDS,
49+
new MeterView(
50+
() ->
51+
pooledAllocatorMetric.directArenas().stream()
52+
.mapToLong(PoolArenaMetric::numHugeAllocations)
53+
.sum()));
54+
}
55+
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/NettyClient.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.alibaba.fluss.rpc.RpcClient;
2424
import com.alibaba.fluss.rpc.messages.ApiMessage;
2525
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
26+
import com.alibaba.fluss.rpc.netty.NettyMetrics;
2627
import com.alibaba.fluss.rpc.netty.NettyUtils;
2728
import com.alibaba.fluss.rpc.protocol.ApiKeys;
2829
import com.alibaba.fluss.shaded.netty4.io.netty.bootstrap.Bootstrap;
@@ -82,14 +83,16 @@ public NettyClient(Configuration conf, ClientMetricGroup clientMetricGroup) {
8283
int connectTimeoutMs = (int) conf.get(ConfigOptions.CLIENT_CONNECT_TIMEOUT).toMillis();
8384
int connectionMaxIdle =
8485
(int) conf.get(ConfigOptions.NETTY_CONNECTION_MAX_IDLE_TIME).getSeconds();
86+
PooledByteBufAllocator pooledAllocator = PooledByteBufAllocator.DEFAULT;
8587
this.bootstrap =
8688
new Bootstrap()
8789
.group(eventGroup)
8890
.channel(NettyUtils.getClientSocketChannelClass(eventGroup))
89-
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
91+
.option(ChannelOption.ALLOCATOR, pooledAllocator)
9092
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
9193
.handler(new ClientChannelInitializer(connectionMaxIdle));
9294
this.clientMetricGroup = clientMetricGroup;
95+
NettyMetrics.registerNettyMetrics(clientMetricGroup, pooledAllocator);
9396
}
9497

9598
/**

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServer.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
import com.alibaba.fluss.config.ConfigOptions;
2020
import com.alibaba.fluss.config.Configuration;
21+
import com.alibaba.fluss.metrics.groups.MetricGroup;
2122
import com.alibaba.fluss.rpc.RpcGateway;
2223
import com.alibaba.fluss.rpc.RpcGatewayService;
2324
import com.alibaba.fluss.rpc.RpcServer;
25+
import com.alibaba.fluss.rpc.netty.NettyMetrics;
2426
import com.alibaba.fluss.rpc.netty.NettyUtils;
2527
import com.alibaba.fluss.rpc.protocol.ApiManager;
2628
import com.alibaba.fluss.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
@@ -60,6 +62,7 @@ public final class NettyServer implements RpcServer {
6062
private final String portRange;
6163
private final RequestProcessorPool workerPool;
6264
private final ApiManager apiManager;
65+
private final MetricGroup serverMetricGroup;
6366

6467
private EventLoopGroup acceptorGroup;
6568
private EventLoopGroup selectorGroup;
@@ -73,10 +76,12 @@ public NettyServer(
7376
String hostname,
7477
String portRange,
7578
RpcGatewayService service,
79+
MetricGroup serverMetricGroup,
7680
RequestsMetrics requestsMetrics) {
7781
this.conf = checkNotNull(conf, "conf");
7882
this.hostname = checkNotNull(hostname, "hostname");
7983
this.portRange = checkNotNull(portRange, "portRange");
84+
this.serverMetricGroup = checkNotNull(serverMetricGroup, "serverMetricGroup");
8085
this.apiManager = new ApiManager(service.providerType());
8186

8287
this.workerPool =
@@ -109,7 +114,8 @@ public void start() throws IOException {
109114
NettyUtils.newEventLoopGroup(numNetworkThreads, "fluss-netty-server-selector");
110115

111116
ServerBootstrap bootstrap = new ServerBootstrap();
112-
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
117+
PooledByteBufAllocator pooledBufAllocator = PooledByteBufAllocator.DEFAULT;
118+
bootstrap.childOption(ChannelOption.ALLOCATOR, pooledBufAllocator);
113119
bootstrap.group(acceptorGroup, selectorGroup);
114120
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
115121
bootstrap.childOption(
@@ -168,6 +174,7 @@ public void start() throws IOException {
168174
bindAddress);
169175

170176
isRunning = true;
177+
NettyMetrics.registerNettyMetrics(serverMetricGroup, pooledBufAllocator);
171178
}
172179

173180
@Override
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright (c) 2024 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.rpc.netty;
18+
19+
import com.alibaba.fluss.cluster.ServerNode;
20+
import com.alibaba.fluss.cluster.ServerType;
21+
import com.alibaba.fluss.config.Configuration;
22+
import com.alibaba.fluss.metrics.Metric;
23+
import com.alibaba.fluss.metrics.MetricNames;
24+
import com.alibaba.fluss.metrics.groups.GenericMetricGroup;
25+
import com.alibaba.fluss.metrics.groups.MetricGroup;
26+
import com.alibaba.fluss.metrics.util.NOPMetricsGroup;
27+
import com.alibaba.fluss.rpc.TestingGatewayService;
28+
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
29+
import com.alibaba.fluss.rpc.metrics.TestingClientMetricGroup;
30+
import com.alibaba.fluss.rpc.netty.client.NettyClient;
31+
import com.alibaba.fluss.rpc.netty.server.NettyServer;
32+
import com.alibaba.fluss.rpc.netty.server.RequestsMetrics;
33+
import com.alibaba.fluss.utils.NetUtils;
34+
35+
import org.junit.jupiter.api.AfterEach;
36+
import org.junit.jupiter.api.BeforeEach;
37+
import org.junit.jupiter.api.Test;
38+
39+
import java.util.Map;
40+
41+
import static com.alibaba.fluss.utils.NetUtils.getAvailablePort;
42+
import static org.assertj.core.api.Assertions.assertThat;
43+
44+
/** Tests for {@link NettyMetrics}. */
45+
public class NettyMetricsTest {
46+
47+
private NettyServer nettyServer;
48+
private NettyClient nettyClient;
49+
private MetricGroup serverGroup;
50+
private MetricGroup clientGroup;
51+
52+
@BeforeEach
53+
public void setup() throws Exception {
54+
Configuration conf = new Configuration();
55+
try (NetUtils.Port availablePort = getAvailablePort()) {
56+
ServerNode serverNode =
57+
new ServerNode(1, "localhost", availablePort.getPort(), ServerType.COORDINATOR);
58+
MetricGroup serverMetricGroup = NOPMetricsGroup.newInstance();
59+
serverGroup = serverMetricGroup.addGroup(NettyMetrics.NETTY_METRIC_GROUP);
60+
nettyServer =
61+
new NettyServer(
62+
conf,
63+
serverNode.host(),
64+
String.valueOf(serverNode.port()),
65+
new TestingGatewayService(),
66+
serverMetricGroup,
67+
RequestsMetrics.createCoordinatorServerRequestMetrics(
68+
serverMetricGroup));
69+
nettyServer.start();
70+
}
71+
ClientMetricGroup clientMetricGroup = TestingClientMetricGroup.newInstance();
72+
clientGroup = clientMetricGroup.addGroup(NettyMetrics.NETTY_METRIC_GROUP);
73+
nettyClient = new NettyClient(conf, clientMetricGroup);
74+
}
75+
76+
@AfterEach
77+
public void cleanup() throws Exception {
78+
if (nettyServer != null) {
79+
nettyServer.close();
80+
}
81+
if (nettyClient != null) {
82+
nettyClient.close();
83+
}
84+
}
85+
86+
@Test
87+
void testNettyMetricsCompleteness() {
88+
assertNettyMetrics(serverGroup);
89+
assertNettyMetrics(clientGroup);
90+
}
91+
92+
private void assertNettyMetrics(MetricGroup metricGroup) {
93+
Map<String, Metric> metrics = ((GenericMetricGroup) metricGroup).getMetrics();
94+
assertThat(metrics.get(MetricNames.NETTY_USED_DIRECT_MEMORY)).isNotNull();
95+
assertThat(metrics.get(MetricNames.NETTY_NUM_DIRECT_ARENAS)).isNotNull();
96+
assertThat(metrics.get(MetricNames.NETTY_NUM_ALLOCATIONS_PER_SECONDS)).isNotNull();
97+
assertThat(metrics.get(MetricNames.NETTY_NUM_HUGE_ALLOCATIONS_PER_SECONDS)).isNotNull();
98+
}
99+
}

fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/client/NettyClientTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.alibaba.fluss.cluster.ServerType;
2121
import com.alibaba.fluss.config.ConfigOptions;
2222
import com.alibaba.fluss.config.Configuration;
23+
import com.alibaba.fluss.metrics.groups.MetricGroup;
2324
import com.alibaba.fluss.metrics.util.NOPMetricsGroup;
2425
import com.alibaba.fluss.rpc.TestingGatewayService;
2526
import com.alibaba.fluss.rpc.messages.ApiMessage;
@@ -186,14 +187,15 @@ private void buildNettyServer(int serverId) throws Exception {
186187
new ServerNode(
187188
serverId, "localhost", availablePort.getPort(), ServerType.COORDINATOR);
188189
service = new TestingGatewayService();
190+
MetricGroup metricGroup = NOPMetricsGroup.newInstance();
189191
nettyServer =
190192
new NettyServer(
191193
conf,
192194
serverNode.host(),
193195
String.valueOf(serverNode.port()),
194196
service,
195-
RequestsMetrics.createCoordinatorServerRequestMetrics(
196-
NOPMetricsGroup.newInstance()));
197+
metricGroup,
198+
RequestsMetrics.createCoordinatorServerRequestMetrics(metricGroup));
197199
nettyServer.start();
198200
}
199201
}

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ protected void startServices() throws Exception {
156156
conf.getString(ConfigOptions.COORDINATOR_HOST),
157157
conf.getString(ConfigOptions.COORDINATOR_PORT),
158158
coordinatorService,
159+
serverMetricGroup,
159160
RequestsMetrics.createCoordinatorServerRequestMetrics(
160161
serverMetricGroup));
161162
rpcServer.start();

fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ protected void startServices() throws Exception {
209209
conf.getString(ConfigOptions.TABLET_SERVER_HOST),
210210
conf.getString(ConfigOptions.TABLET_SERVER_PORT),
211211
tabletService,
212+
tabletServerMetricGroup,
212213
requestsMetrics);
213214
rpcServer.start();
214215

0 commit comments

Comments
 (0)