
Commit 5db8f4b

remove unnecessary information in zk lake table node

1 parent c681127

16 files changed: 53 additions, 288 deletions

fluss-client/src/main/java/org/apache/fluss/client/metadata/LakeSnapshot.java

Lines changed: 1 addition & 15 deletions
@@ -37,17 +37,9 @@ public class LakeSnapshot {
     // the specific log offset of the snapshot
     private final Map<TableBucket, Long> tableBucketsOffset;
 
-    // the partition name by partition id of this lake snapshot if
-    // is a partitioned table, empty if not a partitioned table
-    private final Map<Long, String> partitionNameById;
-
-    public LakeSnapshot(
-            long snapshotId,
-            Map<TableBucket, Long> tableBucketsOffset,
-            Map<Long, String> partitionNameById) {
+    public LakeSnapshot(long snapshotId, Map<TableBucket, Long> tableBucketsOffset) {
         this.snapshotId = snapshotId;
         this.tableBucketsOffset = tableBucketsOffset;
-        this.partitionNameById = partitionNameById;
     }
 
     public long getSnapshotId() {
@@ -58,19 +50,13 @@ public Map<TableBucket, Long> getTableBucketsOffset() {
         return Collections.unmodifiableMap(tableBucketsOffset);
     }
 
-    public Map<Long, String> getPartitionNameById() {
-        return Collections.unmodifiableMap(partitionNameById);
-    }
-
     @Override
     public String toString() {
         return "LakeSnapshot{"
                 + "snapshotId="
                 + snapshotId
                 + ", tableBucketsOffset="
                 + tableBucketsOffset
-                + ", partitionNameById="
-                + partitionNameById
                 + '}';
     }
 }
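
After this change, a LakeSnapshot carries only the snapshot id and the per-bucket log offsets. A minimal usage sketch, not part of the commit: tableId, partitionId, and the literal offsets are placeholders, and the TableBucket constructors are the ones used elsewhere in this diff.

import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.metadata.TableBucket;

import java.util.HashMap;
import java.util.Map;

class LakeSnapshotSketch {
    static LakeSnapshot build(long snapshotId, long tableId, long partitionId) {
        Map<TableBucket, Long> offsets = new HashMap<>();
        // bucket 0 of a non-partitioned table
        offsets.put(new TableBucket(tableId, 0), 120L);
        // bucket 1 of a partition, identified by partition id only
        offsets.put(new TableBucket(tableId, partitionId, 1), 240L);
        // no partitionNameById argument anymore
        return new LakeSnapshot(snapshotId, offsets);
    }
}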

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 1 addition & 5 deletions
@@ -215,20 +215,16 @@ public static LakeSnapshot toLakeTableSnapshotInfo(GetLatestLakeSnapshotResponse
         long snapshotId = response.getSnapshotId();
         Map<TableBucket, Long> tableBucketsOffset =
                 new HashMap<>(response.getBucketSnapshotsCount());
-        Map<Long, String> partitionNameById = new HashMap<>();
         for (PbLakeSnapshotForBucket pbLakeSnapshotForBucket : response.getBucketSnapshotsList()) {
             Long partitionId =
                     pbLakeSnapshotForBucket.hasPartitionId()
                             ? pbLakeSnapshotForBucket.getPartitionId()
                             : null;
             TableBucket tableBucket =
                     new TableBucket(tableId, partitionId, pbLakeSnapshotForBucket.getBucketId());
-            if (partitionId != null && pbLakeSnapshotForBucket.hasPartitionName()) {
-                partitionNameById.put(partitionId, pbLakeSnapshotForBucket.getPartitionName());
-            }
             tableBucketsOffset.put(tableBucket, pbLakeSnapshotForBucket.getLogOffset());
         }
-        return new LakeSnapshot(snapshotId, tableBucketsOffset, partitionNameById);
+        return new LakeSnapshot(snapshotId, tableBucketsOffset);
     }
 
     public static List<FsPathAndFileName> toFsPathAndFileName(

fluss-common/src/main/java/org/apache/fluss/lake/committer/BucketOffset.java

Lines changed: 9 additions & 19 deletions
@@ -31,17 +31,11 @@ public class BucketOffset implements Serializable {
     private final long logOffset;
     private final int bucket;
    private final @Nullable Long partitionId;
-    private final @Nullable String partitionQualifiedName;
 
-    public BucketOffset(
-            long logOffset,
-            int bucket,
-            @Nullable Long partitionId,
-            @Nullable String partitionQualifiedName) {
+    public BucketOffset(long logOffset, int bucket, @Nullable Long partitionId) {
         this.logOffset = logOffset;
         this.bucket = bucket;
         this.partitionId = partitionId;
-        this.partitionQualifiedName = partitionQualifiedName;
     }
 
     public long getLogOffset() {
@@ -57,23 +51,19 @@ public Long getPartitionId() {
         return partitionId;
     }
 
-    @Nullable
-    public String getPartitionQualifiedName() {
-        return partitionQualifiedName;
-    }
-
     @Override
     public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
         BucketOffset that = (BucketOffset) o;
-        return bucket == that.bucket
-                && logOffset == that.logOffset
-                && Objects.equals(partitionId, that.partitionId)
-                && Objects.equals(partitionQualifiedName, that.partitionQualifiedName);
+        return logOffset == that.logOffset
+                && bucket == that.bucket
+                && Objects.equals(partitionId, that.partitionId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(logOffset, bucket, partitionId);
     }
 }

fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java

Lines changed: 3 additions & 16 deletions
@@ -34,10 +34,6 @@ public class CommittedLakeSnapshot {
     // partition bucket
     private final Map<Tuple2<Long, Integer>, Long> logEndOffsets = new HashMap<>();
 
-    // partition id -> partition name, will be empty if is not a partitioned table
-    // the partition name is a qualified name in the format: key1=value1/key2=value2
-    private final Map<Long, String> qualifiedPartitionNameById = new HashMap<>();
-
     public CommittedLakeSnapshot(long lakeSnapshotId) {
         this.lakeSnapshotId = lakeSnapshotId;
     }
@@ -50,34 +46,27 @@ public void addBucket(int bucketId, long offset) {
         logEndOffsets.put(Tuple2.of(null, bucketId), offset);
     }
 
-    public void addPartitionBucket(
-            Long partitionId, String partitionQualifiedName, int bucketId, long offset) {
+    public void addPartitionBucket(Long partitionId, int bucketId, long offset) {
         logEndOffsets.put(Tuple2.of(partitionId, bucketId), offset);
-        qualifiedPartitionNameById.put(partitionId, partitionQualifiedName);
     }
 
     public Map<Tuple2<Long, Integer>, Long> getLogEndOffsets() {
         return logEndOffsets;
     }
 
-    public Map<Long, String> getQualifiedPartitionNameById() {
-        return qualifiedPartitionNameById;
-    }
-
     @Override
     public boolean equals(Object o) {
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
         CommittedLakeSnapshot that = (CommittedLakeSnapshot) o;
         return lakeSnapshotId == that.lakeSnapshotId
-                && Objects.equals(logEndOffsets, that.logEndOffsets)
-                && Objects.equals(qualifiedPartitionNameById, that.qualifiedPartitionNameById);
+                && Objects.equals(logEndOffsets, that.logEndOffsets);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(lakeSnapshotId, logEndOffsets, qualifiedPartitionNameById);
+        return Objects.hash(lakeSnapshotId, logEndOffsets);
     }
 
     @Override
@@ -87,8 +76,6 @@ public String toString() {
                 + lakeSnapshotId
                 + ", logEndOffsets="
                 + logEndOffsets
-                + ", partitionNameById="
-                + qualifiedPartitionNameById
                 + '}';
     }
 }
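
With the qualified partition name dropped, addPartitionBucket now takes only the partition id. A small sketch of how offsets are recorded, not from the commit: the ids and offsets are placeholder values.

import org.apache.fluss.lake.committer.CommittedLakeSnapshot;

class CommittedLakeSnapshotSketch {
    static CommittedLakeSnapshot record(long lakeSnapshotId, long partitionId) {
        CommittedLakeSnapshot committed = new CommittedLakeSnapshot(lakeSnapshotId);
        // bucket of a non-partitioned table: keyed by (null, bucketId) internally
        committed.addBucket(0, 100L);
        // partitioned bucket: the partition id alone is enough now
        committed.addPartitionBucket(partitionId, 1, 200L);
        return committed;
    }
}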

fluss-common/src/main/java/org/apache/fluss/utils/json/BucketOffsetJsonSerde.java

Lines changed: 1 addition & 11 deletions
@@ -30,7 +30,6 @@ public class BucketOffsetJsonSerde
     public static final BucketOffsetJsonSerde INSTANCE = new BucketOffsetJsonSerde();
     private static final String PARTITION_ID = "partition_id";
     private static final String BUCKET_ID = "bucket";
-    private static final String PARTITION_NAME = "partition_name";
     private static final String LOG_OFFSET = "offset";
 
     @Override
@@ -39,14 +38,10 @@ public BucketOffset deserialize(JsonNode node) {
         Long partitionId = partitionIdNode == null ? null : partitionIdNode.asLong();
         int bucketId = node.get(BUCKET_ID).asInt();
 
-        // deserialize partition name
-        JsonNode partitionNameNode = node.get(PARTITION_NAME);
-        String partitionName = partitionNameNode == null ? null : partitionNameNode.asText();
-
         // deserialize log offset
         long logOffset = node.get(LOG_OFFSET).asLong();
 
-        return new BucketOffset(logOffset, bucketId, partitionId, partitionName);
+        return new BucketOffset(logOffset, bucketId, partitionId);
     }
 
     @Override
@@ -59,11 +54,6 @@ public void serialize(BucketOffset bucketOffset, JsonGenerator generator) throws
         }
         generator.writeNumberField(BUCKET_ID, bucketOffset.getBucket());
 
-        // serialize partition name
-        if (bucketOffset.getPartitionQualifiedName() != null) {
-            generator.writeStringField(PARTITION_NAME, bucketOffset.getPartitionQualifiedName());
-        }
-
         // serialize bucket offset
         generator.writeNumberField(LOG_OFFSET, bucketOffset.getLogOffset());
 

fluss-common/src/test/java/org/apache/fluss/utils/json/BucketOffsetJsonSerdeTest.java

Lines changed: 2 additions & 6 deletions
@@ -28,17 +28,13 @@ public class BucketOffsetJsonSerdeTest extends JsonSerdeTestBase<BucketOffset> {
 
     @Override
     protected BucketOffset[] createObjects() {
-        return new BucketOffset[] {
-            new BucketOffset(10, 1, 1L, "country=eu-central/year=2023/month=12"),
-            new BucketOffset(20, 2, null, null)
-        };
+        return new BucketOffset[] {new BucketOffset(10, 1, 1L), new BucketOffset(20, 2, null)};
     }
 
     @Override
     protected String[] expectedJsons() {
         return new String[] {
-            "{\"partition_id\":1,\"bucket\":1,\"partition_name\":\"country=eu-central/year=2023/month=12\",\"offset\":10}",
-            "{\"bucket\":2,\"offset\":20}"
+            "{\"partition_id\":1,\"bucket\":1,\"offset\":10}", "{\"bucket\":2,\"offset\":20}"
         };
     }
 }

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java

Lines changed: 9 additions & 25 deletions
@@ -18,7 +18,6 @@
 package org.apache.fluss.flink.tiering.committer;
 
 import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.utils.types.Tuple2;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -31,19 +30,13 @@ class FlussTableLakeSnapshot {
 
     private final long lakeSnapshotId;
 
-    // <table_bucket, partition_name> -> log end offsets,
-    // if the bucket is not of a partition, the partition_name is null
-    private final Map<Tuple2<TableBucket, String>, Long> logEndOffsets;
-
-    // <table_bucket, partition_name> -> max timestamps,
-    // if the bucket is not of a partition, the partition_name is null
-    private final Map<Tuple2<TableBucket, String>, Long> maxTimestamps;
+    // table_bucket -> log end offsets,
+    private final Map<TableBucket, Long> logEndOffsets;
 
     FlussTableLakeSnapshot(long tableId, long lakeSnapshotId) {
         this.tableId = tableId;
         this.lakeSnapshotId = lakeSnapshotId;
         this.logEndOffsets = new HashMap<>();
-        this.maxTimestamps = new HashMap<>();
     }
 
     public long tableId() {
@@ -54,27 +47,20 @@ public long lakeSnapshotId() {
         return lakeSnapshotId;
     }
 
-    public Set<Tuple2<TableBucket, String>> tablePartitionBuckets() {
+    public Set<TableBucket> tableBuckets() {
         return logEndOffsets.keySet();
     }
 
-    public void addBucketOffsetAndTimestamp(TableBucket bucket, long offset, long timestamp) {
-        logEndOffsets.put(Tuple2.of(bucket, null), offset);
-        maxTimestamps.put(Tuple2.of(bucket, null), timestamp);
-    }
-
-    public void addPartitionBucketOffsetAndTimestamp(
-            TableBucket bucket, String partitionName, long offset, long timestamp) {
-        logEndOffsets.put(Tuple2.of(bucket, partitionName), offset);
-        maxTimestamps.put(Tuple2.of(bucket, partitionName), timestamp);
+    public void addBucketOffset(TableBucket bucket, long offset) {
+        logEndOffsets.put(bucket, offset);
     }
 
-    public long getLogEndOffset(Tuple2<TableBucket, String> bucketPartition) {
-        return logEndOffsets.get(bucketPartition);
+    public void addPartitionBucketOffset(TableBucket bucket, long offset) {
+        logEndOffsets.put(bucket, offset);
     }
 
-    public long getMaxTimestamp(Tuple2<TableBucket, String> bucketPartition) {
-        return maxTimestamps.get(bucketPartition);
+    public long getLogEndOffset(TableBucket bucket) {
+        return logEndOffsets.get(bucket);
     }
 
     @Override
@@ -86,8 +72,6 @@ public String toString() {
                 + lakeSnapshotId
                 + ", logEndOffsets="
                 + logEndOffsets
-                + ", maxTimestamps="
-                + maxTimestamps
                 + '}';
     }
 }
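
Since both add methods now write into the same TableBucket-keyed map and the max-timestamp map is gone, lookups take just a TableBucket. A sketch only, not from the commit: it assumes same-package access because the class is package-private.

package org.apache.fluss.flink.tiering.committer;

import org.apache.fluss.metadata.TableBucket;

class FlussTableLakeSnapshotSketch {
    static long recordAndRead(FlussTableLakeSnapshot snapshot, TableBucket bucket, long offset) {
        // addBucketOffset and addPartitionBucketOffset both put into the same map
        snapshot.addBucketOffset(bucket, offset);
        // lookup is keyed by the TableBucket alone, no Tuple2<TableBucket, String> anymore
        return snapshot.getLogEndOffset(bucket);
    }
}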

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java

Lines changed: 4 additions & 29 deletions
@@ -21,7 +21,6 @@
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
-import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metrics.registry.MetricRegistry;
 import org.apache.fluss.rpc.GatewayClientProxy;
@@ -88,26 +87,10 @@ public void commit(long tableId, CommittedLakeSnapshot committedLakeSnapshot)
             Long partitionId = partitionBucket.f0;
             if (partitionId == null) {
                 tableBucket = new TableBucket(tableId, partitionBucket.f1);
-                // we use -1 since we don't store timestamp in lake snapshot property for
-                // simplicity, it may cause the timestamp to be -1 during constructing lake
-                // snapshot to commit to Fluss.
-                // But it should happen rarely and should be a normal value after next tiering.
-                flussTableLakeSnapshot.addBucketOffsetAndTimestamp(
-                        tableBucket, entry.getValue(), -1);
+                flussTableLakeSnapshot.addBucketOffset(tableBucket, entry.getValue());
             } else {
                 tableBucket = new TableBucket(tableId, partitionId, partitionBucket.f1);
-                // the partition name is qualified partition name in format:
-                // key1=value1/key2=value2.
-                // we need to convert to partition name in format: value1$value2$
-                String qualifiedPartitionName =
-                        committedLakeSnapshot.getQualifiedPartitionNameById().get(partitionId);
-                ResolvedPartitionSpec resolvedPartitionSpec =
-                        ResolvedPartitionSpec.fromPartitionQualifiedName(qualifiedPartitionName);
-                flussTableLakeSnapshot.addPartitionBucketOffsetAndTimestamp(
-                        tableBucket,
-                        resolvedPartitionSpec.getPartitionName(),
-                        entry.getValue(),
-                        -1);
+                flussTableLakeSnapshot.addPartitionBucketOffset(tableBucket, entry.getValue());
             }
         }
         commit(flussTableLakeSnapshot);
@@ -122,23 +105,15 @@ private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
 
         pbLakeTableSnapshotInfo.setTableId(flussTableLakeSnapshot.tableId());
         pbLakeTableSnapshotInfo.setSnapshotId(flussTableLakeSnapshot.lakeSnapshotId());
-        for (Tuple2<TableBucket, String> bucketPartition :
-                flussTableLakeSnapshot.tablePartitionBuckets()) {
+        for (TableBucket tableBucket : flussTableLakeSnapshot.tableBuckets()) {
             PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
                     pbLakeTableSnapshotInfo.addBucketsReq();
-            TableBucket tableBucket = bucketPartition.f0;
-            String partitionName = bucketPartition.f1;
-            long endOffset = flussTableLakeSnapshot.getLogEndOffset(bucketPartition);
-            long maxTimestamp = flussTableLakeSnapshot.getMaxTimestamp(bucketPartition);
+            long endOffset = flussTableLakeSnapshot.getLogEndOffset(tableBucket);
             if (tableBucket.getPartitionId() != null) {
                 pbLakeTableOffsetForBucket.setPartitionId(tableBucket.getPartitionId());
             }
-            if (partitionName != null) {
-                pbLakeTableOffsetForBucket.setPartitionName(partitionName);
-            }
             pbLakeTableOffsetForBucket.setBucketId(tableBucket.getBucket());
             pbLakeTableOffsetForBucket.setLogEndOffset(endOffset);
-            pbLakeTableOffsetForBucket.setMaxTimestamp(maxTimestamp);
         }
         return commitLakeTableSnapshotRequest;
     }
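
End to end, the committer now only needs partition id, bucket id, and log end offset to rebuild a FlussTableLakeSnapshot and send the CommitLakeTableSnapshotRequest. A hedged caller sketch, not from the commit: it assumes same-package access, an already constructed committer, and that commit may declare checked exceptions; the ids and offsets are placeholders.

package org.apache.fluss.flink.tiering.committer;

import org.apache.fluss.lake.committer.CommittedLakeSnapshot;

class CommitSketch {
    static void recommit(FlussTableLakeSnapshotCommitter committer, long tableId, long partitionId)
            throws Exception {
        CommittedLakeSnapshot committed = new CommittedLakeSnapshot(42L);
        committed.addBucket(0, 100L);                       // non-partitioned bucket
        committed.addPartitionBucket(partitionId, 1, 200L); // partition id only, no qualified name
        // partition names and max timestamps are no longer resolved or sent with the request
        committer.commit(tableId, committed);
    }
}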
