Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,17 @@ public class MetricNames {
// --------------------------------------------------------------------------------------------
// metrics for rpc client
// --------------------------------------------------------------------------------------------
public static final String CLIENT_REQUESTS_RATE = "requestsPerSecond";
public static final String CLIENT_RESPONSES_RATE = "responsesPerSecond";
public static final String CLIENT_BYTES_IN_RATE = "bytesInPerSecond";
public static final String CLIENT_BYTES_OUT_RATE = "bytesOutPerSecond";
public static final String CLIENT_REQUEST_LATENCY_MS = "requestLatencyMs";
public static final String CLIENT_REQUESTS_IN_FLIGHT = "requestsInFlight";
public static final String CLIENT_REQUESTS_RATE_AVG = "requestsPerSecond_avg";
public static final String CLIENT_REQUESTS_RATE_TOTAL = "requestsPerSecond_total";
public static final String CLIENT_RESPONSES_RATE_AVG = "responsesPerSecond_avg";
public static final String CLIENT_RESPONSES_RATE_TOTAL = "responsesPerSecond_total";
public static final String CLIENT_BYTES_IN_RATE_AVG = "bytesInPerSecond_avg";
public static final String CLIENT_BYTES_IN_RATE_TOTAL = "bytesInPerSecond_total";
public static final String CLIENT_BYTES_OUT_RATE_AVG = "bytesOutPerSecond_avg";
public static final String CLIENT_BYTES_OUT_RATE_TOTAL = "bytesOutPerSecond_total";
public static final String CLIENT_REQUEST_LATENCY_MS_AVG = "requestLatencyMs_avg";
public static final String CLIENT_REQUEST_LATENCY_MS_MAX = "requestLatencyMs_max";
public static final String CLIENT_REQUESTS_IN_FLIGHT_TOTAL = "requestsInFlight_total";

// --------------------------------------------------------------------------------------------
// metrics for client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@
package org.apache.fluss.rpc.metrics;

import org.apache.fluss.metrics.CharacterFilter;
import org.apache.fluss.metrics.MetricNames;
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
import org.apache.fluss.metrics.registry.MetricRegistry;
import org.apache.fluss.utils.MapUtils;

import java.util.Map;
import java.util.function.ToLongFunction;

/** The metric group for clients. */
public class ClientMetricGroup extends AbstractMetricGroup {
private final Map<String, ConnectionMetrics> nodeToConnectionMetrics =
MapUtils.newConcurrentHashMap();

private static final String NAME = "client";

Expand All @@ -33,6 +38,39 @@ public class ClientMetricGroup extends AbstractMetricGroup {
public ClientMetricGroup(MetricRegistry registry, String clientId) {
super(registry, new String[] {NAME}, null);
this.clientId = clientId;
this.gauge(
MetricNames.CLIENT_REQUESTS_RATE_AVG,
() -> getMetricsAvg(ConnectionMetrics.Metrics::requestRate));
this.gauge(
MetricNames.CLIENT_REQUESTS_RATE_TOTAL,
() -> getMetricsSum(ConnectionMetrics.Metrics::requestRate));
this.gauge(
MetricNames.CLIENT_RESPONSES_RATE_AVG,
() -> getMetricsAvg(ConnectionMetrics.Metrics::responseRate));
this.gauge(
MetricNames.CLIENT_RESPONSES_RATE_TOTAL,
() -> getMetricsSum(ConnectionMetrics.Metrics::responseRate));
this.gauge(
MetricNames.CLIENT_BYTES_IN_RATE_AVG,
() -> getMetricsAvg(ConnectionMetrics.Metrics::byteInRate));
this.gauge(
MetricNames.CLIENT_BYTES_IN_RATE_TOTAL,
() -> getMetricsSum(ConnectionMetrics.Metrics::byteInRate));
this.gauge(
MetricNames.CLIENT_BYTES_OUT_RATE_AVG,
() -> getMetricsAvg(ConnectionMetrics.Metrics::byteOutRate));
this.gauge(
MetricNames.CLIENT_BYTES_OUT_RATE_TOTAL,
() -> getMetricsSum(ConnectionMetrics.Metrics::byteOutRate));
this.gauge(
MetricNames.CLIENT_REQUEST_LATENCY_MS_AVG,
() -> getMetricsAvg(ConnectionMetrics.Metrics::requestLatencyMs));
this.gauge(
MetricNames.CLIENT_REQUEST_LATENCY_MS_MAX,
() -> getMetricsMax(ConnectionMetrics.Metrics::requestLatencyMs));
this.gauge(
MetricNames.CLIENT_REQUESTS_IN_FLIGHT_TOTAL,
() -> getMetricsSum(ConnectionMetrics.Metrics::requestsInFlight));
}

@Override
Expand All @@ -49,7 +87,43 @@ public MetricRegistry getMetricRegistry() {
return registry;
}

public ConnectionMetricGroup createConnectionMetricGroup(String serverId) {
return new ConnectionMetricGroup(registry, serverId, this);
public ConnectionMetrics createConnectionMetricGroup(String serverId) {
// Only expose aggregate metrics to reduce the reporter pressure.
ConnectionMetrics connectionMetrics = new ConnectionMetrics(serverId, this);
nodeToConnectionMetrics.put(serverId, connectionMetrics);
return connectionMetrics;
}

public void removeConnectionMetricGroup(String serverId, ConnectionMetrics connectionMetrics) {
nodeToConnectionMetrics.remove(serverId, connectionMetrics);
}

private double getMetricsAvg(ToLongFunction<ConnectionMetrics.Metrics> metricGetter) {
return nodeToConnectionMetrics.values().stream()
.flatMap(
connectionMetricGroup ->
connectionMetricGroup.metricsByRequestName.values().stream())
.mapToLong(metricGetter)
.average()
.orElse(0);
}

private long getMetricsSum(ToLongFunction<ConnectionMetrics.Metrics> metricGetter) {
return nodeToConnectionMetrics.values().stream()
.flatMap(
connectionMetricGroup ->
connectionMetricGroup.metricsByRequestName.values().stream())
.mapToLong(metricGetter)
.sum();
}

private long getMetricsMax(ToLongFunction<ConnectionMetrics.Metrics> metricGetter) {
return nodeToConnectionMetrics.values().stream()
.flatMap(
connectionMetricGroup ->
connectionMetricGroup.metricsByRequestName.values().stream())
.mapToLong(metricGetter)
.max()
.orElse(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,8 @@

package org.apache.fluss.rpc.metrics;

import org.apache.fluss.metrics.CharacterFilter;
import org.apache.fluss.metrics.Counter;
import org.apache.fluss.metrics.MeterView;
import org.apache.fluss.metrics.MetricNames;
import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
import org.apache.fluss.metrics.groups.MetricGroup;
import org.apache.fluss.metrics.registry.MetricRegistry;
import org.apache.fluss.rpc.protocol.ApiKeys;
import org.apache.fluss.utils.MapUtils;

Expand All @@ -35,38 +29,24 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;

/** Metrics for ServerConnection with {@link ClientMetricGroup} as parent group. */
public class ConnectionMetricGroup extends AbstractMetricGroup {
public class ConnectionMetrics {
private static final List<ApiKeys> REPORT_API_KEYS =
Arrays.asList(ApiKeys.PRODUCE_LOG, ApiKeys.FETCH_LOG, ApiKeys.PUT_KV, ApiKeys.LOOKUP);

private final String serverId;
private final ClientMetricGroup clientMetricGroup;

/** Metrics for different request/response metrics with specify {@link ApiKeys}. */
private final Map<String, Metrics> metricsByRequestName = MapUtils.newConcurrentHashMap();
final Map<String, Metrics> metricsByRequestName = MapUtils.newConcurrentHashMap();

public ConnectionMetricGroup(
MetricRegistry registry, String serverId, ClientMetricGroup parent) {
super(registry, makeScope(parent, serverId), parent);
public ConnectionMetrics(String serverId, ClientMetricGroup clientMetricGroup) {
this.serverId = serverId;
this.clientMetricGroup = clientMetricGroup;
}

@Override
protected void putVariables(Map<String, String> variables) {
variables.put("server_id", serverId);
}

@Override
protected String getGroupName(CharacterFilter filter) {
return "";
}

@Override
protected String createLogicalScope(CharacterFilter filter, char delimiter) {
// ignore this metric group name in logical scope
return parent.getLogicalScope(filter, delimiter);
public void close() {
clientMetricGroup.removeConnectionMetricGroup(serverId, this);
}

// ------------------------------------------------------------------------
Expand All @@ -93,16 +73,15 @@ public void updateMetricsAfterGetResponse(ApiKeys apikey, long requestStartTime,
}

@Nullable
private Metrics getOrCreateRequestMetrics(ApiKeys apikey) {
Metrics getOrCreateRequestMetrics(ApiKeys apikey) {
if (!REPORT_API_KEYS.contains(apikey)) {
return null;
}

return metricsByRequestName.computeIfAbsent(
apikey.name(), keyName -> new Metrics(this.addGroup("request", keyName)));
return metricsByRequestName.computeIfAbsent(apikey.name(), keyName -> new Metrics());
}

private static final class Metrics {
static final class Metrics {
final Counter requests;
final Counter responses;
final Counter inComingBytes;
Expand All @@ -111,18 +90,36 @@ private static final class Metrics {
volatile long requestLatencyMs;
final AtomicInteger requestsInFlight;

private Metrics(MetricGroup metricGroup) {
private Metrics() {
requests = new ThreadSafeSimpleCounter();
metricGroup.meter(MetricNames.CLIENT_REQUESTS_RATE, new MeterView(requests));
responses = new ThreadSafeSimpleCounter();
metricGroup.meter(MetricNames.CLIENT_RESPONSES_RATE, new MeterView(responses));
inComingBytes = new ThreadSafeSimpleCounter();
metricGroup.meter(MetricNames.CLIENT_BYTES_IN_RATE, new MeterView(inComingBytes));
outGoingBytes = new ThreadSafeSimpleCounter();
metricGroup.meter(MetricNames.CLIENT_BYTES_OUT_RATE, new MeterView(outGoingBytes));
metricGroup.gauge(MetricNames.CLIENT_REQUEST_LATENCY_MS, () -> requestLatencyMs);
requestsInFlight = new AtomicInteger(0);
metricGroup.gauge(MetricNames.CLIENT_REQUESTS_IN_FLIGHT, requestsInFlight::get);
}

long requestRate() {
return requests.getCount();
}

long responseRate() {
return responses.getCount();
}

long byteInRate() {
return inComingBytes.getCount();
}

long byteOutRate() {
return outGoingBytes.getCount();
}

long requestLatencyMs() {
return requestLatencyMs;
}

long requestsInFlight() {
return requestsInFlight.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.fluss.rpc.messages.AuthenticateRequest;
import org.apache.fluss.rpc.messages.AuthenticateResponse;
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
import org.apache.fluss.rpc.metrics.ConnectionMetricGroup;
import org.apache.fluss.rpc.metrics.ConnectionMetrics;
import org.apache.fluss.rpc.protocol.ApiKeys;
import org.apache.fluss.rpc.protocol.ApiManager;
import org.apache.fluss.rpc.protocol.ApiMethod;
Expand Down Expand Up @@ -72,7 +72,7 @@ final class ServerConnection {
// TODO: add max inflight requests limit like Kafka's "max.in.flight.requests.per.connection"
private final Map<Integer, InflightRequest> inflightRequests = MapUtils.newConcurrentHashMap();
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
private final ConnectionMetricGroup connectionMetricGroup;
private final ConnectionMetrics connectionMetrics;
private final ClientAuthenticator authenticator;
private final ExponentialBackoff backoff;

Expand Down Expand Up @@ -108,7 +108,7 @@ final class ServerConnection {
boolean isInnerClient) {
this.node = node;
this.state = ConnectionState.CONNECTING;
this.connectionMetricGroup = clientMetricGroup.createConnectionMetricGroup(node.uid());
this.connectionMetrics = clientMetricGroup.createConnectionMetricGroup(node.uid());
this.authenticator = authenticator;
this.backoff = new ExponentialBackoff(100L, 2, 5000L, 0.2);
whenClose(closeCallback);
Expand Down Expand Up @@ -190,7 +190,7 @@ private CompletableFuture<Void> close(Throwable cause) {
closeFuture.completeExceptionally(cause);
}

connectionMetricGroup.close();
connectionMetrics.close();
}

closeQuietly(authenticator);
Expand Down Expand Up @@ -220,7 +220,7 @@ public ApiMethod getRequestApiMethod(int requestId) {
public void onRequestResult(int requestId, ApiMessage response) {
InflightRequest request = inflightRequests.remove(requestId);
if (request != null && !request.responseFuture.isDone()) {
connectionMetricGroup.updateMetricsAfterGetResponse(
connectionMetrics.updateMetricsAfterGetResponse(
ApiKeys.forId(request.apiKey),
request.requestStartTime,
response.totalSize());
Expand All @@ -232,7 +232,7 @@ public void onRequestResult(int requestId, ApiMessage response) {
public void onRequestFailure(int requestId, Throwable cause) {
InflightRequest request = inflightRequests.remove(requestId);
if (request != null && !request.responseFuture.isDone()) {
connectionMetricGroup.updateMetricsAfterGetResponse(
connectionMetrics.updateMetricsAfterGetResponse(
ApiKeys.forId(request.apiKey), request.requestStartTime, 0);
request.responseFuture.completeExceptionally(cause);
}
Expand Down Expand Up @@ -329,14 +329,14 @@ private CompletableFuture<ApiMessage> doSend(
return responseFuture;
}

connectionMetricGroup.updateMetricsBeforeSendRequest(apiKey, rawRequest.totalSize());
connectionMetrics.updateMetricsBeforeSendRequest(apiKey, rawRequest.totalSize());

channel.writeAndFlush(byteBuf)
.addListener(
(ChannelFutureListener)
future -> {
if (!future.isSuccess()) {
connectionMetricGroup.updateMetricsAfterGetResponse(
connectionMetrics.updateMetricsAfterGetResponse(
apiKey, inflight.requestStartTime, 0);
Throwable cause = future.cause();
if (cause instanceof IOException) {
Expand Down Expand Up @@ -449,8 +449,10 @@ private void handleAuthenticateResponse(ApiMessage response, Throwable cause) {
}

private void switchState(ConnectionState targetState) {
LOG.debug("switch state form {} to {}", state, targetState);
state = targetState;
if (state != ConnectionState.DISCONNECTED) {
LOG.debug("switch state form {} to {}", state, targetState);
state = targetState;
}
if (targetState == ConnectionState.READY) {
// process pending requests
PendingRequest pending;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public CompletableFuture<PutKvResponse> putKv(PutKvRequest request) {

@Override
public CompletableFuture<LookupResponse> lookup(LookupRequest request) {
return null;
return CompletableFuture.completedFuture(new LookupResponse());
}

@Override
Expand Down
Loading