Commit c5618bc

[lake] Fix zk lake snapshot node compatible issue
1 parent de2d9a0 commit c5618bc

13 files changed: +198, -54 lines


fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java

Lines changed: 13 additions & 1 deletion
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.flink.tiering.committer;
 
+import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.client.metadata.MetadataUpdater;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
@@ -37,6 +38,10 @@
 /** Committer to commit {@link FlussTableLakeSnapshot} of lake to Fluss. */
 public class FlussTableLakeSnapshotCommitter implements AutoCloseable {
 
+    // the current version for committing a lake snapshot is 2;
+    // the coordinator should use v2 to serialize the lake snapshot
+    private static final int CURRENT_VERSION_FOR_COMMIT_LAKE_SNAPSHOT = 2;
+
     private final Configuration flussConf;
 
     private CoordinatorGateway coordinatorGateway;
@@ -63,6 +68,7 @@ void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws IOException {
         try {
             CommitLakeTableSnapshotRequest request =
                     toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot);
+            request.setLakeSnapshotSerializationVersion(CURRENT_VERSION_FOR_COMMIT_LAKE_SNAPSHOT);
             coordinatorGateway.commitLakeTableSnapshot(request).get();
         } catch (Exception e) {
             throw new IOException(
@@ -84,7 +90,8 @@ public void commit(long tableId, long snapshotId, Map<TableBucket, Long> logEndOffsets) {
         commit(flussTableLakeSnapshot);
     }
 
-    private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
+    @VisibleForTesting
+    static CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
             FlussTableLakeSnapshot flussTableLakeSnapshot) {
         CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
                 new CommitLakeTableSnapshotRequest();
@@ -106,6 +113,11 @@ private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
         return commitLakeTableSnapshotRequest;
     }
 
+    @VisibleForTesting
+    CoordinatorGateway getCoordinatorGateway() {
+        return coordinatorGateway;
+    }
+
     @Override
     public void close() throws Exception {
         if (rpcClient != null) {
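
Making toCommitLakeTableSnapshotRequest static and @VisibleForTesting is what lets the compatibility test below build a request without the version stamp, i.e. exactly what a pre-upgrade tiering service sends. A minimal sketch of the two request shapes (types and methods are from this diff; the snapshot variable is assumed to be in scope):

    // legacy tiering service: the optional version field is simply never set
    CommitLakeTableSnapshotRequest legacyRequest =
            FlussTableLakeSnapshotCommitter.toCommitLakeTableSnapshotRequest(snapshot);

    // upgraded tiering service: the same request, stamped with the current version (2)
    CommitLakeTableSnapshotRequest upgradedRequest =
            FlussTableLakeSnapshotCommitter.toCommitLakeTableSnapshotRequest(snapshot);
    upgradedRequest.setLakeSnapshotSerializationVersion(2);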

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java

Lines changed: 85 additions & 28 deletions
@@ -21,18 +21,25 @@
 import org.apache.fluss.flink.utils.FlinkTestBase;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
+import org.apache.fluss.server.zk.data.lake.LakeTableSnapshotJsonSerde;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.fluss.utils.types.Tuple2;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
+import static org.apache.fluss.flink.tiering.committer.FlussTableLakeSnapshotCommitter.toCommitLakeTableSnapshotRequest;
 import static org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -61,44 +68,94 @@ void afterEach() throws Exception {
     void testCommit(boolean isPartitioned) throws Exception {
         TablePath tablePath =
                 TablePath.of("fluss", "test_commit" + (isPartitioned ? "_partitioned" : ""));
-        long tableId =
-                createTable(
-                        tablePath,
-                        isPartitioned
-                                ? DATA1_PARTITIONED_TABLE_DESCRIPTOR
-                                : DATA1_TABLE_DESCRIPTOR);
+        Tuple2<Long, Collection<Long>> tableIdAndPartitions = createTable(tablePath, isPartitioned);
+        long tableId = tableIdAndPartitions.f0;
+        Collection<Long> partitions = tableIdAndPartitions.f1;
 
-        List<String> partitions;
-        Map<String, Long> partitionNameAndIds = new HashMap<>();
-        if (!isPartitioned) {
-            FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
-            partitions = Collections.singletonList(null);
-        } else {
-            partitionNameAndIds = FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
-            partitions = new ArrayList<>(partitionNameAndIds.keySet());
+        Map<TableBucket, Long> logEndOffsets = mockLogEndOffsets(tableId, partitions);
+
+        long snapshotId = 3;
+        // commit offsets
+        flussTableLakeSnapshotCommitter.commit(tableId, snapshotId, logEndOffsets);
+        LakeSnapshot lakeSnapshot = admin.getLatestLakeSnapshot(tablePath).get();
+        assertThat(lakeSnapshot.getSnapshotId()).isEqualTo(3);
+
+        // get and check the offsets
+        Map<TableBucket, Long> bucketLogOffsets = lakeSnapshot.getTableBucketsOffset();
+        assertThat(bucketLogOffsets).isEqualTo(logEndOffsets);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testCompatibilityWithoutSerializationVersion(boolean isPartitioned) throws Exception {
+        TablePath tablePath =
+                TablePath.of(
+                        "fluss",
+                        "test_legacy_version_commit" + (isPartitioned ? "_partitioned" : ""));
+        Tuple2<Long, Collection<Long>> tableIdAndPartitions = createTable(tablePath, isPartitioned);
+        long tableId = tableIdAndPartitions.f0;
+        Collection<Long> partitions = tableIdAndPartitions.f1;
+
+        Map<TableBucket, Long> logEndOffsets = mockLogEndOffsets(tableId, partitions);
+
+        long snapshotId = 3;
+        // commit offsets
+        FlussTableLakeSnapshot flussTableLakeSnapshot =
+                new FlussTableLakeSnapshot(tableId, snapshotId);
+        for (Map.Entry<TableBucket, Long> entry : logEndOffsets.entrySet()) {
+            flussTableLakeSnapshot.addBucketOffset(entry.getKey(), entry.getValue());
         }
 
+        // don't set the commit lake snapshot version, to mock the old-version behavior
+        flussTableLakeSnapshotCommitter
+                .getCoordinatorGateway()
+                .commitLakeTableSnapshot(toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot))
+                .get();
+
+        // test deserialization with the old-version deserializer
+        ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+        // read the json node from the lake table znode
+        JsonNode jsonNode =
+                new ObjectMapper()
+                        .readTree(zkClient.getOrEmpty(ZkData.LakeTableZNode.path(tableId)).get());
+        LakeTableSnapshot lakeTableSnapshot =
+                LakeTableSnapshotJsonSerde.INSTANCE.deserializeVersion1(jsonNode);
+
+        // verify the deserialized lakeTableSnapshot
+        assertThat(lakeTableSnapshot.getSnapshotId()).isEqualTo(3);
+        assertThat(lakeTableSnapshot.getBucketLogEndOffset()).isEqualTo(logEndOffsets);
+    }
+
+    private Map<TableBucket, Long> mockLogEndOffsets(long tableId, Collection<Long> partitionIds) {
         Map<TableBucket, Long> logEndOffsets = new HashMap<>();
         for (int bucket = 0; bucket < 3; bucket++) {
             long bucketOffset = bucket * bucket;
-            for (String partitionName : partitions) {
-                if (partitionName == null) {
+            for (Long partitionId : partitionIds) {
+                if (partitionId == null) {
                     logEndOffsets.put(new TableBucket(tableId, bucket), bucketOffset);
                 } else {
-                    long partitionId = partitionNameAndIds.get(partitionName);
                     logEndOffsets.put(new TableBucket(tableId, partitionId, bucket), bucketOffset);
                 }
             }
         }
+        return logEndOffsets;
+    }
 
-        long snapshotId = 3;
-        // commit offsets
-        flussTableLakeSnapshotCommitter.commit(tableId, snapshotId, logEndOffsets);
-        LakeSnapshot lakeSnapshot = admin.getLatestLakeSnapshot(tablePath).get();
-        assertThat(lakeSnapshot.getSnapshotId()).isEqualTo(3);
-
-        // get and check the offsets
-        Map<TableBucket, Long> bucketLogOffsets = lakeSnapshot.getTableBucketsOffset();
-        assertThat(bucketLogOffsets).isEqualTo(logEndOffsets);
+    private Tuple2<Long, Collection<Long>> createTable(TablePath tablePath, boolean isPartitioned)
+            throws Exception {
+        long tableId =
+                createTable(
+                        tablePath,
+                        isPartitioned
+                                ? DATA1_PARTITIONED_TABLE_DESCRIPTOR
+                                : DATA1_TABLE_DESCRIPTOR);
+        Collection<Long> partitions;
+        if (!isPartitioned) {
+            partitions = Collections.singletonList(null);
+            FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
+        } else {
+            partitions = FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath).values();
        }
+        return new Tuple2<>(tableId, partitions);
     }
 }

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 13 additions & 0 deletions
@@ -451,6 +451,19 @@ message NotifyRemoteLogOffsetsResponse {
 
 message CommitLakeTableSnapshotRequest {
   repeated PbLakeTableSnapshotInfo tables_req = 1;
+  // The version number for serializing lake_snapshot. This field tells the coordinator server
+  // which version to use when serializing the lake snapshot data.
+  //
+  // Legacy tiering services (before this field was introduced) do not set this field. The field
+  // is primarily used to handle compatibility during Fluss upgrades:
+  //
+  // - During an upgrade: Fluss may use a new serialization format, but some tablet servers may
+  //   not have been upgraded yet and cannot deserialize the new format. The coordinator server
+  //   can check this field to determine which serialization format to use.
+  // - After the upgrade: once all Fluss components are upgraded, tiering services can be updated
+  //   to use the new format. The coordinator server will recognize this field and use the new
+  //   serialization method, and all tablet servers will be able to deserialize the new format.
+  optional int32 lake_snapshot_serialization_version = 2;
 }
 
 message PbLakeTableSnapshotInfo {
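
In Java terms, the contract described by this comment plays out on the coordinator roughly as follows (a sketch; the helper names are taken from the ServerRpcMessageUtils and CoordinatorEventProcessor changes in this commit, and the remaining variables are assumed to be in scope):

    // an optional proto field maps to a has/get pair on the generated message class
    Integer version =
            request.hasLakeSnapshotSerializationVersion()
                    ? request.getLakeSnapshotSerializationVersion()
                    : null;
    if (version == null) {
        // legacy sender: keep writing the v1 znode layout so that
        // not-yet-upgraded tablet servers can still deserialize it
        lakeTableHelper.upsertLakeTableV1(tableId, lakeTableSnapshot);
    } else {
        // upgraded sender (currently 2): safe to write the new layout
        lakeTableHelper.upsertLakeTable(tableId, tablePath, lakeTableSnapshot);
    }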

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 8 additions & 4 deletions
@@ -1260,10 +1260,14 @@ private void tryProcessCommitLakeTableSnapshot(
                                         + tableId
                                         + " not found in coordinator context.");
                     }
-
-                    // this involves IO operation (ZK), so we do it in ioExecutor
-                    lakeTableHelper.upsertLakeTable(
-                            tableId, tablePath, lakeTableSnapshotEntry.getValue());
+                    if (commitLakeTableSnapshotData.getSerializationVersion() == null) {
+                        lakeTableHelper.upsertLakeTableV1(
+                                tableId, lakeTableSnapshotEntry.getValue());
+                    } else {
+                        // this involves IO operation (ZK), so we do it in ioExecutor
+                        lakeTableHelper.upsertLakeTable(
+                                tableId, tablePath, lakeTableSnapshotEntry.getValue());
+                    }
                 } catch (Exception e) {
                     ApiError error = ApiError.fromThrowable(e);
                     tableResp.setError(error.error().code(), error.message());

fluss-server/src/main/java/org/apache/fluss/server/entity/CommitLakeTableSnapshotData.java

Lines changed: 19 additions & 3 deletions
@@ -21,6 +21,8 @@
 import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
 import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
 
+import javax.annotation.Nullable;
+
 import java.util.Map;
 import java.util.Objects;
 
@@ -30,11 +32,17 @@ public class CommitLakeTableSnapshotData {
     private final Map<Long, LakeTableSnapshot> lakeTableSnapshots;
     private final Map<TableBucket, Long> tableBucketsMaxTieredTimestamp;
 
+    // the serialization version for the lake table snapshot; will be null
+    // before 0.8
+    private final Integer serializationVersion;
+
     public CommitLakeTableSnapshotData(
             Map<Long, LakeTableSnapshot> lakeTableSnapshots,
-            Map<TableBucket, Long> tableBucketsMaxTieredTimestamp) {
+            Map<TableBucket, Long> tableBucketsMaxTieredTimestamp,
+            @Nullable Integer serializationVersion) {
         this.lakeTableSnapshots = lakeTableSnapshots;
         this.tableBucketsMaxTieredTimestamp = tableBucketsMaxTieredTimestamp;
+        this.serializationVersion = serializationVersion;
     }
 
     public Map<Long, LakeTableSnapshot> getLakeTableSnapshot() {
@@ -45,6 +53,10 @@ public Map<TableBucket, Long> getTableBucketsMaxTieredTimestamp() {
         return tableBucketsMaxTieredTimestamp;
     }
 
+    public Integer getSerializationVersion() {
+        return serializationVersion;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -56,12 +68,14 @@ public boolean equals(Object o) {
         CommitLakeTableSnapshotData that = (CommitLakeTableSnapshotData) o;
         return Objects.equals(lakeTableSnapshots, that.lakeTableSnapshots)
                 && Objects.equals(
-                        tableBucketsMaxTieredTimestamp, that.tableBucketsMaxTieredTimestamp);
+                        tableBucketsMaxTieredTimestamp, that.tableBucketsMaxTieredTimestamp)
+                && Objects.equals(serializationVersion, that.serializationVersion);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(lakeTableSnapshots, tableBucketsMaxTieredTimestamp);
+        return Objects.hash(
+                lakeTableSnapshots, tableBucketsMaxTieredTimestamp, serializationVersion);
     }
 
     @Override
@@ -71,6 +85,8 @@ public String toString() {
                 + lakeTableSnapshots
                 + ", tableBucketsMaxTieredTimestamp="
                 + tableBucketsMaxTieredTimestamp
+                + ", serializationVersion="
+                + serializationVersion
                 + '}';
     }
 }
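
The nullable field makes the sender's vintage explicit in the server-side model. A quick sketch of the two shapes this object now takes (the constructor is from this diff; the map arguments are assumed to be in scope):

    // request from a legacy tiering service: the optional proto field was unset
    CommitLakeTableSnapshotData legacy =
            new CommitLakeTableSnapshotData(lakeTableSnapshots, maxTieredTimestamps, null);

    // request from an upgraded tiering service: stamped with the current version
    CommitLakeTableSnapshotData upgraded =
            new CommitLakeTableSnapshotData(lakeTableSnapshots, maxTieredTimestamps, 2);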

fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java

Lines changed: 7 additions & 1 deletion
@@ -1574,7 +1574,13 @@ public static CommitLakeTableSnapshotData getCommitLakeTableSnapshotData(
             lakeTableInfoByTableId.put(
                     tableId, new LakeTableSnapshot(snapshotId, bucketLogEndOffset));
         }
-        return new CommitLakeTableSnapshotData(lakeTableInfoByTableId, tableBucketsMaxTimestamp);
+
+        Integer serializationVersion =
+                request.hasLakeSnapshotSerializationVersion()
+                        ? request.getLakeSnapshotSerializationVersion()
+                        : null;
+        return new CommitLakeTableSnapshotData(
+                lakeTableInfoByTableId, tableBucketsMaxTimestamp, serializationVersion);
     }
 
     public static PbNotifyLakeTableOffsetReqForBucket makeNotifyLakeTableOffsetForBucket(

fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java

Lines changed: 8 additions & 2 deletions
@@ -1027,9 +1027,15 @@ public Optional<RemoteLogManifestHandle> getRemoteLogManifestHandle(TableBucket
     }
 
     /** Upsert the {@link LakeTable} to Zk Node. */
-    public void upsertLakeTable(long tableId, LakeTable lakeTable, boolean isUpdate)
+    public void upsertLakeTable(
+            long tableId, LakeTable lakeTable, boolean isUpdate, boolean isLegacyVersion)
             throws Exception {
-        byte[] zkData = LakeTableZNode.encode(lakeTable);
+        byte[] zkData;
+        if (isLegacyVersion) {
+            zkData = LakeTableZNode.encodeV1(tableId, lakeTable);
+        } else {
+            zkData = LakeTableZNode.encode(lakeTable);
+        }
         String zkPath = LakeTableZNode.path(tableId);
         if (isUpdate) {
             zkClient.setData().forPath(zkPath, zkData);
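
The new isLegacyVersion flag is presumably what LakeTableHelper.upsertLakeTableV1 (called from the CoordinatorEventProcessor change above) threads through; LakeTableHelper itself is not part of this diff, so the following wiring is an assumption, with only ZooKeeperClient#upsertLakeTable's new signature taken from the commit:

    // hypothetical LakeTableHelper internals (not in this diff); building the
    // LakeTable for the table id from the committed LakeTableSnapshot is elided
    private void writeLakeTableZNode(
            long tableId, LakeTable lakeTable, boolean isUpdate, boolean isLegacyVersion)
            throws Exception {
        // with isLegacyVersion = true, the znode keeps the v1 JSON layout
        // (LakeTableZNode.encodeV1), which pre-upgrade tablet servers can still read
        zooKeeperClient.upsertLakeTable(tableId, lakeTable, isUpdate, isLegacyVersion);
    }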

fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java

Lines changed: 9 additions & 0 deletions
@@ -31,6 +31,7 @@
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
@@ -589,6 +590,14 @@ public static String path(long tableId) {
             return TableIdZNode.path(tableId) + "/laketable";
         }
 
+        /**
+         * Encodes a {@link LakeTable} to JSON bytes using the version 1 (legacy) format for
+         * storage in ZooKeeper.
+         */
+        public static byte[] encodeV1(long tableId, LakeTable lakeTable) throws IOException {
+            return LakeTableJsonSerde.serializeV1(tableId, lakeTable);
+        }
+
         /**
         * Encodes a LakeTable to JSON bytes for storage in ZK.
         *

fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java

Lines changed: 1 addition & 1 deletion
@@ -112,7 +112,7 @@ public List<LakeSnapshotMetadata> getLakeSnapshotMetadatas() {
      *
      * @return the LakeTableSnapshot
      */
-    public LakeTableSnapshot getLatestTableSnapshot() throws Exception {
+    public LakeTableSnapshot getLatestTableSnapshot() throws IOException {
         if (lakeTableSnapshot != null) {
             return lakeTableSnapshot;
         }
