Skip to content

Commit 459c763

Browse files
zcoowuchong
andauthored
[metric] Add user-level metrics for byteIn and byteOut (#2080)
Co-authored-by: Jark Wu <[email protected]>
1 parent cfb986b commit 459c763

File tree

23 files changed

+759
-20
lines changed

23 files changed

+759
-20
lines changed

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@ public class MetricNames {
7474
public static final String SERVER_PHYSICAL_STORAGE_LOCAL_SIZE = "localSize";
7575
public static final String SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE = "remoteLogSize";
7676

77+
// --------------------------------------------------------------------------------------------
78+
// metrics for user
79+
// --------------------------------------------------------------------------------------------
80+
public static final String BYTES_IN = "bytesIn";
81+
public static final String BYTES_OUT = "bytesOut";
82+
7783
// --------------------------------------------------------------------------------------------
7884
// metrics for table
7985
// --------------------------------------------------------------------------------------------
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.server.entity;
20+
21+
import org.apache.fluss.security.acl.FlussPrincipal;
22+
23+
import static org.apache.fluss.utils.Preconditions.checkNotNull;
24+
25+
/** The context information of user who writes or reads table. */
26+
public class UserContext {
27+
private final FlussPrincipal principal;
28+
29+
public UserContext(FlussPrincipal principal) {
30+
this.principal = checkNotNull(principal);
31+
}
32+
33+
public FlussPrincipal getPrincipal() {
34+
return principal;
35+
}
36+
}
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.fluss.server.metrics;
21+
22+
import org.apache.fluss.annotation.VisibleForTesting;
23+
import org.apache.fluss.metadata.TablePath;
24+
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
25+
import org.apache.fluss.metrics.registry.MetricRegistry;
26+
import org.apache.fluss.security.acl.FlussPrincipal;
27+
import org.apache.fluss.server.entity.UserContext;
28+
import org.apache.fluss.server.metrics.group.AbstractUserMetricGroup;
29+
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
30+
import org.apache.fluss.server.metrics.group.UserMetricGroup;
31+
import org.apache.fluss.server.metrics.group.UserPerTableMetricGroup;
32+
import org.apache.fluss.utils.MapUtils;
33+
import org.apache.fluss.utils.concurrent.Scheduler;
34+
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
38+
import javax.annotation.Nullable;
39+
40+
import java.util.Map;
41+
import java.util.Objects;
42+
import java.util.concurrent.ConcurrentMap;
43+
import java.util.concurrent.ScheduledFuture;
44+
import java.util.concurrent.atomic.AtomicBoolean;
45+
46+
/**
47+
* Designed to manage and cleanup user-level metrics (will be used for future quota management).
48+
*
49+
* <p>To be more specific, Fluss server maintains three kind of metrics:
50+
*
51+
* <ul>
52+
* <li>1. Server-level metrics which is never expired or dropped, such as "tableCount".
53+
* <li>2. Table-level metrics which is dropped when the table is deleted, such as
54+
* "bytesInPerSecond" for a table.
55+
* <li>3. User-level metrics which will be expired and dropped after a period of inactivity, such
56+
* as "bytesInRate" for a user.
57+
* </ul>
58+
*
59+
* <p>This class mainly manages the user-level metrics. There are many a lot of users to read/write
60+
* tables, but most of them are idle after a period of time. To avoid memory leak or GC overhead, we
61+
* need to clean up those inactive user-level metrics periodically.
62+
*/
63+
public class UserMetrics implements AutoCloseable {
64+
private static final Logger LOG = LoggerFactory.getLogger(UserMetrics.class);
65+
private static final long INACTIVE_METRIC_EXPIRATION_TIME_MS = 3600_000L; // 1 hour
66+
private static final long METRICS_CLEANUP_INTERVAL_MS = 30_000L; // 30s
67+
68+
private final long inactiveMetricExpirationTimeMs;
69+
private final MetricRegistry metricRegistry;
70+
private final TabletServerMetricGroup parentMetricGroup;
71+
private final ScheduledFuture<?> schedule;
72+
73+
private final ConcurrentMap<MetricKey, AbstractUserMetricGroup> metrics =
74+
MapUtils.newConcurrentHashMap();
75+
private final AtomicBoolean isClosed = new AtomicBoolean(false);
76+
77+
public UserMetrics(
78+
Scheduler cleanupScheduler,
79+
MetricRegistry metricRegistry,
80+
TabletServerMetricGroup parentMetricGroup) {
81+
this(
82+
INACTIVE_METRIC_EXPIRATION_TIME_MS,
83+
METRICS_CLEANUP_INTERVAL_MS,
84+
cleanupScheduler,
85+
metricRegistry,
86+
parentMetricGroup);
87+
}
88+
89+
@VisibleForTesting
90+
UserMetrics(
91+
long inactiveMetricExpirationTimeMs,
92+
long cleanupIntervalMs,
93+
Scheduler cleanupScheduler,
94+
MetricRegistry metricRegistry,
95+
TabletServerMetricGroup parentMetricGroup) {
96+
this.inactiveMetricExpirationTimeMs = inactiveMetricExpirationTimeMs;
97+
this.metricRegistry = metricRegistry;
98+
this.parentMetricGroup = parentMetricGroup;
99+
this.schedule =
100+
cleanupScheduler.schedule(
101+
"user-metrics-expired-cleanup-task",
102+
new ExpiredMetricCleanupTask(),
103+
cleanupIntervalMs,
104+
cleanupIntervalMs);
105+
}
106+
107+
protected AbstractUserMetricGroup getOrCreateMetric(MetricKey metricKey) {
108+
return metrics.computeIfAbsent(
109+
metricKey,
110+
key -> {
111+
if (metricKey.tablePath != null) {
112+
return new UserPerTableMetricGroup(
113+
metricRegistry,
114+
key.userName,
115+
key.tablePath,
116+
inactiveMetricExpirationTimeMs,
117+
parentMetricGroup);
118+
} else {
119+
return new UserMetricGroup(
120+
metricRegistry,
121+
key.userName,
122+
inactiveMetricExpirationTimeMs,
123+
parentMetricGroup);
124+
}
125+
});
126+
}
127+
128+
/** Increments the number of bytes written by a user on a specific table. */
129+
public void incBytesIn(@Nullable UserContext userContext, TablePath tablePath, long numBytes) {
130+
incBytes(userContext, tablePath, numBytes, true);
131+
}
132+
133+
/** Increments the number of bytes read by a user on a specific table. */
134+
public void incBytesOut(@Nullable UserContext userContext, TablePath tablePath, long numBytes) {
135+
incBytes(userContext, tablePath, numBytes, false);
136+
}
137+
138+
private void incBytes(
139+
@Nullable UserContext userContext,
140+
TablePath tablePath,
141+
long numBytes,
142+
boolean bytesIn) {
143+
if (userContext == null
144+
|| userContext.getPrincipal() == FlussPrincipal.ANY
145+
|| userContext.getPrincipal() == FlussPrincipal.WILD_CARD_PRINCIPAL
146+
|| userContext.getPrincipal() == FlussPrincipal.ANONYMOUS) {
147+
// Ignore null or anonymous or wildcard users
148+
return;
149+
}
150+
String userName = userContext.getPrincipal().getName();
151+
AbstractUserMetricGroup user = getOrCreateMetric(new MetricKey(userName, null));
152+
AbstractUserMetricGroup perTable = getOrCreateMetric(new MetricKey(userName, tablePath));
153+
if (bytesIn) {
154+
user.incBytesIn(numBytes);
155+
perTable.incBytesIn(numBytes);
156+
} else {
157+
user.incBytesOut(numBytes);
158+
perTable.incBytesOut(numBytes);
159+
}
160+
}
161+
162+
@VisibleForTesting
163+
int numMetrics() {
164+
return metrics.size();
165+
}
166+
167+
@Override
168+
public void close() throws Exception {
169+
if (isClosed.compareAndSet(false, true)) {
170+
schedule.cancel(true);
171+
for (AbstractMetricGroup metricGroup : metrics.values()) {
172+
metricGroup.close();
173+
}
174+
metrics.clear();
175+
}
176+
}
177+
178+
/** A periodic task to clean up expired user metrics. */
179+
private class ExpiredMetricCleanupTask implements Runnable {
180+
181+
@Override
182+
public void run() {
183+
for (Map.Entry<MetricKey, AbstractUserMetricGroup> metricEntry : metrics.entrySet()) {
184+
MetricKey metricName = metricEntry.getKey();
185+
AbstractUserMetricGroup userMetric = metricEntry.getValue();
186+
synchronized (userMetric) {
187+
if (userMetric.hasExpired()) {
188+
LOG.debug("Removing expired user metric [{}]", metricName);
189+
metrics.remove(metricName);
190+
userMetric.close();
191+
}
192+
}
193+
}
194+
}
195+
}
196+
197+
/** The key to identify user metrics. */
198+
protected static class MetricKey {
199+
final String userName;
200+
@Nullable final TablePath tablePath;
201+
202+
MetricKey(String userName, @Nullable TablePath tablePath) {
203+
this.userName = userName;
204+
this.tablePath = tablePath;
205+
}
206+
207+
@Override
208+
public boolean equals(Object o) {
209+
if (o == null || getClass() != o.getClass()) {
210+
return false;
211+
}
212+
MetricKey metricKey = (MetricKey) o;
213+
return Objects.equals(userName, metricKey.userName)
214+
&& Objects.equals(tablePath, metricKey.tablePath);
215+
}
216+
217+
@Override
218+
public int hashCode() {
219+
return Objects.hash(userName, tablePath);
220+
}
221+
222+
@Override
223+
public String toString() {
224+
if (tablePath == null) {
225+
return userName;
226+
} else {
227+
return userName + ":" + tablePath;
228+
}
229+
}
230+
}
231+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.fluss.server.metrics.group;
21+
22+
import org.apache.fluss.annotation.VisibleForTesting;
23+
import org.apache.fluss.metrics.CharacterFilter;
24+
import org.apache.fluss.metrics.Counter;
25+
import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
26+
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
27+
import org.apache.fluss.metrics.registry.MetricRegistry;
28+
29+
import java.util.Map;
30+
31+
import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
32+
import static org.apache.fluss.utils.Preconditions.checkNotNull;
33+
34+
/** Abstract metric the users in server and tracks the expiration. */
35+
public abstract class AbstractUserMetricGroup extends AbstractMetricGroup {
36+
private static final String NAME = "user";
37+
38+
private final String principalName;
39+
private final long inactiveMetricExpirationTimeMs;
40+
protected final Counter bytesIn;
41+
protected final Counter bytesOut;
42+
43+
/** The last record time for the group. */
44+
protected volatile long lastRecordTime;
45+
46+
public AbstractUserMetricGroup(
47+
MetricRegistry registry,
48+
String principalName,
49+
long inactiveMetricExpirationTimeMs,
50+
TabletServerMetricGroup tabletServerMetricGroup) {
51+
super(registry, makeScope(tabletServerMetricGroup, principalName), tabletServerMetricGroup);
52+
this.principalName = checkNotNull(principalName);
53+
this.inactiveMetricExpirationTimeMs = inactiveMetricExpirationTimeMs;
54+
55+
this.bytesIn = new ThreadSafeSimpleCounter();
56+
this.bytesOut = new ThreadSafeSimpleCounter();
57+
58+
this.lastRecordTime = System.currentTimeMillis();
59+
}
60+
61+
@Override
62+
protected String getGroupName(CharacterFilter filter) {
63+
return NAME;
64+
}
65+
66+
@VisibleForTesting
67+
public String getPrincipalName() {
68+
return principalName;
69+
}
70+
71+
@Override
72+
protected void putVariables(Map<String, String> variables) {
73+
variables.put("user", principalName);
74+
}
75+
76+
public void incBytesIn(long numBytes) {
77+
this.lastRecordTime = System.currentTimeMillis();
78+
bytesIn.inc(numBytes);
79+
}
80+
81+
public void incBytesOut(long numBytes) {
82+
this.lastRecordTime = System.currentTimeMillis();
83+
bytesOut.inc(numBytes);
84+
}
85+
86+
/** Return true if the metric is eligible for removal due to inactivity. false otherwise. */
87+
public boolean hasExpired() {
88+
return (System.currentTimeMillis() - lastRecordTime) > this.inactiveMetricExpirationTimeMs;
89+
}
90+
}

0 commit comments

Comments
 (0)