Skip to content

Commit d2dc98f

Browse files
authored
[lake] Store partition name in lake snapshot property (#1485)
1 parent 72e4c43 commit d2dc98f

File tree

27 files changed

+429
-149
lines changed

27 files changed

+429
-149
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/metadata/LakeSnapshot.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.alibaba.fluss.annotation.PublicEvolving;
2121
import com.alibaba.fluss.metadata.TableBucket;
2222

23+
import java.util.Collections;
2324
import java.util.Map;
2425

2526
/**
@@ -36,17 +37,29 @@ public class LakeSnapshot {
3637
// the specific log offset of the snapshot
3738
private final Map<TableBucket, Long> tableBucketsOffset;
3839

39-
public LakeSnapshot(long snapshotId, Map<TableBucket, Long> tableBucketsOffset) {
40+
// partition id -> partition name for this lake snapshot if the table
41+
// is partitioned; empty if the table is not partitioned
42+
private final Map<Long, String> partitionNameById;
43+
44+
public LakeSnapshot(
45+
long snapshotId,
46+
Map<TableBucket, Long> tableBucketsOffset,
47+
Map<Long, String> partitionNameById) {
4048
this.snapshotId = snapshotId;
4149
this.tableBucketsOffset = tableBucketsOffset;
50+
this.partitionNameById = partitionNameById;
4251
}
4352

4453
public long getSnapshotId() {
4554
return snapshotId;
4655
}
4756

4857
public Map<TableBucket, Long> getTableBucketsOffset() {
49-
return tableBucketsOffset;
58+
return Collections.unmodifiableMap(tableBucketsOffset);
59+
}
60+
61+
public Map<Long, String> getPartitionNameById() {
62+
return Collections.unmodifiableMap(partitionNameById);
5063
}
5164

5265
@Override
@@ -56,6 +69,8 @@ public String toString() {
5669
+ snapshotId
5770
+ ", tableBucketsOffset="
5871
+ tableBucketsOffset
72+
+ ", partitionNameById="
73+
+ partitionNameById
5974
+ '}';
6075
}
6176
}

fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,16 +202,20 @@ public static LakeSnapshot toLakeTableSnapshotInfo(GetLatestLakeSnapshotResponse
202202
long snapshotId = response.getSnapshotId();
203203
Map<TableBucket, Long> tableBucketsOffset =
204204
new HashMap<>(response.getBucketSnapshotsCount());
205+
Map<Long, String> partitionNameById = new HashMap<>();
205206
for (PbLakeSnapshotForBucket pbLakeSnapshotForBucket : response.getBucketSnapshotsList()) {
206207
Long partitionId =
207208
pbLakeSnapshotForBucket.hasPartitionId()
208209
? pbLakeSnapshotForBucket.getPartitionId()
209210
: null;
210211
TableBucket tableBucket =
211212
new TableBucket(tableId, partitionId, pbLakeSnapshotForBucket.getBucketId());
213+
if (partitionId != null && pbLakeSnapshotForBucket.hasPartitionName()) {
214+
partitionNameById.put(partitionId, pbLakeSnapshotForBucket.getPartitionName());
215+
}
212216
tableBucketsOffset.put(tableBucket, pbLakeSnapshotForBucket.getLogOffset());
213217
}
214-
return new LakeSnapshot(snapshotId, tableBucketsOffset);
218+
return new LakeSnapshot(snapshotId, tableBucketsOffset, partitionNameById);
215219
}
216220

217221
public static List<FsPathAndFileName> toFsPathAndFileName(

fluss-common/src/main/java/com/alibaba/fluss/lake/committer/BucketOffset.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,17 @@ public class BucketOffset implements Serializable {
3131
private final long logOffset;
3232
private final int bucket;
3333
private final @Nullable Long partitionId;
34-
private final @Nullable String partitionName;
34+
private final @Nullable String partitionQualifiedName;
3535

3636
public BucketOffset(
3737
long logOffset,
3838
int bucket,
3939
@Nullable Long partitionId,
40-
@Nullable String partitionName) {
40+
@Nullable String partitionQualifiedName) {
4141
this.logOffset = logOffset;
4242
this.bucket = bucket;
4343
this.partitionId = partitionId;
44-
this.partitionName = partitionName;
44+
this.partitionQualifiedName = partitionQualifiedName;
4545
}
4646

4747
public long getLogOffset() {
@@ -58,8 +58,8 @@ public Long getPartitionId() {
5858
}
5959

6060
@Nullable
61-
public String getPartitionName() {
62-
return partitionName;
61+
public String getPartitionQualifiedName() {
62+
return partitionQualifiedName;
6363
}
6464

6565
@Override
@@ -74,6 +74,6 @@ public boolean equals(Object o) {
7474
return bucket == that.bucket
7575
&& logOffset == that.logOffset
7676
&& Objects.equals(partitionId, that.partitionId)
77-
&& Objects.equals(partitionName, that.partitionName);
77+
&& Objects.equals(partitionQualifiedName, that.partitionQualifiedName);
7878
}
7979
}

fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ public class CommittedLakeSnapshot {
3434
// partition bucket
3535
private final Map<Tuple2<Long, Integer>, Long> logEndOffsets = new HashMap<>();
3636

37+
// partition id -> partition name; empty if the table is not partitioned
38+
// the partition name is a qualified name in the format: key1=value1/key2=value2
39+
private final Map<Long, String> qualifiedPartitionNameById = new HashMap<>();
40+
3741
public CommittedLakeSnapshot(long lakeSnapshotId) {
3842
this.lakeSnapshotId = lakeSnapshotId;
3943
}
@@ -46,27 +50,34 @@ public void addBucket(int bucketId, long offset) {
4650
logEndOffsets.put(Tuple2.of(null, bucketId), offset);
4751
}
4852

49-
public void addPartitionBucket(Long partitionId, int bucketId, long offset) {
53+
public void addPartitionBucket(
54+
Long partitionId, String partitionQualifiedName, int bucketId, long offset) {
5055
logEndOffsets.put(Tuple2.of(partitionId, bucketId), offset);
56+
qualifiedPartitionNameById.put(partitionId, partitionQualifiedName);
5157
}
5258

5359
public Map<Tuple2<Long, Integer>, Long> getLogEndOffsets() {
5460
return logEndOffsets;
5561
}
5662

63+
public Map<Long, String> getQualifiedPartitionNameById() {
64+
return qualifiedPartitionNameById;
65+
}
66+
5767
@Override
5868
public boolean equals(Object o) {
5969
if (o == null || getClass() != o.getClass()) {
6070
return false;
6171
}
6272
CommittedLakeSnapshot that = (CommittedLakeSnapshot) o;
6373
return lakeSnapshotId == that.lakeSnapshotId
64-
&& Objects.equals(logEndOffsets, that.logEndOffsets);
74+
&& Objects.equals(logEndOffsets, that.logEndOffsets)
75+
&& Objects.equals(qualifiedPartitionNameById, that.qualifiedPartitionNameById);
6576
}
6677

6778
@Override
6879
public int hashCode() {
69-
return Objects.hash(lakeSnapshotId, logEndOffsets);
80+
return Objects.hash(lakeSnapshotId, logEndOffsets, qualifiedPartitionNameById);
7081
}
7182

7283
@Override
@@ -76,6 +87,8 @@ public String toString() {
7687
+ lakeSnapshotId
7788
+ ", logEndOffsets="
7889
+ logEndOffsets
90+
+ ", partitionNameById="
91+
+ qualifiedPartitionNameById
7992
+ '}';
8093
}
8194
}

fluss-common/src/main/java/com/alibaba/fluss/metadata/ResolvedPartitionSpec.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,25 @@ public String getPartitionQualifiedName() {
125125
return sb.toString();
126126
}
127127

128+
public static ResolvedPartitionSpec fromPartitionQualifiedName(String qualifiedPartitionName) {
129+
// convert from qualified name to ResolvedPartitionSpec
130+
List<String> keys = new ArrayList<>();
131+
List<String> values = new ArrayList<>();
132+
133+
String[] keyValuePairs = qualifiedPartitionName.split("/");
134+
135+
for (String pair : keyValuePairs) {
136+
String[] keyValue = pair.split("=", 2);
137+
if (keyValue.length != 2) {
138+
throw new IllegalArgumentException(
139+
"Invalid partition name format. Expected key=value, got: " + pair);
140+
}
141+
keys.add(keyValue[0]);
142+
values.add(keyValue[1]);
143+
}
144+
return new ResolvedPartitionSpec(keys, values);
145+
}
146+
128147
@Override
129148
public boolean equals(Object o) {
130149
if (o == null || getClass() != o.getClass()) {

fluss-common/src/main/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerde.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ public void serialize(BucketOffset bucketOffset, JsonGenerator generator) throws
6060
generator.writeNumberField(BUCKET_ID, bucketOffset.getBucket());
6161

6262
// serialize partition name
63-
if (bucketOffset.getPartitionName() != null) {
64-
generator.writeStringField(PARTITION_NAME, bucketOffset.getPartitionName());
63+
if (bucketOffset.getPartitionQualifiedName() != null) {
64+
generator.writeStringField(PARTITION_NAME, bucketOffset.getPartitionQualifiedName());
6565
}
6666

6767
// serialize bucket offset

fluss-common/src/test/java/com/alibaba/fluss/metadata/ResolvedPartitionSpecTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ void testResolvedPartitionSpec() {
3838
new PartitionSpec(Collections.singletonMap("a", "1")));
3939
assertThat(resolvedPartitionSpec.getPartitionName()).isEqualTo("1");
4040
assertThat(resolvedPartitionSpec.getPartitionQualifiedName()).isEqualTo("a=1");
41+
assertThat(
42+
ResolvedPartitionSpec.fromPartitionQualifiedName(
43+
resolvedPartitionSpec.getPartitionQualifiedName()))
44+
.isEqualTo(resolvedPartitionSpec);
45+
4146
assertThat(resolvedPartitionSpec.getPartitionKeys())
4247
.isEqualTo(Collections.singletonList("a"));
4348
assertThat(resolvedPartitionSpec.getPartitionValues())
@@ -52,5 +57,9 @@ void testResolvedPartitionSpec() {
5257
assertThat(resolvedPartitionSpec.getPartitionQualifiedName()).isEqualTo("a=1/b=2");
5358
assertThat(resolvedPartitionSpec.getPartitionKeys()).isEqualTo(Arrays.asList("a", "b"));
5459
assertThat(resolvedPartitionSpec.getPartitionValues()).isEqualTo(Arrays.asList("1", "2"));
60+
assertThat(
61+
ResolvedPartitionSpec.fromPartitionQualifiedName(
62+
resolvedPartitionSpec.getPartitionQualifiedName()))
63+
.isEqualTo(resolvedPartitionSpec);
5564
}
5665
}

fluss-common/src/test/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerdeTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,15 @@ public class BucketOffsetJsonSerdeTest extends JsonSerdeTestBase<BucketOffset> {
2929
@Override
3030
protected BucketOffset[] createObjects() {
3131
return new BucketOffset[] {
32-
new BucketOffset(10, 1, 1L, "eu-central$2023$12"), new BucketOffset(20, 2, null, null)
32+
new BucketOffset(10, 1, 1L, "country=eu-central/year=2023/month=12"),
33+
new BucketOffset(20, 2, null, null)
3334
};
3435
}
3536

3637
@Override
3738
protected String[] expectedJsons() {
3839
return new String[] {
39-
"{\"partition_id\":1,\"bucket_id\":1,\"partition_name\":\"eu-central$2023$12\",\"log_offset\":10}",
40+
"{\"partition_id\":1,\"bucket_id\":1,\"partition_name\":\"country=eu-central/year=2023/month=12\",\"log_offset\":10}",
4041
"{\"bucket_id\":2,\"log_offset\":20}"
4142
};
4243
}

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,26 @@
1818
package com.alibaba.fluss.flink.tiering.committer;
1919

2020
import com.alibaba.fluss.metadata.TableBucket;
21+
import com.alibaba.fluss.utils.types.Tuple2;
2122

2223
import java.util.HashMap;
2324
import java.util.Map;
2425

2526
/** A lake snapshot for a Fluss table. */
26-
public class FlussTableLakeSnapshot {
27+
class FlussTableLakeSnapshot {
2728

2829
private final long tableId;
2930

3031
private final long lakeSnapshotId;
3132

32-
private final Map<TableBucket, Long> logEndOffsets;
33+
// <table_bucket, partition_name> -> log end offsets,
34+
// if the bucket does not belong to a partition, the partition_name is null
35+
private final Map<Tuple2<TableBucket, String>, Long> logEndOffsets;
3336

34-
public FlussTableLakeSnapshot(long tableId, long lakeSnapshotId) {
35-
this(tableId, lakeSnapshotId, new HashMap<>());
36-
}
37-
38-
public FlussTableLakeSnapshot(
39-
long tableId, long lakeSnapshotId, Map<TableBucket, Long> logEndOffsets) {
37+
FlussTableLakeSnapshot(long tableId, long lakeSnapshotId) {
4038
this.tableId = tableId;
4139
this.lakeSnapshotId = lakeSnapshotId;
42-
this.logEndOffsets = logEndOffsets;
40+
this.logEndOffsets = new HashMap<>();
4341
}
4442

4543
public long tableId() {
@@ -50,12 +48,16 @@ public long lakeSnapshotId() {
5048
return lakeSnapshotId;
5149
}
5250

53-
public Map<TableBucket, Long> logEndOffsets() {
51+
public Map<Tuple2<TableBucket, String>, Long> logEndOffsets() {
5452
return logEndOffsets;
5553
}
5654

5755
public void addBucketOffset(TableBucket bucket, long offset) {
58-
logEndOffsets.put(bucket, offset);
56+
logEndOffsets.put(Tuple2.of(bucket, null), offset);
57+
}
58+
59+
public void addPartitionBucketOffset(TableBucket bucket, String partitionName, long offset) {
60+
logEndOffsets.put(Tuple2.of(bucket, partitionName), offset);
5961
}
6062

6163
@Override

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.alibaba.fluss.config.ConfigOptions;
2222
import com.alibaba.fluss.config.Configuration;
2323
import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
24+
import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
2425
import com.alibaba.fluss.metadata.TableBucket;
2526
import com.alibaba.fluss.metrics.registry.MetricRegistry;
2627
import com.alibaba.fluss.rpc.GatewayClientProxy;
@@ -33,8 +34,6 @@
3334
import com.alibaba.fluss.utils.ExceptionUtils;
3435
import com.alibaba.fluss.utils.types.Tuple2;
3536

36-
import javax.annotation.Nullable;
37-
3837
import java.io.IOException;
3938
import java.util.Map;
4039

@@ -63,7 +62,7 @@ public void open() {
6362
metadataUpdater::getCoordinatorServer, rpcClient, CoordinatorGateway.class);
6463
}
6564

66-
public void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws IOException {
65+
void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws IOException {
6766
try {
6867
CommitLakeTableSnapshotRequest request =
6968
toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot);
@@ -77,10 +76,7 @@ public void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws IOExcep
7776
}
7877
}
7978

80-
public void commit(
81-
long tableId,
82-
@Nullable Map<String, Long> partitionIdByName,
83-
CommittedLakeSnapshot committedLakeSnapshot)
79+
public void commit(long tableId, CommittedLakeSnapshot committedLakeSnapshot)
8480
throws IOException {
8581
// construct lake snapshot to commit to Fluss
8682
FlussTableLakeSnapshot flussTableLakeSnapshot =
@@ -89,18 +85,22 @@ public void commit(
8985
committedLakeSnapshot.getLogEndOffsets().entrySet()) {
9086
Tuple2<Long, Integer> partitionBucket = entry.getKey();
9187
TableBucket tableBucket;
92-
if (partitionBucket.f0 == null) {
88+
Long partitionId = partitionBucket.f0;
89+
if (partitionId == null) {
9390
tableBucket = new TableBucket(tableId, partitionBucket.f1);
91+
flussTableLakeSnapshot.addBucketOffset(tableBucket, entry.getValue());
9492
} else {
95-
Long partitionId = partitionBucket.f0;
96-
if (partitionId != null) {
97-
tableBucket = new TableBucket(tableId, partitionId, partitionBucket.f1);
98-
} else {
99-
// let's skip the bucket
100-
continue;
101-
}
93+
tableBucket = new TableBucket(tableId, partitionId, partitionBucket.f1);
94+
// the partition name here is a qualified partition name in the format:
95+
// key1=value1/key2=value2.
96+
// we need to convert it to a partition name in the format: value1$value2
97+
String qualifiedPartitionName =
98+
committedLakeSnapshot.getQualifiedPartitionNameById().get(partitionId);
99+
ResolvedPartitionSpec resolvedPartitionSpec =
100+
ResolvedPartitionSpec.fromPartitionQualifiedName(qualifiedPartitionName);
101+
flussTableLakeSnapshot.addPartitionBucketOffset(
102+
tableBucket, resolvedPartitionSpec.getPartitionName(), entry.getValue());
102103
}
103-
flussTableLakeSnapshot.addBucketOffset(tableBucket, entry.getValue());
104104
}
105105
commit(flussTableLakeSnapshot);
106106
}
@@ -114,15 +114,19 @@ private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
114114

115115
pbLakeTableSnapshotInfo.setTableId(flussTableLakeSnapshot.tableId());
116116
pbLakeTableSnapshotInfo.setSnapshotId(flussTableLakeSnapshot.lakeSnapshotId());
117-
for (Map.Entry<TableBucket, Long> bucketEndOffsetEntry :
117+
for (Map.Entry<Tuple2<TableBucket, String>, Long> bucketEndOffsetEntry :
118118
flussTableLakeSnapshot.logEndOffsets().entrySet()) {
119119
PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
120120
pbLakeTableSnapshotInfo.addBucketsReq();
121-
TableBucket tableBucket = bucketEndOffsetEntry.getKey();
121+
TableBucket tableBucket = bucketEndOffsetEntry.getKey().f0;
122+
String partitionName = bucketEndOffsetEntry.getKey().f1;
122123
long endOffset = bucketEndOffsetEntry.getValue();
123124
if (tableBucket.getPartitionId() != null) {
124125
pbLakeTableOffsetForBucket.setPartitionId(tableBucket.getPartitionId());
125126
}
127+
if (partitionName != null) {
128+
pbLakeTableOffsetForBucket.setPartitionName(partitionName);
129+
}
126130
pbLakeTableOffsetForBucket.setBucketId(tableBucket.getBucket());
127131
pbLakeTableOffsetForBucket.setLogEndOffset(endOffset);
128132
}

0 commit comments

Comments
 (0)