Skip to content

Commit ddb18a7

Browse files
committed
Aggregate fluss client connection metrics.
1 parent 31a9352 commit ddb18a7

File tree

6 files changed

+232
-14
lines changed

6 files changed

+232
-14
lines changed

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,22 @@ public class MetricNames {
144144
// metrics for rpc client
145145
// --------------------------------------------------------------------------------------------
146146
public static final String CLIENT_REQUESTS_RATE = "requestsPerSecond";
147+
public static final String CLIENT_REQUESTS_RATE_AVG = "requestsPerSecond_avg";
148+
public static final String CLIENT_REQUESTS_RATE_TOTAL = "requestsPerSecond_total";
147149
public static final String CLIENT_RESPONSES_RATE = "responsesPerSecond";
150+
public static final String CLIENT_RESPONSES_RATE_AVG = "responsesPerSecond_avg";
151+
public static final String CLIENT_RESPONSES_RATE_TOTAL = "responsesPerSecond_total";
148152
public static final String CLIENT_BYTES_IN_RATE = "bytesInPerSecond";
153+
public static final String CLIENT_BYTES_IN_RATE_AVG = "bytesInPerSecond_avg";
154+
public static final String CLIENT_BYTES_IN_RATE_TOTAL = "bytesInPerSecond_total";
149155
public static final String CLIENT_BYTES_OUT_RATE = "bytesOutPerSecond";
156+
public static final String CLIENT_BYTES_OUT_RATE_AVG = "bytesOutPerSecond_avg";
157+
public static final String CLIENT_BYTES_OUT_RATE_TOTAL = "bytesOutPerSecond_total";
150158
public static final String CLIENT_REQUEST_LATENCY_MS = "requestLatencyMs";
159+
public static final String CLIENT_REQUEST_LATENCY_MS_AVG = "requestLatencyMs_avg";
160+
public static final String CLIENT_REQUEST_LATENCY_MS_MAX = "requestLatencyMs_max";
151161
public static final String CLIENT_REQUESTS_IN_FLIGHT = "requestsInFlight";
162+
public static final String CLIENT_REQUESTS_IN_FLIGHT_TOTAL = "requestsInFlight_total";
152163

153164
// --------------------------------------------------------------------------------------------
154165
// metrics for client

fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,19 @@
1818
package org.apache.fluss.rpc.metrics;
1919

2020
import org.apache.fluss.metrics.CharacterFilter;
21+
import org.apache.fluss.metrics.MetricNames;
2122
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
2223
import org.apache.fluss.metrics.registry.MetricRegistry;
24+
import org.apache.fluss.metrics.registry.NOPMetricRegistry;
2325

26+
import java.util.HashSet;
2427
import java.util.Map;
28+
import java.util.Set;
29+
import java.util.function.ToLongFunction;
2530

2631
/** The metric group for clients. */
2732
public class ClientMetricGroup extends AbstractMetricGroup {
33+
private final Set<ConnectionMetricGroup> connectionMetricGroups;
2834

2935
private static final String NAME = "client";
3036

@@ -33,6 +39,40 @@ public class ClientMetricGroup extends AbstractMetricGroup {
3339
public ClientMetricGroup(MetricRegistry registry, String clientId) {
3440
super(registry, new String[] {NAME}, null);
3541
this.clientId = clientId;
42+
this.connectionMetricGroups = new HashSet<>();
43+
this.gauge(
44+
MetricNames.CLIENT_REQUESTS_RATE_AVG,
45+
() -> getMetricsAvg(ConnectionMetricGroup.Metrics::requestRate));
46+
this.gauge(
47+
MetricNames.CLIENT_REQUESTS_RATE_TOTAL,
48+
() -> getMetricsSum(ConnectionMetricGroup.Metrics::requestRate));
49+
this.gauge(
50+
MetricNames.CLIENT_RESPONSES_RATE_AVG,
51+
() -> getMetricsAvg(ConnectionMetricGroup.Metrics::responseRate));
52+
this.gauge(
53+
MetricNames.CLIENT_RESPONSES_RATE_TOTAL,
54+
() -> getMetricsSum(ConnectionMetricGroup.Metrics::responseRate));
55+
this.gauge(
56+
MetricNames.CLIENT_BYTES_IN_RATE_AVG,
57+
() -> getMetricsAvg(ConnectionMetricGroup.Metrics::byteInRate));
58+
this.gauge(
59+
MetricNames.CLIENT_BYTES_IN_RATE_TOTAL,
60+
() -> getMetricsSum(ConnectionMetricGroup.Metrics::byteInRate));
61+
this.gauge(
62+
MetricNames.CLIENT_BYTES_OUT_RATE_AVG,
63+
() -> getMetricsAvg(ConnectionMetricGroup.Metrics::byteOutRate));
64+
this.gauge(
65+
MetricNames.CLIENT_BYTES_OUT_RATE_TOTAL,
66+
() -> getMetricsSum(ConnectionMetricGroup.Metrics::byteOutRate));
67+
this.gauge(
68+
MetricNames.CLIENT_REQUEST_LATENCY_MS_AVG,
69+
() -> getMetricsAvg(ConnectionMetricGroup.Metrics::requestLatencyMs));
70+
this.gauge(
71+
MetricNames.CLIENT_REQUEST_LATENCY_MS_MAX,
72+
() -> getMetricsMax(ConnectionMetricGroup.Metrics::requestLatencyMs));
73+
this.gauge(
74+
MetricNames.CLIENT_REQUESTS_IN_FLIGHT_TOTAL,
75+
() -> getMetricsSum(ConnectionMetricGroup.Metrics::requestsInFlight));
3676
}
3777

3878
@Override
@@ -50,6 +90,43 @@ public MetricRegistry getMetricRegistry() {
5090
}
5191

5292
public ConnectionMetricGroup createConnectionMetricGroup(String serverId) {
53-
return new ConnectionMetricGroup(registry, serverId, this);
93+
// Only expose aggregate metrics to reduce the reporter pressure.
94+
ConnectionMetricGroup connectionMetricGroup =
95+
new ConnectionMetricGroup(NOPMetricRegistry.INSTANCE, serverId, this);
96+
connectionMetricGroups.add(connectionMetricGroup);
97+
return connectionMetricGroup;
98+
}
99+
100+
public void removeConnectionMetricGroup(ConnectionMetricGroup connectionMetricGroup) {
101+
connectionMetricGroups.remove(connectionMetricGroup);
102+
}
103+
104+
private double getMetricsAvg(ToLongFunction<ConnectionMetricGroup.Metrics> metricGetter) {
105+
return connectionMetricGroups.stream()
106+
.flatMap(
107+
connectionMetricGroup ->
108+
connectionMetricGroup.metricsByRequestName.values().stream())
109+
.mapToLong(metricGetter)
110+
.average()
111+
.orElse(0);
112+
}
113+
114+
private long getMetricsSum(ToLongFunction<ConnectionMetricGroup.Metrics> metricGetter) {
115+
return connectionMetricGroups.stream()
116+
.flatMap(
117+
connectionMetricGroup ->
118+
connectionMetricGroup.metricsByRequestName.values().stream())
119+
.mapToLong(metricGetter)
120+
.sum();
121+
}
122+
123+
private long getMetricsMax(ToLongFunction<ConnectionMetricGroup.Metrics> metricGetter) {
124+
return connectionMetricGroups.stream()
125+
.flatMap(
126+
connectionMetricGroup ->
127+
connectionMetricGroup.metricsByRequestName.values().stream())
128+
.mapToLong(metricGetter)
129+
.max()
130+
.orElse(0);
54131
}
55132
}

fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetricGroup.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class ConnectionMetricGroup extends AbstractMetricGroup {
4545
private final String serverId;
4646

4747
/** Metrics for different request/response metrics with specify {@link ApiKeys}. */
48-
private final Map<String, Metrics> metricsByRequestName = MapUtils.newConcurrentHashMap();
48+
final Map<String, Metrics> metricsByRequestName = MapUtils.newConcurrentHashMap();
4949

5050
public ConnectionMetricGroup(
5151
MetricRegistry registry, String serverId, ClientMetricGroup parent) {
@@ -93,7 +93,7 @@ public void updateMetricsAfterGetResponse(ApiKeys apikey, long requestStartTime,
9393
}
9494

9595
@Nullable
96-
private Metrics getOrCreateRequestMetrics(ApiKeys apikey) {
96+
Metrics getOrCreateRequestMetrics(ApiKeys apikey) {
9797
if (!REPORT_API_KEYS.contains(apikey)) {
9898
return null;
9999
}
@@ -102,7 +102,7 @@ private Metrics getOrCreateRequestMetrics(ApiKeys apikey) {
102102
apikey.name(), keyName -> new Metrics(this.addGroup("request", keyName)));
103103
}
104104

105-
private static final class Metrics {
105+
static final class Metrics {
106106
final Counter requests;
107107
final Counter responses;
108108
final Counter inComingBytes;
@@ -124,5 +124,29 @@ private Metrics(MetricGroup metricGroup) {
124124
requestsInFlight = new AtomicInteger(0);
125125
metricGroup.gauge(MetricNames.CLIENT_REQUESTS_IN_FLIGHT, requestsInFlight::get);
126126
}
127+
128+
long requestRate() {
129+
return requests.getCount();
130+
}
131+
132+
long responseRate() {
133+
return responses.getCount();
134+
}
135+
136+
long byteInRate() {
137+
return inComingBytes.getCount();
138+
}
139+
140+
long byteOutRate() {
141+
return outGoingBytes.getCount();
142+
}
143+
144+
long requestLatencyMs() {
145+
return requestLatencyMs;
146+
}
147+
148+
long requestsInFlight() {
149+
return requestsInFlight.get();
150+
}
127151
}
128152
}

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ final class ServerConnection {
7272
// TODO: add max inflight requests limit like Kafka's "max.in.flight.requests.per.connection"
7373
private final Map<Integer, InflightRequest> inflightRequests = MapUtils.newConcurrentHashMap();
7474
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
75+
private final ClientMetricGroup clientMetricGroup;
7576
private final ConnectionMetricGroup connectionMetricGroup;
7677
private final ClientAuthenticator authenticator;
7778
private final ExponentialBackoff backoff;
@@ -108,6 +109,7 @@ final class ServerConnection {
108109
boolean isInnerClient) {
109110
this.node = node;
110111
this.state = ConnectionState.CONNECTING;
112+
this.clientMetricGroup = clientMetricGroup;
111113
this.connectionMetricGroup = clientMetricGroup.createConnectionMetricGroup(node.uid());
112114
this.authenticator = authenticator;
113115
this.backoff = new ExponentialBackoff(100L, 2, 5000L, 0.2);
@@ -190,6 +192,7 @@ private CompletableFuture<Void> close(Throwable cause) {
190192
closeFuture.completeExceptionally(cause);
191193
}
192194

195+
clientMetricGroup.removeConnectionMetricGroup(connectionMetricGroup);
193196
connectionMetricGroup.close();
194197
}
195198

@@ -449,8 +452,10 @@ private void handleAuthenticateResponse(ApiMessage response, Throwable cause) {
449452
}
450453

451454
private void switchState(ConnectionState targetState) {
452-
LOG.debug("switch state form {} to {}", state, targetState);
453-
state = targetState;
455+
if (state != ConnectionState.DISCONNECTED) {
456+
LOG.debug("switch state form {} to {}", state, targetState);
457+
state = targetState;
458+
}
454459
if (targetState == ConnectionState.READY) {
455460
// process pending requests
456461
PendingRequest pending;

fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public CompletableFuture<PutKvResponse> putKv(PutKvRequest request) {
122122

123123
@Override
124124
public CompletableFuture<LookupResponse> lookup(LookupRequest request) {
125-
return null;
125+
return CompletableFuture.completedFuture(new LookupResponse());
126126
}
127127

128128
@Override

fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java

Lines changed: 108 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,20 @@
2222
import org.apache.fluss.cluster.ServerType;
2323
import org.apache.fluss.config.Configuration;
2424
import org.apache.fluss.exception.DisconnectException;
25+
import org.apache.fluss.metrics.Gauge;
26+
import org.apache.fluss.metrics.Metric;
27+
import org.apache.fluss.metrics.MetricType;
28+
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
2529
import org.apache.fluss.metrics.groups.MetricGroup;
30+
import org.apache.fluss.metrics.registry.NOPMetricRegistry;
2631
import org.apache.fluss.metrics.util.NOPMetricsGroup;
2732
import org.apache.fluss.rpc.TestingGatewayService;
33+
import org.apache.fluss.rpc.TestingTabletGatewayService;
2834
import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
35+
import org.apache.fluss.rpc.messages.LookupRequest;
36+
import org.apache.fluss.rpc.messages.PbLookupReqForBucket;
2937
import org.apache.fluss.rpc.messages.PbTablePath;
38+
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
3039
import org.apache.fluss.rpc.metrics.TestingClientMetricGroup;
3140
import org.apache.fluss.rpc.netty.client.ServerConnection.ConnectionState;
3241
import org.apache.fluss.rpc.netty.server.NettyServer;
@@ -42,9 +51,23 @@
4251
import org.junit.jupiter.api.BeforeEach;
4352
import org.junit.jupiter.api.Test;
4453

45-
import java.util.Collections;
54+
import java.util.Arrays;
55+
import java.util.HashMap;
56+
import java.util.Map;
4657
import java.util.concurrent.CompletableFuture;
47-
58+
import java.util.concurrent.ExecutionException;
59+
60+
import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_IN_RATE_AVG;
61+
import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_IN_RATE_TOTAL;
62+
import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_OUT_RATE_AVG;
63+
import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_OUT_RATE_TOTAL;
64+
import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUESTS_IN_FLIGHT_TOTAL;
65+
import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUESTS_RATE_AVG;
66+
import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUESTS_RATE_TOTAL;
67+
import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUEST_LATENCY_MS_AVG;
68+
import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUEST_LATENCY_MS_MAX;
69+
import static org.apache.fluss.metrics.MetricNames.CLIENT_RESPONSES_RATE_AVG;
70+
import static org.apache.fluss.metrics.MetricNames.CLIENT_RESPONSES_RATE_TOTAL;
4871
import static org.apache.fluss.rpc.netty.NettyUtils.getClientSocketChannelClass;
4972
import static org.apache.fluss.rpc.netty.NettyUtils.newEventLoopGroup;
5073
import static org.apache.fluss.utils.NetUtils.getAvailablePort;
@@ -60,6 +83,7 @@ public class ServerConnectionTest {
6083
private Configuration conf;
6184
private NettyServer nettyServer;
6285
private ServerNode serverNode;
86+
private ServerNode serverNode2;
6387
private TestingGatewayService service;
6488

6589
@BeforeEach
@@ -121,22 +145,99 @@ void testConnectionClose() {
121145
assertThat(future.isDone()).isTrue();
122146
}
123147

148+
@Test
149+
void testConnectionMetrics() throws ExecutionException, InterruptedException {
150+
MockMetricRegistry metricRegistry = new MockMetricRegistry();
151+
ClientMetricGroup client = new ClientMetricGroup(metricRegistry, "client");
152+
ServerConnection connection =
153+
new ServerConnection(
154+
bootstrap,
155+
serverNode,
156+
client,
157+
clientAuthenticator,
158+
(con, ignore) -> {},
159+
false);
160+
ServerConnection connection2 =
161+
new ServerConnection(
162+
bootstrap,
163+
serverNode2,
164+
client,
165+
clientAuthenticator,
166+
(con, ignore) -> {},
167+
false);
168+
LookupRequest request = new LookupRequest().setTableId(1);
169+
PbLookupReqForBucket pbLookupReqForBucket = request.addBucketsReq();
170+
pbLookupReqForBucket.setBucketId(1);
171+
assertThat(metricRegistry.registeredMetrics).hasSize(11);
172+
173+
connection.send(ApiKeys.LOOKUP, request).get();
174+
connection2.send(ApiKeys.LOOKUP, request).get();
175+
176+
assertThat(metricRegistry.registeredMetrics).hasSize(11);
177+
assertThat(metricRegistry.registeredMetrics.keySet())
178+
.containsExactlyInAnyOrder(
179+
CLIENT_REQUESTS_RATE_AVG,
180+
CLIENT_REQUESTS_RATE_TOTAL,
181+
CLIENT_RESPONSES_RATE_AVG,
182+
CLIENT_RESPONSES_RATE_TOTAL,
183+
CLIENT_BYTES_IN_RATE_AVG,
184+
CLIENT_BYTES_IN_RATE_TOTAL,
185+
CLIENT_BYTES_OUT_RATE_AVG,
186+
CLIENT_BYTES_OUT_RATE_TOTAL,
187+
CLIENT_REQUEST_LATENCY_MS_AVG,
188+
CLIENT_REQUEST_LATENCY_MS_MAX,
189+
CLIENT_REQUESTS_IN_FLIGHT_TOTAL);
190+
Metric metric = metricRegistry.registeredMetrics.get(CLIENT_REQUESTS_RATE_AVG);
191+
assertThat(metric.getMetricType()).isEqualTo(MetricType.GAUGE);
192+
assertThat(((Gauge<?>) metric).getValue()).isEqualTo(1.0);
193+
metric = metricRegistry.registeredMetrics.get(CLIENT_REQUESTS_RATE_TOTAL);
194+
assertThat(metric.getMetricType()).isEqualTo(MetricType.GAUGE);
195+
assertThat(((Gauge<?>) metric).getValue()).isEqualTo(2L);
196+
connection.close().get();
197+
}
198+
124199
private void buildNettyServer(int serverId) throws Exception {
125-
try (NetUtils.Port availablePort = getAvailablePort()) {
200+
try (NetUtils.Port availablePort = getAvailablePort();
201+
NetUtils.Port availablePort2 = getAvailablePort()) {
126202
serverNode =
127203
new ServerNode(
128-
serverId, "localhost", availablePort.getPort(), ServerType.COORDINATOR);
129-
service = new TestingGatewayService();
204+
serverId,
205+
"localhost",
206+
availablePort.getPort(),
207+
ServerType.TABLET_SERVER);
208+
serverNode2 =
209+
new ServerNode(
210+
serverId,
211+
"localhost",
212+
availablePort2.getPort(),
213+
ServerType.TABLET_SERVER);
214+
service = new TestingTabletGatewayService();
130215
MetricGroup metricGroup = NOPMetricsGroup.newInstance();
131216
nettyServer =
132217
new NettyServer(
133218
conf,
134-
Collections.singleton(
135-
new Endpoint(serverNode.host(), serverNode.port(), "INTERNAL")),
219+
Arrays.asList(
220+
new Endpoint(serverNode.host(), serverNode.port(), "INTERNAL"),
221+
new Endpoint(serverNode2.host(), serverNode2.port(), "CLIENT")),
136222
service,
137223
metricGroup,
138224
RequestsMetrics.createCoordinatorServerRequestMetrics(metricGroup));
139225
nettyServer.start();
140226
}
141227
}
228+
229+
private static class MockMetricRegistry extends NOPMetricRegistry {
230+
231+
Map<String, Metric> registeredMetrics = new HashMap<>();
232+
233+
@Override
234+
public void register(Metric metric, String metricName, AbstractMetricGroup group) {
235+
registeredMetrics.put(metricName, metric);
236+
}
237+
238+
@Override
239+
public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
240+
registeredMetrics.remove(metricName, metric);
241+
}
242+
}
142243
}

0 commit comments

Comments
 (0)