Skip to content

Commit 8c29d67

Browse files
committed
[server] Lake tiering service reports the max timestamp of Fluss records
1 parent cbc083e commit 8c29d67

File tree

30 files changed

+478
-131
lines changed

30 files changed

+478
-131
lines changed

fluss-client/src/main/java/org/apache/fluss/client/metadata/LakeSnapshot.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
/**
2727
* A class representing the lake snapshot information of a table. It contains:
28-
* <li>The snapshot id and the log offset for each bucket.
28+
* <li>The snapshot id, the log offset and timestamp for each bucket.
2929
*
3030
* @since 0.3
3131
*/
@@ -37,16 +37,21 @@ public class LakeSnapshot {
3737
// the specific log offset of the snapshot
3838
private final Map<TableBucket, Long> tableBucketsOffset;
3939

40+
// the timestamp of each bucket in the snapshot
41+
private final Map<TableBucket, Long> tableBucketsTimestamp;
42+
4043
// partition id -> partition name for this lake snapshot if the table
4144
// is partitioned; empty if the table is not partitioned
4245
private final Map<Long, String> partitionNameById;
4346

4447
public LakeSnapshot(
4548
long snapshotId,
4649
Map<TableBucket, Long> tableBucketsOffset,
50+
Map<TableBucket, Long> tableBucketsTimestamp,
4751
Map<Long, String> partitionNameById) {
4852
this.snapshotId = snapshotId;
4953
this.tableBucketsOffset = tableBucketsOffset;
54+
this.tableBucketsTimestamp = tableBucketsTimestamp;
5055
this.partitionNameById = partitionNameById;
5156
}
5257

@@ -58,6 +63,10 @@ public Map<TableBucket, Long> getTableBucketsOffset() {
5863
return Collections.unmodifiableMap(tableBucketsOffset);
5964
}
6065

66+
public Map<TableBucket, Long> getTableBucketsTimestamp() {
67+
return Collections.unmodifiableMap(tableBucketsTimestamp);
68+
}
69+
6170
public Map<Long, String> getPartitionNameById() {
6271
return Collections.unmodifiableMap(partitionNameById);
6372
}
@@ -69,6 +78,8 @@ public String toString() {
6978
+ snapshotId
7079
+ ", tableBucketsOffset="
7180
+ tableBucketsOffset
81+
+ ", tableBucketsTimestamp="
82+
+ tableBucketsTimestamp
7283
+ ", partitionNameById="
7384
+ partitionNameById
7485
+ '}';

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,8 @@ public static LakeSnapshot toLakeTableSnapshotInfo(GetLatestLakeSnapshotResponse
202202
long snapshotId = response.getSnapshotId();
203203
Map<TableBucket, Long> tableBucketsOffset =
204204
new HashMap<>(response.getBucketSnapshotsCount());
205+
Map<TableBucket, Long> tableBucketsTimestamp =
206+
new HashMap<>(response.getBucketSnapshotsCount());
205207
Map<Long, String> partitionNameById = new HashMap<>();
206208
for (PbLakeSnapshotForBucket pbLakeSnapshotForBucket : response.getBucketSnapshotsList()) {
207209
Long partitionId =
@@ -214,8 +216,12 @@ public static LakeSnapshot toLakeTableSnapshotInfo(GetLatestLakeSnapshotResponse
214216
partitionNameById.put(partitionId, pbLakeSnapshotForBucket.getPartitionName());
215217
}
216218
tableBucketsOffset.put(tableBucket, pbLakeSnapshotForBucket.getLogOffset());
219+
if (pbLakeSnapshotForBucket.hasTimestamp()) {
220+
tableBucketsTimestamp.put(tableBucket, pbLakeSnapshotForBucket.getTimestamp());
221+
}
217222
}
218-
return new LakeSnapshot(snapshotId, tableBucketsOffset, partitionNameById);
223+
return new LakeSnapshot(
224+
snapshotId, tableBucketsOffset, tableBucketsTimestamp, partitionNameById);
219225
}
220226

221227
public static List<FsPathAndFileName> toFsPathAndFileName(

fluss-common/src/main/java/org/apache/fluss/lake/committer/BucketOffset.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,29 @@
2222
import java.io.Serializable;
2323
import java.util.Objects;
2424

25-
/** The bucket offset information to be expected to be stored in Lake's snapshot property. */
25+
/**
26+
* The bucket offset and timestamp information expected to be stored in the lake's snapshot
27+
* property.
28+
*/
2629
public class BucketOffset implements Serializable {
2730

2831
private static final long serialVersionUID = 1L;
2932
public static final String FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY = "fluss-offsets";
3033

3134
private final long logOffset;
35+
private final long timestamp;
3236
private final int bucket;
3337
private final @Nullable Long partitionId;
3438
private final @Nullable String partitionQualifiedName;
3539

3640
public BucketOffset(
3741
long logOffset,
42+
long timestamp,
3843
int bucket,
3944
@Nullable Long partitionId,
4045
@Nullable String partitionQualifiedName) {
4146
this.logOffset = logOffset;
47+
this.timestamp = timestamp;
4248
this.bucket = bucket;
4349
this.partitionId = partitionId;
4450
this.partitionQualifiedName = partitionQualifiedName;
@@ -48,6 +54,10 @@ public long getLogOffset() {
4854
return logOffset;
4955
}
5056

57+
public long getTimestamp() {
58+
return timestamp;
59+
}
60+
5161
public int getBucket() {
5262
return bucket;
5363
}
@@ -62,6 +72,11 @@ public String getPartitionQualifiedName() {
6272
return partitionQualifiedName;
6373
}
6474

75+
@Override
76+
public int hashCode() {
77+
return Objects.hash(logOffset, timestamp, bucket, partitionId, partitionQualifiedName);
78+
}
79+
6580
@Override
6681
public boolean equals(Object o) {
6782
if (this == o) {
@@ -73,6 +88,7 @@ public boolean equals(Object o) {
7388
BucketOffset that = (BucketOffset) o;
7489
return bucket == that.bucket
7590
&& logOffset == that.logOffset
91+
&& timestamp == that.timestamp
7692
&& Objects.equals(partitionId, that.partitionId)
7793
&& Objects.equals(partitionQualifiedName, that.partitionQualifiedName);
7894
}

fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,21 @@
2222
import java.util.HashMap;
2323
import java.util.Map;
2424
import java.util.Objects;
25+
import java.util.Set;
2526

2627
/**
27-
* The lake already committed snapshot, containing the lake snapshot id and the bucket end offsets
28-
* in this snapshot.
28+
* A snapshot already committed to the lake, containing the lake snapshot id and, per bucket,
29+
* the log end offset and max timestamp in this snapshot.
2930
*/
3031
public class CommittedLakeSnapshot {
3132

3233
private final long lakeSnapshotId;
3334
// <partition_id, bucket> -> log offset, partition_id will be null if it's not a
3435
// partition bucket
3536
private final Map<Tuple2<Long, Integer>, Long> logEndOffsets = new HashMap<>();
37+
// <partition_id, bucket> -> max timestamp, partition_id will be null if it's not a
38+
// partition bucket
39+
private final Map<Tuple2<Long, Integer>, Long> maxTimestamps = new HashMap<>();
3640

3741
// partition id -> partition name, will be empty if is not a partitioned table
3842
// the partition name is a qualified name in the format: key1=value1/key2=value2
@@ -46,18 +50,32 @@ public long getLakeSnapshotId() {
4650
return lakeSnapshotId;
4751
}
4852

49-
public void addBucket(int bucketId, long offset) {
53+
public void addBucket(int bucketId, long offset, long timestamp) {
5054
logEndOffsets.put(Tuple2.of(null, bucketId), offset);
55+
maxTimestamps.put(Tuple2.of(null, bucketId), timestamp);
5156
}
5257

5358
public void addPartitionBucket(
54-
Long partitionId, String partitionQualifiedName, int bucketId, long offset) {
59+
Long partitionId,
60+
String partitionQualifiedName,
61+
int bucketId,
62+
long offset,
63+
long timestamp) {
5564
logEndOffsets.put(Tuple2.of(partitionId, bucketId), offset);
65+
maxTimestamps.put(Tuple2.of(partitionId, bucketId), timestamp);
5666
qualifiedPartitionNameById.put(partitionId, partitionQualifiedName);
5767
}
5868

59-
public Map<Tuple2<Long, Integer>, Long> getLogEndOffsets() {
60-
return logEndOffsets;
69+
public Set<Tuple2<Long, Integer>> tablePartitionBuckets() {
70+
return logEndOffsets.keySet();
71+
}
72+
73+
public Long getLogEndOffset(Tuple2<Long, Integer> partitionBucket) {
74+
return logEndOffsets.get(partitionBucket);
75+
}
76+
77+
public Long getMaxTimestamp(Tuple2<Long, Integer> partitionBucket) {
78+
return maxTimestamps.get(partitionBucket);
6179
}
6280

6381
public Map<Long, String> getQualifiedPartitionNameById() {
@@ -72,12 +90,14 @@ public boolean equals(Object o) {
7290
CommittedLakeSnapshot that = (CommittedLakeSnapshot) o;
7391
return lakeSnapshotId == that.lakeSnapshotId
7492
&& Objects.equals(logEndOffsets, that.logEndOffsets)
93+
&& Objects.equals(maxTimestamps, that.maxTimestamps)
7594
&& Objects.equals(qualifiedPartitionNameById, that.qualifiedPartitionNameById);
7695
}
7796

7897
@Override
7998
public int hashCode() {
80-
return Objects.hash(lakeSnapshotId, logEndOffsets, qualifiedPartitionNameById);
99+
return Objects.hash(
100+
lakeSnapshotId, logEndOffsets, maxTimestamps, qualifiedPartitionNameById);
81101
}
82102

83103
@Override
@@ -87,6 +107,8 @@ public String toString() {
87107
+ lakeSnapshotId
88108
+ ", logEndOffsets="
89109
+ logEndOffsets
110+
+ ", maxTimestamps="
111+
+ maxTimestamps
90112
+ ", partitionNameById="
91113
+ qualifiedPartitionNameById
92114
+ '}';

fluss-common/src/main/java/org/apache/fluss/utils/json/BucketOffsetJsonSerde.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public class BucketOffsetJsonSerde
3232
private static final String BUCKET_ID = "bucket_id";
3333
private static final String PARTITION_NAME = "partition_name";
3434
private static final String LOG_OFFSET = "log_offset";
35+
private static final String TIMESTAMP = "timestamp";
3536

3637
@Override
3738
public BucketOffset deserialize(JsonNode node) {
@@ -46,7 +47,10 @@ public BucketOffset deserialize(JsonNode node) {
4647
// deserialize log offset
4748
long logOffset = node.get(LOG_OFFSET).asLong();
4849

49-
return new BucketOffset(logOffset, bucketId, partitionId, partitionName);
50+
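// deserialize timestamp
// NOTE(review): node.get(TIMESTAMP) is presumably null for JSON written before this field
// existed, which would throw here; confirm whether a fallback is needed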
51+
long timestamp = node.get(TIMESTAMP).asLong();
52+
53+
return new BucketOffset(logOffset, timestamp, bucketId, partitionId, partitionName);
5054
}
5155

5256
@Override
@@ -67,6 +71,9 @@ public void serialize(BucketOffset bucketOffset, JsonGenerator generator) throws
6771
// serialize bucket offset
6872
generator.writeNumberField(LOG_OFFSET, bucketOffset.getLogOffset());
6973

74+
// serialize bucket timestamp
75+
generator.writeNumberField(TIMESTAMP, bucketOffset.getTimestamp());
76+
7077
generator.writeEndObject();
7178
}
7279
}

fluss-common/src/test/java/org/apache/fluss/utils/json/BucketOffsetJsonSerdeTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,16 @@ public class BucketOffsetJsonSerdeTest extends JsonSerdeTestBase<BucketOffset> {
2929
@Override
3030
protected BucketOffset[] createObjects() {
3131
return new BucketOffset[] {
32-
new BucketOffset(10, 1, 1L, "country=eu-central/year=2023/month=12"),
33-
new BucketOffset(20, 2, null, null)
32+
new BucketOffset(10, 100, 1, 1L, "country=eu-central/year=2023/month=12"),
33+
new BucketOffset(20, 200, 2, null, null)
3434
};
3535
}
3636

3737
@Override
3838
protected String[] expectedJsons() {
3939
return new String[] {
40-
"{\"partition_id\":1,\"bucket_id\":1,\"partition_name\":\"country=eu-central/year=2023/month=12\",\"log_offset\":10}",
41-
"{\"bucket_id\":2,\"log_offset\":20}"
40+
"{\"partition_id\":1,\"bucket_id\":1,\"partition_name\":\"country=eu-central/year=2023/month=12\",\"log_offset\":10,\"timestamp\":100}",
41+
"{\"bucket_id\":2,\"log_offset\":20,\"timestamp\":200}"
4242
};
4343
}
4444
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.util.HashMap;
2424
import java.util.Map;
25+
import java.util.Set;
2526

2627
/** A lake snapshot for a Fluss table. */
2728
class FlussTableLakeSnapshot {
@@ -34,10 +35,15 @@ class FlussTableLakeSnapshot {
3435
// if the bucket is not of a partition, the partition_name is null
3536
private final Map<Tuple2<TableBucket, String>, Long> logEndOffsets;
3637

38+
// <table_bucket, partition_name> -> max timestamps,
39+
// if the bucket is not of a partition, the partition_name is null
40+
private final Map<Tuple2<TableBucket, String>, Long> maxTimestamps;
41+
3742
FlussTableLakeSnapshot(long tableId, long lakeSnapshotId) {
3843
this.tableId = tableId;
3944
this.lakeSnapshotId = lakeSnapshotId;
4045
this.logEndOffsets = new HashMap<>();
46+
this.maxTimestamps = new HashMap<>();
4147
}
4248

4349
public long tableId() {
@@ -48,16 +54,27 @@ public long lakeSnapshotId() {
4854
return lakeSnapshotId;
4955
}
5056

51-
public Map<Tuple2<TableBucket, String>, Long> logEndOffsets() {
52-
return logEndOffsets;
57+
public Set<Tuple2<TableBucket, String>> tablePartitionBuckets() {
58+
return logEndOffsets.keySet();
5359
}
5460

55-
public void addBucketOffset(TableBucket bucket, long offset) {
61+
public void addBucketOffsetAndTimestamp(TableBucket bucket, long offset, long timestamp) {
5662
logEndOffsets.put(Tuple2.of(bucket, null), offset);
63+
maxTimestamps.put(Tuple2.of(bucket, null), timestamp);
5764
}
5865

59-
public void addPartitionBucketOffset(TableBucket bucket, String partitionName, long offset) {
66+
public void addPartitionBucketOffsetAndTimestamp(
67+
TableBucket bucket, String partitionName, long offset, long timestamp) {
6068
logEndOffsets.put(Tuple2.of(bucket, partitionName), offset);
69+
maxTimestamps.put(Tuple2.of(bucket, partitionName), timestamp);
70+
}
71+
72+
public long getLogEndOffset(Tuple2<TableBucket, String> bucketPartition) {
73+
return logEndOffsets.get(bucketPartition);
74+
}
75+
76+
public long getMaxTimestamp(Tuple2<TableBucket, String> bucketPartition) {
77+
return maxTimestamps.get(bucketPartition);
6178
}
6279

6380
@Override
@@ -69,6 +86,8 @@ public String toString() {
6986
+ lakeSnapshotId
7087
+ ", logEndOffsets="
7188
+ logEndOffsets
89+
+ ", maxTimestamps="
90+
+ maxTimestamps
7291
+ '}';
7392
}
7493
}

0 commit comments

Comments
 (0)