Skip to content

Commit 6739142

Browse files
committed
[metric] Add user level metrics for byteIn and byteOut
1 parent 4ce849f commit 6739142

File tree

7 files changed

+181
-20
lines changed

7 files changed

+181
-20
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.server.entity;
20+
21+
import org.apache.fluss.security.acl.FlussPrincipal;
22+
23+
/** The session for user who writes or reads table. */
24+
public class LogUserSession {
25+
private final FlussPrincipal principal;
26+
27+
public LogUserSession(FlussPrincipal principal) {
28+
this.principal = principal;
29+
}
30+
31+
public FlussPrincipal getPrincipal() {
32+
return principal;
33+
}
34+
}

fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
2828
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
2929
import org.apache.fluss.metrics.registry.MetricRegistry;
30+
import org.apache.fluss.security.acl.FlussPrincipal;
31+
import org.apache.fluss.utils.MapUtils;
3032

3133
import javax.annotation.Nullable;
3234

@@ -43,6 +45,8 @@ public class TableMetricGroup extends AbstractMetricGroup {
4345

4446
private final Map<TableBucket, BucketMetricGroup> buckets = new HashMap<>();
4547

48+
private final Map<String, UserMetricGroup> userMetricGroups = MapUtils.newConcurrentHashMap();
49+
4650
private final TablePath tablePath;
4751

4852
// server-level metrics
@@ -89,19 +93,37 @@ protected String getGroupName(CharacterFilter filter) {
8993
return "table";
9094
}
9195

92-
public void incLogMessageIn(long n) {
96+
public void incLogMessageIn(long n, FlussPrincipal principal) {
9397
logMetrics.messagesIn.inc(n);
9498
serverMetrics.messageIn().inc(n);
99+
100+
// user level metric
101+
if (principal != null && !principal.getName().isEmpty()) {
102+
UserMetricGroup userGroup = getOrCreateUserMetricGroup(principal.getName());
103+
userGroup.messagesIn.inc(n);
104+
}
95105
}
96106

97-
public void incLogBytesIn(long n) {
107+
public void incLogBytesIn(long n, FlussPrincipal principal) {
98108
logMetrics.bytesIn.inc(n);
99109
serverMetrics.bytesIn().inc(n);
110+
111+
// user level metric
112+
if (principal != null && !principal.getName().isEmpty()) {
113+
UserMetricGroup userGroup = getOrCreateUserMetricGroup(principal.getName());
114+
userGroup.bytesIn.inc(n);
115+
}
100116
}
101117

102-
public void incLogBytesOut(long n) {
118+
public void incLogBytesOut(long n, FlussPrincipal principal) {
103119
logMetrics.bytesOut.inc(n);
104120
serverMetrics.bytesOut().inc(n);
121+
122+
// user level metric
123+
if (principal != null && !principal.getName().isEmpty()) {
124+
UserMetricGroup userGroup = getOrCreateUserMetricGroup(principal.getName());
125+
userGroup.bytesOut.inc(n);
126+
}
105127
}
106128

107129
public Counter totalFetchLogRequests() {
@@ -222,6 +244,48 @@ public Counter failedPrefixLookupRequests() {
222244
}
223245
}
224246

247+
// ------------------------------------------------------------------------
248+
// user groups
249+
// ------------------------------------------------------------------------
250+
private UserMetricGroup getOrCreateUserMetricGroup(String principalName) {
251+
return userMetricGroups.computeIfAbsent(
252+
principalName,
253+
name -> {
254+
return new UserMetricGroup(this, principalName);
255+
});
256+
}
257+
258+
private static class UserMetricGroup extends AbstractMetricGroup {
259+
private final String principalName;
260+
protected final Counter bytesIn;
261+
protected final Counter bytesOut;
262+
protected final Counter messagesIn;
263+
264+
private UserMetricGroup(TableMetricGroup tableMetricGroup, String principalName) {
265+
super(
266+
tableMetricGroup.registry,
267+
makeScope(tableMetricGroup, principalName),
268+
tableMetricGroup);
269+
this.principalName = principalName;
270+
messagesIn = new ThreadSafeSimpleCounter();
271+
meter(MetricNames.MESSAGES_IN_RATE, new MeterView(messagesIn));
272+
bytesIn = new ThreadSafeSimpleCounter();
273+
meter(MetricNames.BYTES_IN_RATE, new MeterView(bytesIn));
274+
bytesOut = new ThreadSafeSimpleCounter();
275+
meter(MetricNames.BYTES_OUT_RATE, new MeterView(bytesOut));
276+
}
277+
278+
@Override
279+
protected String getGroupName(CharacterFilter filter) {
280+
return "user";
281+
}
282+
283+
@Override
284+
protected void putVariables(Map<String, String> variables) {
285+
variables.put("name", principalName);
286+
}
287+
}
288+
225289
// ------------------------------------------------------------------------
226290
// bucket groups
227291
// ------------------------------------------------------------------------

fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.fluss.server.coordinator.CoordinatorContext;
6161
import org.apache.fluss.server.entity.FetchReqInfo;
6262
import org.apache.fluss.server.entity.LakeBucketOffset;
63+
import org.apache.fluss.server.entity.LogUserSession;
6364
import org.apache.fluss.server.entity.NotifyKvSnapshotOffsetData;
6465
import org.apache.fluss.server.entity.NotifyLakeTableOffsetData;
6566
import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
@@ -450,21 +451,30 @@ public void appendRecordsToLog(
450451
int timeoutMs,
451452
int requiredAcks,
452453
Map<TableBucket, MemoryLogRecords> entriesPerBucket,
454+
LogUserSession userSession,
453455
Consumer<List<ProduceLogResultForBucket>> responseCallback) {
454456
if (isRequiredAcksInvalid(requiredAcks)) {
455457
throw new InvalidRequiredAcksException("Invalid required acks: " + requiredAcks);
456458
}
457459

458460
long startTime = System.currentTimeMillis();
459461
Map<TableBucket, ProduceLogResultForBucket> appendResult =
460-
appendToLocalLog(entriesPerBucket, requiredAcks);
462+
appendToLocalLog(entriesPerBucket, requiredAcks, userSession);
461463
LOG.debug("Append records to local log in {} ms", System.currentTimeMillis() - startTime);
462464

463465
// maybe do delay write operation.
464466
maybeAddDelayedWrite(
465467
timeoutMs, requiredAcks, entriesPerBucket.size(), appendResult, responseCallback);
466468
}
467469

470+
public void appendRecordsToLog(
471+
int timeoutMs,
472+
int requiredAcks,
473+
Map<TableBucket, MemoryLogRecords> entriesPerBucket,
474+
Consumer<List<ProduceLogResultForBucket>> responseCallback) {
475+
appendRecordsToLog(timeoutMs, requiredAcks, entriesPerBucket, null, responseCallback);
476+
}
477+
468478
/**
469479
* Fetch records from a replica. Currently, we will return the fetched records immediately.
470480
*
@@ -474,17 +484,27 @@ public void appendRecordsToLog(
474484
public void fetchLogRecords(
475485
FetchParams params,
476486
Map<TableBucket, FetchReqInfo> bucketFetchInfo,
487+
LogUserSession userSession,
477488
Consumer<Map<TableBucket, FetchLogResultForBucket>> responseCallback) {
478489
long startTime = System.currentTimeMillis();
479-
Map<TableBucket, LogReadResult> logReadResults = readFromLog(params, bucketFetchInfo);
490+
Map<TableBucket, LogReadResult> logReadResults =
491+
readFromLog(params, bucketFetchInfo, userSession);
480492
if (LOG.isTraceEnabled()) {
481493
LOG.trace(
482494
"Fetch log records from local log in {} ms",
483495
System.currentTimeMillis() - startTime);
484496
}
485497

486498
// maybe do delay fetch log operation.
487-
maybeAddDelayedFetchLog(params, bucketFetchInfo, logReadResults, responseCallback);
499+
maybeAddDelayedFetchLog(
500+
params, bucketFetchInfo, logReadResults, userSession, responseCallback);
501+
}
502+
503+
public void fetchLogRecords(
504+
FetchParams params,
505+
Map<TableBucket, FetchReqInfo> bucketFetchInfo,
506+
Consumer<Map<TableBucket, FetchLogResultForBucket>> responseCallback) {
507+
fetchLogRecords(params, bucketFetchInfo, null, responseCallback);
488508
}
489509

490510
/**
@@ -496,14 +516,15 @@ public void putRecordsToKv(
496516
int requiredAcks,
497517
Map<TableBucket, KvRecordBatch> entriesPerBucket,
498518
@Nullable int[] targetColumns,
519+
LogUserSession userSession,
499520
Consumer<List<PutKvResultForBucket>> responseCallback) {
500521
if (isRequiredAcksInvalid(requiredAcks)) {
501522
throw new InvalidRequiredAcksException("Invalid required acks: " + requiredAcks);
502523
}
503524

504525
long startTime = System.currentTimeMillis();
505526
Map<TableBucket, PutKvResultForBucket> kvPutResult =
506-
putToLocalKv(entriesPerBucket, targetColumns, requiredAcks);
527+
putToLocalKv(entriesPerBucket, targetColumns, requiredAcks, userSession);
507528
LOG.debug(
508529
"Put records to local kv storage and wait generate cdc log in {} ms",
509530
System.currentTimeMillis() - startTime);
@@ -514,6 +535,16 @@ public void putRecordsToKv(
514535
timeoutMs, requiredAcks, entriesPerBucket.size(), kvPutResult, responseCallback);
515536
}
516537

538+
public void putRecordsToKv(
539+
int timeoutMs,
540+
int requiredAcks,
541+
Map<TableBucket, KvRecordBatch> entriesPerBucket,
542+
@Nullable int[] targetColumns,
543+
Consumer<List<PutKvResultForBucket>> responseCallback) {
544+
putRecordsToKv(
545+
timeoutMs, requiredAcks, entriesPerBucket, targetColumns, null, responseCallback);
546+
}
547+
517548
/** Lookup a single key value. */
518549
@VisibleForTesting
519550
protected void lookup(TableBucket tableBucket, byte[] key, Consumer<byte[]> responseCallback) {
@@ -927,7 +958,9 @@ private void addFetcherForReplicas(
927958

928959
/** Append log records to leader replicas of the buckets. */
929960
private Map<TableBucket, ProduceLogResultForBucket> appendToLocalLog(
930-
Map<TableBucket, MemoryLogRecords> entriesPerBucket, int requiredAcks) {
961+
Map<TableBucket, MemoryLogRecords> entriesPerBucket,
962+
int requiredAcks,
963+
LogUserSession userSession) {
931964
Map<TableBucket, ProduceLogResultForBucket> resultForBucketMap = new HashMap<>();
932965
for (Map.Entry<TableBucket, MemoryLogRecords> entry : entriesPerBucket.entrySet()) {
933966
TableBucket tb = entry.getKey();
@@ -950,8 +983,8 @@ private Map<TableBucket, ProduceLogResultForBucket> appendToLocalLog(
950983
resultForBucketMap.put(
951984
tb,
952985
new ProduceLogResultForBucket(tb, baseOffset, appendInfo.lastOffset() + 1));
953-
tableMetrics.incLogBytesIn(appendInfo.validBytes());
954-
tableMetrics.incLogMessageIn(appendInfo.numMessages());
986+
tableMetrics.incLogBytesIn(appendInfo.validBytes(), userSession.getPrincipal());
987+
tableMetrics.incLogMessageIn(appendInfo.numMessages(), userSession.getPrincipal());
955988
} catch (Exception e) {
956989
if (isUnexpectedException(e)) {
957990
LOG.error("Error append records to local log on replica {}", tb, e);
@@ -973,7 +1006,8 @@ private Map<TableBucket, ProduceLogResultForBucket> appendToLocalLog(
9731006
private Map<TableBucket, PutKvResultForBucket> putToLocalKv(
9741007
Map<TableBucket, KvRecordBatch> entriesPerBucket,
9751008
@Nullable int[] targetColumns,
976-
int requiredAcks) {
1009+
int requiredAcks,
1010+
LogUserSession userSession) {
9771011
Map<TableBucket, PutKvResultForBucket> putResultForBucketMap = new HashMap<>();
9781012
for (Map.Entry<TableBucket, KvRecordBatch> entry : entriesPerBucket.entrySet()) {
9791013
TableBucket tb = entry.getKey();
@@ -997,8 +1031,8 @@ private Map<TableBucket, PutKvResultForBucket> putToLocalKv(
9971031
tableMetrics.incKvMessageIn(entry.getValue().getRecordCount());
9981032
tableMetrics.incKvBytesIn(entry.getValue().sizeInBytes());
9991033
// metric for cdc log of kv
1000-
tableMetrics.incLogBytesIn(appendInfo.validBytes());
1001-
tableMetrics.incLogMessageIn(appendInfo.numMessages());
1034+
tableMetrics.incLogBytesIn(appendInfo.validBytes(), userSession.getPrincipal());
1035+
tableMetrics.incLogMessageIn(appendInfo.numMessages(), userSession.getPrincipal());
10021036
} catch (Exception e) {
10031037
if (isUnexpectedException(e)) {
10041038
LOG.error("Error put records to local kv on replica {}", tb, e);
@@ -1051,7 +1085,9 @@ public void limitScan(
10511085
}
10521086

10531087
public Map<TableBucket, LogReadResult> readFromLog(
1054-
FetchParams fetchParams, Map<TableBucket, FetchReqInfo> bucketFetchInfo) {
1088+
FetchParams fetchParams,
1089+
Map<TableBucket, FetchReqInfo> bucketFetchInfo,
1090+
LogUserSession userSession) {
10551091
Map<TableBucket, LogReadResult> logReadResult = new HashMap<>();
10561092
boolean isFromFollower = fetchParams.isFromFollower();
10571093
int limitBytes = fetchParams.maxFetchBytes();
@@ -1109,7 +1145,7 @@ public Map<TableBucket, LogReadResult> readFromLog(
11091145
if (isFromFollower) {
11101146
serverMetricGroup.replicationBytesOut().inc(recordBatchSize);
11111147
} else {
1112-
tableMetrics.incLogBytesOut(recordBatchSize);
1148+
tableMetrics.incLogBytesOut(recordBatchSize, userSession.getPrincipal());
11131149
}
11141150
} catch (Exception e) {
11151151
if (isUnexpectedException(e)) {
@@ -1299,6 +1335,7 @@ private void maybeAddDelayedFetchLog(
12991335
FetchParams params,
13001336
Map<TableBucket, FetchReqInfo> bucketFetchInfo,
13011337
Map<TableBucket, LogReadResult> logReadResults,
1338+
LogUserSession userSession,
13021339
Consumer<Map<TableBucket, FetchLogResultForBucket>> responseCallback) {
13031340
long bytesReadable = 0;
13041341
boolean errorReadingData = false;
@@ -1365,7 +1402,8 @@ private void maybeAddDelayedFetchLog(
13651402
delayedResponse.putAll(expectedErrorBuckets);
13661403
responseCallback.accept(delayedResponse);
13671404
},
1368-
serverMetricGroup);
1405+
serverMetricGroup,
1406+
userSession);
13691407

13701408
// try to complete the request immediately, otherwise put it into the
13711409
// delayedFetchLogManager; this is because while the delayed fetch log operation is

fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
2929
import org.apache.fluss.rpc.messages.FetchLogRequest;
3030
import org.apache.fluss.server.entity.FetchReqInfo;
31+
import org.apache.fluss.server.entity.LogUserSession;
3132
import org.apache.fluss.server.log.FetchIsolation;
3233
import org.apache.fluss.server.log.FetchParams;
3334
import org.apache.fluss.server.log.LogOffsetMetadata;
@@ -59,19 +60,22 @@ public class DelayedFetchLog extends DelayedOperation {
5960
private final Map<TableBucket, FetchBucketStatus> fetchBucketStatusMap;
6061
private final Consumer<Map<TableBucket, FetchLogResultForBucket>> responseCallback;
6162
private final TabletServerMetricGroup serverMetricGroup;
63+
private final LogUserSession userSession;
6264

6365
public DelayedFetchLog(
6466
FetchParams params,
6567
ReplicaManager replicaManager,
6668
Map<TableBucket, FetchBucketStatus> fetchBucketStatusMap,
6769
Consumer<Map<TableBucket, FetchLogResultForBucket>> responseCallback,
68-
TabletServerMetricGroup serverMetricGroup) {
70+
TabletServerMetricGroup serverMetricGroup,
71+
LogUserSession userSession) {
6972
super(params.maxWaitMs());
7073
this.params = params;
7174
this.replicaManager = replicaManager;
7275
this.fetchBucketStatusMap = fetchBucketStatusMap;
7376
this.responseCallback = responseCallback;
7477
this.serverMetricGroup = serverMetricGroup;
78+
this.userSession = userSession;
7579
}
7680

7781
/** Upon completion, read whatever data is available and pass to the complete callback. */
@@ -93,7 +97,7 @@ public void onComplete() {
9397

9498
// re-fetch data.
9599
Map<TableBucket, LogReadResult> reReadResult =
96-
replicaManager.readFromLog(params, reFetchBuckets);
100+
replicaManager.readFromLog(params, reFetchBuckets, userSession);
97101
reReadResult.forEach((key, value) -> result.put(key, value.getFetchLogResultForBucket()));
98102
responseCallback.accept(result);
99103
}

0 commit comments

Comments
 (0)