Skip to content

Commit 27112c8

Browse files
committed
Fix comments and notify the timestamp to the tablet server
1 parent b74cbc1 commit 27112c8

File tree

8 files changed

+59
-10
lines changed

8 files changed

+59
-10
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ public void commit(long tableId, CommittedLakeSnapshot committedLakeSnapshot)
8888
Long partitionId = partitionBucket.f0;
8989
if (partitionId == null) {
9090
tableBucket = new TableBucket(tableId, partitionBucket.f1);
91+
// we use -1 since we don't store timestamp in lake snapshot property for
92+
// simplicity, it may cause the timestamp to be -1 during constructing lake
93+
// snapshot to commit to Fluss.
94+
// But it should happen rarely and should be a normal value after next tiering.
9195
flussTableLakeSnapshot.addBucketOffsetAndTimestamp(
9296
tableBucket, entry.getValue(), -1);
9397
} else {

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,7 @@ message PbNotifyLakeTableOffsetReqForBucket {
474474
required int64 snapshot_id = 4;
475475
optional int64 log_start_offset = 5;
476476
optional int64 log_end_offset = 6;
477+
optional int64 max_timestamp = 7;
477478
}
478479

479480
message NotifyLakeTableOffsetResponse {

fluss-server/src/main/java/org/apache/fluss/server/entity/LakeBucketOffset.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@ public class LakeBucketOffset {
2525

2626
private final Long logStartOffset;
2727
private final Long logEndOffset;
28+
private final Long maxTimestamp;
2829

29-
public LakeBucketOffset(long snapshotId, Long logStartOffset, Long logEndOffset) {
30+
public LakeBucketOffset(
31+
long snapshotId, Long logStartOffset, Long logEndOffset, Long maxTimestamp) {
3032
this.snapshotId = snapshotId;
3133
this.logStartOffset = logStartOffset;
3234
this.logEndOffset = logEndOffset;
35+
this.maxTimestamp = maxTimestamp;
3336
}
3437

3538
public long getSnapshotId() {
@@ -43,4 +46,8 @@ public Optional<Long> getLogStartOffset() {
4346
public Optional<Long> getLogEndOffset() {
4447
return Optional.ofNullable(logEndOffset);
4548
}
49+
50+
public Optional<Long> getMaxTimestamp() {
51+
return Optional.ofNullable(maxTimestamp);
52+
}
4653
}

fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ public final class LogTablet {
123123
// note: currently, for primary key table, the log start offset nerve be updated
124124
private volatile long lakeLogStartOffset = Long.MAX_VALUE;
125125
private volatile long lakeLogEndOffset = -1L;
126+
private volatile long lakeMaxTimestamp = -1;
126127

127128
private LogTablet(
128129
PhysicalTablePath physicalPath,
@@ -248,6 +249,10 @@ public long getLakeLogEndOffset() {
248249
return lakeLogEndOffset;
249250
}
250251

252+
public long getLakeMaxTimestamp() {
253+
return lakeMaxTimestamp;
254+
}
255+
251256
public int getWriterIdCount() {
252257
return writerStateManager.writerIdCount();
253258
}
@@ -507,6 +512,12 @@ public void updateLakeLogEndOffset(long lakeLogEndOffset) {
507512
}
508513
}
509514

515+
public void updateLakeMaxTimestamp(long lakeMaxTimestamp) {
516+
if (lakeMaxTimestamp > this.lakeMaxTimestamp) {
517+
this.lakeMaxTimestamp = lakeMaxTimestamp;
518+
}
519+
}
520+
510521
public void loadWriterSnapshot(long lastOffset) throws IOException {
511522
synchronized (lock) {
512523
rebuildWriterState(lastOffset, writerStateManager);

fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -720,6 +720,10 @@ public void notifyLakeTableOffset(
720720
.getLogEndOffset()
721721
.ifPresent(logTablet::updateLakeLogEndOffset);
722722

723+
lakeBucketOffset
724+
.getMaxTimestamp()
725+
.ifPresent(logTablet::updateLakeMaxTimestamp);
726+
723727
responseCallback.accept(new NotifyLakeTableOffsetResponse());
724728
}
725729
});
@@ -779,6 +783,10 @@ private void updateWithLakeTableSnapshot(Replica replica) throws Exception {
779783
lakeTableSnapshot
780784
.getLogEndOffset(tb)
781785
.ifPresent(replica.getLogTablet()::updateLakeLogEndOffset);
786+
787+
lakeTableSnapshot
788+
.getMaxTimestamp(tb)
789+
.ifPresent(replica.getLogTablet()::updateLakeMaxTimestamp);
782790
}
783791
}
784792

fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1490,6 +1490,8 @@ public static PbNotifyLakeTableOffsetReqForBucket makeNotifyLakeTableOffsetForBu
14901490

14911491
lakeTableSnapshot.getLogEndOffset(tableBucket).ifPresent(reqForBucket::setLogEndOffset);
14921492

1493+
lakeTableSnapshot.getMaxTimestamp(tableBucket).ifPresent(reqForBucket::setMaxTimestamp);
1494+
14931495
return reqForBucket;
14941496
}
14951497

@@ -1509,8 +1511,11 @@ public static NotifyLakeTableOffsetData getNotifyLakeTableOffset(
15091511
reqForBucket.hasLogStartOffset() ? reqForBucket.getLogStartOffset() : null;
15101512
Long logEndOffset =
15111513
reqForBucket.hasLogEndOffset() ? reqForBucket.getLogEndOffset() : null;
1514+
Long maxTimestamp =
1515+
reqForBucket.hasMaxTimestamp() ? reqForBucket.getMaxTimestamp() : null;
15121516
lakeBucketOffsetMap.put(
1513-
tableBucket, new LakeBucketOffset(snapshotId, logStartOffset, logEndOffset));
1517+
tableBucket,
1518+
new LakeBucketOffset(snapshotId, logStartOffset, logEndOffset, maxTimestamp));
15141519
}
15151520

15161521
return new NotifyLakeTableOffsetData(

fluss-server/src/test/java/org/apache/fluss/server/replica/CommitLakeTableSnapshotITCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ void testCommitDataLakeData() throws Exception {
131131
assertThat(logTablet.getLakeLogStartOffset())
132132
.isEqualTo(dataLakeLogStartOffset);
133133
assertThat(logTablet.getLakeLogEndOffset()).isEqualTo(dataLakeLogEndOffset);
134+
assertThat(logTablet.getLakeMaxTimestamp()).isEqualTo(dataLakeMaxTimestamp);
134135
});
135136
}
136137

fluss-server/src/test/java/org/apache/fluss/server/replica/NotifyReplicaLakeTableOffsetTest.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,30 +43,37 @@ void testNotifyWithOutRemoteLog(boolean partitionedTable) throws Exception {
4343
Replica replica = replicaManager.getReplicaOrException(tb);
4444

4545
// now, notify lake table offset
46-
notifyAndVerify(tb, replica, 1, 0L, 20L);
46+
notifyAndVerify(tb, replica, 1, 0L, 20L, System.currentTimeMillis());
4747
// notify again
48-
notifyAndVerify(tb, replica, 2, 20L, 30L);
48+
notifyAndVerify(tb, replica, 2, 20L, 30L, System.currentTimeMillis());
4949
}
5050

5151
private void notifyAndVerify(
52-
TableBucket tb, Replica replica, long snapshotId, long startOffset, long endOffset)
52+
TableBucket tb,
53+
Replica replica,
54+
long snapshotId,
55+
long startOffset,
56+
long endOffset,
57+
long maxTimestamp)
5358
throws Exception {
5459
NotifyLakeTableOffsetData notifyLakeTableOffsetData =
55-
getNotifyLakeTableOffset(tb, snapshotId, startOffset, endOffset);
60+
getNotifyLakeTableOffset(tb, snapshotId, startOffset, endOffset, maxTimestamp);
5661
CompletableFuture<NotifyLakeTableOffsetResponse> future = new CompletableFuture<>();
5762
replicaManager.notifyLakeTableOffset(notifyLakeTableOffsetData, future::complete);
5863
future.get();
59-
verifyLakeTableOffset(replica, snapshotId, startOffset, endOffset);
64+
verifyLakeTableOffset(replica, snapshotId, startOffset, endOffset, maxTimestamp);
6065
}
6166

6267
private void verifyLakeTableOffset(
63-
Replica replica, long snapshotId, long startOffset, long endOffset) {
68+
Replica replica, long snapshotId, long startOffset, long endOffset, long maxTimestamp) {
6469
AssertionsForClassTypes.assertThat(replica.getLogTablet().getLakeTableSnapshotId())
6570
.isEqualTo(snapshotId);
6671
AssertionsForClassTypes.assertThat(replica.getLogTablet().getLakeLogStartOffset())
6772
.isEqualTo(startOffset);
6873
AssertionsForClassTypes.assertThat(replica.getLogTablet().getLakeLogEndOffset())
6974
.isEqualTo(endOffset);
75+
AssertionsForClassTypes.assertThat(replica.getLogTablet().getLakeMaxTimestamp())
76+
.isEqualTo(maxTimestamp);
7077
}
7178

7279
private TableBucket makeTableBucket(boolean partitionTable) {
@@ -82,10 +89,15 @@ private TableBucket makeTableBucket(long tableId, boolean partitionTable) {
8289
}
8390

8491
private NotifyLakeTableOffsetData getNotifyLakeTableOffset(
85-
TableBucket tableBucket, long snapshotId, long startOffset, long endOffset) {
92+
TableBucket tableBucket,
93+
long snapshotId,
94+
long startOffset,
95+
long endOffset,
96+
long maxTimestamp) {
8697
return new NotifyLakeTableOffsetData(
8798
1,
8899
Collections.singletonMap(
89-
tableBucket, new LakeBucketOffset(snapshotId, startOffset, endOffset)));
100+
tableBucket,
101+
new LakeBucketOffset(snapshotId, startOffset, endOffset, maxTimestamp)));
90102
}
91103
}

0 commit comments

Comments
 (0)