Skip to content

Commit 7212125

Browse files
committed
Aggregate fluss client connection metrics.
1 parent e69e4f4 commit 7212125

File tree

6 files changed

+238
-14
lines changed

6 files changed

+238
-14
lines changed

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,23 @@ public class MetricNames {
144144
// metrics for rpc client
145145
// --------------------------------------------------------------------------------------------
146146
public static final String CLIENT_REQUESTS_RATE = "requestsPerSecond";
147+
public static final String CLIENT_REQUESTS_RATE_AVG = "requestsPerSecond_avg";
148+
public static final String CLIENT_REQUESTS_RATE_SUM = "requestsPerSecond_sum";
147149
public static final String CLIENT_RESPONSES_RATE = "responsesPerSecond";
150+
public static final String CLIENT_RESPONSES_RATE_AVG = "responsesPerSecond_avg";
151+
public static final String CLIENT_RESPONSES_RATE_SUM = "responsesPerSecond_sum";
148152
public static final String CLIENT_BYTES_IN_RATE = "bytesInPerSecond";
153+
public static final String CLIENT_BYTES_IN_RATE_AVG = "bytesInPerSecond_avg";
154+
public static final String CLIENT_BYTES_IN_RATE_SUM = "bytesInPerSecond_sum";
149155
public static final String CLIENT_BYTES_OUT_RATE = "bytesOutPerSecond";
156+
public static final String CLIENT_BYTES_OUT_RATE_AVG = "bytesOutPerSecond_avg";
157+
public static final String CLIENT_BYTES_OUT_RATE_SUM = "bytesOutPerSecond_sum";
150158
public static final String CLIENT_REQUEST_LATENCY_MS = "requestLatencyMs";
159+
public static final String CLIENT_REQUEST_LATENCY_MS_AVG = "requestLatencyMs_avg";
160+
public static final String CLIENT_REQUEST_LATENCY_MS_MAX = "requestLatencyMs_max";
151161
public static final String CLIENT_REQUESTS_IN_FLIGHT = "requestsInFlight";
162+
public static final String CLIENT_REQUESTS_IN_FLIGHT_AVG = "requestsInFlight_avg";
163+
public static final String CLIENT_REQUESTS_IN_FLIGHT_SUM = "requestsInFlight_sum";
152164

153165
// --------------------------------------------------------------------------------------------
154166
// metrics for client

fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,19 @@
1818
package org.apache.fluss.rpc.metrics;
1919

2020
import org.apache.fluss.metrics.CharacterFilter;
21+
import org.apache.fluss.metrics.MetricNames;
2122
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
2223
import org.apache.fluss.metrics.registry.MetricRegistry;
24+
import org.apache.fluss.metrics.registry.NOPMetricRegistry;
2325

26+
import java.util.HashSet;
2427
import java.util.Map;
28+
import java.util.Set;
29+
import java.util.function.ToLongFunction;
2530

2631
/** The metric group for clients. */
2732
public class ClientMetricGroup extends AbstractMetricGroup {
33+
private final Set<ConnectionMetricGroup> connectionMetricGroups;
2834

2935
private static final String NAME = "client";
3036

@@ -33,6 +39,43 @@ public class ClientMetricGroup extends AbstractMetricGroup {
3339
/**
 * Creates the client metric group and registers the gauges that aggregate the metrics of all
 * tracked connection metric groups.
 *
 * @param registry the metric registry handed to the parent group
 * @param clientId the client id stored on this group
 */
public ClientMetricGroup(MetricRegistry registry, String clientId) {
    super(registry, new String[] {NAME}, null);
    this.clientId = clientId;
    // The gauges below walk this set every time a value is read, while connections add and
    // remove their groups as they are created and closed. A HashSet is not safe for that if
    // reads and updates happen on different threads (iteration can fail with
    // ConcurrentModificationException), so back the set with a ConcurrentHashMap, whose
    // iterators are weakly consistent. The set must be assigned before any gauge is registered.
    this.connectionMetricGroups = java.util.concurrent.ConcurrentHashMap.newKeySet();
    this.gauge(
            MetricNames.CLIENT_REQUESTS_RATE_AVG,
            () -> getMetricsAvg(ConnectionMetricGroup.Metrics::requestRate));
    this.gauge(
            MetricNames.CLIENT_REQUESTS_RATE_SUM,
            () -> getMetricsSum(ConnectionMetricGroup.Metrics::requestRate));
    this.gauge(
            MetricNames.CLIENT_RESPONSES_RATE_AVG,
            () -> getMetricsAvg(ConnectionMetricGroup.Metrics::responseRate));
    this.gauge(
            MetricNames.CLIENT_RESPONSES_RATE_SUM,
            () -> getMetricsSum(ConnectionMetricGroup.Metrics::responseRate));
    this.gauge(
            MetricNames.CLIENT_BYTES_IN_RATE_AVG,
            () -> getMetricsAvg(ConnectionMetricGroup.Metrics::byteInRate));
    this.gauge(
            MetricNames.CLIENT_BYTES_IN_RATE_SUM,
            () -> getMetricsSum(ConnectionMetricGroup.Metrics::byteInRate));
    this.gauge(
            MetricNames.CLIENT_BYTES_OUT_RATE_AVG,
            () -> getMetricsAvg(ConnectionMetricGroup.Metrics::byteOutRate));
    this.gauge(
            MetricNames.CLIENT_BYTES_OUT_RATE_SUM,
            () -> getMetricsSum(ConnectionMetricGroup.Metrics::byteOutRate));
    this.gauge(
            MetricNames.CLIENT_REQUEST_LATENCY_MS_AVG,
            () -> getMetricsAvg(ConnectionMetricGroup.Metrics::requestLatencyMs));
    this.gauge(
            MetricNames.CLIENT_REQUEST_LATENCY_MS_MAX,
            () -> getMetricsMax(ConnectionMetricGroup.Metrics::requestLatencyMs));
    this.gauge(
            MetricNames.CLIENT_REQUESTS_IN_FLIGHT_AVG,
            () -> getMetricsAvg(ConnectionMetricGroup.Metrics::requestsInFlight));
    this.gauge(
            MetricNames.CLIENT_REQUESTS_IN_FLIGHT_SUM,
            () -> getMetricsSum(ConnectionMetricGroup.Metrics::requestsInFlight));
}
3780

3881
@Override
@@ -50,6 +93,43 @@ public MetricRegistry getMetricRegistry() {
5093
}
5194

5295
public ConnectionMetricGroup createConnectionMetricGroup(String serverId) {
53-
return new ConnectionMetricGroup(registry, serverId, this);
96+
// Only expose aggregate metrics to reduce the reporter pressure.
97+
ConnectionMetricGroup connectionMetricGroup =
98+
new ConnectionMetricGroup(NOPMetricRegistry.INSTANCE, serverId, this);
99+
connectionMetricGroups.add(connectionMetricGroup);
100+
return connectionMetricGroup;
101+
}
102+
103+
public void removeConnectionMetricGroup(ConnectionMetricGroup connectionMetricGroup) {
104+
connectionMetricGroups.remove(connectionMetricGroup);
105+
}
106+
107+
private double getMetricsAvg(ToLongFunction<ConnectionMetricGroup.Metrics> metricGetter) {
108+
return connectionMetricGroups.stream()
109+
.flatMap(
110+
connectionMetricGroup ->
111+
connectionMetricGroup.metricsByRequestName.values().stream())
112+
.mapToLong(metricGetter)
113+
.average()
114+
.orElse(0);
115+
}
116+
117+
private long getMetricsSum(ToLongFunction<ConnectionMetricGroup.Metrics> metricGetter) {
118+
return connectionMetricGroups.stream()
119+
.flatMap(
120+
connectionMetricGroup ->
121+
connectionMetricGroup.metricsByRequestName.values().stream())
122+
.mapToLong(metricGetter)
123+
.sum();
124+
}
125+
126+
private long getMetricsMax(ToLongFunction<ConnectionMetricGroup.Metrics> metricGetter) {
127+
return connectionMetricGroups.stream()
128+
.flatMap(
129+
connectionMetricGroup ->
130+
connectionMetricGroup.metricsByRequestName.values().stream())
131+
.mapToLong(metricGetter)
132+
.max()
133+
.orElse(0);
54134
}
55135
}

fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetricGroup.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class ConnectionMetricGroup extends AbstractMetricGroup {
4545
private final String serverId;
4646

4747
/** Request/response metrics per request type, keyed by the name of the {@link ApiKeys}. */
48-
private final Map<String, Metrics> metricsByRequestName = MapUtils.newConcurrentHashMap();
48+
final Map<String, Metrics> metricsByRequestName = MapUtils.newConcurrentHashMap();
4949

5050
public ConnectionMetricGroup(
5151
MetricRegistry registry, String serverId, ClientMetricGroup parent) {
@@ -93,7 +93,7 @@ public void updateMetricsAfterGetResponse(ApiKeys apikey, long requestStartTime,
9393
}
9494

9595
@Nullable
96-
private Metrics getOrCreateRequestMetrics(ApiKeys apikey) {
96+
Metrics getOrCreateRequestMetrics(ApiKeys apikey) {
9797
if (!REPORT_API_KEYS.contains(apikey)) {
9898
return null;
9999
}
@@ -102,7 +102,7 @@ private Metrics getOrCreateRequestMetrics(ApiKeys apikey) {
102102
apikey.name(), keyName -> new Metrics(this.addGroup("request", keyName)));
103103
}
104104

105-
private static final class Metrics {
105+
static final class Metrics {
106106
final Counter requests;
107107
final Counter responses;
108108
final Counter inComingBytes;
@@ -124,5 +124,29 @@ private Metrics(MetricGroup metricGroup) {
124124
requestsInFlight = new AtomicInteger(0);
125125
metricGroup.gauge(MetricNames.CLIENT_REQUESTS_IN_FLIGHT, requestsInFlight::get);
126126
}
127+
128+
long requestRate() {
129+
return requests.getCount();
130+
}
131+
132+
long responseRate() {
133+
return responses.getCount();
134+
}
135+
136+
long byteInRate() {
137+
return inComingBytes.getCount();
138+
}
139+
140+
long byteOutRate() {
141+
return outGoingBytes.getCount();
142+
}
143+
144+
long requestLatencyMs() {
145+
return requestLatencyMs;
146+
}
147+
148+
long requestsInFlight() {
149+
return requestsInFlight.get();
150+
}
127151
}
128152
}

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ final class ServerConnection {
7272
// TODO: add max inflight requests limit like Kafka's "max.in.flight.requests.per.connection"
7373
private final Map<Integer, InflightRequest> inflightRequests = MapUtils.newConcurrentHashMap();
7474
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
75+
private final ClientMetricGroup clientMetricGroup;
7576
private final ConnectionMetricGroup connectionMetricGroup;
7677
private final ClientAuthenticator authenticator;
7778
private final ExponentialBackoff backoff;
@@ -108,6 +109,7 @@ final class ServerConnection {
108109
boolean isInnerClient) {
109110
this.node = node;
110111
this.state = ConnectionState.CONNECTING;
112+
this.clientMetricGroup = clientMetricGroup;
111113
this.connectionMetricGroup = clientMetricGroup.createConnectionMetricGroup(node.uid());
112114
this.authenticator = authenticator;
113115
this.backoff = new ExponentialBackoff(100L, 2, 5000L, 0.2);
@@ -190,6 +192,7 @@ private CompletableFuture<Void> close(Throwable cause) {
190192
closeFuture.completeExceptionally(cause);
191193
}
192194

195+
clientMetricGroup.removeConnectionMetricGroup(connectionMetricGroup);
193196
connectionMetricGroup.close();
194197
}
195198

@@ -449,8 +452,10 @@ private void handleAuthenticateResponse(ApiMessage response, Throwable cause) {
449452
}
450453

451454
private void switchState(ConnectionState targetState) {
452-
LOG.debug("switch state form {} to {}", state, targetState);
453-
state = targetState;
455+
if (state != ConnectionState.DISCONNECTED) {
456+
LOG.debug("switch state form {} to {}", state, targetState);
457+
state = targetState;
458+
}
454459
if (targetState == ConnectionState.READY) {
455460
// process pending requests
456461
PendingRequest pending;

fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public CompletableFuture<PutKvResponse> putKv(PutKvRequest request) {
122122

123123
@Override
124124
public CompletableFuture<LookupResponse> lookup(LookupRequest request) {
125-
return null;
125+
return CompletableFuture.completedFuture(new LookupResponse());
126126
}
127127

128128
@Override

fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java

Lines changed: 110 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,20 @@
2222
import org.apache.fluss.cluster.ServerType;
2323
import org.apache.fluss.config.Configuration;
2424
import org.apache.fluss.exception.DisconnectException;
25+
import org.apache.fluss.metrics.Gauge;
26+
import org.apache.fluss.metrics.Metric;
27+
import org.apache.fluss.metrics.MetricType;
28+
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
2529
import org.apache.fluss.metrics.groups.MetricGroup;
30+
import org.apache.fluss.metrics.registry.NOPMetricRegistry;
2631
import org.apache.fluss.metrics.util.NOPMetricsGroup;
2732
import org.apache.fluss.rpc.TestingGatewayService;
33+
import org.apache.fluss.rpc.TestingTabletGatewayService;
2834
import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
35+
import org.apache.fluss.rpc.messages.LookupRequest;
36+
import org.apache.fluss.rpc.messages.PbLookupReqForBucket;
2937
import org.apache.fluss.rpc.messages.PbTablePath;
38+
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
3039
import org.apache.fluss.rpc.metrics.TestingClientMetricGroup;
3140
import org.apache.fluss.rpc.netty.client.ServerConnection.ConnectionState;
3241
import org.apache.fluss.rpc.netty.server.NettyServer;
@@ -42,9 +51,24 @@
4251
import org.junit.jupiter.api.BeforeEach;
4352
import org.junit.jupiter.api.Test;
4453

45-
import java.util.Collections;
54+
import java.util.Arrays;
55+
import java.util.HashMap;
56+
import java.util.Map;
4657
import java.util.concurrent.CompletableFuture;
47-
58+
import java.util.concurrent.ExecutionException;
59+
60+
import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_IN_RATE_AVG;
61+
import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_IN_RATE_SUM;
62+
import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_OUT_RATE_AVG;
63+
import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_OUT_RATE_SUM;
64+
import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUESTS_IN_FLIGHT_AVG;
65+
import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUESTS_IN_FLIGHT_SUM;
66+
import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUESTS_RATE_AVG;
67+
import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUESTS_RATE_SUM;
68+
import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUEST_LATENCY_MS_AVG;
69+
import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUEST_LATENCY_MS_MAX;
70+
import static org.apache.fluss.metrics.MetricNames.CLIENT_RESPONSES_RATE_AVG;
71+
import static org.apache.fluss.metrics.MetricNames.CLIENT_RESPONSES_RATE_SUM;
4872
import static org.apache.fluss.rpc.netty.NettyUtils.getClientSocketChannelClass;
4973
import static org.apache.fluss.rpc.netty.NettyUtils.newEventLoopGroup;
5074
import static org.apache.fluss.utils.NetUtils.getAvailablePort;
@@ -60,6 +84,7 @@ public class ServerConnectionTest {
6084
private Configuration conf;
6185
private NettyServer nettyServer;
6286
private ServerNode serverNode;
87+
private ServerNode serverNode2;
6388
private TestingGatewayService service;
6489

6590
@BeforeEach
@@ -121,22 +146,100 @@ void testConnectionClose() {
121146
assertThat(future.isDone()).isTrue();
122147
}
123148

149+
@Test
150+
void testConnectionMetrics() throws ExecutionException, InterruptedException {
151+
MockMetricRegistry metricRegistry = new MockMetricRegistry();
152+
ClientMetricGroup client = new ClientMetricGroup(metricRegistry, "client");
153+
ServerConnection connection =
154+
new ServerConnection(
155+
bootstrap,
156+
serverNode,
157+
client,
158+
clientAuthenticator,
159+
(con, ignore) -> {},
160+
false);
161+
ServerConnection connection2 =
162+
new ServerConnection(
163+
bootstrap,
164+
serverNode2,
165+
client,
166+
clientAuthenticator,
167+
(con, ignore) -> {},
168+
false);
169+
LookupRequest request = new LookupRequest().setTableId(1);
170+
PbLookupReqForBucket pbLookupReqForBucket = request.addBucketsReq();
171+
pbLookupReqForBucket.setBucketId(1);
172+
assertThat(metricRegistry.registeredMetrics).hasSize(12);
173+
174+
connection.send(ApiKeys.LOOKUP, request).get();
175+
connection2.send(ApiKeys.LOOKUP, request).get();
176+
177+
assertThat(metricRegistry.registeredMetrics).hasSize(12);
178+
assertThat(metricRegistry.registeredMetrics.keySet())
179+
.containsExactlyInAnyOrder(
180+
CLIENT_REQUESTS_RATE_AVG,
181+
CLIENT_REQUESTS_RATE_SUM,
182+
CLIENT_RESPONSES_RATE_AVG,
183+
CLIENT_RESPONSES_RATE_SUM,
184+
CLIENT_BYTES_IN_RATE_AVG,
185+
CLIENT_BYTES_IN_RATE_SUM,
186+
CLIENT_BYTES_OUT_RATE_AVG,
187+
CLIENT_BYTES_OUT_RATE_SUM,
188+
CLIENT_REQUEST_LATENCY_MS_AVG,
189+
CLIENT_REQUEST_LATENCY_MS_MAX,
190+
CLIENT_REQUESTS_IN_FLIGHT_AVG,
191+
CLIENT_REQUESTS_IN_FLIGHT_SUM);
192+
Metric metric = metricRegistry.registeredMetrics.get(CLIENT_REQUESTS_RATE_AVG);
193+
assertThat(metric.getMetricType()).isEqualTo(MetricType.GAUGE);
194+
assertThat(((Gauge<?>) metric).getValue()).isEqualTo(1.0);
195+
metric = metricRegistry.registeredMetrics.get(CLIENT_REQUESTS_RATE_SUM);
196+
assertThat(metric.getMetricType()).isEqualTo(MetricType.GAUGE);
197+
assertThat(((Gauge<?>) metric).getValue()).isEqualTo(2L);
198+
connection.close().get();
199+
}
200+
124201
private void buildNettyServer(int serverId) throws Exception {
125-
try (NetUtils.Port availablePort = getAvailablePort()) {
202+
try (NetUtils.Port availablePort = getAvailablePort();
203+
NetUtils.Port availablePort2 = getAvailablePort()) {
126204
serverNode =
127205
new ServerNode(
128-
serverId, "localhost", availablePort.getPort(), ServerType.COORDINATOR);
129-
service = new TestingGatewayService();
206+
serverId,
207+
"localhost",
208+
availablePort.getPort(),
209+
ServerType.TABLET_SERVER);
210+
serverNode2 =
211+
new ServerNode(
212+
serverId,
213+
"localhost",
214+
availablePort2.getPort(),
215+
ServerType.TABLET_SERVER);
216+
service = new TestingTabletGatewayService();
130217
MetricGroup metricGroup = NOPMetricsGroup.newInstance();
131218
nettyServer =
132219
new NettyServer(
133220
conf,
134-
Collections.singleton(
135-
new Endpoint(serverNode.host(), serverNode.port(), "INTERNAL")),
221+
Arrays.asList(
222+
new Endpoint(serverNode.host(), serverNode.port(), "INTERNAL"),
223+
new Endpoint(serverNode2.host(), serverNode2.port(), "CLIENT")),
136224
service,
137225
metricGroup,
138226
RequestsMetrics.createCoordinatorServerRequestMetrics(metricGroup));
139227
nettyServer.start();
140228
}
141229
}
230+
231+
private static class MockMetricRegistry extends NOPMetricRegistry {
232+
233+
Map<String, Metric> registeredMetrics = new HashMap<>();
234+
235+
@Override
236+
public void register(Metric metric, String metricName, AbstractMetricGroup group) {
237+
registeredMetrics.put(metricName, metric);
238+
}
239+
240+
@Override
241+
public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
242+
registeredMetrics.remove(metricName, metric);
243+
}
244+
}
142245
}

0 commit comments

Comments
 (0)