
Commit ece2fe3

[lake] Lake tiering service reports the max timestamp of Fluss records (#1700)
1 parent: 283b3b8

19 files changed: +328 -61 lines


fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java

Lines changed: 23 additions & 4 deletions
@@ -22,6 +22,7 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 /** A lake snapshot for a Fluss table. */
 class FlussTableLakeSnapshot {
@@ -34,10 +35,15 @@ class FlussTableLakeSnapshot {
     // if the bucket is not of a partition, the partition_name is null
     private final Map<Tuple2<TableBucket, String>, Long> logEndOffsets;
 
+    // <table_bucket, partition_name> -> max timestamps,
+    // if the bucket is not of a partition, the partition_name is null
+    private final Map<Tuple2<TableBucket, String>, Long> maxTimestamps;
+
     FlussTableLakeSnapshot(long tableId, long lakeSnapshotId) {
         this.tableId = tableId;
         this.lakeSnapshotId = lakeSnapshotId;
         this.logEndOffsets = new HashMap<>();
+        this.maxTimestamps = new HashMap<>();
     }
 
     public long tableId() {
@@ -48,16 +54,27 @@ public long lakeSnapshotId() {
         return lakeSnapshotId;
     }
 
-    public Map<Tuple2<TableBucket, String>, Long> logEndOffsets() {
-        return logEndOffsets;
+    public Set<Tuple2<TableBucket, String>> tablePartitionBuckets() {
+        return logEndOffsets.keySet();
     }
 
-    public void addBucketOffset(TableBucket bucket, long offset) {
+    public void addBucketOffsetAndTimestamp(TableBucket bucket, long offset, long timestamp) {
         logEndOffsets.put(Tuple2.of(bucket, null), offset);
+        maxTimestamps.put(Tuple2.of(bucket, null), timestamp);
     }
 
-    public void addPartitionBucketOffset(TableBucket bucket, String partitionName, long offset) {
+    public void addPartitionBucketOffsetAndTimestamp(
+            TableBucket bucket, String partitionName, long offset, long timestamp) {
         logEndOffsets.put(Tuple2.of(bucket, partitionName), offset);
+        maxTimestamps.put(Tuple2.of(bucket, partitionName), timestamp);
+    }
+
+    public long getLogEndOffset(Tuple2<TableBucket, String> bucketPartition) {
+        return logEndOffsets.get(bucketPartition);
+    }
+
+    public long getMaxTimestamp(Tuple2<TableBucket, String> bucketPartition) {
+        return maxTimestamps.get(bucketPartition);
     }
 
     @Override
@@ -69,6 +86,8 @@ public String toString() {
                 + lakeSnapshotId
                 + ", logEndOffsets="
                 + logEndOffsets
+                + ", maxTimestamps="
+                + maxTimestamps
                 + '}';
     }
 }
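
The core of the change above is a second map kept in lockstep with logEndOffsets. Below is a minimal, self-contained sketch of the same pattern, with simplified hypothetical types (BucketKey stands in for Tuple2<TableBucket, String>; this is not the actual Fluss class):

import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for FlussTableLakeSnapshot: two parallel maps share one
// key set, so iterating the key set yields both values per bucket.
public class SnapshotSketch {
    // (bucketId, partitionName) key; partitionName is null for non-partitioned buckets
    record BucketKey(int bucketId, String partitionName) {}

    private final Map<BucketKey, Long> logEndOffsets = new HashMap<>();
    private final Map<BucketKey, Long> maxTimestamps = new HashMap<>();

    void addBucketOffsetAndTimestamp(int bucketId, String partitionName, long offset, long timestamp) {
        BucketKey key = new BucketKey(bucketId, partitionName);
        logEndOffsets.put(key, offset);
        maxTimestamps.put(key, timestamp);
    }

    public static void main(String[] args) {
        SnapshotSketch s = new SnapshotSketch();
        s.addBucketOffsetAndTimestamp(0, null, 100L, 1_700_000_000_000L);
        s.addBucketOffsetAndTimestamp(1, "dt=2024-01-01", 42L, 1_700_000_000_500L);
        // one key set drives lookups into both maps, mirroring tablePartitionBuckets()
        for (BucketKey key : s.logEndOffsets.keySet()) {
            System.out.println(key + " endOffset=" + s.logEndOffsets.get(key)
                    + " maxTimestamp=" + s.maxTimestamps.get(key));
        }
    }
}

Keeping one key set (logEndOffsets.keySet()) as the source of truth is what lets tablePartitionBuckets() drive lookups into both maps without the two ever disagreeing on membership.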

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java

Lines changed: 18 additions & 8 deletions
@@ -88,7 +88,12 @@ public void commit(long tableId, CommittedLakeSnapshot committedLakeSnapshot)
             Long partitionId = partitionBucket.f0;
             if (partitionId == null) {
                 tableBucket = new TableBucket(tableId, partitionBucket.f1);
-                flussTableLakeSnapshot.addBucketOffset(tableBucket, entry.getValue());
+                // we use -1 since we don't store timestamp in lake snapshot property for
+                // simplicity, it may cause the timestamp to be -1 during constructing lake
+                // snapshot to commit to Fluss.
+                // But it should happen rarely and should be a normal value after next tiering.
+                flussTableLakeSnapshot.addBucketOffsetAndTimestamp(
+                        tableBucket, entry.getValue(), -1);
             } else {
                 tableBucket = new TableBucket(tableId, partitionId, partitionBucket.f1);
                 // the partition name is qualified partition name in format:
@@ -98,8 +103,11 @@ public void commit(long tableId, CommittedLakeSnapshot committedLakeSnapshot)
                         committedLakeSnapshot.getQualifiedPartitionNameById().get(partitionId);
                 ResolvedPartitionSpec resolvedPartitionSpec =
                         ResolvedPartitionSpec.fromPartitionQualifiedName(qualifiedPartitionName);
-                flussTableLakeSnapshot.addPartitionBucketOffset(
-                        tableBucket, resolvedPartitionSpec.getPartitionName(), entry.getValue());
+                flussTableLakeSnapshot.addPartitionBucketOffsetAndTimestamp(
+                        tableBucket,
+                        resolvedPartitionSpec.getPartitionName(),
+                        entry.getValue(),
+                        -1);
             }
         }
         commit(flussTableLakeSnapshot);
@@ -114,13 +122,14 @@ private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
 
         pbLakeTableSnapshotInfo.setTableId(flussTableLakeSnapshot.tableId());
         pbLakeTableSnapshotInfo.setSnapshotId(flussTableLakeSnapshot.lakeSnapshotId());
-        for (Map.Entry<Tuple2<TableBucket, String>, Long> bucketEndOffsetEntry :
-                flussTableLakeSnapshot.logEndOffsets().entrySet()) {
+        for (Tuple2<TableBucket, String> bucketPartition :
+                flussTableLakeSnapshot.tablePartitionBuckets()) {
             PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
                     pbLakeTableSnapshotInfo.addBucketsReq();
-            TableBucket tableBucket = bucketEndOffsetEntry.getKey().f0;
-            String partitionName = bucketEndOffsetEntry.getKey().f1;
-            long endOffset = bucketEndOffsetEntry.getValue();
+            TableBucket tableBucket = bucketPartition.f0;
+            String partitionName = bucketPartition.f1;
+            long endOffset = flussTableLakeSnapshot.getLogEndOffset(bucketPartition);
+            long maxTimestamp = flussTableLakeSnapshot.getMaxTimestamp(bucketPartition);
             if (tableBucket.getPartitionId() != null) {
                 pbLakeTableOffsetForBucket.setPartitionId(tableBucket.getPartitionId());
             }
@@ -129,6 +138,7 @@ private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
             }
             pbLakeTableOffsetForBucket.setBucketId(tableBucket.getBucket());
            pbLakeTableOffsetForBucket.setLogEndOffset(endOffset);
+            pbLakeTableOffsetForBucket.setMaxTimestamp(maxTimestamp);
         }
         return commitLakeTableSnapshotRequest;
     }
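
Since lake snapshot properties do not store timestamps, the committer writes -1 when reconstructing a snapshot from them, so anything reading maxTimestamp back should treat negative values as "unknown". A hedged consumer-side sketch (hypothetical helper, not part of this commit):

import java.time.Instant;

// Hypothetical guard, not Fluss code: -1 marks an unknown max timestamp, as when
// a lake snapshot is rebuilt from snapshot properties that never stored timestamps,
// so readers must not interpret it as a real epoch-millis value.
public class MaxTimestampGuardSketch {
    static String describe(long maxTimestamp) {
        return maxTimestamp < 0
                ? "unknown (snapshot rebuilt without timestamps)"
                : Instant.ofEpochMilli(maxTimestamp).toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(-1L));
        System.out.println(describe(1_700_000_000_000L));
    }
}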

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java

Lines changed: 7 additions & 3 deletions
@@ -229,10 +229,14 @@ private Committable commitWriteResults(
         for (TableBucketWriteResult<WriteResult> writeResult : committableWriteResults) {
             TableBucket tableBucket = writeResult.tableBucket();
             if (writeResult.tableBucket().getPartitionId() == null) {
-                flussTableLakeSnapshot.addBucketOffset(tableBucket, writeResult.logEndOffset());
+                flussTableLakeSnapshot.addBucketOffsetAndTimestamp(
+                        tableBucket, writeResult.logEndOffset(), writeResult.maxTimestamp());
             } else {
-                flussTableLakeSnapshot.addPartitionBucketOffset(
-                        tableBucket, writeResult.partitionName(), writeResult.logEndOffset());
+                flussTableLakeSnapshot.addPartitionBucketOffsetAndTimestamp(
+                        tableBucket,
+                        writeResult.partitionName(),
+                        writeResult.logEndOffset(),
+                        writeResult.maxTimestamp());
             }
         }
         flussTableLakeSnapshotCommitter.commit(flussTableLakeSnapshot);
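
The operator folds each incoming TableBucketWriteResult into one snapshot per round of tiering; per the comment in TableBucketWriteResult below, numberOfWriteResults tells the committer when a round is complete. A rough, hypothetical sketch of that trigger (simplified types, not the Flink operator):

import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the commit trigger: results for one round are
// buffered until the count announced by each result (numberOfWriteResults) is
// reached, then everything is committed at once.
public class CommitRoundSketch {
    record Result(int bucketId, long logEndOffset, long maxTimestamp, int numberOfWriteResults) {}

    public static void main(String[] args) {
        List<Result> buffered = new ArrayList<>();
        Result[] incoming = {
                new Result(0, 12, 1_700_000_000_000L, 2),
                new Result(1, 7, 1_700_000_000_300L, 2)};
        for (Result r : incoming) {
            buffered.add(r);
            if (buffered.size() == r.numberOfWriteResults()) {
                // both the offset and the timestamp travel together into the commit
                System.out.println("committing round: " + buffered);
                buffered.clear();
            }
        }
    }
}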

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java

Lines changed: 11 additions & 1 deletion
@@ -42,12 +42,16 @@ public class TableBucketWriteResult<WriteResult> implements Serializable {
     // null when the bucket is not for a partition
     @Nullable private final String partitionName;
 
-    // will be null when no any data write, such as for tiering a empty log split
+    // will be null when no any data write, such as for tiering an empty log split
     @Nullable private final WriteResult writeResult;
 
     // the end offset of tiering, should be the last tiered record's offset + 1
     private final long logEndOffset;
 
+    // the max timestamp of tiering, should be the last tiered record's timestamp,
+    // will be -1 for empty log splits or snapshot splits
+    private final long maxTimestamp;
+
     // the total number of write results in one round of tiering,
     // used for downstream commiter operator to determine when all write results
     // for the round of tiering is finished
@@ -59,12 +63,14 @@ public TableBucketWriteResult(
             @Nullable String partitionName,
             @Nullable WriteResult writeResult,
             long logEndOffset,
+            long maxTimestamp,
             int numberOfWriteResults) {
         this.tablePath = tablePath;
         this.tableBucket = tableBucket;
         this.partitionName = partitionName;
         this.writeResult = writeResult;
         this.logEndOffset = logEndOffset;
+        this.maxTimestamp = maxTimestamp;
         this.numberOfWriteResults = numberOfWriteResults;
     }
 
@@ -93,4 +99,8 @@ public int numberOfWriteResults() {
     public long logEndOffset() {
         return logEndOffset;
     }
+
+    public long maxTimestamp() {
+        return maxTimestamp;
+    }
 }
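
Note that the two summary fields follow different conventions: logEndOffset is exclusive (last tiered record's offset + 1), while maxTimestamp is the last record's timestamp itself, with -1 as the empty/snapshot sentinel. A small sketch of the arithmetic (hypothetical LogRecord type):

import java.util.List;

// Illustration of the two summary fields: for a non-empty batch, logEndOffset is
// the last record's offset + 1 and maxTimestamp is the last record's timestamp;
// an empty batch would keep its stopping offset and report -1.
public class WriteResultSummarySketch {
    record LogRecord(long offset, long timestampMs) {}

    public static void main(String[] args) {
        List<LogRecord> batch = List.of(
                new LogRecord(10, 1_700_000_000_000L),
                new LogRecord(11, 1_700_000_000_250L));
        LogRecord last = batch.get(batch.size() - 1);
        long logEndOffset = last.offset() + 1;   // 12, exclusive end offset
        long maxTimestamp = last.timestampMs();  // 1_700_000_000_250, inclusive
        System.out.println("logEndOffset=" + logEndOffset + ", maxTimestamp=" + maxTimestamp);
    }
}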

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java

Lines changed: 6 additions & 0 deletions
@@ -85,6 +85,9 @@ public byte[] serialize(TableBucketWriteResult<WriteResult> tableBucketWriteResu
         // serialize log end offset
         out.writeLong(tableBucketWriteResult.logEndOffset());
 
+        // serialize max timestamp
+        out.writeLong(tableBucketWriteResult.maxTimestamp());
+
         // serialize number of write results
         out.writeInt(tableBucketWriteResult.numberOfWriteResults());
 
@@ -129,6 +132,8 @@ public TableBucketWriteResult<WriteResult> deserialize(int version, byte[] seria
 
         // deserialize log end offset
         long logEndOffset = in.readLong();
+        // deserialize max timestamp
+        long maxTimestamp = in.readLong();
         // deserialize number of write results
         int numberOfWriteResults = in.readInt();
         return new TableBucketWriteResult<>(
@@ -137,6 +142,7 @@ public TableBucketWriteResult<WriteResult> deserialize(int version, byte[] seria
                 partitionName,
                 writeResult,
                 logEndOffset,
+                maxTimestamp,
                 numberOfWriteResults);
     }
 }
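
The serializer appends the new long between logEndOffset and numberOfWriteResults, and the deserializer must consume fields in exactly the written order. A minimal round-trip sketch of just these three adjacent fields (the real format carries more fields and a version; this only shows the ordering constraint):

import java.io.*;

// Round-trip of the extended field order: the new maxTimestamp long sits between
// logEndOffset and numberOfWriteResults on the wire.
public class SerializerOrderSketch {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(42L);                  // log end offset
            out.writeLong(1_700_000_000_000L);   // max timestamp (new field)
            out.writeInt(4);                     // number of write results
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            long logEndOffset = in.readLong();
            long maxTimestamp = in.readLong();
            int numberOfWriteResults = in.readInt();
            System.out.printf("offset=%d, ts=%d, n=%d%n", logEndOffset, maxTimestamp, numberOfWriteResults);
        }
    }
}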

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 25 additions & 4 deletions
@@ -68,6 +68,9 @@ public class TieringSplitReader<WriteResult>
 
     private static final Duration POLL_TIMEOUT = Duration.ofMillis(10000L);
 
+    // unknown bucket timestamp for empty split or snapshot split
+    private static final long UNKNOWN_BUCKET_TIMESTAMP = -1;
+
     private final LakeTieringFactory<WriteResult, ?> lakeTieringFactory;
 
     // the id for the pending tables to be tiered
@@ -285,7 +288,10 @@ private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> forLogRecords(
                 writeResults.put(
                         bucket,
                         completeLakeWriter(
-                                bucket, currentTieringSplit.getPartitionName(), stoppingOffset));
+                                bucket,
+                                currentTieringSplit.getPartitionName(),
+                                stoppingOffset,
+                                lastRecord.timestamp()));
                 // put split of the bucket
                 finishedSplitIds.put(bucket, currentSplitId);
                 LOG.info("Split {} has been finished.", currentSplitId);
@@ -316,7 +322,10 @@ private LakeWriter<WriteResult> getOrCreateLakeWriter(
     }
 
     private TableBucketWriteResult<WriteResult> completeLakeWriter(
-            TableBucket bucket, @Nullable String partitionName, long logEndOffset)
+            TableBucket bucket,
+            @Nullable String partitionName,
+            long logEndOffset,
+            long maxTimestamp)
             throws IOException {
         LakeWriter<WriteResult> lakeWriter = lakeWriters.remove(bucket);
         WriteResult writeResult = lakeWriter.complete();
@@ -327,6 +336,7 @@ private TableBucketWriteResult<WriteResult> completeLakeWriter(
                 partitionName,
                 writeResult,
                 logEndOffset,
+                maxTimestamp,
                 checkNotNull(currentTableNumberOfSplits));
     }
 
@@ -344,6 +354,7 @@ private TableBucketWriteResultWithSplitIds forEmptySplits(Set<TieringLogSplit> e
                         logSplit.getPartitionName(),
                         null,
                         logSplit.getStoppingOffset(),
+                        UNKNOWN_BUCKET_TIMESTAMP,
                         logSplit.getNumberOfSplits()));
         }
         return new TableBucketWriteResultWithSplitIds(writeResults, finishedSplitIds);
@@ -363,7 +374,10 @@ private TableBucketWriteResultWithSplitIds finishCurrentSnapshotSplit() throws I
         String splitId = currentTableSplitsByBucket.remove(tableBucket).splitId();
         TableBucketWriteResult<WriteResult> writeResult =
                 completeLakeWriter(
-                        tableBucket, currentSnapshotSplit.getPartitionName(), logEndOffset);
+                        tableBucket,
+                        currentSnapshotSplit.getPartitionName(),
+                        logEndOffset,
+                        UNKNOWN_BUCKET_TIMESTAMP);
         closeCurrentSnapshotSplit();
         mayFinishCurrentTable();
         return new TableBucketWriteResultWithSplitIds(
@@ -483,9 +497,16 @@ private TableBucketWriteResult<WriteResult> toTableBucketWriteResult(
             @Nullable String partitionName,
             @Nullable WriteResult writeResult,
             long endLogOffset,
+            long maxTimestamp,
             int numberOfSplits) {
         return new TableBucketWriteResult<>(
-                tablePath, tableBucket, partitionName, writeResult, endLogOffset, numberOfSplits);
+                tablePath,
+                tableBucket,
+                partitionName,
+                writeResult,
+                endLogOffset,
+                maxTimestamp,
+                numberOfSplits);
     }
 
     private class TableBucketWriteResultWithSplitIds
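
The reader thus reports a real timestamp only for non-empty log splits (via lastRecord.timestamp()); empty log splits and snapshot splits fall back to the UNKNOWN_BUCKET_TIMESTAMP sentinel. Condensed as a hypothetical helper (not the actual reader code):

// Condensation of the reader's choice: a tiered log split reports the last
// record's timestamp, while empty log splits and snapshot splits report the
// UNKNOWN_BUCKET_TIMESTAMP sentinel (-1).
public class TimestampChoiceSketch {
    private static final long UNKNOWN_BUCKET_TIMESTAMP = -1;

    enum SplitKind { LOG, EMPTY_LOG, SNAPSHOT }

    static long maxTimestampFor(SplitKind kind, long lastRecordTimestamp) {
        return kind == SplitKind.LOG ? lastRecordTimestamp : UNKNOWN_BUCKET_TIMESTAMP;
    }

    public static void main(String[] args) {
        System.out.println(maxTimestampFor(SplitKind.LOG, 1_700_000_000_000L));      // real timestamp
        System.out.println(maxTimestampFor(SplitKind.SNAPSHOT, 1_700_000_000_000L)); // -1
    }
}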
