Skip to content

Commit 03313a9

Browse files
authored
[metric] Change some metrics from bucket-level to table-level and server-level (apache#1531)
1 parent cb11779 commit 03313a9

File tree

24 files changed

+474
-284
lines changed

24 files changed

+474
-284
lines changed

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -94,35 +94,37 @@ public class MetricNames {
9494
public static final String FAILED_PREFIX_LOOKUP_REQUESTS_RATE =
9595
"failedPrefixLookupRequestsPerSecond";
9696

97-
// --------------------------------------------------------------------------------------------
98-
// metrics for table bucket
99-
// --------------------------------------------------------------------------------------------
100-
10197
// for replica
10298
public static final String UNDER_REPLICATED = "underReplicated";
103-
public static final String IN_SYNC_REPLICAS = "inSyncReplicasCount";
10499
public static final String UNDER_MIN_ISR = "underMinIsr";
105100
public static final String AT_MIN_ISR = "atMinIsr";
106101
public static final String ISR_EXPANDS_RATE = "isrExpandsPerSecond";
107102
public static final String ISR_SHRINKS_RATE = "isrShrinksPerSecond";
108103
public static final String FAILED_ISR_UPDATES_RATE = "failedIsrUpdatesPerSecond";
109104

110105
// for log tablet
111-
public static final String LOG_NUM_SEGMENTS = "numSegments";
112-
public static final String LOG_END_OFFSET = "endOffset";
113-
public static final String LOG_SIZE = "size";
114-
public static final String LOG_FLUSH_RATE = "flushPerSecond";
115-
public static final String LOG_FLUSH_LATENCY_MS = "flushLatencyMs";
106+
public static final String LOG_FLUSH_RATE = "logFlushPerSecond";
107+
public static final String LOG_FLUSH_LATENCY_MS = "logFlushLatencyMs";
116108

117109
// for kv tablet
118-
public static final String KV_LATEST_SNAPSHOT_SIZE = "latestSnapshotSize";
110+
public static final String KV_FLUSH_RATE = "kvFlushPerSecond";
111+
public static final String KV_FLUSH_LATENCY_MS = "kvFlushLatencyMs";
119112
public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_DUPLICATED_RATE =
120113
"preWriteBufferTruncateAsDuplicatedPerSecond";
121114
public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE =
122115
"preWriteBufferTruncateAsErrorPerSecond";
123-
public static final String KV_PRE_WRITE_BUFFER_FLUSH_RATE = "preWriteBufferFlushPerSecond";
124-
public static final String KV_PRE_WRITE_BUFFER_FLUSH_LATENCY_MS =
125-
"preWriteBufferFlushLatencyMs";
116+
117+
// --------------------------------------------------------------------------------------------
118+
// metrics for table bucket
119+
// --------------------------------------------------------------------------------------------
120+
121+
// for log tablet
122+
public static final String LOG_NUM_SEGMENTS = "numSegments";
123+
public static final String LOG_END_OFFSET = "endOffset";
124+
public static final String LOG_SIZE = "size";
125+
126+
// for kv tablet
127+
public static final String KV_LATEST_SNAPSHOT_SIZE = "latestSnapshotSize";
126128

127129
// --------------------------------------------------------------------------------------------
128130
// metrics for rpc client

fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.fluss.server.kv.rowmerger.RowMerger;
3737
import org.apache.fluss.server.log.LogManager;
3838
import org.apache.fluss.server.log.LogTablet;
39+
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
3940
import org.apache.fluss.server.zk.ZooKeeperClient;
4041
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
4142
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
@@ -69,6 +70,8 @@ public final class KvManager extends TabletManagerBase {
6970
private static final Logger LOG = LoggerFactory.getLogger(KvManager.class);
7071
private final LogManager logManager;
7172

73+
private final TabletServerMetricGroup serverMetricGroup;
74+
7275
private final ZooKeeperClient zkClient;
7376

7477
private final Map<TableBucket, KvTablet> currentKvs = MapUtils.newConcurrentHashMap();
@@ -91,7 +94,8 @@ private KvManager(
9194
Configuration conf,
9295
ZooKeeperClient zkClient,
9396
int recoveryThreadsPerDataDir,
94-
LogManager logManager)
97+
LogManager logManager,
98+
TabletServerMetricGroup tabletServerMetricGroup)
9599
throws IOException {
96100
super(TabletType.KV, dataDir, conf, recoveryThreadsPerDataDir);
97101
this.logManager = logManager;
@@ -100,10 +104,14 @@ private KvManager(
100104
this.zkClient = zkClient;
101105
this.remoteKvDir = FlussPaths.remoteKvDir(conf);
102106
this.remoteFileSystem = remoteKvDir.getFileSystem();
107+
this.serverMetricGroup = tabletServerMetricGroup;
103108
}
104109

105110
public static KvManager create(
106-
Configuration conf, ZooKeeperClient zkClient, LogManager logManager)
111+
Configuration conf,
112+
ZooKeeperClient zkClient,
113+
LogManager logManager,
114+
TabletServerMetricGroup tabletServerMetricGroup)
107115
throws IOException {
108116
String dataDirString = conf.getString(ConfigOptions.DATA_DIR);
109117
File dataDir = new File(dataDirString).getAbsoluteFile();
@@ -112,7 +120,8 @@ public static KvManager create(
112120
conf,
113121
zkClient,
114122
conf.getInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS),
115-
logManager);
123+
logManager,
124+
tabletServerMetricGroup);
116125
}
117126

118127
public void startup() {
@@ -171,6 +180,7 @@ public KvTablet getOrCreateKv(
171180
logTablet,
172181
tabletDir,
173182
conf,
183+
serverMetricGroup,
174184
arrowBufferAllocator,
175185
memorySegmentPool,
176186
kvFormat,
@@ -279,6 +289,7 @@ public KvTablet loadKv(File tabletDir) throws Exception {
279289
logTablet,
280290
tabletDir,
281291
conf,
292+
serverMetricGroup,
282293
arrowBufferAllocator,
283294
memorySegmentPool,
284295
tableInfo.getTableConfig().getKvFormat(),

fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,6 @@
2929
import org.apache.fluss.metadata.Schema;
3030
import org.apache.fluss.metadata.TableBucket;
3131
import org.apache.fluss.metadata.TablePath;
32-
import org.apache.fluss.metrics.MeterView;
33-
import org.apache.fluss.metrics.MetricNames;
34-
import org.apache.fluss.metrics.groups.MetricGroup;
3532
import org.apache.fluss.record.ChangeType;
3633
import org.apache.fluss.record.KvRecord;
3734
import org.apache.fluss.record.KvRecordBatch;
@@ -55,7 +52,7 @@
5552
import org.apache.fluss.server.kv.wal.WalBuilder;
5653
import org.apache.fluss.server.log.LogAppendInfo;
5754
import org.apache.fluss.server.log.LogTablet;
58-
import org.apache.fluss.server.metrics.group.BucketMetricGroup;
55+
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
5956
import org.apache.fluss.server.utils.FatalErrorHandler;
6057
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
6158
import org.apache.fluss.types.DataType;
@@ -124,6 +121,7 @@ private KvTablet(
124121
TableBucket tableBucket,
125122
LogTablet logTablet,
126123
File kvTabletDir,
124+
TabletServerMetricGroup serverMetricGroup,
127125
RocksDBKv rocksDBKv,
128126
long writeBatchSize,
129127
LogFormat logFormat,
@@ -139,7 +137,7 @@ private KvTablet(
139137
this.kvTabletDir = kvTabletDir;
140138
this.rocksDBKv = rocksDBKv;
141139
this.writeBatchSize = writeBatchSize;
142-
this.kvPreWriteBuffer = new KvPreWriteBuffer(createKvBatchWriter());
140+
this.kvPreWriteBuffer = new KvPreWriteBuffer(createKvBatchWriter(), serverMetricGroup);
143141
this.logFormat = logFormat;
144142
this.arrowWriterProvider = new ArrowWriterPool(arrowBufferAllocator);
145143
this.memorySegmentPool = memorySegmentPool;
@@ -153,6 +151,7 @@ public static KvTablet create(
153151
LogTablet logTablet,
154152
File kvTabletDir,
155153
Configuration serverConf,
154+
TabletServerMetricGroup serverMetricGroup,
156155
BufferAllocator arrowBufferAllocator,
157156
MemorySegmentPool memorySegmentPool,
158157
KvFormat kvFormat,
@@ -168,6 +167,7 @@ public static KvTablet create(
168167
logTablet,
169168
kvTabletDir,
170169
serverConf,
170+
serverMetricGroup,
171171
arrowBufferAllocator,
172172
memorySegmentPool,
173173
kvFormat,
@@ -182,6 +182,7 @@ public static KvTablet create(
182182
LogTablet logTablet,
183183
File kvTabletDir,
184184
Configuration serverConf,
185+
TabletServerMetricGroup serverMetricGroup,
185186
BufferAllocator arrowBufferAllocator,
186187
MemorySegmentPool memorySegmentPool,
187188
KvFormat kvFormat,
@@ -195,6 +196,7 @@ public static KvTablet create(
195196
tableBucket,
196197
logTablet,
197198
kvTabletDir,
199+
serverMetricGroup,
198200
kv,
199201
serverConf.get(ConfigOptions.KV_WRITE_BATCH_SIZE).getBytes(),
200202
logTablet.getLogFormat(),
@@ -243,24 +245,6 @@ public long getFlushedLogOffset() {
243245
return flushedLogOffset;
244246
}
245247

246-
public void registerMetrics(BucketMetricGroup bucketMetricGroup) {
247-
MetricGroup metricGroup = bucketMetricGroup.addGroup("kv");
248-
249-
// about pre-write buffer.
250-
metricGroup.meter(
251-
MetricNames.KV_PRE_WRITE_BUFFER_FLUSH_RATE,
252-
new MeterView(kvPreWriteBuffer.getFlushCount()));
253-
metricGroup.histogram(
254-
MetricNames.KV_PRE_WRITE_BUFFER_FLUSH_LATENCY_MS,
255-
kvPreWriteBuffer.getFlushLatencyHistogram());
256-
metricGroup.meter(
257-
MetricNames.KV_PRE_WRITE_BUFFER_TRUNCATE_AS_DUPLICATED_RATE,
258-
new MeterView(kvPreWriteBuffer.getTruncateAsDuplicatedCount()));
259-
metricGroup.meter(
260-
MetricNames.KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE,
261-
new MeterView(kvPreWriteBuffer.getTruncateAsErrorCount()));
262-
}
263-
264248
/**
265249
* Put the KvRecordBatch into the kv storage, and return the appended wal log info.
266250
*

fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,9 @@
2020
import org.apache.fluss.annotation.VisibleForTesting;
2121
import org.apache.fluss.memory.MemorySegment;
2222
import org.apache.fluss.metrics.Counter;
23-
import org.apache.fluss.metrics.DescriptiveStatisticsHistogram;
2423
import org.apache.fluss.metrics.Histogram;
25-
import org.apache.fluss.metrics.SimpleCounter;
2624
import org.apache.fluss.server.kv.KvBatchWriter;
25+
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
2726
import org.apache.fluss.utils.MurmurHashUtils;
2827

2928
import javax.annotation.Nullable;
@@ -61,7 +60,7 @@
6160
* <li>Buffer all the key-value pairs that are waiting for the corresponding WAL to be persisted.
6261
* And flush these key-value pairs whose WAL has been persisted to underlying kv storage.
6362
* <li>A temporary in-memory key-value buffer for put/get a key. Since Fluss will lookup the
64-
* previous written data to generate CDC as WAL, it need a buffer to buffer the data been
63+
* previous written data to generate CDC as WAL, it needs a buffer to buffer the data been
6564
* written before but is still waiting for the WAL to be persisted before flush to underlying
6665
* kv storage.
6766
* </ol>
@@ -104,14 +103,14 @@ public class KvPreWriteBuffer implements AutoCloseable {
104103
// the max LSN in the buffer
105104
private long maxLogSequenceNumber = -1;
106105

107-
public KvPreWriteBuffer(KvBatchWriter kvBatchWriter) {
106+
public KvPreWriteBuffer(
107+
KvBatchWriter kvBatchWriter, TabletServerMetricGroup serverMetricGroup) {
108108
this.kvBatchWriter = kvBatchWriter;
109109

110-
flushCount = new SimpleCounter();
111-
// consider won't flush frequently, we set a small window size
112-
flushLatencyHistogram = new DescriptiveStatisticsHistogram(5);
113-
truncateAsDuplicatedCount = new SimpleCounter();
114-
truncateAsErrorCount = new SimpleCounter();
110+
flushCount = serverMetricGroup.kvFlushCount();
111+
flushLatencyHistogram = serverMetricGroup.kvFlushLatencyHistogram();
112+
truncateAsDuplicatedCount = serverMetricGroup.kvTruncateAsDuplicatedCount();
113+
truncateAsErrorCount = serverMetricGroup.kvTruncateAsErrorCount();
115114
}
116115

117116
/**

fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,10 @@
2525
import org.apache.fluss.metadata.LogFormat;
2626
import org.apache.fluss.metadata.TableBucket;
2727
import org.apache.fluss.metrics.Counter;
28-
import org.apache.fluss.metrics.DescriptiveStatisticsHistogram;
2928
import org.apache.fluss.metrics.Histogram;
30-
import org.apache.fluss.metrics.SimpleCounter;
3129
import org.apache.fluss.record.FileLogProjection;
3230
import org.apache.fluss.record.MemoryLogRecords;
31+
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
3332
import org.apache.fluss.utils.FileUtils;
3433
import org.apache.fluss.utils.FlussPaths;
3534

@@ -90,6 +89,7 @@ public final class LocalLog {
9089
public LocalLog(
9190
File logTabletDir,
9291
Configuration config,
92+
TabletServerMetricGroup serverMetricGroup,
9393
LogSegments segments,
9494
long recoveryPoint,
9595
LogOffsetMetadata nextOffsetMetadata,
@@ -105,9 +105,8 @@ public LocalLog(
105105
this.logFormat = logFormat;
106106

107107
lastFlushedTime = new AtomicLong(System.currentTimeMillis());
108-
flushCount = new SimpleCounter();
109-
// consider won't flush frequently, we set a small window size
110-
flushLatencyHistogram = new DescriptiveStatisticsHistogram(5);
108+
flushCount = serverMetricGroup.logFlushCount();
109+
flushLatencyHistogram = serverMetricGroup.logFlushLatencyHistogram();
111110
localLogStartOffset = segments.isEmpty() ? 0L : segments.firstSegmentBaseOffset().get();
112111
localMaxTimestamp =
113112
segments.isEmpty() ? 0L : segments.lastSegment().get().maxTimestampSoFar();
@@ -125,14 +124,6 @@ long getRecoveryPoint() {
125124
return recoveryPoint;
126125
}
127126

128-
Histogram getFlushLatencyHistogram() {
129-
return flushLatencyHistogram;
130-
}
131-
132-
Counter getFlushCount() {
133-
return flushCount;
134-
}
135-
136127
/** The offset metadata of the next message that will be appended to the log. */
137128
@VisibleForTesting
138129
LogOffsetMetadata getLocalLogEndOffsetMetadata() {

fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.fluss.metadata.TablePath;
3030
import org.apache.fluss.server.TabletManagerBase;
3131
import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile;
32+
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
3233
import org.apache.fluss.server.zk.ZooKeeperClient;
3334
import org.apache.fluss.utils.FileUtils;
3435
import org.apache.fluss.utils.FlussPaths;
@@ -87,6 +88,7 @@ public final class LogManager extends TabletManagerBase {
8788
private final ZooKeeperClient zkClient;
8889
private final Scheduler scheduler;
8990
private final Clock clock;
91+
private final TabletServerMetricGroup serverMetricGroup;
9092
private final ReentrantLock logCreationOrDeletionLock = new ReentrantLock();
9193

9294
private final Map<TableBucket, LogTablet> currentLogs = MapUtils.newConcurrentHashMap();
@@ -100,19 +102,25 @@ private LogManager(
100102
ZooKeeperClient zkClient,
101103
int recoveryThreadsPerDataDir,
102104
Scheduler scheduler,
103-
Clock clock)
105+
Clock clock,
106+
TabletServerMetricGroup serverMetricGroup)
104107
throws Exception {
105108
super(TabletType.LOG, dataDir, conf, recoveryThreadsPerDataDir);
106109
this.zkClient = zkClient;
107110
this.scheduler = scheduler;
108111
this.clock = clock;
112+
this.serverMetricGroup = serverMetricGroup;
109113
createAndValidateDataDir(dataDir);
110114

111115
initializeCheckpointMaps();
112116
}
113117

114118
public static LogManager create(
115-
Configuration conf, ZooKeeperClient zkClient, Scheduler scheduler, Clock clock)
119+
Configuration conf,
120+
ZooKeeperClient zkClient,
121+
Scheduler scheduler,
122+
Clock clock,
123+
TabletServerMetricGroup serverMetricGroup)
116124
throws Exception {
117125
String dataDirString = conf.getString(ConfigOptions.DATA_DIR);
118126
File dataDir = new File(dataDirString).getAbsoluteFile();
@@ -122,7 +130,8 @@ public static LogManager create(
122130
zkClient,
123131
conf.getInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS),
124132
scheduler,
125-
clock);
133+
clock,
134+
serverMetricGroup);
126135
}
127136

128137
public void startup() {
@@ -246,6 +255,7 @@ public LogTablet getOrCreateLog(
246255
tablePath,
247256
tabletDir,
248257
conf,
258+
serverMetricGroup,
249259
0L,
250260
scheduler,
251261
logFormat,
@@ -348,6 +358,7 @@ private LogTablet loadLog(
348358
physicalTablePath,
349359
tabletDir,
350360
conf,
361+
serverMetricGroup,
351362
logRecoveryPoint,
352363
scheduler,
353364
tableInfo.getTableConfig().getLogFormat(),

0 commit comments

Comments
 (0)