diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index be8d644e7d..4008c6b19e 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -143,12 +143,17 @@ public class MetricNames { // -------------------------------------------------------------------------------------------- // metrics for rpc client // -------------------------------------------------------------------------------------------- - public static final String CLIENT_REQUESTS_RATE = "requestsPerSecond"; - public static final String CLIENT_RESPONSES_RATE = "responsesPerSecond"; - public static final String CLIENT_BYTES_IN_RATE = "bytesInPerSecond"; - public static final String CLIENT_BYTES_OUT_RATE = "bytesOutPerSecond"; - public static final String CLIENT_REQUEST_LATENCY_MS = "requestLatencyMs"; - public static final String CLIENT_REQUESTS_IN_FLIGHT = "requestsInFlight"; + public static final String CLIENT_REQUESTS_RATE_AVG = "requestsPerSecond_avg"; + public static final String CLIENT_REQUESTS_RATE_TOTAL = "requestsPerSecond_total"; + public static final String CLIENT_RESPONSES_RATE_AVG = "responsesPerSecond_avg"; + public static final String CLIENT_RESPONSES_RATE_TOTAL = "responsesPerSecond_total"; + public static final String CLIENT_BYTES_IN_RATE_AVG = "bytesInPerSecond_avg"; + public static final String CLIENT_BYTES_IN_RATE_TOTAL = "bytesInPerSecond_total"; + public static final String CLIENT_BYTES_OUT_RATE_AVG = "bytesOutPerSecond_avg"; + public static final String CLIENT_BYTES_OUT_RATE_TOTAL = "bytesOutPerSecond_total"; + public static final String CLIENT_REQUEST_LATENCY_MS_AVG = "requestLatencyMs_avg"; + public static final String CLIENT_REQUEST_LATENCY_MS_MAX = "requestLatencyMs_max"; + public static final String CLIENT_REQUESTS_IN_FLIGHT_TOTAL = "requestsInFlight_total"; // -------------------------------------------------------------------------------------------- // metrics for client diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java index 163ee8fced..02abeaac7d 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java @@ -18,13 +18,18 @@ package org.apache.fluss.rpc.metrics; import org.apache.fluss.metrics.CharacterFilter; +import org.apache.fluss.metrics.MetricNames; import org.apache.fluss.metrics.groups.AbstractMetricGroup; import org.apache.fluss.metrics.registry.MetricRegistry; +import org.apache.fluss.utils.MapUtils; import java.util.Map; +import java.util.function.ToLongFunction; /** The metric group for clients. */ public class ClientMetricGroup extends AbstractMetricGroup { + private final Map nodeToConnectionMetrics = + MapUtils.newConcurrentHashMap(); private static final String NAME = "client"; @@ -33,6 +38,39 @@ public class ClientMetricGroup extends AbstractMetricGroup { public ClientMetricGroup(MetricRegistry registry, String clientId) { super(registry, new String[] {NAME}, null); this.clientId = clientId; + this.gauge( + MetricNames.CLIENT_REQUESTS_RATE_AVG, + () -> getMetricsAvg(ConnectionMetrics.Metrics::requestRate)); + this.gauge( + MetricNames.CLIENT_REQUESTS_RATE_TOTAL, + () -> getMetricsSum(ConnectionMetrics.Metrics::requestRate)); + this.gauge( + MetricNames.CLIENT_RESPONSES_RATE_AVG, + () -> getMetricsAvg(ConnectionMetrics.Metrics::responseRate)); + this.gauge( + MetricNames.CLIENT_RESPONSES_RATE_TOTAL, + () -> getMetricsSum(ConnectionMetrics.Metrics::responseRate)); + this.gauge( + MetricNames.CLIENT_BYTES_IN_RATE_AVG, + () -> getMetricsAvg(ConnectionMetrics.Metrics::byteInRate)); + this.gauge( + MetricNames.CLIENT_BYTES_IN_RATE_TOTAL, + () -> getMetricsSum(ConnectionMetrics.Metrics::byteInRate)); + this.gauge( + MetricNames.CLIENT_BYTES_OUT_RATE_AVG, + () -> getMetricsAvg(ConnectionMetrics.Metrics::byteOutRate)); + this.gauge( + MetricNames.CLIENT_BYTES_OUT_RATE_TOTAL, + () -> getMetricsSum(ConnectionMetrics.Metrics::byteOutRate)); + this.gauge( + MetricNames.CLIENT_REQUEST_LATENCY_MS_AVG, + () -> getMetricsAvg(ConnectionMetrics.Metrics::requestLatencyMs)); + this.gauge( + MetricNames.CLIENT_REQUEST_LATENCY_MS_MAX, + () -> getMetricsMax(ConnectionMetrics.Metrics::requestLatencyMs)); + this.gauge( + MetricNames.CLIENT_REQUESTS_IN_FLIGHT_TOTAL, + () -> getMetricsSum(ConnectionMetrics.Metrics::requestsInFlight)); } @Override @@ -49,7 +87,43 @@ public MetricRegistry getMetricRegistry() { return registry; } - public ConnectionMetricGroup createConnectionMetricGroup(String serverId) { - return new ConnectionMetricGroup(registry, serverId, this); + public ConnectionMetrics createConnectionMetricGroup(String serverId) { + // Only expose aggregate metrics to reduce the reporter pressure. + ConnectionMetrics connectionMetrics = new ConnectionMetrics(serverId, this); + nodeToConnectionMetrics.put(serverId, connectionMetrics); + return connectionMetrics; + } + + public void removeConnectionMetricGroup(String serverId, ConnectionMetrics connectionMetrics) { + nodeToConnectionMetrics.remove(serverId, connectionMetrics); + } + + private double getMetricsAvg(ToLongFunction metricGetter) { + return nodeToConnectionMetrics.values().stream() + .flatMap( + connectionMetricGroup -> + connectionMetricGroup.metricsByRequestName.values().stream()) + .mapToLong(metricGetter) + .average() + .orElse(0); + } + + private long getMetricsSum(ToLongFunction metricGetter) { + return nodeToConnectionMetrics.values().stream() + .flatMap( + connectionMetricGroup -> + connectionMetricGroup.metricsByRequestName.values().stream()) + .mapToLong(metricGetter) + .sum(); + } + + private long getMetricsMax(ToLongFunction metricGetter) { + return nodeToConnectionMetrics.values().stream() + .flatMap( + connectionMetricGroup -> + connectionMetricGroup.metricsByRequestName.values().stream()) + .mapToLong(metricGetter) + .max() + .orElse(0); } } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetricGroup.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetrics.java similarity index 61% rename from fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetricGroup.java rename to fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetrics.java index 849e6a22a7..6b1fd90648 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetricGroup.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetrics.java @@ -17,14 +17,8 @@ package org.apache.fluss.rpc.metrics; -import org.apache.fluss.metrics.CharacterFilter; import org.apache.fluss.metrics.Counter; -import org.apache.fluss.metrics.MeterView; -import org.apache.fluss.metrics.MetricNames; import org.apache.fluss.metrics.ThreadSafeSimpleCounter; -import org.apache.fluss.metrics.groups.AbstractMetricGroup; -import org.apache.fluss.metrics.groups.MetricGroup; -import org.apache.fluss.metrics.registry.MetricRegistry; import org.apache.fluss.rpc.protocol.ApiKeys; import org.apache.fluss.utils.MapUtils; @@ -35,38 +29,24 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope; - /** Metrics for ServerConnection with {@link ClientMetricGroup} as parent group. */ -public class ConnectionMetricGroup extends AbstractMetricGroup { +public class ConnectionMetrics { private static final List REPORT_API_KEYS = Arrays.asList(ApiKeys.PRODUCE_LOG, ApiKeys.FETCH_LOG, ApiKeys.PUT_KV, ApiKeys.LOOKUP); private final String serverId; + private final ClientMetricGroup clientMetricGroup; /** Metrics for different request/response metrics with specify {@link ApiKeys}. */ - private final Map metricsByRequestName = MapUtils.newConcurrentHashMap(); + final Map metricsByRequestName = MapUtils.newConcurrentHashMap(); - public ConnectionMetricGroup( - MetricRegistry registry, String serverId, ClientMetricGroup parent) { - super(registry, makeScope(parent, serverId), parent); + public ConnectionMetrics(String serverId, ClientMetricGroup clientMetricGroup) { this.serverId = serverId; + this.clientMetricGroup = clientMetricGroup; } - @Override - protected void putVariables(Map variables) { - variables.put("server_id", serverId); - } - - @Override - protected String getGroupName(CharacterFilter filter) { - return ""; - } - - @Override - protected String createLogicalScope(CharacterFilter filter, char delimiter) { - // ignore this metric group name in logical scope - return parent.getLogicalScope(filter, delimiter); + public void close() { + clientMetricGroup.removeConnectionMetricGroup(serverId, this); } // ------------------------------------------------------------------------ @@ -93,16 +73,15 @@ public void updateMetricsAfterGetResponse(ApiKeys apikey, long requestStartTime, } @Nullable - private Metrics getOrCreateRequestMetrics(ApiKeys apikey) { + Metrics getOrCreateRequestMetrics(ApiKeys apikey) { if (!REPORT_API_KEYS.contains(apikey)) { return null; } - return metricsByRequestName.computeIfAbsent( - apikey.name(), keyName -> new Metrics(this.addGroup("request", keyName))); + return metricsByRequestName.computeIfAbsent(apikey.name(), keyName -> new Metrics()); } - private static final class Metrics { + static final class Metrics { final Counter requests; final Counter responses; final Counter inComingBytes; @@ -111,18 +90,36 @@ private static final class Metrics { volatile long requestLatencyMs; final AtomicInteger requestsInFlight; - private Metrics(MetricGroup metricGroup) { + private Metrics() { requests = new ThreadSafeSimpleCounter(); - metricGroup.meter(MetricNames.CLIENT_REQUESTS_RATE, new MeterView(requests)); responses = new ThreadSafeSimpleCounter(); - metricGroup.meter(MetricNames.CLIENT_RESPONSES_RATE, new MeterView(responses)); inComingBytes = new ThreadSafeSimpleCounter(); - metricGroup.meter(MetricNames.CLIENT_BYTES_IN_RATE, new MeterView(inComingBytes)); outGoingBytes = new ThreadSafeSimpleCounter(); - metricGroup.meter(MetricNames.CLIENT_BYTES_OUT_RATE, new MeterView(outGoingBytes)); - metricGroup.gauge(MetricNames.CLIENT_REQUEST_LATENCY_MS, () -> requestLatencyMs); requestsInFlight = new AtomicInteger(0); - metricGroup.gauge(MetricNames.CLIENT_REQUESTS_IN_FLIGHT, requestsInFlight::get); + } + + long requestRate() { + return requests.getCount(); + } + + long responseRate() { + return responses.getCount(); + } + + long byteInRate() { + return inComingBytes.getCount(); + } + + long byteOutRate() { + return outGoingBytes.getCount(); + } + + long requestLatencyMs() { + return requestLatencyMs; + } + + long requestsInFlight() { + return requestsInFlight.get(); } } } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java index 3aa85e24ff..a09a50c097 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java @@ -29,7 +29,7 @@ import org.apache.fluss.rpc.messages.AuthenticateRequest; import org.apache.fluss.rpc.messages.AuthenticateResponse; import org.apache.fluss.rpc.metrics.ClientMetricGroup; -import org.apache.fluss.rpc.metrics.ConnectionMetricGroup; +import org.apache.fluss.rpc.metrics.ConnectionMetrics; import org.apache.fluss.rpc.protocol.ApiKeys; import org.apache.fluss.rpc.protocol.ApiManager; import org.apache.fluss.rpc.protocol.ApiMethod; @@ -72,7 +72,7 @@ final class ServerConnection { // TODO: add max inflight requests limit like Kafka's "max.in.flight.requests.per.connection" private final Map inflightRequests = MapUtils.newConcurrentHashMap(); private final CompletableFuture closeFuture = new CompletableFuture<>(); - private final ConnectionMetricGroup connectionMetricGroup; + private final ConnectionMetrics connectionMetrics; private final ClientAuthenticator authenticator; private final ExponentialBackoff backoff; @@ -108,7 +108,7 @@ final class ServerConnection { boolean isInnerClient) { this.node = node; this.state = ConnectionState.CONNECTING; - this.connectionMetricGroup = clientMetricGroup.createConnectionMetricGroup(node.uid()); + this.connectionMetrics = clientMetricGroup.createConnectionMetricGroup(node.uid()); this.authenticator = authenticator; this.backoff = new ExponentialBackoff(100L, 2, 5000L, 0.2); whenClose(closeCallback); @@ -190,7 +190,7 @@ private CompletableFuture close(Throwable cause) { closeFuture.completeExceptionally(cause); } - connectionMetricGroup.close(); + connectionMetrics.close(); } closeQuietly(authenticator); @@ -220,7 +220,7 @@ public ApiMethod getRequestApiMethod(int requestId) { public void onRequestResult(int requestId, ApiMessage response) { InflightRequest request = inflightRequests.remove(requestId); if (request != null && !request.responseFuture.isDone()) { - connectionMetricGroup.updateMetricsAfterGetResponse( + connectionMetrics.updateMetricsAfterGetResponse( ApiKeys.forId(request.apiKey), request.requestStartTime, response.totalSize()); @@ -232,7 +232,7 @@ public void onRequestResult(int requestId, ApiMessage response) { public void onRequestFailure(int requestId, Throwable cause) { InflightRequest request = inflightRequests.remove(requestId); if (request != null && !request.responseFuture.isDone()) { - connectionMetricGroup.updateMetricsAfterGetResponse( + connectionMetrics.updateMetricsAfterGetResponse( ApiKeys.forId(request.apiKey), request.requestStartTime, 0); request.responseFuture.completeExceptionally(cause); } @@ -329,14 +329,14 @@ private CompletableFuture doSend( return responseFuture; } - connectionMetricGroup.updateMetricsBeforeSendRequest(apiKey, rawRequest.totalSize()); + connectionMetrics.updateMetricsBeforeSendRequest(apiKey, rawRequest.totalSize()); channel.writeAndFlush(byteBuf) .addListener( (ChannelFutureListener) future -> { if (!future.isSuccess()) { - connectionMetricGroup.updateMetricsAfterGetResponse( + connectionMetrics.updateMetricsAfterGetResponse( apiKey, inflight.requestStartTime, 0); Throwable cause = future.cause(); if (cause instanceof IOException) { @@ -449,8 +449,10 @@ private void handleAuthenticateResponse(ApiMessage response, Throwable cause) { } private void switchState(ConnectionState targetState) { - LOG.debug("switch state form {} to {}", state, targetState); - state = targetState; + if (state != ConnectionState.DISCONNECTED) { + LOG.debug("switch state form {} to {}", state, targetState); + state = targetState; + } if (targetState == ConnectionState.READY) { // process pending requests PendingRequest pending; diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java index 30ef735cd8..7db3654383 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java @@ -122,7 +122,7 @@ public CompletableFuture putKv(PutKvRequest request) { @Override public CompletableFuture lookup(LookupRequest request) { - return null; + return CompletableFuture.completedFuture(new LookupResponse()); } @Override diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java index 6b2363c883..554926ac47 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java @@ -22,11 +22,20 @@ import org.apache.fluss.cluster.ServerType; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.DisconnectException; +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.Metric; +import org.apache.fluss.metrics.MetricType; +import org.apache.fluss.metrics.groups.AbstractMetricGroup; import org.apache.fluss.metrics.groups.MetricGroup; +import org.apache.fluss.metrics.registry.NOPMetricRegistry; import org.apache.fluss.metrics.util.NOPMetricsGroup; import org.apache.fluss.rpc.TestingGatewayService; +import org.apache.fluss.rpc.TestingTabletGatewayService; import org.apache.fluss.rpc.messages.GetTableSchemaRequest; +import org.apache.fluss.rpc.messages.LookupRequest; +import org.apache.fluss.rpc.messages.PbLookupReqForBucket; import org.apache.fluss.rpc.messages.PbTablePath; +import org.apache.fluss.rpc.metrics.ClientMetricGroup; import org.apache.fluss.rpc.metrics.TestingClientMetricGroup; import org.apache.fluss.rpc.netty.client.ServerConnection.ConnectionState; import org.apache.fluss.rpc.netty.server.NettyServer; @@ -42,9 +51,23 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.Collections; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CompletableFuture; - +import java.util.concurrent.ExecutionException; + +import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_IN_RATE_AVG; +import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_IN_RATE_TOTAL; +import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_OUT_RATE_AVG; +import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_OUT_RATE_TOTAL; +import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUESTS_IN_FLIGHT_TOTAL; +import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUESTS_RATE_AVG; +import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUESTS_RATE_TOTAL; +import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUEST_LATENCY_MS_AVG; +import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUEST_LATENCY_MS_MAX; +import static org.apache.fluss.metrics.MetricNames.CLIENT_RESPONSES_RATE_AVG; +import static org.apache.fluss.metrics.MetricNames.CLIENT_RESPONSES_RATE_TOTAL; import static org.apache.fluss.rpc.netty.NettyUtils.getClientSocketChannelClass; import static org.apache.fluss.rpc.netty.NettyUtils.newEventLoopGroup; import static org.apache.fluss.utils.NetUtils.getAvailablePort; @@ -60,12 +83,13 @@ public class ServerConnectionTest { private Configuration conf; private NettyServer nettyServer; private ServerNode serverNode; + private ServerNode serverNode2; private TestingGatewayService service; @BeforeEach void setUp() throws Exception { conf = new Configuration(); - buildNettyServer(0); + buildNettyServer(); eventLoopGroup = newEventLoopGroup(1, "fluss-netty-client-test"); bootstrap = @@ -121,22 +145,93 @@ void testConnectionClose() { assertThat(future.isDone()).isTrue(); } - private void buildNettyServer(int serverId) throws Exception { - try (NetUtils.Port availablePort = getAvailablePort()) { + @Test + void testConnectionMetrics() throws ExecutionException, InterruptedException { + MockMetricRegistry metricRegistry = new MockMetricRegistry(); + ClientMetricGroup client = new ClientMetricGroup(metricRegistry, "client"); + ServerConnection connection = + new ServerConnection( + bootstrap, + serverNode, + client, + clientAuthenticator, + (con, ignore) -> {}, + false); + ServerConnection connection2 = + new ServerConnection( + bootstrap, + serverNode2, + client, + clientAuthenticator, + (con, ignore) -> {}, + false); + LookupRequest request = new LookupRequest().setTableId(1); + PbLookupReqForBucket pbLookupReqForBucket = request.addBucketsReq(); + pbLookupReqForBucket.setBucketId(1); + assertThat(metricRegistry.registeredMetrics).hasSize(11); + + connection.send(ApiKeys.LOOKUP, request).get(); + connection2.send(ApiKeys.LOOKUP, request).get(); + + assertThat(metricRegistry.registeredMetrics).hasSize(11); + assertThat(metricRegistry.registeredMetrics.keySet()) + .containsExactlyInAnyOrder( + CLIENT_REQUESTS_RATE_AVG, + CLIENT_REQUESTS_RATE_TOTAL, + CLIENT_RESPONSES_RATE_AVG, + CLIENT_RESPONSES_RATE_TOTAL, + CLIENT_BYTES_IN_RATE_AVG, + CLIENT_BYTES_IN_RATE_TOTAL, + CLIENT_BYTES_OUT_RATE_AVG, + CLIENT_BYTES_OUT_RATE_TOTAL, + CLIENT_REQUEST_LATENCY_MS_AVG, + CLIENT_REQUEST_LATENCY_MS_MAX, + CLIENT_REQUESTS_IN_FLIGHT_TOTAL); + Metric metric = metricRegistry.registeredMetrics.get(CLIENT_REQUESTS_RATE_AVG); + assertThat(metric.getMetricType()).isEqualTo(MetricType.GAUGE); + assertThat(((Gauge) metric).getValue()).isEqualTo(1.0); + metric = metricRegistry.registeredMetrics.get(CLIENT_REQUESTS_RATE_TOTAL); + assertThat(metric.getMetricType()).isEqualTo(MetricType.GAUGE); + assertThat(((Gauge) metric).getValue()).isEqualTo(2L); + connection.close().get(); + } + + private void buildNettyServer() throws Exception { + try (NetUtils.Port availablePort = getAvailablePort(); + NetUtils.Port availablePort2 = getAvailablePort()) { serverNode = new ServerNode( - serverId, "localhost", availablePort.getPort(), ServerType.COORDINATOR); - service = new TestingGatewayService(); + 1, "localhost", availablePort.getPort(), ServerType.TABLET_SERVER); + serverNode2 = + new ServerNode( + 2, "localhost", availablePort2.getPort(), ServerType.TABLET_SERVER); + service = new TestingTabletGatewayService(); MetricGroup metricGroup = NOPMetricsGroup.newInstance(); nettyServer = new NettyServer( conf, - Collections.singleton( - new Endpoint(serverNode.host(), serverNode.port(), "INTERNAL")), + Arrays.asList( + new Endpoint(serverNode.host(), serverNode.port(), "INTERNAL"), + new Endpoint(serverNode2.host(), serverNode2.port(), "CLIENT")), service, metricGroup, RequestsMetrics.createCoordinatorServerRequestMetrics(metricGroup)); nettyServer.start(); } } + + private static class MockMetricRegistry extends NOPMetricRegistry { + + Map registeredMetrics = new HashMap<>(); + + @Override + public void register(Metric metric, String metricName, AbstractMetricGroup group) { + registeredMetrics.put(metricName, metric); + } + + @Override + public void unregister(Metric metric, String metricName, AbstractMetricGroup group) { + registeredMetrics.remove(metricName, metric); + } + } }