Skip to content

Commit 94ed8fb

Browse files
committed
add user-level and table-user level metrics and cleaner
1 parent 7d573a0 commit 94ed8fb

File tree

13 files changed

+432
-34
lines changed

13 files changed

+432
-34
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1695,6 +1695,14 @@ public class ConfigOptions {
16951695
+ "the CoordinatorServer) it is advisable to use a port range "
16961696
+ "like 9250-9260.");
16971697

1698+
public static final ConfigOption<Duration> METRICS_MANAGER_INACTIVE_EXPIRATION_TIME =
1699+
key("metrics.manager.inactive-expiration-time")
1700+
.durationType()
1701+
.defaultValue(Duration.ofHours(1))
1702+
.withDescription(
1703+
"The time to wait an inactive metric to be expired."
1704+
+ "This is not effective for permanent metric but for temporary metric. Mostly user level metric.");
1705+
16981706
// ------------------------------------------------------------------------
16991707
// ConfigOptions for jmx reporter
17001708
// ------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@ public class MetricNames {
7474
public static final String SERVER_PHYSICAL_STORAGE_LOCAL_SIZE = "localSize";
7575
public static final String SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE = "remoteLogSize";
7676

77+
// --------------------------------------------------------------------------------------------
78+
// metrics for user
79+
// --------------------------------------------------------------------------------------------
80+
/** Name under which the per-user "bytes in" counter is registered. */
public static final String BYTES_IN_COUNT = "bytesInCount";
81+
/** Name under which the per-user "bytes out" counter is registered. */
public static final String BYTES_OUT_COUNT = "bytesOutCount";
82+
7783
// --------------------------------------------------------------------------------------------
7884
// metrics for table
7985
// --------------------------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/metrics/groups/AbstractMetricGroup.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ public abstract class AbstractMetricGroup implements MetricGroup {
7373
/** The registry that this metrics group belongs to. */
7474
protected final MetricRegistry registry;
7575

76+
/** The last record time for the group. */
77+
protected volatile long lastRecordTime;
78+
7679
/** All metrics that are directly contained in this group. */
7780
private final Map<String, Metric> metrics = new HashMap<>();
7881

@@ -101,6 +104,7 @@ public AbstractMetricGroup(
101104
this.scopeComponents = checkNotNull(scope);
102105
this.parent = parent;
103106
this.logicalScopeStrings = new String[registry.getNumberReporters()];
107+
this.lastRecordTime = System.currentTimeMillis();
104108
}
105109

106110
@Override
@@ -195,6 +199,10 @@ protected void putVariables(Map<String, String> variables) {}
195199
*/
196200
protected abstract String getGroupName(CharacterFilter filter);
197201

202+
/**
 * Returns the last record time of this group.
 *
 * <p>The value is a {@link System#currentTimeMillis()} timestamp: it is initialised in the
 * constructor and may be refreshed by subclasses that write to {@code lastRecordTime}.
 *
 * @return the last record time in milliseconds
 */
public long getLastRecordTime() {
203+
return lastRecordTime;
204+
}
205+
198206
// ------------------------------------------------------------------------
199207
// Closing
200208
// ------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/metrics/utils/MetricGroupUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import org.apache.fluss.metrics.groups.MetricGroup;
2121

22+
import java.util.Arrays;
23+
2224
/** Utils for {@link MetricGroup}. */
2325
public class MetricGroupUtils {
2426

@@ -30,4 +32,8 @@ public static String[] makeScope(MetricGroup parentGroup, String... scopes) {
3032
System.arraycopy(scopes, 0, parts, parentComponents.length, scopes.length);
3133
return parts;
3234
}
35+
36+
public static String getScopeName(MetricGroup parentGroup, String... scopes) {
37+
return Arrays.toString(makeScope(parentGroup, scopes));
38+
}
3339
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.server.metrics;
19+
20+
import org.apache.fluss.annotation.VisibleForTesting;
21+
import org.apache.fluss.config.ConfigOptions;
22+
import org.apache.fluss.config.Configuration;
23+
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
24+
import org.apache.fluss.utils.MapUtils;
25+
import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
26+
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import java.util.Map;
31+
import java.util.concurrent.ConcurrentMap;
32+
import java.util.concurrent.Executors;
33+
import java.util.concurrent.ScheduledExecutorService;
34+
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.atomic.AtomicBoolean;
36+
import java.util.function.Function;
37+
38+
/**
39+
* Designed to manage and clean up metrics which will be expired. To be more specific, we may have
40+
* three kind of metrics: 1. Never expired metrics, such as "tableCount". It's value will change but
41+
* the metric itself is persistent. 2. Expired but autonomous metrics, such as "bytesInPerSecond"
42+
* for a table. Once the table was dropped, the metric will be deleted. So we don't need to worry
43+
* about memory leak. 3. Expired and free metrics, mostly as a user level metric, such as
44+
* "bytesInCount" or "bytesOutCount" for a user. Users may start and stop to write/read a table
45+
* anytime so that we can't predict when to free the metric. In such scene, to avoid memory leak, we
46+
* design MetricManager to clean up them periodically if a metric is inactive and reach an
47+
* expiration time.
48+
*/
49+
public class MetricManager implements AutoCloseable {
50+
private static final Logger LOG = LoggerFactory.getLogger(MetricManager.class);
51+
52+
private final long inactiveMetricExpirationTimeMs;
53+
private final ScheduledExecutorService metricsScheduler;
54+
private final ConcurrentMap<String, AbstractMetricGroup> metrics =
55+
MapUtils.newConcurrentHashMap();
56+
private final AtomicBoolean isClosed = new AtomicBoolean(false);
57+
58+
public MetricManager(Configuration configuration) {
59+
this.inactiveMetricExpirationTimeMs =
60+
configuration
61+
.get(ConfigOptions.METRICS_MANAGER_INACTIVE_EXPIRATION_TIME)
62+
.toMillis();
63+
64+
this.metricsScheduler =
65+
Executors.newScheduledThreadPool(
66+
1, new ExecutorThreadFactory("periodic-metric-cleanup-manager"));
67+
this.metricsScheduler.scheduleAtFixedRate(
68+
new ExpiredMetricCleanupTask(), 30, 30, TimeUnit.SECONDS);
69+
}
70+
71+
@SuppressWarnings("unchecked")
72+
public <T extends AbstractMetricGroup> T getOrCreateMetric(
73+
String key, Function<String, T> mappingFunction) {
74+
checkNotClosed();
75+
76+
return (T) metrics.computeIfAbsent(key, mappingFunction);
77+
}
78+
79+
@VisibleForTesting
80+
public AbstractMetricGroup getMetric(String key) {
81+
return metrics.get(key);
82+
}
83+
84+
public void removeMetric(String name) {
85+
checkNotClosed();
86+
87+
AbstractMetricGroup metricGroup = metrics.remove(name);
88+
if (metricGroup != null) {
89+
metricGroup.close();
90+
}
91+
}
92+
93+
public boolean hasExpired(String metricName) {
94+
return (System.currentTimeMillis() - metrics.get(metricName).getLastRecordTime())
95+
> this.inactiveMetricExpirationTimeMs;
96+
}
97+
98+
private void checkNotClosed() {
99+
if (isClosed.get()) {
100+
throw new IllegalStateException("MetricManager is already closed.");
101+
}
102+
}
103+
104+
@Override
105+
public void close() throws Exception {
106+
if (isClosed.compareAndSet(false, true)) {
107+
metricsScheduler.shutdownNow();
108+
}
109+
}
110+
111+
class ExpiredMetricCleanupTask implements Runnable {
112+
@Override
113+
public void run() {
114+
for (Map.Entry<String, AbstractMetricGroup> metricEntry : metrics.entrySet()) {
115+
String metricName = metricEntry.getKey();
116+
AbstractMetricGroup metricGroup = metricEntry.getValue();
117+
synchronized (metricGroup) {
118+
if (hasExpired(metricName)) {
119+
LOG.info("Removing expired metric {}", metricName);
120+
removeMetric(metricName);
121+
}
122+
}
123+
}
124+
}
125+
}
126+
}

fluss-server/src/main/java/org/apache/fluss/server/metrics/ServerMetricUtils.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,11 @@ public static TabletServerMetricGroup createTabletServerGroup(
8484
String clusterId,
8585
@Nullable String rack,
8686
String hostname,
87-
int serverId) {
87+
int serverId,
88+
MetricManager metricManager) {
8889
TabletServerMetricGroup tabletServerMetricGroup =
89-
new TabletServerMetricGroup(registry, clusterId, rack, hostname, serverId);
90+
new TabletServerMetricGroup(
91+
registry, clusterId, rack, hostname, serverId, metricManager);
9092
createAndInitializeStatusMetricGroup(tabletServerMetricGroup);
9193
return tabletServerMetricGroup;
9294
}

fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@
2727
import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
2828
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
2929
import org.apache.fluss.metrics.registry.MetricRegistry;
30+
import org.apache.fluss.metrics.utils.MetricGroupUtils;
3031
import org.apache.fluss.security.acl.FlussPrincipal;
3132
import org.apache.fluss.server.entity.UserContext;
32-
import org.apache.fluss.utils.MapUtils;
33+
import org.apache.fluss.server.metrics.MetricManager;
3334

3435
import javax.annotation.Nullable;
3536

@@ -47,7 +48,7 @@ public class TableMetricGroup extends AbstractMetricGroup {
4748

4849
private final Map<TableBucket, BucketMetricGroup> buckets = new HashMap<>();
4950

50-
private final Map<String, UserMetricGroup> userMetricGroups = MapUtils.newConcurrentHashMap();
51+
private final MetricManager metricManager;
5152

5253
private final TablePath tablePath;
5354

@@ -64,14 +65,15 @@ public TableMetricGroup(
6465
MetricRegistry registry,
6566
TablePath tablePath,
6667
boolean isKvTable,
67-
TabletServerMetricGroup serverMetricGroup) {
68+
TabletServerMetricGroup serverMetricGroup,
69+
MetricManager metricManager) {
6870
super(
6971
registry,
7072
makeScope(serverMetricGroup, tablePath.getDatabaseName(), tablePath.getTableName()),
7173
serverMetricGroup);
7274
this.serverMetrics = serverMetricGroup;
7375
this.tablePath = tablePath;
74-
76+
this.metricManager = metricManager;
7577
// if is kv table, create kv metrics
7678
if (isKvTable) {
7779
kvMetrics = new KvMetricGroup(this);
@@ -100,28 +102,28 @@ public void incLogMessageIn(long n) {
100102
serverMetrics.messageIn().inc(n);
101103
}
102104

103-
public void incLogBytesIn(long n, UserContext userContext) {
105+
public void incLogBytesIn(long n, @Nullable UserContext userContext) {
104106
logMetrics.bytesIn.inc(n);
105-
serverMetrics.bytesIn().inc(n);
107+
serverMetrics.incBytesIn(n, userContext);
106108

107-
// user level metric
109+
// table user level metric, only consider log table, exclude CDC log.
108110
Optional.ofNullable(userContext)
109111
.map(UserContext::getPrincipal)
110112
.map(FlussPrincipal::getName)
111113
.filter(name -> !name.isEmpty())
112-
.ifPresent(name -> getOrCreateUserMetricGroup(name).bytesIn.inc(n));
114+
.ifPresent(name -> getOrCreateUserMetricGroup(name).incBytesIn(n));
113115
}
114116

115-
public void incLogBytesOut(long n, UserContext userContext) {
117+
public void incLogBytesOut(long n, @Nullable UserContext userContext) {
116118
logMetrics.bytesOut.inc(n);
117-
serverMetrics.bytesOut().inc(n);
119+
serverMetrics.incBytesOut(n, userContext);
118120

119121
// user level metric
120122
Optional.ofNullable(userContext)
121123
.map(UserContext::getPrincipal)
122124
.map(FlussPrincipal::getName)
123125
.filter(name -> !name.isEmpty())
124-
.ifPresent(name -> getOrCreateUserMetricGroup(name).bytesOut.inc(n));
126+
.ifPresent(name -> getOrCreateUserMetricGroup(name).incBytesOut(n));
125127
}
126128

127129
public Counter totalFetchLogRequests() {
@@ -245,26 +247,26 @@ public Counter failedPrefixLookupRequests() {
245247
// ------------------------------------------------------------------------
246248
// user groups
247249
// ------------------------------------------------------------------------
248-
private UserMetricGroup getOrCreateUserMetricGroup(String principalName) {
249-
return userMetricGroups.computeIfAbsent(
250-
principalName, name -> new UserMetricGroup(this, principalName));
250+
private TableUserMetricGroup getOrCreateUserMetricGroup(String principalName) {
251+
String uniqueName = MetricGroupUtils.getScopeName(this, principalName);
252+
return metricManager.getOrCreateMetric(
253+
uniqueName, name -> new TableUserMetricGroup(this, principalName));
251254
}
252255

253-
private static class UserMetricGroup extends AbstractMetricGroup {
256+
private static class TableUserMetricGroup extends AbstractMetricGroup {
254257
private final String principalName;
255258
protected final Counter bytesIn;
256259
protected final Counter bytesOut;
257260

258-
private UserMetricGroup(TableMetricGroup tableMetricGroup, String principalName) {
261+
private TableUserMetricGroup(TableMetricGroup tableMetricGroup, String principalName) {
259262
super(
260263
tableMetricGroup.registry,
261264
makeScope(tableMetricGroup, principalName),
262265
tableMetricGroup);
263266
this.principalName = principalName;
264-
bytesIn = new ThreadSafeSimpleCounter();
265-
meter(MetricNames.BYTES_IN_RATE, new MeterView(bytesIn));
266-
bytesOut = new ThreadSafeSimpleCounter();
267-
meter(MetricNames.BYTES_OUT_RATE, new MeterView(bytesOut));
267+
268+
bytesIn = counter(MetricNames.BYTES_IN_COUNT, new ThreadSafeSimpleCounter());
269+
bytesOut = counter(MetricNames.BYTES_OUT_COUNT, new ThreadSafeSimpleCounter());
268270
}
269271

270272
@Override
@@ -274,7 +276,17 @@ protected String getGroupName(CharacterFilter filter) {
274276

275277
@Override
276278
protected void putVariables(Map<String, String> variables) {
277-
variables.put("name", principalName);
279+
variables.put("user", principalName);
280+
}
281+
282+
/**
 * Adds {@code n} to the bytes-in counter and refreshes {@code lastRecordTime} to the current
 * time, marking this group as recently active.
 *
 * @param n the number of bytes to add
 */
public void incBytesIn(long n) {
283+
this.lastRecordTime = System.currentTimeMillis();
284+
bytesIn.inc(n);
285+
}
286+
287+
/**
 * Adds {@code n} to the bytes-out counter and refreshes {@code lastRecordTime} to the current
 * time, marking this group as recently active.
 *
 * @param n the number of bytes to add
 */
public void incBytesOut(long n) {
288+
this.lastRecordTime = System.currentTimeMillis();
289+
bytesOut.inc(n);
278290
}
279291
}
280292

0 commit comments

Comments
 (0)