Skip to content

Commit 7d573a0

Browse files
committed
fix
1 parent b0f9d32 commit 7d573a0

File tree

15 files changed

+101
-77
lines changed

15 files changed

+101
-77
lines changed

fluss-server/src/main/java/org/apache/fluss/server/entity/LogUserContext.java renamed to fluss-server/src/main/java/org/apache/fluss/server/entity/UserContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
import org.apache.fluss.security.acl.FlussPrincipal;
2222

2323
/** The context information of user who writes or reads table. */
24-
public class LogUserContext {
24+
public class UserContext {
2525
private final FlussPrincipal principal;
2626

27-
public LogUserContext(FlussPrincipal principal) {
27+
public UserContext(FlussPrincipal principal) {
2828
this.principal = principal;
2929
}
3030

fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
2929
import org.apache.fluss.metrics.registry.MetricRegistry;
3030
import org.apache.fluss.security.acl.FlussPrincipal;
31-
import org.apache.fluss.server.entity.LogUserContext;
31+
import org.apache.fluss.server.entity.UserContext;
3232
import org.apache.fluss.utils.MapUtils;
3333

3434
import javax.annotation.Nullable;
@@ -95,37 +95,30 @@ protected String getGroupName(CharacterFilter filter) {
9595
return "table";
9696
}
9797

98-
public void incLogMessageIn(long n, LogUserContext userContext) {
98+
public void incLogMessageIn(long n) {
9999
logMetrics.messagesIn.inc(n);
100100
serverMetrics.messageIn().inc(n);
101-
102-
// user level metric
103-
Optional.ofNullable(userContext)
104-
.map(LogUserContext::getPrincipal)
105-
.map(FlussPrincipal::getName)
106-
.filter(name -> !name.isEmpty())
107-
.ifPresent(name -> getOrCreateUserMetricGroup(name).messagesIn.inc(n));
108101
}
109102

110-
public void incLogBytesIn(long n, LogUserContext userContext) {
103+
public void incLogBytesIn(long n, UserContext userContext) {
111104
logMetrics.bytesIn.inc(n);
112105
serverMetrics.bytesIn().inc(n);
113106

114107
// user level metric
115108
Optional.ofNullable(userContext)
116-
.map(LogUserContext::getPrincipal)
109+
.map(UserContext::getPrincipal)
117110
.map(FlussPrincipal::getName)
118111
.filter(name -> !name.isEmpty())
119112
.ifPresent(name -> getOrCreateUserMetricGroup(name).bytesIn.inc(n));
120113
}
121114

122-
public void incLogBytesOut(long n, LogUserContext userContext) {
115+
public void incLogBytesOut(long n, UserContext userContext) {
123116
logMetrics.bytesOut.inc(n);
124117
serverMetrics.bytesOut().inc(n);
125118

126119
// user level metric
127120
Optional.ofNullable(userContext)
128-
.map(LogUserContext::getPrincipal)
121+
.map(UserContext::getPrincipal)
129122
.map(FlussPrincipal::getName)
130123
.filter(name -> !name.isEmpty())
131124
.ifPresent(name -> getOrCreateUserMetricGroup(name).bytesOut.inc(n));
@@ -254,26 +247,20 @@ public Counter failedPrefixLookupRequests() {
254247
// ------------------------------------------------------------------------
255248
private UserMetricGroup getOrCreateUserMetricGroup(String principalName) {
256249
return userMetricGroups.computeIfAbsent(
257-
principalName,
258-
name -> {
259-
return new UserMetricGroup(this, principalName);
260-
});
250+
principalName, name -> new UserMetricGroup(this, principalName));
261251
}
262252

263253
private static class UserMetricGroup extends AbstractMetricGroup {
264254
private final String principalName;
265255
protected final Counter bytesIn;
266256
protected final Counter bytesOut;
267-
protected final Counter messagesIn;
268257

269258
private UserMetricGroup(TableMetricGroup tableMetricGroup, String principalName) {
270259
super(
271260
tableMetricGroup.registry,
272261
makeScope(tableMetricGroup, principalName),
273262
tableMetricGroup);
274263
this.principalName = principalName;
275-
messagesIn = new ThreadSafeSimpleCounter();
276-
meter(MetricNames.MESSAGES_IN_RATE, new MeterView(messagesIn));
277264
bytesIn = new ThreadSafeSimpleCounter();
278265
meter(MetricNames.BYTES_IN_RATE, new MeterView(bytesIn));
279266
bytesOut = new ThreadSafeSimpleCounter();

fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java

Lines changed: 10 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,14 @@
6060
import org.apache.fluss.server.coordinator.CoordinatorContext;
6161
import org.apache.fluss.server.entity.FetchReqInfo;
6262
import org.apache.fluss.server.entity.LakeBucketOffset;
63-
import org.apache.fluss.server.entity.LogUserContext;
6463
import org.apache.fluss.server.entity.NotifyKvSnapshotOffsetData;
6564
import org.apache.fluss.server.entity.NotifyLakeTableOffsetData;
6665
import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
6766
import org.apache.fluss.server.entity.NotifyLeaderAndIsrResultForBucket;
6867
import org.apache.fluss.server.entity.NotifyRemoteLogOffsetsData;
6968
import org.apache.fluss.server.entity.StopReplicaData;
7069
import org.apache.fluss.server.entity.StopReplicaResultForBucket;
70+
import org.apache.fluss.server.entity.UserContext;
7171
import org.apache.fluss.server.kv.KvManager;
7272
import org.apache.fluss.server.kv.KvSnapshotResource;
7373
import org.apache.fluss.server.kv.snapshot.CompletedKvSnapshotCommitter;
@@ -451,7 +451,7 @@ public void appendRecordsToLog(
451451
int timeoutMs,
452452
int requiredAcks,
453453
Map<TableBucket, MemoryLogRecords> entriesPerBucket,
454-
LogUserContext userContext,
454+
UserContext userContext,
455455
Consumer<List<ProduceLogResultForBucket>> responseCallback) {
456456
if (isRequiredAcksInvalid(requiredAcks)) {
457457
throw new InvalidRequiredAcksException("Invalid required acks: " + requiredAcks);
@@ -467,14 +467,6 @@ public void appendRecordsToLog(
467467
timeoutMs, requiredAcks, entriesPerBucket.size(), appendResult, responseCallback);
468468
}
469469

470-
public void appendRecordsToLog(
471-
int timeoutMs,
472-
int requiredAcks,
473-
Map<TableBucket, MemoryLogRecords> entriesPerBucket,
474-
Consumer<List<ProduceLogResultForBucket>> responseCallback) {
475-
appendRecordsToLog(timeoutMs, requiredAcks, entriesPerBucket, null, responseCallback);
476-
}
477-
478470
/**
479471
* Fetch records from a replica. Currently, we will return the fetched records immediately.
480472
*
@@ -484,7 +476,7 @@ public void appendRecordsToLog(
484476
public void fetchLogRecords(
485477
FetchParams params,
486478
Map<TableBucket, FetchReqInfo> bucketFetchInfo,
487-
LogUserContext userContext,
479+
UserContext userContext,
488480
Consumer<Map<TableBucket, FetchLogResultForBucket>> responseCallback) {
489481
long startTime = System.currentTimeMillis();
490482
Map<TableBucket, LogReadResult> logReadResults =
@@ -500,13 +492,6 @@ public void fetchLogRecords(
500492
params, bucketFetchInfo, logReadResults, userContext, responseCallback);
501493
}
502494

503-
public void fetchLogRecords(
504-
FetchParams params,
505-
Map<TableBucket, FetchReqInfo> bucketFetchInfo,
506-
Consumer<Map<TableBucket, FetchLogResultForBucket>> responseCallback) {
507-
fetchLogRecords(params, bucketFetchInfo, null, responseCallback);
508-
}
509-
510495
/**
511496
* Put kv records to leader replicas of the buckets, the kv data will write to kv tablet and the
512497
* response callback need to wait for the cdc log to be replicated to other replicas if needed.
@@ -516,7 +501,7 @@ public void putRecordsToKv(
516501
int requiredAcks,
517502
Map<TableBucket, KvRecordBatch> entriesPerBucket,
518503
@Nullable int[] targetColumns,
519-
LogUserContext userContext,
504+
UserContext userContext,
520505
Consumer<List<PutKvResultForBucket>> responseCallback) {
521506
if (isRequiredAcksInvalid(requiredAcks)) {
522507
throw new InvalidRequiredAcksException("Invalid required acks: " + requiredAcks);
@@ -535,16 +520,6 @@ public void putRecordsToKv(
535520
timeoutMs, requiredAcks, entriesPerBucket.size(), kvPutResult, responseCallback);
536521
}
537522

538-
public void putRecordsToKv(
539-
int timeoutMs,
540-
int requiredAcks,
541-
Map<TableBucket, KvRecordBatch> entriesPerBucket,
542-
@Nullable int[] targetColumns,
543-
Consumer<List<PutKvResultForBucket>> responseCallback) {
544-
putRecordsToKv(
545-
timeoutMs, requiredAcks, entriesPerBucket, targetColumns, null, responseCallback);
546-
}
547-
548523
/** Lookup a single key value. */
549524
@VisibleForTesting
550525
protected void lookup(TableBucket tableBucket, byte[] key, Consumer<byte[]> responseCallback) {
@@ -960,7 +935,7 @@ private void addFetcherForReplicas(
960935
private Map<TableBucket, ProduceLogResultForBucket> appendToLocalLog(
961936
Map<TableBucket, MemoryLogRecords> entriesPerBucket,
962937
int requiredAcks,
963-
LogUserContext userContext) {
938+
UserContext userContext) {
964939
Map<TableBucket, ProduceLogResultForBucket> resultForBucketMap = new HashMap<>();
965940
for (Map.Entry<TableBucket, MemoryLogRecords> entry : entriesPerBucket.entrySet()) {
966941
TableBucket tb = entry.getKey();
@@ -984,7 +959,7 @@ private Map<TableBucket, ProduceLogResultForBucket> appendToLocalLog(
984959
tb,
985960
new ProduceLogResultForBucket(tb, baseOffset, appendInfo.lastOffset() + 1));
986961
tableMetrics.incLogBytesIn(appendInfo.validBytes(), userContext);
987-
tableMetrics.incLogMessageIn(appendInfo.numMessages(), userContext);
962+
tableMetrics.incLogMessageIn(appendInfo.numMessages());
988963
} catch (Exception e) {
989964
if (isUnexpectedException(e)) {
990965
LOG.error("Error append records to local log on replica {}", tb, e);
@@ -1007,7 +982,7 @@ private Map<TableBucket, PutKvResultForBucket> putToLocalKv(
1007982
Map<TableBucket, KvRecordBatch> entriesPerBucket,
1008983
@Nullable int[] targetColumns,
1009984
int requiredAcks,
1010-
LogUserContext userContext) {
985+
UserContext userContext) {
1011986
Map<TableBucket, PutKvResultForBucket> putResultForBucketMap = new HashMap<>();
1012987
for (Map.Entry<TableBucket, KvRecordBatch> entry : entriesPerBucket.entrySet()) {
1013988
TableBucket tb = entry.getKey();
@@ -1032,7 +1007,7 @@ private Map<TableBucket, PutKvResultForBucket> putToLocalKv(
10321007
tableMetrics.incKvBytesIn(entry.getValue().sizeInBytes());
10331008
// metric for cdc log of kv
10341009
tableMetrics.incLogBytesIn(appendInfo.validBytes(), userContext);
1035-
tableMetrics.incLogMessageIn(appendInfo.numMessages(), userContext);
1010+
tableMetrics.incLogMessageIn(appendInfo.numMessages());
10361011
} catch (Exception e) {
10371012
if (isUnexpectedException(e)) {
10381013
LOG.error("Error put records to local kv on replica {}", tb, e);
@@ -1087,7 +1062,7 @@ public void limitScan(
10871062
public Map<TableBucket, LogReadResult> readFromLog(
10881063
FetchParams fetchParams,
10891064
Map<TableBucket, FetchReqInfo> bucketFetchInfo,
1090-
LogUserContext userContext) {
1065+
UserContext userContext) {
10911066
Map<TableBucket, LogReadResult> logReadResult = new HashMap<>();
10921067
boolean isFromFollower = fetchParams.isFromFollower();
10931068
int limitBytes = fetchParams.maxFetchBytes();
@@ -1335,7 +1310,7 @@ private void maybeAddDelayedFetchLog(
13351310
FetchParams params,
13361311
Map<TableBucket, FetchReqInfo> bucketFetchInfo,
13371312
Map<TableBucket, LogReadResult> logReadResults,
1338-
LogUserContext userContext,
1313+
UserContext userContext,
13391314
Consumer<Map<TableBucket, FetchLogResultForBucket>> responseCallback) {
13401315
long bytesReadable = 0;
13411316
boolean errorReadingData = false;

fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
2929
import org.apache.fluss.rpc.messages.FetchLogRequest;
3030
import org.apache.fluss.server.entity.FetchReqInfo;
31-
import org.apache.fluss.server.entity.LogUserContext;
31+
import org.apache.fluss.server.entity.UserContext;
3232
import org.apache.fluss.server.log.FetchIsolation;
3333
import org.apache.fluss.server.log.FetchParams;
3434
import org.apache.fluss.server.log.LogOffsetMetadata;
@@ -60,15 +60,15 @@ public class DelayedFetchLog extends DelayedOperation {
6060
private final Map<TableBucket, FetchBucketStatus> fetchBucketStatusMap;
6161
private final Consumer<Map<TableBucket, FetchLogResultForBucket>> responseCallback;
6262
private final TabletServerMetricGroup serverMetricGroup;
63-
private final LogUserContext userContext;
63+
private final UserContext userContext;
6464

6565
public DelayedFetchLog(
6666
FetchParams params,
6767
ReplicaManager replicaManager,
6868
Map<TableBucket, FetchBucketStatus> fetchBucketStatusMap,
6969
Consumer<Map<TableBucket, FetchLogResultForBucket>> responseCallback,
7070
TabletServerMetricGroup serverMetricGroup,
71-
LogUserContext userContext) {
71+
UserContext userContext) {
7272
super(params.maxWaitMs());
7373
this.params = params;
7474
this.replicaManager = replicaManager;

fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@
6969
import org.apache.fluss.server.authorizer.Authorizer;
7070
import org.apache.fluss.server.coordinator.MetadataManager;
7171
import org.apache.fluss.server.entity.FetchReqInfo;
72-
import org.apache.fluss.server.entity.LogUserContext;
7372
import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
73+
import org.apache.fluss.server.entity.UserContext;
7474
import org.apache.fluss.server.log.FetchParams;
7575
import org.apache.fluss.server.log.ListOffsetsParam;
7676
import org.apache.fluss.server.metadata.TabletServerMetadataCache;
@@ -167,7 +167,7 @@ public CompletableFuture<ProduceLogResponse> produceLog(ProduceLogRequest reques
167167
request.getTimeoutMs(),
168168
request.getAcks(),
169169
produceLogData,
170-
new LogUserContext(currentSession().getPrincipal()),
170+
new UserContext(currentSession().getPrincipal()),
171171
bucketResponseMap -> response.complete(makeProduceLogResponse(bucketResponseMap)));
172172
return response;
173173
}
@@ -192,7 +192,7 @@ public CompletableFuture<FetchLogResponse> fetchLog(FetchLogRequest request) {
192192
replicaManager.fetchLogRecords(
193193
fetchParams,
194194
interesting,
195-
new LogUserContext(currentSession().getPrincipal()),
195+
new UserContext(currentSession().getPrincipal()),
196196
fetchResponseMap ->
197197
response.complete(
198198
makeFetchLogResponse(fetchResponseMap, errorResponseMap)));
@@ -227,7 +227,7 @@ public CompletableFuture<PutKvResponse> putKv(PutKvRequest request) {
227227
request.getAcks(),
228228
putKvData,
229229
getTargetColumns(request),
230-
new LogUserContext(currentSession().getPrincipal()),
230+
new UserContext(currentSession().getPrincipal()),
231231
bucketResponse -> response.complete(makePutKvResponse(bucketResponse)));
232232
return response;
233233
}

fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public void remoteLogMiscTest() throws Exception {
133133
.fetchLogRecords(
134134
new FetchParams(-1, Integer.MAX_VALUE),
135135
Collections.singletonMap(tb, new FetchReqInfo(tableId, 0, 10240)),
136+
null,
136137
future::complete);
137138

138139
Map<TableBucket, FetchLogResultForBucket> result = future.get();

fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ void testFetchRecordsFromRemote(boolean partitionTable) throws Exception {
322322
replicaManager.fetchLogRecords(
323323
new FetchParams(-1, Integer.MAX_VALUE),
324324
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 0L, 1024 * 1024)),
325+
null,
325326
future::complete);
326327
Map<TableBucket, FetchLogResultForBucket> result = future.get();
327328
assertThat(result.size()).isEqualTo(1);
@@ -341,6 +342,7 @@ void testFetchRecordsFromRemote(boolean partitionTable) throws Exception {
341342
replicaManager.fetchLogRecords(
342343
new FetchParams(-1, Integer.MAX_VALUE),
343344
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 47, 1024 * 1024)),
345+
null,
344346
future::complete);
345347
result = future.get();
346348
assertThat(result.size()).isEqualTo(1);
@@ -379,6 +381,7 @@ void testCleanupLocalSegments(boolean partitionTable) throws Exception {
379381
replicaManager.fetchLogRecords(
380382
new FetchParams(-1, Integer.MAX_VALUE),
381383
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 0L, 1024 * 1024)),
384+
null,
382385
future::complete);
383386
Map<TableBucket, FetchLogResultForBucket> result = future.get();
384387
assertThat(result.size()).isEqualTo(1);
@@ -398,6 +401,7 @@ void testCleanupLocalSegments(boolean partitionTable) throws Exception {
398401
replicaManager.fetchLogRecords(
399402
new FetchParams(-1, Integer.MAX_VALUE),
400403
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 35, 1024 * 1024)),
404+
null,
401405
future::complete);
402406
result = future.get();
403407
assertThat(result.size()).isEqualTo(1);
@@ -449,6 +453,7 @@ void testConfigureTieredLogLocalSegments(boolean partitionedTable) throws Except
449453
replicaManager.fetchLogRecords(
450454
new FetchParams(-1, Integer.MAX_VALUE),
451455
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 20L, 1024 * 1024)),
456+
null,
452457
future::complete);
453458
Map<TableBucket, FetchLogResultForBucket> result = future.get();
454459
assertThat(result.get(tb).fetchFromRemote()).isFalse();
@@ -459,6 +464,7 @@ void testConfigureTieredLogLocalSegments(boolean partitionedTable) throws Except
459464
replicaManager.fetchLogRecords(
460465
new FetchParams(-1, Integer.MAX_VALUE),
461466
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 0, 1024 * 1024)),
467+
null,
462468
future::complete);
463469
result = future.get();
464470
assertThat(result.get(tb).fetchFromRemote()).isTrue();

fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTTLTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ void testRemoteLogTTL(boolean partitionTable) throws Exception {
8282
replicaManager.fetchLogRecords(
8383
new FetchParams(-1, Integer.MAX_VALUE),
8484
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 0L, 1024 * 1024)),
85+
null,
8586
future::complete);
8687
Map<TableBucket, FetchLogResultForBucket> result = future.get();
8788
assertThat(result.size()).isEqualTo(1);

fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ void testExpandIsr() throws Exception {
6767
20000,
6868
1,
6969
Collections.singletonMap(tb, genMemoryLogRecordsByObject(DATA1)),
70+
null,
7071
future::complete);
7172
assertThat(future.get()).containsOnly(new ProduceLogResultForBucket(tb, 0, 10L));
7273

@@ -77,6 +78,7 @@ void testExpandIsr() throws Exception {
7778
2, (int) conf.get(ConfigOptions.LOG_REPLICA_FETCH_MAX_BYTES).getBytes()),
7879
Collections.singletonMap(
7980
tb, new FetchReqInfo(tb.getTableId(), 10L, Integer.MAX_VALUE)),
81+
null,
8082
result -> {});
8183
retry(
8284
Duration.ofSeconds(20),
@@ -92,6 +94,7 @@ tb, new FetchReqInfo(tb.getTableId(), 10L, Integer.MAX_VALUE)),
9294
3, (int) conf.get(ConfigOptions.LOG_REPLICA_FETCH_MAX_BYTES).getBytes()),
9395
Collections.singletonMap(
9496
tb, new FetchReqInfo(tb.getTableId(), 10L, Integer.MAX_VALUE)),
97+
null,
9598
result -> {});
9699
retry(
97100
Duration.ofSeconds(20),

0 commit comments

Comments
 (0)