Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,22 @@ public class MetricNames {
// metrics for rpc client
// --------------------------------------------------------------------------------------------
public static final String CLIENT_REQUESTS_RATE = "requestsPerSecond";
public static final String CLIENT_REQUESTS_RATE_AVG = "requestsPerSecond_avg";
public static final String CLIENT_REQUESTS_RATE_TOTAL = "requestsPerSecond_total";
public static final String CLIENT_RESPONSES_RATE = "responsesPerSecond";
public static final String CLIENT_RESPONSES_RATE_AVG = "responsesPerSecond_avg";
public static final String CLIENT_RESPONSES_RATE_TOTAL = "responsesPerSecond_total";
public static final String CLIENT_BYTES_IN_RATE = "bytesInPerSecond";
public static final String CLIENT_BYTES_IN_RATE_AVG = "bytesInPerSecond_avg";
public static final String CLIENT_BYTES_IN_RATE_TOTAL = "bytesInPerSecond_total";
public static final String CLIENT_BYTES_OUT_RATE = "bytesOutPerSecond";
public static final String CLIENT_BYTES_OUT_RATE_AVG = "bytesOutPerSecond_avg";
public static final String CLIENT_BYTES_OUT_RATE_TOTAL = "bytesOutPerSecond_total";
public static final String CLIENT_REQUEST_LATENCY_MS = "requestLatencyMs";
public static final String CLIENT_REQUEST_LATENCY_MS_AVG = "requestLatencyMs_avg";
public static final String CLIENT_REQUEST_LATENCY_MS_MAX = "requestLatencyMs_max";
public static final String CLIENT_REQUESTS_IN_FLIGHT = "requestsInFlight";
public static final String CLIENT_REQUESTS_IN_FLIGHT_TOTAL = "requestsInFlight_total";

// --------------------------------------------------------------------------------------------
// metrics for client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@
package org.apache.fluss.rpc.metrics;

import org.apache.fluss.metrics.CharacterFilter;
import org.apache.fluss.metrics.MetricNames;
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
import org.apache.fluss.metrics.registry.MetricRegistry;
import org.apache.fluss.metrics.registry.NOPMetricRegistry;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.ToLongFunction;

/** The metric group for clients. */
public class ClientMetricGroup extends AbstractMetricGroup {
private final Set<ConnectionMetricGroup> connectionMetricGroups;

private static final String NAME = "client";

Expand All @@ -33,6 +39,40 @@ public class ClientMetricGroup extends AbstractMetricGroup {
public ClientMetricGroup(MetricRegistry registry, String clientId) {
super(registry, new String[] {NAME}, null);
this.clientId = clientId;
this.connectionMetricGroups = new HashSet<>();
this.gauge(
MetricNames.CLIENT_REQUESTS_RATE_AVG,
() -> getMetricsAvg(ConnectionMetricGroup.Metrics::requestRate));
this.gauge(
MetricNames.CLIENT_REQUESTS_RATE_TOTAL,
() -> getMetricsSum(ConnectionMetricGroup.Metrics::requestRate));
this.gauge(
MetricNames.CLIENT_RESPONSES_RATE_AVG,
() -> getMetricsAvg(ConnectionMetricGroup.Metrics::responseRate));
this.gauge(
MetricNames.CLIENT_RESPONSES_RATE_TOTAL,
() -> getMetricsSum(ConnectionMetricGroup.Metrics::responseRate));
this.gauge(
MetricNames.CLIENT_BYTES_IN_RATE_AVG,
() -> getMetricsAvg(ConnectionMetricGroup.Metrics::byteInRate));
this.gauge(
MetricNames.CLIENT_BYTES_IN_RATE_TOTAL,
() -> getMetricsSum(ConnectionMetricGroup.Metrics::byteInRate));
this.gauge(
MetricNames.CLIENT_BYTES_OUT_RATE_AVG,
() -> getMetricsAvg(ConnectionMetricGroup.Metrics::byteOutRate));
this.gauge(
MetricNames.CLIENT_BYTES_OUT_RATE_TOTAL,
() -> getMetricsSum(ConnectionMetricGroup.Metrics::byteOutRate));
this.gauge(
MetricNames.CLIENT_REQUEST_LATENCY_MS_AVG,
() -> getMetricsAvg(ConnectionMetricGroup.Metrics::requestLatencyMs));
this.gauge(
MetricNames.CLIENT_REQUEST_LATENCY_MS_MAX,
() -> getMetricsMax(ConnectionMetricGroup.Metrics::requestLatencyMs));
this.gauge(
MetricNames.CLIENT_REQUESTS_IN_FLIGHT_TOTAL,
() -> getMetricsSum(ConnectionMetricGroup.Metrics::requestsInFlight));
}

@Override
Expand All @@ -50,6 +90,43 @@ public MetricRegistry getMetricRegistry() {
}

public ConnectionMetricGroup createConnectionMetricGroup(String serverId) {
return new ConnectionMetricGroup(registry, serverId, this);
// Only expose aggregate metrics to reduce the reporter pressure.
ConnectionMetricGroup connectionMetricGroup =
new ConnectionMetricGroup(NOPMetricRegistry.INSTANCE, serverId, this);
connectionMetricGroups.add(connectionMetricGroup);
return connectionMetricGroup;
}

public void removeConnectionMetricGroup(ConnectionMetricGroup connectionMetricGroup) {
connectionMetricGroups.remove(connectionMetricGroup);
}

private double getMetricsAvg(ToLongFunction<ConnectionMetricGroup.Metrics> metricGetter) {
return connectionMetricGroups.stream()
.flatMap(
connectionMetricGroup ->
connectionMetricGroup.metricsByRequestName.values().stream())
.mapToLong(metricGetter)
.average()
.orElse(0);
}

private long getMetricsSum(ToLongFunction<ConnectionMetricGroup.Metrics> metricGetter) {
return connectionMetricGroups.stream()
.flatMap(
connectionMetricGroup ->
connectionMetricGroup.metricsByRequestName.values().stream())
.mapToLong(metricGetter)
.sum();
}

private long getMetricsMax(ToLongFunction<ConnectionMetricGroup.Metrics> metricGetter) {
return connectionMetricGroups.stream()
.flatMap(
connectionMetricGroup ->
connectionMetricGroup.metricsByRequestName.values().stream())
.mapToLong(metricGetter)
.max()
.orElse(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class ConnectionMetricGroup extends AbstractMetricGroup {
private final String serverId;

/** Metrics for different request/response metrics with specify {@link ApiKeys}. */
private final Map<String, Metrics> metricsByRequestName = MapUtils.newConcurrentHashMap();
final Map<String, Metrics> metricsByRequestName = MapUtils.newConcurrentHashMap();

public ConnectionMetricGroup(
MetricRegistry registry, String serverId, ClientMetricGroup parent) {
Expand Down Expand Up @@ -93,7 +93,7 @@ public void updateMetricsAfterGetResponse(ApiKeys apikey, long requestStartTime,
}

@Nullable
private Metrics getOrCreateRequestMetrics(ApiKeys apikey) {
Metrics getOrCreateRequestMetrics(ApiKeys apikey) {
if (!REPORT_API_KEYS.contains(apikey)) {
return null;
}
Expand All @@ -102,7 +102,7 @@ private Metrics getOrCreateRequestMetrics(ApiKeys apikey) {
apikey.name(), keyName -> new Metrics(this.addGroup("request", keyName)));
}

private static final class Metrics {
static final class Metrics {
final Counter requests;
final Counter responses;
final Counter inComingBytes;
Expand All @@ -124,5 +124,29 @@ private Metrics(MetricGroup metricGroup) {
requestsInFlight = new AtomicInteger(0);
metricGroup.gauge(MetricNames.CLIENT_REQUESTS_IN_FLIGHT, requestsInFlight::get);
}

long requestRate() {
return requests.getCount();
}

long responseRate() {
return responses.getCount();
}

long byteInRate() {
return inComingBytes.getCount();
}

long byteOutRate() {
return outGoingBytes.getCount();
}

long requestLatencyMs() {
return requestLatencyMs;
}

long requestsInFlight() {
return requestsInFlight.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ final class ServerConnection {
// TODO: add max inflight requests limit like Kafka's "max.in.flight.requests.per.connection"
private final Map<Integer, InflightRequest> inflightRequests = MapUtils.newConcurrentHashMap();
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
private final ClientMetricGroup clientMetricGroup;
private final ConnectionMetricGroup connectionMetricGroup;
private final ClientAuthenticator authenticator;
private final ExponentialBackoff backoff;
Expand Down Expand Up @@ -108,6 +109,7 @@ final class ServerConnection {
boolean isInnerClient) {
this.node = node;
this.state = ConnectionState.CONNECTING;
this.clientMetricGroup = clientMetricGroup;
this.connectionMetricGroup = clientMetricGroup.createConnectionMetricGroup(node.uid());
this.authenticator = authenticator;
this.backoff = new ExponentialBackoff(100L, 2, 5000L, 0.2);
Expand Down Expand Up @@ -190,6 +192,7 @@ private CompletableFuture<Void> close(Throwable cause) {
closeFuture.completeExceptionally(cause);
}

clientMetricGroup.removeConnectionMetricGroup(connectionMetricGroup);
connectionMetricGroup.close();
}

Expand Down Expand Up @@ -449,8 +452,10 @@ private void handleAuthenticateResponse(ApiMessage response, Throwable cause) {
}

private void switchState(ConnectionState targetState) {
LOG.debug("switch state form {} to {}", state, targetState);
state = targetState;
if (state != ConnectionState.DISCONNECTED) {
LOG.debug("switch state form {} to {}", state, targetState);
state = targetState;
}
if (targetState == ConnectionState.READY) {
// process pending requests
PendingRequest pending;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public CompletableFuture<PutKvResponse> putKv(PutKvRequest request) {

@Override
public CompletableFuture<LookupResponse> lookup(LookupRequest request) {
return null;
return CompletableFuture.completedFuture(new LookupResponse());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,20 @@
import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.DisconnectException;
import org.apache.fluss.metrics.Gauge;
import org.apache.fluss.metrics.Metric;
import org.apache.fluss.metrics.MetricType;
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
import org.apache.fluss.metrics.groups.MetricGroup;
import org.apache.fluss.metrics.registry.NOPMetricRegistry;
import org.apache.fluss.metrics.util.NOPMetricsGroup;
import org.apache.fluss.rpc.TestingGatewayService;
import org.apache.fluss.rpc.TestingTabletGatewayService;
import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
import org.apache.fluss.rpc.messages.LookupRequest;
import org.apache.fluss.rpc.messages.PbLookupReqForBucket;
import org.apache.fluss.rpc.messages.PbTablePath;
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
import org.apache.fluss.rpc.metrics.TestingClientMetricGroup;
import org.apache.fluss.rpc.netty.client.ServerConnection.ConnectionState;
import org.apache.fluss.rpc.netty.server.NettyServer;
Expand All @@ -42,9 +51,23 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import java.util.concurrent.ExecutionException;

import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_IN_RATE_AVG;
import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_IN_RATE_TOTAL;
import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_OUT_RATE_AVG;
import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_OUT_RATE_TOTAL;
import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUESTS_IN_FLIGHT_TOTAL;
import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUESTS_RATE_AVG;
import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUESTS_RATE_TOTAL;
import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUEST_LATENCY_MS_AVG;
import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUEST_LATENCY_MS_MAX;
import static org.apache.fluss.metrics.MetricNames.CLIENT_RESPONSES_RATE_AVG;
import static org.apache.fluss.metrics.MetricNames.CLIENT_RESPONSES_RATE_TOTAL;
import static org.apache.fluss.rpc.netty.NettyUtils.getClientSocketChannelClass;
import static org.apache.fluss.rpc.netty.NettyUtils.newEventLoopGroup;
import static org.apache.fluss.utils.NetUtils.getAvailablePort;
Expand All @@ -60,6 +83,7 @@ public class ServerConnectionTest {
private Configuration conf;
private NettyServer nettyServer;
private ServerNode serverNode;
private ServerNode serverNode2;
private TestingGatewayService service;

@BeforeEach
Expand Down Expand Up @@ -121,22 +145,99 @@ void testConnectionClose() {
assertThat(future.isDone()).isTrue();
}

@Test
void testConnectionMetrics() throws ExecutionException, InterruptedException {
MockMetricRegistry metricRegistry = new MockMetricRegistry();
ClientMetricGroup client = new ClientMetricGroup(metricRegistry, "client");
ServerConnection connection =
new ServerConnection(
bootstrap,
serverNode,
client,
clientAuthenticator,
(con, ignore) -> {},
false);
ServerConnection connection2 =
new ServerConnection(
bootstrap,
serverNode2,
client,
clientAuthenticator,
(con, ignore) -> {},
false);
LookupRequest request = new LookupRequest().setTableId(1);
PbLookupReqForBucket pbLookupReqForBucket = request.addBucketsReq();
pbLookupReqForBucket.setBucketId(1);
assertThat(metricRegistry.registeredMetrics).hasSize(11);

connection.send(ApiKeys.LOOKUP, request).get();
connection2.send(ApiKeys.LOOKUP, request).get();

assertThat(metricRegistry.registeredMetrics).hasSize(11);
assertThat(metricRegistry.registeredMetrics.keySet())
.containsExactlyInAnyOrder(
CLIENT_REQUESTS_RATE_AVG,
CLIENT_REQUESTS_RATE_TOTAL,
CLIENT_RESPONSES_RATE_AVG,
CLIENT_RESPONSES_RATE_TOTAL,
CLIENT_BYTES_IN_RATE_AVG,
CLIENT_BYTES_IN_RATE_TOTAL,
CLIENT_BYTES_OUT_RATE_AVG,
CLIENT_BYTES_OUT_RATE_TOTAL,
CLIENT_REQUEST_LATENCY_MS_AVG,
CLIENT_REQUEST_LATENCY_MS_MAX,
CLIENT_REQUESTS_IN_FLIGHT_TOTAL);
Metric metric = metricRegistry.registeredMetrics.get(CLIENT_REQUESTS_RATE_AVG);
assertThat(metric.getMetricType()).isEqualTo(MetricType.GAUGE);
assertThat(((Gauge<?>) metric).getValue()).isEqualTo(1.0);
metric = metricRegistry.registeredMetrics.get(CLIENT_REQUESTS_RATE_TOTAL);
assertThat(metric.getMetricType()).isEqualTo(MetricType.GAUGE);
assertThat(((Gauge<?>) metric).getValue()).isEqualTo(2L);
connection.close().get();
}

private void buildNettyServer(int serverId) throws Exception {
try (NetUtils.Port availablePort = getAvailablePort()) {
try (NetUtils.Port availablePort = getAvailablePort();
NetUtils.Port availablePort2 = getAvailablePort()) {
serverNode =
new ServerNode(
serverId, "localhost", availablePort.getPort(), ServerType.COORDINATOR);
service = new TestingGatewayService();
serverId,
"localhost",
availablePort.getPort(),
ServerType.TABLET_SERVER);
serverNode2 =
new ServerNode(
serverId,
"localhost",
availablePort2.getPort(),
ServerType.TABLET_SERVER);
service = new TestingTabletGatewayService();
MetricGroup metricGroup = NOPMetricsGroup.newInstance();
nettyServer =
new NettyServer(
conf,
Collections.singleton(
new Endpoint(serverNode.host(), serverNode.port(), "INTERNAL")),
Arrays.asList(
new Endpoint(serverNode.host(), serverNode.port(), "INTERNAL"),
new Endpoint(serverNode2.host(), serverNode2.port(), "CLIENT")),
service,
metricGroup,
RequestsMetrics.createCoordinatorServerRequestMetrics(metricGroup));
nettyServer.start();
}
}

private static class MockMetricRegistry extends NOPMetricRegistry {

Map<String, Metric> registeredMetrics = new HashMap<>();

@Override
public void register(Metric metric, String metricName, AbstractMetricGroup group) {
registeredMetrics.put(metricName, metric);
}

@Override
public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
registeredMetrics.remove(metricName, metric);
}
}
}