Skip to content

Commit b94b2b6

Browse files
[client] Aggregate Fluss client connection metrics to reduce the number of metrics (#1896)
1 parent a956aa5 commit b94b2b6

File tree

6 files changed

+239
-66
lines changed

6 files changed

+239
-66
lines changed

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -143,12 +143,17 @@ public class MetricNames {
143143
// --------------------------------------------------------------------------------------------
144144
// metrics for rpc client
145145
// --------------------------------------------------------------------------------------------
146-
public static final String CLIENT_REQUESTS_RATE = "requestsPerSecond";
147-
public static final String CLIENT_RESPONSES_RATE = "responsesPerSecond";
148-
public static final String CLIENT_BYTES_IN_RATE = "bytesInPerSecond";
149-
public static final String CLIENT_BYTES_OUT_RATE = "bytesOutPerSecond";
150-
public static final String CLIENT_REQUEST_LATENCY_MS = "requestLatencyMs";
151-
public static final String CLIENT_REQUESTS_IN_FLIGHT = "requestsInFlight";
146+
public static final String CLIENT_REQUESTS_RATE_AVG = "requestsPerSecond_avg";
147+
public static final String CLIENT_REQUESTS_RATE_TOTAL = "requestsPerSecond_total";
148+
public static final String CLIENT_RESPONSES_RATE_AVG = "responsesPerSecond_avg";
149+
public static final String CLIENT_RESPONSES_RATE_TOTAL = "responsesPerSecond_total";
150+
public static final String CLIENT_BYTES_IN_RATE_AVG = "bytesInPerSecond_avg";
151+
public static final String CLIENT_BYTES_IN_RATE_TOTAL = "bytesInPerSecond_total";
152+
public static final String CLIENT_BYTES_OUT_RATE_AVG = "bytesOutPerSecond_avg";
153+
public static final String CLIENT_BYTES_OUT_RATE_TOTAL = "bytesOutPerSecond_total";
154+
public static final String CLIENT_REQUEST_LATENCY_MS_AVG = "requestLatencyMs_avg";
155+
public static final String CLIENT_REQUEST_LATENCY_MS_MAX = "requestLatencyMs_max";
156+
public static final String CLIENT_REQUESTS_IN_FLIGHT_TOTAL = "requestsInFlight_total";
152157

153158
// --------------------------------------------------------------------------------------------
154159
// metrics for client

fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,18 @@
1818
package org.apache.fluss.rpc.metrics;
1919

2020
import org.apache.fluss.metrics.CharacterFilter;
21+
import org.apache.fluss.metrics.MetricNames;
2122
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
2223
import org.apache.fluss.metrics.registry.MetricRegistry;
24+
import org.apache.fluss.utils.MapUtils;
2325

2426
import java.util.Map;
27+
import java.util.function.ToLongFunction;
2528

2629
/** The metric group for clients. */
2730
public class ClientMetricGroup extends AbstractMetricGroup {
31+
private final Map<String, ConnectionMetrics> nodeToConnectionMetrics =
32+
MapUtils.newConcurrentHashMap();
2833

2934
private static final String NAME = "client";
3035

@@ -33,6 +38,39 @@ public class ClientMetricGroup extends AbstractMetricGroup {
3338
public ClientMetricGroup(MetricRegistry registry, String clientId) {
3439
super(registry, new String[] {NAME}, null);
3540
this.clientId = clientId;
41+
this.gauge(
42+
MetricNames.CLIENT_REQUESTS_RATE_AVG,
43+
() -> getMetricsAvg(ConnectionMetrics.Metrics::requestRate));
44+
this.gauge(
45+
MetricNames.CLIENT_REQUESTS_RATE_TOTAL,
46+
() -> getMetricsSum(ConnectionMetrics.Metrics::requestRate));
47+
this.gauge(
48+
MetricNames.CLIENT_RESPONSES_RATE_AVG,
49+
() -> getMetricsAvg(ConnectionMetrics.Metrics::responseRate));
50+
this.gauge(
51+
MetricNames.CLIENT_RESPONSES_RATE_TOTAL,
52+
() -> getMetricsSum(ConnectionMetrics.Metrics::responseRate));
53+
this.gauge(
54+
MetricNames.CLIENT_BYTES_IN_RATE_AVG,
55+
() -> getMetricsAvg(ConnectionMetrics.Metrics::byteInRate));
56+
this.gauge(
57+
MetricNames.CLIENT_BYTES_IN_RATE_TOTAL,
58+
() -> getMetricsSum(ConnectionMetrics.Metrics::byteInRate));
59+
this.gauge(
60+
MetricNames.CLIENT_BYTES_OUT_RATE_AVG,
61+
() -> getMetricsAvg(ConnectionMetrics.Metrics::byteOutRate));
62+
this.gauge(
63+
MetricNames.CLIENT_BYTES_OUT_RATE_TOTAL,
64+
() -> getMetricsSum(ConnectionMetrics.Metrics::byteOutRate));
65+
this.gauge(
66+
MetricNames.CLIENT_REQUEST_LATENCY_MS_AVG,
67+
() -> getMetricsAvg(ConnectionMetrics.Metrics::requestLatencyMs));
68+
this.gauge(
69+
MetricNames.CLIENT_REQUEST_LATENCY_MS_MAX,
70+
() -> getMetricsMax(ConnectionMetrics.Metrics::requestLatencyMs));
71+
this.gauge(
72+
MetricNames.CLIENT_REQUESTS_IN_FLIGHT_TOTAL,
73+
() -> getMetricsSum(ConnectionMetrics.Metrics::requestsInFlight));
3674
}
3775

3876
@Override
@@ -49,7 +87,43 @@ public MetricRegistry getMetricRegistry() {
4987
return registry;
5088
}
5189

52-
public ConnectionMetricGroup createConnectionMetricGroup(String serverId) {
53-
return new ConnectionMetricGroup(registry, serverId, this);
90+
public ConnectionMetrics createConnectionMetricGroup(String serverId) {
91+
// Only expose aggregate metrics to reduce the reporter pressure.
92+
ConnectionMetrics connectionMetrics = new ConnectionMetrics(serverId, this);
93+
nodeToConnectionMetrics.put(serverId, connectionMetrics);
94+
return connectionMetrics;
95+
}
96+
97+
public void removeConnectionMetricGroup(String serverId, ConnectionMetrics connectionMetrics) {
98+
nodeToConnectionMetrics.remove(serverId, connectionMetrics);
99+
}
100+
101+
private double getMetricsAvg(ToLongFunction<ConnectionMetrics.Metrics> metricGetter) {
102+
return nodeToConnectionMetrics.values().stream()
103+
.flatMap(
104+
connectionMetricGroup ->
105+
connectionMetricGroup.metricsByRequestName.values().stream())
106+
.mapToLong(metricGetter)
107+
.average()
108+
.orElse(0);
109+
}
110+
111+
private long getMetricsSum(ToLongFunction<ConnectionMetrics.Metrics> metricGetter) {
112+
return nodeToConnectionMetrics.values().stream()
113+
.flatMap(
114+
connectionMetricGroup ->
115+
connectionMetricGroup.metricsByRequestName.values().stream())
116+
.mapToLong(metricGetter)
117+
.sum();
118+
}
119+
120+
private long getMetricsMax(ToLongFunction<ConnectionMetrics.Metrics> metricGetter) {
121+
return nodeToConnectionMetrics.values().stream()
122+
.flatMap(
123+
connectionMetricGroup ->
124+
connectionMetricGroup.metricsByRequestName.values().stream())
125+
.mapToLong(metricGetter)
126+
.max()
127+
.orElse(0);
54128
}
55129
}

fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetricGroup.java renamed to fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetrics.java

Lines changed: 35 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,8 @@
1717

1818
package org.apache.fluss.rpc.metrics;
1919

20-
import org.apache.fluss.metrics.CharacterFilter;
2120
import org.apache.fluss.metrics.Counter;
22-
import org.apache.fluss.metrics.MeterView;
23-
import org.apache.fluss.metrics.MetricNames;
2421
import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
25-
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
26-
import org.apache.fluss.metrics.groups.MetricGroup;
27-
import org.apache.fluss.metrics.registry.MetricRegistry;
2822
import org.apache.fluss.rpc.protocol.ApiKeys;
2923
import org.apache.fluss.utils.MapUtils;
3024

@@ -35,38 +29,24 @@
3529
import java.util.Map;
3630
import java.util.concurrent.atomic.AtomicInteger;
3731

38-
import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
39-
4032
/** Metrics for ServerConnection with {@link ClientMetricGroup} as parent group. */
41-
public class ConnectionMetricGroup extends AbstractMetricGroup {
33+
public class ConnectionMetrics {
4234
private static final List<ApiKeys> REPORT_API_KEYS =
4335
Arrays.asList(ApiKeys.PRODUCE_LOG, ApiKeys.FETCH_LOG, ApiKeys.PUT_KV, ApiKeys.LOOKUP);
4436

4537
private final String serverId;
38+
private final ClientMetricGroup clientMetricGroup;
4639

4740
/** Metrics for different request/response metrics with specify {@link ApiKeys}. */
48-
private final Map<String, Metrics> metricsByRequestName = MapUtils.newConcurrentHashMap();
41+
final Map<String, Metrics> metricsByRequestName = MapUtils.newConcurrentHashMap();
4942

50-
public ConnectionMetricGroup(
51-
MetricRegistry registry, String serverId, ClientMetricGroup parent) {
52-
super(registry, makeScope(parent, serverId), parent);
43+
public ConnectionMetrics(String serverId, ClientMetricGroup clientMetricGroup) {
5344
this.serverId = serverId;
45+
this.clientMetricGroup = clientMetricGroup;
5446
}
5547

56-
@Override
57-
protected void putVariables(Map<String, String> variables) {
58-
variables.put("server_id", serverId);
59-
}
60-
61-
@Override
62-
protected String getGroupName(CharacterFilter filter) {
63-
return "";
64-
}
65-
66-
@Override
67-
protected String createLogicalScope(CharacterFilter filter, char delimiter) {
68-
// ignore this metric group name in logical scope
69-
return parent.getLogicalScope(filter, delimiter);
48+
public void close() {
49+
clientMetricGroup.removeConnectionMetricGroup(serverId, this);
7050
}
7151

7252
// ------------------------------------------------------------------------
@@ -93,16 +73,15 @@ public void updateMetricsAfterGetResponse(ApiKeys apikey, long requestStartTime,
9373
}
9474

9575
@Nullable
96-
private Metrics getOrCreateRequestMetrics(ApiKeys apikey) {
76+
Metrics getOrCreateRequestMetrics(ApiKeys apikey) {
9777
if (!REPORT_API_KEYS.contains(apikey)) {
9878
return null;
9979
}
10080

101-
return metricsByRequestName.computeIfAbsent(
102-
apikey.name(), keyName -> new Metrics(this.addGroup("request", keyName)));
81+
return metricsByRequestName.computeIfAbsent(apikey.name(), keyName -> new Metrics());
10382
}
10483

105-
private static final class Metrics {
84+
static final class Metrics {
10685
final Counter requests;
10786
final Counter responses;
10887
final Counter inComingBytes;
@@ -111,18 +90,36 @@ private static final class Metrics {
11190
volatile long requestLatencyMs;
11291
final AtomicInteger requestsInFlight;
11392

114-
private Metrics(MetricGroup metricGroup) {
93+
private Metrics() {
11594
requests = new ThreadSafeSimpleCounter();
116-
metricGroup.meter(MetricNames.CLIENT_REQUESTS_RATE, new MeterView(requests));
11795
responses = new ThreadSafeSimpleCounter();
118-
metricGroup.meter(MetricNames.CLIENT_RESPONSES_RATE, new MeterView(responses));
11996
inComingBytes = new ThreadSafeSimpleCounter();
120-
metricGroup.meter(MetricNames.CLIENT_BYTES_IN_RATE, new MeterView(inComingBytes));
12197
outGoingBytes = new ThreadSafeSimpleCounter();
122-
metricGroup.meter(MetricNames.CLIENT_BYTES_OUT_RATE, new MeterView(outGoingBytes));
123-
metricGroup.gauge(MetricNames.CLIENT_REQUEST_LATENCY_MS, () -> requestLatencyMs);
12498
requestsInFlight = new AtomicInteger(0);
125-
metricGroup.gauge(MetricNames.CLIENT_REQUESTS_IN_FLIGHT, requestsInFlight::get);
99+
}
100+
101+
long requestRate() {
102+
return requests.getCount();
103+
}
104+
105+
long responseRate() {
106+
return responses.getCount();
107+
}
108+
109+
long byteInRate() {
110+
return inComingBytes.getCount();
111+
}
112+
113+
long byteOutRate() {
114+
return outGoingBytes.getCount();
115+
}
116+
117+
long requestLatencyMs() {
118+
return requestLatencyMs;
119+
}
120+
121+
long requestsInFlight() {
122+
return requestsInFlight.get();
126123
}
127124
}
128125
}

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.apache.fluss.rpc.messages.AuthenticateRequest;
3030
import org.apache.fluss.rpc.messages.AuthenticateResponse;
3131
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
32-
import org.apache.fluss.rpc.metrics.ConnectionMetricGroup;
32+
import org.apache.fluss.rpc.metrics.ConnectionMetrics;
3333
import org.apache.fluss.rpc.protocol.ApiKeys;
3434
import org.apache.fluss.rpc.protocol.ApiManager;
3535
import org.apache.fluss.rpc.protocol.ApiMethod;
@@ -72,7 +72,7 @@ final class ServerConnection {
7272
// TODO: add max inflight requests limit like Kafka's "max.in.flight.requests.per.connection"
7373
private final Map<Integer, InflightRequest> inflightRequests = MapUtils.newConcurrentHashMap();
7474
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
75-
private final ConnectionMetricGroup connectionMetricGroup;
75+
private final ConnectionMetrics connectionMetrics;
7676
private final ClientAuthenticator authenticator;
7777
private final ExponentialBackoff backoff;
7878

@@ -108,7 +108,7 @@ final class ServerConnection {
108108
boolean isInnerClient) {
109109
this.node = node;
110110
this.state = ConnectionState.CONNECTING;
111-
this.connectionMetricGroup = clientMetricGroup.createConnectionMetricGroup(node.uid());
111+
this.connectionMetrics = clientMetricGroup.createConnectionMetricGroup(node.uid());
112112
this.authenticator = authenticator;
113113
this.backoff = new ExponentialBackoff(100L, 2, 5000L, 0.2);
114114
whenClose(closeCallback);
@@ -190,7 +190,7 @@ private CompletableFuture<Void> close(Throwable cause) {
190190
closeFuture.completeExceptionally(cause);
191191
}
192192

193-
connectionMetricGroup.close();
193+
connectionMetrics.close();
194194
}
195195

196196
closeQuietly(authenticator);
@@ -220,7 +220,7 @@ public ApiMethod getRequestApiMethod(int requestId) {
220220
public void onRequestResult(int requestId, ApiMessage response) {
221221
InflightRequest request = inflightRequests.remove(requestId);
222222
if (request != null && !request.responseFuture.isDone()) {
223-
connectionMetricGroup.updateMetricsAfterGetResponse(
223+
connectionMetrics.updateMetricsAfterGetResponse(
224224
ApiKeys.forId(request.apiKey),
225225
request.requestStartTime,
226226
response.totalSize());
@@ -232,7 +232,7 @@ public void onRequestResult(int requestId, ApiMessage response) {
232232
public void onRequestFailure(int requestId, Throwable cause) {
233233
InflightRequest request = inflightRequests.remove(requestId);
234234
if (request != null && !request.responseFuture.isDone()) {
235-
connectionMetricGroup.updateMetricsAfterGetResponse(
235+
connectionMetrics.updateMetricsAfterGetResponse(
236236
ApiKeys.forId(request.apiKey), request.requestStartTime, 0);
237237
request.responseFuture.completeExceptionally(cause);
238238
}
@@ -329,14 +329,14 @@ private CompletableFuture<ApiMessage> doSend(
329329
return responseFuture;
330330
}
331331

332-
connectionMetricGroup.updateMetricsBeforeSendRequest(apiKey, rawRequest.totalSize());
332+
connectionMetrics.updateMetricsBeforeSendRequest(apiKey, rawRequest.totalSize());
333333

334334
channel.writeAndFlush(byteBuf)
335335
.addListener(
336336
(ChannelFutureListener)
337337
future -> {
338338
if (!future.isSuccess()) {
339-
connectionMetricGroup.updateMetricsAfterGetResponse(
339+
connectionMetrics.updateMetricsAfterGetResponse(
340340
apiKey, inflight.requestStartTime, 0);
341341
Throwable cause = future.cause();
342342
if (cause instanceof IOException) {
@@ -449,8 +449,10 @@ private void handleAuthenticateResponse(ApiMessage response, Throwable cause) {
449449
}
450450

451451
private void switchState(ConnectionState targetState) {
452-
LOG.debug("switch state form {} to {}", state, targetState);
453-
state = targetState;
452+
if (state != ConnectionState.DISCONNECTED) {
453+
LOG.debug("switch state form {} to {}", state, targetState);
454+
state = targetState;
455+
}
454456
if (targetState == ConnectionState.READY) {
455457
// process pending requests
456458
PendingRequest pending;

fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public CompletableFuture<PutKvResponse> putKv(PutKvRequest request) {
122122

123123
@Override
124124
public CompletableFuture<LookupResponse> lookup(LookupRequest request) {
125-
return null;
125+
return CompletableFuture.completedFuture(new LookupResponse());
126126
}
127127

128128
@Override

0 commit comments

Comments
 (0)