Skip to content

Commit 5095236

Browse files
authored
[metrics] Adding Server-Level Storage Aggregation Metrics (#1548)
1 parent 8449d47 commit 5095236

File tree

17 files changed

+491
-54
lines changed

17 files changed

+491
-54
lines changed

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ public class MetricNames {
4747
public static final String EVENT_QUEUE_TIME_MS = "eventQueueTimeMs";
4848
public static final String EVENT_PROCESSING_TIME_MS = "eventProcessingTimeMs";
4949

50+
// for kv tablets, as reported by the coordinator
51+
public static final String KV_NUM_SNAPSHOTS = "numKvSnapshots";
52+
public static final String KV_ALL_SNAPSHOT_SIZE = "allKvSnapshotSize";
53+
public static final String SERVER_PHYSICAL_STORAGE_REMOTE_KV_SIZE = "remoteKvSize";
54+
5055
// --------------------------------------------------------------------------------------------
5156
// metrics for tablet server
5257
// --------------------------------------------------------------------------------------------
@@ -63,6 +68,11 @@ public class MetricNames {
6368
public static final String DELAYED_FETCH_FROM_CLIENT_EXPIRES_RATE =
6469
"delayedFetchFromClientExpiresPerSecond";
6570

71+
public static final String SERVER_LOGICAL_STORAGE_LOG_SIZE = "logSize";
72+
public static final String SERVER_LOGICAL_STORAGE_KV_SIZE = "kvSize";
73+
public static final String SERVER_PHYSICAL_STORAGE_LOCAL_SIZE = "localSize";
74+
public static final String SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE = "remoteLogSize";
75+
6676
// --------------------------------------------------------------------------------------------
6777
// metrics for table
6878
// --------------------------------------------------------------------------------------------
@@ -121,10 +131,11 @@ public class MetricNames {
121131
// for log tablet
122132
public static final String LOG_NUM_SEGMENTS = "numSegments";
123133
public static final String LOG_END_OFFSET = "endOffset";
124-
public static final String LOG_SIZE = "size";
134+
public static final String REMOTE_LOG_SIZE = "size";
125135

126-
// for kv tablet
127-
public static final String KV_LATEST_SNAPSHOT_SIZE = "latestSnapshotSize";
136+
// for logical storage
137+
public static final String LOCAL_STORAGE_LOG_SIZE = "logSize";
138+
public static final String LOCAL_STORAGE_KV_SIZE = "kvSize";
128139

129140
// --------------------------------------------------------------------------------------------
130141
// metrics for rpc client

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@
1919

2020
import org.apache.fluss.annotation.VisibleForTesting;
2121
import org.apache.fluss.metadata.TableBucket;
22+
import org.apache.fluss.metadata.TablePath;
23+
import org.apache.fluss.metrics.MetricNames;
24+
import org.apache.fluss.metrics.groups.MetricGroup;
2225
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
2326
import org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandle;
2427
import org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandleStore;
2528
import org.apache.fluss.server.kv.snapshot.CompletedSnapshotStore;
2629
import org.apache.fluss.server.kv.snapshot.SharedKvFileRegistry;
2730
import org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore;
31+
import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
2832
import org.apache.fluss.server.zk.ZooKeeperClient;
2933
import org.apache.fluss.utils.MapUtils;
3034

@@ -60,18 +64,19 @@ public class CompletedSnapshotStoreManager {
6064
private final Executor ioExecutor;
6165
private final Function<ZooKeeperClient, CompletedSnapshotHandleStore>
6266
makeZookeeperCompletedSnapshotHandleStore;
67+
private final CoordinatorMetricGroup coordinatorMetricGroup;
6368

6469
public CompletedSnapshotStoreManager(
6570
int maxNumberOfSnapshotsToRetain,
6671
Executor ioExecutor,
67-
ZooKeeperClient zooKeeperClient) {
68-
checkArgument(
69-
maxNumberOfSnapshotsToRetain > 0, "maxNumberOfSnapshotsToRetain must be positive");
70-
this.maxNumberOfSnapshotsToRetain = maxNumberOfSnapshotsToRetain;
71-
this.zooKeeperClient = zooKeeperClient;
72-
this.bucketCompletedSnapshotStores = MapUtils.newConcurrentHashMap();
73-
this.ioExecutor = ioExecutor;
74-
this.makeZookeeperCompletedSnapshotHandleStore = ZooKeeperCompletedSnapshotHandleStore::new;
72+
ZooKeeperClient zooKeeperClient,
73+
CoordinatorMetricGroup coordinatorMetricGroup) {
74+
this(
75+
maxNumberOfSnapshotsToRetain,
76+
ioExecutor,
77+
zooKeeperClient,
78+
ZooKeeperCompletedSnapshotHandleStore::new,
79+
coordinatorMetricGroup);
7580
}
7681

7782
@VisibleForTesting
@@ -80,17 +85,43 @@ public CompletedSnapshotStoreManager(
8085
Executor ioExecutor,
8186
ZooKeeperClient zooKeeperClient,
8287
Function<ZooKeeperClient, CompletedSnapshotHandleStore>
83-
makeZookeeperCompletedSnapshotHandleStore) {
88+
makeZookeeperCompletedSnapshotHandleStore,
89+
CoordinatorMetricGroup coordinatorMetricGroup) {
8490
checkArgument(
8591
maxNumberOfSnapshotsToRetain > 0, "maxNumberOfSnapshotsToRetain must be positive");
8692
this.maxNumberOfSnapshotsToRetain = maxNumberOfSnapshotsToRetain;
8793
this.zooKeeperClient = zooKeeperClient;
8894
this.bucketCompletedSnapshotStores = MapUtils.newConcurrentHashMap();
8995
this.ioExecutor = ioExecutor;
9096
this.makeZookeeperCompletedSnapshotHandleStore = makeZookeeperCompletedSnapshotHandleStore;
97+
this.coordinatorMetricGroup = coordinatorMetricGroup;
98+
99+
registerMetrics();
91100
}
92101

93-
public CompletedSnapshotStore getOrCreateCompletedSnapshotStore(TableBucket tableBucket) {
102+
private void registerMetrics() {
103+
MetricGroup physicalStorage = coordinatorMetricGroup.addGroup("physicalStorage");
104+
physicalStorage.gauge(
105+
MetricNames.SERVER_PHYSICAL_STORAGE_REMOTE_KV_SIZE,
106+
this::physicalStorageRemoteKvSize);
107+
}
108+
109+
private long physicalStorageRemoteKvSize() {
110+
return bucketCompletedSnapshotStores.values().stream()
111+
.map(CompletedSnapshotStore::getPhysicalStorageRemoteKvSize)
112+
.reduce(0L, Long::sum);
113+
}
114+
115+
private long getNumSnapshots(TableBucket tableBucket) {
116+
return bucketCompletedSnapshotStores.get(tableBucket).getNumSnapshots();
117+
}
118+
119+
private long getAllSnapshotSize(TableBucket tableBucket) {
120+
return bucketCompletedSnapshotStores.get(tableBucket).getPhysicalStorageRemoteKvSize();
121+
}
122+
123+
public CompletedSnapshotStore getOrCreateCompletedSnapshotStore(
124+
TablePath tablePath, TableBucket tableBucket) {
94125
return bucketCompletedSnapshotStores.computeIfAbsent(
95126
tableBucket,
96127
(bucket) -> {
@@ -104,6 +135,22 @@ public CompletedSnapshotStore getOrCreateCompletedSnapshotStore(TableBucket tabl
104135
"Created snapshot store for table bucket {} in {} ms.",
105136
bucket,
106137
end - start);
138+
139+
MetricGroup bucketMetricGroup =
140+
coordinatorMetricGroup.getTableBucketMetricGroup(
141+
tablePath, tableBucket);
142+
if (bucketMetricGroup != null) {
143+
LOG.info("Add bucketMetricGroup for tableBucket {}.", bucket);
144+
bucketMetricGroup.gauge(
145+
MetricNames.KV_NUM_SNAPSHOTS, () -> getNumSnapshots(bucket));
146+
bucketMetricGroup.gauge(
147+
MetricNames.KV_ALL_SNAPSHOT_SIZE,
148+
() -> getAllSnapshotSize(bucket));
149+
} else {
150+
LOG.warn(
151+
"Failed to add bucketMetricGroup for tableBucket {} when creating completed snapshot.",
152+
bucket);
153+
}
107154
return snapshotStore;
108155
} catch (Exception e) {
109156
throw new RuntimeException(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,8 @@ public CoordinatorEventProcessor(
188188
new CompletedSnapshotStoreManager(
189189
conf.getInt(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS),
190190
ioExecutor,
191-
zooKeeperClient);
191+
zooKeeperClient,
192+
coordinatorMetricGroup);
192193
this.autoPartitionManager = autoPartitionManager;
193194
this.lakeTableTieringManager = lakeTableTieringManager;
194195
this.coordinatorMetricGroup = coordinatorMetricGroup;
@@ -456,6 +457,20 @@ private void loadAssignment(
456457
coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr);
457458
}
458459
}
460+
461+
// register table/bucket metrics when initializing the context.
462+
TablePath tablePath = coordinatorContext.getTablePathById(tableId);
463+
if (tablePath != null) {
464+
coordinatorMetricGroup.addTableBucketMetricGroup(
465+
PhysicalTablePath.of(
466+
tablePath,
467+
partitionId == null
468+
? null
469+
: coordinatorContext.getPartitionName(partitionId)),
470+
tableId,
471+
partitionId,
472+
tableAssignment.getBucketAssignments().keySet());
473+
}
459474
}
460475

461476
private void onShutdown() {
@@ -529,10 +544,10 @@ private void processCreateTable(CreateTableEvent createTableEvent) {
529544
return;
530545
}
531546
TableInfo tableInfo = createTableEvent.getTableInfo();
547+
TablePath tablePath = tableInfo.getTablePath();
532548
coordinatorContext.putTableInfo(tableInfo);
533549
TableAssignment tableAssignment = createTableEvent.getTableAssignment();
534-
tableManager.onCreateNewTable(
535-
tableInfo.getTablePath(), tableInfo.getTableId(), tableAssignment);
550+
tableManager.onCreateNewTable(tablePath, tableInfo.getTableId(), tableAssignment);
536551
if (createTableEvent.isAutoPartitionTable()) {
537552
autoPartitionManager.addAutoPartitionTable(tableInfo, true);
538553
}
@@ -551,6 +566,14 @@ private void processCreateTable(CreateTableEvent createTableEvent) {
551566
null,
552567
null,
553568
tableBuckets);
569+
570+
// register table metrics.
571+
coordinatorMetricGroup.addTableBucketMetricGroup(
572+
PhysicalTablePath.of(tablePath),
573+
tableId,
574+
null,
575+
tableAssignment.getBucketAssignments().keySet());
576+
554577
} else {
555578
updateTabletServerMetadataCache(
556579
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
@@ -568,10 +591,11 @@ private void processCreatePartition(CreatePartitionEvent createPartitionEvent) {
568591
}
569592

570593
long tableId = createPartitionEvent.getTableId();
594+
TablePath tablePath = createPartitionEvent.getTablePath();
571595
String partitionName = createPartitionEvent.getPartitionName();
572596
PartitionAssignment partitionAssignment = createPartitionEvent.getPartitionAssignment();
573597
tableManager.onCreateNewPartition(
574-
createPartitionEvent.getTablePath(),
598+
tablePath,
575599
tableId,
576600
createPartitionEvent.getPartitionId(),
577601
partitionName,
@@ -585,6 +609,14 @@ private void processCreatePartition(CreatePartitionEvent createPartitionEvent) {
585609
.forEach(
586610
bucketId ->
587611
tableBuckets.add(new TableBucket(tableId, partitionId, bucketId)));
612+
613+
// register partition metrics.
614+
coordinatorMetricGroup.addTableBucketMetricGroup(
615+
PhysicalTablePath.of(tablePath, partitionName),
616+
tableId,
617+
partitionId,
618+
partitionAssignment.getBucketAssignments().keySet());
619+
588620
updateTabletServerMetadataCache(
589621
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
590622
null,
@@ -617,6 +649,9 @@ private void processDropTable(DropTableEvent dropTableEvent) {
617649
tableId,
618650
null,
619651
Collections.emptySet());
652+
653+
// remove table metrics.
654+
coordinatorMetricGroup.removeTableMetricGroup(dropTableInfo.getTablePath(), tableId);
620655
}
621656

622657
private void processDropPartition(DropPartitionEvent dropPartitionEvent) {
@@ -644,6 +679,10 @@ private void processDropPartition(DropPartitionEvent dropPartitionEvent) {
644679
tableId,
645680
tablePartition.getPartitionId(),
646681
Collections.emptySet());
682+
683+
// remove partition metrics.
684+
coordinatorMetricGroup.removeTablePartitionMetricsGroup(
685+
dropTableInfo.getTablePath(), tableId, tablePartition.getPartitionId());
647686
}
648687

649688
private void processDeleteReplicaResponseReceived(
@@ -990,15 +1029,17 @@ private void tryProcessCommitKvSnapshot(
9901029
return;
9911030
}
9921031
// commit the kv snapshot asynchronously
1032+
TableBucket tb = event.getTableBucket();
1033+
TablePath tablePath = coordinatorContext.getTablePathById(tb.getTableId());
9931034
ioExecutor.execute(
9941035
() -> {
9951036
try {
996-
TableBucket tb = event.getTableBucket();
9971037
CompletedSnapshot completedSnapshot =
9981038
event.getAddCompletedSnapshotData().getCompletedSnapshot();
9991039
// add completed snapshot
10001040
CompletedSnapshotStore completedSnapshotStore =
1001-
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(tb);
1041+
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(
1042+
tablePath, tb);
10021043
// this involves IO operation (ZK), so we do it in ioExecutor
10031044
completedSnapshotStore.add(completedSnapshot);
10041045
coordinatorEventManager.put(

fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,14 @@ public void add(final CompletedSnapshot completedSnapshot) throws Exception {
8787
addSnapshotAndSubsumeOldestOne(completedSnapshot, snapshotsCleaner, () -> {});
8888
}
8989

90+
public long getPhysicalStorageRemoteKvSize() {
91+
return sharedKvFileRegistry.getFileSize();
92+
}
93+
94+
public long getNumSnapshots() {
95+
return completedSnapshots.size();
96+
}
97+
9098
/**
9199
* Synchronously writes the new snapshots to snapshot handle store and asynchronously removes
92100
* older ones.

fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import org.apache.fluss.fs.FileSystemSafetyNet;
2222
import org.apache.fluss.fs.FsPath;
2323
import org.apache.fluss.metadata.TableBucket;
24-
import org.apache.fluss.metrics.MetricNames;
25-
import org.apache.fluss.metrics.groups.MetricGroup;
2624
import org.apache.fluss.server.metrics.group.BucketMetricGroup;
2725
import org.apache.fluss.utils.MathUtils;
2826
import org.apache.fluss.utils.concurrent.Executors;
@@ -124,8 +122,6 @@ protected PeriodicSnapshotManager(
124122
periodicSnapshotDelay > 0
125123
? MathUtils.murmurHash(tableBucket.hashCode()) % periodicSnapshotDelay
126124
: 0;
127-
128-
registerMetrics(bucketMetricGroup);
129125
}
130126

131127
public static PeriodicSnapshotManager create(
@@ -156,9 +152,8 @@ public void start() {
156152
}
157153
}
158154

159-
private void registerMetrics(BucketMetricGroup bucketMetricGroup) {
160-
MetricGroup metricGroup = bucketMetricGroup.addGroup("kv").addGroup("snapshot");
161-
metricGroup.gauge(MetricNames.KV_LATEST_SNAPSHOT_SIZE, target::getSnapshotSize);
155+
public long getSnapshotSize() {
156+
return target.getSnapshotSize();
162157
}
163158

164159
// schedule thread and asyncOperationsThreadPool can access this method

fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SharedKvFileRegistry.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ public class SharedKvFileRegistry implements AutoCloseable {
5353
/** This flag indicates whether or not the registry is open or if close() was called. */
5454
private boolean open;
5555

56+
/** The total size of all kv files registered in this registry. */
57+
private volatile long fileSize;
58+
5659
/** Executor for async kv deletion. */
5760
private final Executor asyncDisposalExecutor;
5861

@@ -64,6 +67,11 @@ public SharedKvFileRegistry(Executor asyncDisposalExecutor) {
6467
this.registeredKvEntries = new HashMap<>();
6568
this.asyncDisposalExecutor = checkNotNull(asyncDisposalExecutor);
6669
this.open = true;
70+
this.fileSize = 0L;
71+
}
72+
73+
public long getFileSize() {
74+
return fileSize;
6775
}
6876

6977
public KvFileHandle registerReference(
@@ -87,6 +95,7 @@ public KvFileHandle registerReference(
8795
LOG.trace("Registered new kv file {} under key {}.", newHandle, registrationKey);
8896
entry = new SharedKvEntry(newHandle, snapshotID);
8997
registeredKvEntries.put(registrationKey, entry);
98+
fileSize += newHandle.getSize();
9099

91100
// no further handling
92101
return entry.kvFileHandle;
@@ -134,6 +143,7 @@ public void unregisterUnusedKvFile(long lowestSnapshotID) {
134143
if (entry.lastUsedSnapshotID < lowestSnapshotID) {
135144
subsumed.add(entry.kvFileHandle);
136145
it.remove();
146+
fileSize -= entry.kvFileHandle.getSize();
137147
}
138148
}
139149
}

0 commit comments

Comments
 (0)