Skip to content

Commit 7ffcbf0

Browse files
authored
[lake] Introduce lake lag metrics for datalake enabled table (#1729)
1 parent ee33fa2 commit 7ffcbf0

File tree

3 files changed

+38
-0
lines changed

3 files changed

+38
-0
lines changed

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ public class MetricNames {
133133
public static final String LOG_NUM_SEGMENTS = "numSegments";
134134
public static final String LOG_END_OFFSET = "endOffset";
135135
public static final String REMOTE_LOG_SIZE = "size";
136+
public static final String LOG_LAKE_PENDING_RECORDS = "pendingRecords";
137+
public static final String LOG_LAKE_TIMESTAMP_LAG = "timestampLag";
136138

137139
// for logic storage
138140
public static final String LOCAL_STORAGE_LOG_SIZE = "logSize";

fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,8 @@ public final class Replica {
201201
private Counter isrExpands;
202202
private Counter failedIsrUpdates;
203203

204+
private MetricGroup lakeTieringMetricGroup;
205+
204206
public Replica(
205207
PhysicalTablePath physicalPath,
206208
TableBucket tableBucket,
@@ -536,6 +538,10 @@ public LogOffsetSnapshot fetchOffsetSnapshot(boolean fetchOnlyFromLeader) throws
536538
private void onBecomeNewLeader() {
537539
updateLeaderEndOffsetSnapshot();
538540

541+
if (isDataLakeEnabled()) {
542+
registerLakeTieringMetrics();
543+
}
544+
539545
if (isKvTable()) {
540546
// if it's become new leader, we must
541547
// first destroy the old kv tablet
@@ -546,11 +552,30 @@ private void onBecomeNewLeader() {
546552
}
547553
}
548554

555+
private void registerLakeTieringMetrics() {
556+
lakeTieringMetricGroup = bucketMetricGroup.addGroup("lakeTiering");
557+
lakeTieringMetricGroup.gauge(
558+
MetricNames.LOG_LAKE_PENDING_RECORDS,
559+
() ->
560+
getLakeLogEndOffset() < 0L
561+
? -1
562+
: getLogHighWatermark() - getLakeLogEndOffset());
563+
lakeTieringMetricGroup.gauge(
564+
MetricNames.LOG_LAKE_TIMESTAMP_LAG,
565+
() ->
566+
logTablet.getLakeMaxTimestamp() < 0L
567+
? -1
568+
: logTablet.localMaxTimestamp() - logTablet.getLakeMaxTimestamp());
569+
}
570+
549571
private void onBecomeNewFollower() {
550572
if (isKvTable()) {
551573
// it should be from leader to follower, we need to destroy the kv tablet
552574
dropKv();
553575
}
576+
if (lakeTieringMetricGroup != null) {
577+
lakeTieringMetricGroup.close();
578+
}
554579
}
555580

556581
@VisibleForTesting

website/docs/maintenance/observability/monitor-metrics.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -762,6 +762,17 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM
762762
<td>endOffset</td>
763763
<td>The end offset in local storage for this table bucket.</td>
764764
<td>Gauge</td>
765+
</tr>
766+
<tr>
767+
<td rowspan="2">table_bucket_lakeTiering</td>
768+
<td>pendingRecords</td>
769+
<td>The number of records lag between local log and remote log for this table bucket.</td>
770+
<td>Gauge</td>
771+
</tr>
772+
<tr>
773+
<td>timestampLag</td>
774+
<td>The timestamp lag between local log and remote log for this table bucket in milliseconds.</td>
775+
<td>Gauge</td>
765776
</tr>
766777
<tr>
767778
<td rowspan="3">table_bucket_remoteLog</td>

0 commit comments

Comments
 (0)